Commit 607f7dc9 authored by suhuiguang's avatar suhuiguang

优化版本2 并行查询

parent ea9231fd
...@@ -233,6 +233,22 @@ public class CheckController extends AbstractBaseController { ...@@ -233,6 +233,22 @@ public class CheckController extends AbstractBaseController {
String userId = RequestContext.getExeUserId(); String userId = RequestContext.getExeUserId();
try { try {
UserTransmittableUtils.setUser(); UserTransmittableUtils.setUser();
planTaskService.handleAllBatch2(ids, userId);
} catch (Exception e) {
return ResponseHelperUtil.buildErrorResponse("提交失败:"+e.getMessage());
} finally {
UserTransmittableUtils.clear();
}
return ResponseHelper.buildResponse("提交成功");
}
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(value = "一键提交巡检任务", notes = "一键提交巡检任务<font color='blue'>手机app</font>")
@RequestMapping(value = "/saveRecordAll1", produces = "application/json;charset=UTF-8", method = RequestMethod.POST)
public ResponseModel<Object> saveCheckRecordAll1(@RequestParam String ids) {
String userId = RequestContext.getExeUserId();
try {
UserTransmittableUtils.setUser();
planTaskService.handleAllBatch(ids, userId); planTaskService.handleAllBatch(ids, userId);
} catch (Exception e) { } catch (Exception e) {
return ResponseHelperUtil.buildErrorResponse("提交失败:"+e.getMessage()); return ResponseHelperUtil.buildErrorResponse("提交失败:"+e.getMessage());
......
...@@ -40,10 +40,6 @@ public class CheckRecordDataConsumer implements Runnable { ...@@ -40,10 +40,6 @@ public class CheckRecordDataConsumer implements Runnable {
private IPlanTaskDao iPlanTaskDao; private IPlanTaskDao iPlanTaskDao;
private ICheckDao iCheckDao;
private ICheckInputDao iCheckInputDao;
private PlanTaskMapper planTaskMapper; private PlanTaskMapper planTaskMapper;
private IPlanTaskDetailDao iPlanTaskDetailDao; private IPlanTaskDetailDao iPlanTaskDetailDao;
...@@ -64,8 +60,6 @@ public class CheckRecordDataConsumer implements Runnable { ...@@ -64,8 +60,6 @@ public class CheckRecordDataConsumer implements Runnable {
public CheckRecordDataConsumer(BlockingQueue<CheckRecordDataDto> blockingQueue, ApplicationContext applicationContext) { public CheckRecordDataConsumer(BlockingQueue<CheckRecordDataDto> blockingQueue, ApplicationContext applicationContext) {
this.blockingQueue = blockingQueue; this.blockingQueue = blockingQueue;
this.applicationContext = applicationContext; this.applicationContext = applicationContext;
iCheckDao = applicationContext.getBean(ICheckDao.class);
iCheckInputDao = applicationContext.getBean(ICheckInputDao.class);
planTaskMapper = applicationContext.getBean(PlanTaskMapper.class); planTaskMapper = applicationContext.getBean(PlanTaskMapper.class);
iPlanTaskDao = applicationContext.getBean(IPlanTaskDao.class); iPlanTaskDao = applicationContext.getBean(IPlanTaskDao.class);
iPlanTaskDetailDao = applicationContext.getBean(IPlanTaskDetailDao.class); iPlanTaskDetailDao = applicationContext.getBean(IPlanTaskDetailDao.class);
...@@ -197,7 +191,7 @@ public class CheckRecordDataConsumer implements Runnable { ...@@ -197,7 +191,7 @@ public class CheckRecordDataConsumer implements Runnable {
CheckInput checkInput = new CheckInput(); CheckInput checkInput = new CheckInput();
if (XJConstant.INPUT_ITEM_SELECT.equals(inputItem.getItemType())) { if (XJConstant.INPUT_ITEM_SELECT.equals(inputItem.getItemType())) {
checkInput = paraseSelect(checkInput, inputItem.getDataJson(), inputItem.getIsScore()); checkInput = paraseSelect(checkInput, inputItem.getDataJson());
} else if (XJConstant.INPUT_ITEM_NUMBER.equals(inputItem.getItemType())) { } else if (XJConstant.INPUT_ITEM_NUMBER.equals(inputItem.getItemType())) {
checkInput.setInputValue(inputItem.getDefaultValue()); checkInput.setInputValue(inputItem.getDefaultValue());
} else if (XJConstant.INPUT_ITEM_TEXT.equals(inputItem.getItemType())) { } else if (XJConstant.INPUT_ITEM_TEXT.equals(inputItem.getItemType())) {
...@@ -222,7 +216,7 @@ public class CheckRecordDataConsumer implements Runnable { ...@@ -222,7 +216,7 @@ public class CheckRecordDataConsumer implements Runnable {
checkInputMapper.insertBatch(checkInputs); checkInputMapper.insertBatch(checkInputs);
} }
private CheckInput paraseSelect(CheckInput checkInput, String json, String isScore) { private CheckInput paraseSelect(CheckInput checkInput, String json) {
JSONArray jsonArray = JSONArray.parseArray(json); JSONArray jsonArray = JSONArray.parseArray(json);
if (!ObjectUtils.isEmpty(jsonArray)) { if (!ObjectUtils.isEmpty(jsonArray)) {
for (int i = 0; i < jsonArray.size(); i++) { for (int i = 0; i < jsonArray.size(); i++) {
......
...@@ -4,16 +4,15 @@ import com.yeejoin.amos.patrol.business.data.CheckRecordDataConsumer; ...@@ -4,16 +4,15 @@ import com.yeejoin.amos.patrol.business.data.CheckRecordDataConsumer;
import com.yeejoin.amos.patrol.business.dto.CheckRecordDataDto; import com.yeejoin.amos.patrol.business.dto.CheckRecordDataDto;
import com.yeejoin.amos.patrol.business.event.CheckRecordInsertEvent; import com.yeejoin.amos.patrol.business.event.CheckRecordInsertEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.event.EventListener;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
/** /**
...@@ -21,33 +20,27 @@ import java.util.concurrent.LinkedBlockingDeque; ...@@ -21,33 +20,27 @@ import java.util.concurrent.LinkedBlockingDeque;
*/ */
@Component @Component
@Slf4j @Slf4j
public class CheckRecordInsertListener implements ApplicationContextAware, ApplicationListener<CheckRecordInsertEvent> { public class CheckRecordInsertListener {
private final BlockingQueue<CheckRecordDataDto> blockingQueue = new LinkedBlockingDeque<>(); private final BlockingQueue<CheckRecordDataDto> blockingQueue = new LinkedBlockingDeque<>();
private ApplicationContext applicationContext; private int coreThreads = Runtime.getRuntime().availableProcessors() * 2;
@Autowired @Autowired
Executor asyncServiceExecutor; private ApplicationContext applicationContext;
private int curSystemThreads = Runtime.getRuntime().availableProcessors();
@Override @EventListener
public void onApplicationEvent(CheckRecordInsertEvent event) { public void onApplicationEvent(CheckRecordInsertEvent event) {
blockingQueue.add(event.getMessage()); blockingQueue.add(event.getMessage());
} }
@PostConstruct @PostConstruct
public void init() { public void init() {
Executor executor = Executors.newFixedThreadPool(coreThreads);
CheckRecordDataConsumer consumer = new CheckRecordDataConsumer(blockingQueue, applicationContext); CheckRecordDataConsumer consumer = new CheckRecordDataConsumer(blockingQueue, applicationContext);
for (int i = 0; i < curSystemThreads; i++) { for (int i = 0; i < coreThreads; i++) {
asyncServiceExecutor.execute(consumer); executor.execute(consumer);
} }
} }
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
} }
...@@ -1712,6 +1712,46 @@ public class PlanTaskServiceImpl implements IPlanTaskService { ...@@ -1712,6 +1712,46 @@ public class PlanTaskServiceImpl implements IPlanTaskService {
this.sendInsertMessage(planTasks, planTaskDetails); this.sendInsertMessage(planTasks, planTaskDetails);
} }
@Override
@Transactional(rollbackFor = Exception.class)
public void handleAllBatch2(String planTaskIds, String userId) {
List<Long> planTaskIdsList = Arrays.stream(planTaskIds.split(","))
.map(Long::parseLong)
.collect(Collectors.toList());
// 1.数组准备:查询要一键提交的任务及更新状态
CompletableFuture<List<PlanTask>> planTasksFuture = CompletableFuture.supplyAsync(() -> iplanTaskDao.findAllByIdIn(planTaskIdsList), asyncServiceExecutor);
// 2.数组准备:查询未完成状态的任务明细及更新状态
CompletableFuture<List<PlanTaskDetail>> planTaskDetailsFuture = CompletableFuture.supplyAsync(() -> planTaskDetailMapper.findAllByIdInAndStatus(planTaskIdsList, PlanTaskDetailStatusEnum.NOTSTARTED.getValue()), asyncServiceExecutor);
// 3.数组准备: es存储数据组装1
CompletableFuture<List<ESTaskDetailDto>> allESTaskDetailDtosFuture = CompletableFuture.supplyAsync(() -> esTaskDetail.findAllByPlanTaskIdIn(planTaskIdsList.stream().map(String::valueOf).collect(Collectors.toList())), asyncServiceExecutor);
// 4.数组准备: es存储数据组装2
CompletableFuture<List<ESPlanTaskListDto>> esPlanTaskListDtosFuture = CompletableFuture.supplyAsync(() -> esPlanTaskList.findAllByPlanTaskIdIn(planTaskIdsList.stream().map(String::valueOf).collect(Collectors.toList())), asyncServiceExecutor);
CompletableFuture.allOf(planTasksFuture, planTaskDetailsFuture, allESTaskDetailDtosFuture, esPlanTaskListDtosFuture).join();
List<PlanTask> planTasks = planTasksFuture.join();
List<PlanTaskDetail> planTaskDetails = planTaskDetailsFuture.join();
List<ESTaskDetailDto> esTaskDetailDtos = allESTaskDetailDtosFuture.join();
List<ESPlanTaskListDto> esPlanTaskListDtos = esPlanTaskListDtosFuture.join();
finshPlanTask(planTasks, planTaskDetails);
buildESTaskDetailDtoData2(esTaskDetailDtos, planTaskDetails);
buildESPlanTaskListDtoData2(esPlanTaskListDtos, esTaskDetailDtos, planTaskDetails);
saveMustData(esTaskDetailDtos, esPlanTaskListDtos, planTasks);
this.sendInsertMessage(planTasks, planTaskDetails);
}
private void finshPlanTask(List<PlanTask> planTasks, List<PlanTaskDetail> planTaskDetails) {
planTasks.forEach(planTask -> {
planTask.setFinishStatus(XJConstant.TASK_STATUS_FINISH);
if (planTask.getRiskStatus() != 1) {
planTask.setRiskStatus(XJConstant.NORISK_NUM);
}
});
planTaskDetails.forEach(planTaskDetail -> {
planTaskDetail.setStatus(PlanTaskDetailStatusEnum.QUALIFIED.getValue());
});
}
private void sendInsertMessage(List<PlanTask> planTasks, List<PlanTaskDetail> planTaskDetails){ private void sendInsertMessage(List<PlanTask> planTasks, List<PlanTaskDetail> planTaskDetails){
CheckRecordDataDto dataDto = new CheckRecordDataDto(); CheckRecordDataDto dataDto = new CheckRecordDataDto();
dataDto.setPlanTasks(planTasks); dataDto.setPlanTasks(planTasks);
...@@ -1808,6 +1848,43 @@ public class PlanTaskServiceImpl implements IPlanTaskService { ...@@ -1808,6 +1848,43 @@ public class PlanTaskServiceImpl implements IPlanTaskService {
} }
private List<ESTaskDetailDto> buildESTaskDetailDtoData2(List<ESTaskDetailDto> allESTaskDetailDtos, List<PlanTaskDetail> planTaskDetails) {
// 查询所有的
Map<Long, ESTaskDetailDto> esTaskDetailDtoMap = allESTaskDetailDtos.stream().collect(Collectors.toMap(ESTaskDetailDto::getId, Function.identity()));
// 只更新未完成状态
planTaskDetails.forEach(planTaskDetail -> {
ESTaskDetailDto esTaskDetailDto = esTaskDetailDtoMap.get(planTaskDetail.getId());
JSONObject appCheckInput = esTaskDetailDto.getAppCheckInput();
Map<String, List<Map<String, Object>>> mapList = (Map<String, List<Map<String, Object>>>) JSON.parse(appCheckInput.toJSONString());
List<AppCheckInputRespone> appCheckInputRespones = new ArrayList<>();
for (Map<String, Object> map : mapList.get("items")) {
AppCheckInputRespone appCheckInputRespone = new AppCheckInputRespone();
BeanUtil.copyProperties(map, appCheckInputRespone);
InputItem inputItem = cacheHelper.getInputItemCacheData(map.get("checkInputId").toString());
CheckInput checkInput = new CheckInput();
String itemType = String.valueOf(map.get("itemType"));
if (XJConstant.INPUT_ITEM_SELECT.equals(itemType)) {
paraseSelect(checkInput, String.valueOf(map.get("dataJson")));
appCheckInputRespone.setInputValue(checkInput.getInputValue());
} else if (XJConstant.INPUT_ITEM_NUMBER.equals(itemType)) {
appCheckInputRespone.setInputValue(inputItem.getDefaultValue());
} else if (XJConstant.INPUT_ITEM_TEXT.equals(itemType)) {
appCheckInputRespone.setInputValue(inputItem.getDefaultValue());
}
appCheckInputRespone.setPointInputImgUrls(new ArrayList<>());
appCheckInputRespones.add(appCheckInputRespone);
}
appCheckInput.put("items", appCheckInputRespones);
esTaskDetailDto.setPointImgUrls(new ArrayList<>());
esTaskDetailDto.setAppCheckInput(appCheckInput);
esTaskDetailDto.setPointStatus(String.valueOf(PlanTaskDetailIsFinishEnum.FINISHED.getValue()));
esTaskDetailDto.setPointImgUrls(new ArrayList<>());
});
return allESTaskDetailDtos;
}
private List<ESPlanTaskListDto> buildESPlanTaskListDtoData(List<PlanTask> planTasks, List<ESTaskDetailDto> esTaskDetailDtos, List<PlanTaskDetail> planTaskDetails) { private List<ESPlanTaskListDto> buildESPlanTaskListDtoData(List<PlanTask> planTasks, List<ESTaskDetailDto> esTaskDetailDtos, List<PlanTaskDetail> planTaskDetails) {
List<String> ids = planTasks.stream().map(p -> String.valueOf(p.getId())).collect(Collectors.toList()); List<String> ids = planTasks.stream().map(p -> String.valueOf(p.getId())).collect(Collectors.toList());
List<ESPlanTaskListDto> esPlanTaskListDtos = esPlanTaskList.findAllByPlanTaskIdIn(ids); List<ESPlanTaskListDto> esPlanTaskListDtos = esPlanTaskList.findAllByPlanTaskIdIn(ids);
...@@ -1823,6 +1900,19 @@ public class PlanTaskServiceImpl implements IPlanTaskService { ...@@ -1823,6 +1900,19 @@ public class PlanTaskServiceImpl implements IPlanTaskService {
return esPlanTaskListDtos; return esPlanTaskListDtos;
} }
private List<ESPlanTaskListDto> buildESPlanTaskListDtoData2(List<ESPlanTaskListDto> esPlanTaskListDtos, List<ESTaskDetailDto> esTaskDetailDtos, List<PlanTaskDetail> planTaskDetails) {
esPlanTaskListDtos.forEach(esPlanTaskListDto -> {
esPlanTaskListDto.setFinishStatus(String.valueOf(PlanTaskFinishStatusEnum.FINISHED.getValue()));
esPlanTaskListDto.setFinshNum(this.filterByStatus("1", esPlanTaskListDto.getPlanTaskId(), esTaskDetailDtos));
esPlanTaskListDto.setOmission(this.filterByStatus("3", esPlanTaskListDto.getPlanTaskId(), esTaskDetailDtos));
esPlanTaskListDto.setUnqualified(this.filterByStatus("2", esPlanTaskListDto.getPlanTaskId(), esTaskDetailDtos));
esPlanTaskListDto.setUnplan(this.filterByStatus("0", esPlanTaskListDto.getPlanTaskId(), esTaskDetailDtos));
esPlanTaskListDto.setTaskPlanNum(this.filterByStatus("1", esPlanTaskListDto.getPlanTaskId(), esTaskDetailDtos));
esPlanTaskListDto.setPoints(this.buildPlanTaskPoint(esPlanTaskListDto.getPlanTaskId(), planTaskDetails));
});
return esPlanTaskListDtos;
}
private List<PlanTaskDetail> buildPlanTaskPoint(String planTaskId, List<PlanTaskDetail> planTaskDetails) { private List<PlanTaskDetail> buildPlanTaskPoint(String planTaskId, List<PlanTaskDetail> planTaskDetails) {
return planTaskDetails.stream().filter(t -> Long.parseLong(planTaskId) == t.getTaskNo()).collect(Collectors.toList()); return planTaskDetails.stream().filter(t -> Long.parseLong(planTaskId) == t.getTaskNo()).collect(Collectors.toList());
} }
...@@ -1867,6 +1957,7 @@ public class PlanTaskServiceImpl implements IPlanTaskService { ...@@ -1867,6 +1957,7 @@ public class PlanTaskServiceImpl implements IPlanTaskService {
return planTaskDetailMapper.findPlanTaskByTaskIdAndPointId(plantaskId, pointId); return planTaskDetailMapper.findPlanTaskByTaskIdAndPointId(plantaskId, pointId);
} }
@Override
public Page<Map<String, Object>> getPlanTasks(String toke, String product, String appKey, HashMap<String, Object> params) { public Page<Map<String, Object>> getPlanTasks(String toke, String product, String appKey, HashMap<String, Object> params) {
CommonPageable pageParam = new CommonPageable(); CommonPageable pageParam = new CommonPageable();
List<Map<String, Object>> content = Lists.newArrayList(); List<Map<String, Object>> content = Lists.newArrayList();
...@@ -1933,7 +2024,6 @@ public class PlanTaskServiceImpl implements IPlanTaskService { ...@@ -1933,7 +2024,6 @@ public class PlanTaskServiceImpl implements IPlanTaskService {
try { try {
response = restHighLevelClient.search(request, RequestOptions.DEFAULT); response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
for (org.elasticsearch.search.SearchHit hit : response.getHits().getHits()) { for (org.elasticsearch.search.SearchHit hit : response.getHits().getHits()) {
System.out.println(hit);
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(hit); JSONObject jsonObject = (JSONObject) JSONObject.toJSON(hit);
JSONObject dto2 = jsonObject.getJSONObject("sourceAsMap"); JSONObject dto2 = jsonObject.getJSONObject("sourceAsMap");
Map map = dto2; Map map = dto2;
......
...@@ -211,4 +211,7 @@ public interface IPlanTaskService { ...@@ -211,4 +211,7 @@ public interface IPlanTaskService {
void backPatrolInfo(); void backPatrolInfo();
void handleAllBatch(String ids, String userId); void handleAllBatch(String ids, String userId);
void handleAllBatch2(String ids, String userId);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment