Merge changes from topic "libsnapshot-batch-writes" am: a96e27ca44 am: 36fde3a46a
Original change: https://android-review.googlesource.com/c/platform/system/core/+/2335282 Change-Id: I493c868e8cd28a510a4597ccc9060cebdb513da5 Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
commit
e7194dc6f0
4 changed files with 579 additions and 43 deletions
|
|
@ -16,10 +16,17 @@
|
|||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <condition_variable>
|
||||
#include <cstdint>
|
||||
#include <future>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include <android-base/unique_fd.h>
|
||||
#include <libsnapshot/cow_format.h>
|
||||
|
|
@ -42,6 +49,12 @@ struct CowOptions {
|
|||
|
||||
// Preset the number of merged ops. Only useful for testing.
|
||||
uint64_t num_merge_ops = 0;
|
||||
|
||||
// Number of threads for compression
|
||||
int num_compress_threads = 0;
|
||||
|
||||
// Batch write cluster ops
|
||||
bool batch_write = false;
|
||||
};
|
||||
|
||||
// Interface for writing to a snapuserd COW. All operations are ordered; merges
|
||||
|
|
@ -100,9 +113,40 @@ class ICowWriter {
|
|||
CowOptions options_;
|
||||
};
|
||||
|
||||
class CompressWorker {
|
||||
public:
|
||||
CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size);
|
||||
bool RunThread();
|
||||
void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
|
||||
bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
|
||||
void Finalize();
|
||||
|
||||
private:
|
||||
struct CompressWork {
|
||||
const void* buffer;
|
||||
size_t num_blocks;
|
||||
bool compression_status = false;
|
||||
std::vector<std::basic_string<uint8_t>> compressed_data;
|
||||
};
|
||||
|
||||
CowCompressionAlgorithm compression_;
|
||||
uint32_t block_size_;
|
||||
|
||||
std::queue<CompressWork> work_queue_;
|
||||
std::queue<CompressWork> compressed_queue_;
|
||||
std::mutex lock_;
|
||||
std::condition_variable cv_;
|
||||
bool stopped_ = false;
|
||||
|
||||
std::basic_string<uint8_t> Compress(const void* data, size_t length);
|
||||
bool CompressBlocks(const void* buffer, size_t num_blocks,
|
||||
std::vector<std::basic_string<uint8_t>>* compressed_data);
|
||||
};
|
||||
|
||||
class CowWriter : public ICowWriter {
|
||||
public:
|
||||
explicit CowWriter(const CowOptions& options);
|
||||
~CowWriter();
|
||||
|
||||
// Set up the writer.
|
||||
// The file starts from the beginning.
|
||||
|
|
@ -138,6 +182,7 @@ class CowWriter : public ICowWriter {
|
|||
bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
|
||||
uint16_t offset, uint8_t type);
|
||||
void SetupHeaders();
|
||||
void SetupWriteOptions();
|
||||
bool ParseOptions();
|
||||
bool OpenForWrite();
|
||||
bool OpenForAppend(uint64_t label);
|
||||
|
|
@ -145,9 +190,12 @@ class CowWriter : public ICowWriter {
|
|||
bool WriteRawData(const void* data, size_t size);
|
||||
bool WriteOperation(const CowOperation& op, const void* data = nullptr, size_t size = 0);
|
||||
void AddOperation(const CowOperation& op);
|
||||
std::basic_string<uint8_t> Compress(const void* data, size_t length);
|
||||
void InitPos();
|
||||
void InitBatchWrites();
|
||||
void InitWorkers();
|
||||
bool FlushCluster();
|
||||
|
||||
bool CompressBlocks(size_t num_blocks, const void* data);
|
||||
bool SetFd(android::base::borrowed_fd fd);
|
||||
bool Sync();
|
||||
bool Truncate(off_t length);
|
||||
|
|
@ -159,8 +207,11 @@ class CowWriter : public ICowWriter {
|
|||
CowHeader header_{};
|
||||
CowFooter footer_{};
|
||||
CowCompressionAlgorithm compression_ = kCowCompressNone;
|
||||
uint64_t current_op_pos_ = 0;
|
||||
uint64_t next_op_pos_ = 0;
|
||||
uint64_t next_data_pos_ = 0;
|
||||
uint64_t current_data_pos_ = 0;
|
||||
ssize_t total_data_written_ = 0;
|
||||
uint32_t cluster_size_ = 0;
|
||||
uint32_t current_cluster_size_ = 0;
|
||||
uint64_t current_data_size_ = 0;
|
||||
|
|
@ -168,6 +219,21 @@ class CowWriter : public ICowWriter {
|
|||
bool merge_in_progress_ = false;
|
||||
bool is_block_device_ = false;
|
||||
uint64_t cow_image_size_ = INT64_MAX;
|
||||
|
||||
int num_compress_threads_ = 1;
|
||||
std::vector<std::unique_ptr<CompressWorker>> compress_threads_;
|
||||
std::vector<std::future<bool>> threads_;
|
||||
std::vector<std::basic_string<uint8_t>> compressed_buf_;
|
||||
std::vector<std::basic_string<uint8_t>>::iterator buf_iter_;
|
||||
|
||||
std::vector<std::unique_ptr<CowOperation>> opbuffer_vec_;
|
||||
std::vector<std::unique_ptr<uint8_t[]>> databuffer_vec_;
|
||||
std::unique_ptr<struct iovec[]> cowop_vec_;
|
||||
int op_vec_index_ = 0;
|
||||
|
||||
std::unique_ptr<struct iovec[]> data_vec_;
|
||||
int data_vec_index_ = 0;
|
||||
bool batch_write_ = false;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
|
|
|
|||
|
|
@ -298,6 +298,150 @@ TEST_F(CowTest, CompressGz) {
|
|||
ASSERT_TRUE(iter->Done());
|
||||
}
|
||||
|
||||
// Value-parameterized fixture; the parameter is the string the tests assign to CowOptions::compression.
class CompressionRWTest : public CowTest, public testing::WithParamInterface<const char*> {};
|
||||
|
||||
// Writes one XOR op and three runs of replace ops through a CowWriter that
// uses two compression threads and batched cluster writes, then reads the
// COW back and checks the op count and the payload of the first block of
// each run.
TEST_P(CompressionRWTest, ThreadedBatchWrites) {
    CowOptions options;
    options.compression = GetParam();
    options.num_compress_threads = 2;
    // Ask for batching explicitly. Otherwise it is only switched on by the
    // ro.virtual_ab.batch_writes property, and this test would exercise the
    // unbatched path wherever that property is unset.
    options.batch_write = true;

    CowWriter writer(options);

    ASSERT_TRUE(writer.Initialize(cow_->fd));

    std::string xor_data = "This is test data-1. Testing xor";
    xor_data.resize(options.block_size, '\0');
    ASSERT_TRUE(writer.AddXorBlocks(50, xor_data.data(), xor_data.size(), 24, 10));

    std::string data = "This is test data-2. Testing replace ops";
    data.resize(options.block_size * 2048, '\0');
    ASSERT_TRUE(writer.AddRawBlocks(100, data.data(), data.size()));

    // A block count that does not divide evenly between the two threads.
    std::string data2 = "This is test data-3. Testing replace ops";
    data2.resize(options.block_size * 259, '\0');
    ASSERT_TRUE(writer.AddRawBlocks(6000, data2.data(), data2.size()));

    std::string data3 = "This is test data-4. Testing replace ops";
    data3.resize(options.block_size, '\0');
    ASSERT_TRUE(writer.AddRawBlocks(9000, data3.data(), data3.size()));

    ASSERT_TRUE(writer.Finalize());

    int expected_blocks = (1 + 2048 + 259 + 1);
    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);

    CowReader reader;
    ASSERT_TRUE(reader.Parse(cow_->fd));

    auto iter = reader.GetOpIter();
    ASSERT_NE(iter, nullptr);

    int total_blocks = 0;
    while (!iter->Done()) {
        auto op = &iter->Get();

        if (op->type == kCowXorOp) {
            total_blocks += 1;
            StringSink sink;
            ASSERT_EQ(op->new_block, 50);
            ASSERT_EQ(op->source, 98314);  // 4096 * 24 + 10
            ASSERT_TRUE(reader.ReadData(*op, &sink));
            ASSERT_EQ(sink.stream(), xor_data);
        }

        if (op->type == kCowReplaceOp) {
            total_blocks += 1;
            // Only the first block of each run is compared, so the expected
            // string is trimmed down to a single block.
            if (op->new_block == 100) {
                StringSink sink;
                ASSERT_TRUE(reader.ReadData(*op, &sink));
                data.resize(options.block_size);
                ASSERT_EQ(sink.stream(), data);
            }
            if (op->new_block == 6000) {
                StringSink sink;
                ASSERT_TRUE(reader.ReadData(*op, &sink));
                data2.resize(options.block_size);
                ASSERT_EQ(sink.stream(), data2);
            }
            if (op->new_block == 9000) {
                StringSink sink;
                ASSERT_TRUE(reader.ReadData(*op, &sink));
                ASSERT_EQ(sink.stream(), data3);
            }
        }

        iter->Next();
    }

    ASSERT_EQ(total_blocks, expected_blocks);
}
|
||||
|
||||
// Writes three runs of replace ops with a single compression thread and
// cluster_ops set to 0, then reads the COW back and checks the op count and
// the payload of the first block of each run.
TEST_P(CompressionRWTest, NoBatchWrites) {
    CowOptions options;
    options.compression = GetParam();
    options.num_compress_threads = 1;
    options.cluster_ops = 0;

    CowWriter writer(options);
    ASSERT_TRUE(writer.Initialize(cow_->fd));

    std::string data = "Testing replace ops without batch writes";
    data.resize(options.block_size * 1024, '\0');
    ASSERT_TRUE(writer.AddRawBlocks(50, data.data(), data.size()));

    std::string data2 = "Testing odd blocks without batch writes";
    data2.resize(options.block_size * 111, '\0');
    ASSERT_TRUE(writer.AddRawBlocks(3000, data2.data(), data2.size()));

    std::string data3 = "Testing single 4k block";
    data3.resize(options.block_size, '\0');
    ASSERT_TRUE(writer.AddRawBlocks(5000, data3.data(), data3.size()));

    ASSERT_TRUE(writer.Finalize());

    int expected_blocks = (1024 + 111 + 1);
    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);

    CowReader reader;
    ASSERT_TRUE(reader.Parse(cow_->fd));

    auto iter = reader.GetOpIter();
    ASSERT_NE(iter, nullptr);

    int total_blocks = 0;
    for (; !iter->Done(); iter->Next()) {
        auto op = &iter->Get();
        if (op->type != kCowReplaceOp) {
            continue;
        }
        total_blocks += 1;

        // Only the first block of each run carries a payload check.
        switch (op->new_block) {
            case 50: {
                StringSink sink;
                ASSERT_TRUE(reader.ReadData(*op, &sink));
                data.resize(options.block_size);
                ASSERT_EQ(sink.stream(), data);
                break;
            }
            case 3000: {
                StringSink sink;
                ASSERT_TRUE(reader.ReadData(*op, &sink));
                data2.resize(options.block_size);
                ASSERT_EQ(sink.stream(), data2);
                break;
            }
            case 5000: {
                StringSink sink;
                ASSERT_TRUE(reader.ReadData(*op, &sink));
                ASSERT_EQ(sink.stream(), data3);
                break;
            }
            default:
                break;
        }
    }

    ASSERT_EQ(total_blocks, expected_blocks);
}
|
||||
|
||||
// Instantiates every CompressionRWTest case once for each of the listed compression names.
INSTANTIATE_TEST_SUITE_P(CowApi, CompressionRWTest, testing::Values("none", "gz", "brotli", "lz4"));
|
||||
|
||||
TEST_F(CowTest, ClusterCompressGz) {
|
||||
CowOptions options;
|
||||
options.compression = "gz";
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@
|
|||
namespace android {
|
||||
namespace snapshot {
|
||||
|
||||
std::basic_string<uint8_t> CowWriter::Compress(const void* data, size_t length) {
|
||||
std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
|
||||
switch (compression_) {
|
||||
case kCowCompressGz: {
|
||||
const auto bound = compressBound(length);
|
||||
|
|
@ -100,5 +100,119 @@ std::basic_string<uint8_t> CowWriter::Compress(const void* data, size_t length)
|
|||
return {};
|
||||
}
|
||||
|
||||
bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
|
||||
std::vector<std::basic_string<uint8_t>>* compressed_data) {
|
||||
const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
|
||||
while (num_blocks) {
|
||||
auto data = Compress(iter, block_size_);
|
||||
if (data.empty()) {
|
||||
PLOG(ERROR) << "CompressBlocks: Compression failed";
|
||||
return false;
|
||||
}
|
||||
if (data.size() > std::numeric_limits<uint16_t>::max()) {
|
||||
LOG(ERROR) << "Compressed block is too large: " << data.size();
|
||||
return false;
|
||||
}
|
||||
|
||||
compressed_data->emplace_back(std::move(data));
|
||||
num_blocks -= 1;
|
||||
iter += block_size_;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CompressWorker::RunThread() {
|
||||
while (true) {
|
||||
// Wait for work
|
||||
CompressWork blocks;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(lock_);
|
||||
while (work_queue_.empty() && !stopped_) {
|
||||
cv_.wait(lock);
|
||||
}
|
||||
|
||||
if (stopped_) {
|
||||
return true;
|
||||
}
|
||||
|
||||
blocks = std::move(work_queue_.front());
|
||||
work_queue_.pop();
|
||||
}
|
||||
|
||||
// Compress blocks
|
||||
bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data);
|
||||
blocks.compression_status = ret;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
compressed_queue_.push(std::move(blocks));
|
||||
}
|
||||
|
||||
// Notify completion
|
||||
cv_.notify_all();
|
||||
|
||||
if (!ret) {
|
||||
LOG(ERROR) << "CompressBlocks failed";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
|
||||
CompressWork blocks = {};
|
||||
blocks.buffer = buffer;
|
||||
blocks.num_blocks = num_blocks;
|
||||
work_queue_.push(std::move(blocks));
|
||||
}
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf) {
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(lock_);
|
||||
while (compressed_queue_.empty() && !stopped_) {
|
||||
cv_.wait(lock);
|
||||
}
|
||||
|
||||
if (stopped_) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(lock_);
|
||||
while (compressed_queue_.size() > 0) {
|
||||
CompressWork blocks = std::move(compressed_queue_.front());
|
||||
compressed_queue_.pop();
|
||||
|
||||
if (blocks.compression_status) {
|
||||
compressed_buf->insert(compressed_buf->end(),
|
||||
std::make_move_iterator(blocks.compressed_data.begin()),
|
||||
std::make_move_iterator(blocks.compressed_data.end()));
|
||||
} else {
|
||||
LOG(ERROR) << "Block compression failed";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void CompressWorker::Finalize() {
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(lock_);
|
||||
stopped_ = true;
|
||||
}
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
// Stores the compression algorithm and the size of the blocks this worker
// will be asked to compress.
CompressWorker::CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size)
    : compression_(compression),
      block_size_(block_size) {}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
//
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <sys/uio.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <limits>
|
||||
|
|
@ -22,6 +23,7 @@
|
|||
|
||||
#include <android-base/file.h>
|
||||
#include <android-base/logging.h>
|
||||
#include <android-base/properties.h>
|
||||
#include <android-base/unique_fd.h>
|
||||
#include <brotli/encode.h>
|
||||
#include <libsnapshot/cow_format.h>
|
||||
|
|
@ -132,6 +134,46 @@ bool ICowWriter::ValidateNewBlock(uint64_t new_block) {
|
|||
|
||||
// Builds a writer from |options|. The headers are set up first because
// SetupWriteOptions() reads header_.cluster_ops.
CowWriter::CowWriter(const CowOptions& options)
    : ICowWriter(options),
      fd_(-1) {
    SetupHeaders();
    SetupWriteOptions();
}
|
||||
|
||||
// Stops every compression worker, waits for each worker task to finish and
// logs an error if any of them reported a failure.
CowWriter::~CowWriter() {
    for (const auto& worker : compress_threads_) {
        if (worker != nullptr) {
            worker->Finalize();
        }
    }

    // Every future is waited on, whether or not an earlier one failed.
    bool all_succeeded = true;
    for (auto& pending : threads_) {
        const bool succeeded = pending.get();
        all_succeeded = succeeded && all_succeeded;
    }

    if (!all_succeeded) {
        LOG(ERROR) << "Compression failed";
    }
    compress_threads_.clear();
}
|
||||
|
||||
void CowWriter::SetupWriteOptions() {
|
||||
num_compress_threads_ = options_.num_compress_threads;
|
||||
|
||||
if (!num_compress_threads_) {
|
||||
num_compress_threads_ = 1;
|
||||
// We prefer not to have more than two threads as the overhead of additional
|
||||
// threads is far greater than cutting down compression time.
|
||||
if (header_.cluster_ops &&
|
||||
android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
|
||||
num_compress_threads_ = 2;
|
||||
}
|
||||
}
|
||||
|
||||
if (header_.cluster_ops &&
|
||||
(android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
|
||||
options_.batch_write)) {
|
||||
batch_write_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
void CowWriter::SetupHeaders() {
|
||||
|
|
@ -206,6 +248,42 @@ bool CowWriter::SetFd(android::base::borrowed_fd fd) {
|
|||
return true;
|
||||
}
|
||||
|
||||
void CowWriter::InitBatchWrites() {
|
||||
if (batch_write_) {
|
||||
cowop_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
|
||||
data_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
|
||||
struct iovec* cowop_ptr = cowop_vec_.get();
|
||||
struct iovec* data_ptr = data_vec_.get();
|
||||
for (size_t i = 0; i < header_.cluster_ops; i++) {
|
||||
std::unique_ptr<CowOperation> op = std::make_unique<CowOperation>();
|
||||
cowop_ptr[i].iov_base = op.get();
|
||||
cowop_ptr[i].iov_len = sizeof(CowOperation);
|
||||
opbuffer_vec_.push_back(std::move(op));
|
||||
|
||||
std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(header_.block_size * 2);
|
||||
data_ptr[i].iov_base = buffer.get();
|
||||
data_ptr[i].iov_len = header_.block_size * 2;
|
||||
databuffer_vec_.push_back(std::move(buffer));
|
||||
}
|
||||
|
||||
current_op_pos_ = next_op_pos_;
|
||||
current_data_pos_ = next_data_pos_;
|
||||
}
|
||||
|
||||
std::string batch_write = batch_write_ ? "enabled" : "disabled";
|
||||
LOG(INFO) << "Batch writes: " << batch_write;
|
||||
}
|
||||
|
||||
void CowWriter::InitWorkers() {
|
||||
for (int i = 0; i < num_compress_threads_; i++) {
|
||||
auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
|
||||
threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
|
||||
compress_threads_.push_back(std::move(wt));
|
||||
}
|
||||
|
||||
LOG(INFO) << num_compress_threads_ << " thread used for compression";
|
||||
}
|
||||
|
||||
bool CowWriter::Initialize(unique_fd&& fd) {
|
||||
owned_fd_ = std::move(fd);
|
||||
return Initialize(borrowed_fd{owned_fd_});
|
||||
|
|
@ -216,7 +294,13 @@ bool CowWriter::Initialize(borrowed_fd fd) {
|
|||
return false;
|
||||
}
|
||||
|
||||
return OpenForWrite();
|
||||
bool ret = OpenForWrite();
|
||||
|
||||
if (ret) {
|
||||
InitWorkers();
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool CowWriter::InitializeAppend(android::base::unique_fd&& fd, uint64_t label) {
|
||||
|
|
@ -229,7 +313,13 @@ bool CowWriter::InitializeAppend(android::base::borrowed_fd fd, uint64_t label)
|
|||
return false;
|
||||
}
|
||||
|
||||
return OpenForAppend(label);
|
||||
bool ret = OpenForAppend(label);
|
||||
|
||||
if (ret && !compress_threads_.size()) {
|
||||
InitWorkers();
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void CowWriter::InitPos() {
|
||||
|
|
@ -287,6 +377,7 @@ bool CowWriter::OpenForWrite() {
|
|||
}
|
||||
|
||||
InitPos();
|
||||
InitBatchWrites();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
@ -320,6 +411,9 @@ bool CowWriter::OpenForAppend(uint64_t label) {
|
|||
PLOG(ERROR) << "lseek failed";
|
||||
return false;
|
||||
}
|
||||
|
||||
InitBatchWrites();
|
||||
|
||||
return EmitClusterIfNeeded();
|
||||
}
|
||||
|
||||
|
|
@ -348,47 +442,99 @@ bool CowWriter::EmitXorBlocks(uint32_t new_block_start, const void* data, size_t
|
|||
return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
|
||||
}
|
||||
|
||||
bool CowWriter::CompressBlocks(size_t num_blocks, const void* data) {
|
||||
size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
|
||||
size_t num_blocks_per_thread = num_blocks / num_threads;
|
||||
const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
|
||||
compressed_buf_.clear();
|
||||
|
||||
// Submit the blocks per thread. The retrieval of
|
||||
// compressed buffers has to be done in the same order.
|
||||
// We should not poll for completed buffers in a different order as the
|
||||
// buffers are tightly coupled with block ordering.
|
||||
for (size_t i = 0; i < num_threads; i++) {
|
||||
CompressWorker* worker = compress_threads_[i].get();
|
||||
if (i == num_threads - 1) {
|
||||
num_blocks_per_thread = num_blocks;
|
||||
}
|
||||
worker->EnqueueCompressBlocks(iter, num_blocks_per_thread);
|
||||
iter += (num_blocks_per_thread * header_.block_size);
|
||||
num_blocks -= num_blocks_per_thread;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < num_threads; i++) {
|
||||
CompressWorker* worker = compress_threads_[i].get();
|
||||
if (!worker->GetCompressedBuffers(&compressed_buf_)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
|
||||
uint64_t old_block, uint16_t offset, uint8_t type) {
|
||||
const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
|
||||
CHECK(!merge_in_progress_);
|
||||
for (size_t i = 0; i < size / header_.block_size; i++) {
|
||||
CowOperation op = {};
|
||||
op.new_block = new_block_start + i;
|
||||
op.type = type;
|
||||
if (type == kCowXorOp) {
|
||||
op.source = (old_block + i) * header_.block_size + offset;
|
||||
} else {
|
||||
op.source = next_data_pos_;
|
||||
}
|
||||
const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
|
||||
|
||||
// Update engine can potentially send 100MB of blocks at a time. We
|
||||
// don't want to process all those blocks in one shot as it can
|
||||
// stress the memory. Hence, process the blocks in chunks.
|
||||
//
|
||||
// 1024 blocks is reasonable given we will end up using max
|
||||
// memory of ~4MB.
|
||||
const size_t kProcessingBlocks = 1024;
|
||||
size_t num_blocks = (size / header_.block_size);
|
||||
size_t i = 0;
|
||||
|
||||
while (num_blocks) {
|
||||
size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));
|
||||
|
||||
if (compression_) {
|
||||
auto data = Compress(iter, header_.block_size);
|
||||
if (data.empty()) {
|
||||
PLOG(ERROR) << "AddRawBlocks: compression failed";
|
||||
return false;
|
||||
}
|
||||
if (data.size() > std::numeric_limits<uint16_t>::max()) {
|
||||
LOG(ERROR) << "Compressed block is too large: " << data.size() << " bytes";
|
||||
return false;
|
||||
}
|
||||
op.compression = compression_;
|
||||
op.data_length = static_cast<uint16_t>(data.size());
|
||||
|
||||
if (!WriteOperation(op, data.data(), data.size())) {
|
||||
PLOG(ERROR) << "AddRawBlocks: write failed, bytes requested: " << size
|
||||
<< ", bytes written: " << i * header_.block_size;
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
op.data_length = static_cast<uint16_t>(header_.block_size);
|
||||
if (!WriteOperation(op, iter, header_.block_size)) {
|
||||
PLOG(ERROR) << "AddRawBlocks: write failed";
|
||||
if (!CompressBlocks(pending_blocks, iter)) {
|
||||
return false;
|
||||
}
|
||||
buf_iter_ = compressed_buf_.begin();
|
||||
CHECK(pending_blocks == compressed_buf_.size());
|
||||
iter += (pending_blocks * header_.block_size);
|
||||
}
|
||||
|
||||
iter += header_.block_size;
|
||||
num_blocks -= pending_blocks;
|
||||
|
||||
while (i < size / header_.block_size && pending_blocks) {
|
||||
CowOperation op = {};
|
||||
op.new_block = new_block_start + i;
|
||||
op.type = type;
|
||||
if (type == kCowXorOp) {
|
||||
op.source = (old_block + i) * header_.block_size + offset;
|
||||
} else {
|
||||
op.source = next_data_pos_;
|
||||
}
|
||||
|
||||
if (compression_) {
|
||||
auto data = std::move(*buf_iter_);
|
||||
op.compression = compression_;
|
||||
op.data_length = static_cast<uint16_t>(data.size());
|
||||
|
||||
if (!WriteOperation(op, data.data(), data.size())) {
|
||||
PLOG(ERROR) << "AddRawBlocks: write failed";
|
||||
return false;
|
||||
}
|
||||
buf_iter_++;
|
||||
} else {
|
||||
op.data_length = static_cast<uint16_t>(header_.block_size);
|
||||
if (!WriteOperation(op, iter, header_.block_size)) {
|
||||
PLOG(ERROR) << "AddRawBlocks: write failed";
|
||||
return false;
|
||||
}
|
||||
iter += header_.block_size;
|
||||
}
|
||||
|
||||
i += 1;
|
||||
pending_blocks -= 1;
|
||||
}
|
||||
|
||||
CHECK(pending_blocks == 0);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
@ -416,7 +562,7 @@ bool CowWriter::EmitLabel(uint64_t label) {
|
|||
bool CowWriter::EmitSequenceData(size_t num_ops, const uint32_t* data) {
|
||||
CHECK(!merge_in_progress_);
|
||||
size_t to_add = 0;
|
||||
size_t max_ops = std::numeric_limits<uint16_t>::max() / sizeof(uint32_t);
|
||||
size_t max_ops = (header_.block_size * 2) / sizeof(uint32_t);
|
||||
while (num_ops > 0) {
|
||||
CowOperation op = {};
|
||||
op.type = kCowSequenceOp;
|
||||
|
|
@ -461,6 +607,11 @@ static void SHA256(const void*, size_t, uint8_t[]) {
|
|||
}
|
||||
|
||||
bool CowWriter::Finalize() {
|
||||
if (!FlushCluster()) {
|
||||
LOG(ERROR) << "Finalize: FlushCluster() failed";
|
||||
return false;
|
||||
}
|
||||
|
||||
auto continue_cluster_size = current_cluster_size_;
|
||||
auto continue_data_size = current_data_size_;
|
||||
auto continue_data_pos = next_data_pos_;
|
||||
|
|
@ -525,6 +676,9 @@ bool CowWriter::Finalize() {
|
|||
next_op_pos_ = continue_op_pos;
|
||||
footer_.op.num_ops = continue_num_ops;
|
||||
}
|
||||
|
||||
FlushCluster();
|
||||
|
||||
return Sync();
|
||||
}
|
||||
|
||||
|
|
@ -556,6 +710,35 @@ bool CowWriter::EnsureSpaceAvailable(const uint64_t bytes_needed) const {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool CowWriter::FlushCluster() {
|
||||
ssize_t ret;
|
||||
|
||||
if (op_vec_index_) {
|
||||
ret = pwritev(fd_.get(), cowop_vec_.get(), op_vec_index_, current_op_pos_);
|
||||
if (ret != (op_vec_index_ * sizeof(CowOperation))) {
|
||||
PLOG(ERROR) << "pwritev failed for CowOperation. Expected: "
|
||||
<< (op_vec_index_ * sizeof(CowOperation));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (data_vec_index_) {
|
||||
ret = pwritev(fd_.get(), data_vec_.get(), data_vec_index_, current_data_pos_);
|
||||
if (ret != total_data_written_) {
|
||||
PLOG(ERROR) << "pwritev failed for data. Expected: " << total_data_written_;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
total_data_written_ = 0;
|
||||
op_vec_index_ = 0;
|
||||
data_vec_index_ = 0;
|
||||
current_op_pos_ = next_op_pos_;
|
||||
current_data_pos_ = next_data_pos_;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CowWriter::WriteOperation(const CowOperation& op, const void* data, size_t size) {
|
||||
if (!EnsureSpaceAvailable(next_op_pos_ + sizeof(op))) {
|
||||
return false;
|
||||
|
|
@ -564,14 +747,43 @@ bool CowWriter::WriteOperation(const CowOperation& op, const void* data, size_t
|
|||
return false;
|
||||
}
|
||||
|
||||
if (!android::base::WriteFullyAtOffset(fd_, reinterpret_cast<const uint8_t*>(&op), sizeof(op),
|
||||
next_op_pos_)) {
|
||||
return false;
|
||||
}
|
||||
if (data != nullptr && size > 0) {
|
||||
if (!WriteRawData(data, size)) return false;
|
||||
if (batch_write_) {
|
||||
CowOperation* cow_op = reinterpret_cast<CowOperation*>(cowop_vec_[op_vec_index_].iov_base);
|
||||
std::memcpy(cow_op, &op, sizeof(CowOperation));
|
||||
op_vec_index_ += 1;
|
||||
|
||||
if (data != nullptr && size > 0) {
|
||||
struct iovec* data_ptr = data_vec_.get();
|
||||
std::memcpy(data_ptr[data_vec_index_].iov_base, data, size);
|
||||
data_ptr[data_vec_index_].iov_len = size;
|
||||
data_vec_index_ += 1;
|
||||
total_data_written_ += size;
|
||||
}
|
||||
} else {
|
||||
if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
|
||||
PLOG(ERROR) << "lseek failed for writing operation.";
|
||||
return false;
|
||||
}
|
||||
if (!android::base::WriteFully(fd_, reinterpret_cast<const uint8_t*>(&op), sizeof(op))) {
|
||||
return false;
|
||||
}
|
||||
if (data != nullptr && size > 0) {
|
||||
if (!WriteRawData(data, size)) return false;
|
||||
}
|
||||
}
|
||||
|
||||
AddOperation(op);
|
||||
|
||||
if (batch_write_) {
|
||||
if (op_vec_index_ == header_.cluster_ops || data_vec_index_ == header_.cluster_ops ||
|
||||
op.type == kCowLabelOp || op.type == kCowClusterOp) {
|
||||
if (!FlushCluster()) {
|
||||
LOG(ERROR) << "Failed to flush cluster data";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return EmitClusterIfNeeded();
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue