Commit 1b33f698 authored by xukaiqiang's avatar xukaiqiang

Update IEC104Client.java

parent b5d3ebfb
package com.yeejoin.amos.iec104.tcp.client;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.springframework.util.ObjectUtils;
import com.yeejoin.amos.iec104.exception.YeeException;
import com.yeejoin.amos.iec104.tcp.IEC104Decoder;
import com.yeejoin.amos.iec104.tcp.IEC104Encoder;
import com.yeejoin.amos.iec104.tcp.utils.ChangeUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.log4j.Logger;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class IEC104Client {
private final String host;
private final int port;
private Channel channel;
private Bootstrap b = null;
private String commonAddress;
private int receiveSeqNum = 0; //接收序号
private int sendSeqNum = 0; // 发送序号,每发送一个后需+1
private boolean isConnected = false;
private boolean isSendTest = false;
private ReentrantLock lock = new ReentrantLock(true);
private ScheduledExecutorService service_test = null;
private ScheduledExecutorService service_connect = null;
private Semaphore semaphore = new Semaphore(1);
private String clientId;
private boolean isClose = false;
ChannelFuture future = null;
public ChannelFutureListener channelFutureListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (isClose) {
return;
}
future.addListener(this);
if (future.isSuccess() && future.channel().isActive()) {
isConnected = true;
if (service_connect != null) {
service_connect.shutdownNow();
service_connect = null;
}
Logger.getLogger(this.getClass()).debug("连接服务器成功");
} else {
isConnected = false;
Logger.getLogger(this.getClass()).debug("连接服务器失败, 重新连接");
private final String host;
private final int port;
private Channel channel;
private Bootstrap b = null;
private String commonAddress;
private int receiveSeqNum = 0; //接收序号
private int sendSeqNum = 0; // 发送序号,每发送一个后需+1
private boolean isConnected = false;
private boolean isSendTest = false;
private ReentrantLock lock = new ReentrantLock(true);
private ScheduledExecutorService service_test = null;
private ScheduledExecutorService service_connect = null;
private Semaphore semaphore = new Semaphore(1);
private String clientId;
private boolean isClose = false;
ChannelFuture future = null;
public ChannelFutureListener channelFutureListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (isClose) {
return;
}
if (future.isSuccess() && future.channel().isActive()) {
isConnected = true;
if (service_connect != null) {
service_connect.shutdownNow();
service_connect = null;
}
Logger.getLogger(this.getClass()).debug("连接服务器成功");
} else {
isConnected = false;
Logger.getLogger(this.getClass()).debug("连接服务器失败, 重新连接");
// future.cause().printStackTrace();
connect2Clinet(clientId);
}
}
};
connect2Clinet(clientId);
}
}
};
// 连接服务端的端口号地址和端口号
public IEC104Client(String host, int port) {
this.host = host;
this.port = port;
}
// 连接服务端的端口号地址和端口号
public IEC104Client(String host, int port) {
this.host = host;
this.port = port;
}
private void connect2Clinet(String serviceId) {
final EventLoopGroup group = new NioEventLoopGroup();
IEC104Client client = this;
if (service_connect != null) {
service_connect.shutdownNow();
service_connect = null;
}
Runnable connect_runnable = new Runnable() {
private void connect2Clinet(String serviceId) {
final EventLoopGroup group = new NioEventLoopGroup();
IEC104Client client = this;
if (service_connect != null) {
service_connect.shutdownNow();
service_connect = null;
}
Runnable connect_runnable = new Runnable() {
public void run() {
try {
b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class) // 使用NioSocketChannel来作为连接用的channel类
.handler(new ChannelInitializer<SocketChannel>() { // 绑定连接初始化器
@Override
public void initChannel(SocketChannel ch) throws Exception {
Logger.getLogger(this.getClass()).debug("正在连接中...");
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(30, 30, 60, TimeUnit.SECONDS));
pipeline.addLast(new IEC104Decoder(serviceId)); // 编码
pipeline.addLast(new IEC104Encoder(serviceId)); // 解码
pipeline.addLast(new IEC104NewHandler(client)); // 业务处理类
}
});
// 发起异步连接请求,绑定连接端口和host信息
future = b.connect(host, port).sync();
future.addListener(channelFutureListener);
channel = future.channel();
channel.closeFuture().sync();
try {
b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class) // 使用NioSocketChannel来作为连接用的channel类
.handler(new ChannelInitializer<SocketChannel>() { // 绑定连接初始化器
@Override
public void initChannel(SocketChannel ch) throws Exception {
Logger.getLogger(this.getClass()).debug("正在连接中...");
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(30, 30, 60, TimeUnit.SECONDS));
pipeline.addLast(new IEC104Decoder(serviceId)); // 编码
pipeline.addLast(new IEC104Encoder(serviceId)); // 解码
pipeline.addLast(new IEC104NewHandler(client)); // 业务处理类
}
});
// 发起异步连接请求,绑定连接端口和host信息
future = b.connect(host, port).sync();
future.addListener(channelFutureListener);
channel = future.channel();
channel.closeFuture().sync();
} catch (Exception e) {
Logger.getLogger(this.getClass()).error("connect to " + host + ":" + port + "error: " + e.getMessage());
Logger.getLogger(this.getClass()).error("connect to " + host + ":" + port + "error: " + e.getMessage());
}
}
};
service_connect = Executors.newSingleThreadScheduledExecutor();
// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
service_connect.scheduleAtFixedRate(connect_runnable, 10, 10, TimeUnit.SECONDS);
}
public void closeChannel() {
future.addListener(channelFutureListener);
channel.close();
}
public void init(String serviceId) throws Exception {
connect2Clinet(serviceId);
}
}
public void closeChannel() {
future.addListener(channelFutureListener);
channel.close();
}
public Channel getChannel() {
return channel;
}
public void init(String serviceId) throws Exception {
connect2Clinet(serviceId);
}
public Channel getChannel() {
return channel;
}
// public void start() {
// if (ObjectUtils.isEmpty(channel)) {
// throw new YeeException("客户端初始化失败");
// }
// }
// public void sendMessage(String strBytes) {
// ChannelPromise promise = this.channel.newPromise();
// this.channel.writeAndFlush(strBytes, promise);
// }
public boolean isConnected() {
return isConnected;
}
public boolean isConnected() {
return isConnected;
}
// public String sendACKMessage() {
// String recStr = null;
// try {
......@@ -174,14 +167,14 @@ public class IEC104Client {
// }
// return recStr;
// }
// public boolean sendSOEMessage(String asduBytes) {
// return sendIMessage(asduBytes);
// }
/**
* 启动循环发送测试报文
*/
/**
* 启动循环发送测试报文
*/
// public void sendTestMessage() {
// synchronized(this) {
// if (service_test != null) {
......@@ -203,10 +196,10 @@ public class IEC104Client {
// service_test.scheduleAtFixedRate(test_runnable, 0, 10, TimeUnit.SECONDS);
// }
// }
/**
* 结束循环发送测试报文
*/
/**
* 结束循环发送测试报文
*/
// public void relaseTest() {
// this.isSendTest = false;
// if (service_test != null) {
......@@ -215,12 +208,13 @@ public class IEC104Client {
// semaphore.release();
// }
// }
/**
* 发送I帧报文
* @param asduBytes
* @return
*/
/**
* 发送I帧报文
*
* @param
* @return
*/
// public boolean sendIMessage(String asduBytes) {
//
// StringBuffer sBuffer = new StringBuffer("68");
......@@ -265,7 +259,7 @@ public class IEC104Client {
// }
// return false;
// }
// public void relase() {
// if (lock.isLocked()) {
// lock.unlock();
......@@ -292,16 +286,15 @@ public class IEC104Client {
// public boolean isSendTest() {
// return isSendTest;
// }
public String getCommonAddress() {
return commonAddress;
}
public String getCommonAddress() {
return commonAddress;
}
public void setCommonAddress(String commonAddress) {
this.commonAddress = commonAddress;
}
public void setCommonAddress(String commonAddress) {
this.commonAddress = commonAddress;
}
public void close() {
public void close() {
// isClose = true;
// this.channel.close();
// if (lock.isLocked()) {
......@@ -312,15 +305,15 @@ public class IEC104Client {
// service_connect.shutdownNow();
// service_connect = null;
// }
}
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment