da4e7df6ce
The shared core of Shiny's cross-service read-your-writes GraphQL subscriptions (ADR-0012), extracted and hardened from the near-identical hand-rolled handlers in authz-service (availableCompanies) and accounting-service (entryBasesChanged) before a third copy is written. Registry[T] owns the keyed subscriber map, non-blocking buffered fan-out (sends under the read lock so a close can't race a send), a key-sharded worker pool that runs the read-view gate OFF the AMQP delivery goroutine (preserving per-key FIFO order while distinct keys run in parallel), the bounded retry/timeout budget, and Observer metric hooks. Services supply only the event->key+payload mapping, the read-view Producer closure, and the per-replica transient-consumer wiring. Reviewed pre-publish (Go + Event Sourcing + Architecture). 99% coverage, race-clean. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
384 lines
9.5 KiB
Go
384 lines
9.5 KiB
Go
package subscriptions
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// ping is the minimal payload type these tests push through the registry.
type ping struct {
	// ID tells one pushed payload apart from another in assertions.
	ID string
}
|
|
|
|
func quiet() Option {
|
|
return WithLogger(slog.New(slog.NewTextHandler(io.Discard, nil)))
|
|
}
|
|
|
|
// recordObserver is a test double that remembers, per callback kind, the key
// of every metric callback it receives. All access to the recorded slices goes
// through mu, so it is safe for concurrent use.
type recordObserver struct {
	mu          sync.Mutex
	pushed      []string
	skipped     []string
	dropped     []string
	channelFull []string
}

// Pushed records key in the pushed list.
func (rec *recordObserver) Pushed(key string) {
	rec.add(&rec.pushed, key)
}

// PushSkipped records key in the skipped list.
func (rec *recordObserver) PushSkipped(key string) {
	rec.add(&rec.skipped, key)
}

// Dropped records key in the dropped list.
func (rec *recordObserver) Dropped(key string) {
	rec.add(&rec.dropped, key)
}

// ChannelFull records key in the channelFull list.
func (rec *recordObserver) ChannelFull(key string) {
	rec.add(&rec.channelFull, key)
}

// add appends key to the slice bucket points at while holding the mutex.
func (rec *recordObserver) add(bucket *[]string, key string) {
	rec.mu.Lock()
	defer rec.mu.Unlock()

	*bucket = append(*bucket, key)
}

// count returns the length of the slice selected by pick, read under the mutex.
func (rec *recordObserver) count(pick func(*recordObserver) []string) int {
	rec.mu.Lock()
	defer rec.mu.Unlock()

	selected := pick(rec)
	return len(selected)
}

// pushedCount reports how many Pushed callbacks have been recorded.
func (rec *recordObserver) pushedCount() int {
	pick := func(o *recordObserver) []string { return o.pushed }
	return rec.count(pick)
}

// skippedCount reports how many PushSkipped callbacks have been recorded.
func (rec *recordObserver) skippedCount() int {
	pick := func(o *recordObserver) []string { return o.skipped }
	return rec.count(pick)
}

// droppedCount reports how many Dropped callbacks have been recorded.
func (rec *recordObserver) droppedCount() int {
	pick := func(o *recordObserver) []string { return o.dropped }
	return rec.count(pick)
}

// channelFullCount reports how many ChannelFull callbacks have been recorded.
func (rec *recordObserver) channelFullCount() int {
	pick := func(o *recordObserver) []string { return o.channelFull }
	return rec.count(pick)
}
|
|
|
|
func ready(p *ping) Producer[ping] {
|
|
return func(context.Context) (*ping, bool) { return p, true }
|
|
}
|
|
|
|
func never() Producer[ping] {
|
|
return func(context.Context) (*ping, bool) { return nil, false }
|
|
}
|
|
|
|
func recv(t *testing.T, ch <-chan *ping) *ping {
|
|
t.Helper()
|
|
select {
|
|
case v := <-ch:
|
|
return v
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("timed out waiting for a push")
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func assertNoPush(t *testing.T, ch <-chan *ping) {
|
|
t.Helper()
|
|
select {
|
|
case v := <-ch:
|
|
t.Fatalf("unexpected push: %+v", v)
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
}
|
|
|
|
func TestSubmit_pushesWhenReady(t *testing.T) {
|
|
r := New[ping](quiet())
|
|
defer r.Close()
|
|
|
|
ch, cleanup, err := r.AddReceiver("c1")
|
|
require.NoError(t, err)
|
|
defer cleanup()
|
|
|
|
r.Submit("c1", ready(&ping{ID: "eb1"}))
|
|
|
|
assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch))
|
|
}
|
|
|
|
func TestSubmit_noSubscribers_doesNotEnqueue(t *testing.T) {
|
|
r := New[ping](quiet())
|
|
defer r.Close()
|
|
|
|
var calls int
|
|
r.Submit("c1", func(context.Context) (*ping, bool) {
|
|
calls++
|
|
return &ping{ID: "x"}, true
|
|
})
|
|
|
|
// No subscribers → Submit returns on the fast path without enqueuing, so the
|
|
// producer is never invoked.
|
|
assert.Equal(t, 0, calls)
|
|
}
|
|
|
|
func TestSubmit_emptyKey_isNoop(t *testing.T) {
|
|
r := New[ping](quiet())
|
|
defer r.Close()
|
|
|
|
var calls int
|
|
r.Submit("", func(context.Context) (*ping, bool) {
|
|
calls++
|
|
return nil, true
|
|
})
|
|
assert.Equal(t, 0, calls)
|
|
}
|
|
|
|
func TestSubmit_lagThenReady(t *testing.T) {
|
|
r := New[ping](quiet(), WithReadRetry(50, time.Millisecond))
|
|
defer r.Close()
|
|
|
|
ch, cleanup, _ := r.AddReceiver("c1")
|
|
defer cleanup()
|
|
|
|
var mu sync.Mutex
|
|
calls := 0
|
|
r.Submit("c1", func(context.Context) (*ping, bool) {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
calls++
|
|
// Not visible on the first read; visible thereafter.
|
|
return &ping{ID: "eb1"}, calls > 1
|
|
})
|
|
|
|
assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch))
|
|
}
|
|
|
|
func TestSubmit_neverReady_skipsAndReports(t *testing.T) {
|
|
obs := &recordObserver{}
|
|
r := New[ping](quiet(), WithObserver(obs), WithReadRetry(2, time.Millisecond))
|
|
defer r.Close()
|
|
|
|
ch, cleanup, _ := r.AddReceiver("c1")
|
|
defer cleanup()
|
|
|
|
r.Submit("c1", never())
|
|
|
|
assert.Eventually(t, func() bool { return obs.skippedCount() == 1 }, time.Second, 5*time.Millisecond)
|
|
assert.Equal(t, 0, obs.pushedCount())
|
|
assertNoPush(t, ch)
|
|
}
|
|
|
|
func TestObserver_pushedOnDelivery(t *testing.T) {
|
|
obs := &recordObserver{}
|
|
r := New[ping](quiet(), WithObserver(obs))
|
|
defer r.Close()
|
|
|
|
ch, cleanup, _ := r.AddReceiver("c1")
|
|
defer cleanup()
|
|
|
|
r.Submit("c1", ready(&ping{ID: "eb1"}))
|
|
recv(t, ch)
|
|
|
|
assert.Eventually(t, func() bool { return obs.pushedCount() == 1 }, time.Second, 5*time.Millisecond)
|
|
}
|
|
|
|
func TestPush_allSubscribersOfKey(t *testing.T) {
|
|
r := New[ping](quiet())
|
|
defer r.Close()
|
|
|
|
chA, cleanupA, _ := r.AddReceiver("c1")
|
|
defer cleanupA()
|
|
chB, cleanupB, _ := r.AddReceiver("c1")
|
|
defer cleanupB()
|
|
|
|
r.Submit("c1", ready(&ping{ID: "eb1"}))
|
|
|
|
assert.Equal(t, "eb1", recv(t, chA).ID)
|
|
assert.Equal(t, "eb1", recv(t, chB).ID)
|
|
}
|
|
|
|
func TestSubmit_keysIsolated(t *testing.T) {
|
|
r := New[ping](quiet())
|
|
defer r.Close()
|
|
|
|
chA, cleanupA, _ := r.AddReceiver("c1")
|
|
defer cleanupA()
|
|
chB, cleanupB, _ := r.AddReceiver("c2")
|
|
defer cleanupB()
|
|
|
|
r.Submit("c1", ready(&ping{ID: "eb1"}))
|
|
|
|
assert.Equal(t, "eb1", recv(t, chA).ID)
|
|
assertNoPush(t, chB)
|
|
}
|
|
|
|
// TestPerKeyOrdering asserts that events for one key are delivered FIFO even
|
|
// across the worker pool (same key → same shard → one worker). This guards the
|
|
// ordering guarantee a payload the client consumes directly relies on.
|
|
func TestPerKeyOrdering(t *testing.T) {
|
|
r := New[ping](quiet(), WithWorkers(4))
|
|
defer r.Close()
|
|
|
|
ch, cleanup, _ := r.AddReceiver("c1")
|
|
defer cleanup()
|
|
|
|
const n = 10
|
|
for i := range n {
|
|
r.Submit("c1", ready(&ping{ID: fmt.Sprintf("e%02d", i)}))
|
|
}
|
|
for i := range n {
|
|
assert.Equal(t, fmt.Sprintf("e%02d", i), recv(t, ch).ID)
|
|
}
|
|
}
|
|
|
|
func TestAddReceiver_emptyKeyErrors(t *testing.T) {
|
|
r := New[ping](quiet())
|
|
defer r.Close()
|
|
|
|
ch, cleanup, err := r.AddReceiver("")
|
|
assert.ErrorIs(t, err, ErrEmptyKey)
|
|
assert.Nil(t, ch)
|
|
assert.Nil(t, cleanup)
|
|
}
|
|
|
|
func TestRemoveReceiver_closesChannel(t *testing.T) {
|
|
r := New[ping](quiet())
|
|
defer r.Close()
|
|
|
|
ch, cleanup, _ := r.AddReceiver("c1")
|
|
cleanup()
|
|
|
|
// Draining a closed channel terminates the range/returns ok=false.
|
|
_, ok := <-ch
|
|
assert.False(t, ok)
|
|
}
|
|
|
|
func TestRemoveReceiver_idempotent(t *testing.T) {
|
|
r := New[ping](quiet())
|
|
defer r.Close()
|
|
|
|
_, cleanup, _ := r.AddReceiver("c1")
|
|
cleanup()
|
|
assert.NotPanics(t, cleanup) // second call is a no-op
|
|
}
|
|
|
|
func TestChannelFull_dropsAndReports(t *testing.T) {
|
|
obs := &recordObserver{}
|
|
// One worker so the two pushes are serialized; buffer 1 so the second drops.
|
|
r := New[ping](quiet(), WithObserver(obs), WithBufferSize(1), WithWorkers(1))
|
|
defer r.Close()
|
|
|
|
_, cleanup, _ := r.AddReceiver("c1") // never read from the channel
|
|
defer cleanup()
|
|
|
|
r.Submit("c1", ready(&ping{ID: "first"}))
|
|
r.Submit("c1", ready(&ping{ID: "second"}))
|
|
|
|
assert.Eventually(t, func() bool { return obs.channelFullCount() == 1 }, time.Second, 5*time.Millisecond)
|
|
}
|
|
|
|
func TestSubmit_queueFull_dropsAndReports(t *testing.T) {
|
|
obs := &recordObserver{}
|
|
started := make(chan struct{})
|
|
release := make(chan struct{})
|
|
|
|
r := New[ping](quiet(), WithObserver(obs), WithWorkers(1), WithQueueSize(1), WithReadRetry(0, time.Millisecond))
|
|
defer r.Close() // runs last
|
|
defer close(release) // runs before Close, unblocking the worker
|
|
|
|
_, cleanup, _ := r.AddReceiver("c1")
|
|
defer cleanup()
|
|
|
|
var once sync.Once
|
|
r.Submit("c1", func(context.Context) (*ping, bool) {
|
|
once.Do(func() { close(started) })
|
|
<-release
|
|
return &ping{ID: "blocking"}, true
|
|
})
|
|
<-started // the single worker is now blocked in produce; its shard queue is empty
|
|
|
|
r.Submit("c1", ready(&ping{ID: "fills-queue"})) // occupies the cap-1 shard queue
|
|
r.Submit("c1", ready(&ping{ID: "dropped"})) // queue full → dropped
|
|
|
|
assert.Eventually(t, func() bool { return obs.droppedCount() >= 1 }, time.Second, 5*time.Millisecond)
|
|
}
|
|
|
|
func TestClose_isIdempotentAndSubmitAfterCloseIsNoop(t *testing.T) {
|
|
r := New[ping](quiet())
|
|
r.Close()
|
|
r.Close() // idempotent
|
|
|
|
assert.NotPanics(t, func() {
|
|
r.Submit("c1", ready(&ping{ID: "x"}))
|
|
})
|
|
}
|
|
|
|
// TestClose_cancelsInFlightGate asserts Close returns promptly even while a
|
|
// worker is mid-gate, rather than blocking for the full retry budget (~5s by
|
|
// default). Guards the registry-level context cancellation.
|
|
func TestClose_cancelsInFlightGate(t *testing.T) {
|
|
started := make(chan struct{})
|
|
r := New[ping](quiet()) // default retry budget ≈ 5s
|
|
|
|
_, cleanup, _ := r.AddReceiver("c1")
|
|
defer cleanup()
|
|
|
|
var once sync.Once
|
|
r.Submit("c1", func(context.Context) (*ping, bool) {
|
|
once.Do(func() { close(started) })
|
|
return nil, false // never ready → worker stays in the gate loop
|
|
})
|
|
<-started // worker is now in the gate loop
|
|
|
|
done := make(chan struct{})
|
|
go func() { r.Close(); close(done) }()
|
|
|
|
select {
|
|
case <-done:
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("Close did not return promptly while a worker was gating")
|
|
}
|
|
}
|
|
|
|
// TestConcurrentChurn stresses the race between removeReceiver (which closes a
|
|
// subscriber channel under the write lock) and a worker push (which sends under
|
|
// the read lock). Before sends were moved under the read lock this panicked
|
|
// with "send on closed channel". Run with -race.
|
|
func TestConcurrentChurn(t *testing.T) {
|
|
r := New[ping](quiet(), WithReadRetry(0, time.Millisecond), WithWorkers(8))
|
|
defer r.Close()
|
|
|
|
// A long-lived reader so there is always at least one subscriber being
|
|
// pushed to while others churn.
|
|
_, steadyCleanup, _ := r.AddReceiver("c1")
|
|
defer steadyCleanup()
|
|
go func() {
|
|
ch, _, _ := r.AddReceiver("c1")
|
|
for range ch { //nolint:revive // drain
|
|
}
|
|
}()
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for range 2000 {
|
|
_, cleanup, err := r.AddReceiver("c1")
|
|
if err != nil {
|
|
t.Errorf("AddReceiver: %v", err)
|
|
return
|
|
}
|
|
cleanup()
|
|
}
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for range 2000 {
|
|
r.Submit("c1", ready(&ping{ID: "x"}))
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
}
|