Commit a4b5f0a2 authored by KeYong's avatar KeYong

站端更新科技、许继等平台message服务处理消息功能

parent 9d81430d
package com.yeejoin.equipmanage.controller;
import com.yeejoin.equipmanage.common.entity.SignalClassify;
import com.yeejoin.equipmanage.common.vo.EquipCategoryVo;
import com.yeejoin.equipmanage.service.IStatisticsService;
import io.swagger.annotations.Api;
......@@ -10,6 +11,8 @@ import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.typroject.tyboot.core.foundation.enumeration.UserType;
import org.typroject.tyboot.core.restful.doc.TycloudOperation;
import org.typroject.tyboot.core.restful.utils.ResponseHelper;
import org.typroject.tyboot.core.restful.utils.ResponseModel;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
......@@ -95,4 +98,11 @@ public class StatisticsController {
public List<EquipCategoryVo> equipCategoryVoList(@PathVariable String systemCode) {
return iStatisticsService.equipCategoryVoList(systemCode);
}
@RequestMapping(value = "/test", method = RequestMethod.GET)
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "GET", value = "根据id查询", notes = "根据id查询")
public ResponseModel<String> getStr() {
return ResponseHelper.buildResponse(iStatisticsService.getStr());
}
}
......@@ -22,5 +22,6 @@ public interface IStatisticsService {
*/
List<EquipCategoryVo> equipCategoryVoList(String systemCode);
String getStr();
}
package com.yeejoin.equipmanage.service.impl;
import com.yeejoin.equipmanage.utils.ClassToJsonUtil;
import org.springframework.core.io.Resource;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yeejoin.amos.feign.privilege.model.AgencyUserModel;
......@@ -16,6 +18,7 @@ import com.yeejoin.equipmanage.mapper.StockDetailMapper;
import com.yeejoin.equipmanage.remote.RemoteSecurityService;
import com.yeejoin.equipmanage.service.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
......@@ -221,6 +224,16 @@ public class StatisticsServiceImpl implements IStatisticsService {
return equipmentSpecificMapper.selectEquipCategoryNumber(fightingSystemEntity.getId());
}
@Value("classpath:/json/commonMessage.json")
private Resource commonMessage;
@Override
public String getStr() {
String s = "{\"code\":2000,\"id\":1,\"body\":{\"warns\":[{\"systemid\":\"2\",\"pointId\":\"73811\",\"level\":\"3\",\"time\":\"2024-04-08 09:17:56.226\",\"type\":\"遥信变位\",\"deviceId\":\"\",\"content\":\"火灾报警主机_主控楼二层1#蓄电池室可燃气体火警屏蔽状态 消失\"}]}}";
String s1 = "{\"data_class\":\"realdata\",\"data_type\":\"alarm\",\"op_type\":\"subscribe_emergency\",\"condition\":{\"station_psr_id\":\"50edcb6c1b8a811030493c80a2014950ed9d4f59e8\",\"station_name\":\"中州换流站\",\"alarm_type\":\"yx_bw\"},\"data\":[{\"psrId\":\"D017020000000000000000999\",\"astId\":\"D017020000000000000000999\",\"equipType\":\"ASTType_0000111\",\"eventType\":\"OtherSignal\",\"alarmSource\":\"OWS\",\"alarmLevel\":\"3\",\"description\":\"2024-03-11 09:06:17::585 S2WCL12A E3.C01软水器再生结束信号 出现\",\"dateTime\":\"2024-03-11 09:06:17.585\"}]}";
String s2 = "{\"code\":2000,\"id\":1,\"body\":{\"time_stamp\":\"2024-04-07 16:42:52.616\",\"datatype\":\"analog\",\"name\":\"直流接地极电流IDEE\",\"value\":12.969970703125,\"key\":\"0000YC00010100409C\",\"quality\":0,”code”:1001}}";
return ClassToJsonUtil.class2json(s1, commonMessage, "k1");
}
private Double getExtinguishingCountByWarehouseId(String categoryCode, Long warehouseId) {
......
package com.yeejoin.equipmanage.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.io.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author keyong
* @title: ClassToJsonUtil
* <pre>
* @description: TODO
* </pre>
* @date 2024/4/11 11:28
*/
public class ClassToJsonUtil {
private static Map<String, Object> map = new HashMap<>();
public static String class2json(Object obj, Resource commonMessage, String topic) {
String json;
try {
json = IOUtils.toString(commonMessage.getInputStream(), String.valueOf(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new RuntimeException("获取kafka信息模板失败!");
}
List<Map> listModel = JSONObject.parseArray(json, Map.class);
if (0 < listModel.size()) {
List<Map> mapList = listModel.stream().filter(x -> String.valueOf(x.get("kafkaTopic")).equalsIgnoreCase(topic)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(mapList)) {
Map<String, Object> map1 = mapList.get(0);
JSONObject object = JSON.parseObject(String.valueOf(obj));
analysisJson(object, "");
JSONObject res = analyseJson(JSON.toJSONString(map1), map);
res.put("kafkaTopic", map1.get("kafkaTopic"));
res.put("mqTopic", map1.get("mqTopic"));
return JSON.toJSONString(res);
}
}
return null;
}
private static JSONObject analysisJson(Object objJson, String flag) {
if (objJson instanceof JSONArray) {//如果obj为json数组
JSONArray objArray = (JSONArray) objJson;
for (int i = 0; i < objArray.size(); i++) {
analysisJson(objArray.get(i), flag);
}
} else if (objJson instanceof JSONObject) {//如果为json对象
JSONObject jsonObject = (JSONObject) objJson;
Iterator it = jsonObject.keySet().iterator();
while (it.hasNext()) {
String key = it.next().toString();
Object object = jsonObject.get(key);
//如果得到的是数组
if (object instanceof JSONArray) {
JSONArray objArray = (JSONArray) object;
String path = "";
if (StringUtils.isNotBlank(flag)) {
path = flag + "." + key;
} else {
path = key;
}
analysisJson(objArray, path);
} else if (object instanceof JSONObject) {//如果key中是一个json对象
String path = "";
if (StringUtils.isNotBlank(flag)) {
path = flag + "." + key;
} else {
path = key;
}
analysisJson((JSONObject) object, path);
} else {//如果key中是其他
String path = "";
if (StringUtils.isNotBlank(flag)) {
path = flag + "." + key;
} else {
path = key;
}
// System.out.println(path+":"+object.toString()+" ");
map.put(path, String.valueOf(object));
}
}
} else {//如果key中是其他
// System.out.println(flag+":"+objJson.toString()+" ");
map.put(flag, String.valueOf(objJson));
}
return JSONObject.parseObject(JSON.toJSONString(map));
}
public static JSONObject analyseJson(String jsonData, Map<String, Object> keyMap) {
SortedMap<String, Object> map = new TreeMap<>();
JSONObject jsonObject = JSON.parseObject(jsonData);
for (String key : jsonObject.keySet()) {
String resKey = keyMap.get(key) == null ? key : String.valueOf(keyMap.get(key));
Object value = jsonObject.get(key);
if (value instanceof JSONArray) {
JSONArray jsonArray = new JSONArray(new LinkedList<>());
JSONArray array = jsonObject.getJSONArray(key);
for (int i = 0; i < array.size(); i++) {
Object object = array.get(i);
if (object instanceof String) {
map.put(resKey, array);
} else {
JSONObject sortJson = analyseJson(String.valueOf(object), keyMap);
jsonArray.add(sortJson);
map.put(resKey, jsonArray);
}
}
} else if (value instanceof JSONObject) {
JSONObject sortJson = analyseJson(String.valueOf(value), keyMap);
map.put(resKey, sortJson);
} else {
map.put(resKey, ObjectUtils.isNotEmpty(keyMap.get(value)) ? keyMap.get(value) : "");
}
}
return new JSONObject(map);
}
}
[
{
"kafkaTopic": "k1",
"mqTopic": "romaSite/data/transmit",
"data": {
"dataType": "condition.station_psr_id",
"value": "condition.station_psr_id",
"timeStamp": "condition.station_psr_id",
"quality": "condition.station_psr_id",
"scadaId": "condition.station_psr_id",
"key": "condition.station_psr_id",
"disCreate": "condition.station_psr_id",
"name": "condition.station_psr_id"
}
},
{
"kafkaTopic": "k2",
"mqTopic": "romaSite/data/eventAlarm",
"data": {
"timeStamp": "body.warns.time",
"warns": [
{
"eventTextL1": "body.warns.systemid",
"pointId": "body.warns.pointId",
"time": "body.warns.type",
"deviceId": "deviceId",
"eventstatus": "body.warns.content"
}
]
}
}
]
\ No newline at end of file
package com.yeejoin.amos.message.kafka;
import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.message.utils.ClassToJsonUtil;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.paho.client.mqttv3.MqttException;
......@@ -13,6 +13,8 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
......@@ -63,8 +65,12 @@ public class KafkaConsumerService {
Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) {
try {
// JSONObject object = JSONObject.fromObject(record.value());
// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC);
JSONObject object = JSONObject.fromObject(record.value());
String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC);
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
// JSONObject messageObj = JSONObject.fromObject(record.value());
// JSONObject data = messageObj.getJSONObject("body");
// if (data.isEmpty()) {
......@@ -72,10 +78,12 @@ public class KafkaConsumerService {
// data.put("datatype", "state");
// }
log.info("接收到Roma消息对象: {}", object);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, json.getBytes(StandardCharsets.UTF_8), 0, false);
// emqKeeper.getMqttClient().publish(MQTT_TOPIC, json.getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
......@@ -92,12 +100,18 @@ public class KafkaConsumerService {
try {
// JSONObject messageObj = JSONObject.fromObject(record.value());
// JSONObject data = messageObj.getJSONObject("body");
// JSONObject object = JSONObject.fromObject(record.value());
// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
// emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
JSONObject object = JSONObject.fromObject(record.value());
String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
......@@ -134,11 +148,13 @@ public class KafkaConsumerService {
// jsonObjectMessage.put("timestamp", timestamp);
JSONObject object = JSONObject.fromObject(record.value());
String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
......
package com.yeejoin.amos.message.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.io.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
/**
......@@ -22,8 +24,9 @@ import java.util.stream.Collectors;
*/
public class ClassToJsonUtil {
public static String class2json(Object obj, Resource commonMessage, String topic) {
private static Map<String, Object> map = new HashMap<>();
public static JSONObject class2json(Object obj, Resource commonMessage, String topic) {
String json;
try {
json = IOUtils.toString(commonMessage.getInputStream(), String.valueOf(StandardCharsets.UTF_8));
......@@ -31,34 +34,97 @@ public class ClassToJsonUtil {
throw new RuntimeException("获取kafka信息模板失败!");
}
List<Map> listModel = JSONObject.parseArray(json, Map.class);
if (0 < listModel.size()) {
List<Map> mapList = listModel.stream().filter(x -> String.valueOf(x.get("topic")).equalsIgnoreCase(topic)).collect(Collectors.toList());
List<Map> mapList = listModel.stream().filter(x -> String.valueOf(x.get("kafkaTopic")).equalsIgnoreCase(topic)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(mapList)) {
Map<String, Object> map = mapList.get(0);
Map<String, Object> entityObj = JSONObject.parseObject(String.valueOf(map.get("data")), Map.class);
Map<String, Object> sourceMap = JSONObject.parseObject(String.valueOf(obj), Map.class);
for (Map.Entry<String, Object> entry : entityObj.entrySet()) {
String fieldName = entry.getKey();
String fieldValue = String.valueOf(entry.getValue());
// 数据表body里面与外面有相同的key值时,若配置了" _body "标识则取body里面的key值,否则直接取body外面的值
Map<String, Object> bodyMap = JSONObject.parseObject(String.valueOf(sourceMap.get("body")), Map.class);
if (-1 < fieldValue.lastIndexOf("_body")) {
entry.setValue(bodyMap.get(fieldName));
Map<String, Object> map1 = mapList.get(0);
JSONObject object = JSON.parseObject(String.valueOf(obj));
analysisJson(object, "");
JSONObject res = analyseJson(JSON.toJSONString(map1), map);
res.put("kafkaTopic", map1.get("kafkaTopic"));
res.put("mqTopic", map1.get("mqTopic"));
return res;
}
}
return null;
}
private static JSONObject analysisJson(Object objJson, String flag) {
if (objJson instanceof JSONArray) {//如果obj为json数组
JSONArray objArray = (JSONArray) objJson;
for (int i = 0; i < objArray.size(); i++) {
analysisJson(objArray.get(i), flag);
}
} else if (objJson instanceof JSONObject) {//如果为json对象
JSONObject jsonObject = (JSONObject) objJson;
Iterator it = jsonObject.keySet().iterator();
while (it.hasNext()) {
String key = it.next().toString();
Object object = jsonObject.get(key);
//如果得到的是数组
if (object instanceof JSONArray) {
JSONArray objArray = (JSONArray) object;
String path = "";
if (StringUtils.isNotBlank(flag)) {
path = flag + "." + key;
} else {
path = key;
}
analysisJson(objArray, path);
} else if (object instanceof JSONObject) {//如果key中是一个json对象
String path = "";
if (StringUtils.isNotBlank(flag)) {
path = flag + "." + key;
} else {
if (sourceMap.containsKey(fieldValue)) {
entry.setValue(sourceMap.get(fieldValue));
} else if (bodyMap.containsKey(fieldValue)) {
entry.setValue(bodyMap.get(fieldValue));
} else {
entry.setValue("");
}
path = key;
}
analysisJson((JSONObject) object, path);
} else {//如果key中是其他
String path = "";
if (StringUtils.isNotBlank(flag)) {
path = flag + "." + key;
} else {
path = key;
}
// System.out.println(path+":"+object.toString()+" ");
map.put(path, String.valueOf(object));
}
return JSON.toJSONString(entityObj);
}
} else {//如果key中是其他
// System.out.println(flag+":"+objJson.toString()+" ");
map.put(flag, String.valueOf(objJson));
}
return null;
return JSONObject.parseObject(JSON.toJSONString(map));
}
public static JSONObject analyseJson(String jsonData, Map<String, Object> keyMap) {
SortedMap<String, Object> map = new TreeMap<>();
JSONObject jsonObject = JSON.parseObject(jsonData);
for (String key : jsonObject.keySet()) {
String resKey = keyMap.get(key) == null ? key : String.valueOf(keyMap.get(key));
Object value = jsonObject.get(key);
if (value instanceof JSONArray) {
JSONArray jsonArray = new JSONArray(new LinkedList<>());
JSONArray array = jsonObject.getJSONArray(key);
for (int i = 0; i < array.size(); i++) {
Object object = array.get(i);
if (object instanceof String) {
map.put(resKey, array);
} else {
JSONObject sortJson = analyseJson(String.valueOf(object), keyMap);
jsonArray.add(sortJson);
map.put(resKey, jsonArray);
}
}
} else if (value instanceof JSONObject) {
JSONObject sortJson = analyseJson(String.valueOf(value), keyMap);
map.put(resKey, sortJson);
} else {
map.put(resKey, ObjectUtils.isNotEmpty(keyMap.get(value)) ? keyMap.get(value) : "");
}
}
return new JSONObject(map);
}
}
[
{
"topic": "AAA",
"kafkaTopic": "k1",
"mqTopic": "romaSite/data/transmit",
"data": {
"dataType": "dataType",
"value": "value",
"timeStamp": "time_stamp",
"quality": "quality",
"scadaId": "id",
"key": "key",
"disCreate": "disCreate",
"name": "name"
"dataType": "condition.station_psr_id",
"value": "condition.station_psr_id",
"timeStamp": "condition.station_psr_id",
"quality": "condition.station_psr_id",
"scadaId": "condition.station_psr_id",
"key": "condition.station_psr_id",
"disCreate": "condition.station_psr_id",
"name": "condition.station_psr_id"
}
},
{
"topic": "BBB",
"kafkaTopic": "k2",
"mqTopic": "romaSite/data/eventAlarm",
"data": {
"timeStamp": "timeStamp",
"timeStamp": "body.warns.time",
"warns": [
{
"eventTextL1": "eventTextL1",
"pointId": "pointId",
"time": "time",
"eventTextL1": "body.warns.systemid",
"pointId": "body.warns.pointId",
"time": "body.warns.type",
"deviceId": "deviceId",
"eventstatus": "eventstatus"
"eventstatus": "body.warns.content"
}
]
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment