package aether_test import ( "sync" "testing" "time" "git.flowmade.one/flowmade-one/aether" ) func TestMetricsCollector_InitialState(t *testing.T) { mc := aether.NewMetricsCollector() if got := mc.EventsPublished("test-ns"); got != 0 { t.Errorf("EventsPublished() = %d, want 0", got) } if got := mc.EventsReceived("test-ns"); got != 0 { t.Errorf("EventsReceived() = %d, want 0", got) } if got := mc.ActiveSubscriptions("test-ns"); got != 0 { t.Errorf("ActiveSubscriptions() = %d, want 0", got) } if got := mc.TotalActiveSubscriptions(); got != 0 { t.Errorf("TotalActiveSubscriptions() = %d, want 0", got) } if got := mc.PublishErrors("test-ns"); got != 0 { t.Errorf("PublishErrors() = %d, want 0", got) } if got := mc.SubscribeErrors("test-ns"); got != 0 { t.Errorf("SubscribeErrors() = %d, want 0", got) } if got := mc.DroppedEvents("test-ns"); got != 0 { t.Errorf("DroppedEvents() = %d, want 0", got) } if got := len(mc.Namespaces()); got != 0 { t.Errorf("Namespaces() = %d, want 0", got) } } func TestMetricsCollector_RecordPublish(t *testing.T) { mc := aether.NewMetricsCollector() mc.RecordPublish("ns1") mc.RecordPublish("ns1") mc.RecordPublish("ns2") if got := mc.EventsPublished("ns1"); got != 2 { t.Errorf("EventsPublished(ns1) = %d, want 2", got) } if got := mc.EventsPublished("ns2"); got != 1 { t.Errorf("EventsPublished(ns2) = %d, want 1", got) } } func TestMetricsCollector_RecordReceive(t *testing.T) { mc := aether.NewMetricsCollector() mc.RecordReceive("ns1") mc.RecordReceive("ns1") mc.RecordReceive("ns1") if got := mc.EventsReceived("ns1"); got != 3 { t.Errorf("EventsReceived(ns1) = %d, want 3", got) } } func TestMetricsCollector_Subscriptions(t *testing.T) { mc := aether.NewMetricsCollector() mc.RecordSubscribe("ns1") mc.RecordSubscribe("ns1") mc.RecordSubscribe("ns2") if got := mc.ActiveSubscriptions("ns1"); got != 2 { t.Errorf("ActiveSubscriptions(ns1) = %d, want 2", got) } if got := mc.ActiveSubscriptions("ns2"); got != 1 { t.Errorf("ActiveSubscriptions(ns2) = %d, want 1", got) } if got := mc.TotalActiveSubscriptions(); got != 3 { t.Errorf("TotalActiveSubscriptions() = %d, want 3", got) } mc.RecordUnsubscribe("ns1") if got := mc.ActiveSubscriptions("ns1"); got != 1 { t.Errorf("ActiveSubscriptions(ns1) after unsubscribe = %d, want 1", got) } if got := mc.TotalActiveSubscriptions(); got != 2 { t.Errorf("TotalActiveSubscriptions() after unsubscribe = %d, want 2", got) } } func TestMetricsCollector_Errors(t *testing.T) { mc := aether.NewMetricsCollector() mc.RecordPublishError("ns1") mc.RecordPublishError("ns1") mc.RecordSubscribeError("ns1") mc.RecordDroppedEvent("ns1") mc.RecordDroppedEvent("ns1") mc.RecordDroppedEvent("ns1") if got := mc.PublishErrors("ns1"); got != 2 { t.Errorf("PublishErrors(ns1) = %d, want 2", got) } if got := mc.SubscribeErrors("ns1"); got != 1 { t.Errorf("SubscribeErrors(ns1) = %d, want 1", got) } if got := mc.DroppedEvents("ns1"); got != 3 { t.Errorf("DroppedEvents(ns1) = %d, want 3", got) } } func TestMetricsCollector_Namespaces(t *testing.T) { mc := aether.NewMetricsCollector() mc.RecordPublish("ns1") mc.RecordReceive("ns2") mc.RecordSubscribe("ns3") namespaces := mc.Namespaces() if len(namespaces) != 3 { t.Errorf("Namespaces() length = %d, want 3", len(namespaces)) } nsMap := make(map[string]bool) for _, ns := range namespaces { nsMap[ns] = true } for _, expected := range []string{"ns1", "ns2", "ns3"} { if !nsMap[expected] { t.Errorf("Namespaces() missing %q", expected) } } } func TestMetricsCollector_Reset(t *testing.T) { mc := aether.NewMetricsCollector() mc.RecordPublish("ns1") mc.RecordReceive("ns1") mc.RecordSubscribe("ns1") mc.Reset() if got := mc.EventsPublished("ns1"); got != 0 { t.Errorf("EventsPublished() after reset = %d, want 0", got) } if got := len(mc.Namespaces()); got != 0 { t.Errorf("Namespaces() after reset = %d, want 0", got) } } func TestMetricsCollector_ConcurrentAccess(t *testing.T) { mc := aether.NewMetricsCollector() const goroutines = 10 const iterations = 100 var wg sync.WaitGroup wg.Add(goroutines) for i := 0; i < goroutines; i++ { go func() { defer wg.Done() for j := 0; j < iterations; j++ { mc.RecordPublish("concurrent-ns") mc.RecordReceive("concurrent-ns") mc.RecordSubscribe("concurrent-ns") mc.RecordUnsubscribe("concurrent-ns") mc.RecordPublishError("concurrent-ns") mc.RecordSubscribeError("concurrent-ns") mc.RecordDroppedEvent("concurrent-ns") } }() } wg.Wait() expected := int64(goroutines * iterations) if got := mc.EventsPublished("concurrent-ns"); got != expected { t.Errorf("EventsPublished() = %d, want %d", got, expected) } if got := mc.EventsReceived("concurrent-ns"); got != expected { t.Errorf("EventsReceived() = %d, want %d", got, expected) } if got := mc.ActiveSubscriptions("concurrent-ns"); got != 0 { t.Errorf("ActiveSubscriptions() = %d, want 0 (subscribed and unsubscribed same amount)", got) } if got := mc.PublishErrors("concurrent-ns"); got != expected { t.Errorf("PublishErrors() = %d, want %d", got, expected) } if got := mc.SubscribeErrors("concurrent-ns"); got != expected { t.Errorf("SubscribeErrors() = %d, want %d", got, expected) } if got := mc.DroppedEvents("concurrent-ns"); got != expected { t.Errorf("DroppedEvents() = %d, want %d", got, expected) } } func TestEventBus_Metrics(t *testing.T) { eb := aether.NewEventBus() defer eb.Stop() metrics := eb.Metrics() if metrics == nil { t.Fatal("Metrics() returned nil") } // Subscribe and verify metrics ch := eb.Subscribe("test-ns") if got := metrics.ActiveSubscriptions("test-ns"); got != 1 { t.Errorf("ActiveSubscriptions() after subscribe = %d, want 1", got) } // Publish and verify metrics event := &aether.Event{ ID: "test-1", EventType: "TestEvent", ActorID: "actor-1", Version: 1, } eb.Publish("test-ns", event) // Wait for event delivery select { case <-ch: case <-time.After(100 * time.Millisecond): t.Fatal("timeout waiting for event") } if got := metrics.EventsPublished("test-ns"); got != 1 { t.Errorf("EventsPublished() after publish = %d, want 1", got) } if got := metrics.EventsReceived("test-ns"); got != 1 { t.Errorf("EventsReceived() after publish = %d, want 1", got) } // Unsubscribe and verify metrics eb.Unsubscribe("test-ns", ch) if got := metrics.ActiveSubscriptions("test-ns"); got != 0 { t.Errorf("ActiveSubscriptions() after unsubscribe = %d, want 0", got) } } func TestEventBus_DroppedEvents(t *testing.T) { eb := aether.NewEventBus() defer eb.Stop() metrics := eb.Metrics() // Subscribe but don't read from channel _ = eb.Subscribe("test-ns") // Fill the channel buffer (default is 100) for i := 0; i < 100; i++ { eb.Publish("test-ns", &aether.Event{ ID: "fill-" + string(rune(i)), EventType: "FillEvent", }) } // Next publish should be dropped eb.Publish("test-ns", &aether.Event{ ID: "dropped", EventType: "DroppedEvent", }) if got := metrics.DroppedEvents("test-ns"); got != 1 { t.Errorf("DroppedEvents() = %d, want 1", got) } } func TestEventBus_MetricsProvider(t *testing.T) { eb := aether.NewEventBus() defer eb.Stop() // Verify EventBus implements MetricsProvider var mp aether.MetricsProvider = eb if mp.Metrics() == nil { t.Error("EventBus.Metrics() returned nil") } } func TestEventBus_StopClearsSubscriptionMetrics(t *testing.T) { eb := aether.NewEventBus() metrics := eb.Metrics() _ = eb.Subscribe("ns1") _ = eb.Subscribe("ns1") _ = eb.Subscribe("ns2") if got := metrics.TotalActiveSubscriptions(); got != 3 { t.Errorf("TotalActiveSubscriptions() before stop = %d, want 3", got) } eb.Stop() if got := metrics.TotalActiveSubscriptions(); got != 0 { t.Errorf("TotalActiveSubscriptions() after stop = %d, want 0", got) } }