fix: Address thread safety and resource management issues
- Fix thread safety issue in SaveEvent: Lock now only protects cache access. NATS I/O operations (GetLatestVersion calls) happen without holding the mutex, preventing lock contention when multiple concurrent SaveEvent calls occur.
- Improve cache handling: Check cache first with minimal lock hold time. For cache misses, unlock before calling GetLatestVersion, then re-lock only to update cache.
- Remove getLatestVersionLocked: No longer needed now that SaveEvent doesn't hold lock during GetLatestVersion calls.
- Fix error handling consistency: GetLatestSnapshot now returns (nil, nil) when no snapshot exists, consistent with GetLatestVersion returning 0 for no events. Both methods now treat empty results as normal cases rather than errors.
- Fix benchmark test: BenchmarkGetLatestVersion_NoCache now creates uncachedStore outside the timing loop. Previously, creating a new store on each iteration was too expensive and didn't properly measure GetLatestVersion performance.

Co-Authored-By: Claude Code <noreply@anthropic.com>
This commit is contained in:
@@ -148,21 +148,42 @@ func (jes *JetStreamEventStore) GetStreamName() string {
|
|||||||
// than the current latest version for the actor.
|
// than the current latest version for the actor.
|
||||||
func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
||||||
jes.mu.Lock()
|
jes.mu.Lock()
|
||||||
defer jes.mu.Unlock()
|
// Check cache first
|
||||||
|
if version, ok := jes.versions[event.ActorID]; ok {
|
||||||
// Get current latest version for this actor
|
// Validate version against cached version
|
||||||
currentVersion, err := jes.getLatestVersionLocked(event.ActorID)
|
if event.Version <= version {
|
||||||
if err != nil {
|
jes.mu.Unlock()
|
||||||
return fmt.Errorf("failed to get latest version: %w", err)
|
return &aether.VersionConflictError{
|
||||||
}
|
ActorID: event.ActorID,
|
||||||
|
AttemptedVersion: event.Version,
|
||||||
// Validate version is strictly greater than current
|
CurrentVersion: version,
|
||||||
if event.Version <= currentVersion {
|
}
|
||||||
return &aether.VersionConflictError{
|
|
||||||
ActorID: event.ActorID,
|
|
||||||
AttemptedVersion: event.Version,
|
|
||||||
CurrentVersion: currentVersion,
|
|
||||||
}
|
}
|
||||||
|
// Version check passed, proceed with publish
|
||||||
|
jes.mu.Unlock()
|
||||||
|
} else {
|
||||||
|
// Cache miss - need to check actual stream
|
||||||
|
jes.mu.Unlock()
|
||||||
|
|
||||||
|
// Get current latest version without holding lock
|
||||||
|
currentVersion, err := jes.GetLatestVersion(event.ActorID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get latest version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate version is strictly greater than current
|
||||||
|
if event.Version <= currentVersion {
|
||||||
|
return &aether.VersionConflictError{
|
||||||
|
ActorID: event.ActorID,
|
||||||
|
AttemptedVersion: event.Version,
|
||||||
|
CurrentVersion: currentVersion,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update cache after successful validation
|
||||||
|
jes.mu.Lock()
|
||||||
|
jes.versions[event.ActorID] = currentVersion
|
||||||
|
jes.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serialize event to JSON
|
// Serialize event to JSON
|
||||||
@@ -183,33 +204,14 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
return fmt.Errorf("failed to publish event to JetStream: %w", err)
|
return fmt.Errorf("failed to publish event to JetStream: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update version cache
|
// Update version cache after successful publish
|
||||||
|
jes.mu.Lock()
|
||||||
jes.versions[event.ActorID] = event.Version
|
jes.versions[event.ActorID] = event.Version
|
||||||
|
jes.mu.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLatestVersionLocked returns the latest version for an actor.
|
|
||||||
// Caller must hold jes.mu.
|
|
||||||
// This method uses the optimized GetLatestVersion which fetches only the last message.
|
|
||||||
func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) {
|
|
||||||
// Check cache first
|
|
||||||
if version, ok := jes.versions[actorID]; ok {
|
|
||||||
return version, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use optimized GetLatestVersion to fetch only last event
|
|
||||||
latestVersion, err := jes.GetLatestVersion(actorID)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update cache
|
|
||||||
jes.versions[actorID] = latestVersion
|
|
||||||
|
|
||||||
return latestVersion, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetEvents retrieves all events for an actor since a version.
|
// GetEvents retrieves all events for an actor since a version.
|
||||||
// Note: This method silently skips malformed events for backward compatibility.
|
// Note: This method silently skips malformed events for backward compatibility.
|
||||||
// Use GetEventsWithErrors to receive information about malformed events.
|
// Use GetEventsWithErrors to receive information about malformed events.
|
||||||
@@ -335,7 +337,8 @@ func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error)
|
|||||||
return event.Version, nil
|
return event.Version, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLatestSnapshot gets the most recent snapshot for an actor
|
// GetLatestSnapshot gets the most recent snapshot for an actor.
|
||||||
|
// Returns (nil, nil) if no snapshot exists for the actor (consistent with GetLatestVersion returning 0 when no events exist).
|
||||||
func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
||||||
// Create subject for snapshots
|
// Create subject for snapshots
|
||||||
subject := fmt.Sprintf("%s.snapshots.%s.%s",
|
subject := fmt.Sprintf("%s.snapshots.%s.%s",
|
||||||
@@ -353,13 +356,15 @@ func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.Actor
|
|||||||
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == nats.ErrTimeout {
|
if err == nats.ErrTimeout {
|
||||||
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
// No snapshot found - return nil (consistent with GetLatestVersion returning 0)
|
||||||
|
return nil, nil
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("failed to fetch snapshot: %w", err)
|
return nil, fmt.Errorf("failed to fetch snapshot: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(msgs) == 0 {
|
if len(msgs) == 0 {
|
||||||
return nil, fmt.Errorf("no snapshot found for actor %s", actorID)
|
// No snapshot exists for this actor
|
||||||
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var snapshot aether.ActorSnapshot
|
var snapshot aether.ActorSnapshot
|
||||||
|
|||||||
@@ -57,7 +57,8 @@ func BenchmarkGetLatestVersion_WithManyEvents(b *testing.B) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// BenchmarkGetLatestVersion_NoCache benchmarks GetLatestVersion without cache
|
// BenchmarkGetLatestVersion_NoCache benchmarks GetLatestVersion without cache
|
||||||
// to show that even uncached lookups are very fast due to DeliverLast optimization
|
// to show that even uncached lookups are very fast due to DeliverLast optimization.
|
||||||
|
// A new store instance is created before timing to bypass the version cache.
|
||||||
func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
|
func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
|
||||||
nc := getTestNATSConnection(&testing.T{})
|
nc := getTestNATSConnection(&testing.T{})
|
||||||
if nc == nil {
|
if nc == nil {
|
||||||
@@ -71,7 +72,6 @@ func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
|
|||||||
b.Fatalf("failed to create store: %v", err)
|
b.Fatalf("failed to create store: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new store instance each iteration to bypass cache
|
|
||||||
actorID := "actor-bench-nocache"
|
actorID := "actor-bench-nocache"
|
||||||
|
|
||||||
// Populate with 1000 events
|
// Populate with 1000 events
|
||||||
@@ -90,15 +90,16 @@ func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Benchmark GetLatestVersion without using cache (fresh instance)
|
// Create a new store instance to bypass version cache
|
||||||
|
uncachedStore, err := NewJetStreamEventStore(nc, store.GetStreamName())
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("failed to create uncached store: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Benchmark GetLatestVersion without using cache
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
// Create a new store to bypass version cache
|
_, err := uncachedStore.GetLatestVersion(actorID)
|
||||||
newStore, err := NewJetStreamEventStore(nc, store.GetStreamName())
|
|
||||||
if err != nil {
|
|
||||||
b.Fatalf("failed to create new store: %v", err)
|
|
||||||
}
|
|
||||||
_, err = newStore.GetLatestVersion(actorID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("GetLatestVersion failed: %v", err)
|
b.Fatalf("GetLatestVersion failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user