From d08f5c8fdb5621a168aae42cbc9d88bbe44483a7 Mon Sep 17 00:00:00 2001 From: Hugo Nijhuis Date: Sat, 10 Jan 2026 19:24:34 +0100 Subject: [PATCH] Add comprehensive unit tests for EventBus Test coverage for all EventBus methods: - Subscribe: creates subscriptions, returns unique channels, supports multiple namespaces - Unsubscribe: removes subscriptions, closes channels, handles edge cases - Publish: delivers events to subscribers, handles empty namespaces - Namespace isolation: events only reach subscribers in the same namespace - Stop: closes all channels and cleans up subscribers - Multiple subscribers: all subscribers in a namespace receive events - Concurrent operations: thread-safe subscribe/unsubscribe/publish Closes #17 Co-Authored-By: Claude Opus 4.5 --- eventbus_test.go | 697 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 697 insertions(+) create mode 100644 eventbus_test.go diff --git a/eventbus_test.go b/eventbus_test.go new file mode 100644 index 0000000..854607f --- /dev/null +++ b/eventbus_test.go @@ -0,0 +1,697 @@ +package aether + +import ( + "sync" + "testing" + "time" +) + +func TestEventBus_Subscribe(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("namespace-1") + if ch == nil { + t.Fatal("expected Subscribe to return a channel") + } + + // Verify subscriber count increased + if count := eb.SubscriberCount("namespace-1"); count != 1 { + t.Errorf("expected 1 subscriber, got %d", count) + } + + // Subscribe again to the same namespace + ch2 := eb.Subscribe("namespace-1") + if ch2 == nil { + t.Fatal("expected second Subscribe to return a channel") + } + if ch == ch2 { + t.Error("expected different channels for separate subscriptions") + } + + // Verify subscriber count + if count := eb.SubscriberCount("namespace-1"); count != 2 { + t.Errorf("expected 2 subscribers, got %d", count) + } +} + +func TestEventBus_Subscribe_MultipleNamespaces(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch1 := eb.Subscribe("namespace-a") + ch2 := eb.Subscribe("namespace-b") + ch3 := eb.Subscribe("namespace-c") + + if ch1 == nil || ch2 == nil || ch3 == nil { + t.Fatal("expected all subscriptions to return channels") + } + + if eb.SubscriberCount("namespace-a") != 1 { + t.Errorf("expected 1 subscriber for namespace-a") + } + if eb.SubscriberCount("namespace-b") != 1 { + t.Errorf("expected 1 subscriber for namespace-b") + } + if eb.SubscriberCount("namespace-c") != 1 { + t.Errorf("expected 1 subscriber for namespace-c") + } +} + +func TestEventBus_Unsubscribe(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("namespace-1") + + // Verify subscription exists + if count := eb.SubscriberCount("namespace-1"); count != 1 { + t.Errorf("expected 1 subscriber before unsubscribe, got %d", count) + } + + // Unsubscribe + eb.Unsubscribe("namespace-1", ch) + + // Verify subscription removed + if count := eb.SubscriberCount("namespace-1"); count != 0 { + t.Errorf("expected 0 subscribers after unsubscribe, got %d", count) + } +} + +func TestEventBus_Unsubscribe_ChannelClosed(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("namespace-1") + + // Unsubscribe should close the channel + eb.Unsubscribe("namespace-1", ch) + + // Reading from closed channel should return zero value and false + select { + case _, ok := <-ch: + if ok { + t.Error("expected channel to be closed after unsubscribe") + } + case <-time.After(100 * time.Millisecond): + t.Error("expected immediate response from closed channel") + } +} + +func TestEventBus_Unsubscribe_NonexistentChannel(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("namespace-1") + otherCh := make(chan *Event) + + // Unsubscribe with a channel that was never subscribed + eb.Unsubscribe("namespace-1", otherCh) // Should not panic + + // Original subscription should still be there + if count := eb.SubscriberCount("namespace-1"); count != 1 { + t.Errorf("expected 1 subscriber, got %d", count) + } + + // Cleanup + eb.Unsubscribe("namespace-1", ch) +} + +func TestEventBus_Unsubscribe_WrongNamespace(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("namespace-1") + + // Unsubscribe from wrong namespace + eb.Unsubscribe("namespace-2", ch) // Should not panic + + // Original subscription should still be there + if count := eb.SubscriberCount("namespace-1"); count != 1 { + t.Errorf("expected 1 subscriber, got %d", count) + } + + // Cleanup + eb.Unsubscribe("namespace-1", ch) +} + +func TestEventBus_Unsubscribe_MultipleSubscribers(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch1 := eb.Subscribe("namespace-1") + ch2 := eb.Subscribe("namespace-1") + ch3 := eb.Subscribe("namespace-1") + + if count := eb.SubscriberCount("namespace-1"); count != 3 { + t.Errorf("expected 3 subscribers, got %d", count) + } + + // Unsubscribe the middle one + eb.Unsubscribe("namespace-1", ch2) + + if count := eb.SubscriberCount("namespace-1"); count != 2 { + t.Errorf("expected 2 subscribers after removing one, got %d", count) + } + + // Unsubscribe the first one + eb.Unsubscribe("namespace-1", ch1) + + if count := eb.SubscriberCount("namespace-1"); count != 1 { + t.Errorf("expected 1 subscriber after removing two, got %d", count) + } + + // Unsubscribe the last one + eb.Unsubscribe("namespace-1", ch3) + + if count := eb.SubscriberCount("namespace-1"); count != 0 { + t.Errorf("expected 0 subscribers after removing all, got %d", count) + } +} + +func TestEventBus_Publish(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("namespace-1") + + event := &Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, + Data: map[string]interface{}{"key": "value"}, + Timestamp: time.Now(), + } + + eb.Publish("namespace-1", event) + + select { + case received := <-ch: + if received.ID != event.ID { + t.Errorf("expected event ID %q, got %q", event.ID, received.ID) + } + if received.EventType != event.EventType { + t.Errorf("expected event type %q, got %q", event.EventType, received.EventType) + } + case <-time.After(100 * time.Millisecond): + t.Error("expected to receive event") + } +} + +func TestEventBus_Publish_MultipleEvents(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("namespace-1") + + events := []*Event{ + {ID: "evt-1", EventType: "Event1", ActorID: "actor-1", Version: 1}, + {ID: "evt-2", EventType: "Event2", ActorID: "actor-1", Version: 2}, + {ID: "evt-3", EventType: "Event3", ActorID: "actor-1", Version: 3}, + } + + for _, event := range events { + eb.Publish("namespace-1", event) + } + + for i, expected := range events { + select { + case received := <-ch: + if received.ID != expected.ID { + t.Errorf("event %d: expected ID %q, got %q", i, expected.ID, received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Errorf("event %d: timeout waiting for event", i) + } + } +} + +func TestEventBus_Publish_NoSubscribers(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + event := &Event{ + ID: "evt-1", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, + } + + // Should not panic when publishing to namespace with no subscribers + eb.Publish("namespace-1", event) +} + +func TestEventBus_NamespaceIsolation(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch1 := eb.Subscribe("namespace-1") + ch2 := eb.Subscribe("namespace-2") + ch3 := eb.Subscribe("namespace-3") + + // Publish to namespace-2 only + event := &Event{ + ID: "evt-ns2", + EventType: "Namespace2Event", + ActorID: "actor-1", + Version: 1, + } + eb.Publish("namespace-2", event) + + // namespace-2 should receive the event + select { + case received := <-ch2: + if received.ID != event.ID { + t.Errorf("expected event ID %q, got %q", event.ID, received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("namespace-2 should have received the event") + } + + // namespace-1 and namespace-3 should NOT receive the event + select { + case <-ch1: + t.Error("namespace-1 should NOT have received an event") + case <-time.After(50 * time.Millisecond): + // Expected: no event received + } + + select { + case <-ch3: + t.Error("namespace-3 should NOT have received an event") + case <-time.After(50 * time.Millisecond): + // Expected: no event received + } +} + +func TestEventBus_NamespaceIsolation_PublishToMultiple(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + chA := eb.Subscribe("tenant-a") + chB := eb.Subscribe("tenant-b") + + eventA := &Event{ID: "evt-a", EventType: "EventA", ActorID: "actor-a", Version: 1} + eventB := &Event{ID: "evt-b", EventType: "EventB", ActorID: "actor-b", Version: 1} + + eb.Publish("tenant-a", eventA) + eb.Publish("tenant-b", eventB) + + // Verify tenant-a received only eventA + select { + case received := <-chA: + if received.ID != "evt-a" { + t.Errorf("tenant-a: expected evt-a, got %q", received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("tenant-a: expected to receive event") + } + + // Verify tenant-b received only eventB + select { + case received := <-chB: + if received.ID != "evt-b" { + t.Errorf("tenant-b: expected evt-b, got %q", received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Error("tenant-b: expected to receive event") + } + + // Ensure no cross-talk + select { + case <-chA: + t.Error("tenant-a should not have received a second event") + case <-time.After(50 * time.Millisecond): + // Expected + } + + select { + case <-chB: + t.Error("tenant-b should not have received a second event") + case <-time.After(50 * time.Millisecond): + // Expected + } +} + +func TestEventBus_Stop(t *testing.T) { + eb := NewEventBus() + + ch1 := eb.Subscribe("namespace-1") + ch2 := eb.Subscribe("namespace-2") + ch3 := eb.Subscribe("namespace-1") + + // Stop the event bus + eb.Stop() + + // All channels should be closed + checkClosed := func(ch <-chan *Event, name string) { + select { + case _, ok := <-ch: + if ok { + t.Errorf("%s: expected channel to be closed", name) + } + case <-time.After(100 * time.Millisecond): + t.Errorf("%s: expected immediate response from closed channel", name) + } + } + + checkClosed(ch1, "ch1") + checkClosed(ch2, "ch2") + checkClosed(ch3, "ch3") + + // Subscriber counts should be zero + if count := eb.SubscriberCount("namespace-1"); count != 0 { + t.Errorf("expected 0 subscribers for namespace-1, got %d", count) + } + if count := eb.SubscriberCount("namespace-2"); count != 0 { + t.Errorf("expected 0 subscribers for namespace-2, got %d", count) + } +} + +func TestEventBus_Stop_Empty(t *testing.T) { + eb := NewEventBus() + + // Stop with no subscriptions should not panic + eb.Stop() +} + +func TestEventBus_MultipleSubscribers(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Create multiple subscribers in the same namespace + const numSubscribers = 5 + channels := make([]<-chan *Event, numSubscribers) + for i := 0; i < numSubscribers; i++ { + channels[i] = eb.Subscribe("shared-namespace") + } + + if count := eb.SubscriberCount("shared-namespace"); count != numSubscribers { + t.Errorf("expected %d subscribers, got %d", numSubscribers, count) + } + + // Publish an event + event := &Event{ + ID: "broadcast-evt", + EventType: "BroadcastEvent", + ActorID: "actor-1", + Version: 1, + } + eb.Publish("shared-namespace", event) + + // All subscribers should receive the event + for i, ch := range channels { + select { + case received := <-ch: + if received.ID != event.ID { + t.Errorf("subscriber %d: expected ID %q, got %q", i, event.ID, received.ID) + } + case <-time.After(100 * time.Millisecond): + t.Errorf("subscriber %d: timeout waiting for event", i) + } + } +} + +func TestEventBus_MultipleSubscribers_AllReceive(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch1 := eb.Subscribe("namespace-1") + ch2 := eb.Subscribe("namespace-1") + ch3 := eb.Subscribe("namespace-1") + + event := &Event{ + ID: "evt-multi", + EventType: "MultiEvent", + ActorID: "actor-1", + Version: 1, + } + + eb.Publish("namespace-1", event) + + // All three should receive the same event + received := make([]*Event, 0, 3) + for _, ch := range []<-chan *Event{ch1, ch2, ch3} { + select { + case evt := <-ch: + received = append(received, evt) + case <-time.After(100 * time.Millisecond): + t.Error("expected all subscribers to receive the event") + } + } + + if len(received) != 3 { + t.Errorf("expected 3 events received, got %d", len(received)) + } + + for i, evt := range received { + if evt.ID != event.ID { + t.Errorf("subscriber %d: expected ID %q, got %q", i, event.ID, evt.ID) + } + } +} + +func TestEventBus_ConcurrentSubscribePublish(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + const numGoroutines = 10 + const numEvents = 100 + + var wg sync.WaitGroup + errors := make(chan error, numGoroutines*numEvents) + + // Start subscribers + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + namespace := "concurrent-namespace" + ch := eb.Subscribe(namespace) + defer eb.Unsubscribe(namespace, ch) + + receivedCount := 0 + timeout := time.After(5 * time.Second) + for receivedCount < numEvents { + select { + case _, ok := <-ch: + if !ok { + return + } + receivedCount++ + case <-timeout: + errors <- nil // Timeout is acceptable in concurrent test + return + } + } + }(i) + } + + // Give subscribers time to set up + time.Sleep(50 * time.Millisecond) + + // Publish events concurrently + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for j := 0; j < numEvents/numGoroutines; j++ { + event := &Event{ + ID: "concurrent-evt", + EventType: "ConcurrentEvent", + ActorID: "actor-1", + Version: int64(id*numEvents + j), + } + eb.Publish("concurrent-namespace", event) + } + }(i) + } + + wg.Wait() + + // Check for errors + close(errors) + for err := range errors { + if err != nil { + t.Errorf("concurrent error: %v", err) + } + } +} + +func TestEventBus_ConcurrentSubscribeUnsubscribe(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + const numIterations = 100 + var wg sync.WaitGroup + + // Concurrently subscribe and unsubscribe + for i := 0; i < numIterations; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + namespace := "concurrent-sub-unsub" + ch := eb.Subscribe(namespace) + // Immediately unsubscribe + eb.Unsubscribe(namespace, ch) + }(i) + } + + wg.Wait() + + // After all operations, subscriber count should be 0 + if count := eb.SubscriberCount("concurrent-sub-unsub"); count != 0 { + t.Errorf("expected 0 subscribers after all unsubscribes, got %d", count) + } +} + +func TestEventBus_ConcurrentPublish(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("concurrent-pub") + const numPublishers = 10 + const numEventsPerPublisher = 5 // Keep total under buffer size (100) + + var wg sync.WaitGroup + + // Start multiple publishers + for i := 0; i < numPublishers; i++ { + wg.Add(1) + go func(pubID int) { + defer wg.Done() + for j := 0; j < numEventsPerPublisher; j++ { + event := &Event{ + ID: "evt", + EventType: "ConcurrentPub", + ActorID: "actor-1", + Version: int64(pubID*numEventsPerPublisher + j), + } + eb.Publish("concurrent-pub", event) + } + }(i) + } + + wg.Wait() + + // Drain the channel and count received events + receivedCount := 0 + timeout := time.After(500 * time.Millisecond) +drainLoop: + for { + select { + case <-ch: + receivedCount++ + case <-timeout: + break drainLoop + } + } + + expectedTotal := numPublishers * numEventsPerPublisher + if receivedCount != expectedTotal { + t.Errorf("expected to receive %d events, got %d", expectedTotal, receivedCount) + } +} + +func TestEventBus_SubscriberCount_EmptyNamespace(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + // Non-existent namespace should return 0 + if count := eb.SubscriberCount("nonexistent"); count != 0 { + t.Errorf("expected 0 subscribers for nonexistent namespace, got %d", count) + } +} + +func TestEventBus_SubscriberCount_AfterUnsubscribeAll(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch1 := eb.Subscribe("namespace-1") + ch2 := eb.Subscribe("namespace-1") + + eb.Unsubscribe("namespace-1", ch1) + eb.Unsubscribe("namespace-1", ch2) + + // After unsubscribing all, the namespace should be cleaned up + if count := eb.SubscriberCount("namespace-1"); count != 0 { + t.Errorf("expected 0 subscribers, got %d", count) + } +} + +func TestEventBus_PublishNilEvent(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("namespace-1") + + // Publishing nil event should work (caller's responsibility to validate) + eb.Publish("namespace-1", nil) + + select { + case received := <-ch: + if received != nil { + t.Error("expected to receive nil event") + } + case <-time.After(100 * time.Millisecond): + t.Error("expected to receive event (even nil)") + } +} + +func TestEventBus_ChannelBufferOverflow(t *testing.T) { + eb := NewEventBus() + defer eb.Stop() + + ch := eb.Subscribe("namespace-1") + + // The channel has a buffer of 100, publish more events without reading + for i := 0; i < 150; i++ { + event := &Event{ + ID: "evt", + EventType: "OverflowTest", + ActorID: "actor-1", + Version: int64(i), + } + eb.Publish("namespace-1", event) // Should not block or panic + } + + // Drain what we can + receivedCount := 0 + timeout := time.After(100 * time.Millisecond) +drainLoop: + for { + select { + case <-ch: + receivedCount++ + case <-timeout: + break drainLoop + } + } + + // Should have received up to buffer size (100) + if receivedCount != 100 { + t.Errorf("expected to receive 100 events (buffer size), got %d", receivedCount) + } +} + +func TestEventBus_ImplementsEventBroadcaster(t *testing.T) { + var _ EventBroadcaster = (*EventBus)(nil) +} + +func TestNewEventBus(t *testing.T) { + eb := NewEventBus() + if eb == nil { + t.Fatal("expected NewEventBus to return non-nil EventBus") + } + if eb.subscribers == nil { + t.Error("expected subscribers map to be initialized") + } + if eb.ctx == nil { + t.Error("expected context to be initialized") + } + if eb.cancel == nil { + t.Error("expected cancel function to be initialized") + } + eb.Stop() +}