网站首页 > 技术文章 正文
推荐视频:
6种epoll的设计,让你吊打面试官【linux服务器开发】
网络原理tcp/udp,网络编程epoll/reactor,面试中正经“八股文”
c/c++ linux服务器开发学习地址:C/C++Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂
为了行文方便,以下将侦听 socket 称之为 listenfd,将由调用 accept 函数返回的 socket 称之为 clientfd。
我们知道如果需要使用 IO 复用函数统一管理各个 fd,需要将 clientfd 设置成非阻塞的,那么 listenfd 一定要设置成非阻塞的吗?答案是不一定的——只要不用 IO 复用函数去管理 listenfd 就可以了,listenfd 如果不设置成非阻塞的,那么 accept 函数在没有新连接时就会阻塞。
1. 结构一 listenfd 设置为阻塞模式,为了 listenfd 独立分配一个接受连接线程
有很多的服务器程序结构确实采用的就是阻塞的 listenfd,为了不让 accept 函数在没有连接时阻塞对程序其他逻辑执行流造成影响,我们通常将 accept 函数放在一个独立的线程中,这个线程的伪码如下:
//接受连接线程
void* accept_thread_func(void* param)
{
//可以在这里做一些初始化工作...
while (退出标志)
{
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
//没有连接时,线程会阻塞在accept函数处
int clientfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clientaddrlen);
if (clientfd != -1)
{
//出错了,可以在此做一些清理资源动作,如关闭listenfd
break;
}
//将clientfd交给其他IO线程的IO复用函数
//由于跨线程操作,可以需要一些锁对公共操作的资源进行保护
}
}
其他 IO 线程的结构还是利用 IO 复用函数处理 clientfd 的 one thread one loop 结构,这里以 epoll_wait 为例,即:
//其他IO线程
void* io_thread_func(void* param)
{
//可以在这里做一些初始化工作
while (退出标志)
{
epoll_event epoll_events[1024];
//所有的clientfd都挂载到epollfd由epoll_wait统一检测读写事件
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
//epoll_wait返回时处理对应clientfd上的读写事件
//其他一些操作
}
}
当然,这里的 IO 线程可以存在多个,这种结构示意图如下:
将 clientfd 从 accept_thread_func 交给 io_thread_func方法也很多,这里以使用一个互斥锁来实现为例:
//存储accept函数产生的clientfd的多线程共享变量
std::vector<int> g_vecClientfds;
//保护g_vecClientfds的互斥体
std::mutex g_clientfdMutex;
//接受连接线程
void* accept_thread_func(void* param)
{
//可以在这里做一些初始化工作...
while (退出标志)
{
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
//没有连接时,线程会阻塞在accept函数处
int clientfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clientaddrlen);
if (clientfd != -1)
{
//出错了,可以在此做一些清理资源动作,如关闭listenfd
break;
}
//将clientfd交给其他IO线程的IO复用函数
//由于跨线程操作,可以需要一些锁对公共操作的资源进行保护
std::lock_guard<std::mutex> scopedLock(g_clientfdMutex);
g_vecClientfds.push_back(clientfd);
}
}
//其他IO线程
void* io_thread_func(void* param)
{
//可以在这里做一些初始化工作
while (退出标志)
{
epoll_event epoll_events[1024];
//所有的clientfd都挂载到epollfd由epoll_wait统一检测读写事件
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
//epoll_wait返回时处理对应clientfd上的读写事件
//其他一些操作
//从共享变量g_vecClientfds取出新的clientfd
retrieveNewClientfds(epollfd);
}
}
void retrieveNewClientfds(int epollfd)
{
std::lock_guard<std::mutex> scopedLock(g_clientfdMutex);
if (!g_vecClientfds.empty())
{
//遍历g_vecClientfds取出各个fd,然后将fd设置挂载到所在线程的epollfd上
//全部取出后,清空g_vecClientfds
g_vecClientfds.clear();
}
}
注意上述代码中,由于要求 clientfd 是非阻塞的,设置 clientfd 为非阻塞的这段逻辑你可以放在 accept_thread_func 或 io_thread_func 中均可。
上述代码有点效率问题,某个时刻 accept_thread_func 往 g_vecClientfds 添加了一个 clientfd,但此时如果 io_thread_func 函数正阻塞在 epoll_wait 处,所以此时我们要唤醒 epoll_wait,我们已经在《one thread one loop 思想》中介绍了如何设计这个唤醒逻辑,这里就不再赘述了。
【文章福利】需要C/C++ Linux服务器架构师学习资料加群812855908(资料包括C/C++,Linux,golang技术,内核,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等)
2. 结构二 listenfd 为阻塞模式,使用同一个 one thread one loop 结构去处理 listenfd 的事件
单独为 listenfd 分配一个线程毕竟是对资源的一种浪费,有读者可能说,listenfd 虽然设置成了阻塞模式,但我可以将 listenfd 挂载在到某个 loop 的 epollfd 上,当 epoll_wait 返回且 listenfd 上有读事件时调用 accept 函数时,此时 accept 就不会阻塞了。伪码如下:
void* io_thread_func(void* param)
{
//可以在这里做一些初始化工作
while (退出标志)
{
epoll_event epoll_events[1024];
//listenfd和clientfd都挂载到epollfd由epoll_wait统一检测读写事件
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
if (listenfd上有事件)
{
//此时调用accept函数不会阻塞
int clientfd = accept(listenfd, ...);
//对clientfd作进一步处理
}
//其他一些操作
}
}
如上述代码所示,这种情况下确实可以将 listenfd 设置成阻塞模式,调用 accept 函数也不会造成流程阻塞。
但是,问题是这样的设计存在严重的效率问题:这种设计在每一轮循环中只能一次接受一个连接(每次循环仅调用了一次 accept),如果连接数较多,这种处理速度可能跟不上,所以要在一个循环里面处理 accept,但是实际情形是我们没法确定下一轮调用 accept 时 backlog 队列中是否还有新连接呀,如果没有,由于 listenfd 是阻塞模式的, accept 会阻塞。
3. 结构三 listenfd 为阻塞模式,使用同一个 one thread one loop 结构去处理 listenfd 的事件
当将 listenfd 设置成非阻塞模式,我们就不会存在这种窘境了。伪码如下:
void* io_thread_func(void* param)
{
//可以在这里做一些初始化工作
while (退出标志)
{
epoll_event epoll_events[1024];
//listenfd和clientfd都挂载到epollfd由epoll_wait统一检测读写事件
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
if (listenfd上有事件)
{
while (true)
{
//此时调用accept函数不会阻塞
int clientfd = accept(listenfd, ...);
if (clientfd == -1)
{
//错误码是EWOULDBLOCK说明此时已经没有新连接了
//可以退出内层的while循环了
if (errno == EWOULDBLOCK)
break;
//被信号中断重新调用一次accept即可
else if (errno == EINTR)
continue;
else
{
//其他情况认为出错
//做一次错误处理逻辑
}
} else {
//正常接受连接
//对clientfd作进一步处理
}//end inner-if
}//end inner-while-loop
}//end outer-if
//其他一些操作
}//end outer-while-loop
}
将 listenfd 设置成非阻塞模式还有一个好处时,我们可以自己定义一次 listenfd 读事件时最大接受多少连接数,这个逻辑也很容易实现,只需要将上述代码的内层 while 循环的判断条件从 true 改成特定的次数就可以:
1void* io_thread_func(void* param)
2{
3 //可以在这里做一些初始化工作
4
5 //每次处理的最大连接数目
6 const int MAX_ACCEPTS_PER_CALL = 200;
7 //当前数量
8 int currentAccept;
9
10 while (退出标志)
11 {
12 epoll_event epoll_events[1024];
13 //listenfd和clientfd都挂载到epollfd由epoll_wait统一检测读写事件
14 n = epoll_wait(epollfd, epoll_events, 1024, 1000);
15
16 if (listenfd上有事件)
17 {
18 currentAccept = 0;
19 while (currentAccept <= MAX_ACCEPTS_PER_CALL)
20 {
21 //此时调用accept函数不会阻塞
22 int clientfd = accept(listenfd, ...);
23 if (clientfd == -1)
24 {
25 //错误码是EWOULDBLOCK说明此时已经没有新连接了
26 //可以退出内层的while循环了
27 if (errno == EWOULDBLOCK)
28 break;
29 //被信号中断重新调用一次accept即可
30 else if (errno == EINTR)
31 continue;
32 else
33 {
34 //其他情况认为出错
35 //做一次错误处理逻辑
36 }
37 } else {
38 //累加处理数量
39 ++currentAccept;
40 //正常接受连接
41 //对clientfd作进一步处理
42 }//end inner-if
43 }//end inner-while-loop
44
45 }//end outer-if
46
47 //其他一些操作
48 }//end outer-while-loop
49}
这是一段比较常用的逻辑,我们以 redis-server 的源码中的使用为例:
//https://github.com/balloonwj/redis-6.0.3/blob/master/src/networking.c
//networking.c 971行
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
//MAX_ACCEPTS_PER_CALL在redis中是1000
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
//每次最大处理max个连接数目
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
//未达到每次处理新连接的最大数时已经无新连接待接收,直接while循环
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
原文地址:https://www.bianchengquan.com/article/563552.html
猜你喜欢
- 2024-10-25 什么是喉梗阻 反流性食管炎嗓子疼怎么办
- 2024-10-25 阻塞列队详解!让你轻松理解阻塞列队
- 2024-10-25 【健康科普】认识阻塞性睡眠呼吸暂停低通气综合征
- 2024-10-25 气象科普|阻塞高压为何方神圣?后期它将给我国制造多轮冷空气
- 2024-10-25 非阻塞算法CAS 非阻塞函数
- 2024-10-25 为什么网络 I/O 会被阻塞?I/O 到底是什么?
- 2024-10-25 如何用Java设计阻塞队列,再说说ArrayBlocking和LinkedBlocking
- 2024-10-25 使用 Python Socket 实现非阻塞 I/O入门讲解
- 2024-10-25 梗阻性无精子症:多种选择,何为最佳?
- 2024-10-25 焦耳小偷 一个神奇的电路 焦耳小偷电路需要注意的地方
你 发表评论:
欢迎- 最近发表
-
- 吴谨言专访大反转!痛批耍大牌后竟翻红,六公主七连发力显真诚
- 港股2月28日物业股涨幅榜:CHINAOVSPPT涨1.72%位居首位
- 港股2月28日物业股午盘:CHINAOVSPPT涨1.72%位居首位
- 港股3月2日物业股涨幅榜:CHINAOVSPPT涨1.03%位居首位
- 港股3月2日物业股午盘:CHINAOVSPPT涨1.03%
- 天赋与心痛的背后:邓鸣贺成长悲剧引发的深刻反思
- 冯小刚女儿徐朵追星范丞丞 同框合照曝光惹人羡,回应网友尽显亲民
- “资本大佬”王冉:51岁娶小17岁童瑶,并承诺余生为娇妻保驾护航
- 港股3月2日物业股午盘:CHINAOVSPPT涨1.03%位居首位
- 「IT之家开箱」vivo S15 图赏:双镜云窗,盛夏风光
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)