一、环境
1、操作系统:CentOS 7
2、物理环境:8C16G
二、安装
软件版本:logstash 7.17.13
官网下载:https://www.elastic.co/cn/downloads/logstash
/data/soft/logstash/bin/logstash-plugin list
Using bundled JDK: /data/soft/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
logstash-codec-avro
logstash-codec-cef
logstash-codec-collectd
logstash-codec-dots
logstash-codec-edn
logstash-codec-edn_lines
logstash-codec-es_bulk
logstash-codec-fluent
logstash-codec-graphite
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-msgpack
logstash-codec-multiline
logstash-codec-netflow
logstash-codec-plain
logstash-codec-rubydebug
logstash-filter-aggregate
logstash-filter-anonymize
logstash-filter-cidr
logstash-filter-clone
logstash-filter-csv
logstash-filter-date
logstash-filter-de_dot
logstash-filter-dissect
logstash-filter-dns
logstash-filter-drop
logstash-filter-elasticsearch
logstash-filter-fingerprint
logstash-filter-geoip
logstash-filter-grok
logstash-filter-http
logstash-filter-json
logstash-filter-kv
logstash-filter-memcached
logstash-filter-metrics
logstash-filter-mutate
logstash-filter-prune
logstash-filter-ruby
logstash-filter-sleep
logstash-filter-split
logstash-filter-syslog_pri
logstash-filter-throttle
logstash-filter-translate
logstash-filter-truncate
logstash-filter-urldecode
logstash-filter-useragent
logstash-filter-uuid
logstash-filter-xml
logstash-input-azure_event_hubs
logstash-input-beats
└── logstash-input-elastic_agent (alias)
logstash-input-couchdb_changes
logstash-input-dead_letter_queue
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jms
logstash-input-pipe
logstash-input-redis
logstash-input-s3
logstash-input-snmp
logstash-input-snmptrap
logstash-input-sqs
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix
logstash-integration-elastic_enterprise_search
├── logstash-output-elastic_app_search
└── logstash-output-elastic_workplace_search
logstash-integration-jdbc
├── logstash-input-jdbc
├── logstash-filter-jdbc_streaming
└── logstash-filter-jdbc_static
logstash-integration-kafka
├── logstash-input-kafka
└── logstash-output-kafka
logstash-integration-rabbitmq
├── logstash-input-rabbitmq
└── logstash-output-rabbitmq
logstash-output-clickhouse
logstash-output-cloudwatch
logstash-output-csv
logstash-output-elasticsearch
logstash-output-email
logstash-output-file
logstash-output-graphite
logstash-output-http
logstash-output-lumberjack
logstash-output-nagios
logstash-output-null
logstash-output-pipe
logstash-output-redis
logstash-output-s3
logstash-output-sns
logstash-output-sqs
logstash-output-stdout
logstash-output-tcp
logstash-output-udp
logstash-output-webhdfs
logstash-patterns-core
单加插件:logstash-output-clickhouse
下载地址:https://github.com/mhsxq/logstash-output-clickhouse
安装方式:
yum install -y gem
gem build logstash-output-clickhouse.gemspec
/data/soft/logstash/bin/logstash-plugin install logstash-output-clickhouse-X.Y.Z.gem
mysql JDBC下载地址:https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-java-5.1.49.tar.gz
三、配置
cat logstash-table_test.conf
input {
stdin {}
jdbc {
type => "jdbc"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://ip:port/dbname?useSSL=false"
# 数据库连接账号密码
jdbc_user => "user"
jdbc_password => "password"
# MySQL依赖包路径;
jdbc_driver_library => "/data/soft/logstash/logstash-core/lib/jars/mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库重连尝试次数
connection_retry_attempts => "5"
# 判断数据库连接是否可用,默认false不开启
jdbc_validate_connection => "true"
# 数据库连接可用校验超时时间,默认3600S
jdbc_validation_timeout => "60"
# 开启分页查询(默认false不开启);
jdbc_paging_enabled => "true"
# 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值);
jdbc_page_size => "10000"
# statement为查询数据sql,如果sql较复杂,建议通过statement_filepath配置sql文件的存放路径;
# sql_last_value为内置的变量,存放上次查询结果中最后一条数据tracking_column的值,此处即为id;
statement => "select id,create_time,update_time,ip from thumbs_up where id > :sql_last_value"
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
# lowercase_column_names => false
# Value can be any of: fatal,error,warn,info,debug,默认info;
# sql_log_level => warn
# 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到 last_run_metadata_path 指定的文件中;
record_last_run => true
# 需要以查询结果中某字段(tracking_column)的值作为sql_last_value时,此字段设为true;为false(默认)时sql_last_value为上次执行查询的时间;
use_column_value => true
# 需要记录的字段,用于增量同步,需是数据库字段
tracking_column => "id"
# Value can be any of: numeric,timestamp,Default value is "numeric"
tracking_column_type => numeric
# record_last_run上次数据存放位置;
last_run_metadata_path => "/data/soft/logstash/sync/table_test_last_id.txt"
# 是否清除 last_run_metadata_path 的记录,需要增量同步时此字段必须为false;
clean_run => false
# 同步频率,cron表达式(分 时 日 月 周),此处为每5分钟同步一次;不配置schedule时,statement只会执行一次;
schedule => "*/5 * * * *"
}
}
output {
clickhouse {
http_hosts => ["http://127.0.0.1:8123/"]
table => "db_test.table_test"
request_tolerance => 5
headers => ["Authorization" , "Basic ZGVmYXVsdDoxMjM0NTY="]
mutations => {
id => id
create_time => create_time
update_time => update_time
ip => ip
}
}
}
#output的 headers => ["Authorization" , "Basic ZGVmYXVsdDoxMjM0NTY="] 中,认证信息为base64编码(注意:base64只是编码,并非加密,不能起到保密作用)
["Authorization" , "Basic ZGVmYXVsdDoxMjM0NTY="]中只有`ZGVmYXVsdDoxMjM0NTY=`是base64编码后的`用户名:密码`,其余均为固定写法:
`default:123456`经base64编码后,就是`ZGVmYXVsdDoxMjM0NTY=`
四、启动程序
nohup /data/soft/logstash/bin/logstash -f /data/soft/logstash/config/logstash-table_test.conf --path.data=/data/soft/logstash/path_data/table_test > /data/soft/logstash/logs/script_logs/table_test.log 0</dev/null &