Merge "Looper: Use sequence numbers in epoll_event to track requests" am: 663c692a2d am: 7dc38fe273
Original change: https://android-review.googlesource.com/c/platform/system/core/+/1795650 Change-Id: I32e4943965d7965b32da8350be0d3dbe27d06fa1
This commit is contained in:
commit
cf90434fe6
3 changed files with 232 additions and 107 deletions
|
|
@ -20,6 +20,16 @@
|
||||||
|
|
||||||
namespace android {
|
namespace android {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
constexpr uint64_t WAKE_EVENT_FD_SEQ = 1;
|
||||||
|
|
||||||
|
epoll_event createEpollEvent(uint32_t events, uint64_t seq) {
|
||||||
|
return {.events = events, .data = {.u64 = seq}};
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
// --- WeakMessageHandler ---
|
// --- WeakMessageHandler ---
|
||||||
|
|
||||||
WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
|
WeakMessageHandler::WeakMessageHandler(const wp<MessageHandler>& handler) :
|
||||||
|
|
@ -64,7 +74,7 @@ Looper::Looper(bool allowNonCallbacks)
|
||||||
mSendingMessage(false),
|
mSendingMessage(false),
|
||||||
mPolling(false),
|
mPolling(false),
|
||||||
mEpollRebuildRequired(false),
|
mEpollRebuildRequired(false),
|
||||||
mNextRequestSeq(0),
|
mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
|
||||||
mResponseIndex(0),
|
mResponseIndex(0),
|
||||||
mNextMessageUptime(LLONG_MAX) {
|
mNextMessageUptime(LLONG_MAX) {
|
||||||
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
|
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
|
||||||
|
|
@ -137,22 +147,17 @@ void Looper::rebuildEpollLocked() {
|
||||||
mEpollFd.reset();
|
mEpollFd.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allocate the new epoll instance and register the wake pipe.
|
// Allocate the new epoll instance and register the WakeEventFd.
|
||||||
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
|
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
|
||||||
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
|
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
|
||||||
|
|
||||||
struct epoll_event eventItem;
|
epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
|
||||||
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
|
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);
|
||||||
eventItem.events = EPOLLIN;
|
|
||||||
eventItem.data.fd = mWakeEventFd.get();
|
|
||||||
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
|
|
||||||
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
|
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
|
|
||||||
for (size_t i = 0; i < mRequests.size(); i++) {
|
for (const auto& [seq, request] : mRequests) {
|
||||||
const Request& request = mRequests.valueAt(i);
|
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
|
||||||
struct epoll_event eventItem;
|
|
||||||
request.initEventItem(&eventItem);
|
|
||||||
|
|
||||||
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
|
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
|
||||||
if (epollResult < 0) {
|
if (epollResult < 0) {
|
||||||
|
|
@ -276,26 +281,28 @@ int Looper::pollInner(int timeoutMillis) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
for (int i = 0; i < eventCount; i++) {
|
for (int i = 0; i < eventCount; i++) {
|
||||||
int fd = eventItems[i].data.fd;
|
const SequenceNumber seq = eventItems[i].data.u64;
|
||||||
uint32_t epollEvents = eventItems[i].events;
|
uint32_t epollEvents = eventItems[i].events;
|
||||||
if (fd == mWakeEventFd.get()) {
|
if (seq == WAKE_EVENT_FD_SEQ) {
|
||||||
if (epollEvents & EPOLLIN) {
|
if (epollEvents & EPOLLIN) {
|
||||||
awoken();
|
awoken();
|
||||||
} else {
|
} else {
|
||||||
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
|
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ssize_t requestIndex = mRequests.indexOfKey(fd);
|
const auto& request_it = mRequests.find(seq);
|
||||||
if (requestIndex >= 0) {
|
if (request_it != mRequests.end()) {
|
||||||
|
const auto& request = request_it->second;
|
||||||
int events = 0;
|
int events = 0;
|
||||||
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
|
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
|
||||||
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
|
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
|
||||||
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
|
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
|
||||||
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
|
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
|
||||||
pushResponse(events, mRequests.valueAt(requestIndex));
|
mResponses.push({.seq = seq, .events = events, .request = request});
|
||||||
} else {
|
} else {
|
||||||
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
|
ALOGW("Ignoring unexpected epoll events 0x%x for sequence number %" PRIu64
|
||||||
"no longer registered.", epollEvents, fd);
|
" that is no longer registered.",
|
||||||
|
epollEvents, seq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -354,7 +361,8 @@ Done: ;
|
||||||
// we need to be a little careful when removing the file descriptor afterwards.
|
// we need to be a little careful when removing the file descriptor afterwards.
|
||||||
int callbackResult = response.request.callback->handleEvent(fd, events, data);
|
int callbackResult = response.request.callback->handleEvent(fd, events, data);
|
||||||
if (callbackResult == 0) {
|
if (callbackResult == 0) {
|
||||||
removeFd(fd, response.request.seq);
|
AutoMutex _l(mLock);
|
||||||
|
removeSequenceNumberLocked(response.seq);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear the callback reference in the response structure promptly because we
|
// Clear the callback reference in the response structure promptly because we
|
||||||
|
|
@ -416,13 +424,6 @@ void Looper::awoken() {
|
||||||
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
|
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
|
||||||
}
|
}
|
||||||
|
|
||||||
void Looper::pushResponse(int events, const Request& request) {
|
|
||||||
Response response;
|
|
||||||
response.events = events;
|
|
||||||
response.request = request;
|
|
||||||
mResponses.push(response);
|
|
||||||
}
|
|
||||||
|
|
||||||
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
|
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
|
||||||
return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : nullptr, data);
|
return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : nullptr, data);
|
||||||
}
|
}
|
||||||
|
|
@ -449,27 +450,27 @@ int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callb
|
||||||
|
|
||||||
{ // acquire lock
|
{ // acquire lock
|
||||||
AutoMutex _l(mLock);
|
AutoMutex _l(mLock);
|
||||||
|
// There is a sequence number reserved for the WakeEventFd.
|
||||||
|
if (mNextRequestSeq == WAKE_EVENT_FD_SEQ) mNextRequestSeq++;
|
||||||
|
const SequenceNumber seq = mNextRequestSeq++;
|
||||||
|
|
||||||
Request request;
|
Request request;
|
||||||
request.fd = fd;
|
request.fd = fd;
|
||||||
request.ident = ident;
|
request.ident = ident;
|
||||||
request.events = events;
|
request.events = events;
|
||||||
request.seq = mNextRequestSeq++;
|
|
||||||
request.callback = callback;
|
request.callback = callback;
|
||||||
request.data = data;
|
request.data = data;
|
||||||
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
|
|
||||||
|
|
||||||
struct epoll_event eventItem;
|
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
|
||||||
request.initEventItem(&eventItem);
|
auto seq_it = mSequenceNumberByFd.find(fd);
|
||||||
|
if (seq_it == mSequenceNumberByFd.end()) {
|
||||||
ssize_t requestIndex = mRequests.indexOfKey(fd);
|
|
||||||
if (requestIndex < 0) {
|
|
||||||
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
|
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
|
||||||
if (epollResult < 0) {
|
if (epollResult < 0) {
|
||||||
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
|
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
mRequests.add(fd, request);
|
mRequests.emplace(seq, request);
|
||||||
|
mSequenceNumberByFd.emplace(fd, seq);
|
||||||
} else {
|
} else {
|
||||||
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
|
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
|
||||||
if (epollResult < 0) {
|
if (epollResult < 0) {
|
||||||
|
|
@ -486,7 +487,7 @@ int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callb
|
||||||
// set from scratch because it may contain an old file handle that we are
|
// set from scratch because it may contain an old file handle that we are
|
||||||
// now unable to remove since its file descriptor is no longer valid.
|
// now unable to remove since its file descriptor is no longer valid.
|
||||||
// No such problem would have occurred if we were using the poll system
|
// No such problem would have occurred if we were using the poll system
|
||||||
// call instead, but that approach carries others disadvantages.
|
// call instead, but that approach carries other disadvantages.
|
||||||
#if DEBUG_CALLBACKS
|
#if DEBUG_CALLBACKS
|
||||||
ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
|
ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
|
||||||
"being recycled, falling back on EPOLL_CTL_ADD: %s",
|
"being recycled, falling back on EPOLL_CTL_ADD: %s",
|
||||||
|
|
@ -504,71 +505,69 @@ int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callb
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mRequests.replaceValueAt(requestIndex, request);
|
const SequenceNumber oldSeq = seq_it->second;
|
||||||
|
mRequests.erase(oldSeq);
|
||||||
|
mRequests.emplace(seq, request);
|
||||||
|
seq_it->second = seq;
|
||||||
}
|
}
|
||||||
} // release lock
|
} // release lock
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int Looper::removeFd(int fd) {
|
int Looper::removeFd(int fd) {
|
||||||
return removeFd(fd, -1);
|
AutoMutex _l(mLock);
|
||||||
|
const auto& it = mSequenceNumberByFd.find(fd);
|
||||||
|
if (it == mSequenceNumberByFd.end()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return removeSequenceNumberLocked(it->second);
|
||||||
}
|
}
|
||||||
|
|
||||||
int Looper::removeFd(int fd, int seq) {
|
int Looper::removeSequenceNumberLocked(SequenceNumber seq) {
|
||||||
#if DEBUG_CALLBACKS
|
#if DEBUG_CALLBACKS
|
||||||
ALOGD("%p ~ removeFd - fd=%d, seq=%d", this, fd, seq);
|
ALOGD("%p ~ removeFd - fd=%d, seq=%u", this, fd, seq);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
{ // acquire lock
|
const auto& request_it = mRequests.find(seq);
|
||||||
AutoMutex _l(mLock);
|
if (request_it == mRequests.end()) {
|
||||||
ssize_t requestIndex = mRequests.indexOfKey(fd);
|
return 0;
|
||||||
if (requestIndex < 0) {
|
}
|
||||||
return 0;
|
const int fd = request_it->second.fd;
|
||||||
}
|
|
||||||
|
|
||||||
// Check the sequence number if one was given.
|
// Always remove the FD from the request map even if an error occurs while
|
||||||
if (seq != -1 && mRequests.valueAt(requestIndex).seq != seq) {
|
// updating the epoll set so that we avoid accidentally leaking callbacks.
|
||||||
|
mRequests.erase(request_it);
|
||||||
|
mSequenceNumberByFd.erase(fd);
|
||||||
|
|
||||||
|
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_DEL, fd, nullptr);
|
||||||
|
if (epollResult < 0) {
|
||||||
|
if (errno == EBADF || errno == ENOENT) {
|
||||||
|
// Tolerate EBADF or ENOENT because it means that the file descriptor was closed
|
||||||
|
// before its callback was unregistered. This error may occur naturally when a
|
||||||
|
// callback has the side-effect of closing the file descriptor before returning and
|
||||||
|
// unregistering itself.
|
||||||
|
//
|
||||||
|
// Unfortunately due to kernel limitations we need to rebuild the epoll
|
||||||
|
// set from scratch because it may contain an old file handle that we are
|
||||||
|
// now unable to remove since its file descriptor is no longer valid.
|
||||||
|
// No such problem would have occurred if we were using the poll system
|
||||||
|
// call instead, but that approach carries other disadvantages.
|
||||||
#if DEBUG_CALLBACKS
|
#if DEBUG_CALLBACKS
|
||||||
ALOGD("%p ~ removeFd - sequence number mismatch, oldSeq=%d",
|
ALOGD("%p ~ removeFd - EPOLL_CTL_DEL failed due to file descriptor "
|
||||||
this, mRequests.valueAt(requestIndex).seq);
|
"being closed: %s",
|
||||||
|
this, strerror(errno));
|
||||||
#endif
|
#endif
|
||||||
return 0;
|
scheduleEpollRebuildLocked();
|
||||||
|
} else {
|
||||||
|
// Some other error occurred. This is really weird because it means
|
||||||
|
// our list of callbacks got out of sync with the epoll set somehow.
|
||||||
|
// We defensively rebuild the epoll set to avoid getting spurious
|
||||||
|
// notifications with nowhere to go.
|
||||||
|
ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno));
|
||||||
|
scheduleEpollRebuildLocked();
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Always remove the FD from the request map even if an error occurs while
|
|
||||||
// updating the epoll set so that we avoid accidentally leaking callbacks.
|
|
||||||
mRequests.removeItemsAt(requestIndex);
|
|
||||||
|
|
||||||
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_DEL, fd, nullptr);
|
|
||||||
if (epollResult < 0) {
|
|
||||||
if (seq != -1 && (errno == EBADF || errno == ENOENT)) {
|
|
||||||
// Tolerate EBADF or ENOENT when the sequence number is known because it
|
|
||||||
// means that the file descriptor was closed before its callback was
|
|
||||||
// unregistered. This error may occur naturally when a callback has the
|
|
||||||
// side-effect of closing the file descriptor before returning and
|
|
||||||
// unregistering itself.
|
|
||||||
//
|
|
||||||
// Unfortunately due to kernel limitations we need to rebuild the epoll
|
|
||||||
// set from scratch because it may contain an old file handle that we are
|
|
||||||
// now unable to remove since its file descriptor is no longer valid.
|
|
||||||
// No such problem would have occurred if we were using the poll system
|
|
||||||
// call instead, but that approach carries others disadvantages.
|
|
||||||
#if DEBUG_CALLBACKS
|
|
||||||
ALOGD("%p ~ removeFd - EPOLL_CTL_DEL failed due to file descriptor "
|
|
||||||
"being closed: %s", this, strerror(errno));
|
|
||||||
#endif
|
|
||||||
scheduleEpollRebuildLocked();
|
|
||||||
} else {
|
|
||||||
// Some other error occurred. This is really weird because it means
|
|
||||||
// our list of callbacks got out of sync with the epoll set somehow.
|
|
||||||
// We defensively rebuild the epoll set to avoid getting spurious
|
|
||||||
// notifications with nowhere to go.
|
|
||||||
ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno));
|
|
||||||
scheduleEpollRebuildLocked();
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} // release lock
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -656,14 +655,11 @@ bool Looper::isPolling() const {
|
||||||
return mPolling;
|
return mPolling;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Looper::Request::initEventItem(struct epoll_event* eventItem) const {
|
uint32_t Looper::Request::getEpollEvents() const {
|
||||||
int epollEvents = 0;
|
uint32_t epollEvents = 0;
|
||||||
if (events & EVENT_INPUT) epollEvents |= EPOLLIN;
|
if (events & EVENT_INPUT) epollEvents |= EPOLLIN;
|
||||||
if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT;
|
if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT;
|
||||||
|
return epollEvents;
|
||||||
memset(eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
|
|
||||||
eventItem->events = epollEvents;
|
|
||||||
eventItem->data.fd = fd;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageHandler::~MessageHandler() { }
|
MessageHandler::~MessageHandler() { }
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,9 @@
|
||||||
#include <utils/Looper.h>
|
#include <utils/Looper.h>
|
||||||
#include <utils/StopWatch.h>
|
#include <utils/StopWatch.h>
|
||||||
#include <utils/Timers.h>
|
#include <utils/Timers.h>
|
||||||
|
#include <thread>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <utility>
|
||||||
#include "Looper_test_pipe.h"
|
#include "Looper_test_pipe.h"
|
||||||
|
|
||||||
#include <utils/threads.h>
|
#include <utils/threads.h>
|
||||||
|
|
@ -710,4 +713,123 @@ TEST_F(LooperTest, RemoveMessage_WhenRemovingSomeMessagesForHandler_ShouldRemove
|
||||||
<< "no more messages to handle";
|
<< "no more messages to handle";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class LooperEventCallback : public LooperCallback {
|
||||||
|
public:
|
||||||
|
using Callback = std::function<int(int fd, int events)>;
|
||||||
|
explicit LooperEventCallback(Callback callback) : mCallback(std::move(callback)) {}
|
||||||
|
int handleEvent(int fd, int events, void* /*data*/) override { return mCallback(fd, events); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
Callback mCallback;
|
||||||
|
};
|
||||||
|
|
||||||
|
// A utility class that allows for pipes to be added and removed from the looper, and polls the
|
||||||
|
// looper from a different thread.
|
||||||
|
class ThreadedLooperUtil {
|
||||||
|
public:
|
||||||
|
explicit ThreadedLooperUtil(const sp<Looper>& looper) : mLooper(looper), mRunning(true) {
|
||||||
|
mThread = std::thread([this]() {
|
||||||
|
while (mRunning) {
|
||||||
|
static constexpr std::chrono::milliseconds POLL_TIMEOUT(500);
|
||||||
|
mLooper->pollOnce(POLL_TIMEOUT.count());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
~ThreadedLooperUtil() {
|
||||||
|
mRunning = false;
|
||||||
|
mThread.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new pipe, and return the write end of the pipe and the id used to track the pipe.
|
||||||
|
// The read end of the pipe is added to the looper.
|
||||||
|
std::pair<int /*id*/, base::unique_fd> createPipe() {
|
||||||
|
int pipeFd[2];
|
||||||
|
if (pipe(pipeFd)) {
|
||||||
|
ADD_FAILURE() << "pipe() failed.";
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
const int readFd = pipeFd[0];
|
||||||
|
const int writeFd = pipeFd[1];
|
||||||
|
|
||||||
|
int id;
|
||||||
|
{ // acquire lock
|
||||||
|
std::scoped_lock l(mLock);
|
||||||
|
|
||||||
|
id = mNextId++;
|
||||||
|
mFds.emplace(id, readFd);
|
||||||
|
|
||||||
|
auto removeCallback = [this, id, readFd](int fd, int events) {
|
||||||
|
EXPECT_EQ(readFd, fd) << "Received callback for incorrect fd.";
|
||||||
|
if ((events & Looper::EVENT_HANGUP) == 0) {
|
||||||
|
return 1; // Not a hangup, keep the callback.
|
||||||
|
}
|
||||||
|
removePipe(id);
|
||||||
|
return 0; // Remove the callback.
|
||||||
|
};
|
||||||
|
|
||||||
|
mLooper->addFd(readFd, 0, Looper::EVENT_INPUT,
|
||||||
|
new LooperEventCallback(std::move(removeCallback)), nullptr);
|
||||||
|
} // release lock
|
||||||
|
|
||||||
|
return {id, base::unique_fd(writeFd)};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the pipe with the given id.
|
||||||
|
void removePipe(int id) {
|
||||||
|
std::scoped_lock l(mLock);
|
||||||
|
if (mFds.find(id) == mFds.end()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
mLooper->removeFd(mFds[id].get());
|
||||||
|
mFds.erase(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the pipe with the given id exists and has not been removed.
|
||||||
|
bool hasPipe(int id) {
|
||||||
|
std::scoped_lock l(mLock);
|
||||||
|
return mFds.find(id) != mFds.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
sp<Looper> mLooper;
|
||||||
|
std::atomic<bool> mRunning;
|
||||||
|
std::thread mThread;
|
||||||
|
|
||||||
|
std::mutex mLock;
|
||||||
|
std::unordered_map<int, base::unique_fd> mFds GUARDED_BY(mLock);
|
||||||
|
int mNextId GUARDED_BY(mLock) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(LooperTest, MultiThreaded_NoUnexpectedFdRemoval) {
|
||||||
|
ThreadedLooperUtil util(mLooper);
|
||||||
|
|
||||||
|
// Iterate repeatedly to try to recreate a flaky instance.
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
auto [firstPipeId, firstPipeFd] = util.createPipe();
|
||||||
|
const int firstFdNumber = firstPipeFd.get();
|
||||||
|
|
||||||
|
// Close the first pipe's fd, causing a fd hangup.
|
||||||
|
firstPipeFd.reset();
|
||||||
|
|
||||||
|
// Request to remove the pipe from this test thread. This causes a race for pipe removal
|
||||||
|
// between the hangup in the looper's thread and this remove request from the test thread.
|
||||||
|
util.removePipe(firstPipeId);
|
||||||
|
|
||||||
|
// Create the second pipe. Since the fds for the first pipe are closed, this pipe should
|
||||||
|
// have the same fd numbers as the first pipe because the lowest unused fd number is used.
|
||||||
|
const auto [secondPipeId, fd] = util.createPipe();
|
||||||
|
EXPECT_EQ(firstFdNumber, fd.get())
|
||||||
|
<< "The first and second fds must match for the purposes of this test.";
|
||||||
|
|
||||||
|
// Wait for unexpected hangup to occur.
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||||
|
|
||||||
|
ASSERT_TRUE(util.hasPipe(secondPipeId)) << "The second pipe was removed unexpectedly.";
|
||||||
|
|
||||||
|
util.removePipe(secondPipeId);
|
||||||
|
}
|
||||||
|
SUCCEED() << "No unexpectedly removed fds.";
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace android
|
} // namespace android
|
||||||
|
|
|
||||||
|
|
@ -17,15 +17,16 @@
|
||||||
#ifndef UTILS_LOOPER_H
|
#ifndef UTILS_LOOPER_H
|
||||||
#define UTILS_LOOPER_H
|
#define UTILS_LOOPER_H
|
||||||
|
|
||||||
#include <utils/threads.h>
|
|
||||||
#include <utils/RefBase.h>
|
#include <utils/RefBase.h>
|
||||||
#include <utils/KeyedVector.h>
|
|
||||||
#include <utils/Timers.h>
|
#include <utils/Timers.h>
|
||||||
|
#include <utils/Vector.h>
|
||||||
|
#include <utils/threads.h>
|
||||||
|
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
|
|
||||||
#include <android-base/unique_fd.h>
|
#include <android-base/unique_fd.h>
|
||||||
|
|
||||||
|
#include <unordered_map>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
namespace android {
|
namespace android {
|
||||||
|
|
@ -421,18 +422,20 @@ public:
|
||||||
static sp<Looper> getForThread();
|
static sp<Looper> getForThread();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct Request {
|
using SequenceNumber = uint64_t;
|
||||||
int fd;
|
|
||||||
int ident;
|
|
||||||
int events;
|
|
||||||
int seq;
|
|
||||||
sp<LooperCallback> callback;
|
|
||||||
void* data;
|
|
||||||
|
|
||||||
void initEventItem(struct epoll_event* eventItem) const;
|
struct Request {
|
||||||
};
|
int fd;
|
||||||
|
int ident;
|
||||||
|
int events;
|
||||||
|
sp<LooperCallback> callback;
|
||||||
|
void* data;
|
||||||
|
|
||||||
|
uint32_t getEpollEvents() const;
|
||||||
|
};
|
||||||
|
|
||||||
struct Response {
|
struct Response {
|
||||||
|
SequenceNumber seq;
|
||||||
int events;
|
int events;
|
||||||
Request request;
|
Request request;
|
||||||
};
|
};
|
||||||
|
|
@ -463,9 +466,14 @@ private:
|
||||||
android::base::unique_fd mEpollFd; // guarded by mLock but only modified on the looper thread
|
android::base::unique_fd mEpollFd; // guarded by mLock but only modified on the looper thread
|
||||||
bool mEpollRebuildRequired; // guarded by mLock
|
bool mEpollRebuildRequired; // guarded by mLock
|
||||||
|
|
||||||
// Locked list of file descriptor monitoring requests.
|
// Locked maps of fds and sequence numbers monitoring requests.
|
||||||
KeyedVector<int, Request> mRequests; // guarded by mLock
|
// Both maps must be kept in sync at all times.
|
||||||
int mNextRequestSeq;
|
std::unordered_map<SequenceNumber, Request> mRequests; // guarded by mLock
|
||||||
|
std::unordered_map<int /*fd*/, SequenceNumber> mSequenceNumberByFd; // guarded by mLock
|
||||||
|
|
||||||
|
// The sequence number to use for the next fd that is added to the looper.
|
||||||
|
// The sequence number 0 is reserved for the WakeEventFd.
|
||||||
|
SequenceNumber mNextRequestSeq; // guarded by mLock
|
||||||
|
|
||||||
// This state is only used privately by pollOnce and does not require a lock since
|
// This state is only used privately by pollOnce and does not require a lock since
|
||||||
// it runs on a single thread.
|
// it runs on a single thread.
|
||||||
|
|
@ -474,9 +482,8 @@ private:
|
||||||
nsecs_t mNextMessageUptime; // set to LLONG_MAX when none
|
nsecs_t mNextMessageUptime; // set to LLONG_MAX when none
|
||||||
|
|
||||||
int pollInner(int timeoutMillis);
|
int pollInner(int timeoutMillis);
|
||||||
int removeFd(int fd, int seq);
|
int removeSequenceNumberLocked(SequenceNumber seq); // requires mLock
|
||||||
void awoken();
|
void awoken();
|
||||||
void pushResponse(int events, const Request& request);
|
|
||||||
void rebuildEpollLocked();
|
void rebuildEpollLocked();
|
||||||
void scheduleEpollRebuildLocked();
|
void scheduleEpollRebuildLocked();
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue