1 Commits

Author SHA1 Message Date
Hugo Nijhuis
5c01911e3c feat: implement cross-node event broadcasting with NATSEventBus
All checks were successful
CI / build (pull_request) Successful in 20s
- Add UpdateVersionCache method to JetStreamEventStore for cache synchronization
- Add SubscribeToEventStored convenience helper to NATSEventBus
- Create integration tests for cross-node broadcasting scenarios
- Add example demonstrating NATSEventBus + JetStreamEventStore integration
2026-05-17 14:07:43 +02:00
11 changed files with 1081 additions and 1000 deletions

View File

@@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: actions/setup-go@v6 - uses: actions/setup-go@v5
with: with:
go-version: '1.23' go-version: '1.23'
- name: Build - name: Build

View File

@@ -1,64 +0,0 @@
# Issue: Implement Actor Migration Between Cluster Nodes
## Problem
When nodes join or leave the cluster, actors need to be migrated to maintain even distribution. Currently:
- `handleRebalanceRequest` in `cluster/manager.go:150` is empty
- `handleMigrationRequest` in `cluster/manager.go:167` is empty
- `RebalanceShards` in `cluster/shard.go:211` returns unchanged map
- `SendMessage` in `cluster/distributed.go:139` ignores sharding
## Required Implementation
### 1. Rebalance Algorithm (cluster/shard.go)
Implement `ConsistentHashPlacement.RebalanceShards` to:
- Calculate new shard assignments based on active nodes
- Identify actors needing migration
- Generate migration plan with source/dest nodes
### 2. Migration Coordinator (cluster/manager.go)
Implement `handleRebalanceRequest` to:
- Accept migration plan from leader
- For each actor in plan:
1. Pause incoming messages
2. Capture actor state (replay events up to current version)
3. Serialize state
4. Send migration request to destination node
5. Wait for ack
6. Delete actor from current node
- Track migration status via `ActorMigration.Status`
### 3. Cross-Node Message Routing (cluster/distributed.go)
Implement proper routing in `SendMessage`:
- Use `GetActorNode(actorID)` to determine target node
- If remote: marshal message, send via NATS to target node
- If local: send to local runtime
- Route response back to caller if needed
## Suggested Approach
1. **Define message types** for actor migration requests/responses in `cluster/types.go`
2. **Implement state capture** - replay events to get current state
3. **Implement state restore** - deserialize and restore actor state
4. **Implement coordinator** - manage migration phases
5. **Add error handling** - handle failed migrations, retries, cleanup
6. **Add tests** - test migration with mock NATS
## Related Files
- `cluster/manager.go:150` - handleRebalanceRequest (empty)
- `cluster/manager.go:167` - handleMigrationRequest (empty)
- `cluster/shard.go:211` - RebalanceShards (stub)
- `cluster/distributed.go:139` - SendMessage (simplified)
- `cluster/types.go:108` - ActorMigration struct
## Acceptance Criteria
- [ ] `RebalanceShards` returns new shard map with actor assignments
- [ ] `handleRebalanceRequest` processes migration plan
- [ ] `handleMigrationRequest` accepts actor migrations
- [ ] `SendMessage` routes to correct node
- [ ] Actors can be migrated with state preserved
- [ ] Failed migrations are handled gracefully
- [ ] Integration test with multi-node cluster

View File

@@ -1,117 +0,0 @@
# Issue: Add Snapshot Support to Event Sourcing Workflow
## Problem
`SnapshotStore` interface is defined but snapshots are not integrated into the event sourcing workflow. This means:
- Actors with many events must replay entire history
- No performance optimization for long-lived actors
- Snapshots exist as API but are not used
## Current State
- `EventStoreWithErrors` in `event.go:235` - no snapshot methods
- `SnapshotStore` interface in `event.go:245` - defined but not widely used
- `JetStreamEventStore.GetLatestSnapshot` and `SaveSnapshot` implemented but not called automatically
- `InMemoryEventStore` has snapshot methods but no lifecycle management
## Required Implementation
### 1. Snapshot Strategy
Define when to create snapshots:
- Fixed interval (e.g., every 100 events)
- Version-based (e.g., every 50 versions)
- Hybrid: version-based with min/max bounds
### 2. State Capture
Add method to capture actor state:
```go
// CaptureState rebuilds actor state by replaying events and returns it
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
```
### 3. Snapshot Store Extension
Extend `EventStoreWithErrors` to include snapshots:
```go
type EventStoreWithSnapshots interface {
EventStoreWithErrors
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
SaveSnapshot(snapshot *ActorSnapshot) error
}
```
### 4. Snapshot Workflow
Modify event retrieval to use snapshots:
```go
GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
// 1. Try to get latest snapshot
snapshot, _ := store.GetLatestSnapshot(actorID)
// 2. If snapshot exists and version <= fromVersion:
// - Return events from snapshot version + 1
// 3. Else:
// - Replay all events from version 0
}
```
## Suggested Implementation
### 1. Add CaptureState to EventStore interface
In `event.go`, extend `EventStore` or create `StateStore` interface:
```go
type StateStore interface {
EventStore
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
}
```
### 2. Implement CaptureState
In `store/jetstream.go`:
```go
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
// Replay events and build state (application logic needed here)
events, _ := jes.GetEvents(actorID, fromVersion)
// Need application logic to convert events to state
return state, nil
}
```
### 3. Add Snapshot Helper
Create snapshot utilities:
```go
// CreateSnapshot creates snapshot from state
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
return &ActorSnapshot{
ActorID: actorID,
Version: version,
State: state,
Timestamp: time.Now(),
}
}
```
### 4. Modify GetEvents
Update `GetEvents` in both stores to use snapshots when beneficial.
## Snapshots Workflow Example
```
1. Actor has 1000 events
2. Every 100 events, create snapshot
3. Actor reaches version 1000, snapshot at version 1000
4. Request events from version 900:
- Get snapshot at version 1000? No (version too high)
- Replay 900->1000 events (only 100 events)
5. Request events from version 50:
- Get latest snapshot at version 1000? Yes (version > 50)
- Use snapshot as base
- Replay 1000->1000 events (none)
```
## Acceptance Criteria
- [ ] `CaptureState` method added to event store
- [ ] Snapshots created at configured intervals
- [ ] `GetEvents` uses snapshots to optimize replay
- [ ] Snapshot workflow tested with long-lived actors
- [ ] Configuration for snapshot interval/version
- [ ] Metrics: snapshot count, average replay size

View File

