Commit 3df46798 authored by caotao's avatar caotao

1.数据采集服务新增代码注释。

parent 43aba8c7
...@@ -5,5 +5,8 @@ import org.springframework.scheduling.annotation.Async; ...@@ -5,5 +5,8 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
public interface DasService { public interface DasService {
/**
* 数据固化
*/
void dataSolidification(); void dataSolidification();
} }
...@@ -35,36 +35,62 @@ public class DasServiceImpl implements DasService { ...@@ -35,36 +35,62 @@ public class DasServiceImpl implements DasService {
@Scheduled(cron = "0 */10 * * * ?") @Scheduled(cron = "0 */10 * * * ?")
@Override @Override
/**
* 数据固化方法。该方法用于将数据从物联监盘同步到TDengine数据库。
* 这个过程首先会创建一个新的表,然后并行处理每个网关ID的数据固化过程。
* 完成后,会记录此次操作所花费的时间。
*/
public void dataSolidification() { public void dataSolidification() {
// 记录操作开始时间
Long startTime = System.currentTimeMillis(); Long startTime = System.currentTimeMillis();
// 创建新表
indicatorDataMapper.createTable(); indicatorDataMapper.createTable();
// 获取所有网关ID
List<String> gateWayIds = frontGatewayDevicePointsMapper.getGatewayIds(); List<String> gateWayIds = frontGatewayDevicePointsMapper.getGatewayIds();
// 并行处理每个网关ID的数据凝固
gateWayIds.parallelStream().forEach(gatewayId -> { gateWayIds.parallelStream().forEach(gatewayId -> {
dataSolidificationByGatewayId(gatewayId); dataSolidificationByGatewayId(gatewayId);
}); });
// 记录操作结束时间
Long endTime = System.currentTimeMillis(); Long endTime = System.currentTimeMillis();
log.info("同步ES数据至TDengine耗时:" + (endTime - startTime) + "ms"); // 记录操作耗时日志
log.info("同步物联监盘数据至TDengine耗时:" + (endTime - startTime) + "ms");
} }
/**
* 根据网关ID进行数据凝固操作。
* 该操作首先查询指定网关ID的设备点信息,然后基于这些信息从TDengine数据库中获取相应的数据点值,并将这些值
* 组装成新的数据模型存储到数据库中。此外,该操作还会向EMQX发送一条消息,通知数据同步成功。
*
* @param gatewayId 网关的唯一标识符,用于查询相关设备点信息和数据点值。
*/
@Async("jxiopAsyncExecutor") @Async("jxiopAsyncExecutor")
public void dataSolidificationByGatewayId(String gatewayId) { public void dataSolidificationByGatewayId(String gatewayId) {
// 根据网关ID查询设备点信息
List<FrontGatewayDevicePoints> tempPoints = frontGatewayDevicePointsMapper.getFrontGatewayDevicePointsByGatewayId(gatewayId); List<FrontGatewayDevicePoints> tempPoints = frontGatewayDevicePointsMapper.getFrontGatewayDevicePointsByGatewayId(gatewayId);
if (!ObjectUtils.isEmpty(tempPoints)) { if (!ObjectUtils.isEmpty(tempPoints)) {
// 检查在TDengine中是否存在对应的表
Long tableCount = tdengineIotDataMapper.getTtableCount("stb_" + gatewayId); Long tableCount = tdengineIotDataMapper.getTtableCount("stb_" + gatewayId);
if (!(tableCount > 0)) { if (!(tableCount > 0)) {
return; return; // 如果表不存在,则直接返回
} }
// 从TDengine中查询数据点值
List<StbDtoData> stbDtoDataList = tdengineIotDataMapper.getStbDtoDataByStbName("stb_" + gatewayId); List<StbDtoData> stbDtoDataList = tdengineIotDataMapper.getStbDtoDataByStbName("stb_" + gatewayId);
// 将查询到的数据点值转换为Map形式存储
Map<String, String> stbMap = stbDtoDataList.parallelStream().collect(Collectors.toMap(StbDtoData::getPointSeq, StbDtoData::getValue)); Map<String, String> stbMap = stbDtoDataList.parallelStream().collect(Collectors.toMap(StbDtoData::getPointSeq, StbDtoData::getValue));
if (stbMap.size() > 0) { if (stbMap.size() > 0) {
// 遍历设备点信息,将每个设备点与从TDengine获取的值匹配,并构建新的数据模型
List<IndicatorData> listAll = new ArrayList<>(); List<IndicatorData> listAll = new ArrayList<>();
tempPoints.stream().forEach(point -> { tempPoints.stream().forEach(point -> {
IndicatorData indicatorData = new IndicatorData(); IndicatorData indicatorData = new IndicatorData();
// 设置数据模型的各项属性
indicatorData.setDataType(point.getDataType()); indicatorData.setDataType(point.getDataType());
indicatorData.setPointSeq(point.getSequenceNbr().toString()); indicatorData.setPointSeq(point.getSequenceNbr().toString());
indicatorData.setPointAddress(point.getPointAddress()); indicatorData.setPointAddress(point.getPointAddress());
indicatorData.setPointLocation(point.getPointLocation()); indicatorData.setPointLocation(point.getPointLocation());
indicatorData.setPointName(point.getPointName()); indicatorData.setPointName(point.getPointName());
// 设置数据点的值,如果是布尔值则进行转换
indicatorData.setValue(stbMap.get(point.getSequenceNbr().toString())); indicatorData.setValue(stbMap.get(point.getSequenceNbr().toString()));
if (!ObjectUtils.isEmpty(indicatorData.getValue()) && !booleans.contains(indicatorData.getValue())) { if (!ObjectUtils.isEmpty(indicatorData.getValue()) && !booleans.contains(indicatorData.getValue())) {
try { try {
...@@ -72,16 +98,17 @@ public class DasServiceImpl implements DasService { ...@@ -72,16 +98,17 @@ public class DasServiceImpl implements DasService {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
listAll.add(indicatorData); listAll.add(indicatorData);
}); });
// 批量插入构建的数据模型到数据库
Lists.partition(listAll, 1000).stream().forEach( Lists.partition(listAll, 1000).stream().forEach(
list -> { list -> {
indicatorDataMapper.insertBatch(list, gatewayId); indicatorDataMapper.insertBatch(list, gatewayId);
} }
); );
// 向EMQX发送消息,通知数据同步成功
try { try {
HashMap<String, String> syncFlag = new HashMap<>(); HashMap<String, String> syncFlag = new HashMap<>();
syncFlag.put("gateway_id", gatewayId); syncFlag.put("gateway_id", gatewayId);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment