Commit 3e872987 authored by 刘林's avatar 刘林

fix(message):调整kafka消息提交错误

parent 341b5dd9
...@@ -69,12 +69,11 @@ public class KafkaConsumerService { ...@@ -69,12 +69,11 @@ public class KafkaConsumerService {
data = messageObj; data = messageObj;
data.put("datatype", "state"); data.put("datatype", "state");
} }
//log.info("接收到Roma消息对象: {}", data);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
} catch (MqttException e) { } catch (MqttException e) {
log.error("消息转发失败" + e.getMessage(), e); log.error("消息转发失败" + e.getMessage(), e);
ack.acknowledge();
} }
ack.acknowledge();
} }
} }
...@@ -84,7 +83,7 @@ public class KafkaConsumerService { ...@@ -84,7 +83,7 @@ public class KafkaConsumerService {
* @param message 省级消息 * @param message 省级消息
* @param ack ack * @param ack ack
*/ */
@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.risk.topics}'.split(',')}", concurrency = "2") @KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
public void consumerSingle1(String message, Acknowledgment ack) { public void consumerSingle1(String message, Acknowledgment ack) {
if(isZxj) { if(isZxj) {
Optional<?> messages = Optional.ofNullable(message); Optional<?> messages = Optional.ofNullable(message);
...@@ -105,8 +104,8 @@ public class KafkaConsumerService { ...@@ -105,8 +104,8 @@ public class KafkaConsumerService {
} }
} catch (MqttException e) { } catch (MqttException e) {
log.error("消息转发失败" + e.getMessage(), e); log.error("消息转发失败" + e.getMessage(), e);
ack.acknowledge();
} }
ack.acknowledge();
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment