最近,团队的小伙伴反映,我们这边一个短连接服务在一台普通的服务器上吞吐量受到限制,所以把服务迁移到高性能机器上,虽然硬件是数倍的提升但压测发现吞吐量并没有预期的效果。
结合后台服务本身的特点初步原因分析:
-
从下往上看:服务属于计算IO密集型,性能瓶颈多在于计算请求,但高配机压测过程中,受到单实例模块之间通讯采用串行调用的特点,虽然单点请求计算性能有很大提速,但总体并行上不去,CPU利用率低
-
从上往下看: 吞吐量受服务器的接受能力影响很大,由于短连接接入层目前只有一个实例,无论部署在中配或是高配,除非是多实例模式或者类似nginx这种多worker工作模型,一般情况下,单实例accept的效果有限,高并发时容易成为瓶颈
-
从服务进程的角度看,单个web api的请求accept队列(backlog)是有限制的,如果多实例部署也许能补短。
分析到这里,很多人都想到可以通过扩容+分布式通讯的方式来弥补短板。是的,方法是摆在面前,但是你想到一个方法不难,难的是你要如何去验证你的想法。毕竟对于一个成熟的产品技术框架,不是随便都能重构的,一定要数据说话。
不过如何调优不是本文的目的,本文的目的是如何使用Go来快速实现一个反向代理服务来验证前面的背景想法。
设计
一个反向代理层,无论是四层还是七层,我觉得实现上主要需要具备以下工作:
- 负载均衡算法
- 请求可传递
- endpoints可权重配置
- endpoints故障处理
关于使用Go写负载均衡算法,之前在 《关于Round-Robin》这文章提及过,这里不延伸。
以http为例,go如何快速实现反向代理?
查看go的文档,发现源码net/http/httputil提供了一个叫 ReverseProxy:https://godoc.org/net/http/httputil#ReverseProxy 的玩意,这个就是golang自带反向代理功能,而且使用很简单
ReverseProxy提供了ServerHTTP方法,这意味着我们可以跟普通http handler一样简单地使用它来处理请求
ReverseProxy 暴露了NewSingleHostReverseProxy的方法
// NewSingleHostReverseProxy returns a new ReverseProxy that rewrites
// URLs to the scheme, host, and base path provided in target. If the
// target's path is "/base" and the incoming request was for "/dir",
// the target request will be for /base/dir.
func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {
targetQuery := target.RawQuery
director := func(req *http.Request) {
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
}
return &ReverseProxy{Director: director}
}
这样,我们可以通过一行代码就基本上实现了主体的反向代理功能了,如下:
httputil.NewSingleHostReverseProxy(address)
实现
结合Round-Robin,我们尝试实现我们的反向代理层
带权重的负载均衡实现 round-robin.go
package roundrobin
// RR: 基于 权重round robin算法的接口
type RR interface {
Next() interface{}
Add(node interface{}, weight int)
RemoveAll()
Reset()
}
const (
RR_NGINX = 0 //Nginx算法
RR_LVS = 1 //LVS算法
)
//算法实现工厂类
func NewWeightedRR(rtype int) RR {
if rtype == RR_NGINX {
return &WNGINX{}
} else if rtype == RR_LVS {
return &WLVS{}
}
return nil
}
//节点结构
type WeightNginx struct {
Node interface{}
Weight int
CurrentWeight int
EffectiveWeight int
}
func (ww *WeightNginx) fail() {
ww.EffectiveWeight -= ww.Weight
if ww.EffectiveWeight < 0 {
ww.EffectiveWeight = 0
}
}
//nginx算法实现类
type WNGINX struct {
nodes []*WeightNginx
n int
}
//增加权重节点
func (w *WNGINX) Add(node interface{}, weight int) {
weighted := &WeightNginx{
Node: node,
Weight: weight,
EffectiveWeight: weight}
w.nodes = append(w.nodes, weighted)
w.n++
}
func (w *WNGINX) RemoveAll() {
w.nodes = w.nodes[:0]
w.n = 0
}
//下次轮询事件
func (w *WNGINX) Next() interface{} {
if w.n == 0 {
return nil
}
if w.n == 1 {
return w.nodes[0].Node
}
return nextWeightedNode(w.nodes).Node
}
func nextWeightedNode(nodes []*WeightNginx) (best *WeightNginx) {
total := 0
for i := 0; i < len(nodes); i++ {
w := nodes[i]
if w == nil {
continue
}
w.CurrentWeight += w.EffectiveWeight
total += w.EffectiveWeight
if w.EffectiveWeight < w.Weight {
w.EffectiveWeight++
}
if best == nil || w.CurrentWeight > best.CurrentWeight {
best = w
}
}
if best == nil {
return nil
}
best.CurrentWeight -= total
return best
}
func (w *WNGINX) Reset() {
for _, s := range w.nodes {
s.EffectiveWeight = s.Weight
s.CurrentWeight = 0
}
}
//节点结构
type WeightLvs struct {
Node interface{}
Weight int
}
//lvs算法实现类
type WLVS struct {
nodes []*WeightLvs
n int
gcd int //通用的权重因子
maxW int //最大权重
i int //被选择的次数
cw int //当前的权重值
}
//下次轮询事件
func (w *WLVS) Next() interface{} {
if w.n == 0 {
return nil
}
if w.n == 1 {
return w.nodes[0].Node
}
for {
w.i = (w.i + 1) % w.n
if w.i == 0 {
w.cw = w.cw - w.gcd
if w.cw <= 0 {
w.cw = w.maxW
if w.cw == 0 {
return nil
}
}
}
if w.nodes[w.i].Weight >= w.cw {
return w.nodes[w.i].Node
}
}
}
//增加权重节点
func (w *WLVS) Add(node interface{}, weight int) {
weighted := &WeightLvs{Node: node, Weight: weight}
if weight > 0 {
if w.gcd == 0 {
w.gcd = weight
w.maxW = weight
w.i = -1
w.cw = 0
} else {
w.gcd = gcd(w.gcd, weight)
if w.maxW < weight {
w.maxW = weight
}
}
}
w.nodes = append(w.nodes, weighted)
w.n++
}
func gcd(x, y int) int {
var t int
for {
t = (x % y)
if t > 0 {
x = y
y = t
} else {
return y
}
}
}
func (w *WLVS) RemoveAll() {
w.nodes = w.nodes[:0]
w.n = 0
w.gcd = 0
w.maxW = 0
w.i = -1
w.cw = 0
}
func (w *WLVS) Reset() {
w.i = -1
w.cw = 0
}
主体部分 main.go
var RR = rr.NewWeightedRR(rr.RR_NGINX)
type handle struct {
addrs []string
}
func (this *handle) ServeHTTP(w http.ResponseWriter, r *http.Request) {
addr := RR.Next().(string)
remote, err := url.Parse("http://" + addr)
if err != nil {
panic(err)
}
proxy := httputil.NewSingleHostReverseProxy(remote)
proxy.ServeHTTP(w, r)
}
func startServer() {
//被代理的服务器host和port
h := &handle{}
h.addrs = []string{"172.17.0.2:28080", "172.17.0.3:28080"}
w := 1
for _, e := range h.addrs {
RR.Add(e, w)
w++
}
err := http.ListenAndServe(":28080", h)
if err != nil {
log.Fatalln("ListenAndServe: ", err)
}
}
func main() {
startServer()
}
在ReverseProxy中的ServeHTTP方法实现了这个具体的过程,主要是对源http包头进行重新封装,而后发送到后端服务器。
这样,我们一个简单快速的反向代理层就实现了,日常可以基于它自定义负载我们的服务。
转载自:https://lihaoquan.me/2018/4/24/go-reverse-proxy.html