Netty介绍 Netty是一个NIO客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化并简化了TCP和UDP套接字服务器等网络编程。 “快速简便”并不意味着最终的应用程序会受到可维护性或性能问题的影响。Netty经过精心设计,具有丰富的协议,如FTP,SMTP,HTTP以及各种二进制和基于文本的传统协议。因此,Netty成功地找到了一种在不妥协的情况下实现易于开发,性能,稳定性和灵活性的方法。Netty 版本3x(稳定,jdk1.5+),4x(推荐,稳定,jdk1.6+),5x(不推荐),新版本不是很稳定,所以这里使用的是 Netty4x 版本
本案例项目源码地址: https://github.com/haoxiaoyong1014/springboot-netty
项目依赖 <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > 4.1.21.Final</version > </dependency >
IdleStateHandler
Netty 可以使用 IdleStateHandler 来实现连接管理,当连接空闲时间太长(没有发送、接收消息)时则会触发一个事件,我们便可在该事件中实现心跳机制。
编写服务端 public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler <CustomProtocol > { private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatSimpleHandle.class); private static final ByteBuf HEART_BEAT = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(new CustomProtocol(123456L , "pong" ).toString(), CharsetUtil.UTF_8)); @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { NettySocketHolder.remove((NioSocketChannel) ctx.channel()); } @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.READER_IDLE) { LOGGER.info("已经5秒没有收到信息!" ); ctx.writeAndFlush(HEART_BEAT).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } super .userEventTriggered(ctx, evt); } @Override protected void channelRead0 (ChannelHandlerContext ctx, CustomProtocol customProtocol) throws Exception { LOGGER.info("收到customProtocol={}" , customProtocol); NettySocketHolder.put(customProtocol.getId(), (NioSocketChannel) ctx.channel()); } }
对上面代码简要说明: HeartBeatSimpleHandle
继承了 ChannelInboundHandlerAdapter 的一个扩展(SimpleChannelInboundHandler), 而ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现 ChannelInboundHandler提供了可以重写的各种事件处理程序方法,包括channelRead0()方法. 目前,只需继承 SimpleChannelInboundHandler或ChannelInboundHandlerAdapter 而不是自己实现处理程序接口。 我们重写了channelRead0()方法,每当接收到新数据时,都会使用收到的消息调用此方法。 当有多个客户端连上来时,服务端需要区分开,不然响应消息就会发生混乱。 所以每当有个连接上来的时候,我们都将当前的 Channel 与连上的客户端 ID 进行关联(因此每个连上的客户端 ID 都必须唯一)。 这里采用了一个 Map 来保存这个关系,并且在断开连接时自动取消这个关联。
public class NettySocketHolder { private static final Map<Long, NioSocketChannel> MAP = new ConcurrentHashMap<>(16 ); public static void put (Long id, NioSocketChannel socketChannel) { MAP.put(id, socketChannel); } public static NioSocketChannel get (Long id) { return MAP.get(id); } public static Map<Long, NioSocketChannel> getMAP () { return MAP; } public static void remove (NioSocketChannel nioSocketChannel) { MAP.entrySet().stream().filter(entry -> entry.getValue() == nioSocketChannel).forEach(entry -> MAP.remove(entry.getKey())); } }
这里使用了jdk1.8的Lambda 表达式启动引导程序
@Component public class HeartBeatServer { private final static Logger LOGGER = LoggerFactory.getLogger(HeartBeatServer.class); private EventLoopGroup boss = new NioEventLoopGroup(); private EventLoopGroup work = new NioEventLoopGroup(); @Value ("${netty.server.port}" ) private int nettyPort; @PostConstruct public void start () throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap() .group(boss, work) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(nettyPort)) .childOption(ChannelOption.SO_KEEPALIVE, true ) .childHandler(new HeartbeatInitializer()); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { LOGGER.info("启动 Netty 成功" ); } } @PreDestroy public void destroy () { boss.shutdownGracefully().syncUninterruptibly(); work.shutdownGracefully().syncUninterruptibly(); LOGGER.info("关闭 Netty 成功" ); } }
对上面代码进行简要说明 (1),NioEventLoopGroup是一个处理I / O操作的多线程事件循环。 Netty为不同类型的传输提供各种EventLoopGroup实现。我们在此示例中实现了服务器端应用程序,因此将使用两个NioEventLoopGroup。第一个,通常称为“老板”,接受传入连接。第二个,通常称为“工人”,一旦老板接受连接并将接受的连接注册到工作人员,就处理被接受连接的流量。使用了多少个线程以及它们如何映射到创建的Channels取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。 (2),ServerBootstrap是一个设置服务器的帮助程序类。 (3), 在这里,我们指定使用NioServerSocketChannel
类,该类用于实例化新的Channel以接受传入的连接。 (4),保持长连接 (5), childHandler()方法需要一个ChannelInitializer类,ChannelInitializer是一个特殊的处理程序,旨在帮助用户配置新的Channel,您最有可能希望通过添加一些处理程序(如DiscardServerHandler)来配置新Channel的ChannelPipeline,以实现您的网络应用程序。 随着应用程序变得复杂,您可能会向管道添加更多处理程序,并最终将此匿名类提取到顶级类中。 这里我们用HeartbeatInitializer类继承了ChannelInitializer类并重写了initChannel()方法
public class HeartbeatInitializer extends ChannelInitializer <Channel > { @Override protected void initChannel (Channel channel) throws Exception { channel.pipeline() .addLast(new IdleStateHandler(5 , 0 , 0 )) .addLast(new HeartbeatDecoder()) .addLast(new HeartBeatSimpleHandle()); } }
(6).绑定到端口并启动服务器。 (7).当程序关闭时优雅的关闭 Neety (8).将IdleStateHandler 添加到 ChannelPipeline 中,也会有一个定时任务,每5秒校验一次是否有收到消息,否则就主动发送一次请求。 (9),服务端解码器,服务端与客户端采用的是自定义的 POJO 进行通讯的,所以需要在客户端进行编码,服务端进行解码,也都只需要各自实现一个编解码器即可。(下面也会说到)
public class HeartbeatDecoder extends ByteToMessageDecoder { @Override protected void decode (ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { long id = byteBuf.readLong(); byte [] bytes = new byte [byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String content = new String(bytes); CustomProtocol customProtocol = new CustomProtocol(); customProtocol.setId(id); customProtocol.setContent(content); list.add(customProtocol); } }
(10),HeartBeatSimpleHandle
继承了 ChannelInboundHandlerAdapter 的一个扩展(SimpleChannelInboundHandler)
编写客户端
当客户端空闲了 N 秒没有给服务端发送消息时会自动发送一个心跳来维持连接。
public class EchoClientHandle extends SimpleChannelInboundHandler <ByteBuf > { private final static Logger LOGGER = LoggerFactory.getLogger(EchoClientHandle.class); public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.WRITER_IDLE) { LOGGER.info("已经10秒没收到消息了" ); CustomProtocol heartBeat = SpringBeanFactory.getBean("heartBeat" ,CustomProtocol.class); ctx.writeAndFlush(heartBeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } super .userEventTriggered(ctx, evt); } @Override protected void channelRead0 (ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { LOGGER.info("客户端收到消息={}" , byteBuf.toString(CharsetUtil.UTF_8)); } }
对上面代码简要说明: EchoClientHandle继承了 ChannelInboundHandlerAdapter 的一个扩展(SimpleChannelInboundHandler), 而ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现 ChannelInboundHandler提供了可以重写的各种事件处理程序方法,包括channelRead0()方法. 目前,只需继承 SimpleChannelInboundHandler或ChannelInboundHandlerAdapter 而不是自己实现处理程序接口。 我们重写了channelRead0()方法,每当接收到新数据时,都会使用收到的消息调用此方法。 由于整合了 SpringBoot,在这里我们向服务端发送的是一个 单例的 Bean(上面所说的 pojo),所涉及到的类有:
SpringBeanFactory
@Component public final class SpringBeanFactory implements ApplicationContextAware { private static ApplicationContext context; public static <T> T getBean (Class<T> c) { return context.getBean(c); } public static <T> T getBean (String name,Class<T> clazz) { return context.getBean(name,clazz); } @Override public void setApplicationContext (ApplicationContext applicationContext) throws BeansException { context = applicationContext; } }
CustomProtocol
public class CustomProtocol implements Serializable { private static final long serialVersionUID = 290429819350651974L ; private long id; private String content;
HeartBeatConfig
将CustomProtocol设置为一个 Bean,并赋值@Configuration public class HeartBeatConfig { @Value ("${channel.id}" ) private long id; @Bean (value = "heartBeat" ) public CustomProtocol heartBeat () { return new CustomProtocol(id,"ping" ) ; } }
当然我们还需要一个启动引导类
@Component public class HeartbeatClient { private final static Logger LOGGER = LoggerFactory.getLogger(HeartbeatClient.class); private EventLoopGroup group = new NioEventLoopGroup(); @Value ("${netty.server.port}" ) private int nettyPort; @Value ("${netty.server.host}" ) private String host; private SocketChannel socketChannel; @PostConstruct public void start () throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new CustomerHandleInitializer()); ChannelFuture future = bootstrap.connect(host, nettyPort).sync(); if (future.isSuccess()) { LOGGER.info("启动 Netty 成功" ); } socketChannel = (SocketChannel) future.channel(); } }
对上面代码进行简要说明: Netty中服务器和客户端之间最大和唯一的区别是使用了不同的Bootstrap和Channel实现。 (1),如果只指定一个EventLoopGroup,它将同时用作boss组和worker组。 但是,老板工作者不会用于客户端。 (2),Bootstrap(客户端使用)类似于ServerBootstrap(服务端使用),不同之处在于它适用于非服务器通道 (3),NioSocketChannel用于创建客户端通道,而不是NioServerSocketChannel(服务端) (4),客户端handler()方法与服务端childHandler()同样都是需要一个ChannelInitializer类,ChannelInitializer是一个特殊的处理程序,这里用CustomerHandleInitializer继承了ChannelInitializer
public class CustomerHandleInitializer extends ChannelInitializer <Channel > { @Override protected void initChannel (Channel channel) throws Exception { channel.pipeline() .addLast(new IdleStateHandler(0 , 10 , 0 )) .addLast(new HeartbeatEncode()) .addLast(new EchoClientHandle()); } }
(5),我们这里应该调用connect()方法而不是bind()方法。 (6),客户端的心跳其实也是类似,也需要在 ChannelPipeline 中添加一个 IdleStateHandler (7),客户端编码器
public class HeartbeatEncode extends MessageToByteEncoder <CustomProtocol > { @Override protected void encode (ChannelHandlerContext channelHandlerContext, CustomProtocol customProtocol, ByteBuf byteBuf) throws Exception { byteBuf.writeLong(customProtocol.getId()) ; byteBuf.writeBytes(customProtocol.getContent().getBytes()) ; } }
(8),EchoClientHandle继承了 ChannelInboundHandlerAdapter 的一个扩展(SimpleChannelInboundHandler)
这里贴出客户端与服务端的配置文件
客户端
# web port server.port=8082 # 通道 ID channel.id=100 netty.server.host=127.0 .0.1 netty.server.port=11211
服务端
# web port server.port=8081 netty.server.port=11211
启动客户端以及服务端
本案例项目源码地址: https://github.com/haoxiaoyong1014/springboot-netty
参考文章:
https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/ 需要了解更多参考官网
学习资料