Commit e96041b9 authored by 刘林

fix(equip): optimize the IoT data ingestion code of the Jiangxi Electric Power Construction equipment service

parent 83e4a8c1
@@ -37,7 +37,7 @@
         <dependency>
             <groupId>com.yeejoin</groupId>
             <artifactId>amos-component-influxdb</artifactId>
-            <version>1.8.5-SNAPSHOT</version>
+            <version>1.9.0-SNAPSHOT</version>
             <scope>compile</scope>
         </dependency>
......
@@ -91,13 +91,13 @@
 //        Map<String, String> tagsMap = new HashMap<>();
 //        Map<String, Object> fieldsMap = new HashMap<>();
 //        tagsMap.put("equipmentsIdx", key);
-//        tagsMap.put("address", indexAddress);
-//        tagsMap.put("gatewayId", gatewayId);
-//        tagsMap.put("dataType", dataType);
-//        tagsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
-//        tagsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
-//        tagsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
-//        tagsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
+//        fieldsMap.put("address", indexAddress);
+//        fieldsMap.put("gatewayId", gatewayId);
+//        fieldsMap.put("dataType", dataType);
+//        fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
+//        fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
+//        fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
+//        fieldsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
 //
 //        String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
 //        fieldsMap.put("traceId", traceId);
......
@@ -37,6 +37,8 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
             createThreadFactory());
     // prefix for real-time IoT messages forwarded into InfluxDB
     private static final String MEASUREMENT = "iot_data_";
+    private static final String INDICATORS = "indicators_";
+    private static final String TOTAL_CALL_DATA = "total_call_data_";
     private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
     // prefix for the latest equipment-update messages stored in InfluxDB
     private static final String TRUE = "true";
...@@ -72,6 +74,7 @@ public class KafkaConsumerWithThread implements CommandLineRunner { ...@@ -72,6 +74,7 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
@Override @Override
public void run(String... args) { public void run(String... args) {
Thread thread = new Thread(new KafkaConsumerThread(consumerConfig.consumerConfigs(), topic)); Thread thread = new Thread(new KafkaConsumerThread(consumerConfig.consumerConfigs(), topic));
//4.启动线程
thread.start(); thread.start();
} }
@@ -85,6 +88,7 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
         String gatewayId = jsonObject.getString("gatewayId");
         String value = jsonObject.getString("value");
         String key = indexAddress + "_" + gatewayId;
+        String signalType = jsonObject.getString("signalType");
         try {
             if (equipmentIndexVOMap.get(key) != null) {
@@ -93,12 +97,11 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
                 Map<String, String> tagsMap = new HashMap<>();
                 Map<String, Object> fieldsMap = new HashMap<>();
                 tagsMap.put("equipmentsIdx", key);
-                tagsMap.put("address", indexAddress);
-                tagsMap.put("gatewayId", gatewayId);
-                tagsMap.put("dataType", dataType);
+                fieldsMap.put("address", indexAddress);
+                fieldsMap.put("gatewayId", gatewayId);
+                fieldsMap.put("dataType", dataType);
                 fieldsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
                 fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
                 String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
@@ -111,18 +114,24 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
                 fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
                 influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
-                // write the updated data into ES
+                // store total-call (general interrogation) data
+                if (!"transformation".equals(signalType)) {
+                    influxDbConnection.insert(TOTAL_CALL_DATA + gatewayId, tagsMap, fieldsMap);
+                }
+                influxDbConnection.insert(INDICATORS + gatewayId, tagsMap, fieldsMap, TIME, TimeUnit.MILLISECONDS);
+                //// write the updated data into ES
                 Map<String, Object> paramJson = new HashMap<>();
                 if (Arrays.asList(TRUE, FALSE).contains(value)) {
                     paramJson.put("value", value);
                 } else {
-                    paramJson.put("value", Float.parseFloat(value));
+                    paramJson.put("valueDouble", Float.parseFloat(value));
                 }
                 paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
-                paramJson.put("createdTime", simpleDateFormat.format(new Date()));
+                paramJson.put("createdTime", new Date());
+                paramJson.put("unit", equipmentSpeIndex.getUnitName());
                 elasticSearchUtil.updateData(ES_INDEX_NAME_JX, key, JSON.toJSONString(paramJson));
-                //influxDbConnection.insert(INDICATORS + gatewayId, tagsMap, fieldsMap, TIME, TimeUnit.MILLISECONDS);
                 if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
                     fieldsMap.putAll(tagsMap);
                     kafkaProducerService.sendMessageAsync(alarmTopic, JSON.toJSONString(fieldsMap));
@@ -171,31 +180,5 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
                 }
             }
         }
-        //@Override
-        //public void run() {
-        //    while (true) {
-        //        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
-        //        for (TopicPartition topicPartition : records.partitions()) {
-        //            List<ConsumerRecord<String, String>> recordList = new ArrayList<>(records.records(topicPartition));
-        //            Iterator<ConsumerRecord<String, String>> it = recordList.iterator();
-        //            while (it.hasNext()) {
-        //                ConsumerRecord<String, String> record = it.next();
-        //                long startTime = System.currentTimeMillis();
-        //                long lastOffset = recordList.get(recordList.size() - 1).offset();
-        //                try {
-        //                    kafkaConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(lastOffset + 1)));
-        //                } catch (Exception e) {
-        //                    log.error("kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},commit time:{},value{},error:{}", topic, Thread.currentThread().getName(), (System.currentTimeMillis() - startTime), record.value(), e);
-        //                    break;
-        //                }
-        //                pooledExecutor.submit(() -> {
-        //                    processRecord(record, equipmentIndexVOMap);
-        //                });
-        //                it.remove();
-        //            }
-        //        }
-        //    }
-        //}
     }
 }
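For orientation, the sketch below condenses the InfluxDB routing that the reworked processRecord hunks above introduce: every message still goes to iot_data_<gatewayId>, messages whose signalType is not "transformation" are additionally written to total_call_data_<gatewayId> as total-call data, and indicators_<gatewayId> receives the same point with an explicit millisecond timestamp. The InfluxDbConnection interface here is a hypothetical stand-in that only mirrors the call shapes visible in the diff (the real bean comes from amos-component-influxdb), so this is an illustration of the routing logic, not the project's actual class.

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class IotWriteRoutingSketch {

    /** Hypothetical stand-in for the project's InfluxDB wrapper; only the call shapes used in the diff. */
    interface InfluxDbConnection {
        void insert(String measurement, Map<String, String> tags, Map<String, Object> fields);
        void insert(String measurement, Map<String, String> tags, Map<String, Object> fields,
                    long time, TimeUnit timeUnit);
    }

    private static final String MEASUREMENT = "iot_data_";
    private static final String INDICATORS = "indicators_";
    private static final String TOTAL_CALL_DATA = "total_call_data_";

    static void route(InfluxDbConnection influx, String gatewayId, String signalType,
                      Map<String, String> tagsMap, Map<String, Object> fieldsMap, long time) {
        // 1. Every message is written to the per-gateway iot_data_<gatewayId> measurement.
        influx.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
        // 2. Total-call storage: everything except "transformation" signals also goes to total_call_data_<gatewayId>.
        if (!"transformation".equals(signalType)) {
            influx.insert(TOTAL_CALL_DATA + gatewayId, tagsMap, fieldsMap);
        }
        // 3. Indicators are written to indicators_<gatewayId> with an explicit millisecond timestamp.
        influx.insert(INDICATORS + gatewayId, tagsMap, fieldsMap, time, TimeUnit.MILLISECONDS);
    }
}

On the Elasticsearch side of the same hunk, paramJson now carries value for boolean-like values, valueDouble for numeric ones, createdTime as a Date, and the new unit field before elasticSearchUtil.updateData is called.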
@@ -63,7 +63,7 @@ public class KafkaProducerService {
             }
             @Override
             public void onSuccess(SendResult<String, String> stringStringSendResult) {
-                log.info("发送消息(异步) success! topic: {}, message: {}", topic, message);
+                //log.info("发送消息(异步) success! topic: {}, message: {}", topic, message);
             }
         });
     }
......
package com.yeejoin.equip.utils;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;

import java.time.Duration;

@Configuration
public class ElasticSearchConfig {

    @Value("${spring.elasticsearch.rest.uris}")
    private String uris;

    @Value("${elasticsearch.username}")
    private String username;

    @Value("${elasticsearch.password}")
    private String password;

    public static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
        COMMON_OPTIONS = builder.build();
    }

    @Bean
    public RestHighLevelClient client() {
        ClientConfiguration clientConfiguration = null;
        try {
            clientConfiguration = ClientConfiguration.builder()
                    .connectedTo(uris)
                    .withConnectTimeout(Duration.ofSeconds(5))
                    .withSocketTimeout(Duration.ofSeconds(3))
                    .withBasicAuth(username, password)
                    .build();
        } catch (Exception e) {
            e.printStackTrace();
        }
        assert clientConfiguration != null;
        return RestClients.create(clientConfiguration).rest();
    }
}
\ No newline at end of file
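As a quick usage note for this new configuration class: the RestHighLevelClient bean it exposes is what ElasticSearchUtil (next hunk) uses for partial-document updates. The minimal sketch below shows such an update, assuming the 7.x high-level REST client that RestClients targets; the index name jxiop_equipments comes from the consumer code, while the document id and JSON body are made-up examples.

import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;

public class EsPartialUpdateSketch {

    // Partial update of one equipment document, keyed by "<indexAddress>_<gatewayId>".
    public static void updateDoc(RestHighLevelClient client) throws IOException {
        UpdateRequest request = new UpdateRequest("jxiop_equipments", "10001_gateway01"); // id is illustrative
        request.doc("{\"valueDouble\":12.5,\"valueLabel\":\"12.5\"}", XContentType.JSON); // body is illustrative
        UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
        // Result is CREATED, UPDATED, or NOOP -- the same cases ElasticSearchUtil branches on.
        System.out.println(response.getResult());
    }
}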
@@ -40,14 +40,14 @@ public class ElasticSearchUtil {
         updateRequest.doc(paramJson, XContentType.JSON);
         try {
             UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
-            log.info("索引[{}],主键:【{}】操作结果:[{}]", indexName, id, updateResponse.getResult());
+            //log.info("索引[{}],主键:【{}】操作结果:[{}]", indexName, id, updateResponse.getResult());
             if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
                 // created
                 log.info("索引:【{}】,主键:【{}】新增成功", indexName, id);
                 return true;
             } else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
                 // updated
-                log.info("索引:【{}】,主键:【{}】修改成功", indexName, id);
+                //log.info("索引:【{}】,主键:【{}】修改成功", indexName, id);
                 return true;
             } else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
                 // no change (noop)
......
@@ -80,9 +80,8 @@ kafka.topic=PERSPECTIVE
 emq.topic=iot/data/perspective
 kafka.alarm.topic=EQUIPMENT_ALARM
-spring.elasticsearch.rest.uris=http://39.98.224.23:9200
-spring.elasticsearch.rest.connection-timeout=30000
-spring.elasticsearch.rest.username=elastic
-spring.elasticsearch.rest.password=123456
-spring.elasticsearch.rest.read-timeout=30000
+## ES properties:
+biz.elasticsearch.address=139.9.173.44
+spring.elasticsearch.rest.uris=${biz.elasticsearch.address}:9200
+elasticsearch.username= elastic
+elasticsearch.password= Yeejoin@2020
+management.health.elasticsearch.enabled=false
\ No newline at end of file
 spring.application.name=AMOS-DATA-EQUIP
 server.servlet.context-path=/data-equip
-server.port=8200
+server.port=8100
 spring.profiles.active=dev
 server.compression.enabled=true
......