Drop buffers with sizes greater than 95% percentile

This commit is contained in:
Aliaksandr Valialkin
2016-06-23 20:58:48 +03:00
parent c0dd811f50
commit 1dbe0d3c7d
2 changed files with 63 additions and 36 deletions
+58 -35
View File
@@ -1,62 +1,61 @@
package bytebufferpool
import (
"log"
"sort"
"sync"
"sync/atomic"
)
const (
minBitSize = 8
minBitSize = 6
steps = 20
minSize = 1 << minBitSize
maxSize = 1 << (minBitSize + steps - 1)
calibrateCallsThreshold = 42000
maxPercentile = 0.95
)
type byteBufferPool struct {
idxs [steps]uint64
calls [steps]uint64
calibrating uint64
// Pools are segemented into power-of-two sized buffers
// from minSize bytes to maxSize.
//
// This allows reducing fragmentation of ByteBuffer objects.
pools [steps]sync.Pool
defaultSize uint64
maxSize uint64
pool sync.Pool
}
func (p *byteBufferPool) Acquire() *ByteBuffer {
for i := 0; i < steps; i++ {
idx := atomic.LoadUint64(&p.idxs[i])
v := p.pools[idx].Get()
if v != nil {
return v.(*ByteBuffer)
}
v := p.pool.Get()
if v != nil {
return v.(*ByteBuffer)
}
return &ByteBuffer{
B: make([]byte, 0, minSize),
B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)),
}
}
func (p *byteBufferPool) Release(b *ByteBuffer) {
bCap := cap(b.B)
if bCap > maxSize {
// Oversized buffer.
// Drop it.
return
bSize := len(b.B)
idx := bitSize(bSize-1) - minBitSize
if idx < 0 {
idx = 0
} else if idx >= steps {
idx = steps - 1
}
idx := bitSize(bCap-1) >> minBitSize
b.B = b.B[:0]
p.pools[idx].Put(b)
if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
p.calibrate()
}
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize > 0 && bSize <= maxSize {
b.B = b.B[:0]
p.pool.Put(b)
}
}
func (p *byteBufferPool) calibrate() {
@@ -64,38 +63,62 @@ func (p *byteBufferPool) calibrate() {
return
}
var a callidxs
log.Printf("calls=%d", p.calls)
a := make(callSizes, 0, steps)
var callsSum uint64
for i := uint64(0); i < steps; i++ {
a = append(a, callidx{
calls: atomic.SwapUint64(&p.calls[i], 0),
idx: i,
calls := atomic.SwapUint64(&p.calls[i], 0)
callsSum += calls
a = append(a, callSize{
calls: calls,
size: minSize << i,
})
}
sort.Sort(a)
log.Printf("a=%#v", a)
defaultSize := a[0].size
maxSize := defaultSize
maxSum := uint64(float64(callsSum) * maxPercentile)
callsSum = 0
for i := 0; i < steps; i++ {
atomic.StoreUint64(&p.idxs[i], a[i].idx)
if callsSum > maxSum {
break
}
callsSum += a[i].calls
size := a[i].size
if size > maxSize {
maxSize = size
}
}
log.Printf("maxSize=%d, defaultSize=%d\n", maxSize, defaultSize)
atomic.StoreUint64(&p.defaultSize, defaultSize)
atomic.StoreUint64(&p.maxSize, maxSize)
atomic.StoreUint64(&p.calibrating, 0)
}
type callidx struct {
type callSize struct {
calls uint64
idx uint64
size uint64
}
type callidxs []callidx
type callSizes []callSize
func (ci callidxs) Len() int {
func (ci callSizes) Len() int {
return len(ci)
}
func (ci callidxs) Less(i, j int) bool {
func (ci callSizes) Less(i, j int) bool {
return ci[i].calls > ci[j].calls
}
func (ci callidxs) Swap(i, j int) {
func (ci callSizes) Swap(i, j int) {
ci[i], ci[j] = ci[j], ci[i]
}
+5 -1
View File
@@ -8,7 +8,11 @@ import (
func TestPoolCalibrate(t *testing.T) {
for i := 0; i < steps*calibrateCallsThreshold; i++ {
testAcquireRelease(t, rand.Intn(10000))
n := 1004
if i%15 == 0 {
n = rand.Intn(15234)
}
testAcquireRelease(t, n)
}
}