引言 最近研究了Netty的相关技术,用于实施高并发场景下的消息通信,期间搜集了大量资料,围绕着netty的channel连接池的设计,这个稍微有些复杂的主题,做了大量功课,其中牵扯到蛮多技术点,要想在网上找到相关的又相对完整的参考文章,确实不太容易。在此记录一下实现的方案,用于技术沉淀。
首先,阅读本文之前需要具备一些基础知识:
socket通信和长短连接
知道Netty的执行流程和相关API操作
理解什么是TCP半包,了解Netty提供的粘包和拆包解码器
在此贴出一些学习过程中遇到的优秀Blog官方文档 分隔符解码器处理半包问题 netty实战-netty client连接池设计 (Netty官方新版本中已经实现了简单的连接池,可以学习连接池的设计思想)
线程模型 首先,整个系统架构的线程模型如下:
同步通信机制 其次我们需要关注单线程内的同步请求和响应 抛出问题: Q1:如何实现基于Netty的“请求-响应”同步通信机制
Netty提供了异步IO和同步IO的统一实现,但是我们的需求其实和IO的同步异步并无关系。我们的关键是要实现请求-响应这种典型的一问一答交互方式。用于实现微服务之间的调用和返回结果获取,要实现这个需求,需要解决两个问题: a. 请求和响应的正确匹配。 当服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢?解决方式:通过客户端唯一的RequestId,服务端返回的响应中需要包含该RequestId,这样客户端就可以通过RequestId来正确匹配请求响应。 b. 请求线程和响应线程的通信。 因为请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty在接受到响应之后,怎么通知请求线程结果。
方案:使用LinkedBlockingQueue
阻塞任务队列,使用take()获取相应的返回结果 首先需要对每一个请求标识一个全局唯一的标识,下面贴出核心代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Slf 4jpublic class ChannelTaskThread implements Callable <String > { final NettyClientPool nettyClientPool = NettyClientPool.getInstance(); private String message; public ChannelTaskThread (String message) { this .message = message; } @Override public String call () { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS" ); long random = Long.valueOf(sdf.format(new Date())) * 1000000 + Math.round(Math.random() * 1000000 ); Channel channel = nettyClientPool.getChannel(random); log.debug("在链接池池中取到的Channel: " + channel.id()); UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false ); ByteBuf buffer = allocator.buffer(20 ); String msg = message + DataBusConstant.DELIMITER; buffer.writeBytes(msg.getBytes()); NettyClientHandler tcpClientHandler = channel.pipeline().get(NettyClientHandler.class); ChannelId id = channel.id(); log.info("SEND SEQNO[{}] MESSAGE AND CHANNEL id [{}]" ,random,id); String serverMsg = tcpClientHandler.sendMessage(buffer, channel); NettyClientPool.release(channel); return "请求SEQNO[" +random+"] " + serverMsg; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Slf 4jpublic class NettyClientHandler extends ChannelInboundHandlerAdapter { private static final Map<Long, LinkedBlockingQueue<String>> RESULT_MAP = new ConcurrentHashMap<>(); public String sendMessage (ByteBuf message,Channel ch) { LinkedBlockingQueue<String> linked = new LinkedBlockingQueue<>(1 ); Long randomId = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get(); RESULT_MAP.put(randomId,linked); ch.writeAndFlush(message); String res = null ; try { res = RESULT_MAP.get(randomId).poll(3 ,TimeUnit.MINUTES); RESULT_MAP.remove(randomId); }catch (Exception e){ e.printStackTrace(); } return res; } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { String message = null ; if (msg instanceof String){ message = msg.toString(); }else if (msg instanceof ByteBuf){ message = ((ByteBuf)msg).toString(Charset.defaultCharset()); } Long randomId = ctx.channel().attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get(); log.info(" READ INFO 服务端返回结果:" + message); LinkedBlockingQueue<String> linked = RESULT_MAP.get(randomId); linked.add(message); } @Override public void channelReadComplete (ChannelHandlerContext ctx) { boolean active = ctx.channel().isActive(); log.debug("[此时通道状态] {}" , active); } }
连接池的创建 2. 官方提供的FixedChannelPool支持固定连接的连接池,但是不支持连接池的动态回收 直接贴连接池的创建代码,通道的动态回收结合心跳机制实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 @Slf 4jpublic class NettyClientPool { volatile private static NettyClientPool nettyClientPool; public ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap; final EventLoopGroup group = new NioEventLoopGroup(); final Bootstrap strap = new Bootstrap(); private static String addresses = "127.0.0.1:7000" ; volatile private static Map<InetSocketAddress,FixedChannelPool> pools = new HashMap<>(4 ); volatile private static List<InetSocketAddress> addressList; private NettyClientPool () { build(); } public static NettyClientPool getInstance () { if (nettyClientPool == null ) { synchronized (NettyClientPool.class) { if (nettyClientPool == null ) { nettyClientPool = new NettyClientPool(); } } } return nettyClientPool; } public void build () { log.info("NettyClientPool build......" ); strap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true ) .option(ChannelOption.SO_KEEPALIVE, true ); poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() { @Override protected FixedChannelPool newPool (InetSocketAddress key) { return new FixedChannelPool(strap.remoteAddress(key), new NettyChannelPoolHandler(), DataBusConstant.MAX_CONNECTIONS); } }; getInetAddresses(addresses); for (InetSocketAddress address: addressList){ pools.put(address,poolMap.get(address)); } } public Channel getChannel (long random) { int retry = 0 ; Channel channel = null ; try { Long poolIndex = random % pools.size(); InetSocketAddress address = addressList.get(poolIndex.intValue()); FixedChannelPool pool = pools.get(address); Future<Channel> future = pool.acquire(); channel = future.get(); AttributeKey<Long> randomID = AttributeKey.valueOf(DataBusConstant.RANDOM_KEY); channel.attr(randomID).set(random); }catch (ExecutionException e){ log.error(e.getMessage()); int count = 2 ; if (retry < addressList.size() * count){ retry ++; return getChannel( ++ random); }else { log.error("没有可以获取到channel连接的server,server list [{}]" ,addressList); throw new RuntimeException("没有可以获取到channel连接的server" ); } } catch (InterruptedException e){ e.printStackTrace(); }catch (Exception e){ e.printStackTrace(); } return channel; } public static void release (Channel ch) { long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get(); ch.flush(); Long poolIndex = random % pools.size(); pools.get(addressList.get(poolIndex.intValue())).release(ch); } public static int getPoolHash (Channel ch) { long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get(); Long poolIndex = random % pools.size(); return System.identityHashCode(pools.get(addressList.get(poolIndex.intValue()))); } public void getInetAddresses (String addresses) { addressList = new ArrayList<>(4 ); if (StringUtils.isEmpty(addresses)){ throw new RuntimeException("address列表为空" ); } String[] splits = addresses.split("," ); for (String address: splits){ String[] split = address.split(":" ); if (split.length==0 ){ throw new RuntimeException("[" + address + "]不符合IP:PORT格式" ); } addressList.add(new InetSocketAddress(split[0 ], Integer.parseInt(split[1 ]))); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf 4jpublic class NettyChannelPoolHandler implements ChannelPoolHandler { static final ByteBuf byteBuf = Unpooled.copiedBuffer(DataBusConstant.DELIMITER.getBytes()); @Override public void channelReleased (Channel ch) { ch.writeAndFlush(Unpooled.EMPTY_BUFFER); log.info("|-->回收Channel. Channel ID: " + ch.id()); } @Override public void channelAcquired (Channel ch) { log.info("|-->获取Channel. Channel ID: " + ch.id()); } @Override public void channelCreated (Channel ch) { log.info("|-->创建Channel. Channel ID: " + ch.id() +"\r\n|-->创建Channel. Channel REAL HASH: " + System.identityHashCode(ch)); SocketChannel channel = (SocketChannel) ch; channel.config().setKeepAlive(true ); channel.config().setTcpNoDelay(true ); channel.pipeline() .addLast(new IdleStateHandler(0 , 0 , 5 ,TimeUnit.SECONDS)) .addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,byteBuf)) .addLast(new NettyClientHandler()); } }
通道的动态回收 3. 心跳机制的实现保证心跳不会失活,丢失心跳包的通道的管理,参考上面的 NettyChannelPoolHandler 处理器 动态通道回收,在 NettyClientHandler 类中实现userEventTriggered
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 volatile static Map<Integer,Set<Channel>> coreChannel = new HashMap();@Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { log.info("[客户端心跳监测发送] 通道编号:{}" , ctx.channel().id()); Channel channel = ctx.channel(); if (evt instanceof IdleStateEvent) { if (channel.isActive()){ int poolHash = NettyClientPool.getPoolHash(channel); Set<Channel> channels = coreChannel.get(poolHash); channels = channels == null ? new HashSet<>(DataBusConstant.CORE_CONNECTIONS) : channels; channels.add(channel); if (channels.stream().filter(x-> x.isActive()).count() > DataBusConstant.CORE_CONNECTIONS){ log.info("关闭 CORE_CONNECTIONS 范围之外的通道:{}" ,channel.id()); channels.remove(channel); channel.close(); } coreChannel.put(poolHash,channels); } String heartBeat = DataBusConstant.HEART_BEAT + DataBusConstant.DELIMITER; ByteBuf byteBuf = Unpooled.copiedBuffer(heartBeat.getBytes()); channel.writeAndFlush(byteBuf); } else { super .userEventTriggered(ctx, evt); } }
辅助类 常量类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class DataBusConstant { public static final String DELIMITER = "%#_#%" ; public static final String HEART_BEAT = "ping-pong-ping-pong" ; public static final int MAX_CONNECTIONS = Integer.MAX_VALUE; public static final int CORE_CONNECTIONS = 1 ; public static final String RANDOM_KEY = "randomID" ; public static final int LOOS_HEART_BEAT_COUNT = 3 ; }
任务连接池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Slf 4jpublic class NettyTaskPool { private static final int CORE_POLL_SIZE = 0 ; private static final int MAX_POLL_SIZE = Integer.MAX_VALUE; private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( CORE_POLL_SIZE, MAX_POLL_SIZE, 3 , TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.DiscardOldestPolicy()); public static String submitTask (String message) throws Exception { Future<String> submit = threadPool.submit(new ChannelTaskThread(message)); String response = submit.get(); log.info("\n\t submitTask 返回的 Response: \r\n\t\t[ " + response +" ]\n" ); return response; } }
测试类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class TestPool { public static void main (String[] args) throws Exception { for (int i = 0 ; i < 5 ; i++) { new Thread(()-> { try { for (int j = 0 ; j < 10 ; j++) { String longMsgBody = j + "中华人民共和国,中华人民共和国,中华人民共和国,中华人民共和国,中华人民共和国" + j; NettyTaskPool.submitTask(longMsgBody); } } catch (Exception e) { e.printStackTrace(); } }).start(); } } }
server端省略