4.5.4 kafka数据导入
在lsql-site.properties中添加如下的配置,并更改相关连接参数,params参数需要从生产上拷贝一份过来,给出例子如下:
#配置及各个kafka消费,多个以逗号分隔,kafka1是一个后缀与下面的xxxx.kakfka1对应
cl.stream.reader.list=kafka1
#一组kafka配置包括一下配置
#默认 无需改动
cl.stream.consumer.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLKafkaConsumer
#parser 如果是json数据可以保持默认如下,如果是自定义的解析,配置为该类的全类名
cl.stream.parser.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLJsonParser
#该组消费的topic
kafka.topic.kafka1=lsql_data
#该组消费的kafka机器连接多台以逗号分隔
bootstrap.servers.kafka1=kafka1.lucene.cn:9092
#该组消费的group名
kafka.group.kafka1=lsql_data_group
#一些提高消费的kafka参数配置 ,可以是其他的kafka参数配置
kafka.conf.params.kafka1=message.max.bytes:20000000;fetch.max.bytes:20485760;max.partition.fetch.bytes:20485760;fetch.message.max.bytes:20000000;replica.fetch.max.bytes:21000000
默认的CLJsonParser支持的数据格式如下
{"tablename":"example","partition":"20151005","a1":4,"a2":"l_4","a3":14,"a4":4}
默认的Kafka导入数据只支持Json格式,如果需要支持其他格式,需要自己通过java写Parser。
注:Kafka配置多个topic进行消费时,虽未在程序上进行限制,但不建议配置太多(建议10个之内)。