Commit 9cee2dd5 authored by suhuiguang's avatar suhuiguang

1.增加气瓶日统计表

parent 3b17190d
...@@ -24,13 +24,13 @@ public class CylinderFillingRecordStatisticsUnitDayDto extends BaseModel { ...@@ -24,13 +24,13 @@ public class CylinderFillingRecordStatisticsUnitDayDto extends BaseModel {
@ApiModelProperty(value = "充装次数") @ApiModelProperty(value = "充装次数")
private Long totalSum; private Integer totalSum;
@ApiModelProperty(value = "充装前检查数量") @ApiModelProperty(value = "充装前检查数量")
private int fillingBeforeSum; private Integer fillingBeforeSum;
@ApiModelProperty(value = "充装后检查数量") @ApiModelProperty(value = "充装后检查数量")
private int fillingAfterSum; private Integer fillingAfterSum;
@ApiModelProperty(value = "企业编码") @ApiModelProperty(value = "企业编码")
private String appId; private String appId;
......
...@@ -28,19 +28,19 @@ public class CylinderFillingRecordStatisticsUnitDay extends BaseEntity { ...@@ -28,19 +28,19 @@ public class CylinderFillingRecordStatisticsUnitDay extends BaseEntity {
* 充装次数 * 充装次数
*/ */
@TableField("total_sum") @TableField("total_sum")
private Long totalSum; private Integer totalSum;
/** /**
* 充装前检查数量 * 充装前检查数量
*/ */
@TableField("filling_before_sum") @TableField("filling_before_sum")
private int fillingBeforeSum; private Integer fillingBeforeSum;
/** /**
* 充装后检查数量 * 充装后检查数量
*/ */
@TableField("filling_after_sum") @TableField("filling_after_sum")
private int fillingAfterSum; private Integer fillingAfterSum;
/** /**
* 企业编码 * 企业编码
......
package com.yeejoin.amos.boot.module.cylinder.biz.event;
import com.yeejoin.amos.boot.module.cylinder.api.dto.CylinderFillingRecordStatisticsUnitDayDto;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
* @author Administrator
*/
@Getter
public class CylinderStatisticsUnitDayInsertOrUpdateEvent extends ApplicationEvent {
private CylinderFillingRecordStatisticsUnitDayDto cylinderFillingRecordStatisticsUnitDayDto;
/**
* Create a new {@code ApplicationEvent}.
*
* @param source the object on which the event initially occurred or with
* which the event is associated (never {@code null})
*/
public CylinderStatisticsUnitDayInsertOrUpdateEvent(Object source, CylinderFillingRecordStatisticsUnitDayDto cylinderFillingRecordStatisticsUnitDayDto) {
super(source);
this.cylinderFillingRecordStatisticsUnitDayDto = cylinderFillingRecordStatisticsUnitDayDto;
}
}
...@@ -86,7 +86,7 @@ public class CylinderQuestionCreateEventListener implements ApplicationListener< ...@@ -86,7 +86,7 @@ public class CylinderQuestionCreateEventListener implements ApplicationListener<
waitSaveData.add(questionInfoDto); waitSaveData.add(questionInfoDto);
try { try {
reentrantLock.lock(); reentrantLock.lock();
if (queue.size() > batchSize) { if (queue.size() >= batchSize) {
queue.drainTo(waitSaveData, batchSize); queue.drainTo(waitSaveData, batchSize);
} }
} finally { } finally {
......
package com.yeejoin.amos.boot.module.cylinder.biz.event.listener;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Sequence;
import com.yeejoin.amos.boot.module.cylinder.api.dto.CylinderFillingRecordStatisticsUnitDayDto;
import com.yeejoin.amos.boot.module.cylinder.api.entity.CylinderFillingRecordStatisticsUnitDay;
import com.yeejoin.amos.boot.module.cylinder.biz.event.CylinderStatisticsUnitDayInsertOrUpdateEvent;
import com.yeejoin.amos.boot.module.cylinder.biz.service.impl.CylinderFillingRecordStatisticsUnitDayServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author Administrator
*/
@Component
@Slf4j
public class CylinderStatisticsUnitDayInsertOrUpdateEventListener implements ApplicationListener<CylinderStatisticsUnitDayInsertOrUpdateEvent> {
@Value("${cylinder.day.statistics.insertOrUpdate.thread.number:3}")
private int threadNumber;
/**
* 批量入库大小
*/
private int batchSize = 10;
private List<BlockingQueue<CylinderFillingRecordStatisticsUnitDayDto>> hashCodeBlockingQueues = new ArrayList<>();
private ReentrantLock reentrantLock = new ReentrantLock();
private Sequence sequence;
private CylinderFillingRecordStatisticsUnitDayServiceImpl statisticsUnitDayService;
public CylinderStatisticsUnitDayInsertOrUpdateEventListener(CylinderFillingRecordStatisticsUnitDayServiceImpl statisticsUnitDayService,
Sequence sequence) {
this.statisticsUnitDayService = statisticsUnitDayService;
this.sequence = sequence;
}
@Override
public void onApplicationEvent(CylinderStatisticsUnitDayInsertOrUpdateEvent event) {
CylinderFillingRecordStatisticsUnitDayDto fillingRecordStatisticsUnitDayDto = event.getCylinderFillingRecordStatisticsUnitDayDto();
log.info("2.收到气瓶单位日统计事件消息:{}", JSON.toJSONString(fillingRecordStatisticsUnitDayDto));
// 按照一定的规则将统计维度不同的数据放到不同的队列里,保证多线程不会同时操作相同行数据,减少了线程的冲突,提高执行效率
int queueIndex = Math.abs(fillingRecordStatisticsUnitDayDto.getAppId().hashCode()) % threadNumber;
hashCodeBlockingQueues.get(queueIndex).add(fillingRecordStatisticsUnitDayDto);
}
@PostConstruct
public void init() {
// 初始化队列,按照线程数动态创建队列
initQueue();
// 初始化多线程消费线程
ExecutorService executorService = Executors.newFixedThreadPool(threadNumber);
for (int i = 0; i < threadNumber; i++) {
BlockingQueue<CylinderFillingRecordStatisticsUnitDayDto> queue = hashCodeBlockingQueues.get(i);
executorService.execute(() -> {
while (true) {
try {
CylinderFillingRecordStatisticsUnitDayDto fillingRecordStatisticsUnitDayDto = queue.take();
List<CylinderFillingRecordStatisticsUnitDayDto> unitDayDtos = new ArrayList<>();
unitDayDtos.add(fillingRecordStatisticsUnitDayDto);
try {
reentrantLock.lock();
if (queue.size() >= batchSize) {
queue.drainTo(unitDayDtos, batchSize);
}
} finally {
reentrantLock.unlock();
}
this.buildAndSaveData(unitDayDtos);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
});
}
}
private void buildAndSaveData(List<CylinderFillingRecordStatisticsUnitDayDto> unitDayDtos) {
List<CylinderFillingRecordStatisticsUnitDay> existStatisticsData = queryStatisticsDayOfToday(unitDayDtos);
List<CylinderFillingRecordStatisticsUnitDayDto> insertStatisticsData = this.buildStatisticsDataOfInsert(unitDayDtos, existStatisticsData);
List<CylinderFillingRecordStatisticsUnitDayDto> updateStatisticsData = this.buildStatisticsDataOfUpdate(unitDayDtos, existStatisticsData);
this.sumUpdateStatisticsData(existStatisticsData, updateStatisticsData);
this.saveOrUpdateBatch(insertStatisticsData, updateStatisticsData);
}
private void saveOrUpdateBatch(List<CylinderFillingRecordStatisticsUnitDayDto> insertStatisticsData, List<CylinderFillingRecordStatisticsUnitDayDto> updateStatisticsData) {
// appId + 日期 存在则累计更新(update),不存在则插入(insert)
List<CylinderFillingRecordStatisticsUnitDay> entitys = this.mergeDataAnCastDto2Entity(insertStatisticsData, updateStatisticsData);
statisticsUnitDayService.saveOrUpdateBatch(entitys);
log.info("3.气瓶日统计表入库成功");
}
private List<CylinderFillingRecordStatisticsUnitDay> mergeDataAnCastDto2Entity(List<CylinderFillingRecordStatisticsUnitDayDto> insertStatisticsData, List<CylinderFillingRecordStatisticsUnitDayDto> updateStatisticsData) {
insertStatisticsData.addAll(updateStatisticsData);
return BeanUtil.copyToList(insertStatisticsData, CylinderFillingRecordStatisticsUnitDay.class);
}
private void sumUpdateStatisticsData(List<CylinderFillingRecordStatisticsUnitDay> existStatisticsData, List<CylinderFillingRecordStatisticsUnitDayDto> updateStatisticsData) {
Map<String, CylinderFillingRecordStatisticsUnitDay> existStatisticsDataMap = existStatisticsData.stream().collect(Collectors.toMap(CylinderFillingRecordStatisticsUnitDay::getAppId, Function.identity()));
updateStatisticsData.forEach(up -> {
up.setFillingNotPassedCount(existStatisticsDataMap.get(up.getAppId()).getFillingNotPassedCount() + up.getFillingNotPassedCount());
up.setFillingQuantity(this.sumFillingQuantity(existStatisticsDataMap.get(up.getAppId()).getFillingQuantity(), up.getFillingQuantity()));
up.setFillingBeforeSum(existStatisticsDataMap.get(up.getAppId()).getFillingBeforeSum() + up.getFillingBeforeSum());
up.setFillingAfterSum(existStatisticsDataMap.get(up.getAppId()).getFillingAfterSum() + up.getFillingAfterSum());
up.setTotalSum(existStatisticsDataMap.get(up.getAppId()).getTotalSum() + 1);
up.setSequenceNbr(existStatisticsDataMap.get(up.getAppId()).getSequenceNbr());
up.setFillingDate(new Date());
});
}
/**
* 累计充装量计算
*
* @param fillingQuantity 数据库已有充装量
* @param newFillingQuantity 新增加充装量
* @return 求和
*/
private BigDecimal sumFillingQuantity(BigDecimal fillingQuantity, BigDecimal newFillingQuantity) {
return fillingQuantity.add(newFillingQuantity).setScale(2, RoundingMode.HALF_UP);
}
private List<CylinderFillingRecordStatisticsUnitDayDto> buildStatisticsDataOfInsert(List<CylinderFillingRecordStatisticsUnitDayDto> unitDayDtos, List<CylinderFillingRecordStatisticsUnitDay> existStatisticsData) {
return unitDayDtos.stream().filter(s -> existStatisticsData.stream().noneMatch(e -> e.getAppId().equals(s.getAppId()))).peek(u -> u.setSequenceNbr(sequence.nextId())).collect(Collectors.toList());
}
private List<CylinderFillingRecordStatisticsUnitDayDto> buildStatisticsDataOfUpdate(List<CylinderFillingRecordStatisticsUnitDayDto> unitDayDtos, List<CylinderFillingRecordStatisticsUnitDay> existStatisticsData) {
return unitDayDtos.stream().filter(s -> existStatisticsData.stream().anyMatch(e -> e.getAppId().equals(s.getAppId()))).collect(Collectors.toList());
}
private List<CylinderFillingRecordStatisticsUnitDay> queryStatisticsDayOfToday(List<CylinderFillingRecordStatisticsUnitDayDto> unitDayDtos) {
Set<String> appIds = unitDayDtos.stream().map(CylinderFillingRecordStatisticsUnitDayDto::getAppId).collect(Collectors.toSet());
return statisticsUnitDayService.list(new LambdaQueryWrapper<CylinderFillingRecordStatisticsUnitDay>().in(CylinderFillingRecordStatisticsUnitDay::getAppId, appIds).eq(CylinderFillingRecordStatisticsUnitDay::getFillingDate, DateUtil.format(new Date(), DatePattern.NORM_DATE_PATTERN)));
}
private void initQueue() {
for (int i = 0; i < threadNumber; i++) {
hashCodeBlockingQueues.add(new LinkedBlockingQueue<>());
}
}
}
package com.yeejoin.amos.boot.module.cylinder.biz.listener; package com.yeejoin.amos.boot.module.cylinder.biz.listener;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.yeejoin.amos.boot.module.cylinder.api.dto.CylinderQuestionInfoDto;
import com.yeejoin.amos.boot.module.cylinder.api.enums.QuestionTypeEnum;
import com.yeejoin.amos.boot.module.cylinder.biz.event.CylinderQuestionCreateEvent;
import com.yeejoin.amos.boot.module.cylinder.biz.event.publisher.EventPublisher;
import com.yeejoin.amos.boot.module.cylinder.flc.api.dto.CylinderFillingRecordDto; import com.yeejoin.amos.boot.module.cylinder.flc.api.dto.CylinderFillingRecordDto;
import com.yeejoin.amos.boot.module.cylinder.flc.api.enums.CyclinderStatus;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
...@@ -32,22 +27,22 @@ public class CylinderFillingInsertEventListener extends EmqxListener { ...@@ -32,22 +27,22 @@ public class CylinderFillingInsertEventListener extends EmqxListener {
@Value("${cylinder.filling.insert.msg.custom.thread.number:3}") @Value("${cylinder.filling.insert.msg.custom.thread.number:3}")
private int threadNumber; private int threadNumber;
@Value("${cylinder.filling.insert.topic:cylinder/filling/insert/topic}") @Value("${cylinder.filling.insert.topic:cylinder/filling/insert/topic2}")
private String insertTopic; private String insertTopic;
@Value("${spring.application.name:TZS-CYLINDER}") @Value("${spring.application.name:TZS-CYLINDER}")
private String applicationName; private String applicationName;
private EventPublisher eventPublisher;
private EmqKeeper emqKeeper; private EmqKeeper emqKeeper;
private List<ICylinderFillingInsertListener> cylinderFillingInsertListeners;
private BlockingQueue<CylinderFillingRecordDto> blockingQueue = new LinkedBlockingQueue<>(); private BlockingQueue<CylinderFillingRecordDto> blockingQueue = new LinkedBlockingQueue<>();
public CylinderFillingInsertEventListener(EventPublisher eventPublisher, public CylinderFillingInsertEventListener(EmqKeeper emqKeeper,
EmqKeeper emqKeeper) { List<ICylinderFillingInsertListener> cylinderFillingInsertListeners) {
this.eventPublisher = eventPublisher;
this.emqKeeper = emqKeeper; this.emqKeeper = emqKeeper;
this.cylinderFillingInsertListeners = cylinderFillingInsertListeners;
} }
...@@ -79,19 +74,9 @@ public class CylinderFillingInsertEventListener extends EmqxListener { ...@@ -79,19 +74,9 @@ public class CylinderFillingInsertEventListener extends EmqxListener {
} }
private void createCylinderFillingQuestion(CylinderFillingRecordDto cylinderFillingRecordDto) { private void createCylinderFillingQuestion(CylinderFillingRecordDto cylinderFillingRecordDto) {
if (CyclinderStatus.HEGE.getName().equals(cylinderFillingRecordDto.getFillingResult()) && CyclinderStatus.HEGE.getName().equals(cylinderFillingRecordDto.getCheckResult())) { cylinderFillingInsertListeners.forEach(listener -> {
log.info("气瓶充装信息前后全部合格,不创建问题!"); listener.handle(cylinderFillingRecordDto);
} else { });
CylinderQuestionInfoDto questionInfo = new CylinderQuestionInfoDto();
questionInfo.setQuestionType(QuestionTypeEnum.CZJCBHG.getCode());
questionInfo.setQuestionTypeName(QuestionTypeEnum.CZJCBHG.getName());
questionInfo.setQuestionAttributionId(cylinderFillingRecordDto.getAppId());
questionInfo.setQuestionAttributionName(cylinderFillingRecordDto.getFillingUnitName());
questionInfo.setRegionCode(cylinderFillingRecordDto.getRegionCode());
questionInfo.setQuestionObjectId(cylinderFillingRecordDto.getSequenceCode());
questionInfo.setHappenDate(cylinderFillingRecordDto.getSyncDate());
eventPublisher.publish(new CylinderQuestionCreateEvent(this, questionInfo));
}
} }
@Override @Override
......
package com.yeejoin.amos.boot.module.cylinder.biz.listener;
import com.yeejoin.amos.boot.module.cylinder.api.dto.CylinderQuestionInfoDto;
import com.yeejoin.amos.boot.module.cylinder.api.enums.QuestionTypeEnum;
import com.yeejoin.amos.boot.module.cylinder.biz.event.CylinderQuestionCreateEvent;
import com.yeejoin.amos.boot.module.cylinder.biz.event.publisher.EventPublisher;
import com.yeejoin.amos.boot.module.cylinder.flc.api.dto.CylinderFillingRecordDto;
import com.yeejoin.amos.boot.module.cylinder.flc.api.enums.CyclinderStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author Administrator
*/
@Component
@Slf4j
public class CylinderFillingInsertListenerForQuestion implements ICylinderFillingInsertListener {
private EventPublisher eventPublisher;
public CylinderFillingInsertListenerForQuestion(EventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
@Override
public void handle(CylinderFillingRecordDto cylinderFillingRecordDto) {
if (CyclinderStatus.HEGE.getName().equals(cylinderFillingRecordDto.getFillingResult()) && CyclinderStatus.HEGE.getName().equals(cylinderFillingRecordDto.getCheckResult())) {
log.info("气瓶充装信息前后全部合格,不创建问题!");
} else {
CylinderQuestionInfoDto questionInfo = new CylinderQuestionInfoDto();
questionInfo.setQuestionType(QuestionTypeEnum.CZJCBHG.getCode());
questionInfo.setQuestionTypeName(QuestionTypeEnum.CZJCBHG.getName());
questionInfo.setQuestionAttributionId(cylinderFillingRecordDto.getAppId());
questionInfo.setQuestionAttributionName(cylinderFillingRecordDto.getFillingUnitName());
questionInfo.setRegionCode(cylinderFillingRecordDto.getRegionCode());
questionInfo.setQuestionObjectId(cylinderFillingRecordDto.getSequenceCode());
questionInfo.setHappenDate(cylinderFillingRecordDto.getSyncDate());
eventPublisher.publish(new CylinderQuestionCreateEvent(this, questionInfo));
}
}
}
package com.yeejoin.amos.boot.module.cylinder.biz.listener;
import com.yeejoin.amos.boot.module.cylinder.api.dto.CylinderFillingRecordStatisticsUnitDayDto;
import com.yeejoin.amos.boot.module.cylinder.biz.event.CylinderStatisticsUnitDayInsertOrUpdateEvent;
import com.yeejoin.amos.boot.module.cylinder.biz.event.publisher.EventPublisher;
import com.yeejoin.amos.boot.module.cylinder.flc.api.dto.CylinderFillingRecordDto;
import com.yeejoin.amos.boot.module.cylinder.flc.api.enums.CyclinderStatus;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.util.Date;
/**
* @author Administrator
*/
@Component
public class CylinderFillingInsertListenerForStatistics implements ICylinderFillingInsertListener {
private EventPublisher eventPublisher;
public CylinderFillingInsertListenerForStatistics(EventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
@Override
public void handle(CylinderFillingRecordDto cylinderFillingRecordDto) {
CylinderFillingRecordStatisticsUnitDayDto statisticsUnitDay = new CylinderFillingRecordStatisticsUnitDayDto();
statisticsUnitDay.setAppId(cylinderFillingRecordDto.getAppId());
statisticsUnitDay.setRegionCode(cylinderFillingRecordDto.getRegionCode());
statisticsUnitDay.setFillingDate(cylinderFillingRecordDto.getSyncDate());
statisticsUnitDay.setFillingAfterSum(StringUtils.isEmpty(cylinderFillingRecordDto.getFillingEndtime()) ? 0 : 1);
statisticsUnitDay.setFillingBeforeSum(StringUtils.isEmpty(cylinderFillingRecordDto.getFillingStarttime()) ? 0 : 1);
statisticsUnitDay.setFillingQuantity(new BigDecimal(cylinderFillingRecordDto.getFillingQuantity()));
statisticsUnitDay.setFillingNotPassedCount(this.justIsOk(cylinderFillingRecordDto));
statisticsUnitDay.setFillingDate(new Date());
statisticsUnitDay.setRecDate(new Date());
statisticsUnitDay.setTotalSum(1);
eventPublisher.publish(new CylinderStatisticsUnitDayInsertOrUpdateEvent(this, statisticsUnitDay));
}
private int justIsOk(CylinderFillingRecordDto cylinderFillingRecordDto) {
if (CyclinderStatus.HEGE.getName().equals(cylinderFillingRecordDto.getFillingResult()) && CyclinderStatus.HEGE.getName().equals(cylinderFillingRecordDto.getCheckResult())) {
return 0;
} else {
return 1;
}
}
}
package com.yeejoin.amos.boot.module.cylinder.biz.listener;
import com.yeejoin.amos.boot.module.cylinder.flc.api.dto.CylinderFillingRecordDto;
/**
* @author Administrator
*/
public interface ICylinderFillingInsertListener {
/**
* 事件处理
* @param cylinderFillingRecordDto 充装对象
*/
void handle(CylinderFillingRecordDto cylinderFillingRecordDto);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment