package aether import ( "sync" "testing" "time" ) func TestEventBus_ExactSubscription(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("tenant-a") event := &Event{ ID: "evt-1", EventType: "TestEvent", ActorID: "actor-1", } eb.Publish("tenant-a", event) select { case received := <-ch: if received.ID != event.ID { t.Errorf("expected event ID %s, got %s", event.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Fatal("timed out waiting for event") } } func TestEventBus_WildcardStarSubscription(t *testing.T) { eb := NewEventBus() defer eb.Stop() // Subscribe to all single-token namespaces ch := eb.Subscribe("*") event := &Event{ ID: "evt-1", EventType: "TestEvent", ActorID: "actor-1", } eb.Publish("tenant-a", event) select { case received := <-ch: if received.ID != event.ID { t.Errorf("expected event ID %s, got %s", event.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Fatal("timed out waiting for event") } } func TestEventBus_WildcardGreaterSubscription(t *testing.T) { eb := NewEventBus() defer eb.Stop() // Subscribe to all namespaces ch := eb.Subscribe(">") events := []*Event{ {ID: "evt-1", EventType: "Test1", ActorID: "actor-1"}, {ID: "evt-2", EventType: "Test2", ActorID: "actor-2"}, {ID: "evt-3", EventType: "Test3", ActorID: "actor-3"}, } namespaces := []string{"tenant-a", "tenant-b", "prod.tenant.orders"} for i, ns := range namespaces { eb.Publish(ns, events[i]) } received := make(map[string]bool) timeout := time.After(100 * time.Millisecond) for i := 0; i < len(events); i++ { select { case evt := <-ch: received[evt.ID] = true case <-timeout: t.Fatalf("timed out after receiving %d of %d events", i, len(events)) } } for _, evt := range events { if !received[evt.ID] { t.Errorf("did not receive event %s", evt.ID) } } } func TestEventBus_PrefixWildcard(t *testing.T) { eb := NewEventBus() defer eb.Stop() // Subscribe to prod.* ch := eb.Subscribe("prod.*") event1 := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"} event2 := &Event{ID: "evt-2", EventType: "Test", ActorID: "actor-2"} event3 := &Event{ID: "evt-3", EventType: "Test", ActorID: "actor-3"} // Should match eb.Publish("prod.tenant", event1) eb.Publish("prod.orders", event2) // Should not match (different prefix) eb.Publish("staging.tenant", event3) received := make(map[string]bool) timeout := time.After(100 * time.Millisecond) // Should receive exactly 2 events for i := 0; i < 2; i++ { select { case evt := <-ch: received[evt.ID] = true case <-timeout: t.Fatalf("timed out after receiving %d events", len(received)) } } // Verify we got the right ones if !received["evt-1"] || !received["evt-2"] { t.Errorf("expected evt-1 and evt-2, got %v", received) } // Verify no third event arrives select { case evt := <-ch: t.Errorf("unexpected event received: %s", evt.ID) case <-time.After(50 * time.Millisecond): // Expected - no more events } } func TestEventBus_MultipleWildcardSubscribers(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch1 := eb.Subscribe("prod.*") ch2 := eb.Subscribe("prod.>") ch3 := eb.Subscribe(">") event := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"} eb.Publish("prod.tenant.orders", event) // ch1 (prod.*) should NOT receive - doesn't match 3 tokens select { case <-ch1: t.Error("prod.* should not match prod.tenant.orders") case <-time.After(50 * time.Millisecond): // Expected } // ch2 (prod.>) should receive select { case received := <-ch2: if received.ID != event.ID { t.Errorf("expected %s, got %s", event.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Error("prod.> should match prod.tenant.orders") } // ch3 (>) should receive select { case received := <-ch3: if received.ID != event.ID { t.Errorf("expected %s, got %s", event.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Error("> should match prod.tenant.orders") } } func TestEventBus_ExactAndWildcardCoexist(t *testing.T) { eb := NewEventBus() defer eb.Stop() chExact := eb.Subscribe("tenant-a") chWildcard := eb.Subscribe("*") event := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"} eb.Publish("tenant-a", event) // Both should receive the event var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() select { case received := <-chExact: if received.ID != event.ID { t.Errorf("exact: expected %s, got %s", event.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Error("exact subscriber timed out") } }() go func() { defer wg.Done() select { case received := <-chWildcard: if received.ID != event.ID { t.Errorf("wildcard: expected %s, got %s", event.ID, received.ID) } case <-time.After(100 * time.Millisecond): t.Error("wildcard subscriber timed out") } }() wg.Wait() } func TestEventBus_WildcardUnsubscribe(t *testing.T) { eb := NewEventBus() defer eb.Stop() ch := eb.Subscribe("prod.*") // Verify it's counted if eb.WildcardSubscriberCount() != 1 { t.Errorf("expected 1 wildcard subscriber, got %d", eb.WildcardSubscriberCount()) } eb.Unsubscribe("prod.*", ch) // Verify it's removed if eb.WildcardSubscriberCount() != 0 { t.Errorf("expected 0 wildcard subscribers, got %d", eb.WildcardSubscriberCount()) } } func TestEventBus_SubscriberCount(t *testing.T) { eb := NewEventBus() defer eb.Stop() // Add exact subscribers ch1 := eb.Subscribe("tenant-a") ch2 := eb.Subscribe("tenant-a") if eb.SubscriberCount("tenant-a") != 2 { t.Errorf("expected 2 exact subscribers, got %d", eb.SubscriberCount("tenant-a")) } // Add wildcard subscriber - should not affect exact count eb.Subscribe("*") if eb.SubscriberCount("tenant-a") != 2 { t.Errorf("expected 2 exact subscribers after wildcard add, got %d", eb.SubscriberCount("tenant-a")) } if eb.WildcardSubscriberCount() != 1 { t.Errorf("expected 1 wildcard subscriber, got %d", eb.WildcardSubscriberCount()) } // Unsubscribe exact eb.Unsubscribe("tenant-a", ch1) if eb.SubscriberCount("tenant-a") != 1 { t.Errorf("expected 1 exact subscriber after unsubscribe, got %d", eb.SubscriberCount("tenant-a")) } eb.Unsubscribe("tenant-a", ch2) if eb.SubscriberCount("tenant-a") != 0 { t.Errorf("expected 0 exact subscribers after unsubscribe, got %d", eb.SubscriberCount("tenant-a")) } } func TestEventBus_StopClosesAllChannels(t *testing.T) { eb := NewEventBus() chExact := eb.Subscribe("tenant-a") chWildcard := eb.Subscribe("*") eb.Stop() // Both channels should be closed select { case _, ok := <-chExact: if ok { t.Error("expected exact channel to be closed") } case <-time.After(100 * time.Millisecond): t.Error("timed out waiting for exact channel close") } select { case _, ok := <-chWildcard: if ok { t.Error("expected wildcard channel to be closed") } case <-time.After(100 * time.Millisecond): t.Error("timed out waiting for wildcard channel close") } } func TestEventBus_NamespaceIsolation(t *testing.T) { eb := NewEventBus() defer eb.Stop() chA := eb.Subscribe("tenant-a") chB := eb.Subscribe("tenant-b") eventA := &Event{ID: "evt-a", EventType: "Test", ActorID: "actor-1"} eventB := &Event{ID: "evt-b", EventType: "Test", ActorID: "actor-2"} eb.Publish("tenant-a", eventA) eb.Publish("tenant-b", eventB) // Verify tenant-a receives only its event select { case received := <-chA: if received.ID != "evt-a" { t.Errorf("tenant-a received wrong event: %s", received.ID) } case <-time.After(100 * time.Millisecond): t.Error("tenant-a timed out") } select { case <-chA: t.Error("tenant-a received extra event") case <-time.After(50 * time.Millisecond): // Expected } // Verify tenant-b receives only its event select { case received := <-chB: if received.ID != "evt-b" { t.Errorf("tenant-b received wrong event: %s", received.ID) } case <-time.After(100 * time.Millisecond): t.Error("tenant-b timed out") } select { case <-chB: t.Error("tenant-b received extra event") case <-time.After(50 * time.Millisecond): // Expected } } func TestEventBus_NonBlockingPublish(t *testing.T) { eb := NewEventBus() defer eb.Stop() // Create subscriber but don't read from channel _ = eb.Subscribe("tenant-a") // Fill the channel buffer (100 events) for i := 0; i < 150; i++ { event := &Event{ ID: "evt", EventType: "Test", ActorID: "actor-1", } // Should not block even when channel is full eb.Publish("tenant-a", event) } // If we got here without blocking, test passes } func TestEventBus_ConcurrentOperations(t *testing.T) { eb := NewEventBus() defer eb.Stop() var wg sync.WaitGroup // Concurrent subscriptions for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() ch := eb.Subscribe("tenant-a") time.Sleep(10 * time.Millisecond) eb.Unsubscribe("tenant-a", ch) }(i) } // Concurrent wildcard subscriptions for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() ch := eb.Subscribe("*") time.Sleep(10 * time.Millisecond) eb.Unsubscribe("*", ch) }(i) } // Concurrent publishes for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() event := &Event{ ID: "evt", EventType: "Test", ActorID: "actor-1", } eb.Publish("tenant-a", event) }(i) } wg.Wait() }