2.9.3 Example Verification

  • Offline load import

    1. Command-line operations:

      • 1. Prepare a test_001.log file with the following contents:

        1,李世民,55
        2,李元霸,20
        ...
        
      • 2. Upload it to HDFS:

        hadoop fs -put ./test_001.log /data/test/
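
        An optional sanity check: list the target directory and peek at the file to confirm the upload landed where load.sh expects it.

        hadoop fs -ls /data/test/
        hadoop fs -cat /data/test/test_001.log | head -n 3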
        
      • 3. Create the table: go to the lsql bin directory, run sh beeline.sh, and once inside execute the CREATE TABLE statement:

        create table person(id y_string_id,name y_string_id,age y_int_id);
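
        To confirm the table exists, a quick check from the same beeline session (assuming LSQL's beeline accepts the usual SHOW TABLES / DESC statements):

        show tables;
        desc person;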
        
      • 4. Import with load

        Go to the lsql bin directory and run the load.sh command to append the data to the data_operate partition.

        HDFS import command:
        ./load.sh -t person -p data_operate -tp txt -f /data/test/test_001.log -sp , -fl id,name,age
        
        Local import command (add the -local parameter):
        ./load.sh -t common_example001 -local -p data_operate -tp txt -f /root/test.txt -sp , -fl id,name,age
        
        Parameter reference:
        -t     table name
        -p     partition name
        -tp    file type; txt and json are currently supported
        -f     path of the file on HDFS (or a local path when -local is used)
        -sp    field separator for txt files; supports regex and invisible characters, which can be URL-encoded (see the example after this list)
        -fl    lsql column names that the txt fields map to
        -ov    (optional) clear the current partition's data before importing
        -cond  (optional) delete old data matching a condition before appending the new data
        -local use when importing from a local file
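
        For example, a tab-delimited file can be loaded by passing the tab character to -sp in its URL-encoded form (%09). The file name test_002.log below is hypothetical:

        ./load.sh -t person -p data_operate -tp txt -f /data/test/test_002.log -sp %09 -fl id,name,age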
        
    2. Scripted test

      Offline load test script

      #!/bin/bash
      lsqlhome=/opt/software/lsql
      # Write a one-line JSON record, create the target table, then load the record
      echo '{"tablename":"test","partition":"20191121","data":{"id":"4","name":"李元霸","age":14}}' > /root/test.json
      sh $lsqlhome/bin/sql.sh 'create table test(id string,name string,age int)'
      sh $lsqlhome/bin/load.sh -t test -local -f /root/test.json -p 20191121 -tp json -fl id,name,age
      
      # Query the partition we just loaded; double quotes so the inner single quotes survive
      sh $lsqlhome/bin/sql.sh "select * from test where partition like '20191121' limit 20"
      if [ $? -eq 0 ]; then
          echo "load succeeded"
          rm -rf /root/test.json
          sh $lsqlhome/bin/sql.sh 'drop table test'
      else
          echo "load failed"
      fi
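
      Note that the exit status of sql.sh only confirms the query ran, not that the row was actually loaded. A stricter check (a sketch, assuming sql.sh prints result rows to stdout) greps the query output for the inserted value:

      if sh $lsqlhome/bin/sql.sh "select * from test where partition like '20191121' limit 20" | grep -q '李元霸'; then
          echo "row present"
      fi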
      
  • Real-time data import from Kafka

    1. Simple excerpt of the Java producer code

      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      
      // ... inside main():
      Properties ps = new Properties();
      ps.put("bootstrap.servers", "10.10.12.28:9092,10.10.12.29:9092,10.10.12.30:9092");
      ps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      ps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      
      String topic = "lsql_test";
      KafkaProducer<String, String> producer = new KafkaProducer<>(ps);
      for (long i = 0L; i < 50000000L; i++) {
          String id = "1";
          String name = "name";
          int age = 19;
          String partition = "20191121";
          // Build one JSON record per message; tablename and partition route the data in LSQL
          StringBuilder sb = new StringBuilder();
          sb.append("{\"tablename\":\"person_new\",\"partition\":\"" + partition + "\",");
          sb.append("\"id\":\"" + id + "\",");
          sb.append("\"name\":\"" + name + "\",");
          sb.append("\"age\":" + age + "}");
          ProducerRecord<String, String> re = new ProducerRecord<String, String>(topic, sb.toString());
          producer.send(re);
      }
      producer.close();
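
      Before involving LSQL at all, you can confirm that messages are reaching the topic with the stock Kafka console consumer (assuming KAFKA_HOME points at your Kafka installation, as in the script below; the broker address matches the producer config above):

      sh $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 10.10.12.28:9092 --topic lsql_test --from-beginning --max-messages 5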
      
    2. Kafka real-time import test script

      #!/bin/bash
      ### Kafka real-time import section
      # LSQL Kafka configuration
      #set -x
      export KAFKA_HOME=/opt/software/kafka
      export PATH=$KAFKA_HOME/bin:$PATH
      k_ip=10.10.12.28
      lsql=/opt/software/lsql
      # Back up the current site config, then append a Kafka consumer config for this test
      mv $lsql/config/site/lsql-site.properties $lsql/config/site/lsql-site.properties.bak
      echo '{"tablename":"sgl_kafka_tb","partition":"default","col1":"录信数软","col2":"UpUpUp","col3":"十年饮冰","col4":"难凉热血"}' > lsql_kafka.json
      echo "
      #Kafka data import example
      cl.stream.reader.list=kafka1
      cl.stream.consumer.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLKafkaConsumer
      cl.stream.parser.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLJsonParser
      kafka.topic.kafka1=rovisuk
      bootstrap.servers.kafka1=$k_ip:9092
      kafka.group.kafka1=lsql_ts" >> $lsql/config/site/lsql-site.properties
      
      # Create the topic, push the test record, then query it back through LSQL
      sh $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $k_ip:2181 --create --topic rovisuk --partitions 1 --replication-factor 2
      sh $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $k_ip:9092 --topic rovisuk < lsql_kafka.json
      echo 'create table sgl_kafka_tb(col1 string,col2 string,col3 string,col4 string);
      select count(*) from sgl_kafka_tb;
      select * from sgl_kafka_tb limit 10;' > lsql_kafka.sql
      sh $lsql/bin/beeline.sh -f lsql_kafka.sql > lsql_kafka.log
      
      rm -rf lsql_kafka.json
      rm -rf lsql_kafka.sql
      mv $lsql/config/site/lsql-site.properties.bak $lsql/config/site/lsql-site.properties
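
      The query output lands in lsql_kafka.log; a non-zero count(*) there means the record flowed through Kafka into sgl_kafka_tb. If your deployment only reads lsql-site.properties at startup, restart LSQL between the config swap and the producer step.

      cat lsql_kafka.log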
      