fix: Address thread safety and resource management issues
Some checks failed
CI / build (pull_request) Successful in 19s
CI / integration (pull_request) Failing after 1m59s

- Fix thread safety issue in SaveEvent: Lock now only protects cache access. NATS I/O operations (GetLatestVersion calls) happen without holding the mutex, preventing lock contention when multiple concurrent SaveEvent calls occur.

- Improve cache handling: Check cache first with minimal lock hold time. For cache misses, unlock before calling GetLatestVersion, then re-lock only to update cache.

- Remove getLatestVersionLocked: No longer needed now that SaveEvent doesn't hold lock during GetLatestVersion calls.

- Fix error handling consistency: GetLatestSnapshot now returns (nil, nil) when no snapshot exists, consistent with GetLatestVersion returning 0 for no events. Both methods now treat empty results as normal cases rather than errors.

- Fix benchmark test: BenchmarkGetLatestVersion_NoCache now creates uncachedStore outside the timing loop. Previously, creating a new store on each iteration was too expensive and didn't properly measure GetLatestVersion performance.

Co-Authored-By: Claude Code <noreply@anthropic.com>
This commit is contained in:
Claude Code
2026-01-13 01:30:39 +01:00
parent 9d4ed1dd08
commit a258ec9754
2 changed files with 54 additions and 48 deletions

View File

@@ -57,7 +57,8 @@ func BenchmarkGetLatestVersion_WithManyEvents(b *testing.B) {
}
// BenchmarkGetLatestVersion_NoCache benchmarks GetLatestVersion without cache
// to show that even uncached lookups are very fast due to DeliverLast optimization
// to show that even uncached lookups are very fast due to DeliverLast optimization.
// A new store instance is created before timing to bypass the version cache.
func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
nc := getTestNATSConnection(&testing.T{})
if nc == nil {
@@ -71,7 +72,6 @@ func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
b.Fatalf("failed to create store: %v", err)
}
// Create a new store instance each iteration to bypass cache
actorID := "actor-bench-nocache"
// Populate with 1000 events
@@ -90,15 +90,16 @@ func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
}
}
// Benchmark GetLatestVersion without using cache (fresh instance)
// Create a new store instance to bypass version cache
uncachedStore, err := NewJetStreamEventStore(nc, store.GetStreamName())
if err != nil {
b.Fatalf("failed to create uncached store: %v", err)
}
// Benchmark GetLatestVersion without using cache
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Create a new store to bypass version cache
newStore, err := NewJetStreamEventStore(nc, store.GetStreamName())
if err != nil {
b.Fatalf("failed to create new store: %v", err)
}
_, err = newStore.GetLatestVersion(actorID)
_, err := uncachedStore.GetLatestVersion(actorID)
if err != nil {
b.Fatalf("GetLatestVersion failed: %v", err)
}