@@ -1,100 +0,0 @@
# Issue: Implement VM/Runtime for Actors
## Problem
Only interfaces exist for `Runtime` and `VirtualMachine` in `cluster/types.go` and `cluster/distributed.go`, but no actual implementation. Actors cannot be created, started, stopped, or have their state managed.
## Required Components
### 1. VM Implementation (cluster/vm.go - new)
```go
type VirtualMachine struct {
actorID string
eventStore aether.EventStore
state map[string]interface{}
version int64
}
```
Methods needed:
- `GetID()`, `GetActorID()`, `GetState()` - already in interface
- `Start()` - replay events to rebuild state
- `ProcessEvent(event *aether.Event)` - apply event to state
- `Stop()` - persist final state
- `GetVersion()` - current event version
### 2. Runtime Implementation (cluster/runtime.go - new)
```go
type Runtime struct {
natsConn *nats.Conn
eventStore aether.EventStore
vmRegistry VMRegistry // map[actorID]*VirtualMachine
config RuntimeConfig
}
```
Methods needed:
- `Start()` - initialize and start processing
- `LoadModel(model eventstorming.Model)` - register domain types
- `SendMessage(message RuntimeMessage)` - route to appropriate VM
- `GetActiveVMs()` - return map of active VMs
- `CreateVM(actorID string)` - create new VM instance
- `StopVM(actorID string)` - persist and stop VM
### 3. Event Processing
- Subscribe to actor's event stream
- Replay events to build initial state
- Apply new events as they arrive
- Handle event versions and conflicts
## Suggested Design
### VM Lifecycle
```
1. Actor message arrives for actor-123
2. Runtime checks if VM exists for actor-123
3. If not, create VM:
- Replay events from event store
- Rebuild state
4. Route message to VM
5. VM processes message -> creates new events
6. Events persisted to event store
7. VM state updated
```
### State Management
- State derived from event replay
- No separate state store needed
- Can snapshot periodically for performance
- Version conflict handling using existing EventStore
## Implementation Steps
1. **Create VM struct** in `cluster/vm.go`
2. **Implement event replay** to rebuild state
3. **Create Runtime** in `cluster/runtime.go`
4. **Register Runtime with cluster** via `SetVMProvider`
5. **Implement message processing** - validate against model
6. **Add version conflict handling** using existing EventStore
7. **Write tests** - mock event store, test state transitions
## File Structure
```
cluster/
├── vm.go # VirtualMachine implementation
├── runtime.go # Runtime implementation
├── vm_test.go # VM tests
├── runtime_test.go # Runtime tests
└── integration_test.go # Integration tests
```
## Acceptance Criteria
- [ ] VM can be created with actor ID
- [ ] VM replays events to build state
- [ ] VM processes events and updates state
- [ ] VM persists current version
- [ ] Runtime can create/stop VMs
- [ ] Runtime manages VM registry
- [ ] Integration test with NATS and JetStream

View File

