Commit 83f4203a authored by xixinzhao's avatar xixinzhao

中心级登陆后发送登录信息至站端

parent 96e04787
package com.yeejoin.amos.boot.module.jcs.api.dto;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.io.Serializable;
/**
* @author DELL
*/
@Data
@ApiModel("用户登录信息")
public class AuthDataDto implements Serializable {
private static final long serialVersionUID = 1L;
@ApiModelProperty("机构编码")
private String agencyCode;
@ApiModelProperty("登录账号")
private String loginId;
@ApiModelProperty("用户编号")
private String userId;
@ApiModelProperty("用户名")
private String userName;
@ApiModelProperty("用户类型")
private String userType;
@ApiModelProperty("来源产品")
private String actionByProduct;
@ApiModelProperty("来源IP")
private String actionByIp;
@ApiModelProperty("session状态:激活,过期")
private String sessionStatus;
@ApiModelProperty("token")
private String token;
}
package com.yeejoin.amos.boot.module.jcs.api.service;
/**
* 中心级用户登录信息同步至站端
* @author DELL
*/
public interface ICcsToStationUserInfo {
/**
* 发送登录信息至站端
* @param userInfo 用户登录后信息
*/
void sendUserInfoToStation(String userInfo);
/**
* 保存用户信息至redis
*/
void saveUserInfoToRedis() throws Exception;
}
package com.yeejoin.amos.boot.module.jcs.biz.activeMq;
import com.yeejoin.amos.boot.module.jcs.api.service.ICcsToStationUserInfo;
import com.yeejoin.amos.boot.module.jcs.biz.config.StartLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.TextMessage;
/**
* @author DELL
*/
@Component
public class QueueConsumerService {
private final Logger logger = LoggerFactory.getLogger(StartLoader.class);
@Autowired
ICcsToStationUserInfo ccsToStationUserInfo;
@JmsListener(destination = "amos.privilege.v1.STATE_GRID.AMOS_ADMIN.login", containerFactory = "jmsListenerContainerQueue")
public void message(TextMessage textMessage) {
try {
if(textMessage == null || textMessage.getText() == null){
return;
}
logger.info("收到activeMQ发送登录信息>>{}", textMessage.getText());
ccsToStationUserInfo.sendUserInfoToStation(textMessage.getText());
} catch (Exception e) {
logger.error("消费activeMQ发送登录信息失败,{}", e.getMessage());
}
}
}
package com.yeejoin.amos.boot.module.jcs.biz.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
/**
*
* @author DELL
*/
@Configuration
@IntegrationComponentScan
public class ActiveMqQueueConfig {
@Value("${spring.activemq.user}")
private String usrName;
@Value("${spring.activemq.password}")
private String password;
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
/**
* 连接MQ服务端
* @return
*/
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
}
/**
* MQ的队列监听
* @param connectionFactory
* @return
*/
@Bean("jmsListenerContainerQueue")
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
// 启用queue 不启用topic
bean.setPubSubDomain(false);
return bean;
}
}
...@@ -3,6 +3,7 @@ package com.yeejoin.amos.boot.module.jcs.biz.config; ...@@ -3,6 +3,7 @@ package com.yeejoin.amos.boot.module.jcs.biz.config;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.module.jcs.api.dto.AlertNewsDto; import com.yeejoin.amos.boot.module.jcs.api.dto.AlertNewsDto;
import com.yeejoin.amos.boot.module.jcs.api.service.ICcsToStationUserInfo;
import com.yeejoin.amos.component.rule.config.ClazzUtils; import com.yeejoin.amos.component.rule.config.ClazzUtils;
import com.yeejoin.amos.component.rule.config.RuleConfig; import com.yeejoin.amos.component.rule.config.RuleConfig;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
...@@ -35,10 +36,16 @@ public class StartLoader implements ApplicationRunner { ...@@ -35,10 +36,16 @@ public class StartLoader implements ApplicationRunner {
private String topic; private String topic;
@Value("${mqtt.topic.alert.iot.web}") @Value("${mqtt.topic.alert.iot.web}")
private String topicweb; private String topicweb;
@Autowired
ICcsToStationUserInfo ccsToStationUserInfo;
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
logger.info("開始監聽物聯警情======================================"); logger.info("開始監聽物聯警情======================================");
loadSysParams(); loadSysParams();
// 监听中心级登录信息
ccsToStationUserInfo.saveUserInfoToRedis();
} }
public void loadSysParams(){ public void loadSysParams(){
......
package com.yeejoin.amos.boot.module.jcs.biz.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeejoin.amos.boot.module.jcs.api.dto.AuthDataDto;
import com.yeejoin.amos.boot.module.jcs.api.service.ICcsToStationUserInfo;
import com.yeejoin.amos.boot.module.jcs.biz.config.StartLoader;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.typroject.tyboot.component.cache.Redis;
import org.typroject.tyboot.component.cache.enumeration.CacheType;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
/**
* 中心级用户登录信息同步实现类
* @author xxz
*/
@Service
public class CcsToStationUserInfoImpl implements ICcsToStationUserInfo {
private final Logger logger = LoggerFactory.getLogger(StartLoader.class);
private static final String SESSION = "SESSION";
private static final String SESSION_TOKEN = "SESSION_TOKEN";
private static final String SESSION_LOGINID = "SESSION_LOGINID";
private static final String SESSION_USERID = "SESSION_USERID";
private static Long DEFAULT_SESSION_EXPIRATION = 2592000L;
@Autowired
private EmqKeeper emqKeeper;
@Autowired
private RedisTemplate redisTemplate;
@Value("${mqtt.topic.ccs.user.login}")
private String ccsUserLogin;
@Value("${mqtt.topic.station.user.login}")
private String stationUserLogin;
@Override
public void sendUserInfoToStation(String userInfo) {
try {
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", stationUserLogin);
jsonObject.put("data", userInfo);
emqKeeper.getMqttClient().publish(ccsUserLogin, jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8), 0, false);
logger.info("发送用户登录信息至站端>>>>{}", userInfo);
} catch (Exception e) {
logger.error("发送登录用户信息至站端失败{}", e.getMessage());
}
}
@Override
public void saveUserInfoToRedis() {
try {
emqKeeper.subscript(stationUserLogin, 1, new EmqxListener() {
@Override
public void processMessage(String s, MqttMessage mqttMessage) {
byte[] payload = mqttMessage.getPayload();
String obj = new String(payload);
if (!ValidationUtil.isEmpty(obj)) {
JSONObject json = JSON.parseObject(obj);
JSONObject body = JSON.parseObject(JSON.toJSONString(json.get("body")));
AuthDataDto authData = JSON.parseObject(JSON.toJSONString(body.get("authData")), AuthDataDto.class);
if (!ObjectUtils.isEmpty(authData)) {
createSession(authData);
}
}
}
});
} catch (Exception e) {
logger.info("订阅中心级用户登录信息异常>>{}", e.getMessage());
}
}
private void createSession(AuthDataDto authData) {
this.removeSession(authData.getActionByProduct(),authData.getLoginId());
this.redisTemplate.opsForValue().set(sessionCacheKeyWithToken(authData.getToken(),authData.getActionByProduct()),authData);
this.redisTemplate.opsForValue().set(sessionCacheKeyWithLoginId(authData.getLoginId(),authData.getActionByProduct()),authData);
this.redisTemplate.opsForValue().set(loginIdCacheWithToken(authData.getToken()),authData.getLoginId());
this.redisTemplate.opsForValue().set(sessionCacheKeyWithUserId(authData.getUserId()),authData);
redisTemplate.expire(sessionCacheKeyWithToken(authData.getToken(),authData.getActionByProduct()),DEFAULT_SESSION_EXPIRATION, TimeUnit.SECONDS);
redisTemplate.expire(sessionCacheKeyWithLoginId(authData.getLoginId(),authData.getActionByProduct()),DEFAULT_SESSION_EXPIRATION, TimeUnit.SECONDS);
redisTemplate.expire(loginIdCacheWithToken(authData.getToken()),DEFAULT_SESSION_EXPIRATION, TimeUnit.SECONDS);
redisTemplate.expire(sessionCacheKeyWithUserId(authData.getUserId()),DEFAULT_SESSION_EXPIRATION, TimeUnit.SECONDS);
}
private void removeSession(String actionByProduct ,String loginId) {
AuthDataDto sessionsModel = (AuthDataDto)this.redisTemplate.opsForValue().get(sessionCacheKeyWithLoginId(loginId,actionByProduct));
if(!ObjectUtils.isEmpty(sessionsModel)) {
this.redisTemplate.delete(sessionCacheKeyWithLoginId(loginId,actionByProduct));
this.redisTemplate.delete(sessionCacheKeyWithToken(sessionsModel.getToken(),actionByProduct));
AuthDataDto model = queryByUserId(sessionsModel.getUserId());
if(!ObjectUtils.isEmpty(model) && model.getToken().equals(sessionsModel.getToken())) {
this.redisTemplate.delete(sessionCacheKeyWithUserId(sessionsModel.getUserId()));
}
}
}
private static String sessionCacheKeyWithToken(String token,String actionByProduct) {
return Redis.genKey(CacheType.ERASABLE.name(),SESSION_TOKEN,actionByProduct,token);
}
private static String sessionCacheKeyWithLoginId(String loginId,String actionByProduct) {
return Redis.genKey(CacheType.ERASABLE.name(),SESSION,actionByProduct,loginId);
}
private static String loginIdCacheWithToken(String token) {
return Redis.genKey(CacheType.ERASABLE.name(),SESSION_LOGINID,token);
}
private static String sessionCacheKeyWithUserId(String userId) {
return Redis.genKey(CacheType.ERASABLE.name(),SESSION_USERID,userId);
}
private AuthDataDto queryByUserId(String userId) {
return (AuthDataDto) this.redisTemplate.opsForValue().get(sessionCacheKeyWithUserId(userId));
}
}
#注册中心地址 #注册中心地址
eureka.client.service-url.defaultZone =http://172.16.11.201:10001/eureka/ eureka.client.service-url.defaultZone =http://172.16.10.216:10001/eureka/
eureka.instance.prefer-ip-address=true eureka.instance.prefer-ip-address=true
management.endpoint.health.show-details=always management.endpoint.health.show-details=always
management.endpoints.web.exposure.include=* management.endpoints.web.exposure.include=*
eureka.instance.health-check-url-path=/actuator/health eureka.instance.health-check-url-path=/actuator/health
eureka.instance.metadata-map.management.context-path=${server.servlet.context-path}/actuator eureka.instance.metadata-map.management.context-path=${server.servlet.context-path}/actuator
eureka.instance.status-page-url-path=/actuator/info eureka.instance.status-page-url-path=/actuator/info
eureka.instance.metadata-map.management.api-docs=http://172.16.11.201:${server.port}${server.servlet.context-path}/swagger-ui.html eureka.instance.metadata-map.management.api-docs=http://172.16.10.216:${server.port}${server.servlet.context-path}/swagger-ui.html
# kafka集群信息 # kafka集群信息
...@@ -74,7 +74,7 @@ management.health.redis.enabled=false ...@@ -74,7 +74,7 @@ management.health.redis.enabled=false
## emqx ## emqx
emqx.clean-session=true emqx.clean-session=true
emqx.client-id=${spring.application.name}-${random.int[1024,65536]} emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
emqx.broker=tcp://172.16.11.201:1883 emqx.broker=tcp://172.16.10.216:1883
emqx.client-user-name=admin emqx.client-user-name=admin
emqx.client-password=public emqx.client-password=public
emqx.max-inflight=1000 emqx.max-inflight=1000
...@@ -83,16 +83,17 @@ emqx.max-inflight=1000 ...@@ -83,16 +83,17 @@ emqx.max-inflight=1000
# 下面个配置默认站端 中心级系统的时候注释掉上边 放开下边 # 下面个配置默认站端 中心级系统的时候注释掉上边 放开下边
#站端配置 #站端配置
#需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 #需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
kafka.topics=null #kafka.topics=null
#需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created, #需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created,
emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created,emq.risk.created #emq.topic=emq.xf.created,emq.iot.created,emq.patrol.created,emq.sign.created,emq.bussSign.created,emq.user.created,emq.risk.created
##中心级配置配置 ##中心级配置配置
##需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 ##需要监听得kafka消息主题 根据是否是中心极和站端选择需要监听得主题进行配置
#kafka.topics=JKXT2BP-XFYY-Topic kafka.topics=JKXT2BP-XFYY-Topic
# #
##需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created, ##需要监听得eqm消息主题 根据是否是中心极和站端选择需要监听得主题进行配置 emq.iot.created,
#emq.topic= emq.topic=ccs-user-login-info,sync.execute
queue.kafka.topics=null queue.kafka.topics=null
kafka.auto-startup=false kafka.auto-startup=false
\ No newline at end of file
...@@ -43,5 +43,15 @@ ...@@ -43,5 +43,15 @@
"code": "equipQrcode", "code": "equipQrcode",
"emqTopic": "emq.mcb.zxj", "emqTopic": "emq.mcb.zxj",
"akkaTopic": "JKXT2BP-XFYY-Topic" "akkaTopic": "JKXT2BP-XFYY-Topic"
},
{
"code": "ccsLoginInfo",
"emqTopic": "ccs-user-login-info",
"akkaTopic": "JKXT2BP-XFZX-Topic"
},
{
"code": "syncExecute",
"emqTopic": "sync.execute",
"akkaTopic": "JKXT2BP-XFZX-Topic"
} }
] ]
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment