diff --git a/adb/fdevent/fdevent.cpp b/adb/fdevent/fdevent.cpp index e80bb5a51..c858f6be4 100644 --- a/adb/fdevent/fdevent.cpp +++ b/adb/fdevent/fdevent.cpp @@ -49,6 +49,32 @@ std::string dump_fde(const fdevent* fde) { state.c_str()); } +void fdevent_context::Run(std::function fn) { + { + std::lock_guard lock(run_queue_mutex_); + run_queue_.push_back(std::move(fn)); + } + + Interrupt(); +} + +void fdevent_context::FlushRunQueue() { + // We need to be careful around reentrancy here, since a function we call can queue up another + // function. + while (true) { + std::function fn; + { + std::lock_guard lock(this->run_queue_mutex_); + if (this->run_queue_.empty()) { + break; + } + fn = this->run_queue_.front(); + this->run_queue_.pop_front(); + } + fn(); + } +} + static auto& g_ambient_fdevent_context = *new std::unique_ptr(new fdevent_context_poll()); diff --git a/adb/fdevent/fdevent.h b/adb/fdevent/fdevent.h index b46219c51..5a2f2c63f 100644 --- a/adb/fdevent/fdevent.h +++ b/adb/fdevent/fdevent.h @@ -21,10 +21,14 @@ #include #include +#include #include +#include #include #include +#include + #include "adb_unique_fd.h" // Events that may be observed @@ -48,6 +52,7 @@ struct fdevent; std::string dump_fde(const fdevent* fde); struct fdevent_context { + public: virtual ~fdevent_context() = default; // Allocate and initialize a new fdevent object. @@ -68,17 +73,29 @@ struct fdevent_context { virtual void SetTimeout(fdevent* fde, std::optional timeout) = 0; // Loop forever, handling events. + // Implementations should call FlushRunQueue on every iteration. virtual void Loop() = 0; // Assert that the caller is running on the context's main thread. virtual void CheckMainThread() = 0; // Queue an operation to be run on the main thread. - virtual void Run(std::function fn) = 0; + void Run(std::function fn); // Test-only functionality: virtual void TerminateLoop() = 0; virtual size_t InstalledCount() = 0; + + protected: + // Interrupt the run loop. + virtual void Interrupt() = 0; + + // Run all pending functions enqueued via Run(). + void FlushRunQueue() EXCLUDES(run_queue_mutex_); + + private: + std::mutex run_queue_mutex_; + std::deque> run_queue_ GUARDED_BY(run_queue_mutex_); }; struct fdevent { diff --git a/adb/fdevent/fdevent_poll.cpp b/adb/fdevent/fdevent_poll.cpp index 6e016f6cc..7615859dc 100644 --- a/adb/fdevent/fdevent_poll.cpp +++ b/adb/fdevent/fdevent_poll.cpp @@ -50,6 +50,35 @@ #include "fdevent.h" #include "sysdeps/chrono.h" +static void fdevent_interrupt(int fd, unsigned, void*) { + char buf[BUFSIZ]; + ssize_t rc = TEMP_FAILURE_RETRY(adb_read(fd, buf, sizeof(buf))); + if (rc == -1) { + PLOG(FATAL) << "failed to read from fdevent interrupt fd"; + } +} + +fdevent_context_poll::fdevent_context_poll() { + int s[2]; + if (adb_socketpair(s) != 0) { + PLOG(FATAL) << "failed to create fdevent interrupt socketpair"; + } + + if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) { + PLOG(FATAL) << "failed to make fdevent interrupt socket nonblocking"; + } + + this->interrupt_fd_.reset(s[0]); + fdevent* fde = this->Create(unique_fd(s[1]), fdevent_interrupt, nullptr); + CHECK(fde != nullptr); + this->Add(fde, FDE_READ); +} + +fdevent_context_poll::~fdevent_context_poll() { + main_thread_valid_ = false; + this->Destroy(this->interrupt_fde_); +} + void fdevent_context_poll::CheckMainThread() { if (main_thread_valid_) { CHECK_EQ(main_thread_id_, android::base::GetThreadId()); @@ -291,79 +320,6 @@ static void fdevent_call_fdfunc(fdevent* fde) { fde->func); } -static void fdevent_run_flush(fdevent_context_poll* ctx) EXCLUDES(ctx->run_queue_mutex_) { - // We need to be careful around reentrancy here, since a function we call can queue up another - // function. - while (true) { - std::function fn; - { - std::lock_guard lock(ctx->run_queue_mutex_); - if (ctx->run_queue_.empty()) { - break; - } - fn = ctx->run_queue_.front(); - ctx->run_queue_.pop_front(); - } - fn(); - } -} - -static void fdevent_run_func(int fd, unsigned ev, void* data) { - CHECK_GE(fd, 0); - CHECK(ev & FDE_READ); - - bool* run_needs_flush = static_cast(data); - char buf[1024]; - - // Empty the fd. - if (adb_read(fd, buf, sizeof(buf)) == -1) { - PLOG(FATAL) << "failed to empty run queue notify fd"; - } - - // Mark that we need to flush, and then run it at the end of fdevent_loop. - *run_needs_flush = true; -} - -static void fdevent_run_setup(fdevent_context_poll* ctx) { - { - std::lock_guard lock(ctx->run_queue_mutex_); - CHECK(ctx->run_queue_notify_fd_.get() == -1); - int s[2]; - if (adb_socketpair(s) != 0) { - PLOG(FATAL) << "failed to create run queue notify socketpair"; - } - - if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) { - PLOG(FATAL) << "failed to make run queue notify socket nonblocking"; - } - - ctx->run_queue_notify_fd_.reset(s[0]); - fdevent* fde = ctx->Create(unique_fd(s[1]), fdevent_run_func, &ctx->run_needs_flush_); - CHECK(fde != nullptr); - ctx->Add(fde, FDE_READ); - } - - fdevent_run_flush(ctx); -} - -void fdevent_context_poll::Run(std::function fn) { - std::lock_guard lock(run_queue_mutex_); - run_queue_.push_back(std::move(fn)); - - // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up. - // In that case, rely on the setup code to flush the queue without a notification being needed. - if (run_queue_notify_fd_ != -1) { - int rc = adb_write(run_queue_notify_fd_.get(), "", 1); - - // It's possible that we get EAGAIN here, if lots of notifications came in while handling. - if (rc == 0) { - PLOG(FATAL) << "run queue notify fd was closed?"; - } else if (rc == -1 && errno != EAGAIN) { - PLOG(FATAL) << "failed to write to run queue notify fd"; - } - } -} - static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) { // Check to see if we're spinning because we forgot about an fdevent // by keeping track of how long fdevents have been continuously pending. @@ -424,7 +380,6 @@ static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) { void fdevent_context_poll::Loop() { this->main_thread_id_ = android::base::GetThreadId(); this->main_thread_valid_ = true; - fdevent_run_setup(this); uint64_t cycle = 0; while (true) { @@ -444,17 +399,27 @@ void fdevent_context_poll::Loop() { fdevent_call_fdfunc(fde); } - if (run_needs_flush_) { - fdevent_run_flush(this); - run_needs_flush_ = false; - } + this->FlushRunQueue(); } } void fdevent_context_poll::TerminateLoop() { terminate_loop_ = true; + Interrupt(); } size_t fdevent_context_poll::InstalledCount() { - return poll_node_map_.size(); + // We always have an installed fde for interrupt. + return poll_node_map_.size() - 1; +} + +void fdevent_context_poll::Interrupt() { + int rc = adb_write(this->interrupt_fd_, "", 1); + + // It's possible that we get EAGAIN here, if lots of notifications came in while handling. + if (rc == 0) { + PLOG(FATAL) << "fdevent interrupt fd was closed?"; + } else if (rc == -1 && errno != EAGAIN) { + PLOG(FATAL) << "failed to write to fdevent interrupt fd"; + } } diff --git a/adb/fdevent/fdevent_poll.h b/adb/fdevent/fdevent_poll.h index f5720ca3a..1b505a7e0 100644 --- a/adb/fdevent/fdevent_poll.h +++ b/adb/fdevent/fdevent_poll.h @@ -25,6 +25,7 @@ #include +#include "adb_unique_fd.h" #include "fdevent.h" struct PollNode { @@ -44,7 +45,8 @@ struct PollNode { }; struct fdevent_context_poll : public fdevent_context { - virtual ~fdevent_context_poll() = default; + fdevent_context_poll(); + virtual ~fdevent_context_poll(); virtual fdevent* Create(unique_fd fd, std::variant func, void* arg) final; virtual unique_fd Destroy(fdevent* fde) final; @@ -58,11 +60,13 @@ struct fdevent_context_poll : public fdevent_context { virtual void CheckMainThread() final; - virtual void Run(std::function fn) final; - virtual void TerminateLoop() final; virtual size_t InstalledCount() final; + protected: + virtual void Interrupt() final; + + public: // All operations to fdevent should happen only in the main thread. // That's why we don't need a lock for fdevent. std::unordered_map poll_node_map_; @@ -71,10 +75,7 @@ struct fdevent_context_poll : public fdevent_context { uint64_t main_thread_id_ = 0; uint64_t fdevent_id_ = 0; - bool run_needs_flush_ = false; - unique_fd run_queue_notify_fd_; - std::mutex run_queue_mutex_; - std::deque> run_queue_ GUARDED_BY(run_queue_mutex_); - + unique_fd interrupt_fd_; + fdevent* interrupt_fde_ = nullptr; std::atomic terminate_loop_ = false; }; diff --git a/adb/fdevent/fdevent_test.h b/adb/fdevent/fdevent_test.h index 24bce59a7..2139d0f66 100644 --- a/adb/fdevent/fdevent_test.h +++ b/adb/fdevent/fdevent_test.h @@ -78,8 +78,8 @@ class FdeventTest : public ::testing::Test { } size_t GetAdditionalLocalSocketCount() { - // dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket - return 2; + // dummy socket installed in PrepareThread() + return 1; } void TerminateThread() {