Commit 6cb7fb76 authored by suhuiguang's avatar suhuiguang

1.openapi发出消息,气瓶服务订阅消息进行业务处理

2.气瓶问题模块订阅消息创建充装检查不合格类型的问题
parent ac832c1d
package com.yeejoin.amos.api.openapi.face.service;
import com.alibaba.fastjson.JSON;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.google.common.collect.Lists;
import com.yeejoin.amos.api.openapi.face.model.*;
import com.yeejoin.amos.api.openapi.face.orm.dao.ESCylinderFillingRecordRepository;
......@@ -7,32 +9,20 @@ import com.yeejoin.amos.api.openapi.face.orm.dao.ESCylinderInfoRepository;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import com.yeejoin.amos.boot.module.cylinder.api.entity.ESCylinderFillingRecordDto;
import com.yeejoin.amos.boot.module.cylinder.api.entity.ESCylinderInfoDto;
import com.yeejoin.amos.boot.module.cylinder.flc.api.dto.*;
import com.yeejoin.amos.boot.module.cylinder.flc.api.dto.CylinderInfoDto;
import com.yeejoin.amos.boot.module.cylinder.flc.api.entity.*;
import com.yeejoin.amos.boot.module.cylinder.flc.api.mapper.*;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StopWatch;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.foundation.utils.Bean;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.IService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.yeejoin.amos.boot.module.cylinder.flc.api.dto.CylinderInfoDto;
import com.yeejoin.amos.boot.module.cylinder.flc.api.entity.CylinderFilling;
import com.yeejoin.amos.boot.module.cylinder.flc.api.entity.CylinderFillingCheck;
import com.yeejoin.amos.boot.module.cylinder.flc.api.entity.CylinderFillingExamine;
import com.yeejoin.amos.boot.module.cylinder.flc.api.entity.CylinderFillingRecord;
import com.yeejoin.amos.boot.module.cylinder.flc.api.entity.CylinderInfo;
import com.yeejoin.amos.boot.module.cylinder.flc.api.entity.CylinderInspection;
import com.yeejoin.amos.boot.module.cylinder.flc.api.entity.CylinderTags;
import com.yeejoin.amos.boot.module.cylinder.flc.api.entity.CylinderUnit;
import com.yeejoin.amos.boot.module.cylinder.flc.api.entity.CylinderUnitVideo;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
......@@ -41,174 +31,186 @@ import java.util.stream.Collectors;
@Slf4j
@Component
@DS("tzs")
public class SyncCylinderDataService
{
/**
* 气瓶企业信息
*/
@Autowired
private CylinderUnitMapper cylinderUnitMapper;
/**
* 气瓶基本信息
*/
@Autowired
private CylinderInfoMapper cylinderInfoMapper;
/**
* 气瓶充装信息--充装前检查
*/
@Autowired
private CylinderFillingMapper cylinderFillingMapper;
/**
* 液化气体气瓶充装信息-充装后复查
*/
@Autowired
private CylinderFillingCheckMapper cylinderFillingCheckMapper;
@Autowired
private CylCylinderFillingCheckMapper cylCylinderFillingCheckMapper;
/**
* 液化气体气瓶充装信息审核
*/
@Autowired
private CylinderFillingExamineMapper cylinderFillingExamineMapper;
/**
* 气瓶标签信息
*/
@Autowired
private CylinderTagsMapper cylinderTagsMapper;
/**
* 气瓶检验信息
*/
@Autowired
private CylinderInspectionMapper cylinderInspectionMapper;
@Autowired
CylinderFillingRecordMapper cylinderFillingRecordMapper;
@Autowired
ESCylinderFillingRecordRepository esCylinderFillingRecordRepository;
@Autowired
ESCylinderInfoRepository esCylinderInfoRepository;
public void syncCylinderUnit(List<TmCylinderUnitModel> cylinderUnitDto) {
// List<CylinderUnit> cylinderUnits = Bean.toModels(cylinderUnitDto,CylinderUnit.class);
// LambdaQueryWrapper<CylinderUnit> wrapper = new LambdaQueryWrapper<>();
// List<CylinderUnit> cylinderUnitList = this.getBaseMapper().selectList(wrapper);
List<CylinderUnit> cylinderUnitList = Bean.toModels(cylinderUnitDto,CylinderUnit.class);
cylinderUnitMapper.saveOrUpdateBatch(cylinderUnitList);
}
public void syncCylinderInfo(final List<TmCylinderInfoModel> cylinderInfoDto) {
List<CylinderInfo> cylinderUnitList = Bean.toModels(cylinderInfoDto,CylinderInfo.class);
cylinderInfoMapper.saveOrUpdateBatch(cylinderUnitList);
}
public void syncCylinderFillingExamine(List<TmCylinderFillingExamineModel> cylinderFillingExamineDto) {
List<CylinderFillingExamine> cylinderFillingExamineList = Bean.toModels(cylinderFillingExamineDto,
CylinderFillingExamine.class);
cylinderFillingExamineMapper.saveOrUpdateBatch(cylinderFillingExamineList);
}
public void syncCylinderFillingRecord(List<TmCylinderFillingRecordModel> cylinderFillingRecordDtos) {
List<CylinderFillingRecord> cylinderFillingRecordList = Bean.toModels(cylinderFillingRecordDtos, CylinderFillingRecord.class);
cylCylinderFillingCheckMapper.batchInsertOrUpdate(cylinderFillingRecordList);
}
public void syncCylinderInspection(List<TmCylinderInspectionModel> cylinderInspectionDto) {
List<CylinderInspection> cylinderInspectionList = Bean.toModels(cylinderInspectionDto, CylinderInspection.class);
cylinderInspectionMapper.saveOrUpdateBatch(cylinderInspectionList);
}
public void syncCylinderTag(List<TmCylinderTagsModel> cylinderTagsDtos) {
List<CylinderTags> cylinderTagsList = Bean.toModels(cylinderTagsDtos, CylinderTags.class);
cylinderTagsMapper.saveOrUpdateBatch(cylinderTagsList);
}
public void syncCylinderFillingBefore(List<TmCylinderFillingModel> cylinderFillingDtos) {
List<CylinderFilling> cylinderFillingList = Bean.toModels(cylinderFillingDtos, CylinderFilling.class);
cylCylinderFillingCheckMapper.saveAndBatchInsert(cylinderFillingList);
}
public void syncCylinderFillingAfter(List<TmCylinderFillingCheckModel> cylinderFillingCheckDtos) {
List<CylinderFillingCheck> cylinderFillingChecList = Bean.toModels(cylinderFillingCheckDtos, CylinderFillingCheck.class);
cylinderFillingCheckMapper.saveOrUpdateByCondition(cylinderFillingChecList);
}
public void createCylinderFillingRecord(List<ESCylinderFillingRecordDto> cylinderFillingRecord) {
if (!ObjectUtils.isEmpty(cylinderFillingRecord)) {
List<String> appIds = cylinderFillingRecord.stream().map(ESCylinderFillingRecordDto::getAppId).collect(Collectors.toList());
List<String> sequenceCodeS = cylinderFillingRecord.stream().map(ESCylinderFillingRecordDto::getSequenceCode).collect(Collectors.toList());
List<ESCylinderFillingRecordDto> cylinderFillingRecordInfo = cylinderFillingRecordMapper.getCylinderFillingRecordInfo(appIds, sequenceCodeS);
cylinderFillingRecord.stream().map(item -> {
List<ESCylinderFillingRecordDto> collect = cylinderFillingRecordInfo.stream().filter(e -> item.getAppIdAndSequenceCode().equals(e.getAppIdAndSequenceCode())).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(collect)) {
item.setUnitName(collect.get(0).getUnitName());
item.setFactoryNum(collect.get(0).getFactoryNum());
item.setCylinderVariety(collect.get(0).getCylinderVariety());
item.setCylinderVarietyName(collect.get(0).getCylinderVarietyName());
item.setUnitInnerCode(collect.get(0).getUnitInnerCode());
item.setSequenceCode(collect.get(0).getSequenceCode());
item.setQrCode(collect.get(0).getQrCode());
item.setElectronicLabelCode(collect.get(0).getElectronicLabelCode());
item.setAppId(collect.get(0).getAppId());
item.setCreditCode(collect.get(0).getCreditCode());
item.setRegionCode(collect.get(0).getRegionCode());
try {
item.setInspectionDateMs(ObjectUtils.isEmpty(item.getFillingStartTime()) ? 0L : DateUtils.dateParseWithPattern(item.getFillingStartTime()).getTime());
item.setInspectionDateAfterMS(ObjectUtils.isEmpty(item.getFillingEndTime()) ? 0L : DateUtils.dateParseWithPattern(item.getFillingEndTime()).getTime());
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
return item;
}).collect(Collectors.toList());
saveCylinderFillingRecord2ES(cylinderFillingRecord);
public class SyncCylinderDataService {
/**
* 气瓶企业信息
*/
@Autowired
private CylinderUnitMapper cylinderUnitMapper;
/**
* 气瓶基本信息
*/
@Autowired
private CylinderInfoMapper cylinderInfoMapper;
/**
* 液化气体气瓶充装信息-充装后复查
*/
@Autowired
private CylinderFillingCheckMapper cylinderFillingCheckMapper;
@Autowired
private CylCylinderFillingCheckMapper cylCylinderFillingCheckMapper;
/**
* 液化气体气瓶充装信息审核
*/
@Autowired
private CylinderFillingExamineMapper cylinderFillingExamineMapper;
/**
* 气瓶标签信息
*/
@Autowired
private CylinderTagsMapper cylinderTagsMapper;
/**
* 气瓶检验信息
*/
@Autowired
private CylinderInspectionMapper cylinderInspectionMapper;
@Autowired
CylinderFillingRecordMapper cylinderFillingRecordMapper;
@Autowired
ESCylinderFillingRecordRepository esCylinderFillingRecordRepository;
@Autowired
ESCylinderInfoRepository esCylinderInfoRepository;
@Autowired
EmqKeeper emqKeeper;
@Value("${cylinder.filling.insert.topic:cylinder/filling/insert/topic}")
private String insertTopic;
public void syncCylinderUnit(List<TmCylinderUnitModel> cylinderUnitDto) {
List<CylinderUnit> cylinderUnitList = Bean.toModels(cylinderUnitDto, CylinderUnit.class);
cylinderUnitMapper.saveOrUpdateBatch(cylinderUnitList);
}
public void syncCylinderInfo(final List<TmCylinderInfoModel> cylinderInfoDto) {
List<CylinderInfo> cylinderUnitList = Bean.toModels(cylinderInfoDto, CylinderInfo.class);
cylinderInfoMapper.saveOrUpdateBatch(cylinderUnitList);
}
public void syncCylinderFillingExamine(List<TmCylinderFillingExamineModel> cylinderFillingExamineDto) {
List<CylinderFillingExamine> cylinderFillingExamineList = Bean.toModels(cylinderFillingExamineDto,
CylinderFillingExamine.class);
cylinderFillingExamineMapper.saveOrUpdateBatch(cylinderFillingExamineList);
}
public void syncCylinderFillingRecord(List<TmCylinderFillingRecordModel> cylinderFillingRecordDtos) {
List<CylinderFillingRecord> cylinderFillingRecordList = Bean.toModels(cylinderFillingRecordDtos, CylinderFillingRecord.class);
cylCylinderFillingCheckMapper.batchInsertOrUpdate(cylinderFillingRecordList);
}
public void syncCylinderInspection(List<TmCylinderInspectionModel> cylinderInspectionDto) {
List<CylinderInspection> cylinderInspectionList = Bean.toModels(cylinderInspectionDto, CylinderInspection.class);
cylinderInspectionMapper.saveOrUpdateBatch(cylinderInspectionList);
}
public void syncCylinderTag(List<TmCylinderTagsModel> cylinderTagsDtos) {
List<CylinderTags> cylinderTagsList = Bean.toModels(cylinderTagsDtos, CylinderTags.class);
cylinderTagsMapper.saveOrUpdateBatch(cylinderTagsList);
}
}
public void saveCylinderFillingRecord2ES(List<ESCylinderFillingRecordDto> records) {
List<String> ids = new ArrayList<>();
for (ESCylinderFillingRecordDto record : records) {
CylinderFillingRecord cylinderFillingRecord = new CylinderFillingRecord();
BeanUtils.copyProperties(record, cylinderFillingRecord);
ids.add(String.valueOf(record.getSequenceNbr()));
public void syncCylinderFillingBefore(List<TmCylinderFillingModel> cylinderFillingDtos) {
List<CylinderFilling> cylinderFillingList = Bean.toModels(cylinderFillingDtos, CylinderFilling.class);
cylCylinderFillingCheckMapper.saveAndBatchInsert(cylinderFillingList);
}
esCylinderFillingRecordRepository.saveAll(records);
cylinderFillingRecordMapper.updateCylinderFillingToEsStatus(ids);
}
public void createCylinderInfo2ES(TmCylinderInfoModel cylinderInfoModel) {
CylinderInfoDto cylinderInfoDto = Bean.toModel(cylinderInfoModel, new CylinderInfoDto());
List<ESCylinderInfoDto> esCylinderInfoDto = new ArrayList<>();
ESCylinderInfoDto esCylinderInfo = new ESCylinderInfoDto();
BeanUtils.copyProperties(cylinderInfoDto, esCylinderInfo);
esCylinderInfoDto.add(esCylinderInfo);
try {
esCylinderInfo.setInspectionDateMs(ObjectUtils.isEmpty(esCylinderInfo.getInspectionDate()) ? 0L : DateUtils.dateParse(esCylinderInfo.getInspectionDate(), DateUtils.DATE_TIME_PATTERN).getTime());
} catch (ParseException e) {
throw new RuntimeException(e);
public void syncCylinderFillingAfter(List<TmCylinderFillingCheckModel> cylinderFillingCheckDtos) {
List<CylinderFillingCheck> cylinderFillingChecList = Bean.toModels(cylinderFillingCheckDtos, CylinderFillingCheck.class);
cylinderFillingCheckMapper.saveOrUpdateByCondition(cylinderFillingChecList);
}
List<String> ids = Lists.newArrayList();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
esCylinderInfoRepository.saveAll(esCylinderInfoDto);
ids.add(String.valueOf(cylinderInfoDto.getSequenceNbr()));
stopWatch.stop();
if (log.isInfoEnabled()) {
log.info("存入es耗时:{} 秒", stopWatch.getTotalTimeSeconds());
public void createCylinderFillingRecord(List<ESCylinderFillingRecordDto> cylinderFillingRecord) {
if (!ObjectUtils.isEmpty(cylinderFillingRecord)) {
List<String> appIds = cylinderFillingRecord.stream().map(ESCylinderFillingRecordDto::getAppId).collect(Collectors.toList());
List<String> sequenceCodeS = cylinderFillingRecord.stream().map(ESCylinderFillingRecordDto::getSequenceCode).collect(Collectors.toList());
List<ESCylinderFillingRecordDto> cylinderFillingRecordInfo = cylinderFillingRecordMapper.getCylinderFillingRecordInfo(appIds, sequenceCodeS);
cylinderFillingRecord.stream().map(item -> {
List<ESCylinderFillingRecordDto> collect = cylinderFillingRecordInfo.stream().filter(e -> item.getAppIdAndSequenceCode().equals(e.getAppIdAndSequenceCode())).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(collect)) {
item.setUnitName(collect.get(0).getUnitName());
item.setFactoryNum(collect.get(0).getFactoryNum());
item.setCylinderVariety(collect.get(0).getCylinderVariety());
item.setCylinderVarietyName(collect.get(0).getCylinderVarietyName());
item.setUnitInnerCode(collect.get(0).getUnitInnerCode());
item.setSequenceCode(collect.get(0).getSequenceCode());
item.setQrCode(collect.get(0).getQrCode());
item.setElectronicLabelCode(collect.get(0).getElectronicLabelCode());
item.setAppId(collect.get(0).getAppId());
item.setCreditCode(collect.get(0).getCreditCode());
item.setRegionCode(collect.get(0).getRegionCode());
try {
item.setInspectionDateMs(ObjectUtils.isEmpty(item.getFillingStartTime()) ? 0L : DateUtils.dateParseWithPattern(item.getFillingStartTime()).getTime());
item.setInspectionDateAfterMS(ObjectUtils.isEmpty(item.getFillingEndTime()) ? 0L : DateUtils.dateParseWithPattern(item.getFillingEndTime()).getTime());
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
return item;
}).collect(Collectors.toList());
saveCylinderFillingRecord2ES(cylinderFillingRecord);
}
}
public void saveCylinderFillingRecord2ES(List<ESCylinderFillingRecordDto> records) {
List<String> ids = new ArrayList<>();
for (ESCylinderFillingRecordDto record : records) {
CylinderFillingRecord cylinderFillingRecord = new CylinderFillingRecord();
BeanUtils.copyProperties(record, cylinderFillingRecord);
ids.add(String.valueOf(record.getSequenceNbr()));
}
esCylinderFillingRecordRepository.saveAll(records);
cylinderFillingRecordMapper.updateCylinderFillingToEsStatus(ids);
this.publishMsg2CyService(records);
}
/**
* 发布记录创建消息
*
* @param records 所有的记录
*/
private void publishMsg2CyService(List<ESCylinderFillingRecordDto> records) {
try {
log.info("开始发送气瓶充装记录数据消息:{}", JSON.toJSONString(records));
emqKeeper.getMqttClient().publish(insertTopic, JSON.toJSONString(records).getBytes(), 2, false);
} catch (MqttException e) {
log.error("发送气瓶充装记录数据消息失败:{}", e.getMessage());
}
}
StopWatch stopWatch1 = new StopWatch();
stopWatch1.start();
cylinderInfoMapper.updateEsCylinderInfoStatus(ids);
stopWatch1.stop();
if (log.isInfoEnabled()) {
log.info("更新业务数据耗时:{} 秒", stopWatch1.getTotalTimeSeconds());
public void createCylinderInfo2ES(TmCylinderInfoModel cylinderInfoModel) {
CylinderInfoDto cylinderInfoDto = Bean.toModel(cylinderInfoModel, new CylinderInfoDto());
List<ESCylinderInfoDto> esCylinderInfoDto = new ArrayList<>();
ESCylinderInfoDto esCylinderInfo = new ESCylinderInfoDto();
BeanUtils.copyProperties(cylinderInfoDto, esCylinderInfo);
esCylinderInfoDto.add(esCylinderInfo);
try {
esCylinderInfo.setInspectionDateMs(ObjectUtils.isEmpty(esCylinderInfo.getInspectionDate()) ? 0L : DateUtils.dateParse(esCylinderInfo.getInspectionDate(), DateUtils.DATE_TIME_PATTERN).getTime());
} catch (ParseException e) {
throw new RuntimeException(e);
}
List<String> ids = Lists.newArrayList();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
esCylinderInfoRepository.saveAll(esCylinderInfoDto);
ids.add(String.valueOf(cylinderInfoDto.getSequenceNbr()));
stopWatch.stop();
if (log.isInfoEnabled()) {
log.info("存入es耗时:{} 秒", stopWatch.getTotalTimeSeconds());
}
StopWatch stopWatch1 = new StopWatch();
stopWatch1.start();
cylinderInfoMapper.updateEsCylinderInfoStatus(ids);
stopWatch1.stop();
if (log.isInfoEnabled()) {
log.info("更新业务数据耗时:{} 秒", stopWatch1.getTotalTimeSeconds());
}
}
}
}
......@@ -2,6 +2,7 @@ package com.yeejoin.amos.boot.module.cylinder.biz.event.listener;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Sequence;
import com.yeejoin.amos.boot.module.cylinder.api.dto.CylinderQuestionInfoDto;
......@@ -63,6 +64,7 @@ public class CylinderQuestionCreateEventListener implements ApplicationListener<
@Override
public void onApplicationEvent(CylinderQuestionCreateEvent event) {
log.info("2.收到问题创建消息:{}", JSON.toJSONString(event.getCylinderQuestionInfoDto()));
// 按照一定的规则将不同的问题对象放到不同的队列里,保证多线程不会同时操作不同的问题对象数据,减少了线程的冲突,提高执行效率
int queueIndex = Math.abs(event.getCylinderQuestionInfoDto().getQuestionObjectId().hashCode()) % threadNumber;
hashCodeBlockingQueues.get(queueIndex).add(event.getCylinderQuestionInfoDto());
......@@ -117,6 +119,7 @@ public class CylinderQuestionCreateEventListener implements ApplicationListener<
return cylinderQuestionInfo;
}).collect(Collectors.toList());
cylinderQuestionInfoService.saveBatch(cylinderQuestionInfos);
log.info("3.问题入库创建成功:{}", JSON.toJSONString(cylinderQuestionInfos));
}
private String getQuestionObjectName(String sequenceCode, String appId) {
......
package com.yeejoin.amos.boot.module.cylinder.biz.listener;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONArray;
import com.yeejoin.amos.boot.module.cylinder.api.dto.CylinderQuestionInfoDto;
import com.yeejoin.amos.boot.module.cylinder.api.enums.QuestionTypeEnum;
import com.yeejoin.amos.boot.module.cylinder.biz.event.CylinderQuestionCreateEvent;
......@@ -16,6 +16,7 @@ import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -95,9 +96,9 @@ public class CylinderFillingInsertEventListener extends EmqxListener {
@Override
public void processMessage(String topic, MqttMessage message) {
log.info("收到充装信息插入消息:{}", JSONObject.toJSONString(message));
String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
CylinderFillingRecordDto cylinderFillingRecordDto = JSONObject.parseObject(msg, CylinderFillingRecordDto.class);
blockingQueue.add(cylinderFillingRecordDto);
log.info("1.收到充装信息插入消息:{}", msg);
List<CylinderFillingRecordDto> cylinderFillingRecordDtos = JSONArray.parseArray(msg, CylinderFillingRecordDto.class);
blockingQueue.addAll(cylinderFillingRecordDtos);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment