Merge changes I6791dd72,I887d5073

* changes:
  Reduce priority of merge threads
  Tune snapshot-merge performance
This commit is contained in:
Akilesh Kailash 2022-07-03 06:54:12 +00:00 committed by Gerrit Code Review
commit 5c00853601
8 changed files with 134 additions and 11 deletions

View file

@ -2151,8 +2151,17 @@ bool SnapshotManager::ListSnapshots(LockedFile* lock, std::vector<std::string>*
if (!suffix.empty() && !android::base::EndsWith(name, suffix)) {
continue;
}
snapshots->emplace_back(std::move(name));
// Insert system and product partition at the beginning so that
// during snapshot-merge, these partitions are merged first.
if (name == "system_a" || name == "system_b" || name == "product_a" ||
name == "product_b") {
snapshots->insert(snapshots->begin(), std::move(name));
} else {
snapshots->emplace_back(std::move(name));
}
}
return true;
}

View file

@ -147,17 +147,18 @@ void SnapshotHandler::PrepareReadAhead() {
NotifyRAForMergeReady();
}
void SnapshotHandler::CheckMergeCompletionStatus() {
bool SnapshotHandler::CheckMergeCompletionStatus() {
if (!merge_initiated_) {
SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
<< reader_->get_num_total_data_ops();
return;
return false;
}
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops
<< " Total-data-ops: " << reader_->get_num_total_data_ops();
return true;
}
bool SnapshotHandler::ReadMetadata() {

View file

@ -18,6 +18,9 @@
#include <stdint.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/time.h>
#include <unistd.h>
#include <condition_variable>
#include <cstring>
@ -56,6 +59,8 @@ static_assert(PAYLOAD_BUFFER_SZ >= BLOCK_SZ);
static constexpr int kNumWorkerThreads = 4;
static constexpr int kNiceValueForMergeThreads = -5;
#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
@ -306,7 +311,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
const bool& IsAttached() const { return attached_; }
void AttachControlDevice() { attached_ = true; }
void CheckMergeCompletionStatus();
bool CheckMergeCompletionStatus();
bool CommitMerge(int num_merge_ops);
void CloseFds() { cow_fd_ = {}; }
@ -337,6 +342,8 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
// State transitions for merge
void InitiateMerge();
void MonitorMerge();
void WakeupMonitorMergeThread();
void WaitForMergeComplete();
bool WaitForMergeBegin();
void NotifyRAForMergeReady();
@ -365,6 +372,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
void SetIouringEnabled(bool io_uring_enabled) { is_io_uring_enabled_ = io_uring_enabled; }
bool MergeInitiated() { return merge_initiated_; }
bool MergeMonitored() { return merge_monitored_; }
double GetMergePercentage() { return merge_completion_percentage_; }
// Merge Block State Transitions
@ -431,6 +439,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
double merge_completion_percentage_;
bool merge_initiated_ = false;
bool merge_monitored_ = false;
bool attached_ = false;
bool is_socket_present_;
bool is_io_uring_enabled_ = false;

View file

@ -543,6 +543,10 @@ bool Worker::RunMergeThread() {
return true;
}
if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
}
SNAP_LOG(INFO) << "Merge starting..";
if (!Init()) {

View file

@ -727,6 +727,10 @@ bool ReadAhead::RunThread() {
InitializeIouring();
if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
}
while (!RAIterDone()) {
if (!ReadAheadIOStart()) {
break;

View file

@ -60,6 +60,14 @@ DaemonOps UserSnapshotServer::Resolveop(std::string& input) {
return DaemonOps::INVALID;
}
UserSnapshotServer::UserSnapshotServer() {
monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
if (monitor_merge_event_fd_ == -1) {
PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
}
terminating_ = false;
}
UserSnapshotServer::~UserSnapshotServer() {
// Close any client sockets that were added via AcceptClient().
for (size_t i = 1; i < watched_fds_.size(); i++) {
@ -250,7 +258,7 @@ bool UserSnapshotServer::Receivemsg(android::base::borrowed_fd fd, const std::st
return Sendmsg(fd, "fail");
}
if (!StartMerge(*iter)) {
if (!StartMerge(&lock, *iter)) {
return Sendmsg(fd, "fail");
}
@ -307,7 +315,7 @@ void UserSnapshotServer::RunThread(std::shared_ptr<UserSnapshotDmUserHandler> ha
}
handler->snapuserd()->CloseFds();
handler->snapuserd()->CheckMergeCompletionStatus();
bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
handler->snapuserd()->UnmapBufferRegion();
auto misc_name = handler->misc_name();
@ -315,7 +323,11 @@ void UserSnapshotServer::RunThread(std::shared_ptr<UserSnapshotDmUserHandler> ha
{
std::lock_guard<std::mutex> lock(lock_);
num_partitions_merge_complete_ += 1;
if (merge_completed) {
num_partitions_merge_complete_ += 1;
active_merge_threads_ -= 1;
WakeupMonitorMergeThread();
}
handler->SetThreadTerminated();
auto iter = FindHandler(&lock, handler->misc_name());
if (iter == dm_users_.end()) {
@ -427,6 +439,9 @@ void UserSnapshotServer::JoinAllThreads() {
if (th.joinable()) th.join();
}
stop_monitor_merge_thread_ = true;
WakeupMonitorMergeThread();
}
void UserSnapshotServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
@ -511,13 +526,24 @@ bool UserSnapshotServer::StartHandler(const std::shared_ptr<UserSnapshotDmUserHa
return true;
}
bool UserSnapshotServer::StartMerge(const std::shared_ptr<UserSnapshotDmUserHandler>& handler) {
bool UserSnapshotServer::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
const std::shared_ptr<UserSnapshotDmUserHandler>& handler) {
CHECK(proof_of_lock);
if (!handler->snapuserd()->IsAttached()) {
LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
return false;
}
handler->snapuserd()->InitiateMerge();
handler->snapuserd()->MonitorMerge();
if (!is_merge_monitor_started_.has_value()) {
std::thread(&UserSnapshotServer::MonitorMerge, this).detach();
is_merge_monitor_started_ = true;
}
merge_handlers_.push(handler);
WakeupMonitorMergeThread();
return true;
}
@ -599,6 +625,42 @@ bool UserSnapshotServer::RemoveAndJoinHandler(const std::string& misc_name) {
return true;
}
void UserSnapshotServer::WakeupMonitorMergeThread() {
uint64_t notify = 1;
ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
if (rc < 0) {
PLOG(FATAL) << "failed to notify monitor merge thread";
}
}
void UserSnapshotServer::MonitorMerge() {
while (!stop_monitor_merge_thread_) {
uint64_t testVal;
ssize_t ret =
TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
if (ret == -1) {
PLOG(FATAL) << "Failed to read from eventfd";
} else if (ret == 0) {
LOG(FATAL) << "Hit EOF on eventfd";
}
LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
{
std::lock_guard<std::mutex> lock(lock_);
while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
auto handler = merge_handlers_.front();
merge_handlers_.pop();
LOG(INFO) << "Starting merge for partition: "
<< handler->snapuserd()->GetMiscName();
handler->snapuserd()->InitiateMerge();
active_merge_threads_ += 1;
}
}
}
LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
}
bool UserSnapshotServer::WaitForSocket() {
auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
@ -655,6 +717,7 @@ bool UserSnapshotServer::WaitForSocket() {
if (!StartWithSocket(true)) {
return false;
}
return Run();
}

View file

@ -15,6 +15,7 @@
#pragma once
#include <poll.h>
#include <sys/eventfd.h>
#include <cstdio>
#include <cstring>
@ -22,6 +23,8 @@
#include <future>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
@ -34,6 +37,7 @@ namespace android {
namespace snapshot {
static constexpr uint32_t kMaxPacketSize = 512;
static constexpr uint8_t kMaxMergeThreads = 2;
enum class DaemonOps {
INIT,
@ -85,13 +89,19 @@ class UserSnapshotServer {
std::vector<struct pollfd> watched_fds_;
bool is_socket_present_ = false;
int num_partitions_merge_complete_ = 0;
int active_merge_threads_ = 0;
bool stop_monitor_merge_thread_ = false;
bool is_server_running_ = false;
bool io_uring_enabled_ = false;
std::optional<bool> is_merge_monitor_started_;
android::base::unique_fd monitor_merge_event_fd_;
std::mutex lock_;
using HandlerList = std::vector<std::shared_ptr<UserSnapshotDmUserHandler>>;
HandlerList dm_users_;
std::queue<std::shared_ptr<UserSnapshotDmUserHandler>> merge_handlers_;
void AddWatchedFd(android::base::borrowed_fd fd, int events);
void AcceptClient();
@ -109,6 +119,8 @@ class UserSnapshotServer {
bool IsTerminating() { return terminating_; }
void RunThread(std::shared_ptr<UserSnapshotDmUserHandler> handler);
void MonitorMerge();
void JoinAllThreads();
bool StartWithSocket(bool start_listening);
@ -122,7 +134,7 @@ class UserSnapshotServer {
bool UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock);
public:
UserSnapshotServer() { terminating_ = false; }
UserSnapshotServer();
~UserSnapshotServer();
bool Start(const std::string& socketname);
@ -136,9 +148,11 @@ class UserSnapshotServer {
const std::string& backing_device,
const std::string& base_path_merge);
bool StartHandler(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
bool StartMerge(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
std::string GetMergeStatus(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
void WakeupMonitorMergeThread();
void SetTerminating() { terminating_ = true; }
void ReceivedSocketSignal() { received_socket_signal_ = true; }
void SetServerRunning() { is_server_running_ = true; }

View file

@ -165,6 +165,13 @@ using namespace android;
using namespace android::dm;
using android::base::unique_fd;
void SnapshotHandler::MonitorMerge() {
{
std::lock_guard<std::mutex> lock(lock_);
merge_monitored_ = true;
}
}
// This is invoked once primarily by update-engine to initiate
// the merge
void SnapshotHandler::InitiateMerge() {
@ -361,10 +368,16 @@ void SnapshotHandler::WaitForMergeComplete() {
std::string SnapshotHandler::GetMergeStatus() {
bool merge_not_initiated = false;
bool merge_monitored = false;
bool merge_failed = false;
{
std::lock_guard<std::mutex> lock(lock_);
if (MergeMonitored()) {
merge_monitored = true;
}
if (!MergeInitiated()) {
merge_not_initiated = true;
}
@ -387,6 +400,12 @@ std::string SnapshotHandler::GetMergeStatus() {
return "snapshot-merge-complete";
}
// Merge monitor thread is tracking the merge but the merge thread
// is not started yet.
if (merge_monitored) {
return "snapshot-merge";
}
// Return the state as "snapshot". If the device was rebooted during
// merge, we will return the status as "snapshot". This is ok, as
// libsnapshot will explicitly resume the merge. This is slightly