Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
amos-boot-biz
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
项目统一框架
amos-boot-biz
Commits
d9444816
Commit
d9444816
authored
Apr 16, 2024
by
张森
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
删除无用代码
parent
a4b5f0a2
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
70 additions
and
80 deletions
+70
-80
KafkaConsumerService.java
.../com/yeejoin/amos/message/kafka/KafkaConsumerService.java
+70
-80
No files found.
amos-boot-utils/amos-boot-utils-message/src/main/java/com/yeejoin/amos/message/kafka/KafkaConsumerService.java
View file @
d9444816
...
@@ -65,20 +65,10 @@ public class KafkaConsumerService {
...
@@ -65,20 +65,10 @@ public class KafkaConsumerService {
Optional
<?>
messages
=
Optional
.
ofNullable
(
record
.
value
());
Optional
<?>
messages
=
Optional
.
ofNullable
(
record
.
value
());
if
(
messages
.
isPresent
())
{
if
(
messages
.
isPresent
())
{
try
{
try
{
// JSONObject object = JSONObject.fromObject(record.value());
// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC);
JSONObject
object
=
JSONObject
.
fromObject
(
record
.
value
());
JSONObject
object
=
JSONObject
.
fromObject
(
record
.
value
());
com
.
alibaba
.
fastjson
.
JSONObject
jsonObj
=
ClassToJsonUtil
.
class2json
(
object
,
commonMessage
,
record
.
topic
());
com
.
alibaba
.
fastjson
.
JSONObject
jsonObj
=
ClassToJsonUtil
.
class2json
(
object
,
commonMessage
,
record
.
topic
());
emqKeeper
.
getMqttClient
().
publish
(
String
.
valueOf
(
jsonObj
.
get
(
"mqTopic"
)),
JSON
.
toJSONString
(
jsonObj
).
getBytes
(
"UTF-8"
),
0
,
false
);
emqKeeper
.
getMqttClient
().
publish
(
String
.
valueOf
(
jsonObj
.
get
(
"mqTopic"
)),
JSON
.
toJSONString
(
jsonObj
).
getBytes
(
"UTF-8"
),
0
,
false
);
// JSONObject messageObj = JSONObject.fromObject(record.value());
// JSONObject data = messageObj.getJSONObject("body");
// if (data.isEmpty()) {
// data = messageObj;
// data.put("datatype", "state");
// }
log
.
info
(
"接收到Roma消息对象: {}"
,
object
);
log
.
info
(
"接收到Roma消息对象: {}"
,
object
);
// emqKeeper.getMqttClient().publish(MQTT_TOPIC, json.getBytes(StandardCharsets.UTF_8), 0, false);
ack
.
acknowledge
();
ack
.
acknowledge
();
}
catch
(
MqttException
e
)
{
}
catch
(
MqttException
e
)
{
log
.
error
(
"解析数据失败,{}"
,
e
.
getMessage
());
log
.
error
(
"解析数据失败,{}"
,
e
.
getMessage
());
...
@@ -87,77 +77,77 @@ public class KafkaConsumerService {
...
@@ -87,77 +77,77 @@ public class KafkaConsumerService {
}
}
}
}
}
}
//
/**
// /**
* 韶山换流对接Kafka
// * 韶山换流对接Kafka
* @param record record
// * @param record record
* @param ack ack
// * @param ack ack
*/
// */
@KafkaListener
(
id
=
"kafkaConsumer"
,
groupId
=
"kafkaConsumerGroup"
,
topics
=
"#{'${queue.kafka.shaoshan.topics}'.split(',')}"
,
containerFactory
=
"kafkaRomaContainerFactory"
)
// @KafkaListener(id = "kafkaConsumer", groupId = "kafkaConsumerGroup", topics = "#{'${queue.kafka.shaoshan.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
public
void
kafkaConsumer
(
ConsumerRecord
<?,
String
>
record
,
Acknowledgment
ack
)
{
// public void kafkaConsumer(ConsumerRecord<?, String> record, Acknowledgment ack) {
Optional
<?>
message
=
Optional
.
ofNullable
(
record
.
value
());
// Optional<?> message = Optional.ofNullable(record.value());
if
(
message
.
isPresent
())
{
// if (message.isPresent()) {
try
{
// try {
// JSONObject messageObj = JSONObject.fromObject(record.value());
//// JSONObject messageObj = JSONObject.fromObject(record.value());
// JSONObject data = messageObj.getJSONObject("body");
//// JSONObject data = messageObj.getJSONObject("body");
//// JSONObject object = JSONObject.fromObject(record.value());
//// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
//// emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
//
// JSONObject object = JSONObject.fromObject(record.value());
// JSONObject object = JSONObject.fromObject(record.value());
// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
// com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
// emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
// emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
// ack.acknowledge();
JSONObject
object
=
JSONObject
.
fromObject
(
record
.
value
());
// } catch (MqttException e) {
com
.
alibaba
.
fastjson
.
JSONObject
jsonObj
=
ClassToJsonUtil
.
class2json
(
object
,
commonMessage
,
record
.
topic
());
// log.error("解析数据失败,{}", e.getMessage());
emqKeeper
.
getMqttClient
().
publish
(
String
.
valueOf
(
jsonObj
.
get
(
"mqTopic"
)),
JSON
.
toJSONString
(
jsonObj
).
getBytes
(
"UTF-8"
),
0
,
false
);
// } catch (UnsupportedEncodingException e) {
ack
.
acknowledge
();
// e.printStackTrace();
}
catch
(
MqttException
e
)
{
// }
log
.
error
(
"解析数据失败,{}"
,
e
.
getMessage
());
// }
}
catch
(
UnsupportedEncodingException
e
)
{
// }
e
.
printStackTrace
();
//
}
// /**
}
// * 事件告警对接Kafka
}
// * @param record record
// * @param ack ack
/**
// * groupId = kafkaConsumerGroup
* 事件告警对接Kafka
// * 该消息的消息格式为
* @param record record
// * {"data_class":"realdata","data_type":"alarm","op_type":"subscribe_emergency","condition":{"station_psr_id":"50edcb6c1b8a811030493c80a2014950ed9d4f59e8","station_name":"中州换流站","alarm_type":"yx_bw"},"data":[{"psrId":"D017020000000000000000999","astId":"D017020000000000000000999","equipType":"ASTType_0000111","eventType":"OtherSignal","alarmSource":"OWS","alarmLevel":"3","description":"2024-03-11 09:06:17::585 S2WCL12A E3.C01软水器再生结束信号 出现","dateTime":"2024-03-11 09:06:17.585"}]}
* @param ack ack
// */
* groupId = kafkaConsumerGroup
//
* 该消息的消息格式为
// @KafkaListener(id = "kafkaConsumerEventAlarm", groupId = "kafkaConsumerGroupEventAlarm", topics = "#{'${queue.kafka.eventAlarm.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
* {"data_class":"realdata","data_type":"alarm","op_type":"subscribe_emergency","condition":{"station_psr_id":"50edcb6c1b8a811030493c80a2014950ed9d4f59e8","station_name":"中州换流站","alarm_type":"yx_bw"},"data":[{"psrId":"D017020000000000000000999","astId":"D017020000000000000000999","equipType":"ASTType_0000111","eventType":"OtherSignal","alarmSource":"OWS","alarmLevel":"3","description":"2024-03-11 09:06:17::585 S2WCL12A E3.C01软水器再生结束信号 出现","dateTime":"2024-03-11 09:06:17.585"}]}
// public void kafkaConsumerEventAlarm(ConsumerRecord<?, String> record, Acknowledgment ack) {
*/
// Optional<?> message = Optional.ofNullable(record.value());
// if (message.isPresent()) {
@KafkaListener
(
id
=
"kafkaConsumerEventAlarm"
,
groupId
=
"kafkaConsumerGroupEventAlarm"
,
topics
=
"#{'${queue.kafka.eventAlarm.topics}'.split(',')}"
,
containerFactory
=
"kafkaRomaContainerFactory"
)
// try {
public
void
kafkaConsumerEventAlarm
(
ConsumerRecord
<?,
String
>
record
,
Acknowledgment
ack
)
{
//// JSONObject messageObj = JSONObject.fromObject(record.value());
Optional
<?>
message
=
Optional
.
ofNullable
(
record
.
value
());
//// JSONArray dataArray = messageObj.getJSONArray("data");
if
(
message
.
isPresent
())
{
//// JSONArray jsonArray = new JSONArray();
try
{
//// String timestamp = "";
// JSONObject messageObj = JSONObject.fromObject(record.value());
//// for (Object obj : dataArray) {
// JSONArray dataArray = messageObj.getJSONArray("data");
//// JSONObject finallyObj = new JSONObject();
// JSONArray jsonArray = new JSONArray();
//// com.alibaba.fastjson.JSONObject detail = com.alibaba.fastjson.JSONObject.parseObject(com.alibaba.fastjson.JSONObject.toJSONString(obj));
// String timestamp = "";
//// finallyObj.put("eventtextL1", detail.get("description"));
// for (Object obj : dataArray) {
//// finallyObj.put("pointId", detail.get("astId"));
// JSONObject finallyObj = new JSONObject();
//// finallyObj.put("time", detail.get("dateTime"));
// com.alibaba.fastjson.JSONObject detail = com.alibaba.fastjson.JSONObject.parseObject(com.alibaba.fastjson.JSONObject.toJSONString(obj));
//// jsonArray.add(finallyObj);
// finallyObj.put("eventtextL1", detail.get("description"));
//// timestamp = detail.get("dateTime").toString();
// finallyObj.put("pointId", detail.get("astId"));
//// }
// finallyObj.put("time", detail.get("dateTime"));
//// JSONObject jsonObjectMessage = new JSONObject();
// jsonArray.add(finallyObj);
//// jsonObjectMessage.put("warns", jsonArray);
// timestamp = detail.get("dateTime").toString();
//// jsonObjectMessage.put("timestamp", timestamp);
// }
//
// JSONObject jsonObjectMessage = new JSONObject();
// JSONObject object = JSONObject.fromObject(record.value());
// jsonObjectMessage.put("warns", jsonArray);
// com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
// jsonObjectMessage.put("timestamp", timestamp);
// emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
// ack.acknowledge();
JSONObject
object
=
JSONObject
.
fromObject
(
record
.
value
());
// } catch (MqttException e) {
com
.
alibaba
.
fastjson
.
JSONObject
jsonObj
=
ClassToJsonUtil
.
class2json
(
object
,
commonMessage
,
record
.
topic
());
// log.error("解析数据失败,{}", e.getMessage());
emqKeeper
.
getMqttClient
().
publish
(
String
.
valueOf
(
jsonObj
.
get
(
"mqTopic"
)),
JSON
.
toJSONString
(
jsonObj
).
getBytes
(
"UTF-8"
),
0
,
false
);
// } catch (UnsupportedEncodingException e) {
ack
.
acknowledge
();
// e.printStackTrace();
}
catch
(
MqttException
e
)
{
// }
log
.
error
(
"解析数据失败,{}"
,
e
.
getMessage
());
// }
}
catch
(
UnsupportedEncodingException
e
)
{
// }
e
.
printStackTrace
();
}
}
}
/* @KafkaListener(id = "consumerBatch", topicPartitions = {
/* @KafkaListener(id = "consumerBatch", topicPartitions = {
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment