Merge changes I25fb5fce,I86cffff6,I031eb1a1,Ie40633c0,I08562b89

* changes:
  snapuserd: Sort REPLACE ops for batch merge
  snapuserd: API to query snapshot and merge status
  snapuserd: Wire up API's for Initiating and tracking Merge
  snapuserd: I/O requests which are not block aligned.
  snapuserd: Service I/O requests from dm-user
Akilesh Kailash 2021-11-03 01:45:03 +00:00 committed by Gerrit Code Review
commit 1765610fa4
18 changed files with 1822 additions and 23 deletions

View file

@ -34,11 +34,12 @@
namespace android {
namespace snapshot {
CowReader::CowReader()
CowReader::CowReader(ReaderFlags reader_flag)
: fd_(-1),
header_(),
fd_size_(0),
merge_op_blocks_(std::make_shared<std::vector<uint32_t>>()) {}
merge_op_blocks_(std::make_shared<std::vector<uint32_t>>()),
reader_flag_(reader_flag) {}
static void SHA256(const void*, size_t, uint8_t[]) {
#if 0
@ -415,7 +416,7 @@ bool CowReader::ParseOps(std::optional<uint64_t> label) {
//==============================================================
bool CowReader::PrepMergeOps() {
auto merge_op_blocks = std::make_shared<std::vector<uint32_t>>();
std::set<int, std::greater<int>> other_ops;
std::vector<int> other_ops;
auto seq_ops_set = std::unordered_set<uint32_t>();
auto block_map = std::make_shared<std::unordered_map<uint32_t, int>>();
size_t num_seqs = 0;
@ -446,7 +447,7 @@ bool CowReader::PrepMergeOps() {
if (!has_seq_ops_ && IsOrderedOp(current_op)) {
merge_op_blocks->emplace_back(current_op.new_block);
} else if (seq_ops_set.count(current_op.new_block) == 0) {
other_ops.insert(current_op.new_block);
other_ops.push_back(current_op.new_block);
}
block_map->insert({current_op.new_block, i});
}
@ -462,6 +463,18 @@ bool CowReader::PrepMergeOps() {
} else {
num_ordered_ops_to_merge_ = 0;
}
// Sort the vector in increasing order if merging in user-space, as
// we can then batch-merge consecutive blocks while iterating forward.
//
// dm-snapshot-merge requires decreasing order, as we iterate the blocks
// in reverse order.
if (reader_flag_ == ReaderFlags::USERSPACE_MERGE) {
std::sort(other_ops.begin(), other_ops.end());
} else {
std::sort(other_ops.begin(), other_ops.end(), std::greater<int>());
}
merge_op_blocks->reserve(merge_op_blocks->size() + other_ops.size());
for (auto block : other_ops) {
merge_op_blocks->emplace_back(block);
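A minimal sketch (not part of this change) of why increasing order helps the user-space merge: consecutive new_block values in the sorted vector can be coalesced and merged with a single larger write. The helper below is purely illustrative.

#include <cstdint>
#include <vector>

// Illustrative helper: how many ops starting at `start` cover consecutive
// blocks and could therefore be batch-merged in one write.
size_t ConsecutiveRun(const std::vector<uint32_t>& sorted_blocks, size_t start) {
    size_t run = 1;
    while (start + run < sorted_blocks.size() &&
           sorted_blocks[start + run] == sorted_blocks[start] + run) {
        run++;
    }
    return run;  // e.g. {10, 11, 12, 40} at start == 0 yields 3
}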

View file

@ -26,8 +26,8 @@ static constexpr uint32_t kCowVersionMinor = 0;
static constexpr uint32_t kCowVersionManifest = 2;
static constexpr uint32_t BLOCK_SZ = 4096;
static constexpr uint32_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);
static constexpr size_t BLOCK_SZ = 4096;
static constexpr size_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);
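For reference: __builtin_ffs(4096) returns 13 (position of the lowest set bit, 1-indexed), so BLOCK_SHIFT evaluates to 12 and (1 << BLOCK_SHIFT) == BLOCK_SZ.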
// This header appears as the first sequence of bytes in the COW. All fields
// in the layout are little-endian encoded. The on-disk layout is:

View file

@ -104,7 +104,12 @@ class ICowOpIter {
class CowReader final : public ICowReader {
public:
CowReader();
enum class ReaderFlags {
DEFAULT = 0,
USERSPACE_MERGE = 1,
};
CowReader(ReaderFlags reader_flag = ReaderFlags::DEFAULT);
~CowReader() { owned_fd_ = {}; }
// Parse the COW, optionally, up to the given label. If no label is
@ -166,6 +171,7 @@ class CowReader final : public ICowReader {
uint64_t num_ordered_ops_to_merge_;
bool has_seq_ops_;
std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc_;
ReaderFlags reader_flag_;
};
} // namespace snapshot

View file

@ -56,7 +56,7 @@ cc_defaults {
"fs_mgr_defaults",
],
srcs: [
"snapuserd_server.cpp",
"dm-snapshot-merge/snapuserd_server.cpp",
"dm-snapshot-merge/snapuserd.cpp",
"dm-snapshot-merge/snapuserd_worker.cpp",
"dm-snapshot-merge/snapuserd_readahead.cpp",
@ -67,6 +67,7 @@ cc_defaults {
"user-space-merge/snapuserd_merge.cpp",
"user-space-merge/snapuserd_readahead.cpp",
"user-space-merge/snapuserd_transitions.cpp",
"user-space-merge/snapuserd_server.cpp",
],
cflags: [

View file

@ -31,6 +31,7 @@
#include <android-base/scopeguard.h>
#include <fs_mgr/file_wait.h>
#include <snapuserd/snapuserd_client.h>
#include "snapuserd_server.h"
#define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_

View file

@ -28,7 +28,7 @@
#include <vector>
#include <android-base/unique_fd.h>
#include "dm-snapshot-merge/snapuserd.h"
#include "snapuserd.h"
namespace android {
namespace snapshot {

View file

@ -79,6 +79,15 @@ class SnapuserdClient {
// Returns true if the snapuserd instance supports bridging a socket to second-stage init.
bool SupportsSecondStageSocketHandoff();
// Returns true if the merge is started (or resumed after a crash).
bool InitiateMerge(const std::string& misc_name);
// Returns the merge completion percentage.
double GetMergePercent();
// Returns the status of the snapshot.
std::string QuerySnapshotStatus(const std::string& misc_name);
};
} // namespace snapshot
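A minimal usage sketch of the new client calls, assuming an already-connected SnapuserdClient; the misc_name value and polling interval are illustrative assumptions, not part of this change.

#include <chrono>
#include <string>
#include <thread>

#include <snapuserd/snapuserd_client.h>

using android::snapshot::SnapuserdClient;

// Hypothetical helper: drive one snapshot's merge to completion.
bool DriveMergeSketch(SnapuserdClient& client, const std::string& misc_name) {
    if (!client.InitiateMerge(misc_name)) {
        return false;
    }
    while (client.QuerySnapshotStatus(misc_name) == "snapshot-merge") {
        double percent = client.GetMergePercent();
        (void)percent;  // e.g. report progress to the caller
        std::this_thread::sleep_for(std::chrono::seconds(2));
    }
    return client.QuerySnapshotStatus(misc_name) == "snapshot-merge-complete";
}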

View file

@ -231,5 +231,35 @@ bool SnapuserdClient::DetachSnapuserd() {
return true;
}
bool SnapuserdClient::InitiateMerge(const std::string& misc_name) {
std::string msg = "initiate_merge," + misc_name;
if (!Sendmsg(msg)) {
LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
return false;
}
std::string response = Receivemsg();
return response == "success";
}
double SnapuserdClient::GetMergePercent() {
std::string msg = "merge_percent";
if (!Sendmsg(msg)) {
LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
return 0.0;
}
std::string response = Receivemsg();
return std::stod(response);
}
std::string SnapuserdClient::QuerySnapshotStatus(const std::string& misc_name) {
std::string msg = "getstatus," + misc_name;
if (!Sendmsg(msg)) {
LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
return "snapshot-merge-failed";
}
return Receivemsg();
}
} // namespace snapshot
} // namespace android

View file

@ -21,8 +21,6 @@
#include <gflags/gflags.h>
#include <snapuserd/snapuserd_client.h>
#include "snapuserd_server.h"
DEFINE_string(socket, android::snapshot::kSnapuserdSocket, "Named socket or socket path.");
DEFINE_bool(no_socket, false,
"If true, no socket is used. Each additional argument is an INIT message.");

View file

@ -19,7 +19,7 @@
#include <string>
#include <vector>
#include "snapuserd_server.h"
#include "dm-snapshot-merge/snapuserd_server.h"
namespace android {
namespace snapshot {

View file

@ -59,6 +59,15 @@ std::unique_ptr<CowReader> SnapshotHandler::CloneReaderForWorker() {
return reader_->CloneCowReader();
}
void SnapshotHandler::UpdateMergeCompletionPercentage() {
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops();
SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_
<< " num_merge_ops: " << ch->num_merge_ops
<< " total-ops: " << reader_->get_num_total_data_ops();
}
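As a worked example (numbers illustrative): if ch->num_merge_ops is 2500 and get_num_total_data_ops() returns 10000, merge_completion_percentage_ becomes (2500 * 100.0) / 10000 = 25.0.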
bool SnapshotHandler::CommitMerge(int num_merge_ops) {
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
ch->num_merge_ops += num_merge_ops;
@ -95,6 +104,12 @@ bool SnapshotHandler::CommitMerge(int num_merge_ops) {
}
}
// Update the merge completion percentage - this is used by update_engine
// to track the progress. No need to take a lock; it is fine even if
// a read misses the latest updated value. Subsequent polling will
// eventually converge to completion.
UpdateMergeCompletionPercentage();
return true;
}
@ -124,7 +139,7 @@ void SnapshotHandler::CheckMergeCompletionStatus() {
}
bool SnapshotHandler::ReadMetadata() {
reader_ = std::make_unique<CowReader>();
reader_ = std::make_unique<CowReader>(CowReader::ReaderFlags::USERSPACE_MERGE);
CowHeader header;
CowOptions options;
@ -152,16 +167,48 @@ bool SnapshotHandler::ReadMetadata() {
return false;
}
UpdateMergeCompletionPercentage();
// Initialize the iterator for reading metadata
std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
int ra_index = 0;
size_t copy_ops = 0, replace_ops = 0, zero_ops = 0, xor_ops = 0;
while (!cowop_iter->Done()) {
const CowOperation* cow_op = &cowop_iter->Get();
if (cow_op->type == kCowCopyOp) {
copy_ops += 1;
} else if (cow_op->type == kCowReplaceOp) {
replace_ops += 1;
} else if (cow_op->type == kCowZeroOp) {
zero_ops += 1;
} else if (cow_op->type == kCowXorOp) {
xor_ops += 1;
}
chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op));
if (!ra_thread_ && IsOrderedOp(*cow_op)) {
if (IsOrderedOp(*cow_op)) {
ra_thread_ = true;
block_to_ra_index_[cow_op->new_block] = ra_index;
num_ra_ops_per_iter -= 1;
if ((ra_index + 1) - merge_blk_state_.size() == 1) {
std::unique_ptr<MergeGroupState> blk_state = std::make_unique<MergeGroupState>(
MERGE_GROUP_STATE::GROUP_MERGE_PENDING, 0);
merge_blk_state_.push_back(std::move(blk_state));
}
// Move to next RA block
if (num_ra_ops_per_iter == 0) {
num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
ra_index += 1;
}
}
cowop_iter->Next();
}
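As a worked example of the grouping above (sizes illustrative): if GetBufferDataSize() is 2 MiB, each RA group holds 2 MiB / 4 KiB = 512 ordered ops; ordered ops 1-512 map to group index 0 and ops 513-1024 map to group index 1, so block_to_ra_index_ records group 1 for the 600th ordered op, and merge_blk_state_ gains one GROUP_MERGE_PENDING entry per group.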
@ -173,6 +220,12 @@ bool SnapshotHandler::ReadMetadata() {
PrepareReadAhead();
SNAP_LOG(INFO) << "Merged-ops: " << header.num_merge_ops
<< " Total-data-ops: " << reader_->get_num_total_data_ops()
<< " Unmerged-ops: " << chunk_vec_.size() << " Copy-ops: " << copy_ops
<< " Zero-ops: " << zero_ops << " Replace-ops: " << replace_ops
<< " Xor-ops: " << xor_ops;
return true;
}

View file

@ -67,6 +67,27 @@ enum class MERGE_IO_TRANSITION {
class SnapshotHandler;
enum class MERGE_GROUP_STATE {
GROUP_MERGE_PENDING,
GROUP_MERGE_RA_READY,
GROUP_MERGE_IN_PROGRESS,
GROUP_MERGE_COMPLETED,
GROUP_MERGE_FAILED,
GROUP_INVALID,
};
struct MergeGroupState {
MERGE_GROUP_STATE merge_state_;
// Ref count of in-flight I/O while the group state
// is in "GROUP_MERGE_PENDING"
size_t num_ios_in_progress;
std::mutex m_lock;
std::condition_variable m_cv;
MergeGroupState(MERGE_GROUP_STATE state, size_t n_ios)
: merge_state_(state), num_ios_in_progress(n_ios) {}
};
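A minimal sketch, not the actual implementation, of how the ref count and condition variable in MergeGroupState are intended to interact on the I/O path (the real logic lives in SnapshotHandler::NotifyIOCompletion()/ProcessMergingBlock(), whose bodies are not shown in this hunk):

// Illustrative only: an I/O that took a ref while the group was
// GROUP_MERGE_PENDING drops it on completion and wakes the merge thread.
void CompleteOneIoSketch(MergeGroupState* blk_state) {
    std::lock_guard<std::mutex> lock(blk_state->m_lock);
    blk_state->num_ios_in_progress -= 1;
    if (blk_state->num_ios_in_progress == 0) {
        blk_state->m_cv.notify_all();
    }
}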
class ReadAhead {
public:
ReadAhead(const std::string& cow_device, const std::string& backing_device,
@ -133,16 +154,33 @@ class Worker {
base_path_merge_fd_ = {};
}
// Functions interacting with dm-user
bool ReadDmUserHeader();
bool WriteDmUserPayload(size_t size, bool header_response);
bool DmuserReadRequest();
// IO Path
bool ProcessIORequest();
bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
bool ReadFromSourceDevice(const CowOperation* cow_op);
bool ReadAlignedSector(sector_t sector, size_t sz, bool header_response);
bool ReadUnalignedSector(sector_t sector, size_t size);
int ReadUnalignedSector(sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
bool RespondIOError(bool header_response);
// Processing COW operations
bool ProcessCowOp(const CowOperation* cow_op);
bool ProcessReplaceOp(const CowOperation* cow_op);
bool ProcessZeroOp();
// Handles Copy and Xor
bool ProcessCopyOp(const CowOperation* cow_op);
bool ProcessXorOp(const CowOperation* cow_op);
bool ProcessOrderedOp(const CowOperation* cow_op);
// Merge related ops
bool Merge();
@ -152,6 +190,9 @@ class Worker {
const std::unique_ptr<ICowOpIter>& cowop_iter,
std::vector<const CowOperation*>* replace_zero_vec = nullptr);
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
std::unique_ptr<CowReader> reader_;
BufferSink bufsink_;
XorSink xorsink_;
@ -210,6 +251,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
// Read-ahead related functions
void* GetMappedAddr() { return mapped_addr_; }
void PrepareReadAhead();
std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }
// State transitions for merge
void InitiateMerge();
@ -226,6 +268,8 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; }
void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; }
// Return the snapshot status
std::string GetMergeStatus();
// RA related functions
uint64_t GetBufferMetadataOffset();
@ -238,12 +282,23 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; }
void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
bool MergeInitiated() { return merge_initiated_; }
double GetMergePercentage() { return merge_completion_percentage_; }
// Merge Block State Transitions
void SetMergeCompleted(size_t block_index);
void SetMergeInProgress(size_t block_index);
void SetMergeFailed(size_t block_index);
void NotifyIOCompletion(uint64_t new_block);
bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);
private:
bool ReadMetadata();
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
struct BufferState* GetBufferState();
void UpdateMergeCompletionPercentage();
void ReadBlocks(const std::string partition_name, const std::string& dm_block_device);
void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name,
@ -261,7 +316,6 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
unique_fd cow_fd_;
// Number of sectors required when initializing dm-user
uint64_t num_sectors_;
std::unique_ptr<CowReader> reader_;
@ -283,8 +337,16 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
int total_ra_blocks_merged_ = 0;
MERGE_IO_TRANSITION io_state_;
std::unique_ptr<ReadAhead> read_ahead_thread_;
std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;
// user-space-merging
std::unordered_map<uint64_t, int> block_to_ra_index_;
// Merge Block state
std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
std::unique_ptr<Worker> merge_thread_;
double merge_completion_percentage_;
bool merge_initiated_ = false;
bool attached_ = false;

View file

@ -84,6 +84,56 @@ bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
return true;
}
bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
return false;
}
SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
<< " Source: " << cow_op->source;
uint64_t offset = cow_op->source;
if (cow_op->type == kCowCopyOp) {
offset *= BLOCK_SZ;
}
if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) {
std::string op;
if (cow_op->type == kCowCopyOp)
op = "Copy-op";
else {
op = "Xor-op";
}
SNAP_PLOG(ERROR) << op << " failed. Read from backing store: " << backing_store_device_
<< "at block :" << offset / BLOCK_SZ << " offset:" << offset % BLOCK_SZ;
return false;
}
return true;
}
// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
if (!ReadFromSourceDevice(cow_op)) {
return false;
}
return true;
}
bool Worker::ProcessXorOp(const CowOperation* cow_op) {
if (!ReadFromSourceDevice(cow_op)) {
return false;
}
xorsink_.Reset();
if (!reader_->ReadData(*cow_op, &xorsink_)) {
SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block;
return false;
}
return true;
}
bool Worker::ProcessZeroOp() {
// Zero out the entire block
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
@ -96,12 +146,85 @@ bool Worker::ProcessZeroOp() {
return true;
}
bool Worker::ProcessCopyOp(const CowOperation*) {
return true;
bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
return false;
}
MERGE_GROUP_STATE state = snapuserd_->ProcessMergingBlock(cow_op->new_block, buffer);
switch (state) {
case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: {
// Merge is completed for this COW op; just read directly from
// the base device
SNAP_LOG(DEBUG) << "Merge-completed: Reading from base device sector: "
<< (cow_op->new_block >> SECTOR_SHIFT)
<< " Block-number: " << cow_op->new_block;
if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), BLOCK_SZ)) {
SNAP_LOG(ERROR) << "ReadDataFromBaseDevice at sector: "
<< (cow_op->new_block >> SECTOR_SHIFT) << " after merge-complete.";
return false;
}
return true;
}
case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
bool ret;
if (cow_op->type == kCowCopyOp) {
ret = ProcessCopyOp(cow_op);
} else {
ret = ProcessXorOp(cow_op);
}
// I/O is complete - decrement the refcount irrespective of the return
// status
snapuserd_->NotifyIOCompletion(cow_op->new_block);
return ret;
}
// We already have the data in the buffer retrieved from RA thread.
// Nothing to process further.
case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: {
[[fallthrough]];
}
case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: {
return true;
}
default: {
// All other states (viz. GROUP_MERGE_FAILED and GROUP_INVALID): fail the I/O
return false;
}
}
return false;
}
bool Worker::ProcessXorOp(const CowOperation*) {
return true;
bool Worker::ProcessCowOp(const CowOperation* cow_op) {
if (cow_op == nullptr) {
SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
return false;
}
switch (cow_op->type) {
case kCowReplaceOp: {
return ProcessReplaceOp(cow_op);
}
case kCowZeroOp: {
return ProcessZeroOp();
}
case kCowCopyOp:
[[fallthrough]];
case kCowXorOp: {
return ProcessOrderedOp(cow_op);
}
default: {
SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type;
}
}
return false;
}
void Worker::InitializeBufsink() {
@ -129,7 +252,7 @@ bool Worker::Init() {
}
bool Worker::RunThread() {
SNAP_LOG(DEBUG) << "Processing snapshot I/O requests...";
SNAP_LOG(INFO) << "Processing snapshot I/O requests....";
// Start serving IO
while (true) {
if (!ProcessIORequest()) {
@ -143,8 +266,378 @@ bool Worker::RunThread() {
return true;
}
// Read the header from the dm-user misc device. This gives
// us the sector number for which I/O is issued by the dm-user device
bool Worker::ReadDmUserHeader() {
if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
if (errno != ENOTBLK) {
SNAP_PLOG(ERROR) << "Control-read failed";
}
SNAP_PLOG(DEBUG) << "ReadDmUserHeader failed....";
return false;
}
return true;
}
// Send the payload/data back to dm-user misc device.
bool Worker::WriteDmUserPayload(size_t size, bool header_response) {
size_t payload_size = size;
void* buf = bufsink_.GetPayloadBufPtr();
if (header_response) {
payload_size += sizeof(struct dm_user_header);
buf = bufsink_.GetBufPtr();
}
if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
return false;
}
return true;
}
bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
CHECK(read_size <= BLOCK_SZ);
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
return false;
}
loff_t offset = sector << SECTOR_SHIFT;
if (!android::base::ReadFullyAtOffset(base_path_merge_fd_, buffer, read_size, offset)) {
SNAP_PLOG(ERROR) << "ReadDataFromBaseDevice failed. fd: " << base_path_merge_fd_
<< "at sector :" << sector << " size: " << read_size;
return false;
}
return true;
}
bool Worker::ReadAlignedSector(sector_t sector, size_t sz, bool header_response) {
struct dm_user_header* header = bufsink_.GetHeaderPtr();
size_t remaining_size = sz;
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
bool io_error = false;
int ret = 0;
do {
// Process 1MB payload at a time
size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);
header->type = DM_USER_RESP_SUCCESS;
size_t total_bytes_read = 0;
io_error = false;
bufsink_.ResetBufferOffset();
while (read_size) {
// We need to check every 4k block to verify if it is
// present in the mapping.
size_t size = std::min(BLOCK_SZ, read_size);
auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
std::make_pair(sector, nullptr), SnapshotHandler::compare);
bool not_found = (it == chunk_vec.end() || it->first != sector);
if (not_found) {
// Block not found in map - which means this block was not
// changed as per the OTA. Just route the I/O to the base
// device.
if (!ReadDataFromBaseDevice(sector, size)) {
SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
header->type = DM_USER_RESP_ERROR;
}
ret = size;
} else {
// We found the sector in mapping. Check the type of COW OP and
// process it.
if (!ProcessCowOp(it->second)) {
SNAP_LOG(ERROR) << "ProcessCowOp failed";
header->type = DM_USER_RESP_ERROR;
}
ret = BLOCK_SZ;
}
// Just return the header if it is an error
if (header->type == DM_USER_RESP_ERROR) {
if (!RespondIOError(header_response)) {
return false;
}
io_error = true;
break;
}
read_size -= ret;
total_bytes_read += ret;
sector += (ret >> SECTOR_SHIFT);
bufsink_.UpdateBufferOffset(ret);
}
if (!io_error) {
if (!WriteDmUserPayload(total_bytes_read, header_response)) {
return false;
}
SNAP_LOG(DEBUG) << "WriteDmUserPayload success total_bytes_read: " << total_bytes_read
<< " header-response: " << header_response
<< " remaining_size: " << remaining_size;
header_response = false;
remaining_size -= total_bytes_read;
}
} while (remaining_size > 0 && !io_error);
return true;
}
int Worker::ReadUnalignedSector(
sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
size_t skip_sector_size = 0;
SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
<< " Aligned sector: " << it->first;
if (!ProcessCowOp(it->second)) {
SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
<< " Aligned sector: " << it->first;
return -1;
}
int num_sectors_skip = sector - it->first;
if (num_sectors_skip > 0) {
skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
if (skip_sector_size == BLOCK_SZ) {
SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
<< " Base-sector: " << it->first;
return -1;
}
memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
(BLOCK_SZ - skip_sector_size));
}
bufsink_.ResetBufferOffset();
return std::min(size, (BLOCK_SZ - skip_sector_size));
}
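As a worked example (illustrative numbers): for a request at sector 9 whose covering COW op starts at sector 8 (it->first == 8), num_sectors_skip is 1, skip_sector_size is 512 bytes, the payload is shifted down by 512 bytes, and the function returns min(size, 4096 - 512), i.e. at most 3584 bytes.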
bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
struct dm_user_header* header = bufsink_.GetHeaderPtr();
header->type = DM_USER_RESP_SUCCESS;
bufsink_.ResetBufferOffset();
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
SnapshotHandler::compare);
// |-------|-------|-------|
// 0 1 2 3
//
// Block 0 - op 1
// Block 1 - op 2
// Block 2 - op 3
//
// chunk_vec will have blocks 0, 1, 2, which map to the relevant COW ops.
//
// Each block is 4k bytes. Thus, the last block will span 8 sectors
// ranging till block 3 (however, block 3 won't be in chunk_vec as
// it doesn't have any mapping to COW ops). Now, if we get an I/O request
// for a sector spanning between block 2 and block 3, we need to step back
// and get hold of the last element.
//
// Additionally, we need to make sure that the requested sector is
// indeed within the range of the final sector. It is perfectly valid
// to get an I/O request for block 3 and beyond which are not mapped
// to any COW ops. In that case, we just need to read from the base
// device.
bool merge_complete = false;
bool header_response = true;
if (it == chunk_vec.end()) {
if (chunk_vec.size() > 0) {
// I/O request beyond the last mapped sector
it = std::prev(chunk_vec.end());
} else {
// This can happen when a partition merge is complete but the snapshot
// state in /metadata is not yet deleted; if the device is rebooted
// during this window, the subsequent boot will mount the snapshot.
// However, since the merge was completed we wouldn't have any
// mapping to COW ops; thus chunk_vec will be empty. In that case,
// mark this as merge_complete and route the I/O to the base device.
merge_complete = true;
}
} else if (it->first != sector) {
if (it != chunk_vec.begin()) {
--it;
}
} else {
return ReadAlignedSector(sector, size, header_response);
}
loff_t requested_offset = sector << SECTOR_SHIFT;
loff_t final_offset = 0;
if (!merge_complete) {
final_offset = it->first << SECTOR_SHIFT;
}
// Since a COW op spans a 4k block, we need to make sure that the requested
// offset is within the 4k region. Consider the following case:
//
// |-------|-------|-------|
// 0 1 2 3
//
// Block 0 - op 1
// Block 1 - op 2
//
// We have an I/O request for a sector between block 2 and block 3. However,
// we have mapping to COW ops only for block 0 and block 1. Thus, the
// requested offset in this case is beyond the last mapped COW op size (which
// is block 1 in this case).
size_t total_bytes_read = 0;
size_t remaining_size = size;
int ret = 0;
if (!merge_complete && (requested_offset >= final_offset) &&
(requested_offset - final_offset) < BLOCK_SZ) {
// Read the partial un-aligned data
ret = ReadUnalignedSector(sector, remaining_size, it);
if (ret < 0) {
SNAP_LOG(ERROR) << "ReadUnalignedSector failed for sector: " << sector
<< " size: " << size << " it->sector: " << it->first;
return RespondIOError(header_response);
}
remaining_size -= ret;
total_bytes_read += ret;
sector += (ret >> SECTOR_SHIFT);
// Send the data back
if (!WriteDmUserPayload(total_bytes_read, header_response)) {
return false;
}
header_response = false;
// If we still have pending data to be processed, this will be aligned I/O
if (remaining_size) {
return ReadAlignedSector(sector, remaining_size, header_response);
}
} else {
// This is all about handling I/O request to be routed to base device
// as the I/O is not mapped to any of the COW ops.
loff_t aligned_offset = requested_offset;
// Align to nearest 4k
aligned_offset += BLOCK_SZ - 1;
aligned_offset &= ~(BLOCK_SZ - 1);
// Find the diff of the aligned offset
size_t diff_size = aligned_offset - requested_offset;
CHECK(diff_size <= BLOCK_SZ);
if (remaining_size < diff_size) {
if (!ReadDataFromBaseDevice(sector, remaining_size)) {
return RespondIOError(header_response);
}
total_bytes_read += remaining_size;
if (!WriteDmUserPayload(total_bytes_read, header_response)) {
return false;
}
} else {
if (!ReadDataFromBaseDevice(sector, diff_size)) {
return RespondIOError(header_response);
}
total_bytes_read += diff_size;
if (!WriteDmUserPayload(total_bytes_read, header_response)) {
return false;
}
remaining_size -= diff_size;
size_t num_sectors_read = (diff_size >> SECTOR_SHIFT);
sector += num_sectors_read;
CHECK(IsBlockAligned(sector << SECTOR_SHIFT));
header_response = false;
// If we still have pending data to be processed, this will be aligned I/O
return ReadAlignedSector(sector, remaining_size, header_response);
}
}
return true;
}
bool Worker::RespondIOError(bool header_response) {
struct dm_user_header* header = bufsink_.GetHeaderPtr();
header->type = DM_USER_RESP_ERROR;
// This is an issue with the dm-user interface. There
// is no way to propagate the I/O error back to dm-user
// if we have already sent the header back. The header
// is sent only once at the beginning; however, I/O can
// be processed in chunks. If we encounter an I/O error
// somewhere in the middle of the processing, we can't communicate
// this back to dm-user.
//
// TODO: Fix the interface
CHECK(header_response);
if (!WriteDmUserPayload(0, header_response)) {
return false;
}
// There is no need to process further as we have already seen
// an I/O error
return true;
}
bool Worker::DmuserReadRequest() {
struct dm_user_header* header = bufsink_.GetHeaderPtr();
// Unaligned I/O request
if (!IsBlockAligned(header->sector << SECTOR_SHIFT)) {
return ReadUnalignedSector(header->sector, header->len);
}
return ReadAlignedSector(header->sector, header->len, true);
}
bool Worker::ProcessIORequest() {
// No communication with dm-user yet
struct dm_user_header* header = bufsink_.GetHeaderPtr();
if (!ReadDmUserHeader()) {
return false;
}
SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;
switch (header->type) {
case DM_USER_REQ_MAP_READ: {
if (!DmuserReadRequest()) {
return false;
}
break;
}
case DM_USER_REQ_MAP_WRITE: {
// TODO: We should not get any write request
// to dm-user as we mount all partitions
// as read-only. Need to verify how TRIM commands
// are handled during mount.
return false;
}
}
return true;
}

View file

@ -52,7 +52,6 @@ int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
break;
}
// Check for consecutive blocks
uint64_t next_offset = op->new_block * BLOCK_SZ;
if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
break;
@ -177,6 +176,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
void* mapped_addr = snapuserd_->GetMappedAddr();
void* read_ahead_buffer =
static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
size_t block_index = 0;
SNAP_LOG(INFO) << "MergeOrderedOps started....";
@ -190,9 +190,12 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
// Wait for RA thread to notify that the merge window
// is ready for merging.
if (!snapuserd_->WaitForMergeBegin()) {
snapuserd_->SetMergeFailed(block_index);
return false;
}
snapuserd_->SetMergeInProgress(block_index);
loff_t offset = 0;
int num_ops = snapuserd_->GetTotalBlocksToMerge();
SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
@ -213,6 +216,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
if (ret < 0 || ret != io_size) {
SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
<< " at offset: " << source_offset << " io_size: " << io_size;
snapuserd_->SetMergeFailed(block_index);
return false;
}
@ -226,6 +230,7 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
// Flush the data
if (fsync(base_path_merge_fd_.get()) < 0) {
SNAP_LOG(ERROR) << " Failed to fsync merged data";
snapuserd_->SetMergeFailed(block_index);
return false;
}
@ -233,14 +238,20 @@ bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
// the merge completion
if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
snapuserd_->SetMergeFailed(block_index);
return false;
}
SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
// Mark the block as merge complete
snapuserd_->SetMergeCompleted(block_index);
// Notify RA thread that the merge thread is ready to merge the next
// window
snapuserd_->NotifyRAForMergeReady();
// Get the next block
block_index += 1;
}
return true;

View file

@ -115,7 +115,7 @@ int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
}
bool ReadAhead::ReconstructDataFromCow() {
std::unordered_map<uint64_t, void*> read_ahead_buffer_map;
std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
loff_t metadata_offset = 0;
loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
int num_ops = 0;
@ -319,6 +319,18 @@ bool ReadAhead::ReadAheadIOStart() {
memcpy(metadata_buffer_, ra_temp_meta_buffer.get(), snapuserd_->GetBufferMetadataSize());
memcpy(read_ahead_buffer_, ra_temp_buffer.get(), total_blocks_merged * BLOCK_SZ);
offset = 0;
std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
read_ahead_buffer_map.clear();
for (size_t block_index = 0; block_index < blocks.size(); block_index++) {
void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
uint64_t new_block = blocks[block_index];
read_ahead_buffer_map[new_block] = bufptr;
offset += BLOCK_SZ;
}
snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);
// Flush the data only if we have overlapping blocks in the region

View file

@ -0,0 +1,684 @@
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <arpa/inet.h>
#include <cutils/sockets.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <android-base/cmsg.h>
#include <android-base/logging.h>
#include <android-base/properties.h>
#include <android-base/scopeguard.h>
#include <fs_mgr/file_wait.h>
#include <snapuserd/snapuserd_client.h>
#include "snapuserd_server.h"
#define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_
#include <sys/_system_properties.h>
namespace android {
namespace snapshot {
using namespace std::string_literals;
using android::base::borrowed_fd;
using android::base::unique_fd;
DaemonOps SnapuserServer::Resolveop(std::string& input) {
if (input == "init") return DaemonOps::INIT;
if (input == "start") return DaemonOps::START;
if (input == "stop") return DaemonOps::STOP;
if (input == "query") return DaemonOps::QUERY;
if (input == "delete") return DaemonOps::DELETE;
if (input == "detach") return DaemonOps::DETACH;
if (input == "supports") return DaemonOps::SUPPORTS;
if (input == "initiate_merge") return DaemonOps::INITIATE;
if (input == "merge_percent") return DaemonOps::PERCENTAGE;
if (input == "getstatus") return DaemonOps::GETSTATUS;
return DaemonOps::INVALID;
}
SnapuserServer::~SnapuserServer() {
// Close any client sockets that were added via AcceptClient().
for (size_t i = 1; i < watched_fds_.size(); i++) {
close(watched_fds_[i].fd);
}
}
std::string SnapuserServer::GetDaemonStatus() {
std::string msg = "";
if (IsTerminating())
msg = "passive";
else
msg = "active";
return msg;
}
void SnapuserServer::Parsemsg(std::string const& msg, const char delim,
std::vector<std::string>& out) {
std::stringstream ss(msg);
std::string s;
while (std::getline(ss, s, delim)) {
out.push_back(s);
}
}
void SnapuserServer::ShutdownThreads() {
terminating_ = true;
JoinAllThreads();
}
DmUserHandler::DmUserHandler(std::shared_ptr<SnapshotHandler> snapuserd)
: snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {}
bool SnapuserServer::Sendmsg(android::base::borrowed_fd fd, const std::string& msg) {
ssize_t ret = TEMP_FAILURE_RETRY(send(fd.get(), msg.data(), msg.size(), MSG_NOSIGNAL));
if (ret < 0) {
PLOG(ERROR) << "Snapuserd:server: send() failed";
return false;
}
if (ret < msg.size()) {
LOG(ERROR) << "Partial send; expected " << msg.size() << " bytes, sent " << ret;
return false;
}
return true;
}
bool SnapuserServer::Recv(android::base::borrowed_fd fd, std::string* data) {
char msg[MAX_PACKET_SIZE];
ssize_t rv = TEMP_FAILURE_RETRY(recv(fd.get(), msg, sizeof(msg), 0));
if (rv < 0) {
PLOG(ERROR) << "recv failed";
return false;
}
*data = std::string(msg, rv);
return true;
}
bool SnapuserServer::Receivemsg(android::base::borrowed_fd fd, const std::string& str) {
const char delim = ',';
std::vector<std::string> out;
Parsemsg(str, delim, out);
DaemonOps op = Resolveop(out[0]);
switch (op) {
case DaemonOps::INIT: {
// Message format:
// init,<misc_name>,<cow_device_path>,<backing_device>,<base_path_merge>
//
// Reads the metadata and sends the number of sectors
if (out.size() != 5) {
LOG(ERROR) << "Malformed init message, " << out.size() << " parts";
return Sendmsg(fd, "fail");
}
auto handler = AddHandler(out[1], out[2], out[3], out[4]);
if (!handler) {
return Sendmsg(fd, "fail");
}
auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors());
return Sendmsg(fd, retval);
}
case DaemonOps::START: {
// Message format:
// start,<misc_name>
//
// Start the new thread which binds to dm-user misc device
if (out.size() != 2) {
LOG(ERROR) << "Malformed start message, " << out.size() << " parts";
return Sendmsg(fd, "fail");
}
std::lock_guard<std::mutex> lock(lock_);
auto iter = FindHandler(&lock, out[1]);
if (iter == dm_users_.end()) {
LOG(ERROR) << "Could not find handler: " << out[1];
return Sendmsg(fd, "fail");
}
if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
LOG(ERROR) << "Tried to re-attach control device: " << out[1];
return Sendmsg(fd, "fail");
}
if (!StartHandler(*iter)) {
return Sendmsg(fd, "fail");
}
return Sendmsg(fd, "success");
}
case DaemonOps::STOP: {
// Message format: stop
//
// Stop all the threads gracefully and then shut down the
// main thread
SetTerminating();
ShutdownThreads();
return true;
}
case DaemonOps::QUERY: {
// Message format: query
//
// As part of the transition, the second-stage daemon will be
// created before terminating the first-stage daemon. Hence,
// for a brief period the client may have to distinguish between
// the first-stage and second-stage daemons.
//
// The second-stage daemon is marked as active and hence will
// be ready to receive control messages.
return Sendmsg(fd, GetDaemonStatus());
}
case DaemonOps::DELETE: {
// Message format:
// delete,<misc_name>
if (out.size() != 2) {
LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
return Sendmsg(fd, "fail");
}
{
std::lock_guard<std::mutex> lock(lock_);
auto iter = FindHandler(&lock, out[1]);
if (iter == dm_users_.end()) {
// After the merge is completed, we swap the dm-user table with
// the underlying dm-linear base device. Hence, the worker
// threads would have terminated and been removed from
// the list.
LOG(DEBUG) << "Could not find handler: " << out[1];
return Sendmsg(fd, "success");
}
if (!(*iter)->ThreadTerminated()) {
(*iter)->snapuserd()->NotifyIOTerminated();
}
}
if (!RemoveAndJoinHandler(out[1])) {
return Sendmsg(fd, "fail");
}
return Sendmsg(fd, "success");
}
case DaemonOps::DETACH: {
std::lock_guard<std::mutex> lock(lock_);
TerminateMergeThreads(&lock);
terminating_ = true;
return true;
}
case DaemonOps::SUPPORTS: {
if (out.size() != 2) {
LOG(ERROR) << "Malformed supports message, " << out.size() << " parts";
return Sendmsg(fd, "fail");
}
if (out[1] == "second_stage_socket_handoff") {
return Sendmsg(fd, "success");
}
return Sendmsg(fd, "fail");
}
case DaemonOps::INITIATE: {
if (out.size() != 2) {
LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts";
return Sendmsg(fd, "fail");
}
if (out[0] == "initiate_merge") {
std::lock_guard<std::mutex> lock(lock_);
auto iter = FindHandler(&lock, out[1]);
if (iter == dm_users_.end()) {
LOG(ERROR) << "Could not find handler: " << out[1];
return Sendmsg(fd, "fail");
}
if (!StartMerge(*iter)) {
return Sendmsg(fd, "fail");
}
return Sendmsg(fd, "success");
}
return Sendmsg(fd, "fail");
}
case DaemonOps::PERCENTAGE: {
std::lock_guard<std::mutex> lock(lock_);
double percentage = GetMergePercentage(&lock);
return Sendmsg(fd, std::to_string(percentage));
}
case DaemonOps::GETSTATUS: {
// Message format:
// getstatus,<misc_name>
if (out.size() != 2) {
LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
return Sendmsg(fd, "snapshot-merge-failed");
}
{
std::lock_guard<std::mutex> lock(lock_);
auto iter = FindHandler(&lock, out[1]);
if (iter == dm_users_.end()) {
LOG(ERROR) << "Could not find handler: " << out[1];
return Sendmsg(fd, "snapshot-merge-failed");
}
std::string merge_status = GetMergeStatus(*iter);
return Sendmsg(fd, merge_status);
}
}
default: {
LOG(ERROR) << "Received unknown message type from client";
Sendmsg(fd, "fail");
return false;
}
}
}
void SnapuserServer::RunThread(std::shared_ptr<DmUserHandler> handler) {
LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
handler->snapuserd()->SetSocketPresent(is_socket_present_);
if (!handler->snapuserd()->Start()) {
LOG(ERROR) << " Failed to launch all worker threads";
}
handler->snapuserd()->CloseFds();
handler->snapuserd()->CheckMergeCompletionStatus();
handler->snapuserd()->UnmapBufferRegion();
auto misc_name = handler->misc_name();
LOG(INFO) << "Handler thread about to exit: " << misc_name;
{
std::lock_guard<std::mutex> lock(lock_);
num_partitions_merge_complete_ += 1;
handler->SetThreadTerminated();
auto iter = FindHandler(&lock, handler->misc_name());
if (iter == dm_users_.end()) {
// RemoveAndJoinHandler() already removed us from the list, and is
// now waiting on a join(), so just return. Additionally, release
// all the resources held by snapuserd object which are shared
// by worker threads. This should be done when the last reference
// of "handler" is released; but we will explicitly release here
// to make sure snapuserd object is freed as it is the biggest
// consumer of memory in the daemon.
handler->FreeResources();
LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
return;
}
LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;
if (handler->snapuserd()->IsAttached()) {
handler->thread().detach();
}
// Important: free resources within the lock. This ensures that if
// WaitForDelete() is called, the handler is either in the list, or
// it's not and its resources are guaranteed to be freed.
handler->FreeResources();
dm_users_.erase(iter);
}
}
bool SnapuserServer::Start(const std::string& socketname) {
bool start_listening = true;
sockfd_.reset(android_get_control_socket(socketname.c_str()));
if (sockfd_ < 0) {
sockfd_.reset(socket_local_server(socketname.c_str(), ANDROID_SOCKET_NAMESPACE_RESERVED,
SOCK_STREAM));
if (sockfd_ < 0) {
PLOG(ERROR) << "Failed to create server socket " << socketname;
return false;
}
start_listening = false;
}
return StartWithSocket(start_listening);
}
bool SnapuserServer::StartWithSocket(bool start_listening) {
if (start_listening && listen(sockfd_.get(), 4) < 0) {
PLOG(ERROR) << "listen socket failed";
return false;
}
AddWatchedFd(sockfd_, POLLIN);
is_socket_present_ = true;
// If started in first-stage init, the property service won't be online.
if (access("/dev/socket/property_service", F_OK) == 0) {
if (!android::base::SetProperty("snapuserd.ready", "true")) {
LOG(ERROR) << "Failed to set snapuserd.ready property";
return false;
}
}
LOG(DEBUG) << "Snapuserd server now accepting connections";
return true;
}
bool SnapuserServer::Run() {
LOG(INFO) << "Now listening on snapuserd socket";
while (!IsTerminating()) {
int rv = TEMP_FAILURE_RETRY(poll(watched_fds_.data(), watched_fds_.size(), -1));
if (rv < 0) {
PLOG(ERROR) << "poll failed";
return false;
}
if (!rv) {
continue;
}
if (watched_fds_[0].revents) {
AcceptClient();
}
auto iter = watched_fds_.begin() + 1;
while (iter != watched_fds_.end()) {
if (iter->revents && !HandleClient(iter->fd, iter->revents)) {
close(iter->fd);
iter = watched_fds_.erase(iter);
} else {
iter++;
}
}
}
JoinAllThreads();
return true;
}
void SnapuserServer::JoinAllThreads() {
// Acquire the thread list within the lock.
std::vector<std::shared_ptr<DmUserHandler>> dm_users;
{
std::lock_guard<std::mutex> guard(lock_);
dm_users = std::move(dm_users_);
}
for (auto& client : dm_users) {
auto& th = client->thread();
if (th.joinable()) th.join();
}
}
void SnapuserServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
struct pollfd p = {};
p.fd = fd.get();
p.events = events;
watched_fds_.emplace_back(std::move(p));
}
void SnapuserServer::AcceptClient() {
int fd = TEMP_FAILURE_RETRY(accept4(sockfd_.get(), nullptr, nullptr, SOCK_CLOEXEC));
if (fd < 0) {
PLOG(ERROR) << "accept4 failed";
return;
}
AddWatchedFd(fd, POLLIN);
}
bool SnapuserServer::HandleClient(android::base::borrowed_fd fd, int revents) {
if (revents & POLLHUP) {
LOG(DEBUG) << "Snapuserd client disconnected";
return false;
}
std::string str;
if (!Recv(fd, &str)) {
return false;
}
if (!Receivemsg(fd, str)) {
LOG(ERROR) << "Encountered error handling client message, revents: " << revents;
return false;
}
return true;
}
void SnapuserServer::Interrupt() {
// Force close the socket so poll() fails.
sockfd_ = {};
SetTerminating();
}
std::shared_ptr<DmUserHandler> SnapuserServer::AddHandler(const std::string& misc_name,
const std::string& cow_device_path,
const std::string& backing_device,
const std::string& base_path_merge) {
auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
base_path_merge);
if (!snapuserd->InitCowDevice()) {
LOG(ERROR) << "Failed to initialize Snapuserd";
return nullptr;
}
if (!snapuserd->InitializeWorkers()) {
LOG(ERROR) << "Failed to initialize workers";
return nullptr;
}
auto handler = std::make_shared<DmUserHandler>(snapuserd);
{
std::lock_guard<std::mutex> lock(lock_);
if (FindHandler(&lock, misc_name) != dm_users_.end()) {
LOG(ERROR) << "Handler already exists: " << misc_name;
return nullptr;
}
dm_users_.push_back(handler);
}
return handler;
}
bool SnapuserServer::StartHandler(const std::shared_ptr<DmUserHandler>& handler) {
if (handler->snapuserd()->IsAttached()) {
LOG(ERROR) << "Handler already attached";
return false;
}
handler->snapuserd()->AttachControlDevice();
handler->thread() = std::thread(std::bind(&SnapuserServer::RunThread, this, handler));
return true;
}
bool SnapuserServer::StartMerge(const std::shared_ptr<DmUserHandler>& handler) {
if (!handler->snapuserd()->IsAttached()) {
LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
return false;
}
handler->snapuserd()->InitiateMerge();
return true;
}
auto SnapuserServer::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
const std::string& misc_name) -> HandlerList::iterator {
CHECK(proof_of_lock);
for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
if ((*iter)->misc_name() == misc_name) {
return iter;
}
}
return dm_users_.end();
}
void SnapuserServer::TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock) {
CHECK(proof_of_lock);
for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
if (!(*iter)->ThreadTerminated()) {
(*iter)->snapuserd()->NotifyIOTerminated();
}
}
}
std::string SnapuserServer::GetMergeStatus(const std::shared_ptr<DmUserHandler>& handler) {
return handler->snapuserd()->GetMergeStatus();
}
double SnapuserServer::GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock) {
CHECK(proof_of_lock);
double percentage = 0.0;
int n = 0;
for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
auto& th = (*iter)->thread();
if (th.joinable()) {
// Merge percentage by individual partitions wherein merge is still
// in-progress
percentage += (*iter)->snapuserd()->GetMergePercentage();
n += 1;
}
}
// Calculate the final merge percentage including those partitions where the
// merge was already completed - num_partitions_merge_complete_ will track
// them when each thread exits in RunThread.
int total_partitions = n + num_partitions_merge_complete_;
if (total_partitions) {
percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
}
LOG(DEBUG) << "Merge %: " << percentage
<< " num_partitions_merge_complete_: " << num_partitions_merge_complete_
<< " total_partitions: " << total_partitions << " n: " << n;
return percentage;
}
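As a worked example (numbers illustrative): with num_partitions_merge_complete_ == 2 and one in-progress partition reporting 40%, total_partitions is 3 and the aggregate is ((2 * 100.0) + 40) / 3 = 80%.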
bool SnapuserServer::RemoveAndJoinHandler(const std::string& misc_name) {
std::shared_ptr<DmUserHandler> handler;
{
std::lock_guard<std::mutex> lock(lock_);
auto iter = FindHandler(&lock, misc_name);
if (iter == dm_users_.end()) {
// Client already deleted.
return true;
}
handler = std::move(*iter);
dm_users_.erase(iter);
}
auto& th = handler->thread();
if (th.joinable()) {
th.join();
}
return true;
}
bool SnapuserServer::WaitForSocket() {
auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy;
if (!android::fs_mgr::WaitForFile(socket_path, std::chrono::milliseconds::max())) {
LOG(ERROR)
<< "Failed to wait for proxy socket, second-stage snapuserd will fail to connect";
return false;
}
// We must re-initialize property service access, since we launched before
// second-stage init.
__system_properties_init();
if (!android::base::WaitForProperty("snapuserd.proxy_ready", "true")) {
LOG(ERROR)
<< "Failed to wait for proxy property, second-stage snapuserd will fail to connect";
return false;
}
unique_fd fd(socket_local_client(kSnapuserdSocketProxy, ANDROID_SOCKET_NAMESPACE_RESERVED,
SOCK_SEQPACKET));
if (fd < 0) {
PLOG(ERROR) << "Failed to connect to socket proxy";
return false;
}
char code[1];
std::vector<unique_fd> fds;
ssize_t rv = android::base::ReceiveFileDescriptorVector(fd, code, sizeof(code), 1, &fds);
if (rv < 0) {
PLOG(ERROR) << "Failed to receive server socket over proxy";
return false;
}
if (fds.empty()) {
LOG(ERROR) << "Expected at least one file descriptor from proxy";
return false;
}
// We don't care if the ACK is received.
code[0] = 'a';
if (TEMP_FAILURE_RETRY(send(fd, code, sizeof(code), MSG_NOSIGNAL)) < 0) {
PLOG(ERROR) << "Failed to send ACK to proxy";
return false;
}
sockfd_ = std::move(fds[0]);
if (!StartWithSocket(true)) {
return false;
}
return Run();
}
bool SnapuserServer::RunForSocketHandoff() {
unique_fd proxy_fd(android_get_control_socket(kSnapuserdSocketProxy));
if (proxy_fd < 0) {
PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocketProxy;
}
borrowed_fd server_fd(android_get_control_socket(kSnapuserdSocket));
if (server_fd < 0) {
PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocket;
}
if (listen(proxy_fd.get(), 4) < 0) {
PLOG(FATAL) << "Proxy listen socket failed";
}
if (!android::base::SetProperty("snapuserd.proxy_ready", "true")) {
LOG(FATAL) << "Proxy failed to set ready property";
}
unique_fd client_fd(
TEMP_FAILURE_RETRY(accept4(proxy_fd.get(), nullptr, nullptr, SOCK_CLOEXEC)));
if (client_fd < 0) {
PLOG(FATAL) << "Proxy accept failed";
}
char code[1] = {'a'};
std::vector<int> fds = {server_fd.get()};
ssize_t rv = android::base::SendFileDescriptorVector(client_fd, code, sizeof(code), fds);
if (rv < 0) {
PLOG(FATAL) << "Proxy could not send file descriptor to snapuserd";
}
// Wait for an ACK - results don't matter, we just don't want to risk closing
// the proxy socket too early.
if (recv(client_fd, code, sizeof(code), 0) < 0) {
PLOG(FATAL) << "Proxy could not receive terminating code from snapuserd";
}
return true;
}
} // namespace snapshot
} // namespace android

View file

@ -0,0 +1,142 @@
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <poll.h>
#include <cstdio>
#include <cstring>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include <android-base/unique_fd.h>
#include "snapuserd_core.h"
namespace android {
namespace snapshot {
static constexpr uint32_t MAX_PACKET_SIZE = 512;
enum class DaemonOps {
INIT,
START,
QUERY,
STOP,
DELETE,
DETACH,
SUPPORTS,
INITIATE,
PERCENTAGE,
GETSTATUS,
INVALID,
};
class DmUserHandler {
public:
explicit DmUserHandler(std::shared_ptr<SnapshotHandler> snapuserd);
void FreeResources() {
// Each worker thread holds a reference to snapuserd.
// Clear them so that all the resources
// held by snapuserd are released
if (snapuserd_) {
snapuserd_->FreeResources();
snapuserd_ = nullptr;
}
}
const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
std::thread& thread() { return thread_; }
const std::string& misc_name() const { return misc_name_; }
bool ThreadTerminated() { return thread_terminated_; }
void SetThreadTerminated() { thread_terminated_ = true; }
private:
std::thread thread_;
std::shared_ptr<SnapshotHandler> snapuserd_;
std::string misc_name_;
bool thread_terminated_ = false;
};
class SnapuserServer {
private:
android::base::unique_fd sockfd_;
bool terminating_;
volatile bool received_socket_signal_ = false;
std::vector<struct pollfd> watched_fds_;
bool is_socket_present_ = false;
int num_partitions_merge_complete_ = 0;
std::mutex lock_;
using HandlerList = std::vector<std::shared_ptr<DmUserHandler>>;
HandlerList dm_users_;
void AddWatchedFd(android::base::borrowed_fd fd, int events);
void AcceptClient();
bool HandleClient(android::base::borrowed_fd fd, int revents);
bool Recv(android::base::borrowed_fd fd, std::string* data);
bool Sendmsg(android::base::borrowed_fd fd, const std::string& msg);
bool Receivemsg(android::base::borrowed_fd fd, const std::string& str);
void ShutdownThreads();
bool RemoveAndJoinHandler(const std::string& control_device);
DaemonOps Resolveop(std::string& input);
std::string GetDaemonStatus();
void Parsemsg(std::string const& msg, const char delim, std::vector<std::string>& out);
bool IsTerminating() { return terminating_; }
void RunThread(std::shared_ptr<DmUserHandler> handler);
void JoinAllThreads();
bool StartWithSocket(bool start_listening);
// Find a DmUserHandler within a lock.
HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
const std::string& misc_name);
double GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock);
void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock);
public:
SnapuserServer() { terminating_ = false; }
~SnapuserServer();
bool Start(const std::string& socketname);
bool Run();
void Interrupt();
bool RunForSocketHandoff();
bool WaitForSocket();
std::shared_ptr<DmUserHandler> AddHandler(const std::string& misc_name,
const std::string& cow_device_path,
const std::string& backing_device,
const std::string& base_path_merge);
bool StartHandler(const std::shared_ptr<DmUserHandler>& handler);
bool StartMerge(const std::shared_ptr<DmUserHandler>& handler);
std::string GetMergeStatus(const std::shared_ptr<DmUserHandler>& handler);
void SetTerminating() { terminating_ = true; }
void ReceivedSocketSignal() { received_socket_signal_ = true; }
};
} // namespace snapshot
} // namespace android

View file

@ -359,5 +359,289 @@ void SnapshotHandler::WaitForMergeComplete() {
}
}
std::string SnapshotHandler::GetMergeStatus() {
bool merge_not_initiated = false;
bool merge_failed = false;
{
std::lock_guard<std::mutex> lock(lock_);
if (!MergeInitiated()) {
merge_not_initiated = true;
}
if (io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED) {
merge_failed = true;
}
}
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
bool merge_complete = (ch->num_merge_ops == reader_->get_num_total_data_ops());
if (merge_not_initiated) {
// Merge was not initiated yet; however, we have merge completion
// recorded in the COW Header. This can happen if the device was
// rebooted during the merge. On the next boot, libsnapshot will
// query the status and, if the merge is completed, the snapshot-status
// file will be deleted.
if (merge_complete) {
return "snapshot-merge-complete";
}
// Return the state as "snapshot". If the device was rebooted during
// merge, we will return the status as "snapshot". This is ok, as
// libsnapshot will explicitly resume the merge. This is slightly
// different from kernel snapshot wherein once the snapshot was switched
// to merge target, during next boot, we immediately switch to merge
// target. We don't do that here because, during first stage init, we
// don't want to initiate the merge. The problem is that we have daemon
// transition between first and second stage init. If the merge was
// started, then we will have to quiesce the merge before switching
// the dm tables. Instead, we just wait until second stage daemon is up
// before resuming the merge.
return "snapshot";
}
if (merge_failed) {
return "snapshot-merge-failed";
}
// Merge complete
if (merge_complete) {
return "snapshot-merge-complete";
}
// Merge is in-progress
return "snapshot-merge";
}
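A minimal sketch, assuming a hypothetical caller, of how the four status strings returned above might be interpreted; the enum and function names are illustrative and not part of this change.

#include <string>

enum class MergeDecision { kResumeMerge, kWaitForCompletion, kCleanupSnapshot, kReportFailure };

MergeDecision InterpretStatusSketch(const std::string& status) {
    if (status == "snapshot") return MergeDecision::kResumeMerge;              // merge not resumed yet
    if (status == "snapshot-merge") return MergeDecision::kWaitForCompletion;  // merge in progress
    if (status == "snapshot-merge-complete") return MergeDecision::kCleanupSnapshot;
    return MergeDecision::kReportFailure;  // "snapshot-merge-failed" or unknown
}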
//========== End of Read-ahead state transition functions ====================
/*
* Root partitions are mounted off dm-user and the I/O's are served
* by snapuserd worker threads.
*
 * When there is an I/O request to be served by a worker thread, we check
 * whether the corresponding sector was "changed" by the OTA by doing a lookup.
 * If the lookup succeeds, the sector has been changed and maps to one of
 * four COW operations, viz. COPY, XOR, REPLACE or ZERO.
 *
 * REPLACE and ZERO ops are not much of a concern as there is no dependency
 * between blocks. Hence, all the I/O requests mapped to these two COW
 * operations are served by reading the COW device.
*
 * However, COPY and XOR ops are tricky. Since merge operations are
 * in progress, we cannot simply read from the source device. We need
 * to be in sync with the state of the merge thread before serving the I/O.
 *
 * The merge thread processes COW ops in fixed-size sets called RA Blocks
 * (Merge Group Blocks), where each Block comprises 510 COW ops.
*
* +--------------------------+
* |op-1|op-2|op-3|....|op-510|
* +--------------------------+
*
* <------ Merge Group Block N ------>
*
 * Thus, a Merge Group Block N will be in one of these states and will
 * transition through them in the following order:
 *
 * 1: GROUP_MERGE_PENDING
 * 2: GROUP_MERGE_RA_READY
 * 3: GROUP_MERGE_IN_PROGRESS
 * 4: GROUP_MERGE_COMPLETED
 * 5: GROUP_MERGE_FAILED
*
* Let's say that we have the I/O request from dm-user whose sector gets mapped
* to a COPY operation with op-10 in the above "Merge Group Block N".
*
* 1: If the Group is in "GROUP_MERGE_PENDING" state:
*
 * Just read the data from the source block based on the COW op->source field.
 * Note that we take a ref count on "Group N". This ref count prevents the
 * merge thread from beginning to merge the Group while there are pending
 * I/Os. Once the I/O is completed, the ref count on "Group N" is decremented.
 * The merge thread will resume merging "Group N" once there are no pending I/Os.
*
* 2: If the Group is in "GROUP_MERGE_IN_PROGRESS" or "GROUP_MERGE_RA_READY" state:
*
 * When the merge thread is ready to process a "Group", it first moves
 * the state from GROUP_MERGE_PENDING -> GROUP_MERGE_RA_READY. From this point
 * onwards, I/O is served from the Read-ahead buffer. However, the merge thread
 * cannot start merging this "Group" immediately. If there are any in-flight
 * I/O requests, the merge thread must wait and allow those I/Os to drain.
 * Once all the in-flight I/Os are completed, the merge thread moves the
 * state from "GROUP_MERGE_RA_READY" -> "GROUP_MERGE_IN_PROGRESS". I/O
 * continues to be served from the Read-ahead buffer for the entire duration
 * of the merge.
*
* See SetMergeInProgress().
*
* 3: If the Group is in "GROUP_MERGE_COMPLETED" state:
*
 * This is straightforward. We just read the data directly from the "Base"
 * device. We should not read the COW op->source field.
*
 * 4: If the Group is in "GROUP_MERGE_FAILED" state:
*
* Terminate the I/O with an I/O error as we don't know which "op" in the
* "Group" failed.
*
 * These transitions ensure that I/O from the root partitions is never made
 * to wait and is processed immediately. Thus, the state transitions for any
 * "Group" are:
*
* GROUP_MERGE_PENDING
* |
* |
* v
* GROUP_MERGE_RA_READY
* |
* |
* v
* GROUP_MERGE_IN_PROGRESS
* |
* |----------------------------(on failure)
* | |
* v v
* GROUP_MERGE_COMPLETED GROUP_MERGE_FAILED
*
*/
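// Illustrative only: with 510 COW ops per Merge Group (as described above),
// the position of an ordered op in the merge sequence maps to its group index
// as sketched below. The real code records this mapping explicitly in
// block_to_ra_index_; the constant name here is an assumption.
[[maybe_unused]] static constexpr size_t kExampleOpsPerMergeGroup = 510;
[[maybe_unused]] static size_t ExampleMergeGroupIndexForOp(size_t op_index_in_merge_sequence) {
    return op_index_in_merge_sequence / kExampleOpsPerMergeGroup;
}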
// Invoked by Merge thread
void SnapshotHandler::SetMergeCompleted(size_t ra_index) {
MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
{
std::lock_guard<std::mutex> lock(blk_state->m_lock);
CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS);
CHECK(blk_state->num_ios_in_progress == 0);
// Merge is complete - All I/O henceforth should be read directly
// from base device
blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED;
}
}
// Invoked by Merge thread. This is called just before the beginning
// of merging a given Block of 510 ops. If there are any in-flight I/O's
// from dm-user then wait for them to complete.
void SnapshotHandler::SetMergeInProgress(size_t ra_index) {
MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
{
std::unique_lock<std::mutex> lock(blk_state->m_lock);
CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
// First set the state to RA_READY so that in-flight I/O will drain
// and any new I/O will start reading from RA buffer
blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_RA_READY;
// Wait if there are any in-flight I/O's - we cannot merge at this point
        while (blk_state->num_ios_in_progress != 0) {
blk_state->m_cv.wait(lock);
}
blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS;
}
}
// Invoked by Merge thread on failure
void SnapshotHandler::SetMergeFailed(size_t ra_index) {
MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
{
std::unique_lock<std::mutex> lock(blk_state->m_lock);
blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_FAILED;
}
}
// Invoked by worker threads when I/O is complete on a "GROUP_MERGE_PENDING"
// Group. If there are no more in-flight I/Os, wake up the merge thread
// to resume merging.
void SnapshotHandler::NotifyIOCompletion(uint64_t new_block) {
auto it = block_to_ra_index_.find(new_block);
CHECK(it != block_to_ra_index_.end()) << " invalid block: " << new_block;
bool pending_ios = true;
int ra_index = it->second;
MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
{
std::unique_lock<std::mutex> lock(blk_state->m_lock);
CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
blk_state->num_ios_in_progress -= 1;
if (blk_state->num_ios_in_progress == 0) {
pending_ios = false;
}
}
    // Give the merge thread a chance to resume the merge
    // as there are no pending I/Os.
if (!pending_ios) {
blk_state->m_cv.notify_all();
}
}
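// Copy the read-ahead cached contents of |block| into |buffer|. The caller
// must already hold the lock it passes in; the function only verifies
// ownership and fails the lookup otherwise.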
bool SnapshotHandler::GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block,
void* buffer) {
if (!lock->owns_lock()) {
SNAP_LOG(ERROR) << "GetRABuffer - Lock not held";
return false;
}
std::unordered_map<uint64_t, void*>::iterator it = read_ahead_buffer_map_.find(block);
if (it == read_ahead_buffer_map_.end()) {
SNAP_LOG(ERROR) << "Block: " << block << " not found in RA buffer";
return false;
}
memcpy(buffer, it->second, BLOCK_SZ);
return true;
}
// Invoked by worker threads in the I/O path. This is called when a sector
// is mapped to a COPY/XOR COW op.
MERGE_GROUP_STATE SnapshotHandler::ProcessMergingBlock(uint64_t new_block, void* buffer) {
auto it = block_to_ra_index_.find(new_block);
if (it == block_to_ra_index_.end()) {
return MERGE_GROUP_STATE::GROUP_INVALID;
}
int ra_index = it->second;
MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
{
std::unique_lock<std::mutex> lock(blk_state->m_lock);
MERGE_GROUP_STATE state = blk_state->merge_state_;
switch (state) {
case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
blk_state->num_ios_in_progress += 1; // ref count
[[fallthrough]];
}
case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: {
[[fallthrough]];
}
case MERGE_GROUP_STATE::GROUP_MERGE_FAILED: {
return state;
}
// Fetch the data from RA buffer.
case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: {
[[fallthrough]];
}
case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: {
if (!GetRABuffer(&lock, new_block, buffer)) {
return MERGE_GROUP_STATE::GROUP_INVALID;
}
return state;
}
default: {
return MERGE_GROUP_STATE::GROUP_INVALID;
}
}
}
}
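// Illustrative only: a minimal sketch of how a worker thread might combine
// ProcessMergingBlock() and NotifyIOCompletion() when a sector maps to a
// COPY/XOR op. The read callbacks are hypothetical placeholders supplied by
// the caller; they are not snapuserd functions, and the real worker-thread
// logic may differ.
[[maybe_unused]] static bool ExampleServeOrderedOp(SnapshotHandler* snap, uint64_t new_block,
                                                   void* buffer,
                                                   bool (*read_from_source)(uint64_t, void*),
                                                   bool (*read_from_base)(uint64_t, void*)) {
    switch (snap->ProcessMergingBlock(new_block, buffer)) {
        case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
            // A ref count was taken on the Group; read from the op's source
            // block, then drop the ref so the merge thread can proceed.
            bool ok = read_from_source(new_block, buffer);
            snap->NotifyIOCompletion(new_block);
            return ok;
        }
        case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY:
        case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS:
            // The buffer was already filled from the read-ahead cache.
            return true;
        case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED:
            // The Group is fully merged; read directly from the base device.
            return read_from_base(new_block, buffer);
        default:
            // GROUP_MERGE_FAILED / GROUP_INVALID: fail the I/O in this sketch.
            return false;
    }
}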
} // namespace snapshot
} // namespace android