I tried Embulk out of necessity, and it turns out to be really good.
— k1LoW (@k1LoW) March 11, 2016
So I gave Embulk a try, and these are my notes.
Target logs
- Collected with Fluentd
- dstat metrics gathered with fluent-plugin-dstat
- Stored in S3 using fluent-plugin-s3
- gzip-compressed
In other words, logs like this:
2016-01-31T15:24:42Z examplekey {"hostname":"example.com","dstat":{"total cpu usage":{"usr":"0.201","sys":"0.134","idl":"99.532","wai":"0.134","hiq":"0.0","siq":"0.0"},"memory usage":{"used":"1212170240.0","buff":"313139200.0","cach":"2034069504.0","free":"246816768.0"},"load avg":{"1m":"0.090","5m":"0.080","15m":"0.090"},"net/total":{"recv":"108.867","send":"212.200"}}}
Installing Embulk
First, install Embulk.
$ brew cask install java
$ brew install embulk
Next, install the plugins that look necessary.
The list below is what I ended up needing in the end.
$ embulk gem install embulk-input-s3
$ embulk gem install embulk-output-mysql
$ embulk gem install embulk-parser-fluent-s3-log
$ embulk gem install embulk-filter-expand_json
$ embulk gem install embulk-filter-column
Write a rough in and out, then run embulk guess
First, the in side is obviously S3.
Write roughly the following into a file such as tmp.yml:
in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
out:
  type: stdout
then run
$ embulk guess tmp.yml -o config.yml
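and, as if by magic, Embulk guesses the file format and writes a "nicely" filled-in configuration file to config.yml.
Is half of Embulk made of kindness?
Edit the generated config.yml while consulting the plugin documentation, then check the result with embulk preview config.yml. If it does not work, fine-tune it by rerunning embulk guess.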
embulk-parser-fluent-s3-log: nicely parses files in the fluent-plugin-s3 format
Logs stored by fluent-plugin-s3 are laid out as timestamp, key, JSON. Just as I was wondering how to handle that, I found a plugin made for exactly this.
in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
  - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
    - {name: hostname, type: string}
    - {name: dstat, type: string}
out:
  type: stdout
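As a rough mental model of what the parser has to do with each line, here is a minimal Python sketch. It is not the plugin's implementation: the function name parse_line is made up for illustration, and it assumes the three fields are separated by whitespace (a tab or a space), with everything after the second separator being the JSON payload.

```python
import json
from datetime import datetime, timezone


def parse_line(line):
    """Split one log line into (timestamp, key, record)."""
    # Split on the first two whitespace runs only, so spaces inside the JSON survive.
    time_str, key, payload = line.rstrip("\n").split(None, 2)
    ts = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return ts, key, json.loads(payload)


# Shortened version of the sample log line, using tabs as the assumed separator.
sample = (
    "2016-01-31T15:24:42Z\texamplekey\t"
    '{"hostname":"example.com","dstat":{"load avg":{"1m":"0.090"}}}'
)
ts, key, record = parse_line(sample)
print(ts.isoformat(), key, record["hostname"])
# prints: 2016-01-31T15:24:42+00:00 examplekey example.com
```

Note that the nested values such as "0.090" are still strings at this stage, which is why a later step has to convert them.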
embulk-filter-expand_json: expands nested JSON
The dstat column is still nested JSON, so it needs to be expanded.
There was a plugin made for exactly this, too.
Since I am expanding the dstat column here, I wrote the following.
in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
  - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
    - {name: hostname, type: string}
    - {name: dstat, type: string}
filters:
- type: expand_json
  json_column_name: dstat
  root: "$."
  expanded_columns:
  - {name: "['total cpu usage'].usr", type: double}
  - {name: "['total cpu usage'].sys", type: double}
  - {name: "['total cpu usage'].idl", type: double}
  - {name: "['total cpu usage'].wai", type: double}
  - {name: "['total cpu usage'].hiq", type: double}
  - {name: "['total cpu usage'].siq", type: double}
  - {name: "['memory usage'].used", type: double}
  - {name: "['memory usage'].buff", type: double}
  - {name: "['memory usage'].cach", type: double}
  - {name: "['memory usage'].free", type: double}
  - {name: "['load avg'].1m", type: double}
  - {name: "['load avg'].5m", type: double}
  - {name: "['load avg'].15m", type: double}
  - {name: "['net/total'].recv", type: double}
  - {name: "['net/total'].send", type: double}
out:
  type: stdout
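To picture what this expansion amounts to, here is a small Python sketch of my understanding of it, not the plugin's actual code. The names PATHS and expand_dstat are hypothetical, and only four of the fifteen paths are included to keep it short. Each nested string value is looked up by its outer and inner key and converted to a float, which corresponds to type: double in the config.

```python
import json

# A few of the expanded_columns entries, written as (outer key, inner key).
PATHS = [
    ("total cpu usage", "usr"),
    ("memory usage", "used"),
    ("load avg", "1m"),
    ("net/total", "recv"),
]


def expand_dstat(dstat_json):
    """Flatten selected nested values into {"['outer'].inner": float}."""
    dstat = json.loads(dstat_json)
    # The source values are strings such as "0.201", so cast them explicitly.
    return {f"['{outer}'].{inner}": float(dstat[outer][inner]) for outer, inner in PATHS}


dstat_json = json.dumps({
    "total cpu usage": {"usr": "0.201"},
    "memory usage": {"used": "1212170240.0"},
    "load avg": {"1m": "0.090"},
    "net/total": {"recv": "108.867"},
})
row = expand_dstat(dstat_json)
print(row["['total cpu usage'].usr"])
# prints: 0.201
```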
Renaming the in-side columns to match the MySQL column names
A column name like "['total cpu usage'].usr" is awkward to use as a MySQL column name, so I rename the columns.
There was a plugin made for exactly this, too.
in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
  - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
    - {name: hostname, type: string}
    - {name: dstat, type: string}
filters:
- type: expand_json
  json_column_name: dstat
  root: "$."
  expanded_columns:
  - {name: "['total cpu usage'].usr", type: double}
  - {name: "['total cpu usage'].sys", type: double}
  - {name: "['total cpu usage'].idl", type: double}
  - {name: "['total cpu usage'].wai", type: double}
  - {name: "['total cpu usage'].hiq", type: double}
  - {name: "['total cpu usage'].siq", type: double}
  - {name: "['memory usage'].used", type: double}
  - {name: "['memory usage'].buff", type: double}
  - {name: "['memory usage'].cach", type: double}
  - {name: "['memory usage'].free", type: double}
  - {name: "['load avg'].1m", type: double}
  - {name: "['load avg'].5m", type: double}
  - {name: "['load avg'].15m", type: double}
  - {name: "['net/total'].recv", type: double}
  - {name: "['net/total'].send", type: double}
- type: column
  columns:
  - {name: time}
  - {name: key}
  - {name: hostname}
  - {name: cpu_usr, src: "['total cpu usage'].usr"}
  - {name: cpu_sys, src: "['total cpu usage'].sys"}
  - {name: cpu_idl, src: "['total cpu usage'].idl"}
  - {name: cpu_wai, src: "['total cpu usage'].wai"}
  - {name: cpu_hiq, src: "['total cpu usage'].hiq"}
  - {name: cpu_siq, src: "['total cpu usage'].siq"}
  - {name: mem_used, src: "['memory usage'].used"}
  - {name: mem_buff, src: "['memory usage'].buff"}
  - {name: mem_cach, src: "['memory usage'].cach"}
  - {name: mem_free, src: "['memory usage'].free"}
  - {name: load_1m, src: "['load avg'].1m"}
  - {name: load_5m, src: "['load avg'].5m"}
  - {name: load_15m, src: "['load avg'].15m"}
  - {name: net_recv, src: "['net/total'].recv"}
  - {name: net_send, src: "['net/total'].send"}
out:
  type: stdout
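Conceptually, the column filter here does two things: it keeps only the listed columns and exposes each one under a new name. The Python sketch below illustrates that idea with three of the entries. RENAMES and select_and_rename are names I made up, and the assumption that unlisted columns (such as the raw dstat column) are dropped reflects how I read the plugin's columns option rather than something verified against its source.

```python
# A few (new name, source name) pairs taken from the column filter config.
RENAMES = [
    ("hostname", "hostname"),
    ("cpu_usr", "['total cpu usage'].usr"),
    ("load_1m", "['load avg'].1m"),
]


def select_and_rename(row):
    """Keep only the listed columns, in order, under their new names."""
    return {new: row[src] for new, src in RENAMES}


row = {
    "hostname": "example.com",
    "dstat": "{...}",  # raw JSON column, not listed above, so it is dropped
    "['total cpu usage'].usr": 0.201,
    "['load avg'].1m": 0.09,
}
renamed = select_and_rename(row)
print(renamed)
# prints: {'hostname': 'example.com', 'cpu_usr': 0.201, 'load_1m': 0.09}
```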
Configuring out
The out side is MySQL, so all that is left is to create the table in MySQL and fill in the settings.
Here is the final form of config.yml.
in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
  - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
    - {name: hostname, type: string}
    - {name: dstat, type: string}
filters:
- type: expand_json
  json_column_name: dstat
  root: "$."
  expanded_columns:
  - {name: "['total cpu usage'].usr", type: double}
  - {name: "['total cpu usage'].sys", type: double}
  - {name: "['total cpu usage'].idl", type: double}
  - {name: "['total cpu usage'].wai", type: double}
  - {name: "['total cpu usage'].hiq", type: double}
  - {name: "['total cpu usage'].siq", type: double}
  - {name: "['memory usage'].used", type: double}
  - {name: "['memory usage'].buff", type: double}
  - {name: "['memory usage'].cach", type: double}
  - {name: "['memory usage'].free", type: double}
  - {name: "['load avg'].1m", type: double}
  - {name: "['load avg'].5m", type: double}
  - {name: "['load avg'].15m", type: double}
  - {name: "['net/total'].recv", type: double}
  - {name: "['net/total'].send", type: double}
- type: column
  columns:
  - {name: time}
  - {name: key}
  - {name: hostname}
  - {name: cpu_usr, src: "['total cpu usage'].usr"}
  - {name: cpu_sys, src: "['total cpu usage'].sys"}
  - {name: cpu_idl, src: "['total cpu usage'].idl"}
  - {name: cpu_wai, src: "['total cpu usage'].wai"}
  - {name: cpu_hiq, src: "['total cpu usage'].hiq"}
  - {name: cpu_siq, src: "['total cpu usage'].siq"}
  - {name: mem_used, src: "['memory usage'].used"}
  - {name: mem_buff, src: "['memory usage'].buff"}
  - {name: mem_cach, src: "['memory usage'].cach"}
  - {name: mem_free, src: "['memory usage'].free"}
  - {name: load_1m, src: "['load avg'].1m"}
  - {name: load_5m, src: "['load avg'].5m"}
  - {name: load_15m, src: "['load avg'].15m"}
  - {name: net_recv, src: "['net/total'].recv"}
  - {name: net_send, src: "['net/total'].send"}
out:
  type: mysql
  host: localhost
  user: username
  password: userpassword
  database: my_s3_logs
  table: dstat
  mode: insert
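The table definition itself is not shown in these notes. As one possible starting point, the Python sketch below builds a CREATE TABLE statement from the renamed column list. The SQL types are my assumptions (DATETIME for time, VARCHAR(255) for the string columns, DOUBLE for the metrics), and create_table_sql is a hypothetical helper. Identifiers are wrapped in backquotes because key is a reserved word in MySQL.

```python
# Column names follow the column filter in config.yml; the SQL types are assumptions.
STRING_COLUMNS = ["key", "hostname"]
DOUBLE_COLUMNS = [
    "cpu_usr", "cpu_sys", "cpu_idl", "cpu_wai", "cpu_hiq", "cpu_siq",
    "mem_used", "mem_buff", "mem_cach", "mem_free",
    "load_1m", "load_5m", "load_15m",
    "net_recv", "net_send",
]


def create_table_sql(table):
    """Build a CREATE TABLE statement with backquoted identifiers."""
    defs = ["`time` DATETIME"]
    defs += [f"`{name}` VARCHAR(255)" for name in STRING_COLUMNS]
    defs += [f"`{name}` DOUBLE" for name in DOUBLE_COLUMNS]
    return f"CREATE TABLE `{table}` (\n  " + ",\n  ".join(defs) + "\n);"


sql = create_table_sql("dstat")
print(sql)
```

The printed statement can be pasted into a mysql client after adjusting the types, keys, and indexes to suit the actual queries.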
Running it
$ embulk run config.yml
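Including the fetch from S3, about 800,000 records were loaded into MySQL in a few tens of minutes. That is fast (and I am glad I did not have to write it myself...).
Embulk is handy!!
The plugins that already exist look like they can cover most needs.
I plan to use it actively from now on.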