This documentation is part of the "Projects with Books" initiative at zenOSmosis.
The source code for this project is available on GitHub.
Core Storage Engine
Relevant source files
- src/lib.rs
- src/storage_engine.rs
- src/storage_engine/data_store.rs
- src/storage_engine/entry_iterator.rs
The core storage engine is the foundational layer of SIMD R Drive, providing append-only, memory-mapped binary storage with zero-copy access. This document covers the high-level architecture and key components that implement the storage functionality.
For network-based access to the storage engine, see Network Layer and RPC. For Python integration, see Python Integration. For performance optimizations including SIMD acceleration, see Performance Optimizations.
Purpose and Scope
This page introduces the core storage engine’s architecture, primary components, and design principles. Detailed information about specific subsystems is covered in the child pages.
Architecture Overview
The storage engine implements an append-only binary store with in-memory indexing and memory-mapped file access. The architecture consists of four primary structures that work together to provide concurrent, high-performance storage operations.
Sources: src/storage_engine/data_store.rs:26-33 src/storage_engine.rs:1-25 src/storage_engine/entry_iterator.rs:21-25
graph TB
subgraph "Public Traits (src/storage_engine/traits.rs)"
Reader["DataStoreReader\nread(), exists(), batch_read()\nread_with_key_hash(), iter_entries()"]
Writer["DataStoreWriter\nwrite(), delete(), batch_write()\nwrite_stream(), rename(), compact()"]
end
subgraph "DataStore (src/storage_engine/data_store.rs:27-33)"
DS["DataStore"]
FileHandle["file: Arc<RwLock<BufWriter<File>>>\nWrite-locked sequential appends"]
MmapHandle["mmap: Arc<Mutex<Arc<Mmap>>>\nRemapped after writes"]
TailOffset["tail_offset: AtomicU64\nAtomic end-of-file offset"]
KeyIndex["key_indexer: Arc<RwLock<KeyIndexer>>\nHashMap with collision detection"]
PathField["path: PathBuf\nStorage file location"]
end
subgraph "Support Structures"
EH["EntryHandle\nmmap_arc + range + metadata\n(simd-r-drive-entry-handle)"]
EM["EntryMetadata\nkey_hash: u64\nprev_offset: u64\nchecksum: [u8; 4]"]
EI["EntryIterator\nmmap + cursor + seen_keys\n(entry_iterator.rs)"]
ES["EntryStream\nimpl Read for streaming\n(entry_stream.rs)"]
end
Reader -.impl for.-> DS
Writer -.impl for.-> DS
DS --> FileHandle
DS --> MmapHandle
DS --> TailOffset
DS --> KeyIndex
DS --> PathField
DS --> EH
DS --> EI
EH --> EM
EH --> ES
Core Components
DataStore
The DataStore struct is the primary interface to the storage engine. It maintains four shared resources, each protected by its own synchronization primitive, plus an immutable path to the storage file:
| Field | Type | Purpose | Synchronization |
|---|---|---|---|
| file | Arc<RwLock<BufWriter<File>>> | Write operations to disk | RwLock for exclusive writes |
| mmap | Arc<Mutex<Arc<Mmap>>> | Memory-mapped view for reads | Mutex for atomic remapping |
| tail_offset | AtomicU64 | Current end-of-file position | Atomic operations |
| key_indexer | Arc<RwLock<KeyIndexer>> | Hash-based key lookup | RwLock for concurrent reads |
| path | PathBuf | Storage file location | Immutable after creation |
Sources: src/storage_engine/data_store.rs:26-33
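The mmap and tail_offset rows describe a swap-and-publish pattern: readers clone an Arc out of the mutex, and the writer replaces the Arc and then stores the new tail. The following is a minimal sketch of that pattern, not the project's code. SharedView, current_view(), and publish() are hypothetical names, and a Vec<u8> stands in for the memory map so the example needs only the standard library.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the `mmap` + `tail_offset` pair.
/// A `Vec<u8>` plays the role of the memory map (assumption).
struct SharedView {
    view: Mutex<Arc<Vec<u8>>>,
    tail_offset: AtomicU64,
}

impl SharedView {
    fn new() -> Self {
        SharedView {
            view: Mutex::new(Arc::new(Vec::new())),
            tail_offset: AtomicU64::new(0),
        }
    }

    /// Reader side: hold the mutex only long enough to clone the Arc.
    fn current_view(&self) -> Arc<Vec<u8>> {
        let view = Arc::clone(&*self.view.lock().unwrap());
        view
    }

    /// Writer side: swap in the new view, then publish the new tail.
    fn publish(&self, new_view: Vec<u8>) {
        let new_tail = new_view.len() as u64;
        let mut guard = self.view.lock().unwrap();
        *guard = Arc::new(new_view);
        self.tail_offset.store(new_tail, Ordering::Release);
    }
}
```

A reader that cloned the Arc before publish() keeps its old view alive until it drops the clone, which is why previously returned handles stay valid across remaps.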
Traits
The storage engine exposes two primary traits that define its public API:
- DataStoreReader: Provides zero-copy read operations including read(), batch_read(), exists(), and iteration methods. Implemented at src/storage_engine/data_store.rs:1027-1182
- DataStoreWriter: Provides write operations including write(), batch_write(), delete(), and streaming writes. Implemented at src/storage_engine/data_store.rs:752-1025
Sources: src/lib.rs:21 src/storage_engine.rs:21
EntryHandle
EntryHandle provides a zero-copy view into stored data by maintaining a reference to the memory-mapped file and a byte range. It includes the associated EntryMetadata for accessing checksums and hash values without additional lookups.
Sources: src/storage_engine.rs:24 README.md:43-49
KeyIndexer
KeyIndexer maintains an in-memory hash map from 64-bit XXH3 key hashes to packed values containing a 16-bit collision detection tag and a 48-bit file offset. This structure enables O(1) key lookups while detecting hash collisions.
Sources: src/storage_engine/data_store.rs:31 README.md:158-169
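The packed value can be illustrated with plain bit arithmetic. This sketch follows the layout stated above (tag in the upper 16 bits, offset in the lower 48). The free functions pack() and unpack() and the two constants are illustrative names, not necessarily the signatures used by KeyIndexer.

```rust
const OFFSET_BITS: u32 = 48;
const OFFSET_MASK: u64 = (1u64 << OFFSET_BITS) - 1; // 0xFFFF_FFFF_FFFF

/// Combine a 16-bit tag and a 48-bit file offset into one u64.
fn pack(tag: u16, offset: u64) -> u64 {
    debug_assert!(offset <= OFFSET_MASK, "offset must fit in 48 bits");
    ((tag as u64) << OFFSET_BITS) | (offset & OFFSET_MASK)
}

/// Split a packed value back into (tag, offset).
fn unpack(packed: u64) -> (u16, u64) {
    ((packed >> OFFSET_BITS) as u16, packed & OFFSET_MASK)
}
```

A 48-bit offset addresses up to 256 TiB, so the tag costs no extra memory per index entry.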
Component Interaction Flow
The following diagram illustrates how the core components interact during typical read and write operations, showing the actual method calls and data flow between structures.
Sources: src/storage_engine/data_store.rs:827-834 src/storage_engine/data_store.rs:847-939 src/storage_engine/data_store.rs:1040-1049 src/storage_engine/data_store.rs:502-565 src/storage_engine/data_store.rs:224-259
sequenceDiagram
participant Client
participant DS as "DataStore"
participant FileHandle as "file: RwLock<BufWriter<File>>"
participant TailOffset as "tail_offset: AtomicU64"
participant MmapHandle as "mmap: Mutex<Arc<Mmap>>"
participant KeyIndexer as "key_indexer: RwLock<KeyIndexer>"
Note over Client,KeyIndexer: Write Operation (write:827, write_with_key_hash:832)
Client->>DS: write(key, payload)
DS->>DS: compute_hash(key)
DS->>DS: write_with_key_hash(key_hash, payload)
DS->>DS: batch_write_with_key_hashes()
DS->>FileHandle: write().unwrap() [line 852]
DS->>TailOffset: load(Ordering::Acquire) [line 858]
DS->>DS: prepad_len(tail_offset) [line 909]
DS->>FileHandle: write_all(&buffer) [line 933]
DS->>FileHandle: flush() [line 934]
DS->>DS: reindex() [line 936]
DS->>FileHandle: init_mmap() [line 231]
DS->>MmapHandle: lock().unwrap() [line 232]
DS->>MmapHandle: *guard = Arc::new(new_mmap) [line 255]
DS->>KeyIndexer: write().unwrap() [line 233]
DS->>KeyIndexer: insert(key_hash, offset) [line 246]
DS->>TailOffset: store(tail_offset, Release) [line 256]
DS-->>Client: Ok(offset)
Note over Client,KeyIndexer: Read Operation (read:1040)
Client->>DS: read(key)
DS->>DS: compute_hash(key) [line 1041]
DS->>KeyIndexer: read().unwrap() [line 1042]
DS->>MmapHandle: get_mmap_arc() [line 1046]
DS->>DS: read_entry_with_context() [line 1048]
DS->>KeyIndexer: get_packed(&key_hash) [line 509]
KeyIndexer-->>DS: Some(packed)
DS->>DS: KeyIndexer::unpack(packed) [line 510]
DS->>DS: KeyIndexer::tag_from_key() [line 514]
DS->>DS: EntryMetadata::deserialize() [line 529]
DS->>DS: prepad_len(prev_tail) [line 534]
DS->>DS: EntryHandle { mmap_arc, range, metadata }
DS-->>Client: Ok(Some(EntryHandle))
Storage Initialization and Recovery
The DataStore::open() method performs a multi-stage initialization process that ensures data integrity even after crashes or incomplete writes.
flowchart TD
Start["DataStore::open(path) [line 84]"]
OpenFile["open_file_in_append_mode() [line 161]\nOpenOptions::new().read(true).write(true)\n.create(true).seek(SeekFrom::End(0))"]
GetLen["file.get_ref().metadata()?.len() [line 86]"]
InitMmap["init_mmap(&file) [line 88]\nmemmap2::MmapOptions::new().map()"]
Recover["recover_valid_chain(&mmap, file_len) [line 89]\nReturns final_len"]
CheckTrunc{"final_len < file_len? [line 91]"}
Warn["warn!() + drop(mmap) + drop(file) [line 92-99]"]
Truncate["OpenOptions::open(path)\nfile.set_len(final_len)\nfile.sync_all() [line 100-102]"]
Reopen["return Self::open(path) [line 103]"]
BuildIndex["KeyIndexer::build(mmap_for_indexer, final_len) [line 108]\nForward scan populating HashMap"]
CreateDS["DataStore { file, mmap, tail_offset,\nkey_indexer, path } [line 110-116]"]
Start --> OpenFile
OpenFile --> GetLen
GetLen --> InitMmap
InitMmap --> Recover
Recover --> CheckTrunc
CheckTrunc -->|Yes - Corruption| Warn
CheckTrunc -->|No - Valid| BuildIndex
Warn --> Truncate
Truncate --> Reopen
Reopen --> Start
BuildIndex --> CreateDS
The recover_valid_chain() function at src/storage_engine/data_store.rs:383-482 validates the storage file by:
- Backward scanning from file_len using a cursor, reading EntryMetadata at each position
- Following prev_offset chain stored in metadata (line 397), validating each link points to a valid previous tail
- Deriving aligned starts using prepad_len(prev_tail) for non-tombstone entries (line 401)
- Computing chain size by accumulating entry sizes while traversing (line 426)
- Finding deepest valid chain that reaches offset 0 with total_size <= file_len (line 473)
- Returning valid offset or 0 if no valid chain exists (line 481)
Sources: src/storage_engine/data_store.rs:84-117 src/storage_engine/data_store.rs:161-170 src/storage_engine/data_store.rs:383-482
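The core of the backward walk can be sketched on an in-memory buffer. This is a simplified illustration rather than the actual recover_valid_chain(): it omits alignment padding and checksum validation, and it assumes a 20-byte little-endian metadata layout (key_hash, prev_offset, checksum, in that order). The names walk_chain() and append_entry() are hypothetical.

```rust
// Assumed layout: key_hash (8) + prev_offset (8) + checksum (4), little-endian.
const METADATA_SIZE: usize = 20;

/// Fixture helper: append payload then metadata, with no alignment padding.
fn append_entry(buf: &mut Vec<u8>, key_hash: u64, payload: &[u8]) {
    let prev_tail = buf.len() as u64;
    buf.extend_from_slice(payload);
    buf.extend_from_slice(&key_hash.to_le_bytes());
    buf.extend_from_slice(&prev_tail.to_le_bytes());
    buf.extend_from_slice(&[0u8; 4]); // checksum placeholder
}

/// Walk the prev_offset chain backward from `end`.
/// Returns the number of entries if the chain reaches offset 0.
fn walk_chain(buf: &[u8], end: usize) -> Option<usize> {
    let mut cursor = end;
    let mut entries = 0;
    while cursor > 0 {
        if cursor < METADATA_SIZE || cursor > buf.len() {
            return None;
        }
        let meta_start = cursor - METADATA_SIZE;
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&buf[meta_start + 8..meta_start + 16]);
        let prev_tail = u64::from_le_bytes(raw);
        // The previous tail must leave room for at least one payload byte.
        if prev_tail >= meta_start as u64 {
            return None;
        }
        cursor = prev_tail as usize;
        entries += 1;
    }
    Some(entries)
}
```

A link that points at or past its own metadata fails the walk, which is the signal the recovery path uses to reject a candidate tail.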
flowchart TD
WriteCall["write(key, payload) [line 827]"]
ComputeHash["compute_hash(key) [line 828]\nXXH3_64 via xxhash_rust"]
CallWriteHash["write_with_key_hash(key_hash, payload) [line 829]"]
CallBatch["batch_write_with_key_hashes() [line 833]"]
AcquireWrite["file.write().unwrap() [line 852]\nAcquire RwLock write guard"]
LoadTail["tail_offset.load(Ordering::Acquire) [line 858]"]
LoopEntries["For each (key_hash, payload)"]
CalcPad["prepad_len(tail_offset) [line 909]\n((A - offset % A) & (A-1))"]
CheckTombstone{"payload == NULL_BYTE?"}
AppendPad["buffer.resize(old_len + prepad, 0u8) [line 912]"]
ComputeChecksum["compute_checksum(payload) [line 916]\nCRC32 via crc32fast"]
BuildMetadata["EntryMetadata { key_hash, prev_offset,\nchecksum } [line 917-921]"]
SimdCopy["simd_copy(&mut entry[..], payload) [line 925]"]
AppendBuffer["buffer.extend_from_slice(&entry) [line 927]"]
UpdateTailLoop["tail_offset += entry.len() [line 929]"]
StoreOffset["key_hash_offsets.push() [line 930]"]
WriteAll["file.write_all(&buffer) [line 933]"]
Flush["file.flush() [line 934]"]
Reindex["reindex(&file, &key_hash_offsets,\ntail_offset, deleted_keys) [line 936]"]
UpdateTail["tail_offset.store(new_offset, Release) [line 256]"]
WriteCall --> ComputeHash
ComputeHash --> CallWriteHash
CallWriteHash --> CallBatch
CallBatch --> AcquireWrite
AcquireWrite --> LoadTail
LoadTail --> LoopEntries
LoopEntries --> CheckTombstone
CheckTombstone -->|No| CalcPad
CheckTombstone -->|Yes| BuildMetadata
CalcPad --> AppendPad
AppendPad --> ComputeChecksum
ComputeChecksum --> BuildMetadata
BuildMetadata --> SimdCopy
SimdCopy --> AppendBuffer
AppendBuffer --> UpdateTailLoop
UpdateTailLoop --> StoreOffset
StoreOffset --> LoopEntries
LoopEntries --> WriteAll
WriteAll --> Flush
Flush --> Reindex
Reindex --> UpdateTail
Write Path Architecture
Write operations follow a specific sequence to maintain consistency and enable zero-copy reads. All non-tombstone payloads are aligned to 64-byte boundaries using pre-padding.
The reindex() method at src/storage_engine/data_store.rs:224-259 performs critical post-write operations:
- Memory map update: Calls init_mmap(write_guard) (line 231) to create new Mmap, then atomically swaps via *mmap_guard = Arc::new(new_mmap) (line 255)
- Index update: Iterates key_hash_offsets, calling key_indexer_guard.insert(*key_hash, *offset) (line 246) or remove(key_hash) for tombstones (line 243)
- Collision detection: insert() returns Err on tag mismatch, aborting batch operation (lines 246-251)
- Atomic tail update: tail_offset.store(tail_offset, Ordering::Release) (line 256) publishes new end position
Sources: src/storage_engine/data_store.rs:847-939 src/storage_engine/data_store.rs:224-259 src/storage_engine/data_store.rs:670-673
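The on-disk shape of a single non-tombstone entry (zero pre-padding, payload, then metadata) can be sketched as follows. This is an illustration under assumptions: the metadata field order and little-endian encoding are guesses, the checksum is a zero placeholder, and append_aligned() is a hypothetical helper. The returned offset points at the metadata, which matches how the read path deserializes EntryMetadata at the indexed offset.

```rust
const PAYLOAD_ALIGNMENT: u64 = 64;

/// Padding needed so the next payload starts on an aligned boundary.
fn prepad_len(offset: u64) -> usize {
    let a = PAYLOAD_ALIGNMENT;
    ((a - (offset % a)) & (a - 1)) as usize
}

/// Append one non-tombstone entry and return the offset of its metadata.
fn append_aligned(buf: &mut Vec<u8>, key_hash: u64, payload: &[u8]) -> u64 {
    let prev_tail = buf.len() as u64;
    let padded_len = buf.len() + prepad_len(prev_tail);
    buf.resize(padded_len, 0u8); // zero pre-padding
    buf.extend_from_slice(payload);
    let metadata_offset = buf.len() as u64;
    buf.extend_from_slice(&key_hash.to_le_bytes());
    buf.extend_from_slice(&prev_tail.to_le_bytes()); // prev_offset = previous tail
    buf.extend_from_slice(&[0u8; 4]); // checksum placeholder (assumption)
    metadata_offset
}
```

Because prev_offset records the unpadded previous tail, a reader can recompute the aligned payload start from the metadata alone.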
Read Path Architecture
Read operations use the hash index for O(1) lookup and return EntryHandle instances that provide zero-copy access to the memory-mapped data.
flowchart TD
ReadCall["read(key) [line 1040]"]
ComputeHash["compute_hash(key) [line 1041]\nXXH3_64 via xxhash_rust"]
AcquireIndex["key_indexer.read().unwrap() [line 1042]\nAcquire RwLock read guard"]
GetMmap["get_mmap_arc() [line 1046]\nmmap.lock() + clone Arc"]
ReadContext["read_entry_with_context(Some(key),\nkey_hash, &mmap_arc, &guard) [line 1048]"]
Lookup["key_indexer_guard.get_packed(&key_hash) [line 509]"]
CheckFound{"Found?"}
Unpack["KeyIndexer::unpack(packed) [line 510]\ntag = (packed >> 48) as u16\noffset = packed & 0xFFFF_FFFF_FFFF"]
CheckNonHashed{"non_hashed_key.is_some()?"}
ComputeTag["KeyIndexer::tag_from_key(non_hashed_key) [line 514]"]
VerifyTag{"tag == computed_tag?"}
BoundsCheck{"offset + METADATA_SIZE > mmap_arc.len()?"}
ReadMetadata["EntryMetadata::deserialize(\n&mmap_arc[offset..offset+METADATA_SIZE]) [line 529]"]
GetPrevTail["prev_tail = metadata.prev_offset [line 533]"]
CalcDerived["derived = prev_tail + prepad_len(prev_tail) [line 534]"]
CheckTombstonePattern{"entry_end - prev_tail == 1 &&\nmmap_arc[prev_tail..entry_end] == NULL_BYTE?"}
SetStartPrevTail["entry_start = prev_tail [line 543]"]
SetStartDerived["entry_start = derived [line 537]"]
BoundsCheckRange["entry_start >= entry_end || entry_end > mmap_arc.len()?"]
CheckTombstone{"entry_end - entry_start == 1 &&\nmmap_arc[entry_start..entry_end] == NULL_BYTE? [line 551]"}
CreateHandle["EntryHandle { mmap_arc, range:\nentry_start..entry_end, metadata } [line 560]"]
ReturnNone["Return None"]
ReadCall --> ComputeHash
ComputeHash --> AcquireIndex
AcquireIndex --> GetMmap
GetMmap --> ReadContext
ReadContext --> Lookup
Lookup --> CheckFound
CheckFound -->|No| ReturnNone
CheckFound -->|Yes| Unpack
Unpack --> CheckNonHashed
CheckNonHashed -->|Yes| ComputeTag
CheckNonHashed -->|No| BoundsCheck
ComputeTag --> VerifyTag
VerifyTag -->|No| ReturnNone
VerifyTag -->|Yes| BoundsCheck
BoundsCheck -->|Out of bounds| ReturnNone
BoundsCheck -->|Valid| ReadMetadata
ReadMetadata --> GetPrevTail
GetPrevTail --> CalcDerived
CalcDerived --> CheckTombstonePattern
CheckTombstonePattern -->|Yes| SetStartPrevTail
CheckTombstonePattern -->|No| SetStartDerived
SetStartPrevTail --> BoundsCheckRange
SetStartDerived --> BoundsCheckRange
BoundsCheckRange -->|Invalid| ReturnNone
BoundsCheckRange -->|Valid| CheckTombstone
CheckTombstone -->|Yes| ReturnNone
CheckTombstone -->|No| CreateHandle
The tag verification at src/storage_engine/data_store.rs:513-521 prevents hash collision issues:
- Extract stored tag: Upper 16 bits of packed value via (packed >> 48) as u16 (line 510)
- Compute expected tag: When non_hashed_key provided, calls KeyIndexer::tag_from_key(non_hashed_key) (line 514)
- Compare tags: Returns None with warning if mismatch detected (lines 516-520)
This catches collisions where two different keys produce the same 64-bit XXH3 hash, because the tag is derived from additional key bytes and is unlikely to match for both keys.
Sources: src/storage_engine/data_store.rs:1040-1049 src/storage_engine/data_store.rs:502-565 src/storage_engine/data_store.rs:658-663
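The range derivation in the flowchart can be approximated in a few lines. This sketch resolves a payload range from a metadata offset and skips tag verification, since the tag computation is not described here. The metadata layout (prev_offset at bytes 8..16, little-endian, 20 bytes total) and the function name payload_range() are assumptions.

```rust
use std::ops::Range;

const PAYLOAD_ALIGNMENT: u64 = 64;
const METADATA_SIZE: usize = 20; // assumed: 8 + 8 + 4 bytes

fn prepad_len(offset: u64) -> usize {
    let a = PAYLOAD_ALIGNMENT;
    ((a - (offset % a)) & (a - 1)) as usize
}

/// Resolve the payload range for the entry whose metadata starts at `offset`.
/// Returns None for out-of-bounds offsets, broken links, and tombstones.
fn payload_range(view: &[u8], offset: usize) -> Option<Range<usize>> {
    if offset.checked_add(METADATA_SIZE)? > view.len() {
        return None;
    }
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&view[offset + 8..offset + 16]);
    let prev_tail = u64::from_le_bytes(raw);
    let entry_end = offset;
    if prev_tail >= entry_end as u64 {
        return None;
    }
    let prev_tail = prev_tail as usize;
    // Tombstones are a single NULL byte written without pre-padding.
    let tombstone_shape = entry_end - prev_tail == 1 && view[prev_tail] == 0;
    let entry_start = if tombstone_shape {
        prev_tail
    } else {
        prev_tail + prepad_len(prev_tail as u64)
    };
    if entry_start >= entry_end || entry_end > view.len() {
        return None;
    }
    if entry_end - entry_start == 1 && view[entry_start] == 0 {
        return None; // deleted key
    }
    Some(entry_start..entry_end)
}
```

In the real engine the resulting range is wrapped in an EntryHandle together with the cloned Arc of the map, so no payload bytes are copied.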
Batch Operations
Batch operations reduce lock acquisition overhead by processing multiple entries in a single synchronized operation.
Batch Write
The batch_write() method at src/storage_engine/data_store.rs:838-843 hashes all keys up front, then acquires the write lock once, accumulates every entry in an in-memory buffer, and writes that buffer to disk with a single write_all() call.
Sources: src/storage_engine/data_store.rs:838-843 src/storage_engine/data_store.rs:847-939
flowchart TD
BatchWrite["batch_write(entries: &[(&[u8], &[u8])]) [line 838]"]
UnzipKeys["(keys, payloads): (Vec<_>, Vec<_>) = entries.iter().unzip() [line 839]"]
HashAll["compute_hash_batch(&keys) [line 840]\nParallel/SIMD XXH3_64 if available"]
ZipHashes["hashes.zip(payloads) [line 841]"]
CallBatchHash["batch_write_with_key_hashes(hashed_entries, false) [line 842]"]
AcquireWrite["file.write().unwrap() [line 852]"]
LoadTail["tail_offset.load(Ordering::Acquire) [line 858]"]
InitBuffer["buffer = Vec::new() [line 857]"]
InitVecs["key_hash_offsets = Vec::new()\ndeleted_keys = HashSet::new() [line 860-861]"]
LoopStart{"For each (key_hash, payload)"}
CheckNull{"payload == NULL_BYTE?"}
TombstonePath["Write tombstone:\n1 byte + metadata [line 872-896]"]
NormalPath["prepad_len(tail_offset) [line 909]\nbuffer.resize(old_len + prepad, 0u8) [line 912]"]
ComputeCheck["compute_checksum(payload) [line 916]"]
BuildMeta["EntryMetadata { key_hash,\nprev_offset, checksum } [line 917-921]"]
SimdCopy["simd_copy(&mut entry, payload) [line 925]"]
AppendEntry["buffer.extend_from_slice(&entry) [line 927]"]
UpdateTailLocal["tail_offset += entry.len() [line 929]"]
PushOffset["key_hash_offsets.push((key_hash, offset)) [line 930]"]
LoopContinue["Continue loop"]
WriteAll["file.write_all(&buffer) [line 933]"]
Flush["file.flush() [line 934]"]
Reindex["reindex(&file, &key_hash_offsets,\ntail_offset, Some(&deleted_keys)) [line 936]"]
Return["Ok(tail_offset) [line 938]"]
BatchWrite --> UnzipKeys
UnzipKeys --> HashAll
HashAll --> ZipHashes
ZipHashes --> CallBatchHash
CallBatchHash --> AcquireWrite
AcquireWrite --> LoadTail
LoadTail --> InitBuffer
InitBuffer --> InitVecs
InitVecs --> LoopStart
LoopStart --> CheckNull
CheckNull -->|Yes| TombstonePath
CheckNull -->|No| NormalPath
TombstonePath --> PushOffset
NormalPath --> ComputeCheck
ComputeCheck --> BuildMeta
BuildMeta --> SimdCopy
SimdCopy --> AppendEntry
AppendEntry --> UpdateTailLocal
UpdateTailLocal --> PushOffset
PushOffset --> LoopContinue
LoopContinue --> LoopStart
LoopStart --> WriteAll
WriteAll --> Flush
Flush --> Reindex
Reindex --> Return
Batch Read
The batch_read() method at src/storage_engine/data_store.rs:1105-1109 acquires the index lock once and performs all lookups under it.
The underlying implementation (lines 1111-1158) uses a single lock acquisition for the entire batch:
- Single mmap clone: get_mmap_arc() clones the Arc<Mmap> once (line 1116)
- Single index lock: key_indexer.read().unwrap() held for entire batch (line 1117)
- Conditional tag verification: When non_hashed_keys provided, passes Some(key) for collision detection (lines 1134-1145)
- Parallel option available: With parallel feature, par_iter_entries() uses Rayon for multi-threaded processing
Sources: src/storage_engine/data_store.rs:1105-1109 src/storage_engine/data_store.rs:1111-1158
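The "clone once, lock once" shape of a batch read can be sketched with standard-library types. MiniStore and Handle are hypothetical stand-ins: a Vec<u8> replaces the memory map, the index maps key hashes directly to payload ranges, and tag verification is left out.

```rust
use std::collections::HashMap;
use std::ops::Range;
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical analogue of EntryHandle: shared view plus a byte range.
struct Handle {
    view: Arc<Vec<u8>>,
    range: Range<usize>,
}

impl Handle {
    fn as_slice(&self) -> &[u8] {
        &self.view[self.range.clone()]
    }
}

struct MiniStore {
    view: Mutex<Arc<Vec<u8>>>, // stand-in for Mutex<Arc<Mmap>>
    index: RwLock<HashMap<u64, Range<usize>>>, // simplified index (assumption)
}

impl MiniStore {
    fn batch_read(&self, key_hashes: &[u64]) -> Vec<Option<Handle>> {
        // Clone the shared view once; the mutex is released at the end of this statement.
        let view = Arc::clone(&*self.view.lock().unwrap());
        // Take the index read lock once for the whole batch.
        let index = self.index.read().unwrap();
        key_hashes
            .iter()
            .map(|key_hash| {
                index.get(key_hash).map(|range| Handle {
                    view: Arc::clone(&view),
                    range: range.clone(),
                })
            })
            .collect()
    }
}
```

Results come back in request order, with None marking keys that are absent from the index.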
Iteration Support
The storage engine provides multiple iteration mechanisms for different use cases:
| Iterator | Type | Synchronization | Use Case | Source |
|---|---|---|---|---|
| EntryIterator | Sequential | Single lock acquisition | Backward chain traversal | entry_iterator.rs:21-128 |
| par_iter_entries() | Parallel (Rayon) | Lock-free after setup | Multi-threaded scanning | data_store.rs:296-361 |
| IntoIterator | Consuming | Single lock acquisition | Owned iteration | data_store.rs:44-51 |
| iter_entries() | Non-consuming | Single lock acquisition | Reference iteration | data_store.rs:276-280 |
EntryIterator Implementation
The EntryIterator at src/storage_engine/entry_iterator.rs:21-128 scans backward through the file, yielding only the latest live version of each key.
Key features:
- Backward traversal: Follows prev_offset chain from tail_offset to 0 (line 106)
- Deduplication: seen_keys: HashSet<u64, Xxh3BuildHasher> tracks visited keys (line 24, 110)
- Tombstone filtering: Skips single NULL byte entries (line 117-119)
- Zero-copy: Returns EntryHandle with Arc::clone(&self.mmap) (line 122)
Sources: src/storage_engine/entry_iterator.rs:8-128 src/storage_engine/data_store.rs:276-280 src/storage_engine/data_store.rs:296-361 src/storage_engine/data_store.rs:44-51
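The combination of backward traversal, deduplication, and tombstone filtering can be sketched as a small iterator over a byte slice. This is an illustration only: BackwardIter and append_entry() are hypothetical, alignment padding is omitted, the metadata layout is assumed to be 20 little-endian bytes, and the default hasher replaces Xxh3BuildHasher.

```rust
use std::collections::HashSet;

const METADATA_SIZE: usize = 20; // assumed: key_hash (8) + prev_offset (8) + checksum (4)

/// Fixture helper: append payload then metadata, with no alignment padding.
fn append_entry(buf: &mut Vec<u8>, key_hash: u64, payload: &[u8]) {
    let prev_tail = buf.len() as u64;
    buf.extend_from_slice(payload);
    buf.extend_from_slice(&key_hash.to_le_bytes());
    buf.extend_from_slice(&prev_tail.to_le_bytes());
    buf.extend_from_slice(&[0u8; 4]); // checksum placeholder
}

struct BackwardIter<'a> {
    view: &'a [u8],
    cursor: usize,
    seen_keys: HashSet<u64>,
}

impl<'a> BackwardIter<'a> {
    fn new(view: &'a [u8]) -> Self {
        BackwardIter { view, cursor: view.len(), seen_keys: HashSet::new() }
    }
}

impl<'a> Iterator for BackwardIter<'a> {
    type Item = (u64, &'a [u8]);

    fn next(&mut self) -> Option<Self::Item> {
        let view: &'a [u8] = self.view;
        while self.cursor >= METADATA_SIZE {
            let meta_start = self.cursor - METADATA_SIZE;
            let mut raw = [0u8; 8];
            raw.copy_from_slice(&view[meta_start..meta_start + 8]);
            let key_hash = u64::from_le_bytes(raw);
            raw.copy_from_slice(&view[meta_start + 8..meta_start + 16]);
            let prev_tail = u64::from_le_bytes(raw);
            if prev_tail >= meta_start as u64 {
                return None; // broken link: stop iterating
            }
            let prev_tail = prev_tail as usize;
            let payload = &view[prev_tail..meta_start];
            self.cursor = prev_tail;
            if !self.seen_keys.insert(key_hash) {
                continue; // a newer version of this key was already visited
            }
            if payload.len() == 1 && payload[0] == 0 {
                continue; // tombstone: key is deleted, and now marked as seen
            }
            return Some((key_hash, payload));
        }
        None
    }
}
```

Marking a key as seen before the tombstone check is what hides older versions of a deleted key.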
Key Design Principles
Append-Only Model
All write operations append data to the end of the file. Updates and deletes create new entries rather than modifying existing data. This design ensures:
- No seeking during writes : Sequential disk I/O for maximum throughput
- Crash safety : Incomplete writes are detected and truncated during recovery
- Zero-copy reads : Memory-mapped regions remain valid as data is never modified
Sources: README.md:98-103
Fixed Alignment
Every non-tombstone payload starts on a 64-byte boundary (configurable via the PAYLOAD_ALIGNMENT constant). The prepad_len() function at src/storage_engine/data_store.rs:670-673 computes the number of padding bytes needed to reach the next boundary.
This alignment enables:
- Cache-line efficiency : Payloads align with 64-byte CPU cache lines
- SIMD operations : Full-speed AVX2/AVX-512 reads without crossing boundaries
- Zero-copy typed views: Safe reinterpretation as &[T] without unaligned access
The padding is written as zero bytes (line 769) and not counted in entry.file_size(), making it transparent to users.
Sources: src/storage_engine/data_store.rs:670-673 src/storage_engine/data_store.rs:766-771
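The padding arithmetic follows the expression shown in the write-path diagram, ((A - offset % A) & (A - 1)), which relies on the alignment being a power of two. The sketch below restates it as a standalone function. aligned_start() is a hypothetical helper added for illustration.

```rust
const PAYLOAD_ALIGNMENT: u64 = 64; // must be a power of two for the mask to work

/// Bytes of zero padding needed so a payload written at `offset` starts aligned.
fn prepad_len(offset: u64) -> usize {
    let a = PAYLOAD_ALIGNMENT;
    // (a - offset % a) is in 1..=a; masking with (a - 1) maps a to 0.
    ((a - (offset % a)) & (a - 1)) as usize
}

/// Hypothetical helper: first aligned position at or after `prev_tail`.
fn aligned_start(prev_tail: u64) -> u64 {
    prev_tail + prepad_len(prev_tail) as u64
}
```

For example, a previous tail of 100 needs 28 padding bytes, so the next payload starts at offset 128, while a tail that already sits on a boundary needs none.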
Metadata-After-Payload
Each entry stores its metadata immediately after the payload, forming a backward-linked chain. This design:
- Minimizes seeking : Metadata and payload are adjacent on disk
- Enables recovery : The chain can be validated backward from any point
- Supports nesting : Storage containers can be stored within other containers
Sources: README.md:139-148
Lock-Free Reads
Read operations do not acquire write locks and perform zero-copy access through memory-mapped views. The AtomicU64 tail offset ensures readers see a consistent view without blocking writes.
Sources: README.md:170-183 src/storage_engine/data_store.rs:1040-1049