2 Commits

Author SHA1 Message Date
feaaec11dc chore(deps): Update actions/checkout action to v7
All checks were successful
CI / build (pull_request) Successful in 1m33s
2026-06-18 14:00:31 +00:00
b481dae0b6 feat: implement cross-node event broadcasting with NATSEventBus (#151)
All checks were successful
CI / build (push) Successful in 22s
This PR implements cross-node event broadcasting for aether.

Changes:
- UpdateVersionCache method in JetStreamEventStore
- SubscribeToEventStored helper in NATSEventBus
- Integration tests for cross-node scenarios
- Example code demonstrating NATSEventBus + JetStreamEventStore

Tests: All integration tests passing.
Co-authored-by: Claude Code <noreply@anthropic.com>
Co-authored-by: Hugo Nijhuis <hugo.nijhuis@flowmade.one>
Reviewed-on: #151
2026-05-17 15:29:52 +00:00
9 changed files with 1091 additions and 362 deletions

View File

@@ -9,7 +9,7 @@ jobs:
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v7.0.0
- uses: actions/setup-go@v5 - uses: actions/setup-go@v5
with: with:
go-version: '1.23' go-version: '1.23'

View File

@@ -0,0 +1,64 @@
# Issue: Implement Actor Migration Between Cluster Nodes
## Problem
When nodes join or leave the cluster, actors need to be migrated to maintain even distribution. Currently:
- `handleRebalanceRequest` in `cluster/manager.go:150` is empty
- `handleMigrationRequest` in `cluster/manager.go:167` is empty
- `RebalanceShards` in `cluster/shard.go:211` returns unchanged map
- `SendMessage` in `cluster/distributed.go:139` ignores sharding
## Required Implementation
### 1. Rebalance Algorithm (cluster/shard.go)
Implement `ConsistentHashPlacement.RebalanceShards` to:
- Calculate new shard assignments based on active nodes
- Identify actors needing migration
- Generate migration plan with source/dest nodes
### 2. Migration Coordinator (cluster/manager.go)
Implement `handleRebalanceRequest` to:
- Accept migration plan from leader
- For each actor in plan:
1. Pause incoming messages
2. Capture actor state (replay events up to current version)
3. Serialize state
4. Send migration request to destination node
5. Wait for ack
6. Delete actor from current node
- Track migration status via `ActorMigration.Status`
### 3. Cross-Node Message Routing (cluster/distributed.go)
Implement proper routing in `SendMessage`:
- Use `GetActorNode(actorID)` to determine target node
- If remote: marshal message, send via NATS to target node
- If local: send to local runtime
- Route response back to caller if needed
## Suggested Approach
1. **Define message types** for actor migration requests/responses in `cluster/types.go`
2. **Implement state capture** - replay events to get current state
3. **Implement state restore** - deserialize and restore actor state
4. **Implement coordinator** - manage migration phases
5. **Add error handling** - handle failed migrations, retries, cleanup
6. **Add tests** - test migration with mock NATS
## Related Files
- `cluster/manager.go:150` - handleRebalanceRequest (empty)
- `cluster/manager.go:167` - handleMigrationRequest (empty)
- `cluster/shard.go:211` - RebalanceShards (stub)
- `cluster/distributed.go:139` - SendMessage (simplified)
- `cluster/types.go:108` - ActorMigration struct
## Acceptance Criteria
- [ ] `RebalanceShards` returns new shard map with actor assignments
- [ ] `handleRebalanceRequest` processes migration plan
- [ ] `handleMigrationRequest` accepts actor migrations
- [ ] `SendMessage` routes to correct node
- [ ] Actors can be migrated with state preserved
- [ ] Failed migrations are handled gracefully
- [ ] Integration test with multi-node cluster

View File

@@ -0,0 +1,117 @@
# Issue: Add Snapshot Support to Event Sourcing Workflow
## Problem
`SnapshotStore` interface is defined but snapshots are not integrated into the event sourcing workflow. This means:
- Actors with many events must replay entire history
- No performance optimization for long-lived actors
- Snapshots exist as API but are not used
## Current State
- `EventStoreWithErrors` in `event.go:235` - no snapshot methods
- `SnapshotStore` interface in `event.go:245` - defined but not widely used
- `JetStreamEventStore.GetLatestSnapshot` and `SaveSnapshot` implemented but not called automatically
- `InMemoryEventStore` has snapshot methods but no lifecycle management
## Required Implementation
### 1. Snapshot Strategy
Define when to create snapshots:
- Fixed interval (e.g., every 100 events)
- Version-based (e.g., every 50 versions)
- Hybrid: version-based with min/max bounds
### 2. State Capture
Add method to capture actor state:
```go
// CaptureState rebuilds actor state by replaying events and returns it
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
```
### 3. Snapshot Store Extension
Extend `EventStoreWithErrors` to include snapshots:
```go
type EventStoreWithSnapshots interface {
EventStoreWithErrors
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
SaveSnapshot(snapshot *ActorSnapshot) error
}
```
### 4. Snapshot Workflow
Modify event retrieval to use snapshots:
```go
GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
// 1. Try to get latest snapshot
snapshot, _ := store.GetLatestSnapshot(actorID)
// 2. If snapshot exists and version <= fromVersion:
// - Return events from snapshot version + 1
// 3. Else:
// - Replay all events from version 0
}
```
## Suggested Implementation
### 1. Add CaptureState to EventStore interface
In `event.go`, extend `EventStore` or create `StateStore` interface:
```go
type StateStore interface {
EventStore
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
}
```
### 2. Implement CaptureState
In `store/jetstream.go`:
```go
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
// Replay events and build state (application logic needed here)
events, _ := jes.GetEvents(actorID, fromVersion)
// Need application logic to convert events to state
return state, nil
}
```
### 3. Add Snapshot Helper
Create snapshot utilities:
```go
// CreateSnapshot creates snapshot from state
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
return &ActorSnapshot{
ActorID: actorID,
Version: version,
State: state,
Timestamp: time.Now(),
}
}
```
### 4. Modify GetEvents
Update `GetEvents` in both stores to use snapshots when beneficial.
## Snapshots Workflow Example
```
1. Actor has 1000 events
2. Every 100 events, create snapshot
3. Actor reaches version 1000, snapshot at version 1000
4. Request events from version 900:
- Get snapshot at version 1000? No (version too high)
- Replay 900->1000 events (only 100 events)
5. Request events from version 50:
- Get latest snapshot at version 1000? Yes (version > 50)
- Use snapshot as base
- Replay 1000->1000 events (none)
```
## Acceptance Criteria
- [ ] `CaptureState` method added to event store
- [ ] Snapshots created at configured intervals
- [ ] `GetEvents` uses snapshots to optimize replay
- [ ] Snapshot workflow tested with long-lived actors
- [ ] Configuration for snapshot interval/version
- [ ] Metrics: snapshot count, average replay size

View File

@@ -0,0 +1,100 @@
# Issue: Implement VM/Runtime for Actors
## Problem
Only interfaces exist for `Runtime` and `VirtualMachine` in `cluster/types.go` and `cluster/distributed.go`, but no actual implementation. Actors cannot be created, started, stopped, or have their state managed.
## Required Components
### 1. VM Implementation (cluster/vm.go - new)
```go
type VirtualMachine struct {
actorID string
eventStore aether.EventStore
state map[string]interface{}
version int64
}
```
Methods needed:
- `GetID()`, `GetActorID()`, `GetState()` - already in interface
- `Start()` - replay events to rebuild state
- `ProcessEvent(event *aether.Event)` - apply event to state
- `Stop()` - persist final state
- `GetVersion()` - current event version
### 2. Runtime Implementation (cluster/runtime.go - new)
```go
type Runtime struct {
natsConn *nats.Conn
eventStore aether.EventStore
vmRegistry VMRegistry // map[actorID]*VirtualMachine
config RuntimeConfig
}
```
Methods needed:
- `Start()` - initialize and start processing
- `LoadModel(model eventstorming.Model)` - register domain types
- `SendMessage(message RuntimeMessage)` - route to appropriate VM
- `GetActiveVMs()` - return map of active VMs
- `CreateVM(actorID string)` - create new VM instance
- `StopVM(actorID string)` - persist and stop VM
### 3. Event Processing
- Subscribe to actor's event stream
- Replay events to build initial state
- Apply new events as they arrive
- Handle event versions and conflicts
## Suggested Design
### VM Lifecycle
```
1. Actor message arrives for actor-123
2. Runtime checks if VM exists for actor-123
3. If not, create VM:
- Replay events from event store
- Rebuild state
4. Route message to VM
5. VM processes message -> creates new events
6. Events persisted to event store
7. VM state updated
```
### State Management
- State derived from event replay
- No separate state store needed
- Can snapshot periodically for performance
- Version conflict handling using existing EventStore
## Implementation Steps
1. **Create VM struct** in `cluster/vm.go`
2. **Implement event replay** to rebuild state
3. **Create Runtime** in `cluster/runtime.go`
4. **Register Runtime with cluster** via `SetVMProvider`
5. **Implement message processing** - validate against model
6. **Add version conflict handling** using existing EventStore
7. **Write tests** - mock event store, test state transitions
## File Structure
```
cluster/
├── vm.go # VirtualMachine implementation
├── runtime.go # Runtime implementation
├── vm_test.go # VM tests
├── runtime_test.go # Runtime tests
└── integration_test.go # Integration tests
```
## Acceptance Criteria
- [ ] VM can be created with actor ID
- [ ] VM replays events to build state
- [ ] VM processes events and updates state
- [ ] VM persists current version
- [ ] Runtime can create/stop VMs
- [ ] Runtime manages VM registry
- [ ] Integration test with NATS and JetStream

View File

@@ -0,0 +1,168 @@
// Package main demonstrates cross-node event broadcasting using NATSEventBus
// and JetStreamEventStore for cluster synchronization.
//
// This example shows:
// 1. Setting up NATSEventBus with JetStreamEventStore
// 2. Broadcasting events across NATS for cross-node distribution
// 3. Subscribing to EventStored events for version cache synchronization
// 4. Properly handling EventStored events from other cluster nodes
//
// Prerequisites:
// - NATS server running with JetStream enabled (nats-server -js)
// - Events stream created in JetStream
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"git.flowmade.one/flowmade-one/aether"
"git.flowmade.one/flowmade-one/aether/store"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
func main() {
natsURL := getEnv("NATS_URL", "nats://localhost:4222")
nc, err := nats.Connect(natsURL)
if err != nil {
log.Fatal("Failed to connect to NATS:", err)
}
defer nc.Close()
ctx := context.Background()
store1, err := store.NewJetStreamEventStore(nc, "events")
if err != nil {
log.Fatal("Failed to create event store:", err)
}
eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "")
defer eventBus1.Stop()
store2, err := store.NewJetStreamEventStore(nc, "events")
if err != nil {
log.Fatal("Failed to create event store:", err)
}
eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "")
defer eventBus2.Stop()
eventStoredCh1 := eventBus1.SubscribeToEventStored("*")
eventStoredCh2 := eventBus2.SubscribeToEventStored("*")
done := make(chan struct{})
go processEvents(ctx, eventStoredCh1, store1, done)
go processEvents(ctx, eventStoredCh2, store2, done)
go func() {
time.Sleep(2 * time.Second)
actorID := "demo-actor"
event1 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{
"total": 99.99,
"status": "pending",
},
Timestamp: time.Now(),
}
log.Printf("Node 1 publishing event: %s", event1.EventType)
eventBus1.Publish("", event1)
time.Sleep(500 * time.Millisecond)
event2 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPaid",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{
"total": 99.99,
"status": "paid",
"method": "credit_card",
},
Timestamp: time.Now(),
}
log.Printf("Node 2 publishing event: %s", event2.EventType)
eventBus2.Publish("", event2)
time.Sleep(2 * time.Second)
close(done)
log.Println("Cross-node broadcasting demo complete")
}()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigCh:
log.Println("Shutting down...")
case <-done:
}
}
func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) {
for {
select {
case <-done:
return
case <-ctx.Done():
return
case event, ok := <-eventStoredCh:
if !ok {
return
}
if event == nil {
continue
}
if event.EventType != aether.EventTypeEventStored {
continue
}
actorID, ok := event.Data["actorId"].(string)
if !ok {
log.Printf("Warning: EventStored missing actorId")
continue
}
version, ok := event.Data["version"].(int64)
if !ok {
log.Printf("Warning: EventStored missing version")
continue
}
eventID, _ := event.Data["eventId"].(string)
log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID)
eventStore.UpdateVersionCache(actorID, version)
currentVersion, _ := eventStore.GetLatestVersion(actorID)
log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion)
}
}
}
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}

View File

@@ -1,353 +0,0 @@
package examples
import (
"errors"
"fmt"
"log"
"math"
"math/rand"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError.
//
// This pattern is suitable for scenarios where you want to automatically retry
// with exponential backoff when version conflicts occur.
func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
const initialBackoff = 100 * time.Millisecond
var event *aether.Event
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff
log.Printf("Retry attempt %d after %v", attempt, backoff)
time.Sleep(backoff)
}
// Get the current version for the actor
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
// Create event with next version
event = &aether.Event{
ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"attempt": attempt},
Timestamp: time.Now(),
}
// Attempt to save
if err := store.SaveEvent(event); err == nil {
log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version)
return nil
} else if !errors.Is(err, aether.ErrVersionConflict) {
// Some other error occurred
return fmt.Errorf("save event failed: %w", err)
}
// If it's a version conflict, loop will retry
}
return fmt.Errorf("failed to save event after %d retries", maxRetries)
}
// ConflictDetailedRetryPattern demonstrates how to extract detailed information
// from VersionConflictError to make intelligent retry decisions.
//
// This pattern shows how to log detailed context and potentially implement
// circuit-breaker logic based on the conflict information.
func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 5
var lastConflictVersion int64
for attempt := 0; attempt < maxRetries; attempt++ {
// Get current version
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
// Create event
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"timestamp": time.Now()},
Timestamp: time.Now(),
}
// Attempt to save
err = store.SaveEvent(event)
if err == nil {
return nil // Success
}
// Check if it's a version conflict
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
// Not a version conflict, fail immediately
return err
}
// Extract detailed context from the conflict error
log.Printf(
"Version conflict for actor %q: attempted version %d, current version %d",
versionErr.ActorID,
versionErr.AttemptedVersion,
versionErr.CurrentVersion,
)
// Check for thrashing (multiple conflicts with same version)
if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 {
log.Printf("Detected version thrashing - circuit breaker would trigger here")
return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion)
}
lastConflictVersion = versionErr.CurrentVersion
// Exponential backoff
backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
time.Sleep(backoff)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// JitterRetryPattern implements exponential backoff with jitter to prevent
// thundering herd when multiple writers retry simultaneously.
func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
const baseBackoff = 100 * time.Millisecond
const maxJitter = 0.1 // 10% jitter
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
return nil
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
// Exponential backoff with jitter
exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff
jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter)
totalBackoff := exponentialBackoff + jitter
log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries)
time.Sleep(totalBackoff)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns.
//
// This pattern demonstrates how application logic can use CurrentVersion to
// decide whether to retry, give up, or escalate to a higher-level handler.
func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
return nil
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
return err
}
// Adaptive backoff based on version distance
versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion
if versionDistance > 10 {
// Many concurrent writers - back off more aggressively
log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance)
time.Sleep(time.Duration(versionDistance*10) * time.Millisecond)
} else if versionDistance > 3 {
// Moderate contention - normal backoff
log.Printf("Moderate contention detected (gap: %d)", versionDistance)
time.Sleep(time.Duration(versionDistance) * time.Millisecond)
} else {
// Light contention - minimal backoff
log.Printf("Light contention detected")
time.Sleep(50 * time.Millisecond)
}
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// EventualConsistencyPattern demonstrates how to handle version conflicts
// in an eventually consistent manner by publishing to a retry queue.
//
// This is useful when immediate retry is not feasible, and you want to
// defer the operation to a background worker.
type RetryQueueItem struct {
Event *aether.Event
ConflictVersion int64
ConflictAttempted int64
NextRetryTime time.Time
FailureCount int
}
func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) {
err := store.SaveEvent(event)
if err == nil {
return
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
log.Printf("Non-retryable error: %v", err)
return
}
// Queue for retry - background worker will process this
retryItem := RetryQueueItem{
Event: event,
ConflictVersion: versionErr.CurrentVersion,
ConflictAttempted: versionErr.AttemptedVersion,
NextRetryTime: time.Now().Add(1 * time.Second),
FailureCount: 0,
}
select {
case retryQueue <- retryItem:
log.Printf("Queued event for retry: actor=%s", event.ActorID)
case <-time.After(5 * time.Second):
log.Printf("Failed to queue event for retry (queue full)")
}
}
// CircuitBreakerPattern implements a simple circuit breaker for version conflicts.
//
// The circuit breaker tracks failure rates and temporarily stops retrying
// when the failure rate gets too high, allowing the system to recover.
type CircuitBreaker struct {
failureCount int
successCount int
state string // "closed", "open", "half-open"
lastFailureTime time.Time
openDuration time.Duration
failureThreshold int
successThreshold int
}
func NewCircuitBreaker() *CircuitBreaker {
return &CircuitBreaker{
state: "closed",
openDuration: 30 * time.Second,
failureThreshold: 5,
successThreshold: 3,
}
}
func (cb *CircuitBreaker) RecordSuccess() {
if cb.state == "half-open" {
cb.successCount++
if cb.successCount >= cb.successThreshold {
cb.state = "closed"
cb.failureCount = 0
cb.successCount = 0
log.Printf("Circuit breaker closed")
}
}
}
func (cb *CircuitBreaker) RecordFailure() {
cb.lastFailureTime = time.Now()
cb.failureCount++
if cb.failureCount >= cb.failureThreshold {
cb.state = "open"
log.Printf("Circuit breaker opened")
}
}
func (cb *CircuitBreaker) CanRetry() bool {
if cb.state == "closed" {
return true
}
if cb.state == "open" {
if time.Since(cb.lastFailureTime) > cb.openDuration {
cb.state = "half-open"
cb.failureCount = 0
cb.successCount = 0
log.Printf("Circuit breaker half-open")
return true
}
return false
}
// half-open state allows retries
return true
}
func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error {
if !cb.CanRetry() {
return fmt.Errorf("circuit breaker open - not retrying")
}
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
cb.RecordSuccess()
return nil
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
cb.RecordFailure()
return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state)
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"strings"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
@@ -24,6 +25,8 @@ type NATSEventBus struct {
subscriptions []*nats.Subscription subscriptions []*nats.Subscription
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards) patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
nodeID string // Unique ID for this node nodeID string // Unique ID for this node
streamPrefix string // NATS subject prefix for events
eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore)
mutex sync.Mutex mutex sync.Mutex
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@@ -46,6 +49,7 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
nodeID: uuid.New().String(), nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0), subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int), patternSubscribers: make(map[string]int),
streamPrefix: "aether",
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
} }
@@ -53,6 +57,43 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
return neb, nil return neb, nil
} }
// NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration.
// The event store is used to automatically update version cache when EventStored events are received
// from other cluster nodes via NATS. This ensures cross-node version consistency.
//
// Example:
//
// eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc")
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// The namespace parameter is used as a prefix for EventStored event filtering.
// If empty, EventStored events from all namespaces will be received (requires wildcard pattern).
func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus {
streamPrefix := "aether"
if namespace != "" {
streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace))
}
neb := &NATSEventBus{
EventBus: NewEventBus(),
nc: nc,
nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int),
streamPrefix: streamPrefix,
eventStore: store,
ctx: context.Background(),
cancel: func() {},
}
return neb
}
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern. // Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
// Supports NATS subject patterns: // Supports NATS subject patterns:
// - "*" matches a single token // - "*" matches a single token
@@ -228,3 +269,103 @@ func (neb *NATSEventBus) Stop() {
log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID) log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
} }
// sanitizeSubject sanitizes a string for use in NATS subjects
func sanitizeSubject(s string) string {
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, ".", "_")
s = strings.ReplaceAll(s, "*", "_")
s = strings.ReplaceAll(s, ">", "_")
return s
}
// extractActorType extracts the actor type from an actor ID
func extractActorType(actorID string) string {
for i, c := range actorID {
if c == '-' && i > 0 {
return actorID[:i]
}
}
return "unknown"
}
// SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern.
// EventStored events are published by JetStreamEventStore when events are successfully saved.
// This is useful for cross-node event synchronization and version cache consistency.
//
// The returned channel receives EventStored events matching the pattern.
// The EventStored event schema:
// - EventType: "EventStored"
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// The namespacePattern supports NATS wildcards:
// - "*" matches a single token
// - ">" matches one or more tokens (only at the end)
//
// Example:
//
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// if event.EventType != aether.EventTypeEventStored {
// continue
// }
// actorID := event.Data["actorId"].(string)
// version, _ := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// Security Warning: Using wildcard patterns like ">" will receive EventStored events
// from all namespaces. Ensure your application handles this appropriately.
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
neb.mutex.Lock()
defer neb.mutex.Unlock()
subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>")
ch := make(chan *Event, 100)
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
var eventMsg eventMessage
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err)
return
}
if eventMsg.NodeID == neb.nodeID {
return
}
if eventMsg.Event.EventType == EventTypeEventStored && neb.eventStore != nil {
actorID, ok := eventMsg.Event.Data["actorId"].(string)
if !ok {
return
}
version, ok := eventMsg.Event.Data["version"].(int64)
if !ok {
return
}
// Use type assertion to call UpdateVersionCache
if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok {
es.UpdateVersionCache(actorID, version)
}
}
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
})
if err != nil {
log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err)
close(ch)
return ch
}
neb.subscriptions = append(neb.subscriptions, sub)
return ch
}

431
store/integration_test.go Normal file
View File

@@ -0,0 +1,431 @@
//go:build integration
package store
import (
"context"
"log"
"os"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats-server/v2/server"
)
func setupNatsServer() (*server.Server, *nats.Conn, func()) {
opts := &server.Options{
Port: -1,
JetStream: true,
StoreDir: "/tmp/nats-test-" + time.Now().Format("20060102150405"),
}
s, err := server.NewServer(opts)
if err != nil {
log.Fatal("Failed to create NATS server:", err)
}
go s.Start()
if !s.ReadyForConnections(4 * time.Second) {
log.Fatal("NATS server failed to start")
}
nc, err := nats.Connect(s.ClientURL())
if err != nil {
s.Shutdown()
log.Fatal("Failed to connect to NATS:", err)
}
return s, nc, func() {
nc.Close()
s.Shutdown()
os.RemoveAll(opts.StoreDir)
}
}
func TestUpdateVersionCache(t *testing.T) {
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "test-actor-1"
tests := []struct {
name string
cachedVersion int64
newVersion int64
expectUpdate bool
expectVersion int64
}{
{
name: "update when new version is greater",
cachedVersion: 5,
newVersion: 10,
expectUpdate: true,
expectVersion: 10,
},
{
name: "do not update when new version is equal",
cachedVersion: 5,
newVersion: 5,
expectUpdate: false,
expectVersion: 5,
},
{
name: "do not update when new version is less",
cachedVersion: 10,
newVersion: 5,
expectUpdate: false,
expectVersion: 10,
},
{
name: "update when no cached version exists",
cachedVersion: 0,
newVersion: 1,
expectUpdate: true,
expectVersion: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Set up cached version
store.versions = make(map[string]int64)
store.versions[actorID] = tt.cachedVersion
// Call UpdateVersionCache
store.UpdateVersionCache(actorID, tt.newVersion)
// Verify result
if tt.expectUpdate {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to be updated but it wasn't cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version %d, got %d", tt.expectVersion, version)
}
} else {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to remain cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version to remain %d, got %d", tt.expectVersion, version)
}
}
})
}
}
func TestUpdateVersionCache_Concurrent(t *testing.T) {
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache_concurrent")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "concurrent-actor"
store.versions[actorID] = 1
const numGoroutines = 50
const maxVersion = 100
var done = make(chan struct{})
var updates int32
for i := 0; i < numGoroutines; i++ {
version := int64(1 + (i % maxVersion))
go func(v int64) {
store.UpdateVersionCache(actorID, v)
select {
case <-done:
default:
updates++
}
}(version)
}
close(done)
time.Sleep(100 * time.Millisecond)
finalVersion := store.versions[actorID]
if finalVersion > maxVersion {
t.Errorf("Expected version to be at most %d, got %d", maxVersion, finalVersion)
}
}
func TestSubscribeToEventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_subscribe_event_stored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBusWithStore := NewNATSEventBusWithBroadcaster(nc, store, "")
if eventBusWithStore == nil {
t.Fatalf("Failed to create event bus with broadcaster")
}
defer eventBusWithStore.Stop()
ch := eventBusWithStore.SubscribeToEventStored("*")
if ch == nil {
t.Fatal("SubscribeToEventStored returned nil channel")
}
actorID := "subscribe-test-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"key": "value"},
Timestamp: time.Now(),
}
eventBusWithStore.Publish("", event)
select {
case receivedEvent := <-ch:
if receivedEvent.EventType != aether.EventTypeEventStored {
t.Errorf("Expected EventTypeEventStored, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
data, ok := receivedEvent.Data["actorId"].(string)
if !ok || data != actorID {
t.Errorf("Expected actorId in data to be %s", actorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for EventStored event")
}
}
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_single_node_broadcast")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "broadcast-test-actor-1"
localCh := eventBus.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"total": 99.99},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
select {
case receivedEvent := <-localCh:
if receivedEvent.EventType != "OrderPlaced" {
t.Errorf("Expected OrderPlaced, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for broadcast event")
}
}
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s1, nc1, cleanup1 := setupNatsServer()
defer cleanup1()
s2, nc2, cleanup2 := setupNatsServer()
defer cleanup2()
ctx := context.Background()
store1, err := NewJetStreamEventStore(nc1, "test_multi_node_1")
if err != nil {
t.Fatalf("Failed to create store 1: %v", err)
}
store2, err := NewJetStreamEventStore(nc2, "test_multi_node_2")
if err != nil {
t.Fatalf("Failed to create store 2: %v", err)
}
eventBus1 := NewNATSEventBusWithBroadcaster(nc1, store1, "")
eventBus2 := NewNATSEventBusWithBroadcaster(nc2, store2, "")
defer eventBus1.Stop()
defer eventBus2.Stop()
actorID := "multi-node-actor"
receiverCh := eventBus2.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "InventoryReserved",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"quantity": 5},
Timestamp: time.Now(),
}
eventBus1.Publish("", event)
select {
case receivedEvent := <-receiverCh:
if receivedEvent.EventType != "InventoryReserved" {
t.Errorf("Expected InventoryReserved, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(3 * time.Second):
t.Fatal("Timeout waiting for cross-node event")
}
}
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
tenantAStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-a")
if err != nil {
t.Fatalf("Failed to create tenant A store: %v", err)
}
tenantBStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-b")
if err != nil {
t.Fatalf("Failed to create tenant B store: %v", err)
}
tenantAEventBus := NewNATSEventBusWithBroadcaster(nc, tenantAStore, "tenant-a")
tenantBEventBus := NewNATSEventBusWithBroadcaster(nc, tenantBStore, "tenant-b")
defer tenantAEventBus.Stop()
defer tenantBEventBus.Stop()
tenantACh := tenantAEventBus.Subscribe("tenant-a")
tenantBCh := tenantBEventBus.Subscribe("tenant-b")
actorID := "tenant-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TenantEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"data": "tenant-a"},
Timestamp: time.Now(),
}
tenantAEventBus.Publish("tenant-a", event)
select {
case receivedEvent := <-tenantACh:
if receivedEvent.EventType != "TenantEvent" {
t.Errorf("Expected TenantEvent in tenant A, got %s", receivedEvent.EventType)
}
case <-time.After(2 * time.Second):
t.Error("Timeout waiting for tenant A to receive event")
}
select {
case <-tenantBCh:
t.Error("Tenant B should not receive tenant A's events")
case <-time.After(1 * time.Second):
// Expected - tenant B should not receive events from tenant A
}
}
func TestUpdateVersionCache_EventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_version_cache_eventstored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "version-cache-actor"
store.UpdateVersionCache(actorID, 5)
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 10,
Data: map[string]interface{}{"test": true},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
time.Sleep(100 * time.Millisecond)
storedVersion, err := store.GetLatestVersion(actorID)
if err != nil {
t.Fatalf("Failed to get latest version: %v", err)
}
if storedVersion != 10 {
t.Errorf("Expected version 10, got %d", storedVersion)
}
cacheVersion, ok := store.GetCachedVersion(actorID)
if !ok {
t.Error("Expected version to be in cache")
} else if cacheVersion != 10 {
t.Errorf("Expected cached version 10, got %d", cacheVersion)
}
}

