Netty简单介绍
Netty是一个NIO客户端服务器框架,可以快速简单地开发协议服务器和客户端等网络应用程序。它极大地简化和简化了TCP和UDP套接字服务器等网络编程。
“快速而简单”并不意味着由此产生的应用程序将受到可维护性或性能问题的困扰。Netty的设计经验非常丰富,包括FTP,SMTP,HTTP以及各种基于二进制和文本的传统协议。因此,Netty已经成功地找到了一个方法来实现轻松的开发,性能,稳定性和灵活性,而不用妥协。
Netty包下载以及使用
下载地址 : Netty包下载
Downloads
Netty is distributed under Apache License, v2.0. Please see the enclosed NOTICE.txt for more information.
netty-4.1.20.Final.tar.bz2 ‐ 22-Jan-2018 (Stable, Recommended)
netty-4.0.55.Final.tar.bz2 ‐ 22-Jan-2018 (Stable)
netty-3.10.6.Final-dist.tar.bz2 ‐ 29-Jun-2016 (Stable)
Changelogs and road map are available in our issue tracker.
进去后点击netty-4.1.20.Final.tar.bz2或者netty-4.0.55.Final.tar.bz2 或者netty-3.10.6.Final-dist.tar.bz2 即可下载相应的包
下载后目录为:
├── jar jar包文件夹
| └── ........jar 所有的分开的jar包
| └── all-in-one 所有jar包集合起来的一个jar包
├── javadoc 帮助文档
├── license 开源说明
├── COMTRIBUTING.md
├── LICENSE.txt
├── NOTICE.txt
└── README.txt
下载后只需要jar文件夹里的jar包,取自己需要的.下面是jar文件作用:
netty-common 实用程序类和日志记录外观
netty-buffer 替换java.nio.ByteBuffer的ByteBuf API
netty-transport 通道API和核心传输
netty-transport-rxtx Rxtx传输
netty-transport-sctp SCTP传输
netty-transport-udt UDT传输
netty-handler 有用的ChannelHandler实现
netty-codec 编解码器框架,帮助编写一个编码器和一个解码器
netty-codec-http 与HTTP,Web Sockets,SPDY和RTSP相关的编解码器
netty-codec-socks 与SOCKS协议相关的编解码器
简单使用
- 首先写一个Netty客户端
/**
* @author Town
* @created at 2017/11/4 10:39
* @Last Modified by: Town
* @Last Modified time: 2017/11/4 10:39
* @Remarks netty客户端
*/
public class MyNettyClitent {
private static MyNettyClitent nettyClient = new MyNettyClitent();
private EventLoopGroup group;
private NettyListener listener;
private Channel channel;
private boolean isConnect = false;
private int reconnectNum = Integer.MAX_VALUE;
private long reconnectIntervalTime = 5000;
public static MyNettyClitent getInstance() {
return nettyClient;
}
/**
* 连接服务器
*/
public synchronized MyNettyClitent connect() {
if (!isConnect) {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap().group(group)
.option(ChannelOption.SO_KEEPALIVE, true)
.channel(NioSocketChannel.class)
.handler(new NettyClientInitializer(listener));
try {
bootstrap.connect(AppUrl.SOCKET_SERVER, AppUrl.SOCKET_PORT).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
Logger.i(" Netty TCP --->连接成功");
isConnect = true;
channel = channelFuture.channel();
} else {
isConnect = false;
}
}
}).sync();
} catch (Exception e) {
Logger.e(e.getMessage());
listener.onServiceStatusConnectChanged(NettyListener.STATUS_CONNECT_ERROR);
reconnect();
}
}
return this;
}
/**
* 断开连接
*/
public void disconnect() {
group.shutdownGracefully();
}
/**
* 断开重连机制
*/
public void reconnect() {
if (reconnectNum > 0 && !isConnect && Public_Utils.isTCP) {//如果不是连接状态就重连
reconnectNum--;
try {
Thread.sleep(reconnectIntervalTime);
} catch (InterruptedException e) {
}
Logger.e("Netty TCP --->重新连接");
disconnect();
connect();
} else {
disconnect();
}
}
/**
* 发送消息
* @param data 数据
* @param listener 监听接口
* @return 发送成功与否
*/
protected boolean sendMsgToServer(String data, ChannelFutureListener listener) {
boolean flag = channel != null && isConnect;
if (flag) {
ByteBuf buf = Unpooled.copiedBuffer(data.getBytes());
channel.writeAndFlush(buf).addListener(listener);
}
return flag;
}
/**
* 发送消息
* @param msg
*/
public void sendMsg(final String msg){
sendMsgToServer(msg, new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
Logger.d("Netty TCP --->消息发送成功"+" 【" + msg + "】 ");
} else {
Logger.d("Netty TCP --->消息发送失败"+" 【" + msg + "】 ");
}
}
});
}
/**
* 设置断开重连次数
* @param reconnectNum
*/
public void setReconnectNum(int reconnectNum) {
this.reconnectNum = reconnectNum;
}
/**
* 设置重连机制延时时间
* @param reconnectIntervalTime
*/
public void setReconnectIntervalTime(long reconnectIntervalTime) {
this.reconnectIntervalTime = reconnectIntervalTime;
}
/**
* 得到TCP连接状态
* @return true[连接] false[未连接]
*/
public boolean getConnectStatus() {
return isConnect;
}
/**
* 手动设置TCP连接状态
* @param status
*/
public void setConnectStatus(boolean status) {
this.isConnect = status;
}
/**
* 设置回调接口
* @param listener
*/
public void setListener(NettyListener listener) {
this.listener = listener;
}
}
初始化基本连接数据以及断开重连接口,以及各种设置方法.
- 回调接口
/**
* @author Town
* @created at 2017/11/4 10:37
* @Last Modified by: Town
* @Last Modified time: 2017/11/4 10:37
* @Remarks 回调接口
*/
public interface NettyListener {
byte STATUS_CONNECT_SUCCESS = 1;//连接状态
byte STATUS_CONNECT_CLOSED = 0;//关闭未连接
byte STATUS_CONNECT_ERROR = 0;//出错未连接
// /**
// * 当接收到系统消息(ByteBuf类型)
// */
// void onMessageResponse(ByteBuf byteBuf);
/**
*当接收到系统消息(String类型)
*/
void onMessageResponse(String byteBuf);
/**
* 当服务状态发生变化时触发
*/
void onServiceStatusConnectChanged(int statusCode);
}
定义连接状态值 收到消息的接收类型以及状态发生改变的调用方法.
- SimpleChannelInboundHandler
/**
* @author Town
* @created at 2017/11/4 10:38
* @Last Modified by: Town
* @Last Modified time: 2017/11/4 10:38
* @Remarks Netty
*/
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 回调接口
*/
private NettyListener listener;
public NettyClientHandler(NettyListener listener) {
this.listener = listener;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
try {
MyNettyClitent.getInstance().setConnectStatus(true);
listener.onServiceStatusConnectChanged(NettyListener.STATUS_CONNECT_SUCCESS);
} catch (Exception e) {
e.getMessage();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
try {
MyNettyClitent.getInstance().setConnectStatus(false);
listener.onServiceStatusConnectChanged(NettyListener.STATUS_CONNECT_CLOSED);
MyNettyClitent.getInstance().reconnect();
} catch (Exception e) {
e.getMessage();
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
// Logger.d("Netty TCP ---> 执行这里了吗"+byteBuf);
try {
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
String body = new String(req, "UTF-8");
if (body.contains("HEARTBEAT")) {
Logger.d("Netty TCP 收到心跳包 --->" + " 【" + body + "】 ");
} else {
Logger.d("Netty TCP 收到消息 --->" + " 【" + body + "】 ");
listener.onMessageResponse(body);
// ctx.writeAndFlush(body);
}
} catch (Exception e) {
e.getMessage();
}
}
}
channelRead0方法接收服务器发过来的消息,此处可以判断是否是心跳包来打印日志.或者过滤一些东西.此处接收类型取决于SimpleChannelInboundHandler
.
- ChannelInitializer
/**
* @author Town
* @created at 2017/11/4 10:38
* @Last Modified by: Town
* @Last Modified time: 2017/11/4 10:38
* @Remarks 配置一个新的Channel
*/
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
private NettyListener listener;
private int WRITE_WAIT_SECONDS = 10;
private int READ_WAIT_SECONDS = 13;
public NettyClientInitializer(NettyListener listener) {
this.listener = listener;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG)); // 开启日志,可以设置日志等级
pipeline.addLast(new DelimiterBasedFrameDecoder(1536, true, false, Unpooled.wrappedBuffer("$$".getBytes())));
// pipeline.addLast(new IdleStateHandler(30, 60, 100));
pipeline.addLast(new NettyClientHandler(listener));
}
}
ChannelInitializer的主要目的是为程序员提供了一个简单的工具,用于在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作。ChannelInitializer虽然会在一开始会被注册到Channel相关的pipeline里,但是在初始化完成之后,ChannelInitializer会将自己从pipeline中移除,不会影响后续的操作。
- NettyService
/**
* @author Town
* @created at 2017/11/4 10:36
* @Last Modified by: Town
* @Last Modified time: 2017/11/4 10:36
* @Remarks Netty服务
*/
public class NettyService extends Service implements NettyListener {
/**
* 监听网络状态
*/
private NetworkReceiver receiver;
private ScheduledExecutorService mScheduledExecutorService;
private void shutdown() {
if (mScheduledExecutorService != null) {
mScheduledExecutorService.shutdown();
mScheduledExecutorService = null;
}
}
@Override
public void onCreate() {
super.onCreate();
receiver = new NetworkReceiver();
IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
LocalBroadcastManager.getInstance(this).registerReceiver(receiver, filter);
// 自定义心跳,每隔45秒向服务器发送心跳包
mScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
mScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
String requestBody = "1111" +"内容"+ "$$";
Logger.d("Netty TCP 发心跳--->" + requestBody);
MyNettyClitent.getInstance().sendMsgToServer(requestBody, new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
Logger.d("Netty TCP --->心跳发送成功");
} else {
Logger.e("Netty TCP --->心跳发送失败");
}
}
});
}
}, 3, 45, TimeUnit.SECONDS);
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
MyNettyClitent.getInstance().setListener(this);
connect();
return START_NOT_STICKY;
}
@Override
public void onServiceStatusConnectChanged(int statusCode) { //连接状态监听
Logger.d("Netty TCP 连接状态 --->" + statusCode);
if (statusCode == NettyListener.STATUS_CONNECT_SUCCESS) {//在线状态
String requestBody = "1111" +"内容"+ "$$";
Logger.d("Netty TCP 发心跳--->" + requestBody);
MyNettyClitent.getInstance().sendMsgToServer(requestBody, new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
Logger.d("Netty TCP --->心跳发送成功");
} else {
Logger.e("Netty TCP --->心跳发送失败");
}
}
});
} //else {
//// Public_Utils.isTcpStatus = false;
//
// }
}
@Override
public void onMessageResponse(String byteBuf) {
Intent intent = new Intent();
if (byteBuf != null) {
//这里接收服务器消息
}
}
/**
* 连接服务器
*/
private void connect() {
if (!MyNettyClitent.getInstance().getConnectStatus()) {
new Thread(new Runnable() {
@Override
public void run() {
MyNettyClitent.getInstance().connect();//连接服务器
}
}).start();
}
}
@Override
public void onDestroy() {
super.onDestroy();
LocalBroadcastManager.getInstance(this).unregisterReceiver(receiver);
shutdown();
MyNettyClitent.getInstance().setReconnectNum(0);
MyNettyClitent.getInstance().disconnect();
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
throw new UnsupportedOperationException("Not yet implemented");
}
public class NetworkReceiver extends BroadcastReceiver {
@Override
public void onReceive(Context context, Intent intent) {
ConnectivityManager cm = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
NetworkInfo activeNetwork = cm.getActiveNetworkInfo();
if (activeNetwork != null) { //
if (activeNetwork.getType() == ConnectivityManager.TYPE_WIFI
|| activeNetwork.getType() == ConnectivityManager.TYPE_MOBILE) {
connect();
}
}
}
}
}
在onMessageResponse方法里接收服务器发过来的消息,根据自己业务场景再做处理.