Commit bae0d1c6 authored by zhangsen's avatar zhangsen

rocketmq 支持多个ip连接并发送消息

parent 5e9e1b37
...@@ -341,6 +341,7 @@ public class View3dController extends BaseController { ...@@ -341,6 +341,7 @@ public class View3dController extends BaseController {
@RequestBody Object msg) { @RequestBody Object msg) {
try { try {
rocketMQService.sendMsg(topic,"test_msg",msg); rocketMQService.sendMsg(topic,"test_msg",msg);
rocketMQService.sendMsg1(topic,"test_msg",msg);
} catch (Exception e) { } catch (Exception e) {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
return CommonResponseUtil.failure(e.getMessage()); return CommonResponseUtil.failure(e.getMessage());
......
...@@ -6,6 +6,7 @@ import org.apache.rocketmq.common.message.Message; ...@@ -6,6 +6,7 @@ import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -16,12 +17,20 @@ import com.yeejoin.amos.fas.business.service.intfc.IRocketMQService; ...@@ -16,12 +17,20 @@ import com.yeejoin.amos.fas.business.service.intfc.IRocketMQService;
public class RocketMQService implements IRocketMQService { public class RocketMQService implements IRocketMQService {
private final Logger log = LoggerFactory.getLogger(RocketMQService.class); private final Logger log = LoggerFactory.getLogger(RocketMQService.class);
@Autowired @Autowired
DefaultMQProducer defaultMQProducer; @Qualifier("defaultProducer")
DefaultMQProducer defaultMQProducer;
@Value("${rocketmq.producer.sysIsUsed}")
private String sysIsUsed; @Autowired
@Qualifier("defaultProducer1")
DefaultMQProducer defaultMQProducer1;
@Value("${rocketmq.producer.sysIsUsed}")
private String sysIsUsed;
@Value("${rocketmq.producer.sysIsUsed2}")
private String sysIsUsed2;
public void sendMsg(String topic, String tag, Object msg){ public void sendMsg(String topic, String tag, Object msg){
try { try {
...@@ -42,4 +51,25 @@ public class RocketMQService implements IRocketMQService { ...@@ -42,4 +51,25 @@ public class RocketMQService implements IRocketMQService {
e.printStackTrace(); e.printStackTrace();
} }
} }
@Override
public void sendMsg1(String topic, String tag, Object msg) {
try {
if ("on".equalsIgnoreCase(sysIsUsed2)) {
log.info("rocketMQtopic====" + topic);
log.info("rocketMQmsg====" + JSON.toJSONString(msg).toString());
Message sendMsg = new Message(topic, tag, JSON.toJSONString(msg).getBytes());
SendResult sendResult = defaultMQProducer1.send(sendMsg);
log.info("rocketMQsendResult====" + JSON.toJSONString(sendResult).toString());
} else {
log.info("rocketMQsendResult====rocketmq.producer.sysIsUsed is not on");
}
} catch (Exception e) {
// TODO Auto-generated catch block
log.error(e.getMessage(), e);
e.printStackTrace();
}
}
} }
...@@ -3,5 +3,6 @@ package com.yeejoin.amos.fas.business.service.intfc; ...@@ -3,5 +3,6 @@ package com.yeejoin.amos.fas.business.service.intfc;
public interface IRocketMQService { public interface IRocketMQService {
void sendMsg(String topic, String tag, Object msg); void sendMsg(String topic, String tag, Object msg);
void sendMsg1(String topic, String tag, Object msg);
} }
...@@ -8,6 +8,8 @@ import org.apache.rocketmq.client.exception.MQClientException; ...@@ -8,6 +8,8 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
...@@ -24,8 +26,12 @@ import org.springframework.context.annotation.Configuration; ...@@ -24,8 +26,12 @@ import org.springframework.context.annotation.Configuration;
public class MQProducerConfigure { public class MQProducerConfigure {
public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class); public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);
private String groupName; // private String groupName;
private String namesrvAddr; @Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr1;
@Value("${rocketmq.producer.namesrvAddr2}")
private String namesrvAddr2;
// 消息最大值 // 消息最大值
private Integer maxMessageSize; private Integer maxMessageSize;
// 消息发送超时时间 // 消息发送超时时间
...@@ -35,6 +41,12 @@ public class MQProducerConfigure { ...@@ -35,6 +41,12 @@ public class MQProducerConfigure {
// 是否启用rocketmq // 是否启用rocketmq
private String sysIsUsed; private String sysIsUsed;
@Value("${rocketmq.producer.groupName}")
private String name1;
@Value("${rocketmq.producer.groupName2}")
private String name2;
/** /**
* mq 生成者配置 * mq 生成者配置
...@@ -46,8 +58,10 @@ public class MQProducerConfigure { ...@@ -46,8 +58,10 @@ public class MQProducerConfigure {
public DefaultMQProducer defaultProducer() throws MQClientException { public DefaultMQProducer defaultProducer() throws MQClientException {
if("on".equalsIgnoreCase(sysIsUsed)){ if("on".equalsIgnoreCase(sysIsUsed)){
LOGGER.info("defaultProducer 正在创建---------------------------------------"); LOGGER.info("defaultProducer 正在创建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(groupName); DefaultMQProducer producer = new DefaultMQProducer(name1);
producer.setNamesrvAddr(namesrvAddr); producer.setNamesrvAddr(namesrvAddr1);
//实例名称:字段必须赋值 :::且需要区分
producer.setInstanceName(name1);
producer.setVipChannelEnabled(false); producer.setVipChannelEnabled(false);
producer.setMaxMessageSize(maxMessageSize); producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeOut); producer.setSendMsgTimeout(sendMsgTimeOut);
...@@ -56,6 +70,32 @@ public class MQProducerConfigure { ...@@ -56,6 +70,32 @@ public class MQProducerConfigure {
LOGGER.info("rocketmq producer server 开启成功----------------------------------"); LOGGER.info("rocketmq producer server 开启成功----------------------------------");
return producer; return producer;
} }
return new DefaultMQProducer(groupName); return new DefaultMQProducer(name1);
}
/**
* mq 生成者配置
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")
public DefaultMQProducer defaultProducer1() throws MQClientException {
if("on".equalsIgnoreCase(sysIsUsed)){
LOGGER.info("defaultProducer 正在创建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(name2);
producer.setNamesrvAddr(namesrvAddr2);
//实例名称:字段必须赋值 :::且需要区分
producer.setInstanceName(name2);
producer.setVipChannelEnabled(false);
producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeOut);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
producer.start();
LOGGER.info("rocketmq producer server 开启成功----------------------------------");
return producer;
}
return new DefaultMQProducer(name2);
} }
} }
...@@ -69,6 +69,13 @@ rocketmq.producer.sendMsgTimeOut=30000 ...@@ -69,6 +69,13 @@ rocketmq.producer.sendMsgTimeOut=30000
# 发送消息失败重试次数,默认2 # 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2 rocketmq.producer.retryTimesWhenSendFailed=2
# 是否使用rocketmq on/off
rocketmq.producer.sysIsUsed2=off
#自定义groupName2 与 第一个name区分开
rocketmq.producer.groupName2=groupName2
# mq的nameserver地址
rocketmq.producer.namesrvAddr2=172.16.3.135:9876
rocket-plan-topic =topic_fire_emergency_plan rocket-plan-topic =topic_fire_emergency_plan
rocket-equip-alarm-topic =topic_fire_equip_alarm rocket-equip-alarm-topic =topic_fire_equip_alarm
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment