diff --git a/eventbus.go b/eventbus.go index 161bc1b..bf5db63 100644 --- a/eventbus.go +++ b/eventbus.go @@ -14,12 +14,20 @@ type EventBroadcaster interface { SubscriberCount(namespaceID string) int } +// MetricsProvider is an optional interface that EventBroadcaster implementations +// can implement to expose metrics. +type MetricsProvider interface { + // Metrics returns the metrics collector for this broadcaster. + Metrics() BroadcasterMetrics +} + // EventBus broadcasts events to multiple subscribers within a namespace type EventBus struct { subscribers map[string][]chan *Event // namespaceID -> channels mutex sync.RWMutex ctx context.Context cancel context.CancelFunc + metrics *DefaultMetricsCollector } // NewEventBus creates a new event bus @@ -29,9 +37,15 @@ func NewEventBus() *EventBus { subscribers: make(map[string][]chan *Event), ctx: ctx, cancel: cancel, + metrics: NewMetricsCollector(), } } +// Metrics returns the metrics collector for this event bus. +func (eb *EventBus) Metrics() BroadcasterMetrics { + return eb.metrics +} + // Subscribe creates a new subscription channel for a namespace func (eb *EventBus) Subscribe(namespaceID string) <-chan *Event { eb.mutex.Lock() @@ -41,6 +55,9 @@ func (eb *EventBus) Subscribe(namespaceID string) <-chan *Event { ch := make(chan *Event, 100) eb.subscribers[namespaceID] = append(eb.subscribers[namespaceID], ch) + // Record subscription metric + eb.metrics.RecordSubscribe(namespaceID) + return ch } @@ -55,6 +72,9 @@ func (eb *EventBus) Unsubscribe(namespaceID string, ch <-chan *Event) { // Remove channel from slice eb.subscribers[namespaceID] = append(subs[:i], subs[i+1:]...) 
close(subscriber) + + // Record unsubscription metric + eb.metrics.RecordUnsubscribe(namespaceID) break } } @@ -70,13 +90,18 @@ func (eb *EventBus) Publish(namespaceID string, event *Event) { eb.mutex.RLock() defer eb.mutex.RUnlock() + // Record publish metric + eb.metrics.RecordPublish(namespaceID) + subscribers := eb.subscribers[namespaceID] for _, ch := range subscribers { select { case ch <- event: - // Event delivered + // Event delivered - record receive metric + eb.metrics.RecordReceive(namespaceID) default: // Channel full, skip this subscriber (non-blocking) + eb.metrics.RecordDroppedEvent(namespaceID) } } } @@ -88,8 +113,11 @@ func (eb *EventBus) Stop() { eb.cancel() - // Close all subscriber channels - for _, subs := range eb.subscribers { + // Close all subscriber channels and update metrics + for namespaceID, subs := range eb.subscribers { + for range subs { + eb.metrics.RecordUnsubscribe(namespaceID) + } for _, ch := range subs { close(ch) } diff --git a/go.mod b/go.mod index 1181588..8462de1 100644 --- a/go.mod +++ b/go.mod @@ -1,16 +1,26 @@ module git.flowmade.one/flowmade-one/aether -go 1.23 +go 1.23.0 require ( github.com/google/uuid v1.6.0 github.com/nats-io/nats.go v1.37.0 + github.com/prometheus/client_golang v1.23.2 ) require ( - github.com/klauspost/compress v1.17.2 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect golang.org/x/crypto v0.18.0 // indirect - golang.org/x/sys v0.16.0 // indirect + golang.org/x/sys v0.35.0 // indirect + 
google.golang.org/protobuf v1.36.8 // indirect ) diff --git a/go.sum b/go.sum index 8697b29..fd52974 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,54 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= 
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.yaml.in/yaml/v2 v2.4.2 
h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..ca254d8 --- /dev/null +++ b/metrics.go @@ -0,0 +1,258 @@ +package aether + +import ( + "sync" + "sync/atomic" +) + +// BroadcasterMetrics provides observability metrics for EventBroadcaster implementations. +// All methods are safe for concurrent use. +type BroadcasterMetrics interface { + // EventsPublished returns the total number of events published per namespace. + EventsPublished(namespaceID string) int64 + + // EventsReceived returns the total number of events received per namespace. + // For EventBus this equals events delivered to subscribers. + // For NATSEventBus this includes events received from NATS. 
+ EventsReceived(namespaceID string) int64 + + // ActiveSubscriptions returns the current number of active subscriptions per namespace. + ActiveSubscriptions(namespaceID string) int64 + + // TotalActiveSubscriptions returns the total number of active subscriptions across all namespaces. + TotalActiveSubscriptions() int64 + + // PublishErrors returns the total number of publish errors per namespace. + PublishErrors(namespaceID string) int64 + + // SubscribeErrors returns the total number of subscribe errors per namespace. + SubscribeErrors(namespaceID string) int64 + + // DroppedEvents returns the total number of events dropped (e.g., full channel) per namespace. + DroppedEvents(namespaceID string) int64 + + // Namespaces returns a list of all namespaces that have metrics. + Namespaces() []string + + // Reset resets all metrics. Useful for testing. + Reset() +} + +// MetricsCollector provides methods for collecting metrics. +// This interface is implemented internally and used by EventBus implementations. +type MetricsCollector interface { + BroadcasterMetrics + + // RecordPublish records a successful publish event. + RecordPublish(namespaceID string) + + // RecordReceive records a received event. + RecordReceive(namespaceID string) + + // RecordSubscribe records a new subscription. + RecordSubscribe(namespaceID string) + + // RecordUnsubscribe records a removed subscription. + RecordUnsubscribe(namespaceID string) + + // RecordPublishError records a publish error. + RecordPublishError(namespaceID string) + + // RecordSubscribeError records a subscribe error. + RecordSubscribeError(namespaceID string) + + // RecordDroppedEvent records a dropped event (e.g., channel full). + RecordDroppedEvent(namespaceID string) +} + +// namespaceMetrics holds counters for a single namespace. 
// The accessors and recorders in this file read and update every field through
// sync/atomic, so a counter can change without holding the collector's lock.
type namespaceMetrics struct {
	eventsPublished     int64 // total events published
	eventsReceived      int64 // total events received
	activeSubscriptions int64 // current subscriptions; goes up and down
	publishErrors       int64 // total publish errors
	subscribeErrors     int64 // total subscribe errors
	droppedEvents       int64 // total events dropped
}

// DefaultMetricsCollector is the default implementation of MetricsCollector.
//
// The namespaces map is guarded by mu. The per-namespace counters are updated
// with atomic operations, so mu is only held while an entry is looked up or
// inserted. The zero value is ready to use: the map is allocated on the first
// write.
type DefaultMetricsCollector struct {
	mu         sync.RWMutex
	namespaces map[string]*namespaceMetrics
}

// NewMetricsCollector creates a new DefaultMetricsCollector.
func NewMetricsCollector() *DefaultMetricsCollector {
	return &DefaultMetricsCollector{
		namespaces: make(map[string]*namespaceMetrics),
	}
}

// lookup returns the counters for namespaceID, or nil when nothing has been
// recorded for that namespace yet. It never creates an entry.
func (m *DefaultMetricsCollector) lookup(namespaceID string) *namespaceMetrics {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// Reading from a nil map is legal and yields the zero value (nil).
	return m.namespaces[namespaceID]
}

// getOrCreateNamespace returns metrics for a namespace, creating if needed.
func (m *DefaultMetricsCollector) getOrCreateNamespace(namespaceID string) *namespaceMetrics {
	// Fast path: an existing namespace only needs the read lock.
	if ns := m.lookup(namespaceID); ns != nil {
		return ns
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Check again under the write lock: another goroutine may have inserted
	// the entry after the read lock was released.
	if ns, exists := m.namespaces[namespaceID]; exists {
		return ns
	}

	// A collector that was not built by NewMetricsCollector starts with a nil
	// map, and writing to a nil map panics, so allocate it on first use.
	if m.namespaces == nil {
		m.namespaces = make(map[string]*namespaceMetrics)
	}

	ns := &namespaceMetrics{}
	m.namespaces[namespaceID] = ns
	return ns
}

// EventsPublished returns the total number of events published for a namespace.
// It returns 0 for a namespace that has no metrics.
func (m *DefaultMetricsCollector) EventsPublished(namespaceID string) int64 {
	ns := m.lookup(namespaceID)
	if ns == nil {
		return 0
	}
	return atomic.LoadInt64(&ns.eventsPublished)
}

// EventsReceived returns the total number of events received for a namespace.
// It returns 0 for a namespace that has no metrics.
func (m *DefaultMetricsCollector) EventsReceived(namespaceID string) int64 {
	ns := m.lookup(namespaceID)
	if ns == nil {
		return 0
	}
	return atomic.LoadInt64(&ns.eventsReceived)
}

// ActiveSubscriptions returns the current number of active subscriptions for a namespace.
// It returns 0 for a namespace that has no metrics.
func (m *DefaultMetricsCollector) ActiveSubscriptions(namespaceID string) int64 {
	ns := m.lookup(namespaceID)
	if ns == nil {
		return 0
	}
	return atomic.LoadInt64(&ns.activeSubscriptions)
}

// TotalActiveSubscriptions returns the total number of active subscriptions across all namespaces.
func (m *DefaultMetricsCollector) TotalActiveSubscriptions() int64 {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var total int64
	for _, ns := range m.namespaces {
		total += atomic.LoadInt64(&ns.activeSubscriptions)
	}
	return total
}

// PublishErrors returns the total number of publish errors for a namespace.
// It returns 0 for a namespace that has no metrics.
func (m *DefaultMetricsCollector) PublishErrors(namespaceID string) int64 {
	ns := m.lookup(namespaceID)
	if ns == nil {
		return 0
	}
	return atomic.LoadInt64(&ns.publishErrors)
}

// SubscribeErrors returns the total number of subscribe errors for a namespace.
// It returns 0 for a namespace that has no metrics.
func (m *DefaultMetricsCollector) SubscribeErrors(namespaceID string) int64 {
	ns := m.lookup(namespaceID)
	if ns == nil {
		return 0
	}
	return atomic.LoadInt64(&ns.subscribeErrors)
}

// DroppedEvents returns the total number of dropped events for a namespace.
// It returns 0 for a namespace that has no metrics.
func (m *DefaultMetricsCollector) DroppedEvents(namespaceID string) int64 {
	ns := m.lookup(namespaceID)
	if ns == nil {
		return 0
	}
	return atomic.LoadInt64(&ns.droppedEvents)
}

// Namespaces returns a list of all namespaces that have metrics.
// The order follows map iteration and is therefore unspecified.
func (m *DefaultMetricsCollector) Namespaces() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	namespaces := make([]string, 0, len(m.namespaces))
	for ns := range m.namespaces {
		namespaces = append(namespaces, ns)
	}
	return namespaces
}

// Reset resets all metrics.
+func (m *DefaultMetricsCollector) Reset() { + m.mu.Lock() + defer m.mu.Unlock() + m.namespaces = make(map[string]*namespaceMetrics) +} + +// RecordPublish records a successful publish event. +func (m *DefaultMetricsCollector) RecordPublish(namespaceID string) { + ns := m.getOrCreateNamespace(namespaceID) + atomic.AddInt64(&ns.eventsPublished, 1) +} + +// RecordReceive records a received event. +func (m *DefaultMetricsCollector) RecordReceive(namespaceID string) { + ns := m.getOrCreateNamespace(namespaceID) + atomic.AddInt64(&ns.eventsReceived, 1) +} + +// RecordSubscribe records a new subscription. +func (m *DefaultMetricsCollector) RecordSubscribe(namespaceID string) { + ns := m.getOrCreateNamespace(namespaceID) + atomic.AddInt64(&ns.activeSubscriptions, 1) +} + +// RecordUnsubscribe records a removed subscription. +func (m *DefaultMetricsCollector) RecordUnsubscribe(namespaceID string) { + ns := m.getOrCreateNamespace(namespaceID) + atomic.AddInt64(&ns.activeSubscriptions, -1) +} + +// RecordPublishError records a publish error. +func (m *DefaultMetricsCollector) RecordPublishError(namespaceID string) { + ns := m.getOrCreateNamespace(namespaceID) + atomic.AddInt64(&ns.publishErrors, 1) +} + +// RecordSubscribeError records a subscribe error. +func (m *DefaultMetricsCollector) RecordSubscribeError(namespaceID string) { + ns := m.getOrCreateNamespace(namespaceID) + atomic.AddInt64(&ns.subscribeErrors, 1) +} + +// RecordDroppedEvent records a dropped event. +func (m *DefaultMetricsCollector) RecordDroppedEvent(namespaceID string) { + ns := m.getOrCreateNamespace(namespaceID) + atomic.AddInt64(&ns.droppedEvents, 1) +} diff --git a/metrics_prometheus.go b/metrics_prometheus.go new file mode 100644 index 0000000..0d921e7 --- /dev/null +++ b/metrics_prometheus.go @@ -0,0 +1,123 @@ +package aether + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// PrometheusMetricsAdapter exposes BroadcasterMetrics as Prometheus metrics. 
+// It implements prometheus.Collector and can be registered with a Prometheus registry. +type PrometheusMetricsAdapter struct { + metrics BroadcasterMetrics + + eventsPublishedDesc *prometheus.Desc + eventsReceivedDesc *prometheus.Desc + activeSubscriptionsDesc *prometheus.Desc + publishErrorsDesc *prometheus.Desc + subscribeErrorsDesc *prometheus.Desc + droppedEventsDesc *prometheus.Desc +} + +// NewPrometheusMetricsAdapter creates a new PrometheusMetricsAdapter that wraps +// a BroadcasterMetrics implementation and exposes it as Prometheus metrics. +// +// The adapter implements prometheus.Collector and should be registered with +// a Prometheus registry: +// +// eb := aether.NewEventBus() +// adapter := aether.NewPrometheusMetricsAdapter(eb.Metrics()) +// prometheus.MustRegister(adapter) +func NewPrometheusMetricsAdapter(metrics BroadcasterMetrics) *PrometheusMetricsAdapter { + return &PrometheusMetricsAdapter{ + metrics: metrics, + eventsPublishedDesc: prometheus.NewDesc( + "aether_events_published_total", + "Total number of events published per namespace", + []string{"namespace"}, + nil, + ), + eventsReceivedDesc: prometheus.NewDesc( + "aether_events_received_total", + "Total number of events received per namespace", + []string{"namespace"}, + nil, + ), + activeSubscriptionsDesc: prometheus.NewDesc( + "aether_active_subscriptions", + "Number of active subscriptions per namespace", + []string{"namespace"}, + nil, + ), + publishErrorsDesc: prometheus.NewDesc( + "aether_publish_errors_total", + "Total number of publish errors per namespace", + []string{"namespace"}, + nil, + ), + subscribeErrorsDesc: prometheus.NewDesc( + "aether_subscribe_errors_total", + "Total number of subscribe errors per namespace", + []string{"namespace"}, + nil, + ), + droppedEventsDesc: prometheus.NewDesc( + "aether_dropped_events_total", + "Total number of dropped events per namespace", + []string{"namespace"}, + nil, + ), + } +} + +// Describe implements prometheus.Collector. 
+func (a *PrometheusMetricsAdapter) Describe(ch chan<- *prometheus.Desc) { + ch <- a.eventsPublishedDesc + ch <- a.eventsReceivedDesc + ch <- a.activeSubscriptionsDesc + ch <- a.publishErrorsDesc + ch <- a.subscribeErrorsDesc + ch <- a.droppedEventsDesc +} + +// Collect implements prometheus.Collector. +func (a *PrometheusMetricsAdapter) Collect(ch chan<- prometheus.Metric) { + namespaces := a.metrics.Namespaces() + + for _, ns := range namespaces { + ch <- prometheus.MustNewConstMetric( + a.eventsPublishedDesc, + prometheus.CounterValue, + float64(a.metrics.EventsPublished(ns)), + ns, + ) + ch <- prometheus.MustNewConstMetric( + a.eventsReceivedDesc, + prometheus.CounterValue, + float64(a.metrics.EventsReceived(ns)), + ns, + ) + ch <- prometheus.MustNewConstMetric( + a.activeSubscriptionsDesc, + prometheus.GaugeValue, + float64(a.metrics.ActiveSubscriptions(ns)), + ns, + ) + ch <- prometheus.MustNewConstMetric( + a.publishErrorsDesc, + prometheus.CounterValue, + float64(a.metrics.PublishErrors(ns)), + ns, + ) + ch <- prometheus.MustNewConstMetric( + a.subscribeErrorsDesc, + prometheus.CounterValue, + float64(a.metrics.SubscribeErrors(ns)), + ns, + ) + ch <- prometheus.MustNewConstMetric( + a.droppedEventsDesc, + prometheus.CounterValue, + float64(a.metrics.DroppedEvents(ns)), + ns, + ) + } +} diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 0000000..42c8366 --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,304 @@ +package aether_test + +import ( + "sync" + "testing" + "time" + + "git.flowmade.one/flowmade-one/aether" +) + +func TestMetricsCollector_InitialState(t *testing.T) { + mc := aether.NewMetricsCollector() + + if got := mc.EventsPublished("test-ns"); got != 0 { + t.Errorf("EventsPublished() = %d, want 0", got) + } + if got := mc.EventsReceived("test-ns"); got != 0 { + t.Errorf("EventsReceived() = %d, want 0", got) + } + if got := mc.ActiveSubscriptions("test-ns"); got != 0 { + t.Errorf("ActiveSubscriptions() = %d, want 0", got) + } + 
if got := mc.TotalActiveSubscriptions(); got != 0 { + t.Errorf("TotalActiveSubscriptions() = %d, want 0", got) + } + if got := mc.PublishErrors("test-ns"); got != 0 { + t.Errorf("PublishErrors() = %d, want 0", got) + } + if got := mc.SubscribeErrors("test-ns"); got != 0 { + t.Errorf("SubscribeErrors() = %d, want 0", got) + } + if got := mc.DroppedEvents("test-ns"); got != 0 { + t.Errorf("DroppedEvents() = %d, want 0", got) + } + if got := len(mc.Namespaces()); got != 0 { + t.Errorf("Namespaces() = %d, want 0", got) + } +} + +func TestMetricsCollector_RecordPublish(t *testing.T) { + mc := aether.NewMetricsCollector() + + mc.RecordPublish("ns1") + mc.RecordPublish("ns1") + mc.RecordPublish("ns2") + + if got := mc.EventsPublished("ns1"); got != 2 { + t.Errorf("EventsPublished(ns1) = %d, want 2", got) + } + if got := mc.EventsPublished("ns2"); got != 1 { + t.Errorf("EventsPublished(ns2) = %d, want 1", got) + } +} + +func TestMetricsCollector_RecordReceive(t *testing.T) { + mc := aether.NewMetricsCollector() + + mc.RecordReceive("ns1") + mc.RecordReceive("ns1") + mc.RecordReceive("ns1") + + if got := mc.EventsReceived("ns1"); got != 3 { + t.Errorf("EventsReceived(ns1) = %d, want 3", got) + } +} + +func TestMetricsCollector_Subscriptions(t *testing.T) { + mc := aether.NewMetricsCollector() + + mc.RecordSubscribe("ns1") + mc.RecordSubscribe("ns1") + mc.RecordSubscribe("ns2") + + if got := mc.ActiveSubscriptions("ns1"); got != 2 { + t.Errorf("ActiveSubscriptions(ns1) = %d, want 2", got) + } + if got := mc.ActiveSubscriptions("ns2"); got != 1 { + t.Errorf("ActiveSubscriptions(ns2) = %d, want 1", got) + } + if got := mc.TotalActiveSubscriptions(); got != 3 { + t.Errorf("TotalActiveSubscriptions() = %d, want 3", got) + } + + mc.RecordUnsubscribe("ns1") + + if got := mc.ActiveSubscriptions("ns1"); got != 1 { + t.Errorf("ActiveSubscriptions(ns1) after unsubscribe = %d, want 1", got) + } + if got := mc.TotalActiveSubscriptions(); got != 2 { + t.Errorf("TotalActiveSubscriptions() 
after unsubscribe = %d, want 2", got) + } +} + +func TestMetricsCollector_Errors(t *testing.T) { + mc := aether.NewMetricsCollector() + + mc.RecordPublishError("ns1") + mc.RecordPublishError("ns1") + mc.RecordSubscribeError("ns1") + mc.RecordDroppedEvent("ns1") + mc.RecordDroppedEvent("ns1") + mc.RecordDroppedEvent("ns1") + + if got := mc.PublishErrors("ns1"); got != 2 { + t.Errorf("PublishErrors(ns1) = %d, want 2", got) + } + if got := mc.SubscribeErrors("ns1"); got != 1 { + t.Errorf("SubscribeErrors(ns1) = %d, want 1", got) + } + if got := mc.DroppedEvents("ns1"); got != 3 { + t.Errorf("DroppedEvents(ns1) = %d, want 3", got) + } +} + +func TestMetricsCollector_Namespaces(t *testing.T) { + mc := aether.NewMetricsCollector() + + mc.RecordPublish("ns1") + mc.RecordReceive("ns2") + mc.RecordSubscribe("ns3") + + namespaces := mc.Namespaces() + if len(namespaces) != 3 { + t.Errorf("Namespaces() length = %d, want 3", len(namespaces)) + } + + nsMap := make(map[string]bool) + for _, ns := range namespaces { + nsMap[ns] = true + } + + for _, expected := range []string{"ns1", "ns2", "ns3"} { + if !nsMap[expected] { + t.Errorf("Namespaces() missing %q", expected) + } + } +} + +func TestMetricsCollector_Reset(t *testing.T) { + mc := aether.NewMetricsCollector() + + mc.RecordPublish("ns1") + mc.RecordReceive("ns1") + mc.RecordSubscribe("ns1") + + mc.Reset() + + if got := mc.EventsPublished("ns1"); got != 0 { + t.Errorf("EventsPublished() after reset = %d, want 0", got) + } + if got := len(mc.Namespaces()); got != 0 { + t.Errorf("Namespaces() after reset = %d, want 0", got) + } +} + +func TestMetricsCollector_ConcurrentAccess(t *testing.T) { + mc := aether.NewMetricsCollector() + const goroutines = 10 + const iterations = 100 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < iterations; j++ { + mc.RecordPublish("concurrent-ns") + mc.RecordReceive("concurrent-ns") + 
mc.RecordSubscribe("concurrent-ns") + mc.RecordUnsubscribe("concurrent-ns") + mc.RecordPublishError("concurrent-ns") + mc.RecordSubscribeError("concurrent-ns") + mc.RecordDroppedEvent("concurrent-ns") + } + }() + } + + wg.Wait() + + expected := int64(goroutines * iterations) + + if got := mc.EventsPublished("concurrent-ns"); got != expected { + t.Errorf("EventsPublished() = %d, want %d", got, expected) + } + if got := mc.EventsReceived("concurrent-ns"); got != expected { + t.Errorf("EventsReceived() = %d, want %d", got, expected) + } + if got := mc.ActiveSubscriptions("concurrent-ns"); got != 0 { + t.Errorf("ActiveSubscriptions() = %d, want 0 (subscribed and unsubscribed same amount)", got) + } + if got := mc.PublishErrors("concurrent-ns"); got != expected { + t.Errorf("PublishErrors() = %d, want %d", got, expected) + } + if got := mc.SubscribeErrors("concurrent-ns"); got != expected { + t.Errorf("SubscribeErrors() = %d, want %d", got, expected) + } + if got := mc.DroppedEvents("concurrent-ns"); got != expected { + t.Errorf("DroppedEvents() = %d, want %d", got, expected) + } +} + +func TestEventBus_Metrics(t *testing.T) { + eb := aether.NewEventBus() + defer eb.Stop() + + metrics := eb.Metrics() + if metrics == nil { + t.Fatal("Metrics() returned nil") + } + + // Subscribe and verify metrics + ch := eb.Subscribe("test-ns") + if got := metrics.ActiveSubscriptions("test-ns"); got != 1 { + t.Errorf("ActiveSubscriptions() after subscribe = %d, want 1", got) + } + + // Publish and verify metrics + event := &aether.Event{ + ID: "test-1", + EventType: "TestEvent", + ActorID: "actor-1", + Version: 1, + } + eb.Publish("test-ns", event) + + // Wait for event delivery + select { + case <-ch: + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout waiting for event") + } + + if got := metrics.EventsPublished("test-ns"); got != 1 { + t.Errorf("EventsPublished() after publish = %d, want 1", got) + } + if got := metrics.EventsReceived("test-ns"); got != 1 { + 
t.Errorf("EventsReceived() after publish = %d, want 1", got) + } + + // Unsubscribe and verify metrics + eb.Unsubscribe("test-ns", ch) + if got := metrics.ActiveSubscriptions("test-ns"); got != 0 { + t.Errorf("ActiveSubscriptions() after unsubscribe = %d, want 0", got) + } +} + +func TestEventBus_DroppedEvents(t *testing.T) { + eb := aether.NewEventBus() + defer eb.Stop() + + metrics := eb.Metrics() + + // Subscribe but don't read from channel + _ = eb.Subscribe("test-ns") + + // Fill the channel buffer (default is 100) + for i := 0; i < 100; i++ { + eb.Publish("test-ns", &aether.Event{ + ID: "fill-" + string(rune(i)), + EventType: "FillEvent", + }) + } + + // Next publish should be dropped + eb.Publish("test-ns", &aether.Event{ + ID: "dropped", + EventType: "DroppedEvent", + }) + + if got := metrics.DroppedEvents("test-ns"); got != 1 { + t.Errorf("DroppedEvents() = %d, want 1", got) + } +} + +func TestEventBus_MetricsProvider(t *testing.T) { + eb := aether.NewEventBus() + defer eb.Stop() + + // Verify EventBus implements MetricsProvider + var mp aether.MetricsProvider = eb + if mp.Metrics() == nil { + t.Error("EventBus.Metrics() returned nil") + } +} + +func TestEventBus_StopClearsSubscriptionMetrics(t *testing.T) { + eb := aether.NewEventBus() + metrics := eb.Metrics() + + _ = eb.Subscribe("ns1") + _ = eb.Subscribe("ns1") + _ = eb.Subscribe("ns2") + + if got := metrics.TotalActiveSubscriptions(); got != 3 { + t.Errorf("TotalActiveSubscriptions() before stop = %d, want 3", got) + } + + eb.Stop() + + if got := metrics.TotalActiveSubscriptions(); got != 0 { + t.Errorf("TotalActiveSubscriptions() after stop = %d, want 0", got) + } +} diff --git a/nats_eventbus.go b/nats_eventbus.go index a13653e..c5b1e49 100644 --- a/nats_eventbus.go +++ b/nats_eventbus.go @@ -66,6 +66,8 @@ func (neb *NATSEventBus) Subscribe(namespaceID string) <-chan *Event { }) if err != nil { log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err) + // Record 
subscription error + neb.metrics.RecordSubscribeError(namespaceID) } else { neb.subscriptions = append(neb.subscriptions, sub) log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject) @@ -109,13 +111,16 @@ func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) { return } + // Record receive from NATS (cross-node event) + neb.metrics.RecordReceive(eventMsg.NamespaceID) + // Forward to local EventBus subscribers neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event) } // Publish publishes an event both locally and to NATS for cross-node broadcasting func (neb *NATSEventBus) Publish(namespaceID string, event *Event) { - // First publish locally + // First publish locally (this also records metrics) neb.EventBus.Publish(namespaceID, event) // Then publish to NATS for other nodes @@ -130,11 +135,13 @@ func (neb *NATSEventBus) Publish(namespaceID string, event *Event) { data, err := json.Marshal(eventMsg) if err != nil { log.Printf("[NATSEventBus] Failed to marshal event for NATS: %v", err) + neb.metrics.RecordPublishError(namespaceID) return } if err := neb.nc.Publish(subject, data); err != nil { log.Printf("[NATSEventBus] Failed to publish event to NATS: %v", err) + neb.metrics.RecordPublishError(namespaceID) return } }