Netty Client实战——高并发连接池方案

引言

最近研究了Netty的相关技术,用于实施高并发场景下的消息通信,期间搜集了大量资料,围绕着netty的channel连接池的设计,这个稍微有些复杂的主题,做了大量功课,其中牵扯到蛮多技术点,要想在网上找到相关的又相对完整的参考文章,确实不太容易。在此记录一下实现的方案,用于技术沉淀。

首先,阅读本文之前需要具备一些基础知识:

  1. socket通信和长短连接
  2. 知道Netty的执行流程和相关API操作
  3. 理解什么是TCP半包,了解Netty提供的粘包和拆包解码器 在此贴出一些学习过程中遇到的优秀Blog
    官方文档
    分隔符解码器处理半包问题
    netty实战-netty client连接池设计(Netty官方新版本中已经实现了简单的连接池,可以学习连接池的设计思想)

线程模型

首先,整个系统架构的线程模型如下:

线程模型

同步通信机制

其次我们需要关注单线程内的同步请求和响应
抛出问题:
Q1:如何实现基于Netty的“请求-响应”同步通信机制

Netty提供了异步IO和同步IO的统一实现,但是我们的需求其实和IO的同步异步并无关系。我们的关键是要实现请求-响应这种典型的一问一答交互方式。用于实现微服务之间的调用和返回结果获取,要实现这个需求,需要解决两个问题:
a. 请求和响应的正确匹配。
当服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢?解决方式:通过客户端唯一的RequestId,服务端返回的响应中需要包含该RequestId,这样客户端就可以通过RequestId来正确匹配请求响应。
b. 请求线程和响应线程的通信。
因为请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty在接受到响应之后,怎么通知请求线程结果。

方案:使用LinkedBlockingQueue阻塞任务队列,使用take()获取相应的返回结果
首先需要对每一个请求标识一个全局唯一的标识,下面贴出核心代码:

NettyChannelPoolHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
public class ChannelTaskThread implements Callable<String> {


/**
* netty channel池
*/
final NettyClientPool nettyClientPool = NettyClientPool.getInstance();

private String message;

public ChannelTaskThread(String message){
this.message = message;
}

@Override
public String call(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
//同一个线程使用同一个全局唯一的随机数,保证从同一个池中获取和释放资源,同时使用改随机数作为Key获取返回值,时间戳+6位随机数
long random = Long.valueOf(sdf.format(new Date())) * 1000000 + Math.round(Math.random() * 1000000);

Channel channel = nettyClientPool.getChannel(random);
log.debug("在链接池池中取到的Channel: "+ channel.id());
UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buffer = allocator.buffer(20);
//使用固定分隔符的半包解码器
String msg = message + DataBusConstant.DELIMITER;
buffer.writeBytes(msg.getBytes());
NettyClientHandler tcpClientHandler = channel.pipeline().get(NettyClientHandler.class);
ChannelId id = channel.id();
log.info("SEND SEQNO[{}] MESSAGE AND CHANNEL id [{}]",random,id);

String serverMsg = tcpClientHandler.sendMessage(buffer, channel);
NettyClientPool.release(channel);
return "请求SEQNO["+random+"] "+ serverMsg;
}
}

NettyClientHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

/**
* 使用阻塞式LinkedBlockingQueue,对响应结果保存
* 用于记录通道响应的结果集
*/
private static final Map<Long, LinkedBlockingQueue<String>> RESULT_MAP = new ConcurrentHashMap<>();

public String sendMessage(ByteBuf message,Channel ch) {
LinkedBlockingQueue<String> linked = new LinkedBlockingQueue<>(1);
//获取channel中存储的全局唯一随机值
Long randomId = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
RESULT_MAP.put(randomId,linked);
ch.writeAndFlush(message);
String res = null;
try {
//设置3分钟的获取超时时间或者使用take()--获取不到返回结果一直阻塞
res = RESULT_MAP.get(randomId).poll(3,TimeUnit.MINUTES);
RESULT_MAP.remove(randomId);
}catch (Exception e){
e.printStackTrace();
}
return res;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
String message = null;
if(msg instanceof String){
message = msg.toString();
}else if(msg instanceof ByteBuf){
message = ((ByteBuf)msg).toString(Charset.defaultCharset());
}
//获取channel中存储的全局唯一随机值
Long randomId = ctx.channel().attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
log.info(" READ INFO 服务端返回结果:"+ message);
LinkedBlockingQueue<String> linked = RESULT_MAP.get(randomId);
linked.add(message);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx){
boolean active = ctx.channel().isActive();
log.debug("[此时通道状态] {}", active);
}

}

连接池的创建

2. 官方提供的FixedChannelPool支持固定连接的连接池,但是不支持连接池的动态回收
直接贴连接池的创建代码,通道的动态回收结合心跳机制实现:

NettyClientPool.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
@Slf4j
//和Spring整合
//@Order(Integer.MAX_VALUE+1)
//@Component
//@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class NettyClientPool {

/**
* volatile保持线程之间的可见性,连接池的创建是单例,在这里可加可不加
*/
volatile private static NettyClientPool nettyClientPool;
/**
* key为目标主机的InetSocketAddress对象,value为目标主机对应的连接池
*/
public ChannelPoolMap<InetSocketAddress, FixedChannelPool> poolMap;

final EventLoopGroup group = new NioEventLoopGroup();
final Bootstrap strap = new Bootstrap();

// @Value("${netty.server.addresses}")
private static String addresses = "127.0.0.1:7000";

volatile private static Map<InetSocketAddress,FixedChannelPool> pools = new HashMap<>(4);
volatile private static List<InetSocketAddress> addressList;

private NettyClientPool(){
//如果和Spring整合,构造方法内的build方法调用注掉
build();
}

/**
* 单例
* @return
*/
// @Bean(initMethod = "build")
public static NettyClientPool getInstance(){
if(nettyClientPool == null) {
synchronized (NettyClientPool.class) {
if(nettyClientPool == null) {
nettyClientPool = new NettyClientPool();
}
}
}
return nettyClientPool;
}

public void build(){
log.info("NettyClientPool build......");
strap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true);

poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
@Override
protected FixedChannelPool newPool(InetSocketAddress key) {
return new FixedChannelPool(strap.remoteAddress(key), new NettyChannelPoolHandler(), DataBusConstant.MAX_CONNECTIONS);
}
};
getInetAddresses(addresses);

for (InetSocketAddress address: addressList){
pools.put(address,poolMap.get(address));
}
}

/**
* <pre>功能描述:
* 根据随机数取出随机的server对应pool,从pool中取出channel
* pool.acquiredChannelCount(); 对应池中的channel数目
* 连接池的动态扩容: 指定最大连接数为{@link Integer.MAX_VALUE},如果连接池队列中取不到channel,会自动创建channel,默认使用FIFO的获取方式,回收的channel优先被再次get到
* SERVER的宕机自动切换: 指定重试次数,get()发生连接异常,则对随机数+1,从下一个池中重新获取,
*
* 后期如有必要可优化为:Server注册到注册中心,从注册中心获取连接池对应的address,或者注册到zookeeper中,都需要单独写实现
*
* </pre>
* @方法名称 getChannel
* @作者 zhangdong
* @创建时间 2019/4/23 11:39
* @param random
* @return io.netty.channel.Channel
*/
public Channel getChannel (long random){
int retry = 0;
Channel channel = null;
try {
//按时间戳取余
Long poolIndex = random % pools.size();
InetSocketAddress address = addressList.get(poolIndex.intValue());
FixedChannelPool pool = pools.get(address);
Future<Channel> future = pool.acquire();
channel = future.get();
AttributeKey<Long> randomID = AttributeKey.valueOf(DataBusConstant.RANDOM_KEY);
channel.attr(randomID).set(random);
//如果是因为服务端挂点,连接失败而获取不到channel,则随机数执行+1操作,从下一个池获取
}catch (ExecutionException e){
log.error(e.getMessage());
//每个池,尝试获取取2次
int count = 2;
if(retry < addressList.size() * count){
retry ++;
return getChannel( ++ random);
}else {
log.error("没有可以获取到channel连接的server,server list [{}]",addressList);
throw new RuntimeException("没有可以获取到channel连接的server");
}
}
catch (InterruptedException e){
e.printStackTrace();
}catch (Exception e){
e.printStackTrace();
}
return channel;
}


/**
* <pre>功能描述:
* 回收channel进池,需要保证随机值和getChannel获取到的随机值是同一个,才能从同一个pool中释放资源
* </pre>
* @方法名称 release
* @作者 zhangdong
* @创建时间 2019/4/23 11:16
* @param ch
* @return void
*/
public static void release(Channel ch){
long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
ch.flush();
Long poolIndex = random % pools.size();
pools.get(addressList.get(poolIndex.intValue())).release(ch);
}

/**
* 获取线程池的hash值
* @param ch
* @return
*/
public static int getPoolHash(Channel ch){
long random = ch.attr(AttributeKey.<Long>valueOf(DataBusConstant.RANDOM_KEY)).get();
Long poolIndex = random % pools.size();
return System.identityHashCode(pools.get(addressList.get(poolIndex.intValue())));
}

/**
* <pre>功能描述:
* 获取服务端server列表,每个server对应一个pool
* </pre>
* @方法名称 getInetAddredd
* @作者 zhangdong
* @创建时间 2019/4/23 11:17
* @param addresses
* @return void
*/
public void getInetAddresses(String addresses){
addressList = new ArrayList<>(4);
if(StringUtils.isEmpty(addresses)){
throw new RuntimeException("address列表为空");
}
String[] splits = addresses.split(",");
for (String address: splits){
String[] split = address.split(":");
if(split.length==0){
throw new RuntimeException("[" + address + "]不符合IP:PORT格式");
}
addressList.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
}
}
}

NettyChannelPoolHandler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class NettyChannelPoolHandler implements ChannelPoolHandler {

static final ByteBuf byteBuf = Unpooled.copiedBuffer(DataBusConstant.DELIMITER.getBytes());

@Override
public void channelReleased(Channel ch){
ch.writeAndFlush(Unpooled.EMPTY_BUFFER);
log.info("|-->回收Channel. Channel ID: " + ch.id());
}

@Override
public void channelAcquired(Channel ch){
log.info("|-->获取Channel. Channel ID: " + ch.id());
}

@Override
public void channelCreated(Channel ch){

log.info("|-->创建Channel. Channel ID: " + ch.id()
+"\r\n|-->创建Channel. Channel REAL HASH: " + System.identityHashCode(ch));
SocketChannel channel = (SocketChannel) ch;
channel.config().setKeepAlive(true);
channel.config().setTcpNoDelay(true);
channel.pipeline()
//开启Netty自带的心跳处理器,每5秒发送一次心跳
.addLast(new IdleStateHandler(0, 0, 5,TimeUnit.SECONDS))
.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,byteBuf))
.addLast(new NettyClientHandler());
}
}

通道的动态回收

3. 心跳机制的实现保证心跳不会失活,丢失心跳包的通道的管理,参考上面的 NettyChannelPoolHandler 处理器
动态通道回收,在 NettyClientHandler 类中实现userEventTriggered方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

volatile static Map<Integer,Set<Channel>> coreChannel = new HashMap();

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log.info("[客户端心跳监测发送] 通道编号:{}", ctx.channel().id());
Channel channel = ctx.channel();
if (evt instanceof IdleStateEvent) {
//当客户端开始发送心跳检测时。说明没有业务请求过来,释放通道数为设定的 CORE_CONNECTIONS
if(channel.isActive()){
//使用pool的hash值作为Key,维护 CORE_CONNECTIONS个数个通道,多余的关闭
int poolHash = NettyClientPool.getPoolHash(channel);
Set<Channel> channels = coreChannel.get(poolHash);
channels = channels == null ? new HashSet<>(DataBusConstant.CORE_CONNECTIONS) : channels;
channels.add(channel);
if(channels.stream().filter(x-> x.isActive()).count() > DataBusConstant.CORE_CONNECTIONS){
log.info("关闭 CORE_CONNECTIONS 范围之外的通道:{}",channel.id());
channels.remove(channel);
channel.close();
}
coreChannel.put(poolHash,channels);
}
String heartBeat = DataBusConstant.HEART_BEAT + DataBusConstant.DELIMITER;
ByteBuf byteBuf = Unpooled.copiedBuffer(heartBeat.getBytes());
channel.writeAndFlush(byteBuf);
} else {
super.userEventTriggered(ctx, evt);
}
}

辅助类

常量类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class DataBusConstant {

public static final String DELIMITER = "%#_#%";

public static final String HEART_BEAT = "ping-pong-ping-pong";

/**
* 最大连接数
*/
public static final int MAX_CONNECTIONS = Integer.MAX_VALUE;

/**
* 核心链接数,该数目内的通道 在没有业务请求时发送心跳防止失活,超过部分的通道close掉
*/
public static final int CORE_CONNECTIONS = 1;

/**
* 同一个线程使用同一个全局唯一的随机数,保证从同一个池中获取和释放资源,同时使用改随机数作为Key获取返回值
*/
public static final String RANDOM_KEY = "randomID";

/**
* 服务端丢失心跳次数,达到该次数,则关闭通道,默认3次
*/
public static final int LOOS_HEART_BEAT_COUNT = 3;

}

任务连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
public class NettyTaskPool {

/**
* 线程池线程数量,对应CachedThreadPoolExecutor
*/
private static final int CORE_POLL_SIZE = 0;
private static final int MAX_POLL_SIZE = Integer.MAX_VALUE;

//手动创建线程池
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
CORE_POLL_SIZE,
MAX_POLL_SIZE,
3,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(),
new ThreadPoolExecutor.DiscardOldestPolicy());

public static String submitTask(String message) throws Exception{
//单个任务在线程池内分配单个线程,用于同步等待封装的返回结果
Future<String> submit = threadPool.submit(new ChannelTaskThread(message));
String response = submit.get();
log.info("\n\t submitTask 返回的 Response: \r\n\t\t[ "+ response +" ]\n");
return response;
}

}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestPool {

public static void main(String[] args) throws Exception{
for (int i = 0; i < 5; i++) {
//模拟多线程客户端,提交任务,
new Thread(()-> {
try {
for (int j = 0; j < 10; j++) {
String longMsgBody = j + "中华人民共和国,中华人民共和国,中华人民共和国,中华人民共和国,中华人民共和国" + j;
NettyTaskPool.submitTask(longMsgBody);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}

server端省略

文章作者: egnod
文章链接: http://itboyer.github.io/12466/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 EGNOD'S BLOG
支付宝打赏
微信打赏