fluent-plugin-s3 + fluent-plugin-dstatのフォーマットで保存されていたS3上のログをEmbulkを使ってMySQLに投入してみた

というわけでEmbulkを使ってみたのでメモです。

対象のログ

  • Fluentdで収集
  • dstatの情報を fluent-plugin-dstat で取得
  • fluent-plugin-s3 を利用してS3に保存
  • gzip圧縮

つまりこんなログです。

2016-01-31T15:24:42Z  examplekey  {"hostname":"example.com","dstat":{"total cpu usage":{"usr":"0.201","sys":"0.134","idl":"99.532","wai":"0.134","hiq":"0.0","siq":"0.0"},"mem\
ory usage":{"used":"1212170240.0","buff":"313139200.0","cach":"2034069504.0","free":"246816768.0"},"load avg":{"1m":"0.090","5m":"0.080","15m":"0.090"},"net/total":{"recv":"108.\
867","send":"212.200"}}}

Embulkのインストール

まずはEmbulkをインストールします。

$ brew cask install java
$ brew install embulk

次に必要そうなPluginをインストールします。

以下は、とりあえず最終的に必要になったもの。

$ embulk gem install embulk-input-s3
$ embulk gem install embulk-output-mysql
$ embulk gem install embulk-parser-fluent-s3-log
$ embulk gem install embulk-filter-expand_json
$ embulk gem install embulk-filter-column

inとoutを適当にかいて embulk guess

まずは in がS3なのはわかります。

github.com

tmp.yml とかに

in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
out:
  type: stdout

くらいを書いて

$ embulk guess tmp.yml -o config.yml

と実行するとあら不思議、「いい感じに」設定用ファイルのフォーマットを config.yml として生成してくれます。

Embulkの半分は優しさでできているのか?

生成された config.yml をPluginのドキュメントを見ながら修正して、 embulk preview config.yml でプレビュー。うまくいかないときは embulk guess で微調整していきます。

fluent-plugin-s3のフォーマットのファイルをいい感じにパースしてくれる embulk-parser-fluent-s3-log

fluent-plugin-s3で保存したログのフォーマットはタイムスタンプ、キー、JSONと並んでいたので、さてどうしたものかと思ったら「まさに」なプラグインがありました。

github.com

in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
  - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
      - {name: hostname, type: string}
      - {name: dstat, type: string}
out:
  type: stdout

ネストされたJSONを展開してくれるembulk-filter-expand_json

カラム dstat の部分はネストしたJSONのままなので展開する必要があります。

こちらも「まさに」なプラグインがありました。

github.com

今回は dstat カラム部分を展開するので以下のように記述しました。

in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
  - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
      - {name: hostname, type: string}
      - {name: dstat, type: string}
filters:
  - type: expand_json
    json_column_name: dstat
    root: "$."
    expanded_columns:
      - {name: "['total cpu usage'].usr", type: double}
      - {name: "['total cpu usage'].sys", type: double}
      - {name: "['total cpu usage'].idl", type: double}
      - {name: "['total cpu usage'].wai", type: double}
      - {name: "['total cpu usage'].hiq", type: double}
      - {name: "['total cpu usage'].siq", type: double}
      - {name: "['memory usage'].used", type: double}
      - {name: "['memory usage'].buff", type: double}
      - {name: "['memory usage'].cach", type: double}
      - {name: "['memory usage'].free", type: double}
      - {name: "['load avg'].1m", type: double}
      - {name: "['load avg'].5m", type: double}
      - {name: "['load avg'].15m", type: double}
      - {name: "['net/total'].recv", type: double}
      - {name: "['net/total'].send", type: double}
out:
  type: stdout

MySQLカラム名に合わせるために in 側のカラム名のリネーム

カラム名"['total cpu usage'].usr" のままだとMySQLカラム名にするのには微妙なので、カラム名をリネームします。

こちらも「まさに」なプラグインがありました。

github.com

in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
  - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
      - {name: hostname, type: string}
      - {name: dstat, type: string}
