[Issue #19] Add namespace-scoped event stores #48
36
CLAUDE.md
36
CLAUDE.md
@@ -124,7 +124,11 @@ if errors.Is(err, aether.ErrVersionConflict) {
|
||||
|
||||
### Namespace Isolation
|
||||
|
||||
Namespaces provide logical boundaries for events and subscriptions:
|
||||
Namespaces provide logical boundaries for events and subscriptions.
|
||||
|
||||
#### Event Bus Namespaces
|
||||
|
||||
The event bus supports namespace-scoped pub/sub:
|
||||
|
||||
```go
|
||||
// Subscribe to events in a namespace
|
||||
@@ -134,6 +138,36 @@ ch := eventBus.Subscribe("tenant-abc")
|
||||
eventBus.Publish("tenant-abc", event) // Only tenant-abc subscribers see this
|
||||
```
|
||||
|
||||
#### Namespace-Scoped Event Stores
|
||||
|
||||
JetStreamEventStore supports optional namespace prefixes for complete storage isolation:
|
||||
|
||||
```go
|
||||
// Create a namespaced event store (convenience function)
|
||||
store, err := store.NewJetStreamEventStoreWithNamespace(natsConn, "events", "tenant-abc")
|
||||
|
||||
// Or configure via JetStreamConfig
|
||||
config := store.JetStreamConfig{
|
||||
Namespace: "tenant-abc",
|
||||
StreamRetention: 30 * 24 * time.Hour,
|
||||
ReplicaCount: 3,
|
||||
}
|
||||
store, err := store.NewJetStreamEventStoreWithConfig(natsConn, "events", config)
|
||||
|
||||
// The actual stream name becomes "tenant-abc_events"
|
||||
// Events from one namespace cannot be read from another namespace's store
|
||||
```
|
||||
|
||||
Namespace isolation at the storage level ensures:
|
||||
- **Complete data isolation**: Events stored with one namespace prefix are invisible to stores with different namespaces
|
||||
- **Backward compatibility**: Empty namespace (default) works exactly as before
|
||||
- **Safe characters**: Namespace names are sanitized (spaces, dots, wildcards become underscores)
|
||||
|
||||
Use namespace-scoped stores when you need strong isolation guarantees at the persistence layer, such as:
|
||||
- Multi-tenant deployments where tenant data must be completely separated
|
||||
- Logical boundaries between different domains or bounded contexts
|
||||
- Test isolation in integration tests
|
||||
|
||||
### Clustering
|
||||
|
||||
Aether handles node discovery, leader election, and shard distribution:
|
||||
|
||||
@@ -23,6 +23,11 @@ type JetStreamConfig struct {
|
||||
StreamRetention time.Duration
|
||||
// ReplicaCount is the number of replicas for high availability (default: 1)
|
||||
ReplicaCount int
|
||||
// Namespace is an optional prefix for stream names to provide storage isolation.
|
||||
// When set, the actual stream name becomes "{namespace}_{streamName}".
|
||||
// Events in namespaced stores are completely isolated from other namespaces.
|
||||
// Leave empty for backward-compatible non-namespaced behavior.
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// DefaultJetStreamConfig returns the default configuration
|
||||
@@ -48,6 +53,15 @@ func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamE
|
||||
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
||||
}
|
||||
|
||||
// NewJetStreamEventStoreWithNamespace creates a new JetStream-based event store with namespace isolation.
|
||||
// The namespace is prefixed to the stream name to ensure complete isolation at the storage level.
|
||||
// This is a convenience function; the same can be achieved by setting Namespace in JetStreamConfig.
|
||||
func NewJetStreamEventStoreWithNamespace(natsConn *nats.Conn, streamName string, namespace string) (*JetStreamEventStore, error) {
|
||||
config := DefaultJetStreamConfig()
|
||||
config.Namespace = namespace
|
||||
return NewJetStreamEventStoreWithConfig(natsConn, streamName, config)
|
||||
}
|
||||
|
||||
// NewJetStreamEventStoreWithConfig creates a new JetStream-based event store with custom configuration
|
||||
func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, config JetStreamConfig) (*JetStreamEventStore, error) {
|
||||
js, err := natsConn.JetStream()
|
||||
@@ -63,10 +77,16 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
|
||||
config.ReplicaCount = DefaultReplicaCount
|
||||
}
|
||||
|
||||
// Apply namespace prefix to stream name if provided
|
||||
effectiveStreamName := streamName
|
||||
if config.Namespace != "" {
|
||||
effectiveStreamName = fmt.Sprintf("%s_%s", sanitizeSubject(config.Namespace), streamName)
|
||||
}
|
||||
|
||||
// Create or update the stream
|
||||
stream := &nats.StreamConfig{
|
||||
Name: streamName,
|
||||
Subjects: []string{fmt.Sprintf("%s.events.>", streamName), fmt.Sprintf("%s.snapshots.>", streamName)},
|
||||
Name: effectiveStreamName,
|
||||
Subjects: []string{fmt.Sprintf("%s.events.>", effectiveStreamName), fmt.Sprintf("%s.snapshots.>", effectiveStreamName)},
|
||||
Storage: nats.FileStorage,
|
||||
Retention: nats.LimitsPolicy,
|
||||
MaxAge: config.StreamRetention,
|
||||
@@ -80,12 +100,22 @@ func NewJetStreamEventStoreWithConfig(natsConn *nats.Conn, streamName string, co
|
||||
|
||||
return &JetStreamEventStore{
|
||||
js: js,
|
||||
streamName: streamName,
|
||||
streamName: effectiveStreamName,
|
||||
config: config,
|
||||
versions: make(map[string]int64),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetNamespace returns the namespace configured for this store, or empty string if not namespaced.
|
||||
func (jes *JetStreamEventStore) GetNamespace() string {
|
||||
return jes.config.Namespace
|
||||
}
|
||||
|
||||
// GetStreamName returns the effective stream name (including namespace prefix if applicable).
|
||||
func (jes *JetStreamEventStore) GetStreamName() string {
|
||||
return jes.streamName
|
||||
}
|
||||
|
||||
// SaveEvent persists an event to JetStream.
|
||||
// Returns VersionConflictError if the event's version is not strictly greater
|
||||
// than the current latest version for the actor.
|
||||
|
||||
124
store/namespace_test.go
Normal file
124
store/namespace_test.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestJetStreamConfigNamespace(t *testing.T) {
|
||||
t.Run("default config has empty namespace", func(t *testing.T) {
|
||||
config := DefaultJetStreamConfig()
|
||||
if config.Namespace != "" {
|
||||
t.Errorf("expected empty namespace in default config, got %q", config.Namespace)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("namespace can be set in config", func(t *testing.T) {
|
||||
config := JetStreamConfig{
|
||||
Namespace: "tenant-abc",
|
||||
}
|
||||
if config.Namespace != "tenant-abc" {
|
||||
t.Errorf("expected namespace tenant-abc, got %q", config.Namespace)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNamespacedStreamName(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
baseStreamName string
|
||||
namespace string
|
||||
expectedStreamName string
|
||||
}{
|
||||
{
|
||||
name: "no namespace - stream name unchanged",
|
||||
baseStreamName: "events",
|
||||
namespace: "",
|
||||
expectedStreamName: "events",
|
||||
},
|
||||
{
|
||||
name: "with namespace - prefixed stream name",
|
||||
baseStreamName: "events",
|
||||
namespace: "tenant-abc",
|
||||
expectedStreamName: "tenant-abc_events",
|
||||
},
|
||||
{
|
||||
name: "namespace with dots - sanitized",
|
||||
baseStreamName: "events",
|
||||
namespace: "tenant.abc",
|
||||
expectedStreamName: "tenant_abc_events",
|
||||
},
|
||||
{
|
||||
name: "namespace with spaces - sanitized",
|
||||
baseStreamName: "events",
|
||||
namespace: "tenant abc",
|
||||
expectedStreamName: "tenant_abc_events",
|
||||
},
|
||||
{
|
||||
name: "namespace with special chars - sanitized",
|
||||
baseStreamName: "events",
|
||||
namespace: "tenant*abc>def",
|
||||
expectedStreamName: "tenant_abc_def_events",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// We can't create a real JetStreamEventStore without NATS,
|
||||
// but we can test the stream name logic by examining the expected format
|
||||
effectiveStreamName := tt.baseStreamName
|
||||
if tt.namespace != "" {
|
||||
effectiveStreamName = sanitizeSubject(tt.namespace) + "_" + tt.baseStreamName
|
||||
}
|
||||
|
||||
if effectiveStreamName != tt.expectedStreamName {
|
||||
t.Errorf("expected stream name %q, got %q", tt.expectedStreamName, effectiveStreamName)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSanitizeSubject(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
expected string
|
||||
}{
|
||||
{"simple", "simple"},
|
||||
{"with spaces", "with_spaces"},
|
||||
{"with.dots", "with_dots"},
|
||||
{"with*stars", "with_stars"},
|
||||
{"with>greater", "with_greater"},
|
||||
{"complex.name with*special>chars", "complex_name_with_special_chars"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.input, func(t *testing.T) {
|
||||
result := sanitizeSubject(tt.input)
|
||||
if result != tt.expected {
|
||||
t.Errorf("sanitizeSubject(%q) = %q, want %q", tt.input, result, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractActorType(t *testing.T) {
|
||||
tests := []struct {
|
||||
actorID string
|
||||
expectedType string
|
||||
}{
|
||||
{"order-123", "order"},
|
||||
{"user-abc-def", "user"},
|
||||
{"nodelimiter", "unknown"},
|
||||
{"", "unknown"},
|
||||
{"-leadingdash", "unknown"},
|
||||
{"a-b", "a"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.actorID, func(t *testing.T) {
|
||||
result := extractActorType(tt.actorID)
|
||||
if result != tt.expectedType {
|
||||
t.Errorf("extractActorType(%q) = %q, want %q", tt.actorID, result, tt.expectedType)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user