The shared core of Shiny's cross-service read-your-writes GraphQL subscriptions (ADR-0012), extracted and hardened from the near-identical hand-rolled handlers in authz-service (availableCompanies) and accounting-service (entryBasesChanged) before a third copy is written. Registry[T] owns the keyed subscriber map, non-blocking buffered fan-out (sends under the read lock so a close can't race a send), a key-sharded worker pool that runs the read-view gate OFF the AMQP delivery goroutine (preserving per-key FIFO order while distinct keys run in parallel), the bounded retry/timeout budget, and Observer metric hooks. Services supply only the event->key+payload mapping, the read-view Producer closure, and the per-replica transient-consumer wiring. Reviewed pre-publish (Go + Event Sourcing + Architecture). 99% coverage, race-clean. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
subscriptions
Shared core for Shiny's cross-service read-your-writes GraphQL subscriptions (ADR-0009 tier-3, ADR-0012).
An entity shown in the UI is frequently projected from another service's
event, so the owning service exposes a GraphQL subscription, drives it from a
per-replica transient AMQP consumer, and pushes a lightweight poke once the
change is visible in its own read view — the client then refetches the
authoritative query. This package is the reusable, type-generic, hardened core
of that pattern, extracted from the hand-rolled copies in authz-service
(availableCompanies) and accounting-service (entryBasesChanged).
import "gitea.unbound.se/shiny/subscriptions"

// One registry per subscription, parameterised by the GraphQL payload type.
reg := subscriptions.New[model.EntryBasisChange](subscriptions.WithLogger(logger))

// Resolver: register a websocket consumer (key by company, user, …).
ch, cleanup, _ := reg.AddReceiver(companyID)
go func() { <-ctx.Done(); cleanup() }()
return ch, nil

// AMQP handler: gate the push on the read view, off the delivery goroutine.
reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) {
	basis, err := readView.FindEntryBasisById(ctx, id)
	if err != nil {
		return nil, false // transient read error: keep waiting
	}
	return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil)
})
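The Producer closure above returns false to mean "not visible yet, keep waiting". As a rough illustration of how a bounded retry/timeout budget might drive such a closure, here is a minimal sketch. The names awaitVisible, change and demoGate, and the attempts/interval/timeout parameters, are hypothetical and are not taken from the package; the real budget may be shaped differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// change is a hypothetical payload type used only for this sketch.
type change struct{ ID string }

// awaitVisible calls produce until it reports that the change is visible,
// the attempt budget is spent, or the overall timeout expires.
// Assumption: the budget is "N attempts, fixed interval, one total timeout".
func awaitVisible[T any](
	ctx context.Context,
	produce func(context.Context) (*T, bool),
	attempts int,
	interval, timeout time.Duration,
) (*T, bool) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	for i := 0; i < attempts; i++ {
		if payload, ok := produce(ctx); ok {
			return payload, true
		}
		if i == attempts-1 {
			break // budget spent; no point waiting again
		}
		select {
		case <-ctx.Done():
			return nil, false // timeout or caller cancellation
		case <-time.After(interval):
		}
	}
	return nil, false
}

// demoGate simulates a read view that catches up on the third read.
func demoGate() (string, bool, int) {
	calls := 0
	produce := func(ctx context.Context) (*change, bool) {
		calls++
		return &change{ID: "42"}, calls >= 3
	}
	got, ok := awaitVisible(context.Background(), produce, 5, time.Millisecond, time.Second)
	if !ok {
		return "", false, calls
	}
	return got.ID, ok, calls
}

func main() {
	id, ok, calls := demoGate()
	fmt.Println(id, ok, calls)
}
```

If the budget runs out, the sketch simply gives up; that is tolerable only because the poke is a hint and the client refetches the authoritative query.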
What the registry owns (so services don't re-roll it): the keyed subscriber map,
non-blocking buffered fan-out (sends under the read lock so a close can't race a
send), a bounded, key-sharded worker pool that runs the read-view gate off the
AMQP delivery goroutine (per-key FIFO order is preserved while distinct keys run
in parallel), the retry/timeout budget, and the Observer metric hooks. What stays
in the service: the event→(key, payload) mapping, the Producer read-view closure,
and the per-replica transient-consumer wiring.
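To make the "sends under the read lock" point concrete, the following is a simplified sketch of a keyed subscriber map with non-blocking fan-out. It is not the package's implementation: the registry type, its add and publish methods, and the integer receiver IDs are all hypothetical. The idea it shows is that senders hold the read lock while removal and close take the write lock, so a channel can never be closed in the middle of a send.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical, stripped-down stand-in for Registry[T].
type registry[T any] struct {
	mu   sync.RWMutex
	subs map[string]map[int]chan *T // key -> receiver id -> channel
	next int
}

func newRegistry[T any]() *registry[T] {
	return &registry[T]{subs: make(map[string]map[int]chan *T)}
}

// add registers a buffered receiver under key and returns its channel plus
// an idempotent cleanup function.
func (r *registry[T]) add(key string, buf int) (<-chan *T, func()) {
	ch := make(chan *T, buf)
	r.mu.Lock()
	id := r.next
	r.next++
	if r.subs[key] == nil {
		r.subs[key] = make(map[int]chan *T)
	}
	r.subs[key][id] = ch
	r.mu.Unlock()

	var once sync.Once
	return ch, func() {
		once.Do(func() {
			// Write lock: excludes every sender, which holds the read lock.
			r.mu.Lock()
			defer r.mu.Unlock()
			delete(r.subs[key], id)
			if len(r.subs[key]) == 0 {
				delete(r.subs, key)
			}
			close(ch)
		})
	}
}

// publish offers payload to every receiver for key without blocking and
// returns how many receivers had a full buffer.
func (r *registry[T]) publish(key string, payload *T) (dropped int) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for _, ch := range r.subs[key] {
		select {
		case ch <- payload:
		default:
			dropped++ // slow consumer: drop rather than stall the others
		}
	}
	return dropped
}

// demoFanOut pushes twice into a buffer of one, then closes and drains.
func demoFanOut() (delivered, dropped int) {
	r := newRegistry[string]()
	ch, cleanup := r.add("company-1", 1)
	msg := "poke"
	dropped += r.publish("company-1", &msg) // fits in the buffer
	dropped += r.publish("company-1", &msg) // buffer full, dropped
	cleanup()
	cleanup()                               // second call is a no-op
	dropped += r.publish("company-1", &msg) // no receivers left
	for range ch {
		delivered++
	}
	return delivered, dropped
}

func main() {
	fmt.Println(demoFanOut())
}
```

A plain mutex would also be safe here; the read/write split just lets concurrent publishers for different keys proceed without serialising on each other.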
The poke is idempotent and drop-tolerant — the client refetches on any poke — so
the worker acks immediately and a dropped/duplicated poke self-heals. Wire an
Observer to surface dropped/skipped pushes as metrics.
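Because a poke is drop-tolerant, the submit path can shed load instead of blocking the AMQP delivery goroutine. The sketch below combines that with key sharding and a counter that plays the role of an Observer. Everything in it is an assumption made for illustration: the pool and counters types, the FNV-based shard choice, and the drop-on-full-queue policy are hypothetical, and the package's actual Observer interface and pool behaviour may differ.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
	"sync/atomic"
)

// counters is a hypothetical stand-in for an Observer: it only counts outcomes.
type counters struct {
	accepted atomic.Int64
	dropped  atomic.Int64
}

// pool sends every task for a given key to the same worker, so tasks for one
// key run in submission order while different keys can run in parallel.
type pool struct {
	shards []chan func()
	wg     sync.WaitGroup
	obs    *counters
}

func newPool(workers, queue int, obs *counters) *pool {
	p := &pool{shards: make([]chan func(), workers), obs: obs}
	for i := range p.shards {
		ch := make(chan func(), queue)
		p.shards[i] = ch
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range ch {
				task()
			}
		}()
	}
	return p
}

// submit never blocks the caller. Assumption: a full shard queue drops the
// task and records the drop, which is acceptable only for idempotent pokes.
func (p *pool) submit(key string, task func()) bool {
	h := fnv.New32a()
	h.Write([]byte(key))
	shard := p.shards[h.Sum32()%uint32(len(p.shards))]
	select {
	case shard <- task:
		p.obs.accepted.Add(1)
		return true
	default:
		p.obs.dropped.Add(1)
		return false
	}
}

// close stops accepting work and waits for queued tasks to finish.
func (p *pool) close() {
	for _, ch := range p.shards {
		close(ch)
	}
	p.wg.Wait()
}

// demoPool submits five tasks for one key and records their execution order.
func demoPool() (order []int, accepted int64) {
	obs := &counters{}
	p := newPool(4, 8, obs)
	for i := 0; i < 5; i++ {
		i := i // per-iteration copy for Go versions before 1.22
		p.submit("company-1", func() { order = append(order, i) })
	}
	p.close()
	return order, obs.accepted.Load()
}

func main() {
	fmt.Println(demoPool())
}
```

In a real service the two counters would feed whatever metrics client is already in use, so that a rising drop count shows up on a dashboard rather than only in logs.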