Commit 91b71334 authored by KeYong's avatar KeYong

注释掉kafka

parent 359072bb
...@@ -141,11 +141,6 @@ ...@@ -141,11 +141,6 @@
<artifactId>amos-component-security</artifactId> <artifactId>amos-component-security</artifactId>
<version>1.7.13-SNAPSHOT</version> <version>1.7.13-SNAPSHOT</version>
</dependency> </dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -107,9 +107,6 @@ public class ContingencyAction implements CustomerAction { ...@@ -107,9 +107,6 @@ public class ContingencyAction implements CustomerAction {
@Autowired @Autowired
IRocketMQService rocketMQService; IRocketMQService rocketMQService;
// @Autowired
// IKafkaProducer iKafkaProducer;
@Autowired @Autowired
private IContingencyInstance contingencyInstance; private IContingencyInstance contingencyInstance;
...@@ -240,7 +237,7 @@ public class ContingencyAction implements CustomerAction { ...@@ -240,7 +237,7 @@ public class ContingencyAction implements CustomerAction {
// 将预案的确认消息发送至中心级 // 将预案的确认消息发送至中心级
if ("CONFIRM".equals(ro.getConfirm())) { if ("CONFIRM".equals(ro.getConfirm())) {
log.info("RocketMQ与Kafka发送的主题是: " + rocketTopic + ", 消息体是: " + toipResponse.toJsonStr() + "!"); log.info("RocketMQ发送的主题是: " + rocketTopic + ", 消息体是: " + toipResponse.toJsonStr() + "!");
try { try {
rocketMQService.sendMsg(rocketTopic, "plan_process", toipResponse); rocketMQService.sendMsg(rocketTopic, "plan_process", toipResponse);
} catch (Exception e) { } catch (Exception e) {
...@@ -248,13 +245,6 @@ public class ContingencyAction implements CustomerAction { ...@@ -248,13 +245,6 @@ public class ContingencyAction implements CustomerAction {
throw new RuntimeException("RocketMQ消息发送失败!"); throw new RuntimeException("RocketMQ消息发送失败!");
} }
// kafka消息发送至南瑞平台
// try {
// iKafkaProducer.sendMessage(rocketTopic, "plan_process", toipResponse);
// } catch (Exception e) {
// log.error(e.getMessage(), e);
// throw new RuntimeException("Kafka消息发送失败!");
// }
} }
} else if ("websocket".equals(pushType.toLowerCase())) { } else if ("websocket".equals(pushType.toLowerCase())) {
action.execute(msgType, contingency); action.execute(msgType, contingency);
......
package com.yeejoin.amos.fas.business.action.mq;
/**
* @author keyong
* @title: IKafkaProducer
* <pre>
* @description:
* </pre>
* @date 2022/10/14 15:16
*/
public interface IKafkaProducer {
void sendMessage(String topic, String key, Object message);
}
package com.yeejoin.amos.fas.business.action.mq;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @author keyong
* @title: KafkaProducer
* <pre>
* @description:
* </pre>
* @date 2022/10/14 10:59
*/
@Configuration
@EnableKafka
public class KafkaProducer implements IKafkaProducer {
private final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
@Value("${kafka.producer.sysIsUsed}")
private String sysIsUsed;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private static class KafkaFutureCallback implements ListenableFutureCallback<SendResult> {
private final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
@Override
public void onFailure(Throwable ex) {
log.error("send kafka failed: {}", ex);
}
@Override
public void onSuccess(SendResult result) {
log.info("send kafka success: {}", result.getProducerRecord());
}
}
@Override
public void sendMessage(String topic, String key, Object message) {
if("on".equalsIgnoreCase(sysIsUsed)){
log.info("Kafka Topic===="+topic);
log.info("Kafka msg===="+ JSON.toJSONString(message));
kafkaTemplate.send(topic, key, message).addCallback(new KafkaFutureCallback());
}else{
log.info("Kafka SendResult====kafka.producer.sysIsUsed is not open !!!");
}
}
}
package com.yeejoin.amos.fas.business.controller; package com.yeejoin.amos.fas.business.controller;
import static com.yeejoin.amos.fas.business.constants.FasConstant.appKey;
import static com.yeejoin.amos.fas.business.constants.FasConstant.product;
import static com.yeejoin.amos.fas.business.constants.FasConstant.staticOrgCode;
import static com.yeejoin.amos.fas.business.constants.FasConstant.token;
import java.util.List;
import com.yeejoin.amos.fas.business.action.mq.IKafkaProducer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
//import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import com.yeejoin.amos.fas.business.bo.BindPointBo; import com.yeejoin.amos.fas.business.bo.BindPointBo;
import com.yeejoin.amos.fas.business.bo.BindRegionBo; import com.yeejoin.amos.fas.business.bo.BindRegionBo;
import com.yeejoin.amos.fas.business.param.RetrieveParams; import com.yeejoin.amos.fas.business.param.RetrieveParams;
...@@ -38,10 +12,16 @@ import com.yeejoin.amos.fas.common.enums.ResourceTypeDefEnum; ...@@ -38,10 +12,16 @@ import com.yeejoin.amos.fas.common.enums.ResourceTypeDefEnum;
import com.yeejoin.amos.fas.config.Permission; import com.yeejoin.amos.fas.config.Permission;
import com.yeejoin.amos.fas.core.util.CommonResponse; import com.yeejoin.amos.fas.core.util.CommonResponse;
import com.yeejoin.amos.fas.core.util.CommonResponseUtil; import com.yeejoin.amos.fas.core.util.CommonResponseUtil;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController @RestController
@RequestMapping("/api/view3d") @RequestMapping("/api/view3d")
...@@ -338,9 +318,6 @@ public class View3dController extends BaseController { ...@@ -338,9 +318,6 @@ public class View3dController extends BaseController {
@Autowired @Autowired
IRocketMQService rocketMQService; IRocketMQService rocketMQService;
@Autowired
IKafkaProducer iKafkaProducer;
@ApiOperation(value = "rocketMQ消息推送测试", notes = "rocketMQ消息推送测试") @ApiOperation(value = "rocketMQ消息推送测试", notes = "rocketMQ消息推送测试")
@PostMapping(value="rocketMQ/send") @PostMapping(value="rocketMQ/send")
...@@ -356,17 +333,17 @@ public class View3dController extends BaseController { ...@@ -356,17 +333,17 @@ public class View3dController extends BaseController {
return CommonResponseUtil.success(msg); return CommonResponseUtil.success(msg);
} }
@ApiOperation(value = "kafka消息推送测试", notes = "kafka消息推送测试") // @ApiOperation(value = "kafka消息推送测试", notes = "kafka消息推送测试")
@PostMapping(value="kafka/send") // @PostMapping(value="kafka/send")
public CommonResponse kafkaMsgSendTest(@RequestParam(name = "topic", required = false) String topic, // public CommonResponse kafkaMsgSendTest(@RequestParam(name = "topic", required = false) String topic,
@RequestBody Object msg) { // @RequestBody Object msg) {
try { // try {
iKafkaProducer.sendMessage(topic, "kafka_msg_test", JSON.toJSONString(msg)); // iKafkaProducer.sendMessage(topic, "kafka_msg_test", JSON.toJSONString(msg));
} catch (Exception e) { // } catch (Exception e) {
log.error(e.getMessage(), e); // log.error(e.getMessage(), e);
return CommonResponseUtil.failure(e.getMessage()); // return CommonResponseUtil.failure(e.getMessage());
} // }
return CommonResponseUtil.success(msg); // return CommonResponseUtil.success(msg);
} // }
} }
...@@ -8,13 +8,13 @@ spring.datasource.hikari.maximum-pool-size = 10 ...@@ -8,13 +8,13 @@ spring.datasource.hikari.maximum-pool-size = 10
spring.datasource.testWhileIdle = true spring.datasource.testWhileIdle = true
spring.datasource.validationQuery = SELECT 1 spring.datasource.validationQuery = SELECT 1
#系统服务账号,用户后端服务调用 #\u7CFB\u7EDF\u670D\u52A1\u8D26\u53F7\uFF0C\u7528\u6237\u540E\u7AEF\u670D\u52A1\u8C03\u7528
#security.password=a1234560 #security.password=a1234560
#security.loginId=fas_autosys #security.loginId=fas_autosys
amos.system.user.user-name=fas_autosys amos.system.user.user-name=fas_autosys
amos.system.user.password=a1234560 amos.system.user.password=a1234560
#应用product appkey #\u5E94\u7528product appkey
#security.productWeb=STUDIO_APP_WEB #security.productWeb=STUDIO_APP_WEB
#security.appKey =studio_normalapp_3056965 #security.appKey =studio_normalapp_3056965
amos.system.user.app-key=studio_normalapp_3056965 amos.system.user.app-key=studio_normalapp_3056965
...@@ -45,7 +45,7 @@ file.uploadUrl=C:\\upload\\files\\ ...@@ -45,7 +45,7 @@ file.uploadUrl=C:\\upload\\files\\
#picture read #picture read
file.readUrl=http://172.16.11.201:8085/file/getFile?in= file.readUrl=http://172.16.11.201:8085/file/getFile?in=
#jpush 推送开关 #jpush \u63A8\u9001\u5F00\u5173
params.isPush=false params.isPush=false
## emqx ## emqx
emqx.clean-session=true emqx.clean-session=true
...@@ -54,82 +54,63 @@ emqx.broker=tcp://172.16.11.201:1883 ...@@ -54,82 +54,63 @@ emqx.broker=tcp://172.16.11.201:1883
emqx.user-name=admin emqx.user-name=admin
emqx.password=public emqx.password=public
#文件服务器地址 #\u6587\u4EF6\u670D\u52A1\u5668\u5730\u5740
file.downLoad.url=http://172.16.11.201:9000/ file.downLoad.url=http://172.16.11.201:9000/
#应急处置移动端默认头像地址 #\u5E94\u6025\u5904\u7F6E\u79FB\u52A8\u7AEF\u9ED8\u8BA4\u5934\u50CF\u5730\u5740
plan.instance.personImg=upload/3dview_icon/plan_via.png plan.instance.personImg=upload/3dview_icon/plan_via.png
plan.instance.playImg=upload/3dview_icon/plan_play.png plan.instance.playImg=upload/3dview_icon/plan_play.png
#应急预案动作执行默认角色编码 #\u5E94\u6025\u9884\u6848\u52A8\u4F5C\u6267\u884C\u9ED8\u8BA4\u89D2\u8272\u7F16\u7801
plan.default.roleCode=Digital_Responsing_Plan_A plan.default.roleCode=Digital_Responsing_Plan_A
# 是否使用rocketmq on/off # \u662F\u5426\u4F7F\u7528rocketmq on/off
rocketmq.producer.sysIsUsed=off rocketmq.producer.sysIsUsed=off
#rocketmq生产者配置 #rocketmq\u751F\u4EA7\u8005\u914D\u7F6E
# 是否开启自动配置 # \u662F\u5426\u5F00\u542F\u81EA\u52A8\u914D\u7F6E
rocketmq.producer.isOnOff=on rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识 # \u53D1\u9001\u540C\u4E00\u7C7B\u6D88\u606F\u8BBE\u7F6E\u4E3A\u540C\u4E00\u4E2Agroup\uFF0C\u4FDD\u8BC1\u552F\u4E00\u9ED8\u8BA4\u4E0D\u9700\u8981\u8BBE\u7F6E\uFF0Crocketmq\u4F1A\u4F7F\u7528ip@pid\uFF08pid\u4EE3\u8868jvm\u540D\u5B57\uFF09\u4F5C\u4E3A\u552F\u4E00\u6807\u8BC6
rocketmq.producer.groupName=${spring.application.name} rocketmq.producer.groupName=${spring.application.name}
# mq的nameserver地址 # mq\u7684nameserver\u5730\u5740
rocketmq.producer.namesrvAddr=172.16.3.135:9876 rocketmq.producer.namesrvAddr=172.16.3.135:9876
# 消息最大长度 默认 1024 * 4 (4M) # \u6D88\u606F\u6700\u5927\u957F\u5EA6 \u9ED8\u8BA4 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096 rocketmq.producer.maxMessageSize = 4096
# 发送消息超时时间,默认 3000 # \u53D1\u9001\u6D88\u606F\u8D85\u65F6\u65F6\u95F4\uFF0C\u9ED8\u8BA4 3000
rocketmq.producer.sendMsgTimeOut=30000 rocketmq.producer.sendMsgTimeOut=30000
# 发送消息失败重试次数,默认2 # \u53D1\u9001\u6D88\u606F\u5931\u8D25\u91CD\u8BD5\u6B21\u6570\uFF0C\u9ED8\u8BA42
rocketmq.producer.retryTimesWhenSendFailed=2 rocketmq.producer.retryTimesWhenSendFailed=2
# 是否使用rocketmq on/off # \u662F\u5426\u4F7F\u7528rocketmq on/off
rocketmq.producer.sysIsUsed2=off rocketmq.producer.sysIsUsed2=off
#自定义groupName2 与 第一个name区分开 #\u81EA\u5B9A\u4E49groupName2 \u4E0E \u7B2C\u4E00\u4E2Aname\u533A\u5206\u5F00
rocketmq.producer.groupName2=groupName2 rocketmq.producer.groupName2=groupName2
# mq的nameserver地址 # mq\u7684nameserver\u5730\u5740
rocketmq.producer.namesrvAddr2=172.16.3.135:9876 rocketmq.producer.namesrvAddr2=172.16.3.135:9876
rocket-plan-topic =topic_fire_emergency_plan rocket-plan-topic =topic_fire_emergency_plan
rocket-equip-alarm-topic =topic_fire_equip_alarm rocket-equip-alarm-topic =topic_fire_equip_alarm
#规则ip配置,用于多网卡及docker镜像启动时添加 #\u89C4\u5219ip\u914D\u7F6E\uFF0C\u7528\u4E8E\u591A\u7F51\u5361\u53CAdocker\u955C\u50CF\u542F\u52A8\u65F6\u6DFB\u52A0
#rule.definition.local-ip=172.16.11.201 #rule.definition.local-ip=172.16.11.201
#3Dtype 分为web和ue #3Dtype \u5206\u4E3Aweb\u548Cue
integrated3Dtype =web integrated3Dtype =web
#数据同步开关 #\u6570\u636E\u540C\u6B65\u5F00\u5173
systemctl.sync.switch=false systemctl.sync.switch=false
#数字化应急预案V1.0.0.2版本,WEB数据组装,值为true,默认false #\u6570\u5B57\u5316\u5E94\u6025\u9884\u6848V1.0.0.2\u7248\u672C\uFF0CWEB\u6570\u636E\u7EC4\u88C5\uFF0C\u503C\u4E3Atrue\uFF0C\u9ED8\u8BA4false
plan.web.isUpdatePlanStep=false plan.web.isUpdatePlanStep=false
#地图动作区域展示 #\u5730\u56FE\u52A8\u4F5C\u533A\u57DF\u5C55\u793A
maparea.action.is-area=action1-2,action1-6 maparea.action.is-area=action1-2,action1-6
plan.dynamic.execut.topic=\u6362\u6D41\u7AD9\u6D88\u9632\u4E13\u9879\u9884\u6848/autoExec plan.dynamic.execut.topic=\u6362\u6D41\u7AD9\u6D88\u9632\u4E13\u9879\u9884\u6848/autoExec
#服务端获取isdp的token用 #\u670D\u52A1\u7AEF\u83B7\u53D6isdp\u7684token\u7528
sso.client.id=dce sso.client.id=dce
sso.client.secret=6t5oDDKhEODXa++UNUxxLHSF5kVqECq6j+wahtCbv8c= sso.client.secret=6t5oDDKhEODXa++UNUxxLHSF5kVqECq6j+wahtCbv8c=
sso.login.type=server_auth sso.login.type=server_auth
sso.login.client=dce sso.login.client=dce
sso.client.url=https://198.87.103.88:30443/oauth2/oauth/rest_token sso.client.url=https://198.87.103.88:30443/oauth2/oauth/rest_token
#Kafka 相关配置
kafka.producer.sysIsUsed = on
spring.kafka.bootstrap-servers=172.16.3.51:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
...@@ -84,50 +84,6 @@ ...@@ -84,50 +84,6 @@
alter table `f_fire_equipment` add column `ue4_rotation` text default null comment 'ue4旋转' after `position3d`; alter table `f_fire_equipment` add column `ue4_rotation` text default null comment 'ue4旋转' after `position3d`;
</sql> </sql>
</changeSet> </changeSet>
<changeSet author="suhuiguang" id="1587350593716-1">
<preConditions onFail="MARK_RAN">
<not>
<columnExists tableName="f_fire_station" columnName="ue4_location"/>
</not>
</preConditions>
<comment>f_fire_station add column ue4_location</comment>
<sql>
alter table `f_fire_station` add column `ue4_location` text default null comment 'ue4位置' after `position3d`;
</sql>
</changeSet>
<changeSet author="suhuiguang" id="1587350593716-2">
<preConditions onFail="MARK_RAN">
<not>
<columnExists tableName="f_fire_station" columnName="ue4_rotation"/>
</not>
</preConditions>
<comment>f_fire_station add column ue4_rotation</comment>
<sql>
alter table `f_fire_station` add column `ue4_rotation` text default null comment 'ue4旋转' after `position3d`;
</sql>
</changeSet>
<changeSet author="suhuiguang" id="1587350759717-1">
<preConditions onFail="MARK_RAN">
<not>
<columnExists tableName="f_water_resource" columnName="ue4_location"/>
</not>
</preConditions>
<comment>f_water_resource add column ue4_location</comment>
<sql>
alter table `f_water_resource` add column `ue4_location` text default null comment 'ue4位置' after `position3d`;
</sql>
</changeSet>
<changeSet author="suhuiguang" id="1587350759717-2">
<preConditions onFail="MARK_RAN">
<not>
<columnExists tableName="f_water_resource" columnName="ue4_rotation"/>
</not>
</preConditions>
<comment>f_water_resource add column ue4_rotation</comment>
<sql>
alter table `f_water_resource` add column `ue4_rotation` text default null comment 'ue4旋转' after `position3d`;
</sql>
</changeSet>
<changeSet author="suhuiguang" id="1587350860716-1"> <changeSet author="suhuiguang" id="1587350860716-1">
<preConditions onFail="MARK_RAN"> <preConditions onFail="MARK_RAN">
<not> <not>
...@@ -153,20 +109,6 @@ ...@@ -153,20 +109,6 @@
</sql> </sql>
</changeSet> </changeSet>
<changeSet id="1587882668719-1" author="suhuiguang">
<preConditions onFail="MARK_RAN">
<not>
<indexExists indexName="idx_type"/>
</not>
</preConditions>
<createIndex
indexName="idx_type"
tableName="f_fire_station"
tablespace="A String"
unique="false">
<column name="type"/>
</createIndex>
</changeSet>
<changeSet author="shanqiyun" id="1588067351000-1"> <changeSet author="shanqiyun" id="1588067351000-1">
<preConditions onFail="MARK_RAN"> <preConditions onFail="MARK_RAN">
<not> <not>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment