diff --git a/CLAUDE.md b/CLAUDE.md index d67d261..df9a26d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -124,7 +124,11 @@ if errors.Is(err, aether.ErrVersionConflict) { ### Namespace Isolation -Namespaces provide logical boundaries for events and subscriptions: +Namespaces provide logical boundaries for events and subscriptions. + +#### Event Bus Namespaces + +The event bus supports namespace-scoped pub/sub: ```go // Subscribe to events in a namespace @@ -134,6 +138,36 @@ ch := eventBus.Subscribe("tenant-abc") eventBus.Publish("tenant-abc", event) // Only tenant-abc subscribers see this ``` +#### Namespace-Scoped Event Stores + +JetStreamEventStore supports optional namespace prefixes for complete storage isolation: + +```go +// Create a namespaced event store (convenience function) +store, err := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-abc") + +// Or configure via JetStreamConfig +config := store.JetStreamConfig{ + Namespace: "tenant-abc", + StreamRetention: 30 * 24 * time.Hour, + ReplicaCount: 3, +} +store, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config) + +// The actual stream name becomes "tenant-abc_events" +// Events from one namespace cannot be read from another namespace's store +``` + +Namespace isolation at the storage level ensures: +- **Complete data isolation**: Events stored with one namespace prefix are invisible to stores with different namespaces +- **Backward compatibility**: Empty namespace (default) works exactly as before +- **Safe characters**: Namespace names are sanitized (spaces, dots, wildcards become underscores) + +Use namespace-scoped stores when you need strong isolation guarantees at the persistence layer, such as: +- Multi-tenant deployments where tenant data must be completely separated +- Logical boundaries between different domains or bounded contexts +- Test isolation in integration tests + ### Clustering Aether handles node discovery, leader election, and shard distribution: diff --git a/store/jetstream.go b/store/jetstream.go index 9e66eed..5bb8605 100644 --- a/store/jetstream.go +++ b/store/jetstream.go @@ -23,6 +23,11 @@ type JetStreamConfig struct { StreamRetention time.Duration // ReplicaCount is the number of replicas for high availability (default: 1) ReplicaCount int + // Namespace is an optional prefix for stream names to provide storage isolation. + // When set, the actual stream name becomes "{namespace}_{streamName}". + // Events in namespaced stores are completely isolated from other namespaces. + // Leave empty for backward-compatible non-namespaced behavior. + Namespace string } // DefaultJetStreamConfig returns the default configuration @@ -48,6 +53,15 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig()) } +// NewJetStreamEventStoreWithNamespace creates a new JetStream-based event store with namespace isolation. +// The namespace is prefixed to the stream name to ensure complete isolation at the storage level. +// This is a convenience function; the same can be achieved by setting Namespace in JetStreamConfig. +func NewJetStreamEventStoreWithNamespace(natsConn *nats.Conn, streamName string, namespace string) (*JetStreamEventStore, error) { + config := DefaultJetStreamConfig() + config.Namespace = namespace + return NewJetStreamEventStoreWithConfig(natsConn, streamName, config) +} + // NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) { js, err := natsConn.JetStream() @@ -63,10 +77,16 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co config.ReplicaCount = DefaultReplicaCount } + // Apply namespace prefix to stream name if provided + effectiveStreamName := streamName + if config.Namespace != "" { + effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName) + } + // Create or update the stream stream := &nats.StreamConfig{ - Name: streamName, - Subjects: []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)}, + Name: effectiveStreamName, + Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)}, Storage: nats.FileStorage, Retention: nats.LimitsPolicy, MaxAge: config.StreamRetention, @@ -80,12 +100,22 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co return &JetStreamEventStore{ js: js, - streamName: streamName, + streamName: effectiveStreamName, config: config, versions: make(map[string]int64), }, nil } +// GetNamespace returns the namespace configured for this store, or empty string if not namespaced. +func (jes *JetStreamEventStore) GetNamespace() string { + return jes.config.Namespace +} + +// GetStreamName returns the effective stream name (including namespace prefix if applicable). +func (jes *JetStreamEventStore) GetStreamName() string { + return jes.streamName +} + // SaveEvent persists an event to JetStream. // Returns VersionConflictError if the event's version is not strictly greater // than the current latest version for the actor. diff --git a/store/namespace_test.go b/store/namespace_test.go new file mode 100644 index 0000000..3b66b3c --- /dev/null +++ b/store/namespace_test.go @@ -0,0 +1,124 @@ +package store + +import ( + "testing" +) + +func TestJetStreamConfigNamespace(t *testing.T) { + t.Run("default config has empty namespace", func(t *testing.T) { + config := DefaultJetStreamConfig() + if config.Namespace != "" { + t.Errorf("expected empty namespace in default config, got %q", config.Namespace) + } + }) + + t.Run("namespace can be set in config", func(t *testing.T) { + config := JetStreamConfig{ + Namespace: "tenant-abc", + } + if config.Namespace != "tenant-abc" { + t.Errorf("expected namespace tenant-abc, got %q", config.Namespace) + } + }) +} + +func TestNamespacedStreamName(t *testing.T) { + tests := []struct { + name string + baseStreamName string + namespace string + expectedStreamName string + }{ + { + name: "no namespace - stream name unchanged", + baseStreamName: "events", + namespace: "", + expectedStreamName: "events", + }, + { + name: "with namespace - prefixed stream name", + baseStreamName: "events", + namespace: "tenant-abc", + expectedStreamName: "tenant-abc_events", + }, + { + name: "namespace with dots - sanitized", + baseStreamName: "events", + namespace: "tenant.abc", + expectedStreamName: "tenant_abc_events", + }, + { + name: "namespace with spaces - sanitized", + baseStreamName: "events", + namespace: "tenant abc", + expectedStreamName: "tenant_abc_events", + }, + { + name: "namespace with special chars - sanitized", + baseStreamName: "events", + namespace: "tenant*abc>def", + expectedStreamName: "tenant_abc_def_events", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // We can't create a real JetStreamEventStore without NATS, + // but we can test the stream name logic by examining the expected format + effectiveStreamName := tt.baseStreamName + if tt.namespace != "" { + effectiveStreamName = sanitizeSubject(tt.namespace) + "_" + tt.baseStreamName + } + + if effectiveStreamName != tt.expectedStreamName { + t.Errorf("expected stream name %q, got %q", tt.expectedStreamName, effectiveStreamName) + } + }) + } +} + +func TestSanitizeSubject(t *testing.T) { + tests := []struct { + input string + expected string + }{ + {"simple", "simple"}, + {"with spaces", "with_spaces"}, + {"with.dots", "with_dots"}, + {"with*stars", "with_stars"}, + {"with>greater", "with_greater"}, + {"complex.name with*special>chars", "complex_name_with_special_chars"}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + result := sanitizeSubject(tt.input) + if result != tt.expected { + t.Errorf("sanitizeSubject(%q) = %q, want %q", tt.input, result, tt.expected) + } + }) + } +} + +func TestExtractActorType(t *testing.T) { + tests := []struct { + actorID string + expectedType string + }{ + {"order-123", "order"}, + {"user-abc-def", "user"}, + {"nodelimiter", "unknown"}, + {"", "unknown"}, + {"-leadingdash", "unknown"}, + {"a-b", "a"}, + } + + for _, tt := range tests { + t.Run(tt.actorID, func(t *testing.T) { + result := extractActorType(tt.actorID) + if result != tt.expectedType { + t.Errorf("extractActorType(%q) = %q, want %q", tt.actorID, result, tt.expectedType) + } + }) + } +}