Files
fasthttp/lbclient.go
T
Moritz Poldrack d0f2727a4d get rid of some panics (#1526)
* client: simplify (*HostClient).do()

Remove an allocation in favour of deferring a call to release the
response.

* client: remove panic in dialAddr

Return an error instead of panicking if the user supplied a nonsensical
DialFunc.

* compression: remove panic on invalid compression level

If a compression level exceeding gzip's boundaries is provided, fasthttp
will panic. Instead it would be better to handle this error for them by
limiting it to the minimum or maximum value, depending on the direction
the user has exceeded the limits.

Clamp the value of gzip to always be between gzip.BestSpeed and
gzip.BestCompression.

* peripconn: remove panic on negative connection count

When a negative count is reached when unregistering a connection, a
panic is caused even though data-integrity is not at risk.

Replace the panic() with a simple clamp on the value to ensure the
value does not exceed it's expected lower bounds.

References: #1504

* compress: remove error on failed nonblocking writes

Since there is no way of handling or even logging non-critical errors in
stateless non-blocking writecalls, just drop them and hope the user
notices and tries again.

* workerPool: remove panic on redundant Start and Stop calls

Instead of panicking for invalid behaviour, it's preferable to just turn
the function into a noop.

* http: remove panic on invalid form boundary

* http: remove panic on negative reads

Since bufio already panics on negative reads, it is not necessary to do
so as well. If the length is zero and for some reason no error is
returned, readBodyIdentity and appendBodyFixedSize now errors in these
cases.

Link: https://github.com/golang/go/blob/851f6fd61425c810959c7ab51e6dc86f8a63c970/src/bufio/bufio.go#L246

* fs: remove panic on negative reader count

When a negative count is reached when unregistering a reader, a panic is
thrown even though data-integrity is not at risk.

Replace the panic() with a simple clamp on the value to ensure the
value does not exceed it's expected lower bounds.

* server: remove panic in favour of a segfault

Panicking with "BUG: " obscures the error. As the segfault causes a
panic anyway, just let the chaos unfold.

* server: remove panic in favour of returning an error

Writing on a timed-out response is not endangering data integrity and
just fails.

* chore: add comments to all panics

* chore: fix minor typo
2023-03-30 03:38:28 +02:00

204 lines
4.9 KiB
Go

package fasthttp
import (
"sync"
"sync/atomic"
"time"
)
// BalancingClient is the interface for clients, which may be passed
// to LBClient.Clients.
type BalancingClient interface {
DoDeadline(req *Request, resp *Response, deadline time.Time) error
PendingRequests() int
}
// LBClient balances requests among available LBClient.Clients.
//
// It has the following features:
//
// - Balances load among available clients using 'least loaded' + 'least total'
// hybrid technique.
// - Dynamically decreases load on unhealthy clients.
//
// It is forbidden copying LBClient instances. Create new instances instead.
//
// It is safe calling LBClient methods from concurrently running goroutines.
type LBClient struct {
noCopy noCopy
// Clients must contain non-zero clients list.
// Incoming requests are balanced among these clients.
Clients []BalancingClient
// HealthCheck is a callback called after each request.
//
// The request, response and the error returned by the client
// is passed to HealthCheck, so the callback may determine whether
// the client is healthy.
//
// Load on the current client is decreased if HealthCheck returns false.
//
// By default HealthCheck returns false if err != nil.
HealthCheck func(req *Request, resp *Response, err error) bool
// Timeout is the request timeout used when calling LBClient.Do.
//
// DefaultLBClientTimeout is used by default.
Timeout time.Duration
cs []*lbClient
once sync.Once
mu sync.RWMutex
}
// DefaultLBClientTimeout is the default request timeout used by LBClient
// when calling LBClient.Do.
//
// The timeout may be overridden via LBClient.Timeout.
const DefaultLBClientTimeout = time.Second
// DoDeadline calls DoDeadline on the least loaded client
func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
return cc.get().DoDeadline(req, resp, deadline)
}
// DoTimeout calculates deadline and calls DoDeadline on the least loaded client
func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
return cc.get().DoDeadline(req, resp, deadline)
}
// Do calculates timeout using LBClient.Timeout and calls DoTimeout
// on the least loaded client.
func (cc *LBClient) Do(req *Request, resp *Response) error {
timeout := cc.Timeout
if timeout <= 0 {
timeout = DefaultLBClientTimeout
}
return cc.DoTimeout(req, resp, timeout)
}
func (cc *LBClient) init() {
cc.mu.Lock()
defer cc.mu.Unlock()
if len(cc.Clients) == 0 {
// developer sanity-check
panic("BUG: LBClient.Clients cannot be empty")
}
for _, c := range cc.Clients {
cc.cs = append(cc.cs, &lbClient{
c: c,
healthCheck: cc.HealthCheck,
})
}
}
// AddClient adds a new client to the balanced clients
// returns the new total number of clients
func (cc *LBClient) AddClient(c BalancingClient) int {
cc.mu.Lock()
cc.cs = append(cc.cs, &lbClient{
c: c,
healthCheck: cc.HealthCheck,
})
cc.mu.Unlock()
return len(cc.cs)
}
// RemoveClients removes clients using the provided callback
// if rc returns true, the passed client will be removed
// returns the new total number of clients
func (cc *LBClient) RemoveClients(rc func(BalancingClient) bool) int {
cc.mu.Lock()
n := 0
for idx, cs := range cc.cs {
cc.cs[idx] = nil
if rc(cs.c) {
continue
}
cc.cs[n] = cs
n++
}
cc.cs = cc.cs[:n]
cc.mu.Unlock()
return len(cc.cs)
}
func (cc *LBClient) get() *lbClient {
cc.once.Do(cc.init)
cc.mu.RLock()
cs := cc.cs
minC := cs[0]
minN := minC.PendingRequests()
minT := atomic.LoadUint64(&minC.total)
for _, c := range cs[1:] {
n := c.PendingRequests()
t := atomic.LoadUint64(&c.total)
if n < minN || (n == minN && t < minT) {
minC = c
minN = n
minT = t
}
}
cc.mu.RUnlock()
return minC
}
type lbClient struct {
c BalancingClient
healthCheck func(req *Request, resp *Response, err error) bool
penalty uint32
// total amount of requests handled.
total uint64
}
func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
err := c.c.DoDeadline(req, resp, deadline)
if !c.isHealthy(req, resp, err) && c.incPenalty() {
// Penalize the client returning error, so the next requests
// are routed to another clients.
time.AfterFunc(penaltyDuration, c.decPenalty)
} else {
atomic.AddUint64(&c.total, 1)
}
return err
}
func (c *lbClient) PendingRequests() int {
n := c.c.PendingRequests()
m := atomic.LoadUint32(&c.penalty)
return n + int(m)
}
func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
if c.healthCheck == nil {
return err == nil
}
return c.healthCheck(req, resp, err)
}
func (c *lbClient) incPenalty() bool {
m := atomic.AddUint32(&c.penalty, 1)
if m > maxPenalty {
c.decPenalty()
return false
}
return true
}
func (c *lbClient) decPenalty() {
atomic.AddUint32(&c.penalty, ^uint32(0))
}
const (
maxPenalty = 300
penaltyDuration = 3 * time.Second
)