package graph import ( "context" "fmt" "log/slog" "gitlab.com/unboundsoftware/eventsourced/eventsourced" "gitea.unbound.se/unboundsoftware/schemas/cache" "gitea.unbound.se/unboundsoftware/schemas/graph/model" "gitea.unbound.se/unboundsoftware/schemas/middleware" "gitea.unbound.se/unboundsoftware/schemas/sdlmerge" ) //go:generate go run github.com/99designs/gqlgen //go:generate gofumpt -w . //go:generate goimports -w -local gitea.unbound.se/unboundsoftware/schemas . // This file will not be regenerated automatically. // // It serves as dependency injection for your app, add any dependencies you require here. type Publisher interface { Publish(ctx context.Context, event eventsourced.Event) error } type Resolver struct { EventStore eventsourced.EventStore Publisher Publisher Logger *slog.Logger Cache *cache.Cache PubSub *PubSub CosmoGenerator *CosmoGenerator Debouncer *Debouncer } func (r *Resolver) apiKeyCanAccessRef(ctx context.Context, ref string, publish bool) (string, error) { key, err := middleware.ApiKeyFromContext(ctx) if err != nil { return "", err } apiKey := r.Cache.ApiKeyByKey(key) if publish && !apiKey.Publish { return "", fmt.Errorf("provided API-key doesn't have publish privilege") } if !publish && !apiKey.Read { return "", fmt.Errorf("provided API-key doesn't have read privilege") } for _, rr := range apiKey.Refs { if rr == ref { return apiKey.Name, nil } } return "", fmt.Errorf("provided API-key doesn't have the required privilege on the requested Schema Ref") } func (r *Resolver) handler(ctx context.Context, aggregate eventsourced.Aggregate) (eventsourced.CommandHandler, error) { return eventsourced.NewHandler(ctx, aggregate, r.EventStore, eventsourced.WithEventPublisher(r.Publisher)) } func apiKeyId(orgId, name string) string { return fmt.Sprintf("%s-%s", orgId, name) } // WarmCache precomputes the merged SDL and SchemaUpdate (cosmo router // config) for every (orgId, ref) tracked in the cache. Intended to run // once at startup, after the event-sourced caches have been populated // but before the pod accepts traffic, so the first request per ref does // not pay the cold-start cost of running sdlmerge + wgc compose. // // Errors per ref are logged and skipped rather than aborting the whole // warmup: a single bad ref must not block the pod from serving the // remaining refs. func (r *Resolver) WarmCache(ctx context.Context) { refs := r.Cache.AllOrgRefs() r.Logger.Info("Warming schema cache on startup", "refCount", len(refs)) for _, or := range refs { services, lastUpdate := r.Cache.Services(or.OrgId, or.Ref, "") if len(services) == 0 { continue } subGraphs := make([]*model.SubGraph, len(services)) serviceSDLs := make([]string, len(services)) for i, id := range services { sg, err := r.fetchSubGraph(ctx, id) if err != nil { r.Logger.Error("warmup: fetch subgraph", "error", err, "orgId", or.OrgId, "ref", or.Ref, "id", id) subGraphs = nil break } subGraphs[i] = r.toGqlSubGraph(sg) serviceSDLs[i] = sg.Sdl } if subGraphs == nil { continue } if sdl, err := sdlmerge.MergeSDLs(serviceSDLs...); err != nil { r.Logger.Error("warmup: merge SDLs", "error", err, "orgId", or.OrgId, "ref", or.Ref) } else { r.Cache.SetMergedSDL(or.OrgId, or.Ref, lastUpdate, sdl, subGraphs) } cosmoConfig, err := r.CosmoGenerator.Generate(ctx, subGraphs) if err != nil { r.Logger.Error("warmup: generate cosmo config", "error", err, "orgId", or.OrgId, "ref", or.Ref) continue } r.Cache.SetSchemaUpdate(or.OrgId, or.Ref, &model.SchemaUpdate{ Ref: or.Ref, ID: lastUpdate, SubGraphs: subGraphs, CosmoRouterConfig: &cosmoConfig, }) } r.Logger.Info("Schema cache warmup complete", "refCount", len(refs)) }