计算机系统应用教程网站

网站首页 > 技术文章 正文

Go BIO/NIO探讨(5):net库的非阻塞支持

btikc 2024-10-25 10:50:56 技术文章 6 ℃ 0 评论

在涉及到Accept/Read/Write之类的操作时,Go net库默认使用了非阻塞的方式去实现,这样提高了性能,但给编程增加了额外的复杂度。本文我们将探讨下net库中如何用非阻塞的方式实现读写。为了兼顾底层细节(I/O概念)、同时了解上层如何应用(net库实现),分为五个部分:

  1. I/O模型概述:了解缓冲区的概念
  2. 阻塞vs非阻塞的区别
  3. net.Listener & net.Conn
  4. net.TCPListener & net.TCPConn
  5. net.conn上的非阻塞读写

I/O模型概述

在Linux下我们可以使用5种I/O模型:

  • Blocking I/O (阻塞式I/O)
  • NonBlocking I/O (非阻塞式 I/O)
  • I/O Multiplexing(I/O多路复用): select、poll、epoll
  • Signal Driven I/O (信号驱动式I/O)
  • AIO (posix异步I/O)

为了方便理解这五种模式的区别,首先需要理解进程和tcp/ip网络栈的交互模式。从套接字read数据或向套接字write数据时,应用进程与套接字的缓冲区进行交互。应用进程运行在用户态,套接字缓冲区在内核态,所以数据的拷贝是必不可少的。流程图如下:

我们把这个流程分为两个阶段:

  1. 第一阶段:read数据可用/write缓冲区可用之前,等待的过程
  2. 第二阶段:read数据可用/write缓冲区可用之后,数据拷贝的过程

基于这样的拆分方式,以read为例,上面的5种I/O模型工作方式如下:

Go net/http库使用了非阻塞I/O,无论是在监听套戒指上通过accept获取新的tcp连接,还是在已连接套接字上进行read/write,走的都是上图中第二种模式。我们看一下细节。

阻塞vs非阻塞

对于阻塞/非阻塞的套接字, 执行accept系统调用时:

  1. 阻塞的sockfd: 调用方会一直被阻塞,直到有一个ESTABLISHED的tcp connection(已完成三次握手)
  2. 非阻塞的sockfd: 函数accept会返回 EAGAIN 或 EWOULDBLOCK 的错误

执行read/recvfrom/recvmsg系统调用时:

  1. 阻塞的sockfd: 调用方会一直被阻塞,直到数据到达套接字缓冲区或发生错误才返回
  2. 非阻塞的sockfd: 进程缓冲区没有数据时,函数会返回 EAGAIN 或 EWOULDBLOCK

执行write/sendto/sendmsg系统调用时:

  1. 阻塞的sockfd: 调用方一直被阻塞,直到数据全部写入套接字缓冲区
  2. 非阻塞的sockfd: 如果缓冲区空间不够,会返回EAGAIN或EWOULDBLOCK,如果被其他信号打断,则返回EINTER

Go net/http库走的是per goroutine per tcp connection模式,不过网络模式是NonBlocking I/O。在用户态校验accept/read/write系统调用的错误码,并通过waitRead/waitWrite操作(底层是runtime_pollWait)函数进行polling。

polling 翻译成中文是“轮询”, polling 有两种:

  1. busy wait(忙等待): 占用一个cpu,持续地查询一个I/O设备的状态
  2. interrupt-driven(中断驱动): polling的进程/线程进入休眠状态,等待中断信号的唤醒

显然,net库使用的是后者。在开始聊net库的polling之前,先回顾下net库里的常见的结构:net.Listener和net.Conn

net.Listener & net.Conn

回顾一下之前聊过的概念,HTTP是应用层协议,其下层是传输层传输层,传输层协议有TCP/UDP。Server端有两类套接字

一类是通过socket系统调用创建的监听套接字(LISTEN状态),只有一个。这个套接字默认状态是CLOSED,通过bind绑定IP:port,通过listen启动监听;
另一类是通过accept系统调用获取的已连接套接字(ESTABLISHED状态);

在Go net库中,net.Lister interface 定义了监听套接字上的行为;net.Conn定义了已连接套接字的行为。

net.Listener interface 的具体实现有 TCPListener和UnixListener,分别对应TCP/Unix Server的实现。通过调用Accept方法,可以获取新的net.Conn,对应的实现是TCPConn/UnixConn:

// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
  // Accept waits for and returns the next connection to the listener.
  Accept() (Conn, error)

  // Close closes the listener.
  // Any blocked Accept operations will be unblocked and return errors.
  Close() error

  // Addr returns the listener's network address.
  Addr() Addr
}

net.Conn interface定义了在一个已连接套接字上的行为,比如我们要聊的Read/Write。

