网站首页 > 技术文章 正文
在涉及到Accept/Read/Write之类的操作时,Go net库默认使用了非阻塞的方式去实现,这样提高了性能,但给编程增加了额外的复杂度。本文我们将探讨下net库中如何用非阻塞的方式实现读写。为了兼顾底层细节(I/O概念)、同时了解上层如何应用(net库实现),分为五个部分:
- I/O模型概述:了解缓冲区的概念
- 阻塞vs非阻塞的区别
- net.Listener & net.Conn
- net.TCPListener & net.TCPConn
- net.conn上的非阻塞读写
I/O模型概述
在Linux下我们可以使用5种I/O模型:
- Blocking I/O (阻塞式I/O)
- NonBlocking I/O (非阻塞式 I/O)
- I/O Multiplexing(I/O多路复用): select、poll、epoll
- Signal Driven I/O (信号驱动式I/O)
- AIO (posix异步I/O)
为了方便理解这五种模式的区别,首先需要理解进程和tcp/ip网络栈的交互模式。从套接字read数据或向套接字write数据时,应用进程与套接字的缓冲区进行交互。应用进程运行在用户态,套接字缓冲区在内核态,所以数据的拷贝是必不可少的。流程图如下:
我们把这个流程分为两个阶段:
- 第一阶段:read数据可用/write缓冲区可用之前,等待的过程
- 第二阶段:read数据可用/write缓冲区可用之后,数据拷贝的过程
基于这样的拆分方式,以read为例,上面的5种I/O模型工作方式如下:
Go net/http库使用了非阻塞I/O,无论是在监听套戒指上通过accept获取新的tcp连接,还是在已连接套接字上进行read/write,走的都是上图中第二种模式。我们看一下细节。
阻塞vs非阻塞
对于阻塞/非阻塞的套接字, 执行accept系统调用时:
- 阻塞的sockfd: 调用方会一直被阻塞,直到有一个ESTABLISHED的tcp connection(已完成三次握手)
- 非阻塞的sockfd: 函数accept会返回 EAGAIN 或 EWOULDBLOCK 的错误
执行read/recvfrom/recvmsg系统调用时:
- 阻塞的sockfd: 调用方会一直被阻塞,直到数据到达套接字缓冲区或发生错误才返回
- 非阻塞的sockfd: 进程缓冲区没有数据时,函数会返回 EAGAIN 或 EWOULDBLOCK
执行write/sendto/sendmsg系统调用时:
- 阻塞的sockfd: 调用方一直被阻塞,直到数据全部写入套接字缓冲区
- 非阻塞的sockfd: 如果缓冲区空间不够,会返回EAGAIN或EWOULDBLOCK,如果被其他信号打断,则返回EINTER
Go net/http库走的是per goroutine per tcp connection模式,不过网络模式是NonBlocking I/O。在用户态校验accept/read/write系统调用的错误码,并通过waitRead/waitWrite操作(底层是runtime_pollWait)函数进行polling。
polling 翻译成中文是“轮询”, polling 有两种:
- busy wait(忙等待): 占用一个cpu,持续地查询一个I/O设备的状态
- interrupt-driven(中断驱动): polling的进程/线程进入休眠状态,等待中断信号的唤醒
显然,net库使用的是后者。在开始聊net库的polling之前,先回顾下net库里的常见的结构:net.Listener和net.Conn
net.Listener & net.Conn
回顾一下之前聊过的概念,HTTP是应用层协议,其下层是传输层传输层,传输层协议有TCP/UDP。Server端有两类套接字
一类是通过socket系统调用创建的监听套接字(LISTEN状态),只有一个。这个套接字默认状态是CLOSED,通过bind绑定IP:port,通过listen启动监听;
另一类是通过accept系统调用获取的已连接套接字(ESTABLISHED状态);
在Go net库中,net.Lister interface 定义了监听套接字上的行为;net.Conn定义了已连接套接字的行为。
net.Listener interface 的具体实现有 TCPListener和UnixListener,分别对应TCP/Unix Server的实现。通过调用Accept方法,可以获取新的net.Conn,对应的实现是TCPConn/UnixConn:
// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (Conn, error)
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error
// Addr returns the listener's network address.
Addr() Addr
}
net.Conn interface定义了在一个已连接套接字上的行为,比如我们要聊的Read/Write。
// Conn is a generic stream-oriented network connection.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
type Conn interface {
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
Read(b []byte) (n int, err error)
// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
Write(b []byte) (n int, err error)
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
Close() error
// LocalAddr returns the local network address, if known.
LocalAddr() Addr
// RemoteAddr returns the remote network address, if known.
RemoteAddr() Addr
// ... 省略部分代码
}
在具体实现中,net库定义了一个conn struct,作为TCPConn/UnixConn/IPConn/UDPConn的基础,嵌入到这些结构中。然而,这些XXXConn并不直接实现net.Conn Read/Write方法,而是都依赖 conn struct,自己只封装独有的操作。
type conn struct {
fd *netFD
}
// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}
// UnixConn is an implementation of the Conn interface for connections
// to Unix domain sockets.
type UnixConn struct {
conn
}
为了更好理解这个过程,我们看看net.TCPListener和net.TCPConn的实现。
net.TCPListener & net.TCPConn
net.TCPListener有两个成员变量:
- fd是对监听套接字的封装,socket系统调用的返回值是int值,net.netFD是面向对象方式的封装
- lc是监听的配置,比如ipv6支持,keepalive配置
accept方法依赖net.netFD通过NonBlock的方式获取新的TCPConn,它是对已连接套接字的封装。代码如下:
// net/tcpsock_posix.go
// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
fd *netFD
lc ListenConfig
}
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
// net/tcpsock.go
func newTCPConn(fd *netFD) *TCPConn {
c := &TCPConn{conn{fd}}
setNoDelay(c.fd, true)
return c
}
通过Accept得到的TCPConn实例,它的表面类型是net.Conn,底层类型是*TCPConn。在net/http库里,通过server.newConn将其封装成一个net/http.conn对象。tcp conn上的数据读写和HTTP协议的实现都被封装在net/http.conn struct里。
func (srv *Server) Serve(l net.Listener) error {
for {
// rw 表面类型是net.Conn, 底层类型是*net.TCPConn
rw, err := l.Accept()
// ...省略部分逻辑
// c 的类型是 *http.conn
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
// http处理连接上的数据
go c.serve(connCtx)
}
// Create new connection from rwc.
func (srv *Server) newConn(rwc net.Conn) *conn {
c := &conn{
server: srv,
rwc: rwc,
}
if debugServerConnections {
c.rwc = newLoggingConn("server", c.rwc)
}
return c
}
从套接字读取数据,数据可用后,操作上是把数据从套接字缓冲区(内核态)拷贝到应用缓冲区(用户态)。http.conn struct的实现是:通过bufio.NewReader分配一个大小为4096的缓冲区c.bufr,后续应用层只需要读取自己的缓冲区。
// net/http.conn serve方法
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
读取数据以后的处理涉及到HTTP协议和Web框架,我们这里进行深入探讨。Mozilla MDN网站上有对HTTP协议的详细解释,net/http库对这个标准进行了实现,有兴趣的话可以结合那个文档一起看。
net.conn上非阻塞读写
Go net/http库中,net.conn上的Read/Write是通过非阻塞的方式实现的。当数据可用时,就是读取缓冲区;数据不可用时,具体的方式就是polling。
polling操作会通过 gopark 函数休眠当前goroutine。runtime_pollWait函数的实现被链接到poll_runtime_pollWait函数。在linux系统下,它会循环调用netpollblock函数,以支持超时重试。涉及polling逻辑的代码调用是:
- net/net.go: conn struct, 封装了基于流的网络连接,更关心之上的数据读写;
- net/fd_posix.go: netFD struct, 对操作套接字socket的面向对象封装, 提供accept/connect/read/write/close等操作语义;
- internal/poll/fd_unix.go: FD struct, 对文件描述符的面向对象封装,它可以是socket或操作系统文件描述符;
- internal/poll/fd_poll_runtime.go: pollDesc struct, 提供polling的能力,方法有waitRead/waitWrite等;
- internal/poll/fd_poll_runtime.go: func runtime_pollWait 支持polling的底层方法。
由此我们也可以发现,polling不仅仅是对于网络文件描述符(套接字/socket),对于普通文件描述符也同样适用。
以Read为例子,函数调用链路的代码如下:
// 位置: net/net.go
type conn struct {
fd *netFD
}
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
// 位置: net/fd_posix.go
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError(readSyscallName, err)
}
// 位置: internal/poll/fd_unix.go
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// ...省略部分代码
}
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
// ...省略部分代码
// 从Sysfd缓冲区读取数据写入goroutine缓冲区p
// 忽略中断信号 EINTER
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
// 位置: internal/poll/fd_poll_runtime.go
type pollDesc struct {
runtimeCtx uintptr
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
通过面向对象的封装,对代码逻辑进行了分层,每层的struct有自己的职能。I/O polling 涉及到 runtime 的职能范围,所以通过 go:linkname 链接到runtime库的函数。上面提到的internal/poll.runtime_pollWait的代码实现是 runtime.poll_runtime_pollWait,下面是具体的代码实现:
// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// ...省略部分代码
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}
poll_runtime_pollWait 通过一个for循环反复调用 netpollblock 函数,直到其返回pollNoError。 在netpollblock 函数内部的工作流程如下:
- 获取当前goroutine的polling状态,可以是pdReady或pdWait
- 将其置为pdWait状态
- 通过gopark休眠当前goroutine
- (一段时间后) 当前goroutine被唤醒,其polling状态必然是pdReady,做最后的校验
- (polling状态是ready,进行后续的read/write)
关键流程的代码如下:
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// poller持有的goroutine
// rg 是read goroutine
// wg 是write goroutine
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// 把goroutine置为等待状态
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) {
return true
}
if gpp.CompareAndSwap(0, pdWait) {
break
}
// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}
// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// 当前goroutine被唤醒
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
在Go net/http库中,accept每成功获取一个新的套接字(等价于tcp conn),都会启动一个新的goroutine对这个套接字进行polling。现代的大型web服务通常支持非常高的qps,流量高峰到来时,会导致goroutine数量暴增,副作用主要有两方面:gc性能衰减和Goroutine的调度性能恶化。从软件层面,runtime设置了goroutine的上限是10000个(sched.maxmcount), linux能支持的文件描述符数量也有一个上限。具体到多少个goroutine服务器的性能会出现严重衰减,与业务逻辑有很大的相关性。
对于Java等其他语言,往往通过操作系统线程去处理一个tcp conn,线程上下文切换的代价比goroutine高很多。为了解决这类问题,有了I/O多路复用和异步I/O模型。
篇幅有限,下篇文章接着说。
猜你喜欢
- 2024-10-25 什么是喉梗阻 反流性食管炎嗓子疼怎么办
- 2024-10-25 阻塞列队详解!让你轻松理解阻塞列队
- 2024-10-25 【健康科普】认识阻塞性睡眠呼吸暂停低通气综合征
- 2024-10-25 气象科普|阻塞高压为何方神圣?后期它将给我国制造多轮冷空气
- 2024-10-25 非阻塞算法CAS 非阻塞函数
- 2024-10-25 为什么网络 I/O 会被阻塞?I/O 到底是什么?
- 2024-10-25 如何用Java设计阻塞队列,再说说ArrayBlocking和LinkedBlocking
- 2024-10-25 使用 Python Socket 实现非阻塞 I/O入门讲解
- 2024-10-25 梗阻性无精子症:多种选择,何为最佳?
- 2024-10-25 焦耳小偷 一个神奇的电路 焦耳小偷电路需要注意的地方
你 发表评论:
欢迎- 最近发表
-
- 吴谨言专访大反转!痛批耍大牌后竟翻红,六公主七连发力显真诚
- 港股2月28日物业股涨幅榜:CHINAOVSPPT涨1.72%位居首位
- 港股2月28日物业股午盘:CHINAOVSPPT涨1.72%位居首位
- 港股3月2日物业股涨幅榜:CHINAOVSPPT涨1.03%位居首位
- 港股3月2日物业股午盘:CHINAOVSPPT涨1.03%
- 天赋与心痛的背后:邓鸣贺成长悲剧引发的深刻反思
- 冯小刚女儿徐朵追星范丞丞 同框合照曝光惹人羡,回应网友尽显亲民
- “资本大佬”王冉:51岁娶小17岁童瑶,并承诺余生为娇妻保驾护航
- 港股3月2日物业股午盘:CHINAOVSPPT涨1.03%位居首位
- 「IT之家开箱」vivo S15 图赏:双镜云窗,盛夏风光
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)