From ef73fb6bfd9619151c977d0dfbfdfc550d4456a7 Mon Sep 17 00:00:00 2001 From: Hugo Nijhuis Date: Sat, 10 Jan 2026 23:45:57 +0100 Subject: [PATCH] Add namespace event filtering (SubscribeWithFilter) Adds support for filtering events by type or actor pattern within namespace subscriptions. Key changes: - Add SubscriptionFilter type with EventTypes and ActorPattern fields - Add SubscribeWithFilter to EventBroadcaster interface - Implement filtering in EventBus with full wildcard pattern support preserved - Implement filtering in NATSEventBus (server-side namespace, client-side filters) - Add MatchActorPattern function for actor ID pattern matching - Add comprehensive unit tests for all filtering scenarios Filter Processing: - EventTypes: Event must match at least one type in the list (OR within types) - ActorPattern: Event's ActorID must match the pattern (supports * and > wildcards) - Multiple filters are combined with AND logic This implementation works alongside the existing wildcard subscription support: - Namespace wildcards (* and >) work with event filters - Filters are applied after namespace pattern matching - Metrics are properly recorded for filtered subscriptions Closes #21 Co-Authored-By: Claude Opus 4.5 --- eventbus.go | 116 +++++++++----- eventbus_test.go | 406 +++++++++++++++++++++++++++++++++++++++++++++++ nats_eventbus.go | 27 +++- pattern.go | 114 +++++++++++++ pattern_test.go | 125 +++++++++++++++ 5 files changed, 750 insertions(+), 38 deletions(-) diff --git a/eventbus.go b/eventbus.go index 8c43834..6d1cd1c 100644 --- a/eventbus.go +++ b/eventbus.go @@ -18,6 +18,19 @@ type EventBroadcaster interface { // Subscribe creates a channel that receives events matching the namespace pattern. // Pattern syntax follows NATS conventions: "*" matches single token, ">" matches multiple. Subscribe(namespacePattern string) <-chan *Event + + // SubscribeWithFilter creates a filtered subscription channel for a namespace pattern. + // Events are filtered by the provided SubscriptionFilter before delivery. + // Filters are applied with AND logic - events must match all specified criteria. + // + // Example: Subscribe to "orders" namespace, only receiving "OrderPlaced" events for "order-*" actors: + // filter := &SubscriptionFilter{ + // EventTypes: []string{"OrderPlaced"}, + // ActorPattern: "order-*", + // } + // ch := bus.SubscribeWithFilter("orders", filter) + SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event + Unsubscribe(namespacePattern string, ch <-chan *Event) Publish(namespaceID string, event *Event) Stop() @@ -37,6 +50,13 @@ type subscription struct { ch chan *Event } +// filteredSubscription represents a subscriber with an optional filter +type filteredSubscription struct { + pattern string + ch chan *Event + filter *SubscriptionFilter +} + // EventBus broadcasts events to multiple subscribers within a namespace. // Supports wildcard patterns for cross-namespace subscriptions. // @@ -46,9 +66,9 @@ type subscription struct { // However, it bypasses namespace isolation - use with appropriate access controls. type EventBus struct { // exactSubscribers holds subscribers for exact namespace matches (no wildcards) - exactSubscribers map[string][]chan *Event + exactSubscribers map[string][]*filteredSubscription // wildcardSubscribers holds subscribers with wildcard patterns - wildcardSubscribers []subscription + wildcardSubscribers []*filteredSubscription mutex sync.RWMutex ctx context.Context cancel context.CancelFunc @@ -59,8 +79,8 @@ type EventBus struct { func NewEventBus() *EventBus { ctx, cancel := context.WithCancel(context.Background()) return &EventBus{ - exactSubscribers: make(map[string][]chan *Event), - wildcardSubscribers: make([]subscription, 0), + exactSubscribers: make(map[string][]*filteredSubscription), + wildcardSubscribers: make([]*filteredSubscription, 0), ctx: ctx, cancel: cancel, metrics: NewMetricsCollector(), @@ -81,21 +101,39 @@ func (eb *EventBus) Metrics() BroadcasterMetrics { // Security Warning: Wildcard patterns receive events from all matching namespaces, // bypassing namespace isolation. Only use for trusted system components. func (eb *EventBus) Subscribe(namespacePattern string) <-chan *Event { + return eb.SubscribeWithFilter(namespacePattern, nil) +} + +// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern. +// Events are filtered by the provided SubscriptionFilter before delivery. +// If filter is nil or empty, all events matching the namespace pattern are delivered. +// +// Filtering is applied client-side for efficient processing: +// - EventTypes: Only events with matching event types are delivered +// - ActorPattern: Only events from matching actors are delivered +// +// Both namespace pattern wildcards and event filters work together: +// - Namespace pattern determines which namespaces to subscribe to +// - Filter determines which events within those namespaces to receive +func (eb *EventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event { eb.mutex.Lock() defer eb.mutex.Unlock() // Create buffered channel to prevent blocking publishers ch := make(chan *Event, 100) + sub := &filteredSubscription{ + pattern: namespacePattern, + ch: ch, + filter: filter, + } + if IsWildcardPattern(namespacePattern) { // Store wildcard subscription separately - eb.wildcardSubscribers = append(eb.wildcardSubscribers, subscription{ - pattern: namespacePattern, - ch: ch, - }) + eb.wildcardSubscribers = append(eb.wildcardSubscribers, sub) } else { // Exact match subscription - eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], ch) + eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], sub) } // Record subscription metric @@ -123,11 +161,11 @@ func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) { } else { // Remove from exact subscribers subs := eb.exactSubscribers[namespacePattern] - for i, subscriber := range subs { - if subscriber == ch { - // Remove channel from slice + for i, sub := range subs { + if sub.ch == ch { + // Remove subscription from slice eb.exactSubscribers[namespacePattern] = append(subs[:i], subs[i+1:]...) - close(subscriber) + close(sub.ch) // Record unsubscription metric eb.metrics.RecordUnsubscribe(namespacePattern) break @@ -143,8 +181,8 @@ func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) { // Publish sends an event to all subscribers of a namespace. // Events are delivered to: -// - All exact subscribers for the namespace -// - All wildcard subscribers whose pattern matches the namespace +// - All exact subscribers for the namespace (after filter matching) +// - All wildcard subscribers whose pattern matches the namespace (after filter matching) func (eb *EventBus) Publish(namespaceID string, event *Event) { eb.mutex.RLock() defer eb.mutex.RUnlock() @@ -154,32 +192,38 @@ func (eb *EventBus) Publish(namespaceID string, event *Event) { // Deliver to exact subscribers subscribers := eb.exactSubscribers[namespaceID] - for _, ch := range subscribers { - select { - case ch <- event: - // Event delivered - eb.metrics.RecordReceive(namespaceID) - default: - // Channel full, skip this subscriber (non-blocking) - eb.metrics.RecordDroppedEvent(namespaceID) - } + for _, sub := range subscribers { + eb.deliverToSubscriber(sub, event, namespaceID) } // Deliver to matching wildcard subscribers for _, sub := range eb.wildcardSubscribers { if MatchNamespacePattern(sub.pattern, namespaceID) { - select { - case sub.ch <- event: - // Event delivered - eb.metrics.RecordReceive(namespaceID) - default: - // Channel full, skip this subscriber (non-blocking) - eb.metrics.RecordDroppedEvent(namespaceID) - } + eb.deliverToSubscriber(sub, event, namespaceID) } } } +// deliverToSubscriber delivers an event to a subscriber if it matches the filter +func (eb *EventBus) deliverToSubscriber(sub *filteredSubscription, event *Event, namespaceID string) { + // Apply filter if present + if sub.filter != nil && !sub.filter.IsEmpty() { + if !sub.filter.Matches(event) { + // Event doesn't match filter, skip delivery + return + } + } + + select { + case sub.ch <- event: + // Event delivered + eb.metrics.RecordReceive(namespaceID) + default: + // Channel full, skip this subscriber (non-blocking) + eb.metrics.RecordDroppedEvent(namespaceID) + } +} + // Stop closes the event bus func (eb *EventBus) Stop() { eb.mutex.Lock() @@ -189,8 +233,8 @@ func (eb *EventBus) Stop() { // Close all exact subscriber channels and update metrics for namespaceID, subs := range eb.exactSubscribers { - for _, ch := range subs { - close(ch) + for _, sub := range subs { + close(sub.ch) eb.metrics.RecordUnsubscribe(namespaceID) } } @@ -201,8 +245,8 @@ func (eb *EventBus) Stop() { eb.metrics.RecordUnsubscribe(sub.pattern) } - eb.exactSubscribers = make(map[string][]chan *Event) - eb.wildcardSubscribers = make([]subscription, 0) + eb.exactSubscribers = make(map[string][]*filteredSubscription) + eb.wildcardSubscribers = make([]*filteredSubscription, 0) } // SubscriberCount returns the number of subscribers for a namespace. diff --git a/eventbus_test.go b/eventbus_test.go index 013154d..80af900 100644 --- a/eventbus_test.go +++ b/eventbus_test.go @@ -414,3 +414,409 @@ func TestEventBus_ConcurrentOperations(t *testing.T) { wg.Wait() } + +// Tests for SubscribeWithFilter functionality + +func TestEventBus_SubscribeWithFilter_EventTypes(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Subscribe with filter for specific event types + filter := &SubscriptionFilter{ + EventTypes: []string{"OrderPlaced", "OrderShipped"}, + } + ch := eb.SubscribeWithFilter("orders", filter) + + // Publish events of different types + events := []*Event{ + {ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"}, + {ID: "evt-2", EventType: "OrderCancelled", ActorID: "order-2"}, // Should not be received + {ID: "evt-3", EventType: "OrderShipped", ActorID: "order-3"}, + } + + for _, e := range events { + eb.Publish("orders", e) + } + + // Should receive evt-1 and evt-3, but not evt-2 + received := make(map[string]bool) + timeout := time.After(100 * time.Millisecond) + + for i := 0; i < 2; i++ { + select { + case evt := <-ch: + received[evt.ID] = true + case <-timeout: + t.Fatalf("timed out after receiving %d events", len(received)) + } + } + + if !received["evt-1"] || !received["evt-3"] { + t.Errorf("expected to receive evt-1 and evt-3, got %v", received) + } + + // Verify evt-2 was not received + select { + case evt := <-ch: + t.Errorf("unexpected event received: %s", evt.ID) + case <-time.After(50 * time.Millisecond): + // Expected + } +} + +func TestEventBus_SubscribeWithFilter_ActorPattern(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Subscribe with filter for specific actor pattern + filter := &SubscriptionFilter{ + ActorPattern: "order-*", + } + ch := eb.SubscribeWithFilter("events", filter) + + // Publish events from different actors + events := []*Event{ + {ID: "evt-1", EventType: "Test", ActorID: "order-123"}, + {ID: "evt-2", EventType: "Test", ActorID: "user-456"}, // Should not be received + {ID: "evt-3", EventType: "Test", ActorID: "order-789"}, + } + + for _, e := range events { + eb.Publish("events", e) + } + + // Should receive evt-1 and evt-3, but not evt-2 + received := make(map[string]bool) + timeout := time.After(100 * time.Millisecond) + + for i := 0; i < 2; i++ { + select { + case evt := <-ch: + received[evt.ID] = true + case <-timeout: + t.Fatalf("timed out after receiving %d events", len(received)) + } + } + + if !received["evt-1"] || !received["evt-3"] { + t.Errorf("expected to receive evt-1 and evt-3, got %v", received) + } + + // Verify evt-2 was not received + select { + case evt := <-ch: + t.Errorf("unexpected event received: %s", evt.ID) + case <-time.After(50 * time.Millisecond): + // Expected + } +} + +func TestEventBus_SubscribeWithFilter_Combined(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Subscribe with filter for both event type AND actor pattern + filter := &SubscriptionFilter{ + EventTypes: []string{"OrderPlaced"}, + ActorPattern: "order-*", + } + ch := eb.SubscribeWithFilter("orders", filter) + + // Publish events with various combinations + events := []*Event{ + {ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-123"}, // Should be received + {ID: "evt-2", EventType: "OrderPlaced", ActorID: "user-456"}, // Wrong actor + {ID: "evt-3", EventType: "OrderCancelled", ActorID: "order-789"}, // Wrong type + {ID: "evt-4", EventType: "OrderCancelled", ActorID: "user-000"}, // Wrong both + } + + for _, e := range events { + eb.Publish("orders", e) + } + + // Should only receive evt-1 + select { + case evt := <-ch: + if evt.ID != "evt-1" { + t.Errorf("expected evt-1, got %s", evt.ID) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out waiting for event") + } + + // Verify no more events arrive + select { + case evt := <-ch: + t.Errorf("unexpected event received: %s", evt.ID) + case <-time.After(50 * time.Millisecond): + // Expected + } +} + +func TestEventBus_SubscribeWithFilter_NilFilter(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Subscribe with nil filter - should receive all events + ch := eb.SubscribeWithFilter("events", nil) + + events := []*Event{ + {ID: "evt-1", EventType: "TypeA", ActorID: "actor-1"}, + {ID: "evt-2", EventType: "TypeB", ActorID: "actor-2"}, + } + + for _, e := range events { + eb.Publish("events", e) + } + + received := make(map[string]bool) + timeout := time.After(100 * time.Millisecond) + + for i := 0; i < 2; i++ { + select { + case evt := <-ch: + received[evt.ID] = true + case <-timeout: + t.Fatalf("timed out after receiving %d events", len(received)) + } + } + + if !received["evt-1"] || !received["evt-2"] { + t.Errorf("expected all events, got %v", received) + } +} + +func TestEventBus_SubscribeWithFilter_EmptyFilter(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Subscribe with empty filter - should receive all events + ch := eb.SubscribeWithFilter("events", &SubscriptionFilter{}) + + events := []*Event{ + {ID: "evt-1", EventType: "TypeA", ActorID: "actor-1"}, + {ID: "evt-2", EventType: "TypeB", ActorID: "actor-2"}, + } + + for _, e := range events { + eb.Publish("events", e) + } + + received := make(map[string]bool) + timeout := time.After(100 * time.Millisecond) + + for i := 0; i < 2; i++ { + select { + case evt := <-ch: + received[evt.ID] = true + case <-timeout: + t.Fatalf("timed out after receiving %d events", len(received)) + } + } + + if !received["evt-1"] || !received["evt-2"] { + t.Errorf("expected all events, got %v", received) + } +} + +func TestEventBus_SubscribeWithFilter_WildcardNamespaceAndFilter(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Subscribe to wildcard namespace pattern with event type filter + filter := &SubscriptionFilter{ + EventTypes: []string{"OrderPlaced"}, + } + ch := eb.SubscribeWithFilter("prod.*", filter) + + // Publish events to different namespaces + events := []*Event{ + {ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"}, // prod.orders - should match + {ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"}, // prod.orders - wrong type + {ID: "evt-3", EventType: "OrderPlaced", ActorID: "order-3"}, // staging.orders - wrong namespace + } + + eb.Publish("prod.orders", events[0]) + eb.Publish("prod.orders", events[1]) + eb.Publish("staging.orders", events[2]) + + // Should only receive evt-1 + select { + case evt := <-ch: + if evt.ID != "evt-1" { + t.Errorf("expected evt-1, got %s", evt.ID) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("timed out waiting for event") + } + + // Verify no more events arrive + select { + case evt := <-ch: + t.Errorf("unexpected event received: %s", evt.ID) + case <-time.After(50 * time.Millisecond): + // Expected + } +} + +func TestEventBus_SubscribeWithFilter_MultipleSubscribersWithDifferentFilters(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Two subscribers with different filters on same namespace + filter1 := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}} + filter2 := &SubscriptionFilter{EventTypes: []string{"OrderShipped"}} + + ch1 := eb.SubscribeWithFilter("orders", filter1) + ch2 := eb.SubscribeWithFilter("orders", filter2) + + events := []*Event{ + {ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"}, + {ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"}, + } + + for _, e := range events { + eb.Publish("orders", e) + } + + // ch1 should only receive evt-1 + select { + case evt := <-ch1: + if evt.ID != "evt-1" { + t.Errorf("ch1: expected evt-1, got %s", evt.ID) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("ch1 timed out") + } + + // ch2 should only receive evt-2 + select { + case evt := <-ch2: + if evt.ID != "evt-2" { + t.Errorf("ch2: expected evt-2, got %s", evt.ID) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("ch2 timed out") + } + + // Verify no extra events + select { + case evt := <-ch1: + t.Errorf("ch1: unexpected event %s", evt.ID) + case evt := <-ch2: + t.Errorf("ch2: unexpected event %s", evt.ID) + case <-time.After(50 * time.Millisecond): + // Expected + } +} + +func TestEventBus_SubscribeWithFilter_UnsubscribeFiltered(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + filter := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}} + ch := eb.SubscribeWithFilter("orders", filter) + + // Verify subscription count + if eb.SubscriberCount("orders") != 1 { + t.Errorf("expected 1 subscriber, got %d", eb.SubscriberCount("orders")) + } + + eb.Unsubscribe("orders", ch) + + // Verify unsubscribed + if eb.SubscriberCount("orders") != 0 { + t.Errorf("expected 0 subscribers, got %d", eb.SubscriberCount("orders")) + } +} + +func TestEventBus_SubscribeWithFilter_FilteredAndUnfilteredCoexist(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // One subscriber with filter, one without + filter := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}} + chFiltered := eb.SubscribeWithFilter("orders", filter) + chUnfiltered := eb.Subscribe("orders") + + events := []*Event{ + {ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"}, + {ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"}, + } + + for _, e := range events { + eb.Publish("orders", e) + } + + // Filtered subscriber should only receive evt-1 + select { + case evt := <-chFiltered: + if evt.ID != "evt-1" { + t.Errorf("filtered: expected evt-1, got %s", evt.ID) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("filtered subscriber timed out") + } + + // Unfiltered subscriber should receive both + received := make(map[string]bool) + timeout := time.After(100 * time.Millisecond) + for i := 0; i < 2; i++ { + select { + case evt := <-chUnfiltered: + received[evt.ID] = true + case <-timeout: + t.Fatalf("unfiltered timed out after %d events", len(received)) + } + } + + if !received["evt-1"] || !received["evt-2"] { + t.Errorf("unfiltered expected both events, got %v", received) + } +} + +func TestEventBus_SubscribeWithFilter_WildcardGreaterWithFilter(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Use > wildcard (matches one or more tokens) with filter + filter := &SubscriptionFilter{ + ActorPattern: "order-*", + } + ch := eb.SubscribeWithFilter(">", filter) + + events := []*Event{ + {ID: "evt-1", EventType: "Test", ActorID: "order-123"}, + {ID: "evt-2", EventType: "Test", ActorID: "user-456"}, + {ID: "evt-3", EventType: "Test", ActorID: "order-789"}, + } + + // Publish to different namespaces + eb.Publish("tenant-a", events[0]) + eb.Publish("tenant-b", events[1]) + eb.Publish("prod.orders", events[2]) + + // Should receive evt-1 and evt-3, but not evt-2 + received := make(map[string]bool) + timeout := time.After(100 * time.Millisecond) + for i := 0; i < 2; i++ { + select { + case evt := <-ch: + received[evt.ID] = true + case <-timeout: + t.Fatalf("timed out after %d events", len(received)) + } + } + + if !received["evt-1"] || !received["evt-3"] { + t.Errorf("expected evt-1 and evt-3, got %v", received) + } + + // Verify no evt-2 + select { + case evt := <-ch: + t.Errorf("unexpected event: %s", evt.ID) + case <-time.After(50 * time.Millisecond): + // Expected + } +} diff --git a/nats_eventbus.go b/nats_eventbus.go index 2e65359..f0207ab 100644 --- a/nats_eventbus.go +++ b/nats_eventbus.go @@ -61,11 +61,25 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) { // Security Warning: Wildcard patterns receive events from all matching namespaces, // bypassing namespace isolation. Only use for trusted system components. func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event { + return neb.SubscribeWithFilter(namespacePattern, nil) +} + +// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern. +// Events are filtered by the provided SubscriptionFilter before delivery. +// If filter is nil or empty, all events matching the namespace pattern are delivered. +// +// For NATSEventBus: +// - Namespace pattern filtering is applied at the NATS level using native wildcards +// - EventTypes and ActorPattern filters are applied client-side after receiving messages +// +// This allows efficient server-side filtering for namespaces while providing +// flexible client-side filtering for event types and actors. +func (neb *NATSEventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event { neb.mutex.Lock() defer neb.mutex.Unlock() - // Create local subscription first - ch := neb.EventBus.Subscribe(namespacePattern) + // Create local subscription first (with filter) + ch := neb.EventBus.SubscribeWithFilter(namespacePattern, filter) // Check if this is the first subscriber for this pattern count := neb.patternSubscribers[namespacePattern] @@ -141,12 +155,21 @@ func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg, subscribedPattern string } // deliverToWildcardSubscribers delivers an event to subscribers of a specific wildcard pattern +// Applies filters before delivery. func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Event) { neb.EventBus.mutex.RLock() defer neb.EventBus.mutex.RUnlock() for _, sub := range neb.EventBus.wildcardSubscribers { if sub.pattern == pattern { + // Apply filter if present + if sub.filter != nil && !sub.filter.IsEmpty() { + if !sub.filter.Matches(event) { + // Event doesn't match filter, skip delivery + continue + } + } + select { case sub.ch <- event: // Event delivered from NATS diff --git a/pattern.go b/pattern.go index 2430b1b..76b69a9 100644 --- a/pattern.go +++ b/pattern.go @@ -81,3 +81,117 @@ func matchTokens(patternTokens, namespaceTokens []string) bool { func IsWildcardPattern(pattern string) bool { return strings.Contains(pattern, "*") || strings.Contains(pattern, ">") } + +// SubscriptionFilter defines optional filters for event subscriptions. +// All configured filters are combined with AND logic - an event must match +// all specified criteria to be delivered to the subscriber. +// +// Filter Processing: +// - EventTypes: Event must have an EventType matching at least one in the list (OR within types) +// - ActorPattern: Event's ActorID must match the pattern (supports * and > wildcards) +// +// Filtering is applied client-side in the EventBus. For NATSEventBus, namespace-level +// filtering uses NATS subject patterns, while EventTypes and ActorPattern filtering +// happens after message receipt. +type SubscriptionFilter struct { + // EventTypes filters events by type. Empty slice means all event types. + // If specified, only events with an EventType in this list are delivered. + // Example: []string{"OrderPlaced", "OrderShipped"} receives only those event types. + EventTypes []string + + // ActorPattern filters events by actor ID pattern. Empty string means all actors. + // Supports NATS-style wildcards: + // - "*" matches a single token (e.g., "order-*" matches "order-123", "order-456") + // - ">" matches one or more tokens (e.g., "order.>" matches "order.us.123", "order.eu.456") + // Example: "order-*" receives events only for actors starting with "order-" + ActorPattern string +} + +// IsEmpty returns true if no filters are configured. +func (f *SubscriptionFilter) IsEmpty() bool { + return len(f.EventTypes) == 0 && f.ActorPattern == "" +} + +// Matches returns true if the event matches all configured filters. +// An empty filter matches all events. +func (f *SubscriptionFilter) Matches(event *Event) bool { + if event == nil { + return false + } + + // Check event type filter + if len(f.EventTypes) > 0 { + typeMatch := false + for _, et := range f.EventTypes { + if event.EventType == et { + typeMatch = true + break + } + } + if !typeMatch { + return false + } + } + + // Check actor pattern filter + if f.ActorPattern != "" { + if !MatchActorPattern(f.ActorPattern, event.ActorID) { + return false + } + } + + return true +} + +// MatchActorPattern checks if an actor ID matches a pattern. +// Uses the same matching logic as MatchNamespacePattern for consistency. +// +// Patterns: +// - "*" matches a single token (e.g., "order-*" matches "order-123") +// - ">" matches one or more tokens (e.g., "order.>" matches "order.us.east") +// - Exact strings match exactly (e.g., "order-123" matches only "order-123") +// +// Note: For simple prefix matching without dots (e.g., "order-*" matching "order-123"), +// this uses simplified matching where "*" matches any remaining characters in a token. +func MatchActorPattern(pattern, actorID string) bool { + // Empty pattern matches nothing + if pattern == "" { + return false + } + + // Empty actor ID matches nothing except ">" + if actorID == "" { + return false + } + + // If pattern contains dots, use token-based matching (same as namespace) + if strings.Contains(pattern, ".") || strings.Contains(actorID, ".") { + return MatchNamespacePattern(pattern, actorID) + } + + // Simple matching for non-tokenized patterns + // ">" matches any non-empty actor ID + if pattern == ">" { + return true + } + + // "*" matches any single-token actor ID (no dots) + if pattern == "*" { + return true + } + + // Check for suffix wildcard (e.g., "order-*") + if strings.HasSuffix(pattern, "*") { + prefix := strings.TrimSuffix(pattern, "*") + return strings.HasPrefix(actorID, prefix) + } + + // Check for suffix multi-match (e.g., "order->") + if strings.HasSuffix(pattern, ">") { + prefix := strings.TrimSuffix(pattern, ">") + return strings.HasPrefix(actorID, prefix) + } + + // Exact match + return pattern == actorID +} diff --git a/pattern_test.go b/pattern_test.go index 70d453a..41d8cf0 100644 --- a/pattern_test.go +++ b/pattern_test.go @@ -115,3 +115,128 @@ func BenchmarkMatchNamespacePattern(b *testing.B) { }) } } + +func TestMatchActorPattern(t *testing.T) { + tests := []struct { + name string + pattern string + actorID string + expected bool + }{ + // Empty cases + {"empty pattern", "", "actor-123", false}, + {"empty actorID", "actor-*", "", false}, + {"both empty", "", "", false}, + + // Exact matches (no dots) + {"exact match", "actor-123", "actor-123", true}, + {"exact mismatch", "actor-123", "actor-456", false}, + + // Suffix wildcard with * (simple, no dots) + {"prefix with star", "order-*", "order-123", true}, + {"prefix with star 2", "order-*", "order-456-xyz", true}, + {"prefix with star mismatch", "order-*", "user-123", false}, + {"star alone", "*", "anything", true}, + + // Suffix wildcard with > (simple, no dots) + {"prefix with greater", "order->", "order-123", true}, + {"greater alone", ">", "anything", true}, + + // Dot-separated actor IDs (uses MatchNamespacePattern) + {"dotted exact match", "order.us.123", "order.us.123", true}, + {"dotted exact mismatch", "order.us.123", "order.eu.123", false}, + {"dotted star", "order.*", "order.123", true}, + {"dotted star deep", "order.*.*", "order.us.123", true}, + {"dotted greater", "order.>", "order.us.123.456", true}, + {"dotted star mismatch depth", "order.*", "order.us.123", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := MatchActorPattern(tt.pattern, tt.actorID) + if result != tt.expected { + t.Errorf("MatchActorPattern(%q, %q) = %v, want %v", + tt.pattern, tt.actorID, result, tt.expected) + } + }) + } +} + +func TestSubscriptionFilter_IsEmpty(t *testing.T) { + tests := []struct { + name string + filter *SubscriptionFilter + expected bool + }{ + {"nil fields", &SubscriptionFilter{}, true}, + {"empty slice", &SubscriptionFilter{EventTypes: []string{}}, true}, + {"has event types", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}, false}, + {"has actor pattern", &SubscriptionFilter{ActorPattern: "order-*"}, false}, + {"has both", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"}, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.filter.IsEmpty() + if result != tt.expected { + t.Errorf("SubscriptionFilter.IsEmpty() = %v, want %v", result, tt.expected) + } + }) + } +} + +func TestSubscriptionFilter_Matches(t *testing.T) { + tests := []struct { + name string + filter *SubscriptionFilter + event *Event + expected bool + }{ + // Nil event + {"nil event", &SubscriptionFilter{}, nil, false}, + + // Empty filter matches all + {"empty filter", &SubscriptionFilter{}, &Event{EventType: "Test", ActorID: "actor-1"}, true}, + + // Event type filtering + {"event type match", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}, + &Event{EventType: "OrderPlaced", ActorID: "order-1"}, true}, + {"event type mismatch", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}, + &Event{EventType: "OrderShipped", ActorID: "order-1"}, false}, + {"event type multiple match first", &SubscriptionFilter{EventTypes: []string{"OrderPlaced", "OrderShipped"}}, + &Event{EventType: "OrderPlaced", ActorID: "order-1"}, true}, + {"event type multiple match second", &SubscriptionFilter{EventTypes: []string{"OrderPlaced", "OrderShipped"}}, + &Event{EventType: "OrderShipped", ActorID: "order-1"}, true}, + {"event type multiple no match", &SubscriptionFilter{EventTypes: []string{"OrderPlaced", "OrderShipped"}}, + &Event{EventType: "OrderCancelled", ActorID: "order-1"}, false}, + + // Actor pattern filtering + {"actor pattern exact match", &SubscriptionFilter{ActorPattern: "order-123"}, + &Event{EventType: "Test", ActorID: "order-123"}, true}, + {"actor pattern exact mismatch", &SubscriptionFilter{ActorPattern: "order-123"}, + &Event{EventType: "Test", ActorID: "order-456"}, false}, + {"actor pattern wildcard match", &SubscriptionFilter{ActorPattern: "order-*"}, + &Event{EventType: "Test", ActorID: "order-123"}, true}, + {"actor pattern wildcard mismatch", &SubscriptionFilter{ActorPattern: "order-*"}, + &Event{EventType: "Test", ActorID: "user-123"}, false}, + + // Combined filters (AND logic) + {"combined both match", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"}, + &Event{EventType: "OrderPlaced", ActorID: "order-123"}, true}, + {"combined event matches actor does not", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"}, + &Event{EventType: "OrderPlaced", ActorID: "user-123"}, false}, + {"combined actor matches event does not", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"}, + &Event{EventType: "OrderShipped", ActorID: "order-123"}, false}, + {"combined neither matches", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"}, + &Event{EventType: "OrderShipped", ActorID: "user-123"}, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.filter.Matches(tt.event) + if result != tt.expected { + t.Errorf("SubscriptionFilter.Matches() = %v, want %v", result, tt.expected) + } + }) + } +}