Netty简单介绍

Netty是一个NIO客户端服务器框架,可以快速简单地开发协议服务器和客户端等网络应用程序。它极大地简化和简化了TCP和UDP套接字服务器等网络编程。

“快速而简单”并不意味着由此产生的应用程序将受到可维护性或性能问题的困扰。Netty的设计经验非常丰富,包括FTP,SMTP,HTTP以及各种基于二进制和文本的传统协议。因此,Netty已经成功地找到了一个方法来实现轻松的开发,性能,稳定性和灵活性,而不用妥协。

Netty包下载以及使用

下载地址 : Netty包下载

Downloads
Netty is distributed under Apache License, v2.0. Please see the en­closed NOTICE.txt for more information.

netty-4.1.20.Final.tar.bz2 ‐ 22-Jan-2018 (Stable, Recommended)

netty-4.0.55.Final.tar.bz2 ‐ 22-Jan-2018 (Stable)

netty-3.10.6.Final-dist.tar.bz2 ‐ 29-Jun-2016 (Stable)

Changelogs and road map are available in our issue tracker.

进去后点击netty-4.1.20.Final.tar.bz2或者netty-4.0.55.Final.tar.bz2 或者netty-3.10.6.Final-dist.tar.bz2 即可下载相应的包

下载后目录为:

├── jar jar包文件夹
 |  └── ........jar 所有的分开的jar包
 |  └── all-in-one 所有jar包集合起来的一个jar包
├── javadoc 帮助文档
├── license 开源说明 
├── COMTRIBUTING.md
├── LICENSE.txt
├── NOTICE.txt
└── README.txt

下载后只需要jar文件夹里的jar包,取自己需要的.下面是jar文件作用:

netty-common 实用程序类和日志记录外观

netty-buffer 替换java.nio.ByteBuffer的ByteBuf API

netty-transport 通道API和核心传输

netty-transport-rxtx Rxtx传输

netty-transport-sctp SCTP传输

netty-transport-udt UDT传输

netty-handler 有用的ChannelHandler实现

netty-codec 编解码器框架,帮助编写一个编码器和一个解码器

netty-codec-http 与HTTP,Web Sockets,SPDY和RTSP相关的编解码器

netty-codec-socks 与SOCKS协议相关的编解码器

简单使用

  • 首先写一个Netty客户端

/**
 * @author Town
 * @created at 2017/11/4 10:39
 * @Last Modified by: Town
 * @Last Modified time: 2017/11/4 10:39
 * @Remarks netty客户端
 */
public class MyNettyClitent {
    private static MyNettyClitent nettyClient = new MyNettyClitent();

    private EventLoopGroup group;

    private NettyListener listener;

    private Channel channel;

    private boolean isConnect = false;

    private int reconnectNum = Integer.MAX_VALUE;

    private long reconnectIntervalTime = 5000;

    public static MyNettyClitent getInstance() {
        return nettyClient;
    }

