[Issue #17] Add unit tests for EventBus #53
697
eventbus_test.go
Normal file
697
eventbus_test.go
Normal file
@@ -0,0 +1,697 @@
|
||||
package aether
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestEventBus_Subscribe(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch := eb.Subscribe("namespace-1")
|
||||
if ch == nil {
|
||||
t.Fatal("expected Subscribe to return a channel")
|
||||
}
|
||||
|
||||
// Verify subscriber count increased
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 1 {
|
||||
t.Errorf("expected 1 subscriber, got %d", count)
|
||||
}
|
||||
|
||||
// Subscribe again to the same namespace
|
||||
ch2 := eb.Subscribe("namespace-1")
|
||||
if ch2 == nil {
|
||||
t.Fatal("expected second Subscribe to return a channel")
|
||||
}
|
||||
if ch == ch2 {
|
||||
t.Error("expected different channels for separate subscriptions")
|
||||
}
|
||||
|
||||
// Verify subscriber count
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 2 {
|
||||
t.Errorf("expected 2 subscribers, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_Subscribe_MultipleNamespaces(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch1 := eb.Subscribe("namespace-a")
|
||||
ch2 := eb.Subscribe("namespace-b")
|
||||
ch3 := eb.Subscribe("namespace-c")
|
||||
|
||||
if ch1 == nil || ch2 == nil || ch3 == nil {
|
||||
t.Fatal("expected all subscriptions to return channels")
|
||||
}
|
||||
|
||||
if eb.SubscriberCount("namespace-a") != 1 {
|
||||
t.Errorf("expected 1 subscriber for namespace-a")
|
||||
}
|
||||
if eb.SubscriberCount("namespace-b") != 1 {
|
||||
t.Errorf("expected 1 subscriber for namespace-b")
|
||||
}
|
||||
if eb.SubscriberCount("namespace-c") != 1 {
|
||||
t.Errorf("expected 1 subscriber for namespace-c")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_Unsubscribe(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch := eb.Subscribe("namespace-1")
|
||||
|
||||
// Verify subscription exists
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 1 {
|
||||
t.Errorf("expected 1 subscriber before unsubscribe, got %d", count)
|
||||
}
|
||||
|
||||
// Unsubscribe
|
||||
eb.Unsubscribe("namespace-1", ch)
|
||||
|
||||
// Verify subscription removed
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 0 {
|
||||
t.Errorf("expected 0 subscribers after unsubscribe, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_Unsubscribe_ChannelClosed(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch := eb.Subscribe("namespace-1")
|
||||
|
||||
// Unsubscribe should close the channel
|
||||
eb.Unsubscribe("namespace-1", ch)
|
||||
|
||||
// Reading from closed channel should return zero value and false
|
||||
select {
|
||||
case _, ok := <-ch:
|
||||
if ok {
|
||||
t.Error("expected channel to be closed after unsubscribe")
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("expected immediate response from closed channel")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_Unsubscribe_NonexistentChannel(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch := eb.Subscribe("namespace-1")
|
||||
otherCh := make(chan *Event)
|
||||
|
||||
// Unsubscribe with a channel that was never subscribed
|
||||
eb.Unsubscribe("namespace-1", otherCh) // Should not panic
|
||||
|
||||
// Original subscription should still be there
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 1 {
|
||||
t.Errorf("expected 1 subscriber, got %d", count)
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
eb.Unsubscribe("namespace-1", ch)
|
||||
}
|
||||
|
||||
func TestEventBus_Unsubscribe_WrongNamespace(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch := eb.Subscribe("namespace-1")
|
||||
|
||||
// Unsubscribe from wrong namespace
|
||||
eb.Unsubscribe("namespace-2", ch) // Should not panic
|
||||
|
||||
// Original subscription should still be there
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 1 {
|
||||
t.Errorf("expected 1 subscriber, got %d", count)
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
eb.Unsubscribe("namespace-1", ch)
|
||||
}
|
||||
|
||||
func TestEventBus_Unsubscribe_MultipleSubscribers(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch1 := eb.Subscribe("namespace-1")
|
||||
ch2 := eb.Subscribe("namespace-1")
|
||||
ch3 := eb.Subscribe("namespace-1")
|
||||
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 3 {
|
||||
t.Errorf("expected 3 subscribers, got %d", count)
|
||||
}
|
||||
|
||||
// Unsubscribe the middle one
|
||||
eb.Unsubscribe("namespace-1", ch2)
|
||||
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 2 {
|
||||
t.Errorf("expected 2 subscribers after removing one, got %d", count)
|
||||
}
|
||||
|
||||
// Unsubscribe the first one
|
||||
eb.Unsubscribe("namespace-1", ch1)
|
||||
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 1 {
|
||||
t.Errorf("expected 1 subscriber after removing two, got %d", count)
|
||||
}
|
||||
|
||||
// Unsubscribe the last one
|
||||
eb.Unsubscribe("namespace-1", ch3)
|
||||
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 0 {
|
||||
t.Errorf("expected 0 subscribers after removing all, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_Publish(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch := eb.Subscribe("namespace-1")
|
||||
|
||||
event := &Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
Data: map[string]interface{}{"key": "value"},
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
eb.Publish("namespace-1", event)
|
||||
|
||||
select {
|
||||
case received := <-ch:
|
||||
if received.ID != event.ID {
|
||||
t.Errorf("expected event ID %q, got %q", event.ID, received.ID)
|
||||
}
|
||||
if received.EventType != event.EventType {
|
||||
t.Errorf("expected event type %q, got %q", event.EventType, received.EventType)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("expected to receive event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_Publish_MultipleEvents(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch := eb.Subscribe("namespace-1")
|
||||
|
||||
events := []*Event{
|
||||
{ID: "evt-1", EventType: "Event1", ActorID: "actor-1", Version: 1},
|
||||
{ID: "evt-2", EventType: "Event2", ActorID: "actor-1", Version: 2},
|
||||
{ID: "evt-3", EventType: "Event3", ActorID: "actor-1", Version: 3},
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
eb.Publish("namespace-1", event)
|
||||
}
|
||||
|
||||
for i, expected := range events {
|
||||
select {
|
||||
case received := <-ch:
|
||||
if received.ID != expected.ID {
|
||||
t.Errorf("event %d: expected ID %q, got %q", i, expected.ID, received.ID)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Errorf("event %d: timeout waiting for event", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_Publish_NoSubscribers(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
event := &Event{
|
||||
ID: "evt-1",
|
||||
EventType: "TestEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
}
|
||||
|
||||
// Should not panic when publishing to namespace with no subscribers
|
||||
eb.Publish("namespace-1", event)
|
||||
}
|
||||
|
||||
func TestEventBus_NamespaceIsolation(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch1 := eb.Subscribe("namespace-1")
|
||||
ch2 := eb.Subscribe("namespace-2")
|
||||
ch3 := eb.Subscribe("namespace-3")
|
||||
|
||||
// Publish to namespace-2 only
|
||||
event := &Event{
|
||||
ID: "evt-ns2",
|
||||
EventType: "Namespace2Event",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
}
|
||||
eb.Publish("namespace-2", event)
|
||||
|
||||
// namespace-2 should receive the event
|
||||
select {
|
||||
case received := <-ch2:
|
||||
if received.ID != event.ID {
|
||||
t.Errorf("expected event ID %q, got %q", event.ID, received.ID)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("namespace-2 should have received the event")
|
||||
}
|
||||
|
||||
// namespace-1 and namespace-3 should NOT receive the event
|
||||
select {
|
||||
case <-ch1:
|
||||
t.Error("namespace-1 should NOT have received an event")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected: no event received
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ch3:
|
||||
t.Error("namespace-3 should NOT have received an event")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected: no event received
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_NamespaceIsolation_PublishToMultiple(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
chA := eb.Subscribe("tenant-a")
|
||||
chB := eb.Subscribe("tenant-b")
|
||||
|
||||
eventA := &Event{ID: "evt-a", EventType: "EventA", ActorID: "actor-a", Version: 1}
|
||||
eventB := &Event{ID: "evt-b", EventType: "EventB", ActorID: "actor-b", Version: 1}
|
||||
|
||||
eb.Publish("tenant-a", eventA)
|
||||
eb.Publish("tenant-b", eventB)
|
||||
|
||||
// Verify tenant-a received only eventA
|
||||
select {
|
||||
case received := <-chA:
|
||||
if received.ID != "evt-a" {
|
||||
t.Errorf("tenant-a: expected evt-a, got %q", received.ID)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("tenant-a: expected to receive event")
|
||||
}
|
||||
|
||||
// Verify tenant-b received only eventB
|
||||
select {
|
||||
case received := <-chB:
|
||||
if received.ID != "evt-b" {
|
||||
t.Errorf("tenant-b: expected evt-b, got %q", received.ID)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("tenant-b: expected to receive event")
|
||||
}
|
||||
|
||||
// Ensure no cross-talk
|
||||
select {
|
||||
case <-chA:
|
||||
t.Error("tenant-a should not have received a second event")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected
|
||||
}
|
||||
|
||||
select {
|
||||
case <-chB:
|
||||
t.Error("tenant-b should not have received a second event")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_Stop(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
|
||||
ch1 := eb.Subscribe("namespace-1")
|
||||
ch2 := eb.Subscribe("namespace-2")
|
||||
ch3 := eb.Subscribe("namespace-1")
|
||||
|
||||
// Stop the event bus
|
||||
eb.Stop()
|
||||
|
||||
// All channels should be closed
|
||||
checkClosed := func(ch <-chan *Event, name string) {
|
||||
select {
|
||||
case _, ok := <-ch:
|
||||
if ok {
|
||||
t.Errorf("%s: expected channel to be closed", name)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Errorf("%s: expected immediate response from closed channel", name)
|
||||
}
|
||||
}
|
||||
|
||||
checkClosed(ch1, "ch1")
|
||||
checkClosed(ch2, "ch2")
|
||||
checkClosed(ch3, "ch3")
|
||||
|
||||
// Subscriber counts should be zero
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 0 {
|
||||
t.Errorf("expected 0 subscribers for namespace-1, got %d", count)
|
||||
}
|
||||
if count := eb.SubscriberCount("namespace-2"); count != 0 {
|
||||
t.Errorf("expected 0 subscribers for namespace-2, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_Stop_Empty(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
|
||||
// Stop with no subscriptions should not panic
|
||||
eb.Stop()
|
||||
}
|
||||
|
||||
func TestEventBus_MultipleSubscribers(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// Create multiple subscribers in the same namespace
|
||||
const numSubscribers = 5
|
||||
channels := make([]<-chan *Event, numSubscribers)
|
||||
for i := 0; i < numSubscribers; i++ {
|
||||
channels[i] = eb.Subscribe("shared-namespace")
|
||||
}
|
||||
|
||||
if count := eb.SubscriberCount("shared-namespace"); count != numSubscribers {
|
||||
t.Errorf("expected %d subscribers, got %d", numSubscribers, count)
|
||||
}
|
||||
|
||||
// Publish an event
|
||||
event := &Event{
|
||||
ID: "broadcast-evt",
|
||||
EventType: "BroadcastEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
}
|
||||
eb.Publish("shared-namespace", event)
|
||||
|
||||
// All subscribers should receive the event
|
||||
for i, ch := range channels {
|
||||
select {
|
||||
case received := <-ch:
|
||||
if received.ID != event.ID {
|
||||
t.Errorf("subscriber %d: expected ID %q, got %q", i, event.ID, received.ID)
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Errorf("subscriber %d: timeout waiting for event", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_MultipleSubscribers_AllReceive(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch1 := eb.Subscribe("namespace-1")
|
||||
ch2 := eb.Subscribe("namespace-1")
|
||||
ch3 := eb.Subscribe("namespace-1")
|
||||
|
||||
event := &Event{
|
||||
ID: "evt-multi",
|
||||
EventType: "MultiEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: 1,
|
||||
}
|
||||
|
||||
eb.Publish("namespace-1", event)
|
||||
|
||||
// All three should receive the same event
|
||||
received := make([]*Event, 0, 3)
|
||||
for _, ch := range []<-chan *Event{ch1, ch2, ch3} {
|
||||
select {
|
||||
case evt := <-ch:
|
||||
received = append(received, evt)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("expected all subscribers to receive the event")
|
||||
}
|
||||
}
|
||||
|
||||
if len(received) != 3 {
|
||||
t.Errorf("expected 3 events received, got %d", len(received))
|
||||
}
|
||||
|
||||
for i, evt := range received {
|
||||
if evt.ID != event.ID {
|
||||
t.Errorf("subscriber %d: expected ID %q, got %q", i, event.ID, evt.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_ConcurrentSubscribePublish(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
const numGoroutines = 10
|
||||
const numEvents = 100
|
||||
|
||||
var wg sync.WaitGroup
|
||||
errors := make(chan error, numGoroutines*numEvents)
|
||||
|
||||
// Start subscribers
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
namespace := "concurrent-namespace"
|
||||
ch := eb.Subscribe(namespace)
|
||||
defer eb.Unsubscribe(namespace, ch)
|
||||
|
||||
receivedCount := 0
|
||||
timeout := time.After(5 * time.Second)
|
||||
for receivedCount < numEvents {
|
||||
select {
|
||||
case _, ok := <-ch:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
receivedCount++
|
||||
case <-timeout:
|
||||
errors <- nil // Timeout is acceptable in concurrent test
|
||||
return
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Give subscribers time to set up
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// Publish events concurrently
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < numEvents/numGoroutines; j++ {
|
||||
event := &Event{
|
||||
ID: "concurrent-evt",
|
||||
EventType: "ConcurrentEvent",
|
||||
ActorID: "actor-1",
|
||||
Version: int64(id*numEvents + j),
|
||||
}
|
||||
eb.Publish("concurrent-namespace", event)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Check for errors
|
||||
close(errors)
|
||||
for err := range errors {
|
||||
if err != nil {
|
||||
t.Errorf("concurrent error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_ConcurrentSubscribeUnsubscribe(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
const numIterations = 100
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Concurrently subscribe and unsubscribe
|
||||
for i := 0; i < numIterations; i++ {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
namespace := "concurrent-sub-unsub"
|
||||
ch := eb.Subscribe(namespace)
|
||||
// Immediately unsubscribe
|
||||
eb.Unsubscribe(namespace, ch)
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// After all operations, subscriber count should be 0
|
||||
if count := eb.SubscriberCount("concurrent-sub-unsub"); count != 0 {
|
||||
t.Errorf("expected 0 subscribers after all unsubscribes, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_ConcurrentPublish(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch := eb.Subscribe("concurrent-pub")
|
||||
const numPublishers = 10
|
||||
const numEventsPerPublisher = 5 // Keep total under buffer size (100)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start multiple publishers
|
||||
for i := 0; i < numPublishers; i++ {
|
||||
wg.Add(1)
|
||||
go func(pubID int) {
|
||||
defer wg.Done()
|
||||
for j := 0; j < numEventsPerPublisher; j++ {
|
||||
event := &Event{
|
||||
ID: "evt",
|
||||
EventType: "ConcurrentPub",
|
||||
ActorID: "actor-1",
|
||||
Version: int64(pubID*numEventsPerPublisher + j),
|
||||
}
|
||||
eb.Publish("concurrent-pub", event)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Drain the channel and count received events
|
||||
receivedCount := 0
|
||||
timeout := time.After(500 * time.Millisecond)
|
||||
drainLoop:
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
receivedCount++
|
||||
case <-timeout:
|
||||
break drainLoop
|
||||
}
|
||||
}
|
||||
|
||||
expectedTotal := numPublishers * numEventsPerPublisher
|
||||
if receivedCount != expectedTotal {
|
||||
t.Errorf("expected to receive %d events, got %d", expectedTotal, receivedCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscriberCount_EmptyNamespace(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
// Non-existent namespace should return 0
|
||||
if count := eb.SubscriberCount("nonexistent"); count != 0 {
|
||||
t.Errorf("expected 0 subscribers for nonexistent namespace, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_SubscriberCount_AfterUnsubscribeAll(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch1 := eb.Subscribe("namespace-1")
|
||||
ch2 := eb.Subscribe("namespace-1")
|
||||
|
||||
eb.Unsubscribe("namespace-1", ch1)
|
||||
eb.Unsubscribe("namespace-1", ch2)
|
||||
|
||||
// After unsubscribing all, the namespace should be cleaned up
|
||||
if count := eb.SubscriberCount("namespace-1"); count != 0 {
|
||||
t.Errorf("expected 0 subscribers, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_PublishNilEvent(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch := eb.Subscribe("namespace-1")
|
||||
|
||||
// Publishing nil event should work (caller's responsibility to validate)
|
||||
eb.Publish("namespace-1", nil)
|
||||
|
||||
select {
|
||||
case received := <-ch:
|
||||
if received != nil {
|
||||
t.Error("expected to receive nil event")
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("expected to receive event (even nil)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_ChannelBufferOverflow(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
defer eb.Stop()
|
||||
|
||||
ch := eb.Subscribe("namespace-1")
|
||||
|
||||
// The channel has a buffer of 100, publish more events without reading
|
||||
for i := 0; i < 150; i++ {
|
||||
event := &Event{
|
||||
ID: "evt",
|
||||
EventType: "OverflowTest",
|
||||
ActorID: "actor-1",
|
||||
Version: int64(i),
|
||||
}
|
||||
eb.Publish("namespace-1", event) // Should not block or panic
|
||||
}
|
||||
|
||||
// Drain what we can
|
||||
receivedCount := 0
|
||||
timeout := time.After(100 * time.Millisecond)
|
||||
drainLoop:
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
receivedCount++
|
||||
case <-timeout:
|
||||
break drainLoop
|
||||
}
|
||||
}
|
||||
|
||||
// Should have received up to buffer size (100)
|
||||
if receivedCount != 100 {
|
||||
t.Errorf("expected to receive 100 events (buffer size), got %d", receivedCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBus_ImplementsEventBroadcaster(t *testing.T) {
|
||||
var _ EventBroadcaster = (*EventBus)(nil)
|
||||
}
|
||||
|
||||
func TestNewEventBus(t *testing.T) {
|
||||
eb := NewEventBus()
|
||||
if eb == nil {
|
||||
t.Fatal("expected NewEventBus to return non-nil EventBus")
|
||||
}
|
||||
if eb.subscribers == nil {
|
||||
t.Error("expected subscribers map to be initialized")
|
||||
}
|
||||
if eb.ctx == nil {
|
||||
t.Error("expected context to be initialized")
|
||||
}
|
||||
if eb.cancel == nil {
|
||||
t.Error("expected cancel function to be initialized")
|
||||
}
|
||||
eb.Stop()
|
||||
}
|
||||
Reference in New Issue
Block a user