Commit 277ecdc7 authored by litengwei's avatar litengwei

messgae修改

parent 8f69a4e0
package com.yeejoin.amos.message.kafka; package com.yeejoin.amos.message.kafka;
import com.alibaba.excel.metadata.Sheet;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.message.utils.ClassToJsonUtil; import com.yeejoin.amos.message.utils.ClassToJsonUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -82,7 +83,7 @@ public class KafkaConsumerService implements ApplicationRunner { ...@@ -82,7 +83,7 @@ public class KafkaConsumerService implements ApplicationRunner {
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic()); com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
if ((StringUtils.isEmpty(filePath)) || (CollectionUtils.isEmpty(codeListInfo)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) { if ((StringUtils.isEmpty(filePath)) || (CollectionUtils.isEmpty(codeListInfo)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
log.info("kafka上报mqtt数据:{}", JSON.toJSONString(jsonObj)); log.info("kafka上报mqtt数据:{}", JSON.toJSONString(jsonObj));
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false); emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 2, false);
} }
} }
} catch (MqttException e) { } catch (MqttException e) {
...@@ -108,7 +109,7 @@ public class KafkaConsumerService implements ApplicationRunner { ...@@ -108,7 +109,7 @@ public class KafkaConsumerService implements ApplicationRunner {
if (messageObj.has("topic")) { if (messageObj.has("topic")) {
topic = messageObj.getString("topic"); topic = messageObj.getString("topic");
data = messageObj.getJSONObject("data"); data = messageObj.getJSONObject("data");
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 2, false);
ack.acknowledge(); ack.acknowledge();
log.info("消息转发成功" + messageObj); log.info("消息转发成功" + messageObj);
} }
...@@ -140,7 +141,7 @@ public class KafkaConsumerService implements ApplicationRunner { ...@@ -140,7 +141,7 @@ public class KafkaConsumerService implements ApplicationRunner {
public static List<String> readExcelFile(String filePath) { public static List<String> readExcelFile(String filePath) {
try (FileInputStream fis = new FileInputStream(new File(filePath)); try (FileInputStream fis = new FileInputStream(new File(filePath));
Workbook workbook = new XSSFWorkbook(fis)) { Workbook workbook = new XSSFWorkbook(fis)) {
Sheet sheet = workbook.getSheetAt(0); // 获取第一个工作表 com.alibaba.excel.metadata.Sheet sheet = workbook.getSheetAt(0); // 获取第一个工作表
List<String> list = getColumnData(sheet); List<String> list = getColumnData(sheet);
// 在这里处理list中的数据,例如打印、存储等操作 // 在这里处理list中的数据,例如打印、存储等操作
return list; return list;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment