7.7. 自定义消费
7.7.1. 自定义parser
只需要实现该类即可,具体是文本怎么解析,由parser决定
public abstract class CLStreamParser implements NotProguardUser {
public abstract void init(String prefix, CLConfigKVApi config, int readerIndex, int readerCount) throws IOException;
public abstract ClParsedRowList parse(Object raw) throws Throwable;
public void setReaderMeta(Object meta) {
}
}
7.7.2. 自定义Consumer
是从kafka获取实时数据,还是从其他消息队列里读,可以通过这个接口拓展
public abstract class CLStreamConsumer implements NotProguardUser
{
public abstract void init(String prefix, CLConfigKVApi config, int readerIndex, int readerCount) throws IOException;
public abstract List <Object> read() throws IOException;
public abstract void close() throws IOException;
public void setTrans(CLStreamImport stream) {
}
public void setParser(CLStreamParser parser) {
}
}
7.7.3. 使用方法
在lsql-site.properties中将对应的类配置上即可
如kafka的配置:
cl.stream.reader.list=kafka1
cl.stream.consumer.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLKafkaConsumer
cl.stream.parser.class.kafka1=cn.lucene.plugins.service.stream.api.impl.CLJsonParser