From 1dbe0d3c7d8af5f45740a32c1e24d7b820655f18 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 23 Jun 2016 20:58:48 +0300 Subject: [PATCH] Drop buffers with sizes greater than 95% percentile --- pool.go | 93 ++++++++++++++++++++++++++++++++-------------------- pool_test.go | 6 +++- 2 files changed, 63 insertions(+), 36 deletions(-) diff --git a/pool.go b/pool.go index e54d8e8..54962a8 100644 --- a/pool.go +++ b/pool.go @@ -1,62 +1,61 @@ package bytebufferpool import ( + "log" "sort" "sync" "sync/atomic" ) const ( - minBitSize = 8 + minBitSize = 6 steps = 20 minSize = 1 << minBitSize maxSize = 1 << (minBitSize + steps - 1) calibrateCallsThreshold = 42000 + maxPercentile = 0.95 ) type byteBufferPool struct { - idxs [steps]uint64 calls [steps]uint64 calibrating uint64 - // Pools are segemented into power-of-two sized buffers - // from minSize bytes to maxSize. - // - // This allows reducing fragmentation of ByteBuffer objects. - pools [steps]sync.Pool + defaultSize uint64 + maxSize uint64 + + pool sync.Pool } func (p *byteBufferPool) Acquire() *ByteBuffer { - for i := 0; i < steps; i++ { - idx := atomic.LoadUint64(&p.idxs[i]) - v := p.pools[idx].Get() - if v != nil { - return v.(*ByteBuffer) - } + v := p.pool.Get() + if v != nil { + return v.(*ByteBuffer) } - return &ByteBuffer{ - B: make([]byte, 0, minSize), + B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)), } } func (p *byteBufferPool) Release(b *ByteBuffer) { - bCap := cap(b.B) - if bCap > maxSize { - // Oversized buffer. - // Drop it. - return + bSize := len(b.B) + idx := bitSize(bSize-1) - minBitSize + if idx < 0 { + idx = 0 + } else if idx >= steps { + idx = steps - 1 } - idx := bitSize(bCap-1) >> minBitSize - b.B = b.B[:0] - p.pools[idx].Put(b) - if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold { p.calibrate() } + + maxSize := int(atomic.LoadUint64(&p.maxSize)) + if maxSize > 0 && bSize <= maxSize { + b.B = b.B[:0] + p.pool.Put(b) + } } func (p *byteBufferPool) calibrate() { @@ -64,38 +63,62 @@ func (p *byteBufferPool) calibrate() { return } - var a callidxs + log.Printf("calls=%d", p.calls) + + a := make(callSizes, 0, steps) + var callsSum uint64 for i := uint64(0); i < steps; i++ { - a = append(a, callidx{ - calls: atomic.SwapUint64(&p.calls[i], 0), - idx: i, + calls := atomic.SwapUint64(&p.calls[i], 0) + callsSum += calls + a = append(a, callSize{ + calls: calls, + size: minSize << i, }) } sort.Sort(a) + log.Printf("a=%#v", a) + + defaultSize := a[0].size + maxSize := defaultSize + + maxSum := uint64(float64(callsSum) * maxPercentile) + callsSum = 0 for i := 0; i < steps; i++ { - atomic.StoreUint64(&p.idxs[i], a[i].idx) + if callsSum > maxSum { + break + } + callsSum += a[i].calls + size := a[i].size + if size > maxSize { + maxSize = size + } } + log.Printf("maxSize=%d, defaultSize=%d\n", maxSize, defaultSize) + + atomic.StoreUint64(&p.defaultSize, defaultSize) + atomic.StoreUint64(&p.maxSize, maxSize) + atomic.StoreUint64(&p.calibrating, 0) } -type callidx struct { +type callSize struct { calls uint64 - idx uint64 + size uint64 } -type callidxs []callidx +type callSizes []callSize -func (ci callidxs) Len() int { +func (ci callSizes) Len() int { return len(ci) } -func (ci callidxs) Less(i, j int) bool { +func (ci callSizes) Less(i, j int) bool { return ci[i].calls > ci[j].calls } -func (ci callidxs) Swap(i, j int) { +func (ci callSizes) Swap(i, j int) { ci[i], ci[j] = ci[j], ci[i] } diff --git a/pool_test.go b/pool_test.go index bf8da10..a61c576 100644 --- a/pool_test.go +++ b/pool_test.go @@ -8,7 +8,11 @@ import ( func TestPoolCalibrate(t *testing.T) { for i := 0; i < steps*calibrateCallsThreshold; i++ { - testAcquireRelease(t, rand.Intn(10000)) + n := 1004 + if i%15 == 0 { + n = rand.Intn(15234) + } + testAcquireRelease(t, n) } }