Commit 6385e930 authored by 高建强's avatar 高建强

item:添加预案数据同步

parent 9033007a
......@@ -19,6 +19,18 @@
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
......
package com.boot.bus.sqlsync.async.callback;
import com.boot.bus.sqlsync.async.worker.WorkResult;
/**
* 默认回调类,如果不设置的话,会默认给这个回调
* @author wuweifeng wrote on 2019-11-19.
*/
public class DefaultCallback<T, V> implements ICallback<T, V> {
@Override
public void begin() {
}
@Override
public void result(boolean success, T param, WorkResult<V> workResult) {
}
}
package com.boot.bus.sqlsync.async.callback;
import com.boot.bus.sqlsync.async.wrapper.WorkerWrapper;
import java.util.List;
/**
* @author wuweifeng wrote on 2019-12-27
* @version 1.0
*/
public class DefaultGroupCallback implements IGroupCallback {
@Override
public void success(List<WorkerWrapper> workerWrappers) {
}
@Override
public void failure(List<WorkerWrapper> workerWrappers, Exception e) {
}
}
package com.boot.bus.sqlsync.async.callback;
import com.boot.bus.sqlsync.async.worker.WorkResult;
/**
* 每个执行单元执行完毕后,会回调该接口</p>
* 需要监听执行结果的,实现该接口即可
*
* @author wuweifeng wrote on 2019-11-19.
*/
@FunctionalInterface
public interface ICallback<T, V> {
/**
* 任务开始的监听
*/
default void begin() {
}
/**
* 耗时操作执行完毕后,就给value注入值
*/
void result(boolean success, T param, WorkResult<V> workResult);
}
package com.boot.bus.sqlsync.async.callback;
import com.boot.bus.sqlsync.async.wrapper.WorkerWrapper;
import java.util.List;
/**
* 如果是异步执行整组的话,可以用这个组回调。不推荐使用
* @author wuweifeng wrote on 2019-11-19.
*/
public interface IGroupCallback {
/**
* 成功后,可以从wrapper里去getWorkResult
*/
void success(List<WorkerWrapper> workerWrappers);
/**
* 失败了,也可以从wrapper里去getWorkResult
*/
void failure(List<WorkerWrapper> workerWrappers, Exception e);
}
package com.boot.bus.sqlsync.async.callback;
/**
* @author wuweifeng wrote on 2019-12-20
* @version 1.0
*/
public interface ITimeoutWorker<T, V> extends IWorker<T, V> {
/**
* 每个worker都可以设置超时时间
* @return 毫秒超时时间
*/
long timeOut();
/**
* 是否开启单个执行单元的超时功能(有时是一个group设置个超时,而不具备关心单个worker的超时)
* <p>注意,如果开启了单个执行单元的超时检测,将使线程池数量多出一倍</p>
* @return 是否开启
*/
boolean enableTimeOut();
}
package com.boot.bus.sqlsync.async.callback;
import com.boot.bus.sqlsync.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* 每个最小执行单元需要实现该接口
*
* @author wuweifeng wrote on 2019-11-19.
*/
@FunctionalInterface
public interface IWorker<T, V> {
/**
* 在这里做耗时操作,如rpc请求、IO等
*
* @param object object
* @param allWrappers 任务包装
*/
V action(T object, Map<String, WorkerWrapper> allWrappers);
/**
* 超时、异常时,返回的默认值
*
* @return 默认值
*/
default V defaultValue() {
return null;
}
}
package com.boot.bus.sqlsync.async.exception;
/**
* 如果任务在执行之前,自己后面的任务已经执行完或正在被执行,则抛该exception
* @author wuweifeng wrote on 2020-02-18
* @version 1.0
*/
public class SkippedException extends RuntimeException {
public SkippedException() {
super();
}
public SkippedException(String message) {
super(message);
}
}
package com.boot.bus.sqlsync.async.executor;
import com.boot.bus.sqlsync.async.callback.DefaultGroupCallback;
import com.boot.bus.sqlsync.async.callback.IGroupCallback;
import com.boot.bus.sqlsync.async.wrapper.WorkerWrapper;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* 类入口,可以根据自己情况调整core线程的数量
* @author wuweifeng wrote on 2019-12-18
* @version 1.0
*/
public class Async {
/**
* 默认线程池
*/
private static final ThreadPoolExecutor COMMON_POOL =
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024,
15L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
(ThreadFactory) Thread::new);
/**
* 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个
*/
private static ExecutorService executorService;
/**
* 出发点
*/
public static boolean beginWork(long timeout, ExecutorService executorService, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException {
if(workerWrappers == null || workerWrappers.size() == 0) {
return false;
}
//保存线程池变量
Async.executorService = executorService;
//定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result
Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>();
CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];
for (int i = 0; i < workerWrappers.size(); i++) {
WorkerWrapper wrapper = workerWrappers.get(i);
futures[i] = CompletableFuture.runAsync(() -> wrapper.work(executorService, timeout, forParamUseWrappers), executorService);
}
try {
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
return true;
} catch (TimeoutException e) {
Set<WorkerWrapper> set = new HashSet<>();
totalWorkers(workerWrappers, set);
for (WorkerWrapper wrapper : set) {
wrapper.stopNow();
}
return false;
}
}
/**
* 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL
*/
public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
if(workerWrapper == null || workerWrapper.length == 0) {
return false;
}
List<WorkerWrapper> workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toList());
return beginWork(timeout, executorService, workerWrappers);
}
/**
* 同步阻塞,直到所有都完成,或失败
*/
public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
return beginWork(timeout, COMMON_POOL, workerWrapper);
}
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
beginWorkAsync(timeout, COMMON_POOL, groupCallback, workerWrapper);
}
/**
* 异步执行,直到所有都完成,或失败后,发起回调
*/
public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
if (groupCallback == null) {
groupCallback = new DefaultGroupCallback();
}
IGroupCallback finalGroupCallback = groupCallback;
if (executorService != null) {
executorService.submit(() -> {
try {
boolean success = beginWork(timeout, executorService, workerWrapper);
if (success) {
finalGroupCallback.success(Arrays.asList(workerWrapper));
} else {
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
}
});
} else {
COMMON_POOL.submit(() -> {
try {
boolean success = beginWork(timeout, COMMON_POOL, workerWrapper);
if (success) {
finalGroupCallback.success(Arrays.asList(workerWrapper));
} else {
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
}
});
}
}
/**
* 总共多少个执行单元
*/
@SuppressWarnings("unchecked")
private static void totalWorkers(List<WorkerWrapper> workerWrappers, Set<WorkerWrapper> set) {
set.addAll(workerWrappers);
for (WorkerWrapper wrapper : workerWrappers) {
if (wrapper.getNextWrappers() == null) {
continue;
}
List<WorkerWrapper> wrappers = wrapper.getNextWrappers();
totalWorkers(wrappers, set);
}
}
/**
* 关闭线程池
*/
public static void shutDown() {
shutDown(executorService);
}
/**
* 关闭线程池
*/
public static void shutDown(ExecutorService executorService) {
if (executorService != null) {
executorService.shutdown();
} else {
COMMON_POOL.shutdown();
}
}
public static String getThreadCount() {
return "activeCount=" + COMMON_POOL.getActiveCount() +
" completedCount " + COMMON_POOL.getCompletedTaskCount() +
" largestCount " + COMMON_POOL.getLargestPoolSize();
}
}
package com.boot.bus.sqlsync.async.executor.timer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 用于解决高并发下System.currentTimeMillis卡顿
* @author lry
*/
public class SystemClock {
private final int period;
private final AtomicLong now;
private static class InstanceHolder {
private static final SystemClock INSTANCE = new SystemClock(1);
}
private SystemClock(int period) {
this.period = period;
this.now = new AtomicLong(System.currentTimeMillis());
scheduleClockUpdating();
}
private static SystemClock instance() {
return InstanceHolder.INSTANCE;
}
private void scheduleClockUpdating() {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable, "System Clock");
thread.setDaemon(true);
return thread;
});
scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
}
private long currentTimeMillis() {
return now.get();
}
/**
* 用来替换原来的System.currentTimeMillis()
*/
public static long now() {
return instance().currentTimeMillis();
}
}
\ No newline at end of file
package com.boot.bus.sqlsync.async.worker;
import com.boot.bus.sqlsync.async.wrapper.WorkerWrapper;
/**
* 对依赖的wrapper的封装
* @author wuweifeng wrote on 2019-12-20
* @version 1.0
*/
public class DependWrapper {
private WorkerWrapper<?, ?> dependWrapper;
/**
* 是否该依赖必须完成后才能执行自己.<p>
* 因为存在一个任务,依赖于多个任务,是让这多个任务全部完成后才执行自己,还是某几个执行完毕就可以执行自己
* 如
* 1
* ---3
* 2
* 或
* 1---3
* 2---3
* 这两种就不一样,上面的就是必须12都完毕,才能3
* 下面的就是1完毕就可以3
*/
private boolean must = true;
public DependWrapper(WorkerWrapper<?, ?> dependWrapper, boolean must) {
this.dependWrapper = dependWrapper;
this.must = must;
}
public DependWrapper() {
}
public WorkerWrapper<?, ?> getDependWrapper() {
return dependWrapper;
}
public void setDependWrapper(WorkerWrapper<?, ?> dependWrapper) {
this.dependWrapper = dependWrapper;
}
public boolean isMust() {
return must;
}
public void setMust(boolean must) {
this.must = must;
}
@Override
public String toString() {
return "DependWrapper{" +
"dependWrapper=" + dependWrapper +
", must=" + must +
'}';
}
}
package com.boot.bus.sqlsync.async.worker;
/**
* 结果状态
* @author wuweifeng wrote on 2019-11-19.
*/
public enum ResultState {
SUCCESS,
TIMEOUT,
EXCEPTION,
DEFAULT //默认状态
}
package com.boot.bus.sqlsync.async.worker;
/**
* 执行结果
*/
public class WorkResult<V> {
/**
* 执行的结果
*/
private V result;
/**
* 结果状态
*/
private ResultState resultState;
private Exception ex;
public WorkResult(V result, ResultState resultState) {
this(result, resultState, null);
}
public WorkResult(V result, ResultState resultState, Exception ex) {
this.result = result;
this.resultState = resultState;
this.ex = ex;
}
public static <V> WorkResult<V> defaultResult() {
return new WorkResult<>(null, ResultState.DEFAULT);
}
@Override
public String toString() {
return "WorkResult{" +
"result=" + result +
", resultState=" + resultState +
", ex=" + ex +
'}';
}
public Exception getEx() {
return ex;
}
public void setEx(Exception ex) {
this.ex = ex;
}
public V getResult() {
return result;
}
public void setResult(V result) {
this.result = result;
}
public ResultState getResultState() {
return resultState;
}
public void setResultState(ResultState resultState) {
this.resultState = resultState;
}
}
package com.boot.bus.sqlsync.async.wrapper;
import com.boot.bus.sqlsync.async.callback.DefaultCallback;
import com.boot.bus.sqlsync.async.callback.ICallback;
import com.boot.bus.sqlsync.async.callback.IWorker;
import com.boot.bus.sqlsync.async.exception.SkippedException;
import com.boot.bus.sqlsync.async.executor.timer.SystemClock;
import com.boot.bus.sqlsync.async.worker.DependWrapper;
import com.boot.bus.sqlsync.async.worker.ResultState;
import com.boot.bus.sqlsync.async.worker.WorkResult;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 对每个worker及callback进行包装,一对一
*
* @author wuweifeng wrote on 2019-11-19.
*/
public class WorkerWrapper<T, V> {
/**
* 该wrapper的唯一标识
*/
private String id;
/**
* worker将来要处理的param
*/
private T param;
private IWorker<T, V> worker;
private ICallback<T, V> callback;
/**
* 在自己后面的wrapper,如果没有,自己就是末尾;如果有一个,就是串行;如果有多个,有几个就需要开几个线程</p>
* -------2
* 1
* -------3
* 如1后面有2、3
*/
private List<WorkerWrapper<?, ?>> nextWrappers;
/**
* 依赖的wrappers,有2种情况,1:必须依赖的全部完成后,才能执行自己 2:依赖的任何一个、多个完成了,就可以执行自己
* 通过must字段来控制是否依赖项必须完成
* 1
* -------3
* 2
* 1、2执行完毕后才能执行3
*/
private List<DependWrapper> dependWrappers;
/**
* 标记该事件是否已经被处理过了,譬如已经超时返回false了,后续rpc又收到返回值了,则不再二次回调
* 经试验,volatile并不能保证"同一毫秒"内,多线程对该值的修改和拉取
* <p>
* 1-finish, 2-error, 3-working
*/
private AtomicInteger state = new AtomicInteger(0);
/**
* 该map存放所有wrapper的id和wrapper映射
*/
private Map<String, WorkerWrapper> forParamUseWrappers;
/**
* 也是个钩子变量,用来存临时的结果
*/
private volatile WorkResult<V> workResult = WorkResult.defaultResult();
/**
* 是否在执行自己前,去校验nextWrapper的执行结果<p>
* 1 4
* -------3
* 2
* 如这种在4执行前,可能3已经执行完毕了(被2执行完后触发的),那么4就没必要执行了。
* 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的
*/
private volatile boolean needCheckNextWrapperResult = true;
private static final int FINISH = 1;
private static final int ERROR = 2;
private static final int WORKING = 3;
private static final int INIT = 0;
private WorkerWrapper(String id, IWorker<T, V> worker, T param, ICallback<T, V> callback) {
if (worker == null) {
throw new NullPointerException("async.worker is null");
}
this.worker = worker;
this.param = param;
this.id = id;
//允许不设置回调
if (callback == null) {
callback = new DefaultCallback<>();
}
this.callback = callback;
}
/**
* 开始工作
* fromWrapper代表这次work是由哪个上游wrapper发起的
*/
private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
this.forParamUseWrappers = forParamUseWrappers;
//将自己放到所有wrapper的集合里去
forParamUseWrappers.put(id, this);
long now = SystemClock.now();
//总的已经超时了,就快速失败,进行下一个
if (remainTime <= 0) {
fastFail(INIT, null);
beginNext(executorService, now, remainTime);
return;
}
//如果自己已经执行过了。
//可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
if (getState() == FINISH || getState() == ERROR) {
beginNext(executorService, now, remainTime);
return;
}
//如果在执行前需要校验nextWrapper的状态
if (needCheckNextWrapperResult) {
//如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了
if (!checkNextWrapperResult()) {
fastFail(INIT, new SkippedException());
beginNext(executorService, now, remainTime);
return;
}
}
//如果没有任何依赖,说明自己就是第一批要执行的
if (dependWrappers == null || dependWrappers.size() == 0) {
fire();
beginNext(executorService, now, remainTime);
return;
}
/*如果有前方依赖,存在两种情况
一种是前面只有一个wrapper。即 A -> B
一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。
所以需要B来做判断,必须A、C、D都完成,自己才能执行 */
//只有一个依赖
if (dependWrappers.size() == 1) {
doDependsOneJob(fromWrapper);
beginNext(executorService, now, remainTime);
} else {
//有多个依赖时
doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);
}
}
public void work(ExecutorService executorService, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
work(executorService, null, remainTime, forParamUseWrappers);
}
/**
* 总控制台超时,停止所有任务
*/
public void stopNow() {
if (getState() == INIT || getState() == WORKING) {
fastFail(getState(), null);
}
}
/**
* 判断自己下游链路上,是否存在已经出结果的或已经开始执行的
* 如果没有返回true,如果有返回false
*/
private boolean checkNextWrapperResult() {
//如果自己就是最后一个,或者后面有并行的多个,就返回true
if (nextWrappers == null || nextWrappers.size() != 1) {
return getState() == INIT;
}
WorkerWrapper nextWrapper = nextWrappers.get(0);
boolean state = nextWrapper.getState() == INIT;
//继续校验自己的next的状态
return state && nextWrapper.checkNextWrapperResult();
}
/**
* 进行下一个任务
*/
private void beginNext(ExecutorService executorService, long now, long remainTime) {
//花费的时间
long costTime = SystemClock.now() - now;
if (nextWrappers == null) {
return;
}
if (nextWrappers.size() == 1) {
nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
return;
}
CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
for (int i = 0; i < nextWrappers.size(); i++) {
int finalI = i;
futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
.work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
}
try {
CompletableFuture.allOf(futures).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
private void doDependsOneJob(WorkerWrapper dependWrapper) {
if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultResult();
fastFail(INIT, null);
} else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
fastFail(INIT, null);
} else {
//前面任务正常完毕了,该自己了
fire();
}
}
private synchronized void doDependsJobs(ExecutorService executorService, List<DependWrapper> dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
boolean nowDependIsMust = false;
//创建必须完成的上游wrapper集合
Set<DependWrapper> mustWrapper = new HashSet<>();
for (DependWrapper dependWrapper : dependWrappers) {
if (dependWrapper.isMust()) {
mustWrapper.add(dependWrapper);
}
if (dependWrapper.getDependWrapper().equals(fromWrapper)) {
nowDependIsMust = dependWrapper.isMust();
}
}
//如果全部是不必须的条件,那么只要到了这里,就执行自己。
if (mustWrapper.size() == 0) {
if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) {
fastFail(INIT, null);
} else {
fire();
}
beginNext(executorService, now, remainTime);
return;
}
//如果存在需要必须完成的,且fromWrapper不是必须的,就什么也不干
if (!nowDependIsMust) {
return;
}
//如果fromWrapper是必须的
boolean existNoFinish = false;
boolean hasError = false;
//先判断前面必须要执行的依赖任务的执行结果,如果有任何一个失败,那就不用走action了,直接给自己设置为失败,进行下一步就是了
for (DependWrapper dependWrapper : mustWrapper) {
WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();
WorkResult tempWorkResult = workerWrapper.getWorkResult();
//为null或者isWorking,说明它依赖的某个任务还没执行到或没执行完
if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) {
existNoFinish = true;
break;
}
if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {
workResult = defaultResult();
hasError = true;
break;
}
if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {
workResult = defaultExResult(workerWrapper.getWorkResult().getEx());
hasError = true;
break;
}
}
//只要有失败的
if (hasError) {
fastFail(INIT, null);
beginNext(executorService, now, remainTime);
return;
}
//如果上游都没有失败,分为两种情况,一种是都finish了,一种是有的在working
//都finish的话
if (!existNoFinish) {
//上游都finish了,进行自己
fire();
beginNext(executorService, now, remainTime);
return;
}
}
/**
* 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
*/
private void fire() {
//阻塞取结果
workResult = workerDoJob();
}
/**
* 快速失败
*/
private boolean fastFail(int expect, Exception e) {
//试图将它从expect状态,改成Error
if (!compareAndSetState(expect, ERROR)) {
return false;
}
//尚未处理过结果
if (checkIsNullResult()) {
if (e == null) {
workResult = defaultResult();
} else {
workResult = defaultExResult(e);
}
}
callback.result(false, param, workResult);
return true;
}
/**
* 具体的单个worker执行任务
*/
private WorkResult<V> workerDoJob() {
//避免重复执行
if (!checkIsNullResult()) {
return workResult;
}
try {
//如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行
if (!compareAndSetState(INIT, WORKING)) {
return workResult;
}
callback.begin();
//执行耗时操作
V resultValue = worker.action(param, forParamUseWrappers);
//如果状态不是在working,说明别的地方已经修改了
if (!compareAndSetState(WORKING, FINISH)) {
return workResult;
}
workResult.setResultState(ResultState.SUCCESS);
workResult.setResult(resultValue);
//回调成功
callback.result(true, param, workResult);
return workResult;
} catch (Exception e) {
//避免重复回调
if (!checkIsNullResult()) {
return workResult;
}
fastFail(WORKING, e);
return workResult;
}
}
public WorkResult<V> getWorkResult() {
return workResult;
}
public List<WorkerWrapper<?, ?>> getNextWrappers() {
return nextWrappers;
}
public void setParam(T param) {
this.param = param;
}
private boolean checkIsNullResult() {
return ResultState.DEFAULT == workResult.getResultState();
}
private void addDepend(WorkerWrapper<?, ?> workerWrapper, boolean must) {
addDepend(new DependWrapper(workerWrapper, must));
}
private void addDepend(DependWrapper dependWrapper) {
if (dependWrappers == null) {
dependWrappers = new ArrayList<>();
}
//如果依赖的是重复的同一个,就不重复添加了
for (DependWrapper wrapper : dependWrappers) {
if (wrapper.equals(dependWrapper)) {
return;
}
}
dependWrappers.add(dependWrapper);
}
private void addNext(WorkerWrapper<?, ?> workerWrapper) {
if (nextWrappers == null) {
nextWrappers = new ArrayList<>();
}
//避免添加重复
for (WorkerWrapper wrapper : nextWrappers) {
if (workerWrapper.equals(wrapper)) {
return;
}
}
nextWrappers.add(workerWrapper);
}
private void addNextWrappers(List<WorkerWrapper<?, ?>> wrappers) {
if (wrappers == null) {
return;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
addNext(wrapper);
}
}
private void addDependWrappers(List<DependWrapper> dependWrappers) {
if (dependWrappers == null) {
return;
}
for (DependWrapper wrapper : dependWrappers) {
addDepend(wrapper);
}
}
private WorkResult<V> defaultResult() {
workResult.setResultState(ResultState.TIMEOUT);
workResult.setResult(worker.defaultValue());
return workResult;
}
private WorkResult<V> defaultExResult(Exception ex) {
workResult.setResultState(ResultState.EXCEPTION);
workResult.setResult(worker.defaultValue());
workResult.setEx(ex);
return workResult;
}
private int getState() {
return state.get();
}
public String getId() {
return id;
}
private boolean compareAndSetState(int expect, int update) {
return this.state.compareAndSet(expect, update);
}
private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WorkerWrapper<?, ?> that = (WorkerWrapper<?, ?>) o;
return needCheckNextWrapperResult == that.needCheckNextWrapperResult &&
Objects.equals(param, that.param) &&
Objects.equals(worker, that.worker) &&
Objects.equals(callback, that.callback) &&
Objects.equals(nextWrappers, that.nextWrappers) &&
Objects.equals(dependWrappers, that.dependWrappers) &&
Objects.equals(state, that.state) &&
Objects.equals(workResult, that.workResult);
}
@Override
public int hashCode() {
return Objects.hash(param, worker, callback, nextWrappers, dependWrappers, state, workResult, needCheckNextWrapperResult);
}
public static class Builder<W, C> {
/**
* 该wrapper的唯一标识
*/
private String id = UUID.randomUUID().toString();
/**
* worker将来要处理的param
*/
private W param;
private IWorker<W, C> worker;
private ICallback<W, C> callback;
/**
* 自己后面的所有
*/
private List<WorkerWrapper<?, ?>> nextWrappers;
/**
* 自己依赖的所有
*/
private List<DependWrapper> dependWrappers;
/**
* 存储强依赖于自己的wrapper集合
*/
private Set<WorkerWrapper<?, ?>> selfIsMustSet;
private boolean needCheckNextWrapperResult = true;
public Builder<W, C> worker(IWorker<W, C> worker) {
this.worker = worker;
return this;
}
public Builder<W, C> param(W w) {
this.param = w;
return this;
}
public Builder<W, C> id(String id) {
if (id != null) {
this.id = id;
}
return this;
}
public Builder<W, C> needCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
return this;
}
public Builder<W, C> callback(ICallback<W, C> callback) {
this.callback = callback;
return this;
}
public Builder<W, C> depend(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
depend(wrapper);
}
return this;
}
public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper) {
return depend(wrapper, true);
}
public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper, boolean isMust) {
if (wrapper == null) {
return this;
}
DependWrapper dependWrapper = new DependWrapper(wrapper, isMust);
if (dependWrappers == null) {
dependWrappers = new ArrayList<>();
}
dependWrappers.add(dependWrapper);
return this;
}
public Builder<W, C> next(WorkerWrapper<?, ?> wrapper) {
return next(wrapper, true);
}
public Builder<W, C> next(WorkerWrapper<?, ?> wrapper, boolean selfIsMust) {
if (nextWrappers == null) {
nextWrappers = new ArrayList<>();
}
nextWrappers.add(wrapper);
//强依赖自己
if (selfIsMust) {
if (selfIsMustSet == null) {
selfIsMustSet = new HashSet<>();
}
selfIsMustSet.add(wrapper);
}
return this;
}
public Builder<W, C> next(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
next(wrapper);
}
return this;
}
public WorkerWrapper<W, C> build() {
WorkerWrapper<W, C> wrapper = new WorkerWrapper<>(id, worker, param, callback);
wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult);
if (dependWrappers != null) {
for (DependWrapper workerWrapper : dependWrappers) {
workerWrapper.getDependWrapper().addNext(wrapper);
wrapper.addDepend(workerWrapper);
}
}
if (nextWrappers != null) {
for (WorkerWrapper<?, ?> workerWrapper : nextWrappers) {
boolean must = false;
if (selfIsMustSet != null && selfIsMustSet.contains(workerWrapper)) {
must = true;
}
workerWrapper.addDepend(wrapper, must);
wrapper.addNext(workerWrapper);
}
}
return wrapper;
}
}
}
......@@ -17,5 +17,10 @@
<artifactId>amos-boot-module-sqlsync-api</artifactId>
<version>${amos-boot-bus.version}</version>
</dependency>
<dependency>
<groupId>com.yeejoin.amos</groupId>
<artifactId>YeeAmosFireAutoSysCommon</artifactId>
<version>${business.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.boot.bus.sqlsync.async.parallel;
import com.boot.bus.sqlsync.async.callback.ICallback;
import com.boot.bus.sqlsync.async.callback.IWorker;
import com.boot.bus.sqlsync.async.executor.timer.SystemClock;
import com.boot.bus.sqlsync.async.worker.WorkResult;
import com.boot.bus.sqlsync.async.wrapper.WorkerWrapper;
import com.boot.bus.sqlsync.service.impl.SyncMqttMessageService;
import com.yeejoin.amos.fas.datasync.DataSyncMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
@Component
public class ParWorker implements IWorker<DataSyncMessage, String>, ICallback<DataSyncMessage, String> {
// @Autowired
private static SyncMqttMessageService mqttMessageService;
@Autowired
public void setDatastore(SyncMqttMessageService mqttMessageService) {
ParWorker.mqttMessageService = mqttMessageService;
}
@Override
public String action(DataSyncMessage object, Map<String, WorkerWrapper> allWrappers) {
try {
mqttMessageService.syncData(object);
} catch (Exception e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker0--default";
}
@Override
public void begin() {
System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, DataSyncMessage param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" + Thread.currentThread().getName());
} else {
System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" + Thread.currentThread().getName());
}
}
// @Override
// public void result(boolean success, String param, WorkResult<String> workResult) {
// if (success) {
// System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult()
// + "-threadName:" + Thread.currentThread().getName());
// } else {
// System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult()
// + "-threadName:" + Thread.currentThread().getName());
// }
// }
}
package com.boot.bus.sqlsync.emqx;
import org.springframework.stereotype.Component;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.common.emqtt
* @ClassName: EmqttPredicate
* @Author: Jianqiang Gao
* @Description: EmqttPredicate
* @Date: 2021/3/23 15:57
* @Version: 1.0
*/
@Component
public class EmqttPredicate {
public Boolean test(MqttEvent event) {
//测试内容
return Boolean.FALSE;
}
}
\ No newline at end of file
package com.boot.bus.sqlsync.emqx;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* @author keyong
* @title: EquipmentIotProduceConfig
* <pre>
* @description: MQTT订阅模式生产类
* </pre>
* @date 2020/10/30 14:13
*/
@Configuration
@IntegrationComponentScan
public class MqttConfiguration {
@Value("${spring.mqtt.username}")
private String userName;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.host-url}")
private String hostUrl;
@Value("${spring.mqtt.client-id}")
private String clientId;
@Value("${spring.mqtt.default-topic}")
private String defaultTopic;
@Bean
public MqttConnectOptions getMqttConnectOption(){
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(userName);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setConnectionTimeout(0);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOption());
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_outbound", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
package com.boot.bus.sqlsync.emqx;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.common.emqtt
* @ClassName: MqttEvent
* @Author: Jianqiang Gao
* @Description: MqttEvent
* @Date: 2021/3/23 15:56
* @Version: 1.0
*/
@Getter
public class MqttEvent extends ApplicationEvent {
/**
*
*/
private String topic;
/**
* 发送的消息
*/
private String message;
public MqttEvent(Object source, String topic, String message) {
super(source);
this.topic = topic;
this.message = message;
}
}
\ No newline at end of file
package com.boot.bus.sqlsync.emqx;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.common.dto
* @ClassName: MqttProperties
* @Author: Jianqiang Gao
* @Description: MqttProperties
* @Date: 2021/3/23 15:54
* @Version: 1.0
*/
@ConfigurationProperties("spring.mqtt")
@Component
@Data
public class MqttProperties {
private String username;
private String password;
private String hostUrl;
private String clientId;
private String defaultTopic;
private String completionTimeout;
private Integer keepAlive;
}
\ No newline at end of file
package com.boot.bus.sqlsync.emqx;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.common.emqtt
* @ClassName: MqttServer
* @Author: Jianqiang Gao
* @Description: MqttServer
* @Date: 2021/3/23 15:58
* @Version: 1.0
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttServer {
/**
* 通道发送消息
* @param topic 主题
* @param data 数据
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
}
\ No newline at end of file
package com.boot.bus.sqlsync.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.boot.bus.sqlsync.async.executor.Async;
import com.boot.bus.sqlsync.async.parallel.ParWorker;
import com.boot.bus.sqlsync.async.wrapper.WorkerWrapper;
import com.boot.bus.sqlsync.service.infc.IContingencyPlanInstance;
import com.yeejoin.amos.fas.common.enums.DataSyncOperationEnum;
import com.yeejoin.amos.fas.common.enums.DataSyncTypeEnum;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import com.yeejoin.amos.fas.datasync.DataSyncMessage;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* <h1></h1>
*
* @Author SingleTian
* @Date 2021-04-02 09:22
*/
@Slf4j
@Component
public class SyncMqttMessageListener implements IMqttMessageListener {
final Deque<String> redisKeysQueue1 = new LinkedBlockingDeque<>();
final Deque<String> redisKeysQueue2 = new LinkedBlockingDeque<>();
final AtomicBoolean flag = new AtomicBoolean(true);
@Autowired
private IContingencyPlanInstance contingencyPlanInstance;
@Autowired
private RedisTemplate redisTemplate;
private Queue<String> getUsedQueue() {
return flag.get() ? redisKeysQueue1 : redisKeysQueue2;
}
private Queue<String> getUnusedQueue() {
return flag.get() ? redisKeysQueue2 : redisKeysQueue1;
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
DataSyncMessage message = null;
try {
message = DataSyncMessage.bytes2Message(mqttMessage.getPayload());
System.out.println(JSON.toJSONString(message));
log.info("topic:{},message:{}", topic, message);
ParWorker w = new ParWorker();
WorkerWrapper<DataSyncMessage, String> workerWrapper = new WorkerWrapper.Builder<DataSyncMessage, String>()
.worker(w)
.callback(w)
.param(message)
.build();
Async.beginWork(200, workerWrapper);
// syncData(message);
} catch (Exception e) {
log.error("同步mqtt推送数据出错:topic:{},message:{},错误信息:{}", topic, message, e.toString());
} finally {
assert message != null : "";
// 待删除的redis的key值放入队列
getUsedQueue().offer(message.redisKey());
}
}
/**
* 尝试删除redis中mqtt已经发送成功的同步数据
* 每10秒轮循一次
*/
@Scheduled(fixedRate = 10 * 1000)
synchronized private void cleanRedisKeysFinished() {
Queue<String> usedQueue = getUsedQueue();
Queue<String> unusedQueue = getUnusedQueue();
flag.set(!flag.get());
while (!usedQueue.isEmpty()) {
String key = usedQueue.poll();
if (!redisTemplate.delete(key)) {
unusedQueue.offer(key);
}
}
}
/**
* 处理redis中存在但mqtt未接收到的同步数据
* 每小时执行一次,整点触发
*/
@Scheduled(cron = "0 0 * * * ?")
synchronized private void syncByRedis() {
Set<String> keys = new HashSet<>();
for (DataSyncTypeEnum typeEnum : DataSyncTypeEnum.values()) {
keys.addAll(redisTemplate.keys(typeEnum.toString()));
}
if (keys.isEmpty()) {
return;
}
// 为减少redis
new Timer().schedule(new TimerTask() {
@Override
public void run() {
Queue<String> usedQueue = new LinkedBlockingDeque<>(getUsedQueue());
keys.forEach(key -> {
if (!usedQueue.contains(key)) {
String messageStr = String.valueOf(redisTemplate.opsForValue().get(key));
DataSyncMessage message = null;
try {
message = JSON.parseObject(messageStr, DataSyncMessage.class);
syncData(message);
} catch (Exception e) {
log.error("同步redis推送数据出错:key:{},message:{},错误信息:{}", key, message, e.getMessage());
} finally {
getUsedQueue().offer(message.redisKey());
}
}
});
}
}, 1000 * 30);
}
/**
* @param message
*/
private void syncData(DataSyncMessage message) {
DataSyncTypeEnum type = message.getType();
DataSyncOperationEnum operation = message.getOperation();
List<Serializable> data = message.getData();
assert message.getType() != null && message.getOperation() != null : "同步消息体信息不足";
if (data == null || data.isEmpty()) {
return;
}
switch (type) {
case CONTINGENCY_PLAN_INSTANCE: {
switch (operation) {
case DELETE: {
contingencyPlanInstance.astDeleteByIds(data.stream().map(x -> ((JSONObject) x).toJavaObject(ContingencyPlanInstance.class).getId()).collect(Collectors.toList()));
break;
}
default: {
contingencyPlanInstance.astSaveOrUpdateBatch(data.stream().map(x -> ((JSONObject) x).toJavaObject(ContingencyPlanInstance.class)).collect(Collectors.toList()));
}
}
break;
}
default:
}
}
}
package com.boot.bus.sqlsync.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import org.apache.ibatis.annotations.Mapper;
/**
* @ProjectName: amos-boot-bus
* @Package: com.boot.bus.sqlsync.mapper
* @ClassName: ContingencyPlanInstanceMapper
* @Author: Jianqiang Gao
* @Description: ContingencyPlanInstanceMapper
* @Date: 2022/6/9 18:00
* @Version: 1.0
*/
@Mapper
public interface ContingencyPlanInstanceMapper extends BaseMapper<ContingencyPlanInstance> {
}
package com.boot.bus.sqlsync.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.boot.bus.sqlsync.mapper.ContingencyPlanInstanceMapper;
import com.boot.bus.sqlsync.service.infc.IContingencyPlanInstance;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @ProjectName: amos-boot-bus
* @Package: com.boot.bus.sqlsync.service.impl
* @ClassName: ContingencyInstanceImpl
* @Author: Jianqiang Gao
* @Description: ContingencyInstanceImpl
* @Date: 2022/6/9 17:25
* @Version: 1.0
*/
@Service
public class ContingencyInstanceImpl extends ServiceImpl<ContingencyPlanInstanceMapper, ContingencyPlanInstance> implements IContingencyPlanInstance {
@Override
public void astDeleteByIds(List<String> ids) {
if (!ids.isEmpty()) {
baseMapper.deleteBatchIds(ids);
}
}
@Override
public void astSaveOrUpdateBatch(List<ContingencyPlanInstance> list) {
if (!list.isEmpty()) {
saveOrUpdateBatch(list);
}
}
}
\ No newline at end of file
package com.boot.bus.sqlsync.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.boot.bus.sqlsync.service.infc.IContingencyPlanInstance;
import com.yeejoin.amos.fas.common.enums.DataSyncOperationEnum;
import com.yeejoin.amos.fas.common.enums.DataSyncTypeEnum;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import com.yeejoin.amos.fas.datasync.DataSyncMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
/**
* @ProjectName: YeeFireDataProcessRoot
* @Package: com.yeejoin.dataprocess.service.impl
* @ClassName: SyncMqttMessageService
* @Author: Jianqiang Gao
* @Description: 同步mqtt消息
* @Date: 2021/7/8 16:54
* @Version: 1.0
*/
@Service("SyncMqttMessageService")
public class SyncMqttMessageService {
@Autowired
private IContingencyPlanInstance contingencyPlanInstance;
@Autowired
private RedisTemplate redisTemplate;
/**
* @param message
*/
public void syncData(DataSyncMessage message) {
DataSyncTypeEnum type = message.getType();
DataSyncOperationEnum operation = message.getOperation();
List<Serializable> data = message.getData();
assert message.getType() != null && message.getOperation() != null : "同步消息体信息不足";
if (data == null || data.isEmpty()) {
return;
}
switch (type) {
case CONTINGENCY_PLAN_INSTANCE: {
switch (operation) {
case DELETE: {
contingencyPlanInstance.astDeleteByIds(data.stream().map(x -> ((JSONObject) x).toJavaObject(ContingencyPlanInstance.class).getId()).collect(Collectors.toList()));
break;
}
default: {
contingencyPlanInstance.astSaveOrUpdateBatch(data.stream().map(x -> ((JSONObject) x).toJavaObject(ContingencyPlanInstance.class)).collect(Collectors.toList()));
}
}
break;
}
default:
// Async.shutDown();
}
}
}
\ No newline at end of file
package com.boot.bus.sqlsync.service.infc;
import com.yeejoin.amos.fas.dao.entity.ContingencyPlanInstance;
import java.util.List;
/**
* @ProjectName: amos-boot-bus
* @Package: com.boot.bus.sqlsync.service.infc
* @ClassName: IContingencyPlanInstance
* @Author: Jianqiang Gao
* @Description: IContingencyPlanInstance
* @Date: 2022/6/9 17:24
* @Version: 1.0
*/
public interface IContingencyPlanInstance {
void astDeleteByIds(List<String> collect);
void astSaveOrUpdateBatch(List<ContingencyPlanInstance> collect);
}
......@@ -4,8 +4,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.net.InetAddress;
import java.net.UnknownHostException;
......@@ -20,6 +25,11 @@ import java.net.UnknownHostException;
* @Version: 1.0
*/
@SpringBootApplication
@EnableAsync
@EnableScheduling
@EnableEurekaClient
//@ComponentScan(value = {"com.boot.bus.sqlsync", "com.boot.bus.sqlsync.emqx", "com.boot.bus.sqlsync.listener"})
@ComponentScan(value = {"com.boot.bus.sqlsync", "com.boot.bus.sqlsync.emqx", "com.boot.bus.sqlsync.listener"})
public class AmosSqlSyncApplication {
private static final Logger logger = LoggerFactory.getLogger(AmosSqlSyncApplication.class);
......
......@@ -7,14 +7,23 @@ eureka.client.serviceUrl.defaultZone=http://172.16.11.201:10001/eureka/
## redis properties:
spring.redis.database=1
spring.redis.host=172.16.3.18
spring.redis.host=172.16.11.201
spring.redis.port=6379
spring.redis.password=yeejoin@2020
spring.redis.password=1234560
spring.redis.timeout=0
## emqx properties:
emqx.clean-session=true
emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
emqx.broker=tcp://172.16.11.201:1883
emqx.user-name=admin
emqx.password=public
\ No newline at end of file
## emqx properties
#emqx.clean-session=true
#emqx.client-id=${spring.application.name}-${random.int[1024,65536]}
#emqx.broker=tcp://172.16.11.201:1883
#emqx.user-name=admin
#emqx.password=public
#emqx.default-topic=mqtt_topic
spring.mqtt.client-id=${spring.application.name}-${random.int[1024,65536]}
spring.mqtt.default-topic=mqtt_topic
spring.mqtt.host-url=tcp://172.16.11.201:1883
spring.mqtt.username=admin
spring.mqtt.password=public
spring.mqtt.completionTimeout=3000
spring.mqtt.keepAlive=60
\ No newline at end of file
......@@ -26,7 +26,7 @@ spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.connection-test-query=SELECT 1
##liquibase
spring.liquibase.change-log = classpath:/db/changelog/changelog-master.xml
spring.liquibase.change-log = classpath:/changelog/changelog-master.xml
spring.liquibase.enabled= true
## eureka properties:
......
<?xml version="1.0" encoding="utf-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.4.xsd">
<include file="sql-sync-1.0.0.xml" relativeToChangelogFile="true"/>
<!--全量包含视图和函数-->
<include file="sql-task-all.xml" relativeToChangelogFile="true"/>
</databaseChangeLog>
\ No newline at end of file
<?xml version="1.0" encoding="utf-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.4.xsd">
</databaseChangeLog>
\ No newline at end of file
<?xml version="1.0" encoding="utf-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.4.xsd">
</databaseChangeLog>
\ No newline at end of file
......@@ -23,8 +23,9 @@
<maven.compiler.target>1.8</maven.compiler.target>
<springboot.version>2.3.11.RELEASE</springboot.version>
<springcloud.version>Hoxton.SR8</springcloud.version>
<amos.version>1.7.8</amos.version>
<fastjson.version>1.2.83</fastjson.version>
<amos.version>1.7.8</amos.version>
<business.version>3.0.1.7</business.version>
</properties>
<dependencies>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment