网站链接: element-ui dtcms
当前位置: 首页 > 技术博文  > 技术博文

一个超级简单的使用netty NIO实现服务端和客户端的栗子

2021/6/26 18:48:25 人评论

前置条件 IDEA开发工具maven管理依赖熟悉了netty的核心组件,看这个案例会爽一点。如果不是很清楚,可以看我的这篇文章:netty全部核心组件 另外这个案例我直接使用的韩顺平老师的案例。因为我写也是一样,我们就是为了学习&#xff…

前置条件

  1. IDEA开发工具
  2. maven管理依赖
  3. 熟悉了netty的核心组件,看这个案例会爽一点。如果不是很清楚,可以看我的这篇文章:netty全部核心组件 

  另外这个案例我直接使用的韩顺平老师的案例。因为我写也是一样,我们就是为了学习,能CV的代码我就不一个一个敲了。 这段代码本来注释已经比较多了,我自己也加了一部分的注释。

引入maven依赖

  先创建一个项目

  在pom引入依赖,轻松搞定,netty给我们准好了一个大而全的包,不需要额外的引入其他的包。

<dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
       <version>4.1.20.Final</version>
</dependency>

自定义服务端的处理逻辑代码

   一般自定义的逻辑代码,我们为了方便,都去继承netty的channelHanerAdapter。 这样可以省去一部分通用的功能的开发。如果你想要自己实现,也行。看下图channelHanerAdapter实际上也是实现了 ChannelInboundHandler接口。

  这实际上是一个适配器模式,来减轻我们自定义的业务逻辑接入netty的负担。

package netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

/**
 * @author angus
 * @create 2021-06-26 12:57
 * 说明
 * 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
 * 2. 这时我们自定义一个Handler , 才能称为一个handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter{
	//读取数据实际(这里我们可以读取客户端发送的消息)
	/**
	 * 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
	 * 2. Object msg: 就是客户端发送的数据 默认Object
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{

		System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
		System.out.println("server ctx =" + ctx);
		System.out.println("看看channel 和 pipeline的关系");
		Channel channel = ctx.channel();
		ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站

		//将 msg 转成一个 ByteBuf
		//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
		ByteBuf buf = (ByteBuf) msg;
		System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
		System.out.println("客户端地址:" + channel.remoteAddress());
	}

	//数据读取完毕
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		//writeAndFlush 是 write + flush
		//将数据写入到缓存,并刷新
		//一般讲,我们对这个发送的数据进行编码
		ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
	}

	//处理异常, 一般是需要关闭通道
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}
}

 

编写服务端的引导程序

  注释已经比较清晰,我就不赘述了。

package netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import netty.NettyServerHandler;

/**
 * @author angus
 * @create 2021-06-26 12:53
 */
public class NettyServer {
	public static void main(String[] args) {
		//创建BossGroup 和 WorkerGroup
		//说明
		//1. 创建两个线程组 bossGroup 和 workerGroup
		//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
		//3. 两个都是无限循环
		//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
		//   默认实际 cpu核数 * 2
		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
		EventLoopGroup workerGroup = new NioEventLoopGroup(); //8

		try {
			//创建服务器端的启动对象,配置参数, 俗称程序引导器
			ServerBootstrap bootstrap = new ServerBootstrap();
			//使用链式编程来进行设置
			bootstrap.group(bossGroup, workerGroup) //设置两个线程组,服务端是两个线程组,原因就是服务端实际上就是主从的 reactor模型。
					.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现, 这里实际上就是传入一个类型,由这个决定了你要使用nio,还是OIO
					.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
					.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
					//          .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
					.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象),这个程序里边使用的是匿名对象,你也可以单独拿出去,用一个类实现 ChannelInitializer
						//给pipeline 设置处理器
						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
							// 拿到pipeline 放置真正处理业务逻辑的 自定义逻辑处理单元 这个是由使用者定义的。其中NettyServerHandler 是一个channelHandler。
							// 因为pipeline里边只能放ChanelHandler,所以自定义的ChanelHandler需要 去实现一个 ChannelInboundHandlerAdapter。通过ChannelInboundHandlerAdapter符合了netty的规范。
							ch.pipeline().addLast(new NettyServerHandler());
						}
					}); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器

			System.out.println(".....服务器 is ready...");

			//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
			//启动服务器(并绑定端口)
			ChannelFuture cf = bootstrap.bind(6668).sync();

			//给cf 注册监听器,监控我们关心的事件

			cf.addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture future) throws Exception {
					if (cf.isSuccess()) {
						System.out.println("监听端口 6668 成功");
					} else {
						System.out.println("监听端口 6668 失败");
					}
				}
			});

			//对关闭通道进行监听
			cf.channel().closeFuture().sync();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

}

 编写client端的处理逻辑

  这个实际上非常简单,就是模拟了一下客户端,发送一条数据。

package netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author angus
 * @create 2021-06-26 18:16
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
	//当通道就绪就会触发该方法
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("client " + ctx);
		ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
	}

	//当通道有读取事件时,会触发
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
		System.out.println("服务器的地址: " + ctx.channel().remoteAddress());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

客户端引导程序

package netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author angus
 * @create 2021-06-26 18:16
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
	//当通道就绪就会触发该方法
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("client " + ctx);
		ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
	}

	//当通道有读取事件时,会触发
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
		System.out.println("服务器的地址: " + ctx.channel().remoteAddress());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

相关资讯

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?