- Fix thread safety issue in SaveEvent: Lock now only protects cache access. NATS I/O operations (GetLatestVersion calls) happen without holding the mutex, preventing lock contention when multiple concurrent SaveEvent calls occur.
- Improve cache handling: Check cache first with minimal lock hold time. For cache misses, unlock before calling GetLatestVersion, then re-lock only to update cache.
- Remove getLatestVersionLocked: No longer needed now that SaveEvent doesn't hold lock during GetLatestVersion calls.
- Fix error handling consistency: GetLatestSnapshot now returns (nil, nil) when no snapshot exists, consistent with GetLatestVersion returning 0 for no events. Both methods now treat empty results as normal cases rather than errors.
- Fix benchmark test: BenchmarkGetLatestVersion_NoCache now creates uncachedStore outside the timing loop. Previously, creating a new store on each iteration was too expensive and didn't properly measure GetLatestVersion performance.

Co-Authored-By: Claude Code <noreply@anthropic.com>
148 lines
3.6 KiB
Go
148 lines
3.6 KiB
Go
//go:build integration
|
|
|
|
package store
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.flowmade.one/flowmade-one/aether"
|
|
)
|
|
|
|
// BenchmarkGetLatestVersion_WithManyEvents benchmarks GetLatestVersion performance
|
|
// with a large number of events per actor.
|
|
// This demonstrates the O(1) performance by showing that time doesn't increase
|
|
// significantly with more events.
|
|
func BenchmarkGetLatestVersion_WithManyEvents(b *testing.B) {
|
|
nc := getTestNATSConnection(&testing.T{})
|
|
if nc == nil {
|
|
b.Skip("NATS not available")
|
|
return
|
|
}
|
|
defer nc.Close()
|
|
|
|
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-getversion-%d", time.Now().UnixNano()))
|
|
if err != nil {
|
|
b.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
actorID := "actor-bench-test"
|
|
|
|
// Populate with 1000 events
|
|
for i := 1; i <= 1000; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "BenchEvent",
|
|
ActorID: actorID,
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{"index": i},
|
|
Timestamp: time.Now(),
|
|
}
|
|
err := store.SaveEvent(event)
|
|
if err != nil {
|
|
b.Fatalf("SaveEvent failed for event %d: %v", i, err)
|
|
}
|
|
}
|
|
|
|
// Benchmark GetLatestVersion
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := store.GetLatestVersion(actorID)
|
|
if err != nil {
|
|
b.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
}
|
|
b.StopTimer()
|
|
}
|
|
|
|
// BenchmarkGetLatestVersion_NoCache benchmarks GetLatestVersion without cache
|
|
// to show that even uncached lookups are very fast due to DeliverLast optimization.
|
|
// A new store instance is created before timing to bypass the version cache.
|
|
func BenchmarkGetLatestVersion_NoCache(b *testing.B) {
|
|
nc := getTestNATSConnection(&testing.T{})
|
|
if nc == nil {
|
|
b.Skip("NATS not available")
|
|
return
|
|
}
|
|
defer nc.Close()
|
|
|
|
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-nocache-%d", time.Now().UnixNano()))
|
|
if err != nil {
|
|
b.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
actorID := "actor-bench-nocache"
|
|
|
|
// Populate with 1000 events
|
|
for i := 1; i <= 1000; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "BenchEvent",
|
|
ActorID: actorID,
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{"index": i},
|
|
Timestamp: time.Now(),
|
|
}
|
|
err := store.SaveEvent(event)
|
|
if err != nil {
|
|
b.Fatalf("SaveEvent failed for event %d: %v", i, err)
|
|
}
|
|
}
|
|
|
|
// Create a new store instance to bypass version cache
|
|
uncachedStore, err := NewJetStreamEventStore(nc, store.GetStreamName())
|
|
if err != nil {
|
|
b.Fatalf("failed to create uncached store: %v", err)
|
|
}
|
|
|
|
// Benchmark GetLatestVersion without using cache
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := uncachedStore.GetLatestVersion(actorID)
|
|
if err != nil {
|
|
b.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
}
|
|
b.StopTimer()
|
|
}
|
|
|
|
// BenchmarkGetLatestVersion_SingleEvent benchmarks with minimal data
|
|
func BenchmarkGetLatestVersion_SingleEvent(b *testing.B) {
|
|
nc := getTestNATSConnection(&testing.T{})
|
|
if nc == nil {
|
|
b.Skip("NATS not available")
|
|
return
|
|
}
|
|
defer nc.Close()
|
|
|
|
store, err := NewJetStreamEventStore(nc, fmt.Sprintf("bench-single-%d", time.Now().UnixNano()))
|
|
if err != nil {
|
|
b.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
actorID := "actor-single"
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: actorID,
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
err = store.SaveEvent(event)
|
|
if err != nil {
|
|
b.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := store.GetLatestVersion(actorID)
|
|
if err != nil {
|
|
b.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
}
|
|
b.StopTimer()
|
|
}
|