diff --git a/nats_eventbus_integration_test.go b/nats_eventbus_integration_test.go index 61528b6..911f257 100644 --- a/nats_eventbus_integration_test.go +++ b/nats_eventbus_integration_test.go @@ -302,6 +302,26 @@ func TestNATSEventBus_HighThroughput(t *testing.T) { t.Fatal("timeout waiting for subscriber readiness signal") } + // Start receiving events BEFORE publishing to avoid buffer overflow. + // The EventBus has a 100-event buffer and uses non-blocking sends, + // so we must consume events concurrently to avoid drops. + var receivedCount int32 + receiveStart := time.Now() + done := make(chan struct{}) + + go func() { + defer close(done) + timeout := time.After(60 * time.Second) + for atomic.LoadInt32(&receivedCount) < int32(numEvents) { + select { + case <-ch: + atomic.AddInt32(&receivedCount, 1) + case <-timeout: + return + } + } + }() + // Publish many events rapidly start := time.Now() for i := 0; i < numEvents; i++ { @@ -317,27 +337,16 @@ func TestNATSEventBus_HighThroughput(t *testing.T) { } publishDuration := time.Since(start) - // Receive events with extended timeout for CI environment - receivedCount := 0 - receiveStart := time.Now() - timeout := time.After(60 * time.Second) - -loop: - for receivedCount < numEvents { - select { - case <-ch: - receivedCount++ - case <-timeout: - break loop - } - } + // Wait for receiver to finish + <-done receiveDuration := time.Since(receiveStart) + finalCount := atomic.LoadInt32(&receivedCount) t.Logf("Published %d events in %v (%.0f events/sec)", numEvents, publishDuration, float64(numEvents)/publishDuration.Seconds()) - t.Logf("Received %d events in %v (%.0f events/sec)", receivedCount, receiveDuration, float64(receivedCount)/receiveDuration.Seconds()) + t.Logf("Received %d events in %v (%.0f events/sec)", finalCount, receiveDuration, float64(finalCount)/receiveDuration.Seconds()) - if receivedCount != numEvents { - t.Errorf("expected %d events, received %d", numEvents, receivedCount) + if finalCount != int32(numEvents) { + t.Errorf("expected %d events, received %d", numEvents, finalCount) } } @@ -410,13 +419,19 @@ loop: // Verify ordering for i, e := range received { expectedSeq := i - actualSeq, ok := e.Data["sequence"].(float64) // JSON numbers decode as float64 - if !ok { - t.Errorf("event %d: sequence not found or wrong type", i) + // Handle both int (local delivery) and float64 (JSON/NATS delivery) types + var actualSeq int + switch v := e.Data["sequence"].(type) { + case int: + actualSeq = v + case float64: + actualSeq = int(v) + default: + t.Errorf("event %d: sequence not found or wrong type: %T", i, e.Data["sequence"]) continue } - if int(actualSeq) != expectedSeq { - t.Errorf("event %d: sequence mismatch, got %d, want %d", i, int(actualSeq), expectedSeq) + if actualSeq != expectedSeq { + t.Errorf("event %d: sequence mismatch, got %d, want %d", i, actualSeq, expectedSeq) } } } @@ -540,6 +555,24 @@ func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) { t.Fatal("timeout waiting for concurrent readiness signal") } + // Start receiving events BEFORE publishing to avoid buffer overflow. + // The EventBus has a 100-event buffer and uses non-blocking sends. + var receivedCount int32 + done := make(chan struct{}) + + go func() { + defer close(done) + timeout := time.After(30 * time.Second) + for atomic.LoadInt32(&receivedCount) < int32(totalExpected) { + select { + case <-ch: + atomic.AddInt32(&receivedCount, 1) + case <-timeout: + return + } + } + }() + var wg sync.WaitGroup wg.Add(numPublishers) @@ -564,22 +597,12 @@ func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) { // Wait for all publishers to finish wg.Wait() - // Count received events with extended timeout for CI environment - receivedCount := 0 - timeout := time.After(30 * time.Second) + // Wait for receiver to finish + <-done + finalCount := atomic.LoadInt32(&receivedCount) -loop: - for receivedCount < totalExpected { - select { - case <-ch: - receivedCount++ - case <-timeout: - break loop - } - } - - if receivedCount != totalExpected { - t.Errorf("expected %d events, received %d", totalExpected, receivedCount) + if finalCount != int32(totalExpected) { + t.Errorf("expected %d events, received %d", totalExpected, finalCount) } }