Commit 53cb5759 authored by zhangsen's avatar zhangsen

CCS接收消息部分提交

parent c5bb4b6a
...@@ -184,4 +184,16 @@ public class FireEquipmentSignalLog extends BaseEntity { ...@@ -184,4 +184,16 @@ public class FireEquipmentSignalLog extends BaseEntity {
*/ */
private String protectedObjectName; private String protectedObjectName;
/**
* 上报信号标识(每次信号都有唯一的标识,与站内报警数据进行关联)
*/
@TableField("signal_id")
private String signalId;
/**
* 问题编号
*/
@TableField("question_num")
private String questionNum;
} }
package com.yeejoin.amos.boot.module.ccs.api.service;
public interface MqttReceiveService {
/**
* 增量数据处理
* @param topic 主题
* @param message 消息内容
*/
void handlerMqttIncrementMessage(String topic, String message);
}
package com.yeejoin.amos.boot.module.ccs.api.vo;
import lombok.Data;
/**
* @author keyong
* @title: IotDataVO
* <pre>
* @description: 物联系统发送的增量数据封装VO
* </pre>
* @date 2021/1/7 17:44
*/
@Data
public class MqttDataVO {
private String key;
private Object value;
}
...@@ -40,6 +40,9 @@ ...@@ -40,6 +40,9 @@
where where
a.station_code = s.code a.station_code = s.code
and s.status = false and s.status = false
<if test="isAlarm != null">
and is_alarm = #{isAlarm}
</if>
<if test="stationCode != null and stationCode != ''"> <if test="stationCode != null and stationCode != ''">
and a.station_code = #{stationCode} and a.station_code = #{stationCode}
</if> </if>
......
...@@ -16,6 +16,11 @@ ...@@ -16,6 +16,11 @@
<artifactId>amos-boot-module-ccs-api</artifactId> <artifactId>amos-boot-module-ccs-api</artifactId>
<version>${amos-biz-boot.version}</version> <version>${amos-biz-boot.version}</version>
</dependency> </dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.6.3</version>
</dependency>
</dependencies> </dependencies>
......
package com.yeejoin.amos.boot.module.ccs.biz.config;
import com.github.pagehelper.util.StringUtil;
import com.yeejoin.amos.boot.module.ccs.api.service.MqttReceiveService;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import java.util.ArrayList;
import java.util.List;
/**
* @author keyong
* @title: CcsMqttReceiveConfig
* <pre>
* @description: MQTT订阅模式消费类
* </pre>
* @date 2020/10/29 19:23
*/
@Configuration
@IntegrationComponentScan
public class CcsMqttReceiveConfig {
@Value("${emqx.user-name}")
private String userName;
@Value("${emqx.password}")
private String password;
@Value("${emqx.broker}")
private String hostUrl;
@Value("${emqx.client-id}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic;
// @Value("${spring.mqtt.completionTimeout}")
// private int completionTimeout;
// 全局变量adapter
public MqttPahoMessageDrivenChannelAdapter adapter;
@Autowired
public MqttReceiveService mqttReceiveService;
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(userName);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(20);
mqttConnectOptions.setAutomaticReconnect(true);
// mqttConnectOptions.setConnectionTimeout(0);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttPahoClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
// 接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttPahoClientFactory(), defaultTopic);
// adapter.setCompletionTimeout(completionTimeout);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(0);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
// int endIndex = topic.lastIndexOf("/");
// if (endIndex > 0 && StringUtil.isNotEmpty(String.valueOf(message))) {
// String dataType = topic.substring(endIndex + 1);
// if (dataType.equals("property") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handlerMqttIncrementMessage(topic, msg);
// }
// }
};
}
}
package com.yeejoin.amos.boot.module.ccs.biz.service.impl; package com.yeejoin.amos.boot.module.ccs.biz.service.impl;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils; import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import com.yeejoin.amos.boot.module.ccs.api.dto.FireEquipmentSignalLogDto; import com.yeejoin.amos.boot.module.ccs.api.dto.FireEquipmentSignalLogDto;
import com.yeejoin.amos.boot.module.ccs.api.entity.FireEquipmentSignalLog; import com.yeejoin.amos.boot.module.ccs.api.entity.FireEquipmentSignalLog;
import com.yeejoin.amos.boot.module.ccs.api.mapper.FireEquipmentSignalLogMapper; import com.yeejoin.amos.boot.module.ccs.api.mapper.FireEquipmentSignalLogMapper;
import com.yeejoin.amos.boot.module.ccs.api.service.IFireEquipmentSignalLogService; import com.yeejoin.amos.boot.module.ccs.api.service.IFireEquipmentSignalLogService;
import com.yeejoin.amos.boot.module.ccs.biz.util.RequestUtil; import com.yeejoin.amos.boot.module.ccs.biz.util.RequestUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.typroject.tyboot.core.rdbms.service.BaseService; import org.typroject.tyboot.core.rdbms.service.BaseService;
...@@ -47,4 +49,22 @@ public class FireEquipmentSignalLogServiceImpl extends BaseService<FireEquipment ...@@ -47,4 +49,22 @@ public class FireEquipmentSignalLogServiceImpl extends BaseService<FireEquipment
params.put("stationCode",stationCode); params.put("stationCode",stationCode);
return this.getBaseMapper().queryAlarmLogList(params); return this.getBaseMapper().queryAlarmLogList(params);
} }
public void saveBatchLog(List<FireEquipmentSignalLog> list) {
if (CollectionUtils.isNotEmpty(list)) {
this.saveBatch(list);
}
}
//TODO 修改待补充
public void updateBatchLog(List<FireEquipmentSignalLog> list) {
if (CollectionUtils.isNotEmpty(list)) {
list.stream().forEach(item -> {
UpdateWrapper<FireEquipmentSignalLog> wrapper = new UpdateWrapper<>();
wrapper.lambda().eq(FireEquipmentSignalLog::getFireEquipmentMrid, item.getFireEquipmentMrid())
.eq(FireEquipmentSignalLog::getFireEquipmentIndexKey, item.getFireEquipmentIndexKey());
});
}
}
} }
\ No newline at end of file
package com.yeejoin.amos.boot.module.ccs.biz.service.impl;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONArray;
import com.github.xiaoymin.knife4j.core.util.StrUtil;
import com.yeejoin.amos.boot.module.ccs.api.entity.FireEquipmentSignalLog;
import com.yeejoin.amos.boot.module.ccs.api.service.MqttReceiveService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.typroject.tyboot.core.restful.exception.instance.BadRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class MqttReceiveServiceImpl implements MqttReceiveService {
private static Boolean bool = Boolean.FALSE;
@Autowired
private FireEquipmentSignalLogServiceImpl fireEquipmentSignalLogServiceImpl;
//TODO 逻辑待补充
@Override
public void handlerMqttIncrementMessage(String topic, String message) {
List<Map<String, String>> list = new ArrayList<>();
try {
list = (List<Map<String, String>>) JSONArray.parseObject(message, List.class);
// 将JSON字符串转换成实体类型
// List<FireEquipmentSignalLog> log = JSONUtil.toBean(message, FireEquipmentSignalLog.class);
} catch (Exception e) {
log.error("解析失败");
throw new BadRequest("解析失败");
}
if (CollectionUtils.isNotEmpty(list)) {
List<FireEquipmentSignalLog> doList = new ArrayList<>();
for (Map<String, String> stringStringMap : list) {
FireEquipmentSignalLog fireEquipmentSignalLog = new FireEquipmentSignalLog();
if (stringStringMap.containsKey("objcect_type") && StrUtil.isNotBlank(stringStringMap.get("objcect_type"))) {
if (!stringStringMap.get("objcect_type").equals("EQUIP")) {
continue;
}
}
//当为告警时新增数据
doList.add(fireEquipmentSignalLog);
//当为消除告警时修改原有数据 (根据设备编码、)
}
if (CollectionUtils.isNotEmpty(doList)) {
fireEquipmentSignalLogServiceImpl.saveBatchLog(doList);
}
}
log.info(String.format("收到mqtt消息:%s", message));
}
}
...@@ -126,7 +126,7 @@ public class EquipmentAlarmController extends AbstractBaseController { ...@@ -126,7 +126,7 @@ public class EquipmentAlarmController extends AbstractBaseController {
queryRequests.add(request4); queryRequests.add(request4);
CommonRequest request5 = new CommonRequest(); CommonRequest request5 = new CommonRequest();
request5.setName("orgCode"); request5.setName("orgCode");
request5.setValue(this.getOrgCode()); request5.setValue(null);
queryRequests.add(request5); queryRequests.add(request5);
CommonRequest request6 = new CommonRequest(); CommonRequest request6 = new CommonRequest();
request6.setName("handleStatus"); request6.setName("handleStatus");
......
## DB properties: ## DB properties:
spring.datasource.url=jdbc:mysql://172.16.10.85:3306/amos-ccs-biz?allowMultiQueries=true&serverTimezone=GMT%2B8&characterEncoding=utf8 spring.datasource.url=jdbc:mysql://172.16.11.201:3306/dl_ccs_biz?allowMultiQueries=true&serverTimezone=GMT%2B8&characterEncoding=utf8
spring.datasource.username=root spring.datasource.username=root
spring.datasource.password=Amos2019Mysql8 spring.datasource.password=Yeejoin@2020
## eureka properties: ## eureka properties:
eureka.instance.hostname=172.16.11.20 eureka.instance.hostname=172.16.11.201
eureka.client.serviceUrl.defaultZone=http://${eureka.instance.hostname}:10001/eureka/ eureka.client.serviceUrl.defaultZone=http://${eureka.instance.hostname}:10001/eureka/
## redis properties: ## redis properties:
spring.redis.database=1 spring.redis.database=1
spring.redis.host=172.16.11.20 spring.redis.host=172.16.11.201
spring.redis.port=6379 spring.redis.port=6379
spring.redis.password=1234560 spring.redis.password=1234560
## emqx properties: ## emqx properties:
#emqx.clean-session=true emqx.clean-session=true
#emqx.client-id=${spring.application.name}-${random.int[1024,65536]} emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
#emqx.broker=tcp://172.16.11.33:1883 emqx.broker=tcp://172.16.11.201:1883
#emqx.user-name=admin emqx.user-name=admin
#emqx.password=public emqx.password=public
\ No newline at end of file
#mqtt.scene.host=mqtt://172.16.10.201:18083/mqtt
#mqtt.client.product.id=mqtt
mqtt.topic=topic_mqtt
#spring.mqtt.completionTimeout=3000
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment