Compare commits
11 Commits
2fbe44fea1
...
v0.9.4
| Author | SHA1 | Date | |
|---|---|---|---|
| 3bb0511277 | |||
| f0fadb4ab7 | |||
| 59c128fe0c | |||
| 9518403394 | |||
|
3b4e513653
|
|||
| fe1de50ded | |||
| 2d243469ed | |||
| a23aa5e5e9 | |||
| 2286e092d9 | |||
|
28aa32ad8c
|
|||
| a9885f8b65 |
@@ -24,7 +24,7 @@ jobs:
|
|||||||
- name: Install goreleaser
|
- name: Install goreleaser
|
||||||
uses: goreleaser/goreleaser-action@v7
|
uses: goreleaser/goreleaser-action@v7
|
||||||
with:
|
with:
|
||||||
version: '~> v2'
|
version: 'v2.13.3'
|
||||||
install-only: true
|
install-only: true
|
||||||
- name: Release
|
- name: Release
|
||||||
run: |
|
run: |
|
||||||
|
|||||||
@@ -2,6 +2,36 @@
|
|||||||
|
|
||||||
All notable changes to this project will be documented in this file.
|
All notable changes to this project will be documented in this file.
|
||||||
|
|
||||||
|
## [0.9.4] - 2026-02-23
|
||||||
|
|
||||||
|
### 🐛 Bug Fixes
|
||||||
|
|
||||||
|
- *(ci)* Pin goreleaser to v2.13.3 for Gitea SDK compatibility
|
||||||
|
|
||||||
|
## [0.9.3] - 2026-02-23
|
||||||
|
|
||||||
|
### 🐛 Bug Fixes
|
||||||
|
|
||||||
|
- *(deps)* Update module github.com/wundergraph/graphql-go-tools/v2 to v2.0.0-rc.249 (#702)
|
||||||
|
- *(deps)* Update module github.com/wundergraph/graphql-go-tools/v2 to v2.0.0-rc.250 (#704)
|
||||||
|
- *(deps)* Update module github.com/wundergraph/graphql-go-tools/v2 to v2.0.0-rc.251
|
||||||
|
- *(deps)* Update module gitlab.com/unboundsoftware/eventsourced/pg to v1.18.3
|
||||||
|
- *(deps)* Update module github.com/vektah/gqlparser/v2 to v2.5.32
|
||||||
|
- *(deps)* Update module github.com/wundergraph/graphql-go-tools/v2 to v2.0.0-rc.252 (#714)
|
||||||
|
- *(deps)* Update module github.com/wundergraph/graphql-go-tools/v2 to v2.0.0-rc.253 (#716)
|
||||||
|
- *(deps)* Update module github.com/99designs/gqlgen to v0.17.87
|
||||||
|
- *(deps)* Update module github.com/wundergraph/graphql-go-tools/v2 to v2.0.0-rc.254 (#718)
|
||||||
|
- *(deps)* Update module github.com/wundergraph/graphql-go-tools/v2 to v2.0.0-rc.255
|
||||||
|
- *(deps)* Update module github.com/pressly/goose/v3 to v3.27.0
|
||||||
|
- *(deps)* Update module gitlab.com/unboundsoftware/eventsourced/pg to v1.18.4 (#727)
|
||||||
|
- Prevent OOM on rapid schema publishing
|
||||||
|
|
||||||
|
### ⚙️ Miscellaneous Tasks
|
||||||
|
|
||||||
|
- *(deps)* Update pre-commit hook golangci/golangci-lint to v2.10.1 (#706)
|
||||||
|
- *(deps)* Update goreleaser/goreleaser-action action to v7
|
||||||
|
- *(deps)* Update actions/setup-node action to v6
|
||||||
|
|
||||||
## [0.9.2] - 2026-02-13
|
## [0.9.2] - 2026-02-13
|
||||||
|
|
||||||
### 🐛 Bug Fixes
|
### 🐛 Bug Fixes
|
||||||
|
|||||||
@@ -201,11 +201,13 @@ func start(closeEvents chan error, logger *slog.Logger, connectToAmqpFunc func(u
|
|||||||
defer rootCancel()
|
defer rootCancel()
|
||||||
|
|
||||||
resolver := &graph.Resolver{
|
resolver := &graph.Resolver{
|
||||||
EventStore: eventStore,
|
EventStore: eventStore,
|
||||||
Publisher: eventPublisher,
|
Publisher: eventPublisher,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
Cache: serviceCache,
|
Cache: serviceCache,
|
||||||
PubSub: graph.NewPubSub(),
|
PubSub: graph.NewPubSub(),
|
||||||
|
CosmoGenerator: graph.NewCosmoGenerator(&graph.DefaultCommandExecutor{}, 60*time.Second),
|
||||||
|
Debouncer: graph.NewDebouncer(500 * time.Millisecond),
|
||||||
}
|
}
|
||||||
|
|
||||||
config := generated.Config{
|
config := generated.Config{
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ require (
|
|||||||
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.255
|
github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.255
|
||||||
gitlab.com/unboundsoftware/eventsourced/amqp v1.9.1
|
gitlab.com/unboundsoftware/eventsourced/amqp v1.9.1
|
||||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.19.4
|
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.19.4
|
||||||
gitlab.com/unboundsoftware/eventsourced/pg v1.18.3
|
gitlab.com/unboundsoftware/eventsourced/pg v1.18.4
|
||||||
go.opentelemetry.io/contrib/bridges/otelslog v0.15.0
|
go.opentelemetry.io/contrib/bridges/otelslog v0.15.0
|
||||||
go.opentelemetry.io/otel v1.40.0
|
go.opentelemetry.io/otel v1.40.0
|
||||||
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0
|
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0
|
||||||
@@ -31,6 +31,7 @@ require (
|
|||||||
go.opentelemetry.io/otel/sdk/metric v1.40.0
|
go.opentelemetry.io/otel/sdk/metric v1.40.0
|
||||||
go.opentelemetry.io/otel/trace v1.40.0
|
go.opentelemetry.io/otel/trace v1.40.0
|
||||||
golang.org/x/crypto v0.48.0
|
golang.org/x/crypto v0.48.0
|
||||||
|
golang.org/x/sync v0.19.0
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -80,7 +81,6 @@ require (
|
|||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
golang.org/x/mod v0.33.0 // indirect
|
golang.org/x/mod v0.33.0 // indirect
|
||||||
golang.org/x/net v0.50.0 // indirect
|
golang.org/x/net v0.50.0 // indirect
|
||||||
golang.org/x/sync v0.19.0 // indirect
|
|
||||||
golang.org/x/sys v0.41.0 // indirect
|
golang.org/x/sys v0.41.0 // indirect
|
||||||
golang.org/x/text v0.34.0 // indirect
|
golang.org/x/text v0.34.0 // indirect
|
||||||
golang.org/x/tools v0.42.0 // indirect
|
golang.org/x/tools v0.42.0 // indirect
|
||||||
|
|||||||
@@ -204,8 +204,8 @@ gitlab.com/unboundsoftware/eventsourced/amqp v1.9.1 h1:X6269JoAzHIKCVmtgMHZH3m7x
|
|||||||
gitlab.com/unboundsoftware/eventsourced/amqp v1.9.1/go.mod h1:EAs0d6Eh0aDiQkUJlSWErHqgHFQdxx0e8I7aG/2FarY=
|
gitlab.com/unboundsoftware/eventsourced/amqp v1.9.1/go.mod h1:EAs0d6Eh0aDiQkUJlSWErHqgHFQdxx0e8I7aG/2FarY=
|
||||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.19.4 h1:+yZkhi9/sTyBEN5vJTfvycyXgGrm07QKGSh3jiWiQdM=
|
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.19.4 h1:+yZkhi9/sTyBEN5vJTfvycyXgGrm07QKGSh3jiWiQdM=
|
||||||
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.19.4/go.mod h1:LrA7I7etRmhIC1PjO8c26BHm+gWsy2rC3eSMe5+XUWE=
|
gitlab.com/unboundsoftware/eventsourced/eventsourced v1.19.4/go.mod h1:LrA7I7etRmhIC1PjO8c26BHm+gWsy2rC3eSMe5+XUWE=
|
||||||
gitlab.com/unboundsoftware/eventsourced/pg v1.18.3 h1:TnGJAOmcikVX423C8Rv6G/5sPFD4i8jcKJlS7oyv5h0=
|
gitlab.com/unboundsoftware/eventsourced/pg v1.18.4 h1:ei0xdaACXw6/54w5hPscGUlJUzHJm6MQoeUP7hPqbJA=
|
||||||
gitlab.com/unboundsoftware/eventsourced/pg v1.18.3/go.mod h1:gTG4Fb+zBcFLs8jGbHCySrYPtst9T9jKlANrTxk0oxA=
|
gitlab.com/unboundsoftware/eventsourced/pg v1.18.4/go.mod h1:IryGlvRa02/IAASbGqoMHTC2Q4WHXr2QY7fLUVN3mL0=
|
||||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||||
go.opentelemetry.io/contrib/bridges/otelslog v0.15.0 h1:yOYhGNPZseueTTvWp5iBD3/CthrmvayUXYEX862dDi4=
|
go.opentelemetry.io/contrib/bridges/otelslog v0.15.0 h1:yOYhGNPZseueTTvWp5iBD3/CthrmvayUXYEX862dDi4=
|
||||||
|
|||||||
@@ -1,11 +1,14 @@
|
|||||||
package graph
|
package graph
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
|
|
||||||
"gitea.unbound.se/unboundsoftware/schemas/graph/model"
|
"gitea.unbound.se/unboundsoftware/schemas/graph/model"
|
||||||
@@ -123,3 +126,36 @@ func GenerateCosmoRouterConfigWithExecutor(subGraphs []*model.SubGraph, executor
|
|||||||
|
|
||||||
return string(configJSON), nil
|
return string(configJSON), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CosmoGenerator wraps config generation with a concurrency limit and timeout
|
||||||
|
// to prevent unbounded wgc process spawning under rapid schema updates.
|
||||||
|
type CosmoGenerator struct {
|
||||||
|
sem *semaphore.Weighted
|
||||||
|
executor CommandExecutor
|
||||||
|
timeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCosmoGenerator creates a CosmoGenerator that allows at most one concurrent
|
||||||
|
// wgc process and applies the given timeout to each generation attempt.
|
||||||
|
func NewCosmoGenerator(executor CommandExecutor, timeout time.Duration) *CosmoGenerator {
|
||||||
|
return &CosmoGenerator{
|
||||||
|
sem: semaphore.NewWeighted(1),
|
||||||
|
executor: executor,
|
||||||
|
timeout: timeout,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate produces a Cosmo Router config, blocking if another generation is
|
||||||
|
// already in progress. The provided context (plus the configured timeout)
|
||||||
|
// controls cancellation.
|
||||||
|
func (g *CosmoGenerator) Generate(ctx context.Context, subGraphs []*model.SubGraph) (string, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, g.timeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := g.sem.Acquire(ctx, 1); err != nil {
|
||||||
|
return "", fmt.Errorf("acquire cosmo generator: %w", err)
|
||||||
|
}
|
||||||
|
defer g.sem.Release(1)
|
||||||
|
|
||||||
|
return GenerateCosmoRouterConfigWithExecutor(subGraphs, g.executor)
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,11 +1,15 @@
|
|||||||
package graph
|
package graph
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@@ -459,6 +463,114 @@ func TestGenerateCosmoRouterConfig_MockError(t *testing.T) {
|
|||||||
assert.Equal(t, 1, mockExecutor.CallCount, "Should have attempted to call executor")
|
assert.Equal(t, 1, mockExecutor.CallCount, "Should have attempted to call executor")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SlowMockExecutor simulates a slow wgc command for concurrency testing.
|
||||||
|
type SlowMockExecutor struct {
|
||||||
|
MockCommandExecutor
|
||||||
|
delay time.Duration
|
||||||
|
mu sync.Mutex
|
||||||
|
concurrent atomic.Int32
|
||||||
|
maxSeen atomic.Int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *SlowMockExecutor) Execute(name string, args ...string) ([]byte, error) {
|
||||||
|
cur := m.concurrent.Add(1)
|
||||||
|
// Track the maximum concurrent executions observed.
|
||||||
|
for {
|
||||||
|
old := m.maxSeen.Load()
|
||||||
|
if cur <= old || m.maxSeen.CompareAndSwap(old, cur) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer m.concurrent.Add(-1)
|
||||||
|
|
||||||
|
time.Sleep(m.delay)
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
return m.MockCommandExecutor.Execute(name, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCosmoGenerator_ConcurrencyLimit(t *testing.T) {
|
||||||
|
executor := &SlowMockExecutor{delay: 100 * time.Millisecond}
|
||||||
|
gen := NewCosmoGenerator(executor, 5*time.Second)
|
||||||
|
|
||||||
|
subGraphs := []*model.SubGraph{
|
||||||
|
{
|
||||||
|
Service: "svc",
|
||||||
|
URL: stringPtr("http://localhost:4001/query"),
|
||||||
|
Sdl: "type Query { hello: String }",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for range 5 {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
_, _ = gen.Generate(context.Background(), subGraphs)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
assert.Equal(t, int32(1), executor.maxSeen.Load(),
|
||||||
|
"at most 1 wgc process should run concurrently")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCosmoGenerator_Timeout(t *testing.T) {
|
||||||
|
// Executor that takes longer than the timeout.
|
||||||
|
executor := &SlowMockExecutor{delay: 500 * time.Millisecond}
|
||||||
|
gen := NewCosmoGenerator(executor, 50*time.Millisecond)
|
||||||
|
|
||||||
|
subGraphs := []*model.SubGraph{
|
||||||
|
{
|
||||||
|
Service: "svc",
|
||||||
|
URL: stringPtr("http://localhost:4001/query"),
|
||||||
|
Sdl: "type Query { hello: String }",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// First call: occupies the semaphore for 500ms.
|
||||||
|
go func() {
|
||||||
|
_, _ = gen.Generate(context.Background(), subGraphs)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Give the first goroutine time to acquire the semaphore.
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
|
||||||
|
// Second call: should timeout waiting for the semaphore.
|
||||||
|
_, err := gen.Generate(context.Background(), subGraphs)
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "acquire cosmo generator")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCosmoGenerator_ContextCancellation(t *testing.T) {
|
||||||
|
executor := &SlowMockExecutor{delay: 500 * time.Millisecond}
|
||||||
|
gen := NewCosmoGenerator(executor, 5*time.Second)
|
||||||
|
|
||||||
|
subGraphs := []*model.SubGraph{
|
||||||
|
{
|
||||||
|
Service: "svc",
|
||||||
|
URL: stringPtr("http://localhost:4001/query"),
|
||||||
|
Sdl: "type Query { hello: String }",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// First call: occupies the semaphore.
|
||||||
|
go func() {
|
||||||
|
_, _ = gen.Generate(context.Background(), subGraphs)
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
|
||||||
|
// Second call with an already-cancelled context.
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
_, err := gen.Generate(ctx, subGraphs)
|
||||||
|
require.Error(t, err)
|
||||||
|
assert.Contains(t, err.Error(), "acquire cosmo generator")
|
||||||
|
}
|
||||||
|
|
||||||
// Helper function for tests
|
// Helper function for tests
|
||||||
func stringPtr(s string) *string {
|
func stringPtr(s string) *string {
|
||||||
return &s
|
return &s
|
||||||
|
|||||||
@@ -0,0 +1,42 @@
|
|||||||
|
package graph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Debouncer coalesces rapid calls with the same key, executing only the last
|
||||||
|
// one after a configurable delay. This prevents redundant work when multiple
|
||||||
|
// updates arrive in quick succession (e.g., rapid schema publishing).
|
||||||
|
type Debouncer struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
delay time.Duration
|
||||||
|
timers map[string]*time.Timer
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDebouncer creates a Debouncer with the given delay window.
|
||||||
|
func NewDebouncer(delay time.Duration) *Debouncer {
|
||||||
|
return &Debouncer{
|
||||||
|
delay: delay,
|
||||||
|
timers: make(map[string]*time.Timer),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Debounce resets the timer for key. When the timer fires (after delay with no
|
||||||
|
// new calls for the same key), fn is executed in a new goroutine.
|
||||||
|
func (d *Debouncer) Debounce(key string, fn func()) {
|
||||||
|
d.mu.Lock()
|
||||||
|
defer d.mu.Unlock()
|
||||||
|
|
||||||
|
if t, ok := d.timers[key]; ok {
|
||||||
|
t.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
d.timers[key] = time.AfterFunc(d.delay, func() {
|
||||||
|
d.mu.Lock()
|
||||||
|
delete(d.timers, key)
|
||||||
|
d.mu.Unlock()
|
||||||
|
|
||||||
|
fn()
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -0,0 +1,57 @@
|
|||||||
|
package graph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDebouncer_Coalesces(t *testing.T) {
|
||||||
|
d := NewDebouncer(50 * time.Millisecond)
|
||||||
|
var calls atomic.Int32
|
||||||
|
|
||||||
|
// Fire 10 rapid calls for the same key — only the last should execute.
|
||||||
|
for range 10 {
|
||||||
|
d.Debounce("key1", func() {
|
||||||
|
calls.Add(1)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the debounce delay plus some margin.
|
||||||
|
time.Sleep(150 * time.Millisecond)
|
||||||
|
|
||||||
|
assert.Equal(t, int32(1), calls.Load(), "rapid calls should coalesce into a single execution")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDebouncer_DifferentKeys(t *testing.T) {
|
||||||
|
d := NewDebouncer(50 * time.Millisecond)
|
||||||
|
var calls atomic.Int32
|
||||||
|
|
||||||
|
d.Debounce("key-a", func() { calls.Add(1) })
|
||||||
|
d.Debounce("key-b", func() { calls.Add(1) })
|
||||||
|
d.Debounce("key-c", func() { calls.Add(1) })
|
||||||
|
|
||||||
|
time.Sleep(150 * time.Millisecond)
|
||||||
|
|
||||||
|
assert.Equal(t, int32(3), calls.Load(), "different keys should fire independently")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDebouncer_TimerReset(t *testing.T) {
|
||||||
|
d := NewDebouncer(100 * time.Millisecond)
|
||||||
|
var value atomic.Int32
|
||||||
|
|
||||||
|
// First call sets value to 1.
|
||||||
|
d.Debounce("key", func() { value.Store(1) })
|
||||||
|
|
||||||
|
// Wait 60ms (less than the 100ms delay), then replace with value 2.
|
||||||
|
time.Sleep(60 * time.Millisecond)
|
||||||
|
d.Debounce("key", func() { value.Store(2) })
|
||||||
|
|
||||||
|
// At 60ms the first timer hasn't fired yet. Wait for the second timer.
|
||||||
|
time.Sleep(150 * time.Millisecond)
|
||||||
|
|
||||||
|
require.Equal(t, int32(2), value.Load(), "later call should replace the earlier one")
|
||||||
|
}
|
||||||
+7
-5
@@ -24,11 +24,13 @@ type Publisher interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Resolver struct {
|
type Resolver struct {
|
||||||
EventStore eventsourced.EventStore
|
EventStore eventsourced.EventStore
|
||||||
Publisher Publisher
|
Publisher Publisher
|
||||||
Logger *slog.Logger
|
Logger *slog.Logger
|
||||||
Cache *cache.Cache
|
Cache *cache.Cache
|
||||||
PubSub *PubSub
|
PubSub *PubSub
|
||||||
|
CosmoGenerator *CosmoGenerator
|
||||||
|
Debouncer *Debouncer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Resolver) apiKeyCanAccessRef(ctx context.Context, ref string, publish bool) (string, error) {
|
func (r *Resolver) apiKeyCanAccessRef(ctx context.Context, ref string, publish bool) (string, error) {
|
||||||
|
|||||||
+21
-49
@@ -174,8 +174,9 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publish schema update to subscribers
|
// Debounce schema update publishing so rapid successive updates for the
|
||||||
go func() {
|
// same org+ref only trigger one config generation.
|
||||||
|
r.Debouncer.Debounce(orgId+":"+input.Ref, func() {
|
||||||
services, lastUpdate := r.Cache.Services(orgId, input.Ref, "")
|
services, lastUpdate := r.Cache.Services(orgId, input.Ref, "")
|
||||||
r.Logger.Info("Publishing schema update after subgraph change",
|
r.Logger.Info("Publishing schema update after subgraph change",
|
||||||
"ref", input.Ref,
|
"ref", input.Ref,
|
||||||
@@ -191,19 +192,11 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
|
|||||||
r.Logger.Error("fetch subgraph for update notification", "error", err)
|
r.Logger.Error("fetch subgraph for update notification", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
subGraphs[i] = &model.SubGraph{
|
subGraphs[i] = r.toGqlSubGraph(sg)
|
||||||
ID: sg.ID.String(),
|
|
||||||
Service: sg.Service,
|
|
||||||
URL: sg.Url,
|
|
||||||
WsURL: sg.WSUrl,
|
|
||||||
Sdl: sg.Sdl,
|
|
||||||
ChangedBy: sg.ChangedBy,
|
|
||||||
ChangedAt: sg.ChangedAt,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate Cosmo router config
|
// Generate Cosmo router config (concurrency-limited)
|
||||||
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
|
cosmoConfig, err := r.CosmoGenerator.Generate(context.Background(), subGraphs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Logger.Error("generate cosmo config for update", "error", err)
|
r.Logger.Error("generate cosmo config for update", "error", err)
|
||||||
cosmoConfig = "" // Send empty if generation fails
|
cosmoConfig = "" // Send empty if generation fails
|
||||||
@@ -225,7 +218,7 @@ func (r *mutationResolver) UpdateSubGraph(ctx context.Context, input model.Input
|
|||||||
)
|
)
|
||||||
|
|
||||||
r.PubSub.Publish(input.Ref, update)
|
r.PubSub.Publish(input.Ref, update)
|
||||||
}()
|
})
|
||||||
|
|
||||||
return r.toGqlSubGraph(subGraph), nil
|
return r.toGqlSubGraph(subGraph), nil
|
||||||
}
|
}
|
||||||
@@ -292,30 +285,16 @@ func (r *queryResolver) Supergraph(ctx context.Context, ref string, isAfter *str
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
subGraphs := make([]*model.SubGraph, len(services))
|
subGraphs := make([]*model.SubGraph, len(services))
|
||||||
|
serviceSDLs := make([]string, len(services))
|
||||||
for i, id := range services {
|
for i, id := range services {
|
||||||
sg, err := r.fetchSubGraph(ctx, id)
|
sg, err := r.fetchSubGraph(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
subGraphs[i] = &model.SubGraph{
|
subGraphs[i] = r.toGqlSubGraph(sg)
|
||||||
ID: sg.ID.String(),
|
serviceSDLs[i] = sg.Sdl
|
||||||
Service: sg.Service,
|
|
||||||
URL: sg.Url,
|
|
||||||
WsURL: sg.WSUrl,
|
|
||||||
Sdl: sg.Sdl,
|
|
||||||
ChangedBy: sg.ChangedBy,
|
|
||||||
ChangedAt: sg.ChangedAt,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var serviceSDLs []string
|
|
||||||
for _, id := range services {
|
|
||||||
sg, err := r.fetchSubGraph(ctx, id)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
serviceSDLs = append(serviceSDLs, sg.Sdl)
|
|
||||||
}
|
|
||||||
sdl, err := sdlmerge.MergeSDLs(serviceSDLs...)
|
sdl, err := sdlmerge.MergeSDLs(serviceSDLs...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -388,8 +367,8 @@ func (r *queryResolver) LatestSchema(ctx context.Context, ref string) (*model.Sc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate Cosmo router config
|
// Generate Cosmo router config (concurrency-limited)
|
||||||
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
|
cosmoConfig, err := r.CosmoGenerator.Generate(ctx, subGraphs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Logger.Error("generate cosmo config", "error", err)
|
r.Logger.Error("generate cosmo config", "error", err)
|
||||||
cosmoConfig = "" // Return empty if generation fails
|
cosmoConfig = "" // Return empty if generation fails
|
||||||
@@ -432,9 +411,6 @@ func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<
|
|||||||
|
|
||||||
// Send initial state immediately
|
// Send initial state immediately
|
||||||
go func() {
|
go func() {
|
||||||
// Use background context for async operation
|
|
||||||
bgCtx := context.Background()
|
|
||||||
|
|
||||||
services, lastUpdate := r.Cache.Services(orgId, ref, "")
|
services, lastUpdate := r.Cache.Services(orgId, ref, "")
|
||||||
r.Logger.Info("Preparing initial schema update",
|
r.Logger.Info("Preparing initial schema update",
|
||||||
"ref", ref,
|
"ref", ref,
|
||||||
@@ -445,24 +421,16 @@ func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<
|
|||||||
|
|
||||||
subGraphs := make([]*model.SubGraph, len(services))
|
subGraphs := make([]*model.SubGraph, len(services))
|
||||||
for i, id := range services {
|
for i, id := range services {
|
||||||
sg, err := r.fetchSubGraph(bgCtx, id)
|
sg, err := r.fetchSubGraph(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Logger.Error("fetch subgraph for initial update", "error", err, "id", id)
|
r.Logger.Error("fetch subgraph for initial update", "error", err, "id", id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
subGraphs[i] = &model.SubGraph{
|
subGraphs[i] = r.toGqlSubGraph(sg)
|
||||||
ID: sg.ID.String(),
|
|
||||||
Service: sg.Service,
|
|
||||||
URL: sg.Url,
|
|
||||||
WsURL: sg.WSUrl,
|
|
||||||
Sdl: sg.Sdl,
|
|
||||||
ChangedBy: sg.ChangedBy,
|
|
||||||
ChangedAt: sg.ChangedAt,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate Cosmo router config
|
// Generate Cosmo router config (concurrency-limited)
|
||||||
cosmoConfig, err := GenerateCosmoRouterConfig(subGraphs)
|
cosmoConfig, err := r.CosmoGenerator.Generate(ctx, subGraphs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Logger.Error("generate cosmo config", "error", err)
|
r.Logger.Error("generate cosmo config", "error", err)
|
||||||
cosmoConfig = "" // Send empty if generation fails
|
cosmoConfig = "" // Send empty if generation fails
|
||||||
@@ -483,7 +451,11 @@ func (r *subscriptionResolver) SchemaUpdates(ctx context.Context, ref string) (<
|
|||||||
"cosmoConfigLength", len(cosmoConfig),
|
"cosmoConfigLength", len(cosmoConfig),
|
||||||
)
|
)
|
||||||
|
|
||||||
ch <- update
|
select {
|
||||||
|
case ch <- update:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Clean up subscription when context is done
|
// Clean up subscription when context is done
|
||||||
|
|||||||
Reference in New Issue
Block a user