Implements cache invalidation on GetLatestVersion when external writers modify the JetStream stream. The strategy ensures consistency in multi-store scenarios while maintaining performance for the single-writer case. Changes: - Add cache invalidation logic to GetLatestVersion() that detects stale cache - Document version cache behavior in JetStreamEventStore struct comment - Add detailed documentation in CLAUDE.md about cache invalidation strategy - Add TestJetStreamEventStore_CacheInvalidationOnExternalWrite integration test - Cache is invalidated by deleting entry, forcing fresh fetch on next check The implementation follows the acceptance criteria by: 1. Documenting the single-writer assumption in code comments 2. Implementing cache invalidation on GetLatestVersion miss 3. Adding comprehensive test for external write scenarios Closes #126 Co-Authored-By: Claude Code <noreply@anthropic.com>
1530 lines
39 KiB
Go
1530 lines
39 KiB
Go
//go:build integration
|
|
|
|
package store
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.flowmade.one/flowmade-one/aether"
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
// These integration tests require a running NATS server with JetStream enabled.
|
|
// Run with: go test -tags=integration -v ./store/...
|
|
//
|
|
// To start NATS with JetStream: nats-server -js
|
|
|
|
// getTestNATSConnection creates a new NATS connection for testing.
|
|
// Returns nil if NATS is not available, allowing tests to skip gracefully.
|
|
func getTestNATSConnection(t *testing.T) *nats.Conn {
|
|
nc, err := nats.Connect(nats.DefaultURL)
|
|
if err != nil {
|
|
t.Skipf("NATS not available: %v (run 'nats-server -js' to enable integration tests)", err)
|
|
return nil
|
|
}
|
|
|
|
// Verify JetStream is available
|
|
js, err := nc.JetStream()
|
|
if err != nil {
|
|
nc.Close()
|
|
t.Skipf("JetStream not available: %v (run 'nats-server -js' to enable integration tests)", err)
|
|
return nil
|
|
}
|
|
|
|
// Test JetStream connectivity
|
|
_, err = js.AccountInfo()
|
|
if err != nil {
|
|
nc.Close()
|
|
t.Skipf("JetStream not enabled: %v (run 'nats-server -js' to enable integration tests)", err)
|
|
return nil
|
|
}
|
|
|
|
return nc
|
|
}
|
|
|
|
// uniqueStreamName generates a unique stream name for test isolation
|
|
func uniqueStreamName(prefix string) string {
|
|
return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
|
|
}
|
|
|
|
// cleanupStream deletes a stream to clean up after tests
|
|
func cleanupStream(nc *nats.Conn, streamName string) {
|
|
js, err := nc.JetStream()
|
|
if err != nil {
|
|
return
|
|
}
|
|
_ = js.DeleteStream(streamName)
|
|
}
|
|
|
|
// === Stream Creation and Configuration Tests ===
|
|
|
|
func TestJetStreamEventStore_StreamCreation(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-stream-creation")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create JetStreamEventStore: %v", err)
|
|
}
|
|
|
|
if store == nil {
|
|
t.Fatal("expected non-nil store")
|
|
}
|
|
|
|
// Verify stream was created
|
|
js, _ := nc.JetStream()
|
|
info, err := js.StreamInfo(streamName)
|
|
if err != nil {
|
|
t.Fatalf("stream was not created: %v", err)
|
|
}
|
|
|
|
if info.Config.Name != streamName {
|
|
t.Errorf("stream name mismatch: got %q, want %q", info.Config.Name, streamName)
|
|
}
|
|
|
|
// Verify stream has correct subjects
|
|
expectedSubjects := []string{
|
|
fmt.Sprintf("%s.events.>", streamName),
|
|
fmt.Sprintf("%s.snapshots.>", streamName),
|
|
}
|
|
for _, expected := range expectedSubjects {
|
|
found := false
|
|
for _, subject := range info.Config.Subjects {
|
|
if subject == expected {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
t.Errorf("expected subject %q not found in stream config", expected)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_StreamCreationWithConfig(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-stream-config")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
config := JetStreamConfig{
|
|
StreamRetention: 7 * 24 * time.Hour, // 7 days
|
|
ReplicaCount: 1,
|
|
}
|
|
|
|
store, err := NewJetStreamEventStoreWithConfig(nc, streamName, config)
|
|
if err != nil {
|
|
t.Fatalf("failed to create JetStreamEventStore with config: %v", err)
|
|
}
|
|
|
|
if store == nil {
|
|
t.Fatal("expected non-nil store")
|
|
}
|
|
|
|
// Verify stream configuration
|
|
js, _ := nc.JetStream()
|
|
info, err := js.StreamInfo(streamName)
|
|
if err != nil {
|
|
t.Fatalf("stream was not created: %v", err)
|
|
}
|
|
|
|
if info.Config.MaxAge != 7*24*time.Hour {
|
|
t.Errorf("MaxAge mismatch: got %v, want %v", info.Config.MaxAge, 7*24*time.Hour)
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_StreamCreationWithNamespace(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
baseName := uniqueStreamName("test-ns")
|
|
namespace := "tenant-abc"
|
|
expectedStreamName := fmt.Sprintf("%s_%s", namespace, baseName)
|
|
defer cleanupStream(nc, expectedStreamName)
|
|
|
|
store, err := NewJetStreamEventStoreWithNamespace(nc, baseName, namespace)
|
|
if err != nil {
|
|
t.Fatalf("failed to create JetStreamEventStore with namespace: %v", err)
|
|
}
|
|
|
|
if store.GetNamespace() != namespace {
|
|
t.Errorf("namespace mismatch: got %q, want %q", store.GetNamespace(), namespace)
|
|
}
|
|
|
|
if store.GetStreamName() != expectedStreamName {
|
|
t.Errorf("stream name mismatch: got %q, want %q", store.GetStreamName(), expectedStreamName)
|
|
}
|
|
|
|
// Verify namespaced stream was created
|
|
js, _ := nc.JetStream()
|
|
info, err := js.StreamInfo(expectedStreamName)
|
|
if err != nil {
|
|
t.Fatalf("namespaced stream was not created: %v", err)
|
|
}
|
|
|
|
if info.Config.Name != expectedStreamName {
|
|
t.Errorf("stream name mismatch: got %q, want %q", info.Config.Name, expectedStreamName)
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_StreamAlreadyExists(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-stream-exists")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
// Create first store (creates stream)
|
|
store1, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create first store: %v", err)
|
|
}
|
|
if store1 == nil {
|
|
t.Fatal("expected non-nil store")
|
|
}
|
|
|
|
// Create second store with same stream name (should reuse existing stream)
|
|
store2, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create second store with existing stream: %v", err)
|
|
}
|
|
if store2 == nil {
|
|
t.Fatal("expected non-nil store")
|
|
}
|
|
}
|
|
|
|
// === SaveEvent Tests ===
|
|
|
|
func TestJetStreamEventStore_SaveEvent_PersistsToJetStream(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-save-event")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-123",
|
|
EventType: "OrderPlaced",
|
|
ActorID: "order-456",
|
|
Version: 1,
|
|
Data: map[string]interface{}{
|
|
"total": 100.50,
|
|
"currency": "USD",
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
err = store.SaveEvent(event)
|
|
if err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
// Verify event was persisted by retrieving it
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
if len(events) != 1 {
|
|
t.Fatalf("expected 1 event, got %d", len(events))
|
|
}
|
|
|
|
retrieved := events[0]
|
|
if retrieved.ID != event.ID {
|
|
t.Errorf("ID mismatch: got %q, want %q", retrieved.ID, event.ID)
|
|
}
|
|
if retrieved.EventType != event.EventType {
|
|
t.Errorf("EventType mismatch: got %q, want %q", retrieved.EventType, event.EventType)
|
|
}
|
|
if retrieved.ActorID != event.ActorID {
|
|
t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, event.ActorID)
|
|
}
|
|
if retrieved.Version != event.Version {
|
|
t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, event.Version)
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_SaveEvent_MultipleEvents(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-multi-events")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Save multiple events
|
|
for i := 1; i <= 10; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "OrderUpdated",
|
|
ActorID: "order-456",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{"update": i},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed for event %d: %v", i, err)
|
|
}
|
|
}
|
|
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
if len(events) != 10 {
|
|
t.Errorf("expected 10 events, got %d", len(events))
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_SaveEvent_WithMetadata(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-event-metadata")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-meta",
|
|
EventType: "OrderPlaced",
|
|
ActorID: "order-456",
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
event.SetCorrelationID("corr-123")
|
|
event.SetCausationID("cause-456")
|
|
event.SetUserID("user-789")
|
|
event.SetTraceID("trace-abc")
|
|
event.SetSpanID("span-def")
|
|
|
|
err = store.SaveEvent(event)
|
|
if err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
events, err := store.GetEvents("order-456", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
if len(events) != 1 {
|
|
t.Fatalf("expected 1 event, got %d", len(events))
|
|
}
|
|
|
|
retrieved := events[0]
|
|
if retrieved.GetCorrelationID() != "corr-123" {
|
|
t.Errorf("correlationId mismatch: got %q", retrieved.GetCorrelationID())
|
|
}
|
|
if retrieved.GetCausationID() != "cause-456" {
|
|
t.Errorf("causationId mismatch: got %q", retrieved.GetCausationID())
|
|
}
|
|
if retrieved.GetUserID() != "user-789" {
|
|
t.Errorf("userId mismatch: got %q", retrieved.GetUserID())
|
|
}
|
|
if retrieved.GetTraceID() != "trace-abc" {
|
|
t.Errorf("traceId mismatch: got %q", retrieved.GetTraceID())
|
|
}
|
|
if retrieved.GetSpanID() != "span-def" {
|
|
t.Errorf("spanId mismatch: got %q", retrieved.GetSpanID())
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_SaveEvent_Deduplication(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-dedup")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
event := &aether.Event{
|
|
ID: "evt-dedup-test",
|
|
EventType: "OrderPlaced",
|
|
ActorID: "order-456",
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
// Save the same event twice (same event ID)
|
|
err = store.SaveEvent(event)
|
|
if err != nil {
|
|
t.Fatalf("first SaveEvent failed: %v", err)
|
|
}
|
|
|
|
// Second save with same event ID and same version should fail with version conflict
|
|
// because version 1 already exists
|
|
err = store.SaveEvent(event)
|
|
if err == nil {
|
|
// If no error, that's a problem - we expected version conflict
|
|
t.Error("expected error when saving duplicate event, got nil")
|
|
} else if !errors.Is(err, aether.ErrVersionConflict) {
|
|
t.Errorf("expected version conflict error, got: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_SaveEvent_VersionConflict(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-version-conflict")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Save first event with version 5
|
|
event1 := &aether.Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 5,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event1); err != nil {
|
|
t.Fatalf("SaveEvent failed for first event: %v", err)
|
|
}
|
|
|
|
// Attempt to save event with lower version (should fail)
|
|
event2 := &aether.Event{
|
|
ID: "evt-2",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 3,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
err = store.SaveEvent(event2)
|
|
if err == nil {
|
|
t.Fatal("expected error when saving event with lower version, got nil")
|
|
}
|
|
|
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
t.Errorf("expected ErrVersionConflict, got %v", err)
|
|
}
|
|
|
|
var versionErr *aether.VersionConflictError
|
|
if !errors.As(err, &versionErr) {
|
|
t.Fatalf("expected VersionConflictError, got %T", err)
|
|
}
|
|
|
|
if versionErr.ActorID != "actor-123" {
|
|
t.Errorf("ActorID mismatch: got %q, want %q", versionErr.ActorID, "actor-123")
|
|
}
|
|
if versionErr.CurrentVersion != 5 {
|
|
t.Errorf("CurrentVersion mismatch: got %d, want %d", versionErr.CurrentVersion, 5)
|
|
}
|
|
if versionErr.AttemptedVersion != 3 {
|
|
t.Errorf("AttemptedVersion mismatch: got %d, want %d", versionErr.AttemptedVersion, 3)
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_SaveEvent_VersionConflictEqual(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-version-equal")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Save first event with version 5
|
|
event1 := &aether.Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 5,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event1); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
// Attempt to save event with equal version (should fail)
|
|
event2 := &aether.Event{
|
|
ID: "evt-2",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 5,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
err = store.SaveEvent(event2)
|
|
if err == nil {
|
|
t.Fatal("expected error when saving event with equal version, got nil")
|
|
}
|
|
|
|
if !errors.Is(err, aether.ErrVersionConflict) {
|
|
t.Errorf("expected ErrVersionConflict, got %v", err)
|
|
}
|
|
}
|
|
|
|
// === GetEvents Tests ===
|
|
|
|
func TestJetStreamEventStore_GetEvents_RetrievesInOrder(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-get-order")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Save events in order
|
|
for i := 1; i <= 10; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{"index": i},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
events, err := store.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
if len(events) != 10 {
|
|
t.Fatalf("expected 10 events, got %d", len(events))
|
|
}
|
|
|
|
// Verify order
|
|
for i, event := range events {
|
|
expectedID := fmt.Sprintf("evt-%d", i+1)
|
|
if event.ID != expectedID {
|
|
t.Errorf("event %d: got ID %q, want %q", i, event.ID, expectedID)
|
|
}
|
|
expectedVersion := int64(i + 1)
|
|
if event.Version != expectedVersion {
|
|
t.Errorf("event %d: got Version %d, want %d", i, event.Version, expectedVersion)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_GetEvents_FromVersionFilters(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-from-version")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Save events with versions 1-10
|
|
for i := 1; i <= 10; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
testCases := []struct {
|
|
name string
|
|
fromVersion int64
|
|
expectedLen int
|
|
minVersion int64
|
|
}{
|
|
{"from version 0", 0, 10, 1},
|
|
{"from version 5", 5, 5, 6},
|
|
{"from version 10", 10, 0, 0},
|
|
{"from version 11", 11, 0, 0},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
events, err := store.GetEvents("actor-123", tc.fromVersion)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
if len(events) != tc.expectedLen {
|
|
t.Errorf("expected %d events, got %d", tc.expectedLen, len(events))
|
|
}
|
|
|
|
// Verify all returned events have version > fromVersion
|
|
for _, event := range events {
|
|
if event.Version <= tc.fromVersion {
|
|
t.Errorf("event version %d is not greater than fromVersion %d", event.Version, tc.fromVersion)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_GetEvents_NonExistentActor(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-nonexistent")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
events, err := store.GetEvents("non-existent-actor", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents should not error for non-existent actor: %v", err)
|
|
}
|
|
|
|
if len(events) != 0 {
|
|
t.Errorf("expected 0 events for non-existent actor, got %d", len(events))
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_GetEvents_MultipleActors(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-multi-actors")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Save events for different actors
|
|
actors := []string{"actor-1", "actor-2", "actor-3"}
|
|
for _, actorID := range actors {
|
|
for i := 1; i <= 3; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%s-%d", actorID, i),
|
|
EventType: "TestEvent",
|
|
ActorID: actorID,
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Verify each actor has its own events
|
|
for _, actorID := range actors {
|
|
events, err := store.GetEvents(actorID, 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed for %s: %v", actorID, err)
|
|
}
|
|
if len(events) != 3 {
|
|
t.Errorf("expected 3 events for %s, got %d", actorID, len(events))
|
|
}
|
|
for _, event := range events {
|
|
if event.ActorID != actorID {
|
|
t.Errorf("event has wrong ActorID: got %q, want %q", event.ActorID, actorID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_GetEventsWithErrors(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-with-errors")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Save valid events
|
|
for i := 1; i <= 5; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
result, err := store.GetEventsWithErrors("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEventsWithErrors failed: %v", err)
|
|
}
|
|
|
|
if len(result.Events) != 5 {
|
|
t.Errorf("expected 5 events, got %d", len(result.Events))
|
|
}
|
|
|
|
if result.HasErrors() {
|
|
t.Errorf("expected no errors, got %d", len(result.Errors))
|
|
}
|
|
}
|
|
|
|
// === GetLatestVersion Tests ===
|
|
|
|
func TestJetStreamEventStore_GetLatestVersion(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-latest-version")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Save events with versions 1-5
|
|
for i := 1; i <= 5; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
latestVersion, err := store.GetLatestVersion("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
|
|
if latestVersion != 5 {
|
|
t.Errorf("expected latest version 5, got %d", latestVersion)
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_GetLatestVersion_NonExistentActor(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-latest-nonexistent")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
latestVersion, err := store.GetLatestVersion("non-existent-actor")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion should not error for non-existent actor: %v", err)
|
|
}
|
|
|
|
if latestVersion != 0 {
|
|
t.Errorf("expected version 0 for non-existent actor, got %d", latestVersion)
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_GetLatestVersion_UpdatesAfterNewEvent(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-latest-updates")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Save first event
|
|
event1 := &aether.Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event1); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
version1, err := store.GetLatestVersion("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
if version1 != 1 {
|
|
t.Errorf("expected version 1, got %d", version1)
|
|
}
|
|
|
|
// Save second event
|
|
event2 := &aether.Event{
|
|
ID: "evt-2",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 10,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event2); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
|
|
version2, err := store.GetLatestVersion("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion failed: %v", err)
|
|
}
|
|
if version2 != 10 {
|
|
t.Errorf("expected version 10, got %d", version2)
|
|
}
|
|
}
|
|
|
|
// === Snapshot Tests ===
|
|
|
|
func TestJetStreamEventStore_SaveAndGetSnapshot(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-snapshot")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-123",
|
|
Version: 10,
|
|
State: map[string]interface{}{
|
|
"balance": 100.50,
|
|
"status": "active",
|
|
},
|
|
Timestamp: time.Date(2026, 1, 9, 12, 0, 0, 0, time.UTC),
|
|
}
|
|
|
|
err = store.SaveSnapshot(snapshot)
|
|
if err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
retrieved, err := store.GetLatestSnapshot("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if retrieved.ActorID != snapshot.ActorID {
|
|
t.Errorf("ActorID mismatch: got %q, want %q", retrieved.ActorID, snapshot.ActorID)
|
|
}
|
|
if retrieved.Version != snapshot.Version {
|
|
t.Errorf("Version mismatch: got %d, want %d", retrieved.Version, snapshot.Version)
|
|
}
|
|
|
|
// Check state values (JSON unmarshaling may change types)
|
|
balance, ok := retrieved.State["balance"].(float64)
|
|
if !ok {
|
|
t.Errorf("balance is not float64: %T", retrieved.State["balance"])
|
|
} else if balance != 100.50 {
|
|
t.Errorf("balance mismatch: got %v, want %v", balance, 100.50)
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_GetLatestSnapshot_MultipleSnapshots(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-multi-snapshot")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Save multiple snapshots
|
|
for i := 1; i <= 5; i++ {
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-123",
|
|
Version: int64(i * 10),
|
|
State: map[string]interface{}{
|
|
"iteration": i,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed for version %d: %v", i*10, err)
|
|
}
|
|
}
|
|
|
|
// Get latest should return the most recently saved
|
|
retrieved, err := store.GetLatestSnapshot("actor-123")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
if retrieved.Version != 50 {
|
|
t.Errorf("expected version 50, got %d", retrieved.Version)
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_GetLatestSnapshot_NonExistent(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-snapshot-nonexistent")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
_, err = store.GetLatestSnapshot("non-existent-actor")
|
|
if err == nil {
|
|
t.Error("expected error when getting snapshot for non-existent actor")
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_SnapshotWithComplexState(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-snapshot-complex")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
complexState := map[string]interface{}{
|
|
"string": "hello",
|
|
"integer": float64(42), // JSON numbers are float64
|
|
"float": 3.14159,
|
|
"boolean": true,
|
|
"null": nil,
|
|
"array": []interface{}{"a", "b", "c"},
|
|
"nested": map[string]interface{}{
|
|
"level1": map[string]interface{}{
|
|
"level2": "deep value",
|
|
},
|
|
},
|
|
}
|
|
|
|
snapshot := &aether.ActorSnapshot{
|
|
ActorID: "actor-complex",
|
|
Version: 1,
|
|
State: complexState,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if err := store.SaveSnapshot(snapshot); err != nil {
|
|
t.Fatalf("SaveSnapshot failed: %v", err)
|
|
}
|
|
|
|
retrieved, err := store.GetLatestSnapshot("actor-complex")
|
|
if err != nil {
|
|
t.Fatalf("GetLatestSnapshot failed: %v", err)
|
|
}
|
|
|
|
// Verify fields
|
|
if retrieved.State["string"] != "hello" {
|
|
t.Errorf("string mismatch: got %v", retrieved.State["string"])
|
|
}
|
|
if retrieved.State["boolean"] != true {
|
|
t.Errorf("boolean mismatch: got %v", retrieved.State["boolean"])
|
|
}
|
|
}
|
|
|
|
// === Namespace Isolation Tests ===
|
|
|
|
func TestJetStreamEventStore_NamespaceIsolation(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
baseName := uniqueStreamName("test-isolation")
|
|
ns1 := "tenant-a"
|
|
ns2 := "tenant-b"
|
|
expectedStream1 := fmt.Sprintf("%s_%s", ns1, baseName)
|
|
expectedStream2 := fmt.Sprintf("%s_%s", ns2, baseName)
|
|
defer cleanupStream(nc, expectedStream1)
|
|
defer cleanupStream(nc, expectedStream2)
|
|
|
|
store1, err := NewJetStreamEventStoreWithNamespace(nc, baseName, ns1)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store1: %v", err)
|
|
}
|
|
|
|
store2, err := NewJetStreamEventStoreWithNamespace(nc, baseName, ns2)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store2: %v", err)
|
|
}
|
|
|
|
// Save events to namespace 1
|
|
event1 := &aether.Event{
|
|
ID: "evt-ns1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 1,
|
|
Data: map[string]interface{}{"namespace": "tenant-a"},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store1.SaveEvent(event1); err != nil {
|
|
t.Fatalf("SaveEvent failed for ns1: %v", err)
|
|
}
|
|
|
|
// Save events to namespace 2
|
|
event2 := &aether.Event{
|
|
ID: "evt-ns2",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 1,
|
|
Data: map[string]interface{}{"namespace": "tenant-b"},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store2.SaveEvent(event2); err != nil {
|
|
t.Fatalf("SaveEvent failed for ns2: %v", err)
|
|
}
|
|
|
|
// Verify isolation: store1 only sees tenant-a events
|
|
events1, err := store1.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed for store1: %v", err)
|
|
}
|
|
if len(events1) != 1 {
|
|
t.Errorf("store1: expected 1 event, got %d", len(events1))
|
|
}
|
|
if events1[0].Data["namespace"] != "tenant-a" {
|
|
t.Errorf("store1: got event from wrong namespace: %v", events1[0].Data["namespace"])
|
|
}
|
|
|
|
// Verify isolation: store2 only sees tenant-b events
|
|
events2, err := store2.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed for store2: %v", err)
|
|
}
|
|
if len(events2) != 1 {
|
|
t.Errorf("store2: expected 1 event, got %d", len(events2))
|
|
}
|
|
if events2[0].Data["namespace"] != "tenant-b" {
|
|
t.Errorf("store2: got event from wrong namespace: %v", events2[0].Data["namespace"])
|
|
}
|
|
}
|
|
|
|
// === Concurrency Tests ===
|
|
|
|
func TestJetStreamEventStore_ConcurrentWrites(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-concurrent")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
numGoroutines := 10
|
|
eventsPerGoroutine := 10
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(numGoroutines)
|
|
|
|
for g := 0; g < numGoroutines; g++ {
|
|
go func(goroutineID int) {
|
|
defer wg.Done()
|
|
actorID := fmt.Sprintf("actor-%d", goroutineID)
|
|
for i := 1; i <= eventsPerGoroutine; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d-%d", goroutineID, i),
|
|
EventType: "TestEvent",
|
|
ActorID: actorID,
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
t.Errorf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
}(g)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Verify each actor has all events
|
|
for g := 0; g < numGoroutines; g++ {
|
|
actorID := fmt.Sprintf("actor-%d", g)
|
|
events, err := store.GetEvents(actorID, 0)
|
|
if err != nil {
|
|
t.Errorf("GetEvents failed for %s: %v", actorID, err)
|
|
continue
|
|
}
|
|
if len(events) != eventsPerGoroutine {
|
|
t.Errorf("expected %d events for %s, got %d", eventsPerGoroutine, actorID, len(events))
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_ConcurrentVersionConflict(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-concurrent-conflict")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
numGoroutines := 50
|
|
var successCount int64
|
|
var conflictCount int64
|
|
var wg sync.WaitGroup
|
|
|
|
// All goroutines try to save version 1
|
|
wg.Add(numGoroutines)
|
|
for i := 0; i < numGoroutines; i++ {
|
|
go func(id int) {
|
|
defer wg.Done()
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", id),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-contested",
|
|
Version: 1,
|
|
Data: map[string]interface{}{"goroutine": id},
|
|
Timestamp: time.Now(),
|
|
}
|
|
err := store.SaveEvent(event)
|
|
if err == nil {
|
|
atomic.AddInt64(&successCount, 1)
|
|
} else if errors.Is(err, aether.ErrVersionConflict) {
|
|
atomic.AddInt64(&conflictCount, 1)
|
|
} else {
|
|
t.Errorf("unexpected error: %v", err)
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Exactly one should succeed
|
|
if successCount != 1 {
|
|
t.Errorf("expected exactly 1 success, got %d", successCount)
|
|
}
|
|
if conflictCount != int64(numGoroutines-1) {
|
|
t.Errorf("expected %d conflicts, got %d", numGoroutines-1, conflictCount)
|
|
}
|
|
|
|
// Verify only one event was stored
|
|
events, err := store.GetEvents("actor-contested", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 1 {
|
|
t.Errorf("expected 1 event, got %d", len(events))
|
|
}
|
|
}
|
|
|
|
// === Connection Loss/Recovery Tests ===
|
|
|
|
func TestJetStreamEventStore_PersistenceAcrossConnections(t *testing.T) {
|
|
nc1 := getTestNATSConnection(t)
|
|
|
|
streamName := uniqueStreamName("test-persistence")
|
|
defer func() {
|
|
nc := getTestNATSConnection(t)
|
|
cleanupStream(nc, streamName)
|
|
nc.Close()
|
|
}()
|
|
|
|
// Create store and save events with first connection
|
|
store1, err := NewJetStreamEventStore(nc1, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store1: %v", err)
|
|
}
|
|
|
|
for i := 1; i <= 5; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: int64(i),
|
|
Data: map[string]interface{}{"index": i},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store1.SaveEvent(event); err != nil {
|
|
t.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// Close first connection
|
|
nc1.Close()
|
|
|
|
// Create new connection and store
|
|
nc2 := getTestNATSConnection(t)
|
|
defer nc2.Close()
|
|
|
|
store2, err := NewJetStreamEventStore(nc2, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store2: %v", err)
|
|
}
|
|
|
|
// Verify events are still there
|
|
events, err := store2.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
if len(events) != 5 {
|
|
t.Errorf("expected 5 events after reconnection, got %d", len(events))
|
|
}
|
|
|
|
// Verify we can continue adding events
|
|
event6 := &aether.Event{
|
|
ID: "evt-6",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 6,
|
|
Data: map[string]interface{}{"index": 6},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store2.SaveEvent(event6); err != nil {
|
|
t.Fatalf("SaveEvent failed after reconnection: %v", err)
|
|
}
|
|
|
|
events, err = store2.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
|
|
if len(events) != 6 {
|
|
t.Errorf("expected 6 events after adding one, got %d", len(events))
|
|
}
|
|
}
|
|
|
|
func TestJetStreamEventStore_MultipleStoreInstances(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-multi-instance")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
// Create multiple store instances on the same stream
|
|
store1, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store1: %v", err)
|
|
}
|
|
|
|
store2, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store2: %v", err)
|
|
}
|
|
|
|
// Save events from store1
|
|
event1 := &aether.Event{
|
|
ID: "evt-from-store1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 1,
|
|
Data: map[string]interface{}{"source": "store1"},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store1.SaveEvent(event1); err != nil {
|
|
t.Fatalf("SaveEvent from store1 failed: %v", err)
|
|
}
|
|
|
|
// Read from store2
|
|
events, err := store2.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents from store2 failed: %v", err)
|
|
}
|
|
|
|
if len(events) != 1 {
|
|
t.Errorf("store2 should see event from store1, got %d events", len(events))
|
|
}
|
|
|
|
// Save from store2 (continuing version sequence)
|
|
event2 := &aether.Event{
|
|
ID: "evt-from-store2",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-123",
|
|
Version: 2,
|
|
Data: map[string]interface{}{"source": "store2"},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store2.SaveEvent(event2); err != nil {
|
|
t.Fatalf("SaveEvent from store2 failed: %v", err)
|
|
}
|
|
|
|
// Read from store1
|
|
events, err = store1.GetEvents("actor-123", 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents from store1 failed: %v", err)
|
|
}
|
|
|
|
if len(events) != 2 {
|
|
t.Errorf("store1 should see both events, got %d events", len(events))
|
|
}
|
|
}
|
|
|
|
|
|
// === Cache Invalidation Tests ===
|
|
|
|
func TestJetStreamEventStore_CacheInvalidationOnExternalWrite(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-cache-invalidation")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
// Create two stores for the same stream
|
|
store1, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store1: %v", err)
|
|
}
|
|
|
|
store2, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store2: %v", err)
|
|
}
|
|
|
|
actorID := "actor-cache-test"
|
|
|
|
// store1: Save event v1 (caches version 1)
|
|
event1 := &aether.Event{
|
|
ID: "evt-1",
|
|
EventType: "TestEvent",
|
|
ActorID: actorID,
|
|
Version: 1,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store1.SaveEvent(event1); err != nil {
|
|
t.Fatalf("SaveEvent from store1 failed: %v", err)
|
|
}
|
|
|
|
// Verify store1 sees version 1 (uses cache)
|
|
v1, err := store1.GetLatestVersion(actorID)
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion from store1 failed: %v", err)
|
|
}
|
|
if v1 != 1 {
|
|
t.Errorf("store1 should see version 1, got %d", v1)
|
|
}
|
|
|
|
// store2: Save event v2 (external write from store1's perspective)
|
|
event2 := &aether.Event{
|
|
ID: "evt-2",
|
|
EventType: "TestEvent",
|
|
ActorID: actorID,
|
|
Version: 2,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store2.SaveEvent(event2); err != nil {
|
|
t.Fatalf("SaveEvent from store2 failed: %v", err)
|
|
}
|
|
|
|
// store1: GetLatestVersion should detect external write and return v2
|
|
// (This triggers cache invalidation because actual version > cached version)
|
|
v2, err := store1.GetLatestVersion(actorID)
|
|
if err != nil {
|
|
t.Fatalf("GetLatestVersion from store1 (after external write) failed: %v", err)
|
|
}
|
|
if v2 != 2 {
|
|
t.Errorf("store1 should see version 2 after external write, got %d", v2)
|
|
}
|
|
|
|
// store2: Save event v3 (another external write)
|
|
event3 := &aether.Event{
|
|
ID: "evt-3",
|
|
EventType: "TestEvent",
|
|
ActorID: actorID,
|
|
Version: 3,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store2.SaveEvent(event3); err != nil {
|
|
t.Fatalf("SaveEvent from store2 (v3) failed: %v", err)
|
|
}
|
|
|
|
// store1: After cache invalidation, SaveEvent should use fresh data from JetStream
|
|
event4 := &aether.Event{
|
|
ID: "evt-4",
|
|
EventType: "TestEvent",
|
|
ActorID: actorID,
|
|
Version: 4,
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store1.SaveEvent(event4); err != nil {
|
|
t.Fatalf("SaveEvent from store1 (after cache invalidation) failed: %v", err)
|
|
}
|
|
|
|
// Verify all 4 events are persisted
|
|
events, err := store1.GetEvents(actorID, 0)
|
|
if err != nil {
|
|
t.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
if len(events) != 4 {
|
|
t.Errorf("expected 4 events after cache invalidation, got %d", len(events))
|
|
}
|
|
}
|
|
|
|
// === Interface Compliance Tests ===
|
|
|
|
func TestJetStreamEventStore_ImplementsEventStore(t *testing.T) {
|
|
var _ aether.EventStore = (*JetStreamEventStore)(nil)
|
|
}
|
|
|
|
func TestJetStreamEventStore_ImplementsEventStoreWithErrors(t *testing.T) {
|
|
var _ aether.EventStoreWithErrors = (*JetStreamEventStore)(nil)
|
|
}
|
|
|
|
func TestJetStreamEventStore_ImplementsSnapshotStore(t *testing.T) {
|
|
nc := getTestNATSConnection(t)
|
|
defer nc.Close()
|
|
|
|
streamName := uniqueStreamName("test-interface")
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
t.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Verify it has all SnapshotStore methods
|
|
_ = store.SaveEvent
|
|
_ = store.GetEvents
|
|
_ = store.GetLatestVersion
|
|
_ = store.GetLatestSnapshot
|
|
_ = store.SaveSnapshot
|
|
}
|
|
|
|
// === Benchmarks ===
|
|
|
|
func BenchmarkJetStreamEventStore_SaveEvent(b *testing.B) {
|
|
nc, err := nats.Connect(nats.DefaultURL)
|
|
if err != nil {
|
|
b.Skipf("NATS not available: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
streamName := fmt.Sprintf("bench-save-%d", time.Now().UnixNano())
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
b.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "BenchmarkEvent",
|
|
ActorID: "actor-bench",
|
|
Version: int64(i + 1),
|
|
Data: map[string]interface{}{"value": i},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
b.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func BenchmarkJetStreamEventStore_GetEvents(b *testing.B) {
|
|
nc, err := nats.Connect(nats.DefaultURL)
|
|
if err != nil {
|
|
b.Skipf("NATS not available: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
|
|
streamName := fmt.Sprintf("bench-get-%d", time.Now().UnixNano())
|
|
defer cleanupStream(nc, streamName)
|
|
|
|
store, err := NewJetStreamEventStore(nc, streamName)
|
|
if err != nil {
|
|
b.Fatalf("failed to create store: %v", err)
|
|
}
|
|
|
|
// Pre-populate with events
|
|
for i := 0; i < 100; i++ {
|
|
event := &aether.Event{
|
|
ID: fmt.Sprintf("evt-%d", i),
|
|
EventType: "BenchmarkEvent",
|
|
ActorID: "actor-bench",
|
|
Version: int64(i + 1),
|
|
Data: map[string]interface{}{},
|
|
Timestamp: time.Now(),
|
|
}
|
|
if err := store.SaveEvent(event); err != nil {
|
|
b.Fatalf("SaveEvent failed: %v", err)
|
|
}
|
|
}
|
|
|
|
b.ResetTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
_, err := store.GetEvents("actor-bench", 0)
|
|
if err != nil {
|
|
b.Fatalf("GetEvents failed: %v", err)
|
|
}
|
|
}
|
|
}
|