//go:build integration // +build integration package store import ( "context" "fmt" "log" "testing" "time" "git.flowmade.one/flowmade-one/aether" "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // generateStreamName creates a unique stream name for each test run func generateStreamName(baseName string) string { return fmt.Sprintf("%s_%s_%d", baseName, "tv149", time.Now().UnixNano()%100000000) } // cleanupStream deletes a JetStream stream if it exists. func cleanupStream(nc *nats.Conn, streamName string) { js, err := nc.JetStream() if err != nil { return } err = js.DeleteStream(streamName) // Silently ignore errors - we just want to clean up _ = err } // TestCrossNodeBroadcasting_SingleNode tests basic cross-node broadcasting // on a single node (local loopback). func TestCrossNodeBroadcasting_SingleNode(t *testing.T) { nc, err := nats.Connect(nats.DefaultURL) require.NoError(t, err) defer nc.Close() streamName := generateStreamName("broadcast_single") cleanupStream(nc, streamName) // Create NATS event bus natsBus, err := aether.NewNATSEventBus(nc) require.NoError(t, err) defer natsBus.Stop() // Create event store with broadcaster store, err := NewJetStreamEventStoreWithBroadcaster( nc, streamName, natsBus, "tenant-single", ) require.NoError(t, err) // Subscribe to events eventCh := natsBus.Subscribe("tenant-single") // Save event testEvent := &aether.Event{ ID: "event-1", EventType: "TestEvent", ActorID: "actor-1", Version: 1, Data: map[string]interface{}{"test": "single"}, Timestamp: time.Now(), } err = store.SaveEvent(testEvent) require.NoError(t, err) log.Printf("Saved event: %s", testEvent.ID) // Receive event via NATS (NATSBroadcast may have wrapped in EventStored) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() select { case received := <-eventCh: // The received event should have the same actor and version assert.Equal(t, "actor-1", received.ActorID) assert.Equal(t, int64(1), received.Version) // Event type might be original or EventStored wrapper if received.EventType == aether.EventTypeEventStored { assert.Equal(t, "actor-1", received.Data["actorId"].(string)) log.Printf("Received EventStored wrapper: %s", received.ID) } else { log.Printf("Received via NATS: %s (type: %s)", received.ID, received.EventType) } case <-ctx.Done(): t.Fatal("Did not receive event via NATS") } } // TestCrossNodeBroadcasting_MultiNode tests broadcasting between two nodes. func TestCrossNodeBroadcasting_MultiNode(t *testing.T) { nc, err := nats.Connect(nats.DefaultURL) require.NoError(t, err) defer nc.Close() streamName := generateStreamName("broadcast_multi") cleanupStream(nc, streamName) // Create Node A nodeANatsBus, err := aether.NewNATSEventBus(nc) require.NoError(t, err) defer nodeANatsBus.Stop() nodeAStore, err := NewJetStreamEventStoreWithBroadcaster( nc, streamName, nodeANatsBus, "tenant-multi", ) require.NoError(t, err) // Create Node B nodeBNatsBus, err := aether.NewNATSEventBus(nc) require.NoError(t, err) defer nodeBNatsBus.Stop() nodeBStore, err := NewJetStreamEventStoreWithBroadcaster( nc, streamName, nodeBNatsBus, "tenant-multi", ) require.NoError(t, err) // Node A subscribes nodeAEventCh := nodeANatsBus.Subscribe("tenant-multi") // Node B subscribes nodeBEventCh := nodeBNatsBus.Subscribe("tenant-multi") // Node A saves event eventA := &aether.Event{ ID: "event-node-a", EventType: "TestEvent", ActorID: "multi-actor", Version: 1, Data: map[string]interface{}{"node": "a"}, Timestamp: time.Now(), } err = nodeAStore.SaveEvent(eventA) require.NoError(t, err) log.Printf("Node A saved: %s", eventA.ID) // Node B receives event (EventStored wrapper or original) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() select { case received := <-nodeBEventCh: // Check actor and version match assert.Equal(t, "multi-actor", received.ActorID) assert.Equal(t, int64(1), received.Version) if received.EventType == aether.EventTypeEventStored { // EventStored wrapper assert.Equal(t, "multi-actor", received.Data["actorId"].(string)) } else { // Original event assert.Equal(t, "a", received.Data["node"]) } log.Printf("Node B received: %s (actor: %s, version: %d)", received.ID, received.ActorID, received.Version) case <-ctx.Done(): t.Fatal("Node B did not receive event") } // Give Node B time to receive and process Node A's event time.Sleep(100 * time.Millisecond) // Node B saves event with different actor eventB := &aether.Event{ ID: "event-node-b", EventType: "TestEvent", ActorID: "multi-actor-b", // Different actor Version: 1, Data: map[string]interface{}{"node": "b"}, Timestamp: time.Now(), } err = nodeBStore.SaveEvent(eventB) require.NoError(t, err) log.Printf("Node B saved: %s", eventB.ID) // Node A receives Node B's event (could be EventStored or original) ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second) defer cancel2() // Wait for either EventStored or the actual event received := false for !received { select { case event := <-nodeAEventCh: // Check for Node B's event if event.EventType == aether.EventTypeEventStored { // EventStored wrapper - check actorId actorID := event.Data["actorId"].(string) if actorID == "multi-actor-b" { received = true log.Printf("Node A received EventStored for Node B's event: %s", event.ID) } } else if event.ActorID == "multi-actor-b" { received = true log.Printf("Node A received Node B's event: %s", event.ID) } // Keep listening until we get Node B's event case <-ctx2.Done(): if !received { t.Fatal("Node A did not receive Node B's event") } } } } // TestUpdateVersionCache tests the version cache update logic. func TestUpdateVersionCache(t *testing.T) { nc, err := nats.Connect(nats.DefaultURL) require.NoError(t, err) defer nc.Close() streamName := generateStreamName("cache_test") cleanupStream(nc, streamName) store, err := NewJetStreamEventStore(nc, streamName) require.NoError(t, err) // Save event version 1 event1 := &aether.Event{ ID: "event-1", EventType: "TestEvent", ActorID: "actor-1", Version: 1, Data: map[string]interface{}{}, Timestamp: time.Now(), } err = store.SaveEvent(event1) require.NoError(t, err) // EventStored will trigger UpdateVersionCache time.Sleep(100 * time.Millisecond) // Save event version 2 - should succeed (version > 1) event2 := &aether.Event{ ID: "event-2", EventType: "TestEvent", ActorID: "actor-1", Version: 2, Data: map[string]interface{}{}, Timestamp: time.Now(), } err = store.SaveEvent(event2) require.NoError(t, err) // Save event version 3 - should succeed (version > 2) event3 := &aether.Event{ ID: "event-3", EventType: "TestEvent", ActorID: "actor-1", Version: 3, Data: map[string]interface{}{}, Timestamp: time.Now(), } err = store.SaveEvent(event3) require.NoError(t, err) // Verify all events can be retrieved events, err := store.GetEvents("actor-1", 0) require.NoError(t, err) assert.Len(t, events, 3) // Verify latest version latest, err := store.GetLatestVersion("actor-1") require.NoError(t, err) assert.Equal(t, int64(3), latest) // Manually update cache with version 5 (simulating external update) store.UpdateVersionCache("actor-1", 5) // Verify version 4 would conflict (4 < cached 5) event4 := &aether.Event{ ID: "event-4", EventType: "TestEvent", ActorID: "actor-1", Version: 4, Data: map[string]interface{}{}, Timestamp: time.Now(), } err = store.SaveEvent(event4) assert.Error(t, err, "version 4 should conflict with cached version 5") // Version 6 should succeed (6 > 5) event6 := &aether.Event{ ID: "event-6", EventType: "TestEvent", ActorID: "actor-1", Version: 6, Data: map[string]interface{}{}, Timestamp: time.Now(), } err = store.SaveEvent(event6) require.NoError(t, err) // Verify version 6 was saved latest, err = store.GetLatestVersion("actor-1") require.NoError(t, err) assert.Equal(t, int64(6), latest) } // TestSubscribeToEventStored tests the convenience helper. func TestSubscribeToEventStored(t *testing.T) { nc, err := nats.Connect(nats.DefaultURL) require.NoError(t, err) defer nc.Close() natsBus, err := aether.NewNATSEventBus(nc) require.NoError(t, err) defer natsBus.Stop() // Use helper eventStoredCh := natsBus.SubscribeToEventStored("test-store") // Verify channel is created assert.NotNil(t, eventStoredCh) // Publish EventStored manually (version will be float64 from JSON) eventStored := &aether.Event{ ID: "stored-1", EventType: aether.EventTypeEventStored, ActorID: "actor-1", Version: 1, Data: map[string]interface{}{ "actorId": "actor-1", "version": 1.0, // Use float64 to match JSON encoding }, Timestamp: time.Now(), } natsBus.Publish("test-store", eventStored) // Should receive the EventStored ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() select { case received := <-eventStoredCh: assert.Equal(t, aether.EventTypeEventStored, received.EventType) assert.Equal(t, "actor-1", received.ActorID) log.Printf("Received EventStored: %s", received.ID) case <-ctx.Done(): t.Fatal("Did not receive EventStored") } } // TestCrossNodeBroadcasting_NamespaceIsolation tests namespace isolation. func TestCrossNodeBroadcasting_NamespaceIsolation(t *testing.T) { nc, err := nats.Connect(nats.DefaultURL) require.NoError(t, err) defer nc.Close() streamName := generateStreamName("namespace_isolation") cleanupStream(nc, streamName) // Create stores with different namespaces storeA, err := NewJetStreamEventStoreWithBroadcaster( nc, streamName, nil, "tenant-a", ) require.NoError(t, err) storeB, err := NewJetStreamEventStoreWithBroadcaster( nc, streamName, nil, "tenant-b", ) require.NoError(t, err) // Save to each namespace eventA := &aether.Event{ ID: "event-a", EventType: "TestEvent", ActorID: "actor-a", Version: 1, Data: map[string]interface{}{"tenant": "a"}, Timestamp: time.Now(), } err = storeA.SaveEvent(eventA) require.NoError(t, err) eventB := &aether.Event{ ID: "event-b", EventType: "TestEvent", ActorID: "actor-b", Version: 1, Data: map[string]interface{}{"tenant": "b"}, Timestamp: time.Now(), } err = storeB.SaveEvent(eventB) require.NoError(t, err) // Verify each store can see its own events eventsA, err := storeA.GetEvents("actor-a", 0) require.NoError(t, err) assert.Len(t, eventsA, 1) assert.Equal(t, "a", eventsA[0].Data["tenant"]) eventsB, err := storeB.GetEvents("actor-b", 0) require.NoError(t, err) assert.Len(t, eventsB, 1) assert.Equal(t, "b", eventsB[0].Data["tenant"]) }