diff --git a/Cargo.lock b/Cargo.lock index 963649830ad..6f17864f2f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -547,7 +547,9 @@ dependencies = [ "tikv_util", "tokio", "tokio-stream", + "tokio-util", "txn_types", + "uuid 0.8.2", "yatp", ] diff --git a/components/backup/Cargo.toml b/components/backup/Cargo.toml index 6cb4edfe7dc..a2a425a17eb 100644 --- a/components/backup/Cargo.toml +++ b/components/backup/Cargo.toml @@ -72,8 +72,10 @@ tikv = { workspace = true } tikv_alloc = { workspace = true } tikv_util = { workspace = true } tokio = { version = "1.5", features = ["rt-multi-thread"] } +tokio-util = { version = "0.7", features = ["compat"] } tokio-stream = "0.1" txn_types = { workspace = true } +uuid = "0.8" yatp = { workspace = true } [dev-dependencies] diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index d436a9c62a9..7f3257ebaad 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -4,6 +4,7 @@ use std::{ borrow::Cow, cell::RefCell, fmt, + path::{Path, PathBuf}, sync::{atomic::*, mpsc, Arc, Mutex, RwLock}, time::{SystemTime, UNIX_EPOCH}, }; @@ -11,10 +12,13 @@ use std::{ use async_channel::SendError; use causal_ts::{CausalTsProvider, CausalTsProviderImpl}; use concurrency_manager::ConcurrencyManager; -use engine_traits::{name_to_cf, raw_ttl::ttl_current_ts, CfName, KvEngine, SstCompressionType}; -use external_storage::{BackendConfig, HdfsConfig}; +use engine_traits::{ + name_to_cf, raw_ttl::ttl_current_ts, CfName, Checkpointer, KvEngine, SstCompressionType, + CF_DEFAULT, CF_WRITE, +}; +use external_storage::{BackendConfig, HdfsConfig, UnpinReader}; use external_storage_export::{create_storage, ExternalStorage}; -use futures::{channel::mpsc::*, executor::block_on}; +use futures::{channel::mpsc::*, executor::block_on, io::AllowStdIo}; use kvproto::{ brpb::*, encryptionpb::EncryptionMethod, @@ -24,6 +28,7 @@ use kvproto::{ use online_config::OnlineConfig; use raft::StateRole; use raftstore::coprocessor::RegionInfoProvider; +use segment_manager::{SegmentMapManager, SegmentMapRouter}; use tikv::{ config::BackupConfig, storage::{ @@ -47,7 +52,7 @@ use txn_types::{Key, Lock, TimeStamp}; use crate::{ metrics::*, softlimit::{CpuStatistics, SoftLimit, SoftLimitByCpu}, - utils::{ControlThreadPool, KeyValueCodec}, + utils::{convert_encoded_key_to_raw_key, ControlThreadPool, KeyValueCodec}, writer::{BackupWriterBuilder, CfNameWrap}, Error, *, }; @@ -71,6 +76,8 @@ struct Request { compression_level: i32, cipher: CipherInfo, replica_read: bool, + mode: BackupMode, + ssts_id: String, } // Backup Operation corrosponsed to backup service @@ -103,6 +110,8 @@ impl fmt::Debug for Task { .field("is_raw_kv", &self.request.is_raw_kv) .field("dst_api_ver", &self.request.dst_api_ver) .field("cf", &self.request.cf) + .field("mode", &self.request.mode) + .field("unique_id", &self.request.ssts_id) .finish() } } @@ -141,6 +150,8 @@ impl Task { compression_type: req.get_compression_type(), compression_level: req.get_compression_level(), replica_read: req.get_replica_read(), + mode: req.get_mode(), + ssts_id: req.unique_id, cipher: req.cipher_info.unwrap_or_else(|| { let mut cipher = CipherInfo::default(); cipher.set_cipher_type(EncryptionMethod::Plaintext); @@ -298,6 +309,131 @@ async fn send_to_worker_with_metrics( Ok(()) } +struct SstSendInfo { + file_names_d: Vec>, + file_names_w: Vec>, + start_key: Vec, + end_key: Vec, +} + +async fn save_sst_file_worker( + segment_manager: Arc, + data_dir: PathBuf, + rx: async_channel::Receiver, + tx: UnboundedSender, + storage: Arc, +) { + while let Ok(msg) = rx.recv().await { + let mut response = BackupResponse::default(); + let mut d_progress_l = 0; + let mut d_progress_f = 0; + let mut w_progress_l = 0; + let mut w_progress_f = 0; + match upload_sst_file( + &data_dir, + &msg.file_names_d, + &msg.file_names_w, + &mut d_progress_l, + &mut d_progress_f, + &mut w_progress_l, + &mut w_progress_f, + storage.clone(), + ) + .await + { + Err(e) => { + error_unknown!(?e; "upload sst file failed"; + "start_key" => &log_wrappers::Value::key(&msg.start_key), + "end_key" => &log_wrappers::Value::key(&msg.end_key), + ); + let e = Error::from(e); + response.set_error(e.into()); + } + Ok(_) => { + let file = File::default(); + response.set_files(vec![file].into()); + } + } + + segment_manager.release_index( + msg.file_names_d, + d_progress_l, + d_progress_f, + msg.file_names_w, + w_progress_l, + w_progress_f, + ); + + let raw_start_key = convert_encoded_key_to_raw_key(msg.start_key.clone()); + let raw_end_key = convert_encoded_key_to_raw_key(msg.end_key.clone()); + response.set_start_key(raw_start_key); + response.set_end_key(raw_end_key); + // todo: send the count. + if let Err(e) = tx.unbounded_send(response) { + error_unknown!(?e; "backup failed to send response"; + "start_key" => &log_wrappers::Value::key(&msg.start_key), + "end_key" => &log_wrappers::Value::key(&msg.end_key),); + if e.is_disconnected() { + return; + } + } + } +} + +async fn upload_sst_file>( + data_dir: P, + ssts_d: &Vec>, + ssts_w: &Vec>, + d_progress_l: &mut usize, + d_progress_f: &mut usize, + w_progress_l: &mut usize, + w_progress_f: &mut usize, + storage: Arc, +) -> std::io::Result<()> { + upload_sst_file_internal( + data_dir.as_ref(), + ssts_d, + d_progress_l, + d_progress_f, + storage.clone(), + ) + .await?; + upload_sst_file_internal( + data_dir.as_ref(), + ssts_w, + w_progress_l, + w_progress_f, + storage, + ) + .await +} + +async fn upload_sst_file_internal>( + data_dir: P, + ssts: &Vec>, + progress_l: &mut usize, + progress_f: &mut usize, + storage: Arc, +) -> std::io::Result<()> { + for fs in ssts { + for (file_name_relative, _) in fs { + let file_name_relative = match file_name_relative.strip_prefix('/') { + Some(s) => s, + None => file_name_relative, + }; + let file_name = data_dir.as_ref().join(file_name_relative); + // Use tokio::fs::File with futures::compat consumes too much CPU. + let file = std::fs::File::open(file_name)?; + let length = file.metadata()?.len(); + let reader = UnpinReader(Box::new(AllowStdIo::new(file))); + storage.write(file_name_relative, reader, length).await?; + *progress_f += 1; + } + *progress_l += 1; + } + Ok(()) +} + impl BackupRange { /// Get entries from the scanner and save them to storage async fn backup( @@ -690,8 +826,10 @@ pub struct Endpoint { tablets: LocalTablets, config_manager: ConfigManager, concurrency_manager: ConcurrencyManager, + segment_router: SegmentMapRouter, softlimit: SoftLimitKeeper, api_version: ApiVersion, + data_dir: PathBuf, causal_ts_provider: Option>, // used in rawkv apiv2 only pub(crate) engine: E, @@ -838,7 +976,7 @@ impl Progress { } impl Endpoint { - pub fn new( + pub fn new>( store_id: u64, engine: E, region_info: R, @@ -846,6 +984,7 @@ impl Endpoint { config: BackupConfig, concurrency_manager: ConcurrencyManager, api_version: ApiVersion, + root_dir: P, causal_ts_provider: Option>, ) -> Endpoint { let pool = ControlThreadPool::new(); @@ -853,6 +992,8 @@ impl Endpoint { let config_manager = ConfigManager(Arc::new(RwLock::new(config))); let softlimit = SoftLimitKeeper::new(config_manager.clone()); rt.spawn(softlimit.clone().run()); + let segment_router = SegmentMapRouter::new(); + let data_dir = root_dir.as_ref().join("db"); Endpoint { store_id, engine, @@ -863,7 +1004,9 @@ impl Endpoint { softlimit, config_manager, concurrency_manager, + segment_router, api_version, + data_dir, causal_ts_provider, } } @@ -1114,6 +1257,25 @@ impl Endpoint { let backend = Arc::::from(backend); let concurrency = self.config_manager.0.read().unwrap().num_threads; self.pool.borrow_mut().adjust_with(concurrency); + + match request.mode { + BackupMode::Scan => { + self.handle_scan_backup(concurrency, prs, backend, codec, request, resp) + } + BackupMode::File => self.handle_file_backup(concurrency, prs, backend, request, resp), + _ => error!("unknown backup mode"; "mode" => ?request.mode), + }; + } + + fn handle_scan_backup( + &self, + concurrency: usize, + prs: Arc>>, + backend: Arc, + codec: KeyValueCodec, + request: Request, + resp: UnboundedSender, + ) { let (tx, rx) = async_channel::bounded(1); for _ in 0..concurrency { self.spawn_backup_worker( @@ -1131,6 +1293,126 @@ impl Endpoint { )); } } + + fn handle_file_backup( + &self, + concurrency: usize, + prs: Arc>>, + backend: Arc, + request: Request, + resp_tx: UnboundedSender, + ) { + let id = request.ssts_id; + let segment_manager = match self.segment_router.route(&id) { + Some(manager) => manager, + None => { + let mut resp = BackupResponse::new(); + let err_msg = format!("ssts are not found, unique id: {:?}", id); + resp.set_error(crate::Error::Other(box_err!(err_msg)).into()); + if let Err(err) = resp_tx.unbounded_send(resp) { + warn!("failed to send response"; "err" => ?err) + } + return; + } + }; + + let (tx, rx) = async_channel::bounded(1); + for _ in 0..concurrency { + self.io_pool.spawn(save_sst_file_worker( + segment_manager.clone(), + self.data_dir.clone(), + rx.clone(), + resp_tx.clone(), + backend.clone(), + )); + } + + let batch_size = self.config_manager.0.read().unwrap().batch_size; + self.pool.borrow_mut().spawn(async move { + loop { + let batch = { + let progress: &mut Progress<_> = &mut prs.lock().unwrap(); + let batch = progress.forward(batch_size, request.replica_read); + if batch.is_empty() { + return; + } + batch + }; + + for brange in batch { + if request.cancel.load(Ordering::SeqCst) { + warn!("backup task has canceled"; "range" => ?brange); + return; + } + let start_key = brange.start_key.map_or_else(Vec::new, |k| k.into_encoded()); + let end_key = brange.end_key.map_or_else(Vec::new, |k| k.into_encoded()); + let (d_ssts, w_ssts, ssts_cnt) = + segment_manager.find_ssts(&start_key, &end_key); + info!("select {} ssts", ssts_cnt); + if ssts_cnt == 0 { + let mut resp = BackupResponse::new(); + let raw_start_key = convert_encoded_key_to_raw_key(start_key); + let raw_end_key = convert_encoded_key_to_raw_key(end_key); + resp.set_start_key(raw_start_key); + resp.set_end_key(raw_end_key); + if let Err(err) = resp_tx.unbounded_send(resp) { + warn!("failed to send response"; "err" => ?err) + } + continue; + } + if let Err(err) = tx + .send(SstSendInfo { + file_names_d: d_ssts.clone(), + file_names_w: w_ssts.clone(), + start_key, + end_key, + }) + .await + { + error_unknown!(%err; "error during backup"); + segment_manager.release_index( + d_ssts, + usize::MAX, + usize::MAX, + w_ssts, + usize::MAX, + usize::MAX, + ); + let mut resp = BackupResponse::new(); + let err = Error::from(err); + resp.set_error(err.into()); + if let Err(err) = resp_tx.unbounded_send(resp) { + warn!("failed to send response"; "err" => ?err) + } + } + } + } + }) + } + + pub fn prepare(&mut self, _persistence: bool, mut tx: Sender) { + let checkpointer = self.engine.checkpointer().unwrap(); + let default_metadata = checkpointer.column_family_meta_data(CF_DEFAULT).unwrap(); + let write_metadata = checkpointer.column_family_meta_data(CF_WRITE).unwrap(); + + let s1 = format!("default_segmentmap:{:?}", default_metadata); + let s2 = format!("write_segmentmap:{:?}", write_metadata); + info!("{}", s1); + info!("{}", s2); + + let id = self + .segment_router + .register(default_metadata.ssts, write_metadata.ssts); + let mut resp = PrepareResponse::new(); + resp.set_unique_id(id); + resp.set_collect_file_count( + (default_metadata.file_count + write_metadata.file_count) as u64, + ); + resp.set_collect_file_size((default_metadata.file_size + write_metadata.file_size) as u64); + if let Err(e) = tx.try_send(resp) { + error_unknown!(?e; "[prepare] failed to send response"); + } + } } impl Runnable for Endpoint { @@ -1146,8 +1428,9 @@ impl Runnable for Endpoint %task); self.handle_backup_task(task); } - Operation::Prepare(_persistent, _tx) => { - unimplemented!(); + Operation::Prepare(persistent, tx) => { + info!("run prepare"); + self.prepare(persistent, tx); } Operation::Cleanup(_unique_id, _tx) => { unimplemented!(); @@ -1440,6 +1723,7 @@ pub mod tests { }, concurrency_manager, api_version, + String::from("test"), causal_ts_provider, ), ) @@ -1585,6 +1869,8 @@ pub mod tests { compression_level: 0, cipher: CipherInfo::default(), replica_read: false, + mode: BackupMode::Scan, + ssts_id: String::from("test"), }, resp: tx, }; @@ -1694,6 +1980,8 @@ pub mod tests { compression_level: 0, cipher: CipherInfo::default(), replica_read: false, + mode: BackupMode::Scan, + ssts_id: String::from("test"), }, resp: tx, }; @@ -1723,6 +2011,8 @@ pub mod tests { compression_level: 0, cipher: CipherInfo::default(), replica_read: true, + mode: BackupMode::Scan, + ssts_id: String::from("test"), }, resp: tx, }; @@ -1836,6 +2126,8 @@ pub mod tests { compression_level: 0, cipher: CipherInfo::default(), replica_read: false, + mode: BackupMode::Scan, + ssts_id: String::from("test"), }, resp: tx, }; diff --git a/components/backup/src/lib.rs b/components/backup/src/lib.rs index c219773bc05..8bd02e26dd6 100644 --- a/components/backup/src/lib.rs +++ b/components/backup/src/lib.rs @@ -8,6 +8,7 @@ extern crate tikv_alloc; mod endpoint; mod errors; mod metrics; +mod segment_manager; mod service; mod softlimit; mod utils; diff --git a/components/backup/src/segment_manager.rs b/components/backup/src/segment_manager.rs new file mode 100644 index 00000000000..6b640538cdf --- /dev/null +++ b/components/backup/src/segment_manager.rs @@ -0,0 +1,202 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{ + collections::{BTreeMap, HashMap}, + sync::{Arc, Mutex}, +}; + +use engine_traits::SstFileInfo; + +enum SstStatus { + NotUpload, + Uploading, + Uploaded, +} + +type SegmentMap = Vec, SstFileInfo>>; +pub struct SegmentMapRouter(HashMap>); + +impl SegmentMapRouter { + pub fn new() -> Self { + Self(HashMap::new()) + } + + pub fn register(&mut self, d: SegmentMap, w: SegmentMap) -> String { + let (id, manager) = SegmentMapManager::register(d, w); + self.0.insert(id.clone(), Arc::new(manager)); + id + } + + pub fn route(&self, id: &str) -> Option> { + self.0.get(id).cloned() + } +} + +pub struct SegmentMapManager { + map: (SegmentMap, SegmentMap), + // TODO: directly update the uploaded flag in hashmap + index_d: Mutex>>, + index_w: Mutex>>, +} + +impl SegmentMapManager { + fn new(mut d: SegmentMap, mut w: SegmentMap) -> Self { + let index_d_raw = Self::generate_index(&mut d); + let index_w_raw = Self::generate_index(&mut w); + Self { + map: (d, w), + + index_d: Mutex::new(index_d_raw), + index_w: Mutex::new(index_w_raw), + } + } + + pub fn register(d: SegmentMap, w: SegmentMap) -> (String, Self) { + let id = uuid::Uuid::new_v4().to_string(); + + (id, Self::new(d, w)) + } + + fn generate_index(map: &mut SegmentMap) -> Vec> { + let mut index = Vec::new(); + for tree in map { + let mut lvl_idx = Vec::new(); + for (idx, (_, info)) in tree.iter_mut().enumerate() { + info.idx = idx; + lvl_idx.push(SstStatus::NotUpload); + } + + index.push(lvl_idx); + } + index + } + + pub fn find_ssts( + &self, + start_key: &Vec, + end_key: &Vec, + ) -> (Vec>, Vec>, usize) { + let (d, d_cnt) = { + let mut index_d = self.index_d.lock().unwrap(); + find_ssts_internal(&mut index_d, &self.map.0, start_key, end_key) + }; + let (w, w_cnt) = { + let mut index_w = self.index_w.lock().unwrap(); + find_ssts_internal(&mut index_w, &self.map.1, start_key, end_key) + }; + (d, w, d_cnt + w_cnt) + } + + pub fn release_index( + &self, + d: Vec>, + d_progress_l: usize, + d_progress_f: usize, + w: Vec>, + w_progress_l: usize, + w_progress_f: usize, + ) { + { + let mut index_d = self.index_d.lock().unwrap(); + release_index_internal(&mut index_d, d, d_progress_l, d_progress_f); + } + { + let mut index_w = self.index_w.lock().unwrap(); + release_index_internal(&mut index_w, w, w_progress_l, w_progress_f); + } + } +} + +fn find_ssts_internal( + index: &mut [Vec], + map: &SegmentMap, + start_key: &Vec, + end_key: &Vec, +) -> (Vec>, usize) { + let mut res = Vec::new(); + let mut count = 0; + for (level, tree) in map.iter().enumerate() { + let lvl_index = &mut index[level]; + let mut fs = Vec::new(); + for f in tree.iter().filter(|info| { + let idx = info.1.idx; + if !matches!(lvl_index[idx], SstStatus::NotUpload) { + return false; + } + + let sk = info.0; + let ek = &info.1.end_key; + if (end_key.is_empty() || sk < end_key) && (ek.is_empty() || ek > start_key) { + lvl_index[idx] = SstStatus::Uploading; + return true; + } + false + }) { + count += 1; + fs.push((f.1.file_name.clone(), f.1.idx)); + } + res.push(fs); + } + (res, count) +} + +fn release_index_internal( + index: &mut [Vec], + findex: Vec>, + progress_l: usize, + progress_f: usize, +) { + for (level, sst_index) in findex.iter().enumerate() { + for (_, idx) in sst_index { + index[level][*idx] = + if level < progress_l || (level == progress_l && *idx <= progress_f) { + SstStatus::Uploaded + } else { + SstStatus::NotUpload + }; + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use engine_traits::SstFileInfo; + + use super::SegmentMap; + + #[test] + fn test() { + use super::SegmentMapManager; + + let (id, manager) = + SegmentMapManager::register(generate_segment_map(), generate_segment_map()); + println!("{id}"); + let sk = "1_1".as_bytes().to_vec(); + let ek = "2_1".as_bytes().to_vec(); + let (d, _, cnt) = manager.find_ssts(&sk, &ek); + println!("{:?}", d); + assert!(cnt > 0); + } + + fn generate_segment_map() -> SegmentMap { + let mut map = vec![BTreeMap::new(), BTreeMap::new(), BTreeMap::new()]; + for (i, m) in map.iter_mut().enumerate() { + for j in 0..3 { + let sk = format!("{i}_{j}").into_bytes(); + let ek = format!("{}_{}", i, j + 1).into_bytes(); + m.insert( + sk, + SstFileInfo { + end_key: ek, + file_name: String::from("/asdfg.sst"), + idx: 0, + }, + ); + } + } + + map + } +} diff --git a/components/backup/src/service.rs b/components/backup/src/service.rs index a21328635f9..ef9e2bc1605 100644 --- a/components/backup/src/service.rs +++ b/components/backup/src/service.rs @@ -112,7 +112,7 @@ where } else { sink.fail(RpcStatus::with_message( RpcStatusCode::INTERNAL, - format!("prepare recv nothing"), + "prepare recv nothing".to_string(), )); }; }; @@ -145,7 +145,7 @@ where } else { sink.fail(RpcStatus::with_message( RpcStatusCode::INTERNAL, - format!("cleanup recv nothing"), + "cleanup recv nothing".to_string(), )); }; }; @@ -220,7 +220,7 @@ mod tests { use super::*; use crate::endpoint::tests::*; - fn new_rpc_suite() -> (Server, BackupClient, ReceiverWrapper) { + fn new_rpc_suite() -> (Server, BackupClient, ReceiverWrapper) { let env = Arc::new(EnvBuilder::new().build()); let (scheduler, rx) = dummy_scheduler(); let backup_service = super::Service::::new(scheduler); @@ -239,7 +239,7 @@ mod tests { fn test_client_stop() { let (_server, client, mut rx) = new_rpc_suite(); - let (tmp, endpoint) = new_endpoint(); + let (tmp, mut endpoint) = new_endpoint(); let mut engine = endpoint.engine.clone(); endpoint.region_info.set_regions(vec![ (b"".to_vec(), b"2".to_vec(), 1), @@ -272,11 +272,11 @@ mod tests { req.set_storage_backend(make_local_backend(&tmp.path().join(now.to_string()))); let stream = client.backup(&req).unwrap(); - let task = rx.recv_timeout(Duration::from_secs(5)).unwrap(); + let operation = rx.recv_timeout(Duration::from_secs(5)).unwrap(); // Drop stream without start receiving will cause cancel error. drop(stream); // A stopped remote must not cause panic. - endpoint.handle_backup_task(task.unwrap()); + endpoint.run(operation.unwrap()); // Set an unique path to avoid AlreadyExists error. req.set_storage_backend(make_local_backend(&tmp.path().join(alloc_ts().to_string()))); @@ -285,14 +285,19 @@ mod tests { client.spawn(async move { let _ = stream.next().await; }); - let task = rx.recv_timeout(Duration::from_secs(5)).unwrap(); + let operation = rx.recv_timeout(Duration::from_secs(5)).unwrap(); // A stopped remote must not cause panic. - endpoint.handle_backup_task(task.unwrap()); + endpoint.run(operation.unwrap()); // Set an unique path to avoid AlreadyExists error. req.set_storage_backend(make_local_backend(&tmp.path().join(alloc_ts().to_string()))); let stream = client.backup(&req).unwrap(); - let task = rx.recv().unwrap(); + let operation = rx.recv().unwrap(); + let task = if let Operation::BackupTask(task) = operation { + task + } else { + panic!("operation is not backup task: {}", operation) + }; // Drop stream without start receiving will cause cancel error. drop(stream); // Wait util the task is canceled in map_err. diff --git a/components/backup/src/utils.rs b/components/backup/src/utils.rs index 41af72e83d3..18c915bc29a 100644 --- a/components/backup/src/utils.rs +++ b/components/backup/src/utils.rs @@ -265,6 +265,13 @@ impl KeyValueCodec { } } +pub fn convert_encoded_key_to_raw_key(ek: Vec) -> Vec { + if ek.is_empty() { + return ek; + } + Key::from_encoded(ek).into_raw().unwrap() +} + #[cfg(test)] pub mod tests { use api_version::{KvFormat, RawValue}; diff --git a/components/engine_rocks/src/checkpoint.rs b/components/engine_rocks/src/checkpoint.rs index 0f86aa29945..ef18fc16e8d 100644 --- a/components/engine_rocks/src/checkpoint.rs +++ b/components/engine_rocks/src/checkpoint.rs @@ -1,17 +1,24 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::path::Path; +use std::{collections::BTreeMap, path::Path, sync::Arc}; -use engine_traits::{Checkpointable, Checkpointer, Result}; +use engine_traits::{ + CfName, Checkpointable, Checkpointer, ColumnFamilyMetadata, Result, SstFileInfo, +}; +use keys::{origin_key, validate_data_key}; +use rocksdb::DB; -use crate::{r2e, RocksEngine}; +use crate::{r2e, util, RocksEngine}; impl Checkpointable for RocksEngine { type Checkpointer = RocksEngineCheckpointer; fn new_checkpointer(&self) -> Result { match self.as_inner().new_checkpointer() { - Ok(pointer) => Ok(RocksEngineCheckpointer(pointer)), + Ok(pointer) => Ok(RocksEngineCheckpointer { + db: self.as_inner().clone(), + pointer, + }), Err(e) => Err(r2e(e)), } } @@ -25,7 +32,10 @@ impl Checkpointable for RocksEngine { } } -pub struct RocksEngineCheckpointer(rocksdb::Checkpointer); +pub struct RocksEngineCheckpointer { + db: Arc, + pointer: rocksdb::Checkpointer, +} impl Checkpointer for RocksEngineCheckpointer { fn create_at( @@ -34,15 +44,64 @@ impl Checkpointer for RocksEngineCheckpointer { titan_out_dir: Option<&Path>, log_size_for_flush: u64, ) -> Result<()> { - self.0 + self.pointer .create_at(db_out_dir, titan_out_dir, log_size_for_flush) .map_err(|e| r2e(e)) } + + fn column_family_meta_data(&self, cf: CfName) -> Result { + let db = &self.db; + let handle = util::get_cf_handle(db, cf)?; + let metadata = self.db.get_column_family_meta_data(handle); + let levels_metadata = metadata.get_levels(); + + let mut file_count: usize = 0; + let mut file_size: usize = 0; + let mut lssts = Vec::new(); + for level_metadata in levels_metadata { + let mut ssts = BTreeMap::new(); + let files = level_metadata.get_files(); + file_count += files.len(); + for file in files { + if let Some((start_key, end_key)) = + origin_if_data_key(file.get_smallestkey(), file.get_largestkey()) + { + file_size += file.get_size(); + ssts.insert( + start_key, + SstFileInfo { + file_name: file.get_name(), + end_key, + idx: 0, + }, + ); + } + } + lssts.push(ssts); + } + + Ok(ColumnFamilyMetadata { + file_count, + file_size, + ssts: lssts, + }) + } +} + +fn origin_if_data_key(start_key: &[u8], end_key: &[u8]) -> Option<(Vec, Vec)> { + match (validate_data_key(start_key), validate_data_key(end_key)) { + (true, true) => Some((origin_key(start_key).to_vec(), origin_key(end_key).to_vec())), + (true, false) => Some((origin_key(start_key).to_vec(), keys::DATA_MAX_KEY.to_vec())), + (false, true) => Some((keys::DATA_MIN_KEY.to_vec(), origin_key(end_key).to_vec())), + (false, false) => None, + } } #[cfg(test)] mod tests { - use engine_traits::{Checkpointable, Checkpointer, Peekable, SyncMutable, ALL_CFS}; + use engine_traits::{ + Checkpointable, Checkpointer, MiscExt, Peekable, SyncMutable, ALL_CFS, CF_DEFAULT, + }; use tempfile::tempdir; use crate::util::new_engine; @@ -60,4 +119,17 @@ mod tests { let engine2 = new_engine(path2.as_path().to_str().unwrap(), ALL_CFS).unwrap(); assert_eq!(engine2.get_value(b"key").unwrap().unwrap(), b"value"); } + + #[test] + fn test_column_family_meta_data() { + let dir = tempdir().unwrap(); + let path = dir.path().join("origin"); + let engine = new_engine(path.as_path().to_str().unwrap(), ALL_CFS).unwrap(); + engine.put_cf(CF_DEFAULT, b"key", b"value").unwrap(); + engine.flush_cf(CF_DEFAULT, true).unwrap(); + + let check_pointer = engine.new_checkpointer().unwrap(); + let t = check_pointer.column_family_meta_data(CF_DEFAULT).unwrap(); + println!("{:?}", t); + } } diff --git a/components/engine_traits/src/checkpoint.rs b/components/engine_traits/src/checkpoint.rs index 6b966d806fe..b730f46b766 100644 --- a/components/engine_traits/src/checkpoint.rs +++ b/components/engine_traits/src/checkpoint.rs @@ -1,8 +1,8 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::path::Path; +use std::{collections::BTreeMap, fmt::Debug, path::Path}; -use crate::Result; +use crate::{CfName, Result}; pub trait Checkpointable { type Checkpointer: Checkpointer; @@ -19,4 +19,43 @@ pub trait Checkpointer { titan_out_dir: Option<&Path>, log_size_for_flush: u64, ) -> Result<()>; + + fn column_family_meta_data(&self, _cf: CfName) -> Result { + unimplemented!() + } +} + +pub struct SstFileInfo { + pub file_name: String, + // pub start_key: Key, + pub end_key: Vec, + pub idx: usize, +} + +pub struct ColumnFamilyMetadata { + pub file_count: usize, + pub file_size: usize, + pub ssts: Vec, SstFileInfo>>, +} + +impl Debug for ColumnFamilyMetadata { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut binding = f.debug_struct("ColumnFamilyMetadata"); + binding.field("file_count", &self.file_count); + binding.field("file_size", &self.file_size); + + for (level, ssts) in self.ssts.iter().enumerate() { + let mut ss = String::new(); + for SstFileInfo { file_name, .. } in ssts.values() { + let str = format!("name: {file_name}"); + ss = ss + &str + } + binding.field(&format!("level: {level}"), &ss); + for sst in ssts { + binding.field("sk", sst.0); + } + } + + binding.finish() + } } diff --git a/components/server/src/server.rs b/components/server/src/server.rs index db46b45b1ce..9e89c3f0ff3 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -1115,6 +1115,7 @@ where self.core.config.backup.clone(), self.concurrency_manager.clone(), self.core.config.storage.api_version(), + self.core.config.storage.data_dir.clone(), self.causal_ts_provider.clone(), ); self.cfg_controller.as_mut().unwrap().register( diff --git a/components/server/src/server2.rs b/components/server/src/server2.rs index da970a7e749..14ea66f12f8 100644 --- a/components/server/src/server2.rs +++ b/components/server/src/server2.rs @@ -868,6 +868,7 @@ where self.core.config.backup.clone(), self.concurrency_manager.clone(), self.core.config.storage.api_version(), + self.core.config.storage.data_dir.clone(), self.causal_ts_provider.clone(), ); self.cfg_controller.as_mut().unwrap().register( diff --git a/components/test_backup/src/lib.rs b/components/test_backup/src/lib.rs index 34eb6e8aa9e..ae687af42ea 100644 --- a/components/test_backup/src/lib.rs +++ b/components/test_backup/src/lib.rs @@ -9,7 +9,7 @@ use std::{ }; use api_version::{dispatch_api_version, keyspace::KvPair, ApiV1, KvFormat, RawValue}; -use backup::Task; +use backup::{Operation, Task}; use collections::HashMap; use engine_traits::{CfName, IterOptions, CF_DEFAULT, CF_WRITE, DATA_KEY_PREFIX_LEN}; use external_storage_export::make_local_backend; @@ -40,7 +40,7 @@ use txn_types::TimeStamp; pub struct TestSuite { pub cluster: Cluster, - pub endpoints: HashMap>, + pub endpoints: HashMap>, pub tikv_cli: TikvClient, pub context: Context, pub ts: TimeStamp, @@ -94,6 +94,7 @@ impl TestSuite { }, sim.get_concurrency_manager(*id), api_version, + String::from("test"), None, ); let mut worker = bg_worker.lazy_build(format!("backup-{}", id)); @@ -301,7 +302,9 @@ impl TestSuite { let (tx, rx) = future_mpsc::unbounded(); for end in self.endpoints.values() { let (task, _) = Task::new(req.clone(), tx.clone()).unwrap(); - end.scheduler().schedule(task).unwrap(); + end.scheduler() + .schedule(Operation::BackupTask(task)) + .unwrap(); } rx } @@ -324,7 +327,9 @@ impl TestSuite { let (tx, rx) = future_mpsc::unbounded(); for end in self.endpoints.values() { let (task, _) = Task::new(req.clone(), tx.clone()).unwrap(); - end.scheduler().schedule(task).unwrap(); + end.scheduler() + .schedule(Operation::BackupTask(task)) + .unwrap(); } rx } diff --git a/components/test_raftstore-v2/src/server.rs b/components/test_raftstore-v2/src/server.rs index a439056617f..24136627bb5 100644 --- a/components/test_raftstore-v2/src/server.rs +++ b/components/test_raftstore-v2/src/server.rs @@ -123,6 +123,7 @@ impl TestRaftKv2 { impl Engine for TestRaftKv2 { type Snap = RegionSnapshot; type Local = EK; + type Checkpointer = EK::Checkpointer; fn kv_engine(&self) -> Option { self.raftkv.kv_engine() diff --git a/components/tikv_kv/src/btree_engine.rs b/components/tikv_kv/src/btree_engine.rs index 336523dd60c..e74556dd37f 100644 --- a/components/tikv_kv/src/btree_engine.rs +++ b/components/tikv_kv/src/btree_engine.rs @@ -12,7 +12,7 @@ use std::{ }; use collections::HashMap; -use engine_panic::PanicEngine; +use engine_panic::{checkpoint::PanicCheckpointer, PanicEngine}; use engine_traits::{CfName, IterOptions, ReadOptions, CF_DEFAULT, CF_LOCK, CF_WRITE}; use futures::{future, stream, Future, Stream}; use kvproto::kvrpcpb::Context; @@ -78,6 +78,7 @@ impl Default for BTreeEngine { impl Engine for BTreeEngine { type Snap = BTreeEngineSnapshot; type Local = PanicEngine; + type Checkpointer = PanicCheckpointer; fn kv_engine(&self) -> Option { unimplemented!(); diff --git a/components/tikv_kv/src/lib.rs b/components/tikv_kv/src/lib.rs index 293ae7fccc1..8ec713eb493 100644 --- a/components/tikv_kv/src/lib.rs +++ b/components/tikv_kv/src/lib.rs @@ -36,8 +36,8 @@ use std::{ use collections::HashMap; use engine_traits::{ - CfName, IterOptions, KvEngine as LocalEngine, Mutable, MvccProperties, ReadOptions, - TabletRegistry, WriteBatch, CF_DEFAULT, CF_LOCK, + CfName, Checkpointer, IterOptions, KvEngine as LocalEngine, Mutable, MvccProperties, + ReadOptions, TabletRegistry, WriteBatch, CF_DEFAULT, CF_LOCK, }; use error_code::{self, ErrorCode, ErrorCodeExt}; use futures::{compat::Future01CompatExt, future::BoxFuture, prelude::*}; @@ -325,6 +325,7 @@ pub struct SnapContext<'a> { pub trait Engine: Send + Clone + 'static { type Snap: Snapshot; type Local: LocalEngine; + type Checkpointer: Checkpointer; /// Local storage engine. /// @@ -338,6 +339,11 @@ pub trait Engine: Send + Clone + 'static { unimplemented!() } + /// Get the checkpointer. + fn checkpointer(&self) -> Option { + unimplemented!() + } + /// Write modifications into internal local engine directly. /// /// region_modifies records each region's modifications. diff --git a/components/tikv_kv/src/mock_engine.rs b/components/tikv_kv/src/mock_engine.rs index 69a61d58963..38dda842d69 100644 --- a/components/tikv_kv/src/mock_engine.rs +++ b/components/tikv_kv/src/mock_engine.rs @@ -148,6 +148,7 @@ fn check_expected_write( impl Engine for MockEngine { type Snap = ::Snap; type Local = ::Local; + type Checkpointer = ::Checkpointer; fn kv_engine(&self) -> Option { self.base.kv_engine() diff --git a/components/tikv_kv/src/rocksdb_engine.rs b/components/tikv_kv/src/rocksdb_engine.rs index 21099974d2d..aef34a71a75 100644 --- a/components/tikv_kv/src/rocksdb_engine.rs +++ b/components/tikv_kv/src/rocksdb_engine.rs @@ -14,7 +14,8 @@ use std::{ use collections::HashMap; pub use engine_rocks::RocksSnapshot; use engine_rocks::{ - get_env, RocksCfOptions, RocksDbOptions, RocksEngine as BaseRocksEngine, RocksEngineIterator, + get_env, RocksCfOptions, RocksDbOptions, RocksEngine as BaseRocksEngine, + RocksEngineCheckpointer, RocksEngineIterator, }; use engine_traits::{ CfName, Engines, IterOptions, Iterable, Iterator, KvEngine, Peekable, ReadOptions, @@ -227,6 +228,7 @@ impl Debug for RocksEngine { impl Engine for RocksEngine { type Snap = Arc; type Local = BaseRocksEngine; + type Checkpointer = RocksEngineCheckpointer; fn kv_engine(&self) -> Option { Some(self.engines.kv.clone()) diff --git a/src/server/gc_worker/gc_worker.rs b/src/server/gc_worker/gc_worker.rs index 08232189552..87d9b41a62d 100644 --- a/src/server/gc_worker/gc_worker.rs +++ b/src/server/gc_worker/gc_worker.rs @@ -1300,7 +1300,7 @@ pub mod test_gc_worker { use std::sync::{Arc, Mutex}; use collections::HashMap; - use engine_rocks::{RocksEngine, RocksSnapshot}; + use engine_rocks::{RocksEngine, RocksEngineCheckpointer, RocksSnapshot}; use futures::Future; use kvproto::{ kvrpcpb::Context, @@ -1331,6 +1331,7 @@ pub mod test_gc_worker { // Use RegionSnapshot which can remove the z prefix internally. type Snap = RegionSnapshot; type Local = RocksEngine; + type Checkpointer = RocksEngineCheckpointer; fn kv_engine(&self) -> Option { self.0.kv_engine() @@ -1424,6 +1425,7 @@ pub mod test_gc_worker { impl Engine for MultiRocksEngine { type Snap = ::Snap; type Local = ::Local; + type Checkpointer = ::Checkpointer; fn kv_engine(&self) -> Option { None diff --git a/src/server/raftkv/mod.rs b/src/server/raftkv/mod.rs index bbae97ea293..5ce657d41c7 100644 --- a/src/server/raftkv/mod.rs +++ b/src/server/raftkv/mod.rs @@ -392,6 +392,7 @@ where { type Snap = RegionSnapshot; type Local = E; + type Checkpointer = E::Checkpointer; fn kv_engine(&self) -> Option { Some(self.engine.clone()) @@ -403,6 +404,10 @@ where self.router.clone() } + fn checkpointer(&self) -> Option { + self.engine.new_checkpointer().ok() + } + fn modify_on_kv_engine( &self, mut region_modifies: HashMap>, diff --git a/src/server/raftkv2/mod.rs b/src/server/raftkv2/mod.rs index 5935d542a37..e7552a40ade 100644 --- a/src/server/raftkv2/mod.rs +++ b/src/server/raftkv2/mod.rs @@ -133,6 +133,7 @@ impl RaftKv2 { impl tikv_kv::Engine for RaftKv2 { type Snap = RegionSnapshot; type Local = EK; + type Checkpointer = EK::Checkpointer; #[inline] fn kv_engine(&self) -> Option { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 1e98a1b8257..d7e0637ad61 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -3093,6 +3093,7 @@ pub struct TxnTestEngine { impl Engine for TxnTestEngine { type Snap = TxnTestSnapshot; type Local = E::Local; + type Checkpointer = E::Checkpointer; fn kv_engine(&self) -> Option { self.engine.kv_engine()