diff --git a/adb/adb.h b/adb/adb.h index a6d04631d..c74fa99aa 100644 --- a/adb/adb.h +++ b/adb/adb.h @@ -196,10 +196,6 @@ ConnectionState connection_state(atransport* t); extern const char* adb_device_banner; -#if !ADB_HOST -extern int SHELL_EXIT_NOTIFY_FD; -#endif // !ADB_HOST - #define CHUNK_SIZE (64 * 1024) #if !ADB_HOST diff --git a/adb/daemon/shell_service.cpp b/adb/daemon/shell_service.cpp index c04ceafa7..17c7ebaa0 100644 --- a/adb/daemon/shell_service.cpp +++ b/adb/daemon/shell_service.cpp @@ -681,22 +681,6 @@ void Subprocess::WaitForExit() { } protocol_sfd_.reset(-1); } - - // Pass the local socket FD to the shell cleanup fdevent. - if (SHELL_EXIT_NOTIFY_FD >= 0) { - int fd = local_socket_sfd_; - if (WriteFdExactly(SHELL_EXIT_NOTIFY_FD, &fd, sizeof(fd))) { - D("passed fd %d to SHELL_EXIT_NOTIFY_FD (%d) for pid %d", - fd, SHELL_EXIT_NOTIFY_FD, pid_); - // The shell exit fdevent now owns the FD and will close it once - // the last bit of data flushes through. - static_cast(local_socket_sfd_.release()); - } else { - PLOG(ERROR) << "failed to write fd " << fd - << " to SHELL_EXIT_NOTIFY_FD (" << SHELL_EXIT_NOTIFY_FD - << ") for pid " << pid_; - } - } } } // namespace diff --git a/adb/daemon/shell_service_test.cpp b/adb/daemon/shell_service_test.cpp index 839284ed0..4e27822d1 100644 --- a/adb/daemon/shell_service_test.cpp +++ b/adb/daemon/shell_service_test.cpp @@ -55,40 +55,20 @@ class ShellServiceTest : public ::testing::Test { static sighandler_t saved_sigpipe_handler_; int subprocess_fd_ = -1; - int shell_exit_receiver_fd_ = -1, saved_shell_exit_fd_; }; sighandler_t ShellServiceTest::saved_sigpipe_handler_ = nullptr; void ShellServiceTest::StartTestSubprocess( const char* command, SubprocessType type, SubprocessProtocol protocol) { - // We want to intercept the shell exit message to make sure it's sent. - saved_shell_exit_fd_ = SHELL_EXIT_NOTIFY_FD; - int fd[2]; - ASSERT_TRUE(adb_socketpair(fd) >= 0); - SHELL_EXIT_NOTIFY_FD = fd[0]; - shell_exit_receiver_fd_ = fd[1]; - subprocess_fd_ = StartSubprocess(command, nullptr, type, protocol); ASSERT_TRUE(subprocess_fd_ >= 0); } void ShellServiceTest::CleanupTestSubprocess() { if (subprocess_fd_ >= 0) { - // Subprocess should send its FD to SHELL_EXIT_NOTIFY_FD for cleanup. - int notified_fd = -1; - ASSERT_TRUE(ReadFdExactly(shell_exit_receiver_fd_, ¬ified_fd, - sizeof(notified_fd))); - ASSERT_EQ(notified_fd, subprocess_fd_); - adb_close(subprocess_fd_); subprocess_fd_ = -1; - - // Restore SHELL_EXIT_NOTIFY_FD. - adb_close(SHELL_EXIT_NOTIFY_FD); - adb_close(shell_exit_receiver_fd_); - shell_exit_receiver_fd_ = -1; - SHELL_EXIT_NOTIFY_FD = saved_shell_exit_fd_; } } diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp index ba82e7c5f..9776c1bec 100644 --- a/adb/fdevent.cpp +++ b/adb/fdevent.cpp @@ -44,13 +44,6 @@ #include "adb_unique_fd.h" #include "adb_utils.h" -#if !ADB_HOST -// This socket is used when a subproc shell service exists. -// It wakes up the fdevent_loop() and cause the correct handling -// of the shell's pseudo-tty master. I.e. force close it. -int SHELL_EXIT_NOTIFY_FD = -1; -#endif // !ADB_HOST - #define FDE_EVENTMASK 0x00ff #define FDE_STATEMASK 0xff00 @@ -296,72 +289,6 @@ static void fdevent_call_fdfunc(fdevent* fde) { fde->func(fde->fd, events, fde->arg); } -#if !ADB_HOST - -#include - -static void fdevent_subproc_event_func(int fd, unsigned ev, void* /* userdata */) { - D("subproc handling on fd = %d, ev = %x", fd, ev); - - CHECK_GE(fd, 0); - - if (ev & FDE_READ) { - int subproc_fd; - - if(!ReadFdExactly(fd, &subproc_fd, sizeof(subproc_fd))) { - LOG(FATAL) << "Failed to read the subproc's fd from " << fd; - } - auto it = g_poll_node_map.find(subproc_fd); - if (it == g_poll_node_map.end()) { - D("subproc_fd %d cleared from fd_table", subproc_fd); - adb_close(subproc_fd); - return; - } - fdevent* subproc_fde = it->second.fde; - if(subproc_fde->fd != subproc_fd) { - // Already reallocated? - LOG(FATAL) << "subproc_fd(" << subproc_fd << ") != subproc_fde->fd(" << subproc_fde->fd - << ")"; - return; - } - - subproc_fde->force_eof = 1; - - int rcount = 0; - ioctl(subproc_fd, FIONREAD, &rcount); - D("subproc with fd %d has rcount=%d, err=%d", subproc_fd, rcount, errno); - if (rcount != 0) { - // If there is data left, it will show up in the select(). - // This works because there is no other thread reading that - // data when in this fd_func(). - return; - } - - D("subproc_fde %s", dump_fde(subproc_fde).c_str()); - subproc_fde->events |= FDE_READ; - if(subproc_fde->state & FDE_PENDING) { - return; - } - subproc_fde->state |= FDE_PENDING; - fdevent_call_fdfunc(subproc_fde); - } -} - -static void fdevent_subproc_setup() { - int s[2]; - - if(adb_socketpair(s)) { - PLOG(FATAL) << "cannot create shell-exit socket-pair"; - } - D("fdevent_subproc: socket pair (%d, %d)", s[0], s[1]); - - SHELL_EXIT_NOTIFY_FD = s[0]; - fdevent *fde = fdevent_create(s[1], fdevent_subproc_event_func, NULL); - CHECK(fde != nullptr) << "cannot create fdevent for shell-exit handler"; - fdevent_add(fde, FDE_READ); -} -#endif // !ADB_HOST - static void fdevent_run_flush() EXCLUDES(run_queue_mutex) { // We need to be careful around reentrancy here, since a function we call can queue up another // function. @@ -402,6 +329,10 @@ static void fdevent_run_setup() { PLOG(FATAL) << "failed to create run queue notify socketpair"; } + if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) { + PLOG(FATAL) << "failed to make run queue notify socket nonblocking"; + } + run_queue_notify_fd.reset(s[0]); fdevent* fde = fdevent_create(s[1], fdevent_run_func, nullptr); CHECK(fde != nullptr); @@ -418,7 +349,12 @@ void fdevent_run_on_main_thread(std::function fn) { // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up. // In that case, rely on the setup code to flush the queue without a notification being needed. if (run_queue_notify_fd != -1) { - if (adb_write(run_queue_notify_fd.get(), "", 1) != 1) { + int rc = adb_write(run_queue_notify_fd.get(), "", 1); + + // It's possible that we get EAGAIN here, if lots of notifications came in while handling. + if (rc == 0) { + PLOG(FATAL) << "run queue notify fd was closed?"; + } else if (rc == -1 && errno != EAGAIN) { PLOG(FATAL) << "failed to write to run queue notify fd"; } } @@ -426,9 +362,6 @@ void fdevent_run_on_main_thread(std::function fn) { void fdevent_loop() { set_main_thread(); -#if !ADB_HOST - fdevent_subproc_setup(); -#endif // !ADB_HOST fdevent_run_setup(); while (true) { diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp index dadae5ab7..95dc4c248 100644 --- a/adb/fdevent_test.cpp +++ b/adb/fdevent_test.cpp @@ -180,7 +180,13 @@ TEST_F(FdeventTest, run_on_main_thread) { PrepareThread(); std::thread thread(fdevent_loop); - for (int i = 0; i < 100; ++i) { + // Block the main thread for a long time while we queue our callbacks. + fdevent_run_on_main_thread([]() { + check_main_thread(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + }); + + for (int i = 0; i < 1000000; ++i) { fdevent_run_on_main_thread([i, &vec]() { check_main_thread(); vec.push_back(i); @@ -189,8 +195,8 @@ TEST_F(FdeventTest, run_on_main_thread) { TerminateThread(thread); - ASSERT_EQ(100u, vec.size()); - for (int i = 0; i < 100; ++i) { + ASSERT_EQ(1000000u, vec.size()); + for (int i = 0; i < 1000000; ++i) { ASSERT_EQ(i, vec[i]); } } diff --git a/adb/fdevent_test.h b/adb/fdevent_test.h index 5ca49ac08..1a2d41c6f 100644 --- a/adb/fdevent_test.h +++ b/adb/fdevent_test.h @@ -52,13 +52,8 @@ class FdeventTest : public ::testing::Test { } size_t GetAdditionalLocalSocketCount() { -#if ADB_HOST // dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket return 2; -#else - // dummy socket + fdevent_run_on_main_thread + fdevent_subproc_setup() sockets - return 3; -#endif } void TerminateThread(std::thread& thread) { diff --git a/adb/socket_test.cpp b/adb/socket_test.cpp index 44d327659..6b400565f 100644 --- a/adb/socket_test.cpp +++ b/adb/socket_test.cpp @@ -209,7 +209,6 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { TerminateThread(thread); } -#if 0 // Ensure that if we fail to write output to an fd, we will still flush data coming from it. TEST_F(LocalSocketTest, flush_after_shutdown) { int head_fd[2]; @@ -248,7 +247,6 @@ TEST_F(LocalSocketTest, flush_after_shutdown) { ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); TerminateThread(thread); } -#endif #if defined(__linux__) diff --git a/adb/sockets.cpp b/adb/sockets.cpp index 0007fed7b..e05a3dbc6 100644 --- a/adb/sockets.cpp +++ b/adb/sockets.cpp @@ -106,50 +106,131 @@ restart: } } +enum class SocketFlushResult { + Destroyed, + TryAgain, + Completed, +}; + +static SocketFlushResult local_socket_flush_incoming(asocket* s) { + while (!s->packet_queue.empty()) { + Range& r = s->packet_queue.front(); + + int rc = adb_write(s->fd, r.data(), r.size()); + if (rc == static_cast(r.size())) { + s->packet_queue.pop_front(); + } else if (rc > 0) { + r.drop_front(rc); + fdevent_add(&s->fde, FDE_WRITE); + return SocketFlushResult::TryAgain; + } else if (rc == -1 && errno == EAGAIN) { + fdevent_add(&s->fde, FDE_WRITE); + return SocketFlushResult::TryAgain; + } + + // We failed to write, but it's possible that we can still read from the socket. + // Give that a try before giving up. + s->has_write_error = true; + break; + } + + // If we sent the last packet of a closing socket, we can now destroy it. + if (s->closing) { + s->close(s); + return SocketFlushResult::Destroyed; + } + + fdevent_del(&s->fde, FDE_WRITE); + return SocketFlushResult::Completed; +} + +// Returns false if the socket has been closed and destroyed as a side-effect of this function. +static bool local_socket_flush_outgoing(asocket* s) { + const size_t max_payload = s->get_max_payload(); + std::string data; + data.resize(max_payload); + char* x = &data[0]; + size_t avail = max_payload; + int r = 0; + int is_eof = 0; + + while (avail > 0) { + r = adb_read(s->fd, x, avail); + D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r, + r < 0 ? errno : 0, avail); + if (r == -1) { + if (errno == EAGAIN) { + break; + } + } else if (r > 0) { + avail -= r; + x += r; + continue; + } + + /* r = 0 or unhandled error */ + is_eof = 1; + break; + } + D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof, + s->fde.force_eof); + + if (avail != max_payload && s->peer) { + data.resize(max_payload - avail); + + // s->peer->enqueue() may call s->close() and free s, + // so save variables for debug printing below. + unsigned saved_id = s->id; + int saved_fd = s->fd; + r = s->peer->enqueue(s->peer, std::move(data)); + D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r); + + if (r < 0) { + // Error return means they closed us as a side-effect and we must + // return immediately. + // + // Note that if we still have buffered packets, the socket will be + // placed on the closing socket list. This handler function will be + // called again to process FDE_WRITE events. + return false; + } + + if (r > 0) { + /* if the remote cannot accept further events, + ** we disable notification of READs. They'll + ** be enabled again when we get a call to ready() + */ + fdevent_del(&s->fde, FDE_READ); + } + } + + // Don't allow a forced eof if data is still there. + if ((s->fde.force_eof && !r) || is_eof) { + D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof); + s->close(s); + return false; + } + + return true; +} + static int local_socket_enqueue(asocket* s, std::string data) { D("LS(%d): enqueue %zu", s->id, data.size()); Range r(std::move(data)); - - /* if there is already data queue'd, we will receive - ** events when it's time to write. just add this to - ** the tail - */ - if (!s->packet_queue.empty()) { - goto enqueue; - } - - /* write as much as we can, until we - ** would block or there is an error/eof - */ - while (!r.empty()) { - int rc = adb_write(s->fd, r.data(), r.size()); - if (rc > 0) { - r.drop_front(rc); - continue; - } - - if (rc == 0 || errno != EAGAIN) { - D("LS(%d): not ready, errno=%d: %s", s->id, errno, strerror(errno)); - s->has_write_error = true; - s->close(s); - return 1; /* not ready (error) */ - } else { - // errno == EAGAIN - break; - } - } - - if (r.empty()) { - return 0; /* ready for more data */ - } - -enqueue: - /* make sure we are notified when we can drain the queue */ s->packet_queue.push_back(std::move(r)); - fdevent_add(&s->fde, FDE_WRITE); + switch (local_socket_flush_incoming(s)) { + case SocketFlushResult::Destroyed: + return -1; - return 1; /* not ready (backlog) */ + case SocketFlushResult::TryAgain: + return 1; + + case SocketFlushResult::Completed: + return 0; + } + + return !s->packet_queue.empty(); } static void local_socket_ready(asocket* s) { @@ -224,114 +305,21 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s) { ** in order to simplify the code. */ if (ev & FDE_WRITE) { - while (!s->packet_queue.empty()) { - Range& r = s->packet_queue.front(); - while (!r.empty()) { - int rc = adb_write(fd, r.data(), r.size()); - if (rc == -1) { - /* returning here is ok because FDE_READ will - ** be processed in the next iteration loop - */ - if (errno == EAGAIN) { - return; - } - } else if (rc > 0) { - r.drop_front(rc); - continue; - } - - D(" closing after write because rc=%d and errno is %d", rc, errno); - s->has_write_error = true; - s->close(s); + switch (local_socket_flush_incoming(s)) { + case SocketFlushResult::Destroyed: return; - } - if (r.empty()) { - s->packet_queue.pop_front(); - } + case SocketFlushResult::TryAgain: + break; + + case SocketFlushResult::Completed: + s->peer->ready(s->peer); + break; } - - /* if we sent the last packet of a closing socket, - ** we can now destroy it. - */ - if (s->closing) { - D(" closing because 'closing' is set after write"); - s->close(s); - return; - } - - /* no more packets queued, so we can ignore - ** writable events again and tell our peer - ** to resume writing - */ - fdevent_del(&s->fde, FDE_WRITE); - s->peer->ready(s->peer); } if (ev & FDE_READ) { - const size_t max_payload = s->get_max_payload(); - std::string data; - data.resize(max_payload); - char* x = &data[0]; - size_t avail = max_payload; - int r = 0; - int is_eof = 0; - - while (avail > 0) { - r = adb_read(fd, x, avail); - D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r, - r < 0 ? errno : 0, avail); - if (r == -1) { - if (errno == EAGAIN) { - break; - } - } else if (r > 0) { - avail -= r; - x += r; - continue; - } - - /* r = 0 or unhandled error */ - is_eof = 1; - break; - } - D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof, - s->fde.force_eof); - - if (avail != max_payload && s->peer) { - data.resize(max_payload - avail); - - // s->peer->enqueue() may call s->close() and free s, - // so save variables for debug printing below. - unsigned saved_id = s->id; - int saved_fd = s->fd; - r = s->peer->enqueue(s->peer, std::move(data)); - D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r); - - if (r < 0) { - /* error return means they closed us as a side-effect - ** and we must return immediately. - ** - ** note that if we still have buffered packets, the - ** socket will be placed on the closing socket list. - ** this handler function will be called again - ** to process FDE_WRITE events. - */ - return; - } - - if (r > 0) { - /* if the remote cannot accept further events, - ** we disable notification of READs. They'll - ** be enabled again when we get a call to ready() - */ - fdevent_del(&s->fde, FDE_READ); - } - } - /* Don't allow a forced eof if data is still there */ - if ((s->fde.force_eof && !r) || is_eof) { - D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof); - s->close(s); + if (!local_socket_flush_outgoing(s)) { return; } } diff --git a/adb/sysdeps/posix/network.cpp b/adb/sysdeps/posix/network.cpp index 45da5af4a..ecd1fd24e 100644 --- a/adb/sysdeps/posix/network.cpp +++ b/adb/sysdeps/posix/network.cpp @@ -105,8 +105,7 @@ static int _network_loopback_server(bool ipv6, int port, int type, std::string* } if (type == SOCK_STREAM || type == SOCK_SEQPACKET) { - // Arbitrarily selected value, ported from libcutils. - if (listen(s, 4) != 0) { + if (listen(s, SOMAXCONN) != 0) { set_error(error); return -1; } diff --git a/adb/sysdeps_win32.cpp b/adb/sysdeps_win32.cpp index 5873b2b39..cd7d187aa 100644 --- a/adb/sysdeps_win32.cpp +++ b/adb/sysdeps_win32.cpp @@ -747,8 +747,6 @@ int network_loopback_client(int port, int type, std::string* error) { return fd; } -#define LISTEN_BACKLOG 4 - // interface_address is INADDR_LOOPBACK or INADDR_ANY. static int _network_server(int port, int type, u_long interface_address, std::string* error) { struct sockaddr_in addr; @@ -805,7 +803,7 @@ static int _network_server(int port, int type, u_long interface_address, std::st return -1; } if (type == SOCK_STREAM) { - if (listen(s, LISTEN_BACKLOG) == SOCKET_ERROR) { + if (listen(s, SOMAXCONN) == SOCKET_ERROR) { const DWORD err = WSAGetLastError(); *error = android::base::StringPrintf( "cannot listen on socket: %s", android::base::SystemErrorCodeToString(err).c_str()); diff --git a/adb/test_device.py b/adb/test_device.py index d422df25d..d39eb144a 100644 --- a/adb/test_device.py +++ b/adb/test_device.py @@ -1187,7 +1187,7 @@ class FileOperationsTest(DeviceTest): # Verify that the device ended up with the expected UTF-8 path output = self.device.shell( ['ls', '/data/local/tmp/adb-test-*'])[0].strip() - self.assertEqual(remote_path.encode('utf-8'), output) + self.assertEqual(remote_path, output) # pull. self.device.pull(remote_path, tf.name)