Commit a617bdbc authored by maoying's avatar maoying

添加rocketmq

parent f0b6a25e
......@@ -108,12 +108,17 @@
<artifactId>itextpdf</artifactId>
<version>5.5.13</version>
</dependency>
<!-- rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
<!--<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
</dependency>-->
</dependencies>
......
......@@ -26,6 +26,7 @@ import com.yeejoin.amos.fas.business.service.impl.RuleRunigSnapshotServiceImpl;
import com.yeejoin.amos.fas.business.service.intfc.IContingencyInstance;
import com.yeejoin.amos.fas.business.service.intfc.IEquipmentService;
import com.yeejoin.amos.fas.business.service.intfc.IRiskSourceService;
import com.yeejoin.amos.fas.business.service.intfc.IRocketMQService;
import com.yeejoin.amos.fas.business.service.model.ContingencyDeviceStatus;
import com.yeejoin.amos.fas.business.service.model.ToipResponse;
import com.yeejoin.amos.fas.business.util.CacheFactory;
......@@ -40,7 +41,6 @@ import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import com.yeejoin.amos.fas.dao.entity.Equipment;
import com.yeejoin.amos.fas.dao.entity.PlanDetail;
import com.yeejoin.amos.fas.dao.entity.PlanOperationRecord;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
//import com.yeejoin.amos.fas.business.service.intfc.FireStengthService;
@Component
@RuleActionBean(beanLabel = "动态预案" )
......@@ -86,8 +85,6 @@ public class ContingencyAction implements CustomerAction {
@Autowired
private IContingencyInstance iContingencyInstance;
// @Autowired
// private FireStengthService fireStrengthService;
@Autowired
private ContingencyLogPublisher contingencyLogPublisher;
......@@ -104,10 +101,11 @@ public class ContingencyAction implements CustomerAction {
private RedisTemplate redisTemplate;
@Autowired
private RocketMQTemplate rocketMQTemplate;
IRocketMQService rocketMQService;
@Value("${rocket-plan-topic}")
private String rocketTopic;
private static Map<String, String> OPERATE_RECORD_ID = new HashMap<>();
......@@ -301,22 +299,9 @@ public class ContingencyAction implements CustomerAction {
String msg = JSON.toJSONString(event);
log.info("RocketMQ发送的主题是: " + rocketTopic + ", 消息体是: " + msg + "!");
try {
FirePlanAlarmBo fpab = new FirePlanAlarmBo();
fpab.setDeviceId(ro.getEquipmentCode());
fpab.setDeviceName(ro.getEquipmentName());
fpab.setFiredeviceId(ro.getFireEquipmentCode());
fpab.setFiredeviceName(ro.getFireEquipmentName());
fpab.setStationId("00602");
fpab.setWarningId(ro.getBatchNo());
fpab.setWarningInfo(ro.getFireEquipmentName()+"发生火灾报警");
fpab.setWarningTime(DateUtil.getLongCurrentDate());
List<FirePlanAlarmBo> list = new ArrayList<FirePlanAlarmBo>();
list.add(fpab);
rocketMQTemplate.convertAndSend("/topic/fire/equip-alarm", JSON.toJSONString(list));
rocketMQTemplate.convertAndSend(rocketTopic, msg);
rocketMQService.sendMsg(rocketTopic, "plan_process", msg);
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new RuntimeException("RocketMQ消息发送失败!");
}
}
......
package com.yeejoin.amos.fas.business.controller;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.typroject.tyboot.core.foundation.context.RequestContext;
import com.yeejoin.amos.fas.business.action.CustomerAction;
import com.yeejoin.amos.fas.business.bo.FirePlanAlarmBo;
import com.yeejoin.amos.fas.business.dao.mapper.View3dMapper;
import com.yeejoin.amos.fas.business.service.impl.RiskSourceServiceImpl;
import com.yeejoin.amos.fas.business.service.intfc.IContingencyInstance;
import com.yeejoin.amos.fas.business.service.intfc.IContingencyOriginalDataService;
import com.yeejoin.amos.fas.business.service.intfc.IEquipmentService;
import com.yeejoin.amos.fas.business.service.intfc.IRocketMQService;
import com.yeejoin.amos.fas.business.service.intfc.IRuleRunningSnapshotService;
import com.yeejoin.amos.fas.config.Permission;
import com.yeejoin.amos.fas.core.common.request.CommonPageable;
......@@ -15,20 +41,10 @@ import com.yeejoin.amos.fas.core.util.CommonResponse;
import com.yeejoin.amos.fas.core.util.CommonResponseUtil;
import com.yeejoin.amos.fas.core.util.DateUtil;
import com.yeejoin.amos.fas.dao.entity.Equipment;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.*;
import org.typroject.tyboot.core.foundation.context.RequestContext;
import java.util.*;
@RestController
@RequestMapping(value = "/api/timeline")
......@@ -56,6 +72,14 @@ public class TimeLineController extends BaseController{
private RedisTemplate redisTemplate;
@Autowired
private IRuleRunningSnapshotService ruleRunningSnapshotService;
@Autowired
IRocketMQService rocketMQService;
@Value("${rocket-equip-alarm-topic}")
private String rocketTopicFireEquipAlarm;
@Autowired
private View3dMapper view3dMapper;
@Permission
//@Authorization(ingore = true)
......@@ -87,11 +111,22 @@ public class TimeLineController extends BaseController{
map.put("token",getToken());
map.put("product",getProduct());
fireQueue.addLast(map);
//应急指挥给总部推送消息
sendPlanAlarm(batchNo,buttonCode);
return CommonResponseUtil.success("SUCCESS");
}
private void sendPlanAlarm(String batchNo ,String buttonCode){
//确警后推送报警数据
if("FIRE_CONFIRM".equals(buttonCode)){
FirePlanAlarmBo firePlanAlarm = view3dMapper.getPlanAlarmInfo(batchNo);
List<FirePlanAlarmBo> list = new ArrayList<FirePlanAlarmBo>();
list.add(firePlanAlarm);
rocketMQService.sendMsg(rocketTopicFireEquipAlarm,"plan_alarm", list);
}
}
@Permission
//@Authorization(ingore = true)
@ApiOperation(httpMethod = "PUT",value = "点击按钮", notes = "点击按钮")
@RequestMapping(value = "/fire/exit", produces = "application/json;charset=UTF-8", method = RequestMethod.PUT)
public CommonResponse fireExit(@RequestParam("batchNo") String batchNo,
......
package com.yeejoin.amos.fas.business.controller;
import static com.yeejoin.amos.fas.business.constants.FasConstant.appKey;
import static com.yeejoin.amos.fas.business.constants.FasConstant.product;
import static com.yeejoin.amos.fas.business.constants.FasConstant.staticOrgCode;
import static com.yeejoin.amos.fas.business.constants.FasConstant.token;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
//import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
......@@ -15,10 +24,12 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.fas.business.bo.BindPointBo;
import com.yeejoin.amos.fas.business.bo.BindRegionBo;
import com.yeejoin.amos.fas.business.param.RetrieveParams;
import com.yeejoin.amos.fas.business.service.intfc.IRiskSourceService;
import com.yeejoin.amos.fas.business.service.intfc.IRocketMQService;
import com.yeejoin.amos.fas.business.service.intfc.IView3dService;
import com.yeejoin.amos.fas.business.vo.ExceptionRegionVo;
import com.yeejoin.amos.fas.business.vo.ReginParams;
......@@ -31,12 +42,13 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import static com.yeejoin.amos.fas.business.constants.FasConstant.*;
@RestController
@RequestMapping("/api/view3d")
@Api(tags="全景监控api")
public class View3dController extends BaseController {
private final Logger log = LoggerFactory.getLogger(DictController.class);
@Autowired
private IRiskSourceService riskSourceService;
@Autowired
......@@ -321,15 +333,16 @@ public class View3dController extends BaseController {
}
@Autowired
private RocketMQTemplate rocketMQTemplate;
IRocketMQService rocketMQService;
@ApiOperation(value = "rocketMQ消息推送测试", notes = "rocketMQ消息推送测试")
@PostMapping(value="rocketMQ/send")
public CommonResponse rocketMQTemplate(@RequestParam(required = true, defaultValue = "all") String topic,
@RequestBody Map msg) {
@RequestBody Object msg) {
try {
rocketMQTemplate.convertAndSend(topic, msg);
rocketMQService.sendMsg(topic,"test_msg",msg);
} catch (Exception e) {
log.error(e.getMessage(), e);
return CommonResponseUtil.failure(e.getMessage());
}
return CommonResponseUtil.success(msg);
......
package com.yeejoin.amos.fas.business.dao.mapper;
import com.yeejoin.amos.fas.business.bo.CheckErrorBo;
import com.yeejoin.amos.fas.business.bo.FirePlanAlarmBo;
import com.yeejoin.amos.fas.business.bo.RiskPointRpnChangeBo;
import com.yeejoin.amos.fas.business.bo.SafetyExecuteBo;
import com.yeejoin.amos.fas.business.vo.View3dNodeVo;
......@@ -177,4 +178,6 @@ public interface View3dMapper extends BaseMapper {
* @return list
*/
List<Map<String, Object>> getAllPointInRegions(@Param("ids") List<Long> ids);
FirePlanAlarmBo getPlanAlarmInfo(@Param("batchNo")String batchNo);
}
package com.yeejoin.amos.fas.business.feign;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: lockie
* @Date: 2020/4/21 10:28
* @Description: mq生产者配置
*/
@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MQProducerConfigure {
public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);
private String groupName;
private String namesrvAddr;
// 消息最大值
private Integer maxMessageSize;
// 消息发送超时时间
private Integer sendMsgTimeOut;
// 失败重试次数
private Integer retryTimesWhenSendFailed;
/**
* mq 生成者配置
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")
public DefaultMQProducer defaultProducer() throws MQClientException {
LOGGER.info("defaultProducer 正在创建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(namesrvAddr);
producer.setVipChannelEnabled(false);
producer.setMaxMessageSize(maxMessageSize);
producer.setSendMsgTimeout(sendMsgTimeOut);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
producer.start();
LOGGER.info("rocketmq producer server 开启成功----------------------------------");
return producer;
}
}
......@@ -27,7 +27,7 @@ import com.yeejoin.amos.fas.common.enums.EquipmentRiskTypeEnum;
import com.yeejoin.amos.fas.core.util.StringUtil;
import com.yeejoin.amos.fas.dao.entity.*;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
//import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
......@@ -136,8 +136,8 @@ public class HandlerMqttMessageImpl implements IEquipmentHandlerService {
@Autowired
IContingencyPlanService iContingencyPlanService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
//@Autowired
//private RocketMQTemplate rocketMQTemplate;
@Value("${rocket-plan-topic}")
private String rocketTopic;
......@@ -177,24 +177,25 @@ public class HandlerMqttMessageImpl implements IEquipmentHandlerService {
if(topicEntity.getSimulationDate().equals("false") && nameKeys.contains(typeCode) && indexStateIsChange(equipmentSpecificIndex)){
log.info("指标值没有发生变化: " + equipmentSpecificIndex.getIotCode()+"-"+equipmentSpecificIndex.getNameKey()+":"+equipmentSpecificIndex.getValue());
// 三维屏指标状态推送
String msg = JSON.toJSONString(equipmentSpecificIndex);
equipmentSpecificIndex.setId(equipmentSpecific.getId());
equipmentSpecificIndex.setName(equipmentSpecific.getName());
equipmentSpecificIndex.setCode(equipmentSpecific.getCode());
equipmentSpecificIndex.setType("equip");
String msg = JSON.toJSONString(equipmentSpecificIndex);
String title = String.format("/%s/%s", serviceName, "data/refresh/indexStatus");
webMqttComponent.publish(title, msg);
// 中心级系统指标推送消息
if ("FIREALARM".equals(typeCode) && "true".equals(equipmentSpecificIndex.getValue()) && PlanFlagBo.getFlag()) {
log.info("RocketMQ发送的主题是: " + rocketTopic + ", 消息体是: " + msg + "!");
try {
rocketMQTemplate.convertAndSend(rocketTopic, msg);
PlanFlagBo.setFlag(Boolean.FALSE);
} catch (Exception e) {
throw new RuntimeException("RocketMQ消息发送失败!");
}
}
// // 中心级系统指标推送消息
// if ("FIREALARM".equals(typeCode) && "true".equals(equipmentSpecificIndex.getValue()) && PlanFlagBo.getFlag()) {
// log.info("RocketMQ发送的主题是: " + rocketTopic + ", 消息体是: " + msg + "!");
// try {
// // rocketMQTemplate.convertAndSend(rocketTopic, msg);
// PlanFlagBo.setFlag(Boolean.FALSE);
// } catch (Exception e) {
// throw new RuntimeException("RocketMQ消息发送失败!");
// }
// }
}
Equipment equipment = topicEntity.getEquipment()==null?impAndFireEquipMapper.queryImpEqumtByFireEquipmt(eqSpecId):topicEntity.getEquipment();
......
package com.yeejoin.amos.fas.business.service.impl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.fas.business.service.intfc.IRocketMQService;
@Service("rocketMQService")
public class RocketMQService implements IRocketMQService {
private final Logger log = LoggerFactory.getLogger(RocketMQService.class);
@Autowired
DefaultMQProducer defaultMQProducer;
public void sendMsg(String topic, String tag, Object msg){
try {
log.info("rocketMQtopic===="+topic);
log.info("rocketMQmsg===="+JSON.toJSONString(msg).toString());
Message sendMsg = new Message(topic, tag, JSON.toJSONString(msg).getBytes());
SendResult sendResult = defaultMQProducer.send(sendMsg);
log.info("rocketMQsendResult===="+JSON.toJSONString(sendResult).toString());
} catch (Exception e) {
// TODO Auto-generated catch block
log.error(e.getMessage(), e);
e.printStackTrace();
}
}
}
package com.yeejoin.amos.fas.business.service.intfc;
public interface IRocketMQService {
void sendMsg(String topic, String tag, Object msg);
}
......@@ -51,10 +51,21 @@ emqx.user-name=admin
emqx.password=public
#rocketmq生产者配置
rocketmq.name-server=172.16.3.51:9876
rocketmq.producer.group=my-group
rocketmq.producer.sendMessageTimeout=300000
rocket-plan-topic=/topic/fire/emergency-plan
# 是否开启自动配置
rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.producer.groupName=${spring.application.name}
# mq的nameserver地址
rocketmq.producer.namesrvAddr=172.16.3.51:9876
# 消息最大长度 默认 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096
# 发送消息超时时间,默认 3000
rocketmq.producer.sendMsgTimeOut=30000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2
rocket-plan-topic =topic_fire_emergency_plan
rocket-equip-alarm-topic =topic_fire_equip_alarm
#文件服务器地址
file.downLoad.url=http://172.16.11.201:9000/
......
spring.application.name = AMOS-AUTOSYS
spring.application.name = AMOS-AUTOSYS-my
server.servlet.context-path=/fireAutoSys
server.port = 8085
#environment
......
......@@ -1474,54 +1474,39 @@
</where>
LIMIT ${start},${length}
</select>
<!-- <select id="getAllPointInRegions" resultType="java.util.Map">-->
<!-- select-->
<!-- CONCAT(type,'-',id) as `key`,-->
<!-- id as pointId,-->
<!-- name,-->
<!-- type,-->
<!-- risk_source_id as regionId,-->
<!-- position3d-->
<!-- from-->
<!-- (select id,name,'riskSource' as type, parent_id as risk_source_id,position3d-->
<!-- from f_risk_source where is_region = 'FALSE'-->
<!-- UNION all-->
<!-- select id,name,'patrol' as type,risk_source_id,coordinates as position3d-->
<!-- from p_point WHERE is_delete = FALSE-->
<!-- UNION all-->
<!-- select id,name ,'impEquipment' as type,risk_source_id,position3d-->
<!-- from f_equipment e-->
<!-- UNION all-->
<!-- select id,name,'monitorEquipment' as type,risk_source_id,position3d-->
<!-- from f_fire_equipment where equip_classify = 0-->
<!-- UNION all-->
<!-- select id,name,'video' as type,risk_source_id,position3d-->
<!-- from f_fire_equipment where equip_classify = 2-->
<!-- UNION all-->
<!-- select id,name ,'hydrant' as type,risk_source_id,position3d-->
<!-- from f_water_resource where type = 1-->
<!-- UNION all-->
<!-- select id,name,'pool' as type,risk_source_id,position3d-->
<!-- from f_water_resource where type = 2-->
<!-- UNION all-->
<!-- select id,name,'fireCar' as type,risk_source_id,position3d-->
<!-- from f_fire_car-->
<!-- UNION all-->
<!-- select id,name,'fireEquipment' as type,risk_source_id,position3d-->
<!-- from f_fire_equipment where equip_classify = 3-->
<!-- UNION all-->
<!-- select id,name,'fireChamber' as type,risk_source_id,position3d-->
<!-- from f_fire_station where type = 2-->
<!-- UNION all-->
<!-- select id,name,'fireFoamRoom' as type,risk_source_id,position3d-->
<!-- from f_fire_station where type = 1-->
<!-- ) as sp-->
<!-- where-->
<!-- position3d <![CDATA[<>]]> '' and-->
<!-- risk_source_id in-->
<!-- <foreach collection="ids" open="(" separator="," close=")" item="id">-->
<!-- #{id}-->
<!-- </foreach>-->
<!-- order by regionId-->
<!-- </select>-->
<select id="getPlanAlarmInfo" resultType="com.yeejoin.amos.fas.business.bo.FirePlanAlarmBo">
SELECT
'00602' stationId,
cod.batch_No,
cod.equipment_Name AS deviceName,
fe.`code` AS deviceId,
cod.equipment_Id,
wes.`code` AS FiredeviceId,
cod.fire_Equipment_Id,
cod.fire_Equipment_Name AS FiredeviceName,
wesa.create_date AS warningTime,
IF (
ISNULL(wesa.id),
'',
CONCAT(
'位于',
wesa.location,
' ',
wes.`name`,
'发生',
wesa.equipment_specific_index_name
)
) AS warningInfo
FROM
`contingency_original_data` cod
LEFT JOIN f_equipment fe ON fe.id = cod.equipment_Id
LEFT JOIN wl_equipment_specific wes ON wes.id = cod.fire_Equipment_Id
LEFT JOIN wl_equipment_specific_alarm wesa ON wesa.equipment_specific_id = wes.id
WHERE
cod.batch_No = #{batchNo}
ORDER BY
wesa.create_date DESC
LIMIT 1
</select>
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment