Commit befd8474 authored by tianbo's avatar tianbo

refactor(jg): 西安导入数据修改

parent abd07747
......@@ -28,7 +28,7 @@ public class XiAnDataDockController {
/**
* 西安除电梯外七大类设备批量导入
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@PostMapping(value = "/importData")
@ApiOperation(httpMethod = "POST", value = "西安除电梯外七大类设备批量导入", notes = "西安除电梯外七大类设备批量导入")
public Object importPressureData(@RequestParam MultipartFile file) {
......@@ -53,7 +53,7 @@ public class XiAnDataDockController {
/**
* 西安电梯历史设备以及业务数据批量导入
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@TycloudOperation(ApiLevel = UserType.AGENCY, needAuth = false)
@PostMapping(value = "/importElevatorData")
@ApiOperation(httpMethod = "POST", value = "西安电梯历史设备以及业务数据批量导入", notes = "西安电梯历史设备以及业务数据批量导入")
public Object importElevatorData(@RequestParam MultipartFile file, @RequestParam(required = false, defaultValue = "true") Boolean isRegistration) {
......
package com.yeejoin.amos.api.openapi.face.orm.dao;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yeejoin.amos.api.openapi.face.orm.entity.OpenapiBizToken;
import org.apache.ibatis.annotations.Param;
......@@ -32,6 +33,7 @@ public interface OpenapiBizTokenMapper extends BaseMapper<OpenapiBizToken> {
@Select("select DEVELOPER_AGENCY from iot_openapi_biz_token where DEVELOPER_AGENCY is not null AND DATA_TYPE = 'cyl' group by DEVELOPER_AGENCY")
public List<String> getServiceList();
@Select("select use_unit_code, use_unit from amos_tzs_biz.tz_base_enterprise_info where app_id = '${appId}'")
@Select("select use_unit_code, use_unit from amos_tzs_biz.tz_base_enterprise_info where app_id = #{appId}")
@DS("tzs")
Map<String, String> queryUnitInfoByAppId(@Param("appId") String appId);
}
\ No newline at end of file
......@@ -66,6 +66,8 @@ public class OpenapiBizTokenService extends BaseService<OpenapiBizTokenModel, Op
idPasswordAuthModel.setPassword(DesUtil.encode(systemUserInfo.getPassword(), Constant.PASSWORD_ENCODE_KEY));
BizTokenModel bizTokenModel = new BizTokenModel();
bizTokenModel.setAppId(appId);
bizTokenModel.setProduct(appId);
RequestContext.setProduct(appId);
RequestContext.setAppKey(bizTokenModel.getAppKey());
FeignClientResult<HashMap<String, Object>> responseModel = Privilege.authClient.idpassword(idPasswordAuthModel);
......@@ -73,9 +75,6 @@ public class OpenapiBizTokenService extends BaseService<OpenapiBizTokenModel, Op
String token = (String) authModel.get("token");
bizTokenModel.setToken(token);
RequestContext.setToken(token);
// AgencyUserModel user = Privilege.agencyUserClient.getme().getResult();
// bizTokenModel.setApiCompanyCode(user.getCompanys().get(0).getCompanyCode()); // 当前一个人只能有一个单位
// bizTokenModel.setApiCompanyName(user.getCompanys().get(0).getCompanyName()); // 当前一个人只能有一个单位
// 调整为从业务表根据appId查询单位信息
bizTokenModel.setApiCompanyCode(unitCode);
bizTokenModel.setApiCompanyName(unitName);
......
......@@ -783,7 +783,7 @@ public class XiAnDataDockServiceImpl {
Map<String, Object> rMap = new HashMap<>();
// 主线程中获取登录信息传递到异步线程中
RequestContextWrapper contextWrapper = RequestContextWrapper.capture();
Set<String> importResult = ConcurrentHashMap.newKeySet(); // 使用线程安全的Set
Set<String> errorResult = ConcurrentHashMap.newKeySet(); // 使用线程安全的Set
// 分批处理,但限制同时运行的批次数
int maxConcurrentBatches = 5; // 最大并发批次数
Semaphore semaphore = new Semaphore(maxConcurrentBatches);
......@@ -812,7 +812,7 @@ public class XiAnDataDockServiceImpl {
Collection<?> data = (Collection<?>) resultMap.get("result");
data.stream()
.filter(item -> item instanceof String)
.forEach(item -> importResult.add((String) item));
.forEach(item -> errorResult.add((String) item));
}
if (resultMap.containsKey("status") && Integer.parseInt(resultMap.get("status").toString()) != 200) {
throw new RuntimeException(resultMap.get("message").toString());
......@@ -828,9 +828,9 @@ public class XiAnDataDockServiceImpl {
}
// 等待所有异步任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
if (!ValidationUtil.isEmpty(importResult)) {
rMap.put("message", importResult);
log.info("更新失败:{}", importResult);
if (!ValidationUtil.isEmpty(errorResult)) {
rMap.put("message", errorResult);
log.info("更新失败:{}", errorResult);
return rMap;
}
log.info("设备保存成功");
......
......@@ -19,7 +19,7 @@ import java.util.List;
* @date 2022-03-04
*/
@Service
@DS("cyl")
@DS("openapi")
public class CylinderSyncServiceImpl {
@Autowired
private CylCylinderFillingCheckMapper fillingCheckMapper;
......
......@@ -16,7 +16,6 @@ import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.yeejoin.amos.boot.biz.common.bo.CompanyBo;
......@@ -38,6 +37,7 @@ import com.yeejoin.amos.boot.module.jg.api.dto.*;
import com.yeejoin.amos.boot.module.jg.api.entity.*;
import com.yeejoin.amos.boot.module.jg.api.enums.BusinessTypeEnum;
import com.yeejoin.amos.boot.module.jg.api.enums.CertificateStatusEnum;
import com.yeejoin.amos.boot.module.jg.api.enums.SafetyProblemStatusEnum;
import com.yeejoin.amos.boot.module.jg.api.mapper.CommonMapper;
import com.yeejoin.amos.boot.module.jg.api.mapper.JgUseRegistrationEqMapper;
import com.yeejoin.amos.boot.module.jg.api.mapper.JgUseRegistrationManageMapper;
......@@ -176,6 +176,8 @@ public class DataDockServiceImpl {
private final PieLineDataChangeServiceImpl pieLineDataChangeService;
private final ElevatorDataTransactionService elevatorDataTransactionService;
@Value("${equip.detail.path:/mixuap?appId=1742358052905971713&id=1734100233714954241&formType=detail&record=%s&DATA_SOURCE=%s}")
private String equipRoutePath;
......@@ -825,6 +827,7 @@ public class DataDockServiceImpl {
equipmentCategoryDto.setSTATUS("已认领");
equipmentCategoryDto.setIS_DO_BUSINESS(Boolean.TRUE);
equipmentCategoryDto.setIS_COMPLETE_XA(Objects.equals("0", isCompleteXa));
equipmentCategoryDto.setProblemStatus(SafetyProblemStatusEnum.HANDLED.getCode());
esEquipmentCategory.save(equipmentCategoryDto);
}
}
......@@ -2653,39 +2656,114 @@ public class DataDockServiceImpl {
public Set<String> saveElevatorData(List<Map<String, Object>> equLists) {
RequestContextWrapper contextWrapper = RequestContextWrapper.capture();
Set<String> recordSet = Sets.newConcurrentHashSet();
Set<String> inUseRecordSet = Sets.newConcurrentHashSet();
try {
CompletableFuture.allOf(
equLists.parallelStream().map(equ -> CompletableFuture.runAsync(() -> {
Set<String> resultSet = Sets.newConcurrentHashSet();
// 分批处理大小
int batchSize = 100;
int totalSize = equLists.size();
int successCount = 0;
int failedCount = 0;
// 分批处理数据
for (int i = 0; i < totalSize; i += batchSize) {
int endIndex = Math.min(i + batchSize, totalSize);
List<Map<String, Object>> batch = equLists.subList(i, endIndex);
// 批内并行处理(但每个数据项独立事务)
List<CompletableFuture<Map<String, Object>>> futures = batch.stream()
.map(equ -> CompletableFuture.supplyAsync(() -> {
contextWrapper.apply();
try {
Object resultObj = saveElevatorDataInTransaction(equ, "jg_his_xa", null);
if (resultObj instanceof String) {
recordSet.add(resultObj.toString());
}
if (resultObj instanceof Map) {
Map<String, Object> result = (Map<String, Object>) resultObj;
inUseRecordSet.add(toJSONString(result));
}
} catch (LocalBadRequest e) {
inUseRecordSet.add(e.getMessage());
Object resultObj = elevatorDataTransactionService.saveSingleElevatorData(equ);
Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("data", resultObj);
result.put("equ", equ);
return result;
} catch (Exception e) {
Map<String, Object> result = new HashMap<>();
result.put("success", false);
result.put("exception", e);
result.put("equ", equ);
return result;
}
}, executorService)).toArray(CompletableFuture[]::new)
).join();
} catch (Exception e) {
throw new RuntimeException(e);
}
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
eventPublisher.publish(new EquipCreateOrEditEvent(this, BusinessTypeEnum.JG_NEW_EQUIP.name(), recordSet, EquipCreateOrEditEvent.EquipType.equip));
}, executorService))
.collect(Collectors.toList());
// 等待批内所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 处理批内结果
for (CompletableFuture<Map<String, Object>> future : futures) {
try {
Map<String, Object> result = future.get();
if ((Boolean) result.get("success")) {
Object resultObj = result.get("data");
if (resultObj instanceof String) {
recordSet.add(resultObj.toString());
successCount++;
}
if (resultObj instanceof Map) {
resultSet.add(JSONObject.toJSONString(resultObj));
failedCount++;
}
} else {
Exception e = (Exception) result.get("exception");
Map<String, Object> equ = (Map<String, Object>) result.get("equ");
handleProcessException(e, equ, resultSet);
failedCount++;
}
} catch (Exception e) {
log.error("获取处理结果时发生异常", e);
failedCount++;
}
}
});
log.info("未更新设备record:{}", inUseRecordSet);
return inUseRecordSet;
log.info("已处理数据 {}/{} - 成功: {}, 失败: {}",
Math.min(i + batchSize, totalSize), totalSize, successCount, failedCount);
}
// 发布成功记录的事件
if (!recordSet.isEmpty()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
eventPublisher.publish(new EquipCreateOrEditEvent(this, BusinessTypeEnum.JG_NEW_EQUIP.name(), recordSet, EquipCreateOrEditEvent.EquipType.equip));
}
});
}
log.info("数据导入完成 - 总计: {}条, 成功: {}条, 失败: {}条",
totalSize, successCount, failedCount);
log.info("未更新设备record:{}", resultSet);
return resultSet;
}
private void handleProcessException(Exception e, Map<String, Object> equ, Set<String> inUseRecordSet) {
try {
String traceInfo = Arrays.stream(e.getStackTrace())
.filter(x -> x.getClassName().contains("com.yeejoin"))
.map(x -> x.getClassName() + "." + x.getMethodName() + ":" + x.getLineNumber())
.collect(Collectors.toList())
.toString();
String record = Optional.ofNullable(equ.get("record"))
.map(String::valueOf)
.orElse(UUID.randomUUID().toString());
// esEquipmentCategory.deleteById(record);
ExcelImportErrorLogDto errorLogDto = JSON.parseObject(toJSONString(equ), ExcelImportErrorLogDto.class);
errorLogDto.setErrorInfo(e.getMessage());
errorLogDto.setTraceInfo(traceInfo);
errorLogDto.setCreateTime(new Date());
excelImportErrorLogDao.save(errorLogDto);
inUseRecordSet.add(JSONObject.toJSONString(errorLogDto));
log.error("处理数据时发生异常,record: {}", record, e);
} catch (Exception handleException) {
log.error("处理异常信息时发生错误", handleException);
}
}
@GlobalTransactional(rollbackFor = Exception.class)
public Object saveElevatorDataInTransaction(Map<String, Object> equ, String dataSource, String remark) {
Map<String, String> errorResult = Maps.newConcurrentMap();
String record = Optional.ofNullable(equ.get("record")).map(String::valueOf).orElse(UUID.randomUUID().toString());
......@@ -2696,83 +2774,61 @@ public class DataDockServiceImpl {
String useRegistrationCode = Optional.ofNullable(equ.get("useOrgCode")).map(String::valueOf).orElse("").trim();
dataSource = !ValidationUtil.isEmpty(equ.get("dataSource")) ? MapUtil.getStr(equ, "dataSource") : dataSource;
try {
if (!equList.isEmpty()) {
// 判断设备是否已经做过除历史登记外的其他业务,如果做了其他业务则不能更新并记录反馈
Integer useCount = commonMapper.countEquipInUseTimesForXaElevator(record);
if (useCount > 0) {
errorResult.put("id", record);
errorResult.put("type", "inUse");
errorResult.put("msg", inUseError);
return errorResult;
}
// 判断是否做过历史登记
boolean isRegistered = true;
Map<String, Object> useRegistrationMap = jgUseRegistrationServiceImpl.getJgUseRegistrationMapper().getUseRegistrationDetail(record);
if (ValidationUtil.isEmpty(useRegistrationMap) || ValidationUtil.isEmpty(useRegistrationMap.get("UseRegistratSequenceNbr"))) {
isRegistered = false;
}
IdxBizJgRegisterInfo oldRegisterInfo = idxBizJgRegisterInfoService.getOne(new QueryWrapper<IdxBizJgRegisterInfo>().eq("record", record));
IdxBizJgUseInfo useInfo = this.saveUseInfo(equ, record, dataSource, remark, null);
this.saveDesignInfo(equ, record);
this.saveFactoryInfo(equ, record);
IdxBizJgRegisterInfo registerInfo = this.saveRegisterInfo(equ, record, equCategory, null);
// 西安导入电梯属地监管部门处理
this.handleSupervisionOffice(equ);
this.saveSupervisionInfo(equ, record);
IdxBizJgOtherInfo otherInfo = this.saveOtherInfo(equ, record, equList);
this.saveInspectInfo(equ, record);
this.saveTechParams(equ, record, equList);
this.saveEquInfoToEs(record, isCompleteXa);
this.saveInstallInfo(equ);//安装信息
this.historyEquUpdateMaintenanceInfo(equ);//维保信息
// isRegistered=false是未做过登记设备,且dataSource=jg_his,且有使用登记证号则做历史登记
if (!ValidationUtil.isEmpty(useRegistrationCode) && !isRegistered && "jg_his_xa".equals(dataSource)) {
this.handleHistoryEquip(equ);//历史登记
}
// isRegistered=true表示已做过历史平台登记,则需更新历史单据等信息。
if (isRegistered) {
this.updateHistoryInfo(equ, registerInfo, otherInfo, oldRegisterInfo, useInfo);
}
// 西安未做登记的新设备修改纳管状态为已纳管
if ("jg_xa".equals(dataSource)) {
this.updateUseManagementStatus(useInfo);
this.saveNewEquipResumeInfo(useInfo);
}
} else if (!businessId.isEmpty()) {
this.saveResumeInfo(equ);
equ.put("record", equ.get("resumeSeq"));
equ.put("xaSerial", equ.get("resumeSeq"));
} else {
String orgBranchName = (String) Optional.ofNullable(equ.get("supervisionOffice"))
.orElseGet(() -> equ.get("supervisionDepartment"));
String orgBranchCode = (String) Optional.ofNullable(equ.get("supervisionOfficeCode"))
.orElseGet(() -> equ.get("supervisionDepartmentCode"));
this.updateOrgBranchInfo(record, orgBranchName, orgBranchCode);
}
return record;
} catch (Exception e) {
String traceInfo = Arrays.stream(e.getStackTrace()).filter(x-> x.getClassName().contains("com.yeejoin")).map(x -> x.getClassName() + "."+ x.getMethodName() + ":" + x.getLineNumber()).collect(Collectors.toList()).toString();
esEquipmentCategory.deleteById(record);
// superviseInfoMapper.deleteDataAll(Collections.singletonList(record));
ExcelImportErrorLogDto errorLogDto = JSON.parseObject(toJSONString(equ), ExcelImportErrorLogDto.class);
errorLogDto.setErrorInfo(e.getMessage());
errorLogDto.setTraceInfo(traceInfo);
errorLogDto.setCreateTime(new Date());
excelImportErrorLogDao.save(errorLogDto);
String errorMessage = e.getMessage();
if (errorMessage == null) {
errorMessage = "Unknown error occurred: " + e.getClass().getName();
}
log.error("{}数据:保存时出现异常,对应数据:{}", dataSource, toJSONString(equ), e);
errorResult.put("id", record);
errorResult.put("type", "error");
errorResult.put("msg", errorMessage);
errorResult.put("traceInfo", traceInfo);
throw new LocalBadRequest(toJSONString(errorResult));
if (!equList.isEmpty()) {
// 判断设备是否已经做过除历史登记外的其他业务,如果做了其他业务则不能更新并记录反馈
Integer useCount = commonMapper.countEquipInUseTimesForXaElevator(record);
if (useCount > 0) {
errorResult.put("id", record);
errorResult.put("type", "inUse");
errorResult.put("msg", inUseError);
return errorResult;
}
// 判断是否做过历史登记
boolean isRegistered = true;
Map<String, Object> useRegistrationMap = jgUseRegistrationServiceImpl.getJgUseRegistrationMapper().getUseRegistrationDetail(record);
if (ValidationUtil.isEmpty(useRegistrationMap) || ValidationUtil.isEmpty(useRegistrationMap.get("UseRegistratSequenceNbr"))) {
isRegistered = false;
}
IdxBizJgRegisterInfo oldRegisterInfo = idxBizJgRegisterInfoService.getOne(new QueryWrapper<IdxBizJgRegisterInfo>().eq("record", record));
IdxBizJgUseInfo useInfo = this.saveUseInfo(equ, record, dataSource, remark, null);
this.saveDesignInfo(equ, record);
this.saveFactoryInfo(equ, record);
IdxBizJgRegisterInfo registerInfo = this.saveRegisterInfo(equ, record, equCategory, null);
// 西安导入电梯属地监管部门处理
this.handleSupervisionOffice(equ);
this.saveSupervisionInfo(equ, record);
IdxBizJgOtherInfo otherInfo = this.saveOtherInfo(equ, record, equList);
this.saveInspectInfo(equ, record);
this.saveTechParams(equ, record, equList);
this.saveEquInfoToEs(record, isCompleteXa);
this.saveInstallInfo(equ);//安装信息
this.historyEquUpdateMaintenanceInfo(equ);//维保信息
// isRegistered=false是未做过登记设备,且dataSource=jg_his,且有使用登记证号则做历史登记
if (!ValidationUtil.isEmpty(useRegistrationCode) && !isRegistered && "jg_his_xa".equals(dataSource)) {
this.handleHistoryEquip(equ);//历史登记
}
// isRegistered=true表示已做过历史平台登记,则需更新历史单据等信息。
if (isRegistered) {
this.updateHistoryInfo(equ, registerInfo, otherInfo, oldRegisterInfo, useInfo);
}
// 西安未做登记的新设备修改纳管状态为已纳管
if ("jg_xa".equals(dataSource)) {
this.updateUseManagementStatus(useInfo);
this.saveNewEquipResumeInfo(useInfo);
}
} else if (!businessId.isEmpty()) {
this.saveResumeInfo(equ);
equ.put("record", equ.get("resumeSeq"));
equ.put("xaSerial", equ.get("resumeSeq"));
} else {
String orgBranchName = (String) Optional.ofNullable(equ.get("supervisionOffice"))
.orElseGet(() -> equ.get("supervisionDepartment"));
String orgBranchCode = (String) Optional.ofNullable(equ.get("supervisionOfficeCode"))
.orElseGet(() -> equ.get("supervisionDepartmentCode"));
this.updateOrgBranchInfo(record, orgBranchName, orgBranchCode);
}
return record;
}
private void saveNewEquipResumeInfo(IdxBizJgUseInfo useInfo) {
......
package com.yeejoin.amos.boot.module.jg.biz.service.impl;
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Map;
@Service
public class ElevatorDataTransactionService {
private final DataDockServiceImpl dataDockService;
@Autowired
public ElevatorDataTransactionService(@Lazy DataDockServiceImpl dataDockService) {
this.dataDockService = dataDockService;
}
@Transactional(rollbackFor = Exception.class)
@GlobalTransactional(rollbackFor = Exception.class)
public Object saveSingleElevatorData(Map<String, Object> equ) {
return dataDockService.saveElevatorDataInTransaction(equ, "jg_his_xa", null);
}
}
......@@ -652,7 +652,7 @@ public class JgChangeRegistrationTransferServiceImpl extends BaseService<JgChang
*/
@Override
@ResultFieldMapping({
@ResultFieldMapping.ResultFieldMap(sourceField = "changeRegistrationTransfer.transferUseInfo",
@ResultFieldMapping.ResultFieldMap(sourceField = "changeRegistrationTransfer.estateUnitCode",
targetField = "changeRegistrationTransfer.estateUnitSeq",
serviceClass = CommonServiceImpl.class,
queryMethod = "queryTcmUnitSeqByCreditCode",
......
......@@ -11,7 +11,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.typroject.tyboot.core.rdbms.annotation.Condition;
import org.typroject.tyboot.core.rdbms.annotation.Operator;
@DS("cyl")
@DS("openapi")
@Service
public class CylinderOpenApiServiceImpl {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment