fix(store): Implement version cache invalidation strategy for JetStreamEventStore #130
@@ -303,17 +303,22 @@ func (jes *JetStreamEventStore) getEventsWithErrorsInternal(actorID string, from
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLatestVersion returns the latest version for an actor, invalidating cache
|
// GetLatestVersion returns the latest version for an actor, repopulating cache
|
||||||
// if the actual version in JetStream is newer than cached version.
|
// with fresh data to ensure consistency even if external processes write to
|
||||||
// This strategy ensures cache consistency even if external processes write to
|
|
||||||
// the same JetStream stream.
|
// the same JetStream stream.
|
||||||
func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) {
|
func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) {
|
||||||
|
// Hold lock during fetch to prevent race condition with SaveEvent
|
||||||
|
jes.mu.Lock()
|
||||||
|
defer jes.mu.Unlock()
|
||||||
|
|
||||||
events, err := jes.GetEvents(actorID, 0)
|
events, err := jes.GetEvents(actorID, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(events) == 0 {
|
if len(events) == 0 {
|
||||||
|
// No events for this actor - ensure cache is cleared
|
||||||
|
delete(jes.versions, actorID)
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -324,17 +329,10 @@ func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Invalidate cache if actual version differs from cached version
|
// Always repopulate cache with the fresh data just fetched
|
||||||
// This handles the case where external writers modify the stream
|
// This ensures cache is in sync with actual state, whether from local writes
|
||||||
jes.mu.Lock()
|
// or external writes detected by version comparison
|
||||||
if cachedVersion, ok := jes.versions[actorID]; ok && latestVersion > cachedVersion {
|
jes.versions[actorID] = latestVersion
|
||||||
// Cache was stale - invalidate it by deleting
|
|
||||||
delete(jes.versions, actorID)
|
|
||||||
} else if !ok {
|
|
||||||
// Update cache for future calls
|
|
||||||
jes.versions[actorID] = latestVersion
|
|
||||||
}
|
|
||||||
jes.mu.Unlock()
|
|
||||||
|
|
||||||
return latestVersion, nil
|
return latestVersion, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1390,6 +1390,15 @@ func TestJetStreamEventStore_CacheInvalidationOnExternalWrite(t *testing.T) {
|
|||||||
t.Errorf("store1 should see version 2 after external write, got %d", v2)
|
t.Errorf("store1 should see version 2 after external write, got %d", v2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify cache was repopulated - second GetLatestVersion should use cache efficiently
|
||||||
|
v2Again, err := store1.GetLatestVersion(actorID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Second GetLatestVersion from store1 failed: %v", err)
|
||||||
|
}
|
||||||
|
if v2Again != 2 {
|
||||||
|
t.Errorf("store1 cache should have version 2, got %d", v2Again)
|
||||||
|
}
|
||||||
|
|
||||||
// store2: Save event v3 (another external write)
|
// store2: Save event v3 (another external write)
|
||||||
event3 := &aether.Event{
|
event3 := &aether.Event{
|
||||||
ID: "evt-3",
|
ID: "evt-3",
|
||||||
|
|||||||
Reference in New Issue
Block a user