Skip to content
Draft

mmap #47

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/borghash/HashTable.pxd
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
from libc.stdint cimport uint8_t, uint32_t

cdef class HashTable:
cdef int ksize, vsize
cdef public int ksize, vsize
cdef readonly size_t capacity, used
cdef size_t initial_capacity, tombstones
cdef float max_load_factor, min_load_factor, shrink_factor, grow_factor
cdef uint32_t* table
cdef uint32_t kv_capacity, kv_used
cdef public uint32_t kv_capacity, kv_used
cdef float kv_grow_factor
cdef uint8_t* keys
cdef uint8_t* values
cdef uint8_t* kv
cdef uint8_t* header
cdef int fd
cdef size_t mmap_size
cdef uint32_t kv_offset
cdef int stats_get, stats_set, stats_del, stats_iter, stats_lookup, stats_linear
cdef int stats_resize_table, stats_resize_kv

cdef size_t _get_index(self, uint8_t* key)
cdef int _lookup_index(self, uint8_t* key_ptr, size_t* index_ptr)
cpdef void update_table_only(self, bytes key, uint32_t kv_index)
cdef void _resize_table(self, size_t new_capacity)
cdef void _resize_kv(self, size_t new_capacity)
144 changes: 111 additions & 33 deletions src/borghash/HashTable.pyx
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
"""
HashTable: low-level hash table mapping fully random bytes keys to bytes values.
Key and value lengths can be chosen, but are fixed thereafter.
The keys and values are stored in arrays separate from the hashtable.
The hashtable only stores the 32-bit indices into the key/value arrays.
The keys and values are stored together in an array separate from the hashtable.
The hashtable only stores the 32-bit indices into the key/value array.
"""
from __future__ import annotations
from typing import BinaryIO, Iterator, Any

from libc.stdlib cimport malloc, free, realloc
from libc.string cimport memcpy, memset, memcmp
from libc.stdint cimport uint8_t, uint32_t
from libc.errno cimport errno
from posix.unistd cimport close, ftruncate, lseek, SEEK_END
from posix.fcntl cimport open as c_open, O_RDWR, O_CREAT
from posix.mman cimport mmap, munmap, MAP_SHARED, PROT_READ, PROT_WRITE

from collections.abc import Mapping

Expand Down Expand Up @@ -47,18 +51,28 @@ cdef class HashTable:
key_size: int = 0, value_size: int = 0, capacity: int = MIN_CAPACITY,
max_load_factor: float = 0.5, min_load_factor: float = 0.10,
shrink_factor: float = 0.4, grow_factor: float = 2.0,
kv_grow_factor: float = 1.3) -> None:
kv_grow_factor: float = 1.3,
path: str = None, kv_offset: int = 0) -> None:
# the load of the ht (.table) shall be between 0.25 and 0.5, so it is fast and has few collisions.
# it is cheap to have a low hash table load, because .table only stores uint32_t indices into the
# .keys and .values array.
# the keys/values arrays have bigger elements and are not hash tables, thus collisions and load
# .kv array.
# the .kv array has bigger elements and is not a hash table, thus collisions and load
# factor are no concern there. the kv_grow_factor can be relatively small.
if key_size < 4:
raise ValueError("key_size must be specified and must be >= 4.")
if not value_size:
raise ValueError("value_size must be specified and must be > 0.")
self.ksize = key_size
self.vsize = value_size
# vvv mmap vvv
self.fd = -1
self.mmap_size = 0
self.kv_offset = kv_offset
if path:
self.fd = c_open(path.encode('utf-8'), O_RDWR | O_CREAT, 0o644)
if self.fd == -1:
raise OSError(errno, f"Could not open {path}")
# ^^^ mmap ^^^
# vvv hash table vvv
self.max_load_factor = max_load_factor
self.min_load_factor = min_load_factor
Expand All @@ -71,13 +85,31 @@ cdef class HashTable:
self.table = NULL
self._resize_table(self.initial_capacity)
# ^^^ hash table ^^^
# vvv kv arrays vvv
# vvv kv array vvv
self.kv_grow_factor = kv_grow_factor
self.kv_used = 0
self.keys = NULL
self.values = NULL
self._resize_kv(int(self.initial_capacity * self.max_load_factor))
# ^^^ kv arrays ^^^
self.kv = NULL
self.header = NULL
if self.fd != -1:
# For mmap, we determine current size and capacity from file size.
file_size = lseek(self.fd, 0, SEEK_END)
if file_size > 0:
self.mmap_size = file_size
new_kv = mmap(NULL, self.mmap_size, PROT_READ | PROT_WRITE, MAP_SHARED, self.fd, 0)
if new_kv == <void*> -1:
raise OSError(errno, "mmap failed")
self.header = <uint8_t*> new_kv
if self.kv_offset > 0:
self.kv = <uint8_t*> new_kv + self.kv_offset
self.kv_capacity = <uint32_t>((self.mmap_size - self.kv_offset) // (self.ksize + self.vsize))
else:
self.kv = NULL
self.kv_capacity = 0
else:
self._resize_kv(int(self.initial_capacity * self.max_load_factor))
else:
self._resize_kv(int(self.initial_capacity * self.max_load_factor))
# ^^^ kv array ^^^
# vvv stats vvv
self.stats_get = 0
self.stats_set = 0
Expand All @@ -92,8 +124,17 @@ cdef class HashTable:

def __del__(self) -> None:
free(self.table)
free(self.keys)
free(self.values)
self.table = NULL
if self.fd != -1:
if self.header != NULL:
munmap(self.header, self.mmap_size)
self.header = NULL
self.kv = NULL
close(self.fd)
self.fd = -1
else:
free(self.kv)
self.kv = NULL

def clear(self) -> None:
"""Empty the HashTable and start from scratch."""
Expand Down Expand Up @@ -122,7 +163,7 @@ cdef class HashTable:
self.stats_lookup += 1
while (kv_index := self.table[index]) != FREE_BUCKET:
self.stats_linear += 1
if kv_index != TOMBSTONE_BUCKET and memcmp(self.keys + kv_index * self.ksize, key_ptr, self.ksize) == 0:
if kv_index != TOMBSTONE_BUCKET and memcmp(self.kv + kv_index * (self.ksize + self.vsize), key_ptr, self.ksize) == 0:
if index_ptr:
index_ptr[0] = index
return 1 # found
Expand All @@ -142,20 +183,21 @@ cdef class HashTable:
self.stats_set += 1
if self._lookup_index(key_ptr, &index):
kv_index = self.table[index]
memcpy(self.values + kv_index * self.vsize, value_ptr, self.vsize)
memcpy(self.kv + kv_index * (self.ksize + self.vsize) + self.ksize, value_ptr, self.vsize)
return

if self.kv_used >= self.kv_capacity:
self._resize_kv(int(self.kv_capacity * self.kv_grow_factor))
# "+ 1" ensures growth even for very small or 0 capacity.
self._resize_kv(int(self.kv_capacity * self.kv_grow_factor + 1))
if self.kv_used >= self.kv_capacity:
# Should never happen. See "RESERVED" constant - we allow almost 4Gi kv entries.
# For a typical 256-bit key and a small 32-bit value that would already consume 176GiB+
# memory (plus spikes to even more when hashtable or kv arrays get resized).
raise RuntimeError("KV array is full")

kv_index = self.kv_used
memcpy(self.keys + kv_index * self.ksize, key_ptr, self.ksize)
memcpy(self.values + kv_index * self.vsize, value_ptr, self.vsize)
memcpy(self.kv + kv_index * (self.ksize + self.vsize), key_ptr, self.ksize)
memcpy(self.kv + kv_index * (self.ksize + self.vsize) + self.ksize, value_ptr, self.vsize)
self.kv_used += 1

self.used += 1
Expand All @@ -177,7 +219,7 @@ cdef class HashTable:
self.stats_get += 1
if self._lookup_index(<uint8_t*> key, &index):
kv_index = self.table[index]
return self.values[kv_index * self.vsize:(kv_index + 1) * self.vsize]
return self.kv[kv_index * (self.ksize + self.vsize) + self.ksize : kv_index * (self.ksize + self.vsize) + self.ksize + self.vsize]
else:
raise KeyError("Key not found")

Expand All @@ -191,8 +233,7 @@ cdef class HashTable:
self.stats_del += 1
if self._lookup_index(key_ptr, &index):
kv_index = self.table[index]
memset(self.keys + kv_index * self.ksize, 0, self.ksize)
memset(self.values + kv_index * self.vsize, 0, self.vsize)
memset(self.kv + kv_index * (self.ksize + self.vsize), 0, self.ksize + self.vsize)
self.table[index] = TOMBSTONE_BUCKET
self.used -= 1
self.tombstones += 1
Expand Down Expand Up @@ -233,10 +274,21 @@ cdef class HashTable:
for i in range(self.capacity):
kv_index = self.table[i]
if kv_index not in (FREE_BUCKET, TOMBSTONE_BUCKET):
key = self.keys[kv_index * self.ksize:(kv_index + 1) * self.ksize]
value = self.values[kv_index * self.vsize:(kv_index + 1) * self.vsize]
key = self.kv[kv_index * (self.ksize + self.vsize) : kv_index * (self.ksize + self.vsize) + self.ksize]
value = self.kv[kv_index * (self.ksize + self.vsize) + self.ksize : kv_index * (self.ksize + self.vsize) + self.ksize + self.vsize]
yield key, value

cpdef void update_table_only(self, bytes key, uint32_t kv_index):
cdef size_t index
self._lookup_index(<uint8_t*> key, &index)
# index is either a bucket containing the key (if it already existed)
# or it is the first free/tombstone bucket in the probe sequence.
if self.table[index] == FREE_BUCKET or self.table[index] == TOMBSTONE_BUCKET:
self.used += 1
self.table[index] = kv_index
if self.used + self.tombstones > self.capacity * self.max_load_factor:
self._resize_table(int(self.capacity * self.grow_factor))

cdef void _resize_table(self, size_t new_capacity):
cdef size_t i, index
cdef uint32_t kv_index
Expand All @@ -250,7 +302,7 @@ cdef class HashTable:
for i in range(current_capacity):
kv_index = self.table[i]
if kv_index not in (FREE_BUCKET, TOMBSTONE_BUCKET):
index = self._get_index(self.keys + kv_index * self.ksize)
index = self._get_index(self.kv + kv_index * (self.ksize + self.vsize))
while new_table[index] != FREE_BUCKET:
index = (index + 1) % new_capacity
new_table[index] = kv_index
Expand All @@ -262,12 +314,36 @@ cdef class HashTable:
cdef void _resize_kv(self, size_t new_capacity):
# We must never use kv indices >= RESERVED; thus, we'll never need more capacity either.
cdef size_t capacity = min(new_capacity, <size_t> RESERVED - 1)
cdef size_t new_mmap_size
cdef void* new_ptr
self.stats_resize_kv += 1
# realloc is already highly optimized (in Linux). By using mremap internally only the peak address space usage is "old size" + "new size", while the peak memory usage is only "new size".
self.keys = <uint8_t*> realloc(self.keys, capacity * self.ksize * sizeof(uint8_t))
self.values = <uint8_t*> realloc(self.values, capacity * self.vsize * sizeof(uint8_t))
if self.fd != -1:
new_mmap_size = self.kv_offset + capacity * (self.ksize + self.vsize) * sizeof(uint8_t)
if self.header != NULL:
# Don't shrink automatically during resize if we already have space.
# This prevents truncating an existing file's data when it's opened
# with a smaller initial_capacity than the file already contains.
# HOWEVER, we MUST shrink if capacity < self.kv_capacity (e.g. shrink_to_fit).
if new_mmap_size <= self.mmap_size and capacity >= self.kv_capacity:
return
munmap(self.header, self.mmap_size)
if ftruncate(self.fd, new_mmap_size) == -1:
raise OSError(errno, "ftruncate failed")
new_ptr = mmap(NULL, new_mmap_size, PROT_READ | PROT_WRITE, MAP_SHARED, self.fd, 0)
if new_ptr == <void*> -1:
raise OSError(errno, "mmap failed")
self.header = <uint8_t*> new_ptr
self.kv = <uint8_t*> new_ptr + self.kv_offset
self.mmap_size = new_mmap_size
else:
# realloc is already highly optimized (in Linux). By using mremap internally, only the peak address space usage is "old size" + "new size", while the peak memory usage is only "new size".
self.kv = <uint8_t*> realloc(self.kv, capacity * (self.ksize + self.vsize) * sizeof(uint8_t))
self.kv_capacity = <uint32_t> capacity

def shrink_to_fit(self) -> None:
"""Shrink the KV array and the file to the actually used size."""
self._resize_kv(self.kv_used)

def k_to_idx(self, key: bytes) -> int:
"""
Return the key's index in the keys array (index is stable while in memory).
Expand All @@ -283,15 +359,17 @@ cdef class HashTable:

def idx_to_k(self, idx: int) -> bytes:
"""
For a given index, return the key stored at that index in the keys array.
For a given index, return the key stored at that index in the kv array.
This is the reverse of k_to_idx (e.g., 32-bit index -> 256-bit key).
"""
cdef uint32_t kv_index = <uint32_t> idx
return self.keys[kv_index * self.ksize:(kv_index + 1) * self.ksize]
if kv_index >= self.kv_used:
raise KeyError(f"Index {kv_index} out of range (kv_used={self.kv_used})")
return self.kv[kv_index * (self.ksize + self.vsize) : kv_index * (self.ksize + self.vsize) + self.ksize]

def kv_to_idx(self, key: bytes, value: bytes) -> int:
"""
Return the key's/value's index in the keys/values array (index is stable while in memory).
Return the key's/value's index in the kv array (index is stable while in memory).
This can be used to "abbreviate" a known key/value pair (e.g., 256-bit key + 32-bit value -> 32-bit index).
"""
if len(key) != self.ksize:
Expand All @@ -302,19 +380,19 @@ cdef class HashTable:
cdef uint32_t kv_index
if self._lookup_index(<uint8_t*> key, &index):
kv_index = self.table[index]
value_found = self.values[kv_index * self.vsize:(kv_index + 1) * self.vsize]
value_found = self.kv[kv_index * (self.ksize + self.vsize) + self.ksize : kv_index * (self.ksize + self.vsize) + self.ksize + self.vsize]
if value == value_found:
return kv_index
raise KeyError("Key/Value not found")

def idx_to_kv(self, idx: int) -> tuple[bytes, bytes]:
"""
For a given index, return the key/value stored at that index in the keys/values array.
For a given index, return the key/value stored at that index in the kv array.
This is the reverse of kv_to_idx (e.g., 32-bit index -> 256-bit key + 32-bit value).
"""
cdef uint32_t kv_index = <uint32_t> idx
key = self.keys[kv_index * self.ksize:(kv_index + 1) * self.ksize]
value = self.values[kv_index * self.vsize:(kv_index + 1) * self.vsize]
key = self.kv[kv_index * (self.ksize + self.vsize) : kv_index * (self.ksize + self.vsize) + self.ksize]
value = self.kv[kv_index * (self.ksize + self.vsize) + self.ksize : kv_index * (self.ksize + self.vsize) + self.ksize + self.vsize]
return key, value

@property
Expand Down
16 changes: 9 additions & 7 deletions src/borghash/HashTableNT.pxd
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from .HashTable cimport HashTable

cdef class HashTableNT:
cdef int key_size
cdef object byte_order
cdef object value_type
cdef object value_format
cdef object value_struct
cdef int value_size
cdef object inner
cdef public int key_size
cdef public object byte_order
cdef public object value_type
cdef public object value_format
cdef public object value_struct
cdef public int value_size
cdef public HashTable inner
Loading
Loading