diff --git a/nats_eventbus_integration_test.go b/nats_eventbus_integration_test.go new file mode 100644 index 0000000..55434eb --- /dev/null +++ b/nats_eventbus_integration_test.go @@ -0,0 +1,1205 @@ +// +build integration + +package aether + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/nats-io/nats.go" +) + +// These integration tests require a running NATS server with JetStream enabled. +// Run with: go test -tags=integration -v ./... +// +// To start NATS with JetStream: nats-server -js + +// getNATSConnection creates a new NATS connection for testing. +// Returns nil if NATS is not available, allowing tests to skip gracefully. +func getNATSConnection(t *testing.T) *nats.Conn { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + t.Skipf("NATS not available: %v (run 'nats-server -js' to enable integration tests)", err) + return nil + } + return nc +} + +// TestNATSEventBus_CrossNodeEventDelivery tests that events are delivered across +// simulated nodes (multiple NATSEventBus instances sharing the same NATS connection). +func TestNATSEventBus_CrossNodeEventDelivery(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + // Create two "nodes" (separate NATSEventBus instances) + node1, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create node1: %v", err) + } + defer node1.Stop() + + node2, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create node2: %v", err) + } + defer node2.Stop() + + namespace := fmt.Sprintf("cross-node-test-%d", time.Now().UnixNano()) + + // Subscribe on node2 before publishing from node1 + ch2 := node2.Subscribe(namespace) + defer node2.Unsubscribe(namespace, ch2) + + // Give NATS time to set up subscription + time.Sleep(100 * time.Millisecond) + + // Publish event from node1 + event := &Event{ + ID: "evt-cross-node", + EventType: "CrossNodeTest", + ActorID: "actor-123", + Version: 1, + Data: map[string]interface{}{"source": "node1"}, + Timestamp: time.Now(), + } + node1.Publish(namespace, event) + + // Wait for event on node2 + select { + case received := <-ch2: + if received.ID != event.ID { + t.Errorf("event ID mismatch: got %q, want %q", received.ID, event.ID) + } + if received.EventType != event.EventType { + t.Errorf("event type mismatch: got %q, want %q", received.EventType, event.EventType) + } + if received.ActorID != event.ActorID { + t.Errorf("actor ID mismatch: got %q, want %q", received.ActorID, event.ActorID) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for cross-node event delivery") + } +} + +// TestNATSEventBus_NamespaceIsolation tests that events in one namespace +// are not received by subscribers in other namespaces. +func TestNATSEventBus_NamespaceIsolation(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + uniqueID := time.Now().UnixNano() + namespace1 := fmt.Sprintf("namespace-a-%d", uniqueID) + namespace2 := fmt.Sprintf("namespace-b-%d", uniqueID) + + // Subscribe to both namespaces + ch1 := bus.Subscribe(namespace1) + ch2 := bus.Subscribe(namespace2) + defer bus.Unsubscribe(namespace1, ch1) + defer bus.Unsubscribe(namespace2, ch2) + + // Give NATS time to set up subscriptions + time.Sleep(100 * time.Millisecond) + + // Publish event to namespace1 + event1 := &Event{ + ID: "evt-ns1", + EventType: "Namespace1Event", + ActorID: "actor-ns1", + Version: 1, + Data: map[string]interface{}{"namespace": "1"}, + Timestamp: time.Now(), + } + bus.Publish(namespace1, event1) + + // Publish event to namespace2 + event2 := &Event{ + ID: "evt-ns2", + EventType: "Namespace2Event", + ActorID: "actor-ns2", + Version: 1, + Data: map[string]interface{}{"namespace": "2"}, + Timestamp: time.Now(), + } + bus.Publish(namespace2, event2) + + // Collect events with timeout + received1 := make([]*Event, 0) + received2 := make([]*Event, 0) + + timeout := time.After(2 * time.Second) + for { + select { + case e := <-ch1: + received1 = append(received1, e) + case e := <-ch2: + received2 = append(received2, e) + case <-timeout: + goto done + } + } +done: + + // Verify namespace1 only received namespace1 event + if len(received1) != 1 { + t.Errorf("namespace1 expected 1 event, got %d", len(received1)) + } else if received1[0].ID != "evt-ns1" { + t.Errorf("namespace1 received wrong event: %s", received1[0].ID) + } + + // Verify namespace2 only received namespace2 event + if len(received2) != 1 { + t.Errorf("namespace2 expected 1 event, got %d", len(received2)) + } else if received2[0].ID != "evt-ns2" { + t.Errorf("namespace2 received wrong event: %s", received2[0].ID) + } +} + +// TestNATSEventBus_MultipleConnectionsNamespaceIsolation tests namespace isolation +// when using separate NATS connections (more realistic distributed scenario). +func TestNATSEventBus_MultipleConnectionsNamespaceIsolation(t *testing.T) { + // Create separate NATS connections + nc1, err := nats.Connect(nats.DefaultURL) + if err != nil { + t.Skipf("NATS not available: %v", err) + } + defer nc1.Close() + + nc2, err := nats.Connect(nats.DefaultURL) + if err != nil { + t.Fatalf("failed to create second connection: %v", err) + } + defer nc2.Close() + + // Create buses on different connections + bus1, err := NewNATSEventBus(nc1) + if err != nil { + t.Fatalf("failed to create bus1: %v", err) + } + defer bus1.Stop() + + bus2, err := NewNATSEventBus(nc2) + if err != nil { + t.Fatalf("failed to create bus2: %v", err) + } + defer bus2.Stop() + + uniqueID := time.Now().UnixNano() + namespaceA := fmt.Sprintf("tenant-a-%d", uniqueID) + namespaceB := fmt.Sprintf("tenant-b-%d", uniqueID) + + // bus1 subscribes to namespaceA + chA := bus1.Subscribe(namespaceA) + defer bus1.Unsubscribe(namespaceA, chA) + + // bus2 subscribes to namespaceB + chB := bus2.Subscribe(namespaceB) + defer bus2.Unsubscribe(namespaceB, chB) + + time.Sleep(100 * time.Millisecond) + + // Publish events + eventA := &Event{ + ID: "evt-tenant-a", + EventType: "TenantAEvent", + ActorID: "actor-a", + Version: 1, + Data: map[string]interface{}{"tenant": "a"}, + Timestamp: time.Now(), + } + bus1.Publish(namespaceA, eventA) + + eventB := &Event{ + ID: "evt-tenant-b", + EventType: "TenantBEvent", + ActorID: "actor-b", + Version: 1, + Data: map[string]interface{}{"tenant": "b"}, + Timestamp: time.Now(), + } + bus2.Publish(namespaceB, eventB) + + // Verify isolation + receivedA := false + receivedB := false + crossTalk := false + + timeout := time.After(2 * time.Second) +loop: + for { + select { + case e := <-chA: + if e.ID == "evt-tenant-a" { + receivedA = true + } else if e.ID == "evt-tenant-b" { + crossTalk = true + } + case e := <-chB: + if e.ID == "evt-tenant-b" { + receivedB = true + } else if e.ID == "evt-tenant-a" { + crossTalk = true + } + case <-timeout: + break loop + } + } + + if !receivedA { + t.Error("namespaceA did not receive its event") + } + if !receivedB { + t.Error("namespaceB did not receive its event") + } + if crossTalk { + t.Error("cross-namespace leakage detected!") + } +} + +// TestNATSEventBus_HighThroughput tests handling of many events in rapid succession. +func TestNATSEventBus_HighThroughput(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("high-throughput-%d", time.Now().UnixNano()) + numEvents := 1000 + + ch := bus.Subscribe(namespace) + defer bus.Unsubscribe(namespace, ch) + + time.Sleep(100 * time.Millisecond) + + // Publish many events rapidly + start := time.Now() + for i := 0; i < numEvents; i++ { + event := &Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "HighThroughputEvent", + ActorID: "actor-throughput", + Version: int64(i + 1), + Data: map[string]interface{}{"index": i}, + Timestamp: time.Now(), + } + bus.Publish(namespace, event) + } + publishDuration := time.Since(start) + + // Receive events with timeout + receivedCount := 0 + receiveStart := time.Now() + timeout := time.After(30 * time.Second) + +loop: + for receivedCount < numEvents { + select { + case <-ch: + receivedCount++ + case <-timeout: + break loop + } + } + receiveDuration := time.Since(receiveStart) + + t.Logf("Published %d events in %v (%.0f events/sec)", numEvents, publishDuration, float64(numEvents)/publishDuration.Seconds()) + t.Logf("Received %d events in %v (%.0f events/sec)", receivedCount, receiveDuration, float64(receivedCount)/receiveDuration.Seconds()) + + if receivedCount != numEvents { + t.Errorf("expected %d events, received %d", numEvents, receivedCount) + } +} + +// TestNATSEventBus_EventOrdering tests that events are received in the order they were published. +func TestNATSEventBus_EventOrdering(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("ordering-%d", time.Now().UnixNano()) + numEvents := 100 + + ch := bus.Subscribe(namespace) + defer bus.Unsubscribe(namespace, ch) + + time.Sleep(100 * time.Millisecond) + + // Publish events with sequence numbers + for i := 0; i < numEvents; i++ { + event := &Event{ + ID: fmt.Sprintf("evt-%d", i), + EventType: "OrderingTest", + ActorID: "actor-ordering", + Version: int64(i + 1), + Data: map[string]interface{}{"sequence": i}, + Timestamp: time.Now(), + } + bus.Publish(namespace, event) + } + + // Receive and verify ordering + received := make([]*Event, 0, numEvents) + timeout := time.After(10 * time.Second) + +loop: + for len(received) < numEvents { + select { + case e := <-ch: + received = append(received, e) + case <-timeout: + break loop + } + } + + if len(received) != numEvents { + t.Fatalf("expected %d events, got %d", numEvents, len(received)) + } + + // Verify ordering + for i, e := range received { + expectedSeq := i + actualSeq, ok := e.Data["sequence"].(float64) // JSON numbers decode as float64 + if !ok { + t.Errorf("event %d: sequence not found or wrong type", i) + continue + } + if int(actualSeq) != expectedSeq { + t.Errorf("event %d: sequence mismatch, got %d, want %d", i, int(actualSeq), expectedSeq) + } + } +} + +// TestNATSEventBus_NoCrossNamespaceLeakage performs explicit cross-namespace leakage testing. +func TestNATSEventBus_NoCrossNamespaceLeakage(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + uniqueID := time.Now().UnixNano() + namespaces := []string{ + fmt.Sprintf("ns-alpha-%d", uniqueID), + fmt.Sprintf("ns-beta-%d", uniqueID), + fmt.Sprintf("ns-gamma-%d", uniqueID), + } + + // Subscribe to all namespaces + channels := make(map[string]<-chan *Event) + for _, ns := range namespaces { + ch := bus.Subscribe(ns) + channels[ns] = ch + defer bus.Unsubscribe(ns, ch) + } + + time.Sleep(150 * time.Millisecond) + + // Publish unique events to each namespace + for i, ns := range namespaces { + event := &Event{ + ID: fmt.Sprintf("evt-%s-%d", ns, i), + EventType: "LeakageTest", + ActorID: fmt.Sprintf("actor-%s", ns), + Version: 1, + Data: map[string]interface{}{"namespace": ns}, + Timestamp: time.Now(), + } + bus.Publish(ns, event) + } + + // Track received events per namespace + receivedEvents := make(map[string][]*Event) + for _, ns := range namespaces { + receivedEvents[ns] = make([]*Event, 0) + } + + // Collect events with timeout + timeout := time.After(3 * time.Second) +collecting: + for { + select { + case e := <-channels[namespaces[0]]: + receivedEvents[namespaces[0]] = append(receivedEvents[namespaces[0]], e) + case e := <-channels[namespaces[1]]: + receivedEvents[namespaces[1]] = append(receivedEvents[namespaces[1]], e) + case e := <-channels[namespaces[2]]: + receivedEvents[namespaces[2]] = append(receivedEvents[namespaces[2]], e) + case <-timeout: + break collecting + } + } + + // Verify each namespace only received its own events + for _, ns := range namespaces { + events := receivedEvents[ns] + if len(events) != 1 { + t.Errorf("namespace %s: expected 1 event, got %d", ns, len(events)) + continue + } + + expectedNS, ok := events[0].Data["namespace"].(string) + if !ok { + t.Errorf("namespace %s: could not read event namespace", ns) + continue + } + if expectedNS != ns { + t.Errorf("namespace %s: received event from namespace %s (LEAKAGE!)", ns, expectedNS) + } + } +} + +// TestNATSEventBus_ConcurrentPublishSubscribe tests concurrent publish and subscribe operations. +func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("concurrent-%d", time.Now().UnixNano()) + numPublishers := 10 + numEventsPerPublisher := 50 + totalExpected := numPublishers * numEventsPerPublisher + + ch := bus.Subscribe(namespace) + defer bus.Unsubscribe(namespace, ch) + + time.Sleep(100 * time.Millisecond) + + var wg sync.WaitGroup + wg.Add(numPublishers) + + // Start concurrent publishers + for p := 0; p < numPublishers; p++ { + go func(publisherID int) { + defer wg.Done() + for i := 0; i < numEventsPerPublisher; i++ { + event := &Event{ + ID: fmt.Sprintf("evt-%d-%d", publisherID, i), + EventType: "ConcurrentEvent", + ActorID: fmt.Sprintf("actor-%d", publisherID), + Version: int64(i + 1), + Data: map[string]interface{}{"publisher": publisherID, "index": i}, + Timestamp: time.Now(), + } + bus.Publish(namespace, event) + } + }(p) + } + + // Wait for all publishers to finish + wg.Wait() + + // Count received events + receivedCount := 0 + timeout := time.After(10 * time.Second) + +loop: + for receivedCount < totalExpected { + select { + case <-ch: + receivedCount++ + case <-timeout: + break loop + } + } + + if receivedCount != totalExpected { + t.Errorf("expected %d events, received %d", totalExpected, receivedCount) + } +} + +// TestNATSEventBus_MultipleSubscribersSameNamespace tests that multiple subscribers +// to the same namespace all receive the same events. +func TestNATSEventBus_MultipleSubscribersSameNamespace(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("multi-sub-%d", time.Now().UnixNano()) + numSubscribers := 5 + numEvents := 20 + + // Create multiple subscribers + channels := make([]<-chan *Event, numSubscribers) + for i := 0; i < numSubscribers; i++ { + ch := bus.Subscribe(namespace) + channels[i] = ch + defer bus.Unsubscribe(namespace, ch) + } + + time.Sleep(100 * time.Millisecond) + + // Publish events + for i := 0; i < numEvents; i++ { + event := &Event{ + ID: fmt.Sprintf("evt-multi-%d", i), + EventType: "MultiSubscriberEvent", + ActorID: "actor-multi", + Version: int64(i + 1), + Data: map[string]interface{}{"index": i}, + Timestamp: time.Now(), + } + bus.Publish(namespace, event) + } + + // Count events received by each subscriber + counts := make([]int32, numSubscribers) + var wg sync.WaitGroup + wg.Add(numSubscribers) + + for i := 0; i < numSubscribers; i++ { + go func(idx int) { + defer wg.Done() + timeout := time.After(5 * time.Second) + for { + select { + case <-channels[idx]: + atomic.AddInt32(&counts[idx], 1) + if atomic.LoadInt32(&counts[idx]) >= int32(numEvents) { + return + } + case <-timeout: + return + } + } + }(i) + } + + wg.Wait() + + // Verify all subscribers received all events + for i, count := range counts { + if count != int32(numEvents) { + t.Errorf("subscriber %d: expected %d events, got %d", i, numEvents, count) + } + } +} + +// TestNATSEventBus_EventMetadataPreserved tests that event metadata is preserved +// across NATS serialization/deserialization. +func TestNATSEventBus_EventMetadataPreserved(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("metadata-%d", time.Now().UnixNano()) + + ch := bus.Subscribe(namespace) + defer bus.Unsubscribe(namespace, ch) + + time.Sleep(100 * time.Millisecond) + + // Create event with metadata + event := &Event{ + ID: "evt-with-metadata", + EventType: "MetadataTest", + ActorID: "actor-metadata", + Version: 1, + Data: map[string]interface{}{"test": "data"}, + Timestamp: time.Now(), + } + event.SetCorrelationID("corr-123") + event.SetCausationID("cause-456") + event.SetUserID("user-789") + event.SetTraceID("trace-abc") + event.SetSpanID("span-def") + event.SetMetadata("customKey", "customValue") + + bus.Publish(namespace, event) + + // Receive and verify metadata + select { + case received := <-ch: + if received.GetCorrelationID() != "corr-123" { + t.Errorf("correlationId mismatch: got %q", received.GetCorrelationID()) + } + if received.GetCausationID() != "cause-456" { + t.Errorf("causationId mismatch: got %q", received.GetCausationID()) + } + if received.GetUserID() != "user-789" { + t.Errorf("userId mismatch: got %q", received.GetUserID()) + } + if received.GetTraceID() != "trace-abc" { + t.Errorf("traceId mismatch: got %q", received.GetTraceID()) + } + if received.GetSpanID() != "span-def" { + t.Errorf("spanId mismatch: got %q", received.GetSpanID()) + } + if received.GetMetadata("customKey") != "customValue" { + t.Errorf("customKey mismatch: got %q", received.GetMetadata("customKey")) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for event") + } +} + +// TestNATSEventBus_LargeEventPayload tests handling of events with large data payloads. +func TestNATSEventBus_LargeEventPayload(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("large-payload-%d", time.Now().UnixNano()) + + ch := bus.Subscribe(namespace) + defer bus.Unsubscribe(namespace, ch) + + time.Sleep(100 * time.Millisecond) + + // Create a large payload (100KB) + largeString := make([]byte, 100*1024) + for i := range largeString { + largeString[i] = byte('a' + (i % 26)) + } + + event := &Event{ + ID: "evt-large", + EventType: "LargePayloadTest", + ActorID: "actor-large", + Version: 1, + Data: map[string]interface{}{"largeField": string(largeString)}, + Timestamp: time.Now(), + } + + bus.Publish(namespace, event) + + select { + case received := <-ch: + receivedPayload, ok := received.Data["largeField"].(string) + if !ok { + t.Fatal("largeField not found or wrong type") + } + if len(receivedPayload) != len(largeString) { + t.Errorf("payload size mismatch: got %d, want %d", len(receivedPayload), len(largeString)) + } + if receivedPayload != string(largeString) { + t.Error("payload content mismatch") + } + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for large event") + } +} + +// TestNATSEventBus_SubscribeUnsubscribe tests subscribe/unsubscribe lifecycle. +func TestNATSEventBus_SubscribeUnsubscribe(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("sub-unsub-%d", time.Now().UnixNano()) + + // Subscribe + ch := bus.Subscribe(namespace) + time.Sleep(100 * time.Millisecond) + + // Publish event - should be received + event1 := &Event{ + ID: "evt-before-unsub", + EventType: "SubUnsubTest", + ActorID: "actor-test", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace, event1) + + select { + case <-ch: + // Good, received event + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for event before unsubscribe") + } + + // Unsubscribe + bus.Unsubscribe(namespace, ch) + + // Re-subscribe with new channel + ch2 := bus.Subscribe(namespace) + defer bus.Unsubscribe(namespace, ch2) + time.Sleep(100 * time.Millisecond) + + // Publish another event - should be received on new channel + event2 := &Event{ + ID: "evt-after-resub", + EventType: "SubUnsubTest", + ActorID: "actor-test", + Version: 2, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace, event2) + + select { + case e := <-ch2: + if e.ID != "evt-after-resub" { + t.Errorf("wrong event received: %s", e.ID) + } + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for event after resubscribe") + } +} + +// TestNATSEventBus_MultipleNodesMultipleNamespaces tests complex scenario with +// multiple nodes and multiple namespaces. +func TestNATSEventBus_MultipleNodesMultipleNamespaces(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + // Create multiple nodes + numNodes := 3 + nodes := make([]*NATSEventBus, numNodes) + for i := 0; i < numNodes; i++ { + node, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create node %d: %v", i, err) + } + nodes[i] = node + defer node.Stop() + } + + uniqueID := time.Now().UnixNano() + namespaces := []string{ + fmt.Sprintf("ns-multi-1-%d", uniqueID), + fmt.Sprintf("ns-multi-2-%d", uniqueID), + } + + // Each node subscribes to both namespaces + type subscription struct { + nodeIdx int + ns string + ch <-chan *Event + } + subs := make([]subscription, 0) + for i, node := range nodes { + for _, ns := range namespaces { + ch := node.Subscribe(ns) + subs = append(subs, subscription{nodeIdx: i, ns: ns, ch: ch}) + defer node.Unsubscribe(ns, ch) + } + } + + time.Sleep(150 * time.Millisecond) + + // Node 0 publishes to namespace 1 + event1 := &Event{ + ID: "evt-n0-ns1", + EventType: "MultiNodeMultiNS", + ActorID: "actor-test", + Version: 1, + Data: map[string]interface{}{"source": "node0", "namespace": namespaces[0]}, + Timestamp: time.Now(), + } + nodes[0].Publish(namespaces[0], event1) + + // Node 1 publishes to namespace 2 + event2 := &Event{ + ID: "evt-n1-ns2", + EventType: "MultiNodeMultiNS", + ActorID: "actor-test", + Version: 1, + Data: map[string]interface{}{"source": "node1", "namespace": namespaces[1]}, + Timestamp: time.Now(), + } + nodes[1].Publish(namespaces[1], event2) + + // Collect events + receivedEvents := make(map[string]map[int][]*Event) // namespace -> nodeIdx -> events + for _, ns := range namespaces { + receivedEvents[ns] = make(map[int][]*Event) + for i := 0; i < numNodes; i++ { + receivedEvents[ns][i] = make([]*Event, 0) + } + } + + timeout := time.After(5 * time.Second) +collecting: + for { + select { + case e := <-subs[0].ch: // node 0, ns 0 + receivedEvents[namespaces[0]][0] = append(receivedEvents[namespaces[0]][0], e) + case e := <-subs[1].ch: // node 0, ns 1 + receivedEvents[namespaces[1]][0] = append(receivedEvents[namespaces[1]][0], e) + case e := <-subs[2].ch: // node 1, ns 0 + receivedEvents[namespaces[0]][1] = append(receivedEvents[namespaces[0]][1], e) + case e := <-subs[3].ch: // node 1, ns 1 + receivedEvents[namespaces[1]][1] = append(receivedEvents[namespaces[1]][1], e) + case e := <-subs[4].ch: // node 2, ns 0 + receivedEvents[namespaces[0]][2] = append(receivedEvents[namespaces[0]][2], e) + case e := <-subs[5].ch: // node 2, ns 1 + receivedEvents[namespaces[1]][2] = append(receivedEvents[namespaces[1]][2], e) + case <-timeout: + break collecting + } + } + + // Verify: namespace 0 should have event from node 0 on all nodes (except node 0 which receives locally) + // Node 0 receives locally, nodes 1 and 2 receive via NATS + for nodeIdx := 0; nodeIdx < numNodes; nodeIdx++ { + events := receivedEvents[namespaces[0]][nodeIdx] + if len(events) != 1 { + t.Errorf("node %d, namespace %s: expected 1 event, got %d", nodeIdx, namespaces[0], len(events)) + } + } + + // Verify: namespace 1 should have event from node 1 on all nodes + for nodeIdx := 0; nodeIdx < numNodes; nodeIdx++ { + events := receivedEvents[namespaces[1]][nodeIdx] + if len(events) != 1 { + t.Errorf("node %d, namespace %s: expected 1 event, got %d", nodeIdx, namespaces[1], len(events)) + } + } +} + +// TestNATSEventBus_StopCleansUp tests that Stop properly cleans up resources. +func TestNATSEventBus_StopCleansUp(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + + namespace := fmt.Sprintf("stop-cleanup-%d", time.Now().UnixNano()) + + // Subscribe + ch := bus.Subscribe(namespace) + time.Sleep(100 * time.Millisecond) + + // Stop the bus + bus.Stop() + + // Verify channel is closed + select { + case _, ok := <-ch: + if ok { + t.Error("expected channel to be closed after Stop") + } + case <-time.After(1 * time.Second): + t.Error("channel was not closed after Stop") + } +} + +// BenchmarkNATSEventBus_Publish benchmarks event publishing throughput. +func BenchmarkNATSEventBus_Publish(b *testing.B) { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + b.Skipf("NATS not available: %v", err) + } + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + b.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("bench-%d", time.Now().UnixNano()) + event := &Event{ + ID: "evt-bench", + EventType: "BenchmarkEvent", + ActorID: "actor-bench", + Version: 1, + Data: map[string]interface{}{"key": "value"}, + Timestamp: time.Now(), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + bus.Publish(namespace, event) + } +} + +// BenchmarkNATSEventBus_PublishReceive benchmarks end-to-end event delivery. +func BenchmarkNATSEventBus_PublishReceive(b *testing.B) { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + b.Skipf("NATS not available: %v", err) + } + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + b.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("bench-e2e-%d", time.Now().UnixNano()) + ch := bus.Subscribe(namespace) + defer bus.Unsubscribe(namespace, ch) + + time.Sleep(100 * time.Millisecond) + + event := &Event{ + ID: "evt-bench", + EventType: "BenchmarkEvent", + ActorID: "actor-bench", + Version: 1, + Data: map[string]interface{}{"key": "value"}, + Timestamp: time.Now(), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + bus.Publish(namespace, event) + <-ch + } +} + +// TestNATSEventBus_ReconnectionBehavior tests that subscriptions continue to work +// after a NATS reconnection scenario. This test simulates the reconnection by +// creating a new connection and verifying the bus handles it gracefully. +// Note: True reconnection testing requires a NATS server restart which is complex +// in automated tests. This test verifies the basic resilience patterns. +func TestNATSEventBus_ReconnectionBehavior(t *testing.T) { + // Create initial connection + nc1, err := nats.Connect(nats.DefaultURL, + nats.ReconnectWait(100*time.Millisecond), + nats.MaxReconnects(5), + ) + if err != nil { + t.Skipf("NATS not available: %v", err) + } + defer nc1.Close() + + bus, err := NewNATSEventBus(nc1) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("reconnect-test-%d", time.Now().UnixNano()) + + ch := bus.Subscribe(namespace) + defer bus.Unsubscribe(namespace, ch) + + time.Sleep(100 * time.Millisecond) + + // Publish event before any issues + event1 := &Event{ + ID: "evt-before", + EventType: "ReconnectTest", + ActorID: "actor-test", + Version: 1, + Data: map[string]interface{}{"phase": "before"}, + Timestamp: time.Now(), + } + bus.Publish(namespace, event1) + + // Verify event received + select { + case e := <-ch: + if e.ID != "evt-before" { + t.Errorf("unexpected event ID: %s", e.ID) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for event before reconnect test") + } + + // Simulate some delay (in real reconnect scenario, there would be a disconnect/reconnect) + time.Sleep(200 * time.Millisecond) + + // Publish event after delay - should still work + event2 := &Event{ + ID: "evt-after", + EventType: "ReconnectTest", + ActorID: "actor-test", + Version: 2, + Data: map[string]interface{}{"phase": "after"}, + Timestamp: time.Now(), + } + bus.Publish(namespace, event2) + + // Verify event received + select { + case e := <-ch: + if e.ID != "evt-after" { + t.Errorf("unexpected event ID: %s", e.ID) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for event after reconnect test") + } +} + +// TestNATSEventBus_ConnectionRecoveryWithNewSubscription tests that new subscriptions +// can be created and used after the bus has been running for a while. +func TestNATSEventBus_ConnectionRecoveryWithNewSubscription(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + uniqueID := time.Now().UnixNano() + namespace1 := fmt.Sprintf("recovery-ns1-%d", uniqueID) + namespace2 := fmt.Sprintf("recovery-ns2-%d", uniqueID) + + // First subscription + ch1 := bus.Subscribe(namespace1) + defer bus.Unsubscribe(namespace1, ch1) + + time.Sleep(100 * time.Millisecond) + + // Publish and receive on first namespace + event1 := &Event{ + ID: "evt-ns1", + EventType: "RecoveryTest", + ActorID: "actor-test", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace1, event1) + + select { + case <-ch1: + // Good + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for first event") + } + + // Wait a bit, then create new subscription (simulating late join) + time.Sleep(500 * time.Millisecond) + + ch2 := bus.Subscribe(namespace2) + defer bus.Unsubscribe(namespace2, ch2) + + time.Sleep(100 * time.Millisecond) + + // Publish and receive on second namespace + event2 := &Event{ + ID: "evt-ns2", + EventType: "RecoveryTest", + ActorID: "actor-test", + Version: 1, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace2, event2) + + select { + case e := <-ch2: + if e.ID != "evt-ns2" { + t.Errorf("wrong event: %s", e.ID) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for second namespace event") + } + + // Verify first namespace still works + event3 := &Event{ + ID: "evt-ns1-again", + EventType: "RecoveryTest", + ActorID: "actor-test", + Version: 2, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace1, event3) + + select { + case e := <-ch1: + if e.ID != "evt-ns1-again" { + t.Errorf("wrong event: %s", e.ID) + } + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for third event") + } +} + +// TestNATSEventBus_GracefulDegradation tests that the bus handles publish errors gracefully. +func TestNATSEventBus_GracefulDegradation(t *testing.T) { + nc := getNATSConnection(t) + defer nc.Close() + + bus, err := NewNATSEventBus(nc) + if err != nil { + t.Fatalf("failed to create bus: %v", err) + } + defer bus.Stop() + + namespace := fmt.Sprintf("graceful-%d", time.Now().UnixNano()) + + ch := bus.Subscribe(namespace) + defer bus.Unsubscribe(namespace, ch) + + time.Sleep(100 * time.Millisecond) + + // Publish many events rapidly - should handle without panic + for i := 0; i < 100; i++ { + event := &Event{ + ID: fmt.Sprintf("evt-graceful-%d", i), + EventType: "GracefulTest", + ActorID: "actor-test", + Version: int64(i + 1), + Data: map[string]interface{}{"index": i}, + Timestamp: time.Now(), + } + // Should not panic even under load + bus.Publish(namespace, event) + } + + // Drain events + received := 0 + timeout := time.After(10 * time.Second) +draining: + for received < 100 { + select { + case <-ch: + received++ + case <-timeout: + break draining + } + } + + if received < 90 { // Allow some tolerance for timing + t.Errorf("expected at least 90 events, got %d", received) + } +}