diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp index fa3738d16..32f908636 100644 --- a/adb/fdevent.cpp +++ b/adb/fdevent.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -225,14 +226,22 @@ void fdevent_set(fdevent* fde, unsigned events) { void fdevent_add(fdevent* fde, unsigned events) { check_main_thread(); + CHECK(!(events & FDE_TIMEOUT)); fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events); } void fdevent_del(fdevent* fde, unsigned events) { check_main_thread(); + CHECK(!(events & FDE_TIMEOUT)); fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events); } +void fdevent_set_timeout(fdevent* fde, std::optional timeout) { + check_main_thread(); + fde->timeout = timeout; + fde->last_active = std::chrono::steady_clock::now(); +} + static std::string dump_pollfds(const std::vector& pollfds) { std::string result; for (const auto& pollfd : pollfds) { @@ -248,6 +257,32 @@ static std::string dump_pollfds(const std::vector& pollfds) { return result; } +static std::optional calculate_timeout() { + std::optional result = std::nullopt; + auto now = std::chrono::steady_clock::now(); + check_main_thread(); + + for (const auto& [fd, pollnode] : g_poll_node_map) { + UNUSED(fd); + auto timeout_opt = pollnode.fde->timeout; + if (timeout_opt) { + auto deadline = pollnode.fde->last_active + *timeout_opt; + auto time_left = std::chrono::duration_cast(deadline - now); + if (time_left < std::chrono::milliseconds::zero()) { + time_left = std::chrono::milliseconds::zero(); + } + + if (!result) { + result = time_left; + } else { + result = std::min(*result, time_left); + } + } + } + + return result; +} + static void fdevent_process() { std::vector pollfds; for (const auto& pair : g_poll_node_map) { @@ -256,11 +291,22 @@ static void fdevent_process() { CHECK_GT(pollfds.size(), 0u); D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); - int ret = adb_poll(&pollfds[0], pollfds.size(), -1); + auto timeout = calculate_timeout(); + int timeout_ms; + if (!timeout) { + timeout_ms = -1; + } else { + timeout_ms = timeout->count(); + } + + int ret = adb_poll(&pollfds[0], pollfds.size(), timeout_ms); if (ret == -1) { PLOG(ERROR) << "poll(), ret = " << ret; return; } + + auto post_poll = std::chrono::steady_clock::now(); + for (const auto& pollfd : pollfds) { if (pollfd.revents != 0) { D("for fd %d, revents = %x", pollfd.fd, pollfd.revents); @@ -282,12 +328,24 @@ static void fdevent_process() { events |= FDE_READ | FDE_ERROR; } #endif + auto it = g_poll_node_map.find(pollfd.fd); + CHECK(it != g_poll_node_map.end()); + fdevent* fde = it->second.fde; + + if (events == 0) { + // Check for timeout. + if (fde->timeout) { + auto deadline = fde->last_active + *fde->timeout; + if (deadline < post_poll) { + events |= FDE_TIMEOUT; + } + } + } + if (events != 0) { - auto it = g_poll_node_map.find(pollfd.fd); - CHECK(it != g_poll_node_map.end()); - fdevent* fde = it->second.fde; CHECK_EQ(fde->fd.get(), pollfd.fd); fde->events |= events; + fde->last_active = post_poll; D("%s got events %x", dump_fde(fde).c_str(), events); fde->state |= FDE_PENDING; g_pending_list.push_back(fde); diff --git a/adb/fdevent.h b/adb/fdevent.h index 70e0a96e0..42dbb9e70 100644 --- a/adb/fdevent.h +++ b/adb/fdevent.h @@ -18,17 +18,20 @@ #define __FDEVENT_H #include -#include /* for int64_t */ +#include +#include #include +#include #include #include "adb_unique_fd.h" -/* events that may be observed */ -#define FDE_READ 0x0001 -#define FDE_WRITE 0x0002 -#define FDE_ERROR 0x0004 +// Events that may be observed +#define FDE_READ 0x0001 +#define FDE_WRITE 0x0002 +#define FDE_ERROR 0x0004 +#define FDE_TIMEOUT 0x0008 typedef void (*fd_func)(int fd, unsigned events, void *userdata); typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata); @@ -41,6 +44,8 @@ struct fdevent { uint16_t state = 0; uint16_t events = 0; + std::optional timeout; + std::chrono::steady_clock::time_point last_active; std::variant func; void* arg = nullptr; @@ -62,7 +67,11 @@ void fdevent_set(fdevent *fde, unsigned events); void fdevent_add(fdevent *fde, unsigned events); void fdevent_del(fdevent *fde, unsigned events); -void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms); +// Set a timeout on an fdevent. +// If no events are triggered by the timeout, an FDE_TIMEOUT will be generated. +// Note timeouts are not defused automatically; if a timeout is set on an fdevent, it will +// trigger repeatedly every |timeout| ms. +void fdevent_set_timeout(fdevent* fde, std::optional timeout); // Loop forever, handling events. void fdevent_loop(); diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp index a9746bbc8..682f06102 100644 --- a/adb/fdevent_test.cpp +++ b/adb/fdevent_test.cpp @@ -18,6 +18,7 @@ #include +#include #include #include #include @@ -28,6 +29,8 @@ #include "adb_io.h" #include "fdevent_test.h" +using namespace std::chrono_literals; + class FdHandler { public: FdHandler(int read_fd, int write_fd, bool use_new_callback) @@ -257,3 +260,100 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) { ASSERT_EQ(i, vec[i]); } } + +TEST_F(FdeventTest, timeout) { + fdevent_reset(); + PrepareThread(); + + enum class TimeoutEvent { + read, + timeout, + done, + }; + + struct TimeoutTest { + std::vector> events; + fdevent* fde; + }; + TimeoutTest test; + + int fds[2]; + ASSERT_EQ(0, adb_socketpair(fds)); + static constexpr auto delta = 100ms; + fdevent_run_on_main_thread([&]() { + test.fde = fdevent_create(fds[0], [](fdevent* fde, unsigned events, void* arg) { + auto test = static_cast(arg); + auto now = std::chrono::steady_clock::now(); + CHECK((events & FDE_READ) ^ (events & FDE_TIMEOUT)); + TimeoutEvent event; + if ((events & FDE_READ)) { + char buf[2]; + ssize_t rc = adb_read(fde->fd.get(), buf, sizeof(buf)); + if (rc == 0) { + event = TimeoutEvent::done; + } else if (rc == 1) { + event = TimeoutEvent::read; + } else { + abort(); + } + } else if ((events & FDE_TIMEOUT)) { + event = TimeoutEvent::timeout; + } else { + abort(); + } + + CHECK_EQ(fde, test->fde); + test->events.emplace_back(event, now); + + if (event == TimeoutEvent::done) { + fdevent_destroy(fde); + } + }, &test); + fdevent_add(test.fde, FDE_READ); + fdevent_set_timeout(test.fde, delta); + }); + + ASSERT_EQ(1, adb_write(fds[1], "", 1)); + + // Timeout should happen here + std::this_thread::sleep_for(delta); + + // and another. + std::this_thread::sleep_for(delta); + + // No timeout should happen here. + std::this_thread::sleep_for(delta / 2); + adb_close(fds[1]); + + TerminateThread(); + + ASSERT_EQ(4ULL, test.events.size()); + ASSERT_EQ(TimeoutEvent::read, test.events[0].first); + ASSERT_EQ(TimeoutEvent::timeout, test.events[1].first); + ASSERT_EQ(TimeoutEvent::timeout, test.events[2].first); + ASSERT_EQ(TimeoutEvent::done, test.events[3].first); + + std::vector time_deltas; + for (size_t i = 0; i < test.events.size() - 1; ++i) { + auto before = test.events[i].second; + auto after = test.events[i + 1].second; + auto diff = std::chrono::duration_cast(after - before); + time_deltas.push_back(diff.count()); + } + + std::vector expected = { + delta.count(), + delta.count(), + delta.count() / 2, + }; + + std::vector diff; + ASSERT_EQ(time_deltas.size(), expected.size()); + for (size_t i = 0; i < time_deltas.size(); ++i) { + diff.push_back(std::abs(time_deltas[i] - expected[i])); + } + + ASSERT_LT(diff[0], delta.count() * 0.5); + ASSERT_LT(diff[1], delta.count() * 0.5); + ASSERT_LT(diff[2], delta.count() * 0.5); +} diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp index 7908f82e9..5e28f7601 100644 --- a/adb/socket_test.cpp +++ b/adb/socket_test.cpp @@ -221,6 +221,8 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); ASSERT_EQ(0, adb_close(socket_fd[0])); + std::this_thread::sleep_for(2s); + WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); TerminateThread(); diff --git a/adb/sockets.cpp b/adb/sockets.cpp index f7c39f0a1..420a6d5a3 100644 --- a/adb/sockets.cpp +++ b/adb/sockets.cpp @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -41,6 +42,8 @@ #include "transport.h" #include "types.h" +using namespace std::chrono_literals; + static std::recursive_mutex& local_socket_list_lock = *new std::recursive_mutex(); static unsigned local_socket_next_id = 1; @@ -238,16 +241,64 @@ static void local_socket_ready(asocket* s) { fdevent_add(s->fde, FDE_READ); } +struct ClosingSocket { + std::chrono::steady_clock::time_point begin; +}; + +// The standard (RFC 1122 - 4.2.2.13) says that if we call close on a +// socket while we have pending data, a TCP RST should be sent to the +// other end to notify it that we didn't read all of its data. However, +// this can result in data that we've successfully written out to be dropped +// on the other end. To avoid this, instead of immediately closing a +// socket, call shutdown on it instead, and then read from the file +// descriptor until we hit EOF or an error before closing. +static void deferred_close(unique_fd fd) { + // Shutdown the socket in the outgoing direction only, so that + // we don't have the same problem on the opposite end. + adb_shutdown(fd.get(), SHUT_WR); + auto callback = [](fdevent* fde, unsigned event, void* arg) { + auto socket_info = static_cast(arg); + if (event & FDE_READ) { + ssize_t rc; + char buf[BUFSIZ]; + while ((rc = adb_read(fde->fd.get(), buf, sizeof(buf))) > 0) { + continue; + } + + if (rc == -1 && errno == EAGAIN) { + // There's potentially more data to read. + auto duration = std::chrono::steady_clock::now() - socket_info->begin; + if (duration > 1s) { + LOG(WARNING) << "timeout expired while flushing socket, closing"; + } else { + return; + } + } + } else if (event & FDE_TIMEOUT) { + LOG(WARNING) << "timeout expired while flushing socket, closing"; + } + + // Either there was an error, we hit the end of the socket, or our timeout expired. + fdevent_destroy(fde); + delete socket_info; + }; + + ClosingSocket* socket_info = new ClosingSocket{ + .begin = std::chrono::steady_clock::now(), + }; + + fdevent* fde = fdevent_create(fd.release(), callback, socket_info); + fdevent_add(fde, FDE_READ); + fdevent_set_timeout(fde, 1s); +} + // be sure to hold the socket list lock when calling this static void local_socket_destroy(asocket* s) { int exit_on_close = s->exit_on_close; D("LS(%d): destroying fde.fd=%d", s->id, s->fd); - /* IMPORTANT: the remove closes the fd - ** that belongs to this socket - */ - fdevent_destroy(s->fde); + deferred_close(fdevent_release(s->fde)); remove_socket(s); delete s; diff --git a/adb/sysdeps/chrono.h b/adb/sysdeps/chrono.h index c73a638ad..5c5af7cd6 100644 --- a/adb/sysdeps/chrono.h +++ b/adb/sysdeps/chrono.h @@ -18,29 +18,4 @@ #include -#if defined(_WIN32) -// We don't have C++14 on Windows yet. -// Reimplement std::chrono_literals ourselves until we do. - -// Silence the following warning (which gets promoted to an error): -// error: literal operator suffixes not preceded by ‘_’ are reserved for future standardization -#pragma GCC system_header - -constexpr std::chrono::seconds operator"" s(unsigned long long s) { - return std::chrono::seconds(s); -} - -constexpr std::chrono::duration operator"" s(long double s) { - return std::chrono::duration(s); -} - -constexpr std::chrono::milliseconds operator"" ms(unsigned long long ms) { - return std::chrono::milliseconds(ms); -} - -constexpr std::chrono::duration operator"" ms(long double ms) { - return std::chrono::duration(ms); -} -#else using namespace std::chrono_literals; -#endif diff --git a/adb/test_device.py b/adb/test_device.py index 34f8fd9fa..f95a5b3cd 100755 --- a/adb/test_device.py +++ b/adb/test_device.py @@ -35,6 +35,8 @@ import threading import time import unittest +from datetime import datetime + import adb def requires_root(func): @@ -1335,6 +1337,63 @@ class DeviceOfflineTest(DeviceTest): self.device.forward_remove("tcp:{}".format(local_port)) +class SocketTest(DeviceTest): + def test_socket_flush(self): + """Test that we handle socket closure properly. + + If we're done writing to a socket, closing before the other end has + closed will send a TCP_RST if we have incoming data queued up, which + may result in data that we've written being discarded. + + Bug: http://b/74616284 + """ + s = socket.create_connection(("localhost", 5037)) + + def adb_length_prefixed(string): + encoded = string.encode("utf8") + result = b"%04x%s" % (len(encoded), encoded) + return result + + if "ANDROID_SERIAL" in os.environ: + transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] + else: + transport_string = "host:transport-any" + + s.sendall(adb_length_prefixed(transport_string)) + response = s.recv(4) + self.assertEquals(b"OKAY", response) + + shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" + s.sendall(adb_length_prefixed(shell_string)) + + response = s.recv(4) + self.assertEquals(b"OKAY", response) + + # Spawn a thread that dumps garbage into the socket until failure. + def spam(): + buf = b"\0" * 16384 + try: + while True: + s.sendall(buf) + except Exception as ex: + print(ex) + + thread = threading.Thread(target=spam) + thread.start() + + time.sleep(1) + + received = b"" + while True: + read = s.recv(512) + if len(read) == 0: + break + received += read + + self.assertEquals(1024 * 1024 + len("foo\n"), len(received)) + thread.join() + + if sys.platform == "win32": # From https://stackoverflow.com/a/38749458 import os