Commit 0be3e1c8 authored by caotao's avatar caotao

1.新增es同步数据到mysql接口。

2.新增mysql同步数据到es接口。 3.新增光字牌、模拟量等接口排序。 4.接口返回新增显示名称、测点名称、单位及值
parent 116335d7
package com.yeejoin.amos.boot.module.jxiop.api.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import io.github.classgraph.json.Id;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.Date;
/**
* CREATE TABLE `equipments_jxiop_doc_mysql` (
* `id` varchar(50) NOT NULL,
* `address` varchar(50) DEFAULT NULL,
* `equipment_specific_name` varchar(255) DEFAULT NULL,
* `gateway_id` varchar(50) DEFAULT NULL,
* `is_alarm` varchar(20) DEFAULT NULL,
* `created_time` date DEFAULT NULL,
* `unit` varchar(50) DEFAULT NULL,
* `value` varchar(50) DEFAULT NULL,
* `value_F` float DEFAULT NULL,
* `value_label` varchar(255) DEFAULT NULL,
* `trace_id` varchar(50) DEFAULT NULL,
* `equipment_index_name` varchar(255) DEFAULT NULL,
* `equipment_number` varchar(50) DEFAULT NULL,
* `front_module` varchar(200) DEFAULT NULL,
* `system_type` varchar(200) DEFAULT NULL,
* `picture_name` varchar(255) DEFAULT NULL,
* `display_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
* `data_type` varchar(50) DEFAULT NULL,
* PRIMARY KEY (`id`)
* ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
*/
@Data
@Accessors(chain = true)
@TableName("equipments_jxiop_doc_mysql")
public class EquipmentsJxiopDocMysql {
@Id
private String id;
@TableField("address")
private String address;
@TableField("data_type")
private String dataType;
@TableField("equipment_specific_name")
private String equipmentSpecificName;
@TableField("gateway_id")
private String gatewayId;
@TableField("is_alarm")
private String isAlarm;
@TableField("created_time")
private Date createdTime;
@TableField("unit")
private String unit;
@TableField("value")
private String value;
@TableField("value_F")
private Float valueF ;
@TableField("value_label")
private String valueLabel;
@TableField("trace_id")
private String traceId;
@TableField("equipment_index_name")
private String equipmentIndexName;
@TableField("equipment_number")
private String equipmentNumber;
@TableField("front_module")
private String frontModule;
@TableField("system_type")
private String systemType;
@TableField("picture_name")
private String pictureName;
@TableField("display_name")
private String displayName;
}
package com.yeejoin.amos.boot.module.jxiop.api.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.EquipmentsJxiopDocMysql;
public interface EquipmentsJxiopDocMysqlMapper extends BaseMapper<EquipmentsJxiopDocMysql> {
}
package com.yeejoin.amos.boot.module.jxiop.biz.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import io.github.classgraph.json.Id;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.Date;
/**
* CREATE TABLE `equipments_jxiop_doc_mysql` (
* `id` varchar(50) NOT NULL,
* `address` varchar(50) DEFAULT NULL,
* `equipment_specific_name` varchar(255) DEFAULT NULL,
* `gateway_id` varchar(50) DEFAULT NULL,
* `is_alarm` varchar(20) DEFAULT NULL,
* `created_time` date DEFAULT NULL,
* `unit` varchar(50) DEFAULT NULL,
* `value` varchar(50) DEFAULT NULL,
* `value_F` float DEFAULT NULL,
* `value_label` varchar(255) DEFAULT NULL,
* `trace_id` varchar(50) DEFAULT NULL,
* `equipment_index_name` varchar(255) DEFAULT NULL,
* `equipment_number` varchar(50) DEFAULT NULL,
* `front_module` varchar(200) DEFAULT NULL,
* `system_type` varchar(200) DEFAULT NULL,
* `picture_name` varchar(255) DEFAULT NULL,
* `display_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL,
* `data_type` varchar(50) DEFAULT NULL,
* PRIMARY KEY (`id`)
* ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
*/
@Data
@Accessors(chain = true)
@TableName("equipments_jxiop_doc_mysql")
public class EquipmentsJxiopDocMysql {
@Id
private String id;
@TableField("address")
private String address;
@TableField("data_type")
private String dataType;
@TableField("equipment_specific_name")
private String equipmentSpecificName;
@TableField("gateway_id")
private String gatewayId;
@TableField("is_alarm")
private String isAlarm;
@TableField("created_time")
private Date createdTime;
@TableField("unit")
private String unit;
@TableField("value")
private String value;
@TableField("value_F")
private Float valueF ;
@TableField("value_label")
private String valueLabel;
@TableField("trace_id")
private String traceId;
@TableField("equipment_index_name")
private String equipmentIndexName;
@TableField("equipment_number")
private String equipmentNumber;
@TableField("front_module")
private String frontModule;
@TableField("system_type")
private String systemType;
@TableField("picture_name")
private String pictureName;
@TableField("display_name")
private String displayName;
}
package com.yeejoin.amos.boot.module.jxiop.biz.entity;
import lombok.Getter;
/**
*
* @author LiuLin
* @date 2023年10月11日 09:31
*/
@Getter
public final class EsEntity<T> {
private String id;
private T data;
public EsEntity() {
}
public EsEntity(String id, T data) {
this.data = data;
this.id = id;
}
public void setId(String id) {
this.id = id;
}
public void setData(T data) {
this.data = data;
}
}
...@@ -846,14 +846,19 @@ public class MonitorFanIndicatorImpl implements IMonitorFanIndicator { ...@@ -846,14 +846,19 @@ public class MonitorFanIndicatorImpl implements IMonitorFanIndicator {
likeMap.put(CommonConstans.QueryStringSystemTypeKeyword, systemType); likeMap.put(CommonConstans.QueryStringSystemTypeKeyword, systemType);
} }
List<ESEquipments> indicatorsDtoList = commonServiceImpl.getListDataByCondtions(queryCondtion, null, ESEquipments.class, likeMap); List<ESEquipments> indicatorsDtoList = commonServiceImpl.getListDataByCondtions(queryCondtion, null, ESEquipments.class, likeMap);
Integer traceIdCount = indicatorsDtoList.stream().filter(esEquipments -> !StringUtils.isEmpty(esEquipments.getTraceId())).collect(Collectors.toList()).size();
List<Map<String, Object>> statusMaps = new ArrayList<>(); List<Map<String, Object>> statusMaps = new ArrayList<>();
for (ESEquipments listDatum : indicatorsDtoList) { for (ESEquipments listDatum : indicatorsDtoList) {
Map<String, Object> statusMap = new HashMap<>(); Map<String, Object> statusMap = new HashMap<>();
statusMap.put("traceId",listDatum.getTraceId());
String data = listDatum.getDisplayName(); String data = listDatum.getDisplayName();
if (StringUtils.isEmpty(data) || (!ObjectUtils.isEmpty(listDatum.getEquipmentNumber()) && data.equals(listDatum.getEquipmentNumber()))) { if (StringUtils.isEmpty(data) || (!ObjectUtils.isEmpty(listDatum.getEquipmentNumber()) && data.equals(listDatum.getEquipmentNumber()))) {
data = listDatum.getEquipmentIndexName(); data = listDatum.getEquipmentIndexName();
} }
statusMap.put("displayName",listDatum.getDisplayName());
statusMap.put("equipmentIndexName",listDatum.getEquipmentIndexName());
statusMap.put("unit",listDatum.getUnit());
statusMap.put("value",listDatum.getValue());
statusMap.put("addres", listDatum.getAddress()); statusMap.put("addres", listDatum.getAddress());
statusMap.put("data", data); statusMap.put("data", data);
statusMap.put("state", listDatum.getValue().equals("false") ? 0 : 1); statusMap.put("state", listDatum.getValue().equals("false") ? 0 : 1);
...@@ -864,7 +869,16 @@ public class MonitorFanIndicatorImpl implements IMonitorFanIndicator { ...@@ -864,7 +869,16 @@ public class MonitorFanIndicatorImpl implements IMonitorFanIndicator {
} }
statusMaps.add(statusMap); statusMaps.add(statusMap);
} }
statusMaps.sort(Comparator.comparingLong(o -> Long.parseLong(o.get("addres").toString()))); Collator instance = Collator.getInstance(Locale.CHINA);
String sortField;
if(traceIdCount == statusMaps.size()){
sortField = "traceId";
} else {
sortField = "addres";
}
Collections.sort(statusMaps, (e1, e2) -> {
return instance.compare(e1.get(sortField), e2.get(sortField));
});
return statusMaps; return statusMaps;
} }
...@@ -878,8 +892,14 @@ public class MonitorFanIndicatorImpl implements IMonitorFanIndicator { ...@@ -878,8 +892,14 @@ public class MonitorFanIndicatorImpl implements IMonitorFanIndicator {
List<ESEquipments> esEquipmentsList = commonServiceImpl.getListDataByCondtionsAndLike(queryCondtion, null, ESEquipments.class, likeMap); List<ESEquipments> esEquipmentsList = commonServiceImpl.getListDataByCondtionsAndLike(queryCondtion, null, ESEquipments.class, likeMap);
List<ESEquipments> listData = esEquipmentsList.stream().filter(esEquipments -> !esEquipments.getDisplayName().equals("")).collect(Collectors.toList()); List<ESEquipments> listData = esEquipmentsList.stream().filter(esEquipments -> !esEquipments.getDisplayName().equals("")).collect(Collectors.toList());
List<Map<String, Object>> statusMaps = new ArrayList<>(); List<Map<String, Object>> statusMaps = new ArrayList<>();
Integer traceIdCount = listData.stream().filter(esEquipments -> !StringUtils.isEmpty(esEquipments.getTraceId())).collect(Collectors.toList()).size();
for (ESEquipments listDatum : listData) { for (ESEquipments listDatum : listData) {
Map<String, Object> statusMap = new HashMap<>(); Map<String, Object> statusMap = new HashMap<>();
statusMap.put("traceId",listDatum.getTraceId());
statusMap.put("displayName",listDatum.getDisplayName());
statusMap.put("equipmentIndexName",listDatum.getEquipmentIndexName());
statusMap.put("unit",listDatum.getUnit());
statusMap.put("value",listDatum.getValue());
if (StringUtils.isEmpty(listDatum.getValue())) { if (StringUtils.isEmpty(listDatum.getValue())) {
statusMap.put("title", 0.00 + (StringUtils.isNotEmpty(listDatum.getUnit()) ? listDatum.getUnit() : "")); statusMap.put("title", 0.00 + (StringUtils.isNotEmpty(listDatum.getUnit()) ? listDatum.getUnit() : ""));
} else { } else {
...@@ -888,14 +908,16 @@ public class MonitorFanIndicatorImpl implements IMonitorFanIndicator { ...@@ -888,14 +908,16 @@ public class MonitorFanIndicatorImpl implements IMonitorFanIndicator {
statusMap.put("title1", listDatum.getDisplayName()); statusMap.put("title1", listDatum.getDisplayName());
statusMaps.add(statusMap); statusMaps.add(statusMap);
} }
Collator instance = Collator.getInstance(Locale.CHINA); Collator instance = Collator.getInstance(Locale.CHINA);
String sortField;
if(traceIdCount == statusMaps.size()){
sortField = "traceId";
} else {
sortField = "title1";
}
Collections.sort(statusMaps, (e1, e2) -> { Collections.sort(statusMaps, (e1, e2) -> {
return instance.compare(e1.get("title1"), e2.get("title1")); return instance.compare(e1.get(sortField), e2.get(sortField));
}); });
return statusMaps; return statusMaps;
} }
......
package com.yeejoin.amos.boot.module.jxiop.biz.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.module.jxiop.biz.entity.EsEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
/**
* @author LiuLin
* @date 2023年08月08日 16:30
*/
@Slf4j
@Component
public class ElasticSearchUtil {
private static final long SCROLL_TIMEOUT = 180000;
private static final int SIZE = 1000;
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* ES修改数据
*
* @param indexName 索引名称
* @param id 主键
* @param paramJson 参数JSON
* @return
*/
public boolean updateData(String indexName, String id, String paramJson) {
log.info("更新ES数据,value:{}", id);
UpdateRequest updateRequest = new UpdateRequest(indexName, id);
//如果修改索引中不存在则进行新增
updateRequest.docAsUpsert(true);
//立即刷新数据
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(paramJson, XContentType.JSON);
try {
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
//log.info("索引[{}],主键:【{}】操作结果:[{}]", indexName, id, updateResponse.getResult());
if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
log.info("索引:【{}】,主键:【{}】新增成功", indexName, id);
return true;
} else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
log.info("索引:【{}】,主键:【{}】修改成功", indexName, id);
return true;
} else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
log.info("索引:[{}],主键:[{}]无变化", indexName, id);
return true;
}
} catch (IOException e) {
log.error("索引:[{}],主键:【{}】", indexName, id, e);
return false;
}
return false;
}
/**
* 单条更新
*
* @param indexName
* @param id
* @param data
* @return
* @throws IOException
*/
public boolean updateData(String indexName, String id, Object data) throws IOException {
UpdateRequest updateRequest = new UpdateRequest(indexName, id);
//准备文档
String jsonString = JSONObject.toJSONString(data);
Map jsonMap = JSONObject.parseObject(jsonString, Map.class);
updateRequest.doc(jsonMap);
updateRequest.timeout(TimeValue.timeValueSeconds(1));
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
//数据为存储而不是更新
UpdateResponse update = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
return update.getGetResult().equals(DocWriteResponse.Result.UPDATED);
}
/**
* 必须传递ids集合
*
* @param indexName
* @param idList
* @param map
* @return
*/
public boolean update(String indexName, List<String> idList, Map map) {
// 创建批量请求
BulkRequest bulkRequest = new BulkRequest();
for (String id : idList) {
UpdateRequest updateRequest = new UpdateRequest(indexName, id).doc(map);
bulkRequest.add(updateRequest);
}
try {
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
return bulk.hasFailures();
} catch (IOException e) {
return false;
}
}
/**
* Description: 批量修改数据
*
* @param index index
* @param list 更新列表
* @author LiuLin
*/
public <T> void updateBatch(String index, List<EsEntity<T>> list) {
BulkRequest request = new BulkRequest();
list.forEach(item -> request.add(new UpdateRequest(index, item.getId())
.doc(JSON.toJSONString(item.getData()), XContentType.JSON)));
try {
restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
list.forEach(s -> log.info("===========索引:【{}】,主键:【{}】修改成功", index, s.getId()));
} catch (Exception e) {
log.error("索引:[{}]", index, e);
}
}
/**
* Description: 批量插入数据
*
* @param index index
* @param list 插入列表
* @author LiuLin
*/
public <T> void insertBatch(String index, List<EsEntity<T>> list) {
BulkRequest request = new BulkRequest();
list.forEach(item -> request.add(new IndexRequest(index).id(item.getId())
.source(JSON.toJSONString(item.getData()), XContentType.JSON)));
try {
restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* ES异步修改数据
*
* @param indexName 索引名称
* @param id 主键
* @param paramJson 参数JSON
*/
public void updateDataAsync(String indexName, String id, String paramJson) throws IOException {
UpdateRequest updateRequest = new UpdateRequest(indexName, id);
updateRequest.docAsUpsert(true);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(paramJson, XContentType.JSON);
restHighLevelClient.updateAsync(updateRequest, RequestOptions.DEFAULT, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
log.info("索引:【{}】,主键:【{}】修改成功", indexName, id);
}
}
@Override
public void onFailure(Exception e) {
log.error("索引:[{}],主键:【{}】", indexName, id, e);
}
});
}
/**
* 构建SearchResponse
*
* @param indices 索引
* @param query queryBuilder
* @param fun 返回函数
* @param <T> 返回类型
* @return List, 可以使用fun转换为T结果
* @throws Exception e
*/
public <T> List<T> searchResponse(String indices, QueryBuilder query, Function<SearchHit, T> fun) throws Exception {
SearchRequest request = new SearchRequest(indices);
Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(query);
sourceBuilder.size(SIZE);
request.scroll(scroll);
request.source(sourceBuilder);
List<String> scrollIdList = new ArrayList<>();
List<T> result = new ArrayList<>();
SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);
String scrollId = searchResponse.getScrollId();
SearchHit[] hits = searchResponse.getHits().getHits();
scrollIdList.add(scrollId);
try {
while (ArrayUtils.isNotEmpty(hits)) {
for (SearchHit hit : hits) {
result.add(fun.apply(hit));
}
if (hits.length < SIZE) {
break;
}
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
searchScrollRequest.scroll(scroll);
SearchResponse searchScrollResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
scrollId = searchScrollResponse.getScrollId();
hits = searchScrollResponse.getHits().getHits();
scrollIdList.add(scrollId);
}
} finally {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.setScrollIds(scrollIdList);
restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
}
return result;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment