From 325e2acbb9e19cdbe49fee7a33af2f0cb45df068 Mon Sep 17 00:00:00 2001
From: Akilesh Kailash <akailash@google.com>
Date: Thu, 24 Feb 2022 02:08:51 +0000
Subject: [PATCH] snapuserd: Fall back to synchronous I/O if any errors are
 observed during async merge.

If any I/O errors are observed during the async merge, we retry the
I/O on the synchronous I/O path. For this to happen, we reset the
iterator so that the blocks which were partially completed during the
async merge are replayed.

Furthermore, we disable the async merge and continue to do all
subsequent I/O on the synchronous path.

Additionally, cut the queue depth down to 8 so that we decrease the
amount of async offload. We don't want a big queue depth with async
offload.

Bug: 220991038
Test: Instrument the code to fail the async I/Os randomly and make
      sure the merge completes. Instrumentation was done on both the
      read-ahead and merge code paths.
Signed-off-by: Akilesh Kailash <akailash@google.com>
Change-Id: I0db6d0f46054ca5b8423201a598c726b2c3d21ac
---
 fs_mgr/libsnapshot/cow_reader.cpp             |  36 ++++
 .../include/libsnapshot/cow_reader.h          |   8 +-
 .../user-space-merge/snapuserd_core.cpp       |  27 +--
 .../user-space-merge/snapuserd_core.h         |  32 ++-
 .../user-space-merge/snapuserd_merge.cpp      | 190 +++++++++++-------
 .../user-space-merge/snapuserd_readahead.cpp  |  58 ++++--
 .../snapuserd_transitions.cpp                 |   7 +
 7 files changed, 247 insertions(+), 111 deletions(-)

diff --git a/fs_mgr/libsnapshot/cow_reader.cpp b/fs_mgr/libsnapshot/cow_reader.cpp
index 9b5fd2aec..746feeb8d 100644
--- a/fs_mgr/libsnapshot/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/cow_reader.cpp
@@ -550,6 +550,9 @@ class CowOpIter final : public ICowOpIter {
     const CowOperation& Get() override;
     void Next() override;
 
+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::vector<CowOperation>::iterator op_iter_;
@@ -560,6 +563,15 @@ CowOpIter::CowOpIter(std::shared_ptr<std::vector<CowOperation>>& ops) {
     op_iter_ = ops_->begin();
 }
 
+bool CowOpIter::RDone() {
+    return op_iter_ == ops_->begin();
+}
+
+void CowOpIter::Prev() {
+    CHECK(!RDone());
+    op_iter_--;
+}
+
 bool CowOpIter::Done() {
     return op_iter_ == ops_->end();
 }
@@ -585,6 +597,9 @@ class CowRevMergeOpIter final : public ICowOpIter {
     const CowOperation& Get() override;
     void Next() override;
 
+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_;
@@ -603,6 +618,9 @@ class CowMergeOpIter final : public ICowOpIter {
     const CowOperation& Get() override;
     void Next() override;
 
+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_;
@@ -623,6 +641,15 @@ CowMergeOpIter::CowMergeOpIter(std::shared_ptr<std::vector<CowOperation>> ops,
     block_iter_ = merge_op_blocks->begin() + start;
 }
 
+bool CowMergeOpIter::RDone() {
+    return block_iter_ == merge_op_blocks_->begin();
+}
+
+void CowMergeOpIter::Prev() {
+    CHECK(!RDone());
+    block_iter_--;
+}
+
 bool CowMergeOpIter::Done() {
     return block_iter_ == merge_op_blocks_->end();
 }
@@ -649,6 +676,15 @@ CowRevMergeOpIter::CowRevMergeOpIter(std::shared_ptr<std::vector<CowOperation>> ops,
     block_riter_ = merge_op_blocks->rbegin();
 }
 
+bool CowRevMergeOpIter::RDone() {
+    return block_riter_ == merge_op_blocks_->rbegin();
+}
+
+void CowRevMergeOpIter::Prev() {
+    CHECK(!RDone());
+    block_riter_--;
+}
+
 bool CowRevMergeOpIter::Done() {
     return block_riter_ == merge_op_blocks_->rend() - start_;
 }
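The Prev()/RDone() additions above make every COW op iterator bidirectional; this is the primitive the merge path uses to rewind and replay ops that were only partially completed by a failed async merge. A rough standalone model of the contract (toy types, not libsnapshot code):

    #include <cassert>
    #include <vector>

    // Hypothetical stand-in for a COW operation; the real type lives in cow_reader.h.
    struct Op { int block; };

    class OpIter {
      public:
        explicit OpIter(std::vector<Op>& ops) : ops_(ops), it_(ops_.begin()) {}
        bool Done() { return it_ == ops_.end(); }     // no more items forward
        bool RDone() { return it_ == ops_.begin(); }  // no more items backward
        void Next() { assert(!Done()); ++it_; }
        void Prev() { assert(!RDone()); --it_; }
        const Op& Get() { return *it_; }

      private:
        std::vector<Op>& ops_;
        std::vector<Op>::iterator it_;
    };

    int main() {
        std::vector<Op> ops = {{1}, {2}, {3}};
        OpIter iter(ops);
        // Walk forward as the async merge would...
        iter.Next();
        iter.Next();
        // ...then rewind everything that was only partially merged.
        while (!iter.RDone()) iter.Prev();
        assert(iter.Get().block == 1);  // back to the first op, ready for replay
        return 0;
    }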
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index d5b433531..8e6bbd9b6 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -92,7 +92,7 @@ class ICowOpIter {
   public:
     virtual ~ICowOpIter() {}
 
-    // True if there are more items to read, false otherwise.
+    // True if there are no more items to read forward, false otherwise.
     virtual bool Done() = 0;
 
     // Read the current operation.
@@ -100,6 +100,12 @@ class ICowOpIter {
 
     // Advance to the next item.
     virtual void Next() = 0;
+
+    // Advance to the previous item.
+    virtual void Prev() = 0;
+
+    // True if there are no more items to read backwards, false otherwise.
+    virtual bool RDone() = 0;
 };
 
 class CowReader final : public ICowReader {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index b988c7bd4..692cb740c 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -492,22 +492,17 @@ void SnapshotHandler::ReadBlocks(const std::string partition_name,
         return;
     }
 
-    if (IsIouringSupported()) {
-        std::async(std::launch::async, &SnapshotHandler::ReadBlocksAsync, this, dm_block_device,
-                   partition_name, dev_sz);
-    } else {
-        int num_threads = 2;
-        size_t num_blocks = dev_sz >> BLOCK_SHIFT;
-        size_t num_blocks_per_thread = num_blocks / num_threads;
-        size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
-        off_t offset = 0;
+    int num_threads = 2;
+    size_t num_blocks = dev_sz >> BLOCK_SHIFT;
+    size_t num_blocks_per_thread = num_blocks / num_threads;
+    size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
+    off_t offset = 0;
 
-        for (int i = 0; i < num_threads; i++) {
-            std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this,
-                       dm_block_device, partition_name, offset, read_sz_per_thread);
+    for (int i = 0; i < num_threads; i++) {
+        std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device,
+                   partition_name, offset, read_sz_per_thread);
 
-            offset += read_sz_per_thread;
-        }
+        offset += read_sz_per_thread;
     }
 }
 
@@ -694,10 +689,8 @@ bool SnapshotHandler::IsIouringSupported() {
     // During selinux init transition, libsnapshot will propagate the
     // status of io_uring enablement. As properties are not initialized,
     // we cannot query system property.
-    //
-    // TODO: b/219642530: Intermittent I/O failures observed
     if (is_io_uring_enabled_) {
-        return false;
+        return true;
     }
 
     // Finally check the system property
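With the io_uring read path removed, ReadBlocks() unconditionally splits the device between two std::async workers that prime the page cache. A small sketch of the same range arithmetic (hypothetical sizes; BLOCK_SHIFT of 12 assumed, matching snapuserd's 4 KiB blocks):

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <sys/types.h>

    constexpr int BLOCK_SHIFT = 12;  // 4 KiB blocks, as snapuserd assumes

    int main() {
        uint64_t dev_sz = 1ULL << 30;  // hypothetical 1 GiB partition
        int num_threads = 2;

        size_t num_blocks = dev_sz >> BLOCK_SHIFT;
        size_t num_blocks_per_thread = num_blocks / num_threads;
        size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;

        off_t offset = 0;
        for (int i = 0; i < num_threads; i++) {
            // Each worker reads [offset, offset + read_sz_per_thread).
            printf("thread %d: offset=%ld size=%zu\n", i, (long)offset, read_sz_per_thread);
            offset += read_sz_per_thread;
        }
        // If num_blocks is odd, the trailing block falls in neither range;
        // since this read only warms the cache, a missed block is benign.
        return 0;
    }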
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index cc82985bd..83d40f635 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -99,6 +99,7 @@ class ReadAhead {
     void InitializeRAIter();
     bool RAIterDone();
     void RAIterNext();
+    void RAResetIter(uint64_t num_blocks);
     const CowOperation* GetRAOpIter();
 
     void InitializeBuffer();
@@ -151,12 +152,16 @@ class ReadAhead {
     std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_;
     BufferSink bufsink_;
 
+    uint64_t total_ra_blocks_completed_ = 0;
     bool read_ahead_async_ = false;
-    // Queue depth of 32 seems optimal. We don't want
+    // Queue depth of 8 seems optimal. We don't want
     // to have a huge depth as it may put more memory pressure
     // on the kernel worker threads given that we use
-    // IOSQE_ASYNC flag.
-    int queue_depth_ = 32;
+    // the IOSQE_ASYNC flag - async submission can potentially
+    // result in EINTR; since we don't restart syscalls and
+    // instead fall back to synchronous I/O, we don't want a
+    // huge queue depth.
+    int queue_depth_ = 8;
     std::unique_ptr<struct io_uring> ring_;
 };
 
@@ -210,11 +215,12 @@ class Worker {
     // Merge related ops
     bool Merge();
-    bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
-    bool MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter);
-    bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
+    bool AsyncMerge();
+    bool SyncMerge();
+    bool MergeOrderedOps();
+    bool MergeOrderedOpsAsync();
+    bool MergeReplaceZeroOps();
     int PrepareMerge(uint64_t* source_offset, int* pending_ops,
-                     const std::unique_ptr<ICowOpIter>& cowop_iter,
                      std::vector<const CowOperation*>* replace_zero_vec = nullptr);
 
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
@@ -238,12 +244,18 @@ class Worker {
     unique_fd base_path_merge_fd_;
     unique_fd ctrl_fd_;
 
+    std::unique_ptr<ICowOpIter> cowop_iter_;
+    size_t ra_block_index_ = 0;
+    uint64_t blocks_merged_in_group_ = 0;
     bool merge_async_ = false;
-    // Queue depth of 32 seems optimal. We don't want
+    // Queue depth of 8 seems optimal. We don't want
     // to have a huge depth as it may put more memory pressure
     // on the kernel worker threads given that we use
-    // IOSQE_ASYNC flag.
-    int queue_depth_ = 32;
+    // the IOSQE_ASYNC flag - async submission can potentially
+    // result in EINTR; since we don't restart syscalls and
+    // instead fall back to synchronous I/O, we don't want a
+    // huge queue depth.
+    int queue_depth_ = 8;
     std::unique_ptr<struct io_uring> ring_;
 
     std::shared_ptr<SnapshotHandler> snapuserd_;
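Both ReadAhead and Worker size their ring from the queue_depth_ above via io_uring_queue_init(); if the ring cannot be created, the code simply stays on the synchronous path. A minimal liburing sketch of that guarded setup (error handling condensed; not the snapuserd code itself):

    #include <liburing.h>
    #include <cstdio>
    #include <memory>

    int main() {
        constexpr int kQueueDepth = 8;  // small depth: cheap to drain on fallback
        auto ring = std::make_unique<struct io_uring>();

        int ret = io_uring_queue_init(kQueueDepth, ring.get(), 0);
        if (ret) {
            // No io_uring (old kernel, seccomp, ...): stay on synchronous I/O.
            fprintf(stderr, "io_uring_queue_init failed: %d\n", ret);
            return 1;
        }

        // ... submit and reap I/O here ...

        io_uring_queue_exit(ring.get());
        return 0;
    }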
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
index ffb982a64..0cb41d340 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -24,15 +24,14 @@ using namespace android::dm;
 using android::base::unique_fd;
 
 int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
-                         const std::unique_ptr<ICowOpIter>& cowop_iter,
                          std::vector<const CowOperation*>* replace_zero_vec) {
     int num_ops = *pending_ops;
     int nr_consecutive = 0;
     bool checkOrderedOp = (replace_zero_vec == nullptr);
 
     do {
-        if (!cowop_iter->Done() && num_ops) {
-            const CowOperation* cow_op = &cowop_iter->Get();
+        if (!cowop_iter_->Done() && num_ops) {
+            const CowOperation* cow_op = &cowop_iter_->Get();
             if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
                 break;
             }
@@ -42,12 +41,12 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                 replace_zero_vec->push_back(cow_op);
             }
 
-            cowop_iter->Next();
+            cowop_iter_->Next();
             num_ops -= 1;
             nr_consecutive = 1;
 
-            while (!cowop_iter->Done() && num_ops) {
-                const CowOperation* op = &cowop_iter->Get();
+            while (!cowop_iter_->Done() && num_ops) {
+                const CowOperation* op = &cowop_iter_->Get();
                 if (checkOrderedOp && !IsOrderedOp(*op)) {
                     break;
                 }
@@ -63,7 +62,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
 
                 nr_consecutive += 1;
                 num_ops -= 1;
-                cowop_iter->Next();
+                cowop_iter_->Next();
             }
         }
     } while (0);
@@ -71,7 +70,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
     return nr_consecutive;
 }
 
-bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeReplaceZeroOps() {
     // Flush every 8192 ops. Since all ops are independent and there is no
     // dependency between COW ops, we will flush the data and the number
     // of ops merged in COW file for every 8192 ops. If there is a crash,
@@ -84,15 +83,17 @@ bool Worker::MergeReplaceZeroOps()
     int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32;
     int num_ops_merged = 0;
 
-    while (!cowop_iter->Done()) {
+    SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";
+
+    while (!cowop_iter_->Done()) {
         int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
         std::vector<const CowOperation*> replace_zero_vec;
         uint64_t source_offset;
 
-        int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter, &replace_zero_vec);
+        int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
         if (linear_blocks == 0) {
             // Merge complete
-            CHECK(cowop_iter->Done());
+            CHECK(cowop_iter_->Done());
             break;
         }
 
@@ -117,8 +118,8 @@ bool Worker::MergeReplaceZeroOps()
         size_t io_size = linear_blocks * BLOCK_SZ;
 
         // Merge - Write the contents back to base device
-        int ret = pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(), io_size,
-                         source_offset);
+        int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
+                                            io_size, source_offset));
         if (ret < 0 || ret != io_size) {
             SNAP_LOG(ERROR)
                     << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
@@ -172,16 +173,15 @@ bool Worker::MergeReplaceZeroOps()
     return true;
 }
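TEMP_FAILURE_RETRY re-issues a syscall for as long as it fails with EINTR, so a signal delivered mid-pwrite no longer masquerades as an I/O error. A self-contained sketch of the idiom (the macro shown mirrors the bionic/glibc definition; the file path is hypothetical):

    #include <cerrno>
    #include <cstdio>
    #include <fcntl.h>
    #include <unistd.h>

    // Equivalent of the bionic/glibc macro, shown here for illustration:
    // keep retrying while the call fails with EINTR (interrupted by a signal).
    #ifndef TEMP_FAILURE_RETRY
    #define TEMP_FAILURE_RETRY(exp)                \
        ({                                         \
            decltype(exp) _rc;                     \
            do {                                   \
                _rc = (exp);                       \
            } while (_rc == -1 && errno == EINTR); \
            _rc;                                   \
        })
    #endif

    int main() {
        int fd = open("/tmp/example", O_WRONLY | O_CREAT, 0644);  // hypothetical path
        if (fd < 0) return 1;

        char buf[4096] = {};
        // Without the wrapper, a signal delivered mid-write could surface as
        // a spurious failure; with it, only real I/O errors propagate.
        ssize_t ret = TEMP_FAILURE_RETRY(pwrite(fd, buf, sizeof(buf), 0));
        if (ret < 0) perror("pwrite");

        close(fd);
        return ret == (ssize_t)sizeof(buf) ? 0 : 1;
    }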
 
-bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeOrderedOpsAsync() {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
-    size_t block_index = 0;
 
     SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";
 
-    while (!cowop_iter->Done()) {
-        const CowOperation* cow_op = &cowop_iter->Get();
+    while (!cowop_iter_->Done()) {
+        const CowOperation* cow_op = &cowop_iter_->Get();
         if (!IsOrderedOp(*cow_op)) {
             break;
         }
@@ -190,11 +190,10 @@ bool Worker::MergeOrderedOpsAsync()
         // Wait for RA thread to notify that the merge window
         // is ready for merging.
         if (!snapuserd_->WaitForMergeBegin()) {
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
-        snapuserd_->SetMergeInProgress(block_index);
+        snapuserd_->SetMergeInProgress(ra_block_index_);
 
         loff_t offset = 0;
         int num_ops = snapuserd_->GetTotalBlocksToMerge();
@@ -202,12 +201,13 @@ bool Worker::MergeOrderedOpsAsync()
         int pending_sqe = queue_depth_;
         int pending_ios_to_submit = 0;
         bool flush_required = false;
+        blocks_merged_in_group_ = 0;
 
         SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
         while (num_ops) {
             uint64_t source_offset;
 
-            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
 
             if (linear_blocks != 0) {
                 size_t io_size = (linear_blocks * BLOCK_SZ);
@@ -216,7 +216,6 @@ bool Worker::MergeOrderedOpsAsync()
                 struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                 if (!sqe) {
                     SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
-                    snapuserd_->SetMergeFailed(block_index);
                     return false;
                 }
 
@@ -225,10 +224,18 @@ bool Worker::MergeOrderedOpsAsync()
                 offset += io_size;
                 num_ops -= linear_blocks;
+                blocks_merged_in_group_ += linear_blocks;
 
                 pending_sqe -= 1;
                 pending_ios_to_submit += 1;
-                sqe->flags |= IOSQE_ASYNC;
+                // These flags are important - we need to make sure the
+                // blocks are linked and written in the same order as they
+                // were populated, because of overlapping block writes.
+                //
+                // If there are no dependencies, we can optimize this further
+                // by allowing parallel writes; for now, just link all the SQ
+                // entries.
+                sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
             }
 
             // Ring is full or no more COW ops to be merged in this batch
@@ -256,7 +263,7 @@ bool Worker::MergeOrderedOpsAsync()
                     pending_sqe -= 1;
                     flush_required = false;
                     pending_ios_to_submit += 1;
-                    sqe->flags |= IOSQE_ASYNC;
+                    sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
                 }
             } else {
                 flush_required = true;
@@ -269,35 +276,45 @@ bool Worker::MergeOrderedOpsAsync()
                     SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: "
                                      << " io submit: " << ret
                                      << " expected: " << pending_ios_to_submit;
-                    snapuserd_->SetMergeFailed(block_index);
                     return false;
                 }
 
                 int pending_ios_to_complete = pending_ios_to_submit;
                 pending_ios_to_submit = 0;
 
+                bool status = true;
+
                 // Reap I/O completions
                 while (pending_ios_to_complete) {
                     struct io_uring_cqe* cqe;
 
+                    // We need to make sure to reap all the I/Os submitted,
+                    // even if any errors are observed.
+                    //
+                    // io_uring_wait_cqe can potentially return -EAGAIN or
+                    // -EINTR; these error codes are not truly I/O errors. We
+                    // could retry them by re-populating the SQE entries and
+                    // submitting the I/O requests again. However, we don't do
+                    // that now; instead we fall back to synchronous I/O.
                     ret = io_uring_wait_cqe(ring_.get(), &cqe);
                     if (ret) {
-                        SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
-                        snapuserd_->SetMergeFailed(block_index);
-                        return false;
+                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << ret;
+                        status = false;
                     }
 
                     if (cqe->res < 0) {
-                        SNAP_LOG(ERROR)
-                                << "Read-ahead - io_uring_wait_cqe failed with res: " << cqe->res;
-                        snapuserd_->SetMergeFailed(block_index);
-                        return false;
+                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
+                        status = false;
                     }
 
                     io_uring_cqe_seen(ring_.get(), cqe);
                     pending_ios_to_complete -= 1;
                 }
 
+                if (!status) {
+                    return false;
+                }
+
                 pending_sqe = queue_depth_;
             }
 
@@ -312,7 +329,6 @@ bool Worker::MergeOrderedOpsAsync()
         // Flush the data
         if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
             SNAP_LOG(ERROR) << " Failed to fsync merged data";
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
@@ -320,35 +336,34 @@ bool Worker::MergeOrderedOpsAsync()
         // the merge completion
         if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
             SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
         SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
 
         // Mark the block as merge complete
-        snapuserd_->SetMergeCompleted(block_index);
+        snapuserd_->SetMergeCompleted(ra_block_index_);
 
         // Notify RA thread that the merge thread is ready to merge the next
         // window
         snapuserd_->NotifyRAForMergeReady();
 
         // Get the next block
-        block_index += 1;
+        ra_block_index_ += 1;
     }
 
     return true;
 }
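IOSQE_IO_LINK chains each SQE to its successor, so the kernel starts write N+1 only after write N completes, which is exactly the ordering that overlapping block writes need; if a linked request fails, the rest of the chain is cancelled with -ECANCELED. A minimal liburing sketch of two linked, overlapping writes (hypothetical fd and offsets):

    #include <liburing.h>
    #include <cstdio>
    #include <cstring>
    #include <fcntl.h>
    #include <unistd.h>

    int main() {
        struct io_uring ring;
        if (io_uring_queue_init(8, &ring, 0)) return 1;

        int fd = open("/tmp/linked_writes", O_WRONLY | O_CREAT, 0644);  // hypothetical
        if (fd < 0) return 1;

        char a[4096], b[4096];
        memset(a, 'A', sizeof(a));
        memset(b, 'B', sizeof(b));

        // First write: linked, so the second one starts only after it completes.
        struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
        io_uring_prep_write(sqe, fd, a, sizeof(a), 0);
        sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);

        // Second write overlaps the first block range [0, 4096); ordering matters.
        sqe = io_uring_get_sqe(&ring);
        io_uring_prep_write(sqe, fd, b, sizeof(b), 2048);
        sqe->flags |= IOSQE_ASYNC;  // last entry of the chain is not linked

        io_uring_submit(&ring);

        // Reap both completions; a failed link surfaces here as cqe->res < 0.
        for (int i = 0; i < 2; i++) {
            struct io_uring_cqe* cqe;
            if (io_uring_wait_cqe(&ring, &cqe) == 0) {
                if (cqe->res < 0) fprintf(stderr, "write %d failed: %d\n", i, cqe->res);
                io_uring_cqe_seen(&ring, cqe);
            }
        }

        io_uring_queue_exit(&ring);
        close(fd);
        return 0;
    }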
 
-bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeOrderedOps() {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
-    size_t block_index = 0;
 
     SNAP_LOG(INFO) << "MergeOrderedOps started....";
 
-    while (!cowop_iter->Done()) {
-        const CowOperation* cow_op = &cowop_iter->Get();
+    while (!cowop_iter_->Done()) {
+        const CowOperation* cow_op = &cowop_iter_->Get();
         if (!IsOrderedOp(*cow_op)) {
             break;
         }
@@ -357,11 +372,11 @@ bool Worker::MergeOrderedOps() {
         // Wait for RA thread to notify that the merge window
         // is ready for merging.
         if (!snapuserd_->WaitForMergeBegin()) {
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }
 
-        snapuserd_->SetMergeInProgress(block_index);
+        snapuserd_->SetMergeInProgress(ra_block_index_);
 
         loff_t offset = 0;
         int num_ops = snapuserd_->GetTotalBlocksToMerge();
@@ -369,7 +384,7 @@ bool Worker::MergeOrderedOps() {
         while (num_ops) {
             uint64_t source_offset;
 
-            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
             if (linear_blocks == 0) {
                 break;
             }
@@ -378,12 +393,13 @@ bool Worker::MergeOrderedOps() {
             // Write to the base device. Data is already in the RA buffer. Note
             // that XOR ops is already handled by the RA thread. We just write
            // the contents out.
-            int ret = pwrite(base_path_merge_fd_.get(), (char*)read_ahead_buffer + offset, io_size,
-                             source_offset);
+            int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
+                                                (char*)read_ahead_buffer + offset, io_size,
+                                                source_offset));
             if (ret < 0 || ret != io_size) {
                 SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                 << " at offset: " << source_offset << " io_size: " << io_size;
-                snapuserd_->SetMergeFailed(block_index);
+                snapuserd_->SetMergeFailed(ra_block_index_);
                 return false;
             }
 
@@ -397,7 +413,7 @@ bool Worker::MergeOrderedOps() {
         // Flush the data
         if (fsync(base_path_merge_fd_.get()) < 0) {
             SNAP_LOG(ERROR) << " Failed to fsync merged data";
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }
 
@@ -405,47 +421,87 @@ bool Worker::MergeOrderedOps() {
         // the merge completion
         if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
             SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }
 
         SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
 
         // Mark the block as merge complete
-        snapuserd_->SetMergeCompleted(block_index);
+        snapuserd_->SetMergeCompleted(ra_block_index_);
 
         // Notify RA thread that the merge thread is ready to merge the next
         // window
         snapuserd_->NotifyRAForMergeReady();
 
         // Get the next block
-        block_index += 1;
+        ra_block_index_ += 1;
     }
 
     return true;
 }
 
-bool Worker::Merge() {
-    std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
+bool Worker::AsyncMerge() {
+    if (!MergeOrderedOpsAsync()) {
+        SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
+        // Reset the iterator so that we retry the merge
+        while (blocks_merged_in_group_ && !cowop_iter_->RDone()) {
+            cowop_iter_->Prev();
+            blocks_merged_in_group_ -= 1;
+        }
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
+    return true;
+}
+
+bool Worker::SyncMerge() {
+    if (!MergeOrderedOps()) {
+        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "MergeOrderedOps completed";
+    return true;
+}
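The rewind in AsyncMerge() is safe to replay because re-writing the same read-ahead buffer contents to the same source offsets is idempotent; the synchronous retry simply sees the merge window from its first op again. A toy illustration of the rewind accounting (made-up numbers, not snapuserd code):

    #include <cassert>
    #include <cstdint>
    #include <vector>

    int main() {
        // Pretend the async path consumed 5 ops of the current merge window
        // before an I/O error was detected.
        std::vector<int> ops = {10, 11, 12, 13, 14, 20, 21};
        size_t pos = 5;                       // iterator position after the failure
        uint64_t blocks_merged_in_group = 5;  // ops consumed in this window

        // Equivalent of: while (blocks_merged_in_group_ && !RDone()) Prev();
        while (blocks_merged_in_group && pos > 0) {
            pos -= 1;
            blocks_merged_in_group -= 1;
        }

        // The synchronous retry now sees the window from its first op again.
        assert(pos == 0 && ops[pos] == 10);
        return 0;
    }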
 
+bool Worker::Merge() {
+    cowop_iter_ = reader_->GetMergeOpIter();
+
+    bool retry = false;
+    bool ordered_ops_merge_status;
+
+    // Start the async merge
     if (merge_async_) {
-        if (!MergeOrderedOpsAsync(cowop_iter)) {
+        ordered_ops_merge_status = AsyncMerge();
+        if (!ordered_ops_merge_status) {
+            FinalizeIouring();
+            retry = true;
+            merge_async_ = false;
+        }
+    }
+
+    // Check if we need to fall back and retry the merge.
+    //
+    // If the device doesn't support async merge, we enter here
+    // directly (i.e., devices with 4.x kernels).
+    const bool sync_merge_required = (retry || !merge_async_);
+
+    if (sync_merge_required) {
+        ordered_ops_merge_status = SyncMerge();
+        if (!ordered_ops_merge_status) {
+            // Merge failed. The device will continue to be mounted
+            // off snapshots; the merge will be retried during the
+            // next reboot.
             SNAP_LOG(ERROR) << "Merge failed for ordered ops";
             snapuserd_->MergeFailed();
             return false;
         }
-        SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed.....";
-    } else {
-        // Start with Copy and Xor ops
-        if (!MergeOrderedOps(cowop_iter)) {
-            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
-            snapuserd_->MergeFailed();
-            return false;
-        }
-        SNAP_LOG(INFO) << "MergeOrderedOps completed.....";
     }
 
     // Replace and Zero ops
-    if (!MergeReplaceZeroOps(cowop_iter)) {
+    if (!MergeReplaceZeroOps()) {
         SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
         snapuserd_->MergeFailed();
         return false;
@@ -461,14 +517,6 @@ bool Worker::InitializeIouring() {
         return false;
     }
 
-    {
-        // TODO: b/219642530 - Disable io_uring for merge
-        // until we figure out the cause of intermittent
-        // IO failures.
-        merge_async_ = false;
-        return true;
-    }
-
     ring_ = std::make_unique<struct io_uring>();
 
     int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
@@ -514,7 +562,7 @@ bool Worker::RunMergeThread() {
     CloseFds();
     reader_->CloseCowFd();
 
-    SNAP_LOG(INFO) << "Merge finish";
+    SNAP_LOG(INFO) << "Snapshot-Merge completed";
 
     return true;
 }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index 26c5f199a..7d9d39215 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -279,7 +279,6 @@ bool ReadAhead::ReadAheadAsyncIO() {
             sqe = io_uring_get_sqe(ring_.get());
             if (!sqe) {
                 SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
-                snapuserd_->ReadAheadIOFailed();
                 return false;
             }
 
@@ -309,7 +308,6 @@ bool ReadAhead::ReadAheadAsyncIO() {
             if (ret != pending_ios_to_submit) {
                 SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: "
                                  << " io submit: " << ret << " expected: " << pending_ios_to_submit;
-                snapuserd_->ReadAheadIOFailed();
                 return false;
             }
 
@@ -321,14 +319,12 @@ bool ReadAhead::ReadAheadAsyncIO() {
             // Read XOR data from COW file in parallel when I/O's are in-flight
             if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
                 SNAP_LOG(ERROR) << "ReadXorData failed";
-                snapuserd_->ReadAheadIOFailed();
                 return false;
             }
 
             // Fetch I/O completions
             if (!ReapIoCompletions(pending_ios_to_complete)) {
                 SNAP_LOG(ERROR) << "ReapIoCompletions failed";
-                snapuserd_->ReadAheadIOFailed();
                 return false;
             }
 
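Dropping the ReadAheadIOFailed() calls above is deliberate: a false return from the async path now means "let the caller decide" rather than terminal failure, and only the synchronous retry surfaces a hard error. A condensed sketch of that control flow (stub functions standing in for the real snapuserd entry points):

    #include <cstdio>

    // Toy stand-ins for the real snapuserd entry points (hypothetical).
    static bool async_ok = false;  // simulate an io_uring failure

    static bool ReadAheadAsyncIO() { return async_ok; }
    static bool ReadAheadSyncIO() { return true; }

    static bool ReadAheadIOStart() {
        bool read_ahead_async = true;
        bool retry = false;

        if (read_ahead_async && !ReadAheadAsyncIO()) {
            // Soft failure: the async path reports the error to the caller
            // instead of declaring terminal failure; rewind and retry.
            fprintf(stderr, "async read-ahead failed; falling back\n");
            retry = true;
            read_ahead_async = false;
        }

        if ((retry || !read_ahead_async) && !ReadAheadSyncIO()) {
            return false;  // hard failure: surfaced to the merge state machine
        }
        return true;
    }

    int main() { return ReadAheadIOStart() ? 0 : 1; }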
@@ -393,26 +389,36 @@ void ReadAhead::UpdateScratchMetadata() {
 }
 
 bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
+    bool status = true;
+
     // Reap I/O completions
     while (pending_ios_to_complete) {
         struct io_uring_cqe* cqe;
 
+        // We need to make sure to reap all the I/Os submitted,
+        // even if any errors are observed.
+        //
+        // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
+        // these error codes are not truly I/O errors. We could retry
+        // them by re-populating the SQE entries and submitting the
+        // I/O requests again. However, we don't do that now; instead
+        // we fall back to synchronous I/O.
        int ret = io_uring_wait_cqe(ring_.get(), &cqe);
         if (ret) {
             SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
-            return false;
+            status = false;
         }
 
         if (cqe->res < 0) {
             SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed with res: " << cqe->res;
-            return false;
+            status = false;
         }
 
         io_uring_cqe_seen(ring_.get(), cqe);
         pending_ios_to_complete -= 1;
     }
 
-    return true;
+    return status;
 }
 
 void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
@@ -610,18 +616,38 @@ bool ReadAhead::ReadAheadIOStart() {
         return ReconstructDataFromCow();
     }
 
+    bool retry = false;
+    bool ra_status;
+
+    // Start the async read-ahead
     if (read_ahead_async_) {
-        if (!ReadAheadAsyncIO()) {
-            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - io_uring processing failure.";
-            return false;
+        ra_status = ReadAheadAsyncIO();
+        if (!ra_status) {
+            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - Falling back to synchronous I/O";
+            FinalizeIouring();
+            RAResetIter(total_blocks_merged_);
+            retry = true;
+            read_ahead_async_ = false;
         }
-    } else {
-        if (!ReadAheadSyncIO()) {
+    }
+
+    // Check if we need to fall back and retry the read-ahead.
+    //
+    // If the device doesn't support async operations, we enter here
+    // directly (i.e., devices with 4.x kernels).
+    const bool ra_sync_required = (retry || !read_ahead_async_);
+
+    if (ra_sync_required) {
+        ra_status = ReadAheadSyncIO();
+        if (!ra_status) {
             SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
             return false;
         }
     }
 
+    SNAP_LOG(DEBUG) << "Read-ahead: total_ra_blocks_merged: " << total_ra_blocks_completed_;
+
     // Wait for the merge to finish for the previous RA window. We shouldn't
     // be touching the scratch space until merge is complete of previous RA
     // window. If there is a crash during this time frame, merge should resume
@@ -646,6 +672,7 @@ bool ReadAhead::ReadAheadIOStart() {
         offset += BLOCK_SZ;
     }
 
+    total_ra_blocks_completed_ += total_blocks_merged_;
     snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);
 
     // Flush the data only if we have a overlapping blocks in the region
@@ -763,6 +790,13 @@ void ReadAhead::RAIterNext() {
     cowop_iter_->Next();
 }
 
+void ReadAhead::RAResetIter(uint64_t num_blocks) {
+    while (num_blocks && !cowop_iter_->RDone()) {
+        cowop_iter_->Prev();
+        num_blocks -= 1;
+    }
+}
+
 const CowOperation* ReadAhead::GetRAOpIter() {
     const CowOperation* cow_op = &cowop_iter_->Get();
     return cow_op;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
index 6dec1e2e0..d4e1d7c7e 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
@@ -531,6 +531,13 @@ void SnapshotHandler::SetMergeInProgress(size_t ra_index) {
     {
         std::unique_lock<std::mutex> lock(blk_state->m_lock);
 
+        // We may have fallen back from the async merge to synchronous merging
+        // on the current block. There is no need to reset the state, as the
+        // merge is already in progress.
+        if (blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS) {
+            return;
+        }
+
         CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
 
         // First set the state to RA_READY so that in-flight I/O will drain