From 18ea677585b7a5be544c6b4e2fe3a8b7597f3cf7 Mon Sep 17 00:00:00 2001 From: Hugo Nijhuis Date: Sun, 11 Jan 2026 00:09:44 +0100 Subject: [PATCH] Fix flaky NATSEventBus integration tests The integration tests had timing issues causing intermittent failures on CI: - TestNATSEventBus_HighThroughput: Added subscriber readiness synchronization using a barrier event before bulk publishing. This ensures the NATS subscription is fully established before events are sent rapidly. Extended timeout from 30s to 60s for CI environments. - TestNATSEventBus_EventOrdering: Added readiness barrier event to synchronize subscriber setup before publishing ordered events. Extended timeout from 10s to 15s to account for CI timing variations. - TestNATSEventBus_ConcurrentPublishSubscribe: Added readiness synchronization before concurrent publishers start. Extended timeout from 10s to 30s to handle the increased load under CI constraints. Root causes: - Subscriber channels were not fully ready to receive when bulk publishing started, causing message loss - CI runners (especially ARM64) have different timing characteristics than local development - Insufficient timeouts for high-volume event collection under shared CI resources The fixes use a barrier pattern: publish a ready signal, wait to receive it, then proceed with the test. This is more reliable than fixed sleep durations. Closes #57 Co-Authored-By: Claude Opus 4.5 --- nats_eventbus_integration_test.go | 66 +++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 8 deletions(-) diff --git a/nats_eventbus_integration_test.go b/nats_eventbus_integration_test.go index 55434eb..84edf31 100644 --- a/nats_eventbus_integration_test.go +++ b/nats_eventbus_integration_test.go @@ -281,7 +281,25 @@ func TestNATSEventBus_HighThroughput(t *testing.T) { ch := bus.Subscribe(namespace) defer bus.Unsubscribe(namespace, ch) - time.Sleep(100 * time.Millisecond) + // Synchronize subscriber readiness using a barrier event. + // This ensures the NATS subscription is fully established before bulk publishing. + readyEvent := &Event{ + ID: "ready-signal", + EventType: "ReadySignal", + ActorID: "actor-throughput", + Version: 0, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace, readyEvent) + + // Wait for the ready signal to be received + select { + case <-ch: + // Subscriber is ready, proceed with bulk publishing + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for subscriber readiness signal") + } // Publish many events rapidly start := time.Now() @@ -298,10 +316,10 @@ func TestNATSEventBus_HighThroughput(t *testing.T) { } publishDuration := time.Since(start) - // Receive events with timeout + // Receive events with extended timeout for CI environment receivedCount := 0 receiveStart := time.Now() - timeout := time.After(30 * time.Second) + timeout := time.After(60 * time.Second) loop: for receivedCount < numEvents { @@ -339,7 +357,23 @@ func TestNATSEventBus_EventOrdering(t *testing.T) { ch := bus.Subscribe(namespace) defer bus.Unsubscribe(namespace, ch) - time.Sleep(100 * time.Millisecond) + // Synchronize subscriber readiness with barrier event + readyEvent := &Event{ + ID: "ordering-ready", + EventType: "ReadySignal", + ActorID: "actor-ordering", + Version: 0, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace, readyEvent) + + select { + case <-ch: + // Subscriber is ready + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for ordering readiness signal") + } // Publish events with sequence numbers for i := 0; i < numEvents; i++ { @@ -356,7 +390,7 @@ func TestNATSEventBus_EventOrdering(t *testing.T) { // Receive and verify ordering received := make([]*Event, 0, numEvents) - timeout := time.After(10 * time.Second) + timeout := time.After(15 * time.Second) loop: for len(received) < numEvents { @@ -487,7 +521,23 @@ func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) { ch := bus.Subscribe(namespace) defer bus.Unsubscribe(namespace, ch) - time.Sleep(100 * time.Millisecond) + // Synchronize publisher readiness + readyEvent := &Event{ + ID: "concurrent-ready", + EventType: "ReadySignal", + ActorID: "coordinator", + Version: 0, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace, readyEvent) + + select { + case <-ch: + // Subscriber is ready + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for concurrent readiness signal") + } var wg sync.WaitGroup wg.Add(numPublishers) @@ -513,9 +563,9 @@ func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) { // Wait for all publishers to finish wg.Wait() - // Count received events + // Count received events with extended timeout for CI environment receivedCount := 0 - timeout := time.After(10 * time.Second) + timeout := time.After(30 * time.Second) loop: for receivedCount < totalExpected { -- 2.49.1