package aether import ( "context" "sync" ) // EventBroadcaster defines the interface for publishing and subscribing to events. // // Subscribe accepts namespace patterns following NATS subject matching conventions: // - Exact match: "tenant-a" matches only "tenant-a" // - Single wildcard: "*" matches any single token, "tenant-*" matches "tenant-a", "tenant-b" // - Multi-token wildcard: ">" matches one or more tokens (only at end of pattern) // // Security Warning: Wildcard subscriptions bypass namespace isolation. // Only grant wildcard access to trusted system components. type EventBroadcaster interface { // Subscribe creates a channel that receives events matching the namespace pattern. // Pattern syntax follows NATS conventions: "*" matches single token, ">" matches multiple. Subscribe(namespacePattern string) <-chan *Event // SubscribeWithFilter creates a filtered subscription channel for a namespace pattern. // Events are filtered by the provided SubscriptionFilter before delivery. // Filters are applied with AND logic - events must match all specified criteria. // // Example: Subscribe to "orders" namespace, only receiving "OrderPlaced" events for "order-*" actors: // filter := &SubscriptionFilter{ // EventTypes: []string{"OrderPlaced"}, // ActorPattern: "order-*", // } // ch := bus.SubscribeWithFilter("orders", filter) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event Unsubscribe(namespacePattern string, ch <-chan *Event) Publish(namespaceID string, event *Event) Stop() SubscriberCount(namespaceID string) int } // MetricsProvider is an optional interface that EventBroadcaster implementations // can implement to expose metrics. type MetricsProvider interface { // Metrics returns the metrics collector for this broadcaster. Metrics() BroadcasterMetrics } // subscription represents a single subscriber channel with its pattern type subscription struct { pattern string ch chan *Event } // filteredSubscription represents a subscriber with an optional filter type filteredSubscription struct { pattern string ch chan *Event filter *SubscriptionFilter } // EventBus broadcasts events to multiple subscribers within a namespace. // Supports wildcard patterns for cross-namespace subscriptions. // // Security Considerations: // Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces. // This is intentional for cross-cutting concerns like logging, monitoring, and auditing. // However, it bypasses namespace isolation - use with appropriate access controls. type EventBus struct { // exactSubscribers holds subscribers for exact namespace matches (no wildcards) exactSubscribers map[string][]*filteredSubscription // wildcardSubscribers holds subscribers with wildcard patterns wildcardSubscribers []*filteredSubscription mutex sync.RWMutex ctx context.Context cancel context.CancelFunc metrics *DefaultMetricsCollector } // NewEventBus creates a new event bus func NewEventBus() *EventBus { ctx, cancel := context.WithCancel(context.Background()) return &EventBus{ exactSubscribers: make(map[string][]*filteredSubscription), wildcardSubscribers: make([]*filteredSubscription, 0), ctx: ctx, cancel: cancel, metrics: NewMetricsCollector(), } } // Metrics returns the metrics collector for this event bus. func (eb *EventBus) Metrics() BroadcasterMetrics { return eb.metrics } // Subscribe creates a new subscription channel for a namespace pattern. // Patterns follow NATS subject matching conventions: // - "*" matches a single token (any sequence without ".") // - ">" matches one or more tokens (only valid at the end) // - Exact strings match exactly // // Security Warning: Wildcard patterns receive events from all matching namespaces, // bypassing namespace isolation. Only use for trusted system components. func (eb *EventBus) Subscribe(namespacePattern string) <-chan *Event { return eb.SubscribeWithFilter(namespacePattern, nil) } // SubscribeWithFilter creates a filtered subscription channel for a namespace pattern. // Events are filtered by the provided SubscriptionFilter before delivery. // If filter is nil or empty, all events matching the namespace pattern are delivered. // // Filtering is applied client-side for efficient processing: // - EventTypes: Only events with matching event types are delivered // - ActorPattern: Only events from matching actors are delivered // // Both namespace pattern wildcards and event filters work together: // - Namespace pattern determines which namespaces to subscribe to // - Filter determines which events within those namespaces to receive func (eb *EventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event { eb.mutex.Lock() defer eb.mutex.Unlock() // Create buffered channel to prevent blocking publishers ch := make(chan *Event, 100) sub := &filteredSubscription{ pattern: namespacePattern, ch: ch, filter: filter, } if IsWildcardPattern(namespacePattern) { // Store wildcard subscription separately eb.wildcardSubscribers = append(eb.wildcardSubscribers, sub) } else { // Exact match subscription eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], sub) } // Record subscription metric eb.metrics.RecordSubscribe(namespacePattern) return ch } // Unsubscribe removes a subscription channel func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) { eb.mutex.Lock() defer eb.mutex.Unlock() if IsWildcardPattern(namespacePattern) { // Remove from wildcard subscribers for i, sub := range eb.wildcardSubscribers { if sub.ch == ch { eb.wildcardSubscribers = append(eb.wildcardSubscribers[:i], eb.wildcardSubscribers[i+1:]...) close(sub.ch) // Record unsubscription metric eb.metrics.RecordUnsubscribe(namespacePattern) break } } } else { // Remove from exact subscribers subs := eb.exactSubscribers[namespacePattern] for i, sub := range subs { if sub.ch == ch { // Remove subscription from slice eb.exactSubscribers[namespacePattern] = append(subs[:i], subs[i+1:]...) close(sub.ch) // Record unsubscription metric eb.metrics.RecordUnsubscribe(namespacePattern) break } } // Clean up empty namespace entries if len(eb.exactSubscribers[namespacePattern]) == 0 { delete(eb.exactSubscribers, namespacePattern) } } } // Publish sends an event to all subscribers of a namespace. // Events are delivered to: // - All exact subscribers for the namespace (after filter matching) // - All wildcard subscribers whose pattern matches the namespace (after filter matching) func (eb *EventBus) Publish(namespaceID string, event *Event) { eb.mutex.RLock() defer eb.mutex.RUnlock() // Record publish metric eb.metrics.RecordPublish(namespaceID) // Deliver to exact subscribers subscribers := eb.exactSubscribers[namespaceID] for _, sub := range subscribers { eb.deliverToSubscriber(sub, event, namespaceID) } // Deliver to matching wildcard subscribers for _, sub := range eb.wildcardSubscribers { if MatchNamespacePattern(sub.pattern, namespaceID) { eb.deliverToSubscriber(sub, event, namespaceID) } } } // deliverToSubscriber delivers an event to a subscriber if it matches the filter func (eb *EventBus) deliverToSubscriber(sub *filteredSubscription, event *Event, namespaceID string) { // Apply filter if present if sub.filter != nil && !sub.filter.IsEmpty() { if !sub.filter.Matches(event) { // Event doesn't match filter, skip delivery return } } select { case sub.ch <- event: // Event delivered eb.metrics.RecordReceive(namespaceID) default: // Channel full, skip this subscriber (non-blocking) eb.metrics.RecordDroppedEvent(namespaceID) } } // Stop closes the event bus func (eb *EventBus) Stop() { eb.mutex.Lock() defer eb.mutex.Unlock() eb.cancel() // Close all exact subscriber channels and update metrics for namespaceID, subs := range eb.exactSubscribers { for _, sub := range subs { close(sub.ch) eb.metrics.RecordUnsubscribe(namespaceID) } } // Close all wildcard subscriber channels and update metrics for _, sub := range eb.wildcardSubscribers { close(sub.ch) eb.metrics.RecordUnsubscribe(sub.pattern) } eb.exactSubscribers = make(map[string][]*filteredSubscription) eb.wildcardSubscribers = make([]*filteredSubscription, 0) } // SubscriberCount returns the number of subscribers for a namespace. // This counts only exact match subscribers, not wildcard subscribers that may match. func (eb *EventBus) SubscriberCount(namespaceID string) int { eb.mutex.RLock() defer eb.mutex.RUnlock() return len(eb.exactSubscribers[namespaceID]) } // WildcardSubscriberCount returns the number of wildcard subscribers. // These are subscribers using "*" or ">" patterns that may receive events // from multiple namespaces. func (eb *EventBus) WildcardSubscriberCount() int { eb.mutex.RLock() defer eb.mutex.RUnlock() return len(eb.wildcardSubscribers) }