7.7. 自定义消费

7.7.1. 自定义parser

只需要实现该类即可,具体是文本怎么解析,由parser决定

public abstract class CLStreamParser implements NotProguardUser {
    public abstract void init(String prefix, CLConfigKVApi config, int readerIndex, int readerCount) throws IOException;

    public abstract ClParsedRowList parse(Object raw) throws Throwable;

    public void setReaderMeta(Object meta) { 
    }
}

7.7.2. 自定义Consumer

是从kafka获取实时数据,还是从其他消息队列里读,可以通过这个接口拓展

public abstract class  CLStreamConsumer implements NotProguardUser
{
    public abstract void init(String prefix, CLConfigKVApi config, int readerIndex, int readerCount) throws IOException;

    public abstract List <Object> read() throws IOException;

    public abstract void close() throws IOException;

    public void setTrans(CLStreamImport stream) {

    }

    public void setParser(CLStreamParser parser) {

    }
}

7.7.3. 使用方法

在lsql-site.properties中将对应的类配置上即可

如kafka的配置:

cl.stream.reader.list=kafka1
cl.stream.consumer.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLKafkaConsumer
cl.stream.parser.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLJsonParser
Copyright © lucene.xin 2020 all right reserved修改时间: 2021-07-02 11:42:23

results matching ""

    No results matching ""