From 1a90118bbb93566eec157a3c356dcf82d15c269f Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Thu, 31 Jan 2019 15:51:52 -0800 Subject: [PATCH 1/2] adb: implement fdevent_set_timeout. Test: adb_test Change-Id: I9ae61465617b6a2fe0a1c161ad88c4feae49ec80 --- adb/fdevent.cpp | 66 ++++++++++++++++++++++++++-- adb/fdevent.h | 21 ++++++--- adb/fdevent_test.cpp | 100 +++++++++++++++++++++++++++++++++++++++++++ adb/sysdeps/chrono.h | 25 ----------- 4 files changed, 177 insertions(+), 35 deletions(-) diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp index fa3738d16..32f908636 100644 --- a/adb/fdevent.cpp +++ b/adb/fdevent.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -225,14 +226,22 @@ void fdevent_set(fdevent* fde, unsigned events) { void fdevent_add(fdevent* fde, unsigned events) { check_main_thread(); + CHECK(!(events & FDE_TIMEOUT)); fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events); } void fdevent_del(fdevent* fde, unsigned events) { check_main_thread(); + CHECK(!(events & FDE_TIMEOUT)); fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events); } +void fdevent_set_timeout(fdevent* fde, std::optional timeout) { + check_main_thread(); + fde->timeout = timeout; + fde->last_active = std::chrono::steady_clock::now(); +} + static std::string dump_pollfds(const std::vector& pollfds) { std::string result; for (const auto& pollfd : pollfds) { @@ -248,6 +257,32 @@ static std::string dump_pollfds(const std::vector& pollfds) { return result; } +static std::optional calculate_timeout() { + std::optional result = std::nullopt; + auto now = std::chrono::steady_clock::now(); + check_main_thread(); + + for (const auto& [fd, pollnode] : g_poll_node_map) { + UNUSED(fd); + auto timeout_opt = pollnode.fde->timeout; + if (timeout_opt) { + auto deadline = pollnode.fde->last_active + *timeout_opt; + auto time_left = std::chrono::duration_cast(deadline - now); + if (time_left < std::chrono::milliseconds::zero()) { + time_left = std::chrono::milliseconds::zero(); + } + + if (!result) { + result = time_left; + } else { + result = std::min(*result, time_left); + } + } + } + + return result; +} + static void fdevent_process() { std::vector pollfds; for (const auto& pair : g_poll_node_map) { @@ -256,11 +291,22 @@ static void fdevent_process() { CHECK_GT(pollfds.size(), 0u); D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); - int ret = adb_poll(&pollfds[0], pollfds.size(), -1); + auto timeout = calculate_timeout(); + int timeout_ms; + if (!timeout) { + timeout_ms = -1; + } else { + timeout_ms = timeout->count(); + } + + int ret = adb_poll(&pollfds[0], pollfds.size(), timeout_ms); if (ret == -1) { PLOG(ERROR) << "poll(), ret = " << ret; return; } + + auto post_poll = std::chrono::steady_clock::now(); + for (const auto& pollfd : pollfds) { if (pollfd.revents != 0) { D("for fd %d, revents = %x", pollfd.fd, pollfd.revents); @@ -282,12 +328,24 @@ static void fdevent_process() { events |= FDE_READ | FDE_ERROR; } #endif + auto it = g_poll_node_map.find(pollfd.fd); + CHECK(it != g_poll_node_map.end()); + fdevent* fde = it->second.fde; + + if (events == 0) { + // Check for timeout. + if (fde->timeout) { + auto deadline = fde->last_active + *fde->timeout; + if (deadline < post_poll) { + events |= FDE_TIMEOUT; + } + } + } + if (events != 0) { - auto it = g_poll_node_map.find(pollfd.fd); - CHECK(it != g_poll_node_map.end()); - fdevent* fde = it->second.fde; CHECK_EQ(fde->fd.get(), pollfd.fd); fde->events |= events; + fde->last_active = post_poll; D("%s got events %x", dump_fde(fde).c_str(), events); fde->state |= FDE_PENDING; g_pending_list.push_back(fde); diff --git a/adb/fdevent.h b/adb/fdevent.h index 70e0a96e0..42dbb9e70 100644 --- a/adb/fdevent.h +++ b/adb/fdevent.h @@ -18,17 +18,20 @@ #define __FDEVENT_H #include -#include /* for int64_t */ +#include +#include #include +#include #include #include "adb_unique_fd.h" -/* events that may be observed */ -#define FDE_READ 0x0001 -#define FDE_WRITE 0x0002 -#define FDE_ERROR 0x0004 +// Events that may be observed +#define FDE_READ 0x0001 +#define FDE_WRITE 0x0002 +#define FDE_ERROR 0x0004 +#define FDE_TIMEOUT 0x0008 typedef void (*fd_func)(int fd, unsigned events, void *userdata); typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata); @@ -41,6 +44,8 @@ struct fdevent { uint16_t state = 0; uint16_t events = 0; + std::optional timeout; + std::chrono::steady_clock::time_point last_active; std::variant func; void* arg = nullptr; @@ -62,7 +67,11 @@ void fdevent_set(fdevent *fde, unsigned events); void fdevent_add(fdevent *fde, unsigned events); void fdevent_del(fdevent *fde, unsigned events); -void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms); +// Set a timeout on an fdevent. +// If no events are triggered by the timeout, an FDE_TIMEOUT will be generated. +// Note timeouts are not defused automatically; if a timeout is set on an fdevent, it will +// trigger repeatedly every |timeout| ms. +void fdevent_set_timeout(fdevent* fde, std::optional timeout); // Loop forever, handling events. void fdevent_loop(); diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp index a9746bbc8..682f06102 100644 --- a/adb/fdevent_test.cpp +++ b/adb/fdevent_test.cpp @@ -18,6 +18,7 @@ #include +#include #include #include #include @@ -28,6 +29,8 @@ #include "adb_io.h" #include "fdevent_test.h" +using namespace std::chrono_literals; + class FdHandler { public: FdHandler(int read_fd, int write_fd, bool use_new_callback) @@ -257,3 +260,100 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) { ASSERT_EQ(i, vec[i]); } } + +TEST_F(FdeventTest, timeout) { + fdevent_reset(); + PrepareThread(); + + enum class TimeoutEvent { + read, + timeout, + done, + }; + + struct TimeoutTest { + std::vector> events; + fdevent* fde; + }; + TimeoutTest test; + + int fds[2]; + ASSERT_EQ(0, adb_socketpair(fds)); + static constexpr auto delta = 100ms; + fdevent_run_on_main_thread([&]() { + test.fde = fdevent_create(fds[0], [](fdevent* fde, unsigned events, void* arg) { + auto test = static_cast(arg); + auto now = std::chrono::steady_clock::now(); + CHECK((events & FDE_READ) ^ (events & FDE_TIMEOUT)); + TimeoutEvent event; + if ((events & FDE_READ)) { + char buf[2]; + ssize_t rc = adb_read(fde->fd.get(), buf, sizeof(buf)); + if (rc == 0) { + event = TimeoutEvent::done; + } else if (rc == 1) { + event = TimeoutEvent::read; + } else { + abort(); + } + } else if ((events & FDE_TIMEOUT)) { + event = TimeoutEvent::timeout; + } else { + abort(); + } + + CHECK_EQ(fde, test->fde); + test->events.emplace_back(event, now); + + if (event == TimeoutEvent::done) { + fdevent_destroy(fde); + } + }, &test); + fdevent_add(test.fde, FDE_READ); + fdevent_set_timeout(test.fde, delta); + }); + + ASSERT_EQ(1, adb_write(fds[1], "", 1)); + + // Timeout should happen here + std::this_thread::sleep_for(delta); + + // and another. + std::this_thread::sleep_for(delta); + + // No timeout should happen here. + std::this_thread::sleep_for(delta / 2); + adb_close(fds[1]); + + TerminateThread(); + + ASSERT_EQ(4ULL, test.events.size()); + ASSERT_EQ(TimeoutEvent::read, test.events[0].first); + ASSERT_EQ(TimeoutEvent::timeout, test.events[1].first); + ASSERT_EQ(TimeoutEvent::timeout, test.events[2].first); + ASSERT_EQ(TimeoutEvent::done, test.events[3].first); + + std::vector time_deltas; + for (size_t i = 0; i < test.events.size() - 1; ++i) { + auto before = test.events[i].second; + auto after = test.events[i + 1].second; + auto diff = std::chrono::duration_cast(after - before); + time_deltas.push_back(diff.count()); + } + + std::vector expected = { + delta.count(), + delta.count(), + delta.count() / 2, + }; + + std::vector diff; + ASSERT_EQ(time_deltas.size(), expected.size()); + for (size_t i = 0; i < time_deltas.size(); ++i) { + diff.push_back(std::abs(time_deltas[i] - expected[i])); + } + + ASSERT_LT(diff[0], delta.count() * 0.5); + ASSERT_LT(diff[1], delta.count() * 0.5); + ASSERT_LT(diff[2], delta.count() * 0.5); +} diff --git a/adb/sysdeps/chrono.h b/adb/sysdeps/chrono.h index c73a638ad..5c5af7cd6 100644 --- a/adb/sysdeps/chrono.h +++ b/adb/sysdeps/chrono.h @@ -18,29 +18,4 @@ #include -#if defined(_WIN32) -// We don't have C++14 on Windows yet. -// Reimplement std::chrono_literals ourselves until we do. - -// Silence the following warning (which gets promoted to an error): -// error: literal operator suffixes not preceded by ‘_’ are reserved for future standardization -#pragma GCC system_header - -constexpr std::chrono::seconds operator"" s(unsigned long long s) { - return std::chrono::seconds(s); -} - -constexpr std::chrono::duration operator"" s(long double s) { - return std::chrono::duration(s); -} - -constexpr std::chrono::milliseconds operator"" ms(unsigned long long ms) { - return std::chrono::milliseconds(ms); -} - -constexpr std::chrono::duration operator"" ms(long double ms) { - return std::chrono::duration(ms); -} -#else using namespace std::chrono_literals; -#endif From 74b7ec72f8a535c6ab63b03316d1de0414f42b63 Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Fri, 11 Jan 2019 14:42:08 -0800 Subject: [PATCH 2/2] adb: don't close sockets before hitting EOF. Reimplement commit ffc11d3cf3643c24780f902386be61851531f022 using fdevent. The previous attempt was reverted because we were blindly continuing when revents & POLLIN == 0, which ignored POLLHUP/POLLERR, leading to spinloops when the opposite end of the file descriptor was shutdown when we had no data left to read. This patch reimplements the functionality implemented by that commit using fdevent, which gets us detection of spin loops for free. Bug: http://b/74616284 Test: ./test_device.py Change-Id: I1abd671fef4c29e99dad968aa66bb754ca382578 --- adb/socket_test.cpp | 2 ++ adb/sockets.cpp | 59 ++++++++++++++++++++++++++++++++++++++++++--- adb/test_device.py | 59 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 4 deletions(-) diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp index 7908f82e9..5e28f7601 100644 --- a/adb/socket_test.cpp +++ b/adb/socket_test.cpp @@ -221,6 +221,8 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); ASSERT_EQ(0, adb_close(socket_fd[0])); + std::this_thread::sleep_for(2s); + WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); TerminateThread(); diff --git a/adb/sockets.cpp b/adb/sockets.cpp index f7c39f0a1..420a6d5a3 100644 --- a/adb/sockets.cpp +++ b/adb/sockets.cpp @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -41,6 +42,8 @@ #include "transport.h" #include "types.h" +using namespace std::chrono_literals; + static std::recursive_mutex& local_socket_list_lock = *new std::recursive_mutex(); static unsigned local_socket_next_id = 1; @@ -238,16 +241,64 @@ static void local_socket_ready(asocket* s) { fdevent_add(s->fde, FDE_READ); } +struct ClosingSocket { + std::chrono::steady_clock::time_point begin; +}; + +// The standard (RFC 1122 - 4.2.2.13) says that if we call close on a +// socket while we have pending data, a TCP RST should be sent to the +// other end to notify it that we didn't read all of its data. However, +// this can result in data that we've successfully written out to be dropped +// on the other end. To avoid this, instead of immediately closing a +// socket, call shutdown on it instead, and then read from the file +// descriptor until we hit EOF or an error before closing. +static void deferred_close(unique_fd fd) { + // Shutdown the socket in the outgoing direction only, so that + // we don't have the same problem on the opposite end. + adb_shutdown(fd.get(), SHUT_WR); + auto callback = [](fdevent* fde, unsigned event, void* arg) { + auto socket_info = static_cast(arg); + if (event & FDE_READ) { + ssize_t rc; + char buf[BUFSIZ]; + while ((rc = adb_read(fde->fd.get(), buf, sizeof(buf))) > 0) { + continue; + } + + if (rc == -1 && errno == EAGAIN) { + // There's potentially more data to read. + auto duration = std::chrono::steady_clock::now() - socket_info->begin; + if (duration > 1s) { + LOG(WARNING) << "timeout expired while flushing socket, closing"; + } else { + return; + } + } + } else if (event & FDE_TIMEOUT) { + LOG(WARNING) << "timeout expired while flushing socket, closing"; + } + + // Either there was an error, we hit the end of the socket, or our timeout expired. + fdevent_destroy(fde); + delete socket_info; + }; + + ClosingSocket* socket_info = new ClosingSocket{ + .begin = std::chrono::steady_clock::now(), + }; + + fdevent* fde = fdevent_create(fd.release(), callback, socket_info); + fdevent_add(fde, FDE_READ); + fdevent_set_timeout(fde, 1s); +} + // be sure to hold the socket list lock when calling this static void local_socket_destroy(asocket* s) { int exit_on_close = s->exit_on_close; D("LS(%d): destroying fde.fd=%d", s->id, s->fd); - /* IMPORTANT: the remove closes the fd - ** that belongs to this socket - */ - fdevent_destroy(s->fde); + deferred_close(fdevent_release(s->fde)); remove_socket(s); delete s; diff --git a/adb/test_device.py b/adb/test_device.py index 34f8fd9fa..f95a5b3cd 100755 --- a/adb/test_device.py +++ b/adb/test_device.py @@ -35,6 +35,8 @@ import threading import time import unittest +from datetime import datetime + import adb def requires_root(func): @@ -1335,6 +1337,63 @@ class DeviceOfflineTest(DeviceTest): self.device.forward_remove("tcp:{}".format(local_port)) +class SocketTest(DeviceTest): + def test_socket_flush(self): + """Test that we handle socket closure properly. + + If we're done writing to a socket, closing before the other end has + closed will send a TCP_RST if we have incoming data queued up, which + may result in data that we've written being discarded. + + Bug: http://b/74616284 + """ + s = socket.create_connection(("localhost", 5037)) + + def adb_length_prefixed(string): + encoded = string.encode("utf8") + result = b"%04x%s" % (len(encoded), encoded) + return result + + if "ANDROID_SERIAL" in os.environ: + transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] + else: + transport_string = "host:transport-any" + + s.sendall(adb_length_prefixed(transport_string)) + response = s.recv(4) + self.assertEquals(b"OKAY", response) + + shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" + s.sendall(adb_length_prefixed(shell_string)) + + response = s.recv(4) + self.assertEquals(b"OKAY", response) + + # Spawn a thread that dumps garbage into the socket until failure. + def spam(): + buf = b"\0" * 16384 + try: + while True: + s.sendall(buf) + except Exception as ex: + print(ex) + + thread = threading.Thread(target=spam) + thread.start() + + time.sleep(1) + + received = b"" + while True: + read = s.recv(512) + if len(read) == 0: + break + received += read + + self.assertEquals(1024 * 1024 + len("foo\n"), len(received)) + thread.join() + + if sys.platform == "win32": # From https://stackoverflow.com/a/38749458 import os