2.9.3 Example Verification
Offline load import
Command-line procedure:
1. Prepare the following file, test_001.log:
1,李世民,55
2,李元霸,20
...
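To generate the sample file in one step, a sketch such as the following works (the rows shown above are assumed to be the full content):
printf '1,李世民,55\n2,李元霸,20\n' > test_001.log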
2. Upload it to HDFS:
hadoop fs -put ./test_001.log /data/test/
3. Create the table: go to the lsql bin directory, run sh beeline.sh, and once inside execute the CREATE TABLE statement:
create table person(id y_string_id,name y_string_id,age y_int_id);
4. Import with load
Go to the lsql bin directory and run the load.sh command to append the data into the data_operate partition.
HDFS import command:
./load.sh -t person -p data_operate -tp txt -f /data/test/test_001.log -sp , -fl id,name,age
Local import command (the -local flag must be added):
./load.sh -t common_example001 -local -p data_operate -tp txt -f /root/test.txt -sp , -fl id,name,age
Parameter description:
-t      table name
-p      partition name
-tp     file type; txt and json are currently supported
-f      storage path of the file on HDFS
-sp     field separator for txt files; regular expressions and invisible characters are supported, and the separator may be URL-encoded
-fl     names of the lsql columns that the txt fields map to
-ov     (optional) whether to clear the current partition's data before importing
-cond   (optional) delete old data matching a condition before appending
-local  used for local imports
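As an illustration of the -sp URL-encoding rule above, a tab-separated file could be loaded by encoding the tab character as %09. This is only a sketch; the file name test_tab.log is hypothetical:
./load.sh -t person -p data_operate -tp txt -f /data/test/test_tab.log -sp %09 -fl id,name,age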
Script-based test
Offline load test script
#!/bin/bash
lsqlhome=/opt/software/lsql
# stage one JSON record; its partition must match the -p flag and the query below
touch /root/test.json
echo '{"tablename":"test","partition":"20191121","data":{"id":"4","name":"李元霸","age":14}}' > /root/test.json
# create the target table, load the file locally, then read it back
sh $lsqlhome/bin/sql.sh 'create table test(id string,name string,age int)'
sh $lsqlhome/bin/load.sh -t test -local -f /root/test.json -p 20191121 -tp json -fl id,name,age
sh $lsqlhome/bin/sql.sh 'select * from test where partition like "20191121" limit 20'
if [ $? -eq 0 ]; then
    echo "load succeed"
    rm -rf /root/test.json
    sh $lsqlhome/bin/sql.sh 'drop table test'
else
    echo "load failed"
fi
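The same round trip can also be exercised with a txt file on HDFS, mirroring steps 1-4 above. This is a minimal sketch; the table name person_txt and the /tmp staging path are illustrative, not part of the original procedure:
#!/bin/bash
lsqlhome=/opt/software/lsql
# stage a small comma-separated sample and push it to HDFS (illustrative data)
printf '1,李世民,55\n2,李元霸,20\n' > /tmp/test_001.log
hadoop fs -put -f /tmp/test_001.log /data/test/
sh $lsqlhome/bin/sql.sh 'create table person_txt(id string,name string,age int)'
sh $lsqlhome/bin/load.sh -t person_txt -p data_operate -tp txt -f /data/test/test_001.log -sp , -fl id,name,age
sh $lsqlhome/bin/sql.sh 'select * from person_txt limit 20'
sh $lsqlhome/bin/sql.sh 'drop table person_txt'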
Kafka real-time data import
A simple excerpt from the Java producer program:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties ps = new Properties();
ps.put("bootstrap.servers", "10.10.12.28:9092,10.10.12.29:9092,10.10.12.30:9092");
ps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
ps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String topic = "lsql_test";
KafkaProducer<String, String> producer = new KafkaProducer<>(ps);
for (long i = 0L; i < 50000000L; i++) {
    String id = "1";
    String name = "name";
    int age = 19;
    String partition = "20191121";
    // build one JSON record; tablename and partition tell lsql where to index it
    StringBuilder sb = new StringBuilder();
    sb.append("{\"tablename\":\"person_new\",\"partition\":\"" + partition + "\",");
    sb.append("\"id\":\"" + id + "\",");
    sb.append("\"name\":\"" + name + "\",");
    sb.append("\"age\":" + age + "}");
    ProducerRecord<String, String> re = new ProducerRecord<String, String>(topic, sb.toString());
    producer.send(re);
}
producer.close();
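Before running the full Java program, a single record can be pushed from the shell to confirm the topic is reachable. This is a convenience check, assuming the same broker address and topic as the code above:
echo '{"tablename":"person_new","partition":"20191121","id":"1","name":"name","age":19}' | /opt/software/kafka/bin/kafka-console-producer.sh --broker-list 10.10.12.28:9092 --topic lsql_test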
Kafka real-time import test script
#!/bin/bash
### Kafka real-time import section
# LSQL Kafka configuration
#set -x
export KAFKA_HOME=/opt/software/kafka
export PATH=$KAFKA_HOME/bin:$PATH
k_ip=10.10.12.28
lsql=/opt/software/lsql
# back up the site configuration before appending the Kafka reader settings
mv $lsql/config/site/lsql-site.properties $lsql/config/site/lsql-site.properties.bak
echo '{"tablename":"sgl_kafka_tb","partition":"default","col1":"录信数软","col2":"UpUpUp","col3":"十年饮冰","col4":"难凉热血"}' > lsql_kafka.json
# a heredoc (not a single-quoted echo) is used so that $k_ip expands
cat >> $lsql/config/site/lsql-site.properties <<EOF
# Kafka data import example
cl.stream.reader.list=kafka1
cl.stream.consumer.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLKafkaConsumer
cl.stream.parser.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLJsonParser
kafka.topic.kafka1=rovisuk
bootstrap.servers.kafka1=$k_ip:9092
kafka.group.kafka1=lsql_ts
EOF
# create the topic and push the test record onto it
sh $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $k_ip:2181 --create --topic rovisuk --partitions 1 --replication-factor 2
sh $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $k_ip:9092 --topic rovisuk < lsql_kafka.json
echo 'create table sgl_kafka_tb(col1 string,col2 string,col3 string,col4 string);
select count(*) from sgl_kafka_tb;
select * from sgl_kafka_tb limit 10;' > lsql_kafka.sql
sh $lsql/bin/beeline.sh -f lsql_kafka.sql > lsql_kafka.log
rm -rf lsql_kafka.json
rm -rf lsql_kafka.sql
# restore the original site configuration
mv $lsql/config/site/lsql-site.properties.bak $lsql/config/site/lsql-site.properties
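To confirm independently of lsql that the record actually reached the topic, the standard console consumer can be used, assuming the same $KAFKA_HOME and $k_ip values as in the script:
sh $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $k_ip:9092 --topic rovisuk --from-beginning --max-messages 1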