// Conn is a generic stream-oriented network connection.
//
// Multiple goroutines may invoke methods on a Conn simultaneously.
type Conn interface {
  // Read reads data from the connection.
  // Read can be made to time out and return an error after a fixed
  // time limit; see SetDeadline and SetReadDeadline.
  Read(b []byte) (n int, err error)

  // Write writes data to the connection.
  // Write can be made to time out and return an error after a fixed
  // time limit; see SetDeadline and SetWriteDeadline.
  Write(b []byte) (n int, err error)

  // Close closes the connection.
  // Any blocked Read or Write operations will be unblocked and return errors.
  Close() error

  // LocalAddr returns the local network address, if known.
  LocalAddr() Addr

  // RemoteAddr returns the remote network address, if known.
  RemoteAddr() Addr

  // ... 省略部分代码
}

在具体实现中,net库定义了一个conn struct,作为TCPConn/UnixConn/IPConn/UDPConn的基础,嵌入到这些结构中。然而,这些XXXConn并不直接实现net.Conn Read/Write方法,而是都依赖 conn struct,自己只封装独有的操作。

type conn struct {
  fd *netFD
}

// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
  conn
}

// UnixConn is an implementation of the Conn interface for connections
// to Unix domain sockets.
type UnixConn struct {
  conn
}

为了更好理解这个过程,我们看看net.TCPListener和net.TCPConn的实现。

net.TCPListener & net.TCPConn

net.TCPListener有两个成员变量:

  • fd是对监听套接字的封装,socket系统调用的返回值是int值,net.netFD是面向对象方式的封装
  • lc是监听的配置,比如ipv6支持,keepalive配置

accept方法依赖net.netFD通过NonBlock的方式获取新的TCPConn,它是对已连接套接字的封装。代码如下:

// net/tcpsock_posix.go

// TCPListener is a TCP network listener. Clients should typically
// use variables of type Listener instead of assuming TCP.
type TCPListener struct {
  fd *netFD
  lc ListenConfig
}

func (ln *TCPListener) accept() (*TCPConn, error) {
  fd, err := ln.fd.accept()
  if err != nil {
    return nil, err
  }
  tc := newTCPConn(fd)
  if ln.lc.KeepAlive >= 0 {
    setKeepAlive(fd, true)
    ka := ln.lc.KeepAlive
    if ln.lc.KeepAlive == 0 {
      ka = defaultTCPKeepAlive
    }
    setKeepAlivePeriod(fd, ka)
  }
  return tc, nil
}

// net/tcpsock.go
func newTCPConn(fd *netFD) *TCPConn {
  c := &TCPConn{conn{fd}}
  setNoDelay(c.fd, true)
  return c
}

通过Accept得到的TCPConn实例,它的表面类型是net.Conn,底层类型是*TCPConn。在net/http库里,通过server.newConn将其封装成一个net/http.conn对象。tcp conn上的数据读写和HTTP协议的实现都被封装在net/http.conn struct里。

func (srv *Server) Serve(l net.Listener) error {
  for {
    // rw 表面类型是net.Conn, 底层类型是*net.TCPConn
    rw, err := l.Accept()
    // ...省略部分逻辑

    // c 的类型是 *http.conn
    c := srv.newConn(rw)
    c.setState(c.rwc, StateNew, runHooks) // before Serve can return

    // http处理连接上的数据
    go c.serve(connCtx)
  }

// Create new connection from rwc.
func (srv *Server) newConn(rwc net.Conn) *conn {
  c := &conn{
    server: srv,
    rwc:    rwc,
  }
  if debugServerConnections {
    c.rwc = newLoggingConn("server", c.rwc)
  }
  return c
}

从套接字读取数据,数据可用后,操作上是把数据从套接字缓冲区(内核态)拷贝到应用缓冲区(用户态)。http.conn struct的实现是:通过bufio.NewReader分配一个大小为4096的缓冲区c.bufr,后续应用层只需要读取自己的缓冲区。

// net/http.conn serve方法
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)

读取数据以后的处理涉及到HTTP协议和Web框架,我们这里进行深入探讨。Mozilla MDN网站上有对HTTP协议的详细解释,net/http库对这个标准进行了实现,有兴趣的话可以结合那个文档一起看。

net.conn上非阻塞读写

Go net/http库中,net.conn上的Read/Write是通过非阻塞的方式实现的。当数据可用时,就是读取缓冲区;数据不可用时,具体的方式就是polling。

polling操作会通过 gopark 函数休眠当前goroutine。runtime_pollWait函数的实现被链接到poll_runtime_pollWait函数。在linux系统下,它会循环调用netpollblock函数,以支持超时重试。涉及polling逻辑的代码调用是:

  1. net/net.go: conn struct, 封装了基于流的网络连接,更关心之上的数据读写;
  2. net/fd_posix.go: netFD struct, 对操作套接字socket的面向对象封装, 提供accept/connect/read/write/close等操作语义;
  3. internal/poll/fd_unix.go: FD struct, 对文件描述符的面向对象封装,它可以是socket或操作系统文件描述符;
  4. internal/poll/fd_poll_runtime.go: pollDesc struct, 提供polling的能力,方法有waitRead/waitWrite等;
  5. internal/poll/fd_poll_runtime.go: func runtime_pollWait 支持polling的底层方法。

由此我们也可以发现,polling不仅仅是对于网络文件描述符(套接字/socket),对于普通文件描述符也同样适用。

