-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsqlite_batch.rs
More file actions
137 lines (115 loc) · 4.41 KB
/
Copy pathsqlite_batch.rs
File metadata and controls
137 lines (115 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
//! Batch-insert rows into an in-memory `SQLite` database using the rarray virtual table.
//!
//! Run with: `cargo run --example sqlite_batch`
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;
use rusqlite::vtab::array::load_module;
use rusqlite::Connection;
use tower::Service;
use tower_batch::{Batch, BatchControl, BoxError};
struct InsertRow {
name: String,
value: i64,
}
struct SqliteBatchService {
conn: Arc<Mutex<Connection>>,
pending: Vec<InsertRow>,
}
impl Service<BatchControl<InsertRow>> for SqliteBatchService {
type Response = ();
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: BatchControl<InsertRow>) -> Self::Future {
match req {
BatchControl::Item(row) => {
self.pending.push(row);
Box::pin(std::future::ready(Ok(())))
}
BatchControl::Flush => {
let rows = mem::take(&mut self.pending);
let conn = Arc::clone(&self.conn);
Box::pin(async move {
tokio::task::spawn_blocking(move || {
let conn = conn
.lock()
.map_err(|e| -> BoxError { e.to_string().into() })?;
let tx = conn.unchecked_transaction()?;
let names: Vec<rusqlite::types::Value> = rows
.iter()
.map(|r| rusqlite::types::Value::Text(r.name.clone()))
.collect();
let values: Vec<rusqlite::types::Value> = rows
.iter()
.map(|r| rusqlite::types::Value::Integer(r.value))
.collect();
// Rc is required here: rusqlite's rarray only implements ToSql for Rc<Vec<Value>>.
// This is safe because the Rc never leaves the spawn_blocking closure.
let names = Rc::new(names);
let values = Rc::new(values);
conn.execute(
"INSERT INTO data(name, value) \
SELECT n.value, v.value \
FROM rarray(?1) AS n \
JOIN rarray(?2) AS v ON n.rowid = v.rowid",
rusqlite::params![names, values],
)?;
tx.commit()?;
Ok::<(), BoxError>(())
})
.await?
})
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), BoxError> {
let conn = Connection::open_in_memory()?;
conn.execute_batch("CREATE TABLE data (name TEXT NOT NULL, value INTEGER NOT NULL)")?;
load_module(&conn)?;
let conn = Arc::new(Mutex::new(conn));
let service = SqliteBatchService {
conn: Arc::clone(&conn),
pending: Vec::new(),
};
let batch = Batch::new(service, 50, Duration::from_millis(100));
let mut handles = Vec::new();
for task_id in 0..4 {
let mut batch = batch.clone();
handles.push(tokio::spawn(async move {
for i in 0..50 {
tower::ServiceExt::ready(&mut batch).await.unwrap();
batch
.call(InsertRow {
name: format!("task{task_id}_row{i}"),
value: i64::from(task_id * 50 + i),
})
.await
.unwrap();
}
}));
}
for handle in handles {
handle.await?;
}
// Drop the last Batch handle so the worker knows no more requests are coming,
// then give it time to flush. In production you may want a more robust shutdown
// mechanism.
drop(batch);
tokio::time::sleep(Duration::from_millis(200)).await;
let count: i64 = conn
.lock()
.map_err(|e| -> BoxError { e.to_string().into() })?
.query_row("SELECT COUNT(*) FROM data", [], |row| row.get(0))?;
println!("Inserted {count} rows (expected 200)");
assert_eq!(count, 200);
Ok(())
}