argoyle da4e7df6ce
feat: type-generic registry for cross-service read-your-writes subscriptions
The shared core of Shiny's cross-service read-your-writes GraphQL subscriptions
(ADR-0012), extracted and hardened from the near-identical hand-rolled handlers
in authz-service (availableCompanies) and accounting-service (entryBasesChanged)
before a third copy is written.

Registry[T] owns the keyed subscriber map, non-blocking buffered fan-out (sends
under the read lock so a close can't race a send), a key-sharded worker pool that
runs the read-view gate OFF the AMQP delivery goroutine (preserving per-key FIFO
order while distinct keys run in parallel), the bounded retry/timeout budget, and
Observer metric hooks. Services supply only the event->key+payload mapping, the
read-view Producer closure, and the per-replica transient-consumer wiring.

Reviewed pre-publish (Go + Event Sourcing + Architecture). 99% coverage, race-clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:22:34 +02:00

subscriptions

Shared Go library: the reusable core of Shiny's cross-service read-your-writes GraphQL subscriptions.

Shared Documentation

@../docs/claude/architecture.md @../docs/claude/go-services.md @../docs/claude/event-sourcing.md @../docs/claude/conventions.md

Library Information

Purpose

Single home for the subscription subscriber-registry + read-view gate + fan-out that was hand-rolled (near-identically) in authz-service/subscription and accounting-service/subscription. Implements the mechanism mandated by ADR-0012 (cross-service read-your-writes via owning-service subscriptions), which is the concrete form of ADR-0009 tier-3. ADR-0012 requires new instances of the pattern to use this library rather than copy it.

Usage

import "gitea.unbound.se/shiny/subscriptions"

// One registry per subscription field, parameterised by the GraphQL payload.
reg := subscriptions.New[model.EntryBasisChange](
    subscriptions.WithLogger(logger),
    subscriptions.WithObserver(otelObserver), // optional metrics
)

// Resolver: register the websocket consumer; call cleanup on ctx.Done.
ch, cleanup, err := reg.AddReceiver(companyID)
if err != nil {
    return nil, err
}
go func() { <-ctx.Done(); cleanup() }()
return ch, nil

// AMQP Process: gate on the read view, push off the delivery goroutine.
reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) {
    basis, err := readView.FindEntryBasisById(ctx, id)
    if err != nil {
        return nil, false // transient read error: report not-ready so the registry retries
    }
    // Ready once the read view agrees with the event: gone if removed, present otherwise.
    return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil)
})

Exported API

  • New[T](opts...) *Registry[T]: starts the worker pool; Close() stops it.
  • (*Registry[T]).AddReceiver(key) (<-chan *T, cleanup func(), error): registers a subscriber; the resolver returns the channel and calls cleanup on ctx.Done.
  • (*Registry[T]).Submit(key, Producer[T]): called from the AMQP handler; non-blocking.
  • Producer[T] func(ctx) (*T, ready bool): reads current read-view state and returns the payload plus whether the change is visible; retried until ready or the retry budget elapses.
  • Options: WithLogger, WithObserver, WithReadRetry(attempts, wait), WithBufferSize, WithWorkers, WithQueueSize.
  • Observer: PushSkipped/Dropped/ChannelFull hooks for metrics.
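
The Producer contract (retry until ready, within a bounded budget) can be illustrated without the library. The following is a minimal standalone sketch, not the library's implementation: gate, lagging and tryGate are hypothetical names, and the attempts/wait parameters only mirror the shape of WithReadRetry(attempts, wait) as listed above.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Producer mirrors the documented shape: it returns the payload and whether
// the change is visible in the read view yet.
type Producer[T any] func(ctx context.Context) (*T, bool)

// gate (hypothetical) calls produce until it reports ready, the attempt
// budget is spent, or ctx is cancelled.
func gate[T any](ctx context.Context, produce Producer[T], attempts int, wait time.Duration) (*T, bool) {
	for i := 0; i < attempts; i++ {
		if v, ready := produce(ctx); ready {
			return v, true
		}
		if i == attempts-1 {
			break // budget spent; no point sleeping after the last attempt
		}
		select {
		case <-ctx.Done():
			return nil, false
		case <-time.After(wait):
		}
	}
	return nil, false
}

// lagging simulates a projection that becomes visible on the n-th read and
// exposes how many reads were made.
func lagging(n int) (Producer[string], *int) {
	calls := 0
	return func(context.Context) (*string, bool) {
		calls++
		if calls < n {
			return nil, false
		}
		s := "visible"
		return &s, true
	}, &calls
}

// tryGate runs gate against a projection that catches up on read n.
func tryGate(n, attempts int) (payload string, ready bool, calls int) {
	p, c := lagging(n)
	v, ok := gate(context.Background(), p, attempts, time.Millisecond)
	if ok {
		payload = *v
	}
	return payload, ok, *c
}

func main() {
	fmt.Println(tryGate(3, 5)) // projection catches up within the budget
	fmt.Println(tryGate(9, 5)) // budget elapses first; nothing is pushed
}
```

In the second call the gate gives up after five reads and reports not-ready, which corresponds to the case where the poke is dropped rather than pushed before the projection has caught up.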

Design notes (the load-bearing bits, per ADR-0012)

  • Per-replica. Feed Submit from a goamqp.TransientEventStreamConsumer on the owning service's own events, so every replica sees every event and can push to the websockets it holds — distinct from the shared durable read-view consumer.
  • Read-view gate. The Producer must read current read-view state on each call (so out-of-order delivery across workers is still consistent) and report not-ready on a transient read error. The registry retries until ready or the budget elapses, so the client's refetch can't race the projection.
  • Off the delivery goroutine. Submit enqueues to a bounded worker pool and returns; the AMQP message is acked immediately. The poke is idempotent and drop-tolerant, so giving up at-least-once delivery for the poke is acceptable: the client refetches on any poke.
  • No send on closed channel. Pushes happen under the read lock; cleanup closes under the write lock.
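
One way to get "off the delivery goroutine" while keeping per-key order is to hash each key onto a fixed worker queue. The sketch below is an illustration under assumptions, not the library's code: shardedPool, submit, stop and orderPerKey are hypothetical names, the FNV hash is an arbitrary choice, and returning false on a full queue is only one possible policy.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

type job struct {
	key string
	run func()
}

// shardedPool routes every job for a key to the same worker queue, so jobs
// for one key run in submission order while different keys can overlap.
type shardedPool struct {
	queues []chan job
	wg     sync.WaitGroup
}

func newShardedPool(workers, queueSize int) *shardedPool {
	p := &shardedPool{queues: make([]chan job, workers)}
	for i := range p.queues {
		q := make(chan job, queueSize)
		p.queues[i] = q
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for j := range q {
				j.run()
			}
		}()
	}
	return p
}

// submit never blocks: it reports false when the key's queue is full.
func (p *shardedPool) submit(key string, run func()) bool {
	h := fnv.New32a()
	h.Write([]byte(key))
	q := p.queues[h.Sum32()%uint32(len(p.queues))]
	select {
	case q <- job{key: key, run: run}:
		return true
	default:
		return false
	}
}

// stop closes the queues and waits for already-queued jobs to finish.
func (p *shardedPool) stop() {
	for _, q := range p.queues {
		close(q)
	}
	p.wg.Wait()
}

// orderPerKey submits n numbered jobs per key and records the order in
// which each key's jobs actually ran.
func orderPerKey(keys []string, n int) map[string][]int {
	var mu sync.Mutex
	seen := make(map[string][]int)
	p := newShardedPool(4, len(keys)*n) // large enough that nothing is dropped
	for i := 0; i < n; i++ {
		for _, k := range keys {
			i, k := i, k // per-iteration copies for older Go versions
			p.submit(k, func() {
				mu.Lock()
				seen[k] = append(seen[k], i)
				mu.Unlock()
			})
		}
	}
	p.stop()
	return seen
}

func main() {
	for k, order := range orderPerKey([]string{"acme", "globex"}, 5) {
		fmt.Println(k, order)
	}
}
```

Because a key always lands on the same queue and each queue has a single consumer, every key observes its jobs in submission order, regardless of how the other keys interleave.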

Conventions

Standard Shiny library scaffolding: gofumpt/goimports -local, golangci-lint, gitleaks and conventional-commit checks via pre-commit; coverage-regression gate in CI (.testcoverage.yml); releases auto-tagged from conventional commits by the shared Release workflow. Bump consuming services' go.mod after a release. This library is concurrency-critical — always run go test -race and keep the concurrent-churn test green before changing the locking or worker model.
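
To make the locking rule that the race run protects concrete, here is a minimal standalone sketch of the invariant described in the design notes: sends happen under the read lock, closes under the write lock. miniRegistry, add, push and churn are hypothetical names, and the int payload is a placeholder. This is not the library's code or its actual churn test.

```go
package main

import (
	"fmt"
	"sync"
)

// miniRegistry is a hypothetical stand-in that models the locking scheme only.
type miniRegistry struct {
	mu   sync.RWMutex
	next int
	subs map[string]map[int]chan int
}

func newMiniRegistry() *miniRegistry {
	return &miniRegistry{subs: make(map[string]map[int]chan int)}
}

// add registers a buffered subscriber. The returned cleanup removes and
// closes the channel under the write lock and is safe to call repeatedly.
func (r *miniRegistry) add(key string, buf int) (<-chan int, func()) {
	ch := make(chan int, buf)
	r.mu.Lock()
	id := r.next
	r.next++
	if r.subs[key] == nil {
		r.subs[key] = make(map[int]chan int)
	}
	r.subs[key][id] = ch
	r.mu.Unlock()

	var once sync.Once
	return ch, func() {
		once.Do(func() {
			r.mu.Lock()
			delete(r.subs[key], id)
			close(ch)
			r.mu.Unlock()
		})
	}
}

// push sends under the read lock, so no cleanup can close a channel while a
// send is in flight. A full buffer is skipped rather than waited on.
func (r *miniRegistry) push(key string, v int) (sent int) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for _, ch := range r.subs[key] {
		select {
		case ch <- v:
			sent++
		default:
		}
	}
	return sent
}

// churn pushes while subscribers come and go, then reports how many
// subscribers are still registered. It is meant to be run with -race.
func churn(rounds int) (remaining int) {
	r := newMiniRegistry()
	var wg sync.WaitGroup
	for i := 0; i < rounds; i++ {
		wg.Add(2)
		go func() {
			defer wg.Done()
			_, cleanup := r.add("k", 1)
			cleanup()
		}()
		go func(v int) {
			defer wg.Done()
			r.push("k", v)
		}(i)
	}
	wg.Wait()
	r.mu.RLock()
	defer r.mu.RUnlock()
	return len(r.subs["k"])
}

func main() {
	fmt.Println("subscribers left after churn:", churn(1000))
}
```

If push took no lock, or cleanup closed the channel outside the write lock, the same churn could panic with a send on a closed channel. That is the class of regression a churn test run under -race is intended to catch.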