Commit 3ad8fdb8 authored by caotao's avatar caotao

解决分析多条预警预警系统中一条预警问题处理。

parent c7c2e34f
...@@ -4,6 +4,7 @@ import cn.hutool.core.date.DateUtil; ...@@ -4,6 +4,7 @@ import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.FanWarningRecordServiceImpl; import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.FanWarningRecordServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.HealthStatusIndicatorServiceImpl; import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.HealthStatusIndicatorServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.PvWarningRecordServiceImpl; import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.PvWarningRecordServiceImpl;
...@@ -28,25 +29,21 @@ import java.util.stream.Collectors; ...@@ -28,25 +29,21 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
public class WarningRecordStatusMessage extends EmqxListener { public class WarningRecordStatusMessage extends EmqxListener {
@Autowired
protected EmqKeeper emqKeeper;
/** /**
* 预警状态修改消息 - 标准化 * 预警状态修改消息 - 标准化
*/ */
public static final String WARNING_CHANGE_MESSAGE = "+/warning/change"; // public static final String WARNING_CHANGE_MESSAGE = "+/warning/change";
public static final String WARNING_CHANGE_MESSAGE = "warningchangetest";
@Autowired
private FanWaringRecordMapper fanWaringRecordMapper;
@Autowired @Autowired
private PvWaringRecordMapper pvWaringRecordMapper; protected EmqKeeper emqKeeper;
@Autowired @Autowired
FanWarningRecordServiceImpl fanWarningRecordService; FanWarningRecordServiceImpl fanWarningRecordService;
@Autowired @Autowired
PvWarningRecordServiceImpl pvWarningRecordService; PvWarningRecordServiceImpl pvWarningRecordService;
@Autowired
private FanWaringRecordMapper fanWaringRecordMapper;
@Autowired
private PvWaringRecordMapper pvWaringRecordMapper;
@PostConstruct @PostConstruct
void init() throws Exception { void init() throws Exception {
...@@ -71,8 +68,29 @@ public class WarningRecordStatusMessage extends EmqxListener { ...@@ -71,8 +68,29 @@ public class WarningRecordStatusMessage extends EmqxListener {
public void jxIopUpdateFan(JSONArray analysisResult) { public void jxIopUpdateFan(JSONArray analysisResult) {
log.info("修改预警状态信息:{}", analysisResult); log.info("修改预警状态信息:{}", analysisResult);
List<JSONObject> taskList = JSONObject.parseArray(analysisResult.toJSONString(), JSONObject.class); List<JSONObject> taskList = JSONObject.parseArray(analysisResult.toJSONString(), JSONObject.class);
taskList = taskList.stream().filter(t-> "2".equals(t.getString("processingStatus"))).collect(Collectors.toList()); taskList = taskList.stream().filter(t -> "2".equals(t.getString("processingStatus"))).collect(Collectors.toList());
List<String> traceIds = taskList.stream().map(t -> t.get("extAttr1").toString()).collect(Collectors.toList()); taskList.stream().forEach(jsonObject -> {
String[] params = jsonObject.getString("extAttr1").split("@");
QueryWrapper<FanWarningRecord> queryWrapper =new QueryWrapper<>();
queryWrapper.eq("station",jsonObject.getString("sourceAttributionDesc"));
queryWrapper.eq("equipment_name",jsonObject.getString("objectName"));
queryWrapper.eq("point_name",jsonObject.getString("objectIndexValue"));
queryWrapper.eq("kks",jsonObject.getString("objectId"));
if(params.length==3){
queryWrapper.eq("sub_system",params[0]);
queryWrapper.eq("health_level",params[1]);
queryWrapper.eq("warning_period",params[2]);
}
List<FanWarningRecord> fanWarningRecords = fanWaringRecordMapper.selectList(queryWrapper);
fanWarningRecords.stream().forEach(fanWarningRecord -> {
fanWarningRecord.setDisposotionDate(DateUtil.now());
fanWarningRecord.setStatus("1");
fanWarningRecord.setDisposotionState("已确认");
fanWaringRecordMapper.insert(fanWarningRecord);
});
});
// List<String> traceIds = taskList.stream().map(t -> t.get("extAttr1").toString()).collect(Collectors.toList());
// LambdaUpdateWrapper<IdxBizFanWarningRecord> lambda = new LambdaUpdateWrapper<>(); // LambdaUpdateWrapper<IdxBizFanWarningRecord> lambda = new LambdaUpdateWrapper<>();
// lambda.set(IdxBizFanWarningRecord::getDisposotionState, "已确认"); // lambda.set(IdxBizFanWarningRecord::getDisposotionState, "已确认");
// lambda.set(IdxBizFanWarningRecord::getStatus, "1"); // lambda.set(IdxBizFanWarningRecord::getStatus, "1");
...@@ -80,19 +98,19 @@ public class WarningRecordStatusMessage extends EmqxListener { ...@@ -80,19 +98,19 @@ public class WarningRecordStatusMessage extends EmqxListener {
// lambda.in(IdxBizFanWarningRecord::getSequenceNbr, traceIds); // lambda.in(IdxBizFanWarningRecord::getSequenceNbr, traceIds);
// idxBizFanWarningRecordService.update(lambda); // idxBizFanWarningRecordService.update(lambda);
List<FanWarningRecord> list = new ArrayList<>(); // List<FanWarningRecord> list = new ArrayList<>();
for (String traceId : traceIds) { // for (String traceId : traceIds) {
FanWarningRecord fanWarningRecord = new FanWarningRecord(); // FanWarningRecord fanWarningRecord = new FanWarningRecord();
fanWarningRecord.setTs(Long.valueOf(traceId)); // fanWarningRecord.setTs(Long.valueOf(traceId));
fanWarningRecord.setDisposotionDate(DateUtil.now()); // fanWarningRecord.setDisposotionDate(DateUtil.now());
fanWarningRecord.setStatus("1"); // fanWarningRecord.setStatus("1");
fanWarningRecord.setDisposotionState("已确认"); // fanWarningRecord.setDisposotionState("已确认");
list.add(fanWarningRecord); // list.add(fanWarningRecord);
} // }
if(list.size()>0){ // if(list.size()>0){
log.info("XXXXXXXXXXXX风电修改预警状态XXXXXXXXXXXXXXXXXX",JSON.toJSONString(list)); // log.info("XXXXXXXXXXXX风电修改预警状态XXXXXXXXXXXXXXXXXX",JSON.toJSONString(list));
fanWaringRecordMapper.updateStatusByTs(list); // fanWaringRecordMapper.updateStatusByTs(list);
} // }
} }
...@@ -100,27 +118,48 @@ public class WarningRecordStatusMessage extends EmqxListener { ...@@ -100,27 +118,48 @@ public class WarningRecordStatusMessage extends EmqxListener {
public void jxIopUpdatePv(JSONArray analysisResult) { public void jxIopUpdatePv(JSONArray analysisResult) {
log.info("修改预警状态信息:{}", analysisResult); log.info("修改预警状态信息:{}", analysisResult);
List<JSONObject> taskList = JSONObject.parseArray(analysisResult.toJSONString(), JSONObject.class); List<JSONObject> taskList = JSONObject.parseArray(analysisResult.toJSONString(), JSONObject.class);
taskList = taskList.stream().filter(t-> "2".equals(t.getString("processingStatus"))).collect(Collectors.toList()); taskList = taskList.stream().filter(t -> "2".equals(t.getString("processingStatus"))).collect(Collectors.toList());
List<String> traceIds = taskList.stream().map(t -> t.get("extAttr1").toString()).collect(Collectors.toList());
// LambdaUpdateWrapper<IdxBizPvWarningRecord> lambda = new LambdaUpdateWrapper<>(); taskList.stream().forEach(jsonObject -> {
// lambda.set(IdxBizPvWarningRecord::getDisposotionState, "已处置"); String[] params = jsonObject.getString("extAttr1").split("@");
// lambda.set(IdxBizPvWarningRecord::getStatus, "1"); QueryWrapper<PvWarningRecord> queryWrapper =new QueryWrapper<>();
// lambda.set(IdxBizPvWarningRecord::getDisposotionDate, new Date()); queryWrapper.eq("station",jsonObject.getString("sourceAttributionDesc"));
// lambda.in(IdxBizPvWarningRecord::getSequenceNbr, traceIds); queryWrapper.eq("equipment_name",jsonObject.getString("objectName"));
// idxBizPvWarningRecordService.update(lambda); queryWrapper.eq("point_name",jsonObject.getString("objectIndexValue"));
// td queryWrapper.eq("kks",jsonObject.getString("objectId"));
List<PvWarningRecord> list = new ArrayList<>(); if(params.length==3){
for (String traceId : traceIds) { queryWrapper.eq("subarray",params[0]);
PvWarningRecord pvWarningRecord = new PvWarningRecord(); queryWrapper.eq("health_level",params[1]);
pvWarningRecord.setTs(Long.valueOf(traceId)); queryWrapper.eq("warning_period",params[2]);
pvWarningRecord.setDisposotionDate(DateUtil.now()); }
pvWarningRecord.setStatus("1"); List<PvWarningRecord> pvWarningRecords = pvWaringRecordMapper.selectList(queryWrapper);
pvWarningRecord.setDisposotionState("已确认"); pvWarningRecords.stream().forEach(pvWarningRecord -> {
list.add(pvWarningRecord); pvWarningRecord.setDisposotionDate(DateUtil.now());
} pvWarningRecord.setStatus("1");
if(list.size()>0){ pvWarningRecord.setDisposotionState("已确认");
log.info("XXXXXXXXXXXX光伏修改预警状态XXXXXXXXXXXXXXXXXX",JSON.toJSONString(list)); pvWaringRecordMapper.insert(pvWarningRecord);
pvWaringRecordMapper.updateStatusByTs(list); });
} });
// List<String> traceIds = taskList.stream().map(t -> t.get("extAttr1").toString()).collect(Collectors.toList());
//// LambdaUpdateWrapper<IdxBizPvWarningRecord> lambda = new LambdaUpdateWrapper<>();
//// lambda.set(IdxBizPvWarningRecord::getDisposotionState, "已处置");
//// lambda.set(IdxBizPvWarningRecord::getStatus, "1");
//// lambda.set(IdxBizPvWarningRecord::getDisposotionDate, new Date());
//// lambda.in(IdxBizPvWarningRecord::getSequenceNbr, traceIds);
//// idxBizPvWarningRecordService.update(lambda);
// // td
// List<PvWarningRecord> list = new ArrayList<>();
// for (String traceId : traceIds) {
// PvWarningRecord pvWarningRecord = new PvWarningRecord();
// pvWarningRecord.setTs(Long.valueOf(traceId));
// pvWarningRecord.setDisposotionDate(DateUtil.now());
// pvWarningRecord.setStatus("1");
// pvWarningRecord.setDisposotionState("已确认");
// list.add(pvWarningRecord);
// }
// if (list.size() > 0) {
// log.info("XXXXXXXXXXXX光伏修改预警状态XXXXXXXXXXXXXXXXXX", JSON.toJSONString(list));
// pvWaringRecordMapper.updateStatusByTs(list);
// }
} }
} }
...@@ -1543,14 +1543,12 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1543,14 +1543,12 @@ public class HealthStatusIndicatorServiceImpl {
tabContent.add(tableContentVo10); tabContent.add(tableContentVo10);
tabContent.add(tableContentVo11); tabContent.add(tableContentVo11);
dynamicDetailsVo.setTabContent(tabContent); dynamicDetailsVo.setTabContent(tabContent);
detailsVos.add(dynamicDetailsVo); detailsVos.add(dynamicDetailsVo);
riskBizInfoVo.setDynamicDetails(detailsVos); riskBizInfoVo.setDynamicDetails(detailsVos);
bizMessage.setBizInfo(riskBizInfoVo); bizMessage.setBizInfo(riskBizInfoVo);
bizMessage.setTraceId2(idxBizPvWarningRecord.getTs().toString()); // 子系统@健康指数@预警周期
String traceId2 = idxBizPvWarningRecord.getSubarray()+"@"+idxBizPvWarningRecord.getHealthLevel()+"@"+idxBizPvWarningRecord.getWarningPeriod();
bizMessage.setTraceId2(traceId2);
try { try {
emqKeeper.getMqttClient().publish(SMART_ANALYSE_PV + "/data/analysis", JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false); emqKeeper.getMqttClient().publish(SMART_ANALYSE_PV + "/data/analysis", JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false);
} catch (MqttException e) { } catch (MqttException e) {
...@@ -1610,7 +1608,9 @@ public class HealthStatusIndicatorServiceImpl { ...@@ -1610,7 +1608,9 @@ public class HealthStatusIndicatorServiceImpl {
detailsVos.add(dynamicDetailsVo); detailsVos.add(dynamicDetailsVo);
riskBizInfoVo.setDynamicDetails(detailsVos); riskBizInfoVo.setDynamicDetails(detailsVos);
bizMessage.setBizInfo(riskBizInfoVo); bizMessage.setBizInfo(riskBizInfoVo);
bizMessage.setTraceId2(idxBizFanWarningRecord.getTs().toString()); // 子系统@健康指数@预警周期
String traceId2 = idxBizFanWarningRecord.getSubSystem()+"@"+idxBizFanWarningRecord.getHealthLevel()+"@"+idxBizFanWarningRecord.getWarningPeriod();
bizMessage.setTraceId2(traceId2);
try { try {
emqKeeper.getMqttClient().publish(SMART_ANALYSE_FAN + "/data/analysis", JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false); emqKeeper.getMqttClient().publish(SMART_ANALYSE_FAN + "/data/analysis", JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false);
} catch (MqttException e) { } catch (MqttException e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment