Commit 30ff765f authored by hezhuozhi's avatar hezhuozhi

修改kafaka消费

parent 83f15ffa
package com.yeejoin.amos.boot.module.jxiop.biz.controller;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.boot.module.jxiop.biz.Enum.SmartAnalyseEnum;
import com.yeejoin.amos.boot.module.jxiop.biz.dto.FullViewRecallDataDTO;
......@@ -172,7 +174,11 @@ public class KafkaAnalyseController {
List<String> addressInfo = idxBizFanHealthIndexMapper.getAddressInfo();
String join = String.join(",", addressInfo);
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, "1668801435891929089");
String startTime = DateUtils.convertDateToString(DateUtil.offsetDay(new Date(), -1),
DateUtils.DATE_TIME_PATTERN);
String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()),
DateUtils.DATE_TIME_PATTERN);
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, "1668801435891929089",startTime,endTime);
return ResponseHelper.buildResponse(indicatorData);
}
......
......@@ -478,9 +478,13 @@ public class KafkaConsumerService {
private void buildZXZExecData(List<ConsumerRecord<String, String>> consumerRecords,
Map<String, Set<String>> gatewayPoints,
Map<String, List<IdxBizFanPointProcessVariableClassification>> zxzIds, String xgxPvConsumer) {
String startTime = DateUtils.convertDateToString(DateUtil.offsetDay(new Date(), -1),
DateUtils.DATE_TIME_PATTERN);
String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()),
DateUtils.DATE_TIME_PATTERN);
for (String gatewayId : gatewayPoints.keySet()) {
String join = String.join(",", gatewayPoints.get(gatewayId));
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, gatewayId);
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, gatewayId,startTime,endTime);
JsonReadOptions options = JsonReadOptions.builderFromString(JSON.toJSONString(indicatorData))
.columnTypes(new Function<String, ColumnType>() {
......@@ -499,9 +503,13 @@ public class KafkaConsumerService {
private void buildExecData(List<ConsumerRecord<String, String>> consumerRecords,
Map<String, Set<String>> gatewayPoints, String xgxPvConsumer) {
String startTime = DateUtils.convertDateToString(DateUtil.offsetDay(new Date(), -1),
DateUtils.DATE_TIME_PATTERN);
String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()),
DateUtils.DATE_TIME_PATTERN);
for (String gatewayId : gatewayPoints.keySet()) {
String join = String.join(",", gatewayPoints.get(gatewayId));
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, gatewayId);
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, gatewayId,startTime,endTime);
JsonReadOptions options = JsonReadOptions.builderFromString(JSON.toJSONString(indicatorData))
.columnTypes(new Function<String, ColumnType>() {
......@@ -816,9 +824,13 @@ public class KafkaConsumerService {
private void buildZXZPvExecData(List<ConsumerRecord<String, String>> consumerRecords,
Map<String, Set<String>> gatewayPoints,
Map<String, List<IdxBizPvPointProcessVariableClassification>> zxzIds, String xgxPvConsumer) {
String startTime = DateUtils.convertDateToString(DateUtil.offsetDay(new Date(), -1),
DateUtils.DATE_TIME_PATTERN);
String endTime = DateUtils.convertDateToString(DateUtils.getCurrentDayEndTime(new Date()),
DateUtils.DATE_TIME_PATTERN);
for (String gatewayId : gatewayPoints.keySet()) {
String join = String.join(",", gatewayPoints.get(gatewayId));
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, gatewayId);
List<IndicatorData> indicatorData = indicatorDataMapper.selectByAddresses(join, gatewayId,startTime,endTime);
JsonReadOptions options = JsonReadOptions.builderFromString(JSON.toJSONString(indicatorData))
.columnTypes(new Function<String, ColumnType>() {
......
......@@ -34,8 +34,8 @@ public interface IndicatorDataMapper extends BaseMapper<IndicatorData> {
List<IndicatorData> selectDataById (@Param("id")String id);
@Select("select `id`, `value` from iot_data.indicator_data where `address` in (${addresses}) and gateway_id = #{gatewayId}")
List<IndicatorData> selectByAddresses(@Param("addresses") String addresses, @Param("gatewayId") String gatewayId);
@Select("select `id`, `value` from iot_data.indicator_data where `address` in (${addresses}) and gateway_id = #{gatewayId} and ts >= #{startTime} and ts <= #{endTime}")
List<IndicatorData> selectByAddresses(@Param("addresses") String addresses, @Param("gatewayId") String gatewayId,@Param("startTime") String startTime, @Param("endTime")String endTime);
/**
* 根据测点名称查询测点值信息
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment