Merge changes Ief3dbf8e,Ib06e6f65
* changes: adb: add fdevent callback that passes the fdevent. base: don't overwrite errno in unique_fd::~unique_fd.
This commit is contained in:
commit
ee0e63f71d
4 changed files with 119 additions and 64 deletions
|
|
@ -33,6 +33,8 @@
|
|||
#include <list>
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
#include <android-base/chrono_utils.h>
|
||||
|
|
@ -121,13 +123,8 @@ static std::string dump_fde(const fdevent* fde) {
|
|||
state.c_str());
|
||||
}
|
||||
|
||||
void fdevent_install(fdevent* fde, int fd, fd_func func, void* arg) {
|
||||
check_main_thread();
|
||||
CHECK_GE(fd, 0);
|
||||
memset(fde, 0, sizeof(fdevent));
|
||||
}
|
||||
|
||||
fdevent* fdevent_create(int fd, fd_func func, void* arg) {
|
||||
template <typename F>
|
||||
static fdevent* fdevent_create_impl(int fd, F func, void* arg) {
|
||||
check_main_thread();
|
||||
CHECK_GE(fd, 0);
|
||||
|
||||
|
|
@ -150,6 +147,14 @@ fdevent* fdevent_create(int fd, fd_func func, void* arg) {
|
|||
return fde;
|
||||
}
|
||||
|
||||
fdevent* fdevent_create(int fd, fd_func func, void* arg) {
|
||||
return fdevent_create_impl(fd, func, arg);
|
||||
}
|
||||
|
||||
fdevent* fdevent_create(int fd, fd_func2 func, void* arg) {
|
||||
return fdevent_create_impl(fd, func, arg);
|
||||
}
|
||||
|
||||
unique_fd fdevent_release(fdevent* fde) {
|
||||
check_main_thread();
|
||||
if (!fde) {
|
||||
|
|
@ -290,13 +295,27 @@ static void fdevent_process() {
|
|||
}
|
||||
}
|
||||
|
||||
template <class T>
|
||||
struct always_false : std::false_type {};
|
||||
|
||||
static void fdevent_call_fdfunc(fdevent* fde) {
|
||||
unsigned events = fde->events;
|
||||
fde->events = 0;
|
||||
CHECK(fde->state & FDE_PENDING);
|
||||
fde->state &= (~FDE_PENDING);
|
||||
D("fdevent_call_fdfunc %s", dump_fde(fde).c_str());
|
||||
fde->func(fde->fd.get(), events, fde->arg);
|
||||
std::visit(
|
||||
[&](auto&& f) {
|
||||
using F = std::decay_t<decltype(f)>;
|
||||
if constexpr (std::is_same_v<fd_func, F>) {
|
||||
f(fde->fd.get(), events, fde->arg);
|
||||
} else if constexpr (std::is_same_v<fd_func2, F>) {
|
||||
f(fde, events, fde->arg);
|
||||
} else {
|
||||
static_assert(always_false<F>::value, "non-exhaustive visitor");
|
||||
}
|
||||
},
|
||||
fde->func);
|
||||
}
|
||||
|
||||
static void fdevent_run_flush() EXCLUDES(run_queue_mutex) {
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@
|
|||
#include <stdint.h> /* for int64_t */
|
||||
|
||||
#include <functional>
|
||||
#include <variant>
|
||||
|
||||
#include "adb_unique_fd.h"
|
||||
|
||||
|
|
@ -30,6 +31,7 @@
|
|||
#define FDE_ERROR 0x0004
|
||||
|
||||
typedef void (*fd_func)(int fd, unsigned events, void *userdata);
|
||||
typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata);
|
||||
|
||||
struct fdevent {
|
||||
uint64_t id;
|
||||
|
|
@ -40,15 +42,14 @@ struct fdevent {
|
|||
uint16_t state = 0;
|
||||
uint16_t events = 0;
|
||||
|
||||
fd_func func = nullptr;
|
||||
std::variant<fd_func, fd_func2> func;
|
||||
void* arg = nullptr;
|
||||
};
|
||||
|
||||
/* Allocate and initialize a new fdevent object
|
||||
* Note: use FD_TIMER as 'fd' to create a fd-less object
|
||||
* (used to implement timers).
|
||||
*/
|
||||
// Allocate and initialize a new fdevent object
|
||||
// TODO: Switch these to unique_fd.
|
||||
fdevent *fdevent_create(int fd, fd_func func, void *arg);
|
||||
fdevent* fdevent_create(int fd, fd_func2 func, void* arg);
|
||||
|
||||
// Deallocate an fdevent object that was created by fdevent_create.
|
||||
void fdevent_destroy(fdevent *fde);
|
||||
|
|
@ -56,16 +57,14 @@ void fdevent_destroy(fdevent *fde);
|
|||
// fdevent_destroy, except releasing the file descriptor previously owned by the fdevent.
|
||||
unique_fd fdevent_release(fdevent* fde);
|
||||
|
||||
/* Change which events should cause notifications
|
||||
*/
|
||||
// Change which events should cause notifications
|
||||
void fdevent_set(fdevent *fde, unsigned events);
|
||||
void fdevent_add(fdevent *fde, unsigned events);
|
||||
void fdevent_del(fdevent *fde, unsigned events);
|
||||
|
||||
void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms);
|
||||
|
||||
/* loop forever, handling events.
|
||||
*/
|
||||
// Loop forever, handling events.
|
||||
void fdevent_loop();
|
||||
|
||||
void check_main_thread();
|
||||
|
|
|
|||
|
|
@ -30,10 +30,16 @@
|
|||
|
||||
class FdHandler {
|
||||
public:
|
||||
FdHandler(int read_fd, int write_fd) : read_fd_(read_fd), write_fd_(write_fd) {
|
||||
read_fde_ = fdevent_create(read_fd_, FdEventCallback, this);
|
||||
FdHandler(int read_fd, int write_fd, bool use_new_callback)
|
||||
: read_fd_(read_fd), write_fd_(write_fd) {
|
||||
if (use_new_callback) {
|
||||
read_fde_ = fdevent_create(read_fd_, FdEventNewCallback, this);
|
||||
write_fde_ = fdevent_create(write_fd_, FdEventNewCallback, this);
|
||||
} else {
|
||||
read_fde_ = fdevent_create(read_fd_, FdEventCallback, this);
|
||||
write_fde_ = fdevent_create(write_fd_, FdEventCallback, this);
|
||||
}
|
||||
fdevent_add(read_fde_, FDE_READ);
|
||||
write_fde_ = fdevent_create(write_fd_, FdEventCallback, this);
|
||||
}
|
||||
|
||||
~FdHandler() {
|
||||
|
|
@ -64,6 +70,29 @@ class FdHandler {
|
|||
}
|
||||
}
|
||||
|
||||
static void FdEventNewCallback(fdevent* fde, unsigned events, void* userdata) {
|
||||
int fd = fde->fd.get();
|
||||
FdHandler* handler = reinterpret_cast<FdHandler*>(userdata);
|
||||
ASSERT_EQ(0u, (events & ~(FDE_READ | FDE_WRITE))) << "unexpected events: " << events;
|
||||
if (events & FDE_READ) {
|
||||
ASSERT_EQ(fd, handler->read_fd_);
|
||||
char c;
|
||||
ASSERT_EQ(1, adb_read(fd, &c, 1));
|
||||
handler->queue_.push(c);
|
||||
fdevent_add(handler->write_fde_, FDE_WRITE);
|
||||
}
|
||||
if (events & FDE_WRITE) {
|
||||
ASSERT_EQ(fd, handler->write_fd_);
|
||||
ASSERT_FALSE(handler->queue_.empty());
|
||||
char c = handler->queue_.front();
|
||||
handler->queue_.pop();
|
||||
ASSERT_EQ(1, adb_write(fd, &c, 1));
|
||||
if (handler->queue_.empty()) {
|
||||
fdevent_del(handler->write_fde_, FDE_WRITE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
const int read_fd_;
|
||||
const int write_fd_;
|
||||
|
|
@ -84,56 +113,60 @@ TEST_F(FdeventTest, fdevent_terminate) {
|
|||
}
|
||||
|
||||
TEST_F(FdeventTest, smoke) {
|
||||
const size_t PIPE_COUNT = 10;
|
||||
const size_t MESSAGE_LOOP_COUNT = 100;
|
||||
const std::string MESSAGE = "fdevent_test";
|
||||
int fd_pair1[2];
|
||||
int fd_pair2[2];
|
||||
ASSERT_EQ(0, adb_socketpair(fd_pair1));
|
||||
ASSERT_EQ(0, adb_socketpair(fd_pair2));
|
||||
ThreadArg thread_arg;
|
||||
thread_arg.first_read_fd = fd_pair1[0];
|
||||
thread_arg.last_write_fd = fd_pair2[1];
|
||||
thread_arg.middle_pipe_count = PIPE_COUNT;
|
||||
int writer = fd_pair1[1];
|
||||
int reader = fd_pair2[0];
|
||||
for (bool use_new_callback : {true, false}) {
|
||||
fdevent_reset();
|
||||
const size_t PIPE_COUNT = 10;
|
||||
const size_t MESSAGE_LOOP_COUNT = 100;
|
||||
const std::string MESSAGE = "fdevent_test";
|
||||
int fd_pair1[2];
|
||||
int fd_pair2[2];
|
||||
ASSERT_EQ(0, adb_socketpair(fd_pair1));
|
||||
ASSERT_EQ(0, adb_socketpair(fd_pair2));
|
||||
ThreadArg thread_arg;
|
||||
thread_arg.first_read_fd = fd_pair1[0];
|
||||
thread_arg.last_write_fd = fd_pair2[1];
|
||||
thread_arg.middle_pipe_count = PIPE_COUNT;
|
||||
int writer = fd_pair1[1];
|
||||
int reader = fd_pair2[0];
|
||||
|
||||
PrepareThread();
|
||||
PrepareThread();
|
||||
|
||||
std::vector<std::unique_ptr<FdHandler>> fd_handlers;
|
||||
fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() {
|
||||
std::vector<int> read_fds;
|
||||
std::vector<int> write_fds;
|
||||
std::vector<std::unique_ptr<FdHandler>> fd_handlers;
|
||||
fdevent_run_on_main_thread([&thread_arg, &fd_handlers, use_new_callback]() {
|
||||
std::vector<int> read_fds;
|
||||
std::vector<int> write_fds;
|
||||
|
||||
read_fds.push_back(thread_arg.first_read_fd);
|
||||
for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
|
||||
int fds[2];
|
||||
ASSERT_EQ(0, adb_socketpair(fds));
|
||||
read_fds.push_back(fds[0]);
|
||||
write_fds.push_back(fds[1]);
|
||||
read_fds.push_back(thread_arg.first_read_fd);
|
||||
for (size_t i = 0; i < thread_arg.middle_pipe_count; ++i) {
|
||||
int fds[2];
|
||||
ASSERT_EQ(0, adb_socketpair(fds));
|
||||
read_fds.push_back(fds[0]);
|
||||
write_fds.push_back(fds[1]);
|
||||
}
|
||||
write_fds.push_back(thread_arg.last_write_fd);
|
||||
|
||||
for (size_t i = 0; i < read_fds.size(); ++i) {
|
||||
fd_handlers.push_back(
|
||||
std::make_unique<FdHandler>(read_fds[i], write_fds[i], use_new_callback));
|
||||
}
|
||||
});
|
||||
WaitForFdeventLoop();
|
||||
|
||||
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
|
||||
std::string read_buffer = MESSAGE;
|
||||
std::string write_buffer(MESSAGE.size(), 'a');
|
||||
ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size()));
|
||||
ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size()));
|
||||
ASSERT_EQ(read_buffer, write_buffer);
|
||||
}
|
||||
write_fds.push_back(thread_arg.last_write_fd);
|
||||
|
||||
for (size_t i = 0; i < read_fds.size(); ++i) {
|
||||
fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i]));
|
||||
}
|
||||
});
|
||||
WaitForFdeventLoop();
|
||||
fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
|
||||
WaitForFdeventLoop();
|
||||
|
||||
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
|
||||
std::string read_buffer = MESSAGE;
|
||||
std::string write_buffer(MESSAGE.size(), 'a');
|
||||
ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size()));
|
||||
ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size()));
|
||||
ASSERT_EQ(read_buffer, write_buffer);
|
||||
TerminateThread();
|
||||
ASSERT_EQ(0, adb_close(writer));
|
||||
ASSERT_EQ(0, adb_close(reader));
|
||||
}
|
||||
|
||||
fdevent_run_on_main_thread([&fd_handlers]() { fd_handlers.clear(); });
|
||||
WaitForFdeventLoop();
|
||||
|
||||
TerminateThread();
|
||||
ASSERT_EQ(0, adb_close(writer));
|
||||
ASSERT_EQ(0, adb_close(reader));
|
||||
}
|
||||
|
||||
struct InvalidFdArg {
|
||||
|
|
|
|||
|
|
@ -17,10 +17,10 @@
|
|||
#pragma once
|
||||
|
||||
#include <dirent.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#if !defined(_WIN32)
|
||||
#include <dirent.h>
|
||||
#include <sys/socket.h>
|
||||
#endif
|
||||
|
||||
|
|
@ -114,6 +114,8 @@ class unique_fd_impl final {
|
|||
|
||||
private:
|
||||
void reset(int new_value, void* previous_tag) {
|
||||
int previous_errno = errno;
|
||||
|
||||
if (fd_ != -1) {
|
||||
close(fd_, this);
|
||||
}
|
||||
|
|
@ -122,6 +124,8 @@ class unique_fd_impl final {
|
|||
if (new_value != -1) {
|
||||
tag(new_value, previous_tag, this);
|
||||
}
|
||||
|
||||
errno = previous_errno;
|
||||
}
|
||||
|
||||
int fd_ = -1;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue