Commit 750b645c authored by 刘林

fix(equip): optimize the IoT integration code and the ES code

parent 1de73fd0
@@ -73,7 +73,7 @@ public class EquipmentIndexVO implements Serializable {
     private Integer isTrend;

     @ApiModelProperty(value = "alarm flag")
-    private Integer isAlarm;
+    private int isAlarm;

     @ApiModelProperty(value = "metric enum")
     private String valueEnum;
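Reviewer note: changing isAlarm from Integer to int means the getter can never return null, which is why the alarm check later in this commit collapses to a plain 0 != getIsAlarm() comparison. A minimal sketch under that assumption (stand-in class, not the project's POJO):

public class IsAlarmDemo {
    // stand-in for EquipmentIndexVO; only the field under discussion
    static class IndexVO {
        private int isAlarm;                        // was Integer; a primitive defaults to 0
        public int getIsAlarm() { return isAlarm; }
        public void setIsAlarm(int v) { isAlarm = v; }
    }

    public static void main(String[] args) {
        IndexVO vo = new IndexVO();                 // freshly created/deserialized: isAlarm == 0
        if (0 != vo.getIsAlarm()) {                 // no null guard needed any more
            System.out.println("send alarm");
        }
    }
}

One caveat: a primitive field simply stays 0 when the incoming JSON omits it, so "absent" and "not alarming" become indistinguishable.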
...@@ -11,9 +11,7 @@ import org.springframework.stereotype.Component; ...@@ -11,9 +11,7 @@ import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener; import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.Arrays; import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/** /**
* @author LiuLin * @author LiuLin
...@@ -35,20 +33,13 @@ public class EmqMessageService extends EmqxListener { ...@@ -35,20 +33,13 @@ public class EmqMessageService extends EmqxListener {
@Value("${kafka.topic}") @Value("${kafka.topic}")
private String kafkaTopic; private String kafkaTopic;
ExecutorService service = Executors.newFixedThreadPool(10);
private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>(); private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();
@PostConstruct @PostConstruct
void init() { void init() throws Exception {
new Thread(task_runnable).start(); service.execute(new NumberThread());
String[] split = emqTopic.split(","); emqKeeper.subscript(emqTopic, 0, this);
Arrays.stream(split).forEach(e-> {
try {
emqKeeper.subscript(e, 1, this);
} catch (Exception exception) {
log.info("订阅emq消息失败 ====> message: {}", exception.getMessage());
}
});
} }
@Override @Override
...@@ -59,18 +50,20 @@ public class EmqMessageService extends EmqxListener { ...@@ -59,18 +50,20 @@ public class EmqMessageService extends EmqxListener {
messageResult.put("topic", topic); messageResult.put("topic", topic);
blockingQueue.add(messageResult); blockingQueue.add(messageResult);
} }
class NumberThread implements Runnable {
Runnable task_runnable = new Runnable() { @Override
public void run() { public void run() {
int k = 0; while (true) {
boolean b = true;
while (b) {
k++;
b = k < Integer.MAX_VALUE;
try { try {
JSONObject messageResult = blockingQueue.take(); JSONObject messageResult = blockingQueue.take();
JSONObject result = messageResult.getJSONObject("result"); JSONObject result = messageResult.getJSONObject("result");
if ((messageResult.getString("topic")).equals(emqTopic)) { if ((messageResult.getString("topic")).equals(emqTopic)) {
String dataType = result.getString("dataType");
String address = result.getString("address");
String gatewayId = result.getString("gatewayId");
String value = result.getString("value");
String signalType = result.getString("signalType");
log.info("订阅emq消息 ====> address:{},gatewayId:{},dateType:{},value:{},signalType:{}", address,gatewayId,dataType,value,signalType);
kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result)); kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -78,5 +71,5 @@ public class EmqMessageService extends EmqxListener { ...@@ -78,5 +71,5 @@ public class EmqMessageService extends EmqxListener {
} }
} }
} }
}; }
} }
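Reviewer note: the rewrite replaces the old k < Integer.MAX_VALUE counter loop and the per-topic subscribe loop with a named NumberThread that drains a BlockingQueue on a fixed thread pool. A self-contained sketch of that queue-drain pattern, with illustrative names and payloads (not the project's API):

import java.util.concurrent.*;

public class QueueDrainSketch {
    private static final BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
    private static final ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws InterruptedException {
        // worker: the NumberThread role -- block on take() and forward each message
        service.execute(() -> {
            while (true) {
                try {
                    String message = blockingQueue.take();   // blocks until a message arrives
                    System.out.println("forward to Kafka: " + message);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();      // restore the flag and stop
                    return;
                }
            }
        });
        // producer: the EMQ callback role, enqueueing an incoming message
        blockingQueue.add("{\"gatewayId\":\"g1\",\"value\":\"42\"}");
        Thread.sleep(200);
        service.shutdownNow();                               // interrupts take(); demo only
    }
}

Worth flagging: the subscribe call also changed from QoS 1 per comma-split topic to QoS 0 on the whole emqTopic string, so if that property still holds a comma-separated list, only the literal combined string is subscribed now.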
@@ -82,7 +82,6 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
             IndicatorData indicatorData = JSON.parseObject(record.value(), IndicatorData.class);
             String dataType = jsonObject.getString("dataType");
             String indexAddress = jsonObject.getString("address");
-            String traceId = jsonObject.getString("traceId");
             String gatewayId = jsonObject.getString("gatewayId");
             String value = jsonObject.getString("value");
             String key = indexAddress + "_" + gatewayId;
@@ -92,16 +91,28 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
             if (redisUtils.hasKey(key)) {
                 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                 EquipmentIndexVO equipmentSpeIndex = JSONObject.parseObject(redisUtils.get(key), EquipmentIndexVO.class);
+                String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
+                // write the updated data to ES
+                Map<String, Object> paramJson = new HashMap<>();
+                if (!Arrays.asList(TRUE, FALSE).contains(value)) {
+                    paramJson.put("valueF", Float.parseFloat(value));
+                }
+                paramJson.put("value", value);
+                paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
+                paramJson.put("createdTime", new Date());
+                paramJson.put("unit", equipmentSpeIndex.getUnitName());
+                elasticSearchUtil.updateData(ES_INDEX_NAME_JX, key, JSON.toJSONString(paramJson));
+
                 Map<String, String> tagsMap = new HashMap<>();
                 Map<String, Object> fieldsMap = new HashMap<>();
                 tagsMap.put("equipmentsIdx", key);
                 fieldsMap.put("address", indexAddress);
                 fieldsMap.put("gatewayId", gatewayId);
                 fieldsMap.put("dataType", dataType);
                 fieldsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
                 fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
-                String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
                 fieldsMap.put("value", value);
                 fieldsMap.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
                 fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
@@ -116,7 +127,7 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
                 indicatorData.setValueLabel(valueLabel.isEmpty() ? value : valueLabel);

                 // store change-of-state data into InfluxDB
-                if ("transformation".equals(signalType)) {
+                if ("transformation".equalsIgnoreCase(signalType)) {
                     influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
                     indicatorDataMapper.insert(indicatorData);
                     log.info("TDengine write succeeded, {}, value:{}", indicatorData.getEquipmentsIdx(), indicatorData.getValue());
@@ -125,20 +136,10 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
                     log.info("General interrogation data stored, key:{}", indicatorData.getEquipmentsIdx());
                 }

-                // write the updated data to ES
-                Map<String, Object> paramJson = new HashMap<>();
-                if (!Arrays.asList(TRUE, FALSE).contains(value)) {
-                    paramJson.put("valueF", Float.parseFloat(value));
-                }
-                paramJson.put("value", value);
-                paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
-                paramJson.put("createdTime", new Date());
-                paramJson.put("unit", equipmentSpeIndex.getUnitName());
-                elasticSearchUtil.updateData(ES_INDEX_NAME_JX, key, JSON.toJSONString(paramJson));
-                if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
+                if (0 != equipmentSpeIndex.getIsAlarm()) {
                     fieldsMap.putAll(tagsMap);
                     kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
-                    log.info("=========== sending alarm message, key:{}", indicatorData.getEquipmentsIdx());
                 }
             }
         } catch (Exception e) {
@@ -175,7 +176,7 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
     @Override
     public void run() {
         while (true) {
-            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
+            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
             for (ConsumerRecord<String, String> record : records) {
                 pooledExecutor.submit(() -> {
                     processRecord(record);
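Reviewer note: the runner thread now polls in 1000 ms batches and fans each record out to a worker pool. A hedged, self-contained sketch of that poll-then-dispatch shape (broker address, topic, pool size, and the processRecord body are placeholders, not the project's values):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PollDispatchSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "messageConsumerGroup");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        ExecutorService pooledExecutor = Executors.newFixedThreadPool(8);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));
            while (true) {
                // 1000 ms vs the old 100 ms: fewer idle wake-ups, at the cost
                // of slightly higher worst-case latency per batch
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    pooledExecutor.submit(() -> processRecord(record));
                }
                // offsets are never committed in this sketch; with
                // enable-auto-commit=false the real code must commit somewhere
            }
        }
    }

    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}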
@@ -56,17 +56,14 @@ public class ElasticSearchUtil {
         updateRequest.doc(paramJson, XContentType.JSON);
         try {
             UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
-            log.info("Index [{}], id [{}], result: [{}]", indexName, id, updateResponse.getResult());
+            //log.info("Index [{}], id [{}], result: [{}]", indexName, id, updateResponse.getResult());
             if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
-                // created
                 log.info("Index [{}], id [{}]: document created", indexName, id);
                 return true;
             } else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
-                // updated
                 log.info("Index [{}], id [{}]: document updated", indexName, id);
                 return true;
             } else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
-                // no change
                 log.info("Index [{}], id [{}]: no change", indexName, id);
                 return true;
             }
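Reviewer note: for context, a sketch of the partial-update call this utility wraps, written against the Elasticsearch 7.x RestHighLevelClient; docAsUpsert(true) is an assumption on my part, chosen because it produces the CREATED result the branch above handles:

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;

public class EsPartialUpdateSketch {
    // merge paramJson into the document with the given id, creating it if absent
    public static boolean updateData(RestHighLevelClient client, String indexName,
                                     String id, String paramJson) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(indexName, id)
                .doc(paramJson, XContentType.JSON)   // partial update: only supplied fields change
                .docAsUpsert(true);                  // assumption: insert when the id is missing
        UpdateResponse response = client.update(updateRequest, RequestOptions.DEFAULT);
        DocWriteResponse.Result result = response.getResult();
        // CREATED, UPDATED and NOOP are all success cases for this caller
        return result == DocWriteResponse.Result.CREATED
                || result == DocWriteResponse.Result.UPDATED
                || result == DocWriteResponse.Result.NOOP;
    }
}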
@@ -14,7 +14,7 @@ spring.datasource.mysql-server.hikari.connection-timeout= 60000
 spring.datasource.mysql-server.hikari.connection-test-query= SELECT 1

 #TDengine
 spring.datasource.tdengine-server.driver-class-name=com.taosdata.jdbc.rs.RestfulDriver
-spring.datasource.tdengine-server.jdbc-url = jdbc:TAOS-RS://139.9.170.47:6041/iot_data?user=root&password=taosdata&timezone=GMT%2b8&allowMultiQueries=true
+spring.datasource.tdengine-server.jdbc-url = jdbc:TAOS-RS://139.9.170.47:6041/iot_data_test?user=root&password=taosdata&timezone=GMT%2b8&allowMultiQueries=true
 spring.datasource.tdengine-server.username=root
 spring.datasource.tdengine-server.password=taosdata
@@ -46,34 +46,34 @@ spring.security.user.password=a1234560
 emqx.clean-session=true
 emqx.client-id=${spring.application.name}-${random.int[1,65536]}
 emqx.biz-client-id=consumer-${random.int[1,65536]}
-emqx.broker=tcp://172.16.3.157:1883
+emqx.broker=tcp://139.9.173.44:1883
 emqx.client-user-name=admin
 emqx.client-password=public
-emqx.max-inflight=100
-emqx.keep-alive-interval=100
+emqx.max-inflight=1000
+emqx.keep-alive-interval=10
 emqx.biz-topic[0]= iot/data/perspective

 # influxDB
-spring.influx.url=http://172.16.11.201:8086
+spring.influx.url=http://139.9.173.44:8086
 spring.influx.password=Yeejoin@2020
 spring.influx.user=root
-spring.influx.database=iot_platform
+spring.influx.database=iot_platform_test
 spring.influx.retention_policy=default
 spring.influx.retention_policy_time=30d
 spring.influx.actions=10000
 spring.influx.bufferLimit=20000

 #kafka
-spring.kafka.bootstrap-servers=172.16.3.157:9092
+spring.kafka.bootstrap-servers=139.9.173.44:9092
 spring.kafka.producer.retries=1
-spring.kafka.producer.bootstrap-servers=172.16.3.157:9092
+spring.kafka.producer.bootstrap-servers=139.9.173.44:9092
 spring.kafka.producer.batch-size=16384
 spring.kafka.producer.buffer-memory=33554432
 spring.kafka.producer.acks=1
 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 spring.kafka.consumer.group-id=messageConsumerGroup
-spring.kafka.consumer.bootstrap-servers=172.16.3.157:9092
+spring.kafka.consumer.bootstrap-servers=139.9.173.44:9092
 spring.kafka.consumer.enable-auto-commit=false
 spring.kafka.consumer.auto-offset-reset=earliest
 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
@@ -93,5 +93,5 @@ elasticsearch.scheme= http
 elasticsearch.connectTimeout= 5000
 elasticsearch.socketTimeout= 5000
 elasticsearch.connectionRequestTimeout= 5000
-elasticsearch.maxConnectNum= 100
-elasticsearch.maxConnectPerRoute= 100
\ No newline at end of file
+elasticsearch.maxConnectNum= 1000
+elasticsearch.maxConnectPerRoute= 1000
\ No newline at end of file