Commit 91e7cab3 authored by zhangsen's avatar zhangsen

bug修改

parent 883405a2
package com.yeejoin.amos.boot.module.jxiop.biz.emqx;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanWarningRecord;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizPvWarningRecord;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.HealthStatusIndicatorServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizFanWarningRecordServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizPvWarningRecordServiceImpl;
import lombok.extern.slf4j.Slf4j;
......@@ -16,9 +18,10 @@ import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
@Component
@Slf4j
......@@ -26,11 +29,15 @@ public class WarningRecordStatusMessage extends EmqxListener {
@Autowired
protected EmqKeeper emqKeeper;
/**
* 预警状态修改消息 - 标准化
*/
public static final String WARNING_CHANGE_MESSAGE = "+/warning/change";
// 江西电建接收红黄绿码主题
private static final String QUESTION_STATUS_CHANGE = "question/status/change";
private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<JSONObject>();
private static final BlockingQueue<JSONArray> blockingQueueFan = new LinkedBlockingQueue<JSONArray>();
private static final BlockingQueue<JSONArray> blockingQueuePv = new LinkedBlockingQueue<JSONArray>();
@Autowired
private IdxBizFanWarningRecordServiceImpl idxBizFanWarningRecordService;
......@@ -41,14 +48,21 @@ public class WarningRecordStatusMessage extends EmqxListener {
@PostConstruct
void init() throws Exception {
new Thread(taskRunnable).start();
emqKeeper.subscript(QUESTION_STATUS_CHANGE, 2, this);
emqKeeper.subscript(WARNING_CHANGE_MESSAGE, 2, this);
}
@Override
public void processMessage(String topic, MqttMessage message) throws Exception {
log.info("人员赋码消息{}", new String(message.getPayload()));
JSONObject ja = JSON.parseObject(new String(message.getPayload()));
blockingQueue.add(ja);
if (topic.contains(HealthStatusIndicatorServiceImpl.SMART_ANALYSE_PV)) {
log.info("预警状态改变消息-光伏{}", new String(message.getPayload()));
JSONArray ja = JSON.parseArray(new String(message.getPayload()));
blockingQueuePv.add(ja);
} else if (topic.contains(HealthStatusIndicatorServiceImpl.SMART_ANALYSE_FAN)) {
log.info("预警状态改变消息-风电{}", new String(message.getPayload()));
JSONArray ja = JSON.parseArray(new String(message.getPayload()));
blockingQueueFan.add(ja);
}
}
Runnable taskRunnable = new Runnable() {
......@@ -60,8 +74,10 @@ public class WarningRecordStatusMessage extends EmqxListener {
k++;
isRun = k < Integer.MAX_VALUE;
try {
JSONObject analysisResult = blockingQueue.take();
jxIopUpdate(analysisResult);
JSONArray analysisResultFan = blockingQueueFan.take();
jxIopUpdateFan(analysisResultFan);
JSONArray analysisResultPv = blockingQueuePv.take();
jxIopUpdatePv(analysisResultPv);
} catch (Exception e) {
e.printStackTrace();
}
......@@ -69,22 +85,28 @@ public class WarningRecordStatusMessage extends EmqxListener {
}
};
public void jxIopUpdate(JSONObject analysisResult) {
public void jxIopUpdateFan(JSONArray analysisResult) {
log.info("修改预警状态信息:{}", analysisResult);
if (ObjectUtils.isNotEmpty(analysisResult) && analysisResult.get("warningObjectType").toString().equals("fan")) {
List<JSONObject> taskList = JSONObject.parseArray(analysisResult.toJSONString(), JSONObject.class);
List<String> traceIds = taskList.stream().map(t -> t.get("traceId").toString()).collect(Collectors.toList());
LambdaUpdateWrapper<IdxBizFanWarningRecord> lambda = new LambdaUpdateWrapper<>();
lambda.set(IdxBizFanWarningRecord::getDisposotionState, "已处置");
lambda.set(IdxBizFanWarningRecord::getStatus, "1");
lambda.set(IdxBizFanWarningRecord::getDisposotionDate, new Date());
lambda.eq(IdxBizFanWarningRecord::getSequenceNbr, analysisResult.get("objectId"));
lambda.in(IdxBizFanWarningRecord::getSequenceNbr, traceIds);
idxBizFanWarningRecordService.update(lambda);
} else if (ObjectUtils.isNotEmpty(analysisResult) && analysisResult.get("warningObjectType").toString().equals("pv")) {
}
public void jxIopUpdatePv(JSONArray analysisResult) {
log.info("修改预警状态信息:{}", analysisResult);
List<JSONObject> taskList = JSONObject.parseArray(analysisResult.toJSONString(), JSONObject.class);
List<String> traceIds = taskList.stream().map(t -> t.get("traceId").toString()).collect(Collectors.toList());
LambdaUpdateWrapper<IdxBizPvWarningRecord> lambda = new LambdaUpdateWrapper<>();
lambda.set(IdxBizPvWarningRecord::getDisposotionState, "已处置");
lambda.set(IdxBizPvWarningRecord::getStatus, "1");
lambda.set(IdxBizPvWarningRecord::getDisposotionDate, new Date());
lambda.eq(IdxBizPvWarningRecord::getSequenceNbr, analysisResult.get("objectId"));
lambda.in(IdxBizPvWarningRecord::getSequenceNbr, traceIds);
idxBizPvWarningRecordService.update(lambda);
}
}
}
package com.yeejoin.amos.boot.module.jxiop.biz.emqx;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanWarningRecord;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizPvWarningRecord;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizFanWarningRecordServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizPvWarningRecordServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 处理无需处置情况
*/
@Component
@Slf4j
public class WarningRecordStatusMessage2 extends EmqxListener {
@Autowired
protected EmqKeeper emqKeeper;
/**
* 无需处置
*/
public static final String NOT_DISPOSE_AMOS = "not/dispose/amos";
private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<JSONObject>();
@Autowired
private IdxBizFanWarningRecordServiceImpl idxBizFanWarningRecordService;
@Autowired
private IdxBizPvWarningRecordServiceImpl idxBizPvWarningRecordService;
@PostConstruct
void init() throws Exception {
new Thread(taskRunnable).start();
emqKeeper.subscript(NOT_DISPOSE_AMOS, 2, this);
}
@Override
public void processMessage(String topic, MqttMessage message) throws Exception {
log.info("修改预警状态消息{}", new String(message.getPayload()));
JSONObject ja = JSON.parseObject(new String(message.getPayload()));
blockingQueue.add(ja);
}
Runnable taskRunnable = new Runnable() {
@Override
public void run() {
boolean isRun = true;
int k = 0;
while (isRun) {
k++;
isRun = k < Integer.MAX_VALUE;
try {
JSONObject analysisResult = blockingQueue.take();
jxIopUpdate(analysisResult);
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
public void jxIopUpdate(JSONObject analysisResult) {
log.info("修改预警状态信息:{}", analysisResult);
if (ObjectUtils.isNotEmpty(analysisResult) && analysisResult.get("warningObjectType").toString().equals("fan")) {
LambdaUpdateWrapper<IdxBizFanWarningRecord> lambda = new LambdaUpdateWrapper<>();
lambda.set(IdxBizFanWarningRecord::getDisposotionState, "已处置");
lambda.set(IdxBizFanWarningRecord::getStatus, "1");
lambda.set(IdxBizFanWarningRecord::getDisposotionDate, new Date());
List<String> traceIds = (List<String>) analysisResult.get("traceIds");
lambda.in(IdxBizFanWarningRecord::getSequenceNbr, traceIds);
idxBizFanWarningRecordService.update(lambda);
} else if (ObjectUtils.isNotEmpty(analysisResult) && analysisResult.get("warningObjectType").toString().equals("pv")) {
LambdaUpdateWrapper<IdxBizPvWarningRecord> lambda = new LambdaUpdateWrapper<>();
lambda.set(IdxBizPvWarningRecord::getDisposotionState, "已处置");
lambda.set(IdxBizPvWarningRecord::getStatus, "1");
lambda.set(IdxBizPvWarningRecord::getDisposotionDate, new Date());
List<String> traceIds = (List<String>) analysisResult.get("traceIds");
lambda.in(IdxBizPvWarningRecord::getSequenceNbr, traceIds);
idxBizPvWarningRecordService.update(lambda);
}
}
}
//package com.yeejoin.amos.boot.module.jxiop.biz.emqx;
//
//import com.alibaba.fastjson.JSON;
//import com.alibaba.fastjson.JSONObject;
//import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
//import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
//import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanWarningRecord;
//import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizPvWarningRecord;
//import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizFanWarningRecordServiceImpl;
//import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.IdxBizPvWarningRecordServiceImpl;
//import lombok.extern.slf4j.Slf4j;
//import org.eclipse.paho.client.mqttv3.MqttMessage;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//import org.typroject.tyboot.component.emq.EmqKeeper;
//import org.typroject.tyboot.component.emq.EmqxListener;
//
//import javax.annotation.PostConstruct;
//import java.util.Date;
//import java.util.List;
//import java.util.concurrent.BlockingQueue;
//import java.util.concurrent.LinkedBlockingQueue;
//
///**
// * 处理无需处置情况
// */
//@Component
//@Slf4j
//public class WarningRecordStatusMessage2 extends EmqxListener {
//
// @Autowired
// protected EmqKeeper emqKeeper;
//
// /**
// * 无需处置
// */
// public static final String NOT_DISPOSE_AMOS = "not/dispose/amos";
//
// private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<JSONObject>();
//
// @Autowired
// private IdxBizFanWarningRecordServiceImpl idxBizFanWarningRecordService;
//
// @Autowired
// private IdxBizPvWarningRecordServiceImpl idxBizPvWarningRecordService;
//
// @PostConstruct
// void init() throws Exception {
// new Thread(taskRunnable).start();
// emqKeeper.subscript(NOT_DISPOSE_AMOS, 2, this);
// }
//
// @Override
// public void processMessage(String topic, MqttMessage message) throws Exception {
// log.info("修改预警状态消息{}", new String(message.getPayload()));
// JSONObject ja = JSON.parseObject(new String(message.getPayload()));
// blockingQueue.add(ja);
// }
//
// Runnable taskRunnable = new Runnable() {
// @Override
// public void run() {
// boolean isRun = true;
// int k = 0;
// while (isRun) {
// k++;
// isRun = k < Integer.MAX_VALUE;
// try {
// JSONObject analysisResult = blockingQueue.take();
// jxIopUpdate(analysisResult);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
// }
// };
//
// public void jxIopUpdate(JSONObject analysisResult) {
// log.info("修改预警状态信息:{}", analysisResult);
// if (ObjectUtils.isNotEmpty(analysisResult) && analysisResult.get("warningObjectType").toString().equals("fan")) {
// LambdaUpdateWrapper<IdxBizFanWarningRecord> lambda = new LambdaUpdateWrapper<>();
// lambda.set(IdxBizFanWarningRecord::getDisposotionState, "已处置");
// lambda.set(IdxBizFanWarningRecord::getStatus, "1");
// lambda.set(IdxBizFanWarningRecord::getDisposotionDate, new Date());
// List<String> traceIds = (List<String>) analysisResult.get("traceIds");
// lambda.in(IdxBizFanWarningRecord::getSequenceNbr, traceIds);
// idxBizFanWarningRecordService.update(lambda);
// } else if (ObjectUtils.isNotEmpty(analysisResult) && analysisResult.get("warningObjectType").toString().equals("pv")) {
// LambdaUpdateWrapper<IdxBizPvWarningRecord> lambda = new LambdaUpdateWrapper<>();
// lambda.set(IdxBizPvWarningRecord::getDisposotionState, "已处置");
// lambda.set(IdxBizPvWarningRecord::getStatus, "1");
// lambda.set(IdxBizPvWarningRecord::getDisposotionDate, new Date());
// List<String> traceIds = (List<String>) analysisResult.get("traceIds");
// lambda.in(IdxBizPvWarningRecord::getSequenceNbr, traceIds);
// idxBizPvWarningRecordService.update(lambda);
// }
// }
//}
......@@ -162,4 +162,9 @@ public class IdxBizFanHealthIndex {
@TableField("ANALYSIS_TIME")
private String ANALYSISTIME;
/**
* KKS码
*/
@TableField("KKS")
private String kks;
}
......@@ -145,4 +145,10 @@ public class IdxBizFanWarningRecord{
@TableField("DISPOSOTION_DATE")
private Date disposotionDate;
/**
* KKS码
*/
@TableField("KKS")
private String kks;
}
......@@ -172,4 +172,10 @@ public class IdxBizPvHealthIndex{
@TableField("ANALYSIS_TIME")
private String ANALYSISTIME;
/**
* KKS码
*/
@TableField("KKS")
private String kks;
}
......@@ -146,4 +146,10 @@ public class IdxBizPvWarningRecord{
@TableField("DISPOSOTION_DATE")
private Date disposotionDate;
/**
* KKS码
*/
@TableField("KKS")
private String kks;
}
......@@ -95,9 +95,14 @@ public class HealthStatusIndicatorServiceImpl {
public static final String INDEX_KEY_PV = "THFX#FXGF#ZNFX";
/**
* 智能分析触发预警系统标识
* 智能分析触发预警系统标识 - 光伏
*/
public static final String SMART_ANALYSE = "smartAnalyse";
public static final String SMART_ANALYSE_PV = "smartAnalysePv";
/**
* 智能分析触发预警系统标识 - 风电
*/
public static final String SMART_ANALYSE_FAN = "smartAnalyseFan";
/***
......@@ -210,6 +215,7 @@ public class HealthStatusIndicatorServiceImpl {
if (!level.equals("") && flag == 0){
IdxBizPvWarningRecord idxBizPvWarningRecord = new IdxBizPvWarningRecord();
idxBizPvWarningRecord.setKks(idxBizPvHealthIndices.get(0).getKks());
idxBizPvWarningRecord.setRecord(idxBizPvHealthIndices.get(0).getRecord());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(0).getArae());
idxBizPvWarningRecord.setStation(idxBizPvHealthIndices.get(0).getStation());
......@@ -353,6 +359,7 @@ public class HealthStatusIndicatorServiceImpl {
if (!level.equals("") && flag == 0){
IdxBizPvWarningRecord idxBizPvWarningRecord = new IdxBizPvWarningRecord();
idxBizPvWarningRecord.setKks(idxBizPvHealthIndices.get(0).getKks());
idxBizPvWarningRecord.setRecord(idxBizPvHealthIndices.get(0).getRecord());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(0).getArae());
idxBizPvWarningRecord.setStation(idxBizPvHealthIndices.get(0).getStation());
......@@ -490,6 +497,7 @@ public class HealthStatusIndicatorServiceImpl {
if (!level.equals("") && flag == 0){
IdxBizPvWarningRecord idxBizPvWarningRecord = new IdxBizPvWarningRecord();
idxBizPvWarningRecord.setKks(idxBizPvHealthIndices.get(0).getKks());
idxBizPvWarningRecord.setRecord(idxBizPvHealthIndices.get(0).getRecord());
idxBizPvWarningRecord.setArae(idxBizPvHealthIndices.get(0).getArae());
idxBizPvWarningRecord.setStation(idxBizPvHealthIndices.get(0).getStation());
......@@ -629,6 +637,7 @@ public class HealthStatusIndicatorServiceImpl {
if (!level.equals("") && flag == 0){
IdxBizFanWarningRecord idxBizFanWarningRecord = new IdxBizFanWarningRecord();
idxBizFanWarningRecord.setKks(idxBizFanHealthIndices.get(0).getKks());
idxBizFanWarningRecord.setRecord(idxBizFanHealthIndices.get(0).getRecord());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(0).getArae());
idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(0).getStation());
......@@ -767,6 +776,7 @@ public class HealthStatusIndicatorServiceImpl {
if (!level.equals("") && flag == 0){
IdxBizFanWarningRecord idxBizFanWarningRecord = new IdxBizFanWarningRecord();
idxBizFanWarningRecord.setKks(idxBizFanHealthIndices.get(0).getKks());
idxBizFanWarningRecord.setRecord(idxBizFanHealthIndices.get(0).getRecord());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(0).getArae());
idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(0).getStation());
......@@ -907,6 +917,7 @@ public class HealthStatusIndicatorServiceImpl {
int flag = ObjectUtils.isEmpty(idxBizFanWarningRecords) || WarningNameEnum.getCode(level) > WarningNameEnum.getCode(idxBizFanWarningRecords.get(0).getWarningName()) ? 0 :1;
if (!level.equals("") && flag == 0){
IdxBizFanWarningRecord idxBizFanWarningRecord = new IdxBizFanWarningRecord();
idxBizFanWarningRecord.setKks(idxBizFanHealthIndices.get(0).getKks());
idxBizFanWarningRecord.setRecord(idxBizFanHealthIndices.get(0).getRecord());
idxBizFanWarningRecord.setArae(idxBizFanHealthIndices.get(0).getArae());
idxBizFanWarningRecord.setStation(idxBizFanHealthIndices.get(0).getStation());
......@@ -950,10 +961,10 @@ public class HealthStatusIndicatorServiceImpl {
bizMessage.setTraceId(idxBizPvWarningRecord.getSequenceNbr());
RiskBizInfoVo riskBizInfoVo = new RiskBizInfoVo();
riskBizInfoVo.setWarningObjectName(idxBizPvWarningRecord.getEquipmentName());
riskBizInfoVo.setWarningObjectCode(idxBizPvWarningRecord.getSequenceNbr());
riskBizInfoVo.setWarningObjectCode(idxBizPvWarningRecord.getKks());
riskBizInfoVo.setSourceAttribution(stationMap.get(idxBizPvWarningRecord.getGatewayId()).getProjectOrgCode());
riskBizInfoVo.setSourceAttributionDesc(idxBizPvWarningRecord.getStation());
riskBizInfoVo.setWarningObjectType("pv");
riskBizInfoVo.setWarningObjectType("equip");
List<RiskDynamicDetailsVo> detailsVos = new ArrayList<>();
RiskDynamicDetailsVo dynamicDetailsVo = new RiskDynamicDetailsVo();
dynamicDetailsVo.setTabName("预警详情");
......@@ -961,7 +972,7 @@ public class HealthStatusIndicatorServiceImpl {
riskBizInfoVo.setDynamicDetails(detailsVos);
bizMessage.setBizInfo(riskBizInfoVo);
try {
emqKeeper.getMqttClient().publish(SMART_ANALYSE + "/data/analysis", JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false);
emqKeeper.getMqttClient().publish(SMART_ANALYSE_PV + "/data/analysis", JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false);
} catch (MqttException e) {
e.printStackTrace();
}
......@@ -984,10 +995,10 @@ public class HealthStatusIndicatorServiceImpl {
bizMessage.setTraceId(idxBizFanWarningRecord.getSequenceNbr());
RiskBizInfoVo riskBizInfoVo = new RiskBizInfoVo();
riskBizInfoVo.setWarningObjectName(idxBizFanWarningRecord.getEquipmentName());
riskBizInfoVo.setWarningObjectCode(idxBizFanWarningRecord.getSequenceNbr());
riskBizInfoVo.setWarningObjectCode(idxBizFanWarningRecord.getKks());
riskBizInfoVo.setSourceAttribution(stationMap.get(idxBizFanWarningRecord.getGatewayId()).getProjectOrgCode());
riskBizInfoVo.setSourceAttributionDesc(idxBizFanWarningRecord.getStation());
riskBizInfoVo.setWarningObjectType("fan");
riskBizInfoVo.setWarningObjectType("equip");
List<RiskDynamicDetailsVo> detailsVos = new ArrayList<>();
RiskDynamicDetailsVo dynamicDetailsVo = new RiskDynamicDetailsVo();
dynamicDetailsVo.setTabName("预警详情");
......@@ -995,7 +1006,7 @@ public class HealthStatusIndicatorServiceImpl {
riskBizInfoVo.setDynamicDetails(detailsVos);
bizMessage.setBizInfo(riskBizInfoVo);
try {
emqKeeper.getMqttClient().publish(SMART_ANALYSE + "/data/analysis", JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false);
emqKeeper.getMqttClient().publish(SMART_ANALYSE_FAN + "/data/analysis", JSON.toJSONString(bizMessage).getBytes(StandardCharsets.UTF_8), 2, false);
} catch (MqttException e) {
e.printStackTrace();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment