Merge "snapuserd: Refactor how buffers are managed." into main am: 72c3ca1604

Original change: https://android-review.googlesource.com/c/platform/system/core/+/2633935

Change-Id: Ia76296ec20f62095d3d49f1e78a9789727beedea
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
David Anderson 2023-07-18 17:32:58 +00:00 committed by Automerger Merge Worker
commit a99f70a27b
6 changed files with 117 additions and 169 deletions

View file

@ -36,6 +36,21 @@ class BufferSink final {
struct dm_user_header* GetHeaderPtr();
void ResetBufferOffset() { buffer_offset_ = 0; }
void* GetPayloadBufPtr();
loff_t GetPayloadBytesWritten() { return buffer_offset_; }
// Same as calling GetPayloadBuffer and then UpdateBufferOffset.
//
// This is preferred over GetPayloadBuffer as it does not require a
// separate call to UpdateBufferOffset.
void* AcquireBuffer(size_t size) { return AcquireBuffer(size, size); }
// Same as AcquireBuffer, but separates the requested size from the buffer
// offset. This is useful for a situation where a full run of data will be
// read, but only a partial amount will be returned.
//
// If size != to_write, the excess bytes may be reallocated by the next
// call to AcquireBuffer.
void* AcquireBuffer(size_t size, size_t to_write);
private:
std::unique_ptr<uint8_t[]> buffer_;
@ -43,19 +58,5 @@ class BufferSink final {
size_t buffer_size_;
};
class XorSink final {
public:
void Initialize(BufferSink* sink, size_t size);
void Reset();
void* GetBuffer(size_t requested, size_t* actual);
bool ReturnData(void* buffer, size_t len);
private:
BufferSink* bufsink_;
std::unique_ptr<uint8_t[]> buffer_;
size_t buffer_size_;
size_t returned_;
};
} // namespace snapshot
} // namespace android

View file

@ -15,6 +15,8 @@
*/
#include <snapuserd/snapuserd_buffer.h>
#include <android-base/logging.h>
#include <snapuserd/snapuserd_kernel.h>
namespace android {
@ -26,11 +28,23 @@ void BufferSink::Initialize(size_t size) {
buffer_ = std::make_unique<uint8_t[]>(size);
}
void* BufferSink::GetPayloadBuffer(size_t size) {
if ((buffer_size_ - buffer_offset_) < size) return nullptr;
void* BufferSink::AcquireBuffer(size_t size, size_t to_write) {
CHECK(to_write <= size);
void* ptr = GetPayloadBuffer(size);
if (!ptr) {
return nullptr;
}
UpdateBufferOffset(to_write);
return ptr;
}
void* BufferSink::GetPayloadBuffer(size_t size) {
char* buffer = reinterpret_cast<char*>(GetBufPtr());
struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
if ((buffer_size_ - buffer_offset_ - sizeof(msg->header)) < size) {
return nullptr;
}
return (char*)msg->payload.buf + buffer_offset_;
}
@ -59,38 +73,5 @@ void* BufferSink::GetPayloadBufPtr() {
return msg->payload.buf;
}
void XorSink::Initialize(BufferSink* sink, size_t size) {
bufsink_ = sink;
buffer_size_ = size;
returned_ = 0;
buffer_ = std::make_unique<uint8_t[]>(size);
}
void XorSink::Reset() {
returned_ = 0;
}
void* XorSink::GetBuffer(size_t requested, size_t* actual) {
if (requested > buffer_size_) {
*actual = buffer_size_;
} else {
*actual = requested;
}
return buffer_.get();
}
bool XorSink::ReturnData(void* buffer, size_t len) {
uint8_t* xor_data = reinterpret_cast<uint8_t*>(buffer);
uint8_t* buff = reinterpret_cast<uint8_t*>(bufsink_->GetPayloadBuffer(len + returned_));
if (buff == nullptr) {
return false;
}
for (size_t i = 0; i < len; i++) {
buff[returned_ + i] ^= xor_data[i];
}
returned_ += len;
return true;
}
} // namespace snapshot
} // namespace android

View file

@ -42,12 +42,7 @@ ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing
// Start the replace operation. This will read the
// internal COW format and if the block is compressed,
// it will be de-compressed.
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (!buffer) {
SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer";
return false;
}
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op, void* buffer) {
if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
return false;
@ -55,12 +50,7 @@ bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
return true;
}
bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
return false;
}
bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op, void* buffer) {
uint64_t offset;
if (!reader_->GetSourceOffset(cow_op, &offset)) {
SNAP_LOG(ERROR) << "ReadFromSourceDevice: Failed to get source offset";
@ -85,60 +75,43 @@ bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {
// Start the copy operation. This will read the backing
// block device which is represented by cow_op->source.
bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) {
if (!ReadFromSourceDevice(cow_op)) {
bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op, void* buffer) {
if (!ReadFromSourceDevice(cow_op, buffer)) {
return false;
}
return true;
}
bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) {
if (!ReadFromSourceDevice(cow_op)) {
bool ReadWorker::ProcessXorOp(const CowOperation* cow_op, void* buffer) {
if (!ReadFromSourceDevice(cow_op, buffer)) {
return false;
}
xorsink_.Reset();
size_t actual = 0;
void* buffer = xorsink_.GetBuffer(BLOCK_SZ, &actual);
if (!buffer || actual < BLOCK_SZ) {
SNAP_LOG(ERROR) << "ProcessXorOp failed to get buffer of " << BLOCK_SZ << " size, got "
<< actual;
return false;
if (xor_buffer_.empty()) {
xor_buffer_.resize(BLOCK_SZ);
}
ssize_t size = reader_->ReadData(cow_op, buffer, BLOCK_SZ);
CHECK(xor_buffer_.size() == BLOCK_SZ);
ssize_t size = reader_->ReadData(cow_op, xor_buffer_.data(), xor_buffer_.size());
if (size != BLOCK_SZ) {
SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block
<< ", return value: " << size;
return false;
}
if (!xorsink_.ReturnData(buffer, size)) {
SNAP_LOG(ERROR) << "ProcessXorOp failed to return data";
return false;
auto xor_out = reinterpret_cast<uint8_t*>(buffer);
for (size_t i = 0; i < BLOCK_SZ; i++) {
xor_out[i] ^= xor_buffer_[i];
}
return true;
}
bool ReadWorker::ProcessZeroOp() {
// Zero out the entire block
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
return false;
}
bool ReadWorker::ProcessZeroOp(void* buffer) {
memset(buffer, 0, BLOCK_SZ);
return true;
}
bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
return false;
}
bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op, void* buffer) {
MERGE_GROUP_STATE state = snapuserd_->ProcessMergingBlock(cow_op->new_block, buffer);
switch (state) {
@ -148,7 +121,7 @@ bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
SNAP_LOG(DEBUG) << "Merge-completed: Reading from base device sector: "
<< (cow_op->new_block >> SECTOR_SHIFT)
<< " Block-number: " << cow_op->new_block;
if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), BLOCK_SZ)) {
if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), buffer, BLOCK_SZ)) {
SNAP_LOG(ERROR) << "ReadDataFromBaseDevice at sector: "
<< (cow_op->new_block >> SECTOR_SHIFT) << " after merge-complete.";
return false;
@ -158,9 +131,9 @@ bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
bool ret;
if (cow_op->type == kCowCopyOp) {
ret = ProcessCopyOp(cow_op);
ret = ProcessCopyOp(cow_op, buffer);
} else {
ret = ProcessXorOp(cow_op);
ret = ProcessXorOp(cow_op, buffer);
}
// I/O is complete - decrement the refcount irrespective of the return
@ -185,7 +158,7 @@ bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
return false;
}
bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
bool ReadWorker::ProcessCowOp(const CowOperation* cow_op, void* buffer) {
if (cow_op == nullptr) {
SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
return false;
@ -193,17 +166,17 @@ bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
switch (cow_op->type) {
case kCowReplaceOp: {
return ProcessReplaceOp(cow_op);
return ProcessReplaceOp(cow_op, buffer);
}
case kCowZeroOp: {
return ProcessZeroOp();
return ProcessZeroOp(buffer);
}
case kCowCopyOp:
[[fallthrough]];
case kCowXorOp: {
return ProcessOrderedOp(cow_op);
return ProcessOrderedOp(cow_op, buffer);
}
default: {
@ -229,8 +202,6 @@ bool ReadWorker::Init() {
SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
return false;
}
xorsink_.Initialize(&bufsink_, BLOCK_SZ);
return true;
}
@ -271,18 +242,15 @@ bool ReadWorker::WriteDmUserPayload(size_t size) {
// After the first header is sent in response to a request, we cannot
// send any additional headers.
header_response_ = false;
// Reset the buffer for use by the next request.
bufsink_.ResetBufferOffset();
return true;
}
bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, void* buffer, size_t read_size) {
CHECK(read_size <= BLOCK_SZ);
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
if (buffer == nullptr) {
SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
return false;
}
loff_t offset = sector << SECTOR_SHIFT;
if (!android::base::ReadFullyAtOffset(base_path_merge_fd_, buffer, read_size, offset)) {
SNAP_PLOG(ERROR) << "ReadDataFromBaseDevice failed. fd: " << base_path_merge_fd_
@ -303,7 +271,6 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
size_t read_size = std::min(PAYLOAD_BUFFER_SZ, remaining_size);
size_t total_bytes_read = 0;
bufsink_.ResetBufferOffset();
while (read_size) {
// We need to check every 4k block to verify if it is
@ -314,11 +281,17 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
std::make_pair(sector, nullptr), SnapshotHandler::compare);
bool not_found = (it == chunk_vec.end() || it->first != sector);
void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ, size);
if (!buffer) {
SNAP_LOG(ERROR) << "AcquireBuffer failed in ReadAlignedSector";
return false;
}
if (not_found) {
// Block not found in map - which means this block was not
// changed as per the OTA. Just route the I/O to the base
// device.
if (!ReadDataFromBaseDevice(sector, size)) {
if (!ReadDataFromBaseDevice(sector, buffer, size)) {
SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
return false;
}
@ -327,7 +300,7 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
} else {
// We found the sector in mapping. Check the type of COW OP and
// process it.
if (!ProcessCowOp(it->second)) {
if (!ProcessCowOp(it->second, buffer)) {
SNAP_LOG(ERROR) << "ProcessCowOp failed";
return false;
}
@ -338,14 +311,13 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
read_size -= ret;
total_bytes_read += ret;
sector += (ret >> SECTOR_SHIFT);
bufsink_.UpdateBufferOffset(ret);
}
if (!WriteDmUserPayload(total_bytes_read)) {
if (!SendBufferedIo()) {
return false;
}
SNAP_LOG(DEBUG) << "WriteDmUserPayload success total_bytes_read: " << total_bytes_read
SNAP_LOG(DEBUG) << "SendBufferedIo success total_bytes_read: " << total_bytes_read
<< " remaining_size: " << remaining_size;
remaining_size -= total_bytes_read;
} while (remaining_size > 0);
@ -356,40 +328,36 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
int ReadWorker::ReadUnalignedSector(
sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
size_t skip_sector_size = 0;
SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
<< " Aligned sector: " << it->first;
if (!ProcessCowOp(it->second)) {
int num_sectors_skip = sector - it->first;
size_t skip_size = num_sectors_skip << SECTOR_SHIFT;
size_t write_size = std::min(size, BLOCK_SZ - skip_size);
auto buffer = reinterpret_cast<uint8_t*>(bufsink_.AcquireBuffer(BLOCK_SZ, write_size));
if (!buffer) {
SNAP_LOG(ERROR) << "ProcessCowOp failed to allocate buffer";
return -1;
}
if (!ProcessCowOp(it->second, buffer)) {
SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
<< " Aligned sector: " << it->first;
return -1;
}
int num_sectors_skip = sector - it->first;
if (num_sectors_skip > 0) {
skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
if (skip_sector_size == BLOCK_SZ) {
if (skip_size) {
if (skip_size == BLOCK_SZ) {
SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
<< " Base-sector: " << it->first;
return -1;
}
memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
(BLOCK_SZ - skip_sector_size));
memmove(buffer, buffer + skip_size, write_size);
}
bufsink_.ResetBufferOffset();
return std::min(size, (BLOCK_SZ - skip_sector_size));
return write_size;
}
bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
bufsink_.ResetBufferOffset();
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
@ -458,7 +426,6 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
// requested offset in this case is beyond the last mapped COW op size (which
// is block 1 in this case).
size_t total_bytes_read = 0;
size_t remaining_size = size;
int ret = 0;
if (!merge_complete && (requested_offset >= final_offset) &&
@ -472,11 +439,10 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
}
remaining_size -= ret;
total_bytes_read += ret;
sector += (ret >> SECTOR_SHIFT);
// Send the data back
if (!WriteDmUserPayload(total_bytes_read)) {
if (!SendBufferedIo()) {
return false;
}
@ -494,26 +460,21 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
// Find the diff of the aligned offset
size_t diff_size = aligned_offset - requested_offset;
CHECK(diff_size <= BLOCK_SZ);
if (remaining_size < diff_size) {
if (!ReadDataFromBaseDevice(sector, remaining_size)) {
return false;
}
total_bytes_read += remaining_size;
if (!WriteDmUserPayload(total_bytes_read)) {
return false;
}
} else {
if (!ReadDataFromBaseDevice(sector, diff_size)) {
return false;
}
total_bytes_read += diff_size;
if (!WriteDmUserPayload(total_bytes_read)) {
return false;
}
size_t read_size = std::min(remaining_size, diff_size);
void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ, read_size);
if (!buffer) {
SNAP_LOG(ERROR) << "AcquireBuffer failed in ReadUnalignedSector";
return false;
}
if (!ReadDataFromBaseDevice(sector, buffer, read_size)) {
return false;
}
if (!SendBufferedIo()) {
return false;
}
if (remaining_size >= diff_size) {
remaining_size -= diff_size;
size_t num_sectors_read = (diff_size >> SECTOR_SHIFT);
sector += num_sectors_read;
@ -555,6 +516,10 @@ bool ReadWorker::DmuserReadRequest() {
return ReadAlignedSector(header->sector, header->len);
}
bool ReadWorker::SendBufferedIo() {
return WriteDmUserPayload(bufsink_.GetPayloadBytesWritten());
}
bool ReadWorker::ProcessIORequest() {
// Read Header from dm-user misc device. This gives
// us the sector number for which IO is issued by dm-snapshot device
@ -579,6 +544,9 @@ bool ReadWorker::ProcessIORequest() {
header->type = DM_USER_RESP_SUCCESS;
header_response_ = true;
// Reset the output buffer.
bufsink_.ResetBufferOffset();
bool ok;
switch (request_type) {
case DM_USER_REQ_MAP_READ:

View file

@ -37,21 +37,22 @@ class ReadWorker : public Worker {
bool ProcessIORequest();
bool WriteDmUserPayload(size_t size);
bool DmuserReadRequest();
bool SendBufferedIo();
void RespondIOError();
bool ProcessCowOp(const CowOperation* cow_op);
bool ProcessXorOp(const CowOperation* cow_op);
bool ProcessOrderedOp(const CowOperation* cow_op);
bool ProcessCopyOp(const CowOperation* cow_op);
bool ProcessReplaceOp(const CowOperation* cow_op);
bool ProcessZeroOp();
bool ProcessCowOp(const CowOperation* cow_op, void* buffer);
bool ProcessXorOp(const CowOperation* cow_op, void* buffer);
bool ProcessOrderedOp(const CowOperation* cow_op, void* buffer);
bool ProcessCopyOp(const CowOperation* cow_op, void* buffer);
bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer);
bool ProcessZeroOp(void* buffer);
bool ReadAlignedSector(sector_t sector, size_t sz);
bool ReadUnalignedSector(sector_t sector, size_t size);
int ReadUnalignedSector(sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
bool ReadFromSourceDevice(const CowOperation* cow_op);
bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
bool ReadFromSourceDevice(const CowOperation* cow_op, void* buffer);
bool ReadDataFromBaseDevice(sector_t sector, void* buffer, size_t read_size);
constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
@ -62,8 +63,8 @@ class ReadWorker : public Worker {
std::string control_device_;
unique_fd ctrl_fd_;
XorSink xorsink_;
bool header_response_ = false;
std::basic_string<uint8_t> xor_buffer_;
};
} // namespace snapshot

View file

@ -106,9 +106,9 @@ bool MergeWorker::MergeReplaceZeroOps() {
for (size_t i = 0; i < replace_zero_vec.size(); i++) {
const CowOperation* cow_op = replace_zero_vec[i];
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
if (!buffer) {
SNAP_LOG(ERROR) << "Failed to acquire buffer in merge";
SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
return false;
}
if (cow_op->type == kCowReplaceOp) {
@ -120,8 +120,6 @@ bool MergeWorker::MergeReplaceZeroOps() {
CHECK(cow_op->type == kCowZeroOp);
memset(buffer, 0, BLOCK_SZ);
}
bufsink_.UpdateBufferOffset(BLOCK_SZ);
}
size_t io_size = linear_blocks * BLOCK_SZ;

View file

@ -492,7 +492,7 @@ bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
if (xor_op_index < xor_op_vec.size()) {
const CowOperation* xor_op = xor_op_vec[xor_op_index];
if (xor_op->new_block == new_block) {
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
if (!buffer) {
SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer for block: "
<< xor_op->new_block;
@ -506,7 +506,6 @@ bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
}
xor_op_index += 1;
bufsink_.UpdateBufferOffset(BLOCK_SZ);
}
}
block_index += 1;