diff --git a/store/jetstream.go b/store/jetstream.go index 452e16f..20ea6eb 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -121,21 +121,42 @@ func (jes *JetStreamEventStore) GetStreamName() string { // than the current latest version for the actor. func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { jes.mu.Lock() - defer jes.mu.Unlock() - - // Get current latest version for this actor - currentVersion, err := jes.getLatestVersionLocked(event.ActorID) - if err != nil { - return fmt.Errorf("failed to get latest version: %w", err) - } - - // Validate version is strictly greater than current - if event.Version <= currentVersion { - return &aether.VersionConflictError{ - ActorID: event.ActorID, - AttemptedVersion: event.Version, - CurrentVersion: currentVersion, + // Check cache first + if version, ok := jes.versions[event.ActorID]; ok { + // Validate version against cached version + if event.Version <= version { + jes.mu.Unlock() + return &aether.VersionConflictError{ + ActorID: event.ActorID, + AttemptedVersion: event.Version, + CurrentVersion: version, + } } + // Version check passed, proceed with publish + jes.mu.Unlock() + } else { + // Cache miss - need to check actual stream + jes.mu.Unlock() + + // Get current latest version without holding lock + currentVersion, err := jes.GetLatestVersion(event.ActorID) + if err != nil { + return fmt.Errorf("failed to get latest version: %w", err) + } + + // Validate version is strictly greater than current + if event.Version <= currentVersion { + return &aether.VersionConflictError{ + ActorID: event.ActorID, + AttemptedVersion: event.Version, + CurrentVersion: currentVersion, + } + } + + // Update cache after successful validation + jes.mu.Lock() + jes.versions[event.ActorID] = currentVersion + jes.mu.Unlock() } // Serialize event to JSON @@ -156,33 +177,14 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { return fmt.Errorf("failed to publish event to JetStream: %w", err) } - // Update version cache + // Update version cache after successful publish + jes.mu.Lock() jes.versions[event.ActorID] = event.Version + jes.mu.Unlock() return nil } -// getLatestVersionLocked returns the latest version for an actor. -// Caller must hold jes.mu. -// This method uses the optimized GetLatestVersion which fetches only the last message. -func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) { - // Check cache first - if version, ok := jes.versions[actorID]; ok { - return version, nil - } - - // Use optimized GetLatestVersion to fetch only last event - latestVersion, err := jes.GetLatestVersion(actorID) - if err != nil { - return 0, err - } - - // Update cache - jes.versions[actorID] = latestVersion - - return latestVersion, nil -} - // GetEvents retrieves all events for an actor since a version. // Note: This method silently skips malformed events for backward compatibility. // Use GetEventsWithErrors to receive information about malformed events. @@ -308,7 +310,8 @@ func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) return event.Version, nil } -// GetLatestSnapshot gets the most recent snapshot for an actor +// GetLatestSnapshot gets the most recent snapshot for an actor. +// Returns nil if no snapshot exists for the actor (consistent with GetLatestVersion). func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) { // Create subject for snapshots subject := fmt.Sprintf("%s.snapshots.%s.%s", @@ -326,13 +329,15 @@ func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.Actor msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second)) if err != nil { if err == nats.ErrTimeout { - return nil, fmt.Errorf("no snapshot found for actor %s", actorID) + // No snapshot found - return nil (consistent with GetLatestVersion returning 0) + return nil, nil } return nil, fmt.Errorf("failed to fetch snapshot: %w", err) } if len(msgs) == 0 { - return nil, fmt.Errorf("no snapshot found for actor %s", actorID) + // No snapshot exists for this actor + return nil, nil } var snapshot aether.ActorSnapshot diff --git a/store/jetstream_benchmark_test.go b/store/jetstream_benchmark_test.go index fd9463d..3857076 100644 --- a/store/jetstream_benchmark_test.go +++ b/store/jetstream_benchmark_test.go @@ -57,7 +57,8 @@ func BenchmarkGetLatestVersion_WithManyEvents(b *testing.B) { } // BenchmarkGetLatestVersion_NoCache benchmarks GetLatestVersion without cache -// to show that even uncached lookups are very fast due to DeliverLast optimization +// to show that even uncached lookups are very fast due to DeliverLast optimization. +// A new store instance is created before timing to bypass the version cache. func BenchmarkGetLatestVersion_NoCache(b *testing.B) { nc := getTestNATSConnection(&testing.T{}) if nc == nil { @@ -71,7 +72,6 @@ func BenchmarkGetLatestVersion_NoCache(b *testing.B) { b.Fatalf("failed to create store: %v", err) } - // Create a new store instance each iteration to bypass cache actorID := "actor-bench-nocache" // Populate with 1000 events @@ -90,15 +90,16 @@ func BenchmarkGetLatestVersion_NoCache(b *testing.B) { } } - // Benchmark GetLatestVersion without using cache (fresh instance) + // Create a new store instance to bypass version cache + uncachedStore, err := NewJetStreamEventStore(nc, store.GetStreamName()) + if err != nil { + b.Fatalf("failed to create uncached store: %v", err) + } + + // Benchmark GetLatestVersion without using cache b.ResetTimer() for i := 0; i < b.N; i++ { - // Create a new store to bypass version cache - newStore, err := NewJetStreamEventStore(nc, store.GetStreamName()) - if err != nil { - b.Fatalf("failed to create new store: %v", err) - } - _, err = newStore.GetLatestVersion(actorID) + _, err := uncachedStore.GetLatestVersion(actorID) if err != nil { b.Fatalf("GetLatestVersion failed: %v", err) }