Merge "snapuserd: Refactor how buffers are managed." into main am: 72c3ca1604
Original change: https://android-review.googlesource.com/c/platform/system/core/+/2633935 Change-Id: Ia76296ec20f62095d3d49f1e78a9789727beedea Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
commit
a99f70a27b
6 changed files with 117 additions and 169 deletions
|
|
@ -36,6 +36,21 @@ class BufferSink final {
|
|||
struct dm_user_header* GetHeaderPtr();
|
||||
void ResetBufferOffset() { buffer_offset_ = 0; }
|
||||
void* GetPayloadBufPtr();
|
||||
loff_t GetPayloadBytesWritten() { return buffer_offset_; }
|
||||
|
||||
// Same as calling GetPayloadBuffer and then UpdateBufferOffset.
|
||||
//
|
||||
// This is preferred over GetPayloadBuffer as it does not require a
|
||||
// separate call to UpdateBufferOffset.
|
||||
void* AcquireBuffer(size_t size) { return AcquireBuffer(size, size); }
|
||||
|
||||
// Same as AcquireBuffer, but separates the requested size from the buffer
|
||||
// offset. This is useful for a situation where a full run of data will be
|
||||
// read, but only a partial amount will be returned.
|
||||
//
|
||||
// If size != to_write, the excess bytes may be reallocated by the next
|
||||
// call to AcquireBuffer.
|
||||
void* AcquireBuffer(size_t size, size_t to_write);
|
||||
|
||||
private:
|
||||
std::unique_ptr<uint8_t[]> buffer_;
|
||||
|
|
@ -43,19 +58,5 @@ class BufferSink final {
|
|||
size_t buffer_size_;
|
||||
};
|
||||
|
||||
class XorSink final {
|
||||
public:
|
||||
void Initialize(BufferSink* sink, size_t size);
|
||||
void Reset();
|
||||
void* GetBuffer(size_t requested, size_t* actual);
|
||||
bool ReturnData(void* buffer, size_t len);
|
||||
|
||||
private:
|
||||
BufferSink* bufsink_;
|
||||
std::unique_ptr<uint8_t[]> buffer_;
|
||||
size_t buffer_size_;
|
||||
size_t returned_;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
||||
|
|
|
|||
|
|
@ -15,6 +15,8 @@
|
|||
*/
|
||||
|
||||
#include <snapuserd/snapuserd_buffer.h>
|
||||
|
||||
#include <android-base/logging.h>
|
||||
#include <snapuserd/snapuserd_kernel.h>
|
||||
|
||||
namespace android {
|
||||
|
|
@ -26,11 +28,23 @@ void BufferSink::Initialize(size_t size) {
|
|||
buffer_ = std::make_unique<uint8_t[]>(size);
|
||||
}
|
||||
|
||||
void* BufferSink::GetPayloadBuffer(size_t size) {
|
||||
if ((buffer_size_ - buffer_offset_) < size) return nullptr;
|
||||
void* BufferSink::AcquireBuffer(size_t size, size_t to_write) {
|
||||
CHECK(to_write <= size);
|
||||
|
||||
void* ptr = GetPayloadBuffer(size);
|
||||
if (!ptr) {
|
||||
return nullptr;
|
||||
}
|
||||
UpdateBufferOffset(to_write);
|
||||
return ptr;
|
||||
}
|
||||
|
||||
void* BufferSink::GetPayloadBuffer(size_t size) {
|
||||
char* buffer = reinterpret_cast<char*>(GetBufPtr());
|
||||
struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
|
||||
if ((buffer_size_ - buffer_offset_ - sizeof(msg->header)) < size) {
|
||||
return nullptr;
|
||||
}
|
||||
return (char*)msg->payload.buf + buffer_offset_;
|
||||
}
|
||||
|
||||
|
|
@ -59,38 +73,5 @@ void* BufferSink::GetPayloadBufPtr() {
|
|||
return msg->payload.buf;
|
||||
}
|
||||
|
||||
void XorSink::Initialize(BufferSink* sink, size_t size) {
|
||||
bufsink_ = sink;
|
||||
buffer_size_ = size;
|
||||
returned_ = 0;
|
||||
buffer_ = std::make_unique<uint8_t[]>(size);
|
||||
}
|
||||
|
||||
void XorSink::Reset() {
|
||||
returned_ = 0;
|
||||
}
|
||||
|
||||
void* XorSink::GetBuffer(size_t requested, size_t* actual) {
|
||||
if (requested > buffer_size_) {
|
||||
*actual = buffer_size_;
|
||||
} else {
|
||||
*actual = requested;
|
||||
}
|
||||
return buffer_.get();
|
||||
}
|
||||
|
||||
bool XorSink::ReturnData(void* buffer, size_t len) {
|
||||
uint8_t* xor_data = reinterpret_cast<uint8_t*>(buffer);
|
||||
uint8_t* buff = reinterpret_cast<uint8_t*>(bufsink_->GetPayloadBuffer(len + returned_));
|
||||
if (buff == nullptr) {
|
||||
return false;
|
||||
}
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
buff[returned_ + i] ^= xor_data[i];
|
||||
}
|
||||
returned_ += len;
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace snapshot
|
||||
} // namespace android
|
||||
|
|
|
|||
|
|
@ -42,12 +42,7 @@ ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing
|
|||
// Start the replace operation. This will read the
|
||||
// internal COW format and if the block is compressed,
|
||||
// it will be de-compressed.
|
||||
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer";
|
||||
return false;
|
||||
}
|
||||
bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op, void* buffer) {
|
||||
if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
|
||||
SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
|
||||
return false;
|
||||
|
|
@ -55,12 +50,7 @@ bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
if (buffer == nullptr) {
|
||||
SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
|
||||
return false;
|
||||
}
|
||||
bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op, void* buffer) {
|
||||
uint64_t offset;
|
||||
if (!reader_->GetSourceOffset(cow_op, &offset)) {
|
||||
SNAP_LOG(ERROR) << "ReadFromSourceDevice: Failed to get source offset";
|
||||
|
|
@ -85,60 +75,43 @@ bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {
|
|||
|
||||
// Start the copy operation. This will read the backing
|
||||
// block device which is represented by cow_op->source.
|
||||
bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) {
|
||||
if (!ReadFromSourceDevice(cow_op)) {
|
||||
bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op, void* buffer) {
|
||||
if (!ReadFromSourceDevice(cow_op, buffer)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) {
|
||||
if (!ReadFromSourceDevice(cow_op)) {
|
||||
bool ReadWorker::ProcessXorOp(const CowOperation* cow_op, void* buffer) {
|
||||
if (!ReadFromSourceDevice(cow_op, buffer)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
xorsink_.Reset();
|
||||
|
||||
size_t actual = 0;
|
||||
void* buffer = xorsink_.GetBuffer(BLOCK_SZ, &actual);
|
||||
if (!buffer || actual < BLOCK_SZ) {
|
||||
SNAP_LOG(ERROR) << "ProcessXorOp failed to get buffer of " << BLOCK_SZ << " size, got "
|
||||
<< actual;
|
||||
return false;
|
||||
if (xor_buffer_.empty()) {
|
||||
xor_buffer_.resize(BLOCK_SZ);
|
||||
}
|
||||
ssize_t size = reader_->ReadData(cow_op, buffer, BLOCK_SZ);
|
||||
CHECK(xor_buffer_.size() == BLOCK_SZ);
|
||||
|
||||
ssize_t size = reader_->ReadData(cow_op, xor_buffer_.data(), xor_buffer_.size());
|
||||
if (size != BLOCK_SZ) {
|
||||
SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block
|
||||
<< ", return value: " << size;
|
||||
return false;
|
||||
}
|
||||
if (!xorsink_.ReturnData(buffer, size)) {
|
||||
SNAP_LOG(ERROR) << "ProcessXorOp failed to return data";
|
||||
return false;
|
||||
|
||||
auto xor_out = reinterpret_cast<uint8_t*>(buffer);
|
||||
for (size_t i = 0; i < BLOCK_SZ; i++) {
|
||||
xor_out[i] ^= xor_buffer_[i];
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadWorker::ProcessZeroOp() {
|
||||
// Zero out the entire block
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
if (buffer == nullptr) {
|
||||
SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ReadWorker::ProcessZeroOp(void* buffer) {
|
||||
memset(buffer, 0, BLOCK_SZ);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
if (buffer == nullptr) {
|
||||
SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op, void* buffer) {
|
||||
MERGE_GROUP_STATE state = snapuserd_->ProcessMergingBlock(cow_op->new_block, buffer);
|
||||
|
||||
switch (state) {
|
||||
|
|
@ -148,7 +121,7 @@ bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
|
|||
SNAP_LOG(DEBUG) << "Merge-completed: Reading from base device sector: "
|
||||
<< (cow_op->new_block >> SECTOR_SHIFT)
|
||||
<< " Block-number: " << cow_op->new_block;
|
||||
if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), BLOCK_SZ)) {
|
||||
if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), buffer, BLOCK_SZ)) {
|
||||
SNAP_LOG(ERROR) << "ReadDataFromBaseDevice at sector: "
|
||||
<< (cow_op->new_block >> SECTOR_SHIFT) << " after merge-complete.";
|
||||
return false;
|
||||
|
|
@ -158,9 +131,9 @@ bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
|
|||
case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
|
||||
bool ret;
|
||||
if (cow_op->type == kCowCopyOp) {
|
||||
ret = ProcessCopyOp(cow_op);
|
||||
ret = ProcessCopyOp(cow_op, buffer);
|
||||
} else {
|
||||
ret = ProcessXorOp(cow_op);
|
||||
ret = ProcessXorOp(cow_op, buffer);
|
||||
}
|
||||
|
||||
// I/O is complete - decrement the refcount irrespective of the return
|
||||
|
|
@ -185,7 +158,7 @@ bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
|
||||
bool ReadWorker::ProcessCowOp(const CowOperation* cow_op, void* buffer) {
|
||||
if (cow_op == nullptr) {
|
||||
SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
|
||||
return false;
|
||||
|
|
@ -193,17 +166,17 @@ bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
|
|||
|
||||
switch (cow_op->type) {
|
||||
case kCowReplaceOp: {
|
||||
return ProcessReplaceOp(cow_op);
|
||||
return ProcessReplaceOp(cow_op, buffer);
|
||||
}
|
||||
|
||||
case kCowZeroOp: {
|
||||
return ProcessZeroOp();
|
||||
return ProcessZeroOp(buffer);
|
||||
}
|
||||
|
||||
case kCowCopyOp:
|
||||
[[fallthrough]];
|
||||
case kCowXorOp: {
|
||||
return ProcessOrderedOp(cow_op);
|
||||
return ProcessOrderedOp(cow_op, buffer);
|
||||
}
|
||||
|
||||
default: {
|
||||
|
|
@ -229,8 +202,6 @@ bool ReadWorker::Init() {
|
|||
SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
|
||||
return false;
|
||||
}
|
||||
|
||||
xorsink_.Initialize(&bufsink_, BLOCK_SZ);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -271,18 +242,15 @@ bool ReadWorker::WriteDmUserPayload(size_t size) {
|
|||
// After the first header is sent in response to a request, we cannot
|
||||
// send any additional headers.
|
||||
header_response_ = false;
|
||||
|
||||
// Reset the buffer for use by the next request.
|
||||
bufsink_.ResetBufferOffset();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
|
||||
bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, void* buffer, size_t read_size) {
|
||||
CHECK(read_size <= BLOCK_SZ);
|
||||
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
if (buffer == nullptr) {
|
||||
SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
|
||||
return false;
|
||||
}
|
||||
|
||||
loff_t offset = sector << SECTOR_SHIFT;
|
||||
if (!android::base::ReadFullyAtOffset(base_path_merge_fd_, buffer, read_size, offset)) {
|
||||
SNAP_PLOG(ERROR) << "ReadDataFromBaseDevice failed. fd: " << base_path_merge_fd_
|
||||
|
|
@ -303,7 +271,6 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
|||
size_t read_size = std::min(PAYLOAD_BUFFER_SZ, remaining_size);
|
||||
|
||||
size_t total_bytes_read = 0;
|
||||
bufsink_.ResetBufferOffset();
|
||||
|
||||
while (read_size) {
|
||||
// We need to check every 4k block to verify if it is
|
||||
|
|
@ -314,11 +281,17 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
|||
std::make_pair(sector, nullptr), SnapshotHandler::compare);
|
||||
bool not_found = (it == chunk_vec.end() || it->first != sector);
|
||||
|
||||
void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ, size);
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "AcquireBuffer failed in ReadAlignedSector";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (not_found) {
|
||||
// Block not found in map - which means this block was not
|
||||
// changed as per the OTA. Just route the I/O to the base
|
||||
// device.
|
||||
if (!ReadDataFromBaseDevice(sector, size)) {
|
||||
if (!ReadDataFromBaseDevice(sector, buffer, size)) {
|
||||
SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
|
||||
return false;
|
||||
}
|
||||
|
|
@ -327,7 +300,7 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
|||
} else {
|
||||
// We found the sector in mapping. Check the type of COW OP and
|
||||
// process it.
|
||||
if (!ProcessCowOp(it->second)) {
|
||||
if (!ProcessCowOp(it->second, buffer)) {
|
||||
SNAP_LOG(ERROR) << "ProcessCowOp failed";
|
||||
return false;
|
||||
}
|
||||
|
|
@ -338,14 +311,13 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
|||
read_size -= ret;
|
||||
total_bytes_read += ret;
|
||||
sector += (ret >> SECTOR_SHIFT);
|
||||
bufsink_.UpdateBufferOffset(ret);
|
||||
}
|
||||
|
||||
if (!WriteDmUserPayload(total_bytes_read)) {
|
||||
if (!SendBufferedIo()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SNAP_LOG(DEBUG) << "WriteDmUserPayload success total_bytes_read: " << total_bytes_read
|
||||
SNAP_LOG(DEBUG) << "SendBufferedIo success total_bytes_read: " << total_bytes_read
|
||||
<< " remaining_size: " << remaining_size;
|
||||
remaining_size -= total_bytes_read;
|
||||
} while (remaining_size > 0);
|
||||
|
|
@ -356,40 +328,36 @@ bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
|
|||
int ReadWorker::ReadUnalignedSector(
|
||||
sector_t sector, size_t size,
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
|
||||
size_t skip_sector_size = 0;
|
||||
|
||||
SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
|
||||
<< " Aligned sector: " << it->first;
|
||||
|
||||
if (!ProcessCowOp(it->second)) {
|
||||
int num_sectors_skip = sector - it->first;
|
||||
size_t skip_size = num_sectors_skip << SECTOR_SHIFT;
|
||||
size_t write_size = std::min(size, BLOCK_SZ - skip_size);
|
||||
auto buffer = reinterpret_cast<uint8_t*>(bufsink_.AcquireBuffer(BLOCK_SZ, write_size));
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "ProcessCowOp failed to allocate buffer";
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!ProcessCowOp(it->second, buffer)) {
|
||||
SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
|
||||
<< " Aligned sector: " << it->first;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int num_sectors_skip = sector - it->first;
|
||||
|
||||
if (num_sectors_skip > 0) {
|
||||
skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
|
||||
char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
|
||||
struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
|
||||
|
||||
if (skip_sector_size == BLOCK_SZ) {
|
||||
if (skip_size) {
|
||||
if (skip_size == BLOCK_SZ) {
|
||||
SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
|
||||
<< " Base-sector: " << it->first;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
|
||||
(BLOCK_SZ - skip_sector_size));
|
||||
memmove(buffer, buffer + skip_size, write_size);
|
||||
}
|
||||
|
||||
bufsink_.ResetBufferOffset();
|
||||
return std::min(size, (BLOCK_SZ - skip_sector_size));
|
||||
return write_size;
|
||||
}
|
||||
|
||||
bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
|
||||
bufsink_.ResetBufferOffset();
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
|
||||
|
||||
auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
|
||||
|
|
@ -458,7 +426,6 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
|
|||
// requested offset in this case is beyond the last mapped COW op size (which
|
||||
// is block 1 in this case).
|
||||
|
||||
size_t total_bytes_read = 0;
|
||||
size_t remaining_size = size;
|
||||
int ret = 0;
|
||||
if (!merge_complete && (requested_offset >= final_offset) &&
|
||||
|
|
@ -472,11 +439,10 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
|
|||
}
|
||||
|
||||
remaining_size -= ret;
|
||||
total_bytes_read += ret;
|
||||
sector += (ret >> SECTOR_SHIFT);
|
||||
|
||||
// Send the data back
|
||||
if (!WriteDmUserPayload(total_bytes_read)) {
|
||||
if (!SendBufferedIo()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -494,26 +460,21 @@ bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
|
|||
// Find the diff of the aligned offset
|
||||
size_t diff_size = aligned_offset - requested_offset;
|
||||
CHECK(diff_size <= BLOCK_SZ);
|
||||
if (remaining_size < diff_size) {
|
||||
if (!ReadDataFromBaseDevice(sector, remaining_size)) {
|
||||
return false;
|
||||
}
|
||||
total_bytes_read += remaining_size;
|
||||
|
||||
if (!WriteDmUserPayload(total_bytes_read)) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (!ReadDataFromBaseDevice(sector, diff_size)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
total_bytes_read += diff_size;
|
||||
|
||||
if (!WriteDmUserPayload(total_bytes_read)) {
|
||||
return false;
|
||||
}
|
||||
size_t read_size = std::min(remaining_size, diff_size);
|
||||
void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ, read_size);
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "AcquireBuffer failed in ReadUnalignedSector";
|
||||
return false;
|
||||
}
|
||||
if (!ReadDataFromBaseDevice(sector, buffer, read_size)) {
|
||||
return false;
|
||||
}
|
||||
if (!SendBufferedIo()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (remaining_size >= diff_size) {
|
||||
remaining_size -= diff_size;
|
||||
size_t num_sectors_read = (diff_size >> SECTOR_SHIFT);
|
||||
sector += num_sectors_read;
|
||||
|
|
@ -555,6 +516,10 @@ bool ReadWorker::DmuserReadRequest() {
|
|||
return ReadAlignedSector(header->sector, header->len);
|
||||
}
|
||||
|
||||
bool ReadWorker::SendBufferedIo() {
|
||||
return WriteDmUserPayload(bufsink_.GetPayloadBytesWritten());
|
||||
}
|
||||
|
||||
bool ReadWorker::ProcessIORequest() {
|
||||
// Read Header from dm-user misc device. This gives
|
||||
// us the sector number for which IO is issued by dm-snapshot device
|
||||
|
|
@ -579,6 +544,9 @@ bool ReadWorker::ProcessIORequest() {
|
|||
header->type = DM_USER_RESP_SUCCESS;
|
||||
header_response_ = true;
|
||||
|
||||
// Reset the output buffer.
|
||||
bufsink_.ResetBufferOffset();
|
||||
|
||||
bool ok;
|
||||
switch (request_type) {
|
||||
case DM_USER_REQ_MAP_READ:
|
||||
|
|
|
|||
|
|
@ -37,21 +37,22 @@ class ReadWorker : public Worker {
|
|||
bool ProcessIORequest();
|
||||
bool WriteDmUserPayload(size_t size);
|
||||
bool DmuserReadRequest();
|
||||
bool SendBufferedIo();
|
||||
void RespondIOError();
|
||||
|
||||
bool ProcessCowOp(const CowOperation* cow_op);
|
||||
bool ProcessXorOp(const CowOperation* cow_op);
|
||||
bool ProcessOrderedOp(const CowOperation* cow_op);
|
||||
bool ProcessCopyOp(const CowOperation* cow_op);
|
||||
bool ProcessReplaceOp(const CowOperation* cow_op);
|
||||
bool ProcessZeroOp();
|
||||
bool ProcessCowOp(const CowOperation* cow_op, void* buffer);
|
||||
bool ProcessXorOp(const CowOperation* cow_op, void* buffer);
|
||||
bool ProcessOrderedOp(const CowOperation* cow_op, void* buffer);
|
||||
bool ProcessCopyOp(const CowOperation* cow_op, void* buffer);
|
||||
bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer);
|
||||
bool ProcessZeroOp(void* buffer);
|
||||
|
||||
bool ReadAlignedSector(sector_t sector, size_t sz);
|
||||
bool ReadUnalignedSector(sector_t sector, size_t size);
|
||||
int ReadUnalignedSector(sector_t sector, size_t size,
|
||||
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
|
||||
bool ReadFromSourceDevice(const CowOperation* cow_op);
|
||||
bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
|
||||
bool ReadFromSourceDevice(const CowOperation* cow_op, void* buffer);
|
||||
bool ReadDataFromBaseDevice(sector_t sector, void* buffer, size_t read_size);
|
||||
|
||||
constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
|
||||
constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
|
||||
|
|
@ -62,8 +63,8 @@ class ReadWorker : public Worker {
|
|||
std::string control_device_;
|
||||
unique_fd ctrl_fd_;
|
||||
|
||||
XorSink xorsink_;
|
||||
bool header_response_ = false;
|
||||
std::basic_string<uint8_t> xor_buffer_;
|
||||
};
|
||||
|
||||
} // namespace snapshot
|
||||
|
|
|
|||
|
|
@ -106,9 +106,9 @@ bool MergeWorker::MergeReplaceZeroOps() {
|
|||
for (size_t i = 0; i < replace_zero_vec.size(); i++) {
|
||||
const CowOperation* cow_op = replace_zero_vec[i];
|
||||
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "Failed to acquire buffer in merge";
|
||||
SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
|
||||
return false;
|
||||
}
|
||||
if (cow_op->type == kCowReplaceOp) {
|
||||
|
|
@ -120,8 +120,6 @@ bool MergeWorker::MergeReplaceZeroOps() {
|
|||
CHECK(cow_op->type == kCowZeroOp);
|
||||
memset(buffer, 0, BLOCK_SZ);
|
||||
}
|
||||
|
||||
bufsink_.UpdateBufferOffset(BLOCK_SZ);
|
||||
}
|
||||
|
||||
size_t io_size = linear_blocks * BLOCK_SZ;
|
||||
|
|
|
|||
|
|
@ -492,7 +492,7 @@ bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
|
|||
if (xor_op_index < xor_op_vec.size()) {
|
||||
const CowOperation* xor_op = xor_op_vec[xor_op_index];
|
||||
if (xor_op->new_block == new_block) {
|
||||
void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
|
||||
void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
|
||||
if (!buffer) {
|
||||
SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer for block: "
|
||||
<< xor_op->new_block;
|
||||
|
|
@ -506,7 +506,6 @@ bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
|
|||
}
|
||||
|
||||
xor_op_index += 1;
|
||||
bufsink_.UpdateBufferOffset(BLOCK_SZ);
|
||||
}
|
||||
}
|
||||
block_index += 1;
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue