计算机系统应用教程网站

网站首页 > 技术文章 正文

nginx负载均衡-普通hash和一致性hash负载均衡实现

btikc 2024-09-11 01:57:36 技术文章 21 ℃ 0 评论

哈希负载均衡原理
??ngx_http_upstream_hash_module支持普通的hash及一致性hash两种负载均衡算法,默认的是普通的hash来进行负载均衡。
??nginx 普通的hash算法支持配置http变量值作为hash值计算的key,通过hash计算得出的hash值和总权重的余数作为挑选server的依据;nginx的一致性hash(chash)算法则要复杂一些。这里会对一致性hash的机制原理作详细的说明。

一致性hash算法的原理

?? 一致性hash用于对hash算法的改进,后端服务器在配置的server的数量发生变化后,同一个upstream server接收到的请求会的数量和server数量变化之间会有变化。尤其是在负载均衡配置的upstream server数量发生增长后,造成产生的请求可能会在后端的upstream server中并不均匀,有的upstream server负载很低,有的upstream server负载较高,这样的负载均衡的效果比较差,可能对upstream server造成不良的影响。由此,产生了一致性hash算法来均衡。
?? 那么为什么一致性hash算法能改善这种情况呢?这里引用网上资料的一致性hash算法的图例,并以图例作为说明。


??上图表明了产生的hash值与server之间的关系,首先计算出虚拟节点的hash值 这里的hash值在配置初始化的时候就已经是计算完成了的。这里的key值就是上图中的对象,机器就是配置的upstream server节点。 这样在计算配置key值生成的hash之后,通过此hash值映射的位置开始顺时针查找,将upstream server定位到找到的第一个upstream server节点上。
??一致性hash与普通的hash的方式在于 在配置的upstream server的数量发生变化之后,普通的hash计算的方法会对所有访问造成影响,从而使得upstream server对于请求的负载可能会不均匀;而一致性hash通过上述的处理,upstream server的数量变化只会影响计算出key值hash与其”最近”的预分配的虚拟节,这里起到的作用是将配置upstream server数量变化造成的整体hash计算的影响收敛到了局部。这样,就算配置的upstream server的数量发生了变化,对负载整体而言,请求发送到upstream server的分布还是均匀的。

一致性hash point点结构(虚拟节点 这里是单个虚拟节点的描述)

typedef struct {
    uint32_t                            hash;   //hash值
    ngx_str_t                          *server; //server名
} ngx_http_upstream_chash_point_t;

一致性hash虚拟节点集合描述

typedef struct {  
    ngx_uint_t                          number;
    ngx_http_upstream_chash_point_t     point[1];
} ngx_http_upstream_chash_points_t1234

upstream hash的server配置结构

typedef struct {
    ngx_http_complex_value_t            key;      //生成hash的key值变量
    ngx_http_upstream_chash_points_t   *points;   //一致性hash的集合点结构
} ngx_http_upstream_hash_srv_conf_t;

实际upstream连接发生时的数据的结构

typedef struct {
    /* the round robin data must be first */
    ngx_http_upstream_rr_peer_data_t    rrp;             //round-robin轮询数据
    ngx_http_upstream_hash_srv_conf_t  *conf;            //server的配置
    ngx_str_t                           key;             //hash的key值
    ngx_uint_t                          tries;           //尝试连接到upstream server的次数
    ngx_uint_t                          rehash;          //rehash 重新hash的值会在挑选upstream server的时候进行更新
    uint32_t                            hash;            //crc32计算得出的hash值     
    ngx_event_get_peer_pt               get_rr_peer;     //轮询获取upstream server的函数 在round-robin中实现(ngx_http_upstream_get_round_robin_peer)
} ngx_http_upstream_hash_peer_data_t

需要C/C++ Linux服务器架构师学习资料私信“资料”(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

以下是普通hash方式的处理
1.配置upstream server hash

static ngx_int_t
ngx_http_upstream_init_hash(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us)
{
    if (ngx_http_upstream_init_round_robin(cf, us) != NGX_OK) {
         //轮询的upstream server配置初始化设置
        return NGX_ERROR;
    }
    //设置在产生获取upstream server(连接动作之间) 的初始化函数
    us->peer.init = ngx_http_upstream_init_hash_peer; 

    return NGX_OK;
}

2.设置获取upstream server(连接动作发生时 但是此时还没有产生实际的tcp连接 只是作为挑选server的初始化设置)

static ngx_int_t
ngx_http_upstream_init_hash_peer(ngx_http_request_t *r,
    ngx_http_upstream_srv_conf_t *us)
{
    ...
    //为hash_peer中发生获取upstream server动作的数据分配内存
    hp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_hash_peer_data_t));
    if (hp == NULL) {
        return NGX_ERROR;
    }

    r->upstream->peer.data = &hp->rrp; 

    if (ngx_http_upstream_init_round_robin_peer(r, us) != NGX_OK) {
        return NGX_ERROR;
    }
    //upstream server的获取设置为upstream hash模块配置的获取方式
    r->upstream->peer.get = ngx_http_upstream_get_hash_peer;

    hcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_hash_module);
    //获得待使用的生成hash值的key值 (这里的key值是nginx配置文件中的变量)
    if (ngx_http_complex_value(r, &hcf->key, &hp->key) != NGX_OK) {
        return NGX_ERROR;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "upstream hash key:\"%V\"", &hp->key);
    //初始值的设置
    hp->conf = hcf;
    hp->tries = 0;
    hp->rehash = 0;
    hp->hash = 0;
    //这里设置了默认的挑选upstream server的方式(在hash方式找不到合适的upstream server时启用)
    hp->get_rr_peer = ngx_http_upstream_get_round_robin_peer;

    return NGX_OK;
}

3.nginx与upstream server的tcp连接之前获取需连接的upstream server;这里的是
? ngx_http_upstream_get_hash_peer

static ngx_int_t
ngx_http_upstream_get_hash_peer(ngx_peer_connection_t *pc, void *data)
{
    ngx_http_upstream_hash_peer_data_t  *hp = data;

    ...

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "get hash peer, try: %ui", pc->tries);

    ngx_http_upstream_rr_peers_wlock(hp->rrp.peers); //读写锁设置 在多进程竞争时同步

    if (hp->tries > 20 || hp->rrp.peers->single) { 
        //尝试次数超过了20此或者只有一个upstream server 判定无必要进行hash的方式获取upstream server
        ngx_http_upstream_rr_peers_unlock(hp->rrp.peers);
        return hp->get_rr_peer(pc, &hp->rrp);
    }

    now = ngx_time();

    pc->cached = 0;
    pc->connection = NULL;

    for ( ;; ) {

        /*
         * Hash expression is compatible with Cache::Memcached:
         * ((crc32([REHASH] KEY) >> 16) & 0x7fff) + PREV_HASH
         * with REHASH omitted at the first iteration.
         */

        ngx_crc32_init(hash);

        if (hp->rehash > 0) {
            //rehash的值大于0意味着至少经过了一次hash计算 
            //这时要用rehash的值来更新hash的计算 否则hash的值是不会改变的
            size = ngx_sprintf(buf, "%ui", hp->rehash) - buf;
            ngx_crc32_update(&hash, buf, size);
        }
        //得到了hash值
        ngx_crc32_update(&hash, hp->key.data, hp->key.len);
        ngx_crc32_final(hash);

        hash = (hash >> 16) & 0x7fff; //hash值是32位的数字 这里用高位的16位作为hash的值

        hp->hash += hash; //这里的hash值会被累计
        hp->rehash++; //表示经过了一次hash计算

        w = hp->hash % hp->rrp.peers->total_weight;//防止生成的hash值超过了total_weight值的大小
        peer = hp->rrp.peers->peer;
        p = 0;

        while (w >= peer->weight) { //这里是计算得到的权重值超过了配置的权重值
            //这里会调整w的值 直到挑选出合适的peer (这里的peer 实质就是upstream server)
            w -= peer->weight;
            peer = peer->next;
            p++;
        }

        n = p / (8 * sizeof(uintptr_t));
        m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));

        if (hp->rrp.tried[n] & m) { //已经用过了
            goto next;
        }

        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                       "get hash peer, value:%uD, peer:%ui", hp->hash, p);

        if (peer->down) { //upstream server down标记
            goto next;
        }

        if (peer->max_fails
            && peer->fails >= peer->max_fails
            && now - peer->checked <= peer->fail_timeout) 
            //判定upstream server不可用
        {
            goto next;
        }

        if (peer->max_conns && peer->conns >= peer->max_conns) {
        //超过了配置的upstream server的最大连接数
            goto next;
        }

        break;

    next:

        if (++hp->tries > 20) { //超过了20次 会启用轮询的方式获取upstream server
            ngx_http_upstream_rr_peers_unlock(hp->rrp.peers);
            return hp->get_rr_peer(pc, &hp->rrp);
        }
    }
    //这里是找到了合适的upstream server 并且把这个upstream server设置为当前使用的
    hp->rrp.current = peer;
   //socket 及server name 设置
    pc->sockaddr = peer->sockaddr;
    pc->socklen = peer->socklen;
    pc->name = &peer->name;

    peer->conns++; //由于使用了 因此增加一次挑选出的upstream server的连接数

    if (now - peer->checked > peer->fail_timeout) { //由于当前的upstream server能使用 更新下checked的时间
        peer->checked = now;
    }

    ngx_http_upstream_rr_peers_unlock(hp->rrp.peers); //此时释放upstream servers集合的锁

    hp->rrp.tried[n] |= m; //尝试过的标记更新

    return NGX_OK;
}

