Add snapshot support to event sourcing workflow #148

Open
opened 2026-05-15 09:36:04 +00:00 by HugoNijhuis · 0 comments
Owner

Problem

The SnapshotStore interface is defined, but snapshots are not integrated into the event sourcing workflow. This means:

  • Actors with many events must replay their entire history
  • No performance optimization for long-lived actors
  • Snapshots exist as an API but are not used

Current State

  • EventStoreWithErrors in event.go:235 - no snapshot methods
  • SnapshotStore interface in event.go:245 - defined but not widely used
  • JetStreamEventStore.GetLatestSnapshot and SaveSnapshot implemented but not called automatically
  • InMemoryEventStore has snapshot methods but no lifecycle management

Required Implementation

1. Snapshot Strategy

Define when to create snapshots:

  • Fixed interval (e.g., every 100 events)
  • Version-based (e.g., every 50 versions)
  • Hybrid: version-based with min/max bounds
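
The three strategies above could share one small decision function. The following is a minimal sketch, not an agreed design: `SnapshotPolicy`, its fields, and `ShouldSnapshot` are hypothetical names, and "hybrid" is read here as clamping the configured interval between a minimum and a maximum.

```go
package main

import "fmt"

// SnapshotPolicy is a hypothetical configuration type; none of these
// names exist in the codebase yet.
type SnapshotPolicy struct {
	Interval    int64 // snapshot once this many versions have passed since the last one
	MinInterval int64 // lower bound on the effective interval (0 = no bound)
	MaxInterval int64 // upper bound on the effective interval (0 = no bound)
}

// ShouldSnapshot reports whether a snapshot is due at currentVersion,
// given the version of the most recent snapshot (0 if none exists).
func (p SnapshotPolicy) ShouldSnapshot(currentVersion, lastSnapshotVersion int64) bool {
	interval := p.Interval
	if p.MinInterval > 0 && interval < p.MinInterval {
		interval = p.MinInterval
	}
	if p.MaxInterval > 0 && interval > p.MaxInterval {
		interval = p.MaxInterval
	}
	if interval <= 0 {
		return false // snapshots disabled
	}
	return currentVersion-lastSnapshotVersion >= interval
}

func main() {
	p := SnapshotPolicy{Interval: 100}
	fmt.Println(p.ShouldSnapshot(99, 0))    // not yet due
	fmt.Println(p.ShouldSnapshot(100, 0))   // due
	fmt.Println(p.ShouldSnapshot(150, 100)) // only 50 versions since the last snapshot
}
```

Keeping the decision in a pure function like this would make the interval easy to configure and to unit test without a running store.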

2. State Capture

Add method to capture actor state:

// CaptureState rebuilds actor state by replaying events and returns it
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)

3. Snapshot Store Extension

Extend EventStoreWithErrors to include snapshots:

type EventStoreWithSnapshots interface {
    EventStoreWithErrors
    GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
    SaveSnapshot(snapshot *ActorSnapshot) error
}

4. Snapshot Workflow

Modify event retrieval to use snapshots:

GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
    // 1. Try to get latest snapshot; a lookup error means "no usable snapshot"
    snapshot, err := store.GetLatestSnapshot(actorID)

    // 2. If err == nil, snapshot != nil and snapshot.Version >= fromVersion:
    //    - Return events from snapshot.Version + 1
    // 3. Else:
    //    - Replay events from fromVersion (no snapshot to start from)
}
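
One way to read this workflow is as a helper that returns the latest snapshot together with only the events that still need replaying. The sketch below is self-contained and uses simplified stand-ins: `memStore`, `errNoSnapshot`, and `EventsAfterSnapshot` are hypothetical, the `Event` and `ActorSnapshot` structs are reduced versions of the real types, and versions are assumed to start at 1.

```go
package main

import (
	"errors"
	"fmt"
)

// Event and ActorSnapshot are simplified stand-ins for the types in event.go.
type Event struct {
	ActorID string
	Version int64
	Data    map[string]interface{}
}

type ActorSnapshot struct {
	ActorID string
	Version int64
	State   map[string]interface{}
}

// memStore is a hypothetical in-memory store used only for this sketch.
type memStore struct {
	events    map[string][]*Event
	snapshots map[string]*ActorSnapshot
}

var errNoSnapshot = errors.New("no snapshot")

func (s *memStore) GetLatestSnapshot(actorID string) (*ActorSnapshot, error) {
	snap, ok := s.snapshots[actorID]
	if !ok {
		return nil, errNoSnapshot
	}
	return snap, nil
}

// GetEvents returns events with Version >= fromVersion, in stored order.
func (s *memStore) GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
	var out []*Event
	for _, e := range s.events[actorID] {
		if e.Version >= fromVersion {
			out = append(out, e)
		}
	}
	return out, nil
}

// EventsAfterSnapshot returns the latest snapshot (nil if none) plus only
// the events that still need replaying on top of it.
func EventsAfterSnapshot(s *memStore, actorID string) (*ActorSnapshot, []*Event, error) {
	snap, err := s.GetLatestSnapshot(actorID)
	if err != nil {
		// No usable snapshot: fall back to a full replay from version 0.
		events, err := s.GetEvents(actorID, 0)
		return nil, events, err
	}
	events, err := s.GetEvents(actorID, snap.Version+1)
	return snap, events, err
}

func main() {
	s := &memStore{events: map[string][]*Event{}, snapshots: map[string]*ActorSnapshot{}}
	for v := int64(1); v <= 1000; v++ {
		s.events["a1"] = append(s.events["a1"], &Event{ActorID: "a1", Version: v})
	}
	_, all, _ := EventsAfterSnapshot(s, "a1")
	s.snapshots["a1"] = &ActorSnapshot{ActorID: "a1", Version: 900}
	_, tail, _ := EventsAfterSnapshot(s, "a1")
	fmt.Println(len(all), len(tail)) // replay size without and with a snapshot
}
```

Returning the snapshot alongside the events keeps the `GetEvents` signature unchanged, which may or may not be the preferred direction for this issue.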

Suggested Implementation

1. Add CaptureState to EventStore interface

In event.go, extend EventStore or create StateStore interface:

type StateStore interface {
    EventStore
    CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
}

2. Implement CaptureState

In store/jetstream.go:

func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
    events, err := jes.GetEvents(actorID, fromVersion)
    if err != nil {
        return nil, err
    }
    // Fold events into state; the per-event logic is application-specific.
    state := make(map[string]interface{})
    for _, event := range events {
        _ = event // TODO: apply event to state
    }
    return state, nil
}
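
Because the store cannot know how an event changes state, the "application logic" could be supplied as a reducer function. This is a sketch under that assumption: `ApplyFunc` and `FoldEvents` are hypothetical names, the `Event` struct is a reduced stand-in, and the bank-balance events exist only for illustration.

```go
package main

import "fmt"

// Event is a simplified stand-in for the type in event.go.
type Event struct {
	Version int64
	Type    string
	Data    map[string]interface{}
}

// ApplyFunc is a hypothetical application-supplied reducer that folds one
// event into the state.
type ApplyFunc func(state map[string]interface{}, e *Event)

// FoldEvents replays events in order on top of base and returns the new
// state plus the version of the last event applied.
func FoldEvents(base map[string]interface{}, baseVersion int64, events []*Event, apply ApplyFunc) (map[string]interface{}, int64) {
	state := make(map[string]interface{}, len(base))
	for k, v := range base {
		state[k] = v // shallow copy so the caller's map is not mutated
	}
	version := baseVersion
	for _, e := range events {
		apply(state, e)
		version = e.Version
	}
	return state, version
}

func main() {
	events := []*Event{
		{Version: 1, Type: "Deposited", Data: map[string]interface{}{"amount": 50}},
		{Version: 2, Type: "Withdrawn", Data: map[string]interface{}{"amount": 20}},
	}
	apply := func(state map[string]interface{}, e *Event) {
		balance, _ := state["balance"].(int)
		amount, _ := e.Data["amount"].(int)
		if e.Type == "Withdrawn" {
			amount = -amount
		}
		state["balance"] = balance + amount
	}
	state, version := FoldEvents(nil, 0, events, apply)
	fmt.Println(state["balance"], version) // prints "30 2"
}
```

Passing a snapshot's state and version as `base` and `baseVersion` would let the same function resume from a snapshot instead of an empty state.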

3. Add Snapshot Helper

Create snapshot utilities:

// CreateSnapshot creates snapshot from state
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
    return &ActorSnapshot{
        ActorID:   actorID,
        Version:   version,
        State:     state,
        Timestamp: time.Now(),
    }
}

4. Modify GetEvents

Update GetEvents in both stores to use snapshots when beneficial.

Snapshot Workflow Example

1. Actor has 1000 events
2. Every 100 events, create snapshot
3. Actor reaches version 1000, snapshot at version 1000
4. Request events from version 900:
   - Get latest snapshot at version 1000? Yes (version >= 900)
   - Use snapshot as base
   - Replay 1000->1000 events (none)
5. Request events from version 50:
   - Get latest snapshot at version 1000? Yes (version > 50)
   - Use snapshot as base
   - Replay 1000->1000 events (none)

Acceptance Criteria

  • CaptureState method added to event store
  • Snapshots created at configured intervals
  • GetEvents uses snapshots to optimize replay
  • Snapshot workflow tested with long-lived actors
  • Configuration for snapshot interval/version
  • Metrics: snapshot count, average replay size
Reference: flowmade-one/aether#148