package main import ( "context" "fmt" "net/http" "os" "os/signal" "reflect" "sync" "syscall" "github.com/99designs/gqlgen/graphql/handler" "github.com/99designs/gqlgen/graphql/playground" "github.com/alecthomas/kong" "github.com/apex/log" "github.com/apex/log/handlers/json" sentryhttp "github.com/getsentry/sentry-go/http" "github.com/rs/cors" "github.com/sparetimecoders/goamqp" "gitlab.com/unboundsoftware/eventsourced/amqp" "gitlab.com/unboundsoftware/eventsourced/eventsourced" "gitlab.com/unboundsoftware/eventsourced/pg" "gitlab.com/unboundsoftware/schemas/cache" "gitlab.com/unboundsoftware/schemas/domain" "gitlab.com/unboundsoftware/schemas/graph" "gitlab.com/unboundsoftware/schemas/graph/generated" "gitlab.com/unboundsoftware/schemas/middleware" "gitlab.com/unboundsoftware/schemas/store" ) var CLI struct { AmqpURL string `name:"amqp-url" env:"AMQP_URL" help:"URL to use to connect to RabbitMQ" default:"amqp://user:password@localhost:5672/"` Port int `name:"port" env:"PORT" help:"Listen-port for GraphQL API" default:"8080"` APIKey string `name:"api-key" env:"API_KEY" help:"The API-key that is required"` LogLevel string `name:"log-level" env:"LOG_LEVEL" help:"The level of logging to use (debug, info, warn, error, fatal)" default:"info"` DatabaseURL string `name:"postgres-url" env:"POSTGRES_URL" help:"URL to use to connect to Postgres" default:"postgres://postgres:postgres@:5432/schemas?sslmode=disable"` DatabaseDriverName string `name:"db-driver" env:"DB_DRIVER" help:"Driver to use to connect to db" default:"postgres"` } const serviceName = "schemas" func main() { _ = kong.Parse(&CLI) log.SetHandler(json.New(os.Stdout)) log.SetLevelFromString(CLI.LogLevel) logger := log.WithField("service", serviceName) closeEvents := make(chan error) if err := start( closeEvents, logger, ConnectAMQP, ); err != nil { logger.WithError(err).Error("process error") } } func start(closeEvents chan error, logger *log.Entry, connectToAmqpFunc func(url string) (Connection, error)) error { rootCtx, rootCancel := context.WithCancel(context.Background()) defer rootCancel() db, err := store.SetupDB(CLI.DatabaseDriverName, CLI.DatabaseURL) if err != nil { return fmt.Errorf("failed to setup DB: %v", err) } eventStore, err := pg.New( db.DB, pg.WithEventTypes( &domain.SubGraphUpdated{}, ), ) if err != nil { return fmt.Errorf("failed to create eventstore: %v", err) } eventPublisher, err := goamqp.NewPublisher( goamqp.Route{ Type: domain.SubGraphUpdated{}, Key: "SubGraph.Updated", }, ) if err != nil { return fmt.Errorf("failed to create event publisher: %v", err) } amqp.New(eventPublisher) conn, err := connectToAmqpFunc(CLI.AmqpURL) if err != nil { return fmt.Errorf("failed to connect to AMQP: %v", err) } serviceCache := cache.New(logger) roots, err := eventStore.GetAggregateRoots(reflect.TypeOf(domain.SubGraph{})) if err != nil { return err } for _, root := range roots { subGraph := &domain.SubGraph{BaseAggregate: eventsourced.BaseAggregateFromString(root.String())} if _, err := eventsourced.NewHandler(subGraph, eventStore); err != nil { return err } _, err := serviceCache.Update(subGraph, nil) if err != nil { return err } } setups := []goamqp.Setup{ goamqp.UseLogger(logger.Errorf), goamqp.CloseListener(closeEvents), goamqp.WithPrefetchLimit(20), goamqp.EventStreamPublisher(eventPublisher), goamqp.TransientEventStreamConsumer("SubGraph.Updated", serviceCache.Update, domain.SubGraphUpdated{}), } if err := conn.Start(setups...); err != nil { return fmt.Errorf("failed to setup AMQP: %v", err) } defer func() { _ = conn.Close() }() logger.Info("Started") mux := http.NewServeMux() httpSrv := &http.Server{Addr: fmt.Sprintf(":%d", CLI.Port), Handler: mux} wg := sync.WaitGroup{} sigint := make(chan os.Signal, 1) signal.Notify(sigint, os.Interrupt, syscall.SIGTERM) wg.Add(1) go func() { defer wg.Done() sig := <-sigint if sig != nil { // In case our shutdown logic is broken/incomplete we reset signal // handlers so next signal goes to go itself. Go is more aggressive when // shutting down goroutines signal.Reset(os.Interrupt, syscall.SIGTERM) logger.Info("Got shutdown signal..") rootCancel() } }() wg.Add(1) go func() { defer wg.Done() err := <-closeEvents if err != nil { logger.WithError(err).Error("received close from AMQP") rootCancel() } }() wg.Add(1) go func() { defer wg.Done() <-rootCtx.Done() if err := httpSrv.Close(); err != nil { logger.WithError(err).Error("close http server") } close(sigint) close(closeEvents) }() wg.Add(1) go func() { defer wg.Done() defer rootCancel() resolver := &graph.Resolver{ EventStore: eventStore, Publisher: amqp.New(eventPublisher), Logger: logger, Cache: serviceCache, } config := generated.Config{ Resolvers: resolver, Complexity: generated.ComplexityRoot{}, } apiKeyMiddleware := middleware.NewApiKey(CLI.APIKey, logger) config.Directives.HasApiKey = apiKeyMiddleware.Directive srv := handler.NewDefaultServer(generated.NewExecutableSchema( config, )) sentryHandler := sentryhttp.New(sentryhttp.Options{Repanic: true}) mux.Handle("/", sentryHandler.HandleFunc(playground.Handler("GraphQL playground", "/query"))) mux.Handle("/health", sentryHandler.HandleFunc(healthFunc)) mux.Handle("/query", cors.AllowAll().Handler(sentryHandler.Handle(apiKeyMiddleware.Handler(srv)))) logger.Infof("connect to http://localhost:%d/ for GraphQL playground", CLI.Port) if err := httpSrv.ListenAndServe(); err != nil { logger.WithError(err).Error("listen http") } }() wg.Wait() return nil } func healthFunc(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write([]byte("OK")) } func ConnectAMQP(url string) (Connection, error) { return goamqp.NewFromURL(serviceName, url) } type Connection interface { Start(opts ...goamqp.Setup) error Close() error }