Merge changes Id47b288d,I969565eb,Iba2e654e am: 365fdb7acb

Original change: undetermined

Change-Id: I5478564414c48ce0ab0c84ecd4a1e31eb3ec2eec
This commit is contained in:
Tom Cherry 2020-06-02 15:51:51 +00:00 committed by Automerger Merge Worker
commit 76dac24797
12 changed files with 160 additions and 111 deletions

View file

@ -61,7 +61,8 @@ TEST_P(ChattyLogBufferTest, deduplication_simple) {
std::vector<LogMessage> read_log_messages; std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr)); std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
log_buffer_->FlushTo(test_writer.get(), 1, nullptr, nullptr); std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
std::vector<LogMessage> expected_log_messages = { std::vector<LogMessage> expected_log_messages = {
make_message(0, "test_tag", "duplicate"), make_message(0, "test_tag", "duplicate"),
@ -72,12 +73,12 @@ TEST_P(ChattyLogBufferTest, deduplication_simple) {
make_message(5, "test_tag", "not_same"), make_message(5, "test_tag", "not_same"),
// 3 duplicate logs together print the first, a 1 count chatty message, then the last. // 3 duplicate logs together print the first, a 1 count chatty message, then the last.
make_message(6, "test_tag", "duplicate"), make_message(6, "test_tag", "duplicate"),
make_message(7, "chatty", "uid=0\\([^\\)]+\\) [^ ]+ expire 1 line", true), make_message(7, "chatty", "uid=0\\([^\\)]+\\) [^ ]+ identical 1 line", true),
make_message(8, "test_tag", "duplicate"), make_message(8, "test_tag", "duplicate"),
make_message(9, "test_tag", "not_same"), make_message(9, "test_tag", "not_same"),
// 6 duplicate logs together print the first, a 4 count chatty message, then the last. // 6 duplicate logs together print the first, a 4 count chatty message, then the last.
make_message(10, "test_tag", "duplicate"), make_message(10, "test_tag", "duplicate"),
make_message(14, "chatty", "uid=0\\([^\\)]+\\) [^ ]+ expire 4 lines", true), make_message(14, "chatty", "uid=0\\([^\\)]+\\) [^ ]+ identical 4 lines", true),
make_message(15, "test_tag", "duplicate"), make_message(15, "test_tag", "duplicate"),
make_message(16, "test_tag", "not_same"), make_message(16, "test_tag", "not_same"),
// duplicate logs > 1 minute apart are not deduplicated. // duplicate logs > 1 minute apart are not deduplicated.
@ -117,15 +118,16 @@ TEST_P(ChattyLogBufferTest, deduplication_overflow) {
std::vector<LogMessage> read_log_messages; std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr)); std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
log_buffer_->FlushTo(test_writer.get(), 1, nullptr, nullptr); std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
std::vector<LogMessage> expected_log_messages = { std::vector<LogMessage> expected_log_messages = {
make_message(0, "test_tag", "normal"), make_message(0, "test_tag", "normal"),
make_message(1, "test_tag", "duplicate"), make_message(1, "test_tag", "duplicate"),
make_message(expired_per_chatty_message + 1, "chatty", make_message(expired_per_chatty_message + 1, "chatty",
"uid=0\\([^\\)]+\\) [^ ]+ expire 65535 lines", true), "uid=0\\([^\\)]+\\) [^ ]+ identical 65535 lines", true),
make_message(expired_per_chatty_message + 2, "chatty", make_message(expired_per_chatty_message + 2, "chatty",
"uid=0\\([^\\)]+\\) [^ ]+ expire 1 line", true), "uid=0\\([^\\)]+\\) [^ ]+ identical 1 line", true),
make_message(expired_per_chatty_message + 3, "test_tag", "duplicate"), make_message(expired_per_chatty_message + 3, "test_tag", "duplicate"),
make_message(expired_per_chatty_message + 4, "test_tag", "normal"), make_message(expired_per_chatty_message + 4, "test_tag", "normal"),
}; };
@ -172,7 +174,8 @@ TEST_P(ChattyLogBufferTest, deduplication_liblog) {
std::vector<LogMessage> read_log_messages; std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr)); std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
log_buffer_->FlushTo(test_writer.get(), 1, nullptr, nullptr); std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
std::vector<LogMessage> expected_log_messages = { std::vector<LogMessage> expected_log_messages = {
make_message(0, 1234, 1), make_message(0, 1234, 1),
@ -199,4 +202,4 @@ TEST_P(ChattyLogBufferTest, deduplication_liblog) {
CompareLogMessages(expected_log_messages, read_log_messages); CompareLogMessages(expected_log_messages, read_log_messages);
}; };
INSTANTIATE_TEST_CASE_P(ChattyLogBufferTests, ChattyLogBufferTest, testing::Values("chatty")); INSTANTIATE_TEST_CASE_P(ChattyLogBufferTests, ChattyLogBufferTest, testing::Values("chatty"));

View file

@ -25,6 +25,27 @@
#include "LogWriter.h" #include "LogWriter.h"
// A mask to represent which log buffers a reader is watching, values are (1 << LOG_ID_MAIN), etc.
using LogMask = uint32_t;
constexpr uint32_t kLogMaskAll = 0xFFFFFFFF;
// State that a LogBuffer may want to persist across calls to FlushTo().
class FlushToState {
public:
FlushToState(uint64_t start, LogMask log_mask) : start_(start), log_mask_(log_mask) {}
virtual ~FlushToState() {}
uint64_t start() const { return start_; }
void set_start(uint64_t start) { start_ = start; }
LogMask log_mask() const { return log_mask_; }
private:
uint64_t start_;
LogMask log_mask_;
};
// Enum for the return values of the `filter` function passed to FlushTo().
enum class FilterResult { enum class FilterResult {
kSkip, kSkip,
kStop, kStop,
@ -39,19 +60,16 @@ class LogBuffer {
virtual int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, virtual int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid,
const char* msg, uint16_t len) = 0; const char* msg, uint16_t len) = 0;
// lastTid is an optional context to help detect if the last previous
// valid message was from the same source so we can differentiate chatty virtual std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask) = 0;
// filter types (identical or expired) virtual bool FlushTo(LogWriter* writer, FlushToState& state,
static const uint64_t FLUSH_ERROR = 0; const std::function<FilterResult(log_id_t log_id, pid_t pid,
virtual uint64_t FlushTo(LogWriter* writer, uint64_t start, uint64_t sequence, log_time realtime,
pid_t* last_tid, // nullable uint16_t dropped_count)>& filter) = 0;
const std::function<FilterResult(log_id_t log_id, pid_t pid,
uint64_t sequence, log_time realtime,
uint16_t dropped_count)>& filter) = 0;
virtual bool Clear(log_id_t id, uid_t uid) = 0; virtual bool Clear(log_id_t id, uid_t uid) = 0;
virtual unsigned long GetSize(log_id_t id) = 0; virtual unsigned long GetSize(log_id_t id) = 0;
virtual int SetSize(log_id_t id, unsigned long size) = 0; virtual int SetSize(log_id_t id, unsigned long size) = 0;
virtual uint64_t sequence() const = 0; virtual uint64_t sequence() const = 0;
}; };

View file

@ -208,8 +208,9 @@ TEST_P(LogBufferTest, smoke) {
std::vector<LogMessage> read_log_messages; std::vector<LogMessage> read_log_messages;
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr)); std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
uint64_t flush_result = log_buffer_->FlushTo(test_writer.get(), 1, nullptr, nullptr); std::unique_ptr<FlushToState> flush_to_state = log_buffer_->CreateFlushToState(1, kLogMaskAll);
EXPECT_EQ(1ULL, flush_result); EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
EXPECT_EQ(2ULL, flush_to_state->start());
CompareLogMessages(log_messages, read_log_messages); CompareLogMessages(log_messages, read_log_messages);
} }
@ -335,4 +336,39 @@ TEST_P(LogBufferTest, random_messages) {
CompareLogMessages(log_messages, read_log_messages); CompareLogMessages(log_messages, read_log_messages);
} }
TEST_P(LogBufferTest, read_last_sequence) {
std::vector<LogMessage> log_messages = {
{{.pid = 1, .tid = 2, .sec = 10000, .nsec = 20001, .lid = LOG_ID_MAIN, .uid = 0},
"first"},
{{.pid = 10, .tid = 2, .sec = 10000, .nsec = 20002, .lid = LOG_ID_MAIN, .uid = 0},
"second"},
{{.pid = 100, .tid = 2, .sec = 10000, .nsec = 20003, .lid = LOG_ID_MAIN, .uid = 0},
"third"},
};
FixupMessages(&log_messages);
LogMessages(log_messages);
std::vector<LogMessage> read_log_messages;
bool released = false;
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
std::unique_ptr<LogReaderThread> log_reader(
new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
0, ~0, 0, {}, 3, {}));
reader_list_.reader_threads().emplace_back(std::move(log_reader));
}
while (!released) {
usleep(5000);
}
{
auto lock = std::unique_lock{reader_list_.reader_threads_lock()};
EXPECT_EQ(0U, reader_list_.reader_threads().size());
}
std::vector<LogMessage> expected_log_messages = {log_messages.back()};
CompareLogMessages(expected_log_messages, read_log_messages);
}
INSTANTIATE_TEST_CASE_P(LogBufferTests, LogBufferTest, testing::Values("chatty", "simple")); INSTANTIATE_TEST_CASE_P(LogBufferTests, LogBufferTest, testing::Values("chatty", "simple"));

