Commit 56bdc722 authored by KeYong's avatar KeYong

Merge remote-tracking branch 'origin/develop_dl' into develop_dl

parents 84efc3c6 ca374051
...@@ -89,6 +89,12 @@ ...@@ -89,6 +89,12 @@
<version>2.4</version> <version>2.4</version>
<classifier>jdk15</classifier> <classifier>jdk15</classifier>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
......
package com.yeejoin.equip.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
@Configuration
@EnableAsync
public class EquipExecutorConfig {
@Bean(name = "equipAsyncExecutor")
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(10);
//配置最大线程数
executor.setMaxPoolSize(500);
//配置队列大小
executor.setQueueCapacity(2000);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("namePrefix");
//线程池维护线程所允许的空闲时间
executor.setKeepAliveSeconds(30);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行--拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
//等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
...@@ -5,9 +5,9 @@ import com.yeejoin.equip.mapper.EquipmentSpecificIndexMapper; ...@@ -5,9 +5,9 @@ import com.yeejoin.equip.mapper.EquipmentSpecificIndexMapper;
import com.yeejoin.equip.utils.RedisKey; import com.yeejoin.equip.utils.RedisKey;
import com.yeejoin.equip.utils.RedisUtils; import com.yeejoin.equip.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
...@@ -22,20 +22,25 @@ import java.util.stream.Collectors; ...@@ -22,20 +22,25 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
public class EquipmentIndexCacheRunner implements CommandLineRunner { public class EquipmentIndexCacheRunner implements CommandLineRunner {
@Autowired @Resource
private EquipmentSpecificIndexMapper equipmentSpecificIndexMapper; private EquipmentSpecificIndexMapper equipmentSpecificIndexMapper;
@Autowired @Resource
private RedisUtils redisUtils; private RedisUtils redisUtils;
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
log.info(">>服务启动执行,执行预加载数据等操作"); log.info(">>服务启动执行,执行预加载数据等操作");
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS); redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS);
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS_KEY);
List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null); List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null);
Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream() Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream()
.filter(v -> v.getGatewayId() != null && v.getIndexAddress() !=null) .filter(v -> v.getGatewayId() != null)
.collect(Collectors.toMap(vo -> vo.getIndexAddress() + "_" + vo.getGatewayId(), Function.identity(),(v1, v2) -> v1)); .collect(Collectors.toMap(vo -> vo.getIndexAddress() + "_" + vo.getGatewayId(), Function.identity(),(v1, v2) -> v1));
Map<String, Object> equipmentIndexKeyMap = equipSpecificIndexList.stream()
.filter(v -> v.getIndexAddress() != null && v.getGatewayId() == null)
.collect(Collectors.toMap(EquipmentIndexVO::getIndexAddress, Function.identity(),(v1, v2) -> v1));
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap); redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap);
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS_KEY, equipmentIndexKeyMap);
} }
} }
\ No newline at end of file
...@@ -3,16 +3,10 @@ package com.yeejoin.equip.config; ...@@ -3,16 +3,10 @@ package com.yeejoin.equip.config;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import java.util.Properties;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/** /**
* @author LiuLin * @author LiuLin
...@@ -32,27 +26,13 @@ public class KafkaConsumerConfig { ...@@ -32,27 +26,13 @@ public class KafkaConsumerConfig {
@Value("${spring.kafka.consumer.enable-auto-commit}") @Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit; private boolean enableAutoCommit;
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { @Bean("consumerConfig")
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); public Properties consumerConfigs() {
factory.setConsumerFactory(consumerFactory()); Properties props = new Properties();
// 设置并发量,小于或者等于 Topic 的分区数
factory.setConcurrency(5);
// 设置为批量监听
factory.setBatchListener(Boolean.TRUE);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
// 自动提交 // 自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//两次Poll之间的最大允许间隔。 //两次Poll之间的最大允许间隔。
//消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。 //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
...@@ -70,5 +50,4 @@ public class KafkaConsumerConfig { ...@@ -70,5 +50,4 @@ public class KafkaConsumerConfig {
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
return props; return props;
} }
} }
...@@ -3,12 +3,12 @@ package com.yeejoin.equip.entity; ...@@ -3,12 +3,12 @@ package com.yeejoin.equip.entity;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import java.io.Serializable;
import java.util.Date; import java.util.Date;
@Data @Data
@ApiModel(value = "性能指标详情返回vo实体", description = "性能指标详情返回vo实体") @ApiModel(value = "性能指标详情返回vo实体", description = "性能指标详情返回vo实体")
public class EquipmentIndexVO { public class EquipmentIndexVO implements Serializable {
@ApiModelProperty(value = "id") @ApiModelProperty(value = "id")
private Long id; private Long id;
...@@ -84,6 +84,7 @@ public class EquipmentIndexVO { ...@@ -84,6 +84,7 @@ public class EquipmentIndexVO {
@ApiModelProperty(value = "装备名称") @ApiModelProperty(value = "装备名称")
private String equipmentSpecificName; private String equipmentSpecificName;
@ApiModelProperty(value = "指标名称") @ApiModelProperty(value = "装备指标名称")
private String equipmentIndexName; private String equipmentIndexName;
} }
package com.yeejoin.equip.kafka; //package com.yeejoin.equip.kafka;
//
import com.yeejoin.equip.service.KafkaMessageService; //import com.alibaba.fastjson.JSON;
import com.yeejoin.equip.utils.RedisKey; //import com.alibaba.fastjson.JSONArray;
import com.yeejoin.equip.utils.RedisUtils; //import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j; //import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import org.apache.kafka.clients.consumer.ConsumerRecord; //import com.yeejoin.equip.entity.EquipmentIndexVO;
import org.springframework.beans.factory.annotation.Autowired; //import com.yeejoin.equip.utils.ElasticSearchUtil;
import org.springframework.kafka.annotation.KafkaListener; //import com.yeejoin.equip.utils.RedisKey;
import org.springframework.kafka.support.Acknowledgment; //import com.yeejoin.equip.utils.RedisUtils;
import org.springframework.stereotype.Service; //import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; //import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.*; //import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
/** //import org.springframework.kafka.annotation.KafkaListener;
* @author LiuLin //import org.springframework.kafka.support.Acknowledgment;
* @date 2023/6/25 //import org.springframework.stereotype.Service;
* @apiNote kafka 消费服务类 //import org.springframework.util.CollectionUtils;
*/ //import org.springframework.util.ObjectUtils;
@Slf4j //
@Service //import java.text.SimpleDateFormat;
public class KafkaConsumerService { //import java.util.*;
//
@Autowired ///**
protected KafkaProducerService kafkaProducerService; // * @author LiuLin
// * @date 2023/6/25
@Autowired // * @apiNote kafka 消费服务类
private KafkaMessageService kafkaMessageService; // */
//@Slf4j
@Autowired //@Service
private RedisUtils redisUtils; //public class KafkaConsumerService {
//
// //iot转发实时消息存入influxdb前缀
@KafkaListener(topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup") // private static final String MEASUREMENT = "iot_data_";
public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) { // //装备更新最新消息存入influxdb前缀
try { // private static final String TRUE = "true";
if (CollectionUtils.isEmpty(consumerRecords)) { // private static final String FALSE = "false";
return; // //装备更新最新消息存入influxdb固定时间
} // private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS); // @Autowired
// protected KafkaProducerService kafkaProducerService;
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { // @Autowired
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value()); // private InfluxDbConnection influxDbConnection;
if (kafkaMessage.isPresent()) { // @Autowired
String message = (String) kafkaMessage.get(); // private RedisUtils redisUtils;
kafkaMessageService.handlerMessage(message,equipmentIndexVOMap); // @Value("${kafka.alarm.topic}")
} // private String alarmTopic;
} // @Autowired
} catch (Exception e) { // private ElasticSearchUtil elasticSearchUtil;
log.error("kafka失败,当前失败的批次。data:{}", consumerRecords); //
e.printStackTrace(); // @KafkaListener(topics = "#{'${kafka.topic}'.split(',')}", groupId = "messageConsumerGroup")
} finally { // public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
ack.acknowledge(); // try {
} // if (CollectionUtils.isEmpty(consumerRecords)) {
} // return;
} // }
\ No newline at end of file // Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
//
// for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
// if (kafkaMessage.isPresent()) {
// String message = (String) kafkaMessage.get();
// this.handleMessage(message, equipmentIndexVOMap);
// }
// }
// } catch (Exception e) {
// log.error("kafka失败,当前失败的批次。data:{}", consumerRecords);
// e.printStackTrace();
// } finally {
// ack.acknowledge();
// }
// }
//
// private void handleMessage(String message, Map<Object, Object> equipmentIndexVOMap) {
//
// JSONObject jsonObject = JSONObject.parseObject(message);
// String dataType = jsonObject.getString("dataType");
// String indexAddress = jsonObject.getString("address");
// String traceId = jsonObject.getString("traceId");
// String gatewayId = jsonObject.getString("gatewayId");
// String value = jsonObject.getString("value");
// String key = indexAddress + "_" + gatewayId;
//
// try {
// if (equipmentIndexVOMap.get(key) != null) {
// SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
// log.info("接收到iot消息: 指标名称:{},地址:{},值:{},网关{}",
// equipmentSpeIndex.getEquipmentIndexName(), indexAddress, value, gatewayId);
//
// Map<String, String> tagsMap = new HashMap<>();
// Map<String, Object> fieldsMap = new HashMap<>();
// tagsMap.put("equipmentsIdx", key);
// tagsMap.put("address", indexAddress);
// tagsMap.put("gatewayId", gatewayId);
// tagsMap.put("dataType", dataType);
// tagsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
// tagsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
// tagsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
// tagsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
//
// String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
// fieldsMap.put("traceId", traceId);
// fieldsMap.put("value", value);
// fieldsMap.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
// fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
// fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
// influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
//
// //更新数据入ES库
// Map<String, Object> paramJson = new HashMap<>();
// if (Arrays.asList(TRUE, FALSE).contains(value)) {
// paramJson.put("value", value);
// } else {
// paramJson.put("value", Float.parseFloat(value));
// }
// paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
// paramJson.put("createdTime", simpleDateFormat.format(new Date()));
// paramJson.put("unit", equipmentSpeIndex.getUnitName());
// elasticSearchUtil.updateData(ES_INDEX_NAME_JX, key, JSON.toJSONString(paramJson));
//
// if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
// fieldsMap.putAll(tagsMap);
// //kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
// }
// }
// } catch (Exception e) {
// log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
// }
//
// }
//
// private String valueTranslate(String value, String enumStr) {
// if (ObjectUtils.isEmpty(enumStr)) {
// return "";
// }
// try {
// JSONArray jsonArray = JSONArray.parseArray(enumStr);
// for (int i = 0; i < jsonArray.size(); i++) {
// JSONObject jsonObject = jsonArray.getJSONObject(i);
// if (jsonObject.get("key").equals(value)) {
// return jsonObject.getString("label");
// }
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
// return "";
// }
//}
//
//
package com.yeejoin.equip.kafka;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.equip.config.KafkaConsumerConfig;
import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.utils.ElasticSearchUtil;
import com.yeejoin.equip.utils.RedisKey;
import com.yeejoin.equip.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author LiuLin
* @date 2023年08月01日 17:27
*/
@Slf4j
@Component
public class KafkaConsumerWithThread implements CommandLineRunner {
final private static AtomicLong sendThreadPoolCounter = new AtomicLong(0);
final public static ExecutorService pooledExecutor =
Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors(),
createThreadFactory());
//iot转发实时消息存入influxdb前缀
private static final String MEASUREMENT = "iot_data_";
private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
//装备更新最新消息存入influxdb前缀
private static final String TRUE = "true";
private static final String FALSE = "false";
//装备更新最新消息存入influxdb固定时间
private static final Long TIME = 1688558007051L;
@Autowired
protected KafkaProducerService kafkaProducerService;
@Autowired
private KafkaConsumerConfig consumerConfig;
@Autowired
private InfluxDbConnection influxDbConnection;
@Autowired
private RedisUtils redisUtils;
@Value("${kafka.alarm.topic}")
private String alarmTopic;
@Value("${kafka.topic}")
private String topic;
@Autowired
private ElasticSearchUtil elasticSearchUtil;
private static ThreadFactory createThreadFactory() {
return runnable -> {
Thread thread = new Thread(runnable);
thread.setName(String.format("clojure-agent-send-pool-%d", KafkaConsumerWithThread.sendThreadPoolCounter.getAndIncrement()));
return thread;
};
}
@Override
public void run(String... args) {
Thread thread = new Thread(new KafkaConsumerThread(consumerConfig.consumerConfigs(), topic));
thread.start();
}
private void processRecord(ConsumerRecord<String, String> record, Map<Object, Object> equipmentIndexVOMap) {
// 处理消息记录
//log.info("监听Kafka集群message:{}",record.value());
JSONObject jsonObject = JSONObject.parseObject(record.value());
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value");
String key = indexAddress + "_" + gatewayId;
try {
if (equipmentIndexVOMap.get(key) != null) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
tagsMap.put("equipmentsIdx", key);
tagsMap.put("address", indexAddress);
tagsMap.put("gatewayId", gatewayId);
tagsMap.put("dataType", dataType);
fieldsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
//更新数据入ES库
Map<String, Object> paramJson = new HashMap<>();
if (Arrays.asList(TRUE, FALSE).contains(value)) {
paramJson.put("value", value);
}else{
paramJson.put("value", Float.parseFloat(value));
}
paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
paramJson.put("createdTime", simpleDateFormat.format(new Date()));
elasticSearchUtil.updateData(ES_INDEX_NAME_JX,key,JSON.toJSONString(paramJson));
//influxDbConnection.insert(INDICATORS + gatewayId, tagsMap, fieldsMap, TIME, TimeUnit.MILLISECONDS);
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
fieldsMap.putAll(tagsMap);
kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
}
}
} catch (Exception e) {
log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
}
}
private String valueTranslate(String value, String enumStr) {
if (ObjectUtils.isEmpty(enumStr)) {
return "";
}
try {
JSONArray jsonArray = JSONArray.parseArray(enumStr);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
if (jsonObject.get("key").equals(value)) {
return jsonObject.getString("label");
}
}
} catch (Exception e) {
log.error("告警枚举转换异常" + e.getMessage(), e);
}
return "";
}
public class KafkaConsumerThread implements Runnable {
private final KafkaConsumer<String, String> kafkaConsumer;
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
public KafkaConsumerThread(Properties props, String topic) {
this.kafkaConsumer = new KafkaConsumer<>(props);
this.kafkaConsumer.subscribe(Collections.singletonList(topic));
}
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
pooledExecutor.submit(() -> {
processRecord(record, equipmentIndexVOMap);
});
}
}
}
//@Override
//public void run() {
// while (true) {
// ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
// for (TopicPartition topicPartition : records.partitions()) {
// List<ConsumerRecord<String, String>> recordList = new ArrayList<>(records.records(topicPartition));
// Iterator<ConsumerRecord<String, String>> it = recordList.iterator();
// while (it.hasNext()) {
// ConsumerRecord<String, String> record = it.next();
// long startTime = System.currentTimeMillis();
// long lastOffset = recordList.get(recordList.size() - 1).offset();
// try {
// kafkaConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(lastOffset + 1)));
// } catch (Exception e) {
// log.error("kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},commit time:{},value{},error:{}", topic, Thread.currentThread().getName(), (System.currentTimeMillis() - startTime), record.value(), e);
// break;
// }
// pooledExecutor.submit(() -> {
// processRecord(record, equipmentIndexVOMap);
// });
// it.remove();
// }
// }
// }
//}
}
}
...@@ -6,7 +6,6 @@ import org.springframework.kafka.support.SendResult; ...@@ -6,7 +6,6 @@ import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
......
package com.yeejoin.equip.utils;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author LiuLin
* @date 2023年08月08日 16:30
*/
@Slf4j
@Component
public class ElasticSearchUtil {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* ES修改数据
*
* @param indexName 索引名称
* @param id 主键
* @param paramJson 参数JSON
* @return
*/
public boolean updateData(String indexName, String id, String paramJson) {
UpdateRequest updateRequest = new UpdateRequest(indexName, id);
//如果修改索引中不存在则进行新增
updateRequest.docAsUpsert(true);
//立即刷新数据
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(paramJson, XContentType.JSON);
try {
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
log.info("索引[{}],主键:【{}】操作结果:[{}]", indexName, id, updateResponse.getResult());
if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
//新增
log.info("索引:【{}】,主键:【{}】新增成功", indexName, id);
return true;
} else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
//修改
log.info("索引:【{}】,主键:【{}】修改成功", indexName, id);
return true;
} else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
//无变化
log.info("索引:[{}],主键:[{}]无变化", indexName, id);
return true;
}
} catch (IOException e) {
log.error("索引:[{}],主键:【{}】,更新异常:[{}]", indexName, id, e);
return false;
}
return false;
}
}
...@@ -12,4 +12,6 @@ public class RedisKey { ...@@ -12,4 +12,6 @@ public class RedisKey {
* 装备指标Key值 * 装备指标Key值
*/ */
public static final String EQUIP_INDEX_ADDRESS = "equip_index_address"; public static final String EQUIP_INDEX_ADDRESS = "equip_index_address";
public static final String EQUIP_INDEX_ADDRESS_KEY = "equip_index_address_key";
} }
...@@ -79,3 +79,10 @@ spring.kafka.listener.type=batch ...@@ -79,3 +79,10 @@ spring.kafka.listener.type=batch
kafka.topic=PERSPECTIVE kafka.topic=PERSPECTIVE
emq.topic=iot/data/perspective emq.topic=iot/data/perspective
kafka.alarm.topic=EQUIPMENT_ALARM kafka.alarm.topic=EQUIPMENT_ALARM
spring.elasticsearch.rest.uris=http://39.98.224.23:9200
spring.elasticsearch.rest.connection-timeout=30000
spring.elasticsearch.rest.username=elastic
spring.elasticsearch.rest.password=123456
spring.elasticsearch.rest.read-timeout=30000
management.health.elasticsearch.enabled=false
\ No newline at end of file
...@@ -36,6 +36,10 @@ public interface OrgUsrMapper extends BaseMapper<OrgUsr> { ...@@ -36,6 +36,10 @@ public interface OrgUsrMapper extends BaseMapper<OrgUsr> {
List<OrgUsr> selectCompanyDepartmentMsg(); List<OrgUsr> selectCompanyDepartmentMsg();
List<Map<String, Object>> selectStaticFire(String bizOrgCode);
List<Map<String, Object>> selectStaticYw(String bizOrgCode);
List<Map<String, Object>> selectPersonAllList(Map<String, Object> map); List<Map<String, Object>> selectPersonAllList(Map<String, Object> map);
List<OrgUsr> queryOrgUsrListByBizOrgCode(String bizOrgCode); List<OrgUsr> queryOrgUsrListByBizOrgCode(String bizOrgCode);
......
...@@ -115,6 +115,10 @@ public interface IOrgUsrService { ...@@ -115,6 +115,10 @@ public interface IOrgUsrService {
void updateDynamicFormInstance(Long instanceId, List<DynamicFormInstance> fromValueList) throws Exception; void updateDynamicFormInstance(Long instanceId, List<DynamicFormInstance> fromValueList) throws Exception;
/** /**
* @throws Exception
*/
List<Map<String, Object>> selectStatic(String bizOrgCode, String type);
/**
* @param id * @param id
* @throws Exception * @throws Exception
*/ */
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
max(case v.field_code when 'administrativePositionCode' then IFNULL(v.field_value_label,v.field_value) end) administrativePositionCode, max(case v.field_code when 'administrativePositionCode' then IFNULL(v.field_value_label,v.field_value) end) administrativePositionCode,
max(case v.field_code when 'internalPositionCode' then IFNULL(v.field_value_label,v.field_value) end) internalPositionCode, max(case v.field_code when 'internalPositionCode' then IFNULL(v.field_value_label,v.field_value) end) internalPositionCode,
max(case v.field_code when 'fireManagementPostCode' then IFNULL(v.field_value_label,v.field_value) end) fireManagementPostCode, max(case v.field_code when 'fireManagementPostCode' then IFNULL(v.field_value_label,v.field_value) end) fireManagementPostCode,
max(case v.field_code when 'fireManagementPostCode' then v.field_value end) fireManagementPost,
max(case v.field_code when 'positionType' then IFNULL(v.field_value,v.field_value_label) end) positionType, max(case v.field_code when 'positionType' then IFNULL(v.field_value,v.field_value_label) end) positionType,
max(case v.field_code when 'certificateType' then IFNULL(v.field_value_label,v.field_value) end) certificateType, max(case v.field_code when 'certificateType' then IFNULL(v.field_value_label,v.field_value) end) certificateType,
max(case v.field_code when 'holdingTime' then IFNULL(v.field_value_label,v.field_value) end) holdingTime, max(case v.field_code when 'holdingTime' then IFNULL(v.field_value_label,v.field_value) end) holdingTime,
...@@ -84,6 +85,12 @@ ...@@ -84,6 +85,12 @@
<if test="map.peopleType != null and map.peopleType != ''"> <if test="map.peopleType != null and map.peopleType != ''">
AND g.peopleType = #{map.peopleType} AND g.peopleType = #{map.peopleType}
</if> </if>
<if test="map.fireManagementPost != null">
AND g.fireManagementPost IN
<foreach item="item" index="index" collection="map.fireManagementPost" open="(" separator="," close=")">
#{item}
</foreach>
</if>
GROUP BY GROUP BY
u.sequence_nbr , u.sequence_nbr ,
u.biz_org_name , u.biz_org_name ,
...@@ -112,6 +119,7 @@ ...@@ -112,6 +119,7 @@
u.amos_org_id amosOrgId, u.amos_org_id amosOrgId,
u.biz_org_code bizOrgCode, u.biz_org_code bizOrgCode,
u.parent_id parentId, u.parent_id parentId,
(select bizOrgName from cb_org_usr where sequence_nbr = u.parent_id) as companyName,
g.* g.*
FROM FROM
cb_org_usr u cb_org_usr u
...@@ -130,7 +138,9 @@ ...@@ -130,7 +138,9 @@
max(case v.field_code when 'internalPositionCode' then IFNULL(v.field_value_label,v.field_value) end) internalPositionCode, max(case v.field_code when 'internalPositionCode' then IFNULL(v.field_value_label,v.field_value) end) internalPositionCode,
max(case v.field_code when 'fireManagementPostCode' then IFNULL(v.field_value_label,v.field_value) end) fireManagementPostCode, max(case v.field_code when 'fireManagementPostCode' then IFNULL(v.field_value_label,v.field_value) end) fireManagementPostCode,
max(case v.field_code when 'fireManagementPostCode' then v.field_value end) fireManagementPost, max(case v.field_code when 'fireManagementPostCode' then v.field_value end) fireManagementPost,
max(case v.field_code when 'fireManagementPostCode' then IFNULL(v.field_value_label,'其他') end) fireManagementPostName,
max(case v.field_code when 'positionType' then IFNULL(v.field_value,v.field_value_label) end) positionType, max(case v.field_code when 'positionType' then IFNULL(v.field_value,v.field_value_label) end) positionType,
max(case v.field_code when 'positionType' then IFNULL(v.field_value_label,'其他') end) positionTypeName,
max(case v.field_code when 'certificateType' then IFNULL(v.field_value_label,v.field_value) end) certificateType, max(case v.field_code when 'certificateType' then IFNULL(v.field_value_label,v.field_value) end) certificateType,
max(case v.field_code when 'holdingTime' then IFNULL(v.field_value_label,v.field_value) end) holdingTime, max(case v.field_code when 'holdingTime' then IFNULL(v.field_value_label,v.field_value) end) holdingTime,
max(case v.field_code when 'auditCycle' then IFNULL(v.field_value_label,v.field_value) end) auditCycle, max(case v.field_code when 'auditCycle' then IFNULL(v.field_value_label,v.field_value) end) auditCycle,
...@@ -170,12 +180,9 @@ ...@@ -170,12 +180,9 @@
AND FIND_IN_SET(#{map.positionType},g.positionType) AND FIND_IN_SET(#{map.positionType},g.positionType)
</if> </if>
<if test="map.peopleType != null and map.peopleType != ''"> <if test="map.peopleType != null and map.peopleType != ''">
AND g.peopleType IN AND g.peopleType = #{map.peopleType}
<foreach item="item" index="index" collection="map.peopleType" open="(" separator="," close=")">
#{item}
</foreach>
</if> </if>
<if test="map.fireManagementPost != null and map.fireManagementPost != ''"> <if test="map.fireManagementPost != null">
AND g.fireManagementPost IN AND g.fireManagementPost IN
<foreach item="item" index="index" collection="map.fireManagementPost" open="(" separator="," close=")"> <foreach item="item" index="index" collection="map.fireManagementPost" open="(" separator="," close=")">
#{item} #{item}
...@@ -1307,5 +1314,104 @@ LEFT JOIN ( ...@@ -1307,5 +1314,104 @@ LEFT JOIN (
)a where a.sequenceNbr is not null )a where a.sequenceNbr is not null
LIMIT #{map.pageNum}, #{map.pageSize} LIMIT #{map.pageNum}, #{map.pageSize}
</select> </select>
<select id="selectStaticFire" resultType="java.util.Map">
SELECT count( postName ) as num,( count( postName )/( SELECT count(*) AS count FROM cb_org_usr a WHERE is_delete = 0 AND biz_org_type = 'PERSON'
AND biz_org_code like concat(#{bizOrgCode}, '%')
))* 100 AS percent, postName
FROM
(
SELECT
a.*,
IFNULL( b.NAME, '其他' ) AS postName
FROM
(
SELECT DISTINCT
u.sequence_nbr sequenceNbr,
g.*
FROM
cb_org_usr u
LEFT JOIN (
SELECT
v.`instance_id`,
max( CASE v.field_code WHEN 'fireManagementPostCode' THEN IFNULL( v.field_value_label, v.field_value ) END ) fireManagementPostCode,
max( CASE v.field_code WHEN 'fireManagementPostCode' THEN v.field_value END ) fireManagementPost,
max( CASE v.field_code WHEN 'fireManagementPostCode' THEN v.field_value_label END ) fireManagementPostName,
max( CASE v.field_code WHEN 'positionType' THEN IFNULL( v.field_value, v.field_value_label ) END ) positionType,
max( CASE v.field_code WHEN 'positionType' THEN v.field_value_label END ) positionTypeName,
max( CASE v.field_code WHEN 'peopleType' THEN v.field_value END ) peopleType
FROM
`cb_dynamic_form_instance` v
WHERE
v.group_code = 246
GROUP BY
v.`instance_id`
) g ON u.sequence_nbr = g.instance_id
WHERE
u.biz_org_type = 'person'
AND g.peopleType = '1601'
AND u.is_delete = 0
AND u.biz_org_code like concat(#{bizOrgCode}, '%')
GROUP BY
u.sequence_nbr,
u.biz_org_name,
u.biz_org_code
ORDER BY
u.rec_date DESC
) a
LEFT JOIN ( SELECT * FROM cb_data_dictionary WHERE type = 'XFGLGW' ) b ON LOCATE( b.CODE, a.fireManagementPost ) != 0
WHERE
a.sequenceNbr IS NOT NULL
) a
GROUP BY
postName
</select>
<select id="selectStaticYw" resultType="java.util.Map">
SELECT count( postName ) as num, (count( postName )/( SELECT count(*) AS count FROM cb_org_usr a WHERE is_delete = 0 AND biz_org_type = 'PERSON'
AND biz_org_code like concat(#{bizOrgCode}, '%')
))* 100 AS percent, postName
FROM
(
SELECT
a.*,
IFNULL( b.NAME, '其他' ) AS postName
FROM
(
SELECT DISTINCT
u.sequence_nbr sequenceNbr,
g.*
FROM
cb_org_usr u
LEFT JOIN (
SELECT
v.`instance_id`,
max( CASE v.field_code WHEN 'positionType' THEN v.field_value END ) positionType,
max( CASE v.field_code WHEN 'positionType' THEN v.field_value_label END ) positionTypeName,
max( CASE v.field_code WHEN 'peopleType' THEN v.field_value END ) peopleType
FROM
`cb_dynamic_form_instance` v
WHERE
v.group_code = 246
GROUP BY
v.`instance_id`
) g ON u.sequence_nbr = g.instance_id
WHERE
u.biz_org_type = 'PERSON'
AND g.peopleType = '1602'
AND u.biz_org_code like concat(#{bizOrgCode}, '%')
AND u.is_delete = 0
GROUP BY
u.sequence_nbr,
u.biz_org_name,
u.biz_org_code
ORDER BY
u.rec_date DESC
) a
LEFT JOIN ( SELECT * FROM cb_data_dictionary WHERE type = 'GWMC' ) b ON LOCATE( b.CODE, a.positionType ) != 0
WHERE
a.sequenceNbr IS NOT NULL
) a
GROUP BY
postName
</select>
</mapper> </mapper>
...@@ -179,6 +179,20 @@ public class OrgPersonController extends BaseController { ...@@ -179,6 +179,20 @@ public class OrgPersonController extends BaseController {
return ResponseHelper.buildResponse(result); return ResponseHelper.buildResponse(result);
} }
/**
* 消防资源监管,驻站消防员,运维人员环状态图统计接口
*
* @param bizOrgCode type
* @return
*/
@TycloudOperation(needAuth = false, ApiLevel = UserType.AGENCY)
@RequestMapping(value = "/getStaticByFire", method = RequestMethod.GET)
@ApiOperation(httpMethod = "GET", value = "消防资源监管,驻站消防员,运维人员环状态图统计接口", notes = "消防资源监管,驻站消防员,运维人员环状态图统计接口")
public ResponseModel<List<Map<String, Object>>> selectStatic(String bizOrgCode, String type) throws Exception {
List<Map<String, Object>> result = iOrgUsrService.selectStatic(bizOrgCode, type);
return ResponseHelper.buildResponse(result);
}
/** /**
* 获取人员树 * 获取人员树
......
...@@ -600,11 +600,13 @@ public class OrgUsrServiceImpl extends BaseService<OrgUsrDto, OrgUsr, OrgUsrMapp ...@@ -600,11 +600,13 @@ public class OrgUsrServiceImpl extends BaseService<OrgUsrDto, OrgUsr, OrgUsrMapp
map.put("positionType", positionType); map.put("positionType", positionType);
} }
String fireManagementPost = ""; List<String> fireManagementPost;
if(req.containsKey("fireManagementPost")) { if(req.containsKey("fireManagementPost")) {
fireManagementPost = req.get("fireManagementPost").toString(); if(StringUtils.isNotEmpty(req.get("fireManagementPost").toString())) {
fireManagementPost = Arrays.asList(req.get("fireManagementPost").toString().split(","));
map.put("fireManagementPost", fireManagementPost); map.put("fireManagementPost", fireManagementPost);
} }
}
String peopleType = ""; String peopleType = "";
if(req.containsKey("peopleType")) { if(req.containsKey("peopleType")) {
...@@ -915,6 +917,17 @@ public class OrgUsrServiceImpl extends BaseService<OrgUsrDto, OrgUsr, OrgUsrMapp ...@@ -915,6 +917,17 @@ public class OrgUsrServiceImpl extends BaseService<OrgUsrDto, OrgUsr, OrgUsrMapp
} }
@Override @Override
public List<Map<String, Object>> selectStatic(String bizOrgCode, String type) {
List<Map<String,Object>> listMap;
if("1601".equals(type)) {
listMap = this.baseMapper.selectStaticFire(bizOrgCode);
} else {
listMap = this.baseMapper.selectStaticYw(bizOrgCode);
}
return listMap;
}
@Override
public Map<String, Object> selectForShowById(OrgUsr orgUsr, Long id) throws Exception { public Map<String, Object> selectForShowById(OrgUsr orgUsr, Long id) throws Exception {
QueryWrapper<DynamicFormColumn> queryWrapper = new QueryWrapper<DynamicFormColumn>(); QueryWrapper<DynamicFormColumn> queryWrapper = new QueryWrapper<DynamicFormColumn>();
queryWrapper.eq("group_code", OrgPersonEnum.人员.getCode()); queryWrapper.eq("group_code", OrgPersonEnum.人员.getCode());
......
...@@ -67,7 +67,7 @@ public class EquipmentManageController extends AbstractBaseController{ ...@@ -67,7 +67,7 @@ public class EquipmentManageController extends AbstractBaseController{
companyId = result.get("sequenceNbr").toString(); companyId = result.get("sequenceNbr").toString();
} }
} }
return equipmentManageService.queryEquipmenInfoAndCount(equipmentName,equipmentCode,construction,maintenance,bizOrgCode,formGroupId,current,pageSize,controBoxBuildId,companyId, nameOrCode); return equipmentManageService.queryEquipmenInfoAndCount(equipmentName,equipmentCode,construction,maintenance,bizOrgCode,formGroupId,current,pageSize,controBoxBuildId,companyId, nameOrCode, null);
} }
@GetMapping(value = "/getUtils") @GetMapping(value = "/getUtils")
......
...@@ -596,6 +596,7 @@ public class FireFightingSystemController extends AbstractBaseController { ...@@ -596,6 +596,7 @@ public class FireFightingSystemController extends AbstractBaseController {
@RequestParam(value = "construction", required = false) String construction, @RequestParam(value = "construction", required = false) String construction,
@RequestParam(value = "maintenance", required = false) String maintenance, @RequestParam(value = "maintenance", required = false) String maintenance,
@RequestParam(value = "bizOrgCode", required = false) String bizOrgCode, @RequestParam(value = "bizOrgCode", required = false) String bizOrgCode,
@RequestParam(value = "systemStatus", required = false) String systemStatus,
@RequestParam(value = "formGroupId", required = false) String formGroupId, @RequestParam(value = "formGroupId", required = false) String formGroupId,
@RequestParam(value = "controBoxBuildId", required = false) String controBoxBuildId, @RequestParam(value = "controBoxBuildId", required = false) String controBoxBuildId,
@RequestParam(value = "current") int current, @RequestParam(value = "current") int current,
...@@ -626,7 +627,7 @@ public class FireFightingSystemController extends AbstractBaseController { ...@@ -626,7 +627,7 @@ public class FireFightingSystemController extends AbstractBaseController {
} }
} }
return fireFightingSystemService.queryEquipmenInfoAndCount(equipmentName, equipmentCode, construction, maintenance, bizOrgCode, formGroupId, current, pageSize,controBoxBuildId, companyId, nameOrCode); return fireFightingSystemService.queryEquipmenInfoAndCount(equipmentName, equipmentCode, construction, maintenance, bizOrgCode, formGroupId, current, pageSize,controBoxBuildId, companyId, nameOrCode, systemStatus);
} }
/** /**
......
...@@ -26,7 +26,7 @@ public interface EquipmentManageService extends IService<EquipmentManageEntity> ...@@ -26,7 +26,7 @@ public interface EquipmentManageService extends IService<EquipmentManageEntity>
* @param pageSize * @param pageSize
* @return * @return
*/ */
Map<String, Object> queryEquipmenInfoAndCount(String equimentName, String equimentCode, String construction, String maintenance, String bizOrgCode, String formGroupId , int spage, int pageSize, String controBoxBuildId,String companyId, String nameOrCode); Map<String, Object> queryEquipmenInfoAndCount(String equimentName, String equimentCode, String construction, String maintenance, String bizOrgCode, String formGroupId , int spage, int pageSize, String controBoxBuildId,String companyId, String nameOrCode,String systemStatus);
/** /**
* 获取下拉菜单数据 * 获取下拉菜单数据
......
...@@ -45,7 +45,7 @@ public interface IFireFightingSystemService extends IService<FireFightingSystemE ...@@ -45,7 +45,7 @@ public interface IFireFightingSystemService extends IService<FireFightingSystemE
* @param pageSize * @param pageSize
* @return * @return
*/ */
Map<String, Object> queryEquipmenInfoAndCount(String equimentName, String equimentCode, String construction, String maintenance, String bizOrgCode, String formGroupId, int current, int pageSize,String controBoxBuildId,String companyId, String nameOrCode); Map<String, Object> queryEquipmenInfoAndCount(String equimentName, String equimentCode, String construction, String maintenance, String bizOrgCode, String formGroupId, int current, int pageSize,String controBoxBuildId,String companyId, String nameOrCode, String systemStatus);
FireFightingSystemEntity getOneById(Long id); FireFightingSystemEntity getOneById(Long id);
......
...@@ -53,7 +53,7 @@ public class EquipmentManageServiceImpl extends ServiceImpl<EquipmentManageMappe ...@@ -53,7 +53,7 @@ public class EquipmentManageServiceImpl extends ServiceImpl<EquipmentManageMappe
@Override @Override
public Map<String, Object> queryEquipmenInfoAndCount(String equimentName, String equimentCode, String construction, String maintenance, public Map<String, Object> queryEquipmenInfoAndCount(String equimentName, String equimentCode, String construction, String maintenance,
String bizOrgCode, String formGroupId, int current, int pageSize,String controBoxBuildId,String companyCode, String nameOrCode) { String bizOrgCode, String formGroupId, int current, int pageSize,String controBoxBuildId,String companyCode, String nameOrCode, String systemStatus) {
HttpServletRequest request = null; HttpServletRequest request = null;
Map map = new HashMap<String, Object>(); Map map = new HashMap<String, Object>();
map.put("equimentName", equimentName); map.put("equimentName", equimentName);
...@@ -67,6 +67,7 @@ public class EquipmentManageServiceImpl extends ServiceImpl<EquipmentManageMappe ...@@ -67,6 +67,7 @@ public class EquipmentManageServiceImpl extends ServiceImpl<EquipmentManageMappe
map.put("formGroupId", formGroupId); map.put("formGroupId", formGroupId);
map.put("controBoxBuildId",controBoxBuildId); map.put("controBoxBuildId",controBoxBuildId);
map.put("nameOrCode", nameOrCode); map.put("nameOrCode", nameOrCode);
map.put("systemStatus", systemStatus);
List<EquipmentManageVo> dataList = equipmentManageMapper.queryEquipmenInfo(map); List<EquipmentManageVo> dataList = equipmentManageMapper.queryEquipmenInfo(map);
Long count = equipmentManageMapper.queryEquipmenCount(map); Long count = equipmentManageMapper.queryEquipmenCount(map);
map.clear(); map.clear();
......
...@@ -191,9 +191,9 @@ public class FireFightingSystemServiceImpl extends ServiceImpl<FireFightingSyste ...@@ -191,9 +191,9 @@ public class FireFightingSystemServiceImpl extends ServiceImpl<FireFightingSyste
@Override @Override
public Map<String, Object> queryEquipmenInfoAndCount(String equimentName, String equimentCode, String construction, public Map<String, Object> queryEquipmenInfoAndCount(String equimentName, String equimentCode, String construction,
String maintenance, String bizOrgCode, String formGroupId, int current, int pageSize, String controBoxBuildId, String companyId, String nameOrCode) { String maintenance, String bizOrgCode, String formGroupId, int current, int pageSize, String controBoxBuildId, String companyId, String nameOrCode,String systemStatus) {
Map<String, Object> map = equipmentManageService.queryEquipmenInfoAndCount(equimentName, equimentCode, Map<String, Object> map = equipmentManageService.queryEquipmenInfoAndCount(equimentName, equimentCode,
construction, maintenance, bizOrgCode, formGroupId, current, pageSize, controBoxBuildId, companyId, nameOrCode); construction, maintenance, bizOrgCode, formGroupId, current, pageSize, controBoxBuildId, companyId, nameOrCode, systemStatus);
List<EquipmentManageVo> dataList = (List<EquipmentManageVo>) map.get("dataList"); List<EquipmentManageVo> dataList = (List<EquipmentManageVo>) map.get("dataList");
StringBuilder stb = new StringBuilder(); StringBuilder stb = new StringBuilder();
dataList.forEach(y -> { dataList.forEach(y -> {
......
package com.yeejoin.amos.boot.module.jcs.biz.controller; package com.yeejoin.amos.boot.module.jcs.biz.controller;
import com.yeejoin.amos.boot.biz.common.bo.ReginParams;
import com.yeejoin.amos.boot.biz.common.controller.BaseController; import com.yeejoin.amos.boot.biz.common.controller.BaseController;
import com.yeejoin.amos.boot.module.jcs.api.service.IFireResourceSupervisionService; import com.yeejoin.amos.boot.module.jcs.api.service.IFireResourceSupervisionService;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
...@@ -26,8 +27,9 @@ public class FireResourceSupervisionController extends BaseController { ...@@ -26,8 +27,9 @@ public class FireResourceSupervisionController extends BaseController {
@ApiOperation(httpMethod = "GET", value = "驻站消防员、运维人员统计信息查询", notes = "驻站消防员、运维人员统计信息查询") @ApiOperation(httpMethod = "GET", value = "驻站消防员、运维人员统计信息查询", notes = "驻站消防员、运维人员统计信息查询")
@RequestMapping(value = "/stats", method = RequestMethod.GET) @RequestMapping(value = "/stats", method = RequestMethod.GET)
public ResponseModel<Object> stats() { public ResponseModel<Object> stats() {
String orgCode = this.getOrgCode(); ReginParams reginParams = getSelectedOrgInfo();
Map<String, Map<String, Number>> personnelStats = iFireResourceSupervisionService.getPersonnelStats(orgCode); String bizOrgCode = reginParams.getPersonIdentity().getCompanyBizOrgCode();
Map<String, Map<String, Number>> personnelStats = iFireResourceSupervisionService.getPersonnelStats(bizOrgCode);
return ResponseHelper.buildResponse(personnelStats); return ResponseHelper.buildResponse(personnelStats);
} }
} }
...@@ -51,6 +51,9 @@ ...@@ -51,6 +51,9 @@
<if test="bizOrgCode != null and bizOrgCode != ''"> <if test="bizOrgCode != null and bizOrgCode != ''">
AND sys.biz_org_code like CONCAT(#{bizOrgCode},'%') AND sys.biz_org_code like CONCAT(#{bizOrgCode},'%')
</if> </if>
<if test="systemStatus != null and systemStatus != ''">
AND sys.system_status = #{systemStatus}
</if>
<if test="companyCode != null and companyCode != ''"> <if test="companyCode != null and companyCode != ''">
AND sys.biz_org_code = #{companyCode} AND sys.biz_org_code = #{companyCode}
</if> </if>
......
...@@ -78,7 +78,7 @@ public class KafkaConsumerService { ...@@ -78,7 +78,7 @@ public class KafkaConsumerService {
* @param message 省级消息 * @param message 省级消息
* @param ack ack * @param ack ack
*/ */
@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2") @KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.risk.topics}'.split(',')}", concurrency = "2")
public void consumerSingle1(String message, Acknowledgment ack) { public void consumerSingle1(String message, Acknowledgment ack) {
Optional<?> messages = Optional.ofNullable(message); Optional<?> messages = Optional.ofNullable(message);
if (messages.isPresent()) { if (messages.isPresent()) {
......
...@@ -83,14 +83,14 @@ emqx.max-inflight=1000 ...@@ -83,14 +83,14 @@ emqx.max-inflight=1000
# \u4E0B\u9762\u4E2A\u914D\u7F6E\u9ED8\u8BA4\u7AD9\u7AEF \u4E2D\u5FC3\u7EA7\u7CFB\u7EDF\u7684\u65F6\u5019\u6CE8\u91CA\u6389\u4E0A\u8FB9 \u653E\u5F00\u4E0B\u8FB9 # \u4E0B\u9762\u4E2A\u914D\u7F6E\u9ED8\u8BA4\u7AD9\u7AEF \u4E2D\u5FC3\u7EA7\u7CFB\u7EDF\u7684\u65F6\u5019\u6CE8\u91CA\u6389\u4E0A\u8FB9 \u653E\u5F00\u4E0B\u8FB9
#\u7AD9\u7AEF\u914D\u7F6E #\u7AD9\u7AEF\u914D\u7F6E
#\u9700\u8981\u76D1\u542C\u5F97kafka\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E #\u9700\u8981\u76D1\u542C\u5F97kafka\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E
#kafka.topics=JKXT2BP-XFZX-Topic kafka.topics=JKXT2BP-XFZX-Topic
#\u9700\u8981\u76D1\u542C\u5F97eqm\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E emq.iot.created, #\u9700\u8981\u76D1\u542C\u5F97eqm\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E emq.iot.created,
#emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created,emq.risk.created,emq.mcb.zxj #emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created,emq.risk.created,emq.mcb.zxj
##\u4E2D\u5FC3\u7EA7\u914D\u7F6E\u914D\u7F6E ##\u4E2D\u5FC3\u7EA7\u914D\u7F6E\u914D\u7F6E
##\u9700\u8981\u76D1\u542C\u5F97kafka\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E ##\u9700\u8981\u76D1\u542C\u5F97kafka\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E
kafka.topics=JKXT2BP-XFYY-Topic kafka.risk.topics=JKXT2BP-XFYY-Topic
# #
##\u9700\u8981\u76D1\u542C\u5F97eqm\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E emq.iot.created, ##\u9700\u8981\u76D1\u542C\u5F97eqm\u6D88\u606F\u4E3B\u9898 \u6839\u636E\u662F\u5426\u662F\u4E2D\u5FC3\u6781\u548C\u7AD9\u7AEF\u9009\u62E9\u9700\u8981\u76D1\u542C\u5F97\u4E3B\u9898\u8FDB\u884C\u914D\u7F6E emq.iot.created,
emq.topic=ccs-user-login-info,sync.execute,data/mcb/warning,emq.risk.qrcode.put,emq.risk.qrcode.clean emq.topic=ccs-user-login-info,sync.execute,data/mcb/warning,emq.risk.qrcode.put,emq.risk.qrcode.clean
......
...@@ -304,6 +304,11 @@ ...@@ -304,6 +304,11 @@
</repository> <repository> <id>Snapshots</id> <name>Snapshots</name> <url>http://4v059425e3.zicp.vip:13535/nexus/content/repositories/snapshots/</url> </repository> <repository> <id>Snapshots</id> <name>Snapshots</name> <url>http://4v059425e3.zicp.vip:13535/nexus/content/repositories/snapshots/</url>
</repository> --> </repository> -->
<repository> <repository>
<id>public</id>
<name>public</name>
<url>http://113.142.68.105:8081/nexus/content/repositories/public/</url>
</repository>
<repository>
<id>Releases</id> <id>Releases</id>
<name>Releases</name> <name>Releases</name>
<url>http://36.46.149.14:8081/nexus/content/repositories/releases/</url> <url>http://36.46.149.14:8081/nexus/content/repositories/releases/</url>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment