diff --git a/cmd/service/service.go b/cmd/service/service.go index 686f398..5b287c6 100644 --- a/cmd/service/service.go +++ b/cmd/service/service.go @@ -84,6 +84,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url } eventStore, err := pg.New( + rootCtx, db.DB, pg.WithEventTypes( &domain.SubGraphUpdated{}, @@ -92,29 +93,32 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url if err != nil { return fmt.Errorf("failed to create eventstore: %v", err) } - eventPublisher, err := goamqp.NewPublisher( + publisher, err := goamqp.NewPublisher( goamqp.Route{ Type: domain.SubGraphUpdated{}, Key: "SubGraph.Updated", }, ) + if err != nil { + return fmt.Errorf("failed to create publisher: %v", err) + } + eventPublisher, err := amqp.New(publisher) if err != nil { return fmt.Errorf("failed to create event publisher: %v", err) } - amqp.New(eventPublisher) conn, err := connectToAmqpFunc(cli.AmqpURL) if err != nil { return fmt.Errorf("failed to connect to AMQP: %v", err) } serviceCache := cache.New(logger) - roots, err := eventStore.GetAggregateRoots(reflect.TypeOf(domain.SubGraph{})) + roots, err := eventStore.GetAggregateRoots(rootCtx, reflect.TypeOf(domain.SubGraph{})) if err != nil { return err } for _, root := range roots { subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())} - if _, err := eventsourced.NewHandler(subGraph, eventStore); err != nil { + if _, err := eventsourced.NewHandler(rootCtx, subGraph, eventStore); err != nil { return err } _, err := serviceCache.Update(subGraph, nil) @@ -126,7 +130,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url goamqp.UseLogger(logger.Errorf), goamqp.CloseListener(closeEvents), goamqp.WithPrefetchLimit(20), - goamqp.EventStreamPublisher(eventPublisher), + goamqp.EventStreamPublisher(publisher), goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}), } if err := conn.Start(rootCtx, setups...); err != nil { @@ -187,7 +191,7 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url resolver := &graph.Resolver{ EventStore: eventStore, - Publisher: amqp.New(eventPublisher), + Publisher: eventPublisher, Logger: logger, Cache: serviceCache, } diff --git a/go.mod b/go.mod index b56b67d..20f945e 100644 --- a/go.mod +++ b/go.mod @@ -14,9 +14,9 @@ require ( github.com/stretchr/testify v1.8.1 github.com/vektah/gqlparser/v2 v2.5.1 github.com/wundergraph/graphql-go-tools v1.57.2-0.20221005155749-a4fdba38990b - gitlab.com/unboundsoftware/eventsourced/amqp v1.5.0 - gitlab.com/unboundsoftware/eventsourced/eventsourced v1.9.3 - gitlab.com/unboundsoftware/eventsourced/pg v1.9.0 + gitlab.com/unboundsoftware/eventsourced/amqp v1.6.4 + gitlab.com/unboundsoftware/eventsourced/eventsourced v1.11.3 + gitlab.com/unboundsoftware/eventsourced/pg v1.10.3 ) require ( @@ -30,7 +30,7 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/kr/text v0.2.0 // indirect - github.com/lib/pq v1.10.6 // indirect + github.com/lib/pq v1.10.7 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index ce39a72..deead3b 100644 --- a/go.sum +++ b/go.sum @@ -31,7 +31,6 @@ github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U github.com/bradleyjkemp/cupaloy/v2 v2.6.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0= github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= -github.com/caarlos0/env v3.5.0+incompatible/go.mod h1:tdCsowwCzMLdkqRYDlHpZCp2UooDD3MspDBjZ2AD02Y= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -86,8 +85,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs= -github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= +github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/logrusorgru/aurora/v3 v3.0.0/go.mod h1:vsR12bk5grlLvLXAYrBsb5Oc/N+LxAlxggSjiwMnCUc= github.com/matryer/moq v0.2.3/go.mod h1:9RtPYjTnH1bSBIkpvtHkFN7nbWAnO7oRpdJkEIn6UtE= github.com/matryer/moq v0.2.7/go.mod h1:kITsx543GOENm48TUAQyJ9+SAvFSr7iGQXPoth/VUBk= @@ -120,7 +119,6 @@ github.com/qri-io/jsonpointer v0.1.1 h1:prVZBZLL6TW5vsSB9fFHFAMBLI4b0ri5vribQlTJ github.com/qri-io/jsonpointer v0.1.1/go.mod h1:DnJPaYgiKu56EuDp8TU5wFLdZIcAnb/uH9v37ZaMV64= github.com/qri-io/jsonschema v0.2.1 h1:NNFoKms+kut6ABPf6xiKNM5214jzxAhDBrPHCJ97Wg0= github.com/qri-io/jsonschema v0.2.1/go.mod h1:g7DPkiOsK1xv6T/Ao5scXRkd+yTFygcANPBaaqW+VrI= -github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rogpeppe/fastuuid v1.1.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -139,7 +137,6 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= github.com/smartystreets/gunit v1.0.0/go.mod h1:qwPWnhz6pn0NnRBP++URONOVyNkPyr4SauJk4cUOwJs= -github.com/sparetimecoders/goamqp v0.1.1/go.mod h1:JIydmIgCqETEHIiGYmN03gNSs2bghWBHEqnR/Lfmzb0= github.com/sparetimecoders/goamqp v0.1.3 h1:+i9gtgFm4ffSgX20/xkCxSBiM5cZ+/13hzVFcpLrCz4= github.com/sparetimecoders/goamqp v0.1.3/go.mod h1:BKUl32yHsxpKEZEn7oEgyKB8Y0C4dk5n+17FModO6iM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -153,7 +150,6 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.3/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -189,14 +185,13 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -gitlab.com/unboundsoftware/eventsourced/amqp v1.5.0 h1:YJu6oD8vzxuSZZqPv2N3UyQzvWDWW0h/XTHiy0aiWSo= -gitlab.com/unboundsoftware/eventsourced/amqp v1.5.0/go.mod h1:dDShzDLym/gM7Mad26Y0K2TTLUy97hO46XC69Zi91qQ= -gitlab.com/unboundsoftware/eventsourced/eventsourced v1.9.0/go.mod h1:nm3W8Gr8shALbgBepOocFRqDenG/oZFBON8WLIlOvck= -gitlab.com/unboundsoftware/eventsourced/eventsourced v1.9.2/go.mod h1:i1Woh2JHIgAK27nFxS7q6/fAceSqX4VR7PYTizY5EMI= -gitlab.com/unboundsoftware/eventsourced/eventsourced v1.9.3 h1:Khls+eq34tovtDBnSPHpC4tKswUlNDCYHH9Vl5A+hTo= -gitlab.com/unboundsoftware/eventsourced/eventsourced v1.9.3/go.mod h1:c4rRdyIFW2s49hnLv5c5HsfL3I144mLnvvIOb0JZdpk= -gitlab.com/unboundsoftware/eventsourced/pg v1.9.0 h1:Hr8kgACHTFICDMV71waf/+CZVKkKJWzXqPGbomMUs70= -gitlab.com/unboundsoftware/eventsourced/pg v1.9.0/go.mod h1:tfwAGkvmnCpJCAIr94hljJzZXm+W1aHAXjuYJXbXYiI= +gitlab.com/unboundsoftware/eventsourced/amqp v1.6.4 h1:k9xA5fo3zvP2W5GZseAytAivJvVvBKezn5acZssrgvk= +gitlab.com/unboundsoftware/eventsourced/amqp v1.6.4/go.mod h1:XHg6Men3GHsA/x9ln+atApW4ST2ZHVMp3NPnxW51JoA= +gitlab.com/unboundsoftware/eventsourced/eventsourced v1.11.2/go.mod h1:vGYGhwwjQjal7d+niWo4wKZ6ZI1zc1ehHPBfPbc2ICg= +gitlab.com/unboundsoftware/eventsourced/eventsourced v1.11.3 h1:ICrsTn9XB7aKV06qMaedM4s+rXhqtY2W9rscC4zFQqc= +gitlab.com/unboundsoftware/eventsourced/eventsourced v1.11.3/go.mod h1:vGYGhwwjQjal7d+niWo4wKZ6ZI1zc1ehHPBfPbc2ICg= +gitlab.com/unboundsoftware/eventsourced/pg v1.10.3 h1:bmkondNhHrnOTFhFVpCVKjcbqCvx3WZP7DsevbVaK3o= +gitlab.com/unboundsoftware/eventsourced/pg v1.10.3/go.mod h1:qTO/V7FxIHP8gZQTyuxs29+KGRjzKDgp+XIrPWG65Lg= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/graph/resolver.go b/graph/resolver.go index 55a1df0..56de381 100644 --- a/graph/resolver.go +++ b/graph/resolver.go @@ -1,6 +1,8 @@ package graph import ( + "context" + "github.com/apex/log" "gitlab.com/unboundsoftware/eventsourced/eventsourced" @@ -14,8 +16,7 @@ import ( // It serves as dependency injection for your app, add any dependencies you require here. type Publisher interface { - Publish(event eventsourced.Event) error - Stop() error + Publish(ctx context.Context, event eventsourced.Event) error } type Resolver struct { @@ -25,6 +26,6 @@ type Resolver struct { Cache *cache.Cache } -func (r *Resolver) handler(aggregate eventsourced.Aggregate) (eventsourced.CommandHandler, error) { - return eventsourced.NewHandler(aggregate, r.EventStore, eventsourced.WithEventPublisher(r.Publisher)) +func (r *Resolver) handler(ctx context.Context, aggregate eventsourced.Aggregate) (eventsourced.CommandHandler, error) { + return eventsourced.NewHandler(ctx, aggregate, r.EventStore, eventsourced.WithEventPublisher(r.Publisher)) } diff --git a/graph/schema.helpers.go b/graph/schema.helpers.go index 6cb1b02..6dc24d2 100644 --- a/graph/schema.helpers.go +++ b/graph/schema.helpers.go @@ -1,15 +1,17 @@ package graph import ( + "context" + "gitlab.com/unboundsoftware/eventsourced/eventsourced" "gitlab.com/unboundsoftware/schemas/domain" "gitlab.com/unboundsoftware/schemas/graph/model" ) -func (r *Resolver) fetchSubGraph(subGraphId string) (*domain.SubGraph, error) { +func (r *Resolver) fetchSubGraph(ctx context.Context, subGraphId string) (*domain.SubGraph, error) { subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(subGraphId)} - _, err := r.handler(subGraph) + _, err := r.handler(ctx, subGraph) if err != nil { return nil, err } diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 58af27c..3086782 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -24,7 +24,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input if subGraphId != "" { subGraph.BaseAggregate = eventsourced.BaseAggregateFromString(subGraphId) } - handler, err := r.handler(subGraph) + handler, err := r.handler(ctx, subGraph) if err != nil { return nil, err } @@ -36,7 +36,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input serviceSDLs := []string{input.Sdl} services, _ := r.Cache.Services(input.Ref, "") for _, id := range services { - sg, err := r.fetchSubGraph(id) + sg, err := r.fetchSubGraph(ctx, id) if err != nil { return nil, err } @@ -48,7 +48,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input if err != nil { return nil, err } - _, err = handler.Handle(domain.UpdateSubGraph{ + _, err = handler.Handle(ctx, domain.UpdateSubGraph{ Ref: input.Ref, Service: input.Service, Url: input.URL, @@ -89,7 +89,7 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str } subGraphs := make([]*model.SubGraph, len(services)) for i, id := range services { - sg, err := r.fetchSubGraph(id) + sg, err := r.fetchSubGraph(ctx, id) if err != nil { return nil, err }