Commit dba2135f authored by caotao's avatar caotao

数据采集服务更新

parent ea23a769
......@@ -15,22 +15,6 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!--
<dependency>
<groupId>com.amosframework.boot</groupId>
<artifactId>amos-boot-module-ugp-api</artifactId>
<version>${amos-biz-boot.version}</version>
</dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.amosframework.boot</groupId>-->
<!-- <artifactId>amos-boot-module-common-biz</artifactId>-->
<!-- <version>${amos-biz-boot.version}</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>com.amosframework.boot</groupId>-->
<!-- <artifactId>amos-boot-biz-common</artifactId>-->
<!-- <version>1.0.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
......@@ -39,6 +23,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
......@@ -66,6 +51,11 @@
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.16</version>
</dependency>
</dependencies>
<build>
......
......@@ -31,7 +31,7 @@ import java.net.InetAddress;
@EnableAsync
@EnableScheduling
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, DruidDataSourceAutoConfigure.class})
@MapperScan({"com.yeejoin.amos.boot.module.das.mapper.msyql","com.yeejoin.amos.boot.module.das.mapper.tdengineanalysis","com.yeejoin.amos.boot.module.das.mapper.tdengineiot"})
@MapperScan({"com.yeejoin.amos.boot.module.das.mapper.msyql","com.yeejoin.amos.boot.module.das.mapper.analysis","com.yeejoin.amos.boot.module.das.mapper.iot"})
@ComponentScan({"springfox.documentation.schema", "com.yeejoin.amos.boot.module.das","com.yeejoin.amos.boot.module.das.service.impl","org.typroject.tyboot.component"})
public class AmosJxiopDasApplication {
......
......@@ -19,6 +19,7 @@ public class IndicatorData {
private String pointSeq;
private String pointAddress;
private String pointLocation;
private String pointType;
private String pointName;
private String value="0";
private Double valueF;
......
......@@ -8,6 +8,6 @@ import java.util.List;
@Component
public interface IndicatorDataMapper {
int insertBatch(@Param("list") List<IndicatorData> list, @Param("gatewayId")String gatewayId);
int insertBatch(@Param("list") List<IndicatorData> list, @Param("gatewayId")String gatewayId,@Param("dasTime")String dasTime);
void createTable();
}
......@@ -8,6 +8,6 @@ import java.util.List;
public interface FrontGatewayDevicePointsMapper extends BaseMapper<FrontGatewayDevicePoints> {
@Select("select distinct gateway_id from iot_front_gateway_device_points")
List<String> getGatewayIds();
@Select("select SEQUENCE_NBR,POINT_NAME,POINT_DATA_TYPE,POINT_ADDRESS,POINT_LOCATION from iot_front_gateway_device_points where gateway_id = #{gatewayId}")
@Select("select SEQUENCE_NBR,POINT_NAME,POINT_DATA_TYPE,POINT_ADDRESS,POINT_LOCATION,POINT_TYPE,DATA_TYPE from iot_front_gateway_device_points where gateway_id = #{gatewayId}")
List<FrontGatewayDevicePoints> getFrontGatewayDevicePointsByGatewayId(String gatewayId);
}
package com.yeejoin.amos.boot.module.das.service.impl;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
......@@ -18,6 +19,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.typroject.tyboot.component.emq.EmqKeeper;
import java.util.*;
import java.util.stream.Collectors;
......@@ -41,16 +43,17 @@ public class DasServiceImpl implements DasService {
* 完成后,会记录此次操作所花费的时间。
*/
public void dataSolidification() {
String dasTime = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:00");
log.info("数据采集开始执行-采集时间::" + dasTime );
// 记录操作开始时间
Long startTime = System.currentTimeMillis();
Long startTime = System.currentTimeMillis();
// 创建新表
indicatorDataMapper.createTable();
// 获取所有网关ID
List<String> gateWayIds = frontGatewayDevicePointsMapper.getGatewayIds();
// 并行处理每个网关ID的数据凝固
gateWayIds.parallelStream().forEach(gatewayId -> {
dataSolidificationByGatewayId(gatewayId);
dataSolidificationByGatewayId(gatewayId,dasTime);
});
// 记录操作结束时间
Long endTime = System.currentTimeMillis();
......@@ -66,7 +69,7 @@ public class DasServiceImpl implements DasService {
* @param gatewayId 网关的唯一标识符,用于查询相关设备点信息和数据点值。
*/
@Async("jxiopAsyncExecutor")
public void dataSolidificationByGatewayId(String gatewayId) {
public void dataSolidificationByGatewayId(String gatewayId,String dasTime) {
// 根据网关ID查询设备点信息
List<FrontGatewayDevicePoints> tempPoints = frontGatewayDevicePointsMapper.getFrontGatewayDevicePointsByGatewayId(gatewayId);
if (!ObjectUtils.isEmpty(tempPoints)) {
......@@ -90,6 +93,7 @@ public class DasServiceImpl implements DasService {
indicatorData.setPointAddress(point.getPointAddress());
indicatorData.setPointLocation(point.getPointLocation());
indicatorData.setPointName(point.getPointName());
indicatorData.setPointType(point.getPointType());
// 设置数据点的值,如果是布尔值则进行转换
indicatorData.setValue(stbMap.get(point.getSequenceNbr().toString()));
if (!ObjectUtils.isEmpty(indicatorData.getValue()) && !booleans.contains(indicatorData.getValue())) {
......@@ -104,10 +108,9 @@ public class DasServiceImpl implements DasService {
// 批量插入构建的数据模型到数据库
Lists.partition(listAll, 1000).stream().forEach(
list -> {
indicatorDataMapper.insertBatch(list, gatewayId);
indicatorDataMapper.insertBatch(list, gatewayId,dasTime);
}
);
// 向EMQX发送消息,通知数据同步成功
try {
HashMap<String, String> syncFlag = new HashMap<>();
......
......@@ -31,6 +31,8 @@ spring.redis.password=yeejoin@2020
emqx.clean-session=true
emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
emqx.broker=tcp://10.20.1.210:2883
emqx.client-user-name=admin
emqx.client-password=public
emqx.user-name=admin
emqx.password=public
mqtt.scene.host=mqtt://10.20.1.210:8083/mqtt
......
......@@ -8,11 +8,13 @@
<update id="createTable">
create STABLE if not exists indicator_data
(created_time timestamp,
das_time VARCHAR(100),
data_type NCHAR(12),
point_seq VARCHAR(100) ,
point_address VARCHAR(100) ,
point_location VARCHAR(500) ,
point_name VARCHAR(200),
point_type VARCHAR(50),
`value` VARCHAR(50),
`value_f` float)
TAGS (gateway_id binary(64));
......@@ -24,11 +26,13 @@
indicator_data_#{gatewayId,jdbcType=VARCHAR} USING indicator_data
TAGS (#{gatewayId,jdbcType=VARCHAR})
VALUES (NOW + #{index}a,
#{dasTime,jdbcType=VARCHAR},
#{item.dataType,jdbcType=VARCHAR},
#{item.pointSeq,jdbcType=VARCHAR},
#{item.pointAddress,jdbcType=VARCHAR},
#{item.pointLocation,jdbcType=VARCHAR},
#{item.pointName,jdbcType=VARCHAR},
#{item.pointType,jdbcType=VARCHAR},
#{item.value,jdbcType=VARCHAR},
#{item.valueF,jdbcType=FLOAT})
</foreach>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment