diff --git a/Dockerfile b/Dockerfile index 3512d54..67e36fd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,9 +24,17 @@ RUN GOOS=linux GOARCH=amd64 go build \ FROM scratch as export COPY --from=build /build/coverage.txt / -FROM scratch +FROM node:22-alpine ENV TZ Europe/Stockholm + +# Install wgc CLI globally for Cosmo Router composition +RUN npm install -g wgc@latest + +# Copy timezone data and certificates COPY --from=build /usr/share/zoneinfo /usr/share/zoneinfo COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ + +# Copy the service binary COPY --from=build /release/service / + CMD ["/service"] diff --git a/go.mod b/go.mod index 127627f..6a4d2f0 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/apex/log v1.9.0 github.com/auth0/go-jwt-middleware/v2 v2.3.0 github.com/golang-jwt/jwt/v5 v5.3.0 + github.com/google/uuid v1.6.0 github.com/jmoiron/sqlx v1.4.0 github.com/pkg/errors v0.9.1 github.com/pressly/goose/v3 v3.26.0 @@ -30,6 +31,7 @@ require ( go.opentelemetry.io/otel/sdk/log v0.14.0 go.opentelemetry.io/otel/sdk/metric v1.38.0 go.opentelemetry.io/otel/trace v1.38.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -41,7 +43,6 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect @@ -51,6 +52,7 @@ require ( github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/sethvargo/go-retry v0.3.0 // indirect github.com/sosodev/duration v1.3.1 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/tidwall/gjson v1.17.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect @@ -72,5 +74,4 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect google.golang.org/grpc v1.75.0 // indirect google.golang.org/protobuf v1.36.10 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index fd5c8e7..3abb5fe 100644 --- a/go.sum +++ b/go.sum @@ -141,6 +141,8 @@ github.com/sosodev/duration v1.3.1/go.mod h1:RQIBBX0+fMLc/D9+Jb/fwvVmo0eZvDDEERA github.com/sparetimecoders/goamqp v0.3.3 h1:z/nfTPmrjeU/rIVuNOgsVLCimp3WFoNFvS3ZzXRJ6HE= github.com/sparetimecoders/goamqp v0.3.3/go.mod h1:W9NRCpWLE+Vruv2dcRSbszNil2O826d2Nv6kAkETW5o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= diff --git a/graph/cosmo.go b/graph/cosmo.go index 150e162..69deb88 100644 --- a/graph/cosmo.go +++ b/graph/cosmo.go @@ -1,54 +1,106 @@ package graph import ( - "encoding/json" "fmt" + "os" + "os/exec" + "path/filepath" + + "gopkg.in/yaml.v3" "gitlab.com/unboundsoftware/schemas/graph/model" ) // GenerateCosmoRouterConfig generates a Cosmo Router execution config from subgraphs +// using the official wgc CLI tool via npx func GenerateCosmoRouterConfig(subGraphs []*model.SubGraph) (string, error) { - // Build the Cosmo router config structure - // This is a simplified version - you may need to adjust based on actual Cosmo requirements - config := map[string]interface{}{ - "version": "1", - "subgraphs": convertSubGraphsToCosmo(subGraphs), - // Add other Cosmo-specific configuration as needed + if len(subGraphs) == 0 { + return "", fmt.Errorf("no subgraphs provided") } - // Marshal to JSON - configJSON, err := json.MarshalIndent(config, "", " ") + // Create a temporary directory for composition + tmpDir, err := os.MkdirTemp("", "cosmo-compose-*") if err != nil { - return "", fmt.Errorf("marshal cosmo config: %w", err) + return "", fmt.Errorf("create temp dir: %w", err) + } + defer os.RemoveAll(tmpDir) + + // Write each subgraph SDL to a file + type SubgraphConfig struct { + Name string `yaml:"name"` + RoutingURL string `yaml:"routing_url,omitempty"` + Schema map[string]string `yaml:"schema"` + Subscription map[string]interface{} `yaml:"subscription,omitempty"` } - return string(configJSON), nil -} + type InputConfig struct { + Version int `yaml:"version"` + Subgraphs []SubgraphConfig `yaml:"subgraphs"` + } -func convertSubGraphsToCosmo(subGraphs []*model.SubGraph) []map[string]interface{} { - cosmoSubgraphs := make([]map[string]interface{}, 0, len(subGraphs)) + inputConfig := InputConfig{ + Version: 1, + Subgraphs: make([]SubgraphConfig, 0, len(subGraphs)), + } for _, sg := range subGraphs { - cosmoSg := map[string]interface{}{ - "name": sg.Service, - "sdl": sg.Sdl, + // Write SDL to a temp file + schemaFile := filepath.Join(tmpDir, fmt.Sprintf("%s.graphql", sg.Service)) + if err := os.WriteFile(schemaFile, []byte(sg.Sdl), 0o644); err != nil { + return "", fmt.Errorf("write schema file for %s: %w", sg.Service, err) + } + + subgraphCfg := SubgraphConfig{ + Name: sg.Service, + Schema: map[string]string{ + "file": schemaFile, + }, } if sg.URL != nil { - cosmoSg["routing_url"] = *sg.URL + subgraphCfg.RoutingURL = *sg.URL } if sg.WsURL != nil { - cosmoSg["subscription"] = map[string]interface{}{ + subgraphCfg.Subscription = map[string]interface{}{ "url": *sg.WsURL, "protocol": "ws", "websocket_subprotocol": "graphql-ws", } } - cosmoSubgraphs = append(cosmoSubgraphs, cosmoSg) + inputConfig.Subgraphs = append(inputConfig.Subgraphs, subgraphCfg) } - return cosmoSubgraphs + // Write input config YAML + inputFile := filepath.Join(tmpDir, "input.yaml") + inputYAML, err := yaml.Marshal(inputConfig) + if err != nil { + return "", fmt.Errorf("marshal input config: %w", err) + } + if err := os.WriteFile(inputFile, inputYAML, 0o644); err != nil { + return "", fmt.Errorf("write input config: %w", err) + } + + // Execute wgc router compose + // wgc is installed globally in the Docker image + outputFile := filepath.Join(tmpDir, "config.json") + cmd := exec.Command("wgc", "router", "compose", + "--input", inputFile, + "--out", outputFile, + "--suppress-warnings", + ) + + output, err := cmd.CombinedOutput() + if err != nil { + return "", fmt.Errorf("wgc router compose failed: %w\nOutput: %s", err, string(output)) + } + + // Read the generated config + configJSON, err := os.ReadFile(outputFile) + if err != nil { + return "", fmt.Errorf("read output config: %w", err) + } + + return string(configJSON), nil } diff --git a/graph/cosmo_test.go b/graph/cosmo_test.go index 0f6de36..60cd3a0 100644 --- a/graph/cosmo_test.go +++ b/graph/cosmo_test.go @@ -203,55 +203,6 @@ func TestGenerateCosmoRouterConfig(t *testing.T) { } } -func TestConvertSubGraphsToCosmo(t *testing.T) { - tests := []struct { - name string - subGraphs []*model.SubGraph - wantLen int - validate func(t *testing.T, result []map[string]interface{}) - }{ - { - name: "preserves subgraph order", - subGraphs: []*model.SubGraph{ - {Service: "alpha", URL: stringPtr("http://a"), Sdl: "a"}, - {Service: "beta", URL: stringPtr("http://b"), Sdl: "b"}, - {Service: "gamma", URL: stringPtr("http://c"), Sdl: "c"}, - }, - wantLen: 3, - validate: func(t *testing.T, result []map[string]interface{}) { - assert.Equal(t, "alpha", result[0]["name"]) - assert.Equal(t, "beta", result[1]["name"]) - assert.Equal(t, "gamma", result[2]["name"]) - }, - }, - { - name: "includes SDL exactly as provided", - subGraphs: []*model.SubGraph{ - { - Service: "test", - URL: stringPtr("http://test"), - Sdl: "type Query { special: String! }", - }, - }, - wantLen: 1, - validate: func(t *testing.T, result []map[string]interface{}) { - assert.Equal(t, "type Query { special: String! }", result[0]["sdl"]) - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := convertSubGraphsToCosmo(tt.subGraphs) - assert.Len(t, result, tt.wantLen) - - if tt.validate != nil { - tt.validate(t, result) - } - }) - } -} - // Helper function for tests func stringPtr(s string) *string { return &s diff --git a/graph/generated/generated.go b/graph/generated/generated.go index b4e5fec..7ba0598 100644 --- a/graph/generated/generated.go +++ b/graph/generated/generated.go @@ -74,6 +74,7 @@ type ComplexityRoot struct { } Query struct { + LatestSchema func(childComplexity int, ref string) int Organizations func(childComplexity int) int Supergraph func(childComplexity int, ref string, isAfter *string) int } @@ -124,6 +125,7 @@ type MutationResolver interface { type QueryResolver interface { Organizations(ctx context.Context) ([]*model.Organization, error) Supergraph(ctx context.Context, ref string, isAfter *string) (model.Supergraph, error) + LatestSchema(ctx context.Context, ref string) (*model.SchemaUpdate, error) } type SubscriptionResolver interface { SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error) @@ -250,6 +252,17 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin return e.complexity.Organization.Users(childComplexity), true + case "Query.latestSchema": + if e.complexity.Query.LatestSchema == nil { + break + } + + args, err := ec.field_Query_latestSchema_args(ctx, rawArgs) + if err != nil { + return 0, false + } + + return e.complexity.Query.LatestSchema(childComplexity, args["ref"].(string)), true case "Query.organizations": if e.complexity.Query.Organizations == nil { break @@ -520,6 +533,7 @@ var sources = []*ast.Source{ {Name: "../schema.graphqls", Input: `type Query { organizations: [Organization!]! @auth(user: true) supergraph(ref: String!, isAfter: String): Supergraph! @auth(organization: true) + latestSchema(ref: String!): SchemaUpdate! @auth(organization: true) } type Mutation { @@ -671,6 +685,17 @@ func (ec *executionContext) field_Query___type_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_latestSchema_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { + var err error + args := map[string]any{} + arg0, err := graphql.ProcessArgField(ctx, rawArgs, "ref", ec.unmarshalNString2string) + if err != nil { + return nil, err + } + args["ref"] = arg0 + return args, nil +} + func (ec *executionContext) field_Query_supergraph_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) { var err error args := map[string]any{} @@ -1434,6 +1459,75 @@ func (ec *executionContext) fieldContext_Query_supergraph(ctx context.Context, f return fc, nil } +func (ec *executionContext) _Query_latestSchema(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + return graphql.ResolveField( + ctx, + ec.OperationContext, + field, + ec.fieldContext_Query_latestSchema, + func(ctx context.Context) (any, error) { + fc := graphql.GetFieldContext(ctx) + return ec.resolvers.Query().LatestSchema(ctx, fc.Args["ref"].(string)) + }, + func(ctx context.Context, next graphql.Resolver) graphql.Resolver { + directive0 := next + + directive1 := func(ctx context.Context) (any, error) { + organization, err := ec.unmarshalOBoolean2ᚖbool(ctx, true) + if err != nil { + var zeroVal *model.SchemaUpdate + return zeroVal, err + } + if ec.directives.Auth == nil { + var zeroVal *model.SchemaUpdate + return zeroVal, errors.New("directive auth is not implemented") + } + return ec.directives.Auth(ctx, nil, directive0, nil, organization) + } + + next = directive1 + return next + }, + ec.marshalNSchemaUpdate2ᚖgitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSchemaUpdate, + true, + true, + ) +} + +func (ec *executionContext) fieldContext_Query_latestSchema(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "ref": + return ec.fieldContext_SchemaUpdate_ref(ctx, field) + case "id": + return ec.fieldContext_SchemaUpdate_id(ctx, field) + case "subGraphs": + return ec.fieldContext_SchemaUpdate_subGraphs(ctx, field) + case "cosmoRouterConfig": + return ec.fieldContext_SchemaUpdate_cosmoRouterConfig(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type SchemaUpdate", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_latestSchema_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return fc, err + } + return fc, nil +} + func (ec *executionContext) _Query___type(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { return graphql.ResolveField( ctx, @@ -3997,6 +4091,28 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) + case "latestSchema": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_latestSchema(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&fs.Invalids, 1) + } + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, + func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) case "__type": out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) { diff --git a/graph/schema.graphqls b/graph/schema.graphqls index 97d82cb..ad1df55 100644 --- a/graph/schema.graphqls +++ b/graph/schema.graphqls @@ -1,6 +1,7 @@ type Query { organizations: [Organization!]! @auth(user: true) supergraph(ref: String!, isAfter: String): Supergraph! @auth(organization: true) + latestSchema(ref: String!): SchemaUpdate! @auth(organization: true) } type Mutation { diff --git a/graph/schema.resolvers.go b/graph/schema.resolvers.go index 00f6202..b426df5 100644 --- a/graph/schema.resolvers.go +++ b/graph/schema.resolvers.go @@ -238,6 +238,72 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str }, nil } +// LatestSchema is the resolver for the latestSchema field. +func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.SchemaUpdate, error) { + orgId := middleware.OrganizationFromContext(ctx) + + r.Logger.Info("LatestSchema query", + "ref", ref, + "orgId", orgId, + ) + + _, err := r.apiKeyCanAccessRef(ctx, ref, false) + if err != nil { + r.Logger.Error("API key cannot access ref", "error", err, "ref", ref) + return nil, err + } + + // Get current services and schema + services, lastUpdate := r.Cache.Services(orgId, ref, "") + r.Logger.Info("Fetching latest schema", + "ref", ref, + "orgId", orgId, + "lastUpdate", lastUpdate, + "servicesCount", len(services), + ) + + subGraphs := make([]*model.SubGraph, len(services)) + for i, id := range services { + sg, err := r.fetchSubGraph(ctx, id) + if err != nil { + r.Logger.Error("fetch subgraph", "error", err, "id", id) + return nil, err + } + subGraphs[i] = &model.SubGraph{ + ID: sg.ID.String(), + Service: sg.Service, + URL: sg.Url, + WsURL: sg.WSUrl, + Sdl: sg.Sdl, + ChangedBy: sg.ChangedBy, + ChangedAt: sg.ChangedAt, + } + } + + // Generate Cosmo router config + cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs) + if err != nil { + r.Logger.Error("generate cosmo config", "error", err) + cosmoConfig = "" // Return empty if generation fails + } + + update := &model.SchemaUpdate{ + Ref: ref, + ID: lastUpdate, + SubGraphs: subGraphs, + CosmoRouterConfig: &cosmoConfig, + } + + r.Logger.Info("Latest schema fetched", + "ref", update.Ref, + "id", update.ID, + "subGraphsCount", len(update.SubGraphs), + "cosmoConfigLength", len(cosmoConfig), + ) + + return update, nil +} + // SchemaUpdates is the resolver for the schemaUpdates field. func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error) { orgId := middleware.OrganizationFromContext(ctx)