Commit 4e249086 authored by 张森's avatar 张森

kafka消息过滤

parent 940986df
...@@ -52,6 +52,17 @@ ...@@ -52,6 +52,17 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<!-- 读取excel文件 -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>5.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>5.2.3</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -5,20 +5,30 @@ import com.yeejoin.amos.message.utils.ClassToJsonUtil; ...@@ -5,20 +5,30 @@ import com.yeejoin.amos.message.utils.ClassToJsonUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.poi.ss.usermodel.*;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import static com.yeejoin.amos.message.kafka.Constant.*;
/** /**
* kafka 消费服务 * kafka 消费服务
...@@ -28,7 +38,7 @@ import static com.yeejoin.amos.message.kafka.Constant.*; ...@@ -28,7 +38,7 @@ import static com.yeejoin.amos.message.kafka.Constant.*;
**/ **/
@Slf4j @Slf4j
@Service @Service
public class KafkaConsumerService { public class KafkaConsumerService implements ApplicationRunner {
private static final String MQTT_TOPIC = "romaSite/data/transmit"; private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String PROVINCE_MQTT_TOPIC = "province/data/transport"; private static final String PROVINCE_MQTT_TOPIC = "province/data/transport";
...@@ -40,67 +50,20 @@ public class KafkaConsumerService { ...@@ -40,67 +50,20 @@ public class KafkaConsumerService {
@Value("classpath:/json/commonMessage.json") @Value("classpath:/json/commonMessage.json")
private Resource commonMessage; private Resource commonMessage;
// /** /**
// * 批量消费kafka消息 * execl文件路径,读取excel
// * Kafka消息转emq */
// * // @Value("${filter.excel.path:F:\\filterExcel11.xlsx}")
// * @param consumerRecords messages @Value("${filter.excel.path}")
// * @param ack ack private String filePath;
// */
// @KafkaListener(id = "consumerSingle", groupId = "zhTestGroup", topics = "#{'${kafka.topics}'.split(',')}")
// public void listen1(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
// try {
// for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
// if (kafkaMessage.isPresent()) {
// JSONObject messageObj = JSONObject.fromObject(kafkaMessage.get());
// if (messageObj.has(TOPIC)) {
// emqKeeper.getMqttClient().publish(messageObj.optString(TOPIC), messageObj.getJSONObject(DATA).toString()
// .getBytes(StandardCharsets.UTF_8), 0, false);
// }
// log.info("kafka消费zhTestGroup消息{}", messageObj);
// }
// }
// } catch (Exception e) {
// log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecords, e);
// } finally {
// ack.acknowledge();
// }
// }
/** /**
* 批量消费kafka消息 * 服务启动时,内存存储execl文档中需要的编码信息
* 监听数据表变化kafka数据转发emq
*
* @param consumerRecords messages
* @param ack ack
*/ */
// @KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}") private List<String> codeListInfo;
// public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
// try { private static final String topic1 = "romaSite/data/transmit";
// if (isZxj) { private static final String topic2 = "romaSite/data/eventAlarm";
// for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
// if (kafkaMessage.isPresent()) {
// JSONObject messageObj = JSONObject.fromObject(kafkaMessage.get());
// String type = messageObj.optString(TYPE);
// String table = messageObj.optString(TABLE);
// if (Arrays.asList(INSERT, UPDATE).contains(type) && DBTableTypeEnum.have(table) != null) {
// JSONObject data = (JSONObject) messageObj.getJSONArray(DATA).get(0);
// data.put(DB_TYPE, type);
// data.put(TABLE, table);
// emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
// log.info("kafka消费province消息{}", messageObj);
// }
// }
// }
// }
// } catch (Exception e) {
// log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecords, e);
// } finally {
// ack.acknowledge();
// }
// }
/** /**
* 转发苏州,绍兴换流站Kafka数据对emq * 转发苏州,绍兴换流站Kafka数据对emq
...@@ -113,17 +76,11 @@ public class KafkaConsumerService { ...@@ -113,17 +76,11 @@ public class KafkaConsumerService {
try { try {
Optional<?> messages = Optional.ofNullable(record.value()); Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) { if (messages.isPresent()) {
// JSONObject messageObj = JSONObject.fromObject(record.value());
// if (messageObj.getJSONObject(BODY).isEmpty()) {
// messageObj.put(DATA_TYPE, STATE);
// }
// JSONObject object = JSONObject.fromObject(record.value());
// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC);
// emqKeeper.getMqttClient().publish(MQTT_TOPIC, json.getBytes(StandardCharsets.UTF_8), 0, false);
JSONObject object = JSONObject.fromObject(record.value()); JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic()); com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false); if ((StringUtils.isEmpty(filePath)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
}
} }
} catch (MqttException e) { } catch (MqttException e) {
log.error("换流站转发Kafka消息失败" + e.getMessage(), e); log.error("换流站转发Kafka消息失败" + e.getMessage(), e);
...@@ -139,30 +96,11 @@ public class KafkaConsumerService { ...@@ -139,30 +96,11 @@ public class KafkaConsumerService {
Optional<?> message = Optional.ofNullable(record.value()); Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) { if (message.isPresent()) {
try { try {
// JSONObject messageObj = JSONObject.fromObject(record.value());
// JSONArray dataArray = messageObj.getJSONArray("data");
// JSONArray jsonArray = new JSONArray();
// String timestamp = "";
// for (Object obj : dataArray) {
// JSONObject finallyObj = new JSONObject();
// com.alibaba.fastjson.JSONObject detail = com.alibaba.fastjson.JSONObject.parseObject(com.alibaba.fastjson.JSONObject.toJSONString(obj));
// finallyObj.put("eventtextL1", detail.get("description"));
// finallyObj.put("pointId", detail.get("astId"));
// finallyObj.put("time", detail.get("dateTime"));
// jsonArray.add(finallyObj);
// timestamp = detail.get("dateTime").toString();
// }
// JSONObject jsonObjectMessage = new JSONObject();
// jsonObjectMessage.put("warns", jsonArray);
// jsonObjectMessage.put("timestamp", timestamp);
// JSONObject object = JSONObject.fromObject(record.value());
// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
// emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
JSONObject object = JSONObject.fromObject(record.value()); JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic()); com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false); if ((StringUtils.isEmpty(filePath)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
}
ack.acknowledge(); ack.acknowledge();
} catch (MqttException e) { } catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage()); log.error("解析数据失败,{}", e.getMessage());
...@@ -171,78 +109,6 @@ public class KafkaConsumerService { ...@@ -171,78 +109,6 @@ public class KafkaConsumerService {
} }
} }
} }
//
// /**
// * 韶山换流对接Kafka
// * @param record record
// * @param ack ack
// */
// @KafkaListener(id = "kafkaConsumer", groupId = "kafkaConsumerGroup", topics = "#{'${queue.kafka.shaoshan.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
// public void kafkaConsumer(ConsumerRecord<?, String> record, Acknowledgment ack) {
// Optional<?> message = Optional.ofNullable(record.value());
// if (message.isPresent()) {
// try {
//// JSONObject messageObj = JSONObject.fromObject(record.value());
//// JSONObject data = messageObj.getJSONObject("body");
//// JSONObject object = JSONObject.fromObject(record.value());
//// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
//// emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
//
// JSONObject object = JSONObject.fromObject(record.value());
// com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
// emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
// ack.acknowledge();
// } catch (MqttException e) {
// log.error("解析数据失败,{}", e.getMessage());
// } catch (UnsupportedEncodingException e) {
// e.printStackTrace();
// }
// }
// }
//
// /**
// * 事件告警对接Kafka
// * @param record record
// * @param ack ack
// * groupId = kafkaConsumerGroup
// * 该消息的消息格式为
// * {"data_class":"realdata","data_type":"alarm","op_type":"subscribe_emergency","condition":{"station_psr_id":"50edcb6c1b8a811030493c80a2014950ed9d4f59e8","station_name":"中州换流站","alarm_type":"yx_bw"},"data":[{"psrId":"D017020000000000000000999","astId":"D017020000000000000000999","equipType":"ASTType_0000111","eventType":"OtherSignal","alarmSource":"OWS","alarmLevel":"3","description":"2024-03-11 09:06:17::585 S2WCL12A E3.C01软水器再生结束信号 出现","dateTime":"2024-03-11 09:06:17.585"}]}
// */
//
// @KafkaListener(id = "kafkaConsumerEventAlarm", groupId = "kafkaConsumerGroupEventAlarm", topics = "#{'${queue.kafka.eventAlarm.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
// public void kafkaConsumerEventAlarm(ConsumerRecord<?, String> record, Acknowledgment ack) {
// Optional<?> message = Optional.ofNullable(record.value());
// if (message.isPresent()) {
// try {
//// JSONObject messageObj = JSONObject.fromObject(record.value());
//// JSONArray dataArray = messageObj.getJSONArray("data");
//// JSONArray jsonArray = new JSONArray();
//// String timestamp = "";
//// for (Object obj : dataArray) {
//// JSONObject finallyObj = new JSONObject();
//// com.alibaba.fastjson.JSONObject detail = com.alibaba.fastjson.JSONObject.parseObject(com.alibaba.fastjson.JSONObject.toJSONString(obj));
//// finallyObj.put("eventtextL1", detail.get("description"));
//// finallyObj.put("pointId", detail.get("astId"));
//// finallyObj.put("time", detail.get("dateTime"));
//// jsonArray.add(finallyObj);
//// timestamp = detail.get("dateTime").toString();
//// }
//// JSONObject jsonObjectMessage = new JSONObject();
//// jsonObjectMessage.put("warns", jsonArray);
//// jsonObjectMessage.put("timestamp", timestamp);
//
// JSONObject object = JSONObject.fromObject(record.value());
// com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
// emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
// ack.acknowledge();
// } catch (MqttException e) {
// log.error("解析数据失败,{}", e.getMessage());
// } catch (UnsupportedEncodingException e) {
// e.printStackTrace();
// }
// }
// }
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
...@@ -267,85 +133,74 @@ public class KafkaConsumerService { ...@@ -267,85 +133,74 @@ public class KafkaConsumerService {
} }
} }
@Override
public void run(ApplicationArguments args) {
codeListInfo = readExcelFile(filePath);
}
///** /**
// * 省级消息转发 * 判断是否发送emq消息
// * * @return true 发送 false 不发送
// * @param message 省级消息 */
// * @param ack ack private Boolean isSendEmq(com.alibaba.fastjson.JSONObject res) {
// */ String key = "";
//@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2") if (!StringUtils.isEmpty(res.get("kafkaTopic")) && topic1.equals(res.get("kafkaTopic").toString())) {
//public void consumerSingle1(String message, Acknowledgment ack) { key = res.getJSONObject("data").get("key").toString();
// log.info("省级消息转发打印" + message); } else if (!StringUtils.isEmpty(res.get("kafkaTopic")) && topic2.equals(res.get("kafkaTopic").toString())) {
// if(isZxj) { key = res.getJSONObject("data").getJSONArray("warns").getJSONObject(0).get("pointId").toString();
// Optional<?> messages = Optional.ofNullable(message); }
// if (messages.isPresent()) { return !StringUtils.isEmpty(key) && codeListInfo.contains(key);
// try { }
// JSONObject jsonObject = JSONObject.fromObject(message);
// String type = jsonObject.optString("type");
// String table = jsonObject.optString("table");
// if (Arrays.asList(Constant.INSERT, Constant.UPDATE).contains(type) && DBTableTypeEnum.have(table) != null) {
// if (Arrays.asList("INSERT", "UPDATE").contains(type)) {
// JSONArray array = jsonObject.getJSONArray("data");
// JSONObject data = (JSONObject)array.get(0);
// data.put("dbType", type);
// data.put("table", table);
// emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
// log.info("省级消息: {}", data);
// }
// }
// } catch (MqttException e) {
// log.error("消息转发失败" + e.getMessage(), e);
// }
// ack.acknowledge();
// }
// }
//}
public static List<String> readExcelFile(String filePath) {
try (FileInputStream fis = new FileInputStream(new File(filePath));
Workbook workbook = new XSSFWorkbook(fis)) {
Sheet sheet = workbook.getSheetAt(0); // 获取第一个工作表
List<String> list = getColumnData(sheet);
// 在这里处理list中的数据,例如打印、存储等操作
return list;
} catch (IOException e) {
e.printStackTrace();
}
return new ArrayList<>();
}
/* @KafkaListener(id = "consumerBatch", topicPartitions = { private static List<String> getColumnData(Sheet sheet) {
@TopicPartition(topic = "hello-batch1", partitions = "0"), List<String> list = new ArrayList<>();
@TopicPartition(topic = "hello-batch2", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "4")) Iterator<Row> rowIterator = sheet.iterator();
})*/ rowIterator.next(); // 跳过表头行
// /** while (rowIterator.hasNext()) {
// * 批量消费消息 Row row = rowIterator.next();
// * @param messages Cell cell = row.getCell(0);
// */ if (cell != null) {
// @KafkaListener(id = "consumerBatch", topics = "test-batch") String cellValue = getCellValueAsString(cell);
// public void consumerBatch(List<ConsumerRecord<String, String>> messages) { list.add(cellValue);
// log.info("consumerBatch =====> messageSize: {}", messages.size()); }
// log.info(messages.toString()); }
// } return list;
}
// /** private static String getCellValueAsString(Cell cell) {
// * 指定消费异常处理器 if (cell == null) {
// * @param message return "";
// */ }
// @KafkaListener(id = "consumerException", topics = "kafka-test-topic", errorHandler = "consumerAwareListenerErrorHandler") switch (cell.getCellType()) {
// public void consumerException(String message) { case STRING:
// throw new RuntimeException("consumer exception"); return cell.getStringCellValue();
// } case NUMERIC:
// if (DateUtil.isCellDateFormatted(cell)) {
// /** return cell.getDateCellValue().toString();
// * 验证ConsumerInterceptor } else {
// * @param message return Double.toString(cell.getNumericCellValue());
// */ }
// @KafkaListener(id = "interceptor", topics = "consumer-interceptor") case BOOLEAN:
// public void consumerInterceptor(String message) { return Boolean.toString(cell.getBooleanCellValue());
// log.info("consumerInterceptor ====> message: {}", message); case FORMULA:
// } return cell.getCellFormula();
// default:
// return "";
// }
//kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup" }
//@KafkaListener(topics = "test", groupId = "zhTestGroup")
//public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
// String value = record.value();
// System.out.println(value);
// System.out.println(record);
// //手动提交offset
// ack.acknowledge();
//}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment