Commit 9acd9958 authored by 高建强's avatar 高建强

item:添加预案数据同步

parent 2f2930fa
package com.yeejoin.amos.fas.common.enums;
/**
* <h1>同步数据动作</h1>
*
* @Author SingleTian
* @Date 2021-04-01 09:21
*/
public enum DataSyncOperationEnum {
/**
* 创建
*/
CREATE,
/**
* 更新
*/
UPDATE,
/**
* 删除
*/
DELETE;
}
package com.yeejoin.amos.fas.common.enums;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.common.enums
* @ClassName: SyncDataTextEnum
* @Author: Jianqiang Gao
* @Description: SyncDataTextEnum
* @Date: 2021/4/13 09:55
* @Version: 1.0
*/
public enum DataSyncTextEnum {
/**
* 物联开关量
*/
IOT_SWITCH("开关"),
/**
* 物联模拟量
*/
IOT_ANALOG("模拟"),
/**
* 文本
*/
DATA_TYPE_TEXT("文本"),
/**
* 數值
*/
DATA_TYPE_NUMBER("數值");
private final String text;
DataSyncTextEnum(String text) {
this.text = text;
}
public String getText() {
return text;
}
}
package com.yeejoin.amos.fas.common.enums;
/**
* <h1>同步数据类型</h1>
*
* @Author SingleTian
* @Date 2021-04-01 09:20
*/
public enum DataSyncTypeEnum {
CONTINGENCY_PLAN_DETAIL("11", "cs/v1/fireASF/dataSync"),
CONTINGENCY_PLAN_OPERATION_RECORD("11", "cs/v1/fireASF/dataSync"),
CONTINGENCY_ORIGINAL_DATA("11", "cs/v1/fireASF/dataSync"),
CONTINGENCY_PLAN_INSTANCE("11", "cs/v1/fireASF/dataSync");
/**
* 资源类型编码
*/
private final String sourceCode;
/**
* mqtt主题
*/
private final String mqTopic;
DataSyncTypeEnum(String sourceCode, String mqTopic) {
this.sourceCode = sourceCode;
this.mqTopic = mqTopic;
}
public String getSourceCode() {
return sourceCode;
}
public String getMqTopic() {
return mqTopic;
}
}
......@@ -17,15 +17,16 @@ import com.yeejoin.amos.fas.business.action.result.ActionResult;
import com.yeejoin.amos.fas.business.action.result.SafteyPlanResult;
import com.yeejoin.amos.fas.business.action.result.message.AbstractActionResultMessage;
import com.yeejoin.amos.fas.business.action.util.ContingencyLogPublisher;
import com.yeejoin.amos.fas.business.bo.PlanDetailSyncBo;
import com.yeejoin.amos.fas.business.bo.PlanOperationRecordSyncBo;
import com.yeejoin.amos.fas.business.dao.mapper.PlanDetailMapper;
import com.yeejoin.amos.fas.business.dao.mapper.PlanOperationRecordMapper;
import com.yeejoin.amos.fas.business.dao.repository.IPlanDetailDao;
import com.yeejoin.amos.fas.business.dao.repository.IPlanOperationRecordDao;
import com.yeejoin.amos.fas.business.feign.IDutyModeServer;
import com.yeejoin.amos.fas.business.feign.RemoteSecurityService;
import com.yeejoin.amos.fas.business.service.impl.RuleRunigSnapshotServiceImpl;
import com.yeejoin.amos.fas.business.service.intfc.IContingencyInstance;
import com.yeejoin.amos.fas.business.service.intfc.IEquipmentService;
import com.yeejoin.amos.fas.business.service.intfc.IRiskSourceService;
import com.yeejoin.amos.fas.business.service.intfc.IRocketMQService;
import com.yeejoin.amos.fas.business.service.intfc.*;
import com.yeejoin.amos.fas.business.service.model.ContingencyDeviceStatus;
import com.yeejoin.amos.fas.business.service.model.ToipResponse;
import com.yeejoin.amos.fas.business.util.CacheFactory;
......@@ -105,6 +106,18 @@ public class ContingencyAction implements CustomerAction {
@Autowired
private IContingencyInstance contingencyInstance;
@Value("${systemctl.sync.switch}")
private Boolean dataSyncSwitch;
@Autowired
private IDataSyncService dataSyncService;
@Autowired
private PlanOperationRecordMapper planOperationRecordMapper;
@Autowired
private PlanDetailMapper planDetailMapper;
@Value("${rocket-plan-topic}")
private String rocketTopic;
......@@ -746,12 +759,16 @@ public class ContingencyAction implements CustomerAction {
if (planOperationRecord != null && planOperationRecord.getStatus() == PlanRecordStatusEnum.OPERATION.getCode()) {
planOperationRecord.setStatus(PlanRecordStatusEnum.COMPLETE.getCode());
planOperationRecord.setEndTime(new Date());
planOperationRecordDao.save(planOperationRecord);
PlanOperationRecord record = planOperationRecordDao.save(planOperationRecord);
// 异步数据同步之消息发送
planOperationRecordDataSync(record);
Optional<PlanDetail> optionalPlanDetail = planDetailDao.findById(planOperationRecord.getPlanId());
if (optionalPlanDetail.get() != null) {
PlanDetail planDetail = optionalPlanDetail.get();
planDetail.setStatus(ContingencyPlanStatusEnum.AVAILABLE.getCode());
planDetailDao.save(planDetail);
PlanDetail detail = planDetailDao.save(planDetail);
// 异步数据同步之消息发送
planDetailDataSync(detail);
}
//数字预案业务屏web端发送消息
this.sendweb("recordarea", paramObj, result);
......@@ -759,6 +776,37 @@ public class ContingencyAction implements CustomerAction {
}
private void planDetailDataSync(PlanDetail detail) {
if (dataSyncSwitch) {
try {
dataSyncService.asyncInvoke(() -> {
Map<String, Object> map = new HashMap<>();
map.put("id", detail.getId());
List<PlanDetailSyncBo> planDetailSyncBoList = planDetailMapper.getPlanDetailSyncBoList(map);
dataSyncService.syncCreatedPlanDetailSyncBo(planDetailSyncBoList);
});
} catch (Exception e) {
log.info("数据同步之消息发送. [method='{}']", "stopPlan==>syncCreatedPlanDetailSyncBo", e);
}
}
}
private void planOperationRecordDataSync(PlanOperationRecord record) {
// 异步数据同步之消息发送
if (dataSyncSwitch) {
try {
dataSyncService.asyncInvoke(() -> {
Map<String, Object> map = new HashMap<>();
map.put("id", record.getId());
List<PlanOperationRecordSyncBo> planOperationRecordSyncBoList = planOperationRecordMapper.getPlanOperationRecordSyncBoList(map);
dataSyncService.syncCreatedPlanOperationRecordSyncBo(planOperationRecordSyncBoList);
});
} catch (Exception e) {
log.info("数据同步之消息发送. [method='{}']", "stopPlan==>syncCreatedPlanOperationRecordSyncBo", e);
}
}
}
@RuleMethod(methodLabel = "自动执行步骤", project = "换流站消防专项预案")
public void autoExecute(
......@@ -932,12 +980,16 @@ public class ContingencyAction implements CustomerAction {
if (planOperationRecord != null) {
planOperationRecord.setStatus(PlanRecordStatusEnum.INTERRUPT.getCode());
planOperationRecord.setEndTime(new Date());
planOperationRecordDao.save(planOperationRecord);
PlanOperationRecord record = planOperationRecordDao.save(planOperationRecord);
// 异步数据同步之消息发送
planOperationRecordDataSync(record);
Optional<PlanDetail> optionalPlanDetail = planDetailDao.findById(planOperationRecord.getPlanId());
if (optionalPlanDetail.get() != null) {
PlanDetail planDetail = optionalPlanDetail.get();
planDetail.setStatus(ContingencyPlanStatusEnum.AVAILABLE.getCode());
planDetailDao.save(planDetail);
PlanDetail detail = planDetailDao.save(planDetail);
// 异步数据同步之消息发送
planDetailDataSync(detail);
}
}
}
......
package com.yeejoin.amos.fas.business.bo;
import lombok.Data;
import java.util.Date;
/**
* 基本实体扩展类
*/
@Data
public class BasicEntityBo {
private long id;
private Date createDate;
}
package com.yeejoin.amos.fas.business.bo;
import lombok.Data;
import java.util.Date;
@Data
public abstract class BusinessEntityBo {
private Date createDate;
private String createUser;
private Date updateDate;
private String updateUser;
private Boolean isDelete;
}
package com.yeejoin.amos.fas.business.bo;
import lombok.Data;
@Data
public class ContingencyOriginalDataSyncBo extends BusinessEntityBo {
protected String id;
private String fireEquipmentName;//消防设备名称
private String fireEquipmentId;//消防设备id
private Integer layer;//显示图层
//当前探测器
private Integer fireEquipmentLayer;//当前探测器图层
private String fireEquipmentPosition;//消防设备位置
//重点设备信息
//负责人名称,手机号
private String equipmentId;//重点设备id
private String equipmentName;
private String equipmentPosition3d;
private String mobile; //负责人手机号
private String adminName;//负责人名称
//摄像头
private String cameraCodes;//摄像头编号
private String cameraIds;//摄像头id
private Integer fireCount; //火情数量
private String confirm;//是否确认火情,确认 CONFIRM,取消CANCEL,未操作 NONE
private String batchNo;
private String picture1;
private String picture2;
private String picture3;
private String picture4;
private String fireTruckRoute;
private boolean runstep; //是否已经执行流程
private String step;//当前步骤
private String stepState;//步骤的操作状态,由所有按钮的步骤状态拼接而成
}
package com.yeejoin.amos.fas.business.bo;
import lombok.Data;
import java.util.Date;
@Data
public class PlanDetailSyncBo extends BasicEntityBo {
/**
* 预案名称
*/
private String planName;
/**
* 预案编号
*/
private String code;
/**
* 预案分类id
*/
private Long classifyId;
/**
* 预案分类名称
*/
private String classifyName;
/**
* 适用范围
*/
private String planRange;
/**
* 编写部门
*/
private String editOrgName;
/**
* 版次
*/
private String edition;
/**
* 实施时间
*/
private Date implementationTime;
/**
* 备注
*/
private String remark;
/**
* 预案状态
*/
private Integer status;
/**
* 创建人
*/
private String creator;
/**
* 修改人
*/
private String reviser;
/**
* 修改时间
*/
private Date updateTime;
/**
* 部门code
*/
private String orgCode;
/**
* 删除状态(0、正常,1、删除)
*/
private Boolean isDelete;
/**
* 录入时间
*/
private Date inputTime;
}
\ No newline at end of file
package com.yeejoin.amos.fas.business.bo;
import lombok.Data;
import java.util.Date;
/**
* c_plan_operation_record
*
* @author
*/
@Data
public class PlanOperationRecordSyncBo extends BasicEntityBo {
/**
* 预案ID
*/
private Long planId;
/**
* 预案
*/
private String planName;
/**
* 运行模式(0、模拟,1、自动)
*/
private Integer planPattern;
/**
* 开始时间
*/
private Date startTime;
/**
* 结束时间
*/
private Date endTime;
/**
* 批次号
*/
private String batchNo;
/**
* 是否删除
*/
private Boolean isDelete;
/**
* 运行状态(0、运行中,1、完毕,3、中断)
*/
private Integer status;
/**
* 启动人名称
*/
private String startUserName;
/**
* 启动人id
*/
private String startUserId;
/**
* 装备code
*/
private String equipmentCode;
/**
* 执行方式(0、预案验证 1、火灾处置)
*/
private Integer executionType;
/**
* 装备名称
*/
private String equipmentName;
/**
* 装备id
*/
private Long equipmentId;
/**
* 电力装备id
*/
private Long fireEquipmentId;
/**
* 电力装备名称
*/
private String fireEquipmentName;
}
\ No newline at end of file
package com.yeejoin.amos.fas.business.dao.mapper;
import com.yeejoin.amos.fas.business.bo.ContingencyOriginalDataSyncBo;
import org.apache.ibatis.annotations.Param;
import java.util.List;
......@@ -21,4 +22,5 @@ public interface ContingencyOriginalMapper extends BaseMapper {
@Param("length") Integer length,
@Param("contingencyName")String contingencyName);
List<ContingencyOriginalDataSyncBo> getContingencyOriginalDataBoList(Map<String, Object> map);
}
package com.yeejoin.amos.fas.business.dao.mapper;
import com.yeejoin.amos.fas.business.bo.PlanDetailSyncBo;
import com.yeejoin.amos.fas.business.vo.PlanDetailVo;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
......@@ -7,6 +8,7 @@ import org.springframework.stereotype.Repository;
import javax.xml.crypto.Data;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* <h1><h1>
......@@ -52,4 +54,5 @@ public interface PlanDetailMapper {
Integer planReset(@Param("date") Date date);
List<PlanDetailSyncBo> getPlanDetailSyncBoList(Map<String, Object> map);
}
package com.yeejoin.amos.fas.business.dao.mapper;
import com.yeejoin.amos.fas.business.bo.PlanOperationRecordSyncBo;
import com.yeejoin.amos.fas.business.vo.PlanDetailVo;
import org.springframework.stereotype.Repository;
......@@ -20,4 +21,5 @@ public interface PlanOperationRecordMapper {
List<Map<String, Object>> filterList(Map<String, Object> params);
List<PlanOperationRecordSyncBo> getPlanOperationRecordSyncBoList(Map<String, Object> map);
}
package com.yeejoin.amos.fas.business.datasync;
import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.fas.common.enums.DataSyncOperationEnum;
import com.yeejoin.amos.fas.common.enums.DataSyncTypeEnum;
import lombok.Data;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* @ProjectName: YeeAMOSPatrolRoot
* @Package: com.yeejoin.amos.patrol.business.datasync
* @ClassName: SyncDataMessage
* @Author: Jianqiang Gao
* @Description: SyncDataMessage
* @Date: 2022/5/30 10:40
* @Version: 1.0
*/
@Data
public class DataSyncMessage implements Serializable {
private static final long serialVersionUID = 3950066933125606745L;
/**
* 唯一ID
*/
private String uid;
/**
* 同步的数据类型
*/
private DataSyncTypeEnum type;
/**
* 同步动作-增删改
*/
private DataSyncOperationEnum operation;
/**
* 发生时间
*/
private Long timestamp;
/**
* 同步数据列表
*/
private List<Serializable> data;
/**
* 获取对应的redis消息key
*
* @return
*/
public String redisKey() {
assert type != null;
return String.join("_", type.toString(), operation.toString(), uid);
}
/**
* Message对象转换成字节码
*
* @return
*/
public byte[] message2Bytes() {
return JSON.toJSONString(this).getBytes(StandardCharsets.UTF_8);
}
/**
* 字节码转换成Message对象
*
* @param messageBytes 字节码
* @return
*/
public static DataSyncMessage bytes2Message(byte[] messageBytes) {
return JSON.parseObject(new String(messageBytes, StandardCharsets.UTF_8), DataSyncMessage.class);
}
}
\ No newline at end of file
......@@ -6,15 +6,14 @@ import com.yeejoin.amos.fas.business.action.ContingencyAction;
import com.yeejoin.amos.fas.business.action.model.ContingencyEvent;
import com.yeejoin.amos.fas.business.action.model.ContingencyRo;
import com.yeejoin.amos.fas.business.action.util.ContingencyLogPublisher;
import com.yeejoin.amos.fas.business.bo.ContingencyOriginalDataSyncBo;
import com.yeejoin.amos.fas.business.bo.FirePlanAlarmBo;
import com.yeejoin.amos.fas.business.dao.mapper.FireEquipPointMapper;
import com.yeejoin.amos.fas.business.dao.mapper.ImpAndFireEquipMapper;
import com.yeejoin.amos.fas.business.dao.mapper.PlanDetailMapper;
import com.yeejoin.amos.fas.business.dao.mapper.View3dMapper;
import com.yeejoin.amos.fas.business.dao.mapper.*;
import com.yeejoin.amos.fas.business.dao.repository.IContingencyOriginalDataDao;
import com.yeejoin.amos.fas.business.dao.repository.IContingencyPlanInstanceRepository;
import com.yeejoin.amos.fas.business.feign.RemoteSecurityService;
import com.yeejoin.amos.fas.business.service.intfc.IContingencyInstance;
import com.yeejoin.amos.fas.business.service.intfc.IDataSyncService;
import com.yeejoin.amos.fas.business.service.intfc.IRocketMQService;
import com.yeejoin.amos.fas.business.service.model.Operate;
import com.yeejoin.amos.fas.business.service.model.OperateGroup;
......@@ -76,6 +75,15 @@ public class ContingencyInstanceImpl /*extends GenericManagerImpl<ContingencyPla
@Value("${station.name}")
private String stationName;
@Value("${systemctl.sync.switch}")
private Boolean dataSyncSwitch;
@Autowired
private IDataSyncService dataSyncService;
@Autowired
private ContingencyOriginalMapper contingencyOriginalMapper;
private static Map<String, String> stepMap = new HashMap<>();
......@@ -118,7 +126,22 @@ public class ContingencyInstanceImpl /*extends GenericManagerImpl<ContingencyPla
//计算序号
int count = repository.countByBatchNo(instanceNo);
planInstance.setSort(++count);
return this.repository.save(planInstance);
ContingencyPlanInstance instance = this.repository.save(planInstance);
// 异步数据同步之消息发送
contingencyPlanInstanceDataSync(instance);
return instance;
}
private void contingencyPlanInstanceDataSync(ContingencyPlanInstance instance) {
if (dataSyncSwitch) {
try {
dataSyncService.asyncInvoke(() -> {
dataSyncService.syncCreatedContingencyPlanInstance(instance);
});
} catch (Exception e) {
log.info("数据同步之消息发送. [method='{}']", "createInstanceRecord==>syncCreatedContingencyPlanInstance", e);
}
}
}
......@@ -163,12 +186,15 @@ public class ContingencyInstanceImpl /*extends GenericManagerImpl<ContingencyPla
if (Integer.parseInt(stepCode) > Integer.parseInt(contingencyOriginalData.getStep())) {
contingencyOriginalData.setStepState(stepStateOnbutton);
}
iContingencyOriginalDataDao.updateByButton(
int update = iContingencyOriginalDataDao.updateByButton(
contingencyOriginalData.getConfirm(),
contingencyOriginalData.getRunstep(),
contingencyOriginalData.getStepState(),
contingencyOriginalData.getBatchNo());
batchNo
);
// 异步数据同步之消息发送
contingencyOriginalDataDataSync(batchNo, update);
//使用原始数据触发规则
if ("CONFIRM".equals(buttonState)
......@@ -218,6 +244,21 @@ public class ContingencyInstanceImpl /*extends GenericManagerImpl<ContingencyPla
return Optional.ofNullable(equipment);
}
private void contingencyOriginalDataDataSync(String batchNo, int update) {
if (update > 0 && dataSyncSwitch) {
try {
dataSyncService.asyncInvoke(() -> {
Map<String, Object> map = new HashMap<>();
map.put("batchNo", batchNo);
List<ContingencyOriginalDataSyncBo> contingencyOriginalDataBoList = contingencyOriginalMapper.getContingencyOriginalDataBoList(map);
dataSyncService.syncCreatedContingencyOriginalDataSyncBo(contingencyOriginalDataBoList);
});
} catch (Exception e) {
log.info("数据同步之消息发送. [method='{}']", "fire==>syncCreatedContingencyOriginalDataSyncBo", e);
}
}
}
private void publisherPlanLog(String stepCode, String buttonCode, String batchNo) {
ContingencyEvent event = new ContingencyEvent(this);
JSONObject json = new JSONObject();
......@@ -274,11 +315,14 @@ public class ContingencyInstanceImpl /*extends GenericManagerImpl<ContingencyPla
}
if (buttonState.equals("CONFIRM"))
if (buttonState.equals("CONFIRM")) {
contingencyPlanInstance.setRunstate(true);
}
operateJson = objectMapper.writeValueAsString(operateGroup);
contingencyPlanInstance.setContent(operateJson);
repository.save(contingencyPlanInstance);
ContingencyPlanInstance instance = repository.save(contingencyPlanInstance);
// 异步数据同步之消息发送
contingencyPlanInstanceDataSync(instance);
}
}
......@@ -306,18 +350,23 @@ public class ContingencyInstanceImpl /*extends GenericManagerImpl<ContingencyPla
}
}
if (buttonState.equals("CONFIRM"))
if (buttonState.equals("CONFIRM")) {
contingencyPlanInstance.setRunstate(true);
}
operateJson = objectMapper.writeValueAsString(operateGroup);
contingencyPlanInstance.setContent(operateJson);
repository.save(contingencyPlanInstance);
ContingencyPlanInstance instance = repository.save(contingencyPlanInstance);
// 异步数据同步之消息发送
contingencyPlanInstanceDataSync(instance);
}
}
@Override
public void updateStep(String step, String batchNo) {
iContingencyOriginalDataDao.updateByButtonStep(step, batchNo);
int update = iContingencyOriginalDataDao.updateByButtonStep(step, batchNo);
// 异步数据同步之消息发送
contingencyOriginalDataDataSync(batchNo, update);
}
@Override
......
package com.yeejoin.amos.fas.business.service.impl;
import com.yeejoin.amos.fas.business.bo.ContingencyOriginalDataSyncBo;
import com.yeejoin.amos.fas.business.bo.PlanDetailSyncBo;
import com.yeejoin.amos.fas.business.bo.PlanOperationRecordSyncBo;
import com.yeejoin.amos.fas.business.datasync.DataSyncMessage;
import com.yeejoin.amos.fas.business.service.intfc.IDataSyncService;
import com.yeejoin.amos.fas.business.util.DataSyncUtil;
import com.yeejoin.amos.fas.common.enums.DataSyncOperationEnum;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* @ProjectName: YeeAMOSPatrolRoot
* @Package: com.yeejoin.amos.patrol.business.service.impl
* @ClassName: DataSyncServiceImpl
* @Author: Jianqiang Gao
* @Description: DataSyncServiceImpl
* @Date: 2022/5/30 10:48
* @Version: 1.0
*/
@Service
public class DataSyncServiceImpl implements IDataSyncService {
@Autowired
private EmqKeeper emqKeeper;
@Override
public void asyncInvoke(AsyncExec consumer) throws Exception {
consumer.exec();
}
@Override
public void syncCreatedPlanDetailSyncBo(PlanDetailSyncBo planDetailSyncBo) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(planDetailSyncBo, DataSyncOperationEnum.CREATE);
sendMessage(message);
}
@Override
public void syncCreatedPlanDetailSyncBo(Collection<PlanDetailSyncBo> list) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.CREATE);
sendMessage(message);
}
@Override
public void syncDeletedPlanDetailSyncBo(Collection<Long> ids) {
List<PlanDetailSyncBo> list = ids.stream().map(id -> {
PlanDetailSyncBo planDetailSyncBo = new PlanDetailSyncBo();
planDetailSyncBo.setId(id);
return planDetailSyncBo;
}).collect(Collectors.toList());
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.DELETE);
sendMessage(message);
}
@Override
public void syncDeletedPlanDetailSyncBo(Long id) {
PlanDetailSyncBo planDetailSyncBo = new PlanDetailSyncBo();
planDetailSyncBo.setId(id);
DataSyncMessage message = DataSyncUtil.buildSyncMessage(planDetailSyncBo, DataSyncOperationEnum.DELETE);
sendMessage(message);
}
@Override
public void syncUpdatedPlanDetailSyncBo(Collection<PlanDetailSyncBo> list) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.UPDATE);
sendMessage(message);
}
@Override
public void syncCreatedPlanOperationRecordSyncBo(PlanOperationRecordSyncBo planTaskDetailBo) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(planTaskDetailBo, DataSyncOperationEnum.CREATE);
sendMessage(message);
}
@Override
public void syncCreatedPlanOperationRecordSyncBo(Collection<PlanOperationRecordSyncBo> list) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.CREATE);
sendMessage(message);
}
@Override
public void syncDeletedPlanOperationRecordSyncBo(Collection<Long> ids) {
List<PlanOperationRecordSyncBo> list = ids.stream().map(id -> {
PlanOperationRecordSyncBo planOperationRecordSyncBo = new PlanOperationRecordSyncBo();
planOperationRecordSyncBo.setId(id);
return planOperationRecordSyncBo;
}).collect(Collectors.toList());
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.DELETE);
sendMessage(message);
}
@Override
public void syncDeletedPlanOperationRecordSyncBo(Long id) {
PlanOperationRecordSyncBo planOperationRecordSyncBo = new PlanOperationRecordSyncBo();
planOperationRecordSyncBo.setId(id);
DataSyncMessage message = DataSyncUtil.buildSyncMessage(planOperationRecordSyncBo, DataSyncOperationEnum.DELETE);
sendMessage(message);
}
@Override
public void syncUpdatedPlanOperationRecordSyncBo(Collection<PlanOperationRecordSyncBo> list) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.UPDATE);
sendMessage(message);
}
@Override
public void syncCreatedContingencyOriginalDataSyncBo(ContingencyOriginalDataSyncBo contingencyOriginalDataSyncBo) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(contingencyOriginalDataSyncBo, DataSyncOperationEnum.CREATE);
sendMessage(message);
}
@Override
public void syncCreatedContingencyOriginalDataSyncBo(Collection<ContingencyOriginalDataSyncBo> list) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.CREATE);
sendMessage(message);
}
@Override
public void syncDeletedContingencyOriginalDataSyncBo(Collection<String> ids) {
List<ContingencyOriginalDataSyncBo> list = ids.stream().map(id -> {
ContingencyOriginalDataSyncBo contingencyOriginalDataSyncBo = new ContingencyOriginalDataSyncBo();
contingencyOriginalDataSyncBo.setId(id);
return contingencyOriginalDataSyncBo;
}).collect(Collectors.toList());
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.DELETE);
sendMessage(message);
}
@Override
public void syncDeletedContingencyOriginalDataSyncBo(String id) {
ContingencyOriginalDataSyncBo checkBo = new ContingencyOriginalDataSyncBo();
checkBo.setId(id);
DataSyncMessage message = DataSyncUtil.buildSyncMessage(checkBo, DataSyncOperationEnum.DELETE);
sendMessage(message);
}
@Override
public void syncUpdatedContingencyOriginalDataSyncBo(Collection<ContingencyOriginalDataSyncBo> list) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.UPDATE);
sendMessage(message);
}
@Override
public void syncCreatedContingencyPlanInstance(ContingencyPlanInstance contingencyPlanInstance) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(contingencyPlanInstance, DataSyncOperationEnum.CREATE);
sendMessage(message);
}
@Override
public void syncCreatedContingencyPlanInstance(Collection<ContingencyPlanInstance> list) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.CREATE);
sendMessage(message);
}
@Override
public void syncDeletedContingencyPlanInstance(Collection<String> ids) {
List<ContingencyPlanInstance> list = ids.stream().map(id -> {
ContingencyPlanInstance contingencyPlanInstance = new ContingencyPlanInstance();
contingencyPlanInstance.setId(id);
return contingencyPlanInstance;
}).collect(Collectors.toList());
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.DELETE);
sendMessage(message);
}
@Override
public void syncDeletedContingencyPlanInstance(String id) {
ContingencyPlanInstance contingencyPlanInstance = new ContingencyPlanInstance();
contingencyPlanInstance.setId(id);
DataSyncMessage message = DataSyncUtil.buildSyncMessage(contingencyPlanInstance, DataSyncOperationEnum.DELETE);
sendMessage(message);
}
@Override
public void syncUpdatedContingencyPlanInstance(Collection<ContingencyPlanInstance> list) {
DataSyncMessage message = DataSyncUtil.buildSyncMessage(list, DataSyncOperationEnum.UPDATE);
sendMessage(message);
}
private void sendMessage(DataSyncMessage message) {
try {
emqKeeper.getMqttClient().publish(message.getType().getMqTopic(), message.message2Bytes(), 2, false);
} catch (MqttException e) {
e.printStackTrace();
}
//暂时屏蔽同步时存储redis
//redisTemplate.opsForValue().set(message.redisKey(), new String(message.message2Bytes()));
}
}
\ No newline at end of file
......@@ -8,6 +8,7 @@ import com.yeejoin.amos.component.rule.RuleTrigger;
import com.yeejoin.amos.fas.business.action.model.ContingencyRo;
import com.yeejoin.amos.fas.business.action.mq.WebMqttComponent;
import com.yeejoin.amos.fas.business.action.mq.WebMqttSubscribe;
import com.yeejoin.amos.fas.business.bo.ContingencyOriginalDataSyncBo;
import com.yeejoin.amos.fas.business.bo.SafetyExecuteBo;
import com.yeejoin.amos.fas.business.dao.mapper.*;
import com.yeejoin.amos.fas.business.dao.repository.IContingencyOriginalDataDao;
......@@ -16,6 +17,7 @@ import com.yeejoin.amos.fas.business.dao.repository.IPreplanPictureDao;
import com.yeejoin.amos.fas.business.feign.RemoteSecurityService;
import com.yeejoin.amos.fas.business.param.AlarmParam;
import com.yeejoin.amos.fas.business.service.intfc.IContingencyPlanService;
import com.yeejoin.amos.fas.business.service.intfc.IDataSyncService;
import com.yeejoin.amos.fas.business.service.intfc.IEquipmentHandlerService;
import com.yeejoin.amos.fas.business.service.intfc.IView3dService;
import com.yeejoin.amos.fas.business.util.JexlUtil;
......@@ -137,8 +139,14 @@ public class HandlerMqttMessageImpl implements IEquipmentHandlerService {
@Autowired
IContingencyPlanService iContingencyPlanService;
//@Autowired
//private RocketMQTemplate rocketMQTemplate;
@Value("${systemctl.sync.switch}")
private Boolean dataSyncSwitch;
@Autowired
private IDataSyncService dataSyncService;
@Autowired
private ContingencyOriginalMapper contingencyOriginalMapper;
@Value("${rocket-plan-topic}")
private String rocketTopic;
......@@ -563,6 +571,19 @@ public class HandlerMqttMessageImpl implements IEquipmentHandlerService {
ContingencyOriginalData contingencyOriginalData = new ContingencyOriginalData();
BeanUtils.copyProperties(contingencyRo, contingencyOriginalData);
iContingencyOriginalDataDao.save(contingencyOriginalData);
ContingencyOriginalData originalData = iContingencyOriginalDataDao.save(contingencyOriginalData);
// 异步数据同步之消息发送
if (dataSyncSwitch) {
try {
dataSyncService.asyncInvoke(() -> {
Map<String, Object> map = new HashMap<>();
map.put("id", originalData.getId());
List<ContingencyOriginalDataSyncBo> contingencyOriginalDataBoList = contingencyOriginalMapper.getContingencyOriginalDataBoList(map);
dataSyncService.syncCreatedContingencyOriginalDataSyncBo(contingencyOriginalDataBoList);
});
} catch (Exception e) {
log.info("数据同步之消息发送. [method='{}']", "alarmContingency==>syncCreatedContingencyOriginalDataSyncBo", e);
}
}
}
}
......@@ -112,7 +112,6 @@ public class RiskSourceServiceImpl implements IRiskSourceService {
@Autowired
private FireEquipMapper fireEquipMapper;
@Autowired
private ImpAndFireEquipMapper impAndFireEquipMapper;
......@@ -137,14 +136,14 @@ public class RiskSourceServiceImpl implements IRiskSourceService {
@Autowired
IContingencyOriginalDataDao iContingencyOriginalDataDao;
// @Autowired
// IFireEquipmentPointDao iFireEquipmentPointDao;
@Value("${systemctl.sync.switch}")
private Boolean dataSyncSwitch;
// @Autowired
// IFireEquipmentDataDao iFireEquipmentDataDao;
@Autowired
private IDataSyncService dataSyncService;
// @Autowired
// private RemoteWebSocketServer remoteWebSocketServer;
@Autowired
private ContingencyOriginalMapper contingencyOriginalMapper;
@Autowired
private IEquipmentService equipmentService;
......@@ -609,7 +608,20 @@ public class RiskSourceServiceImpl implements IRiskSourceService {
log.debug("规则调用返回==", result);
ContingencyOriginalData contingencyOriginalData = new ContingencyOriginalData();
BeanUtils.copyProperties(contingencyRo, contingencyOriginalData);
iContingencyOriginalDataDao.save(contingencyOriginalData);
ContingencyOriginalData originalData = iContingencyOriginalDataDao.save(contingencyOriginalData);
// 异步数据同步之消息发送
if ( dataSyncSwitch) {
try {
dataSyncService.asyncInvoke(() -> {
Map<String, Object> map = new HashMap<>();
map.put("id", originalData.getId());
List<ContingencyOriginalDataSyncBo> contingencyOriginalDataBoList = contingencyOriginalMapper.getContingencyOriginalDataBoList(map);
dataSyncService.syncCreatedContingencyOriginalDataSyncBo(contingencyOriginalDataBoList);
});
} catch (Exception e) {
log.info("数据同步之消息发送. [method='{}']", "alermContingency==>syncCreatedContingencyOriginalDataSyncBo", e);
}
}
}
}
......
package com.yeejoin.amos.fas.business.service.intfc;
import com.yeejoin.amos.fas.business.bo.ContingencyOriginalDataSyncBo;
import com.yeejoin.amos.fas.business.bo.PlanDetailSyncBo;
import com.yeejoin.amos.fas.business.bo.PlanOperationRecordSyncBo;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import java.util.Collection;
/**
* @ProjectName: YeeAMOSPatrolRoot
* @Package: com.yeejoin.amos.patrol.business.service.intfc
* @ClassName: IDataSyncService
* @Author: Jianqiang Gao
* @Description: IDataSyncService
* @Date: 2022/5/30 10:48
* @Version: 1.0
*/
public interface IDataSyncService {
void asyncInvoke(AsyncExec consumer) throws Exception;
@FunctionalInterface
interface AsyncExec {
void exec() throws Exception;
}
void syncCreatedPlanDetailSyncBo(PlanDetailSyncBo planOperationRecordSyncBo);
void syncCreatedPlanDetailSyncBo(Collection<PlanDetailSyncBo> list);
void syncDeletedPlanDetailSyncBo(Collection<Long> ids);
void syncDeletedPlanDetailSyncBo(Long id);
void syncUpdatedPlanDetailSyncBo(Collection<PlanDetailSyncBo> list);
void syncCreatedPlanOperationRecordSyncBo(PlanOperationRecordSyncBo planOperationRecordSyncBo);
void syncCreatedPlanOperationRecordSyncBo(Collection<PlanOperationRecordSyncBo> list);
void syncDeletedPlanOperationRecordSyncBo(Collection<Long> ids);
void syncDeletedPlanOperationRecordSyncBo(Long id);
void syncUpdatedPlanOperationRecordSyncBo(Collection<PlanOperationRecordSyncBo> list);
void syncCreatedContingencyOriginalDataSyncBo(ContingencyOriginalDataSyncBo contingencyOriginalDataSyncBo);
void syncCreatedContingencyOriginalDataSyncBo(Collection<ContingencyOriginalDataSyncBo> list);
void syncDeletedContingencyOriginalDataSyncBo(Collection<String> ids);
void syncDeletedContingencyOriginalDataSyncBo(String id);
void syncUpdatedContingencyOriginalDataSyncBo(Collection<ContingencyOriginalDataSyncBo> list);
void syncCreatedContingencyPlanInstance(ContingencyPlanInstance contingencyPlanInstance);
void syncCreatedContingencyPlanInstance(Collection<ContingencyPlanInstance> list);
void syncDeletedContingencyPlanInstance(Collection<String> ids);
void syncDeletedContingencyPlanInstance(String id);
void syncUpdatedContingencyPlanInstance(Collection<ContingencyPlanInstance> list);
}
package com.yeejoin.amos.fas.business.util;
import com.yeejoin.amos.fas.business.bo.ContingencyOriginalDataSyncBo;
import com.yeejoin.amos.fas.business.bo.PlanDetailSyncBo;
import com.yeejoin.amos.fas.business.bo.PlanOperationRecordSyncBo;
import com.yeejoin.amos.fas.business.datasync.DataSyncMessage;
import com.yeejoin.amos.fas.common.enums.DataSyncOperationEnum;
import com.yeejoin.amos.fas.common.enums.DataSyncTypeEnum;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import org.springframework.beans.BeanUtils;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
/**
* <h1>实体转换为同步消息对象</h1>
*
* @Author SingleTian
* @Date 2021-04-01 11:17
*/
public class DataSyncUtil {
public static <T> DataSyncMessage buildSyncMessage(T data, DataSyncOperationEnum operation) {
return buildSyncMessage(Arrays.asList(data), operation);
}
public static <T> DataSyncMessage buildSyncMessage(T data, DataSyncOperationEnum operation, String sign) {
return buildSyncMessage(Arrays.asList(data), operation, sign);
}
public static <T> DataSyncMessage buildSyncMessage(Collection<T> list, DataSyncOperationEnum operation) {
T t = list.stream().findFirst().orElse(null);
DataSyncTypeEnum type = getDataType(t, "");
List<Serializable> data = transData(list, type);
DataSyncMessage message = new DataSyncMessage();
message.setUid(UUID.randomUUID().toString());
message.setTimestamp(System.currentTimeMillis());
message.setOperation(operation);
message.setType(type);
message.setData(data);
return message;
}
public static <T> DataSyncMessage buildSyncMessage(Collection<T> list, DataSyncOperationEnum operation, String sign) {
T t = list.stream().findFirst().orElse(null);
DataSyncTypeEnum type = getDataType(t, sign);
List<Serializable> data = transData(list, type);
DataSyncMessage message = new DataSyncMessage();
message.setUid(UUID.randomUUID().toString());
message.setTimestamp(System.currentTimeMillis());
message.setOperation(operation);
message.setType(type);
message.setData(data);
return message;
}
private static <T> DataSyncTypeEnum getDataType(T t, String sign) {
if (t instanceof PlanDetailSyncBo) {
return DataSyncTypeEnum.CONTINGENCY_PLAN_DETAIL;
} else if (t instanceof PlanOperationRecordSyncBo) {
return DataSyncTypeEnum.CONTINGENCY_PLAN_OPERATION_RECORD;
} else if (t instanceof ContingencyOriginalDataSyncBo) {
return DataSyncTypeEnum.CONTINGENCY_ORIGINAL_DATA;
} else if (t instanceof ContingencyPlanInstance) {
return DataSyncTypeEnum.CONTINGENCY_PLAN_INSTANCE;
} else {
throw new RuntimeException("无法识别的类型");
}
}
private static List<Serializable> transData(Collection list, DataSyncTypeEnum type) {
switch (type) {
case CONTINGENCY_PLAN_DETAIL: {
return planDetailSyncBoData(list);
}
case CONTINGENCY_PLAN_OPERATION_RECORD: {
return planOperationRecordSyncBoData(list);
}
case CONTINGENCY_ORIGINAL_DATA: {
return contingencyOriginalDataSyncBoData(list);
}
case CONTINGENCY_PLAN_INSTANCE: {
return contingencyPlanInstanceData(list);
}
default:
return new ArrayList<>();
}
}
private static List<PlanDetailSyncBo> planDetailSyncBoData(Collection<PlanDetailSyncBo> list) {
return list.stream().map(i -> {
PlanDetailSyncBo planTaskBo = new PlanDetailSyncBo();
BeanUtils.copyProperties(i, planTaskBo);
return planTaskBo;
}
).collect(Collectors.toList());
}
private static List<PlanOperationRecordSyncBo> planOperationRecordSyncBoData(Collection<PlanOperationRecordSyncBo> list) {
return list.stream().map(i -> {
PlanOperationRecordSyncBo planTaskDetailBo = new PlanOperationRecordSyncBo();
BeanUtils.copyProperties(i, planTaskDetailBo);
return planTaskDetailBo;
}
).collect(Collectors.toList());
}
private static List<ContingencyOriginalDataSyncBo> contingencyOriginalDataSyncBoData(Collection<ContingencyOriginalDataSyncBo> list) {
return list.stream().map(i -> {
ContingencyOriginalDataSyncBo checkBo = new ContingencyOriginalDataSyncBo();
BeanUtils.copyProperties(i, checkBo);
return checkBo;
}
).collect(Collectors.toList());
}
private static List<ContingencyPlanInstance> contingencyPlanInstanceData(Collection<ContingencyPlanInstance> list) {
return list.stream().map(i -> {
ContingencyPlanInstance checkInputSyncBo = new ContingencyPlanInstance();
BeanUtils.copyProperties(i, checkInputSyncBo);
return checkInputSyncBo;
}
).collect(Collectors.toList());
}
}
......@@ -77,3 +77,6 @@ rocket-equip-alarm-topic =topic_fire_equip_alarm
#3Dtype 分为web和ue
integrated3Dtype =web
#数据同步开关
systemctl.sync.switch=true
......@@ -139,4 +139,26 @@
UPDATE c_plan_operation_record SET `status`=1;
UPDATE c_plan_operation_record SET `end_time`=#{date} ORDER BY id DESC LIMIT 1;
</update>
<select id="getPlanDetailSyncBoList" resultType="com.yeejoin.amos.fas.business.bo.PlanDetailSyncBo" parameterType="java.util.Map">
SELECT
pd.*,
ct.classify_name
FROM
c_plan_detail pd
LEFT JOIN c_plan_classify_tree ct ON pd.classify_id = ct.id
<where>
<if test="idList != null and idList.size() >0">
AND pd.id IN
<foreach collection="idList" item="item" index="index" open="(" close=")" separator=",">
#{item}
</foreach>
</if>
<if test="status != null and status !=''">
AND pd.`status` = #{status}
</if>
<if test="id != null">
AND pd.id = #{id}
</if>
</where>
</select>
</mapper>
\ No newline at end of file
......@@ -88,4 +88,47 @@
ORDER BY cpor.start_time DESC
LIMIT #{start}, #{size}
</select>
<select id="getPlanOperationRecordSyncBoList" resultType="com.yeejoin.amos.fas.business.bo.PlanOperationRecordSyncBo" parameterType="java.util.Map">
SELECT
por.id,
por.plan_id,
CASE por.plan_pattern
WHEN 4 THEN '模拟'
WHEN 5 THEN '自动'
ELSE '未知' END AS plan_pattern,
CASE por.`status`
WHEN 1 THEN '运行中'
WHEN 2 THEN '完成'
WHEN 3 THEN '中断'
ELSE '未知' END AS `status`,
CASE por.execution_type
WHEN 0 THEN '预案验证'
WHEN 1 THEN '火灾处置'
ELSE '未知' END AS execution_type,
por.start_time,
por.end_time,
por.create_date,
por.batch_no,
por.is_delete,
por.start_user_name,
por.start_user_id,
por.equipment_code,
por.equipment_name,
por.equipment_id,
por.fire_equipment_id,
pd.plan_name,
e.`name` AS fireEquipmentName
FROM
c_plan_operation_record por
LEFT JOIN c_plan_detail pd ON por.plan_id = pd.id
LEFT JOIN f_equipment e ON por.fire_equipment_id = e.id
<where>
<if test="id != null">
por.`id` = #{id}
</if>
<if test="status != null and status != ''">
AND por.`status` = #{status}
</if>
</where>
</select>
</mapper>
\ No newline at end of file
......@@ -58,7 +58,20 @@
OR m.equipment_Name like concat('%',#{contingencyName},'%')
</if>
</select>
<select id="getContingencyOriginalDataBoList" resultType="com.yeejoin.amos.fas.business.bo.ContingencyOriginalDataSyncBo" parameterType="java.util.Map">
SELECT
*
FROM
contingency_original_data d
<where>
<if test="id != null">
d.id = #{id}
</if>
<if test="batchNo != null and batchNo != ''">
AND d.batch_No = #{batchNo}
</if>
</where>
</select>
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment