Files
aether/eventbus_test.go
Hugo Nijhuis adead7e980
All checks were successful
CI / build (pull_request) Successful in 18s
CI / build (push) Successful in 16s
Add wildcard namespace subscriptions
Support NATS-style wildcard patterns ("*" and ">") for subscribing
to events across multiple namespaces. This enables cross-cutting
concerns like logging, monitoring, and auditing without requiring
separate subscriptions for each namespace.

- Add pattern.go with MatchNamespacePattern and IsWildcardPattern
- Update EventBus to track wildcard subscribers separately
- Update NATSEventBus to use NATS native wildcard support
- Add comprehensive tests for pattern matching and EventBus wildcards
- Document security implications in all relevant code comments

Closes #20

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 19:24:26 +01:00

417 lines
9.3 KiB
Go

package aether
import (
"sync"
"testing"
"time"
)
func TestEventBus_ExactSubscription(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
ch := eb.Subscribe("tenant-a")
event := &Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-1",
}
eb.Publish("tenant-a", event)
select {
case received := <-ch:
if received.ID != event.ID {
t.Errorf("expected event ID %s, got %s", event.ID, received.ID)
}
case <-time.After(100 * time.Millisecond):
t.Fatal("timed out waiting for event")
}
}
func TestEventBus_WildcardStarSubscription(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Subscribe to all single-token namespaces
ch := eb.Subscribe("*")
event := &Event{
ID: "evt-1",
EventType: "TestEvent",
ActorID: "actor-1",
}
eb.Publish("tenant-a", event)
select {
case received := <-ch:
if received.ID != event.ID {
t.Errorf("expected event ID %s, got %s", event.ID, received.ID)
}
case <-time.After(100 * time.Millisecond):
t.Fatal("timed out waiting for event")
}
}
func TestEventBus_WildcardGreaterSubscription(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Subscribe to all namespaces
ch := eb.Subscribe(">")
events := []*Event{
{ID: "evt-1", EventType: "Test1", ActorID: "actor-1"},
{ID: "evt-2", EventType: "Test2", ActorID: "actor-2"},
{ID: "evt-3", EventType: "Test3", ActorID: "actor-3"},
}
namespaces := []string{"tenant-a", "tenant-b", "prod.tenant.orders"}
for i, ns := range namespaces {
eb.Publish(ns, events[i])
}
received := make(map[string]bool)
timeout := time.After(100 * time.Millisecond)
for i := 0; i < len(events); i++ {
select {
case evt := <-ch:
received[evt.ID] = true
case <-timeout:
t.Fatalf("timed out after receiving %d of %d events", i, len(events))
}
}
for _, evt := range events {
if !received[evt.ID] {
t.Errorf("did not receive event %s", evt.ID)
}
}
}
func TestEventBus_PrefixWildcard(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Subscribe to prod.*
ch := eb.Subscribe("prod.*")
event1 := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
event2 := &Event{ID: "evt-2", EventType: "Test", ActorID: "actor-2"}
event3 := &Event{ID: "evt-3", EventType: "Test", ActorID: "actor-3"}
// Should match
eb.Publish("prod.tenant", event1)
eb.Publish("prod.orders", event2)
// Should not match (different prefix)
eb.Publish("staging.tenant", event3)
received := make(map[string]bool)
timeout := time.After(100 * time.Millisecond)
// Should receive exactly 2 events
for i := 0; i < 2; i++ {
select {
case evt := <-ch:
received[evt.ID] = true
case <-timeout:
t.Fatalf("timed out after receiving %d events", len(received))
}
}
// Verify we got the right ones
if !received["evt-1"] || !received["evt-2"] {
t.Errorf("expected evt-1 and evt-2, got %v", received)
}
// Verify no third event arrives
select {
case evt := <-ch:
t.Errorf("unexpected event received: %s", evt.ID)
case <-time.After(50 * time.Millisecond):
// Expected - no more events
}
}
func TestEventBus_MultipleWildcardSubscribers(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
ch1 := eb.Subscribe("prod.*")
ch2 := eb.Subscribe("prod.>")
ch3 := eb.Subscribe(">")
event := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
eb.Publish("prod.tenant.orders", event)
// ch1 (prod.*) should NOT receive - doesn't match 3 tokens
select {
case <-ch1:
t.Error("prod.* should not match prod.tenant.orders")
case <-time.After(50 * time.Millisecond):
// Expected
}
// ch2 (prod.>) should receive
select {
case received := <-ch2:
if received.ID != event.ID {
t.Errorf("expected %s, got %s", event.ID, received.ID)
}
case <-time.After(100 * time.Millisecond):
t.Error("prod.> should match prod.tenant.orders")
}
// ch3 (>) should receive
select {
case received := <-ch3:
if received.ID != event.ID {
t.Errorf("expected %s, got %s", event.ID, received.ID)
}
case <-time.After(100 * time.Millisecond):
t.Error("> should match prod.tenant.orders")
}
}
func TestEventBus_ExactAndWildcardCoexist(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
chExact := eb.Subscribe("tenant-a")
chWildcard := eb.Subscribe("*")
event := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
eb.Publish("tenant-a", event)
// Both should receive the event
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
select {
case received := <-chExact:
if received.ID != event.ID {
t.Errorf("exact: expected %s, got %s", event.ID, received.ID)
}
case <-time.After(100 * time.Millisecond):
t.Error("exact subscriber timed out")
}
}()
go func() {
defer wg.Done()
select {
case received := <-chWildcard:
if received.ID != event.ID {
t.Errorf("wildcard: expected %s, got %s", event.ID, received.ID)
}
case <-time.After(100 * time.Millisecond):
t.Error("wildcard subscriber timed out")
}
}()
wg.Wait()
}
func TestEventBus_WildcardUnsubscribe(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
ch := eb.Subscribe("prod.*")
// Verify it's counted
if eb.WildcardSubscriberCount() != 1 {
t.Errorf("expected 1 wildcard subscriber, got %d", eb.WildcardSubscriberCount())
}
eb.Unsubscribe("prod.*", ch)
// Verify it's removed
if eb.WildcardSubscriberCount() != 0 {
t.Errorf("expected 0 wildcard subscribers, got %d", eb.WildcardSubscriberCount())
}
}
func TestEventBus_SubscriberCount(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Add exact subscribers
ch1 := eb.Subscribe("tenant-a")
ch2 := eb.Subscribe("tenant-a")
if eb.SubscriberCount("tenant-a") != 2 {
t.Errorf("expected 2 exact subscribers, got %d", eb.SubscriberCount("tenant-a"))
}
// Add wildcard subscriber - should not affect exact count
eb.Subscribe("*")
if eb.SubscriberCount("tenant-a") != 2 {
t.Errorf("expected 2 exact subscribers after wildcard add, got %d", eb.SubscriberCount("tenant-a"))
}
if eb.WildcardSubscriberCount() != 1 {
t.Errorf("expected 1 wildcard subscriber, got %d", eb.WildcardSubscriberCount())
}
// Unsubscribe exact
eb.Unsubscribe("tenant-a", ch1)
if eb.SubscriberCount("tenant-a") != 1 {
t.Errorf("expected 1 exact subscriber after unsubscribe, got %d", eb.SubscriberCount("tenant-a"))
}
eb.Unsubscribe("tenant-a", ch2)
if eb.SubscriberCount("tenant-a") != 0 {
t.Errorf("expected 0 exact subscribers after unsubscribe, got %d", eb.SubscriberCount("tenant-a"))
}
}
func TestEventBus_StopClosesAllChannels(t *testing.T) {
eb := NewEventBus()
chExact := eb.Subscribe("tenant-a")
chWildcard := eb.Subscribe("*")
eb.Stop()
// Both channels should be closed
select {
case _, ok := <-chExact:
if ok {
t.Error("expected exact channel to be closed")
}
case <-time.After(100 * time.Millisecond):
t.Error("timed out waiting for exact channel close")
}
select {
case _, ok := <-chWildcard:
if ok {
t.Error("expected wildcard channel to be closed")
}
case <-time.After(100 * time.Millisecond):
t.Error("timed out waiting for wildcard channel close")
}
}
func TestEventBus_NamespaceIsolation(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
chA := eb.Subscribe("tenant-a")
chB := eb.Subscribe("tenant-b")
eventA := &Event{ID: "evt-a", EventType: "Test", ActorID: "actor-1"}
eventB := &Event{ID: "evt-b", EventType: "Test", ActorID: "actor-2"}
eb.Publish("tenant-a", eventA)
eb.Publish("tenant-b", eventB)
// Verify tenant-a receives only its event
select {
case received := <-chA:
if received.ID != "evt-a" {
t.Errorf("tenant-a received wrong event: %s", received.ID)
}
case <-time.After(100 * time.Millisecond):
t.Error("tenant-a timed out")
}
select {
case <-chA:
t.Error("tenant-a received extra event")
case <-time.After(50 * time.Millisecond):
// Expected
}
// Verify tenant-b receives only its event
select {
case received := <-chB:
if received.ID != "evt-b" {
t.Errorf("tenant-b received wrong event: %s", received.ID)
}
case <-time.After(100 * time.Millisecond):
t.Error("tenant-b timed out")
}
select {
case <-chB:
t.Error("tenant-b received extra event")
case <-time.After(50 * time.Millisecond):
// Expected
}
}
func TestEventBus_NonBlockingPublish(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
// Create subscriber but don't read from channel
_ = eb.Subscribe("tenant-a")
// Fill the channel buffer (100 events)
for i := 0; i < 150; i++ {
event := &Event{
ID: "evt",
EventType: "Test",
ActorID: "actor-1",
}
// Should not block even when channel is full
eb.Publish("tenant-a", event)
}
// If we got here without blocking, test passes
}
func TestEventBus_ConcurrentOperations(t *testing.T) {
eb := NewEventBus()
defer eb.Stop()
var wg sync.WaitGroup
// Concurrent subscriptions
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
ch := eb.Subscribe("tenant-a")
time.Sleep(10 * time.Millisecond)
eb.Unsubscribe("tenant-a", ch)
}(i)
}
// Concurrent wildcard subscriptions
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
ch := eb.Subscribe("*")
time.Sleep(10 * time.Millisecond)
eb.Unsubscribe("*", ch)
}(i)
}
// Concurrent publishes
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
event := &Event{
ID: "evt",
EventType: "Test",
ActorID: "actor-1",
}
eb.Publish("tenant-a", event)
}(i)
}
wg.Wait()
}