mirror of
https://github.com/valyala/fasthttp.git
synced 2026-06-13 15:46:49 +03:00
76aa66015c
LBClient.get() took an RLock and then read cs[0] without checking for an empty slice. Once every client was removed via RemoveClients, cs is empty, so get() panicked on cs[0]; because the RUnlock was not deferred it was never reached, leaking the lock and deadlocking every later AddClient and RemoveClients call. Defer the unlock in get() and return nil for an empty client set, with DoDeadline/DoTimeout reporting the new ErrNoAvailableClients instead of panicking. AddClient and RemoveClients now also defer their unlock so a panic in the user-supplied RemoveClients callback can't leak the write lock. RemoveClients additionally niled cc.cs[idx] before invoking rc, so a panic in rc left cc.cs with nil holes that would later crash get(). Compact the slice first and nil the unused tail only afterwards, so a panic leaves the client set usable. Fixes #2270 Signed-off-by: Y.Horie <u5.horie@gmail.com>
229 lines
5.6 KiB
Go
229 lines
5.6 KiB
Go
package fasthttp
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// ErrNoAvailableClients is the sentinel error reported by LBClient request
// methods when there is no client to route the request to, which is the
// case once every client has been removed.
var ErrNoAvailableClients = errors.New("no available clients")
|
|
|
|
// BalancingClient is the interface for clients, which may be passed
|
|
// to LBClient.Clients.
|
|
type BalancingClient interface {
|
|
DoDeadline(req *Request, resp *Response, deadline time.Time) error
|
|
PendingRequests() int
|
|
}
|
|
|
|
// LBClient balances requests among available LBClient.Clients.
|
|
//
|
|
// It has the following features:
|
|
//
|
|
// - Balances load among available clients using 'least loaded' + 'least total'
|
|
// hybrid technique.
|
|
// - Dynamically decreases load on unhealthy clients.
|
|
//
|
|
// It is forbidden copying LBClient instances. Create new instances instead.
|
|
//
|
|
// It is safe calling LBClient methods from concurrently running goroutines.
|
|
type LBClient struct {
|
|
noCopy noCopy
|
|
|
|
// HealthCheck is a callback called after each request.
|
|
//
|
|
// The request, response and the error returned by the client
|
|
// is passed to HealthCheck, so the callback may determine whether
|
|
// the client is healthy.
|
|
//
|
|
// Load on the current client is decreased if HealthCheck returns false.
|
|
//
|
|
// By default HealthCheck returns false if err != nil.
|
|
HealthCheck func(req *Request, resp *Response, err error) bool
|
|
|
|
// Clients must contain non-zero clients list.
|
|
// Incoming requests are balanced among these clients.
|
|
Clients []BalancingClient
|
|
|
|
cs []*lbClient
|
|
|
|
// Timeout is the request timeout used when calling LBClient.Do.
|
|
//
|
|
// DefaultLBClientTimeout is used by default.
|
|
Timeout time.Duration
|
|
|
|
mu sync.RWMutex
|
|
|
|
once sync.Once
|
|
}
|
|
|
|
// DefaultLBClientTimeout is the request timeout LBClient.Do falls back to
// when LBClient.Timeout is not set to a positive value.
//
// Set LBClient.Timeout to override it.
const DefaultLBClientTimeout = time.Second
|
|
|
|
// DoDeadline calls DoDeadline on the least loaded client.
|
|
func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
|
|
c := cc.get()
|
|
if c == nil {
|
|
return ErrNoAvailableClients
|
|
}
|
|
return c.DoDeadline(req, resp, deadline)
|
|
}
|
|
|
|
// DoTimeout calculates deadline and calls DoDeadline on the least loaded client.
|
|
func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
|
|
deadline := time.Now().Add(timeout)
|
|
c := cc.get()
|
|
if c == nil {
|
|
return ErrNoAvailableClients
|
|
}
|
|
return c.DoDeadline(req, resp, deadline)
|
|
}
|
|
|
|
// Do calculates timeout using LBClient.Timeout and calls DoTimeout
|
|
// on the least loaded client.
|
|
func (cc *LBClient) Do(req *Request, resp *Response) error {
|
|
timeout := cc.Timeout
|
|
if timeout <= 0 {
|
|
timeout = DefaultLBClientTimeout
|
|
}
|
|
return cc.DoTimeout(req, resp, timeout)
|
|
}
|
|
|
|
func (cc *LBClient) init() {
|
|
cc.mu.Lock()
|
|
defer cc.mu.Unlock()
|
|
if len(cc.Clients) == 0 {
|
|
// developer sanity-check
|
|
panic("BUG: LBClient.Clients cannot be empty")
|
|
}
|
|
for _, c := range cc.Clients {
|
|
cc.cs = append(cc.cs, &lbClient{
|
|
c: c,
|
|
healthCheck: cc.HealthCheck,
|
|
})
|
|
}
|
|
}
|
|
|
|
// AddClient adds a new client to the balanced clients and
|
|
// returns the new total number of clients.
|
|
func (cc *LBClient) AddClient(c BalancingClient) int {
|
|
cc.mu.Lock()
|
|
defer cc.mu.Unlock()
|
|
cc.cs = append(cc.cs, &lbClient{
|
|
c: c,
|
|
healthCheck: cc.HealthCheck,
|
|
})
|
|
return len(cc.cs)
|
|
}
|
|
|
|
// RemoveClients removes clients using the provided callback.
|
|
// If rc returns true, the passed client will be removed.
|
|
// Returns the new total number of clients.
|
|
func (cc *LBClient) RemoveClients(rc func(BalancingClient) bool) int {
|
|
cc.mu.Lock()
|
|
// defer so a panic in the user-supplied rc can't leak the lock.
|
|
defer cc.mu.Unlock()
|
|
n := 0
|
|
for _, cs := range cc.cs {
|
|
if rc(cs.c) {
|
|
continue
|
|
}
|
|
cc.cs[n] = cs
|
|
n++
|
|
}
|
|
// Nil out the now-unused tail so removed clients can be garbage collected.
|
|
// This is done only after the loop so a panic in rc can't leave cc.cs with
|
|
// nil holes that would later crash get().
|
|
for i := n; i < len(cc.cs); i++ {
|
|
cc.cs[i] = nil
|
|
}
|
|
cc.cs = cc.cs[:n]
|
|
|
|
return len(cc.cs)
|
|
}
|
|
|
|
func (cc *LBClient) get() *lbClient {
|
|
cc.once.Do(cc.init)
|
|
|
|
cc.mu.RLock()
|
|
defer cc.mu.RUnlock()
|
|
|
|
cs := cc.cs
|
|
if len(cs) == 0 {
|
|
// No clients (e.g. all removed): avoid panicking on cs[0].
|
|
return nil
|
|
}
|
|
|
|
minC := cs[0]
|
|
minN := minC.PendingRequests()
|
|
minT := atomic.LoadUint64(&minC.total)
|
|
for _, c := range cs[1:] {
|
|
n := c.PendingRequests()
|
|
t := atomic.LoadUint64(&c.total)
|
|
if n < minN || (n == minN && t < minT) {
|
|
minC = c
|
|
minN = n
|
|
minT = t
|
|
}
|
|
}
|
|
return minC
|
|
}
|
|
|
|
type lbClient struct {
|
|
c BalancingClient
|
|
healthCheck func(req *Request, resp *Response, err error) bool
|
|
penalty uint32
|
|
|
|
// total amount of requests handled.
|
|
total uint64
|
|
}
|
|
|
|
func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
|
|
err := c.c.DoDeadline(req, resp, deadline)
|
|
if !c.isHealthy(req, resp, err) && c.incPenalty() {
|
|
// Penalize the client returning error, so the next requests
|
|
// are routed to another clients.
|
|
time.AfterFunc(penaltyDuration, c.decPenalty)
|
|
} else {
|
|
atomic.AddUint64(&c.total, 1)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (c *lbClient) PendingRequests() int {
|
|
n := c.c.PendingRequests()
|
|
m := atomic.LoadUint32(&c.penalty)
|
|
return n + int(m)
|
|
}
|
|
|
|
func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
|
|
if c.healthCheck == nil {
|
|
return err == nil
|
|
}
|
|
return c.healthCheck(req, resp, err)
|
|
}
|
|
|
|
func (c *lbClient) incPenalty() bool {
|
|
m := atomic.AddUint32(&c.penalty, 1)
|
|
if m > maxPenalty {
|
|
c.decPenalty()
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *lbClient) decPenalty() {
|
|
atomic.AddUint32(&c.penalty, ^uint32(0))
|
|
}
|
|
|
|
const (
	// penaltyDuration is how long a single penalty point stays on a
	// client before it is taken off again.
	penaltyDuration = 3 * time.Second

	// maxPenalty is the upper bound for a client's penalty.
	maxPenalty = 300
)
|