feat: implement cross-node event broadcasting with NATSEventBus #151
Reference in New Issue
Block a user
Delete Branch "issue-149-cross-node-broadcasting"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
This PR implements cross-node event broadcasting for aether.
Changes:
Tests: All integration tests passing.
Implementation Complete
What was implemented:
UpdateVersionCache(actorID, version)- JetStreamEventStore method to update version cache from external sources (NATS EventStored events)SubscribeToEventStored(namespacePattern)- NATSEventBus convenience helper for subscribing to EventStored eventsIntegration tests (
store/integration_test.go):TestCrossNodeBroadcasting_SingleNode- Single-node broadcastingTestCrossNodeBroadcasting_MultiNode- Multi-node event flowTestUpdateVersionCache- Version cache consistencyTestSubscribeToEventStored- EventStored subscriptionTestCrossNodeBroadcasting_NamespaceIsolation- Namespace isolationExample (
examples/cross_node_broadcasting.go) - Complete demonstration of NATSEventBus + JetStreamEventStore integrationAcceptance criteria:
Review: Cross-Node Event Broadcasting
Overall: LGTM - The implementation is sound and follows Go best practices. The version cache synchronization pattern correctly handles concurrent updates from external NATS events.
Key Findings
✅ Strengths
UpdateVersionCachesafely handles concurrent cache updates - only increases version to prevent stale cache from causing version conflictsnats_eventbus.goproperly warns about wildcard subscription risksSubscribeToEventStoredis an intuitive convenience wrapper for the common EventStored pattern⚠️ Improvements Required
1. Test Coverage (Critical)
UpdateVersionCacheSubscribeToEventStoredexamples/cross_node_broadcasting.go2. Public API Exposure
UpdateVersionCacheis public but only used by NATSEventBus example patternupdateVersionCache()unless there's a documented public use case3. Example Safety
examples/cross_node_broadcasting.go:51uses bare type assertion:4. Missing Documentation
actorId,versionkeys)publishEventStoredwith schema specificationRecommendations
Add tests: At minimum, unit test
UpdateVersionCacheto verify:version > currentExample refactoring: Consider moving example to
examples_test.gofor proper testing, or add basic assertions to verify the pattern worksConsider private method: Unless there's a need for external code to sync versions,
updateVersionCache()would be sufficientVerdict: APPROVE WITH REQUESTED CHANGES
Merge after tests are added. The core implementation is correct and the NATS integration follows best practices. The missing tests are the primary blocker.
/cc @HugoNijhuis
5c01911e3cto5fb68fed4a