package aether

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
)

// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS.
//
// # Server-Side Filtering
//
// Events are published to type-specific NATS subjects of the form
// "aether.events.<namespace>.<EventType>" (e.g.,
// "aether.events.namespace.OrderPlaced"). SubscribeWithFilter maps a filter
// onto those subjects where it can:
//
//   - A filter that names event types is translated into one subject per
//     named type.
//   - A filter without event types is translated into the namespace wildcard
//     "aether.events.<namespace>.>".
//
// The filter is also handed to the embedded EventBus, which is expected to
// enforce it client-side; criteria that have no subject-level equivalent
// (such as actor patterns) rely on that.
type NATSEventBus struct {
	// EventBus is the embedded local bus that owns the subscriber channels.
	*EventBus

	// nc is the NATS connection used to publish and subscribe.
	nc *nats.Conn

	// subscriptions holds every NATS subscription opened by this bus.
	subscriptions []*nats.Subscription

	// namespaceSubscribers counts local subscribers per namespace.
	namespaceSubscribers map[string]int

	// nodeID uniquely identifies this node; it is stamped on outgoing
	// messages so the node can ignore its own events when they come back.
	nodeID string

	// mutex guards subscriptions and namespaceSubscribers.
	mutex sync.Mutex

	// ctx and cancel are created together in NewNATSEventBus; cancel is
	// invoked by Stop.
	ctx context.Context

	cancel context.CancelFunc
}

// eventMessage is the wire format for events sent over NATS.
type eventMessage struct {
	NodeID      string `json:"node_id"`
	NamespaceID string `json:"namespace_id"`
	Event       *Event `json:"event"`
}

// NewNATSEventBus creates a new NATS-backed event bus on top of nc.
//
// The bus gets a freshly generated node ID and a cancellable background
// context. The returned error is currently always nil.
func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
	bus := &NATSEventBus{
		EventBus:             NewEventBus(),
		nc:                   nc,
		nodeID:               uuid.New().String(),
		subscriptions:        make([]*nats.Subscription, 0),
		namespaceSubscribers: make(map[string]int),
	}
	bus.ctx, bus.cancel = context.WithCancel(context.Background())
	return bus, nil
}

// Subscribe creates a local subscription and ensures NATS subscription exists for the namespace.
// All events published to the namespace will be delivered.
func (neb *NATSEventBus) Subscribe(namespaceID string) <-chan *Event {
	return neb.SubscribeWithFilter(namespaceID, SubscriptionFilter{})
}

// SubscribeWithFilter creates a filtered subscription for a namespace.
//
// The filter is passed to the embedded EventBus for the local subscription.
// On the NATS side the bus makes sure this node listens on every subject the
// filter needs:
//
//   - filter.EventTypes non-empty: one type-specific subject per event type
//     ("aether.events.<namespace>.<type>").
//   - filter.EventTypes empty: the namespace wildcard
//     ("aether.events.<namespace>.>").
//
// Subjects are reconciled on every call, not only for the first subscriber
// of a namespace. Otherwise a later subscriber whose filter differs from the
// first one would never see remote events of the types it asked for. A
// subject that failed to subscribe earlier is retried on the next call.
//
// Once the wildcard is active for a namespace, type-specific subscriptions
// for that namespace are redundant and are removed so events are not
// received twice.
//
// NOTE(review): the subject scheme treats namespaceID as a single NATS
// subject token; this assumes namespace IDs contain no '.' — confirm.
func (neb *NATSEventBus) SubscribeWithFilter(namespaceID string, filter SubscriptionFilter) <-chan *Event {
	neb.mutex.Lock()
	defer neb.mutex.Unlock()

	// Create the local subscription; the filter travels with it.
	ch := neb.EventBus.SubscribeWithFilter(namespaceID, filter)

	prefix := fmt.Sprintf("aether.events.%s.", namespaceID)
	wildcard := fmt.Sprintf("aether.events.%s.>", namespaceID)

	// Index the subjects this node already listens on.
	active := make(map[string]bool, len(neb.subscriptions))
	for _, sub := range neb.subscriptions {
		active[sub.Subject] = true
	}

	switch {
	case active[wildcard]:
		// The wildcard already delivers every event of this namespace.
	case len(filter.EventTypes) == 0:
		// Subscribe to the wildcard first and drop the narrower subjects
		// afterwards: a short overlap is preferable to a gap in delivery.
		if neb.subscribeSubject(wildcard) {
			neb.dropSubjectsCoveredBy(prefix, wildcard)
		}
	default:
		for _, et := range filter.EventTypes {
			subject := fmt.Sprintf("aether.events.%s.%s", namespaceID, et)
			if active[subject] {
				continue
			}
			if neb.subscribeSubject(subject) {
				// Also guards against duplicate entries in filter.EventTypes.
				active[subject] = true
			}
		}
	}

	neb.namespaceSubscribers[namespaceID]++
	return ch
}

