Commit 96b22d7e authored by maoying's avatar maoying

添加rocket启动配置

parent 7a7dcec2
...@@ -6,6 +6,7 @@ import org.apache.rocketmq.common.message.Message; ...@@ -6,6 +6,7 @@ import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
...@@ -15,18 +16,26 @@ import com.yeejoin.amos.fas.business.service.intfc.IRocketMQService; ...@@ -15,18 +16,26 @@ import com.yeejoin.amos.fas.business.service.intfc.IRocketMQService;
public class RocketMQService implements IRocketMQService { public class RocketMQService implements IRocketMQService {
private final Logger log = LoggerFactory.getLogger(RocketMQService.class); private final Logger log = LoggerFactory.getLogger(RocketMQService.class);
@Autowired @Autowired
DefaultMQProducer defaultMQProducer; DefaultMQProducer defaultMQProducer;
@Value("${rocketmq.producer.sysIsUsed}")
private String sysIsUsed;
public void sendMsg(String topic, String tag, Object msg){ public void sendMsg(String topic, String tag, Object msg){
try { try {
log.info("rocketMQtopic===="+topic); if("on".equalsIgnoreCase(sysIsUsed)){
log.info("rocketMQmsg===="+JSON.toJSONString(msg).toString()); log.info("rocketMQtopic===="+topic);
log.info("rocketMQmsg===="+JSON.toJSONString(msg).toString());
Message sendMsg = new Message(topic, tag, JSON.toJSONString(msg).getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg); Message sendMsg = new Message(topic, tag, JSON.toJSONString(msg).getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("rocketMQsendResult===="+JSON.toJSONString(sendResult).toString());
log.info("rocketMQsendResult===="+JSON.toJSONString(sendResult).toString());
}else{
log.info("rocketMQsendResult====rocketmq.producer.sysIsUsed is not on");
}
} catch (Exception e) { } catch (Exception e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
......
package com.yeejoin.amos.fas.business.feign; package com.yeejoin.amos.fas.config;
import lombok.Getter; import lombok.Getter;
...@@ -14,8 +14,6 @@ import org.springframework.context.annotation.Bean; ...@@ -14,8 +14,6 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
/** /**
* @author: lockie
* @Date: 2020/4/21 10:28
* @Description: mq生产者配置 * @Description: mq生产者配置
*/ */
@Getter @Getter
...@@ -34,6 +32,9 @@ public class MQProducerConfigure { ...@@ -34,6 +32,9 @@ public class MQProducerConfigure {
private Integer sendMsgTimeOut; private Integer sendMsgTimeOut;
// 失败重试次数 // 失败重试次数
private Integer retryTimesWhenSendFailed; private Integer retryTimesWhenSendFailed;
// 是否启用rocketmq
private String sysIsUsed;
/** /**
* mq 生成者配置 * mq 生成者配置
...@@ -43,15 +44,18 @@ public class MQProducerConfigure { ...@@ -43,15 +44,18 @@ public class MQProducerConfigure {
@Bean @Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on") @ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")
public DefaultMQProducer defaultProducer() throws MQClientException { public DefaultMQProducer defaultProducer() throws MQClientException {
LOGGER.info("defaultProducer 正在创建---------------------------------------"); if("on".equalsIgnoreCase(sysIsUsed)){
DefaultMQProducer producer = new DefaultMQProducer(groupName); LOGGER.info("defaultProducer 正在创建---------------------------------------");
producer.setNamesrvAddr(namesrvAddr); DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setVipChannelEnabled(false); producer.setNamesrvAddr(namesrvAddr);
producer.setMaxMessageSize(maxMessageSize); producer.setVipChannelEnabled(false);
producer.setSendMsgTimeout(sendMsgTimeOut); producer.setMaxMessageSize(maxMessageSize);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed); producer.setSendMsgTimeout(sendMsgTimeOut);
producer.start(); producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
LOGGER.info("rocketmq producer server 开启成功----------------------------------"); producer.start();
return producer; LOGGER.info("rocketmq producer server 开启成功----------------------------------");
return producer;
}
return new DefaultMQProducer(groupName);
} }
} }
...@@ -27,7 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor; ...@@ -27,7 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor;
@Configuration @Configuration
@EnableCaching @EnableCaching
public class Config { public class RedisConfig {
@Value("${spring.redis.host}") @Value("${spring.redis.host}")
private String host; private String host;
......
...@@ -50,6 +50,11 @@ emqx.broker=tcp://172.16.11.201:1883 ...@@ -50,6 +50,11 @@ emqx.broker=tcp://172.16.11.201:1883
emqx.user-name=admin emqx.user-name=admin
emqx.password=public emqx.password=public
#文件服务器地址
file.downLoad.url=http://172.16.11.201:9000/
# 是否使用rocketmq on/off
rocketmq.producer.sysIsUsed=off
#rocketmq生产者配置 #rocketmq生产者配置
# 是否开启自动配置 # 是否开启自动配置
rocketmq.producer.isOnOff=on rocketmq.producer.isOnOff=on
...@@ -67,10 +72,8 @@ rocketmq.producer.retryTimesWhenSendFailed=2 ...@@ -67,10 +72,8 @@ rocketmq.producer.retryTimesWhenSendFailed=2
rocket-plan-topic =topic_fire_emergency_plan rocket-plan-topic =topic_fire_emergency_plan
rocket-equip-alarm-topic =topic_fire_equip_alarm rocket-equip-alarm-topic =topic_fire_equip_alarm
#文件服务器地址
file.downLoad.url=http://172.16.11.201:9000/
#规则ip配置,用于多网卡及docker镜像启动时添加 #规则ip配置,用于多网卡及docker镜像启动时添加
rule.definition.local-ip=172.16.11.201 #rule.definition.local-ip=172.16.11.201
#3Dtype 分为web和ue #3Dtype 分为web和ue
integrated3Dtype =web integrated3Dtype =web
...@@ -52,3 +52,28 @@ emqx.password=public ...@@ -52,3 +52,28 @@ emqx.password=public
#文件服务器地址 #文件服务器地址
file.downLoad.url=http://39.98.246.31:8888/ file.downLoad.url=http://39.98.246.31:8888/
# 是否使用rocketmq on/off
rocketmq.producer.sysIsUsed=off
#rocketmq生产者配置
# 是否开启自动配置
rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.producer.groupName=${spring.application.name}
# mq的nameserver地址
rocketmq.producer.namesrvAddr=172.16.3.51:9876
# 消息最大长度 默认 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096
# 发送消息超时时间,默认 3000
rocketmq.producer.sendMsgTimeOut=30000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2
rocket-plan-topic =topic_fire_emergency_plan
rocket-equip-alarm-topic =topic_fire_equip_alarm
#规则ip配置,用于多网卡及docker镜像启动时添加
#rule.definition.local-ip=172.16.11.201
#3Dtype 分为web和ue
integrated3Dtype =web
...@@ -52,3 +52,28 @@ emqx.password=public ...@@ -52,3 +52,28 @@ emqx.password=public
#文件服务器地址 #文件服务器地址
file.downLoad.url=http://39.98.246.31:8888/ file.downLoad.url=http://39.98.246.31:8888/
# 是否使用rocketmq on/off
rocketmq.producer.sysIsUsed=off
#rocketmq生产者配置
# 是否开启自动配置
rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.producer.groupName=${spring.application.name}
# mq的nameserver地址
rocketmq.producer.namesrvAddr=172.16.3.51:9876
# 消息最大长度 默认 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096
# 发送消息超时时间,默认 3000
rocketmq.producer.sendMsgTimeOut=30000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2
rocket-plan-topic =topic_fire_emergency_plan
rocket-equip-alarm-topic =topic_fire_equip_alarm
#规则ip配置,用于多网卡及docker镜像启动时添加
#rule.definition.local-ip=172.16.11.201
#3Dtype 分为web和ue
integrated3Dtype =web
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment