diff --git a/include/titan/db.h b/include/titan/db.h index 0100389e3..e7b4a0347 100644 --- a/include/titan/db.h +++ b/include/titan/db.h @@ -201,6 +201,8 @@ class TitanDB : public StackableDB { // "rocksdb.titandb.discardable_ratio_le100_file_num" - returns count of // file whose discardable ratio is less or equal to 100%. static const std::string kNumDiscardableRatioLE100File; + + static const std::string kNumHolePunchableBlocks; }; bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property, diff --git a/include/titan/options.h b/include/titan/options.h index b6e5bbffb..c2652bb80 100644 --- a/include/titan/options.h +++ b/include/titan/options.h @@ -161,6 +161,13 @@ struct TitanCFOptions : public ColumnFamilyOptions { // Default: false bool skip_value_in_compaction_filter{false}; + // If set true, Titan will use hole punching to release space of unrefed + // blobs. This feature is only available on Linux with file systems that + // support hole punching, such as ext4, xfs, btrfs, etc. + // + // Default: false + bool enable_punch_hole_gc{false}; + TitanCFOptions() = default; explicit TitanCFOptions(const ColumnFamilyOptions& options) : ColumnFamilyOptions(options) {} @@ -190,7 +197,8 @@ struct ImmutableTitanCFOptions { min_gc_batch_size(opts.min_gc_batch_size), merge_small_file_threshold(opts.merge_small_file_threshold), level_merge(opts.level_merge), - skip_value_in_compaction_filter(opts.skip_value_in_compaction_filter) {} + skip_value_in_compaction_filter(opts.skip_value_in_compaction_filter), + enable_punch_hole_gc(opts.enable_punch_hole_gc) {} uint64_t blob_file_target_size; @@ -205,6 +213,8 @@ struct ImmutableTitanCFOptions { bool level_merge; bool skip_value_in_compaction_filter; + + bool enable_punch_hole_gc; }; struct MutableTitanCFOptions { diff --git a/src/blob_aligned_blocks_collector.cc b/src/blob_aligned_blocks_collector.cc new file mode 100644 index 000000000..11f093d96 --- /dev/null +++ b/src/blob_aligned_blocks_collector.cc @@ -0,0 +1,87 @@ +#include "blob_aligned_blocks_collector.h" + +#include "base_db_listener.h" + +namespace rocksdb { +namespace titandb { + +TablePropertiesCollector* +BlobAlignedBlocksCollectorFactory::CreateTablePropertiesCollector( + rocksdb::TablePropertiesCollectorFactory::Context /* context */) { + return new BlobAlignedBlocksCollector(); +} + +const std::string BlobAlignedBlocksCollector::kPropertiesName = + "TitanDB.blob_aligned_blocks"; + +bool BlobAlignedBlocksCollector::Encode( + const std::map& aligned_blocks, std::string* result) { + PutVarint32(result, static_cast(aligned_blocks.size())); + for (const auto& f_blocks : aligned_blocks) { + PutVarint64(result, f_blocks.first); + PutVarint64(result, f_blocks.second); + } + return true; +} +bool BlobAlignedBlocksCollector::Decode( + Slice* slice, std::map* aligned_blocks) { + uint32_t num = 0; + if (!GetVarint32(slice, &num)) { + return false; + } + uint64_t file_number; + uint64_t size; + for (uint32_t i = 0; i < num; ++i) { + if (!GetVarint64(slice, &file_number)) { + return false; + } + if (!GetVarint64(slice, &size)) { + return false; + } + (*aligned_blocks)[file_number] = size; + } + return true; +} + +Status BlobAlignedBlocksCollector::AddUserKey(const Slice& /* key */, + const Slice& value, + EntryType type, + SequenceNumber /* seq */, + uint64_t /* file_size */) { + if (type != kEntryBlobIndex) { + return Status::OK(); + } + + Slice copy = value; + + BlobIndex index; + auto s = index.DecodeFrom(const_cast(©)); + if (!s.ok()) { + return s; + } + + auto iter = aligned_blocks_.find(index.file_number); + if (iter == aligned_blocks_.end()) { + aligned_blocks_[index.file_number] = index.blob_handle.size / 4096 + 1; + } else { + iter->second += index.blob_handle.size / 4096 + 1; + } + + return Status::OK(); +} + +Status BlobAlignedBlocksCollector::Finish(UserCollectedProperties* properties) { + if (aligned_blocks_.empty()) { + return Status::OK(); + } + + std::string res; + bool ok __attribute__((__unused__)) = Encode(aligned_blocks_, &res); + assert(ok); + assert(!res.empty()); + properties->emplace(std::make_pair(kPropertiesName, res)); + return Status::OK(); +} + +} // namespace titandb +} // namespace rocksdb diff --git a/src/blob_aligned_blocks_collector.h b/src/blob_aligned_blocks_collector.h new file mode 100644 index 000000000..d2b244ab0 --- /dev/null +++ b/src/blob_aligned_blocks_collector.h @@ -0,0 +1,56 @@ +#pragma once + +#include "rocksdb/listener.h" +#include "rocksdb/table_properties.h" +#include "util/coding.h" + +#include "blob_file_set.h" +#include "db_impl.h" + +// BlobAlignedBlocksCollector is a TablePropertiesCollector that collects +// the mapping from file number to the number of aligned blocks in the file. +// This information is used by punch hole GC. This is not the same as the +// live_data_size. Because, to use punch hole GC, blobs have to be aligned to +// the file system block size (so that the file is still parsable after holes +// are punched). This is basically live_data_size plus the size of all the +// padding bytes divided by the file system block size. + +namespace rocksdb { +namespace titandb { +class BlobAlignedBlocksCollectorFactory final + : public TablePropertiesCollectorFactory { + public: + TablePropertiesCollector* CreateTablePropertiesCollector( + TablePropertiesCollectorFactory::Context context) override; + + const char* Name() const override { return "BlobAlignedBlocksCollector"; } + + std::shared_ptr info_logger_; +}; + +class BlobAlignedBlocksCollector final : public TablePropertiesCollector { + public: + const static std::string kPropertiesName; + + static bool Encode(const std::map& aligned_blocks, + std::string* result); + static bool Decode(Slice* slice, + std::map* aligned_blocks); + + Status AddUserKey(const Slice& key, const Slice& value, EntryType type, + SequenceNumber seq, uint64_t file_size) override; + Status Finish(UserCollectedProperties* properties) override; + UserCollectedProperties GetReadableProperties() const override { + return UserCollectedProperties(); + } + const char* Name() const override { return "BlobAlignedBlocksCollector"; } + + BlobAlignedBlocksCollector() {} + + private: + std::map aligned_blocks_; + std::shared_ptr info_logger_; +}; + +} // namespace titandb +} // namespace rocksdb diff --git a/src/blob_file_builder.cc b/src/blob_file_builder.cc index d0070c7df..a5aecd24c 100644 --- a/src/blob_file_builder.cc +++ b/src/blob_file_builder.cc @@ -1,3 +1,5 @@ +#include "iostream" + #include "blob_file_builder.h" #include "table/block_based/block_based_table_reader.h" @@ -33,6 +35,8 @@ BlobFileBuilder::BlobFileBuilder(const TitanDBOptions& db_options, return; #endif } + // alignment_size_ = cf_options_.alignment_size; + alignment_size_ = cf_options.enable_punch_hole_gc ? 4 * 1024 : 0; WriteHeader(); } @@ -40,12 +44,15 @@ void BlobFileBuilder::WriteHeader() { BlobFileHeader header; header.version = blob_file_version_; if (cf_options_.blob_file_compression_options.max_dict_bytes > 0) { - assert(blob_file_version_ == BlobFileHeader::kVersion2); + assert(blob_file_version_ >= BlobFileHeader::kVersion2); header.flags |= BlobFileHeader::kHasUncompressionDictionary; } std::string buffer; header.EncodeTo(&buffer); status_ = file_->Append(buffer); + if (alignment_size_ > 0) { + FillBlockWithPadding(); + } } void BlobFileBuilder::Add(const BlobRecord& record, @@ -143,11 +150,39 @@ void BlobFileBuilder::WriteEncoderData(BlobHandle* handle) { handle->offset = file_->GetFileSize(); handle->size = encoder_.GetEncodedSize(); live_data_size_ += handle->size; + if (alignment_size_ > 0) { + live_blocks_ += handle->size / alignment_size_ + + (handle->size % alignment_size_ ? 1 : 0); + } status_ = file_->Append(encoder_.GetHeader()); if (ok()) { status_ = file_->Append(encoder_.GetRecord()); num_entries_++; + if (ok()) { + FillBlockWithPadding(); + } + } +} + +void BlobFileBuilder::FillBlockWithPadding() { + if (alignment_size_ == 0) { + return; + } + size_t padding = 0; + if (file_->GetFileSize() % alignment_size_ != 0) { + padding = alignment_size_ - file_->GetFileSize() % alignment_size_; + } + if (padding > 0) { + char buf[4096] = {0}; + while (padding > sizeof(buf)) { + status_ = file_->Append(Slice(buf, sizeof(buf))); + if (!ok()) { + return; + } + padding -= sizeof(buf); + } + status_ = file_->Append(Slice(buf, padding)); } } @@ -190,13 +225,14 @@ Status BlobFileBuilder::Finish(OutContexts* out_ctx) { BlobFileFooter footer; // if has compression dictionary, encode it into meta blocks if (cf_options_.blob_file_compression_options.max_dict_bytes > 0) { - assert(blob_file_version_ == BlobFileHeader::kVersion2); + assert(blob_file_version_ >= BlobFileHeader::kVersion2); BlockHandle meta_index_handle; MetaIndexBuilder meta_index_builder; WriteCompressionDictBlock(&meta_index_builder); WriteRawBlock(meta_index_builder.Finish(), &meta_index_handle); footer.meta_index_handle = meta_index_handle; } + footer.alignment_size = alignment_size_; std::string buffer; footer.EncodeTo(&buffer); diff --git a/src/blob_file_builder.h b/src/blob_file_builder.h index 62018ec5f..ba91e8996 100644 --- a/src/blob_file_builder.h +++ b/src/blob_file_builder.h @@ -69,7 +69,7 @@ class BlobFileBuilder { // caller to sync and close the file after calling Finish(). BlobFileBuilder(const TitanDBOptions& db_options, const TitanCFOptions& cf_options, WritableFileWriter* file, - uint32_t blob_file_version = BlobFileHeader::kVersion2); + uint32_t blob_file_version = BlobFileHeader::kVersion3); // Tries to add the record to the file // Notice: @@ -109,6 +109,8 @@ class BlobFileBuilder { const std::string& GetLargestKey() { return largest_key_; } uint64_t live_data_size() const { return live_data_size_; } + uint64_t live_blocks() const { return live_blocks_; } + uint64_t alignment_size() const { return alignment_size_; } private: BuilderState builder_state_; @@ -123,6 +125,7 @@ class BlobFileBuilder { void WriteCompressionDictBlock(MetaIndexBuilder* meta_index_builder); void FlushSampleRecords(OutContexts* out_ctx); void WriteEncoderData(BlobHandle* handle); + void FillBlockWithPadding(); TitanCFOptions cf_options_; WritableFileWriter* file_; @@ -142,6 +145,9 @@ class BlobFileBuilder { std::string smallest_key_; std::string largest_key_; uint64_t live_data_size_ = 0; + uint64_t live_blocks_ = 0; + + uint64_t alignment_size_ = 0; }; } // namespace titandb diff --git a/src/blob_file_iterator.cc b/src/blob_file_iterator.cc index 50af8c901..e05baa6cb 100644 --- a/src/blob_file_iterator.cc +++ b/src/blob_file_iterator.cc @@ -1,3 +1,5 @@ +#include "iostream" + #include "blob_file_iterator.h" #include "table/block_based/block_based_table_reader.h" @@ -54,6 +56,8 @@ bool BlobFileIterator::Init() { BlockBasedTable::kBlockTrailerSize); } + alignment_size_ = blob_file_footer.alignment_size; + if (blob_file_header.flags & BlobFileHeader::kHasUncompressionDictionary) { status_ = InitUncompressionDict(blob_file_footer, file_.get(), &uncompression_dict_, @@ -81,6 +85,9 @@ void BlobFileIterator::SeekToFirst() { if (!init_ && !Init()) return; status_ = Status::OK(); iterate_offset_ = header_size_; + if (alignment_size_ != 0) { + AdjustOffsetToNextAlignment(); + } PrefetchAndGet(); } @@ -109,7 +116,7 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) { uint64_t total_length = 0; FixedSlice header_buffer; iterate_offset_ = header_size_; - for (; iterate_offset_ < offset; iterate_offset_ += total_length) { + for (; iterate_offset_ < offset;) { // With for_compaction=true, rate_limiter is enabled. Since // BlobFileIterator is only used for GC, we always set for_compaction to // true. @@ -120,23 +127,53 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) { status_ = decoder_.DecodeHeader(&header_buffer); if (!status_.ok()) return; total_length = kRecordHeaderSize + decoder_.GetRecordSize(); + iterate_offset_ += total_length; + uint64_t padding = 0; + if (alignment_size_ != 0) { + padding = alignment_size_ - (iterate_offset_ % alignment_size_); + } + iterate_offset_ += padding; + total_length += padding; } if (iterate_offset_ > offset) iterate_offset_ -= total_length; valid_ = false; } -void BlobFileIterator::GetBlobRecord() { +void BlobFileIterator::AdjustOffsetToNextAlignment() { + if (alignment_size_ == 0) return; + uint64_t remainder = iterate_offset_ % alignment_size_; + if (remainder != 0) { + iterate_offset_ += alignment_size_ - remainder; + } +} + +bool BlobFileIterator::GetBlobRecord() { FixedSlice header_buffer; - // With for_compaction=true, rate_limiter is enabled. Since BlobFileIterator - // is only used for GC, we always set for_compaction to true. + // With for_compaction=true, rate_limiter is enabled. Since + // BlobFileIterator is only used for GC, we always set for_compaction to + // true. status_ = file_->Read(IOOptions(), iterate_offset_, kRecordHeaderSize, &header_buffer, header_buffer.get(), nullptr /*aligned_buf*/, true /*for_compaction*/); - if (!status_.ok()) return; - status_ = decoder_.DecodeHeader(&header_buffer); - if (!status_.ok()) return; + if (!status_.ok()) return false; + // If the header buffer is all zero, it means the record is deleted (punch + // hole). + bool deleted = true; + for (size_t i = 0; i < kRecordHeaderSize; i++) { + if (header_buffer[i] != 0) { + deleted = false; + break; + } + } + if (deleted) { + iterate_offset_ += alignment_size_; + AdjustOffsetToNextAlignment(); + return false; + } + status_ = decoder_.DecodeHeader(&header_buffer); + if (!status_.ok()) return false; Slice record_slice; auto record_size = decoder_.GetRecordSize(); buffer_.resize(record_size); @@ -150,44 +187,53 @@ void BlobFileIterator::GetBlobRecord() { decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_, titan_cf_options_.memory_allocator()); } - if (!status_.ok()) return; + if (!status_.ok()) return false; cur_record_offset_ = iterate_offset_; cur_record_size_ = kRecordHeaderSize + record_size; iterate_offset_ += cur_record_size_; + // align to next record + AdjustOffsetToNextAlignment(); valid_ = true; + return true; } void BlobFileIterator::PrefetchAndGet() { - if (iterate_offset_ >= end_of_blob_record_) { - valid_ = false; - return; - } + while (iterate_offset_ < end_of_blob_record_) { + // TODO: maybe reduce read ahead when encountering punch holes. e.g. just + // read header. + if (readahead_begin_offset_ > iterate_offset_ || + readahead_end_offset_ < iterate_offset_) { + // alignment + readahead_begin_offset_ = + iterate_offset_ - (iterate_offset_ & (kDefaultPageSize - 1)); + readahead_end_offset_ = readahead_begin_offset_; + readahead_size_ = kMinReadaheadSize; + } + auto min_blob_size = + iterate_offset_ + kRecordHeaderSize + titan_cf_options_.min_blob_size; + if (readahead_end_offset_ <= min_blob_size) { + while (readahead_end_offset_ + readahead_size_ <= min_blob_size && + readahead_size_ < kMaxReadaheadSize) + readahead_size_ <<= 1; + status_ = file_->Prefetch(readahead_end_offset_, readahead_size_); + if (!status_.ok()) return; + readahead_end_offset_ += readahead_size_; + readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1); + } - if (readahead_begin_offset_ > iterate_offset_ || - readahead_end_offset_ < iterate_offset_) { - // alignment - readahead_begin_offset_ = - iterate_offset_ - (iterate_offset_ & (kDefaultPageSize - 1)); - readahead_end_offset_ = readahead_begin_offset_; - readahead_size_ = kMinReadaheadSize; - } - auto min_blob_size = - iterate_offset_ + kRecordHeaderSize + titan_cf_options_.min_blob_size; - if (readahead_end_offset_ <= min_blob_size) { - while (readahead_end_offset_ + readahead_size_ <= min_blob_size && - readahead_size_ < kMaxReadaheadSize) - readahead_size_ <<= 1; - file_->Prefetch(readahead_end_offset_, readahead_size_); - readahead_end_offset_ += readahead_size_; - readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1); - } + bool live = GetBlobRecord(); - GetBlobRecord(); + if (readahead_end_offset_ < iterate_offset_) { + readahead_end_offset_ = iterate_offset_; + } - if (readahead_end_offset_ < iterate_offset_) { - readahead_end_offset_ = iterate_offset_; + // If the record is valid (not punch-holed), we can return. Otherwise, + // continue iterating until we find a valid record. + if (live) return; + iterate_offset_ += alignment_size_; } + valid_ = false; } BlobFileMergeIterator::BlobFileMergeIterator( diff --git a/src/blob_file_iterator.h b/src/blob_file_iterator.h index 2c62f3c1c..174db09be 100644 --- a/src/blob_file_iterator.h +++ b/src/blob_file_iterator.h @@ -35,6 +35,8 @@ class BlobFileIterator { Slice value() const; Status status() const { return status_; } uint64_t header_size() const { return header_size_; } + uint64_t file_number() const { return file_number_; } + uint64_t alginment_size() const { return alignment_size_; } void IterateForPrev(uint64_t); @@ -61,6 +63,8 @@ class BlobFileIterator { bool valid_{false}; std::unique_ptr uncompression_dict_; + uint64_t alignment_size_{0}; + BlobDecoder decoder_; uint64_t iterate_offset_{0}; @@ -76,7 +80,11 @@ class BlobFileIterator { uint64_t readahead_size_{kMinReadaheadSize}; void PrefetchAndGet(); - void GetBlobRecord(); + // Return whether the record at the current offset is valid or not (punch + // hole), if it is deleted, callers needs to move the offset to the next + // block. + bool GetBlobRecord(); + void AdjustOffsetToNextAlignment(); }; class BlobFileMergeIterator { diff --git a/src/blob_file_manager.h b/src/blob_file_manager.h index a216a164e..ad53f0519 100644 --- a/src/blob_file_manager.h +++ b/src/blob_file_manager.h @@ -72,6 +72,12 @@ class BlobFileManager { (void)handles; return Status::OK(); } + + virtual Status BatchUpdateFiles( + const std::vector>& files) { + (void)files; + return Status::OK(); + } }; } // namespace titandb diff --git a/src/blob_file_size_collector.cc b/src/blob_file_size_collector.cc index fa37897b6..4aea0b704 100644 --- a/src/blob_file_size_collector.cc +++ b/src/blob_file_size_collector.cc @@ -50,9 +50,12 @@ Status BlobFileSizeCollector::AddUserKey(const Slice& /* key */, if (type != kEntryBlobIndex) { return Status::OK(); } + // In case there are other collectors that need the original value. + // Make a copy of the value because BlobIndex::DecodeFrom will modify it. + Slice copy = value; BlobIndex index; - auto s = index.DecodeFrom(const_cast(&value)); + auto s = index.DecodeFrom(const_cast(©)); if (!s.ok()) { return s; } diff --git a/src/blob_format.cc b/src/blob_format.cc index 7cb5e4591..13d8fe7e5 100644 --- a/src/blob_format.cc +++ b/src/blob_format.cc @@ -1,3 +1,5 @@ +#include "iostream" + #include "blob_format.h" #include "test_util/sync_point.h" @@ -141,6 +143,9 @@ void BlobFileMeta::EncodeTo(std::string* dst) const { PutVarint32(dst, file_level_); PutLengthPrefixedSlice(dst, smallest_key_); PutLengthPrefixedSlice(dst, largest_key_); + PutVarint64(dst, alignment_size_); + PutVarint64(dst, live_blocks_); + PutVarint64(dst, hole_punchable_blocks_); } Status BlobFileMeta::DecodeFromLegacy(Slice* src) { @@ -152,6 +157,25 @@ Status BlobFileMeta::DecodeFromLegacy(Slice* src) { return Status::OK(); } +Status BlobFileMeta::DecodeFromV2(Slice* src) { + if (!GetVarint64(src, &file_number_) || !GetVarint64(src, &file_size_) || + !GetVarint64(src, &file_entries_) || !GetVarint32(src, &file_level_)) { + return Status::Corruption("BlobFileMeta decode failed"); + } + Slice str; + if (GetLengthPrefixedSlice(src, &str)) { + smallest_key_.assign(str.data(), str.size()); + } else { + return Status::Corruption("BlobSmallestKey Decode failed"); + } + if (GetLengthPrefixedSlice(src, &str)) { + largest_key_.assign(str.data(), str.size()); + } else { + return Status::Corruption("BlobLargestKey decode failed"); + } + return Status::OK(); +} + Status BlobFileMeta::DecodeFrom(Slice* src) { if (!GetVarint64(src, &file_number_) || !GetVarint64(src, &file_size_) || !GetVarint64(src, &file_entries_) || !GetVarint32(src, &file_level_)) { @@ -168,6 +192,14 @@ Status BlobFileMeta::DecodeFrom(Slice* src) { } else { return Status::Corruption("BlobLargestKey decode failed"); } + uint64_t alignment_size, live_blocks, hole_punchable_blocks; + if (!GetVarint64(src, &alignment_size) || !GetVarint64(src, &live_blocks) || + !GetVarint64(src, &hole_punchable_blocks)) { + return Status::Corruption("BlobFileMeta decode failed"); + } + alignment_size_ = alignment_size; + live_blocks_.store(live_blocks); + hole_punchable_blocks_.store(hole_punchable_blocks); return Status::OK(); } @@ -175,7 +207,11 @@ bool operator==(const BlobFileMeta& lhs, const BlobFileMeta& rhs) { return (lhs.file_number_ == rhs.file_number_ && lhs.file_size_ == rhs.file_size_ && lhs.file_entries_ == rhs.file_entries_ && - lhs.file_level_ == rhs.file_level_); + lhs.file_level_ == rhs.file_level_ && + lhs.alignment_size_ == rhs.alignment_size_ && + lhs.hole_punchable_blocks_.load() == + rhs.hole_punchable_blocks_.load() && + lhs.live_blocks_.load() == rhs.live_blocks_.load()); } void BlobFileMeta::FileStateTransit(const FileEvent& event) { @@ -234,6 +270,10 @@ void BlobFileMeta::FileStateTransit(const FileEvent& event) { assert(state_ == FileState::kNormal); state_ = FileState::kToMerge; break; + case FileEvent::kPunchHoleOutput: + assert(state_ == FileState::kBeingGC); + state_ = FileState::kPendingGC; + break; case FileEvent::kReset: state_ = FileState::kNormal; break; @@ -265,8 +305,11 @@ TitanInternalStats::StatsType BlobFileMeta::GetDiscardableRatioLevel() const { } void BlobFileMeta::Dump(bool with_keys) const { - fprintf(stdout, "file %" PRIu64 ", size %" PRIu64 ", level %" PRIu32, - file_number_, file_size_, file_level_); + fprintf(stdout, + "file %" PRIu64 ", size %" PRIu64 ", level %" PRIu32 + "live blocks %" PRIu64 ", hole punchable blocks %" PRIu64, + file_number_, file_size_, file_level_, live_blocks_.load(), + hole_punchable_blocks_.load()); if (with_keys) { fprintf(stdout, ", smallest key: %s, largest key: %s", Slice(smallest_key_).ToString(true /*hex*/).c_str(), @@ -279,7 +322,7 @@ void BlobFileHeader::EncodeTo(std::string* dst) const { PutFixed32(dst, kHeaderMagicNumber); PutFixed32(dst, version); - if (version == BlobFileHeader::kVersion2) { + if (version >= BlobFileHeader::kVersion2) { PutFixed32(dst, flags); } } @@ -291,7 +334,7 @@ Status BlobFileHeader::DecodeFrom(Slice* src) { "Blob file header magic number missing or mismatched."); } if (!GetFixed32(src, &version) || - (version != kVersion1 && version != kVersion2)) { + (version != kVersion1 && version != kVersion2 && version != kVersion3)) { return Status::Corruption("Blob file header version missing or invalid."); } if (version == BlobFileHeader::kVersion2) { @@ -305,6 +348,7 @@ Status BlobFileHeader::DecodeFrom(Slice* src) { void BlobFileFooter::EncodeTo(std::string* dst) const { auto size = dst->size(); + PutFixed64(dst, alignment_size); meta_index_handle.EncodeTo(dst); // Add padding to make a fixed size footer. dst->resize(size + kEncodedLength - 12); @@ -315,6 +359,17 @@ void BlobFileFooter::EncodeTo(std::string* dst) const { Status BlobFileFooter::DecodeFrom(Slice* src) { auto data = src->data(); + if (version == BlobFileHeader::kVersion3) { + if (!GetFixed64(src, &alignment_size)) { + return Status::Corruption("BlobFileFooter", "alignment size"); + } + } else { + // src's size is kEncodedLength regardless of version. If version is not 3, + // the first 8 bytes should be ignored. + src->remove_prefix(8); + // Update the footer's offset. + data = src->data(); + } Status s = meta_index_handle.DecodeFrom(src); if (!s.ok()) { return Status::Corruption("BlobFileFooter", s.ToString()); diff --git a/src/blob_format.h b/src/blob_format.h index 3e3d23d8c..3277f6bc3 100644 --- a/src/blob_format.h +++ b/src/blob_format.h @@ -37,7 +37,7 @@ namespace titandb { // const uint64_t kBlobMaxHeaderSize = 12; const uint64_t kRecordHeaderSize = 9; -const uint64_t kBlobFooterSize = BlockHandle::kMaxEncodedLength + 8 + 4; +const uint64_t kBlobFooterSize = 8 + BlockHandle::kMaxEncodedLength + 8 + 4; // Format of blob record (not fixed size): // @@ -209,6 +209,7 @@ class BlobFileMeta { kFlushOrCompactionOutput, kDelete, kNeedMerge, + kPunchHoleOutput, kReset, // reset file to normal for test }; @@ -228,19 +229,30 @@ class BlobFileMeta { BlobFileMeta(uint64_t _file_number, uint64_t _file_size, uint64_t _file_entries, uint32_t _file_level, const std::string& _smallest_key, - const std::string& _largest_key) + const std::string& _largest_key, uint64_t _alignment_size = 0, + uint64_t _live_blocks = 0) : file_number_(_file_number), file_size_(_file_size), file_entries_(_file_entries), file_level_(_file_level), smallest_key_(_smallest_key), - largest_key_(_largest_key) {} + largest_key_(_largest_key), + alignment_size_(_alignment_size), + live_blocks_(_live_blocks), + hole_punchable_blocks_(0) {} friend bool operator==(const BlobFileMeta& lhs, const BlobFileMeta& rhs); void EncodeTo(std::string* dst) const; Status DecodeFrom(Slice* src); Status DecodeFromLegacy(Slice* src); + Status DecodeFromV2(Slice* src); + + void set_live_data_size(uint64_t size) { live_data_size_ = size; } + void set_live_blocks(uint64_t size) { live_blocks_ = size; } + void set_hole_punchable_blocks(uint64_t size) { + hole_punchable_blocks_ = size; + } uint64_t file_number() const { return file_number_; } uint64_t file_size() const { return file_size_; } @@ -248,14 +260,20 @@ class BlobFileMeta { uint32_t file_level() const { return file_level_; } const std::string& smallest_key() const { return smallest_key_; } const std::string& largest_key() const { return largest_key_; } + uint64_t live_blocks() const { return live_blocks_; } + uint64_t hole_punchable_blocks() const { return hole_punchable_blocks_; } + + uint64_t alignment_size() const { return alignment_size_; } - void set_live_data_size(int64_t size) { live_data_size_ = size; } uint64_t file_entries() const { return file_entries_; } FileState file_state() const { return state_; } bool is_obsolete() const { return state_ == FileState::kObsolete; } void FileStateTransit(const FileEvent& event); void UpdateLiveDataSize(int64_t delta) { live_data_size_ += delta; } + void UpdateHolePunchableBlocks(int64_t delta) { + hole_punchable_blocks_ += delta; + } bool NoLiveData() { if (state_ == FileState::kPendingInit || state_ == FileState::kNone) { // File is not initialized yet, so the live_data_size is not accurate now. @@ -269,9 +287,28 @@ class BlobFileMeta { return 0; } // TODO: Exclude meta blocks from file size + if (alignment_size_ > 0) { + return 1 - + std::min( + 1.0, + static_cast(live_blocks_ - hole_punchable_blocks_) * + 1024 * 4 / + (file_size_ - kBlobMaxHeaderSize - kBlobFooterSize)); + } return 1 - (static_cast(live_data_size_) / (file_size_ - kBlobMaxHeaderSize - kBlobFooterSize)); } + + double GetPunchHoleScore() const { + // Only hole-punch a file if we can at least reclaim 256 blocks and + // the remaining live data is more than 20% of the file size. + if (hole_punchable_blocks_ > 256) { + return static_cast(hole_punchable_blocks_) * 1024 * 4 / + (file_size_ - kBlobMaxHeaderSize - kBlobFooterSize); + } + return 0.0; + } + TitanInternalStats::StatsType GetDiscardableRatioLevel() const; void Dump(bool with_keys) const; @@ -293,14 +330,18 @@ class BlobFileMeta { // Size of data with reference from SST files. // // Because the new generated SST is added to superversion before - // `OnFlushCompleted()`/`OnCompactionCompleted()` is called, so if there is a - // later compaction trigger by the new generated SST, the later + // `OnFlushCompleted()`/`OnCompactionCompleted()` is called, so if there is + // a later compaction trigger by the new generated SST, the later // `OnCompactionCompleted()` maybe called before the previous events' // `OnFlushCompleted()`/`OnCompactionCompleted()` is called. // So when state_ == kPendingLSM, it uses this to record the delta as a // positive number if any later compaction is trigger before previous // `OnCompactionCompleted()` is called. std::atomic live_data_size_{0}; + + uint64_t alignment_size_{0}; + std::atomic live_blocks_{0}; + std::atomic hole_punchable_blocks_{0}; std::atomic state_{FileState::kNone}; }; @@ -320,13 +361,15 @@ class BlobFileMeta { // | Fixed32 | Fixed32 | Fixed32 | // +--------------+---------+---------+ // -// The header is mean to be compatible with header of BlobDB blob files, except -// we use a different magic number. +// The header is mean to be compatible with header of BlobDB blob files, +// except we use a different magic number. struct BlobFileHeader { // The first 32bits from $(echo titandb/blob | sha1sum). static const uint32_t kHeaderMagicNumber = 0x2be0a614ul; static const uint32_t kVersion1 = 1; static const uint32_t kVersion2 = 2; + // Introducing alignment size in version 3. + static const uint32_t kVersion3 = 3; static const uint64_t kMinEncodedLength = 4 + 4; static const uint64_t kMaxEncodedLength = 4 + 4 + 4; @@ -334,11 +377,12 @@ struct BlobFileHeader { // Flags: static const uint32_t kHasUncompressionDictionary = 1 << 0; - uint32_t version = kVersion2; + uint32_t version = kVersion3; uint32_t flags = 0; static Status ValidateVersion(uint32_t ver) { - if (ver != BlobFileHeader::kVersion1 && ver != BlobFileHeader::kVersion2) { + if (ver != BlobFileHeader::kVersion1 && ver != BlobFileHeader::kVersion2 && + ver != BlobFileHeader::kVersion3) { return Status::InvalidArgument("unrecognized blob file version " + ToString(ver)); } @@ -355,22 +399,32 @@ struct BlobFileHeader { Status DecodeFrom(Slice* src); }; -// Format of blob file footer (BlockHandle::kMaxEncodedLength + 12): +// Format of blob file footer V3 (BlockHandle::kMaxEncodedLength + 20): +// +// +------------------+---------------------+-------------+ +// | alignment size | meta index handle | padding | +// +------------------+---------------------+-------------+ +// | Fixed64 | Varint64 + Varint64 | padding_len | +// +------------------+---------------------+-------------+ // -// +---------------------+-------------+--------------+----------+ -// | meta index handle | padding | magic number | checksum | -// +---------------------+-------------+--------------+----------+ -// | Varint64 + Varint64 | padding_len | Fixed64 | Fixed32 | -// +---------------------+-------------+--------------+----------+ +// +--------------+----------+ +// | magic number | checksum | +// +--------------+----------+ +// | Fixed64 | Fixed32 | +// +--------------+----------+ // -// To make the blob file footer fixed size, -// the padding_len is `BlockHandle::kMaxEncodedLength - meta_handle_len` +// To make the blob file footer fixed size, the padding_len is calculated as: +// `BlockHandle::kMaxEncodedLength - meta_handle_len - sizeof(uint64_t)` struct BlobFileFooter { // The first 64bits from $(echo titandb/blob | sha1sum). static const uint64_t kFooterMagicNumber{0x2be0a6148e39edc6ull}; static const uint64_t kEncodedLength{kBlobFooterSize}; BlockHandle meta_index_handle{BlockHandle::NullBlockHandle()}; + uint64_t alignment_size{0}; + + // Non-persistent field. + uint32_t version = BlobFileHeader::kVersion3; void EncodeTo(std::string* dst) const; Status DecodeFrom(Slice* src); diff --git a/src/blob_format_test.cc b/src/blob_format_test.cc index 74d4187b8..773667a26 100644 --- a/src/blob_format_test.cc +++ b/src/blob_format_test.cc @@ -36,7 +36,7 @@ TEST(BlobFormatTest, BlobIndex) { } TEST(BlobFormatTest, BlobFileMeta) { - BlobFileMeta input(2, 3, 0, 0, "0", "9"); + BlobFileMeta input(2, 3, 0, 0, "0", "9", 0, 0); CheckCodec(input); } diff --git a/src/blob_gc.cc b/src/blob_gc.cc index 9fe6cd2d6..f1ece3c01 100644 --- a/src/blob_gc.cc +++ b/src/blob_gc.cc @@ -4,14 +4,22 @@ namespace rocksdb { namespace titandb { BlobGC::BlobGC(std::vector>&& blob_files, - TitanCFOptions&& _titan_cf_options, bool need_trigger_next) + TitanCFOptions&& _titan_cf_options, bool need_trigger_next, + uint64_t cf_id, bool punch_hole) : inputs_(blob_files), titan_cf_options_(std::move(_titan_cf_options)), - trigger_next_(need_trigger_next) { + trigger_next_(need_trigger_next), + cf_id_(cf_id), + use_punch_hole_(punch_hole) { MarkFilesBeingGC(); } -BlobGC::~BlobGC() {} +BlobGC::~BlobGC() { + // Release snapshot requires db pointer, so we can't release it internally. + // In case the caller forgets to release the snapshot, we assert here, prefer + // to crash in the runtime than leak. + assert(snapshot_ == nullptr); +} void BlobGC::SetColumnFamily(ColumnFamilyHandle* cfh) { cfh_ = cfh; } @@ -40,5 +48,12 @@ void BlobGC::ReleaseGcFiles() { } } +void BlobGC::ReleaseSnapshot(DB* db) { + if (snapshot_ != nullptr) { + db->ReleaseSnapshot(snapshot_); + snapshot_ = nullptr; + } +} + } // namespace titandb } // namespace rocksdb diff --git a/src/blob_gc.h b/src/blob_gc.h index 5ce1998f6..adf93f4db 100644 --- a/src/blob_gc.h +++ b/src/blob_gc.h @@ -14,7 +14,8 @@ namespace titandb { class BlobGC { public: BlobGC(std::vector>&& blob_files, - TitanCFOptions&& _titan_cf_options, bool need_trigger_next); + TitanCFOptions&& _titan_cf_options, bool need_trigger_next, + uint64_t cf_id, bool punch_hole = false); // No copying allowed BlobGC(const BlobGC&) = delete; @@ -38,15 +39,30 @@ class BlobGC { void ReleaseGcFiles(); + uint64_t cf_id() { return cf_id_; } + + const Snapshot* snapshot() { + assert(use_punch_hole_); + assert(snapshot_ != nullptr); + return snapshot_; + } + void SetSnapshot(const Snapshot* snapshot) { snapshot_ = snapshot; } + void ReleaseSnapshot(DB* db); + + bool use_punch_hole() { return use_punch_hole_; } + bool trigger_next() { return trigger_next_; } private: std::vector> inputs_; std::vector outputs_; TitanCFOptions titan_cf_options_; + const bool trigger_next_; + uint64_t cf_id_; ColumnFamilyHandle* cfh_{nullptr}; // Whether need to trigger gc after this gc or not - const bool trigger_next_; + bool use_punch_hole_; + const Snapshot* snapshot_{nullptr}; }; struct GCScore { diff --git a/src/blob_gc_job.cc b/src/blob_gc_job.cc index d1b89a484..d915444e0 100644 --- a/src/blob_gc_job.cc +++ b/src/blob_gc_job.cc @@ -3,8 +3,9 @@ #endif #include "blob_gc_job.h" +#include +#include #include - #include #include "titan_logging.h" @@ -140,13 +141,95 @@ Status BlobGCJob::Run() { } tmp.append(std::to_string(f->file_number())); } - TITAN_LOG_BUFFER(log_buffer_, "[%s] Titan GC candidates[%s]", + TITAN_LOG_BUFFER(log_buffer_, + "[%s] Titan GC inputs: [%s], use punch hole: %s", blob_gc_->column_family_handle()->GetName().c_str(), - tmp.c_str()); - return DoRunGC(); + tmp.c_str(), blob_gc_->use_punch_hole() ? "true" : "false"); + + if (blob_gc_->use_punch_hole()) { + return HolePunchBlobFiles(); + } else { + return RewriteBlobFiles(); + } +} + +Status BlobGCJob::HolePunchBlobFiles() { + for (const auto& file : blob_gc_->inputs()) { + if (IsShutingDown()) { + return Status::ShutdownInProgress(); + } + Status s = HolePunchSingleBlobFile(file); + if (!s.ok()) { + TITAN_LOG_ERROR(db_options_.info_log, + "Hole punch file %" PRIu64 " failed: %s", + file->file_number(), s.ToString().c_str()); + + return s; + } + } + return Status::OK(); } -Status BlobGCJob::DoRunGC() { +Status BlobGCJob::HolePunchSingleBlobFile(std::shared_ptr file) { + Status s; + auto fd = open(BlobFileName(db_options_.dirname, file->file_number()).c_str(), + O_WRONLY); + std::unique_ptr file_reader; + s = NewBlobFileReader(file->file_number(), 0, db_options_, env_options_, env_, + &file_reader); + if (!s.ok()) { + return s; + } + uint64_t live_blocks = 0; + std::unique_ptr iter( + new BlobFileIterator(std::move(file_reader), file->file_number(), + file->file_size(), blob_gc_->titan_cf_options())); + iter->SeekToFirst(); + if (!iter->status().ok()) { + return iter->status(); + } + for (; iter->Valid(); iter->Next()) { + if (IsShutingDown()) { + return Status::ShutdownInProgress(); + } + BlobIndex blob_index = iter->GetBlobIndex(); + auto key = iter->key(); + bool discardable = false; + s = DiscardEntry(key, blob_index, blob_gc_->snapshot(), &discardable); + if (!s.ok()) { + return s; + } + if (!discardable) { + live_blocks += + (blob_index.blob_handle.size + file->alignment_size() - 1) / + file->alignment_size(); + continue; + } + +#if defined(FALLOC_FL_PUNCH_HOLE) && defined(FALLOC_FL_KEEP_SIZE) + auto num_blocks_aligned = + ((blob_index.blob_handle.size + file->alignment_size() - 1) / + file->alignment_size()); + // Hole punch the file at the blob_index.blob_handle.offset with + // blob_index.blob_handle.size aligned to alignment_size. + auto err = fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, + blob_index.blob_handle.offset, + num_blocks_aligned * file->alignment_size()); + if (err != 0) { + return Status::IOError("Hole punch failed", strerror(err)); + } +#else + return Status::NotSupported("Hole punch not supported"); +#endif + } + if (!iter->status().ok()) { + return iter->status(); + } + hole_punched_files_map_[file->file_number()] = live_blocks; + return Status::OK(); +} + +Status BlobGCJob::RewriteBlobFiles() { Status s; std::unique_ptr gc_iter; @@ -193,7 +276,7 @@ Status BlobGCJob::DoRunGC() { } bool discardable = false; - s = DiscardEntry(gc_iter->key(), blob_index, &discardable); + s = DiscardEntry(gc_iter->key(), blob_index, nullptr, &discardable); if (!s.ok()) { break; } @@ -332,9 +415,11 @@ Status BlobGCJob::BuildIterator( if (!s.ok()) { break; } - list.emplace_back(std::unique_ptr(new BlobFileIterator( - std::move(file), inputs[i]->file_number(), inputs[i]->file_size(), - blob_gc_->titan_cf_options()))); + auto blob_file_iter = + std::unique_ptr(new BlobFileIterator( + std::move(file), inputs[i]->file_number(), inputs[i]->file_size(), + blob_gc_->titan_cf_options())); + list.emplace_back(std::move(blob_file_iter)); } if (s.ok()) @@ -345,7 +430,7 @@ Status BlobGCJob::BuildIterator( } Status BlobGCJob::DiscardEntry(const Slice& key, const BlobIndex& blob_index, - bool* discardable) { + const Snapshot* snapshot, bool* discardable) { TitanStopWatch sw(env_, metrics_.gc_read_lsm_micros); assert(discardable != nullptr); PinnableSlice index_entry; @@ -354,7 +439,11 @@ Status BlobGCJob::DiscardEntry(const Slice& key, const BlobIndex& blob_index, gopts.column_family = blob_gc_->column_family_handle(); gopts.value = &index_entry; gopts.is_blob_index = &is_blob_index; - Status s = base_db_impl_->GetImpl(ReadOptions(), key, gopts); + auto read_opts = ReadOptions(); + if (snapshot != nullptr) { + read_opts.snapshot = snapshot; + } + Status s = base_db_impl_->GetImpl(read_opts, key, gopts); if (!s.ok() && !s.IsNotFound()) { return s; } @@ -382,7 +471,7 @@ Status BlobGCJob::DiscardEntry(const Slice& key, const BlobIndex& blob_index, // added to db before we rewrite any key to LSM Status BlobGCJob::Finish() { Status s; - { + if (!blob_gc_->use_punch_hole()) { mutex_->Unlock(); s = InstallOutputBlobFiles(); if (s.ok()) { @@ -401,14 +490,37 @@ Status BlobGCJob::Finish() { s.ToString().c_str()); } mutex_->Lock(); + if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) { + TEST_SYNC_POINT("BlobGCJob::Finish::BeforeDeleteInputBlobFiles"); + s = DeleteInputBlobFiles(); + } + TEST_SYNC_POINT("BlobGCJob::Finish::AfterRewriteValidKeyToLSM"); + } else { + // It is possible that while processing the GC job, the input blob files' + // liveness or number of hole punchable blocks have changed. So, we need to + // deal with the meta data update with mutex locked. + // TODO: test this case. + std::vector> hole_punched_files; + for (auto& file : blob_gc_->inputs()) { + if (file->is_obsolete()) { + continue; + } + auto it = hole_punched_files_map_.find(file->file_number()); + if (it == hole_punched_files_map_.end()) { + continue; + } + auto live_blocks = it->second; + auto hole_punched_blocks = file->live_blocks() - live_blocks; + file->set_live_blocks(live_blocks); + file->set_hole_punchable_blocks(file->hole_punchable_blocks() - + hole_punched_blocks); + file->FileStateTransit(BlobFileMeta::FileEvent::kPunchHoleOutput); + hole_punched_files.emplace_back(file); + } + if (!hole_punched_files.empty()) { + s = blob_file_manager_->BatchUpdateFiles(hole_punched_files); + } } - - if (s.ok() && !blob_gc_->GetColumnFamilyData()->IsDropped()) { - TEST_SYNC_POINT("BlobGCJob::Finish::BeforeDeleteInputBlobFiles"); - s = DeleteInputBlobFiles(); - } - TEST_SYNC_POINT("BlobGCJob::Finish::AfterRewriteValidKeyToLSM"); - if (s.ok()) { UpdateInternalOpStats(); } @@ -420,7 +532,8 @@ Status BlobGCJob::InstallOutputBlobFiles() { Status s; std::vector< std::pair, std::unique_ptr>> - files; + new_files; + std::vector> updated_files; std::string tmp; for (auto& builder : blob_file_builders_) { BlobFileBuilder::OutContexts contexts; @@ -433,7 +546,8 @@ Status BlobGCJob::InstallOutputBlobFiles() { auto file = std::make_shared( builder.first->GetNumber(), builder.first->GetFile()->GetFileSize(), 0, - 0, builder.second->GetSmallestKey(), builder.second->GetLargestKey()); + 0, builder.second->GetSmallestKey(), builder.second->GetLargestKey(), + builder.second->alignment_size(), builder.second->live_blocks()); file->set_live_data_size(builder.second->live_data_size()); file->FileStateTransit(BlobFileMeta::FileEvent::kGCOutput); RecordInHistogram(statistics(stats_), TITAN_GC_OUTPUT_FILE_SIZE, @@ -442,17 +556,19 @@ Status BlobGCJob::InstallOutputBlobFiles() { tmp.append(" "); } tmp.append(std::to_string(file->file_number())); - files.emplace_back(std::make_pair(file, std::move(builder.first))); + new_files.emplace_back(std::make_pair(file, std::move(builder.first))); } if (s.ok()) { - TITAN_LOG_BUFFER(log_buffer_, "[%s] output[%s]", - blob_gc_->column_family_handle()->GetName().c_str(), - tmp.c_str()); - s = blob_file_manager_->BatchFinishFiles( - blob_gc_->column_family_handle()->GetID(), files); - if (s.ok()) { - for (auto& file : files) { - blob_gc_->AddOutputFile(file.first.get()); + if (!new_files.empty()) { + TITAN_LOG_BUFFER(log_buffer_, "[%s] output[%s]", + blob_gc_->column_family_handle()->GetName().c_str(), + tmp.c_str()); + s = blob_file_manager_->BatchFinishFiles( + blob_gc_->column_family_handle()->GetID(), new_files); + if (s.ok()) { + for (auto& file : new_files) { + blob_gc_->AddOutputFile(file.first.get()); + } } } } else { diff --git a/src/blob_gc_job.h b/src/blob_gc_job.h index 0a986dcb0..c5c2516de 100644 --- a/src/blob_gc_job.h +++ b/src/blob_gc_job.h @@ -60,6 +60,9 @@ class BlobGCJob { std::vector> rewrite_batches_; + // Maps file number to live blocks. + std::unordered_map hole_punched_files_map_; + std::atomic_bool *shuting_down_{nullptr}; TitanStats *stats_; @@ -77,6 +80,8 @@ class BlobGCJob { uint64_t gc_num_files = 0; uint64_t gc_read_lsm_micros = 0; uint64_t gc_update_lsm_micros = 0; + uint64_t gc_punch_holes = 0; + uint64_t gc_punch_hole_bytes = 0; } metrics_; uint64_t prev_bytes_read_ = 0; @@ -84,11 +89,13 @@ class BlobGCJob { uint64_t io_bytes_read_ = 0; uint64_t io_bytes_written_ = 0; - Status DoRunGC(); + Status RewriteBlobFiles(); + Status HolePunchBlobFiles(); + Status HolePunchSingleBlobFile(std::shared_ptr file); void BatchWriteNewIndices(BlobFileBuilder::OutContexts &contexts, Status *s); Status BuildIterator(std::unique_ptr *result); Status DiscardEntry(const Slice &key, const BlobIndex &blob_index, - bool *discardable); + const Snapshot *snapshot, bool *discardable); Status InstallOutputBlobFiles(); Status RewriteValidKeyToLSM(); Status DeleteInputBlobFiles(); diff --git a/src/blob_gc_job_test.cc b/src/blob_gc_job_test.cc index 4e44bceeb..2c7879c2a 100644 --- a/src/blob_gc_job_test.cc +++ b/src/blob_gc_job_test.cc @@ -149,7 +149,8 @@ class BlobGCJobTest : public testing::Test { std::unique_ptr blob_gc; { std::shared_ptr blob_gc_picker = - std::make_shared(db_options, cf_options, nullptr); + std::make_shared(db_options, cf_options, 0, + nullptr); blob_gc = blob_gc_picker->PickBlobGC( blob_file_set_->GetBlobStorage(cfh->GetID()).lock().get()); } @@ -216,13 +217,13 @@ class BlobGCJobTest : public testing::Test { auto rewrite_status = base_db_->Write(WriteOptions(), &wb); std::vector> tmp; - BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/); + BlobGC blob_gc(std::move(tmp), TitanCFOptions(), false /*trigger_next*/, 0); blob_gc.SetColumnFamily(cfh); BlobGCJob blob_gc_job(&blob_gc, base_db_, mutex_, TitanDBOptions(), Env::Default(), EnvOptions(), nullptr, blob_file_set_, nullptr, nullptr, nullptr); bool discardable = false; - ASSERT_OK(blob_gc_job.DiscardEntry(key, blob_index, &discardable)); + ASSERT_OK(blob_gc_job.DiscardEntry(key, blob_index, nullptr, &discardable)); ASSERT_FALSE(discardable); } @@ -287,6 +288,85 @@ TEST_F(BlobGCJobTest, DiscardEntry) { TestDiscardEntry(); } TEST_F(BlobGCJobTest, RunGC) { TestRunGC(); } +TEST_F(BlobGCJobTest, PunchHole) { + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"BlobGCJobTest::PunchHole:AfterCompact", + "TitanDBImpl::BackgroundCallGC:BeforeGCRunning"}, + {"TitanDBImpl::BackgroundCallGC:AfterGCRunning", + "BlobGCJobTest::PunchHole:BeforeCheckPunchHoleGCIsQueued"}, + {"BlobGCJobTest::PunchHole:AfterReleaseSnapshot", + "TitanDBImpl::BackgroundCallGC:BeforeRunScheduledPunchHoleGC"}, + {"TitanDBImpl::BackgroundCallGC:AfterRunScheduledPunchHoleGC", + "BlobGCJobTest::PunchHole:BeforeCheckPunchHoleGCIsFinished"}}); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + DisableMergeSmall(); + options_.enable_punch_hole_gc = true; + options_.disable_background_gc = false; + options_.disable_auto_compactions = false; + + NewDB(); + auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock(); + std::vector values(MAX_KEY_NUM); + for (int i = 0; i < MAX_KEY_NUM; i++) { + values.push_back(GenValue(i)); + db_->Put(WriteOptions(), GenKey(i), values[i]); + } + Flush(); + std::map> files; + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + auto file_size = files.begin()->second.lock()->file_size(); + auto live_blocks = files.begin()->second.lock()->live_blocks(); + for (int i = 0; i < MAX_KEY_NUM; i++) { + if (i % 3 == 0) { + db_->Delete(WriteOptions(), GenKey(i)); + } + } + Flush(); + CompactAll(); + + files.clear(); + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files.begin()->second.lock()->hole_punchable_blocks(), 334); + ASSERT_EQ(files.begin()->second.lock()->live_blocks(), 1000); + + auto snapshot = db_->GetSnapshot(); + db_->Put(WriteOptions(), GenKey(100000), GenValue(1)); + + TEST_SYNC_POINT("BlobGCJobTest::PunchHole:AfterCompact"); + TEST_SYNC_POINT("BlobGCJobTest::PunchHole:BeforeCheckPunchHoleGCIsQueued"); + + files.clear(); + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files.begin()->second.lock()->hole_punchable_blocks(), 334); + ASSERT_EQ(files.begin()->second.lock()->live_blocks(), 1000); + + db_->ReleaseSnapshot(snapshot); + TEST_SYNC_POINT("BlobGCJobTest::PunchHole:AfterReleaseSnapshot"); + TEST_SYNC_POINT("BlobGCJobTest::PunchHole:BeforeCheckPunchHoleGCIsFinished"); + + files.clear(); + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + auto post_punch_hole_file_size = files.begin()->second.lock()->file_size(); + ASSERT_EQ(post_punch_hole_file_size, file_size); + ASSERT_EQ(files.begin()->second.lock()->live_blocks(), 666); + ASSERT_EQ(files.begin()->second.lock()->hole_punchable_blocks(), 0); + for (int i = 0; i < MAX_KEY_NUM; i++) { + if (i % 3 == 0) { + std::string value; + db_->Get(ReadOptions(), GenKey(i), &value); + ASSERT_EQ(value, values[i]); + } + } + options_.enable_punch_hole_gc = false; + options_.disable_background_gc = true; + options_.disable_auto_compactions = true; +} + TEST_F(BlobGCJobTest, GCLimiter) { class TestLimiter : public RateLimiter { public: diff --git a/src/blob_gc_picker.cc b/src/blob_gc_picker.cc index 2b102ca10..82d09abd6 100644 --- a/src/blob_gc_picker.cc +++ b/src/blob_gc_picker.cc @@ -12,14 +12,17 @@ namespace rocksdb { namespace titandb { BasicBlobGCPicker::BasicBlobGCPicker(TitanDBOptions db_options, - TitanCFOptions cf_options, + TitanCFOptions cf_options, uint32_t cf_id, TitanStats* stats) - : db_options_(db_options), cf_options_(cf_options), stats_(stats) {} + : db_options_(db_options), + cf_options_(cf_options), + cf_id_(cf_id), + stats_(stats) {} BasicBlobGCPicker::~BasicBlobGCPicker() {} -std::unique_ptr BasicBlobGCPicker::PickBlobGC( - BlobStorage* blob_storage) { +std::unique_ptr BasicBlobGCPicker::PickBlobGC(BlobStorage* blob_storage, + bool allow_punch_hole) { Status s; std::vector> blob_files; @@ -30,10 +33,41 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( uint64_t next_gc_size = 0; bool in_fallback = cf_options_.blob_run_mode == TitanBlobRunMode::kFallback; - for (auto& gc_score : blob_storage->gc_score()) { - if (gc_score.score < cf_options_.blob_file_discardable_ratio) { - break; + if (allow_punch_hole) { + for (auto& score : blob_storage->punch_hole_score()) { + if (score.score >= cf_options_.blob_file_discardable_ratio) { + break; + } + auto blob_file = blob_storage->FindFile(score.file_number).lock(); + if (!CheckBlobFile(blob_file.get())) { + // Skip this file id this file is being GCed + // or this file had + continue; + } + if (!stop_picking) { + blob_files.emplace_back(blob_file); + batch_size += blob_file->file_size(); + if (batch_size >= cf_options_.max_gc_batch_size) { + // Stop pick file for this gc, but still check file for whether need + // trigger gc after this + stop_picking = true; + } + } else { + maybe_continue_next_time = true; + break; + } + } + if (!blob_files.empty()) { + std::string all_candidates; + for (auto& blob_file : blob_files) { + all_candidates += std::to_string(blob_file->file_number()) + " "; + } + return std::unique_ptr( + new BlobGC(std::move(blob_files), std::move(cf_options_), + maybe_continue_next_time, cf_id_, /*punch_hole=*/true)); } + } + for (auto& gc_score : blob_storage->gc_score()) { // in fallback mode, only gc files that all blobs are discarded if (in_fallback && std::abs(1.0 - gc_score.score) > std::numeric_limits::epsilon()) { @@ -83,8 +117,8 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( if (blob_files.empty()) return nullptr; - // Skip these checks if in fallback mode, we need to gc all files in fallback - // mode + // Skip these checks if in fallback mode, we need to gc all files in + // fallback mode if (!in_fallback) { if (batch_size < cf_options_.min_gc_batch_size && estimate_output_size < cf_options_.blob_file_target_size) { @@ -99,8 +133,9 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( } } - return std::unique_ptr(new BlobGC( - std::move(blob_files), std::move(cf_options_), maybe_continue_next_time)); + return std::unique_ptr(new BlobGC(std::move(blob_files), + std::move(cf_options_), + maybe_continue_next_time, cf_id_)); } bool BasicBlobGCPicker::CheckBlobFile(BlobFileMeta* blob_file) const { diff --git a/src/blob_gc_picker.h b/src/blob_gc_picker.h index ca570872d..c0e4d379e 100644 --- a/src/blob_gc_picker.h +++ b/src/blob_gc_picker.h @@ -24,19 +24,22 @@ class BlobGCPicker { // Returns nullptr if there is no gc to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the gc. Caller should delete the result. - virtual std::unique_ptr PickBlobGC(BlobStorage* blob_storage) = 0; + virtual std::unique_ptr PickBlobGC(BlobStorage* blob_storage, + bool allow_punch_hole = true) = 0; }; class BasicBlobGCPicker final : public BlobGCPicker { public: - BasicBlobGCPicker(TitanDBOptions, TitanCFOptions, TitanStats*); + BasicBlobGCPicker(TitanDBOptions, TitanCFOptions, uint32_t, TitanStats*); ~BasicBlobGCPicker(); - std::unique_ptr PickBlobGC(BlobStorage* blob_storage) override; + std::unique_ptr PickBlobGC(BlobStorage* blob_storage, + bool allow_punch_hole = true) override; private: TitanDBOptions db_options_; TitanCFOptions cf_options_; + uint32_t cf_id_; TitanStats* stats_; // Check if blob_file needs to gc, return true means we need pick this diff --git a/src/blob_gc_picker_test.cc b/src/blob_gc_picker_test.cc index d13d57efd..0abcf6df4 100644 --- a/src/blob_gc_picker_test.cc +++ b/src/blob_gc_picker_test.cc @@ -26,7 +26,7 @@ class BlobGCPickerTest : public testing::Test { blob_storage_.reset(new BlobStorage(titan_db_options, titan_cf_options, 0, blob_file_cache, nullptr, nullptr)); basic_blob_gc_picker_.reset( - new BasicBlobGCPicker(titan_db_options, titan_cf_options, nullptr)); + new BasicBlobGCPicker(titan_db_options, titan_cf_options, 0, nullptr)); } void AddBlobFile(uint64_t file_number, uint64_t data_size, diff --git a/src/blob_storage.cc b/src/blob_storage.cc index cd4670dac..532d7af64 100644 --- a/src/blob_storage.cc +++ b/src/blob_storage.cc @@ -90,6 +90,37 @@ void BlobStorage::AddBlobFile(std::shared_ptr& file) { blob_ranges_.emplace(std::make_pair(Slice(file->smallest_key()), file)); } +void BlobStorage::HolePunchBlobFile(std::shared_ptr& file) { + MutexLock l(&mutex_); + // Update the file in files_ and blob_ranges_. + auto f_it = files_.find(file->file_number()); + if (f_it != files_.end()) { + assert(f_it->second.get() == file.get()); + } else { + TITAN_LOG_ERROR(db_options_.info_log, + "Hole punch blob file %" PRIu64 + " failed, file not found in BlobStorage.", + file->file_number()); + files_.emplace(std::make_pair(file->file_number(), file)); + } + bool found = false; + auto p = blob_ranges_.equal_range(file->smallest_key()); + for (auto it = p.first; it != p.second; it++) { + if (it->second->file_number() == file->file_number()) { + assert(it->second.get() == file.get()); + found = true; + break; + } + } + if (!found) { + TITAN_LOG_ERROR(db_options_.info_log, + "Hole punch blob file %" PRIu64 + " failed, file not found in BlobStorage.", + file->file_number()); + blob_ranges_.emplace(std::make_pair(Slice(file->smallest_key()), file)); + } +} + bool BlobStorage::MarkFileObsolete(uint64_t file_number, SequenceNumber obsolete_sequence) { MutexLock l(&mutex_); @@ -125,8 +156,16 @@ bool BlobStorage::RemoveFile(uint64_t file_number) { break; } } + auto removed_size = 0; + if (file->second->alignment_size() > 0) { + // +1 header block + auto num_blocks = file->second->live_blocks() + 1; + removed_size = num_blocks * file->second->alignment_size(); + } else { + removed_size = file->second->file_size(); + } SubStats(stats_, cf_id_, TitanInternalStats::OBSOLETE_BLOB_FILE_SIZE, - file->second->file_size()); + removed_size); SubStats(stats_, cf_id_, TitanInternalStats::NUM_OBSOLETE_BLOB_FILE, 1); files_.erase(file_number); file_cache_->Evict(file_number); @@ -178,7 +217,8 @@ void BlobStorage::UpdateStats() { levels_file_count_.clear(); levels_file_count_.assign(cf_options_.num_levels, 0); - uint64_t live_blob_file_size = 0, num_live_blob_file = 0; + uint64_t live_blob_file_size = 0, num_live_blob_file = 0, + num_hole_punchable_blocks = 0; uint64_t obsolete_blob_file_size = 0, num_obsolete_blob_file = 0; std::unordered_map ratio_levels; @@ -186,7 +226,13 @@ void BlobStorage::UpdateStats() { for (auto& file : files_) { if (file.second->is_obsolete()) { num_obsolete_blob_file += 1; - obsolete_blob_file_size += file.second->file_size(); + if (file.second->alignment_size() > 0) { + // +1 header block + auto num_blocks = file.second->live_blocks() + 1; + obsolete_blob_file_size += num_blocks * file.second->alignment_size(); + } else { + obsolete_blob_file_size += file.second->file_size(); + } continue; } num_live_blob_file += 1; @@ -194,9 +240,16 @@ void BlobStorage::UpdateStats() { // If the file is initialized yet, skip it if (file.second->file_state() != BlobFileMeta::FileState::kPendingInit) { - live_blob_file_size += file.second->file_size(); ratio_levels[static_cast(file.second->GetDiscardableRatioLevel())] += 1; + if (file.second->alignment_size() > 0) { + // +1 header block + auto num_blocks = file.second->live_blocks() + 1; + num_hole_punchable_blocks += file.second->hole_punchable_blocks(); + live_blob_file_size += num_blocks * file.second->alignment_size(); + } else { + live_blob_file_size += file.second->file_size(); + } } } @@ -214,6 +267,8 @@ void BlobStorage::UpdateStats() { SetStats(stats_, cf_id_, static_cast(i), ratio_levels[i]); } + SetStats(stats_, cf_id_, TitanInternalStats::NUM_HOLE_PUNCHABLE_BLOCKS, + num_hole_punchable_blocks); } void BlobStorage::ComputeGCScore() { UpdateStats(); @@ -223,6 +278,7 @@ void BlobStorage::ComputeGCScore() { MutexLock l(&mutex_); gc_score_.clear(); + punch_hole_score_.clear(); for (auto& file : files_) { if (file.second->is_obsolete()) { @@ -239,16 +295,32 @@ void BlobStorage::ComputeGCScore() { } else { score = file.second->GetDiscardableRatio(); } - gc_score_.emplace_back(GCScore{ - .file_number = file.first, - .score = score, - }); + if (score < cf_options_.blob_file_discardable_ratio && + cf_options_.enable_punch_hole_gc) { + auto punch_hole_score = file.second->GetPunchHoleScore(); + if (punch_hole_score > 0) { + GCScore gc_score = {}; + punch_hole_score_.emplace_back(GCScore{ + .file_number = file.first, + .score = punch_hole_score, + }); + } + } else if (score >= cf_options_.blob_file_discardable_ratio) { + gc_score_.emplace_back(GCScore{ + .file_number = file.first, + .score = score, + }); + } } std::sort(gc_score_.begin(), gc_score_.end(), [](const GCScore& first, const GCScore& second) { return first.score > second.score; }); + std::sort(punch_hole_score_.begin(), punch_hole_score_.end(), + [](const GCScore& first, const GCScore& second) { + return first.score > second.score; + }); } } // namespace titandb diff --git a/src/blob_storage.h b/src/blob_storage.h index 8231f57c9..dcdbe08fd 100644 --- a/src/blob_storage.h +++ b/src/blob_storage.h @@ -58,11 +58,18 @@ class BlobStorage { return _cf_options; } + // Only files with gc score larger than blob_file_discardable_ratio will be + // returned. const std::vector gc_score() { MutexLock l(&mutex_); return gc_score_; } + const std::vector punch_hole_score() { + MutexLock l(&mutex_); + return punch_hole_score_; + } + // Gets the blob record pointed by the blob index. The provided // buffer is used to store the record data, so the buffer must be // valid when the record is used. @@ -119,6 +126,8 @@ class BlobStorage { // Add a new blob file to this blob storage. void AddBlobFile(std::shared_ptr& file); + void HolePunchBlobFile(std::shared_ptr& file); + // Gets all obsolete blob files whose obsolete_sequence is smaller than the // oldest_sequence. Note that the files returned would be erased from internal // structure, so for the next call, the files returned before wouldn't be @@ -208,6 +217,7 @@ class BlobStorage { std::shared_ptr file_cache_; std::vector gc_score_; + std::vector punch_hole_score_; std::list> obsolete_files_; // It is marked when the column family handle is destroyed, indicating the diff --git a/src/db_impl.cc b/src/db_impl.cc index ff40c95ce..9d045fc5f 100644 --- a/src/db_impl.cc +++ b/src/db_impl.cc @@ -17,6 +17,7 @@ #include "util/threadpool_imp.h" #include "base_db_listener.h" +#include "blob_aligned_blocks_collector.h" #include "blob_file_builder.h" #include "blob_file_iterator.h" #include "blob_file_size_collector.h" @@ -122,6 +123,30 @@ class TitanDBImpl::FileManager : public BlobFileManager { return s; } + Status BatchUpdateFiles( + const std::vector>& files) override { + // Since files are being in-place updated, it has to make sure that the + // BlobFileMeta are not modified by compactions or activities other than + // punch hole GC, between the time BlobFileMeta are + db_->mutex_.AssertHeld(); + Status s = Status::OK(); + VersionEdit edit; + for (const auto& file : files) { + TITAN_LOG_INFO(db_->db_options_.info_log, + "Titan updating blob file [%" PRIu64 + "] live blocks: %" PRIu64 + ", hole punchable blocks :%" PRIu64 ".", + file->file_number(), file->live_blocks(), + file->hole_punchable_blocks()); + edit.UpdateBlobFile(file); + } + s = db_->blob_file_set_->LogAndApply(edit); + if (!s.ok()) { + db_->SetBGError(s); + } + return s; + } + private: class FileHandle : public BlobFileHandle { public: @@ -284,6 +309,8 @@ Status TitanDBImpl::OpenImpl(const std::vector& descs, cf_opts.disable_auto_compactions = true; cf_opts.table_properties_collector_factories.emplace_back( std::make_shared()); + cf_opts.table_properties_collector_factories.emplace_back( + std::make_shared()); titan_table_factories.push_back(std::make_shared( db_options_, desc.options, blob_manager_, &mutex_, blob_file_set_.get(), stats_.get())); @@ -460,6 +487,8 @@ Status TitanDBImpl::CreateColumnFamilies( options.table_factory = titan_table_factory.back(); options.table_properties_collector_factories.emplace_back( std::make_shared()); + options.table_properties_collector_factories.emplace_back( + std::make_shared()); if (options.compaction_filter != nullptr || options.compaction_filter_factory != nullptr) { std::shared_ptr titan_cf_factory = @@ -817,6 +846,24 @@ void TitanDBImpl::ReleaseSnapshot(const Snapshot* snapshot) { // TODO: // We can record here whether the oldest snapshot is released. // If not, we can just skip the next round of purging obsolete files. + { + MutexLock l(&mutex_); + if (scheduled_punch_hole_gc_ != nullptr && !punch_hole_gc_running_ && + scheduled_punch_hole_gc_->snapshot()->GetSequenceNumber() == + GetOldestSnapshotSequence() && + bg_gc_scheduled_ < db_options_.max_background_gc) { + if (db_options_.disable_background_gc) return; + + if (!initialized_.load(std::memory_order_acquire)) return; + + if (shuting_down_.load(std::memory_order_acquire)) return; + + TITAN_LOG_INFO(db_options_.info_log, + "Titan schedule punch hole GC after releasing snapshot"); + bg_gc_scheduled_++; + thread_pool_->SubmitJob(std::bind(&TitanDBImpl::BGWorkGC, this)); + } + } db_->ReleaseSnapshot(snapshot); } @@ -956,9 +1003,11 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family, auto cf_id = column_family->GetID(); std::map blob_file_size_diff; + std::map hole_punchable_blocks_diff; for (auto& prop : props) { Status gc_stats_status = ExtractGCStatsFromTableProperty( - prop.second, false /*to_add*/, &blob_file_size_diff); + prop.second, false /*to_add*/, &blob_file_size_diff, + &hole_punchable_blocks_diff); if (!gc_stats_status.ok()) { // TODO: Should treat it as background error and make DB read-only. TITAN_LOG_ERROR(db_options_.info_log, @@ -967,6 +1016,7 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family, assert(false); } } + bool has_hole_punchable_blocks_diff = !hole_punchable_blocks_diff.empty(); // Here could be a running compaction install a new version after obtain // current and before we call DeleteFilesInRange for the base DB. In this case @@ -993,12 +1043,16 @@ Status TitanDBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family, for (const auto& file_size : blob_file_size_diff) { uint64_t file_number = file_size.first; int64_t delta = file_size.second; + int64_t hole_punchable_blocks_delta = + has_hole_punchable_blocks_diff ? hole_punchable_blocks_diff[file_number] + : 0; auto file = bs->FindFile(file_number).lock(); if (!file || file->is_obsolete()) { // file has been gc out continue; } file->UpdateLiveDataSize(delta); + file->UpdateHolePunchableBlocks(hole_punchable_blocks_delta); if (file->file_state() == BlobFileMeta::FileState::kPendingInit) { // When uninitialized, only update the live data size. continue; @@ -1255,8 +1309,10 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Begin1"); TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Begin"); std::map blob_file_size_diff; + std::map hole_punchable_blocks_diff; Status s = ExtractGCStatsFromTableProperty( - flush_job_info.table_properties, true /*to_add*/, &blob_file_size_diff); + flush_job_info.table_properties, true /*to_add*/, &blob_file_size_diff, + &hole_punchable_blocks_diff); if (!s.ok()) { // TODO: Should treat it as background error and make DB read-only. TITAN_LOG_ERROR(db_options_.info_log, @@ -1331,6 +1387,7 @@ void TitanDBImpl::OnCompactionCompleted( return; } std::map blob_file_size_diff; + std::map hole_punchable_blocks_diff; const TablePropertiesCollection& prop_collection = compaction_job_info.table_properties; auto update_diff = [&](const std::vector& files, bool to_add) { @@ -1344,7 +1401,8 @@ void TitanDBImpl::OnCompactionCompleted( continue; } Status gc_stats_status = ExtractGCStatsFromTableProperty( - prop_iter->second, to_add, &blob_file_size_diff); + prop_iter->second, to_add, &blob_file_size_diff, + &hole_punchable_blocks_diff); if (!gc_stats_status.ok()) { // TODO: Should treat it as background error and make DB read-only. TITAN_LOG_ERROR(db_options_.info_log, @@ -1378,10 +1436,33 @@ void TitanDBImpl::OnCompactionCompleted( bool count_sorted_run = cf_options.level_merge && cf_options.range_merge && cf_options.num_levels - 1 == compaction_job_info.output_level; + bool has_live_blocks_diff = !hole_punchable_blocks_diff.empty(); + if (has_live_blocks_diff) { + TITAN_LOG_INFO(db_options_.info_log, + "OnCompactionCompleted[%d]: blob_file_size_diff.size=%zu, " + "hole_punchable_blocks_diff.size=%zu", + compaction_job_info.job_id, blob_file_size_diff.size(), + hole_punchable_blocks_diff.size()); + assert(hole_punchable_blocks_diff.size() == blob_file_size_diff.size()); + std::string debug; + for (const auto& file_diff : hole_punchable_blocks_diff) { + debug += "[" + std::to_string(file_diff.first) + ":" + + std::to_string(file_diff.second) + "]"; + } + TITAN_LOG_INFO(db_options_.info_log, + "OnCompactionCompleted[%d]: hole_punchable_blocks_diff=%s", + compaction_job_info.job_id, debug.c_str()); + } else { + TITAN_LOG_INFO(db_options_.info_log, + "OnCompactionCompleted[%d]: blob_file_size_diff.size=%zu", + compaction_job_info.job_id, blob_file_size_diff.size()); + } for (const auto& file_diff : blob_file_size_diff) { uint64_t file_number = file_diff.first; int64_t delta = file_diff.second; + int64_t hole_punchable_blocks_delta = + has_live_blocks_diff ? hole_punchable_blocks_diff[file_number] : 0; std::shared_ptr file = bs->FindFile(file_number).lock(); if (file == nullptr || file->is_obsolete()) { // File has been GC out. @@ -1391,6 +1472,7 @@ void TitanDBImpl::OnCompactionCompleted( if (file->file_state() == BlobFileMeta::FileState::kPendingInit) { // When uninitialized, only update the live data size. file->UpdateLiveDataSize(delta); + file->UpdateHolePunchableBlocks(hole_punchable_blocks_delta); continue; } @@ -1400,15 +1482,16 @@ void TitanDBImpl::OnCompactionCompleted( // there is a later compaction trigger by the new generated SST, the // later `OnCompactionCompleted()` maybe called before the previous // events' `OnFlushCompleted()`/`OnCompactionCompleted()` is called. - // In this case, the state of the blob file generated by the + // In this case, the state of the blob file generated by the previous // flush/compaction is still `kPendingLSM`, while the blob file size // delta is for the later compaction event, and it is possible that // delta is negative. // If the delta is positive, it means the blob file is the output of - // the compaction and the live data size is already in table builder. - // So here only update live data size when negative. + // the original flush/compaction and the live data size is already set + // by table builder. So here only update live data size when negative. if (delta < 0) { file->UpdateLiveDataSize(delta); + file->UpdateHolePunchableBlocks(hole_punchable_blocks_delta); } file->FileStateTransit(BlobFileMeta::FileEvent::kCompactionCompleted); if (file->NoLiveData()) { @@ -1434,6 +1517,7 @@ void TitanDBImpl::OnCompactionCompleted( compaction_job_info.job_id, file_number); } file->UpdateLiveDataSize(delta); + file->UpdateHolePunchableBlocks(hole_punchable_blocks_delta); if (cf_options.level_merge) { // After level merge, most entries of merged blob files are written // to new blob files. Delete blob files which have no live data. diff --git a/src/db_impl.h b/src/db_impl.h index 1a8bddbe6..2616e2ed8 100644 --- a/src/db_impl.h +++ b/src/db_impl.h @@ -260,16 +260,17 @@ class TitanDBImpl : public TitanDB { Status ExtractGCStatsFromTableProperty( const std::shared_ptr& table_properties, - bool to_add, std::map* blob_file_size_diff); + bool to_add, std::map* blob_file_size_diff, + std::map* blob_live_blocks_diff); Status ExtractGCStatsFromTableProperty( const TableProperties& table_properties, bool to_add, - std::map* blob_file_size_diff); + std::map* blob_file_size_diff, + std::map* blob_live_blocks_diff); // REQUIRE: mutex_ held void AddToGCQueue(uint32_t column_family_id) { mutex_.AssertHeld(); - unscheduled_gc_++; gc_queue_.push_back(column_family_id); } @@ -277,9 +278,9 @@ class TitanDBImpl : public TitanDB { // REQUIRE: mutex_ held uint32_t PopFirstFromGCQueue() { assert(!gc_queue_.empty()); - auto column_family_id = *gc_queue_.begin(); + auto cf_id = *gc_queue_.begin(); gc_queue_.pop_front(); - return column_family_id; + return cf_id; } // REQUIRE: mutex_ held @@ -287,7 +288,7 @@ class TitanDBImpl : public TitanDB { static void BGWorkGC(void* db); void BackgroundCallGC(); - Status BackgroundGC(LogBuffer* log_buffer, uint32_t column_family_id); + Status BackgroundGC(LogBuffer* log_buffer, BlobGC* blob_gc); void PurgeObsoleteFiles(); Status PurgeObsoleteFilesImpl(); @@ -378,13 +379,20 @@ class TitanDBImpl : public TitanDB { // pending_gc_ hold column families that already on gc_queue_. std::deque gc_queue_; + // REQUIRE: mutex_ held. + // This is not a queue, since punch hole GC is only runnable when its owned + // snapshot is the oldest one. So we can't really multi-thread it. + std::unique_ptr scheduled_punch_hole_gc_; + // REQUIRE: mutex_ held. + // Indicates whether the scheduled punch hole GC is running, in case multiple + // threads are trying to work on the same job at the same time. + bool punch_hole_gc_running_ = false; + // REQUIRE: mutex_ held. int bg_gc_scheduled_ = 0; // REQUIRE: mutex_ held. int bg_gc_running_ = 0; // REQUIRE: mutex_ held. - int unscheduled_gc_ = 0; - // REQUIRE: mutex_ held. int drop_cf_requests_ = 0; // PurgeObsoleteFiles, DisableFileDeletions and EnableFileDeletions block diff --git a/src/db_impl_gc.cc b/src/db_impl_gc.cc index 0a5e6bf26..ca462e8e8 100644 --- a/src/db_impl_gc.cc +++ b/src/db_impl_gc.cc @@ -1,5 +1,6 @@ #include "test_util/sync_point.h" +#include "blob_aligned_blocks_collector.h" #include "blob_file_iterator.h" #include "blob_file_size_collector.h" #include "blob_gc_job.h" @@ -14,19 +15,22 @@ namespace titandb { Status TitanDBImpl::ExtractGCStatsFromTableProperty( const std::shared_ptr& table_properties, bool to_add, - std::map* blob_file_size_diff) { + std::map* blob_file_size_diff, + std::map* hole_punchable_blocks_diff) { assert(blob_file_size_diff != nullptr); if (table_properties == nullptr) { // No table property found. File may not contain blob indices. return Status::OK(); } return ExtractGCStatsFromTableProperty(*table_properties.get(), to_add, - blob_file_size_diff); + blob_file_size_diff, + hole_punchable_blocks_diff); } Status TitanDBImpl::ExtractGCStatsFromTableProperty( const TableProperties& table_properties, bool to_add, - std::map* blob_file_size_diff) { + std::map* blob_file_size_diff, + std::map* hole_punchable_blocks_diff) { assert(blob_file_size_diff != nullptr); auto& prop = table_properties.user_collected_properties; auto prop_iter = prop.find(BlobFileSizeCollector::kPropertiesName); @@ -47,6 +51,26 @@ Status TitanDBImpl::ExtractGCStatsFromTableProperty( } (*blob_file_size_diff)[file_number] += diff; } + // We need to extract hole punchable blocks from the table property + // iff we are removing the file. + prop_iter = prop.find(BlobAlignedBlocksCollector::kPropertiesName); + if (prop_iter != prop.end()) { + Slice hole_punchable_blocks_prop_slice(prop_iter->second); + std::map hole_punchable_blocks; + if (!BlobAlignedBlocksCollector::Decode(&hole_punchable_blocks_prop_slice, + &hole_punchable_blocks)) { + return Status::Corruption("Failed to decode blob live blocks property."); + } + for (const auto& hole_punchable_block : hole_punchable_blocks) { + uint64_t file_number = hole_punchable_block.first; + int64_t diff = static_cast(hole_punchable_block.second); + if (to_add) { + // Add means some blocks are not hole punchable. + diff = -diff; + } + (*hole_punchable_blocks_diff)[file_number] += diff; + } + } return Status::OK(); } @@ -112,9 +136,15 @@ Status TitanDBImpl::AsyncInitializeGC( } std::map blob_file_size_diff; + std::map + hole_punchable_blocks_diffs; // Not used, this is not required while + // initializing GC. The initial state of + // punch hole GC is determined by + // BlobFileMeta (in MANIFEST). for (auto& file : collection) { s = ExtractGCStatsFromTableProperty(file.second, true /*to_add*/, - &blob_file_size_diff); + &blob_file_size_diff, + &hole_punchable_blocks_diffs); if (!s.ok()) { MutexLock l(&mutex_); this->SetBGError(s); @@ -135,6 +165,18 @@ Status TitanDBImpl::AsyncInitializeGC( file->UpdateLiveDataSize(file_size.second); } } + for (auto& file_blocks : hole_punchable_blocks_diffs) { + assert(file_blocks.second < 0); + std::shared_ptr file = + blob_storage->FindFile(file_blocks.first).lock(); + if (file != nullptr) { + if (uint64_t(-file_blocks.second) <= file->live_blocks()) { + uint64_t hole_punchable_blocks_diff = + file->live_blocks() + file_blocks.second; + file->UpdateHolePunchableBlocks(hole_punchable_blocks_diff); + } + } + } blob_storage->InitializeAllFiles(); TITAN_LOG_INFO(db_options_.info_log, "Titan finish async GC initialization on cf [%s]", @@ -172,9 +214,9 @@ void TitanDBImpl::MaybeScheduleGC() { if (shuting_down_.load(std::memory_order_acquire)) return; - while (unscheduled_gc_ > 0 && + while (!gc_queue_.empty() && bg_gc_scheduled_ < db_options_.max_background_gc) { - unscheduled_gc_--; + TITAN_LOG_DEBUG(db_options_.info_log, "Titan schedule GC"); bg_gc_scheduled_++; thread_pool_->SubmitJob(std::bind(&TitanDBImpl::BGWorkGC, this)); } @@ -185,6 +227,11 @@ void TitanDBImpl::BGWorkGC(void* db) { } void TitanDBImpl::BackgroundCallGC() { + TITAN_LOG_DEBUG(db_options_.info_log, + "Titan background GC thread start, is punch hole gc running " + "%d, has punch hole gc scheduled %s", + punch_hole_gc_running_, + scheduled_punch_hole_gc_ != nullptr ? "true" : "false"); TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeGCRunning"); { MutexLock l(&mutex_); @@ -195,19 +242,114 @@ void TitanDBImpl::BackgroundCallGC() { bg_gc_running_++; TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC"); - if (!gc_queue_.empty()) { - uint32_t column_family_id = PopFirstFromGCQueue(); - LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, - db_options_.info_log.get()); - BackgroundGC(&log_buffer, column_family_id); - { - mutex_.Unlock(); - log_buffer.FlushBufferToLog(); - LogFlush(db_options_.info_log.get()); - mutex_.Lock(); + bool run_punch_hole_gc = false; + if (scheduled_punch_hole_gc_ != nullptr && !punch_hole_gc_running_) { + if (blob_file_set_->IsColumnFamilyObsolete( + scheduled_punch_hole_gc_->cf_id())) { + TITAN_LOG_INFO( + db_options_.info_log, "GC skip dropped colum family [%s].", + cf_info_[scheduled_punch_hole_gc_->cf_id()].name.c_str()); + scheduled_punch_hole_gc_->ReleaseGcFiles(); + scheduled_punch_hole_gc_->ReleaseSnapshot(db_); + scheduled_punch_hole_gc_.reset(); + } else if (scheduled_punch_hole_gc_->snapshot()->GetSequenceNumber() == + GetOldestSnapshotSequence()) { + TEST_SYNC_POINT( + "TitanDBImpl::BackgroundCallGC:BeforeRunScheduledPunchHoleGC"); + TITAN_LOG_DEBUG(db_options_.info_log, + "Titan start scheduled punch hole GC"); + std::unique_ptr blob_gc = std::move(scheduled_punch_hole_gc_); + auto cfh = db_impl_->GetColumnFamilyHandleUnlocked(blob_gc->cf_id()); + blob_gc->SetColumnFamily(cfh.get()); + punch_hole_gc_running_ = true; + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, + db_options_.info_log.get()); + BackgroundGC(&log_buffer, blob_gc.get()); + punch_hole_gc_running_ = false; + run_punch_hole_gc = true; + TEST_SYNC_POINT( + "TitanDBImpl::BackgroundCallGC:AfterRunScheduledPunchHoleGC"); + { + mutex_.Unlock(); + log_buffer.FlushBufferToLog(); + LogFlush(db_options_.info_log.get()); + mutex_.Lock(); + } + } else { + TITAN_LOG_DEBUG(db_options_.info_log, + "Titan skip scheduled punch hole GC due to not holding " + "the oldest snapshot"); + } + } + if (!run_punch_hole_gc && !gc_queue_.empty()) { + // If there is no scheduled punch hole gc, do normal gc. + uint32_t cf_id; + bool found_non_obsolete_cf = false; + while (!gc_queue_.empty()) { + cf_id = PopFirstFromGCQueue(); + if (blob_file_set_->IsColumnFamilyObsolete(cf_id)) { + TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", + nullptr); + TITAN_LOG_INFO(db_options_.info_log, + "GC skip dropped colum family [%s].", + cf_info_[cf_id].name.c_str()); + } else { + found_non_obsolete_cf = true; + break; + } + } + if (found_non_obsolete_cf) { + std::shared_ptr blob_storage = + blob_file_set_->GetBlobStorage(cf_id).lock(); + if (blob_storage != nullptr) { + const auto& cf_options = blob_storage->cf_options(); + std::shared_ptr blob_gc_picker = + std::make_shared(db_options_, cf_options, + cf_id, stats_.get()); + auto blob_gc = blob_gc_picker->PickBlobGC( + blob_storage.get(), + !punch_hole_gc_running_ && scheduled_punch_hole_gc_ == nullptr); + if (blob_gc != nullptr) { + assert(!blob_gc->use_punch_hole() || !punch_hole_gc_running_); + if (blob_gc->use_punch_hole()) { + auto snapshot = db_->GetSnapshot(); + blob_gc->SetSnapshot(snapshot); + } + if (blob_gc->use_punch_hole() && + blob_gc->snapshot()->GetSequenceNumber() > + GetOldestSnapshotSequence()) { + TITAN_LOG_DEBUG(db_options_.info_log, + "Titan queue punch hole GC"); + assert(scheduled_punch_hole_gc_ == nullptr); + scheduled_punch_hole_gc_ = std::move(blob_gc); + } else { + if (blob_gc->use_punch_hole()) { + punch_hole_gc_running_ = true; + } + auto cfh = db_impl_->GetColumnFamilyHandleUnlocked(cf_id); + blob_gc->SetColumnFamily(cfh.get()); + TITAN_LOG_DEBUG(db_options_.info_log, "Titan start GC directly"); + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, + db_options_.info_log.get()); + BackgroundGC(&log_buffer, blob_gc.get()); + if (blob_gc->use_punch_hole()) { + punch_hole_gc_running_ = false; + } + { + mutex_.Unlock(); + log_buffer.FlushBufferToLog(); + LogFlush(db_options_.info_log.get()); + mutex_.Lock(); + } + } + } else { + TITAN_LOG_DEBUG(db_options_.info_log, "Titan GC nothing to do"); + } + } } } + TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:AfterGCRunning"); bg_gc_running_--; bg_gc_scheduled_--; MaybeScheduleGC(); @@ -218,43 +360,16 @@ void TitanDBImpl::BackgroundCallGC() { // waiting for it. bg_cv_.SignalAll(); } - // IMPORTANT: there should be no code after calling SignalAll. This call may - // signal the DB destructor that it's OK to proceed with destruction. In - // that case, all DB variables will be deallocated and referencing them + // IMPORTANT: there should be no code after calling SignalAll. This call + // may signal the DB destructor that it's OK to proceed with destruction. + // In that case, all DB variables will be deallocated and referencing them // will cause trouble. } } -Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, - uint32_t column_family_id) { +Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, BlobGC* blob_gc) { mutex_.AssertHeld(); - std::unique_ptr blob_gc; - std::unique_ptr cfh; - - std::shared_ptr blob_storage; - // Skip CFs that have been dropped. - if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) { - blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock(); - } else { - TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr); - TITAN_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].", - cf_info_[column_family_id].name.c_str()); - } - if (blob_storage != nullptr) { - const auto& cf_options = blob_storage->cf_options(); - std::shared_ptr blob_gc_picker = - std::make_shared(db_options_, cf_options, - stats_.get()); - blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get()); - - if (blob_gc) { - cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); - assert(column_family_id == cfh->GetID()); - blob_gc->SetColumnFamily(cfh.get()); - } - } - Status s; // TODO(@DorianZheng) Make sure enough room for GC if (UNLIKELY(!blob_gc)) { @@ -262,9 +377,11 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, // Nothing to do TITAN_LOG_BUFFER(log_buffer, "Titan GC nothing to do"); } else { + TITAN_LOG_BUFFER(log_buffer, "Titan GC start, using punch hole: %s", + blob_gc->use_punch_hole() ? "true" : "false"); StopWatch gc_sw(env_->GetSystemClock().get(), statistics(stats_.get()), TITAN_GC_MICROS); - BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_, + BlobGCJob blob_gc_job(blob_gc, db_, &mutex_, db_options_, env_, env_options_, blob_manager_.get(), blob_file_set_.get(), log_buffer, &shuting_down_, stats_.get()); @@ -278,7 +395,11 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, } if (s.ok()) { s = blob_gc_job.Finish(); + } else { + TITAN_LOG_ERROR(db_options_.info_log, "Titan GC error: %s", + s.ToString().c_str()); } + blob_gc->ReleaseSnapshot(db_); blob_gc->ReleaseGcFiles(); if (blob_gc->trigger_next() && @@ -319,7 +440,38 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) { bg_gc_running_++; bg_gc_scheduled_++; - s = BackgroundGC(&log_buffer, column_family_id); + std::unique_ptr cfh; + std::unique_ptr blob_gc; + + std::shared_ptr blob_storage; + // Skip CFs that have been dropped. + if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) { + blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock(); + } else { + TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr); + TITAN_LOG_INFO(db_options_.info_log, "GC skip dropped colum family [%s].", + cf_info_[column_family_id].name.c_str()); + } + if (blob_storage != nullptr) { + const auto& cf_options = blob_storage->cf_options(); + std::shared_ptr blob_gc_picker = + std::make_shared(db_options_, cf_options, + column_family_id, stats_.get()); + blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get(), + !punch_hole_gc_running_); + if (blob_gc != nullptr) { + assert(!blob_gc->use_punch_hole() || !punch_hole_gc_running_); + cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); + blob_gc->SetColumnFamily(cfh.get()); + if (blob_gc->use_punch_hole() && + blob_gc->snapshot()->GetSequenceNumber() > + GetOldestSnapshotSequence()) { + scheduled_punch_hole_gc_ = std::move(blob_gc); + } else { + s = BackgroundGC(&log_buffer, blob_gc.get()); + } + } + } { mutex_.Unlock(); diff --git a/src/edit_collector.h b/src/edit_collector.h index e05f8dac5..56bcdc0c1 100644 --- a/src/edit_collector.h +++ b/src/edit_collector.h @@ -43,6 +43,10 @@ class EditCollector { status_ = collector.DeleteFile(file.first, file.second); if (!status_.ok()) return status_; } + for (auto& file : edit.updated_files_) { + status_ = collector.UpdateFile(file); + if (!status_.ok()) return status_; + } if (edit.has_next_file_number_) { if (edit.next_file_number_ < next_file_number_) { @@ -164,6 +168,24 @@ class EditCollector { return Status::OK(); } + Status UpdateFile(const std::shared_ptr& file) { + auto number = file->file_number(); + if (deleted_files_.count(number) > 0) { + TITAN_LOG_ERROR(info_log_, + "blob file %" PRIu64 " has been deleted before\n", + number); + if (paranoid_check_) { + return Status::Corruption("Blob file " + ToString(number) + + " has been deleted before"); + } + } + if (updated_files_.count(number) > 0) { + assert(updated_files_[number].get() == file.get()); + } + + return Status::OK(); + } + Status Seal(BlobStorage* storage) { for (auto& file : added_files_) { auto number = file.first; @@ -208,6 +230,25 @@ class EditCollector { } } } + for (auto& file : updated_files_) { + auto number = file.first; + auto blob = storage->FindFile(number).lock(); + if (!blob) { + TITAN_LOG_ERROR(storage->db_options().info_log, + "blob file %" PRIu64 " doesn't exist before\n", + number); + return Status::Corruption("Blob file " + ToString(number) + + " doesn't exist before"); + } else if (blob->is_obsolete()) { + TITAN_LOG_ERROR(storage->db_options().info_log, + "blob file %" PRIu64 " has been deleted already\n", + number); + if (paranoid_check_) { + return Status::Corruption("Blob file " + ToString(number) + + " has been deleted already"); + } + } + } return Status::OK(); } @@ -221,12 +262,22 @@ class EditCollector { storage->AddBlobFile(file.second); } + for (auto& file : updated_files_) { + if (deleted_files_.count(file.first) > 0) { + continue; + } + storage->HolePunchBlobFile(file.second); + } + for (auto& file : deleted_files_) { auto number = file.first; // just skip paired added and deleted files if (added_files_.count(number) > 0) { continue; } + if (updated_files_.count(number) > 0) { + continue; + } if (!storage->MarkFileObsolete(number, file.second)) { return Status::NotFound("Invalid file number " + std::to_string(number)); @@ -260,6 +311,11 @@ class EditCollector { file.second); } } + for (auto& file : updated_files_) { + if (deleted_files_.count(file.first) == 0) { + file.second->Dump(with_keys); + } + } } private: @@ -267,6 +323,7 @@ class EditCollector { Logger* info_log_{nullptr}; std::unordered_map> added_files_; std::unordered_map deleted_files_; + std::unordered_map> updated_files_; }; Status status_{Status::OK()}; diff --git a/src/options.cc b/src/options.cc index 97e2a202b..5d4ce490e 100644 --- a/src/options.cc +++ b/src/options.cc @@ -43,7 +43,8 @@ TitanCFOptions::TitanCFOptions(const ColumnFamilyOptions& cf_opts, merge_small_file_threshold(immutable_opts.merge_small_file_threshold), blob_run_mode(mutable_opts.blob_run_mode), skip_value_in_compaction_filter( - immutable_opts.skip_value_in_compaction_filter) {} + immutable_opts.skip_value_in_compaction_filter), + enable_punch_hole_gc(immutable_opts.enable_punch_hole_gc) {} void TitanCFOptions::Dump(Logger* logger) const { TITAN_LOG_HEADER(logger, @@ -94,6 +95,8 @@ void TitanCFOptions::Dump(Logger* logger) const { } TITAN_LOG_HEADER(logger, "TitanCFOptions.blob_run_mode : %s", blob_run_mode_str.c_str()); + TITAN_LOG_HEADER(logger, "TitanCFOptions.enable_punch_hole_gc : %s", + enable_punch_hole_gc ? "true" : "false"); } void TitanCFOptions::UpdateMutableOptions( diff --git a/src/table_builder.cc b/src/table_builder.cc index 9b3b7812a..afe25da8a 100644 --- a/src/table_builder.cc +++ b/src/table_builder.cc @@ -100,7 +100,9 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) { gc_num_keys_relocated_++; gc_bytes_relocated_ += record.value.size(); AddBlob(ikey, record.value); - if (ok()) return; + if (ok()) { + return; + } } else { ++error_read_cnt_; TITAN_LOG_DEBUG(db_options_.info_log, @@ -232,13 +234,11 @@ void TitanTableBuilder::FinishBlobFile() { AddBlobResultsToBase(contexts); if (s.ok() && ok()) { - TITAN_LOG_INFO(db_options_.info_log, - "Titan table builder finish output file %" PRIu64 ".", - blob_handle_->GetNumber()); std::shared_ptr file = std::make_shared( blob_handle_->GetNumber(), blob_handle_->GetFile()->GetFileSize(), blob_builder_->NumEntries(), target_level_, - blob_builder_->GetSmallestKey(), blob_builder_->GetLargestKey()); + blob_builder_->GetSmallestKey(), blob_builder_->GetLargestKey(), + blob_builder_->alignment_size(), blob_builder_->live_blocks()); file->set_live_data_size(blob_builder_->live_data_size()); file->FileStateTransit(BlobFileMeta::FileEvent::kFlushOrCompactionOutput); finished_blobs_.push_back({file, std::move(blob_handle_)}); diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc index f1ceb0b1b..db0176ffd 100644 --- a/src/titan_db_test.cc +++ b/src/titan_db_test.cc @@ -643,7 +643,7 @@ TEST_F(TitanDBTest, NewColumnFamilyHasBlobFileSizeCollector) { Open(); AddCF("new_cf"); Options opt = db_->GetOptions(cf_handles_.back()); - ASSERT_EQ(1, opt.table_properties_collector_factories.size()); + ASSERT_EQ(2, opt.table_properties_collector_factories.size()); std::unique_ptr prop_collector_factory( new BlobFileSizeCollectorFactory()); ASSERT_EQ(std::string(prop_collector_factory->Name()), @@ -1327,7 +1327,7 @@ TEST_F(TitanDBTest, GCAfterDropCF) { SyncPoint::GetInstance()->LoadDependency( {{"TitanDBTest::GCAfterDropCF:AfterDropCF", "TitanDBImpl::BackgroundCallGC:BeforeGCRunning"}, - {"TitanDBImpl::BackgroundGC:Finish", + {"TitanDBImpl::BackgroundCallGC:AfterGCRunning", "TitanDBTest::GCAfterDropCF:WaitGC"}}); SyncPoint::GetInstance()->SetCallBack( "TitanDBImpl::BackgroundGC:CFDropped", @@ -2122,6 +2122,7 @@ TEST_F(TitanDBTest, OnlineChangeMinBlobSize) { } TEST_F(TitanDBTest, OnlineChangeCompressionType) { +#ifdef LZ4 const uint64_t kNumKeys = 100; std::map data; Open(); @@ -2183,6 +2184,7 @@ TEST_F(TitanDBTest, OnlineChangeCompressionType) { ASSERT_GT(first_blob_file_size, pair.second.lock()->file_size()); } } +#endif } TEST_F(TitanDBTest, OnlineChangeBlobFileDiscardableRatio) { diff --git a/src/titan_stats.cc b/src/titan_stats.cc index 973bcf9f2..0817109bc 100644 --- a/src/titan_stats.cc +++ b/src/titan_stats.cc @@ -38,6 +38,8 @@ static const std::string num_discardable_ratio_le80_file = "num-discardable-ratio-le80-file"; static const std::string num_discardable_ratio_le100_file = "num-discardable-ratio-le100-file"; +static const std::string num_hole_punchable_blocks = + "num-hole-punchable-blocks"; const std::string TitanDB::Properties::kNumBlobFilesAtLevelPrefix = titandb_prefix + num_blob_files_at_level_prefix; @@ -61,6 +63,8 @@ const std::string TitanDB::Properties::kNumDiscardableRatioLE80File = titandb_prefix + num_discardable_ratio_le80_file; const std::string TitanDB::Properties::kNumDiscardableRatioLE100File = titandb_prefix + num_discardable_ratio_le100_file; +const std::string TitanDB::Properties::kNumHolePunchableBlocks = + titandb_prefix + num_hole_punchable_blocks; const std::unordered_map< std::string, std::function> @@ -106,6 +110,10 @@ const std::unordered_map< std::bind(&TitanInternalStats::HandleStatsValue, std::placeholders::_1, TitanInternalStats::NUM_DISCARDABLE_RATIO_LE100, std::placeholders::_2)}, + {TitanDB::Properties::kNumHolePunchableBlocks, + std::bind(&TitanInternalStats::HandleStatsValue, std::placeholders::_1, + TitanInternalStats::NUM_HOLE_PUNCHABLE_BLOCKS, + std::placeholders::_2)}, }; const std::arrayEncodeTo(dst); } for (auto& file : deleted_files_) { // obsolete sequence is a inpersistent field, so no need to encode it. PutVarint32Varint64(dst, kDeletedBlobFile, file.first); } + for (auto& file : updated_files_) { + PutVarint32(dst, kUpdatedBlobFile); + file->EncodeTo(dst); + } } Status VersionEdit::DecodeFrom(Slice* src) { @@ -59,6 +63,15 @@ Status VersionEdit::DecodeFrom(Slice* src) { } break; case kAddedBlobFileV2: + blob_file = std::make_shared(); + s = blob_file->DecodeFromV2(src); + if (s.ok()) { + AddBlobFile(blob_file); + } else { + error = s.ToString().c_str(); + } + break; + case kAddedBlobFileV3: blob_file = std::make_shared(); s = blob_file->DecodeFrom(src); if (s.ok()) { @@ -74,6 +87,15 @@ Status VersionEdit::DecodeFrom(Slice* src) { error = "deleted blob file"; } break; + case kUpdatedBlobFile: + blob_file = std::make_shared(); + s = blob_file->DecodeFrom(src); + if (s.ok()) { + UpdateBlobFile(blob_file); + } else { + error = s.ToString().c_str(); + } + break; default: error = "unknown tag"; break; @@ -125,6 +147,12 @@ void VersionEdit::Dump(bool with_keys) const { file.second); } } + if (!updated_files_.empty()) { + fprintf(stdout, "update files:\n"); + for (auto& file : updated_files_) { + file->Dump(with_keys); + } + } } } // namespace titandb diff --git a/src/version_edit.h b/src/version_edit.h index b9bc4024a..15260cf84 100644 --- a/src/version_edit.h +++ b/src/version_edit.h @@ -18,6 +18,8 @@ enum Tag { kDeletedBlobFile = 12, // Deprecated, leave here for backward compatibility kAddedBlobFileV2 = 13, // Comparing to kAddedBlobFile, it newly includes // smallest_key and largest_key of blob file + kAddedBlobFileV3 = 14, // Add live blocks and dead blocks info + kUpdatedBlobFile = 15, // Update hole punched blob file meta }; class VersionEdit { @@ -37,6 +39,10 @@ class VersionEdit { deleted_files_.emplace_back(std::make_pair(file_number, obsolete_sequence)); } + void UpdateBlobFile(std::shared_ptr meta) { + updated_files_.push_back(meta); + } + void EncodeTo(std::string* dst) const; Status DecodeFrom(Slice* src); @@ -55,6 +61,7 @@ class VersionEdit { std::vector> added_files_; std::vector> deleted_files_; + std::vector> updated_files_; }; } // namespace titandb