Reimplement flushing support for fasthttpadaptor (#2081)

Use a simpler implementation, and do more tests.
Instead of https://github.com/valyala/fasthttp/pull/2069
This commit is contained in:
Erik Dubbelboer
2025-10-06 09:22:13 +08:00
committed by GitHub
parent a17ec74999
commit 2272d532e1
2 changed files with 478 additions and 289 deletions
+192 -276
View File
@@ -4,10 +4,12 @@ package fasthttpadaptor
import (
"bufio"
"fmt"
"io"
"net"
"net/http"
"sync"
"sync/atomic"
"github.com/valyala/fasthttp"
)
@@ -57,25 +59,36 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
return
}
w := acquireNetHTTPResponseWriter(ctx)
// Concurrently serve the net/http handler.
w := acquireWriter(ctx)
// Serve the net/http handler concurrently so we can react to Flush/Hijack.
go func() {
defer func() {
if rec := recover(); rec != nil {
ctx.Logger().Printf("panic in net/http handler: %v", rec)
select {
case w.modeCh <- modePanicked:
default:
}
} else {
// Signal completion if no other mode was selected yet.
select {
case w.modeCh <- modeDone:
default:
}
}
_ = w.Close()
}()
h.ServeHTTP(w, r.WithContext(ctx))
select {
case w.modeCh <- modeDone:
default:
}
_ = w.Close()
}()
mode := <-w.modeCh
switch mode {
// Decide mode by first event.
switch <-w.modeCh {
case modeDone:
// No flush occurred before the handler returned.
// Send the data as one chunk.
ctx.SetStatusCode(w.StatusCode())
// Buffered, no Flush() nor Hijack().
ctx.SetStatusCode(w.status())
haveContentType := false
for k, vv := range w.Header() {
if k == fasthttp.HeaderContentType {
@@ -91,352 +104,255 @@ func NewFastHTTPHandler(h http.Handler) fasthttp.RequestHandler {
// From net/http.ResponseWriter.Write:
// If the Header does not contain a Content-Type line, Write adds a Content-Type set
// to the result of passing the initial 512 bytes of written data to DetectContentType.
l := 512
b := *w.responseBody
if len(b) < 512 {
l = len(b)
l := min(len(w.responseBody), 512)
if l > 0 {
ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(w.responseBody[:l]))
}
ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(b[:l]))
}
w.responseMutex.Lock()
if len(*w.responseBody) > 0 {
ctx.Response.SetBody(*w.responseBody)
if len(w.responseBody) > 0 {
ctx.Response.SetBody(w.responseBody)
}
w.responseMutex.Unlock()
// Release after sending response.
releaseNetHTTPResponseWriter(w)
releaseWriter(w)
case modeFlushed:
// Flush occurred before handler returned.
// Send the first 512 bytes and start streaming
// the rest of the first chunk and new data as it arrives.
ctx.SetStatusCode(w.StatusCode())
// Streaming: send headers and start SetBodyStreamWriter.
ctx.SetStatusCode(w.status())
haveContentType := false
for k, vv := range w.Header() {
// Don't copy Content-Length header when
// streaming.
// No Content-Length when streaming.
if k == fasthttp.HeaderContentLength {
continue
}
if k == fasthttp.HeaderContentType {
haveContentType = true
}
for _, v := range vv {
ctx.Response.Header.Add(k, v)
}
}
// Lock the current response body until
// it is sent in the StreamWriter function.
w.responseMutex.Lock()
if !haveContentType {
// From net/http.ResponseWriter.Write:
// If the Header does not contain a Content-Type line, Write adds a Content-Type set
// to the result of passing the initial 512 bytes of written data to DetectContentType.
l := 512
b := *w.responseBody
if len(b) < 512 {
l = len(b)
w.mu.Lock()
if len(w.responseBody) > 0 {
l := min(len(w.responseBody), 512)
ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(w.responseBody[:l]))
}
ctx.Response.Header.Set(fasthttp.HeaderContentType, http.DetectContentType(b[:l]))
w.mu.Unlock()
}
// Start streaming mode on return.
ctx.SetBodyStreamWriter(func(bw *bufio.Writer) {
// Stream the first chunk.
if len(*w.responseBody) > 0 {
_, _ = bw.Write(*w.responseBody)
// Ensure cleanup only after the stream completes.
defer releaseWriter(w)
// Send pre-flush bytes.
if b := w.consumePreflush(); len(b) > 0 {
_, _ = bw.Write(b)
_ = bw.Flush()
}
// The current response body is no longer used
// past this point.
w.responseMutex.Unlock()
// Stream the rest of the data that is read
// from the net/http handler in 32 KiB chunks.
//
// Note: Data must be manually copied in chunks
// as data comes in.
chunk := acquireBuffer()
*chunk = (*chunk)[:minBufferSize]
// Stream subsequent writes from the pipe until EOF.
buf := bufferPool.Get().(*[]byte)
defer bufferPool.Put(buf)
for {
// Read net/http handler chunk.
n, err := w.r.Read(*chunk)
if err != nil {
// Handler ended due to an io.EOF
// or an error occurred.
//
// Release the response writer for reuse.
releaseBuffer(chunk)
releaseNetHTTPResponseWriter(w)
return
}
// Copy chunk to fasthttp response
n, err := w.pr.Read(*buf)
if n > 0 {
_, err = bw.Write((*chunk)[:n])
if err != nil {
// Handler ended due to an io.ErrPipeClosed
// or an error occurred.
//
// Release the response writer for reuse.
releaseBuffer(chunk)
releaseNetHTTPResponseWriter(w)
if _, e := bw.Write((*buf)[:n]); e != nil {
return
}
err = bw.Flush()
if err != nil {
// Handler ended due to an io.ErrPipeClosed
// or an error occurred.
//
// Release the response writer for reuse.
releaseBuffer(chunk)
releaseNetHTTPResponseWriter(w)
if e := bw.Flush(); e != nil {
return
}
}
if err != nil {
return
}
}
})
// Activate streaming mode for consequent `w.Flush()`
// by net/http handler.
w.streamCond.L.Lock()
w.isStreaming = true
w.streamCond.Signal()
w.streamCond.L.Unlock()
// Signal the writer that streaming is ready so Flush() can return.
close(w.streamReady)
case modeHijacked:
// The net/http handler called w.Hijack().
// Copy data bidirectionally between the
// net/http and fasthttp connections.
var wg sync.WaitGroup
wg.Add(2)
return
// Note: It is safe to assume that net.Conn automatically
// flushes data while copying.
go func() {
defer wg.Done()
_, _ = io.Copy(ctx.Conn(), w.handlerConn)
// Close the fasthttp connection when
// the net/http connection closes.
_ = ctx.Conn().Close()
}()
go func() {
defer wg.Done()
_, _ = io.Copy(w.handlerConn, ctx.Conn())
// Note: Only the net/http handler
// should close the connection.
}()
// Wait for the net/http handler to finish
// writing to the hijacked connection prior to releasing
// the writer into the writer pool.
wg.Wait()
releaseNetHTTPResponseWriter(w)
case modePanicked:
panic("net/http handler panicked")
}
}
}
// Use a minimum buffer size of 32 KiB.
const minBufferSize = 32 * 1024
var bufferPool = &sync.Pool{
var bufferPool = sync.Pool{
New: func() any {
b := make([]byte, minBufferSize)
b := make([]byte, 32*1024)
return &b
},
}
var writerPool = &sync.Pool{
New: func() any {
pr, pw := io.Pipe()
return &netHTTPResponseWriter{
h: make(http.Header),
r: pr,
w: pw,
modeCh: make(chan ModeType),
responseBody: acquireBuffer(),
streamCond: sync.NewCond(&sync.Mutex{}),
}
},
}
type ModeType int
const (
modeUnknown ModeType = iota
modeDone
modeDone = iota + 1
modeFlushed
modeHijacked
modePanicked
)
type netHTTPResponseWriter struct {
handlerConn net.Conn
ctx *fasthttp.RequestCtx
h http.Header
r *io.PipeReader
w *io.PipeWriter
modeCh chan ModeType
responseBody *[]byte
streamCond *sync.Cond
statusCode int
once sync.Once
statusMutex sync.Mutex
responseMutex sync.Mutex
connMutex sync.Mutex
isStreaming bool
// Writer implements http.ResponseWriter + http.Flusher + http.Hijacker for the adaptor.
type writer struct {
ctx *fasthttp.RequestCtx
h http.Header
statusCode atomic.Int64
mu sync.Mutex
responseBody []byte
bufPool *[]byte
pr *io.PipeReader
pw *io.PipeWriter
hijacked atomic.Bool
modeCh chan int
streamReady chan struct{}
flushOnce sync.Once
closeOnce sync.Once
}
func acquireNetHTTPResponseWriter(ctx *fasthttp.RequestCtx) *netHTTPResponseWriter {
w, ok := writerPool.Get().(*netHTTPResponseWriter)
if !ok {
panic("fasthttpadaptor: cannot get *netHTTPResponseWriter from writerPool")
func acquireWriter(ctx *fasthttp.RequestCtx) *writer {
pr, pw := io.Pipe()
return &writer{
ctx: ctx,
h: make(http.Header),
responseBody: nil,
pr: pr,
pw: pw,
modeCh: make(chan int, 1),
streamReady: make(chan struct{}),
}
w.reset()
w.ctx = ctx
return w
}
func releaseNetHTTPResponseWriter(w *netHTTPResponseWriter) {
releaseBuffer(w.responseBody)
w.Close()
writerPool.Put(w)
}
func acquireBuffer() *[]byte {
buf, ok := bufferPool.Get().(*[]byte)
if !ok {
panic("fasthttpadaptor: cannot get *[]byte from bufferPool")
func releaseWriter(w *writer) {
_ = w.Close()
if w.bufPool != nil {
bufferPool.Put(w.bufPool)
w.bufPool = nil
}
*buf = (*buf)[:0]
return buf
}
func releaseBuffer(buf *[]byte) {
bufferPool.Put(buf)
}
func (w *netHTTPResponseWriter) StatusCode() int {
w.statusMutex.Lock()
defer w.statusMutex.Unlock()
if w.statusCode == 0 {
return http.StatusOK
}
return w.statusCode
}
func (w *netHTTPResponseWriter) Header() http.Header {
func (w *writer) Header() http.Header {
return w.h
}
func (w *netHTTPResponseWriter) WriteHeader(statusCode int) {
w.statusMutex.Lock()
defer w.statusMutex.Unlock()
w.statusCode = statusCode
func (w *writer) WriteHeader(code int) {
// Allow the same codes as net/http.
if code < 100 || code > 999 {
panic(fmt.Sprintf("invalid WriteHeader code %v", code))
}
w.statusCode.CompareAndSwap(0, int64(code))
}
func (w *netHTTPResponseWriter) Write(p []byte) (int, error) {
w.streamCond.L.Lock()
defer w.streamCond.L.Unlock()
if w.isStreaming {
// Streaming mode is on.
// Stream directly to the conn writer.
return w.w.Write(p)
func (w *writer) Write(p []byte) (int, error) {
select {
case <-w.streamReady:
return w.pw.Write(p)
default:
}
// Streaming mode is off.
// Write to the first chunk for flushing later.
w.responseMutex.Lock()
*w.responseBody = append(*w.responseBody, p...)
w.responseMutex.Unlock()
w.mu.Lock()
defer w.mu.Unlock()
if w.responseBody == nil {
w.bufPool = bufferPool.Get().(*[]byte)
w.responseBody = (*w.bufPool)[:0]
}
w.responseBody = append(w.responseBody, p...)
return len(p), nil
}
func (w *netHTTPResponseWriter) Flush() {
// Trigger streaming mode setup.
w.once.Do(func() {
w.modeCh <- modeFlushed
func (w *writer) Flush() {
w.flushOnce.Do(func() {
select {
case w.modeCh <- modeFlushed:
default:
}
})
// Wait for streaming mode.
w.streamCond.L.Lock()
defer w.streamCond.L.Unlock()
for !w.isStreaming {
w.streamCond.Wait()
}
<-w.streamReady
}
func (w *netHTTPResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
// Hijack assumes control of the connection, so we need to prevent fasthttp from closing it or
// doing anything else with it.
type wrappedConn struct {
net.Conn
wg sync.WaitGroup
once sync.Once
}
func (c *wrappedConn) Close() (err error) {
c.once.Do(func() {
err = c.Conn.Close()
c.wg.Done()
})
return err
}
func (w *writer) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if !w.hijacked.CompareAndSwap(false, true) {
return nil, nil, http.ErrHijacked
}
// Tell fasthttp not to send any HTTP response before hijacking.
w.ctx.HijackSetNoResponse(true)
netHTTPConn, fasthttpConn := net.Pipe()
w.handlerConn = fasthttpConn
// Trigger hijacked mode.
w.once.Do(func() {
w.modeCh <- modeHijacked
conn := &wrappedConn{Conn: w.ctx.Conn()}
conn.wg.Add(1)
w.ctx.Hijack(func(net.Conn) {
conn.wg.Wait()
})
bufRW := bufio.NewReadWriter(bufio.NewReader(netHTTPConn), bufio.NewWriter(netHTTPConn))
bufW := bufio.NewWriter(conn)
// Write any unflushed body to the hijacked connection buffer.
w.responseMutex.Lock()
if len(*w.responseBody) > 0 {
_, _ = bufRW.Write(*w.responseBody)
_ = bufRW.Flush()
unflushedBody := w.consumePreflush()
if len(unflushedBody) > 0 {
if _, err := bufW.Write(unflushedBody); err != nil {
_ = conn.Close()
return nil, nil, err
}
}
w.responseMutex.Unlock()
return netHTTPConn, bufRW, nil
select {
case w.modeCh <- modeHijacked:
default:
}
return conn, &bufio.ReadWriter{Reader: bufio.NewReader(conn), Writer: bufW}, nil
}
func (w *netHTTPResponseWriter) Close() error {
_ = w.w.Close()
_ = w.r.Close()
w.connMutex.Lock()
if w.handlerConn != nil {
_ = w.handlerConn.Close()
}
w.connMutex.Unlock()
func (w *writer) Close() error {
w.closeOnce.Do(func() {
_ = w.pw.Close()
_ = w.pr.Close()
})
return nil
}
func (w *netHTTPResponseWriter) reset() {
// Note: reset() must only run after a fasthttp handler finishes
// proxying the full net/http handler response to ensure no data races.
w.ctx = nil
w.connMutex.Lock()
w.handlerConn = nil
w.connMutex.Unlock()
w.statusCode = 0
// Open new bidirectional pipes
pr, pw := io.Pipe()
w.r = pr
w.w = pw
// Clear the http Header
for key := range w.h {
delete(w.h, key)
// status returns the effective status code (defaults to 200).
func (w *writer) status() int {
code := int(w.statusCode.Load())
if code == 0 {
return http.StatusOK
}
return code
}
// Get a new buffer for the response body
w.responseBody = acquireBuffer()
w.once = sync.Once{}
w.streamCond.L.Lock()
w.isStreaming = false
w.streamCond.L.Unlock()
// consumePreflush returns pre-flush bytes and clears the buffer.
func (w *writer) consumePreflush() []byte {
w.mu.Lock()
defer w.mu.Unlock()
if len(w.responseBody) == 0 {
return nil
}
out := w.responseBody
w.responseBody = nil
return out
}
+286 -13
View File
@@ -2,9 +2,12 @@ package fasthttpadaptor
import (
"bufio"
"bytes"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"testing"
@@ -94,7 +97,9 @@ func TestNewFastHTTPHandler(t *testing.T) {
w.Header().Set("Header1", "value1")
w.Header().Set("Header2", "value2")
w.WriteHeader(http.StatusBadRequest)
w.Write(body) //nolint:errcheck
if _, err := w.Write(body); err != nil {
t.Fatalf("unexpected error when writing response body: %v", err)
}
}
fasthttpH := NewFastHTTPHandler(http.HandlerFunc(nethttpH))
fasthttpH = setContextValueMiddleware(fasthttpH, expectedContextKey, expectedContextValue)
@@ -105,7 +110,9 @@ func TestNewFastHTTPHandler(t *testing.T) {
req.Header.SetMethod(expectedMethod)
req.SetRequestURI(expectedRequestURI)
req.Header.SetHost(expectedHost)
req.BodyWriter().Write([]byte(expectedBody)) //nolint:errcheck
if _, err := req.BodyWriter().Write([]byte(expectedBody)); err != nil {
t.Fatalf("unexpected error when writing request body: %v", err)
}
for k, v := range expectedHeader {
req.Header.Set(k, v)
}
@@ -272,6 +279,93 @@ func TestFlushHandler(t *testing.T) {
if f, ok := w.(http.Flusher); !ok {
t.Errorf("expected http.ResponseWriter to implement http.Flusher")
} else {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("Content-Length", "6")
w.Header().Set("X-Foo", "bar")
if _, err := w.Write([]byte("foo")); err != nil {
t.Error(err)
}
f.Flush()
time.Sleep(time.Millisecond * 500)
if _, err := w.Write([]byte("bar")); err != nil {
t.Error(err)
}
f.Flush()
}
}
s := &fasthttp.Server{
Handler: NewFastHTTPHandler(http.HandlerFunc(nethttpH)),
}
ln := fasthttputil.NewInmemoryListener()
go func() {
if err := s.Serve(ln); err != nil {
t.Errorf("unexpected error: %v", err)
}
}()
clientCh := make(chan struct{})
go func() {
c, err := ln.Dial()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if _, err = c.Write([]byte("GET / HTTP/1.1\r\nHost: aa\r\n\r\n")); err != nil {
t.Errorf("unexpected error: %v", err)
}
resp, err := http.ReadResponse(bufio.NewReader(c), nil)
if err != nil {
t.Errorf("unexpected error reading response: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected status code: %d. Expecting %d", resp.StatusCode, http.StatusOK)
}
if resp.Header.Get("Content-Type") != "text/plain; charset=utf-8" {
t.Errorf("unexpected Content-Type header: %q. Expecting %q", resp.Header.Get("Content-Type"), "text/plain; charset=utf-8")
}
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil && err != io.ErrUnexpectedEOF {
t.Errorf("unexpected error reading body: %v", err)
}
if string(body) != "foobar" {
t.Errorf("unexpected response body: %q. Expecting %q", body, "foobar")
}
close(clientCh)
}()
select {
case <-clientCh:
case <-time.After(time.Second):
t.Fatal("timeout")
}
}
func TestFlushHandlerClosed(t *testing.T) {
t.Parallel()
nethttpH := func(w http.ResponseWriter, r *http.Request) {
if f, ok := w.(http.Flusher); !ok {
t.Errorf("expected http.ResponseWriter to implement http.Flusher")
} else {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("Content-Length", "6")
w.Header().Set("X-Foo", "bar")
if _, err := w.Write([]byte("foo")); err != nil {
t.Error(err)
}
@@ -371,17 +465,7 @@ func TestHijackFlush(t *testing.T) {
time.Sleep(time.Second)
if _, err := rw.WriteString("bazz"); err != nil {
t.Error(err)
}
if err := rw.Flush(); err != nil {
t.Error(err)
}
if err := c.Close(); err != nil {
t.Error(err)
}
_ = c.Close()
}
}
}
@@ -430,3 +514,192 @@ func TestHijackFlush(t *testing.T) {
t.Fatal("timeout")
}
}
func TestResourceRecyclingUnderLoad_OneEndpoint(t *testing.T) {
t.Parallel()
handler := func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello World!")
}
s := &fasthttp.Server{
Handler: NewFastHTTPHandler(http.HandlerFunc(handler)),
}
requestCount := 10
responseTimeout := 500 * time.Millisecond
expectedBody := "Hello World!"
ln := fasthttputil.NewInmemoryListener()
go func() {
if err := s.Serve(ln); err != nil {
t.Errorf("unexpected error: %v", err)
}
}()
for reqID := 1; reqID <= requestCount; reqID++ {
req := httptest.NewRequest("GET", "/", http.NoBody)
body, err := sendRequest(ln, req, responseTimeout)
if err != nil {
t.Errorf("[%d] unexpected error sending request: %v", reqID, err)
}
if string(body) != expectedBody {
t.Errorf("[%d] unexpected response: %q. Expecting %q", reqID, body, expectedBody)
}
}
}
func TestResourceRecyclingUnderLoad_MultipleEndpoints(t *testing.T) {
t.Parallel()
handlers := []struct {
endpoint string
handler fasthttp.RequestHandler
expectedBody string
}{
{
endpoint: "/done",
handler: NewFastHTTPHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello World!")
})),
expectedBody: "Hello World!",
},
{
endpoint: "/flush",
handler: NewFastHTTPHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if f, ok := w.(http.Flusher); ok {
if _, err := w.Write([]byte("foo")); err != nil {
t.Error(err)
}
f.Flush()
time.Sleep(250 * time.Millisecond)
if _, err := w.Write([]byte("bar")); err != nil {
t.Error(err)
}
f.Flush()
} else {
http.Error(w, "Flusher not supported", http.StatusInternalServerError)
}
})),
expectedBody: "foobar",
},
{
endpoint: "/hijack",
handler: NewFastHTTPHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if hj, ok := w.(http.Hijacker); ok {
conn, rw, err := hj.Hijack()
if err != nil {
t.Errorf("unexpected error: %v", err)
return
}
defer conn.Close()
if _, err := rw.WriteString("hijacked"); err != nil {
t.Errorf("unexpected error: %v", err)
}
rw.Flush()
} else {
http.Error(w, "Hijacker not supported", http.StatusInternalServerError)
}
})),
expectedBody: "hijacked",
},
}
s := &fasthttp.Server{
Handler: func(ctx *fasthttp.RequestCtx) {
for _, h := range handlers {
if string(ctx.Path()) == h.endpoint {
h.handler(ctx)
return
}
}
ctx.Error("Not Found", fasthttp.StatusNotFound)
},
}
repeatCount := 3
responseTimeout := 500 * time.Millisecond
ln := fasthttputil.NewInmemoryListener()
go func() {
if err := s.Serve(ln); err != nil {
t.Errorf("unexpected error: %v", err)
}
}()
for range repeatCount {
for _, handler := range handlers {
req := httptest.NewRequest("GET", handler.endpoint, http.NoBody)
body, err := sendRequest(ln, req, responseTimeout)
if err != nil {
t.Errorf("[%s] unexpected error sending request: %v", handler.endpoint, err)
}
if string(body) != handler.expectedBody {
t.Errorf("[%s] unexpected response: %q. Expecting %q", handler.endpoint, body, handler.expectedBody)
}
}
}
}
func sendRequest(ln *fasthttputil.InmemoryListener, req *http.Request, responseTimeout time.Duration) ([]byte, error) {
c, err := ln.Dial()
if err != nil {
return nil, err
}
if err := req.Write(c); err != nil {
return nil, err
}
time.AfterFunc(responseTimeout, func() {
c.Close()
})
response, err := io.ReadAll(c)
if err != nil {
return nil, err
}
resp, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(response)), nil)
if err != nil {
// Hijacked response, return the full response instead of the parsed body.
return response, nil
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
}
func TestNewFastHTTPHandlerPanic(t *testing.T) {
var ctx fasthttp.RequestCtx
var req fasthttp.Request
req.Header.SetMethod(fasthttp.MethodPost)
req.SetRequestURI("/")
req.Header.SetHost("example.com")
remoteAddr, err := net.ResolveTCPAddr("tcp", "1.2.3.4:6789")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ctx.Init(&req, remoteAddr, nil)
nethttpH := func(w http.ResponseWriter, r *http.Request) {
panic("test panic")
}
fasthttpH := NewFastHTTPHandler(http.HandlerFunc(nethttpH))
defer func() {
recover() //nolint:errcheck
}()
fasthttpH(&ctx)
t.Error("expected panic, but it didn't happen")
}