Fix server crash (nil pointer panic) when subscriber disconnects during publish
This commit is contained in:
@@ -3901,6 +3901,134 @@ func (m *mockResponseWriter) WriteHeader(statusCode int) {
|
||||
m.writeHeaderHit = true
|
||||
}
|
||||
|
||||
// closableResponseWriter simulates a real HTTP response writer that becomes invalid
|
||||
// after the handler returns. In production, Go's HTTP server calls finishRequest() after
|
||||
// the handler returns, which nils out the underlying bufio.Writer. Any subsequent Flush()
|
||||
// from a straggler Publish goroutine causes a nil pointer panic. This mock tracks whether
|
||||
// any Write or Flush occurred after the handler returned (i.e. after Close was called).
|
||||
type closableResponseWriter struct {
|
||||
header http.Header
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
wroteAfterClose atomic.Bool
|
||||
}
|
||||
|
||||
func newClosableResponseWriter() *closableResponseWriter {
|
||||
return &closableResponseWriter{
|
||||
header: make(http.Header),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *closableResponseWriter) Header() http.Header {
|
||||
return w.header
|
||||
}
|
||||
|
||||
func (w *closableResponseWriter) Write(b []byte) (int, error) {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
if w.closed {
|
||||
w.wroteAfterClose.Store(true)
|
||||
return 0, errors.New("write after handler returned")
|
||||
}
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (w *closableResponseWriter) WriteHeader(statusCode int) {}
|
||||
|
||||
func (w *closableResponseWriter) Flush() {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
if w.closed {
|
||||
w.wroteAfterClose.Store(true)
|
||||
}
|
||||
}
|
||||
|
||||
// Close simulates Go's HTTP server cleaning up the response writer after the handler returns.
|
||||
func (w *closableResponseWriter) Close() {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
w.closed = true
|
||||
}
|
||||
|
||||
func TestServer_SubscribeHTTP_NoWriteAfterHandlerReturn(t *testing.T) {
|
||||
// This test reproduces the panic from https://github.com/binwiederhier/ntfy/issues/338:
|
||||
//
|
||||
// panic: runtime error: invalid memory address or nil pointer dereference
|
||||
// bufio.(*Writer).Flush(...)
|
||||
// net/http.(*response).Flush(...)
|
||||
// server.(*Server).handleSubscribeHTTP.func2(...)
|
||||
// server.(*topic).Publish.func1.1(...)
|
||||
//
|
||||
// The race: topic.Publish() copies the subscriber list and calls each subscriber in its own
|
||||
// goroutine. If the subscriber disconnects, the handler returns and Go's HTTP server cleans up
|
||||
// the response writer. But a Publish goroutine that copied the subscriber list BEFORE
|
||||
// Unsubscribe may still call sub() AFTER the handler returns.
|
||||
//
|
||||
// This test deterministically reproduces the scenario by:
|
||||
// 1. Subscribing via handleSubscribeHTTP (which registers a sub closure on the topic)
|
||||
// 2. Copying the subscriber function from the topic (simulating what topic.Publish does)
|
||||
// 3. Cancelling the subscription and waiting for the handler to fully return
|
||||
// 4. Calling the copied subscriber function AFTER the handler has returned
|
||||
// 5. Checking that no write/flush occurred on the (now-invalid) response writer
|
||||
//
|
||||
// Without the wlock+closed fix, calling the subscriber after the handler returns writes to
|
||||
// the closed response writer (which in production causes a nil pointer panic on Flush).
|
||||
// With the fix, the subscriber sees closed=true and returns without writing.
|
||||
t.Parallel()
|
||||
s := newTestServer(t, newTestConfig(t))
|
||||
|
||||
rw := newClosableResponseWriter()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", "/mytopic/json", nil)
|
||||
require.Nil(t, err)
|
||||
req.RemoteAddr = "9.9.9.9:1234"
|
||||
|
||||
// Start the subscribe handler (blocks until context is cancelled)
|
||||
handlerDone := make(chan struct{})
|
||||
go func() {
|
||||
s.handle(rw, req)
|
||||
close(handlerDone)
|
||||
}()
|
||||
time.Sleep(100 * time.Millisecond) // Wait for subscription to be registered
|
||||
|
||||
// Grab a copy of the subscriber function from the topic, exactly as topic.Publish() does
|
||||
// via subscribersCopy(). This must happen BEFORE cancel/Unsubscribe removes the subscriber.
|
||||
s.mu.RLock()
|
||||
tp := s.topics["mytopic"]
|
||||
s.mu.RUnlock()
|
||||
require.NotNil(t, tp)
|
||||
subscribersCopy := tp.subscribersCopy()
|
||||
require.Equal(t, 1, len(subscribersCopy))
|
||||
|
||||
var copiedSub subscriber
|
||||
for _, sub := range subscribersCopy {
|
||||
copiedSub = sub.subscriber
|
||||
}
|
||||
|
||||
// Cancel the subscription and wait for the handler to fully return.
|
||||
// At this point, the deferred cleanup in handleSubscribeHTTP runs:
|
||||
// - With fix: wlock.Lock() waits for in-flight sub(), sets closed=true, wlock.Unlock()
|
||||
// - Without fix: nothing prevents future sub() calls from writing
|
||||
cancel()
|
||||
<-handlerDone
|
||||
|
||||
// Simulate Go's HTTP server cleaning up the response writer after the handler returns.
|
||||
// In production, this is finishRequest() which nils out the bufio.Writer.
|
||||
rw.Close()
|
||||
|
||||
// Now call the copied subscriber function, simulating a straggler Publish goroutine
|
||||
// that copied the subscriber list before Unsubscribe ran. In production, this is exactly
|
||||
// how the panic occurs: the goroutine spawned by topic.Publish calls sub() after the
|
||||
// handler has already returned and Go has cleaned up the response writer.
|
||||
v := newVisitor(s.config, s.messageCache, s.userManager, netip.MustParseAddr("9.9.9.9"), nil)
|
||||
msg := newDefaultMessage("mytopic", "straggler message")
|
||||
_ = copiedSub(v, msg)
|
||||
|
||||
require.False(t, rw.wroteAfterClose.Load(),
|
||||
"sub() wrote to the response writer after the handler returned; "+
|
||||
"in production this causes a nil pointer panic in bufio.(*Writer).Flush()")
|
||||
}
|
||||
|
||||
func TestServer_HandleError_SkipsWriteHeaderOnHijackedConnection(t *testing.T) {
|
||||
// Test that handleError does not call WriteHeader for WebSocket errors wrapped
|
||||
// with errWebSocketPostUpgrade (indicating the connection was hijacked)
|
||||
|
||||
Reference in New Issue
Block a user