diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h index ba75a8df4..69079cb23 100644 --- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h +++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h @@ -15,7 +15,9 @@ #pragma once #include -#include + +#include +#include namespace android { namespace snapshot { @@ -196,5 +198,8 @@ bool IsMetadataOp(const CowOperation& op); // Ops that have dependencies on old blocks, and must take care in their merge order bool IsOrderedOp(const CowOperation& op); +// Convert compression name to internal value. +std::optional CompressionAlgorithmFromString(std::string_view name); + } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h index e8e4d7204..8e61a21c4 100644 --- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h +++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h @@ -65,6 +65,7 @@ class ICowReader { // Return the file header. virtual bool GetHeader(CowHeader* header) = 0; + virtual CowHeader& GetHeader() = 0; // Return the file footer. virtual bool GetFooter(CowFooter* footer) = 0; @@ -85,6 +86,19 @@ class ICowReader { // Get decoded bytes from the data section, handling any decompression. // All retrieved data is passed to the sink. virtual bool ReadData(const CowOperation& op, IByteSink* sink) = 0; + + // Get decoded bytes from the data section, handling any decompression. + // + // If ignore_bytes is non-zero, it specifies the initial number of bytes + // to skip writing to |buffer|. + // + // Returns the number of bytes written to |buffer|, or -1 on failure. + // errno is NOT set. + // + // Partial reads are not possible unless |buffer_size| is less than the + // operation block size. 
+ virtual ssize_t ReadData(const CowOperation& op, void* buffer, size_t buffer_size, + size_t ignore_bytes = 0) = 0; }; // Iterate over a sequence of COW operations. @@ -140,6 +154,10 @@ class CowReader final : public ICowReader { std::unique_ptr GetMergeOpIter(bool ignore_progress = false) override; bool ReadData(const CowOperation& op, IByteSink* sink) override; + ssize_t ReadData(const CowOperation& op, void* buffer, size_t buffer_size, + size_t ignore_bytes = 0) override; + + CowHeader& GetHeader() override { return header_; } bool GetRawBytes(uint64_t offset, void* buffer, size_t len, size_t* read); diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp index 862ce5511..f05aeb20f 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp @@ -24,6 +24,7 @@ #include #include #include +#include "cow_decompress.h" using testing::AssertionFailure; using testing::AssertionResult; @@ -44,23 +45,10 @@ class CowTest : public ::testing::Test { std::unique_ptr cow_; }; -// Sink that always appends to the end of a string. -class StringSink : public IByteSink { - public: - void* GetBuffer(size_t requested, size_t* actual) override { - size_t old_size = stream_.size(); - stream_.resize(old_size + requested, '\0'); - *actual = requested; - return stream_.data() + old_size; - } - bool ReturnData(void*, size_t) override { return true; } - void Reset() { stream_.clear(); } - - std::string& stream() { return stream_; } - - private: - std::string stream_; -}; +// Helper to check read sizes. 
+static inline bool ReadData(CowReader& reader, const CowOperation& op, void* buffer, size_t size) { + return reader.ReadData(op, buffer, size) == size; +} TEST_F(CowTest, CopyContiguous) { CowOptions options; @@ -144,7 +132,7 @@ TEST_F(CowTest, ReadWrite) { ASSERT_EQ(op->new_block, 10); ASSERT_EQ(op->source, 20); - StringSink sink; + std::string sink(data.size(), '\0'); iter->Next(); ASSERT_FALSE(iter->Done()); @@ -154,8 +142,8 @@ TEST_F(CowTest, ReadWrite) { ASSERT_EQ(op->compression, kCowCompressNone); ASSERT_EQ(op->data_length, 4096); ASSERT_EQ(op->new_block, 50); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data); iter->Next(); ASSERT_FALSE(iter->Done()); @@ -222,7 +210,7 @@ TEST_F(CowTest, ReadWriteXor) { ASSERT_EQ(op->new_block, 10); ASSERT_EQ(op->source, 20); - StringSink sink; + std::string sink(data.size(), '\0'); iter->Next(); ASSERT_FALSE(iter->Done()); @@ -233,8 +221,8 @@ TEST_F(CowTest, ReadWriteXor) { ASSERT_EQ(op->data_length, 4096); ASSERT_EQ(op->new_block, 50); ASSERT_EQ(op->source, 98314); // 4096 * 24 + 10 - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data); iter->Next(); ASSERT_FALSE(iter->Done()); @@ -285,22 +273,22 @@ TEST_F(CowTest, CompressGz) { ASSERT_FALSE(iter->Done()); auto op = &iter->Get(); - StringSink sink; + std::string sink(data.size(), '\0'); ASSERT_EQ(op->type, kCowReplaceOp); ASSERT_EQ(op->compression, kCowCompressGz); ASSERT_EQ(op->data_length, 56); // compressed! 
ASSERT_EQ(op->new_block, 50); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data); iter->Next(); ASSERT_TRUE(iter->Done()); } -class CompressionRWTest : public CowTest, public testing::WithParamInterface {}; +class CompressionTest : public CowTest, public testing::WithParamInterface {}; -TEST_P(CompressionRWTest, ThreadedBatchWrites) { +TEST_P(CompressionTest, ThreadedBatchWrites) { CowOptions options; options.compression = GetParam(); options.num_compress_threads = 2; @@ -342,31 +330,32 @@ TEST_P(CompressionRWTest, ThreadedBatchWrites) { if (op->type == kCowXorOp) { total_blocks += 1; - StringSink sink; + std::string sink(xor_data.size(), '\0'); ASSERT_EQ(op->new_block, 50); ASSERT_EQ(op->source, 98314); // 4096 * 24 + 10 - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), xor_data); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, xor_data); } if (op->type == kCowReplaceOp) { total_blocks += 1; if (op->new_block == 100) { - StringSink sink; - ASSERT_TRUE(reader.ReadData(*op, &sink)); data.resize(options.block_size); - ASSERT_EQ(sink.stream(), data); + std::string sink(data.size(), '\0'); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink.size(), data.size()); + ASSERT_EQ(sink, data); } if (op->new_block == 6000) { - StringSink sink; - ASSERT_TRUE(reader.ReadData(*op, &sink)); data2.resize(options.block_size); - ASSERT_EQ(sink.stream(), data2); + std::string sink(data2.size(), '\0'); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data2); } if (op->new_block == 9000) { - StringSink sink; - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data3); + std::string sink(data3.size(), '\0'); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data3); } } @@ -376,7 +365,7 @@ 
TEST_P(CompressionRWTest, ThreadedBatchWrites) { ASSERT_EQ(total_blocks, expected_blocks); } -TEST_P(CompressionRWTest, NoBatchWrites) { +TEST_P(CompressionTest, NoBatchWrites) { CowOptions options; options.compression = GetParam(); options.num_compress_threads = 1; @@ -416,21 +405,21 @@ TEST_P(CompressionRWTest, NoBatchWrites) { if (op->type == kCowReplaceOp) { total_blocks += 1; if (op->new_block == 50) { - StringSink sink; - ASSERT_TRUE(reader.ReadData(*op, &sink)); data.resize(options.block_size); - ASSERT_EQ(sink.stream(), data); + std::string sink(data.size(), '\0'); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data); } if (op->new_block == 3000) { - StringSink sink; - ASSERT_TRUE(reader.ReadData(*op, &sink)); data2.resize(options.block_size); - ASSERT_EQ(sink.stream(), data2); + std::string sink(data2.size(), '\0'); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data2); } if (op->new_block == 5000) { - StringSink sink; - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data3); + std::string sink(data3.size(), '\0'); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data3); } } @@ -440,7 +429,66 @@ TEST_P(CompressionRWTest, NoBatchWrites) { ASSERT_EQ(total_blocks, expected_blocks); } -INSTANTIATE_TEST_SUITE_P(CowApi, CompressionRWTest, testing::Values("none", "gz", "brotli", "lz4")); +template +class HorribleStream : public IByteStream { + public: + HorribleStream(const std::basic_string& input) : input_(input) {} + + ssize_t Read(void* buffer, size_t length) override { + if (pos_ >= input_.size()) { + return 0; + } + if (length) { + *reinterpret_cast(buffer) = input_[pos_]; + } + pos_++; + return 1; + } + size_t Size() const override { return input_.size(); } + + private: + std::basic_string input_; + size_t pos_ = 0; +}; + +TEST(HorribleStream, ReadFully) { + std::string expected = "this is some data"; + + HorribleStream 
stream(expected); + + std::string buffer(expected.size(), '\0'); + ASSERT_TRUE(stream.ReadFully(buffer.data(), buffer.size())); + ASSERT_EQ(buffer, expected); +} + +TEST_P(CompressionTest, HorribleStream) { + if (strcmp(GetParam(), "none") == 0) { + GTEST_SKIP(); + } + + auto algorithm = CompressionAlgorithmFromString(GetParam()); + ASSERT_TRUE(algorithm.has_value()); + + std::string expected = "The quick brown fox jumps over the lazy dog."; + expected.resize(4096, '\0'); + + auto result = CompressWorker::Compress(*algorithm, expected.data(), expected.size()); + ASSERT_FALSE(result.empty()); + + HorribleStream stream(result); + auto decomp = IDecompressor::FromString(GetParam()); + ASSERT_NE(decomp, nullptr); + decomp->set_stream(&stream); + + expected = expected.substr(10, 500); + + std::string buffer(expected.size(), '\0'); + ASSERT_EQ(decomp->Decompress(buffer.data(), 500, 4096, 10), 500); + ASSERT_EQ(buffer, expected); +} + +INSTANTIATE_TEST_SUITE_P(AllCompressors, CompressionTest, + testing::Values("none", "gz", "brotli", "lz4")); TEST_F(CowTest, ClusterCompressGz) { CowOptions options; @@ -470,14 +518,14 @@ TEST_F(CowTest, ClusterCompressGz) { ASSERT_FALSE(iter->Done()); auto op = &iter->Get(); - StringSink sink; + std::string sink(data.size(), '\0'); ASSERT_EQ(op->type, kCowReplaceOp); ASSERT_EQ(op->compression, kCowCompressGz); ASSERT_EQ(op->data_length, 56); // compressed! ASSERT_EQ(op->new_block, 50); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data); iter->Next(); ASSERT_FALSE(iter->Done()); @@ -489,12 +537,13 @@ TEST_F(CowTest, ClusterCompressGz) { ASSERT_FALSE(iter->Done()); op = &iter->Get(); - sink.Reset(); + sink = {}; + sink.resize(data2.size(), '\0'); ASSERT_EQ(op->compression, kCowCompressGz); ASSERT_EQ(op->data_length, 41); // compressed! 
ASSERT_EQ(op->new_block, 51); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data2); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data2); iter->Next(); ASSERT_FALSE(iter->Done()); @@ -531,55 +580,15 @@ TEST_F(CowTest, CompressTwoBlocks) { iter->Next(); ASSERT_FALSE(iter->Done()); - StringSink sink; + std::string sink(options.block_size, '\0'); auto op = &iter->Get(); ASSERT_EQ(op->type, kCowReplaceOp); ASSERT_EQ(op->compression, kCowCompressGz); ASSERT_EQ(op->new_block, 51); - ASSERT_TRUE(reader.ReadData(*op, &sink)); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); } -// Only return 1-byte buffers, to stress test the partial read logic in -// CowReader. -class HorribleStringSink : public StringSink { - public: - void* GetBuffer(size_t, size_t* actual) override { return StringSink::GetBuffer(1, actual); } -}; - -class CompressionTest : public CowTest, public testing::WithParamInterface {}; - -TEST_P(CompressionTest, HorribleSink) { - CowOptions options; - options.compression = GetParam(); - options.cluster_ops = 0; - CowWriter writer(options); - - ASSERT_TRUE(writer.Initialize(cow_->fd)); - - std::string data = "This is some data, believe it"; - data.resize(options.block_size, '\0'); - - ASSERT_TRUE(writer.AddRawBlocks(50, data.data(), data.size())); - ASSERT_TRUE(writer.Finalize()); - - ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0); - - CowReader reader; - ASSERT_TRUE(reader.Parse(cow_->fd)); - - auto iter = reader.GetOpIter(); - ASSERT_NE(iter, nullptr); - ASSERT_FALSE(iter->Done()); - - HorribleStringSink sink; - auto op = &iter->Get(); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data); -} - -INSTANTIATE_TEST_SUITE_P(CowApi, CompressionTest, testing::Values("none", "gz", "brotli")); - TEST_F(CowTest, GetSize) { CowOptions options; options.cluster_ops = 0; @@ -641,7 +650,7 @@ TEST_F(CowTest, AppendLabelSmall) { ASSERT_TRUE(reader.GetLastLabel(&label)); 
ASSERT_EQ(label, 3); - StringSink sink; + std::string sink(data.size(), '\0'); auto iter = reader.GetOpIter(); ASSERT_NE(iter, nullptr); @@ -649,11 +658,12 @@ TEST_F(CowTest, AppendLabelSmall) { ASSERT_FALSE(iter->Done()); auto op = &iter->Get(); ASSERT_EQ(op->type, kCowReplaceOp); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data); iter->Next(); - sink.Reset(); + sink = {}; + sink.resize(data2.size(), '\0'); ASSERT_FALSE(iter->Done()); op = &iter->Get(); @@ -665,8 +675,8 @@ TEST_F(CowTest, AppendLabelSmall) { ASSERT_FALSE(iter->Done()); op = &iter->Get(); ASSERT_EQ(op->type, kCowReplaceOp); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data2); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data2); iter->Next(); ASSERT_TRUE(iter->Done()); @@ -705,8 +715,6 @@ TEST_F(CowTest, AppendLabelMissing) { CowReader reader; ASSERT_TRUE(reader.Parse(cow_->fd)); - StringSink sink; - auto iter = reader.GetOpIter(); ASSERT_NE(iter, nullptr); @@ -765,8 +773,6 @@ TEST_F(CowTest, AppendExtendedCorrupted) { CowReader reader; ASSERT_TRUE(reader.Parse(cow_->fd)); - StringSink sink; - auto iter = reader.GetOpIter(); ASSERT_NE(iter, nullptr); @@ -816,7 +822,7 @@ TEST_F(CowTest, AppendbyLabel) { CowReader reader; ASSERT_TRUE(reader.Parse(cow_->fd)); - StringSink sink; + std::string sink(options.block_size, '\0'); auto iter = reader.GetOpIter(); ASSERT_NE(iter, nullptr); @@ -824,20 +830,20 @@ TEST_F(CowTest, AppendbyLabel) { ASSERT_FALSE(iter->Done()); auto op = &iter->Get(); ASSERT_EQ(op->type, kCowReplaceOp); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data.substr(0, options.block_size)); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data.substr(0, options.block_size)); iter->Next(); - sink.Reset(); + sink = {}; + sink.resize(options.block_size, '\0'); 
ASSERT_FALSE(iter->Done()); op = &iter->Get(); ASSERT_EQ(op->type, kCowReplaceOp); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data.substr(options.block_size, 2 * options.block_size)); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data.substr(options.block_size, 2 * options.block_size)); iter->Next(); - sink.Reset(); ASSERT_FALSE(iter->Done()); op = &iter->Get(); @@ -897,7 +903,7 @@ TEST_F(CowTest, ClusterTest) { CowReader reader; ASSERT_TRUE(reader.Parse(cow_->fd)); - StringSink sink; + std::string sink(data.size(), '\0'); auto iter = reader.GetOpIter(); ASSERT_NE(iter, nullptr); @@ -905,11 +911,10 @@ TEST_F(CowTest, ClusterTest) { ASSERT_FALSE(iter->Done()); auto op = &iter->Get(); ASSERT_EQ(op->type, kCowReplaceOp); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data.substr(0, options.block_size)); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data.substr(0, options.block_size)); iter->Next(); - sink.Reset(); ASSERT_FALSE(iter->Done()); op = &iter->Get(); @@ -997,7 +1002,7 @@ TEST_F(CowTest, ClusterAppendTest) { ASSERT_TRUE(reader.GetLastLabel(&label)); ASSERT_EQ(label, 50); - StringSink sink; + std::string sink(data2.size(), '\0'); auto iter = reader.GetOpIter(); ASSERT_NE(iter, nullptr); @@ -1012,8 +1017,8 @@ TEST_F(CowTest, ClusterAppendTest) { ASSERT_FALSE(iter->Done()); op = &iter->Get(); ASSERT_EQ(op->type, kCowReplaceOp); - ASSERT_TRUE(reader.ReadData(*op, &sink)); - ASSERT_EQ(sink.stream(), data2); + ASSERT_TRUE(ReadData(reader, *op, sink.data(), sink.size())); + ASSERT_EQ(sink, data2); iter->Next(); @@ -1066,13 +1071,13 @@ AssertionResult CompareDataBlock(CowReader* reader, const CowOperation& op, std::string cmp = data; cmp.resize(header.block_size, '\0'); - StringSink sink; - if (!reader->ReadData(op, &sink)) { + std::string sink(cmp.size(), '\0'); + if (!reader->ReadData(op, sink.data(), sink.size())) { return AssertionFailure() 
<< "Failed to read data block"; } - if (cmp != sink.stream()) { + if (cmp != sink) { return AssertionFailure() << "Data blocks did not match, expected " << cmp << ", got " - << sink.stream(); + << sink; } return AssertionSuccess(); diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp index 9b5098689..d06c904ba 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp @@ -32,6 +32,21 @@ namespace android { namespace snapshot { + +std::optional CompressionAlgorithmFromString(std::string_view name) { + if (name == "gz") { + return {kCowCompressGz}; + } else if (name == "brotli") { + return {kCowCompressBrotli}; + } else if (name == "lz4") { + return {kCowCompressLz4}; + } else if (name == "none" || name.empty()) { + return {kCowCompressNone}; + } else { + return {}; + } +} + std::basic_string CompressWorker::Compress(const void* data, size_t length) { return Compress(compression_, data, length); } diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp index 139a29f22..483d559b9 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.cpp @@ -16,6 +16,7 @@ #include "cow_decompress.h" +#include #include #include @@ -26,9 +27,50 @@ namespace android { namespace snapshot { +ssize_t IByteStream::ReadFully(void* buffer, size_t buffer_size) { + size_t stream_remaining = Size(); + + char* buffer_start = reinterpret_cast(buffer); + char* buffer_pos = buffer_start; + size_t buffer_remaining = buffer_size; + while (stream_remaining) { + const size_t to_read = std::min(buffer_remaining, stream_remaining); + const ssize_t actual_read = Read(buffer_pos, to_read); + if (actual_read < 0) { + return -1; + } + if (!actual_read) { + LOG(ERROR) << "Stream ended prematurely"; + return -1; + } + CHECK_LE(actual_read, to_read); + 
+ stream_remaining -= actual_read; + buffer_pos += actual_read; + buffer_remaining -= actual_read; + } + return buffer_pos - buffer_start; +} + +std::unique_ptr IDecompressor::FromString(std::string_view compressor) { + if (compressor == "lz4") { + return IDecompressor::Lz4(); + } else if (compressor == "brotli") { + return IDecompressor::Brotli(); + } else if (compressor == "gz") { + return IDecompressor::Gz(); + } else { + return nullptr; + } +} + class NoDecompressor final : public IDecompressor { public: bool Decompress(size_t) override; + ssize_t Decompress(void*, size_t, size_t, size_t) override { + LOG(ERROR) << "Not supported"; + return -1; + } }; bool NoDecompressor::Decompress(size_t) { @@ -45,8 +87,8 @@ bool NoDecompressor::Decompress(size_t) { uint8_t* buffer_pos = buffer; size_t bytes_to_read = std::min(buffer_size, stream_remaining); while (bytes_to_read) { - size_t read; - if (!stream_->Read(buffer_pos, bytes_to_read, &read)) { + ssize_t read = stream_->Read(buffer_pos, bytes_to_read); + if (read < 0) { return false; } if (!read) { @@ -73,10 +115,13 @@ std::unique_ptr IDecompressor::Uncompressed() { class StreamDecompressor : public IDecompressor { public: bool Decompress(size_t output_bytes) override; + ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size, + size_t ignore_bytes) override; virtual bool Init() = 0; virtual bool DecompressInput(const uint8_t* data, size_t length) = 0; - virtual bool Done() = 0; + virtual bool PartialDecompress(const uint8_t* data, size_t length) = 0; + bool OutputFull() const { return !ignore_bytes_ && !output_buffer_remaining_; } protected: bool GetFreshBuffer(); @@ -85,6 +130,8 @@ class StreamDecompressor : public IDecompressor { size_t stream_remaining_; uint8_t* output_buffer_ = nullptr; size_t output_buffer_remaining_ = 0; + size_t ignore_bytes_ = 0; + bool decompressor_ended_ = false; }; static constexpr size_t kChunkSize = 4096; @@ -99,8 +146,9 @@ bool 
StreamDecompressor::Decompress(size_t output_bytes) { uint8_t chunk[kChunkSize]; while (stream_remaining_) { - size_t read = std::min(stream_remaining_, sizeof(chunk)); - if (!stream_->Read(chunk, read, &read)) { + size_t max_read = std::min(stream_remaining_, sizeof(chunk)); + ssize_t read = stream_->Read(chunk, max_read); + if (read < 0) { return false; } if (!read) { @@ -113,18 +161,65 @@ bool StreamDecompressor::Decompress(size_t output_bytes) { stream_remaining_ -= read; - if (stream_remaining_ && Done()) { + if (stream_remaining_ && decompressor_ended_) { LOG(ERROR) << "Decompressor terminated early"; return false; } } - if (!Done()) { + if (!decompressor_ended_) { LOG(ERROR) << "Decompressor expected more bytes"; return false; } return true; } +ssize_t StreamDecompressor::Decompress(void* buffer, size_t buffer_size, size_t, + size_t ignore_bytes) { + if (!Init()) { + return -1;  // ssize_t contract: failure is -1; `false` would read as 0 bytes written. + } + + stream_remaining_ = stream_->Size(); + output_buffer_ = reinterpret_cast(buffer); + output_buffer_remaining_ = buffer_size; + ignore_bytes_ = ignore_bytes; + + uint8_t chunk[kChunkSize]; + while (stream_remaining_ && output_buffer_remaining_ && !decompressor_ended_) { + size_t max_read = std::min(stream_remaining_, sizeof(chunk)); + ssize_t read = stream_->Read(chunk, max_read); + if (read < 0) { + return -1; + } + if (!read) { + LOG(ERROR) << "Stream ended prematurely"; + return -1; + } + if (!PartialDecompress(chunk, read)) { + return -1; + } + stream_remaining_ -= read; + } + + if (stream_remaining_) { + if (decompressor_ended_ && !OutputFull()) { + // If there's more input in the stream, but we haven't finished + // consuming ignored bytes or available output space yet, then + // something weird happened. Report it and fail. + LOG(ERROR) << "Decompressor terminated early"; + return -1; + } + } else { + if (!decompressor_ended_ && !OutputFull()) { + // The stream ended, but the decoder doesn't think so, and there are + // more bytes in the output buffer. 
+ LOG(ERROR) << "Decompressor expected more bytes"; + return -1; + } + } + return buffer_size - output_buffer_remaining_; +} + bool StreamDecompressor::GetFreshBuffer() { size_t request_size = std::min(output_bytes_, kChunkSize); output_buffer_ = @@ -142,11 +237,10 @@ class GzDecompressor final : public StreamDecompressor { bool Init() override; bool DecompressInput(const uint8_t* data, size_t length) override; - bool Done() override { return ended_; } + bool PartialDecompress(const uint8_t* data, size_t length) override; private: z_stream z_ = {}; - bool ended_ = false; }; bool GzDecompressor::Init() { @@ -198,7 +292,61 @@ bool GzDecompressor::DecompressInput(const uint8_t* data, size_t length) { LOG(ERROR) << "Gz stream ended prematurely"; return false; } - ended_ = true; + decompressor_ended_ = true; + return true; + } + } + return true; +} + +bool GzDecompressor::PartialDecompress(const uint8_t* data, size_t length) { + z_.next_in = reinterpret_cast(const_cast(data)); + z_.avail_in = length; + + // If we're asked to ignore starting bytes, we sink those into the output + // repeatedly until there is nothing left to ignore. + while (ignore_bytes_ && z_.avail_in) { + std::array ignore_buffer; + size_t max_ignore = std::min(ignore_bytes_, ignore_buffer.size()); + z_.next_out = ignore_buffer.data(); + z_.avail_out = max_ignore; + + int rv = inflate(&z_, Z_NO_FLUSH); + if (rv != Z_OK && rv != Z_STREAM_END) { + LOG(ERROR) << "inflate returned error code " << rv; + return false; + } + + size_t returned = max_ignore - z_.avail_out; + CHECK_LE(returned, ignore_bytes_); + + ignore_bytes_ -= returned; + + if (rv == Z_STREAM_END) { + decompressor_ended_ = true; + return true; + } + } + + z_.next_out = reinterpret_cast(output_buffer_); + z_.avail_out = output_buffer_remaining_; + + while (z_.avail_in && z_.avail_out) { + // Decompress. 
+ int rv = inflate(&z_, Z_NO_FLUSH); + if (rv != Z_OK && rv != Z_STREAM_END) { + LOG(ERROR) << "inflate returned error code " << rv; + return false; + } + + size_t returned = output_buffer_remaining_ - z_.avail_out; + CHECK_LE(returned, output_buffer_remaining_); + + output_buffer_ += returned; + output_buffer_remaining_ -= returned; + + if (rv == Z_STREAM_END) { + decompressor_ended_ = true; return true; } } @@ -215,7 +363,7 @@ class BrotliDecompressor final : public StreamDecompressor { bool Init() override; bool DecompressInput(const uint8_t* data, size_t length) override; - bool Done() override { return BrotliDecoderIsFinished(decoder_); } + bool PartialDecompress(const uint8_t* data, size_t length) override; private: BrotliDecoderState* decoder_ = nullptr; @@ -257,6 +405,44 @@ bool BrotliDecompressor::DecompressInput(const uint8_t* data, size_t length) { return true; } +bool BrotliDecompressor::PartialDecompress(const uint8_t* data, size_t length) { + size_t available_in = length; + const uint8_t* next_in = data; + + while (available_in && ignore_bytes_ && !BrotliDecoderIsFinished(decoder_)) { + std::array ignore_buffer; + size_t max_ignore = std::min(ignore_bytes_, ignore_buffer.size()); + size_t ignore_size = max_ignore; + + uint8_t* ignore_buffer_ptr = ignore_buffer.data(); + auto r = BrotliDecoderDecompressStream(decoder_, &available_in, &next_in, &ignore_size, + &ignore_buffer_ptr, nullptr); + if (r == BROTLI_DECODER_RESULT_ERROR) { + LOG(ERROR) << "brotli decode failed"; + return false; + } else if (r == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && available_in) { + LOG(ERROR) << "brotli unexpected needs more input"; + return false; + } + ignore_bytes_ -= max_ignore - ignore_size; + } + + while (available_in && !BrotliDecoderIsFinished(decoder_)) { + auto r = BrotliDecoderDecompressStream(decoder_, &available_in, &next_in, + &output_buffer_remaining_, &output_buffer_, nullptr); + if (r == BROTLI_DECODER_RESULT_ERROR) { + LOG(ERROR) << "brotli decode 
failed"; + return false; + } else if (r == BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT && available_in) { + LOG(ERROR) << "brotli unexpected needs more input"; + return false; + } + } + + decompressor_ended_ = BrotliDecoderIsFinished(decoder_); + return true; +} + std::unique_ptr IDecompressor::Brotli() { return std::unique_ptr(new BrotliDecompressor()); } @@ -275,8 +461,7 @@ class Lz4Decompressor final : public IDecompressor { } // If input size is same as output size, then input is uncompressed. if (stream_->Size() == output_size) { - size_t bytes_read = 0; - stream_->Read(output_buffer, output_size, &bytes_read); + ssize_t bytes_read = stream_->ReadFully(output_buffer, output_size); if (bytes_read != output_size) { LOG(ERROR) << "Failed to read all input at once. Expected: " << output_size << " actual: " << bytes_read; @@ -287,8 +472,7 @@ class Lz4Decompressor final : public IDecompressor { } std::string input_buffer; input_buffer.resize(stream_->Size()); - size_t bytes_read = 0; - stream_->Read(input_buffer.data(), input_buffer.size(), &bytes_read); + ssize_t bytes_read = stream_->ReadFully(input_buffer.data(), input_buffer.size()); if (bytes_read != input_buffer.size()) { LOG(ERROR) << "Failed to read all input at once. Expected: " << input_buffer.size() << " actual: " << bytes_read; @@ -305,6 +489,61 @@ class Lz4Decompressor final : public IDecompressor { sink_->ReturnData(output_buffer, output_size); return true; } + + ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size, + size_t ignore_bytes) override { + std::string input_buffer(stream_->Size(), '\0'); + ssize_t streamed_in = stream_->ReadFully(input_buffer.data(), input_buffer.size()); + if (streamed_in < 0) { + return -1; + } + CHECK_EQ(streamed_in, stream_->Size()); + + char* decode_buffer = reinterpret_cast(buffer); + size_t decode_buffer_size = buffer_size; + + // It's unclear if LZ4 can exactly satisfy a partial decode request, so + // if we get one, create a temporary buffer. 
+ std::string temp; + if (buffer_size < decompressed_size) { + temp.resize(decompressed_size, '\0'); + decode_buffer = temp.data(); + decode_buffer_size = temp.size(); + } + + const int bytes_decompressed = LZ4_decompress_safe(input_buffer.data(), decode_buffer, + input_buffer.size(), decode_buffer_size); + if (bytes_decompressed < 0) { + LOG(ERROR) << "Failed to decompress LZ4 block, code: " << bytes_decompressed; + return -1; + } + if (bytes_decompressed != decompressed_size) { + LOG(ERROR) << "Failed to decompress LZ4 block, expected output size: " + << decompressed_size << ", actual: " << bytes_decompressed; + return -1; + } + CHECK_LE(bytes_decompressed, decode_buffer_size); + + if (ignore_bytes > bytes_decompressed) { + LOG(ERROR) << "Ignoring more bytes than exist in stream (ignoring " << ignore_bytes + << ", got " << bytes_decompressed << ")"; + return -1; + } + + if (temp.empty()) { + // LZ4's API has no way to sink out the first N bytes of decoding, + // so we read them all in and memmove() to drop the partial read. + if (ignore_bytes) { + memmove(decode_buffer, decode_buffer + ignore_bytes, + bytes_decompressed - ignore_bytes); + } + return bytes_decompressed - ignore_bytes; + } + + size_t max_copy = std::min(bytes_decompressed - ignore_bytes, buffer_size); + memcpy(buffer, temp.data() + ignore_bytes, max_copy); + return max_copy; + } }; std::unique_ptr IDecompressor::Lz4() { diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h index 7f74eda00..09164d315 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_decompress.h @@ -26,11 +26,16 @@ class IByteStream { virtual ~IByteStream() {} // Read up to |length| bytes, storing the number of bytes read in the out- - // parameter. If the end of the stream is reached, 0 is returned. - virtual bool Read(void* buffer, size_t length, size_t* read) = 0; + // parameter. 
If the end of the stream is reached, 0 is returned. On error, + // -1 is returned. errno is NOT set. + virtual ssize_t Read(void* buffer, size_t length) = 0; // Size of the stream. virtual size_t Size() const = 0; + + // Helper for Read(). Read the entire stream into |buffer|, up to |length| + // bytes. + ssize_t ReadFully(void* buffer, size_t length); }; class IDecompressor { @@ -43,9 +48,21 @@ class IDecompressor { static std::unique_ptr Brotli(); static std::unique_ptr Lz4(); + static std::unique_ptr FromString(std::string_view compressor); + // |output_bytes| is the expected total number of bytes to sink. virtual bool Decompress(size_t output_bytes) = 0; + // Decompress at most |buffer_size| bytes, ignoring the first |ignore_bytes| + // of the decoded stream. |buffer_size| must be at least one byte. + // |decompressed_size| is the expected total size if the entire stream were + // decompressed. + // + // Returns the number of bytes written to |buffer|, or -1 on error. errno + // is NOT set. 
+ virtual ssize_t Decompress(void* buffer, size_t buffer_size, size_t decompressed_size, + size_t ignore_bytes = 0) = 0; + void set_stream(IByteStream* stream) { stream_ = stream; } void set_sink(IByteSink* sink) { sink_ = sink; } diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp index 45be1912e..e583ff077 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp @@ -747,18 +747,18 @@ class CowDataStream final : public IByteStream { remaining_ = data_length_; } - bool Read(void* buffer, size_t length, size_t* read) override { + ssize_t Read(void* buffer, size_t length) override { size_t to_read = std::min(length, remaining_); if (!to_read) { - *read = 0; - return true; + return 0; } - if (!reader_->GetRawBytes(offset_, buffer, to_read, read)) { - return false; + size_t read; + if (!reader_->GetRawBytes(offset_, buffer, to_read, &read)) { + return -1; } - offset_ += *read; - remaining_ -= *read; - return true; + offset_ += read; + remaining_ -= read; + return read; } size_t Size() const override { return data_length_; } @@ -802,5 +802,44 @@ bool CowReader::ReadData(const CowOperation& op, IByteSink* sink) { return decompressor->Decompress(header_.block_size); } +ssize_t CowReader::ReadData(const CowOperation& op, void* buffer, size_t buffer_size, + size_t ignore_bytes) { + std::unique_ptr decompressor; + switch (op.compression) { + case kCowCompressNone: + break; + case kCowCompressGz: + decompressor = IDecompressor::Gz(); + break; + case kCowCompressBrotli: + decompressor = IDecompressor::Brotli(); + break; + case kCowCompressLz4: + if (header_.block_size != op.data_length) { + decompressor = IDecompressor::Lz4(); + } + break; + default: + LOG(ERROR) << "Unknown compression type: " << op.compression; + return -1; + } + + uint64_t offset; + if (op.type == kCowXorOp) { + offset = data_loc_->at(op.new_block); + } else { + offset = op.source; + 
} + + if (!decompressor) { + CowDataStream stream(this, offset + ignore_bytes, op.data_length - ignore_bytes); + return stream.ReadFully(buffer, buffer_size); + } + + CowDataStream stream(this, offset, op.data_length); + decompressor->set_stream(&stream); + return decompressor->Decompress(buffer, buffer_size, header_.block_size, ignore_bytes); +} + } // namespace snapshot } // namespace android diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp index 56b48f0c7..042ffb400 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp @@ -37,6 +37,14 @@ #include #include +// The info messages here are spammy, but are useful for update_engine. Disable +// them when running on the host. +#ifdef __ANDROID__ +#define LOG_INFO LOG(INFO) +#else +#define LOG_INFO LOG(VERBOSE) +#endif + namespace android { namespace snapshot { @@ -194,18 +202,13 @@ void CowWriter::SetupHeaders() { } bool CowWriter::ParseOptions() { - if (options_.compression == "gz") { - compression_ = kCowCompressGz; - } else if (options_.compression == "brotli") { - compression_ = kCowCompressBrotli; - } else if (options_.compression == "lz4") { - compression_ = kCowCompressLz4; - } else if (options_.compression == "none") { - compression_ = kCowCompressNone; - } else if (!options_.compression.empty()) { + auto algorithm = CompressionAlgorithmFromString(options_.compression); + if (!algorithm) { LOG(ERROR) << "unrecognized compression: " << options_.compression; return false; } + compression_ = *algorithm; + if (options_.cluster_ops == 1) { LOG(ERROR) << "Clusters must contain at least two operations to function."; return false; @@ -239,10 +242,10 @@ bool CowWriter::SetFd(android::base::borrowed_fd fd) { return false; } cow_image_size_ = size_in_bytes; - LOG(INFO) << "COW image " << file_path << " has size " << size_in_bytes; + LOG_INFO << "COW image " << file_path << " has size " << 
size_in_bytes; } else { - LOG(INFO) << "COW image " << file_path - << " is not a block device, assuming unlimited space."; + LOG_INFO << "COW image " << file_path + << " is not a block device, assuming unlimited space."; } } return true; @@ -271,12 +274,12 @@ void CowWriter::InitBatchWrites() { } std::string batch_write = batch_write_ ? "enabled" : "disabled"; - LOG(INFO) << "Batch writes: " << batch_write; + LOG_INFO << "Batch writes: " << batch_write; } void CowWriter::InitWorkers() { if (num_compress_threads_ <= 1) { - LOG(INFO) << "Not creating new threads for compression."; + LOG_INFO << "Not creating new threads for compression."; return; } for (int i = 0; i < num_compress_threads_; i++) { @@ -285,7 +288,7 @@ void CowWriter::InitWorkers() { compress_threads_.push_back(std::move(wt)); } - LOG(INFO) << num_compress_threads_ << " thread used for compression"; + LOG_INFO << num_compress_threads_ << " thread used for compression"; } bool CowWriter::Initialize(unique_fd&& fd) { diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp index 167ff8c35..271615679 100644 --- a/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp +++ b/fs_mgr/libsnapshot/libsnapshot_cow/inspect_cow.cpp @@ -63,24 +63,6 @@ struct Options { bool include_merged; }; -// Sink that always appends to the end of a string. 
-class StringSink : public IByteSink { - public: - void* GetBuffer(size_t requested, size_t* actual) override { - size_t old_size = stream_.size(); - stream_.resize(old_size + requested, '\0'); - *actual = requested; - return stream_.data() + old_size; - } - bool ReturnData(void*, size_t) override { return true; } - void Reset() { stream_.clear(); } - - std::string& stream() { return stream_; } - - private: - std::string stream_; -}; - static void ShowBad(CowReader& reader, const struct CowOperation& op) { size_t count; auto buffer = std::make_unique(op.data_length); @@ -153,7 +135,9 @@ static bool Inspect(const std::string& path, Options opt) { } else if (opt.iter_type == Merge) { iter = reader.GetMergeOpIter(opt.include_merged); } - StringSink sink; + + std::string buffer(header.block_size, '\0'); + bool success = true; uint64_t xor_ops = 0, copy_ops = 0, replace_ops = 0, zero_ops = 0; while (!iter->Done()) { @@ -162,12 +146,11 @@ static bool Inspect(const std::string& path, Options opt) { if (!opt.silent && opt.show_ops) std::cout << op << "\n"; if (opt.decompress && op.type == kCowReplaceOp && op.compression != kCowCompressNone) { - if (!reader.ReadData(op, &sink)) { + if (reader.ReadData(op, buffer.data(), buffer.size()) < 0) { std::cerr << "Failed to decompress for :" << op << "\n"; success = false; if (opt.show_bad) ShowBad(reader, op); } - sink.Reset(); } if (op.type == kCowSequenceOp && opt.show_seq) {