以Read为例子,函数调用链路的代码如下:

// 位置: net/net.go
type conn struct {
  fd *netFD
}
func (c *conn) Read(b []byte) (int, error) {
  if !c.ok() {
    return 0, syscall.EINVAL
  }
  n, err := c.fd.Read(b)
  if err != nil && err != io.EOF {
    err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
  }
  return n, err
}

// 位置: net/fd_posix.go
// Network file descriptor.
type netFD struct {
  pfd poll.FD

  // immutable until Close
  family      int
  sotype      int
  isConnected bool // handshake completed or use of association with peer
  net         string
  laddr       Addr
  raddr       Addr
}

func (fd *netFD) Read(p []byte) (n int, err error) {
  n, err = fd.pfd.Read(p)
  runtime.KeepAlive(fd)
  return n, wrapSyscallError(readSyscallName, err)
}

// 位置: internal/poll/fd_unix.go
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
  // Lock sysfd and serialize access to Read and Write methods.
  fdmu fdMutex
  // System file descriptor. Immutable until Close.
  Sysfd int
  // I/O poller.
  pd pollDesc

  // ...省略部分代码
}
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
  // ...省略部分代码
  // 从Sysfd缓冲区读取数据写入goroutine缓冲区p
  // 忽略中断信号 EINTER
  for {
    n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
    if err != nil {
      n = 0
      if err == syscall.EAGAIN && fd.pd.pollable() {
        if err = fd.pd.waitRead(fd.isFile); err == nil {
          continue
        }
      }
    }
    err = fd.eofError(n, err)
    return n, err
  }
}

// 位置: internal/poll/fd_poll_runtime.go
type pollDesc struct {
  runtimeCtx uintptr
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
  if pd.runtimeCtx == 0 {
    return errors.New("waiting for unsupported file type")
  }
  res := runtime_pollWait(pd.runtimeCtx, mode)
  return convertErr(res, isFile)
}

func (pd *pollDesc) waitRead(isFile bool) error {
  return pd.wait('r', isFile)
}

通过面向对象的封装,对代码逻辑进行了分层,每层的struct有自己的职能。I/O polling 涉及到 runtime 的职能范围,所以通过 go:linkname 链接到runtime库的函数。上面提到的internal/poll.runtime_pollWait的代码实现是 runtime.poll_runtime_pollWait,下面是具体的代码实现:

// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
  // ...省略部分代码

  for !netpollblock(pd, int32(mode), false) {
    errcode = netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
      return errcode
    }
    // Can happen if timeout has fired and unblocked us,
    // but before we had a chance to run, timeout has been reset.
    // Pretend it has not happened and retry.
  }
  return pollNoError
}

poll_runtime_pollWait 通过一个for循环反复调用 netpollblock 函数,直到其返回pollNoError。 在netpollblock 函数内部的工作流程如下:

  1. 获取当前goroutine的polling状态,可以是pdReady或pdWait
  2. 将其置为pdWait状态
  3. 通过gopark休眠当前goroutine
  4. (一段时间后) 当前goroutine被唤醒,其polling状态必然是pdReady,做最后的校验
  5. (polling状态是ready,进行后续的read/write)

关键流程的代码如下:

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
  // poller持有的goroutine
  // rg 是read goroutine
  // wg 是write goroutine
  gpp := &pd.rg
  if mode == 'w' {
    gpp = &pd.wg
  }

  // 把goroutine置为等待状态
  // set the gpp semaphore to pdWait
  for {
    // Consume notification if already ready.
    if gpp.CompareAndSwap(pdReady, 0) {
      return true
    }
    if gpp.CompareAndSwap(0, pdWait) {
      break
    }

    // Double check that this isn't corrupt; otherwise we'd loop
    // forever.
    if v := gpp.Load(); v != pdReady && v != 0 {
      throw("runtime: double wait")
    }
  }

  // need to recheck error states after setting gpp to pdWait
  // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
  // do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
  if waitio || netpollcheckerr(pd, mode) == pollNoError {
    gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
  }
  // 当前goroutine被唤醒
  // be careful to not lose concurrent pdReady notification
  old := gpp.Swap(0)
  if old > pdWait {
    throw("runtime: corrupted polldesc")
  }
  return old == pdReady
}

在Go net/http库中,accept每成功获取一个新的套接字(等价于tcp conn),都会启动一个新的goroutine对这个套接字进行polling。现代的大型web服务通常支持非常高的qps,流量高峰到来时,会导致goroutine数量暴增,副作用主要有两方面:gc性能衰减和Goroutine的调度性能恶化。从软件层面,runtime设置了goroutine的上限是10000个(sched.maxmcount), linux能支持的文件描述符数量也有一个上限。具体到多少个goroutine服务器的性能会出现严重衰减,与业务逻辑有很大的相关性。

对于Java等其他语言,往往通过操作系统线程去处理一个tcp conn,线程上下文切换的代价比goroutine高很多。为了解决这类问题,有了I/O多路复用和异步I/O模型。

篇幅有限,下篇文章接着说。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表