diff --git a/nats_eventbus_integration_test.go b/nats_eventbus_integration_test.go index 55434eb..84edf31 100644 --- a/nats_eventbus_integration_test.go +++ b/nats_eventbus_integration_test.go @@ -281,7 +281,25 @@ func TestNATSEventBus_HighThroughput(t *testing.T) { ch := bus.Subscribe(namespace) defer bus.Unsubscribe(namespace, ch) - time.Sleep(100 * time.Millisecond) + // Synchronize subscriber readiness using a barrier event. + // This ensures the NATS subscription is fully established before bulk publishing. + readyEvent := &Event{ + ID: "ready-signal", + EventType: "ReadySignal", + ActorID: "actor-throughput", + Version: 0, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace, readyEvent) + + // Wait for the ready signal to be received + select { + case <-ch: + // Subscriber is ready, proceed with bulk publishing + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for subscriber readiness signal") + } // Publish many events rapidly start := time.Now() @@ -298,10 +316,10 @@ func TestNATSEventBus_HighThroughput(t *testing.T) { } publishDuration := time.Since(start) - // Receive events with timeout + // Receive events with extended timeout for CI environment receivedCount := 0 receiveStart := time.Now() - timeout := time.After(30 * time.Second) + timeout := time.After(60 * time.Second) loop: for receivedCount < numEvents { @@ -339,7 +357,23 @@ func TestNATSEventBus_EventOrdering(t *testing.T) { ch := bus.Subscribe(namespace) defer bus.Unsubscribe(namespace, ch) - time.Sleep(100 * time.Millisecond) + // Synchronize subscriber readiness with barrier event + readyEvent := &Event{ + ID: "ordering-ready", + EventType: "ReadySignal", + ActorID: "actor-ordering", + Version: 0, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace, readyEvent) + + select { + case <-ch: + // Subscriber is ready + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for ordering readiness signal") + } // Publish events with sequence numbers for i := 0; i < numEvents; i++ { @@ -356,7 +390,7 @@ func TestNATSEventBus_EventOrdering(t *testing.T) { // Receive and verify ordering received := make([]*Event, 0, numEvents) - timeout := time.After(10 * time.Second) + timeout := time.After(15 * time.Second) loop: for len(received) < numEvents { @@ -487,7 +521,23 @@ func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) { ch := bus.Subscribe(namespace) defer bus.Unsubscribe(namespace, ch) - time.Sleep(100 * time.Millisecond) + // Synchronize publisher readiness + readyEvent := &Event{ + ID: "concurrent-ready", + EventType: "ReadySignal", + ActorID: "coordinator", + Version: 0, + Data: map[string]interface{}{}, + Timestamp: time.Now(), + } + bus.Publish(namespace, readyEvent) + + select { + case <-ch: + // Subscriber is ready + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for concurrent readiness signal") + } var wg sync.WaitGroup wg.Add(numPublishers) @@ -513,9 +563,9 @@ func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) { // Wait for all publishers to finish wg.Wait() - // Count received events + // Count received events with extended timeout for CI environment receivedCount := 0 - timeout := time.After(10 * time.Second) + timeout := time.After(30 * time.Second) loop: for receivedCount < totalExpected {