Pythonで2分探索木の実装 (1)

完全2分木はターゲットせずに、ALDS1_8をターゲットとして考える。

ALDS1_8を基準に実装していくので以下のようになっている。

  • ノードの検索には、再帰ではなくWhileを用いる
  • ParentもNodeの情報として持つ
  • ノードの追加時に再配置について加味しない

削除については別途記事にする。

巡回について

実装上は、どこで print するか。という話なる。

f:id:clavier:20190616205716p:plain

先行順巡回(Preorder Tree Walk)

根節点、左部分木、右部分木の順で節点の番号を出力する。

 1 3 7 14 21 35 42 56 70 80 81 86 99
中間順巡回(Inorder Tree Walk)

左部分木、根節点、右部分木の順で節点の番号を出力する。

 35 3 1 14 7 21 80 42 70 56 86 81 99
後行順巡回(Postorder Tree Walk)

左部分木、右部分木、根節点の順で節点の番号を出力する。

 1 7 21 14 3 56 70 42 81 99 86 80 35

削除以外の実装

ノードの削除以外は特に考えること無く実装していくことが出来ると思う。 入出力は ALDS1_8 に合わせている。 先頭の insert が根(root) になる。という実装になっている。

class Node:
    def __init__(self, value):
        self.value = value
        self.left = None
        self.right = None
        self.parent = None


# BinarySearchNode
class BTS(list):
    def __init__(self):
        list.__init__(self)
        self.root = None
    
    def insert(self, value):
        z = Node(value)
        y = None
        x = self.root

        while x is not None:
            y = x
            if z.value < x.value:
                x = x.left
            else:
                x = x.right
        
        z.parent = y
        if y is None:
            self.root = z
        elif z.value < y.value:
            y.left = z
        else:
            y.right = z
    
    def recursiveFind(self, value):
        print("find", value)
        def findBTS(node):
            
            if node == None:
                return None
            if node.value == value:
                return node
            elif node.value > value:
                return findBTS(node.left)
            else:
                return findBTS(node.right)
        return findBTS(self.root)

    def find(self, value):
        x = self.root
        while x is not None and value != x.value:
            if value < x.value:
                x = x.left
            else:
                x = x.right
        return x
    
    def preorder(self):
        def preParse(u):
            if u == None:
                return
            print(" " + str(u.value), end="")
            preParse(u.left)
            preParse(u.right)
        preParse(self.root)
        print("")

    def inorder(self):
        def inParse(u):
            if u == None:
                return
            inParse(u.left)
            print(" " + str(u.value), end="")
            inParse(u.right)
        inParse(self.root)
        print("")
    
    def postorder(self):
        def postParse(u):
            if u == None:
                return
            postParse(u.left)
            postParse(u.right)
            print(" " + str(u.value), end="")
        postParse(self.root)
        print("")


if __name__ == '__main__':
    n = int(input())
    T = BTS()

    for _ in range(n):
        c = input().split(" ")
        if c[0] == "insert":
            T.insert(int(c[1]))
        elif c[0] == "find":
            z = T.find(int(c[1]))
            if z is None:
                print("no")
            else:
                print("yes")
        elif c[0] == "delete":
            z = T.find(int(c[1]))
            T.delete(z)
        else:
            T.inorder()
            T.preorder()
            T.postorder()

入力として以下のような値を渡す。

15
insert 35
insert 3
insert 1
insert 14
insert 7
insert 21
insert 80
insert 42
insert 70
insert 56
insert 86
insert 81
insert 99
print
find 50

実行結果は以下のような感じになる。

 1 3 7 14 21 35 42 56 70 80 81 86 99
 35 3 1 14 7 21 80 42 70 56 86 81 99
 1 7 21 14 3 56 70 42 81 99 86 80 35
no

fluentdのqueued_chunks_limit_size利用時の挙動の検証

fluentdのv1.3.3 で queued_chunks_limit_size が設定可能になり、説明に以下のようにあったけどパッと理解できなかったので確認をしました。

  • If you set smaller flush_interval, e.g. 1s, there are lots of small queued chunks in buffer. This is not good with file buffer because it consumes lots of fd resources when output destination has a problem. This parameter mitigates such situations.

ref: https://docs.fluentd.org/v1.0/articles/buffer-section

結果としてはこの機能が入ると、queued 状態の buffer chunk の数を制限することができるようになることがわかりました。
ドキュメントを見ると、デフォルト値がnil (No limit)となっているけど、設定しないと flush_thread_count の値になるのでflush_thread_countを指定している場合は合わせて注意が必要です。
参考:https://github.com/fluent/fluentd/commit/a8d2bbb209c775742f589fe067a0ecac7773ed94#diff-7a3f304568ba6143aaf2b7e78011d044

疑問点

疑問点は、以下のような設定の場合 queued の buffer chunk ファイルは増えないけど staged の buffer chunk が増えて消費するfdの総数はあまり変わらないのでは。と勘違いしたことです。

    chunk_limit_size 3g
    flush_interval 30s
    flush_mode interval
    timekey     5m

前提知識

https://docs.fluentd.org/v1.0/articles/buffer-plugin-overview#the-lifecycle-of-chunks

buffer chunkでは、以下のようになファイルができると

  • buufer.bxxx になっていれば state が staged 状態
  • buufer.qxxx になっていれば queued 状態

    になっているとわかります。

-rw-r--r-- 1 td-agent td-agent 742  1月 31 01:09 /var/log/fluent/out_s3/development/buffer.b580b6af3642109b9c033a2a85ef6a4a7.log
-rw-r--r-- 1 td-agent td-agent 103  1月 31 01:09 /var/log/fluent/out_s3/development/buffer.b580b6af3642109b9c033a2a85ef6a4a7.log.meta
-rw-r--r-- 1 td-agent td-agent 736  1月 31 01:09 /var/log/fluent/out_s3/development/buffer.q580b6af1dce897186b95d5927ebb34ea.log
-rw-r--r-- 1 td-agent td-agent 100  1月 31 01:10 /var/log/fluent/out_s3/development/buffer.q580b6af1dce897186b95d5927ebb34ea.log.meta

上記のように、buffer.bxxxxx.log と buffer.qxxxx.log というファイル(と.metaファイル)ができます。

実験

buffer chunk が queued の状態と staged の状態でそれぞれfdの消費がどうなっているのか確認します。
fluentd に定常的にデータを送って、bufferにデータが溜まるような状況※を作り出して検証しています。

※存在しないS3のバケット名を指定して、エラーが発生するようにしました。

queued_chunks_limit_sizeが効かない場合

まずは、queued_chunks_limit_size のない td-agent 1.2.2 で確認します。 queuedのファイルが大量にでき、fdを消費してることがわかります。

[ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.q* |wc -l
2019年  1月 31日 木曜日 00:56:49 UTC
224
[ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.q* |wc -l
2019年  1月 31日 木曜日 00:57:23 UTC
256

fdを見ると以下のように消費していっていることがわかります。

[ec2-user@localhost ~]$ date; for i in $(ps aux | grep "[t]d-agent" | awk '{print $2}'); do  sudo ls /proc/$i/fd | wc -l; done
2019年  1月 31日 木曜日 00:56:59 UTC
14
272
[ec2-user@localhost ~]$ date; for i in $(ps aux | grep "[t]d-agent" | awk '{print $2}'); do  sudo ls /proc/$i/fd | wc -l; done
2019年  1月 31日 木曜日 00:57:57 UTC
14
336
queued_chunks_limit_sizeが効く場合

td-agent 1.3.3 で確認します。 td-agent 1.2.2のときと違いqueuedのファイルが増えていません。

[ec2-user@localhost ~]$ date; ls -lha [ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.q* |wc -l
2019年  1月 31日 木曜日 01:13:41 UTC
12
[ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.q* |wc -l
2019年  1月 31日 木曜日 01:17:33 UTC
12

buffer chunck の増え方をみていると flush_interval のタイミング でbuffer chunck が増えずに、timekeyのタイミングでファイルが増えていることがわかります。

[ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.b* |wc -l
2019年  1月 31日 木曜日 01:13:45 UTC
50
[ec2-user@localhost ~]$ date; ls -lha /var/log/fluent/out_s3/development/buffer.b* |wc -l
2019年  1月 31日 木曜日 01:17:01 UTC
82

queuedのファイルの一定数ですが、stagedのファイルが増えてfdを消費しているが、queued_chunks_limit_size が効かない場合と比べると fd の増え方が緩やかであることがわかります。

[ec2-user@localhost ~]$ date; for i in $(ps aux | grep "[t]d-agent" | awk '{print $2}'); do  sudo ls /proc/$i/fd | wc -l; done
2019年  1月 31日 木曜日 01:13:51 UTC
14
78
[ec2-user@localhost ~]$ date; for i in $(ps aux | grep "[t]d-agent" | awk '{print $2}'); do  sudo ls /proc/$i/fd | wc -l; done
2019年  1月 31日 木曜日 01:17:05 UTC
14
110

確認結果

buffer chunkファイルが新しく作られる条件は以下の3つのどれかになります。

  1. flush_interval毎にstateがqueuedになるので、新しいbuffer chunck(buffered)が作る。
  2. timekey分の時間がたつと新しいbuffer chunck(buffered)が作られる
  3. chunk_limit_size * chunk_full_thresholdに到達した。

queued_chunks_limit_size を指定すると、1. の条件で buffer chunkファイルが増えなくなるため結果として fd の使用を抑えることができるということのようです。

ただし、2と3の条件ではbuffer chunkファイルが増えていくのでこの点は注意していたほうがよさそうです。

また、Buffer chunkファイルの数が抑えられるので復旧時に処理するファイルも少なくて済みそうとう気がします。

補足. 検証時の設定

検証のときは以下のような設定で行っています。

<match *.*.development.tag>
  @type s3

  # IAM Instance attached
  <instance_profile_credentials>
    retry 5
  </instance_profile_credentials>

  s3_bucket 出力先のバケット
  s3_region ap-northeast-1
  check_bucket false # バケットが存在しないときに作成しないでエラーを発生させるため設定

  path fluent-plugin-s3/${tag[0]}/${tag[1]}/
  s3_object_key_format "%{path}%{time_slice}%{index}_#{Socket.gethostname}.%{file_extension}"
  time_slice_format year=%Y/month=%m/day=%d/hour=%H/%Y%m%d-%H_

  format json

  @id s3_${tag[0]}_${tag[1]}_development
  @log_level debug

  <buffer tag,time>
    @type file
    path /var/log/fluent/out_s3/development/
    chunk_limit_size 3g
    flush_interval 30s
    flush_mode interval
    flush_at_shutdown true
    retry_max_times 300
    retry_wait 5m
    retry_max_interval 10m
    timekey     5m
    flush_thread_count 3
    queued_chunks_limit_size 12
  </buffer>
</match>

cobraとviperで設定ファイルの値をフラグの値で上書きする

Go言語でコマンドを作ろうとしたときに、オプションの指定に設定ファイルの読み込みと、オプションで読み込んだ値を上書きをしたかったのでcobraとviperでの実現の仕方を確認する。

ロングオプションを利用する

qiita.com

を参考に。

記事にあるように、 viper.BindPFlag() を呼び出さないと値がフラグの値で更新されない。

また、PersistentFlags().String()でデフォルト値を設定しても、configFileの指定と違い意味が無いので行わない。

package main

import (
    "github.com/spf13/cobra"
    "fmt"
    "github.com/spf13/viper"
    "os"
)

// 設定項目
type Config struct {
    ApplicationName string
    Debug bool
}

// 設定ファイル名
var configFile string

var config Config


func main() {
    rootCmd := &cobra.Command{
        Use: "app",
        Run: func(c *cobra.Command, args []string) {
            // セットされた値の取得
            fmt.Printf("configFile: %s\nconfig: %#v\n", configFile, config)
        },
    }

    // デフォルト値を設定する
    rootCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "default_config.toml", "config file name")

    rootCmd.PersistentFlags().String("name", "", "application name")

    viper.BindPFlag("ApplicationName", rootCmd.PersistentFlags().Lookup("name"))

    cobra.OnInitialize(func() {
        viper.SetConfigFile(configFile)
        viper.AutomaticEnv()

        if err := viper.ReadInConfig(); err != nil {
            fmt.Println("config file read error")
            fmt.Println(err)
            os.Exit(1)
        }

        if err := viper.Unmarshal(&config); err != nil {
            fmt.Println("config file Unmarshal error")
            fmt.Println(err)
            os.Exit(1)
        }
    })

    if err := rootCmd.Execute(); err != nil {
        fmt.Println(err)
        os.Exit(1)
    }

    println(config.ApplicationName)
}

ショートオプションも追加

ヘルプオプションの実行結果

$ go run main.go  --help
Usage:
  app [flags]

Flags:
  -c, --config string   config file name (default "default_config.toml")
  -h, --help            help for app
  -n, --name string     application name

ショートオプションを使うには PersistentFlags().StringVarP() を利用する。

pflag/flag.go at master · spf13/pflag · GitHubにあるようにショートオプションには1文字のみ利用可能で、それ以上の文字を指定するとエラーになる。

package main

import (
    "github.com/spf13/cobra"
    "fmt"
    "github.com/spf13/viper"
    "os"
)

// 設定項目
type Config struct {
    ApplicationName string
    Debug bool
}

// 設定ファイル名
var configFile string

var config Config


func main() {
    rootCmd := &cobra.Command{
        Use: "app",
        Run: func(c *cobra.Command, args []string) {
            // セットされた値の取得
            fmt.Printf("configFile: %s\nconfig: %#v\n", configFile, config)
        },
    }

    // デフォルト値を設定する
    rootCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "default_config.toml", "config file name")

    rootCmd.PersistentFlags().StringVarP(&config.ApplicationName, "name", "n", "", "application name")

    viper.BindPFlag("ApplicationName", rootCmd.PersistentFlags().Lookup("name"))

    cobra.OnInitialize(func() {
        viper.SetConfigFile(configFile)
        viper.AutomaticEnv()

        if err := viper.ReadInConfig(); err != nil {
            fmt.Println("config file read error")
            fmt.Println(err)
            os.Exit(1)
        }

        if err := viper.Unmarshal(&config); err != nil {
            fmt.Println("config file Unmarshal error")
            fmt.Println(err)
            os.Exit(1)
        }
    })

    if err := rootCmd.Execute(); err != nil {
        fmt.Println(err)
        os.Exit(1)
    }

    println(config.ApplicationName)

実行結果を見るとやりたかったことが実現出来ていることがわかる。

$ cat default_config.toml
ApplicationName = "DEFAULT_APP_TOML"
Debug = true

$ go run main.go 
configFile: default_config.toml
config: main.Config{ApplicationName:"DEFAULT_APP_TOML", Debug:true}
DEFAULT_APP_TOML

$ go run main.go  -n abc
configFile: default_config.toml
config: main.Config{ApplicationName:"abc", Debug:true}
abc

filebeats でNginxのログを確認する

Ingest nodeと、FileBeatのモジュールがで利用できるようになって、とりあえず小規模でとりあえず導入するような構成ならLogstashとか、Fluentd無しのFilebeat + Elasticsearch + Kibanaの構成でなんとかなるんじゃ無いかと試してみる。
ある程度以上の規模になったり、欠損が許容されないような場合は、Logstashとか、FluentdのようなAggregatorを間に挟んだほうが当然良いと思う。

Filebeatsをインストールする

公式サイトのインストール方法を参考にインストールする。

www.elastic.co

Nginx Moduleで出力するIndexを分ける

たとえばNginx用のIndexを分ける場合は以下のような設定をする。
prospectorでfield を追加して、output側で特定の条件(fields.typeが"accesslog"の場合)にnginx-%{+yyyy.MM.dd}という indexに送信するとしている。
ただし、nginx.access.geoipのマッピングをする必要が出てくるので、特に問題がなければデフォルトで提供されているfilebeat-* のIndexを使う方が手間は少ない。

filebeat.modules:
- module: nginx
  # Access logs
  access:
    enabled: true
    var.paths:
      - /var/log/nginx/access.log
    prospector:
      fields:
        type: accesslog

  # Error logs
  error:
    enabled: true
    var.paths:
      - /var/log/nginx/error.log
    prospector:
      fields:
        type: "accesslog"


output.elasticsearch:
  enabled: true

  hosts: ["リモートホスト:9200"]
  index: "filebeat-%{+yyyy.MM.dd}"
  indices:
    - index: "nginx-%{+yyyy.MM.dd}"
      when.equals:
        fields.type: "accesslog"

Nginx ModuleとSyslog Moduleで出力するIndexを分ける

Nginx Moduleはデフォルトのfilebeat-* に出力し、Syslog Moduleの出力先のみ変更する。

filebeat.modules:
- module: system
  # Syslog
  syslog:
    enabled: true
    var.paths: ["/var/log/secure"]
    prospector:
      fields:
        app: "syslog"

filebeat.modules:
- module: nginx
  # Access logs
  access:
    enabled: true
    var.paths:
      - /var/log/nginx/access.log

  # Error logs
  error:
    enabled: true
    var.paths:
      - /var/log/nginx/error.log


output.elasticsearch:
  enabled: true

  hosts: ["リモートホスト:9200"]
  index: "filebeat-%{+yyyy.MM.dd}"
  indices:
    - index: "syslog-%{+yyyy.MM.dd}"
      when.equals:
        fields.app: "syslog"
    - index: "filebeat-%{+yyyy.MM.dd}"
      default:

結論

デフォルトで用意されているDashboardは、一からダッシュボードを作らなくて良いのでこんなことが出来るというという取っ掛かりには良い。         filebeat-* になんでも入れると、ダッシュボードが上手く動かないとか出て来るので最低限でもモジュール単位ぐらいでは Index を分けたくなるが、そのあたりを filebeat だけでやる情報が中々なくて手間取ったが、そこそこ使える感じにはなった。         比較的小規模でAWSなどでCPUクレジットがあるようなインスタンスを使っている場合は、filebeatだけで済むのでCPU負荷が低くてすむという利点もある。

Prometheusのec2 service discoveryを試す

Prometheusには ec2 service discovery機能があり、EC2インスタンスに監視用のagent(exporter) をインストールして、特定のタグを設定するだけで監視・モニタリングの対象とすることができます。

通常のノードの追加

設定方法は以下のように、対象となるノードの情報をPrometheusの設定ファイルに書き、設定を反映されるためPrometheusを再起動します。
これの作業はノードを追加するたびに発生します。

  - job_name: 'node'
    scrape_interval: 30s
    scrape_timeout:  15s
    static_configs:
      - targets: ['172.0.0.3:9100', '172.0.0.3:9256']
        labels:
          name: 'server1'
          stage: 'prod'

ec2 service discoveryを利用したノードの追加方法

ec2 service discoveryを利用するには設定ファイルを以下のように書きます。
ec2 service discoveryを利用することで、この例ではインスタンスのタグ(Stage)に、prodかstgを値と設定すると自動的に監視・モニタリング対象として自動的に検出します。

  - job_name: 'node'
    ec2_sd_configs:
      - region: ap-northeast-1
        access_key: APIKeyを書く
        secret_key: SECRET_KEYを書く
        port: 9100
    relabel_configs:
      - source_labels: [__meta_ec2_tag_Stage]
        regex: (stg|prod)
        action: keep
      - source_labels: [__meta_ec2_tag_Name]
        target_label: name
      - source_labels: [__meta_ec2_tag_Stage]
        target_label: stage

job_name はPrometheusでよく設定される監視対象をグルーピングするラベルです。
ec2_sd_configsにある設定項目は Configuration | Prometheus で確認できます。
ここでは最低限のAWSの設定と、Node exporter がリクエストを受け付ける port番号の設定のみをしています。

relabel_configsは以下のようになっています。

1. 最初の source_labels

監視・モニタリング対象とするかどうかの判断をします。この例ではEC2インスタンスのTag(Stage) の値が、stgかprodの場合は監視・モニタリング対象になります。

2. 2番目と3番目の source_labels

EC2インスタンスのTagの値をPrometheusのラベルに設定しています。
ここでは、EC2のタグのNameの値をPrometheusのラベルのnameに設定し、同様にStageの値をstageに設定しています。
このようにPrometheusのラベルに値を設定しておかないと、Prometheus内(Alertmanager含む)で値を利用できないためこのように設定します。

relabel_configsの設定は EC2 Discovery Relabelling - Robust Perception が参考になりました。

複数の exporter を利用する場合

監視に利用する exporter が複数の場合は、port を複数設定する必要が出てきますが、ec2_sd_configs では port は int値のみ指定可能で複数のportをまとめて書くことはできません。
そのため Node exporter と Process exporter を利用するような場合には、以下のように ec2_sd_configs を書く2つ書くことになります。

  - job_name: 'node'
    ec2_sd_configs:
      - region: ap-northeast-1
        access_key: APIKeyを書く
        secret_key: SECRET_KEYを書く
        port: 9100
    relabel_configs:
      - source_labels: [__meta_ec2_tag_Stage]
        regex: (stg|prod)
        action: keep
      - source_labels: [__meta_ec2_tag_Name]
        target_label: name
      - source_labels: [__meta_ec2_tag_Stage]
        target_label: stage

  - job_name: 'process'
    ec2_sd_configs:
      - region: ap-northeast-1
        access_key: APIKeyを書く
        secret_key: SECRET_KEYを書く
        port: 9256
    relabel_configs:
      - source_labels: [__meta_ec2_tag_Stage]
        regex: (stg|prod)
        action: keep
      - source_labels: [__meta_ec2_tag_Name]
        target_label: name
      - source_labels: [__meta_ec2_tag_Stage]
        target_label: stage