Merge "Support batching ops across Add*Blocks() call" into main

This commit is contained in:
Treehugger Robot 2023-12-20 22:49:38 +00:00 committed by Gerrit Code Review
commit 26cb9dbfef
6 changed files with 125 additions and 80 deletions

View file

@ -107,7 +107,7 @@ struct ResumePoint {
static constexpr uint8_t kNumResumePoints = 4; static constexpr uint8_t kNumResumePoints = 4;
struct CowHeaderV3 : public CowHeader { struct CowHeaderV3 : public CowHeader {
// Number of sequence data stored (each of which is a 32 byte integer) // Number of sequence data stored (each of which is a 32 bit integer)
uint64_t sequence_data_count; uint64_t sequence_data_count;
// Number of currently written resume points // Number of currently written resume points
uint32_t resume_point_count; uint32_t resume_point_count;
@ -311,6 +311,8 @@ std::ostream& operator<<(std::ostream& os, CowOperationV3 const& arg);
std::ostream& operator<<(std::ostream& os, ResumePoint const& arg); std::ostream& operator<<(std::ostream& os, ResumePoint const& arg);
std::ostream& operator<<(std::ostream& os, CowOperationType cow_type);
int64_t GetNextOpOffset(const CowOperationV2& op, uint32_t cluster_size); int64_t GetNextOpOffset(const CowOperationV2& op, uint32_t cluster_size);
int64_t GetNextDataOffset(const CowOperationV2& op, uint32_t cluster_size); int64_t GetNextDataOffset(const CowOperationV2& op, uint32_t cluster_size);

View file

@ -52,6 +52,10 @@ std::ostream& EmitCowTypeString(std::ostream& os, CowOperationType cow_type) {
} }
} }
// Stream-insertion overload for CowOperationType. Delegates the formatting to
// EmitCowTypeString() and returns the stream reference that helper yields, so
// further insertions can be chained onto the result.
std::ostream& operator<<(std::ostream& os, CowOperationType cow_type) {
    std::ostream& out = EmitCowTypeString(os, cow_type);
    return out;
}
std::ostream& operator<<(std::ostream& os, CowOperationV2 const& op) { std::ostream& operator<<(std::ostream& os, CowOperationV2 const& op) {
os << "CowOperationV2("; os << "CowOperationV2(";
EmitCowTypeString(os, op.type) << ", "; EmitCowTypeString(os, op.type) << ", ";

View file

@ -114,6 +114,12 @@ bool CowParserV3::ParseOps(borrowed_fd fd, const uint32_t op_index) {
for (auto op : *ops_) { for (auto op : *ops_) {
if (op.type() == kCowXorOp) { if (op.type() == kCowXorOp) {
xor_data_loc_->insert({op.new_block, data_pos}); xor_data_loc_->insert({op.new_block, data_pos});
} else if (op.type() == kCowReplaceOp) {
if (data_pos != op.source()) {
LOG(ERROR) << "Invalid data location for operation " << op
<< ", expected: " << data_pos;
return false;
}
} }
data_pos += op.data_length; data_pos += op.data_length;
} }

View file

@ -688,5 +688,15 @@ TEST_F(CowTestV3, CopyOpMany) {
} }
} }
// Checks that the writer enforces the op limit when batching is requested.
// Two requests of 19 zero-block ops are issued against a configured
// op_count_max of 20: the first fits, the second would bring the total to 38
// and must therefore be rejected.
TEST_F(CowTestV3, CheckOpCount) {
    constexpr uint64_t kBlocksPerCall = 19;

    CowOptions options;
    options.batch_write = true;
    options.cluster_ops = 200;
    options.op_count_max = 20;

    auto writer = CreateCowWriter(3, options, GetCowFd());

    // 19 ops is below the limit of 20, so this request is accepted.
    ASSERT_TRUE(writer->AddZeroBlocks(0, kBlocksPerCall));
    // With batching on, the earlier ops may still be cached rather than on
    // disk; either way 19 + 19 exceeds the limit, so this request must fail.
    ASSERT_FALSE(writer->AddZeroBlocks(0, kBlocksPerCall));
}
} // namespace snapshot } // namespace snapshot
} // namespace android } // namespace android

View file

@ -71,9 +71,6 @@ void CowWriterV3::SetupHeaders() {
header_.block_size = options_.block_size; header_.block_size = options_.block_size;
header_.num_merge_ops = options_.num_merge_ops; header_.num_merge_ops = options_.num_merge_ops;
header_.cluster_ops = 0; header_.cluster_ops = 0;
if (batch_size_ > 1) {
LOG(INFO) << "Batch writes enabled with batch size of " << batch_size_;
}
if (options_.scratch_space) { if (options_.scratch_space) {
header_.buffer_size = BUFFER_REGION_DEFAULT_SIZE; header_.buffer_size = BUFFER_REGION_DEFAULT_SIZE;
} }
@ -82,6 +79,7 @@ void CowWriterV3::SetupHeaders() {
// WIP: not quite sure how some of these are calculated yet, assuming buffer_size is determined // WIP: not quite sure how some of these are calculated yet, assuming buffer_size is determined
// during COW size estimation // during COW size estimation
header_.sequence_data_count = 0; header_.sequence_data_count = 0;
header_.resume_point_count = 0; header_.resume_point_count = 0;
header_.resume_point_max = kNumResumePoints; header_.resume_point_max = kNumResumePoints;
header_.op_count = 0; header_.op_count = 0;
@ -123,6 +121,19 @@ bool CowWriterV3::ParseOptions() {
LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm; LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm;
return false; return false;
} }
if (options_.cluster_ops &&
(android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
options_.batch_write)) {
batch_size_ = std::max<size_t>(options_.cluster_ops, 1);
data_vec_.reserve(batch_size_);
cached_data_.reserve(batch_size_);
cached_ops_.reserve(batch_size_);
}
}
if (batch_size_ > 1) {
LOG(INFO) << "Batch writes: enabled with batch size " << batch_size_;
} else {
LOG(INFO) << "Batch writes: disabled";
} }
return true; return true;
} }
@ -220,8 +231,9 @@ bool CowWriterV3::CheckOpCount(size_t op_count) {
if (IsEstimating()) { if (IsEstimating()) {
return true; return true;
} }
if (header_.op_count + op_count > header_.op_count_max) { if (header_.op_count + cached_ops_.size() + op_count > header_.op_count_max) {
LOG(ERROR) << "Current number of ops on disk: " << header_.op_count LOG(ERROR) << "Current number of ops on disk: " << header_.op_count
<< ", number of ops cached in memory: " << cached_ops_.size()
<< ", number of ops attempting to write: " << op_count << ", number of ops attempting to write: " << op_count
<< ", this will exceed max op count " << header_.op_count_max; << ", this will exceed max op count " << header_.op_count_max;
return false; return false;
@ -233,17 +245,18 @@ bool CowWriterV3::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_
if (!CheckOpCount(num_blocks)) { if (!CheckOpCount(num_blocks)) {
return false; return false;
} }
std::vector<CowOperationV3> ops(num_blocks);
for (size_t i = 0; i < num_blocks; i++) { for (size_t i = 0; i < num_blocks; i++) {
CowOperationV3& op = ops[i]; CowOperationV3& op = cached_ops_.emplace_back();
op.set_type(kCowCopyOp); op.set_type(kCowCopyOp);
op.new_block = new_block + i; op.new_block = new_block + i;
op.set_source(old_block + i); op.set_source(old_block + i);
} }
if (!WriteOperation({ops.data(), ops.size()}, {})) {
return false;
}
if (NeedsFlush()) {
if (!FlushCacheOps()) {
return false;
}
}
return true; return true;
} }
@ -262,6 +275,13 @@ bool CowWriterV3::EmitXorBlocks(uint32_t new_block_start, const void* data, size
return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp); return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
} }
// Reports whether the in-memory cache has grown enough that it should be
// flushed. Returns true when either the number of cached data buffers has
// reached batch_size_, or the number of cached ops has reached 16 times
// batch_size_.
bool CowWriterV3::NeedsFlush() const {
    // Ops without data are allowed a bigger batch. At roughly 14 bytes per
    // CowOperationV3 (TODO confirm against the struct definition), caching
    // 200 * 16 ops in memory is only ~44K.
    constexpr size_t kOpOnlyBatchMultiplier = 16;

    const bool data_batch_full = cached_data_.size() >= batch_size_;
    const bool op_batch_full = cached_ops_.size() >= batch_size_ * kOpOnlyBatchMultiplier;
    return data_batch_full || op_batch_full;
}
bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size, bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
uint64_t old_block, uint16_t offset, CowOperationType type) { uint64_t old_block, uint16_t offset, CowOperationType type) {
if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) { if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) {
@ -270,34 +290,18 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
return false; return false;
} }
const size_t num_blocks = (size / header_.block_size); const size_t num_blocks = (size / header_.block_size);
if (compression_.algorithm == kCowCompressNone) {
std::vector<CowOperationV3> ops(num_blocks);
for (size_t i = 0; i < num_blocks; i++) {
CowOperation& op = ops[i];
op.new_block = new_block_start + i;
op.set_type(type); for (size_t i = 0; i < num_blocks;) {
if (type == kCowXorOp) { const auto blocks_to_write =
op.set_source((old_block + i) * header_.block_size + offset); std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i);
} else {
op.set_source(next_data_pos_ + header_.block_size * i);
}
op.data_length = header_.block_size;
}
return WriteOperation({ops.data(), ops.size()}, data, size);
}
for (size_t i = 0; i < num_blocks; i += batch_size_) {
const auto blocks_to_write = std::min<size_t>(batch_size_, num_blocks - i);
std::vector<std::basic_string<uint8_t>> compressed_blocks(blocks_to_write);
std::vector<CowOperationV3> ops(blocks_to_write);
std::vector<struct iovec> vec(blocks_to_write);
size_t compressed_bytes = 0; size_t compressed_bytes = 0;
for (size_t j = 0; j < blocks_to_write; j++) { for (size_t j = 0; j < blocks_to_write; j++) {
const uint8_t* const iter = const uint8_t* const iter =
reinterpret_cast<const uint8_t*>(data) + (header_.block_size * (i + j)); reinterpret_cast<const uint8_t*>(data) + (header_.block_size * (i + j));
CowOperation& op = ops[j]; CowOperation& op = cached_ops_.emplace_back();
auto& vec = data_vec_.emplace_back();
auto& compressed_data = cached_data_.emplace_back();
op.new_block = new_block_start + i + j; op.new_block = new_block_start + i + j;
op.set_type(type); op.set_type(type);
@ -306,29 +310,31 @@ bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t
} else { } else {
op.set_source(next_data_pos_ + compressed_bytes); op.set_source(next_data_pos_ + compressed_bytes);
} }
if (compression_.algorithm == kCowCompressNone) {
std::basic_string<uint8_t> compressed_data = compressed_data.resize(header_.block_size);
compressor_->Compress(iter, header_.block_size); } else {
if (compressed_data.empty()) { compressed_data = compressor_->Compress(iter, header_.block_size);
LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", " if (compressed_data.empty()) {
<< num_blocks << ");"; LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", "
return false; << num_blocks << ");";
return false;
}
} }
if (compressed_data.size() >= header_.block_size) { if (compressed_data.size() >= header_.block_size) {
compressed_data.resize(header_.block_size); compressed_data.resize(header_.block_size);
std::memcpy(compressed_data.data(), iter, header_.block_size); std::memcpy(compressed_data.data(), iter, header_.block_size);
} }
compressed_blocks[j] = std::move(compressed_data); vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
vec[j] = {.iov_base = compressed_blocks[j].data(), op.data_length = vec.iov_len;
.iov_len = compressed_blocks[j].size()};
op.data_length = vec[j].iov_len;
compressed_bytes += op.data_length; compressed_bytes += op.data_length;
} }
if (!WriteOperation({ops.data(), ops.size()}, {vec.data(), vec.size()})) { if (NeedsFlush() && !FlushCacheOps()) {
PLOG(ERROR) << "AddRawBlocks with compression: write failed. new block: " LOG(ERROR) << "EmitBlocks with compression: write failed. new block: "
<< new_block_start << " compression: " << compression_.algorithm; << new_block_start << " compression: " << compression_.algorithm
<< ", op type: " << type;
return false; return false;
} }
i += blocks_to_write;
} }
return true; return true;
@ -338,14 +344,15 @@ bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, const uint64_t num_bl
if (!CheckOpCount(num_blocks)) { if (!CheckOpCount(num_blocks)) {
return false; return false;
} }
std::vector<CowOperationV3> ops(num_blocks);
for (uint64_t i = 0; i < num_blocks; i++) { for (uint64_t i = 0; i < num_blocks; i++) {
auto& op = ops[i]; auto& op = cached_ops_.emplace_back();
op.set_type(kCowZeroOp); op.set_type(kCowZeroOp);
op.new_block = new_block_start + i; op.new_block = new_block_start + i;
} }
if (!WriteOperation({ops.data(), ops.size()})) { if (NeedsFlush()) {
return false; if (!FlushCacheOps()) {
return false;
}
} }
return true; return true;
} }
@ -354,6 +361,10 @@ bool CowWriterV3::EmitLabel(uint64_t label) {
// Remove all labels greater than or equal to the current one, so that we never end up // Remove all labels greater than or equal to the current one, so that we never end up
// with duplicate labels that carry differing op values. // with duplicate labels that carry differing op values.
if (!FlushCacheOps()) {
LOG(ERROR) << "Failed to flush cached ops before emitting label " << label;
return false;
}
auto remove_if_callback = [&](const auto& resume_point) -> bool { auto remove_if_callback = [&](const auto& resume_point) -> bool {
if (resume_point.label >= label) return true; if (resume_point.label >= label) return true;
return false; return false;
@ -382,19 +393,14 @@ bool CowWriterV3::EmitLabel(uint64_t label) {
bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) { bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) {
// TODO: size sequence buffer based on options // TODO: size sequence buffer based on options
if (header_.op_count > 0) { if (header_.op_count > 0 || !cached_ops_.empty()) {
LOG(ERROR) LOG(ERROR) << "There's " << header_.op_count << " operations written to disk and "
<< "There's " << header_.op_count << cached_ops_.size()
<< " operations written to disk. Writing sequence data is only allowed before all " << " ops cached in memory. Writing sequence data is only allowed before all "
"operation writes."; "operation writes.";
return false; return false;
} }
header_.sequence_data_count = num_ops; header_.sequence_data_count = num_ops;
// In COW format v3, data section is placed after op section and sequence
// data section. Therefore, changing the sequence data count has the effect
// of moving op section and data section. Therefore we need to reset the
// value of |next_data_pos|. This is also the reason why writing sequence
// data is only allowed if there's no operation written.
next_data_pos_ = GetDataOffset(header_); next_data_pos_ = GetDataOffset(header_);
if (!android::base::WriteFullyAtOffset(fd_, data, sizeof(data[0]) * num_ops, if (!android::base::WriteFullyAtOffset(fd_, data, sizeof(data[0]) * num_ops,
GetSequenceOffset(header_))) { GetSequenceOffset(header_))) {
@ -404,16 +410,32 @@ bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) {
return true; return true;
} }
bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> op, const void* data, bool CowWriterV3::FlushCacheOps() {
size_t size) { if (cached_ops_.empty()) {
struct iovec vec { if (!data_vec_.empty()) {
.iov_len = size LOG(ERROR) << "Cached ops is empty, but data iovec has size: " << data_vec_.size()
}; << " this is definitely a bug.";
// Dear C++ god, this is effectively a const_cast. I had to do this because return false;
// pwritev()'s struct iovec requires a non-const pointer. The input data }
// will not be modified, as the iovec is only used for a write operation. return true;
std::memcpy(&vec.iov_base, &data, sizeof(data)); }
return WriteOperation(op, {&vec, 1}); size_t bytes_written = 0;
for (auto& op : cached_ops_) {
if (op.type() == kCowReplaceOp) {
op.set_source(next_data_pos_ + bytes_written);
}
bytes_written += op.data_length;
}
if (!WriteOperation({cached_ops_.data(), cached_ops_.size()},
{data_vec_.data(), data_vec_.size()})) {
LOG(ERROR) << "Failed to flush " << cached_ops_.size() << " ops to disk";
return false;
}
cached_ops_.clear();
cached_data_.clear();
data_vec_.clear();
return true;
} }
bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops, bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
@ -438,7 +460,6 @@ bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
<< ops.size() << " ops will exceed the max of " << header_.op_count_max; << ops.size() << " ops will exceed the max of " << header_.op_count_max;
return false; return false;
} }
const off_t offset = GetOpOffset(header_.op_count, header_); const off_t offset = GetOpOffset(header_.op_count, header_);
if (!android::base::WriteFullyAtOffset(fd_, ops.data(), ops.size() * sizeof(ops[0]), offset)) { if (!android::base::WriteFullyAtOffset(fd_, ops.data(), ops.size() * sizeof(ops[0]), offset)) {
PLOG(ERROR) << "Write failed for " << ops.size() << " ops at " << offset; PLOG(ERROR) << "Write failed for " << ops.size() << " ops at " << offset;
@ -458,13 +479,12 @@ bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
return true; return true;
} }
bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {
return WriteOperation({&op, 1}, data, size);
}
bool CowWriterV3::Finalize() { bool CowWriterV3::Finalize() {
CHECK_GE(header_.prefix.header_size, sizeof(CowHeaderV3)); CHECK_GE(header_.prefix.header_size, sizeof(CowHeaderV3));
CHECK_LE(header_.prefix.header_size, sizeof(header_)); CHECK_LE(header_.prefix.header_size, sizeof(header_));
if (!FlushCacheOps()) {
return false;
}
if (!android::base::WriteFullyAtOffset(fd_, &header_, header_.prefix.header_size, 0)) { if (!android::base::WriteFullyAtOffset(fd_, &header_, header_.prefix.header_size, 0)) {
return false; return false;
} }

View file

@ -16,6 +16,7 @@
#include <android-base/logging.h> #include <android-base/logging.h>
#include <string_view> #include <string_view>
#include <vector>
#include "writer_base.h" #include "writer_base.h"
@ -42,20 +43,20 @@ class CowWriterV3 : public CowWriterBase {
private: private:
void SetupHeaders(); void SetupHeaders();
bool NeedsFlush() const;
bool ParseOptions(); bool ParseOptions();
bool OpenForWrite(); bool OpenForWrite();
bool OpenForAppend(uint64_t label); bool OpenForAppend(uint64_t label);
bool WriteOperation(std::basic_string_view<CowOperationV3> op, bool WriteOperation(std::basic_string_view<CowOperationV3> op,
std::basic_string_view<struct iovec> data); std::basic_string_view<struct iovec> data);
bool WriteOperation(std::basic_string_view<CowOperationV3> op, const void* data = nullptr,
size_t size = 0);
bool WriteOperation(const CowOperationV3& op, const void* data = nullptr, size_t size = 0);
bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block, bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
uint16_t offset, CowOperationType type); uint16_t offset, CowOperationType type);
bool CompressBlocks(size_t num_blocks, const void* data); bool CompressBlocks(size_t num_blocks, const void* data);
bool CheckOpCount(size_t op_count); bool CheckOpCount(size_t op_count);
private: private:
bool ReadBackVerification();
bool FlushCacheOps();
CowHeaderV3 header_{}; CowHeaderV3 header_{};
CowCompression compression_; CowCompression compression_;
// in the case that we are using one thread for compression, we can store and re-use the same // in the case that we are using one thread for compression, we can store and re-use the same
@ -66,12 +67,14 @@ class CowWriterV3 : public CowWriterBase {
std::shared_ptr<std::vector<ResumePoint>> resume_points_; std::shared_ptr<std::vector<ResumePoint>> resume_points_;
uint64_t next_data_pos_ = 0; uint64_t next_data_pos_ = 0;
std::vector<std::basic_string<uint8_t>> compressed_buf_;
// in the case that we are using one thread for compression, we can store and re-use the same // in the case that we are using one thread for compression, we can store and re-use the same
// compressor // compressor
int num_compress_threads_ = 1; int num_compress_threads_ = 1;
size_t batch_size_ = 0; size_t batch_size_ = 1;
std::vector<CowOperationV3> cached_ops_;
std::vector<std::basic_string<uint8_t>> cached_data_;
std::vector<struct iovec> data_vec_;
}; };
} // namespace snapshot } // namespace snapshot