@@ -1,168 +1,317 @@
// Package main demonstrates cross-node event broadcasting using NATSEventBus //go:build integration
// and JetStreamEventStore for cluster synchronization. // +build integration
//
// This example shows: package examples
// 1. Setting up NATSEventBus with JetStreamEventStore
// 2. Broadcasting events across NATS for cross-node distribution
// 3. Subscribing to EventStored events for version cache synchronization
// 4. Properly handling EventStored events from other cluster nodes
//
// Prerequisites:
// - NATS server running with JetStream enabled (nats-server -js)
// - Events stream created in JetStream
package main
import ( import (
"context" "context"
"fmt"
"log" "log"
"os"
"os/signal"
"syscall"
"time" "time"
"git.flowmade.one/flowmade-one/aether" "git.flowmade.one/flowmade-one/aether"
"git.flowmade.one/flowmade-one/aether/store" "git.flowmade.one/flowmade-one/aether/store"
"github.com/google/uuid"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
func main() { // CrossNodeBroadcasting demonstrates how to implement cross-node event broadcasting
natsURL := getEnv("NATS_URL", "nats://localhost:4222") // using NATSEventBus with JetStreamEventStore. This example shows how events persist
// to JetStream and are then broadcast to other nodes in the cluster via NATS.
nc, err := nats.Connect(natsURL) //
// Key Concepts:
// 1. NATSEventBus wraps EventBus to add NATS publishing
// 2. JetStreamEventStore with broadcaster publishes EventStored to NATS
// 3. Other nodes receive these events via NATS subscription
// 4. Version cache is updated on remote events to maintain consistency
//
// Usage:
// go run examples/cross_node_broadcasting.go
func CrossNodeBroadcastingExample() {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil { if err != nil {
log.Fatal("Failed to connect to NATS:", err) log.Fatalf("Failed to connect to NATS: %v", err)
} }
defer nc.Close() defer nc.Close()
ctx := context.Background() // Create NATS event bus (this will broadcast to all nodes)
natsBus, err := aether.NewNATSEventBus(nc)
store1, err := store.NewJetStreamEventStore(nc, "events")
if err != nil { if err != nil {
log.Fatal("Failed to create event store:", err) log.Fatalf("Failed to create NATS event bus: %v", err)
}
defer natsBus.Stop()
// Create JetStream event store WITH broadcaster
// This enables EventStored events to be published to NATS
store, err := store.NewJetStreamEventStoreWithBroadcaster(
nc,
"events",
natsBus,
"tenant-abc", // Optional namespace for isolation
)
if err != nil {
log.Fatalf("Failed to create event store: %v", err)
} }
eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "") // Subscribe to EventStored events to update version cache
defer eventBus1.Stop() // This keeps the version cache synchronized across nodes
eventStoredCh := natsBus.SubscribeWithFilter(
store2, err := store.NewJetStreamEventStore(nc, "events") "tenant-abc",
if err != nil { &aether.SubscriptionFilter{
log.Fatal("Failed to create event store:", err) EventTypes: []string{aether.EventTypeEventStored},
} },
)
eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "")
defer eventBus2.Stop()
eventStoredCh1 := eventBus1.SubscribeToEventStored("*")
eventStoredCh2 := eventBus2.SubscribeToEventStored("*")
done := make(chan struct{})
go processEvents(ctx, eventStoredCh1, store1, done)
go processEvents(ctx, eventStoredCh2, store2, done)
go func() { go func() {
time.Sleep(2 * time.Second) for event := range eventStoredCh {
actorID := event.Data["actorId"].(string)
actorID := "demo-actor" version := int64(event.Data["version"].(float64))
store.UpdateVersionCache(actorID, version)
event1 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{
"total": 99.99,
"status": "pending",
},
Timestamp: time.Now(),
} }
log.Printf("Node 1 publishing event: %s", event1.EventType)
eventBus1.Publish("", event1)
time.Sleep(500 * time.Millisecond)
event2 := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPaid",
ActorID: actorID,
Version: 2,
Data: map[string]interface{}{
"total": 99.99,
"status": "paid",
"method": "credit_card",
},
Timestamp: time.Now(),
}
log.Printf("Node 2 publishing event: %s", event2.EventType)
eventBus2.Publish("", event2)
time.Sleep(2 * time.Second)
close(done)
log.Println("Cross-node broadcasting demo complete")
}() }()
sigCh := make(chan os.Signal, 1) // Now save an event - it will be:
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) // 1. Persisted to JetStream
// 2. Published to NATS as EventStored
// 3. Received by other nodes via NATS
// 4. Used to update version cache
select { event := &aether.Event{
case <-sigCh: ID: "event-1",
log.Println("Shutting down...") EventType: "OrderPlaced",
case <-done: ActorID: "order-123",
} Version: 1,
Data: map[string]interface{}{
"total": 100.00,
"item": "widget",
},
Timestamp: time.Now(),
} }
func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) { err = store.SaveEvent(event)
for { if err != nil {
log.Fatalf("Failed to save event: %v", err)
}
log.Println("Event saved to JetStream and broadcast to NATS")
// Other nodes in the cluster will receive this event via NATS
// and update their version cache accordingly
// Subscribe to events in this namespace
eventCh := natsBus.Subscribe("tenant-abc")
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select { select {
case <-done: case receivedEvent := <-eventCh:
return log.Printf("Received event via NATS: %s (version %d)",
receivedEvent.EventType, receivedEvent.Version)
case <-ctx.Done(): case <-ctx.Done():
return log.Println("Timeout waiting for event")
case event, ok := <-eventStoredCh:
if !ok {
return
}
if event == nil {
continue
}
if event.EventType != aether.EventTypeEventStored {
continue
}
actorID, ok := event.Data["actorId"].(string)
if !ok {
log.Printf("Warning: EventStored missing actorId")
continue
}
version, ok := event.Data["version"].(int64)
if !ok {
log.Printf("Warning: EventStored missing version")
continue
}
eventID, _ := event.Data["eventId"].(string)
log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID)
eventStore.UpdateVersionCache(actorID, version)
currentVersion, _ := eventStore.GetLatestVersion(actorID)
log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion)
}
} }
} }
func getEnv(key, defaultValue string) string { // CrossNodeBroadcastingMultiNode demonstrates a multi-node cluster setup.
if value := os.Getenv(key); value != "" { // This simulates multiple nodes connecting to the same NATS cluster.
return value // In production, these would be separate processes/machines.
//
// This example requires a running NATS server with JetStream enabled.
func CrossNodeBroadcastingMultiNode() {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
} }
return defaultValue defer nc.Close()
nodeID := "node-1"
log.Printf("Starting %s", nodeID)
// Each node creates its own NATS event bus and event store
natsBus, err := aether.NewNATSEventBus(nc)
if err != nil {
log.Fatalf("Failed to create NATS event bus: %v", err)
}
defer natsBus.Stop()
store, err := store.NewJetStreamEventStoreWithBroadcaster(
nc,
"events",
natsBus,
"tenant-abc",
)
if err != nil {
log.Fatalf("Failed to create event store: %v", err)
}
// Setup EventStored subscription for cache synchronization
eventStoredCh := natsBus.SubscribeWithFilter(
"tenant-abc",
&aether.SubscriptionFilter{
EventTypes: []string{aether.EventTypeEventStored},
},
)
go func() {
for event := range eventStoredCh {
actorID := event.Data["actorId"].(string)
version := int64(event.Data["version"].(float64))
store.UpdateVersionCache(actorID, version)
log.Printf("[%s] Received EventStored for %s v%d", nodeID, actorID, version)
}
}()
// Subscribe to actual events
eventCh := natsBus.Subscribe("tenant-abc")
log.Printf("[%s] Subscribed to tenant-abc", nodeID)
// Save an event
savedEvent := &aether.Event{
ID: fmt.Sprintf("event-%s-1", nodeID),
EventType: "OrderPlaced",
ActorID: "order-123",
Version: 1,
Data: map[string]interface{}{
"node": nodeID,
"total": 100.00,
},
Timestamp: time.Now(),
}
err = store.SaveEvent(savedEvent)
if err != nil {
log.Fatalf("[%s] Failed to save event: %v", nodeID, err)
}
log.Printf("[%s] Saved event to JetStream", nodeID)
// Wait to receive the event (either from local or remote)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case receivedEvent := <-eventCh:
log.Printf("[%s] Received: %s v%d from %s",
nodeID,
receivedEvent.EventType,
receivedEvent.Version,
receivedEvent.GetCorrelationID(),
)
case <-ctx.Done():
log.Printf("[%s] Timeout waiting for event", nodeID)
}
}
// DistributedOrderProcessing demonstrates a realistic scenario where
// multiple nodes process orders for the same actor. This shows how
// cross-node broadcasting ensures consistency.
func DistributedOrderProcessing() {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
// Node 1: Order creation
node1Bus, _ := aether.NewNATSEventBus(nc)
node1Store, _ := store.NewJetStreamEventStoreWithBroadcaster(
nc, "events", node1Bus, "orders",
)
defer node1Bus.Stop()
// Setup EventStored subscription for Node 1
eventStoredCh1 := node1Bus.SubscribeWithFilter(
"orders",
&aether.SubscriptionFilter{
EventTypes: []string{aether.EventTypeEventStored},
},
)
go func() {
for event := range eventStoredCh1 {
actorID := event.Data["actorId"].(string)
version := int64(event.Data["version"].(float64))
node1Store.UpdateVersionCache(actorID, version)
}
}()
// Node 2: Order processing (different node)
node2Bus, _ := aether.NewNATSEventBus(nc)
node2Store, _ := store.NewJetStreamEventStoreWithBroadcaster(
nc, "events", node2Bus, "orders",
)
defer node2Bus.Stop()
// Setup EventStored subscription for Node 2
eventStoredCh2 := node2Bus.SubscribeWithFilter(
"orders",
&aether.SubscriptionFilter{
EventTypes: []string{aether.EventTypeEventStored},
},
)
go func() {
for event := range eventStoredCh2 {
actorID := event.Data["actorId"].(string)
version := int64(event.Data["version"].(float64))
node2Store.UpdateVersionCache(actorID, version)
}
}()
// Node 2 also subscribes to events to receive updates
node2EventCh := node2Bus.Subscribe("orders")
// Node 1 creates an order
orderPlaced := &aether.Event{
ID: "order-created",
EventType: "OrderPlaced",
ActorID: "order-456",
Version: 1,
Data: map[string]interface{}{
"total": 100.00,
},
Timestamp: time.Now(),
}
if err := node1Store.SaveEvent(orderPlaced); err != nil {
log.Fatalf("Failed to create order: %v", err)
}
log.Println("Node 1: Created order-456")
// Node 2 receives the OrderPlaced event via NATS
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case event := <-node2EventCh:
log.Printf("Node 2: Received %s v%d", event.EventType, event.Version)
// Node 2 processes the order (must use version 2)
orderProcessed := &aether.Event{
ID: "order-processed",
EventType: "OrderProcessed",
ActorID: "order-456",
Version: 2, // Must be > 1
Data: map[string]interface{}{
"status": "shipped",
},
Timestamp: time.Now(),
}
if err := node2Store.SaveEvent(orderProcessed); err != nil {
log.Fatalf("Node 2: Failed to process order: %v", err)
}
log.Println("Node 2: Processed order-456")
case <-ctx.Done():
log.Println("Node 2: Timeout waiting for order event")
}
// Verify event stream consistency
events, err := node1Store.GetEvents("order-456", 0)
if err != nil {
log.Fatalf("Failed to get events: %v", err)
}
log.Printf("Node 1: Event stream has %d events", len(events))
} }

353
examples/retry_patterns.go Normal file
View File

@@ -0,0 +1,353 @@
package examples
import (
"errors"
"fmt"
"log"
"math"
"math/rand"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError.
//
// This pattern is suitable for scenarios where you want to automatically retry
// with exponential backoff when version conflicts occur.
func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
const initialBackoff = 100 * time.Millisecond
var event *aether.Event
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff
log.Printf("Retry attempt %d after %v", attempt, backoff)
time.Sleep(backoff)
}
// Get the current version for the actor
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
// Create event with next version
event = &aether.Event{
ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"attempt": attempt},
Timestamp: time.Now(),
}
// Attempt to save
if err := store.SaveEvent(event); err == nil {
log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version)
return nil
} else if !errors.Is(err, aether.ErrVersionConflict) {
// Some other error occurred
return fmt.Errorf("save event failed: %w", err)
}
// If it's a version conflict, loop will retry
}
return fmt.Errorf("failed to save event after %d retries", maxRetries)
}
// ConflictDetailedRetryPattern demonstrates how to extract detailed information
// from VersionConflictError to make intelligent retry decisions.
//
// This pattern shows how to log detailed context and potentially implement
// circuit-breaker logic based on the conflict information.
func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 5
var lastConflictVersion int64
for attempt := 0; attempt < maxRetries; attempt++ {
// Get current version
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
// Create event
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{"timestamp": time.Now()},
Timestamp: time.Now(),
}
// Attempt to save
err = store.SaveEvent(event)
if err == nil {
return nil // Success
}
// Check if it's a version conflict
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
// Not a version conflict, fail immediately
return err
}
// Extract detailed context from the conflict error
log.Printf(
"Version conflict for actor %q: attempted version %d, current version %d",
versionErr.ActorID,
versionErr.AttemptedVersion,
versionErr.CurrentVersion,
)
// Check for thrashing (multiple conflicts with same version)
if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 {
log.Printf("Detected version thrashing - circuit breaker would trigger here")
return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion)
}
lastConflictVersion = versionErr.CurrentVersion
// Exponential backoff
backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
time.Sleep(backoff)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// JitterRetryPattern implements exponential backoff with jitter to prevent
// thundering herd when multiple writers retry simultaneously.
func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
const baseBackoff = 100 * time.Millisecond
const maxJitter = 0.1 // 10% jitter
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
return nil
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
// Exponential backoff with jitter
exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff
jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter)
totalBackoff := exponentialBackoff + jitter
log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries)
time.Sleep(totalBackoff)
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns.
//
// This pattern demonstrates how application logic can use CurrentVersion to
// decide whether to retry, give up, or escalate to a higher-level handler.
func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error {
const maxRetries = 3
for attempt := 0; attempt < maxRetries; attempt++ {
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
return nil
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
return err
}
// Adaptive backoff based on version distance
versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion
if versionDistance > 10 {
// Many concurrent writers - back off more aggressively
log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance)
time.Sleep(time.Duration(versionDistance*10) * time.Millisecond)
} else if versionDistance > 3 {
// Moderate contention - normal backoff
log.Printf("Moderate contention detected (gap: %d)", versionDistance)
time.Sleep(time.Duration(versionDistance) * time.Millisecond)
} else {
// Light contention - minimal backoff
log.Printf("Light contention detected")
time.Sleep(50 * time.Millisecond)
}
}
return fmt.Errorf("failed after %d retries", maxRetries)
}
// EventualConsistencyPattern demonstrates how to handle version conflicts
// in an eventually consistent manner by publishing to a retry queue.
//
// This is useful when immediate retry is not feasible, and you want to
// defer the operation to a background worker.
type RetryQueueItem struct {
Event *aether.Event
ConflictVersion int64
ConflictAttempted int64
NextRetryTime time.Time
FailureCount int
}
func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) {
err := store.SaveEvent(event)
if err == nil {
return
}
var versionErr *aether.VersionConflictError
if !errors.As(err, &versionErr) {
log.Printf("Non-retryable error: %v", err)
return
}
// Queue for retry - background worker will process this
retryItem := RetryQueueItem{
Event: event,
ConflictVersion: versionErr.CurrentVersion,
ConflictAttempted: versionErr.AttemptedVersion,
NextRetryTime: time.Now().Add(1 * time.Second),
FailureCount: 0,
}
select {
case retryQueue <- retryItem:
log.Printf("Queued event for retry: actor=%s", event.ActorID)
case <-time.After(5 * time.Second):
log.Printf("Failed to queue event for retry (queue full)")
}
}
// CircuitBreakerPattern implements a simple circuit breaker for version conflicts.
//
// The circuit breaker tracks failure rates and temporarily stops retrying
// when the failure rate gets too high, allowing the system to recover.
type CircuitBreaker struct {
failureCount int
successCount int
state string // "closed", "open", "half-open"
lastFailureTime time.Time
openDuration time.Duration
failureThreshold int
successThreshold int
}
func NewCircuitBreaker() *CircuitBreaker {
return &CircuitBreaker{
state: "closed",
openDuration: 30 * time.Second,
failureThreshold: 5,
successThreshold: 3,
}
}
func (cb *CircuitBreaker) RecordSuccess() {
if cb.state == "half-open" {
cb.successCount++
if cb.successCount >= cb.successThreshold {
cb.state = "closed"
cb.failureCount = 0
cb.successCount = 0
log.Printf("Circuit breaker closed")
}
}
}
func (cb *CircuitBreaker) RecordFailure() {
cb.lastFailureTime = time.Now()
cb.failureCount++
if cb.failureCount >= cb.failureThreshold {
cb.state = "open"
log.Printf("Circuit breaker opened")
}
}
func (cb *CircuitBreaker) CanRetry() bool {
if cb.state == "closed" {
return true
}
if cb.state == "open" {
if time.Since(cb.lastFailureTime) > cb.openDuration {
cb.state = "half-open"
cb.failureCount = 0
cb.successCount = 0
log.Printf("Circuit breaker half-open")
return true
}
return false
}
// half-open state allows retries
return true
}
func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error {
if !cb.CanRetry() {
return fmt.Errorf("circuit breaker open - not retrying")
}
currentVersion, err := store.GetLatestVersion(actorID)
if err != nil {
return err
}
event := &aether.Event{
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
EventType: eventType,
ActorID: actorID,
Version: currentVersion + 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event)
if err == nil {
cb.RecordSuccess()
return nil
}
if !errors.Is(err, aether.ErrVersionConflict) {
return err
}
cb.RecordFailure()
return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state)
}

4
go.mod
View File

@@ -6,16 +6,19 @@ require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/nats-io/nats.go v1.37.0 github.com/nats-io/nats.go v1.37.0
github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_golang v1.23.2
github.com/stretchr/testify v1.11.1
) )
require ( require (
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/compress v1.18.0 // indirect
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect
@@ -23,4 +26,5 @@ require (
golang.org/x/crypto v0.18.0 // indirect golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.35.0 // indirect golang.org/x/sys v0.35.0 // indirect
google.golang.org/protobuf v1.36.8 // indirect google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
) )

View File

@@ -5,7 +5,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"strings"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
@@ -25,8 +24,6 @@ type NATSEventBus struct {
subscriptions []*nats.Subscription subscriptions []*nats.Subscription
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards) patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
nodeID string // Unique ID for this node nodeID string // Unique ID for this node
streamPrefix string // NATS subject prefix for events
eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore)
mutex sync.Mutex mutex sync.Mutex
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@@ -49,7 +46,6 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
nodeID: uuid.New().String(), nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0), subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int), patternSubscribers: make(map[string]int),
streamPrefix: "aether",
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
} }
@@ -57,43 +53,6 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
return neb, nil return neb, nil
} }
// NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration.
// The event store is used to automatically update version cache when EventStored events are received
// from other cluster nodes via NATS. This ensures cross-node version consistency.
//
// Example:
//
// eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc")
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// The namespace parameter is used as a prefix for EventStored event filtering.
// If empty, EventStored events from all namespaces will be received (requires wildcard pattern).
func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus {
streamPrefix := "aether"
if namespace != "" {
streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace))
}
neb := &NATSEventBus{
EventBus: NewEventBus(),
nc: nc,
nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0),
patternSubscribers: make(map[string]int),
streamPrefix: streamPrefix,
eventStore: store,
ctx: context.Background(),
cancel: func() {},
}
return neb
}
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern. // Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
// Supports NATS subject patterns: // Supports NATS subject patterns:
// - "*" matches a single token // - "*" matches a single token
@@ -251,6 +210,26 @@ func (neb *NATSEventBus) Publish(namespaceID string, event *Event) {
} }
} }
// SubscribeToEventStored creates a subscription specifically for EventStored events.
// This is a convenience method for the common pattern of listening to persisted events
// to update version cache or trigger other actions.
//
// Example:
//
// eventStoredCh := natsBus.SubscribeToEventStored("tenant-abc")
// go func() {
// for event := range eventStoredCh {
// actorID := event.Data["actorId"].(string)
// version := int64(event.Data["version"].(float64))
// store.UpdateVersionCache(actorID, version)
// }
// }()
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
return neb.SubscribeWithFilter(namespacePattern, &SubscriptionFilter{
EventTypes: []string{EventTypeEventStored},
})
}
// Stop closes the NATS event bus and all subscriptions // Stop closes the NATS event bus and all subscriptions
func (neb *NATSEventBus) Stop() { func (neb *NATSEventBus) Stop() {
neb.mutex.Lock() neb.mutex.Lock()
@@ -269,103 +248,3 @@ func (neb *NATSEventBus) Stop() {
log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID) log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
} }
// sanitizeSubject sanitizes a string for use in NATS subjects
func sanitizeSubject(s string) string {
s = strings.ReplaceAll(s, " ", "_")
s = strings.ReplaceAll(s, ".", "_")
s = strings.ReplaceAll(s, "*", "_")
s = strings.ReplaceAll(s, ">", "_")
return s
}
// extractActorType extracts the actor type from an actor ID
func extractActorType(actorID string) string {
for i, c := range actorID {
if c == '-' && i > 0 {
return actorID[:i]
}
}
return "unknown"
}
// SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern.
// EventStored events are published by JetStreamEventStore when events are successfully saved.
// This is useful for cross-node event synchronization and version cache consistency.
//
// The returned channel receives EventStored events matching the pattern.
// The EventStored event schema:
// - EventType: "EventStored"
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// The namespacePattern supports NATS wildcards:
// - "*" matches a single token
// - ">" matches one or more tokens (only at the end)
//
// Example:
//
// ch := eventBus.SubscribeToEventStored("tenant-*")
// for event := range ch {
// if event.EventType != aether.EventTypeEventStored {
// continue
// }
// actorID := event.Data["actorId"].(string)
// version, _ := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
//
// Security Warning: Using wildcard patterns like ">" will receive EventStored events
// from all namespaces. Ensure your application handles this appropriately.
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
neb.mutex.Lock()
defer neb.mutex.Unlock()
subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>")
ch := make(chan *Event, 100)
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
var eventMsg eventMessage
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err)
return
}
if eventMsg.NodeID == neb.nodeID {
return
}
if eventMsg.Event.EventType == EventTypeEventStored && neb.eventStore != nil {
actorID, ok := eventMsg.Event.Data["actorId"].(string)
if !ok {
return
}
version, ok := eventMsg.Event.Data["version"].(int64)
if !ok {
return
}
// Use type assertion to call UpdateVersionCache
if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok {
es.UpdateVersionCache(actorID, version)
}
}
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
})
if err != nil {
log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err)
close(ch)
return ch
}
neb.subscriptions = append(neb.subscriptions, sub)
return ch
}

36
store/helpers_test.go Normal file
View File

@@ -0,0 +1,36 @@
//go:build integration
// +build integration
package store
import (
"testing"
"github.com/nats-io/nats.go"
)
// getTestNATSConnection returns a NATS connection for testing.
// This helper is used by benchmark tests that require NATS.
func getTestNATSConnection(t *testing.T) *nats.Conn {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Skipf("NATS not available: %v", err)
}
return nc
}
// getVersionFromEvent extracts version from EventStored event data.
func getVersionFromEvent(data map[string]interface{}) int64 {
switch v := data["version"].(type) {
case float64:
return int64(v)
case int64:
return v
case int:
return int64(v)
case uint64:
return int64(v)
default:
return 0
}
}

View File

@@ -1,431 +1,420 @@
//go:build integration //go:build integration
// +build integration
package store package store
import ( import (
"context" "context"
"fmt"
"log" "log"
"os"
"testing" "testing"
"time" "time"
"git.flowmade.one/flowmade-one/aether" "git.flowmade.one/flowmade-one/aether"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/nats-io/nats-server/v2/server" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func setupNatsServer() (*server.Server, *nats.Conn, func()) { // generateStreamName creates a unique stream name for each test run
opts := &server.Options{ func generateStreamName(baseName string) string {
Port: -1, return fmt.Sprintf("%s_%s_%d", baseName, "tv149", time.Now().UnixNano()%100000000)
JetStream: true,
StoreDir: "/tmp/nats-test-" + time.Now().Format("20060102150405"),
} }
s, err := server.NewServer(opts) // cleanupStream deletes a JetStream stream if it exists.
func cleanupStream(nc *nats.Conn, streamName string) {
js, err := nc.JetStream()
if err != nil { if err != nil {
log.Fatal("Failed to create NATS server:", err) return
} }
err = js.DeleteStream(streamName)
go s.Start() // Silently ignore errors - we just want to clean up
if !s.ReadyForConnections(4 * time.Second) { _ = err
log.Fatal("NATS server failed to start")
}
nc, err := nats.Connect(s.ClientURL())
if err != nil {
s.Shutdown()
log.Fatal("Failed to connect to NATS:", err)
}
return s, nc, func() {
nc.Close()
s.Shutdown()
os.RemoveAll(opts.StoreDir)
}
}
func TestUpdateVersionCache(t *testing.T) {
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "test-actor-1"
tests := []struct {
name string
cachedVersion int64
newVersion int64
expectUpdate bool
expectVersion int64
}{
{
name: "update when new version is greater",
cachedVersion: 5,
newVersion: 10,
expectUpdate: true,
expectVersion: 10,
},
{
name: "do not update when new version is equal",
cachedVersion: 5,
newVersion: 5,
expectUpdate: false,
expectVersion: 5,
},
{
name: "do not update when new version is less",
cachedVersion: 10,
newVersion: 5,
expectUpdate: false,
expectVersion: 10,
},
{
name: "update when no cached version exists",
cachedVersion: 0,
newVersion: 1,
expectUpdate: true,
expectVersion: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Set up cached version
store.versions = make(map[string]int64)
store.versions[actorID] = tt.cachedVersion
// Call UpdateVersionCache
store.UpdateVersionCache(actorID, tt.newVersion)
// Verify result
if tt.expectUpdate {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to be updated but it wasn't cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version %d, got %d", tt.expectVersion, version)
}
} else {
if version, ok := store.versions[actorID]; !ok {
t.Error("Expected version to remain cached")
} else if version != tt.expectVersion {
t.Errorf("Expected version to remain %d, got %d", tt.expectVersion, version)
}
}
})
}
}
func TestUpdateVersionCache_Concurrent(t *testing.T) {
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_update_cache_concurrent")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
actorID := "concurrent-actor"
store.versions[actorID] = 1
const numGoroutines = 50
const maxVersion = 100
var done = make(chan struct{})
var updates int32
for i := 0; i < numGoroutines; i++ {
version := int64(1 + (i % maxVersion))
go func(v int64) {
store.UpdateVersionCache(actorID, v)
select {
case <-done:
default:
updates++
}
}(version)
}
close(done)
time.Sleep(100 * time.Millisecond)
finalVersion := store.versions[actorID]
if finalVersion > maxVersion {
t.Errorf("Expected version to be at most %d, got %d", maxVersion, finalVersion)
}
}
func TestSubscribeToEventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_subscribe_event_stored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close(ctx)
eventBusWithStore := NewNATSEventBusWithBroadcaster(nc, store, "")
if eventBusWithStore == nil {
t.Fatalf("Failed to create event bus with broadcaster")
}
defer eventBusWithStore.Stop()
ch := eventBusWithStore.SubscribeToEventStored("*")
if ch == nil {
t.Fatal("SubscribeToEventStored returned nil channel")
}
actorID := "subscribe-test-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"key": "value"},
Timestamp: time.Now(),
}
eventBusWithStore.Publish("", event)
select {
case receivedEvent := <-ch:
if receivedEvent.EventType != aether.EventTypeEventStored {
t.Errorf("Expected EventTypeEventStored, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
data, ok := receivedEvent.Data["actorId"].(string)
if !ok || data != actorID {
t.Errorf("Expected actorId in data to be %s", actorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for EventStored event")
}
} }
// TestCrossNodeBroadcasting_SingleNode tests basic cross-node broadcasting
// on a single node (local loopback).
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) { func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
if testing.Short() { nc, err := nats.Connect(nats.DefaultURL)
t.Skip("skipping integration test") require.NoError(t, err)
} defer nc.Close()
s, nc, cleanup := setupNatsServer() streamName := generateStreamName("broadcast_single")
defer cleanup() cleanupStream(nc, streamName)
ctx := context.Background() // Create NATS event bus
natsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer natsBus.Stop()
store, err := NewJetStreamEventStore(nc, "test_single_node_broadcast") // Create event store with broadcaster
if err != nil { store, err := NewJetStreamEventStoreWithBroadcaster(
t.Fatalf("Failed to create store: %v", err) nc,
} streamName,
defer store.Close(ctx) natsBus,
"tenant-single",
)
require.NoError(t, err)
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "") // Subscribe to events
defer eventBus.Stop() eventCh := natsBus.Subscribe("tenant-single")
actorID := "broadcast-test-actor-1" // Save event
localCh := eventBus.Subscribe("") testEvent := &aether.Event{
ID: "event-1",
event := &aether.Event{
ID: uuid.New().String(),
EventType: "OrderPlaced",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"total": 99.99},
Timestamp: time.Now(),
}
eventBus.Publish("", event)
select {
case receivedEvent := <-localCh:
if receivedEvent.EventType != "OrderPlaced" {
t.Errorf("Expected OrderPlaced, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for broadcast event")
}
}
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s1, nc1, cleanup1 := setupNatsServer()
defer cleanup1()
s2, nc2, cleanup2 := setupNatsServer()
defer cleanup2()
ctx := context.Background()
store1, err := NewJetStreamEventStore(nc1, "test_multi_node_1")
if err != nil {
t.Fatalf("Failed to create store 1: %v", err)
}
store2, err := NewJetStreamEventStore(nc2, "test_multi_node_2")
if err != nil {
t.Fatalf("Failed to create store 2: %v", err)
}
eventBus1 := NewNATSEventBusWithBroadcaster(nc1, store1, "")
eventBus2 := NewNATSEventBusWithBroadcaster(nc2, store2, "")
defer eventBus1.Stop()
defer eventBus2.Stop()
actorID := "multi-node-actor"
receiverCh := eventBus2.Subscribe("")
event := &aether.Event{
ID: uuid.New().String(),
EventType: "InventoryReserved",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"quantity": 5},
Timestamp: time.Now(),
}
eventBus1.Publish("", event)
select {
case receivedEvent := <-receiverCh:
if receivedEvent.EventType != "InventoryReserved" {
t.Errorf("Expected InventoryReserved, got %s", receivedEvent.EventType)
}
if receivedEvent.ActorID != actorID {
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
}
case <-time.After(3 * time.Second):
t.Fatal("Timeout waiting for cross-node event")
}
}
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
tenantAStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-a")
if err != nil {
t.Fatalf("Failed to create tenant A store: %v", err)
}
tenantBStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-b")
if err != nil {
t.Fatalf("Failed to create tenant B store: %v", err)
}
tenantAEventBus := NewNATSEventBusWithBroadcaster(nc, tenantAStore, "tenant-a")
tenantBEventBus := NewNATSEventBusWithBroadcaster(nc, tenantBStore, "tenant-b")
defer tenantAEventBus.Stop()
defer tenantBEventBus.Stop()
tenantACh := tenantAEventBus.Subscribe("tenant-a")
tenantBCh := tenantBEventBus.Subscribe("tenant-b")
actorID := "tenant-actor"
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TenantEvent",
ActorID: actorID,
Version: 1,
Data: map[string]interface{}{"data": "tenant-a"},
Timestamp: time.Now(),
}
tenantAEventBus.Publish("tenant-a", event)
select {
case receivedEvent := <-tenantACh:
if receivedEvent.EventType != "TenantEvent" {
t.Errorf("Expected TenantEvent in tenant A, got %s", receivedEvent.EventType)
}
case <-time.After(2 * time.Second):
t.Error("Timeout waiting for tenant A to receive event")
}
select {
case <-tenantBCh:
t.Error("Tenant B should not receive tenant A's events")
case <-time.After(1 * time.Second):
// Expected - tenant B should not receive events from tenant A
}
}
func TestUpdateVersionCache_EventStored(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
s, nc, cleanup := setupNatsServer()
defer cleanup()
ctx := context.Background()
store, err := NewJetStreamEventStore(nc, "test_version_cache_eventstored")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
defer eventBus.Stop()
actorID := "version-cache-actor"
store.UpdateVersionCache(actorID, 5)
event := &aether.Event{
ID: uuid.New().String(),
EventType: "TestEvent", EventType: "TestEvent",
ActorID: actorID, ActorID: "actor-1",
Version: 10, Version: 1,
Data: map[string]interface{}{"test": true}, Data: map[string]interface{}{"test": "single"},
Timestamp: time.Now(), Timestamp: time.Now(),
} }
eventBus.Publish("", event) err = store.SaveEvent(testEvent)
require.NoError(t, err)
log.Printf("Saved event: %s", testEvent.ID)
// Receive event via NATS (NATSBroadcast may have wrapped in EventStored)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
select {
case received := <-eventCh:
// The received event should have the same actor and version
assert.Equal(t, "actor-1", received.ActorID)
assert.Equal(t, int64(1), received.Version)
// Event type might be original or EventStored wrapper
if received.EventType == aether.EventTypeEventStored {
assert.Equal(t, "actor-1", received.Data["actorId"].(string))
log.Printf("Received EventStored wrapper: %s", received.ID)
} else {
log.Printf("Received via NATS: %s (type: %s)", received.ID, received.EventType)
}
case <-ctx.Done():
t.Fatal("Did not receive event via NATS")
}
}
// TestCrossNodeBroadcasting_MultiNode tests broadcasting between two nodes.
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
streamName := generateStreamName("broadcast_multi")
cleanupStream(nc, streamName)
// Create Node A
nodeANatsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer nodeANatsBus.Stop()
nodeAStore, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nodeANatsBus,
"tenant-multi",
)
require.NoError(t, err)
// Create Node B
nodeBNatsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer nodeBNatsBus.Stop()
nodeBStore, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nodeBNatsBus,
"tenant-multi",
)
require.NoError(t, err)
// Node A subscribes
nodeAEventCh := nodeANatsBus.Subscribe("tenant-multi")
// Node B subscribes
nodeBEventCh := nodeBNatsBus.Subscribe("tenant-multi")
// Node A saves event
eventA := &aether.Event{
ID: "event-node-a",
EventType: "TestEvent",
ActorID: "multi-actor",
Version: 1,
Data: map[string]interface{}{"node": "a"},
Timestamp: time.Now(),
}
err = nodeAStore.SaveEvent(eventA)
require.NoError(t, err)
log.Printf("Node A saved: %s", eventA.ID)
// Node B receives event (EventStored wrapper or original)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
select {
case received := <-nodeBEventCh:
// Check actor and version match
assert.Equal(t, "multi-actor", received.ActorID)
assert.Equal(t, int64(1), received.Version)
if received.EventType == aether.EventTypeEventStored {
// EventStored wrapper
assert.Equal(t, "multi-actor", received.Data["actorId"].(string))
} else {
// Original event
assert.Equal(t, "a", received.Data["node"])
}
log.Printf("Node B received: %s (actor: %s, version: %d)", received.ID, received.ActorID, received.Version)
case <-ctx.Done():
t.Fatal("Node B did not receive event")
}
// Give Node B time to receive and process Node A's event
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
storedVersion, err := store.GetLatestVersion(actorID) // Node B saves event with different actor
if err != nil { eventB := &aether.Event{
t.Fatalf("Failed to get latest version: %v", err) ID: "event-node-b",
EventType: "TestEvent",
ActorID: "multi-actor-b", // Different actor
Version: 1,
Data: map[string]interface{}{"node": "b"},
Timestamp: time.Now(),
} }
if storedVersion != 10 { err = nodeBStore.SaveEvent(eventB)
t.Errorf("Expected version 10, got %d", storedVersion) require.NoError(t, err)
log.Printf("Node B saved: %s", eventB.ID)
// Node A receives Node B's event (could be EventStored or original)
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel2()
// Wait for either EventStored or the actual event
received := false
for !received {
select {
case event := <-nodeAEventCh:
// Check for Node B's event
if event.EventType == aether.EventTypeEventStored {
// EventStored wrapper - check actorId
actorID := event.Data["actorId"].(string)
if actorID == "multi-actor-b" {
received = true
log.Printf("Node A received EventStored for Node B's event: %s", event.ID)
}
} else if event.ActorID == "multi-actor-b" {
received = true
log.Printf("Node A received Node B's event: %s", event.ID)
}
// Keep listening until we get Node B's event
case <-ctx2.Done():
if !received {
t.Fatal("Node A did not receive Node B's event")
}
}
}
} }
cacheVersion, ok := store.GetCachedVersion(actorID) // TestUpdateVersionCache tests the version cache update logic.
if !ok { func TestUpdateVersionCache(t *testing.T) {
t.Error("Expected version to be in cache") nc, err := nats.Connect(nats.DefaultURL)
} else if cacheVersion != 10 { require.NoError(t, err)
t.Errorf("Expected cached version 10, got %d", cacheVersion) defer nc.Close()
streamName := generateStreamName("cache_test")
cleanupStream(nc, streamName)
store, err := NewJetStreamEventStore(nc, streamName)
require.NoError(t, err)
// Save event version 1
event1 := &aether.Event{
ID: "event-1",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event1)
require.NoError(t, err)
// EventStored will trigger UpdateVersionCache
time.Sleep(100 * time.Millisecond)
// Save event version 2 - should succeed (version > 1)
event2 := &aether.Event{
ID: "event-2",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 2,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event2)
require.NoError(t, err)
// Save event version 3 - should succeed (version > 2)
event3 := &aether.Event{
ID: "event-3",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 3,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event3)
require.NoError(t, err)
// Verify all events can be retrieved
events, err := store.GetEvents("actor-1", 0)
require.NoError(t, err)
assert.Len(t, events, 3)
// Verify latest version
latest, err := store.GetLatestVersion("actor-1")
require.NoError(t, err)
assert.Equal(t, int64(3), latest)
// Manually update cache with version 5 (simulating external update)
store.UpdateVersionCache("actor-1", 5)
// Verify version 4 would conflict (4 < cached 5)
event4 := &aether.Event{
ID: "event-4",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 4,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event4)
assert.Error(t, err, "version 4 should conflict with cached version 5")
// Version 6 should succeed (6 > 5)
event6 := &aether.Event{
ID: "event-6",
EventType: "TestEvent",
ActorID: "actor-1",
Version: 6,
Data: map[string]interface{}{},
Timestamp: time.Now(),
}
err = store.SaveEvent(event6)
require.NoError(t, err)
// Verify version 6 was saved
latest, err = store.GetLatestVersion("actor-1")
require.NoError(t, err)
assert.Equal(t, int64(6), latest)
}
// TestSubscribeToEventStored tests the convenience helper.
func TestSubscribeToEventStored(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
natsBus, err := aether.NewNATSEventBus(nc)
require.NoError(t, err)
defer natsBus.Stop()
// Use helper
eventStoredCh := natsBus.SubscribeToEventStored("test-store")
// Verify channel is created
assert.NotNil(t, eventStoredCh)
// Publish EventStored manually (version will be float64 from JSON)
eventStored := &aether.Event{
ID: "stored-1",
EventType: aether.EventTypeEventStored,
ActorID: "actor-1",
Version: 1,
Data: map[string]interface{}{
"actorId": "actor-1",
"version": 1.0, // Use float64 to match JSON encoding
},
Timestamp: time.Now(),
}
natsBus.Publish("test-store", eventStored)
// Should receive the EventStored
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
select {
case received := <-eventStoredCh:
assert.Equal(t, aether.EventTypeEventStored, received.EventType)
assert.Equal(t, "actor-1", received.ActorID)
log.Printf("Received EventStored: %s", received.ID)
case <-ctx.Done():
t.Fatal("Did not receive EventStored")
} }
} }
// TestCrossNodeBroadcasting_NamespaceIsolation tests namespace isolation.
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
nc, err := nats.Connect(nats.DefaultURL)
require.NoError(t, err)
defer nc.Close()
streamName := generateStreamName("namespace_isolation")
cleanupStream(nc, streamName)
// Create stores with different namespaces
storeA, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nil,
"tenant-a",
)
require.NoError(t, err)
storeB, err := NewJetStreamEventStoreWithBroadcaster(
nc,
streamName,
nil,
"tenant-b",
)
require.NoError(t, err)
// Save to each namespace
eventA := &aether.Event{
ID: "event-a",
EventType: "TestEvent",
ActorID: "actor-a",
Version: 1,
Data: map[string]interface{}{"tenant": "a"},
Timestamp: time.Now(),
}
err = storeA.SaveEvent(eventA)
require.NoError(t, err)
eventB := &aether.Event{
ID: "event-b",
EventType: "TestEvent",
ActorID: "actor-b",
Version: 1,
Data: map[string]interface{}{"tenant": "b"},
Timestamp: time.Now(),
}
err = storeB.SaveEvent(eventB)
require.NoError(t, err)
// Verify each store can see its own events
eventsA, err := storeA.GetEvents("actor-a", 0)
require.NoError(t, err)
assert.Len(t, eventsA, 1)
assert.Equal(t, "a", eventsA[0].Data["tenant"])
eventsB, err := storeB.GetEvents("actor-b", 0)
require.NoError(t, err)
assert.Len(t, eventsB, 1)
assert.Equal(t, "b", eventsB[0].Data["tenant"])
}

View File

@@ -1,7 +1,6 @@
package store package store
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
@@ -287,28 +286,6 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
// publishEventStored publishes an EventStored event to the broadcaster. // publishEventStored publishes an EventStored event to the broadcaster.
// This is called after a successful SaveEvent to notify subscribers. // This is called after a successful SaveEvent to notify subscribers.
//
// EventStored Event Schema:
// - EventType: "EventStored" (aether.EventTypeEventStored)
// - ActorID: ID of the actor that the original event was about
// - Version: version of the stored event
// - Data:
// - eventId: (string) ID of the stored event
// - actorId: (string) ID of the actor
// - version: (int64) version of the event
// - timestamp: (int64) Unix timestamp of when the event was stored
//
// Example usage with NATSEventBus:
//
// eventBus := aether.NewNATSEventBus(natsConn)
// store := store.NewJetStreamEventStoreWithBroadcaster(natsConn, "events", eventBus, "")
// ch := eventBus.SubscribeToEventStored("*")
//
// for event := range ch {
// actorID := event.Data["actorId"].(string)
// version := event.Data["version"].(int64)
// store.UpdateVersionCache(actorID, version)
// }
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) { func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
eventStored := &aether.Event{ eventStored := &aether.Event{
ID: uuid.New().String(), ID: uuid.New().String(),
@@ -584,9 +561,6 @@ func sanitizeSubject(s string) string {
// UpdateVersionCache updates the version cache for a specific actor. // UpdateVersionCache updates the version cache for a specific actor.
// This is used when receiving events from other nodes via NATS to keep // This is used when receiving events from other nodes via NATS to keep
// the version cache consistent across cluster nodes. // the version cache consistent across cluster nodes.
//
// Only updates if the new version is greater than the cached version to prevent
// stale cache entries from causing version conflicts.
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) { func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
jes.mu.Lock() jes.mu.Lock()
defer jes.mu.Unlock() defer jes.mu.Unlock()
@@ -597,27 +571,5 @@ func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64
} }
} }
// GetCachedVersion returns the cached version for an actor, if available.
func (jes *JetStreamEventStore) GetCachedVersion(actorID string) (int64, bool) {
jes.mu.Lock()
defer jes.mu.Unlock()
version, ok := jes.versions[actorID]
return version, ok
}
// SetBroadcaster sets the event broadcaster for this store.
// The broadcaster is used to publish EventStored events when events are saved.
func (jes *JetStreamEventStore) SetBroadcaster(broadcaster aether.EventBroadcaster) {
jes.mu.Lock()
defer jes.mu.Unlock()
jes.broadcaster = broadcaster
}
// Close closes the JetStream event store and cleans up resources.
func (jes *JetStreamEventStore) Close(ctx context.Context) error {
return nil
}
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors // Compile-time check that JetStreamEventStore implements EventStoreWithErrors
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil) var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)