7.11. 多线程Streaminsert

7.11.1. 概述

​ Streaminsert是为了简化kafka的使用,我们配好kafka后,在lsql-site.properties里面配置CLJsonParser后便可以使用该功能,通过beeline或者jdbc接口交互,插入语句格式如下:

Streaminsert {“tablename”:”tb1”,”partition”:”p1”,”col”:1,”col2”:”xxxx”}

其中语句必须以streaminsert开头,后边是json数据。

7.11.2. 使用方法

1. lsql-site.properties文件配置

\#在安装好kafka后lsql内必须又对应的配置
cl.stream.reader.list=kafka1
cl.stream.consumer.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLKafkaConsumer
cl.stream.parser.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLJsonParser
kafka.topic.kafka1=xxx
bootstrap.servers.kafka1=xxx
kafka.group.kafka1=xxx
kafka.conf.params.kafka1=

2. 单条条插入数据格式

表结构:

create table test(id y_int_id,name y_string_ids,wg y_double_id);

通过beeline 或jdbc插入

#在json中tablename,partition,list固定,其中list是一个json数组,数据组每一列是该表的字段内容的json.

streaminsert {"tablename":"test","partition":"p1","list":[{"id":1,"name":"lucene1","wg":4.56},{"id":2,"name":"lucene2","wg":4.88},{"id":3,"name":"lucene3","wg":4.99}]};

img

3. 单条插入

表结构:

create table test(id y_int_id,name y_string_ids,wg y_double_id);

通过beeline 或jdbc插入

单条插入 除了tablename,partition(不写默认default分区)外,其他的是该表的列值

streaminsert {"tablename":"test","partition":"p2", "id":1,"name":"lucene1","wg":4.55};
streaminsert {"tablename":"test","partition":"p2", "id":2,"name":"lucene1","wg":4.66};
streaminsert {"tablename":"test","partition":"p2", "id":3,"name":"lucene1","wg":4.77};

img

Copyright © lucene.xin 2020 all right reserved修改时间: 2021-07-02 11:42:23

results matching ""

    No results matching ""