Logstash 同步MySQL数据到ClickHouse

一、环境
1、操作系统:CentOS 7
2、物理环境:8C16G
二、安装
系统版本:logstash 7.17.13
官网下载:https://www.elastic.co/cn/downloads/logstash

/data/soft/logstash/bin/logstash-plugin list
Using bundled JDK: /data/soft/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
logstash-codec-avro
logstash-codec-cef
logstash-codec-collectd
logstash-codec-dots
logstash-codec-edn
logstash-codec-edn_lines
logstash-codec-es_bulk
logstash-codec-fluent
logstash-codec-graphite
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-msgpack
logstash-codec-multiline
logstash-codec-netflow
logstash-codec-plain
logstash-codec-rubydebug
logstash-filter-aggregate
logstash-filter-anonymize
logstash-filter-cidr
logstash-filter-clone
logstash-filter-csv
logstash-filter-date
logstash-filter-de_dot
logstash-filter-dissect
logstash-filter-dns
logstash-filter-drop
logstash-filter-elasticsearch
logstash-filter-fingerprint
logstash-filter-geoip
logstash-filter-grok
logstash-filter-http
logstash-filter-json
logstash-filter-kv
logstash-filter-memcached
logstash-filter-metrics
logstash-filter-mutate
logstash-filter-prune
logstash-filter-ruby
logstash-filter-sleep
logstash-filter-split
logstash-filter-syslog_pri
logstash-filter-throttle
logstash-filter-translate
logstash-filter-truncate
logstash-filter-urldecode
logstash-filter-useragent
logstash-filter-uuid
logstash-filter-xml
logstash-input-azure_event_hubs
logstash-input-beats
└── logstash-input-elastic_agent (alias)
logstash-input-couchdb_changes
logstash-input-dead_letter_queue
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jms
logstash-input-pipe
logstash-input-redis
logstash-input-s3
logstash-input-snmp
logstash-input-snmptrap
logstash-input-sqs
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix
logstash-integration-elastic_enterprise_search
 ├── logstash-output-elastic_app_search
 └──  logstash-output-elastic_workplace_search
logstash-integration-jdbc
 ├── logstash-input-jdbc
 ├── logstash-filter-jdbc_streaming
 └── logstash-filter-jdbc_static
logstash-integration-kafka
 ├── logstash-input-kafka
 └── logstash-output-kafka
logstash-integration-rabbitmq
 ├── logstash-input-rabbitmq
 └── logstash-output-rabbitmq
logstash-output-clickhouse
logstash-output-cloudwatch
logstash-output-csv
logstash-output-elasticsearch
logstash-output-email
logstash-output-file
logstash-output-graphite
logstash-output-http
logstash-output-lumberjack
logstash-output-nagios
logstash-output-null
logstash-output-pipe
logstash-output-redis
logstash-output-s3
logstash-output-sns
logstash-output-sqs
logstash-output-stdout
logstash-output-tcp
logstash-output-udp
logstash-output-webhdfs
logstash-patterns-core

单加插件:logstash-output-clickhouse
下载地址:https://github.com/mhsxq/logstash-output-clickhouse
安装方式:

# RubyGems on CentOS 7 is packaged as "rubygems" (there is no yum package named "gem")
yum install -y rubygems
# Build the plugin gem from its gemspec (run inside the cloned plugin repository)
gem build logstash-output-clickhouse.gemspec
# Logstash 5+ ships the plugin manager as bin/logstash-plugin (bin/plugin is the pre-5.x name)
/data/soft/logstash/bin/logstash-plugin install logstash-output-clickhouse-X.Y.Z.gem

mysql JDBC下载地址:https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-5.1.49.tar.gz

三、配置

cat logstash-table_test.conf
input {
  stdin {}
  jdbc {
    type => "jdbc"
     # JDBC connection URL (replace ip/port/dbname with real values)
    jdbc_connection_string => "jdbc:mysql://ip:port/dbname?useSSL=false"
     # Database credentials
    jdbc_user => "user"
    jdbc_password => "password"
     # Path to the MySQL JDBC driver jar
    jdbc_driver_library => "/data/soft/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
     # Number of reconnect attempts on connection failure
    connection_retry_attempts => "5"
     # Validate the connection before use (default: false)
    jdbc_validate_connection => "true"
     # Connection validation timeout in seconds (default: 3600)
    jdbc_validation_timeout => "60"
     # Enable paged queries (default: false)
    jdbc_paging_enabled => "true"
     # Rows per page (default: 100000; lower it when rows are wide or updates are frequent)
    jdbc_page_size => "10000"
     # statement is the sync query; for complex SQL, prefer statement_filepath pointing at a .sql file.
     # :sql_last_value is a built-in placeholder holding the tracking_column value of the last row
     # from the previous run (here: id), which makes the query incremental.
    statement => "select id,create_time,update_time,ip from thumbs_up where id > :sql_last_value"
     # Lowercase column names (default: true; set false if exact case matters for (de)serialization)
     # lowercase_column_names => false
     # Value can be any of: fatal,error,warn,info,debug; default info
     # sql_log_level => warn
     # Persist the last run's tracking_column value to last_run_metadata_path
    record_last_run => true
     # Track a real column's value; when false, the tracking value is the run timestamp
    use_column_value => true
     # Column used for incremental sync; must be a column returned by the query
    tracking_column => "id"
     # Value can be any of: numeric,timestamp; default is "numeric"
    tracking_column_type => numeric
     # File where record_last_run stores the last tracked value
    last_run_metadata_path => "/data/soft/logstash/sync/table_test_last_id.txt"
     # Must stay false for incremental sync; true wipes last_run_metadata_path on start
    clean_run => false
     # Cron-style schedule: minute hour day-of-month month day-of-week.
     # Default is every minute; "*/5 * * * *" runs every 5 minutes.
    schedule => "*/5 * * * *"
  }
} 

output {
    clickhouse {
        http_hosts => ["http://127.0.0.1:8123/"]
        table => "db_test.table_test"
        request_tolerance => 5
        # Basic auth header; the token is base64 of "user:password" (see note below the config)
        headers => ["Authorization" , "Basic ZGVmYXVsdDoxMjM0NTY="]
        # Column mapping: destination ClickHouse column => source event field
        mutations => {
id => id
create_time => create_time
update_time => update_time
ip => ip
        }
    }
}
#output的 headers => ["Authorization" , "Basic ZGVmYXVsdDoxMjM0NTY="] 中,使用的是base64编码(注意:base64只是编码,并非加密)
["Authorization" , "Basic ZGVmYXVsdDoxMjM0NTY="]中只有`ZGVmYXVsdDoxMjM0NTY=`是base64编码的内容,其余均为默认固定值:
`default:123456`经base64编码后,就是`ZGVmYXVsdDoxMjM0NTY=`

四、启动程序

nohup /data/soft/logstash/bin/logstash -f /data/soft/logstash/configlogstash-table_test.conf --path.data=/data/soft/logstash/path_data/table_test > /data/soft/logstash/logs/script_logs/table_test.log 0</dev/null &