package cache import ( "fmt" "log/slog" "sync" "time" "github.com/sparetimecoders/goamqp" "gitlab.com/unboundsoftware/eventsourced/eventsourced" "gitea.unbound.se/unboundsoftware/schemas/domain" "gitea.unbound.se/unboundsoftware/schemas/graph/model" "gitea.unbound.se/unboundsoftware/schemas/hash" ) type Cache struct { mu sync.RWMutex organizations map[string]domain.Organization users map[string][]string apiKeys map[string]domain.APIKey // keyed by organizationId-name services map[string]map[string]map[string]struct{} subGraphs map[string]string lastUpdate map[string]string mergedSDLs map[string]*mergedSDLEntry schemaUpdates map[string]*model.SchemaUpdate logger *slog.Logger } // mergedSDLEntry holds a precomputed merged SDL together with the lastUpdate // id it was computed against, so stale entries can be detected on read. type mergedSDLEntry struct { ID string SDL string SubGraphs []*model.SubGraph } func (c *Cache) OrganizationByAPIKey(apiKey string) *domain.Organization { c.mu.RLock() defer c.mu.RUnlock() // Find the API key by comparing hashes for _, key := range c.apiKeys { if hash.CompareAPIKey(key.Key, apiKey) { org, exists := c.organizations[key.OrganizationId] if !exists { return nil } return &org } } return nil } func (c *Cache) OrganizationsByUser(sub string) []domain.Organization { c.mu.RLock() defer c.mu.RUnlock() orgIds := c.users[sub] orgs := make([]domain.Organization, len(orgIds)) for i, id := range orgIds { orgs[i] = c.organizations[id] } return orgs } func (c *Cache) AllOrganizations() []domain.Organization { c.mu.RLock() defer c.mu.RUnlock() orgs := make([]domain.Organization, 0, len(c.organizations)) for _, org := range c.organizations { orgs = append(orgs, org) } return orgs } func (c *Cache) ApiKeyByKey(key string) *domain.APIKey { c.mu.RLock() defer c.mu.RUnlock() // Find the API key by comparing hashes for _, apiKey := range c.apiKeys { if hash.CompareAPIKey(apiKey.Key, key) { return &apiKey } } return nil } func (c *Cache) Services(orgId, ref, lastUpdate string) ([]string, string) { c.mu.RLock() defer c.mu.RUnlock() key := refKey(orgId, ref) var services []string if lastUpdate == "" || c.lastUpdate[key] > lastUpdate { for k := range c.services[orgId][ref] { services = append(services, k) } } return services, c.lastUpdate[key] } func (c *Cache) SubGraphId(orgId, ref, service string) string { c.mu.RLock() defer c.mu.RUnlock() return c.subGraphs[subGraphKey(orgId, ref, service)] } func (c *Cache) Update(msg any, _ goamqp.Headers) (any, error) { c.mu.Lock() defer c.mu.Unlock() switch m := msg.(type) { case *domain.OrganizationAdded: o := domain.Organization{ BaseAggregate: eventsourced.BaseAggregateFromString(m.ID.String()), } m.UpdateOrganization(&o) c.organizations[m.ID.String()] = o c.addUser(m.Initiator, o) c.logger.With("org_id", m.ID.String(), "event", "OrganizationAdded").Debug("cache updated") case *domain.UserAddedToOrganization: org, exists := c.organizations[m.ID.String()] if exists { m.UpdateOrganization(&org) c.organizations[m.ID.String()] = org c.addUser(m.UserId, org) c.logger.With("org_id", m.ID.String(), "user_id", m.UserId, "event", "UserAddedToOrganization").Debug("cache updated") } else { c.logger.With("org_id", m.ID.String(), "event", "UserAddedToOrganization").Warn("organization not found in cache") } case *domain.APIKeyAdded: key := domain.APIKey{ Name: m.Name, OrganizationId: m.OrganizationId, Key: m.Key, // This is now the hashed key Refs: m.Refs, Read: m.Read, Publish: m.Publish, CreatedBy: m.Initiator, CreatedAt: m.When(), } // Use composite key: organizationId-name c.apiKeys[apiKeyId(m.OrganizationId, m.Name)] = key org := c.organizations[m.OrganizationId] org.APIKeys = append(org.APIKeys, key) c.organizations[m.OrganizationId] = org c.logger.With("org_id", m.OrganizationId, "key_name", m.Name, "event", "APIKeyAdded").Debug("cache updated") case *domain.APIKeyRemoved: orgId := m.ID.String() org, exists := c.organizations[orgId] if exists { // Remove from organization's API keys list for i, key := range org.APIKeys { if key.Name == m.KeyName { org.APIKeys = append(org.APIKeys[:i], org.APIKeys[i+1:]...) break } } c.organizations[orgId] = org // Remove from apiKeys map delete(c.apiKeys, apiKeyId(orgId, m.KeyName)) c.logger.With("org_id", orgId, "key_name", m.KeyName, "event", "APIKeyRemoved").Debug("cache updated") } else { c.logger.With("org_id", orgId, "event", "APIKeyRemoved").Warn("organization not found in cache") } case *domain.OrganizationRemoved: orgId := m.ID.String() org, exists := c.organizations[orgId] if exists { // Remove all API keys for this organization for _, key := range org.APIKeys { delete(c.apiKeys, apiKeyId(orgId, key.Name)) } // Remove organization from all users for userId, userOrgs := range c.users { for i, userOrgId := range userOrgs { if userOrgId == orgId { c.users[userId] = append(userOrgs[:i], userOrgs[i+1:]...) break } } // If user has no more organizations, remove from map if len(c.users[userId]) == 0 { delete(c.users, userId) } } // Remove services for this organization if refs, exists := c.services[orgId]; exists { for ref := range refs { // Remove all subgraphs for this org/ref combination for service := range refs[ref] { delete(c.subGraphs, subGraphKey(orgId, ref, service)) } // Remove cached results for this org/ref rk := refKey(orgId, ref) delete(c.lastUpdate, rk) delete(c.mergedSDLs, rk) delete(c.schemaUpdates, rk) } delete(c.services, orgId) } // Remove organization delete(c.organizations, orgId) c.logger.With("org_id", orgId, "event", "OrganizationRemoved").Debug("cache updated") } else { c.logger.With("org_id", orgId, "event", "OrganizationRemoved").Warn("organization not found in cache") } case *domain.SubGraphUpdated: c.updateSubGraph(m.OrganizationId, m.Ref, m.ID.String(), m.Service, m.Time) c.logger.With("org_id", m.OrganizationId, "ref", m.Ref, "service", m.Service, "event", "SubGraphUpdated").Debug("cache updated") case *domain.Organization: c.organizations[m.ID.String()] = *m c.addUser(m.CreatedBy, *m) for _, k := range m.APIKeys { // Use composite key: organizationId-name c.apiKeys[apiKeyId(k.OrganizationId, k.Name)] = k } c.logger.With("org_id", m.ID.String(), "event", "Organization aggregate loaded").Debug("cache updated") case *domain.SubGraph: c.updateSubGraph(m.OrganizationId, m.Ref, m.ID.String(), m.Service, m.ChangedAt) c.logger.With("org_id", m.OrganizationId, "ref", m.Ref, "service", m.Service, "event", "SubGraph aggregate loaded").Debug("cache updated") default: c.logger.With("msg", msg).Warn("unexpected message received") } return nil, nil } func (c *Cache) updateSubGraph(orgId string, ref string, subGraphId string, service string, updated time.Time) { if _, exists := c.services[orgId]; !exists { c.services[orgId] = make(map[string]map[string]struct{}) } if _, exists := c.services[orgId][ref]; !exists { c.services[orgId][ref] = make(map[string]struct{}) } c.services[orgId][ref][subGraphId] = struct{}{} c.subGraphs[subGraphKey(orgId, ref, service)] = subGraphId c.lastUpdate[refKey(orgId, ref)] = updated.Format(time.RFC3339Nano) } func (c *Cache) addUser(sub string, organization domain.Organization) { user, exists := c.users[sub] orgId := organization.ID.String() if !exists { c.users[sub] = []string{orgId} return } // Check if organization already exists for this user for _, id := range user { if id == orgId { return // Already exists, no need to add } } c.users[sub] = append(user, orgId) } func New(logger *slog.Logger) *Cache { return &Cache{ organizations: make(map[string]domain.Organization), users: make(map[string][]string), apiKeys: make(map[string]domain.APIKey), services: make(map[string]map[string]map[string]struct{}), subGraphs: make(map[string]string), lastUpdate: make(map[string]string), mergedSDLs: make(map[string]*mergedSDLEntry), schemaUpdates: make(map[string]*model.SchemaUpdate), logger: logger, } } // GetMergedSDL returns the cached merged SDL for (orgId, ref) if it was // computed against the current lastUpdate. Returns nil when missing or stale. func (c *Cache) GetMergedSDL(orgId, ref string) *mergedSDLEntry { c.mu.RLock() defer c.mu.RUnlock() key := refKey(orgId, ref) entry := c.mergedSDLs[key] if entry == nil || entry.ID != c.lastUpdate[key] { return nil } return entry } // MergedSDLEntry exposes the cached merged SDL fields to callers. func (e *mergedSDLEntry) Unpack() (id, sdl string, subGraphs []*model.SubGraph) { return e.ID, e.SDL, e.SubGraphs } // SetMergedSDL stores a precomputed merged SDL for (orgId, ref). The entry // is only retained while c.lastUpdate[key] matches id; subsequent updates // invalidate it implicitly via the version mismatch in GetMergedSDL. func (c *Cache) SetMergedSDL(orgId, ref, id, sdl string, subGraphs []*model.SubGraph) { c.mu.Lock() defer c.mu.Unlock() c.mergedSDLs[refKey(orgId, ref)] = &mergedSDLEntry{ ID: id, SDL: sdl, SubGraphs: subGraphs, } } // GetSchemaUpdate returns the cached SchemaUpdate (subgraphs + cosmo router // config) for (orgId, ref) when its id matches the current lastUpdate. // Returns nil when missing or stale. func (c *Cache) GetSchemaUpdate(orgId, ref string) *model.SchemaUpdate { c.mu.RLock() defer c.mu.RUnlock() key := refKey(orgId, ref) upd := c.schemaUpdates[key] if upd == nil || upd.ID != c.lastUpdate[key] { return nil } return upd } // SetSchemaUpdate stores a precomputed SchemaUpdate for (orgId, ref). func (c *Cache) SetSchemaUpdate(orgId, ref string, update *model.SchemaUpdate) { c.mu.Lock() defer c.mu.Unlock() c.schemaUpdates[refKey(orgId, ref)] = update } func refKey(orgId string, ref string) string { return fmt.Sprintf("%s<->%s", orgId, ref) } func subGraphKey(orgId string, ref string, service string) string { return fmt.Sprintf("%s<->%s<->%s", orgId, ref, service) } func apiKeyId(orgId string, name string) string { return fmt.Sprintf("%s<->%s", orgId, name) }