commit 21bf76e6cc9f464f706688bb649b388764135f4e Author: Aliaksandr Valialkin Date: Wed Jun 22 19:36:29 2016 +0300 Initial implementation diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..6a6ec2e --- /dev/null +++ b/.travis.yml @@ -0,0 +1,15 @@ +language: go + +go: + - 1.6 + +script: + # build test for supported platforms + - GOOS=linux go build + - GOOS=darwin go build + - GOOS=freebsd go build + - GOOS=windows go build + - GOARCH=386 go build + + # run tests on a standard platform + - go test -v ./... diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f7c935c --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2016 Aliaksandr Valialkin, VertaMedia + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..c04fb0e --- /dev/null +++ b/README.md @@ -0,0 +1,11 @@ +[![Build Status](https://travis-ci.org/valyala/bytebufferpool.svg)](https://travis-ci.org/valyala/bytebufferpool) +[![GoDoc](https://godoc.org/github.com/valyala/bytebufferpool?status.svg)](http://godoc.org/github.com/valyala/bytebufferpool) +[![Go Report](http://goreportcard.com/badge/valyala/bytebufferpool)](http://goreportcard.com/report/valyala/bytebufferpool) + +# bytebufferpool + +An implementation of a pool of byte buffers with anti-fragmentation protection. + +The pool may waste limited amount of memory due to fragmentation. +This amount equals to the maximum total size of the byte buffers +in concurrent use. diff --git a/bytebuffer.go b/bytebuffer.go new file mode 100644 index 0000000..7790e43 --- /dev/null +++ b/bytebuffer.go @@ -0,0 +1,61 @@ +package bytebufferpool + +// ByteBuffer provides byte buffer, which can be used for minimizing +// memory allocations. +// +// ByteBuffer may be used with functions appending data to the given []byte +// slice. See example code for details. +// +// Use AcquireByteBuffer for obtaining an empty byte buffer. +type ByteBuffer struct { + + // B is a byte buffer to use in append-like workloads. + // See example code for details. + B []byte +} + +// Write implements io.Writer - it appends p to ByteBuffer.B +func (b *ByteBuffer) Write(p []byte) (int, error) { + b.B = append(b.B, p...) + return len(p), nil +} + +// WriteString appends s to ByteBuffer.B +func (b *ByteBuffer) WriteString(s string) (int, error) { + b.B = append(b.B, s...) + return len(s), nil +} + +// Set sets ByteBuffer.B to p +func (b *ByteBuffer) Set(p []byte) { + b.B = append(b.B[:0], p...) +} + +// SetString sets ByteBuffer.B to s +func (b *ByteBuffer) SetString(s string) { + b.B = append(b.B[:0], s...) +} + +// Reset makes ByteBuffer.B empty. +func (b *ByteBuffer) Reset() { + b.B = b.B[:0] +} + +// AcquireByteBuffer returns an empty byte buffer from the pool. +// +// Acquired byte buffer may be returned to the pool via ReleaseByteBuffer call. +// This reduces the number of memory allocations required for byte buffer +// management. +func AcquireByteBuffer() *ByteBuffer { + return defaultByteBufferPool.Acquire() +} + +// ReleaseByteBuffer returns byte buffer to the pool. +// +// ByteBuffer.B mustn't be touched after returning it to the pool. +// Otherwise data races will occur. +func ReleaseByteBuffer(b *ByteBuffer) { + defaultByteBufferPool.Release(b) +} + +var defaultByteBufferPool byteBufferPool diff --git a/bytebuffer_example_test.go b/bytebuffer_example_test.go new file mode 100644 index 0000000..907fac2 --- /dev/null +++ b/bytebuffer_example_test.go @@ -0,0 +1,21 @@ +package bytebufferpool_test + +import ( + "fmt" + + "github.com/valyala/bytebufferpool" +) + +func ExampleByteBuffer() { + bb := bytebufferpool.AcquireByteBuffer() + + bb.WriteString("first line\n") + bb.Write([]byte("second line\n")) + bb.B = append(bb.B, "third line\n"...) + + fmt.Printf("bytebuffer contents=%q", bb.B) + + // It is safe to release byte buffer now, since it is + // no longer used. + bytebufferpool.ReleaseByteBuffer(bb) +} diff --git a/bytebuffer_test.go b/bytebuffer_test.go new file mode 100644 index 0000000..c159c1b --- /dev/null +++ b/bytebuffer_test.go @@ -0,0 +1,43 @@ +package bytebufferpool + +import ( + "fmt" + "testing" + "time" +) + +func TestByteBufferAcquireReleaseSerial(t *testing.T) { + testByteBufferAcquireRelease(t) +} + +func TestByteBufferAcquireReleaseConcurrent(t *testing.T) { + concurrency := 10 + ch := make(chan struct{}, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + testByteBufferAcquireRelease(t) + ch <- struct{}{} + }() + } + + for i := 0; i < concurrency; i++ { + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("timeout!") + } + } +} + +func testByteBufferAcquireRelease(t *testing.T) { + for i := 0; i < 10; i++ { + expectedS := fmt.Sprintf("num %d", i) + b := AcquireByteBuffer() + b.B = append(b.B, "num "...) + b.B = append(b.B, fmt.Sprintf("%d", i)...) + if string(b.B) != expectedS { + t.Fatalf("unexpected result: %q. Expecting %q", b.B, expectedS) + } + ReleaseByteBuffer(b) + } +} diff --git a/bytebuffer_timing_test.go b/bytebuffer_timing_test.go new file mode 100644 index 0000000..29f92de --- /dev/null +++ b/bytebuffer_timing_test.go @@ -0,0 +1,32 @@ +package bytebufferpool + +import ( + "bytes" + "testing" +) + +func BenchmarkByteBufferWrite(b *testing.B) { + s := []byte("foobarbaz") + b.RunParallel(func(pb *testing.PB) { + var buf ByteBuffer + for pb.Next() { + for i := 0; i < 100; i++ { + buf.Write(s) + } + buf.Reset() + } + }) +} + +func BenchmarkBytesBufferWrite(b *testing.B) { + s := []byte("foobarbaz") + b.RunParallel(func(pb *testing.PB) { + var buf bytes.Buffer + for pb.Next() { + for i := 0; i < 100; i++ { + buf.Write(s) + } + buf.Reset() + } + }) +} diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..e511b7c --- /dev/null +++ b/doc.go @@ -0,0 +1,7 @@ +// Package bytebufferpool implements a pool of byte buffers +// with anti-fragmentation protection. +// +// The pool may waste limited amount of memory due to fragmentation. +// This amount equals to the maximum total size of the byte buffers +// in concurrent use. +package bytebufferpool diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..6163417 --- /dev/null +++ b/pool.go @@ -0,0 +1,53 @@ +package bytebufferpool + +import "sync" + +const ( + minBitSize = 8 + steps = 20 + + minSize = 1 << minBitSize + maxSize = 1 << (minBitSize + steps - 1) +) + +type byteBufferPool struct { + // Pools are segemented into power-of-two sized buffers + // from minSize bytes to maxSize. + // + // This allows reducing fragmentation of ByteBuffer objects. + pools [steps]sync.Pool +} + +func (p *byteBufferPool) Acquire() *ByteBuffer { + pools := &p.pools + for i := 0; i < steps; i++ { + v := pools[i].Get() + if v != nil { + return v.(*ByteBuffer) + } + } + + return &ByteBuffer{ + B: make([]byte, 0, minSize), + } +} + +func (p *byteBufferPool) Release(b *ByteBuffer) { + n := cap(b.B) + if n > maxSize { + // Just drop oversized buffers. + return + } + b.B = b.B[:0] + idx := bitsize(n-1) >> minBitSize + p.pools[idx].Put(b) +} + +func bitsize(n int) int { + s := 0 + for n > 0 { + n >>= 1 + s++ + } + return s +} diff --git a/pool_test.go b/pool_test.go new file mode 100644 index 0000000..dbb02e0 --- /dev/null +++ b/pool_test.go @@ -0,0 +1,59 @@ +package bytebufferpool + +import ( + "testing" + "time" +) + +func TestPoolVariousSizesSerial(t *testing.T) { + testPoolVariousSizes(t) +} + +func TestPoolVariousSizesConcurrent(t *testing.T) { + concurrency := 5 + ch := make(chan struct{}) + for i := 0; i < concurrency; i++ { + go func() { + testPoolVariousSizes(t) + ch <- struct{}{} + }() + } + for i := 0; i < concurrency; i++ { + select { + case <-ch: + case <-time.After(3 * time.Second): + t.Fatalf("timeout") + } + } +} + +func testPoolVariousSizes(t *testing.T) { + for i := 0; i < steps+1; i++ { + n := (1 << uint32(i)) + + testAcquireRelease(t, n) + testAcquireRelease(t, n+1) + testAcquireRelease(t, n-1) + + for j := 0; j < 10; j++ { + testAcquireRelease(t, j+n) + } + } +} + +func testAcquireRelease(t *testing.T, n int) { + bb := AcquireByteBuffer() + if len(bb.B) > 0 { + t.Fatalf("non-empty byte buffer returned from acquire") + } + bb.B = allocNBytes(bb.B, n) + ReleaseByteBuffer(bb) +} + +func allocNBytes(dst []byte, n int) []byte { + diff := n - cap(dst) + if diff <= 0 { + return dst[:n] + } + return append(dst, make([]byte, diff)...) +}