diff --git a/cache/cache.go b/cache/cache.go index d35f65b..0f92e39 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -102,6 +102,30 @@ func (c *Cache) Services(orgId, ref, lastUpdate string) ([]string, string) { return services, c.lastUpdate[key] } +// OrgRef identifies a single (organizationId, ref) pair that the cache +// tracks subgraphs for. +type OrgRef struct { + OrgId string + Ref string +} + +// AllOrgRefs returns every (orgId, ref) pair that currently has at least +// one subgraph in the cache. Used by startup warmup to pre-compute the +// merged SDL and SchemaUpdate for every known ref before the pod starts +// serving traffic. +func (c *Cache) AllOrgRefs() []OrgRef { + c.mu.RLock() + defer c.mu.RUnlock() + + var out []OrgRef + for orgId, refs := range c.services { + for ref := range refs { + out = append(out, OrgRef{OrgId: orgId, Ref: ref}) + } + } + return out +} + func (c *Cache) SubGraphId(orgId, ref, service string) string { c.mu.RLock() defer c.mu.RUnlock() diff --git a/cmd/service/service.go b/cmd/service/service.go index 8d99d70..2332dae 100644 --- a/cmd/service/service.go +++ b/cmd/service/service.go @@ -210,6 +210,8 @@ func start(closeEvents chan error, logger *slog.Logger, connectToAmqpFunc func(u Debouncer: graph.NewDebouncer(500 * time.Millisecond), } + resolver.WarmCache(rootCtx) + config := generated.Config{ Resolvers: resolver, Complexity: generated.ComplexityRoot{}, diff --git a/graph/resolver.go b/graph/resolver.go index 706be97..2f2f7b4 100644 --- a/graph/resolver.go +++ b/graph/resolver.go @@ -8,7 +8,9 @@ import ( "gitlab.com/unboundsoftware/eventsourced/eventsourced" "gitea.unbound.se/unboundsoftware/schemas/cache" + "gitea.unbound.se/unboundsoftware/schemas/graph/model" "gitea.unbound.se/unboundsoftware/schemas/middleware" + "gitea.unbound.se/unboundsoftware/schemas/sdlmerge" ) //go:generate go run github.com/99designs/gqlgen @@ -60,3 +62,60 @@ func (r *Resolver) handler(ctx context.Context, aggregate eventsourced.Aggregate func apiKeyId(orgId, name string) string { return fmt.Sprintf("%s-%s", orgId, name) } + +// WarmCache precomputes the merged SDL and SchemaUpdate (cosmo router +// config) for every (orgId, ref) tracked in the cache. Intended to run +// once at startup, after the event-sourced caches have been populated +// but before the pod accepts traffic, so the first request per ref does +// not pay the cold-start cost of running sdlmerge + wgc compose. +// +// Errors per ref are logged and skipped rather than aborting the whole +// warmup: a single bad ref must not block the pod from serving the +// remaining refs. +func (r *Resolver) WarmCache(ctx context.Context) { + refs := r.Cache.AllOrgRefs() + r.Logger.Info("Warming schema cache on startup", "refCount", len(refs)) + + for _, or := range refs { + services, lastUpdate := r.Cache.Services(or.OrgId, or.Ref, "") + if len(services) == 0 { + continue + } + + subGraphs := make([]*model.SubGraph, len(services)) + serviceSDLs := make([]string, len(services)) + for i, id := range services { + sg, err := r.fetchSubGraph(ctx, id) + if err != nil { + r.Logger.Error("warmup: fetch subgraph", "error", err, "orgId", or.OrgId, "ref", or.Ref, "id", id) + subGraphs = nil + break + } + subGraphs[i] = r.toGqlSubGraph(sg) + serviceSDLs[i] = sg.Sdl + } + if subGraphs == nil { + continue + } + + if sdl, err := sdlmerge.MergeSDLs(serviceSDLs...); err != nil { + r.Logger.Error("warmup: merge SDLs", "error", err, "orgId", or.OrgId, "ref", or.Ref) + } else { + r.Cache.SetMergedSDL(or.OrgId, or.Ref, lastUpdate, sdl, subGraphs) + } + + cosmoConfig, err := r.CosmoGenerator.Generate(ctx, subGraphs) + if err != nil { + r.Logger.Error("warmup: generate cosmo config", "error", err, "orgId", or.OrgId, "ref", or.Ref) + continue + } + r.Cache.SetSchemaUpdate(or.OrgId, or.Ref, &model.SchemaUpdate{ + Ref: or.Ref, + ID: lastUpdate, + SubGraphs: subGraphs, + CosmoRouterConfig: &cosmoConfig, + }) + } + + r.Logger.Info("Schema cache warmup complete", "refCount", len(refs)) +}