package aether import ( "context" "sync" ) // EventBroadcaster defines the interface for publishing and subscribing to events type EventBroadcaster interface { Subscribe(namespaceID string) <-chan *Event Unsubscribe(namespaceID string, ch <-chan *Event) Publish(namespaceID string, event *Event) Stop() SubscriberCount(namespaceID string) int } // EventBus broadcasts events to multiple subscribers within a namespace type EventBus struct { subscribers map[string][]chan *Event // namespaceID -> channels mutex sync.RWMutex ctx context.Context cancel context.CancelFunc } // NewEventBus creates a new event bus func NewEventBus() *EventBus { ctx, cancel := context.WithCancel(context.Background()) return &EventBus{ subscribers: make(map[string][]chan *Event), ctx: ctx, cancel: cancel, } } // Subscribe creates a new subscription channel for a namespace func (eb *EventBus) Subscribe(namespaceID string) <-chan *Event { eb.mutex.Lock() defer eb.mutex.Unlock() // Create buffered channel to prevent blocking publishers ch := make(chan *Event, 100) eb.subscribers[namespaceID] = append(eb.subscribers[namespaceID], ch) return ch } // Unsubscribe removes a subscription channel func (eb *EventBus) Unsubscribe(namespaceID string, ch <-chan *Event) { eb.mutex.Lock() defer eb.mutex.Unlock() subs := eb.subscribers[namespaceID] for i, subscriber := range subs { if subscriber == ch { // Remove channel from slice eb.subscribers[namespaceID] = append(subs[:i], subs[i+1:]...) close(subscriber) break } } // Clean up empty namespace entries if len(eb.subscribers[namespaceID]) == 0 { delete(eb.subscribers, namespaceID) } } // Publish sends an event to all subscribers of a namespace func (eb *EventBus) Publish(namespaceID string, event *Event) { eb.mutex.RLock() defer eb.mutex.RUnlock() subscribers := eb.subscribers[namespaceID] for _, ch := range subscribers { select { case ch <- event: // Event delivered default: // Channel full, skip this subscriber (non-blocking) } } } // Stop closes the event bus func (eb *EventBus) Stop() { eb.mutex.Lock() defer eb.mutex.Unlock() eb.cancel() // Close all subscriber channels for _, subs := range eb.subscribers { for _, ch := range subs { close(ch) } } eb.subscribers = make(map[string][]chan *Event) } // SubscriberCount returns the number of subscribers for a namespace func (eb *EventBus) SubscriberCount(namespaceID string) int { eb.mutex.RLock() defer eb.mutex.RUnlock() return len(eb.subscribers[namespaceID]) }