package aether import ( "context" "encoding/json" "fmt" "log" "strings" "sync" "github.com/google/uuid" "github.com/nats-io/nats.go" ) // NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS. // Supports wildcard patterns for cross-namespace subscriptions using NATS native wildcards. // // Security Considerations: // Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces. // This bypasses namespace isolation at the NATS level. Ensure proper access controls // are in place at the application layer before granting wildcard subscription access. type NATSEventBus struct { *EventBus // Embed base EventBus for local subscriptions nc *nats.Conn // NATS connection subscriptions []*nats.Subscription patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards) nodeID string // Unique ID for this node streamPrefix string // NATS subject prefix for events eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore) mutex sync.Mutex ctx context.Context cancel context.CancelFunc } // eventMessage is the wire format for events sent over NATS type eventMessage struct { NodeID string `json:"node_id"` NamespaceID string `json:"namespace_id"` Event *Event `json:"event"` } // NewNATSEventBus creates a new NATS-backed event bus func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) { ctx, cancel := context.WithCancel(context.Background()) neb := &NATSEventBus{ EventBus: NewEventBus(), nc: nc, nodeID: uuid.New().String(), subscriptions: make([]*nats.Subscription, 0), patternSubscribers: make(map[string]int), streamPrefix: "aether", ctx: ctx, cancel: cancel, } return neb, nil } // NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration. // The event store is used to automatically update version cache when EventStored events are received // from other cluster nodes via NATS. This ensures cross-node version consistency. // // Example: // // eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc") // ch := eventBus.SubscribeToEventStored("tenant-*") // for event := range ch { // actorID := event.Data["actorId"].(string) // version := event.Data["version"].(int64) // store.UpdateVersionCache(actorID, version) // } // // The namespace parameter is used as a prefix for EventStored event filtering. // If empty, EventStored events from all namespaces will be received (requires wildcard pattern). func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus { streamPrefix := "aether" if namespace != "" { streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace)) } neb := &NATSEventBus{ EventBus: NewEventBus(), nc: nc, nodeID: uuid.New().String(), subscriptions: make([]*nats.Subscription, 0), patternSubscribers: make(map[string]int), streamPrefix: streamPrefix, eventStore: store, ctx: context.Background(), cancel: func() {}, } return neb } // Subscribe creates a local subscription and ensures NATS subscription exists for the pattern. // Supports NATS subject patterns: // - "*" matches a single token // - ">" matches one or more tokens (only at the end) // // Security Warning: Wildcard patterns receive events from all matching namespaces, // bypassing namespace isolation. Only use for trusted system components. func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event { return neb.SubscribeWithFilter(namespacePattern, nil) } // SubscribeWithFilter creates a filtered subscription channel for a namespace pattern. // Events are filtered by the provided SubscriptionFilter before delivery. // If filter is nil or empty, all events matching the namespace pattern are delivered. // // For NATSEventBus: // - Namespace pattern filtering is applied at the NATS level using native wildcards // - EventTypes and ActorPattern filters are applied client-side after receiving messages // // This allows efficient server-side filtering for namespaces while providing // flexible client-side filtering for event types and actors. func (neb *NATSEventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event { neb.mutex.Lock() defer neb.mutex.Unlock() // Create local subscription first (with filter) ch := neb.EventBus.SubscribeWithFilter(namespacePattern, filter) // Check if this is the first subscriber for this pattern count := neb.patternSubscribers[namespacePattern] if count == 0 { // First subscriber - create NATS subscription // NATS natively supports wildcards, so we can use the pattern directly subject := fmt.Sprintf("aether.events.%s", namespacePattern) sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) { neb.handleNATSEvent(msg, namespacePattern) }) if err != nil { log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err) // Record subscription error neb.metrics.RecordSubscribeError(namespacePattern) } else { neb.subscriptions = append(neb.subscriptions, sub) if IsWildcardPattern(namespacePattern) { log.Printf("[NATSEventBus] Node %s subscribed to wildcard pattern %s", neb.nodeID, subject) } else { log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject) } } } neb.patternSubscribers[namespacePattern] = count + 1 return ch } // Unsubscribe removes a local subscription and cleans up NATS subscription if no more subscribers func (neb *NATSEventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) { neb.mutex.Lock() defer neb.mutex.Unlock() neb.EventBus.Unsubscribe(namespacePattern, ch) count := neb.patternSubscribers[namespacePattern] if count > 0 { count-- neb.patternSubscribers[namespacePattern] = count if count == 0 { delete(neb.patternSubscribers, namespacePattern) log.Printf("[NATSEventBus] No more subscribers for pattern %s on node %s", namespacePattern, neb.nodeID) } } } // handleNATSEvent processes events received from NATS func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg, subscribedPattern string) { var eventMsg eventMessage if err := json.Unmarshal(msg.Data, &eventMsg); err != nil { log.Printf("[NATSEventBus] Failed to unmarshal event: %v", err) return } // Skip events that originated from this node (already delivered locally) if eventMsg.NodeID == neb.nodeID { return } // For wildcard subscriptions, we need to deliver to the EventBus using // the subscribed pattern so it reaches the correct wildcard subscriber. // For exact subscriptions, use the actual namespace. if IsWildcardPattern(subscribedPattern) { // Deliver using the pattern - the EventBus will route to wildcard subscribers neb.deliverToWildcardSubscribers(subscribedPattern, eventMsg.Event) } else { // Forward to local EventBus subscribers with actual namespace neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event) } } // deliverToWildcardSubscribers delivers an event to subscribers of a specific wildcard pattern // Applies filters before delivery. func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Event) { neb.EventBus.mutex.RLock() defer neb.EventBus.mutex.RUnlock() for _, sub := range neb.EventBus.wildcardSubscribers { if sub.pattern == pattern { // Apply filter if present if sub.filter != nil && !sub.filter.IsEmpty() { if !sub.filter.Matches(event) { // Event doesn't match filter, skip delivery continue } } select { case sub.ch <- event: // Event delivered from NATS neb.metrics.RecordReceive(pattern) default: // Channel full, skip this subscriber (non-blocking) neb.metrics.RecordDroppedEvent(pattern) } } } } // Publish publishes an event both locally and to NATS for cross-node broadcasting func (neb *NATSEventBus) Publish(namespaceID string, event *Event) { // First publish locally neb.EventBus.Publish(namespaceID, event) // Then publish to NATS for other nodes subject := fmt.Sprintf("aether.events.%s", namespaceID) eventMsg := eventMessage{ NodeID: neb.nodeID, NamespaceID: namespaceID, Event: event, } data, err := json.Marshal(eventMsg) if err != nil { log.Printf("[NATSEventBus] Failed to marshal event for NATS: %v", err) neb.metrics.RecordPublishError(namespaceID) return } if err := neb.nc.Publish(subject, data); err != nil { log.Printf("[NATSEventBus] Failed to publish event to NATS: %v", err) neb.metrics.RecordPublishError(namespaceID) return } } // Stop closes the NATS event bus and all subscriptions func (neb *NATSEventBus) Stop() { neb.mutex.Lock() defer neb.mutex.Unlock() neb.cancel() for _, sub := range neb.subscriptions { if err := sub.Unsubscribe(); err != nil { log.Printf("[NATSEventBus] Error unsubscribing: %v", err) } } neb.subscriptions = nil neb.EventBus.Stop() log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID) } // sanitizeSubject sanitizes a string for use in NATS subjects func sanitizeSubject(s string) string { s = strings.ReplaceAll(s, " ", "_") s = strings.ReplaceAll(s, ".", "_") s = strings.ReplaceAll(s, "*", "_") s = strings.ReplaceAll(s, ">", "_") return s } // extractActorType extracts the actor type from an actor ID func extractActorType(actorID string) string { for i, c := range actorID { if c == '-' && i > 0 { return actorID[:i] } } return "unknown" } // SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern. // EventStored events are published by JetStreamEventStore when events are successfully saved. // This is useful for cross-node event synchronization and version cache consistency. // // The returned channel receives EventStored events matching the pattern. // The EventStored event schema: // - EventType: "EventStored" // - ActorID: ID of the actor that the original event was about // - Version: version of the stored event // - Data: // - eventId: (string) ID of the stored event // - actorId: (string) ID of the actor // - version: (int64) version of the event // - timestamp: (int64) Unix timestamp of when the event was stored // // The namespacePattern supports NATS wildcards: // - "*" matches a single token // - ">" matches one or more tokens (only at the end) // // Example: // // ch := eventBus.SubscribeToEventStored("tenant-*") // for event := range ch { // if event.EventType != aether.EventTypeEventStored { // continue // } // actorID := event.Data["actorId"].(string) // version, _ := event.Data["version"].(int64) // store.UpdateVersionCache(actorID, version) // } // // Security Warning: Using wildcard patterns like ">" will receive EventStored events // from all namespaces. Ensure your application handles this appropriately. func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event { neb.mutex.Lock() defer neb.mutex.Unlock() subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>") ch := make(chan *Event, 100) sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) { var eventMsg eventMessage if err := json.Unmarshal(msg.Data, &eventMsg); err != nil { log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err) return } if eventMsg.NodeID == neb.nodeID { return } if eventMsg.Event.EventType == EventTypeEventStored && neb.eventStore != nil { actorID, ok := eventMsg.Event.Data["actorId"].(string) if !ok { return } version, ok := eventMsg.Event.Data["version"].(int64) if !ok { return } // Use type assertion to call UpdateVersionCache if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok { es.UpdateVersionCache(actorID, version) } } neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event) }) if err != nil { log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err) close(ch) return ch } neb.subscriptions = append(neb.subscriptions, sub) return ch }