305 lines
7.8 KiB
Go
305 lines
7.8 KiB
Go
package aether_test
|
|
|
|
import (
	"strconv"
	"sync"
	"testing"
	"time"

	"git.flowmade.one/flowmade-one/aether"
)
|
|
|
|
func TestMetricsCollector_InitialState(t *testing.T) {
|
|
mc := aether.NewMetricsCollector()
|
|
|
|
if got := mc.EventsPublished("test-ns"); got != 0 {
|
|
t.Errorf("EventsPublished() = %d, want 0", got)
|
|
}
|
|
if got := mc.EventsReceived("test-ns"); got != 0 {
|
|
t.Errorf("EventsReceived() = %d, want 0", got)
|
|
}
|
|
if got := mc.ActiveSubscriptions("test-ns"); got != 0 {
|
|
t.Errorf("ActiveSubscriptions() = %d, want 0", got)
|
|
}
|
|
if got := mc.TotalActiveSubscriptions(); got != 0 {
|
|
t.Errorf("TotalActiveSubscriptions() = %d, want 0", got)
|
|
}
|
|
if got := mc.PublishErrors("test-ns"); got != 0 {
|
|
t.Errorf("PublishErrors() = %d, want 0", got)
|
|
}
|
|
if got := mc.SubscribeErrors("test-ns"); got != 0 {
|
|
t.Errorf("SubscribeErrors() = %d, want 0", got)
|
|
}
|
|
if got := mc.DroppedEvents("test-ns"); got != 0 {
|
|
t.Errorf("DroppedEvents() = %d, want 0", got)
|
|
}
|
|
if got := len(mc.Namespaces()); got != 0 {
|
|
t.Errorf("Namespaces() = %d, want 0", got)
|
|
}
|
|
}
|
|
|
|
func TestMetricsCollector_RecordPublish(t *testing.T) {
|
|
mc := aether.NewMetricsCollector()
|
|
|
|
mc.RecordPublish("ns1")
|
|
mc.RecordPublish("ns1")
|
|
mc.RecordPublish("ns2")
|
|
|
|
if got := mc.EventsPublished("ns1"); got != 2 {
|
|
t.Errorf("EventsPublished(ns1) = %d, want 2", got)
|
|
}
|
|
if got := mc.EventsPublished("ns2"); got != 1 {
|
|
t.Errorf("EventsPublished(ns2) = %d, want 1", got)
|
|
}
|
|
}
|
|
|
|
func TestMetricsCollector_RecordReceive(t *testing.T) {
|
|
mc := aether.NewMetricsCollector()
|
|
|
|
mc.RecordReceive("ns1")
|
|
mc.RecordReceive("ns1")
|
|
mc.RecordReceive("ns1")
|
|
|
|
if got := mc.EventsReceived("ns1"); got != 3 {
|
|
t.Errorf("EventsReceived(ns1) = %d, want 3", got)
|
|
}
|
|
}
|
|
|
|
func TestMetricsCollector_Subscriptions(t *testing.T) {
|
|
mc := aether.NewMetricsCollector()
|
|
|
|
mc.RecordSubscribe("ns1")
|
|
mc.RecordSubscribe("ns1")
|
|
mc.RecordSubscribe("ns2")
|
|
|
|
if got := mc.ActiveSubscriptions("ns1"); got != 2 {
|
|
t.Errorf("ActiveSubscriptions(ns1) = %d, want 2", got)
|
|
}
|
|
if got := mc.ActiveSubscriptions("ns2"); got != 1 {
|
|
t.Errorf("ActiveSubscriptions(ns2) = %d, want 1", got)
|
|
}
|
|
if got := mc.TotalActiveSubscriptions(); got != 3 {
|
|
t.Errorf("TotalActiveSubscriptions() = %d, want 3", got)
|
|
}
|
|
|
|
mc.RecordUnsubscribe("ns1")
|
|
|
|
if got := mc.ActiveSubscriptions("ns1"); got != 1 {
|
|
t.Errorf("ActiveSubscriptions(ns1) after unsubscribe = %d, want 1", got)
|
|
}
|
|
if got := mc.TotalActiveSubscriptions(); got != 2 {
|
|
t.Errorf("TotalActiveSubscriptions() after unsubscribe = %d, want 2", got)
|
|
}
|
|
}
|
|
|
|
func TestMetricsCollector_Errors(t *testing.T) {
|
|
mc := aether.NewMetricsCollector()
|
|
|
|
mc.RecordPublishError("ns1")
|
|
mc.RecordPublishError("ns1")
|
|
mc.RecordSubscribeError("ns1")
|
|
mc.RecordDroppedEvent("ns1")
|
|
mc.RecordDroppedEvent("ns1")
|
|
mc.RecordDroppedEvent("ns1")
|
|
|
|
if got := mc.PublishErrors("ns1"); got != 2 {
|
|
t.Errorf("PublishErrors(ns1) = %d, want 2", got)
|
|
}
|
|
if got := mc.SubscribeErrors("ns1"); got != 1 {
|
|
t.Errorf("SubscribeErrors(ns1) = %d, want 1", got)
|
|
}
|
|
if got := mc.DroppedEvents("ns1"); got != 3 {
|
|
t.Errorf("DroppedEvents(ns1) = %d, want 3", got)
|
|
}
|
|
}
|
|
|
|
func TestMetricsCollector_Namespaces(t *testing.T) {
|
|
mc := aether.NewMetricsCollector()
|
|
|
|
mc.RecordPublish("ns1")
|
|
mc.RecordReceive("ns2")
|
|
mc.RecordSubscribe("ns3")
|
|
|
|
namespaces := mc.Namespaces()
|
|
if len(namespaces) != 3 {
|
|
t.Errorf("Namespaces() length = %d, want 3", len(namespaces))
|
|
}
|
|
|
|
nsMap := make(map[string]bool)
|
|
for _, ns := range namespaces {
|
|
nsMap[ns] = true
|
|
}
|
|
|
|
for _, expected := range []string{"ns1", "ns2", "ns3"} {
|
|
if !nsMap[expected] {
|
|
t.Errorf("Namespaces() missing %q", expected)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestMetricsCollector_Reset(t *testing.T) {
|
|
mc := aether.NewMetricsCollector()
|
|
|
|
mc.RecordPublish("ns1")
|
|
mc.RecordReceive("ns1")
|
|
mc.RecordSubscribe("ns1")
|
|
|
|
mc.Reset()
|
|
|
|
if got := mc.EventsPublished("ns1"); got != 0 {
|
|
t.Errorf("EventsPublished() after reset = %d, want 0", got)
|
|
}
|
|
if got := len(mc.Namespaces()); got != 0 {
|
|
t.Errorf("Namespaces() after reset = %d, want 0", got)
|
|
}
|
|
}
|
|
|
|
func TestMetricsCollector_ConcurrentAccess(t *testing.T) {
|
|
mc := aether.NewMetricsCollector()
|
|
const goroutines = 10
|
|
const iterations = 100
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(goroutines)
|
|
|
|
for i := 0; i < goroutines; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for j := 0; j < iterations; j++ {
|
|
mc.RecordPublish("concurrent-ns")
|
|
mc.RecordReceive("concurrent-ns")
|
|
mc.RecordSubscribe("concurrent-ns")
|
|
mc.RecordUnsubscribe("concurrent-ns")
|
|
mc.RecordPublishError("concurrent-ns")
|
|
mc.RecordSubscribeError("concurrent-ns")
|
|
mc.RecordDroppedEvent("concurrent-ns")
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
expected := int64(goroutines * iterations)
|
|
|
|
if got := mc.EventsPublished("concurrent-ns"); got != expected {
|
|
t.Errorf("EventsPublished() = %d, want %d", got, expected)
|
|
}
|
|
if got := mc.EventsReceived("concurrent-ns"); got != expected {
|
|
t.Errorf("EventsReceived() = %d, want %d", got, expected)
|
|
}
|
|
if got := mc.ActiveSubscriptions("concurrent-ns"); got != 0 {
|
|
t.Errorf("ActiveSubscriptions() = %d, want 0 (subscribed and unsubscribed same amount)", got)
|
|
}
|
|
if got := mc.PublishErrors("concurrent-ns"); got != expected {
|
|
t.Errorf("PublishErrors() = %d, want %d", got, expected)
|
|
}
|
|
if got := mc.SubscribeErrors("concurrent-ns"); got != expected {
|
|
t.Errorf("SubscribeErrors() = %d, want %d", got, expected)
|
|
}
|
|
if got := mc.DroppedEvents("concurrent-ns"); got != expected {
|
|
t.Errorf("DroppedEvents() = %d, want %d", got, expected)
|
|
}
|
|
}
|
|
|
|
func TestEventBus_Metrics(t *testing.T) {
|
|
eb := aether.NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
metrics := eb.Metrics()
|
|
if metrics == nil {
|
|
t.Fatal("Metrics() returned nil")
|
|
}
|
|
|
|
// Subscribe and verify metrics
|
|
ch := eb.Subscribe("test-ns")
|
|
if got := metrics.ActiveSubscriptions("test-ns"); got != 1 {
|
|
t.Errorf("ActiveSubscriptions() after subscribe = %d, want 1", got)
|
|
}
|
|
|
|
// Publish and verify metrics
|
|
event := &aether.Event{
|
|
ID: "test-1",
|
|
EventType: "TestEvent",
|
|
ActorID: "actor-1",
|
|
Version: 1,
|
|
}
|
|
eb.Publish("test-ns", event)
|
|
|
|
// Wait for event delivery
|
|
select {
|
|
case <-ch:
|
|
case <-time.After(100 * time.Millisecond):
|
|
t.Fatal("timeout waiting for event")
|
|
}
|
|
|
|
if got := metrics.EventsPublished("test-ns"); got != 1 {
|
|
t.Errorf("EventsPublished() after publish = %d, want 1", got)
|
|
}
|
|
if got := metrics.EventsReceived("test-ns"); got != 1 {
|
|
t.Errorf("EventsReceived() after publish = %d, want 1", got)
|
|
}
|
|
|
|
// Unsubscribe and verify metrics
|
|
eb.Unsubscribe("test-ns", ch)
|
|
if got := metrics.ActiveSubscriptions("test-ns"); got != 0 {
|
|
t.Errorf("ActiveSubscriptions() after unsubscribe = %d, want 0", got)
|
|
}
|
|
}
|
|
|
|
func TestEventBus_DroppedEvents(t *testing.T) {
|
|
eb := aether.NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
metrics := eb.Metrics()
|
|
|
|
// Subscribe but don't read from channel
|
|
_ = eb.Subscribe("test-ns")
|
|
|
|
// Fill the channel buffer (default is 100)
|
|
for i := 0; i < 100; i++ {
|
|
eb.Publish("test-ns", &aether.Event{
|
|
ID: "fill-" + string(rune(i)),
|
|
EventType: "FillEvent",
|
|
})
|
|
}
|
|
|
|
// Next publish should be dropped
|
|
eb.Publish("test-ns", &aether.Event{
|
|
ID: "dropped",
|
|
EventType: "DroppedEvent",
|
|
})
|
|
|
|
if got := metrics.DroppedEvents("test-ns"); got != 1 {
|
|
t.Errorf("DroppedEvents() = %d, want 1", got)
|
|
}
|
|
}
|
|
|
|
func TestEventBus_MetricsProvider(t *testing.T) {
|
|
eb := aether.NewEventBus()
|
|
defer eb.Stop()
|
|
|
|
// Verify EventBus implements MetricsProvider
|
|
var mp aether.MetricsProvider = eb
|
|
if mp.Metrics() == nil {
|
|
t.Error("EventBus.Metrics() returned nil")
|
|
}
|
|
}
|
|
|
|
func TestEventBus_StopClearsSubscriptionMetrics(t *testing.T) {
|
|
eb := aether.NewEventBus()
|
|
metrics := eb.Metrics()
|
|
|
|
_ = eb.Subscribe("ns1")
|
|
_ = eb.Subscribe("ns1")
|
|
_ = eb.Subscribe("ns2")
|
|
|
|
if got := metrics.TotalActiveSubscriptions(); got != 3 {
|
|
t.Errorf("TotalActiveSubscriptions() before stop = %d, want 3", got)
|
|
}
|
|
|
|
eb.Stop()
|
|
|
|
if got := metrics.TotalActiveSubscriptions(); got != 0 {
|
|
t.Errorf("TotalActiveSubscriptions() after stop = %d, want 0", got)
|
|
}
|
|
}
|