普通hash的方式获取upstream server到此。

以下是一致性hash(consistency hash)负载均衡的处理
1.初始化一致性hash upstream server的配置初始化

static ngx_int_t
ngx_http_upstream_init_chash(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us)
{
    ...
    union {
        uint32_t                        value;
        u_char                          byte[4];
    } prev_hash;

    if (ngx_http_upstream_init_round_robin(cf, us) != NGX_OK) {
        //与普通hash方式一样 设置round-robin配置 这里是一些nginx配置文件中设置的配置
        return NGX_ERROR;
    }
    //upstream server连接动作发生前的一致性hash的upstream server的初始化
    us->peer.init = ngx_http_upstream_init_chash_peer;

    peers = us->peer.data;
    //设置虚拟节点的数量,这里用的值是upstream server总权重的160倍
    npoints = peers->total_weight * 160;

    //计算所需分配空间的大小
    size = sizeof(ngx_http_upstream_chash_points_t)
           + sizeof(ngx_http_upstream_chash_point_t) * (npoints - 1);

    points = ngx_palloc(cf->pool, size); //这里为虚拟节点分配内存
    if (points == NULL) {
        return NGX_ERROR;
    }

    points->number = 0; //虚拟节点的数量初始化为0

    for (peer = peers->peer; peer; peer = peer->next) {
        server = &peer->server;

        /*
         * Hash expression is compatible with Cache::Memcached::Fast:
         * crc32(HOST \0 PORT PREV_HASH).
         */
        //这里配置的server scheme是unix
        if (server->len >= 5
            && ngx_strncasecmp(server->data, (u_char *) "unix:", 5) == 0) 
        {
            host = server->data + 5;
            host_len = server->len - 5;
            port = NULL;
            port_len = 0;
            goto done;
        }
        //这里的server配置的是IP
        for (j = 0; j < server->len; j++) {
            c = server->data[server->len - j - 1];

            if (c == ':') {
                host = server->data;
                host_len = server->len - j - 1;
                port = server->data + server->len - j;
                port_len = j;
                goto done;
            }

            if (c < '0' || c > '9') {
                break;
            }
        }
//普通的域名server
        host = server->data;
        host_len = server->len;
        port = NULL;
        port_len = 0;

    done:

        ngx_crc32_init(base_hash);
        ngx_crc32_update(&base_hash, host, host_len);
        ngx_crc32_update(&base_hash, (u_char *) "", 1);
        ngx_crc32_update(&base_hash, port, port_len);

        prev_hash.value = 0;
        npoints = peer->weight * 160; //这里为单个upstream server设置虚拟节点的数量

        for (j = 0; j < npoints; j++) {  
           //通过prev_hash的处理 使得计算出的每个虚拟节点的hash值都会不同
            hash = base_hash;

            ngx_crc32_update(&hash, prev_hash.byte, 4);
            ngx_crc32_final(hash);

            points->point[points->number].hash = hash;
            points->point[points->number].server = server;
            points->number++;

#if (NGX_HAVE_LITTLE_ENDIAN)
            prev_hash.value = hash;
#else
            prev_hash.byte[0] = (u_char) (hash & 0xff);
            prev_hash.byte[1] = (u_char) ((hash >> 8) & 0xff);
            prev_hash.byte[2] = (u_char) ((hash >> 16) & 0xff);
            prev_hash.byte[3] = (u_char) ((hash >> 24) & 0xff);
#endif
        }
    }
    //使用快速排序来使得虚拟节点的hash是由小到大顺序分布的
    ngx_qsort(points->point,
              points->number,
              sizeof(ngx_http_upstream_chash_point_t),
              ngx_http_upstream_chash_cmp_points);

    for (i = 0, j = 1; j < points->number; j++) {
        /*
        这里为O(n)复杂度的计算 目的是对虚拟节点的hash进行去重 保证虚拟节点集合中的节点的hash值都是不同的
        */
        if (points->point[i].hash != points->point[j].hash) {
            points->point[++i] = points->point[j];
        }
    }
    //此时来更新虚拟节点的有效数量
    points->number = i + 1;

    hcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_hash_module);
    //将得到的虚拟节点集合设置upstream server的配置信息中
    hcf->points = points;

    return NGX_OK;
}

2.设置获取upstream server(连接动作发生时 但是此时还没有产生实际的tcp连接 只是作为挑选server的初始化设置)

static ngx_int_t
ngx_http_upstream_init_chash_peer(ngx_http_request_t *r,
    ngx_http_upstream_srv_conf_t *us)
{
    ...
    if (ngx_http_upstream_init_hash_peer(r, us) != NGX_OK) {
    //
        return NGX_ERROR;
    }
    //设置upstream server 取得的方式为一致性hash的方式获取
    r->upstream->peer.get = ngx_http_upstream_get_chash_peer;

    hp = r->upstream->peer.data;
    hcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_hash_module);
    //通过设置的key变量计算得到hash值
    hash = ngx_crc32_long(hp->key.data, hp->key.len);
    //设置upstream servers集合的读锁
    ngx_http_upstream_rr_peers_rlock(hp->rrp.peers);
    //通过计算得到的hash值从配置的虚拟节点中找到虚拟节点的hash值
    hp->hash = ngx_http_upstream_find_chash_point(hcf->points, hash);
    //释放upstream servers集合的写锁
    ngx_http_upstream_rr_peers_unlock(hp->rrp.peers);

    return NGX_OK;
}

3.nginx与upstream server的tcp连接之前获取需连接的upstream server

