All checks were successful
CI / build (push) Successful in 22s
This PR implements cross-node event broadcasting for aether.

Changes:
- UpdateVersionCache method in JetStreamEventStore
- SubscribeToEventStored helper in NATSEventBus
- Integration tests for cross-node scenarios
- Example code demonstrating NATSEventBus + JetStreamEventStore

Tests: All integration tests passing.

Co-authored-by: Claude Code <noreply@anthropic.com>
Co-authored-by: Hugo Nijhuis <hugo.nijhuis@flowmade.one>
Reviewed-on: #151
372 lines
12 KiB
Go
372 lines
12 KiB
Go
package aether
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS.
|
|
// Supports wildcard patterns for cross-namespace subscriptions using NATS native wildcards.
|
|
//
|
|
// Security Considerations:
|
|
// Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces.
|
|
// This bypasses namespace isolation at the NATS level. Ensure proper access controls
|
|
// are in place at the application layer before granting wildcard subscription access.
|
|
type NATSEventBus struct {
|
|
*EventBus // Embed base EventBus for local subscriptions
|
|
nc *nats.Conn // NATS connection
|
|
subscriptions []*nats.Subscription
|
|
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
|
|
nodeID string // Unique ID for this node
|
|
streamPrefix string // NATS subject prefix for events
|
|
eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore)
|
|
mutex sync.Mutex
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// eventMessage is the wire format for events sent over NATS
|
|
type eventMessage struct {
|
|
NodeID string `json:"node_id"`
|
|
NamespaceID string `json:"namespace_id"`
|
|
Event *Event `json:"event"`
|
|
}
|
|
|
|
// NewNATSEventBus creates a new NATS-backed event bus
|
|
func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
neb := &NATSEventBus{
|
|
EventBus: NewEventBus(),
|
|
nc: nc,
|
|
nodeID: uuid.New().String(),
|
|
subscriptions: make([]*nats.Subscription, 0),
|
|
patternSubscribers: make(map[string]int),
|
|
streamPrefix: "aether",
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
return neb, nil
|
|
}
|
|
|
|
// NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration.
|
|
// The event store is used to automatically update version cache when EventStored events are received
|
|
// from other cluster nodes via NATS. This ensures cross-node version consistency.
|
|
//
|
|
// Example:
|
|
//
|
|
// eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc")
|
|
// ch := eventBus.SubscribeToEventStored("tenant-*")
|
|
// for event := range ch {
|
|
// actorID := event.Data["actorId"].(string)
|
|
// version := event.Data["version"].(int64)
|
|
// store.UpdateVersionCache(actorID, version)
|
|
// }
|
|
//
|
|
// The namespace parameter is used as a prefix for EventStored event filtering.
|
|
// If empty, EventStored events from all namespaces will be received (requires wildcard pattern).
|
|
func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus {
|
|
streamPrefix := "aether"
|
|
if namespace != "" {
|
|
streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace))
|
|
}
|
|
|
|
neb := &NATSEventBus{
|
|
EventBus: NewEventBus(),
|
|
nc: nc,
|
|
nodeID: uuid.New().String(),
|
|
subscriptions: make([]*nats.Subscription, 0),
|
|
patternSubscribers: make(map[string]int),
|
|
streamPrefix: streamPrefix,
|
|
eventStore: store,
|
|
ctx: context.Background(),
|
|
cancel: func() {},
|
|
}
|
|
|
|
return neb
|
|
}
|
|
|
|
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
|
|
// Supports NATS subject patterns:
|
|
// - "*" matches a single token
|
|
// - ">" matches one or more tokens (only at the end)
|
|
//
|
|
// Security Warning: Wildcard patterns receive events from all matching namespaces,
|
|
// bypassing namespace isolation. Only use for trusted system components.
|
|
func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event {
|
|
return neb.SubscribeWithFilter(namespacePattern, nil)
|
|
}
|
|
|
|
// SubscribeWithFilter creates a filtered subscription channel for a namespace pattern.
|
|
// Events are filtered by the provided SubscriptionFilter before delivery.
|
|
// If filter is nil or empty, all events matching the namespace pattern are delivered.
|
|
//
|
|
// For NATSEventBus:
|
|
// - Namespace pattern filtering is applied at the NATS level using native wildcards
|
|
// - EventTypes and ActorPattern filters are applied client-side after receiving messages
|
|
//
|
|
// This allows efficient server-side filtering for namespaces while providing
|
|
// flexible client-side filtering for event types and actors.
|
|
func (neb *NATSEventBus) SubscribeWithFilter(namespacePattern string, filter *SubscriptionFilter) <-chan *Event {
|
|
neb.mutex.Lock()
|
|
defer neb.mutex.Unlock()
|
|
|
|
// Create local subscription first (with filter)
|
|
ch := neb.EventBus.SubscribeWithFilter(namespacePattern, filter)
|
|
|
|
// Check if this is the first subscriber for this pattern
|
|
count := neb.patternSubscribers[namespacePattern]
|
|
if count == 0 {
|
|
// First subscriber - create NATS subscription
|
|
// NATS natively supports wildcards, so we can use the pattern directly
|
|
subject := fmt.Sprintf("aether.events.%s", namespacePattern)
|
|
|
|
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
|
|
neb.handleNATSEvent(msg, namespacePattern)
|
|
})
|
|
if err != nil {
|
|
log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err)
|
|
// Record subscription error
|
|
neb.metrics.RecordSubscribeError(namespacePattern)
|
|
} else {
|
|
neb.subscriptions = append(neb.subscriptions, sub)
|
|
if IsWildcardPattern(namespacePattern) {
|
|
log.Printf("[NATSEventBus] Node %s subscribed to wildcard pattern %s", neb.nodeID, subject)
|
|
} else {
|
|
log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject)
|
|
}
|
|
}
|
|
}
|
|
|
|
neb.patternSubscribers[namespacePattern] = count + 1
|
|
|
|
return ch
|
|
}
|
|
|
|
// Unsubscribe removes a local subscription and cleans up NATS subscription if no more subscribers
|
|
func (neb *NATSEventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
|
|
neb.mutex.Lock()
|
|
defer neb.mutex.Unlock()
|
|
|
|
neb.EventBus.Unsubscribe(namespacePattern, ch)
|
|
|
|
count := neb.patternSubscribers[namespacePattern]
|
|
if count > 0 {
|
|
count--
|
|
neb.patternSubscribers[namespacePattern] = count
|
|
|
|
if count == 0 {
|
|
delete(neb.patternSubscribers, namespacePattern)
|
|
log.Printf("[NATSEventBus] No more subscribers for pattern %s on node %s", namespacePattern, neb.nodeID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleNATSEvent processes events received from NATS
|
|
func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg, subscribedPattern string) {
|
|
var eventMsg eventMessage
|
|
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
|
|
log.Printf("[NATSEventBus] Failed to unmarshal event: %v", err)
|
|
return
|
|
}
|
|
|
|
// Skip events that originated from this node (already delivered locally)
|
|
if eventMsg.NodeID == neb.nodeID {
|
|
return
|
|
}
|
|
|
|
// For wildcard subscriptions, we need to deliver to the EventBus using
|
|
// the subscribed pattern so it reaches the correct wildcard subscriber.
|
|
// For exact subscriptions, use the actual namespace.
|
|
if IsWildcardPattern(subscribedPattern) {
|
|
// Deliver using the pattern - the EventBus will route to wildcard subscribers
|
|
neb.deliverToWildcardSubscribers(subscribedPattern, eventMsg.Event)
|
|
} else {
|
|
// Forward to local EventBus subscribers with actual namespace
|
|
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
|
|
}
|
|
}
|
|
|
|
// deliverToWildcardSubscribers delivers an event to subscribers of a specific wildcard pattern
|
|
// Applies filters before delivery.
|
|
func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Event) {
|
|
neb.EventBus.mutex.RLock()
|
|
defer neb.EventBus.mutex.RUnlock()
|
|
|
|
for _, sub := range neb.EventBus.wildcardSubscribers {
|
|
if sub.pattern == pattern {
|
|
// Apply filter if present
|
|
if sub.filter != nil && !sub.filter.IsEmpty() {
|
|
if !sub.filter.Matches(event) {
|
|
// Event doesn't match filter, skip delivery
|
|
continue
|
|
}
|
|
}
|
|
|
|
select {
|
|
case sub.ch <- event:
|
|
// Event delivered from NATS
|
|
neb.metrics.RecordReceive(pattern)
|
|
default:
|
|
// Channel full, skip this subscriber (non-blocking)
|
|
neb.metrics.RecordDroppedEvent(pattern)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Publish publishes an event both locally and to NATS for cross-node broadcasting
|
|
func (neb *NATSEventBus) Publish(namespaceID string, event *Event) {
|
|
// First publish locally
|
|
neb.EventBus.Publish(namespaceID, event)
|
|
|
|
// Then publish to NATS for other nodes
|
|
subject := fmt.Sprintf("aether.events.%s", namespaceID)
|
|
|
|
eventMsg := eventMessage{
|
|
NodeID: neb.nodeID,
|
|
NamespaceID: namespaceID,
|
|
Event: event,
|
|
}
|
|
|
|
data, err := json.Marshal(eventMsg)
|
|
if err != nil {
|
|
log.Printf("[NATSEventBus] Failed to marshal event for NATS: %v", err)
|
|
neb.metrics.RecordPublishError(namespaceID)
|
|
return
|
|
}
|
|
|
|
if err := neb.nc.Publish(subject, data); err != nil {
|
|
log.Printf("[NATSEventBus] Failed to publish event to NATS: %v", err)
|
|
neb.metrics.RecordPublishError(namespaceID)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Stop closes the NATS event bus and all subscriptions
|
|
func (neb *NATSEventBus) Stop() {
|
|
neb.mutex.Lock()
|
|
defer neb.mutex.Unlock()
|
|
|
|
neb.cancel()
|
|
|
|
for _, sub := range neb.subscriptions {
|
|
if err := sub.Unsubscribe(); err != nil {
|
|
log.Printf("[NATSEventBus] Error unsubscribing: %v", err)
|
|
}
|
|
}
|
|
neb.subscriptions = nil
|
|
|
|
neb.EventBus.Stop()
|
|
|
|
log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
|
|
}
|
|
|
|
// sanitizeSubject makes a string usable as a single NATS subject token by
// replacing spaces, the token separator "." and the wildcard characters "*"
// and ">" with underscores. All other characters are left untouched.
func sanitizeSubject(s string) string {
	unsafeToUnderscore := strings.NewReplacer(
		" ", "_",
		".", "_",
		"*", "_",
		">", "_",
	)
	return unsafeToUnderscore.Replace(s)
}
|
|
|
|
// extractActorType returns the part of an actor ID that precedes its first
// dash, e.g. "order" for "order-123". A dash in the very first position is not
// treated as a separator. If no separating dash is found, "unknown" is returned.
func extractActorType(actorID string) string {
	// Start at index 1: a leading '-' would yield an empty type.
	for pos := 1; pos < len(actorID); pos++ {
		if actorID[pos] == '-' {
			return actorID[:pos]
		}
	}
	return "unknown"
}
|
|
|
|
// SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern.
|
|
// EventStored events are published by JetStreamEventStore when events are successfully saved.
|
|
// This is useful for cross-node event synchronization and version cache consistency.
|
|
//
|
|
// The returned channel receives EventStored events matching the pattern.
|
|
// The EventStored event schema:
|
|
// - EventType: "EventStored"
|
|
// - ActorID: ID of the actor that the original event was about
|
|
// - Version: version of the stored event
|
|
// - Data:
|
|
// - eventId: (string) ID of the stored event
|
|
// - actorId: (string) ID of the actor
|
|
// - version: (int64) version of the event
|
|
// - timestamp: (int64) Unix timestamp of when the event was stored
|
|
//
|
|
// The namespacePattern supports NATS wildcards:
|
|
// - "*" matches a single token
|
|
// - ">" matches one or more tokens (only at the end)
|
|
//
|
|
// Example:
|
|
//
|
|
// ch := eventBus.SubscribeToEventStored("tenant-*")
|
|
// for event := range ch {
|
|
// if event.EventType != aether.EventTypeEventStored {
|
|
// continue
|
|
// }
|
|
// actorID := event.Data["actorId"].(string)
|
|
// version, _ := event.Data["version"].(int64)
|
|
// store.UpdateVersionCache(actorID, version)
|
|
// }
|
|
//
|
|
// Security Warning: Using wildcard patterns like ">" will receive EventStored events
|
|
// from all namespaces. Ensure your application handles this appropriately.
|
|
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
|
|
neb.mutex.Lock()
|
|
defer neb.mutex.Unlock()
|
|
|
|
subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>")
|
|
|
|
ch := make(chan *Event, 100)
|
|
|
|
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
|
|
var eventMsg eventMessage
|
|
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
|
|
log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err)
|
|
return
|
|
}
|
|
|
|
if eventMsg.NodeID == neb.nodeID {
|
|
return
|
|
}
|
|
|
|
if eventMsg.Event.EventType == EventTypeEventStored && neb.eventStore != nil {
|
|
actorID, ok := eventMsg.Event.Data["actorId"].(string)
|
|
if !ok {
|
|
return
|
|
}
|
|
version, ok := eventMsg.Event.Data["version"].(int64)
|
|
if !ok {
|
|
return
|
|
}
|
|
// Use type assertion to call UpdateVersionCache
|
|
if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok {
|
|
es.UpdateVersionCache(actorID, version)
|
|
}
|
|
}
|
|
|
|
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
|
|
})
|
|
|
|
if err != nil {
|
|
log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err)
|
|
close(ch)
|
|
return ch
|
|
}
|
|
|
|
neb.subscriptions = append(neb.subscriptions, sub)
|
|
|
|
return ch
|
|
}
|