[Issue #22] Add EventBroadcaster metrics #49

Merged
HugoNijhuis merged 1 commits from issue-22-eventbroadcaster-metrics into main 2026-01-10 18:52:33 +00:00
Owner

Summary

Add comprehensive metrics tracking for EventBroadcaster implementations to enable observability and debugging of event publishing and subscription across namespaces.

Changes

  • Add BroadcasterMetrics interface for reading metrics per namespace
  • Add MetricsCollector interface and DefaultMetricsCollector implementation with thread-safe counters
  • Track events_published and events_received counters per namespace
  • Track active_subscriptions gauge per namespace
  • Track publish_errors, subscribe_errors, and dropped_events counters
  • Add MetricsProvider interface for EventBroadcaster implementations to expose metrics
  • Integrate metrics tracking into EventBus and NATSEventBus
  • Add optional Prometheus integration via PrometheusMetricsAdapter that implements prometheus.Collector
  • Add comprehensive unit tests for metrics functionality

Usage Example

// Access metrics from EventBus
eb := aether.NewEventBus()
metrics := eb.Metrics()
published := metrics.EventsPublished("my-namespace")
activeSubscriptions := metrics.TotalActiveSubscriptions()

// Optional Prometheus integration
adapter := aether.NewPrometheusMetricsAdapter(eb.Metrics())
prometheus.MustRegister(adapter)

Closes #22

## Summary Add comprehensive metrics tracking for EventBroadcaster implementations to enable observability and debugging of event publishing and subscription across namespaces. ## Changes - Add `BroadcasterMetrics` interface for reading metrics per namespace - Add `MetricsCollector` interface and `DefaultMetricsCollector` implementation with thread-safe counters - Track `events_published` and `events_received` counters per namespace - Track `active_subscriptions` gauge per namespace - Track `publish_errors`, `subscribe_errors`, and `dropped_events` counters - Add `MetricsProvider` interface for EventBroadcaster implementations to expose metrics - Integrate metrics tracking into `EventBus` and `NATSEventBus` - Add optional Prometheus integration via `PrometheusMetricsAdapter` that implements `prometheus.Collector` - Add comprehensive unit tests for metrics functionality ## Usage Example ```go // Access metrics from EventBus eb := aether.NewEventBus() metrics := eb.Metrics() published := metrics.EventsPublished("my-namespace") activeSubscriptions := metrics.TotalActiveSubscriptions() // Optional Prometheus integration adapter := aether.NewPrometheusMetricsAdapter(eb.Metrics()) prometheus.MustRegister(adapter) ``` Closes #22
HugoNijhuis added 1 commit 2026-01-10 18:23:12 +00:00
Add EventBroadcaster metrics for observability and debugging
All checks were successful
CI / build (pull_request) Successful in 38s
71c6011f49
Implement comprehensive metrics tracking for EventBroadcaster implementations:

- Add BroadcasterMetrics interface for reading metrics per namespace
- Add MetricsCollector interface and DefaultMetricsCollector implementation
- Track events_published and events_received counters per namespace
- Track active_subscriptions gauge per namespace
- Track publish_errors, subscribe_errors, and dropped_events counters
- Add MetricsProvider interface for EventBroadcaster implementations
- Integrate metrics tracking into EventBus and NATSEventBus
- Add optional Prometheus integration via PrometheusMetricsAdapter

Closes #22

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
HugoNijhuis force-pushed issue-22-eventbroadcaster-metrics from 71c6011f49 to dae751a6ef 2026-01-10 18:51:29 +00:00 Compare
Author
Owner

Rebased on main and fixed the implementation to work with the wildcard subscription support. The metrics are now integrated with both exact and wildcard subscriptions. All tests pass.

Rebased on main and fixed the implementation to work with the wildcard subscription support. The metrics are now integrated with both exact and wildcard subscriptions. All tests pass.
HugoNijhuis merged commit e3dbe3d52d into main 2026-01-10 18:52:33 +00:00
Author
Owner

AI Code Review

This is an automated review generated by the code-reviewer agent.

Summary

This PR adds comprehensive metrics tracking for EventBroadcaster implementations. The implementation is well-structured with clean interfaces and thread-safe operations using atomics. However, there are several issues ranging from logic bugs to missing test coverage.

Findings

Code Quality

  • metrics.go:291-311 - The double-checked locking pattern in getOrCreateNamespace is correct and well-implemented
  • metrics.go - Good separation of concerns with BroadcasterMetrics (read-only) and MetricsCollector (write) interfaces
  • metrics_prometheus.go - Clean Prometheus adapter implementation following the Collector pattern
  • eventbus.go:117-120 - CRITICAL BUG: The Stop() method has a logic error. It calls RecordUnsubscribe for each subscription but then closes all channels in a second loop. This means subscriptions are decremented before channels are closed, which could cause issues if the metrics are read during shutdown

Potential Bugs

  • eventbus.go:100-101 - LOGIC ERROR: RecordReceive is called inside the select case when an event is successfully sent. This means if there are 3 subscribers and an event is published, RecordReceive will be called 3 times for a single publish. This inflates the EventsReceived counter. The semantics are unclear - does "received" mean "delivered to subscriber" or "received by EventBus"? This needs clarification
  • nats_eventbus.go:114-115 - Inconsistent with EventBus: RecordReceive is called once when receiving from NATS (which makes sense for cross-node events), but EventBus calls it multiple times per publish
  • nats_eventbus.go:118 - After recording receive, the code calls EventBus.Publish which will record another publish and potentially more receives. This means a single cross-node event results in: 1 RecordReceive (line 115) + 1 RecordPublish (from EventBus.Publish) + N RecordReceive calls (one per subscriber). This creates confusing/incorrect metrics

Security Concerns

  • No security issues identified

Style Notes

  • go.mod:103 - Minor: Go version changed from 1.23 to 1.23.0 - this is unnecessary and creates noise in the diff
  • metrics_test.go:851 - Bug in test: string conversion rune(i) will produce unprintable characters for i < 32 and won't produce unique strings as intended. Should use strconv.Itoa(i) instead
  • Code is generally well-formatted and follows Go conventions

Test Coverage

  • Missing: No tests for PrometheusMetricsAdapter (metrics_prometheus.go)
  • Missing: No integration tests showing the interaction between EventBus metrics and NATSEventBus metrics
  • Good: Comprehensive unit tests for DefaultMetricsCollector including concurrency tests
  • Good: Tests for EventBus metrics integration
  • Issue: The test TestEventBus_DroppedEvents has a potential flakyness issue - it assumes synchronous delivery but doesn't wait/verify

Recommendations

  1. Fix the EventsReceived semantics - decide if it means "events received by the broadcaster" (should be called once per Publish) or "events delivered to subscribers" (current behavior but confusing name)
  2. Fix the NATS double-counting issue in handleNATSEvent
  3. Fix the Stop() method to close channels first, then update metrics
  4. Add Prometheus adapter tests
  5. Consider renaming EventsReceived to EventsDelivered if current behavior is intended
  6. Fix the test bug in TestEventBus_DroppedEvents (line 851)

Verdict

Needs Changes

The PR has good structure and comprehensive metrics coverage, but contains logic bugs that will result in incorrect metric values, particularly around EventsReceived counting. These issues should be addressed before merge.

## AI Code Review > This is an automated review generated by the code-reviewer agent. ### Summary This PR adds comprehensive metrics tracking for EventBroadcaster implementations. The implementation is well-structured with clean interfaces and thread-safe operations using atomics. However, there are several issues ranging from logic bugs to missing test coverage. ### Findings #### Code Quality - **metrics.go:291-311** - The double-checked locking pattern in getOrCreateNamespace is correct and well-implemented - **metrics.go** - Good separation of concerns with BroadcasterMetrics (read-only) and MetricsCollector (write) interfaces - **metrics_prometheus.go** - Clean Prometheus adapter implementation following the Collector pattern - **eventbus.go:117-120** - CRITICAL BUG: The Stop() method has a logic error. It calls RecordUnsubscribe for each subscription but then closes all channels in a second loop. This means subscriptions are decremented before channels are closed, which could cause issues if the metrics are read during shutdown #### Potential Bugs - **eventbus.go:100-101** - LOGIC ERROR: RecordReceive is called inside the select case when an event is successfully sent. This means if there are 3 subscribers and an event is published, RecordReceive will be called 3 times for a single publish. This inflates the EventsReceived counter. The semantics are unclear - does "received" mean "delivered to subscriber" or "received by EventBus"? This needs clarification - **nats_eventbus.go:114-115** - Inconsistent with EventBus: RecordReceive is called once when receiving from NATS (which makes sense for cross-node events), but EventBus calls it multiple times per publish - **nats_eventbus.go:118** - After recording receive, the code calls EventBus.Publish which will record another publish and potentially more receives. This means a single cross-node event results in: 1 RecordReceive (line 115) + 1 RecordPublish (from EventBus.Publish) + N RecordReceive calls (one per subscriber). This creates confusing/incorrect metrics #### Security Concerns - No security issues identified #### Style Notes - **go.mod:103** - Minor: Go version changed from 1.23 to 1.23.0 - this is unnecessary and creates noise in the diff - **metrics_test.go:851** - Bug in test: string conversion rune(i) will produce unprintable characters for i < 32 and won't produce unique strings as intended. Should use strconv.Itoa(i) instead - Code is generally well-formatted and follows Go conventions #### Test Coverage - **Missing**: No tests for PrometheusMetricsAdapter (metrics_prometheus.go) - **Missing**: No integration tests showing the interaction between EventBus metrics and NATSEventBus metrics - **Good**: Comprehensive unit tests for DefaultMetricsCollector including concurrency tests - **Good**: Tests for EventBus metrics integration - **Issue**: The test TestEventBus_DroppedEvents has a potential flakyness issue - it assumes synchronous delivery but doesn't wait/verify ### Recommendations 1. Fix the EventsReceived semantics - decide if it means "events received by the broadcaster" (should be called once per Publish) or "events delivered to subscribers" (current behavior but confusing name) 2. Fix the NATS double-counting issue in handleNATSEvent 3. Fix the Stop() method to close channels first, then update metrics 4. Add Prometheus adapter tests 5. Consider renaming EventsReceived to EventsDelivered if current behavior is intended 6. Fix the test bug in TestEventBus_DroppedEvents (line 851) ### Verdict **Needs Changes** The PR has good structure and comprehensive metrics coverage, but contains logic bugs that will result in incorrect metric values, particularly around EventsReceived counting. These issues should be addressed before merge.
Sign in to join this conversation.
No Reviewers
1 Participants
Notifications
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: flowmade-one/aether#49