adbd: add source/sink services.

Add some services that skip the service fd to see how much of a benefit
it'll be to eliminate it.

Test: adb raw source:$((300 * 1024 * 1024)) | pv > /dev/null
Test: dd if=/dev/zero bs=1M count=100 | pv | adb raw sink:$((100 * 1024 * 1024))
Change-Id: I042f25f85b16ae9869cb1f1e306d8671b024ed97
This commit is contained in:
Josh Gao 2018-11-16 15:40:16 -08:00
parent e89a55dd41
commit 6eb788298b
4 changed files with 107 additions and 5 deletions

View file

@ -24,6 +24,7 @@ cc_defaults {
"-Wno-missing-field-initializers",
"-Wvla",
],
cpp_std: "experimental",
use_version_lib: true,

View file

@ -148,6 +148,10 @@ unique_fd daemon_service_to_fd(const char* name, atransport* transport);
asocket* host_service_to_socket(const char* name, const char* serial, TransportId transport_id);
#endif
#if !ADB_HOST
asocket* daemon_service_to_socket(std::string_view name);
#endif
#if !ADB_HOST
int init_jdwp(void);
asocket* create_jdwp_service_socket();

View file

@ -33,6 +33,7 @@
#include <thread>
#include <android-base/file.h>
#include <android-base/parseint.h>
#include <android-base/parsenetaddress.h>
#include <android-base/properties.h>
#include <android-base/stringprintf.h>
@ -223,6 +224,105 @@ static void spin_service(unique_fd fd) {
WriteFdExactly(fd.get(), "spinning\n");
}
struct ServiceSocket : public asocket {
ServiceSocket() {
install_local_socket(this);
this->enqueue = [](asocket* self, apacket::payload_type data) {
return static_cast<ServiceSocket*>(self)->Enqueue(std::move(data));
};
this->ready = [](asocket* self) { return static_cast<ServiceSocket*>(self)->Ready(); };
this->close = [](asocket* self) { return static_cast<ServiceSocket*>(self)->Close(); };
}
virtual ~ServiceSocket() = default;
virtual int Enqueue(apacket::payload_type data) { return -1; }
virtual void Ready() {}
virtual void Close() {
if (peer) {
peer->peer = nullptr;
if (peer->shutdown) {
peer->shutdown(peer);
}
peer->close(peer);
}
remove_socket(this);
delete this;
}
};
struct SinkSocket : public ServiceSocket {
explicit SinkSocket(size_t byte_count) {
LOG(INFO) << "Creating new SinkSocket with capacity " << byte_count;
bytes_left_ = byte_count;
}
virtual ~SinkSocket() { LOG(INFO) << "SinkSocket destroyed"; }
virtual int Enqueue(apacket::payload_type data) override final {
if (bytes_left_ <= data.size()) {
// Done reading.
Close();
return -1;
}
bytes_left_ -= data.size();
return 0;
}
size_t bytes_left_;
};
struct SourceSocket : public ServiceSocket {
explicit SourceSocket(size_t byte_count) {
LOG(INFO) << "Creating new SourceSocket with capacity " << byte_count;
bytes_left_ = byte_count;
}
virtual ~SourceSocket() { LOG(INFO) << "SourceSocket destroyed"; }
void Ready() {
size_t len = std::min(bytes_left_, get_max_payload());
if (len == 0) {
Close();
return;
}
Block block(len);
memset(block.data(), 0, block.size());
peer->enqueue(peer, std::move(block));
bytes_left_ -= len;
}
int Enqueue(apacket::payload_type data) { return -1; }
size_t bytes_left_;
};
asocket* daemon_service_to_socket(std::string_view name) {
if (name == "jdwp") {
return create_jdwp_service_socket();
} else if (name == "track-jdwp") {
return create_jdwp_tracker_service_socket();
} else if (name.starts_with("sink:")) {
name.remove_prefix(strlen("sink:"));
uint64_t byte_count = 0;
if (!android::base::ParseUint(name.data(), &byte_count)) {
return nullptr;
}
return new SinkSocket(byte_count);
} else if (name.starts_with("source:")) {
name.remove_prefix(strlen("source:"));
uint64_t byte_count = 0;
if (!android::base::ParseUint(name.data(), &byte_count)) {
return nullptr;
}
return new SourceSocket(byte_count);
}
return nullptr;
}
unique_fd daemon_service_to_fd(const char* name, atransport* transport) {
if (!strncmp("dev:", name, 4)) {
return unique_fd{unix_open(name + 4, O_RDWR | O_CLOEXEC)};

View file

@ -348,11 +348,8 @@ asocket* create_local_socket(int fd) {
asocket* create_local_service_socket(const char* name, atransport* transport) {
#if !ADB_HOST
if (!strcmp(name, "jdwp")) {
return create_jdwp_service_socket();
}
if (!strcmp(name, "track-jdwp")) {
return create_jdwp_tracker_service_socket();
if (asocket* s = daemon_service_to_socket(name); s) {
return s;
}
#endif
int fd = service_to_fd(name, transport);