Commit c014e982 authored by 刘林's avatar 刘林

fix(equip):优化对接IOT代码

parent 94572d37
...@@ -32,8 +32,8 @@ public class ESEquipments { ...@@ -32,8 +32,8 @@ public class ESEquipments {
private String gatewayId; private String gatewayId;
@Field(type = FieldType.Text) @Field(type = FieldType.Text)
private String isAlarm; private String isAlarm;
@Field(type = FieldType.Date, format = DateFormat.basic_date_time, index = false) //@Field(type = FieldType.Date, format = DateFormat.basic_date_time, index = false)
private Date createdTime; //private Date createdTime;
@Field(type = FieldType.Text , index = false) @Field(type = FieldType.Text , index = false)
private String unit; private String unit;
@Field(type = FieldType.Text) @Field(type = FieldType.Text)
......
...@@ -13,6 +13,7 @@ import org.apache.commons.lang3.ObjectUtils; ...@@ -13,6 +13,7 @@ import org.apache.commons.lang3.ObjectUtils;
import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date;
import java.util.Map; import java.util.Map;
import static com.yeejoin.equip.mqtt.message.MqttConstant.*; import static com.yeejoin.equip.mqtt.message.MqttConstant.*;
...@@ -28,10 +29,14 @@ public class MessageTransfer { ...@@ -28,10 +29,14 @@ public class MessageTransfer {
* 转为原生数据,payload为字节数组 * 转为原生数据,payload为字节数组
**/ **/
public static IndicatorData mqttMessage2RawMessage(String payload, Map<String, Object> headers) { public static IndicatorData mqttMessage2RawMessage(String payload, Map<String, Object> headers) {
log.info("received raw message, header >>> {}, payload >>> {}", headers, JSONObject.toJSONString(payload)); //log.info("received raw message, header >>> {}, payload >>> {}", headers, JSONObject.toJSONString(payload));
RedisUtils redisUtils = (RedisUtils) SpringUtils.getBean("redisUtils"); RedisUtils redisUtils = (RedisUtils) SpringUtils.getBean("redisUtils");
IndicatorData indicatorData = JSON.parseObject(payload, IndicatorData.class); IndicatorData indicatorData = JSON.parseObject(payload, IndicatorData.class);
if(!indicatorData.getSignalType().equals("transformation")){
log.info("received TotalSummon message:address{},gateway:{},time:{},signalType:{}", indicatorData.getAddress(),indicatorData.getGatewayId(),
new Date(),indicatorData.getSignalType());
}
//log.info("received raw message, header >>> {}, payload >>> {}", headers, JSONObject.toJSONString(payload));
try { try {
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString(); String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
String[] topicItems = topic.split(TOPIC_SPLITTER); String[] topicItems = topic.split(TOPIC_SPLITTER);
......
package com.yeejoin.equip.service; package com.yeejoin.equip.service;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.yeejoin.equip.entity.ESEquipments; import com.yeejoin.equip.entity.ESEquipments;
...@@ -12,7 +12,6 @@ import org.springframework.scheduling.annotation.Scheduled; ...@@ -12,7 +12,6 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.List; import java.util.List;
/** /**
* @author LiuLin * @author LiuLin
* @date 2023/6/25 * @date 2023/6/25
...@@ -34,12 +33,13 @@ public class HandleESMessage2TDService { ...@@ -34,12 +33,13 @@ public class HandleESMessage2TDService {
@Scheduled(cron = "0 */10 * * * ?") @Scheduled(cron = "0 */10 * * * ?")
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void syncEsData2TDEngine() throws Exception { public void syncEsData2TDEngine() throws Exception {
List<ESEquipments> result = elasticSearchUtil.searchResponse(ES_INDEX, null, hit -> JSON.parseObject(hit.getSourceAsString(), ESEquipments.class)); List<ESEquipments> result = elasticSearchUtil.searchResponse(ES_INDEX, null, hit -> JSONObject.parseObject(hit.getSourceAsString(), ESEquipments.class));
List<List<ESEquipments>> allDataList = Lists.partition(result, SIZE); List<List<ESEquipments>> allDataList = Lists.partition(result, SIZE);
for (List<ESEquipments> tempDataList : allDataList) { for (List<ESEquipments> tempDataList : allDataList) {
if (CollectionUtils.isNotEmpty(tempDataList)) { if (CollectionUtils.isNotEmpty(tempDataList)) {
esEquipmentsMapper.batchInsert(tempDataList); esEquipmentsMapper.batchInsert(tempDataList);
} }
} }
log.info("同步ES数据至TDEngine成功!共同步{}条!",result.size());
} }
} }
...@@ -23,6 +23,7 @@ import java.util.Map; ...@@ -23,6 +23,7 @@ import java.util.Map;
@Component("handleMessageService") @Component("handleMessageService")
public class HandleMessageService { public class HandleMessageService {
private static final String MEASUREMENT = "iot_data_"; private static final String MEASUREMENT = "iot_data_";
private static final String TOTAL_DATA_ = "total_data_";
private static final String ES_INDEX_NAME_JX = "jxiop_equipments"; private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
@Autowired @Autowired
protected KafkaProducerService kafkaProducerService; protected KafkaProducerService kafkaProducerService;
...@@ -48,21 +49,16 @@ public class HandleMessageService { ...@@ -48,21 +49,16 @@ public class HandleMessageService {
fieldsMap.put("isAlarm", indicatorData.getIsAlarm()); fieldsMap.put("isAlarm", indicatorData.getIsAlarm());
fieldsMap.put("equipmentSpecificName", indicatorData.getEquipmentSpecificName()); fieldsMap.put("equipmentSpecificName", indicatorData.getEquipmentSpecificName());
fieldsMap.put("value", indicatorData.getValue()); fieldsMap.put("value", indicatorData.getValue());
fieldsMap.put("valueLabel", indicatorData.getValueLabel().isEmpty() ? indicatorData.getValue() : indicatorData.getValueLabel()); fieldsMap.put("valueLabel", indicatorData.getValueLabel());
fieldsMap.put("equipmentIndexName", indicatorData.getEquipmentIndexName()); fieldsMap.put("equipmentIndexName", indicatorData.getEquipmentIndexName());
fieldsMap.put("unit", indicatorData.getUnit()); fieldsMap.put("unit", indicatorData.getUnit());
fieldsMap.put("createdTime", simpleDateFormat.format(new Date())); fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
if ("transformation".equals(indicatorData.getSignalType())) {
influxDbConnection.insert(MEASUREMENT + indicatorData.getGatewayId(), tagsMap, fieldsMap);
indicatorDataMapper.insert(indicatorData);
}
//更新数据入ES库 //更新数据入ES库
Map<String, Object> paramJson = new HashMap<>(); Map<String, Object> paramJson = new HashMap<>();
paramJson.put("valueF", indicatorData.getValueF()); paramJson.put("valueF", indicatorData.getValueF());
paramJson.put("value", indicatorData.getValue()); paramJson.put("value", indicatorData.getValue());
paramJson.put("valueLabel", indicatorData.getValueLabel().isEmpty() ? indicatorData.getValue() : indicatorData.getValueLabel()); paramJson.put("valueLabel", indicatorData.getValueLabel());
paramJson.put("createdTime", new Date()); paramJson.put("createdTime", new Date());
paramJson.put("unit", indicatorData.getUnit()); paramJson.put("unit", indicatorData.getUnit());
elasticSearchUtil.updateData(ES_INDEX_NAME_JX, indicatorData.getEquipmentsIdx(), JSON.toJSONString(paramJson)); elasticSearchUtil.updateData(ES_INDEX_NAME_JX, indicatorData.getEquipmentsIdx(), JSON.toJSONString(paramJson));
...@@ -71,6 +67,15 @@ public class HandleMessageService { ...@@ -71,6 +67,15 @@ public class HandleMessageService {
fieldsMap.putAll(tagsMap); fieldsMap.putAll(tagsMap);
kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap)); kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
} }
if ("transformation".equals(indicatorData.getSignalType())) {
influxDbConnection.insert(MEASUREMENT + indicatorData.getGatewayId(), tagsMap, fieldsMap);
indicatorDataMapper.insert(indicatorData);
log.info("TDEngine入库成功,{},value:{}",indicatorData.getEquipmentsIdx(),indicatorData.getValue());
}else{
influxDbConnection.insert(TOTAL_DATA_ + indicatorData.getGatewayId(), tagsMap, fieldsMap);
}
} catch (Exception e) { } catch (Exception e) {
log.error("Iot透传消息解析入库失败" + e.getMessage(), e); log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
} }
......
...@@ -35,7 +35,7 @@ public class ElasticSearchClient { ...@@ -35,7 +35,7 @@ public class ElasticSearchClient {
.withConnectTimeout(elasticSearchConfig.getConnectTimeout()) .withConnectTimeout(elasticSearchConfig.getConnectTimeout())
.withBasicAuth(elasticSearchConfig.getUsername(), elasticSearchConfig.getPassword()) .withBasicAuth(elasticSearchConfig.getUsername(), elasticSearchConfig.getPassword())
.build(); .build();
}catch (Exception e){ } catch (Exception e) {
log.error("连接ES异常" + e.getMessage(), e); log.error("连接ES异常" + e.getMessage(), e);
} }
return RestClients.create(Objects.requireNonNull(clientConfiguration)).rest(); return RestClients.create(Objects.requireNonNull(clientConfiguration)).rest();
......
package com.yeejoin.equip.utils; package com.yeejoin.equip.utils;
import com.alibaba.fastjson.JSON;
import com.yeejoin.equip.entity.ESEquipments;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
...@@ -44,18 +44,36 @@ public class ElasticSearchUtil { ...@@ -44,18 +44,36 @@ public class ElasticSearchUtil {
* @param indexName 索引名称 * @param indexName 索引名称
* @param id 主键 * @param id 主键
* @param paramJson 参数JSON * @param paramJson 参数JSON
* @return
*/ */
public void updateData(String indexName, String id, String paramJson) { public boolean updateData(String indexName, String id, String paramJson) {
UpdateRequest updateRequest = new UpdateRequest(indexName, id); UpdateRequest updateRequest = new UpdateRequest(indexName, id);
//如果修改索引中不存在则进行新增 //如果修改索引中不存在则进行新增
updateRequest.docAsUpsert(false); updateRequest.docAsUpsert(true);
//立即刷新数据
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(paramJson, XContentType.JSON); updateRequest.doc(paramJson, XContentType.JSON);
try { try {
restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT); UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
log.info("索引[{}],主键:【{}】操作结果:[{}]", indexName, id, updateResponse.getResult());
if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
//新增
log.info("索引:【{}】,主键:【{}】新增成功", indexName, id);
return true;
} else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
//修改
log.info("索引:【{}】,主键:【{}】修改成功", indexName, id);
return true;
} else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
//无变化
log.info("索引:[{}],主键:[{}]无变化", indexName, id);
return true;
}
} catch (IOException e) { } catch (IOException e) {
log.error("索引:[{}],主键:【{}】,更新异常:[{}]", indexName, id, e.getMessage()); log.error("索引:[{}],主键:【{}】,更新异常:[{}]", indexName, id, e);
return false;
} }
return false;
} }
/** /**
......
...@@ -43,8 +43,8 @@ spring.security.user.name=admin ...@@ -43,8 +43,8 @@ spring.security.user.name=admin
spring.security.user.password=a1234560 spring.security.user.password=a1234560
emqx.clean-session=true emqx.clean-session=true
emqx.client-id=${spring.application.name}-${random.int[1024,65536]} emqx.client-id=${spring.application.name}-${random.int[1,65536]}
emqx.biz-client-id=consumer-${random.int[1024,65536]} emqx.biz-client-id=consumer-${random.int[1,65536]}
emqx.broker=tcp://172.16.3.157:1883 emqx.broker=tcp://172.16.3.157:1883
emqx.client-user-name=admin emqx.client-user-name=admin
emqx.client-password=public emqx.client-password=public
......
spring:
datasource:
mysql-server:
driver-class-name: com.mysql.cj.jdbc.Driver
password: Yeejoin@2020
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: jdbc:mysql://139.9.173.44:3306/equipment?useUnicode=true&allowMultiQueries=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
username: root
hikari:
auto-commit: true
pool-name: DatebookHikariCP
connection-test-query: SELECT 1
connection-timeout: 60000
idle-timeout: 500000
max-lifetime: 1800000
maximum-pool-size: 30
minimum-idle: 3
tdengine-server:
driver-class-name: com.taosdata.jdbc.rs.RestfulDriver
jdbc-url: jdbc:TAOS-RS://172.16.3.157:6041/test?user=root&password=taosdata&timezone=GMT%2b8
username: root
password: taosdata
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
<springProperty scope="context" name="PATTERN" source="log.pattern" <springProperty scope="context" name="PATTERN" source="log.pattern"
defaultValue="-|%d{yyyy-MM-dd HH:mm:ss.SSS}|%-5level|%X{tid}|%thread|%logger{36}.%M:%L-%msg%n"/> defaultValue="-|%d{yyyy-MM-dd HH:mm:ss.SSS}|%-5level|%X{tid}|%thread|%logger{36}.%M:%L-%msg%n"/>
<property name="LOG_NAME" value="message"/> <property name="LOG_NAME" value="equip"/>
<property name="LOG_PATH" value="./logs"/> <property name="LOG_PATH" value="./logs"/>
<property name="LOG_DIR" value="${LOG_PATH}/${LOG_NAME}/%d{yyyyMMdd}"/> <property name="LOG_DIR" value="${LOG_PATH}/${LOG_NAME}/%d{yyyyMMdd}"/>
<property name="CHARSET" value="UTF-8"/> <property name="CHARSET" value="UTF-8"/>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment