Commit e97d5216 authored by litengwei's avatar litengwei

pms预案推送逻辑

parent c276a15e
......@@ -2,7 +2,6 @@ package com.yeejoin.amos.fas.business.action;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.component.rule.MethodParam;
import com.yeejoin.amos.component.rule.RuleActionBean;
......@@ -36,17 +35,14 @@ import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import com.yeejoin.amos.fas.dao.entity.Equipment;
import com.yeejoin.amos.fas.dao.entity.PlanDetail;
import com.yeejoin.amos.fas.dao.entity.PlanOperationRecord;
import com.yeejoin.amos.fas.datasync.bo.PlanDetailSyncBo;
import com.yeejoin.amos.fas.datasync.bo.PlanOperationRecordSyncBo;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.io.Resource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
......@@ -54,13 +50,12 @@ import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.foundation.context.RequestContext;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
......@@ -88,6 +83,9 @@ public class ContingencyAction implements CustomerAction {
@Value("${spring.application.name}")
private String serviceName;
@Value("${stationCode:00001}")
private String stationCode;
@Value("${station.name}")
private String stationName;
......@@ -131,6 +129,9 @@ public class ContingencyAction implements CustomerAction {
@Autowired
private IEmergencyTaskService emergencyTaskService;
@Autowired
private EmqKeeper emqKeeper;
@Value("${rocket-plan-topic}")
private String rocketTopic;
......@@ -232,6 +233,7 @@ public class ContingencyAction implements CustomerAction {
AbstractActionResultMessage<?> action = (AbstractActionResultMessage<?>) constructor.newInstance(result);
if ("mqtt".equals(pushType.toLowerCase())) {
ToipResponse toipResponse = action.buildResponse(msgType, contingency, result.toJson());
toipResponse.setStationCode(stationCode);
String topic = String.format("/%s/%s/%s", serviceName, stationName, "plan");
log.info(String.format("mqtt[%s]:【 %s 】", topic, toipResponse.toJsonStr()));
webMqttComponent.publish(topic, toipResponse.toJsonStr());
......@@ -243,6 +245,8 @@ public class ContingencyAction implements CustomerAction {
event.setContingency(contingency);
contingencyLogPublisher.publish(event);
boolean flag = false;
JSONObject jsonObject = new JSONObject();
// 将预案的确认消息发送至中心级
if ("CONFIRM".equals(ro.getConfirm())) {
log.info("RocketMQ发送的主题是: " + rocketTopic + ", 消息体是: " + toipResponse.toJsonStr() + "!");
......@@ -252,7 +256,20 @@ public class ContingencyAction implements CustomerAction {
log.error(e.getMessage(), e);
throw new RuntimeException("RocketMQ消息发送失败!");
}
flag = true;
}
if(result.getDataNew().get("content").equals("startPlan") || result.getDataNew().get("content").equals("stopPlan")) {
flag = true;
}
if(flag) {
// 发送emq消息转kafka 至总部 pms3.0
jsonObject.put("topic", topic);
jsonObject.put("data", toipResponse);
try {
emqKeeper.getMqttClient().publish("emq.plan.info.created", jsonObject.toString().getBytes(), 1, false);
} catch (MqttException e) {
log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage()));
}
}
} else if ("websocket".equals(pushType.toLowerCase())) {
action.execute(msgType, contingency);
......@@ -902,6 +919,17 @@ public class ContingencyAction implements CustomerAction {
updateNumberPlan(batchNo);
// 应急处置中断,初始化planStep,json数据;更新预案结束时间
planStepService.initPlanStep();
Map<String, Object> tempmap2 = new HashMap<>();
SafteyPlanResult result1 = new SafteyPlanResult();
tempmap2.put("type", "event");
tempmap2.put("content", "stopPlan");
//数字预案结束状态
tempmap2.put("status", PlanRecordStatusEnum.COMPLETE.getCode());
result1.add(tempmap2);
this.sendcmd("message", new Object(), result1);
// 更新预案结束时间
contingencyInstanceInfoService.updateEndTimeById(batchNo);
}
......
......@@ -29,7 +29,7 @@ public class WebMqttComponent {
while (true) {
try {
Message msg = queue.take();
emqKeeper.getMqttClient().publish(msg.getTopic(), msg.getJsonStr().getBytes(), 0, false);
emqKeeper.getMqttClient().publish(msg.getTopic(), msg.getJsonStr().getBytes(), 1, false);
} catch (Exception e) {
e.printStackTrace();
}
......
package com.yeejoin.amos.fas.business.action.result;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSON;
......@@ -12,4 +13,6 @@ public interface ActionResult
public void add(Object data);
public List<?> getData();
Map<String, Object> getDataNew();
}
......@@ -26,8 +26,12 @@ public class SafteyPlanResult implements ActionResult{
@Override
public List<?> getData() {
return null;
}
@Override
public Map<String, Object> getDataNew() {
return data;
}
}
......@@ -55,4 +55,9 @@ public class SimpleResult extends AbstractActionResult implements ActionResult
// TODO Auto-generated method stub
return data;
}
@Override
public Map<String, Object> getDataNew() {
return null;
}
}
......@@ -2,6 +2,7 @@ package com.yeejoin.amos.fas.business.event;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.fas.business.action.ContingencyAction;
import com.yeejoin.amos.fas.business.action.model.ContingencyRo;
import com.yeejoin.amos.fas.business.action.mq.WebMqttComponent;
......@@ -15,6 +16,7 @@ import com.yeejoin.amos.fas.business.vo.ContingencyPlanInstanceVO;
import com.yeejoin.amos.fas.dao.entity.ContingencyOriginalData;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import com.yeejoin.amos.fas.dao.entity.Equipment;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.map.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
......@@ -37,6 +39,7 @@ import java.util.*;
* @date 2023/4/4 16:33
*/
@Component
@Slf4j
public class BizContingencyHandler implements EventHandler{
......@@ -221,6 +224,13 @@ public class BizContingencyHandler implements EventHandler{
result.put("msgContext", Collections.EMPTY_MAP);
result.put("msgType", "getStepList");
result.put("planStep", objects);
JSONObject jsonObject = new JSONObject();
// 发送emq消息转kafka 至总部 pms3.0
jsonObject.put("topic", topic);
jsonObject.put("data", result);
webMqttComponent.publish("emq.plan.info.created", JSON.toJSONString(jsonObject));
log.info("发送kafka消息至总部=================");
String planTask = "";
if (redisTemplate.hasKey("planTask")) {
planTask = Objects.requireNonNull(redisTemplate.opsForValue().get("planTask")).toString();
......
......@@ -35,6 +35,7 @@ import com.yeejoin.amos.feign.privilege.model.AgencyUserModel;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
......@@ -46,6 +47,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.foundation.context.RequestContext;
import java.util.*;
......@@ -515,6 +517,13 @@ public class ContingencyInstanceImpl implements IContingencyInstance {
result.put("msgContext", Collections.EMPTY_MAP);
result.put("msgType", "getStepList");
result.put("planStep", objects);
JSONObject jsonObject = new JSONObject();
// 发送emq消息转kafka 至总部 pms3.0
jsonObject.put("topic", topic);
jsonObject.put("data", result);
webMqttComponent.publish("emq.plan.info.created", JSON.toJSONString(jsonObject));
log.info("发送kafka消息至总部=================");
String planTask = "";
if (redisTemplate.hasKey("planTask")) {
planTask = Objects.requireNonNull(redisTemplate.opsForValue().get("planTask")).toString();
......
......@@ -38,6 +38,8 @@ public class ToipResponse implements Serializable
*/
private String msgType;
private String stationCode;
/**
* 模板类型
* 例如
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment