fix: keep proxy slot through response body
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -50,6 +51,7 @@ func (s *Service) ProxyHTTP(w http.ResponseWriter, r *http.Request, targetIP, pr
|
|||||||
Transport: retryTransport{
|
Transport: retryTransport{
|
||||||
base: &http.Transport{
|
base: &http.Transport{
|
||||||
DisableKeepAlives: true,
|
DisableKeepAlives: true,
|
||||||
|
DisableCompression: true,
|
||||||
ResponseHeaderTimeout: 5 * time.Second,
|
ResponseHeaderTimeout: 5 * time.Second,
|
||||||
},
|
},
|
||||||
limiter: s.proxyLimiter(targetIP),
|
limiter: s.proxyLimiter(targetIP),
|
||||||
@@ -139,10 +141,20 @@ func (t retryTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
resp, err := base.RoundTrip(nextReq)
|
resp, err := base.RoundTrip(nextReq)
|
||||||
releaseProxySlot(t.limiter)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
if resp == nil || resp.Body == nil {
|
||||||
|
releaseProxySlot(t.limiter)
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
resp.Body = &releaseOnCloseReadCloser{
|
||||||
|
ReadCloser: resp.Body,
|
||||||
|
release: func() {
|
||||||
|
releaseProxySlot(t.limiter)
|
||||||
|
},
|
||||||
|
}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
releaseProxySlot(t.limiter)
|
||||||
lastErr = err
|
lastErr = err
|
||||||
if !shouldRetryProxyRequest(req, err) || attempt == attempts-1 {
|
if !shouldRetryProxyRequest(req, err) || attempt == attempts-1 {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -152,6 +164,22 @@ func (t retryTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
|||||||
return nil, lastErr
|
return nil, lastErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type releaseOnCloseReadCloser struct {
|
||||||
|
io.ReadCloser
|
||||||
|
once sync.Once
|
||||||
|
release func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *releaseOnCloseReadCloser) Close() error {
|
||||||
|
err := r.ReadCloser.Close()
|
||||||
|
r.once.Do(func() {
|
||||||
|
if r.release != nil {
|
||||||
|
r.release()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func acquireProxySlot(ctx context.Context, limiter chan struct{}, timeout time.Duration) error {
|
func acquireProxySlot(ctx context.Context, limiter chan struct{}, timeout time.Duration) error {
|
||||||
if limiter == nil {
|
if limiter == nil {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package webdevice
|
package webdevice
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
@@ -148,6 +149,44 @@ func TestProxyHTTPClosesUpstreamConnection(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRetryTransportHoldsProxySlotUntilBodyClose(t *testing.T) {
|
||||||
|
limiter := make(chan struct{}, 1)
|
||||||
|
transport := retryTransport{
|
||||||
|
base: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
|
||||||
|
return &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Body: io.NopCloser(strings.NewReader("ok")),
|
||||||
|
Header: http.Header{},
|
||||||
|
Request: req,
|
||||||
|
}, nil
|
||||||
|
}),
|
||||||
|
limiter: limiter,
|
||||||
|
attempts: 1,
|
||||||
|
acquireTimeout: time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "http://portal/test", nil)
|
||||||
|
resp, err := transport.RoundTrip(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("RoundTrip() error = %v", err)
|
||||||
|
}
|
||||||
|
if got := len(limiter); got != 1 {
|
||||||
|
t.Fatalf("limiter len after RoundTrip = %d, want 1", got)
|
||||||
|
}
|
||||||
|
if err := resp.Body.Close(); err != nil {
|
||||||
|
t.Fatalf("Body.Close() error = %v", err)
|
||||||
|
}
|
||||||
|
if got := len(limiter); got != 0 {
|
||||||
|
t.Fatalf("limiter len after Body.Close = %d, want 0", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type roundTripperFunc func(*http.Request) (*http.Response, error)
|
||||||
|
|
||||||
|
func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
|
return f(req)
|
||||||
|
}
|
||||||
|
|
||||||
func TestSanitizeProxyRequestHeaderDropsLoginCookie(t *testing.T) {
|
func TestSanitizeProxyRequestHeaderDropsLoginCookie(t *testing.T) {
|
||||||
source := http.Header{}
|
source := http.Header{}
|
||||||
source.Set("User-Agent", "browser")
|
source.Set("User-Agent", "browser")
|
||||||
|
|||||||
Reference in New Issue
Block a user