From c581886eeaa486bca4d5bee6e1c1bb5f8df23745 Mon Sep 17 00:00:00 2001 From: Tom Cherry Date: Tue, 6 Oct 2020 10:22:35 -0700 Subject: [PATCH] logd: single std::mutex for locking log buffers and tracking readers There are only three places where the log buffer lock is not already held when the reader lock is taken: 1) In LogReader, when a new reader connects 2) In LogReader, when a misbehaving reader disconnects 3) LogReaderThread::ThreadFunction() 1) and 2) happen sufficiently rarely that there's no impact if they additionally held a global lock. 3) is refactored in this CL. Previously, it would do the below in a loop 1) Lock the reader lock then wait on a condition variable 2) Unlock the reader lock 3) Lock the log buffer lock in LogBuffer::FlushTo() 4) In each iteration in the LogBuffer::FlushTo() loop 1) Lock then unlock the reader lock in FilterSecondPass() 2) Unlock the log buffer lock to send the message, then re-lock it 5) Unlock the log buffer lock when leaving LogBuffer::FlushTo() If these locks are collapsed into a single lock, then this simplifies to: 1) Lock the single lock then wait on a condition variable 2) In each iteration in the LogBuffer::FlushTo() loop 1) Unlock the single lock to send the message, then re-lock it Collapsing both these locks into a single lock simplifes the code and removes the overhead of acquiring the second lock, in the majority of use cases where the first lock is already held. Secondly, this lock will be a plain std::mutex instead of a RwLock. RwLock's are appropriate when there is a substantial imbalance between readers and writers and high contention, neither are true for logd. Bug: 169736426 Test: logging unit tests Change-Id: Ia511506f2d0935a5321c1b2f65569066f91ecb06 --- logd/Android.bp | 4 ++- logd/ChattyLogBuffer.cpp | 2 -- logd/ChattyLogBuffer.h | 16 ++++----- logd/ChattyLogBufferTest.cpp | 35 ++++++++++++------ logd/LogBuffer.h | 8 +++-- logd/LogBufferTest.cpp | 49 ++++++++++++++----------- logd/LogReader.cpp | 12 ++++--- logd/LogReaderList.cpp | 4 +-- logd/LogReaderList.h | 11 +++--- logd/LogReaderThread.cpp | 40 +++++++-------------- logd/LogReaderThread.h | 42 ++++++++++++---------- logd/LogdLock.cpp | 19 ++++++++++ logd/LogdLock.h | 21 +++++++++++ logd/ReplayMessages.cpp | 3 +- logd/SerializedFlushToState.cpp | 5 +-- logd/SerializedFlushToState.h | 28 +++++++-------- logd/SerializedFlushToStateTest.cpp | 33 +++++++++-------- logd/SerializedLogBuffer.cpp | 31 ++++++---------- logd/SerializedLogBuffer.h | 22 ++++++------ logd/SimpleLogBuffer.cpp | 35 +++++++++--------- logd/SimpleLogBuffer.h | 32 ++++++++--------- logd/fuzz/log_buffer_log_fuzzer.cpp | 4 +-- logd/rwlock.h | 56 ----------------------------- 23 files changed, 251 insertions(+), 261 deletions(-) create mode 100644 logd/LogdLock.cpp create mode 100644 logd/LogdLock.h delete mode 100644 logd/rwlock.h diff --git a/logd/Android.bp b/logd/Android.bp index a7de56180..829c95fc7 100644 --- a/logd/Android.bp +++ b/logd/Android.bp @@ -58,12 +58,13 @@ cc_library_static { srcs: [ "ChattyLogBuffer.cpp", "CompressionEngine.cpp", + "LogBufferElement.cpp", "LogReaderList.cpp", "LogReaderThread.cpp", - "LogBufferElement.cpp", "LogSize.cpp", "LogStatistics.cpp", "LogTags.cpp", + "LogdLock.cpp", "PruneList.cpp", "SerializedFlushToState.cpp", "SerializedLogBuffer.cpp", @@ -138,6 +139,7 @@ cc_defaults { "-fstack-protector-all", "-g", "-Wall", + "-Wthread-safety", "-Wextra", "-Werror", "-fno-builtin", diff --git a/logd/ChattyLogBuffer.cpp b/logd/ChattyLogBuffer.cpp index fd183e4d6..19dad1786 100644 --- a/logd/ChattyLogBuffer.cpp +++ b/logd/ChattyLogBuffer.cpp @@ -333,8 +333,6 @@ bool ChattyLogBuffer::Prune(log_id_t id, unsigned long pruneRows, uid_t caller_u LogReaderThread* oldest = nullptr; bool clearAll = pruneRows == ULONG_MAX; - auto reader_threads_lock = std::lock_guard{reader_list()->reader_threads_lock()}; - // Region locked? for (const auto& reader_thread : reader_list()->reader_threads()) { if (!reader_thread->IsWatching(id)) { diff --git a/logd/ChattyLogBuffer.h b/logd/ChattyLogBuffer.h index b4d3a2ff3..e5b8611c5 100644 --- a/logd/ChattyLogBuffer.h +++ b/logd/ChattyLogBuffer.h @@ -33,19 +33,19 @@ #include "LogStatistics.h" #include "LogTags.h" #include "LogWriter.h" +#include "LogdLock.h" #include "PruneList.h" #include "SimpleLogBuffer.h" -#include "rwlock.h" typedef std::list LogBufferElementCollection; class ChattyLogBuffer : public SimpleLogBuffer { // watermark of any worst/chatty uid processing typedef std::unordered_map LogBufferIteratorMap; - LogBufferIteratorMap mLastWorst[LOG_ID_MAX] GUARDED_BY(lock_); + LogBufferIteratorMap mLastWorst[LOG_ID_MAX] GUARDED_BY(logd_lock); // watermark of any worst/chatty pid of system processing typedef std::unordered_map LogBufferPidIteratorMap; - LogBufferPidIteratorMap mLastWorstPidOfSystem[LOG_ID_MAX] GUARDED_BY(lock_); + LogBufferPidIteratorMap mLastWorstPidOfSystem[LOG_ID_MAX] GUARDED_BY(logd_lock); public: ChattyLogBuffer(LogReaderList* reader_list, LogTags* tags, PruneList* prune, @@ -53,18 +53,18 @@ class ChattyLogBuffer : public SimpleLogBuffer { ~ChattyLogBuffer(); protected: - bool Prune(log_id_t id, unsigned long pruneRows, uid_t uid) REQUIRES(lock_) override; - void LogInternal(LogBufferElement&& elem) REQUIRES(lock_) override; + bool Prune(log_id_t id, unsigned long pruneRows, uid_t uid) REQUIRES(logd_lock) override; + void LogInternal(LogBufferElement&& elem) REQUIRES(logd_lock) override; private: LogBufferElementCollection::iterator Erase(LogBufferElementCollection::iterator it, - bool coalesce = false) REQUIRES(lock_); + bool coalesce = false) REQUIRES(logd_lock); PruneList* prune_; // This always contains a copy of the last message logged, for deduplication. - std::optional last_logged_elements_[LOG_ID_MAX] GUARDED_BY(lock_); + std::optional last_logged_elements_[LOG_ID_MAX] GUARDED_BY(logd_lock); // This contains an element if duplicate messages are seen. // Its `dropped` count is `duplicates seen - 1`. - std::optional duplicate_elements_[LOG_ID_MAX] GUARDED_BY(lock_); + std::optional duplicate_elements_[LOG_ID_MAX] GUARDED_BY(logd_lock); }; diff --git a/logd/ChattyLogBufferTest.cpp b/logd/ChattyLogBufferTest.cpp index 3d9005ab6..e273efe1b 100644 --- a/logd/ChattyLogBufferTest.cpp +++ b/logd/ChattyLogBufferTest.cpp @@ -60,9 +60,14 @@ TEST_P(ChattyLogBufferTest, deduplication_simple) { LogMessages(log_messages); std::vector read_log_messages; - std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); - std::unique_ptr flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll); - EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); + { + auto lock = std::lock_guard{logd_lock}; + std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); + + std::unique_ptr flush_to_state = + log_buffer_->CreateFlushToState(1, kLogMaskAll); + EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); + } std::vector expected_log_messages = { make_message(0, "test_tag", "duplicate"), @@ -117,9 +122,13 @@ TEST_P(ChattyLogBufferTest, deduplication_overflow) { LogMessages(log_messages); std::vector read_log_messages; - std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); - std::unique_ptr flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll); - EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); + { + auto lock = std::lock_guard{logd_lock}; + std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); + std::unique_ptr flush_to_state = + log_buffer_->CreateFlushToState(1, kLogMaskAll); + EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); + } std::vector expected_log_messages = { make_message(0, "test_tag", "normal"), @@ -173,9 +182,13 @@ TEST_P(ChattyLogBufferTest, deduplication_liblog) { LogMessages(log_messages); std::vector read_log_messages; - std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); - std::unique_ptr flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll); - EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); + { + auto lock = std::lock_guard{logd_lock}; + std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); + std::unique_ptr flush_to_state = + log_buffer_->CreateFlushToState(1, kLogMaskAll); + EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); + } std::vector expected_log_messages = { make_message(0, 1234, 1), @@ -257,7 +270,7 @@ TEST_P(ChattyLogBufferTest, no_leading_chatty_simple) { std::vector read_log_messages; bool released = false; { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; std::unique_ptr test_writer(new TestWriter(&read_log_messages, &released)); std::unique_ptr log_reader( new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true, @@ -317,7 +330,7 @@ TEST_P(ChattyLogBufferTest, no_leading_chatty_tail) { std::vector read_log_messages; bool released = false; { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; std::unique_ptr test_writer(new TestWriter(&read_log_messages, &released)); std::unique_ptr log_reader( new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true, diff --git a/logd/LogBuffer.h b/logd/LogBuffer.h index 025926650..8d585236a 100644 --- a/logd/LogBuffer.h +++ b/logd/LogBuffer.h @@ -21,10 +21,12 @@ #include #include +#include #include #include #include "LogWriter.h" +#include "LogdLock.h" // A mask to represent which log buffers a reader is watching, values are (1 << LOG_ID_MAIN), etc. using LogMask = uint32_t; @@ -62,12 +64,12 @@ class LogBuffer { virtual int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg, uint16_t len) = 0; - virtual std::unique_ptr CreateFlushToState(uint64_t start, LogMask log_mask) = 0; - virtual void DeleteFlushToState(std::unique_ptr) {} + virtual std::unique_ptr CreateFlushToState(uint64_t start, LogMask log_mask) + REQUIRES(logd_lock) = 0; virtual bool FlushTo( LogWriter* writer, FlushToState& state, const std::function& filter) = 0; + log_time realtime)>& filter) REQUIRES(logd_lock) = 0; virtual bool Clear(log_id_t id, uid_t uid) = 0; virtual size_t GetSize(log_id_t id) = 0; diff --git a/logd/LogBufferTest.cpp b/logd/LogBufferTest.cpp index 191110522..cb9f4284b 100644 --- a/logd/LogBufferTest.cpp +++ b/logd/LogBufferTest.cpp @@ -190,10 +190,14 @@ TEST_P(LogBufferTest, smoke) { LogMessages(log_messages); std::vector read_log_messages; - std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); - std::unique_ptr flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll); - EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); - EXPECT_EQ(2ULL, flush_to_state->start()); + { + auto lock = std::lock_guard{logd_lock}; + std::unique_ptr test_writer(new TestWriter(&read_log_messages, nullptr)); + std::unique_ptr flush_to_state = + log_buffer_->CreateFlushToState(1, kLogMaskAll); + EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); + EXPECT_EQ(2ULL, flush_to_state->start()); + } CompareLogMessages(log_messages, read_log_messages); } @@ -227,7 +231,7 @@ TEST_P(LogBufferTest, smoke_with_reader_thread) { bool released = false; { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; std::unique_ptr test_writer(new TestWriter(&read_log_messages, &released)); std::unique_ptr log_reader( new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true, @@ -239,7 +243,7 @@ TEST_P(LogBufferTest, smoke_with_reader_thread) { usleep(5000); } { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; EXPECT_EQ(0U, reader_list_.reader_threads().size()); } CompareLogMessages(log_messages, read_log_messages); @@ -301,7 +305,7 @@ TEST_P(LogBufferTest, random_messages) { bool released = false; { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; std::unique_ptr test_writer(new TestWriter(&read_log_messages, &released)); std::unique_ptr log_reader( new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true, @@ -313,7 +317,7 @@ TEST_P(LogBufferTest, random_messages) { usleep(5000); } { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; EXPECT_EQ(0U, reader_list_.reader_threads().size()); } CompareLogMessages(log_messages, read_log_messages); @@ -335,7 +339,7 @@ TEST_P(LogBufferTest, read_last_sequence) { bool released = false; { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; std::unique_ptr test_writer(new TestWriter(&read_log_messages, &released)); std::unique_ptr log_reader( new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true, @@ -347,7 +351,7 @@ TEST_P(LogBufferTest, read_last_sequence) { usleep(5000); } { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; EXPECT_EQ(0U, reader_list_.reader_threads().size()); } std::vector expected_log_messages = {log_messages.back()}; @@ -372,7 +376,7 @@ TEST_P(LogBufferTest, clear_logs) { // Connect a blocking reader. { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; std::unique_ptr test_writer(new TestWriter(&read_log_messages, &released)); std::unique_ptr log_reader( new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), false, @@ -385,7 +389,7 @@ TEST_P(LogBufferTest, clear_logs) { int count = 0; for (; count < kMaxRetryCount; ++count) { usleep(5000); - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; if (reader_list_.reader_threads().back()->start() == 4) { break; } @@ -410,7 +414,7 @@ TEST_P(LogBufferTest, clear_logs) { // Wait up to 250ms for the reader to read the 3 additional logs. for (count = 0; count < kMaxRetryCount; ++count) { usleep(5000); - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; if (reader_list_.reader_threads().back()->start() == 7) { break; } @@ -419,14 +423,14 @@ TEST_P(LogBufferTest, clear_logs) { // Release the reader, wait for it to get the signal then check that it has been deleted. { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; - reader_list_.reader_threads().back()->release_Locked(); + auto lock = std::lock_guard{logd_lock}; + reader_list_.reader_threads().back()->Release(); } while (!released) { usleep(5000); } { - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; EXPECT_EQ(0U, reader_list_.reader_threads().size()); } @@ -438,10 +442,15 @@ TEST_P(LogBufferTest, clear_logs) { // Finally, call FlushTo and ensure that only the 3 logs after the clear remain in the buffer. std::vector read_log_messages_after_clear; - std::unique_ptr test_writer(new TestWriter(&read_log_messages_after_clear, nullptr)); - std::unique_ptr flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll); - EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); - EXPECT_EQ(7ULL, flush_to_state->start()); + { + auto lock = std::lock_guard{logd_lock}; + std::unique_ptr test_writer( + new TestWriter(&read_log_messages_after_clear, nullptr)); + std::unique_ptr flush_to_state = + log_buffer_->CreateFlushToState(1, kLogMaskAll); + EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr)); + EXPECT_EQ(7ULL, flush_to_state->start()); + } CompareLogMessages(after_clear_messages, read_log_messages_after_clear); } diff --git a/logd/LogReader.cpp b/logd/LogReader.cpp index 6c976931f..1c6a85f16 100644 --- a/logd/LogReader.cpp +++ b/logd/LogReader.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -152,8 +153,8 @@ bool LogReader::onDataAvailable(SocketClient* cli) { if (!fastcmp(buffer, "dumpAndClose", 12)) { // Allow writer to get some cycles, and wait for pending notifications sched_yield(); - reader_list_->reader_threads_lock().lock(); - reader_list_->reader_threads_lock().unlock(); + logd_lock.lock(); + logd_lock.unlock(); sched_yield(); nonBlock = true; } @@ -191,6 +192,7 @@ bool LogReader::onDataAvailable(SocketClient* cli) { } return FilterResult::kSkip; }; + auto lock = std::lock_guard{logd_lock}; auto flush_to_state = log_buffer_->CreateFlushToState(sequence, logMask); log_buffer_->FlushTo(socket_log_writer.get(), *flush_to_state, log_find_start); @@ -212,7 +214,7 @@ bool LogReader::onDataAvailable(SocketClient* cli) { deadline = {}; } - auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; auto entry = std::make_unique(log_buffer_, reader_list_, std::move(socket_log_writer), nonBlock, tail, logMask, pid, start, sequence, deadline); @@ -230,10 +232,10 @@ bool LogReader::onDataAvailable(SocketClient* cli) { bool LogReader::DoSocketDelete(SocketClient* cli) { auto cli_name = SocketClientToName(cli); - auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; for (const auto& reader : reader_list_->reader_threads()) { if (reader->name() == cli_name) { - reader->release_Locked(); + reader->Release(); return true; } } diff --git a/logd/LogReaderList.cpp b/logd/LogReaderList.cpp index 32ba2910d..51cb3d240 100644 --- a/logd/LogReaderList.cpp +++ b/logd/LogReaderList.cpp @@ -19,8 +19,6 @@ // When we are notified a new log entry is available, inform // listening sockets who are watching this entry's log id. void LogReaderList::NotifyNewLog(LogMask log_mask) const { - auto lock = std::lock_guard{reader_threads_lock_}; - for (const auto& entry : reader_threads_) { if (!entry->IsWatchingMultiple(log_mask)) { continue; @@ -28,6 +26,6 @@ void LogReaderList::NotifyNewLog(LogMask log_mask) const { if (entry->deadline().time_since_epoch().count() != 0) { continue; } - entry->triggerReader_Locked(); + entry->TriggerReader(); } } diff --git a/logd/LogReaderList.h b/logd/LogReaderList.h index 594716a5b..39de70a39 100644 --- a/logd/LogReaderList.h +++ b/logd/LogReaderList.h @@ -22,15 +22,16 @@ #include "LogBuffer.h" #include "LogReaderThread.h" +#include "LogdLock.h" class LogReaderList { public: - void NotifyNewLog(LogMask log_mask) const; + void NotifyNewLog(LogMask log_mask) const REQUIRES(logd_lock); - std::list>& reader_threads() { return reader_threads_; } - std::mutex& reader_threads_lock() { return reader_threads_lock_; } + std::list>& reader_threads() REQUIRES(logd_lock) { + return reader_threads_; + } private: - std::list> reader_threads_; - mutable std::mutex reader_threads_lock_; + std::list> reader_threads_ GUARDED_BY(logd_lock); }; \ No newline at end of file diff --git a/logd/LogReaderThread.cpp b/logd/LogReaderThread.cpp index 6ac9741c3..cf769bafb 100644 --- a/logd/LogReaderThread.cpp +++ b/logd/LogReaderThread.cpp @@ -24,6 +24,7 @@ #include "LogBuffer.h" #include "LogReaderList.h" +#include "SerializedFlushToState.h" LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list, std::unique_ptr writer, bool non_block, @@ -40,7 +41,7 @@ LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_li start_time_(start_time), deadline_(deadline), non_block_(non_block) { - cleanSkip_Locked(); + CleanSkip(); flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask); auto thread = std::thread{&LogReaderThread::ThreadFunction, this}; thread.detach(); @@ -49,7 +50,8 @@ LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_li void LogReaderThread::ThreadFunction() { prctl(PR_SET_NAME, "logd.reader.per"); - auto lock = std::unique_lock{reader_list_->reader_threads_lock()}; + auto lock = std::unique_lock{logd_lock}; + auto lock_assertion = android::base::ScopedLockAssertion{logd_lock}; while (!release_) { if (deadline_.time_since_epoch().count() != 0) { @@ -62,23 +64,19 @@ void LogReaderThread::ThreadFunction() { } } - lock.unlock(); - if (tail_) { auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(), flush_to_state_->log_mask()); - log_buffer_->FlushTo( - writer_.get(), *first_pass_state, - [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) { - return FilterFirstPass(log_id, pid, sequence, realtime); - }); - log_buffer_->DeleteFlushToState(std::move(first_pass_state)); + log_buffer_->FlushTo(writer_.get(), *first_pass_state, + [this](log_id_t log_id, pid_t pid, uint64_t sequence, + log_time realtime) REQUIRES(logd_lock) { + return FilterFirstPass(log_id, pid, sequence, realtime); + }); } bool flush_success = log_buffer_->FlushTo( writer_.get(), *flush_to_state_, - [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) { - return FilterSecondPass(log_id, pid, sequence, realtime); - }); + [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) REQUIRES( + logd_lock) { return FilterSecondPass(log_id, pid, sequence, realtime); }); // We only ignore entries before the original start time for the first flushTo(), if we // get entries after this first flush before the original start time, then the client @@ -89,8 +87,6 @@ void LogReaderThread::ThreadFunction() { start_time_.tv_sec = 0; start_time_.tv_nsec = 0; - lock.lock(); - if (!flush_success) { break; } @@ -99,17 +95,13 @@ void LogReaderThread::ThreadFunction() { break; } - cleanSkip_Locked(); + CleanSkip(); if (deadline_.time_since_epoch().count() == 0) { thread_triggered_condition_.wait(lock); } } - lock.unlock(); - log_buffer_->DeleteFlushToState(std::move(flush_to_state_)); - lock.lock(); - writer_->Release(); auto& log_reader_threads = reader_list_->reader_threads(); @@ -123,8 +115,6 @@ void LogReaderThread::ThreadFunction() { // A first pass to count the number of elements FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime) { - auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; - if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) { ++count_; } @@ -135,8 +125,6 @@ FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log // A second pass to send the selected elements FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t, log_time realtime) { - auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; - if (skip_ahead_[log_id]) { skip_ahead_[log_id]--; return FilterResult::kSkip; @@ -179,7 +167,3 @@ ok: } return FilterResult::kSkip; } - -void LogReaderThread::cleanSkip_Locked(void) { - memset(skip_ahead_, 0, sizeof(skip_ahead_)); -} diff --git a/logd/LogReaderThread.h b/logd/LogReaderThread.h index f1b673fa1..96137b30a 100644 --- a/logd/LogReaderThread.h +++ b/logd/LogReaderThread.h @@ -26,10 +26,12 @@ #include #include +#include #include #include "LogBuffer.h" #include "LogWriter.h" +#include "LogdLock.h" class LogReaderList; @@ -39,50 +41,54 @@ class LogReaderThread { std::unique_ptr writer, bool non_block, unsigned long tail, LogMask log_mask, pid_t pid, log_time start_time, uint64_t sequence, std::chrono::steady_clock::time_point deadline); - void triggerReader_Locked() { thread_triggered_condition_.notify_all(); } + void TriggerReader() REQUIRES(logd_lock) { thread_triggered_condition_.notify_all(); } - void triggerSkip_Locked(log_id_t id, unsigned int skip) { skip_ahead_[id] = skip; } - void cleanSkip_Locked(); + void TriggerSkip(log_id_t id, unsigned int skip) REQUIRES(logd_lock) { skip_ahead_[id] = skip; } + void CleanSkip() REQUIRES(logd_lock) { memset(skip_ahead_, 0, sizeof(skip_ahead_)); } - void release_Locked() { + void Release() REQUIRES(logd_lock) { // gracefully shut down the socket. writer_->Shutdown(); release_ = true; thread_triggered_condition_.notify_all(); } - bool IsWatching(log_id_t id) const { return flush_to_state_->log_mask() & (1 << id); } - bool IsWatchingMultiple(LogMask log_mask) const { - return flush_to_state_ && flush_to_state_->log_mask() & log_mask; + bool IsWatching(log_id_t id) const REQUIRES(logd_lock) { + return flush_to_state_->log_mask() & (1 << id); + } + bool IsWatchingMultiple(LogMask log_mask) const REQUIRES(logd_lock) { + return flush_to_state_->log_mask() & log_mask; } - std::string name() const { return writer_->name(); } - uint64_t start() const { return flush_to_state_->start(); } - std::chrono::steady_clock::time_point deadline() const { return deadline_; } - FlushToState& flush_to_state() { return *flush_to_state_; } + std::string name() const REQUIRES(logd_lock) { return writer_->name(); } + uint64_t start() const REQUIRES(logd_lock) { return flush_to_state_->start(); } + std::chrono::steady_clock::time_point deadline() const REQUIRES(logd_lock) { return deadline_; } + FlushToState& flush_to_state() REQUIRES(logd_lock) { return *flush_to_state_; } private: void ThreadFunction(); // flushTo filter callbacks - FilterResult FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime); - FilterResult FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime); + FilterResult FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) + REQUIRES(logd_lock); + FilterResult FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) + REQUIRES(logd_lock); std::condition_variable thread_triggered_condition_; LogBuffer* log_buffer_; LogReaderList* reader_list_; - std::unique_ptr writer_; + std::unique_ptr writer_ GUARDED_BY(logd_lock); // Set to true to cause the thread to end and the LogReaderThread to delete itself. - bool release_ = false; + bool release_ GUARDED_BY(logd_lock) = false; // If set to non-zero, only pids equal to this are read by the reader. const pid_t pid_; // When a reader is referencing (via start_) old elements in the log buffer, and the log // buffer's size grows past its memory limit, the log buffer may request the reader to skip // ahead a specified number of logs. - unsigned int skip_ahead_[LOG_ID_MAX]; + unsigned int skip_ahead_[LOG_ID_MAX] GUARDED_BY(logd_lock); // LogBuffer::FlushTo() needs to store state across subsequent calls. - std::unique_ptr flush_to_state_; + std::unique_ptr flush_to_state_ GUARDED_BY(logd_lock); // These next three variables are used for reading only the most recent lines aka `adb logcat // -t` / `adb logcat -T`. @@ -100,7 +106,7 @@ class LogReaderThread { log_time start_time_; // CLOCK_MONOTONIC based deadline used for log wrapping. If this deadline expires before logs // wrap, then wake up and send the logs to the reader anyway. - std::chrono::steady_clock::time_point deadline_; + std::chrono::steady_clock::time_point deadline_ GUARDED_BY(logd_lock); // If this reader is 'dumpAndClose' and will disconnect once it has read its intended logs. const bool non_block_; }; diff --git a/logd/LogdLock.cpp b/logd/LogdLock.cpp new file mode 100644 index 000000000..16a37a5e9 --- /dev/null +++ b/logd/LogdLock.cpp @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "LogdLock.h" + +std::mutex logd_lock; diff --git a/logd/LogdLock.h b/logd/LogdLock.h new file mode 100644 index 000000000..6f637c8af --- /dev/null +++ b/logd/LogdLock.h @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +extern std::mutex logd_lock; diff --git a/logd/ReplayMessages.cpp b/logd/ReplayMessages.cpp index 56509ecad..bad8c5664 100644 --- a/logd/ReplayMessages.cpp +++ b/logd/ReplayMessages.cpp @@ -321,6 +321,7 @@ class PrintLogs : public SingleBufferOperation { } void End() override { + auto lock = std::lock_guard{logd_lock}; std::unique_ptr test_writer(new StdoutWriter()); std::unique_ptr flush_to_state = log_buffer_->CreateFlushToState(1, mask_); log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr); @@ -372,7 +373,7 @@ class PrintAllLogs : public SingleBufferOperation { PrintAllLogs(log_time first_log_timestamp, const char* buffer, const char* buffers) : SingleBufferOperation(first_log_timestamp, buffer) { LogMask mask = BuffersToLogMask(buffers); - auto lock = std::unique_lock{reader_list_.reader_threads_lock()}; + auto lock = std::unique_lock{logd_lock}; std::unique_ptr stdout_writer(new StdoutWriter()); std::unique_ptr log_reader( new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(stdout_writer), diff --git a/logd/SerializedFlushToState.cpp b/logd/SerializedFlushToState.cpp index fdf1dd3d0..30276ff5f 100644 --- a/logd/SerializedFlushToState.cpp +++ b/logd/SerializedFlushToState.cpp @@ -20,8 +20,9 @@ #include -SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask) - : FlushToState(start, log_mask) { +SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask, + std::list* logs) + : FlushToState(start, log_mask), logs_(logs) { log_id_for_each(i) { if (((1 << i) & log_mask) == 0) { continue; diff --git a/logd/SerializedFlushToState.h b/logd/SerializedFlushToState.h index c953a16df..1a69f7be1 100644 --- a/logd/SerializedFlushToState.h +++ b/logd/SerializedFlushToState.h @@ -21,6 +21,7 @@ #include #include "LogBuffer.h" +#include "LogdLock.h" #include "SerializedLogChunk.h" #include "SerializedLogEntry.h" @@ -44,48 +45,45 @@ class SerializedFlushToState : public FlushToState { public: // Initializes this state object. For each log buffer set in log_mask, this sets // logs_needed_from_next_position_. - SerializedFlushToState(uint64_t start, LogMask log_mask); + SerializedFlushToState(uint64_t start, LogMask log_mask, std::list* logs) + REQUIRES(logd_lock); // Decrease the reference of all referenced logs. This happens when a reader is disconnected. ~SerializedFlushToState() override; - // We can't hold SerializedLogBuffer::lock_ in the constructor, so we must initialize logs here. - void InitializeLogs(std::list* logs) { - if (logs_ == nullptr) logs_ = logs; - } - // Updates the state of log_positions_ and logs_needed_from_next_position_ then returns true if // there are any unread logs, false otherwise. - bool HasUnreadLogs(); + bool HasUnreadLogs() REQUIRES(logd_lock); // Returns the next unread log and sets logs_needed_from_next_position_ to indicate that we're // waiting for more logs from the associated log buffer. - LogWithId PopNextUnreadLog(); + LogWithId PopNextUnreadLog() REQUIRES(logd_lock); // If the parent log buffer prunes logs, the reference that this class contains may become // invalid, so this must be called first to drop the reference to buffer_it, if any. - void Prune(log_id_t log_id, const std::list::iterator& buffer_it); + void Prune(log_id_t log_id, const std::list::iterator& buffer_it) + REQUIRES(logd_lock); private: // Set logs_needed_from_next_position_[i] to indicate if log_positions_[i] points to an unread // log or to the point at which the next log will appear. - void UpdateLogsNeeded(log_id_t log_id); + void UpdateLogsNeeded(log_id_t log_id) REQUIRES(logd_lock); // Create a LogPosition object for the given log_id by searching through the log chunks for the // first chunk and then first log entry within that chunk that is greater or equal to start(). - void CreateLogPosition(log_id_t log_id); + void CreateLogPosition(log_id_t log_id) REQUIRES(logd_lock); // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and // calls UpdateLogsNeeded() if so. - void CheckForNewLogs(); + void CheckForNewLogs() REQUIRES(logd_lock); - std::list* logs_ = nullptr; + std::list* logs_ GUARDED_BY(logd_lock) = nullptr; // An optional structure that contains an iterator to the serialized log buffer and offset into // it that this logger should handle next. - std::optional log_positions_[LOG_ID_MAX]; + std::optional log_positions_[LOG_ID_MAX] GUARDED_BY(logd_lock); // A bit for each log that is set if a given log_id has no logs or if this client has read all // of its logs. In order words: `logs_[i].empty() || (buffer_it == std::prev(logs_.end) && // next_log_position == logs_write_position_)`. These will be re-checked in each // loop in case new logs came in. - std::bitset logs_needed_from_next_position_ = {}; + std::bitset logs_needed_from_next_position_ GUARDED_BY(logd_lock) = {}; }; diff --git a/logd/SerializedFlushToStateTest.cpp b/logd/SerializedFlushToStateTest.cpp index 88f4052a7..f41de377d 100644 --- a/logd/SerializedFlushToStateTest.cpp +++ b/logd/SerializedFlushToStateTest.cpp @@ -36,8 +36,8 @@ class SerializedFlushToStateTest : public testing::Test { } void TearDown() override { android::base::SetMinimumLogSeverity(old_log_severity_); } - std::string TestReport(const std::vector& expected, - const std::vector& read) { + std::string TestReport(const std::vector& expected, const std::vector& read) + REQUIRES(logd_lock) { auto sequence_to_log_id = [&](uint64_t sequence) -> int { for (const auto& [log_id, sequences] : sequence_numbers_per_buffer_) { if (std::find(sequences.begin(), sequences.end(), sequence) != sequences.end()) { @@ -82,13 +82,12 @@ class SerializedFlushToStateTest : public testing::Test { // Read sequence numbers in order from SerializedFlushToState for every mask combination and all // sequence numbers from 0 through the highest logged sequence number + 1. // This assumes that all of the logs have already been written. - void TestAllReading() { + void TestAllReading() REQUIRES(logd_lock) { uint64_t max_sequence = sequence_ + 1; uint32_t max_mask = (1 << LOG_ID_MAX) - 1; for (uint64_t sequence = 0; sequence < max_sequence; ++sequence) { for (uint32_t mask = 0; mask < max_mask; ++mask) { - auto state = SerializedFlushToState{sequence, mask}; - state.InitializeLogs(log_chunks_); + auto state = SerializedFlushToState{sequence, mask, log_chunks_}; TestReading(sequence, mask, state); } } @@ -98,14 +97,14 @@ class SerializedFlushToStateTest : public testing::Test { // it calls write_logs() in a loop for sequence/mask combination. It clears log_chunks_ and // sequence_numbers_per_buffer_ between calls, such that only the sequence numbers written in // the previous call to write_logs() are expected. - void TestAllReadingWithFutureMessages(const std::function& write_logs) { + void TestAllReadingWithFutureMessages(const std::function& write_logs) + REQUIRES(logd_lock) { uint64_t max_sequence = sequence_ + 1; uint32_t max_mask = (1 << LOG_ID_MAX) - 1; for (uint64_t sequence = 1; sequence < max_sequence; ++sequence) { for (uint32_t mask = 1; mask < max_mask; ++mask) { log_id_for_each(i) { log_chunks_[i].clear(); } - auto state = SerializedFlushToState{sequence, mask}; - state.InitializeLogs(log_chunks_); + auto state = SerializedFlushToState{sequence, mask, log_chunks_}; int loop_count = 0; while (write_logs(loop_count++)) { TestReading(sequence, mask, state); @@ -115,7 +114,8 @@ class SerializedFlushToStateTest : public testing::Test { } } - void TestReading(uint64_t start, LogMask log_mask, SerializedFlushToState& state) { + void TestReading(uint64_t start, LogMask log_mask, SerializedFlushToState& state) + REQUIRES(logd_lock) { std::vector expected_sequence; log_id_for_each(i) { if (((1 << i) & log_mask) == 0) { @@ -148,7 +148,7 @@ class SerializedFlushToStateTest : public testing::Test { // Add a chunk with the given messages to the a given log buffer. Keep track of the sequence // numbers for future validation. Optionally mark the block as having finished writing. void AddChunkWithMessages(bool finish_writing, int buffer, - const std::vector& messages) { + const std::vector& messages) REQUIRES(logd_lock) { auto chunk = SerializedLogChunk{kChunkSize}; for (const auto& message : messages) { auto sequence = sequence_++; @@ -175,6 +175,7 @@ class SerializedFlushToStateTest : public testing::Test { // 4: 1 chunk with 0 logs and finished writing (impossible, but SerializedFlushToState handles it) // 5-7: 0 chunks TEST_F(SerializedFlushToStateTest, smoke) { + auto lock = std::lock_guard{logd_lock}; AddChunkWithMessages(true, 0, {"1st", "2nd"}); AddChunkWithMessages(true, 1, {"3rd"}); AddChunkWithMessages(false, 0, {"4th"}); @@ -188,6 +189,7 @@ TEST_F(SerializedFlushToStateTest, smoke) { } TEST_F(SerializedFlushToStateTest, random) { + auto lock = std::lock_guard{logd_lock}; srand(1); for (int count = 0; count < 20; ++count) { unsigned int num_messages = 1 + rand() % 15; @@ -204,7 +206,8 @@ TEST_F(SerializedFlushToStateTest, random) { // Same start as smoke, but we selectively write logs to the buffers and ensure they're read. TEST_F(SerializedFlushToStateTest, future_writes) { - auto write_logs = [&](int loop_count) { + auto lock = std::lock_guard{logd_lock}; + auto write_logs = [&](int loop_count) REQUIRES(logd_lock) { switch (loop_count) { case 0: // Initial writes. @@ -252,11 +255,11 @@ TEST_F(SerializedFlushToStateTest, future_writes) { } TEST_F(SerializedFlushToStateTest, no_dangling_references) { + auto lock = std::lock_guard{logd_lock}; AddChunkWithMessages(true, 0, {"1st", "2nd"}); AddChunkWithMessages(true, 0, {"3rd", "4th"}); - auto state = SerializedFlushToState{1, kLogMaskAll}; - state.InitializeLogs(log_chunks_); + auto state = SerializedFlushToState{1, kLogMaskAll, log_chunks_}; ASSERT_EQ(log_chunks_[0].size(), 2U); auto first_chunk = log_chunks_[0].begin(); @@ -290,6 +293,7 @@ TEST_F(SerializedFlushToStateTest, no_dangling_references) { } TEST(SerializedFlushToState, Prune) { + auto lock = std::lock_guard{logd_lock}; auto chunk = SerializedLogChunk{kChunkSize}; chunk.Log(1, log_time(), 0, 1, 1, "abc", 3); chunk.Log(2, log_time(), 0, 1, 1, "abc", 3); @@ -299,8 +303,7 @@ TEST(SerializedFlushToState, Prune) { std::list log_chunks[LOG_ID_MAX]; log_chunks[LOG_ID_MAIN].emplace_back(std::move(chunk)); - auto state = SerializedFlushToState{1, kLogMaskAll}; - state.InitializeLogs(log_chunks); + auto state = SerializedFlushToState{1, kLogMaskAll, log_chunks}; ASSERT_TRUE(state.HasUnreadLogs()); state.Prune(LOG_ID_MAIN, log_chunks[LOG_ID_MAIN].begin()); diff --git a/logd/SerializedLogBuffer.cpp b/logd/SerializedLogBuffer.cpp index 6d1576f7a..aa808642d 100644 --- a/logd/SerializedLogBuffer.cpp +++ b/logd/SerializedLogBuffer.cpp @@ -41,9 +41,9 @@ void SerializedLogBuffer::Init() { } // Release any sleeping reader threads to dump their current content. - auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; for (const auto& reader_thread : reader_list_->reader_threads()) { - reader_thread->triggerReader_Locked(); + reader_thread->TriggerReader(); } } @@ -86,7 +86,7 @@ int SerializedLogBuffer::Log(log_id_t log_id, log_time realtime, uid_t uid, pid_ auto sequence = sequence_.fetch_add(1, std::memory_order_relaxed); - auto lock = std::lock_guard{lock_}; + auto lock = std::lock_guard{logd_lock}; if (logs_[log_id].empty()) { logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4)); @@ -140,8 +140,6 @@ void SerializedLogBuffer::NotifyReadersOfPrune( } void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) { - auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; - auto& log_buffer = logs_[log_id]; auto it = log_buffer.begin(); while (it != log_buffer.end()) { @@ -158,7 +156,7 @@ void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid // fast enough to not back-up logd. Instead, we can achieve an nearly-as-efficient // but not error-prune batching effect by waking the reader whenever any chunk is // about to be pruned. - reader_thread->triggerReader_Locked(); + reader_thread->TriggerReader(); } // Some readers may be still reading from this log chunk, log a warning that they are @@ -198,22 +196,14 @@ void SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid std::unique_ptr SerializedLogBuffer::CreateFlushToState(uint64_t start, LogMask log_mask) { - return std::make_unique(start, log_mask); -} - -void SerializedLogBuffer::DeleteFlushToState(std::unique_ptr state) { - auto lock = std::unique_lock{lock_}; - state.reset(); + return std::make_unique(start, log_mask, logs_); } bool SerializedLogBuffer::FlushTo( LogWriter* writer, FlushToState& abstract_state, const std::function& filter) { - auto lock = std::unique_lock{lock_}; - auto& state = reinterpret_cast(abstract_state); - state.InitializeLogs(logs_); while (state.HasUnreadLogs()) { LogWithId top = state.PopNextUnreadLog(); @@ -245,13 +235,14 @@ bool SerializedLogBuffer::FlushTo( unsigned char entry_copy[kMaxEntrySize] __attribute__((uninitialized)); CHECK_LT(entry->msg_len(), LOGGER_ENTRY_MAX_PAYLOAD + 1); memcpy(entry_copy, entry, sizeof(*entry) + entry->msg_len()); - lock.unlock(); + logd_lock.unlock(); if (!reinterpret_cast(entry_copy)->Flush(writer, log_id)) { + logd_lock.lock(); return false; } - lock.lock(); + logd_lock.lock(); } state.set_start(state.start() + 1); @@ -259,7 +250,7 @@ bool SerializedLogBuffer::FlushTo( } bool SerializedLogBuffer::Clear(log_id_t id, uid_t uid) { - auto lock = std::lock_guard{lock_}; + auto lock = std::lock_guard{logd_lock}; Prune(id, ULONG_MAX, uid); // Clearing SerializedLogBuffer never waits for readers and therefore is always successful. @@ -275,7 +266,7 @@ size_t SerializedLogBuffer::GetSizeUsed(log_id_t id) { } size_t SerializedLogBuffer::GetSize(log_id_t id) { - auto lock = std::lock_guard{lock_}; + auto lock = std::lock_guard{logd_lock}; return max_size_[id]; } @@ -288,7 +279,7 @@ bool SerializedLogBuffer::SetSize(log_id_t id, size_t size) { return false; } - auto lock = std::lock_guard{lock_}; + auto lock = std::lock_guard{logd_lock}; max_size_[id] = size; MaybePrune(id); diff --git a/logd/SerializedLogBuffer.h b/logd/SerializedLogBuffer.h index 916452605..239a81ef2 100644 --- a/logd/SerializedLogBuffer.h +++ b/logd/SerializedLogBuffer.h @@ -30,9 +30,9 @@ #include "LogReaderList.h" #include "LogStatistics.h" #include "LogTags.h" +#include "LogdLock.h" #include "SerializedLogChunk.h" #include "SerializedLogEntry.h" -#include "rwlock.h" class SerializedLogBuffer final : public LogBuffer { public: @@ -41,11 +41,12 @@ class SerializedLogBuffer final : public LogBuffer { int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg, uint16_t len) override; - std::unique_ptr CreateFlushToState(uint64_t start, LogMask log_mask) override; - void DeleteFlushToState(std::unique_ptr state) override; + std::unique_ptr CreateFlushToState(uint64_t start, LogMask log_mask) + REQUIRES(logd_lock) override; bool FlushTo(LogWriter* writer, FlushToState& state, const std::function& filter) override; + log_time realtime)>& filter) + REQUIRES(logd_lock) override; bool Clear(log_id_t id, uid_t uid) override; size_t GetSize(log_id_t id) override; @@ -55,20 +56,19 @@ class SerializedLogBuffer final : public LogBuffer { private: bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len); - void MaybePrune(log_id_t log_id) REQUIRES(lock_); - void Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_); + void MaybePrune(log_id_t log_id) REQUIRES(logd_lock); + void Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(logd_lock); void NotifyReadersOfPrune(log_id_t log_id, const std::list::iterator& chunk) - REQUIRES(reader_list_->reader_threads_lock()); + REQUIRES(logd_lock); void RemoveChunkFromStats(log_id_t log_id, SerializedLogChunk& chunk); - size_t GetSizeUsed(log_id_t id) REQUIRES(lock_); + size_t GetSizeUsed(log_id_t id) REQUIRES(logd_lock); LogReaderList* reader_list_; LogTags* tags_; LogStatistics* stats_; - size_t max_size_[LOG_ID_MAX] GUARDED_BY(lock_) = {}; - std::list logs_[LOG_ID_MAX] GUARDED_BY(lock_); - RwLock lock_; + size_t max_size_[LOG_ID_MAX] GUARDED_BY(logd_lock) = {}; + std::list logs_[LOG_ID_MAX] GUARDED_BY(logd_lock); std::atomic sequence_ = 1; }; diff --git a/logd/SimpleLogBuffer.cpp b/logd/SimpleLogBuffer.cpp index b00dd25be..55b31f889 100644 --- a/logd/SimpleLogBuffer.cpp +++ b/logd/SimpleLogBuffer.cpp @@ -36,9 +36,9 @@ void SimpleLogBuffer::Init() { } // Release any sleeping reader threads to dump their current content. - auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; for (const auto& reader_thread : reader_list_->reader_threads()) { - reader_thread->triggerReader_Locked(); + reader_thread->TriggerReader(); } } @@ -95,7 +95,7 @@ int SimpleLogBuffer::Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pi // exact entry with time specified in ms or us precision. if ((realtime.tv_nsec % 1000) == 0) ++realtime.tv_nsec; - auto lock = std::lock_guard{lock_}; + auto lock = std::lock_guard{logd_lock}; auto sequence = sequence_.fetch_add(1, std::memory_order_relaxed); LogInternal(LogBufferElement(log_id, realtime, uid, pid, tid, sequence, msg, len)); return len; @@ -136,8 +136,6 @@ bool SimpleLogBuffer::FlushTo( LogWriter* writer, FlushToState& abstract_state, const std::function& filter) { - auto shared_lock = SharedLock{lock_}; - auto& state = reinterpret_cast(abstract_state); std::list::iterator it; @@ -200,13 +198,14 @@ bool SimpleLogBuffer::FlushTo( state.last_tid()[element.log_id()] = (element.dropped_count() && !same_tid) ? 0 : element.tid(); - shared_lock.unlock(); + logd_lock.unlock(); // We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the // `element` pointer is safe here without the lock if (!element.FlushTo(writer, stats_, same_tid)) { + logd_lock.lock(); return false; } - shared_lock.lock_shared(); + logd_lock.lock(); } state.set_start(state.start() + 1); @@ -217,7 +216,7 @@ bool SimpleLogBuffer::Clear(log_id_t id, uid_t uid) { // Try three times to clear, then disconnect the readers and try one final time. for (int retry = 0; retry < 3; ++retry) { { - auto lock = std::lock_guard{lock_}; + auto lock = std::lock_guard{logd_lock}; if (Prune(id, ULONG_MAX, uid)) { return true; } @@ -229,27 +228,27 @@ bool SimpleLogBuffer::Clear(log_id_t id, uid_t uid) { // _blocked_ reader. bool busy = false; { - auto lock = std::lock_guard{lock_}; + auto lock = std::lock_guard{logd_lock}; busy = !Prune(id, 1, uid); } // It is still busy, disconnect all readers. if (busy) { - auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; + auto lock = std::lock_guard{logd_lock}; for (const auto& reader_thread : reader_list_->reader_threads()) { if (reader_thread->IsWatching(id)) { LOG(WARNING) << "Kicking blocked reader, " << reader_thread->name() << ", from LogBuffer::clear()"; - reader_thread->release_Locked(); + reader_thread->Release(); } } } - auto lock = std::lock_guard{lock_}; + auto lock = std::lock_guard{logd_lock}; return Prune(id, ULONG_MAX, uid); } // get the total space allocated to "id" size_t SimpleLogBuffer::GetSize(log_id_t id) { - auto lock = SharedLock{lock_}; + auto lock = std::lock_guard{logd_lock}; size_t retval = max_size_[id]; return retval; } @@ -261,7 +260,7 @@ bool SimpleLogBuffer::SetSize(log_id_t id, size_t size) { return false; } - auto lock = std::lock_guard{lock_}; + auto lock = std::lock_guard{logd_lock}; max_size_[id] = size; return true; } @@ -274,8 +273,6 @@ void SimpleLogBuffer::MaybePrune(log_id_t id) { } bool SimpleLogBuffer::Prune(log_id_t id, unsigned long prune_rows, uid_t caller_uid) { - auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()}; - // Don't prune logs that are newer than the point at which any reader threads are reading from. LogReaderThread* oldest = nullptr; for (const auto& reader_thread : reader_list_->reader_threads()) { @@ -347,14 +344,14 @@ void SimpleLogBuffer::KickReader(LogReaderThread* reader, log_id_t id, unsigned // dropped if we hit too much memory pressure. LOG(WARNING) << "Kicking blocked reader, " << reader->name() << ", from LogBuffer::kickMe()"; - reader->release_Locked(); + reader->Release(); } else if (reader->deadline().time_since_epoch().count() != 0) { // Allow a blocked WRAP deadline reader to trigger and start reporting the log data. - reader->triggerReader_Locked(); + reader->TriggerReader(); } else { // tell slow reader to skip entries to catch up LOG(WARNING) << "Skipping " << prune_rows << " entries from slow reader, " << reader->name() << ", from LogBuffer::kickMe()"; - reader->triggerSkip_Locked(id, prune_rows); + reader->TriggerSkip(id, prune_rows); } } diff --git a/logd/SimpleLogBuffer.h b/logd/SimpleLogBuffer.h index 8e5b50eb5..51779abab 100644 --- a/logd/SimpleLogBuffer.h +++ b/logd/SimpleLogBuffer.h @@ -25,7 +25,7 @@ #include "LogReaderList.h" #include "LogStatistics.h" #include "LogTags.h" -#include "rwlock.h" +#include "LogdLock.h" class SimpleLogBuffer : public LogBuffer { public: @@ -35,10 +35,12 @@ class SimpleLogBuffer : public LogBuffer { int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg, uint16_t len) override; - std::unique_ptr CreateFlushToState(uint64_t start, LogMask log_mask) override; + std::unique_ptr CreateFlushToState(uint64_t start, LogMask log_mask) + REQUIRES(logd_lock) override; bool FlushTo(LogWriter* writer, FlushToState& state, const std::function& filter) override; + log_time realtime)>& filter) + REQUIRES(logd_lock) override; bool Clear(log_id_t id, uid_t uid) override; size_t GetSize(log_id_t id) override; @@ -47,27 +49,25 @@ class SimpleLogBuffer : public LogBuffer { uint64_t sequence() const override { return sequence_.load(std::memory_order_relaxed); } protected: - virtual bool Prune(log_id_t id, unsigned long prune_rows, uid_t uid) REQUIRES(lock_); - virtual void LogInternal(LogBufferElement&& elem) REQUIRES(lock_); + virtual bool Prune(log_id_t id, unsigned long prune_rows, uid_t uid) REQUIRES(logd_lock); + virtual void LogInternal(LogBufferElement&& elem) REQUIRES(logd_lock); // Returns an iterator to the oldest element for a given log type, or logs_.end() if - // there are no logs for the given log type. Requires logs_lock_ to be held. - std::list::iterator GetOldest(log_id_t log_id) REQUIRES(lock_); + // there are no logs for the given log type. Requires logs_logd_lock to be held. + std::list::iterator GetOldest(log_id_t log_id) REQUIRES(logd_lock); std::list::iterator Erase(std::list::iterator it) - REQUIRES(lock_); + REQUIRES(logd_lock); void KickReader(LogReaderThread* reader, log_id_t id, unsigned long prune_rows) - REQUIRES_SHARED(lock_); + REQUIRES(logd_lock); LogStatistics* stats() { return stats_; } LogReaderList* reader_list() { return reader_list_; } - size_t max_size(log_id_t id) REQUIRES_SHARED(lock_) { return max_size_[id]; } + size_t max_size(log_id_t id) REQUIRES_SHARED(logd_lock) { return max_size_[id]; } std::list& logs() { return logs_; } - RwLock lock_; - private: bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len); - void MaybePrune(log_id_t id) REQUIRES(lock_); + void MaybePrune(log_id_t id) REQUIRES(logd_lock); LogReaderList* reader_list_; LogTags* tags_; @@ -75,9 +75,9 @@ class SimpleLogBuffer : public LogBuffer { std::atomic sequence_ = 1; - size_t max_size_[LOG_ID_MAX] GUARDED_BY(lock_); - std::list logs_ GUARDED_BY(lock_); + size_t max_size_[LOG_ID_MAX] GUARDED_BY(logd_lock); + std::list logs_ GUARDED_BY(logd_lock); // Keeps track of the iterator to the oldest log message of a given log type, as an // optimization when pruning logs. Use GetOldest() to retrieve. - std::optional::iterator> oldest_[LOG_ID_MAX] GUARDED_BY(lock_); + std::optional::iterator> oldest_[LOG_ID_MAX] GUARDED_BY(logd_lock); }; diff --git a/logd/fuzz/log_buffer_log_fuzzer.cpp b/logd/fuzz/log_buffer_log_fuzzer.cpp index d71a2f91f..2fe940769 100644 --- a/logd/fuzz/log_buffer_log_fuzzer.cpp +++ b/logd/fuzz/log_buffer_log_fuzzer.cpp @@ -126,7 +126,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { // Read out all of the logs. { - auto lock = std::unique_lock{reader_list.reader_threads_lock()}; + auto lock = std::unique_lock{logd_lock}; std::unique_ptr test_writer(new NoopWriter()); std::unique_ptr log_reader( new LogReaderThread(log_buffer.get(), &reader_list, std::move(test_writer), true, 0, @@ -137,7 +137,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { // Wait until the reader has finished. while (true) { usleep(50); - auto lock = std::unique_lock{reader_list.reader_threads_lock()}; + auto lock = std::unique_lock{logd_lock}; if (reader_list.reader_threads().size() == 0) { break; } diff --git a/logd/rwlock.h b/logd/rwlock.h deleted file mode 100644 index c37721ee3..000000000 --- a/logd/rwlock.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (C) 2020 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include -#include - -// As of the end of May 2020, std::shared_mutex is *not* simply a pthread_rwlock, but rather a -// combination of std::mutex and std::condition variable, which is obviously less efficient. This -// immitates what std::shared_mutex should be doing and is compatible with RAII thread wrappers. - -class SHARED_CAPABILITY("mutex") RwLock { - public: - RwLock() {} - ~RwLock() {} - - void lock() ACQUIRE() { pthread_rwlock_wrlock(&rwlock_); } - void lock_shared() ACQUIRE_SHARED() { pthread_rwlock_rdlock(&rwlock_); } - - void unlock() RELEASE() { pthread_rwlock_unlock(&rwlock_); } - - private: - pthread_rwlock_t rwlock_ = PTHREAD_RWLOCK_INITIALIZER; -}; - -// std::shared_lock does not have thread annotations, so we need our own. - -class SCOPED_CAPABILITY SharedLock { - public: - explicit SharedLock(RwLock& lock) ACQUIRE_SHARED(lock) : lock_(lock) { lock_.lock_shared(); } - ~SharedLock() RELEASE() { lock_.unlock(); } - - void lock_shared() ACQUIRE_SHARED() { lock_.lock_shared(); } - void unlock() RELEASE() { lock_.unlock(); } - - DISALLOW_IMPLICIT_CONSTRUCTORS(SharedLock); - - private: - RwLock& lock_; -};