# Issue: Add Snapshot Support to Event Sourcing Workflow

## Problem

The `SnapshotStore` interface is defined, but snapshots are not integrated into the event sourcing workflow. This means:
- Actors with many events must replay their entire history
- There is no performance optimization for long-lived actors
- Snapshots exist as an API but are not used

## Current State

- `EventStoreWithErrors` in `event.go:235` - no snapshot methods
- `SnapshotStore` interface in `event.go:245` - defined but not widely used
- `JetStreamEventStore.GetLatestSnapshot` and `SaveSnapshot` implemented but not called automatically
- `InMemoryEventStore` has snapshot methods but no lifecycle management

## Required Implementation

### 1. Snapshot Strategy
Define when to create snapshots:
- Fixed interval (e.g., every 100 events)
- Version-based (e.g., every 50 versions)
- Hybrid: version-based with min/max bounds

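The strategies above could be expressed as a small policy type. The following is a minimal sketch, not part of aether: `SnapshotPolicy`, its fields, and `ShouldSnapshot` are hypothetical names, and it assumes that "hybrid" means clamping the configured interval between a minimum and a maximum number of versions.

```go
package main

import "fmt"

// SnapshotPolicy is a hypothetical configuration type for this sketch.
// A zero Min or Max means "no bound".
type SnapshotPolicy struct {
	Interval int64 // desired number of versions between snapshots
	Min      int64 // lower bound on the interval (assumption)
	Max      int64 // upper bound on the interval (assumption)
}

// effectiveInterval clamps Interval into [Min, Max] when bounds are set.
func (p SnapshotPolicy) effectiveInterval() int64 {
	n := p.Interval
	if p.Min > 0 && n < p.Min {
		n = p.Min
	}
	if p.Max > 0 && n > p.Max {
		n = p.Max
	}
	return n
}

// ShouldSnapshot reports whether enough versions have accumulated since the
// last snapshot. A non-positive effective interval disables snapshots.
func (p SnapshotPolicy) ShouldSnapshot(lastSnapshotVersion, currentVersion int64) bool {
	n := p.effectiveInterval()
	if n <= 0 {
		return false
	}
	return currentVersion-lastSnapshotVersion >= n
}

func main() {
	p := SnapshotPolicy{Interval: 100, Min: 50, Max: 500}
	fmt.Println(p.ShouldSnapshot(0, 99))    // false
	fmt.Println(p.ShouldSnapshot(0, 100))   // true
	fmt.Println(p.ShouldSnapshot(900, 950)) // false
}
```

Keeping the decision in one pure function makes the interval easy to configure and to unit test independently of any store.
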
### 2. State Capture
Add method to capture actor state:
```go
// CaptureState rebuilds actor state by replaying events and returns it
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
```

### 3. Snapshot Store Extension
Extend `EventStoreWithErrors` to include snapshots:
```go
type EventStoreWithSnapshots interface {
	EventStoreWithErrors
	GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
	SaveSnapshot(snapshot *ActorSnapshot) error
}
```

### 4. Snapshot Workflow
Modify event retrieval to use snapshots:
```go
GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
	// 1. Try to get the latest snapshot (a missing snapshot is not fatal)
	snapshot, _ := store.GetLatestSnapshot(actorID)

	// 2. If a snapshot exists and snapshot.Version >= fromVersion:
	//    - Use the snapshot as the base state
	//    - Return events from snapshot.Version + 1
	// 3. Else:
	//    - Return events from fromVersion without using a snapshot
}
```

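The workflow above can be sketched end to end with an in-memory store. This is an illustration under stated assumptions, not aether's implementation: `Event` and `ActorSnapshot` are simplified stand-ins for the real types, `memStore`, `newDemoStore`, `errNoSnapshot`, and `LoadFrom` are hypothetical names, `fromVersion` is assumed to be inclusive, and a snapshot is assumed usable whenever its version is at or beyond `fromVersion`.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for aether's types; the real definitions may differ.
type Event struct {
	ActorID string
	Version int64
}

type ActorSnapshot struct {
	ActorID string
	Version int64
	State   map[string]interface{}
}

// errNoSnapshot is a hypothetical sentinel for "actor has no snapshot yet".
var errNoSnapshot = errors.New("no snapshot")

// memStore is a hypothetical in-memory store used only for this sketch.
type memStore struct {
	events    map[string][]*Event
	snapshots map[string]*ActorSnapshot
}

// newDemoStore creates one actor with versions 1..n and, if snapVersion > 0,
// a snapshot at snapVersion.
func newDemoStore(actorID string, n, snapVersion int64) *memStore {
	m := &memStore{events: map[string][]*Event{}, snapshots: map[string]*ActorSnapshot{}}
	for v := int64(1); v <= n; v++ {
		m.events[actorID] = append(m.events[actorID], &Event{ActorID: actorID, Version: v})
	}
	if snapVersion > 0 {
		m.snapshots[actorID] = &ActorSnapshot{ActorID: actorID, Version: snapVersion}
	}
	return m
}

func (m *memStore) GetLatestSnapshot(actorID string) (*ActorSnapshot, error) {
	s, ok := m.snapshots[actorID]
	if !ok {
		return nil, errNoSnapshot
	}
	return s, nil
}

// GetEvents returns events with Version >= fromVersion (inclusive by assumption).
func (m *memStore) GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
	var out []*Event
	for _, e := range m.events[actorID] {
		if e.Version >= fromVersion {
			out = append(out, e)
		}
	}
	return out, nil
}

// LoadFrom returns the snapshot to restore from (nil if none applies) and the
// events that still have to be replayed on top of it.
func LoadFrom(m *memStore, actorID string, fromVersion int64) (*ActorSnapshot, []*Event, error) {
	snap, err := m.GetLatestSnapshot(actorID)
	if err != nil && !errors.Is(err, errNoSnapshot) {
		return nil, nil, err
	}
	if snap != nil && snap.Version >= fromVersion {
		events, err := m.GetEvents(actorID, snap.Version+1)
		return snap, events, err
	}
	events, err := m.GetEvents(actorID, fromVersion)
	return nil, events, err
}

func main() {
	m := newDemoStore("actor-1", 1050, 1000)
	snap, events, _ := LoadFrom(m, "actor-1", 50)
	fmt.Println(snap.Version, len(events)) // 1000 50
}
```

Returning the snapshot alongside the remaining events makes it explicit that the caller must restore from the snapshot before applying the events; returning only a shortened event list would silently drop history.
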
## Suggested Implementation

### 1. Add CaptureState to EventStore interface
In `event.go`, extend `EventStore` or create `StateStore` interface:
```go
type StateStore interface {
	EventStore
	CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
}
```

### 2. Implement CaptureState
In `store/jetstream.go`:
```go
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
	// Replay events and build state (application logic needed here)
	events, err := jes.GetEvents(actorID, fromVersion)
	if err != nil {
		return nil, err
	}
	state := make(map[string]interface{})
	for _, event := range events {
		_ = event // Need application logic to apply each event to state
	}
	return state, nil
}
```

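Because the store cannot know how events map to state, one option is to let the application supply that logic as a function. The sketch below illustrates the idea with a counter; `ApplyFunc`, `FoldEvents`, `applyCounter`, and the `EventType` field on the simplified `Event` are hypothetical and not taken from aether.

```go
package main

import "fmt"

// Event is a simplified stand-in for aether's event type.
type Event struct {
	EventType string
	Version   int64
}

// ApplyFunc is a hypothetical hook: the application mutates state for one event.
type ApplyFunc func(state map[string]interface{}, e *Event)

// FoldEvents copies the initial state, applies every event in order, and
// returns the resulting state with the version of the last applied event.
func FoldEvents(initial map[string]interface{}, events []*Event, apply ApplyFunc) (map[string]interface{}, int64) {
	state := make(map[string]interface{}, len(initial))
	for k, v := range initial {
		state[k] = v
	}
	var last int64
	for _, e := range events {
		apply(state, e)
		last = e.Version
	}
	return state, last
}

// applyCounter is example application logic for a simple counter actor.
func applyCounter(state map[string]interface{}, e *Event) {
	n, _ := state["count"].(int)
	switch e.EventType {
	case "Incremented":
		state["count"] = n + 1
	case "Reset":
		state["count"] = 0
	}
}

func main() {
	events := []*Event{
		{EventType: "Incremented", Version: 1},
		{EventType: "Incremented", Version: 2},
		{EventType: "Reset", Version: 3},
		{EventType: "Incremented", Version: 4},
	}
	state, version := FoldEvents(nil, events, applyCounter)
	fmt.Println(state["count"], version) // 1 4
}
```

The returned version is the natural value to store in a snapshot, and passing a snapshot's `State` as `initial` lets the same function resume from a snapshot.
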
### 3. Add Snapshot Helper
Create snapshot utilities:
```go
// CreateSnapshot creates snapshot from state
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
	return &ActorSnapshot{
		ActorID:   actorID,
		Version:   version,
		State:     state,
		Timestamp: time.Now(),
	}
}
```

### 4. Modify GetEvents
Update `GetEvents` in both stores to use snapshots when beneficial.

## Snapshot Workflow Example

```
1. Actor has 1000 events
2. Every 100 events, create snapshot
3. Actor reaches version 1000, latest snapshot at version 1000
4. Request events from version 900:
   - Use latest snapshot at version 1000? Yes (1000 >= 900)
   - Use snapshot as base
   - Replay events after version 1000 (none)
5. Request events from version 50:
   - Use latest snapshot at version 1000? Yes (1000 >= 50)
   - Use snapshot as base
   - Replay events after version 1000 (none), instead of versions 50 through 1000
```

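The replay cost in an example like this can be worked out with simple arithmetic. The sketch below assumes snapshots land exactly on multiples of the interval and that versions start at 1; `latestSnapshotVersion` and `replaySize` are hypothetical helpers, not aether APIs.

```go
package main

import "fmt"

// latestSnapshotVersion returns the version of the most recent snapshot,
// assuming one snapshot at every multiple of interval. A non-positive
// interval means snapshots are disabled, so the result is 0.
func latestSnapshotVersion(currentVersion, interval int64) int64 {
	if interval <= 0 {
		return 0
	}
	return (currentVersion / interval) * interval
}

// replaySize returns how many events must be replayed on top of the latest
// snapshot to reach currentVersion.
func replaySize(currentVersion, interval int64) int64 {
	return currentVersion - latestSnapshotVersion(currentVersion, interval)
}

func main() {
	fmt.Println(replaySize(1000, 100)) // 0
	fmt.Println(replaySize(1050, 100)) // 50
	fmt.Println(replaySize(1050, 0))   // 1050
}
```

With an interval of 100, a full rebuild never replays more than 99 events, which is the kind of figure an "average replay size" metric would track.
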
## Acceptance Criteria

- [ ] `CaptureState` method added to event store
- [ ] Snapshots created at configured intervals
- [ ] `GetEvents` uses snapshots to optimize replay
- [ ] Snapshot workflow tested with long-lived actors
- [ ] Configuration for snapshot interval/version
- [ ] Metrics: snapshot count, average replay size