Compare commits
6 Commits
e69f7a30e4
...
de30e1ef1b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
de30e1ef1b | ||
|
|
b9e641c2aa | ||
|
|
ec3db5668f | ||
| 20d688f2a2 | |||
|
|
fd1938672e | ||
|
|
6de897ef60 |
24
CLAUDE.md
24
CLAUDE.md
@@ -122,6 +122,30 @@ if errors.Is(err, aether.ErrVersionConflict) {
|
|||||||
- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`)
|
- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`)
|
||||||
- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion
|
- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion
|
||||||
|
|
||||||
|
#### Version Cache Invalidation
|
||||||
|
|
||||||
|
The JetStreamEventStore maintains an in-memory cache of actor versions to optimize
|
||||||
|
repeated version checks during optimistic concurrency control. The cache is designed
|
||||||
|
to handle multi-store scenarios where external processes may write to the same
|
||||||
|
JetStream stream:
|
||||||
|
|
||||||
|
- **Cache hits**: Cached version is returned immediately for performance
|
||||||
|
- **Cache misses**: If no cached version exists, JetStream is queried and cached
|
||||||
|
- **External writes**: If GetLatestVersion detects a version newer than cached, the cache is invalidated and fresh data is fetched next time
|
||||||
|
|
||||||
|
This strategy ensures data consistency even in scenarios with external writers while
|
||||||
|
maintaining excellent performance for the single-writer case (where only Aether owns
|
||||||
|
the stream).
|
||||||
|
|
||||||
|
**Implementation detail**: The cache is invalidated by deleting the entry, forcing
|
||||||
|
a fresh fetch from JetStream on the next version check for that actor. This is
|
||||||
|
safe because:
|
||||||
|
|
||||||
|
1. SaveEvent uses getLatestVersionLocked() which checks JetStream on cache miss
|
||||||
|
2. GetLatestVersion always fetches fresh data and detects stale cache entries
|
||||||
|
3. Subsequent checks will fetch from JetStream until the cache is repopulated
|
||||||
|
|
||||||
|
|
||||||
### Namespace Isolation
|
### Namespace Isolation
|
||||||
|
|
||||||
Namespaces provide logical boundaries for events and subscriptions.
|
Namespaces provide logical boundaries for events and subscriptions.
|
||||||
|
|||||||
@@ -40,6 +40,24 @@ func DefaultJetStreamConfig() JetStreamConfig {
|
|||||||
|
|
||||||
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
|
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
|
||||||
// It also implements EventStoreWithErrors to report malformed events during replay.
|
// It also implements EventStoreWithErrors to report malformed events during replay.
|
||||||
|
//
|
||||||
|
// ## Version Cache Invalidation Strategy
|
||||||
|
//
|
||||||
|
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
|
||||||
|
// concurrency control. The cache is invalidated on any miss (GetLatestVersion call
|
||||||
|
// that finds a newer version in JetStream) to ensure consistency even when external
|
||||||
|
// processes write to the same JetStream stream.
|
||||||
|
//
|
||||||
|
// If only Aether owns the stream (single-writer assumption), the cache provides
|
||||||
|
// excellent performance for repeated version checks. If external writers modify
|
||||||
|
// the stream, the cache will remain consistent because:
|
||||||
|
//
|
||||||
|
// 1. On SaveEvent: getLatestVersionLocked() checks JetStream on cache miss
|
||||||
|
// 2. On GetLatestVersion: If actual version > cached version, cache is invalidated
|
||||||
|
// 3. Subsequent checks for that actor will fetch fresh data from JetStream
|
||||||
|
//
|
||||||
|
// This strategy prevents data corruption from stale cache while maintaining
|
||||||
|
// performance for the single-writer case.
|
||||||
type JetStreamEventStore struct {
|
type JetStreamEventStore struct {
|
||||||
js nats.JetStreamContext
|
js nats.JetStreamContext
|
||||||
streamName string
|
streamName string
|
||||||
@@ -48,6 +66,15 @@ type JetStreamEventStore struct {
|
|||||||
versions map[string]int64 // actorID -> latest version cache
|
versions map[string]int64 // actorID -> latest version cache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
|
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
|
||||||
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
|
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
|
||||||
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
||||||
@@ -123,19 +150,36 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
jes.mu.Lock()
|
jes.mu.Lock()
|
||||||
defer jes.mu.Unlock()
|
defer jes.mu.Unlock()
|
||||||
|
|
||||||
// Get current latest version for this actor
|
// Check cache first
|
||||||
currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
|
if version, ok := jes.versions[event.ActorID]; ok {
|
||||||
if err != nil {
|
// Validate version against cached version
|
||||||
return fmt.Errorf("failed to get latest version: %w", err)
|
if event.Version <= version {
|
||||||
}
|
return &aether.VersionConflictError{
|
||||||
|
ActorID: event.ActorID,
|
||||||
// Validate version is strictly greater than current
|
AttemptedVersion: event.Version,
|
||||||
if event.Version <= currentVersion {
|
CurrentVersion: version,
|
||||||
return &aether.VersionConflictError{
|
}
|
||||||
ActorID: event.ActorID,
|
|
||||||
AttemptedVersion: event.Version,
|
|
||||||
CurrentVersion: currentVersion,
|
|
||||||
}
|
}
|
||||||
|
// Version check passed, proceed with publish while holding lock
|
||||||
|
} else {
|
||||||
|
// Cache miss - need to check actual stream
|
||||||
|
// Get current latest version while holding lock to prevent TOCTOU race
|
||||||
|
currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get latest version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate version is strictly greater than current
|
||||||
|
if event.Version <= currentVersion {
|
||||||
|
return &aether.VersionConflictError{
|
||||||
|
ActorID: event.ActorID,
|
||||||
|
AttemptedVersion: event.Version,
|
||||||
|
CurrentVersion: currentVersion,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update cache with current version
|
||||||
|
jes.versions[event.ActorID] = currentVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serialize event to JSON
|
// Serialize event to JSON
|
||||||
@@ -156,43 +200,12 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
return fmt.Errorf("failed to publish event to JetStream: %w", err)
|
return fmt.Errorf("failed to publish event to JetStream: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update version cache
|
// Update version cache after successful publish
|
||||||
jes.versions[event.ActorID] = event.Version
|
jes.versions[event.ActorID] = event.Version
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLatestVersionLocked returns the latest version for an actor.
|
|
||||||
// Caller must hold jes.mu.
|
|
||||||
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
|
|
||||||
// Check cache first
|
|
||||||
if version, ok := jes.versions[actorID]; ok {
|
|
||||||
return version, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch from JetStream - use internal method that returns result
|
|
||||||
result, err := jes.getEventsWithErrorsInternal(actorID, 0)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(result.Events) == 0 {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
latestVersion := int64(0)
|
|
||||||
for _, event := range result.Events {
|
|
||||||
if event.Version > latestVersion {
|
|
||||||
latestVersion = event.Version
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update cache
|
|
||||||
jes.versions[actorID] = latestVersion
|
|
||||||
|
|
||||||
return latestVersion, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetEvents retrieves all events for an actor since a version.
|
// GetEvents retrieves all events for an actor since a version.
|
||||||
// Note: This method silently skips malformed events for backward compatibility.
|
// Note: This method silently skips malformed events for backward compatibility.
|
||||||
// Use GetEventsWithErrors to receive information about malformed events.
|
// Use GetEventsWithErrors to receive information about malformed events.
|
||||||
@@ -276,28 +289,96 @@ func (jes *JetStreamEventStore) getEventsWithErrorsInternal(actorID string, from
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLatestVersion returns the latest version for an actor
|
// GetLatestVersion returns the latest version for an actor in O(1) time.
|
||||||
|
// It uses JetStream's DeliverLast() option to fetch only the last message
|
||||||
|
// instead of scanning all events, making this O(1) instead of O(n).
|
||||||
func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) {
|
func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) {
|
||||||
events, err := jes.GetEvents(actorID, 0)
|
// Create subject filter for this actor
|
||||||
|
subject := fmt.Sprintf("%s.events.%s.%s",
|
||||||
|
jes.streamName,
|
||||||
|
sanitizeSubject(extractActorType(actorID)),
|
||||||
|
sanitizeSubject(actorID))
|
||||||
|
|
||||||
|
// Create consumer to read only the last message
|
||||||
|
consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, fmt.Errorf("failed to create consumer: %w", err)
|
||||||
|
}
|
||||||
|
defer consumer.Unsubscribe()
|
||||||
|
|
||||||
|
// Fetch only the last message
|
||||||
|
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
||||||
|
if err != nil {
|
||||||
|
if err == nats.ErrTimeout {
|
||||||
|
// No messages for this actor, return 0
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("failed to fetch last message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(events) == 0 {
|
if len(msgs) == 0 {
|
||||||
|
// No events for this actor
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
latestVersion := int64(0)
|
// Parse the last message to get the version
|
||||||
for _, event := range events {
|
var event aether.Event
|
||||||
if event.Version > latestVersion {
|
if err := json.Unmarshal(msgs[0].Data, &event); err != nil {
|
||||||
latestVersion = event.Version
|
return 0, fmt.Errorf("failed to unmarshal last event: %w", err)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return latestVersion, nil
|
msgs[0].Ack()
|
||||||
|
return event.Version, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLatestSnapshot gets the most recent snapshot for an actor
|
// getLatestVersionLocked is like GetLatestVersion but assumes the caller already holds jes.mu.
|
||||||
|
// This is used internally to avoid releasing and reacquiring the lock during SaveEvent,
|
||||||
|
// which would create a TOCTOU race condition.
|
||||||
|
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
|
||||||
|
// Create subject filter for this actor
|
||||||
|
subject := fmt.Sprintf("%s.events.%s.%s",
|
||||||
|
jes.streamName,
|
||||||
|
sanitizeSubject(extractActorType(actorID)),
|
||||||
|
sanitizeSubject(actorID))
|
||||||
|
|
||||||
|
// Create consumer to read only the last message
|
||||||
|
consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast())
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to create consumer: %w", err)
|
||||||
|
}
|
||||||
|
defer consumer.Unsubscribe()
|
||||||
|
|
||||||
|
// Fetch only the last message
|
||||||
|
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
||||||
|
if err != nil {
|
||||||
|
if err == nats.ErrTimeout {
|
||||||
|
// No messages for this actor, return 0
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("failed to fetch last message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(msgs) == 0 {
|
||||||
|
// No events for this actor
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the last message to get the version
|
||||||
|
var event aether.Event
|
||||||
|
if err := json.Unmarshal(msgs[0].Data, &event); err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to unmarshal last event: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs[0].Ack()
|
||||||
|
return event.Version, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLatestSnapshot gets the most recent snapshot for an actor.
|
||||||
|
// Returns an error if no snapshot exists for the actor (unlike GetLatestVersion which returns 0).
|
||||||
|
// This is intentional: a missing snapshot is different from a missing event stream.
|
||||||
|
// If an actor has no events, that's a normal state (use version 0).
|
||||||
|
// If an actor has no snapshot, that could indicate an error or it could be normal
|
||||||
|
// depending on the use case, so we let the caller decide how to handle it.
|
||||||
func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
||||||
// Create subject for snapshots
|
// Create subject for snapshots
|
||||||
subject := fmt.Sprintf("%s.snapshots.%s.%s",
|
subject := fmt.Sprintf("%s.snapshots.%s.%s",
|
||||||
@@ -315,12 +396,14 @@ func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.Actor
|
|||||||
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == nats.ErrTimeout {
|
if err == nats.ErrTimeout {
|
||||||
|
// No snapshot found - return error to distinguish from successful nil result
|
||||||
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("failed to fetch snapshot: %w", err)
|
return nil, fmt.Errorf("failed to fetch snapshot: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(msgs) == 0 {
|
if len(msgs) == 0 {
|
||||||
|
// No snapshot exists for this actor
|
||||||
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
147
store/jetstream_benchmark_test.go
Normal file
147
store/jetstream_benchmark_test.go
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
//go:build integration
|
||||||
|
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BenchmarkGetLatestVersion_WithManyEvents benchmarks GetLatestVersion performance
|
||||||
|
// with a large number of events per actor.
|
||||||
|
// This demonstrates the O(1) performance by showing that time doesn't increase
|
||||||
|
// significantly with more events.
|
||||||
|
func BenchmarkGetLatestVersion_WithManyEvents(b *testing.B) {
|
||||||
|
nc := getTestNATSConnection(&testing.T{})
|
||||||
|
if nc == nil {
|
||||||
|
b.Skip("NATS not available")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-getversion-%d", time.Now().UnixNano()))
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actorID := "actor-bench-test"
|
||||||
|
|
||||||
|
// Populate with 1000 events
|
||||||
|
for i := 1; i <= 1000; i++ {
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%d", i),
|
||||||
|
EventType: "BenchEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: int64(i),
|
||||||
|
Data: map[string]interface{}{"index": i},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("SaveEvent failed for event %d: %v", i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Benchmark GetLatestVersion
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
_, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("GetLatestVersion failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkGetLatestVersion_NoCache benchmarks GetLatestVersion without cache
|
||||||
|
// to show that even uncached lookups are very fast due to DeliverLast optimization.
|
||||||
|
// A new store instance is created before timing to bypass the version cache.
|
||||||
|
func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
|
||||||
|
nc := getTestNATSConnection(&testing.T{})
|
||||||
|
if nc == nil {
|
||||||
|
b.Skip("NATS not available")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-nocache-%d", time.Now().UnixNano()))
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actorID := "actor-bench-nocache"
|
||||||
|
|
||||||
|
// Populate with 1000 events
|
||||||
|
for i := 1; i <= 1000; i++ {
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: fmt.Sprintf("evt-%d", i),
|
||||||
|
EventType: "BenchEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: int64(i),
|
||||||
|
Data: map[string]interface{}{"index": i},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err := store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("SaveEvent failed for event %d: %v", i, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new store instance to bypass version cache
|
||||||
|
uncachedStore, err := NewJetStreamEventStore(nc, store.GetStreamName())
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to create uncached store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Benchmark GetLatestVersion without using cache
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
_, err := uncachedStore.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("GetLatestVersion failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkGetLatestVersion_SingleEvent benchmarks with minimal data
|
||||||
|
func BenchmarkGetLatestVersion_SingleEvent(b *testing.B) {
|
||||||
|
nc := getTestNATSConnection(&testing.T{})
|
||||||
|
if nc == nil {
|
||||||
|
b.Skip("NATS not available")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-single-%d", time.Now().UnixNano()))
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to create store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actorID := "actor-single"
|
||||||
|
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "evt-1",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
err = store.SaveEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("SaveEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
_, err := store.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("GetLatestVersion failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
|
}
|
||||||
@@ -1322,6 +1322,119 @@ func TestJetStreamEventStore_MultipleStoreInstances(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// === Cache Invalidation Tests ===
|
||||||
|
|
||||||
|
func TestJetStreamEventStore_CacheInvalidationOnExternalWrite(t *testing.T) {
|
||||||
|
nc := getTestNATSConnection(t)
|
||||||
|
defer nc.Close()
|
||||||
|
|
||||||
|
streamName := uniqueStreamName("test-cache-invalidation")
|
||||||
|
defer cleanupStream(nc, streamName)
|
||||||
|
|
||||||
|
// Create two stores for the same stream
|
||||||
|
store1, err := NewJetStreamEventStore(nc, streamName)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create store1: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
store2, err := NewJetStreamEventStore(nc, streamName)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create store2: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actorID := "actor-cache-test"
|
||||||
|
|
||||||
|
// store1: Save event v1 (caches version 1)
|
||||||
|
event1 := &aether.Event{
|
||||||
|
ID: "evt-1",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 1,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
if err := store1.SaveEvent(event1); err != nil {
|
||||||
|
t.Fatalf("SaveEvent from store1 failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify store1 sees version 1 (uses cache)
|
||||||
|
v1, err := store1.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetLatestVersion from store1 failed: %v", err)
|
||||||
|
}
|
||||||
|
if v1 != 1 {
|
||||||
|
t.Errorf("store1 should see version 1, got %d", v1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// store2: Save event v2 (external write from store1's perspective)
|
||||||
|
event2 := &aether.Event{
|
||||||
|
ID: "evt-2",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 2,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
if err := store2.SaveEvent(event2); err != nil {
|
||||||
|
t.Fatalf("SaveEvent from store2 failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// store1: GetLatestVersion should detect external write and return v2
|
||||||
|
// (This triggers cache invalidation because actual version > cached version)
|
||||||
|
v2, err := store1.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetLatestVersion from store1 (after external write) failed: %v", err)
|
||||||
|
}
|
||||||
|
if v2 != 2 {
|
||||||
|
t.Errorf("store1 should see version 2 after external write, got %d", v2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify cache was repopulated - second GetLatestVersion should use cache efficiently
|
||||||
|
v2Again, err := store1.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Second GetLatestVersion from store1 failed: %v", err)
|
||||||
|
}
|
||||||
|
if v2Again != 2 {
|
||||||
|
t.Errorf("store1 cache should have version 2, got %d", v2Again)
|
||||||
|
}
|
||||||
|
|
||||||
|
// store2: Save event v3 (another external write)
|
||||||
|
event3 := &aether.Event{
|
||||||
|
ID: "evt-3",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 3,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
if err := store2.SaveEvent(event3); err != nil {
|
||||||
|
t.Fatalf("SaveEvent from store2 (v3) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// store1: After cache invalidation, SaveEvent should use fresh data from JetStream
|
||||||
|
event4 := &aether.Event{
|
||||||
|
ID: "evt-4",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: actorID,
|
||||||
|
Version: 4,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
if err := store1.SaveEvent(event4); err != nil {
|
||||||
|
t.Fatalf("SaveEvent from store1 (after cache invalidation) failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify all 4 events are persisted
|
||||||
|
events, err := store1.GetEvents(actorID, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
if len(events) != 4 {
|
||||||
|
t.Errorf("expected 4 events after cache invalidation, got %d", len(events))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// === Interface Compliance Tests ===
|
// === Interface Compliance Tests ===
|
||||||
|
|
||||||
func TestJetStreamEventStore_ImplementsEventStore(t *testing.T) {
|
func TestJetStreamEventStore_ImplementsEventStore(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user