Commit 5bbc4a34 authored by 高建强's avatar 高建强

item:消息接收集成修改

parent 9b3d6f8b
package com.boot.bus.sqlsync.enums;
/**
* <h1>同步数据动作</h1>
*
* @Author SingleTian
* @Date 2021-04-01 09:21
*/
public enum DataSyncOperationEnum {
/**
* 创建
*/
CREATE,
/**
* 更新
*/
UPDATE,
/**
* 删除
*/
DELETE;
}
package com.boot.bus.sqlsync.enums;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.common.enums
* @ClassName: SyncDataTextEnum
* @Author: Jianqiang Gao
* @Description: SyncDataTextEnum
* @Date: 2021/4/13 09:55
* @Version: 1.0
*/
public enum DataSyncTextEnum {
/**
* 物联开关量
*/
IOT_SWITCH("开关"),
/**
* 物联模拟量
*/
IOT_ANALOG("模拟"),
/**
* 文本
*/
DATA_TYPE_TEXT("文本"),
/**
* 數值
*/
DATA_TYPE_NUMBER("數值");
private final String text;
DataSyncTextEnum(String text) {
this.text = text;
}
public String getText() {
return text;
}
}
package com.boot.bus.sqlsync.enums;
/**
* <h1>同步数据类型</h1>
*
* @Author SingleTian
* @Date 2021-04-01 09:20
*/
public enum DataSyncTypeEnum {
CONTINGENCY_PLAN_DETAIL("11", "cs/v1/fireASF/dataSync"),
CONTINGENCY_PLAN_OPERATION_RECORD("11", "cs/v1/fireASF/dataSync"),
CONTINGENCY_ORIGINAL_DATA("11", "cs/v1/fireASF/dataSync"),
CONTINGENCY_PLAN_INSTANCE("11", "cs/v1/fireASF/dataSync");
/**
* 资源类型编码
*/
private final String sourceCode;
/**
* mqtt主题
*/
private final String mqTopic;
DataSyncTypeEnum(String sourceCode, String mqTopic) {
this.sourceCode = sourceCode;
this.mqTopic = mqTopic;
}
public String getSourceCode() {
return sourceCode;
}
public String getMqTopic() {
return mqTopic;
}
}
package com.boot.bus.sqlsync.message;
import com.alibaba.fastjson.JSON;
import com.boot.bus.sqlsync.enums.DataSyncOperationEnum;
import com.boot.bus.sqlsync.enums.DataSyncTypeEnum;
import lombok.Data;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* @ProjectName: YeeAMOSPatrolRoot
* @Package: com.yeejoin.amos.patrol.business.datasync
* @ClassName: SyncDataMessage
* @Author: Jianqiang Gao
* @Description: SyncDataMessage
* @Date: 2022/5/30 10:40
* @Version: 1.0
*/
@Data
public class DataSyncMessage implements Serializable {
private static final long serialVersionUID = 3950066933125606745L;
/**
* 唯一ID
*/
private String uid;
/**
* 同步的数据类型
*/
private DataSyncTypeEnum type;
/**
* 同步动作-增删改
*/
private DataSyncOperationEnum operation;
/**
* 发生时间
*/
private Long timestamp;
/**
* 同步数据列表
*/
private List<Serializable> data;
/**
* 获取对应的redis消息key
*
* @return
*/
public String redisKey() {
assert type != null;
return String.join("_", type.toString(), operation.toString(), uid);
}
/**
* Message对象转换成字节码
*
* @return
*/
public byte[] message2Bytes() {
return JSON.toJSONString(this).getBytes(StandardCharsets.UTF_8);
}
/**
* 字节码转换成Message对象
*
* @param messageBytes 字节码
* @return
*/
public static DataSyncMessage bytes2Message(byte[] messageBytes) {
return JSON.parseObject(new String(messageBytes, StandardCharsets.UTF_8), DataSyncMessage.class);
}
}
\ No newline at end of file
......@@ -6,8 +6,8 @@ import com.boot.bus.sqlsync.async.callback.IWorker;
import com.boot.bus.sqlsync.async.executor.timer.SystemClock;
import com.boot.bus.sqlsync.async.worker.WorkResult;
import com.boot.bus.sqlsync.async.wrapper.WorkerWrapper;
import com.boot.bus.sqlsync.message.DataSyncMessage;
import com.boot.bus.sqlsync.service.impl.SyncMqttMessageService;
import com.yeejoin.amos.fas.datasync.DataSyncMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......
......@@ -100,7 +100,7 @@ public class MqttReceiveConfig {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String msg = message.getPayload().toString();
syncMqttMessageService.syncData(topic, msg);
mqttReceiveService.handlerMqttIncrementMessage(topic, msg);
};
}
......
package com.boot.bus.sqlsync.service.impl;
import com.alibaba.fastjson.JSON;
import com.boot.bus.sqlsync.message.DataSyncMessage;
import com.boot.bus.sqlsync.service.infc.MqttReceiveService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.Serializable;
import java.util.List;
/**
* @author keyong
* @title: MqttReceiveServiceImpl
......@@ -21,8 +26,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttIncrementMessage(String topic, String message) {
log.info(String.format("收到mqtt消息:%s", message));
DataSyncMessage dataSyncMessage = JSON.parseObject(message, DataSyncMessage.class);
List<Serializable> data = dataSyncMessage.getData();
}
}
package com.boot.bus.sqlsync.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.boot.bus.sqlsync.enums.DataSyncOperationEnum;
import com.boot.bus.sqlsync.enums.DataSyncTypeEnum;
import com.boot.bus.sqlsync.message.DataSyncMessage;
import com.boot.bus.sqlsync.service.infc.IContingencyPlanInstance;
import com.yeejoin.amos.fas.common.enums.DataSyncOperationEnum;
import com.yeejoin.amos.fas.common.enums.DataSyncTypeEnum;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import com.yeejoin.amos.fas.datasync.DataSyncMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
......@@ -61,6 +61,4 @@ public class SyncMqttMessageService {
}
}
public void syncData(String topic, String msg) {
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment