Commit 8593722a authored by xixinzhao's avatar xixinzhao

kafka接消息逻辑中处理稳压泵

parent a9ef77d7
...@@ -622,6 +622,16 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -622,6 +622,16 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override @Override
public void afterCommit() { public void afterCommit() {
iotDatalist.forEach(iotDataVO -> {
String indexKey = iotDataVO.getKey();
String indexValue = iotDataVO.getValue().toString();
// 稳压泵启停信号处理
if (indexKey.equals(pressurePumpStart)) {
pressurePump(indexKey, indexValue, iotDatalist, topicEntity);
}
});
equipmentSpecificAlarms.forEach(action -> { equipmentSpecificAlarms.forEach(action -> {
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) { if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
alarmLogs.add(addEquipAlarmLogRecord(action)); alarmLogs.add(addEquipAlarmLogRecord(action));
...@@ -900,6 +910,16 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -900,6 +910,16 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override @Override
public void afterCommit() { public void afterCommit() {
iotDatalist.forEach(iotDataVO -> {
String indexKey = iotDataVO.getKey();
String indexValue = iotDataVO.getValue().toString();
// 稳压泵启停信号处理
if (indexKey.equals(pressurePumpStart)) {
pressurePump(indexKey, indexValue, iotDatalist, topicEntity);
}
});
equipmentSpecificAlarms.forEach(action -> { equipmentSpecificAlarms.forEach(action -> {
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) { if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
alarmLogs.add(addEquipAlarmLogRecord(action)); alarmLogs.add(addEquipAlarmLogRecord(action));
...@@ -2460,6 +2480,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -2460,6 +2480,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// } // }
private void pressurePump(String indexKey, String indexValue, List<IotDataVO> iotDatalist, TopicEntityVo topicEntity) { private void pressurePump(String indexKey, String indexValue, List<IotDataVO> iotDatalist, TopicEntityVo topicEntity) {
log.info("开始处理稳压泵逻辑:{}值:{}", indexKey, indexValue);
List<String> listIndex = new ArrayList<>(); List<String> listIndex = new ArrayList<>();
listIndex.add(pressurePumpStart); listIndex.add(pressurePumpStart);
// 获取全部启停泵信号 // 获取全部启停泵信号
...@@ -2487,6 +2508,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -2487,6 +2508,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
PressurePumpValueEnum valueEnum = PressurePumpValueEnum.getByCode(pressurePumpEnum.getCompareValue()); PressurePumpValueEnum valueEnum = PressurePumpValueEnum.getByCode(pressurePumpEnum.getCompareValue());
assert valueEnum != null; assert valueEnum != null;
EquipmentSpecificIndex data = getPressurePumpDateByType(indexKey, valueEnum, topicEntity, equipmentSpeIndexList, pressurePumpEnum); EquipmentSpecificIndex data = getPressurePumpDateByType(indexKey, valueEnum, topicEntity, equipmentSpeIndexList, pressurePumpEnum);
log.info("稳压泵获取{}, 值为{}", valueEnum.getDescribe(), data);
Date newDate = new Date(); Date newDate = new Date();
// 2. 校验 // 2. 校验
if (!ObjectUtils.isEmpty(data.getUpdateDate())) { if (!ObjectUtils.isEmpty(data.getUpdateDate())) {
...@@ -2513,7 +2535,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -2513,7 +2535,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
} else { } else {
throw new BadRequest("装备物联编码错误,请确认!"); throw new BadRequest("装备物联编码错误,请确认!");
} }
switch (valueEnum) { switch (valueEnum) {
case LAST_STOP: case LAST_STOP:
List<EquipmentSpecificIndex> lastStop = equipmentSpeIndexList.stream().filter(e -> List<EquipmentSpecificIndex> lastStop = equipmentSpeIndexList.stream().filter(e ->
...@@ -2613,6 +2634,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -2613,6 +2634,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
assert pumpCheckEnum != null; assert pumpCheckEnum != null;
String leftValue = pressurePumpEnum.getLeftValue(); String leftValue = pressurePumpEnum.getLeftValue();
String rightValue = pressurePumpEnum.getRightValue(); String rightValue = pressurePumpEnum.getRightValue();
log.info("检验方式:{},大于:{},小于:{}, 间隔:{}", pumpCheckEnum.getDescribe(), leftValue, rightValue, diff);
switch (pumpCheckEnum) { switch (pumpCheckEnum) {
case LE: case LE:
if (StringUtil.isNotEmpty(rightValue)) { if (StringUtil.isNotEmpty(rightValue)) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment