Issue: Add Snapshot Support to Event Sourcing Workflow
Problem
The SnapshotStore interface is defined, but snapshots are not integrated into the event sourcing workflow. This means:
- Actors with many events must replay their entire history
- There is no performance optimization for long-lived actors
- Snapshots exist as an API but are not used
Current State
- EventStoreWithErrors in event.go:235 - no snapshot methods
- SnapshotStore interface in event.go:245 - defined but not widely used
- JetStreamEventStore.GetLatestSnapshot and SaveSnapshot implemented but not called automatically
- InMemoryEventStore has snapshot methods but no lifecycle management
Required Implementation
1. Snapshot Strategy
Define when to create snapshots:
- Fixed interval (e.g., every 100 events)
- Version-based (e.g., every 50 versions)
- Hybrid: version-based with min/max bounds
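One way to read the hybrid option is sketched below. SnapshotPolicy, its fields, and ShouldSnapshot are hypothetical names, not part of aether, and the rule (snapshot on multiples of Interval, never sooner than MinInterval, always by MaxInterval) is only one possible interpretation of "min/max bounds".

```go
package main

// SnapshotPolicy is a hypothetical configuration type (not part of aether).
type SnapshotPolicy struct {
	Interval    int64 // preferred spacing: snapshot when the version is a multiple of this
	MinInterval int64 // lower bound: never snapshot sooner than this after the last one
	MaxInterval int64 // upper bound: always snapshot once this many versions have passed
}

// ShouldSnapshot reports whether a snapshot is due at currentVersion, given the
// version of the most recent snapshot (0 if none exists yet).
func (p SnapshotPolicy) ShouldSnapshot(lastSnapshotVersion, currentVersion int64) bool {
	elapsed := currentVersion - lastSnapshotVersion
	if elapsed < p.MinInterval {
		return false // too soon after the previous snapshot
	}
	if p.MaxInterval > 0 && elapsed >= p.MaxInterval {
		return true // overdue, regardless of alignment
	}
	return p.Interval > 0 && currentVersion%p.Interval == 0
}
```

Setting MinInterval and MaxInterval to 0 reduces this to the plain "every N versions" strategy.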
2. State Capture
Add method to capture actor state:
// CaptureState rebuilds actor state by replaying events and returns it
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
3. Snapshot Store Extension
Extend EventStoreWithErrors to include snapshots:
type EventStoreWithSnapshots interface {
	EventStoreWithErrors
	GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
	SaveSnapshot(snapshot *ActorSnapshot) error
}
4. Snapshot Workflow
Modify event retrieval to use snapshots:
GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
	// 1. Try to get the latest snapshot (a missing snapshot is not an error)
	snapshot, _ := store.GetLatestSnapshot(actorID)
	// 2. If a snapshot exists and snapshot.Version >= fromVersion:
	//    - Use the snapshot as the base state
	//    - Return only events from snapshot.Version + 1
	// 3. Else:
	//    - Return events from fromVersion, as today
}
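The read path above can be sketched as a small helper that rebuilds state from the latest snapshot plus the events recorded after it. This is a minimal sketch under stated assumptions: Event and ActorSnapshot are reduced stand-ins for the aether types, snapshotReader, LoadState and memStore are hypothetical names, fromVersion is assumed to be inclusive, and a failed snapshot lookup is treated as "no snapshot".

```go
package main

// Reduced stand-ins for the aether types; the real structs have more fields.
type Event struct {
	ActorID string
	Version int64
	Data    map[string]interface{}
}

type ActorSnapshot struct {
	ActorID string
	Version int64
	State   map[string]interface{}
}

// snapshotReader (hypothetical) lists only the two methods the sketch needs.
type snapshotReader interface {
	GetEvents(actorID string, fromVersion int64) ([]*Event, error)
	GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
}

// LoadState starts from the latest snapshot, if any, and replays only the
// events recorded after it. apply holds the application-specific fold logic.
func LoadState(store snapshotReader, actorID string,
	apply func(state map[string]interface{}, e *Event)) (map[string]interface{}, int64, error) {
	state := make(map[string]interface{})
	var version int64
	// Assumption: a snapshot lookup failure falls back to a full replay.
	if snap, err := store.GetLatestSnapshot(actorID); err == nil && snap != nil {
		for k, v := range snap.State {
			state[k] = v // copy so the stored snapshot is never mutated
		}
		version = snap.Version
	}
	events, err := store.GetEvents(actorID, version+1)
	if err != nil {
		return nil, 0, err
	}
	for _, e := range events {
		apply(state, e)
		version = e.Version
	}
	return state, version, nil
}

// memStore is a tiny in-memory stand-in used only to exercise LoadState.
type memStore struct {
	events   map[string][]*Event
	snapshot map[string]*ActorSnapshot
}

func (m *memStore) GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
	var out []*Event
	for _, e := range m.events[actorID] {
		if e.Version >= fromVersion {
			out = append(out, e)
		}
	}
	return out, nil
}

func (m *memStore) GetLatestSnapshot(actorID string) (*ActorSnapshot, error) {
	return m.snapshot[actorID], nil
}
```

Returning the state together with its version keeps the GetEvents contract unchanged; whether the optimization belongs inside GetEvents or in a separate loader like this is a design choice the issue leaves open.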
Suggested Implementation
1. Add CaptureState to EventStore interface
In event.go, extend EventStore or create StateStore interface:
type StateStore interface {
	EventStore
	CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
}
2. Implement CaptureState
In store/jetstream.go:
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
	// Replay events from fromVersion onward.
	events, err := jes.GetEvents(actorID, fromVersion)
	if err != nil {
		return nil, err
	}
	state := make(map[string]interface{})
	for _, event := range events {
		// Application logic is needed here to apply each event to state.
		_ = event
	}
	return state, nil
}
3. Add Snapshot Helper
Create snapshot utilities:
// CreateSnapshot creates snapshot from state
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
	return &ActorSnapshot{
		ActorID:   actorID,
		Version:   version,
		State:     state,
		Timestamp: time.Now(),
	}
}
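To show how such a helper might be wired into the write path, here is a rough sketch that saves an event and snapshots whenever the version reaches a multiple of the configured interval. SaveWithSnapshot, snapshotWriter and fakeStore are hypothetical names, the types are reduced stand-ins, and the SaveEvent signature is an assumption about the store interface rather than something this issue specifies.

```go
package main

import "time"

// Reduced stand-ins for the aether types.
type Event struct {
	ActorID string
	Version int64
}

type ActorSnapshot struct {
	ActorID   string
	Version   int64
	State     map[string]interface{}
	Timestamp time.Time
}

// snapshotWriter (hypothetical) lists only the methods this sketch needs.
// SaveEvent's signature is an assumption.
type snapshotWriter interface {
	SaveEvent(event *Event) error
	CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
	SaveSnapshot(snapshot *ActorSnapshot) error
}

// SaveWithSnapshot persists the event, then snapshots if its version is a
// multiple of interval. An interval <= 0 disables snapshotting.
func SaveWithSnapshot(store snapshotWriter, event *Event, interval int64) error {
	if err := store.SaveEvent(event); err != nil {
		return err
	}
	if interval <= 0 || event.Version%interval != 0 {
		return nil
	}
	// Rebuild from the start here; a real implementation could begin at the
	// previous snapshot instead.
	state, err := store.CaptureState(event.ActorID, 0)
	if err != nil {
		return err
	}
	return store.SaveSnapshot(&ActorSnapshot{
		ActorID:   event.ActorID,
		Version:   event.Version,
		State:     state,
		Timestamp: time.Now(),
	})
}

// fakeStore is an in-memory stand-in used only to exercise the sketch.
type fakeStore struct {
	events    []*Event
	snapshots []*ActorSnapshot
}

func (f *fakeStore) SaveEvent(e *Event) error {
	f.events = append(f.events, e)
	return nil
}

func (f *fakeStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
	count := 0
	for _, e := range f.events {
		if e.ActorID == actorID && e.Version >= fromVersion {
			count++
		}
	}
	return map[string]interface{}{"eventCount": count}, nil
}

func (f *fakeStore) SaveSnapshot(s *ActorSnapshot) error {
	f.snapshots = append(f.snapshots, s)
	return nil
}
```

Here a failed snapshot fails the whole call; since the event is already persisted at that point, logging the snapshot error and returning nil would be an equally defensible choice.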
4. Modify GetEvents
Update GetEvents in both stores to use snapshots when beneficial.
Snapshots Workflow Example
1. Actor has 1000 events
2. Every 100 events, create snapshot
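3. Actor reaches version 1000; the latest snapshot is at version 1000
4. Request events from version 900:
- Latest snapshot at version 1000 covers the request? Yes (1000 >= 900)
- Use snapshot as base
- Replay events after version 1000 (none), instead of about 100 events without the snapshot
5. Request events from version 50:
- Latest snapshot at version 1000 covers the request? Yes (1000 >= 50)
- Use snapshot as base
- Replay events after version 1000 (none), instead of about 950 events without the snapshot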
Acceptance Criteria
- CaptureState method added to event store
- Snapshots created at configured intervals
- GetEvents uses snapshots to optimize replay
- Snapshot workflow tested with long-lived actors
- Configuration for snapshot interval/version
- Metrics: snapshot count, average replay size
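For the metrics criterion, a minimal in-process counter could look like the sketch below. SnapshotMetrics and its methods are hypothetical names; how the numbers would be exported (logs, Prometheus, NATS) is left open.

```go
package main

import "sync"

// SnapshotMetrics is a hypothetical, concurrency-safe counter set.
// The zero value is ready to use.
type SnapshotMetrics struct {
	mu             sync.Mutex
	snapshots      int64 // snapshots written
	replays        int64 // replay operations performed
	replayedEvents int64 // total events applied across all replays
}

// RecordSnapshot counts one saved snapshot.
func (m *SnapshotMetrics) RecordSnapshot() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.snapshots++
}

// RecordReplay counts one replay that applied eventCount events.
func (m *SnapshotMetrics) RecordReplay(eventCount int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.replays++
	m.replayedEvents += int64(eventCount)
}

// SnapshotCount returns the number of snapshots recorded so far.
func (m *SnapshotMetrics) SnapshotCount() int64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.snapshots
}

// AverageReplaySize returns the mean number of events per replay,
// or 0 if no replay has been recorded.
func (m *SnapshotMetrics) AverageReplaySize() float64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.replays == 0 {
		return 0
	}
	return float64(m.replayedEvents) / float64(m.replays)
}
```

A falling average replay size after snapshots are enabled would be a direct sign that the optimization is taking effect.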