Compare commits
3 Commits
de30e1ef1b
...
e69f7a30e4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e69f7a30e4 | ||
|
|
a258ec9754 | ||
|
|
9d4ed1dd08 |
24
CLAUDE.md
24
CLAUDE.md
@@ -122,30 +122,6 @@ if errors.Is(err, aether.ErrVersionConflict) {
|
|||||||
- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`)
|
- `ErrVersionConflict` - Sentinel error for version conflicts (use with `errors.Is`)
|
||||||
- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion
|
- `VersionConflictError` - Detailed error with ActorID, CurrentVersion, and AttemptedVersion
|
||||||
|
|
||||||
#### Version Cache Invalidation
|
|
||||||
|
|
||||||
The JetStreamEventStore maintains an in-memory cache of actor versions to optimize
|
|
||||||
repeated version checks during optimistic concurrency control. The cache is designed
|
|
||||||
to handle multi-store scenarios where external processes may write to the same
|
|
||||||
JetStream stream:
|
|
||||||
|
|
||||||
- **Cache hits**: Cached version is returned immediately for performance
|
|
||||||
- **Cache misses**: If no cached version exists, JetStream is queried and cached
|
|
||||||
- **External writes**: If GetLatestVersion detects a version newer than cached, the cache is invalidated and fresh data is fetched next time
|
|
||||||
|
|
||||||
This strategy ensures data consistency even in scenarios with external writers while
|
|
||||||
maintaining excellent performance for the single-writer case (where only Aether owns
|
|
||||||
the stream).
|
|
||||||
|
|
||||||
**Implementation detail**: The cache is invalidated by deleting the entry, forcing
|
|
||||||
a fresh fetch from JetStream on the next version check for that actor. This is
|
|
||||||
safe because:
|
|
||||||
|
|
||||||
1. SaveEvent uses getLatestVersionLocked() which checks JetStream on cache miss
|
|
||||||
2. GetLatestVersion always fetches fresh data and detects stale cache entries
|
|
||||||
3. Subsequent checks will fetch from JetStream until the cache is repopulated
|
|
||||||
|
|
||||||
|
|
||||||
### Namespace Isolation
|
### Namespace Isolation
|
||||||
|
|
||||||
Namespaces provide logical boundaries for events and subscriptions.
|
Namespaces provide logical boundaries for events and subscriptions.
|
||||||
|
|||||||
@@ -40,24 +40,6 @@ func DefaultJetStreamConfig() JetStreamConfig {
|
|||||||
|
|
||||||
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
|
// JetStreamEventStore implements EventStore using NATS JetStream for persistence.
|
||||||
// It also implements EventStoreWithErrors to report malformed events during replay.
|
// It also implements EventStoreWithErrors to report malformed events during replay.
|
||||||
//
|
|
||||||
// ## Version Cache Invalidation Strategy
|
|
||||||
//
|
|
||||||
// JetStreamEventStore maintains an in-memory cache of actor versions for optimistic
|
|
||||||
// concurrency control. The cache is invalidated on any miss (GetLatestVersion call
|
|
||||||
// that finds a newer version in JetStream) to ensure consistency even when external
|
|
||||||
// processes write to the same JetStream stream.
|
|
||||||
//
|
|
||||||
// If only Aether owns the stream (single-writer assumption), the cache provides
|
|
||||||
// excellent performance for repeated version checks. If external writers modify
|
|
||||||
// the stream, the cache will remain consistent because:
|
|
||||||
//
|
|
||||||
// 1. On SaveEvent: getLatestVersionLocked() checks JetStream on cache miss
|
|
||||||
// 2. On GetLatestVersion: If actual version > cached version, cache is invalidated
|
|
||||||
// 3. Subsequent checks for that actor will fetch fresh data from JetStream
|
|
||||||
//
|
|
||||||
// This strategy prevents data corruption from stale cache while maintaining
|
|
||||||
// performance for the single-writer case.
|
|
||||||
type JetStreamEventStore struct {
|
type JetStreamEventStore struct {
|
||||||
js nats.JetStreamContext
|
js nats.JetStreamContext
|
||||||
streamName string
|
streamName string
|
||||||
@@ -66,15 +48,6 @@ type JetStreamEventStore struct {
|
|||||||
versions map[string]int64 // actorID -> latest version cache
|
versions map[string]int64 // actorID -> latest version cache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
|
// NewJetStreamEventStore creates a new JetStream-based event store with default configuration
|
||||||
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
|
func NewJetStreamEventStore(natsConn *nats.Conn, streamName string) (*JetStreamEventStore, error) {
|
||||||
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
return NewJetStreamEventStoreWithConfig(natsConn, streamName, DefaultJetStreamConfig())
|
||||||
|
|||||||
@@ -1322,119 +1322,6 @@ func TestJetStreamEventStore_MultipleStoreInstances(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// === Cache Invalidation Tests ===
|
|
||||||
|
|
||||||
func TestJetStreamEventStore_CacheInvalidationOnExternalWrite(t *testing.T) {
|
|
||||||
nc := getTestNATSConnection(t)
|
|
||||||
defer nc.Close()
|
|
||||||
|
|
||||||
streamName := uniqueStreamName("test-cache-invalidation")
|
|
||||||
defer cleanupStream(nc, streamName)
|
|
||||||
|
|
||||||
// Create two stores for the same stream
|
|
||||||
store1, err := NewJetStreamEventStore(nc, streamName)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to create store1: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
store2, err := NewJetStreamEventStore(nc, streamName)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to create store2: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
actorID := "actor-cache-test"
|
|
||||||
|
|
||||||
// store1: Save event v1 (caches version 1)
|
|
||||||
event1 := &aether.Event{
|
|
||||||
ID: "evt-1",
|
|
||||||
EventType: "TestEvent",
|
|
||||||
ActorID: actorID,
|
|
||||||
Version: 1,
|
|
||||||
Data: map[string]interface{}{},
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
if err := store1.SaveEvent(event1); err != nil {
|
|
||||||
t.Fatalf("SaveEvent from store1 failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify store1 sees version 1 (uses cache)
|
|
||||||
v1, err := store1.GetLatestVersion(actorID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("GetLatestVersion from store1 failed: %v", err)
|
|
||||||
}
|
|
||||||
if v1 != 1 {
|
|
||||||
t.Errorf("store1 should see version 1, got %d", v1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// store2: Save event v2 (external write from store1's perspective)
|
|
||||||
event2 := &aether.Event{
|
|
||||||
ID: "evt-2",
|
|
||||||
EventType: "TestEvent",
|
|
||||||
ActorID: actorID,
|
|
||||||
Version: 2,
|
|
||||||
Data: map[string]interface{}{},
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
if err := store2.SaveEvent(event2); err != nil {
|
|
||||||
t.Fatalf("SaveEvent from store2 failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// store1: GetLatestVersion should detect external write and return v2
|
|
||||||
// (This triggers cache invalidation because actual version > cached version)
|
|
||||||
v2, err := store1.GetLatestVersion(actorID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("GetLatestVersion from store1 (after external write) failed: %v", err)
|
|
||||||
}
|
|
||||||
if v2 != 2 {
|
|
||||||
t.Errorf("store1 should see version 2 after external write, got %d", v2)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify cache was repopulated - second GetLatestVersion should use cache efficiently
|
|
||||||
v2Again, err := store1.GetLatestVersion(actorID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Second GetLatestVersion from store1 failed: %v", err)
|
|
||||||
}
|
|
||||||
if v2Again != 2 {
|
|
||||||
t.Errorf("store1 cache should have version 2, got %d", v2Again)
|
|
||||||
}
|
|
||||||
|
|
||||||
// store2: Save event v3 (another external write)
|
|
||||||
event3 := &aether.Event{
|
|
||||||
ID: "evt-3",
|
|
||||||
EventType: "TestEvent",
|
|
||||||
ActorID: actorID,
|
|
||||||
Version: 3,
|
|
||||||
Data: map[string]interface{}{},
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
if err := store2.SaveEvent(event3); err != nil {
|
|
||||||
t.Fatalf("SaveEvent from store2 (v3) failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// store1: After cache invalidation, SaveEvent should use fresh data from JetStream
|
|
||||||
event4 := &aether.Event{
|
|
||||||
ID: "evt-4",
|
|
||||||
EventType: "TestEvent",
|
|
||||||
ActorID: actorID,
|
|
||||||
Version: 4,
|
|
||||||
Data: map[string]interface{}{},
|
|
||||||
Timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
if err := store1.SaveEvent(event4); err != nil {
|
|
||||||
t.Fatalf("SaveEvent from store1 (after cache invalidation) failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify all 4 events are persisted
|
|
||||||
events, err := store1.GetEvents(actorID, 0)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("GetEvents failed: %v", err)
|
|
||||||
}
|
|
||||||
if len(events) != 4 {
|
|
||||||
t.Errorf("expected 4 events after cache invalidation, got %d", len(events))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// === Interface Compliance Tests ===
|
// === Interface Compliance Tests ===
|
||||||
|
|
||||||
func TestJetStreamEventStore_ImplementsEventStore(t *testing.T) {
|
func TestJetStreamEventStore_ImplementsEventStore(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user