From 6082e7dafb335b558f7347da24bb849ae32d8406 Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Thu, 5 Apr 2018 16:16:04 -0700 Subject: [PATCH] adb: add nonblocking fd Connection. Implement a nonblocking version of FdConnection. The initial implementation will be somewhat slower than the blocking one for large packet sizes, due to an extra copy when coalescing an IOVector into an apacket, but is still substantially faster for small packets. Test: adb_benchmark Change-Id: I4900c9ddf685d3bd557b8cb43958452ecb23db53 --- adb/Android.bp | 1 + adb/transport.h | 2 + adb/transport_benchmark.cpp | 21 +++- adb/transport_fd.cpp | 239 ++++++++++++++++++++++++++++++++++++ 4 files changed, 258 insertions(+), 5 deletions(-) create mode 100644 adb/transport_fd.cpp diff --git a/adb/Android.bp b/adb/Android.bp index 553473f19..3c2eb0ab1 100644 --- a/adb/Android.bp +++ b/adb/Android.bp @@ -104,6 +104,7 @@ libadb_srcs = [ "socket_spec.cpp", "sysdeps/errno.cpp", "transport.cpp", + "transport_fd.cpp", "transport_local.cpp", "transport_usb.cpp", ] diff --git a/adb/transport.h b/adb/transport.h index ae9cc023c..cb2061510 100644 --- a/adb/transport.h +++ b/adb/transport.h @@ -92,6 +92,8 @@ struct Connection { std::string transport_name_; ReadCallback read_callback_; ErrorCallback error_callback_; + + static std::unique_ptr FromFd(unique_fd fd); }; // Abstraction for a blocking packet transport. diff --git a/adb/transport_benchmark.cpp b/adb/transport_benchmark.cpp index da24aa7a7..022808f29 100644 --- a/adb/transport_benchmark.cpp +++ b/adb/transport_benchmark.cpp @@ -24,13 +24,19 @@ #include "sysdeps.h" #include "transport.h" -#define ADB_CONNECTION_BENCHMARK(benchmark_name, ...) \ - BENCHMARK_TEMPLATE(benchmark_name, FdConnection, ##__VA_ARGS__) \ - ->Arg(1) \ - ->Arg(16384) \ - ->Arg(MAX_PAYLOAD) \ +#define ADB_CONNECTION_BENCHMARK(benchmark_name, ...) \ + BENCHMARK_TEMPLATE(benchmark_name, FdConnection, ##__VA_ARGS__) \ + ->Arg(1) \ + ->Arg(16384) \ + ->Arg(MAX_PAYLOAD) \ + ->UseRealTime(); \ + BENCHMARK_TEMPLATE(benchmark_name, NonblockingFdConnection, ##__VA_ARGS__) \ + ->Arg(1) \ + ->Arg(16384) \ + ->Arg(MAX_PAYLOAD) \ ->UseRealTime() +struct NonblockingFdConnection; template std::unique_ptr MakeConnection(unique_fd fd); @@ -40,6 +46,11 @@ std::unique_ptr MakeConnection(unique_fd fd) { return std::make_unique(std::move(fd_connection)); } +template <> +std::unique_ptr MakeConnection(unique_fd fd) { + return Connection::FromFd(std::move(fd)); +} + template void BM_Connection_Unidirectional(benchmark::State& state) { int fds[2]; diff --git a/adb/transport_fd.cpp b/adb/transport_fd.cpp new file mode 100644 index 000000000..85f3c5299 --- /dev/null +++ b/adb/transport_fd.cpp @@ -0,0 +1,239 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include "adb_unique_fd.h" +#include "adb_utils.h" +#include "sysdeps.h" +#include "sysdeps/memory.h" +#include "transport.h" +#include "types.h" + +static void CreateWakeFds(unique_fd* read, unique_fd* write) { + // TODO: eventfd on linux? + int wake_fds[2]; + int rc = adb_socketpair(wake_fds); + set_file_block_mode(wake_fds[0], false); + set_file_block_mode(wake_fds[1], false); + CHECK_EQ(0, rc); + *read = unique_fd(wake_fds[0]); + *write = unique_fd(wake_fds[1]); +} + +struct NonblockingFdConnection : public Connection { + NonblockingFdConnection(unique_fd fd) : started_(false), fd_(std::move(fd)) { + set_file_block_mode(fd_.get(), false); + CreateWakeFds(&wake_fd_read_, &wake_fd_write_); + } + + void SetRunning(bool value) { + std::lock_guard lock(run_mutex_); + running_ = value; + } + + bool IsRunning() { + std::lock_guard lock(run_mutex_); + return running_; + } + + void Run(std::string* error) { + SetRunning(true); + while (IsRunning()) { + adb_pollfd pfds[2] = { + {.fd = fd_.get(), .events = POLLIN}, + {.fd = wake_fd_read_.get(), .events = POLLIN}, + }; + + { + std::lock_guard lock(this->write_mutex_); + if (!writable_) { + pfds[0].events |= POLLOUT; + } + } + + int rc = adb_poll(pfds, 2, -1); + if (rc == -1) { + *error = android::base::StringPrintf("poll failed: %s", strerror(errno)); + return; + } else if (rc == 0) { + LOG(FATAL) << "poll timed out with an infinite timeout?"; + } + + if (pfds[0].revents) { + if ((pfds[0].revents & POLLOUT)) { + std::lock_guard lock(this->write_mutex_); + WriteResult result = DispatchWrites(); + switch (result) { + case WriteResult::Error: + *error = "write failed"; + return; + + case WriteResult::Completed: + writable_ = true; + break; + + case WriteResult::TryAgain: + break; + } + } + + if (pfds[0].revents & POLLIN) { + // TODO: Should we be getting blocks from a free list? + auto block = std::make_unique(MAX_PAYLOAD); + rc = adb_read(fd_.get(), &(*block)[0], block->size()); + if (rc == -1) { + *error = std::string("read failed: ") + strerror(errno); + return; + } else if (rc == 0) { + *error = "read failed: EOF"; + return; + } + block->resize(rc); + read_buffer_.append(std::move(block)); + + if (!read_header_ && read_buffer_.size() >= sizeof(amessage)) { + auto header_buf = read_buffer_.take_front(sizeof(amessage)).coalesce(); + CHECK_EQ(sizeof(amessage), header_buf.size()); + read_header_ = std::make_unique(); + memcpy(read_header_.get(), header_buf.data(), sizeof(amessage)); + } + + if (read_header_ && read_buffer_.size() >= read_header_->data_length) { + auto data_chain = read_buffer_.take_front(read_header_->data_length); + + // TODO: Make apacket carry around a IOVector instead of coalescing. + auto payload = data_chain.coalesce(); + auto packet = std::make_unique(); + packet->msg = *read_header_; + packet->payload = std::move(payload); + read_header_ = nullptr; + read_callback_(this, std::move(packet)); + } + } + } + + if (pfds[1].revents) { + uint64_t buf; + rc = adb_read(wake_fd_read_.get(), &buf, sizeof(buf)); + CHECK_EQ(static_cast(sizeof(buf)), rc); + + // We were woken up either to add POLLOUT to our events, or to exit. + // Do nothing. + } + } + } + + void Start() override final { + if (started_.exchange(true)) { + LOG(FATAL) << "Connection started multiple times?"; + } + + thread_ = std::thread([this]() { + std::string error = "connection closed"; + Run(&error); + this->error_callback_(this, error); + }); + } + + void Stop() override final { + SetRunning(false); + WakeThread(); + thread_.join(); + } + + void WakeThread() { + uint64_t buf = 0; + if (TEMP_FAILURE_RETRY(adb_write(wake_fd_write_.get(), &buf, sizeof(buf))) != sizeof(buf)) { + LOG(FATAL) << "failed to wake up thread"; + } + } + + enum class WriteResult { + Error, + Completed, + TryAgain, + }; + + WriteResult DispatchWrites() REQUIRES(write_mutex_) { + CHECK(!write_buffer_.empty()); + if (!writable_) { + return WriteResult::TryAgain; + } + + auto iovs = write_buffer_.iovecs(); + ssize_t rc = adb_writev(fd_.get(), iovs.data(), iovs.size()); + if (rc == -1) { + return WriteResult::Error; + } else if (rc == 0) { + errno = 0; + return WriteResult::Error; + } + + // TODO: Implement a more efficient drop_front? + write_buffer_.take_front(rc); + if (write_buffer_.empty()) { + return WriteResult::Completed; + } + + // There's data left in the range, which means our write returned early. + return WriteResult::TryAgain; + } + + bool Write(std::unique_ptr packet) final { + std::lock_guard lock(write_mutex_); + const char* header_begin = reinterpret_cast(&packet->msg); + const char* header_end = header_begin + sizeof(packet->msg); + auto header_block = std::make_unique(header_begin, header_end); + write_buffer_.append(std::move(header_block)); + if (!packet->payload.empty()) { + write_buffer_.append(std::make_unique(std::move(packet->payload))); + } + return DispatchWrites() != WriteResult::Error; + } + + std::thread thread_; + + std::atomic started_; + std::mutex run_mutex_; + bool running_ GUARDED_BY(run_mutex_); + + std::unique_ptr read_header_; + IOVector read_buffer_; + + unique_fd fd_; + unique_fd wake_fd_read_; + unique_fd wake_fd_write_; + + std::mutex write_mutex_; + bool writable_ GUARDED_BY(write_mutex_) = true; + IOVector write_buffer_ GUARDED_BY(write_mutex_); + + IOVector incoming_queue_; +}; + +std::unique_ptr Connection::FromFd(unique_fd fd) { + return std::make_unique(std::move(fd)); +}