// subscribeSubject opens a NATS subscription on subject and records it in
// neb.subscriptions. It reports whether the subscription was created;
// failures are logged. The caller must hold neb.mutex.
func (neb *NATSEventBus) subscribeSubject(subject string) bool {
	sub, err := neb.nc.Subscribe(subject, neb.handleNATSEvent)
	if err != nil {
		log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err)
		return false
	}
	neb.subscriptions = append(neb.subscriptions, sub)
	log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject)
	return true
}

// dropSubjectsCoveredBy unsubscribes and forgets every recorded subscription
// whose subject starts with prefix, except the wildcard subject itself.
// The caller must hold neb.mutex.
func (neb *NATSEventBus) dropSubjectsCoveredBy(prefix, wildcard string) {
	kept := make([]*nats.Subscription, 0, len(neb.subscriptions))
	for _, sub := range neb.subscriptions {
		covered := sub.Subject != wildcard &&
			len(sub.Subject) > len(prefix) &&
			sub.Subject[:len(prefix)] == prefix
		if !covered {
			kept = append(kept, sub)
			continue
		}
		if err := sub.Unsubscribe(); err != nil {
			log.Printf("[NATSEventBus] Error unsubscribing: %v", err)
		}
	}
	neb.subscriptions = kept
}
Unsubscribe removes a local subscription and cleans up NATS subscription if no more subscribers func (neb *NATSEventBus) Unsubscribe(namespaceID string, ch <-chan *Event) { neb.mutex.Lock() defer neb.mutex.Unlock() neb.EventBus.Unsubscribe(namespaceID, ch) count := neb.namespaceSubscribers[namespaceID] if count > 0 { count-- neb.namespaceSubscribers[namespaceID] = count if count == 0 { delete(neb.namespaceSubscribers, namespaceID) log.Printf("[NATSEventBus] No more subscribers for namespace %s on node %s", namespaceID, neb.nodeID) } } } // handleNATSEvent processes events received from NATS func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) { var eventMsg eventMessage if err := json.Unmarshal(msg.Data, &eventMsg); err != nil { log.Printf("[NATSEventBus] Failed to unmarshal event: %v", err) return } // Skip events that originated from this node (already delivered locally) if eventMsg.NodeID == neb.nodeID { return } // Forward to local EventBus subscribers (filtering happens there) neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event) } // Publish publishes an event both locally and to NATS for cross-node broadcasting. // Events are published to a type-specific subject to enable server-side filtering. 
func (neb *NATSEventBus) Publish(namespaceID string, event *Event) { // First publish locally neb.EventBus.Publish(namespaceID, event) // Then publish to NATS for other nodes // Use type-specific subject for server-side filtering subject := fmt.Sprintf("aether.events.%s.%s", namespaceID, event.EventType) eventMsg := eventMessage{ NodeID: neb.nodeID, NamespaceID: namespaceID, Event: event, } data, err := json.Marshal(eventMsg) if err != nil { log.Printf("[NATSEventBus] Failed to marshal event for NATS: %v", err) return } if err := neb.nc.Publish(subject, data); err != nil { log.Printf("[NATSEventBus] Failed to publish event to NATS: %v", err) return } } // Stop closes the NATS event bus and all subscriptions func (neb *NATSEventBus) Stop() { neb.mutex.Lock() defer neb.mutex.Unlock() neb.cancel() for _, sub := range neb.subscriptions { if err := sub.Unsubscribe(); err != nil { log.Printf("[NATSEventBus] Error unsubscribing: %v", err) } } neb.subscriptions = nil neb.EventBus.Stop() log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID) }