4.5.4 kafka数据导入

在lsql-site.properties中添加如下的配置,并更改相关连接参数,params参数需要从生产上拷贝一份过来,给出例子如下:

#配置及各个kafka消费,多个以逗号分隔,kafka1是一个后缀与下面的xxxx.kakfka1对应
cl.stream.reader.list=kafka1
#一组kafka配置包括一下配置
#默认 无需改动
cl.stream.consumer.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLKafkaConsumer
#parser 如果是json数据可以保持默认如下,如果是自定义的解析,配置为该类的全类名
cl.stream.parser.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLJsonParser
#该组消费的topic
kafka.topic.kafka1=lsql_data
#该组消费的kafka机器连接多台以逗号分隔
bootstrap.servers.kafka1=kafka1.lucene.cn:9092
#该组消费的group名
kafka.group.kafka1=lsql_data_group
#一些提高消费的kafka参数配置 ,可以是其他的kafka参数配置
kafka.conf.params.kafka1=message.max.bytes:20000000;fetch.max.bytes:20485760;max.partition.fetch.bytes:20485760;fetch.message.max.bytes:20000000;replica.fetch.max.bytes:21000000

默认的CLJsonParser支持的数据格式如下

{"tablename":"example","partition":"20151005","a1":4,"a2":"l_4","a3":14,"a4":4}

默认的Kafka导入数据只支持Json格式,如果需要支持其他格式,需要自己通过java写Parser。

注:Kafka配置多个topic进行消费时,虽未在程序上进行限制,但不建议配置太多(建议10个之内)。

Copyright © lucene.xin 2020 all right reserved修改时间: 2021-07-02 11:42:23

results matching ""

    No results matching ""