filters:
  - type: expand_json
    json_column_name: dstat
    root: "$."
    expanded_columns:
      - {name: "['total cpu usage'].usr", type: double}
      - {name: "['total cpu usage'].sys", type: double}
      - {name: "['total cpu usage'].idl", type: double}
      - {name: "['total cpu usage'].wai", type: double}
      - {name: "['total cpu usage'].hiq", type: double}
      - {name: "['total cpu usage'].siq", type: double}
      - {name: "['memory usage'].used", type: double}
      - {name: "['memory usage'].buff", type: double}
      - {name: "['memory usage'].cach", type: double}
      - {name: "['memory usage'].free", type: double}
      - {name: "['load avg'].1m", type: double}
      - {name: "['load avg'].5m", type: double}
      - {name: "['load avg'].15m", type: double}
      - {name: "['net/total'].recv", type: double}
      - {name: "['net/total'].send", type: double}
  - type: column
    columns:
      - {name: time}
      - {name: key}
      - {name: hostname}
      - {name: cpu_usr, src: "['total cpu usage'].usr"}
      - {name: cpu_sys, src: "['total cpu usage'].sys"}
      - {name: cpu_idl, src: "['total cpu usage'].idl"}
      - {name: cpu_wai, src: "['total cpu usage'].wai"}
      - {name: cpu_hiq, src: "['total cpu usage'].hiq"}
      - {name: cpu_siq, src: "['total cpu usage'].siq"}
      - {name: mem_used, src: "['memory usage'].used"}
      - {name: mem_buff, src: "['memory usage'].buff"}
      - {name: mem_cach, src: "['memory usage'].cach"}
      - {name: mem_free, src: "['memory usage'].free"}
      - {name: load_1m, src: "['load avg'].1m"}
      - {name: load_5m, src: "['load avg'].5m"}
      - {name: load_15m, src: "['load avg'].15m"}
      - {name: net_recv, src: "['net/total'].recv"}
      - {name: net_send, src: "['net/total'].send"}
out:
  type: stdout

out の設定

outMySQLなので、MySQLでテーブルを作成して、設定したら終わりです。

github.com

以下が最終形態の config.yml です。

in:
  type: s3
  bucket: my-logs
  path_prefix: web/
  access_key_id: XXXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXXXX
  decoders:
  - {type: gzip}
  parser:
    type: fluent-s3-log
    columns:
      - {name: hostname, type: string}
      - {name: dstat, type: string}
filters:
  - type: expand_json
    json_column_name: dstat
    root: "$."
    expanded_columns:
      - {name: "['total cpu usage'].usr", type: double}
      - {name: "['total cpu usage'].sys", type: double}
      - {name: "['total cpu usage'].idl", type: double}
      - {name: "['total cpu usage'].wai", type: double}
      - {name: "['total cpu usage'].hiq", type: double}
      - {name: "['total cpu usage'].siq", type: double}
      - {name: "['memory usage'].used", type: double}
      - {name: "['memory usage'].buff", type: double}
      - {name: "['memory usage'].cach", type: double}
      - {name: "['memory usage'].free", type: double}
      - {name: "['load avg'].1m", type: double}
      - {name: "['load avg'].5m", type: double}
      - {name: "['load avg'].15m", type: double}
      - {name: "['net/total'].recv", type: double}
      - {name: "['net/total'].send", type: double}
  - type: column
    columns:
      - {name: time}
      - {name: key}
      - {name: hostname}
      - {name: cpu_usr, src: "['total cpu usage'].usr"}
      - {name: cpu_sys, src: "['total cpu usage'].sys"}
      - {name: cpu_idl, src: "['total cpu usage'].idl"}
      - {name: cpu_wai, src: "['total cpu usage'].wai"}
      - {name: cpu_hiq, src: "['total cpu usage'].hiq"}
      - {name: cpu_siq, src: "['total cpu usage'].siq"}
      - {name: mem_used, src: "['memory usage'].used"}
      - {name: mem_buff, src: "['memory usage'].buff"}
      - {name: mem_cach, src: "['memory usage'].cach"}
      - {name: mem_free, src: "['memory usage'].free"}
      - {name: load_1m, src: "['load avg'].1m"}
      - {name: load_5m, src: "['load avg'].5m"}
      - {name: load_15m, src: "['load avg'].15m"}
      - {name: net_recv, src: "['net/total'].recv"}
      - {name: net_send, src: "['net/total'].send"}
out:
  type: mysql
  host: localhost
  user: username
  password: userpassword
  database: my_s3_logs
  table: dstat
  mode: insert

実行

$ embulk run config.yml

約80万レコードをS3からの取得から開始して数十分でMySQLに投入できました。これは早い(自分で書かなくてよかった。。。)。

Embulk便利!!

プラグインも今あるもので大抵のことは実現出来そうです。

これからは積極的使っていこうと思います。