diff --git a/fs_mgr/libsnapshot/cow_reader.cpp b/fs_mgr/libsnapshot/cow_reader.cpp
index 9b5fd2aec..746feeb8d 100644
--- a/fs_mgr/libsnapshot/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/cow_reader.cpp
@@ -550,6 +550,9 @@ class CowOpIter final : public ICowOpIter {
     const CowOperation& Get() override;
     void Next() override;

+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::vector<CowOperation>::iterator op_iter_;
@@ -560,6 +563,15 @@ CowOpIter::CowOpIter(std::shared_ptr<std::vector<CowOperation>>& ops) {
     op_iter_ = ops_->begin();
 }

+bool CowOpIter::RDone() {
+    return op_iter_ == ops_->begin();
+}
+
+void CowOpIter::Prev() {
+    CHECK(!RDone());
+    op_iter_--;
+}
+
 bool CowOpIter::Done() {
     return op_iter_ == ops_->end();
 }
@@ -585,6 +597,9 @@ class CowRevMergeOpIter final : public ICowOpIter {
     const CowOperation& Get() override;
     void Next() override;

+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_;
@@ -603,6 +618,9 @@ class CowMergeOpIter final : public ICowOpIter {
     const CowOperation& Get() override;
     void Next() override;

+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_;
@@ -623,6 +641,15 @@ CowMergeOpIter::CowMergeOpIter(std::shared_ptr<std::vector<CowOperation>> ops,
     block_iter_ = merge_op_blocks->begin() + start;
 }

+bool CowMergeOpIter::RDone() {
+    return block_iter_ == merge_op_blocks_->begin();
+}
+
+void CowMergeOpIter::Prev() {
+    CHECK(!RDone());
+    block_iter_--;
+}
+
 bool CowMergeOpIter::Done() {
     return block_iter_ == merge_op_blocks_->end();
 }
@@ -649,6 +676,15 @@ CowRevMergeOpIter::CowRevMergeOpIter(std::shared_ptr<std::vector<CowOperation>>
     block_riter_ = merge_op_blocks->rbegin();
 }

+bool CowRevMergeOpIter::RDone() {
+    return block_riter_ == merge_op_blocks_->rbegin();
+}
+
+void CowRevMergeOpIter::Prev() {
+    CHECK(!RDone());
+    block_riter_--;
+}
+
 bool CowRevMergeOpIter::Done() {
     return block_riter_ == merge_op_blocks_->rend() - start_;
 }
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index d5b433531..8e6bbd9b6 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -92,7 +92,7 @@ class ICowOpIter {
   public:
     virtual ~ICowOpIter() {}

-    // True if there are more items to read, false otherwise.
+    // True if there are no more items to read forward, false otherwise.
     virtual bool Done() = 0;

     // Read the current operation.
@@ -100,6 +100,12 @@ class ICowOpIter {

     // Advance to the next item.
     virtual void Next() = 0;
+
+    // Advance to the previous item.
+    virtual void Prev() = 0;
+
+    // True if there are no more items to read backwards, false otherwise.
+    virtual bool RDone() = 0;
 };

 class CowReader final : public ICowReader {
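The Prev()/RDone() pair added above mirrors Next()/Done(), so a caller can rewind a partially consumed iterator and replay the same ops; this is what the merge and read-ahead paths later in this patch use to retry a failed async batch synchronously. A minimal sketch of the intended contract (the caller below is hypothetical, not part of this change):

    // Rewind `iter` by `n` ops so they can be replayed synchronously.
    // `n` counts ops consumed via Next() since the last known-good
    // commit point; RDone() guards against rewinding past the start.
    void RewindOps(ICowOpIter* iter, uint64_t n) {
        while (n && !iter->RDone()) {
            iter->Prev();
            n -= 1;
        }
    }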
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index b988c7bd4..692cb740c 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -492,22 +492,17 @@ void SnapshotHandler::ReadBlocks(const std::string partition_name,
         return;
     }

-    if (IsIouringSupported()) {
-        std::async(std::launch::async, &SnapshotHandler::ReadBlocksAsync, this, dm_block_device,
-                   partition_name, dev_sz);
-    } else {
-        int num_threads = 2;
-        size_t num_blocks = dev_sz >> BLOCK_SHIFT;
-        size_t num_blocks_per_thread = num_blocks / num_threads;
-        size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
-        off_t offset = 0;
+    int num_threads = 2;
+    size_t num_blocks = dev_sz >> BLOCK_SHIFT;
+    size_t num_blocks_per_thread = num_blocks / num_threads;
+    size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
+    off_t offset = 0;

-        for (int i = 0; i < num_threads; i++) {
-            std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this,
-                       dm_block_device, partition_name, offset, read_sz_per_thread);
+    for (int i = 0; i < num_threads; i++) {
+        std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device,
+                   partition_name, offset, read_sz_per_thread);

-            offset += read_sz_per_thread;
-        }
+        offset += read_sz_per_thread;
     }
 }

@@ -694,10 +689,8 @@ bool SnapshotHandler::IsIouringSupported() {
     // During selinux init transition, libsnapshot will propagate the
     // status of io_uring enablement. As properties are not initialized,
     // we cannot query system property.
-    //
-    // TODO: b/219642530: Intermittent I/O failures observed
     if (is_io_uring_enabled_) {
-        return false;
+        return true;
     }

     // Finally check the system property
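ReadBlocks() now always splits the device between two threads that pre-populate the page cache, instead of special-casing io_uring. A self-contained sketch of the same pattern follows; kBlockShift and ReadRange() are stand-ins for the patch's BLOCK_SHIFT and ReadBlocksToCache. One subtlety worth noting: the std::future returned by std::async blocks in its destructor, so keeping the futures alive, as below, is what actually lets the two reads overlap; a discarded temporary future joins immediately and serializes the loop.

    #include <future>
    #include <unistd.h>
    #include <vector>

    constexpr int kBlockShift = 12;  // assumed 4 KiB blocks

    // Read a byte range sequentially so the kernel caches it.
    static void ReadRange(int fd, off_t offset, off_t size) {
        std::vector<char> buf(1 << kBlockShift);
        for (off_t done = 0; done < size; done += buf.size()) {
            if (pread(fd, buf.data(), buf.size(), offset + done) <= 0) {
                break;
            }
        }
    }

    void PopulateCache(int fd, off_t dev_sz) {
        constexpr int kNumThreads = 2;
        const off_t read_sz_per_thread =
                ((dev_sz >> kBlockShift) / kNumThreads) << kBlockShift;

        std::vector<std::future<void>> tasks;
        off_t offset = 0;
        for (int i = 0; i < kNumThreads; i++) {
            tasks.emplace_back(std::async(std::launch::async, ReadRange, fd, offset,
                                          read_sz_per_thread));
            offset += read_sz_per_thread;
        }
        for (auto& t : tasks) {
            t.wait();  // futures would also join in their destructors
        }
    }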
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index cc82985bd..83d40f635 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -99,6 +99,7 @@ class ReadAhead {
     void InitializeRAIter();
     bool RAIterDone();
     void RAIterNext();
+    void RAResetIter(uint64_t num_blocks);
     const CowOperation* GetRAOpIter();

     void InitializeBuffer();
@@ -151,12 +152,16 @@ class ReadAhead {
     std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_;
     BufferSink bufsink_;

+    uint64_t total_ra_blocks_completed_ = 0;
     bool read_ahead_async_ = false;
-    // Queue depth of 32 seems optimal. We don't want
+    // Queue depth of 8 seems optimal. We don't want
     // to have a huge depth as it may put more memory pressure
     // on the kernel worker threads given that we use
-    // IOSQE_ASYNC flag.
-    int queue_depth_ = 32;
+    // the IOSQE_ASYNC flag. IOSQE_ASYNC can potentially
+    // result in EINTR; since we don't restart syscalls and
+    // instead fall back to synchronous I/O, we don't want a
+    // huge queue depth.
+    int queue_depth_ = 8;
     std::unique_ptr<struct io_uring> ring_;
 };

@@ -210,11 +215,12 @@ class Worker {

     // Merge related ops
     bool Merge();
-    bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
-    bool MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter);
-    bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
+    bool AsyncMerge();
+    bool SyncMerge();
+    bool MergeOrderedOps();
+    bool MergeOrderedOpsAsync();
+    bool MergeReplaceZeroOps();
     int PrepareMerge(uint64_t* source_offset, int* pending_ops,
-                     const std::unique_ptr<ICowOpIter>& cowop_iter,
                      std::vector<const CowOperation*>* replace_zero_vec = nullptr);

     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
@@ -238,12 +244,18 @@ class Worker {
     unique_fd base_path_merge_fd_;
     unique_fd ctrl_fd_;

+    std::unique_ptr<ICowOpIter> cowop_iter_;
+    size_t ra_block_index_ = 0;
+    uint64_t blocks_merged_in_group_ = 0;
     bool merge_async_ = false;
-    // Queue depth of 32 seems optimal. We don't want
+    // Queue depth of 8 seems optimal. We don't want
     // to have a huge depth as it may put more memory pressure
     // on the kernel worker threads given that we use
-    // IOSQE_ASYNC flag.
-    int queue_depth_ = 32;
+    // the IOSQE_ASYNC flag. IOSQE_ASYNC can potentially
+    // result in EINTR; since we don't restart syscalls and
+    // instead fall back to synchronous I/O, we don't want a
+    // huge queue depth.
+    int queue_depth_ = 8;
     std::unique_ptr<struct io_uring> ring_;

     std::shared_ptr<SnapshotHandler> snapuserd_;
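For context on the queue_depth_ change: a hedged sketch of the ring setup these members feed into, assuming liburing (io_uring_queue_init is the call the .cpp files use; the helper name here is hypothetical). A depth of 8 bounds how many IOSQE_ASYNC requests can be in flight at once, and a failed init is reported to the caller so it can fall back to synchronous I/O:

    #include <liburing.h>
    #include <memory>

    // Initialize a small ring; returns false so the caller can fall
    // back to synchronous I/O (e.g. on 4.x kernels without io_uring).
    bool InitRing(std::unique_ptr<struct io_uring>& ring, int queue_depth) {
        ring = std::make_unique<struct io_uring>();
        if (io_uring_queue_init(queue_depth, ring.get(), 0 /* no flags */) < 0) {
            ring = nullptr;
            return false;
        }
        return true;
    }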
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
index ffb982a64..0cb41d340 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -24,15 +24,14 @@ using namespace android::dm;
 using android::base::unique_fd;

 int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
-                         const std::unique_ptr<ICowOpIter>& cowop_iter,
                          std::vector<const CowOperation*>* replace_zero_vec) {
     int num_ops = *pending_ops;
     int nr_consecutive = 0;
     bool checkOrderedOp = (replace_zero_vec == nullptr);

     do {
-        if (!cowop_iter->Done() && num_ops) {
-            const CowOperation* cow_op = &cowop_iter->Get();
+        if (!cowop_iter_->Done() && num_ops) {
+            const CowOperation* cow_op = &cowop_iter_->Get();
             if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
                 break;
             }
@@ -42,12 +41,12 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                 replace_zero_vec->push_back(cow_op);
             }

-            cowop_iter->Next();
+            cowop_iter_->Next();
             num_ops -= 1;
             nr_consecutive = 1;

-            while (!cowop_iter->Done() && num_ops) {
-                const CowOperation* op = &cowop_iter->Get();
+            while (!cowop_iter_->Done() && num_ops) {
+                const CowOperation* op = &cowop_iter_->Get();
                 if (checkOrderedOp && !IsOrderedOp(*op)) {
                     break;
                 }
@@ -63,7 +62,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,

                 nr_consecutive += 1;
                 num_ops -= 1;
-                cowop_iter->Next();
+                cowop_iter_->Next();
             }
         }
     } while (0);
@@ -71,7 +70,7 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
     return nr_consecutive;
 }

-bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeReplaceZeroOps() {
     // Flush every 8192 ops. Since all ops are independent and there is no
     // dependency between COW ops, we will flush the data and the number
     // of ops merged in COW file for every 8192 ops. If there is a crash,
@@ -84,15 +83,17 @@ bool Worker::MergeReplaceZeroOps() {
     int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32;
     int num_ops_merged = 0;

-    while (!cowop_iter->Done()) {
+    SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";
+
+    while (!cowop_iter_->Done()) {
         int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
         std::vector<const CowOperation*> replace_zero_vec;
         uint64_t source_offset;

-        int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter, &replace_zero_vec);
+        int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
         if (linear_blocks == 0) {
             // Merge complete
-            CHECK(cowop_iter->Done());
+            CHECK(cowop_iter_->Done());
             break;
         }
@@ -117,8 +118,8 @@ bool Worker::MergeReplaceZeroOps() {
         size_t io_size = linear_blocks * BLOCK_SZ;

         // Merge - Write the contents back to base device
-        int ret = pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(), io_size,
-                         source_offset);
+        int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
+                                            io_size, source_offset));
         if (ret < 0 || ret != io_size) {
             SNAP_LOG(ERROR)
                     << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
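The pwrite() calls above are now wrapped in TEMP_FAILURE_RETRY so that an EINTR from a signal no longer aborts the merge. The macro (from <unistd.h> on bionic and glibc) simply retries while the call fails with errno == EINTR, roughly equivalent to this sketch:

    #include <errno.h>
    #include <unistd.h>

    // Equivalent of TEMP_FAILURE_RETRY(pwrite(...)): retry only when
    // interrupted by a signal. Short writes are not retried here; the
    // merge loop still checks ret != io_size separately.
    ssize_t PwriteRetryEintr(int fd, const void* buf, size_t count, off_t offset) {
        ssize_t rc;
        do {
            rc = pwrite(fd, buf, count, offset);
        } while (rc == -1 && errno == EINTR);
        return rc;
    }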
@@ -172,16 +173,15 @@ bool Worker::MergeReplaceZeroOps() {
     return true;
 }

-bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeOrderedOpsAsync() {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
-    size_t block_index = 0;

     SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";
-    while (!cowop_iter->Done()) {
-        const CowOperation* cow_op = &cowop_iter->Get();
+    while (!cowop_iter_->Done()) {
+        const CowOperation* cow_op = &cowop_iter_->Get();
         if (!IsOrderedOp(*cow_op)) {
             break;
         }
@@ -190,11 +190,10 @@ bool Worker::MergeOrderedOpsAsync() {
         // Wait for RA thread to notify that the merge window
         // is ready for merging.
         if (!snapuserd_->WaitForMergeBegin()) {
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }

-        snapuserd_->SetMergeInProgress(block_index);
+        snapuserd_->SetMergeInProgress(ra_block_index_);

         loff_t offset = 0;
         int num_ops = snapuserd_->GetTotalBlocksToMerge();
@@ -202,12 +201,13 @@ bool Worker::MergeOrderedOpsAsync() {
         int pending_sqe = queue_depth_;
         int pending_ios_to_submit = 0;
         bool flush_required = false;
+        blocks_merged_in_group_ = 0;

         SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
         while (num_ops) {
             uint64_t source_offset;

-            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops);

             if (linear_blocks != 0) {
                 size_t io_size = (linear_blocks * BLOCK_SZ);
@@ -216,7 +216,6 @@ bool Worker::MergeOrderedOpsAsync() {
                 struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                 if (!sqe) {
                     SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
-                    snapuserd_->SetMergeFailed(block_index);
                     return false;
                 }

@@ -225,10 +224,18 @@ bool Worker::MergeOrderedOpsAsync() {
                 offset += io_size;
                 num_ops -= linear_blocks;
+                blocks_merged_in_group_ += linear_blocks;

                 pending_sqe -= 1;
                 pending_ios_to_submit += 1;
-                sqe->flags |= IOSQE_ASYNC;
+                // These flags are important - we need to make sure that
+                // the blocks are linked and written in the same order as
+                // they were populated, since the block writes can overlap.
+                //
+                // If there were no dependencies, we could optimize this
+                // further by allowing parallel writes; but for now, just
+                // link all the SQ entries.
+                sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
             }

             // Ring is full or no more COW ops to be merged in this batch
@@ -256,7 +263,7 @@ bool Worker::MergeOrderedOpsAsync() {
                     pending_sqe -= 1;
                     flush_required = false;
                     pending_ios_to_submit += 1;
-                    sqe->flags |= IOSQE_ASYNC;
+                    sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
                 }
             } else {
                 flush_required = true;
@@ -269,35 +276,45 @@ bool Worker::MergeOrderedOpsAsync() {
                     SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: "
                                      << " io submit: " << ret << " expected: " << pending_ios_to_submit;
-                    snapuserd_->SetMergeFailed(block_index);
                     return false;
                 }

                 int pending_ios_to_complete = pending_ios_to_submit;
                 pending_ios_to_submit = 0;

+                bool status = true;
+
                 // Reap I/O completions
                 while (pending_ios_to_complete) {
                     struct io_uring_cqe* cqe;

+                    // We need to make sure to reap all the I/Os submitted,
+                    // even if any errors are observed.
+                    //
+                    // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
+                    // these error codes are not truly I/O errors; we could retry
+                    // them by re-populating the SQE entries and submitting the
+                    // I/O request again. However, we don't do that now; instead
+                    // we fall back to synchronous I/O.
                     ret = io_uring_wait_cqe(ring_.get(), &cqe);
                     if (ret) {
-                        SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
-                        snapuserd_->SetMergeFailed(block_index);
-                        return false;
+                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << ret;
+                        status = false;
                     }

                     if (cqe->res < 0) {
-                        SNAP_LOG(ERROR)
-                                << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
-                        snapuserd_->SetMergeFailed(block_index);
-                        return false;
+                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
+                        status = false;
                     }

                     io_uring_cqe_seen(ring_.get(), cqe);
                     pending_ios_to_complete -= 1;
                 }

+                if (!status) {
+                    return false;
+                }
+
                 pending_sqe = queue_depth_;
             }

@@ -312,7 +329,6 @@ bool Worker::MergeOrderedOpsAsync() {
         // Flush the data
         if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
             SNAP_LOG(ERROR) << " Failed to fsync merged data";
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
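On IOSQE_IO_LINK: linked SQEs are executed in submission order, and if one request in the chain fails, the remaining linked requests complete with -ECANCELED, so a later (possibly overlapping) write can never land ahead of an earlier one. A minimal liburing sketch of the flag combination used above (the function name and buffers are hypothetical):

    #include <liburing.h>

    // Queue two writes that must reach the disk in order because their
    // block ranges may overlap. Each non-final SQE carries IOSQE_IO_LINK.
    bool QueueOrderedWrites(struct io_uring* ring, int fd, const struct iovec* first,
                            off_t first_off, const struct iovec* second, off_t second_off) {
        struct io_uring_sqe* sqe = io_uring_get_sqe(ring);
        if (!sqe) return false;
        io_uring_prep_writev(sqe, fd, first, 1, first_off);
        sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);  // chain to the next SQE

        sqe = io_uring_get_sqe(ring);
        if (!sqe) return false;
        io_uring_prep_writev(sqe, fd, second, 1, second_off);
        sqe->flags |= IOSQE_ASYNC;  // last entry in the chain: no link flag

        return io_uring_submit(ring) == 2;
    }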
@@ -320,35 +336,34 @@ bool Worker::MergeOrderedOpsAsync() {
         // the merge completion
         if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
             SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }

         SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
+
         // Mark the block as merge complete
-        snapuserd_->SetMergeCompleted(block_index);
+        snapuserd_->SetMergeCompleted(ra_block_index_);

         // Notify RA thread that the merge thread is ready to merge the next
         // window
         snapuserd_->NotifyRAForMergeReady();

         // Get the next block
-        block_index += 1;
+        ra_block_index_ += 1;
     }
     return true;
 }

-bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeOrderedOps() {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
-    size_t block_index = 0;

     SNAP_LOG(INFO) << "MergeOrderedOps started....";
-    while (!cowop_iter->Done()) {
-        const CowOperation* cow_op = &cowop_iter->Get();
+    while (!cowop_iter_->Done()) {
+        const CowOperation* cow_op = &cowop_iter_->Get();
         if (!IsOrderedOp(*cow_op)) {
             break;
         }
@@ -357,11 +372,11 @@ bool Worker::MergeOrderedOps() {
         // Wait for RA thread to notify that the merge window
         // is ready for merging.
         if (!snapuserd_->WaitForMergeBegin()) {
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }

-        snapuserd_->SetMergeInProgress(block_index);
+        snapuserd_->SetMergeInProgress(ra_block_index_);

         loff_t offset = 0;
         int num_ops = snapuserd_->GetTotalBlocksToMerge();
@@ -369,7 +384,7 @@ bool Worker::MergeOrderedOps() {
         while (num_ops) {
             uint64_t source_offset;

-            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
             if (linear_blocks == 0) {
                 break;
             }
@@ -378,12 +393,13 @@ bool Worker::MergeOrderedOps() {
             // Write to the base device. Data is already in the RA buffer. Note
             // that XOR ops is already handled by the RA thread. We just write
             // the contents out.
-            int ret = pwrite(base_path_merge_fd_.get(), (char*)read_ahead_buffer + offset, io_size,
-                             source_offset);
+            int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
+                                                (char*)read_ahead_buffer + offset, io_size,
+                                                source_offset));
             if (ret < 0 || ret != io_size) {
                 SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                 << " at offset: " << source_offset << " io_size: " << io_size;
-                snapuserd_->SetMergeFailed(block_index);
+                snapuserd_->SetMergeFailed(ra_block_index_);
                 return false;
             }

@@ -397,7 +413,7 @@ bool Worker::MergeOrderedOps() {
         // Flush the data
         if (fsync(base_path_merge_fd_.get()) < 0) {
             SNAP_LOG(ERROR) << " Failed to fsync merged data";
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }

@@ -405,47 +421,87 @@ bool Worker::MergeOrderedOps() {
         // the merge completion
         if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
             SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }

         SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();

         // Mark the block as merge complete
-        snapuserd_->SetMergeCompleted(block_index);
+        snapuserd_->SetMergeCompleted(ra_block_index_);

         // Notify RA thread that the merge thread is ready to merge the next
         // window
         snapuserd_->NotifyRAForMergeReady();

         // Get the next block
-        block_index += 1;
+        ra_block_index_ += 1;
     }
     return true;
 }

-bool Worker::Merge() {
-    std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
+bool Worker::AsyncMerge() {
+    if (!MergeOrderedOpsAsync()) {
+        SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
+        // Reset the iterator so that we can retry the merge
+        while (blocks_merged_in_group_ && !cowop_iter_->RDone()) {
+            cowop_iter_->Prev();
+            blocks_merged_in_group_ -= 1;
+        }
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
+    return true;
+}
+
+bool Worker::SyncMerge() {
+    if (!MergeOrderedOps()) {
+        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "MergeOrderedOps completed";
+    return true;
+}
+
+bool Worker::Merge() {
+    cowop_iter_ = reader_->GetMergeOpIter();
+
+    bool retry = false;
+    bool ordered_ops_merge_status;
+
+    // Start the async merge
     if (merge_async_) {
-        if (!MergeOrderedOpsAsync(cowop_iter)) {
+        ordered_ops_merge_status = AsyncMerge();
+        if (!ordered_ops_merge_status) {
+            FinalizeIouring();
+            retry = true;
+            merge_async_ = false;
+        }
+    }
+
+    // Check if we need to fall back and retry the merge.
+    //
+    // If the device doesn't support async merge, we enter
+    // here directly (i.e., devices with 4.x kernels).
+    const bool sync_merge_required = (retry || !merge_async_);
+
+    if (sync_merge_required) {
+        ordered_ops_merge_status = SyncMerge();
+        if (!ordered_ops_merge_status) {
+            // Merge failed. The device will continue to be mounted
+            // off snapshots; the merge will be retried during
+            // the next reboot.
             SNAP_LOG(ERROR) << "Merge failed for ordered ops";
             snapuserd_->MergeFailed();
             return false;
         }
-        SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed.....";
-    } else {
-        // Start with Copy and Xor ops
-        if (!MergeOrderedOps(cowop_iter)) {
-            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
-            snapuserd_->MergeFailed();
-            return false;
-        }
-        SNAP_LOG(INFO) << "MergeOrderedOps completed.....";
     }

     // Replace and Zero ops
-    if (!MergeReplaceZeroOps(cowop_iter)) {
+    if (!MergeReplaceZeroOps()) {
         SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
         snapuserd_->MergeFailed();
         return false;
@@ -461,14 +517,6 @@ bool Worker::InitializeIouring() {
         return false;
     }

-    {
-        // TODO: b/219642530 - Disable io_uring for merge
-        // until we figure out the cause of intermittent
-        // IO failures.
-        merge_async_ = false;
-        return true;
-    }
-
     ring_ = std::make_unique<struct io_uring>();

     int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
@@ -514,7 +562,7 @@ bool Worker::RunMergeThread() {
     CloseFds();
     reader_->CloseCowFd();

-    SNAP_LOG(INFO) << "Merge finish";
+    SNAP_LOG(INFO) << "Snapshot-Merge completed";
     return true;
 }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index 26c5f199a..7d9d39215 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -279,7 +279,6 @@ bool ReadAhead::ReadAheadAsyncIO() {
         sqe = io_uring_get_sqe(ring_.get());
         if (!sqe) {
             SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
-            snapuserd_->ReadAheadIOFailed();
             return false;
         }

@@ -309,7 +308,6 @@ bool ReadAhead::ReadAheadAsyncIO() {
     if (ret != pending_ios_to_submit) {
         SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: "
                          << " io submit: " << ret << " expected: " << pending_ios_to_submit;
-        snapuserd_->ReadAheadIOFailed();
         return false;
     }

@@ -321,14 +319,12 @@ bool ReadAhead::ReadAheadAsyncIO() {
     // Read XOR data from COW file in parallel when I/O's are in-flight
     if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
         SNAP_LOG(ERROR) << "ReadXorData failed";
-        snapuserd_->ReadAheadIOFailed();
         return false;
     }

     // Fetch I/O completions
     if (!ReapIoCompletions(pending_ios_to_complete)) {
         SNAP_LOG(ERROR) << "ReapIoCompletions failed";
-        snapuserd_->ReadAheadIOFailed();
         return false;
     }

@@ -393,26 +389,36 @@ void ReadAhead::UpdateScratchMetadata() {
 }

 bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
+    bool status = true;
+
     // Reap I/O completions
     while (pending_ios_to_complete) {
         struct io_uring_cqe* cqe;

+        // We need to make sure to reap all the I/Os submitted,
+        // even if any errors are observed.
+        //
+        // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
+        // these error codes are not truly I/O errors; we could retry
+        // them by re-populating the SQE entries and submitting the
+        // I/O request again. However, we don't do that now; instead
+        // we fall back to synchronous I/O.
         int ret = io_uring_wait_cqe(ring_.get(), &cqe);
         if (ret) {
             SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
-            return false;
+            status = false;
         }

         if (cqe->res < 0) {
             SNAP_LOG(ERROR) << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
-            return false;
+            status = false;
         }

         io_uring_cqe_seen(ring_.get(), cqe);
         pending_ios_to_complete -= 1;
     }

-    return true;
+    return status;
 }

 void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
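The key change in ReapIoCompletions() is that a failure no longer short-circuits the loop: every submitted I/O must still be reaped, otherwise stale completions would be left in the ring when the caller tears it down and falls back to synchronous I/O. A condensed, hedged sketch of the pattern (not the exact function):

    #include <liburing.h>

    // Drain `pending` completions, remembering rather than returning on
    // the first failure so every CQE is consumed before falling back.
    bool DrainCompletions(struct io_uring* ring, int pending) {
        bool ok = true;
        while (pending) {
            struct io_uring_cqe* cqe = nullptr;
            if (io_uring_wait_cqe(ring, &cqe)) {
                ok = false;  // e.g. -EINTR/-EAGAIN: not a device error
            } else {
                if (cqe->res < 0) ok = false;  // per-I/O failure
                io_uring_cqe_seen(ring, cqe);  // always consume the CQE
            }
            pending -= 1;
        }
        return ok;
    }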
@@ -610,18 +616,38 @@ bool ReadAhead::ReadAheadIOStart() {
         return ReconstructDataFromCow();
     }

+    bool retry = false;
+    bool ra_status;
+
+    // Start the async read-ahead
     if (read_ahead_async_) {
-        if (!ReadAheadAsyncIO()) {
-            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - io_uring processing failure.";
-            return false;
+        ra_status = ReadAheadAsyncIO();
+        if (!ra_status) {
+            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - Falling back to synchronous I/O";
+            FinalizeIouring();
+            RAResetIter(total_blocks_merged_);
+            retry = true;
+            read_ahead_async_ = false;
         }
-    } else {
-        if (!ReadAheadSyncIO()) {
+    }
+
+    // Check if we need to fall back and retry the read-ahead.
+    //
+    // If the device doesn't support async operations, we enter
+    // here directly (i.e., devices with 4.x kernels).
+    const bool ra_sync_required = (retry || !read_ahead_async_);
+
+    if (ra_sync_required) {
+        ra_status = ReadAheadSyncIO();
+        if (!ra_status) {
             SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
             return false;
         }
     }

+    SNAP_LOG(DEBUG) << "Read-ahead: total_ra_blocks_merged: " << total_ra_blocks_completed_;
+
     // Wait for the merge to finish for the previous RA window. We shouldn't
     // be touching the scratch space until merge is complete of previous RA
     // window. If there is a crash during this time frame, merge should resume
@@ -646,6 +672,7 @@ bool ReadAhead::ReadAheadIOStart() {
         offset += BLOCK_SZ;
     }

+    total_ra_blocks_completed_ += total_blocks_merged_;
     snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);

     // Flush the data only if we have a overlapping blocks in the region
@@ -763,6 +790,13 @@ void ReadAhead::RAIterNext() {
     cowop_iter_->Next();
 }

+void ReadAhead::RAResetIter(uint64_t num_blocks) {
+    while (num_blocks && !cowop_iter_->RDone()) {
+        cowop_iter_->Prev();
+        num_blocks -= 1;
+    }
+}
+
 const CowOperation* ReadAhead::GetRAOpIter() {
     const CowOperation* cow_op = &cowop_iter_->Get();
     return cow_op;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
index 6dec1e2e0..d4e1d7c7e 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
@@ -531,6 +531,13 @@ void SnapshotHandler::SetMergeInProgress(size_t ra_index) {
     {
         std::unique_lock<std::mutex> lock(blk_state->m_lock);

+        // We may have fallen back from the async merge to a synchronous
+        // merge on this block. There is no need to reset the state, as
+        // the merge is already in progress.
+        if (blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS) {
+            return;
+        }
+
         CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);

         // First set the state to RA_READY so that in-flight I/O will drain
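With the fallback path in place, SetMergeInProgress() can legitimately be called twice for the same block group: once by the failed async attempt and once by the synchronous retry. The early return above makes the transition idempotent instead of tripping the GROUP_MERGE_PENDING CHECK. Reduced to its essentials (the types and names below are hypothetical stand-ins):

    #include <mutex>

    enum class MergeGroupState { GROUP_MERGE_PENDING, GROUP_MERGE_IN_PROGRESS };

    struct BlockGroupState {
        std::mutex m;
        MergeGroupState state = MergeGroupState::GROUP_MERGE_PENDING;
    };

    // Idempotent transition: a sync-merge retry of a group the async
    // path already claimed is a no-op rather than a CHECK failure.
    void MarkMergeInProgress(BlockGroupState* blk) {
        std::lock_guard<std::mutex> lock(blk->m);
        if (blk->state == MergeGroupState::GROUP_MERGE_IN_PROGRESS) {
            return;  // fallback re-entry; merge already owns this group
        }
        blk->state = MergeGroupState::GROUP_MERGE_IN_PROGRESS;
    }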