Commit a721a7ea authored by 刘林's avatar 刘林

fix(message):过滤不需要经message转发消息数据库名称

parent 252b2196
package com.yeejoin.amos.message.kafka;
/**
* @author LiuLin
* @date 2023年09月02日 11:02
*/
public interface Constant {
String INSERT = "INSERT";
String UPDATE = "UPDATE";
}
package com.yeejoin.amos.message.kafka;
/**
* 数据库表名称
* @author LiuLin
* @date 2023年08月02日 11:02
*/
public enum DBTableTypeEnum {
p_check,
idx_biz_hidden_danger,
idx_biz_defect,
wl_equipment_specific_alarm,
;
public static DBTableTypeEnum have(String type){
for (DBTableTypeEnum tableType : DBTableTypeEnum.values()) {
if(tableType.name().equals(type)) {
return tableType;
}
}
return null;
}
}
\ No newline at end of file
......@@ -41,8 +41,8 @@ public class KafkaConsumerService {
JSONObject messageObj = JSONObject.fromObject(message);
try {
String topic = null;
JSONObject data=null;
if(messageObj.has("topic")){
JSONObject data = null;
if (messageObj.has("topic")) {
topic = messageObj.getString("topic");
data = messageObj.getJSONObject("data");
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
......@@ -75,8 +75,9 @@ public class KafkaConsumerService {
/**
* 省级消息转发
*
* @param message 省级消息
* @param ack ack
* @param ack ack
*/
@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.risk.topics}'.split(',')}", concurrency = "2")
public void consumerSingle1(String message, Acknowledgment ack) {
......@@ -87,9 +88,9 @@ public class KafkaConsumerService {
String type = jsonObject.optString("type");
String table = jsonObject.optString("table");
if (StringUtils.isNoneEmpty(type, table)) {
if (Arrays.asList("INSERT", "UPDATE").contains(type)) {
if (Arrays.asList(Constant.INSERT, Constant.UPDATE).contains(type) && DBTableTypeEnum.have(table) != null) {
JSONArray array = jsonObject.getJSONArray("data");
JSONObject data = (JSONObject)array.get(0);
JSONObject data = (JSONObject) array.get(0);
data.put("dbType", type);
data.put("table", table);
emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment