diff --git a/store/jetstream.go b/store/jetstream.go index 654c3fa..cc06ea6 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -150,19 +150,36 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { jes.mu.Lock() defer jes.mu.Unlock() - // Get current latest version for this actor - currentVersion, err := jes.getLatestVersionLocked(event.ActorID) - if err != nil { - return fmt.Errorf("failed to get latest version: %w", err) - } - - // Validate version is strictly greater than current - if event.Version <= currentVersion { - return &aether.VersionConflictError{ - ActorID: event.ActorID, - AttemptedVersion: event.Version, - CurrentVersion: currentVersion, + // Check cache first + if version, ok := jes.versions[event.ActorID]; ok { + // Validate version against cached version + if event.Version <= version { + return &aether.VersionConflictError{ + ActorID: event.ActorID, + AttemptedVersion: event.Version, + CurrentVersion: version, + } } + // Version check passed, proceed with publish while holding lock + } else { + // Cache miss - need to check actual stream + // Get current latest version while holding lock to prevent TOCTOU race + currentVersion, err := jes.getLatestVersionLocked(event.ActorID) + if err != nil { + return fmt.Errorf("failed to get latest version: %w", err) + } + + // Validate version is strictly greater than current + if event.Version <= currentVersion { + return &aether.VersionConflictError{ + ActorID: event.ActorID, + AttemptedVersion: event.Version, + CurrentVersion: currentVersion, + } + } + + // Update cache with current version + jes.versions[event.ActorID] = currentVersion } // Serialize event to JSON @@ -183,43 +200,12 @@ func (jes *JetStreamEventStore) SaveEvent(event *aether.Event) error { return fmt.Errorf("failed to publish event to JetStream: %w", err) } - // Update version cache + // Update version cache after successful publish jes.versions[event.ActorID] = event.Version return nil } -// getLatestVersionLocked returns the latest version for an actor. -// Caller must hold jes.mu. -func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) { - // Check cache first - if version, ok := jes.versions[actorID]; ok { - return version, nil - } - - // Fetch from JetStream - use internal method that returns result - result, err := jes.getEventsWithErrorsInternal(actorID, 0) - if err != nil { - return 0, err - } - - if len(result.Events) == 0 { - return 0, nil - } - - latestVersion := int64(0) - for _, event := range result.Events { - if event.Version > latestVersion { - latestVersion = event.Version - } - } - - // Update cache - jes.versions[actorID] = latestVersion - - return latestVersion, nil -} - // GetEvents retrieves all events for an actor since a version. // Note: This method silently skips malformed events for backward compatibility. // Use GetEventsWithErrors to receive information about malformed events. @@ -303,41 +289,96 @@ func (jes *JetStreamEventStore) getEventsWithErrorsInternal(actorID string, from return result, nil } -// GetLatestVersion returns the latest version for an actor, repopulating cache -// with fresh data to ensure consistency even if external processes write to -// the same JetStream stream. +// GetLatestVersion returns the latest version for an actor in O(1) time. +// It uses JetStream's DeliverLast() option to fetch only the last message +// instead of scanning all events, making this O(1) instead of O(n). func (jes *JetStreamEventStore) GetLatestVersion(actorID string) (int64, error) { - // Hold lock during fetch to prevent race condition with SaveEvent - jes.mu.Lock() - defer jes.mu.Unlock() + // Create subject filter for this actor + subject := fmt.Sprintf("%s.events.%s.%s", + jes.streamName, + sanitizeSubject(extractActorType(actorID)), + sanitizeSubject(actorID)) - events, err := jes.GetEvents(actorID, 0) + // Create consumer to read only the last message + consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast()) if err != nil { - return 0, err + return 0, fmt.Errorf("failed to create consumer: %w", err) + } + defer consumer.Unsubscribe() + + // Fetch only the last message + msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second)) + if err != nil { + if err == nats.ErrTimeout { + // No messages for this actor, return 0 + return 0, nil + } + return 0, fmt.Errorf("failed to fetch last message: %w", err) } - if len(events) == 0 { - // No events for this actor - ensure cache is cleared - delete(jes.versions, actorID) + if len(msgs) == 0 { + // No events for this actor return 0, nil } - latestVersion := int64(0) - for _, event := range events { - if event.Version > latestVersion { - latestVersion = event.Version - } + // Parse the last message to get the version + var event aether.Event + if err := json.Unmarshal(msgs[0].Data, &event); err != nil { + return 0, fmt.Errorf("failed to unmarshal last event: %w", err) } - // Always repopulate cache with the fresh data just fetched - // This ensures cache is in sync with actual state, whether from local writes - // or external writes detected by version comparison - jes.versions[actorID] = latestVersion - - return latestVersion, nil + msgs[0].Ack() + return event.Version, nil } -// GetLatestSnapshot gets the most recent snapshot for an actor +// getLatestVersionLocked is like GetLatestVersion but assumes the caller already holds jes.mu. +// This is used internally to avoid releasing and reacquiring the lock during SaveEvent, +// which would create a TOCTOU race condition. +func (jes *JetStreamEventStore) getLatestVersionLocked(actorID string) (int64, error) { + // Create subject filter for this actor + subject := fmt.Sprintf("%s.events.%s.%s", + jes.streamName, + sanitizeSubject(extractActorType(actorID)), + sanitizeSubject(actorID)) + + // Create consumer to read only the last message + consumer, err := jes.js.PullSubscribe(subject, "", nats.DeliverLast()) + if err != nil { + return 0, fmt.Errorf("failed to create consumer: %w", err) + } + defer consumer.Unsubscribe() + + // Fetch only the last message + msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second)) + if err != nil { + if err == nats.ErrTimeout { + // No messages for this actor, return 0 + return 0, nil + } + return 0, fmt.Errorf("failed to fetch last message: %w", err) + } + + if len(msgs) == 0 { + // No events for this actor + return 0, nil + } + + // Parse the last message to get the version + var event aether.Event + if err := json.Unmarshal(msgs[0].Data, &event); err != nil { + return 0, fmt.Errorf("failed to unmarshal last event: %w", err) + } + + msgs[0].Ack() + return event.Version, nil +} + +// GetLatestSnapshot gets the most recent snapshot for an actor. +// Returns an error if no snapshot exists for the actor (unlike GetLatestVersion which returns 0). +// This is intentional: a missing snapshot is different from a missing event stream. +// If an actor has no events, that's a normal state (use version 0). +// If an actor has no snapshot, that could indicate an error or it could be normal +// depending on the use case, so we let the caller decide how to handle it. func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) { // Create subject for snapshots subject := fmt.Sprintf("%s.snapshots.%s.%s", @@ -355,12 +396,14 @@ func (jes *JetStreamEventStore) GetLatestSnapshot(actorID string) (*aether.Actor msgs, err := consumer.Fetch(1, nats.MaxWait(time.Second)) if err != nil { if err == nats.ErrTimeout { + // No snapshot found - return error to distinguish from successful nil result return nil, fmt.Errorf("no snapshot found for actor %s", actorID) } return nil, fmt.Errorf("failed to fetch snapshot: %w", err) } if len(msgs) == 0 { + // No snapshot exists for this actor return nil, fmt.Errorf("no snapshot found for actor %s", actorID) } diff --git a/store/jetstream_benchmark_test.go b/store/jetstream_benchmark_test.go new file mode 100644 index 0000000..3857076 --- /dev/null +++ b/store/jetstream_benchmark_test.go @@ -0,0 +1,147 @@ +//go:build integration + +package store + +import ( + "fmt" + "testing" + "time" + + "git.flowmade.one/flowmade-one/aether" +) + +// BenchmarkGetLatestVersion_WithManyEvents benchmarks GetLatestVersion performance +// with a large number of events per actor. +// This demonstrates the O(1) performance by showing that time doesn't increase +// significantly with more events. +func BenchmarkGetLatestVersion_WithManyEvents(b *testing.B) { + nc := getTestNATSConnection(&testing.T{}) + if nc == nil { + b.Skip("NATS not available") + return + } + defer nc.Close() + + store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-getversion-%d", time.Now().UnixNano())) + if err != nil { + b.Fatalf("failed to create store: %v", err) + } + + actorID := "actor-bench-test" + + // Populate with 1000 events + for i := 1; i <= 1000; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "BenchEvent", + ActorID: actorID, + Version: int64(i), + Data: map[string]interface{}{"index": i}, + Timestamp: time.Now(), + } + err := store.SaveEvent(event) + if err != nil { + b.Fatalf("SaveEvent failed for event %d: %v", i, err) + } + } + + // Benchmark GetLatestVersion + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := store.GetLatestVersion(actorID) + if err != nil { + b.Fatalf("GetLatestVersion failed: %v", err) + } + } + b.StopTimer() +} + +// BenchmarkGetLatestVersion_NoCache benchmarks GetLatestVersion without cache +// to show that even uncached lookups are very fast due to DeliverLast optimization. +// A new store instance is created before timing to bypass the version cache. +func BenchmarkGetLatestVersion_NoCache(b *testing.B) { + nc := getTestNATSConnection(&testing.T{}) + if nc == nil { + b.Skip("NATS not available") + return + } + defer nc.Close() + + store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-nocache-%d", time.Now().UnixNano())) + if err != nil { + b.Fatalf("failed to create store: %v", err) + } + + actorID := "actor-bench-nocache" + + // Populate with 1000 events + for i := 1; i <= 1000; i++ { + event := &aether.Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "BenchEvent", + ActorID: actorID, + Version: int64(i), + Data: map[string]interface{}{"index": i}, + Timestamp: time.Now(), + } + err := store.SaveEvent(event) + if err != nil { + b.Fatalf("SaveEvent failed for event %d: %v", i, err) + } + } + + // Create a new store instance to bypass version cache + uncachedStore, err := NewJetStreamEventStore(nc, store.GetStreamName()) + if err != nil { + b.Fatalf("failed to create uncached store: %v", err) + } + + // Benchmark GetLatestVersion without using cache + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := uncachedStore.GetLatestVersion(actorID) + if err != nil { + b.Fatalf("GetLatestVersion failed: %v", err) + } + } + b.StopTimer() +} + +// BenchmarkGetLatestVersion_SingleEvent benchmarks with minimal data +func BenchmarkGetLatestVersion_SingleEvent(b *testing.B) { + nc := getTestNATSConnection(&testing.T{}) + if nc == nil { + b.Skip("NATS not available") + return + } + defer nc.Close() + + store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-single-%d", time.Now().UnixNano())) + if err != nil { + b.Fatalf("failed to create store: %v", err) + } + + actorID := "actor-single" + + event := &aether.Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: actorID, + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + err = store.SaveEvent(event) + if err != nil { + b.Fatalf("SaveEvent failed: %v", err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := store.GetLatestVersion(actorID) + if err != nil { + b.Fatalf("GetLatestVersion failed: %v", err) + } + } + b.StopTimer() +}