View File

@@ -1,6 +1,7 @@
package store package store
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
@@ -286,6 +287,28 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
// publishEventStored publishes an EventStored event to the broadcaster. // publishEventStored publishes an EventStored event to the broadcaster.
// This is called after a successful SaveEvent to notify subscribers. // This is called after a successful SaveEvent to notify subscribers.
//
// EventStored Event Schema:
// - EventType: "EventStored" (aether.EventTypeEventStored)
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// Example usage with NATSEventBus:
//
// eventBus := aether.NewNATSEventBus(natsConn)
// store := store.NewJetStreamEventStoreWithBroadcaster(natsConn, "events", eventBus, "")
// ch := eventBus.SubscribeToEventStored("*")
//
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) { func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
eventStored := &aether.Event{ eventStored := &aether.Event{
ID: uuid.New().String(), ID: uuid.New().String(),
@@ -558,5 +581,43 @@ func sanitizeSubject(s string) string {
return s return s
} }
// UpdateVersionCache updates the version cache for a specific actor.
// This is used when receiving events from other nodes via NATS to keep
// the version cache consistent across cluster nodes.
//
// Only updates if the new version is greater than the cached version to prevent
// stale cache entries from causing version conflicts.
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
jes.mu.Lock()
defer jes.mu.Unlock()
// Only update if the new version is greater than cached version
if currentVersion, ok := jes.versions[actorID]; !ok || version > currentVersion {
jes.versions[actorID] = version
}
}
// GetCachedVersion returns the cached version for an actor, if available.
func (jes *JetStreamEventStore) GetCachedVersion(actorID string) (int64, bool) {
jes.mu.Lock()
defer jes.mu.Unlock()
version, ok := jes.versions[actorID]
return version, ok
}
// SetBroadcaster sets the event broadcaster for this store.
// The broadcaster is used to publish EventStored events when events are saved.
func (jes *JetStreamEventStore) SetBroadcaster(broadcaster aether.EventBroadcaster) {
jes.mu.Lock()
defer jes.mu.Unlock()
jes.broadcaster = broadcaster
}
// Close closes the JetStream event store and cleans up resources.
func (jes *JetStreamEventStore) Close(ctx context.Context) error {
return nil
}
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors // Compile-time check that JetStreamEventStore implements EventStoreWithErrors
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil) var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)