aether/.product-strategy/ISSUE_SNAPSHOTS.md
Hugo Nijhuis b481dae0b6
feat: implement cross-node event broadcasting with NATSEventBus (#151)
This PR implements cross-node event broadcasting for aether.

Changes:
- UpdateVersionCache method in JetStreamEventStore
- SubscribeToEventStored helper in NATSEventBus
- Integration tests for cross-node scenarios
- Example code demonstrating NATSEventBus + JetStreamEventStore

Tests: All integration tests passing.
Co-authored-by: Claude Code <noreply@anthropic.com>
Co-authored-by: Hugo Nijhuis <hugo.nijhuis@flowmade.one>
Reviewed-on: #151
2026-05-17 15:29:52 +00:00


Issue: Add Snapshot Support to Event Sourcing Workflow

Problem

The SnapshotStore interface is defined, but snapshots are not integrated into the event sourcing workflow. This means:

  • Actors with many events must replay their entire history
  • Long-lived actors get no performance optimization
  • Snapshots exist as an API but are not used by the workflow

Current State

  • EventStoreWithErrors in event.go:235 - no snapshot methods
  • SnapshotStore interface in event.go:245 - defined but not widely used
  • JetStreamEventStore.GetLatestSnapshot and SaveSnapshot implemented but not called automatically
  • InMemoryEventStore has snapshot methods but no lifecycle management

Required Implementation

1. Snapshot Strategy

Define when to create snapshots:

  • Fixed interval (e.g., every 100 events)
  • Version-based (e.g., every 50 versions)
  • Hybrid: version-based with min/max bounds
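A minimal sketch of the hybrid strategy follows. It assumes a hypothetical `SnapshotPolicy` type that the store would consult after each save. `Interval`, `MinGap` and `MaxGap` are illustrative names, not existing aether configuration. The check tests whether a multiple of `Interval` was crossed, rather than testing `version%Interval == 0`, so that a boundary is not missed if versions can skip numbers.

```go
package main

// SnapshotPolicy is a hypothetical configuration for the hybrid strategy:
// version-based snapshots, bounded by a minimum and maximum gap (in versions)
// since the previous snapshot.
type SnapshotPolicy struct {
	Interval int64 // snapshot when the version crosses a multiple of Interval (0 disables)
	MinGap   int64 // never snapshot fewer than MinGap versions after the last one
	MaxGap   int64 // always snapshot once MaxGap versions have passed (0 disables)
}

// ShouldSnapshot reports whether to snapshot now that the actor is at
// currentVersion, given the version of its latest snapshot (0 if none).
func (p SnapshotPolicy) ShouldSnapshot(lastSnapshotVersion, currentVersion int64) bool {
	gap := currentVersion - lastSnapshotVersion
	if gap <= 0 || gap < p.MinGap {
		return false // nothing new, or too soon after the previous snapshot
	}
	if p.MaxGap > 0 && gap >= p.MaxGap {
		return true // upper bound reached regardless of Interval
	}
	// Crossing check: true if some multiple of Interval lies in
	// (lastSnapshotVersion, currentVersion].
	return p.Interval > 0 && currentVersion/p.Interval > lastSnapshotVersion/p.Interval
}
```

Assuming one version per event, the fixed-interval strategy is the special case `Interval: 0, MinGap: N, MaxGap: N`. That setting snapshots exactly every N versions after the previous snapshot.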

2. State Capture

Add a method to capture actor state:

```go
// CaptureState rebuilds actor state by replaying events and returns it
CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
```
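The store alone cannot implement CaptureState, because turning events into state is domain logic. A minimal sketch of the replay as a fold is shown below. It assumes a simplified `Event` stand-in (only `Version` and `Data`; aether's real type has more fields) and a hypothetical `ApplyFunc` reducer supplied by the application. `captureState` and `applyCounter` are illustrative names.

```go
package main

import "errors"

// Event is a simplified stand-in for aether's Event type (illustration only).
type Event struct {
	Version int64
	Data    map[string]interface{}
}

// ApplyFunc is a hypothetical application-supplied reducer that folds one
// event into the state in place.
type ApplyFunc func(state map[string]interface{}, event *Event) error

// captureState folds events (assumed sorted by version) into a copy of base.
// base is a snapshot's state, or nil to start from an empty state.
func captureState(base map[string]interface{}, events []*Event, apply ApplyFunc) (map[string]interface{}, error) {
	// Shallow-copy the base so a cached snapshot's map is never mutated.
	state := make(map[string]interface{}, len(base))
	for k, v := range base {
		state[k] = v
	}
	for _, e := range events {
		if err := apply(state, e); err != nil {
			return nil, err
		}
	}
	return state, nil
}

// applyCounter is an example reducer: it adds Data["delta"] to state["count"].
func applyCounter(state map[string]interface{}, e *Event) error {
	delta, ok := e.Data["delta"].(int)
	if !ok {
		return errors.New("event missing integer delta")
	}
	count, _ := state["count"].(int) // missing count starts at 0
	state["count"] = count + delta
	return nil
}
```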

3. Snapshot Store Extension

Extend EventStoreWithErrors to include snapshots:

```go
type EventStoreWithSnapshots interface {
    EventStoreWithErrors
    GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
    SaveSnapshot(snapshot *ActorSnapshot) error
}
```
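Because `EventStoreWithSnapshots` embeds `EventStoreWithErrors`, callers can detect snapshot support with a type assertion and fall back to plain replay otherwise. Below is a minimal sketch of that opt-in pattern. It assumes simplified stand-ins: a one-method `EventStoreWithErrors` and an `ActorSnapshot` without a timestamp. `plainStore`, `memSnapshotStore` and `snapshotsOf` are hypothetical, and reporting "no snapshot" as `(nil, nil)` is a choice made for this sketch only.

```go
package main

import "sync"

// ActorSnapshot is a simplified stand-in for aether's ActorSnapshot.
type ActorSnapshot struct {
	ActorID string
	Version int64
	State   map[string]interface{}
}

// EventStoreWithErrors is a one-method stand-in; the real interface has more.
type EventStoreWithErrors interface {
	GetLatestVersion(actorID string) (int64, error)
}

// EventStoreWithSnapshots mirrors the proposed interface.
type EventStoreWithSnapshots interface {
	EventStoreWithErrors
	GetLatestSnapshot(actorID string) (*ActorSnapshot, error)
	SaveSnapshot(snapshot *ActorSnapshot) error
}

// plainStore supports events only (hypothetical).
type plainStore struct{}

func (plainStore) GetLatestVersion(string) (int64, error) { return 0, nil }

// memSnapshotStore additionally keeps the newest snapshot per actor (hypothetical).
type memSnapshotStore struct {
	plainStore
	mu        sync.Mutex
	snapshots map[string]*ActorSnapshot
}

func (m *memSnapshotStore) GetLatestSnapshot(actorID string) (*ActorSnapshot, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.snapshots[actorID], nil // (nil, nil) when no snapshot exists
}

func (m *memSnapshotStore) SaveSnapshot(s *ActorSnapshot) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.snapshots == nil {
		m.snapshots = make(map[string]*ActorSnapshot)
	}
	// Keep only the newest snapshot; an older one never replaces it.
	if cur := m.snapshots[s.ActorID]; cur == nil || s.Version > cur.Version {
		m.snapshots[s.ActorID] = s
	}
	return nil
}

// snapshotsOf reports whether store also implements EventStoreWithSnapshots,
// so callers can opt in without breaking stores that lack snapshots.
func snapshotsOf(store EventStoreWithErrors) (EventStoreWithSnapshots, bool) {
	s, ok := store.(EventStoreWithSnapshots)
	return s, ok
}
```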

4. Snapshot Workflow

Modify event retrieval to use snapshots:

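```go
// Sketch: the receiver type and readEvents (the existing, un-snapshotted read) are placeholders
func (store *snapshotStore) GetEvents(actorID string, fromVersion int64) ([]*Event, error) {
    // 1. Try to get the latest snapshot; a lookup error is treated as "no snapshot"
    snapshot, err := store.GetLatestSnapshot(actorID)
    // 2. If the snapshot covers fromVersion (snapshot.Version >= fromVersion),
    //    return only events from snapshot.Version + 1; the caller must use
    //    snapshot.State as its base state
    if err == nil && snapshot != nil && snapshot.Version >= fromVersion {
        return store.readEvents(actorID, snapshot.Version+1)
    }
    // 3. Otherwise read from fromVersion as before (a full replay when fromVersion is 0)
    return store.readEvents(actorID, fromVersion)
}
```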
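Because `GetEvents` returns only events, the snapshot's state has to reach the caller too. Below is a minimal sketch of a snapshot-aware load path for a toy counter actor. It assumes that `fromVersion` is inclusive and uses simplified stand-in types. `eventLog` and `loadState` are hypothetical names. The load starts from the latest snapshot and fetches only events after `snapshot.Version`.

```go
package main

// Event and ActorSnapshot are simplified stand-ins for aether's types: a
// counter actor whose state is a single running total.
type Event struct {
	Version int64
	Delta   int
}

type ActorSnapshot struct {
	Version int64
	Count   int
}

// eventLog is a toy per-actor history. GetEvents returns events with
// Version >= fromVersion (inclusive, as assumed throughout this sketch).
type eventLog []Event

func (l eventLog) GetEvents(fromVersion int64) []Event {
	var out []Event
	for _, e := range l {
		if e.Version >= fromVersion {
			out = append(out, e)
		}
	}
	return out
}

// loadState rebuilds the counter. With a snapshot, the snapshot's state is the
// base and only events after snapshot.Version are fetched; without one, the
// full history is replayed. It returns the state and the replay size.
func loadState(history eventLog, snapshot *ActorSnapshot) (count, replayed int) {
	from := int64(0)
	if snapshot != nil {
		count = snapshot.Count
		from = snapshot.Version + 1
	}
	tail := history.GetEvents(from)
	for _, e := range tail {
		count += e.Delta
	}
	return count, len(tail)
}
```

With 1050 events and a snapshot at version 1000, the snapshot path replays 50 events instead of 1050.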

Suggested Implementation

1. Add CaptureState to EventStore interface

In event.go, extend EventStore or create a StateStore interface:

```go
type StateStore interface {
    EventStore
    CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error)
}
```

2. Implement CaptureState

In store/jetstream.go:

```go
func (jes *JetStreamEventStore) CaptureState(actorID string, fromVersion int64) (map[string]interface{}, error) {
    // Replay events from fromVersion onwards; propagate read errors
    events, err := jes.GetEvents(actorID, fromVersion)
    if err != nil {
        return nil, err
    }
    state := make(map[string]interface{})
    for _, event := range events {
        _ = event // Application logic is needed here to fold each event into state
    }
    return state, nil
}
```

3. Add Snapshot Helper

Create snapshot utilities:

```go
// CreateSnapshot creates snapshot from state
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
    return &ActorSnapshot{
        ActorID:   actorID,
        Version:   version,
        State:     state,
        Timestamp: time.Now(),
    }
}
```
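The helper above still needs a trigger in the write path. Below is a minimal sketch of a post-save hook that ties a fixed interval, state capture and `CreateSnapshot` together. `maybeSnapshot` and its `capture`/`save` callbacks are hypothetical, and `ActorSnapshot` is redeclared here as a stand-in so the block compiles on its own.

```go
package main

import "time"

// ActorSnapshot is a stand-in with the fields CreateSnapshot sets.
type ActorSnapshot struct {
	ActorID   string
	Version   int64
	State     map[string]interface{}
	Timestamp time.Time
}

// CreateSnapshot creates a snapshot from state, as proposed above.
func CreateSnapshot(actorID string, version int64, state map[string]interface{}) *ActorSnapshot {
	return &ActorSnapshot{ActorID: actorID, Version: version, State: state, Timestamp: time.Now()}
}

// maybeSnapshot is a hypothetical hook called after an event is saved. It
// snapshots once at least `every` versions have passed since the last snapshot
// and reports whether a snapshot was taken.
func maybeSnapshot(actorID string, lastSnapshotVersion, currentVersion, every int64,
	capture func() (map[string]interface{}, error),
	save func(*ActorSnapshot) error) (bool, error) {
	if every <= 0 || currentVersion-lastSnapshotVersion < every {
		return false, nil // not due yet (or snapshots disabled)
	}
	state, err := capture()
	if err != nil {
		return false, err
	}
	if err := save(CreateSnapshot(actorID, currentVersion, state)); err != nil {
		return false, err
	}
	return true, nil
}
```

Snapshots are only an optimization, so a caller might log an error from this hook rather than fail the event write that triggered it.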

4. Modify GetEvents

Update GetEvents in both JetStreamEventStore and InMemoryEventStore to use snapshots when a snapshot covers the requested range.

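Snapshot Workflow Example

1. Actor has 1000 events
2. Every 100 events, create snapshot
3. Actor reaches version 1000; the latest snapshot is at version 1000
4. Request events from version 900:
   - Does the latest snapshot (version 1000) cover it? Yes (1000 >= 900)
   - Use snapshot as base; replay events after version 1000 (none) instead of 900->1000 (about 100 events)
5. Request events from version 50:
   - Does the latest snapshot (version 1000) cover it? Yes (1000 >= 50)
   - Use snapshot as base; replay events after version 1000 (none) instead of 50->1000 (about 950 events)
6. Actor reaches version 1050 with no newer snapshot:
   - Use the version 1000 snapshot as base and replay only 1001->1050 (50 events)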

Acceptance Criteria

  • CaptureState method added to event store
  • Snapshots created at configured intervals
  • GetEvents uses snapshots to optimize replay
  • Snapshot workflow tested with long-lived actors
  • Configuration for snapshot interval/version
  • Metrics: snapshot count, average replay size
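Below is a minimal in-process sketch of the two metrics named above. It assumes Go 1.19+ for `atomic.Int64`, and `snapshotMetrics` with its methods is hypothetical. It is not wired to any metrics library; a real integration would export these values through whatever metrics system aether uses.

```go
package main

import "sync/atomic"

// snapshotMetrics is a hypothetical collector for snapshot count and
// average replay size.
type snapshotMetrics struct {
	snapshots     atomic.Int64 // snapshots created
	replays       atomic.Int64 // state rebuilds performed
	replayedTotal atomic.Int64 // events replayed across all rebuilds
}

// RecordSnapshot counts one created snapshot.
func (m *snapshotMetrics) RecordSnapshot() { m.snapshots.Add(1) }

// RecordReplay counts one state rebuild that replayed `events` events.
func (m *snapshotMetrics) RecordReplay(events int) {
	m.replays.Add(1)
	m.replayedTotal.Add(int64(events))
}

// SnapshotCount returns the number of snapshots created so far.
func (m *snapshotMetrics) SnapshotCount() int64 { return m.snapshots.Load() }

// AverageReplaySize returns events replayed per rebuild (0 before any rebuild).
// The two loads are not taken atomically together, so under concurrent
// updates the value is approximate.
func (m *snapshotMetrics) AverageReplaySize() float64 {
	n := m.replays.Load()
	if n == 0 {
		return 0
	}
	return float64(m.replayedTotal.Load()) / float64(n)
}
```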