feat: organizations and API keys
This commit is contained in:
+72
-16
@@ -35,10 +35,11 @@ import (
|
||||
type CLI struct {
|
||||
AmqpURL string `name:"amqp-url" env:"AMQP_URL" help:"URL to use to connect to RabbitMQ" default:"amqp://user:password@localhost:5672/"`
|
||||
Port int `name:"port" env:"PORT" help:"Listen-port for GraphQL API" default:"8080"`
|
||||
APIKey string `name:"api-key" env:"API_KEY" help:"The API-key that is required"`
|
||||
LogLevel string `name:"log-level" env:"LOG_LEVEL" help:"The level of logging to use (debug, info, warn, error, fatal)" default:"info"`
|
||||
DatabaseURL string `name:"postgres-url" env:"POSTGRES_URL" help:"URL to use to connect to Postgres" default:"postgres://postgres:postgres@:5432/schemas?sslmode=disable"`
|
||||
DatabaseDriverName string `name:"db-driver" env:"DB_DRIVER" help:"Driver to use to connect to db" default:"postgres"`
|
||||
Issuer string `name:"issuer" env:"ISSUER" help:"The JWT token issuer to use" default:"unbound.eu.auth0.com"`
|
||||
StrictSSL bool `name:"strict-ssl" env:"STRICT_SSL" help:"Should strict SSL handling be enabled" default:"true"`
|
||||
SentryConfig
|
||||
}
|
||||
|
||||
@@ -88,16 +89,31 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
|
||||
db.DB,
|
||||
pg.WithEventTypes(
|
||||
&domain.SubGraphUpdated{},
|
||||
&domain.OrganizationAdded{},
|
||||
&domain.APIKeyAdded{},
|
||||
),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create eventstore: %v", err)
|
||||
}
|
||||
|
||||
if err := store.RunEventStoreMigrations(db); err != nil {
|
||||
return fmt.Errorf("event migrations: %w", err)
|
||||
}
|
||||
|
||||
publisher, err := goamqp.NewPublisher(
|
||||
goamqp.Route{
|
||||
Type: domain.SubGraphUpdated{},
|
||||
Key: "SubGraph.Updated",
|
||||
},
|
||||
goamqp.Route{
|
||||
Type: domain.OrganizationAdded{},
|
||||
Key: "Organization.Added",
|
||||
},
|
||||
goamqp.Route{
|
||||
Type: domain.APIKeyAdded{},
|
||||
Key: "Organization.APIKeyAdded",
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create publisher: %v", err)
|
||||
@@ -112,19 +128,11 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
|
||||
}
|
||||
|
||||
serviceCache := cache.New(logger)
|
||||
roots, err := eventStore.GetAggregateRoots(rootCtx, reflect.TypeOf(domain.SubGraph{}))
|
||||
if err != nil {
|
||||
return err
|
||||
if err := loadOrganizations(rootCtx, eventStore, serviceCache); err != nil {
|
||||
return fmt.Errorf("caching organizations: %w", err)
|
||||
}
|
||||
for _, root := range roots {
|
||||
subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
|
||||
if _, err := eventsourced.NewHandler(rootCtx, subGraph, eventStore); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := serviceCache.Update(subGraph, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := loadSubGraphs(rootCtx, eventStore, serviceCache); err != nil {
|
||||
return fmt.Errorf("caching subgraphs: %w", err)
|
||||
}
|
||||
setups := []goamqp.Setup{
|
||||
goamqp.UseLogger(logger.Errorf),
|
||||
@@ -132,6 +140,8 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
|
||||
goamqp.WithPrefetchLimit(20),
|
||||
goamqp.EventStreamPublisher(publisher),
|
||||
goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}),
|
||||
goamqp.TransientEventStreamConsumer("Organization.Added", serviceCache.Update, domain.OrganizationAdded{}),
|
||||
goamqp.TransientEventStreamConsumer("Organization.APIKeyAdded", serviceCache.Update, domain.APIKeyAdded{}),
|
||||
}
|
||||
if err := conn.Start(rootCtx, setups...); err != nil {
|
||||
return fmt.Errorf("failed to setup AMQP: %v", err)
|
||||
@@ -200,8 +210,10 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
|
||||
Resolvers: resolver,
|
||||
Complexity: generated.ComplexityRoot{},
|
||||
}
|
||||
apiKeyMiddleware := middleware.NewApiKey(cli.APIKey, logger)
|
||||
config.Directives.HasApiKey = apiKeyMiddleware.Directive
|
||||
apiKeyMiddleware := middleware.NewApiKey()
|
||||
mw := middleware.NewAuth0("https://schemas.unbound.se", cli.Issuer, cli.StrictSSL)
|
||||
authMiddleware := middleware.NewAuth(serviceCache)
|
||||
config.Directives.Auth = authMiddleware.Directive
|
||||
srv := handler.NewDefaultServer(generated.NewExecutableSchema(
|
||||
config,
|
||||
))
|
||||
@@ -209,7 +221,15 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
|
||||
sentryHandler := sentryhttp.New(sentryhttp.Options{Repanic: true})
|
||||
mux.Handle("/", sentryHandler.HandleFunc(playground.Handler("GraphQL playground", "/query")))
|
||||
mux.Handle("/health", http.HandlerFunc(healthFunc))
|
||||
mux.Handle("/query", cors.AllowAll().Handler(sentryHandler.Handle(apiKeyMiddleware.Handler(srv))))
|
||||
mux.Handle("/query", cors.AllowAll().Handler(
|
||||
sentryHandler.Handle(
|
||||
mw.Middleware().CheckJWT(
|
||||
apiKeyMiddleware.Handler(
|
||||
authMiddleware.Handler(srv),
|
||||
),
|
||||
),
|
||||
),
|
||||
))
|
||||
|
||||
logger.Infof("connect to http://localhost:%d/ for GraphQL playground", cli.Port)
|
||||
|
||||
@@ -223,6 +243,42 @@ func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadOrganizations(ctx context.Context, eventStore eventsourced.EventStore, serviceCache *cache.Cache) error {
|
||||
roots, err := eventStore.GetAggregateRoots(ctx, reflect.TypeOf(domain.Organization{}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, root := range roots {
|
||||
organization := &domain.Organization{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
|
||||
if _, err := eventsourced.NewHandler(ctx, organization, eventStore); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := serviceCache.Update(organization, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadSubGraphs(ctx context.Context, eventStore eventsourced.EventStore, serviceCache *cache.Cache) error {
|
||||
roots, err := eventStore.GetAggregateRoots(ctx, reflect.TypeOf(domain.SubGraph{}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, root := range roots {
|
||||
subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())}
|
||||
if _, err := eventsourced.NewHandler(ctx, subGraph, eventStore); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := serviceCache.Update(subGraph, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func healthFunc(w http.ResponseWriter, _ *http.Request) {
|
||||
_, _ = w.Write([]byte("OK"))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user