diff --git a/adb/Android.bp b/adb/Android.bp index d7eb870e9..00e98feeb 100644 --- a/adb/Android.bp +++ b/adb/Android.bp @@ -25,6 +25,7 @@ cc_defaults { "-Wvla", ], rtti: true, + cpp_std: "gnu++17", use_version_lib: true, @@ -313,6 +314,7 @@ cc_library_static { srcs: libadb_srcs + libadb_posix_srcs + [ "daemon/auth.cpp", "daemon/jdwp_service.cpp", + "daemon/usb.cpp", "daemon/usb_ffs.cpp", "daemon/usb_legacy.cpp", ], diff --git a/adb/daemon/usb.cpp b/adb/daemon/usb.cpp new file mode 100644 index 000000000..f603d1359 --- /dev/null +++ b/adb/daemon/usb.cpp @@ -0,0 +1,585 @@ +/* + * Copyright (C) 2018 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define TRACE_TAG USB + +#include "sysdeps.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +#include + +#include "adb_unique_fd.h" +#include "adb_utils.h" +#include "sysdeps/chrono.h" +#include "transport.h" +#include "types.h" + +using android::base::StringPrintf; + +static constexpr size_t kUsbReadQueueDepth = 16; +static constexpr size_t kUsbReadSize = 16384; + +static constexpr size_t kUsbWriteQueueDepth = 16; + +static const char* to_string(enum usb_functionfs_event_type type) { + switch (type) { + case FUNCTIONFS_BIND: + return "FUNCTIONFS_BIND"; + case FUNCTIONFS_UNBIND: + return "FUNCTIONFS_UNBIND"; + case FUNCTIONFS_ENABLE: + return "FUNCTIONFS_ENABLE"; + case FUNCTIONFS_DISABLE: + return "FUNCTIONFS_DISABLE"; + case FUNCTIONFS_SETUP: + return "FUNCTIONFS_SETUP"; + case FUNCTIONFS_SUSPEND: + return "FUNCTIONFS_SUSPEND"; + case FUNCTIONFS_RESUME: + return "FUNCTIONFS_RESUME"; + } +} + +enum class TransferDirection : uint64_t { + READ = 0, + WRITE = 1, +}; + +struct TransferId { + TransferDirection direction : 1; + uint64_t id : 63; + + TransferId() : TransferId(TransferDirection::READ, 0) {} + + private: + TransferId(TransferDirection direction, uint64_t id) : direction(direction), id(id) {} + + public: + explicit operator uint64_t() const { + uint64_t result; + static_assert(sizeof(*this) == sizeof(result)); + memcpy(&result, this, sizeof(*this)); + return result; + } + + static TransferId read(uint64_t id) { return TransferId(TransferDirection::READ, id); } + static TransferId write(uint64_t id) { return TransferId(TransferDirection::WRITE, id); } + + static TransferId from_value(uint64_t value) { + TransferId result; + memcpy(&result, &value, sizeof(value)); + return result; + } +}; + +struct IoBlock { + bool pending; + struct iocb control; + Block payload; + + TransferId id() const { return TransferId::from_value(control.aio_data); } +}; + +struct ScopedAioContext { + ScopedAioContext() = default; + ~ScopedAioContext() { reset(); } + + ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); } + ScopedAioContext(const ScopedAioContext& copy) = delete; + + ScopedAioContext& operator=(ScopedAioContext&& move) { + reset(move.release()); + return *this; + } + ScopedAioContext& operator=(const ScopedAioContext& copy) = delete; + + static ScopedAioContext Create(size_t max_events) { + aio_context_t ctx = 0; + if (io_setup(max_events, &ctx) != 0) { + PLOG(FATAL) << "failed to create aio_context_t"; + } + ScopedAioContext result; + result.reset(ctx); + return result; + } + + aio_context_t release() { + aio_context_t result = context_; + context_ = 0; + return result; + } + + void reset(aio_context_t new_context = 0) { + if (context_ != 0) { + io_destroy(context_); + } + + context_ = new_context; + } + + aio_context_t get() { return context_; } + + private: + aio_context_t context_ = 0; +}; + +struct UsbFfsConnection : public Connection { + UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write, + std::promise destruction_notifier) + : stopped_(false), + destruction_notifier_(std::move(destruction_notifier)), + control_fd_(std::move(control)), + read_fd_(std::move(read)), + write_fd_(std::move(write)) { + LOG(INFO) << "UsbFfsConnection constructed"; + event_fd_.reset(eventfd(0, EFD_CLOEXEC)); + if (event_fd_ == -1) { + PLOG(FATAL) << "failed to create eventfd"; + } + + aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth); + } + + ~UsbFfsConnection() { + LOG(INFO) << "UsbFfsConnection being destroyed"; + Stop(); + monitor_thread_.join(); + destruction_notifier_.set_value(); + } + + virtual bool Write(std::unique_ptr packet) override final { + LOG(DEBUG) << "USB write: " << dump_header(&packet->msg); + Block header(sizeof(packet->msg)); + memcpy(header.data(), &packet->msg, sizeof(packet->msg)); + + std::lock_guard lock(write_mutex_); + write_requests_.push_back(CreateWriteBlock(std::move(header), next_write_id_++)); + if (!packet->payload.empty()) { + write_requests_.push_back( + CreateWriteBlock(std::move(packet->payload), next_write_id_++)); + } + SubmitWrites(); + return true; + } + + virtual void Start() override final { StartMonitor(); } + + virtual void Stop() override final { + if (stopped_.exchange(true)) { + return; + } + stopped_ = true; + uint64_t notify = 1; + ssize_t rc = adb_write(event_fd_.get(), ¬ify, sizeof(notify)); + if (rc < 0) { + PLOG(FATAL) << "failed to notify eventfd to stop UsbFfsConnection"; + } + CHECK_EQ(static_cast(rc), sizeof(notify)); + } + + private: + void StartMonitor() { + // This is a bit of a mess. + // It's possible for io_submit to end up blocking, if we call it as the endpoint + // becomes disabled. Work around this by having a monitor thread to listen for functionfs + // lifecycle events. If we notice an error condition (either we've become disabled, or we + // were never enabled in the first place), we send interruption signals to the worker thread + // until it dies, and then report failure to the transport via HandleError, which will + // eventually result in the transport being destroyed, which will result in UsbFfsConnection + // being destroyed, which unblocks the open thread and restarts this entire process. + static constexpr int kInterruptionSignal = SIGUSR1; + static std::once_flag handler_once; + std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); }); + + monitor_thread_ = std::thread([this]() { + adb_thread_setname("UsbFfs-monitor"); + + bool bound = false; + bool started = false; + bool running = true; + while (running) { + if (!bound || !started) { + adb_pollfd pfd = {.fd = control_fd_.get(), .events = POLLIN, .revents = 0}; + int rc = TEMP_FAILURE_RETRY(adb_poll(&pfd, 1, 5000 /*ms*/)); + if (rc == -1) { + PLOG(FATAL) << "poll on USB control fd failed"; + } else if (rc == 0) { + // Something in the kernel presumably went wrong. + // Close our endpoints, wait for a bit, and then try again. + aio_context_.reset(); + read_fd_.reset(); + write_fd_.reset(); + control_fd_.reset(); + std::this_thread::sleep_for(5s); + HandleError("didn't receive FUNCTIONFS_ENABLE, retrying"); + return; + } + } + + struct usb_functionfs_event event; + if (TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event))) != + sizeof(event)) { + PLOG(FATAL) << "failed to read functionfs event"; + } + + LOG(INFO) << "USB event: " + << to_string(static_cast(event.type)); + + switch (event.type) { + case FUNCTIONFS_BIND: + CHECK(!started) << "received FUNCTIONFS_ENABLE while already bound?"; + bound = true; + break; + + case FUNCTIONFS_ENABLE: + CHECK(!started) << "received FUNCTIONFS_ENABLE while already running?"; + started = true; + StartWorker(); + break; + + case FUNCTIONFS_DISABLE: + running = false; + break; + } + } + + pthread_t worker_thread_handle = worker_thread_.native_handle(); + while (true) { + int rc = pthread_kill(worker_thread_handle, kInterruptionSignal); + if (rc != 0) { + LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc); + break; + } + + std::this_thread::sleep_for(100ms); + + rc = pthread_kill(worker_thread_handle, 0); + if (rc == 0) { + continue; + } else if (rc == ESRCH) { + break; + } else { + LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc); + } + } + + worker_thread_.join(); + + aio_context_.reset(); + read_fd_.reset(); + write_fd_.reset(); + }); + } + + void StartWorker() { + worker_thread_ = std::thread([this]() { + adb_thread_setname("UsbFfs-worker"); + for (size_t i = 0; i < kUsbReadQueueDepth; ++i) { + read_requests_[i] = CreateReadBlock(next_read_id_++); + SubmitRead(&read_requests_[i]); + } + + while (!stopped_) { + uint64_t dummy; + ssize_t rc = adb_read(event_fd_.get(), &dummy, sizeof(dummy)); + if (rc == -1) { + PLOG(FATAL) << "failed to read from eventfd"; + } else if (rc == 0) { + LOG(FATAL) << "hit EOF on eventfd"; + } + + WaitForEvents(); + } + }); + } + + void PrepareReadBlock(IoBlock* block, uint64_t id) { + block->pending = false; + block->payload.resize(kUsbReadSize); + block->control.aio_data = static_cast(TransferId::read(id)); + block->control.aio_buf = reinterpret_cast(block->payload.data()); + block->control.aio_nbytes = block->payload.size(); + } + + IoBlock CreateReadBlock(uint64_t id) { + IoBlock block; + PrepareReadBlock(&block, id); + block.control.aio_rw_flags = 0; + block.control.aio_lio_opcode = IOCB_CMD_PREAD; + block.control.aio_reqprio = 0; + block.control.aio_fildes = read_fd_.get(); + block.control.aio_offset = 0; + block.control.aio_flags = IOCB_FLAG_RESFD; + block.control.aio_resfd = event_fd_.get(); + return block; + } + + void WaitForEvents() { + static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth; + struct io_event events[kMaxEvents]; + struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0}; + int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout); + if (rc == -1) { + HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno))); + return; + } + + for (int event_idx = 0; event_idx < rc; ++event_idx) { + auto& event = events[event_idx]; + TransferId id = TransferId::from_value(event.data); + + if (event.res < 0) { + std::string error = + StringPrintf("%s %" PRIu64 " failed with error %s", + id.direction == TransferDirection::READ ? "read" : "write", + id.id, strerror(-event.res)); + HandleError(error); + return; + } + + if (id.direction == TransferDirection::READ) { + HandleRead(id, event.res); + } else { + HandleWrite(id); + } + } + } + + void HandleRead(TransferId id, int64_t size) { + uint64_t read_idx = id.id % kUsbReadQueueDepth; + IoBlock* block = &read_requests_[read_idx]; + block->pending = false; + block->payload.resize(size); + + // Notification for completed reads can be received out of order. + if (block->id().id != needed_read_id_) { + LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for " + << needed_read_id_; + return; + } + + for (uint64_t id = needed_read_id_;; ++id) { + size_t read_idx = id % kUsbReadQueueDepth; + IoBlock* current_block = &read_requests_[read_idx]; + if (current_block->pending) { + break; + } + ProcessRead(current_block); + ++needed_read_id_; + } + } + + void ProcessRead(IoBlock* block) { + if (!block->payload.empty()) { + if (!incoming_header_.has_value()) { + CHECK_EQ(sizeof(amessage), block->payload.size()); + amessage msg; + memcpy(&msg, block->payload.data(), sizeof(amessage)); + LOG(DEBUG) << "USB read:" << dump_header(&msg); + incoming_header_ = msg; + } else { + size_t bytes_left = incoming_header_->data_length - incoming_payload_.size(); + Block payload = std::move(block->payload); + CHECK_LE(payload.size(), bytes_left); + incoming_payload_.append(std::make_unique(std::move(payload))); + } + + if (incoming_header_->data_length == incoming_payload_.size()) { + auto packet = std::make_unique(); + packet->msg = *incoming_header_; + + // TODO: Make apacket contain an IOVector so we don't have to coalesce. + packet->payload = incoming_payload_.coalesce(); + read_callback_(this, std::move(packet)); + + incoming_header_.reset(); + incoming_payload_.clear(); + } + } + + PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth); + SubmitRead(block); + } + + void SubmitRead(IoBlock* block) { + block->pending = true; + struct iocb* iocb = &block->control; + if (io_submit(aio_context_.get(), 1, &iocb) != 1) { + HandleError(StringPrintf("failed to submit read: %s", strerror(errno))); + return; + } + } + + void HandleWrite(TransferId id) { + std::lock_guard lock(write_mutex_); + auto it = + std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) { + return static_cast(req->id()) == static_cast(id); + }); + CHECK(it != write_requests_.end()); + + write_requests_.erase(it); + size_t outstanding_writes = --writes_submitted_; + LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes; + + SubmitWrites(); + } + + std::unique_ptr CreateWriteBlock(Block payload, uint64_t id) { + auto block = std::make_unique(); + block->payload = std::move(payload); + block->control.aio_data = static_cast(TransferId::write(id)); + block->control.aio_rw_flags = 0; + block->control.aio_lio_opcode = IOCB_CMD_PWRITE; + block->control.aio_reqprio = 0; + block->control.aio_fildes = write_fd_.get(); + block->control.aio_buf = reinterpret_cast(block->payload.data()); + block->control.aio_nbytes = block->payload.size(); + block->control.aio_offset = 0; + block->control.aio_flags = IOCB_FLAG_RESFD; + block->control.aio_resfd = event_fd_.get(); + return block; + } + + void SubmitWrites() REQUIRES(write_mutex_) { + if (writes_submitted_ == kUsbWriteQueueDepth) { + return; + } + + ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_, + write_requests_.size() - writes_submitted_); + CHECK_GE(writes_to_submit, 0); + if (writes_to_submit == 0) { + return; + } + + struct iocb* iocbs[kUsbWriteQueueDepth]; + for (int i = 0; i < writes_to_submit; ++i) { + CHECK(!write_requests_[writes_submitted_ + i]->pending); + write_requests_[writes_submitted_ + i]->pending = true; + iocbs[i] = &write_requests_[writes_submitted_ + i]->control; + LOG(VERBOSE) << "submitting write_request " << static_cast(iocbs[i]); + } + + int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs); + if (rc == -1) { + HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno))); + return; + } else if (rc != writes_to_submit) { + LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit + << ", actually submitted " << rc; + } + + writes_submitted_ += rc; + } + + void HandleError(const std::string& error) { + std::call_once(error_flag_, [&]() { + error_callback_(this, error); + if (!stopped_) { + Stop(); + } + }); + } + + std::thread monitor_thread_; + std::thread worker_thread_; + + std::atomic stopped_; + std::promise destruction_notifier_; + std::once_flag error_flag_; + + unique_fd event_fd_; + + ScopedAioContext aio_context_; + unique_fd control_fd_; + unique_fd read_fd_; + unique_fd write_fd_; + + std::optional incoming_header_; + IOVector incoming_payload_; + + std::array read_requests_; + IOVector read_data_; + + // ID of the next request that we're going to send out. + size_t next_read_id_ = 0; + + // ID of the next packet we're waiting for. + size_t needed_read_id_ = 0; + + std::mutex write_mutex_; + std::deque> write_requests_ GUARDED_BY(write_mutex_); + size_t next_write_id_ GUARDED_BY(write_mutex_) = 0; + size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0; +}; + +static void usb_ffs_open_thread() { + adb_thread_setname("usb ffs open"); + + while (true) { + unique_fd control; + unique_fd bulk_out; + unique_fd bulk_in; + if (!open_functionfs(&control, &bulk_out, &bulk_in)) { + std::this_thread::sleep_for(1s); + continue; + } + + atransport* transport = new atransport(); + transport->serial = "UsbFfs"; + std::promise destruction_notifier; + std::future future = destruction_notifier.get_future(); + transport->SetConnection(std::make_unique( + std::move(control), std::move(bulk_out), std::move(bulk_in), + std::move(destruction_notifier))); + register_transport(transport); + future.wait(); + } +} + +void usb_init_legacy(); +void usb_init() { + if (!android::base::GetBoolProperty("persist.adb.nonblocking_ffs", false)) { + usb_init_legacy(); + } else { + std::thread(usb_ffs_open_thread).detach(); + } +} diff --git a/adb/daemon/usb_legacy.cpp b/adb/daemon/usb_legacy.cpp index fcd26524e..7ace59d3e 100644 --- a/adb/daemon/usb_legacy.cpp +++ b/adb/daemon/usb_legacy.cpp @@ -94,8 +94,8 @@ static bool init_functionfs(struct usb_handle* h) { return true; } -static void usb_ffs_open_thread(usb_handle* usb) { - adb_thread_setname("usb ffs open"); +static void usb_legacy_ffs_open_thread(usb_handle* usb) { + adb_thread_setname("usb legacy ffs open"); while (true) { // wait until the USB device needs opening @@ -285,12 +285,12 @@ usb_handle* create_usb_handle(unsigned num_bufs, unsigned io_size) { return h; } -void usb_init() { - D("[ usb_init - using FunctionFS ]"); +void usb_init_legacy() { + D("[ usb_init - using legacy FunctionFS ]"); dummy_fd.reset(adb_open("/dev/null", O_WRONLY | O_CLOEXEC)); CHECK_NE(-1, dummy_fd.get()); - std::thread(usb_ffs_open_thread, create_usb_handle(USB_FFS_NUM_BUFS, USB_FFS_BULK_SIZE)) + std::thread(usb_legacy_ffs_open_thread, create_usb_handle(USB_FFS_NUM_BUFS, USB_FFS_BULK_SIZE)) .detach(); } diff --git a/adb/transport.cpp b/adb/transport.cpp index d41f9c8bf..051ee2f78 100644 --- a/adb/transport.cpp +++ b/adb/transport.cpp @@ -52,7 +52,6 @@ #include "fdevent.h" #include "sysdeps/chrono.h" -static void register_transport(atransport* transport); static void remove_transport(atransport* transport); static void transport_unref(atransport* transport); @@ -671,7 +670,7 @@ static void transport_registration_func(int _fd, unsigned ev, void*) { return true; }); t->connection()->SetErrorCallback([t](Connection*, const std::string& error) { - D("%s: connection terminated: %s", t->serial.c_str(), error.c_str()); + LOG(INFO) << t->serial_name() << ": connection terminated: " << error; fdevent_run_on_main_thread([t]() { handle_offline(t); transport_unref(t); @@ -730,7 +729,7 @@ void kick_all_transports() { } /* the fdevent select pump is single threaded */ -static void register_transport(atransport* transport) { +void register_transport(atransport* transport) { tmsg m; m.transport = transport; m.action = 1; @@ -758,6 +757,7 @@ static void transport_unref(atransport* t) { CHECK_GT(t->ref_count, 0u); t->ref_count--; if (t->ref_count == 0) { + LOG(INFO) << "destroying transport " << t->serial_name(); t->connection()->Stop(); #if ADB_HOST if (t->IsTcpDevice() && !t->kicked()) { @@ -1293,6 +1293,7 @@ void register_usb_transport(usb_handle* usb, const char* serial, const char* dev register_transport(t); } +#if ADB_HOST // This should only be used for transports with connection_state == kCsNoPerm. void unregister_usb_transport(usb_handle* usb) { std::lock_guard lock(transport_lock); @@ -1304,6 +1305,7 @@ void unregister_usb_transport(usb_handle* usb) { return false; }); } +#endif bool check_header(apacket* p, atransport* t) { if (p->msg.magic != (p->msg.command ^ 0xffffffff)) { diff --git a/adb/transport.h b/adb/transport.h index f854ce5c2..1350e63f6 100644 --- a/adb/transport.h +++ b/adb/transport.h @@ -362,6 +362,7 @@ atransport* find_transport(const char* serial); void kick_all_tcp_devices(); void kick_all_transports(); +void register_transport(atransport* transport); void register_usb_transport(usb_handle* h, const char* serial, const char* devpath, unsigned writeable);