    /**
     * 连接服务器
     */
    public synchronized MyNettyClitent connect() {
        if (!isConnect) {
            group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap().group(group)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioSocketChannel.class)
                    .handler(new NettyClientInitializer(listener));
            try {
                bootstrap.connect(AppUrl.SOCKET_SERVER, AppUrl.SOCKET_PORT).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            Logger.i(" Netty TCP --->连接成功");
                            isConnect = true;
                            channel = channelFuture.channel();
                        } else {
                            isConnect = false;
                        }
                    }
                }).sync();

            } catch (Exception e) {
                Logger.e(e.getMessage());
                listener.onServiceStatusConnectChanged(NettyListener.STATUS_CONNECT_ERROR);
                reconnect();
            }
        }
        return this;
    }

    /**
     * 断开连接
     */
    public void disconnect() {
        group.shutdownGracefully();
    }

    /**
     * 断开重连机制
     */
    public void reconnect() {
        if (reconnectNum > 0 && !isConnect && Public_Utils.isTCP) {//如果不是连接状态就重连
            reconnectNum--;
            try {
                Thread.sleep(reconnectIntervalTime);
            } catch (InterruptedException e) {
            }
            Logger.e("Netty TCP --->重新连接");
            disconnect();
            connect();
        } else {
            disconnect();
        }
    }

    /**
     * 发送消息
     * @param data 数据
     * @param listener 监听接口
     * @return 发送成功与否
     */
    protected boolean sendMsgToServer(String data, ChannelFutureListener listener) {
        boolean flag = channel != null && isConnect;
        if (flag) {
            ByteBuf buf = Unpooled.copiedBuffer(data.getBytes());
            channel.writeAndFlush(buf).addListener(listener);
        }
        return flag;
    }

    /**
     * 发送消息
     * @param msg
     */
    public  void sendMsg(final String msg){
        sendMsgToServer(msg,  new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    Logger.d("Netty TCP --->消息发送成功"+" 【" + msg + "】 ");
                } else {
                    Logger.d("Netty TCP --->消息发送失败"+" 【" + msg + "】 ");
                }
            }
        });
    }

    /**
     * 设置断开重连次数
     * @param reconnectNum
     */
    public void setReconnectNum(int reconnectNum) {
        this.reconnectNum = reconnectNum;
    }

    /**
     * 设置重连机制延时时间
     * @param reconnectIntervalTime
     */
    public void setReconnectIntervalTime(long reconnectIntervalTime) {
        this.reconnectIntervalTime = reconnectIntervalTime;
    }

    /**
     * 得到TCP连接状态
     * @return true[连接] false[未连接]
     */
    public boolean getConnectStatus() {
        return isConnect;
    }

    /**
     * 手动设置TCP连接状态
     * @param status
     */
    public void setConnectStatus(boolean status) {
        this.isConnect = status;
    }

    /**
     * 设置回调接口
     * @param listener
     */
    public void setListener(NettyListener listener) {
        this.listener = listener;
    }
}

初始化基本连接数据以及断开重连接口,以及各种设置方法.

  • 回调接口

/**
 * @author Town
 * @created at 2017/11/4 10:37
 * @Last Modified by: Town
 * @Last Modified time: 2017/11/4 10:37
 * @Remarks 回调接口
 */
public interface NettyListener {

    byte STATUS_CONNECT_SUCCESS = 1;//连接状态

    byte STATUS_CONNECT_CLOSED = 0;//关闭未连接

    byte STATUS_CONNECT_ERROR = 0;//出错未连接


//    /**
//     * 当接收到系统消息(ByteBuf类型)
//     */
//    void onMessageResponse(ByteBuf byteBuf);

    /**
     *当接收到系统消息(String类型)
     */
    void onMessageResponse(String byteBuf);

    /**
     * 当服务状态发生变化时触发
     */
     void onServiceStatusConnectChanged(int statusCode);
}

定义连接状态值 收到消息的接收类型以及状态发生改变的调用方法.

  • SimpleChannelInboundHandler
/**
 * @author Town
 * @created at 2017/11/4 10:38
 * @Last Modified by: Town
 * @Last Modified time: 2017/11/4 10:38
 * @Remarks Netty
 */
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 回调接口
     */
    private NettyListener listener;

    public NettyClientHandler(NettyListener listener) {
        this.listener = listener;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        try {
            MyNettyClitent.getInstance().setConnectStatus(true);
            listener.onServiceStatusConnectChanged(NettyListener.STATUS_CONNECT_SUCCESS);
        } catch (Exception e) {
            e.getMessage();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        try {
            MyNettyClitent.getInstance().setConnectStatus(false);
            listener.onServiceStatusConnectChanged(NettyListener.STATUS_CONNECT_CLOSED);
            MyNettyClitent.getInstance().reconnect();
        } catch (Exception e) {
            e.getMessage();
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
//        Logger.d("Netty TCP ---> 执行这里了吗"+byteBuf);
        try {
            byte[] req = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(req);
            String body = new String(req, "UTF-8");
            if (body.contains("HEARTBEAT")) {
                Logger.d("Netty TCP 收到心跳包 --->" + " 【" + body + "】 ");
            } else {
                Logger.d("Netty TCP 收到消息 --->" + " 【" + body + "】 ");
                listener.onMessageResponse(body);
//                ctx.writeAndFlush(body);
            }
        } catch (Exception e) {
            e.getMessage();
        }
    }

}

channelRead0方法接收服务器发过来的消息,此处可以判断是否是心跳包来打印日志.或者过滤一些东西.此处接收类型取决于SimpleChannelInboundHandler .

  • ChannelInitializer
/**
 *  @author Town
 *  @created at 2017/11/4 10:38
 *  @Last Modified by: Town
 *  @Last Modified time: 2017/11/4 10:38
 *  @Remarks 配置一个新的Channel
 */
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

    private NettyListener listener;

    private int WRITE_WAIT_SECONDS = 10;

    private int READ_WAIT_SECONDS = 13;

    public NettyClientInitializer(NettyListener listener) {
        this.listener = listener;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));    // 开启日志,可以设置日志等级
        pipeline.addLast(new DelimiterBasedFrameDecoder(1536, true, false, Unpooled.wrappedBuffer("$$".getBytes())));
