Merge main and address review feedback
All checks were successful
CI / build (pull_request) Successful in 17s
All checks were successful
CI / build (pull_request) Successful in 17s
- Add mutex protection to SaveSnapshot and GetLatestSnapshot - Add nil validation in SaveSnapshot - Fix key collision in TestSnapshotDataIntegrity_LargeState (use fmt.Sprintf) - Add concurrent access tests for snapshots Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,11 +1,15 @@
|
|||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"git.flowmade.one/flowmade-one/aether"
|
"git.flowmade.one/flowmade-one/aether"
|
||||||
)
|
)
|
||||||
|
|
||||||
// InMemoryEventStore provides a simple in-memory event store for testing
|
// InMemoryEventStore provides a simple in-memory event store for testing
|
||||||
type InMemoryEventStore struct {
|
type InMemoryEventStore struct {
|
||||||
|
mu sync.RWMutex
|
||||||
events map[string][]*aether.Event // actorID -> events
|
events map[string][]*aether.Event // actorID -> events
|
||||||
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
snapshots map[string][]*aether.ActorSnapshot // actorID -> snapshots (sorted by version)
|
||||||
}
|
}
|
||||||
@@ -20,6 +24,9 @@ func NewInMemoryEventStore() *InMemoryEventStore {
|
|||||||
|
|
||||||
// SaveEvent saves an event to the in-memory store
|
// SaveEvent saves an event to the in-memory store
|
||||||
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
||||||
|
es.mu.Lock()
|
||||||
|
defer es.mu.Unlock()
|
||||||
|
|
||||||
if _, exists := es.events[event.ActorID]; !exists {
|
if _, exists := es.events[event.ActorID]; !exists {
|
||||||
es.events[event.ActorID] = make([]*aether.Event, 0)
|
es.events[event.ActorID] = make([]*aether.Event, 0)
|
||||||
}
|
}
|
||||||
@@ -29,6 +36,9 @@ func (es *InMemoryEventStore) SaveEvent(event *aether.Event) error {
|
|||||||
|
|
||||||
// GetEvents retrieves events for an actor from a specific version
|
// GetEvents retrieves events for an actor from a specific version
|
||||||
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*aether.Event, error) {
|
||||||
|
es.mu.RLock()
|
||||||
|
defer es.mu.RUnlock()
|
||||||
|
|
||||||
events, exists := es.events[actorID]
|
events, exists := es.events[actorID]
|
||||||
if !exists {
|
if !exists {
|
||||||
return []*aether.Event{}, nil
|
return []*aether.Event{}, nil
|
||||||
@@ -46,6 +56,9 @@ func (es *InMemoryEventStore) GetEvents(actorID string, fromVersion int64) ([]*a
|
|||||||
|
|
||||||
// GetLatestVersion returns the latest version for an actor
|
// GetLatestVersion returns the latest version for an actor
|
||||||
func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) {
|
func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) {
|
||||||
|
es.mu.RLock()
|
||||||
|
defer es.mu.RUnlock()
|
||||||
|
|
||||||
events, exists := es.events[actorID]
|
events, exists := es.events[actorID]
|
||||||
if !exists || len(events) == 0 {
|
if !exists || len(events) == 0 {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
@@ -63,6 +76,13 @@ func (es *InMemoryEventStore) GetLatestVersion(actorID string) (int64, error) {
|
|||||||
|
|
||||||
// SaveSnapshot saves a snapshot to the in-memory store
|
// SaveSnapshot saves a snapshot to the in-memory store
|
||||||
func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error {
|
func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error {
|
||||||
|
if snapshot == nil {
|
||||||
|
return fmt.Errorf("snapshot cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
es.mu.Lock()
|
||||||
|
defer es.mu.Unlock()
|
||||||
|
|
||||||
if _, exists := es.snapshots[snapshot.ActorID]; !exists {
|
if _, exists := es.snapshots[snapshot.ActorID]; !exists {
|
||||||
es.snapshots[snapshot.ActorID] = make([]*aether.ActorSnapshot, 0)
|
es.snapshots[snapshot.ActorID] = make([]*aether.ActorSnapshot, 0)
|
||||||
}
|
}
|
||||||
@@ -72,6 +92,9 @@ func (es *InMemoryEventStore) SaveSnapshot(snapshot *aether.ActorSnapshot) error
|
|||||||
|
|
||||||
// GetLatestSnapshot returns the most recent snapshot for an actor
|
// GetLatestSnapshot returns the most recent snapshot for an actor
|
||||||
func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
func (es *InMemoryEventStore) GetLatestSnapshot(actorID string) (*aether.ActorSnapshot, error) {
|
||||||
|
es.mu.RLock()
|
||||||
|
defer es.mu.RUnlock()
|
||||||
|
|
||||||
snapshots, exists := es.snapshots[actorID]
|
snapshots, exists := es.snapshots[actorID]
|
||||||
if !exists || len(snapshots) == 0 {
|
if !exists || len(snapshots) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
|||||||
1004
store/memory_test.go
1004
store/memory_test.go
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user