feat: add Cosmo Router config generation and PubSub support

Creates a new `GenerateCosmoRouterConfig` function to build and 
serialize a Cosmo Router configuration from subgraphs. Implements 
PubSub mechanism for managing schema updates, allowing 
subscription to updates. Adds Subscription resolver and updates 
existing structures to accommodate new functionalities. This 
enhances the system's capabilities for dynamic updates and 
configuration management.
This commit is contained in:
2025-11-19 11:29:30 +01:00
parent f6e4458efa
commit 80daed081d
10 changed files with 1135 additions and 2 deletions
+377
View File
@@ -42,6 +42,7 @@ type Config struct {
type ResolverRoot interface {
Mutation() MutationResolver
Query() QueryResolver
Subscription() SubscriptionResolver
}
type DirectiveRoot struct {
@@ -77,6 +78,13 @@ type ComplexityRoot struct {
Supergraph func(childComplexity int, ref string, isAfter *string) int
}
SchemaUpdate struct {
CosmoRouterConfig func(childComplexity int) int
ID func(childComplexity int) int
Ref func(childComplexity int) int
SubGraphs func(childComplexity int) int
}
SubGraph struct {
ChangedAt func(childComplexity int) int
ChangedBy func(childComplexity int) int
@@ -94,6 +102,10 @@ type ComplexityRoot struct {
SubGraphs func(childComplexity int) int
}
Subscription struct {
SchemaUpdates func(childComplexity int, ref string) int
}
Unchanged struct {
ID func(childComplexity int) int
MinDelaySeconds func(childComplexity int) int
@@ -113,6 +125,9 @@ type QueryResolver interface {
Organizations(ctx context.Context) ([]*model.Organization, error)
Supergraph(ctx context.Context, ref string, isAfter *string) (model.Supergraph, error)
}
type SubscriptionResolver interface {
SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error)
}
type executableSchema struct {
schema *ast.Schema
@@ -253,6 +268,31 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
return e.complexity.Query.Supergraph(childComplexity, args["ref"].(string), args["isAfter"].(*string)), true
case "SchemaUpdate.cosmoRouterConfig":
if e.complexity.SchemaUpdate.CosmoRouterConfig == nil {
break
}
return e.complexity.SchemaUpdate.CosmoRouterConfig(childComplexity), true
case "SchemaUpdate.id":
if e.complexity.SchemaUpdate.ID == nil {
break
}
return e.complexity.SchemaUpdate.ID(childComplexity), true
case "SchemaUpdate.ref":
if e.complexity.SchemaUpdate.Ref == nil {
break
}
return e.complexity.SchemaUpdate.Ref(childComplexity), true
case "SchemaUpdate.subGraphs":
if e.complexity.SchemaUpdate.SubGraphs == nil {
break
}
return e.complexity.SchemaUpdate.SubGraphs(childComplexity), true
case "SubGraph.changedAt":
if e.complexity.SubGraph.ChangedAt == nil {
break
@@ -321,6 +361,18 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
return e.complexity.SubGraphs.SubGraphs(childComplexity), true
case "Subscription.schemaUpdates":
if e.complexity.Subscription.SchemaUpdates == nil {
break
}
args, err := ec.field_Subscription_schemaUpdates_args(ctx, rawArgs)
if err != nil {
return 0, false
}
return e.complexity.Subscription.SchemaUpdates(childComplexity, args["ref"].(string)), true
case "Unchanged.id":
if e.complexity.Unchanged.ID == nil {
break
@@ -396,6 +448,23 @@ func (e *executableSchema) Exec(ctx context.Context) graphql.ResponseHandler {
var buf bytes.Buffer
data.MarshalGQL(&buf)
return &graphql.Response{
Data: buf.Bytes(),
}
}
case ast.Subscription:
next := ec._Subscription(ctx, opCtx.Operation.SelectionSet)
var buf bytes.Buffer
return func(ctx context.Context) *graphql.Response {
buf.Reset()
data := next(ctx)
if data == nil {
return nil
}
data.MarshalGQL(&buf)
return &graphql.Response{
Data: buf.Bytes(),
}
@@ -459,6 +528,10 @@ type Mutation {
updateSubGraph(input: InputSubGraph!): SubGraph! @auth(organization: true)
}
type Subscription {
schemaUpdates(ref: String!): SchemaUpdate! @auth(organization: true)
}
type Organization {
id: ID!
name: String!
@@ -504,6 +577,13 @@ type SubGraph {
changedAt: Time!
}
type SchemaUpdate {
ref: String!
id: ID!
subGraphs: [SubGraph!]!
cosmoRouterConfig: String
}
input InputAPIKey {
name: String!
organizationId: ID!
@@ -607,6 +687,17 @@ func (ec *executionContext) field_Query_supergraph_args(ctx context.Context, raw
return args, nil
}
func (ec *executionContext) field_Subscription_schemaUpdates_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
var err error
args := map[string]any{}
arg0, err := graphql.ProcessArgField(ctx, rawArgs, "ref", ec.unmarshalNString2string)
if err != nil {
return nil, err
}
args["ref"] = arg0
return args, nil
}
func (ec *executionContext) field___Directive_args_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
var err error
args := map[string]any{}
@@ -1451,6 +1542,138 @@ func (ec *executionContext) fieldContext_Query___schema(_ context.Context, field
return fc, nil
}
func (ec *executionContext) _SchemaUpdate_ref(ctx context.Context, field graphql.CollectedField, obj *model.SchemaUpdate) (ret graphql.Marshaler) {
return graphql.ResolveField(
ctx,
ec.OperationContext,
field,
ec.fieldContext_SchemaUpdate_ref,
func(ctx context.Context) (any, error) {
return obj.Ref, nil
},
nil,
ec.marshalNString2string,
true,
true,
)
}
func (ec *executionContext) fieldContext_SchemaUpdate_ref(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "SchemaUpdate",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type String does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _SchemaUpdate_id(ctx context.Context, field graphql.CollectedField, obj *model.SchemaUpdate) (ret graphql.Marshaler) {
return graphql.ResolveField(
ctx,
ec.OperationContext,
field,
ec.fieldContext_SchemaUpdate_id,
func(ctx context.Context) (any, error) {
return obj.ID, nil
},
nil,
ec.marshalNID2string,
true,
true,
)
}
func (ec *executionContext) fieldContext_SchemaUpdate_id(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "SchemaUpdate",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type ID does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _SchemaUpdate_subGraphs(ctx context.Context, field graphql.CollectedField, obj *model.SchemaUpdate) (ret graphql.Marshaler) {
return graphql.ResolveField(
ctx,
ec.OperationContext,
field,
ec.fieldContext_SchemaUpdate_subGraphs,
func(ctx context.Context) (any, error) {
return obj.SubGraphs, nil
},
nil,
ec.marshalNSubGraph2ᚕᚖgitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSubGraphᚄ,
true,
true,
)
}
func (ec *executionContext) fieldContext_SchemaUpdate_subGraphs(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "SchemaUpdate",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
switch field.Name {
case "id":
return ec.fieldContext_SubGraph_id(ctx, field)
case "service":
return ec.fieldContext_SubGraph_service(ctx, field)
case "url":
return ec.fieldContext_SubGraph_url(ctx, field)
case "wsUrl":
return ec.fieldContext_SubGraph_wsUrl(ctx, field)
case "sdl":
return ec.fieldContext_SubGraph_sdl(ctx, field)
case "changedBy":
return ec.fieldContext_SubGraph_changedBy(ctx, field)
case "changedAt":
return ec.fieldContext_SubGraph_changedAt(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type SubGraph", field.Name)
},
}
return fc, nil
}
func (ec *executionContext) _SchemaUpdate_cosmoRouterConfig(ctx context.Context, field graphql.CollectedField, obj *model.SchemaUpdate) (ret graphql.Marshaler) {
return graphql.ResolveField(
ctx,
ec.OperationContext,
field,
ec.fieldContext_SchemaUpdate_cosmoRouterConfig,
func(ctx context.Context) (any, error) {
return obj.CosmoRouterConfig, nil
},
nil,
ec.marshalOString2ᚖstring,
true,
false,
)
}
func (ec *executionContext) fieldContext_SchemaUpdate_cosmoRouterConfig(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "SchemaUpdate",
Field: field,
IsMethod: false,
IsResolver: false,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
return nil, errors.New("field of type String does not have child fields")
},
}
return fc, nil
}
func (ec *executionContext) _SubGraph_id(ctx context.Context, field graphql.CollectedField, obj *model.SubGraph) (ret graphql.Marshaler) {
return graphql.ResolveField(
ctx,
@@ -1786,6 +2009,75 @@ func (ec *executionContext) fieldContext_SubGraphs_subGraphs(_ context.Context,
return fc, nil
}
func (ec *executionContext) _Subscription_schemaUpdates(ctx context.Context, field graphql.CollectedField) (ret func(ctx context.Context) graphql.Marshaler) {
return graphql.ResolveFieldStream(
ctx,
ec.OperationContext,
field,
ec.fieldContext_Subscription_schemaUpdates,
func(ctx context.Context) (any, error) {
fc := graphql.GetFieldContext(ctx)
return ec.resolvers.Subscription().SchemaUpdates(ctx, fc.Args["ref"].(string))
},
func(ctx context.Context, next graphql.Resolver) graphql.Resolver {
directive0 := next
directive1 := func(ctx context.Context) (any, error) {
organization, err := ec.unmarshalOBoolean2ᚖbool(ctx, true)
if err != nil {
var zeroVal *model.SchemaUpdate
return zeroVal, err
}
if ec.directives.Auth == nil {
var zeroVal *model.SchemaUpdate
return zeroVal, errors.New("directive auth is not implemented")
}
return ec.directives.Auth(ctx, nil, directive0, nil, organization)
}
next = directive1
return next
},
ec.marshalNSchemaUpdate2ᚖgitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSchemaUpdate,
true,
true,
)
}
func (ec *executionContext) fieldContext_Subscription_schemaUpdates(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
fc = &graphql.FieldContext{
Object: "Subscription",
Field: field,
IsMethod: true,
IsResolver: true,
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
switch field.Name {
case "ref":
return ec.fieldContext_SchemaUpdate_ref(ctx, field)
case "id":
return ec.fieldContext_SchemaUpdate_id(ctx, field)
case "subGraphs":
return ec.fieldContext_SchemaUpdate_subGraphs(ctx, field)
case "cosmoRouterConfig":
return ec.fieldContext_SchemaUpdate_cosmoRouterConfig(ctx, field)
}
return nil, fmt.Errorf("no field named %q was found under type SchemaUpdate", field.Name)
},
}
defer func() {
if r := recover(); r != nil {
err = ec.Recover(ctx, r)
ec.Error(ctx, err)
}
}()
ctx = graphql.WithFieldContext(ctx, fc)
if fc.Args, err = ec.field_Subscription_schemaUpdates_args(ctx, field.ArgumentMap(ec.Variables)); err != nil {
ec.Error(ctx, err)
return fc, err
}
return fc, nil
}
func (ec *executionContext) _Unchanged_id(ctx context.Context, field graphql.CollectedField, obj *model.Unchanged) (ret graphql.Marshaler) {
return graphql.ResolveField(
ctx,
@@ -3737,6 +4029,57 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr
return out
}
var schemaUpdateImplementors = []string{"SchemaUpdate"}
func (ec *executionContext) _SchemaUpdate(ctx context.Context, sel ast.SelectionSet, obj *model.SchemaUpdate) graphql.Marshaler {
fields := graphql.CollectFields(ec.OperationContext, sel, schemaUpdateImplementors)
out := graphql.NewFieldSet(fields)
deferred := make(map[string]*graphql.FieldSet)
for i, field := range fields {
switch field.Name {
case "__typename":
out.Values[i] = graphql.MarshalString("SchemaUpdate")
case "ref":
out.Values[i] = ec._SchemaUpdate_ref(ctx, field, obj)
if out.Values[i] == graphql.Null {
out.Invalids++
}
case "id":
out.Values[i] = ec._SchemaUpdate_id(ctx, field, obj)
if out.Values[i] == graphql.Null {
out.Invalids++
}
case "subGraphs":
out.Values[i] = ec._SchemaUpdate_subGraphs(ctx, field, obj)
if out.Values[i] == graphql.Null {
out.Invalids++
}
case "cosmoRouterConfig":
out.Values[i] = ec._SchemaUpdate_cosmoRouterConfig(ctx, field, obj)
default:
panic("unknown field " + strconv.Quote(field.Name))
}
}
out.Dispatch(ctx)
if out.Invalids > 0 {
return graphql.Null
}
atomic.AddInt32(&ec.deferred, int32(len(deferred)))
for label, dfs := range deferred {
ec.processDeferredGroup(graphql.DeferredGroup{
Label: label,
Path: graphql.GetPath(ctx),
FieldSet: dfs,
Context: ctx,
})
}
return out
}
var subGraphImplementors = []string{"SubGraph"}
func (ec *executionContext) _SubGraph(ctx context.Context, sel ast.SelectionSet, obj *model.SubGraph) graphql.Marshaler {
@@ -3854,6 +4197,26 @@ func (ec *executionContext) _SubGraphs(ctx context.Context, sel ast.SelectionSet
return out
}
var subscriptionImplementors = []string{"Subscription"}
func (ec *executionContext) _Subscription(ctx context.Context, sel ast.SelectionSet) func(ctx context.Context) graphql.Marshaler {
fields := graphql.CollectFields(ec.OperationContext, sel, subscriptionImplementors)
ctx = graphql.WithFieldContext(ctx, &graphql.FieldContext{
Object: "Subscription",
})
if len(fields) != 1 {
ec.Errorf(ctx, "must subscribe to exactly one stream")
return nil
}
switch fields[0].Name {
case "schemaUpdates":
return ec._Subscription_schemaUpdates(ctx, fields[0])
default:
panic("unknown field " + strconv.Quote(fields[0].Name))
}
}
var unchangedImplementors = []string{"Unchanged", "Supergraph"}
func (ec *executionContext) _Unchanged(ctx context.Context, sel ast.SelectionSet, obj *model.Unchanged) graphql.Marshaler {
@@ -4441,6 +4804,20 @@ func (ec *executionContext) marshalNOrganization2ᚖgitlabᚗcomᚋunboundsoftwa
return ec._Organization(ctx, sel, v)
}
func (ec *executionContext) marshalNSchemaUpdate2gitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSchemaUpdate(ctx context.Context, sel ast.SelectionSet, v model.SchemaUpdate) graphql.Marshaler {
return ec._SchemaUpdate(ctx, sel, &v)
}
func (ec *executionContext) marshalNSchemaUpdate2ᚖgitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSchemaUpdate(ctx context.Context, sel ast.SelectionSet, v *model.SchemaUpdate) graphql.Marshaler {
if v == nil {
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
}
return graphql.Null
}
return ec._SchemaUpdate(ctx, sel, v)
}
func (ec *executionContext) unmarshalNString2string(ctx context.Context, v any) (string, error) {
res, err := graphql.UnmarshalString(v)
return res, graphql.ErrorOnPath(ctx, err)