Files
aether/nats_eventbus.go
Hugo Nijhuis ef73fb6bfd
All checks were successful
CI / build (pull_request) Successful in 19s
CI / build (push) Successful in 39s
Add namespace event filtering (SubscribeWithFilter)
Adds support for filtering events by type or actor pattern within namespace
subscriptions. Key changes:

- Add SubscriptionFilter type with EventTypes and ActorPattern fields
- Add SubscribeWithFilter to EventBroadcaster interface
- Implement filtering in EventBus with full wildcard pattern support preserved
- Implement filtering in NATSEventBus (server-side namespace, client-side filters)
- Add MatchActorPattern function for actor ID pattern matching
- Add comprehensive unit tests for all filtering scenarios

Filter Processing:
- EventTypes: Event must match at least one type in the list (OR within types)
- ActorPattern: Event's ActorID must match the pattern (supports * and > wildcards)
- Multiple filters are combined with AND logic

This implementation works alongside the existing wildcard subscription support:
- Namespace wildcards (* and >) work with event filters
- Filters are applied after namespace pattern matching
- Metrics are properly recorded for filtered subscriptions

Closes #21

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 23:45:57 +01:00

231 lines
7.6 KiB
Go

package aether
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS.
// It embeds the in-process EventBus for local delivery and additionally publishes every
// event to the NATS subject "aether.events.<namespace>" so that other nodes receive it.
// Supports wildcard patterns for cross-namespace subscriptions using NATS native wildcards.
//
// Security Considerations:
// Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces.
// This bypasses namespace isolation at the NATS level. Ensure proper access controls
// are in place at the application layer before granting wildcard subscription access.
type NATSEventBus struct {
*EventBus // Embed base EventBus for local subscriptions
nc *nats.Conn // NATS connection
subscriptions []*nats.Subscription // NATS subscriptions created by this bus; all are unsubscribed in Stop
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
nodeID string // Unique ID for this node; stamped on outgoing messages so the node can skip its own events
mutex sync.Mutex // Held by SubscribeWithFilter, Unsubscribe and Stop while they touch subscriptions and patternSubscribers
ctx context.Context // Created in NewNATSEventBus; cancelled by Stop
cancel context.CancelFunc // Cancels ctx
}
// eventMessage is the wire format for events sent over NATS.
// Publish JSON-encodes it and handleNATSEvent decodes it on the receiving node.
type eventMessage struct {
NodeID string `json:"node_id"` // ID of the publishing node; receivers drop messages carrying their own ID
NamespaceID string `json:"namespace_id"` // Namespace the event was published to
Event *Event `json:"event"` // The event being broadcast
}
// NewNATSEventBus builds a NATS-backed event bus on top of the given connection.
//
// The bus starts with a fresh in-process EventBus, a randomly generated node ID,
// no NATS subscriptions, and a cancellable context that Stop cancels.
// The returned error is currently always nil.
func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
	neb := &NATSEventBus{
		EventBus:           NewEventBus(),
		nc:                 nc,
		nodeID:             uuid.New().String(),
		subscriptions:      []*nats.Subscription{},
		patternSubscribers: map[string]int{},
	}
	neb.ctx, neb.cancel = context.WithCancel(context.Background())
	return neb, nil
}
// Subscribe returns a channel of events for the given namespace pattern with no
// event-type or actor filtering. It is shorthand for SubscribeWithFilter with a
// nil filter.
//
// The pattern may use NATS subject wildcards:
//   - "*" matches a single token
//   - ">" matches one or more tokens (only at the end)
//
// Security Warning: Wildcard patterns receive events from all matching namespaces,
// bypassing namespace isolation. Only use for trusted system components.
func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event {
	var unfiltered *SubscriptionFilter // nil filter: nothing is filtered out
	return neb.SubscribeWithFilter(namespacePattern, unfiltered)
}
// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern.
// Events are filtered by the provided SubscriptionFilter before delivery.
// If filter is nil or empty, all events matching the namespace pattern are delivered.
//
// For NATSEventBus:
//   - Namespace pattern filtering is applied at the NATS level using native wildcards
//   - EventTypes and ActorPattern filters are applied client-side after receiving messages
//
// A single NATS subscription on "aether.events.<namespacePattern>" is shared by all
// local subscribers of the same pattern. It is created whenever no live NATS
// subscription for that subject exists, which means:
//   - a failed NATS subscribe is retried by the next subscriber of the pattern
//     instead of being skipped forever, and
//   - a still-active NATS subscription is reused rather than duplicated, so remote
//     events are not delivered twice.
//
// If the NATS subscribe fails, the error is logged and recorded in metrics; the
// returned channel is still valid and receives locally published events.
func (neb *NATSEventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event {
	neb.mutex.Lock()
	defer neb.mutex.Unlock()

	// Create local subscription first (with filter).
	ch := neb.EventBus.SubscribeWithFilter(namespacePattern, filter)
	neb.patternSubscribers[namespacePattern]++

	// NATS natively supports wildcards, so the pattern is used in the subject directly.
	subject := fmt.Sprintf("aether.events.%s", namespacePattern)

	// Reuse an existing, still-active NATS subscription for this subject.
	// Deciding on the subscription itself (rather than on the subscriber count)
	// keeps the two from drifting apart after a failed subscribe.
	for _, existing := range neb.subscriptions {
		if existing != nil && existing.Subject == subject && existing.IsValid() {
			return ch
		}
	}

	sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
		neb.handleNATSEvent(msg, namespacePattern)
	})
	if err != nil {
		log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err)
		// Record subscription error
		neb.metrics.RecordSubscribeError(namespacePattern)
		return ch
	}

	neb.subscriptions = append(neb.subscriptions, sub)
	if IsWildcardPattern(namespacePattern) {
		log.Printf("[NATSEventBus] Node %s subscribed to wildcard pattern %s", neb.nodeID, subject)
	} else {
		log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject)
	}
	return ch
}
// Unsubscribe removes a local subscription and, when it was the last subscriber
// for the pattern, also tears down the NATS subscription for that pattern.
//
// The NATS subscription is located by its subject ("aether.events.<namespacePattern>"),
// unsubscribed, and removed from the bus's subscription list, so it neither keeps
// receiving messages nor lingers until Stop.
//
// NOTE(review): the subscriber count is decremented on every call, whether or not
// ch was actually registered for the pattern — callers should unsubscribe each
// channel exactly once.
func (neb *NATSEventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
	neb.mutex.Lock()
	defer neb.mutex.Unlock()

	neb.EventBus.Unsubscribe(namespacePattern, ch)

	count := neb.patternSubscribers[namespacePattern]
	if count <= 0 {
		return
	}
	count--
	if count > 0 {
		neb.patternSubscribers[namespacePattern] = count
		return
	}

	delete(neb.patternSubscribers, namespacePattern)
	log.Printf("[NATSEventBus] No more subscribers for pattern %s on node %s", namespacePattern, neb.nodeID)

	// Last subscriber gone: release the NATS subscription for this pattern.
	subject := fmt.Sprintf("aether.events.%s", namespacePattern)
	total := len(neb.subscriptions)
	kept := neb.subscriptions[:0]
	for _, sub := range neb.subscriptions {
		if sub == nil {
			continue
		}
		if sub.Subject != subject {
			kept = append(kept, sub)
			continue
		}
		if err := sub.Unsubscribe(); err != nil {
			log.Printf("[NATSEventBus] Error unsubscribing from NATS subject %s: %v", subject, err)
		}
	}
	// Clear the vacated tail so removed subscriptions can be garbage collected.
	for i := len(kept); i < total; i++ {
		neb.subscriptions[i] = nil
	}
	neb.subscriptions = kept
}
// handleNATSEvent processes a message received from NATS on behalf of the
// subscription that was created for subscribedPattern.
//
// Messages are dropped when they cannot be decoded, when they carry no event
// payload, or when they originated from this node (those were already delivered
// locally by Publish). Everything else is forwarded to local subscribers.
func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg, subscribedPattern string) {
	var eventMsg eventMessage
	if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
		log.Printf("[NATSEventBus] Failed to unmarshal event: %v", err)
		return
	}

	// A syntactically valid message with a missing or null "event" decodes to a
	// nil pointer. Never hand a nil event to filters or subscriber channels.
	if eventMsg.Event == nil {
		log.Printf("[NATSEventBus] Dropping message without event payload on subject %s", msg.Subject)
		return
	}

	// Skip events that originated from this node (already delivered locally)
	if eventMsg.NodeID == neb.nodeID {
		return
	}

	// For wildcard subscriptions, deliver only to the subscribers of exactly this
	// pattern. For exact subscriptions, republish locally under the actual namespace.
	//
	// NOTE(review): if EventBus.Publish also routes to matching wildcard
	// subscribers, a node holding both an exact and a wildcard subscription for
	// the same namespace may deliver one remote event twice — confirm.
	if IsWildcardPattern(subscribedPattern) {
		neb.deliverToWildcardSubscribers(subscribedPattern, eventMsg.Event)
		return
	}
	neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
}
// deliverToWildcardSubscribers hands an event to every local wildcard subscriber
// whose pattern is exactly the given pattern, honouring each subscriber's filter.
//
// Delivery never blocks: if a subscriber's channel cannot accept the event right
// now, the event is skipped for that subscriber and counted as dropped. The
// embedded EventBus's read lock is held for the duration of the loop.
func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Event) {
	bus := neb.EventBus
	bus.mutex.RLock()
	defer bus.mutex.RUnlock()

	for _, subscriber := range bus.wildcardSubscribers {
		if subscriber.pattern != pattern {
			continue
		}

		// A nil or empty filter lets everything through; otherwise the event must match.
		filtered := subscriber.filter != nil && !subscriber.filter.IsEmpty()
		if filtered && !subscriber.filter.Matches(event) {
			continue
		}

		select {
		case subscriber.ch <- event:
			// Event delivered from NATS
			neb.metrics.RecordReceive(pattern)
		default:
			// Channel full, skip this subscriber (non-blocking)
			neb.metrics.RecordDroppedEvent(pattern)
		}
	}
}
// Publish delivers an event to local subscribers and then broadcasts it over
// NATS on "aether.events.<namespaceID>" for the other nodes.
//
// The NATS message carries this node's ID so that the node can ignore its own
// broadcast when it comes back. Encoding or NATS publish failures are logged and
// recorded as publish errors; local delivery has already happened by then.
func (neb *NATSEventBus) Publish(namespaceID string, event *Event) {
	// Local delivery comes first.
	neb.EventBus.Publish(namespaceID, event)

	// Encode the wire message for the other nodes.
	payload, err := json.Marshal(eventMessage{
		NodeID:      neb.nodeID,
		NamespaceID: namespaceID,
		Event:       event,
	})
	if err != nil {
		log.Printf("[NATSEventBus] Failed to marshal event for NATS: %v", err)
		neb.metrics.RecordPublishError(namespaceID)
		return
	}

	subject := fmt.Sprintf("aether.events.%s", namespaceID)
	err = neb.nc.Publish(subject, payload)
	if err == nil {
		return
	}
	log.Printf("[NATSEventBus] Failed to publish event to NATS: %v", err)
	neb.metrics.RecordPublishError(namespaceID)
}
// Stop shuts the bus down: it cancels the bus context, unsubscribes every NATS
// subscription the bus created (logging, but otherwise ignoring, individual
// failures), forgets those subscriptions, and finally stops the embedded EventBus.
// The NATS connection itself is not closed here.
func (neb *NATSEventBus) Stop() {
	neb.mutex.Lock()
	defer neb.mutex.Unlock()

	neb.cancel()

	for i := range neb.subscriptions {
		err := neb.subscriptions[i].Unsubscribe()
		if err == nil {
			continue
		}
		log.Printf("[NATSEventBus] Error unsubscribing: %v", err)
	}
	neb.subscriptions = nil

	neb.EventBus.Stop()
	log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
}