Commit 25285773 authored by chenzhao's avatar chenzhao

Merge branch 'developer' of http://39.98.45.134:8090/moa/amos-boot-biz into developer

parents ad8e7b93 375ff847
......@@ -4,11 +4,9 @@ import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizFanWarningRecord;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.IdxBizPvWarningRecord;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.*;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.FanWarningRecordServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.HealthStatusIndicatorServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.PvWarningRecordServiceImpl;
import com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2.FanWaringRecordMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.tdMapper2.PvWaringRecordMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.FanWarningRecord;
......@@ -16,14 +14,14 @@ import com.yeejoin.amos.boot.module.jxiop.biz.tdengine.PvWarningRecord;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Component
......@@ -38,16 +36,6 @@ public class WarningRecordStatusMessage extends EmqxListener {
public static final String WARNING_CHANGE_MESSAGE = "+/warning/change";
private static final BlockingQueue<JSONArray> blockingQueueFan = new LinkedBlockingQueue<JSONArray>();
private static final BlockingQueue<JSONArray> blockingQueuePv = new LinkedBlockingQueue<JSONArray>();
@Autowired
private IdxBizFanWarningRecordServiceImpl idxBizFanWarningRecordService;
@Autowired
private IdxBizPvWarningRecordServiceImpl idxBizPvWarningRecordService;
@Autowired
private FanWaringRecordMapper fanWaringRecordMapper;
......@@ -62,7 +50,6 @@ public class WarningRecordStatusMessage extends EmqxListener {
@PostConstruct
void init() throws Exception {
new Thread(taskRunnable).start();
emqKeeper.subscript(WARNING_CHANGE_MESSAGE, 2, this);
}
......@@ -71,35 +58,16 @@ public class WarningRecordStatusMessage extends EmqxListener {
if (topic.contains(HealthStatusIndicatorServiceImpl.SMART_ANALYSE_PV)) {
log.info("预警状态改变消息-光伏{}", new String(message.getPayload()));
JSONArray ja = JSON.parseArray(new String(message.getPayload()));
blockingQueuePv.add(ja);
jxIopUpdatePv(ja);
} else if (topic.contains(HealthStatusIndicatorServiceImpl.SMART_ANALYSE_FAN)) {
log.info("预警状态改变消息-风电{}", new String(message.getPayload()));
JSONArray ja = JSON.parseArray(new String(message.getPayload()));
blockingQueueFan.add(ja);
jxIopUpdateFan(ja);
}
}
Runnable taskRunnable = new Runnable() {
@Override
public void run() {
boolean isRun = true;
int k = 0;
while (isRun) {
k++;
isRun = k < Integer.MAX_VALUE;
try {
JSONArray analysisResultFan = blockingQueueFan.take();
jxIopUpdateFan(analysisResultFan);
JSONArray analysisResultPv = blockingQueuePv.take();
jxIopUpdatePv(analysisResultPv);
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
@Async("async")
public void jxIopUpdateFan(JSONArray analysisResult) {
log.info("修改预警状态信息:{}", analysisResult);
List<JSONObject> taskList = JSONObject.parseArray(analysisResult.toJSONString(), JSONObject.class);
......@@ -124,6 +92,7 @@ public class WarningRecordStatusMessage extends EmqxListener {
}
@Async("async")
public void jxIopUpdatePv(JSONArray analysisResult) {
log.info("修改预警状态信息:{}", analysisResult);
List<JSONObject> taskList = JSONObject.parseArray(analysisResult.toJSONString(), JSONObject.class);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment