//go:build integration package store import ( "context" "log" "os" "testing" "time" "git.flowmade.one/flowmade-one/aether" "github.com/nats-io/nats.go" "github.com/nats-io/nats-server/v2/server" ) func setupNatsServer() (*server.Server, *nats.Conn, func()) { opts := &server.Options{ Port: -1, JetStream: true, StoreDir: "/tmp/nats-test-" + time.Now().Format("20060102150405"), } s, err := server.NewServer(opts) if err != nil { log.Fatal("Failed to create NATS server:", err) } go s.Start() if !s.ReadyForConnections(4 * time.Second) { log.Fatal("NATS server failed to start") } nc, err := nats.Connect(s.ClientURL()) if err != nil { s.Shutdown() log.Fatal("Failed to connect to NATS:", err) } return s, nc, func() { nc.Close() s.Shutdown() os.RemoveAll(opts.StoreDir) } } func TestUpdateVersionCache(t *testing.T) { s, nc, cleanup := setupNatsServer() defer cleanup() ctx := context.Background() store, err := NewJetStreamEventStore(nc, "test_update_cache") if err != nil { t.Fatalf("Failed to create store: %v", err) } defer store.Close(ctx) actorID := "test-actor-1" tests := []struct { name string cachedVersion int64 newVersion int64 expectUpdate bool expectVersion int64 }{ { name: "update when new version is greater", cachedVersion: 5, newVersion: 10, expectUpdate: true, expectVersion: 10, }, { name: "do not update when new version is equal", cachedVersion: 5, newVersion: 5, expectUpdate: false, expectVersion: 5, }, { name: "do not update when new version is less", cachedVersion: 10, newVersion: 5, expectUpdate: false, expectVersion: 10, }, { name: "update when no cached version exists", cachedVersion: 0, newVersion: 1, expectUpdate: true, expectVersion: 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Set up cached version store.versions = make(map[string]int64) store.versions[actorID] = tt.cachedVersion // Call UpdateVersionCache store.UpdateVersionCache(actorID, tt.newVersion) // Verify result if tt.expectUpdate { if version, ok := store.versions[actorID]; !ok { t.Error("Expected version to be updated but it wasn't cached") } else if version != tt.expectVersion { t.Errorf("Expected version %d, got %d", tt.expectVersion, version) } } else { if version, ok := store.versions[actorID]; !ok { t.Error("Expected version to remain cached") } else if version != tt.expectVersion { t.Errorf("Expected version to remain %d, got %d", tt.expectVersion, version) } } }) } } func TestUpdateVersionCache_Concurrent(t *testing.T) { s, nc, cleanup := setupNatsServer() defer cleanup() ctx := context.Background() store, err := NewJetStreamEventStore(nc, "test_update_cache_concurrent") if err != nil { t.Fatalf("Failed to create store: %v", err) } defer store.Close(ctx) actorID := "concurrent-actor" store.versions[actorID] = 1 const numGoroutines = 50 const maxVersion = 100 var done = make(chan struct{}) var updates int32 for i := 0; i < numGoroutines; i++ { version := int64(1 + (i % maxVersion)) go func(v int64) { store.UpdateVersionCache(actorID, v) select { case <-done: default: updates++ } }(version) } close(done) time.Sleep(100 * time.Millisecond) finalVersion := store.versions[actorID] if finalVersion > maxVersion { t.Errorf("Expected version to be at most %d, got %d", maxVersion, finalVersion) } } func TestSubscribeToEventStored(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } s, nc, cleanup := setupNatsServer() defer cleanup() ctx := context.Background() store, err := NewJetStreamEventStore(nc, "test_subscribe_event_stored") if err != nil { t.Fatalf("Failed to create store: %v", err) } defer store.Close(ctx) eventBusWithStore := NewNATSEventBusWithBroadcaster(nc, store, "") if eventBusWithStore == nil { t.Fatalf("Failed to create event bus with broadcaster") } defer eventBusWithStore.Stop() ch := eventBusWithStore.SubscribeToEventStored("*") if ch == nil { t.Fatal("SubscribeToEventStored returned nil channel") } actorID := "subscribe-test-actor" event := &aether.Event{ ID: uuid.New().String(), EventType: "TestEvent", ActorID: actorID, Version: 1, Data: map[string]interface{}{"key": "value"}, Timestamp: time.Now(), } eventBusWithStore.Publish("", event) select { case receivedEvent := <-ch: if receivedEvent.EventType != aether.EventTypeEventStored { t.Errorf("Expected EventTypeEventStored, got %s", receivedEvent.EventType) } if receivedEvent.ActorID != actorID { t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID) } data, ok := receivedEvent.Data["actorId"].(string) if !ok || data != actorID { t.Errorf("Expected actorId in data to be %s", actorID) } case <-time.After(2 * time.Second): t.Fatal("Timeout waiting for EventStored event") } } func TestCrossNodeBroadcasting_SingleNode(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } s, nc, cleanup := setupNatsServer() defer cleanup() ctx := context.Background() store, err := NewJetStreamEventStore(nc, "test_single_node_broadcast") if err != nil { t.Fatalf("Failed to create store: %v", err) } defer store.Close(ctx) eventBus := NewNATSEventBusWithBroadcaster(nc, store, "") defer eventBus.Stop() actorID := "broadcast-test-actor-1" localCh := eventBus.Subscribe("") event := &aether.Event{ ID: uuid.New().String(), EventType: "OrderPlaced", ActorID: actorID, Version: 1, Data: map[string]interface{}{"total": 99.99}, Timestamp: time.Now(), } eventBus.Publish("", event) select { case receivedEvent := <-localCh: if receivedEvent.EventType != "OrderPlaced" { t.Errorf("Expected OrderPlaced, got %s", receivedEvent.EventType) } if receivedEvent.ActorID != actorID { t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID) } case <-time.After(2 * time.Second): t.Fatal("Timeout waiting for broadcast event") } } func TestCrossNodeBroadcasting_MultiNode(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } s1, nc1, cleanup1 := setupNatsServer() defer cleanup1() s2, nc2, cleanup2 := setupNatsServer() defer cleanup2() ctx := context.Background() store1, err := NewJetStreamEventStore(nc1, "test_multi_node_1") if err != nil { t.Fatalf("Failed to create store 1: %v", err) } store2, err := NewJetStreamEventStore(nc2, "test_multi_node_2") if err != nil { t.Fatalf("Failed to create store 2: %v", err) } eventBus1 := NewNATSEventBusWithBroadcaster(nc1, store1, "") eventBus2 := NewNATSEventBusWithBroadcaster(nc2, store2, "") defer eventBus1.Stop() defer eventBus2.Stop() actorID := "multi-node-actor" receiverCh := eventBus2.Subscribe("") event := &aether.Event{ ID: uuid.New().String(), EventType: "InventoryReserved", ActorID: actorID, Version: 1, Data: map[string]interface{}{"quantity": 5}, Timestamp: time.Now(), } eventBus1.Publish("", event) select { case receivedEvent := <-receiverCh: if receivedEvent.EventType != "InventoryReserved" { t.Errorf("Expected InventoryReserved, got %s", receivedEvent.EventType) } if receivedEvent.ActorID != actorID { t.Errorf("Expected actorID %s, got %s", actorID, receivedEvent.ActorID) } case <-time.After(3 * time.Second): t.Fatal("Timeout waiting for cross-node event") } } func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } s, nc, cleanup := setupNatsServer() defer cleanup() ctx := context.Background() tenantAStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-a") if err != nil { t.Fatalf("Failed to create tenant A store: %v", err) } tenantBStore, err := NewJetStreamEventStoreWithNamespace(nc, "events", "tenant-b") if err != nil { t.Fatalf("Failed to create tenant B store: %v", err) } tenantAEventBus := NewNATSEventBusWithBroadcaster(nc, tenantAStore, "tenant-a") tenantBEventBus := NewNATSEventBusWithBroadcaster(nc, tenantBStore, "tenant-b") defer tenantAEventBus.Stop() defer tenantBEventBus.Stop() tenantACh := tenantAEventBus.Subscribe("tenant-a") tenantBCh := tenantBEventBus.Subscribe("tenant-b") actorID := "tenant-actor" event := &aether.Event{ ID: uuid.New().String(), EventType: "TenantEvent", ActorID: actorID, Version: 1, Data: map[string]interface{}{"data": "tenant-a"}, Timestamp: time.Now(), } tenantAEventBus.Publish("tenant-a", event) select { case receivedEvent := <-tenantACh: if receivedEvent.EventType != "TenantEvent" { t.Errorf("Expected TenantEvent in tenant A, got %s", receivedEvent.EventType) } case <-time.After(2 * time.Second): t.Error("Timeout waiting for tenant A to receive event") } select { case <-tenantBCh: t.Error("Tenant B should not receive tenant A's events") case <-time.After(1 * time.Second): // Expected - tenant B should not receive events from tenant A } } func TestUpdateVersionCache_EventStored(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } s, nc, cleanup := setupNatsServer() defer cleanup() ctx := context.Background() store, err := NewJetStreamEventStore(nc, "test_version_cache_eventstored") if err != nil { t.Fatalf("Failed to create store: %v", err) } eventBus := NewNATSEventBusWithBroadcaster(nc, store, "") defer eventBus.Stop() actorID := "version-cache-actor" store.UpdateVersionCache(actorID, 5) event := &aether.Event{ ID: uuid.New().String(), EventType: "TestEvent", ActorID: actorID, Version: 10, Data: map[string]interface{}{"test": true}, Timestamp: time.Now(), } eventBus.Publish("", event) time.Sleep(100 * time.Millisecond) storedVersion, err := store.GetLatestVersion(actorID) if err != nil { t.Fatalf("Failed to get latest version: %v", err) } if storedVersion != 10 { t.Errorf("Expected version 10, got %d", storedVersion) } cacheVersion, ok := store.GetCachedVersion(actorID) if !ok { t.Error("Expected version to be in cache") } else if cacheVersion != 10 { t.Errorf("Expected cached version 10, got %d", cacheVersion) } }