From c9c8d72607aae4dcffdd76d2cc5e2ea0d7f44b89 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 15 Apr 2016 23:14:38 +0300 Subject: [PATCH] Issue #78: immediately flush chunked data to client --- http.go | 4 +++ server_test.go | 92 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/http.go b/http.go index 06c8bf4..c1406e4 100644 --- a/http.go +++ b/http.go @@ -1330,6 +1330,10 @@ func writeChunk(w *bufio.Writer, b []byte) error { w.Write(strCRLF) w.Write(b) _, err := w.Write(strCRLF) + err1 := w.Flush() + if err == nil { + err = err1 + } return err } diff --git a/server_test.go b/server_test.go index eeeade9..f465d6c 100644 --- a/server_test.go +++ b/server_test.go @@ -17,6 +17,98 @@ import ( "github.com/valyala/fasthttp/fasthttputil" ) +func TestServerResponseBodyStream(t *testing.T) { + ln := fasthttputil.NewInmemoryListener() + + readyCh := make(chan struct{}) + h := func(ctx *RequestCtx) { + ctx.SetConnectionClose() + ctx.SetBodyStreamWriter(func(w *bufio.Writer) { + fmt.Fprintf(w, "first") + if err := w.Flush(); err != nil { + return + } + <-readyCh + fmt.Fprintf(w, "second") + // there is no need to flush w here, since it will + // be flushed automatically after returning from StreamWriter. + }) + } + + serverCh := make(chan struct{}) + go func() { + if err := Serve(ln, h); err != nil { + t.Fatalf("unexpected error: %s", err) + } + close(serverCh) + }() + + clientCh := make(chan struct{}) + go func() { + c, err := ln.Dial() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if _, err = c.Write([]byte("GET / HTTP/1.1\r\nHost: aa\r\n\r\n")); err != nil { + t.Fatalf("unexpected error: %s", err) + } + br := bufio.NewReader(c) + var respH ResponseHeader + if err = respH.Read(br); err != nil { + t.Fatalf("unexpected error: %s", err) + } + if respH.StatusCode() != StatusOK { + t.Fatalf("unexpected status code: %d. Expecting %d", respH.StatusCode(), StatusOK) + } + + buf := make([]byte, 1024) + n, err := br.Read(buf) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + b := buf[:n] + if string(b) != "5\r\nfirst\r\n" { + t.Fatalf("unexpected result %q. Expecting %q", b, "5\r\nfirst\r\n") + } + close(readyCh) + + n, err = br.Read(buf) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + b = buf[:n] + if string(b) != "6\r\nsecond\r\n" { + t.Fatalf("unexpected result %q. Expecting %q", b, "6\r\nsecond\r\n") + } + + tail, err := ioutil.ReadAll(br) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if string(tail) != "0\r\n\r\n" { + t.Fatalf("unexpected tail %q. Expecting %q", tail, "0\r\n\r\n") + } + + close(clientCh) + }() + + select { + case <-clientCh: + case <-time.After(time.Second): + t.Fatalf("timeout") + } + + if err := ln.Close(); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + select { + case <-serverCh: + case <-time.After(time.Second): + t.Fatalf("timeout") + } +} + func TestServerDisableKeepalive(t *testing.T) { s := &Server{ Handler: func(ctx *RequestCtx) {