Commit 4388d6ea authored by wujiang's avatar wujiang

修改物联监盘同步数据

parent 23da8152
...@@ -21,18 +21,24 @@ public class AlarmKafkaConsumer { ...@@ -21,18 +21,24 @@ public class AlarmKafkaConsumer {
// 消费者来处理消息 // 消费者来处理消息
@KafkaListener(id = "alarmInfo", topics = { "${kafka.equipment.alarm}" }) @KafkaListener(id = "alarmInfo", topics = { "${kafka.equipment.alarm}" })
public void message1(String record, Acknowledgment ack) { public void message1(String record, Acknowledgment ack) throws InterruptedException {
// 处理业务 // 处理业务
String date = record; String date = record;
// 异步触发预警 // 异步触发预警
pointSystemServiceImpl.sendWarningAsync(date); pointSystemServiceImpl.sendWarningAsync(date);
// 等待1s
Thread.sleep(1000L);
// 手动提交 // 手动提交
ack.acknowledge(); ack.acknowledge();
} }
@KafkaListener(id = "user2", topics = { "${kafka.equipment.test}" }) @KafkaListener(id = "alarmInfo2", topics = { "${kafka.equipment.test}" })
public void message2(String record, Acknowledgment ack) { public void message2(String record, Acknowledgment ack) {
String date = record; String date = record;
System.out.println("收到告警信息" + date);
} }
// public void message1( ConsumerRecord<?, ?> record, Acknowledgment ack){ // public void message1( ConsumerRecord<?, ?> record, Acknowledgment ack){
......
...@@ -125,14 +125,14 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point ...@@ -125,14 +125,14 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
@Override @Override
public void sendWarning(String address, String value, String valueLabe, String gatewayId, String isAlarm) { public void sendWarning(String address, String value, String valueLabe, String gatewayId, String isAlarm) {
// 对应 equipment库的wl_equipment_specific_index_alarm_dic表 // 对应 equipment库的wl_equipment_specific_index_alarm_dic表
String[] s = { "1", "7", "9" }; String[] s = { "1", "7", "9" };
// 如果不满足择返回 // 如果不满足择返回
if (!Arrays.asList(s).contains(isAlarm)) { if (!Arrays.asList(s).contains(isAlarm)) {
//System.out.println("不满足告警类型: " + isAlarm); System.out.println("不满足告警类型: " + isAlarm);
return; return;
} }
System.out.println("满足告警消息address: " + address + ",gatewayId: " + gatewayId + " ,value:" + value System.out.println("满足告警消息address: " + address + ",gatewayId: " + gatewayId + " ,value:" + value
+ " ,valueLabe: " + valueLabe + " ,isAlarm: " + isAlarm); + " ,valueLabe: " + valueLabe + " ,isAlarm: " + isAlarm);
logger.info("满足告警消息address: " + address + ",gatewayId: " + gatewayId + " ,value:" + value + " ,valueLabe: " logger.info("满足告警消息address: " + address + ",gatewayId: " + gatewayId + " ,value:" + value + " ,valueLabe: "
...@@ -158,6 +158,9 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point ...@@ -158,6 +158,9 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
KKSDataWrapper.lambda().eq(KKSData::getKKSBM, pointSystem.getKks()); KKSDataWrapper.lambda().eq(KKSData::getKKSBM, pointSystem.getKks());
KKSData KKSData = kksDataMapper.selectOne(KKSDataWrapper); KKSData KKSData = kksDataMapper.selectOne(KKSDataWrapper);
JSONObject eqdata = new JSONObject(); JSONObject eqdata = new JSONObject();
if (KKSData == null) {
throw new RuntimeException("kks码查询热工院表不存在:" + pointSystem.getKks());
}
eqdata.put("kksms", KKSData.getKKSMS()); eqdata.put("kksms", KKSData.getKKSMS());
QueryWrapper<StationBasic> stationWrapper = new QueryWrapper<>(); QueryWrapper<StationBasic> stationWrapper = new QueryWrapper<>();
...@@ -211,6 +214,9 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point ...@@ -211,6 +214,9 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
dynamicDetailsList.add(dynamicDetails); dynamicDetailsList.add(dynamicDetails);
StringBuilder indexKey = new StringBuilder(pointSystem.getStationAbbr()).append("#") StringBuilder indexKey = new StringBuilder(pointSystem.getStationAbbr()).append("#")
.append(pointSystem.getNumber()).append("#").append(pointSystem.getFunctionNum()); .append(pointSystem.getNumber()).append("#").append(pointSystem.getFunctionNum());
if (pointSystem.getNumber() == null) {
indexKey = new StringBuilder(pointSystem.getStationAbbr()).append("#").append(pointSystem.getFunctionNum());
}
// 如果是升压站 // 如果是升压站
if ("SYZ".equals(pointSystem.getNumber())) { if ("SYZ".equals(pointSystem.getNumber())) {
indexKey = new StringBuilder(pointSystem.getStationAbbr()).append("#").append(pointSystem.getNumber()) indexKey = new StringBuilder(pointSystem.getStationAbbr()).append("#").append(pointSystem.getNumber())
...@@ -230,8 +236,9 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point ...@@ -230,8 +236,9 @@ public class PointSystemServiceImpl extends ServiceImpl<PointSystemMapper, Point
String json = esi.getValueEnum(); String json = esi.getValueEnum();
JSONArray arr = JSONArray.parseArray(json); JSONArray arr = JSONArray.parseArray(json);
List<JSONObject> list = arr.toJavaList(JSONObject.class); List<JSONObject> list = arr.toJavaList(JSONObject.class);
Map<String,String> map = list.stream().collect(Collectors.toMap(i->i.getString("key"), i->i.getString("label"))); Map<String, String> map = list.stream()
indexValue =map.get(valueLabe); .collect(Collectors.toMap(i -> i.getString("key"), i -> i.getString("label")));
indexValue = map.get(valueLabe);
} else if ("TH".equals(pointSystem.getStationAbbr())) { } else if ("TH".equals(pointSystem.getStationAbbr())) {
} }
......
...@@ -15,10 +15,9 @@ redis.cache.failure.time=10800 ...@@ -15,10 +15,9 @@ redis.cache.failure.time=10800
# mybatis-plus # mybatis-plus
mybatis-plus.mapper-locations=classpath:mapper/*Mapper.xml mybatis-plus.mapper-locations=classpath:mapper/*Mapper.xml
#消费者所在组的名称 #消费者所在组的名称
#消费者 的broker地址 #消费者 的broker地址
spring.kafka.consumer.bootstrap-servers=10.20.0.223:9092 spring.kafka.consumer.bootstrap-servers=10.20.0.223:9092,10.20.0.133:9092
# 是否自动提交 # 是否自动提交
spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.enable-auto-commit=false
#offset的消费位置 #offset的消费位置
...@@ -30,26 +29,10 @@ spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S ...@@ -30,26 +29,10 @@ spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S
spring.kafka.listener.ack-mode=manual_immediate spring.kafka.listener.ack-mode=manual_immediate
#监听类型 #监听类型
spring.kafka.listener.type=single spring.kafka.listener.type=single
# 并发
#spring.kafka.listener.concurrency=5
# 发生错误后,消息重发的次数。
spring.kafka.producer.retries=1
#配置kafak produce的broker地址
spring.kafka.producer.bootstrap-servers=10.20.0.223:9092
#默认批处理大小(以字节为单位)
spring.kafka.producer.batch-size=16384
#生产者可以用来缓冲等待发送到服务器的记录的内存总字节数
spring.kafka.producer.buffer-memory=33554432
# producer配置序列化
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# kafka默认的String序列化器
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.equipment.alarm=EQUIPMENT_ALARM kafka.equipment.alarm=EQUIPMENT_ALARM
kafka.equipment.test=test88888 kafka.equipment.test=test88888
#电站对接第三方查询设备kks码 #电站对接第三方查询设备kks码
power.station.url=http://10.20.1.151:80/prod-api/fdgl/process/DataInterface power.station.url=http://172.16.4.29:80/prod-api/fdgl/process/DataInterface
#电站104采集预警 #电站104采集预警
power.station.warning=104/data/analysis power.station.warning=104/data/analysis
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment