Files
subscriptions/README.md
argoyle da4e7df6ce
subscriptions / test (push) Has been skipped
subscriptions / vulnerabilities (push) Has been skipped
Release / release (push) Successful in 38s
subscriptions / coverage-baseline (push) Successful in 2m58s
pre-commit / pre-commit (push) Successful in 4m58s
feat: type-generic registry for cross-service read-your-writes subscriptions
The shared core of Shiny's cross-service read-your-writes GraphQL subscriptions
(ADR-0012), extracted and hardened from the near-identical hand-rolled handlers
in authz-service (availableCompanies) and accounting-service (entryBasesChanged)
before a third copy is written.

Registry[T] owns the keyed subscriber map, non-blocking buffered fan-out (sends
under the read lock so a close can't race a send), a key-sharded worker pool that
runs the read-view gate OFF the AMQP delivery goroutine (preserving per-key FIFO
order while distinct keys run in parallel), the bounded retry/timeout budget, and
Observer metric hooks. Services supply only the event->key+payload mapping, the
read-view Producer closure, and the per-replica transient-consumer wiring.

Reviewed pre-publish (Go + Event Sourcing + Architecture). 99% coverage, race-clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:22:34 +02:00

44 lines
2.0 KiB
Markdown

# subscriptions
Shared core for Shiny's cross-service read-your-writes GraphQL subscriptions
(ADR-0009 tier-3, ADR-0012).
An entity shown in the UI is frequently projected from *another* service's
event, so the owning service exposes a GraphQL subscription, drives it from a
per-replica transient AMQP consumer, and pushes a lightweight poke once the
change is visible in its own read view — the client then refetches the
authoritative query. This package is the reusable, type-generic, hardened core
of that pattern, extracted from the hand-rolled copies in `authz-service`
(`availableCompanies`) and `accounting-service` (`entryBasesChanged`).
```go
import "gitea.unbound.se/shiny/subscriptions"
// One registry per subscription, parameterised by the GraphQL payload type.
reg := subscriptions.New[model.EntryBasisChange](subscriptions.WithLogger(logger))
// Resolver: register a websocket consumer (key by company, user, …).
ch, cleanup, _ := reg.AddReceiver(companyID)
go func() { <-ctx.Done(); cleanup() }()
return ch, nil
// AMQP handler: gate the push on the read view, off the delivery goroutine.
reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) {
basis, err := readView.FindEntryBasisById(ctx, id)
if err != nil {
return nil, false // transient read error — keep waiting
}
return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil)
})
```
What the registry owns (so services don't re-roll it): the keyed subscriber map,
non-blocking buffered fan-out (sends under the read lock so a close can't race a
send), a bounded worker pool that runs the read-view gate **off** the AMQP
delivery goroutine, and the retry/timeout budget. What stays in the service: the
event→(key, payload) mapping and the `Producer` read-view closure.
The poke is idempotent and drop-tolerant — the client refetches on any poke — so
the worker acks immediately and a dropped/duplicated poke self-heals. Wire an
`Observer` to surface dropped/skipped pushes as metrics.