package aether

import (
	"context"
	"sync"
)

// EventBroadcaster defines the interface for publishing and subscribing to
// events that are grouped by namespace.
type EventBroadcaster interface {
	// Subscribe registers a new subscriber for namespaceID and returns the
	// receive-only channel on which its events are delivered.
	Subscribe(namespaceID string) <-chan *Event
	// Unsubscribe removes a channel previously returned by Subscribe for
	// namespaceID.
	Unsubscribe(namespaceID string, ch <-chan *Event)
	// Publish sends event to the subscribers of namespaceID.
	Publish(namespaceID string, event *Event)
	// Stop shuts the broadcaster down.
	Stop()
	// SubscriberCount returns the number of subscribers for namespaceID.
	SubscriberCount(namespaceID string) int
}

// MetricsProvider is an optional interface that EventBroadcaster
// implementations can implement to expose metrics.
type MetricsProvider interface {
	// Metrics returns the metrics collector for this broadcaster.
	Metrics() BroadcasterMetrics
}

// Compile-time checks that EventBus implements the interfaces declared above.
var (
	_ EventBroadcaster = (*EventBus)(nil)
	_ MetricsProvider  = (*EventBus)(nil)
)

// EventBus broadcasts events to multiple subscribers within a namespace.
//
// Every access to the subscriber map happens under mutex, so the methods may
// be called from multiple goroutines.
type EventBus struct {
	subscribers map[string][]chan *Event // namespaceID -> channels
	mutex       sync.RWMutex             // guards subscribers
	ctx         context.Context          // cancelled by Stop; not read by any method in this file
	cancel      context.CancelFunc       // cancels ctx
	metrics     *DefaultMetricsCollector // receives subscribe/publish/drop counters
}

// NewEventBus creates a new event bus with no subscribers and a fresh metrics
// collector.
func NewEventBus() *EventBus {
	ctx, cancel := context.WithCancel(context.Background())
	return &EventBus{
		subscribers: make(map[string][]chan *Event),
		ctx:         ctx,
		cancel:      cancel,
		metrics:     NewMetricsCollector(),
	}
}

// Metrics returns the metrics collector for this event bus.
func (eb *EventBus) Metrics() BroadcasterMetrics {
	return eb.metrics
}

// Subscribe creates a new subscription channel for a namespace and returns
// its receive-only end. The channel is closed by Unsubscribe or Stop.
func (eb *EventBus) Subscribe(namespaceID string) <-chan *Event {
	// Publish never blocks; the buffer only determines how many events a slow
	// subscriber may fall behind before further events are dropped for it.
	const bufferSize = 100

	eb.mutex.Lock()
	defer eb.mutex.Unlock()

	ch := make(chan *Event, bufferSize)
	eb.subscribers[namespaceID] = append(eb.subscribers[namespaceID], ch)

	// Record subscription metric
	eb.metrics.RecordSubscribe(namespaceID)

	return ch
}

// Unsubscribe removes a subscription channel from namespaceID and closes it.
// It is a no-op when ch is not currently registered under namespaceID, which
// also makes a repeated Unsubscribe (or one after Stop) safe.
func (eb *EventBus) Unsubscribe(namespaceID string, ch <-chan *Event) {
	eb.mutex.Lock()
	defer eb.mutex.Unlock()

	subs := eb.subscribers[namespaceID]
	for i, subscriber := range subs {
		if subscriber != ch {
			continue
		}

		// Shift the tail left and clear the vacated slot so the backing array
		// does not keep the closed channel (and any events still buffered in
		// it) reachable.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		eb.subscribers[namespaceID] = subs[:last]

		close(subscriber)

		// Record unsubscription metric
		eb.metrics.RecordUnsubscribe(namespaceID)
		break
	}

	// Clean up empty namespace entries
	if len(eb.subscribers[namespaceID]) == 0 {
		delete(eb.subscribers, namespaceID)
	}
}

// Publish sends an event to all subscribers of a namespace without blocking.
// The same *Event pointer is handed to every subscriber. A subscriber whose
// buffer is full does not receive the event; the drop is recorded in the
// metrics instead. The publish metric is recorded even when the namespace has
// no subscribers.
func (eb *EventBus) Publish(namespaceID string, event *Event) {
	// The read lock is held across the sends: channels are only closed under
	// the write lock, so no send here can hit a closed channel.
	eb.mutex.RLock()
	defer eb.mutex.RUnlock()

	// Record publish metric
	eb.metrics.RecordPublish(namespaceID)

	for _, ch := range eb.subscribers[namespaceID] {
		select {
		case ch <- event:
			// Event delivered - record receive metric
			eb.metrics.RecordReceive(namespaceID)
		default:
			// Channel full, skip this subscriber (non-blocking)
			eb.metrics.RecordDroppedEvent(namespaceID)
		}
	}
}

// Stop cancels the bus context, closes every subscriber channel, records one
// unsubscribe metric per closed channel, and resets the subscriber map.
//
// Calling Stop again finds an empty map and closes nothing. Stop does not
// block later calls: Subscribe and Publish keep working against the fresh
// map, and channels created after Stop are only closed by a later
// Unsubscribe or Stop.
func (eb *EventBus) Stop() {
	eb.mutex.Lock()
	defer eb.mutex.Unlock()

	eb.cancel()

	// Close all subscriber channels and update metrics
	for namespaceID, subs := range eb.subscribers {
		for _, ch := range subs {
			eb.metrics.RecordUnsubscribe(namespaceID)
			close(ch)
		}
	}

	eb.subscribers = make(map[string][]chan *Event)
}

// SubscriberCount returns the number of subscribers for a namespace, or 0
// when the namespace has none.
func (eb *EventBus) SubscriberCount(namespaceID string) int {
	eb.mutex.RLock()
	defer eb.mutex.RUnlock()

	return len(eb.subscribers[namespaceID])
}