All checks were successful
CI / build (push) Successful in 22s
This PR implements cross-node event broadcasting for aether. Changes: - UpdateVersionCache method in JetStreamEventStore - SubscribeToEventStored helper in NATSEventBus - Integration tests for cross-node scenarios - Example code demonstrating NATSEventBus + JetStreamEventStore Tests: All integration tests passing. Co-authored-by: Claude Code <noreply@anthropic.com> Co-authored-by: Hugo Nijhuis <hugo.nijhuis@flowmade.one> Reviewed-on: #151
2.9 KiB
2.9 KiB
Issue: Implement VM/Runtime for Actors
Problem
Only interfaces exist for Runtime and VirtualMachine in cluster/types.go and cluster/distributed.go, but no actual implementation. Actors cannot be created, started, stopped, or have their state managed.
Required Components
1. VM Implementation (cluster/vm.go - new)
type VirtualMachine struct {
actorID string
eventStore aether.EventStore
state map[string]interface{}
version int64
}
Methods needed:
GetID(),GetActorID(),GetState()- already in interfaceStart()- replay events to rebuild stateProcessEvent(event *aether.Event)- apply event to stateStop()- persist final stateGetVersion()- current event version
2. Runtime Implementation (cluster/runtime.go - new)
type Runtime struct {
natsConn *nats.Conn
eventStore aether.EventStore
vmRegistry VMRegistry // map[actorID]*VirtualMachine
config RuntimeConfig
}
Methods needed:
Start()- initialize and start processingLoadModel(model eventstorming.Model)- register domain typesSendMessage(message RuntimeMessage)- route to appropriate VMGetActiveVMs()- return map of active VMsCreateVM(actorID string)- create new VM instanceStopVM(actorID string)- persist and stop VM
3. Event Processing
- Subscribe to actor's event stream
- Replay events to build initial state
- Apply new events as they arrive
- Handle event versions and conflicts
Suggested Design
VM Lifecycle
1. Actor message arrives for actor-123
2. Runtime checks if VM exists for actor-123
3. If not, create VM:
- Replay events from event store
- Rebuild state
4. Route message to VM
5. VM processes message -> creates new events
6. Events persisted to event store
7. VM state updated
State Management
- State derived from event replay
- No separate state store needed
- Can snapshot periodically for performance
- Version conflict handling using existing EventStore
Implementation Steps
- Create VM struct in
cluster/vm.go - Implement event replay to rebuild state
- Create Runtime in
cluster/runtime.go - Register Runtime with cluster via
SetVMProvider - Implement message processing - validate against model
- Add version conflict handling using existing EventStore
- Write tests - mock event store, test state transitions
File Structure
cluster/
├── vm.go # VirtualMachine implementation
├── runtime.go # Runtime implementation
├── vm_test.go # VM tests
├── runtime_test.go # Runtime tests
└── integration_test.go # Integration tests
Acceptance Criteria
- VM can be created with actor ID
- VM replays events to build state
- VM processes events and updates state
- VM persists current version
- Runtime can create/stop VMs
- Runtime manages VM registry
- Integration test with NATS and JetStream