View file

@ -45,7 +45,7 @@ void FixupMessages(std::vector<LogMessage>* messages);
class TestWriter : public LogWriter { class TestWriter : public LogWriter {
public: public:
TestWriter(std::vector<LogMessage>* msgs, bool* released) TestWriter(std::vector<LogMessage>* msgs, bool* released)
: LogWriter(0, true, true), msgs_(msgs), released_(released) {} : LogWriter(0, true), msgs_(msgs), released_(released) {}
bool Write(const logger_entry& entry, const char* message) override { bool Write(const logger_entry& entry, const char* message) override {
msgs_->emplace_back(LogMessage{entry, std::string(message, entry.len), false}); msgs_->emplace_back(LogMessage{entry, std::string(message, entry.len), false});
return true; return true;

View file

@ -45,11 +45,8 @@ static std::string SocketClientToName(SocketClient* client) {
class SocketLogWriter : public LogWriter { class SocketLogWriter : public LogWriter {
public: public:
SocketLogWriter(LogReader* reader, SocketClient* client, bool privileged, SocketLogWriter(LogReader* reader, SocketClient* client, bool privileged)
bool can_read_security_logs) : LogWriter(client->getUid(), privileged), reader_(reader), client_(client) {}
: LogWriter(client->getUid(), privileged, can_read_security_logs),
reader_(reader),
client_(client) {}
bool Write(const logger_entry& entry, const char* msg) override { bool Write(const logger_entry& entry, const char* msg) override {
struct iovec iovec[2]; struct iovec iovec[2];
@ -162,25 +159,23 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
bool privileged = clientHasLogCredentials(cli); bool privileged = clientHasLogCredentials(cli);
bool can_read_security = CanReadSecurityLogs(cli); bool can_read_security = CanReadSecurityLogs(cli);
if (!can_read_security) {
logMask &= ~(1 << LOG_ID_SECURITY);
}
std::unique_ptr<LogWriter> socket_log_writer( std::unique_ptr<LogWriter> socket_log_writer(new SocketLogWriter(this, cli, privileged));
new SocketLogWriter(this, cli, privileged, can_read_security));
uint64_t sequence = 1; uint64_t sequence = 1;
// Convert realtime to sequence number // Convert realtime to sequence number
if (start != log_time::EPOCH) { if (start != log_time::EPOCH) {
bool start_time_set = false; bool start_time_set = false;
uint64_t last = sequence; uint64_t last = sequence;
auto log_find_start = [pid, logMask, start, &sequence, &start_time_set, &last]( auto log_find_start = [pid, start, &sequence, &start_time_set, &last](
log_id_t element_log_id, pid_t element_pid, log_id_t, pid_t element_pid, uint64_t element_sequence,
uint64_t element_sequence, log_time element_realtime, log_time element_realtime, uint16_t) -> FilterResult {
uint16_t) -> FilterResult {
if (pid && pid != element_pid) { if (pid && pid != element_pid) {
return FilterResult::kSkip; return FilterResult::kSkip;
} }
if ((logMask & (1 << element_log_id)) == 0) {
return FilterResult::kSkip;
}
if (start == element_realtime) { if (start == element_realtime) {
sequence = element_sequence; sequence = element_sequence;
start_time_set = true; start_time_set = true;
@ -195,8 +190,8 @@ bool LogReader::onDataAvailable(SocketClient* cli) {
} }
return FilterResult::kSkip; return FilterResult::kSkip;
}; };
auto flush_to_state = log_buffer_->CreateFlushToState(sequence, logMask);
log_buffer_->FlushTo(socket_log_writer.get(), sequence, nullptr, log_find_start); log_buffer_->FlushTo(socket_log_writer.get(), *flush_to_state, log_find_start);
if (!start_time_set) { if (!start_time_set) {
if (nonBlock) { if (nonBlock) {

View file

@ -18,7 +18,7 @@
// When we are notified a new log entry is available, inform // When we are notified a new log entry is available, inform
// listening sockets who are watching this entry's log id. // listening sockets who are watching this entry's log id.
void LogReaderList::NotifyNewLog(unsigned int log_mask) const { void LogReaderList::NotifyNewLog(LogMask log_mask) const {
auto lock = std::lock_guard{reader_threads_lock_}; auto lock = std::lock_guard{reader_threads_lock_};
for (const auto& entry : reader_threads_) { for (const auto& entry : reader_threads_) {

View file

@ -20,11 +20,12 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include "LogBuffer.h"
#include "LogReaderThread.h" #include "LogReaderThread.h"
class LogReaderList { class LogReaderList {
public: public:
void NotifyNewLog(unsigned int log_mask) const; void NotifyNewLog(LogMask log_mask) const;
std::list<std::unique_ptr<LogReaderThread>>& reader_threads() { return reader_threads_; } std::list<std::unique_ptr<LogReaderThread>>& reader_threads() { return reader_threads_; }
std::mutex& reader_threads_lock() { return reader_threads_lock_; } std::mutex& reader_threads_lock() { return reader_threads_lock_; }

View file

@ -29,24 +29,22 @@ using namespace std::placeholders;
LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list, LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
std::unique_ptr<LogWriter> writer, bool non_block, std::unique_ptr<LogWriter> writer, bool non_block,
unsigned long tail, unsigned int log_mask, pid_t pid, unsigned long tail, LogMask log_mask, pid_t pid,
log_time start_time, uint64_t start, log_time start_time, uint64_t start,
std::chrono::steady_clock::time_point deadline) std::chrono::steady_clock::time_point deadline)
: log_buffer_(log_buffer), : log_buffer_(log_buffer),
reader_list_(reader_list), reader_list_(reader_list),
writer_(std::move(writer)), writer_(std::move(writer)),
leading_dropped_(false), leading_dropped_(false),
log_mask_(log_mask),
pid_(pid), pid_(pid),
tail_(tail), tail_(tail),
count_(0), count_(0),
index_(0), index_(0),
start_time_(start_time), start_time_(start_time),
start_(start),
deadline_(deadline), deadline_(deadline),
non_block_(non_block) { non_block_(non_block) {
memset(last_tid_, 0, sizeof(last_tid_));
cleanSkip_Locked(); cleanSkip_Locked();
flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask);
auto thread = std::thread{&LogReaderThread::ThreadFunction, this}; auto thread = std::thread{&LogReaderThread::ThreadFunction, this};
thread.detach(); thread.detach();
} }
@ -58,8 +56,6 @@ void LogReaderThread::ThreadFunction() {
auto lock = std::unique_lock{reader_list_->reader_threads_lock()}; auto lock = std::unique_lock{reader_list_->reader_threads_lock()};
uint64_t start = start_;
while (!release_) { while (!release_) {
if (deadline_.time_since_epoch().count() != 0) { if (deadline_.time_since_epoch().count() != 0) {
if (thread_triggered_condition_.wait_until(lock, deadline_) == if (thread_triggered_condition_.wait_until(lock, deadline_) ==
@ -74,7 +70,9 @@ void LogReaderThread::ThreadFunction() {
lock.unlock(); lock.unlock();
if (tail_) { if (tail_) {
log_buffer_->FlushTo(writer_.get(), start, nullptr, auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(),
flush_to_state_->log_mask());
log_buffer_->FlushTo(writer_.get(), *first_pass_state,
[this](log_id_t log_id, pid_t pid, uint64_t sequence, [this](log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime, uint16_t dropped_count) { log_time realtime, uint16_t dropped_count) {
return FilterFirstPass(log_id, pid, sequence, realtime, return FilterFirstPass(log_id, pid, sequence, realtime,
@ -84,12 +82,12 @@ void LogReaderThread::ThreadFunction() {
true; // TODO: Likely a bug, if leading_dropped_ was not true before calling true; // TODO: Likely a bug, if leading_dropped_ was not true before calling
// flushTo(), then it should not be reset to true after. // flushTo(), then it should not be reset to true after.
} }
start = log_buffer_->FlushTo(writer_.get(), start, last_tid_, bool flush_success = log_buffer_->FlushTo(
[this](log_id_t log_id, pid_t pid, uint64_t sequence, writer_.get(), *flush_to_state_,
log_time realtime, uint16_t dropped_count) { [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime,
return FilterSecondPass(log_id, pid, sequence, realtime, uint16_t dropped_count) {
dropped_count); return FilterSecondPass(log_id, pid, sequence, realtime, dropped_count);
}); });
// We only ignore entries before the original start time for the first flushTo(), if we // We only ignore entries before the original start time for the first flushTo(), if we
// get entries after this first flush before the original start time, then the client // get entries after this first flush before the original start time, then the client
@ -102,12 +100,10 @@ void LogReaderThread::ThreadFunction() {
lock.lock(); lock.lock();
if (start == LogBuffer::FLUSH_ERROR) { if (!flush_success) {
break; break;
} }
start_ = start + 1;
if (non_block_ || release_) { if (non_block_ || release_) {
break; break;
} }
@ -131,8 +127,8 @@ void LogReaderThread::ThreadFunction() {
} }
// A first pass to count the number of elements // A first pass to count the number of elements
FilterResult LogReaderThread::FilterFirstPass(log_id_t log_id, pid_t pid, uint64_t sequence, FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime,
log_time realtime, uint16_t dropped_count) { uint16_t dropped_count) {
auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
if (leading_dropped_) { if (leading_dropped_) {
@ -142,12 +138,7 @@ FilterResult LogReaderThread::FilterFirstPass(log_id_t log_id, pid_t pid, uint64
leading_dropped_ = false; leading_dropped_ = false;
} }
if (count_ == 0) { if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
start_ = sequence;
}
if ((!pid_ || pid_ == pid) && IsWatching(log_id) &&
(start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
++count_; ++count_;
} }
@ -155,12 +146,10 @@ FilterResult LogReaderThread::FilterFirstPass(log_id_t log_id, pid_t pid, uint64
} }
// A second pass to send the selected elements // A second pass to send the selected elements
FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t sequence, FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t,
log_time realtime, uint16_t dropped_count) { log_time realtime, uint16_t dropped_count) {
auto lock = std::lock_guard{reader_list_->reader_threads_lock()}; auto lock = std::lock_guard{reader_list_->reader_threads_lock()};
start_ = sequence;
if (skip_ahead_[log_id]) { if (skip_ahead_[log_id]) {
skip_ahead_[log_id]--; skip_ahead_[log_id]--;
return FilterResult::kSkip; return FilterResult::kSkip;
@ -178,10 +167,6 @@ FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint6
return FilterResult::kStop; return FilterResult::kStop;
} }
if (!IsWatching(log_id)) {
return FilterResult::kSkip;
}
if (pid_ && pid_ != pid) { if (pid_ && pid_ != pid) {
return FilterResult::kSkip; return FilterResult::kSkip;
} }

View file

@ -38,7 +38,7 @@ class LogReaderThread {
public: public:
LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list, LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
std::unique_ptr<LogWriter> writer, bool non_block, unsigned long tail, std::unique_ptr<LogWriter> writer, bool non_block, unsigned long tail,
unsigned int log_mask, pid_t pid, log_time start_time, uint64_t sequence, LogMask log_mask, pid_t pid, log_time start_time, uint64_t sequence,
std::chrono::steady_clock::time_point deadline); std::chrono::steady_clock::time_point deadline);
void triggerReader_Locked() { thread_triggered_condition_.notify_all(); } void triggerReader_Locked() { thread_triggered_condition_.notify_all(); }
@ -52,11 +52,13 @@ class LogReaderThread {
thread_triggered_condition_.notify_all(); thread_triggered_condition_.notify_all();
} }
bool IsWatching(log_id_t id) const { return log_mask_ & (1 << id); } bool IsWatching(log_id_t id) const { return flush_to_state_->log_mask() & (1 << id); }
bool IsWatchingMultiple(unsigned int log_mask) const { return log_mask_ & log_mask; } bool IsWatchingMultiple(LogMask log_mask) const {
return flush_to_state_->log_mask() & log_mask;
}
std::string name() const { return writer_->name(); } std::string name() const { return writer_->name(); }
uint64_t start() const { return start_; } uint64_t start() const { return flush_to_state_->start(); }
std::chrono::steady_clock::time_point deadline() const { return deadline_; } std::chrono::steady_clock::time_point deadline() const { return deadline_; }
private: private:
@ -78,16 +80,14 @@ class LogReaderThread {
// messages should be ignored. // messages should be ignored.
bool leading_dropped_; bool leading_dropped_;
// A mask of the logs buffers that are read by this reader.
const unsigned int log_mask_;
// If set to non-zero, only pids equal to this are read by the reader. // If set to non-zero, only pids equal to this are read by the reader.
const pid_t pid_; const pid_t pid_;
// When a reader is referencing (via start_) old elements in the log buffer, and the log // When a reader is referencing (via start_) old elements in the log buffer, and the log
// buffer's size grows past its memory limit, the log buffer may request the reader to skip // buffer's size grows past its memory limit, the log buffer may request the reader to skip
// ahead a specified number of logs. // ahead a specified number of logs.
unsigned int skip_ahead_[LOG_ID_MAX]; unsigned int skip_ahead_[LOG_ID_MAX];
// Used for distinguishing 'dropped' messages for duplicate logs vs chatty drops // LogBuffer::FlushTo() needs to store state across subsequent calls.
pid_t last_tid_[LOG_ID_MAX]; std::unique_ptr<FlushToState> flush_to_state_;
// These next three variables are used for reading only the most recent lines aka `adb logcat // These next three variables are used for reading only the most recent lines aka `adb logcat
// -t` / `adb logcat -T`. // -t` / `adb logcat -T`.
@ -103,8 +103,6 @@ class LogReaderThread {
// When a reader requests logs starting from a given timestamp, its stored here for the first // When a reader requests logs starting from a given timestamp, its stored here for the first
// pass, such that logs before this time stamp that are accumulated in the buffer are ignored. // pass, such that logs before this time stamp that are accumulated in the buffer are ignored.
log_time start_time_; log_time start_time_;
// The point from which the reader will read logs once awoken.
uint64_t start_;
// CLOCK_MONOTONIC based deadline used for log wrapping. If this deadline expires before logs // CLOCK_MONOTONIC based deadline used for log wrapping. If this deadline expires before logs
// wrap, then wake up and send the logs to the reader anyway. // wrap, then wake up and send the logs to the reader anyway.
std::chrono::steady_clock::time_point deadline_; std::chrono::steady_clock::time_point deadline_;

View file

@ -23,8 +23,7 @@
// An interface for writing logs to a reader. // An interface for writing logs to a reader.
class LogWriter { class LogWriter {
public: public:
LogWriter(uid_t uid, bool privileged, bool can_read_security_logs) LogWriter(uid_t uid, bool privileged) : uid_(uid), privileged_(privileged) {}
: uid_(uid), privileged_(privileged), can_read_security_logs_(can_read_security_logs) {}
virtual ~LogWriter() {} virtual ~LogWriter() {}
virtual bool Write(const logger_entry& entry, const char* msg) = 0; virtual bool Write(const logger_entry& entry, const char* msg) = 0;
@ -35,12 +34,10 @@ class LogWriter {
uid_t uid() const { return uid_; } uid_t uid() const { return uid_; }
bool privileged() const { return privileged_; } bool privileged() const { return privileged_; }
bool can_read_security_logs() const { return can_read_security_logs_; }
private: private:
uid_t uid_; uid_t uid_;
// If this writer sees logs from all UIDs or only its own UID. See clientHasLogCredentials(). // If this writer sees logs from all UIDs or only its own UID. See clientHasLogCredentials().
bool privileged_; bool privileged_;
bool can_read_security_logs_; // If this writer sees security logs. See CanReadSecurityLogs().
}; };

View file

@ -110,14 +110,34 @@ void SimpleLogBuffer::LogInternal(LogBufferElement&& elem) {
reader_list_->NotifyNewLog(1 << log_id); reader_list_->NotifyNewLog(1 << log_id);
} }
uint64_t SimpleLogBuffer::FlushTo( // These extra parameters are only required for chatty, but since they're a no-op for
LogWriter* writer, uint64_t start, pid_t* last_tid, // SimpleLogBuffer, it's easier to include them here, then to duplicate FlushTo() for
// ChattyLogBuffer.
class ChattyFlushToState : public FlushToState {
public:
ChattyFlushToState(uint64_t start, LogMask log_mask) : FlushToState(start, log_mask) {}
pid_t* last_tid() { return last_tid_; }
private:
pid_t last_tid_[LOG_ID_MAX] = {};
};
std::unique_ptr<FlushToState> SimpleLogBuffer::CreateFlushToState(uint64_t start,
LogMask log_mask) {
return std::make_unique<ChattyFlushToState>(start, log_mask);
}
bool SimpleLogBuffer::FlushTo(
LogWriter* writer, FlushToState& abstract_state,
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence, const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
log_time realtime, uint16_t dropped_count)>& filter) { log_time realtime, uint16_t dropped_count)>& filter) {
auto shared_lock = SharedLock{lock_}; auto shared_lock = SharedLock{lock_};
auto& state = reinterpret_cast<ChattyFlushToState&>(abstract_state);
std::list<LogBufferElement>::iterator it; std::list<LogBufferElement>::iterator it;
if (start <= 1) { if (state.start() <= 1) {
// client wants to start from the beginning // client wants to start from the beginning
it = logs_.begin(); it = logs_.begin();
} else { } else {
@ -126,23 +146,25 @@ uint64_t SimpleLogBuffer::FlushTo(
for (it = logs_.end(); it != logs_.begin(); for (it = logs_.end(); it != logs_.begin();
/* do nothing */) { /* do nothing */) {
--it; --it;
if (it->getSequence() <= start) { if (it->getSequence() == state.start()) {
break;
} else if (it->getSequence() < state.start()) {
it++; it++;
break; break;
} }
} }
} }
uint64_t curr = start;
for (; it != logs_.end(); ++it) { for (; it != logs_.end(); ++it) {
LogBufferElement& element = *it; LogBufferElement& element = *it;
state.set_start(element.getSequence());
if (!writer->privileged() && element.getUid() != writer->uid()) { if (!writer->privileged() && element.getUid() != writer->uid()) {
continue; continue;
} }
if (!writer->can_read_security_logs() && element.getLogId() == LOG_ID_SECURITY) { if (((1 << element.getLogId()) & state.log_mask()) == 0) {
continue; continue;
} }
@ -157,31 +179,24 @@ uint64_t SimpleLogBuffer::FlushTo(
} }
} }
bool same_tid = false; bool same_tid = state.last_tid()[element.getLogId()] == element.getTid();
if (last_tid) { // Dropped (chatty) immediately following a valid log from the same source in the same log
same_tid = last_tid[element.getLogId()] == element.getTid(); // buffer indicates we have a multiple identical squash. chatty that differs source is due
// Dropped (chatty) immediately following a valid log from the // to spam filter. chatty to chatty of different source is also due to spam filter.
// same source in the same log buffer indicates we have a state.last_tid()[element.getLogId()] =
// multiple identical squash. chatty that differs source (element.getDropped() && !same_tid) ? 0 : element.getTid();
// is due to spam filter. chatty to chatty of different
// source is also due to spam filter.
last_tid[element.getLogId()] =
(element.getDropped() && !same_tid) ? 0 : element.getTid();
}
shared_lock.unlock(); shared_lock.unlock();
// We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the // We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the
// `element` pointer is safe here without the lock // `element` pointer is safe here without the lock
curr = element.getSequence();
if (!element.FlushTo(writer, stats_, same_tid)) { if (!element.FlushTo(writer, stats_, same_tid)) {
return FLUSH_ERROR; return false;
} }
shared_lock.lock_shared(); shared_lock.lock_shared();
} }
return curr; state.set_start(state.start() + 1);
return true;
} }
// clear all rows of type "id" from the buffer. // clear all rows of type "id" from the buffer.

View file

@ -35,10 +35,11 @@ class SimpleLogBuffer : public LogBuffer {
int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg, int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg,
uint16_t len) override; uint16_t len) override;
uint64_t FlushTo(LogWriter* writer, uint64_t start, pid_t* lastTid, std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask) override;
const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence, bool FlushTo(LogWriter* writer, FlushToState& state,
log_time realtime, uint16_t dropped_count)>& const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
filter) override; log_time realtime, uint16_t dropped_count)>&
filter) override;
bool Clear(log_id_t id, uid_t uid) override; bool Clear(log_id_t id, uid_t uid) override;
unsigned long GetSize(log_id_t id) override; unsigned long GetSize(log_id_t id) override;