Commit fbc68770 authored by KeYong's avatar KeYong

提交kafka消息生产者

parent 92efc232
...@@ -140,7 +140,12 @@ ...@@ -140,7 +140,12 @@
<groupId>com.yeejoin</groupId> <groupId>com.yeejoin</groupId>
<artifactId>amos-component-security</artifactId> <artifactId>amos-component-security</artifactId>
<version>1.7.13-SNAPSHOT</version> <version>1.7.13-SNAPSHOT</version>
</dependency> </dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -12,6 +12,7 @@ import com.yeejoin.amos.fas.business.action.el.ELEvaluationContext; ...@@ -12,6 +12,7 @@ import com.yeejoin.amos.fas.business.action.el.ELEvaluationContext;
import com.yeejoin.amos.fas.business.action.model.ContingencyEvent; import com.yeejoin.amos.fas.business.action.model.ContingencyEvent;
import com.yeejoin.amos.fas.business.action.model.ContingencyRo; import com.yeejoin.amos.fas.business.action.model.ContingencyRo;
import com.yeejoin.amos.fas.business.action.model.DeviceRo; import com.yeejoin.amos.fas.business.action.model.DeviceRo;
import com.yeejoin.amos.fas.business.action.mq.IKafkaProducer;
import com.yeejoin.amos.fas.business.action.mq.WebMqttComponent; import com.yeejoin.amos.fas.business.action.mq.WebMqttComponent;
import com.yeejoin.amos.fas.business.action.result.ActionResult; import com.yeejoin.amos.fas.business.action.result.ActionResult;
import com.yeejoin.amos.fas.business.action.result.SafteyPlanResult; import com.yeejoin.amos.fas.business.action.result.SafteyPlanResult;
...@@ -110,6 +111,9 @@ public class ContingencyAction implements CustomerAction { ...@@ -110,6 +111,9 @@ public class ContingencyAction implements CustomerAction {
IRocketMQService rocketMQService; IRocketMQService rocketMQService;
@Autowired @Autowired
IKafkaProducer iKafkaProducer;
@Autowired
private IContingencyInstance contingencyInstance; private IContingencyInstance contingencyInstance;
@Value("${systemctl.sync.switch}") @Value("${systemctl.sync.switch}")
...@@ -278,13 +282,21 @@ public class ContingencyAction implements CustomerAction { ...@@ -278,13 +282,21 @@ public class ContingencyAction implements CustomerAction {
// 将预案的确认消息发送至中心级 // 将预案的确认消息发送至中心级
if ("CONFIRM".equals(ro.getConfirm())) { if ("CONFIRM".equals(ro.getConfirm())) {
log.info("RocketMQ发送的主题是: " + rocketTopic + ", 消息体是: " + toipResponse.toJsonStr() + "!"); log.info("RocketMQ与Kafka发送的主题是: " + rocketTopic + ", 消息体是: " + toipResponse.toJsonStr() + "!");
try { try {
rocketMQService.sendMsg(rocketTopic, "plan_process", toipResponse); rocketMQService.sendMsg(rocketTopic, "plan_process", toipResponse);
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
throw new RuntimeException("RocketMQ消息发送失败!"); throw new RuntimeException("RocketMQ消息发送失败!");
} }
// kafka消息发送至南瑞平台
try {
iKafkaProducer.sendMessage(rocketTopic, "plan_process", toipResponse);
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new RuntimeException("Kafka消息发送失败!");
}
} }
} else if ("websocket".equals(pushType.toLowerCase())) { } else if ("websocket".equals(pushType.toLowerCase())) {
action.execute(msgType, contingency); action.execute(msgType, contingency);
......
package com.yeejoin.amos.fas.business.action.mq;
/**
* @author keyong
* @title: IKafkaProducer
* <pre>
* @description:
* </pre>
* @date 2022/10/14 15:16
*/
public interface IKafkaProducer {
void sendMessage(String topic, String key, Object message);
}
package com.yeejoin.amos.fas.business.action.mq;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @author keyong
* @title: KafkaProducer
* <pre>
* @description:
* </pre>
* @date 2022/10/14 10:59
*/
@Configuration
@EnableKafka
public class KafkaProducer implements IKafkaProducer {
private final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
@Value("${kafka.producer.sysIsUsed}")
private String sysIsUsed;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private static class KafkaFutureCallback implements ListenableFutureCallback<SendResult> {
private final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
@Override
public void onFailure(Throwable ex) {
log.error("send kafka failed: {}", ex);
}
@Override
public void onSuccess(SendResult result) {
log.info("send kafka success: {}", result.getProducerRecord());
}
}
@Override
public void sendMessage(String topic, String key, Object message) {
if("on".equalsIgnoreCase(sysIsUsed)){
log.info("Kafka Topic===="+topic);
log.info("Kafka msg===="+ JSON.toJSONString(message));
kafkaTemplate.send(topic, key, message).addCallback(new KafkaFutureCallback());
}else{
log.info("Kafka SendResult====kafka.producer.sysIsUsed is not open !!!");
}
}
}
...@@ -7,6 +7,7 @@ import static com.yeejoin.amos.fas.business.constants.FasConstant.token; ...@@ -7,6 +7,7 @@ import static com.yeejoin.amos.fas.business.constants.FasConstant.token;
import java.util.List; import java.util.List;
import com.yeejoin.amos.fas.business.action.mq.IKafkaProducer;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
...@@ -335,8 +336,11 @@ public class View3dController extends BaseController { ...@@ -335,8 +336,11 @@ public class View3dController extends BaseController {
return CommonResponseUtil.success(view3dService.dutyList(orgCode)); return CommonResponseUtil.success(view3dService.dutyList(orgCode));
} }
@Autowired @Autowired
IRocketMQService rocketMQService; IRocketMQService rocketMQService;
@Autowired
IKafkaProducer iKafkaProducer;
@ApiOperation(value = "rocketMQ消息推送测试", notes = "rocketMQ消息推送测试") @ApiOperation(value = "rocketMQ消息推送测试", notes = "rocketMQ消息推送测试")
@PostMapping(value="rocketMQ/send") @PostMapping(value="rocketMQ/send")
...@@ -351,5 +355,18 @@ public class View3dController extends BaseController { ...@@ -351,5 +355,18 @@ public class View3dController extends BaseController {
} }
return CommonResponseUtil.success(msg); return CommonResponseUtil.success(msg);
} }
@ApiOperation(value = "kafka消息推送测试", notes = "kafka消息推送测试")
@PostMapping(value="kafka/send")
public CommonResponse kafkaMsgSendTest(@RequestParam(name = "topic", required = false) String topic,
@RequestBody Object msg) {
try {
iKafkaProducer.sendMessage(topic, "kafka_msg_test", JSON.toJSONString(msg));
} catch (Exception e) {
log.error(e.getMessage(), e);
return CommonResponseUtil.failure(e.getMessage());
}
return CommonResponseUtil.success(msg);
}
} }
...@@ -104,3 +104,22 @@ sso.client.secret=6t5oDDKhEODXa++UNUxxLHSF5kVqECq6j+wahtCbv8c= ...@@ -104,3 +104,22 @@ sso.client.secret=6t5oDDKhEODXa++UNUxxLHSF5kVqECq6j+wahtCbv8c=
sso.login.type=server_auth sso.login.type=server_auth
sso.login.client=dce sso.login.client=dce
sso.client.url=https://198.87.103.88:30443/oauth2/oauth/rest_token sso.client.url=https://198.87.103.88:30443/oauth2/oauth/rest_token
#Kafka 相关配置
kafka.producer.sysIsUsed = on
spring.kafka.bootstrap-servers=172.16.3.51:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment