feat: add Cosmo Router config generation and PubSub support #625
@@ -195,6 +195,7 @@ func start(closeEvents chan error, logger *slog.Logger, connectToAmqpFunc func(u
|
|||||||
Publisher: eventPublisher,
|
Publisher: eventPublisher,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
Cache: serviceCache,
|
Cache: serviceCache,
|
||||||
|
PubSub: graph.NewPubSub(),
|
||||||
}
|
}
|
||||||
|
|
||||||
config := generated.Config{
|
config := generated.Config{
|
||||||
|
|||||||
@@ -0,0 +1,54 @@
|
|||||||
|
package graph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"gitlab.com/unboundsoftware/schemas/graph/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GenerateCosmoRouterConfig generates a Cosmo Router execution config from subgraphs
|
||||||
|
func GenerateCosmoRouterConfig(subGraphs []*model.SubGraph) (string, error) {
|
||||||
|
// Build the Cosmo router config structure
|
||||||
|
// This is a simplified version - you may need to adjust based on actual Cosmo requirements
|
||||||
|
config := map[string]interface{}{
|
||||||
|
"version": "1",
|
||||||
|
"subgraphs": convertSubGraphsToCosmo(subGraphs),
|
||||||
|
// Add other Cosmo-specific configuration as needed
|
||||||
|
}
|
||||||
|
|
||||||
|
// Marshal to JSON
|
||||||
|
configJSON, err := json.MarshalIndent(config, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("marshal cosmo config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(configJSON), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertSubGraphsToCosmo(subGraphs []*model.SubGraph) []map[string]interface{} {
|
||||||
|
cosmoSubgraphs := make([]map[string]interface{}, 0, len(subGraphs))
|
||||||
|
|
||||||
|
for _, sg := range subGraphs {
|
||||||
|
cosmoSg := map[string]interface{}{
|
||||||
|
"name": sg.Service,
|
||||||
|
"sdl": sg.Sdl,
|
||||||
|
}
|
||||||
|
|
||||||
|
if sg.URL != nil {
|
||||||
|
cosmoSg["routing_url"] = *sg.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
if sg.WsURL != nil {
|
||||||
|
cosmoSg["subscription"] = map[string]interface{}{
|
||||||
|
"url": *sg.WsURL,
|
||||||
|
"protocol": "ws",
|
||||||
|
"websocket_subprotocol": "graphql-ws",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cosmoSubgraphs = append(cosmoSubgraphs, cosmoSg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cosmoSubgraphs
|
||||||
|
}
|
||||||
@@ -0,0 +1,258 @@
|
|||||||
|
package graph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"gitlab.com/unboundsoftware/schemas/graph/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGenerateCosmoRouterConfig(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
subGraphs []*model.SubGraph
|
||||||
|
wantErr bool
|
||||||
|
validate func(t *testing.T, config string)
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "single subgraph with all fields",
|
||||||
|
subGraphs: []*model.SubGraph{
|
||||||
|
{
|
||||||
|
Service: "test-service",
|
||||||
|
URL: stringPtr("http://localhost:4001/query"),
|
||||||
|
WsURL: stringPtr("ws://localhost:4001/query"),
|
||||||
|
Sdl: "type Query { test: String }",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
validate: func(t *testing.T, config string) {
|
||||||
|
var result map[string]interface{}
|
||||||
|
err := json.Unmarshal([]byte(config), &result)
|
||||||
|
require.NoError(t, err, "Config should be valid JSON")
|
||||||
|
|
||||||
|
assert.Equal(t, "1", result["version"], "Version should be 1")
|
||||||
|
|
||||||
|
subgraphs, ok := result["subgraphs"].([]interface{})
|
||||||
|
require.True(t, ok, "subgraphs should be an array")
|
||||||
|
require.Len(t, subgraphs, 1, "Should have 1 subgraph")
|
||||||
|
|
||||||
|
sg := subgraphs[0].(map[string]interface{})
|
||||||
|
assert.Equal(t, "test-service", sg["name"])
|
||||||
|
assert.Equal(t, "http://localhost:4001/query", sg["routing_url"])
|
||||||
|
assert.Equal(t, "type Query { test: String }", sg["sdl"])
|
||||||
|
|
||||||
|
subscription, ok := sg["subscription"].(map[string]interface{})
|
||||||
|
require.True(t, ok, "Should have subscription config")
|
||||||
|
assert.Equal(t, "ws://localhost:4001/query", subscription["url"])
|
||||||
|
assert.Equal(t, "ws", subscription["protocol"])
|
||||||
|
assert.Equal(t, "graphql-ws", subscription["websocket_subprotocol"])
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple subgraphs",
|
||||||
|
subGraphs: []*model.SubGraph{
|
||||||
|
{
|
||||||
|
Service: "service-1",
|
||||||
|
URL: stringPtr("http://localhost:4001/query"),
|
||||||
|
Sdl: "type Query { field1: String }",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Service: "service-2",
|
||||||
|
URL: stringPtr("http://localhost:4002/query"),
|
||||||
|
Sdl: "type Query { field2: String }",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Service: "service-3",
|
||||||
|
URL: stringPtr("http://localhost:4003/query"),
|
||||||
|
WsURL: stringPtr("ws://localhost:4003/query"),
|
||||||
|
Sdl: "type Subscription { updates: String }",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
validate: func(t *testing.T, config string) {
|
||||||
|
var result map[string]interface{}
|
||||||
|
err := json.Unmarshal([]byte(config), &result)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
subgraphs := result["subgraphs"].([]interface{})
|
||||||
|
assert.Len(t, subgraphs, 3, "Should have 3 subgraphs")
|
||||||
|
|
||||||
|
// Check first service has no subscription
|
||||||
|
sg1 := subgraphs[0].(map[string]interface{})
|
||||||
|
assert.Equal(t, "service-1", sg1["name"])
|
||||||
|
_, hasSubscription := sg1["subscription"]
|
||||||
|
assert.False(t, hasSubscription, "service-1 should not have subscription config")
|
||||||
|
|
||||||
|
// Check third service has subscription
|
||||||
|
sg3 := subgraphs[2].(map[string]interface{})
|
||||||
|
assert.Equal(t, "service-3", sg3["name"])
|
||||||
|
subscription, hasSubscription := sg3["subscription"]
|
||||||
|
assert.True(t, hasSubscription, "service-3 should have subscription config")
|
||||||
|
assert.NotNil(t, subscription)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "subgraph with no URL",
|
||||||
|
subGraphs: []*model.SubGraph{
|
||||||
|
{
|
||||||
|
Service: "test-service",
|
||||||
|
URL: nil,
|
||||||
|
WsURL: nil,
|
||||||
|
Sdl: "type Query { test: String }",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
validate: func(t *testing.T, config string) {
|
||||||
|
var result map[string]interface{}
|
||||||
|
err := json.Unmarshal([]byte(config), &result)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
subgraphs := result["subgraphs"].([]interface{})
|
||||||
|
sg := subgraphs[0].(map[string]interface{})
|
||||||
|
|
||||||
|
// Should not have routing_url or subscription fields if URLs are nil
|
||||||
|
_, hasRoutingURL := sg["routing_url"]
|
||||||
|
assert.False(t, hasRoutingURL, "Should not have routing_url when URL is nil")
|
||||||
|
|
||||||
|
_, hasSubscription := sg["subscription"]
|
||||||
|
assert.False(t, hasSubscription, "Should not have subscription when WsURL is nil")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty subgraphs",
|
||||||
|
subGraphs: []*model.SubGraph{},
|
||||||
|
wantErr: false,
|
||||||
|
validate: func(t *testing.T, config string) {
|
||||||
|
var result map[string]interface{}
|
||||||
|
err := json.Unmarshal([]byte(config), &result)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
subgraphs := result["subgraphs"].([]interface{})
|
||||||
|
assert.Len(t, subgraphs, 0, "Should have empty subgraphs array")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "nil subgraphs",
|
||||||
|
subGraphs: nil,
|
||||||
|
wantErr: false,
|
||||||
|
validate: func(t *testing.T, config string) {
|
||||||
|
var result map[string]interface{}
|
||||||
|
err := json.Unmarshal([]byte(config), &result)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
subgraphs := result["subgraphs"].([]interface{})
|
||||||
|
assert.Len(t, subgraphs, 0, "Should handle nil subgraphs as empty array")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "complex SDL with multiple types",
|
||||||
|
subGraphs: []*model.SubGraph{
|
||||||
|
{
|
||||||
|
Service: "complex-service",
|
||||||
|
URL: stringPtr("http://localhost:4001/query"),
|
||||||
|
Sdl: `
|
||||||
|
type Query {
|
||||||
|
user(id: ID!): User
|
||||||
|
users: [User!]!
|
||||||
|
}
|
||||||
|
|
||||||
|
type User {
|
||||||
|
id: ID!
|
||||||
|
name: String!
|
||||||
|
email: String!
|
||||||
|
}
|
||||||
|
`,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
validate: func(t *testing.T, config string) {
|
||||||
|
var result map[string]interface{}
|
||||||
|
err := json.Unmarshal([]byte(config), &result)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
subgraphs := result["subgraphs"].([]interface{})
|
||||||
|
sg := subgraphs[0].(map[string]interface{})
|
||||||
|
sdl := sg["sdl"].(string)
|
||||||
|
|
||||||
|
assert.Contains(t, sdl, "type Query")
|
||||||
|
assert.Contains(t, sdl, "type User")
|
||||||
|
assert.Contains(t, sdl, "email: String!")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
config, err := GenerateCosmoRouterConfig(tt.subGraphs)
|
||||||
|
|
||||||
|
if tt.wantErr {
|
||||||
|
assert.Error(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotEmpty(t, config, "Config should not be empty")
|
||||||
|
|
||||||
|
if tt.validate != nil {
|
||||||
|
tt.validate(t, config)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConvertSubGraphsToCosmo(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
subGraphs []*model.SubGraph
|
||||||
|
wantLen int
|
||||||
|
validate func(t *testing.T, result []map[string]interface{})
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "preserves subgraph order",
|
||||||
|
subGraphs: []*model.SubGraph{
|
||||||
|
{Service: "alpha", URL: stringPtr("http://a"), Sdl: "a"},
|
||||||
|
{Service: "beta", URL: stringPtr("http://b"), Sdl: "b"},
|
||||||
|
{Service: "gamma", URL: stringPtr("http://c"), Sdl: "c"},
|
||||||
|
},
|
||||||
|
wantLen: 3,
|
||||||
|
validate: func(t *testing.T, result []map[string]interface{}) {
|
||||||
|
assert.Equal(t, "alpha", result[0]["name"])
|
||||||
|
assert.Equal(t, "beta", result[1]["name"])
|
||||||
|
assert.Equal(t, "gamma", result[2]["name"])
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "includes SDL exactly as provided",
|
||||||
|
subGraphs: []*model.SubGraph{
|
||||||
|
{
|
||||||
|
Service: "test",
|
||||||
|
URL: stringPtr("http://test"),
|
||||||
|
Sdl: "type Query { special: String! }",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantLen: 1,
|
||||||
|
validate: func(t *testing.T, result []map[string]interface{}) {
|
||||||
|
assert.Equal(t, "type Query { special: String! }", result[0]["sdl"])
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
result := convertSubGraphsToCosmo(tt.subGraphs)
|
||||||
|
assert.Len(t, result, tt.wantLen)
|
||||||
|
|
||||||
|
if tt.validate != nil {
|
||||||
|
tt.validate(t, result)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function for tests
|
||||||
|
func stringPtr(s string) *string {
|
||||||
|
return &s
|
||||||
|
}
|
||||||
@@ -42,6 +42,7 @@ type Config struct {
|
|||||||
type ResolverRoot interface {
|
type ResolverRoot interface {
|
||||||
Mutation() MutationResolver
|
Mutation() MutationResolver
|
||||||
Query() QueryResolver
|
Query() QueryResolver
|
||||||
|
Subscription() SubscriptionResolver
|
||||||
}
|
}
|
||||||
|
|
||||||
type DirectiveRoot struct {
|
type DirectiveRoot struct {
|
||||||
@@ -77,6 +78,13 @@ type ComplexityRoot struct {
|
|||||||
Supergraph func(childComplexity int, ref string, isAfter *string) int
|
Supergraph func(childComplexity int, ref string, isAfter *string) int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SchemaUpdate struct {
|
||||||
|
CosmoRouterConfig func(childComplexity int) int
|
||||||
|
ID func(childComplexity int) int
|
||||||
|
Ref func(childComplexity int) int
|
||||||
|
SubGraphs func(childComplexity int) int
|
||||||
|
}
|
||||||
|
|
||||||
SubGraph struct {
|
SubGraph struct {
|
||||||
ChangedAt func(childComplexity int) int
|
ChangedAt func(childComplexity int) int
|
||||||
ChangedBy func(childComplexity int) int
|
ChangedBy func(childComplexity int) int
|
||||||
@@ -94,6 +102,10 @@ type ComplexityRoot struct {
|
|||||||
SubGraphs func(childComplexity int) int
|
SubGraphs func(childComplexity int) int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Subscription struct {
|
||||||
|
SchemaUpdates func(childComplexity int, ref string) int
|
||||||
|
}
|
||||||
|
|
||||||
Unchanged struct {
|
Unchanged struct {
|
||||||
ID func(childComplexity int) int
|
ID func(childComplexity int) int
|
||||||
MinDelaySeconds func(childComplexity int) int
|
MinDelaySeconds func(childComplexity int) int
|
||||||
@@ -113,6 +125,9 @@ type QueryResolver interface {
|
|||||||
Organizations(ctx context.Context) ([]*model.Organization, error)
|
Organizations(ctx context.Context) ([]*model.Organization, error)
|
||||||
Supergraph(ctx context.Context, ref string, isAfter *string) (model.Supergraph, error)
|
Supergraph(ctx context.Context, ref string, isAfter *string) (model.Supergraph, error)
|
||||||
}
|
}
|
||||||
|
type SubscriptionResolver interface {
|
||||||
|
SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error)
|
||||||
|
}
|
||||||
|
|
||||||
type executableSchema struct {
|
type executableSchema struct {
|
||||||
schema *ast.Schema
|
schema *ast.Schema
|
||||||
@@ -253,6 +268,31 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
|
|||||||
|
|
||||||
return e.complexity.Query.Supergraph(childComplexity, args["ref"].(string), args["isAfter"].(*string)), true
|
return e.complexity.Query.Supergraph(childComplexity, args["ref"].(string), args["isAfter"].(*string)), true
|
||||||
|
|
||||||
|
case "SchemaUpdate.cosmoRouterConfig":
|
||||||
|
if e.complexity.SchemaUpdate.CosmoRouterConfig == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.complexity.SchemaUpdate.CosmoRouterConfig(childComplexity), true
|
||||||
|
case "SchemaUpdate.id":
|
||||||
|
if e.complexity.SchemaUpdate.ID == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.complexity.SchemaUpdate.ID(childComplexity), true
|
||||||
|
case "SchemaUpdate.ref":
|
||||||
|
if e.complexity.SchemaUpdate.Ref == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.complexity.SchemaUpdate.Ref(childComplexity), true
|
||||||
|
case "SchemaUpdate.subGraphs":
|
||||||
|
if e.complexity.SchemaUpdate.SubGraphs == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.complexity.SchemaUpdate.SubGraphs(childComplexity), true
|
||||||
|
|
||||||
case "SubGraph.changedAt":
|
case "SubGraph.changedAt":
|
||||||
if e.complexity.SubGraph.ChangedAt == nil {
|
if e.complexity.SubGraph.ChangedAt == nil {
|
||||||
break
|
break
|
||||||
@@ -321,6 +361,18 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
|
|||||||
|
|
||||||
return e.complexity.SubGraphs.SubGraphs(childComplexity), true
|
return e.complexity.SubGraphs.SubGraphs(childComplexity), true
|
||||||
|
|
||||||
|
case "Subscription.schemaUpdates":
|
||||||
|
if e.complexity.Subscription.SchemaUpdates == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
args, err := ec.field_Subscription_schemaUpdates_args(ctx, rawArgs)
|
||||||
|
if err != nil {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.complexity.Subscription.SchemaUpdates(childComplexity, args["ref"].(string)), true
|
||||||
|
|
||||||
case "Unchanged.id":
|
case "Unchanged.id":
|
||||||
if e.complexity.Unchanged.ID == nil {
|
if e.complexity.Unchanged.ID == nil {
|
||||||
break
|
break
|
||||||
@@ -396,6 +448,23 @@ func (e *executableSchema) Exec(ctx context.Context) graphql.ResponseHandler {
|
|||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
data.MarshalGQL(&buf)
|
data.MarshalGQL(&buf)
|
||||||
|
|
||||||
|
return &graphql.Response{
|
||||||
|
Data: buf.Bytes(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case ast.Subscription:
|
||||||
|
next := ec._Subscription(ctx, opCtx.Operation.SelectionSet)
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
return func(ctx context.Context) *graphql.Response {
|
||||||
|
buf.Reset()
|
||||||
|
data := next(ctx)
|
||||||
|
|
||||||
|
if data == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
data.MarshalGQL(&buf)
|
||||||
|
|
||||||
return &graphql.Response{
|
return &graphql.Response{
|
||||||
Data: buf.Bytes(),
|
Data: buf.Bytes(),
|
||||||
}
|
}
|
||||||
@@ -459,6 +528,10 @@ type Mutation {
|
|||||||
updateSubGraph(input: InputSubGraph!): SubGraph! @auth(organization: true)
|
updateSubGraph(input: InputSubGraph!): SubGraph! @auth(organization: true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Subscription {
|
||||||
|
schemaUpdates(ref: String!): SchemaUpdate! @auth(organization: true)
|
||||||
|
}
|
||||||
|
|
||||||
type Organization {
|
type Organization {
|
||||||
id: ID!
|
id: ID!
|
||||||
name: String!
|
name: String!
|
||||||
@@ -504,6 +577,13 @@ type SubGraph {
|
|||||||
changedAt: Time!
|
changedAt: Time!
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SchemaUpdate {
|
||||||
|
ref: String!
|
||||||
|
id: ID!
|
||||||
|
subGraphs: [SubGraph!]!
|
||||||
|
cosmoRouterConfig: String
|
||||||
|
}
|
||||||
|
|
||||||
input InputAPIKey {
|
input InputAPIKey {
|
||||||
name: String!
|
name: String!
|
||||||
organizationId: ID!
|
organizationId: ID!
|
||||||
@@ -607,6 +687,17 @@ func (ec *executionContext) field_Query_supergraph_args(ctx context.Context, raw
|
|||||||
return args, nil
|
return args, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) field_Subscription_schemaUpdates_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
|
||||||
|
var err error
|
||||||
|
args := map[string]any{}
|
||||||
|
arg0, err := graphql.ProcessArgField(ctx, rawArgs, "ref", ec.unmarshalNString2string)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
args["ref"] = arg0
|
||||||
|
return args, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (ec *executionContext) field___Directive_args_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
|
func (ec *executionContext) field___Directive_args_args(ctx context.Context, rawArgs map[string]any) (map[string]any, error) {
|
||||||
var err error
|
var err error
|
||||||
args := map[string]any{}
|
args := map[string]any{}
|
||||||
@@ -1451,6 +1542,138 @@ func (ec *executionContext) fieldContext_Query___schema(_ context.Context, field
|
|||||||
return fc, nil
|
return fc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) _SchemaUpdate_ref(ctx context.Context, field graphql.CollectedField, obj *model.SchemaUpdate) (ret graphql.Marshaler) {
|
||||||
|
return graphql.ResolveField(
|
||||||
|
ctx,
|
||||||
|
ec.OperationContext,
|
||||||
|
field,
|
||||||
|
ec.fieldContext_SchemaUpdate_ref,
|
||||||
|
func(ctx context.Context) (any, error) {
|
||||||
|
return obj.Ref, nil
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
ec.marshalNString2string,
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) fieldContext_SchemaUpdate_ref(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||||
|
fc = &graphql.FieldContext{
|
||||||
|
Object: "SchemaUpdate",
|
||||||
|
Field: field,
|
||||||
|
IsMethod: false,
|
||||||
|
IsResolver: false,
|
||||||
|
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
|
||||||
|
return nil, errors.New("field of type String does not have child fields")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return fc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) _SchemaUpdate_id(ctx context.Context, field graphql.CollectedField, obj *model.SchemaUpdate) (ret graphql.Marshaler) {
|
||||||
|
return graphql.ResolveField(
|
||||||
|
ctx,
|
||||||
|
ec.OperationContext,
|
||||||
|
field,
|
||||||
|
ec.fieldContext_SchemaUpdate_id,
|
||||||
|
func(ctx context.Context) (any, error) {
|
||||||
|
return obj.ID, nil
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
ec.marshalNID2string,
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) fieldContext_SchemaUpdate_id(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||||
|
fc = &graphql.FieldContext{
|
||||||
|
Object: "SchemaUpdate",
|
||||||
|
Field: field,
|
||||||
|
IsMethod: false,
|
||||||
|
IsResolver: false,
|
||||||
|
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
|
||||||
|
return nil, errors.New("field of type ID does not have child fields")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return fc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) _SchemaUpdate_subGraphs(ctx context.Context, field graphql.CollectedField, obj *model.SchemaUpdate) (ret graphql.Marshaler) {
|
||||||
|
return graphql.ResolveField(
|
||||||
|
ctx,
|
||||||
|
ec.OperationContext,
|
||||||
|
field,
|
||||||
|
ec.fieldContext_SchemaUpdate_subGraphs,
|
||||||
|
func(ctx context.Context) (any, error) {
|
||||||
|
return obj.SubGraphs, nil
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
ec.marshalNSubGraph2ᚕᚖgitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSubGraphᚄ,
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) fieldContext_SchemaUpdate_subGraphs(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||||
|
fc = &graphql.FieldContext{
|
||||||
|
Object: "SchemaUpdate",
|
||||||
|
Field: field,
|
||||||
|
IsMethod: false,
|
||||||
|
IsResolver: false,
|
||||||
|
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
|
||||||
|
switch field.Name {
|
||||||
|
case "id":
|
||||||
|
return ec.fieldContext_SubGraph_id(ctx, field)
|
||||||
|
case "service":
|
||||||
|
return ec.fieldContext_SubGraph_service(ctx, field)
|
||||||
|
case "url":
|
||||||
|
return ec.fieldContext_SubGraph_url(ctx, field)
|
||||||
|
case "wsUrl":
|
||||||
|
return ec.fieldContext_SubGraph_wsUrl(ctx, field)
|
||||||
|
case "sdl":
|
||||||
|
return ec.fieldContext_SubGraph_sdl(ctx, field)
|
||||||
|
case "changedBy":
|
||||||
|
return ec.fieldContext_SubGraph_changedBy(ctx, field)
|
||||||
|
case "changedAt":
|
||||||
|
return ec.fieldContext_SubGraph_changedAt(ctx, field)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("no field named %q was found under type SubGraph", field.Name)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return fc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) _SchemaUpdate_cosmoRouterConfig(ctx context.Context, field graphql.CollectedField, obj *model.SchemaUpdate) (ret graphql.Marshaler) {
|
||||||
|
return graphql.ResolveField(
|
||||||
|
ctx,
|
||||||
|
ec.OperationContext,
|
||||||
|
field,
|
||||||
|
ec.fieldContext_SchemaUpdate_cosmoRouterConfig,
|
||||||
|
func(ctx context.Context) (any, error) {
|
||||||
|
return obj.CosmoRouterConfig, nil
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
ec.marshalOString2ᚖstring,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) fieldContext_SchemaUpdate_cosmoRouterConfig(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||||
|
fc = &graphql.FieldContext{
|
||||||
|
Object: "SchemaUpdate",
|
||||||
|
Field: field,
|
||||||
|
IsMethod: false,
|
||||||
|
IsResolver: false,
|
||||||
|
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
|
||||||
|
return nil, errors.New("field of type String does not have child fields")
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return fc, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (ec *executionContext) _SubGraph_id(ctx context.Context, field graphql.CollectedField, obj *model.SubGraph) (ret graphql.Marshaler) {
|
func (ec *executionContext) _SubGraph_id(ctx context.Context, field graphql.CollectedField, obj *model.SubGraph) (ret graphql.Marshaler) {
|
||||||
return graphql.ResolveField(
|
return graphql.ResolveField(
|
||||||
ctx,
|
ctx,
|
||||||
@@ -1786,6 +2009,75 @@ func (ec *executionContext) fieldContext_SubGraphs_subGraphs(_ context.Context,
|
|||||||
return fc, nil
|
return fc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) _Subscription_schemaUpdates(ctx context.Context, field graphql.CollectedField) (ret func(ctx context.Context) graphql.Marshaler) {
|
||||||
|
return graphql.ResolveFieldStream(
|
||||||
|
ctx,
|
||||||
|
ec.OperationContext,
|
||||||
|
field,
|
||||||
|
ec.fieldContext_Subscription_schemaUpdates,
|
||||||
|
func(ctx context.Context) (any, error) {
|
||||||
|
fc := graphql.GetFieldContext(ctx)
|
||||||
|
return ec.resolvers.Subscription().SchemaUpdates(ctx, fc.Args["ref"].(string))
|
||||||
|
},
|
||||||
|
func(ctx context.Context, next graphql.Resolver) graphql.Resolver {
|
||||||
|
directive0 := next
|
||||||
|
|
||||||
|
directive1 := func(ctx context.Context) (any, error) {
|
||||||
|
organization, err := ec.unmarshalOBoolean2ᚖbool(ctx, true)
|
||||||
|
if err != nil {
|
||||||
|
var zeroVal *model.SchemaUpdate
|
||||||
|
return zeroVal, err
|
||||||
|
}
|
||||||
|
if ec.directives.Auth == nil {
|
||||||
|
var zeroVal *model.SchemaUpdate
|
||||||
|
return zeroVal, errors.New("directive auth is not implemented")
|
||||||
|
}
|
||||||
|
return ec.directives.Auth(ctx, nil, directive0, nil, organization)
|
||||||
|
}
|
||||||
|
|
||||||
|
next = directive1
|
||||||
|
return next
|
||||||
|
},
|
||||||
|
ec.marshalNSchemaUpdate2ᚖgitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSchemaUpdate,
|
||||||
|
true,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) fieldContext_Subscription_schemaUpdates(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
|
||||||
|
fc = &graphql.FieldContext{
|
||||||
|
Object: "Subscription",
|
||||||
|
Field: field,
|
||||||
|
IsMethod: true,
|
||||||
|
IsResolver: true,
|
||||||
|
Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
|
||||||
|
switch field.Name {
|
||||||
|
case "ref":
|
||||||
|
return ec.fieldContext_SchemaUpdate_ref(ctx, field)
|
||||||
|
case "id":
|
||||||
|
return ec.fieldContext_SchemaUpdate_id(ctx, field)
|
||||||
|
case "subGraphs":
|
||||||
|
return ec.fieldContext_SchemaUpdate_subGraphs(ctx, field)
|
||||||
|
case "cosmoRouterConfig":
|
||||||
|
return ec.fieldContext_SchemaUpdate_cosmoRouterConfig(ctx, field)
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("no field named %q was found under type SchemaUpdate", field.Name)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
err = ec.Recover(ctx, r)
|
||||||
|
ec.Error(ctx, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
ctx = graphql.WithFieldContext(ctx, fc)
|
||||||
|
if fc.Args, err = ec.field_Subscription_schemaUpdates_args(ctx, field.ArgumentMap(ec.Variables)); err != nil {
|
||||||
|
ec.Error(ctx, err)
|
||||||
|
return fc, err
|
||||||
|
}
|
||||||
|
return fc, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (ec *executionContext) _Unchanged_id(ctx context.Context, field graphql.CollectedField, obj *model.Unchanged) (ret graphql.Marshaler) {
|
func (ec *executionContext) _Unchanged_id(ctx context.Context, field graphql.CollectedField, obj *model.Unchanged) (ret graphql.Marshaler) {
|
||||||
return graphql.ResolveField(
|
return graphql.ResolveField(
|
||||||
ctx,
|
ctx,
|
||||||
@@ -3737,6 +4029,57 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var schemaUpdateImplementors = []string{"SchemaUpdate"}
|
||||||
|
|
||||||
|
func (ec *executionContext) _SchemaUpdate(ctx context.Context, sel ast.SelectionSet, obj *model.SchemaUpdate) graphql.Marshaler {
|
||||||
|
fields := graphql.CollectFields(ec.OperationContext, sel, schemaUpdateImplementors)
|
||||||
|
|
||||||
|
out := graphql.NewFieldSet(fields)
|
||||||
|
deferred := make(map[string]*graphql.FieldSet)
|
||||||
|
for i, field := range fields {
|
||||||
|
switch field.Name {
|
||||||
|
case "__typename":
|
||||||
|
out.Values[i] = graphql.MarshalString("SchemaUpdate")
|
||||||
|
case "ref":
|
||||||
|
out.Values[i] = ec._SchemaUpdate_ref(ctx, field, obj)
|
||||||
|
if out.Values[i] == graphql.Null {
|
||||||
|
out.Invalids++
|
||||||
|
}
|
||||||
|
case "id":
|
||||||
|
out.Values[i] = ec._SchemaUpdate_id(ctx, field, obj)
|
||||||
|
if out.Values[i] == graphql.Null {
|
||||||
|
out.Invalids++
|
||||||
|
}
|
||||||
|
case "subGraphs":
|
||||||
|
out.Values[i] = ec._SchemaUpdate_subGraphs(ctx, field, obj)
|
||||||
|
if out.Values[i] == graphql.Null {
|
||||||
|
out.Invalids++
|
||||||
|
}
|
||||||
|
case "cosmoRouterConfig":
|
||||||
|
out.Values[i] = ec._SchemaUpdate_cosmoRouterConfig(ctx, field, obj)
|
||||||
|
default:
|
||||||
|
panic("unknown field " + strconv.Quote(field.Name))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.Dispatch(ctx)
|
||||||
|
if out.Invalids > 0 {
|
||||||
|
return graphql.Null
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic.AddInt32(&ec.deferred, int32(len(deferred)))
|
||||||
|
|
||||||
|
for label, dfs := range deferred {
|
||||||
|
ec.processDeferredGroup(graphql.DeferredGroup{
|
||||||
|
Label: label,
|
||||||
|
Path: graphql.GetPath(ctx),
|
||||||
|
FieldSet: dfs,
|
||||||
|
Context: ctx,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
var subGraphImplementors = []string{"SubGraph"}
|
var subGraphImplementors = []string{"SubGraph"}
|
||||||
|
|
||||||
func (ec *executionContext) _SubGraph(ctx context.Context, sel ast.SelectionSet, obj *model.SubGraph) graphql.Marshaler {
|
func (ec *executionContext) _SubGraph(ctx context.Context, sel ast.SelectionSet, obj *model.SubGraph) graphql.Marshaler {
|
||||||
@@ -3854,6 +4197,26 @@ func (ec *executionContext) _SubGraphs(ctx context.Context, sel ast.SelectionSet
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var subscriptionImplementors = []string{"Subscription"}
|
||||||
|
|
||||||
|
func (ec *executionContext) _Subscription(ctx context.Context, sel ast.SelectionSet) func(ctx context.Context) graphql.Marshaler {
|
||||||
|
fields := graphql.CollectFields(ec.OperationContext, sel, subscriptionImplementors)
|
||||||
|
ctx = graphql.WithFieldContext(ctx, &graphql.FieldContext{
|
||||||
|
Object: "Subscription",
|
||||||
|
})
|
||||||
|
if len(fields) != 1 {
|
||||||
|
ec.Errorf(ctx, "must subscribe to exactly one stream")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch fields[0].Name {
|
||||||
|
case "schemaUpdates":
|
||||||
|
return ec._Subscription_schemaUpdates(ctx, fields[0])
|
||||||
|
default:
|
||||||
|
panic("unknown field " + strconv.Quote(fields[0].Name))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var unchangedImplementors = []string{"Unchanged", "Supergraph"}
|
var unchangedImplementors = []string{"Unchanged", "Supergraph"}
|
||||||
|
|
||||||
func (ec *executionContext) _Unchanged(ctx context.Context, sel ast.SelectionSet, obj *model.Unchanged) graphql.Marshaler {
|
func (ec *executionContext) _Unchanged(ctx context.Context, sel ast.SelectionSet, obj *model.Unchanged) graphql.Marshaler {
|
||||||
@@ -4441,6 +4804,20 @@ func (ec *executionContext) marshalNOrganization2ᚖgitlabᚗcomᚋunboundsoftwa
|
|||||||
return ec._Organization(ctx, sel, v)
|
return ec._Organization(ctx, sel, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) marshalNSchemaUpdate2gitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSchemaUpdate(ctx context.Context, sel ast.SelectionSet, v model.SchemaUpdate) graphql.Marshaler {
|
||||||
|
return ec._SchemaUpdate(ctx, sel, &v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *executionContext) marshalNSchemaUpdate2ᚖgitlabᚗcomᚋunboundsoftwareᚋschemasᚋgraphᚋmodelᚐSchemaUpdate(ctx context.Context, sel ast.SelectionSet, v *model.SchemaUpdate) graphql.Marshaler {
|
||||||
|
if v == nil {
|
||||||
|
if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) {
|
||||||
|
ec.Errorf(ctx, "the requested element is null which the schema does not allow")
|
||||||
|
}
|
||||||
|
return graphql.Null
|
||||||
|
}
|
||||||
|
return ec._SchemaUpdate(ctx, sel, v)
|
||||||
|
}
|
||||||
|
|
||||||
func (ec *executionContext) unmarshalNString2string(ctx context.Context, v any) (string, error) {
|
func (ec *executionContext) unmarshalNString2string(ctx context.Context, v any) (string, error) {
|
||||||
res, err := graphql.UnmarshalString(v)
|
res, err := graphql.UnmarshalString(v)
|
||||||
return res, graphql.ErrorOnPath(ctx, err)
|
return res, graphql.ErrorOnPath(ctx, err)
|
||||||
|
|||||||
@@ -49,6 +49,13 @@ type Organization struct {
|
|||||||
type Query struct {
|
type Query struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SchemaUpdate struct {
|
||||||
|
Ref string `json:"ref"`
|
||||||
|
ID string `json:"id"`
|
||||||
|
SubGraphs []*SubGraph `json:"subGraphs"`
|
||||||
|
CosmoRouterConfig *string `json:"cosmoRouterConfig,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
type SubGraph struct {
|
type SubGraph struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
Service string `json:"service"`
|
Service string `json:"service"`
|
||||||
@@ -68,6 +75,9 @@ type SubGraphs struct {
|
|||||||
|
|
||||||
func (SubGraphs) IsSupergraph() {}
|
func (SubGraphs) IsSupergraph() {}
|
||||||
|
|
||||||
|
type Subscription struct {
|
||||||
|
}
|
||||||
|
|
||||||
type Unchanged struct {
|
type Unchanged struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
MinDelaySeconds int `json:"minDelaySeconds"`
|
MinDelaySeconds int `json:"minDelaySeconds"`
|
||||||
|
|||||||
@@ -0,0 +1,66 @@
|
|||||||
|
package graph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gitlab.com/unboundsoftware/schemas/graph/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PubSub handles publishing schema updates to subscribers
|
||||||
|
type PubSub struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
subscribers map[string][]chan *model.SchemaUpdate
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPubSub() *PubSub {
|
||||||
|
return &PubSub{
|
||||||
|
subscribers: make(map[string][]chan *model.SchemaUpdate),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe creates a new subscription channel for a given schema ref
|
||||||
|
func (ps *PubSub) Subscribe(ref string) chan *model.SchemaUpdate {
|
||||||
|
ps.mu.Lock()
|
||||||
|
defer ps.mu.Unlock()
|
||||||
|
|
||||||
|
ch := make(chan *model.SchemaUpdate, 10)
|
||||||
|
ps.subscribers[ref] = append(ps.subscribers[ref], ch)
|
||||||
|
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe removes a subscription channel
|
||||||
|
func (ps *PubSub) Unsubscribe(ref string, ch chan *model.SchemaUpdate) {
|
||||||
|
ps.mu.Lock()
|
||||||
|
defer ps.mu.Unlock()
|
||||||
|
|
||||||
|
subs := ps.subscribers[ref]
|
||||||
|
for i, sub := range subs {
|
||||||
|
if sub == ch {
|
||||||
|
// Remove this subscriber
|
||||||
|
ps.subscribers[ref] = append(subs[:i], subs[i+1:]...)
|
||||||
|
close(sub)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up empty subscriber lists
|
||||||
|
if len(ps.subscribers[ref]) == 0 {
|
||||||
|
delete(ps.subscribers, ref)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish sends a schema update to all subscribers of a given ref
|
||||||
|
func (ps *PubSub) Publish(ref string, update *model.SchemaUpdate) {
|
||||||
|
ps.mu.RLock()
|
||||||
|
defer ps.mu.RUnlock()
|
||||||
|
|
||||||
|
for _, ch := range ps.subscribers[ref] {
|
||||||
|
// Non-blocking send - if subscriber is slow, skip
|
||||||
|
select {
|
||||||
|
case ch <- update:
|
||||||
|
default:
|
||||||
|
// Channel full, subscriber is too slow - skip this update
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,256 @@
|
|||||||
|
package graph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"gitlab.com/unboundsoftware/schemas/graph/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPubSub_SubscribeAndPublish(t *testing.T) {
|
||||||
|
ps := NewPubSub()
|
||||||
|
ref := "Test@dev"
|
||||||
|
|
||||||
|
// Subscribe
|
||||||
|
ch := ps.Subscribe(ref)
|
||||||
|
require.NotNil(t, ch, "Subscribe should return a channel")
|
||||||
|
|
||||||
|
// Publish
|
||||||
|
update := &model.SchemaUpdate{
|
||||||
|
Ref: ref,
|
||||||
|
ID: "test-id-1",
|
||||||
|
SubGraphs: []*model.SubGraph{
|
||||||
|
{
|
||||||
|
ID: "sg1",
|
||||||
|
Service: "test-service",
|
||||||
|
Sdl: "type Query { test: String }",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
go ps.Publish(ref, update)
|
||||||
|
|
||||||
|
// Receive
|
||||||
|
select {
|
||||||
|
case received := <-ch:
|
||||||
|
assert.Equal(t, update.Ref, received.Ref, "Ref should match")
|
||||||
|
assert.Equal(t, update.ID, received.ID, "ID should match")
|
||||||
|
assert.Equal(t, len(update.SubGraphs), len(received.SubGraphs), "SubGraphs count should match")
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Fatal("Timeout waiting for published update")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPubSub_MultipleSubscribers(t *testing.T) {
|
||||||
|
ps := NewPubSub()
|
||||||
|
ref := "Test@dev"
|
||||||
|
|
||||||
|
// Create multiple subscribers
|
||||||
|
ch1 := ps.Subscribe(ref)
|
||||||
|
ch2 := ps.Subscribe(ref)
|
||||||
|
ch3 := ps.Subscribe(ref)
|
||||||
|
|
||||||
|
update := &model.SchemaUpdate{
|
||||||
|
Ref: ref,
|
||||||
|
ID: "test-id-2",
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish once
|
||||||
|
ps.Publish(ref, update)
|
||||||
|
|
||||||
|
// All subscribers should receive the update
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(3)
|
||||||
|
|
||||||
|
checkReceived := func(ch <-chan *model.SchemaUpdate, name string) {
|
||||||
|
defer wg.Done()
|
||||||
|
select {
|
||||||
|
case received := <-ch:
|
||||||
|
assert.Equal(t, update.ID, received.ID, "%s should receive correct update", name)
|
||||||
|
case <-time.After(1 * time.Second):
|
||||||
|
t.Errorf("%s: Timeout waiting for update", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go checkReceived(ch1, "Subscriber 1")
|
||||||
|
go checkReceived(ch2, "Subscriber 2")
|
||||||
|
go checkReceived(ch3, "Subscriber 3")
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPubSub_DifferentRefs(t *testing.T) {
|
||||||
|
ps := NewPubSub()
|
||||||
|
|
||||||
|
ref1 := "Test1@dev"
|
||||||
|
ref2 := "Test2@dev"
|
||||||
|
|
||||||
|
ch1 := ps.Subscribe(ref1)
|
||||||
|
ch2 := ps.Subscribe(ref2)
|
||||||
|
|
||||||
|
update1 := &model.SchemaUpdate{Ref: ref1, ID: "id1"}
|
||||||
|
update2 := &model.SchemaUpdate{Ref: ref2, ID: "id2"}
|
||||||
|
|
||||||
|
// Publish to ref1
|
||||||
|
ps.Publish(ref1, update1)
|
||||||
|
|
||||||
|
// Only ch1 should receive
|
||||||
|
select {
|
||||||
|
case received := <-ch1:
|
||||||
|
assert.Equal(t, "id1", received.ID)
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("ch1 should have received update")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ch2 should not receive ref1's update
|
||||||
|
select {
|
||||||
|
case <-ch2:
|
||||||
|
t.Fatal("ch2 should not receive ref1's update")
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
// Expected - no update
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish to ref2
|
||||||
|
ps.Publish(ref2, update2)
|
||||||
|
|
||||||
|
// Now ch2 should receive
|
||||||
|
select {
|
||||||
|
case received := <-ch2:
|
||||||
|
assert.Equal(t, "id2", received.ID)
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("ch2 should have received update")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPubSub_Unsubscribe(t *testing.T) {
|
||||||
|
ps := NewPubSub()
|
||||||
|
ref := "Test@dev"
|
||||||
|
|
||||||
|
ch := ps.Subscribe(ref)
|
||||||
|
|
||||||
|
// Unsubscribe
|
||||||
|
ps.Unsubscribe(ref, ch)
|
||||||
|
|
||||||
|
// Channel should be closed
|
||||||
|
_, ok := <-ch
|
||||||
|
assert.False(t, ok, "Channel should be closed after unsubscribe")
|
||||||
|
|
||||||
|
// Publishing after unsubscribe should not panic
|
||||||
|
assert.NotPanics(t, func() {
|
||||||
|
ps.Publish(ref, &model.SchemaUpdate{Ref: ref})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPubSub_BufferedChannel(t *testing.T) {
|
||||||
|
ps := NewPubSub()
|
||||||
|
ref := "Test@dev"
|
||||||
|
|
||||||
|
ch := ps.Subscribe(ref)
|
||||||
|
|
||||||
|
// Publish multiple updates quickly (up to buffer size of 10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
update := &model.SchemaUpdate{
|
||||||
|
Ref: ref,
|
||||||
|
ID: string(rune('a' + i)),
|
||||||
|
}
|
||||||
|
ps.Publish(ref, update)
|
||||||
|
}
|
||||||
|
|
||||||
|
// All 10 should be buffered and receivable
|
||||||
|
received := 0
|
||||||
|
timeout := time.After(1 * time.Second)
|
||||||
|
|
||||||
|
for received < 10 {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
received++
|
||||||
|
case <-timeout:
|
||||||
|
t.Fatalf("Only received %d out of 10 updates", received)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, 10, received, "Should receive all buffered updates")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPubSub_SlowSubscriber(t *testing.T) {
|
||||||
|
ps := NewPubSub()
|
||||||
|
ref := "Test@dev"
|
||||||
|
|
||||||
|
ch := ps.Subscribe(ref)
|
||||||
|
|
||||||
|
// Fill the buffer (10 items)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
ps.Publish(ref, &model.SchemaUpdate{Ref: ref})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish one more - this should be dropped (channel full, non-blocking send)
|
||||||
|
ps.Publish(ref, &model.SchemaUpdate{Ref: ref, ID: "should-be-dropped"})
|
||||||
|
|
||||||
|
// Drain the channel
|
||||||
|
count := 0
|
||||||
|
timeout := time.After(500 * time.Millisecond)
|
||||||
|
|
||||||
|
drainLoop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case update := <-ch:
|
||||||
|
count++
|
||||||
|
// Should not receive the dropped update
|
||||||
|
assert.NotEqual(t, "should-be-dropped", update.ID, "Should not receive dropped update")
|
||||||
|
case <-timeout:
|
||||||
|
break drainLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should have received exactly 10 (the buffer size), not 11
|
||||||
|
assert.Equal(t, 10, count, "Should only receive buffered updates, not the dropped one")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPubSub_ConcurrentPublish(t *testing.T) {
|
||||||
|
ps := NewPubSub()
|
||||||
|
ref := "Test@dev"
|
||||||
|
|
||||||
|
ch := ps.Subscribe(ref)
|
||||||
|
|
||||||
|
numPublishers := 10
|
||||||
|
updatesPerPublisher := 10
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(numPublishers)
|
||||||
|
|
||||||
|
// Multiple goroutines publishing concurrently
|
||||||
|
for i := 0; i < numPublishers; i++ {
|
||||||
|
go func(publisherID int) {
|
||||||
|
defer wg.Done()
|
||||||
|
for j := 0; j < updatesPerPublisher; j++ {
|
||||||
|
ps.Publish(ref, &model.SchemaUpdate{
|
||||||
|
Ref: ref,
|
||||||
|
ID: string(rune('a' + publisherID)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Should not panic and subscriber should receive updates
|
||||||
|
// (exact count may vary due to buffer and timing)
|
||||||
|
timeout := time.After(1 * time.Second)
|
||||||
|
received := 0
|
||||||
|
|
||||||
|
receiveLoop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
received++
|
||||||
|
case <-timeout:
|
||||||
|
break receiveLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Greater(t, received, 0, "Should have received some updates")
|
||||||
|
}
|
||||||
@@ -28,6 +28,7 @@ type Resolver struct {
|
|||||||
Publisher Publisher
|
Publisher Publisher
|
||||||
Logger *slog.Logger
|
Logger *slog.Logger
|
||||||
Cache *cache.Cache
|
Cache *cache.Cache
|
||||||
|
PubSub *PubSub
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Resolver) apiKeyCanAccessRef(ctx context.Context, ref string, publish bool) (string, error) {
|
func (r *Resolver) apiKeyCanAccessRef(ctx context.Context, ref string, publish bool) (string, error) {
|
||||||
|
|||||||
@@ -9,6 +9,10 @@ type Mutation {
|
|||||||
updateSubGraph(input: InputSubGraph!): SubGraph! @auth(organization: true)
|
updateSubGraph(input: InputSubGraph!): SubGraph! @auth(organization: true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Subscription {
|
||||||
|
schemaUpdates(ref: String!): SchemaUpdate! @auth(organization: true)
|
||||||
|
}
|
||||||
|
|
||||||
type Organization {
|
type Organization {
|
||||||
id: ID!
|
id: ID!
|
||||||
name: String!
|
name: String!
|
||||||
@@ -54,6 +58,13 @@ type SubGraph {
|
|||||||
changedAt: Time!
|
changedAt: Time!
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SchemaUpdate {
|
||||||
|
ref: String!
|
||||||
|
id: ID!
|
||||||
|
subGraphs: [SubGraph!]!
|
||||||
|
cosmoRouterConfig: String
|
||||||
|
}
|
||||||
|
|
||||||
input InputAPIKey {
|
input InputAPIKey {
|
||||||
name: String!
|
name: String!
|
||||||
organizationId: ID!
|
organizationId: ID!
|
||||||
|
|||||||
+101
-2
@@ -119,6 +119,44 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Publish schema update to subscribers
|
||||||
|
go func() {
|
||||||
|
services, lastUpdate := r.Cache.Services(orgId, input.Ref, "")
|
||||||
|
subGraphs := make([]*model.SubGraph, len(services))
|
||||||
|
for i, id := range services {
|
||||||
|
sg, err := r.fetchSubGraph(context.Background(), id)
|
||||||
|
if err != nil {
|
||||||
|
r.Logger.Error("fetch subgraph for update notification", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
subGraphs[i] = &model.SubGraph{
|
||||||
|
ID: sg.ID.String(),
|
||||||
|
Service: sg.Service,
|
||||||
|
URL: sg.Url,
|
||||||
|
WsURL: sg.WSUrl,
|
||||||
|
Sdl: sg.Sdl,
|
||||||
|
ChangedBy: sg.ChangedBy,
|
||||||
|
ChangedAt: sg.ChangedAt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate Cosmo router config
|
||||||
|
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
|
||||||
|
if err != nil {
|
||||||
|
r.Logger.Error("generate cosmo config for update", "error", err)
|
||||||
|
cosmoConfig = "" // Send empty if generation fails
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish to all subscribers of this ref
|
||||||
|
r.PubSub.Publish(input.Ref, &model.SchemaUpdate{
|
||||||
|
Ref: input.Ref,
|
||||||
|
ID: lastUpdate,
|
||||||
|
SubGraphs: subGraphs,
|
||||||
|
CosmoRouterConfig: &cosmoConfig,
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
|
||||||
return r.toGqlSubGraph(subGraph), nil
|
return r.toGqlSubGraph(subGraph), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -184,13 +222,74 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SchemaUpdates is the resolver for the schemaUpdates field.
|
||||||
|
func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<-chan *model.SchemaUpdate, error) {
|
||||||
|
orgId := middleware.OrganizationFromContext(ctx)
|
||||||
|
_, err := r.apiKeyCanAccessRef(ctx, ref, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe to updates for this ref
|
||||||
|
ch := r.PubSub.Subscribe(ref)
|
||||||
|
|
||||||
|
// Send initial state immediately
|
||||||
|
go func() {
|
||||||
|
services, lastUpdate := r.Cache.Services(orgId, ref, "")
|
||||||
|
subGraphs := make([]*model.SubGraph, len(services))
|
||||||
|
for i, id := range services {
|
||||||
|
sg, err := r.fetchSubGraph(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
r.Logger.Error("fetch subgraph for initial update", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
subGraphs[i] = &model.SubGraph{
|
||||||
|
ID: sg.ID.String(),
|
||||||
|
Service: sg.Service,
|
||||||
|
URL: sg.Url,
|
||||||
|
WsURL: sg.WSUrl,
|
||||||
|
Sdl: sg.Sdl,
|
||||||
|
ChangedBy: sg.ChangedBy,
|
||||||
|
ChangedAt: sg.ChangedAt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate Cosmo router config
|
||||||
|
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
|
||||||
|
if err != nil {
|
||||||
|
r.Logger.Error("generate cosmo config", "error", err)
|
||||||
|
cosmoConfig = "" // Send empty if generation fails
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send initial update
|
||||||
|
ch <- &model.SchemaUpdate{
|
||||||
|
Ref: ref,
|
||||||
|
ID: lastUpdate,
|
||||||
|
SubGraphs: subGraphs,
|
||||||
|
CosmoRouterConfig: &cosmoConfig,
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Clean up subscription when context is done
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
r.PubSub.Unsubscribe(ref, ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ch, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Mutation returns generated.MutationResolver implementation.
|
// Mutation returns generated.MutationResolver implementation.
|
||||||
func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} }
|
func (r *Resolver) Mutation() generated.MutationResolver { return &mutationResolver{r} }
|
||||||
|
|
||||||
// Query returns generated.QueryResolver implementation.
|
// Query returns generated.QueryResolver implementation.
|
||||||
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
func (r *Resolver) Query() generated.QueryResolver { return &queryResolver{r} }
|
||||||
|
|
||||||
|
// Subscription returns generated.SubscriptionResolver implementation.
|
||||||
|
func (r *Resolver) Subscription() generated.SubscriptionResolver { return &subscriptionResolver{r} }
|
||||||
|
|
||||||
type (
|
type (
|
||||||
mutationResolver struct{ *Resolver }
|
mutationResolver struct{ *Resolver }
|
||||||
queryResolver struct{ *Resolver }
|
queryResolver struct{ *Resolver }
|
||||||
|
subscriptionResolver struct{ *Resolver }
|
||||||
)
|
)
|
||||||
|
|||||||
Reference in New Issue
Block a user