Commit 55fd3d3d authored by caotao's avatar caotao

智能分析修改为按照网关id同步消息

parent 81436ddf
...@@ -3,6 +3,7 @@ package com.yeejoin.amos; ...@@ -3,6 +3,7 @@ package com.yeejoin.amos;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure; import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure;
import com.yeejoin.amos.boot.biz.common.utils.oConvertUtils; import com.yeejoin.amos.boot.biz.common.utils.oConvertUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.listener.SyncDasSuccessMqttListener;
import com.yeejoin.amos.boot.module.jxiop.biz.listener.SyncESDataToTdengineMqttListener; import com.yeejoin.amos.boot.module.jxiop.biz.listener.SyncESDataToTdengineMqttListener;
import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -62,6 +63,8 @@ public class AmosJxiopAnalyseApplication { ...@@ -62,6 +63,8 @@ public class AmosJxiopAnalyseApplication {
Boolean openHealth; Boolean openHealth;
@Autowired @Autowired
private SyncESDataToTdengineMqttListener syncESDataToTdengineMqttListener; private SyncESDataToTdengineMqttListener syncESDataToTdengineMqttListener;
@Autowired
private SyncDasSuccessMqttListener syncDasSuccessMqttListener;
private static final Logger logger = LoggerFactory.getLogger(AmosJxiopAnalyseApplication.class); private static final Logger logger = LoggerFactory.getLogger(AmosJxiopAnalyseApplication.class);
...@@ -82,6 +85,8 @@ public class AmosJxiopAnalyseApplication { ...@@ -82,6 +85,8 @@ public class AmosJxiopAnalyseApplication {
if (openHealth) { if (openHealth) {
//订阅固化周期性数据成功的消息 //订阅固化周期性数据成功的消息
emqKeeper.subscript("sync_esdata_to_tdengine_notice", 1, syncESDataToTdengineMqttListener); emqKeeper.subscript("sync_esdata_to_tdengine_notice", 1, syncESDataToTdengineMqttListener);
//订阅业务固化同步数据成功消息
emqKeeper.subscript("sync_iotdata_to_tdengine_notice", 1, syncDasSuccessMqttListener);
} }
} }
} }
...@@ -12,8 +12,6 @@ import java.time.LocalDateTime; ...@@ -12,8 +12,6 @@ import java.time.LocalDateTime;
import java.util.Date; import java.util.Date;
/** /**
*
*
* @author system_generator * @author system_generator
* @date 2023-08-15 * @date 2023-08-15
*/ */
...@@ -21,107 +19,109 @@ import java.util.Date; ...@@ -21,107 +19,109 @@ import java.util.Date;
@Accessors(chain = true) @Accessors(chain = true)
@TableName("idx_biz_fan_warning_rule_set") @TableName("idx_biz_fan_warning_rule_set")
public class IdxBizFanWarningRuleSet{ public class IdxBizFanWarningRuleSet {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
/** /**
* *
*/ */
@TableId("SEQUENCE_NBR") @TableId("SEQUENCE_NBR")
private String sequenceNbr; private String sequenceNbr;
/** /**
* *
*/ */
@TableField("RECORD") @TableField("RECORD")
private String record; private String record;
/** /**
* *
*/ */
@TableField("REC_DATE") @TableField("REC_DATE")
private Date recDate; private Date recDate;
/** /**
* *
*/ */
@TableField("REC_USER_ID") @TableField("REC_USER_ID")
private String recUserId; private String recUserId;
/** /**
* *
*/ */
@TableField("INSTANCE_ID") @TableField("INSTANCE_ID")
private String instanceId; private String instanceId;
/** /**
* *
*/ */
@TableField("STATUS") @TableField("STATUS")
private String status; private String status;
/** /**
* 预警判断条件 * 预警判断条件
*/ */
@TableField("WARNING_IF") @TableField("WARNING_IF")
private String warningIf; private String warningIf;
/** /**
* 预警判断连续周期 * 预警判断连续周期
*/ */
@TableField("WARNING_CYCLE") @TableField("WARNING_CYCLE")
private String warningCycle; private String warningCycle;
/** /**
* 预警名称(危险、警告、注意) * 预警名称(危险、警告、注意)
*/ */
@TableField("WARNING_NAME") @TableField("WARNING_NAME")
private String warningName; private String warningName;
/** /**
* *
*/ */
@TableField("ANALYSIS_POINT_ID") @TableField("ANALYSIS_POINT_ID")
private String analysisPointId; private String analysisPointId;
/** /**
* 分析周期((按天、10min、小时) * 分析周期((按天、10min、小时)
*/ */
@TableField("ANALYSIS_TYPE") @TableField("ANALYSIS_TYPE")
private String analysisType; private String analysisType;
/** /**
* 片区 * 片区
*/ */
@TableField("ARAE") @TableField("ARAE")
private String arae; private String arae;
/** /**
* 场站 * 场站
*/ */
@TableField("STATION") @TableField("STATION")
private String station; private String station;
/** /**
* 子系统 * 子系统
*/ */
@TableField("SUB_SYSTEM") @TableField("SUB_SYSTEM")
private String subSystem; private String subSystem;
/** /**
* 型号 * 型号
*/ */
@TableField("NUMBER") @TableField("NUMBER")
private String number; private String number;
/** /**
* 设备名称 * 设备名称
*/ */
@TableField("EQUIPMENT_NAME") @TableField("EQUIPMENT_NAME")
private String equipmentName; private String equipmentName;
@TableField("POINT_NAME") @TableField("POINT_NAME")
private String pointName; private String pointName;
@TableField("GATEWAY_ID")
private String gatewayId;
} }
...@@ -133,4 +133,11 @@ public class IdxBizPvWarningRuleSet{ ...@@ -133,4 +133,11 @@ public class IdxBizPvWarningRuleSet{
*/ */
@TableField("POINT_NAME") @TableField("POINT_NAME")
private String pointName; private String pointName;
/**
* 分析变量
*/
@TableField("GATEWAY_ID")
private String gatewayId;
} }
package com.yeejoin.amos.boot.module.jxiop.biz.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.CommonServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqxListener;
/**
* @author Administrator
*/
@Component
@Slf4j
public class SyncDasSuccessMqttListener extends EmqxListener {
@Autowired
CommonServiceImpl commonServiceImpl;
@Override
public void processMessage(String topic, MqttMessage mqttMessage) {
log.info(topic + "收到数据同步成功,开始计算健康指数!");
byte[] payload = mqttMessage.getPayload();
String str = new String(payload);
String msg = JSON.parse(str).toString();
JSONObject jsonObject = JSONObject.parseObject(msg);
String flag = jsonObject.get("sync_flag").toString();
String gatewayId = jsonObject.get("gatewayId").toString();
if ("success".equals(flag)&&jsonObject.containsKey("gatewayId")){
//开始异步计算光伏的健康指数算法
new Thread(()->{
//调用光伏的健康指数算法
commonServiceImpl.healthWarningMinuteByPv(gatewayId);
}).start();
//开始异步计算风机的健康指数算法
new Thread(()->{
//调用风机的健康指数算法
commonServiceImpl.healthWarningMinuteByFan(gatewayId);
}).start();
}
}
}
...@@ -16,6 +16,7 @@ import java.util.Map; ...@@ -16,6 +16,7 @@ import java.util.Map;
*/ */
public interface IdxBizFanPointProcessVariableClassificationMapper extends BaseMapper<IdxBizFanPointProcessVariableClassification> { public interface IdxBizFanPointProcessVariableClassificationMapper extends BaseMapper<IdxBizFanPointProcessVariableClassification> {
List<IdxBizFanPointProcessVariableClassificationDto> getInfluxDBData(); List<IdxBizFanPointProcessVariableClassificationDto> getInfluxDBData();
List<IdxBizFanPointProcessVariableClassificationDto> getInfluxDBDataByGatewayId(String gatewayId);
List<Map<String,Object>> selectParams(String tableName,String column,String isFx); List<Map<String,Object>> selectParams(String tableName,String column,String isFx);
......
...@@ -21,6 +21,9 @@ public interface IdxBizFanWarningRuleSetMapper extends BaseMapper<IdxBizFanWarni ...@@ -21,6 +21,9 @@ public interface IdxBizFanWarningRuleSetMapper extends BaseMapper<IdxBizFanWarni
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按10分钟'") @Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按10分钟'")
Integer getMaxWaringCycleOfMinutes(); Integer getMaxWaringCycleOfMinutes();
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按10分钟' and GATEWAY_ID = #{gatewayId}")
Integer getMaxWaringCycleOfMinutesByGatewayId(String gatewayId);
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按小时'") @Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按小时'")
Integer getMaxWaringCycleOfHour(); Integer getMaxWaringCycleOfHour();
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按天'") @Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_fan_warning_rule_set WHERE ANALYSIS_TYPE = '按天'")
......
...@@ -17,5 +17,6 @@ public interface IdxBizPvPointProcessVariableClassificationMapper extends BaseMa ...@@ -17,5 +17,6 @@ public interface IdxBizPvPointProcessVariableClassificationMapper extends BaseMa
List<String> gateWayIdListPv(); List<String> gateWayIdListPv();
List<IdxBizPvPointProcessVariableClassificationDto> getInfluxDBData(); List<IdxBizPvPointProcessVariableClassificationDto> getInfluxDBData();
List<IdxBizPvPointProcessVariableClassificationDto> getInfluxDBDataByGatewayId(String gatewayId);
} }
...@@ -16,6 +16,8 @@ public interface IdxBizPvWarningRuleSetMapper extends BaseMapper<IdxBizPvWarning ...@@ -16,6 +16,8 @@ public interface IdxBizPvWarningRuleSetMapper extends BaseMapper<IdxBizPvWarning
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按10分钟'") @Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按10分钟'")
Integer getMaxWaringCycleOfMinutes(); Integer getMaxWaringCycleOfMinutes();
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按10分钟' and GATEWAY_ID = #{gateWayId}")
Integer getMaxWaringCycleOfMinutesByGatewayId(String gateWayId);
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按小时'") @Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按小时'")
Integer getMaxWaringCycleOfHour(); Integer getMaxWaringCycleOfHour();
@Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按天'") @Select("SELECT MAX(WARNING_CYCLE) FROM idx_biz_pv_warning_rule_set WHERE ANALYSIS_TYPE = '按天'")
......
...@@ -53,6 +53,8 @@ public interface IndicatorDataMapper extends BaseMapper<IndicatorData> { ...@@ -53,6 +53,8 @@ public interface IndicatorDataMapper extends BaseMapper<IndicatorData> {
@Select("select `value`,`value_f`, address, gateway_id from iot_data.indicator_data where ts > NOW()-10m and gateway_id = #{gatewayId}") @Select("select `value`,`value_f`, address, gateway_id from iot_data.indicator_data where ts > NOW()-10m and gateway_id = #{gatewayId}")
List<IndicatorData> selectDataByGatewayIdAndAddress(@Param("gatewayId") String gatewayId); List<IndicatorData> selectDataByGatewayIdAndAddress(@Param("gatewayId") String gatewayId);
@Select("select `value`,`value_f`, address, gateway_id from analysis_data.indicator_data where ts > NOW()-10m and gateway_id = #{gatewayId} and point_type = 'SENSOR'")
List<IndicatorData> selectDataByGatewayId(@Param("gatewayId") String gatewayId);
@Select("select created_time,`value`,`value_f`, address, gateway_id from iot_data.indicator_data where gateway_id = #{gatewayId} and `address` in ( ${addresses} ) and ts >= #{startTime} and ts <= #{endTime}") @Select("select created_time,`value`,`value_f`, address, gateway_id from iot_data.indicator_data where gateway_id = #{gatewayId} and `address` in ( ${addresses} ) and ts >= #{startTime} and ts <= #{endTime}")
List<IndicatorData> selectDataByGatewayIdAndAddressForAlarmInfoDetail(@Param("gatewayId") String gatewayId,@Param("addresses") String addresses,@Param("startTime") String startTime, @Param("endTime") String endTime); List<IndicatorData> selectDataByGatewayIdAndAddressForAlarmInfoDetail(@Param("gatewayId") String gatewayId,@Param("addresses") String addresses,@Param("startTime") String startTime, @Param("endTime") String endTime);
......
...@@ -83,7 +83,89 @@ ...@@ -83,7 +83,89 @@
GROUP BY GROUP BY
b.SEQUENCE_NBR b.SEQUENCE_NBR
</select> </select>
<select id="getInfluxDBDataByGatewayId" resultType="com.yeejoin.amos.boot.module.jxiop.biz.dto.IdxBizFanPointProcessVariableClassificationDto">
SELECT
b.*,
ibfpvcv.PROCESS_POINT1_ID AS pointOneId,
ibfpvcv.PROCESS_POINT2_ID AS pointTwoId,
ibfpvcv.PROCESS_POINT3_ID AS pointThreeId,
ibfpvcv.ANALYSIS_POINT_ID AS pointId
FROM
(
SELECT
*
FROM
idx_biz_fan_point_process_variable_classification uxfv
WHERE
uxfv.SEQUENCE_NBR IN ( SELECT PROCESS_POINT1_ID FROM `idx_biz_fan_point_var_central_value` WHERE ANALYSIS_POINT_ID IS NOT NULL GROUP BY ANALYSIS_POINT_ID, PROCESS_POINT1_ID, PROCESS_POINT3_ID, PROCESS_POINT2_ID )
) AS b,
idx_biz_fan_point_var_central_value ibfpvcv
WHERE
b.SEQUENCE_NBR = ibfpvcv.PROCESS_POINT1_ID
GROUP BY
b.SEQUENCE_NBR UNION ALL
SELECT
b.*,
ibfpvcv.PROCESS_POINT1_ID AS pointOneId,
ibfpvcv.PROCESS_POINT2_ID AS pointTwoId,
ibfpvcv.PROCESS_POINT3_ID AS pointThreeId,
ibfpvcv.ANALYSIS_POINT_ID AS pointId
FROM
(
SELECT
*
FROM
idx_biz_fan_point_process_variable_classification uxfv
WHERE
uxfv.SEQUENCE_NBR IN ( SELECT PROCESS_POINT2_ID FROM `idx_biz_fan_point_var_central_value` WHERE ANALYSIS_POINT_ID IS NOT NULL GROUP BY ANALYSIS_POINT_ID, PROCESS_POINT1_ID, PROCESS_POINT3_ID, PROCESS_POINT2_ID )
) AS b,
idx_biz_fan_point_var_central_value ibfpvcv
WHERE
b.SEQUENCE_NBR = ibfpvcv.PROCESS_POINT2_ID
GROUP BY
b.SEQUENCE_NBR UNION ALL
SELECT
b.*,
ibfpvcv.PROCESS_POINT1_ID AS pointOneId,
ibfpvcv.PROCESS_POINT2_ID AS pointTwoId,
ibfpvcv.PROCESS_POINT3_ID AS pointThreeId,
ibfpvcv.ANALYSIS_POINT_ID AS pointId
FROM
(
SELECT
*
FROM
idx_biz_fan_point_process_variable_classification uxfv
WHERE
uxfv.SEQUENCE_NBR IN ( SELECT PROCESS_POINT3_ID FROM `idx_biz_fan_point_var_central_value` WHERE ANALYSIS_POINT_ID IS NOT NULL GROUP BY ANALYSIS_POINT_ID, PROCESS_POINT1_ID, PROCESS_POINT3_ID, PROCESS_POINT2_ID )
) AS b,
idx_biz_fan_point_var_central_value ibfpvcv
WHERE
b.SEQUENCE_NBR = ibfpvcv.PROCESS_POINT3_ID
GROUP BY
b.SEQUENCE_NBR UNION ALL
SELECT
b.*,
ibfpvcv.PROCESS_POINT1_ID AS pointOneId,
ibfpvcv.PROCESS_POINT2_ID AS pointTwoId,
ibfpvcv.PROCESS_POINT3_ID AS pointThreeId,
ibfpvcv.ANALYSIS_POINT_ID AS pointId
FROM
(
SELECT
*
FROM
idx_biz_fan_point_process_variable_classification uxfv
WHERE
uxfv.SEQUENCE_NBR IN ( SELECT ANALYSIS_POINT_ID FROM `idx_biz_fan_point_var_central_value` WHERE ANALYSIS_POINT_ID IS NOT NULL GROUP BY ANALYSIS_POINT_ID, PROCESS_POINT1_ID, PROCESS_POINT3_ID, PROCESS_POINT2_ID )
) AS b,
idx_biz_fan_point_var_central_value ibfpvcv
WHERE
b.SEQUENCE_NBR = ibfpvcv.ANALYSIS_POINT_ID
and b.GATEWAY_ID = #{gatewayId}
GROUP BY
b.SEQUENCE_NBR
</select>
<select id="gateWayIdListFan" resultType="java.lang.String"> <select id="gateWayIdListFan" resultType="java.lang.String">
select GATEWAY_ID from idx_biz_fan_point_process_variable_classification group by GATEWAY_ID; select GATEWAY_ID from idx_biz_fan_point_process_variable_classification group by GATEWAY_ID;
</select> </select>
......
...@@ -88,4 +88,89 @@ ...@@ -88,4 +88,89 @@
GROUP BY GROUP BY
b.SEQUENCE_NBR b.SEQUENCE_NBR
</select> </select>
<select id="getInfluxDBDataByGatewayId" resultType="com.yeejoin.amos.boot.module.jxiop.biz.dto.IdxBizPvPointProcessVariableClassificationDto">
SELECT
b.*,
ibfpvcv.PROCESS_POINT1_ID AS pointOneId,
ibfpvcv.PROCESS_POINT2_ID AS pointTwoId,
ibfpvcv.PROCESS_POINT3_ID AS pointThreeId,
ibfpvcv.ANALYSIS_POINT_ID AS pointId
FROM
(
SELECT
*
FROM
idx_biz_pv_point_process_variable_classification uxfv
WHERE
uxfv.SEQUENCE_NBR IN ( SELECT PROCESS_POINT1_ID FROM `idx_biz_pv_point_var_central_value` WHERE ANALYSIS_POINT_ID IS NOT NULL GROUP BY ANALYSIS_POINT_ID, PROCESS_POINT1_ID, PROCESS_POINT3_ID, PROCESS_POINT2_ID )
) AS b,
idx_biz_pv_point_var_central_value ibfpvcv
WHERE
b.SEQUENCE_NBR = ibfpvcv.PROCESS_POINT1_ID
GROUP BY
b.SEQUENCE_NBR UNION ALL
SELECT
b.*,
ibfpvcv.PROCESS_POINT1_ID AS pointOneId,
ibfpvcv.PROCESS_POINT2_ID AS pointTwoId,
ibfpvcv.PROCESS_POINT3_ID AS pointThreeId,
ibfpvcv.ANALYSIS_POINT_ID AS pointId
FROM
(
SELECT
*
FROM
idx_biz_pv_point_process_variable_classification uxfv
WHERE
uxfv.SEQUENCE_NBR IN ( SELECT PROCESS_POINT2_ID FROM `idx_biz_pv_point_var_central_value` WHERE ANALYSIS_POINT_ID IS NOT NULL GROUP BY ANALYSIS_POINT_ID, PROCESS_POINT1_ID, PROCESS_POINT3_ID, PROCESS_POINT2_ID )
) AS b,
idx_biz_pv_point_var_central_value ibfpvcv
WHERE
b.SEQUENCE_NBR = ibfpvcv.PROCESS_POINT2_ID
GROUP BY
b.SEQUENCE_NBR UNION ALL
SELECT
b.*,
ibfpvcv.PROCESS_POINT1_ID AS pointOneId,
ibfpvcv.PROCESS_POINT2_ID AS pointTwoId,
ibfpvcv.PROCESS_POINT3_ID AS pointThreeId,
ibfpvcv.ANALYSIS_POINT_ID AS pointId
FROM
(
SELECT
*
FROM
idx_biz_pv_point_process_variable_classification uxfv
WHERE
uxfv.SEQUENCE_NBR IN ( SELECT PROCESS_POINT3_ID FROM `idx_biz_pv_point_var_central_value` WHERE ANALYSIS_POINT_ID IS NOT NULL GROUP BY ANALYSIS_POINT_ID, PROCESS_POINT1_ID, PROCESS_POINT3_ID, PROCESS_POINT2_ID )
) AS b,
idx_biz_pv_point_var_central_value ibfpvcv
WHERE
b.SEQUENCE_NBR = ibfpvcv.PROCESS_POINT3_ID
GROUP BY
b.SEQUENCE_NBR UNION ALL
SELECT
b.*,
ibfpvcv.PROCESS_POINT1_ID AS pointOneId,
ibfpvcv.PROCESS_POINT2_ID AS pointTwoId,
ibfpvcv.PROCESS_POINT3_ID AS pointThreeId,
ibfpvcv.ANALYSIS_POINT_ID AS pointId
FROM
(
SELECT
*
FROM
idx_biz_pv_point_process_variable_classification uxfv
WHERE
uxfv.SEQUENCE_NBR IN ( SELECT ANALYSIS_POINT_ID FROM `idx_biz_pv_point_var_central_value` WHERE ANALYSIS_POINT_ID IS NOT NULL GROUP BY ANALYSIS_POINT_ID, PROCESS_POINT1_ID, PROCESS_POINT3_ID, PROCESS_POINT2_ID )
) AS b,
idx_biz_pv_point_var_central_value ibfpvcv
WHERE
b.SEQUENCE_NBR = ibfpvcv.ANALYSIS_POINT_ID
and b.GATEWAY_ID = #{gatewayId}
GROUP BY
b.SEQUENCE_NBR
</select>
</mapper> </mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment