Commit 518c38d2 authored by suhuiguang's avatar suhuiguang

feat(综合搜索):增量更新

1.人员、企业、设备增量更新公共代码
parent 835f83a5
package com.yeejoin.amos.boot.module.common.api.service;
import com.yeejoin.amos.boot.module.common.api.dto.TzsDataRefreshMessageDto;
public interface IDataRefreshService {
void refresh(TzsDataRefreshMessageDto message);
}
package com.yeejoin.amos.boot.module.common.biz.change;
import org.springframework.context.ApplicationEvent;
public class DataChangeEvent extends ApplicationEvent {
/**
* Create a new {@code ApplicationEvent}.
*
* @param source the object on which the event initially occurred or with
* which the event is associated (never {@code null})
*/
public DataChangeEvent(Object source) {
super(source);
}
}
package com.yeejoin.amos.boot.module.tcm.biz.refresh;
import com.yeejoin.amos.boot.module.common.api.dto.TzsDataRefreshMessageDto;
import com.yeejoin.amos.boot.module.common.api.service.IDataRefreshDispatch;
import com.yeejoin.amos.boot.module.common.biz.refresh.factory.RefreshHandlerFactory;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class TcmRefreshDispatch implements IDataRefreshDispatch {
private final RefreshHandlerFactory handlerFactory;
@Override
public void doDispatch(TzsDataRefreshMessageDto message) {
}
}
package com.yeejoin.amos.boot.module.common.biz.refresh;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.module.common.api.dto.TzsDataRefreshMessageDto;
import com.yeejoin.amos.boot.module.common.api.entity.TzsDataRefreshMessage;
import com.yeejoin.amos.boot.module.common.api.service.IDataRefreshDispatch;
import com.yeejoin.amos.boot.module.common.biz.service.impl.TzsDataRefreshMessageServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener;
import javax.annotation.PostConstruct;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;
@Component
@Slf4j
@RequiredArgsConstructor
public class DataRefreshListener {
@Value("${data.opEvent.deal.thread.number:1}")
private int threadNumber;
private final BlockingQueue<DataRefreshEvent> queue = new LinkedBlockingQueue<>();
private ExecutorService executorService;
@Autowired
private Optional<IDataRefreshDispatch> dataRefreshService;
private final TzsDataRefreshMessageServiceImpl tzsDataRefreshMessageService;
@TransactionalEventListener(value = DataRefreshEvent.class)
@Async
public void onEquipCreateOrEdit(DataRefreshEvent event) {
log.info("收到变更消息:{}", JSONObject.toJSONString(event));
queue.add(event);
}
@PostConstruct
public void init() {
executorService = Executors.newFixedThreadPool(threadNumber);
IntStream.range(0, threadNumber).forEach(i -> {
executorService.execute(() -> {
while (true) {
try {
DataRefreshEvent event = queue.take();
processEvent(event);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
});
});
}
private void processEvent(DataRefreshEvent event) {
TzsDataRefreshMessage message = new TzsDataRefreshMessage();
try {
// 1.记录 message
message.setDataId(event.getDataId());
message.setDataType(event.getDataType());
message.setOperation(event.getOperation().name());
message.setStatus(0);
tzsDataRefreshMessageService.save(message);
TzsDataRefreshMessageDto dataRefreshMessageDto = BeanUtil.copyProperties(message, TzsDataRefreshMessageDto.class);
// 2.调用更新处理
dataRefreshService.ifPresent(service -> service.refresh(dataRefreshMessageDto));
} catch (Exception e) {
message.setStatus(2); // 标记为失败
message.setErrorMsg(e.getMessage());
tzsDataRefreshMessageService.saveOrUpdate(message);
}
}
}
package com.yeejoin.amos.boot.module.tcm.biz.refresh.handler;
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yeejoin.amos.boot.biz.common.entity.BaseEntity;
import com.yeejoin.amos.boot.module.common.api.dao.EsBaseEnterpriseInfoDao;
import com.yeejoin.amos.boot.module.common.api.dao.EsUserInfoDao;
import com.yeejoin.amos.boot.module.common.api.entity.EsBaseEnterpriseInfo;
import com.yeejoin.amos.boot.module.common.api.entity.EsUserInfo;
import com.yeejoin.amos.boot.module.common.api.entity.TzsDataRefreshMessage;
import com.yeejoin.amos.boot.module.common.api.service.IDataRefreshHandler;
import com.yeejoin.amos.boot.module.common.biz.utils.RefreshDataUtils;
import com.yeejoin.amos.boot.module.tcm.api.entity.TzBaseEnterpriseInfo;
import com.yeejoin.amos.boot.module.tcm.api.entity.TzBaseUnitLicence;
import com.yeejoin.amos.boot.module.tcm.api.entity.TzsUserInfo;
import com.yeejoin.amos.boot.module.tcm.api.mapper.TzBaseEnterpriseInfoMapper;
import com.yeejoin.amos.boot.module.tcm.api.mapper.TzBaseUnitLicenceMapper;
import com.yeejoin.amos.boot.module.tcm.api.mapper.TzsUserInfoMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
@Component
@RequiredArgsConstructor
public class CompanyRefreshHandler implements IDataRefreshHandler {
private final EsBaseEnterpriseInfoDao enterpriseInfoDao;
private final TzBaseUnitLicenceMapper licenceMapper;
private final TzBaseEnterpriseInfoMapper tzBaseEnterpriseInfoMapper;
private final TzsUserInfoMapper userInfoMapper;
private final EsUserInfoDao esUserInfoDao;
@Override
public String supportType() {
return "company";
}
@Override
public void doRefresh(TzsDataRefreshMessage message) {
// 企业信息更新
TzBaseEnterpriseInfo enterpriseInfo = tzBaseEnterpriseInfoMapper.selectById(message.getDataId());
EsBaseEnterpriseInfo esBaseEnterpriseInfo = new EsBaseEnterpriseInfo();
BeanUtil.copyProperties(enterpriseInfo, esBaseEnterpriseInfo);
esBaseEnterpriseInfo.setSequenceNbr(enterpriseInfo.getSequenceNbr() + "");
esBaseEnterpriseInfo.setEquipCategory(RefreshDataUtils.castStrList2String(enterpriseInfo.getEquipCategory()));
List<TzBaseUnitLicence> unitLicences = licenceMapper.selectList(new LambdaQueryWrapper<TzBaseUnitLicence>().eq(TzBaseUnitLicence::getUnitCode, enterpriseInfo.getUseUnitCode()).eq(TzBaseUnitLicence::getIsDelete, false));
List<EsBaseEnterpriseInfo.License> licenses = unitLicences.stream().map(lis -> {
EsBaseEnterpriseInfo.License esLicense = new EsBaseEnterpriseInfo.License();
BeanUtil.copyProperties(lis, esLicense);
return esLicense;
}).collect(Collectors.toList());
esBaseEnterpriseInfo.setLicenses(licenses);
enterpriseInfoDao.save(esBaseEnterpriseInfo);
// 人员更新冗余的单位信息
List<TzsUserInfo> userOfOneUnit = userInfoMapper.selectList(new LambdaQueryWrapper<TzsUserInfo>().eq(TzsUserInfo::getUnitCode, enterpriseInfo.getUseUnitCode()).eq(BaseEntity::getIsDelete, false).select(BaseEntity::getSequenceNbr));
Iterable<EsUserInfo> userInfosEs = esUserInfoDao.findAllById(userOfOneUnit.stream().map(u -> String.valueOf(u.getSequenceNbr())).collect(Collectors.toList()));
userInfosEs.forEach(esUserInfo -> {
esUserInfo.setUnitType(esBaseEnterpriseInfo.getUnitType());
esUserInfo.setSuperviseOrgName(esBaseEnterpriseInfo.getSuperviseOrgName());
esUserInfo.setSuperviseOrgCode(esBaseEnterpriseInfo.getSuperviseOrgCode());
});
if (userInfosEs.iterator().hasNext()) {
esUserInfoDao.saveAll(userInfosEs);
}
}
}
package com.yeejoin.amos.boot.module.tcm.biz.refresh.handler;
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yeejoin.amos.boot.biz.common.entity.BaseEntity;
import com.yeejoin.amos.boot.module.common.api.dao.EsBaseEnterpriseInfoDao;
import com.yeejoin.amos.boot.module.common.api.dao.EsUserInfoDao;
import com.yeejoin.amos.boot.module.common.api.entity.EsBaseEnterpriseInfo;
import com.yeejoin.amos.boot.module.common.api.entity.EsUserInfo;
import com.yeejoin.amos.boot.module.common.api.entity.TzsDataRefreshMessage;
import com.yeejoin.amos.boot.module.common.api.service.IDataRefreshHandler;
import com.yeejoin.amos.boot.module.common.biz.utils.RefreshDataUtils;
import com.yeejoin.amos.boot.module.tcm.api.entity.TzBaseEnterpriseInfo;
import com.yeejoin.amos.boot.module.tcm.api.entity.TzBaseUnitLicence;
import com.yeejoin.amos.boot.module.tcm.api.entity.TzsUserInfo;
import com.yeejoin.amos.boot.module.tcm.api.mapper.TzBaseEnterpriseInfoMapper;
import com.yeejoin.amos.boot.module.tcm.api.mapper.TzBaseUnitLicenceMapper;
import com.yeejoin.amos.boot.module.tcm.api.mapper.TzsUserInfoMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.stream.Collectors;
@Component
@RequiredArgsConstructor
@Slf4j
public class CompanyRefreshHandler implements IDataRefreshHandler {
private final EsBaseEnterpriseInfoDao enterpriseInfoDao;
private final TzBaseUnitLicenceMapper licenceMapper;
private final TzBaseEnterpriseInfoMapper tzBaseEnterpriseInfoMapper;
private final TzsUserInfoMapper userInfoMapper;
private final EsUserInfoDao esUserInfoDao;
@Override
public String supportType() {
return "company";
}
@Override
public void doRefresh(TzsDataRefreshMessage message) {
log.info("3库数据,企业开始刷库:唯一标识:{}", message.getDataId());
// 企业信息更新
TzBaseEnterpriseInfo enterpriseInfo = tzBaseEnterpriseInfoMapper.selectById(message.getDataId());
EsBaseEnterpriseInfo esBaseEnterpriseInfo = new EsBaseEnterpriseInfo();
BeanUtil.copyProperties(enterpriseInfo, esBaseEnterpriseInfo);
esBaseEnterpriseInfo.setSequenceNbr(enterpriseInfo.getSequenceNbr() + "");
esBaseEnterpriseInfo.setEquipCategory(RefreshDataUtils.castStrList2String(enterpriseInfo.getEquipCategory()));
List<TzBaseUnitLicence> unitLicences = licenceMapper.selectList(new LambdaQueryWrapper<TzBaseUnitLicence>().eq(TzBaseUnitLicence::getUnitCode, enterpriseInfo.getUseUnitCode()).eq(TzBaseUnitLicence::getIsDelete, false));
List<EsBaseEnterpriseInfo.License> licenses = unitLicences.stream().map(lis -> {
EsBaseEnterpriseInfo.License esLicense = new EsBaseEnterpriseInfo.License();
BeanUtil.copyProperties(lis, esLicense);
return esLicense;
}).collect(Collectors.toList());
esBaseEnterpriseInfo.setLicenses(licenses);
enterpriseInfoDao.save(esBaseEnterpriseInfo);
// 人员更新冗余的单位信息
List<TzsUserInfo> userOfOneUnit = userInfoMapper.selectList(new LambdaQueryWrapper<TzsUserInfo>().eq(TzsUserInfo::getUnitCode, enterpriseInfo.getUseUnitCode()).eq(BaseEntity::getIsDelete, false).select(BaseEntity::getSequenceNbr));
Iterable<EsUserInfo> userInfosEs = esUserInfoDao.findAllById(userOfOneUnit.stream().map(u -> String.valueOf(u.getSequenceNbr())).collect(Collectors.toList()));
userInfosEs.forEach(esUserInfo -> {
esUserInfo.setUnitType(esBaseEnterpriseInfo.getUnitType());
esUserInfo.setSuperviseOrgName(esBaseEnterpriseInfo.getSuperviseOrgName());
esUserInfo.setSuperviseOrgCode(esBaseEnterpriseInfo.getSuperviseOrgCode());
});
if (userInfosEs.iterator().hasNext()) {
esUserInfoDao.saveAll(userInfosEs);
}
}
}
package com.yeejoin.amos.boot.module.tcm.biz.refresh;
import com.yeejoin.amos.boot.module.common.api.dto.TzsDataRefreshMessageDto;
import com.yeejoin.amos.boot.module.common.api.service.IDataRefreshHandler;
import org.springframework.stereotype.Component;
@Component
public class CompanyRefreshHandler implements IDataRefreshHandler {
@Override
public String supportType() {
return "company";
}
@Override
public void doRefresh(TzsDataRefreshMessageDto message) {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment