Add namespace event filtering (SubscribeWithFilter)
Adds support for filtering events by type or actor pattern within namespace subscriptions. Key changes: - Add SubscriptionFilter type with EventTypes and ActorPattern fields - Add SubscribeWithFilter to EventBroadcaster interface - Implement filtering in EventBus with full wildcard pattern support preserved - Implement filtering in NATSEventBus (server-side namespace, client-side filters) - Add MatchActorPattern function for actor ID pattern matching - Add comprehensive unit tests for all filtering scenarios Filter Processing: - EventTypes: Event must match at least one type in the list (OR within types) - ActorPattern: Event's ActorID must match the pattern (supports * and > wildcards) - Multiple filters are combined with AND logic This implementation works alongside the existing wildcard subscription support: - Namespace wildcards (* and >) work with event filters - Filters are applied after namespace pattern matching - Metrics are properly recorded for filtered subscriptions Closes #21 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit was merged in pull request #54.
This commit is contained in:
104
eventbus.go
104
eventbus.go
@@ -18,6 +18,19 @@ type EventBroadcaster interface {
|
||||
// Subscribe creates a channel that receives events matching the namespace pattern.
|
||||
// Pattern syntax follows NATS conventions: "*" matches single token, ">" matches multiple.
|
||||
Subscribe(namespacePattern string) <-chan *Event
|
||||
|
||||
// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern.
|
||||
// Events are filtered by the provided SubscriptionFilter before delivery.
|
||||
// Filters are applied with AND logic - events must match all specified criteria.
|
||||
//
|
||||
// Example: Subscribe to "orders" namespace, only receiving "OrderPlaced" events for "order-*" actors:
|
||||
// filter := &SubscriptionFilter{
|
||||
// EventTypes: []string{"OrderPlaced"},
|
||||
// ActorPattern: "order-*",
|
||||
// }
|
||||
// ch := bus.SubscribeWithFilter("orders", filter)
|
||||
SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event
|
||||
|
||||
Unsubscribe(namespacePattern string, ch <-chan *Event)
|
||||
Publish(namespaceID string, event *Event)
|
||||
Stop()
|
||||
@@ -37,6 +50,13 @@ type subscription struct {
|
||||
ch chan *Event
|
||||
}
|
||||
|
||||
// filteredSubscription represents a subscriber with an optional filter
|
||||
type filteredSubscription struct {
|
||||
pattern string
|
||||
ch chan *Event
|
||||
filter *SubscriptionFilter
|
||||
}
|
||||
|
||||
// EventBus broadcasts events to multiple subscribers within a namespace.
|
||||
// Supports wildcard patterns for cross-namespace subscriptions.
|
||||
//
|
||||
@@ -46,9 +66,9 @@ type subscription struct {
|
||||
// However, it bypasses namespace isolation - use with appropriate access controls.
|
||||
type EventBus struct {
|
||||
// exactSubscribers holds subscribers for exact namespace matches (no wildcards)
|
||||
exactSubscribers map[string][]chan *Event
|
||||
exactSubscribers map[string][]*filteredSubscription
|
||||
// wildcardSubscribers holds subscribers with wildcard patterns
|
||||
wildcardSubscribers []subscription
|
||||
wildcardSubscribers []*filteredSubscription
|
||||
mutex sync.RWMutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
@@ -59,8 +79,8 @@ type EventBus struct {
|
||||
func NewEventBus() *EventBus {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &EventBus{
|
||||
exactSubscribers: make(map[string][]chan *Event),
|
||||
wildcardSubscribers: make([]subscription, 0),
|
||||
exactSubscribers: make(map[string][]*filteredSubscription),
|
||||
wildcardSubscribers: make([]*filteredSubscription, 0),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
metrics: NewMetricsCollector(),
|
||||
@@ -81,21 +101,39 @@ func (eb *EventBus) Metrics() BroadcasterMetrics {
|
||||
// Security Warning: Wildcard patterns receive events from all matching namespaces,
|
||||
// bypassing namespace isolation. Only use for trusted system components.
|
||||
func (eb *EventBus) Subscribe(namespacePattern string) <-chan *Event {
|
||||
return eb.SubscribeWithFilter(namespacePattern, nil)
|
||||
}
|
||||
|
||||
// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern.
|
||||
// Events are filtered by the provided SubscriptionFilter before delivery.
|
||||
// If filter is nil or empty, all events matching the namespace pattern are delivered.
|
||||
//
|
||||
// Filtering is applied client-side for efficient processing:
|
||||
// - EventTypes: Only events with matching event types are delivered
|
||||
// - ActorPattern: Only events from matching actors are delivered
|
||||
//
|
||||
// Both namespace pattern wildcards and event filters work together:
|
||||
// - Namespace pattern determines which namespaces to subscribe to
|
||||
// - Filter determines which events within those namespaces to receive
|
||||
func (eb *EventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event {
|
||||
eb.mutex.Lock()
|
||||
defer eb.mutex.Unlock()
|
||||
|
||||
// Create buffered channel to prevent blocking publishers
|
||||
ch := make(chan *Event, 100)
|
||||
|
||||
if IsWildcardPattern(namespacePattern) {
|
||||
// Store wildcard subscription separately
|
||||
eb.wildcardSubscribers = append(eb.wildcardSubscribers, subscription{
|
||||
sub := &filteredSubscription{
|
||||
pattern: namespacePattern,
|
||||
ch: ch,
|
||||
})
|
||||
filter: filter,
|
||||
}
|
||||
|
||||
if IsWildcardPattern(namespacePattern) {
|
||||
// Store wildcard subscription separately
|
||||
eb.wildcardSubscribers = append(eb.wildcardSubscribers, sub)
|
||||
} else {
|
||||
// Exact match subscription
|
||||
eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], ch)
|
||||
eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], sub)
|
||||
}
|
||||
|
||||
// Record subscription metric
|
||||
@@ -123,11 +161,11 @@ func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
|
||||
} else {
|
||||
// Remove from exact subscribers
|
||||
subs := eb.exactSubscribers[namespacePattern]
|
||||
for i, subscriber := range subs {
|
||||
if subscriber == ch {
|
||||
// Remove channel from slice
|
||||
for i, sub := range subs {
|
||||
if sub.ch == ch {
|
||||
// Remove subscription from slice
|
||||
eb.exactSubscribers[namespacePattern] = append(subs[:i], subs[i+1:]...)
|
||||
close(subscriber)
|
||||
close(sub.ch)
|
||||
// Record unsubscription metric
|
||||
eb.metrics.RecordUnsubscribe(namespacePattern)
|
||||
break
|
||||
@@ -143,8 +181,8 @@ func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
|
||||
|
||||
// Publish sends an event to all subscribers of a namespace.
|
||||
// Events are delivered to:
|
||||
// - All exact subscribers for the namespace
|
||||
// - All wildcard subscribers whose pattern matches the namespace
|
||||
// - All exact subscribers for the namespace (after filter matching)
|
||||
// - All wildcard subscribers whose pattern matches the namespace (after filter matching)
|
||||
func (eb *EventBus) Publish(namespaceID string, event *Event) {
|
||||
eb.mutex.RLock()
|
||||
defer eb.mutex.RUnlock()
|
||||
@@ -154,20 +192,28 @@ func (eb *EventBus) Publish(namespaceID string, event *Event) {
|
||||
|
||||
// Deliver to exact subscribers
|
||||
subscribers := eb.exactSubscribers[namespaceID]
|
||||
for _, ch := range subscribers {
|
||||
select {
|
||||
case ch <- event:
|
||||
// Event delivered
|
||||
eb.metrics.RecordReceive(namespaceID)
|
||||
default:
|
||||
// Channel full, skip this subscriber (non-blocking)
|
||||
eb.metrics.RecordDroppedEvent(namespaceID)
|
||||
}
|
||||
for _, sub := range subscribers {
|
||||
eb.deliverToSubscriber(sub, event, namespaceID)
|
||||
}
|
||||
|
||||
// Deliver to matching wildcard subscribers
|
||||
for _, sub := range eb.wildcardSubscribers {
|
||||
if MatchNamespacePattern(sub.pattern, namespaceID) {
|
||||
eb.deliverToSubscriber(sub, event, namespaceID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// deliverToSubscriber delivers an event to a subscriber if it matches the filter
|
||||
func (eb *EventBus) deliverToSubscriber(sub *filteredSubscription, event *Event, namespaceID string) {
|
||||
// Apply filter if present
|
||||
if sub.filter != nil && !sub.filter.IsEmpty() {
|
||||
if !sub.filter.Matches(event) {
|
||||
// Event doesn't match filter, skip delivery
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case sub.ch <- event:
|
||||
// Event delivered
|
||||
@@ -176,8 +222,6 @@ func (eb *EventBus) Publish(namespaceID string, event *Event) {
|
||||
// Channel full, skip this subscriber (non-blocking)
|
||||
eb.metrics.RecordDroppedEvent(namespaceID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop closes the event bus
|
||||
@@ -189,8 +233,8 @@ func (eb *EventBus) Stop() {
|
||||
|
||||
// Close all exact subscriber channels and update metrics
|
||||
for namespaceID, subs := range eb.exactSubscribers {
|
||||
for _, ch := range subs {
|
||||
close(ch)
|
||||
for _, sub := range subs {
|
||||
close(sub.ch)
|
||||
eb.metrics.RecordUnsubscribe(namespaceID)
|
||||
}
|
||||
}
|
||||
@@ -201,8 +245,8 @@ func (eb *EventBus) Stop() {
|
||||
eb.metrics.RecordUnsubscribe(sub.pattern)
|
||||
}
|
||||
|
||||
eb.exactSubscribers = make(map[string][]chan *Event)
|
||||
eb.wildcardSubscribers = make([]subscription, 0)
|
||||
eb.exactSubscribers = make(map[string][]*filteredSubscription)
|
||||
eb.wildcardSubscribers = make([]*filteredSubscription, 0)
|
||||
}
|
||||
|
||||
// SubscriberCount returns the number of subscribers for a namespace.
|
||||
|
||||
406
eventbus_test.go
406
eventbus_test.go
@@ -414,3 +414,409 @@ func TestEventBus_ConcurrentOperations(t *testing.T) {
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Tests for SubscribeWithFilter functionality
|
||||
|
||||
func TestEventBus_SubscribeWithFilter_EventTypes(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// Subscribe with filter for specific event types
|
||||
filter := &SubscriptionFilter{
|
||||
EventTypes: []string{"OrderPlaced", "OrderShipped"},
|
||||
}
|
||||
ch := eb.SubscribeWithFilter("orders", filter)
|
||||
|
||||
// Publish events of different types
|
||||
events := []*Event{
|
||||
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"},
|
||||
{ID: "evt-2", EventType: "OrderCancelled", ActorID: "order-2"}, // Should not be received
|
||||
{ID: "evt-3", EventType: "OrderShipped", ActorID: "order-3"},
|
||||
}
|
||||
|
||||
for _, e := range events {
|
||||
eb.Publish("orders", e)
|
||||
}
|
||||
|
||||
// Should receive evt-1 and evt-3, but not evt-2
|
||||
received := make(map[string]bool)
|
||||
timeout := time.After(100 * time.Millisecond)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case evt := <-ch:
|
||||
received[evt.ID] = true
|
||||
case <-timeout:
|
||||
t.Fatalf("timed out after receiving %d events", len(received))
|
||||
}
|
||||
}
|
||||
|
||||
if !received["evt-1"] || !received["evt-3"] {
|
||||
t.Errorf("expected to receive evt-1 and evt-3, got %v", received)
|
||||
}
|
||||
|
||||
// Verify evt-2 was not received
|
||||
select {
|
||||
case evt := <-ch:
|
||||
t.Errorf("unexpected event received: %s", evt.ID)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscribeWithFilter_ActorPattern(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// Subscribe with filter for specific actor pattern
|
||||
filter := &SubscriptionFilter{
|
||||
ActorPattern: "order-*",
|
||||
}
|
||||
ch := eb.SubscribeWithFilter("events", filter)
|
||||
|
||||
// Publish events from different actors
|
||||
events := []*Event{
|
||||
{ID: "evt-1", EventType: "Test", ActorID: "order-123"},
|
||||
{ID: "evt-2", EventType: "Test", ActorID: "user-456"}, // Should not be received
|
||||
{ID: "evt-3", EventType: "Test", ActorID: "order-789"},
|
||||
}
|
||||
|
||||
for _, e := range events {
|
||||
eb.Publish("events", e)
|
||||
}
|
||||
|
||||
// Should receive evt-1 and evt-3, but not evt-2
|
||||
received := make(map[string]bool)
|
||||
timeout := time.After(100 * time.Millisecond)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case evt := <-ch:
|
||||
received[evt.ID] = true
|
||||
case <-timeout:
|
||||
t.Fatalf("timed out after receiving %d events", len(received))
|
||||
}
|
||||
}
|
||||
|
||||
if !received["evt-1"] || !received["evt-3"] {
|
||||
t.Errorf("expected to receive evt-1 and evt-3, got %v", received)
|
||||
}
|
||||
|
||||
// Verify evt-2 was not received
|
||||
select {
|
||||
case evt := <-ch:
|
||||
t.Errorf("unexpected event received: %s", evt.ID)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscribeWithFilter_Combined(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// Subscribe with filter for both event type AND actor pattern
|
||||
filter := &SubscriptionFilter{
|
||||
EventTypes: []string{"OrderPlaced"},
|
||||
ActorPattern: "order-*",
|
||||
}
|
||||
ch := eb.SubscribeWithFilter("orders", filter)
|
||||
|
||||
// Publish events with various combinations
|
||||
events := []*Event{
|
||||
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-123"}, // Should be received
|
||||
{ID: "evt-2", EventType: "OrderPlaced", ActorID: "user-456"}, // Wrong actor
|
||||
{ID: "evt-3", EventType: "OrderCancelled", ActorID: "order-789"}, // Wrong type
|
||||
{ID: "evt-4", EventType: "OrderCancelled", ActorID: "user-000"}, // Wrong both
|
||||
}
|
||||
|
||||
for _, e := range events {
|
||||
eb.Publish("orders", e)
|
||||
}
|
||||
|
||||
// Should only receive evt-1
|
||||
select {
|
||||
case evt := <-ch:
|
||||
if evt.ID != "evt-1" {
|
||||
t.Errorf("expected evt-1, got %s", evt.ID)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("timed out waiting for event")
|
||||
}
|
||||
|
||||
// Verify no more events arrive
|
||||
select {
|
||||
case evt := <-ch:
|
||||
t.Errorf("unexpected event received: %s", evt.ID)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscribeWithFilter_NilFilter(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// Subscribe with nil filter - should receive all events
|
||||
ch := eb.SubscribeWithFilter("events", nil)
|
||||
|
||||
events := []*Event{
|
||||
{ID: "evt-1", EventType: "TypeA", ActorID: "actor-1"},
|
||||
{ID: "evt-2", EventType: "TypeB", ActorID: "actor-2"},
|
||||
}
|
||||
|
||||
for _, e := range events {
|
||||
eb.Publish("events", e)
|
||||
}
|
||||
|
||||
received := make(map[string]bool)
|
||||
timeout := time.After(100 * time.Millisecond)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case evt := <-ch:
|
||||
received[evt.ID] = true
|
||||
case <-timeout:
|
||||
t.Fatalf("timed out after receiving %d events", len(received))
|
||||
}
|
||||
}
|
||||
|
||||
if !received["evt-1"] || !received["evt-2"] {
|
||||
t.Errorf("expected all events, got %v", received)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscribeWithFilter_EmptyFilter(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// Subscribe with empty filter - should receive all events
|
||||
ch := eb.SubscribeWithFilter("events", &SubscriptionFilter{})
|
||||
|
||||
events := []*Event{
|
||||
{ID: "evt-1", EventType: "TypeA", ActorID: "actor-1"},
|
||||
{ID: "evt-2", EventType: "TypeB", ActorID: "actor-2"},
|
||||
}
|
||||
|
||||
for _, e := range events {
|
||||
eb.Publish("events", e)
|
||||
}
|
||||
|
||||
received := make(map[string]bool)
|
||||
timeout := time.After(100 * time.Millisecond)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case evt := <-ch:
|
||||
received[evt.ID] = true
|
||||
case <-timeout:
|
||||
t.Fatalf("timed out after receiving %d events", len(received))
|
||||
}
|
||||
}
|
||||
|
||||
if !received["evt-1"] || !received["evt-2"] {
|
||||
t.Errorf("expected all events, got %v", received)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscribeWithFilter_WildcardNamespaceAndFilter(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// Subscribe to wildcard namespace pattern with event type filter
|
||||
filter := &SubscriptionFilter{
|
||||
EventTypes: []string{"OrderPlaced"},
|
||||
}
|
||||
ch := eb.SubscribeWithFilter("prod.*", filter)
|
||||
|
||||
// Publish events to different namespaces
|
||||
events := []*Event{
|
||||
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"}, // prod.orders - should match
|
||||
{ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"}, // prod.orders - wrong type
|
||||
{ID: "evt-3", EventType: "OrderPlaced", ActorID: "order-3"}, // staging.orders - wrong namespace
|
||||
}
|
||||
|
||||
eb.Publish("prod.orders", events[0])
|
||||
eb.Publish("prod.orders", events[1])
|
||||
eb.Publish("staging.orders", events[2])
|
||||
|
||||
// Should only receive evt-1
|
||||
select {
|
||||
case evt := <-ch:
|
||||
if evt.ID != "evt-1" {
|
||||
t.Errorf("expected evt-1, got %s", evt.ID)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("timed out waiting for event")
|
||||
}
|
||||
|
||||
// Verify no more events arrive
|
||||
select {
|
||||
case evt := <-ch:
|
||||
t.Errorf("unexpected event received: %s", evt.ID)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscribeWithFilter_MultipleSubscribersWithDifferentFilters(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// Two subscribers with different filters on same namespace
|
||||
filter1 := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}
|
||||
filter2 := &SubscriptionFilter{EventTypes: []string{"OrderShipped"}}
|
||||
|
||||
ch1 := eb.SubscribeWithFilter("orders", filter1)
|
||||
ch2 := eb.SubscribeWithFilter("orders", filter2)
|
||||
|
||||
events := []*Event{
|
||||
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"},
|
||||
{ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"},
|
||||
}
|
||||
|
||||
for _, e := range events {
|
||||
eb.Publish("orders", e)
|
||||
}
|
||||
|
||||
// ch1 should only receive evt-1
|
||||
select {
|
||||
case evt := <-ch1:
|
||||
if evt.ID != "evt-1" {
|
||||
t.Errorf("ch1: expected evt-1, got %s", evt.ID)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("ch1 timed out")
|
||||
}
|
||||
|
||||
// ch2 should only receive evt-2
|
||||
select {
|
||||
case evt := <-ch2:
|
||||
if evt.ID != "evt-2" {
|
||||
t.Errorf("ch2: expected evt-2, got %s", evt.ID)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("ch2 timed out")
|
||||
}
|
||||
|
||||
// Verify no extra events
|
||||
select {
|
||||
case evt := <-ch1:
|
||||
t.Errorf("ch1: unexpected event %s", evt.ID)
|
||||
case evt := <-ch2:
|
||||
t.Errorf("ch2: unexpected event %s", evt.ID)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscribeWithFilter_UnsubscribeFiltered(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
filter := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}
|
||||
ch := eb.SubscribeWithFilter("orders", filter)
|
||||
|
||||
// Verify subscription count
|
||||
if eb.SubscriberCount("orders") != 1 {
|
||||
t.Errorf("expected 1 subscriber, got %d", eb.SubscriberCount("orders"))
|
||||
}
|
||||
|
||||
eb.Unsubscribe("orders", ch)
|
||||
|
||||
// Verify unsubscribed
|
||||
if eb.SubscriberCount("orders") != 0 {
|
||||
t.Errorf("expected 0 subscribers, got %d", eb.SubscriberCount("orders"))
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscribeWithFilter_FilteredAndUnfilteredCoexist(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// One subscriber with filter, one without
|
||||
filter := &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}
|
||||
chFiltered := eb.SubscribeWithFilter("orders", filter)
|
||||
chUnfiltered := eb.Subscribe("orders")
|
||||
|
||||
events := []*Event{
|
||||
{ID: "evt-1", EventType: "OrderPlaced", ActorID: "order-1"},
|
||||
{ID: "evt-2", EventType: "OrderShipped", ActorID: "order-2"},
|
||||
}
|
||||
|
||||
for _, e := range events {
|
||||
eb.Publish("orders", e)
|
||||
}
|
||||
|
||||
// Filtered subscriber should only receive evt-1
|
||||
select {
|
||||
case evt := <-chFiltered:
|
||||
if evt.ID != "evt-1" {
|
||||
t.Errorf("filtered: expected evt-1, got %s", evt.ID)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("filtered subscriber timed out")
|
||||
}
|
||||
|
||||
// Unfiltered subscriber should receive both
|
||||
received := make(map[string]bool)
|
||||
timeout := time.After(100 * time.Millisecond)
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case evt := <-chUnfiltered:
|
||||
received[evt.ID] = true
|
||||
case <-timeout:
|
||||
t.Fatalf("unfiltered timed out after %d events", len(received))
|
||||
}
|
||||
}
|
||||
|
||||
if !received["evt-1"] || !received["evt-2"] {
|
||||
t.Errorf("unfiltered expected both events, got %v", received)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscribeWithFilter_WildcardGreaterWithFilter(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// Use > wildcard (matches one or more tokens) with filter
|
||||
filter := &SubscriptionFilter{
|
||||
ActorPattern: "order-*",
|
||||
}
|
||||
ch := eb.SubscribeWithFilter(">", filter)
|
||||
|
||||
events := []*Event{
|
||||
{ID: "evt-1", EventType: "Test", ActorID: "order-123"},
|
||||
{ID: "evt-2", EventType: "Test", ActorID: "user-456"},
|
||||
{ID: "evt-3", EventType: "Test", ActorID: "order-789"},
|
||||
}
|
||||
|
||||
// Publish to different namespaces
|
||||
eb.Publish("tenant-a", events[0])
|
||||
eb.Publish("tenant-b", events[1])
|
||||
eb.Publish("prod.orders", events[2])
|
||||
|
||||
// Should receive evt-1 and evt-3, but not evt-2
|
||||
received := make(map[string]bool)
|
||||
timeout := time.After(100 * time.Millisecond)
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case evt := <-ch:
|
||||
received[evt.ID] = true
|
||||
case <-timeout:
|
||||
t.Fatalf("timed out after %d events", len(received))
|
||||
}
|
||||
}
|
||||
|
||||
if !received["evt-1"] || !received["evt-3"] {
|
||||
t.Errorf("expected evt-1 and evt-3, got %v", received)
|
||||
}
|
||||
|
||||
// Verify no evt-2
|
||||
select {
|
||||
case evt := <-ch:
|
||||
t.Errorf("unexpected event: %s", evt.ID)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,11 +61,25 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
|
||||
// Security Warning: Wildcard patterns receive events from all matching namespaces,
|
||||
// bypassing namespace isolation. Only use for trusted system components.
|
||||
func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event {
|
||||
return neb.SubscribeWithFilter(namespacePattern, nil)
|
||||
}
|
||||
|
||||
// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern.
|
||||
// Events are filtered by the provided SubscriptionFilter before delivery.
|
||||
// If filter is nil or empty, all events matching the namespace pattern are delivered.
|
||||
//
|
||||
// For NATSEventBus:
|
||||
// - Namespace pattern filtering is applied at the NATS level using native wildcards
|
||||
// - EventTypes and ActorPattern filters are applied client-side after receiving messages
|
||||
//
|
||||
// This allows efficient server-side filtering for namespaces while providing
|
||||
// flexible client-side filtering for event types and actors.
|
||||
func (neb *NATSEventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event {
|
||||
neb.mutex.Lock()
|
||||
defer neb.mutex.Unlock()
|
||||
|
||||
// Create local subscription first
|
||||
ch := neb.EventBus.Subscribe(namespacePattern)
|
||||
// Create local subscription first (with filter)
|
||||
ch := neb.EventBus.SubscribeWithFilter(namespacePattern, filter)
|
||||
|
||||
// Check if this is the first subscriber for this pattern
|
||||
count := neb.patternSubscribers[namespacePattern]
|
||||
@@ -141,12 +155,21 @@ func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg, subscribedPattern string
|
||||
}
|
||||
|
||||
// deliverToWildcardSubscribers delivers an event to subscribers of a specific wildcard pattern
|
||||
// Applies filters before delivery.
|
||||
func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Event) {
|
||||
neb.EventBus.mutex.RLock()
|
||||
defer neb.EventBus.mutex.RUnlock()
|
||||
|
||||
for _, sub := range neb.EventBus.wildcardSubscribers {
|
||||
if sub.pattern == pattern {
|
||||
// Apply filter if present
|
||||
if sub.filter != nil && !sub.filter.IsEmpty() {
|
||||
if !sub.filter.Matches(event) {
|
||||
// Event doesn't match filter, skip delivery
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case sub.ch <- event:
|
||||
// Event delivered from NATS
|
||||
|
||||
114
pattern.go
114
pattern.go
@@ -81,3 +81,117 @@ func matchTokens(patternTokens, namespaceTokens []string) bool {
|
||||
func IsWildcardPattern(pattern string) bool {
|
||||
return strings.Contains(pattern, "*") || strings.Contains(pattern, ">")
|
||||
}
|
||||
|
||||
// SubscriptionFilter defines optional filters for event subscriptions.
|
||||
// All configured filters are combined with AND logic - an event must match
|
||||
// all specified criteria to be delivered to the subscriber.
|
||||
//
|
||||
// Filter Processing:
|
||||
// - EventTypes: Event must have an EventType matching at least one in the list (OR within types)
|
||||
// - ActorPattern: Event's ActorID must match the pattern (supports * and > wildcards)
|
||||
//
|
||||
// Filtering is applied client-side in the EventBus. For NATSEventBus, namespace-level
|
||||
// filtering uses NATS subject patterns, while EventTypes and ActorPattern filtering
|
||||
// happens after message receipt.
|
||||
type SubscriptionFilter struct {
|
||||
// EventTypes filters events by type. Empty slice means all event types.
|
||||
// If specified, only events with an EventType in this list are delivered.
|
||||
// Example: []string{"OrderPlaced", "OrderShipped"} receives only those event types.
|
||||
EventTypes []string
|
||||
|
||||
// ActorPattern filters events by actor ID pattern. Empty string means all actors.
|
||||
// Supports NATS-style wildcards:
|
||||
// - "*" matches a single token (e.g., "order-*" matches "order-123", "order-456")
|
||||
// - ">" matches one or more tokens (e.g., "order.>" matches "order.us.123", "order.eu.456")
|
||||
// Example: "order-*" receives events only for actors starting with "order-"
|
||||
ActorPattern string
|
||||
}
|
||||
|
||||
// IsEmpty returns true if no filters are configured.
|
||||
func (f *SubscriptionFilter) IsEmpty() bool {
|
||||
return len(f.EventTypes) == 0 && f.ActorPattern == ""
|
||||
}
|
||||
|
||||
// Matches returns true if the event matches all configured filters.
|
||||
// An empty filter matches all events.
|
||||
func (f *SubscriptionFilter) Matches(event *Event) bool {
|
||||
if event == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check event type filter
|
||||
if len(f.EventTypes) > 0 {
|
||||
typeMatch := false
|
||||
for _, et := range f.EventTypes {
|
||||
if event.EventType == et {
|
||||
typeMatch = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !typeMatch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Check actor pattern filter
|
||||
if f.ActorPattern != "" {
|
||||
if !MatchActorPattern(f.ActorPattern, event.ActorID) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// MatchActorPattern checks if an actor ID matches a pattern.
|
||||
// Uses the same matching logic as MatchNamespacePattern for consistency.
|
||||
//
|
||||
// Patterns:
|
||||
// - "*" matches a single token (e.g., "order-*" matches "order-123")
|
||||
// - ">" matches one or more tokens (e.g., "order.>" matches "order.us.east")
|
||||
// - Exact strings match exactly (e.g., "order-123" matches only "order-123")
|
||||
//
|
||||
// Note: For simple prefix matching without dots (e.g., "order-*" matching "order-123"),
|
||||
// this uses simplified matching where "*" matches any remaining characters in a token.
|
||||
func MatchActorPattern(pattern, actorID string) bool {
|
||||
// Empty pattern matches nothing
|
||||
if pattern == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
// Empty actor ID matches nothing except ">"
|
||||
if actorID == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
// If pattern contains dots, use token-based matching (same as namespace)
|
||||
if strings.Contains(pattern, ".") || strings.Contains(actorID, ".") {
|
||||
return MatchNamespacePattern(pattern, actorID)
|
||||
}
|
||||
|
||||
// Simple matching for non-tokenized patterns
|
||||
// ">" matches any non-empty actor ID
|
||||
if pattern == ">" {
|
||||
return true
|
||||
}
|
||||
|
||||
// "*" matches any single-token actor ID (no dots)
|
||||
if pattern == "*" {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check for suffix wildcard (e.g., "order-*")
|
||||
if strings.HasSuffix(pattern, "*") {
|
||||
prefix := strings.TrimSuffix(pattern, "*")
|
||||
return strings.HasPrefix(actorID, prefix)
|
||||
}
|
||||
|
||||
// Check for suffix multi-match (e.g., "order->")
|
||||
if strings.HasSuffix(pattern, ">") {
|
||||
prefix := strings.TrimSuffix(pattern, ">")
|
||||
return strings.HasPrefix(actorID, prefix)
|
||||
}
|
||||
|
||||
// Exact match
|
||||
return pattern == actorID
|
||||
}
|
||||
|
||||
125
pattern_test.go
125
pattern_test.go
@@ -115,3 +115,128 @@ func BenchmarkMatchNamespacePattern(b *testing.B) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMatchActorPattern(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
pattern string
|
||||
actorID string
|
||||
expected bool
|
||||
}{
|
||||
// Empty cases
|
||||
{"empty pattern", "", "actor-123", false},
|
||||
{"empty actorID", "actor-*", "", false},
|
||||
{"both empty", "", "", false},
|
||||
|
||||
// Exact matches (no dots)
|
||||
{"exact match", "actor-123", "actor-123", true},
|
||||
{"exact mismatch", "actor-123", "actor-456", false},
|
||||
|
||||
// Suffix wildcard with * (simple, no dots)
|
||||
{"prefix with star", "order-*", "order-123", true},
|
||||
{"prefix with star 2", "order-*", "order-456-xyz", true},
|
||||
{"prefix with star mismatch", "order-*", "user-123", false},
|
||||
{"star alone", "*", "anything", true},
|
||||
|
||||
// Suffix wildcard with > (simple, no dots)
|
||||
{"prefix with greater", "order->", "order-123", true},
|
||||
{"greater alone", ">", "anything", true},
|
||||
|
||||
// Dot-separated actor IDs (uses MatchNamespacePattern)
|
||||
{"dotted exact match", "order.us.123", "order.us.123", true},
|
||||
{"dotted exact mismatch", "order.us.123", "order.eu.123", false},
|
||||
{"dotted star", "order.*", "order.123", true},
|
||||
{"dotted star deep", "order.*.*", "order.us.123", true},
|
||||
{"dotted greater", "order.>", "order.us.123.456", true},
|
||||
{"dotted star mismatch depth", "order.*", "order.us.123", false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := MatchActorPattern(tt.pattern, tt.actorID)
|
||||
if result != tt.expected {
|
||||
t.Errorf("MatchActorPattern(%q, %q) = %v, want %v",
|
||||
tt.pattern, tt.actorID, result, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscriptionFilter_IsEmpty(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
filter *SubscriptionFilter
|
||||
expected bool
|
||||
}{
|
||||
{"nil fields", &SubscriptionFilter{}, true},
|
||||
{"empty slice", &SubscriptionFilter{EventTypes: []string{}}, true},
|
||||
{"has event types", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}}, false},
|
||||
{"has actor pattern", &SubscriptionFilter{ActorPattern: "order-*"}, false},
|
||||
{"has both", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"}, false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := tt.filter.IsEmpty()
|
||||
if result != tt.expected {
|
||||
t.Errorf("SubscriptionFilter.IsEmpty() = %v, want %v", result, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscriptionFilter_Matches(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
filter *SubscriptionFilter
|
||||
event *Event
|
||||
expected bool
|
||||
}{
|
||||
// Nil event
|
||||
{"nil event", &SubscriptionFilter{}, nil, false},
|
||||
|
||||
// Empty filter matches all
|
||||
{"empty filter", &SubscriptionFilter{}, &Event{EventType: "Test", ActorID: "actor-1"}, true},
|
||||
|
||||
// Event type filtering
|
||||
{"event type match", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}},
|
||||
&Event{EventType: "OrderPlaced", ActorID: "order-1"}, true},
|
||||
{"event type mismatch", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}},
|
||||
&Event{EventType: "OrderShipped", ActorID: "order-1"}, false},
|
||||
{"event type multiple match first", &SubscriptionFilter{EventTypes: []string{"OrderPlaced", "OrderShipped"}},
|
||||
&Event{EventType: "OrderPlaced", ActorID: "order-1"}, true},
|
||||
{"event type multiple match second", &SubscriptionFilter{EventTypes: []string{"OrderPlaced", "OrderShipped"}},
|
||||
&Event{EventType: "OrderShipped", ActorID: "order-1"}, true},
|
||||
{"event type multiple no match", &SubscriptionFilter{EventTypes: []string{"OrderPlaced", "OrderShipped"}},
|
||||
&Event{EventType: "OrderCancelled", ActorID: "order-1"}, false},
|
||||
|
||||
// Actor pattern filtering
|
||||
{"actor pattern exact match", &SubscriptionFilter{ActorPattern: "order-123"},
|
||||
&Event{EventType: "Test", ActorID: "order-123"}, true},
|
||||
{"actor pattern exact mismatch", &SubscriptionFilter{ActorPattern: "order-123"},
|
||||
&Event{EventType: "Test", ActorID: "order-456"}, false},
|
||||
{"actor pattern wildcard match", &SubscriptionFilter{ActorPattern: "order-*"},
|
||||
&Event{EventType: "Test", ActorID: "order-123"}, true},
|
||||
{"actor pattern wildcard mismatch", &SubscriptionFilter{ActorPattern: "order-*"},
|
||||
&Event{EventType: "Test", ActorID: "user-123"}, false},
|
||||
|
||||
// Combined filters (AND logic)
|
||||
{"combined both match", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"},
|
||||
&Event{EventType: "OrderPlaced", ActorID: "order-123"}, true},
|
||||
{"combined event matches actor does not", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"},
|
||||
&Event{EventType: "OrderPlaced", ActorID: "user-123"}, false},
|
||||
{"combined actor matches event does not", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"},
|
||||
&Event{EventType: "OrderShipped", ActorID: "order-123"}, false},
|
||||
{"combined neither matches", &SubscriptionFilter{EventTypes: []string{"OrderPlaced"}, ActorPattern: "order-*"},
|
||||
&Event{EventType: "OrderShipped", ActorID: "user-123"}, false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := tt.filter.Matches(tt.event)
|
||||
if result != tt.expected {
|
||||
t.Errorf("SubscriptionFilter.Matches() = %v, want %v", result, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user