[Issue #57] Fix flaky NATSEventBus integration tests #58
@@ -281,7 +281,25 @@ func TestNATSEventBus_HighThroughput(t *testing.T) {
|
|||||||
ch := bus.Subscribe(namespace)
|
ch := bus.Subscribe(namespace)
|
||||||
defer bus.Unsubscribe(namespace, ch)
|
defer bus.Unsubscribe(namespace, ch)
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
// Synchronize subscriber readiness using a barrier event.
|
||||||
|
// This ensures the NATS subscription is fully established before bulk publishing.
|
||||||
|
readyEvent := &Event{
|
||||||
|
ID: "ready-signal",
|
||||||
|
EventType: "ReadySignal",
|
||||||
|
ActorID: "actor-throughput",
|
||||||
|
Version: 0,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
bus.Publish(namespace, readyEvent)
|
||||||
|
|
||||||
|
// Wait for the ready signal to be received
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
// Subscriber is ready, proceed with bulk publishing
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("timeout waiting for subscriber readiness signal")
|
||||||
|
}
|
||||||
|
|
||||||
// Publish many events rapidly
|
// Publish many events rapidly
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
@@ -298,10 +316,10 @@ func TestNATSEventBus_HighThroughput(t *testing.T) {
|
|||||||
}
|
}
|
||||||
publishDuration := time.Since(start)
|
publishDuration := time.Since(start)
|
||||||
|
|
||||||
// Receive events with timeout
|
// Receive events with extended timeout for CI environment
|
||||||
receivedCount := 0
|
receivedCount := 0
|
||||||
receiveStart := time.Now()
|
receiveStart := time.Now()
|
||||||
timeout := time.After(30 * time.Second)
|
timeout := time.After(60 * time.Second)
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for receivedCount < numEvents {
|
for receivedCount < numEvents {
|
||||||
@@ -339,7 +357,23 @@ func TestNATSEventBus_EventOrdering(t *testing.T) {
|
|||||||
ch := bus.Subscribe(namespace)
|
ch := bus.Subscribe(namespace)
|
||||||
defer bus.Unsubscribe(namespace, ch)
|
defer bus.Unsubscribe(namespace, ch)
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
// Synchronize subscriber readiness with barrier event
|
||||||
|
readyEvent := &Event{
|
||||||
|
ID: "ordering-ready",
|
||||||
|
EventType: "ReadySignal",
|
||||||
|
ActorID: "actor-ordering",
|
||||||
|
Version: 0,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
bus.Publish(namespace, readyEvent)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
// Subscriber is ready
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("timeout waiting for ordering readiness signal")
|
||||||
|
}
|
||||||
|
|
||||||
// Publish events with sequence numbers
|
// Publish events with sequence numbers
|
||||||
for i := 0; i < numEvents; i++ {
|
for i := 0; i < numEvents; i++ {
|
||||||
@@ -356,7 +390,7 @@ func TestNATSEventBus_EventOrdering(t *testing.T) {
|
|||||||
|
|
||||||
// Receive and verify ordering
|
// Receive and verify ordering
|
||||||
received := make([]*Event, 0, numEvents)
|
received := make([]*Event, 0, numEvents)
|
||||||
timeout := time.After(10 * time.Second)
|
timeout := time.After(15 * time.Second)
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for len(received) < numEvents {
|
for len(received) < numEvents {
|
||||||
@@ -487,7 +521,23 @@ func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) {
|
|||||||
ch := bus.Subscribe(namespace)
|
ch := bus.Subscribe(namespace)
|
||||||
defer bus.Unsubscribe(namespace, ch)
|
defer bus.Unsubscribe(namespace, ch)
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
// Synchronize publisher readiness
|
||||||
|
readyEvent := &Event{
|
||||||
|
ID: "concurrent-ready",
|
||||||
|
EventType: "ReadySignal",
|
||||||
|
ActorID: "coordinator",
|
||||||
|
Version: 0,
|
||||||
|
Data: map[string]interface{}{},
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
bus.Publish(namespace, readyEvent)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
// Subscriber is ready
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("timeout waiting for concurrent readiness signal")
|
||||||
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(numPublishers)
|
wg.Add(numPublishers)
|
||||||
@@ -513,9 +563,9 @@ func TestNATSEventBus_ConcurrentPublishSubscribe(t *testing.T) {
|
|||||||
// Wait for all publishers to finish
|
// Wait for all publishers to finish
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
// Count received events
|
// Count received events with extended timeout for CI environment
|
||||||
receivedCount := 0
|
receivedCount := 0
|
||||||
timeout := time.After(10 * time.Second)
|
timeout := time.After(30 * time.Second)
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for receivedCount < totalExpected {
|
for receivedCount < totalExpected {
|
||||||
|
|||||||
Reference in New Issue
Block a user