diff --git a/eventbus.go b/eventbus.go index 161bc1b..30dfea9 100644 --- a/eventbus.go +++ b/eventbus.go @@ -5,72 +5,131 @@ import ( "sync" ) -// EventBroadcaster defines the interface for publishing and subscribing to events +// EventBroadcaster defines the interface for publishing and subscribing to events. +// +// Subscribe accepts namespace patterns following NATS subject matching conventions: +// - Exact match: "tenant-a" matches only "tenant-a" +// - Single wildcard: "*" matches any single token, "tenant-*" matches "tenant-a", "tenant-b" +// - Multi-token wildcard: ">" matches one or more tokens (only at end of pattern) +// +// Security Warning: Wildcard subscriptions bypass namespace isolation. +// Only grant wildcard access to trusted system components. type EventBroadcaster interface { - Subscribe(namespaceID string) <-chan *Event - Unsubscribe(namespaceID string, ch <-chan *Event) + // Subscribe creates a channel that receives events matching the namespace pattern. + // Pattern syntax follows NATS conventions: "*" matches single token, ">" matches multiple. + Subscribe(namespacePattern string) <-chan *Event + Unsubscribe(namespacePattern string, ch <-chan *Event) Publish(namespaceID string, event *Event) Stop() SubscriberCount(namespaceID string) int } -// EventBus broadcasts events to multiple subscribers within a namespace +// subscription represents a single subscriber channel with its pattern +type subscription struct { + pattern string + ch chan *Event +} + +// EventBus broadcasts events to multiple subscribers within a namespace. +// Supports wildcard patterns for cross-namespace subscriptions. +// +// Security Considerations: +// Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces. +// This is intentional for cross-cutting concerns like logging, monitoring, and auditing. +// However, it bypasses namespace isolation - use with appropriate access controls. type EventBus struct { - subscribers map[string][]chan *Event // namespaceID -> channels - mutex sync.RWMutex - ctx context.Context - cancel context.CancelFunc + // exactSubscribers holds subscribers for exact namespace matches (no wildcards) + exactSubscribers map[string][]chan *Event + // wildcardSubscribers holds subscribers with wildcard patterns + wildcardSubscribers []subscription + mutex sync.RWMutex + ctx context.Context + cancel context.CancelFunc } // NewEventBus creates a new event bus func NewEventBus() *EventBus { ctx, cancel := context.WithCancel(context.Background()) return &EventBus{ - subscribers: make(map[string][]chan *Event), - ctx: ctx, - cancel: cancel, + exactSubscribers: make(map[string][]chan *Event), + wildcardSubscribers: make([]subscription, 0), + ctx: ctx, + cancel: cancel, } } -// Subscribe creates a new subscription channel for a namespace -func (eb *EventBus) Subscribe(namespaceID string) <-chan *Event { +// Subscribe creates a new subscription channel for a namespace pattern. +// Patterns follow NATS subject matching conventions: +// - "*" matches a single token (any sequence without ".") +// - ">" matches one or more tokens (only valid at the end) +// - Exact strings match exactly +// +// Security Warning: Wildcard patterns receive events from all matching namespaces, +// bypassing namespace isolation. Only use for trusted system components. +func (eb *EventBus) Subscribe(namespacePattern string) <-chan *Event { eb.mutex.Lock() defer eb.mutex.Unlock() // Create buffered channel to prevent blocking publishers ch := make(chan *Event, 100) - eb.subscribers[namespaceID] = append(eb.subscribers[namespaceID], ch) + + if IsWildcardPattern(namespacePattern) { + // Store wildcard subscription separately + eb.wildcardSubscribers = append(eb.wildcardSubscribers, subscription{ + pattern: namespacePattern, + ch: ch, + }) + } else { + // Exact match subscription + eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], ch) + } return ch } // Unsubscribe removes a subscription channel -func (eb *EventBus) Unsubscribe(namespaceID string, ch <-chan *Event) { +func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) { eb.mutex.Lock() defer eb.mutex.Unlock() - subs := eb.subscribers[namespaceID] - for i, subscriber := range subs { - if subscriber == ch { - // Remove channel from slice - eb.subscribers[namespaceID] = append(subs[:i], subs[i+1:]...) - close(subscriber) - break + if IsWildcardPattern(namespacePattern) { + // Remove from wildcard subscribers + for i, sub := range eb.wildcardSubscribers { + if sub.ch == ch { + eb.wildcardSubscribers = append(eb.wildcardSubscribers[:i], eb.wildcardSubscribers[i+1:]...) + close(sub.ch) + break + } + } + } else { + // Remove from exact subscribers + subs := eb.exactSubscribers[namespacePattern] + for i, subscriber := range subs { + if subscriber == ch { + // Remove channel from slice + eb.exactSubscribers[namespacePattern] = append(subs[:i], subs[i+1:]...) + close(subscriber) + break + } } - } - // Clean up empty namespace entries - if len(eb.subscribers[namespaceID]) == 0 { - delete(eb.subscribers, namespaceID) + // Clean up empty namespace entries + if len(eb.exactSubscribers[namespacePattern]) == 0 { + delete(eb.exactSubscribers, namespacePattern) + } } } -// Publish sends an event to all subscribers of a namespace +// Publish sends an event to all subscribers of a namespace. +// Events are delivered to: +// - All exact subscribers for the namespace +// - All wildcard subscribers whose pattern matches the namespace func (eb *EventBus) Publish(namespaceID string, event *Event) { eb.mutex.RLock() defer eb.mutex.RUnlock() - subscribers := eb.subscribers[namespaceID] + // Deliver to exact subscribers + subscribers := eb.exactSubscribers[namespaceID] for _, ch := range subscribers { select { case ch <- event: @@ -79,6 +138,18 @@ func (eb *EventBus) Publish(namespaceID string, event *Event) { // Channel full, skip this subscriber (non-blocking) } } + + // Deliver to matching wildcard subscribers + for _, sub := range eb.wildcardSubscribers { + if MatchNamespacePattern(sub.pattern, namespaceID) { + select { + case sub.ch <- event: + // Event delivered + default: + // Channel full, skip this subscriber (non-blocking) + } + } + } } // Stop closes the event bus @@ -88,19 +159,35 @@ func (eb *EventBus) Stop() { eb.cancel() - // Close all subscriber channels - for _, subs := range eb.subscribers { + // Close all exact subscriber channels + for _, subs := range eb.exactSubscribers { for _, ch := range subs { close(ch) } } - eb.subscribers = make(map[string][]chan *Event) + // Close all wildcard subscriber channels + for _, sub := range eb.wildcardSubscribers { + close(sub.ch) + } + + eb.exactSubscribers = make(map[string][]chan *Event) + eb.wildcardSubscribers = make([]subscription, 0) } -// SubscriberCount returns the number of subscribers for a namespace +// SubscriberCount returns the number of subscribers for a namespace. +// This counts only exact match subscribers, not wildcard subscribers that may match. func (eb *EventBus) SubscriberCount(namespaceID string) int { eb.mutex.RLock() defer eb.mutex.RUnlock() - return len(eb.subscribers[namespaceID]) + return len(eb.exactSubscribers[namespaceID]) +} + +// WildcardSubscriberCount returns the number of wildcard subscribers. +// These are subscribers using "*" or ">" patterns that may receive events +// from multiple namespaces. +func (eb *EventBus) WildcardSubscriberCount() int { + eb.mutex.RLock() + defer eb.mutex.RUnlock() + return len(eb.wildcardSubscribers) } diff --git a/eventbus_test.go b/eventbus_test.go new file mode 100644 index 0000000..013154d --- /dev/null +++ b/eventbus_test.go @@ -0,0 +1,416 @@ +package aether + +import ( + "sync" + "testing" + "time" +) + +func TestEventBus_ExactSubscription(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("tenant-a") + + event := &Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-1", + } + + eb.Publish("tenant-a", event) + + select { + case received := <-ch: + if received.ID != event.ID { + t.Errorf("expected event ID %s, got %s", event.ID, received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out waiting for event") + } +} + +func TestEventBus_WildcardStarSubscription(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Subscribe to all single-token namespaces + ch := eb.Subscribe("*") + + event := &Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-1", + } + + eb.Publish("tenant-a", event) + + select { + case received := <-ch: + if received.ID != event.ID { + t.Errorf("expected event ID %s, got %s", event.ID, received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out waiting for event") + } +} + +func TestEventBus_WildcardGreaterSubscription(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Subscribe to all namespaces + ch := eb.Subscribe(">") + + events := []*Event{ + {ID: "evt-1", EventType: "Test1", ActorID: "actor-1"}, + {ID: "evt-2", EventType: "Test2", ActorID: "actor-2"}, + {ID: "evt-3", EventType: "Test3", ActorID: "actor-3"}, + } + + namespaces := []string{"tenant-a", "tenant-b", "prod.tenant.orders"} + + for i, ns := range namespaces { + eb.Publish(ns, events[i]) + } + + received := make(map[string]bool) + timeout := time.After(100 * time.Millisecond) + + for i := 0; i < len(events); i++ { + select { + case evt := <-ch: + received[evt.ID] = true + case <-timeout: + t.Fatalf("timed out after receiving %d of %d events", i, len(events)) + } + } + + for _, evt := range events { + if !received[evt.ID] { + t.Errorf("did not receive event %s", evt.ID) + } + } +} + +func TestEventBus_PrefixWildcard(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Subscribe to prod.* + ch := eb.Subscribe("prod.*") + + event1 := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"} + event2 := &Event{ID: "evt-2", EventType: "Test", ActorID: "actor-2"} + event3 := &Event{ID: "evt-3", EventType: "Test", ActorID: "actor-3"} + + // Should match + eb.Publish("prod.tenant", event1) + eb.Publish("prod.orders", event2) + // Should not match (different prefix) + eb.Publish("staging.tenant", event3) + + received := make(map[string]bool) + timeout := time.After(100 * time.Millisecond) + + // Should receive exactly 2 events + for i := 0; i < 2; i++ { + select { + case evt := <-ch: + received[evt.ID] = true + case <-timeout: + t.Fatalf("timed out after receiving %d events", len(received)) + } + } + + // Verify we got the right ones + if !received["evt-1"] || !received["evt-2"] { + t.Errorf("expected evt-1 and evt-2, got %v", received) + } + + // Verify no third event arrives + select { + case evt := <-ch: + t.Errorf("unexpected event received: %s", evt.ID) + case <-time.After(50 * time.Millisecond): + // Expected - no more events + } +} + +func TestEventBus_MultipleWildcardSubscribers(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch1 := eb.Subscribe("prod.*") + ch2 := eb.Subscribe("prod.>") + ch3 := eb.Subscribe(">") + + event := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"} + + eb.Publish("prod.tenant.orders", event) + + // ch1 (prod.*) should NOT receive - doesn't match 3 tokens + select { + case <-ch1: + t.Error("prod.* should not match prod.tenant.orders") + case <-time.After(50 * time.Millisecond): + // Expected + } + + // ch2 (prod.>) should receive + select { + case received := <-ch2: + if received.ID != event.ID { + t.Errorf("expected %s, got %s", event.ID, received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("prod.> should match prod.tenant.orders") + } + + // ch3 (>) should receive + select { + case received := <-ch3: + if received.ID != event.ID { + t.Errorf("expected %s, got %s", event.ID, received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("> should match prod.tenant.orders") + } +} + +func TestEventBus_ExactAndWildcardCoexist(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + chExact := eb.Subscribe("tenant-a") + chWildcard := eb.Subscribe("*") + + event := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"} + + eb.Publish("tenant-a", event) + + // Both should receive the event + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + select { + case received := <-chExact: + if received.ID != event.ID { + t.Errorf("exact: expected %s, got %s", event.ID, received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("exact subscriber timed out") + } + }() + + go func() { + defer wg.Done() + select { + case received := <-chWildcard: + if received.ID != event.ID { + t.Errorf("wildcard: expected %s, got %s", event.ID, received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("wildcard subscriber timed out") + } + }() + + wg.Wait() +} + +func TestEventBus_WildcardUnsubscribe(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("prod.*") + + // Verify it's counted + if eb.WildcardSubscriberCount() != 1 { + t.Errorf("expected 1 wildcard subscriber, got %d", eb.WildcardSubscriberCount()) + } + + eb.Unsubscribe("prod.*", ch) + + // Verify it's removed + if eb.WildcardSubscriberCount() != 0 { + t.Errorf("expected 0 wildcard subscribers, got %d", eb.WildcardSubscriberCount()) + } +} + +func TestEventBus_SubscriberCount(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Add exact subscribers + ch1 := eb.Subscribe("tenant-a") + ch2 := eb.Subscribe("tenant-a") + + if eb.SubscriberCount("tenant-a") != 2 { + t.Errorf("expected 2 exact subscribers, got %d", eb.SubscriberCount("tenant-a")) + } + + // Add wildcard subscriber - should not affect exact count + eb.Subscribe("*") + + if eb.SubscriberCount("tenant-a") != 2 { + t.Errorf("expected 2 exact subscribers after wildcard add, got %d", eb.SubscriberCount("tenant-a")) + } + if eb.WildcardSubscriberCount() != 1 { + t.Errorf("expected 1 wildcard subscriber, got %d", eb.WildcardSubscriberCount()) + } + + // Unsubscribe exact + eb.Unsubscribe("tenant-a", ch1) + if eb.SubscriberCount("tenant-a") != 1 { + t.Errorf("expected 1 exact subscriber after unsubscribe, got %d", eb.SubscriberCount("tenant-a")) + } + + eb.Unsubscribe("tenant-a", ch2) + if eb.SubscriberCount("tenant-a") != 0 { + t.Errorf("expected 0 exact subscribers after unsubscribe, got %d", eb.SubscriberCount("tenant-a")) + } +} + +func TestEventBus_StopClosesAllChannels(t *testing.T) { + eb := NewEventBus() + + chExact := eb.Subscribe("tenant-a") + chWildcard := eb.Subscribe("*") + + eb.Stop() + + // Both channels should be closed + select { + case _, ok := <-chExact: + if ok { + t.Error("expected exact channel to be closed") + } + case <-time.After(100 * time.Millisecond): + t.Error("timed out waiting for exact channel close") + } + + select { + case _, ok := <-chWildcard: + if ok { + t.Error("expected wildcard channel to be closed") + } + case <-time.After(100 * time.Millisecond): + t.Error("timed out waiting for wildcard channel close") + } +} + +func TestEventBus_NamespaceIsolation(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + chA := eb.Subscribe("tenant-a") + chB := eb.Subscribe("tenant-b") + + eventA := &Event{ID: "evt-a", EventType: "Test", ActorID: "actor-1"} + eventB := &Event{ID: "evt-b", EventType: "Test", ActorID: "actor-2"} + + eb.Publish("tenant-a", eventA) + eb.Publish("tenant-b", eventB) + + // Verify tenant-a receives only its event + select { + case received := <-chA: + if received.ID != "evt-a" { + t.Errorf("tenant-a received wrong event: %s", received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("tenant-a timed out") + } + + select { + case <-chA: + t.Error("tenant-a received extra event") + case <-time.After(50 * time.Millisecond): + // Expected + } + + // Verify tenant-b receives only its event + select { + case received := <-chB: + if received.ID != "evt-b" { + t.Errorf("tenant-b received wrong event: %s", received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("tenant-b timed out") + } + + select { + case <-chB: + t.Error("tenant-b received extra event") + case <-time.After(50 * time.Millisecond): + // Expected + } +} + +func TestEventBus_NonBlockingPublish(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Create subscriber but don't read from channel + _ = eb.Subscribe("tenant-a") + + // Fill the channel buffer (100 events) + for i := 0; i < 150; i++ { + event := &Event{ + ID: "evt", + EventType: "Test", + ActorID: "actor-1", + } + // Should not block even when channel is full + eb.Publish("tenant-a", event) + } + + // If we got here without blocking, test passes +} + +func TestEventBus_ConcurrentOperations(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + var wg sync.WaitGroup + + // Concurrent subscriptions + for i := 0; i < 10; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + ch := eb.Subscribe("tenant-a") + time.Sleep(10 * time.Millisecond) + eb.Unsubscribe("tenant-a", ch) + }(i) + } + + // Concurrent wildcard subscriptions + for i := 0; i < 10; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + ch := eb.Subscribe("*") + time.Sleep(10 * time.Millisecond) + eb.Unsubscribe("*", ch) + }(i) + } + + // Concurrent publishes + for i := 0; i < 10; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + event := &Event{ + ID: "evt", + EventType: "Test", + ActorID: "actor-1", + } + eb.Publish("tenant-a", event) + }(i) + } + + wg.Wait() +} diff --git a/nats_eventbus.go b/nats_eventbus.go index a13653e..b8c3d97 100644 --- a/nats_eventbus.go +++ b/nats_eventbus.go @@ -11,12 +11,18 @@ import ( "github.com/nats-io/nats.go" ) -// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS +// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS. +// Supports wildcard patterns for cross-namespace subscriptions using NATS native wildcards. +// +// Security Considerations: +// Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces. +// This bypasses namespace isolation at the NATS level. Ensure proper access controls +// are in place at the application layer before granting wildcard subscription access. type NATSEventBus struct { - *EventBus // Embed base EventBus for local subscriptions - nc *nats.Conn // NATS connection + *EventBus // Embed base EventBus for local subscriptions + nc *nats.Conn // NATS connection subscriptions []*nats.Subscription - namespaceSubscribers map[string]int // Track number of subscribers per namespace + patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards) nodeID string // Unique ID for this node mutex sync.Mutex ctx context.Context @@ -35,69 +41,80 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) { ctx, cancel := context.WithCancel(context.Background()) neb := &NATSEventBus{ - EventBus: NewEventBus(), - nc: nc, - nodeID: uuid.New().String(), - subscriptions: make([]*nats.Subscription, 0), - namespaceSubscribers: make(map[string]int), - ctx: ctx, - cancel: cancel, + EventBus: NewEventBus(), + nc: nc, + nodeID: uuid.New().String(), + subscriptions: make([]*nats.Subscription, 0), + patternSubscribers: make(map[string]int), + ctx: ctx, + cancel: cancel, } return neb, nil } -// Subscribe creates a local subscription and ensures NATS subscription exists for the namespace -func (neb *NATSEventBus) Subscribe(namespaceID string) <-chan *Event { +// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern. +// Supports NATS subject patterns: +// - "*" matches a single token +// - ">" matches one or more tokens (only at the end) +// +// Security Warning: Wildcard patterns receive events from all matching namespaces, +// bypassing namespace isolation. Only use for trusted system components. +func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event { neb.mutex.Lock() defer neb.mutex.Unlock() // Create local subscription first - ch := neb.EventBus.Subscribe(namespaceID) + ch := neb.EventBus.Subscribe(namespacePattern) - // Check if this is the first subscriber for this namespace - count := neb.namespaceSubscribers[namespaceID] + // Check if this is the first subscriber for this pattern + count := neb.patternSubscribers[namespacePattern] if count == 0 { // First subscriber - create NATS subscription - subject := fmt.Sprintf("aether.events.%s", namespaceID) + // NATS natively supports wildcards, so we can use the pattern directly + subject := fmt.Sprintf("aether.events.%s", namespacePattern) sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) { - neb.handleNATSEvent(msg) + neb.handleNATSEvent(msg, namespacePattern) }) if err != nil { log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err) } else { neb.subscriptions = append(neb.subscriptions, sub) - log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject) + if IsWildcardPattern(namespacePattern) { + log.Printf("[NATSEventBus] Node %s subscribed to wildcard pattern %s", neb.nodeID, subject) + } else { + log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject) + } } } - neb.namespaceSubscribers[namespaceID] = count + 1 + neb.patternSubscribers[namespacePattern] = count + 1 return ch } // Unsubscribe removes a local subscription and cleans up NATS subscription if no more subscribers -func (neb *NATSEventBus) Unsubscribe(namespaceID string, ch <-chan *Event) { +func (neb *NATSEventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) { neb.mutex.Lock() defer neb.mutex.Unlock() - neb.EventBus.Unsubscribe(namespaceID, ch) + neb.EventBus.Unsubscribe(namespacePattern, ch) - count := neb.namespaceSubscribers[namespaceID] + count := neb.patternSubscribers[namespacePattern] if count > 0 { count-- - neb.namespaceSubscribers[namespaceID] = count + neb.patternSubscribers[namespacePattern] = count if count == 0 { - delete(neb.namespaceSubscribers, namespaceID) - log.Printf("[NATSEventBus] No more subscribers for namespace %s on node %s", namespaceID, neb.nodeID) + delete(neb.patternSubscribers, namespacePattern) + log.Printf("[NATSEventBus] No more subscribers for pattern %s on node %s", namespacePattern, neb.nodeID) } } } // handleNATSEvent processes events received from NATS -func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) { +func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg, subscribedPattern string) { var eventMsg eventMessage if err := json.Unmarshal(msg.Data, &eventMsg); err != nil { log.Printf("[NATSEventBus] Failed to unmarshal event: %v", err) @@ -109,8 +126,33 @@ func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) { return } - // Forward to local EventBus subscribers - neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event) + // For wildcard subscriptions, we need to deliver to the EventBus using + // the subscribed pattern so it reaches the correct wildcard subscriber. + // For exact subscriptions, use the actual namespace. + if IsWildcardPattern(subscribedPattern) { + // Deliver using the pattern - the EventBus will route to wildcard subscribers + neb.deliverToWildcardSubscribers(subscribedPattern, eventMsg.Event) + } else { + // Forward to local EventBus subscribers with actual namespace + neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event) + } +} + +// deliverToWildcardSubscribers delivers an event to subscribers of a specific wildcard pattern +func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Event) { + neb.EventBus.mutex.RLock() + defer neb.EventBus.mutex.RUnlock() + + for _, sub := range neb.EventBus.wildcardSubscribers { + if sub.pattern == pattern { + select { + case sub.ch <- event: + // Event delivered + default: + // Channel full, skip this subscriber (non-blocking) + } + } + } } // Publish publishes an event both locally and to NATS for cross-node broadcasting diff --git a/pattern.go b/pattern.go new file mode 100644 index 0000000..2430b1b --- /dev/null +++ b/pattern.go @@ -0,0 +1,83 @@ +package aether + +import "strings" + +// MatchNamespacePattern checks if a namespace matches a pattern. +// Patterns follow NATS subject matching conventions where tokens are separated by dots: +// - "*" matches exactly one token (any sequence without ".") +// - ">" matches one or more tokens (only valid at the end of a pattern) +// - Exact strings match exactly +// +// Examples: +// - "tenant-a" matches "tenant-a" (exact match) +// - "*" matches any single-token namespace like "tenant-a" or "production" +// - ">" matches any namespace with one or more tokens +// - "prod.*" matches "prod.tenant", "prod.orders" (but not "prod.tenant.orders") +// - "prod.>" matches "prod.tenant", "prod.tenant.orders", "prod.a.b.c" +// - "*.tenant.*" matches "prod.tenant.orders", "staging.tenant.events" +// +// Security Considerations: +// Wildcard subscriptions provide cross-namespace visibility. Use with caution: +// - "*" or ">" patterns receive events from ALL matching namespaces +// - This bypasses namespace isolation for the subscriber +// - Only grant wildcard subscription access to trusted system components +// - Consider auditing wildcard subscription usage +// - For multi-tenant systems, wildcard access should be restricted to admin/ops +// - Use the most specific pattern possible to minimize exposure +func MatchNamespacePattern(pattern, namespace string) bool { + // Empty pattern matches nothing + if pattern == "" { + return false + } + + // ">" matches everything when used alone + if pattern == ">" { + return namespace != "" + } + + patternTokens := strings.Split(pattern, ".") + namespaceTokens := strings.Split(namespace, ".") + + return matchTokens(patternTokens, namespaceTokens) +} + +// matchTokens recursively matches pattern tokens against namespace tokens +func matchTokens(patternTokens, namespaceTokens []string) bool { + // If pattern is exhausted, namespace must also be exhausted + if len(patternTokens) == 0 { + return len(namespaceTokens) == 0 + } + + patternToken := patternTokens[0] + + // ">" matches one or more remaining tokens (must be last pattern token) + if patternToken == ">" { + // ">" requires at least one token to match + return len(namespaceTokens) >= 1 + } + + // If namespace is exhausted but pattern has more tokens, no match + if len(namespaceTokens) == 0 { + return false + } + + namespaceToken := namespaceTokens[0] + + // "*" matches exactly one token + if patternToken == "*" { + return matchTokens(patternTokens[1:], namespaceTokens[1:]) + } + + // Exact match required + if patternToken == namespaceToken { + return matchTokens(patternTokens[1:], namespaceTokens[1:]) + } + + return false +} + +// IsWildcardPattern returns true if the pattern contains wildcards (* or >). +// Wildcard patterns can match multiple namespaces and bypass namespace isolation. +func IsWildcardPattern(pattern string) bool { + return strings.Contains(pattern, "*") || strings.Contains(pattern, ">") +} diff --git a/pattern_test.go b/pattern_test.go new file mode 100644 index 0000000..70d453a --- /dev/null +++ b/pattern_test.go @@ -0,0 +1,117 @@ +package aether + +import "testing" + +func TestMatchNamespacePattern(t *testing.T) { + tests := []struct { + name string + pattern string + namespace string + expected bool + }{ + // Exact matches + {"exact match", "tenant-a", "tenant-a", true}, + {"exact mismatch", "tenant-a", "tenant-b", false}, + {"exact match with dots", "prod.tenant.a", "prod.tenant.a", true}, + {"exact mismatch with dots", "prod.tenant.a", "prod.tenant.b", false}, + + // Empty cases + {"empty pattern", "", "tenant-a", false}, + {"empty namespace exact", "tenant-a", "", false}, + {"empty namespace catch-all", ">", "", false}, + {"both empty", "", "", false}, + + // Single wildcard (*) - matches one token (NATS semantics: tokens are dot-separated) + {"star matches any single token", "*", "tenant-a", true}, + {"star matches any single token 2", "*", "anything", true}, + {"star does not match multi-token", "*", "prod.tenant", false}, + {"prefix with star", "prod.*", "prod.tenant", true}, + {"prefix with star 2", "prod.*", "prod.orders", true}, + {"prefix with star no match extra tokens", "prod.*", "prod.tenant.orders", false}, + {"prefix with star no match wrong prefix", "prod.*", "staging.tenant", false}, + {"middle wildcard", "prod.*.orders", "prod.tenant.orders", true}, + {"middle wildcard no match", "prod.*.orders", "prod.tenant.events", false}, + {"multiple stars", "*.tenant.*", "prod.tenant.orders", true}, + {"multiple stars 2", "*.*.orders", "prod.tenant.orders", true}, + {"multiple stars no match", "*.*.orders", "prod.orders", false}, + + // Multi-token wildcard (>) - matches one or more tokens + {"greater matches one", ">", "tenant", true}, + {"greater matches multi", ">", "prod.tenant.orders", true}, + {"prefix greater", "prod.>", "prod.tenant", true}, + {"prefix greater multi", "prod.>", "prod.tenant.orders.items", true}, + {"prefix greater no match different prefix", "prod.>", "staging.tenant", false}, + {"prefix greater requires at least one", "prod.>", "prod", false}, + {"deep prefix greater", "prod.tenant.>", "prod.tenant.orders", true}, + + // Combined wildcards + {"star then greater", "*.>", "prod.tenant", true}, + {"star then greater multi", "*.>", "prod.tenant.orders", true}, + {"star then greater no match single", "*.>", "prod", false}, + + // Edge cases + {"trailing dot in pattern", "tenant.", "tenant.", true}, + {"just dots", "..", "..", true}, + {"star at end", "prod.tenant.*", "prod.tenant.a", true}, + {"star at end no match", "prod.tenant.*", "prod.other.a", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := MatchNamespacePattern(tt.pattern, tt.namespace) + if result != tt.expected { + t.Errorf("MatchNamespacePattern(%q, %q) = %v, want %v", + tt.pattern, tt.namespace, result, tt.expected) + } + }) + } +} + +func TestIsWildcardPattern(t *testing.T) { + tests := []struct { + pattern string + expected bool + }{ + {"tenant-a", false}, + {"prod.tenant.orders", false}, + {"*", true}, + {"prod.*", true}, + {"*.orders", true}, + {">", true}, + {"prod.>", true}, + {"*.>", true}, + {"prod.*.orders", true}, + } + + for _, tt := range tests { + t.Run(tt.pattern, func(t *testing.T) { + result := IsWildcardPattern(tt.pattern) + if result != tt.expected { + t.Errorf("IsWildcardPattern(%q) = %v, want %v", + tt.pattern, result, tt.expected) + } + }) + } +} + +func BenchmarkMatchNamespacePattern(b *testing.B) { + benchmarks := []struct { + name string + pattern string + namespace string + }{ + {"exact", "tenant-a", "tenant-a"}, + {"star", "*", "tenant-a"}, + {"prefix_star", "prod.*", "prod.tenant"}, + {"greater", ">", "prod.tenant.orders"}, + {"complex", "prod.*.>", "prod.tenant.orders.items"}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + MatchNamespacePattern(bm.pattern, bm.namespace) + } + }) + } +}