Skip to content

为 logstash 安装 clickhouse 插件

简单介绍

logstash 的 clickhosue 插件是用 ruby 写的,https://github.com/funcmike/logstash-output-clickhouse 这是一个归档项目,没有再维护了。主要实现的就是将数据通过 http JSONEachRow的方式提交给 clickhouse,只实现了 output 阶段。

安装插件,手动编译

  • 确保服务器安装了 rubygem
  • clone 项目,git clone https://github.com/funcmike/logstash-output-clickhouse.git
  • cd logstash-output-clickhouse
  • gem build logstash-output-clickhouse.gemspec
  • logstash-plugin install logstash-output-clickhouse-0.1.0.gem
  • 检查插件是否安装成功:bin/logstash-plugin list | grep clickhouse

注意如果使用了国内源的,可能会出现超时,SSL 的问题

增加::ssl_verify_mode: 0

root@debian:~# cat ~/.gemrc
---
:backtrace: false
:bulk_threshold: 1000
:sources:
- https://gems.ruby-china.com/
:update_sources: true
:verbose: true
:ssl_verify_mode: 0

离线包安装

  • 找一个已经安装好的,打一个离线包安装,注意版本问题

demo

input {
    kafka {
        bootstrap_servers => "internal.kafka.1.xxxx.me:9092"
        group_id => "sync_kd_6"
        topics => ["test.detailKd"]
        consumer_threads => 5
        codec => "json"
        auto_offset_reset => 'earliest'
        decorate_events => true
    }
}

filter {
    mutate {
      add_field => { "@data" => "%{data}" }
    }
    json {
      source => "@data"
      remove_field => ["@data", "data", "@timestamp", "@version", "event"]
    }
    ruby {
        code => "event.set('time', event.get('time').to_i * 1000)"
    }
}

output {
    clickhouse {
        http_hosts => ["http://10.0.0.89:8123"]
        table => "detail.kd"
        request_tolerance => 5
        flush_size => 3000
        pool_max => 1000
        mutations => {
            "id" => "id"
            "user" => "user"
            "total" => "total"
            "free" => "free"
            "relId" => "relId"
            "type" => "type"
            "time" => "time"
        }
    }

}