Commit 67b444ca authored by 刘林

fix(message): add province-level message forwarding

parent 2e2b9a76
@@ -40,9 +40,13 @@ public class KafkaConsumerService {
     public void consumerSingle(String message, Acknowledgment ack) {
         JSONObject messageObj = JSONObject.fromObject(message);
         try {
-            String topic = messageObj.getString("topic");
-            JSONObject data = messageObj.getJSONObject("data");
-            emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
+            String topic = null;
+            JSONObject data = null;
+            if (messageObj.has("topic")) {
+                topic = messageObj.getString("topic");
+                data = messageObj.getJSONObject("data");
+                emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
+            }
         } catch (Exception e) {
             log.error("Message forwarding failed: " + e.getMessage(), e);
         }
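The new guard only forwards messages that actually carry a "topic" field, instead of letting getString("topic") throw on malformed input. A minimal sketch of that behaviour, assuming a payload shape like {"topic": ..., "data": ...} (the topic value below is made up):

import net.sf.json.JSONObject;

public class TopicGuardSketch {
    public static void main(String[] args) {
        // One well-formed payload and one without "topic" (hypothetical examples)
        String[] messages = {
                "{\"topic\":\"station/alarm\",\"data\":{\"id\":1}}",
                "{\"data\":{\"id\":1}}"
        };
        for (String message : messages) {
            JSONObject messageObj = JSONObject.fromObject(message);
            if (messageObj.has("topic")) {
                // Mirrors the patched branch: only messages with a topic are forwarded
                System.out.println("forward to " + messageObj.getString("topic")
                        + ": " + messageObj.getJSONObject("data"));
            } else {
                // Before this commit, getString("topic") would have thrown here
                System.out.println("skipped: no topic field");
            }
        }
    }
}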
@@ -86,9 +90,10 @@ public class KafkaConsumerService {
                 if (Arrays.asList("INSERT", "UPDATE").contains(type)) {
                     JSONArray array = jsonObject.getJSONArray("data");
                     JSONObject data = (JSONObject) array.get(0);
-                    data.put("type", type);
+                    data.put("dbType", type);
                     data.put("table", table);
                     emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
+                    log.info("Province-level message: {}", data);
                 }
             }
         } catch (MqttException e) {
...
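For the province-bound branch, a hedged sketch of what the enrichment now produces, using a made-up canal-style change event (the table name and row values are illustrative, not from this repository):

import java.util.Arrays;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

public class ProvinceEnrichSketch {
    public static void main(String[] args) {
        // Hypothetical change event in the shape the consumer expects
        JSONObject event = JSONObject.fromObject(
                "{\"type\":\"UPDATE\",\"table\":\"t_alarm\",\"data\":[{\"id\":42,\"status\":\"open\"}]}");

        String type = event.getString("type");
        String table = event.getString("table");
        if (Arrays.asList("INSERT", "UPDATE").contains(type)) {
            JSONObject data = (JSONObject) event.getJSONArray("data").get(0);
            data.put("dbType", type);  // renamed from "type", presumably to avoid colliding with a row column
            data.put("table", table);
            // The real code publishes this payload to PROVINCE_MQTT_TOPIC at QoS 0
            System.out.println("province payload: " + data);
        }
    }
}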
 # Registry (Eureka) address
 eureka.client.service-url.defaultZone=http://172.16.10.216:10001/eureka/
 eureka.instance.prefer-ip-address=true
 management.endpoint.health.show-details=always
@@ -9,62 +9,62 @@ eureka.instance.status-page-url-path=/actuator/info
 eureka.instance.metadata-map.management.api-docs=http://172.16.10.216:${server.port}${server.servlet.context-path}/swagger-ui.html
 # Kafka cluster
 spring.kafka.bootstrap-servers=172.16.10.215:9092
 # Producer configuration
 # Retry count; any value greater than 0 makes the client resend records whose delivery failed
 spring.kafka.producer.retries=1
 spring.kafka.producer.bootstrap-servers=172.16.10.215:9092
 # 16K
 spring.kafka.producer.batch-size=16384
 spring.kafka.producer.buffer-memory=33554432
 # Acknowledgement level
 # acks=0: a send counts as successful as soon as the message is handed to Kafka
 # acks=1: a send counts as successful once the Kafka leader partition has written the message to disk
 # acks=all: a send counts as successful only after the leader partition's follower replicas have also synced the message
 spring.kafka.producer.acks=1
 # Serialization for the message key and value
 # # Maximum batch size, in bytes
 # batch-size: 4096
 # # Send delay: the producer hands accumulated messages to Kafka once they reach batch-size or linger.ms elapses
 # buffer-memory: 33554432
 # # Client ID
 # client-id: hello-kafka
 # # Message compression: none, lz4, gzip or snappy; the default is none.
 # compression-type: gzip
 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
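As a cross-check of the producer block above, here is the same set of settings expressed against the plain Kafka client; the topic name is taken from kafka.topics below, the key and payload are placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSettingsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.10.215:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 1);             // resend failed records once
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);      // 16 KB batches
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
        props.put(ProducerConfig.ACKS_CONFIG, "1");              // leader write counts as success
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("JKXT2BP-XFYY-Topic", "key", "{\"demo\":true}"));
        }
    }
}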
 # Consumer group
 # When Kafka holds no initial offset, or the offset is out of range, the offset is reset automatically:
 # earliest: reset to the smallest offset in the partition
 # latest: reset to the newest offset in the partition (consume only data newly produced into the partition)
 # none: throw an exception as soon as any partition lacks a committed offset
 spring.kafka.consumer.group-id=zhTestGroup
 spring.kafka.consumer.bootstrap-servers=172.16.10.215:9092
 spring.kafka.consumer.enable-auto-commit=false
 # Partitions with a committed offset are consumed from that offset; partitions without one are consumed from the beginning
 # # Auto-commit interval, in ms
 # auto-commit-interval: 1000
 # # Maximum number of records consumed per batch
 # max-poll-records: 100
 spring.kafka.consumer.auto-offset-reset=earliest
 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
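The consumer block can be read the same way; a sketch with the plain client showing where auto-offset-reset=earliest and the disabled auto-commit land (group id taken from the file, topic from kafka.topics):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerSettingsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.10.215:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhTestGroup");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // earliest: with no committed offset, start from the oldest record in the partition
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("JKXT2BP-XFYY-Topic"));
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.println(record.value());
            }
            consumer.commitSync();  // manual commit, mirroring enable-auto-commit=false
        }
    }
}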
 # When to commit offsets (spring.kafka.listener.ack-mode):
 # RECORD: commit after each record has been handled by the listener (ListenerConsumer)
 # BATCH: commit after each batch of poll() data has been handled by the listener
 # TIME: commit after a poll() batch has been handled and more than TIME has elapsed since the last commit
 # COUNT: commit after a poll() batch has been handled and at least COUNT records have been processed
 # COUNT_TIME: commit when either the TIME or the COUNT condition is met
 # MANUAL: commit after a poll() batch has been handled and Acknowledgment.acknowledge() has been called manually
 # MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is called manually; this is the mode normally used
 spring.kafka.listener.ack-mode=manual_immediate
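With ack-mode=manual_immediate the offset is committed the moment the listener calls Acknowledgment.acknowledge(), which is the pattern consumerSingle above follows. A hedged sketch of such a listener (the class name and error handling are illustrative):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class ManualAckListenerSketch {

    @KafkaListener(topics = "${kafka.topics}")
    public void onMessage(String message, Acknowledgment ack) {
        try {
            // process(message) would go here
            ack.acknowledge();  // offset committed immediately under manual_immediate
        } catch (Exception e) {
            // without acknowledge() the record stays uncommitted and will be redelivered
        }
    }
}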
@@ -80,20 +80,20 @@ emqx.client-password=public
 emqx.max-inflight=1000
 # The settings below default to the station side; for a central-level system, comment out the block above and uncomment the block below
 # Station-side configuration
 # Kafka topics to listen to; pick the topics to configure depending on whether this node is central-level or station-side
 #kafka.topics=JKXT2BP-XFZX-Topic
 # EMQ topics to listen to; pick the topics to configure depending on whether this node is central-level or station-side (e.g. emq.iot.created)
 #emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created,emq.risk.created,emq.mcb.zxj
 ## Central-level configuration
 ## Kafka topics to listen to; pick the topics to configure depending on whether this node is central-level or station-side
 kafka.topics=JKXT2BP-XFYY-Topic
 #
 ## EMQ topics to listen to; pick the topics to configure depending on whether this node is central-level or station-side (e.g. emq.iot.created)
-emq.topic=ccs-user-login-info,sync.execute,data/mcb/warning
+emq.topic=ccs-user-login-info,sync.execute,data/mcb/warning,emq.risk.qrcode.put,emq.risk.qrcode.clean
 queue.kafka.topics=null
 kafka.auto-startup=false
\ No newline at end of file
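The emq.topic list gains two QR-code risk topics in this commit. A sketch of how such a comma-separated property could be fanned out to the Paho client that emqKeeper presumably wraps (the helper method and its names are assumptions, not code from this repository):

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;

public class EmqTopicFanout {
    // e.g. subscribeAll(client, "ccs-user-login-info,sync.execute,data/mcb/warning,emq.risk.qrcode.put,emq.risk.qrcode.clean")
    public static void subscribeAll(MqttClient client, String emqTopicProperty) throws MqttException {
        for (String topic : emqTopicProperty.split(",")) {
            client.subscribe(topic.trim(), 0);  // QoS 0, matching the publishes in KafkaConsumerService
        }
    }
}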