From c7db07cc99cce2d63f4fbe6e8e03622ef2130259 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 13 Jun 2023 18:56:12 +0800 Subject: [PATCH 1/9] a base trait for singlerocks to get sst segment map Signed-off-by: Leavrth --- components/backup/src/endpoint.rs | 9 +++++++-- components/backup/src/service.rs | 19 ++++++++++++------- components/engine_panic/src/lib.rs | 1 + components/engine_panic/src/sst_properties.rs | 6 ++++++ components/engine_rocks/src/lib.rs | 2 ++ components/engine_rocks/src/sst_properties.rs | 10 ++++++++++ components/engine_traits/src/engine.rs | 1 + components/engine_traits/src/lib.rs | 2 ++ .../engine_traits/src/sst_properties.rs | 7 +++++++ components/tikv_kv/src/lib.rs | 5 +++++ src/server/raftkv/mod.rs | 4 ++++ 11 files changed, 57 insertions(+), 9 deletions(-) create mode 100644 components/engine_panic/src/sst_properties.rs create mode 100644 components/engine_rocks/src/sst_properties.rs create mode 100644 components/engine_traits/src/sst_properties.rs diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index d436a9c62a9..998ba02f68d 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -1131,6 +1131,10 @@ impl Endpoint { )); } } + + pub fn prepare(&self, _persistence: bool, _tx: Sender) { + let _engine = self.engine.clone().sst_segmentmap(); + } } impl Runnable for Endpoint { @@ -1146,8 +1150,9 @@ impl Runnable for Endpoint %task); self.handle_backup_task(task); } - Operation::Prepare(_persistent, _tx) => { - unimplemented!(); + Operation::Prepare(persistent, tx) => { + info!("run prepare"); + self.prepare(persistent, tx); } Operation::Cleanup(_unique_id, _tx) => { unimplemented!(); diff --git a/components/backup/src/service.rs b/components/backup/src/service.rs index a21328635f9..3d5626fd0e9 100644 --- a/components/backup/src/service.rs +++ b/components/backup/src/service.rs @@ -220,7 +220,7 @@ mod tests { use super::*; use crate::endpoint::tests::*; - fn new_rpc_suite() -> (Server, BackupClient, ReceiverWrapper) { + fn new_rpc_suite() -> (Server, BackupClient, ReceiverWrapper) { let env = Arc::new(EnvBuilder::new().build()); let (scheduler, rx) = dummy_scheduler(); let backup_service = super::Service::::new(scheduler); @@ -239,7 +239,7 @@ mod tests { fn test_client_stop() { let (_server, client, mut rx) = new_rpc_suite(); - let (tmp, endpoint) = new_endpoint(); + let (tmp, mut endpoint) = new_endpoint(); let mut engine = endpoint.engine.clone(); endpoint.region_info.set_regions(vec![ (b"".to_vec(), b"2".to_vec(), 1), @@ -272,11 +272,11 @@ mod tests { req.set_storage_backend(make_local_backend(&tmp.path().join(now.to_string()))); let stream = client.backup(&req).unwrap(); - let task = rx.recv_timeout(Duration::from_secs(5)).unwrap(); + let operation = rx.recv_timeout(Duration::from_secs(5)).unwrap(); // Drop stream without start receiving will cause cancel error. drop(stream); // A stopped remote must not cause panic. - endpoint.handle_backup_task(task.unwrap()); + endpoint.run(operation.unwrap()); // Set an unique path to avoid AlreadyExists error. req.set_storage_backend(make_local_backend(&tmp.path().join(alloc_ts().to_string()))); @@ -285,14 +285,19 @@ mod tests { client.spawn(async move { let _ = stream.next().await; }); - let task = rx.recv_timeout(Duration::from_secs(5)).unwrap(); + let operation = rx.recv_timeout(Duration::from_secs(5)).unwrap(); // A stopped remote must not cause panic. 
- endpoint.handle_backup_task(task.unwrap()); + endpoint.run(operation.unwrap()); // Set an unique path to avoid AlreadyExists error. req.set_storage_backend(make_local_backend(&tmp.path().join(alloc_ts().to_string()))); let stream = client.backup(&req).unwrap(); - let task = rx.recv().unwrap(); + let operation = rx.recv().unwrap(); + let task = if let Operation::BackupTask(task) = operation { + task + } else { + panic!("operation is not backup task: {}", operation) + }; // Drop stream without start receiving will cause cancel error. drop(stream); // Wait util the task is canceled in map_err. diff --git a/components/engine_panic/src/lib.rs b/components/engine_panic/src/lib.rs index 93555f5ba5f..114a924c6cc 100644 --- a/components/engine_panic/src/lib.rs +++ b/components/engine_panic/src/lib.rs @@ -46,5 +46,6 @@ pub use crate::flow_control_factors::*; pub mod table_properties; pub use crate::table_properties::*; pub mod checkpoint; +pub mod sst_properties; mod raft_engine; diff --git a/components/engine_panic/src/sst_properties.rs b/components/engine_panic/src/sst_properties.rs new file mode 100644 index 00000000000..512b7a8f147 --- /dev/null +++ b/components/engine_panic/src/sst_properties.rs @@ -0,0 +1,6 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. +use engine_traits::SSTPropertiesExt; + +use crate::engine::PanicEngine; + +impl SSTPropertiesExt for PanicEngine {} diff --git a/components/engine_rocks/src/lib.rs b/components/engine_rocks/src/lib.rs index b5561b3de42..c50717ea072 100644 --- a/components/engine_rocks/src/lib.rs +++ b/components/engine_rocks/src/lib.rs @@ -68,6 +68,8 @@ pub use crate::perf_context_impl::{ PerfStatisticsInstant, ReadPerfContext, ReadPerfInstant, WritePerfContext, WritePerfInstant, }; mod perf_context_metrics; +mod sst_properties; +pub use crate::sst_properties::*; mod engine_iterator; pub use crate::engine_iterator::*; diff --git a/components/engine_rocks/src/sst_properties.rs b/components/engine_rocks/src/sst_properties.rs new file mode 100644 index 00000000000..289419e0fa9 --- /dev/null +++ b/components/engine_rocks/src/sst_properties.rs @@ -0,0 +1,10 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. +use engine_traits::SSTPropertiesExt; + +use crate::RocksEngine; + +impl SSTPropertiesExt for RocksEngine { + fn test(&self) { + let _live_files = self.as_inner().get_live_files(); + } +} diff --git a/components/engine_traits/src/engine.rs b/components/engine_traits/src/engine.rs index b3ee1c93b05..8a9d1bfc014 100644 --- a/components/engine_traits/src/engine.rs +++ b/components/engine_traits/src/engine.rs @@ -26,6 +26,7 @@ pub trait KvEngine: + TtlPropertiesExt + TablePropertiesExt + PerfContextExt + + SSTPropertiesExt + MiscExt + Send + Sync diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs index 45a3d18fa7a..be15a3f471e 100644 --- a/components/engine_traits/src/lib.rs +++ b/components/engine_traits/src/lib.rs @@ -311,6 +311,8 @@ mod table_properties; pub use crate::table_properties::*; mod checkpoint; pub use crate::checkpoint::*; +mod sst_properties; +pub use crate::sst_properties::*; // These modules contain more general traits, some of which may be implemented // by multiple types. diff --git a/components/engine_traits/src/sst_properties.rs b/components/engine_traits/src/sst_properties.rs new file mode 100644 index 00000000000..e6f3ed0bc43 --- /dev/null +++ b/components/engine_traits/src/sst_properties.rs @@ -0,0 +1,7 @@ +// Copyright 2023 TiKV Project Authors. 
Licensed under Apache-2.0. + +pub trait SSTPropertiesExt { + fn test(&self) { + unimplemented!() + } +} diff --git a/components/tikv_kv/src/lib.rs b/components/tikv_kv/src/lib.rs index 293ae7fccc1..263bf09ed11 100644 --- a/components/tikv_kv/src/lib.rs +++ b/components/tikv_kv/src/lib.rs @@ -338,6 +338,11 @@ pub trait Engine: Send + Clone + 'static { unimplemented!() } + /// Get the metadata. + fn sst_segmentmap(&self) { + unimplemented!() + } + /// Write modifications into internal local engine directly. /// /// region_modifies records each region's modifications. diff --git a/src/server/raftkv/mod.rs b/src/server/raftkv/mod.rs index bbae97ea293..8b3f403489f 100644 --- a/src/server/raftkv/mod.rs +++ b/src/server/raftkv/mod.rs @@ -403,6 +403,10 @@ where self.router.clone() } + fn sst_segmentmap(&self) { + let _kv_engine = self.engine.test(); + } + fn modify_on_kv_engine( &self, mut region_modifies: HashMap>, From aadc0d5b48da0902b0f52360533c642ba364f721 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Wed, 14 Jun 2023 17:46:07 +0800 Subject: [PATCH 2/9] implement get segment map for default_cf and write_cf Signed-off-by: Leavrth --- Cargo.lock | 1 + components/backup/Cargo.toml | 1 + components/backup/src/endpoint.rs | 26 +++++++- components/backup/src/lib.rs | 1 + components/backup/src/segment_manager.rs | 22 +++++++ components/backup/src/service.rs | 4 +- components/engine_panic/src/lib.rs | 1 - components/engine_panic/src/sst_properties.rs | 6 -- components/engine_rocks/src/checkpoint.rs | 66 +++++++++++++++++-- components/engine_rocks/src/lib.rs | 2 - components/engine_rocks/src/sst_properties.rs | 10 --- components/engine_traits/src/checkpoint.rs | 41 +++++++++++- components/engine_traits/src/engine.rs | 1 - components/engine_traits/src/lib.rs | 2 - .../engine_traits/src/sst_properties.rs | 7 -- components/test_raftstore-v2/src/server.rs | 1 + components/tikv_kv/src/btree_engine.rs | 3 +- components/tikv_kv/src/lib.rs | 7 +- components/tikv_kv/src/mock_engine.rs | 1 + components/tikv_kv/src/rocksdb_engine.rs | 3 +- src/server/gc_worker/gc_worker.rs | 4 +- src/server/raftkv/mod.rs | 5 +- src/server/raftkv2/mod.rs | 1 + src/storage/mod.rs | 1 + 24 files changed, 166 insertions(+), 51 deletions(-) create mode 100644 components/backup/src/segment_manager.rs delete mode 100644 components/engine_panic/src/sst_properties.rs delete mode 100644 components/engine_rocks/src/sst_properties.rs delete mode 100644 components/engine_traits/src/sst_properties.rs diff --git a/Cargo.lock b/Cargo.lock index 963649830ad..48c2336282b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -548,6 +548,7 @@ dependencies = [ "tokio", "tokio-stream", "txn_types", + "uuid 0.8.2", "yatp", ] diff --git a/components/backup/Cargo.toml b/components/backup/Cargo.toml index 6cb4edfe7dc..978686ef92a 100644 --- a/components/backup/Cargo.toml +++ b/components/backup/Cargo.toml @@ -74,6 +74,7 @@ tikv_util = { workspace = true } tokio = { version = "1.5", features = ["rt-multi-thread"] } tokio-stream = "0.1" txn_types = { workspace = true } +uuid = "0.8" yatp = { workspace = true } [dev-dependencies] diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 998ba02f68d..9e5366fa3f5 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -11,7 +11,7 @@ use std::{ use async_channel::SendError; use causal_ts::{CausalTsProvider, CausalTsProviderImpl}; use concurrency_manager::ConcurrencyManager; -use engine_traits::{name_to_cf, raw_ttl::ttl_current_ts, CfName, KvEngine, 
SstCompressionType}; +use engine_traits::{name_to_cf, raw_ttl::ttl_current_ts, CfName, KvEngine, SstCompressionType, Checkpointer, CF_DEFAULT, CF_WRITE}; use external_storage::{BackendConfig, HdfsConfig}; use external_storage_export::{create_storage, ExternalStorage}; use futures::{channel::mpsc::*, executor::block_on}; @@ -24,6 +24,7 @@ use kvproto::{ use online_config::OnlineConfig; use raft::StateRole; use raftstore::coprocessor::RegionInfoProvider; +use segment_manager::SegmentMapManager; use tikv::{ config::BackupConfig, storage::{ @@ -690,6 +691,7 @@ pub struct Endpoint { tablets: LocalTablets, config_manager: ConfigManager, concurrency_manager: ConcurrencyManager, + segment_manager: SegmentMapManager, softlimit: SoftLimitKeeper, api_version: ApiVersion, causal_ts_provider: Option>, // used in rawkv apiv2 only @@ -853,6 +855,7 @@ impl Endpoint { let config_manager = ConfigManager(Arc::new(RwLock::new(config))); let softlimit = SoftLimitKeeper::new(config_manager.clone()); rt.spawn(softlimit.clone().run()); + let segment_manager = SegmentMapManager::new(); Endpoint { store_id, engine, @@ -863,6 +866,7 @@ impl Endpoint { softlimit, config_manager, concurrency_manager, + segment_manager, api_version, causal_ts_provider, } @@ -1132,8 +1136,24 @@ impl Endpoint { } } - pub fn prepare(&self, _persistence: bool, _tx: Sender) { - let _engine = self.engine.clone().sst_segmentmap(); + pub fn prepare(&mut self, _persistence: bool, mut tx: Sender) { + let checkpointer = self.engine.checkpointer().unwrap(); + let default_metadata = checkpointer.column_family_meta_data(CF_DEFAULT).unwrap(); + let write_metadata = checkpointer.column_family_meta_data(CF_WRITE).unwrap(); + + let s1 = format!("default_segmentmap:{:?}", default_metadata); + let s2 = format!("write_segmentmap:{:?}", write_metadata); + info!("{}", s1); + info!("{}", s2); + + let id = self.segment_manager.register(default_metadata.ssts, write_metadata.ssts); + let mut resp = PrepareResponse::new(); + resp.set_unique_id(id); + resp.set_collect_file_count((default_metadata.file_count + write_metadata.file_count) as u64); + resp.set_collect_file_size((default_metadata.file_size + write_metadata.file_size) as u64); + if let Err(e) = tx.try_send(resp) { + error_unknown!(?e; "failed to send response"); + } } } diff --git a/components/backup/src/lib.rs b/components/backup/src/lib.rs index c219773bc05..abf371c281c 100644 --- a/components/backup/src/lib.rs +++ b/components/backup/src/lib.rs @@ -12,6 +12,7 @@ mod service; mod softlimit; mod utils; mod writer; +mod segment_manager; pub use endpoint::{backup_file_name, Endpoint, Operation, Task}; pub use errors::{Error, Result}; diff --git a/components/backup/src/segment_manager.rs b/components/backup/src/segment_manager.rs new file mode 100644 index 00000000000..1a9bee2ca2c --- /dev/null +++ b/components/backup/src/segment_manager.rs @@ -0,0 +1,22 @@ +// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. 
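// Descriptive note (not part of the patch): this module backs the prepare phase
// added above. Endpoint::prepare() asks the engine checkpointer for CF_DEFAULT and
// CF_WRITE metadata, registers the per-level SST maps here under a fresh UUID, and
// reports that id together with the total file count and size in the PrepareResponse.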
+ +use engine_traits::SstFileInfo; +use std::collections::{HashMap, BTreeMap}; +use txn_types::Key; + +type SegmentMap = Vec>; + +pub struct SegmentMapManager (HashMap); + +impl SegmentMapManager { + pub fn new() -> Self { + Self (HashMap::new()) + } + + pub fn register(&mut self, d: SegmentMap, w: SegmentMap) -> String { + let id = uuid::Uuid::new_v4().to_string(); + self.0.insert(id.clone(), (d, w)); + id + } + +} \ No newline at end of file diff --git a/components/backup/src/service.rs b/components/backup/src/service.rs index 3d5626fd0e9..ef9e2bc1605 100644 --- a/components/backup/src/service.rs +++ b/components/backup/src/service.rs @@ -112,7 +112,7 @@ where } else { sink.fail(RpcStatus::with_message( RpcStatusCode::INTERNAL, - format!("prepare recv nothing"), + "prepare recv nothing".to_string(), )); }; }; @@ -145,7 +145,7 @@ where } else { sink.fail(RpcStatus::with_message( RpcStatusCode::INTERNAL, - format!("cleanup recv nothing"), + "cleanup recv nothing".to_string(), )); }; }; diff --git a/components/engine_panic/src/lib.rs b/components/engine_panic/src/lib.rs index 114a924c6cc..93555f5ba5f 100644 --- a/components/engine_panic/src/lib.rs +++ b/components/engine_panic/src/lib.rs @@ -46,6 +46,5 @@ pub use crate::flow_control_factors::*; pub mod table_properties; pub use crate::table_properties::*; pub mod checkpoint; -pub mod sst_properties; mod raft_engine; diff --git a/components/engine_panic/src/sst_properties.rs b/components/engine_panic/src/sst_properties.rs deleted file mode 100644 index 512b7a8f147..00000000000 --- a/components/engine_panic/src/sst_properties.rs +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. -use engine_traits::SSTPropertiesExt; - -use crate::engine::PanicEngine; - -impl SSTPropertiesExt for PanicEngine {} diff --git a/components/engine_rocks/src/checkpoint.rs b/components/engine_rocks/src/checkpoint.rs index 0f86aa29945..35d29cc6c73 100644 --- a/components/engine_rocks/src/checkpoint.rs +++ b/components/engine_rocks/src/checkpoint.rs @@ -1,17 +1,22 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. 
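// Descriptive note (not part of the patch): RocksEngineCheckpointer below now keeps
// an Arc<DB> next to the raw rocksdb checkpointer so that column_family_meta_data()
// can read per-level SST metadata (file name, size, smallest/largest key) from the
// live DB without materializing a checkpoint on disk.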
-use std::path::Path; +use std::{path::Path, collections::BTreeMap, sync::Arc}; -use engine_traits::{Checkpointable, Checkpointer, Result}; +use engine_traits::{Checkpointable, Checkpointer, Result, SstFileInfo, CfName, ColumnFamilyMetadata}; +use rocksdb::DB; +use txn_types::Key; -use crate::{r2e, RocksEngine}; +use crate::{r2e, RocksEngine, util}; impl Checkpointable for RocksEngine { type Checkpointer = RocksEngineCheckpointer; fn new_checkpointer(&self) -> Result { match self.as_inner().new_checkpointer() { - Ok(pointer) => Ok(RocksEngineCheckpointer(pointer)), + Ok(pointer) => Ok(RocksEngineCheckpointer{ + db: self.as_inner().clone(), + pointer + }), Err(e) => Err(r2e(e)), } } @@ -25,7 +30,10 @@ impl Checkpointable for RocksEngine { } } -pub struct RocksEngineCheckpointer(rocksdb::Checkpointer); +pub struct RocksEngineCheckpointer { + db: Arc, + pointer: rocksdb::Checkpointer +} impl Checkpointer for RocksEngineCheckpointer { fn create_at( @@ -34,15 +42,46 @@ impl Checkpointer for RocksEngineCheckpointer { titan_out_dir: Option<&Path>, log_size_for_flush: u64, ) -> Result<()> { - self.0 + self.pointer .create_at(db_out_dir, titan_out_dir, log_size_for_flush) .map_err(|e| r2e(e)) } + + fn column_family_meta_data(&self, cf: CfName) -> Result { + let db = &self.db; + let handle = util::get_cf_handle(db, cf)?; + let metadata = self.db.get_column_family_meta_data(handle); + let levels_metadata = metadata.get_levels(); + + let mut file_count: usize = 0; + let mut file_size: usize = 0; + let mut lssts = Vec::new(); + for level_metadata in levels_metadata { + let mut ssts = BTreeMap::new(); + let files = level_metadata.get_files(); + file_count += files.len(); + for file in files { + file_size += file.get_size(); + let start_key = Key::from_raw(file.get_smallestkey()); + ssts.insert(start_key, SstFileInfo{ + file_name: file.get_name(), + end_key: Key::from_raw(file.get_largestkey()), + }); + }; + lssts.push(ssts); + } + + Ok(ColumnFamilyMetadata { + file_count, + file_size, + ssts: lssts + }) + } } #[cfg(test)] mod tests { - use engine_traits::{Checkpointable, Checkpointer, Peekable, SyncMutable, ALL_CFS}; + use engine_traits::{Checkpointable, Checkpointer, Peekable, SyncMutable, ALL_CFS, CF_DEFAULT, MiscExt}; use tempfile::tempdir; use crate::util::new_engine; @@ -60,4 +99,17 @@ mod tests { let engine2 = new_engine(path2.as_path().to_str().unwrap(), ALL_CFS).unwrap(); assert_eq!(engine2.get_value(b"key").unwrap().unwrap(), b"value"); } + + #[test] + fn test_column_family_meta_data() { + let dir = tempdir().unwrap(); + let path = dir.path().join("origin"); + let engine = new_engine(path.as_path().to_str().unwrap(), ALL_CFS).unwrap(); + engine.put_cf(CF_DEFAULT, b"key", b"value").unwrap(); + engine.flush_cf(CF_DEFAULT, true).unwrap(); + + let check_pointer = engine.new_checkpointer().unwrap(); + let t = check_pointer.column_family_meta_data(CF_DEFAULT).unwrap(); + println!("{:?}", t); + } } diff --git a/components/engine_rocks/src/lib.rs b/components/engine_rocks/src/lib.rs index c50717ea072..b5561b3de42 100644 --- a/components/engine_rocks/src/lib.rs +++ b/components/engine_rocks/src/lib.rs @@ -68,8 +68,6 @@ pub use crate::perf_context_impl::{ PerfStatisticsInstant, ReadPerfContext, ReadPerfInstant, WritePerfContext, WritePerfInstant, }; mod perf_context_metrics; -mod sst_properties; -pub use crate::sst_properties::*; mod engine_iterator; pub use crate::engine_iterator::*; diff --git a/components/engine_rocks/src/sst_properties.rs b/components/engine_rocks/src/sst_properties.rs deleted file 
mode 100644 index 289419e0fa9..00000000000 --- a/components/engine_rocks/src/sst_properties.rs +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. -use engine_traits::SSTPropertiesExt; - -use crate::RocksEngine; - -impl SSTPropertiesExt for RocksEngine { - fn test(&self) { - let _live_files = self.as_inner().get_live_files(); - } -} diff --git a/components/engine_traits/src/checkpoint.rs b/components/engine_traits/src/checkpoint.rs index 6b966d806fe..c2bf53ad4d7 100644 --- a/components/engine_traits/src/checkpoint.rs +++ b/components/engine_traits/src/checkpoint.rs @@ -1,8 +1,10 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::path::Path; +use std::{path::Path, collections::BTreeMap, fmt::Debug}; -use crate::Result; +use txn_types::Key; + +use crate::{Result, CfName}; pub trait Checkpointable { type Checkpointer: Checkpointer; @@ -19,4 +21,39 @@ pub trait Checkpointer { titan_out_dir: Option<&Path>, log_size_for_flush: u64, ) -> Result<()>; + + fn column_family_meta_data(&self, _cf: CfName) -> Result { + unimplemented!() + } } + +pub struct SstFileInfo { + pub file_name: String, + //pub start_key: Key, + pub end_key: Key, +} + +pub struct ColumnFamilyMetadata { + pub file_count: usize, + pub file_size: usize, + pub ssts: Vec>, +} + +impl Debug for ColumnFamilyMetadata { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut binding = f.debug_struct("ColumnFamilyMetadata"); + binding.field("file_count", &self.file_count); + binding.field("file_size", &self.file_size); + + for (level, ssts) in self.ssts.iter().enumerate() { + let mut ss = String::new(); + for SstFileInfo{file_name, ..} in ssts { + let str = format!("name: {file_name}"); + ss = ss + &str + } + binding.field(&format!("level: {level}"), &ss); + } + + binding.finish() + } +} \ No newline at end of file diff --git a/components/engine_traits/src/engine.rs b/components/engine_traits/src/engine.rs index 8a9d1bfc014..b3ee1c93b05 100644 --- a/components/engine_traits/src/engine.rs +++ b/components/engine_traits/src/engine.rs @@ -26,7 +26,6 @@ pub trait KvEngine: + TtlPropertiesExt + TablePropertiesExt + PerfContextExt - + SSTPropertiesExt + MiscExt + Send + Sync diff --git a/components/engine_traits/src/lib.rs b/components/engine_traits/src/lib.rs index be15a3f471e..45a3d18fa7a 100644 --- a/components/engine_traits/src/lib.rs +++ b/components/engine_traits/src/lib.rs @@ -311,8 +311,6 @@ mod table_properties; pub use crate::table_properties::*; mod checkpoint; pub use crate::checkpoint::*; -mod sst_properties; -pub use crate::sst_properties::*; // These modules contain more general traits, some of which may be implemented // by multiple types. diff --git a/components/engine_traits/src/sst_properties.rs b/components/engine_traits/src/sst_properties.rs deleted file mode 100644 index e6f3ed0bc43..00000000000 --- a/components/engine_traits/src/sst_properties.rs +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. 
- -pub trait SSTPropertiesExt { - fn test(&self) { - unimplemented!() - } -} diff --git a/components/test_raftstore-v2/src/server.rs b/components/test_raftstore-v2/src/server.rs index a439056617f..24136627bb5 100644 --- a/components/test_raftstore-v2/src/server.rs +++ b/components/test_raftstore-v2/src/server.rs @@ -123,6 +123,7 @@ impl TestRaftKv2 { impl Engine for TestRaftKv2 { type Snap = RegionSnapshot; type Local = EK; + type Checkpointer = EK::Checkpointer; fn kv_engine(&self) -> Option { self.raftkv.kv_engine() diff --git a/components/tikv_kv/src/btree_engine.rs b/components/tikv_kv/src/btree_engine.rs index 336523dd60c..c2df90faab8 100644 --- a/components/tikv_kv/src/btree_engine.rs +++ b/components/tikv_kv/src/btree_engine.rs @@ -12,7 +12,7 @@ use std::{ }; use collections::HashMap; -use engine_panic::PanicEngine; +use engine_panic::{PanicEngine, checkpoint::PanicCheckpointer}; use engine_traits::{CfName, IterOptions, ReadOptions, CF_DEFAULT, CF_LOCK, CF_WRITE}; use futures::{future, stream, Future, Stream}; use kvproto::kvrpcpb::Context; @@ -78,6 +78,7 @@ impl Default for BTreeEngine { impl Engine for BTreeEngine { type Snap = BTreeEngineSnapshot; type Local = PanicEngine; + type Checkpointer = PanicCheckpointer; fn kv_engine(&self) -> Option { unimplemented!(); diff --git a/components/tikv_kv/src/lib.rs b/components/tikv_kv/src/lib.rs index 263bf09ed11..539fef0cbdd 100644 --- a/components/tikv_kv/src/lib.rs +++ b/components/tikv_kv/src/lib.rs @@ -37,7 +37,7 @@ use std::{ use collections::HashMap; use engine_traits::{ CfName, IterOptions, KvEngine as LocalEngine, Mutable, MvccProperties, ReadOptions, - TabletRegistry, WriteBatch, CF_DEFAULT, CF_LOCK, + TabletRegistry, WriteBatch, CF_DEFAULT, CF_LOCK, Checkpointer, }; use error_code::{self, ErrorCode, ErrorCodeExt}; use futures::{compat::Future01CompatExt, future::BoxFuture, prelude::*}; @@ -325,6 +325,7 @@ pub struct SnapContext<'a> { pub trait Engine: Send + Clone + 'static { type Snap: Snapshot; type Local: LocalEngine; + type Checkpointer: Checkpointer; /// Local storage engine. /// @@ -338,8 +339,8 @@ pub trait Engine: Send + Clone + 'static { unimplemented!() } - /// Get the metadata. - fn sst_segmentmap(&self) { + /// Get the checkpointer. 
+ fn checkpointer(&self) -> Option { unimplemented!() } diff --git a/components/tikv_kv/src/mock_engine.rs b/components/tikv_kv/src/mock_engine.rs index 69a61d58963..38dda842d69 100644 --- a/components/tikv_kv/src/mock_engine.rs +++ b/components/tikv_kv/src/mock_engine.rs @@ -148,6 +148,7 @@ fn check_expected_write( impl Engine for MockEngine { type Snap = ::Snap; type Local = ::Local; + type Checkpointer = ::Checkpointer; fn kv_engine(&self) -> Option { self.base.kv_engine() diff --git a/components/tikv_kv/src/rocksdb_engine.rs b/components/tikv_kv/src/rocksdb_engine.rs index 21099974d2d..df21dd5f3ea 100644 --- a/components/tikv_kv/src/rocksdb_engine.rs +++ b/components/tikv_kv/src/rocksdb_engine.rs @@ -14,7 +14,7 @@ use std::{ use collections::HashMap; pub use engine_rocks::RocksSnapshot; use engine_rocks::{ - get_env, RocksCfOptions, RocksDbOptions, RocksEngine as BaseRocksEngine, RocksEngineIterator, + get_env, RocksCfOptions, RocksDbOptions, RocksEngine as BaseRocksEngine, RocksEngineIterator, RocksEngineCheckpointer }; use engine_traits::{ CfName, Engines, IterOptions, Iterable, Iterator, KvEngine, Peekable, ReadOptions, @@ -227,6 +227,7 @@ impl Debug for RocksEngine { impl Engine for RocksEngine { type Snap = Arc; type Local = BaseRocksEngine; + type Checkpointer = RocksEngineCheckpointer; fn kv_engine(&self) -> Option { Some(self.engines.kv.clone()) diff --git a/src/server/gc_worker/gc_worker.rs b/src/server/gc_worker/gc_worker.rs index 08232189552..5b235f1d4be 100644 --- a/src/server/gc_worker/gc_worker.rs +++ b/src/server/gc_worker/gc_worker.rs @@ -1300,7 +1300,7 @@ pub mod test_gc_worker { use std::sync::{Arc, Mutex}; use collections::HashMap; - use engine_rocks::{RocksEngine, RocksSnapshot}; + use engine_rocks::{RocksEngine, RocksSnapshot, RocksEngineCheckpointer}; use futures::Future; use kvproto::{ kvrpcpb::Context, @@ -1331,6 +1331,7 @@ pub mod test_gc_worker { // Use RegionSnapshot which can remove the z prefix internally. 
type Snap = RegionSnapshot; type Local = RocksEngine; + type Checkpointer = RocksEngineCheckpointer; fn kv_engine(&self) -> Option { self.0.kv_engine() @@ -1424,6 +1425,7 @@ pub mod test_gc_worker { impl Engine for MultiRocksEngine { type Snap = ::Snap; type Local = ::Local; + type Checkpointer = ::Checkpointer; fn kv_engine(&self) -> Option { None diff --git a/src/server/raftkv/mod.rs b/src/server/raftkv/mod.rs index 8b3f403489f..5ce657d41c7 100644 --- a/src/server/raftkv/mod.rs +++ b/src/server/raftkv/mod.rs @@ -392,6 +392,7 @@ where { type Snap = RegionSnapshot; type Local = E; + type Checkpointer = E::Checkpointer; fn kv_engine(&self) -> Option { Some(self.engine.clone()) @@ -403,8 +404,8 @@ where self.router.clone() } - fn sst_segmentmap(&self) { - let _kv_engine = self.engine.test(); + fn checkpointer(&self) -> Option { + self.engine.new_checkpointer().ok() } fn modify_on_kv_engine( diff --git a/src/server/raftkv2/mod.rs b/src/server/raftkv2/mod.rs index 5935d542a37..e7552a40ade 100644 --- a/src/server/raftkv2/mod.rs +++ b/src/server/raftkv2/mod.rs @@ -133,6 +133,7 @@ impl RaftKv2 { impl tikv_kv::Engine for RaftKv2 { type Snap = RegionSnapshot; type Local = EK; + type Checkpointer = EK::Checkpointer; #[inline] fn kv_engine(&self) -> Option { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 1e98a1b8257..d7e0637ad61 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -3093,6 +3093,7 @@ pub struct TxnTestEngine { impl Engine for TxnTestEngine { type Snap = TxnTestSnapshot; type Local = E::Local; + type Checkpointer = E::Checkpointer; fn kv_engine(&self) -> Option { self.engine.kv_engine() From b65014abe44956e8539a706e2c927d19168dd539 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 15 Jun 2023 20:06:28 +0800 Subject: [PATCH 3/9] implement file backup Signed-off-by: Leavrth --- Cargo.lock | 1 + components/backup/Cargo.toml | 1 + components/backup/src/endpoint.rs | 205 ++++++++++++++++++++- components/backup/src/segment_manager.rs | 125 ++++++++++++- components/engine_rocks/src/checkpoint.rs | 6 +- components/engine_traits/src/checkpoint.rs | 12 +- components/test_backup/src/lib.rs | 8 +- 7 files changed, 337 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 48c2336282b..6f17864f2f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -547,6 +547,7 @@ dependencies = [ "tikv_util", "tokio", "tokio-stream", + "tokio-util", "txn_types", "uuid 0.8.2", "yatp", diff --git a/components/backup/Cargo.toml b/components/backup/Cargo.toml index 978686ef92a..a2a425a17eb 100644 --- a/components/backup/Cargo.toml +++ b/components/backup/Cargo.toml @@ -72,6 +72,7 @@ tikv = { workspace = true } tikv_alloc = { workspace = true } tikv_util = { workspace = true } tokio = { version = "1.5", features = ["rt-multi-thread"] } +tokio-util = { version = "0.7", features = ["compat"] } tokio-stream = "0.1" txn_types = { workspace = true } uuid = "0.8" diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 9e5366fa3f5..9d8fd161ad9 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -12,7 +12,7 @@ use async_channel::SendError; use causal_ts::{CausalTsProvider, CausalTsProviderImpl}; use concurrency_manager::ConcurrencyManager; use engine_traits::{name_to_cf, raw_ttl::ttl_current_ts, CfName, KvEngine, SstCompressionType, Checkpointer, CF_DEFAULT, CF_WRITE}; -use external_storage::{BackendConfig, HdfsConfig}; +use external_storage::{BackendConfig, HdfsConfig, UnpinReader}; use 
external_storage_export::{create_storage, ExternalStorage}; use futures::{channel::mpsc::*, executor::block_on}; use kvproto::{ @@ -43,6 +43,7 @@ use tikv_util::{ worker::Runnable, }; use tokio::runtime::Runtime; +use tokio_util::compat::TokioAsyncReadCompatExt; use txn_types::{Key, Lock, TimeStamp}; use crate::{ @@ -72,6 +73,8 @@ struct Request { compression_level: i32, cipher: CipherInfo, replica_read: bool, + mode: BackupMode, + ssts_id: String, } // Backup Operation corrosponsed to backup service @@ -104,6 +107,8 @@ impl fmt::Debug for Task { .field("is_raw_kv", &self.request.is_raw_kv) .field("dst_api_ver", &self.request.dst_api_ver) .field("cf", &self.request.cf) + .field("mode", &self.request.mode) + .field("unique_id", &self.request.ssts_id) .finish() } } @@ -142,6 +147,8 @@ impl Task { compression_type: req.get_compression_type(), compression_level: req.get_compression_level(), replica_read: req.get_replica_read(), + mode: req.get_mode(), + ssts_id: req.unique_id, cipher: req.cipher_info.unwrap_or_else(|| { let mut cipher = CipherInfo::default(); cipher.set_cipher_type(EncryptionMethod::Plaintext); @@ -299,6 +306,94 @@ async fn send_to_worker_with_metrics( Ok(()) } +struct SstSendInfo { + file_names_d: Vec>, + file_names_w: Vec>, + start_key: Vec, + end_key: Vec, +} + +async fn save_sst_file_worker ( + mut segment_manager: SegmentMapManager, + rx: async_channel::Receiver, + tx: UnboundedSender, + storage: Arc, +) { + while let Ok(msg) = rx.recv().await { + let mut response = BackupResponse::default(); + let mut d_progress_l = 0; + let mut d_progress_f = 0; + let mut w_progress_l = 0; + let mut w_progress_f = 0; + match upload_sst_file( + &msg.file_names_d, + &msg.file_names_w, + &mut d_progress_l, &mut d_progress_f, + &mut w_progress_l, &mut w_progress_f, + storage.clone()).await { + Err(e) => { + error_unknown!(?e; "upload sst file failed"; + "start_key" => &log_wrappers::Value::key(&msg.start_key), + "end_key" => &log_wrappers::Value::key(&msg.end_key), + ); + let e = Error::from(e); + response.set_error(e.into()); + } + Ok(_) => { + let file = File::default(); + response.set_files(vec![file].into()); + } + } + + segment_manager.release_index( + msg.file_names_d, d_progress_l, d_progress_f, + msg.file_names_w, w_progress_l, w_progress_f, + ); + + response.set_start_key(msg.start_key.clone()); + response.set_end_key(msg.end_key.clone()); + // todo: send the count. 
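// Each SstSendInfo covers one key range: the worker uploads the default-CF files and
// then the write-CF files, and the four progress counters record how far the upload
// got (level reached, file reached within that level) so that release_index() can
// flip the finished files to Uploaded and return the remainder to NotUpload.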
+ if let Err(e) = tx.unbounded_send(response) { + error_unknown!(?e; "backup failed to send response"; + "start_key" => &log_wrappers::Value::key(&msg.start_key), + "end_key" => &log_wrappers::Value::key(&msg.end_key),); + if e.is_disconnected() { + return; + } + } + } +} + +async fn upload_sst_file( + ssts_d: &Vec>, + ssts_w: &Vec>, + d_progress_l: &mut usize, d_progress_f: &mut usize, + w_progress_l: &mut usize, w_progress_f: &mut usize, + storage: Arc, +) -> std::io::Result<()> { + upload_sst_file_internal(ssts_d, d_progress_l, d_progress_f, storage.clone()).await?; + upload_sst_file_internal(ssts_w, w_progress_l, w_progress_f, storage).await +} + +async fn upload_sst_file_internal( + ssts: &Vec>, + progress_l: &mut usize, + progress_f: &mut usize, + storage: Arc, +) -> std::io::Result<()> { + for fs in ssts { + for (file_name, _) in fs { + let file = tokio::fs::File::open(file_name).await?; + let length = file.metadata().await?.len(); + let reader = UnpinReader(Box::new(file.compat())); + storage.write(file_name, reader, length).await?; + *progress_f += 1; + } + *progress_l += 1; + } + Ok(()) +} + impl BackupRange { /// Get entries from the scanner and save them to storage async fn backup( @@ -1118,6 +1213,36 @@ impl Endpoint { let backend = Arc::::from(backend); let concurrency = self.config_manager.0.read().unwrap().num_threads; self.pool.borrow_mut().adjust_with(concurrency); + + match request.mode { + BackupMode::Scan => self.handle_scan_backup( + concurrency, + prs, + backend, + codec, + request, + resp, + ), + BackupMode::File => self.handle_file_backup( + concurrency, + prs, + backend, + request, + resp, + ), + _ => error!("unknown backup mode"; "mode" => ?request.mode), + }; + } + + fn handle_scan_backup( + &self, + concurrency: usize, + prs: Arc>>, + backend: Arc, + codec: KeyValueCodec, + request: Request, + resp: UnboundedSender, + ) { let (tx, rx) = async_channel::bounded(1); for _ in 0..concurrency { self.spawn_backup_worker( @@ -1136,6 +1261,74 @@ impl Endpoint { } } + fn handle_file_backup( + &self, + concurrency: usize, + prs: Arc>>, + backend: Arc, + request: Request, + resp_tx: UnboundedSender, + ) { + let (tx, rx) = async_channel::bounded(1); + for _ in 0..concurrency { + self.io_pool.spawn(save_sst_file_worker( + self.segment_manager.clone(), + rx.clone(), + resp_tx.clone(), + backend.clone(), + )); + } + + let mut segment_manager = self.segment_manager.clone(); + let id = request.ssts_id; + let batch_size = self.config_manager.0.read().unwrap().batch_size; + self.pool.borrow_mut().spawn(async move { + loop { + let batch = { + let progress: &mut Progress<_> = &mut prs.lock().unwrap(); + let batch = progress.forward(batch_size, request.replica_read); + if batch.is_empty() { + return; + } + batch + }; + + for brange in batch { + if request.cancel.load(Ordering::SeqCst) { + warn!("backup task has canceled"; "range" => ?brange); + return; + } + let start_key = brange.start_key.map_or_else(Vec::new, |k| k.into_raw().unwrap()); + let end_key = brange.end_key.map_or_else(Vec::new, |k| k.into_raw().unwrap()); + let (d_ssts, w_ssts) = segment_manager.find_ssts(&id, &start_key, &end_key); + if let Err(err) = tx.send(SstSendInfo { + file_names_d: d_ssts.clone(), + file_names_w: w_ssts.clone(), + start_key, + end_key, + }).await { + error_unknown!(%err; "error during backup"); + segment_manager.release_index( + d_ssts, + usize::MAX, + usize::MAX, + w_ssts, + usize::MAX, + usize::MAX, + ); + let mut resp = BackupResponse::new(); + let err = Error::from(err); + 
resp.set_error(err.into()); + if let Err(err) = resp_tx.unbounded_send(resp) { + warn!("failed to send response"; "err" => ?err) + } + } + } + } + + }) + } + pub fn prepare(&mut self, _persistence: bool, mut tx: Sender) { let checkpointer = self.engine.checkpointer().unwrap(); let default_metadata = checkpointer.column_family_meta_data(CF_DEFAULT).unwrap(); @@ -1152,7 +1345,7 @@ impl Endpoint { resp.set_collect_file_count((default_metadata.file_count + write_metadata.file_count) as u64); resp.set_collect_file_size((default_metadata.file_size + write_metadata.file_size) as u64); if let Err(e) = tx.try_send(resp) { - error_unknown!(?e; "failed to send response"); + error_unknown!(?e; "[prepare] failed to send response"); } } } @@ -1610,6 +1803,8 @@ pub mod tests { compression_level: 0, cipher: CipherInfo::default(), replica_read: false, + mode: BackupMode::Scan, + ssts_id: String::from("test"), }, resp: tx, }; @@ -1719,6 +1914,8 @@ pub mod tests { compression_level: 0, cipher: CipherInfo::default(), replica_read: false, + mode: BackupMode::Scan, + ssts_id: String::from("test"), }, resp: tx, }; @@ -1748,6 +1945,8 @@ pub mod tests { compression_level: 0, cipher: CipherInfo::default(), replica_read: true, + mode: BackupMode::Scan, + ssts_id: String::from("test"), }, resp: tx, }; @@ -1861,6 +2060,8 @@ pub mod tests { compression_level: 0, cipher: CipherInfo::default(), replica_read: false, + mode: BackupMode::Scan, + ssts_id: String::from("test"), }, resp: tx, }; diff --git a/components/backup/src/segment_manager.rs b/components/backup/src/segment_manager.rs index 1a9bee2ca2c..953cceb0101 100644 --- a/components/backup/src/segment_manager.rs +++ b/components/backup/src/segment_manager.rs @@ -1,22 +1,133 @@ // Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. 
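// Descriptive note (not part of the patch): at this stage a SegmentMap is one BTreeMap
// per LSM level, keyed by each SST's start key, and the manager keeps a parallel
// SstStatus table (NotUpload / Uploading / Uploaded) per file so that workers skip
// files that are already being uploaded.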
use engine_traits::SstFileInfo; -use std::collections::{HashMap, BTreeMap}; -use txn_types::Key; +use std::{collections::{HashMap, BTreeMap}, sync::{Arc, RwLock, Mutex}}; -type SegmentMap = Vec>; +enum SstStatus { + NotUpload, + Uploading, + Uploaded +} -pub struct SegmentMapManager (HashMap); +type SegmentMap = Vec, SstFileInfo>>; + +#[derive(Clone)] +pub struct SegmentMapManager { + map: Arc>>, + // TODO: directly update the uploaded flag in hashmap + index_d: Arc>>>, + index_w: Arc>>>, +} impl SegmentMapManager { pub fn new() -> Self { - Self (HashMap::new()) + Self { + map: Arc::new(RwLock::new(HashMap::new())), + index_d: Arc::new(Mutex::new(Vec::new())), + index_w: Arc::new(Mutex::new(Vec::new())), + } } pub fn register(&mut self, d: SegmentMap, w: SegmentMap) -> String { let id = uuid::Uuid::new_v4().to_string(); - self.0.insert(id.clone(), (d, w)); + { + let mut map = self.map.write().unwrap(); + map.insert(id.clone(), (d, w)); + } + self.generate_index(&id); id } -} \ No newline at end of file + fn generate_index(&mut self, id: &str) { + let mut map = self.map.write().unwrap(); + let map = map.get_mut(id).unwrap(); + { + let mut index_d = self.index_d.lock().unwrap(); + generate_index_internal(&mut map.0, &mut index_d); + } + { + let mut index_w = self.index_w.lock().unwrap(); + generate_index_internal(&mut map.1, &mut index_w); + } + } + + pub fn find_ssts(&mut self, id: &str, start_key: &Vec, end_key: &Vec) -> (Vec>, Vec>) { + let m = self.map.read().unwrap(); + let map = m.get(id).unwrap(); + let d = { + let mut index_d = self.index_d.lock().unwrap(); + find_ssts_internal(&mut index_d, &map.0, start_key, end_key) + }; + let w = { + let mut index_w = self.index_w.lock().unwrap(); + find_ssts_internal(&mut index_w, &map.1, start_key, end_key) + }; + (d, w) + } + + pub fn release_index( + &mut self, + d: Vec>, d_progress_l: usize, d_progress_f: usize, + w: Vec>, w_progress_l: usize, w_progress_f: usize, + ) { + { + let mut index_d = self.index_d.lock().unwrap(); + release_index_internal(&mut index_d, d, d_progress_l, d_progress_f); + } + { + let mut index_w = self.index_w.lock().unwrap(); + release_index_internal(&mut index_w, w, w_progress_l, w_progress_f); + } + } +} + +fn generate_index_internal(map: &mut SegmentMap, index: &mut Vec>) { + for tree in map { + let mut lvl_idx = Vec::new(); + for (idx, (_, info)) in tree.iter_mut().enumerate() { + info.idx = idx; + lvl_idx.push(SstStatus::NotUpload); + } + + index.push(lvl_idx); + } +} + +fn find_ssts_internal(index: &mut [Vec], map: &SegmentMap, start_key: &Vec, end_key: &Vec) -> Vec> { + let mut res = Vec::new(); + for (level, tree) in map.iter().enumerate() { + let lvl_index = &mut index[level]; + let mut fs = Vec::new(); + for f in tree.iter().filter(|info| { + let idx = info.1.idx; + if matches!(lvl_index[idx], SstStatus::NotUpload) { + return false + } + + + let sk = info.0; + let ek = &info.1.end_key; + if (end_key.is_empty() || sk < end_key) && (ek.is_empty() || ek > start_key) { + lvl_index[idx] = SstStatus::Uploading; + return true + } + false + }) { + fs.push((f.1.file_name.clone(), f.1.idx)); + } + res.push(fs); + } + res +} + +fn release_index_internal(index: &mut [Vec], findex: Vec>, progress_l: usize, progress_f: usize) { + for (level, sst_index) in findex.iter().enumerate() { + for (_, idx) in sst_index { + index[level][*idx] = if level < progress_l || (level == progress_l && *idx <= progress_f) { + SstStatus::Uploaded + } else { + SstStatus::NotUpload + }; + } + } +} diff --git 
a/components/engine_rocks/src/checkpoint.rs b/components/engine_rocks/src/checkpoint.rs index 35d29cc6c73..baec1de1040 100644 --- a/components/engine_rocks/src/checkpoint.rs +++ b/components/engine_rocks/src/checkpoint.rs @@ -4,7 +4,6 @@ use std::{path::Path, collections::BTreeMap, sync::Arc}; use engine_traits::{Checkpointable, Checkpointer, Result, SstFileInfo, CfName, ColumnFamilyMetadata}; use rocksdb::DB; -use txn_types::Key; use crate::{r2e, RocksEngine, util}; @@ -62,10 +61,11 @@ impl Checkpointer for RocksEngineCheckpointer { file_count += files.len(); for file in files { file_size += file.get_size(); - let start_key = Key::from_raw(file.get_smallestkey()); + let start_key = file.get_smallestkey().to_vec(); ssts.insert(start_key, SstFileInfo{ file_name: file.get_name(), - end_key: Key::from_raw(file.get_largestkey()), + end_key: file.get_largestkey().to_vec(), + idx: 0, }); }; lssts.push(ssts); diff --git a/components/engine_traits/src/checkpoint.rs b/components/engine_traits/src/checkpoint.rs index c2bf53ad4d7..ff243827977 100644 --- a/components/engine_traits/src/checkpoint.rs +++ b/components/engine_traits/src/checkpoint.rs @@ -2,8 +2,6 @@ use std::{path::Path, collections::BTreeMap, fmt::Debug}; -use txn_types::Key; - use crate::{Result, CfName}; pub trait Checkpointable { @@ -30,13 +28,14 @@ pub trait Checkpointer { pub struct SstFileInfo { pub file_name: String, //pub start_key: Key, - pub end_key: Key, + pub end_key: Vec, + pub idx: usize, } pub struct ColumnFamilyMetadata { pub file_count: usize, pub file_size: usize, - pub ssts: Vec>, + pub ssts: Vec, SstFileInfo>>, } impl Debug for ColumnFamilyMetadata { @@ -47,11 +46,14 @@ impl Debug for ColumnFamilyMetadata { for (level, ssts) in self.ssts.iter().enumerate() { let mut ss = String::new(); - for SstFileInfo{file_name, ..} in ssts { + for SstFileInfo{file_name, ..} in ssts.values() { let str = format!("name: {file_name}"); ss = ss + &str } binding.field(&format!("level: {level}"), &ss); + for sst in ssts { + binding.field("sk", sst.0); + } } binding.finish() diff --git a/components/test_backup/src/lib.rs b/components/test_backup/src/lib.rs index 34eb6e8aa9e..c0fad173b13 100644 --- a/components/test_backup/src/lib.rs +++ b/components/test_backup/src/lib.rs @@ -9,7 +9,7 @@ use std::{ }; use api_version::{dispatch_api_version, keyspace::KvPair, ApiV1, KvFormat, RawValue}; -use backup::Task; +use backup::{Task, Operation}; use collections::HashMap; use engine_traits::{CfName, IterOptions, CF_DEFAULT, CF_WRITE, DATA_KEY_PREFIX_LEN}; use external_storage_export::make_local_backend; @@ -40,7 +40,7 @@ use txn_types::TimeStamp; pub struct TestSuite { pub cluster: Cluster, - pub endpoints: HashMap>, + pub endpoints: HashMap>, pub tikv_cli: TikvClient, pub context: Context, pub ts: TimeStamp, @@ -301,7 +301,7 @@ impl TestSuite { let (tx, rx) = future_mpsc::unbounded(); for end in self.endpoints.values() { let (task, _) = Task::new(req.clone(), tx.clone()).unwrap(); - end.scheduler().schedule(task).unwrap(); + end.scheduler().schedule(Operation::BackupTask(task)).unwrap(); } rx } @@ -324,7 +324,7 @@ impl TestSuite { let (tx, rx) = future_mpsc::unbounded(); for end in self.endpoints.values() { let (task, _) = Task::new(req.clone(), tx.clone()).unwrap(); - end.scheduler().schedule(task).unwrap(); + end.scheduler().schedule(Operation::BackupTask(task)).unwrap(); } rx } From aeb2845530c900c9579799a5d582fce16190eeb5 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 19 Jun 2023 13:39:45 +0800 Subject: [PATCH 4/9] fix some bugs 
Signed-off-by: Leavrth --- components/backup/src/endpoint.rs | 60 +++++++++++---- components/backup/src/segment_manager.rs | 92 ++++++++++++----------- components/engine_rocks/src/checkpoint.rs | 6 +- components/server/src/server.rs | 1 + components/server/src/server2.rs | 1 + components/test_backup/src/lib.rs | 1 + 6 files changed, 100 insertions(+), 61 deletions(-) diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 9d8fd161ad9..f64d4c3ec19 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -4,6 +4,7 @@ use std::{ borrow::Cow, cell::RefCell, fmt, + path::Path, sync::{atomic::*, mpsc, Arc, Mutex, RwLock}, time::{SystemTime, UNIX_EPOCH}, }; @@ -24,7 +25,7 @@ use kvproto::{ use online_config::OnlineConfig; use raft::StateRole; use raftstore::coprocessor::RegionInfoProvider; -use segment_manager::SegmentMapManager; +use segment_manager::{SegmentMapManager, SegmentMapRouter}; use tikv::{ config::BackupConfig, storage::{ @@ -314,11 +315,13 @@ struct SstSendInfo { } async fn save_sst_file_worker ( - mut segment_manager: SegmentMapManager, + segment_manager: Arc, + data_dir: String, rx: async_channel::Receiver, tx: UnboundedSender, storage: Arc, ) { + let dir = Path::new(&data_dir); while let Ok(msg) = rx.recv().await { let mut response = BackupResponse::default(); let mut d_progress_l = 0; @@ -326,6 +329,7 @@ async fn save_sst_file_worker ( let mut w_progress_l = 0; let mut w_progress_f = 0; match upload_sst_file( + dir, &msg.file_names_d, &msg.file_names_w, &mut d_progress_l, &mut d_progress_f, @@ -365,28 +369,35 @@ async fn save_sst_file_worker ( } async fn upload_sst_file( + data_dir: &Path, ssts_d: &Vec>, ssts_w: &Vec>, d_progress_l: &mut usize, d_progress_f: &mut usize, w_progress_l: &mut usize, w_progress_f: &mut usize, storage: Arc, ) -> std::io::Result<()> { - upload_sst_file_internal(ssts_d, d_progress_l, d_progress_f, storage.clone()).await?; - upload_sst_file_internal(ssts_w, w_progress_l, w_progress_f, storage).await + upload_sst_file_internal(data_dir, ssts_d, d_progress_l, d_progress_f, storage.clone()).await?; + upload_sst_file_internal(data_dir, ssts_w, w_progress_l, w_progress_f, storage).await } async fn upload_sst_file_internal( + data_dir: &Path, ssts: &Vec>, progress_l: &mut usize, progress_f: &mut usize, storage: Arc, ) -> std::io::Result<()> { for fs in ssts { - for (file_name, _) in fs { + for (file_name_relative, _) in fs { + let file_name_relative = match file_name_relative.strip_prefix('/') { + Some(s) => s, + None => file_name_relative, + }; + let file_name = data_dir.join(file_name_relative); let file = tokio::fs::File::open(file_name).await?; let length = file.metadata().await?.len(); let reader = UnpinReader(Box::new(file.compat())); - storage.write(file_name, reader, length).await?; + storage.write(file_name_relative, reader, length).await?; *progress_f += 1; } *progress_l += 1; @@ -786,9 +797,10 @@ pub struct Endpoint { tablets: LocalTablets, config_manager: ConfigManager, concurrency_manager: ConcurrencyManager, - segment_manager: SegmentMapManager, + segment_router: SegmentMapRouter, softlimit: SoftLimitKeeper, api_version: ApiVersion, + data_dir: String, causal_ts_provider: Option>, // used in rawkv apiv2 only pub(crate) engine: E, @@ -943,6 +955,7 @@ impl Endpoint { config: BackupConfig, concurrency_manager: ConcurrencyManager, api_version: ApiVersion, + data_dir: String, causal_ts_provider: Option>, ) -> Endpoint { let pool = ControlThreadPool::new(); @@ -950,7 +963,7 @@ impl 
Endpoint { let config_manager = ConfigManager(Arc::new(RwLock::new(config))); let softlimit = SoftLimitKeeper::new(config_manager.clone()); rt.spawn(softlimit.clone().run()); - let segment_manager = SegmentMapManager::new(); + let segment_router = SegmentMapRouter::new(); Endpoint { store_id, engine, @@ -961,8 +974,9 @@ impl Endpoint { softlimit, config_manager, concurrency_manager, - segment_manager, + segment_router, api_version, + data_dir, causal_ts_provider, } } @@ -1269,18 +1283,35 @@ impl Endpoint { request: Request, resp_tx: UnboundedSender, ) { + let id = request.ssts_id; + let segment_manager = match self.segment_router.route(&id) { + Some(manager) => manager, + None => { + let mut resp = BackupResponse::new(); + let err_msg = format!( + "ssts are not found, unique id: {:?}", + id + ); + resp.set_error(crate::Error::Other(box_err!(err_msg)).into()); + if let Err(err) = resp_tx.unbounded_send(resp) { + warn!("failed to send response"; "err" => ?err) + } + return; + }, + }; + let (tx, rx) = async_channel::bounded(1); for _ in 0..concurrency { self.io_pool.spawn(save_sst_file_worker( - self.segment_manager.clone(), + segment_manager.clone(), + self.data_dir.clone(), rx.clone(), resp_tx.clone(), backend.clone(), )); } - let mut segment_manager = self.segment_manager.clone(); - let id = request.ssts_id; + let batch_size = self.config_manager.0.read().unwrap().batch_size; self.pool.borrow_mut().spawn(async move { loop { @@ -1300,7 +1331,7 @@ impl Endpoint { } let start_key = brange.start_key.map_or_else(Vec::new, |k| k.into_raw().unwrap()); let end_key = brange.end_key.map_or_else(Vec::new, |k| k.into_raw().unwrap()); - let (d_ssts, w_ssts) = segment_manager.find_ssts(&id, &start_key, &end_key); + let (d_ssts, w_ssts) = segment_manager.find_ssts(&start_key, &end_key); if let Err(err) = tx.send(SstSendInfo { file_names_d: d_ssts.clone(), file_names_w: w_ssts.clone(), @@ -1339,7 +1370,7 @@ impl Endpoint { info!("{}", s1); info!("{}", s2); - let id = self.segment_manager.register(default_metadata.ssts, write_metadata.ssts); + let id = self.segment_router.register(default_metadata.ssts, write_metadata.ssts); let mut resp = PrepareResponse::new(); resp.set_unique_id(id); resp.set_collect_file_count((default_metadata.file_count + write_metadata.file_count) as u64); @@ -1658,6 +1689,7 @@ pub mod tests { }, concurrency_manager, api_version, + String::from("test"), causal_ts_provider, ), ) diff --git a/components/backup/src/segment_manager.rs b/components/backup/src/segment_manager.rs index 953cceb0101..c25a06e0a37 100644 --- a/components/backup/src/segment_manager.rs +++ b/components/backup/src/segment_manager.rs @@ -1,7 +1,7 @@ // Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. 
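// A minimal usage sketch (not part of the patch) of how the endpoint drives the
// router/manager split introduced here; variable names are placeholders:
//   let id = router.register(default_ssts, write_ssts);        // prepare phase
//   let mgr = router.route(&id).expect("unknown unique_id");    // file-mode backup request
//   let (d, w) = mgr.find_ssts(&start_key, &end_key);           // overlapping, not-yet-uploaded files
//   mgr.release_index(d, d_lvl, d_file, w, w_lvl, w_file);      // record upload progress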
use engine_traits::SstFileInfo; -use std::{collections::{HashMap, BTreeMap}, sync::{Arc, RwLock, Mutex}}; +use std::{collections::{HashMap, BTreeMap}, sync::{Arc, Mutex}}; enum SstStatus { NotUpload, @@ -10,63 +10,77 @@ enum SstStatus { } type SegmentMap = Vec, SstFileInfo>>; +pub struct SegmentMapRouter(HashMap>); + +impl SegmentMapRouter { + pub fn new() -> Self { + Self(HashMap::new()) + } + + pub fn register(&mut self, d: SegmentMap, w: SegmentMap) -> String { + let (id, manager) = SegmentMapManager::register(d, w); + self.0.insert(id.clone(), Arc::new(manager)); + id + } + + pub fn route(&self, id: &str) -> Option> { + self.0.get(id).cloned() + } +} -#[derive(Clone)] pub struct SegmentMapManager { - map: Arc>>, + map: (SegmentMap, SegmentMap), // TODO: directly update the uploaded flag in hashmap - index_d: Arc>>>, - index_w: Arc>>>, + index_d: Mutex>>, + index_w: Mutex>>, } impl SegmentMapManager { - pub fn new() -> Self { + fn new(mut d: SegmentMap, mut w: SegmentMap) -> Self { + let index_d_raw = Self::generate_index(&mut d); + let index_w_raw = Self::generate_index(&mut w); Self { - map: Arc::new(RwLock::new(HashMap::new())), - index_d: Arc::new(Mutex::new(Vec::new())), - index_w: Arc::new(Mutex::new(Vec::new())), + map: (d, w), + + index_d: Mutex::new(index_d_raw), + index_w: Mutex::new(index_w_raw), } } - pub fn register(&mut self, d: SegmentMap, w: SegmentMap) -> String { + pub fn register(d: SegmentMap, w: SegmentMap) -> (String, Self) { let id = uuid::Uuid::new_v4().to_string(); - { - let mut map = self.map.write().unwrap(); - map.insert(id.clone(), (d, w)); - } - self.generate_index(&id); - id + + (id, Self::new(d, w)) } - fn generate_index(&mut self, id: &str) { - let mut map = self.map.write().unwrap(); - let map = map.get_mut(id).unwrap(); - { - let mut index_d = self.index_d.lock().unwrap(); - generate_index_internal(&mut map.0, &mut index_d); - } - { - let mut index_w = self.index_w.lock().unwrap(); - generate_index_internal(&mut map.1, &mut index_w); + fn generate_index(map: &mut SegmentMap) -> Vec> { + let mut index = Vec::new(); + for tree in map { + let mut lvl_idx = Vec::new(); + for (idx, (_, info)) in tree.iter_mut().enumerate() { + info.idx = idx; + lvl_idx.push(SstStatus::NotUpload); + } + + index.push(lvl_idx); } + index } - pub fn find_ssts(&mut self, id: &str, start_key: &Vec, end_key: &Vec) -> (Vec>, Vec>) { - let m = self.map.read().unwrap(); - let map = m.get(id).unwrap(); + pub fn find_ssts(&self, start_key: &Vec, end_key: &Vec) -> (Vec>, Vec>) { let d = { let mut index_d = self.index_d.lock().unwrap(); - find_ssts_internal(&mut index_d, &map.0, start_key, end_key) + find_ssts_internal(&mut index_d, &self.map.0, start_key, end_key) }; let w = { let mut index_w = self.index_w.lock().unwrap(); - find_ssts_internal(&mut index_w, &map.1, start_key, end_key) + find_ssts_internal(&mut index_w, &self.map.1, start_key, end_key) }; (d, w) } pub fn release_index( - &mut self, + &self, d: Vec>, d_progress_l: usize, d_progress_f: usize, w: Vec>, w_progress_l: usize, w_progress_f: usize, ) { @@ -81,18 +95,6 @@ impl SegmentMapManager { } } -fn generate_index_internal(map: &mut SegmentMap, index: &mut Vec>) { - for tree in map { - let mut lvl_idx = Vec::new(); - for (idx, (_, info)) in tree.iter_mut().enumerate() { - info.idx = idx; - lvl_idx.push(SstStatus::NotUpload); - } - - index.push(lvl_idx); - } -} - fn find_ssts_internal(index: &mut [Vec], map: &SegmentMap, start_key: &Vec, end_key: &Vec) -> Vec> { let mut res = Vec::new(); for (level, tree) in 
map.iter().enumerate() { @@ -100,7 +102,7 @@ fn find_ssts_internal(index: &mut [Vec], map: &SegmentMap, start_key: let mut fs = Vec::new(); for f in tree.iter().filter(|info| { let idx = info.1.idx; - if matches!(lvl_index[idx], SstStatus::NotUpload) { + if !matches!(lvl_index[idx], SstStatus::NotUpload) { return false } diff --git a/components/engine_rocks/src/checkpoint.rs b/components/engine_rocks/src/checkpoint.rs index baec1de1040..1c9f741891c 100644 --- a/components/engine_rocks/src/checkpoint.rs +++ b/components/engine_rocks/src/checkpoint.rs @@ -4,6 +4,7 @@ use std::{path::Path, collections::BTreeMap, sync::Arc}; use engine_traits::{Checkpointable, Checkpointer, Result, SstFileInfo, CfName, ColumnFamilyMetadata}; use rocksdb::DB; +use keys::origin_key; use crate::{r2e, RocksEngine, util}; @@ -61,10 +62,11 @@ impl Checkpointer for RocksEngineCheckpointer { file_count += files.len(); for file in files { file_size += file.get_size(); - let start_key = file.get_smallestkey().to_vec(); + let start_key = origin_key(file.get_smallestkey()).to_vec(); + let end_key = origin_key(file.get_largestkey()).to_vec(); ssts.insert(start_key, SstFileInfo{ file_name: file.get_name(), - end_key: file.get_largestkey().to_vec(), + end_key, idx: 0, }); }; diff --git a/components/server/src/server.rs b/components/server/src/server.rs index db46b45b1ce..9e89c3f0ff3 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -1115,6 +1115,7 @@ where self.core.config.backup.clone(), self.concurrency_manager.clone(), self.core.config.storage.api_version(), + self.core.config.storage.data_dir.clone(), self.causal_ts_provider.clone(), ); self.cfg_controller.as_mut().unwrap().register( diff --git a/components/server/src/server2.rs b/components/server/src/server2.rs index da970a7e749..14ea66f12f8 100644 --- a/components/server/src/server2.rs +++ b/components/server/src/server2.rs @@ -868,6 +868,7 @@ where self.core.config.backup.clone(), self.concurrency_manager.clone(), self.core.config.storage.api_version(), + self.core.config.storage.data_dir.clone(), self.causal_ts_provider.clone(), ); self.cfg_controller.as_mut().unwrap().register( diff --git a/components/test_backup/src/lib.rs b/components/test_backup/src/lib.rs index c0fad173b13..d8a19c91efb 100644 --- a/components/test_backup/src/lib.rs +++ b/components/test_backup/src/lib.rs @@ -94,6 +94,7 @@ impl TestSuite { }, sim.get_concurrency_manager(*id), api_version, + String::from("test"), None, ); let mut worker = bg_worker.lazy_build(format!("backup-{}", id)); From eeb0dec0ab5430372a42b83604a4f7f7559ab398 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 19 Jun 2023 14:04:31 +0800 Subject: [PATCH 5/9] fix some bugs Signed-off-by: Leavrth --- components/backup/src/endpoint.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index f64d4c3ec19..18508e0071f 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -4,7 +4,7 @@ use std::{ borrow::Cow, cell::RefCell, fmt, - path::Path, + path::{Path, PathBuf}, sync::{atomic::*, mpsc, Arc, Mutex, RwLock}, time::{SystemTime, UNIX_EPOCH}, }; @@ -316,12 +316,11 @@ struct SstSendInfo { async fn save_sst_file_worker ( segment_manager: Arc, - data_dir: String, + data_dir: PathBuf, rx: async_channel::Receiver, tx: UnboundedSender, storage: Arc, ) { - let dir = Path::new(&data_dir); while let Ok(msg) = rx.recv().await { let mut response = 
BackupResponse::default(); let mut d_progress_l = 0; @@ -329,7 +328,7 @@ async fn save_sst_file_worker ( let mut w_progress_l = 0; let mut w_progress_f = 0; match upload_sst_file( - dir, + &data_dir, &msg.file_names_d, &msg.file_names_w, &mut d_progress_l, &mut d_progress_f, @@ -368,20 +367,20 @@ async fn save_sst_file_worker ( } } -async fn upload_sst_file( - data_dir: &Path, +async fn upload_sst_file>( + data_dir: P, ssts_d: &Vec>, ssts_w: &Vec>, d_progress_l: &mut usize, d_progress_f: &mut usize, w_progress_l: &mut usize, w_progress_f: &mut usize, storage: Arc, ) -> std::io::Result<()> { - upload_sst_file_internal(data_dir, ssts_d, d_progress_l, d_progress_f, storage.clone()).await?; - upload_sst_file_internal(data_dir, ssts_w, w_progress_l, w_progress_f, storage).await + upload_sst_file_internal(data_dir.as_ref(), ssts_d, d_progress_l, d_progress_f, storage.clone()).await?; + upload_sst_file_internal(data_dir.as_ref(), ssts_w, w_progress_l, w_progress_f, storage).await } -async fn upload_sst_file_internal( - data_dir: &Path, +async fn upload_sst_file_internal>( + data_dir: P, ssts: &Vec>, progress_l: &mut usize, progress_f: &mut usize, @@ -393,7 +392,7 @@ async fn upload_sst_file_internal( Some(s) => s, None => file_name_relative, }; - let file_name = data_dir.join(file_name_relative); + let file_name = data_dir.as_ref().join(file_name_relative); let file = tokio::fs::File::open(file_name).await?; let length = file.metadata().await?.len(); let reader = UnpinReader(Box::new(file.compat())); @@ -800,7 +799,7 @@ pub struct Endpoint { segment_router: SegmentMapRouter, softlimit: SoftLimitKeeper, api_version: ApiVersion, - data_dir: String, + data_dir: PathBuf, causal_ts_provider: Option>, // used in rawkv apiv2 only pub(crate) engine: E, @@ -947,7 +946,7 @@ impl Progress { } impl Endpoint { - pub fn new( + pub fn new>( store_id: u64, engine: E, region_info: R, @@ -955,7 +954,7 @@ impl Endpoint { config: BackupConfig, concurrency_manager: ConcurrencyManager, api_version: ApiVersion, - data_dir: String, + root_dir: P, causal_ts_provider: Option>, ) -> Endpoint { let pool = ControlThreadPool::new(); @@ -964,6 +963,7 @@ impl Endpoint { let softlimit = SoftLimitKeeper::new(config_manager.clone()); rt.spawn(softlimit.clone().run()); let segment_router = SegmentMapRouter::new(); + let data_dir = root_dir.as_ref().join("db"); Endpoint { store_id, engine, From 2d24085799c24a9712668410d1a3dfb1842b0b3a Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 19 Jun 2023 19:17:26 +0800 Subject: [PATCH 6/9] fix some bugs Signed-off-by: Leavrth --- components/backup/src/endpoint.rs | 4 +-- components/backup/src/segment_manager.rs | 42 ++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 18508e0071f..553168974a7 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -1329,8 +1329,8 @@ impl Endpoint { warn!("backup task has canceled"; "range" => ?brange); return; } - let start_key = brange.start_key.map_or_else(Vec::new, |k| k.into_raw().unwrap()); - let end_key = brange.end_key.map_or_else(Vec::new, |k| k.into_raw().unwrap()); + let start_key = brange.start_key.map_or_else(Vec::new, |k| k.into_encoded()); + let end_key = brange.end_key.map_or_else(Vec::new, |k| k.into_encoded()); let (d_ssts, w_ssts) = segment_manager.find_ssts(&start_key, &end_key); if let Err(err) = tx.send(SstSendInfo { file_names_d: d_ssts.clone(), diff --git 
a/components/backup/src/segment_manager.rs b/components/backup/src/segment_manager.rs index c25a06e0a37..4b55bd5f95d 100644 --- a/components/backup/src/segment_manager.rs +++ b/components/backup/src/segment_manager.rs @@ -133,3 +133,45 @@ fn release_index_internal(index: &mut [Vec], findex: Vec SegmentMap { + let mut map = vec![ + BTreeMap::new(), + BTreeMap::new(), + BTreeMap::new(), + ]; + for (i, m) in map.iter_mut().enumerate() { + for j in 0..3 { + let sk = format!("{i}_{j}").into_bytes(); + let ek = format!("{}_{}", i, j+1).into_bytes(); + m.insert(sk, SstFileInfo { + end_key: ek, + file_name: String::from("/asdfg.sst"), + idx: 0, + }); + } + } + + map + } +} \ No newline at end of file From 15918c2708aa4875eedb22e6ba72890829169830 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 20 Jun 2023 13:26:55 +0800 Subject: [PATCH 7/9] response the raw key Signed-off-by: Leavrth --- components/backup/src/endpoint.rs | 24 ++++++++++++++++++++---- components/backup/src/segment_manager.rs | 17 ++++++++++------- components/backup/src/utils.rs | 7 +++++++ 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 553168974a7..012c201cc56 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -50,7 +50,7 @@ use txn_types::{Key, Lock, TimeStamp}; use crate::{ metrics::*, softlimit::{CpuStatistics, SoftLimit, SoftLimitByCpu}, - utils::{ControlThreadPool, KeyValueCodec}, + utils::{ControlThreadPool, KeyValueCodec, convert_encoded_key_to_raw_key}, writer::{BackupWriterBuilder, CfNameWrap}, Error, *, }; @@ -312,6 +312,7 @@ struct SstSendInfo { file_names_w: Vec>, start_key: Vec, end_key: Vec, + ssts_cnt: usize, } async fn save_sst_file_worker ( @@ -353,8 +354,10 @@ async fn save_sst_file_worker ( msg.file_names_w, w_progress_l, w_progress_f, ); - response.set_start_key(msg.start_key.clone()); - response.set_end_key(msg.end_key.clone()); + let raw_start_key = convert_encoded_key_to_raw_key(msg.start_key.clone()); + let raw_end_key = convert_encoded_key_to_raw_key(msg.end_key.clone()); + response.set_start_key(raw_start_key); + response.set_end_key(raw_end_key); // todo: send the count. 
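
Note on the selection logic behind find_ssts: each level keeps one status slot per SST, and a file is picked only when it is still NotUpload and its [start, end) range overlaps the requested range, where an empty key means unbounded. The following is a self-contained Rust sketch of that check only; the Status and Level types, the pick_in_level name, and the sample keys are simplified stand-ins for illustration, not the structs in this patch.

use std::collections::BTreeMap;

#[derive(Clone, Copy, PartialEq)]
enum Status {
    NotUpload,
    Uploading,
}

/// One level of the segment map: file start key -> (file name, file end key).
type Level = BTreeMap<Vec<u8>, (String, Vec<u8>)>;

fn pick_in_level(
    level: &Level,
    status: &mut [Status],
    start_key: &[u8],
    end_key: &[u8],
) -> Vec<(String, usize)> {
    let mut picked = Vec::new();
    for (idx, (sk, (name, ek))) in level.iter().enumerate() {
        // Skip files already claimed by another request.
        if status[idx] != Status::NotUpload {
            continue;
        }
        // [sk, ek) overlaps [start_key, end_key); an empty key means unbounded.
        let overlaps = (end_key.is_empty() || sk.as_slice() < end_key)
            && (ek.is_empty() || ek.as_slice() > start_key);
        if overlaps {
            status[idx] = Status::Uploading;
            picked.push((name.clone(), idx));
        }
    }
    picked
}

fn main() {
    let mut level = Level::new();
    level.insert(b"a".to_vec(), ("l0-1.sst".to_string(), b"c".to_vec()));
    level.insert(b"c".to_vec(), ("l0-2.sst".to_string(), b"e".to_vec()));
    let mut status = vec![Status::NotUpload; level.len()];
    // Both files intersect the requested range [b, d).
    let picked = pick_in_level(&level, &mut status, b"b", b"d");
    assert_eq!(picked.len(), 2);
    println!("{picked:?}");
}

Flipping the slot to Uploading before returning is what lets concurrent backup requests share one SegmentMapManager without uploading the same SST twice; release_index later moves the slots to Uploaded or back to NotUpload depending on the reported progress.
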
if let Err(e) = tx.unbounded_send(response) { error_unknown!(?e; "backup failed to send response"; @@ -1331,12 +1334,25 @@ impl Endpoint { } let start_key = brange.start_key.map_or_else(Vec::new, |k| k.into_encoded()); let end_key = brange.end_key.map_or_else(Vec::new, |k| k.into_encoded()); - let (d_ssts, w_ssts) = segment_manager.find_ssts(&start_key, &end_key); + let (d_ssts, w_ssts, ssts_cnt) = segment_manager.find_ssts(&start_key, &end_key); + info!("select {} ssts", ssts_cnt); + if ssts_cnt == 0 { + let mut resp = BackupResponse::new(); + let raw_start_key = convert_encoded_key_to_raw_key(start_key); + let raw_end_key = convert_encoded_key_to_raw_key(end_key); + resp.set_start_key(raw_start_key); + resp.set_end_key(raw_end_key); + if let Err(err) = resp_tx.unbounded_send(resp) { + warn!("failed to send response"; "err" => ?err) + } + continue; + } if let Err(err) = tx.send(SstSendInfo { file_names_d: d_ssts.clone(), file_names_w: w_ssts.clone(), start_key, end_key, + ssts_cnt, }).await { error_unknown!(%err; "error during backup"); segment_manager.release_index( diff --git a/components/backup/src/segment_manager.rs b/components/backup/src/segment_manager.rs index 4b55bd5f95d..bc7dc403558 100644 --- a/components/backup/src/segment_manager.rs +++ b/components/backup/src/segment_manager.rs @@ -67,16 +67,16 @@ impl SegmentMapManager { index } - pub fn find_ssts(&self, start_key: &Vec, end_key: &Vec) -> (Vec>, Vec>) { - let d = { + pub fn find_ssts(&self, start_key: &Vec, end_key: &Vec) -> (Vec>, Vec>, usize) { + let (d, d_cnt) = { let mut index_d = self.index_d.lock().unwrap(); find_ssts_internal(&mut index_d, &self.map.0, start_key, end_key) }; - let w = { + let (w, w_cnt) = { let mut index_w = self.index_w.lock().unwrap(); find_ssts_internal(&mut index_w, &self.map.1, start_key, end_key) }; - (d, w) + (d, w, d_cnt + w_cnt) } pub fn release_index( @@ -95,8 +95,9 @@ impl SegmentMapManager { } } -fn find_ssts_internal(index: &mut [Vec], map: &SegmentMap, start_key: &Vec, end_key: &Vec) -> Vec> { +fn find_ssts_internal(index: &mut [Vec], map: &SegmentMap, start_key: &Vec, end_key: &Vec) -> (Vec>, usize) { let mut res = Vec::new(); + let mut count = 0; for (level, tree) in map.iter().enumerate() { let lvl_index = &mut index[level]; let mut fs = Vec::new(); @@ -115,11 +116,12 @@ fn find_ssts_internal(index: &mut [Vec], map: &SegmentMap, start_key: } false }) { + count += 1; fs.push((f.1.file_name.clone(), f.1.idx)); } res.push(fs); } - res + (res, count) } fn release_index_internal(index: &mut [Vec], findex: Vec>, progress_l: usize, progress_f: usize) { @@ -150,8 +152,9 @@ mod tests { println!("{id}"); let sk = "1_1".as_bytes().to_vec(); let ek = "2_1".as_bytes().to_vec(); - let (d, _) = manager.find_ssts(&sk, &ek); + let (d, _, cnt) = manager.find_ssts(&sk, &ek); println!("{:?}", d); + assert!(cnt > 0); } fn generate_segment_map() -> SegmentMap { diff --git a/components/backup/src/utils.rs b/components/backup/src/utils.rs index 41af72e83d3..65334b5813d 100644 --- a/components/backup/src/utils.rs +++ b/components/backup/src/utils.rs @@ -265,6 +265,13 @@ impl KeyValueCodec { } } +pub fn convert_encoded_key_to_raw_key(ek: Vec) -> Vec { + if ek.is_empty() { + return ek + } + Key::from_encoded(ek).into_raw().unwrap() +} + #[cfg(test)] pub mod tests { use api_version::{KvFormat, RawValue}; From 2c0416b077cbdad3c80bd0df70cd139b33b6b516 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 20 Jun 2023 17:29:47 +0800 Subject: [PATCH 8/9] fmt Signed-off-by: Leavrth --- components/backup/src/endpoint.rs | 
116 ++++++++++++--------- components/backup/src/lib.rs | 2 +- components/backup/src/segment_manager.rs | 82 +++++++++------ components/backup/src/utils.rs | 2 +- components/engine_rocks/src/checkpoint.rs | 54 ++++++---- components/engine_traits/src/checkpoint.rs | 10 +- components/test_backup/src/lib.rs | 10 +- components/tikv_kv/src/btree_engine.rs | 2 +- components/tikv_kv/src/lib.rs | 4 +- components/tikv_kv/src/rocksdb_engine.rs | 3 +- src/server/gc_worker/gc_worker.rs | 2 +- 11 files changed, 176 insertions(+), 111 deletions(-) diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 012c201cc56..8049c59a88a 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -12,7 +12,10 @@ use std::{ use async_channel::SendError; use causal_ts::{CausalTsProvider, CausalTsProviderImpl}; use concurrency_manager::ConcurrencyManager; -use engine_traits::{name_to_cf, raw_ttl::ttl_current_ts, CfName, KvEngine, SstCompressionType, Checkpointer, CF_DEFAULT, CF_WRITE}; +use engine_traits::{ + name_to_cf, raw_ttl::ttl_current_ts, CfName, Checkpointer, KvEngine, SstCompressionType, + CF_DEFAULT, CF_WRITE, +}; use external_storage::{BackendConfig, HdfsConfig, UnpinReader}; use external_storage_export::{create_storage, ExternalStorage}; use futures::{channel::mpsc::*, executor::block_on}; @@ -50,7 +53,7 @@ use txn_types::{Key, Lock, TimeStamp}; use crate::{ metrics::*, softlimit::{CpuStatistics, SoftLimit, SoftLimitByCpu}, - utils::{ControlThreadPool, KeyValueCodec, convert_encoded_key_to_raw_key}, + utils::{convert_encoded_key_to_raw_key, ControlThreadPool, KeyValueCodec}, writer::{BackupWriterBuilder, CfNameWrap}, Error, *, }; @@ -315,7 +318,7 @@ struct SstSendInfo { ssts_cnt: usize, } -async fn save_sst_file_worker ( +async fn save_sst_file_worker( segment_manager: Arc, data_dir: PathBuf, rx: async_channel::Receiver, @@ -332,9 +335,14 @@ async fn save_sst_file_worker ( &data_dir, &msg.file_names_d, &msg.file_names_w, - &mut d_progress_l, &mut d_progress_f, - &mut w_progress_l, &mut w_progress_f, - storage.clone()).await { + &mut d_progress_l, + &mut d_progress_f, + &mut w_progress_l, + &mut w_progress_f, + storage.clone(), + ) + .await + { Err(e) => { error_unknown!(?e; "upload sst file failed"; "start_key" => &log_wrappers::Value::key(&msg.start_key), @@ -350,8 +358,12 @@ async fn save_sst_file_worker ( } segment_manager.release_index( - msg.file_names_d, d_progress_l, d_progress_f, - msg.file_names_w, w_progress_l, w_progress_f, + msg.file_names_d, + d_progress_l, + d_progress_f, + msg.file_names_w, + w_progress_l, + w_progress_f, ); let raw_start_key = convert_encoded_key_to_raw_key(msg.start_key.clone()); @@ -374,12 +386,28 @@ async fn upload_sst_file>( data_dir: P, ssts_d: &Vec>, ssts_w: &Vec>, - d_progress_l: &mut usize, d_progress_f: &mut usize, - w_progress_l: &mut usize, w_progress_f: &mut usize, + d_progress_l: &mut usize, + d_progress_f: &mut usize, + w_progress_l: &mut usize, + w_progress_f: &mut usize, storage: Arc, ) -> std::io::Result<()> { - upload_sst_file_internal(data_dir.as_ref(), ssts_d, d_progress_l, d_progress_f, storage.clone()).await?; - upload_sst_file_internal(data_dir.as_ref(), ssts_w, w_progress_l, w_progress_f, storage).await + upload_sst_file_internal( + data_dir.as_ref(), + ssts_d, + d_progress_l, + d_progress_f, + storage.clone(), + ) + .await?; + upload_sst_file_internal( + data_dir.as_ref(), + ssts_w, + w_progress_l, + w_progress_f, + storage, + ) + .await } async fn upload_sst_file_internal>( @@ -1230,23 
+1258,12 @@ impl Endpoint { let backend = Arc::::from(backend); let concurrency = self.config_manager.0.read().unwrap().num_threads; self.pool.borrow_mut().adjust_with(concurrency); - + match request.mode { - BackupMode::Scan => self.handle_scan_backup( - concurrency, - prs, - backend, - codec, - request, - resp, - ), - BackupMode::File => self.handle_file_backup( - concurrency, - prs, - backend, - request, - resp, - ), + BackupMode::Scan => { + self.handle_scan_backup(concurrency, prs, backend, codec, request, resp) + } + BackupMode::File => self.handle_file_backup(concurrency, prs, backend, request, resp), _ => error!("unknown backup mode"; "mode" => ?request.mode), }; } @@ -1291,16 +1308,13 @@ impl Endpoint { Some(manager) => manager, None => { let mut resp = BackupResponse::new(); - let err_msg = format!( - "ssts are not found, unique id: {:?}", - id - ); + let err_msg = format!("ssts are not found, unique id: {:?}", id); resp.set_error(crate::Error::Other(box_err!(err_msg)).into()); - if let Err(err) = resp_tx.unbounded_send(resp) { + if let Err(err) = resp_tx.unbounded_send(resp) { warn!("failed to send response"; "err" => ?err) } return; - }, + } }; let (tx, rx) = async_channel::bounded(1); @@ -1314,7 +1328,6 @@ impl Endpoint { )); } - let batch_size = self.config_manager.0.read().unwrap().batch_size; self.pool.borrow_mut().spawn(async move { loop { @@ -1334,7 +1347,8 @@ impl Endpoint { } let start_key = brange.start_key.map_or_else(Vec::new, |k| k.into_encoded()); let end_key = brange.end_key.map_or_else(Vec::new, |k| k.into_encoded()); - let (d_ssts, w_ssts, ssts_cnt) = segment_manager.find_ssts(&start_key, &end_key); + let (d_ssts, w_ssts, ssts_cnt) = + segment_manager.find_ssts(&start_key, &end_key); info!("select {} ssts", ssts_cnt); if ssts_cnt == 0 { let mut resp = BackupResponse::new(); @@ -1342,18 +1356,21 @@ impl Endpoint { let raw_end_key = convert_encoded_key_to_raw_key(end_key); resp.set_start_key(raw_start_key); resp.set_end_key(raw_end_key); - if let Err(err) = resp_tx.unbounded_send(resp) { + if let Err(err) = resp_tx.unbounded_send(resp) { warn!("failed to send response"; "err" => ?err) } continue; } - if let Err(err) = tx.send(SstSendInfo { - file_names_d: d_ssts.clone(), - file_names_w: w_ssts.clone(), - start_key, - end_key, - ssts_cnt, - }).await { + if let Err(err) = tx + .send(SstSendInfo { + file_names_d: d_ssts.clone(), + file_names_w: w_ssts.clone(), + start_key, + end_key, + ssts_cnt, + }) + .await + { error_unknown!(%err; "error during backup"); segment_manager.release_index( d_ssts, @@ -1366,13 +1383,12 @@ impl Endpoint { let mut resp = BackupResponse::new(); let err = Error::from(err); resp.set_error(err.into()); - if let Err(err) = resp_tx.unbounded_send(resp) { + if let Err(err) = resp_tx.unbounded_send(resp) { warn!("failed to send response"; "err" => ?err) } } } } - }) } @@ -1386,10 +1402,14 @@ impl Endpoint { info!("{}", s1); info!("{}", s2); - let id = self.segment_router.register(default_metadata.ssts, write_metadata.ssts); + let id = self + .segment_router + .register(default_metadata.ssts, write_metadata.ssts); let mut resp = PrepareResponse::new(); resp.set_unique_id(id); - resp.set_collect_file_count((default_metadata.file_count + write_metadata.file_count) as u64); + resp.set_collect_file_count( + (default_metadata.file_count + write_metadata.file_count) as u64, + ); resp.set_collect_file_size((default_metadata.file_size + write_metadata.file_size) as u64); if let Err(e) = tx.try_send(resp) { error_unknown!(?e; "[prepare] failed to send 
response"); diff --git a/components/backup/src/lib.rs b/components/backup/src/lib.rs index abf371c281c..8bd02e26dd6 100644 --- a/components/backup/src/lib.rs +++ b/components/backup/src/lib.rs @@ -8,11 +8,11 @@ extern crate tikv_alloc; mod endpoint; mod errors; mod metrics; +mod segment_manager; mod service; mod softlimit; mod utils; mod writer; -mod segment_manager; pub use endpoint::{backup_file_name, Endpoint, Operation, Task}; pub use errors::{Error, Result}; diff --git a/components/backup/src/segment_manager.rs b/components/backup/src/segment_manager.rs index bc7dc403558..6b640538cdf 100644 --- a/components/backup/src/segment_manager.rs +++ b/components/backup/src/segment_manager.rs @@ -1,12 +1,16 @@ // Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0. +use std::{ + collections::{BTreeMap, HashMap}, + sync::{Arc, Mutex}, +}; + use engine_traits::SstFileInfo; -use std::{collections::{HashMap, BTreeMap}, sync::{Arc, Mutex}}; enum SstStatus { NotUpload, Uploading, - Uploaded + Uploaded, } type SegmentMap = Vec, SstFileInfo>>; @@ -49,7 +53,7 @@ impl SegmentMapManager { pub fn register(d: SegmentMap, w: SegmentMap) -> (String, Self) { let id = uuid::Uuid::new_v4().to_string(); - + (id, Self::new(d, w)) } @@ -61,13 +65,17 @@ impl SegmentMapManager { info.idx = idx; lvl_idx.push(SstStatus::NotUpload); } - + index.push(lvl_idx); } index } - pub fn find_ssts(&self, start_key: &Vec, end_key: &Vec) -> (Vec>, Vec>, usize) { + pub fn find_ssts( + &self, + start_key: &Vec, + end_key: &Vec, + ) -> (Vec>, Vec>, usize) { let (d, d_cnt) = { let mut index_d = self.index_d.lock().unwrap(); find_ssts_internal(&mut index_d, &self.map.0, start_key, end_key) @@ -81,8 +89,12 @@ impl SegmentMapManager { pub fn release_index( &self, - d: Vec>, d_progress_l: usize, d_progress_f: usize, - w: Vec>, w_progress_l: usize, w_progress_f: usize, + d: Vec>, + d_progress_l: usize, + d_progress_f: usize, + w: Vec>, + w_progress_l: usize, + w_progress_f: usize, ) { { let mut index_d = self.index_d.lock().unwrap(); @@ -95,7 +107,12 @@ impl SegmentMapManager { } } -fn find_ssts_internal(index: &mut [Vec], map: &SegmentMap, start_key: &Vec, end_key: &Vec) -> (Vec>, usize) { +fn find_ssts_internal( + index: &mut [Vec], + map: &SegmentMap, + start_key: &Vec, + end_key: &Vec, +) -> (Vec>, usize) { let mut res = Vec::new(); let mut count = 0; for (level, tree) in map.iter().enumerate() { @@ -104,15 +121,14 @@ fn find_ssts_internal(index: &mut [Vec], map: &SegmentMap, start_key: for f in tree.iter().filter(|info| { let idx = info.1.idx; if !matches!(lvl_index[idx], SstStatus::NotUpload) { - return false + return false; } - let sk = info.0; let ek = &info.1.end_key; if (end_key.is_empty() || sk < end_key) && (ek.is_empty() || ek > start_key) { lvl_index[idx] = SstStatus::Uploading; - return true + return true; } false }) { @@ -124,14 +140,20 @@ fn find_ssts_internal(index: &mut [Vec], map: &SegmentMap, start_key: (res, count) } -fn release_index_internal(index: &mut [Vec], findex: Vec>, progress_l: usize, progress_f: usize) { +fn release_index_internal( + index: &mut [Vec], + findex: Vec>, + progress_l: usize, + progress_f: usize, +) { for (level, sst_index) in findex.iter().enumerate() { for (_, idx) in sst_index { - index[level][*idx] = if level < progress_l || (level == progress_l && *idx <= progress_f) { - SstStatus::Uploaded - } else { - SstStatus::NotUpload - }; + index[level][*idx] = + if level < progress_l || (level == progress_l && *idx <= progress_f) { + SstStatus::Uploaded + } else { + SstStatus::NotUpload + 
}; } } } @@ -148,7 +170,8 @@ mod tests { fn test() { use super::SegmentMapManager; - let (id, manager) = SegmentMapManager::register(generate_segment_map(), generate_segment_map()); + let (id, manager) = + SegmentMapManager::register(generate_segment_map(), generate_segment_map()); println!("{id}"); let sk = "1_1".as_bytes().to_vec(); let ek = "2_1".as_bytes().to_vec(); @@ -158,23 +181,22 @@ mod tests { } fn generate_segment_map() -> SegmentMap { - let mut map = vec![ - BTreeMap::new(), - BTreeMap::new(), - BTreeMap::new(), - ]; + let mut map = vec![BTreeMap::new(), BTreeMap::new(), BTreeMap::new()]; for (i, m) in map.iter_mut().enumerate() { for j in 0..3 { let sk = format!("{i}_{j}").into_bytes(); - let ek = format!("{}_{}", i, j+1).into_bytes(); - m.insert(sk, SstFileInfo { - end_key: ek, - file_name: String::from("/asdfg.sst"), - idx: 0, - }); + let ek = format!("{}_{}", i, j + 1).into_bytes(); + m.insert( + sk, + SstFileInfo { + end_key: ek, + file_name: String::from("/asdfg.sst"), + idx: 0, + }, + ); } } map } -} \ No newline at end of file +} diff --git a/components/backup/src/utils.rs b/components/backup/src/utils.rs index 65334b5813d..18c915bc29a 100644 --- a/components/backup/src/utils.rs +++ b/components/backup/src/utils.rs @@ -267,7 +267,7 @@ impl KeyValueCodec { pub fn convert_encoded_key_to_raw_key(ek: Vec) -> Vec { if ek.is_empty() { - return ek + return ek; } Key::from_encoded(ek).into_raw().unwrap() } diff --git a/components/engine_rocks/src/checkpoint.rs b/components/engine_rocks/src/checkpoint.rs index 1c9f741891c..ef18fc16e8d 100644 --- a/components/engine_rocks/src/checkpoint.rs +++ b/components/engine_rocks/src/checkpoint.rs @@ -1,21 +1,23 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::{path::Path, collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, path::Path, sync::Arc}; -use engine_traits::{Checkpointable, Checkpointer, Result, SstFileInfo, CfName, ColumnFamilyMetadata}; +use engine_traits::{ + CfName, Checkpointable, Checkpointer, ColumnFamilyMetadata, Result, SstFileInfo, +}; +use keys::{origin_key, validate_data_key}; use rocksdb::DB; -use keys::origin_key; -use crate::{r2e, RocksEngine, util}; +use crate::{r2e, util, RocksEngine}; impl Checkpointable for RocksEngine { type Checkpointer = RocksEngineCheckpointer; fn new_checkpointer(&self) -> Result { match self.as_inner().new_checkpointer() { - Ok(pointer) => Ok(RocksEngineCheckpointer{ + Ok(pointer) => Ok(RocksEngineCheckpointer { db: self.as_inner().clone(), - pointer + pointer, }), Err(e) => Err(r2e(e)), } @@ -32,7 +34,7 @@ impl Checkpointable for RocksEngine { pub struct RocksEngineCheckpointer { db: Arc, - pointer: rocksdb::Checkpointer + pointer: rocksdb::Checkpointer, } impl Checkpointer for RocksEngineCheckpointer { @@ -61,29 +63,45 @@ impl Checkpointer for RocksEngineCheckpointer { let files = level_metadata.get_files(); file_count += files.len(); for file in files { - file_size += file.get_size(); - let start_key = origin_key(file.get_smallestkey()).to_vec(); - let end_key = origin_key(file.get_largestkey()).to_vec(); - ssts.insert(start_key, SstFileInfo{ - file_name: file.get_name(), - end_key, - idx: 0, - }); - }; + if let Some((start_key, end_key)) = + origin_if_data_key(file.get_smallestkey(), file.get_largestkey()) + { + file_size += file.get_size(); + ssts.insert( + start_key, + SstFileInfo { + file_name: file.get_name(), + end_key, + idx: 0, + }, + ); + } + } lssts.push(ssts); } Ok(ColumnFamilyMetadata { file_count, file_size, - ssts: lssts + 
ssts: lssts, }) } } +fn origin_if_data_key(start_key: &[u8], end_key: &[u8]) -> Option<(Vec, Vec)> { + match (validate_data_key(start_key), validate_data_key(end_key)) { + (true, true) => Some((origin_key(start_key).to_vec(), origin_key(end_key).to_vec())), + (true, false) => Some((origin_key(start_key).to_vec(), keys::DATA_MAX_KEY.to_vec())), + (false, true) => Some((keys::DATA_MIN_KEY.to_vec(), origin_key(end_key).to_vec())), + (false, false) => None, + } +} + #[cfg(test)] mod tests { - use engine_traits::{Checkpointable, Checkpointer, Peekable, SyncMutable, ALL_CFS, CF_DEFAULT, MiscExt}; + use engine_traits::{ + Checkpointable, Checkpointer, MiscExt, Peekable, SyncMutable, ALL_CFS, CF_DEFAULT, + }; use tempfile::tempdir; use crate::util::new_engine; diff --git a/components/engine_traits/src/checkpoint.rs b/components/engine_traits/src/checkpoint.rs index ff243827977..b730f46b766 100644 --- a/components/engine_traits/src/checkpoint.rs +++ b/components/engine_traits/src/checkpoint.rs @@ -1,8 +1,8 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::{path::Path, collections::BTreeMap, fmt::Debug}; +use std::{collections::BTreeMap, fmt::Debug, path::Path}; -use crate::{Result, CfName}; +use crate::{CfName, Result}; pub trait Checkpointable { type Checkpointer: Checkpointer; @@ -27,7 +27,7 @@ pub trait Checkpointer { pub struct SstFileInfo { pub file_name: String, - //pub start_key: Key, + // pub start_key: Key, pub end_key: Vec, pub idx: usize, } @@ -46,7 +46,7 @@ impl Debug for ColumnFamilyMetadata { for (level, ssts) in self.ssts.iter().enumerate() { let mut ss = String::new(); - for SstFileInfo{file_name, ..} in ssts.values() { + for SstFileInfo { file_name, .. } in ssts.values() { let str = format!("name: {file_name}"); ss = ss + &str } @@ -58,4 +58,4 @@ impl Debug for ColumnFamilyMetadata { binding.finish() } -} \ No newline at end of file +} diff --git a/components/test_backup/src/lib.rs b/components/test_backup/src/lib.rs index d8a19c91efb..ae687af42ea 100644 --- a/components/test_backup/src/lib.rs +++ b/components/test_backup/src/lib.rs @@ -9,7 +9,7 @@ use std::{ }; use api_version::{dispatch_api_version, keyspace::KvPair, ApiV1, KvFormat, RawValue}; -use backup::{Task, Operation}; +use backup::{Operation, Task}; use collections::HashMap; use engine_traits::{CfName, IterOptions, CF_DEFAULT, CF_WRITE, DATA_KEY_PREFIX_LEN}; use external_storage_export::make_local_backend; @@ -302,7 +302,9 @@ impl TestSuite { let (tx, rx) = future_mpsc::unbounded(); for end in self.endpoints.values() { let (task, _) = Task::new(req.clone(), tx.clone()).unwrap(); - end.scheduler().schedule(Operation::BackupTask(task)).unwrap(); + end.scheduler() + .schedule(Operation::BackupTask(task)) + .unwrap(); } rx } @@ -325,7 +327,9 @@ impl TestSuite { let (tx, rx) = future_mpsc::unbounded(); for end in self.endpoints.values() { let (task, _) = Task::new(req.clone(), tx.clone()).unwrap(); - end.scheduler().schedule(Operation::BackupTask(task)).unwrap(); + end.scheduler() + .schedule(Operation::BackupTask(task)) + .unwrap(); } rx } diff --git a/components/tikv_kv/src/btree_engine.rs b/components/tikv_kv/src/btree_engine.rs index c2df90faab8..e74556dd37f 100644 --- a/components/tikv_kv/src/btree_engine.rs +++ b/components/tikv_kv/src/btree_engine.rs @@ -12,7 +12,7 @@ use std::{ }; use collections::HashMap; -use engine_panic::{PanicEngine, checkpoint::PanicCheckpointer}; +use engine_panic::{checkpoint::PanicCheckpointer, PanicEngine}; use engine_traits::{CfName, IterOptions, ReadOptions, 
CF_DEFAULT, CF_LOCK, CF_WRITE}; use futures::{future, stream, Future, Stream}; use kvproto::kvrpcpb::Context; diff --git a/components/tikv_kv/src/lib.rs b/components/tikv_kv/src/lib.rs index 539fef0cbdd..8ec713eb493 100644 --- a/components/tikv_kv/src/lib.rs +++ b/components/tikv_kv/src/lib.rs @@ -36,8 +36,8 @@ use std::{ use collections::HashMap; use engine_traits::{ - CfName, IterOptions, KvEngine as LocalEngine, Mutable, MvccProperties, ReadOptions, - TabletRegistry, WriteBatch, CF_DEFAULT, CF_LOCK, Checkpointer, + CfName, Checkpointer, IterOptions, KvEngine as LocalEngine, Mutable, MvccProperties, + ReadOptions, TabletRegistry, WriteBatch, CF_DEFAULT, CF_LOCK, }; use error_code::{self, ErrorCode, ErrorCodeExt}; use futures::{compat::Future01CompatExt, future::BoxFuture, prelude::*}; diff --git a/components/tikv_kv/src/rocksdb_engine.rs b/components/tikv_kv/src/rocksdb_engine.rs index df21dd5f3ea..aef34a71a75 100644 --- a/components/tikv_kv/src/rocksdb_engine.rs +++ b/components/tikv_kv/src/rocksdb_engine.rs @@ -14,7 +14,8 @@ use std::{ use collections::HashMap; pub use engine_rocks::RocksSnapshot; use engine_rocks::{ - get_env, RocksCfOptions, RocksDbOptions, RocksEngine as BaseRocksEngine, RocksEngineIterator, RocksEngineCheckpointer + get_env, RocksCfOptions, RocksDbOptions, RocksEngine as BaseRocksEngine, + RocksEngineCheckpointer, RocksEngineIterator, }; use engine_traits::{ CfName, Engines, IterOptions, Iterable, Iterator, KvEngine, Peekable, ReadOptions, diff --git a/src/server/gc_worker/gc_worker.rs b/src/server/gc_worker/gc_worker.rs index 5b235f1d4be..87d9b41a62d 100644 --- a/src/server/gc_worker/gc_worker.rs +++ b/src/server/gc_worker/gc_worker.rs @@ -1300,7 +1300,7 @@ pub mod test_gc_worker { use std::sync::{Arc, Mutex}; use collections::HashMap; - use engine_rocks::{RocksEngine, RocksSnapshot, RocksEngineCheckpointer}; + use engine_rocks::{RocksEngine, RocksEngineCheckpointer, RocksSnapshot}; use futures::Future; use kvproto::{ kvrpcpb::Context, From 675ad025460b42282e4bcbc2fb06e45385d84f23 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Sun, 25 Jun 2023 15:00:07 +0800 Subject: [PATCH 9/9] use std::fs::File to reduce CPU usage Signed-off-by: Leavrth --- components/backup/src/endpoint.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 8049c59a88a..7f3257ebaad 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -18,7 +18,7 @@ use engine_traits::{ }; use external_storage::{BackendConfig, HdfsConfig, UnpinReader}; use external_storage_export::{create_storage, ExternalStorage}; -use futures::{channel::mpsc::*, executor::block_on}; +use futures::{channel::mpsc::*, executor::block_on, io::AllowStdIo}; use kvproto::{ brpb::*, encryptionpb::EncryptionMethod, @@ -47,7 +47,6 @@ use tikv_util::{ worker::Runnable, }; use tokio::runtime::Runtime; -use tokio_util::compat::TokioAsyncReadCompatExt; use txn_types::{Key, Lock, TimeStamp}; use crate::{ @@ -315,7 +314,6 @@ struct SstSendInfo { file_names_w: Vec>, start_key: Vec, end_key: Vec, - ssts_cnt: usize, } async fn save_sst_file_worker( @@ -424,9 +422,10 @@ async fn upload_sst_file_internal>( None => file_name_relative, }; let file_name = data_dir.as_ref().join(file_name_relative); - let file = tokio::fs::File::open(file_name).await?; - let length = file.metadata().await?.len(); - let reader = UnpinReader(Box::new(file.compat())); + // Use tokio::fs::File with futures::compat consumes too much CPU. 
+        let file = std::fs::File::open(file_name)?;
+        let length = file.metadata()?.len();
+        let reader = UnpinReader(Box::new(AllowStdIo::new(file)));
         storage.write(file_name_relative, reader, length).await?;
         *progress_f += 1;
     }
@@ -1367,7 +1366,6 @@ impl Endpoint {
                            file_names_w: w_ssts.clone(),
                            start_key,
                            end_key,
-                            ssts_cnt,
                        })
                        .await
                    {
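
A note on this last commit: the CPU saving comes from reading each SST with a blocking std::fs::File wrapped in futures::io::AllowStdIo instead of tokio::fs::File plus the compat() adapter. Below is a minimal, self-contained sketch of that wrapping, assuming only the futures crate; the temporary file and the in-memory Cursor sink are hypothetical stand-ins for the UnpinReader and external storage writer used in the patch.

use std::{env, fs};

use futures::io::{copy, AllowStdIo, Cursor};

async fn upload_one(path: &std::path::Path) -> std::io::Result<u64> {
    let file = fs::File::open(path)?;
    let length = file.metadata()?.len();
    // AllowStdIo adapts a blocking Read into an AsyncRead; reads run inline on
    // the executor thread instead of going through tokio's file I/O machinery.
    let reader = AllowStdIo::new(file);
    // Stand-in for the external storage writer: an in-memory buffer.
    let mut sink = Cursor::new(Vec::new());
    let copied = copy(reader, &mut sink).await?;
    debug_assert_eq!(copied, length);
    Ok(copied)
}

fn main() -> std::io::Result<()> {
    let path = env::temp_dir().join("allow_stdio_demo.sst");
    fs::write(&path, b"pretend this is an SST file")?;
    let copied = futures::executor::block_on(upload_one(&path))?;
    println!("uploaded {copied} bytes");
    Ok(())
}

The trade-off is that reads now block the worker thread while data is copied, which the commit accepts in exchange for dropping the tokio-util compat layer and its CPU overhead.
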