subscriptions/subscriptions.go
argoyle da4e7df6ce
feat: type-generic registry for cross-service read-your-writes subscriptions
The shared core of Shiny's cross-service read-your-writes GraphQL subscriptions
(ADR-0012), extracted and hardened from the near-identical hand-rolled handlers
in authz-service (availableCompanies) and accounting-service (entryBasesChanged)
before a third copy is written.

Registry[T] owns the keyed subscriber map, non-blocking buffered fan-out (sends
under the read lock so a close can't race a send), a key-sharded worker pool that
runs the read-view gate OFF the AMQP delivery goroutine (preserving per-key FIFO
order while distinct keys run in parallel), the bounded retry/timeout budget, and
Observer metric hooks. Services supply only the event->key+payload mapping, the
read-view Producer closure, and the per-replica transient-consumer wiring.

Reviewed pre-publish (Go + Event Sourcing + Architecture). 99% coverage, race-clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:22:34 +02:00

// Package subscriptions provides a reusable, type-generic registry for fanning
// out change notifications to in-process GraphQL subscription consumers.
//
// It is the shared core of Shiny's cross-service read-your-writes pattern
// (ADR-0012): an entity shown in the UI is frequently projected from another
// service's event, so the owning service exposes a GraphQL subscription, drives
// it from a per-replica transient AMQP consumer, and pushes a notification once
// the change is visible in its own read view — the client then refetches the
// authoritative query. Two services hand-rolled this (authz-service's
// availableCompanies, accounting-service's entryBasesChanged); this package is
// the extracted, hardened core so further cases reuse it instead of copying it.
//
// # Wiring requirement (do not get this wrong)
//
// Submit MUST be fed from a per-replica goamqp.TransientEventStreamConsumer (an
// exclusive, randomly-named queue) bound to the owning service's OWN events, so
// every replica receives every event and can push to the websockets it holds.
// This is necessarily a DIFFERENT consumer from the shared, durable read-view
// projection consumer (a work-queue, where exactly one replica handles each
// event). Wiring Submit to a shared/durable consumer silently breaks delivery in
// a multi-replica deployment: the one replica that handles an event usually does
// not hold the subscriber's websocket, so the poke is lost with no error. The
// library cannot enforce this (it is transport-agnostic) — the caller must.
//
// # Concurrency model
//
//   - AddReceiver registers a subscriber (one per websocket) and returns a
//     buffered channel plus a cleanup func that must be called when the
//     subscription ends.
//   - Submit is called from the AMQP event handler. It does NOT block that
//     handler on the read view: it hands the work to the worker for the key's
//     shard, which waits (within a bounded budget) for the read view to
//     reflect the change, via the caller's [Producer], and only then pushes.
//     Acking the AMQP message immediately is safe because notifications are
//     idempotent and drop-tolerant (see below).
//   - Work is sharded by key, so all events for one key are processed FIFO by a
//     single worker (preserving per-key order even for payloads the client
//     consumes directly), while distinct keys run in parallel (so one lagging
//     read view only delays its own key's shard, not everything).
//
// Pushes happen while holding the read lock, and cleanup closes a subscriber's
// channel under the write lock, so a send can never race a close ("send on
// closed channel"). A full subscriber buffer drops the notification rather than
// blocking a slow consumer.
//
// # Payload contract
//
// T should be a lightweight notification the client reacts to by refetching the
// authoritative query (a poke such as {id, removed}), not authoritative state
// the client consumes as the source of truth. Notifications may be dropped (full
// queue or buffer) and the push is best-effort (no AMQP requeue on a persistent
// read failure), so reliability comes from the client's idempotent refetch.
// Per-key FIFO ordering is preserved, so a payload the client does consume
// directly is at least delivered in event order for a given key — but it can
// still be dropped, so a refetch-on-receipt design is strongly preferred.
package subscriptions

import (
	"context"
	"errors"
	"hash/fnv"
	"log/slog"
	"sync"
	"time"

	"github.com/google/uuid"
)

// ErrEmptyKey is returned by AddReceiver when the subscription key is empty,
// which almost always indicates an unpopulated id at the call site.
var ErrEmptyKey = errors.New("subscriptions: empty subscription key")

// Producer reads the owning service's read view and returns the payload to push
// together with whether the change is yet visible there.
//
// The registry calls a Producer repeatedly until it reports ready (or a bounded
// budget elapses), so the client's refetch can never race ahead of the
// projection. A Producer MUST read current read-view state on each call (rather
// than capturing the event's historical state). On a transient read error it
// should return (nil, false) to keep waiting — the event is already durable, so
// only the projection is lagging.
//
// Convergence precondition: "retry until ready" only terminates as ready (rather
// than burning the whole budget then skipping) if the read view the Producer
// gates on is made consistent by the SAME ordered aggregate stream as the
// triggering event. Gating on a row populated by a different aggregate's events
// can fail to converge.
type Producer[T any] func(ctx context.Context) (payload *T, ready bool)

// Observer receives best-effort notifications about pushes, for
// metrics/observability. Its methods may be called concurrently. The default is
// a no-op; wire an implementation (e.g. OTel counters) via [WithObserver].
type Observer interface {
	// Pushed reports that a change was gated and delivered to the key's
	// subscribers — the denominator for a skip/drop rate.
	Pushed(key string)

	// PushSkipped reports that the read view never reflected the change within
	// the retry budget, so the push was skipped.
	PushSkipped(key string)

	// Dropped reports that the worker queue was full, so the notification was
	// dropped before it could be gated.
	Dropped(key string)

	// ChannelFull reports that a subscriber's buffer was full, so its
	// notification was dropped.
	ChannelFull(key string)
}

type noopObserver struct{}

func (noopObserver) Pushed(string)      {}
func (noopObserver) PushSkipped(string) {}
func (noopObserver) Dropped(string)     {}
func (noopObserver) ChannelFull(string) {}

type subscriber[T any] struct {
	id      string
	channel chan *T
}

type job[T any] struct {
	key     string
	produce Producer[T]
}

// Registry fans out notifications of type T to in-process subscribers keyed by
// an arbitrary string (e.g. a company id or a user email). The zero value is
// not usable; construct one with [New].
type Registry[T any] struct {
	logger     *slog.Logger
	obs        Observer
	bufferSize int
	retries    int
	retryWait  time.Duration

	mu          sync.RWMutex // guards subscribers
	subscribers map[string]map[string]*subscriber[T]

	shards    []chan job[T]
	baseCtx   context.Context
	cancel    context.CancelFunc
	done      chan struct{}
	wg        sync.WaitGroup
	closeOnce sync.Once
}

type config struct {
	logger     *slog.Logger
	obs        Observer
	bufferSize int
	retries    int
	retryWait  time.Duration
	workers    int
	queueSize  int
}

// Option configures a [Registry].
type Option func(*config)

// WithLogger sets the structured logger. Defaults to [slog.Default].
func WithLogger(l *slog.Logger) Option {
	return func(c *config) {
		if l != nil {
			c.logger = l
		}
	}
}

// WithObserver wires a metrics observer. Defaults to a no-op.
func WithObserver(o Observer) Option {
	return func(c *config) {
		if o != nil {
			c.obs = o
		}
	}
}

// WithReadRetry tunes how long a worker waits for the read view to reflect a
// change before giving up on the push: up to attempts re-reads spaced by wait
// (so total reads = attempts+1; default 25 × 200ms ≈ 5s of waiting). The whole
// wait is also capped by a context deadline of (attempts+1) × wait, so a slow
// Producer leaves room for fewer re-reads. The read-view consumer and this
// subscription consumer are independent consumers of the same event, so a
// freshly projected change may not be visible on the first read; retrying
// avoids pushing a notification the client would refetch ahead of. A negative
// attempts or a non-positive wait is ignored and the default kept; attempts = 0
// means a single read with no retries.
func WithReadRetry(attempts int, wait time.Duration) Option {
	return func(c *config) {
		if attempts >= 0 {
			c.retries = attempts
		}
		if wait > 0 {
			c.retryWait = wait
		}
	}
}

// WithBufferSize sets each subscriber channel's buffer (default 20). A full
// buffer drops the notification rather than blocking.
func WithBufferSize(n int) Option {
	return func(c *config) {
		if n > 0 {
			c.bufferSize = n
		}
	}
}

// WithWorkers sets the number of key-shard workers (default 4). Each key is
// handled FIFO by exactly one worker; more workers spread distinct keys over
// more goroutines so a lagging read view delays only its own shard.
func WithWorkers(n int) Option {
	return func(c *config) {
		if n > 0 {
			c.workers = n
		}
	}
}

// WithQueueSize sets each shard's job-queue depth (default 64). A full queue
// drops the notification (reported via [Observer.Dropped]) rather than blocking
// the AMQP delivery goroutine.
func WithQueueSize(n int) Option {
	return func(c *config) {
		if n > 0 {
			c.queueSize = n
		}
	}
}

// New builds a Registry and starts its key-shard workers. Call [Registry.Close]
// to stop them (optional; they exit with the process otherwise).
func New[T any](opts ...Option) *Registry[T] {
	c := &config{
		logger:     slog.Default(),
		obs:        noopObserver{},
		bufferSize: 20,
		retries:    25,
		retryWait:  200 * time.Millisecond,
		workers:    4,
		queueSize:  64,
	}
	for _, o := range opts {
		o(c)
	}

	ctx, cancel := context.WithCancel(context.Background())
	r := &Registry[T]{
		logger:      c.logger,
		obs:         c.obs,
		bufferSize:  c.bufferSize,
		retries:     c.retries,
		retryWait:   c.retryWait,
		subscribers: make(map[string]map[string]*subscriber[T]),
		shards:      make([]chan job[T], c.workers),
		baseCtx:     ctx,
		cancel:      cancel,
		done:        make(chan struct{}),
	}
	r.wg.Add(c.workers)
	for i := range r.shards {
		r.shards[i] = make(chan job[T], c.queueSize)
		go r.worker(r.shards[i])
	}
	return r
}

// AddReceiver registers a subscriber for the given key. It returns the channel
// to stream and a cleanup func that MUST be called when the subscription ends
// (e.g. from the resolver's ctx.Done) to close the channel and release the
// registration. cleanup is idempotent. Returns [ErrEmptyKey] for an empty key.
func (r *Registry[T]) AddReceiver(key string) (<-chan *T, func(), error) {
	if key == "" {
		return nil, nil, ErrEmptyKey
	}
	s := &subscriber[T]{
		id:      uuid.NewString(),
		channel: make(chan *T, r.bufferSize),
	}

	r.mu.Lock()
	if r.subscribers[key] == nil {
		r.subscribers[key] = make(map[string]*subscriber[T])
	}
	r.subscribers[key][s.id] = s
	total := len(r.subscribers[key])
	r.mu.Unlock()

	r.logger.Info("subscription registered",
		"key", key, "subscription_id", s.id, "total_subscriptions", total)

	cleanup := func() { r.removeReceiver(key, s.id) }
	return s.channel, cleanup, nil
}

func (r *Registry[T]) removeReceiver(key, id string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	subs := r.subscribers[key]
	if subs == nil {
		return
	}
	s, ok := subs[id]
	if !ok {
		return
	}
	close(s.channel)
	delete(subs, id)
	remaining := len(subs)
	if remaining == 0 {
		delete(r.subscribers, key)
	}
	r.logger.Info("subscription removed",
		"key", key, "subscription_id", id, "remaining_subscriptions", remaining)
}

// Submit schedules a gated push for the given key. It returns immediately: when
// the key is empty or has no subscribers it does nothing, otherwise it enqueues
// work for that key's shard worker (dropping, with an [Observer.Dropped], only if
// the shard queue is full). produce is invoked on the worker, not on the calling
// goroutine.
func (r *Registry[T]) Submit(key string, produce Producer[T]) {
	if key == "" || !r.hasSubscribers(key) {
		return
	}
	shard := r.shards[shardIndex(key, len(r.shards))]
	select {
	case shard <- job[T]{key: key, produce: produce}:
	case <-r.done:
		// shutting down
	default:
		r.logger.Warn("subscription job queue full; dropping notification", "key", key)
		r.obs.Dropped(key)
	}
}

// Close stops the worker pool and waits for in-flight gating to finish. It
// cancels any in-flight read-view wait so workers return promptly rather than
// blocking for the full retry budget. Queued-but-unstarted notifications are
// dropped (safe — they are drop-tolerant). Idempotent; Submit after Close is a
// no-op.
func (r *Registry[T]) Close() {
	r.closeOnce.Do(func() {
		r.cancel()
		close(r.done)
		r.wg.Wait()
	})
}

func (r *Registry[T]) hasSubscribers(key string) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.subscribers[key]) > 0
}
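
func (r *Registry[T]) worker(shard <-chan job[T]) {
	defer r.wg.Done()
	for {
		select {
		case j := <-shard:
			// select picks pseudo-randomly among ready cases, so a queued job
			// can win over a closed done channel. Re-check before starting it
			// so queued-but-unstarted jobs are really dropped after Close.
			select {
			case <-r.done:
				return
			default:
			}
			r.handle(j)
		case <-r.done:
			return
		}
	}
}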

func (r *Registry[T]) handle(j job[T]) {
	// Bound the read-view wait so a hung read or a never-reflected event (e.g.
	// redelivery of an event for a since-deleted entity) can't pin a worker.
	// Derived from baseCtx so Close cancels in-flight waits promptly.
	budget := time.Duration(r.retries+1) * r.retryWait
	ctx, cancel := context.WithTimeout(r.baseCtx, budget)
	defer cancel()

	payload, ok := r.await(ctx, j.produce)
	if !ok {
		if r.baseCtx.Err() != nil {
			// Close cancelled the wait: this is a shutdown, not a read-view
			// failure, so don't log it or count it as a skipped push.
			return
		}
		r.logger.Warn("change not visible in read view after retries; subscription push skipped",
			"key", j.key)
		r.obs.PushSkipped(j.key)
		return
	}
	r.obs.Pushed(j.key)
	r.push(j.key, payload)
}

func (r *Registry[T]) await(ctx context.Context, produce Producer[T]) (*T, bool) {
	for attempt := 0; ; attempt++ {
		if payload, ready := produce(ctx); ready {
			return payload, true
		}
		if attempt >= r.retries {
			return nil, false
		}
		select {
		case <-ctx.Done():
			return nil, false
		case <-time.After(r.retryWait):
		}
	}
}

// push delivers payload to every subscriber of key. The sends happen under the
// read lock: each send is a non-blocking select that drops the notification on
// a full buffer, so holding the lock is cheap, and it prevents removeReceiver
// (which takes the write lock to close a channel) from closing a channel out
// from under an in-flight send.
func (r *Registry[T]) push(key string, payload *T) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for _, s := range r.subscribers[key] {
		select {
		case s.channel <- payload:
		default:
			r.logger.Warn("subscription channel full; dropping notification", "key", key)
			r.obs.ChannelFull(key)
		}
	}
}

func shardIndex(key string, n int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(key))
	return int(h.Sum32() % uint32(n)) //nolint:gosec // modulo keeps the result in [0,n)
}