package aether import ( "sync" "testing" "time" ) func TestEventBus_Subscribe(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("namespace-1") if ch == nil { t.Fatal("expected Subscribe to return a channel") } // Verify subscriber count increased if count := eb.SubscriberCount("namespace-1"); count != 1 { t.Errorf("expected 1 subscriber, got %d", count) } // Subscribe again to the same namespace ch2 := eb.Subscribe("namespace-1") if ch2 == nil { t.Fatal("expected second Subscribe to return a channel") } if ch == ch2 { t.Error("expected different channels for separate subscriptions") } // Verify subscriber count if count := eb.SubscriberCount("namespace-1"); count != 2 { t.Errorf("expected 2 subscribers, got %d", count) } } func TestEventBus_Subscribe_MultipleNamespaces(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch1 := eb.Subscribe("namespace-a") ch2 := eb.Subscribe("namespace-b") ch3 := eb.Subscribe("namespace-c") if ch1 == nil || ch2 == nil || ch3 == nil { t.Fatal("expected all subscriptions to return channels") } if eb.SubscriberCount("namespace-a") != 1 { t.Errorf("expected 1 subscriber for namespace-a") } if eb.SubscriberCount("namespace-b") != 1 { t.Errorf("expected 1 subscriber for namespace-b") } if eb.SubscriberCount("namespace-c") != 1 { t.Errorf("expected 1 subscriber for namespace-c") } } func TestEventBus_Unsubscribe(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("namespace-1") // Verify subscription exists if count := eb.SubscriberCount("namespace-1"); count != 1 { t.Errorf("expected 1 subscriber before unsubscribe, got %d", count) } // Unsubscribe eb.Unsubscribe("namespace-1", ch) // Verify subscription removed if count := eb.SubscriberCount("namespace-1"); count != 0 { t.Errorf("expected 0 subscribers after unsubscribe, got %d", count) } } func TestEventBus_Unsubscribe_ChannelClosed(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("namespace-1") // Unsubscribe should close the channel eb.Unsubscribe("namespace-1", ch) // Reading from closed channel should return zero value and false select { case _, ok := <-ch: if ok { t.Error("expected channel to be closed after unsubscribe") } case <-time.After(100 * time.Millisecond): t.Error("expected immediate response from closed channel") } } func TestEventBus_Unsubscribe_NonexistentChannel(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("namespace-1") otherCh := make(chan *Event) // Unsubscribe with a channel that was never subscribed eb.Unsubscribe("namespace-1", otherCh) // Should not panic // Original subscription should still be there if count := eb.SubscriberCount("namespace-1"); count != 1 { t.Errorf("expected 1 subscriber, got %d", count) } // Cleanup eb.Unsubscribe("namespace-1", ch) } func TestEventBus_Unsubscribe_WrongNamespace(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("namespace-1") // Unsubscribe from wrong namespace eb.Unsubscribe("namespace-2", ch) // Should not panic // Original subscription should still be there if count := eb.SubscriberCount("namespace-1"); count != 1 { t.Errorf("expected 1 subscriber, got %d", count) } // Cleanup eb.Unsubscribe("namespace-1", ch) } func TestEventBus_Unsubscribe_MultipleSubscribers(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch1 := eb.Subscribe("namespace-1") ch2 := eb.Subscribe("namespace-1") ch3 := eb.Subscribe("namespace-1") if count := eb.SubscriberCount("namespace-1"); count != 3 { t.Errorf("expected 3 subscribers, got %d", count) } // Unsubscribe the middle one eb.Unsubscribe("namespace-1", ch2) if count := eb.SubscriberCount("namespace-1"); count != 2 { t.Errorf("expected 2 subscribers after removing one, got %d", count) } // Unsubscribe the first one eb.Unsubscribe("namespace-1", ch1) if count := eb.SubscriberCount("namespace-1"); count != 1 { t.Errorf("expected 1 subscriber after removing two, got %d", count) } // Unsubscribe the last one eb.Unsubscribe("namespace-1", ch3) if count := eb.SubscriberCount("namespace-1"); count != 0 { t.Errorf("expected 0 subscribers after removing all, got %d", count) } } func TestEventBus_Publish(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("namespace-1") event := &Event{ ID: "evt-1", EventType: "TestEvent", ActorID: "actor-1", Version: 1, Data: map[string]interface{}{"key": "value"}, Timestamp: time.Now(), } eb.Publish("namespace-1", event) select { case received := <-ch: if received.ID != event.ID { t.Errorf("expected event ID %q, got %q", event.ID, received.ID) } if received.EventType != event.EventType { t.Errorf("expected event type %q, got %q", event.EventType, received.EventType) } case <-time.After(100 * time.Millisecond): t.Error("expected to receive event") } } func TestEventBus_Publish_MultipleEvents(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("namespace-1") events := []*Event{ {ID: "evt-1", EventType: "Event1", ActorID: "actor-1", Version: 1}, {ID: "evt-2", EventType: "Event2", ActorID: "actor-1", Version: 2}, {ID: "evt-3", EventType: "Event3", ActorID: "actor-1", Version: 3}, } for _, event := range events { eb.Publish("namespace-1", event) } for i, expected := range events { select { case received := <-ch: if received.ID != expected.ID { t.Errorf("event %d: expected ID %q, got %q", i, expected.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Errorf("event %d: timeout waiting for event", i) } } } func TestEventBus_Publish_NoSubscribers(t *testing.T) { eb := NewEventBus() defer eb.Stop() event := &Event{ ID: "evt-1", EventType: "TestEvent", ActorID: "actor-1", Version: 1, } // Should not panic when publishing to namespace with no subscribers eb.Publish("namespace-1", event) } func TestEventBus_NamespaceIsolation(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch1 := eb.Subscribe("namespace-1") ch2 := eb.Subscribe("namespace-2") ch3 := eb.Subscribe("namespace-3") // Publish to namespace-2 only event := &Event{ ID: "evt-ns2", EventType: "Namespace2Event", ActorID: "actor-1", Version: 1, } eb.Publish("namespace-2", event) // namespace-2 should receive the event select { case received := <-ch2: if received.ID != event.ID { t.Errorf("expected event ID %q, got %q", event.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Error("namespace-2 should have received the event") } // namespace-1 and namespace-3 should NOT receive the event select { case <-ch1: t.Error("namespace-1 should NOT have received an event") case <-time.After(50 * time.Millisecond): // Expected: no event received } select { case <-ch3: t.Error("namespace-3 should NOT have received an event") case <-time.After(50 * time.Millisecond): // Expected: no event received } } func TestEventBus_NamespaceIsolation_PublishToMultiple(t *testing.T) { eb := NewEventBus() defer eb.Stop() chA := eb.Subscribe("tenant-a") chB := eb.Subscribe("tenant-b") eventA := &Event{ID: "evt-a", EventType: "EventA", ActorID: "actor-a", Version: 1} eventB := &Event{ID: "evt-b", EventType: "EventB", ActorID: "actor-b", Version: 1} eb.Publish("tenant-a", eventA) eb.Publish("tenant-b", eventB) // Verify tenant-a received only eventA select { case received := <-chA: if received.ID != "evt-a" { t.Errorf("tenant-a: expected evt-a, got %q", received.ID) } case <-time.After(100 * time.Millisecond): t.Error("tenant-a: expected to receive event") } // Verify tenant-b received only eventB select { case received := <-chB: if received.ID != "evt-b" { t.Errorf("tenant-b: expected evt-b, got %q", received.ID) } case <-time.After(100 * time.Millisecond): t.Error("tenant-b: expected to receive event") } // Ensure no cross-talk select { case <-chA: t.Error("tenant-a should not have received a second event") case <-time.After(50 * time.Millisecond): // Expected } select { case <-chB: t.Error("tenant-b should not have received a second event") case <-time.After(50 * time.Millisecond): // Expected } } func TestEventBus_Stop(t *testing.T) { eb := NewEventBus() ch1 := eb.Subscribe("namespace-1") ch2 := eb.Subscribe("namespace-2") ch3 := eb.Subscribe("namespace-1") // Stop the event bus eb.Stop() // All channels should be closed checkClosed := func(ch <-chan *Event, name string) { select { case _, ok := <-ch: if ok { t.Errorf("%s: expected channel to be closed", name) } case <-time.After(100 * time.Millisecond): t.Errorf("%s: expected immediate response from closed channel", name) } } checkClosed(ch1, "ch1") checkClosed(ch2, "ch2") checkClosed(ch3, "ch3") // Subscriber counts should be zero if count := eb.SubscriberCount("namespace-1"); count != 0 { t.Errorf("expected 0 subscribers for namespace-1, got %d", count) } if count := eb.SubscriberCount("namespace-2"); count != 0 { t.Errorf("expected 0 subscribers for namespace-2, got %d", count) } } func TestEventBus_Stop_Empty(t *testing.T) { eb := NewEventBus() // Stop with no subscriptions should not panic eb.Stop() } func TestEventBus_MultipleSubscribers(t *testing.T) { eb := NewEventBus() defer eb.Stop() // Create multiple subscribers in the same namespace const numSubscribers = 5 channels := make([]<-chan *Event, numSubscribers) for i := 0; i < numSubscribers; i++ { channels[i] = eb.Subscribe("shared-namespace") } if count := eb.SubscriberCount("shared-namespace"); count != numSubscribers { t.Errorf("expected %d subscribers, got %d", numSubscribers, count) } // Publish an event event := &Event{ ID: "broadcast-evt", EventType: "BroadcastEvent", ActorID: "actor-1", Version: 1, } eb.Publish("shared-namespace", event) // All subscribers should receive the event for i, ch := range channels { select { case received := <-ch: if received.ID != event.ID { t.Errorf("subscriber %d: expected ID %q, got %q", i, event.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Errorf("subscriber %d: timeout waiting for event", i) } } } func TestEventBus_MultipleSubscribers_AllReceive(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch1 := eb.Subscribe("namespace-1") ch2 := eb.Subscribe("namespace-1") ch3 := eb.Subscribe("namespace-1") event := &Event{ ID: "evt-multi", EventType: "MultiEvent", ActorID: "actor-1", Version: 1, } eb.Publish("namespace-1", event) // All three should receive the same event received := make([]*Event, 0, 3) for _, ch := range []<-chan *Event{ch1, ch2, ch3} { select { case evt := <-ch: received = append(received, evt) case <-time.After(100 * time.Millisecond): t.Error("expected all subscribers to receive the event") } } if len(received) != 3 { t.Errorf("expected 3 events received, got %d", len(received)) } for i, evt := range received { if evt.ID != event.ID { t.Errorf("subscriber %d: expected ID %q, got %q", i, event.ID, evt.ID) } } } func TestEventBus_ConcurrentSubscribePublish(t *testing.T) { eb := NewEventBus() defer eb.Stop() const numGoroutines = 10 const numEvents = 100 var wg sync.WaitGroup errors := make(chan error, numGoroutines*numEvents) // Start subscribers for i := 0; i < numGoroutines; i++ { wg.Add(1) go func(id int) { defer wg.Done() namespace := "concurrent-namespace" ch := eb.Subscribe(namespace) defer eb.Unsubscribe(namespace, ch) receivedCount := 0 timeout := time.After(5 * time.Second) for receivedCount < numEvents { select { case _, ok := <-ch: if !ok { return } receivedCount++ case <-timeout: errors <- nil // Timeout is acceptable in concurrent test return } } }(i) } // Give subscribers time to set up time.Sleep(50 * time.Millisecond) // Publish events concurrently for i := 0; i < numGoroutines; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < numEvents/numGoroutines; j++ { event := &Event{ ID: "concurrent-evt", EventType: "ConcurrentEvent", ActorID: "actor-1", Version: int64(id*numEvents + j), } eb.Publish("concurrent-namespace", event) } }(i) } wg.Wait() // Check for errors close(errors) for err := range errors { if err != nil { t.Errorf("concurrent error: %v", err) } } } func TestEventBus_ConcurrentSubscribeUnsubscribe(t *testing.T) { eb := NewEventBus() defer eb.Stop() const numIterations = 100 var wg sync.WaitGroup // Concurrently subscribe and unsubscribe for i := 0; i < numIterations; i++ { wg.Add(1) go func(id int) { defer wg.Done() namespace := "concurrent-sub-unsub" ch := eb.Subscribe(namespace) // Immediately unsubscribe eb.Unsubscribe(namespace, ch) }(i) } wg.Wait() // After all operations, subscriber count should be 0 if count := eb.SubscriberCount("concurrent-sub-unsub"); count != 0 { t.Errorf("expected 0 subscribers after all unsubscribes, got %d", count) } } func TestEventBus_ConcurrentPublish(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("concurrent-pub") const numPublishers = 10 const numEventsPerPublisher = 5 // Keep total under buffer size (100) var wg sync.WaitGroup // Start multiple publishers for i := 0; i < numPublishers; i++ { wg.Add(1) go func(pubID int) { defer wg.Done() for j := 0; j < numEventsPerPublisher; j++ { event := &Event{ ID: "evt", EventType: "ConcurrentPub", ActorID: "actor-1", Version: int64(pubID*numEventsPerPublisher + j), } eb.Publish("concurrent-pub", event) } }(i) } wg.Wait() // Drain the channel and count received events receivedCount := 0 timeout := time.After(500 * time.Millisecond) drainLoop: for { select { case <-ch: receivedCount++ case <-timeout: break drainLoop } } expectedTotal := numPublishers * numEventsPerPublisher if receivedCount != expectedTotal { t.Errorf("expected to receive %d events, got %d", expectedTotal, receivedCount) } } func TestEventBus_SubscriberCount_EmptyNamespace(t *testing.T) { eb := NewEventBus() defer eb.Stop() // Non-existent namespace should return 0 if count := eb.SubscriberCount("nonexistent"); count != 0 { t.Errorf("expected 0 subscribers for nonexistent namespace, got %d", count) } } func TestEventBus_SubscriberCount_AfterUnsubscribeAll(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch1 := eb.Subscribe("namespace-1") ch2 := eb.Subscribe("namespace-1") eb.Unsubscribe("namespace-1", ch1) eb.Unsubscribe("namespace-1", ch2) // After unsubscribing all, the namespace should be cleaned up if count := eb.SubscriberCount("namespace-1"); count != 0 { t.Errorf("expected 0 subscribers, got %d", count) } } func TestEventBus_PublishNilEvent(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("namespace-1") // Publishing nil event should work (caller's responsibility to validate) eb.Publish("namespace-1", nil) select { case received := <-ch: if received != nil { t.Error("expected to receive nil event") } case <-time.After(100 * time.Millisecond): t.Error("expected to receive event (even nil)") } } func TestEventBus_ChannelBufferOverflow(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("namespace-1") // The channel has a buffer of 100, publish more events without reading for i := 0; i < 150; i++ { event := &Event{ ID: "evt", EventType: "OverflowTest", ActorID: "actor-1", Version: int64(i), } eb.Publish("namespace-1", event) // Should not block or panic } // Drain what we can receivedCount := 0 timeout := time.After(100 * time.Millisecond) drainLoop: for { select { case <-ch: receivedCount++ case <-timeout: break drainLoop } } // Should have received up to buffer size (100) if receivedCount != 100 { t.Errorf("expected to receive 100 events (buffer size), got %d", receivedCount) } } func TestEventBus_ImplementsEventBroadcaster(t *testing.T) { var _ EventBroadcaster = (*EventBus)(nil) } func TestNewEventBus(t *testing.T) { eb := NewEventBus() if eb == nil { t.Fatal("expected NewEventBus to return non-nil EventBus") } if eb.subscribers == nil { t.Error("expected subscribers map to be initialized") } if eb.ctx == nil { t.Error("expected context to be initialized") } if eb.cancel == nil { t.Error("expected cancel function to be initialized") } eb.Stop() }