commit da4e7df6cee6c158cde1da1fa99bea537f2fba63 Author: Joakim Olsson Date: Tue Jun 16 14:22:34 2026 +0200 feat: type-generic registry for cross-service read-your-writes subscriptions The shared core of Shiny's cross-service read-your-writes GraphQL subscriptions (ADR-0012), extracted and hardened from the near-identical hand-rolled handlers in authz-service (availableCompanies) and accounting-service (entryBasesChanged) before a third copy is written. Registry[T] owns the keyed subscriber map, non-blocking buffered fan-out (sends under the read lock so a close can't race a send), a key-sharded worker pool that runs the read-view gate OFF the AMQP delivery goroutine (preserving per-key FIFO order while distinct keys run in parallel), the bounded retry/timeout budget, and Observer metric hooks. Services supply only the event->key+payload mapping, the read-view Producer closure, and the per-replica transient-consumer wiring. Reviewed pre-publish (Go + Event Sourcing + Architecture). 99% coverage, race-clean. Co-Authored-By: Claude Opus 4.8 (1M context) diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..04cd3ad --- /dev/null +++ b/.editorconfig @@ -0,0 +1,11 @@ +root = true + +[*] +end_of_line = lf +insert_final_newline = true +charset = utf-8 +trim_trailing_whitespace = true + +[*.go] +indent_style = tab +indent_size = 2 diff --git a/.gitea/workflows/ci.yaml b/.gitea/workflows/ci.yaml new file mode 100644 index 0000000..e42acf9 --- /dev/null +++ b/.gitea/workflows/ci.yaml @@ -0,0 +1,107 @@ +name: subscriptions + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + test: + if: gitea.event_name == 'pull_request' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version: 'stable' + - name: Format check + run: | + go install mvdan.cc/gofumpt@latest + test -z "$(gofumpt -l .)" + - name: Run tests + run: go test -race -coverprofile=coverage.txt ./... 
+ + - name: Filter test files from coverage + run: | + grep -v -E '_test\.go:' coverage.txt > coverage.filtered.txt || true + mv coverage.filtered.txt coverage.txt + + - name: Check coverage + id: coverage + run: | + go install github.com/vladopajic/go-test-coverage/v2@latest + go-test-coverage --config ./.testcoverage.yml --github-action-output + - name: Restore baseline coverage + uses: actions/cache/restore@v5 + with: + path: coverage-baseline.txt + key: coverage-baseline-${{ gitea.run_id }} + restore-keys: | + coverage-baseline- + - name: Compare coverage + run: | + CURRENT="${{ steps.coverage.outputs.total-coverage }}" + if [ -f coverage-baseline.txt ]; then + BASE=$(cat coverage-baseline.txt) + echo "Base coverage: ${BASE}%" + echo "Current coverage: ${CURRENT}%" + if [ "$(echo "$CURRENT < $BASE" | bc -l)" -eq 1 ]; then + echo "::error::Coverage decreased from ${BASE}% to ${CURRENT}%" + exit 1 + fi + echo "Coverage maintained or improved: ${BASE}% -> ${CURRENT}%" + else + echo "No baseline coverage found yet, skipping comparison" + echo "Current coverage: ${CURRENT}%" + fi + - name: Post coverage comment + env: + GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }} + GITEA_URL: ${{ gitea.server_url }} + run: | + COVERAGE="${{ steps.coverage.outputs.total-coverage }}" + curl -X POST "${GITEA_URL}/api/v1/repos/${{ gitea.repository }}/issues/${{ gitea.event.pull_request.number }}/comments" \ + -H "Authorization: token ${GITEA_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"body\": \"## Coverage Report\n\nTotal coverage: **${COVERAGE}%**\"}" + + coverage-baseline: + # Records main's coverage into the Actions cache for the next PR's + # regression gate to read. Post-merge only, not a required check, blocks + # nothing (cf. ADR-0010). 
+ if: gitea.event_name == 'push' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version: 'stable' + - name: Compute coverage + id: coverage + run: | + go install github.com/vladopajic/go-test-coverage/v2@latest + go test -coverprofile=coverage.txt ./... + grep -v -E '_test\.go:' coverage.txt > coverage.filtered.txt || true + mv coverage.filtered.txt coverage.txt + go-test-coverage --config ./.testcoverage.yml --github-action-output + - name: Write baseline file + run: echo "${{ steps.coverage.outputs.total-coverage }}" > coverage-baseline.txt + - name: Save baseline to cache + uses: actions/cache/save@v5 + with: + path: coverage-baseline.txt + key: coverage-baseline-${{ gitea.run_id }} + + vulnerabilities: + if: gitea.event_name == 'pull_request' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version: 'stable' + - name: Check vulnerabilities + run: | + go install golang.org/x/vuln/cmd/govulncheck@latest + govulncheck ./... 
diff --git a/.gitea/workflows/pre-commit.yaml b/.gitea/workflows/pre-commit.yaml new file mode 100644 index 0000000..e427748 --- /dev/null +++ b/.gitea/workflows/pre-commit.yaml @@ -0,0 +1,25 @@ +name: pre-commit +permissions: read-all + +on: + pull_request: + push: + branches: + - main + +jobs: + pre-commit: + runs-on: ubuntu-latest + env: + SKIP: no-commit-to-branch + steps: + - uses: actions/checkout@v6 + - uses: actions/setup-go@v6 + with: + go-version: stable + - uses: actions/setup-python@v6 + with: + python-version: '3.14' + - name: Install goimports + run: go install golang.org/x/tools/cmd/goimports@latest + - uses: pre-commit/action@v3.0.1 diff --git a/.gitea/workflows/release.yaml b/.gitea/workflows/release.yaml new file mode 100644 index 0000000..ef6ec99 --- /dev/null +++ b/.gitea/workflows/release.yaml @@ -0,0 +1,9 @@ +name: Release + +on: + push: + branches: [main] + +jobs: + release: + uses: unboundsoftware/shared-workflows/.gitea/workflows/Release.yml@main diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ab2f1aa --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.idea +.claude +/release +coverage.txt +coverage-baseline.txt diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..5381cc5 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,22 @@ +version: "2" +run: + allow-parallel-runners: true +linters: + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + paths: + - third_party$ + - builtin$ + - examples$ +formatters: + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..02cc8ec --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,39 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v6.0.0 + 
hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + args: + - --allow-multiple-documents + - id: check-added-large-files +- repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook + rev: v9.25.0 + hooks: + - id: commitlint + stages: [ commit-msg ] + additional_dependencies: [ '@commitlint/config-conventional' ] +- repo: https://github.com/dnephin/pre-commit-golang + rev: v0.5.1 + hooks: + - id: go-mod-tidy + - id: go-imports + args: + - -local + - gitea.unbound.se/shiny/subscriptions +- repo: https://github.com/lietu/go-pre-commit + rev: v1.0.0 + hooks: + - id: go-test + - id: gofumpt +- repo: https://github.com/golangci/golangci-lint + rev: v2.12.2 + hooks: + - id: golangci-lint-full +- repo: https://github.com/gitleaks/gitleaks + rev: v8.30.1 + hooks: + - id: gitleaks diff --git a/.testcoverage.yml b/.testcoverage.yml new file mode 100644 index 0000000..7aec8b6 --- /dev/null +++ b/.testcoverage.yml @@ -0,0 +1,13 @@ +# Coverage configuration for go-test-coverage +# https://github.com/vladopajic/go-test-coverage + +profile: coverage.txt + +threshold: + file: 0 + package: 0 + total: 0 + +exclude: + paths: + - _test\.go$ diff --git a/.version b/.version new file mode 100644 index 0000000..557859c --- /dev/null +++ b/.version @@ -0,0 +1,3 @@ +{ + "version": "v0.1.0" +} diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..30c79b3 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,11 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [0.1.0] - 2026-06-16 + +### 🚀 Features + +- Initial version: type-generic `Registry[T]` for cross-service read-your-writes GraphQL subscriptions (ADR-0012) — keyed subscriber map, non-blocking fan-out (sends under the read lock), a bounded worker pool that runs the read-view gate off the AMQP delivery goroutine, and `Observer` metric hooks. Extracted and hardened from the hand-rolled `authz-service` and `accounting-service` copies. 
+ + diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..399aa0a --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,82 @@ +# subscriptions + +Shared Go library: the reusable core of Shiny's cross-service read-your-writes +GraphQL subscriptions. + +## Shared Documentation + +@../docs/claude/architecture.md +@../docs/claude/go-services.md +@../docs/claude/event-sourcing.md +@../docs/claude/conventions.md + +## Library Information + +### Purpose + +Single home for the subscription subscriber-registry + read-view gate + fan-out +that was hand-rolled (near-identically) in `authz-service/subscription` and +`accounting-service/subscription`. Implements the mechanism mandated by +**ADR-0012** (cross-service read-your-writes via owning-service subscriptions), +which is the concrete form of **ADR-0009** tier-3. ADR-0012 requires new +instances of the pattern to use this library rather than copy it. + +### Usage + +```go +import "gitea.unbound.se/shiny/subscriptions" + +// One registry per subscription field, parameterised by the GraphQL payload. +reg := subscriptions.New[model.EntryBasisChange]( + subscriptions.WithLogger(logger), + subscriptions.WithObserver(otelObserver), // optional metrics +) + +// Resolver — register the websocket consumer; cleanup on ctx.Done. +ch, cleanup, err := reg.AddReceiver(companyID) + +// AMQP Process — gate on the read view, push off the delivery goroutine. +reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) { + basis, err := readView.FindEntryBasisById(ctx, id) + if err != nil { return nil, false } + return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil) +}) +``` + +### Exported API + +- `New[T](opts...) *Registry[T]` — starts the worker pool; `Close()` stops it. +- `(*Registry[T]).AddReceiver(key) (<-chan *T, cleanup func(), error)` — register + a subscriber; the resolver returns the channel and calls cleanup on ctx.Done. 
+- `(*Registry[T]).Submit(key, Producer[T])` — from the AMQP handler; non-blocking. +- `Producer[T] func(ctx) (*T, ready bool)` — reads current read-view state, + returns the payload + whether the change is visible; retried until ready. +- Options: `WithLogger`, `WithObserver`, `WithReadRetry(attempts, wait)`, + `WithBufferSize`, `WithWorkers`, `WithQueueSize`. +- `Observer` — `Pushed`/`PushSkipped`/`Dropped`/`ChannelFull` hooks for metrics. + +### Design notes (the load-bearing bits, per ADR-0012) + +- **Per-replica.** Feed `Submit` from a `goamqp.TransientEventStreamConsumer` on + the owning service's *own* events, so every replica sees every event and can + push to the websockets it holds — distinct from the shared durable read-view + consumer. +- **Read-view gate.** The `Producer` must read *current* read-view state on each + call (so out-of-order delivery across workers is still consistent) and report + not-ready on a transient read error. The registry retries until ready or the + budget elapses, so the client's refetch can't race the projection. +- **Off the delivery goroutine.** `Submit` enqueues to a bounded worker pool and + returns; the AMQP message is acked immediately. The poke is idempotent and + drop-tolerant, so losing at-least-once on the poke is fine — the client + refetches on any poke. +- **No send on closed channel.** Pushes happen under the read lock; cleanup + closes under the write lock. + +### Conventions + +Standard Shiny library scaffolding: `gofumpt`/`goimports -local`, golangci-lint, +gitleaks and conventional-commit checks via pre-commit; coverage-regression gate +in CI (`.testcoverage.yml`); releases auto-tagged from conventional commits by +the shared Release workflow. Bump consuming services' `go.mod` after a release. +This library is concurrency-critical — always run `go test -race` and keep the +concurrent-churn test green before changing the locking or worker model. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..ed1e440 --- /dev/null +++ b/README.md @@ -0,0 +1,43 @@ +# subscriptions + +Shared core for Shiny's cross-service read-your-writes GraphQL subscriptions +(ADR-0009 tier-3, ADR-0012). + +An entity shown in the UI is frequently projected from *another* service's +event, so the owning service exposes a GraphQL subscription, drives it from a +per-replica transient AMQP consumer, and pushes a lightweight poke once the +change is visible in its own read view — the client then refetches the +authoritative query. This package is the reusable, type-generic, hardened core +of that pattern, extracted from the hand-rolled copies in `authz-service` +(`availableCompanies`) and `accounting-service` (`entryBasesChanged`). + +```go +import "gitea.unbound.se/shiny/subscriptions" + +// One registry per subscription, parameterised by the GraphQL payload type. +reg := subscriptions.New[model.EntryBasisChange](subscriptions.WithLogger(logger)) + +// Resolver: register a websocket consumer (key by company, user, …). +ch, cleanup, _ := reg.AddReceiver(companyID) +go func() { <-ctx.Done(); cleanup() }() +return ch, nil + +// AMQP handler: gate the push on the read view, off the delivery goroutine. +reg.Submit(ev.CompanyID, func(ctx context.Context) (*model.EntryBasisChange, bool) { + basis, err := readView.FindEntryBasisById(ctx, id) + if err != nil { + return nil, false // transient read error — keep waiting + } + return &model.EntryBasisChange{ID: id, Removed: removed}, removed == (basis == nil) +}) +``` + +What the registry owns (so services don't re-roll it): the keyed subscriber map, +non-blocking buffered fan-out (sends under the read lock so a close can't race a +send), a bounded worker pool that runs the read-view gate **off** the AMQP +delivery goroutine, and the retry/timeout budget. What stays in the service: the +event→(key, payload) mapping and the `Producer` read-view closure. 
+ +The poke is idempotent and drop-tolerant — the client refetches on any poke — so +the worker acks immediately and a dropped/duplicated poke self-heals. Wire an +`Observer` to surface dropped/skipped pushes as metrics. diff --git a/cliff.toml b/cliff.toml new file mode 100644 index 0000000..ac04085 --- /dev/null +++ b/cliff.toml @@ -0,0 +1,80 @@ +# git-cliff ~ default configuration file +# https://git-cliff.org/docs/configuration +# +# Lines starting with "#" are comments. +# Configuration options are organized into tables and keys. +# See documentation for more information on available options. + +[changelog] +# template for the changelog header +header = """ +# Changelog\n +All notable changes to this project will be documented in this file.\n +""" +# template for the changelog body +# https://keats.github.io/tera/docs/#introduction +body = """ +{% if version %}\ + ## [{{ version | trim_start_matches(pat="v") }}] - {{ timestamp | date(format="%Y-%m-%d") }} +{% else %}\ + ## [unreleased] +{% endif %}\ +{% for group, commits in commits | group_by(attribute="group") %} + ### {{ group | striptags | trim | upper_first }} + {% for commit in commits %} + - {% if commit.scope %}*({{ commit.scope }})* {% endif %}\ + {% if commit.breaking %}[**breaking**] {% endif %}\ + {{ commit.message | upper_first }}\ + {% endfor %} +{% endfor %}\n +""" +# template for the changelog footer +footer = """ + +""" +# remove the leading and trailing whitespace +trim = true +# postprocessors +postprocessors = [ + # { pattern = '', replace = "https://github.com/orhun/git-cliff" }, # replace repository URL +] +# render body even when there are no releases to process +# render_always = true +# output file path +# output = "test.md" + +[git] +# parse the commits based on https://www.conventionalcommits.org +conventional_commits = true +# filter out the commits that are not conventional +filter_unconventional = true +# process each line of a commit as an individual commit +split_commits = false +# 
regex for preprocessing the commit messages +commit_preprocessors = [ + # Replace issue numbers + #{ pattern = '\((\w+\s)?#([0-9]+)\)', replace = "([#${2}](/issues/${2}))"}, + # Check spelling of the commit with https://github.com/crate-ci/typos + # If the spelling is incorrect, it will be automatically fixed. + #{ pattern = '.*', replace_command = 'typos --write-changes -' }, +] +# regex for parsing and grouping commits +commit_parsers = [ + { message = "^feat", group = "🚀 Features" }, + { message = "^fix", group = "🐛 Bug Fixes" }, + { message = "^doc", group = "📚 Documentation" }, + { message = "^perf", group = "⚡ Performance" }, + { message = "^refactor", group = "🚜 Refactor" }, + { message = "^style", group = "🎨 Styling" }, + { message = "^test", group = "🧪 Testing" }, + { message = "^chore\\(release\\): prepare for", skip = true }, + { message = "^chore|^ci", group = "⚙️ Miscellaneous Tasks" }, + { body = ".*security", group = "🛡️ Security" }, + { message = "^revert", group = "◀️ Revert" }, +] +# filter out the commits that are not matched by commit parsers +filter_commits = false +# sort the tags topologically +topo_order = false +# sort the commits inside sections by oldest/newest order +sort_commits = "oldest" diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0134165 --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module gitea.unbound.se/shiny/subscriptions + +go 1.25 + +require ( + github.com/google/uuid v1.6.0 + github.com/stretchr/testify v1.11.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..42976e0 --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.6.0 
h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/renovate.json b/renovate.json new file mode 100644 index 0000000..5db72dd --- /dev/null +++ b/renovate.json @@ -0,0 +1,6 @@ +{ + "$schema": "https://docs.renovatebot.com/renovate-schema.json", + "extends": [ + "config:recommended" + ] +} diff --git a/subscriptions.go b/subscriptions.go new file mode 100644 index 0000000..4b52bed --- /dev/null +++ b/subscriptions.go @@ -0,0 +1,421 @@ +// Package subscriptions provides a reusable, type-generic registry for fanning +// out change notifications to in-process GraphQL subscription consumers. +// +// It is the shared core of Shiny's cross-service read-your-writes pattern +// (ADR-0012): an entity shown in the UI is frequently projected from another +// service's event, so the owning service exposes a GraphQL subscription, drives +// it from a per-replica transient AMQP consumer, and pushes a notification once +// the change is visible in its own read view β€” the client then refetches the +// authoritative query. 
 Two services hand-rolled this (authz-service's +// availableCompanies, accounting-service's entryBasesChanged); this package is +// the extracted, hardened core so further cases reuse it instead of copying it. +// +// # Wiring requirement (do not get this wrong) +// +// Submit MUST be fed from a per-replica goamqp.TransientEventStreamConsumer (an +// exclusive, randomly-named queue) bound to the owning service's OWN events, so +// every replica receives every event and can push to the websockets it holds. +// This is necessarily a DIFFERENT consumer from the shared, durable read-view +// projection consumer (a work-queue, where exactly one replica handles each +// event). Wiring Submit to a shared/durable consumer silently breaks delivery in +// a multi-replica deployment: the one replica that handles an event usually does +// not hold the subscriber's websocket, so the poke is lost with no error. The +// library cannot enforce this (it is transport-agnostic) — the caller must. +// +// # Concurrency model +// +// - AddReceiver registers a subscriber (one per websocket) and returns a +// buffered channel plus a cleanup func that must be called when the +// subscription ends. +// - Submit is called from the AMQP event handler. It does NOT block that +// handler on the read view: it hands the work to the key's shard worker, which waits +// (with a budget) for the read view to reflect the change — via the caller's +// [Producer] — and only then pushes. Acking the AMQP message immediately is +// safe because notifications are idempotent and drop-tolerant (see below). +// - Work is sharded by key, so all events for one key are processed FIFO by a +// single worker (preserving per-key order even for payloads the client +// consumes directly), while distinct keys run in parallel (so one lagging +// read view only delays its own key's shard, not everything). 
+// +// Pushes happen while holding the read lock, and cleanup closes a subscriber's +// channel under the write lock, so a send can never race a close ("send on +// closed channel"). A full subscriber buffer drops the notification rather than +// blocking a slow consumer. +// +// # Payload contract +// +// T should be a lightweight notification the client reacts to by refetching the +// authoritative query (a poke such as {id, removed}), not authoritative state +// the client consumes as the source of truth. Notifications may be dropped (full +// queue or buffer) and the push is best-effort (no AMQP requeue on a persistent +// read failure), so reliability comes from the client's idempotent refetch. +// Per-key FIFO ordering is preserved, so a payload the client does consume +// directly is at least delivered in event order for a given key — but it can +// still be dropped, so a refetch-on-receipt design is strongly preferred. +package subscriptions + +import ( + "context" + "errors" + "hash/fnv" + "log/slog" + "sync" + "time" + + "github.com/google/uuid" +) + +// ErrEmptyKey is returned by AddReceiver when the subscription key is empty, +// which almost always indicates an unpopulated id at the call site. +var ErrEmptyKey = errors.New("subscriptions: empty subscription key") + +// Producer reads the owning service's read view and returns the payload to push +// together with whether the change is yet visible there. +// +// The registry calls a Producer repeatedly until it reports ready (or a bounded +// budget elapses), so the client's refetch can never race ahead of the +// projection. A Producer MUST read current read-view state on each call (rather +// than capturing the event's historical state). On a transient read error it +// should return (nil, false) to keep waiting — the event is already durable, so +// only the projection is lagging. 
//
// Convergence precondition: "retry until ready" only terminates as ready (rather
// than burning the whole budget then skipping) if the read view the Producer
// gates on is made consistent by the SAME ordered aggregate stream as the
// triggering event. Gating on a row populated by a different aggregate's events
// can fail to converge.
type Producer[T any] func(ctx context.Context) (payload *T, ready bool)

// Observer is the metrics/observability hook set of a Registry. Calls are
// best-effort and may arrive concurrently, so implementations must be safe for
// concurrent use. A no-op implementation is installed by default; supply a real
// one (e.g. OTel counters) through [WithObserver].
type Observer interface {
	// Pushed is called once the read-view gate has passed for key and the
	// payload is handed to fan-out — the denominator for a skip/drop rate.
	Pushed(key string)
	// PushSkipped is called when the read view did not reflect the change
	// before the retry budget ran out, so nothing was pushed.
	PushSkipped(key string)
	// Dropped is called when the worker queue had no room, so the
	// notification was discarded before it could be gated.
	Dropped(key string)
	// ChannelFull is called when a subscriber's buffer had no room, so that
	// subscriber missed the notification.
	ChannelFull(key string)
}

// noopObserver is the default Observer: every hook discards its argument.
type noopObserver struct{}

func (noopObserver) ChannelFull(string) {}
func (noopObserver) Dropped(string)     {}
func (noopObserver) PushSkipped(string) {}
func (noopObserver) Pushed(string)      {}

type (
	// subscriber is one registered consumer: a unique id plus the channel its
	// notifications are delivered on.
	subscriber[T any] struct {
		id      string
		channel chan *T
	}

	// job is one queued unit of work: the subscription key and the Producer
	// that gates and builds the payload for it.
	job[T any] struct {
		key     string
		produce Producer[T]
	}
)

// Registry fans out notifications of type T to in-process subscribers keyed by
// an arbitrary string (e.g. a company id or a user email). The zero value is
// not usable; construct one with [New].
+type Registry[T any] struct { + logger *slog.Logger + obs Observer + bufferSize int + retries int + retryWait time.Duration + + mu sync.RWMutex + subscribers map[string]map[string]*subscriber[T] + + shards []chan job[T] + baseCtx context.Context + cancel context.CancelFunc + done chan struct{} + wg sync.WaitGroup + closeOnce sync.Once +} + +type config struct { + logger *slog.Logger + obs Observer + bufferSize int + retries int + retryWait time.Duration + workers int + queueSize int +} + +// Option configures a [Registry]. +type Option func(*config) + +// WithLogger sets the structured logger. Defaults to [slog.Default]. +func WithLogger(l *slog.Logger) Option { + return func(c *config) { + if l != nil { + c.logger = l + } + } +} + +// WithObserver wires a metrics observer. Defaults to a no-op. +func WithObserver(o Observer) Option { + return func(c *config) { + if o != nil { + c.obs = o + } + } +} + +// WithReadRetry tunes how long a worker waits for the read view to reflect a +// change before giving up on the push: up to attempts re-reads spaced by wait +// (so total reads = attempts+1; default 25 Γ— 200ms β‰ˆ 5s). The read-view consumer +// and this subscription consumer are independent consumers of the same event, so +// a freshly-projected change may not be visible on the first read; retrying +// avoids pushing a notification the client would refetch ahead of. Non-positive +// values are ignored (attempts clamps to β‰₯0). +func WithReadRetry(attempts int, wait time.Duration) Option { + return func(c *config) { + if attempts >= 0 { + c.retries = attempts + } + if wait > 0 { + c.retryWait = wait + } + } +} + +// WithBufferSize sets each subscriber channel's buffer (default 20). A full +// buffer drops the notification rather than blocking. +func WithBufferSize(n int) Option { + return func(c *config) { + if n > 0 { + c.bufferSize = n + } + } +} + +// WithWorkers sets the number of key-shard workers (default 4). 
Each key is +// handled FIFO by exactly one worker; more workers spread distinct keys over +// more goroutines so a lagging read view delays only its own shard. +func WithWorkers(n int) Option { + return func(c *config) { + if n > 0 { + c.workers = n + } + } +} + +// WithQueueSize sets each shard's job-queue depth (default 64). A full queue +// drops the notification (reported via [Observer.Dropped]) rather than blocking +// the AMQP delivery goroutine. +func WithQueueSize(n int) Option { + return func(c *config) { + if n > 0 { + c.queueSize = n + } + } +} + +// New builds a Registry and starts its key-shard workers. Call [Registry.Close] +// to stop them (optional; they exit with the process otherwise). +func New[T any](opts ...Option) *Registry[T] { + c := &config{ + logger: slog.Default(), + obs: noopObserver{}, + bufferSize: 20, + retries: 25, + retryWait: 200 * time.Millisecond, + workers: 4, + queueSize: 64, + } + for _, o := range opts { + o(c) + } + ctx, cancel := context.WithCancel(context.Background()) + r := &Registry[T]{ + logger: c.logger, + obs: c.obs, + bufferSize: c.bufferSize, + retries: c.retries, + retryWait: c.retryWait, + subscribers: make(map[string]map[string]*subscriber[T]), + shards: make([]chan job[T], c.workers), + baseCtx: ctx, + cancel: cancel, + done: make(chan struct{}), + } + r.wg.Add(c.workers) + for i := range r.shards { + r.shards[i] = make(chan job[T], c.queueSize) + go r.worker(r.shards[i]) + } + return r +} + +// AddReceiver registers a subscriber for the given key. It returns the channel +// to stream and a cleanup func that MUST be called when the subscription ends +// (e.g. from the resolver's ctx.Done) to close the channel and release the +// registration. cleanup is idempotent. Returns [ErrEmptyKey] for an empty key. 
+func (r *Registry[T]) AddReceiver(key string) (<-chan *T, func(), error) { + if key == "" { + return nil, nil, ErrEmptyKey + } + + s := &subscriber[T]{ + id: uuid.NewString(), + channel: make(chan *T, r.bufferSize), + } + + r.mu.Lock() + if r.subscribers[key] == nil { + r.subscribers[key] = make(map[string]*subscriber[T]) + } + r.subscribers[key][s.id] = s + total := len(r.subscribers[key]) + r.mu.Unlock() + + r.logger.Info("subscription registered", + "key", key, "subscription_id", s.id, "total_subscriptions", total) + + cleanup := func() { r.removeReceiver(key, s.id) } + return s.channel, cleanup, nil +} + +func (r *Registry[T]) removeReceiver(key, id string) { + r.mu.Lock() + defer r.mu.Unlock() + + subs := r.subscribers[key] + if subs == nil { + return + } + s, ok := subs[id] + if !ok { + return + } + close(s.channel) + delete(subs, id) + remaining := len(subs) + if remaining == 0 { + delete(r.subscribers, key) + } + r.logger.Info("subscription removed", + "key", key, "subscription_id", id, "remaining_subscriptions", remaining) +} + +// Submit schedules a gated push for the given key. It returns immediately: when +// the key is empty or has no subscribers it does nothing, otherwise it enqueues +// work for that key's shard worker (dropping, with an [Observer.Dropped], only if +// the shard queue is full). produce is invoked on the worker, not on the calling +// goroutine. +func (r *Registry[T]) Submit(key string, produce Producer[T]) { + if key == "" || !r.hasSubscribers(key) { + return + } + shard := r.shards[shardIndex(key, len(r.shards))] + select { + case shard <- job[T]{key: key, produce: produce}: + case <-r.done: + // shutting down + default: + r.logger.Warn("subscription job queue full; dropping notification", "key", key) + r.obs.Dropped(key) + } +} + +// Close stops the worker pool and waits for in-flight gating to finish. It +// cancels any in-flight read-view wait so workers return promptly rather than +// blocking for the full retry budget. 
Queued-but-unstarted notifications are +// dropped (safe β€” they are drop-tolerant). Idempotent; Submit after Close is a +// no-op. +func (r *Registry[T]) Close() { + r.closeOnce.Do(func() { + r.cancel() + close(r.done) + r.wg.Wait() + }) +} + +func (r *Registry[T]) hasSubscribers(key string) bool { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.subscribers[key]) > 0 +} + +func (r *Registry[T]) worker(shard <-chan job[T]) { + defer r.wg.Done() + for { + select { + case j := <-shard: + r.handle(j) + case <-r.done: + return + } + } +} + +func (r *Registry[T]) handle(j job[T]) { + // Bound the read-view wait so a hung read or a never-reflected event (e.g. + // redelivery of an event for a since-deleted entity) can't pin a worker. + // Derived from baseCtx so Close cancels in-flight waits promptly. + budget := time.Duration(r.retries+1) * r.retryWait + ctx, cancel := context.WithTimeout(r.baseCtx, budget) + defer cancel() + + payload, ok := r.await(ctx, j.produce) + if !ok { + r.logger.Warn("change not visible in read view after retries; subscription push skipped", + "key", j.key) + r.obs.PushSkipped(j.key) + return + } + r.obs.Pushed(j.key) + r.push(j.key, payload) +} + +func (r *Registry[T]) await(ctx context.Context, produce Producer[T]) (*T, bool) { + for attempt := 0; ; attempt++ { + if payload, ready := produce(ctx); ready { + return payload, true + } + if attempt >= r.retries { + return nil, false + } + select { + case <-ctx.Done(): + return nil, false + case <-time.After(r.retryWait): + } + } +} + +// push delivers payload to every subscriber of key. The sends happen under the +// read lock: sendNonBlocking never blocks (it drops on a full buffer), so +// holding the lock is cheap, and it prevents removeReceiver β€” which takes the +// write lock to close a channel β€” from closing a channel out from under an +// in-flight send. 
+func (r *Registry[T]) push(key string, payload *T) { + r.mu.RLock() + defer r.mu.RUnlock() + for _, s := range r.subscribers[key] { + select { + case s.channel <- payload: + default: + r.logger.Warn("subscription channel full; dropping notification", "key", key) + r.obs.ChannelFull(key) + } + } +} + +func shardIndex(key string, n int) int { + h := fnv.New32a() + _, _ = h.Write([]byte(key)) + return int(h.Sum32() % uint32(n)) //nolint:gosec // modulo keeps the result in [0,n); n must be positive, a zero n would panic on the modulo +} diff --git a/subscriptions_test.go b/subscriptions_test.go new file mode 100644 index 0000000..4d16d3b --- /dev/null +++ b/subscriptions_test.go @@ -0,0 +1,383 @@ +package subscriptions + +import ( + "context" + "fmt" + "io" + "log/slog" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// ping is the minimal payload type these tests push through the registry. +type ping struct { + ID string +} + +// quiet returns an Option that sends the registry's log output to io.Discard. +func quiet() Option { + return WithLogger(slog.New(slog.NewTextHandler(io.Discard, nil))) +} + +// recordObserver records the key passed to each metric callback, one list per callback; safe for concurrent use. 
+type recordObserver struct { + mu sync.Mutex + pushed []string + skipped []string + dropped []string + channelFull []string +} + +func (o *recordObserver) Pushed(k string) { o.add(&o.pushed, k) } +func (o *recordObserver) PushSkipped(k string) { o.add(&o.skipped, k) } +func (o *recordObserver) Dropped(k string) { o.add(&o.dropped, k) } +func (o *recordObserver) ChannelFull(k string) { o.add(&o.channelFull, k) } + +func (o *recordObserver) add(dst *[]string, k string) { + o.mu.Lock() + defer o.mu.Unlock() + *dst = append(*dst, k) +} + +func (o *recordObserver) count(get func(*recordObserver) []string) int { + o.mu.Lock() + defer o.mu.Unlock() + return len(get(o)) +} + +func (o *recordObserver) pushedCount() int { + return o.count(func(r *recordObserver) []string { return r.pushed }) +} + +func (o *recordObserver) skippedCount() int { + return o.count(func(r *recordObserver) []string { return r.skipped }) +} + +func (o *recordObserver) droppedCount() int { + return o.count(func(r *recordObserver) []string { return r.dropped }) +} + +func (o *recordObserver) channelFullCount() int { + return o.count(func(r *recordObserver) []string { return r.channelFull }) +} + +func ready(p *ping) Producer[ping] { + return func(context.Context) (*ping, bool) { return p, true } +} + +func never() Producer[ping] { + return func(context.Context) (*ping, bool) { return nil, false } +} + +func recv(t *testing.T, ch <-chan *ping) *ping { + t.Helper() + select { + case v := <-ch: + return v + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for a push") + return nil + } +} + +func assertNoPush(t *testing.T, ch <-chan *ping) { + t.Helper() + select { + case v := <-ch: + t.Fatalf("unexpected push: %+v", v) + case <-time.After(100 * time.Millisecond): + } +} + +func TestSubmit_pushesWhenReady(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + ch, cleanup, err := r.AddReceiver("c1") + require.NoError(t, err) + defer cleanup() + + r.Submit("c1", ready(&ping{ID: 
"eb1"})) + + assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch)) +} + +func TestSubmit_noSubscribers_doesNotEnqueue(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + var calls int + r.Submit("c1", func(context.Context) (*ping, bool) { + calls++ + return &ping{ID: "x"}, true + }) + + // No subscribers β†’ Submit returns on the fast path without enqueuing, so the + // producer is never invoked. + assert.Equal(t, 0, calls) +} + +func TestSubmit_emptyKey_isNoop(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + var calls int + r.Submit("", func(context.Context) (*ping, bool) { + calls++ + return nil, true + }) + assert.Equal(t, 0, calls) +} + +func TestSubmit_lagThenReady(t *testing.T) { + r := New[ping](quiet(), WithReadRetry(50, time.Millisecond)) + defer r.Close() + + ch, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + var mu sync.Mutex + calls := 0 + r.Submit("c1", func(context.Context) (*ping, bool) { + mu.Lock() + defer mu.Unlock() + calls++ + // Not visible on the first read; visible thereafter. 
+ return &ping{ID: "eb1"}, calls > 1 + }) + + assert.Equal(t, &ping{ID: "eb1"}, recv(t, ch)) +} + +func TestSubmit_neverReady_skipsAndReports(t *testing.T) { + obs := &recordObserver{} + r := New[ping](quiet(), WithObserver(obs), WithReadRetry(2, time.Millisecond)) + defer r.Close() + + ch, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + r.Submit("c1", never()) + + assert.Eventually(t, func() bool { return obs.skippedCount() == 1 }, time.Second, 5*time.Millisecond) + assert.Equal(t, 0, obs.pushedCount()) + assertNoPush(t, ch) +} + +func TestObserver_pushedOnDelivery(t *testing.T) { + obs := &recordObserver{} + r := New[ping](quiet(), WithObserver(obs)) + defer r.Close() + + ch, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + r.Submit("c1", ready(&ping{ID: "eb1"})) + recv(t, ch) + + assert.Eventually(t, func() bool { return obs.pushedCount() == 1 }, time.Second, 5*time.Millisecond) +} + +func TestPush_allSubscribersOfKey(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + chA, cleanupA, _ := r.AddReceiver("c1") + defer cleanupA() + chB, cleanupB, _ := r.AddReceiver("c1") + defer cleanupB() + + r.Submit("c1", ready(&ping{ID: "eb1"})) + + assert.Equal(t, "eb1", recv(t, chA).ID) + assert.Equal(t, "eb1", recv(t, chB).ID) +} + +func TestSubmit_keysIsolated(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + chA, cleanupA, _ := r.AddReceiver("c1") + defer cleanupA() + chB, cleanupB, _ := r.AddReceiver("c2") + defer cleanupB() + + r.Submit("c1", ready(&ping{ID: "eb1"})) + + assert.Equal(t, "eb1", recv(t, chA).ID) + assertNoPush(t, chB) +} + +// TestPerKeyOrdering asserts that events for one key are delivered FIFO even +// across the worker pool (same key β†’ same shard β†’ one worker). This guards the +// ordering guarantee a payload the client consumes directly relies on. 
+func TestPerKeyOrdering(t *testing.T) { + r := New[ping](quiet(), WithWorkers(4)) + defer r.Close() + + ch, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + const n = 10 + for i := range n { + r.Submit("c1", ready(&ping{ID: fmt.Sprintf("e%02d", i)})) + } + for i := range n { + assert.Equal(t, fmt.Sprintf("e%02d", i), recv(t, ch).ID) + } +} + +func TestAddReceiver_emptyKeyErrors(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + ch, cleanup, err := r.AddReceiver("") + assert.ErrorIs(t, err, ErrEmptyKey) + assert.Nil(t, ch) + assert.Nil(t, cleanup) +} + +func TestRemoveReceiver_closesChannel(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + ch, cleanup, _ := r.AddReceiver("c1") + cleanup() + + // Draining a closed channel terminates the range/returns ok=false. + _, ok := <-ch + assert.False(t, ok) +} + +func TestRemoveReceiver_idempotent(t *testing.T) { + r := New[ping](quiet()) + defer r.Close() + + _, cleanup, _ := r.AddReceiver("c1") + cleanup() + assert.NotPanics(t, cleanup) // second call is a no-op +} + +func TestChannelFull_dropsAndReports(t *testing.T) { + obs := &recordObserver{} + // One worker so the two pushes are serialized; buffer 1 so the second drops. 
+ r := New[ping](quiet(), WithObserver(obs), WithBufferSize(1), WithWorkers(1)) + defer r.Close() + + _, cleanup, _ := r.AddReceiver("c1") // never read from the channel + defer cleanup() + + r.Submit("c1", ready(&ping{ID: "first"})) + r.Submit("c1", ready(&ping{ID: "second"})) + + assert.Eventually(t, func() bool { return obs.channelFullCount() == 1 }, time.Second, 5*time.Millisecond) +} + +func TestSubmit_queueFull_dropsAndReports(t *testing.T) { + obs := &recordObserver{} + started := make(chan struct{}) + release := make(chan struct{}) + + r := New[ping](quiet(), WithObserver(obs), WithWorkers(1), WithQueueSize(1), WithReadRetry(0, time.Millisecond)) + defer r.Close() // runs last + defer close(release) // runs before Close, unblocking the worker + + _, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + var once sync.Once + r.Submit("c1", func(context.Context) (*ping, bool) { + once.Do(func() { close(started) }) + <-release + return &ping{ID: "blocking"}, true + }) + <-started // the single worker is now blocked in produce; its shard queue is empty + + r.Submit("c1", ready(&ping{ID: "fills-queue"})) // occupies the cap-1 shard queue + r.Submit("c1", ready(&ping{ID: "dropped"})) // queue full β†’ dropped + + assert.Eventually(t, func() bool { return obs.droppedCount() >= 1 }, time.Second, 5*time.Millisecond) +} + +func TestClose_isIdempotentAndSubmitAfterCloseIsNoop(t *testing.T) { + r := New[ping](quiet()) + r.Close() + r.Close() // idempotent + + assert.NotPanics(t, func() { + r.Submit("c1", ready(&ping{ID: "x"})) + }) +} + +// TestClose_cancelsInFlightGate asserts Close returns promptly even while a +// worker is mid-gate, rather than blocking for the full retry budget (~5s by +// default). Guards the registry-level context cancellation. 
+func TestClose_cancelsInFlightGate(t *testing.T) { + started := make(chan struct{}) + r := New[ping](quiet()) // default retry budget β‰ˆ 5s + + _, cleanup, _ := r.AddReceiver("c1") + defer cleanup() + + var once sync.Once + r.Submit("c1", func(context.Context) (*ping, bool) { + once.Do(func() { close(started) }) + return nil, false // never ready β†’ worker stays in the gate loop + }) + <-started // worker is now in the gate loop + + done := make(chan struct{}) + go func() { r.Close(); close(done) }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Close did not return promptly while a worker was gating") + } +} + +// TestConcurrentChurn stresses the race between removeReceiver (which closes a +// subscriber channel under the write lock) and a worker push (which sends under +// the read lock). Before sends were moved under the read lock this panicked +// with "send on closed channel". Run with -race. +func TestConcurrentChurn(t *testing.T) { + r := New[ping](quiet(), WithReadRetry(0, time.Millisecond), WithWorkers(8)) + defer r.Close() + + // A long-lived reader so there is always at least one subscriber being + // pushed to while others churn. + _, steadyCleanup, _ := r.AddReceiver("c1") + defer steadyCleanup() + go func() { + ch, _, _ := r.AddReceiver("c1") + for range ch { //nolint:revive // drain + } + }() + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + for range 2000 { + _, cleanup, err := r.AddReceiver("c1") + if err != nil { + t.Errorf("AddReceiver: %v", err) + return + } + cleanup() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for range 2000 { + r.Submit("c1", ready(&ping{ID: "x"})) + } + }() + + wg.Wait() +}