Commit 4a939d48 authored by suhuiguang's avatar suhuiguang

feat(综合搜索):增量更新

1.、数据同步增加删除逻辑
parent f265cec7
...@@ -342,6 +342,12 @@ public class ESEquipmentInfo { ...@@ -342,6 +342,12 @@ public class ESEquipmentInfo {
private LocalDate INSPECT_DATE; private LocalDate INSPECT_DATE;
/** /**
* 单位类型-多个逗号分隔开-实时的单位类型
*/
@Field(type = FieldType.Keyword)
private String unitType;
/**
* 最新一条检验信息 * 最新一条检验信息
*/ */
@Field(type = FieldType.Nested) @Field(type = FieldType.Nested)
......
...@@ -2,7 +2,9 @@ package com.yeejoin.amos.boot.module.common.api.service; ...@@ -2,7 +2,9 @@ package com.yeejoin.amos.boot.module.common.api.service;
import com.yeejoin.amos.boot.module.common.api.entity.TzsDataRefreshMessage; import com.yeejoin.amos.boot.module.common.api.entity.TzsDataRefreshMessage;
import java.util.List;
public interface IDataRefreshDispatch { public interface IDataRefreshDispatch {
void doDispatch(TzsDataRefreshMessage message); void doDispatch(String dataType, List<TzsDataRefreshMessage> messages);
} }
...@@ -3,13 +3,15 @@ package com.yeejoin.amos.boot.module.common.biz.refresh; ...@@ -3,13 +3,15 @@ package com.yeejoin.amos.boot.module.common.biz.refresh;
import lombok.Getter; import lombok.Getter;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
import java.util.List;
@Getter @Getter
public class DataRefreshEvent extends ApplicationEvent { public class DataRefreshEvent extends ApplicationEvent {
/** /**
* 业务数据ID:设备record、企业的id、人员的id * 业务数据ID:设备record、企业的id、人员的id
*/ */
private final String dataId; private final List<String> dataIds;
/** /**
* 数据类型 * 数据类型
...@@ -27,9 +29,9 @@ public class DataRefreshEvent extends ApplicationEvent { ...@@ -27,9 +29,9 @@ public class DataRefreshEvent extends ApplicationEvent {
* @param source the object on which the event initially occurred or with * @param source the object on which the event initially occurred or with
* which the event is associated (never {@code null}) * which the event is associated (never {@code null})
*/ */
public DataRefreshEvent(Object source, String dataId, String dataType, Operation operation) { public DataRefreshEvent(Object source, List<String> dataIds, String dataType, Operation operation) {
super(source); super(source);
this.dataId = dataId; this.dataIds = dataIds;
this.dataType = dataType; this.dataType = dataType;
this.operation = operation; this.operation = operation;
} }
......
...@@ -13,6 +13,7 @@ import org.springframework.scheduling.annotation.Async; ...@@ -13,6 +13,7 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Date; import java.util.Date;
import java.util.List;
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
...@@ -24,15 +25,17 @@ public class DataRefreshDispatcher implements IDataRefreshDispatch { ...@@ -24,15 +25,17 @@ public class DataRefreshDispatcher implements IDataRefreshDispatch {
@Override @Override
@Async @Async
public void doDispatch(TzsDataRefreshMessage refreshMessage) { public void doDispatch(String dataType, List<TzsDataRefreshMessage> messages) {
try { IDataRefreshHandler dataRefreshHandler = refreshHandlerFactory.getRefreshHandler(dataType);
IDataRefreshHandler dataRefreshHandler = refreshHandlerFactory.getRefreshHandler(refreshMessage.getDataType()); messages.forEach(message -> {
dataRefreshHandler.doRefresh(refreshMessage); try {
markRefreshSuccess(refreshMessage); dataRefreshHandler.doRefresh(message);
} catch (Exception e) { markRefreshSuccess(message);
log.error("三库数据刷新执行失败,消息内容:{}", JSONObject.toJSONString(refreshMessage), e); } catch (Exception e) {
markRefreshFailure(refreshMessage); log.error("三库数据刷新执行失败,消息内容:{}", JSONObject.toJSONString(messages), e);
} markRefreshFailure(message);
}
});
} }
private void markRefreshSuccess(TzsDataRefreshMessage message) { private void markRefreshSuccess(TzsDataRefreshMessage message) {
......
...@@ -15,11 +15,13 @@ import org.springframework.stereotype.Component; ...@@ -15,11 +15,13 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.transaction.event.TransactionalEventListener;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
@Component @Component
...@@ -65,24 +67,22 @@ public class DataRefreshListener { ...@@ -65,24 +67,22 @@ public class DataRefreshListener {
private void processEvent(DataRefreshEvent event) { private void processEvent(DataRefreshEvent event) {
TzsDataRefreshMessage message = new TzsDataRefreshMessage(); // 1.记录 message
try { List<TzsDataRefreshMessage> messages = createMsg(event);
// 1.记录 message // 2.调用更新处理
createMsg(event, message); dataRefreshService.ifPresent(service -> service.doDispatch(event.getDataType(), messages));
// 2.调用更新处理
dataRefreshService.ifPresent(service -> service.doDispatch(message));
} catch (Exception e) {
message.setStatus(Constants.REFRESH_STATUS_FAILURE); // 标记为失败
message.setErrorMsg(e.getMessage());
tzsDataRefreshMessageService.saveOrUpdate(message);
}
} }
private void createMsg(DataRefreshEvent event, TzsDataRefreshMessage message) { private List<TzsDataRefreshMessage> createMsg(DataRefreshEvent event) {
message.setDataId(event.getDataId()); List<TzsDataRefreshMessage> messages = event.getDataIds().stream().map(dataId -> {
message.setDataType(event.getDataType()); TzsDataRefreshMessage message = new TzsDataRefreshMessage();
message.setOperation(event.getOperation().name()); message.setDataId(dataId);
message.setStatus(Constants.REFRESH_STATUS_DEALING); // 流程中 message.setDataType(event.getDataType());
tzsDataRefreshMessageService.save(message); message.setOperation(event.getOperation().name());
message.setStatus(Constants.REFRESH_STATUS_DEALING); // 流程中
return message;
}).collect(Collectors.toList());
tzsDataRefreshMessageService.saveBatch(messages);
return messages;
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment