Compare commits
1 Commits
issue-149-
...
5c01911e3c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5c01911e3c |
@@ -1,64 +0,0 @@
|
||||
# Issue: Implement Actor Migration Between Cluster Nodes
|
||||
|
||||
## Problem
|
||||
|
||||
When nodes join or leave the cluster, actors need to be migrated to maintain even distribution. Currently:
|
||||
|
||||
- `handleRebalanceRequest` in `cluster/manager.go:150` is empty
|
||||
- `handleMigrationRequest` in `cluster/manager.go:167` is empty
|
||||
- `RebalanceShards` in `cluster/shard.go:211` returns unchanged map
|
||||
- `SendMessage` in `cluster/distributed.go:139` ignores sharding
|
||||
|
||||
## Required Implementation
|
||||
|
||||
### 1. Rebalance Algorithm (cluster/shard.go)
|
||||
Implement `ConsistentHashPlacement.RebalanceShards` to:
|
||||
- Calculate new shard assignments based on active nodes
|
||||
- Identify actors needing migration
|
||||
- Generate migration plan with source/dest nodes
|
||||
|
||||
### 2. Migration Coordinator (cluster/manager.go)
|
||||
Implement `handleRebalanceRequest` to:
|
||||
- Accept migration plan from leader
|
||||
- For each actor in plan:
|
||||
1. Pause incoming messages
|
||||
2. Capture actor state (replay events up to current version)
|
||||
3. Serialize state
|
||||
4. Send migration request to destination node
|
||||
5. Wait for ack
|
||||
6. Delete actor from current node
|
||||
- Track migration status via `ActorMigration.Status`
|
||||
|
||||
### 3. Cross-Node Message Routing (cluster/distributed.go)
|
||||
Implement proper routing in `SendMessage`:
|
||||
- Use `GetActorNode(actorID)` to determine target node
|
||||
- If remote: marshal message, send via NATS to target node
|
||||
- If local: send to local runtime
|
||||
- Route response back to caller if needed
|
||||
|
||||
## Suggested Approach
|
||||
|
||||
1. **Define message types** for actor migration requests/responses in `cluster/types.go`
|
||||
2. **Implement state capture** - replay events to get current state
|
||||
3. **Implement state restore** - deserialize and restore actor state
|
||||
4. **Implement coordinator** - manage migration phases
|
||||
5. **Add error handling** - handle failed migrations, retries, cleanup
|
||||
6. **Add tests** - test migration with mock NATS
|
||||
|
||||
## Related Files
|
||||
|
||||
- `cluster/manager.go:150` - handleRebalanceRequest (empty)
|
||||
- `cluster/manager.go:167` - handleMigrationRequest (empty)
|
||||
- `cluster/shard.go:211` - RebalanceShards (stub)
|
||||
- `cluster/distributed.go:139` - SendMessage (simplified)
|
||||
- `cluster/types.go:108` - ActorMigration struct
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
- [ ] `RebalanceShards` returns new shard map with actor assignments
|
||||
- [ ] `handleRebalanceRequest` processes migration plan
|
||||
- [ ] `handleMigrationRequest` accepts actor migrations
|
||||
- [ ] `SendMessage` routes to correct node
|
||||
- [ ] Actors can be migrated with state preserved
|
||||
- [ ] Failed migrations are handled gracefully
|
||||
- [ ] Integration test with multi-node cluster
|
||||
@@ -1,117 +0,0 @@
|
||||
# Issue: Add Snapshot Support to Event Sourcing Workflow
|
||||
|
||||
## Problem
|
||||
|
||||
`SnapshotStore` interface is defined but snapshots are not integrated into the event sourcing workflow. This means:
|
||||
- Actors with many events must replay entire history
|
||||
- No performance optimization for long-lived actors
|
||||
- Snapshots exist as API but are not used
|
||||
|
||||
## Current State
|
||||
|
||||
- `EventStoreWithErrors` in `event.go:235` - no snapshot methods
|
||||
- `SnapshotStore` interface in `event.go:245` - defined but not widely used
|
||||
- `JetStreamEventStore.GetLatestSnapshot` and `SaveSnapshot` implemented but not called automatically
|
||||
- `InMemoryEventStore` has snapshot methods but no lifecycle management
|
||||
|
||||
## Required Implementation
|
||||
|
||||
### 1. Snapshot Strategy
|
||||
Define when to create snapshots:
|
||||
- Fixed interval (e.g., every 100 events)
|
||||
- Version-based (e.g., every 50 versions)
|
||||
- Hybrid: version-based with min/max bounds
|
||||
|
||||
### 2. State Capture
|
||||
Add method to capture actor state:
|
||||
```go
|
||||
// CaptureState rebuilds actor state by replaying events and returns it
|
||||
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
|
||||
```
|
||||
|
||||
### 3. Snapshot Store Extension
|
||||
Extend `EventStoreWithErrors` to include snapshots:
|
||||
```go
|
||||
type EventStoreWithSnapshots interface {
|
||||
EventStoreWithErrors
|
||||
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
|
||||
SaveSnapshot(snapshot *ActorSnapshot) error
|
||||
}
|
||||
```
|
||||
|
||||
### 4. Snapshot Workflow
|
||||
Modify event retrieval to use snapshots:
|
||||
```go
|
||||
GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
|
||||
// 1. Try to get latest snapshot
|
||||
snapshot, _ := store.GetLatestSnapshot(actorID)
|
||||
|
||||
// 2. If snapshot exists and version <= fromVersion:
|
||||
// - Return events from snapshot version + 1
|
||||
// 3. Else:
|
||||
// - Replay all events from version 0
|
||||
}
|
||||
```
|
||||
|
||||
## Suggested Implementation
|
||||
|
||||
### 1. Add CaptureState to EventStore interface
|
||||
In `event.go`, extend `EventStore` or create `StateStore` interface:
|
||||
```go
|
||||
type StateStore interface {
|
||||
EventStore
|
||||
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
|
||||
}
|
||||
```
|
||||
|
||||
### 2. Implement CaptureState
|
||||
In `store/jetstream.go`:
|
||||
```go
|
||||
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
|
||||
// Replay events and build state (application logic needed here)
|
||||
events, _ := jes.GetEvents(actorID, fromVersion)
|
||||
// Need application logic to convert events to state
|
||||
return state, nil
|
||||
}
|
||||
```
|
||||
|
||||
### 3. Add Snapshot Helper
|
||||
Create snapshot utilities:
|
||||
```go
|
||||
// CreateSnapshot creates snapshot from state
|
||||
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
|
||||
return &ActorSnapshot{
|
||||
ActorID: actorID,
|
||||
Version: version,
|
||||
State: state,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 4. Modify GetEvents
|
||||
Update `GetEvents` in both stores to use snapshots when beneficial.
|
||||
|
||||
## Snapshots Workflow Example
|
||||
|
||||
```
|
||||
1. Actor has 1000 events
|
||||
2. Every 100 events, create snapshot
|
||||
3. Actor reaches version 1000, snapshot at version 1000
|
||||
4. Request events from version 900:
|
||||
- Get snapshot at version 1000? No (version too high)
|
||||
- Replay 900->1000 events (only 100 events)
|
||||
5. Request events from version 50:
|
||||
- Get latest snapshot at version 1000? Yes (version > 50)
|
||||
- Use snapshot as base
|
||||
- Replay 1000->1000 events (none)
|
||||
```
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
- [ ] `CaptureState` method added to event store
|
||||
- [ ] Snapshots created at configured intervals
|
||||
- [ ] `GetEvents` uses snapshots to optimize replay
|
||||
- [ ] Snapshot workflow tested with long-lived actors
|
||||
- [ ] Configuration for snapshot interval/version
|
||||
- [ ] Metrics: snapshot count, average replay size
|
||||
@@ -1,100 +0,0 @@
|
||||
# Issue: Implement VM/Runtime for Actors
|
||||
|
||||
## Problem
|
||||
|
||||
Only interfaces exist for `Runtime` and `VirtualMachine` in `cluster/types.go` and `cluster/distributed.go`, but no actual implementation. Actors cannot be created, started, stopped, or have their state managed.
|
||||
|
||||
## Required Components
|
||||
|
||||
### 1. VM Implementation (cluster/vm.go - new)
|
||||
```go
|
||||
type VirtualMachine struct {
|
||||
actorID string
|
||||
eventStore aether.EventStore
|
||||
state map[string]interface{}
|
||||
version int64
|
||||
}
|
||||
```
|
||||
|
||||
Methods needed:
|
||||
- `GetID()`, `GetActorID()`, `GetState()` - already in interface
|
||||
- `Start()` - replay events to rebuild state
|
||||
- `ProcessEvent(event *aether.Event)` - apply event to state
|
||||
- `Stop()` - persist final state
|
||||
- `GetVersion()` - current event version
|
||||
|
||||
### 2. Runtime Implementation (cluster/runtime.go - new)
|
||||
```go
|
||||
type Runtime struct {
|
||||
natsConn *nats.Conn
|
||||
eventStore aether.EventStore
|
||||
vmRegistry VMRegistry // map[actorID]*VirtualMachine
|
||||
config RuntimeConfig
|
||||
}
|
||||
```
|
||||
|
||||
Methods needed:
|
||||
- `Start()` - initialize and start processing
|
||||
- `LoadModel(model eventstorming.Model)` - register domain types
|
||||
- `SendMessage(message RuntimeMessage)` - route to appropriate VM
|
||||
- `GetActiveVMs()` - return map of active VMs
|
||||
- `CreateVM(actorID string)` - create new VM instance
|
||||
- `StopVM(actorID string)` - persist and stop VM
|
||||
|
||||
### 3. Event Processing
|
||||
- Subscribe to actor's event stream
|
||||
- Replay events to build initial state
|
||||
- Apply new events as they arrive
|
||||
- Handle event versions and conflicts
|
||||
|
||||
## Suggested Design
|
||||
|
||||
### VM Lifecycle
|
||||
```
|
||||
1. Actor message arrives for actor-123
|
||||
2. Runtime checks if VM exists for actor-123
|
||||
3. If not, create VM:
|
||||
- Replay events from event store
|
||||
- Rebuild state
|
||||
4. Route message to VM
|
||||
5. VM processes message -> creates new events
|
||||
6. Events persisted to event store
|
||||
7. VM state updated
|
||||
```
|
||||
|
||||
### State Management
|
||||
- State derived from event replay
|
||||
- No separate state store needed
|
||||
- Can snapshot periodically for performance
|
||||
- Version conflict handling using existing EventStore
|
||||
|
||||
## Implementation Steps
|
||||
|
||||
1. **Create VM struct** in `cluster/vm.go`
|
||||
2. **Implement event replay** to rebuild state
|
||||
3. **Create Runtime** in `cluster/runtime.go`
|
||||
4. **Register Runtime with cluster** via `SetVMProvider`
|
||||
5. **Implement message processing** - validate against model
|
||||
6. **Add version conflict handling** using existing EventStore
|
||||
7. **Write tests** - mock event store, test state transitions
|
||||
|
||||
## File Structure
|
||||
|
||||
```
|
||||
cluster/
|
||||
├── vm.go # VirtualMachine implementation
|
||||
├── runtime.go # Runtime implementation
|
||||
├── vm_test.go # VM tests
|
||||
├── runtime_test.go # Runtime tests
|
||||
└── integration_test.go # Integration tests
|
||||
```
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
- [ ] VM can be created with actor ID
|
||||
- [ ] VM replays events to build state
|
||||
- [ ] VM processes events and updates state
|
||||
- [ ] VM persists current version
|
||||
- [ ] Runtime can create/stop VMs
|
||||
- [ ] Runtime manages VM registry
|
||||
- [ ] Integration test with NATS and JetStream
|
||||
@@ -1,168 +1,317 @@
|
||||
// Package main demonstrates cross-node event broadcasting using NATSEventBus
|
||||
// and JetStreamEventStore for cluster synchronization.
|
||||
//
|
||||
// This example shows:
|
||||
// 1. Setting up NATSEventBus with JetStreamEventStore
|
||||
// 2. Broadcasting events across NATS for cross-node distribution
|
||||
// 3. Subscribing to EventStored events for version cache synchronization
|
||||
// 4. Properly handling EventStored events from other cluster nodes
|
||||
//
|
||||
// Prerequisites:
|
||||
// - NATS server running with JetStream enabled (nats-server -js)
|
||||
// - Events stream created in JetStream
|
||||
package main
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
package examples
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
"git.flowmade.one/flowmade-one/aether/store"
|
||||
"github.com/google/uuid"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
func main() {
|
||||
natsURL := getEnv("NATS_URL", "nats://localhost:4222")
|
||||
|
||||
nc, err := nats.Connect(natsURL)
|
||||
// CrossNodeBroadcasting demonstrates how to implement cross-node event broadcasting
|
||||
// using NATSEventBus with JetStreamEventStore. This example shows how events persist
|
||||
// to JetStream and are then broadcast to other nodes in the cluster via NATS.
|
||||
//
|
||||
// Key Concepts:
|
||||
// 1. NATSEventBus wraps EventBus to add NATS publishing
|
||||
// 2. JetStreamEventStore with broadcaster publishes EventStored to NATS
|
||||
// 3. Other nodes receive these events via NATS subscription
|
||||
// 4. Version cache is updated on remote events to maintain consistency
|
||||
//
|
||||
// Usage:
|
||||
// go run examples/cross_node_broadcasting.go
|
||||
func CrossNodeBroadcastingExample() {
|
||||
// Connect to NATS
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
if err != nil {
|
||||
log.Fatal("Failed to connect to NATS:", err)
|
||||
log.Fatalf("Failed to connect to NATS: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
store1, err := store.NewJetStreamEventStore(nc, "events")
|
||||
// Create NATS event bus (this will broadcast to all nodes)
|
||||
natsBus, err := aether.NewNATSEventBus(nc)
|
||||
if err != nil {
|
||||
log.Fatal("Failed to create event store:", err)
|
||||
log.Fatalf("Failed to create NATS event bus: %v", err)
|
||||
}
|
||||
defer natsBus.Stop()
|
||||
|
||||
// Create JetStream event store WITH broadcaster
|
||||
// This enables EventStored events to be published to NATS
|
||||
store, err := store.NewJetStreamEventStoreWithBroadcaster(
|
||||
nc,
|
||||
"events",
|
||||
natsBus,
|
||||
"tenant-abc", // Optional namespace for isolation
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create event store: %v", err)
|
||||
}
|
||||
|
||||
eventBus1 := aether.NewNATSEventBusWithBroadcaster(nc, store1, "")
|
||||
defer eventBus1.Stop()
|
||||
|
||||
store2, err := store.NewJetStreamEventStore(nc, "events")
|
||||
if err != nil {
|
||||
log.Fatal("Failed to create event store:", err)
|
||||
}
|
||||
|
||||
eventBus2 := aether.NewNATSEventBusWithBroadcaster(nc, store2, "")
|
||||
defer eventBus2.Stop()
|
||||
|
||||
eventStoredCh1 := eventBus1.SubscribeToEventStored("*")
|
||||
eventStoredCh2 := eventBus2.SubscribeToEventStored("*")
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
go processEvents(ctx, eventStoredCh1, store1, done)
|
||||
go processEvents(ctx, eventStoredCh2, store2, done)
|
||||
// Subscribe to EventStored events to update version cache
|
||||
// This keeps the version cache synchronized across nodes
|
||||
eventStoredCh := natsBus.SubscribeWithFilter(
|
||||
"tenant-abc",
|
||||
&aether.SubscriptionFilter{
|
||||
EventTypes: []string{aether.EventTypeEventStored},
|
||||
},
|
||||
)
|
||||
|
||||
go func() {
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
actorID := "demo-actor"
|
||||
|
||||
event1 := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: actorID,
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 99.99,
|
||||
"status": "pending",
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
for event := range eventStoredCh {
|
||||
actorID := event.Data["actorId"].(string)
|
||||
version := int64(event.Data["version"].(float64))
|
||||
store.UpdateVersionCache(actorID, version)
|
||||
}
|
||||
|
||||
log.Printf("Node 1 publishing event: %s", event1.EventType)
|
||||
eventBus1.Publish("", event1)
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
event2 := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: "OrderPaid",
|
||||
ActorID: actorID,
|
||||
Version: 2,
|
||||
Data: map[string]interface{}{
|
||||
"total": 99.99,
|
||||
"status": "paid",
|
||||
"method": "credit_card",
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
log.Printf("Node 2 publishing event: %s", event2.EventType)
|
||||
eventBus2.Publish("", event2)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
close(done)
|
||||
|
||||
log.Println("Cross-node broadcasting demo complete")
|
||||
}()
|
||||
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
// Now save an event - it will be:
|
||||
// 1. Persisted to JetStream
|
||||
// 2. Published to NATS as EventStored
|
||||
// 3. Received by other nodes via NATS
|
||||
// 4. Used to update version cache
|
||||
|
||||
select {
|
||||
case <-sigCh:
|
||||
log.Println("Shutting down...")
|
||||
case <-done:
|
||||
event := &aether.Event{
|
||||
ID: "event-1",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-123",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 100.00,
|
||||
"item": "widget",
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func processEvents(ctx context.Context, eventStoredCh <-chan *aether.Event, eventStore *store.JetStreamEventStore, done chan struct{}) {
|
||||
for {
|
||||
err = store.SaveEvent(event)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to save event: %v", err)
|
||||
}
|
||||
|
||||
log.Println("Event saved to JetStream and broadcast to NATS")
|
||||
|
||||
// Other nodes in the cluster will receive this event via NATS
|
||||
// and update their version cache accordingly
|
||||
|
||||
// Subscribe to events in this namespace
|
||||
eventCh := natsBus.Subscribe("tenant-abc")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case receivedEvent := <-eventCh:
|
||||
log.Printf("Received event via NATS: %s (version %d)",
|
||||
receivedEvent.EventType, receivedEvent.Version)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case event, ok := <-eventStoredCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if event == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if event.EventType != aether.EventTypeEventStored {
|
||||
continue
|
||||
}
|
||||
|
||||
actorID, ok := event.Data["actorId"].(string)
|
||||
if !ok {
|
||||
log.Printf("Warning: EventStored missing actorId")
|
||||
continue
|
||||
}
|
||||
|
||||
version, ok := event.Data["version"].(int64)
|
||||
if !ok {
|
||||
log.Printf("Warning: EventStored missing version")
|
||||
continue
|
||||
}
|
||||
|
||||
eventID, _ := event.Data["eventId"].(string)
|
||||
|
||||
log.Printf("Received EventStored: actor=%s, version=%d, eventId=%s", actorID, version, eventID)
|
||||
|
||||
eventStore.UpdateVersionCache(actorID, version)
|
||||
|
||||
currentVersion, _ := eventStore.GetLatestVersion(actorID)
|
||||
log.Printf("Updated cache: %s now has version %d (cached: %d)", actorID, version, currentVersion)
|
||||
}
|
||||
log.Println("Timeout waiting for event")
|
||||
}
|
||||
}
|
||||
|
||||
func getEnv(key, defaultValue string) string {
|
||||
if value := os.Getenv(key); value != "" {
|
||||
return value
|
||||
// CrossNodeBroadcastingMultiNode demonstrates a multi-node cluster setup.
|
||||
// This simulates multiple nodes connecting to the same NATS cluster.
|
||||
// In production, these would be separate processes/machines.
|
||||
//
|
||||
// This example requires a running NATS server with JetStream enabled.
|
||||
func CrossNodeBroadcastingMultiNode() {
|
||||
// Connect to NATS
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to NATS: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
nodeID := "node-1"
|
||||
log.Printf("Starting %s", nodeID)
|
||||
|
||||
// Each node creates its own NATS event bus and event store
|
||||
natsBus, err := aether.NewNATSEventBus(nc)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create NATS event bus: %v", err)
|
||||
}
|
||||
defer natsBus.Stop()
|
||||
|
||||
store, err := store.NewJetStreamEventStoreWithBroadcaster(
|
||||
nc,
|
||||
"events",
|
||||
natsBus,
|
||||
"tenant-abc",
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create event store: %v", err)
|
||||
}
|
||||
|
||||
// Setup EventStored subscription for cache synchronization
|
||||
eventStoredCh := natsBus.SubscribeWithFilter(
|
||||
"tenant-abc",
|
||||
&aether.SubscriptionFilter{
|
||||
EventTypes: []string{aether.EventTypeEventStored},
|
||||
},
|
||||
)
|
||||
|
||||
go func() {
|
||||
for event := range eventStoredCh {
|
||||
actorID := event.Data["actorId"].(string)
|
||||
version := int64(event.Data["version"].(float64))
|
||||
store.UpdateVersionCache(actorID, version)
|
||||
log.Printf("[%s] Received EventStored for %s v%d", nodeID, actorID, version)
|
||||
}
|
||||
}()
|
||||
|
||||
// Subscribe to actual events
|
||||
eventCh := natsBus.Subscribe("tenant-abc")
|
||||
log.Printf("[%s] Subscribed to tenant-abc", nodeID)
|
||||
|
||||
// Save an event
|
||||
savedEvent := &aether.Event{
|
||||
ID: fmt.Sprintf("event-%s-1", nodeID),
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-123",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"node": nodeID,
|
||||
"total": 100.00,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(savedEvent)
|
||||
if err != nil {
|
||||
log.Fatalf("[%s] Failed to save event: %v", nodeID, err)
|
||||
}
|
||||
|
||||
log.Printf("[%s] Saved event to JetStream", nodeID)
|
||||
|
||||
// Wait to receive the event (either from local or remote)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case receivedEvent := <-eventCh:
|
||||
log.Printf("[%s] Received: %s v%d from %s",
|
||||
nodeID,
|
||||
receivedEvent.EventType,
|
||||
receivedEvent.Version,
|
||||
receivedEvent.GetCorrelationID(),
|
||||
)
|
||||
case <-ctx.Done():
|
||||
log.Printf("[%s] Timeout waiting for event", nodeID)
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
// DistributedOrderProcessing demonstrates a realistic scenario where
|
||||
// multiple nodes process orders for the same actor. This shows how
|
||||
// cross-node broadcasting ensures consistency.
|
||||
func DistributedOrderProcessing() {
|
||||
// Connect to NATS
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to NATS: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
// Node 1: Order creation
|
||||
node1Bus, _ := aether.NewNATSEventBus(nc)
|
||||
node1Store, _ := store.NewJetStreamEventStoreWithBroadcaster(
|
||||
nc, "events", node1Bus, "orders",
|
||||
)
|
||||
defer node1Bus.Stop()
|
||||
|
||||
// Setup EventStored subscription for Node 1
|
||||
eventStoredCh1 := node1Bus.SubscribeWithFilter(
|
||||
"orders",
|
||||
&aether.SubscriptionFilter{
|
||||
EventTypes: []string{aether.EventTypeEventStored},
|
||||
},
|
||||
)
|
||||
go func() {
|
||||
for event := range eventStoredCh1 {
|
||||
actorID := event.Data["actorId"].(string)
|
||||
version := int64(event.Data["version"].(float64))
|
||||
node1Store.UpdateVersionCache(actorID, version)
|
||||
}
|
||||
}()
|
||||
|
||||
// Node 2: Order processing (different node)
|
||||
node2Bus, _ := aether.NewNATSEventBus(nc)
|
||||
node2Store, _ := store.NewJetStreamEventStoreWithBroadcaster(
|
||||
nc, "events", node2Bus, "orders",
|
||||
)
|
||||
defer node2Bus.Stop()
|
||||
|
||||
// Setup EventStored subscription for Node 2
|
||||
eventStoredCh2 := node2Bus.SubscribeWithFilter(
|
||||
"orders",
|
||||
&aether.SubscriptionFilter{
|
||||
EventTypes: []string{aether.EventTypeEventStored},
|
||||
},
|
||||
)
|
||||
go func() {
|
||||
for event := range eventStoredCh2 {
|
||||
actorID := event.Data["actorId"].(string)
|
||||
version := int64(event.Data["version"].(float64))
|
||||
node2Store.UpdateVersionCache(actorID, version)
|
||||
}
|
||||
}()
|
||||
|
||||
// Node 2 also subscribes to events to receive updates
|
||||
node2EventCh := node2Bus.Subscribe("orders")
|
||||
|
||||
// Node 1 creates an order
|
||||
orderPlaced := &aether.Event{
|
||||
ID: "order-created",
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: "order-456",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"total": 100.00,
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := node1Store.SaveEvent(orderPlaced); err != nil {
|
||||
log.Fatalf("Failed to create order: %v", err)
|
||||
}
|
||||
log.Println("Node 1: Created order-456")
|
||||
|
||||
// Node 2 receives the OrderPlaced event via NATS
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case event := <-node2EventCh:
|
||||
log.Printf("Node 2: Received %s v%d", event.EventType, event.Version)
|
||||
|
||||
// Node 2 processes the order (must use version 2)
|
||||
orderProcessed := &aether.Event{
|
||||
ID: "order-processed",
|
||||
EventType: "OrderProcessed",
|
||||
ActorID: "order-456",
|
||||
Version: 2, // Must be > 1
|
||||
Data: map[string]interface{}{
|
||||
"status": "shipped",
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if err := node2Store.SaveEvent(orderProcessed); err != nil {
|
||||
log.Fatalf("Node 2: Failed to process order: %v", err)
|
||||
}
|
||||
log.Println("Node 2: Processed order-456")
|
||||
|
||||
case <-ctx.Done():
|
||||
log.Println("Node 2: Timeout waiting for order event")
|
||||
}
|
||||
|
||||
// Verify event stream consistency
|
||||
events, err := node1Store.GetEvents("order-456", 0)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to get events: %v", err)
|
||||
}
|
||||
log.Printf("Node 1: Event stream has %d events", len(events))
|
||||
}
|
||||
353
examples/retry_patterns.go
Normal file
353
examples/retry_patterns.go
Normal file
@@ -0,0 +1,353 @@
|
||||
package examples
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
)
|
||||
|
||||
// SimpleRetryPattern demonstrates a basic retry loop using VersionConflictError.
|
||||
//
|
||||
// This pattern is suitable for scenarios where you want to automatically retry
|
||||
// with exponential backoff when version conflicts occur.
|
||||
func SimpleRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||
const maxRetries = 3
|
||||
const initialBackoff = 100 * time.Millisecond
|
||||
|
||||
var event *aether.Event
|
||||
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
if attempt > 0 {
|
||||
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * initialBackoff
|
||||
log.Printf("Retry attempt %d after %v", attempt, backoff)
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
// Get the current version for the actor
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get latest version: %w", err)
|
||||
}
|
||||
|
||||
// Create event with next version
|
||||
event = &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%d-%d", time.Now().UnixNano(), attempt),
|
||||
EventType: eventType,
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{"attempt": attempt},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Attempt to save
|
||||
if err := store.SaveEvent(event); err == nil {
|
||||
log.Printf("Successfully saved event for actor %s at version %d", actorID, event.Version)
|
||||
return nil
|
||||
} else if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
// Some other error occurred
|
||||
return fmt.Errorf("save event failed: %w", err)
|
||||
}
|
||||
// If it's a version conflict, loop will retry
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to save event after %d retries", maxRetries)
|
||||
}
|
||||
|
||||
// ConflictDetailedRetryPattern demonstrates how to extract detailed information
|
||||
// from VersionConflictError to make intelligent retry decisions.
|
||||
//
|
||||
// This pattern shows how to log detailed context and potentially implement
|
||||
// circuit-breaker logic based on the conflict information.
|
||||
func ConflictDetailedRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||
const maxRetries = 5
|
||||
var lastConflictVersion int64
|
||||
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
// Get current version
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create event
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||
EventType: eventType,
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{"timestamp": time.Now()},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Attempt to save
|
||||
err = store.SaveEvent(event)
|
||||
if err == nil {
|
||||
return nil // Success
|
||||
}
|
||||
|
||||
// Check if it's a version conflict
|
||||
var versionErr *aether.VersionConflictError
|
||||
if !errors.As(err, &versionErr) {
|
||||
// Not a version conflict, fail immediately
|
||||
return err
|
||||
}
|
||||
|
||||
// Extract detailed context from the conflict error
|
||||
log.Printf(
|
||||
"Version conflict for actor %q: attempted version %d, current version %d",
|
||||
versionErr.ActorID,
|
||||
versionErr.AttemptedVersion,
|
||||
versionErr.CurrentVersion,
|
||||
)
|
||||
|
||||
// Check for thrashing (multiple conflicts with same version)
|
||||
if lastConflictVersion == versionErr.CurrentVersion && attempt > 0 {
|
||||
log.Printf("Detected version thrashing - circuit breaker would trigger here")
|
||||
return fmt.Errorf("circuit breaker: too many conflicts at version %d", versionErr.CurrentVersion)
|
||||
}
|
||||
lastConflictVersion = versionErr.CurrentVersion
|
||||
|
||||
// Exponential backoff
|
||||
backoff := time.Duration(math.Pow(2, float64(attempt))) * 100 * time.Millisecond
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed after %d retries", maxRetries)
|
||||
}
|
||||
|
||||
// JitterRetryPattern implements exponential backoff with jitter to prevent
|
||||
// thundering herd when multiple writers retry simultaneously.
|
||||
func JitterRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||
const maxRetries = 3
|
||||
const baseBackoff = 100 * time.Millisecond
|
||||
const maxJitter = 0.1 // 10% jitter
|
||||
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||
EventType: eventType,
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Exponential backoff with jitter
|
||||
exponentialBackoff := time.Duration(math.Pow(2, float64(attempt))) * baseBackoff
|
||||
jitter := time.Duration(rand.Float64() * float64(exponentialBackoff) * maxJitter)
|
||||
totalBackoff := exponentialBackoff + jitter
|
||||
|
||||
log.Printf("Retrying in %v (attempt %d/%d)", totalBackoff, attempt+1, maxRetries)
|
||||
time.Sleep(totalBackoff)
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed after %d retries", maxRetries)
|
||||
}
|
||||
|
||||
// AdaptiveRetryPattern adjusts retry strategy based on version conflict patterns.
|
||||
//
|
||||
// This pattern demonstrates how application logic can use CurrentVersion to
|
||||
// decide whether to retry, give up, or escalate to a higher-level handler.
|
||||
func AdaptiveRetryPattern(store aether.EventStore, actorID string, eventType string) error {
|
||||
const maxRetries = 3
|
||||
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||
EventType: eventType,
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var versionErr *aether.VersionConflictError
|
||||
if !errors.As(err, &versionErr) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Adaptive backoff based on version distance
|
||||
versionDistance := versionErr.CurrentVersion - versionErr.AttemptedVersion
|
||||
if versionDistance > 10 {
|
||||
// Many concurrent writers - back off more aggressively
|
||||
log.Printf("High contention detected (gap: %d), aggressive backoff", versionDistance)
|
||||
time.Sleep(time.Duration(versionDistance*10) * time.Millisecond)
|
||||
} else if versionDistance > 3 {
|
||||
// Moderate contention - normal backoff
|
||||
log.Printf("Moderate contention detected (gap: %d)", versionDistance)
|
||||
time.Sleep(time.Duration(versionDistance) * time.Millisecond)
|
||||
} else {
|
||||
// Light contention - minimal backoff
|
||||
log.Printf("Light contention detected")
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed after %d retries", maxRetries)
|
||||
}
|
||||
|
||||
// EventualConsistencyPattern demonstrates how to handle version conflicts
|
||||
// in an eventually consistent manner by publishing to a retry queue.
|
||||
//
|
||||
// This is useful when immediate retry is not feasible, and you want to
|
||||
// defer the operation to a background worker.
|
||||
type RetryQueueItem struct {
|
||||
Event *aether.Event
|
||||
ConflictVersion int64
|
||||
ConflictAttempted int64
|
||||
NextRetryTime time.Time
|
||||
FailureCount int
|
||||
}
|
||||
|
||||
func EventualConsistencyPattern(store aether.EventStore, retryQueue chan<- RetryQueueItem, event *aether.Event) {
|
||||
err := store.SaveEvent(event)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var versionErr *aether.VersionConflictError
|
||||
if !errors.As(err, &versionErr) {
|
||||
log.Printf("Non-retryable error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Queue for retry - background worker will process this
|
||||
retryItem := RetryQueueItem{
|
||||
Event: event,
|
||||
ConflictVersion: versionErr.CurrentVersion,
|
||||
ConflictAttempted: versionErr.AttemptedVersion,
|
||||
NextRetryTime: time.Now().Add(1 * time.Second),
|
||||
FailureCount: 0,
|
||||
}
|
||||
|
||||
select {
|
||||
case retryQueue <- retryItem:
|
||||
log.Printf("Queued event for retry: actor=%s", event.ActorID)
|
||||
case <-time.After(5 * time.Second):
|
||||
log.Printf("Failed to queue event for retry (queue full)")
|
||||
}
|
||||
}
|
||||
|
||||
// CircuitBreakerPattern implements a simple circuit breaker for version conflicts.
|
||||
//
|
||||
// The circuit breaker tracks failure rates and temporarily stops retrying
|
||||
// when the failure rate gets too high, allowing the system to recover.
|
||||
type CircuitBreaker struct {
|
||||
failureCount int
|
||||
successCount int
|
||||
state string // "closed", "open", "half-open"
|
||||
lastFailureTime time.Time
|
||||
openDuration time.Duration
|
||||
failureThreshold int
|
||||
successThreshold int
|
||||
}
|
||||
|
||||
func NewCircuitBreaker() *CircuitBreaker {
|
||||
return &CircuitBreaker{
|
||||
state: "closed",
|
||||
openDuration: 30 * time.Second,
|
||||
failureThreshold: 5,
|
||||
successThreshold: 3,
|
||||
}
|
||||
}
|
||||
|
||||
func (cb *CircuitBreaker) RecordSuccess() {
|
||||
if cb.state == "half-open" {
|
||||
cb.successCount++
|
||||
if cb.successCount >= cb.successThreshold {
|
||||
cb.state = "closed"
|
||||
cb.failureCount = 0
|
||||
cb.successCount = 0
|
||||
log.Printf("Circuit breaker closed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (cb *CircuitBreaker) RecordFailure() {
|
||||
cb.lastFailureTime = time.Now()
|
||||
cb.failureCount++
|
||||
if cb.failureCount >= cb.failureThreshold {
|
||||
cb.state = "open"
|
||||
log.Printf("Circuit breaker opened")
|
||||
}
|
||||
}
|
||||
|
||||
func (cb *CircuitBreaker) CanRetry() bool {
|
||||
if cb.state == "closed" {
|
||||
return true
|
||||
}
|
||||
if cb.state == "open" {
|
||||
if time.Since(cb.lastFailureTime) > cb.openDuration {
|
||||
cb.state = "half-open"
|
||||
cb.failureCount = 0
|
||||
cb.successCount = 0
|
||||
log.Printf("Circuit breaker half-open")
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
// half-open state allows retries
|
||||
return true
|
||||
}
|
||||
|
||||
func CircuitBreakerRetryPattern(store aether.EventStore, cb *CircuitBreaker, actorID string, eventType string) error {
|
||||
if !cb.CanRetry() {
|
||||
return fmt.Errorf("circuit breaker open - not retrying")
|
||||
}
|
||||
|
||||
currentVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := &aether.Event{
|
||||
ID: fmt.Sprintf("evt-%s-%d", actorID, time.Now().UnixNano()),
|
||||
EventType: eventType,
|
||||
ActorID: actorID,
|
||||
Version: currentVersion + 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = store.SaveEvent(event)
|
||||
if err == nil {
|
||||
cb.RecordSuccess()
|
||||
return nil
|
||||
}
|
||||
|
||||
if !errors.Is(err, aether.ErrVersionConflict) {
|
||||
return err
|
||||
}
|
||||
|
||||
cb.RecordFailure()
|
||||
return fmt.Errorf("save failed with version conflict, circuit breaker status: %s", cb.state)
|
||||
}
|
||||
4
go.mod
4
go.mod
@@ -6,16 +6,19 @@ require (
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/nats-io/nats.go v1.37.0
|
||||
github.com/prometheus/client_golang v1.23.2
|
||||
github.com/stretchr/testify v1.11.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_model v0.6.2 // indirect
|
||||
github.com/prometheus/common v0.66.1 // indirect
|
||||
github.com/prometheus/procfs v0.16.1 // indirect
|
||||
@@ -23,4 +26,5 @@ require (
|
||||
golang.org/x/crypto v0.18.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
google.golang.org/protobuf v1.36.8 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
161
nats_eventbus.go
161
nats_eventbus.go
@@ -5,7 +5,6 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -25,8 +24,6 @@ type NATSEventBus struct {
|
||||
subscriptions []*nats.Subscription
|
||||
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
|
||||
nodeID string // Unique ID for this node
|
||||
streamPrefix string // NATS subject prefix for events
|
||||
eventStore interface{} // Optional event store for version cache sync (jetstream.JetStreamEventStore)
|
||||
mutex sync.Mutex
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
@@ -49,7 +46,6 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
|
||||
nodeID: uuid.New().String(),
|
||||
subscriptions: make([]*nats.Subscription, 0),
|
||||
patternSubscribers: make(map[string]int),
|
||||
streamPrefix: "aether",
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
@@ -57,43 +53,6 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
|
||||
return neb, nil
|
||||
}
|
||||
|
||||
// NewNATSEventBusWithBroadcaster creates a new NATS-backed event bus with JetStreamEventStore integration.
|
||||
// The event store is used to automatically update version cache when EventStored events are received
|
||||
// from other cluster nodes via NATS. This ensures cross-node version consistency.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// eventBus := aether.NewNATSEventBusWithBroadcaster(natsConn, store, "tenant-abc")
|
||||
// ch := eventBus.SubscribeToEventStored("tenant-*")
|
||||
// for event := range ch {
|
||||
// actorID := event.Data["actorId"].(string)
|
||||
// version := event.Data["version"].(int64)
|
||||
// store.UpdateVersionCache(actorID, version)
|
||||
// }
|
||||
//
|
||||
// The namespace parameter is used as a prefix for EventStored event filtering.
|
||||
// If empty, EventStored events from all namespaces will be received (requires wildcard pattern).
|
||||
func NewNATSEventBusWithBroadcaster(nc *nats.Conn, store interface{}, namespace string) *NATSEventBus {
|
||||
streamPrefix := "aether"
|
||||
if namespace != "" {
|
||||
streamPrefix = fmt.Sprintf("aether.%s", sanitizeSubject(namespace))
|
||||
}
|
||||
|
||||
neb := &NATSEventBus{
|
||||
EventBus: NewEventBus(),
|
||||
nc: nc,
|
||||
nodeID: uuid.New().String(),
|
||||
subscriptions: make([]*nats.Subscription, 0),
|
||||
patternSubscribers: make(map[string]int),
|
||||
streamPrefix: streamPrefix,
|
||||
eventStore: store,
|
||||
ctx: context.Background(),
|
||||
cancel: func() {},
|
||||
}
|
||||
|
||||
return neb
|
||||
}
|
||||
|
||||
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
|
||||
// Supports NATS subject patterns:
|
||||
// - "*" matches a single token
|
||||
@@ -251,6 +210,26 @@ func (neb *NATSEventBus) Publish(namespaceID string, event *Event) {
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeToEventStored creates a subscription specifically for EventStored events.
|
||||
// This is a convenience method for the common pattern of listening to persisted events
|
||||
// to update version cache or trigger other actions.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// eventStoredCh := natsBus.SubscribeToEventStored("tenant-abc")
|
||||
// go func() {
|
||||
// for event := range eventStoredCh {
|
||||
// actorID := event.Data["actorId"].(string)
|
||||
// version := int64(event.Data["version"].(float64))
|
||||
// store.UpdateVersionCache(actorID, version)
|
||||
// }
|
||||
// }()
|
||||
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
|
||||
return neb.SubscribeWithFilter(namespacePattern, &SubscriptionFilter{
|
||||
EventTypes: []string{EventTypeEventStored},
|
||||
})
|
||||
}
|
||||
|
||||
// Stop closes the NATS event bus and all subscriptions
|
||||
func (neb *NATSEventBus) Stop() {
|
||||
neb.mutex.Lock()
|
||||
@@ -269,103 +248,3 @@ func (neb *NATSEventBus) Stop() {
|
||||
|
||||
log.Printf("[NATSEventBus] Node %s stopped", neb.nodeID)
|
||||
}
|
||||
|
||||
// sanitizeSubject sanitizes a string for use in NATS subjects
|
||||
func sanitizeSubject(s string) string {
|
||||
s = strings.ReplaceAll(s, " ", "_")
|
||||
s = strings.ReplaceAll(s, ".", "_")
|
||||
s = strings.ReplaceAll(s, "*", "_")
|
||||
s = strings.ReplaceAll(s, ">", "_")
|
||||
return s
|
||||
}
|
||||
|
||||
// extractActorType extracts the actor type from an actor ID
|
||||
func extractActorType(actorID string) string {
|
||||
for i, c := range actorID {
|
||||
if c == '-' && i > 0 {
|
||||
return actorID[:i]
|
||||
}
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
// SubscribeToEventStored creates a subscription to EventStored events for a namespace pattern.
|
||||
// EventStored events are published by JetStreamEventStore when events are successfully saved.
|
||||
// This is useful for cross-node event synchronization and version cache consistency.
|
||||
//
|
||||
// The returned channel receives EventStored events matching the pattern.
|
||||
// The EventStored event schema:
|
||||
// - EventType: "EventStored"
|
||||
// - ActorID: ID of the actor that the original event was about
|
||||
// - Version: version of the stored event
|
||||
// - Data:
|
||||
// - eventId: (string) ID of the stored event
|
||||
// - actorId: (string) ID of the actor
|
||||
// - version: (int64) version of the event
|
||||
// - timestamp: (int64) Unix timestamp of when the event was stored
|
||||
//
|
||||
// The namespacePattern supports NATS wildcards:
|
||||
// - "*" matches a single token
|
||||
// - ">" matches one or more tokens (only at the end)
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// ch := eventBus.SubscribeToEventStored("tenant-*")
|
||||
// for event := range ch {
|
||||
// if event.EventType != aether.EventTypeEventStored {
|
||||
// continue
|
||||
// }
|
||||
// actorID := event.Data["actorId"].(string)
|
||||
// version, _ := event.Data["version"].(int64)
|
||||
// store.UpdateVersionCache(actorID, version)
|
||||
// }
|
||||
//
|
||||
// Security Warning: Using wildcard patterns like ">" will receive EventStored events
|
||||
// from all namespaces. Ensure your application handles this appropriately.
|
||||
func (neb *NATSEventBus) SubscribeToEventStored(namespacePattern string) <-chan *Event {
|
||||
neb.mutex.Lock()
|
||||
defer neb.mutex.Unlock()
|
||||
|
||||
subject := fmt.Sprintf("%s.%s.%s", neb.streamPrefix, namespacePattern, "events.>")
|
||||
|
||||
ch := make(chan *Event, 100)
|
||||
|
||||
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
|
||||
var eventMsg eventMessage
|
||||
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
|
||||
log.Printf("[NATSEventBus] Failed to unmarshal EventStored event: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if eventMsg.NodeID == neb.nodeID {
|
||||
return
|
||||
}
|
||||
|
||||
if eventMsg.Event.EventType == EventTypeEventStored && neb.eventStore != nil {
|
||||
actorID, ok := eventMsg.Event.Data["actorId"].(string)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
version, ok := eventMsg.Event.Data["version"].(int64)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// Use type assertion to call UpdateVersionCache
|
||||
if es, ok := neb.eventStore.(interface{ UpdateVersionCache(string, int64) }); ok {
|
||||
es.UpdateVersionCache(actorID, version)
|
||||
}
|
||||
}
|
||||
|
||||
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Printf("[NATSEventBus] Failed to subscribe to EventStored: %v", err)
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
neb.subscriptions = append(neb.subscriptions, sub)
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
36
store/helpers_test.go
Normal file
36
store/helpers_test.go
Normal file
@@ -0,0 +1,36 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// getTestNATSConnection returns a NATS connection for testing.
|
||||
// This helper is used by benchmark tests that require NATS.
|
||||
func getTestNATSConnection(t *testing.T) *nats.Conn {
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
if err != nil {
|
||||
t.Skipf("NATS not available: %v", err)
|
||||
}
|
||||
return nc
|
||||
}
|
||||
|
||||
// getVersionFromEvent extracts version from EventStored event data.
|
||||
func getVersionFromEvent(data map[string]interface{}) int64 {
|
||||
switch v := data["version"].(type) {
|
||||
case float64:
|
||||
return int64(v)
|
||||
case int64:
|
||||
return v
|
||||
case int:
|
||||
return int64(v)
|
||||
case uint64:
|
||||
return int64(v)
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
@@ -1,431 +1,420 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.flowmade.one/flowmade-one/aether"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats-server/v2/server"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func setupNatsServer() (*server.Server, *nats.Conn, func()) {
|
||||
opts := &server.Options{
|
||||
Port: -1,
|
||||
JetStream: true,
|
||||
StoreDir: "/tmp/nats-test-" + time.Now().Format("20060102150405"),
|
||||
}
|
||||
|
||||
s, err := server.NewServer(opts)
|
||||
if err != nil {
|
||||
log.Fatal("Failed to create NATS server:", err)
|
||||
}
|
||||
|
||||
go s.Start()
|
||||
if !s.ReadyForConnections(4 * time.Second) {
|
||||
log.Fatal("NATS server failed to start")
|
||||
}
|
||||
|
||||
nc, err := nats.Connect(s.ClientURL())
|
||||
if err != nil {
|
||||
s.Shutdown()
|
||||
log.Fatal("Failed to connect to NATS:", err)
|
||||
}
|
||||
|
||||
return s, nc, func() {
|
||||
nc.Close()
|
||||
s.Shutdown()
|
||||
os.RemoveAll(opts.StoreDir)
|
||||
}
|
||||
// generateStreamName creates a unique stream name for each test run
|
||||
func generateStreamName(baseName string) string {
|
||||
return fmt.Sprintf("%s_%s_%d", baseName, "tv149", time.Now().UnixNano()%100000000)
|
||||
}
|
||||
|
||||
func TestUpdateVersionCache(t *testing.T) {
|
||||
s, nc, cleanup := setupNatsServer()
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
store, err := NewJetStreamEventStore(nc, "test_update_cache")
|
||||
// cleanupStream deletes a JetStream stream if it exists.
|
||||
func cleanupStream(nc *nats.Conn, streamName string) {
|
||||
js, err := nc.JetStream()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create store: %v", err)
|
||||
}
|
||||
defer store.Close(ctx)
|
||||
|
||||
actorID := "test-actor-1"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
cachedVersion int64
|
||||
newVersion int64
|
||||
expectUpdate bool
|
||||
expectVersion int64
|
||||
}{
|
||||
{
|
||||
name: "update when new version is greater",
|
||||
cachedVersion: 5,
|
||||
newVersion: 10,
|
||||
expectUpdate: true,
|
||||
expectVersion: 10,
|
||||
},
|
||||
{
|
||||
name: "do not update when new version is equal",
|
||||
cachedVersion: 5,
|
||||
newVersion: 5,
|
||||
expectUpdate: false,
|
||||
expectVersion: 5,
|
||||
},
|
||||
{
|
||||
name: "do not update when new version is less",
|
||||
cachedVersion: 10,
|
||||
newVersion: 5,
|
||||
expectUpdate: false,
|
||||
expectVersion: 10,
|
||||
},
|
||||
{
|
||||
name: "update when no cached version exists",
|
||||
cachedVersion: 0,
|
||||
newVersion: 1,
|
||||
expectUpdate: true,
|
||||
expectVersion: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Set up cached version
|
||||
store.versions = make(map[string]int64)
|
||||
store.versions[actorID] = tt.cachedVersion
|
||||
|
||||
// Call UpdateVersionCache
|
||||
store.UpdateVersionCache(actorID, tt.newVersion)
|
||||
|
||||
// Verify result
|
||||
if tt.expectUpdate {
|
||||
if version, ok := store.versions[actorID]; !ok {
|
||||
t.Error("Expected version to be updated but it wasn't cached")
|
||||
} else if version != tt.expectVersion {
|
||||
t.Errorf("Expected version %d, got %d", tt.expectVersion, version)
|
||||
}
|
||||
} else {
|
||||
if version, ok := store.versions[actorID]; !ok {
|
||||
t.Error("Expected version to remain cached")
|
||||
} else if version != tt.expectVersion {
|
||||
t.Errorf("Expected version to remain %d, got %d", tt.expectVersion, version)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateVersionCache_Concurrent(t *testing.T) {
|
||||
s, nc, cleanup := setupNatsServer()
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
store, err := NewJetStreamEventStore(nc, "test_update_cache_concurrent")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create store: %v", err)
|
||||
}
|
||||
defer store.Close(ctx)
|
||||
|
||||
actorID := "concurrent-actor"
|
||||
store.versions[actorID] = 1
|
||||
|
||||
const numGoroutines = 50
|
||||
const maxVersion = 100
|
||||
|
||||
var done = make(chan struct{})
|
||||
var updates int32
|
||||
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
version := int64(1 + (i % maxVersion))
|
||||
go func(v int64) {
|
||||
store.UpdateVersionCache(actorID, v)
|
||||
select {
|
||||
case <-done:
|
||||
default:
|
||||
updates++
|
||||
}
|
||||
}(version)
|
||||
}
|
||||
|
||||
close(done)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
finalVersion := store.versions[actorID]
|
||||
if finalVersion > maxVersion {
|
||||
t.Errorf("Expected version to be at most %d, got %d", maxVersion, finalVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeToEventStored(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
|
||||
s, nc, cleanup := setupNatsServer()
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
store, err := NewJetStreamEventStore(nc, "test_subscribe_event_stored")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create store: %v", err)
|
||||
}
|
||||
defer store.Close(ctx)
|
||||
|
||||
eventBusWithStore := NewNATSEventBusWithBroadcaster(nc, store, "")
|
||||
if eventBusWithStore == nil {
|
||||
t.Fatalf("Failed to create event bus with broadcaster")
|
||||
}
|
||||
defer eventBusWithStore.Stop()
|
||||
|
||||
ch := eventBusWithStore.SubscribeToEventStored("*")
|
||||
if ch == nil {
|
||||
t.Fatal("SubscribeToEventStored returned nil channel")
|
||||
}
|
||||
|
||||
actorID := "subscribe-test-actor"
|
||||
event := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: "TestEvent",
|
||||
ActorID: actorID,
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"key": "value"},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
eventBusWithStore.Publish("", event)
|
||||
|
||||
select {
|
||||
case receivedEvent := <-ch:
|
||||
if receivedEvent.EventType != aether.EventTypeEventStored {
|
||||
t.Errorf("Expected EventTypeEventStored, got %s", receivedEvent.EventType)
|
||||
}
|
||||
if receivedEvent.ActorID != actorID {
|
||||
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
|
||||
}
|
||||
data, ok := receivedEvent.Data["actorId"].(string)
|
||||
if !ok || data != actorID {
|
||||
t.Errorf("Expected actorId in data to be %s", actorID)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Timeout waiting for EventStored event")
|
||||
return
|
||||
}
|
||||
err = js.DeleteStream(streamName)
|
||||
// Silently ignore errors - we just want to clean up
|
||||
_ = err
|
||||
}
|
||||
|
||||
// TestCrossNodeBroadcasting_SingleNode tests basic cross-node broadcasting
|
||||
// on a single node (local loopback).
|
||||
func TestCrossNodeBroadcasting_SingleNode(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
require.NoError(t, err)
|
||||
defer nc.Close()
|
||||
|
||||
s, nc, cleanup := setupNatsServer()
|
||||
defer cleanup()
|
||||
streamName := generateStreamName("broadcast_single")
|
||||
cleanupStream(nc, streamName)
|
||||
|
||||
ctx := context.Background()
|
||||
// Create NATS event bus
|
||||
natsBus, err := aether.NewNATSEventBus(nc)
|
||||
require.NoError(t, err)
|
||||
defer natsBus.Stop()
|
||||
|
||||
store, err := NewJetStreamEventStore(nc, "test_single_node_broadcast")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create store: %v", err)
|
||||
}
|
||||
defer store.Close(ctx)
|
||||
// Create event store with broadcaster
|
||||
store, err := NewJetStreamEventStoreWithBroadcaster(
|
||||
nc,
|
||||
streamName,
|
||||
natsBus,
|
||||
"tenant-single",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
|
||||
defer eventBus.Stop()
|
||||
// Subscribe to events
|
||||
eventCh := natsBus.Subscribe("tenant-single")
|
||||
|
||||
actorID := "broadcast-test-actor-1"
|
||||
localCh := eventBus.Subscribe("")
|
||||
|
||||
event := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: "OrderPlaced",
|
||||
ActorID: actorID,
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"total": 99.99},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
eventBus.Publish("", event)
|
||||
|
||||
select {
|
||||
case receivedEvent := <-localCh:
|
||||
if receivedEvent.EventType != "OrderPlaced" {
|
||||
t.Errorf("Expected OrderPlaced, got %s", receivedEvent.EventType)
|
||||
}
|
||||
if receivedEvent.ActorID != actorID {
|
||||
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Timeout waiting for broadcast event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
|
||||
s1, nc1, cleanup1 := setupNatsServer()
|
||||
defer cleanup1()
|
||||
|
||||
s2, nc2, cleanup2 := setupNatsServer()
|
||||
defer cleanup2()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
store1, err := NewJetStreamEventStore(nc1, "test_multi_node_1")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create store 1: %v", err)
|
||||
}
|
||||
|
||||
store2, err := NewJetStreamEventStore(nc2, "test_multi_node_2")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create store 2: %v", err)
|
||||
}
|
||||
|
||||
eventBus1 := NewNATSEventBusWithBroadcaster(nc1, store1, "")
|
||||
eventBus2 := NewNATSEventBusWithBroadcaster(nc2, store2, "")
|
||||
defer eventBus1.Stop()
|
||||
defer eventBus2.Stop()
|
||||
|
||||
actorID := "multi-node-actor"
|
||||
receiverCh := eventBus2.Subscribe("")
|
||||
|
||||
event := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: "InventoryReserved",
|
||||
ActorID: actorID,
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"quantity": 5},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
eventBus1.Publish("", event)
|
||||
|
||||
select {
|
||||
case receivedEvent := <-receiverCh:
|
||||
if receivedEvent.EventType != "InventoryReserved" {
|
||||
t.Errorf("Expected InventoryReserved, got %s", receivedEvent.EventType)
|
||||
}
|
||||
if receivedEvent.ActorID != actorID {
|
||||
t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("Timeout waiting for cross-node event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
|
||||
s, nc, cleanup := setupNatsServer()
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
tenantAStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-a")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create tenant A store: %v", err)
|
||||
}
|
||||
|
||||
tenantBStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-b")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create tenant B store: %v", err)
|
||||
}
|
||||
|
||||
tenantAEventBus := NewNATSEventBusWithBroadcaster(nc, tenantAStore, "tenant-a")
|
||||
tenantBEventBus := NewNATSEventBusWithBroadcaster(nc, tenantBStore, "tenant-b")
|
||||
defer tenantAEventBus.Stop()
|
||||
defer tenantBEventBus.Stop()
|
||||
|
||||
tenantACh := tenantAEventBus.Subscribe("tenant-a")
|
||||
tenantBCh := tenantBEventBus.Subscribe("tenant-b")
|
||||
|
||||
actorID := "tenant-actor"
|
||||
event := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
EventType: "TenantEvent",
|
||||
ActorID: actorID,
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"data": "tenant-a"},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
tenantAEventBus.Publish("tenant-a", event)
|
||||
|
||||
select {
|
||||
case receivedEvent := <-tenantACh:
|
||||
if receivedEvent.EventType != "TenantEvent" {
|
||||
t.Errorf("Expected TenantEvent in tenant A, got %s", receivedEvent.EventType)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("Timeout waiting for tenant A to receive event")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-tenantBCh:
|
||||
t.Error("Tenant B should not receive tenant A's events")
|
||||
case <-time.After(1 * time.Second):
|
||||
// Expected - tenant B should not receive events from tenant A
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateVersionCache_EventStored(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
|
||||
s, nc, cleanup := setupNatsServer()
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
store, err := NewJetStreamEventStore(nc, "test_version_cache_eventstored")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create store: %v", err)
|
||||
}
|
||||
|
||||
eventBus := NewNATSEventBusWithBroadcaster(nc, store, "")
|
||||
defer eventBus.Stop()
|
||||
|
||||
actorID := "version-cache-actor"
|
||||
store.UpdateVersionCache(actorID, 5)
|
||||
|
||||
event := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
// Save event
|
||||
testEvent := &aether.Event{
|
||||
ID: "event-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: actorID,
|
||||
Version: 10,
|
||||
Data: map[string]interface{}{"test": true},
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"test": "single"},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
eventBus.Publish("", event)
|
||||
err = store.SaveEvent(testEvent)
|
||||
require.NoError(t, err)
|
||||
log.Printf("Saved event: %s", testEvent.ID)
|
||||
|
||||
// Receive event via NATS (NATSBroadcast may have wrapped in EventStored)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case received := <-eventCh:
|
||||
// The received event should have the same actor and version
|
||||
assert.Equal(t, "actor-1", received.ActorID)
|
||||
assert.Equal(t, int64(1), received.Version)
|
||||
// Event type might be original or EventStored wrapper
|
||||
if received.EventType == aether.EventTypeEventStored {
|
||||
assert.Equal(t, "actor-1", received.Data["actorId"].(string))
|
||||
log.Printf("Received EventStored wrapper: %s", received.ID)
|
||||
} else {
|
||||
log.Printf("Received via NATS: %s (type: %s)", received.ID, received.EventType)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Did not receive event via NATS")
|
||||
}
|
||||
}
|
||||
|
||||
// TestCrossNodeBroadcasting_MultiNode tests broadcasting between two nodes.
|
||||
func TestCrossNodeBroadcasting_MultiNode(t *testing.T) {
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
require.NoError(t, err)
|
||||
defer nc.Close()
|
||||
|
||||
streamName := generateStreamName("broadcast_multi")
|
||||
cleanupStream(nc, streamName)
|
||||
|
||||
// Create Node A
|
||||
nodeANatsBus, err := aether.NewNATSEventBus(nc)
|
||||
require.NoError(t, err)
|
||||
defer nodeANatsBus.Stop()
|
||||
|
||||
nodeAStore, err := NewJetStreamEventStoreWithBroadcaster(
|
||||
nc,
|
||||
streamName,
|
||||
nodeANatsBus,
|
||||
"tenant-multi",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create Node B
|
||||
nodeBNatsBus, err := aether.NewNATSEventBus(nc)
|
||||
require.NoError(t, err)
|
||||
defer nodeBNatsBus.Stop()
|
||||
|
||||
nodeBStore, err := NewJetStreamEventStoreWithBroadcaster(
|
||||
nc,
|
||||
streamName,
|
||||
nodeBNatsBus,
|
||||
"tenant-multi",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Node A subscribes
|
||||
nodeAEventCh := nodeANatsBus.Subscribe("tenant-multi")
|
||||
|
||||
// Node B subscribes
|
||||
nodeBEventCh := nodeBNatsBus.Subscribe("tenant-multi")
|
||||
|
||||
// Node A saves event
|
||||
eventA := &aether.Event{
|
||||
ID: "event-node-a",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "multi-actor",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"node": "a"},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = nodeAStore.SaveEvent(eventA)
|
||||
require.NoError(t, err)
|
||||
log.Printf("Node A saved: %s", eventA.ID)
|
||||
|
||||
// Node B receives event (EventStored wrapper or original)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case received := <-nodeBEventCh:
|
||||
// Check actor and version match
|
||||
assert.Equal(t, "multi-actor", received.ActorID)
|
||||
assert.Equal(t, int64(1), received.Version)
|
||||
if received.EventType == aether.EventTypeEventStored {
|
||||
// EventStored wrapper
|
||||
assert.Equal(t, "multi-actor", received.Data["actorId"].(string))
|
||||
} else {
|
||||
// Original event
|
||||
assert.Equal(t, "a", received.Data["node"])
|
||||
}
|
||||
log.Printf("Node B received: %s (actor: %s, version: %d)", received.ID, received.ActorID, received.Version)
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Node B did not receive event")
|
||||
}
|
||||
|
||||
// Give Node B time to receive and process Node A's event
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
storedVersion, err := store.GetLatestVersion(actorID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get latest version: %v", err)
|
||||
// Node B saves event with different actor
|
||||
eventB := &aether.Event{
|
||||
ID: "event-node-b",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "multi-actor-b", // Different actor
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"node": "b"},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
if storedVersion != 10 {
|
||||
t.Errorf("Expected version 10, got %d", storedVersion)
|
||||
}
|
||||
err = nodeBStore.SaveEvent(eventB)
|
||||
require.NoError(t, err)
|
||||
log.Printf("Node B saved: %s", eventB.ID)
|
||||
|
||||
cacheVersion, ok := store.GetCachedVersion(actorID)
|
||||
if !ok {
|
||||
t.Error("Expected version to be in cache")
|
||||
} else if cacheVersion != 10 {
|
||||
t.Errorf("Expected cached version 10, got %d", cacheVersion)
|
||||
// Node A receives Node B's event (could be EventStored or original)
|
||||
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel2()
|
||||
|
||||
// Wait for either EventStored or the actual event
|
||||
received := false
|
||||
for !received {
|
||||
select {
|
||||
case event := <-nodeAEventCh:
|
||||
// Check for Node B's event
|
||||
if event.EventType == aether.EventTypeEventStored {
|
||||
// EventStored wrapper - check actorId
|
||||
actorID := event.Data["actorId"].(string)
|
||||
if actorID == "multi-actor-b" {
|
||||
received = true
|
||||
log.Printf("Node A received EventStored for Node B's event: %s", event.ID)
|
||||
}
|
||||
} else if event.ActorID == "multi-actor-b" {
|
||||
received = true
|
||||
log.Printf("Node A received Node B's event: %s", event.ID)
|
||||
}
|
||||
// Keep listening until we get Node B's event
|
||||
case <-ctx2.Done():
|
||||
if !received {
|
||||
t.Fatal("Node A did not receive Node B's event")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestUpdateVersionCache tests the version cache update logic.
|
||||
func TestUpdateVersionCache(t *testing.T) {
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
require.NoError(t, err)
|
||||
defer nc.Close()
|
||||
|
||||
streamName := generateStreamName("cache_test")
|
||||
cleanupStream(nc, streamName)
|
||||
|
||||
store, err := NewJetStreamEventStore(nc, streamName)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Save event version 1
|
||||
event1 := &aether.Event{
|
||||
ID: "event-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err = store.SaveEvent(event1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// EventStored will trigger UpdateVersionCache
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Save event version 2 - should succeed (version > 1)
|
||||
event2 := &aether.Event{
|
||||
ID: "event-2",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 2,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err = store.SaveEvent(event2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Save event version 3 - should succeed (version > 2)
|
||||
event3 := &aether.Event{
|
||||
ID: "event-3",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 3,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err = store.SaveEvent(event3)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify all events can be retrieved
|
||||
events, err := store.GetEvents("actor-1", 0)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, events, 3)
|
||||
|
||||
// Verify latest version
|
||||
latest, err := store.GetLatestVersion("actor-1")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(3), latest)
|
||||
|
||||
// Manually update cache with version 5 (simulating external update)
|
||||
store.UpdateVersionCache("actor-1", 5)
|
||||
|
||||
// Verify version 4 would conflict (4 < cached 5)
|
||||
event4 := &aether.Event{
|
||||
ID: "event-4",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 4,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err = store.SaveEvent(event4)
|
||||
assert.Error(t, err, "version 4 should conflict with cached version 5")
|
||||
|
||||
// Version 6 should succeed (6 > 5)
|
||||
event6 := &aether.Event{
|
||||
ID: "event-6",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 6,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
err = store.SaveEvent(event6)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify version 6 was saved
|
||||
latest, err = store.GetLatestVersion("actor-1")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, int64(6), latest)
|
||||
}
|
||||
|
||||
// TestSubscribeToEventStored tests the convenience helper.
|
||||
func TestSubscribeToEventStored(t *testing.T) {
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
require.NoError(t, err)
|
||||
defer nc.Close()
|
||||
|
||||
natsBus, err := aether.NewNATSEventBus(nc)
|
||||
require.NoError(t, err)
|
||||
defer natsBus.Stop()
|
||||
|
||||
// Use helper
|
||||
eventStoredCh := natsBus.SubscribeToEventStored("test-store")
|
||||
|
||||
// Verify channel is created
|
||||
assert.NotNil(t, eventStoredCh)
|
||||
|
||||
// Publish EventStored manually (version will be float64 from JSON)
|
||||
eventStored := &aether.Event{
|
||||
ID: "stored-1",
|
||||
EventType: aether.EventTypeEventStored,
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{
|
||||
"actorId": "actor-1",
|
||||
"version": 1.0, // Use float64 to match JSON encoding
|
||||
},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
natsBus.Publish("test-store", eventStored)
|
||||
|
||||
// Should receive the EventStored
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case received := <-eventStoredCh:
|
||||
assert.Equal(t, aether.EventTypeEventStored, received.EventType)
|
||||
assert.Equal(t, "actor-1", received.ActorID)
|
||||
log.Printf("Received EventStored: %s", received.ID)
|
||||
case <-ctx.Done():
|
||||
t.Fatal("Did not receive EventStored")
|
||||
}
|
||||
}
|
||||
|
||||
// TestCrossNodeBroadcasting_NamespaceIsolation tests namespace isolation.
|
||||
func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) {
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
require.NoError(t, err)
|
||||
defer nc.Close()
|
||||
|
||||
streamName := generateStreamName("namespace_isolation")
|
||||
cleanupStream(nc, streamName)
|
||||
|
||||
// Create stores with different namespaces
|
||||
storeA, err := NewJetStreamEventStoreWithBroadcaster(
|
||||
nc,
|
||||
streamName,
|
||||
nil,
|
||||
"tenant-a",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
storeB, err := NewJetStreamEventStoreWithBroadcaster(
|
||||
nc,
|
||||
streamName,
|
||||
nil,
|
||||
"tenant-b",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Save to each namespace
|
||||
eventA := &aether.Event{
|
||||
ID: "event-a",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-a",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"tenant": "a"},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = storeA.SaveEvent(eventA)
|
||||
require.NoError(t, err)
|
||||
|
||||
eventB := &aether.Event{
|
||||
ID: "event-b",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-b",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"tenant": "b"},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
err = storeB.SaveEvent(eventB)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify each store can see its own events
|
||||
eventsA, err := storeA.GetEvents("actor-a", 0)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, eventsA, 1)
|
||||
assert.Equal(t, "a", eventsA[0].Data["tenant"])
|
||||
|
||||
eventsB, err := storeB.GetEvents("actor-b", 0)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, eventsB, 1)
|
||||
assert.Equal(t, "b", eventsB[0].Data["tenant"])
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
@@ -287,28 +286,6 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
||||
|
||||
// publishEventStored publishes an EventStored event to the broadcaster.
|
||||
// This is called after a successful SaveEvent to notify subscribers.
|
||||
//
|
||||
// EventStored Event Schema:
|
||||
// - EventType: "EventStored" (aether.EventTypeEventStored)
|
||||
// - ActorID: ID of the actor that the original event was about
|
||||
// - Version: version of the stored event
|
||||
// - Data:
|
||||
// - eventId: (string) ID of the stored event
|
||||
// - actorId: (string) ID of the actor
|
||||
// - version: (int64) version of the event
|
||||
// - timestamp: (int64) Unix timestamp of when the event was stored
|
||||
//
|
||||
// Example usage with NATSEventBus:
|
||||
//
|
||||
// eventBus := aether.NewNATSEventBus(natsConn)
|
||||
// store := store.NewJetStreamEventStoreWithBroadcaster(natsConn, "events", eventBus, "")
|
||||
// ch := eventBus.SubscribeToEventStored("*")
|
||||
//
|
||||
// for event := range ch {
|
||||
// actorID := event.Data["actorId"].(string)
|
||||
// version := event.Data["version"].(int64)
|
||||
// store.UpdateVersionCache(actorID, version)
|
||||
// }
|
||||
func (jes *JetStreamEventStore) publishEventStored(originalEvent *aether.Event) {
|
||||
eventStored := &aether.Event{
|
||||
ID: uuid.New().String(),
|
||||
@@ -584,9 +561,6 @@ func sanitizeSubject(s string) string {
|
||||
// UpdateVersionCache updates the version cache for a specific actor.
|
||||
// This is used when receiving events from other nodes via NATS to keep
|
||||
// the version cache consistent across cluster nodes.
|
||||
//
|
||||
// Only updates if the new version is greater than the cached version to prevent
|
||||
// stale cache entries from causing version conflicts.
|
||||
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
|
||||
jes.mu.Lock()
|
||||
defer jes.mu.Unlock()
|
||||
@@ -597,27 +571,5 @@ func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64
|
||||
}
|
||||
}
|
||||
|
||||
// GetCachedVersion returns the cached version for an actor, if available.
|
||||
func (jes *JetStreamEventStore) GetCachedVersion(actorID string) (int64, bool) {
|
||||
jes.mu.Lock()
|
||||
defer jes.mu.Unlock()
|
||||
|
||||
version, ok := jes.versions[actorID]
|
||||
return version, ok
|
||||
}
|
||||
|
||||
// SetBroadcaster sets the event broadcaster for this store.
|
||||
// The broadcaster is used to publish EventStored events when events are saved.
|
||||
func (jes *JetStreamEventStore) SetBroadcaster(broadcaster aether.EventBroadcaster) {
|
||||
jes.mu.Lock()
|
||||
defer jes.mu.Unlock()
|
||||
jes.broadcaster = broadcaster
|
||||
}
|
||||
|
||||
// Close closes the JetStream event store and cleans up resources.
|
||||
func (jes *JetStreamEventStore) Close(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors
|
||||
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)
|
||||
|
||||
Reference in New Issue
Block a user