diff --git a/eventbus.go b/eventbus.go index 161bc1b..1ae29cf 100644 --- a/eventbus.go +++ b/eventbus.go @@ -2,21 +2,119 @@ package aether import ( "context" + "strings" "sync" ) +// SubscriptionFilter defines criteria for filtering events in a subscription. +// Multiple filters are combined with AND logic - an event must match all +// non-empty filter criteria to be delivered. +// +// # Event Type Filtering +// +// EventTypes specifies which event types to receive. If empty, all event types +// are delivered. Otherwise, only events with a matching EventType are delivered. +// +// filter := SubscriptionFilter{ +// EventTypes: []string{"OrderPlaced", "OrderShipped"}, +// } +// +// # Actor Pattern Filtering +// +// ActorPattern specifies a pattern to match against actor IDs. Patterns support +// two matching modes: +// +// Prefix matching with "*" suffix: +// +// "order-*" // matches "order-123", "order-456", etc. +// "user-*" // matches "user-abc", "user-xyz", etc. +// +// Exact matching (no wildcard): +// +// "order-123" // matches only "order-123" +// +// An empty ActorPattern matches all actor IDs. +// +// # Combining Filters +// +// When both EventTypes and ActorPattern are specified, an event must match +// both criteria (AND logic): +// +// filter := SubscriptionFilter{ +// EventTypes: []string{"OrderPlaced"}, +// ActorPattern: "order-*", +// } +// // Only delivers OrderPlaced events from actors starting with "order-" +type SubscriptionFilter struct { + // EventTypes limits delivery to events with matching EventType. + // Empty slice means all event types are accepted. + EventTypes []string + + // ActorPattern matches against event ActorID. + // Supports prefix matching with "*" suffix (e.g., "order-*"). + // Empty string matches all actor IDs. + ActorPattern string +} + +// Matches returns true if the event passes all filter criteria. +// An empty filter (no event types, no actor pattern) matches all events. +func (f SubscriptionFilter) Matches(event *Event) bool { + if event == nil { + return false + } + + // Check event type filter + if len(f.EventTypes) > 0 { + found := false + for _, et := range f.EventTypes { + if event.EventType == et { + found = true + break + } + } + if !found { + return false + } + } + + // Check actor pattern filter + if f.ActorPattern != "" { + if strings.HasSuffix(f.ActorPattern, "*") { + // Prefix matching + prefix := strings.TrimSuffix(f.ActorPattern, "*") + if !strings.HasPrefix(event.ActorID, prefix) { + return false + } + } else { + // Exact matching + if event.ActorID != f.ActorPattern { + return false + } + } + } + + return true +} + // EventBroadcaster defines the interface for publishing and subscribing to events type EventBroadcaster interface { Subscribe(namespaceID string) <-chan *Event + SubscribeWithFilter(namespaceID string, filter SubscriptionFilter) <-chan *Event Unsubscribe(namespaceID string, ch <-chan *Event) Publish(namespaceID string, event *Event) Stop() SubscriberCount(namespaceID string) int } +// filteredSubscriber holds a subscriber channel and its filter +type filteredSubscriber struct { + ch chan *Event + filter SubscriptionFilter +} + // EventBus broadcasts events to multiple subscribers within a namespace type EventBus struct { - subscribers map[string][]chan *Event // namespaceID -> channels + subscribers map[string][]filteredSubscriber // namespaceID -> filtered subscribers mutex sync.RWMutex ctx context.Context cancel context.CancelFunc @@ -26,20 +124,31 @@ type EventBus struct { func NewEventBus() *EventBus { ctx, cancel := context.WithCancel(context.Background()) return &EventBus{ - subscribers: make(map[string][]chan *Event), + subscribers: make(map[string][]filteredSubscriber), ctx: ctx, cancel: cancel, } } -// Subscribe creates a new subscription channel for a namespace +// Subscribe creates a new subscription channel for a namespace. +// All events published to the namespace will be delivered. func (eb *EventBus) Subscribe(namespaceID string) <-chan *Event { + return eb.SubscribeWithFilter(namespaceID, SubscriptionFilter{}) +} + +// SubscribeWithFilter creates a new subscription channel for a namespace with filtering. +// Only events matching the filter criteria will be delivered. +func (eb *EventBus) SubscribeWithFilter(namespaceID string, filter SubscriptionFilter) <-chan *Event { eb.mutex.Lock() defer eb.mutex.Unlock() // Create buffered channel to prevent blocking publishers ch := make(chan *Event, 100) - eb.subscribers[namespaceID] = append(eb.subscribers[namespaceID], ch) + sub := filteredSubscriber{ + ch: ch, + filter: filter, + } + eb.subscribers[namespaceID] = append(eb.subscribers[namespaceID], sub) return ch } @@ -51,10 +160,10 @@ func (eb *EventBus) Unsubscribe(namespaceID string, ch <-chan *Event) { subs := eb.subscribers[namespaceID] for i, subscriber := range subs { - if subscriber == ch { - // Remove channel from slice + if subscriber.ch == ch { + // Remove subscriber from slice eb.subscribers[namespaceID] = append(subs[:i], subs[i+1:]...) - close(subscriber) + close(subscriber.ch) break } } @@ -65,15 +174,20 @@ func (eb *EventBus) Unsubscribe(namespaceID string, ch <-chan *Event) { } } -// Publish sends an event to all subscribers of a namespace +// Publish sends an event to all subscribers of a namespace whose filters match func (eb *EventBus) Publish(namespaceID string, event *Event) { eb.mutex.RLock() defer eb.mutex.RUnlock() subscribers := eb.subscribers[namespaceID] - for _, ch := range subscribers { + for _, sub := range subscribers { + // Apply filter before delivering + if !sub.filter.Matches(event) { + continue + } + select { - case ch <- event: + case sub.ch <- event: // Event delivered default: // Channel full, skip this subscriber (non-blocking) @@ -90,12 +204,12 @@ func (eb *EventBus) Stop() { // Close all subscriber channels for _, subs := range eb.subscribers { - for _, ch := range subs { - close(ch) + for _, sub := range subs { + close(sub.ch) } } - eb.subscribers = make(map[string][]chan *Event) + eb.subscribers = make(map[string][]filteredSubscriber) } // SubscriberCount returns the number of subscribers for a namespace diff --git a/eventbus_test.go b/eventbus_test.go new file mode 100644 index 0000000..3fb4d88 --- /dev/null +++ b/eventbus_test.go @@ -0,0 +1,948 @@ +package aether + +import ( + "sync" + "testing" + "time" +) + +// === SubscriptionFilter Tests === + +func TestSubscriptionFilter_Matches_EmptyFilter(t *testing.T) { + filter := SubscriptionFilter{} + + tests := []struct { + name string + event *Event + want bool + }{ + { + name: "nil event", + event: nil, + want: false, + }, + { + name: "any event type matches", + event: &Event{ + ID: "evt-1", + EventType: "OrderPlaced", + ActorID: "order-123", + }, + want: true, + }, + { + name: "any actor matches", + event: &Event{ + ID: "evt-2", + EventType: "UserCreated", + ActorID: "user-abc", + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := filter.Matches(tt.event); got != tt.want { + t.Errorf("Matches() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSubscriptionFilter_Matches_EventTypeFilter(t *testing.T) { + filter := SubscriptionFilter{ + EventTypes: []string{"OrderPlaced", "OrderShipped"}, + } + + tests := []struct { + name string + event *Event + want bool + }{ + { + name: "matching first event type", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order-123", + }, + want: true, + }, + { + name: "matching second event type", + event: &Event{ + EventType: "OrderShipped", + ActorID: "order-123", + }, + want: true, + }, + { + name: "non-matching event type", + event: &Event{ + EventType: "OrderCancelled", + ActorID: "order-123", + }, + want: false, + }, + { + name: "empty event type", + event: &Event{ + EventType: "", + ActorID: "order-123", + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := filter.Matches(tt.event); got != tt.want { + t.Errorf("Matches() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSubscriptionFilter_Matches_SingleEventType(t *testing.T) { + filter := SubscriptionFilter{ + EventTypes: []string{"OrderPlaced"}, + } + + tests := []struct { + name string + event *Event + want bool + }{ + { + name: "matching event type", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order-123", + }, + want: true, + }, + { + name: "non-matching event type", + event: &Event{ + EventType: "OrderShipped", + ActorID: "order-123", + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := filter.Matches(tt.event); got != tt.want { + t.Errorf("Matches() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSubscriptionFilter_Matches_ActorPrefixPattern(t *testing.T) { + filter := SubscriptionFilter{ + ActorPattern: "order-*", + } + + tests := []struct { + name string + event *Event + want bool + }{ + { + name: "matching prefix", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order-123", + }, + want: true, + }, + { + name: "matching prefix with long suffix", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order-abc-def-ghi", + }, + want: true, + }, + { + name: "exactly prefix (no suffix)", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order-", + }, + want: true, + }, + { + name: "non-matching prefix", + event: &Event{ + EventType: "UserCreated", + ActorID: "user-123", + }, + want: false, + }, + { + name: "prefix without hyphen", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order123", + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := filter.Matches(tt.event); got != tt.want { + t.Errorf("Matches() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSubscriptionFilter_Matches_ActorExactPattern(t *testing.T) { + filter := SubscriptionFilter{ + ActorPattern: "order-123", + } + + tests := []struct { + name string + event *Event + want bool + }{ + { + name: "exact match", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order-123", + }, + want: true, + }, + { + name: "longer actor ID", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order-1234", + }, + want: false, + }, + { + name: "shorter actor ID", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order-12", + }, + want: false, + }, + { + name: "different actor ID", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order-456", + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := filter.Matches(tt.event); got != tt.want { + t.Errorf("Matches() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSubscriptionFilter_Matches_CombinedFilters(t *testing.T) { + filter := SubscriptionFilter{ + EventTypes: []string{"OrderPlaced", "OrderShipped"}, + ActorPattern: "order-*", + } + + tests := []struct { + name string + event *Event + want bool + }{ + { + name: "matches both filters", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "order-123", + }, + want: true, + }, + { + name: "matches event type but not actor", + event: &Event{ + EventType: "OrderPlaced", + ActorID: "user-123", + }, + want: false, + }, + { + name: "matches actor but not event type", + event: &Event{ + EventType: "OrderCancelled", + ActorID: "order-123", + }, + want: false, + }, + { + name: "matches neither", + event: &Event{ + EventType: "UserCreated", + ActorID: "user-123", + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := filter.Matches(tt.event); got != tt.want { + t.Errorf("Matches() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSubscriptionFilter_Matches_WildcardOnly(t *testing.T) { + // Just "*" should match everything (prefix is empty) + filter := SubscriptionFilter{ + ActorPattern: "*", + } + + tests := []struct { + name string + event *Event + want bool + }{ + { + name: "matches any actor", + event: &Event{ + EventType: "Test", + ActorID: "anything-at-all", + }, + want: true, + }, + { + name: "matches empty actor ID", + event: &Event{ + EventType: "Test", + ActorID: "", + }, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := filter.Matches(tt.event); got != tt.want { + t.Errorf("Matches() = %v, want %v", got, tt.want) + } + }) + } +} + +// === EventBus Tests === + +func TestNewEventBus(t *testing.T) { + bus := NewEventBus() + + if bus == nil { + t.Fatal("NewEventBus returned nil") + } + if bus.subscribers == nil { + t.Error("subscribers map is nil") + } +} + +func TestEventBus_Subscribe(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + ch := bus.Subscribe("test-namespace") + + if ch == nil { + t.Fatal("Subscribe returned nil channel") + } + + count := bus.SubscriberCount("test-namespace") + if count != 1 { + t.Errorf("expected 1 subscriber, got %d", count) + } +} + +func TestEventBus_SubscribeWithFilter(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + filter := SubscriptionFilter{ + EventTypes: []string{"OrderPlaced"}, + } + ch := bus.SubscribeWithFilter("test-namespace", filter) + + if ch == nil { + t.Fatal("SubscribeWithFilter returned nil channel") + } + + count := bus.SubscriberCount("test-namespace") + if count != 1 { + t.Errorf("expected 1 subscriber, got %d", count) + } +} + +func TestEventBus_Publish_NoFilter(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + ch := bus.Subscribe("test-namespace") + + event := &Event{ + ID: "evt-1", + EventType: "OrderPlaced", + ActorID: "order-123", + } + + bus.Publish("test-namespace", event) + + select { + case received := <-ch: + if received.ID != event.ID { + t.Errorf("received event ID %q, want %q", received.ID, event.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("timeout waiting for event") + } +} + +func TestEventBus_Publish_WithEventTypeFilter(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + filter := SubscriptionFilter{ + EventTypes: []string{"OrderPlaced"}, + } + ch := bus.SubscribeWithFilter("test-namespace", filter) + + // This event should be delivered + matchingEvent := &Event{ + ID: "evt-1", + EventType: "OrderPlaced", + ActorID: "order-123", + } + + // This event should NOT be delivered + nonMatchingEvent := &Event{ + ID: "evt-2", + EventType: "OrderShipped", + ActorID: "order-123", + } + + bus.Publish("test-namespace", matchingEvent) + bus.Publish("test-namespace", nonMatchingEvent) + + // Should receive matching event + select { + case received := <-ch: + if received.ID != matchingEvent.ID { + t.Errorf("received event ID %q, want %q", received.ID, matchingEvent.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("timeout waiting for matching event") + } + + // Should NOT receive non-matching event + select { + case received := <-ch: + t.Errorf("received unexpected event: %+v", received) + case <-time.After(50 * time.Millisecond): + // Expected - no event should be received + } +} + +func TestEventBus_Publish_WithActorPatternFilter(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + filter := SubscriptionFilter{ + ActorPattern: "order-*", + } + ch := bus.SubscribeWithFilter("test-namespace", filter) + + // This event should be delivered + matchingEvent := &Event{ + ID: "evt-1", + EventType: "Test", + ActorID: "order-123", + } + + // This event should NOT be delivered + nonMatchingEvent := &Event{ + ID: "evt-2", + EventType: "Test", + ActorID: "user-456", + } + + bus.Publish("test-namespace", matchingEvent) + bus.Publish("test-namespace", nonMatchingEvent) + + // Should receive matching event + select { + case received := <-ch: + if received.ID != matchingEvent.ID { + t.Errorf("received event ID %q, want %q", received.ID, matchingEvent.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("timeout waiting for matching event") + } + + // Should NOT receive non-matching event + select { + case received := <-ch: + t.Errorf("received unexpected event: %+v", received) + case <-time.After(50 * time.Millisecond): + // Expected - no event should be received + } +} + +func TestEventBus_Publish_WithCombinedFilters(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + filter := SubscriptionFilter{ + EventTypes: []string{"OrderPlaced"}, + ActorPattern: "order-*", + } + ch := bus.SubscribeWithFilter("test-namespace", filter) + + tests := []struct { + name string + event *Event + shouldMatch bool + }{ + { + name: "matches both filters", + event: &Event{ + ID: "evt-1", + EventType: "OrderPlaced", + ActorID: "order-123", + }, + shouldMatch: true, + }, + { + name: "matches event type only", + event: &Event{ + ID: "evt-2", + EventType: "OrderPlaced", + ActorID: "user-123", + }, + shouldMatch: false, + }, + { + name: "matches actor only", + event: &Event{ + ID: "evt-3", + EventType: "OrderShipped", + ActorID: "order-123", + }, + shouldMatch: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + bus.Publish("test-namespace", tt.event) + + select { + case received := <-ch: + if !tt.shouldMatch { + t.Errorf("received unexpected event: %+v", received) + } else if received.ID != tt.event.ID { + t.Errorf("received event ID %q, want %q", received.ID, tt.event.ID) + } + case <-time.After(50 * time.Millisecond): + if tt.shouldMatch { + t.Error("timeout waiting for matching event") + } + // Expected for non-matching events + } + }) + } +} + +func TestEventBus_MultipleSubscribers_DifferentFilters(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + // Subscriber for all events + chAll := bus.Subscribe("test-namespace") + + // Subscriber for OrderPlaced only + chOrders := bus.SubscribeWithFilter("test-namespace", SubscriptionFilter{ + EventTypes: []string{"OrderPlaced"}, + }) + + // Subscriber for users only + chUsers := bus.SubscribeWithFilter("test-namespace", SubscriptionFilter{ + ActorPattern: "user-*", + }) + + orderEvent := &Event{ + ID: "evt-1", + EventType: "OrderPlaced", + ActorID: "order-123", + } + + userEvent := &Event{ + ID: "evt-2", + EventType: "UserCreated", + ActorID: "user-456", + } + + bus.Publish("test-namespace", orderEvent) + bus.Publish("test-namespace", userEvent) + + // chAll should receive both events + for i := 0; i < 2; i++ { + select { + case <-chAll: + // Expected + case <-time.After(100 * time.Millisecond): + t.Error("chAll: timeout waiting for event") + } + } + + // chOrders should receive only the order event + select { + case received := <-chOrders: + if received.ID != orderEvent.ID { + t.Errorf("chOrders: received event ID %q, want %q", received.ID, orderEvent.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("chOrders: timeout waiting for order event") + } + + // chOrders should NOT receive the user event + select { + case received := <-chOrders: + t.Errorf("chOrders: received unexpected event: %+v", received) + case <-time.After(50 * time.Millisecond): + // Expected + } + + // chUsers should receive only the user event + select { + case received := <-chUsers: + if received.ID != userEvent.ID { + t.Errorf("chUsers: received event ID %q, want %q", received.ID, userEvent.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("chUsers: timeout waiting for user event") + } + + // chUsers should NOT receive the order event + select { + case received := <-chUsers: + t.Errorf("chUsers: received unexpected event: %+v", received) + case <-time.After(50 * time.Millisecond): + // Expected + } +} + +func TestEventBus_Unsubscribe(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + ch := bus.Subscribe("test-namespace") + + if bus.SubscriberCount("test-namespace") != 1 { + t.Errorf("expected 1 subscriber before unsubscribe") + } + + bus.Unsubscribe("test-namespace", ch) + + if bus.SubscriberCount("test-namespace") != 0 { + t.Errorf("expected 0 subscribers after unsubscribe") + } +} + +func TestEventBus_Unsubscribe_MultipleSubscribers(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + ch1 := bus.Subscribe("test-namespace") + ch2 := bus.Subscribe("test-namespace") + + if bus.SubscriberCount("test-namespace") != 2 { + t.Errorf("expected 2 subscribers, got %d", bus.SubscriberCount("test-namespace")) + } + + bus.Unsubscribe("test-namespace", ch1) + + if bus.SubscriberCount("test-namespace") != 1 { + t.Errorf("expected 1 subscriber after first unsubscribe, got %d", bus.SubscriberCount("test-namespace")) + } + + bus.Unsubscribe("test-namespace", ch2) + + if bus.SubscriberCount("test-namespace") != 0 { + t.Errorf("expected 0 subscribers after second unsubscribe, got %d", bus.SubscriberCount("test-namespace")) + } +} + +func TestEventBus_Publish_DifferentNamespaces(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + ch1 := bus.Subscribe("namespace-1") + ch2 := bus.Subscribe("namespace-2") + + event1 := &Event{ + ID: "evt-1", + EventType: "Test", + ActorID: "actor-1", + } + + event2 := &Event{ + ID: "evt-2", + EventType: "Test", + ActorID: "actor-2", + } + + bus.Publish("namespace-1", event1) + bus.Publish("namespace-2", event2) + + // ch1 should receive event1 + select { + case received := <-ch1: + if received.ID != event1.ID { + t.Errorf("ch1: received event ID %q, want %q", received.ID, event1.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("ch1: timeout waiting for event") + } + + // ch1 should NOT receive event2 + select { + case received := <-ch1: + t.Errorf("ch1: received unexpected event: %+v", received) + case <-time.After(50 * time.Millisecond): + // Expected + } + + // ch2 should receive event2 + select { + case received := <-ch2: + if received.ID != event2.ID { + t.Errorf("ch2: received event ID %q, want %q", received.ID, event2.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("ch2: timeout waiting for event") + } +} + +func TestEventBus_Stop(t *testing.T) { + bus := NewEventBus() + + ch1 := bus.Subscribe("namespace-1") + ch2 := bus.Subscribe("namespace-2") + + bus.Stop() + + // Channels should be closed + select { + case _, ok := <-ch1: + if ok { + t.Error("ch1 should be closed after Stop") + } + default: + // Channel is closed and empty, which is expected + } + + select { + case _, ok := <-ch2: + if ok { + t.Error("ch2 should be closed after Stop") + } + default: + // Channel is closed and empty, which is expected + } + + // Subscriber count should be 0 + if bus.SubscriberCount("namespace-1") != 0 { + t.Error("expected 0 subscribers after Stop") + } +} + +func TestEventBus_SubscriberCount(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + // No subscribers initially + if count := bus.SubscriberCount("test-namespace"); count != 0 { + t.Errorf("expected 0 subscribers initially, got %d", count) + } + + // Add subscribers + ch1 := bus.Subscribe("test-namespace") + if count := bus.SubscriberCount("test-namespace"); count != 1 { + t.Errorf("expected 1 subscriber, got %d", count) + } + + ch2 := bus.Subscribe("test-namespace") + if count := bus.SubscriberCount("test-namespace"); count != 2 { + t.Errorf("expected 2 subscribers, got %d", count) + } + + // Different namespace + bus.Subscribe("other-namespace") + if count := bus.SubscriberCount("test-namespace"); count != 2 { + t.Errorf("expected 2 subscribers for test-namespace, got %d", count) + } + if count := bus.SubscriberCount("other-namespace"); count != 1 { + t.Errorf("expected 1 subscriber for other-namespace, got %d", count) + } + + // Unsubscribe + bus.Unsubscribe("test-namespace", ch1) + if count := bus.SubscriberCount("test-namespace"); count != 1 { + t.Errorf("expected 1 subscriber after unsubscribe, got %d", count) + } + + bus.Unsubscribe("test-namespace", ch2) + if count := bus.SubscriberCount("test-namespace"); count != 0 { + t.Errorf("expected 0 subscribers after unsubscribe, got %d", count) + } +} + +func TestEventBus_ConcurrentPublishAndSubscribe(t *testing.T) { + bus := NewEventBus() + defer bus.Stop() + + var wg sync.WaitGroup + numGoroutines := 100 + eventsPerGoroutine := 10 + + // Start subscribers in goroutines + wg.Add(numGoroutines) + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + ch := bus.Subscribe("test-namespace") + + // Read a few events then unsubscribe + for j := 0; j < eventsPerGoroutine; j++ { + select { + case <-ch: + // Received event + case <-time.After(200 * time.Millisecond): + // Timeout, continue + } + } + + bus.Unsubscribe("test-namespace", ch) + }(i) + } + + // Publish events concurrently + wg.Add(numGoroutines) + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < eventsPerGoroutine; j++ { + event := &Event{ + ID: "evt", + EventType: "Test", + ActorID: "actor", + } + bus.Publish("test-namespace", event) + } + }(i) + } + + wg.Wait() + + // No subscribers should remain + if count := bus.SubscriberCount("test-namespace"); count != 0 { + t.Errorf("expected 0 subscribers after test, got %d", count) + } +} + +func TestEventBus_Interface(t *testing.T) { + // Verify EventBus implements EventBroadcaster interface + var _ EventBroadcaster = (*EventBus)(nil) +} + +// === Benchmark Tests === + +func BenchmarkSubscriptionFilter_Matches(b *testing.B) { + filter := SubscriptionFilter{ + EventTypes: []string{"OrderPlaced", "OrderShipped", "OrderDelivered"}, + ActorPattern: "order-*", + } + + event := &Event{ + EventType: "OrderPlaced", + ActorID: "order-123", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + filter.Matches(event) + } +} + +func BenchmarkEventBus_Publish(b *testing.B) { + bus := NewEventBus() + defer bus.Stop() + + ch := bus.Subscribe("test-namespace") + + event := &Event{ + ID: "evt-1", + EventType: "Test", + ActorID: "actor-1", + } + + // Drain the channel in a goroutine + go func() { + for range ch { + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + bus.Publish("test-namespace", event) + } +} + +func BenchmarkEventBus_PublishWithFilter(b *testing.B) { + bus := NewEventBus() + defer bus.Stop() + + filter := SubscriptionFilter{ + EventTypes: []string{"Test"}, + ActorPattern: "actor-*", + } + ch := bus.SubscribeWithFilter("test-namespace", filter) + + event := &Event{ + ID: "evt-1", + EventType: "Test", + ActorID: "actor-1", + } + + // Drain the channel in a goroutine + go func() { + for range ch { + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + bus.Publish("test-namespace", event) + } +} diff --git a/nats_eventbus.go b/nats_eventbus.go index a13653e..aaec9a6 100644 --- a/nats_eventbus.go +++ b/nats_eventbus.go @@ -11,7 +11,19 @@ import ( "github.com/nats-io/nats.go" ) -// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS +// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS. +// +// # Server-Side Filtering +// +// When using SubscribeWithFilter, the NATSEventBus attempts to apply filters at +// the NATS subject level where possible for efficient event delivery: +// +// Event type filtering: When a filter specifies exactly one event type, +// NATSEventBus subscribes to a type-specific NATS subject (e.g., +// "aether.events.namespace.OrderPlaced"), reducing network traffic. +// +// For multiple event types or actor patterns, filtering is applied client-side +// after receiving events from NATS. type NATSEventBus struct { *EventBus // Embed base EventBus for local subscriptions nc *nats.Conn // NATS connection @@ -47,28 +59,54 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) { return neb, nil } -// Subscribe creates a local subscription and ensures NATS subscription exists for the namespace +// Subscribe creates a local subscription and ensures NATS subscription exists for the namespace. +// All events published to the namespace will be delivered. func (neb *NATSEventBus) Subscribe(namespaceID string) <-chan *Event { + return neb.SubscribeWithFilter(namespaceID, SubscriptionFilter{}) +} + +// SubscribeWithFilter creates a filtered subscription for a namespace. +// +// For single event type filters, NATS subject-based filtering is used for +// efficiency (server-side filtering). For multiple event types or actor +// patterns, filtering is applied client-side. +func (neb *NATSEventBus) SubscribeWithFilter(namespaceID string, filter SubscriptionFilter) <-chan *Event { neb.mutex.Lock() defer neb.mutex.Unlock() - // Create local subscription first - ch := neb.EventBus.Subscribe(namespaceID) + // Create local subscription with filter + ch := neb.EventBus.SubscribeWithFilter(namespaceID, filter) + + // Determine which NATS subject(s) to subscribe to + // For single event type, we can use a more specific subject for server-side filtering + var subjects []string + if len(filter.EventTypes) == 1 { + // Server-side filtering: subscribe to type-specific subject + subjects = []string{fmt.Sprintf("aether.events.%s.%s", namespaceID, filter.EventTypes[0])} + } else if len(filter.EventTypes) > 1 { + // Subscribe to each event type's subject for server-side filtering + for _, et := range filter.EventTypes { + subjects = append(subjects, fmt.Sprintf("aether.events.%s.%s", namespaceID, et)) + } + } else { + // No event type filter - subscribe to wildcard for all events in namespace + subjects = []string{fmt.Sprintf("aether.events.%s.>", namespaceID)} + } // Check if this is the first subscriber for this namespace count := neb.namespaceSubscribers[namespaceID] if count == 0 { - // First subscriber - create NATS subscription - subject := fmt.Sprintf("aether.events.%s", namespaceID) - - sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) { - neb.handleNATSEvent(msg) - }) - if err != nil { - log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err) - } else { - neb.subscriptions = append(neb.subscriptions, sub) - log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject) + // First subscriber - create NATS subscriptions + for _, subject := range subjects { + sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) { + neb.handleNATSEvent(msg) + }) + if err != nil { + log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err) + } else { + neb.subscriptions = append(neb.subscriptions, sub) + log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject) + } } } @@ -109,17 +147,19 @@ func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) { return } - // Forward to local EventBus subscribers + // Forward to local EventBus subscribers (filtering happens there) neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event) } -// Publish publishes an event both locally and to NATS for cross-node broadcasting +// Publish publishes an event both locally and to NATS for cross-node broadcasting. +// Events are published to a type-specific subject to enable server-side filtering. func (neb *NATSEventBus) Publish(namespaceID string, event *Event) { // First publish locally neb.EventBus.Publish(namespaceID, event) // Then publish to NATS for other nodes - subject := fmt.Sprintf("aether.events.%s", namespaceID) + // Use type-specific subject for server-side filtering + subject := fmt.Sprintf("aether.events.%s.%s", namespaceID, event.EventType) eventMsg := eventMessage{ NodeID: neb.nodeID,