From e2ab13168467d93419e16de97989471786ffe0ef Mon Sep 17 00:00:00 2001 From: Joakim Olsson Date: Tue, 26 May 2026 19:47:59 +0200 Subject: [PATCH] feat: add eventsourced MetricsRecorder adapter for OpenTelemetry NewEventsourcedMetrics returns an eventsourced.MetricsRecorder that maps the framework's Metric values (command duration, event store/load, snapshots, idempotency checks) onto OTel instruments on the global MeterProvider set by SetupOTelSDK. Intended for pg.WithMetrics and eventsourced.WithMetrics. --- go.mod | 3 +- go.sum | 2 + metrics.go | 123 ++++++++++++++++++++++++++++++++++++++++++++++++ metrics_test.go | 75 +++++++++++++++++++++++++++++ 4 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 metrics.go create mode 100644 metrics_test.go diff --git a/go.mod b/go.mod index 5b38aea..8f006a0 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,13 @@ go 1.25.0 require ( github.com/99designs/gqlgen v0.17.90 + gitlab.com/unboundsoftware/eventsourced/eventsourced v1.23.0 go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.19.0 go.opentelemetry.io/otel/log v0.19.0 + go.opentelemetry.io/otel/metric v1.43.0 go.opentelemetry.io/otel/sdk v1.43.0 go.opentelemetry.io/otel/sdk/log v0.19.0 go.opentelemetry.io/otel/sdk/metric v1.43.0 @@ -26,7 +28,6 @@ require ( github.com/vektah/gqlparser/v2 v2.5.33 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect - go.opentelemetry.io/otel/metric v1.43.0 // indirect go.opentelemetry.io/proto/otlp v1.10.0 // indirect golang.org/x/net v0.52.0 // indirect golang.org/x/sync v0.20.0 // indirect diff --git a/go.sum b/go.sum index c7aec97..45dd6c3 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/vektah/gqlparser/v2 v2.5.33 h1:lRp8aIeNUNbimf/axZd7ETg24q06hBtPaas+TcvI/7E= github.com/vektah/gqlparser/v2 v2.5.33/go.mod h1:c1I28gSOVNzlfc4WuDlqU7voQnsqI6OG2amkBAFmgts= +gitlab.com/unboundsoftware/eventsourced/eventsourced v1.23.0 h1:qcteJH9D7kHaOgLQ0fzlW9dv42hSa0Vluqt7p4kooWA= +gitlab.com/unboundsoftware/eventsourced/eventsourced v1.23.0/go.mod h1:LrA7I7etRmhIC1PjO8c26BHm+gWsy2rC3eSMe5+XUWE= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..d9ab078 --- /dev/null +++ b/metrics.go @@ -0,0 +1,123 @@ +package otelsetup + +import ( + "context" + "errors" + + "gitlab.com/unboundsoftware/eventsourced/eventsourced" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// eventsourcedMeterName is the instrumentation scope for the event-sourcing +// metrics emitted by the adapter returned from NewEventsourcedMetrics. +const eventsourcedMeterName = "gitea.unbound.se/shiny/otelsetup/eventsourced" + +// durationBucketsSeconds are explicit histogram boundaries tuned for +// sub-second event-store and command latencies. The SDK default boundaries are +// scaled for milliseconds, which would bucket nearly every second-valued +// observation into the first bucket and make percentiles useless. +var durationBucketsSeconds = []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} + +// eventsourcedMetrics implements eventsourced.MetricsRecorder by translating +// the framework's Metric values into OpenTelemetry instruments registered on +// the global MeterProvider configured by SetupOTelSDK. +// +// The OTel metric instruments are safe for concurrent use, and the struct is +// immutable after construction, so Record may be called from multiple +// goroutines as the framework requires. +// +// Operation counts are read off each duration histogram's generated _count +// series rather than separate counters; the only standalone counters carry +// information a histogram count cannot (events.loaded sums the number of events +// per load, idempotency.checks counts lookups that have no duration). +type eventsourcedMetrics struct { + commandDuration metric.Float64Histogram + eventStoreDur metric.Float64Histogram + eventsLoaded metric.Int64Counter + eventLoadDur metric.Float64Histogram + snapshotStoreDur metric.Float64Histogram + snapshotLoadDur metric.Float64Histogram + idempotencyCheck metric.Int64Counter +} + +// NewEventsourcedMetrics builds an eventsourced.MetricsRecorder that records to +// the global OpenTelemetry MeterProvider. Pass the result to both +// pg.WithMetrics (for event-store operations) and eventsourced.WithMetrics +// (for command handling) so a single recorder covers store and handler +// metrics. +// +// SetupOTelSDK must have run first so the global MeterProvider is configured; +// when metrics are disabled the global provider is a no-op and recording is +// effectively free. +func NewEventsourcedMetrics() (eventsourced.MetricsRecorder, error) { + m := otel.Meter(eventsourcedMeterName) + var errs []error + hist := func(name, desc string) metric.Float64Histogram { + h, err := m.Float64Histogram( + name, + metric.WithDescription(desc), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(durationBucketsSeconds...), + ) + errs = append(errs, err) + return h + } + counter := func(name, desc string) metric.Int64Counter { + c, err := m.Int64Counter(name, metric.WithDescription(desc)) + errs = append(errs, err) + return c + } + + r := &eventsourcedMetrics{ + commandDuration: hist("eventsourced.command.duration", "Wall-clock time to process a command in Handle."), + eventStoreDur: hist("eventsourced.event.store.duration", "Time taken to persist a single event."), + eventsLoaded: counter("eventsourced.events.loaded", "Number of events loaded when rehydrating aggregates."), + eventLoadDur: hist("eventsourced.event.load.duration", "Time taken to load events for an aggregate."), + snapshotStoreDur: hist("eventsourced.snapshot.store.duration", "Time taken to persist a snapshot."), + snapshotLoadDur: hist("eventsourced.snapshot.load.duration", "Time taken to load a snapshot."), + idempotencyCheck: counter("eventsourced.idempotency.checks", "Number of command idempotency lookups."), + } + if err := errors.Join(errs...); err != nil { + return nil, err + } + return r, nil +} + +// Record implements eventsourced.MetricsRecorder. Metric types the adapter does +// not recognise (for example pg outbox metrics when the outbox is not enabled) +// are ignored. +func (e *eventsourcedMetrics) Record(ctx context.Context, raw eventsourced.Metric) { + switch m := raw.(type) { + case eventsourced.CommandDuration: + e.commandDuration.Record(ctx, m.Duration.Seconds(), metric.WithAttributes( + attribute.String("command.type", m.CommandType), + attribute.Bool("success", m.Success), + )) + case eventsourced.EventStored: + e.eventStoreDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes( + attribute.String("aggregate.type", m.AggregateType), + attribute.String("event.type", m.EventType), + )) + case eventsourced.EventsLoaded: + attrs := metric.WithAttributes(attribute.String("aggregate.type", m.AggregateType)) + e.eventsLoaded.Add(ctx, int64(m.EventCount), attrs) + e.eventLoadDur.Record(ctx, m.Duration.Seconds(), attrs) + case eventsourced.SnapshotStored: + e.snapshotStoreDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes( + attribute.String("aggregate.type", m.AggregateType), + attribute.Bool("success", m.Success), + )) + case eventsourced.SnapshotLoaded: + e.snapshotLoadDur.Record(ctx, m.Duration.Seconds(), metric.WithAttributes( + attribute.String("aggregate.type", m.AggregateType), + attribute.Bool("found", m.Found), + )) + case eventsourced.IdempotencyCheck: + e.idempotencyCheck.Add(ctx, 1, metric.WithAttributes( + attribute.String("aggregate.type", m.AggregateType), + attribute.Bool("hit", m.Hit), + )) + } +} diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 0000000..55423ee --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,75 @@ +package otelsetup + +import ( + "context" + "sort" + "testing" + "time" + + "gitlab.com/unboundsoftware/eventsourced/eventsourced" + "go.opentelemetry.io/otel" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +func TestNewEventsourcedMetrics_RecordsContract(t *testing.T) { + reader := sdkmetric.NewManualReader() + otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))) + + r, err := NewEventsourcedMetrics() + if err != nil { + t.Fatalf("NewEventsourcedMetrics returned error: %v", err) + } + if r == nil { + t.Fatal("NewEventsourcedMetrics returned nil recorder") + } + + // Recording every known metric type (and an unknown one) must not panic + // and must emit the expected instruments. + for _, m := range []eventsourced.Metric{ + eventsourced.CommandDuration{CommandType: "AddEntry", Duration: time.Millisecond, Success: true}, + eventsourced.EventStored{AggregateType: "Entry", EventType: "EntryAdded", Duration: time.Millisecond}, + eventsourced.EventsLoaded{AggregateType: "Entry", EventCount: 3, Duration: time.Millisecond}, + eventsourced.SnapshotStored{AggregateType: "Entry", Duration: time.Millisecond, Success: true}, + eventsourced.SnapshotLoaded{AggregateType: "Entry", Found: false, Duration: time.Millisecond}, + eventsourced.IdempotencyCheck{AggregateType: "Entry", Hit: true}, + unknownMetric{}, + } { + r.Record(context.Background(), m) + } + + var rm metricdata.ResourceMetrics + if err := reader.Collect(context.Background(), &rm); err != nil { + t.Fatalf("collect: %v", err) + } + + got := map[string]bool{} + for _, sm := range rm.ScopeMetrics { + for _, md := range sm.Metrics { + got[md.Name] = true + } + } + want := []string{ + "eventsourced.command.duration", + "eventsourced.event.store.duration", + "eventsourced.events.loaded", + "eventsourced.event.load.duration", + "eventsourced.snapshot.store.duration", + "eventsourced.snapshot.load.duration", + "eventsourced.idempotency.checks", + } + var missing []string + for _, w := range want { + if !got[w] { + missing = append(missing, w) + } + } + if len(missing) > 0 { + sort.Strings(missing) + t.Errorf("missing expected metrics: %v", missing) + } +} + +type unknownMetric struct{} + +func (unknownMetric) IsMetric() {} -- 2.52.0