diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp index e5b561b68..a9b96e2ef 100644 --- a/fs_mgr/libsnapshot/snapuserd/Android.bp +++ b/fs_mgr/libsnapshot/snapuserd/Android.bp @@ -64,12 +64,13 @@ cc_library_static { "dm-snapshot-merge/snapuserd_readahead.cpp", "snapuserd_buffer.cpp", "user-space-merge/handler_manager.cpp", + "user-space-merge/read_worker.cpp", "user-space-merge/snapuserd_core.cpp", - "user-space-merge/snapuserd_dm_user.cpp", "user-space-merge/snapuserd_merge.cpp", "user-space-merge/snapuserd_readahead.cpp", "user-space-merge/snapuserd_transitions.cpp", "user-space-merge/snapuserd_verify.cpp", + "user-space-merge/worker.cpp", ], static_libs: [ "libbase", diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp index bdba5c0cc..4105b4b48 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp @@ -18,7 +18,9 @@ #include +#include "read_worker.h" #include "snapuserd_core.h" +#include "snapuserd_merge.h" namespace android { namespace snapshot { diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp similarity index 88% rename from fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp rename to fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp index 2b9d14e9c..dd2996b78 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "read_worker.h" + #include "snapuserd_core.h" namespace android { @@ -23,59 +25,24 @@ using namespace android; using namespace android::dm; using android::base::unique_fd; -Worker::Worker(const std::string& cow_device, const std::string& backing_device, - const std::string& control_device, const std::string& misc_name, - const std::string& base_path_merge, std::shared_ptr snapuserd) { - cow_device_ = cow_device; - backing_store_device_ = backing_device; - control_device_ = control_device; - misc_name_ = misc_name; - base_path_merge_ = base_path_merge; - snapuserd_ = snapuserd; +void ReadWorker::CloseFds() { + ctrl_fd_ = {}; + backing_store_fd_ = {}; + Worker::CloseFds(); } -bool Worker::InitializeFds() { - backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY)); - if (backing_store_fd_ < 0) { - SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_; - return false; - } - - cow_fd_.reset(open(cow_device_.c_str(), O_RDWR)); - if (cow_fd_ < 0) { - SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_; - return false; - } - - ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR)); - if (ctrl_fd_ < 0) { - SNAP_PLOG(ERROR) << "Unable to open " << control_device_; - return false; - } - - // Base device used by merge thread - base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR)); - if (base_path_merge_fd_ < 0) { - SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_; - return false; - } - - return true; -} - -bool Worker::InitReader() { - reader_ = snapuserd_->CloneReaderForWorker(); - - if (!reader_->InitForMerge(std::move(cow_fd_))) { - return false; - } - return true; -} +ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing_device, + const std::string& control_device, const std::string& misc_name, + const std::string& base_path_merge, + std::shared_ptr snapuserd) + : Worker(cow_device, misc_name, base_path_merge, snapuserd), + backing_store_device_(backing_device), + control_device_(control_device) {} // Start the replace operation. This will read the // internal COW format and if the block is compressed, // it will be de-compressed. -bool Worker::ProcessReplaceOp(const CowOperation* cow_op) { +bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) { void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); if (!buffer) { SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer"; @@ -88,7 +55,7 @@ bool Worker::ProcessReplaceOp(const CowOperation* cow_op) { return true; } -bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) { +bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) { void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); if (buffer == nullptr) { SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer"; @@ -118,7 +85,7 @@ bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) { // Start the copy operation. This will read the backing // block device which is represented by cow_op->source. -bool Worker::ProcessCopyOp(const CowOperation* cow_op) { +bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) { if (!ReadFromSourceDevice(cow_op)) { return false; } @@ -126,7 +93,7 @@ bool Worker::ProcessCopyOp(const CowOperation* cow_op) { return true; } -bool Worker::ProcessXorOp(const CowOperation* cow_op) { +bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) { if (!ReadFromSourceDevice(cow_op)) { return false; } @@ -153,7 +120,7 @@ bool Worker::ProcessXorOp(const CowOperation* cow_op) { return true; } -bool Worker::ProcessZeroOp() { +bool ReadWorker::ProcessZeroOp() { // Zero out the entire block void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); if (buffer == nullptr) { @@ -165,7 +132,7 @@ bool Worker::ProcessZeroOp() { return true; } -bool Worker::ProcessOrderedOp(const CowOperation* cow_op) { +bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) { void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); if (buffer == nullptr) { SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer"; @@ -218,7 +185,7 @@ bool Worker::ProcessOrderedOp(const CowOperation* cow_op) { return false; } -bool Worker::ProcessCowOp(const CowOperation* cow_op) { +bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) { if (cow_op == nullptr) { SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op"; return false; @@ -246,31 +213,28 @@ bool Worker::ProcessCowOp(const CowOperation* cow_op) { return false; } -void Worker::InitializeBufsink() { - // Allocate the buffer which is used to communicate between - // daemon and dm-user. The buffer comprises of header and a fixed payload. - // If the dm-user requests a big IO, the IO will be broken into chunks - // of PAYLOAD_BUFFER_SZ. - size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_BUFFER_SZ; - bufsink_.Initialize(buf_size); -} +bool ReadWorker::Init() { + if (!Worker::Init()) { + return false; + } + + backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY)); + if (backing_store_fd_ < 0) { + SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_; + return false; + } + + ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR)); + if (ctrl_fd_ < 0) { + SNAP_PLOG(ERROR) << "Unable to open " << control_device_; + return false; + } -bool Worker::Init() { - InitializeBufsink(); xorsink_.Initialize(&bufsink_, BLOCK_SZ); - - if (!InitializeFds()) { - return false; - } - - if (!InitReader()) { - return false; - } - return true; } -bool Worker::RunThread() { +bool ReadWorker::Run() { SNAP_LOG(INFO) << "Processing snapshot I/O requests...."; if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) { @@ -291,7 +255,7 @@ bool Worker::RunThread() { } // Send the payload/data back to dm-user misc device. -bool Worker::WriteDmUserPayload(size_t size) { +bool ReadWorker::WriteDmUserPayload(size_t size) { size_t payload_size = size; void* buf = bufsink_.GetPayloadBufPtr(); if (header_response_) { @@ -310,7 +274,7 @@ bool Worker::WriteDmUserPayload(size_t size) { return true; } -bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) { +bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) { CHECK(read_size <= BLOCK_SZ); void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); @@ -329,7 +293,7 @@ bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) { return true; } -bool Worker::ReadAlignedSector(sector_t sector, size_t sz) { +bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) { size_t remaining_size = sz; std::vector>& chunk_vec = snapuserd_->GetChunkVec(); int ret = 0; @@ -389,7 +353,7 @@ bool Worker::ReadAlignedSector(sector_t sector, size_t sz) { return true; } -int Worker::ReadUnalignedSector( +int ReadWorker::ReadUnalignedSector( sector_t sector, size_t size, std::vector>::iterator& it) { size_t skip_sector_size = 0; @@ -424,7 +388,7 @@ int Worker::ReadUnalignedSector( return std::min(size, (BLOCK_SZ - skip_sector_size)); } -bool Worker::ReadUnalignedSector(sector_t sector, size_t size) { +bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) { bufsink_.ResetBufferOffset(); std::vector>& chunk_vec = snapuserd_->GetChunkVec(); @@ -563,7 +527,7 @@ bool Worker::ReadUnalignedSector(sector_t sector, size_t size) { return true; } -void Worker::RespondIOError() { +void ReadWorker::RespondIOError() { struct dm_user_header* header = bufsink_.GetHeaderPtr(); header->type = DM_USER_RESP_ERROR; // This is an issue with the dm-user interface. There @@ -580,7 +544,7 @@ void Worker::RespondIOError() { WriteDmUserPayload(0); } -bool Worker::DmuserReadRequest() { +bool ReadWorker::DmuserReadRequest() { struct dm_user_header* header = bufsink_.GetHeaderPtr(); // Unaligned I/O request @@ -591,7 +555,7 @@ bool Worker::DmuserReadRequest() { return ReadAlignedSector(header->sector, header->len); } -bool Worker::ProcessIORequest() { +bool ReadWorker::ProcessIORequest() { // Read Header from dm-user misc device. This gives // us the sector number for which IO is issued by dm-snapshot device struct dm_user_header* header = bufsink_.GetHeaderPtr(); diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h new file mode 100644 index 000000000..c3a4c346e --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h @@ -0,0 +1,70 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include "worker.h" + +namespace android { +namespace snapshot { + +class ReadWorker : public Worker { + public: + ReadWorker(const std::string& cow_device, const std::string& backing_device, + const std::string& control_device, const std::string& misc_name, + const std::string& base_path_merge, std::shared_ptr snapuserd); + + bool Run(); + bool Init() override; + void CloseFds() override; + + private: + // Functions interacting with dm-user + bool ProcessIORequest(); + bool WriteDmUserPayload(size_t size); + bool DmuserReadRequest(); + void RespondIOError(); + + bool ProcessCowOp(const CowOperation* cow_op); + bool ProcessXorOp(const CowOperation* cow_op); + bool ProcessOrderedOp(const CowOperation* cow_op); + bool ProcessCopyOp(const CowOperation* cow_op); + bool ProcessReplaceOp(const CowOperation* cow_op); + bool ProcessZeroOp(); + + bool ReadAlignedSector(sector_t sector, size_t sz); + bool ReadUnalignedSector(sector_t sector, size_t size); + int ReadUnalignedSector(sector_t sector, size_t size, + std::vector>::iterator& it); + bool ReadFromSourceDevice(const CowOperation* cow_op); + bool ReadDataFromBaseDevice(sector_t sector, size_t read_size); + + constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); } + constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; } + + std::string backing_store_device_; + unique_fd backing_store_fd_; + + std::string control_device_; + unique_fd ctrl_fd_; + + XorSink xorsink_; + bool header_response_ = false; +}; + +} // namespace snapshot +} // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp index 8e1212b7b..e52d752a0 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp @@ -23,6 +23,9 @@ #include #include +#include "read_worker.h" +#include "snapuserd_merge.h" + namespace android { namespace snapshot { @@ -46,9 +49,8 @@ SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device, bool SnapshotHandler::InitializeWorkers() { for (int i = 0; i < num_worker_threads_; i++) { - std::unique_ptr wt = - std::make_unique(cow_device_, backing_store_device_, control_device_, - misc_name_, base_path_merge_, GetSharedPtr()); + auto wt = std::make_unique(cow_device_, backing_store_device_, control_device_, + misc_name_, base_path_merge_, GetSharedPtr()); if (!wt->Init()) { SNAP_LOG(ERROR) << "Thread initialization failed"; return false; @@ -57,8 +59,8 @@ bool SnapshotHandler::InitializeWorkers() { worker_threads_.push_back(std::move(wt)); } - merge_thread_ = std::make_unique(cow_device_, backing_store_device_, control_device_, - misc_name_, base_path_merge_, GetSharedPtr()); + merge_thread_ = std::make_unique(cow_device_, misc_name_, base_path_merge_, + GetSharedPtr()); read_ahead_thread_ = std::make_unique(cow_device_, backing_store_device_, misc_name_, GetSharedPtr()); @@ -312,11 +314,11 @@ bool SnapshotHandler::Start() { // Launch worker threads for (int i = 0; i < worker_threads_.size(); i++) { threads.emplace_back( - std::async(std::launch::async, &Worker::RunThread, worker_threads_[i].get())); + std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get())); } std::future merge_thread = - std::async(std::launch::async, &Worker::RunMergeThread, merge_thread_.get()); + std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get()); // Now that the worker threads are up, scan the partitions. if (perform_verification_) { @@ -452,5 +454,11 @@ bool SnapshotHandler::CheckPartitionVerification() { return update_verify_->CheckPartitionVerification(); } +void SnapshotHandler::FreeResources() { + worker_threads_.clear(); + read_ahead_thread_ = nullptr; + merge_thread_ = nullptr; +} + } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h index c984a61f8..cdc38c016 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h @@ -75,7 +75,8 @@ enum class MERGE_IO_TRANSITION { READ_AHEAD_FAILURE, }; -class SnapshotHandler; +class MergeWorker; +class ReadWorker; enum class MERGE_GROUP_STATE { GROUP_MERGE_PENDING, @@ -98,102 +99,6 @@ struct MergeGroupState { : merge_state_(state), num_ios_in_progress(n_ios) {} }; -class Worker { - public: - Worker(const std::string& cow_device, const std::string& backing_device, - const std::string& control_device, const std::string& misc_name, - const std::string& base_path_merge, std::shared_ptr snapuserd); - bool RunThread(); - bool RunMergeThread(); - bool Init(); - - private: - // Initialization - void InitializeBufsink(); - bool InitializeFds(); - bool InitReader(); - void CloseFds() { - ctrl_fd_ = {}; - backing_store_fd_ = {}; - base_path_merge_fd_ = {}; - } - - // Functions interacting with dm-user - bool WriteDmUserPayload(size_t size); - bool DmuserReadRequest(); - - // IO Path - bool ProcessIORequest(); - bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); } - - bool ReadDataFromBaseDevice(sector_t sector, size_t read_size); - bool ReadFromSourceDevice(const CowOperation* cow_op); - - bool ReadAlignedSector(sector_t sector, size_t sz); - bool ReadUnalignedSector(sector_t sector, size_t size); - int ReadUnalignedSector(sector_t sector, size_t size, - std::vector>::iterator& it); - void RespondIOError(); - - // Processing COW operations - bool ProcessCowOp(const CowOperation* cow_op); - bool ProcessReplaceOp(const CowOperation* cow_op); - bool ProcessZeroOp(); - - // Handles Copy and Xor - bool ProcessCopyOp(const CowOperation* cow_op); - bool ProcessXorOp(const CowOperation* cow_op); - bool ProcessOrderedOp(const CowOperation* cow_op); - - // Merge related ops - bool Merge(); - bool AsyncMerge(); - bool SyncMerge(); - bool MergeOrderedOps(); - bool MergeOrderedOpsAsync(); - bool MergeReplaceZeroOps(); - int PrepareMerge(uint64_t* source_offset, int* pending_ops, - std::vector* replace_zero_vec = nullptr); - - sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; } - chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; } - - bool InitializeIouring(); - void FinalizeIouring(); - - std::unique_ptr reader_; - BufferSink bufsink_; - XorSink xorsink_; - - std::string cow_device_; - std::string backing_store_device_; - std::string control_device_; - std::string misc_name_; - std::string base_path_merge_; - - unique_fd cow_fd_; - unique_fd backing_store_fd_; - unique_fd base_path_merge_fd_; - unique_fd ctrl_fd_; - bool header_response_ = false; - - std::unique_ptr cowop_iter_; - size_t ra_block_index_ = 0; - uint64_t blocks_merged_in_group_ = 0; - bool merge_async_ = false; - // Queue depth of 8 seems optimal. We don't want - // to have a huge depth as it may put more memory pressure - // on the kernel worker threads given that we use - // IOSQE_ASYNC flag - ASYNC flags can potentially - // result in EINTR; Since we don't restart - // syscalls and fallback to synchronous I/O, we - // don't want huge queue depth - int queue_depth_ = 8; - std::unique_ptr ring_; - - std::shared_ptr snapuserd_; -}; - class SnapshotHandler : public std::enable_shared_from_this { public: SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device, @@ -212,11 +117,7 @@ class SnapshotHandler : public std::enable_shared_from_this { bool CommitMerge(int num_merge_ops); void CloseFds() { cow_fd_ = {}; } - void FreeResources() { - worker_threads_.clear(); - read_ahead_thread_ = nullptr; - merge_thread_ = nullptr; - } + void FreeResources(); bool InitializeWorkers(); std::unique_ptr CloneReaderForWorker(); @@ -315,7 +216,7 @@ class SnapshotHandler : public std::enable_shared_from_this { void* mapped_addr_; size_t total_mapped_addr_length_; - std::vector> worker_threads_; + std::vector> worker_threads_; // Read-ahead related bool populate_data_from_cow_ = false; bool ra_thread_ = false; @@ -330,7 +231,7 @@ class SnapshotHandler : public std::enable_shared_from_this { // Merge Block state std::vector> merge_blk_state_; - std::unique_ptr merge_thread_; + std::unique_ptr merge_thread_; double merge_completion_percentage_; bool merge_initiated_ = false; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp index ce95b7659..563f6ad42 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "snapuserd_merge.h" #include "snapuserd_core.h" @@ -23,8 +24,13 @@ using namespace android; using namespace android::dm; using android::base::unique_fd; -int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, - std::vector* replace_zero_vec) { +MergeWorker::MergeWorker(const std::string& cow_device, const std::string& misc_name, + const std::string& base_path_merge, + std::shared_ptr snapuserd) + : Worker(cow_device, misc_name, base_path_merge, snapuserd) {} + +int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops, + std::vector* replace_zero_vec) { int num_ops = *pending_ops; int nr_consecutive = 0; bool checkOrderedOp = (replace_zero_vec == nullptr); @@ -70,7 +76,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, return nr_consecutive; } -bool Worker::MergeReplaceZeroOps() { +bool MergeWorker::MergeReplaceZeroOps() { // Flush after merging 2MB. Since all ops are independent and there is no // dependency between COW ops, we will flush the data and the number // of ops merged in COW block device. If there is a crash, we will @@ -99,17 +105,20 @@ bool Worker::MergeReplaceZeroOps() { for (size_t i = 0; i < replace_zero_vec.size(); i++) { const CowOperation* cow_op = replace_zero_vec[i]; + + void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); + if (!buffer) { + SNAP_LOG(ERROR) << "Failed to acquire buffer in merge"; + return false; + } if (cow_op->type == kCowReplaceOp) { - if (!ProcessReplaceOp(cow_op)) { - SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block; + if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) { + SNAP_LOG(ERROR) << "Failed to read COW in merge"; return false; } } else { CHECK(cow_op->type == kCowZeroOp); - if (!ProcessZeroOp()) { - SNAP_LOG(ERROR) << "Merge ZeroOp failed."; - return false; - } + memset(buffer, 0, BLOCK_SZ); } bufsink_.UpdateBufferOffset(BLOCK_SZ); @@ -149,7 +158,7 @@ bool Worker::MergeReplaceZeroOps() { if (snapuserd_->IsIOTerminated()) { SNAP_LOG(ERROR) - << "MergeReplaceZeroOps: Worker threads terminated - shutting down merge"; + << "MergeReplaceZeroOps: MergeWorker threads terminated - shutting down merge"; return false; } } @@ -173,7 +182,7 @@ bool Worker::MergeReplaceZeroOps() { return true; } -bool Worker::MergeOrderedOpsAsync() { +bool MergeWorker::MergeOrderedOpsAsync() { void* mapped_addr = snapuserd_->GetMappedAddr(); void* read_ahead_buffer = static_cast((char*)mapped_addr + snapuserd_->GetBufferDataOffset()); @@ -354,7 +363,7 @@ bool Worker::MergeOrderedOpsAsync() { return true; } -bool Worker::MergeOrderedOps() { +bool MergeWorker::MergeOrderedOps() { void* mapped_addr = snapuserd_->GetMappedAddr(); void* read_ahead_buffer = static_cast((char*)mapped_addr + snapuserd_->GetBufferDataOffset()); @@ -439,7 +448,7 @@ bool Worker::MergeOrderedOps() { return true; } -bool Worker::AsyncMerge() { +bool MergeWorker::AsyncMerge() { if (!MergeOrderedOpsAsync()) { SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O"; // Reset the iter so that we retry the merge @@ -455,7 +464,7 @@ bool Worker::AsyncMerge() { return true; } -bool Worker::SyncMerge() { +bool MergeWorker::SyncMerge() { if (!MergeOrderedOps()) { SNAP_LOG(ERROR) << "Merge failed for ordered ops"; return false; @@ -465,7 +474,7 @@ bool Worker::SyncMerge() { return true; } -bool Worker::Merge() { +bool MergeWorker::Merge() { cowop_iter_ = reader_->GetOpIter(true); bool retry = false; @@ -511,7 +520,7 @@ bool Worker::Merge() { return true; } -bool Worker::InitializeIouring() { +bool MergeWorker::InitializeIouring() { if (!snapuserd_->IsIouringSupported()) { return false; } @@ -530,13 +539,13 @@ bool Worker::InitializeIouring() { return true; } -void Worker::FinalizeIouring() { +void MergeWorker::FinalizeIouring() { if (merge_async_) { io_uring_queue_exit(ring_.get()); } } -bool Worker::RunMergeThread() { +bool MergeWorker::Run() { SNAP_LOG(DEBUG) << "Waiting for merge begin..."; if (!snapuserd_->WaitForMergeBegin()) { SNAP_LOG(ERROR) << "Merge terminated early..."; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h new file mode 100644 index 000000000..f35147f2b --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h @@ -0,0 +1,58 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include "worker.h" + +#include + +namespace android { +namespace snapshot { + +class MergeWorker : public Worker { + public: + MergeWorker(const std::string& cow_device, const std::string& misc_name, + const std::string& base_path_merge, std::shared_ptr snapuserd); + bool Run(); + + private: + int PrepareMerge(uint64_t* source_offset, int* pending_ops, + std::vector* replace_zero_vec = nullptr); + bool MergeReplaceZeroOps(); + bool MergeOrderedOps(); + bool MergeOrderedOpsAsync(); + bool Merge(); + bool AsyncMerge(); + bool SyncMerge(); + bool InitializeIouring(); + void FinalizeIouring(); + + private: + std::unique_ptr cowop_iter_; + std::unique_ptr ring_; + size_t ra_block_index_ = 0; + uint64_t blocks_merged_in_group_ = 0; + bool merge_async_ = false; + // Queue depth of 8 seems optimal. We don't want + // to have a huge depth as it may put more memory pressure + // on the kernel worker threads given that we use + // IOSQE_ASYNC flag - ASYNC flags can potentially + // result in EINTR; Since we don't restart + // syscalls and fallback to synchronous I/O, we + // don't want huge queue depth + int queue_depth_ = 8; +}; + +} // namespace snapshot +} // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp new file mode 100644 index 000000000..aa156301f --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp @@ -0,0 +1,80 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "worker.h" + +#include "snapuserd_core.h" + +namespace android { +namespace snapshot { + +Worker::Worker(const std::string& cow_device, const std::string& misc_name, + const std::string& base_path_merge, std::shared_ptr snapuserd) { + cow_device_ = cow_device; + misc_name_ = misc_name; + base_path_merge_ = base_path_merge; + snapuserd_ = snapuserd; +} + +void Worker::InitializeBufsink() { + // Allocate the buffer which is used to communicate between + // daemon and dm-user. The buffer comprises of header and a fixed payload. + // If the dm-user requests a big IO, the IO will be broken into chunks + // of PAYLOAD_BUFFER_SZ. + size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_BUFFER_SZ; + bufsink_.Initialize(buf_size); +} + +bool Worker::Init() { + InitializeBufsink(); + + if (!InitializeFds()) { + return false; + } + + if (!InitReader()) { + return false; + } + + return true; +} + +bool Worker::InitReader() { + reader_ = snapuserd_->CloneReaderForWorker(); + + if (!reader_->InitForMerge(std::move(cow_fd_))) { + return false; + } + return true; +} + +bool Worker::InitializeFds() { + cow_fd_.reset(open(cow_device_.c_str(), O_RDWR)); + if (cow_fd_ < 0) { + SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_; + return false; + } + + // Base device used by merge thread + base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR)); + if (base_path_merge_fd_ < 0) { + SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_; + return false; + } + + return true; +} + +} // namespace snapshot +} // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h new file mode 100644 index 000000000..813b159ea --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h @@ -0,0 +1,65 @@ +// Copyright (C) 2023 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include + +#include +#include +#include +#include + +namespace android { +namespace snapshot { + +using android::base::unique_fd; + +class SnapshotHandler; + +class Worker { + public: + Worker(const std::string& cow_device, const std::string& misc_name, + const std::string& base_path_merge, std::shared_ptr snapuserd); + virtual ~Worker() = default; + + virtual bool Init(); + + protected: + // Initialization + void InitializeBufsink(); + bool InitializeFds(); + bool InitReader(); + virtual void CloseFds() { base_path_merge_fd_ = {}; } + + std::unique_ptr reader_; + BufferSink bufsink_; + + std::string misc_name_; // Needed for SNAP_LOG. + + unique_fd base_path_merge_fd_; + + std::shared_ptr snapuserd_; + + private: + std::string cow_device_; + std::string base_path_merge_; + unique_fd cow_fd_; +}; + +} // namespace snapshot +} // namespace android