From b9e641c2aade56aa0c0c9558b6a23787a504d08c Mon Sep 17 00:00:00 2001 From: Claude Code Date: Tue, 13 Jan 2026 01:30:39 +0100 Subject: [PATCH] fix: Address thread safety and resource management issues - Fix thread safety issue in SaveEvent: Lock now only protects cache access. NATS I/O operations (GetLatestVersion calls) happen without holding the mutex, preventing lock contention when multiple concurrent SaveEvent calls occur. - Improve cache handling: Check cache first with minimal lock hold time. For cache misses, unlock before calling GetLatestVersion, then re-lock only to update cache. - Remove getLatestVersionLocked: No longer needed now that SaveEvent doesn't hold lock during GetLatestVersion calls. - Fix error handling consistency: GetLatestSnapshot now returns (nil, nil) when no snapshot exists, consistent with GetLatestVersion returning 0 for no events. Both methods now treat empty results as normal cases rather than errors. - Fix benchmark test: BenchmarkGetLatestVersion_NoCache now creates uncachedStore outside the timing loop. Previously, creating a new store on each iteration was too expensive and didn't properly measure GetLatestVersion performance. Co-Authored-By: Claude Code --- store/jetstream.go | 83 ++++++++++++++++--------------- store/jetstream_benchmark_test.go | 19 +++---- 2 files changed, 54 insertions(+), 48 deletions(-) diff --git a/store/jetstream.go b/store/jetstream.go index 81d2418..3dcbd80 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -148,21 +148,42 @@ func (jes *JetStreamEventStore) GetStreamName() string { // than the current latest version for the actor. func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { jes.mu.Lock() - defer jes.mu.Unlock() - - // Get current latest version for this actor - currentVersion, err := jes.getLatestVersionLocked(event.ActorID) - if err != nil { - return fmt.Errorf("failed to get latest version: %w", err) - } - - // Validate version is strictly greater than current - if event.Version <= currentVersion { - return &aether.VersionConflictError{ - ActorID: event.ActorID, - AttemptedVersion: event.Version, - CurrentVersion: currentVersion, + // Check cache first + if version, ok := jes.versions[event.ActorID]; ok { + // Validate version against cached version + if event.Version <= version { + jes.mu.Unlock() + return &aether.VersionConflictError{ + ActorID: event.ActorID, + AttemptedVersion: event.Version, + CurrentVersion: version, + } } + // Version check passed, proceed with publish + jes.mu.Unlock() + } else { + // Cache miss - need to check actual stream + jes.mu.Unlock() + + // Get current latest version without holding lock + currentVersion, err := jes.GetLatestVersion(event.ActorID) + if err != nil { + return fmt.Errorf("failed to get latest version: %w", err) + } + + // Validate version is strictly greater than current + if event.Version <= currentVersion { + return &aether.VersionConflictError{ + ActorID: event.ActorID, + AttemptedVersion: event.Version, + CurrentVersion: currentVersion, + } + } + + // Update cache after successful validation + jes.mu.Lock() + jes.versions[event.ActorID] = currentVersion + jes.mu.Unlock() } // Serialize event to JSON @@ -183,33 +204,14 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { return fmt.Errorf("failed to publish event to JetStream: %w", err) } - // Update version cache + // Update version cache after successful publish + jes.mu.Lock() jes.versions[event.ActorID] = event.Version + jes.mu.Unlock() return nil } -// getLatestVersionLocked returns the latest version for an actor. -// Caller must hold jes.mu. -// This method uses the optimized GetLatestVersion which fetches only the last message. -func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) { - // Check cache first - if version, ok := jes.versions[actorID]; ok { - return version, nil - } - - // Use optimized GetLatestVersion to fetch only last event - latestVersion, err := jes.GetLatestVersion(actorID) - if err != nil { - return 0, err - } - - // Update cache - jes.versions[actorID] = latestVersion - - return latestVersion, nil -} - // GetEvents retrieves all events for an actor since a version. // Note: This method silently skips malformed events for backward compatibility. // Use GetEventsWithErrors to receive information about malformed events. @@ -335,7 +337,8 @@ func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) return event.Version, nil } -// GetLatestSnapshot gets the most recent snapshot for an actor +// GetLatestSnapshot gets the most recent snapshot for an actor. +// Returns nil if no snapshot exists for the actor (consistent with GetLatestVersion). func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) { // Create subject for snapshots subject := fmt.Sprintf("%s.snapshots.%s.%s", @@ -353,13 +356,15 @@ func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.Actor msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second)) if err != nil { if err == nats.ErrTimeout { - return nil, fmt.Errorf("no snapshot found for actor %s", actorID) + // No snapshot found - return nil (consistent with GetLatestVersion returning 0) + return nil, nil } return nil, fmt.Errorf("failed to fetch snapshot: %w", err) } if len(msgs) == 0 { - return nil, fmt.Errorf("no snapshot found for actor %s", actorID) + // No snapshot exists for this actor + return nil, nil } var snapshot aether.ActorSnapshot diff --git a/store/jetstream_benchmark_test.go b/store/jetstream_benchmark_test.go index fd9463d..3857076 100644 --- a/store/jetstream_benchmark_test.go +++ b/store/jetstream_benchmark_test.go @@ -57,7 +57,8 @@ func BenchmarkGetLatestVersion_WithManyEvents(b *testing.B) { } // BenchmarkGetLatestVersion_NoCache benchmarks GetLatestVersion without cache -// to show that even uncached lookups are very fast due to DeliverLast optimization +// to show that even uncached lookups are very fast due to DeliverLast optimization. +// A new store instance is created before timing to bypass the version cache. func BenchmarkGetLatestVersion_NoCache(b *testing.B) { nc := getTestNATSConnection(&testing.T{}) if nc == nil { @@ -71,7 +72,6 @@ func BenchmarkGetLatestVersion_NoCache(b *testing.B) { b.Fatalf("failed to create store: %v", err) } - // Create a new store instance each iteration to bypass cache actorID := "actor-bench-nocache" // Populate with 1000 events @@ -90,15 +90,16 @@ func BenchmarkGetLatestVersion_NoCache(b *testing.B) { } } - // Benchmark GetLatestVersion without using cache (fresh instance) + // Create a new store instance to bypass version cache + uncachedStore, err := NewJetStreamEventStore(nc, store.GetStreamName()) + if err != nil { + b.Fatalf("failed to create uncached store: %v", err) + } + + // Benchmark GetLatestVersion without using cache b.ResetTimer() for i := 0; i < b.N; i++ { - // Create a new store to bypass version cache - newStore, err := NewJetStreamEventStore(nc, store.GetStreamName()) - if err != nil { - b.Fatalf("failed to create new store: %v", err) - } - _, err = newStore.GetLatestVersion(actorID) + _, err := uncachedStore.GetLatestVersion(actorID) if err != nil { b.Fatalf("GetLatestVersion failed: %v", err) }