Commit 7d02aa63 authored by suhuiguang's avatar suhuiguang

fix(jyjc):bug处理

1.报检推送失败时记录日志
parent 3fbf1aef
...@@ -40,6 +40,11 @@ public interface BizCommonConstant { ...@@ -40,6 +40,11 @@ public interface BizCommonConstant {
String PUSH_STATUS_SUCCESS = "2"; String PUSH_STATUS_SUCCESS = "2";
/** /**
* 推送状态失败
*/
String PUSH_STATUS_FAILED = "99";
/**
* 待分配业务负责人 * 待分配业务负责人
*/ */
String JS_DFP = "66190"; String JS_DFP = "66190";
......
...@@ -28,15 +28,13 @@ import com.yeejoin.amos.boot.module.jyjc.biz.service.impl.CommonServiceImpl; ...@@ -28,15 +28,13 @@ import com.yeejoin.amos.boot.module.jyjc.biz.service.impl.CommonServiceImpl;
import com.yeejoin.amos.boot.module.jyjc.biz.service.impl.JyjcInspectionApplicationEquipServiceImpl; import com.yeejoin.amos.boot.module.jyjc.biz.service.impl.JyjcInspectionApplicationEquipServiceImpl;
import com.yeejoin.amos.boot.module.jyjc.biz.service.impl.JyjcInspectionApplicationPushLogServiceImpl; import com.yeejoin.amos.boot.module.jyjc.biz.service.impl.JyjcInspectionApplicationPushLogServiceImpl;
import com.yeejoin.amos.boot.module.jyjc.biz.service.impl.JyjcInspectionApplicationServiceImpl; import com.yeejoin.amos.boot.module.jyjc.biz.service.impl.JyjcInspectionApplicationServiceImpl;
import com.yeejoin.amos.boot.module.jyjc.biz.util.JsonUtils;
import com.yeejoin.amos.boot.module.jyjc.biz.util.JyjcConstant; import com.yeejoin.amos.boot.module.jyjc.biz.util.JyjcConstant;
import com.yeejoin.amos.boot.module.ymt.api.entity.*; import com.yeejoin.amos.boot.module.ymt.api.entity.*;
import com.yeejoin.amos.boot.module.ymt.api.mapper.*; import com.yeejoin.amos.boot.module.ymt.api.mapper.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.transaction.event.TransactionalEventListener;
...@@ -55,90 +53,85 @@ import java.util.stream.Collectors; ...@@ -55,90 +53,85 @@ import java.util.stream.Collectors;
*/ */
@Component @Component
@Slf4j @Slf4j
@RequiredArgsConstructor
public class InspectionApplicationPushEventListener { public class InspectionApplicationPushEventListener {
@Value("classpath:/json/companyCodeRegName.json")
private Resource regNameJson;
private Map<String, String> companyCodeRegNameMap;
private final BlockingQueue<JyjcInspectionApplicationModel> queue = new LinkedBlockingQueue<>(); private final BlockingQueue<JyjcInspectionApplicationModel> queue = new LinkedBlockingQueue<>();
@Value("${inspection.push.max.deal.thread.num:2}") @Value("${inspection.push.max.deal.thread.num:2}")
private int threadNum; private int threadNum;
@Autowired
JyjcInspectionApplicationPushLogServiceImpl pushLogService;
@Autowired private final JyjcInspectionApplicationPushLogServiceImpl pushLogService;
JyjcInspectionApplicationEquipServiceImpl applicationEquipService;
private final JyjcInspectionApplicationEquipServiceImpl applicationEquipService;
private final IdxBizJgConstructionInfoMapper constructionInfoMapper;
private final IdxBizJgUseInfoMapper useInfoMapper;
private final IdxBizJgRegisterInfoMapper idxBizJgRegisterInfoMapper;
private final TzBaseEnterpriseInfoMapper baseEnterpriseInfoMapper;
private final EquipTechParamBoilerMapper equipTechParamBoilerMapper;
private final EquipTechParamVesselMapper equipTechParamVesselMapper;
@Autowired
IdxBizJgConstructionInfoMapper constructionInfoMapper;
@Autowired private final EquipTechParamElevatorMapper equipTechParamElevatorMapper;
IdxBizJgUseInfoMapper useInfoMapper;
@Autowired
IdxBizJgRegisterInfoMapper idxBizJgRegisterInfoMapper;
@Autowired private final EquipTechParamLiftingMapper equipTechParamLiftingMapper;
TzBaseEnterpriseInfoMapper baseEnterpriseInfoMapper;
@Autowired
private EquipTechParamBoilerMapper equipTechParamBoilerMapper;
@Autowired private final EquipTechParamVehicleMapper equipTechParamVehicleMapper;
private EquipTechParamVesselMapper equipTechParamVesselMapper;
@Autowired
private EquipTechParamElevatorMapper equipTechParamElevatorMapper;
@Autowired private final EquipTechParamRidesMapper equipTechParamRidesMapper;
private EquipTechParamLiftingMapper equipTechParamLiftingMapper;
@Autowired
private EquipTechParamVehicleMapper equipTechParamVehicleMapper;
@Autowired private final EquipTechParamPipelineMapper equipTechParamPipelineMapper;
private EquipTechParamRidesMapper equipTechParamRidesMapper;
@Autowired
private EquipTechParamPipelineMapper equipTechParamPipelineMapper;
@Autowired private final EquipTechParamRopewayMapper equipTechParamRopewayMapper;
private EquipTechParamRopewayMapper equipTechParamRopewayMapper;
@Autowired
KafkaProducer kafkaProducer;
@Autowired private final KafkaProducer kafkaProducer;
SnowflakeIdUtil sequence;
@Autowired
IdxBizJgFactoryInfoMapper factoryInfoMapper;
@Autowired private final SnowflakeIdUtil sequence;
IdxBizJgMaintenanceRecordInfoMapper maintenanceInfoMapper;
@Autowired
JgUseRegistrationManageMapper jgUseRegistrationManageMapper;
@Autowired private final IdxBizJgFactoryInfoMapper factoryInfoMapper;
IdxBizJgDesignInfoMapper designInfoMapper;
@javax.annotation.Resource
IdxBizJgProjectContraptionMapper projectContraptionMapper;
private final IdxBizJgMaintenanceRecordInfoMapper maintenanceInfoMapper;
@javax.annotation.Resource
private JyjcInspectionApplicationServiceImpl inspectionApplicationService;
@javax.annotation.Resource private final JgUseRegistrationManageMapper jgUseRegistrationManageMapper;
private TzBaseEnterpriseInfoMapper enterpriseInfoMapper;
@javax.annotation.Resource
private CommonServiceImpl commonService; private final IdxBizJgDesignInfoMapper designInfoMapper;
private final IdxBizJgProjectContraptionMapper projectContraptionMapper;
private final JyjcInspectionApplicationServiceImpl inspectionApplicationService;
private final TzBaseEnterpriseInfoMapper enterpriseInfoMapper;
private final CommonServiceImpl commonService;
/** /**
...@@ -151,15 +144,10 @@ public class InspectionApplicationPushEventListener { ...@@ -151,15 +144,10 @@ public class InspectionApplicationPushEventListener {
*/ */
private final static List<String> JDJY_ARRAY = Arrays.asList("AZJDJY", "GZJDJY", "WXJDJY"); private final static List<String> JDJY_ARRAY = Arrays.asList("AZJDJY", "GZJDJY", "WXJDJY");
/**
* 定首检-检验类型
*/
private final static List<String> DSJ_ARRAY = Arrays.asList("DQJY", "SCJY");
private static final BigDecimal ZERO = new BigDecimal("0.000"); private static final BigDecimal ZERO = new BigDecimal("0.000");
@Autowired
private IdxBizJgSupervisionInfoMapper idxBizJgSupervisionInfoMapper;
private final IdxBizJgSupervisionInfoMapper idxBizJgSupervisionInfoMapper;
@TransactionalEventListener(value = InspectionApplicationPushEvent.class) @TransactionalEventListener(value = InspectionApplicationPushEvent.class)
public void onApplicationEvent(InspectionApplicationPushEvent event) { public void onApplicationEvent(InspectionApplicationPushEvent event) {
...@@ -170,7 +158,6 @@ public class InspectionApplicationPushEventListener { ...@@ -170,7 +158,6 @@ public class InspectionApplicationPushEventListener {
@PostConstruct @PostConstruct
public void init() { public void init() {
ExecutorService executorService = Executors.newFixedThreadPool(threadNum); ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
companyCodeRegNameMap = JsonUtils.getResourceJson(regNameJson);
for (int i = 0; i < threadNum; i++) { for (int i = 0; i < threadNum; i++) {
executorService.execute(() -> { executorService.execute(() -> {
while (true) { while (true) {
...@@ -188,12 +175,15 @@ public class InspectionApplicationPushEventListener { ...@@ -188,12 +175,15 @@ public class InspectionApplicationPushEventListener {
} }
} }
/**
* 发送kafka指定的主题
*
* @param dockingUnitCode 对接单位appId
* @param pushLog 本地日志表
*/
private void pushData2Kafka(String dockingUnitCode, JyjcInspectionApplicationPushLog pushLog) { private void pushData2Kafka(String dockingUnitCode, JyjcInspectionApplicationPushLog pushLog) {
// 发送kafka指定的主题
String topic = String.format(INSPECTION_APPLICATION_PUSH_TOPIC, dockingUnitCode); String topic = String.format(INSPECTION_APPLICATION_PUSH_TOPIC, dockingUnitCode);
kafkaProducer.sendMessage(topic, pushLog.getPushData()); kafkaProducer.sendMessageWithLog(topic, pushLog.getPushData(), pushLog);
pushLog.setPushStatus(BizCommonConstant.PUSH_STATUS_SUCCESS);
pushLogService.updateById(pushLog);
} }
private JyjcInspectionApplicationPushLog createPushData(JyjcInspectionApplicationModel applicationModel) { private JyjcInspectionApplicationPushLog createPushData(JyjcInspectionApplicationModel applicationModel) {
...@@ -210,12 +200,6 @@ public class InspectionApplicationPushEventListener { ...@@ -210,12 +200,6 @@ public class InspectionApplicationPushEventListener {
return pushLog; return pushLog;
} }
private List<String> getCanDealInspectionType() {
List<String> canDealInspectionTypes = new ArrayList<>(JDJY_ARRAY);
canDealInspectionTypes.addAll(DSJ_ARRAY);
return canDealInspectionTypes;
}
private String calTotalLength(List<PipelinePushItemDto> pipelines) { private String calTotalLength(List<PipelinePushItemDto> pipelines) {
if (pipelines == null || pipelines.isEmpty()) { if (pipelines == null || pipelines.isEmpty()) {
return ZERO.toPlainString(); return ZERO.toPlainString();
...@@ -365,7 +349,7 @@ public class InspectionApplicationPushEventListener { ...@@ -365,7 +349,7 @@ public class InspectionApplicationPushEventListener {
private void setMaintenanceInfo(InspectionEquipData equipData, String equipUnicode) { private void setMaintenanceInfo(InspectionEquipData equipData, String equipUnicode) {
QueryWrapper<IdxBizJgMaintenanceRecordInfo> queryWrapper = new QueryWrapper<>(); QueryWrapper<IdxBizJgMaintenanceRecordInfo> queryWrapper = new QueryWrapper<>();
queryWrapper.lambda().eq(IdxBizJgMaintenanceRecordInfo::getRecord, equipUnicode) queryWrapper.lambda().eq(IdxBizJgMaintenanceRecordInfo::getRecord, equipUnicode)
.select(IdxBizJgMaintenanceRecordInfo::getRecord,IdxBizJgMaintenanceRecordInfo::getMeUnitCreditCode, IdxBizJgMaintenanceRecordInfo::getMeUnitName) .select(IdxBizJgMaintenanceRecordInfo::getRecord, IdxBizJgMaintenanceRecordInfo::getMeUnitCreditCode, IdxBizJgMaintenanceRecordInfo::getMeUnitName)
.orderByDesc(IdxBizJgMaintenanceRecordInfo::getRecDate).last("limit 1"); .orderByDesc(IdxBizJgMaintenanceRecordInfo::getRecDate).last("limit 1");
IdxBizJgMaintenanceRecordInfo maintenanceInfo = maintenanceInfoMapper.selectOne(queryWrapper); IdxBizJgMaintenanceRecordInfo maintenanceInfo = maintenanceInfoMapper.selectOne(queryWrapper);
if (maintenanceInfo != null) { if (maintenanceInfo != null) {
...@@ -417,7 +401,7 @@ public class InspectionApplicationPushEventListener { ...@@ -417,7 +401,7 @@ public class InspectionApplicationPushEventListener {
.last("limit 1"); .last("limit 1");
JgUseRegistrationManage manage = jgUseRegistrationManageMapper.selectOne(queryWrapper); JgUseRegistrationManage manage = jgUseRegistrationManageMapper.selectOne(queryWrapper);
if (manage != null) { if (manage != null) {
equipData.setRegUnitName(companyCodeRegNameMap.getOrDefault(manage.getReceiveCompanyCode(), manage.getReceiveOrgName())); equipData.setRegUnitName(manage.getReceiveOrgName());
} }
} }
} }
......
package com.yeejoin.amos.boot.module.jyjc.biz.kafka; package com.yeejoin.amos.boot.module.jyjc.biz.kafka;
import com.yeejoin.amos.boot.module.jyjc.api.common.BizCommonConstant;
import com.yeejoin.amos.boot.module.jyjc.api.entity.JyjcInspectionApplicationPushLog;
import com.yeejoin.amos.boot.module.jyjc.biz.service.impl.JyjcInspectionApplicationPushLogServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/** /**
* @author Administrator * @author Administrator
*/ */
@Component @Component
@Slf4j @Slf4j
@RequiredArgsConstructor
public class KafkaProducer { public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate; private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) { private final JyjcInspectionApplicationPushLogServiceImpl pushLogService;
this.kafkaTemplate = kafkaTemplate;
}
/** /**
* 发送消息(异步) * 发送消息(异步)
...@@ -26,23 +29,40 @@ public class KafkaProducer { ...@@ -26,23 +29,40 @@ public class KafkaProducer {
* @param topic 主题 * @param topic 主题
* @param message 消息内容 * @param message 消息内容
*/ */
public void sendMessage(String topic, String message) { public void sendMessageWithLog(String topic, String message, JyjcInspectionApplicationPushLog pushLog) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("kafka开始发送数据:主题:{},消息{}", topic, message); log.info("kafka开始发送数据:主题:{},消息{}", topic, message);
} }
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); try {
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
@Override future.addCallback(result -> {
public void onFailure(Throwable throwable) {
log.error("发送消息(异步) failure! topic : {}, message: {}", topic, message);
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("发送消息(异步) success!"); if (result != null) {
log.info("Kafka发送成功! 主题: {}, 分区: {}, Offset: {}", topic, result.getRecordMetadata().partition(), result.getRecordMetadata().offset()
);
}
} }
} updatePushLogStatus(pushLog, BizCommonConstant.PUSH_STATUS_SUCCESS, null);
}); }, ex -> {
log.error("Kafka发送失败! 主题: {}, 消息: {}", topic, message, ex);
updatePushLogStatus(pushLog, BizCommonConstant.PUSH_STATUS_FAILED, ex.getMessage());
throw new RuntimeException("Kafka消息发送失败", ex); // 触发事务回滚(需配合事务配置)
});
} catch (Exception e) {
// 同步阶段即失败(如Topic不存在、Broker宕机)
log.error("Kafka同步发送失败! 主题: {}, 消息丢弃", topic, e);
updatePushLogStatus(pushLog, BizCommonConstant.PUSH_STATUS_FAILED, e.getMessage());
}
}
private void updatePushLogStatus(JyjcInspectionApplicationPushLog pushLog, String status, String errorMsg) {
try {
pushLog.setPushStatus(status);
pushLog.setRemark(errorMsg);
pushLogService.updateById(pushLog);
} catch (Exception e) {
log.error("更新推送日志状态失败! ID: {}", pushLog.getSequenceNbr(), e);
throw e;
}
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment