From b1a05003a8e5aa21499e710ce16e160aeae49f74 Mon Sep 17 00:00:00 2001
From: Akilesh Kailash
Date: Thu, 8 Dec 2022 23:10:20 +0000
Subject: [PATCH 1/3] libsnapshot: Use two threads to run compression

Compression is a hot function in the install path. Use two threads
for compression.

By default, the number of threads is set to 1. If the property
"ro.virtual_ab.compression.threads" is true, the number of threads
is increased to 2.

OTA install time (without post-install) on Pixel 6 Pro with 2 threads:

             Without-this-patch    With-this-patch
Full OTA:    23 Minutes            17 Minutes

Bug: 254188450
Test: Full/Incremental OTA on Pixel
Change-Id: I4a11dca3a5ebfe11dcc7f0d882332d491f2d7933
Signed-off-by: Akilesh Kailash
---
 .../include/libsnapshot/cow_writer.h          |  51 ++++-
 .../libsnapshot_cow/cow_compress.cpp          | 116 +++++++++++-
 .../libsnapshot_cow/cow_writer.cpp            | 174 ++++++++++++++----
 3 files changed, 305 insertions(+), 36 deletions(-)
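Note for reviewers: the handoff between the writer and the workers is
order-sensitive; blocks are enqueued to each worker in block order and
the compressed buffers are drained back in the same worker order. The
standalone sketch below illustrates that submit-in-order /
drain-in-order pattern. It is not part of the change; the Worker type,
the buffer contents, and the "compressed(...)" placeholder transform
are invented for illustration.

    // Two workers each compress one contiguous run of blocks. Results are
    // drained per worker, in submission order, so block order is preserved.
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <string>
    #include <thread>

    struct Worker {
        std::queue<std::string> in_, out_;
        std::mutex m_;
        std::condition_variable cv_;
        bool stopped_ = false;

        void Run() {
            while (true) {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return !in_.empty() || stopped_; });
                if (stopped_) return;
                std::string block = std::move(in_.front());
                in_.pop();
                // Placeholder for gz/lz4; the real worker produces a compressed buffer.
                out_.push("compressed(" + block + ")");
                cv_.notify_all();
            }
        }
        void Enqueue(std::string block) {
            {
                std::lock_guard<std::mutex> lock(m_);
                in_.push(std::move(block));
            }
            cv_.notify_all();
        }
        std::string Get() {
            std::unique_lock<std::mutex> lock(m_);
            cv_.wait(lock, [this] { return !out_.empty(); });
            std::string result = std::move(out_.front());
            out_.pop();
            return result;
        }
        void Stop() {
            {
                std::lock_guard<std::mutex> lock(m_);
                stopped_ = true;
            }
            cv_.notify_all();
        }
    };

    int main() {
        Worker workers[2];
        std::thread t0(&Worker::Run, &workers[0]);
        std::thread t1(&Worker::Run, &workers[1]);
        // Split a batch across the workers, then drain in submission order.
        workers[0].Enqueue("blocks 0-511");
        workers[1].Enqueue("blocks 512-1023");
        std::cout << workers[0].Get() << "\n" << workers[1].Get() << "\n";
        workers[0].Stop();
        workers[1].Stop();
        t0.join();
        t1.join();
        return 0;
    }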
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index a9682a175..291bebb42 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -16,10 +16,17 @@

 #include <stdint.h>

+#include <condition_variable>
 #include <cstdint>
+#include <future>
 #include <memory>
+#include <mutex>
 #include <optional>
+#include <queue>
 #include <string>
+#include <thread>
+#include <utility>
+#include <vector>

 #include <android-base/unique_fd.h>
 #include <libsnapshot/cow_format.h>
@@ -42,6 +49,9 @@ struct CowOptions {

     // Preset the number of merged ops. Only useful for testing.
     uint64_t num_merge_ops = 0;
+
+    // Number of threads for compression
+    int num_compress_threads = 0;
 };

 // Interface for writing to a snapuserd COW. All operations are ordered; merges
@@ -100,9 +110,40 @@ class ICowWriter {
     CowOptions options_;
 };

+class CompressWorker {
+  public:
+    CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size);
+    bool RunThread();
+    void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
+    bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
+    void Finalize();
+
+  private:
+    struct CompressWork {
+        const void* buffer;
+        size_t num_blocks;
+        bool compression_status = false;
+        std::vector<std::basic_string<uint8_t>> compressed_data;
+    };
+
+    CowCompressionAlgorithm compression_;
+    uint32_t block_size_;
+
+    std::queue<CompressWork> work_queue_;
+    std::queue<CompressWork> compressed_queue_;
+    std::mutex lock_;
+    std::condition_variable cv_;
+    bool stopped_ = false;
+
+    std::basic_string<uint8_t> Compress(const void* data, size_t length);
+    bool CompressBlocks(const void* buffer, size_t num_blocks,
+                        std::vector<std::basic_string<uint8_t>>* compressed_data);
+};
+
 class CowWriter : public ICowWriter {
   public:
     explicit CowWriter(const CowOptions& options);
+    ~CowWriter();

     // Set up the writer.
     // The file starts from the beginning.
@@ -138,6 +179,7 @@ class CowWriter : public ICowWriter {
     bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                     uint16_t offset, uint8_t type);
     void SetupHeaders();
+    void SetupWriteOptions();
     bool ParseOptions();
     bool OpenForWrite();
     bool OpenForAppend(uint64_t label);
@@ -145,9 +187,10 @@ class CowWriter : public ICowWriter {
     bool WriteRawData(const void* data, size_t size);
     bool WriteOperation(const CowOperation& op, const void* data = nullptr, size_t size = 0);
     void AddOperation(const CowOperation& op);
-    std::basic_string<uint8_t> Compress(const void* data, size_t length);
     void InitPos();
+    void InitWorkers();
+    bool CompressBlocks(size_t num_blocks, const void* data);

     bool SetFd(android::base::borrowed_fd fd);
     bool Sync();
     bool Truncate(off_t length);
@@ -168,6 +211,12 @@ class CowWriter : public ICowWriter {
     bool merge_in_progress_ = false;
     bool is_block_device_ = false;
     uint64_t cow_image_size_ = INT64_MAX;
+
+    int num_compress_threads_ = 1;
+    std::vector<std::unique_ptr<CompressWorker>> compress_threads_;
+    std::vector<std::future<bool>> threads_;
+    std::vector<std::basic_string<uint8_t>> compressed_buf_;
+    std::vector<std::basic_string<uint8_t>>::iterator buf_iter_;
 };

 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
index 0eb231bdf..4d9b74837 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
@@ -33,7 +33,7 @@
 namespace android {
 namespace snapshot {

-std::basic_string<uint8_t> CowWriter::Compress(const void* data, size_t length) {
+std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
     switch (compression_) {
         case kCowCompressGz: {
             const auto bound = compressBound(length);
@@ -100,5 +100,119 @@ std::basic_string<uint8_t> CowWriter::Compress(const void* data, size_t length)
     return {};
 }

+bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
+                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
+    while (num_blocks) {
+        auto data = Compress(iter, block_size_);
+        if (data.empty()) {
+            PLOG(ERROR) << "CompressBlocks: Compression failed";
+            return false;
+        }
+        if (data.size() > std::numeric_limits<uint16_t>::max()) {
+            LOG(ERROR) << "Compressed block is too large: " << data.size();
+            return false;
+        }
+
+        compressed_data->emplace_back(std::move(data));
+        num_blocks -= 1;
+        iter += block_size_;
+    }
+    return true;
+}
+
+bool CompressWorker::RunThread() {
+    while (true) {
+        // Wait for work
+        CompressWork blocks;
+        {
+            std::unique_lock<std::mutex> lock(lock_);
+            while (work_queue_.empty() && !stopped_) {
+                cv_.wait(lock);
+            }
+
+            if (stopped_) {
+                return true;
+            }
+
+            blocks = std::move(work_queue_.front());
+            work_queue_.pop();
+        }
+
+        // Compress blocks
+        bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data);
+        blocks.compression_status = ret;
+        {
+            std::lock_guard<std::mutex> lock(lock_);
+            compressed_queue_.push(std::move(blocks));
+        }
+
+        // Notify completion
+        cv_.notify_all();
+
+        if (!ret) {
+            LOG(ERROR) << "CompressBlocks failed";
+            return false;
+        }
+    }
+
+    return true;
+}
+
+void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+
+        CompressWork blocks = {};
+        blocks.buffer = buffer;
+        blocks.num_blocks = num_blocks;
+        work_queue_.push(std::move(blocks));
+    }
+    cv_.notify_all();
+}
+
+bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf) {
+    {
+        std::unique_lock<std::mutex> lock(lock_);
+        while (compressed_queue_.empty() && !stopped_) {
+            cv_.wait(lock);
+        }
+
+        if (stopped_) {
+            return true;
+        }
+    }
+
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        while (compressed_queue_.size() > 0) {
+            CompressWork blocks = std::move(compressed_queue_.front());
+            compressed_queue_.pop();
+
+            if (blocks.compression_status) {
+                compressed_buf->insert(compressed_buf->end(),
+                                       std::make_move_iterator(blocks.compressed_data.begin()),
+                                       std::make_move_iterator(blocks.compressed_data.end()));
+            } else {
+                LOG(ERROR) << "Block compression failed";
+                return false;
+            }
+        }
+    }
+
+    return true;
+}
+
+void CompressWorker::Finalize() {
+    {
+        std::unique_lock<std::mutex> lock(lock_);
+        stopped_ = true;
+    }
+    cv_.notify_all();
+}
+
+CompressWorker::CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size)
+    : compression_(compression), block_size_(block_size) {}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
index 43c17a634..ba6e7a0d5 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
@@ -22,6 +22,7 @@

 #include <android-base/file.h>
 #include <android-base/logging.h>
+#include <android-base/properties.h>
 #include <android-base/strings.h>
 #include <android-base/unique_fd.h>
 #include <brotli/encode.h>
@@ -132,6 +133,39 @@ bool ICowWriter::ValidateNewBlock(uint64_t new_block) {

 CowWriter::CowWriter(const CowOptions& options) : ICowWriter(options), fd_(-1) {
     SetupHeaders();
+    SetupWriteOptions();
+}
+
+CowWriter::~CowWriter() {
+    for (size_t i = 0; i < compress_threads_.size(); i++) {
+        CompressWorker* worker = compress_threads_[i].get();
+        if (worker) {
+            worker->Finalize();
+        }
+    }
+
+    bool ret = true;
+    for (auto& t : threads_) {
+        ret = t.get() && ret;
+    }
+
+    if (!ret) {
+        LOG(ERROR) << "Compression failed";
+    }
+    compress_threads_.clear();
+}
+
+void CowWriter::SetupWriteOptions() {
+    num_compress_threads_ = options_.num_compress_threads;
+
+    if (!num_compress_threads_) {
+        num_compress_threads_ = 1;
+        // We prefer not to have more than two threads as the overhead of additional
+        // threads is far greater than cutting down compression time.
+        if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
+            num_compress_threads_ = 2;
+        }
+    }
 }

 void CowWriter::SetupHeaders() {
@@ -206,6 +240,14 @@ bool CowWriter::SetFd(android::base::borrowed_fd fd) {
     return true;
 }

+void CowWriter::InitWorkers() {
+    for (int i = 0; i < num_compress_threads_; i++) {
+        auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
+        threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
+        compress_threads_.push_back(std::move(wt));
+    }
+}
+
 bool CowWriter::Initialize(unique_fd&& fd) {
     owned_fd_ = std::move(fd);
     return Initialize(borrowed_fd{owned_fd_});
@@ -216,7 +258,13 @@ bool CowWriter::Initialize(borrowed_fd fd) {
         return false;
     }

-    return OpenForWrite();
+    bool ret = OpenForWrite();
+
+    if (ret) {
+        InitWorkers();
+    }
+
+    return ret;
 }

 bool CowWriter::InitializeAppend(android::base::unique_fd&& fd, uint64_t label) {
@@ -229,7 +277,13 @@ bool CowWriter::InitializeAppend(android::base::borrowed_fd fd, uint64_t label)
         return false;
     }

-    return OpenForAppend(label);
+    bool ret = OpenForAppend(label);
+
+    if (ret && !compress_threads_.size()) {
+        InitWorkers();
+    }
+
+    return ret;
 }

 void CowWriter::InitPos() {
@@ -348,47 +402,99 @@ bool CowWriter::EmitXorBlocks(uint32_t new_block_start, const void* data, size_t
     return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
 }

+bool CowWriter::CompressBlocks(size_t num_blocks, const void* data) {
+    size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
+    size_t num_blocks_per_thread = num_blocks / num_threads;
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+    compressed_buf_.clear();
+
+    // Submit the blocks per thread. The retrieval of
+    // compressed buffers has to be done in the same order.
+    // We should not poll for completed buffers in a different order as the
+    // buffers are tightly coupled with block ordering.
+    for (size_t i = 0; i < num_threads; i++) {
+        CompressWorker* worker = compress_threads_[i].get();
+        if (i == num_threads - 1) {
+            num_blocks_per_thread = num_blocks;
+        }
+        worker->EnqueueCompressBlocks(iter, num_blocks_per_thread);
+        iter += (num_blocks_per_thread * header_.block_size);
+        num_blocks -= num_blocks_per_thread;
+    }
+
+    for (size_t i = 0; i < num_threads; i++) {
+        CompressWorker* worker = compress_threads_[i].get();
+        if (!worker->GetCompressedBuffers(&compressed_buf_)) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
 bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                            uint64_t old_block, uint16_t offset, uint8_t type) {
-    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
     CHECK(!merge_in_progress_);
-    for (size_t i = 0; i < size / header_.block_size; i++) {
-        CowOperation op = {};
-        op.new_block = new_block_start + i;
-        op.type = type;
-        if (type == kCowXorOp) {
-            op.source = (old_block + i) * header_.block_size + offset;
-        } else {
-            op.source = next_data_pos_;
-        }
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+
+    // Update engine can potentially send 100MB of blocks at a time. We
+    // don't want to process all those blocks in one shot as it can
+    // stress the memory. Hence, process the blocks in chunks.
+    //
+    // 1024 blocks is reasonable given we will end up using max
+    // memory of ~4MB.
+    const size_t kProcessingBlocks = 1024;
+    size_t num_blocks = (size / header_.block_size);
+    size_t i = 0;
+
+    while (num_blocks) {
+        size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));

         if (compression_) {
-            auto data = Compress(iter, header_.block_size);
-            if (data.empty()) {
-                PLOG(ERROR) << "AddRawBlocks: compression failed";
-                return false;
-            }
-            if (data.size() > std::numeric_limits<uint16_t>::max()) {
-                LOG(ERROR) << "Compressed block is too large: " << data.size() << " bytes";
-                return false;
-            }
-            op.compression = compression_;
-            op.data_length = static_cast<uint16_t>(data.size());
-
-            if (!WriteOperation(op, data.data(), data.size())) {
-                PLOG(ERROR) << "AddRawBlocks: write failed, bytes requested: " << size
-                            << ", bytes written: " << i * header_.block_size;
-                return false;
-            }
-        } else {
-            op.data_length = static_cast<uint16_t>(header_.block_size);
-            if (!WriteOperation(op, iter, header_.block_size)) {
-                PLOG(ERROR) << "AddRawBlocks: write failed";
+            if (!CompressBlocks(pending_blocks, iter)) {
                 return false;
             }
+            buf_iter_ = compressed_buf_.begin();
+            CHECK(pending_blocks == compressed_buf_.size());
+            iter += (pending_blocks * header_.block_size);
         }
-        iter += header_.block_size;
+        num_blocks -= pending_blocks;
+
+        while (i < size / header_.block_size && pending_blocks) {
+            CowOperation op = {};
+            op.new_block = new_block_start + i;
+            op.type = type;
+            if (type == kCowXorOp) {
+                op.source = (old_block + i) * header_.block_size + offset;
+            } else {
+                op.source = next_data_pos_;
+            }
+
+            if (compression_) {
+                auto data = std::move(*buf_iter_);
+                op.compression = compression_;
+                op.data_length = static_cast<uint16_t>(data.size());
+
+                if (!WriteOperation(op, data.data(), data.size())) {
+                    PLOG(ERROR) << "AddRawBlocks: write failed";
+                    return false;
+                }
+                buf_iter_++;
+            } else {
+                op.data_length = static_cast<uint16_t>(header_.block_size);
+                if (!WriteOperation(op, iter, header_.block_size)) {
+                    PLOG(ERROR) << "AddRawBlocks: write failed";
+                    return false;
+                }
+                iter += header_.block_size;
+            }
+
+            i += 1;
+            pending_blocks -= 1;
+        }
+
+        CHECK(pending_blocks == 0);
     }
     return true;
 }

From 4bc81b409b3d7b49949c0faca7dd61241fdbbdba Mon Sep 17 00:00:00 2001
From: Akilesh Kailash
Date: Sun, 27 Nov 2022 01:00:30 -0800
Subject: [PATCH 2/3] libsnapshot: Batch write COW operations in a cluster

COW operations are written in clusters. All the COW ops and the COW
data in a cluster are contiguous. Hence, batch these writes and issue
them in one syscall. Writes are flushed when the cluster is full or
when label ops are written.

OTA install time (without post-install) on Pixel 6 Pro:

             Without-this-patch    With-this-patch
Full OTA:    17 Minutes            13 Minutes

Following are the OTA install times with both optimizations applied,
viz. batch writes + 2 threads for compression. OTA install (without
post-install) on Pixel 6 Pro. All numbers are in minutes.

Full OTA - 2.2G

Compression      Without-this-patch    With-this-patch
=======================================================
gz               23                    13
lz4              13                    7
none             13                    7

Incremental OTA - 376M

Compression      Without-this-patch    With-this-patch
=======================================================
gz               22                    16
lz4              14                    11
none             15                    11

Bug: 254188450
Test: Full / Incremental OTA on Pixel
Change-Id: Ie3aba1ff28a6569d25a766377efab6cbe78d9277
Signed-off-by: Akilesh Kailash
---
 .../include/libsnapshot/cow_writer.h          |  17 +++
 .../libsnapshot_cow/cow_writer.cpp            | 122 ++++++++++++++++--
 2 files changed, 131 insertions(+), 8 deletions(-)
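Note for reviewers: the core of this change is replacing one write
syscall per COW op with a single positioned vectored write per cluster.
The standalone sketch below shows the pwritev(2) idea in isolation; the
file name and the "op" payloads are invented for illustration, and the
real code stages sizeof(CowOperation) headers and block data instead.

    // Stage several buffers in an iovec array, then flush them with one
    // pwritev() at a known offset, preserving submission order on disk.
    #include <fcntl.h>
    #include <sys/uio.h>
    #include <unistd.h>

    #include <cstdio>
    #include <cstring>
    #include <vector>

    int main() {
        int fd = open("/tmp/batch_demo.bin", O_CREAT | O_TRUNC | O_WRONLY, 0644);
        if (fd < 0) {
            perror("open");
            return 1;
        }

        // Stage three "ops" worth of data; nothing is written yet.
        const char* ops[] = {"op-1 ", "op-2 ", "op-3\n"};
        std::vector<struct iovec> iov;
        ssize_t expected = 0;
        for (const char* op : ops) {
            iov.push_back({const_cast<char*>(op), strlen(op)});
            expected += strlen(op);
        }

        // One syscall writes the whole batch at offset 0.
        if (pwritev(fd, iov.data(), iov.size(), 0) != expected) {
            perror("pwritev");
            close(fd);
            return 1;
        }
        close(fd);
        return 0;
    }

As in the patch, a flush point is forced whenever the staged cluster is
full or an ordering-sensitive op (label/cluster) is emitted.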
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index 291bebb42..798bc7345 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -52,6 +52,9 @@ struct CowOptions {

     // Number of threads for compression
     int num_compress_threads = 0;
+
+    // Batch write cluster ops
+    bool batch_write = false;
 };

 // Interface for writing to a snapuserd COW. All operations are ordered; merges
@@ -188,7 +191,9 @@ class CowWriter : public ICowWriter {
     bool WriteOperation(const CowOperation& op, const void* data = nullptr, size_t size = 0);
     void AddOperation(const CowOperation& op);
     void InitPos();
+    void InitBatchWrites();
     void InitWorkers();
+    bool FlushCluster();

     bool CompressBlocks(size_t num_blocks, const void* data);
     bool SetFd(android::base::borrowed_fd fd);
@@ -202,8 +207,11 @@ class CowWriter : public ICowWriter {
     CowHeader header_{};
     CowFooter footer_{};
     CowCompressionAlgorithm compression_ = kCowCompressNone;
+    uint64_t current_op_pos_ = 0;
     uint64_t next_op_pos_ = 0;
     uint64_t next_data_pos_ = 0;
+    uint64_t current_data_pos_ = 0;
+    ssize_t total_data_written_ = 0;
     uint32_t cluster_size_ = 0;
     uint32_t current_cluster_size_ = 0;
     uint64_t current_data_size_ = 0;
@@ -217,6 +225,15 @@ class CowWriter : public ICowWriter {
     std::vector<std::future<bool>> threads_;
     std::vector<std::basic_string<uint8_t>> compressed_buf_;
     std::vector<std::basic_string<uint8_t>>::iterator buf_iter_;
+
+    std::vector<std::unique_ptr<CowOperation>> opbuffer_vec_;
+    std::vector<std::unique_ptr<uint8_t[]>> databuffer_vec_;
+    std::unique_ptr<struct iovec[]> cowop_vec_;
+    int op_vec_index_ = 0;
+
+    std::unique_ptr<struct iovec[]> data_vec_;
+    int data_vec_index_ = 0;
+    bool batch_write_ = false;
 };

 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
index ba6e7a0d5..2d5e4bc36 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
@@ -15,6 +15,7 @@
 //

 #include <sys/types.h>
+#include <sys/uio.h>
 #include <unistd.h>

 #include <limits>
@@ -162,10 +163,17 @@ void CowWriter::SetupWriteOptions() {
         num_compress_threads_ = 1;
         // We prefer not to have more than two threads as the overhead of additional
         // threads is far greater than cutting down compression time.
-        if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
+        if (header_.cluster_ops &&
+            android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
             num_compress_threads_ = 2;
         }
     }
+
+    if (header_.cluster_ops &&
+        (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
+         options_.batch_write)) {
+        batch_write_ = true;
+    }
 }

 void CowWriter::SetupHeaders() {
@@ -240,12 +248,40 @@ bool CowWriter::SetFd(android::base::borrowed_fd fd) {
     return true;
 }

+void CowWriter::InitBatchWrites() {
+    if (batch_write_) {
+        cowop_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
+        data_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
+        struct iovec* cowop_ptr = cowop_vec_.get();
+        struct iovec* data_ptr = data_vec_.get();
+        for (size_t i = 0; i < header_.cluster_ops; i++) {
+            std::unique_ptr<CowOperation> op = std::make_unique<CowOperation>();
+            cowop_ptr[i].iov_base = op.get();
+            cowop_ptr[i].iov_len = sizeof(CowOperation);
+            opbuffer_vec_.push_back(std::move(op));
+
+            std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(header_.block_size * 2);
+            data_ptr[i].iov_base = buffer.get();
+            data_ptr[i].iov_len = header_.block_size * 2;
+            databuffer_vec_.push_back(std::move(buffer));
+        }
+
+        current_op_pos_ = next_op_pos_;
+        current_data_pos_ = next_data_pos_;
+    }
+
"enabled" : "disabled"; + LOG(INFO) << "Batch writes: " << batch_write; +} + void CowWriter::InitWorkers() { for (int i = 0; i < num_compress_threads_; i++) { auto wt = std::make_unique(compression_, header_.block_size); threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get())); compress_threads_.push_back(std::move(wt)); } + + LOG(INFO) << num_compress_threads_ << " thread used for compression"; } bool CowWriter::Initialize(unique_fd&& fd) { @@ -341,6 +377,7 @@ bool CowWriter::OpenForWrite() { } InitPos(); + InitBatchWrites(); return true; } @@ -374,6 +411,9 @@ bool CowWriter::OpenForAppend(uint64_t label) { PLOG(ERROR) << "lseek failed"; return false; } + + InitBatchWrites(); + return EmitClusterIfNeeded(); } @@ -522,7 +562,7 @@ bool CowWriter::EmitLabel(uint64_t label) { bool CowWriter::EmitSequenceData(size_t num_ops, const uint32_t* data) { CHECK(!merge_in_progress_); size_t to_add = 0; - size_t max_ops = std::numeric_limits::max() / sizeof(uint32_t); + size_t max_ops = (header_.block_size * 2) / sizeof(uint32_t); while (num_ops > 0) { CowOperation op = {}; op.type = kCowSequenceOp; @@ -567,6 +607,11 @@ static void SHA256(const void*, size_t, uint8_t[]) { } bool CowWriter::Finalize() { + if (!FlushCluster()) { + LOG(ERROR) << "Finalize: FlushCluster() failed"; + return false; + } + auto continue_cluster_size = current_cluster_size_; auto continue_data_size = current_data_size_; auto continue_data_pos = next_data_pos_; @@ -631,6 +676,9 @@ bool CowWriter::Finalize() { next_op_pos_ = continue_op_pos; footer_.op.num_ops = continue_num_ops; } + + FlushCluster(); + return Sync(); } @@ -662,6 +710,35 @@ bool CowWriter::EnsureSpaceAvailable(const uint64_t bytes_needed) const { return true; } +bool CowWriter::FlushCluster() { + ssize_t ret; + + if (op_vec_index_) { + ret = pwritev(fd_.get(), cowop_vec_.get(), op_vec_index_, current_op_pos_); + if (ret != (op_vec_index_ * sizeof(CowOperation))) { + PLOG(ERROR) << "pwritev failed for CowOperation. Expected: " + << (op_vec_index_ * sizeof(CowOperation)); + return false; + } + } + + if (data_vec_index_) { + ret = pwritev(fd_.get(), data_vec_.get(), data_vec_index_, current_data_pos_); + if (ret != total_data_written_) { + PLOG(ERROR) << "pwritev failed for data. 
Expected: " << total_data_written_; + return false; + } + } + + total_data_written_ = 0; + op_vec_index_ = 0; + data_vec_index_ = 0; + current_op_pos_ = next_op_pos_; + current_data_pos_ = next_data_pos_; + + return true; +} + bool CowWriter::WriteOperation(const CowOperation& op, const void* data, size_t size) { if (!EnsureSpaceAvailable(next_op_pos_ + sizeof(op))) { return false; @@ -670,14 +747,43 @@ bool CowWriter::WriteOperation(const CowOperation& op, const void* data, size_t return false; } - if (!android::base::WriteFullyAtOffset(fd_, reinterpret_cast(&op), sizeof(op), - next_op_pos_)) { - return false; - } - if (data != nullptr && size > 0) { - if (!WriteRawData(data, size)) return false; + if (batch_write_) { + CowOperation* cow_op = reinterpret_cast(cowop_vec_[op_vec_index_].iov_base); + std::memcpy(cow_op, &op, sizeof(CowOperation)); + op_vec_index_ += 1; + + if (data != nullptr && size > 0) { + struct iovec* data_ptr = data_vec_.get(); + std::memcpy(data_ptr[data_vec_index_].iov_base, data, size); + data_ptr[data_vec_index_].iov_len = size; + data_vec_index_ += 1; + total_data_written_ += size; + } + } else { + if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) { + PLOG(ERROR) << "lseek failed for writing operation."; + return false; + } + if (!android::base::WriteFully(fd_, reinterpret_cast(&op), sizeof(op))) { + return false; + } + if (data != nullptr && size > 0) { + if (!WriteRawData(data, size)) return false; + } } + AddOperation(op); + + if (batch_write_) { + if (op_vec_index_ == header_.cluster_ops || data_vec_index_ == header_.cluster_ops || + op.type == kCowLabelOp || op.type == kCowClusterOp) { + if (!FlushCluster()) { + LOG(ERROR) << "Failed to flush cluster data"; + return false; + } + } + } + return EmitClusterIfNeeded(); } From c4a5576fdf8f1e17e9b07d41eba671561653ce02 Mon Sep 17 00:00:00 2001 From: Akilesh Kailash Date: Thu, 8 Dec 2022 07:45:51 +0000 Subject: [PATCH 3/3] libsnapshot: Test batch writes and threaded compression Test all compression algorithms. Bug: 254188450 Test: cow_api_test Change-Id: I977e631402eb2dfaa76205f5d8cb955e6d3bddbb Signed-off-by: Akilesh Kailash --- .../libsnapshot_cow/cow_api_test.cpp | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp index 2c1187fd2..862ce5511 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp @@ -298,6 +298,150 @@ TEST_F(CowTest, CompressGz) { ASSERT_TRUE(iter->Done()); } +class CompressionRWTest : public CowTest, public testing::WithParamInterface {}; + +TEST_P(CompressionRWTest, ThreadedBatchWrites) { + CowOptions options; + options.compression = GetParam(); + options.num_compress_threads = 2; + + CowWriter writer(options); + + ASSERT_TRUE(writer.Initialize(cow_->fd)); + + std::string xor_data = "This is test data-1. Testing xor"; + xor_data.resize(options.block_size, '\0'); + ASSERT_TRUE(writer.AddXorBlocks(50, xor_data.data(), xor_data.size(), 24, 10)); + + std::string data = "This is test data-2. Testing replace ops"; + data.resize(options.block_size * 2048, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(100, data.data(), data.size())); + + std::string data2 = "This is test data-3. Testing replace ops"; + data2.resize(options.block_size * 259, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(6000, data2.data(), data2.size())); + + std::string data3 = "This is test data-4. 
Testing replace ops"; + data3.resize(options.block_size, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(9000, data3.data(), data3.size())); + + ASSERT_TRUE(writer.Finalize()); + + int expected_blocks = (1 + 2048 + 259 + 1); + ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0); + + CowReader reader; + ASSERT_TRUE(reader.Parse(cow_->fd)); + + auto iter = reader.GetOpIter(); + ASSERT_NE(iter, nullptr); + + int total_blocks = 0; + while (!iter->Done()) { + auto op = &iter->Get(); + + if (op->type == kCowXorOp) { + total_blocks += 1; + StringSink sink; + ASSERT_EQ(op->new_block, 50); + ASSERT_EQ(op->source, 98314); // 4096 * 24 + 10 + ASSERT_TRUE(reader.ReadData(*op, &sink)); + ASSERT_EQ(sink.stream(), xor_data); + } + + if (op->type == kCowReplaceOp) { + total_blocks += 1; + if (op->new_block == 100) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + data.resize(options.block_size); + ASSERT_EQ(sink.stream(), data); + } + if (op->new_block == 6000) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + data2.resize(options.block_size); + ASSERT_EQ(sink.stream(), data2); + } + if (op->new_block == 9000) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + ASSERT_EQ(sink.stream(), data3); + } + } + + iter->Next(); + } + + ASSERT_EQ(total_blocks, expected_blocks); +} + +TEST_P(CompressionRWTest, NoBatchWrites) { + CowOptions options; + options.compression = GetParam(); + options.num_compress_threads = 1; + options.cluster_ops = 0; + + CowWriter writer(options); + + ASSERT_TRUE(writer.Initialize(cow_->fd)); + + std::string data = "Testing replace ops without batch writes"; + data.resize(options.block_size * 1024, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(50, data.data(), data.size())); + + std::string data2 = "Testing odd blocks without batch writes"; + data2.resize(options.block_size * 111, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(3000, data2.data(), data2.size())); + + std::string data3 = "Testing single 4k block"; + data3.resize(options.block_size, '\0'); + ASSERT_TRUE(writer.AddRawBlocks(5000, data3.data(), data3.size())); + + ASSERT_TRUE(writer.Finalize()); + + int expected_blocks = (1024 + 111 + 1); + ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0); + + CowReader reader; + ASSERT_TRUE(reader.Parse(cow_->fd)); + + auto iter = reader.GetOpIter(); + ASSERT_NE(iter, nullptr); + + int total_blocks = 0; + while (!iter->Done()) { + auto op = &iter->Get(); + + if (op->type == kCowReplaceOp) { + total_blocks += 1; + if (op->new_block == 50) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + data.resize(options.block_size); + ASSERT_EQ(sink.stream(), data); + } + if (op->new_block == 3000) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + data2.resize(options.block_size); + ASSERT_EQ(sink.stream(), data2); + } + if (op->new_block == 5000) { + StringSink sink; + ASSERT_TRUE(reader.ReadData(*op, &sink)); + ASSERT_EQ(sink.stream(), data3); + } + } + + iter->Next(); + } + + ASSERT_EQ(total_blocks, expected_blocks); +} + +INSTANTIATE_TEST_SUITE_P(CowApi, CompressionRWTest, testing::Values("none", "gz", "brotli", "lz4")); + TEST_F(CowTest, ClusterCompressGz) { CowOptions options; options.compression = "gz";