[Issue #22] Add EventBroadcaster metrics (#49)
All checks were successful
CI / build (push) Successful in 19s
All checks were successful
CI / build (push) Successful in 19s
This commit was merged in pull request #49.
This commit is contained in:
36
eventbus.go
36
eventbus.go
@@ -24,6 +24,13 @@ type EventBroadcaster interface {
|
|||||||
SubscriberCount(namespaceID string) int
|
SubscriberCount(namespaceID string) int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MetricsProvider is an optional interface that EventBroadcaster implementations
|
||||||
|
// can implement to expose metrics.
|
||||||
|
type MetricsProvider interface {
|
||||||
|
// Metrics returns the metrics collector for this broadcaster.
|
||||||
|
Metrics() BroadcasterMetrics
|
||||||
|
}
|
||||||
|
|
||||||
// subscription represents a single subscriber channel with its pattern
|
// subscription represents a single subscriber channel with its pattern
|
||||||
type subscription struct {
|
type subscription struct {
|
||||||
pattern string
|
pattern string
|
||||||
@@ -45,6 +52,7 @@ type EventBus struct {
|
|||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
metrics *DefaultMetricsCollector
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEventBus creates a new event bus
|
// NewEventBus creates a new event bus
|
||||||
@@ -55,9 +63,15 @@ func NewEventBus() *EventBus {
|
|||||||
wildcardSubscribers: make([]subscription, 0),
|
wildcardSubscribers: make([]subscription, 0),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
|
metrics: NewMetricsCollector(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Metrics returns the metrics collector for this event bus.
|
||||||
|
func (eb *EventBus) Metrics() BroadcasterMetrics {
|
||||||
|
return eb.metrics
|
||||||
|
}
|
||||||
|
|
||||||
// Subscribe creates a new subscription channel for a namespace pattern.
|
// Subscribe creates a new subscription channel for a namespace pattern.
|
||||||
// Patterns follow NATS subject matching conventions:
|
// Patterns follow NATS subject matching conventions:
|
||||||
// - "*" matches a single token (any sequence without ".")
|
// - "*" matches a single token (any sequence without ".")
|
||||||
@@ -84,6 +98,9 @@ func (eb *EventBus) Subscribe(namespacePattern string) <-chan *Event {
|
|||||||
eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], ch)
|
eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Record subscription metric
|
||||||
|
eb.metrics.RecordSubscribe(namespacePattern)
|
||||||
|
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -98,6 +115,8 @@ func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
|
|||||||
if sub.ch == ch {
|
if sub.ch == ch {
|
||||||
eb.wildcardSubscribers = append(eb.wildcardSubscribers[:i], eb.wildcardSubscribers[i+1:]...)
|
eb.wildcardSubscribers = append(eb.wildcardSubscribers[:i], eb.wildcardSubscribers[i+1:]...)
|
||||||
close(sub.ch)
|
close(sub.ch)
|
||||||
|
// Record unsubscription metric
|
||||||
|
eb.metrics.RecordUnsubscribe(namespacePattern)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -109,6 +128,8 @@ func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
|
|||||||
// Remove channel from slice
|
// Remove channel from slice
|
||||||
eb.exactSubscribers[namespacePattern] = append(subs[:i], subs[i+1:]...)
|
eb.exactSubscribers[namespacePattern] = append(subs[:i], subs[i+1:]...)
|
||||||
close(subscriber)
|
close(subscriber)
|
||||||
|
// Record unsubscription metric
|
||||||
|
eb.metrics.RecordUnsubscribe(namespacePattern)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -128,14 +149,19 @@ func (eb *EventBus) Publish(namespaceID string, event *Event) {
|
|||||||
eb.mutex.RLock()
|
eb.mutex.RLock()
|
||||||
defer eb.mutex.RUnlock()
|
defer eb.mutex.RUnlock()
|
||||||
|
|
||||||
|
// Record publish metric
|
||||||
|
eb.metrics.RecordPublish(namespaceID)
|
||||||
|
|
||||||
// Deliver to exact subscribers
|
// Deliver to exact subscribers
|
||||||
subscribers := eb.exactSubscribers[namespaceID]
|
subscribers := eb.exactSubscribers[namespaceID]
|
||||||
for _, ch := range subscribers {
|
for _, ch := range subscribers {
|
||||||
select {
|
select {
|
||||||
case ch <- event:
|
case ch <- event:
|
||||||
// Event delivered
|
// Event delivered
|
||||||
|
eb.metrics.RecordReceive(namespaceID)
|
||||||
default:
|
default:
|
||||||
// Channel full, skip this subscriber (non-blocking)
|
// Channel full, skip this subscriber (non-blocking)
|
||||||
|
eb.metrics.RecordDroppedEvent(namespaceID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,8 +171,10 @@ func (eb *EventBus) Publish(namespaceID string, event *Event) {
|
|||||||
select {
|
select {
|
||||||
case sub.ch <- event:
|
case sub.ch <- event:
|
||||||
// Event delivered
|
// Event delivered
|
||||||
|
eb.metrics.RecordReceive(namespaceID)
|
||||||
default:
|
default:
|
||||||
// Channel full, skip this subscriber (non-blocking)
|
// Channel full, skip this subscriber (non-blocking)
|
||||||
|
eb.metrics.RecordDroppedEvent(namespaceID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -159,16 +187,18 @@ func (eb *EventBus) Stop() {
|
|||||||
|
|
||||||
eb.cancel()
|
eb.cancel()
|
||||||
|
|
||||||
// Close all exact subscriber channels
|
// Close all exact subscriber channels and update metrics
|
||||||
for _, subs := range eb.exactSubscribers {
|
for namespaceID, subs := range eb.exactSubscribers {
|
||||||
for _, ch := range subs {
|
for _, ch := range subs {
|
||||||
close(ch)
|
close(ch)
|
||||||
|
eb.metrics.RecordUnsubscribe(namespaceID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close all wildcard subscriber channels
|
// Close all wildcard subscriber channels and update metrics
|
||||||
for _, sub := range eb.wildcardSubscribers {
|
for _, sub := range eb.wildcardSubscribers {
|
||||||
close(sub.ch)
|
close(sub.ch)
|
||||||
|
eb.metrics.RecordUnsubscribe(sub.pattern)
|
||||||
}
|
}
|
||||||
|
|
||||||
eb.exactSubscribers = make(map[string][]chan *Event)
|
eb.exactSubscribers = make(map[string][]chan *Event)
|
||||||
|
|||||||
16
go.mod
16
go.mod
@@ -1,16 +1,26 @@
|
|||||||
module git.flowmade.one/flowmade-one/aether
|
module git.flowmade.one/flowmade-one/aether
|
||||||
|
|
||||||
go 1.23
|
go 1.23.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/nats-io/nats.go v1.37.0
|
github.com/nats-io/nats.go v1.37.0
|
||||||
|
github.com/prometheus/client_golang v1.23.2
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/klauspost/compress v1.17.2 // indirect
|
github.com/beorn7/perks v1.0.1 // indirect
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||||
|
github.com/klauspost/compress v1.18.0 // indirect
|
||||||
|
github.com/kr/text v0.2.0 // indirect
|
||||||
|
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
|
github.com/prometheus/client_model v0.6.2 // indirect
|
||||||
|
github.com/prometheus/common v0.66.1 // indirect
|
||||||
|
github.com/prometheus/procfs v0.16.1 // indirect
|
||||||
|
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
||||||
golang.org/x/crypto v0.18.0 // indirect
|
golang.org/x/crypto v0.18.0 // indirect
|
||||||
golang.org/x/sys v0.16.0 // indirect
|
golang.org/x/sys v0.35.0 // indirect
|
||||||
|
google.golang.org/protobuf v1.36.8 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
48
go.sum
48
go.sum
@@ -1,14 +1,54 @@
|
|||||||
|
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||||
|
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||||
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
|
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||||
|
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||||
|
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||||
|
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||||
|
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||||
|
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||||
|
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||||
|
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||||
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
|
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
|
||||||
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||||
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
||||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
|
||||||
|
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
|
||||||
|
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
|
||||||
|
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
|
||||||
|
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
|
||||||
|
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
|
||||||
|
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
|
||||||
|
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
|
||||||
|
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
|
||||||
|
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
|
||||||
|
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||||
|
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||||
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||||
|
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
|
||||||
|
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
|
||||||
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
|
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
|
||||||
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
|
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
|
||||||
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
|
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||||
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||||
|
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
|
||||||
|
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
|||||||
258
metrics.go
Normal file
258
metrics.go
Normal file
@@ -0,0 +1,258 @@
|
|||||||
|
package aether
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BroadcasterMetrics provides observability metrics for EventBroadcaster implementations.
|
||||||
|
// All methods are safe for concurrent use.
|
||||||
|
type BroadcasterMetrics interface {
|
||||||
|
// EventsPublished returns the total number of events published per namespace.
|
||||||
|
EventsPublished(namespaceID string) int64
|
||||||
|
|
||||||
|
// EventsReceived returns the total number of events received per namespace.
|
||||||
|
// For EventBus this equals events delivered to subscribers.
|
||||||
|
// For NATSEventBus this includes events received from NATS.
|
||||||
|
EventsReceived(namespaceID string) int64
|
||||||
|
|
||||||
|
// ActiveSubscriptions returns the current number of active subscriptions per namespace.
|
||||||
|
ActiveSubscriptions(namespaceID string) int64
|
||||||
|
|
||||||
|
// TotalActiveSubscriptions returns the total number of active subscriptions across all namespaces.
|
||||||
|
TotalActiveSubscriptions() int64
|
||||||
|
|
||||||
|
// PublishErrors returns the total number of publish errors per namespace.
|
||||||
|
PublishErrors(namespaceID string) int64
|
||||||
|
|
||||||
|
// SubscribeErrors returns the total number of subscribe errors per namespace.
|
||||||
|
SubscribeErrors(namespaceID string) int64
|
||||||
|
|
||||||
|
// DroppedEvents returns the total number of events dropped (e.g., full channel) per namespace.
|
||||||
|
DroppedEvents(namespaceID string) int64
|
||||||
|
|
||||||
|
// Namespaces returns a list of all namespaces that have metrics.
|
||||||
|
Namespaces() []string
|
||||||
|
|
||||||
|
// Reset resets all metrics. Useful for testing.
|
||||||
|
Reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
// MetricsCollector provides methods for collecting metrics.
|
||||||
|
// This interface is implemented internally and used by EventBus implementations.
|
||||||
|
type MetricsCollector interface {
|
||||||
|
BroadcasterMetrics
|
||||||
|
|
||||||
|
// RecordPublish records a successful publish event.
|
||||||
|
RecordPublish(namespaceID string)
|
||||||
|
|
||||||
|
// RecordReceive records a received event.
|
||||||
|
RecordReceive(namespaceID string)
|
||||||
|
|
||||||
|
// RecordSubscribe records a new subscription.
|
||||||
|
RecordSubscribe(namespaceID string)
|
||||||
|
|
||||||
|
// RecordUnsubscribe records a removed subscription.
|
||||||
|
RecordUnsubscribe(namespaceID string)
|
||||||
|
|
||||||
|
// RecordPublishError records a publish error.
|
||||||
|
RecordPublishError(namespaceID string)
|
||||||
|
|
||||||
|
// RecordSubscribeError records a subscribe error.
|
||||||
|
RecordSubscribeError(namespaceID string)
|
||||||
|
|
||||||
|
// RecordDroppedEvent records a dropped event (e.g., channel full).
|
||||||
|
RecordDroppedEvent(namespaceID string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// namespaceMetrics holds counters for a single namespace.
|
||||||
|
type namespaceMetrics struct {
|
||||||
|
eventsPublished int64
|
||||||
|
eventsReceived int64
|
||||||
|
activeSubscriptions int64
|
||||||
|
publishErrors int64
|
||||||
|
subscribeErrors int64
|
||||||
|
droppedEvents int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultMetricsCollector is the default implementation of MetricsCollector.
|
||||||
|
// It uses atomic operations for thread-safe counter updates.
|
||||||
|
type DefaultMetricsCollector struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
namespaces map[string]*namespaceMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMetricsCollector creates a new DefaultMetricsCollector.
|
||||||
|
func NewMetricsCollector() *DefaultMetricsCollector {
|
||||||
|
return &DefaultMetricsCollector{
|
||||||
|
namespaces: make(map[string]*namespaceMetrics),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getOrCreateNamespace returns metrics for a namespace, creating if needed.
|
||||||
|
func (m *DefaultMetricsCollector) getOrCreateNamespace(namespaceID string) *namespaceMetrics {
|
||||||
|
m.mu.RLock()
|
||||||
|
ns, exists := m.namespaces[namespaceID]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
if exists {
|
||||||
|
return ns
|
||||||
|
}
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
// Double-check after acquiring write lock
|
||||||
|
if ns, exists = m.namespaces[namespaceID]; exists {
|
||||||
|
return ns
|
||||||
|
}
|
||||||
|
|
||||||
|
ns = &namespaceMetrics{}
|
||||||
|
m.namespaces[namespaceID] = ns
|
||||||
|
return ns
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventsPublished returns the total number of events published for a namespace.
|
||||||
|
func (m *DefaultMetricsCollector) EventsPublished(namespaceID string) int64 {
|
||||||
|
m.mu.RLock()
|
||||||
|
ns, exists := m.namespaces[namespaceID]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return atomic.LoadInt64(&ns.eventsPublished)
|
||||||
|
}
|
||||||
|
|
||||||
|
// EventsReceived returns the total number of events received for a namespace.
|
||||||
|
func (m *DefaultMetricsCollector) EventsReceived(namespaceID string) int64 {
|
||||||
|
m.mu.RLock()
|
||||||
|
ns, exists := m.namespaces[namespaceID]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return atomic.LoadInt64(&ns.eventsReceived)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ActiveSubscriptions returns the current number of active subscriptions for a namespace.
|
||||||
|
func (m *DefaultMetricsCollector) ActiveSubscriptions(namespaceID string) int64 {
|
||||||
|
m.mu.RLock()
|
||||||
|
ns, exists := m.namespaces[namespaceID]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return atomic.LoadInt64(&ns.activeSubscriptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TotalActiveSubscriptions returns the total number of active subscriptions across all namespaces.
|
||||||
|
func (m *DefaultMetricsCollector) TotalActiveSubscriptions() int64 {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
|
||||||
|
var total int64
|
||||||
|
for _, ns := range m.namespaces {
|
||||||
|
total += atomic.LoadInt64(&ns.activeSubscriptions)
|
||||||
|
}
|
||||||
|
return total
|
||||||
|
}
|
||||||
|
|
||||||
|
// PublishErrors returns the total number of publish errors for a namespace.
|
||||||
|
func (m *DefaultMetricsCollector) PublishErrors(namespaceID string) int64 {
|
||||||
|
m.mu.RLock()
|
||||||
|
ns, exists := m.namespaces[namespaceID]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return atomic.LoadInt64(&ns.publishErrors)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscribeErrors returns the total number of subscribe errors for a namespace.
|
||||||
|
func (m *DefaultMetricsCollector) SubscribeErrors(namespaceID string) int64 {
|
||||||
|
m.mu.RLock()
|
||||||
|
ns, exists := m.namespaces[namespaceID]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return atomic.LoadInt64(&ns.subscribeErrors)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DroppedEvents returns the total number of dropped events for a namespace.
|
||||||
|
func (m *DefaultMetricsCollector) DroppedEvents(namespaceID string) int64 {
|
||||||
|
m.mu.RLock()
|
||||||
|
ns, exists := m.namespaces[namespaceID]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return atomic.LoadInt64(&ns.droppedEvents)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Namespaces returns a list of all namespaces that have metrics.
|
||||||
|
func (m *DefaultMetricsCollector) Namespaces() []string {
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
|
||||||
|
namespaces := make([]string, 0, len(m.namespaces))
|
||||||
|
for ns := range m.namespaces {
|
||||||
|
namespaces = append(namespaces, ns)
|
||||||
|
}
|
||||||
|
return namespaces
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset resets all metrics.
|
||||||
|
func (m *DefaultMetricsCollector) Reset() {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.namespaces = make(map[string]*namespaceMetrics)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordPublish records a successful publish event.
|
||||||
|
func (m *DefaultMetricsCollector) RecordPublish(namespaceID string) {
|
||||||
|
ns := m.getOrCreateNamespace(namespaceID)
|
||||||
|
atomic.AddInt64(&ns.eventsPublished, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordReceive records a received event.
|
||||||
|
func (m *DefaultMetricsCollector) RecordReceive(namespaceID string) {
|
||||||
|
ns := m.getOrCreateNamespace(namespaceID)
|
||||||
|
atomic.AddInt64(&ns.eventsReceived, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordSubscribe records a new subscription.
|
||||||
|
func (m *DefaultMetricsCollector) RecordSubscribe(namespaceID string) {
|
||||||
|
ns := m.getOrCreateNamespace(namespaceID)
|
||||||
|
atomic.AddInt64(&ns.activeSubscriptions, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordUnsubscribe records a removed subscription.
|
||||||
|
func (m *DefaultMetricsCollector) RecordUnsubscribe(namespaceID string) {
|
||||||
|
ns := m.getOrCreateNamespace(namespaceID)
|
||||||
|
atomic.AddInt64(&ns.activeSubscriptions, -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordPublishError records a publish error.
|
||||||
|
func (m *DefaultMetricsCollector) RecordPublishError(namespaceID string) {
|
||||||
|
ns := m.getOrCreateNamespace(namespaceID)
|
||||||
|
atomic.AddInt64(&ns.publishErrors, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordSubscribeError records a subscribe error.
|
||||||
|
func (m *DefaultMetricsCollector) RecordSubscribeError(namespaceID string) {
|
||||||
|
ns := m.getOrCreateNamespace(namespaceID)
|
||||||
|
atomic.AddInt64(&ns.subscribeErrors, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordDroppedEvent records a dropped event.
|
||||||
|
func (m *DefaultMetricsCollector) RecordDroppedEvent(namespaceID string) {
|
||||||
|
ns := m.getOrCreateNamespace(namespaceID)
|
||||||
|
atomic.AddInt64(&ns.droppedEvents, 1)
|
||||||
|
}
|
||||||
123
metrics_prometheus.go
Normal file
123
metrics_prometheus.go
Normal file
@@ -0,0 +1,123 @@
|
|||||||
|
package aether
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PrometheusMetricsAdapter exposes BroadcasterMetrics as Prometheus metrics.
|
||||||
|
// It implements prometheus.Collector and can be registered with a Prometheus registry.
|
||||||
|
type PrometheusMetricsAdapter struct {
|
||||||
|
metrics BroadcasterMetrics
|
||||||
|
|
||||||
|
eventsPublishedDesc *prometheus.Desc
|
||||||
|
eventsReceivedDesc *prometheus.Desc
|
||||||
|
activeSubscriptionsDesc *prometheus.Desc
|
||||||
|
publishErrorsDesc *prometheus.Desc
|
||||||
|
subscribeErrorsDesc *prometheus.Desc
|
||||||
|
droppedEventsDesc *prometheus.Desc
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPrometheusMetricsAdapter creates a new PrometheusMetricsAdapter that wraps
|
||||||
|
// a BroadcasterMetrics implementation and exposes it as Prometheus metrics.
|
||||||
|
//
|
||||||
|
// The adapter implements prometheus.Collector and should be registered with
|
||||||
|
// a Prometheus registry:
|
||||||
|
//
|
||||||
|
// eb := aether.NewEventBus()
|
||||||
|
// adapter := aether.NewPrometheusMetricsAdapter(eb.Metrics())
|
||||||
|
// prometheus.MustRegister(adapter)
|
||||||
|
func NewPrometheusMetricsAdapter(metrics BroadcasterMetrics) *PrometheusMetricsAdapter {
|
||||||
|
return &PrometheusMetricsAdapter{
|
||||||
|
metrics: metrics,
|
||||||
|
eventsPublishedDesc: prometheus.NewDesc(
|
||||||
|
"aether_events_published_total",
|
||||||
|
"Total number of events published per namespace",
|
||||||
|
[]string{"namespace"},
|
||||||
|
nil,
|
||||||
|
),
|
||||||
|
eventsReceivedDesc: prometheus.NewDesc(
|
||||||
|
"aether_events_received_total",
|
||||||
|
"Total number of events received per namespace",
|
||||||
|
[]string{"namespace"},
|
||||||
|
nil,
|
||||||
|
),
|
||||||
|
activeSubscriptionsDesc: prometheus.NewDesc(
|
||||||
|
"aether_active_subscriptions",
|
||||||
|
"Number of active subscriptions per namespace",
|
||||||
|
[]string{"namespace"},
|
||||||
|
nil,
|
||||||
|
),
|
||||||
|
publishErrorsDesc: prometheus.NewDesc(
|
||||||
|
"aether_publish_errors_total",
|
||||||
|
"Total number of publish errors per namespace",
|
||||||
|
[]string{"namespace"},
|
||||||
|
nil,
|
||||||
|
),
|
||||||
|
subscribeErrorsDesc: prometheus.NewDesc(
|
||||||
|
"aether_subscribe_errors_total",
|
||||||
|
"Total number of subscribe errors per namespace",
|
||||||
|
[]string{"namespace"},
|
||||||
|
nil,
|
||||||
|
),
|
||||||
|
droppedEventsDesc: prometheus.NewDesc(
|
||||||
|
"aether_dropped_events_total",
|
||||||
|
"Total number of dropped events per namespace",
|
||||||
|
[]string{"namespace"},
|
||||||
|
nil,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Describe implements prometheus.Collector.
|
||||||
|
func (a *PrometheusMetricsAdapter) Describe(ch chan<- *prometheus.Desc) {
|
||||||
|
ch <- a.eventsPublishedDesc
|
||||||
|
ch <- a.eventsReceivedDesc
|
||||||
|
ch <- a.activeSubscriptionsDesc
|
||||||
|
ch <- a.publishErrorsDesc
|
||||||
|
ch <- a.subscribeErrorsDesc
|
||||||
|
ch <- a.droppedEventsDesc
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect implements prometheus.Collector.
|
||||||
|
func (a *PrometheusMetricsAdapter) Collect(ch chan<- prometheus.Metric) {
|
||||||
|
namespaces := a.metrics.Namespaces()
|
||||||
|
|
||||||
|
for _, ns := range namespaces {
|
||||||
|
ch <- prometheus.MustNewConstMetric(
|
||||||
|
a.eventsPublishedDesc,
|
||||||
|
prometheus.CounterValue,
|
||||||
|
float64(a.metrics.EventsPublished(ns)),
|
||||||
|
ns,
|
||||||
|
)
|
||||||
|
ch <- prometheus.MustNewConstMetric(
|
||||||
|
a.eventsReceivedDesc,
|
||||||
|
prometheus.CounterValue,
|
||||||
|
float64(a.metrics.EventsReceived(ns)),
|
||||||
|
ns,
|
||||||
|
)
|
||||||
|
ch <- prometheus.MustNewConstMetric(
|
||||||
|
a.activeSubscriptionsDesc,
|
||||||
|
prometheus.GaugeValue,
|
||||||
|
float64(a.metrics.ActiveSubscriptions(ns)),
|
||||||
|
ns,
|
||||||
|
)
|
||||||
|
ch <- prometheus.MustNewConstMetric(
|
||||||
|
a.publishErrorsDesc,
|
||||||
|
prometheus.CounterValue,
|
||||||
|
float64(a.metrics.PublishErrors(ns)),
|
||||||
|
ns,
|
||||||
|
)
|
||||||
|
ch <- prometheus.MustNewConstMetric(
|
||||||
|
a.subscribeErrorsDesc,
|
||||||
|
prometheus.CounterValue,
|
||||||
|
float64(a.metrics.SubscribeErrors(ns)),
|
||||||
|
ns,
|
||||||
|
)
|
||||||
|
ch <- prometheus.MustNewConstMetric(
|
||||||
|
a.droppedEventsDesc,
|
||||||
|
prometheus.CounterValue,
|
||||||
|
float64(a.metrics.DroppedEvents(ns)),
|
||||||
|
ns,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
304
metrics_test.go
Normal file
304
metrics_test.go
Normal file
@@ -0,0 +1,304 @@
|
|||||||
|
package aether_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMetricsCollector_InitialState(t *testing.T) {
|
||||||
|
mc := aether.NewMetricsCollector()
|
||||||
|
|
||||||
|
if got := mc.EventsPublished("test-ns"); got != 0 {
|
||||||
|
t.Errorf("EventsPublished() = %d, want 0", got)
|
||||||
|
}
|
||||||
|
if got := mc.EventsReceived("test-ns"); got != 0 {
|
||||||
|
t.Errorf("EventsReceived() = %d, want 0", got)
|
||||||
|
}
|
||||||
|
if got := mc.ActiveSubscriptions("test-ns"); got != 0 {
|
||||||
|
t.Errorf("ActiveSubscriptions() = %d, want 0", got)
|
||||||
|
}
|
||||||
|
if got := mc.TotalActiveSubscriptions(); got != 0 {
|
||||||
|
t.Errorf("TotalActiveSubscriptions() = %d, want 0", got)
|
||||||
|
}
|
||||||
|
if got := mc.PublishErrors("test-ns"); got != 0 {
|
||||||
|
t.Errorf("PublishErrors() = %d, want 0", got)
|
||||||
|
}
|
||||||
|
if got := mc.SubscribeErrors("test-ns"); got != 0 {
|
||||||
|
t.Errorf("SubscribeErrors() = %d, want 0", got)
|
||||||
|
}
|
||||||
|
if got := mc.DroppedEvents("test-ns"); got != 0 {
|
||||||
|
t.Errorf("DroppedEvents() = %d, want 0", got)
|
||||||
|
}
|
||||||
|
if got := len(mc.Namespaces()); got != 0 {
|
||||||
|
t.Errorf("Namespaces() = %d, want 0", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsCollector_RecordPublish(t *testing.T) {
|
||||||
|
mc := aether.NewMetricsCollector()
|
||||||
|
|
||||||
|
mc.RecordPublish("ns1")
|
||||||
|
mc.RecordPublish("ns1")
|
||||||
|
mc.RecordPublish("ns2")
|
||||||
|
|
||||||
|
if got := mc.EventsPublished("ns1"); got != 2 {
|
||||||
|
t.Errorf("EventsPublished(ns1) = %d, want 2", got)
|
||||||
|
}
|
||||||
|
if got := mc.EventsPublished("ns2"); got != 1 {
|
||||||
|
t.Errorf("EventsPublished(ns2) = %d, want 1", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsCollector_RecordReceive(t *testing.T) {
|
||||||
|
mc := aether.NewMetricsCollector()
|
||||||
|
|
||||||
|
mc.RecordReceive("ns1")
|
||||||
|
mc.RecordReceive("ns1")
|
||||||
|
mc.RecordReceive("ns1")
|
||||||
|
|
||||||
|
if got := mc.EventsReceived("ns1"); got != 3 {
|
||||||
|
t.Errorf("EventsReceived(ns1) = %d, want 3", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsCollector_Subscriptions(t *testing.T) {
|
||||||
|
mc := aether.NewMetricsCollector()
|
||||||
|
|
||||||
|
mc.RecordSubscribe("ns1")
|
||||||
|
mc.RecordSubscribe("ns1")
|
||||||
|
mc.RecordSubscribe("ns2")
|
||||||
|
|
||||||
|
if got := mc.ActiveSubscriptions("ns1"); got != 2 {
|
||||||
|
t.Errorf("ActiveSubscriptions(ns1) = %d, want 2", got)
|
||||||
|
}
|
||||||
|
if got := mc.ActiveSubscriptions("ns2"); got != 1 {
|
||||||
|
t.Errorf("ActiveSubscriptions(ns2) = %d, want 1", got)
|
||||||
|
}
|
||||||
|
if got := mc.TotalActiveSubscriptions(); got != 3 {
|
||||||
|
t.Errorf("TotalActiveSubscriptions() = %d, want 3", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
mc.RecordUnsubscribe("ns1")
|
||||||
|
|
||||||
|
if got := mc.ActiveSubscriptions("ns1"); got != 1 {
|
||||||
|
t.Errorf("ActiveSubscriptions(ns1) after unsubscribe = %d, want 1", got)
|
||||||
|
}
|
||||||
|
if got := mc.TotalActiveSubscriptions(); got != 2 {
|
||||||
|
t.Errorf("TotalActiveSubscriptions() after unsubscribe = %d, want 2", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsCollector_Errors(t *testing.T) {
|
||||||
|
mc := aether.NewMetricsCollector()
|
||||||
|
|
||||||
|
mc.RecordPublishError("ns1")
|
||||||
|
mc.RecordPublishError("ns1")
|
||||||
|
mc.RecordSubscribeError("ns1")
|
||||||
|
mc.RecordDroppedEvent("ns1")
|
||||||
|
mc.RecordDroppedEvent("ns1")
|
||||||
|
mc.RecordDroppedEvent("ns1")
|
||||||
|
|
||||||
|
if got := mc.PublishErrors("ns1"); got != 2 {
|
||||||
|
t.Errorf("PublishErrors(ns1) = %d, want 2", got)
|
||||||
|
}
|
||||||
|
if got := mc.SubscribeErrors("ns1"); got != 1 {
|
||||||
|
t.Errorf("SubscribeErrors(ns1) = %d, want 1", got)
|
||||||
|
}
|
||||||
|
if got := mc.DroppedEvents("ns1"); got != 3 {
|
||||||
|
t.Errorf("DroppedEvents(ns1) = %d, want 3", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsCollector_Namespaces(t *testing.T) {
|
||||||
|
mc := aether.NewMetricsCollector()
|
||||||
|
|
||||||
|
mc.RecordPublish("ns1")
|
||||||
|
mc.RecordReceive("ns2")
|
||||||
|
mc.RecordSubscribe("ns3")
|
||||||
|
|
||||||
|
namespaces := mc.Namespaces()
|
||||||
|
if len(namespaces) != 3 {
|
||||||
|
t.Errorf("Namespaces() length = %d, want 3", len(namespaces))
|
||||||
|
}
|
||||||
|
|
||||||
|
nsMap := make(map[string]bool)
|
||||||
|
for _, ns := range namespaces {
|
||||||
|
nsMap[ns] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, expected := range []string{"ns1", "ns2", "ns3"} {
|
||||||
|
if !nsMap[expected] {
|
||||||
|
t.Errorf("Namespaces() missing %q", expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsCollector_Reset(t *testing.T) {
|
||||||
|
mc := aether.NewMetricsCollector()
|
||||||
|
|
||||||
|
mc.RecordPublish("ns1")
|
||||||
|
mc.RecordReceive("ns1")
|
||||||
|
mc.RecordSubscribe("ns1")
|
||||||
|
|
||||||
|
mc.Reset()
|
||||||
|
|
||||||
|
if got := mc.EventsPublished("ns1"); got != 0 {
|
||||||
|
t.Errorf("EventsPublished() after reset = %d, want 0", got)
|
||||||
|
}
|
||||||
|
if got := len(mc.Namespaces()); got != 0 {
|
||||||
|
t.Errorf("Namespaces() after reset = %d, want 0", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsCollector_ConcurrentAccess(t *testing.T) {
|
||||||
|
mc := aether.NewMetricsCollector()
|
||||||
|
const goroutines = 10
|
||||||
|
const iterations = 100
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(goroutines)
|
||||||
|
|
||||||
|
for i := 0; i < goroutines; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for j := 0; j < iterations; j++ {
|
||||||
|
mc.RecordPublish("concurrent-ns")
|
||||||
|
mc.RecordReceive("concurrent-ns")
|
||||||
|
mc.RecordSubscribe("concurrent-ns")
|
||||||
|
mc.RecordUnsubscribe("concurrent-ns")
|
||||||
|
mc.RecordPublishError("concurrent-ns")
|
||||||
|
mc.RecordSubscribeError("concurrent-ns")
|
||||||
|
mc.RecordDroppedEvent("concurrent-ns")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
expected := int64(goroutines * iterations)
|
||||||
|
|
||||||
|
if got := mc.EventsPublished("concurrent-ns"); got != expected {
|
||||||
|
t.Errorf("EventsPublished() = %d, want %d", got, expected)
|
||||||
|
}
|
||||||
|
if got := mc.EventsReceived("concurrent-ns"); got != expected {
|
||||||
|
t.Errorf("EventsReceived() = %d, want %d", got, expected)
|
||||||
|
}
|
||||||
|
if got := mc.ActiveSubscriptions("concurrent-ns"); got != 0 {
|
||||||
|
t.Errorf("ActiveSubscriptions() = %d, want 0 (subscribed and unsubscribed same amount)", got)
|
||||||
|
}
|
||||||
|
if got := mc.PublishErrors("concurrent-ns"); got != expected {
|
||||||
|
t.Errorf("PublishErrors() = %d, want %d", got, expected)
|
||||||
|
}
|
||||||
|
if got := mc.SubscribeErrors("concurrent-ns"); got != expected {
|
||||||
|
t.Errorf("SubscribeErrors() = %d, want %d", got, expected)
|
||||||
|
}
|
||||||
|
if got := mc.DroppedEvents("concurrent-ns"); got != expected {
|
||||||
|
t.Errorf("DroppedEvents() = %d, want %d", got, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_Metrics(t *testing.T) {
|
||||||
|
eb := aether.NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
metrics := eb.Metrics()
|
||||||
|
if metrics == nil {
|
||||||
|
t.Fatal("Metrics() returned nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe and verify metrics
|
||||||
|
ch := eb.Subscribe("test-ns")
|
||||||
|
if got := metrics.ActiveSubscriptions("test-ns"); got != 1 {
|
||||||
|
t.Errorf("ActiveSubscriptions() after subscribe = %d, want 1", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish and verify metrics
|
||||||
|
event := &aether.Event{
|
||||||
|
ID: "test-1",
|
||||||
|
EventType: "TestEvent",
|
||||||
|
ActorID: "actor-1",
|
||||||
|
Version: 1,
|
||||||
|
}
|
||||||
|
eb.Publish("test-ns", event)
|
||||||
|
|
||||||
|
// Wait for event delivery
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("timeout waiting for event")
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := metrics.EventsPublished("test-ns"); got != 1 {
|
||||||
|
t.Errorf("EventsPublished() after publish = %d, want 1", got)
|
||||||
|
}
|
||||||
|
if got := metrics.EventsReceived("test-ns"); got != 1 {
|
||||||
|
t.Errorf("EventsReceived() after publish = %d, want 1", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe and verify metrics
|
||||||
|
eb.Unsubscribe("test-ns", ch)
|
||||||
|
if got := metrics.ActiveSubscriptions("test-ns"); got != 0 {
|
||||||
|
t.Errorf("ActiveSubscriptions() after unsubscribe = %d, want 0", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_DroppedEvents(t *testing.T) {
|
||||||
|
eb := aether.NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
metrics := eb.Metrics()
|
||||||
|
|
||||||
|
// Subscribe but don't read from channel
|
||||||
|
_ = eb.Subscribe("test-ns")
|
||||||
|
|
||||||
|
// Fill the channel buffer (default is 100)
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
eb.Publish("test-ns", &aether.Event{
|
||||||
|
ID: "fill-" + string(rune(i)),
|
||||||
|
EventType: "FillEvent",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next publish should be dropped
|
||||||
|
eb.Publish("test-ns", &aether.Event{
|
||||||
|
ID: "dropped",
|
||||||
|
EventType: "DroppedEvent",
|
||||||
|
})
|
||||||
|
|
||||||
|
if got := metrics.DroppedEvents("test-ns"); got != 1 {
|
||||||
|
t.Errorf("DroppedEvents() = %d, want 1", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_MetricsProvider(t *testing.T) {
|
||||||
|
eb := aether.NewEventBus()
|
||||||
|
defer eb.Stop()
|
||||||
|
|
||||||
|
// Verify EventBus implements MetricsProvider
|
||||||
|
var mp aether.MetricsProvider = eb
|
||||||
|
if mp.Metrics() == nil {
|
||||||
|
t.Error("EventBus.Metrics() returned nil")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEventBus_StopClearsSubscriptionMetrics(t *testing.T) {
|
||||||
|
eb := aether.NewEventBus()
|
||||||
|
metrics := eb.Metrics()
|
||||||
|
|
||||||
|
_ = eb.Subscribe("ns1")
|
||||||
|
_ = eb.Subscribe("ns1")
|
||||||
|
_ = eb.Subscribe("ns2")
|
||||||
|
|
||||||
|
if got := metrics.TotalActiveSubscriptions(); got != 3 {
|
||||||
|
t.Errorf("TotalActiveSubscriptions() before stop = %d, want 3", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
eb.Stop()
|
||||||
|
|
||||||
|
if got := metrics.TotalActiveSubscriptions(); got != 0 {
|
||||||
|
t.Errorf("TotalActiveSubscriptions() after stop = %d, want 0", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -79,6 +79,8 @@ func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event {
|
|||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err)
|
log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err)
|
||||||
|
// Record subscription error
|
||||||
|
neb.metrics.RecordSubscribeError(namespacePattern)
|
||||||
} else {
|
} else {
|
||||||
neb.subscriptions = append(neb.subscriptions, sub)
|
neb.subscriptions = append(neb.subscriptions, sub)
|
||||||
if IsWildcardPattern(namespacePattern) {
|
if IsWildcardPattern(namespacePattern) {
|
||||||
@@ -147,9 +149,11 @@ func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Eve
|
|||||||
if sub.pattern == pattern {
|
if sub.pattern == pattern {
|
||||||
select {
|
select {
|
||||||
case sub.ch <- event:
|
case sub.ch <- event:
|
||||||
// Event delivered
|
// Event delivered from NATS
|
||||||
|
neb.metrics.RecordReceive(pattern)
|
||||||
default:
|
default:
|
||||||
// Channel full, skip this subscriber (non-blocking)
|
// Channel full, skip this subscriber (non-blocking)
|
||||||
|
neb.metrics.RecordDroppedEvent(pattern)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -172,11 +176,13 @@ func (neb *NATSEventBus) Publish(namespaceID string, event *Event) {
|
|||||||
data, err := json.Marshal(eventMsg)
|
data, err := json.Marshal(eventMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[NATSEventBus] Failed to marshal event for NATS: %v", err)
|
log.Printf("[NATSEventBus] Failed to marshal event for NATS: %v", err)
|
||||||
|
neb.metrics.RecordPublishError(namespaceID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := neb.nc.Publish(subject, data); err != nil {
|
if err := neb.nc.Publish(subject, data); err != nil {
|
||||||
log.Printf("[NATSEventBus] Failed to publish event to NATS: %v", err)
|
log.Printf("[NATSEventBus] Failed to publish event to NATS: %v", err)
|
||||||
|
neb.metrics.RecordPublishError(namespaceID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user