# Issue: Implement VM/Runtime for Actors

## Problem

`Runtime` and `VirtualMachine` exist only as interfaces in `cluster/types.go` and `cluster/distributed.go`; there is no concrete implementation. As a result, actors cannot be created, started, or stopped, and their state cannot be managed.

## Required Components

### 1. VM Implementation (cluster/vm.go - new)
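```go
// VirtualMachine holds the event-sourced state of a single actor.
type VirtualMachine struct {
	actorID    string                 // actor this VM belongs to
	eventStore aether.EventStore      // source of events for replay
	state      map[string]interface{} // state rebuilt from events
	version    int64                  // version of the last applied event
}
```

Methods needed: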
- `GetID()`, `GetActorID()`, `GetState()` - already in interface
- `Start()` - replay events to rebuild state
- `ProcessEvent(event *aether.Event)` - apply event to state
- `Stop()` - persist final state
- `GetVersion()` - current event version
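
The replay behaviour of `Start()` and `ProcessEvent()` could look roughly like the sketch below. It is only an illustration: `Event`, `EventStore`, `memStore`, and `VM` are hypothetical stand-ins defined here, not the real `aether` types, and merging each event's data into the state map is an assumed way of applying an event.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for aether.Event; the real type may differ.
type Event struct {
	ActorID string
	Version int64
	Data    map[string]interface{}
}

// EventStore is a hypothetical, minimal view of an event store.
type EventStore interface {
	// GetEvents returns the actor's events with Version > fromVersion, in order.
	GetEvents(actorID string, fromVersion int64) ([]*Event, error)
}

// memStore is an in-memory EventStore used only for illustration.
type memStore map[string][]*Event

func (m memStore) GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
	var out []*Event
	for _, e := range m[actorID] {
		if e.Version > fromVersion {
			out = append(out, e)
		}
	}
	return out, nil
}

// VM is a sketch of the VirtualMachine struct from this issue.
type VM struct {
	actorID    string
	eventStore EventStore
	state      map[string]interface{}
	version    int64
}

func NewVM(actorID string, store EventStore) *VM {
	return &VM{actorID: actorID, eventStore: store, state: map[string]interface{}{}}
}

// Start replays all stored events to rebuild state.
func (vm *VM) Start() error {
	events, err := vm.eventStore.GetEvents(vm.actorID, 0)
	if err != nil {
		return fmt.Errorf("replay %s: %w", vm.actorID, err)
	}
	for _, e := range events {
		if err := vm.ProcessEvent(e); err != nil {
			return err
		}
	}
	return nil
}

// ProcessEvent merges the event's data into state (an assumed apply rule)
// and advances the version.
func (vm *VM) ProcessEvent(e *Event) error {
	if e.ActorID != vm.actorID {
		return fmt.Errorf("event for %q sent to VM %q", e.ActorID, vm.actorID)
	}
	for k, v := range e.Data {
		vm.state[k] = v
	}
	vm.version = e.Version
	return nil
}

func (vm *VM) GetVersion() int64 { return vm.version }

func (vm *VM) GetState() map[string]interface{} { return vm.state }
```

Creating a VM with `NewVM("actor-123", store)` and calling `Start()` leaves `GetVersion()` at the version of the last stored event for that actor.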

### 2. Runtime Implementation (cluster/runtime.go - new)
```go
// Runtime creates VMs, routes messages to them, and stops them.
type Runtime struct {
	natsConn   *nats.Conn        // NATS connection
	eventStore aether.EventStore // event store shared by all VMs
	vmRegistry VMRegistry        // map[actorID]*VirtualMachine
	config     RuntimeConfig     // runtime settings
}
```

Methods needed:
- `Start()` - initialize and start processing
- `LoadModel(model eventstorming.Model)` - register domain types
- `SendMessage(message RuntimeMessage)` - route to appropriate VM
- `GetActiveVMs()` - return map of active VMs
- `CreateVM(actorID string)` - create new VM instance
- `StopVM(actorID string)` - persist and stop VM
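
The registry side of these methods might be sketched as follows. This is an assumption-heavy illustration: `vm`, `Message`, and this `Runtime` are hypothetical types defined here, persistence in `StopVM` is left out, and the NATS connection and model loading are not shown.

```go
package main

import (
	"fmt"
	"sync"
)

// vm is a placeholder for the VirtualMachine described in this issue.
type vm struct {
	actorID string
	inbox   []string // messages delivered so far (illustrative only)
}

// Message is a hypothetical stand-in for RuntimeMessage.
type Message struct {
	ActorID string
	Payload string
}

// Runtime keeps a mutex-guarded registry of active VMs keyed by actor ID.
type Runtime struct {
	mu  sync.RWMutex
	vms map[string]*vm
}

func NewRuntime() *Runtime { return &Runtime{vms: make(map[string]*vm)} }

// CreateVM registers a VM for actorID, or returns the existing one.
func (r *Runtime) CreateVM(actorID string) *vm {
	r.mu.Lock()
	defer r.mu.Unlock()
	if v, ok := r.vms[actorID]; ok {
		return v
	}
	v := &vm{actorID: actorID}
	r.vms[actorID] = v
	return v
}

// StopVM removes the VM from the registry; persistence is omitted here.
func (r *Runtime) StopVM(actorID string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.vms[actorID]; !ok {
		return fmt.Errorf("no active VM for actor %q", actorID)
	}
	delete(r.vms, actorID)
	return nil
}

// SendMessage routes the message to the actor's VM, creating it on demand.
func (r *Runtime) SendMessage(m Message) {
	v := r.CreateVM(m.ActorID)
	r.mu.Lock()
	v.inbox = append(v.inbox, m.Payload)
	r.mu.Unlock()
}

// GetActiveVMs returns a copy so callers cannot mutate the registry.
func (r *Runtime) GetActiveVMs() map[string]*vm {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make(map[string]*vm, len(r.vms))
	for id, v := range r.vms {
		out[id] = v
	}
	return out
}
```

Returning a copy from `GetActiveVMs()` is one possible design choice; it keeps callers from changing the registry without holding the lock.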

### 3. Event Processing
- Subscribe to actor's event stream
- Replay events to build initial state
- Apply new events as they arrive
- Handle event versions and conflicts
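
One way to handle versions when live events arrive after replay is to accept only the next expected version. The sketch below assumes that versions per actor are contiguous and increase by one, which this issue does not state; `Event`, `Applier`, and `ErrVersionGap` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a hypothetical stand-in for aether.Event.
type Event struct {
	Version int64
	Type    string
}

// ErrVersionGap signals that one or more events were missed.
var ErrVersionGap = errors.New("version gap")

// Applier tracks the last applied version for one actor.
type Applier struct {
	Version int64
	Applied []string // event types applied, in order
}

// Apply accepts only the next expected version (assumes contiguous versions).
// It returns (false, nil) for an already-applied event, such as a redelivery,
// and ErrVersionGap when the caller should replay from the store.
func (a *Applier) Apply(e Event) (bool, error) {
	switch {
	case e.Version <= a.Version:
		return false, nil
	case e.Version > a.Version+1:
		return false, fmt.Errorf("%w: have %d, got %d", ErrVersionGap, a.Version, e.Version)
	}
	a.Applied = append(a.Applied, e.Type)
	a.Version = e.Version
	return true, nil
}
```

On `ErrVersionGap` the VM would typically re-read the missing events from the event store rather than apply the out-of-order one.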

## Suggested Design

### VM Lifecycle
```
1. Actor message arrives for actor-123
2. Runtime checks if VM exists for actor-123
3. If not, create VM:
   - Replay events from event store
   - Rebuild state
4. Route message to VM
5. VM processes message -> creates new events
6. Events persisted to event store
7. VM state updated
```
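
The lifecycle above can be sketched end to end with an in-memory store. Everything here is hypothetical (`Store`, `Save`, `Load`, `Handle`, `ErrConflict`), the store's version check assumes contiguous versions starting at 1, and this `Runtime` is not safe for concurrent use.

```go
package main

import (
	"errors"
	"sync"
)

// ErrConflict means another writer already stored this version.
var ErrConflict = errors.New("version conflict")

// Event is a hypothetical stand-in for aether.Event.
type Event struct {
	ActorID string
	Version int64
	Payload string
}

// Store is a hypothetical in-memory event store with an optimistic version check.
type Store struct {
	mu     sync.Mutex
	events map[string][]Event
}

func NewStore() *Store { return &Store{events: make(map[string][]Event)} }

// Load returns a copy of the actor's events.
func (s *Store) Load(actorID string) []Event {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]Event(nil), s.events[actorID]...)
}

// Save appends e only if it is exactly one version past the stored head.
func (s *Store) Save(e Event) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if e.Version != int64(len(s.events[e.ActorID]))+1 {
		return ErrConflict
	}
	s.events[e.ActorID] = append(s.events[e.ActorID], e)
	return nil
}

// VM holds replayed state; Log stands in for real actor state.
type VM struct {
	ActorID string
	Version int64
	Log     []string
}

func (vm *VM) apply(e Event) {
	vm.Log = append(vm.Log, e.Payload)
	vm.Version = e.Version
}

type Runtime struct {
	store *Store
	vms   map[string]*VM
}

func NewRuntime(s *Store) *Runtime { return &Runtime{store: s, vms: map[string]*VM{}} }

// Handle follows steps 1-7: look up or rebuild the VM, turn the message
// into an event, persist it, then update in-memory state.
func (r *Runtime) Handle(actorID, payload string) error {
	vm, ok := r.vms[actorID]
	if !ok {
		// Step 3: create the VM and replay its history.
		vm = &VM{ActorID: actorID}
		for _, e := range r.store.Load(actorID) {
			vm.apply(e)
		}
		r.vms[actorID] = vm
	}
	// Step 5: the message becomes a new event at the next version.
	e := Event{ActorID: actorID, Version: vm.Version + 1, Payload: payload}
	// Step 6: persist; on conflict drop the stale VM so the next message replays.
	if err := r.store.Save(e); err != nil {
		delete(r.vms, actorID)
		return err
	}
	// Step 7: update in-memory state only after the event is stored.
	vm.apply(e)
	return nil
}
```

Persisting before applying means a failed save never leaves the VM ahead of the event store; dropping the VM on conflict is one simple recovery strategy among several.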

### State Management
- State derived from event replay
- No separate state store needed
- Can snapshot periodically for performance
- Version conflict handling using existing EventStore
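
Periodic snapshots shorten replay by starting from saved state and applying only newer events. The following is a minimal sketch under assumptions: `Event`, `Snapshot`, `Rebuild`, and `ShouldSnapshot` are hypothetical, a running total stands in for actor state, and the snapshot interval is a tuning choice not specified in this issue.

```go
package main

// Event is a hypothetical stand-in for aether.Event.
type Event struct {
	Version int64
	Delta   int
}

// Snapshot captures derived state at a given event version.
type Snapshot struct {
	Version int64
	Total   int
}

// Rebuild starts from snap (the zero value means "no snapshot") and replays
// only events newer than the snapshot. It returns the resulting state and
// how many events were replayed.
func Rebuild(snap Snapshot, events []Event) (Snapshot, int) {
	state, replayed := snap, 0
	for _, e := range events {
		if e.Version <= snap.Version {
			continue // already folded into the snapshot
		}
		state.Total += e.Delta
		state.Version = e.Version
		replayed++
	}
	return state, replayed
}

// ShouldSnapshot reports whether enough events have accumulated since the
// last snapshot; the interval is an assumed tuning parameter.
func ShouldSnapshot(current, lastSnapshot, interval int64) bool {
	return current-lastSnapshot >= interval
}
```

A full replay and a replay from a snapshot must produce the same state; only the number of events processed differs.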

## Implementation Steps

1. **Create VM struct** in `cluster/vm.go`
2. **Implement event replay** to rebuild state
3. **Create Runtime** in `cluster/runtime.go`
4. **Register Runtime with cluster** via `SetVMProvider`
5. **Implement message processing** - validate against model
6. **Add version conflict handling** using existing EventStore
7. **Write tests** - mock event store, test state transitions

## File Structure

```
cluster/
├── vm.go                # VirtualMachine implementation
├── runtime.go           # Runtime implementation
├── vm_test.go           # VM tests
├── runtime_test.go      # Runtime tests
└── integration_test.go  # Integration tests
```

## Acceptance Criteria

- [ ] VM can be created with actor ID
- [ ] VM replays events to build state
- [ ] VM processes events and updates state
- [ ] VM persists current version
- [ ] Runtime can create/stop VMs
- [ ] Runtime manages VM registry
- [ ] Integration test with NATS and JetStream