diff --git a/fs_mgr/libsnapshot/cow_reader.cpp b/fs_mgr/libsnapshot/cow_reader.cpp index 5306b2843..20030b940 100644 --- a/fs_mgr/libsnapshot/cow_reader.cpp +++ b/fs_mgr/libsnapshot/cow_reader.cpp @@ -34,11 +34,12 @@ namespace android { namespace snapshot { -CowReader::CowReader() +CowReader::CowReader(ReaderFlags reader_flag) : fd_(-1), header_(), fd_size_(0), - merge_op_blocks_(std::make_shared>()) {} + merge_op_blocks_(std::make_shared>()), + reader_flag_(reader_flag) {} static void SHA256(const void*, size_t, uint8_t[]) { #if 0 @@ -415,7 +416,7 @@ bool CowReader::ParseOps(std::optional label) { //============================================================== bool CowReader::PrepMergeOps() { auto merge_op_blocks = std::make_shared>(); - std::set> other_ops; + std::vector other_ops; auto seq_ops_set = std::unordered_set(); auto block_map = std::make_shared>(); size_t num_seqs = 0; @@ -446,7 +447,7 @@ bool CowReader::PrepMergeOps() { if (!has_seq_ops_ && IsOrderedOp(current_op)) { merge_op_blocks->emplace_back(current_op.new_block); } else if (seq_ops_set.count(current_op.new_block) == 0) { - other_ops.insert(current_op.new_block); + other_ops.push_back(current_op.new_block); } block_map->insert({current_op.new_block, i}); } @@ -462,6 +463,18 @@ bool CowReader::PrepMergeOps() { } else { num_ordered_ops_to_merge_ = 0; } + + // Sort the vector in increasing order if merging in user-space as + // we can batch merge them when iterating from forward. + // + // dm-snapshot-merge requires decreasing order as we iterate the blocks + // in reverse order. + if (reader_flag_ == ReaderFlags::USERSPACE_MERGE) { + std::sort(other_ops.begin(), other_ops.end()); + } else { + std::sort(other_ops.begin(), other_ops.end(), std::greater()); + } + merge_op_blocks->reserve(merge_op_blocks->size() + other_ops.size()); for (auto block : other_ops) { merge_op_blocks->emplace_back(block); diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h index c15682a7e..9f4ddbb61 100644 --- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h +++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h @@ -26,8 +26,8 @@ static constexpr uint32_t kCowVersionMinor = 0; static constexpr uint32_t kCowVersionManifest = 2; -static constexpr uint32_t BLOCK_SZ = 4096; -static constexpr uint32_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1); +static constexpr size_t BLOCK_SZ = 4096; +static constexpr size_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1); // This header appears as the first sequence of bytes in the COW. All fields // in the layout are little-endian encoded. The on-disk layout is: diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h index 63a9e687a..d5b433531 100644 --- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h +++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h @@ -104,7 +104,12 @@ class ICowOpIter { class CowReader final : public ICowReader { public: - CowReader(); + enum class ReaderFlags { + DEFAULT = 0, + USERSPACE_MERGE = 1, + }; + + CowReader(ReaderFlags reader_flag = ReaderFlags::DEFAULT); ~CowReader() { owned_fd_ = {}; } // Parse the COW, optionally, up to the given label. 
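// Illustrative sketch (not part of this patch): the effect of the new
// ReaderFlags on how PrepMergeOps() orders the non-sequenced merge blocks.
// The standalone helper below is hypothetical; CowReader does this inline.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <vector>

std::vector<uint64_t> OrderForMerge(std::vector<uint64_t> blocks, bool userspace_merge) {
    if (userspace_merge) {
        // snapuserd walks the list front-to-back, so ascending order lets it
        // batch physically consecutive blocks into a single write.
        std::sort(blocks.begin(), blocks.end());
    } else {
        // dm-snapshot-merge iterates the blocks in reverse, so keep them in
        // descending order.
        std::sort(blocks.begin(), blocks.end(), std::greater<uint64_t>());
    }
    return blocks;
}
// e.g. {7, 3, 5} -> {3, 5, 7} with USERSPACE_MERGE, {7, 5, 3} otherwise.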
If no label is @@ -166,6 +171,7 @@ class CowReader final : public ICowReader { uint64_t num_ordered_ops_to_merge_; bool has_seq_ops_; std::shared_ptr> data_loc_; + ReaderFlags reader_flag_; }; } // namespace snapshot diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp index 837f33a77..c9b05124b 100644 --- a/fs_mgr/libsnapshot/snapuserd/Android.bp +++ b/fs_mgr/libsnapshot/snapuserd/Android.bp @@ -56,7 +56,7 @@ cc_defaults { "fs_mgr_defaults", ], srcs: [ - "snapuserd_server.cpp", + "dm-snapshot-merge/snapuserd_server.cpp", "dm-snapshot-merge/snapuserd.cpp", "dm-snapshot-merge/snapuserd_worker.cpp", "dm-snapshot-merge/snapuserd_readahead.cpp", @@ -67,6 +67,7 @@ cc_defaults { "user-space-merge/snapuserd_merge.cpp", "user-space-merge/snapuserd_readahead.cpp", "user-space-merge/snapuserd_transitions.cpp", + "user-space-merge/snapuserd_server.cpp", ], cflags: [ diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp similarity index 99% rename from fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp index 91b41908b..9ddc96389 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp +++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp @@ -31,6 +31,7 @@ #include #include #include + #include "snapuserd_server.h" #define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_ diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h similarity index 99% rename from fs_mgr/libsnapshot/snapuserd/snapuserd_server.h rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h index 14e5de6e7..3b6ff1583 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h +++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h @@ -28,7 +28,7 @@ #include #include -#include "dm-snapshot-merge/snapuserd.h" +#include "snapuserd.h" namespace android { namespace snapshot { diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h index aeecf410e..6ed55af2d 100644 --- a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h +++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h @@ -79,6 +79,15 @@ class SnapuserdClient { // Returns true if the snapuserd instance supports bridging a socket to second-stage init. bool SupportsSecondStageSocketHandoff(); + + // Returns true if the merge is started(or resumed from crash). 
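// Illustrative sketch (not part of this patch): how a caller such as
// update_engine might drive the three new client calls declared just below.
// The include path, polling interval and loop structure are assumptions.
#include <unistd.h>
#include <string>
#include <snapuserd/snapuserd_client.h>

void TrackMerge(android::snapshot::SnapuserdClient& client, const std::string& misc_name) {
    if (!client.InitiateMerge(misc_name)) {
        return;  // merge could not be started or resumed
    }
    while (true) {
        if (client.QuerySnapshotStatus(misc_name) == "snapshot-merge-failed") break;
        if (client.GetMergePercent() >= 100.0) break;  // polling converges to completion
        sleep(2);
    }
}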
+ bool InitiateMerge(const std::string& misc_name); + + // Returns Merge completion percentage + double GetMergePercent(); + + // Return the status of the snapshot + std::string QuerySnapshotStatus(const std::string& misc_name); }; } // namespace snapshot diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp index 1ea05a3b0..e345269ae 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp @@ -231,5 +231,35 @@ bool SnapuserdClient::DetachSnapuserd() { return true; } +bool SnapuserdClient::InitiateMerge(const std::string& misc_name) { + std::string msg = "initiate_merge," + misc_name; + if (!Sendmsg(msg)) { + LOG(ERROR) << "Failed to send message " << msg << " to snapuserd"; + return false; + } + std::string response = Receivemsg(); + return response == "success"; +} + +double SnapuserdClient::GetMergePercent() { + std::string msg = "merge_percent"; + if (!Sendmsg(msg)) { + LOG(ERROR) << "Failed to send message " << msg << " to snapuserd"; + return false; + } + std::string response = Receivemsg(); + + return std::stod(response); +} + +std::string SnapuserdClient::QuerySnapshotStatus(const std::string& misc_name) { + std::string msg = "getstatus," + misc_name; + if (!Sendmsg(msg)) { + LOG(ERROR) << "Failed to send message " << msg << " to snapuserd"; + return "snapshot-merge-failed"; + } + return Receivemsg(); +} + } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp index e05822ed7..912884fd3 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp @@ -21,8 +21,6 @@ #include #include -#include "snapuserd_server.h" - DEFINE_string(socket, android::snapshot::kSnapuserdSocket, "Named socket or socket path."); DEFINE_bool(no_socket, false, "If true, no socket is used. 
Each additional argument is an INIT message."); diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h index b660ba2ef..fbf57d943 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h @@ -19,7 +19,7 @@ #include #include -#include "snapuserd_server.h" +#include "dm-snapshot-merge/snapuserd_server.h" namespace android { namespace snapshot { diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp index a2538d297..57e47e7ed 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp @@ -59,6 +59,15 @@ std::unique_ptr SnapshotHandler::CloneReaderForWorker() { return reader_->CloneCowReader(); } +void SnapshotHandler::UpdateMergeCompletionPercentage() { + struct CowHeader* ch = reinterpret_cast(mapped_addr_); + merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops(); + + SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_ + << " num_merge_ops: " << ch->num_merge_ops + << " total-ops: " << reader_->get_num_total_data_ops(); +} + bool SnapshotHandler::CommitMerge(int num_merge_ops) { struct CowHeader* ch = reinterpret_cast(mapped_addr_); ch->num_merge_ops += num_merge_ops; @@ -95,6 +104,12 @@ bool SnapshotHandler::CommitMerge(int num_merge_ops) { } } + // Update the merge completion - this is used by update engine + // to track the completion. No need to take a lock. It is ok + // even if there is a miss on reading a latest updated value. + // Subsequent polling will eventually converge to completion. + UpdateMergeCompletionPercentage(); + return true; } @@ -124,7 +139,7 @@ void SnapshotHandler::CheckMergeCompletionStatus() { } bool SnapshotHandler::ReadMetadata() { - reader_ = std::make_unique(); + reader_ = std::make_unique(CowReader::ReaderFlags::USERSPACE_MERGE); CowHeader header; CowOptions options; @@ -152,16 +167,48 @@ bool SnapshotHandler::ReadMetadata() { return false; } + UpdateMergeCompletionPercentage(); + // Initialize the iterator for reading metadata std::unique_ptr cowop_iter = reader_->GetMergeOpIter(); + int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ); + int ra_index = 0; + + size_t copy_ops = 0, replace_ops = 0, zero_ops = 0, xor_ops = 0; + while (!cowop_iter->Done()) { const CowOperation* cow_op = &cowop_iter->Get(); + if (cow_op->type == kCowCopyOp) { + copy_ops += 1; + } else if (cow_op->type == kCowReplaceOp) { + replace_ops += 1; + } else if (cow_op->type == kCowZeroOp) { + zero_ops += 1; + } else if (cow_op->type == kCowXorOp) { + xor_ops += 1; + } + chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op)); - if (!ra_thread_ && IsOrderedOp(*cow_op)) { + if (IsOrderedOp(*cow_op)) { ra_thread_ = true; + block_to_ra_index_[cow_op->new_block] = ra_index; + num_ra_ops_per_iter -= 1; + + if ((ra_index + 1) - merge_blk_state_.size() == 1) { + std::unique_ptr blk_state = std::make_unique( + MERGE_GROUP_STATE::GROUP_MERGE_PENDING, 0); + + merge_blk_state_.push_back(std::move(blk_state)); + } + + // Move to next RA block + if (num_ra_ops_per_iter == 0) { + num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ); + ra_index += 1; + } } cowop_iter->Next(); } @@ -173,6 +220,12 @@ bool SnapshotHandler::ReadMetadata() { PrepareReadAhead(); + SNAP_LOG(INFO) << "Merged-ops: " << header.num_merge_ops + << " 
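// Illustrative sketch (not part of this patch): how the metadata loop above groups
// ordered (copy/xor) ops into read-ahead windows. The free function and the
// 2 MiB buffer size are hypothetical; BLOCK_SZ is 4096.
#include <cstddef>

size_t RaWindowForOrderedOp(size_t ordered_op_index, size_t buffer_data_size) {
    size_t ops_per_window = buffer_data_size / 4096;  // num_ra_ops_per_iter
    return ordered_op_index / ops_per_window;          // value stored in block_to_ra_index_
}
// With a 2 MiB data buffer, ordered ops 0..511 fall in window 0, 512..1023 in
// window 1, and one MergeGroupState (GROUP_MERGE_PENDING, refcount 0) is created per window.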
Total-data-ops: " << reader_->get_num_total_data_ops() + << " Unmerged-ops: " << chunk_vec_.size() << " Copy-ops: " << copy_ops + << " Zero-ops: " << zero_ops << " Replace-ops: " << replace_ops + << " Xor-ops: " << xor_ops; + return true; } diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h index c171eda1e..13b56facb 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h @@ -67,6 +67,27 @@ enum class MERGE_IO_TRANSITION { class SnapshotHandler; +enum class MERGE_GROUP_STATE { + GROUP_MERGE_PENDING, + GROUP_MERGE_RA_READY, + GROUP_MERGE_IN_PROGRESS, + GROUP_MERGE_COMPLETED, + GROUP_MERGE_FAILED, + GROUP_INVALID, +}; + +struct MergeGroupState { + MERGE_GROUP_STATE merge_state_; + // Ref count I/O when group state + // is in "GROUP_MERGE_PENDING" + size_t num_ios_in_progress; + std::mutex m_lock; + std::condition_variable m_cv; + + MergeGroupState(MERGE_GROUP_STATE state, size_t n_ios) + : merge_state_(state), num_ios_in_progress(n_ios) {} +}; + class ReadAhead { public: ReadAhead(const std::string& cow_device, const std::string& backing_device, @@ -133,16 +154,33 @@ class Worker { base_path_merge_fd_ = {}; } + // Functions interacting with dm-user + bool ReadDmUserHeader(); + bool WriteDmUserPayload(size_t size, bool header_response); + bool DmuserReadRequest(); + // IO Path bool ProcessIORequest(); + bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); } + + bool ReadDataFromBaseDevice(sector_t sector, size_t read_size); + bool ReadFromSourceDevice(const CowOperation* cow_op); + + bool ReadAlignedSector(sector_t sector, size_t sz, bool header_response); + bool ReadUnalignedSector(sector_t sector, size_t size); + int ReadUnalignedSector(sector_t sector, size_t size, + std::vector>::iterator& it); + bool RespondIOError(bool header_response); // Processing COW operations + bool ProcessCowOp(const CowOperation* cow_op); bool ProcessReplaceOp(const CowOperation* cow_op); bool ProcessZeroOp(); // Handles Copy and Xor bool ProcessCopyOp(const CowOperation* cow_op); bool ProcessXorOp(const CowOperation* cow_op); + bool ProcessOrderedOp(const CowOperation* cow_op); // Merge related ops bool Merge(); @@ -152,6 +190,9 @@ class Worker { const std::unique_ptr& cowop_iter, std::vector* replace_zero_vec = nullptr); + sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; } + chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; } + std::unique_ptr reader_; BufferSink bufsink_; XorSink xorsink_; @@ -210,6 +251,7 @@ class SnapshotHandler : public std::enable_shared_from_this { // Read-ahead related functions void* GetMappedAddr() { return mapped_addr_; } void PrepareReadAhead(); + std::unordered_map& GetReadAheadMap() { return read_ahead_buffer_map_; } // State transitions for merge void InitiateMerge(); @@ -226,6 +268,8 @@ class SnapshotHandler : public std::enable_shared_from_this { bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; } void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; } + // Return the snapshot status + std::string GetMergeStatus(); // RA related functions uint64_t GetBufferMetadataOffset(); @@ -238,12 +282,23 @@ class SnapshotHandler : public std::enable_shared_from_this { int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; } void SetSocketPresent(bool socket) { is_socket_present_ = socket; } bool 
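// Illustrative sketch (not part of this patch): the hand-off MergeGroupState above
// is built for. The stand-alone struct below only mirrors its members to show the
// locking pattern under that assumption; it is not the real implementation.
#include <condition_variable>
#include <cstddef>
#include <mutex>

struct MergeGroupStateSketch {
    size_t num_ios_in_progress = 0;   // reads served while GROUP_MERGE_PENDING
    std::mutex m_lock;
    std::condition_variable m_cv;

    void OnIoCompletion() {           // cf. SnapshotHandler::NotifyIOCompletion()
        std::lock_guard<std::mutex> lk(m_lock);
        if (--num_ios_in_progress == 0) m_cv.notify_all();
    }
    void WaitForInflightIo() {        // other side: let in-flight reads drain first
        std::unique_lock<std::mutex> lk(m_lock);
        m_cv.wait(lk, [this] { return num_ios_in_progress == 0; });
    }
};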
MergeInitiated() { return merge_initiated_; } + double GetMergePercentage() { return merge_completion_percentage_; } + + // Merge Block State Transitions + void SetMergeCompleted(size_t block_index); + void SetMergeInProgress(size_t block_index); + void SetMergeFailed(size_t block_index); + void NotifyIOCompletion(uint64_t new_block); + bool GetRABuffer(std::unique_lock* lock, uint64_t block, void* buffer); + MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer); private: bool ReadMetadata(); sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; } chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; } + bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); } struct BufferState* GetBufferState(); + void UpdateMergeCompletionPercentage(); void ReadBlocks(const std::string partition_name, const std::string& dm_block_device); void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name, @@ -261,7 +316,6 @@ class SnapshotHandler : public std::enable_shared_from_this { unique_fd cow_fd_; - // Number of sectors required when initializing dm-user uint64_t num_sectors_; std::unique_ptr reader_; @@ -283,8 +337,16 @@ class SnapshotHandler : public std::enable_shared_from_this { int total_ra_blocks_merged_ = 0; MERGE_IO_TRANSITION io_state_; std::unique_ptr read_ahead_thread_; + std::unordered_map read_ahead_buffer_map_; + + // user-space-merging + std::unordered_map block_to_ra_index_; + + // Merge Block state + std::vector> merge_blk_state_; std::unique_ptr merge_thread_; + double merge_completion_percentage_; bool merge_initiated_ = false; bool attached_ = false; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp index 18c7f2c96..bfbacf92e 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp @@ -84,6 +84,56 @@ bool Worker::ProcessReplaceOp(const CowOperation* cow_op) { return true; } +bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) { + void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); + if (buffer == nullptr) { + SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer"; + return false; + } + SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block + << " Source: " << cow_op->source; + uint64_t offset = cow_op->source; + if (cow_op->type == kCowCopyOp) { + offset *= BLOCK_SZ; + } + if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) { + std::string op; + if (cow_op->type == kCowCopyOp) + op = "Copy-op"; + else { + op = "Xor-op"; + } + SNAP_PLOG(ERROR) << op << " failed. Read from backing store: " << backing_store_device_ + << "at block :" << offset / BLOCK_SZ << " offset:" << offset % BLOCK_SZ; + return false; + } + + return true; +} + +// Start the copy operation. This will read the backing +// block device which is represented by cow_op->source. 
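// Illustrative note (not part of this patch): ReadFromSourceDevice() above
// interprets cow_op->source differently per op type; the helper below is
// hypothetical and only restates that rule.
#include <cstdint>

constexpr uint64_t kBlockSzExample = 4096;  // BLOCK_SZ
uint64_t SourceByteOffset(bool is_copy_op, uint64_t source) {
    // Copy ops store a block number, so scale to bytes; xor ops already
    // carry a byte offset into the backing device.
    return is_copy_op ? source * kBlockSzExample : source;
}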
+bool Worker::ProcessCopyOp(const CowOperation* cow_op) { + if (!ReadFromSourceDevice(cow_op)) { + return false; + } + + return true; +} + +bool Worker::ProcessXorOp(const CowOperation* cow_op) { + if (!ReadFromSourceDevice(cow_op)) { + return false; + } + xorsink_.Reset(); + if (!reader_->ReadData(*cow_op, &xorsink_)) { + SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block; + return false; + } + + return true; +} + bool Worker::ProcessZeroOp() { // Zero out the entire block void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); @@ -96,12 +146,85 @@ bool Worker::ProcessZeroOp() { return true; } -bool Worker::ProcessCopyOp(const CowOperation*) { - return true; +bool Worker::ProcessOrderedOp(const CowOperation* cow_op) { + void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); + if (buffer == nullptr) { + SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer"; + return false; + } + + MERGE_GROUP_STATE state = snapuserd_->ProcessMergingBlock(cow_op->new_block, buffer); + + switch (state) { + case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: { + // Merge is completed for this COW op; just read directly from + // the base device + SNAP_LOG(DEBUG) << "Merge-completed: Reading from base device sector: " + << (cow_op->new_block >> SECTOR_SHIFT) + << " Block-number: " << cow_op->new_block; + if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), BLOCK_SZ)) { + SNAP_LOG(ERROR) << "ReadDataFromBaseDevice at sector: " + << (cow_op->new_block >> SECTOR_SHIFT) << " after merge-complete."; + return false; + } + return true; + } + case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: { + bool ret; + if (cow_op->type == kCowCopyOp) { + ret = ProcessCopyOp(cow_op); + } else { + ret = ProcessXorOp(cow_op); + } + + // I/O is complete - decrement the refcount irrespective of the return + // status + snapuserd_->NotifyIOCompletion(cow_op->new_block); + return ret; + } + // We already have the data in the buffer retrieved from RA thread. + // Nothing to process further. + case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: { + [[fallthrough]]; + } + case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: { + return true; + } + default: { + // All other states, fail the I/O viz (GROUP_MERGE_FAILED and GROUP_INVALID) + return false; + } + } + + return false; } -bool Worker::ProcessXorOp(const CowOperation*) { - return true; +bool Worker::ProcessCowOp(const CowOperation* cow_op) { + if (cow_op == nullptr) { + SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op"; + return false; + } + + switch (cow_op->type) { + case kCowReplaceOp: { + return ProcessReplaceOp(cow_op); + } + + case kCowZeroOp: { + return ProcessZeroOp(); + } + + case kCowCopyOp: + [[fallthrough]]; + case kCowXorOp: { + return ProcessOrderedOp(cow_op); + } + + default: { + SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type; + } + } + return false; } void Worker::InitializeBufsink() { @@ -129,7 +252,7 @@ bool Worker::Init() { } bool Worker::RunThread() { - SNAP_LOG(DEBUG) << "Processing snapshot I/O requests..."; + SNAP_LOG(INFO) << "Processing snapshot I/O requests...."; // Start serving IO while (true) { if (!ProcessIORequest()) { @@ -143,8 +266,378 @@ bool Worker::RunThread() { return true; } +// Read Header from dm-user misc device. 
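// Summary sketch (not part of this patch): where ProcessOrderedOp() above ends up
// sourcing the 4K block, keyed by the window state returned from
// ProcessMergingBlock(). The helper and its strings are hypothetical.
#include <string>
#include "snapuserd_core.h"  // MERGE_GROUP_STATE, as introduced in this patch
using android::snapshot::MERGE_GROUP_STATE;

std::string DataSourceFor(MERGE_GROUP_STATE state) {
    switch (state) {
        case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED:
            return "base device - the block was already merged in place";
        case MERGE_GROUP_STATE::GROUP_MERGE_PENDING:
            return "backing/source device, followed by NotifyIOCompletion()";
        case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY:
        case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS:
            return "read-ahead buffer - ProcessMergingBlock() already filled the payload";
        default:
            return "none - the I/O fails (GROUP_MERGE_FAILED / GROUP_INVALID)";
    }
}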
This gives +// us the sector number for which IO is issued by dm-snapshot device +bool Worker::ReadDmUserHeader() { + if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) { + if (errno != ENOTBLK) { + SNAP_PLOG(ERROR) << "Control-read failed"; + } + + SNAP_PLOG(DEBUG) << "ReadDmUserHeader failed...."; + return false; + } + + return true; +} + +// Send the payload/data back to dm-user misc device. +bool Worker::WriteDmUserPayload(size_t size, bool header_response) { + size_t payload_size = size; + void* buf = bufsink_.GetPayloadBufPtr(); + if (header_response) { + payload_size += sizeof(struct dm_user_header); + buf = bufsink_.GetBufPtr(); + } + + if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) { + SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size; + return false; + } + + return true; +} + +bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) { + CHECK(read_size <= BLOCK_SZ); + + void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); + if (buffer == nullptr) { + SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer"; + return false; + } + + loff_t offset = sector << SECTOR_SHIFT; + if (!android::base::ReadFullyAtOffset(base_path_merge_fd_, buffer, read_size, offset)) { + SNAP_PLOG(ERROR) << "ReadDataFromBaseDevice failed. fd: " << base_path_merge_fd_ + << "at sector :" << sector << " size: " << read_size; + return false; + } + + return true; +} + +bool Worker::ReadAlignedSector(sector_t sector, size_t sz, bool header_response) { + struct dm_user_header* header = bufsink_.GetHeaderPtr(); + size_t remaining_size = sz; + std::vector>& chunk_vec = snapuserd_->GetChunkVec(); + bool io_error = false; + int ret = 0; + + do { + // Process 1MB payload at a time + size_t read_size = std::min(PAYLOAD_SIZE, remaining_size); + + header->type = DM_USER_RESP_SUCCESS; + size_t total_bytes_read = 0; + io_error = false; + bufsink_.ResetBufferOffset(); + + while (read_size) { + // We need to check every 4k block to verify if it is + // present in the mapping. + size_t size = std::min(BLOCK_SZ, read_size); + + auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), + std::make_pair(sector, nullptr), SnapshotHandler::compare); + bool not_found = (it == chunk_vec.end() || it->first != sector); + + if (not_found) { + // Block not found in map - which means this block was not + // changed as per the OTA. Just route the I/O to the base + // device. + if (!ReadDataFromBaseDevice(sector, size)) { + SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed"; + header->type = DM_USER_RESP_ERROR; + } + + ret = size; + } else { + // We found the sector in mapping. Check the type of COW OP and + // process it. 
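// Worked example (not part of this patch): the sector/block arithmetic behind the
// chunk_vec lookup above, assuming 512-byte sectors (SECTOR_SHIFT = 9) and 4K
// blocks, i.e. CHUNK_SHIFT = 3.
#include <cstdint>

int main() {
    uint64_t new_block = 10;                    // COW op target block
    uint64_t sector = new_block << 3;           // ChunkToSector() -> 80
    uint64_t byte_offset = sector << 9;         // 40960 == 10 * 4096
    uint64_t round_trip = sector >> 3;          // SectorToChunk() -> 10
    return (byte_offset == new_block * 4096 && round_trip == new_block) ? 0 : 1;
}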
+ if (!ProcessCowOp(it->second)) { + SNAP_LOG(ERROR) << "ProcessCowOp failed"; + header->type = DM_USER_RESP_ERROR; + } + + ret = BLOCK_SZ; + } + + // Just return the header if it is an error + if (header->type == DM_USER_RESP_ERROR) { + if (!RespondIOError(header_response)) { + return false; + } + + io_error = true; + break; + } + + read_size -= ret; + total_bytes_read += ret; + sector += (ret >> SECTOR_SHIFT); + bufsink_.UpdateBufferOffset(ret); + } + + if (!io_error) { + if (!WriteDmUserPayload(total_bytes_read, header_response)) { + return false; + } + + SNAP_LOG(DEBUG) << "WriteDmUserPayload success total_bytes_read: " << total_bytes_read + << " header-response: " << header_response + << " remaining_size: " << remaining_size; + header_response = false; + remaining_size -= total_bytes_read; + } + } while (remaining_size > 0 && !io_error); + + return true; +} + +int Worker::ReadUnalignedSector( + sector_t sector, size_t size, + std::vector>::iterator& it) { + size_t skip_sector_size = 0; + + SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size + << " Aligned sector: " << it->first; + + if (!ProcessCowOp(it->second)) { + SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size + << " Aligned sector: " << it->first; + return -1; + } + + int num_sectors_skip = sector - it->first; + + if (num_sectors_skip > 0) { + skip_sector_size = num_sectors_skip << SECTOR_SHIFT; + char* buffer = reinterpret_cast(bufsink_.GetBufPtr()); + struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0])); + + if (skip_sector_size == BLOCK_SZ) { + SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector + << " Base-sector: " << it->first; + return -1; + } + + memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size, + (BLOCK_SZ - skip_sector_size)); + } + + bufsink_.ResetBufferOffset(); + return std::min(size, (BLOCK_SZ - skip_sector_size)); +} + +bool Worker::ReadUnalignedSector(sector_t sector, size_t size) { + struct dm_user_header* header = bufsink_.GetHeaderPtr(); + header->type = DM_USER_RESP_SUCCESS; + bufsink_.ResetBufferOffset(); + std::vector>& chunk_vec = snapuserd_->GetChunkVec(); + + auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr), + SnapshotHandler::compare); + + // |-------|-------|-------| + // 0 1 2 3 + // + // Block 0 - op 1 + // Block 1 - op 2 + // Block 2 - op 3 + // + // chunk_vec will have block 0, 1, 2 which maps to relavant COW ops. + // + // Each block is 4k bytes. Thus, the last block will span 8 sectors + // ranging till block 3 (However, block 3 won't be in chunk_vec as + // it doesn't have any mapping to COW ops. Now, if we get an I/O request for a sector + // spanning between block 2 and block 3, we need to step back + // and get hold of the last element. + // + // Additionally, we need to make sure that the requested sector is + // indeed within the range of the final sector. It is perfectly valid + // to get an I/O request for block 3 and beyond which are not mapped + // to any COW ops. In that case, we just need to read from the base + // device. 
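// Worked example (not part of this patch) of the look-back described above, with
// COW ops mapping blocks 0..2 only, i.e. chunk_vec keys (sectors) 0, 8 and 16:
#include <cassert>
#include <cstdint>

int main() {
    const uint64_t sector = 21;                    // unaligned request inside block 2
    const uint64_t base = 16;                      // element found after stepping back
    const uint64_t requested_offset = sector << 9; // 10752
    const uint64_t final_offset = base << 9;       // 8192
    assert(requested_offset >= final_offset && requested_offset - final_offset < 4096);
    const uint64_t skip = (sector - base) << 9;    // 2560 bytes skipped inside the block
    assert(4096 - skip == 1536);                   // bytes served from the mapped COW op
    return 0;
}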
+ bool merge_complete = false; + bool header_response = true; + if (it == chunk_vec.end()) { + if (chunk_vec.size() > 0) { + // I/O request beyond the last mapped sector + it = std::prev(chunk_vec.end()); + } else { + // This can happen when a partition merge is complete but snapshot + // state in /metadata is not yet deleted; during this window if the + // device is rebooted, subsequent attempt will mount the snapshot. + // However, since the merge was completed we wouldn't have any + // mapping to COW ops thus chunk_vec will be empty. In that case, + // mark this as merge_complete and route the I/O to the base device. + merge_complete = true; + } + } else if (it->first != sector) { + if (it != chunk_vec.begin()) { + --it; + } + } else { + return ReadAlignedSector(sector, size, header_response); + } + + loff_t requested_offset = sector << SECTOR_SHIFT; + + loff_t final_offset = 0; + if (!merge_complete) { + final_offset = it->first << SECTOR_SHIFT; + } + + // Since a COW op span 4k block size, we need to make sure that the requested + // offset is within the 4k region. Consider the following case: + // + // |-------|-------|-------| + // 0 1 2 3 + // + // Block 0 - op 1 + // Block 1 - op 2 + // + // We have an I/O request for a sector between block 2 and block 3. However, + // we have mapping to COW ops only for block 0 and block 1. Thus, the + // requested offset in this case is beyond the last mapped COW op size (which + // is block 1 in this case). + + size_t total_bytes_read = 0; + size_t remaining_size = size; + int ret = 0; + if (!merge_complete && (requested_offset >= final_offset) && + (requested_offset - final_offset) < BLOCK_SZ) { + // Read the partial un-aligned data + ret = ReadUnalignedSector(sector, remaining_size, it); + if (ret < 0) { + SNAP_LOG(ERROR) << "ReadUnalignedSector failed for sector: " << sector + << " size: " << size << " it->sector: " << it->first; + return RespondIOError(header_response); + } + + remaining_size -= ret; + total_bytes_read += ret; + sector += (ret >> SECTOR_SHIFT); + + // Send the data back + if (!WriteDmUserPayload(total_bytes_read, header_response)) { + return false; + } + + header_response = false; + // If we still have pending data to be processed, this will be aligned I/O + if (remaining_size) { + return ReadAlignedSector(sector, remaining_size, header_response); + } + } else { + // This is all about handling I/O request to be routed to base device + // as the I/O is not mapped to any of the COW ops. 
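// Worked example (not part of this patch) of the round-up below, with BLOCK_SZ = 4096:
//   requested_offset = 12800  (sector 25; block 3 is not mapped to any COW op)
//   aligned_offset   = (12800 + 4095) & ~4095 = 16384
//   diff_size        = 16384 - 12800 = 3584 bytes are read from the base device,
// after which the request is block-aligned again (sector 32) and the remainder,
// if any, goes through ReadAlignedSector().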
+ loff_t aligned_offset = requested_offset; + // Align to nearest 4k + aligned_offset += BLOCK_SZ - 1; + aligned_offset &= ~(BLOCK_SZ - 1); + // Find the diff of the aligned offset + size_t diff_size = aligned_offset - requested_offset; + CHECK(diff_size <= BLOCK_SZ); + if (remaining_size < diff_size) { + if (!ReadDataFromBaseDevice(sector, remaining_size)) { + return RespondIOError(header_response); + } + total_bytes_read += remaining_size; + + if (!WriteDmUserPayload(total_bytes_read, header_response)) { + return false; + } + } else { + if (!ReadDataFromBaseDevice(sector, diff_size)) { + return RespondIOError(header_response); + } + + total_bytes_read += diff_size; + + if (!WriteDmUserPayload(total_bytes_read, header_response)) { + return false; + } + + remaining_size -= diff_size; + size_t num_sectors_read = (diff_size >> SECTOR_SHIFT); + sector += num_sectors_read; + CHECK(IsBlockAligned(sector << SECTOR_SHIFT)); + header_response = false; + + // If we still have pending data to be processed, this will be aligned I/O + return ReadAlignedSector(sector, remaining_size, header_response); + } + } + + return true; +} + +bool Worker::RespondIOError(bool header_response) { + struct dm_user_header* header = bufsink_.GetHeaderPtr(); + header->type = DM_USER_RESP_ERROR; + // This is an issue with the dm-user interface. There + // is no way to propagate the I/O error back to dm-user + // if we have already communicated the header back. Header + // is responded once at the beginning; however I/O can + // be processed in chunks. If we encounter an I/O error + // somewhere in the middle of the processing, we can't communicate + // this back to dm-user. + // + // TODO: Fix the interface + CHECK(header_response); + + if (!WriteDmUserPayload(0, header_response)) { + return false; + } + + // There is no need to process further as we have already seen + // an I/O error + return true; +} + +bool Worker::DmuserReadRequest() { + struct dm_user_header* header = bufsink_.GetHeaderPtr(); + + // Unaligned I/O request + if (!IsBlockAligned(header->sector << SECTOR_SHIFT)) { + return ReadUnalignedSector(header->sector, header->len); + } + + return ReadAlignedSector(header->sector, header->len, true); +} + bool Worker::ProcessIORequest() { - // No communication with dm-user yet + struct dm_user_header* header = bufsink_.GetHeaderPtr(); + + if (!ReadDmUserHeader()) { + return false; + } + + SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq; + SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len; + SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector; + SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type; + SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags; + + switch (header->type) { + case DM_USER_REQ_MAP_READ: { + if (!DmuserReadRequest()) { + return false; + } + break; + } + + case DM_USER_REQ_MAP_WRITE: { + // TODO: We should not get any write request + // to dm-user as we mount all partitions + // as read-only. Need to verify how are TRIM commands + // handled during mount. 
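// Worked example (not part of this patch): the power-of-two alignment test used by
// IsBlockAligned() / DmuserReadRequest() above, with BLOCK_SZ = 4096.
#include <cstdint>

constexpr bool IsAligned4K(uint64_t bytes) { return (bytes & (4096 - 1)) == 0; }
static_assert(IsAligned4K(24 << 9), "sector 24 is 4K-aligned -> ReadAlignedSector()");
static_assert(!IsAligned4K(21 << 9), "sector 21 is not -> ReadUnalignedSector()");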
+ return false; + } + } + return true; } diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp index 696ede72f..47fc7db50 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp @@ -52,7 +52,6 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, break; } - // Check for consecutive blocks uint64_t next_offset = op->new_block * BLOCK_SZ; if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) { break; @@ -177,6 +176,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr& cowop_iter) { void* mapped_addr = snapuserd_->GetMappedAddr(); void* read_ahead_buffer = static_cast((char*)mapped_addr + snapuserd_->GetBufferDataOffset()); + size_t block_index = 0; SNAP_LOG(INFO) << "MergeOrderedOps started...."; @@ -190,9 +190,12 @@ bool Worker::MergeOrderedOps(const std::unique_ptr& cowop_iter) { // Wait for RA thread to notify that the merge window // is ready for merging. if (!snapuserd_->WaitForMergeBegin()) { + snapuserd_->SetMergeFailed(block_index); return false; } + snapuserd_->SetMergeInProgress(block_index); + loff_t offset = 0; int num_ops = snapuserd_->GetTotalBlocksToMerge(); SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops; @@ -213,6 +216,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr& cowop_iter) { if (ret < 0 || ret != io_size) { SNAP_LOG(ERROR) << "Failed to write to backing device while merging " << " at offset: " << source_offset << " io_size: " << io_size; + snapuserd_->SetMergeFailed(block_index); return false; } @@ -226,6 +230,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr& cowop_iter) { // Flush the data if (fsync(base_path_merge_fd_.get()) < 0) { SNAP_LOG(ERROR) << " Failed to fsync merged data"; + snapuserd_->SetMergeFailed(block_index); return false; } @@ -233,14 +238,20 @@ bool Worker::MergeOrderedOps(const std::unique_ptr& cowop_iter) { // the merge completion if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) { SNAP_LOG(ERROR) << " Failed to commit the merged block in the header"; + snapuserd_->SetMergeFailed(block_index); return false; } SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge(); + // Mark the block as merge complete + snapuserd_->SetMergeCompleted(block_index); // Notify RA thread that the merge thread is ready to merge the next // window snapuserd_->NotifyRAForMergeReady(); + + // Get the next block + block_index += 1; } return true; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp index 319755b11..0bcf26e78 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp @@ -115,7 +115,7 @@ int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops, } bool ReadAhead::ReconstructDataFromCow() { - std::unordered_map read_ahead_buffer_map; + std::unordered_map& read_ahead_buffer_map = snapuserd_->GetReadAheadMap(); loff_t metadata_offset = 0; loff_t start_data_offset = snapuserd_->GetBufferDataOffset(); int num_ops = 0; @@ -319,6 +319,18 @@ bool ReadAhead::ReadAheadIOStart() { memcpy(metadata_buffer_, ra_temp_meta_buffer.get(), snapuserd_->GetBufferMetadataSize()); memcpy(read_ahead_buffer_, ra_temp_buffer.get(), total_blocks_merged * BLOCK_SZ); + offset = 0; + 
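// Illustrative sketch (not part of this patch): the per-window ordering that
// MergeOrderedOps() above follows for each read-ahead window. The helper is a
// hypothetical summary, not the real control flow.
#include <cstddef>
#include "snapuserd_core.h"  // SnapshotHandler, as laid out in this patch

bool MergeOneWindow(android::snapshot::SnapshotHandler* snapuserd, size_t block_index) {
    if (!snapuserd->WaitForMergeBegin()) {       // RA thread has not filled the window
        snapuserd->SetMergeFailed(block_index);
        return false;
    }
    snapuserd->SetMergeInProgress(block_index);  // readers switch to the RA buffer
    // ... write the RA buffer to the base device, fsync, CommitMerge() ...
    snapuserd->SetMergeCompleted(block_index);   // readers switch to the base device
    snapuserd->NotifyRAForMergeReady();          // RA thread may refill the next window
    return true;
}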
std::unordered_map& read_ahead_buffer_map = snapuserd_->GetReadAheadMap(); + read_ahead_buffer_map.clear(); + + for (size_t block_index = 0; block_index < blocks.size(); block_index++) { + void* bufptr = static_cast((char*)read_ahead_buffer_ + offset); + uint64_t new_block = blocks[block_index]; + + read_ahead_buffer_map[new_block] = bufptr; + offset += BLOCK_SZ; + } + snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged); // Flush the data only if we have a overlapping blocks in the region diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp new file mode 100644 index 000000000..a4fd5a035 --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp @@ -0,0 +1,684 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "snapuserd_server.h" + +#define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_ +#include + +namespace android { +namespace snapshot { + +using namespace std::string_literals; + +using android::base::borrowed_fd; +using android::base::unique_fd; + +DaemonOps SnapuserServer::Resolveop(std::string& input) { + if (input == "init") return DaemonOps::INIT; + if (input == "start") return DaemonOps::START; + if (input == "stop") return DaemonOps::STOP; + if (input == "query") return DaemonOps::QUERY; + if (input == "delete") return DaemonOps::DELETE; + if (input == "detach") return DaemonOps::DETACH; + if (input == "supports") return DaemonOps::SUPPORTS; + if (input == "initiate_merge") return DaemonOps::INITIATE; + if (input == "merge_percent") return DaemonOps::PERCENTAGE; + if (input == "getstatus") return DaemonOps::GETSTATUS; + + return DaemonOps::INVALID; +} + +SnapuserServer::~SnapuserServer() { + // Close any client sockets that were added via AcceptClient(). 
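// (Index 0 is skipped: watched_fds_[0] is the listening socket added by
// StartWithSocket(), and its fd is owned and closed by sockfd_.)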
+ for (size_t i = 1; i < watched_fds_.size(); i++) { + close(watched_fds_[i].fd); + } +} + +std::string SnapuserServer::GetDaemonStatus() { + std::string msg = ""; + + if (IsTerminating()) + msg = "passive"; + else + msg = "active"; + + return msg; +} + +void SnapuserServer::Parsemsg(std::string const& msg, const char delim, + std::vector& out) { + std::stringstream ss(msg); + std::string s; + + while (std::getline(ss, s, delim)) { + out.push_back(s); + } +} + +void SnapuserServer::ShutdownThreads() { + terminating_ = true; + JoinAllThreads(); +} + +DmUserHandler::DmUserHandler(std::shared_ptr snapuserd) + : snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {} + +bool SnapuserServer::Sendmsg(android::base::borrowed_fd fd, const std::string& msg) { + ssize_t ret = TEMP_FAILURE_RETRY(send(fd.get(), msg.data(), msg.size(), MSG_NOSIGNAL)); + if (ret < 0) { + PLOG(ERROR) << "Snapuserd:server: send() failed"; + return false; + } + + if (ret < msg.size()) { + LOG(ERROR) << "Partial send; expected " << msg.size() << " bytes, sent " << ret; + return false; + } + return true; +} + +bool SnapuserServer::Recv(android::base::borrowed_fd fd, std::string* data) { + char msg[MAX_PACKET_SIZE]; + ssize_t rv = TEMP_FAILURE_RETRY(recv(fd.get(), msg, sizeof(msg), 0)); + if (rv < 0) { + PLOG(ERROR) << "recv failed"; + return false; + } + *data = std::string(msg, rv); + return true; +} + +bool SnapuserServer::Receivemsg(android::base::borrowed_fd fd, const std::string& str) { + const char delim = ','; + + std::vector out; + Parsemsg(str, delim, out); + DaemonOps op = Resolveop(out[0]); + + switch (op) { + case DaemonOps::INIT: { + // Message format: + // init,,,, + // + // Reads the metadata and send the number of sectors + if (out.size() != 5) { + LOG(ERROR) << "Malformed init message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + + auto handler = AddHandler(out[1], out[2], out[3], out[4]); + if (!handler) { + return Sendmsg(fd, "fail"); + } + + auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors()); + return Sendmsg(fd, retval); + } + case DaemonOps::START: { + // Message format: + // start, + // + // Start the new thread which binds to dm-user misc device + if (out.size() != 2) { + LOG(ERROR) << "Malformed start message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " << out[1]; + return Sendmsg(fd, "fail"); + } + if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) { + LOG(ERROR) << "Tried to re-attach control device: " << out[1]; + return Sendmsg(fd, "fail"); + } + if (!StartHandler(*iter)) { + return Sendmsg(fd, "fail"); + } + return Sendmsg(fd, "success"); + } + case DaemonOps::STOP: { + // Message format: stop + // + // Stop all the threads gracefully and then shutdown the + // main thread + SetTerminating(); + ShutdownThreads(); + return true; + } + case DaemonOps::QUERY: { + // Message format: query + // + // As part of transition, Second stage daemon will be + // created before terminating the first stage daemon. Hence, + // for a brief period client may have to distiguish between + // first stage daemon and second stage daemon. + // + // Second stage daemon is marked as active and hence will + // be ready to receive control message. 
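// Summary (not part of this patch) of the control messages handled by Receivemsg(),
// as split on ',' by Parsemsg(); replies go back over the same socket via Sendmsg():
//
//   "init,<misc>,<cow_dev>,<backing_dev>,<base_dev>"  -> "success,<num_sectors>" or "fail"
//   "start,<misc>"                                     -> "success" or "fail"
//   "initiate_merge,<misc>"                            -> "success" or "fail"
//   "merge_percent"                                    -> merge completion as a decimal string
//   "getstatus,<misc>"                                 -> merge status string
//   "delete,<misc>"                                    -> "success" or "fail"
//   "supports,second_stage_socket_handoff"             -> "success" or "fail"
//   "query"                                            -> "active" or "passive"
//   "stop" / "detach"                                  -> no reply; the daemon winds down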
+ return Sendmsg(fd, GetDaemonStatus()); + } + case DaemonOps::DELETE: { + // Message format: + // delete, + if (out.size() != 2) { + LOG(ERROR) << "Malformed delete message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == dm_users_.end()) { + // After merge is completed, we swap dm-user table with + // the underlying dm-linear base device. Hence, worker + // threads would have terminted and was removed from + // the list. + LOG(DEBUG) << "Could not find handler: " << out[1]; + return Sendmsg(fd, "success"); + } + + if (!(*iter)->ThreadTerminated()) { + (*iter)->snapuserd()->NotifyIOTerminated(); + } + } + if (!RemoveAndJoinHandler(out[1])) { + return Sendmsg(fd, "fail"); + } + return Sendmsg(fd, "success"); + } + case DaemonOps::DETACH: { + std::lock_guard lock(lock_); + TerminateMergeThreads(&lock); + terminating_ = true; + return true; + } + case DaemonOps::SUPPORTS: { + if (out.size() != 2) { + LOG(ERROR) << "Malformed supports message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + if (out[1] == "second_stage_socket_handoff") { + return Sendmsg(fd, "success"); + } + return Sendmsg(fd, "fail"); + } + case DaemonOps::INITIATE: { + if (out.size() != 2) { + LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + if (out[0] == "initiate_merge") { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " << out[1]; + return Sendmsg(fd, "fail"); + } + + if (!StartMerge(*iter)) { + return Sendmsg(fd, "fail"); + } + + return Sendmsg(fd, "success"); + } + return Sendmsg(fd, "fail"); + } + case DaemonOps::PERCENTAGE: { + std::lock_guard lock(lock_); + double percentage = GetMergePercentage(&lock); + + return Sendmsg(fd, std::to_string(percentage)); + } + case DaemonOps::GETSTATUS: { + // Message format: + // getstatus, + if (out.size() != 2) { + LOG(ERROR) << "Malformed delete message, " << out.size() << " parts"; + return Sendmsg(fd, "snapshot-merge-failed"); + } + { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " << out[1]; + return Sendmsg(fd, "snapshot-merge-failed"); + } + + std::string merge_status = GetMergeStatus(*iter); + return Sendmsg(fd, merge_status); + } + } + default: { + LOG(ERROR) << "Received unknown message type from client"; + Sendmsg(fd, "fail"); + return false; + } + } +} + +void SnapuserServer::RunThread(std::shared_ptr handler) { + LOG(INFO) << "Entering thread for handler: " << handler->misc_name(); + + handler->snapuserd()->SetSocketPresent(is_socket_present_); + if (!handler->snapuserd()->Start()) { + LOG(ERROR) << " Failed to launch all worker threads"; + } + + handler->snapuserd()->CloseFds(); + handler->snapuserd()->CheckMergeCompletionStatus(); + handler->snapuserd()->UnmapBufferRegion(); + + auto misc_name = handler->misc_name(); + LOG(INFO) << "Handler thread about to exit: " << misc_name; + + { + std::lock_guard lock(lock_); + num_partitions_merge_complete_ += 1; + handler->SetThreadTerminated(); + auto iter = FindHandler(&lock, handler->misc_name()); + if (iter == dm_users_.end()) { + // RemoveAndJoinHandler() already removed us from the list, and is + // now waiting on a join(), so just return. 
Additionally, release + // all the resources held by snapuserd object which are shared + // by worker threads. This should be done when the last reference + // of "handler" is released; but we will explicitly release here + // to make sure snapuserd object is freed as it is the biggest + // consumer of memory in the daemon. + handler->FreeResources(); + LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name; + return; + } + + LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name; + + if (handler->snapuserd()->IsAttached()) { + handler->thread().detach(); + } + + // Important: free resources within the lock. This ensures that if + // WaitForDelete() is called, the handler is either in the list, or + // it's not and its resources are guaranteed to be freed. + handler->FreeResources(); + dm_users_.erase(iter); + } +} + +bool SnapuserServer::Start(const std::string& socketname) { + bool start_listening = true; + + sockfd_.reset(android_get_control_socket(socketname.c_str())); + if (sockfd_ < 0) { + sockfd_.reset(socket_local_server(socketname.c_str(), ANDROID_SOCKET_NAMESPACE_RESERVED, + SOCK_STREAM)); + if (sockfd_ < 0) { + PLOG(ERROR) << "Failed to create server socket " << socketname; + return false; + } + start_listening = false; + } + return StartWithSocket(start_listening); +} + +bool SnapuserServer::StartWithSocket(bool start_listening) { + if (start_listening && listen(sockfd_.get(), 4) < 0) { + PLOG(ERROR) << "listen socket failed"; + return false; + } + + AddWatchedFd(sockfd_, POLLIN); + is_socket_present_ = true; + + // If started in first-stage init, the property service won't be online. + if (access("/dev/socket/property_service", F_OK) == 0) { + if (!android::base::SetProperty("snapuserd.ready", "true")) { + LOG(ERROR) << "Failed to set snapuserd.ready property"; + return false; + } + } + + LOG(DEBUG) << "Snapuserd server now accepting connections"; + return true; +} + +bool SnapuserServer::Run() { + LOG(INFO) << "Now listening on snapuserd socket"; + + while (!IsTerminating()) { + int rv = TEMP_FAILURE_RETRY(poll(watched_fds_.data(), watched_fds_.size(), -1)); + if (rv < 0) { + PLOG(ERROR) << "poll failed"; + return false; + } + if (!rv) { + continue; + } + + if (watched_fds_[0].revents) { + AcceptClient(); + } + + auto iter = watched_fds_.begin() + 1; + while (iter != watched_fds_.end()) { + if (iter->revents && !HandleClient(iter->fd, iter->revents)) { + close(iter->fd); + iter = watched_fds_.erase(iter); + } else { + iter++; + } + } + } + + JoinAllThreads(); + return true; +} + +void SnapuserServer::JoinAllThreads() { + // Acquire the thread list within the lock. 
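// (The join happens outside the lock on purpose: RunThread() takes lock_ when a
// handler exits, so joining while holding lock_ could deadlock.)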
+ std::vector> dm_users; + { + std::lock_guard guard(lock_); + dm_users = std::move(dm_users_); + } + + for (auto& client : dm_users) { + auto& th = client->thread(); + + if (th.joinable()) th.join(); + } +} + +void SnapuserServer::AddWatchedFd(android::base::borrowed_fd fd, int events) { + struct pollfd p = {}; + p.fd = fd.get(); + p.events = events; + watched_fds_.emplace_back(std::move(p)); +} + +void SnapuserServer::AcceptClient() { + int fd = TEMP_FAILURE_RETRY(accept4(sockfd_.get(), nullptr, nullptr, SOCK_CLOEXEC)); + if (fd < 0) { + PLOG(ERROR) << "accept4 failed"; + return; + } + + AddWatchedFd(fd, POLLIN); +} + +bool SnapuserServer::HandleClient(android::base::borrowed_fd fd, int revents) { + if (revents & POLLHUP) { + LOG(DEBUG) << "Snapuserd client disconnected"; + return false; + } + + std::string str; + if (!Recv(fd, &str)) { + return false; + } + if (!Receivemsg(fd, str)) { + LOG(ERROR) << "Encountered error handling client message, revents: " << revents; + return false; + } + return true; +} + +void SnapuserServer::Interrupt() { + // Force close the socket so poll() fails. + sockfd_ = {}; + SetTerminating(); +} + +std::shared_ptr SnapuserServer::AddHandler(const std::string& misc_name, + const std::string& cow_device_path, + const std::string& backing_device, + const std::string& base_path_merge) { + auto snapuserd = std::make_shared(misc_name, cow_device_path, backing_device, + base_path_merge); + if (!snapuserd->InitCowDevice()) { + LOG(ERROR) << "Failed to initialize Snapuserd"; + return nullptr; + } + + if (!snapuserd->InitializeWorkers()) { + LOG(ERROR) << "Failed to initialize workers"; + return nullptr; + } + + auto handler = std::make_shared(snapuserd); + { + std::lock_guard lock(lock_); + if (FindHandler(&lock, misc_name) != dm_users_.end()) { + LOG(ERROR) << "Handler already exists: " << misc_name; + return nullptr; + } + dm_users_.push_back(handler); + } + return handler; +} + +bool SnapuserServer::StartHandler(const std::shared_ptr& handler) { + if (handler->snapuserd()->IsAttached()) { + LOG(ERROR) << "Handler already attached"; + return false; + } + + handler->snapuserd()->AttachControlDevice(); + + handler->thread() = std::thread(std::bind(&SnapuserServer::RunThread, this, handler)); + return true; +} + +bool SnapuserServer::StartMerge(const std::shared_ptr& handler) { + if (!handler->snapuserd()->IsAttached()) { + LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started"; + return false; + } + + handler->snapuserd()->InitiateMerge(); + return true; +} + +auto SnapuserServer::FindHandler(std::lock_guard* proof_of_lock, + const std::string& misc_name) -> HandlerList::iterator { + CHECK(proof_of_lock); + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + if ((*iter)->misc_name() == misc_name) { + return iter; + } + } + return dm_users_.end(); +} + +void SnapuserServer::TerminateMergeThreads(std::lock_guard* proof_of_lock) { + CHECK(proof_of_lock); + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + if (!(*iter)->ThreadTerminated()) { + (*iter)->snapuserd()->NotifyIOTerminated(); + } + } +} + +std::string SnapuserServer::GetMergeStatus(const std::shared_ptr& handler) { + return handler->snapuserd()->GetMergeStatus(); +} + +double SnapuserServer::GetMergePercentage(std::lock_guard* proof_of_lock) { + CHECK(proof_of_lock); + double percentage = 0.0; + int n = 0; + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + auto& th = (*iter)->thread(); + if (th.joinable()) { + // 
Merge percentage by individual partitions wherein merge is still + // in-progress + percentage += (*iter)->snapuserd()->GetMergePercentage(); + n += 1; + } + } + + // Calculate final merge including those partitions where merge was already + // completed - num_partitions_merge_complete_ will track them when each + // thread exists in RunThread. + int total_partitions = n + num_partitions_merge_complete_; + + if (total_partitions) { + percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions; + } + + LOG(DEBUG) << "Merge %: " << percentage + << " num_partitions_merge_complete_: " << num_partitions_merge_complete_ + << " total_partitions: " << total_partitions << " n: " << n; + return percentage; +} + +bool SnapuserServer::RemoveAndJoinHandler(const std::string& misc_name) { + std::shared_ptr handler; + { + std::lock_guard lock(lock_); + + auto iter = FindHandler(&lock, misc_name); + if (iter == dm_users_.end()) { + // Client already deleted. + return true; + } + handler = std::move(*iter); + dm_users_.erase(iter); + } + + auto& th = handler->thread(); + if (th.joinable()) { + th.join(); + } + return true; +} + +bool SnapuserServer::WaitForSocket() { + auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); }); + + auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy; + + if (!android::fs_mgr::WaitForFile(socket_path, std::chrono::milliseconds::max())) { + LOG(ERROR) + << "Failed to wait for proxy socket, second-stage snapuserd will fail to connect"; + return false; + } + + // We must re-initialize property service access, since we launched before + // second-stage init. + __system_properties_init(); + + if (!android::base::WaitForProperty("snapuserd.proxy_ready", "true")) { + LOG(ERROR) + << "Failed to wait for proxy property, second-stage snapuserd will fail to connect"; + return false; + } + + unique_fd fd(socket_local_client(kSnapuserdSocketProxy, ANDROID_SOCKET_NAMESPACE_RESERVED, + SOCK_SEQPACKET)); + if (fd < 0) { + PLOG(ERROR) << "Failed to connect to socket proxy"; + return false; + } + + char code[1]; + std::vector fds; + ssize_t rv = android::base::ReceiveFileDescriptorVector(fd, code, sizeof(code), 1, &fds); + if (rv < 0) { + PLOG(ERROR) << "Failed to receive server socket over proxy"; + return false; + } + if (fds.empty()) { + LOG(ERROR) << "Expected at least one file descriptor from proxy"; + return false; + } + + // We don't care if the ACK is received. 
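// Worked example (not part of this patch) of the aggregation in GetMergePercentage()
// above: say two partitions have already finished (num_partitions_merge_complete_ == 2)
// and two are still merging at 50% and 10%.
#include <cassert>

int main() {
    int complete = 2;
    double in_progress_sum = 50.0 + 10.0;  // summed over joinable handler threads, n = 2
    int total_partitions = 2 + complete;   // n + num_partitions_merge_complete_
    double overall = (complete * 100.0 + in_progress_sum) / total_partitions;
    assert(overall == 65.0);
    return 0;
}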
+ code[0] = 'a'; + if (TEMP_FAILURE_RETRY(send(fd, code, sizeof(code), MSG_NOSIGNAL) < 0)) { + PLOG(ERROR) << "Failed to send ACK to proxy"; + return false; + } + + sockfd_ = std::move(fds[0]); + if (!StartWithSocket(true)) { + return false; + } + return Run(); +} + +bool SnapuserServer::RunForSocketHandoff() { + unique_fd proxy_fd(android_get_control_socket(kSnapuserdSocketProxy)); + if (proxy_fd < 0) { + PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocketProxy; + } + borrowed_fd server_fd(android_get_control_socket(kSnapuserdSocket)); + if (server_fd < 0) { + PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocket; + } + + if (listen(proxy_fd.get(), 4) < 0) { + PLOG(FATAL) << "Proxy listen socket failed"; + } + + if (!android::base::SetProperty("snapuserd.proxy_ready", "true")) { + LOG(FATAL) << "Proxy failed to set ready property"; + } + + unique_fd client_fd( + TEMP_FAILURE_RETRY(accept4(proxy_fd.get(), nullptr, nullptr, SOCK_CLOEXEC))); + if (client_fd < 0) { + PLOG(FATAL) << "Proxy accept failed"; + } + + char code[1] = {'a'}; + std::vector fds = {server_fd.get()}; + ssize_t rv = android::base::SendFileDescriptorVector(client_fd, code, sizeof(code), fds); + if (rv < 0) { + PLOG(FATAL) << "Proxy could not send file descriptor to snapuserd"; + } + // Wait for an ACK - results don't matter, we just don't want to risk closing + // the proxy socket too early. + if (recv(client_fd, code, sizeof(code), 0) < 0) { + PLOG(FATAL) << "Proxy could not receive terminating code from snapuserd"; + } + return true; +} + +} // namespace snapshot +} // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h new file mode 100644 index 000000000..e93621ca2 --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h @@ -0,0 +1,142 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "snapuserd_core.h" + +namespace android { +namespace snapshot { + +static constexpr uint32_t MAX_PACKET_SIZE = 512; + +enum class DaemonOps { + INIT, + START, + QUERY, + STOP, + DELETE, + DETACH, + SUPPORTS, + INITIATE, + PERCENTAGE, + GETSTATUS, + INVALID, +}; + +class DmUserHandler { + public: + explicit DmUserHandler(std::shared_ptr snapuserd); + + void FreeResources() { + // Each worker thread holds a reference to snapuserd. 
+        // Clear them so that all the resources
+        // held by snapuserd are released.
+        if (snapuserd_) {
+            snapuserd_->FreeResources();
+            snapuserd_ = nullptr;
+        }
+    }
+    const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
+    std::thread& thread() { return thread_; }
+
+    const std::string& misc_name() const { return misc_name_; }
+    bool ThreadTerminated() { return thread_terminated_; }
+    void SetThreadTerminated() { thread_terminated_ = true; }
+
+  private:
+    std::thread thread_;
+    std::shared_ptr<SnapshotHandler> snapuserd_;
+    std::string misc_name_;
+    bool thread_terminated_ = false;
+};
+
+class SnapuserServer {
+  private:
+    android::base::unique_fd sockfd_;
+    bool terminating_;
+    volatile bool received_socket_signal_ = false;
+    std::vector<struct pollfd> watched_fds_;
+    bool is_socket_present_ = false;
+    int num_partitions_merge_complete_ = 0;
+
+    std::mutex lock_;
+
+    using HandlerList = std::vector<std::shared_ptr<DmUserHandler>>;
+    HandlerList dm_users_;
+
+    void AddWatchedFd(android::base::borrowed_fd fd, int events);
+    void AcceptClient();
+    bool HandleClient(android::base::borrowed_fd fd, int revents);
+    bool Recv(android::base::borrowed_fd fd, std::string* data);
+    bool Sendmsg(android::base::borrowed_fd fd, const std::string& msg);
+    bool Receivemsg(android::base::borrowed_fd fd, const std::string& str);
+
+    void ShutdownThreads();
+    bool RemoveAndJoinHandler(const std::string& control_device);
+    DaemonOps Resolveop(std::string& input);
+    std::string GetDaemonStatus();
+    void Parsemsg(std::string const& msg, const char delim, std::vector<std::string>& out);
+
+    bool IsTerminating() { return terminating_; }
+
+    void RunThread(std::shared_ptr<DmUserHandler> handler);
+    void JoinAllThreads();
+    bool StartWithSocket(bool start_listening);
+
+    // Find a DmUserHandler within a lock.
+    HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+                                      const std::string& misc_name);
+
+    double GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock);
+    void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock);
+
+  public:
+    SnapuserServer() { terminating_ = false; }
+    ~SnapuserServer();
+
+    bool Start(const std::string& socketname);
+    bool Run();
+    void Interrupt();
+    bool RunForSocketHandoff();
+    bool WaitForSocket();
+
+    std::shared_ptr<DmUserHandler> AddHandler(const std::string& misc_name,
+                                              const std::string& cow_device_path,
+                                              const std::string& backing_device,
+                                              const std::string& base_path_merge);
+    bool StartHandler(const std::shared_ptr<DmUserHandler>& handler);
+    bool StartMerge(const std::shared_ptr<DmUserHandler>& handler);
+    std::string GetMergeStatus(const std::shared_ptr<DmUserHandler>& handler);
+
+    void SetTerminating() { terminating_ = true; }
+    void ReceivedSocketSignal() { received_socket_signal_ = true; }
+};
+
+} // namespace snapshot
+} // namespace android
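For orientation, the public half of SnapuserServer above is what the daemon's message handling drives. A hypothetical call sequence (the device paths and misc_name below are made up; in the daemon itself Receivemsg() performs these steps in response to client messages) might look like:

    #include "snapuserd_server.h"  // SnapuserServer, DmUserHandler

    // Sketch only - the paths below are placeholders.
    bool StartMergeForPartition(SnapuserServer& server) {
        auto handler = server.AddHandler("system_b",                          // misc_name
                                         "/dev/block/mapper/system_b-cow",    // COW device
                                         "/dev/block/by-name/super",          // backing device
                                         "/dev/block/mapper/system_b-base");  // base device to merge into
        if (!handler || !server.StartHandler(handler)) {
            return false;
        }
        // Worker threads are now serving dm-user I/O; kick off the merge thread.
        return server.StartMerge(handler);
    }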
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
index 97418bdcc..6c91fde6b 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
@@ -359,5 +359,289 @@ void SnapshotHandler::WaitForMergeComplete() {
     }
 }
 
+std::string SnapshotHandler::GetMergeStatus() {
+    bool merge_not_initiated = false;
+    bool merge_failed = false;
+
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        if (!MergeInitiated()) {
+            merge_not_initiated = true;
+        }
+
+        if (io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED) {
+            merge_failed = true;
+        }
+    }
+
+    struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+    bool merge_complete = (ch->num_merge_ops == reader_->get_num_total_data_ops());
+
+    if (merge_not_initiated) {
+        // Merge was not initiated yet; however, we have merge completion
+        // recorded in the COW Header. This can happen if the device was
+        // rebooted during merge. During the next reboot, libsnapshot will
+        // query the status, and if the merge is complete, the snapshot-status
+        // file will be deleted.
+        if (merge_complete) {
+            return "snapshot-merge-complete";
+        }
+
+        // Return the state as "snapshot". If the device was rebooted during
+        // merge, we will return the status as "snapshot". This is ok, as
+        // libsnapshot will explicitly resume the merge. This is slightly
+        // different from the kernel snapshot path, wherein once the snapshot
+        // is switched to a merge target, we immediately switch to the merge
+        // target again during the next boot. We don't do that here because we
+        // don't want to initiate the merge during first-stage init. The problem
+        // is that the daemon transitions between first-stage and second-stage
+        // init; if the merge had been started, we would have to quiesce it
+        // before switching the dm tables. Instead, we just wait until the
+        // second-stage daemon is up before resuming the merge.
+        return "snapshot";
+    }
+
+    if (merge_failed) {
+        return "snapshot-merge-failed";
+    }
+
+    // Merge complete
+    if (merge_complete) {
+        return "snapshot-merge-complete";
+    }
+
+    // Merge is in-progress
+    return "snapshot-merge";
+}
+
+//========== End of Read-ahead state transition functions ====================
+
+/*
+ * Root partitions are mounted off dm-user and the I/Os are served
+ * by snapuserd worker threads.
+ *
+ * When there is an I/O request to be served by worker threads, we check
+ * if the corresponding sector is "changed" due to OTA by doing a lookup.
+ * If the lookup succeeds, the sector has been changed and maps to one of
+ * four COW operations: COPY, XOR, REPLACE, or ZERO.
+ *
+ * REPLACE and ZERO ops are not much of a concern, as there is no dependency
+ * between blocks. Hence all I/O requests mapped to these two COW operations
+ * are served by reading the COW device.
+ *
+ * However, COPY and XOR ops are tricky. Since the merge operations are
+ * in-progress, we cannot just go and read from the source device. We need
+ * to be in sync with the state of the merge thread before serving the I/O.
+ *
+ * The merge thread processes COW ops in fixed-size sets called RA Blocks,
+ * where each Block comprises 510 COW ops.
+ *
+ *     +--------------------------+
+ *     |op-1|op-2|op-3|....|op-510|
+ *     +--------------------------+
+ *
+ *     <------ Merge Group Block N ------>
+ *
+ * Thus, a Merge Group Block N will be in one of these states and will
+ * transition through them in the following order:
+ *
+ * 1: GROUP_MERGE_PENDING
+ * 2: GROUP_MERGE_RA_READY
+ * 3: GROUP_MERGE_IN_PROGRESS
+ * 4: GROUP_MERGE_COMPLETED
+ * 5: GROUP_MERGE_FAILED
+ *
+ * Let's say we have an I/O request from dm-user whose sector maps to COPY
+ * operation op-10 in the above "Merge Group Block N".
+ *
+ * 1: If the Group is in "GROUP_MERGE_PENDING" state:
+ *
+ *    Just read the data from the source block based on the COW op->source
+ *    field. Note that we take a ref count on "Group N". This ref count
+ *    prevents the merge thread from starting to merge while there are any
+ *    pending I/Os. Once the I/O is completed, the ref count on "Group N" is
+ *    decremented. The merge thread will resume merging "Group N" once there
+ *    are no pending I/Os.
+ *
+ * 2: If the Group is in "GROUP_MERGE_IN_PROGRESS" or "GROUP_MERGE_RA_READY" state:
+ *
+ *    When the merge thread is ready to process a "Group", it will first move
+ *    the state from GROUP_MERGE_PENDING -> GROUP_MERGE_RA_READY. From this
+ *    point onwards, I/O will be served from the Read-ahead buffer. However,
+ *    the merge thread cannot start merging this "Group" immediately. If there
+ *    are any in-flight I/O requests, the merge thread waits and allows those
+ *    I/Os to drain. Once all the in-flight I/Os are completed, the merge
+ *    thread moves the state from "GROUP_MERGE_RA_READY" ->
+ *    "GROUP_MERGE_IN_PROGRESS". I/O continues to be served from the
+ *    Read-ahead buffer for the entire duration of the merge.
+ *
+ *    See SetMergeInProgress().
+ *
+ * 3: If the Group is in "GROUP_MERGE_COMPLETED" state:
+ *
+ *    This is straightforward. We just read the data directly from the "Base"
+ *    device. We should not be reading the COW op->source field.
+ *
+ * 4: If the Group is in "GROUP_MERGE_FAILED" state:
+ *
+ *    Terminate the I/O with an I/O error as we don't know which "op" in the
+ *    "Group" failed.
+ *
+ * These transitions ensure that I/O from root partitions is never made to
+ * wait and is processed immediately. Thus the state transition for any
+ * "Group" is:
+ *
+ *     GROUP_MERGE_PENDING
+ *              |
+ *              |
+ *              v
+ *     GROUP_MERGE_RA_READY
+ *              |
+ *              |
+ *              v
+ *     GROUP_MERGE_IN_PROGRESS
+ *              |
+ *              |------------------------------(on failure)
+ *              |                                   |
+ *              v                                   v
+ *     GROUP_MERGE_COMPLETED               GROUP_MERGE_FAILED
+ *
+ */
+
+// Invoked by the Merge thread
+void SnapshotHandler::SetMergeCompleted(size_t ra_index) {
+    MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+    {
+        std::lock_guard<std::mutex> lock(blk_state->m_lock);
+
+        CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS);
+        CHECK(blk_state->num_ios_in_progress == 0);
+
+        // Merge is complete - all I/O henceforth should be read directly
+        // from the base device
+        blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED;
+    }
+}
+
+// Invoked by the Merge thread. This is called just before the beginning
+// of merging a given Block of 510 ops. If there are any in-flight I/Os
+// from dm-user, wait for them to complete.
+void SnapshotHandler::SetMergeInProgress(size_t ra_index) {
+    MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+    {
+        std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+        CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
+
+        // First set the state to RA_READY so that in-flight I/O will drain
+        // and any new I/O will start reading from the RA buffer
+        blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_RA_READY;
+
+        // Wait if there are any in-flight I/Os - we cannot merge at this point
+        while (!(blk_state->num_ios_in_progress == 0)) {
+            blk_state->m_cv.wait(lock);
+        }
+
+        blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS;
+    }
+}
+
+// Invoked by the Merge thread on failure
+void SnapshotHandler::SetMergeFailed(size_t ra_index) {
+    MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+    {
+        std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+        blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_FAILED;
+    }
+}
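SetMergeInProgress(), SetMergeCompleted() and SetMergeFailed() give the merge thread a simple per-group protocol. A condensed, hypothetical sketch of a merge loop driving them (MergeGroupOps() is a placeholder for the real batch-merge work done elsewhere in the merge path):

    #include "snapuserd_core.h"  // SnapshotHandler, MERGE_GROUP_STATE

    bool MergeGroupOps(SnapshotHandler* snapuserd, size_t ra_index);  // placeholder, not a real API

    // Hypothetical sketch; error handling and the actual op processing are elided.
    bool MergeAllGroups(SnapshotHandler* snapuserd, size_t num_ra_groups) {
        for (size_t ra_index = 0; ra_index < num_ra_groups; ra_index++) {
            // Blocks until in-flight I/O on this group drains, then marks the
            // group GROUP_MERGE_IN_PROGRESS.
            snapuserd->SetMergeInProgress(ra_index);
            if (!MergeGroupOps(snapuserd, ra_index)) {
                snapuserd->SetMergeFailed(ra_index);
                return false;
            }
            // Reads for these blocks now go straight to the base device.
            snapuserd->SetMergeCompleted(ra_index);
        }
        return true;
    }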
+
+// Invoked by worker threads when I/O is complete on a "MERGE_PENDING"
+// Group. If there are no more in-flight I/Os, wake up the merge thread
+// to resume merging.
+void SnapshotHandler::NotifyIOCompletion(uint64_t new_block) {
+    auto it = block_to_ra_index_.find(new_block);
+    CHECK(it != block_to_ra_index_.end()) << " invalid block: " << new_block;
+
+    bool pending_ios = true;
+
+    int ra_index = it->second;
+    MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+    {
+        std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+        CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
+        blk_state->num_ios_in_progress -= 1;
+        if (blk_state->num_ios_in_progress == 0) {
+            pending_ios = false;
+        }
+    }
+
+    // Give the merge thread a chance to resume merging,
+    // as there are no pending I/Os.
+    if (!pending_ios) {
+        blk_state->m_cv.notify_all();
+    }
+}
+
+bool SnapshotHandler::GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block,
+                                  void* buffer) {
+    if (!lock->owns_lock()) {
+        SNAP_LOG(ERROR) << "GetRABuffer - Lock not held";
+        return false;
+    }
+    std::unordered_map<uint64_t, void*>::iterator it = read_ahead_buffer_map_.find(block);
+
+    if (it == read_ahead_buffer_map_.end()) {
+        SNAP_LOG(ERROR) << "Block: " << block << " not found in RA buffer";
+        return false;
+    }
+
+    memcpy(buffer, it->second, BLOCK_SZ);
+    return true;
+}
+
+// Invoked by worker threads in the I/O path. This is called when a sector
+// is mapped to a COPY/XOR COW op.
+MERGE_GROUP_STATE SnapshotHandler::ProcessMergingBlock(uint64_t new_block, void* buffer) {
+    auto it = block_to_ra_index_.find(new_block);
+    if (it == block_to_ra_index_.end()) {
+        return MERGE_GROUP_STATE::GROUP_INVALID;
+    }
+
+    int ra_index = it->second;
+    MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+    {
+        std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+        MERGE_GROUP_STATE state = blk_state->merge_state_;
+        switch (state) {
+            case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
+                blk_state->num_ios_in_progress += 1;  // ref count
+                [[fallthrough]];
+            }
+            case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: {
+                [[fallthrough]];
+            }
+            case MERGE_GROUP_STATE::GROUP_MERGE_FAILED: {
+                return state;
+            }
+            // Fetch the data from the RA buffer.
+            case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: {
+                [[fallthrough]];
+            }
+            case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: {
+                if (!GetRABuffer(&lock, new_block, buffer)) {
+                    return MERGE_GROUP_STATE::GROUP_INVALID;
+                }
+                return state;
+            }
+            default: {
+                return MERGE_GROUP_STATE::GROUP_INVALID;
+            }
+        }
+    }
+}
+
 }  // namespace snapshot
 }  // namespace android
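On the worker-thread side, the state returned by ProcessMergingBlock() decides where the data for a COPY/XOR-mapped sector comes from. A rough sketch of that dispatch (ReadFromCowOpSource() and ReadFromBaseDevice() are placeholders for the worker's real read helpers):

    #include "snapuserd_core.h"  // SnapshotHandler, MERGE_GROUP_STATE

    bool ReadFromCowOpSource(uint64_t block, void* buffer);  // placeholder
    bool ReadFromBaseDevice(uint64_t block, void* buffer);   // placeholder

    bool ServeOrderedOpRead(SnapshotHandler* snapuserd, uint64_t new_block, void* buffer) {
        switch (snapuserd->ProcessMergingBlock(new_block, buffer)) {
            case MERGE_GROUP_STATE::GROUP_MERGE_PENDING:
                // A ref count was taken on the group; read from the COW op's
                // source, then tell the merge thread this I/O has drained.
                if (!ReadFromCowOpSource(new_block, buffer)) {
                    return false;
                }
                snapuserd->NotifyIOCompletion(new_block);
                return true;
            case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY:
            case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS:
                // ProcessMergingBlock() already copied the data out of the
                // read-ahead buffer into |buffer|.
                return true;
            case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED:
                // Merge finished for this group; the base device is authoritative.
                return ReadFromBaseDevice(new_block, buffer);
            default:
                // GROUP_MERGE_FAILED / GROUP_INVALID: fail the I/O in this sketch.
                return false;
        }
    }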