static ngx_int_t
ngx_http_upstream_get_chash_peer(ngx_peer_connection_t *pc, void *data)
{
    ngx_http_upstream_hash_peer_data_t  *hp = data;
    ...
    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "get consistent hash peer, try: %ui", pc->tries);
    //设置upstream servers的写锁
    ngx_http_upstream_rr_peers_wlock(hp->rrp.peers);

    if (hp->tries > 20 || hp->rrp.peers->single) {
        /*
        这里的判断跟普通hash一样 超过20次重试次数或者只有一个upstream server
        直接释放锁 然后用upstream默认的轮询方式来得到upstream server
        */
        ngx_http_upstream_rr_peers_unlock(hp->rrp.peers);
        return hp->get_rr_peer(pc, &hp->rrp);
    }

    //连接的cached标记为假
    pc->cached = 0;
    pc->connection = NULL;
    //设置开始的时间
    now = ngx_time();
    //设置配置
    hcf = hp->conf;
    //得到配置的虚拟节点集合
    points = hcf->points; 
    //取得虚拟节点的第一个节点
    point = &points->point[0];

    for ( ;; ) {
          /*
          这里的hp->hash的值已经被ngx_http_upstream_init_chash_peer
          计算得出 计算一致性hash计算得到的upstream server
          剩下的工作就是对这个upstream server进行判断了
          */
        server = point[hp->hash % points->number].server;

        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                       "consistent hash peer:%uD, server:\"%V\"",
                       hp->hash, server);

        best = NULL;
        best_i = 0;
        total = 0;

        for (peer = hp->rrp.peers->peer, i = 0;
             peer;
             peer = peer->next, i++)
        {
            n = i / (8 * sizeof(uintptr_t));
            m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));

            if (hp->rrp.tried[n] & m) {
            //已经被用过了 直接跳过
                continue;
            }

            if (peer->down) {
            //标记为down下线的 直接跳过
                continue;
            }

            if (peer->max_fails
                && peer->fails >= peer->max_fails
                && now - peer->checked <= peer->fail_timeout)
            { //达到了失败判定的条件 直接跳过
                continue;
            }

            if (peer->max_conns && peer->conns >= peer->max_conns) {
            //超过了配置的最大连接数 直接跳过
                continue;
            }

            if (peer->server.len != server->len
                || ngx_strncmp(peer->server.data, server->data, server->len)
                   != 0)
            {  //不是一致性hash计算得到的server 直接跳过
                continue;
            }
            //upstream server的权重值调整
            peer->current_weight += peer->effective_weight;
            total += peer->effective_weight;

            if (peer->effective_weight < peer->weight) {
                peer->effective_weight++;
            }

            if (best == NULL || peer->current_weight > best->current_weight) {
               //  找到了最合适的upstream server 
                best = peer;
                best_i = i;
            }
        }

        if (best) {
            //确认找到了upstream server 调整best server的当前权重值
            best->current_weight -= total;
            goto found;
        }

        hp->hash++; //增加hash的次数
        hp->tries++; //尝试次数增加

        if (hp->tries > 20) { 
            /*
            尝试次数超过了20次,释放对upstream servers集合的锁
            并且调用upstream默认的方法 此处是轮询来得到upstream 
            server
            */
            ngx_http_upstream_rr_peers_unlock(hp->rrp.peers);
            return hp->get_rr_peer(pc, &hp->rrp);
        }
    }

found:
   //找到了符合条件的upstream server,将其设置为当前使用的
    hp->rrp.current = best;
   //设置套接字信息及server名称
    pc->sockaddr = best->sockaddr;
    pc->socklen = best->socklen;
    pc->name = &best->name;
   //增加一次找到的upstream server的连接数
    best->conns++;

    if (now - best->checked > best->fail_timeout) {
        //更新检查的时间
        best->checked = now;
    }
    //释放锁
    ngx_http_upstream_rr_peers_unlock(hp->rrp.peers);

    n = best_i / (8 * sizeof(uintptr_t));
    m = (uintptr_t) 1 << best_i % (8 * sizeof(uintptr_t));
    //标记已经使用过
    hp->rrp.tried[n] |= m;

    return NGX_OK;
}

一致性hash查找和key值计算得出的hash值最接近的虚拟节点

static ngx_uint_t
ngx_http_upstream_find_chash_point(ngx_http_upstream_chash_points_t *points,
    uint32_t hash)
{
    ...
    /* find first point >= hash */

    point = &points->point[0];

    i = 0;
    j = points->number;

    while (i < j) {  //这里查询用的是二分查找的方式 因为points在初始化的时候就已经排过序了 因此是有序的  
        k = (i + j) / 2;

        if (hash > point[k].hash) {
            i = k + 1;

        } else if (hash < point[k].hash) {
            j = k;

        } else {
            return k;
        }
    }

    return i;
}

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表