最近,团队的小伙伴反映,我们这边一个短连接服务在一台普通的服务器上吞吐量受到限制,所以把服务迁移到高性能机器上,虽然硬件是数倍的提升但压测发现吞吐量并没有预期的效果。

结合后台服务本身的特点初步原因分析:

  1. 从下往上看:服务属于计算IO密集型,性能瓶颈多在于计算请求,但高配机压测过程中,受到单实例模块之间通讯采用串行调用的特点,虽然单点请求计算性能有很大提速,但总体并行上不去,CPU利用率低

  2. 从上往下看: 吞吐量受服务器的接受能力影响很大,由于短连接接入层目前只有一个实例,无论部署在中配或是高配,除非是多实例模式或者类似nginx这种多worker工作模型,一般情况下,单实例accept的效果有限,高并发时容易成为瓶颈

  3. 从服务进程的角度看,单个web api的请求accept队列(backlog)是有限制的,如果多实例部署也许能补短。

分析到这里,很多人都想到可以通过扩容+分布式通讯的方式来弥补短板。是的,方法是摆在面前,但是你想到一个方法不难,难的是你要如何去验证你的想法。毕竟对于一个成熟的产品技术框架,不是随便都能重构的,一定要数据说话。

不过如何调优不是本文的目的,本文的目的是如何使用Go来快速实现一个反向代理服务来验证前面的背景想法。

设计

一个反向代理层,无论是四层还是七层,我觉得实现上主要需要具备以下工作:

  • 负载均衡算法
  • 请求可传递
  • endpoints可权重配置
  • endpoints故障处理
    关于使用Go写负载均衡算法,之前在 《关于Round-Robin》这文章提及过,这里不延伸。

以http为例,go如何快速实现反向代理?

查看go的文档,发现源码net/http/httputil提供了一个叫 ReverseProxy:https://godoc.org/net/http/httputil#ReverseProxy 的玩意,这个就是golang自带反向代理功能,而且使用很简单

ReverseProxy提供了ServerHTTP方法,这意味着我们可以跟普通http handler一样简单地使用它来处理请求

ReverseProxy 暴露了NewSingleHostReverseProxy的方法

// NewSingleHostReverseProxy returns a new ReverseProxy that rewrites
// URLs to the scheme, host, and base path provided in target. If the
// target's path is "/base" and the incoming request was for "/dir",
// the target request will be for /base/dir.
func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {
        targetQuery := target.RawQuery
        director := func(req *http.Request) {
                req.URL.Scheme = target.Scheme
                req.URL.Host = target.Host
                req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
                if targetQuery == "" || req.URL.RawQuery == "" {
                        req.URL.RawQuery = targetQuery + req.URL.RawQuery
                } else {
                        req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
                }
        }
        return &ReverseProxy{Director: director}
}

这样,我们可以通过一行代码就基本上实现了主体的反向代理功能了,如下:

httputil.NewSingleHostReverseProxy(address)

实现

结合Round-Robin,我们尝试实现我们的反向代理层
带权重的负载均衡实现 round-robin.go

package roundrobin

// RR: 基于 权重round robin算法的接口
type RR interface {
    Next() interface{}
    Add(node interface{}, weight int)
    RemoveAll()
    Reset()
}

const (
    RR_NGINX = 0 //Nginx算法
    RR_LVS   = 1 //LVS算法
)

//算法实现工厂类
func NewWeightedRR(rtype int) RR {
    if rtype == RR_NGINX {
        return &WNGINX{}
    } else if rtype == RR_LVS {
        return &WLVS{}
    }
    return nil
}

//节点结构
type WeightNginx struct {
    Node            interface{}
    Weight          int
    CurrentWeight   int
    EffectiveWeight int
}

func (ww *WeightNginx) fail() {
    ww.EffectiveWeight -= ww.Weight
    if ww.EffectiveWeight < 0 {
        ww.EffectiveWeight = 0
    }
}

//nginx算法实现类
type WNGINX struct {
    nodes []*WeightNginx
    n     int
}

//增加权重节点
func (w *WNGINX) Add(node interface{}, weight int) {
    weighted := &WeightNginx{
        Node:            node,
        Weight:          weight,
        EffectiveWeight: weight}
    w.nodes = append(w.nodes, weighted)
    w.n++
}

func (w *WNGINX) RemoveAll() {
    w.nodes = w.nodes[:0]
    w.n = 0
}

//下次轮询事件
func (w *WNGINX) Next() interface{} {
    if w.n == 0 {
        return nil
    }
    if w.n == 1 {
        return w.nodes[0].Node
    }

    return nextWeightedNode(w.nodes).Node
}

func nextWeightedNode(nodes []*WeightNginx) (best *WeightNginx) {
    total := 0

    for i := 0; i < len(nodes); i++ {
        w := nodes[i]

        if w == nil {
            continue
        }

        w.CurrentWeight += w.EffectiveWeight
        total += w.EffectiveWeight
        if w.EffectiveWeight < w.Weight {
            w.EffectiveWeight++
        }

        if best == nil || w.CurrentWeight > best.CurrentWeight {
            best = w
        }
    }

    if best == nil {
        return nil
    }
    best.CurrentWeight -= total
    return best
}

func (w *WNGINX) Reset() {
    for _, s := range w.nodes {
        s.EffectiveWeight = s.Weight
        s.CurrentWeight = 0
    }
}

//节点结构
type WeightLvs struct {
    Node   interface{}
    Weight int
}

//lvs算法实现类
type WLVS struct {
    nodes []*WeightLvs
    n     int
    gcd   int //通用的权重因子
    maxW  int //最大权重
    i     int //被选择的次数
    cw    int //当前的权重值
}

//下次轮询事件
func (w *WLVS) Next() interface{} {
    if w.n == 0 {
        return nil
    }

    if w.n == 1 {
        return w.nodes[0].Node
    }

    for {
        w.i = (w.i + 1) % w.n
        if w.i == 0 {
            w.cw = w.cw - w.gcd
            if w.cw <= 0 {
                w.cw = w.maxW
                if w.cw == 0 {
                    return nil
                }
            }
        }
        if w.nodes[w.i].Weight >= w.cw {
            return w.nodes[w.i].Node
        }
    }
}

//增加权重节点
func (w *WLVS) Add(node interface{}, weight int) {
    weighted := &WeightLvs{Node: node, Weight: weight}
    if weight > 0 {
        if w.gcd == 0 {
            w.gcd = weight
            w.maxW = weight
            w.i = -1
            w.cw = 0
        } else {
            w.gcd = gcd(w.gcd, weight)
            if w.maxW < weight {
                w.maxW = weight
            }
        }
    }
    w.nodes = append(w.nodes, weighted)
    w.n++
}

func gcd(x, y int) int {
    var t int
    for {
        t = (x % y)
        if t > 0 {
            x = y
            y = t
        } else {
            return y
        }
    }
}
func (w *WLVS) RemoveAll() {
    w.nodes = w.nodes[:0]
    w.n = 0
    w.gcd = 0
    w.maxW = 0
    w.i = -1
    w.cw = 0
}
func (w *WLVS) Reset() {
    w.i = -1
    w.cw = 0
}

主体部分 main.go

var RR = rr.NewWeightedRR(rr.RR_NGINX)

type handle struct {
    addrs []string
}

func (this *handle) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    addr := RR.Next().(string)
    remote, err := url.Parse("http://" + addr)
    if err != nil {
        panic(err)
    }
    proxy := httputil.NewSingleHostReverseProxy(remote)
    proxy.ServeHTTP(w, r)
}

func startServer() {
    //被代理的服务器host和port
    h := &handle{}
    h.addrs = []string{"172.17.0.2:28080", "172.17.0.3:28080"}

    w := 1
    for _, e := range h.addrs {
        RR.Add(e, w)
        w++
    }
    err := http.ListenAndServe(":28080", h)
    if err != nil {
        log.Fatalln("ListenAndServe: ", err)
    }
}

func main() {
    startServer()
}

在ReverseProxy中的ServeHTTP方法实现了这个具体的过程,主要是对源http包头进行重新封装,而后发送到后端服务器。

这样,我们一个简单快速的反向代理层就实现了,日常可以基于它自定义负载我们的服务。
转载自:https://lihaoquan.me/2018/4/24/go-reverse-proxy.html