Commit d2d88062 authored by litengwei's avatar litengwei

代码提交

parent 4565b116
......@@ -7,6 +7,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
......@@ -28,9 +29,13 @@ public class KafkaConsumerService {
private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String PROVINCE_MQTT_TOPIC = "province/data/transport";
@Value("${system.zxj}")
private boolean isZxj;
@Autowired
protected EmqKeeper emqKeeper;
/**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
*
......@@ -78,27 +83,29 @@ public class KafkaConsumerService {
* @param message 省级消息
* @param ack ack
*/
@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.risk.topics}'.split(',')}", concurrency = "2")
@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
public void consumerSingle1(String message, Acknowledgment ack) {
Optional<?> messages = Optional.ofNullable(message);
if (messages.isPresent()) {
try {
JSONObject jsonObject = JSONObject.fromObject(message);
String type = jsonObject.optString("type");
String table = jsonObject.optString("table");
if (StringUtils.isNoneEmpty(type, table)) {
if (Arrays.asList("INSERT", "UPDATE").contains(type)) {
JSONArray array = jsonObject.getJSONArray("data");
JSONObject data = (JSONObject)array.get(0);
data.put("dbType", type);
data.put("table", table);
emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
log.info("省级消息: {}", data);
if(isZxj) {
Optional<?> messages = Optional.ofNullable(message);
if (messages.isPresent()) {
try {
JSONObject jsonObject = JSONObject.fromObject(message);
String type = jsonObject.optString("type");
String table = jsonObject.optString("table");
if (StringUtils.isNoneEmpty(type, table)) {
if (Arrays.asList("INSERT", "UPDATE").contains(type)) {
JSONArray array = jsonObject.getJSONArray("data");
JSONObject data = (JSONObject)array.get(0);
data.put("dbType", type);
data.put("table", table);
emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
log.info("省级消息: {}", data);
}
}
} catch (MqttException e) {
log.error("消息转发失败" + e.getMessage(), e);
ack.acknowledge();
}
} catch (MqttException e) {
log.error("消息转发失败" + e.getMessage(), e);
ack.acknowledge();
}
}
}
......
#\u6CE8\u518C\u4E2D\u5FC3\u5730\u5740
eureka.client.service-url.defaultZone =http://172.16.10.216:10001/eureka/
eureka.client.service-url.defaultZone =http://172.16.11.201:10001/eureka/
eureka.instance.prefer-ip-address=true
management.endpoint.health.show-details=always
management.endpoints.web.exposure.include=*
eureka.instance.health-check-url-path=/actuator/health
eureka.instance.metadata-map.management.context-path=${server.servlet.context-path}/actuator
eureka.instance.status-page-url-path=/actuator/info
eureka.instance.metadata-map.management.api-docs=http://172.16.10.216:${server.port}${server.servlet.context-path}/swagger-ui.html
eureka.instance.metadata-map.management.api-docs=http://172.16.11.201:${server.port}${server.servlet.context-path}/swagger-ui.html
# kafka\u96C6\u7FA4\u4FE1\u606F
......@@ -74,7 +74,7 @@ management.health.redis.enabled=false
## emqx
emqx.clean-session=true
emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
emqx.broker=tcp://172.16.10.216:1883
emqx.broker=tcp://172.16.11.201:1883
emqx.client-user-name=admin
emqx.client-password=public
emqx.max-inflight=1000
......@@ -86,14 +86,14 @@ emqx.max-inflight=1000
kafka.topics=JKXT2BP-XFZX-Topic
#\u9700\u8981\u76D1\u542C\u5F97eqm\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E emq.iot.created,
#emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created,emq.risk.created,emq.mcb.zxj
emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created,emq.risk.created,emq.mcb.zxj
##\u4E2D\u5FC3\u7EA7\u914D\u7F6E\u914D\u7F6E
##\u9700\u8981\u76D1\u542C\u5F97kafka\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E
kafka.risk.topics=JKXT2BP-XFYY-Topic
system.zxj=false
#
##\u9700\u8981\u76D1\u542C\u5F97eqm\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E emq.iot.created,
emq.topic=ccs-user-login-info,sync.execute,data/mcb/warning,emq.risk.qrcode.put,emq.risk.qrcode.clean
#emq.topic=ccs-user-login-info,sync.execute,data/mcb/warning,emq.risk.qrcode.put,emq.risk.qrcode.clean
queue.kafka.topics=null
kafka.auto-startup=false
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment