From be46ca7ff34a9f322712f80ad8aab1d5a64f62e0 Mon Sep 17 00:00:00 2001 From: Akilesh Kailash Date: Tue, 17 Aug 2021 07:47:58 +0000 Subject: [PATCH 1/5] snapuserd: Service I/O requests from dm-user Now that merging is done in user-space and partitions are mounted off dm-user directly, daemon will have to serve every I/O request. Daemon has to handle this wherein we need to check if the given I/O request block has been modified in the OTA. Furthermore, if merge is in-progress, we will have to synchronize with the merge thread before servicing the I/O. If the I/O request maps to a REPLACE or ZERO op, we will just read the data from COW device. If the I/O request maps to a COPY or XOR op, the worker thread will have to synchronize with the merge thread and if the merge is in progress, fetch the data directly from RA buffer. This patch handles I/O requests only if the sectors are 4k aligned. Bug: 196929997 Test: snapuserd_test Signed-off-by: Akilesh Kailash Change-Id: I08562b8927e1c22dd9d9ef160e873280854eac99 --- .../include/libsnapshot/cow_format.h | 4 +- .../user-space-merge/snapuserd_core.cpp | 38 +- .../user-space-merge/snapuserd_core.h | 56 ++- .../user-space-merge/snapuserd_dm_user.cpp | 327 +++++++++++++++++- .../user-space-merge/snapuserd_merge.cpp | 13 +- .../user-space-merge/snapuserd_readahead.cpp | 14 +- .../snapuserd_transitions.cpp | 229 ++++++++++++ 7 files changed, 669 insertions(+), 12 deletions(-) diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h index c15682a7e..9f4ddbb61 100644 --- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h +++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h @@ -26,8 +26,8 @@ static constexpr uint32_t kCowVersionMinor = 0; static constexpr uint32_t kCowVersionManifest = 2; -static constexpr uint32_t BLOCK_SZ = 4096; -static constexpr uint32_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1); +static constexpr size_t BLOCK_SZ = 4096; +static constexpr size_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1); // This header appears as the first sequence of bytes in the COW. All fields // in the layout are little-endian encoded. 
The on-disk layout is: diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp index a2538d297..5730f488f 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp @@ -155,13 +155,43 @@ bool SnapshotHandler::ReadMetadata() { // Initialize the iterator for reading metadata std::unique_ptr cowop_iter = reader_->GetMergeOpIter(); + int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ); + int ra_index = 0; + + size_t copy_ops = 0, replace_ops = 0, zero_ops = 0, xor_ops = 0; + while (!cowop_iter->Done()) { const CowOperation* cow_op = &cowop_iter->Get(); + if (cow_op->type == kCowCopyOp) { + copy_ops += 1; + } else if (cow_op->type == kCowReplaceOp) { + replace_ops += 1; + } else if (cow_op->type == kCowZeroOp) { + zero_ops += 1; + } else if (cow_op->type == kCowXorOp) { + xor_ops += 1; + } + chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op)); - if (!ra_thread_ && IsOrderedOp(*cow_op)) { + if (IsOrderedOp(*cow_op)) { ra_thread_ = true; + block_to_ra_index_[cow_op->new_block] = ra_index; + num_ra_ops_per_iter -= 1; + + if ((ra_index + 1) - merge_blk_state_.size() == 1) { + std::unique_ptr blk_state = std::make_unique( + MERGE_GROUP_STATE::GROUP_MERGE_PENDING, 0); + + merge_blk_state_.push_back(std::move(blk_state)); + } + + // Move to next RA block + if (num_ra_ops_per_iter == 0) { + num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ); + ra_index += 1; + } } cowop_iter->Next(); } @@ -173,6 +203,12 @@ bool SnapshotHandler::ReadMetadata() { PrepareReadAhead(); + SNAP_LOG(INFO) << "Merged-ops: " << header.num_merge_ops + << " Total-data-ops: " << reader_->get_num_total_data_ops() + << " Unmerged-ops: " << chunk_vec_.size() << " Copy-ops: " << copy_ops + << " Zero-ops: " << zero_ops << " Replace-ops: " << replace_ops + << " Xor-ops: " << xor_ops; + return true; } diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h index c171eda1e..38cfef217 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h @@ -67,6 +67,27 @@ enum class MERGE_IO_TRANSITION { class SnapshotHandler; +enum class MERGE_GROUP_STATE { + GROUP_MERGE_PENDING, + GROUP_MERGE_RA_READY, + GROUP_MERGE_IN_PROGRESS, + GROUP_MERGE_COMPLETED, + GROUP_MERGE_FAILED, + GROUP_INVALID, +}; + +struct MergeGroupState { + MERGE_GROUP_STATE merge_state_; + // Ref count I/O when group state + // is in "GROUP_MERGE_PENDING" + size_t num_ios_in_progress; + std::mutex m_lock; + std::condition_variable m_cv; + + MergeGroupState(MERGE_GROUP_STATE state, size_t n_ios) + : merge_state_(state), num_ios_in_progress(n_ios) {} +}; + class ReadAhead { public: ReadAhead(const std::string& cow_device, const std::string& backing_device, @@ -133,16 +154,30 @@ class Worker { base_path_merge_fd_ = {}; } + // Functions interacting with dm-user + bool ReadDmUserHeader(); + bool WriteDmUserPayload(size_t size, bool header_response); + bool DmuserReadRequest(); + // IO Path bool ProcessIORequest(); + bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); } + + bool ReadDataFromBaseDevice(sector_t sector, size_t read_size); + bool ReadFromSourceDevice(const CowOperation* cow_op); + + bool ReadAlignedSector(sector_t sector, size_t sz, bool header_response); + bool 
RespondIOError(bool header_response); // Processing COW operations + bool ProcessCowOp(const CowOperation* cow_op); bool ProcessReplaceOp(const CowOperation* cow_op); bool ProcessZeroOp(); // Handles Copy and Xor bool ProcessCopyOp(const CowOperation* cow_op); bool ProcessXorOp(const CowOperation* cow_op); + bool ProcessOrderedOp(const CowOperation* cow_op); // Merge related ops bool Merge(); @@ -152,6 +187,9 @@ class Worker { const std::unique_ptr& cowop_iter, std::vector* replace_zero_vec = nullptr); + sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; } + chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; } + std::unique_ptr reader_; BufferSink bufsink_; XorSink xorsink_; @@ -210,6 +248,7 @@ class SnapshotHandler : public std::enable_shared_from_this { // Read-ahead related functions void* GetMappedAddr() { return mapped_addr_; } void PrepareReadAhead(); + std::unordered_map& GetReadAheadMap() { return read_ahead_buffer_map_; } // State transitions for merge void InitiateMerge(); @@ -239,10 +278,19 @@ class SnapshotHandler : public std::enable_shared_from_this { void SetSocketPresent(bool socket) { is_socket_present_ = socket; } bool MergeInitiated() { return merge_initiated_; } + // Merge Block State Transitions + void SetMergeCompleted(size_t block_index); + void SetMergeInProgress(size_t block_index); + void SetMergeFailed(size_t block_index); + void NotifyIOCompletion(uint64_t new_block); + bool GetRABuffer(std::unique_lock* lock, uint64_t block, void* buffer); + MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer); + private: bool ReadMetadata(); sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; } chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; } + bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); } struct BufferState* GetBufferState(); void ReadBlocks(const std::string partition_name, const std::string& dm_block_device); @@ -261,7 +309,6 @@ class SnapshotHandler : public std::enable_shared_from_this { unique_fd cow_fd_; - // Number of sectors required when initializing dm-user uint64_t num_sectors_; std::unique_ptr reader_; @@ -283,6 +330,13 @@ class SnapshotHandler : public std::enable_shared_from_this { int total_ra_blocks_merged_ = 0; MERGE_IO_TRANSITION io_state_; std::unique_ptr read_ahead_thread_; + std::unordered_map read_ahead_buffer_map_; + + // user-space-merging + std::unordered_map block_to_ra_index_; + + // Merge Block state + std::vector> merge_blk_state_; std::unique_ptr merge_thread_; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp index 18c7f2c96..920994450 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp @@ -84,6 +84,56 @@ bool Worker::ProcessReplaceOp(const CowOperation* cow_op) { return true; } +bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) { + void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); + if (buffer == nullptr) { + SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer"; + return false; + } + SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block + << " Source: " << cow_op->source; + uint64_t offset = cow_op->source; + if (cow_op->type == kCowCopyOp) { + offset *= BLOCK_SZ; + } + if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) { + 
std::string op; + if (cow_op->type == kCowCopyOp) + op = "Copy-op"; + else { + op = "Xor-op"; + } + SNAP_PLOG(ERROR) << op << " failed. Read from backing store: " << backing_store_device_ + << "at block :" << offset / BLOCK_SZ << " offset:" << offset % BLOCK_SZ; + return false; + } + + return true; +} + +// Start the copy operation. This will read the backing +// block device which is represented by cow_op->source. +bool Worker::ProcessCopyOp(const CowOperation* cow_op) { + if (!ReadFromSourceDevice(cow_op)) { + return false; + } + + return true; +} + +bool Worker::ProcessXorOp(const CowOperation* cow_op) { + if (!ReadFromSourceDevice(cow_op)) { + return false; + } + xorsink_.Reset(); + if (!reader_->ReadData(*cow_op, &xorsink_)) { + SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block; + return false; + } + + return true; +} + bool Worker::ProcessZeroOp() { // Zero out the entire block void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); @@ -96,12 +146,85 @@ bool Worker::ProcessZeroOp() { return true; } -bool Worker::ProcessCopyOp(const CowOperation*) { - return true; +bool Worker::ProcessOrderedOp(const CowOperation* cow_op) { + void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); + if (buffer == nullptr) { + SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer"; + return false; + } + + MERGE_GROUP_STATE state = snapuserd_->ProcessMergingBlock(cow_op->new_block, buffer); + + switch (state) { + case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: { + // Merge is completed for this COW op; just read directly from + // the base device + SNAP_LOG(DEBUG) << "Merge-completed: Reading from base device sector: " + << (cow_op->new_block >> SECTOR_SHIFT) + << " Block-number: " << cow_op->new_block; + if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), BLOCK_SZ)) { + SNAP_LOG(ERROR) << "ReadDataFromBaseDevice at sector: " + << (cow_op->new_block >> SECTOR_SHIFT) << " after merge-complete."; + return false; + } + return true; + } + case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: { + bool ret; + if (cow_op->type == kCowCopyOp) { + ret = ProcessCopyOp(cow_op); + } else { + ret = ProcessXorOp(cow_op); + } + + // I/O is complete - decrement the refcount irrespective of the return + // status + snapuserd_->NotifyIOCompletion(cow_op->new_block); + return ret; + } + // We already have the data in the buffer retrieved from RA thread. + // Nothing to process further. 
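+        // (ProcessMergingBlock() has already copied the block into the
+        // payload buffer via GetRABuffer() for these two states.)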
+ case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: { + [[fallthrough]]; + } + case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: { + return true; + } + default: { + // All other states, fail the I/O viz (GROUP_MERGE_FAILED and GROUP_INVALID) + return false; + } + } + + return false; } -bool Worker::ProcessXorOp(const CowOperation*) { - return true; +bool Worker::ProcessCowOp(const CowOperation* cow_op) { + if (cow_op == nullptr) { + SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op"; + return false; + } + + switch (cow_op->type) { + case kCowReplaceOp: { + return ProcessReplaceOp(cow_op); + } + + case kCowZeroOp: { + return ProcessZeroOp(); + } + + case kCowCopyOp: + [[fallthrough]]; + case kCowXorOp: { + return ProcessOrderedOp(cow_op); + } + + default: { + SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type; + } + } + return false; } void Worker::InitializeBufsink() { @@ -129,7 +252,7 @@ bool Worker::Init() { } bool Worker::RunThread() { - SNAP_LOG(DEBUG) << "Processing snapshot I/O requests..."; + SNAP_LOG(INFO) << "Processing snapshot I/O requests...."; // Start serving IO while (true) { if (!ProcessIORequest()) { @@ -143,8 +266,200 @@ bool Worker::RunThread() { return true; } +// Read Header from dm-user misc device. This gives +// us the sector number for which IO is issued by dm-snapshot device +bool Worker::ReadDmUserHeader() { + if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) { + if (errno != ENOTBLK) { + SNAP_PLOG(ERROR) << "Control-read failed"; + } + + SNAP_PLOG(DEBUG) << "ReadDmUserHeader failed...."; + return false; + } + + return true; +} + +// Send the payload/data back to dm-user misc device. +bool Worker::WriteDmUserPayload(size_t size, bool header_response) { + size_t payload_size = size; + void* buf = bufsink_.GetPayloadBufPtr(); + if (header_response) { + payload_size += sizeof(struct dm_user_header); + buf = bufsink_.GetBufPtr(); + } + + if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) { + SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size; + return false; + } + + return true; +} + +bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) { + CHECK(read_size <= BLOCK_SZ); + + void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ); + if (buffer == nullptr) { + SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer"; + return false; + } + + loff_t offset = sector << SECTOR_SHIFT; + if (!android::base::ReadFullyAtOffset(base_path_merge_fd_, buffer, read_size, offset)) { + SNAP_PLOG(ERROR) << "ReadDataFromBaseDevice failed. fd: " << base_path_merge_fd_ + << "at sector :" << sector << " size: " << read_size; + return false; + } + + return true; +} + +bool Worker::ReadAlignedSector(sector_t sector, size_t sz, bool header_response) { + struct dm_user_header* header = bufsink_.GetHeaderPtr(); + size_t remaining_size = sz; + std::vector>& chunk_vec = snapuserd_->GetChunkVec(); + bool io_error = false; + int ret = 0; + + do { + // Process 1MB payload at a time + size_t read_size = std::min(PAYLOAD_SIZE, remaining_size); + + header->type = DM_USER_RESP_SUCCESS; + size_t total_bytes_read = 0; + io_error = false; + bufsink_.ResetBufferOffset(); + + while (read_size) { + // We need to check every 4k block to verify if it is + // present in the mapping. 
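+            // chunk_vec is kept sorted by starting sector, so a
+            // std::lower_bound with a (sector, nullptr) key locates the
+            // COW op covering this 4k block, if the OTA modified it.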
+ size_t size = std::min(BLOCK_SZ, read_size); + + auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), + std::make_pair(sector, nullptr), SnapshotHandler::compare); + bool not_found = (it == chunk_vec.end() || it->first != sector); + + if (not_found) { + // Block not found in map - which means this block was not + // changed as per the OTA. Just route the I/O to the base + // device. + if (!ReadDataFromBaseDevice(sector, size)) { + SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed"; + header->type = DM_USER_RESP_ERROR; + } + + ret = size; + } else { + // We found the sector in mapping. Check the type of COW OP and + // process it. + if (!ProcessCowOp(it->second)) { + SNAP_LOG(ERROR) << "ProcessCowOp failed"; + header->type = DM_USER_RESP_ERROR; + } + + ret = BLOCK_SZ; + } + + // Just return the header if it is an error + if (header->type == DM_USER_RESP_ERROR) { + if (!RespondIOError(header_response)) { + return false; + } + + io_error = true; + break; + } + + read_size -= ret; + total_bytes_read += ret; + sector += (ret >> SECTOR_SHIFT); + bufsink_.UpdateBufferOffset(ret); + } + + if (!io_error) { + if (!WriteDmUserPayload(total_bytes_read, header_response)) { + return false; + } + + SNAP_LOG(DEBUG) << "WriteDmUserPayload success total_bytes_read: " << total_bytes_read + << " header-response: " << header_response + << " remaining_size: " << remaining_size; + header_response = false; + remaining_size -= total_bytes_read; + } + } while (remaining_size > 0 && !io_error); + + return true; +} + +bool Worker::RespondIOError(bool header_response) { + struct dm_user_header* header = bufsink_.GetHeaderPtr(); + header->type = DM_USER_RESP_ERROR; + // This is an issue with the dm-user interface. There + // is no way to propagate the I/O error back to dm-user + // if we have already communicated the header back. Header + // is responded once at the beginning; however I/O can + // be processed in chunks. If we encounter an I/O error + // somewhere in the middle of the processing, we can't communicate + // this back to dm-user. + // + // TODO: Fix the interface + CHECK(header_response); + + if (!WriteDmUserPayload(0, header_response)) { + return false; + } + + // There is no need to process further as we have already seen + // an I/O error + return true; +} + +bool Worker::DmuserReadRequest() { + struct dm_user_header* header = bufsink_.GetHeaderPtr(); + + // Unaligned I/O request + if (!IsBlockAligned(header->sector << SECTOR_SHIFT)) { + SNAP_LOG(ERROR) << "I/O request is not 4k aligned."; + return false; + } + + return ReadAlignedSector(header->sector, header->len, true); +} + bool Worker::ProcessIORequest() { - // No communication with dm-user yet + struct dm_user_header* header = bufsink_.GetHeaderPtr(); + + if (!ReadDmUserHeader()) { + return false; + } + + SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq; + SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len; + SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector; + SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type; + SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags; + + switch (header->type) { + case DM_USER_REQ_MAP_READ: { + if (!DmuserReadRequest()) { + return false; + } + break; + } + + case DM_USER_REQ_MAP_WRITE: { + // TODO: We should not get any write request + // to dm-user as we mount all partitions + // as read-only. Need to verify how are TRIM commands + // handled during mount. 
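+            // Returning false terminates this worker's RunThread() loop;
+            // writes are never expected since the snapshot partitions are
+            // mounted read-only.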
+ return false; + } + } + return true; } diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp index 696ede72f..47fc7db50 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp @@ -52,7 +52,6 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops, break; } - // Check for consecutive blocks uint64_t next_offset = op->new_block * BLOCK_SZ; if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) { break; @@ -177,6 +176,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr& cowop_iter) { void* mapped_addr = snapuserd_->GetMappedAddr(); void* read_ahead_buffer = static_cast((char*)mapped_addr + snapuserd_->GetBufferDataOffset()); + size_t block_index = 0; SNAP_LOG(INFO) << "MergeOrderedOps started...."; @@ -190,9 +190,12 @@ bool Worker::MergeOrderedOps(const std::unique_ptr& cowop_iter) { // Wait for RA thread to notify that the merge window // is ready for merging. if (!snapuserd_->WaitForMergeBegin()) { + snapuserd_->SetMergeFailed(block_index); return false; } + snapuserd_->SetMergeInProgress(block_index); + loff_t offset = 0; int num_ops = snapuserd_->GetTotalBlocksToMerge(); SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops; @@ -213,6 +216,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr& cowop_iter) { if (ret < 0 || ret != io_size) { SNAP_LOG(ERROR) << "Failed to write to backing device while merging " << " at offset: " << source_offset << " io_size: " << io_size; + snapuserd_->SetMergeFailed(block_index); return false; } @@ -226,6 +230,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr& cowop_iter) { // Flush the data if (fsync(base_path_merge_fd_.get()) < 0) { SNAP_LOG(ERROR) << " Failed to fsync merged data"; + snapuserd_->SetMergeFailed(block_index); return false; } @@ -233,14 +238,20 @@ bool Worker::MergeOrderedOps(const std::unique_ptr& cowop_iter) { // the merge completion if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) { SNAP_LOG(ERROR) << " Failed to commit the merged block in the header"; + snapuserd_->SetMergeFailed(block_index); return false; } SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge(); + // Mark the block as merge complete + snapuserd_->SetMergeCompleted(block_index); // Notify RA thread that the merge thread is ready to merge the next // window snapuserd_->NotifyRAForMergeReady(); + + // Get the next block + block_index += 1; } return true; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp index 319755b11..0bcf26e78 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp @@ -115,7 +115,7 @@ int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops, } bool ReadAhead::ReconstructDataFromCow() { - std::unordered_map read_ahead_buffer_map; + std::unordered_map& read_ahead_buffer_map = snapuserd_->GetReadAheadMap(); loff_t metadata_offset = 0; loff_t start_data_offset = snapuserd_->GetBufferDataOffset(); int num_ops = 0; @@ -319,6 +319,18 @@ bool ReadAhead::ReadAheadIOStart() { memcpy(metadata_buffer_, ra_temp_meta_buffer.get(), snapuserd_->GetBufferMetadataSize()); memcpy(read_ahead_buffer_, ra_temp_buffer.get(), total_blocks_merged * BLOCK_SZ); + offset = 0; + 
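+    // Publish the block -> RA-buffer mapping for this merge window so
+    // worker threads can serve COPY/XOR reads from the read-ahead buffer
+    // (see SnapshotHandler::GetRABuffer()).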
std::unordered_map& read_ahead_buffer_map = snapuserd_->GetReadAheadMap(); + read_ahead_buffer_map.clear(); + + for (size_t block_index = 0; block_index < blocks.size(); block_index++) { + void* bufptr = static_cast((char*)read_ahead_buffer_ + offset); + uint64_t new_block = blocks[block_index]; + + read_ahead_buffer_map[new_block] = bufptr; + offset += BLOCK_SZ; + } + snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged); // Flush the data only if we have a overlapping blocks in the region diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp index 97418bdcc..ef7a1181f 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp @@ -359,5 +359,234 @@ void SnapshotHandler::WaitForMergeComplete() { } } +//========== End of Read-ahead state transition functions ==================== + +/* + * Root partitions are mounted off dm-user and the I/O's are served + * by snapuserd worker threads. + * + * When there is an I/O request to be served by worker threads, we check + * if the corresponding sector is "changed" due to OTA by doing a lookup. + * If the lookup succeeds then the sector has been changed and that can + * either fall into 4 COW operations viz: COPY, XOR, REPLACE and ZERO. + * + * For the case of REPLACE and ZERO ops, there is not much of a concern + * as there is no dependency between blocks. Hence all the I/O request + * mapped to these two COW operations will be served by reading the COW device. + * + * However, COPY and XOR ops are tricky. Since the merge operations are + * in-progress, we cannot just go and read from the source device. We need + * to be in sync with the state of the merge thread before serving the I/O. + * + * Given that we know merge thread processes a set of COW ops called as RA + * Blocks - These set of COW ops are fixed size wherein each Block comprises + * of 510 COW ops. + * + * +--------------------------+ + * |op-1|op-2|op-3|....|op-510| + * +--------------------------+ + * + * <------ Merge Group Block N ------> + * + * Thus, a Merge Group Block N, will fall into one of these states and will + * transition the states in the following order: + * + * 1: GROUP_MERGE_PENDING + * 2: GROUP_MERGE_RA_READY + * 2: GROUP_MERGE_IN_PROGRESS + * 3: GROUP_MERGE_COMPLETED + * 4: GROUP_MERGE_FAILED + * + * Let's say that we have the I/O request from dm-user whose sector gets mapped + * to a COPY operation with op-10 in the above "Merge Group Block N". + * + * 1: If the Group is in "GROUP_MERGE_PENDING" state: + * + * Just read the data from source block based on COW op->source field. Note, + * that we will take a ref count on "Block N". This ref count will prevent + * merge thread to begin merging if there are any pending I/Os. Once the I/O + * is completed, ref count on "Group N" is decremented. Merge thread will + * resume merging "Group N" if there are no pending I/Os. + * + * 2: If the Group is in "GROUP_MERGE_IN_PROGRESS" or "GROUP_MERGE_RA_READY" state: + * + * When the merge thread is ready to process a "Group", it will first move + * the state to GROUP_MERGE_PENDING -> GROUP_MERGE_RA_READY. From this point + * onwards, I/O will be served from Read-ahead buffer. However, merge thread + * cannot start merging this "Group" immediately. If there were any in-flight + * I/O requests, merge thread should wait and allow those I/O's to drain. 
+ * Once all the in-flight I/O's are completed, merge thread will move the + * state from "GROUP_MERGE_RA_READY" -> "GROUP_MERGE_IN_PROGRESS". I/O will + * be continued to serve from Read-ahead buffer during the entire duration + * of the merge. + * + * See SetMergeInProgress(). + * + * 3: If the Group is in "GROUP_MERGE_COMPLETED" state: + * + * This is straightforward. We just read the data directly from "Base" + * device. We should not be reading the COW op->source field. + * + * 4: If the Block is in "GROUP_MERGE_FAILED" state: + * + * Terminate the I/O with an I/O error as we don't know which "op" in the + * "Group" failed. + * + * Transition ensures that the I/O from root partitions are never made to + * wait and are processed immediately. Thus the state transition for any + * "Group" is: + * + * GROUP_MERGE_PENDING + * | + * | + * v + * GROUP_MERGE_RA_READY + * | + * | + * v + * GROUP_MERGE_IN_PROGRESS + * | + * |----------------------------(on failure) + * | | + * v v + * GROUP_MERGE_COMPLETED GROUP_MERGE_FAILED + * + */ + +// Invoked by Merge thread +void SnapshotHandler::SetMergeCompleted(size_t ra_index) { + MergeGroupState* blk_state = merge_blk_state_[ra_index].get(); + { + std::lock_guard lock(blk_state->m_lock); + + CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS); + CHECK(blk_state->num_ios_in_progress == 0); + + // Merge is complete - All I/O henceforth should be read directly + // from base device + blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED; + } +} + +// Invoked by Merge thread. This is called just before the beginning +// of merging a given Block of 510 ops. If there are any in-flight I/O's +// from dm-user then wait for them to complete. +void SnapshotHandler::SetMergeInProgress(size_t ra_index) { + MergeGroupState* blk_state = merge_blk_state_[ra_index].get(); + { + std::unique_lock lock(blk_state->m_lock); + + CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING); + + // First set the state to RA_READY so that in-flight I/O will drain + // and any new I/O will start reading from RA buffer + blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_RA_READY; + + // Wait if there are any in-flight I/O's - we cannot merge at this point + while (!(blk_state->num_ios_in_progress == 0)) { + blk_state->m_cv.wait(lock); + } + + blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS; + } +} + +// Invoked by Merge thread on failure +void SnapshotHandler::SetMergeFailed(size_t ra_index) { + MergeGroupState* blk_state = merge_blk_state_[ra_index].get(); + { + std::unique_lock lock(blk_state->m_lock); + + blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_FAILED; + } +} + +// Invoked by worker threads when I/O is complete on a "MERGE_PENDING" +// Block. If there are no more in-flight I/Os, wake up merge thread +// to resume merging. +void SnapshotHandler::NotifyIOCompletion(uint64_t new_block) { + auto it = block_to_ra_index_.find(new_block); + CHECK(it != block_to_ra_index_.end()) << " invalid block: " << new_block; + + bool pending_ios = true; + + int ra_index = it->second; + MergeGroupState* blk_state = merge_blk_state_[ra_index].get(); + { + std::unique_lock lock(blk_state->m_lock); + + CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING); + blk_state->num_ios_in_progress -= 1; + if (blk_state->num_ios_in_progress == 0) { + pending_ios = false; + } + } + + // Give a chance to merge-thread to resume merge + // as there are no pending I/O. 
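+    // The wakeup is issued outside the lock scope above; SetMergeInProgress()
+    // re-checks num_ios_in_progress under the lock before proceeding, so a
+    // wakeup that races with a newly arrived I/O is harmless.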
+ if (!pending_ios) { + blk_state->m_cv.notify_all(); + } +} + +bool SnapshotHandler::GetRABuffer(std::unique_lock* lock, uint64_t block, + void* buffer) { + if (!lock->owns_lock()) { + SNAP_LOG(ERROR) << "GetRABuffer - Lock not held"; + return false; + } + std::unordered_map::iterator it = read_ahead_buffer_map_.find(block); + + if (it == read_ahead_buffer_map_.end()) { + SNAP_LOG(ERROR) << "Block: " << block << " not found in RA buffer"; + return false; + } + + memcpy(buffer, it->second, BLOCK_SZ); + return true; +} + +// Invoked by worker threads in the I/O path. This is called when a sector +// is mapped to a COPY/XOR COW op. +MERGE_GROUP_STATE SnapshotHandler::ProcessMergingBlock(uint64_t new_block, void* buffer) { + auto it = block_to_ra_index_.find(new_block); + if (it == block_to_ra_index_.end()) { + return MERGE_GROUP_STATE::GROUP_INVALID; + } + + int ra_index = it->second; + MergeGroupState* blk_state = merge_blk_state_[ra_index].get(); + { + std::unique_lock lock(blk_state->m_lock); + + MERGE_GROUP_STATE state = blk_state->merge_state_; + switch (state) { + case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: { + blk_state->num_ios_in_progress += 1; // ref count + [[fallthrough]]; + } + case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: { + [[fallthrough]]; + } + case MERGE_GROUP_STATE::GROUP_MERGE_FAILED: { + return state; + } + // Fetch the data from RA buffer. + case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: { + [[fallthrough]]; + } + case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: { + if (!GetRABuffer(&lock, new_block, buffer)) { + return MERGE_GROUP_STATE::GROUP_INVALID; + } + return state; + } + default: { + return MERGE_GROUP_STATE::GROUP_INVALID; + } + } + } +} + } // namespace snapshot } // namespace android From 8abe050eb519446510d37480e57c149995472f62 Mon Sep 17 00:00:00 2001 From: Akilesh Kailash Date: Sat, 2 Oct 2021 10:36:42 +0000 Subject: [PATCH 2/5] snapuserd: I/O requests which are not block aligned. If the I/O request is for a sector which is not block aligned, then we will not find the mapping directly to a COW op as each op is block aligned. Thus, we handle them by chopping the I/O request and processing the unaligned data. Furthermore, if the request doesn't map to any of the COW ops, then route the I/O to the base device. Bug: 196929997 Test: Full OTA on CF - during boot up, we get an I/O request to read superblock on system partition which is not block aligned. 
Signed-off-by: Akilesh Kailash Change-Id: Ie40633c0c6bc0c87b681c051c74c0ac787e34d9c --- .../user-space-merge/snapuserd_core.h | 3 + .../user-space-merge/snapuserd_dm_user.cpp | 182 +++++++++++++++++- 2 files changed, 183 insertions(+), 2 deletions(-) diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h index 38cfef217..d56195242 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h @@ -167,6 +167,9 @@ class Worker { bool ReadFromSourceDevice(const CowOperation* cow_op); bool ReadAlignedSector(sector_t sector, size_t sz, bool header_response); + bool ReadUnalignedSector(sector_t sector, size_t size); + int ReadUnalignedSector(sector_t sector, size_t size, + std::vector>::iterator& it); bool RespondIOError(bool header_response); // Processing COW operations diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp index 920994450..bfbacf92e 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp @@ -395,6 +395,185 @@ bool Worker::ReadAlignedSector(sector_t sector, size_t sz, bool header_response) return true; } +int Worker::ReadUnalignedSector( + sector_t sector, size_t size, + std::vector>::iterator& it) { + size_t skip_sector_size = 0; + + SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size + << " Aligned sector: " << it->first; + + if (!ProcessCowOp(it->second)) { + SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size + << " Aligned sector: " << it->first; + return -1; + } + + int num_sectors_skip = sector - it->first; + + if (num_sectors_skip > 0) { + skip_sector_size = num_sectors_skip << SECTOR_SHIFT; + char* buffer = reinterpret_cast(bufsink_.GetBufPtr()); + struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0])); + + if (skip_sector_size == BLOCK_SZ) { + SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector + << " Base-sector: " << it->first; + return -1; + } + + memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size, + (BLOCK_SZ - skip_sector_size)); + } + + bufsink_.ResetBufferOffset(); + return std::min(size, (BLOCK_SZ - skip_sector_size)); +} + +bool Worker::ReadUnalignedSector(sector_t sector, size_t size) { + struct dm_user_header* header = bufsink_.GetHeaderPtr(); + header->type = DM_USER_RESP_SUCCESS; + bufsink_.ResetBufferOffset(); + std::vector>& chunk_vec = snapuserd_->GetChunkVec(); + + auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr), + SnapshotHandler::compare); + + // |-------|-------|-------| + // 0 1 2 3 + // + // Block 0 - op 1 + // Block 1 - op 2 + // Block 2 - op 3 + // + // chunk_vec will have block 0, 1, 2 which maps to relavant COW ops. + // + // Each block is 4k bytes. Thus, the last block will span 8 sectors + // ranging till block 3 (However, block 3 won't be in chunk_vec as + // it doesn't have any mapping to COW ops. Now, if we get an I/O request for a sector + // spanning between block 2 and block 3, we need to step back + // and get hold of the last element. + // + // Additionally, we need to make sure that the requested sector is + // indeed within the range of the final sector. 
It is perfectly valid + // to get an I/O request for block 3 and beyond which are not mapped + // to any COW ops. In that case, we just need to read from the base + // device. + bool merge_complete = false; + bool header_response = true; + if (it == chunk_vec.end()) { + if (chunk_vec.size() > 0) { + // I/O request beyond the last mapped sector + it = std::prev(chunk_vec.end()); + } else { + // This can happen when a partition merge is complete but snapshot + // state in /metadata is not yet deleted; during this window if the + // device is rebooted, subsequent attempt will mount the snapshot. + // However, since the merge was completed we wouldn't have any + // mapping to COW ops thus chunk_vec will be empty. In that case, + // mark this as merge_complete and route the I/O to the base device. + merge_complete = true; + } + } else if (it->first != sector) { + if (it != chunk_vec.begin()) { + --it; + } + } else { + return ReadAlignedSector(sector, size, header_response); + } + + loff_t requested_offset = sector << SECTOR_SHIFT; + + loff_t final_offset = 0; + if (!merge_complete) { + final_offset = it->first << SECTOR_SHIFT; + } + + // Since a COW op span 4k block size, we need to make sure that the requested + // offset is within the 4k region. Consider the following case: + // + // |-------|-------|-------| + // 0 1 2 3 + // + // Block 0 - op 1 + // Block 1 - op 2 + // + // We have an I/O request for a sector between block 2 and block 3. However, + // we have mapping to COW ops only for block 0 and block 1. Thus, the + // requested offset in this case is beyond the last mapped COW op size (which + // is block 1 in this case). + + size_t total_bytes_read = 0; + size_t remaining_size = size; + int ret = 0; + if (!merge_complete && (requested_offset >= final_offset) && + (requested_offset - final_offset) < BLOCK_SZ) { + // Read the partial un-aligned data + ret = ReadUnalignedSector(sector, remaining_size, it); + if (ret < 0) { + SNAP_LOG(ERROR) << "ReadUnalignedSector failed for sector: " << sector + << " size: " << size << " it->sector: " << it->first; + return RespondIOError(header_response); + } + + remaining_size -= ret; + total_bytes_read += ret; + sector += (ret >> SECTOR_SHIFT); + + // Send the data back + if (!WriteDmUserPayload(total_bytes_read, header_response)) { + return false; + } + + header_response = false; + // If we still have pending data to be processed, this will be aligned I/O + if (remaining_size) { + return ReadAlignedSector(sector, remaining_size, header_response); + } + } else { + // This is all about handling I/O request to be routed to base device + // as the I/O is not mapped to any of the COW ops. 
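+        // Example: with 512-byte sectors, a request at sector 9 has
+        // requested_offset 4608; aligned_offset rounds up to 8192, so
+        // diff_size is 3584 bytes of unaligned data to read from the base
+        // device before the remaining I/O becomes 4k aligned.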
+ loff_t aligned_offset = requested_offset; + // Align to nearest 4k + aligned_offset += BLOCK_SZ - 1; + aligned_offset &= ~(BLOCK_SZ - 1); + // Find the diff of the aligned offset + size_t diff_size = aligned_offset - requested_offset; + CHECK(diff_size <= BLOCK_SZ); + if (remaining_size < diff_size) { + if (!ReadDataFromBaseDevice(sector, remaining_size)) { + return RespondIOError(header_response); + } + total_bytes_read += remaining_size; + + if (!WriteDmUserPayload(total_bytes_read, header_response)) { + return false; + } + } else { + if (!ReadDataFromBaseDevice(sector, diff_size)) { + return RespondIOError(header_response); + } + + total_bytes_read += diff_size; + + if (!WriteDmUserPayload(total_bytes_read, header_response)) { + return false; + } + + remaining_size -= diff_size; + size_t num_sectors_read = (diff_size >> SECTOR_SHIFT); + sector += num_sectors_read; + CHECK(IsBlockAligned(sector << SECTOR_SHIFT)); + header_response = false; + + // If we still have pending data to be processed, this will be aligned I/O + return ReadAlignedSector(sector, remaining_size, header_response); + } + } + + return true; +} + bool Worker::RespondIOError(bool header_response) { struct dm_user_header* header = bufsink_.GetHeaderPtr(); header->type = DM_USER_RESP_ERROR; @@ -423,8 +602,7 @@ bool Worker::DmuserReadRequest() { // Unaligned I/O request if (!IsBlockAligned(header->sector << SECTOR_SHIFT)) { - SNAP_LOG(ERROR) << "I/O request is not 4k aligned."; - return false; + return ReadUnalignedSector(header->sector, header->len); } return ReadAlignedSector(header->sector, header->len, true); From ff590a806c32060864d92c98f92db164aca1d214 Mon Sep 17 00:00:00 2001 From: Akilesh Kailash Date: Tue, 17 Aug 2021 23:02:53 +0000 Subject: [PATCH 3/5] snapuserd: Wire up API's for Initiating and tracking Merge Add new client API's for initiating and tracking merge. These API's will be used by libsnapshot. Track the merge completion in the server by walking through all the partitions. Each worker thread will update the merge completion as and when number of COW operations are completed. Server will gather all the completions of each partition and average it out. This is in sync with the current merge completion tracking for dm-snapshot. As a side effect, move the snapuserd_server.h/cpp files to dm-snapshot-merge directory as it will only be a maintaining code. 
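For the completion tracking described above: if the merge has fully completed
on two partitions and a third partition is at 40%, the percentage reported to
the client is (2 * 100 + 40) / 3 = 80%.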
Bug: 193863443 Test: Snapuserd_test Signed-off-by: Akilesh Kailash Change-Id: I031eb1a11b0f426aafbed3d39d85b0c22b9030fb --- fs_mgr/libsnapshot/snapuserd/Android.bp | 3 +- .../snapuserd_server.cpp | 1 + .../snapuserd_server.h | 2 +- .../include/snapuserd/snapuserd_client.h | 6 + .../snapuserd/snapuserd_client.cpp | 21 + .../snapuserd/snapuserd_daemon.cpp | 2 - .../libsnapshot/snapuserd/snapuserd_daemon.h | 2 +- .../user-space-merge/snapuserd_core.cpp | 17 + .../user-space-merge/snapuserd_core.h | 3 + .../user-space-merge/snapuserd_server.cpp | 660 ++++++++++++++++++ .../user-space-merge/snapuserd_server.h | 140 ++++ 11 files changed, 852 insertions(+), 5 deletions(-) rename fs_mgr/libsnapshot/snapuserd/{ => dm-snapshot-merge}/snapuserd_server.cpp (99%) rename fs_mgr/libsnapshot/snapuserd/{ => dm-snapshot-merge}/snapuserd_server.h (99%) create mode 100644 fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp create mode 100644 fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp index 837f33a77..c9b05124b 100644 --- a/fs_mgr/libsnapshot/snapuserd/Android.bp +++ b/fs_mgr/libsnapshot/snapuserd/Android.bp @@ -56,7 +56,7 @@ cc_defaults { "fs_mgr_defaults", ], srcs: [ - "snapuserd_server.cpp", + "dm-snapshot-merge/snapuserd_server.cpp", "dm-snapshot-merge/snapuserd.cpp", "dm-snapshot-merge/snapuserd_worker.cpp", "dm-snapshot-merge/snapuserd_readahead.cpp", @@ -67,6 +67,7 @@ cc_defaults { "user-space-merge/snapuserd_merge.cpp", "user-space-merge/snapuserd_readahead.cpp", "user-space-merge/snapuserd_transitions.cpp", + "user-space-merge/snapuserd_server.cpp", ], cflags: [ diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp similarity index 99% rename from fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp index 91b41908b..9ddc96389 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp +++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp @@ -31,6 +31,7 @@ #include #include #include + #include "snapuserd_server.h" #define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_ diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h similarity index 99% rename from fs_mgr/libsnapshot/snapuserd/snapuserd_server.h rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h index 14e5de6e7..3b6ff1583 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h +++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h @@ -28,7 +28,7 @@ #include #include -#include "dm-snapshot-merge/snapuserd.h" +#include "snapuserd.h" namespace android { namespace snapshot { diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h index aeecf410e..4fa4330b7 100644 --- a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h +++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h @@ -79,6 +79,12 @@ class SnapuserdClient { // Returns true if the snapuserd instance supports bridging a socket to second-stage init. bool SupportsSecondStageSocketHandoff(); + + // Returns true if the merge is started(or resumed from crash). 
+ bool InitiateMerge(const std::string& misc_name); + + // Returns Merge completion percentage + double GetMergePercent(); }; } // namespace snapshot diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp index 1ea05a3b0..87c0ce493 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp @@ -231,5 +231,26 @@ bool SnapuserdClient::DetachSnapuserd() { return true; } +bool SnapuserdClient::InitiateMerge(const std::string& misc_name) { + std::string msg = "initiate_merge," + misc_name; + if (!Sendmsg(msg)) { + LOG(ERROR) << "Failed to send message " << msg << " to snapuserd"; + return false; + } + std::string response = Receivemsg(); + return response == "success"; +} + +double SnapuserdClient::GetMergePercent() { + std::string msg = "merge_percent"; + if (!Sendmsg(msg)) { + LOG(ERROR) << "Failed to send message " << msg << " to snapuserd"; + return false; + } + std::string response = Receivemsg(); + + return std::stod(response); +} + } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp index e05822ed7..912884fd3 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp @@ -21,8 +21,6 @@ #include #include -#include "snapuserd_server.h" - DEFINE_string(socket, android::snapshot::kSnapuserdSocket, "Named socket or socket path."); DEFINE_bool(no_socket, false, "If true, no socket is used. Each additional argument is an INIT message."); diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h index b660ba2ef..fbf57d943 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h @@ -19,7 +19,7 @@ #include #include -#include "snapuserd_server.h" +#include "dm-snapshot-merge/snapuserd_server.h" namespace android { namespace snapshot { diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp index 5730f488f..4f32f696d 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp @@ -59,6 +59,15 @@ std::unique_ptr SnapshotHandler::CloneReaderForWorker() { return reader_->CloneCowReader(); } +void SnapshotHandler::UpdateMergeCompletionPercentage() { + struct CowHeader* ch = reinterpret_cast(mapped_addr_); + merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops(); + + SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_ + << " num_merge_ops: " << ch->num_merge_ops + << " total-ops: " << reader_->get_num_total_data_ops(); +} + bool SnapshotHandler::CommitMerge(int num_merge_ops) { struct CowHeader* ch = reinterpret_cast(mapped_addr_); ch->num_merge_ops += num_merge_ops; @@ -95,6 +104,12 @@ bool SnapshotHandler::CommitMerge(int num_merge_ops) { } } + // Update the merge completion - this is used by update engine + // to track the completion. No need to take a lock. It is ok + // even if there is a miss on reading a latest updated value. + // Subsequent polling will eventually converge to completion. 
+ UpdateMergeCompletionPercentage(); + return true; } @@ -152,6 +167,8 @@ bool SnapshotHandler::ReadMetadata() { return false; } + UpdateMergeCompletionPercentage(); + // Initialize the iterator for reading metadata std::unique_ptr cowop_iter = reader_->GetMergeOpIter(); diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h index d56195242..5e9c7bf14 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h @@ -280,6 +280,7 @@ class SnapshotHandler : public std::enable_shared_from_this { int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; } void SetSocketPresent(bool socket) { is_socket_present_ = socket; } bool MergeInitiated() { return merge_initiated_; } + double GetMergePercentage() { return merge_completion_percentage_; } // Merge Block State Transitions void SetMergeCompleted(size_t block_index); @@ -295,6 +296,7 @@ class SnapshotHandler : public std::enable_shared_from_this { chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; } bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); } struct BufferState* GetBufferState(); + void UpdateMergeCompletionPercentage(); void ReadBlocks(const std::string partition_name, const std::string& dm_block_device); void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name, @@ -342,6 +344,7 @@ class SnapshotHandler : public std::enable_shared_from_this { std::vector> merge_blk_state_; std::unique_ptr merge_thread_; + double merge_completion_percentage_; bool merge_initiated_ = false; bool attached_ = false; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp new file mode 100644 index 000000000..a9b1d17ab --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp @@ -0,0 +1,660 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "snapuserd_server.h" + +#define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_ +#include + +namespace android { +namespace snapshot { + +using namespace std::string_literals; + +using android::base::borrowed_fd; +using android::base::unique_fd; + +DaemonOps SnapuserServer::Resolveop(std::string& input) { + if (input == "init") return DaemonOps::INIT; + if (input == "start") return DaemonOps::START; + if (input == "stop") return DaemonOps::STOP; + if (input == "query") return DaemonOps::QUERY; + if (input == "delete") return DaemonOps::DELETE; + if (input == "detach") return DaemonOps::DETACH; + if (input == "supports") return DaemonOps::SUPPORTS; + if (input == "initiate_merge") return DaemonOps::INITIATE; + if (input == "merge_percent") return DaemonOps::PERCENTAGE; + + return DaemonOps::INVALID; +} + +SnapuserServer::~SnapuserServer() { + // Close any client sockets that were added via AcceptClient(). + for (size_t i = 1; i < watched_fds_.size(); i++) { + close(watched_fds_[i].fd); + } +} + +std::string SnapuserServer::GetDaemonStatus() { + std::string msg = ""; + + if (IsTerminating()) + msg = "passive"; + else + msg = "active"; + + return msg; +} + +void SnapuserServer::Parsemsg(std::string const& msg, const char delim, + std::vector& out) { + std::stringstream ss(msg); + std::string s; + + while (std::getline(ss, s, delim)) { + out.push_back(s); + } +} + +void SnapuserServer::ShutdownThreads() { + terminating_ = true; + JoinAllThreads(); +} + +DmUserHandler::DmUserHandler(std::shared_ptr snapuserd) + : snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {} + +bool SnapuserServer::Sendmsg(android::base::borrowed_fd fd, const std::string& msg) { + ssize_t ret = TEMP_FAILURE_RETRY(send(fd.get(), msg.data(), msg.size(), MSG_NOSIGNAL)); + if (ret < 0) { + PLOG(ERROR) << "Snapuserd:server: send() failed"; + return false; + } + + if (ret < msg.size()) { + LOG(ERROR) << "Partial send; expected " << msg.size() << " bytes, sent " << ret; + return false; + } + return true; +} + +bool SnapuserServer::Recv(android::base::borrowed_fd fd, std::string* data) { + char msg[MAX_PACKET_SIZE]; + ssize_t rv = TEMP_FAILURE_RETRY(recv(fd.get(), msg, sizeof(msg), 0)); + if (rv < 0) { + PLOG(ERROR) << "recv failed"; + return false; + } + *data = std::string(msg, rv); + return true; +} + +bool SnapuserServer::Receivemsg(android::base::borrowed_fd fd, const std::string& str) { + const char delim = ','; + + std::vector out; + Parsemsg(str, delim, out); + DaemonOps op = Resolveop(out[0]); + + switch (op) { + case DaemonOps::INIT: { + // Message format: + // init,,,, + // + // Reads the metadata and send the number of sectors + if (out.size() != 5) { + LOG(ERROR) << "Malformed init message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + + auto handler = AddHandler(out[1], out[2], out[3], out[4]); + if (!handler) { + return Sendmsg(fd, "fail"); + } + + auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors()); + return Sendmsg(fd, retval); + } + case DaemonOps::START: { + // Message format: + // start, + // + // Start the new thread which binds to dm-user misc device + if (out.size() != 2) { + LOG(ERROR) << "Malformed start message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == 
dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " << out[1]; + return Sendmsg(fd, "fail"); + } + if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) { + LOG(ERROR) << "Tried to re-attach control device: " << out[1]; + return Sendmsg(fd, "fail"); + } + if (!StartHandler(*iter)) { + return Sendmsg(fd, "fail"); + } + return Sendmsg(fd, "success"); + } + case DaemonOps::STOP: { + // Message format: stop + // + // Stop all the threads gracefully and then shutdown the + // main thread + SetTerminating(); + ShutdownThreads(); + return true; + } + case DaemonOps::QUERY: { + // Message format: query + // + // As part of transition, Second stage daemon will be + // created before terminating the first stage daemon. Hence, + // for a brief period client may have to distiguish between + // first stage daemon and second stage daemon. + // + // Second stage daemon is marked as active and hence will + // be ready to receive control message. + return Sendmsg(fd, GetDaemonStatus()); + } + case DaemonOps::DELETE: { + // Message format: + // delete, + if (out.size() != 2) { + LOG(ERROR) << "Malformed delete message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == dm_users_.end()) { + // After merge is completed, we swap dm-user table with + // the underlying dm-linear base device. Hence, worker + // threads would have terminted and was removed from + // the list. + LOG(DEBUG) << "Could not find handler: " << out[1]; + return Sendmsg(fd, "success"); + } + + if (!(*iter)->ThreadTerminated()) { + (*iter)->snapuserd()->NotifyIOTerminated(); + } + } + if (!RemoveAndJoinHandler(out[1])) { + return Sendmsg(fd, "fail"); + } + return Sendmsg(fd, "success"); + } + case DaemonOps::DETACH: { + std::lock_guard lock(lock_); + TerminateMergeThreads(&lock); + terminating_ = true; + return true; + } + case DaemonOps::SUPPORTS: { + if (out.size() != 2) { + LOG(ERROR) << "Malformed supports message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + if (out[1] == "second_stage_socket_handoff") { + return Sendmsg(fd, "success"); + } + return Sendmsg(fd, "fail"); + } + case DaemonOps::INITIATE: { + if (out.size() != 2) { + LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts"; + return Sendmsg(fd, "fail"); + } + if (out[0] == "initiate_merge") { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " << out[1]; + return Sendmsg(fd, "fail"); + } + + if (!StartMerge(*iter)) { + return Sendmsg(fd, "fail"); + } + + return Sendmsg(fd, "success"); + } + return Sendmsg(fd, "fail"); + } + case DaemonOps::PERCENTAGE: { + std::lock_guard lock(lock_); + double percentage = GetMergePercentage(&lock); + + return Sendmsg(fd, std::to_string(percentage)); + } + default: { + LOG(ERROR) << "Received unknown message type from client"; + Sendmsg(fd, "fail"); + return false; + } + } +} + +void SnapuserServer::RunThread(std::shared_ptr handler) { + LOG(INFO) << "Entering thread for handler: " << handler->misc_name(); + + handler->snapuserd()->SetSocketPresent(is_socket_present_); + if (!handler->snapuserd()->Start()) { + LOG(ERROR) << " Failed to launch all worker threads"; + } + + handler->snapuserd()->CloseFds(); + handler->snapuserd()->CheckMergeCompletionStatus(); + handler->snapuserd()->UnmapBufferRegion(); + + auto misc_name = handler->misc_name(); + LOG(INFO) << "Handler thread about to 
exit: " << misc_name; + + { + std::lock_guard lock(lock_); + num_partitions_merge_complete_ += 1; + handler->SetThreadTerminated(); + auto iter = FindHandler(&lock, handler->misc_name()); + if (iter == dm_users_.end()) { + // RemoveAndJoinHandler() already removed us from the list, and is + // now waiting on a join(), so just return. Additionally, release + // all the resources held by snapuserd object which are shared + // by worker threads. This should be done when the last reference + // of "handler" is released; but we will explicitly release here + // to make sure snapuserd object is freed as it is the biggest + // consumer of memory in the daemon. + handler->FreeResources(); + LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name; + return; + } + + LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name; + + if (handler->snapuserd()->IsAttached()) { + handler->thread().detach(); + } + + // Important: free resources within the lock. This ensures that if + // WaitForDelete() is called, the handler is either in the list, or + // it's not and its resources are guaranteed to be freed. + handler->FreeResources(); + dm_users_.erase(iter); + } +} + +bool SnapuserServer::Start(const std::string& socketname) { + bool start_listening = true; + + sockfd_.reset(android_get_control_socket(socketname.c_str())); + if (sockfd_ < 0) { + sockfd_.reset(socket_local_server(socketname.c_str(), ANDROID_SOCKET_NAMESPACE_RESERVED, + SOCK_STREAM)); + if (sockfd_ < 0) { + PLOG(ERROR) << "Failed to create server socket " << socketname; + return false; + } + start_listening = false; + } + return StartWithSocket(start_listening); +} + +bool SnapuserServer::StartWithSocket(bool start_listening) { + if (start_listening && listen(sockfd_.get(), 4) < 0) { + PLOG(ERROR) << "listen socket failed"; + return false; + } + + AddWatchedFd(sockfd_, POLLIN); + is_socket_present_ = true; + + // If started in first-stage init, the property service won't be online. + if (access("/dev/socket/property_service", F_OK) == 0) { + if (!android::base::SetProperty("snapuserd.ready", "true")) { + LOG(ERROR) << "Failed to set snapuserd.ready property"; + return false; + } + } + + LOG(DEBUG) << "Snapuserd server now accepting connections"; + return true; +} + +bool SnapuserServer::Run() { + LOG(INFO) << "Now listening on snapuserd socket"; + + while (!IsTerminating()) { + int rv = TEMP_FAILURE_RETRY(poll(watched_fds_.data(), watched_fds_.size(), -1)); + if (rv < 0) { + PLOG(ERROR) << "poll failed"; + return false; + } + if (!rv) { + continue; + } + + if (watched_fds_[0].revents) { + AcceptClient(); + } + + auto iter = watched_fds_.begin() + 1; + while (iter != watched_fds_.end()) { + if (iter->revents && !HandleClient(iter->fd, iter->revents)) { + close(iter->fd); + iter = watched_fds_.erase(iter); + } else { + iter++; + } + } + } + + JoinAllThreads(); + return true; +} + +void SnapuserServer::JoinAllThreads() { + // Acquire the thread list within the lock. 
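+    // Moving the handler list out under lock_ and joining outside of it
+    // avoids blocking in join() with the lock held; exiting handler threads
+    // also acquire lock_ in RunThread() before removing themselves from
+    // dm_users_.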
+ std::vector> dm_users; + { + std::lock_guard guard(lock_); + dm_users = std::move(dm_users_); + } + + for (auto& client : dm_users) { + auto& th = client->thread(); + + if (th.joinable()) th.join(); + } +} + +void SnapuserServer::AddWatchedFd(android::base::borrowed_fd fd, int events) { + struct pollfd p = {}; + p.fd = fd.get(); + p.events = events; + watched_fds_.emplace_back(std::move(p)); +} + +void SnapuserServer::AcceptClient() { + int fd = TEMP_FAILURE_RETRY(accept4(sockfd_.get(), nullptr, nullptr, SOCK_CLOEXEC)); + if (fd < 0) { + PLOG(ERROR) << "accept4 failed"; + return; + } + + AddWatchedFd(fd, POLLIN); +} + +bool SnapuserServer::HandleClient(android::base::borrowed_fd fd, int revents) { + if (revents & POLLHUP) { + LOG(DEBUG) << "Snapuserd client disconnected"; + return false; + } + + std::string str; + if (!Recv(fd, &str)) { + return false; + } + if (!Receivemsg(fd, str)) { + LOG(ERROR) << "Encountered error handling client message, revents: " << revents; + return false; + } + return true; +} + +void SnapuserServer::Interrupt() { + // Force close the socket so poll() fails. + sockfd_ = {}; + SetTerminating(); +} + +std::shared_ptr SnapuserServer::AddHandler(const std::string& misc_name, + const std::string& cow_device_path, + const std::string& backing_device, + const std::string& base_path_merge) { + auto snapuserd = std::make_shared(misc_name, cow_device_path, backing_device, + base_path_merge); + if (!snapuserd->InitCowDevice()) { + LOG(ERROR) << "Failed to initialize Snapuserd"; + return nullptr; + } + + if (!snapuserd->InitializeWorkers()) { + LOG(ERROR) << "Failed to initialize workers"; + return nullptr; + } + + auto handler = std::make_shared(snapuserd); + { + std::lock_guard lock(lock_); + if (FindHandler(&lock, misc_name) != dm_users_.end()) { + LOG(ERROR) << "Handler already exists: " << misc_name; + return nullptr; + } + dm_users_.push_back(handler); + } + return handler; +} + +bool SnapuserServer::StartHandler(const std::shared_ptr& handler) { + if (handler->snapuserd()->IsAttached()) { + LOG(ERROR) << "Handler already attached"; + return false; + } + + handler->snapuserd()->AttachControlDevice(); + + handler->thread() = std::thread(std::bind(&SnapuserServer::RunThread, this, handler)); + return true; +} + +bool SnapuserServer::StartMerge(const std::shared_ptr& handler) { + if (!handler->snapuserd()->IsAttached()) { + LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started"; + return false; + } + + handler->snapuserd()->InitiateMerge(); + return true; +} + +auto SnapuserServer::FindHandler(std::lock_guard* proof_of_lock, + const std::string& misc_name) -> HandlerList::iterator { + CHECK(proof_of_lock); + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + if ((*iter)->misc_name() == misc_name) { + return iter; + } + } + return dm_users_.end(); +} + +void SnapuserServer::TerminateMergeThreads(std::lock_guard* proof_of_lock) { + CHECK(proof_of_lock); + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + if (!(*iter)->ThreadTerminated()) { + (*iter)->snapuserd()->NotifyIOTerminated(); + } + } +} + +double SnapuserServer::GetMergePercentage(std::lock_guard* proof_of_lock) { + CHECK(proof_of_lock); + double percentage = 0.0; + int n = 0; + + for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) { + auto& th = (*iter)->thread(); + if (th.joinable()) { + // Merge percentage by individual partitions wherein merge is still + // in-progress + percentage += 
(*iter)->snapuserd()->GetMergePercentage(); + n += 1; + } + } + + // Calculate the final merge percentage including those partitions where merge was already + // completed - num_partitions_merge_complete_ will track them when each + // thread exits in RunThread. + int total_partitions = n + num_partitions_merge_complete_; + + if (total_partitions) { + percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions; + } + + LOG(DEBUG) << "Merge %: " << percentage + << " num_partitions_merge_complete_: " << num_partitions_merge_complete_ + << " total_partitions: " << total_partitions << " n: " << n; + return percentage; +} + +bool SnapuserServer::RemoveAndJoinHandler(const std::string& misc_name) { + std::shared_ptr handler; + { + std::lock_guard lock(lock_); + + auto iter = FindHandler(&lock, misc_name); + if (iter == dm_users_.end()) { + // Client already deleted. + return true; + } + handler = std::move(*iter); + dm_users_.erase(iter); + } + + auto& th = handler->thread(); + if (th.joinable()) { + th.join(); + } + return true; +} + +bool SnapuserServer::WaitForSocket() { + auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); }); + + auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy; + + if (!android::fs_mgr::WaitForFile(socket_path, std::chrono::milliseconds::max())) { + LOG(ERROR) + << "Failed to wait for proxy socket, second-stage snapuserd will fail to connect"; + return false; + } + + // We must re-initialize property service access, since we launched before + // second-stage init. + __system_properties_init(); + + if (!android::base::WaitForProperty("snapuserd.proxy_ready", "true")) { + LOG(ERROR) + << "Failed to wait for proxy property, second-stage snapuserd will fail to connect"; + return false; + } + + unique_fd fd(socket_local_client(kSnapuserdSocketProxy, ANDROID_SOCKET_NAMESPACE_RESERVED, + SOCK_SEQPACKET)); + if (fd < 0) { + PLOG(ERROR) << "Failed to connect to socket proxy"; + return false; + } + + char code[1]; + std::vector fds; + ssize_t rv = android::base::ReceiveFileDescriptorVector(fd, code, sizeof(code), 1, &fds); + if (rv < 0) { + PLOG(ERROR) << "Failed to receive server socket over proxy"; + return false; + } + if (fds.empty()) { + LOG(ERROR) << "Expected at least one file descriptor from proxy"; + return false; + } + + // We don't care if the ACK is received.
+ code[0] = 'a'; + if (TEMP_FAILURE_RETRY(send(fd, code, sizeof(code), MSG_NOSIGNAL)) < 0) { + PLOG(ERROR) << "Failed to send ACK to proxy"; + return false; + } + + sockfd_ = std::move(fds[0]); + if (!StartWithSocket(true)) { + return false; + } + return Run(); +} + +bool SnapuserServer::RunForSocketHandoff() { + unique_fd proxy_fd(android_get_control_socket(kSnapuserdSocketProxy)); + if (proxy_fd < 0) { + PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocketProxy; + } + borrowed_fd server_fd(android_get_control_socket(kSnapuserdSocket)); + if (server_fd < 0) { + PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocket; + } + + if (listen(proxy_fd.get(), 4) < 0) { + PLOG(FATAL) << "Proxy listen socket failed"; + } + + if (!android::base::SetProperty("snapuserd.proxy_ready", "true")) { + LOG(FATAL) << "Proxy failed to set ready property"; + } + + unique_fd client_fd( + TEMP_FAILURE_RETRY(accept4(proxy_fd.get(), nullptr, nullptr, SOCK_CLOEXEC))); + if (client_fd < 0) { + PLOG(FATAL) << "Proxy accept failed"; + } + + char code[1] = {'a'}; + std::vector fds = {server_fd.get()}; + ssize_t rv = android::base::SendFileDescriptorVector(client_fd, code, sizeof(code), fds); + if (rv < 0) { + PLOG(FATAL) << "Proxy could not send file descriptor to snapuserd"; + } + // Wait for an ACK - results don't matter, we just don't want to risk closing + // the proxy socket too early. + if (recv(client_fd, code, sizeof(code), 0) < 0) { + PLOG(FATAL) << "Proxy could not receive terminating code from snapuserd"; + } + return true; +} + +} // namespace snapshot +} // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h new file mode 100644 index 000000000..6fc3a9d97 --- /dev/null +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h @@ -0,0 +1,140 @@ +// Copyright (C) 2020 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "snapuserd_core.h" + +namespace android { +namespace snapshot { + +static constexpr uint32_t MAX_PACKET_SIZE = 512; + +enum class DaemonOps { + INIT, + START, + QUERY, + STOP, + DELETE, + DETACH, + SUPPORTS, + INITIATE, + PERCENTAGE, + INVALID, +}; + +class DmUserHandler { + public: + explicit DmUserHandler(std::shared_ptr snapuserd); + + void FreeResources() { + // Each worker thread holds a reference to snapuserd.
+ // Clear them so that all the resources + // held by snapuserd are released + if (snapuserd_) { + snapuserd_->FreeResources(); + snapuserd_ = nullptr; + } + } + const std::shared_ptr& snapuserd() const { return snapuserd_; } + std::thread& thread() { return thread_; } + + const std::string& misc_name() const { return misc_name_; } + bool ThreadTerminated() { return thread_terminated_; } + void SetThreadTerminated() { thread_terminated_ = true; } + + private: + std::thread thread_; + std::shared_ptr snapuserd_; + std::string misc_name_; + bool thread_terminated_ = false; +}; + +class SnapuserServer { + private: + android::base::unique_fd sockfd_; + bool terminating_; + volatile bool received_socket_signal_ = false; + std::vector watched_fds_; + bool is_socket_present_ = false; + int num_partitions_merge_complete_ = 0; + + std::mutex lock_; + + using HandlerList = std::vector>; + HandlerList dm_users_; + + void AddWatchedFd(android::base::borrowed_fd fd, int events); + void AcceptClient(); + bool HandleClient(android::base::borrowed_fd fd, int revents); + bool Recv(android::base::borrowed_fd fd, std::string* data); + bool Sendmsg(android::base::borrowed_fd fd, const std::string& msg); + bool Receivemsg(android::base::borrowed_fd fd, const std::string& str); + + void ShutdownThreads(); + bool RemoveAndJoinHandler(const std::string& control_device); + DaemonOps Resolveop(std::string& input); + std::string GetDaemonStatus(); + void Parsemsg(std::string const& msg, const char delim, std::vector& out); + + bool IsTerminating() { return terminating_; } + + void RunThread(std::shared_ptr handler); + void JoinAllThreads(); + bool StartWithSocket(bool start_listening); + + // Find a DmUserHandler within a lock. + HandlerList::iterator FindHandler(std::lock_guard* proof_of_lock, + const std::string& misc_name); + + double GetMergePercentage(std::lock_guard* proof_of_lock); + void TerminateMergeThreads(std::lock_guard* proof_of_lock); + + public: + SnapuserServer() { terminating_ = false; } + ~SnapuserServer(); + + bool Start(const std::string& socketname); + bool Run(); + void Interrupt(); + bool RunForSocketHandoff(); + bool WaitForSocket(); + + std::shared_ptr AddHandler(const std::string& misc_name, + const std::string& cow_device_path, + const std::string& backing_device, + const std::string& base_path_merge); + bool StartHandler(const std::shared_ptr& handler); + bool StartMerge(const std::shared_ptr& handler); + + void SetTerminating() { terminating_ = true; } + void ReceivedSocketSignal() { received_socket_signal_ = true; } +}; + +} // namespace snapshot +} // namespace android From b94353cae0ecd273237ccd30f46c97330de5311a Mon Sep 17 00:00:00 2001 From: Akilesh Kailash Date: Sat, 2 Oct 2021 11:24:36 +0000 Subject: [PATCH 4/5] snapuserd: API to query snapshot and merge status Add a new API to query the snapshot and merge status. This will be used by libsnapshot.
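As an illustration for readers of this series (not part of the change itself), the sketch below shows how a libsnapshot-side caller might consume the new query API. The wrapper function, its name, and the client setup are hypothetical; only QuerySnapshotStatus() and the returned status strings come from this patch, and the usual snapuserd_client.h / android-base logging headers are assumed to be included.

    // Hypothetical caller-side sketch; assumes an already-connected
    // android::snapshot::SnapuserdClient instance ("client").
    void LogSnapshotState(android::snapshot::SnapuserdClient& client,
                          const std::string& misc_name) {
        // QuerySnapshotStatus() returns one of: "snapshot", "snapshot-merge",
        // "snapshot-merge-complete", or "snapshot-merge-failed".
        std::string status = client.QuerySnapshotStatus(misc_name);
        if (status == "snapshot-merge-complete") {
            // Merge finished; snapshot metadata can be cleaned up.
        } else if (status == "snapshot-merge-failed") {
            LOG(ERROR) << "Merge failed for " << misc_name;
        } else {
            // "snapshot": merge not yet resumed; "snapshot-merge": in progress.
        }
    }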
Bug: 193863443 Test: Full OTA on CF Signed-off-by: Akilesh Kailash Change-Id: I86cffff6a979e2e2bf1d8d1a1770e209eeb4a47d --- .../include/snapuserd/snapuserd_client.h | 3 + .../snapuserd/snapuserd_client.cpp | 9 +++ .../user-space-merge/snapuserd_core.h | 2 + .../user-space-merge/snapuserd_server.cpp | 24 ++++++++ .../user-space-merge/snapuserd_server.h | 2 + .../snapuserd_transitions.cpp | 55 +++++++++++++++++++ 6 files changed, 95 insertions(+) diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h index 4fa4330b7..6ed55af2d 100644 --- a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h +++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h @@ -85,6 +85,9 @@ class SnapuserdClient { // Returns Merge completion percentage double GetMergePercent(); + + // Return the status of the snapshot + std::string QuerySnapshotStatus(const std::string& misc_name); }; } // namespace snapshot diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp index 87c0ce493..e345269ae 100644 --- a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp +++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp @@ -252,5 +252,14 @@ double SnapuserdClient::GetMergePercent() { return std::stod(response); } +std::string SnapuserdClient::QuerySnapshotStatus(const std::string& misc_name) { + std::string msg = "getstatus," + misc_name; + if (!Sendmsg(msg)) { + LOG(ERROR) << "Failed to send message " << msg << " to snapuserd"; + return "snapshot-merge-failed"; + } + return Receivemsg(); +} + } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h index 5e9c7bf14..13b56facb 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h @@ -268,6 +268,8 @@ class SnapshotHandler : public std::enable_shared_from_this { bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; } void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; } + // Return the snapshot status + std::string GetMergeStatus(); // RA related functions uint64_t GetBufferMetadataOffset(); diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp index a9b1d17ab..a4fd5a035 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp @@ -54,6 +54,7 @@ DaemonOps SnapuserServer::Resolveop(std::string& input) { if (input == "supports") return DaemonOps::SUPPORTS; if (input == "initiate_merge") return DaemonOps::INITIATE; if (input == "merge_percent") return DaemonOps::PERCENTAGE; + if (input == "getstatus") return DaemonOps::GETSTATUS; return DaemonOps::INVALID; } @@ -262,6 +263,25 @@ bool SnapuserServer::Receivemsg(android::base::borrowed_fd fd, const std::string return Sendmsg(fd, std::to_string(percentage)); } + case DaemonOps::GETSTATUS: { + // Message format: + // getstatus, + if (out.size() != 2) { + LOG(ERROR) << "Malformed getstatus message, " << out.size() << " parts"; + return Sendmsg(fd, "snapshot-merge-failed"); + } + { + std::lock_guard lock(lock_); + auto iter = FindHandler(&lock, out[1]); + if (iter == dm_users_.end()) { + LOG(ERROR) << "Could not find handler: " <<
out[1]; + return Sendmsg(fd, "snapshot-merge-failed"); + } + + std::string merge_status = GetMergeStatus(*iter); + return Sendmsg(fd, merge_status); + } + } default: { LOG(ERROR) << "Received unknown message type from client"; Sendmsg(fd, "fail"); @@ -513,6 +533,10 @@ void SnapuserServer::TerminateMergeThreads(std::lock_guard* proof_of } } +std::string SnapuserServer::GetMergeStatus(const std::shared_ptr& handler) { + return handler->snapuserd()->GetMergeStatus(); +} + double SnapuserServer::GetMergePercentage(std::lock_guard* proof_of_lock) { CHECK(proof_of_lock); double percentage = 0.0; diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h index 6fc3a9d97..e93621ca2 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h @@ -45,6 +45,7 @@ enum class DaemonOps { SUPPORTS, INITIATE, PERCENTAGE, + GETSTATUS, INVALID, }; @@ -131,6 +132,7 @@ class SnapuserServer { const std::string& base_path_merge); bool StartHandler(const std::shared_ptr& handler); bool StartMerge(const std::shared_ptr& handler); + std::string GetMergeStatus(const std::shared_ptr& handler); void SetTerminating() { terminating_ = true; } void ReceivedSocketSignal() { received_socket_signal_ = true; } diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp index ef7a1181f..6c91fde6b 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp @@ -359,6 +359,61 @@ void SnapshotHandler::WaitForMergeComplete() { } } +std::string SnapshotHandler::GetMergeStatus() { + bool merge_not_initiated = false; + bool merge_failed = false; + + { + std::lock_guard lock(lock_); + if (!MergeInitiated()) { + merge_not_initiated = true; + } + + if (io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED) { + merge_failed = true; + } + } + + struct CowHeader* ch = reinterpret_cast(mapped_addr_); + bool merge_complete = (ch->num_merge_ops == reader_->get_num_total_data_ops()); + + if (merge_not_initiated) { + // Merge was not initiated yet; however, we have merge completion + // recorded in the COW Header. This can happen if the device was + // rebooted during merge. During next reboot, libsnapshot will + // query the status and if the merge is completed, then snapshot-status + // file will be deleted + if (merge_complete) { + return "snapshot-merge-complete"; + } + + // Return the state as "snapshot". If the device was rebooted during + // merge, we will return the status as "snapshot". This is ok, as + // libsnapshot will explicitly resume the merge. This is slightly + // different from kernel snapshot wherein once the snapshot was switched + // to merge target, during next boot, we immediately switch to merge + // target. We don't do that here because, during first stage init, we + // don't want to initiate the merge. The problem is that we have daemon + // transition between first and second stage init. If the merge was + // started, then we will have to quiesce the merge before switching + // the dm tables. Instead, we just wait until second stage daemon is up + // before resuming the merge. 
+ return "snapshot"; + } + + if (merge_failed) { + return "snapshot-merge-failed"; + } + + // Merge complete + if (merge_complete) { + return "snapshot-merge-complete"; + } + + // Merge is in-progress + return "snapshot-merge"; +} + //========== End of Read-ahead state transition functions ==================== /* From 2ba8eea4896716dc9bb5576fa0262e4a3e194b34 Mon Sep 17 00:00:00 2001 From: Akilesh Kailash Date: Tue, 17 Aug 2021 23:42:16 +0000 Subject: [PATCH 5/5] snapuserd: Sort REPLACE ops for batch merge Since we will be iterating forward for user-space merge, we need to sort the blocks in increasing order so that blocks can be batch merged if contiguous. For dm-snapshot merging, we will continue to sort in decreasing order. Bug: 193863397 Test: Snapuserd_test Signed-off-by: Akilesh Kailash Change-Id: I25fb5fce054f716a2ad0dddc0d0c3afef18bc7ad --- fs_mgr/libsnapshot/cow_reader.cpp | 21 +++++++++++++++---- .../include/libsnapshot/cow_reader.h | 8 ++++++- .../user-space-merge/snapuserd_core.cpp | 2 +- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/fs_mgr/libsnapshot/cow_reader.cpp b/fs_mgr/libsnapshot/cow_reader.cpp index 5306b2843..20030b940 100644 --- a/fs_mgr/libsnapshot/cow_reader.cpp +++ b/fs_mgr/libsnapshot/cow_reader.cpp @@ -34,11 +34,12 @@ namespace android { namespace snapshot { -CowReader::CowReader() +CowReader::CowReader(ReaderFlags reader_flag) : fd_(-1), header_(), fd_size_(0), - merge_op_blocks_(std::make_shared>()) {} + merge_op_blocks_(std::make_shared>()), + reader_flag_(reader_flag) {} static void SHA256(const void*, size_t, uint8_t[]) { #if 0 @@ -415,7 +416,7 @@ bool CowReader::ParseOps(std::optional label) { //============================================================== bool CowReader::PrepMergeOps() { auto merge_op_blocks = std::make_shared>(); - std::set> other_ops; + std::vector other_ops; auto seq_ops_set = std::unordered_set(); auto block_map = std::make_shared>(); size_t num_seqs = 0; @@ -446,7 +447,7 @@ bool CowReader::PrepMergeOps() { if (!has_seq_ops_ && IsOrderedOp(current_op)) { merge_op_blocks->emplace_back(current_op.new_block); } else if (seq_ops_set.count(current_op.new_block) == 0) { - other_ops.insert(current_op.new_block); + other_ops.push_back(current_op.new_block); } block_map->insert({current_op.new_block, i}); } @@ -462,6 +463,18 @@ bool CowReader::PrepMergeOps() { } else { num_ordered_ops_to_merge_ = 0; } + + // Sort the vector in increasing order if merging in user-space as + // we can batch merge them when iterating from forward. + // + // dm-snapshot-merge requires decreasing order as we iterate the blocks + // in reverse order. 
+ if (reader_flag_ == ReaderFlags::USERSPACE_MERGE) { + std::sort(other_ops.begin(), other_ops.end()); + } else { + std::sort(other_ops.begin(), other_ops.end(), std::greater()); + } + merge_op_blocks->reserve(merge_op_blocks->size() + other_ops.size()); for (auto block : other_ops) { merge_op_blocks->emplace_back(block); diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h index 63a9e687a..d5b433531 100644 --- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h +++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h @@ -104,7 +104,12 @@ class ICowOpIter { class CowReader final : public ICowReader { public: - CowReader(); + enum class ReaderFlags { + DEFAULT = 0, + USERSPACE_MERGE = 1, + }; + + CowReader(ReaderFlags reader_flag = ReaderFlags::DEFAULT); ~CowReader() { owned_fd_ = {}; } // Parse the COW, optionally, up to the given label. If no label is @@ -166,6 +171,7 @@ class CowReader final : public ICowReader { uint64_t num_ordered_ops_to_merge_; bool has_seq_ops_; std::shared_ptr> data_loc_; + ReaderFlags reader_flag_; }; } // namespace snapshot diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp index 4f32f696d..57e47e7ed 100644 --- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp +++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp @@ -139,7 +139,7 @@ void SnapshotHandler::CheckMergeCompletionStatus() { } bool SnapshotHandler::ReadMetadata() { - reader_ = std::make_unique(); + reader_ = std::make_unique(CowReader::ReaderFlags::USERSPACE_MERGE); CowHeader header; CowOptions options;