Commit 190fcbc8 authored by 刘林's avatar 刘林

fix(equip):添加tdengine

parent af3fc8a6
...@@ -95,6 +95,13 @@ ...@@ -95,6 +95,13 @@
<artifactId>spring-data-elasticsearch</artifactId> <artifactId>spring-data-elasticsearch</artifactId>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
......
package com.yeejoin.equip.config; package com.yeejoin.equip.config;
import com.yeejoin.equip.entity.EquipmentIndexVO; import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.mapper.EquipmentSpecificIndexMapper; import com.yeejoin.equip.mapper.mysql.EquipmentSpecificIndexMapper;
import com.yeejoin.equip.service.KafkaMessageService;
import com.yeejoin.equip.utils.RedisKey; import com.yeejoin.equip.utils.RedisKey;
import com.yeejoin.equip.utils.RedisUtils; import com.yeejoin.equip.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -18,8 +21,9 @@ import java.util.stream.Collectors; ...@@ -18,8 +21,9 @@ import java.util.stream.Collectors;
* @date 2023/6/15 * @date 2023/6/15
* @apiNote * @apiNote
*/ */
@Component
@Slf4j @Slf4j
@Component
@Transactional(transactionManager = "mysqlTransactionManager")
public class EquipmentIndexCacheRunner implements CommandLineRunner { public class EquipmentIndexCacheRunner implements CommandLineRunner {
@Resource @Resource
...@@ -27,20 +31,18 @@ public class EquipmentIndexCacheRunner implements CommandLineRunner { ...@@ -27,20 +31,18 @@ public class EquipmentIndexCacheRunner implements CommandLineRunner {
@Resource @Resource
private RedisUtils redisUtils; private RedisUtils redisUtils;
@Autowired
private KafkaMessageService kafkaMessageService;
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
log.info(">>服务启动执行,执行预加载数据等操作"); log.info(">>服务启动执行,执行预加载数据等操作");
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS); redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS);
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS_KEY);
List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null); List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null);
Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream() Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream()
.filter(v -> v.getGatewayId() != null) .filter(v -> v.getGatewayId() != null)
.collect(Collectors.toMap(vo -> vo.getIndexAddress() + "_" + vo.getGatewayId(), Function.identity(),(v1, v2) -> v1)); .collect(Collectors.toMap(vo -> vo.getIndexAddress() + "_" + vo.getGatewayId(), Function.identity(),(v1, v2) -> v1));
Map<String, Object> equipmentIndexKeyMap = equipSpecificIndexList.stream()
.filter(v -> v.getIndexAddress() != null && v.getGatewayId() == null)
.collect(Collectors.toMap(EquipmentIndexVO::getIndexAddress, Function.identity(),(v1, v2) -> v1));
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap); redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap);
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS_KEY, equipmentIndexKeyMap); kafkaMessageService.init();
} }
} }
\ No newline at end of file
package com.yeejoin.equip.config;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
* @author LiuLin
* @date 2023年09月15日 10:42
*/
@Configuration
@MapperScan(basePackages = {"com.yeejoin.equip.mapper.mysql"}, sqlSessionTemplateRef = "mysqlSqlSessionTemplate")
public class MysqlServerConfig {
@Bean(name = "mysqlDataSource")
@ConfigurationProperties(prefix = "spring.datasource.mysql-server")
@Primary
public DataSource mysqlDataSource() {
return DataSourceBuilder.create().build();
}
@Bean(name = "mysqlSqlSessionFactory")
@Primary
public SqlSessionFactory mysqlSqlSessionFactory(@Qualifier("mysqlDataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/mysql/*.xml"));
return bean.getObject();
}
@Bean(name = "mysqlTransactionManager")
@Primary
public DataSourceTransactionManager mysqlTransactionManager(@Qualifier("mysqlDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean(name = "mysqlSqlSessionTemplate")
@Primary
public SqlSessionTemplate mysqlSqlSessionTemplate(@Qualifier("mysqlSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
package com.yeejoin.equip.config;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
* @author LiuLin
* @date 2023年09月15日 10:55
*/
@Configuration
@MapperScan(basePackages = {"com.yeejoin.equip.mapper.tdengine"}, sqlSessionTemplateRef = "tdEngineSqlSessionTemplate")
public class TDEngineServerConfig {
@Bean(name = "tdEngineDataSource")
@ConfigurationProperties(prefix = "spring.datasource.td-engine-server")
public DataSource tdEngineDataSource() {
return DataSourceBuilder.create().build();
}
@Bean(name = "tdEngineSqlSessionFactory")
public SqlSessionFactory tdEngineSqlSessionFactory(@Qualifier("tdEngineDataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/tdengine/*.xml"));
return bean.getObject();
}
@Bean(name = "tdEngineTransactionManager")
public DataSourceTransactionManager tdEngineTransactionManager(@Qualifier("tdEngineDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean(name = "tdEngineSqlSessionTemplate")
public SqlSessionTemplate tdEngineSqlSessionTemplate(@Qualifier("tdEngineSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
package com.yeejoin.equip.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
/**
* @author LiuLin
* @Description: 指标数据表
* @date 2023/09/14 14:30
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class IndicatorData {
private String address;
private String gatewayId;
private String equipmentsIdx;
private String dataType;
private String isAlarm;
private String equipmentSpecificName;
private String equipmentIndexName;
private String valueLabel;
private String value;
private String unit;
private Date createdTime;
}
...@@ -39,44 +39,40 @@ public class EmqMessageService extends EmqxListener { ...@@ -39,44 +39,40 @@ public class EmqMessageService extends EmqxListener {
private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>(); private static final BlockingQueue<JSONObject> blockingQueue = new LinkedBlockingQueue<>();
@PostConstruct @PostConstruct
void init() { void init() throws Exception {
new Thread(task_runnable).start(); emqKeeper.subscript(emqTopic, 1, this);
String[] split = emqTopic.split(",");
Arrays.stream(split).forEach(e-> {
try {
emqKeeper.subscript(e, 1, this);
} catch (Exception exception) {
log.info("订阅emq消息失败 ====> message: {}", exception.getMessage());
}
});
} }
@Override @Override
public void processMessage(String topic, MqttMessage message) throws Exception { public void processMessage(String topic, MqttMessage message) throws Exception {
JSONObject result = JSONObject.fromObject(new String(message.getPayload())); JSONObject result = JSONObject.fromObject(new String(message.getPayload()));
JSONObject messageResult = new JSONObject(); //JSONObject messageResult = new JSONObject();
messageResult.put("result", result); //messageResult.put("result", result);
messageResult.put("topic", topic); //messageResult.put("topic", topic);
blockingQueue.add(messageResult); //blockingQueue.add(messageResult);
}
Runnable task_runnable = new Runnable() { if (topic.equals(emqTopic)) {
public void run() { kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
int k = 0;
boolean b = true;
while (b) {
k++;
b = k < Integer.MAX_VALUE;
try {
JSONObject messageResult = blockingQueue.take();
JSONObject result = messageResult.getJSONObject("result");
if ((messageResult.getString("topic")).equals(emqTopic)) {
kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
}
} catch (Exception e) {
Thread.currentThread().interrupt();
}
}
} }
}; }
//Runnable task_runnable = new Runnable() {
// public void run() {
// int k = 0;
// boolean b = true;
// while (b) {
// k++;
// b = k < Integer.MAX_VALUE;
// try {
// JSONObject messageResult = blockingQueue.take();
// JSONObject result = messageResult.getJSONObject("result");
// if ((messageResult.getString("topic")).equals(emqTopic)) {
// kafkaProducerService.sendMessageAsync(kafkaTopic,JSON.toJSONString(result));
// }
// } catch (Exception e) {
// Thread.currentThread().interrupt();
// }
// }
// }
//};
} }
...@@ -6,6 +6,8 @@ import com.alibaba.fastjson.JSONObject; ...@@ -6,6 +6,8 @@ import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection; import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.equip.config.KafkaConsumerConfig; import com.yeejoin.equip.config.KafkaConsumerConfig;
import com.yeejoin.equip.entity.EquipmentIndexVO; import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.entity.IndicatorData;
import com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper;
import com.yeejoin.equip.utils.ElasticSearchUtil; import com.yeejoin.equip.utils.ElasticSearchUtil;
import com.yeejoin.equip.utils.RedisKey; import com.yeejoin.equip.utils.RedisKey;
import com.yeejoin.equip.utils.RedisUtils; import com.yeejoin.equip.utils.RedisUtils;
...@@ -37,14 +39,10 @@ public class KafkaConsumerWithThread implements CommandLineRunner { ...@@ -37,14 +39,10 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
createThreadFactory()); createThreadFactory());
//iot转发实时消息存入influxdb前缀 //iot转发实时消息存入influxdb前缀
private static final String MEASUREMENT = "iot_data_"; private static final String MEASUREMENT = "iot_data_";
private static final String INDICATORS = "indicators_";
private static final String TOTAL_CALL_DATA = "total_call_data_";
private static final String ES_INDEX_NAME_JX = "jxiop_equipments"; private static final String ES_INDEX_NAME_JX = "jxiop_equipments";
//装备更新最新消息存入influxdb前缀 //装备更新最新消息存入influxdb前缀
private static final String TRUE = "true"; private static final String TRUE = "true";
private static final String FALSE = "false"; private static final String FALSE = "false";
//装备更新最新消息存入influxdb固定时间
private static final Long TIME = 1688558007051L;
@Autowired @Autowired
protected KafkaProducerService kafkaProducerService; protected KafkaProducerService kafkaProducerService;
@Autowired @Autowired
...@@ -53,6 +51,8 @@ public class KafkaConsumerWithThread implements CommandLineRunner { ...@@ -53,6 +51,8 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
private InfluxDbConnection influxDbConnection; private InfluxDbConnection influxDbConnection;
@Autowired @Autowired
private RedisUtils redisUtils; private RedisUtils redisUtils;
@Autowired
private IndicatorDataMapper indicatorDataMapper;
@Value("${kafka.alarm.topic}") @Value("${kafka.alarm.topic}")
private String alarmTopic; private String alarmTopic;
...@@ -66,7 +66,7 @@ public class KafkaConsumerWithThread implements CommandLineRunner { ...@@ -66,7 +66,7 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
private static ThreadFactory createThreadFactory() { private static ThreadFactory createThreadFactory() {
return runnable -> { return runnable -> {
Thread thread = new Thread(runnable); Thread thread = new Thread(runnable);
thread.setName(String.format("clojure-agent-send-pool-%d", KafkaConsumerWithThread.sendThreadPoolCounter.getAndIncrement())); thread.setName(String.format("kafka-consumer-iot-pool-%d", KafkaConsumerWithThread.sendThreadPoolCounter.getAndIncrement()));
return thread; return thread;
}; };
} }
...@@ -74,14 +74,12 @@ public class KafkaConsumerWithThread implements CommandLineRunner { ...@@ -74,14 +74,12 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
@Override @Override
public void run(String... args) { public void run(String... args) {
Thread thread = new Thread(new KafkaConsumerThread(consumerConfig.consumerConfigs(), topic)); Thread thread = new Thread(new KafkaConsumerThread(consumerConfig.consumerConfigs(), topic));
//4.启动线程
thread.start(); thread.start();
} }
private void processRecord(ConsumerRecord<String, String> record, Map<Object, Object> equipmentIndexVOMap) { private void processRecord(ConsumerRecord<String, String> record, Map<Object, Object> equipmentIndexVOMap) {
// 处理消息记录
//log.info("监听Kafka集群message:{}",record.value());
JSONObject jsonObject = JSONObject.parseObject(record.value()); JSONObject jsonObject = JSONObject.parseObject(record.value());
IndicatorData indicatorData = JSON.parseObject(record.value(), IndicatorData.class);
String dataType = jsonObject.getString("dataType"); String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address"); String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId"); String traceId = jsonObject.getString("traceId");
...@@ -105,28 +103,31 @@ public class KafkaConsumerWithThread implements CommandLineRunner { ...@@ -105,28 +103,31 @@ public class KafkaConsumerWithThread implements CommandLineRunner {
fieldsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm())); fieldsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName()); fieldsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum()); String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("value", value); fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel); fieldsMap.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName()); fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName()); fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
fieldsMap.put("createdTime", simpleDateFormat.format(new Date())); fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
//总召入库 indicatorData.setIsAlarm(String.valueOf(equipmentSpeIndex.getIsAlarm()));
if(!"transformation".equals(signalType)){ indicatorData.setEquipmentIndexName(equipmentSpeIndex.getEquipmentIndexName());
influxDbConnection.insert(TOTAL_CALL_DATA + gatewayId, tagsMap, fieldsMap); indicatorData.setEquipmentSpecificName(equipmentSpeIndex.getEquipmentSpecificName());
indicatorData.setUnit(equipmentSpeIndex.getUnitName());
indicatorData.setEquipmentsIdx(key);
indicatorData.setValueLabel(valueLabel.isEmpty() ? value : valueLabel);
//变位存入influxdb
if ("transformation".equals(signalType)){
influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
indicatorDataMapper.insert(indicatorData);
} }
influxDbConnection.insert(INDICATORS + gatewayId, tagsMap, fieldsMap, TIME, TimeUnit.MILLISECONDS);
////更新数据入ES库 //更新数据入ES库
Map<String, Object> paramJson = new HashMap<>(); Map<String, Object> paramJson = new HashMap<>();
if (Arrays.asList(TRUE, FALSE).contains(value)) { if (!Arrays.asList(TRUE, FALSE).contains(value)) {
paramJson.put("value", value);
}else{
paramJson.put("valueDouble", Float.parseFloat(value)); paramJson.put("valueDouble", Float.parseFloat(value));
} }
paramJson.put("value", value);
paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel); paramJson.put("valueLabel", valueLabel.isEmpty() ? value : valueLabel);
paramJson.put("createdTime", new Date()); paramJson.put("createdTime", new Date());
paramJson.put("unit", equipmentSpeIndex.getUnitName()); paramJson.put("unit", equipmentSpeIndex.getUnitName());
......
package com.yeejoin.equip.mapper; package com.yeejoin.equip.mapper.mysql;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yeejoin.equip.entity.EquipmentIndexVO; import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.entity.EquipmentSpecificIndex; import com.yeejoin.equip.entity.EquipmentSpecificIndex;
import org.apache.ibatis.annotations.Mapper; import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
/** /**
...@@ -11,7 +11,7 @@ import java.util.List; ...@@ -11,7 +11,7 @@ import java.util.List;
* @date 2020/10/30 11:16 * @date 2020/10/30 11:16
* @since v2.0 * @since v2.0
*/ */
@Mapper @Component
public interface EquipmentSpecificIndexMapper extends BaseMapper<EquipmentSpecificIndex> { public interface EquipmentSpecificIndexMapper extends BaseMapper<EquipmentSpecificIndex> {
List<EquipmentIndexVO> getEquipSpecificIndexList(EquipmentIndexVO equipmentIndexVo); List<EquipmentIndexVO> getEquipSpecificIndexList(EquipmentIndexVO equipmentIndexVo);
......
package com.yeejoin.equip.mapper.tdengine;
import com.yeejoin.equip.entity.IndicatorData;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author CuiXi
* @version 1.0
* @Description:
* @date 2021/3/11 14:30
*/
@Component
public interface IndicatorDataMapper {
int insert(IndicatorData indicatorData);
int batchInsert(List<IndicatorData> indicatorDataList);
void createDB();
void createTable();
}
package com.yeejoin.equip.service; package com.yeejoin.equip.service;
import java.util.Map;
/** /**
* @author LiuLin * @author LiuLin
* @date 2023年07月12日 10:44 * @date 2023年07月12日 10:44
*/ */
public interface KafkaMessageService { public interface KafkaMessageService {
void init();
void handlerMessage(String message, Map<Object, Object> equipmentIndexVOMap);
} }
package com.yeejoin.equip.service.impl; package com.yeejoin.equip.service.impl;
import com.alibaba.fastjson.JSON; import com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.equip.entity.EquipmentIndexVO;
import com.yeejoin.equip.service.KafkaMessageService; import com.yeejoin.equip.service.KafkaMessageService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils; import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* @author LiuLin * @author LiuLin
...@@ -27,117 +15,14 @@ import java.util.concurrent.TimeUnit; ...@@ -27,117 +15,14 @@ import java.util.concurrent.TimeUnit;
*/ */
@Slf4j @Slf4j
@Service @Service
@Transactional(transactionManager = "tdEngineTransactionManager")
public class KafkaMessageServiceImpl implements KafkaMessageService { public class KafkaMessageServiceImpl implements KafkaMessageService {
private Executor dataExecutor = new ThreadPoolTaskExecutor();
@Autowired @Autowired
private InfluxDbConnection influxDbConnection; private IndicatorDataMapper indicatorDataMapper;
@Value("${kafka.alarm.topic}")
private String alarmTopic;
//iot转发实时消息存入influxdb前缀
private static final String MEASUREMENT = "iot_data_";
//装备更新最新消息存入influxdb前缀
private static final String INDICATORS = "indicators_";
//装备更新最新消息存入influxdb固定时间
private static final Long TIME = 1688558007051L;
@Override
public void handlerMessage(String message, Map<Object, Object> equipmentIndexVOMap) {
dataExecutor.execute(new Runnable() {
@Override
public void run() {
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value");
String key = indexAddress + "_" + gatewayId;
try {
if (equipmentIndexVOMap.get(key) != null) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
log.info("接收到iot消息: 指标名称:{},地址:{},值:{},网关{}",
equipmentSpeIndex.getEquipmentIndexName(),indexAddress,value,gatewayId);
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
tagsMap.put("equipmentsIdx", key);
tagsMap.put("address", indexAddress);
tagsMap.put("gatewayId", gatewayId);
tagsMap.put("dataType", dataType);
tagsMap.put("isAlarm", String.valueOf(equipmentSpeIndex.getIsAlarm()));
tagsMap.put("equipmentSpecificName", equipmentSpeIndex.getEquipmentSpecificName());
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.equals("") ? value : valueLabel);
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
influxDbConnection.insert(MEASUREMENT + gatewayId, tagsMap, fieldsMap);
influxDbConnection.insert(INDICATORS + gatewayId, tagsMap, fieldsMap, TIME, TimeUnit.MILLISECONDS);
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
fieldsMap.putAll(tagsMap);
//kafkaProducerService.sendMessageAsync(alarmTopic,JSON.toJSONString(fieldsMap));
}
}
} catch (Exception e) {
log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
}
}
});
}
private String valueTranslate(String value, String enumStr) {
if (ObjectUtils.isEmpty(enumStr)) {
return "";
}
try {
JSONArray jsonArray = JSONArray.parseArray(enumStr);
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
if (jsonObject.get("key").equals(value)) {
return jsonObject.getString("label");
}
}
} catch (Exception e) {
e.printStackTrace();
}
return "";
}
@PostConstruct public void init() {
public void iotAsyncExecutor() { indicatorDataMapper.createDB();
ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor(); indicatorDataMapper.createTable();
// 设置核心线程数
int length = Runtime.getRuntime().availableProcessors();
int size = Math.max(length, 80);
workExecutor.setCorePoolSize(size * 2);
log.info("装备服务初始化,系统线程数:{},运行线程数:{}", length, size);
// 设置最大线程数
workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize());
//配置队列大小
workExecutor.setQueueCapacity(Integer.MAX_VALUE);
// 设置线程活跃时间(秒)
workExecutor.setKeepAliveSeconds(60);
// 设置默认线程名称
workExecutor.setThreadNamePrefix("装备服务-Iot透传消息消费线程池" + "-");
// 等待所有任务结束后再关闭线程池
//当调度器shutdown被调用时,等待当前被调度的任务完成
workExecutor.setWaitForTasksToCompleteOnShutdown(true);
//执行初始化
workExecutor.initialize();
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
this.dataExecutor = workExecutor;
} }
} }
package com.yeejoin.equip.utils; package com.yeejoin.equip.utils;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients; import org.springframework.data.elasticsearch.client.RestClients;
import java.time.Duration; import java.time.Duration;
/**
* \* Created with IntelliJ IDEA.
* \* User: 煦仔
* \* Date: 2020-12-22
* \* Time: 9:40
* \* To change this template use File | Settings | File Templates.
* \* Description:
* \
*/
@Configuration @Configuration
public class ElasticSearchConfig { public class ElasticSearchConfig {
@Value("${spring.elasticsearch.rest.uris}")
private String uris;
@Value("${elasticsearch.username}") @Autowired(required = false)
private String username; private ElasticSearchRuntimeEnvironment esRuntimeEnvironment;
@Value("${elasticsearch.password}")
private String password;
public static final RequestOptions COMMON_OPTIONS; //@Bean
static { ////当前es相关的配置存在则实例化RestHighLevelClient,如果不存在则不实例化RestHighLevelClient
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); //@ConditionalOnBean(value = ElasticSearchRuntimeEnvironment.class)
COMMON_OPTIONS = builder.build(); //public RestHighLevelClient restHighLevelClient(){
} //
// //es地址,以逗号分隔
// String nodes = esRuntimeEnvironment.getAddress();
// nodes = nodes.contains("http://") ? nodes.replace("http://","") : nodes;
// //es密码
// String password = esRuntimeEnvironment.getPassword();
// String scheme = esRuntimeEnvironment.getScheme();
// List<HttpHost> httpHostList = new ArrayList<>();
// //拆分es地址
// for(String address : nodes.split(",")){
// int index = address.lastIndexOf(":");
// httpHostList.add(new HttpHost(address.substring(0, index),Integer.parseInt(address.substring(index + 1)),scheme));
// }
// //转换成 HttpHost 数组
// HttpHost[] httpHosts = httpHostList.toArray(new HttpHost[0]);
//
// //构建连接对象
// RestClientBuilder builder = RestClient.builder(httpHosts);
//
// //使用账号密码连接
// if ( StringUtils.isNotEmpty(password)){
// String username = esRuntimeEnvironment.getUsername();
// CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials(username,password));
// builder.setHttpClientConfigCallback(f->f.setDefaultCredentialsProvider(credentialsProvider));
// }
//
// // 异步连接延时配置
// builder.setRequestConfigCallback(requestConfigBuilder -> {
// requestConfigBuilder.setConnectTimeout(esRuntimeEnvironment.getConnectTimeout());
// requestConfigBuilder.setSocketTimeout(esRuntimeEnvironment.getSocketTimeout());
// requestConfigBuilder.setConnectionRequestTimeout(esRuntimeEnvironment.getConnectionRequestTimeout());
// return requestConfigBuilder;
// });
//
// // 异步连接数配置
// builder.setHttpClientConfigCallback(httpClientBuilder -> {
// httpClientBuilder.setMaxConnTotal(esRuntimeEnvironment.getMaxConnectNum());
// httpClientBuilder.setMaxConnPerRoute(esRuntimeEnvironment.getMaxConnectPerRoute());
// return httpClientBuilder;
// });
//
// return new RestHighLevelClient(builder);
//}
@Bean @Bean
public RestHighLevelClient client() { public RestHighLevelClient client() {
ClientConfiguration clientConfiguration = null; ClientConfiguration clientConfiguration = null;
try { try {
clientConfiguration = ClientConfiguration.builder() clientConfiguration = ClientConfiguration.builder()
.connectedTo(uris) .connectedTo(esRuntimeEnvironment.getAddress())
.withConnectTimeout(Duration.ofSeconds(5)) .withConnectTimeout(Duration.ofSeconds(5))
.withSocketTimeout(Duration.ofSeconds(3)) .withSocketTimeout(Duration.ofSeconds(3))
.withBasicAuth(username, password) .withSocketTimeout(esRuntimeEnvironment.getSocketTimeout())
.withConnectTimeout(esRuntimeEnvironment.getConnectTimeout())
.withBasicAuth(esRuntimeEnvironment.getUsername(), esRuntimeEnvironment.getPassword())
.build(); .build();
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
......
package com.yeejoin.equip.utils;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* \* Created with IntelliJ IDEA.
* \* User: 煦仔
* \* Date: 2020-12-22
* \* Time: 11:01
* \* To change this template use File | Settings | File Templates.
* \* Description: ElasticSearch 配置
* \
*/
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Component
@ConfigurationProperties(prefix = "elasticsearch")
@ConditionalOnProperty("elasticsearch.address")
public class ElasticSearchRuntimeEnvironment {
/**
* es连接地址,如果有多个用,隔开
*/
private String address;
/**
* es用户名
*/
private String username;
/**
* es密码
*/
private String password;
/**
* 协议
*/
private String scheme;
/**
* 连接超时时间
*/
private int connectTimeout;
/**
* Socket 连接超时时间
*/
private int socketTimeout;
/**
* 获取连接的超时时间
*/
private int connectionRequestTimeout;
/**
* 最大连接数
*/
private int maxConnectNum;
/**
* 最大路由连接数
*/
private int maxConnectPerRoute;
}
\ No newline at end of file
package com.yeejoin.equip.utils; package com.yeejoin.equip.utils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
...@@ -29,35 +27,17 @@ public class ElasticSearchUtil { ...@@ -29,35 +27,17 @@ public class ElasticSearchUtil {
* @param indexName 索引名称 * @param indexName 索引名称
* @param id 主键 * @param id 主键
* @param paramJson 参数JSON * @param paramJson 参数JSON
* @return
*/ */
public boolean updateData(String indexName, String id, String paramJson) { public void updateData(String indexName, String id, String paramJson) {
UpdateRequest updateRequest = new UpdateRequest(indexName, id); UpdateRequest updateRequest = new UpdateRequest(indexName, id);
//如果修改索引中不存在则进行新增 //如果修改索引中不存在则进行新增
updateRequest.docAsUpsert(true); updateRequest.docAsUpsert(false);
//立即刷新数据
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(paramJson, XContentType.JSON); updateRequest.doc(paramJson, XContentType.JSON);
try { try {
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT); restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
//log.info("索引[{}],主键:【{}】操作结果:[{}]", indexName, id, updateResponse.getResult());
if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
//新增
log.info("索引:【{}】,主键:【{}】新增成功", indexName, id);
return true;
} else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
//修改
//log.info("索引:【{}】,主键:【{}】修改成功", indexName, id);
return true;
} else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
//无变化
log.info("索引:[{}],主键:[{}]无变化", indexName, id);
return true;
}
} catch (IOException e) { } catch (IOException e) {
log.error("索引:[{}],主键:【{}】,更新异常:[{}]", indexName, id, e); log.error("索引:[{}],主键:【{}】,更新异常:[{}]", indexName, id, e.getMessage());
return false;
} }
return false;
} }
} }
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver #spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url = jdbc:mysql://172.16.11.201:3306/equipment?useUnicode=true&allowMultiQueries=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai #spring.datasource.url = jdbc:mysql://139.9.173.44:3306/equipment?useUnicode=true&allowMultiQueries=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
spring.datasource.username=root #spring.datasource.username=root
spring.datasource.password=Yeejoin@2020 #spring.datasource.password=Yeejoin@2020
spring.datasource.type=com.zaxxer.hikari.HikariDataSource #spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.pool-name=DatebookHikariCP #spring.datasource.hikari.pool-name=DatebookHikariCP
spring.datasource.hikari.minimum-idle= 3 #spring.datasource.hikari.minimum-idle= 3
spring.datasource.hikari.maximum-pool-size= 30 #spring.datasource.hikari.maximum-pool-size= 30
spring.datasource.hikari.auto-commit= true #spring.datasource.hikari.auto-commit= true
spring.datasource.hikari.idle-timeout= 500000 #spring.datasource.hikari.idle-timeout= 500000
spring.datasource.hikari.max-lifetime= 1800000 #spring.datasource.hikari.max-lifetime= 1800000
spring.datasource.hikari.connection-timeout= 60000 #spring.datasource.hikari.connection-timeout= 60000
spring.datasource.hikari.connection-test-query= SELECT 1 #spring.datasource.hikari.connection-test-query= SELECT 1
spring.redis.database=1 spring.redis.database=1
spring.redis.host=172.16.11.201 spring.redis.host=172.16.11.201
...@@ -33,20 +33,15 @@ eureka.instance.metadata-map.management.api-docs=http://localhost:${server.port} ...@@ -33,20 +33,15 @@ eureka.instance.metadata-map.management.api-docs=http://localhost:${server.port}
eureka.instance.hostname= 172.16.11.201 eureka.instance.hostname= 172.16.11.201
eureka.instance.prefer-ip-address = true eureka.instance.prefer-ip-address = true
eureka.client.serviceUrl.defaultZone=http://${spring.security.user.name}:${spring.security.user.password}@172.16.10.220:10001/eureka/ eureka.client.serviceUrl.defaultZone=http://${spring.security.user.name}:${spring.security.user.password}@172.16.10.220:10001/eureka/
spring.security.user.name=admin spring.security.user.name=admin
spring.security.user.password=a1234560 spring.security.user.password=a1234560
## emqx ## emqx
emqx.clean-session=true emqx.clean-session=true
emqx.client-id=${spring.application.name}-${random.int[1024,65536]} emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
emqx.broker=tcp://172.16.11.201:1883 emqx.broker=tcp://172.16.3.157:1883
emqx.user-name=admin emqx.user-name=admin
emqx.password=public emqx.password=public
mqtt.scene.host=mqtt://172.16.11.201:8083/mqtt
mqtt.client.product.id=mqtt
mqtt.topic=topic_mqtt
spring.mqtt.completionTimeout=3000
# influxDB # influxDB
spring.influx.url=http://172.16.11.201:8086 spring.influx.url=http://172.16.11.201:8086
...@@ -59,29 +54,35 @@ spring.influx.actions=10000 ...@@ -59,29 +54,35 @@ spring.influx.actions=10000
spring.influx.bufferLimit=20000 spring.influx.bufferLimit=20000
#kafka #kafka
spring.kafka.bootstrap-servers=172.16.10.215:9092 spring.kafka.bootstrap-servers=172.16.3.157:9092
spring.kafka.producer.retries=1 spring.kafka.producer.retries=1
spring.kafka.producer.bootstrap-servers=172.16.10.215:9092 spring.kafka.producer.bootstrap-servers=172.16.3.157:9092
spring.kafka.producer.batch-size=16384 spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=1 spring.kafka.producer.acks=1
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=messageConsumerGroup spring.kafka.consumer.group-id=messageConsumerGroup
spring.kafka.consumer.bootstrap-servers=172.16.10.215:9092 spring.kafka.consumer.bootstrap-servers=172.16.3.157:9092
spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.fetch-max-wait= 1000
spring.kafka.consumer.max-poll-records=1000
spring.kafka.listener.ack-mode=manual_immediate spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.type=batch spring.kafka.listener.type=batch
kafka.topic=PERSPECTIVE kafka.topic=PERSPECTIVE
emq.topic=iot/data/perspective emq.topic=iot/data/perspective
kafka.alarm.topic=EQUIPMENT_ALARM kafka.alarm.topic=EQUIPMENT_ALARM
## ES properties: elasticsearch.address= 139.9.173.44:9200
biz.elasticsearch.address=139.9.173.44
spring.elasticsearch.rest.uris=${biz.elasticsearch.address}:9200
elasticsearch.username= elastic elasticsearch.username= elastic
elasticsearch.password= Yeejoin@2020 elasticsearch.password= Yeejoin@2020
elasticsearch.scheme= http
elasticsearch.connectTimeout= 5000
elasticsearch.socketTimeout= 5000
elasticsearch.connectionRequestTimeout= 5000
elasticsearch.maxConnectNum= 100
elasticsearch.maxConnectPerRoute= 100
\ No newline at end of file
spring:
datasource:
mysql-server:
driver-class-name: com.mysql.cj.jdbc.Driver
password: Yeejoin@2020
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: jdbc:mysql://139.9.173.44:3306/equipment?useUnicode=true&allowMultiQueries=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai
username: root
hikari:
auto-commit: true
pool-name: DatebookHikariCP
connection-test-query: SELECT 1
connection-timeout: 60000
idle-timeout: 500000
max-lifetime: 1800000
maximum-pool-size: 30
minimum-idle: 3
tdengine-server:
driver-class-name: com.taosdata.jdbc.rs.RestfulDriver
jdbc-url: jdbc:TAOS-RS://172.16.3.157:6041/test?user=root&password=taosdata&timezone=GMT%2b8
username: root
password: taosdata
<?xml version="1.0" encoding="UTF-8" ?> <?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yeejoin.equip.mapper.EquipmentSpecificIndexMapper"> <mapper namespace="com.yeejoin.equip.mapper.mysql.EquipmentSpecificIndexMapper">
<select id="getEquipSpecificIndexList" resultType="com.yeejoin.equip.entity.EquipmentIndexVO"> <resultMap id="ComplementCode" type="com.yeejoin.equip.entity.EquipmentIndexVO">
<result property="equipmentId" column="equipment_specific_id"/>
<result property="id" column="id"/>
<result property="nameKey" column="name_key"/>
<result property="value" column="value"/>
<result property="valueEnum" column="value_enum"/>
<result property="unitName" column="unit"/>
<result property="indexAddress" column="index_address"/>
<result property="gatewayId" column="gateway_id"/>
<result property="isAlarm" column="is_alarm"/>
<result property="equipmentIndexName" column="equipment_index_name"/>
<result property="equipmentSpecificName" column="equipment_specific_name"/>
<result property="dataType" column="data_type"/>
</resultMap>
<select id="getEquipSpecificIndexList" resultMap="ComplementCode">
SELECT SELECT
si.equipment_specific_id AS equipmentId, si.equipment_specific_id AS equipmentId,
ei.id, ei.id,
ei.name_key, ei.name_key,
ei.name AS perfQuotaName,
si.value, si.value,
ei.is_iot, ei.is_iot,
si.unit AS unitName,
ei.sort_num,
si.create_date,
si.update_date,
si.index_address, si.index_address,
si.gateway_id, si.gateway_id,
si.data_type, si.data_type,
si.equipment_specific_name, si.equipment_specific_name,
si.equipment_index_name, si.equipment_index_name,
si.is_alarm, si.is_alarm
si.value_enum AS valueEnum
FROM FROM
wl_equipment_specific_index si wl_equipment_specific_index si
LEFT JOIN wl_equipment_index ei ON si.equipment_index_id = ei.id LEFT JOIN wl_equipment_index ei ON si.equipment_index_id = ei.id
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.yeejoin.equip.mapper.tdengine.IndicatorDataMapper">
<insert id="insert" parameterType="com.yeejoin.equip.entity.IndicatorData" >
insert into test.indicator_data_1
(address, gateway_Id, equipments_idx, data_type,is_alarm,equipment_specific_name,equipment_index_name,
value_label,value1,unit,created_time)
values
(#{address,jdbcType=VARCHAR},
#{gatewayId,jdbcType=VARCHAR},
#{equipmentsIdx,jdbcType=VARCHAR},
#{dataType,jdbcType=VARCHAR},
#{isAlarm,jdbcType=VARCHAR},
#{equipmentSpecificName,jdbcType=VARCHAR},
#{equipmentIndexName,jdbcType=VARCHAR},
#{valueLabel,jdbcType=VARCHAR},
#{value,jdbcType=VARCHAR},
#{unit,jdbcType=VARCHAR},
now)
</insert>
<update id="createDB" >
create database if not exists test keep 730;
</update>
<update id="createTable" >
create table if not exists test.indicator_data(created_time timestamp, address NCHAR(64),gateway_id NCHAR(64),equipments_idx NCHAR(64), data_type NCHAR(12),is_alarm BIGINT,
value_label VARCHAR(24), unit NCHAR(12),equipment_index_name VARCHAR(200) ,equipment_specific_name VARCHAR(200),valueE VARCHAR(12));
</update>
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment