Add wildcard namespace subscriptions
Support NATS-style wildcard patterns ("*" and ">") for subscribing
to events across multiple namespaces. This enables cross-cutting
concerns like logging, monitoring, and auditing without requiring
separate subscriptions for each namespace.
- Add pattern.go with MatchNamespacePattern and IsWildcardPattern
- Update EventBus to track wildcard subscribers separately
- Update NATSEventBus to use NATS native wildcard support
- Add comprehensive tests for pattern matching and EventBus wildcards
- Document security implications in all relevant code comments
Closes #20
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit was merged in pull request #52.
This commit is contained in:
100
nats_eventbus.go
100
nats_eventbus.go
@@ -11,12 +11,18 @@ import (
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS
|
||||
// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS.
|
||||
// Supports wildcard patterns for cross-namespace subscriptions using NATS native wildcards.
|
||||
//
|
||||
// Security Considerations:
|
||||
// Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces.
|
||||
// This bypasses namespace isolation at the NATS level. Ensure proper access controls
|
||||
// are in place at the application layer before granting wildcard subscription access.
|
||||
type NATSEventBus struct {
|
||||
*EventBus // Embed base EventBus for local subscriptions
|
||||
nc *nats.Conn // NATS connection
|
||||
*EventBus // Embed base EventBus for local subscriptions
|
||||
nc *nats.Conn // NATS connection
|
||||
subscriptions []*nats.Subscription
|
||||
namespaceSubscribers map[string]int // Track number of subscribers per namespace
|
||||
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
|
||||
nodeID string // Unique ID for this node
|
||||
mutex sync.Mutex
|
||||
ctx context.Context
|
||||
@@ -35,69 +41,80 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
neb := &NATSEventBus{
|
||||
EventBus: NewEventBus(),
|
||||
nc: nc,
|
||||
nodeID: uuid.New().String(),
|
||||
subscriptions: make([]*nats.Subscription, 0),
|
||||
namespaceSubscribers: make(map[string]int),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
EventBus: NewEventBus(),
|
||||
nc: nc,
|
||||
nodeID: uuid.New().String(),
|
||||
subscriptions: make([]*nats.Subscription, 0),
|
||||
patternSubscribers: make(map[string]int),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
return neb, nil
|
||||
}
|
||||
|
||||
// Subscribe creates a local subscription and ensures NATS subscription exists for the namespace
|
||||
func (neb *NATSEventBus) Subscribe(namespaceID string) <-chan *Event {
|
||||
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
|
||||
// Supports NATS subject patterns:
|
||||
// - "*" matches a single token
|
||||
// - ">" matches one or more tokens (only at the end)
|
||||
//
|
||||
// Security Warning: Wildcard patterns receive events from all matching namespaces,
|
||||
// bypassing namespace isolation. Only use for trusted system components.
|
||||
func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event {
|
||||
neb.mutex.Lock()
|
||||
defer neb.mutex.Unlock()
|
||||
|
||||
// Create local subscription first
|
||||
ch := neb.EventBus.Subscribe(namespaceID)
|
||||
ch := neb.EventBus.Subscribe(namespacePattern)
|
||||
|
||||
// Check if this is the first subscriber for this namespace
|
||||
count := neb.namespaceSubscribers[namespaceID]
|
||||
// Check if this is the first subscriber for this pattern
|
||||
count := neb.patternSubscribers[namespacePattern]
|
||||
if count == 0 {
|
||||
// First subscriber - create NATS subscription
|
||||
subject := fmt.Sprintf("aether.events.%s", namespaceID)
|
||||
// NATS natively supports wildcards, so we can use the pattern directly
|
||||
subject := fmt.Sprintf("aether.events.%s", namespacePattern)
|
||||
|
||||
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
|
||||
neb.handleNATSEvent(msg)
|
||||
neb.handleNATSEvent(msg, namespacePattern)
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err)
|
||||
} else {
|
||||
neb.subscriptions = append(neb.subscriptions, sub)
|
||||
log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject)
|
||||
if IsWildcardPattern(namespacePattern) {
|
||||
log.Printf("[NATSEventBus] Node %s subscribed to wildcard pattern %s", neb.nodeID, subject)
|
||||
} else {
|
||||
log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
neb.namespaceSubscribers[namespaceID] = count + 1
|
||||
neb.patternSubscribers[namespacePattern] = count + 1
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// Unsubscribe removes a local subscription and cleans up NATS subscription if no more subscribers
|
||||
func (neb *NATSEventBus) Unsubscribe(namespaceID string, ch <-chan *Event) {
|
||||
func (neb *NATSEventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
|
||||
neb.mutex.Lock()
|
||||
defer neb.mutex.Unlock()
|
||||
|
||||
neb.EventBus.Unsubscribe(namespaceID, ch)
|
||||
neb.EventBus.Unsubscribe(namespacePattern, ch)
|
||||
|
||||
count := neb.namespaceSubscribers[namespaceID]
|
||||
count := neb.patternSubscribers[namespacePattern]
|
||||
if count > 0 {
|
||||
count--
|
||||
neb.namespaceSubscribers[namespaceID] = count
|
||||
neb.patternSubscribers[namespacePattern] = count
|
||||
|
||||
if count == 0 {
|
||||
delete(neb.namespaceSubscribers, namespaceID)
|
||||
log.Printf("[NATSEventBus] No more subscribers for namespace %s on node %s", namespaceID, neb.nodeID)
|
||||
delete(neb.patternSubscribers, namespacePattern)
|
||||
log.Printf("[NATSEventBus] No more subscribers for pattern %s on node %s", namespacePattern, neb.nodeID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleNATSEvent processes events received from NATS
|
||||
func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) {
|
||||
func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg, subscribedPattern string) {
|
||||
var eventMsg eventMessage
|
||||
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
|
||||
log.Printf("[NATSEventBus] Failed to unmarshal event: %v", err)
|
||||
@@ -109,8 +126,33 @@ func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) {
|
||||
return
|
||||
}
|
||||
|
||||
// Forward to local EventBus subscribers
|
||||
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
|
||||
// For wildcard subscriptions, we need to deliver to the EventBus using
|
||||
// the subscribed pattern so it reaches the correct wildcard subscriber.
|
||||
// For exact subscriptions, use the actual namespace.
|
||||
if IsWildcardPattern(subscribedPattern) {
|
||||
// Deliver using the pattern - the EventBus will route to wildcard subscribers
|
||||
neb.deliverToWildcardSubscribers(subscribedPattern, eventMsg.Event)
|
||||
} else {
|
||||
// Forward to local EventBus subscribers with actual namespace
|
||||
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
|
||||
}
|
||||
}
|
||||
|
||||
// deliverToWildcardSubscribers delivers an event to subscribers of a specific wildcard pattern
|
||||
func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Event) {
|
||||
neb.EventBus.mutex.RLock()
|
||||
defer neb.EventBus.mutex.RUnlock()
|
||||
|
||||
for _, sub := range neb.EventBus.wildcardSubscribers {
|
||||
if sub.pattern == pattern {
|
||||
select {
|
||||
case sub.ch <- event:
|
||||
// Event delivered
|
||||
default:
|
||||
// Channel full, skip this subscriber (non-blocking)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Publish publishes an event both locally and to NATS for cross-node broadcasting
|
||||
|
||||
Reference in New Issue
Block a user