Commit c5aa0d68 authored by tianbo's avatar tianbo

refactor(jg): 优化安全问题追溯功能

- 在 CommonMapper.xml 中添加 problemTime 字段,用于记录问题时间 - 修改 SafetyProblemTopicMessage 监听器,增加日志记录和异常处理- 更新 SafetyProblemTracingGenServiceImpl,调整批量处理大小并优化日志输出
parent 244abd0a
......@@ -1055,6 +1055,7 @@
SELECT
ui."RECORD",
tt."INFORM_END",
tt."INFORM_END" as problemTime,
ui."USE_UNIT_NAME",
ui."USE_UNIT_CREDIT_CODE",
(select unit_type from tz_base_enterprise_info where use_unit_code = ui."USE_UNIT_CREDIT_CODE") unitType,
......@@ -1092,6 +1093,7 @@
SELECT
ui."RECORD",
tt."NEXT_INSPECT_DATE",
tt."NEXT_INSPECT_DATE" as problemTime,
ui."USE_UNIT_NAME",
ui."USE_UNIT_CREDIT_CODE",
(select unit_type from tz_base_enterprise_info where use_unit_code = ui."USE_UNIT_CREDIT_CODE") unitType,
......@@ -1120,8 +1122,9 @@
WHERE d.rowNum1 =1 and d."NEXT_INSPECT_DATE" <![CDATA[<]]> to_char(now(), 'YYYY-MM-DD')
) tt on tt."RECORD" = ui."RECORD"
WHERE si."ORG_BRANCH_CODE" like '50%'
and oi."CLAIM_STATUS" not in ('草稿','已拒领','待认领')
and tt."NEXT_INSPECT_DATE" is not null
AND oi."CLAIM_STATUS" not in ('草稿','已拒领','待认领')
AND (oi."STATUS" is null or oi."STATUS" = '1')
AND tt."NEXT_INSPECT_DATE" is not null
</select>
<select id="countBizFinishedNumForDP" resultType="java.lang.Long">
SELECT count(1) FROM "tzs_jg_installation_notice" where receive_company_org_code like CONCAT(#{orgCode}, '%') and notice_status = '6616'
......@@ -2645,6 +2648,7 @@
ul.sequence_nbr problemSourceId,
ul.cert_no certNo,
ul.expiry_date expiryDate,
ul.expiry_date as problemTime,
ul.item_code itemCode,
ul.item_code_name itemName,
ul.sub_item_code subItemCode,
......@@ -2710,6 +2714,7 @@
tup.sequence_nbr problemSourceId,
tup.cert_no certNo,
tup.expiry_date expiryDate,
tup.expiry_date as problemTime,
tui.name userName,
tui.sequence_nbr userSeq
FROM
......
......@@ -78,34 +78,45 @@ public class SafetyProblemTopicMessage extends EmqxListener {
emqKeeper.subscript(buildTopic(problemEventTopic), 2, this);
executorService = Executors.newFixedThreadPool(threadNumber);
for (int i = 0; i < threadNumber; i++) {
final int threadIndex = i;
executorService.execute(() -> {
try {
log.info("启动消息处理线程-{}", threadIndex);
while (!Thread.currentThread().isInterrupted()) {
SafetyProblemEvent safetyProblemMessageEvent = blockingQueue.take();
try {
// 计算耗时开始
long start = System.currentTimeMillis();
log.info("接收到问题生产原始消息:{}", safetyProblemMessageEvent.getMessage());
JSONArray jsonObject = JSON.parseArray(safetyProblemMessageEvent.getMessage().toString());
log.info("接收到问题生产消息:{}", jsonObject);
SafetyProblemEventHandler eventHandler = SafetyProblemEventHandlerFactory.createProblemHandler(safetyProblemMessageEvent.getTopic());
eventHandler.handle(safetyProblemMessageEvent);
log.info("处理问题生产消息完成");
log.info("处理问题生产消息完成,耗时:{}", System.currentTimeMillis() - start);
} catch (JSONException | ClassCastException | IllegalArgumentException e) {
log.error("处理消息异常", e);
} catch (Exception e) {
log.error("处理消息时发生未预期异常", e);
}
}
} catch (InterruptedException e) {
log.error("处理线程被中断,准备退出", e);
log.warn("处理线程{}被中断,准备退出", threadIndex);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("处理线程{}发生异常,准备退出", threadIndex, e);
} finally {
log.info("消息处理线程-{}退出", threadIndex);
}
});
}
log.info("消息监听器初始化完成,线程数:{}", threadNumber);
}
@Override
public void processMessage(String topic, MqttMessage message) {
log.info("接收问题生产消息开始");
log.info("接收问题生产消息开始,当前队列大小:{}", blockingQueue.size());
blockingQueue.add(new SafetyProblemEvent(topic, message));
log.info("接收问题生产消息完成");
log.info("接收问题生产消息完成,添加后队列大小:{}", blockingQueue.size());
}
public static void generateProblem(JSONArray jsonArray, SafetyProblemTypeEnum problemTypeEnum, SafetyProblemTracingServiceImpl safetyProblemTracingService) {
......@@ -129,7 +140,7 @@ public class SafetyProblemTopicMessage extends EmqxListener {
safetyProblemTracing.setSourceType(SafetyProblemSourceTypeEnum.EQUIP.getName());
safetyProblemTracing.setSourceTypeCode(SafetyProblemSourceTypeEnum.EQUIP.getCode());
safetyProblemTracing.setSourceId(json.getOrDefault("RECORD", "").toString());
safetyProblemTracing.setProblemTime(new Date());
safetyProblemTracing.setProblemTime(getProblemTime(json));
safetyProblemTracing.setEquipSuperviseCode(json.getOrDefault("SUPERVISORY_CODE", "").toString());
safetyProblemTracing.setEquipList(json.getOrDefault("equList", "").toString());
safetyProblemTracing.setEquipListCode(json.getOrDefault("EQU_LIST", "").toString());
......@@ -157,6 +168,24 @@ public class SafetyProblemTopicMessage extends EmqxListener {
}
}
public static Date getProblemTime(JSONObject json) {
// 安全处理problemTime字段,支持时间戳转换
Object problemTimeObj = json.getOrDefault("problemTime", "");
Date problemTime = null;
if (problemTimeObj instanceof Long) {
problemTime = new Date((Long) problemTimeObj);
} else if (problemTimeObj instanceof String && !((String) problemTimeObj).isEmpty()) {
try {
problemTime = new Date(Long.parseLong((String) problemTimeObj));
} catch (NumberFormatException e) {
log.warn("无法解析problemTime: {}", problemTimeObj);
}
} else if (problemTimeObj instanceof Date) {
problemTime = (Date) problemTimeObj;
}
return problemTime;
}
public static void generateUnitProblem(JSONArray jsonArray, SafetyProblemTypeEnum problemTypeEnum, SafetyProblemTracingServiceImpl safetyProblemTracingService) {
generateProblem2(jsonArray, problemTypeEnum, safetyProblemTracingService);
}
......
......@@ -143,7 +143,7 @@ public class SafetyProblemTracingGenServiceImpl{
return;
}
List<String> records = mapList.stream().map(m -> m.get("RECORD").toString()).collect(Collectors.toList());
int batchSize = 10000;
int batchSize = 1000;
for (int i = 0; i < records.size(); i += batchSize) {
List<String> batch = records.subList(i, Math.min(i + batchSize, records.size()));
// 更新设备状态为未处理(异常)
......@@ -157,7 +157,9 @@ public class SafetyProblemTracingGenServiceImpl{
esEquipmentCategoryDto.add(equipmentCategoryDto);
}
esEquipmentCategory.saveAll(esEquipmentCategoryDto);
sendSafetyProblemMessage(mapList, safetyProblemTypeEnum);
// mapList也根据batch分割
List<Map<String, Object>> mapListBatch = mapList.subList(i, Math.min(i + batchSize, mapList.size()));
sendSafetyProblemMessage(mapListBatch, safetyProblemTypeEnum);
}
}
......@@ -198,6 +200,8 @@ public class SafetyProblemTracingGenServiceImpl{
if (CollectionUtil.isNotEmpty(mapList)){
try {
logger.info("发送安全追溯问题主题---->{}", safetyProblemTypeEnum.getTopic());
byte[] payload = JSON.toJSONBytes(mapList);
logger.info("发送数据大小:{} KB", payload.length / 1024);
emqKeeper.getMqttClient().publish(safetyProblemTypeEnum.getTopic(), JSON.toJSONBytes(mapList), 2, false);
logger.info("发送安全追溯问题消息成功---->");
} catch (MqttException e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment