Compare commits

...

3 Commits

Author SHA1 Message Date
dae751a6ef Add EventBroadcaster metrics for observability and debugging
All checks were successful
CI / build (pull_request) Successful in 38s
- Add BroadcasterMetrics interface for reading metrics per namespace
- Add MetricsCollector interface and DefaultMetricsCollector implementation
- Track events_published and events_received counters per namespace
- Track active_subscriptions gauge per namespace
- Track publish_errors, subscribe_errors, and dropped_events counters
- Add MetricsProvider interface for EventBroadcaster implementations
- Integrate metrics tracking into EventBus and NATSEventBus
- Add optional Prometheus integration via PrometheusMetricsAdapter
- Add comprehensive unit tests for metrics functionality

Closes #22

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 19:51:23 +01:00
9e238c5e70 Add integration tests for NATSEventBus
All checks were successful
CI / build (push) Successful in 16s
Add comprehensive integration tests that verify NATSEventBus behavior
with a real NATS server. Tests cover:

- Cross-node event delivery (multiple NATSEventBus instances)
- Namespace isolation with single and multiple NATS connections
- High-throughput scenarios (1000 events)
- Event ordering within namespace
- No cross-namespace leakage verification
- Concurrent publish/subscribe operations
- Multiple subscribers to same namespace
- Event metadata preservation across NATS
- Large event payload handling (100KB)
- Subscribe/unsubscribe lifecycle
- Reconnection behavior
- Graceful degradation under load
- Benchmarks for publish and publish-receive

Tests require a running NATS server and are tagged with +build integration.
Run with: go test -tags=integration -v ./...

Closes #18

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 18:27:08 +00:00
adead7e980 Add wildcard namespace subscriptions
All checks were successful
CI / build (pull_request) Successful in 18s
CI / build (push) Successful in 16s
Support NATS-style wildcard patterns ("*" and ">") for subscribing
to events across multiple namespaces. This enables cross-cutting
concerns like logging, monitoring, and auditing without requiring
separate subscriptions for each namespace.

- Add pattern.go with MatchNamespacePattern and IsWildcardPattern
- Update EventBus to track wildcard subscribers separately
- Update NATSEventBus to use NATS native wildcard support
- Add comprehensive tests for pattern matching and EventBus wildcards
- Document security implications in all relevant code comments

Closes #20

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 19:24:26 +01:00
11 changed files with 2790 additions and 69 deletions

View File

@@ -5,78 +5,177 @@ import (
"sync"
)
// EventBroadcaster defines the interface for publishing and subscribing to events
// EventBroadcaster defines the interface for publishing and subscribing to events.
//
// Subscribe accepts namespace patterns following NATS subject matching conventions:
// - Exact match: "tenant-a" matches only "tenant-a"
// - Single wildcard: "*" matches any single token, "prod.*" matches "prod.tenant", "prod.orders"
// - Multi-token wildcard: ">" matches one or more tokens (only at end of pattern)
//
// Security Warning: Wildcard subscriptions bypass namespace isolation.
// Only grant wildcard access to trusted system components.
type EventBroadcaster interface {
Subscribe(namespaceID string) <-chan *Event
Unsubscribe(namespaceID string, ch <-chan *Event)
// Subscribe creates a channel that receives events matching the namespace pattern.
// Pattern syntax follows NATS conventions: "*" matches single token, ">" matches multiple.
Subscribe(namespacePattern string) <-chan *Event
Unsubscribe(namespacePattern string, ch <-chan *Event)
Publish(namespaceID string, event *Event)
Stop()
SubscriberCount(namespaceID string) int
}
// EventBus broadcasts events to multiple subscribers within a namespace
// MetricsProvider is an optional interface that EventBroadcaster implementations
// can implement to expose metrics.
//
// It is declared separately from EventBroadcaster, so a broadcaster is not
// required to provide it.
type MetricsProvider interface {
// Metrics returns the metrics collector for this broadcaster, exposed
// through the BroadcasterMetrics interface.
Metrics() BroadcasterMetrics
}
// subscription represents a single subscriber channel with its pattern.
// EventBus keeps one of these per wildcard subscriber so that the pattern can
// be matched against the namespace of each published event.
type subscription struct {
// pattern is the namespace pattern the subscriber registered with.
pattern string
// ch is the channel events are delivered on.
ch chan *Event
}
// EventBus broadcasts events to multiple subscribers within a namespace.
// Supports wildcard patterns for cross-namespace subscriptions.
//
// Security Considerations:
// Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces.
// This is intentional for cross-cutting concerns like logging, monitoring, and auditing.
// However, it bypasses namespace isolation - use with appropriate access controls.
type EventBus struct {
subscribers map[string][]chan *Event // namespaceID -> channels
// exactSubscribers holds subscribers for exact namespace matches (no wildcards)
exactSubscribers map[string][]chan *Event
// wildcardSubscribers holds subscribers with wildcard patterns
wildcardSubscribers []subscription
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
metrics *DefaultMetricsCollector
}
// NewEventBus creates a new event bus
func NewEventBus() *EventBus {
ctx, cancel := context.WithCancel(context.Background())
return &EventBus{
subscribers: make(map[string][]chan *Event),
exactSubscribers: make(map[string][]chan *Event),
wildcardSubscribers: make([]subscription, 0),
ctx: ctx,
cancel: cancel,
metrics: NewMetricsCollector(),
}
}
// Subscribe creates a new subscription channel for a namespace
func (eb *EventBus) Subscribe(namespaceID string) <-chan *Event {
// Metrics returns the metrics collector for this event bus.
// The value returned is the bus's own collector (not a copy or snapshot),
// exposed through the BroadcasterMetrics interface.
func (eb *EventBus) Metrics() BroadcasterMetrics {
return eb.metrics
}
// Subscribe creates a new subscription channel for a namespace pattern.
// Patterns follow NATS subject matching conventions:
// - "*" matches a single token (any sequence without ".")
// - ">" matches one or more tokens (only valid at the end)
// - Exact strings match exactly
//
// Security Warning: Wildcard patterns receive events from all matching namespaces,
// bypassing namespace isolation. Only use for trusted system components.
func (eb *EventBus) Subscribe(namespacePattern string) <-chan *Event {
eb.mutex.Lock()
defer eb.mutex.Unlock()
// Create buffered channel to prevent blocking publishers
ch := make(chan *Event, 100)
eb.subscribers[namespaceID] = append(eb.subscribers[namespaceID], ch)
if IsWildcardPattern(namespacePattern) {
// Store wildcard subscription separately
eb.wildcardSubscribers = append(eb.wildcardSubscribers, subscription{
pattern: namespacePattern,
ch: ch,
})
} else {
// Exact match subscription
eb.exactSubscribers[namespacePattern] = append(eb.exactSubscribers[namespacePattern], ch)
}
// Record subscription metric
eb.metrics.RecordSubscribe(namespacePattern)
return ch
}
// Unsubscribe removes a subscription channel
func (eb *EventBus) Unsubscribe(namespaceID string, ch <-chan *Event) {
func (eb *EventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
eb.mutex.Lock()
defer eb.mutex.Unlock()
subs := eb.subscribers[namespaceID]
if IsWildcardPattern(namespacePattern) {
// Remove from wildcard subscribers
for i, sub := range eb.wildcardSubscribers {
if sub.ch == ch {
eb.wildcardSubscribers = append(eb.wildcardSubscribers[:i], eb.wildcardSubscribers[i+1:]...)
close(sub.ch)
// Record unsubscription metric
eb.metrics.RecordUnsubscribe(namespacePattern)
break
}
}
} else {
// Remove from exact subscribers
subs := eb.exactSubscribers[namespacePattern]
for i, subscriber := range subs {
if subscriber == ch {
// Remove channel from slice
eb.subscribers[namespaceID] = append(subs[:i], subs[i+1:]...)
eb.exactSubscribers[namespacePattern] = append(subs[:i], subs[i+1:]...)
close(subscriber)
// Record unsubscription metric
eb.metrics.RecordUnsubscribe(namespacePattern)
break
}
}
// Clean up empty namespace entries
if len(eb.subscribers[namespaceID]) == 0 {
delete(eb.subscribers, namespaceID)
if len(eb.exactSubscribers[namespacePattern]) == 0 {
delete(eb.exactSubscribers, namespacePattern)
}
}
}
// Publish sends an event to all subscribers of a namespace
// Publish sends an event to all subscribers of a namespace.
// Events are delivered to:
// - All exact subscribers for the namespace
// - All wildcard subscribers whose pattern matches the namespace
func (eb *EventBus) Publish(namespaceID string, event *Event) {
eb.mutex.RLock()
defer eb.mutex.RUnlock()
subscribers := eb.subscribers[namespaceID]
// Record publish metric
eb.metrics.RecordPublish(namespaceID)
// Deliver to exact subscribers
subscribers := eb.exactSubscribers[namespaceID]
for _, ch := range subscribers {
select {
case ch <- event:
// Event delivered
eb.metrics.RecordReceive(namespaceID)
default:
// Channel full, skip this subscriber (non-blocking)
eb.metrics.RecordDroppedEvent(namespaceID)
}
}
// Deliver to matching wildcard subscribers
for _, sub := range eb.wildcardSubscribers {
if MatchNamespacePattern(sub.pattern, namespaceID) {
select {
case sub.ch <- event:
// Event delivered
eb.metrics.RecordReceive(namespaceID)
default:
// Channel full, skip this subscriber (non-blocking)
eb.metrics.RecordDroppedEvent(namespaceID)
}
}
}
}
@@ -88,19 +187,37 @@ func (eb *EventBus) Stop() {
eb.cancel()
// Close all subscriber channels
for _, subs := range eb.subscribers {
// Close all exact subscriber channels and update metrics
for namespaceID, subs := range eb.exactSubscribers {
for _, ch := range subs {
close(ch)
eb.metrics.RecordUnsubscribe(namespaceID)
}
}
eb.subscribers = make(map[string][]chan *Event)
// Close all wildcard subscriber channels and update metrics
for _, sub := range eb.wildcardSubscribers {
close(sub.ch)
eb.metrics.RecordUnsubscribe(sub.pattern)
}
// SubscriberCount returns the number of subscribers for a namespace
eb.exactSubscribers = make(map[string][]chan *Event)
eb.wildcardSubscribers = make([]subscription, 0)
}
// SubscriberCount returns the number of subscribers for a namespace.
// This counts only exact match subscribers, not wildcard subscribers that may match.
func (eb *EventBus) SubscriberCount(namespaceID string) int {
eb.mutex.RLock()
defer eb.mutex.RUnlock()
return len(eb.subscribers[namespaceID])
return len(eb.exactSubscribers[namespaceID])
}
// WildcardSubscriberCount reports how many subscriptions were registered with
// a wildcard pattern ("*" or ">"). Such subscribers may receive events from
// more than one namespace.
func (eb *EventBus) WildcardSubscriberCount() int {
	eb.mutex.RLock()
	count := len(eb.wildcardSubscribers)
	eb.mutex.RUnlock()
	return count
}

416
eventbus_test.go Normal file
View File

@@ -0,0 +1,416 @@
package aether
import (
"sync"
"testing"
"time"
)
// TestEventBus_ExactSubscription checks that an event published to a namespace
// reaches a subscriber registered for exactly that namespace.
func TestEventBus_ExactSubscription(t *testing.T) {
	bus := NewEventBus()
	defer bus.Stop()

	sub := bus.Subscribe("tenant-a")

	want := &Event{ID: "evt-1", EventType: "TestEvent", ActorID: "actor-1"}
	bus.Publish("tenant-a", want)

	select {
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timed out waiting for event")
	case got := <-sub:
		if got.ID != want.ID {
			t.Errorf("expected event ID %s, got %s", want.ID, got.ID)
		}
	}
}
// TestEventBus_WildcardStarSubscription checks that a "*" subscriber receives
// an event published to a single-token namespace.
func TestEventBus_WildcardStarSubscription(t *testing.T) {
	bus := NewEventBus()
	defer bus.Stop()

	// "*" matches every single-token namespace.
	sub := bus.Subscribe("*")

	want := &Event{ID: "evt-1", EventType: "TestEvent", ActorID: "actor-1"}
	bus.Publish("tenant-a", want)

	select {
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timed out waiting for event")
	case got := <-sub:
		if got.ID != want.ID {
			t.Errorf("expected event ID %s, got %s", want.ID, got.ID)
		}
	}
}
// TestEventBus_WildcardGreaterSubscription checks that a ">" subscriber
// receives events from every namespace, including multi-token ones.
func TestEventBus_WildcardGreaterSubscription(t *testing.T) {
	bus := NewEventBus()
	defer bus.Stop()

	// ">" subscribes to all namespaces.
	sub := bus.Subscribe(">")

	published := []struct {
		namespace string
		event     *Event
	}{
		{"tenant-a", &Event{ID: "evt-1", EventType: "Test1", ActorID: "actor-1"}},
		{"tenant-b", &Event{ID: "evt-2", EventType: "Test2", ActorID: "actor-2"}},
		{"prod.tenant.orders", &Event{ID: "evt-3", EventType: "Test3", ActorID: "actor-3"}},
	}
	for _, p := range published {
		bus.Publish(p.namespace, p.event)
	}

	// One shared deadline covers the whole collection phase.
	seen := make(map[string]bool)
	deadline := time.After(100 * time.Millisecond)
	for count := 0; count < len(published); count++ {
		select {
		case got := <-sub:
			seen[got.ID] = true
		case <-deadline:
			t.Fatalf("timed out after receiving %d of %d events", count, len(published))
		}
	}

	for _, p := range published {
		if !seen[p.event.ID] {
			t.Errorf("did not receive event %s", p.event.ID)
		}
	}
}
// TestEventBus_PrefixWildcard checks that "prod.*" receives events from
// namespaces under the "prod." prefix and nothing from other prefixes.
func TestEventBus_PrefixWildcard(t *testing.T) {
	bus := NewEventBus()
	defer bus.Stop()

	sub := bus.Subscribe("prod.*")

	// The first two namespaces are expected to match; the "staging." one is not.
	bus.Publish("prod.tenant", &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"})
	bus.Publish("prod.orders", &Event{ID: "evt-2", EventType: "Test", ActorID: "actor-2"})
	bus.Publish("staging.tenant", &Event{ID: "evt-3", EventType: "Test", ActorID: "actor-3"})

	// Collect exactly two deliveries under a single deadline.
	seen := map[string]bool{}
	deadline := time.After(100 * time.Millisecond)
	for n := 0; n < 2; n++ {
		select {
		case got := <-sub:
			seen[got.ID] = true
		case <-deadline:
			t.Fatalf("timed out after receiving %d events", len(seen))
		}
	}

	if !(seen["evt-1"] && seen["evt-2"]) {
		t.Errorf("expected evt-1 and evt-2, got %v", seen)
	}

	// Nothing else may arrive: the staging event must have been filtered out.
	select {
	case <-time.After(50 * time.Millisecond):
		// Quiet channel is the expected outcome.
	case extra := <-sub:
		t.Errorf("unexpected event received: %s", extra.ID)
	}
}
// TestEventBus_MultipleWildcardSubscribers publishes one three-token namespace
// and checks which of several wildcard patterns receive it: "prod.*" must not,
// "prod.>" and ">" must.
func TestEventBus_MultipleWildcardSubscribers(t *testing.T) {
	bus := NewEventBus()
	defer bus.Stop()

	singleToken := bus.Subscribe("prod.*")
	prodTail := bus.Subscribe("prod.>")
	everything := bus.Subscribe(">")

	want := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
	bus.Publish("prod.tenant.orders", want)

	// "prod.*" covers a single token after the prefix, so three tokens must not match.
	select {
	case <-time.After(50 * time.Millisecond):
		// Nothing delivered, as expected.
	case <-singleToken:
		t.Error("prod.* should not match prod.tenant.orders")
	}

	// The multi-token patterns must both deliver the event.
	matching := []struct {
		ch        <-chan *Event
		onTimeout string
	}{
		{prodTail, "prod.> should match prod.tenant.orders"},
		{everything, "> should match prod.tenant.orders"},
	}
	for _, m := range matching {
		select {
		case got := <-m.ch:
			if got.ID != want.ID {
				t.Errorf("expected %s, got %s", want.ID, got.ID)
			}
		case <-time.After(100 * time.Millisecond):
			t.Error(m.onTimeout)
		}
	}
}
// TestEventBus_ExactAndWildcardCoexist checks that one published event is
// delivered both to an exact subscriber and to a matching wildcard subscriber.
func TestEventBus_ExactAndWildcardCoexist(t *testing.T) {
	bus := NewEventBus()
	defer bus.Stop()

	exactSub := bus.Subscribe("tenant-a")
	wildcardSub := bus.Subscribe("*")

	want := &Event{ID: "evt-1", EventType: "Test", ActorID: "actor-1"}
	bus.Publish("tenant-a", want)

	// Each subscriber is checked on its own goroutine; only t.Error/t.Errorf
	// are used there, which is permitted off the test goroutine.
	var wg sync.WaitGroup
	expect := func(label string, ch <-chan *Event) {
		defer wg.Done()
		select {
		case got := <-ch:
			if got.ID != want.ID {
				t.Errorf("%s: expected %s, got %s", label, want.ID, got.ID)
			}
		case <-time.After(100 * time.Millisecond):
			t.Errorf("%s subscriber timed out", label)
		}
	}

	wg.Add(2)
	go expect("exact", exactSub)
	go expect("wildcard", wildcardSub)
	wg.Wait()
}
// TestEventBus_WildcardUnsubscribe checks that a wildcard subscription is
// counted while active and no longer counted after Unsubscribe.
func TestEventBus_WildcardUnsubscribe(t *testing.T) {
	bus := NewEventBus()
	defer bus.Stop()

	const pattern = "prod.*"
	sub := bus.Subscribe(pattern)

	if n := bus.WildcardSubscriberCount(); n != 1 {
		t.Errorf("expected 1 wildcard subscriber, got %d", n)
	}

	bus.Unsubscribe(pattern, sub)

	if n := bus.WildcardSubscriberCount(); n != 0 {
		t.Errorf("expected 0 wildcard subscribers, got %d", n)
	}
}
// TestEventBus_SubscriberCount checks that SubscriberCount tracks exact
// subscriptions only, and that wildcard subscriptions are tallied separately.
func TestEventBus_SubscriberCount(t *testing.T) {
	bus := NewEventBus()
	defer bus.Stop()

	const namespace = "tenant-a"

	// Two exact subscribers on the same namespace.
	first := bus.Subscribe(namespace)
	second := bus.Subscribe(namespace)
	if n := bus.SubscriberCount(namespace); n != 2 {
		t.Errorf("expected 2 exact subscribers, got %d", n)
	}

	// A wildcard subscriber must leave the exact count untouched.
	bus.Subscribe("*")
	if n := bus.SubscriberCount(namespace); n != 2 {
		t.Errorf("expected 2 exact subscribers after wildcard add, got %d", n)
	}
	if n := bus.WildcardSubscriberCount(); n != 1 {
		t.Errorf("expected 1 wildcard subscriber, got %d", n)
	}

	// Removing the exact subscribers one at a time steps the count down to zero.
	bus.Unsubscribe(namespace, first)
	if n := bus.SubscriberCount(namespace); n != 1 {
		t.Errorf("expected 1 exact subscriber after unsubscribe, got %d", n)
	}
	bus.Unsubscribe(namespace, second)
	if n := bus.SubscriberCount(namespace); n != 0 {
		t.Errorf("expected 0 exact subscribers after unsubscribe, got %d", n)
	}
}
// TestEventBus_StopClosesAllChannels checks that Stop closes the channels of
// both exact and wildcard subscribers.
func TestEventBus_StopClosesAllChannels(t *testing.T) {
	bus := NewEventBus()

	subs := []struct {
		ch        <-chan *Event
		stillOpen string
		timedOut  string
	}{
		{bus.Subscribe("tenant-a"), "expected exact channel to be closed", "timed out waiting for exact channel close"},
		{bus.Subscribe("*"), "expected wildcard channel to be closed", "timed out waiting for wildcard channel close"},
	}

	bus.Stop()

	// A receive on a closed channel returns immediately with ok == false.
	for _, s := range subs {
		select {
		case _, open := <-s.ch:
			if open {
				t.Error(s.stillOpen)
			}
		case <-time.After(100 * time.Millisecond):
			t.Error(s.timedOut)
		}
	}
}
// TestEventBus_NamespaceIsolation checks that exact subscribers receive only
// the events published to their own namespace.
func TestEventBus_NamespaceIsolation(t *testing.T) {
	bus := NewEventBus()
	defer bus.Stop()

	tenants := []struct {
		name   string
		ch     <-chan *Event
		wantID string
	}{
		{name: "tenant-a", ch: bus.Subscribe("tenant-a"), wantID: "evt-a"},
		{name: "tenant-b", ch: bus.Subscribe("tenant-b"), wantID: "evt-b"},
	}

	bus.Publish("tenant-a", &Event{ID: "evt-a", EventType: "Test", ActorID: "actor-1"})
	bus.Publish("tenant-b", &Event{ID: "evt-b", EventType: "Test", ActorID: "actor-2"})

	for _, tenant := range tenants {
		// First delivery must be the tenant's own event.
		select {
		case got := <-tenant.ch:
			if got.ID != tenant.wantID {
				t.Errorf("%s received wrong event: %s", tenant.name, got.ID)
			}
		case <-time.After(100 * time.Millisecond):
			t.Errorf("%s timed out", tenant.name)
		}

		// After that the channel must stay quiet.
		select {
		case <-tenant.ch:
			t.Errorf("%s received extra event", tenant.name)
		case <-time.After(50 * time.Millisecond):
			// No further delivery, as expected.
		}
	}
}
// TestEventBus_NonBlockingPublish verifies that Publish does not block on a
// subscriber that never drains its channel: once the subscriber's buffer is
// full, further events are dropped and counted rather than stalling the
// publisher.
func TestEventBus_NonBlockingPublish(t *testing.T) {
	eb := NewEventBus()
	defer eb.Stop()

	// Subscribe but never read, so the buffer can only fill up.
	ch := eb.Subscribe("tenant-a")

	// Publish more events than the buffer holds. Deriving the total from
	// cap(ch) keeps the test valid if Subscribe's buffer size changes.
	const overflow = 50
	total := cap(ch) + overflow
	for i := 0; i < total; i++ {
		event := &Event{
			ID:        "evt",
			EventType: "Test",
			ActorID:   "actor-1",
		}
		// Must return immediately even when the channel is full.
		eb.Publish("tenant-a", event)
	}

	// Reaching this point means Publish never blocked. Also confirm the
	// buffer is exactly full and the surplus was recorded as dropped.
	if got := len(ch); got != cap(ch) {
		t.Errorf("expected %d buffered events, got %d", cap(ch), got)
	}
	if got := eb.Metrics().DroppedEvents("tenant-a"); got != overflow {
		t.Errorf("expected %d dropped events, got %d", overflow, got)
	}
}
// TestEventBus_ConcurrentOperations runs exact subscribe/unsubscribe cycles,
// wildcard subscribe/unsubscribe cycles and publishes concurrently. It makes
// no assertions of its own; it exists to surface deadlocks, panics and (under
// -race) data races.
func TestEventBus_ConcurrentOperations(t *testing.T) {
	bus := NewEventBus()
	defer bus.Stop()

	const workers = 10
	var wg sync.WaitGroup

	// spawn runs work on its own goroutine and registers it with wg.
	spawn := func(work func()) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			work()
		}()
	}

	// churn builds a subscribe / short pause / unsubscribe cycle for a pattern.
	churn := func(pattern string) func() {
		return func() {
			sub := bus.Subscribe(pattern)
			time.Sleep(10 * time.Millisecond)
			bus.Unsubscribe(pattern, sub)
		}
	}

	// Exact subscriptions.
	for w := 0; w < workers; w++ {
		spawn(churn("tenant-a"))
	}

	// Wildcard subscriptions.
	for w := 0; w < workers; w++ {
		spawn(churn("*"))
	}

	// Publishers.
	for w := 0; w < workers; w++ {
		spawn(func() {
			bus.Publish("tenant-a", &Event{
				ID:        "evt",
				EventType: "Test",
				ActorID:   "actor-1",
			})
		})
	}

	wg.Wait()
}

16
go.mod
View File

@@ -1,16 +1,26 @@
module git.flowmade.one/flowmade-one/aether
go 1.23
go 1.23.0
require (
github.com/google/uuid v1.6.0
github.com/nats-io/nats.go v1.37.0
github.com/prometheus/client_golang v1.23.2
)
require (
github.com/klauspost/compress v1.17.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/sys v0.35.0 // indirect
google.golang.org/protobuf v1.36.8 // indirect
)

48
go.sum
View File

@@ -1,14 +1,54 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

258
metrics.go Normal file
View File

@@ -0,0 +1,258 @@
package aether
import (
"sync"
"sync/atomic"
)
// BroadcasterMetrics provides observability metrics for EventBroadcaster implementations.
// All methods are safe for concurrent use.
//
// Metrics are keyed by the string the broadcaster recorded them under. EventBus
// records publishes, deliveries and drops under the published namespace, and
// subscriptions under the pattern passed to Subscribe, so a wildcard
// subscription is counted under its pattern string rather than under the
// namespaces it matches.
type BroadcasterMetrics interface {
// EventsPublished returns the total number of events published for the given namespace.
EventsPublished(namespaceID string) int64
// EventsReceived returns the total number of events received for the given namespace.
// For EventBus this equals events delivered to subscribers, counted once per
// receiving subscriber.
// For NATSEventBus this includes events received from NATS.
EventsReceived(namespaceID string) int64
// ActiveSubscriptions returns the current number of active subscriptions for the given namespace.
ActiveSubscriptions(namespaceID string) int64
// TotalActiveSubscriptions returns the total number of active subscriptions across all namespaces.
TotalActiveSubscriptions() int64
// PublishErrors returns the total number of publish errors for the given namespace.
PublishErrors(namespaceID string) int64
// SubscribeErrors returns the total number of subscribe errors for the given namespace.
SubscribeErrors(namespaceID string) int64
// DroppedEvents returns the total number of events dropped (e.g., full channel) for the given namespace.
DroppedEvents(namespaceID string) int64
// Namespaces returns a list of all namespaces that have metrics.
Namespaces() []string
// Reset resets all metrics. Useful for testing.
Reset()
}
// MetricsCollector provides methods for collecting metrics.
// This interface is implemented internally and used by EventBus implementations.
//
// It embeds BroadcasterMetrics (the read side) and adds the Record* methods
// (the write side). Each Record* call changes one value for the given
// namespace by one.
type MetricsCollector interface {
BroadcasterMetrics
// RecordPublish records a successful publish event.
RecordPublish(namespaceID string)
// RecordReceive records a received event.
RecordReceive(namespaceID string)
// RecordSubscribe records a new subscription (active subscriptions +1).
RecordSubscribe(namespaceID string)
// RecordUnsubscribe records a removed subscription (active subscriptions -1).
RecordUnsubscribe(namespaceID string)
// RecordPublishError records a publish error.
RecordPublishError(namespaceID string)
// RecordSubscribeError records a subscribe error.
RecordSubscribeError(namespaceID string)
// RecordDroppedEvent records a dropped event (e.g., channel full).
RecordDroppedEvent(namespaceID string)
}
// namespaceMetrics holds counters for a single namespace.
// DefaultMetricsCollector reads and updates every field through sync/atomic,
// so the fields should not be accessed with plain loads or stores.
type namespaceMetrics struct {
// eventsPublished counts RecordPublish calls.
eventsPublished int64
// eventsReceived counts RecordReceive calls.
eventsReceived int64
// activeSubscriptions is a gauge: +1 on RecordSubscribe, -1 on RecordUnsubscribe.
activeSubscriptions int64
// publishErrors counts RecordPublishError calls.
publishErrors int64
// subscribeErrors counts RecordSubscribeError calls.
subscribeErrors int64
// droppedEvents counts RecordDroppedEvent calls.
droppedEvents int64
}
// DefaultMetricsCollector is the default implementation of MetricsCollector.
// It uses atomic operations for thread-safe counter updates.
//
// The per-namespace map is guarded by mu; the counters inside each entry are
// updated atomically without holding mu.
type DefaultMetricsCollector struct {
// mu guards the namespaces map (lookups, inserts and replacement on Reset).
mu sync.RWMutex
// namespaces maps a namespace ID to its counters; entries are created on first use.
namespaces map[string]*namespaceMetrics
}
// NewMetricsCollector returns an empty DefaultMetricsCollector whose
// namespace map is initialised and ready for use.
func NewMetricsCollector() *DefaultMetricsCollector {
	collector := new(DefaultMetricsCollector)
	collector.namespaces = map[string]*namespaceMetrics{}
	return collector
}
// getOrCreateNamespace returns the counters for namespaceID, allocating and
// registering a zeroed entry the first time the namespace is seen.
func (m *DefaultMetricsCollector) getOrCreateNamespace(namespaceID string) *namespaceMetrics {
	// Common case: the entry already exists, so a shared lock is enough.
	m.mu.RLock()
	existing, found := m.namespaces[namespaceID]
	m.mu.RUnlock()
	if found {
		return existing
	}

	// Entry was missing: take the exclusive lock and look again, because
	// another goroutine may have inserted it between the two locks.
	m.mu.Lock()
	defer m.mu.Unlock()
	if current, ok := m.namespaces[namespaceID]; ok {
		return current
	}

	created := new(namespaceMetrics)
	m.namespaces[namespaceID] = created
	return created
}
// EventsPublished reports how many events have been published for namespaceID.
// It returns 0 when nothing has been recorded for that namespace.
func (m *DefaultMetricsCollector) EventsPublished(namespaceID string) int64 {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if counters, ok := m.namespaces[namespaceID]; ok {
		return atomic.LoadInt64(&counters.eventsPublished)
	}
	return 0
}
// EventsReceived reports how many events have been received for namespaceID.
// It returns 0 when nothing has been recorded for that namespace.
func (m *DefaultMetricsCollector) EventsReceived(namespaceID string) int64 {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if counters, ok := m.namespaces[namespaceID]; ok {
		return atomic.LoadInt64(&counters.eventsReceived)
	}
	return 0
}
// ActiveSubscriptions reports the current active-subscription gauge for
// namespaceID. It returns 0 when nothing has been recorded for that namespace.
func (m *DefaultMetricsCollector) ActiveSubscriptions(namespaceID string) int64 {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if counters, ok := m.namespaces[namespaceID]; ok {
		return atomic.LoadInt64(&counters.activeSubscriptions)
	}
	return 0
}
// TotalActiveSubscriptions sums the active-subscription gauge over every
// namespace known to the collector.
func (m *DefaultMetricsCollector) TotalActiveSubscriptions() int64 {
	m.mu.RLock()
	defer m.mu.RUnlock()

	sum := int64(0)
	for id := range m.namespaces {
		sum += atomic.LoadInt64(&m.namespaces[id].activeSubscriptions)
	}
	return sum
}
// PublishErrors reports how many publish errors have been recorded for
// namespaceID. It returns 0 when nothing has been recorded for that namespace.
func (m *DefaultMetricsCollector) PublishErrors(namespaceID string) int64 {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if counters, ok := m.namespaces[namespaceID]; ok {
		return atomic.LoadInt64(&counters.publishErrors)
	}
	return 0
}
// SubscribeErrors reports how many subscribe errors have been recorded for
// namespaceID. It returns 0 when nothing has been recorded for that namespace.
func (m *DefaultMetricsCollector) SubscribeErrors(namespaceID string) int64 {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if counters, ok := m.namespaces[namespaceID]; ok {
		return atomic.LoadInt64(&counters.subscribeErrors)
	}
	return 0
}
// DroppedEvents reports how many dropped events have been recorded for
// namespaceID. It returns 0 when nothing has been recorded for that namespace.
func (m *DefaultMetricsCollector) DroppedEvents(namespaceID string) int64 {
	m.mu.RLock()
	defer m.mu.RUnlock()

	if counters, ok := m.namespaces[namespaceID]; ok {
		return atomic.LoadInt64(&counters.droppedEvents)
	}
	return 0
}
// Namespaces lists every namespace ID that currently has a metrics entry.
// The slice is freshly allocated and its order follows map iteration, so it
// is not stable between calls.
func (m *DefaultMetricsCollector) Namespaces() []string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	ids := make([]string, len(m.namespaces))
	next := 0
	for id := range m.namespaces {
		ids[next] = id
		next++
	}
	return ids
}
// Reset discards all recorded metrics by swapping in a fresh, empty
// namespace map.
func (m *DefaultMetricsCollector) Reset() {
	fresh := map[string]*namespaceMetrics{}

	m.mu.Lock()
	m.namespaces = fresh
	m.mu.Unlock()
}
// RecordPublish adds one to the published-events counter of namespaceID,
// creating the namespace entry if it does not exist yet.
func (m *DefaultMetricsCollector) RecordPublish(namespaceID string) {
	atomic.AddInt64(&m.getOrCreateNamespace(namespaceID).eventsPublished, 1)
}
// RecordReceive adds one to the received-events counter of namespaceID,
// creating the namespace entry if it does not exist yet.
func (m *DefaultMetricsCollector) RecordReceive(namespaceID string) {
	atomic.AddInt64(&m.getOrCreateNamespace(namespaceID).eventsReceived, 1)
}
// RecordSubscribe atomically raises the active-subscriptions gauge of
// namespaceID by one.
func (m *DefaultMetricsCollector) RecordSubscribe(namespaceID string) {
	entry := m.getOrCreateNamespace(namespaceID)
	atomic.AddInt64(&entry.activeSubscriptions, 1)
}
// RecordUnsubscribe atomically lowers the active-subscriptions gauge of
// namespaceID by one. The gauge is not clamped, so a call without a matching
// RecordSubscribe leaves it negative.
func (m *DefaultMetricsCollector) RecordUnsubscribe(namespaceID string) {
	entry := m.getOrCreateNamespace(namespaceID)
	atomic.AddInt64(&entry.activeSubscriptions, -1)
}
// RecordPublishError atomically increments the publish-error counter of
// namespaceID.
func (m *DefaultMetricsCollector) RecordPublishError(namespaceID string) {
	entry := m.getOrCreateNamespace(namespaceID)
	atomic.AddInt64(&entry.publishErrors, 1)
}
// RecordSubscribeError atomically increments the subscribe-error counter of
// namespaceID.
func (m *DefaultMetricsCollector) RecordSubscribeError(namespaceID string) {
	entry := m.getOrCreateNamespace(namespaceID)
	atomic.AddInt64(&entry.subscribeErrors, 1)
}
// RecordDroppedEvent atomically increments the dropped-events counter of
// namespaceID.
func (m *DefaultMetricsCollector) RecordDroppedEvent(namespaceID string) {
	entry := m.getOrCreateNamespace(namespaceID)
	atomic.AddInt64(&entry.droppedEvents, 1)
}

123
metrics_prometheus.go Normal file
View File

@@ -0,0 +1,123 @@
package aether
import (
"github.com/prometheus/client_golang/prometheus"
)
// PrometheusMetricsAdapter bridges a BroadcasterMetrics source to Prometheus.
// Its Describe and Collect methods implement prometheus.Collector, so an
// adapter can be registered with a Prometheus registry.
type PrometheusMetricsAdapter struct {
	// metrics is the source that Collect queries.
	metrics BroadcasterMetrics

	// Descriptors for the per-namespace counters.
	eventsPublishedDesc *prometheus.Desc
	eventsReceivedDesc  *prometheus.Desc

	// Descriptor for the per-namespace subscription gauge.
	activeSubscriptionsDesc *prometheus.Desc

	// Descriptors for the per-namespace error and drop counters.
	publishErrorsDesc   *prometheus.Desc
	subscribeErrorsDesc *prometheus.Desc
	droppedEventsDesc   *prometheus.Desc
}
// NewPrometheusMetricsAdapter wraps a BroadcasterMetrics implementation in an
// adapter that exposes its values as Prometheus metrics. Every metric carries
// a single variable label, "namespace".
//
// The adapter implements prometheus.Collector and should be registered with
// a Prometheus registry:
//
//	eb := aether.NewEventBus()
//	adapter := aether.NewPrometheusMetricsAdapter(eb.Metrics())
//	prometheus.MustRegister(adapter)
func NewPrometheusMetricsAdapter(metrics BroadcasterMetrics) *PrometheusMetricsAdapter {
	// perNamespace builds a descriptor labelled only by "namespace"; each call
	// gets its own label slice, exactly as separate NewDesc calls would.
	perNamespace := func(fqName, help string) *prometheus.Desc {
		return prometheus.NewDesc(fqName, help, []string{"namespace"}, nil)
	}

	adapter := &PrometheusMetricsAdapter{metrics: metrics}
	adapter.eventsPublishedDesc = perNamespace(
		"aether_events_published_total",
		"Total number of events published per namespace",
	)
	adapter.eventsReceivedDesc = perNamespace(
		"aether_events_received_total",
		"Total number of events received per namespace",
	)
	adapter.activeSubscriptionsDesc = perNamespace(
		"aether_active_subscriptions",
		"Number of active subscriptions per namespace",
	)
	adapter.publishErrorsDesc = perNamespace(
		"aether_publish_errors_total",
		"Total number of publish errors per namespace",
	)
	adapter.subscribeErrorsDesc = perNamespace(
		"aether_subscribe_errors_total",
		"Total number of subscribe errors per namespace",
	)
	adapter.droppedEventsDesc = perNamespace(
		"aether_dropped_events_total",
		"Total number of dropped events per namespace",
	)
	return adapter
}
// Describe implements prometheus.Collector by sending each of the adapter's
// six descriptors on ch, in declaration order.
func (a *PrometheusMetricsAdapter) Describe(ch chan<- *prometheus.Desc) {
	descriptors := []*prometheus.Desc{
		a.eventsPublishedDesc,
		a.eventsReceivedDesc,
		a.activeSubscriptionsDesc,
		a.publishErrorsDesc,
		a.subscribeErrorsDesc,
		a.droppedEventsDesc,
	}
	for _, desc := range descriptors {
		ch <- desc
	}
}
// Collect implements prometheus.Collector. For every namespace reported by the
// wrapped metrics source it emits one constant metric per family, labelled
// with the namespace: five counters and the active-subscriptions gauge.
func (a *PrometheusMetricsAdapter) Collect(ch chan<- prometheus.Metric) {
	namespaces := a.metrics.Namespaces()

	// families pairs each descriptor with its value type and the reader that
	// supplies the current value; the order matches the emission order.
	families := []struct {
		desc *prometheus.Desc
		kind prometheus.ValueType
		read func(namespaceID string) int64
	}{
		{a.eventsPublishedDesc, prometheus.CounterValue, a.metrics.EventsPublished},
		{a.eventsReceivedDesc, prometheus.CounterValue, a.metrics.EventsReceived},
		{a.activeSubscriptionsDesc, prometheus.GaugeValue, a.metrics.ActiveSubscriptions},
		{a.publishErrorsDesc, prometheus.CounterValue, a.metrics.PublishErrors},
		{a.subscribeErrorsDesc, prometheus.CounterValue, a.metrics.SubscribeErrors},
		{a.droppedEventsDesc, prometheus.CounterValue, a.metrics.DroppedEvents},
	}

	for _, ns := range namespaces {
		for _, family := range families {
			value := float64(family.read(ns))
			ch <- prometheus.MustNewConstMetric(family.desc, family.kind, value, ns)
		}
	}
}

304
metrics_test.go Normal file
View File

@@ -0,0 +1,304 @@
package aether_test
import (
"sync"
"testing"
"time"
"git.flowmade.one/flowmade-one/aether"
)
// TestMetricsCollector_InitialState verifies that a freshly created collector
// reports zero for every metric and tracks no namespaces.
func TestMetricsCollector_InitialState(t *testing.T) {
	const ns = "test-ns"

	mc := aether.NewMetricsCollector()

	// wantZero reports a failure with the given format when got is non-zero.
	wantZero := func(format string, got int64) {
		t.Helper()
		if got != 0 {
			t.Errorf(format, got)
		}
	}

	wantZero("EventsPublished() = %d, want 0", mc.EventsPublished(ns))
	wantZero("EventsReceived() = %d, want 0", mc.EventsReceived(ns))
	wantZero("ActiveSubscriptions() = %d, want 0", mc.ActiveSubscriptions(ns))
	wantZero("TotalActiveSubscriptions() = %d, want 0", mc.TotalActiveSubscriptions())
	wantZero("PublishErrors() = %d, want 0", mc.PublishErrors(ns))
	wantZero("SubscribeErrors() = %d, want 0", mc.SubscribeErrors(ns))
	wantZero("DroppedEvents() = %d, want 0", mc.DroppedEvents(ns))

	if count := len(mc.Namespaces()); count != 0 {
		t.Errorf("Namespaces() = %d, want 0", count)
	}
}
// TestMetricsCollector_RecordPublish checks that publishes are counted
// separately for each namespace.
func TestMetricsCollector_RecordPublish(t *testing.T) {
	mc := aether.NewMetricsCollector()

	// Two publishes on ns1, one on ns2.
	for _, ns := range []string{"ns1", "ns1", "ns2"} {
		mc.RecordPublish(ns)
	}

	first := mc.EventsPublished("ns1")
	if first != 2 {
		t.Errorf("EventsPublished(ns1) = %d, want 2", first)
	}
	second := mc.EventsPublished("ns2")
	if second != 1 {
		t.Errorf("EventsPublished(ns2) = %d, want 1", second)
	}
}
// TestMetricsCollector_RecordReceive checks that received events accumulate
// on the namespace they were recorded for.
func TestMetricsCollector_RecordReceive(t *testing.T) {
	mc := aether.NewMetricsCollector()

	for i := 0; i < 3; i++ {
		mc.RecordReceive("ns1")
	}

	received := mc.EventsReceived("ns1")
	if received != 3 {
		t.Errorf("EventsReceived(ns1) = %d, want 3", received)
	}
}
// TestMetricsCollector_Subscriptions checks the per-namespace and total
// subscription gauges after subscribing and after one unsubscribe.
func TestMetricsCollector_Subscriptions(t *testing.T) {
	mc := aether.NewMetricsCollector()

	// Two subscriptions on ns1, one on ns2.
	for _, ns := range []string{"ns1", "ns1", "ns2"} {
		mc.RecordSubscribe(ns)
	}

	ns1Subs := mc.ActiveSubscriptions("ns1")
	if ns1Subs != 2 {
		t.Errorf("ActiveSubscriptions(ns1) = %d, want 2", ns1Subs)
	}
	ns2Subs := mc.ActiveSubscriptions("ns2")
	if ns2Subs != 1 {
		t.Errorf("ActiveSubscriptions(ns2) = %d, want 1", ns2Subs)
	}
	total := mc.TotalActiveSubscriptions()
	if total != 3 {
		t.Errorf("TotalActiveSubscriptions() = %d, want 3", total)
	}

	mc.RecordUnsubscribe("ns1")

	ns1Subs = mc.ActiveSubscriptions("ns1")
	if ns1Subs != 1 {
		t.Errorf("ActiveSubscriptions(ns1) after unsubscribe = %d, want 1", ns1Subs)
	}
	total = mc.TotalActiveSubscriptions()
	if total != 2 {
		t.Errorf("TotalActiveSubscriptions() after unsubscribe = %d, want 2", total)
	}
}
// TestMetricsCollector_Errors checks that publish errors, subscribe errors
// and dropped events are counted independently of one another.
func TestMetricsCollector_Errors(t *testing.T) {
	mc := aether.NewMetricsCollector()

	for i := 0; i < 2; i++ {
		mc.RecordPublishError("ns1")
	}
	mc.RecordSubscribeError("ns1")
	for i := 0; i < 3; i++ {
		mc.RecordDroppedEvent("ns1")
	}

	publishErrs := mc.PublishErrors("ns1")
	if publishErrs != 2 {
		t.Errorf("PublishErrors(ns1) = %d, want 2", publishErrs)
	}
	subscribeErrs := mc.SubscribeErrors("ns1")
	if subscribeErrs != 1 {
		t.Errorf("SubscribeErrors(ns1) = %d, want 1", subscribeErrs)
	}
	dropped := mc.DroppedEvents("ns1")
	if dropped != 3 {
		t.Errorf("DroppedEvents(ns1) = %d, want 3", dropped)
	}
}
// TestMetricsCollector_Namespaces checks that recording any kind of metric
// makes the namespace show up in Namespaces().
func TestMetricsCollector_Namespaces(t *testing.T) {
	mc := aether.NewMetricsCollector()

	mc.RecordPublish("ns1")
	mc.RecordReceive("ns2")
	mc.RecordSubscribe("ns3")

	listed := mc.Namespaces()
	if len(listed) != 3 {
		t.Errorf("Namespaces() length = %d, want 3", len(listed))
	}

	// Order is not asserted, so compare as a set.
	seen := make(map[string]struct{}, len(listed))
	for _, ns := range listed {
		seen[ns] = struct{}{}
	}
	for _, want := range []string{"ns1", "ns2", "ns3"} {
		if _, ok := seen[want]; !ok {
			t.Errorf("Namespaces() missing %q", want)
		}
	}
}
// TestMetricsCollector_Reset checks that Reset clears recorded values and
// forgets all namespaces.
func TestMetricsCollector_Reset(t *testing.T) {
	mc := aether.NewMetricsCollector()

	// Touch three different metrics so there is something to clear.
	mc.RecordPublish("ns1")
	mc.RecordReceive("ns1")
	mc.RecordSubscribe("ns1")

	mc.Reset()

	published := mc.EventsPublished("ns1")
	if published != 0 {
		t.Errorf("EventsPublished() after reset = %d, want 0", published)
	}
	remaining := len(mc.Namespaces())
	if remaining != 0 {
		t.Errorf("Namespaces() after reset = %d, want 0", remaining)
	}
}
// TestMetricsCollector_ConcurrentAccess hammers one namespace from several
// goroutines and checks that no increments are lost.
func TestMetricsCollector_ConcurrentAccess(t *testing.T) {
	const (
		workers = 10
		rounds  = 100
		ns      = "concurrent-ns"
	)

	mc := aether.NewMetricsCollector()

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for r := 0; r < rounds; r++ {
				mc.RecordPublish(ns)
				mc.RecordReceive(ns)
				mc.RecordSubscribe(ns)
				mc.RecordUnsubscribe(ns)
				mc.RecordPublishError(ns)
				mc.RecordSubscribeError(ns)
				mc.RecordDroppedEvent(ns)
			}
		}()
	}
	wg.Wait()

	// Every counter was bumped once per round by every worker.
	want := int64(workers * rounds)

	published := mc.EventsPublished(ns)
	if published != want {
		t.Errorf("EventsPublished() = %d, want %d", published, want)
	}
	received := mc.EventsReceived(ns)
	if received != want {
		t.Errorf("EventsReceived() = %d, want %d", received, want)
	}
	active := mc.ActiveSubscriptions(ns)
	if active != 0 {
		t.Errorf("ActiveSubscriptions() = %d, want 0 (subscribed and unsubscribed same amount)", active)
	}
	publishErrs := mc.PublishErrors(ns)
	if publishErrs != want {
		t.Errorf("PublishErrors() = %d, want %d", publishErrs, want)
	}
	subscribeErrs := mc.SubscribeErrors(ns)
	if subscribeErrs != want {
		t.Errorf("SubscribeErrors() = %d, want %d", subscribeErrs, want)
	}
	dropped := mc.DroppedEvents(ns)
	if dropped != want {
		t.Errorf("DroppedEvents() = %d, want %d", dropped, want)
	}
}
// TestEventBus_Metrics walks an EventBus through subscribe, publish and
// unsubscribe, checking the exposed metrics after each step.
func TestEventBus_Metrics(t *testing.T) {
	const ns = "test-ns"

	bus := aether.NewEventBus()
	defer bus.Stop()

	metrics := bus.Metrics()
	if metrics == nil {
		t.Fatal("Metrics() returned nil")
	}

	// Step 1: subscribing raises the gauge.
	events := bus.Subscribe(ns)
	if active := metrics.ActiveSubscriptions(ns); active != 1 {
		t.Errorf("ActiveSubscriptions() after subscribe = %d, want 1", active)
	}

	// Step 2: publishing is counted on both the publish and receive side.
	bus.Publish(ns, &aether.Event{
		ID:        "test-1",
		EventType: "TestEvent",
		ActorID:   "actor-1",
		Version:   1,
	})

	// Block until the event arrives so the counters are settled.
	select {
	case <-events:
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timeout waiting for event")
	}

	if published := metrics.EventsPublished(ns); published != 1 {
		t.Errorf("EventsPublished() after publish = %d, want 1", published)
	}
	if received := metrics.EventsReceived(ns); received != 1 {
		t.Errorf("EventsReceived() after publish = %d, want 1", received)
	}

	// Step 3: unsubscribing lowers the gauge again.
	bus.Unsubscribe(ns, events)
	if active := metrics.ActiveSubscriptions(ns); active != 0 {
		t.Errorf("ActiveSubscriptions() after unsubscribe = %d, want 0", active)
	}
}
// TestEventBus_DroppedEvents checks that an event published to a subscriber
// whose channel is already full is counted as dropped.
func TestEventBus_DroppedEvents(t *testing.T) {
	const (
		ns = "test-ns"
		// bufferSize assumes the default subscriber channel buffer is 100;
		// that default is defined outside this file — TODO confirm.
		bufferSize = 100
	)

	bus := aether.NewEventBus()
	defer bus.Stop()
	metrics := bus.Metrics()

	// Subscribe but never drain the channel, so its buffer fills up.
	_ = bus.Subscribe(ns)

	for i := 0; i < bufferSize; i++ {
		// The ID suffix is the raw code point i, not its decimal text.
		filler := &aether.Event{
			ID:        "fill-" + string(rune(i)),
			EventType: "FillEvent",
		}
		bus.Publish(ns, filler)
	}

	// With the buffer full, one more publish has nowhere to go.
	overflow := &aether.Event{
		ID:        "dropped",
		EventType: "DroppedEvent",
	}
	bus.Publish(ns, overflow)

	dropped := metrics.DroppedEvents(ns)
	if dropped != 1 {
		t.Errorf("DroppedEvents() = %d, want 1", dropped)
	}
}
// TestEventBus_MetricsProvider checks that an EventBus can be used as an
// aether.MetricsProvider and hands back a non-nil metrics reader.
func TestEventBus_MetricsProvider(t *testing.T) {
	bus := aether.NewEventBus()
	defer bus.Stop()

	// The assignment is the compile-time proof of interface satisfaction.
	var provider aether.MetricsProvider = bus

	metrics := provider.Metrics()
	if metrics == nil {
		t.Error("EventBus.Metrics() returned nil")
	}
}
// TestEventBus_StopClearsSubscriptionMetrics checks that stopping the bus
// brings the total active-subscription gauge back to zero.
func TestEventBus_StopClearsSubscriptionMetrics(t *testing.T) {
	bus := aether.NewEventBus()
	metrics := bus.Metrics()

	// Two subscribers on ns1, one on ns2; the channels are never read.
	for _, ns := range []string{"ns1", "ns1", "ns2"} {
		_ = bus.Subscribe(ns)
	}

	before := metrics.TotalActiveSubscriptions()
	if before != 3 {
		t.Errorf("TotalActiveSubscriptions() before stop = %d, want 3", before)
	}

	bus.Stop()

	after := metrics.TotalActiveSubscriptions()
	if after != 0 {
		t.Errorf("TotalActiveSubscriptions() after stop = %d, want 0", after)
	}
}

View File

@@ -11,12 +11,18 @@ import (
"github.com/nats-io/nats.go"
)
// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS
// NATSEventBus is an EventBus that broadcasts events across all cluster nodes using NATS.
// Supports wildcard patterns for cross-namespace subscriptions using NATS native wildcards.
//
// Security Considerations:
// Wildcard subscriptions (using "*" or ">") receive events from multiple namespaces.
// This bypasses namespace isolation at the NATS level. Ensure proper access controls
// are in place at the application layer before granting wildcard subscription access.
type NATSEventBus struct {
*EventBus // Embed base EventBus for local subscriptions
nc *nats.Conn // NATS connection
subscriptions []*nats.Subscription
namespaceSubscribers map[string]int // Track number of subscribers per namespace
patternSubscribers map[string]int // Track number of subscribers per pattern (includes wildcards)
nodeID string // Unique ID for this node
mutex sync.Mutex
ctx context.Context
@@ -39,7 +45,7 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
nc: nc,
nodeID: uuid.New().String(),
subscriptions: make([]*nats.Subscription, 0),
namespaceSubscribers: make(map[string]int),
patternSubscribers: make(map[string]int),
ctx: ctx,
cancel: cancel,
}
@@ -47,57 +53,70 @@ func NewNATSEventBus(nc *nats.Conn) (*NATSEventBus, error) {
return neb, nil
}
// Subscribe creates a local subscription and ensures NATS subscription exists for the namespace
func (neb *NATSEventBus) Subscribe(namespaceID string) <-chan *Event {
// Subscribe creates a local subscription and ensures NATS subscription exists for the pattern.
// Supports NATS subject patterns:
// - "*" matches a single token
// - ">" matches one or more tokens (only at the end)
//
// Security Warning: Wildcard patterns receive events from all matching namespaces,
// bypassing namespace isolation. Only use for trusted system components.
func (neb *NATSEventBus) Subscribe(namespacePattern string) <-chan *Event {
neb.mutex.Lock()
defer neb.mutex.Unlock()
// Create local subscription first
ch := neb.EventBus.Subscribe(namespaceID)
ch := neb.EventBus.Subscribe(namespacePattern)
// Check if this is the first subscriber for this namespace
count := neb.namespaceSubscribers[namespaceID]
// Check if this is the first subscriber for this pattern
count := neb.patternSubscribers[namespacePattern]
if count == 0 {
// First subscriber - create NATS subscription
subject := fmt.Sprintf("aether.events.%s", namespaceID)
// NATS natively supports wildcards, so we can use the pattern directly
subject := fmt.Sprintf("aether.events.%s", namespacePattern)
sub, err := neb.nc.Subscribe(subject, func(msg *nats.Msg) {
neb.handleNATSEvent(msg)
neb.handleNATSEvent(msg, namespacePattern)
})
if err != nil {
log.Printf("[NATSEventBus] Failed to subscribe to NATS subject %s: %v", subject, err)
// Record subscription error
neb.metrics.RecordSubscribeError(namespacePattern)
} else {
neb.subscriptions = append(neb.subscriptions, sub)
if IsWildcardPattern(namespacePattern) {
log.Printf("[NATSEventBus] Node %s subscribed to wildcard pattern %s", neb.nodeID, subject)
} else {
log.Printf("[NATSEventBus] Node %s subscribed to %s", neb.nodeID, subject)
}
}
}
neb.namespaceSubscribers[namespaceID] = count + 1
neb.patternSubscribers[namespacePattern] = count + 1
return ch
}
// Unsubscribe removes a local subscription and cleans up NATS subscription if no more subscribers
func (neb *NATSEventBus) Unsubscribe(namespaceID string, ch <-chan *Event) {
func (neb *NATSEventBus) Unsubscribe(namespacePattern string, ch <-chan *Event) {
neb.mutex.Lock()
defer neb.mutex.Unlock()
neb.EventBus.Unsubscribe(namespaceID, ch)
neb.EventBus.Unsubscribe(namespacePattern, ch)
count := neb.namespaceSubscribers[namespaceID]
count := neb.patternSubscribers[namespacePattern]
if count > 0 {
count--
neb.namespaceSubscribers[namespaceID] = count
neb.patternSubscribers[namespacePattern] = count
if count == 0 {
delete(neb.namespaceSubscribers, namespaceID)
log.Printf("[NATSEventBus] No more subscribers for namespace %s on node %s", namespaceID, neb.nodeID)
delete(neb.patternSubscribers, namespacePattern)
log.Printf("[NATSEventBus] No more subscribers for pattern %s on node %s", namespacePattern, neb.nodeID)
}
}
}
// handleNATSEvent processes events received from NATS
func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) {
func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg, subscribedPattern string) {
var eventMsg eventMessage
if err := json.Unmarshal(msg.Data, &eventMsg); err != nil {
log.Printf("[NATSEventBus] Failed to unmarshal event: %v", err)
@@ -109,9 +128,36 @@ func (neb *NATSEventBus) handleNATSEvent(msg *nats.Msg) {
return
}
// Forward to local EventBus subscribers
// For wildcard subscriptions, we need to deliver to the EventBus using
// the subscribed pattern so it reaches the correct wildcard subscriber.
// For exact subscriptions, use the actual namespace.
if IsWildcardPattern(subscribedPattern) {
// Deliver using the pattern - the EventBus will route to wildcard subscribers
neb.deliverToWildcardSubscribers(subscribedPattern, eventMsg.Event)
} else {
// Forward to local EventBus subscribers with actual namespace
neb.EventBus.Publish(eventMsg.NamespaceID, eventMsg.Event)
}
}
// deliverToWildcardSubscribers hands event to every local wildcard subscriber
// registered under exactly the given pattern. Sends never block: a subscriber
// whose channel cannot accept the event is skipped and the event is recorded
// as dropped; a successful send is recorded as a receive. Both metrics are
// keyed by the pattern.
func (neb *NATSEventBus) deliverToWildcardSubscribers(pattern string, event *Event) {
	neb.EventBus.mutex.RLock()
	defer neb.EventBus.mutex.RUnlock()

	for _, subscriber := range neb.EventBus.wildcardSubscribers {
		if subscriber.pattern != pattern {
			continue
		}
		select {
		case subscriber.ch <- event:
			// Delivered an event that arrived via NATS.
			neb.metrics.RecordReceive(pattern)
		default:
			// Channel full: drop for this subscriber rather than block.
			neb.metrics.RecordDroppedEvent(pattern)
		}
	}
}
// Publish publishes an event both locally and to NATS for cross-node broadcasting
func (neb *NATSEventBus) Publish(namespaceID string, event *Event) {
@@ -130,11 +176,13 @@ func (neb *NATSEventBus) Publish(namespaceID string, event *Event) {
data, err := json.Marshal(eventMsg)
if err != nil {
log.Printf("[NATSEventBus] Failed to marshal event for NATS: %v", err)
neb.metrics.RecordPublishError(namespaceID)
return
}
if err := neb.nc.Publish(subject, data); err != nil {
log.Printf("[NATSEventBus] Failed to publish event to NATS: %v", err)
neb.metrics.RecordPublishError(namespaceID)
return
}
}

File diff suppressed because it is too large Load Diff

83
pattern.go Normal file
View File

@@ -0,0 +1,83 @@
package aether
import "strings"
// MatchNamespacePattern checks if a namespace matches a pattern.
// Patterns follow NATS subject matching conventions where tokens are separated by dots:
//   - "*" matches exactly one token (any sequence without ".")
//   - ">" matches one or more tokens (only valid at the end of a pattern)
//   - Exact strings match exactly
//
// An empty pattern matches nothing and an empty namespace is matched by
// nothing, not even "*". A pattern with ">" anywhere but the final token is
// invalid and matches nothing.
//
// Examples:
//   - "tenant-a" matches "tenant-a" (exact match)
//   - "*" matches any single-token namespace like "tenant-a" or "production"
//   - ">" matches any namespace with one or more tokens
//   - "prod.*" matches "prod.tenant", "prod.orders" (but not "prod.tenant.orders")
//   - "prod.>" matches "prod.tenant", "prod.tenant.orders", "prod.a.b.c"
//   - "*.tenant.*" matches "prod.tenant.orders", "staging.tenant.events"
//
// Security Considerations:
// Wildcard subscriptions provide cross-namespace visibility. Use with caution:
//   - "*" or ">" patterns receive events from ALL matching namespaces
//   - This bypasses namespace isolation for the subscriber
//   - Only grant wildcard subscription access to trusted system components
//   - Consider auditing wildcard subscription usage
//   - For multi-tenant systems, wildcard access should be restricted to admin/ops
//   - Use the most specific pattern possible to minimize exposure
func MatchNamespacePattern(pattern, namespace string) bool {
	// strings.Split("", ".") yields one empty token, which "*" would otherwise
	// accept; reject empty inputs up front so "" never matches anything.
	if pattern == "" || namespace == "" {
		return false
	}

	// Fast path: a lone ">" matches every non-empty namespace.
	if pattern == ">" {
		return true
	}

	return matchTokens(strings.Split(pattern, "."), strings.Split(namespace, "."))
}

// matchTokens reports whether the dot-separated namespace tokens satisfy the
// pattern tokens. Tokens are compared position by position: "*" accepts any
// single token, a literal must be equal, and a trailing ">" accepts all
// remaining tokens provided there is at least one. A ">" that is not the last
// pattern token makes the pattern invalid, so nothing matches.
func matchTokens(patternTokens, namespaceTokens []string) bool {
	last := len(patternTokens) - 1
	for i, token := range patternTokens {
		if token == ">" {
			if i != last {
				// ">" is only a wildcard in the final position.
				return false
			}
			// Requires at least one namespace token at or after position i.
			return len(namespaceTokens) > i
		}

		// Pattern still has tokens but the namespace has run out.
		if i >= len(namespaceTokens) {
			return false
		}
		if token != "*" && token != namespaceTokens[i] {
			return false
		}
	}

	// Pattern exhausted without ">": the namespace must be exhausted too.
	return len(patternTokens) == len(namespaceTokens)
}
// IsWildcardPattern reports whether pattern contains a "*" or ">" character
// anywhere in the string. This is a plain character check, not a token-aware
// one. Wildcard patterns can match multiple namespaces and bypass namespace
// isolation.
func IsWildcardPattern(pattern string) bool {
	const wildcardChars = "*>"
	return strings.ContainsAny(pattern, wildcardChars)
}

117
pattern_test.go Normal file
View File

@@ -0,0 +1,117 @@
package aether
import "testing"
// TestMatchNamespacePattern runs MatchNamespacePattern against a table of
// exact, empty, "*", ">" and combined-wildcard cases.
func TestMatchNamespacePattern(t *testing.T) {
	type matchCase struct {
		name      string
		pattern   string
		namespace string
		expected  bool
	}

	cases := []matchCase{
		// Exact matches
		{name: "exact match", pattern: "tenant-a", namespace: "tenant-a", expected: true},
		{name: "exact mismatch", pattern: "tenant-a", namespace: "tenant-b", expected: false},
		{name: "exact match with dots", pattern: "prod.tenant.a", namespace: "prod.tenant.a", expected: true},
		{name: "exact mismatch with dots", pattern: "prod.tenant.a", namespace: "prod.tenant.b", expected: false},

		// Empty cases
		{name: "empty pattern", pattern: "", namespace: "tenant-a", expected: false},
		{name: "empty namespace exact", pattern: "tenant-a", namespace: "", expected: false},
		{name: "empty namespace catch-all", pattern: ">", namespace: "", expected: false},
		{name: "both empty", pattern: "", namespace: "", expected: false},

		// Single wildcard (*) - matches one token (NATS semantics: tokens are dot-separated)
		{name: "star matches any single token", pattern: "*", namespace: "tenant-a", expected: true},
		{name: "star matches any single token 2", pattern: "*", namespace: "anything", expected: true},
		{name: "star does not match multi-token", pattern: "*", namespace: "prod.tenant", expected: false},
		{name: "prefix with star", pattern: "prod.*", namespace: "prod.tenant", expected: true},
		{name: "prefix with star 2", pattern: "prod.*", namespace: "prod.orders", expected: true},
		{name: "prefix with star no match extra tokens", pattern: "prod.*", namespace: "prod.tenant.orders", expected: false},
		{name: "prefix with star no match wrong prefix", pattern: "prod.*", namespace: "staging.tenant", expected: false},
		{name: "middle wildcard", pattern: "prod.*.orders", namespace: "prod.tenant.orders", expected: true},
		{name: "middle wildcard no match", pattern: "prod.*.orders", namespace: "prod.tenant.events", expected: false},
		{name: "multiple stars", pattern: "*.tenant.*", namespace: "prod.tenant.orders", expected: true},
		{name: "multiple stars 2", pattern: "*.*.orders", namespace: "prod.tenant.orders", expected: true},
		{name: "multiple stars no match", pattern: "*.*.orders", namespace: "prod.orders", expected: false},

		// Multi-token wildcard (>) - matches one or more tokens
		{name: "greater matches one", pattern: ">", namespace: "tenant", expected: true},
		{name: "greater matches multi", pattern: ">", namespace: "prod.tenant.orders", expected: true},
		{name: "prefix greater", pattern: "prod.>", namespace: "prod.tenant", expected: true},
		{name: "prefix greater multi", pattern: "prod.>", namespace: "prod.tenant.orders.items", expected: true},
		{name: "prefix greater no match different prefix", pattern: "prod.>", namespace: "staging.tenant", expected: false},
		{name: "prefix greater requires at least one", pattern: "prod.>", namespace: "prod", expected: false},
		{name: "deep prefix greater", pattern: "prod.tenant.>", namespace: "prod.tenant.orders", expected: true},

		// Combined wildcards
		{name: "star then greater", pattern: "*.>", namespace: "prod.tenant", expected: true},
		{name: "star then greater multi", pattern: "*.>", namespace: "prod.tenant.orders", expected: true},
		{name: "star then greater no match single", pattern: "*.>", namespace: "prod", expected: false},

		// Edge cases
		{name: "trailing dot in pattern", pattern: "tenant.", namespace: "tenant.", expected: true},
		{name: "just dots", pattern: "..", namespace: "..", expected: true},
		{name: "star at end", pattern: "prod.tenant.*", namespace: "prod.tenant.a", expected: true},
		{name: "star at end no match", pattern: "prod.tenant.*", namespace: "prod.other.a", expected: false},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := MatchNamespacePattern(tc.pattern, tc.namespace)
			if got == tc.expected {
				return
			}
			t.Errorf("MatchNamespacePattern(%q, %q) = %v, want %v",
				tc.pattern, tc.namespace, got, tc.expected)
		})
	}
}
// TestIsWildcardPattern checks wildcard detection for literal namespaces and
// for patterns containing "*" and/or ">".
func TestIsWildcardPattern(t *testing.T) {
	type wildcardCase struct {
		pattern  string
		expected bool
	}

	cases := []wildcardCase{
		// Literal namespaces
		{pattern: "tenant-a", expected: false},
		{pattern: "prod.tenant.orders", expected: false},
		// Patterns with wildcards
		{pattern: "*", expected: true},
		{pattern: "prod.*", expected: true},
		{pattern: "*.orders", expected: true},
		{pattern: ">", expected: true},
		{pattern: "prod.>", expected: true},
		{pattern: "*.>", expected: true},
		{pattern: "prod.*.orders", expected: true},
	}

	for _, tc := range cases {
		t.Run(tc.pattern, func(t *testing.T) {
			got := IsWildcardPattern(tc.pattern)
			if got == tc.expected {
				return
			}
			t.Errorf("IsWildcardPattern(%q) = %v, want %v",
				tc.pattern, got, tc.expected)
		})
	}
}
// BenchmarkMatchNamespacePattern measures MatchNamespacePattern for exact,
// "*", ">" and mixed-wildcard patterns, one sub-benchmark per case.
func BenchmarkMatchNamespacePattern(b *testing.B) {
	type benchCase struct {
		name      string
		pattern   string
		namespace string
	}

	cases := []benchCase{
		{name: "exact", pattern: "tenant-a", namespace: "tenant-a"},
		{name: "star", pattern: "*", namespace: "tenant-a"},
		{name: "prefix_star", pattern: "prod.*", namespace: "prod.tenant"},
		{name: "greater", pattern: ">", namespace: "prod.tenant.orders"},
		{name: "complex", pattern: "prod.*.>", namespace: "prod.tenant.orders.items"},
	}

	for _, bc := range cases {
		b.Run(bc.name, func(b *testing.B) {
			for iter := 0; iter < b.N; iter++ {
				MatchNamespacePattern(bc.pattern, bc.namespace)
			}
		})
	}
}