Commit 4aee40fe authored by xixinzhao's avatar xixinzhao

稳压泵启动时长消息推送

parent 6dc848ac
......@@ -7,7 +7,8 @@ import java.util.Map;
public enum PressurePumpEnum {
// ALONE_START_YXSC("FHS_PressurePump_Start_ALONE_START_YXSC", "ge", "5", "", "aaa", "bbb", "last"),
ALONE_START_YXSC("FHS_PressurePump_Start_ALONE_START_YXSC","", "0 0/1 * * * ?", "5",
PressurePumpValueEnum.PUMP_START_TIME.getCode(), PressurePumpMessageEnum.MESSAGE_LEVEL_QT_WJ_YXSC.getCode()),
ALONE_START_QT("FHS_PressurePump_Start_ALONE_START_QT", PressurePumpCheckEnum.LE.getCode(), "", "5",
PressurePumpValueEnum.LAST_STOP.getCode(), PressurePumpMessageEnum.MESSAGE_LEVEL_QT_WJ.getCode()),
ALONE_STOP_QT("FHS_PressurePump_Stop_ALONE_STOP_QT", PressurePumpCheckEnum.GE.getCode(), "5", "",
......
......@@ -5,6 +5,7 @@ public enum PressurePumpValueEnum {
LAST_STOP("lastStop", "上次停泵时间"),
LAST_START("lastStart", "上次启泵时间"),
LATELY_STOP("latelyStop", "所有泵最近一次停泵时间"),
PUMP_START_TIME("pumpStartTime", "计算启动时常"),
LATELY_START("latelyStart", "所有泵最近一次启泵时间");
private String code;
......
package com.yeejoin.equipmanage.quartz;
import com.yeejoin.amos.feign.systemctl.model.MessageModel;
import com.yeejoin.equipmanage.common.entity.EquipmentSpecific;
import com.yeejoin.equipmanage.common.enums.PressurePumpEnum;
import com.yeejoin.equipmanage.common.enums.PressurePumpMessageEnum;
import com.yeejoin.equipmanage.common.utils.DateUtils;
import com.yeejoin.equipmanage.common.utils.StringUtil;
import com.yeejoin.equipmanage.common.vo.Token;
import com.yeejoin.equipmanage.fegin.SystemctlFeign;
import com.yeejoin.equipmanage.remote.RemoteSecurityService;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import java.text.SimpleDateFormat;
import java.util.*;
@Slf4j
public class PumpSendMessage implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
Map jobDataMap = (Map) jobExecutionContext.getJobDetail().getJobDataMap().get("parameterList");
String jobName = String.valueOf(jobDataMap.get("jobName"));
String triggerName = String.valueOf(jobDataMap.get("triggerName"));
String triggerGroupName = String.valueOf(jobDataMap.get("triggerGroupName"));
String jobGroupName = String.valueOf(jobDataMap.get("jobGroupName"));
EquipmentSpecific equipmentSpecific = (EquipmentSpecific)jobDataMap.get("equipmentSpecific");
PressurePumpEnum pressurePumpEnum = (PressurePumpEnum)jobDataMap.get("pressurePumpEnum");
SystemctlFeign systemctlFeign = (SystemctlFeign)jobDataMap.get("systemctlFeign");
RemoteSecurityService remoteSecurityService = (RemoteSecurityService)jobDataMap.get("remoteSecurityService");
MessageModel model = new MessageModel();
model.setSendTime(new Date());
String body = "";
Map<String, String> map = new HashMap<>(4);
String allMessage = PressurePumpMessageEnum.MESSAGE_LEVEL_QT_WJ_YXSC.getAllMessage();
if (StringUtil.isNotEmpty(allMessage)) {
String content = String.format(allMessage, pressurePumpEnum.getRightValue());
map.put("content", content);
map.put("type", "漏水提醒");
map.put("time", new SimpleDateFormat(DateUtils.DATE_TIME_PATTERN).format(new Date()));
map.put("name", equipmentSpecific.getName());
}
String recordMessage = PressurePumpMessageEnum.MESSAGE_LEVEL_QT_WJ_YXSC.getRecordMessage();
if (StringUtil.isNotEmpty(recordMessage)) {
body = String.format(recordMessage,new SimpleDateFormat(DateUtils.DATE_TIME_PATTERN).format(new Date()), equipmentSpecific.getName(), equipmentSpecific.getPosition(), pressurePumpEnum.getRightValue());
}
model.setBody(body);
model.setExtras(map);
model.setMsgType("pressurePump");
model.setIsSendApp(false);
model.setTerminal("WEB");
model.setIsSendWeb(true);
model.setCategory(1);
List<String> receive = new ArrayList<>();
receive.add("system");
model.setRecivers(receive);
Token token = remoteSecurityService.getServerToken();
systemctlFeign.create(token.getAppKey(), token.getProduct(), token.getToke(), model);
QuartzManager.removeJob(jobName,jobGroupName,triggerName,triggerGroupName);
} catch (Exception e) {
log.error("稳压泵运行时常消息发送失败"+ e.getMessage());
}
}
}
package com.yeejoin.equipmanage.quartz;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import java.util.Map;
public class QuartzManager {
private static SchedulerFactory gSchedulerFactory = new StdSchedulerFactory();
private static String JOB_GROUP_NAME = "EQUIP_JOBGROUP_NAME";
private static String TRIGGER_GROUP_NAME = "EQUIP_TRIGGERGROUP_NAME";
/**
* @Description: 添加一个定时任务,使用默认的任务组名,触发器名,触发器组名
* @param jobName 任务名
* @param cls 任务
* @param time 时间设置,参考quartz说明文档
*/
public static void addJob(String jobName, Class cls, String time,Object scheduleJob) {
try {
Scheduler sched = gSchedulerFactory.getScheduler();
JobDetail job = JobBuilder.newJob(cls)
.withIdentity(jobName, JOB_GROUP_NAME)
.build();
// 添加具体任务方法
job.getJobDataMap().put("scheduleJob", scheduleJob);
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(time);
// 按新的cronExpression表达式构建一个新的trigger
Trigger trigger = TriggerBuilder
.newTrigger()
.withIdentity(jobName, TRIGGER_GROUP_NAME)
.withSchedule(scheduleBuilder).build();
//交给scheduler去调度
sched.scheduleJob(job, trigger);
// 启动
if (!sched.isShutdown()) {
sched.start();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description: 添加一个定时任务
* @param jobName 任务名
* @param jobGroupName 任务组名
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @param jobClass 任务
* @param time 时间设置,参考quartz说明文档
*/
public static void addJob(String jobName, String jobGroupName,
String triggerName, String triggerGroupName, Class jobClass,
String time, Map<String,Object> parameter) {
try {
Scheduler sched = gSchedulerFactory.getScheduler();
JobDetail job = JobBuilder.newJob(jobClass)
.withIdentity(jobName, jobGroupName)
.build();
job.getJobDataMap().put("parameterList", parameter);
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(time);
// 按新的cronExpression表达式构建一个新的trigger
Trigger trigger = TriggerBuilder
.newTrigger()
.withIdentity(triggerName, triggerGroupName)
.withSchedule(scheduleBuilder).build();
sched.scheduleJob(job, trigger);
// 启动
if (!sched.isShutdown()) {
sched.start();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description: 修改一个任务的触发时间(使用默认的任务组名,触发器名,触发器组名)
* @param jobName
* @param time
*/
public static void modifyJobTime(String jobName, String time) {
TriggerKey triggerKey = TriggerKey.triggerKey(
jobName, TRIGGER_GROUP_NAME);
try {
Scheduler sched = gSchedulerFactory.getScheduler();
CronTrigger trigger =(CronTrigger) sched.getTrigger(triggerKey);
if (trigger == null) {
return;
}
String oldTime = trigger.getCronExpression();
if (!oldTime.equalsIgnoreCase(time)) {
CronScheduleBuilder scheduleBuilder =CronScheduleBuilder.cronSchedule(time);
//按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
.withSchedule(scheduleBuilder).build();
//按新的trigger重新设置job执行
sched.rescheduleJob(triggerKey, trigger);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description: 修改一个任务的触发时间
* @param triggerName
* @param triggerGroupName
* @param time
* @author qgw
* @date 2016年1月27日 下午4:45:15 ^_^
*/
public static void modifyJobTime(String triggerName,String triggerGroupName, String time) {
TriggerKey triggerKey = TriggerKey.triggerKey(
triggerName, triggerGroupName);
try {
Scheduler sched = gSchedulerFactory.getScheduler();
CronTrigger trigger = (CronTrigger) sched.getTrigger(triggerKey);
if (trigger == null) {
return;
}
String oldTime = trigger.getCronExpression();
if (!oldTime.equalsIgnoreCase(time)) {
// trigger已存在,则更新相应的定时设置
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder
.cronSchedule(time);
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey)
.withSchedule(scheduleBuilder).build();
// 按新的trigger重新设置job执行
sched.resumeTrigger(triggerKey);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description 移除一个任务(使用默认的任务组名,触发器名,触发器组名)
* @param jobName
* @author qgw
* @date 2016年1月29日 下午2:21:16 ^_^
*/
public static void removeJob(String jobName) {
TriggerKey triggerKey = TriggerKey.triggerKey(
jobName, TRIGGER_GROUP_NAME);
JobKey jobKey = JobKey.jobKey(jobName, JOB_GROUP_NAME);
try {
Scheduler sched = gSchedulerFactory.getScheduler();
Trigger trigger = (Trigger) sched.getTrigger(triggerKey);
if (trigger == null) {
return;
}
sched.pauseTrigger(triggerKey);;// 停止触发器
sched.unscheduleJob(triggerKey);// 移除触发器
sched.deleteJob(jobKey);// 删除任务
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description: 移除一个任务
* @param jobName
* @param jobGroupName
* @param triggerName
* @param triggerGroupName
* @author qgw
* @date 2016年1月29日 下午2:21:16 ^_^
*/
public static void removeJob(String jobName, String jobGroupName,
String triggerName, String triggerGroupName) {
TriggerKey triggerKey = TriggerKey.triggerKey(
jobName, triggerGroupName);
JobKey jobKey = JobKey.jobKey(jobName, jobGroupName);
try {
Scheduler sched = gSchedulerFactory.getScheduler();
sched.pauseTrigger(triggerKey);// 停止触发器
sched.unscheduleJob(triggerKey);// 移除触发器
sched.deleteJob(jobKey);// 删除任务
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description:暂停一个任务
* @param jobName
* @param jobGroupName
*/
public static void pauseJob(String jobName, String jobGroupName) {
JobKey jobKey =JobKey.jobKey(jobName, jobName);
try {
Scheduler sched = gSchedulerFactory.getScheduler();
sched.pauseJob(jobKey);
} catch (SchedulerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* @Description:暂停一个任务(使用默认组名)
* @param jobName
*/
public static void pauseJob(String jobName) {
JobKey jobKey =JobKey.jobKey(jobName, JOB_GROUP_NAME);
try {
Scheduler sched = gSchedulerFactory.getScheduler();
sched.pauseJob(jobKey);
} catch (SchedulerException e) {
e.printStackTrace();
}
}
/**
* @Description:启动所有定时任务
* @author qgw
* @date 2016年1月29日 下午2:21:16 ^_^
*/
public static void startJobs() {
try {
Scheduler sched = gSchedulerFactory.getScheduler();
sched.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description 关闭所有定时任务
* @author qgw
* @date 2016年1月25日 下午2:26:54 ^_^
*/
public static void shutdownJobs() {
try {
Scheduler sched = gSchedulerFactory.getScheduler();
if (!sched.isShutdown()) {
sched.shutdown();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 任务是否存在
* @param jobName 任务名
* @param jobGroupName 任务组名
* @return 是否存在
*/
public static boolean checkExists(String jobName, String jobGroupName){
try {
Scheduler sched = gSchedulerFactory.getScheduler();
return sched.checkExists( new JobKey(jobName,
jobGroupName));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
......@@ -19,6 +19,8 @@ import com.yeejoin.equipmanage.common.utils.StringUtil;
import com.yeejoin.equipmanage.common.vo.*;
import com.yeejoin.equipmanage.fegin.SystemctlFeign;
import com.yeejoin.equipmanage.mapper.*;
import com.yeejoin.equipmanage.quartz.PumpSendMessage;
import com.yeejoin.equipmanage.quartz.QuartzManager;
import com.yeejoin.equipmanage.remote.RemoteSecurityService;
import com.yeejoin.equipmanage.service.*;
import com.yeejoin.equipmanage.utils.BeanUtil;
......@@ -61,6 +63,9 @@ import java.util.stream.Collectors;
@Slf4j
@Service
public class MqttReceiveServiceImpl implements MqttReceiveService {
private static String PUMP_JOB_GROUP_NAME = "EQUIP_PUMP_JOB_GROUP_NAME";
private static String PUMP_TRIGGER_NAME = "EQUIP_PUMP_TRIGGER_NAME";
private static String PUMP_TRIGGER_GROUP_NAME = "EQUIP_PUMP_TRIGGER_GROUP_NAME";
private static Map<String, TemperatureAlarmDto> temperatureMap = new HashMap<>();
......@@ -1337,7 +1342,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// 1. 获取需要校验的值
PressurePumpValueEnum valueEnum = PressurePumpValueEnum.getByCode(pressurePumpEnum.getCompareValue());
assert valueEnum != null;
EquipmentSpecificIndex data = getPressurePumpDateByType(valueEnum, topicEntity, equipmentSpeIndexList);
EquipmentSpecificIndex data = getPressurePumpDateByType(indexKey,valueEnum, topicEntity, equipmentSpeIndexList, pressurePumpEnum);
Date newDate = new Date();
// 2. 校验
if (!ObjectUtils.isEmpty(data)) {
......@@ -1348,9 +1353,11 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
private EquipmentSpecificIndex getPressurePumpDateByType(PressurePumpValueEnum valueEnum, TopicEntityVo topicEntity, List<EquipmentSpecificIndex> equipmentSpeIndexList) {
private EquipmentSpecificIndex getPressurePumpDateByType(String indexKey, PressurePumpValueEnum valueEnum, TopicEntityVo topicEntity, List<EquipmentSpecificIndex> equipmentSpeIndexList, PressurePumpEnum pressurePumpEnum) {
String iotCode = topicEntity.getIotCode();
EquipmentSpecificIndex equipmentSpecificIndex = null;
String jobName = topicEntity.getIotCode()+"_"+indexKey;
String triggerName = PUMP_TRIGGER_NAME+"-"+topicEntity.getIotCode();
switch (valueEnum) {
case LAST_STOP:
List<EquipmentSpecificIndex> lastStop = equipmentSpeIndexList.stream().filter(e ->
......@@ -1361,6 +1368,12 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
break;
case LAST_START:
boolean b = QuartzManager.checkExists(jobName, PUMP_JOB_GROUP_NAME);
// 删除这个稳压泵的监听任务
if(b) {
QuartzManager.removeJob(jobName,PUMP_JOB_GROUP_NAME,triggerName,PUMP_TRIGGER_GROUP_NAME);
}
List<EquipmentSpecificIndex> lastStart = equipmentSpeIndexList.stream().filter(e ->
StringUtil.isNotEmpty(e.getValue()) && e.getIotCode().equals(iotCode) && pressurePumpStart.equals(e.getEquipmentIndexKey())).sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
.collect(Collectors.toList());
......@@ -1384,6 +1397,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
equipmentSpecificIndex = latelyStart.get(0);
}
break;
case PUMP_START_TIME:
startTimeCompute(indexKey, topicEntity, pressurePumpEnum);
break;
default:
break;
}
......@@ -1476,15 +1492,57 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
break;
}
model.setSendTime(new Date());
model.setBody(body);
model.setExtras(map);
model.setMsgType("pressurePump");
model.setSendTime(new Date());
model.setIsSendApp(false);
model.setTerminal("WEB");
model.setIsSendWeb(true);
model.setCategory(1);
List<String> receive = new ArrayList<>();
receive.add("system");
model.setRecivers(receive);
Token token = remoteSecurityService.getServerToken();
systemctlFeign.create(token.getAppKey(), token.getProduct(), token.getToke(), model);
}
private void startTimeCompute(String indexKey, TopicEntityVo topicEntity, PressurePumpEnum pressurePumpEnum) {
String jobName = topicEntity.getIotCode()+"_"+indexKey;
String triggerName = PUMP_TRIGGER_NAME+"-"+topicEntity.getIotCode();
String cron = pressurePumpEnum.getLeftValue();
EquipmentSpecific equipmentSpecific = null;
try {
LambdaQueryWrapper<EquipmentSpecific> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(EquipmentSpecific::getIotCode, topicEntity.getIotCode());
equipmentSpecific = equipmentSpecificMapper.selectOne(wrapper);
}catch (Exception e) {
log.error("根据iotCod查询失败" + topicEntity.getIotCode());
}
boolean b = QuartzManager.checkExists(jobName, PUMP_JOB_GROUP_NAME);
if (indexKey.equals(pressurePumpStart)) {
if (b) {
// 任务存在 更新时间
QuartzManager.modifyJobTime(triggerName,PUMP_TRIGGER_GROUP_NAME,cron);
} else {
QuartzManager.removeJob(jobName,PUMP_JOB_GROUP_NAME,triggerName,PUMP_TRIGGER_GROUP_NAME);
// 任务不存在,新增
// 传参
if (ObjectUtils.isEmpty(equipmentSpecific)) {
return;
}
Map<String,Object> parameter = new HashMap<>(6);
parameter.put("jobName", jobName);
parameter.put("triggerName", triggerName);
parameter.put("triggerGroupName", PUMP_TRIGGER_GROUP_NAME);
parameter.put("jobGroupName", PUMP_JOB_GROUP_NAME);
parameter.put("equipmentSpecific", equipmentSpecific);
parameter.put("pressurePumpEnum", pressurePumpEnum);
parameter.put("remoteSecurityService", remoteSecurityService);
parameter.put("systemctlFeign", systemctlFeign);
QuartzManager.addJob(jobName,PUMP_JOB_GROUP_NAME,triggerName,PUMP_TRIGGER_GROUP_NAME, PumpSendMessage.class,cron,parameter);
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment