Commit 409c5167 authored by KeYong's avatar KeYong

Merge branch 'develop_dl_3.7.0.9_huaian' of…

Merge branch 'develop_dl_3.7.0.9_huaian' of http://39.98.45.134:8090/moa/amos-boot-biz into develop_dl_3.7.0.9_huaian
parents 8e991fd8 68873b35
...@@ -52,7 +52,11 @@ public class RedisKey { ...@@ -52,7 +52,11 @@ public class RedisKey {
* 装备指标Key值 * 装备指标Key值
*/ */
public static final String EQUIP_INDEX_ADDRESS_KEY = "equip_index_address_key"; public static final String EQUIP_INDEX_ADDRESS_KEY = "equip_index_address_key";
/**
* 韶山换流站指标Key
*/
public static final String EQUIP_INDEX_ADDRESS_KEY_STATION = "equip_index_address_key_station";
/** 驼峰转下划线(简单写法,效率低于 ) */ /** 驼峰转下划线(简单写法,效率低于 ) */
public static String humpToLine(String str) { public static String humpToLine(String str) {
return str.replaceAll("[A-Z]", "_$0").toLowerCase(); return str.replaceAll("[A-Z]", "_$0").toLowerCase();
......
...@@ -72,6 +72,9 @@ public class EquipmentIndexVO { ...@@ -72,6 +72,9 @@ public class EquipmentIndexVO {
@ApiModelProperty(value = "信号的索引键key,用于唯一索引信号") @ApiModelProperty(value = "信号的索引键key,用于唯一索引信号")
private String indexAddress; private String indexAddress;
@ApiModelProperty(value = "信号的索引键pointId,用于唯一索引信号")
private String eventAddress;
@ApiModelProperty(value = "测点类型,analog/state") @ApiModelProperty(value = "测点类型,analog/state")
private String dataType; private String dataType;
......
package com.yeejoin.equipmanage.common.enums;
/**
* @author LiuLin
* @date 2023年08月02日 11:02
*/
public interface MqttConstant {
String TRUE = "true";
String FALSE = "false";
String STATE = "state";
String DIS_CREATE = "discreate";
String ONE_1 = "1";
String ONE_1_0 = "1.0";
}
package com.yeejoin.equipmanage.common.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author LiuLin
* @date 2023年08月02日 11:02
*/
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class SShanMessage {
private String eventTextL1;
private String pointId;
private String time;
private String deviceId;
}
package com.yeejoin.equipmanage.common.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.List;
/**
* @author LiuLin
* @date 2023年08月02日 11:02
*/
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class SShanStationMessage {
private String timestamp;
private List<SShanMessage> warns;
}
package com.yeejoin.equipmanage.common.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* 对接苏州,绍兴换流站Kafka数据
* @author LiuLin
* @date 2023年08月02日 11:02
*/
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class StationMessage {
private String dataType;
private String value;
private String timeStamp;
private String quality;
private String scadaId;
private String key;
private String disCreate;
private String name;
}
...@@ -33,6 +33,7 @@ public class EquipmentIndexCacheRunner implements CommandLineRunner { ...@@ -33,6 +33,7 @@ public class EquipmentIndexCacheRunner implements CommandLineRunner {
log.info(">>服务启动执行,执行预加载数据等操作"); log.info(">>服务启动执行,执行预加载数据等操作");
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS); redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS);
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS_KEY); redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS_KEY);
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS_KEY_STATION);
List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null); List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null);
Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream() Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream()
.filter(v -> v.getGatewayId() != null) .filter(v -> v.getGatewayId() != null)
...@@ -40,8 +41,12 @@ public class EquipmentIndexCacheRunner implements CommandLineRunner { ...@@ -40,8 +41,12 @@ public class EquipmentIndexCacheRunner implements CommandLineRunner {
Map<String, Object> equipmentIndexKeyMap = equipSpecificIndexList.stream() Map<String, Object> equipmentIndexKeyMap = equipSpecificIndexList.stream()
.filter(v -> v.getIndexAddress() != null && v.getGatewayId() == null) .filter(v -> v.getIndexAddress() != null && v.getGatewayId() == null)
.collect(Collectors.toMap(EquipmentIndexVO::getIndexAddress, Function.identity(),(v1, v2) -> v1)); .collect(Collectors.toMap(EquipmentIndexVO::getIndexAddress, Function.identity(),(v1, v2) -> v1));
Map<String, Object> equipmentIndexEventMap = equipSpecificIndexList.stream()
.filter(v -> v.getEventAddress() != null)
.collect(Collectors.toMap(EquipmentIndexVO::getEventAddress, Function.identity(),(v1, v2) -> v1));
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap); redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap);
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS_KEY, equipmentIndexKeyMap); redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS_KEY, equipmentIndexKeyMap);
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS_KEY_STATION, equipmentIndexEventMap);
} }
} }
\ No newline at end of file
...@@ -126,6 +126,7 @@ public class EquipmentIotMqttReceiveConfig { ...@@ -126,6 +126,7 @@ public class EquipmentIotMqttReceiveConfig {
list.add("+/+/event"); // 添加iot事件监听 list.add("+/+/event"); // 添加iot事件监听
list.add("+/+/transmit"); // 添加交换站事件监听 list.add("+/+/transmit"); // 添加交换站事件监听
list.add("+/+/perspective"); // 添加交换站事件监听 list.add("+/+/perspective"); // 添加交换站事件监听
list.add("+/+/shaoshan"); // 添加换流站韶山监听事件
String[] arr = list.toArray(new String[list.size()]); String[] arr = list.toArray(new String[list.size()]);
adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttPahoClientFactory(), arr); adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttPahoClientFactory(), arr);
adapter.setCompletionTimeout(completionTimeout); adapter.setCompletionTimeout(completionTimeout);
...@@ -151,8 +152,8 @@ public class EquipmentIotMqttReceiveConfig { ...@@ -151,8 +152,8 @@ public class EquipmentIotMqttReceiveConfig {
mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg); mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg);
}else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)){ }else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttRomaMessage(topic,msg); mqttReceiveService.handlerMqttRomaMessage(topic,msg);
}else if (dataType.equals("perspective") && StringUtil.isNotEmpty(msg)){ }else if (dataType.equals("shaoshan") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttIotMessage(topic,msg); mqttReceiveService.handlerMqttStationMessage(topic,msg);
} }
} }
}; };
......
...@@ -114,5 +114,5 @@ public interface EquipmentSpecificIndexMapper extends BaseMapper<EquipmentSpecif ...@@ -114,5 +114,5 @@ public interface EquipmentSpecificIndexMapper extends BaseMapper<EquipmentSpecif
List<EquipmentSpecificIndex> getEquipIndexInIndex(@Param("list") List<String> listIndex); List<EquipmentSpecificIndex> getEquipIndexInIndex(@Param("list") List<String> listIndex);
EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress,String gatewayId); EquipmentSpecificIndex getEquipmentSpeIndexByAddress(String indexAddress, String eventAddress, String gatewayId);
} }
...@@ -40,5 +40,5 @@ public interface IEquipmentSpecificIndexService extends IService<EquipmentSpecif ...@@ -40,5 +40,5 @@ public interface IEquipmentSpecificIndexService extends IService<EquipmentSpecif
* @param indexAddress indexAddress * @param indexAddress indexAddress
* @return EquipmentSpecificIndex * @return EquipmentSpecificIndex
*/ */
EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress,String gatewayId); EquipmentSpecificIndex getEquipmentSpeIndexByAddress(String indexAddress, String eventAddress, String gatewayId);
} }
...@@ -30,5 +30,5 @@ public interface MqttReceiveService { ...@@ -30,5 +30,5 @@ public interface MqttReceiveService {
* @param topic 主题 * @param topic 主题
* @param message 消息内容 * @param message 消息内容
*/ */
void handlerMqttIotMessage(String topic, String message); void handlerMqttStationMessage(String topic, String message);
} }
...@@ -30,7 +30,7 @@ public class EquipmentSpecificIndexServiceImpl extends ServiceImpl<EquipmentSpec ...@@ -30,7 +30,7 @@ public class EquipmentSpecificIndexServiceImpl extends ServiceImpl<EquipmentSpec
} }
@Override @Override
public EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress,String gatewayId) { public EquipmentSpecificIndex getEquipmentSpeIndexByAddress(String indexAddress,String eventAddress, String gatewayId) {
return this.baseMapper.getEquipmentSpeIndexByIndexAddress(indexAddress,gatewayId); return this.baseMapper.getEquipmentSpeIndexByAddress(indexAddress, eventAddress, gatewayId);
} }
} }
...@@ -522,7 +522,7 @@ ...@@ -522,7 +522,7 @@
</select> </select>
<!-- 根据信号索引查询装备性能指标 --> <!-- 根据信号索引查询装备性能指标 -->
<select id="getEquipmentSpeIndexByIndexAddress" <select id="getEquipmentSpeIndexByAddress"
resultType="com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex"> resultType="com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex">
SELECT wesi.id AS id, SELECT wesi.id AS id,
wei.name_key AS nameKey, wei.name_key AS nameKey,
...@@ -561,10 +561,16 @@ ...@@ -561,10 +561,16 @@
LEFT JOIN wl_equipment_specific AS wes ON wes.id = wesi.equipment_specific_id LEFT JOIN wl_equipment_specific AS wes ON wes.id = wesi.equipment_specific_id
LEFT JOIN wl_equipment_detail ed ON ed.id = wes.equipment_detail_id LEFT JOIN wl_equipment_detail ed ON ed.id = wes.equipment_detail_id
LEFT JOIN wl_equipment_index AS wei ON wei.id = wesi.equipment_index_id LEFT JOIN wl_equipment_index AS wei ON wei.id = wesi.equipment_index_id
WHERE <where>
wesi.index_address = #{indexAddress} <if test="indexAddress != null">
<if test="gatewayId != null"> AND wesi.index_address = #{indexAddress}
AND wesi.gateway_id = #{gatewayId} </if>
</if> <if test="eventAddress != null">
AND wesi.event_address = #{eventAddress}
</if>
<if test="gatewayId != null">
AND wesi.gateway_id = #{gatewayId}
</if>
</where>
</select> </select>
</mapper> </mapper>
\ No newline at end of file
...@@ -474,21 +474,22 @@ ...@@ -474,21 +474,22 @@
DROP PROCEDURE IF EXISTS `updatePlanTask`; DROP PROCEDURE IF EXISTS `updatePlanTask`;
</createProcedure> </createProcedure>
</changeSet> </changeSet>
<changeSet author="gaodongdong" id="1610421278000-21-230419" runAlways="true"> <changeSet author="gaodongdong" id="1610421278000-21-230915" runAlways="true">
<createProcedure procedureName="updatePlanTask" > <createProcedure procedureName="updatePlanTask" >
CREATE PROCEDURE `updatePlanTask`(IN `planTaskId` BIGINT,IN `pointId` BIGINT,IN `planTaskDetailId` BIGINT,IN `executorId` BIGINT) CREATE DEFINER=`root`@`%` PROCEDURE `updatePlanTask`(IN `planTaskId` BIGINT,IN `pointId` BIGINT,IN `planTaskDetailId` BIGINT,IN `executorId` BIGINT)
BEGIN BEGIN
declare num int ; declare num int ;
declare orgCode VARCHAR(50) ; declare orgCode VARCHAR(50) ;
declare currentTaskNum int ; declare currentTaskNum int ;
declare pointNum int ;
update p_plan_task_detail set is_finish = 1 where id=planTaskDetailId; update p_plan_task_detail set is_finish = 1 where id=planTaskDetailId;
select finish_num, org_code from p_plan_task where id = planTaskId into num, orgCode; select finish_num, org_code, point_num from p_plan_task where id = planTaskId into num, orgCode, pointNum;
select count(1) into currentTaskNum from p_plan_task_detail where task_no = planTaskId and is_finish in(0,2); select count(1) into currentTaskNum from p_plan_task_detail where task_no = planTaskId and is_finish in(0,2);
UPDATE p_plan_task_detail SET executor_id = executorId,executor_date = current_timestamp where id = planTaskDetailId; UPDATE p_plan_task_detail SET executor_id = executorId,executor_date = current_timestamp where id = planTaskDetailId;
if currentTaskNum > 0 THEN if currentTaskNum > 0 THEN
update p_plan_task set finish_num = (num + 1) where id = planTaskId; update p_plan_task set finish_num = (num + 1) where id = planTaskId and pointNum > num;
ELSE ELSE
update p_plan_task set finish_num = (num + 1), finish_status = 2 where id = planTaskId; update p_plan_task set finish_num = (num + 1), finish_status = 2 where id = planTaskId and pointNum > num;
end if; end if;
if executorId > 0 then if executorId > 0 then
call planTaskStatistics(executorId, null, orgCode); call planTaskStatistics(executorId, null, orgCode);
......
...@@ -9,8 +9,6 @@ import org.springframework.kafka.annotation.KafkaListener; ...@@ -9,8 +9,6 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Optional; import java.util.Optional;
...@@ -23,34 +21,36 @@ import java.util.Optional; ...@@ -23,34 +21,36 @@ import java.util.Optional;
@Slf4j @Slf4j
@Service @Service
public class KafkaConsumerService { public class KafkaConsumerService {
private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String MQTT_TOPIC_SHAOSHAN = "romaSite/data/shaoshan";
@Autowired @Autowired
protected EmqKeeper emqKeeper; protected EmqKeeper emqKeeper;
private static final String MQTT_TOPIC = "romaSite/data/transmit";
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
*
* @param message 消息 * @param message 消息
*
*/ */
@KafkaListener(id = "consumerSingle", idIsGroup = false, topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2") @KafkaListener(id = "consumerSingle", idIsGroup = false, topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2")
public void consumerSingle(String message,Acknowledgment ack) { public void consumerSingle(String message, Acknowledgment ack) {
JSONObject messageObj = JSONObject.fromObject(message); JSONObject messageObj = JSONObject.fromObject(message);
try { try {
String topic = messageObj.getString("topic"); String topic = messageObj.getString("topic");
JSONObject data = messageObj.getJSONObject("data"); JSONObject data = messageObj.getJSONObject("data");
emqKeeper.getMqttClient().publish(topic, data.toString().getBytes("UTF-8"), 0,false); emqKeeper.getMqttClient().publish(topic, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); log.error("解析数据失败,{}", e.getMessage());
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (Exception e) {
// log.info("单条消息 ====> message: {}", message);
} }
// log.info("单条消息 ====> message: {}", message);
ack.acknowledge();
} }
/**
* 绍兴,苏州换流站对接Kafka数据
* @param record record
* @param ack ack
*/
@KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory") @KafkaListener(id = "kafkaRoma", groupId = "kafkaRoma", topics = "#{'${queue.kafka.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) { public void kafkaListener(ConsumerRecord<?, String> record, Acknowledgment ack) {
Optional<?> messages = Optional.ofNullable(record.value()); Optional<?> messages = Optional.ofNullable(record.value());
...@@ -58,17 +58,37 @@ public class KafkaConsumerService { ...@@ -58,17 +58,37 @@ public class KafkaConsumerService {
try { try {
JSONObject messageObj = JSONObject.fromObject(record.value()); JSONObject messageObj = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body"); JSONObject data = messageObj.getJSONObject("body");
if (data.size() == 0){ if (data.isEmpty()) {
data = messageObj; data = messageObj;
data.put("datatype","state"); data.put("datatype", "state");
} }
log.info("接收到Roma消息对象: {}", data); log.info("接收到Roma消息对象: {}", data);
emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false); emqKeeper.getMqttClient().publish(MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage());
}
}
}
/**
* 韶山换流对接Kafka
* @param record record
* @param ack ack
*/
@KafkaListener(id = "kafkaConsumer", groupId = "kafkaConsumerGroup", topics = "#{'${queue.kafka.shaoshan.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public void kafkaConsumer(ConsumerRecord<?, String> record, Acknowledgment ack) {
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
try {
JSONObject messageObj = JSONObject.fromObject(record.value());
JSONObject data = messageObj.getJSONObject("body");
emqKeeper.getMqttClient().publish(MQTT_TOPIC_SHAOSHAN, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
ack.acknowledge();
} catch (MqttException e) { } catch (MqttException e) {
e.printStackTrace(); log.error("解析数据失败,{}", e.getMessage());
} }
} }
ack.acknowledge();
} }
...@@ -115,6 +135,4 @@ public class KafkaConsumerService { ...@@ -115,6 +135,4 @@ public class KafkaConsumerService {
// //手动提交offset // //手动提交offset
// ack.acknowledge(); // ack.acknowledge();
// } // }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment