Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions csrc/storage_backends/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Therefore the framework enforces:
|------|---------|
| `connector_types.h` | `Request`, `Completion`, `BatchState`, `Op` |
| `connector_interface.h` | `IStorageConnector` — top-level abstract interface |
| `connector_base.h` | `ConnectorBase<T>` — core harness (eventfd, SQ/CQ, threading, tiling). Override 4 methods per backend |
| `connector_base.h` | `ConnectorBase<T>` — core harness (eventfd, SQ/CQ, threading, tiling). Override 4 required + 1 optional method per backend |
| `connector_pybind_utils.h` | Pybind utilities with GIL release + `LMCACHE_BIND_CONNECTOR_METHODS` macro |
| `redis/` | Reference implementation (RESP2 protocol over TCP) |

Expand All @@ -61,8 +61,8 @@ each step.
### Step 1: C++ connector — inherit from ConnectorBase

Create your connector directory (e.g., `csrc/storage_backends/mybackend/`)
and inherit from `ConnectorBase<YourConnectionType>`. You only need to
override 4 methods:
and inherit from `ConnectorBase<YourConnectionType>`. You need to
override 4 required methods (and optionally `do_single_delete` for eviction):

```cpp
// csrc/storage_backends/mybackend/connector.h
Expand Down Expand Up @@ -105,6 +105,11 @@ class MyConnector : public lmcache::connector::ConnectorBase<MyConn> {
// send EXISTS, return true/false
}

// Optional: delete a key (enables eviction support)
bool do_single_delete(MyConn& conn, const std::string& key) override {
// send DELETE, return true if deleted, false if not found
}

// Optional: clean shutdown of connections
void shutdown_connections() override { /* close sockets */ }

Expand Down Expand Up @@ -275,7 +280,7 @@ Python eventfd.

## Checklist for a new backend

- [ ] C++ connector inheriting `ConnectorBase<T>` with 4 method overrides
- [ ] C++ connector inheriting `ConnectorBase<T>` with 4 required + 1 optional (`do_single_delete`) method overrides
- [ ] Pybind module using `LMCACHE_BIND_CONNECTOR_METHODS`
- [ ] `setup.py` entry for the new `CppExtension`
- [ ] Python client inheriting `ConnectorClientBase` (non-MP mode)
Expand Down
61 changes: 60 additions & 1 deletion csrc/storage_backends/connector_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ this base needs to have at least four methods be overridden by the derived
- 3. do_single_set()
- 4. do_single_exists()

optionally override do_single_delete() to support eviction (the default
implementation deletes nothing and returns false for every key).

see the RedisConnector (csrc/redis/) implementing the RESP2 protocol over TCP
for an example
*/
Expand Down Expand Up @@ -130,6 +133,39 @@ class ConnectorBase : public IStorageConnector {
return batch_future_id;
}

uint64_t submit_batch_delete(const std::vector<std::string>& keys) override {
if (keys.empty()) {
throw std::runtime_error("keys list is empty");
}

size_t num_items = keys.size();
auto [batch_future_id, batch_state, num_tiles, tile_size] =
prepare_batch_operation(num_items, Op::BATCH_TILE_DELETE);

// pre-allocate per-key results (1 = deleted, 0 = not found)
batch_state->per_key_results.assign(num_items, 0);

// fan out work to threads
for (size_t tile_idx = 0; tile_idx < num_tiles; ++tile_idx) {
size_t start = tile_idx * tile_size;
size_t end = std::min(start + tile_size, num_items);

Request tile_req;
tile_req.op = Op::BATCH_TILE_DELETE;
tile_req.future_id = batch_future_id;
tile_req.batch = batch_state;
tile_req.start_idx = start;

for (size_t i = start; i < end; ++i) {
tile_req.keys.push_back(keys[i]);
}

enqueue_request(std::move(tile_req));
}

return batch_future_id;
}

std::vector<Completion> drain_completions() override {
// Drain the eventfd that triggered this drain_completions callback
drain_eventfd_();
Expand Down Expand Up @@ -216,6 +252,11 @@ class ConnectorBase : public IStorageConnector {
size_t chunk_size) = 0;
virtual bool do_single_exists(ConnectionType& conn,
const std::string& key) = 0;
virtual bool do_single_delete(ConnectionType& conn, const std::string& key) {
(void)conn;
(void)key;
return false; // no-op default for backward compat with plugins
}
virtual void shutdown_connections() {}

bool is_stopping() const { return stop_.load(std::memory_order_acquire); }
Expand Down Expand Up @@ -393,6 +434,23 @@ class ConnectorBase : public IStorageConnector {
}
comp.ok = true;
break;

case Op::BATCH_TILE_DELETE:
for (size_t i = 0; i < req.keys.size(); ++i) {
try {
bool deleted = do_single_delete(conn, req.keys[i]);
req.batch->per_key_results[req.start_idx + i] =
deleted ? 1 : 0;
} catch (const std::exception& e) {
// Per-key error tolerance: record failure
// but continue processing remaining keys
req.batch->per_key_results[req.start_idx + i] = 0;
fprintf(stderr, "[LMCache DELETE] key %s failed: %s\n",
req.keys[i].c_str(), e.what());
}
}
comp.ok = true;
break;
}
} catch (const std::exception& e) {
comp.ok = false;
Expand Down Expand Up @@ -438,7 +496,8 @@ class ConnectorBase : public IStorageConnector {
}
// for batch exists and batch get, move per-key results
if (req.batch->batch_op == Op::BATCH_TILE_EXISTS ||
req.batch->batch_op == Op::BATCH_TILE_GET) {
req.batch->batch_op == Op::BATCH_TILE_GET ||
req.batch->batch_op == Op::BATCH_TILE_DELETE) {
batch_comp.result_bytes = std::move(req.batch->per_key_results);
}
push_completion(std::move(batch_comp));
Expand Down
18 changes: 18 additions & 0 deletions csrc/storage_backends/connector_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ class IStorageConnector {
virtual uint64_t submit_batch_exists(
const std::vector<std::string>& keys) = 0;

/*
submit a batch DELETE operation

deletes multiple keys in parallel. work is automatically divided
among worker threads (tiling). returns a single future_id for the entire
batch.

args:
keys: vector of key strings to delete

returns:
uint64_t: future id for tracking this batch operation
completion will contain result_bytes vector with 0/1 for each key
(1 = deleted, 0 = not deleted)

notes:
in the ConnectorBase implementation a 0 does not always mean the key was
absent: 0 is also recorded for a key whose delete threw (the error is
logged to stderr and the remaining keys are still processed), and for
every key when the backend keeps the default do_single_delete().
ConnectorBase throws std::runtime_error when keys is empty.
*/
virtual uint64_t submit_batch_delete(
const std::vector<std::string>& keys) = 0;

/*
drain all available completions

Expand Down
12 changes: 12 additions & 0 deletions csrc/storage_backends/connector_pybind_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ example usage (see `redis/pybind.cpp`):
lmcache::connector::pybind_utils::bind_submit_batch_exists< \
ConnectorType>(), \
py::arg("keys")) \
.def("submit_batch_delete", \
lmcache::connector::pybind_utils::bind_submit_batch_delete< \
ConnectorType>(), \
py::arg("keys")) \
.def("drain_completions", \
lmcache::connector::pybind_utils::bind_drain_completions< \
ConnectorType>()) \
Expand Down Expand Up @@ -113,6 +117,14 @@ auto bind_submit_batch_exists() {
};
}

// Returns the callable that backs the Python-visible `submit_batch_delete`
// method of a connector class.
template <typename ConnectorType>
auto bind_submit_batch_delete() {
  auto submit = [](ConnectorType& self, const std::vector<std::string>& keys) {
    // The GIL stays released until this guard goes out of scope, i.e. for
    // the whole native submit call.
    py::gil_scoped_release gil_released;
    return self.submit_batch_delete(keys);
  };
  return submit;
}

template <typename ConnectorType>
auto bind_drain_completions() {
return [](ConnectorType& self) {
Expand Down
7 changes: 6 additions & 1 deletion csrc/storage_backends/connector_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ namespace connector {

// we only support batched operations
// benefits are fewer submissions and fewer completions
enum class Op : uint8_t { BATCH_TILE_GET, BATCH_TILE_SET, BATCH_TILE_EXISTS };
// Kind of work carried by a request. The explicit values are the same ones
// the compiler would assign implicitly; they are spelled out so the numbering
// is visible at a glance.
enum class Op : uint8_t {
  BATCH_TILE_GET = 0,
  BATCH_TILE_SET = 1,
  BATCH_TILE_EXISTS = 2,
  BATCH_TILE_DELETE = 3,
};

/*
shared communication state between threads executing a single batch operation.
Expand Down
7 changes: 7 additions & 0 deletions csrc/storage_backends/fs/connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,5 +271,12 @@ bool FSConnector::do_single_exists(WorkerFSConn& conn, const std::string& key) {
return std::filesystem::exists(file_path);
}

// Deletes the file that backs `key` under the worker's base path.
//
// Returns true if a file was removed and false if no such file existed.
// Throws std::filesystem::filesystem_error when the removal itself fails.
// The error_code overload of std::filesystem::remove returns false on
// failure, which would otherwise make an I/O error indistinguishable from a
// missing key; raising it lets the caller's per-key error handling see it.
bool FSConnector::do_single_delete(WorkerFSConn& conn, const std::string& key) {
  const std::string filename = key_to_filename(key);
  const auto file_path = conn.base_path / filename;

  std::error_code ec;
  const bool removed = std::filesystem::remove(file_path, ec);
  if (ec) {
    throw std::filesystem::filesystem_error("FS delete failed for key: " + key,
                                            file_path, ec);
  }
  return removed;
}

} // namespace connector
} // namespace lmcache
1 change: 1 addition & 0 deletions csrc/storage_backends/fs/connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class FSConnector : public ConnectorBase<WorkerFSConn> {
void do_single_set(WorkerFSConn& conn, const std::string& key,
const void* buf, size_t len, size_t chunk_size) override;
bool do_single_exists(WorkerFSConn& conn, const std::string& key) override;
bool do_single_delete(WorkerFSConn& conn, const std::string& key) override;

private:
// Build the filesystem-safe filename from a serialized key string.
Expand Down
74 changes: 74 additions & 0 deletions csrc/storage_backends/mooncake/connector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// SPDX-License-Identifier: Apache-2.0

#include "connector.h"

#include <cstdint>
#include <cstdio>
#include <stdexcept>
#include <string>

namespace lmcache {
namespace connector {

// Builds a connector backed by one Mooncake RealClient.
//
// config:      key/value settings, copied into a mooncake::ConfigDict and
//              handed to RealClient::setup_internal()
// num_workers: forwarded to ConnectorBase
//
// Throws std::runtime_error if the client cannot be created or if
// setup_internal() returns no value.
MooncakeConnector::MooncakeConnector(ConfigDict config, int num_workers)
    : ConnectorBase(num_workers), config_(std::move(config)) {
  // The static factory hands back the client instance.
  client_ = mooncake::RealClient::create();
  if (client_ == nullptr) {
    throw std::runtime_error("Failed to create mooncake RealClient");
  }

  // setup_internal() is given mooncake's own dict type, so copy the entries.
  mooncake::ConfigDict mooncake_config(config_.begin(), config_.end());
  if (auto setup_result = client_->setup_internal(mooncake_config);
      !setup_result.has_value()) {
    throw std::runtime_error("Mooncake setup_internal failed");
  }

  // IMPORTANT: must stay the last statement of the constructor.
  start_workers();
}

// Calls close() first, then tears down and releases the client if one is
// still held.
// NOTE(review): close() is defined outside this file; presumably it stops the
// workers before the client they point at goes away — confirm.
MooncakeConnector::~MooncakeConnector() {
  close();
  if (!client_) {
    return;
  }
  client_->tearDownAll();
  client_.reset();
}

// Produces a worker connection whose `client` is a non-owning pointer to the
// RealClient held by this connector.
WorkerMooncakeConn MooncakeConnector::create_connection() {
  return WorkerMooncakeConn{client_.get()};
}

// Fetches the value stored under `key` into the caller's buffer through
// RealClient::get_into(key, buf, len).
//
// chunk_size belongs to the ConnectorBase hook signature and is not used by
// this backend.
// Throws std::runtime_error when get_into() returns a negative value.
void MooncakeConnector::do_single_get(WorkerMooncakeConn& conn,
                                      const std::string& key, void* buf,
                                      size_t len, size_t chunk_size) {
  (void)chunk_size;

  const int64_t read_result = conn.client->get_into(key, buf, len);
  if (read_result >= 0) {
    return;
  }
  throw std::runtime_error("Mooncake get_into failed for key: " + key);
}

// Stores `len` bytes from `buf` under `key` through
// RealClient::put_from(key, buf, len).
//
// chunk_size belongs to the ConnectorBase hook signature and is not used by
// this backend.
// Throws std::runtime_error when put_from() returns a non-zero code.
void MooncakeConnector::do_single_set(WorkerMooncakeConn& conn,
                                      const std::string& key, const void* buf,
                                      size_t len, size_t chunk_size) {
  (void)chunk_size;

  // put_from() is called with a non-const pointer, hence the cast.
  void* source = const_cast<void*>(buf);
  const int status = conn.client->put_from(key, source, len);
  if (status == 0) {
    return;
  }
  throw std::runtime_error("Mooncake put_from failed for key: " + key);
}

// Reports whether `key` is present according to RealClient::isExist().
//
// Throws std::runtime_error when isExist() returns a negative value.
bool MooncakeConnector::do_single_exists(WorkerMooncakeConn& conn,
                                         const std::string& key) {
  // isExist returns: 1=exists, 0=not, -1=error
  const int status = conn.client->isExist(key);
  if (status < 0) {
    throw std::runtime_error("Mooncake isExist failed for key: " + key);
  }

  const bool present = (status == 1);
  return present;
}

} // namespace connector
} // namespace lmcache
53 changes: 53 additions & 0 deletions csrc/storage_backends/mooncake/connector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// SPDX-License-Identifier: Apache-2.0
#pragma once

#include "../connector_base.h"
#include "real_client.h"

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

namespace lmcache {
namespace connector {

// ConfigDict mirrors mooncake::ConfigDict
// (std::unordered_map<std::string, std::string>).
using ConfigDict = std::unordered_map<std::string, std::string>;

// Connection state that each Mooncake worker holds.
// `client` is a non-owning raw pointer to the shared RealClient, which is
// owned by MooncakeConnector; it is null until a connection is created.
struct WorkerMooncakeConn {
  mooncake::RealClient* client = nullptr;
};

// Storage connector backed by a Mooncake RealClient, built on the
// ConnectorBase harness. do_single_delete() is not overridden here, so the
// ConnectorBase default (returns false for every key) applies.
class MooncakeConnector : public ConnectorBase<WorkerMooncakeConn> {
public:
// Creates the RealClient, forwards `config` to its setup_internal() and
// starts the workers. Throws std::runtime_error if client creation or
// setup fails.
MooncakeConnector(ConfigDict config, int num_workers);
// Calls close(), then tears down and releases the client.
~MooncakeConnector() override;

protected:
// Returns a connection holding a raw pointer to the shared client.
WorkerMooncakeConn create_connection() override;

// Reads `key` into buf via get_into(); throws std::runtime_error when it
// returns a negative value. chunk_size is not used by this backend.
void do_single_get(WorkerMooncakeConn& conn, const std::string& key,
void* buf, size_t len, size_t chunk_size) override;

// Writes buf under `key` via put_from(); throws std::runtime_error on a
// non-zero return code. chunk_size is not used by this backend.
void do_single_set(WorkerMooncakeConn& conn, const std::string& key,
const void* buf, size_t len, size_t chunk_size) override;

// Returns true when isExist() reports 1; throws std::runtime_error when it
// reports a negative value.
bool do_single_exists(WorkerMooncakeConn& conn,
const std::string& key) override;

private:
// Shared Mooncake RealClient instance.
std::shared_ptr<mooncake::RealClient> client_;

// The original config dict (kept for diagnostics).
ConfigDict config_;
};

} // namespace connector
} // namespace lmcache
14 changes: 14 additions & 0 deletions csrc/storage_backends/mooncake/pybind.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// SPDX-License-Identifier: Apache-2.0
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include "../connector_pybind_utils.h"
#include "connector.h"

namespace py = pybind11;

// Defines the `lmcache_mooncake` Python extension module. MooncakeConnector
// is exposed to Python as `LMCacheMooncakeClient`, constructed with the
// keyword-capable arguments (config, num_workers); the connector methods are
// attached by the LMCACHE_BIND_CONNECTOR_METHODS macro.
PYBIND11_MODULE(lmcache_mooncake, m) {
py::class_<lmcache::connector::MooncakeConnector>(m, "LMCacheMooncakeClient")
.def(py::init<lmcache::connector::ConfigDict, int>(), py::arg("config"),
py::arg("num_workers"))
LMCACHE_BIND_CONNECTOR_METHODS(lmcache::connector::MooncakeConnector);
}
Loading
Loading