Commit a128e36c authored by 刘林's avatar 刘林

fix(equip):对接韶山换流站kafka数据

parent 5560474f
......@@ -52,7 +52,11 @@ public class RedisKey {
* 装备指标Key值
*/
public static final String EQUIP_INDEX_ADDRESS_KEY = "equip_index_address_key";
/**
* 韶山换流站指标Key
*/
public static final String EQUIP_INDEX_ADDRESS_KEY_STATION = "equip_index_address_key_station";
/** 驼峰转下划线(简单写法,效率低于 ) */
public static String humpToLine(String str) {
return str.replaceAll("[A-Z]", "_$0").toLowerCase();
......
......@@ -72,6 +72,9 @@ public class EquipmentIndexVO {
@ApiModelProperty(value = "信号的索引键key,用于唯一索引信号")
private String indexAddress;
@ApiModelProperty(value = "信号的索引键pointId,用于唯一索引信号")
private String eventAddress;
@ApiModelProperty(value = "测点类型,analog/state")
private String dataType;
......
package com.yeejoin.equipmanage.common.enums;
/**
* @author LiuLin
* @date 2023年08月02日 11:02
*/
public interface MqttConstant {
String TRUE = "true";
String FALSE = "false";
String STATE = "state";
String DIS_CREATE = "discreate";
String ONE_1 = "1";
String ONE_1_0 = "1.0";
}
package com.yeejoin.equipmanage.common.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author LiuLin
* @date 2023年08月02日 11:02
*/
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class SShanMessage {
private String eventTextL1;
private String pointId;
private String time;
private String deviceId;
}
package com.yeejoin.equipmanage.common.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.List;
/**
* @author LiuLin
* @date 2023年08月02日 11:02
*/
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class SShanStationMessage {
private String timestamp;
private List<SShanMessage> warns;
}
package com.yeejoin.equipmanage.common.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* 对接苏州,绍兴换流站Kafka数据
* @author LiuLin
* @date 2023年08月02日 11:02
*/
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class StationMessage {
private String dataType;
private String value;
private String timeStamp;
private String quality;
private String scadaId;
private String key;
private String disCreate;
private String name;
}
......@@ -33,6 +33,7 @@ public class EquipmentIndexCacheRunner implements CommandLineRunner {
log.info(">>服务启动执行,执行预加载数据等操作");
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS);
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS_KEY);
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS_KEY_STATION);
List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null);
Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream()
.filter(v -> v.getGatewayId() != null)
......@@ -40,8 +41,12 @@ public class EquipmentIndexCacheRunner implements CommandLineRunner {
Map<String, Object> equipmentIndexKeyMap = equipSpecificIndexList.stream()
.filter(v -> v.getIndexAddress() != null && v.getGatewayId() == null)
.collect(Collectors.toMap(EquipmentIndexVO::getIndexAddress, Function.identity(),(v1, v2) -> v1));
Map<String, Object> equipmentIndexEventMap = equipSpecificIndexList.stream()
.filter(v -> v.getEventAddress() != null)
.collect(Collectors.toMap(EquipmentIndexVO::getEventAddress, Function.identity(),(v1, v2) -> v1));
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap);
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS_KEY, equipmentIndexKeyMap);
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS_KEY_STATION, equipmentIndexEventMap);
}
}
\ No newline at end of file
......@@ -126,6 +126,7 @@ public class EquipmentIotMqttReceiveConfig {
list.add("+/+/event"); // 添加iot事件监听
list.add("+/+/transmit"); // 添加交换站事件监听
list.add("+/+/perspective"); // 添加交换站事件监听
list.add("+/+/shaoshan"); // 添加换流站韶山监听事件
String[] arr = list.toArray(new String[list.size()]);
adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttPahoClientFactory(), arr);
adapter.setCompletionTimeout(completionTimeout);
......@@ -151,8 +152,8 @@ public class EquipmentIotMqttReceiveConfig {
mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg);
}else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttRomaMessage(topic,msg);
}else if (dataType.equals("perspective") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttIotMessage(topic,msg);
}else if (dataType.equals("shaoshan") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttStationMessage(topic,msg);
}
}
};
......
......@@ -114,5 +114,5 @@ public interface EquipmentSpecificIndexMapper extends BaseMapper<EquipmentSpecif
List<EquipmentSpecificIndex> getEquipIndexInIndex(@Param("list") List<String> listIndex);
EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress,String gatewayId);
EquipmentSpecificIndex getEquipmentSpeIndexByAddress(String indexAddress, String eventAddress, String gatewayId);
}
......@@ -40,5 +40,5 @@ public interface IEquipmentSpecificIndexService extends IService<EquipmentSpecif
* @param indexAddress indexAddress
* @return EquipmentSpecificIndex
*/
EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress,String gatewayId);
EquipmentSpecificIndex getEquipmentSpeIndexByAddress(String indexAddress, String eventAddress, String gatewayId);
}
......@@ -30,5 +30,5 @@ public interface MqttReceiveService {
* @param topic 主题
* @param message 消息内容
*/
void handlerMqttIotMessage(String topic, String message);
void handlerMqttStationMessage(String topic, String message);
}
......@@ -30,7 +30,7 @@ public class EquipmentSpecificIndexServiceImpl extends ServiceImpl<EquipmentSpec
}
@Override
public EquipmentSpecificIndex getEquipmentSpeIndexByIndexAddress(String indexAddress,String gatewayId) {
return this.baseMapper.getEquipmentSpeIndexByIndexAddress(indexAddress,gatewayId);
public EquipmentSpecificIndex getEquipmentSpeIndexByAddress(String indexAddress,String eventAddress, String gatewayId) {
return this.baseMapper.getEquipmentSpeIndexByAddress(indexAddress, eventAddress, gatewayId);
}
}
......@@ -522,7 +522,7 @@
</select>
<!-- 根据信号索引查询装备性能指标 -->
<select id="getEquipmentSpeIndexByIndexAddress"
<select id="getEquipmentSpeIndexByAddress"
resultType="com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex">
SELECT wesi.id AS id,
wei.name_key AS nameKey,
......@@ -561,10 +561,16 @@
LEFT JOIN wl_equipment_specific AS wes ON wes.id = wesi.equipment_specific_id
LEFT JOIN wl_equipment_detail ed ON ed.id = wes.equipment_detail_id
LEFT JOIN wl_equipment_index AS wei ON wei.id = wesi.equipment_index_id
WHERE
wesi.index_address = #{indexAddress}
<if test="gatewayId != null">
AND wesi.gateway_id = #{gatewayId}
</if>
<where>
<if test="indexAddress != null">
AND wesi.index_address = #{indexAddress}
</if>
<if test="eventAddress != null">
AND wesi.event_address = #{eventAddress}
</if>
<if test="gatewayId != null">
AND wesi.gateway_id = #{gatewayId}
</if>
</where>
</select>
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment