So I gave Embulk a try, and these are my notes.
Target logs
- Collected with Fluentd
- dstat metrics gathered with fluent-plugin-dstat
- Stored in S3 with fluent-plugin-s3
- gzip-compressed
In other words, the logs look like this:
2016-01-31T15:24:42Z examplekey {"hostname":"example.com","dstat":{"total cpu usage":{"usr":"0.201","sys":"0.134","idl":"99.532","wai":"0.134","hiq":"0.0","siq":"0.0"},"memory usage":{"used":"1212170240.0","buff":"313139200.0","cach":"2034069504.0","free":"246816768.0"},"load avg":{"1m":"0.090","5m":"0.080","15m":"0.090"},"net/total":{"recv":"108.867","send":"212.200"}}}
Installing Embulk
First, install Embulk.
$ brew cask install java
$ brew install embulk
Next, install the plugins that look necessary.
The list below is what I actually ended up needing.
$ embulk gem install embulk-input-s3
$ embulk gem install embulk-output-mysql
$ embulk gem install embulk-parser-fluent-s3-log
$ embulk gem install embulk-filter-expand_json
$ embulk gem install embulk-filter-column
Write a rough in and out, then run embulk guess
To start with, I at least know that in is S3 (embulk-input-s3).
In a file such as tmp.yml, write roughly this much:
in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
out:
  type: stdout
With about that much written, run:
$ embulk guess tmp.yml -o config.yml
As if by magic, this guesses the format and generates a "nicely" filled-in configuration file as config.yml.
Is Embulk half made of kindness?
Edit the generated config.yml while consulting each plugin's documentation, then check the result with embulk preview config.yml. When it does not come out right, fine-tune it with embulk guess.
embulk-parser-fluent-s3-log, which nicely parses files in the fluent-plugin-s3 format
Logs saved by fluent-plugin-s3 are laid out as timestamp, key, JSON. Just as I was wondering how to handle that, I found a plugin made for exactly this.
in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
    - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
      - {name: hostname, type: string}
      - {name: dstat, type: string}
out:
  type: stdout
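To make the resulting row shape concrete, here is a rough Python sketch of what such a parse amounts to. It is not the plugin's code: the function name parse_fluent_s3_line and the whitespace-based split are my own assumptions, and the sample line is shortened from the log shown earlier.

```python
import json
from datetime import datetime, timezone


def parse_fluent_s3_line(line):
    """Split one 'time key json' line into a flat row (illustrative only)."""
    # Split on the first two runs of whitespace only; the JSON payload itself
    # contains spaces (e.g. "load avg"), so it must stay in one piece.
    time_str, key, payload = line.rstrip("\n").split(None, 2)
    record = json.loads(payload)
    return {
        "time": datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ").replace(
            tzinfo=timezone.utc
        ),
        "key": key,
        "hostname": record["hostname"],
        # Kept as a JSON string, mirroring `{name: dstat, type: string}`.
        "dstat": json.dumps(record["dstat"]),
    }


sample = (
    "2016-01-31T15:24:42Z examplekey "
    '{"hostname":"example.com","dstat":{"load avg":'
    '{"1m":"0.090","5m":"0.080","15m":"0.090"}}}'
)
row = parse_fluent_s3_line(sample)
print(row["time"].isoformat(), row["key"], row["hostname"])
```

The point is only that dstat is still one opaque string at this stage, which is why a further step is needed to get at the individual metrics.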
embulk-filter-expand_json, which expands nested JSON
The dstat column is still nested JSON, so it needs to be expanded.
Here too there was a plugin made for exactly this.
This time I am expanding the dstat column, so I wrote the following.
in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
    - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
      - {name: hostname, type: string}
      - {name: dstat, type: string}
filters:
  - type: expand_json
    json_column_name: dstat
    root: "$."
    expanded_columns:
      - {name: "['total cpu usage'].usr", type: double}
      - {name: "['total cpu usage'].sys", type: double}
      - {name: "['total cpu usage'].idl", type: double}
      - {name: "['total cpu usage'].wai", type: double}
      - {name: "['total cpu usage'].hiq", type: double}
      - {name: "['total cpu usage'].siq", type: double}
      - {name: "['memory usage'].used", type: double}
      - {name: "['memory usage'].buff", type: double}
      - {name: "['memory usage'].cach", type: double}
      - {name: "['memory usage'].free", type: double}
      - {name: "['load avg'].1m", type: double}
      - {name: "['load avg'].5m", type: double}
      - {name: "['load avg'].15m", type: double}
      - {name: "['net/total'].recv", type: double}
      - {name: "['net/total'].send", type: double}
out:
  type: stdout
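As a rough mental model of this expansion (again, not the plugin's implementation), the Python sketch below flattens a few of the nested dstat fields into columns named the same way as the expanded_columns entries. The helper name expand_json_column is made up for illustration, and dropping the original dstat column after expansion is an assumption on my part.

```python
import json

# (group, leaf) pairs mirroring a few of the expanded_columns entries above.
FIELDS = [
    ("total cpu usage", "usr"),
    ("memory usage", "used"),
    ("load avg", "1m"),
    ("net/total", "recv"),
]


def expand_json_column(row, json_column="dstat", fields=FIELDS):
    """Flatten selected nested values into top-level columns (illustrative)."""
    nested = json.loads(row[json_column])
    # Assumption: the original JSON column is dropped once it is expanded.
    expanded = {k: v for k, v in row.items() if k != json_column}
    for group, leaf in fields:
        # dstat values arrive as strings ("0.201"), so cast them to float,
        # which corresponds to `type: double` in the config.
        expanded[f"['{group}'].{leaf}"] = float(nested[group][leaf])
    return expanded


row = {
    "hostname": "example.com",
    "dstat": json.dumps({
        "total cpu usage": {"usr": "0.201"},
        "memory usage": {"used": "1212170240.0"},
        "load avg": {"1m": "0.090"},
        "net/total": {"recv": "108.867"},
    }),
}
expanded = expand_json_column(row)
for name, value in expanded.items():
    print(name, value)
```

The keys with spaces or slashes ("total cpu usage", "net/total") are the reason for the bracket notation in the column names.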
Left as "['total cpu usage'].usr", the column names are awkward to use as MySQL column names, so I rename them.
Once again there was a plugin made for exactly this (embulk-filter-column).
in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
    - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
      - {name: hostname, type: string}
      - {name: dstat, type: string}
filters:
  - type: expand_json
    json_column_name: dstat
    root: "$."
    expanded_columns:
      - {name: "['total cpu usage'].usr", type: double}
      - {name: "['total cpu usage'].sys", type: double}
      - {name: "['total cpu usage'].idl", type: double}
      - {name: "['total cpu usage'].wai", type: double}
      - {name: "['total cpu usage'].hiq", type: double}
      - {name: "['total cpu usage'].siq", type: double}
      - {name: "['memory usage'].used", type: double}
      - {name: "['memory usage'].buff", type: double}
      - {name: "['memory usage'].cach", type: double}
      - {name: "['memory usage'].free", type: double}
      - {name: "['load avg'].1m", type: double}
      - {name: "['load avg'].5m", type: double}
      - {name: "['load avg'].15m", type: double}
      - {name: "['net/total'].recv", type: double}
      - {name: "['net/total'].send", type: double}
  - type: column
    columns:
      - {name: time}
      - {name: key}
      - {name: hostname}
      - {name: cpu_usr, src: "['total cpu usage'].usr"}
      - {name: cpu_sys, src: "['total cpu usage'].sys"}
      - {name: cpu_idl, src: "['total cpu usage'].idl"}
      - {name: cpu_wai, src: "['total cpu usage'].wai"}
      - {name: cpu_hiq, src: "['total cpu usage'].hiq"}
      - {name: cpu_siq, src: "['total cpu usage'].siq"}
      - {name: mem_used, src: "['memory usage'].used"}
      - {name: mem_buff, src: "['memory usage'].buff"}
      - {name: mem_cach, src: "['memory usage'].cach"}
      - {name: mem_free, src: "['memory usage'].free"}
      - {name: load_1m, src: "['load avg'].1m"}
      - {name: load_5m, src: "['load avg'].5m"}
      - {name: load_15m, src: "['load avg'].15m"}
      - {name: net_recv, src: "['net/total'].recv"}
      - {name: net_send, src: "['net/total'].send"}
out:
  type: stdout
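The name/src pairs in the column filter read as "output column name, taken from this source column". A small Python sketch of that idea, under the assumption that only the listed columns are kept and that src falls back to name when omitted (select_and_rename is a made-up helper, not part of any plugin):

```python
# A shortened version of the `columns` list from the config above.
COLUMNS = [
    {"name": "time"},
    {"name": "key"},
    {"name": "hostname"},
    {"name": "cpu_usr", "src": "['total cpu usage'].usr"},
    {"name": "load_1m", "src": "['load avg'].1m"},
]


def select_and_rename(row, columns=COLUMNS):
    """Keep only the listed columns, renaming where `src` is given."""
    # Assumption: a column without `src` is read from the same name.
    return {col["name"]: row[col.get("src", col["name"])] for col in columns}


row = {
    "time": "2016-01-31T15:24:42Z",
    "key": "examplekey",
    "hostname": "example.com",
    "['total cpu usage'].usr": 0.201,
    "['load avg'].1m": 0.09,
    "['net/total'].recv": 108.867,  # not listed in COLUMNS, so it is dropped
}
renamed = select_and_rename(row)
print(renamed)
```

After this step every column name is a plain identifier, which is much easier to map onto a MySQL table.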
Configuring out
The out side is MySQL (embulk-output-mysql), so all that remains is to create a table in MySQL and fill in the settings.
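I did not record the table definition here, so the following is only a guess at what a matching table could look like. The small Python sketch below builds a CREATE TABLE statement from the renamed column names. The column types (DATETIME, VARCHAR(255), DOUBLE) are my assumptions; the backticks matter because key is a reserved word in MySQL.

```python
# Metric columns produced by the rename step; all were declared as double.
DOUBLE_COLUMNS = [
    "cpu_usr", "cpu_sys", "cpu_idl", "cpu_wai", "cpu_hiq", "cpu_siq",
    "mem_used", "mem_buff", "mem_cach", "mem_free",
    "load_1m", "load_5m", "load_15m",
    "net_recv", "net_send",
]


def build_create_table(table="dstat"):
    """Return a hypothetical CREATE TABLE statement for the dstat rows."""
    # Assumed types: adjust to whatever your schema actually needs.
    columns = [
        "`time` DATETIME",
        "`key` VARCHAR(255)",  # `key` is reserved in MySQL, hence the backticks
        "`hostname` VARCHAR(255)",
    ]
    columns += [f"`{name}` DOUBLE" for name in DOUBLE_COLUMNS]
    return f"CREATE TABLE `{table}` (\n  " + ",\n  ".join(columns) + "\n);"


ddl = build_create_table()
print(ddl)
```

The printed statement can be pasted into a mysql session for the my_s3_logs database before running Embulk.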
Below is the final form of config.yml.
in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
    - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
      - {name: hostname, type: string}
      - {name: dstat, type: string}
filters:
  - type: expand_json
    json_column_name: dstat
    root: "$."
    expanded_columns:
      - {name: "['total cpu usage'].usr", type: double}
      - {name: "['total cpu usage'].sys", type: double}
      - {name: "['total cpu usage'].idl", type: double}
      - {name: "['total cpu usage'].wai", type: double}
      - {name: "['total cpu usage'].hiq", type: double}
      - {name: "['total cpu usage'].siq", type: double}
      - {name: "['memory usage'].used", type: double}
      - {name: "['memory usage'].buff", type: double}
      - {name: "['memory usage'].cach", type: double}
      - {name: "['memory usage'].free", type: double}
      - {name: "['load avg'].1m", type: double}
      - {name: "['load avg'].5m", type: double}
      - {name: "['load avg'].15m", type: double}
      - {name: "['net/total'].recv", type: double}
      - {name: "['net/total'].send", type: double}
  - type: column
    columns:
      - {name: time}
      - {name: key}
      - {name: hostname}
      - {name: cpu_usr, src: "['total cpu usage'].usr"}
      - {name: cpu_sys, src: "['total cpu usage'].sys"}
      - {name: cpu_idl, src: "['total cpu usage'].idl"}
      - {name: cpu_wai, src: "['total cpu usage'].wai"}
      - {name: cpu_hiq, src: "['total cpu usage'].hiq"}
      - {name: cpu_siq, src: "['total cpu usage'].siq"}
      - {name: mem_used, src: "['memory usage'].used"}
      - {name: mem_buff, src: "['memory usage'].buff"}
      - {name: mem_cach, src: "['memory usage'].cach"}
      - {name: mem_free, src: "['memory usage'].free"}
      - {name: load_1m, src: "['load avg'].1m"}
      - {name: load_5m, src: "['load avg'].5m"}
      - {name: load_15m, src: "['load avg'].15m"}
      - {name: net_recv, src: "['net/total'].recv"}
      - {name: net_send, src: "['net/total'].send"}
out:
  type: mysql
  host: localhost
  user: username
  password: userpassword
  database: my_s3_logs
  table: dstat
  mode: insert
Running it
$ embulk run config.yml
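Starting from fetching the files out of S3, roughly 800,000 records were loaded into MySQL in a few tens of minutes. That is fast (and I'm glad I didn't have to write it myself...).
Embulk is handy!!
It looks like most things can be done with the plugins that already exist.
I plan to use it actively from now on.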