Merge changes I6791dd72,I887d5073
* changes:
  Reduce priority of merge threads
  Tune snapshot-merge performance
This commit is contained in:
commit
5c00853601
8 changed files with 134 additions and 11 deletions
|
|
@ -2151,8 +2151,17 @@ bool SnapshotManager::ListSnapshots(LockedFile* lock, std::vector<std::string>*
|
|||
if (!suffix.empty() && !android::base::EndsWith(name, suffix)) {
|
||||
continue;
|
||||
}
|
||||
snapshots->emplace_back(std::move(name));
|
||||
|
||||
// Insert system and product partition at the beginning so that
|
||||
// during snapshot-merge, these partitions are merged first.
|
||||
if (name == "system_a" || name == "system_b" || name == "product_a" ||
|
||||
name == "product_b") {
|
||||
snapshots->insert(snapshots->begin(), std::move(name));
|
||||
} else {
|
||||
snapshots->emplace_back(std::move(name));
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -147,17 +147,18 @@ void SnapshotHandler::PrepareReadAhead() {
|
|||
NotifyRAForMergeReady();
|
||||
}
|
||||
|
||||
void SnapshotHandler::CheckMergeCompletionStatus() {
|
||||
bool SnapshotHandler::CheckMergeCompletionStatus() {
|
||||
if (!merge_initiated_) {
|
||||
SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
|
||||
<< reader_->get_num_total_data_ops();
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
|
||||
|
||||
SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops
|
||||
<< " Total-data-ops: " << reader_->get_num_total_data_ops();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool SnapshotHandler::ReadMetadata() {
|
||||
|
|
|
|||
|
|
@ -18,6 +18,9 @@
|
|||
#include <stdint.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/resource.h>
|
||||
#include <sys/time.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <condition_variable>
|
||||
#include <cstring>
|
||||
|
|
@ -56,6 +59,8 @@ static_assert(PAYLOAD_BUFFER_SZ >= BLOCK_SZ);
|
|||
|
||||
static constexpr int kNumWorkerThreads = 4;
|
||||
|
||||
static constexpr int kNiceValueForMergeThreads = -5;
|
||||
|
||||
#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
|
||||
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
|
||||
|
||||
|
|
@ -306,7 +311,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
|||
const bool& IsAttached() const { return attached_; }
|
||||
void AttachControlDevice() { attached_ = true; }
|
||||
|
||||
void CheckMergeCompletionStatus();
|
||||
bool CheckMergeCompletionStatus();
|
||||
bool CommitMerge(int num_merge_ops);
|
||||
|
||||
void CloseFds() { cow_fd_ = {}; }
|
||||
|
|
@ -337,6 +342,8 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
|||
|
||||
// State transitions for merge
|
||||
void InitiateMerge();
|
||||
void MonitorMerge();
|
||||
void WakeupMonitorMergeThread();
|
||||
void WaitForMergeComplete();
|
||||
bool WaitForMergeBegin();
|
||||
void NotifyRAForMergeReady();
|
||||
|
|
@ -365,6 +372,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
|||
void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
|
||||
void SetIouringEnabled(bool io_uring_enabled) { is_io_uring_enabled_ = io_uring_enabled; }
|
||||
bool MergeInitiated() { return merge_initiated_; }
|
||||
bool MergeMonitored() { return merge_monitored_; }
|
||||
double GetMergePercentage() { return merge_completion_percentage_; }
|
||||
|
||||
// Merge Block State Transitions
|
||||
|
|
@ -431,6 +439,7 @@ class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
|
|||
double merge_completion_percentage_;
|
||||
|
||||
bool merge_initiated_ = false;
|
||||
bool merge_monitored_ = false;
|
||||
bool attached_ = false;
|
||||
bool is_socket_present_;
|
||||
bool is_io_uring_enabled_ = false;
|
||||
|
|
|
|||
|
|
@ -543,6 +543,10 @@ bool Worker::RunMergeThread() {
|
|||
return true;
|
||||
}
|
||||
|
||||
if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
|
||||
SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
|
||||
}
|
||||
|
||||
SNAP_LOG(INFO) << "Merge starting..";
|
||||
|
||||
if (!Init()) {
|
||||
|
|
|
|||
|
|
@ -727,6 +727,10 @@ bool ReadAhead::RunThread() {
|
|||
|
||||
InitializeIouring();
|
||||
|
||||
if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
|
||||
SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
|
||||
}
|
||||
|
||||
while (!RAIterDone()) {
|
||||
if (!ReadAheadIOStart()) {
|
||||
break;
|
||||
|
|
|
|||
|
|
@ -60,6 +60,14 @@ DaemonOps UserSnapshotServer::Resolveop(std::string& input) {
|
|||
return DaemonOps::INVALID;
|
||||
}
|
||||
|
||||
// Constructs the server: creates the eventfd stored in
// monitor_merge_event_fd_ and sets terminating_ to false. If eventfd()
// returns -1 the failure is logged at FATAL severity.
UserSnapshotServer::UserSnapshotServer() {
    const int event_fd = eventfd(0, EFD_CLOEXEC);

    // Hand the descriptor to the owning member before checking it, so the
    // sequence of operations is the same whether or not creation succeeded.
    monitor_merge_event_fd_.reset(event_fd);
    if (event_fd == -1) {
        PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
    }

    terminating_ = false;
}
|
||||
|
||||
UserSnapshotServer::~UserSnapshotServer() {
|
||||
// Close any client sockets that were added via AcceptClient().
|
||||
for (size_t i = 1; i < watched_fds_.size(); i++) {
|
||||
|
|
@ -250,7 +258,7 @@ bool UserSnapshotServer::Receivemsg(android::base::borrowed_fd fd, const std::st
|
|||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
|
||||
if (!StartMerge(*iter)) {
|
||||
if (!StartMerge(&lock, *iter)) {
|
||||
return Sendmsg(fd, "fail");
|
||||
}
|
||||
|
||||
|
|
@ -307,7 +315,7 @@ void UserSnapshotServer::RunThread(std::shared_ptr<UserSnapshotDmUserHandler> ha
|
|||
}
|
||||
|
||||
handler->snapuserd()->CloseFds();
|
||||
handler->snapuserd()->CheckMergeCompletionStatus();
|
||||
bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
|
||||
handler->snapuserd()->UnmapBufferRegion();
|
||||
|
||||
auto misc_name = handler->misc_name();
|
||||
|
|
@ -315,7 +323,11 @@ void UserSnapshotServer::RunThread(std::shared_ptr<UserSnapshotDmUserHandler> ha
|
|||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
num_partitions_merge_complete_ += 1;
|
||||
if (merge_completed) {
|
||||
num_partitions_merge_complete_ += 1;
|
||||
active_merge_threads_ -= 1;
|
||||
WakeupMonitorMergeThread();
|
||||
}
|
||||
handler->SetThreadTerminated();
|
||||
auto iter = FindHandler(&lock, handler->misc_name());
|
||||
if (iter == dm_users_.end()) {
|
||||
|
|
@ -427,6 +439,9 @@ void UserSnapshotServer::JoinAllThreads() {
|
|||
|
||||
if (th.joinable()) th.join();
|
||||
}
|
||||
|
||||
stop_monitor_merge_thread_ = true;
|
||||
WakeupMonitorMergeThread();
|
||||
}
|
||||
|
||||
void UserSnapshotServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
|
||||
|
|
@ -511,13 +526,24 @@ bool UserSnapshotServer::StartHandler(const std::shared_ptr<UserSnapshotDmUserHa
|
|||
return true;
|
||||
}
|
||||
|
||||
bool UserSnapshotServer::StartMerge(const std::shared_ptr<UserSnapshotDmUserHandler>& handler) {
|
||||
bool UserSnapshotServer::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
|
||||
const std::shared_ptr<UserSnapshotDmUserHandler>& handler) {
|
||||
CHECK(proof_of_lock);
|
||||
|
||||
if (!handler->snapuserd()->IsAttached()) {
|
||||
LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
|
||||
return false;
|
||||
}
|
||||
|
||||
handler->snapuserd()->InitiateMerge();
|
||||
handler->snapuserd()->MonitorMerge();
|
||||
|
||||
if (!is_merge_monitor_started_.has_value()) {
|
||||
std::thread(&UserSnapshotServer::MonitorMerge, this).detach();
|
||||
is_merge_monitor_started_ = true;
|
||||
}
|
||||
|
||||
merge_handlers_.push(handler);
|
||||
WakeupMonitorMergeThread();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -599,6 +625,42 @@ bool UserSnapshotServer::RemoveAndJoinHandler(const std::string& misc_name) {
|
|||
return true;
|
||||
}
|
||||
|
||||
void UserSnapshotServer::WakeupMonitorMergeThread() {
|
||||
uint64_t notify = 1;
|
||||
ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), ¬ify, sizeof(notify)));
|
||||
if (rc < 0) {
|
||||
PLOG(FATAL) << "failed to notify monitor merge thread";
|
||||
}
|
||||
}
|
||||
|
||||
void UserSnapshotServer::MonitorMerge() {
|
||||
while (!stop_monitor_merge_thread_) {
|
||||
uint64_t testVal;
|
||||
ssize_t ret =
|
||||
TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
|
||||
if (ret == -1) {
|
||||
PLOG(FATAL) << "Failed to read from eventfd";
|
||||
} else if (ret == 0) {
|
||||
LOG(FATAL) << "Hit EOF on eventfd";
|
||||
}
|
||||
|
||||
LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
|
||||
auto handler = merge_handlers_.front();
|
||||
merge_handlers_.pop();
|
||||
LOG(INFO) << "Starting merge for partition: "
|
||||
<< handler->snapuserd()->GetMiscName();
|
||||
handler->snapuserd()->InitiateMerge();
|
||||
active_merge_threads_ += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
|
||||
}
|
||||
|
||||
bool UserSnapshotServer::WaitForSocket() {
|
||||
auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
|
||||
|
||||
|
|
@ -655,6 +717,7 @@ bool UserSnapshotServer::WaitForSocket() {
|
|||
if (!StartWithSocket(true)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return Run();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
#pragma once
|
||||
|
||||
#include <poll.h>
|
||||
#include <sys/eventfd.h>
|
||||
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
|
|
@ -22,6 +23,8 @@
|
|||
#include <future>
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <queue>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
|
@ -34,6 +37,7 @@ namespace android {
|
|||
namespace snapshot {
|
||||
|
||||
static constexpr uint32_t kMaxPacketSize = 512;
|
||||
static constexpr uint8_t kMaxMergeThreads = 2;
|
||||
|
||||
enum class DaemonOps {
|
||||
INIT,
|
||||
|
|
@ -85,13 +89,19 @@ class UserSnapshotServer {
|
|||
std::vector<struct pollfd> watched_fds_;
|
||||
bool is_socket_present_ = false;
|
||||
int num_partitions_merge_complete_ = 0;
|
||||
int active_merge_threads_ = 0;
|
||||
bool stop_monitor_merge_thread_ = false;
|
||||
bool is_server_running_ = false;
|
||||
bool io_uring_enabled_ = false;
|
||||
std::optional<bool> is_merge_monitor_started_;
|
||||
|
||||
android::base::unique_fd monitor_merge_event_fd_;
|
||||
|
||||
std::mutex lock_;
|
||||
|
||||
using HandlerList = std::vector<std::shared_ptr<UserSnapshotDmUserHandler>>;
|
||||
HandlerList dm_users_;
|
||||
std::queue<std::shared_ptr<UserSnapshotDmUserHandler>> merge_handlers_;
|
||||
|
||||
void AddWatchedFd(android::base::borrowed_fd fd, int events);
|
||||
void AcceptClient();
|
||||
|
|
@ -109,6 +119,8 @@ class UserSnapshotServer {
|
|||
bool IsTerminating() { return terminating_; }
|
||||
|
||||
void RunThread(std::shared_ptr<UserSnapshotDmUserHandler> handler);
|
||||
void MonitorMerge();
|
||||
|
||||
void JoinAllThreads();
|
||||
bool StartWithSocket(bool start_listening);
|
||||
|
||||
|
|
@ -122,7 +134,7 @@ class UserSnapshotServer {
|
|||
bool UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock);
|
||||
|
||||
public:
|
||||
UserSnapshotServer() { terminating_ = false; }
|
||||
UserSnapshotServer();
|
||||
~UserSnapshotServer();
|
||||
|
||||
bool Start(const std::string& socketname);
|
||||
|
|
@ -136,9 +148,11 @@ class UserSnapshotServer {
|
|||
const std::string& backing_device,
|
||||
const std::string& base_path_merge);
|
||||
bool StartHandler(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
|
||||
bool StartMerge(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
|
||||
bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
|
||||
const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
|
||||
std::string GetMergeStatus(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
|
||||
|
||||
void WakeupMonitorMergeThread();
|
||||
void SetTerminating() { terminating_ = true; }
|
||||
void ReceivedSocketSignal() { received_socket_signal_ = true; }
|
||||
void SetServerRunning() { is_server_running_ = true; }
|
||||
|
|
|
|||
|
|
@ -165,6 +165,13 @@ using namespace android;
|
|||
using namespace android::dm;
|
||||
using android::base::unique_fd;
|
||||
|
||||
void SnapshotHandler::MonitorMerge() {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
merge_monitored_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
// This is invoked once primarily by update-engine to initiate
|
||||
// the merge
|
||||
void SnapshotHandler::InitiateMerge() {
|
||||
|
|
@ -361,10 +368,16 @@ void SnapshotHandler::WaitForMergeComplete() {
|
|||
|
||||
std::string SnapshotHandler::GetMergeStatus() {
|
||||
bool merge_not_initiated = false;
|
||||
bool merge_monitored = false;
|
||||
bool merge_failed = false;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
|
||||
if (MergeMonitored()) {
|
||||
merge_monitored = true;
|
||||
}
|
||||
|
||||
if (!MergeInitiated()) {
|
||||
merge_not_initiated = true;
|
||||
}
|
||||
|
|
@ -387,6 +400,12 @@ std::string SnapshotHandler::GetMergeStatus() {
|
|||
return "snapshot-merge-complete";
|
||||
}
|
||||
|
||||
// Merge monitor thread is tracking the merge but the merge thread
|
||||
// is not started yet.
|
||||
if (merge_monitored) {
|
||||
return "snapshot-merge";
|
||||
}
|
||||
|
||||
// Return the state as "snapshot". If the device was rebooted during
|
||||
// merge, we will return the status as "snapshot". This is ok, as
|
||||
// libsnapshot will explicitly resume the merge. This is slightly
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue