feat: implement cross-node event broadcasting with NATSEventBus
- Add UpdateVersionCache method to JetStreamEventStore for cache synchronization - Add SubscribeToEventStored convenience helper to NATSEventBus - Create integration tests for cross-node broadcasting scenarios - Add example demonstrating NATSEventBus + JetStreamEventStore integration - Tests verify: - Single-node broadcasting works - Multi-node event flow works - Version cache consistency across nodes - Namespace isolation maintained - EventStored subscription works correctly
This commit is contained in:
64
.product-strategy/ISSUE_MIGRATION.md
Normal file
64
.product-strategy/ISSUE_MIGRATION.md
Normal file
@@ -0,0 +1,64 @@
|
||||
# Issue: Implement Actor Migration Between Cluster Nodes
|
||||
|
||||
## Problem
|
||||
|
||||
When nodes join or leave the cluster, actors need to be migrated to maintain even distribution. Currently:
|
||||
|
||||
- `handleRebalanceRequest` in `cluster/manager.go:150` is empty
|
||||
- `handleMigrationRequest` in `cluster/manager.go:167` is empty
|
||||
- `RebalanceShards` in `cluster/shard.go:211` returns unchanged map
|
||||
- `SendMessage` in `cluster/distributed.go:139` ignores sharding
|
||||
|
||||
## Required Implementation
|
||||
|
||||
### 1. Rebalance Algorithm (cluster/shard.go)
|
||||
Implement `ConsistentHashPlacement.RebalanceShards` to:
|
||||
- Calculate new shard assignments based on active nodes
|
||||
- Identify actors needing migration
|
||||
- Generate migration plan with source/dest nodes
|
||||
|
||||
### 2. Migration Coordinator (cluster/manager.go)
|
||||
Implement `handleRebalanceRequest` to:
|
||||
- Accept migration plan from leader
|
||||
- For each actor in plan:
|
||||
1. Pause incoming messages
|
||||
2. Capture actor state (replay events up to current version)
|
||||
3. Serialize state
|
||||
4. Send migration request to destination node
|
||||
5. Wait for ack
|
||||
6. Delete actor from current node
|
||||
- Track migration status via `ActorMigration.Status`
|
||||
|
||||
### 3. Cross-Node Message Routing (cluster/distributed.go)
|
||||
Implement proper routing in `SendMessage`:
|
||||
- Use `GetActorNode(actorID)` to determine target node
|
||||
- If remote: marshal message, send via NATS to target node
|
||||
- If local: send to local runtime
|
||||
- Route response back to caller if needed
|
||||
|
||||
## Suggested Approach
|
||||
|
||||
1. **Define message types** for actor migration requests/responses in `cluster/types.go`
|
||||
2. **Implement state capture** - replay events to get current state
|
||||
3. **Implement state restore** - deserialize and restore actor state
|
||||
4. **Implement coordinator** - manage migration phases
|
||||
5. **Add error handling** - handle failed migrations, retries, cleanup
|
||||
6. **Add tests** - test migration with mock NATS
|
||||
|
||||
## Related Files
|
||||
|
||||
- `cluster/manager.go:150` - handleRebalanceRequest (empty)
|
||||
- `cluster/manager.go:167` - handleMigrationRequest (empty)
|
||||
- `cluster/shard.go:211` - RebalanceShards (stub)
|
||||
- `cluster/distributed.go:139` - SendMessage (simplified)
|
||||
- `cluster/types.go:108` - ActorMigration struct
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
- [ ] `RebalanceShards` returns new shard map with actor assignments
|
||||
- [ ] `handleRebalanceRequest` processes migration plan
|
||||
- [ ] `handleMigrationRequest` accepts actor migrations
|
||||
- [ ] `SendMessage` routes to correct node
|
||||
- [ ] Actors can be migrated with state preserved
|
||||
- [ ] Failed migrations are handled gracefully
|
||||
- [ ] Integration test with multi-node cluster
|
||||
117
.product-strategy/ISSUE_SNAPSHOTS.md
Normal file
117
.product-strategy/ISSUE_SNAPSHOTS.md
Normal file
@@ -0,0 +1,117 @@
|
||||
# Issue: Add Snapshot Support to Event Sourcing Workflow
|
||||
|
||||
## Problem
|
||||
|
||||
`SnapshotStore` interface is defined but snapshots are not integrated into the event sourcing workflow. This means:
|
||||
- Actors with many events must replay entire history
|
||||
- No performance optimization for long-lived actors
|
||||
- Snapshots exist as API but are not used
|
||||
|
||||
## Current State
|
||||
|
||||
- `EventStoreWithErrors` in `event.go:235` - no snapshot methods
|
||||
- `SnapshotStore` interface in `event.go:245` - defined but not widely used
|
||||
- `JetStreamEventStore.GetLatestSnapshot` and `SaveSnapshot` implemented but not called automatically
|
||||
- `InMemoryEventStore` has snapshot methods but no lifecycle management
|
||||
|
||||
## Required Implementation
|
||||
|
||||
### 1. Snapshot Strategy
|
||||
Define when to create snapshots:
|
||||
- Fixed interval (e.g., every 100 events)
|
||||
- Version-based (e.g., every 50 versions)
|
||||
- Hybrid: version-based with min/max bounds
|
||||
|
||||
### 2. State Capture
|
||||
Add method to capture actor state:
|
||||
```go
|
||||
// CaptureState rebuilds actor state by replaying events and returns it
|
||||
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
|
||||
```
|
||||
|
||||
### 3. Snapshot Store Extension
|
||||
Extend `EventStoreWithErrors` to include snapshots:
|
||||
```go
|
||||
type EventStoreWithSnapshots interface {
|
||||
EventStoreWithErrors
|
||||
GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
|
||||
SaveSnapshot(snapshot *ActorSnapshot) error
|
||||
}
|
||||
```
|
||||
|
||||
### 4. Snapshot Workflow
|
||||
Modify event retrieval to use snapshots:
|
||||
```go
|
||||
GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
|
||||
// 1. Try to get latest snapshot
|
||||
snapshot, _ := store.GetLatestSnapshot(actorID)
|
||||
|
||||
// 2. If snapshot exists and version <= fromVersion:
|
||||
// - Return events from snapshot version + 1
|
||||
// 3. Else:
|
||||
// - Replay all events from version 0
|
||||
}
|
||||
```
|
||||
|
||||
## Suggested Implementation
|
||||
|
||||
### 1. Add CaptureState to EventStore interface
|
||||
In `event.go`, extend `EventStore` or create `StateStore` interface:
|
||||
```go
|
||||
type StateStore interface {
|
||||
EventStore
|
||||
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
|
||||
}
|
||||
```
|
||||
|
||||
### 2. Implement CaptureState
|
||||
In `store/jetstream.go`:
|
||||
```go
|
||||
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
|
||||
// Replay events and build state (application logic needed here)
|
||||
events, _ := jes.GetEvents(actorID, fromVersion)
|
||||
// Need application logic to convert events to state
|
||||
return state, nil
|
||||
}
|
||||
```
|
||||
|
||||
### 3. Add Snapshot Helper
|
||||
Create snapshot utilities:
|
||||
```go
|
||||
// CreateSnapshot creates snapshot from state
|
||||
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
|
||||
return &ActorSnapshot{
|
||||
ActorID: actorID,
|
||||
Version: version,
|
||||
State: state,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 4. Modify GetEvents
|
||||
Update `GetEvents` in both stores to use snapshots when beneficial.
|
||||
|
||||
## Snapshots Workflow Example
|
||||
|
||||
```
|
||||
1. Actor has 1000 events
|
||||
2. Every 100 events, create snapshot
|
||||
3. Actor reaches version 1000, snapshot at version 1000
|
||||
4. Request events from version 900:
|
||||
- Get snapshot at version 1000? No (version too high)
|
||||
- Replay 900->1000 events (only 100 events)
|
||||
5. Request events from version 50:
|
||||
- Get latest snapshot at version 1000? Yes (version > 50)
|
||||
- Use snapshot as base
|
||||
- Replay 1000->1000 events (none)
|
||||
```
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
- [ ] `CaptureState` method added to event store
|
||||
- [ ] Snapshots created at configured intervals
|
||||
- [ ] `GetEvents` uses snapshots to optimize replay
|
||||
- [ ] Snapshot workflow tested with long-lived actors
|
||||
- [ ] Configuration for snapshot interval/version
|
||||
- [ ] Metrics: snapshot count, average replay size
|
||||
100
.product-strategy/ISSUE_VM_RUNTIME.md
Normal file
100
.product-strategy/ISSUE_VM_RUNTIME.md
Normal file
@@ -0,0 +1,100 @@
|
||||
# Issue: Implement VM/Runtime for Actors
|
||||
|
||||
## Problem
|
||||
|
||||
Only interfaces exist for `Runtime` and `VirtualMachine` in `cluster/types.go` and `cluster/distributed.go`, but no actual implementation. Actors cannot be created, started, stopped, or have their state managed.
|
||||
|
||||
## Required Components
|
||||
|
||||
### 1. VM Implementation (cluster/vm.go - new)
|
||||
```go
|
||||
type VirtualMachine struct {
|
||||
actorID string
|
||||
eventStore aether.EventStore
|
||||
state map[string]interface{}
|
||||
version int64
|
||||
}
|
||||
```
|
||||
|
||||
Methods needed:
|
||||
- `GetID()`, `GetActorID()`, `GetState()` - already in interface
|
||||
- `Start()` - replay events to rebuild state
|
||||
- `ProcessEvent(event *aether.Event)` - apply event to state
|
||||
- `Stop()` - persist final state
|
||||
- `GetVersion()` - current event version
|
||||
|
||||
### 2. Runtime Implementation (cluster/runtime.go - new)
|
||||
```go
|
||||
type Runtime struct {
|
||||
natsConn *nats.Conn
|
||||
eventStore aether.EventStore
|
||||
vmRegistry VMRegistry // map[actorID]*VirtualMachine
|
||||
config RuntimeConfig
|
||||
}
|
||||
```
|
||||
|
||||
Methods needed:
|
||||
- `Start()` - initialize and start processing
|
||||
- `LoadModel(model eventstorming.Model)` - register domain types
|
||||
- `SendMessage(message RuntimeMessage)` - route to appropriate VM
|
||||
- `GetActiveVMs()` - return map of active VMs
|
||||
- `CreateVM(actorID string)` - create new VM instance
|
||||
- `StopVM(actorID string)` - persist and stop VM
|
||||
|
||||
### 3. Event Processing
|
||||
- Subscribe to actor's event stream
|
||||
- Replay events to build initial state
|
||||
- Apply new events as they arrive
|
||||
- Handle event versions and conflicts
|
||||
|
||||
## Suggested Design
|
||||
|
||||
### VM Lifecycle
|
||||
```
|
||||
1. Actor message arrives for actor-123
|
||||
2. Runtime checks if VM exists for actor-123
|
||||
3. If not, create VM:
|
||||
- Replay events from event store
|
||||
- Rebuild state
|
||||
4. Route message to VM
|
||||
5. VM processes message -> creates new events
|
||||
6. Events persisted to event store
|
||||
7. VM state updated
|
||||
```
|
||||
|
||||
### State Management
|
||||
- State derived from event replay
|
||||
- No separate state store needed
|
||||
- Can snapshot periodically for performance
|
||||
- Version conflict handling using existing EventStore
|
||||
|
||||
## Implementation Steps
|
||||
|
||||
1. **Create VM struct** in `cluster/vm.go`
|
||||
2. **Implement event replay** to rebuild state
|
||||
3. **Create Runtime** in `cluster/runtime.go`
|
||||
4. **Register Runtime with cluster** via `SetVMProvider`
|
||||
5. **Implement message processing** - validate against model
|
||||
6. **Add version conflict handling** using existing EventStore
|
||||
7. **Write tests** - mock event store, test state transitions
|
||||
|
||||
## File Structure
|
||||
|
||||
```
|
||||
cluster/
|
||||
├── vm.go # VirtualMachine implementation
|
||||
├── runtime.go # Runtime implementation
|
||||
├── vm_test.go # VM tests
|
||||
├── runtime_test.go # Runtime tests
|
||||
└── integration_test.go # Integration tests
|
||||
```
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
- [ ] VM can be created with actor ID
|
||||
- [ ] VM replays events to build state
|
||||
- [ ] VM processes events and updates state
|
||||
- [ ] VM persists current version
|
||||
- [ ] Runtime can create/stop VMs
|
||||
- [ ] Runtime manages VM registry
|
||||
- [ ] Integration test with NATS and JetStream
|
||||
@@ -558,5 +558,18 @@ func sanitizeSubject(s string) string {
|
||||
return s
|
||||
}
|
||||
|
||||
// UpdateVersionCache updates the version cache for a specific actor.
|
||||
// This is used when receiving events from other nodes via NATS to keep
|
||||
// the version cache consistent across cluster nodes.
|
||||
func (jes *JetStreamEventStore) UpdateVersionCache(actorID string, version int64) {
|
||||
jes.mu.Lock()
|
||||
defer jes.mu.Unlock()
|
||||
|
||||
// Only update if the new version is greater than cached version
|
||||
if currentVersion, ok := jes.versions[actorID]; !ok || version > currentVersion {
|
||||
jes.versions[actorID] = version
|
||||
}
|
||||
}
|
||||
|
||||
// Compile-time check that JetStreamEventStore implements EventStoreWithErrors
|
||||
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)
|
||||
|
||||
Reference in New Issue
Block a user