Files
aether/eventbus.go
Hugo Nijhuis ef73fb6bfd
All checks were successful
CI / build (pull_request) Successful in 19s
CI / build (push) Successful in 39s
Add namespace event filtering (SubscribeWithFilter)
Adds support for filtering events by type or actor pattern within namespace
subscriptions. Key changes:

- Add SubscriptionFilter type with EventTypes and ActorPattern fields
- Add SubscribeWithFilter to EventBroadcaster interface
- Implement filtering in EventBus with full wildcard pattern support preserved
- Implement filtering in NATSEventBus (server-side namespace, client-side filters)
- Add MatchActorPattern function for actor ID pattern matching
- Add comprehensive unit tests for all filtering scenarios

Filter Processing:
- EventTypes: Event must match at least one type in the list (OR within types)
- ActorPattern: Event's ActorID must match the pattern (supports * and > wildcards)
- Multiple filters are combined with AND logic

This implementation works alongside the existing wildcard subscription support:
- Namespace wildcards (* and >) work with event filters
- Filters are applied after namespace pattern matching
- Metrics are properly recorded for filtered subscriptions

Closes #21

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 23:45:57 +01:00

268 lines
9.0 KiB
Go

package aether
import (
"context"
"sync"
)
// EventBroadcaster defines the interface for publishing and subscribing to events.
//
// Subscribe accepts namespace patterns following NATS subject matching conventions:
//   - Exact match: "tenant-a" matches only "tenant-a"
//   - Single wildcard: "*" matches any single token, "tenant-*" matches "tenant-a", "tenant-b"
//   - Multi-token wildcard: ">" matches one or more tokens (only at end of pattern)
//
// Security Warning: Wildcard subscriptions bypass namespace isolation.
// Only grant wildcard access to trusted system components.
type EventBroadcaster interface {
// Subscribe creates a channel that receives events matching the namespace pattern.
// Pattern syntax follows NATS conventions: "*" matches single token, ">" matches multiple.
Subscribe(namespacePattern string) <-chan *Event
// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern.
// Events are filtered by the provided SubscriptionFilter before delivery.
// Filters are applied with AND logic - events must match all specified criteria.
//
// Example: Subscribe to "orders" namespace, only receiving "OrderPlaced" events for "order-*" actors:
//
//	filter := &SubscriptionFilter{
//		EventTypes:   []string{"OrderPlaced"},
//		ActorPattern: "order-*",
//	}
//	ch := bus.SubscribeWithFilter("orders", filter)
SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event
// Unsubscribe removes the subscription that delivers to ch. namespacePattern
// should be the pattern the channel was subscribed with: the in-memory
// EventBus uses it to locate the subscription, and closes ch once removed.
Unsubscribe(namespacePattern string, ch <-chan *Event)
// Publish delivers event to the subscribers whose pattern matches namespaceID.
// The in-memory EventBus never blocks here: when a subscriber's channel
// buffer is full, the event is dropped for that subscriber.
Publish(namespaceID string, event *Event)
// Stop shuts the broadcaster down. The in-memory EventBus closes every
// subscriber channel and discards all subscriptions.
Stop()
// SubscriberCount reports the number of subscribers for namespaceID.
// The in-memory EventBus counts exact-match subscriptions only; wildcard
// subscriptions that would match namespaceID are not included.
SubscriberCount(namespaceID string) int
}
// MetricsProvider is an optional interface that EventBroadcaster implementations
// can implement to expose metrics. EventBus in this file implements it.
type MetricsProvider interface {
// Metrics returns the metrics collector for this broadcaster.
Metrics() BroadcasterMetrics
}
// subscription represents a single subscriber channel with its pattern.
//
// NOTE(review): nothing in this file references this type; EventBus stores
// filteredSubscription values instead. Confirm whether another file in the
// package still uses it before removing it.
type subscription struct {
// pattern is the namespace pattern associated with the channel.
pattern string
// ch is the channel on which events are delivered.
ch chan *Event
}
// filteredSubscription represents a subscriber with an optional filter.
// It is the record EventBus keeps for every Subscribe/SubscribeWithFilter call.
type filteredSubscription struct {
// pattern is the namespace pattern passed at subscribe time; for wildcard
// subscriptions it is matched against the namespace of each published event.
pattern string
// ch is the channel handed back to the subscriber; events are sent on it
// and it is closed on Unsubscribe or Stop.
ch chan *Event
// filter restricts which events are delivered. A nil or empty filter
// lets every event through.
filter *SubscriptionFilter
}
// EventBus broadcasts events to multiple subscribers within a namespace.
// Supports wildcard patterns for cross-namespace subscriptions.
//
// Security Considerations:
// Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces.
// This is intentional for cross-cutting concerns like logging, monitoring, and auditing.
// However, it bypasses namespace isolation - use with appropriate access controls.
//
// Create instances with NewEventBus, which initializes the subscriber map,
// the context and the metrics collector.
type EventBus struct {
// exactSubscribers holds subscribers for exact namespace matches (no wildcards),
// keyed by namespace.
exactSubscribers map[string][]*filteredSubscription
// wildcardSubscribers holds subscribers with wildcard patterns; each one is
// matched against the namespace on every Publish.
wildcardSubscribers []*filteredSubscription
// mutex guards exactSubscribers and wildcardSubscribers. Publish and the
// count methods take the read lock; subscribe, unsubscribe and Stop take
// the write lock.
mutex sync.RWMutex
// ctx and cancel are created together in NewEventBus; Stop calls cancel.
// ctx itself is not read anywhere in this file.
ctx context.Context
cancel context.CancelFunc
// metrics receives the subscribe, unsubscribe, publish, receive and
// dropped-event records emitted by the bus.
metrics *DefaultMetricsCollector
}
// NewEventBus returns an EventBus that is ready for use: it has empty
// subscriber collections, a cancellable background context and a fresh
// metrics collector.
func NewEventBus() *EventBus {
	bus := new(EventBus)
	bus.ctx, bus.cancel = context.WithCancel(context.Background())
	bus.exactSubscribers = map[string][]*filteredSubscription{}
	bus.wildcardSubscribers = []*filteredSubscription{}
	bus.metrics = NewMetricsCollector()
	return bus
}
// Metrics exposes the collector into which this bus records its subscribe,
// unsubscribe, publish, receive and dropped-event metrics.
func (eb *EventBus) Metrics() BroadcasterMetrics {
	var collector BroadcasterMetrics = eb.metrics
	return collector
}
// Subscribe registers an unfiltered subscriber for namespacePattern and
// returns the channel on which matching events arrive. It is shorthand for
// SubscribeWithFilter with a nil filter.
//
// Patterns follow NATS subject matching conventions:
//   - "*" matches a single token (any sequence without ".")
//   - ">" matches one or more tokens (only valid at the end)
//   - Exact strings match exactly
//
// Security Warning: Wildcard patterns receive events from all matching
// namespaces, bypassing namespace isolation. Only use them for trusted
// system components.
func (eb *EventBus) Subscribe(namespacePattern string) <-chan *Event {
	var noFilter *SubscriptionFilter
	return eb.SubscribeWithFilter(namespacePattern, noFilter)
}
// SubscribeWithFilter registers a subscriber for namespacePattern and returns
// the channel on which its events arrive.
//
// The namespace pattern selects which namespaces the subscriber listens to;
// the filter then selects which events from those namespaces are delivered:
//   - EventTypes: only events with a matching event type are delivered
//   - ActorPattern: only events from matching actors are delivered
//
// A nil or empty filter delivers every event published to a matching
// namespace. Wildcard patterns are tracked separately from exact namespaces.
//
// The returned channel has a buffer of 100 events. Publish never blocks on
// it: once the buffer is full, further events are dropped for this subscriber.
func (eb *EventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event {
	eb.mutex.Lock()
	defer eb.mutex.Unlock()

	// Buffered so that a slow consumer does not stall publishers.
	events := make(chan *Event, 100)
	entry := &filteredSubscription{pattern: namespacePattern, ch: events, filter: filter}

	switch {
	case IsWildcardPattern(namespacePattern):
		eb.wildcardSubscribers = append(eb.wildcardSubscribers, entry)
	default:
		current := eb.exactSubscribers[namespacePattern]
		eb.exactSubscribers[namespacePattern] = append(current, entry)
	}

	eb.metrics.RecordSubscribe(namespacePattern)
	return events
}
// Unsubscribe removes the subscription that delivers to ch and closes ch.
//
// namespacePattern must be the pattern the channel was subscribed with: it
// decides whether the wildcard list or the exact-match list for that
// namespace is searched. If no subscription with that channel is found there,
// the call is a no-op (the channel is not closed and no metric is recorded).
// When the last exact subscriber of a namespace is removed, the namespace
// entry itself is deleted from the map.
func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
	eb.mutex.Lock()
	defer eb.mutex.Unlock()

	wildcard := IsWildcardPattern(namespacePattern)

	var subs []*filteredSubscription
	if wildcard {
		subs = eb.wildcardSubscribers
	} else {
		subs = eb.exactSubscribers[namespacePattern]
	}

	for i, sub := range subs {
		if sub.ch != ch {
			continue
		}

		// Shift the tail down and nil the vacated slot. A plain
		// append(subs[:i], subs[i+1:]...) would leave the removed pointer in
		// the backing array, keeping the subscription (channel and filter)
		// reachable after it has been unsubscribed.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		subs = subs[:last]

		close(sub.ch)
		eb.metrics.RecordUnsubscribe(namespacePattern)
		break
	}

	if wildcard {
		eb.wildcardSubscribers = subs
		return
	}

	// Clean up empty namespace entries so the map does not accumulate keys
	// for namespaces that no longer have subscribers.
	if len(subs) == 0 {
		delete(eb.exactSubscribers, namespacePattern)
		return
	}
	eb.exactSubscribers[namespacePattern] = subs
}
// Publish fans event out to every subscriber interested in namespaceID:
//   - subscribers registered for exactly that namespace, and
//   - wildcard subscribers whose pattern matches it.
//
// Each candidate's filter is applied before delivery. The publish metric is
// recorded once per call, whether or not any subscriber receives the event.
func (eb *EventBus) Publish(namespaceID string, event *Event) {
	eb.mutex.RLock()
	defer eb.mutex.RUnlock()

	eb.metrics.RecordPublish(namespaceID)

	for _, exact := range eb.exactSubscribers[namespaceID] {
		eb.deliverToSubscriber(exact, event, namespaceID)
	}

	for _, wild := range eb.wildcardSubscribers {
		if !MatchNamespacePattern(wild.pattern, namespaceID) {
			continue
		}
		eb.deliverToSubscriber(wild, event, namespaceID)
	}
}
// deliverToSubscriber hands event to sub unless sub's filter rejects it.
//
// A nil or empty filter accepts everything. Delivery is a non-blocking send:
// on success a receive metric is recorded for namespaceID; if the
// subscriber's buffer is full, the event is dropped for that subscriber and a
// dropped-event metric is recorded instead.
func (eb *EventBus) deliverToSubscriber(sub *filteredSubscription, event *Event, namespaceID string) {
	if f := sub.filter; f != nil && !f.IsEmpty() && !f.Matches(event) {
		// Rejected by the filter: neither delivered nor counted as dropped.
		return
	}

	select {
	case sub.ch <- event:
		eb.metrics.RecordReceive(namespaceID)
	default:
		// Never block the publisher on a slow consumer.
		eb.metrics.RecordDroppedEvent(namespaceID)
	}
}
// Stop shuts the bus down: it cancels the bus context, closes every
// subscriber channel (recording an unsubscribe metric for each) and resets
// both subscriber collections to empty.
func (eb *EventBus) Stop() {
	eb.mutex.Lock()
	defer eb.mutex.Unlock()

	eb.cancel()

	// Exact subscribers are accounted against the namespace they listened on.
	for ns, group := range eb.exactSubscribers {
		for i := range group {
			close(group[i].ch)
			eb.metrics.RecordUnsubscribe(ns)
		}
	}

	// Wildcard subscribers are accounted against their own pattern.
	for i := range eb.wildcardSubscribers {
		w := eb.wildcardSubscribers[i]
		close(w.ch)
		eb.metrics.RecordUnsubscribe(w.pattern)
	}

	eb.exactSubscribers = map[string][]*filteredSubscription{}
	eb.wildcardSubscribers = []*filteredSubscription{}
}
// SubscriberCount reports how many subscribers are registered for exactly
// namespaceID. Wildcard subscribers whose pattern would match namespaceID are
// not counted; see WildcardSubscriberCount for those.
func (eb *EventBus) SubscriberCount(namespaceID string) int {
	eb.mutex.RLock()
	count := len(eb.exactSubscribers[namespaceID])
	eb.mutex.RUnlock()
	return count
}
// WildcardSubscriberCount reports how many subscribers are registered with a
// wildcard pattern ("*" or ">"). Such subscribers may receive events from
// multiple namespaces.
func (eb *EventBus) WildcardSubscriberCount() int {
	eb.mutex.RLock()
	count := len(eb.wildcardSubscribers)
	eb.mutex.RUnlock()
	return count
}