[Issue #57] Fix flaky NATSEventBus integration tests #58
@@ -281,7 +281,25 @@ func TestNATSEventBus_HighThroughput(t *testing.T) {
|
||||
ch := bus.Subscribe(namespace)
|
||||
defer bus.Unsubscribe(namespace, ch)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// Synchronize subscriber readiness using a barrier event.
|
||||
// This ensures the NATS subscription is fully established before bulk publishing.
|
||||
readyEvent := &Event{
|
||||
ID: "ready-signal",
|
||||
EventType: "ReadySignal",
|
||||
ActorID: "actor-throughput",
|
||||
Version: 0,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
bus.Publish(namespace, readyEvent)
|
||||
|
||||
// Wait for the ready signal to be received
|
||||
select {
|
||||
case <-ch:
|
||||
// Subscriber is ready, proceed with bulk publishing
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timeout waiting for subscriber readiness signal")
|
||||
}
|
||||
|
||||
// Publish many events rapidly
|
||||
start := time.Now()
|
||||
@@ -298,10 +316,10 @@ func TestNATSEventBus_HighThroughput(t *testing.T) {
|
||||
}
|
||||
publishDuration := time.Since(start)
|
||||
|
||||
// Receive events with timeout
|
||||
// Receive events with extended timeout for CI environment
|
||||
receivedCount := 0
|
||||
receiveStart := time.Now()
|
||||
timeout := time.After(30 * time.Second)
|
||||
timeout := time.After(60 * time.Second)
|
||||
|
||||
loop:
|
||||
for receivedCount < numEvents {
|
||||
@@ -339,7 +357,23 @@ func TestNATSEventBus_EventOrdering(t *testing.T) {
|
||||
ch := bus.Subscribe(namespace)
|
||||
defer bus.Unsubscribe(namespace, ch)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// Synchronize subscriber readiness with barrier event
|
||||
readyEvent := &Event{
|
||||
ID: "ordering-ready",
|
||||
EventType: "ReadySignal",
|
||||
ActorID: "actor-ordering",
|
||||
Version: 0,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
bus.Publish(namespace, readyEvent)
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
// Subscriber is ready
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timeout waiting for ordering readiness signal")
|
||||
}
|
||||
|
||||
// Publish events with sequence numbers
|
||||
for i := 0; i < numEvents; i++ {
|
||||
@@ -356,7 +390,7 @@ func TestNATSEventBus_EventOrdering(t *testing.T) {
|
||||
|
||||
// Receive and verify ordering
|
||||
received := make([]*Event, 0, numEvents)
|
||||
timeout := time.After(10 * time.Second)
|
||||
timeout := time.After(15 * time.Second)
|
||||
|
||||
loop:
|
||||
for len(received) < numEvents {
|
||||
@@ -487,7 +521,23 @@ func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) {
|
||||
ch := bus.Subscribe(namespace)
|
||||
defer bus.Unsubscribe(namespace, ch)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// Synchronize publisher readiness
|
||||
readyEvent := &Event{
|
||||
ID: "concurrent-ready",
|
||||
EventType: "ReadySignal",
|
||||
ActorID: "coordinator",
|
||||
Version: 0,
|
||||
Data: map[string]interface{}{},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
bus.Publish(namespace, readyEvent)
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
// Subscriber is ready
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timeout waiting for concurrent readiness signal")
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numPublishers)
|
||||
@@ -513,9 +563,9 @@ func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) {
|
||||
// Wait for all publishers to finish
|
||||
wg.Wait()
|
||||
|
||||
// Count received events
|
||||
// Count received events with extended timeout for CI environment
|
||||
receivedCount := 0
|
||||
timeout := time.After(10 * time.Second)
|
||||
timeout := time.After(30 * time.Second)
|
||||
|
||||
loop:
|
||||
for receivedCount < totalExpected {
|
||||
|
||||
Reference in New Issue
Block a user