//        pipeline.addLast(new IdleStateHandler(30, 60, 100));
        pipeline.addLast(new NettyClientHandler(listener));
    }
}

ChannelInitializer的主要目的是为程序员提供了一个简单的工具,用于在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作。ChannelInitializer虽然会在一开始会被注册到Channel相关的pipeline里,但是在初始化完成之后,ChannelInitializer会将自己从pipeline中移除,不会影响后续的操作。

  • NettyService

/**
 * @author Town
 * @created at 2017/11/4 10:36
 * @Last Modified by:   Town
 * @Last Modified time: 2017/11/4 10:36
 * @Remarks Netty服务
 */
public class NettyService extends Service implements NettyListener {
    /**
     * 监听网络状态
     */
    private NetworkReceiver receiver;

    private ScheduledExecutorService mScheduledExecutorService;

    private void shutdown() {
        if (mScheduledExecutorService != null) {
            mScheduledExecutorService.shutdown();
            mScheduledExecutorService = null;
        }
    }

    @Override
    public void onCreate() {
        super.onCreate();

        receiver = new NetworkReceiver();
        IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
        LocalBroadcastManager.getInstance(this).registerReceiver(receiver, filter);

        // 自定义心跳,每隔45秒向服务器发送心跳包
        mScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                String requestBody = "1111" +"内容"+ "$$";
                Logger.d("Netty TCP  发心跳--->" + requestBody);
                MyNettyClitent.getInstance().sendMsgToServer(requestBody, new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        if (future.isSuccess()) {
                            Logger.d("Netty TCP --->心跳发送成功");
                        } else {
                            Logger.e("Netty TCP --->心跳发送失败");
                        }
                    }
                });
            }
        }, 3, 45, TimeUnit.SECONDS);
    }

    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        MyNettyClitent.getInstance().setListener(this);
        connect();
        return START_NOT_STICKY;
    }

    @Override
    public void onServiceStatusConnectChanged(int statusCode) { //连接状态监听
        Logger.d("Netty TCP 连接状态 --->" + statusCode);
        if (statusCode == NettyListener.STATUS_CONNECT_SUCCESS) {//在线状态
            String requestBody = "1111" +"内容"+ "$$";
            Logger.d("Netty TCP  发心跳--->" + requestBody);
            MyNettyClitent.getInstance().sendMsgToServer(requestBody, new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        Logger.d("Netty TCP --->心跳发送成功");
                    } else {
                        Logger.e("Netty TCP --->心跳发送失败");
                    }
                }
            });
        } //else {
////            Public_Utils.isTcpStatus = false;
//
//        }
    }


    @Override
    public void onMessageResponse(String byteBuf) {
                Intent intent = new Intent();
                if (byteBuf != null) {
        //这里接收服务器消息
                }

    }

    /**
     * 连接服务器
     */
    private void connect() {
        if (!MyNettyClitent.getInstance().getConnectStatus()) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    MyNettyClitent.getInstance().connect();//连接服务器
                }
            }).start();
        }
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        LocalBroadcastManager.getInstance(this).unregisterReceiver(receiver);
        shutdown();
        MyNettyClitent.getInstance().setReconnectNum(0);
        MyNettyClitent.getInstance().disconnect();
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    public class NetworkReceiver extends BroadcastReceiver {
        @Override
        public void onReceive(Context context, Intent intent) {
            ConnectivityManager cm = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
            NetworkInfo activeNetwork = cm.getActiveNetworkInfo();
            if (activeNetwork != null) { //
                if (activeNetwork.getType() == ConnectivityManager.TYPE_WIFI
                        || activeNetwork.getType() == ConnectivityManager.TYPE_MOBILE) {
                    connect();
                }
            }
        }
    }
}

在onMessageResponse方法里接收服务器发过来的消息,根据自己业务场景再做处理.