Pythonで2分探索木の実装 (1)
完全2分木はターゲットせずに、ALDS1_8をターゲットとして考える。
- 二分探索木 挿入| アルゴリズムとデータ構造 | Aizu Online Judge
- 二分探索木 検索 | アルゴリズムとデータ構造 | Aizu Online Judge
- 二分探索木 削除 | アルゴリズムとデータ構造 | Aizu Online Judge
ALDS1_8を基準に実装していくので以下のようになっている。
- ノードの検索には、再帰ではなくWhileを用いる
- ParentもNodeの情報として持つ
- ノードの追加時に再配置について加味しない
削除については別途記事にする。
巡回について
実装上は、どこで print するか。という話なる。
先行順巡回(Preorder Tree Walk)
根節点、左部分木、右部分木の順で節点の番号を出力する。
1 3 7 14 21 35 42 56 70 80 81 86 99
中間順巡回(Inorder Tree Walk)
左部分木、根節点、右部分木の順で節点の番号を出力する。
35 3 1 14 7 21 80 42 70 56 86 81 99
後行順巡回(Postorder Tree Walk)
左部分木、右部分木、根節点の順で節点の番号を出力する。
1 7 21 14 3 56 70 42 81 99 86 80 35
削除以外の実装
ノードの削除以外は特に考えること無く実装していくことが出来ると思う。 入出力は ALDS1_8 に合わせている。 先頭の insert が根(root) になる。という実装になっている。
class Node: def __init__(self, value): self.value = value self.left = None self.right = None self.parent = None # BinarySearchNode class BTS(list): def __init__(self): list.__init__(self) self.root = None def insert(self, value): z = Node(value) y = None x = self.root while x is not None: y = x if z.value < x.value: x = x.left else: x = x.right z.parent = y if y is None: self.root = z elif z.value < y.value: y.left = z else: y.right = z def recursiveFind(self, value): print("find", value) def findBTS(node): if node == None: return None if node.value == value: return node elif node.value > value: return findBTS(node.left) else: return findBTS(node.right) return findBTS(self.root) def find(self, value): x = self.root while x is not None and value != x.value: if value < x.value: x = x.left else: x = x.right return x def preorder(self): def preParse(u): if u == None: return print(" " + str(u.value), end="") preParse(u.left) preParse(u.right) preParse(self.root) print("") def inorder(self): def inParse(u): if u == None: return inParse(u.left) print(" " + str(u.value), end="") inParse(u.right) inParse(self.root) print("") def postorder(self): def postParse(u): if u == None: return postParse(u.left) postParse(u.right) print(" " + str(u.value), end="") postParse(self.root) print("") if __name__ == '__main__': n = int(input()) T = BTS() for _ in range(n): c = input().split(" ") if c[0] == "insert": T.insert(int(c[1])) elif c[0] == "find": z = T.find(int(c[1])) if z is None: print("no") else: print("yes") elif c[0] == "delete": z = T.find(int(c[1])) T.delete(z) else: T.inorder() T.preorder() T.postorder()
入力として以下のような値を渡す。
15 insert 35 insert 3 insert 1 insert 14 insert 7 insert 21 insert 80 insert 42 insert 70 insert 56 insert 86 insert 81 insert 99 print find 50
実行結果は以下のような感じになる。
1 3 7 14 21 35 42 56 70 80 81 86 99 35 3 1 14 7 21 80 42 70 56 86 81 99 1 7 21 14 3 56 70 42 81 99 86 80 35 no
fluentdのqueued_chunks_limit_size利用時の挙動の検証
fluentdのv1.3.3 で queued_chunks_limit_size が設定可能になり、説明に以下のようにあったけどパッと理解できなかったので確認をしました。
- If you set smaller flush_interval, e.g. 1s, there are lots of small queued chunks in buffer. This is not good with file buffer because it consumes lots of fd resources when output destination has a problem. This parameter mitigates such situations.
結果としてはこの機能が入ると、queued 状態の buffer chunk の数を制限することができるようになることがわかりました。
ドキュメントを見ると、デフォルト値がnil (No limit)となっているけど、設定しないと flush_thread_count の値になるのでflush_thread_countを指定している場合は合わせて注意が必要です。
参考:https://github.com/fluent/fluentd/commit/a8d2bbb209c775742f589fe067a0ecac7773ed94#diff-7a3f304568ba6143aaf2b7e78011d044
疑問点
疑問点は、以下のような設定の場合 queued の buffer chunk ファイルは増えないけど staged の buffer chunk が増えて消費するfdの総数はあまり変わらないのでは。と勘違いしたことです。
chunk_limit_size 3g flush_interval 30s flush_mode interval timekey 5m
前提知識
https://docs.fluentd.org/v1.0/articles/buffer-plugin-overview#the-lifecycle-of-chunks
buffer chunkでは、以下のようになファイルができると
- buufer.bxxx になっていれば state が staged 状態
buufer.qxxx になっていれば queued 状態
になっているとわかります。
-rw-r--r-- 1 td-agent td-agent 742 1月 31 01:09 /var/log/fluent/out_s3/development/buffer.b580b6af3642109b9c033a2a85ef6a4a7.log -rw-r--r-- 1 td-agent td-agent 103 1月 31 01:09 /var/log/fluent/out_s3/development/buffer.b580b6af3642109b9c033a2a85ef6a4a7.log.meta -rw-r--r-- 1 td-agent td-agent 736 1月 31 01:09 /var/log/fluent/out_s3/development/buffer.q580b6af1dce897186b95d5927ebb34ea.log -rw-r--r-- 1 td-agent td-agent 100 1月 31 01:10 /var/log/fluent/out_s3/development/buffer.q580b6af1dce897186b95d5927ebb34ea.log.meta
上記のように、buffer.bxxxxx.log と buffer.qxxxx.log というファイル(と.metaファイル)ができます。
実験
buffer chunk が queued の状態と staged の状態でそれぞれfdの消費がどうなっているのか確認します。
fluentd に定常的にデータを送って、bufferにデータが溜まるような状況※を作り出して検証しています。
※存在しないS3のバケット名を指定して、エラーが発生するようにしました。
queued_chunks_limit_sizeが効かない場合
まずは、queued_chunks_limit_size のない td-agent 1.2.2 で確認します。 queuedのファイルが大量にでき、fdを消費してることがわかります。
[ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.q* |wc -l 2019年 1月 31日 木曜日 00:56:49 UTC 224 [ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.q* |wc -l 2019年 1月 31日 木曜日 00:57:23 UTC 256
fdを見ると以下のように消費していっていることがわかります。
[ec2-user@localhost ~]$ date; for i in $(ps aux | grep "[t]d-agent" | awk '{print $2}'); do sudo ls /proc/$i/fd | wc -l; done 2019年 1月 31日 木曜日 00:56:59 UTC 14 272 [ec2-user@localhost ~]$ date; for i in $(ps aux | grep "[t]d-agent" | awk '{print $2}'); do sudo ls /proc/$i/fd | wc -l; done 2019年 1月 31日 木曜日 00:57:57 UTC 14 336
queued_chunks_limit_sizeが効く場合
td-agent 1.3.3 で確認します。 td-agent 1.2.2のときと違いqueuedのファイルが増えていません。
[ec2-user@localhost ~]$ date; ls -lha [ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.q* |wc -l 2019年 1月 31日 木曜日 01:13:41 UTC 12 [ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.q* |wc -l 2019年 1月 31日 木曜日 01:17:33 UTC 12
buffer chunck の増え方をみていると flush_interval のタイミング でbuffer chunck が増えずに、timekeyのタイミングでファイルが増えていることがわかります。
[ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.b* |wc -l 2019年 1月 31日 木曜日 01:13:45 UTC 50 [ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.b* |wc -l 2019年 1月 31日 木曜日 01:17:01 UTC 82
queuedのファイルの一定数ですが、stagedのファイルが増えてfdを消費しているが、queued_chunks_limit_size が効かない場合と比べると fd の増え方が緩やかであることがわかります。
[ec2-user@localhost ~]$ date; for i in $(ps aux | grep "[t]d-agent" | awk '{print $2}'); do sudo ls /proc/$i/fd | wc -l; done 2019年 1月 31日 木曜日 01:13:51 UTC 14 78 [ec2-user@localhost ~]$ date; for i in $(ps aux | grep "[t]d-agent" | awk '{print $2}'); do sudo ls /proc/$i/fd | wc -l; done 2019年 1月 31日 木曜日 01:17:05 UTC 14 110
確認結果
buffer chunkファイルが新しく作られる条件は以下の3つのどれかになります。
- flush_interval毎にstateがqueuedになるので、新しいbuffer chunck(buffered)が作る。
- timekey分の時間がたつと新しいbuffer chunck(buffered)が作られる
- chunk_limit_size * chunk_full_thresholdに到達した。
queued_chunks_limit_size を指定すると、1. の条件で buffer chunkファイルが増えなくなるため結果として fd の使用を抑えることができるということのようです。
ただし、2と3の条件ではbuffer chunkファイルが増えていくのでこの点は注意していたほうがよさそうです。
また、Buffer chunkファイルの数が抑えられるので復旧時に処理するファイルも少なくて済みそうとう気がします。
補足. 検証時の設定
検証のときは以下のような設定で行っています。
<match *.*.development.tag> @type s3 # IAM Instance attached <instance_profile_credentials> retry 5 </instance_profile_credentials> s3_bucket 出力先のバケット s3_region ap-northeast-1 check_bucket false # バケットが存在しないときに作成しないでエラーを発生させるため設定 path fluent-plugin-s3/${tag[0]}/${tag[1]}/ s3_object_key_format "%{path}%{time_slice}%{index}_#{Socket.gethostname}.%{file_extension}" time_slice_format year=%Y/month=%m/day=%d/hour=%H/%Y%m%d-%H_ format json @id s3_${tag[0]}_${tag[1]}_development @log_level debug <buffer tag,time> @type file path /var/log/fluent/out_s3/development/ chunk_limit_size 3g flush_interval 30s flush_mode interval flush_at_shutdown true retry_max_times 300 retry_wait 5m retry_max_interval 10m timekey 5m flush_thread_count 3 queued_chunks_limit_size 12 </buffer> </match>
cobraとviperで設定ファイルの値をフラグの値で上書きする
Go言語でコマンドを作ろうとしたときに、オプションの指定に設定ファイルの読み込みと、オプションで読み込んだ値を上書きをしたかったのでcobraとviperでの実現の仕方を確認する。
ロングオプションを利用する
を参考に。
記事にあるように、 viper.BindPFlag() を呼び出さないと値がフラグの値で更新されない。
また、PersistentFlags().String()でデフォルト値を設定しても、configFileの指定と違い意味が無いので行わない。
package main import ( "github.com/spf13/cobra" "fmt" "github.com/spf13/viper" "os" ) // 設定項目 type Config struct { ApplicationName string Debug bool } // 設定ファイル名 var configFile string var config Config func main() { rootCmd := &cobra.Command{ Use: "app", Run: func(c *cobra.Command, args []string) { // セットされた値の取得 fmt.Printf("configFile: %s\nconfig: %#v\n", configFile, config) }, } // デフォルト値を設定する rootCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "default_config.toml", "config file name") rootCmd.PersistentFlags().String("name", "", "application name") viper.BindPFlag("ApplicationName", rootCmd.PersistentFlags().Lookup("name")) cobra.OnInitialize(func() { viper.SetConfigFile(configFile) viper.AutomaticEnv() if err := viper.ReadInConfig(); err != nil { fmt.Println("config file read error") fmt.Println(err) os.Exit(1) } if err := viper.Unmarshal(&config); err != nil { fmt.Println("config file Unmarshal error") fmt.Println(err) os.Exit(1) } }) if err := rootCmd.Execute(); err != nil { fmt.Println(err) os.Exit(1) } println(config.ApplicationName) }
ショートオプションも追加
ヘルプオプションの実行結果
$ go run main.go --help Usage: app [flags] Flags: -c, --config string config file name (default "default_config.toml") -h, --help help for app -n, --name string application name
ショートオプションを使うには PersistentFlags().StringVarP() を利用する。
pflag/flag.go at master · spf13/pflag · GitHubにあるようにショートオプションには1文字のみ利用可能で、それ以上の文字を指定するとエラーになる。
package main import ( "github.com/spf13/cobra" "fmt" "github.com/spf13/viper" "os" ) // 設定項目 type Config struct { ApplicationName string Debug bool } // 設定ファイル名 var configFile string var config Config func main() { rootCmd := &cobra.Command{ Use: "app", Run: func(c *cobra.Command, args []string) { // セットされた値の取得 fmt.Printf("configFile: %s\nconfig: %#v\n", configFile, config) }, } // デフォルト値を設定する rootCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "default_config.toml", "config file name") rootCmd.PersistentFlags().StringVarP(&config.ApplicationName, "name", "n", "", "application name") viper.BindPFlag("ApplicationName", rootCmd.PersistentFlags().Lookup("name")) cobra.OnInitialize(func() { viper.SetConfigFile(configFile) viper.AutomaticEnv() if err := viper.ReadInConfig(); err != nil { fmt.Println("config file read error") fmt.Println(err) os.Exit(1) } if err := viper.Unmarshal(&config); err != nil { fmt.Println("config file Unmarshal error") fmt.Println(err) os.Exit(1) } }) if err := rootCmd.Execute(); err != nil { fmt.Println(err) os.Exit(1) } println(config.ApplicationName)
実行結果を見るとやりたかったことが実現出来ていることがわかる。
$ cat default_config.toml ApplicationName = "DEFAULT_APP_TOML" Debug = true $ go run main.go configFile: default_config.toml config: main.Config{ApplicationName:"DEFAULT_APP_TOML", Debug:true} DEFAULT_APP_TOML $ go run main.go -n abc configFile: default_config.toml config: main.Config{ApplicationName:"abc", Debug:true} abc
filebeats でNginxのログを確認する
Ingest nodeと、FileBeatのモジュールがで利用できるようになって、とりあえず小規模でとりあえず導入するような構成ならLogstashとか、Fluentd無しのFilebeat + Elasticsearch + Kibanaの構成でなんとかなるんじゃ無いかと試してみる。
ある程度以上の規模になったり、欠損が許容されないような場合は、Logstashとか、FluentdのようなAggregatorを間に挟んだほうが当然良いと思う。
Filebeatsをインストールする
公式サイトのインストール方法を参考にインストールする。
Nginx Moduleで出力するIndexを分ける
たとえばNginx用のIndexを分ける場合は以下のような設定をする。
prospectorでfield を追加して、output側で特定の条件(fields.typeが"accesslog"の場合)にnginx-%{+yyyy.MM.dd}という indexに送信するとしている。
ただし、nginx.access.geoipのマッピングをする必要が出てくるので、特に問題がなければデフォルトで提供されているfilebeat-* のIndexを使う方が手間は少ない。
filebeat.modules: - module: nginx # Access logs access: enabled: true var.paths: - /var/log/nginx/access.log prospector: fields: type: accesslog # Error logs error: enabled: true var.paths: - /var/log/nginx/error.log prospector: fields: type: "accesslog" output.elasticsearch: enabled: true hosts: ["リモートホスト:9200"] index: "filebeat-%{+yyyy.MM.dd}" indices: - index: "nginx-%{+yyyy.MM.dd}" when.equals: fields.type: "accesslog"
Nginx ModuleとSyslog Moduleで出力するIndexを分ける
Nginx Moduleはデフォルトのfilebeat-* に出力し、Syslog Moduleの出力先のみ変更する。
filebeat.modules: - module: system # Syslog syslog: enabled: true var.paths: ["/var/log/secure"] prospector: fields: app: "syslog" filebeat.modules: - module: nginx # Access logs access: enabled: true var.paths: - /var/log/nginx/access.log # Error logs error: enabled: true var.paths: - /var/log/nginx/error.log output.elasticsearch: enabled: true hosts: ["リモートホスト:9200"] index: "filebeat-%{+yyyy.MM.dd}" indices: - index: "syslog-%{+yyyy.MM.dd}" when.equals: fields.app: "syslog" - index: "filebeat-%{+yyyy.MM.dd}" default:
結論
デフォルトで用意されているDashboardは、一からダッシュボードを作らなくて良いのでこんなことが出来るというという取っ掛かりには良い。 filebeat-* になんでも入れると、ダッシュボードが上手く動かないとか出て来るので最低限でもモジュール単位ぐらいでは Index を分けたくなるが、そのあたりを filebeat だけでやる情報が中々なくて手間取ったが、そこそこ使える感じにはなった。 比較的小規模でAWSなどでCPUクレジットがあるようなインスタンスを使っている場合は、filebeatだけで済むのでCPU負荷が低くてすむという利点もある。
Prometheusのec2 service discoveryを試す
Prometheusには ec2 service discovery機能があり、EC2インスタンスに監視用のagent(exporter) をインストールして、特定のタグを設定するだけで監視・モニタリングの対象とすることができます。
通常のノードの追加
設定方法は以下のように、対象となるノードの情報をPrometheusの設定ファイルに書き、設定を反映されるためPrometheusを再起動します。
これの作業はノードを追加するたびに発生します。
- job_name: 'node' scrape_interval: 30s scrape_timeout: 15s static_configs: - targets: ['172.0.0.3:9100', '172.0.0.3:9256'] labels: name: 'server1' stage: 'prod'
ec2 service discoveryを利用したノードの追加方法
ec2 service discoveryを利用するには設定ファイルを以下のように書きます。
ec2 service discoveryを利用することで、この例ではインスタンスのタグ(Stage)に、prodかstgを値と設定すると自動的に監視・モニタリング対象として自動的に検出します。
- job_name: 'node' ec2_sd_configs: - region: ap-northeast-1 access_key: APIKeyを書く secret_key: SECRET_KEYを書く port: 9100 relabel_configs: - source_labels: [__meta_ec2_tag_Stage] regex: (stg|prod) action: keep - source_labels: [__meta_ec2_tag_Name] target_label: name - source_labels: [__meta_ec2_tag_Stage] target_label: stage
job_name はPrometheusでよく設定される監視対象をグルーピングするラベルです。
ec2_sd_configsにある設定項目は Configuration | Prometheus で確認できます。
ここでは最低限のAWSの設定と、Node exporter がリクエストを受け付ける port番号の設定のみをしています。
relabel_configsは以下のようになっています。
1. 最初の source_labels
監視・モニタリング対象とするかどうかの判断をします。この例ではEC2インスタンスのTag(Stage) の値が、stgかprodの場合は監視・モニタリング対象になります。
2. 2番目と3番目の source_labels
EC2インスタンスのTagの値をPrometheusのラベルに設定しています。
ここでは、EC2のタグのNameの値をPrometheusのラベルのnameに設定し、同様にStageの値をstageに設定しています。
このようにPrometheusのラベルに値を設定しておかないと、Prometheus内(Alertmanager含む)で値を利用できないためこのように設定します。
relabel_configsの設定は EC2 Discovery Relabelling - Robust Perception が参考になりました。
複数の exporter を利用する場合
監視に利用する exporter が複数の場合は、port を複数設定する必要が出てきますが、ec2_sd_configs では port は int値のみ指定可能で複数のportをまとめて書くことはできません。
そのため Node exporter と Process exporter を利用するような場合には、以下のように ec2_sd_configs を書く2つ書くことになります。
- job_name: 'node' ec2_sd_configs: - region: ap-northeast-1 access_key: APIKeyを書く secret_key: SECRET_KEYを書く port: 9100 relabel_configs: - source_labels: [__meta_ec2_tag_Stage] regex: (stg|prod) action: keep - source_labels: [__meta_ec2_tag_Name] target_label: name - source_labels: [__meta_ec2_tag_Stage] target_label: stage - job_name: 'process' ec2_sd_configs: - region: ap-northeast-1 access_key: APIKeyを書く secret_key: SECRET_KEYを書く port: 9256 relabel_configs: - source_labels: [__meta_ec2_tag_Stage] regex: (stg|prod) action: keep - source_labels: [__meta_ec2_tag_Name] target_label: name - source_labels: [__meta_ec2_tag_Stage] target_label: stage