Loading Kafka stream data into the database: how to easily configure a topic's metadata when the table has many fields

Just update the relevant entries in littleboy's metadata database.

Environment:

lava 4.x
littleboy 4.x

Assume there is a topic named test2 whose messages are '|'-delimited and contain 5 columns.

  • 1. Register the topic test2 in the kafkatopic table and get its id
-- clusterid must first be looked up in the cluster table (see the lookup sketch after this step); assume clusterid=2, server='192.168.80.118:9092,192.168.80.119:9092,192.168.80.120:9092'
insert into kafkatopic(clusterid,topicname,server,createdate) values(2,'test2','192.168.80.118:9092,192.168.80.119:9092,192.168.80.120:9092',current_date);
-- assume the select below returns id=25
select id from kafkatopic where topicname='test2'; 
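The clusterid used above has to come from the cluster table. Its columns are not documented in this post, so a minimal lookup sketch is simply to list the table and pick the row for the cluster that hosts test2:
-- list the registered clusters and note the id of the one hosting test2; assumed to be 2 here
select * from cluster;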
  • 2. Add a record to topicmessage
-- 25 is the kafkatopic id from step 1; topicmessagename is a name of your choosing
insert into topicmessage(topicmessagename,kafkatopicid) values('test2message',25);
-- assume the select below returns id=18
select id from topicmessage where topicmessagename='test2message' and kafkatopicid=25;
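Before moving on, the linkage can be double-checked with a join. This is just a verification sketch using the columns already shown above:
-- confirm that test2message is attached to the topic test2
select t.id, t.topicmessagename, k.topicname
from topicmessage t join kafkatopic k on t.kafkatopicid = k.id
where k.topicname = 'test2';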
  • 3. Insert the column definitions into topicfieldinfo. The index value must increment from 0, so create a sequence for index and call it for each row (a bulk version for many columns is sketched after this step)
CREATE SEQUENCE index_seq increment by 1 minvalue -1 maxvalue 20000 start with 0 cache 20000 owned by topicfieldinfo.index;
-- 18 is the topicmessage id from step 2
insert into topicfieldinfo(topicmessageid,index,columnname,columntype,classifyflag,classifyvalue) values(18,nextval('index_seq'),'col1_name','text',false,'');  
insert into topicfieldinfo(topicmessageid,index,columnname,columntype,classifyflag,classifyvalue) values(18,nextval('index_seq'),'col2_name','text',false,''); 
insert into topicfieldinfo(topicmessageid,index,columnname,columntype,classifyflag,classifyvalue) values(18,nextval('index_seq'),'col3_name','text',false,'');
insert into topicfieldinfo(topicmessageid,index,columnname,columntype,classifyflag,classifyvalue) values(18,nextval('index_seq'),'col4_name','text',false,'');
insert into topicfieldinfo(topicmessageid,index,columnname,columntype,classifyflag,classifyvalue) values(18,nextval('index_seq'),'col5_name','text',false,'');
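When the target table has dozens of fields, writing one insert per column is tedious. If the column names follow a pattern, the rows can be generated in a single statement. This is only a sketch: it assumes PostgreSQL-compatible SQL (generate_series) and columns literally named col1_name ... col50_name, so adapt the name expression and the count to your real schema:
-- if index_seq has already been used (e.g. by the inserts above), restart it at 0 first
alter sequence index_seq restart with 0;
-- bulk-generate the field rows for 50 '|'-delimited text columns
insert into topicfieldinfo(topicmessageid,index,columnname,columntype,classifyflag,classifyvalue)
select 18, nextval('index_seq'), 'col' || g::text || '_name', 'text', false, ''
from generate_series(1, 50) as g;
If, as here, index must start from 0 for each topicmessage, the same restart is needed before configuring the fields of the next one.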