Merge changes I5e56f17f,Ie3748b6c,I4e1d56a8

* changes:
  adb: switch the socket list mutex to a recursive_mutex.
  adb: clang-format sockets.cpp.
  adb: add implementations of mutex, recursive_mutex.
This commit is contained in:
Treehugger Robot 2016-05-18 23:14:41 +00:00 committed by Gerrit Code Review
commit cea14958d2
4 changed files with 402 additions and 268 deletions

View file

@ -8,7 +8,6 @@
#endif #endif
ADB_MUTEX(basename_lock) ADB_MUTEX(basename_lock)
ADB_MUTEX(dirname_lock) ADB_MUTEX(dirname_lock)
ADB_MUTEX(socket_list_lock)
ADB_MUTEX(transport_lock) ADB_MUTEX(transport_lock)
#if ADB_HOST #if ADB_HOST
ADB_MUTEX(local_transports_lock) ADB_MUTEX(local_transports_lock)

View file

@ -26,6 +26,7 @@
#include <unistd.h> #include <unistd.h>
#include <algorithm> #include <algorithm>
#include <mutex>
#include <string> #include <string>
#include <vector> #include <vector>
@ -35,17 +36,16 @@
#include "adb.h" #include "adb.h"
#include "adb_io.h" #include "adb_io.h"
#include "sysdeps/mutex.h"
#include "transport.h" #include "transport.h"
ADB_MUTEX_DEFINE( socket_list_lock ); static void local_socket_close(asocket* s);
static void local_socket_close_locked(asocket *s);
static std::recursive_mutex& local_socket_list_lock = *new std::recursive_mutex();
static unsigned local_socket_next_id = 1; static unsigned local_socket_next_id = 1;
static asocket local_socket_list = { static asocket local_socket_list = {
.next = &local_socket_list, .next = &local_socket_list, .prev = &local_socket_list,
.prev = &local_socket_list,
}; };
/* the the list of currently closing local sockets. /* the the list of currently closing local sockets.
@ -53,62 +53,53 @@ static asocket local_socket_list = {
** write to their fd. ** write to their fd.
*/ */
static asocket local_socket_closing_list = { static asocket local_socket_closing_list = {
.next = &local_socket_closing_list, .next = &local_socket_closing_list, .prev = &local_socket_closing_list,
.prev = &local_socket_closing_list,
}; };
// Parse the global list of sockets to find one with id |local_id|. // Parse the global list of sockets to find one with id |local_id|.
// If |peer_id| is not 0, also check that it is connected to a peer // If |peer_id| is not 0, also check that it is connected to a peer
// with id |peer_id|. Returns an asocket handle on success, NULL on failure. // with id |peer_id|. Returns an asocket handle on success, NULL on failure.
asocket *find_local_socket(unsigned local_id, unsigned peer_id) asocket* find_local_socket(unsigned local_id, unsigned peer_id) {
{ asocket* s;
asocket *s; asocket* result = NULL;
asocket *result = NULL;
adb_mutex_lock(&socket_list_lock); std::lock_guard<std::recursive_mutex> lock(local_socket_list_lock);
for (s = local_socket_list.next; s != &local_socket_list; s = s->next) { for (s = local_socket_list.next; s != &local_socket_list; s = s->next) {
if (s->id != local_id) if (s->id != local_id) {
continue; continue;
}
if (peer_id == 0 || (s->peer && s->peer->id == peer_id)) { if (peer_id == 0 || (s->peer && s->peer->id == peer_id)) {
result = s; result = s;
} }
break; break;
} }
adb_mutex_unlock(&socket_list_lock);
return result; return result;
} }
static void static void insert_local_socket(asocket* s, asocket* list) {
insert_local_socket(asocket* s, asocket* list) s->next = list;
{ s->prev = s->next->prev;
s->next = list;
s->prev = s->next->prev;
s->prev->next = s; s->prev->next = s;
s->next->prev = s; s->next->prev = s;
} }
void install_local_socket(asocket* s) {
void install_local_socket(asocket *s) std::lock_guard<std::recursive_mutex> lock(local_socket_list_lock);
{
adb_mutex_lock(&socket_list_lock);
s->id = local_socket_next_id++; s->id = local_socket_next_id++;
// Socket ids should never be 0. // Socket ids should never be 0.
if (local_socket_next_id == 0) if (local_socket_next_id == 0) {
local_socket_next_id = 1; fatal("local socket id overflow");
}
insert_local_socket(s, &local_socket_list); insert_local_socket(s, &local_socket_list);
adb_mutex_unlock(&socket_list_lock);
} }
void remove_socket(asocket *s) void remove_socket(asocket* s) {
{
// socket_list_lock should already be held // socket_list_lock should already be held
if (s->prev && s->next) if (s->prev && s->next) {
{
s->prev->next = s->next; s->prev->next = s->next;
s->next->prev = s->prev; s->next->prev = s->prev;
s->next = 0; s->next = 0;
@ -117,50 +108,47 @@ void remove_socket(asocket *s)
} }
} }
void close_all_sockets(atransport *t) void close_all_sockets(atransport* t) {
{ asocket* s;
asocket *s;
/* this is a little gross, but since s->close() *will* modify /* this is a little gross, but since s->close() *will* modify
** the list out from under you, your options are limited. ** the list out from under you, your options are limited.
*/ */
adb_mutex_lock(&socket_list_lock); std::lock_guard<std::recursive_mutex> lock(local_socket_list_lock);
restart: restart:
for(s = local_socket_list.next; s != &local_socket_list; s = s->next){ for (s = local_socket_list.next; s != &local_socket_list; s = s->next) {
if(s->transport == t || (s->peer && s->peer->transport == t)) { if (s->transport == t || (s->peer && s->peer->transport == t)) {
local_socket_close_locked(s); local_socket_close(s);
goto restart; goto restart;
} }
} }
adb_mutex_unlock(&socket_list_lock);
} }
static int local_socket_enqueue(asocket *s, apacket *p) static int local_socket_enqueue(asocket* s, apacket* p) {
{
D("LS(%d): enqueue %d", s->id, p->len); D("LS(%d): enqueue %d", s->id, p->len);
p->ptr = p->data; p->ptr = p->data;
/* if there is already data queue'd, we will receive /* if there is already data queue'd, we will receive
** events when it's time to write. just add this to ** events when it's time to write. just add this to
** the tail ** the tail
*/ */
if(s->pkt_first) { if (s->pkt_first) {
goto enqueue; goto enqueue;
} }
/* write as much as we can, until we /* write as much as we can, until we
** would block or there is an error/eof ** would block or there is an error/eof
*/ */
while(p->len > 0) { while (p->len > 0) {
int r = adb_write(s->fd, p->ptr, p->len); int r = adb_write(s->fd, p->ptr, p->len);
if(r > 0) { if (r > 0) {
p->len -= r; p->len -= r;
p->ptr += r; p->ptr += r;
continue; continue;
} }
if((r == 0) || (errno != EAGAIN)) { if ((r == 0) || (errno != EAGAIN)) {
D( "LS(%d): not ready, errno=%d: %s", s->id, errno, strerror(errno) ); D("LS(%d): not ready, errno=%d: %s", s->id, errno, strerror(errno));
put_apacket(p); put_apacket(p);
s->has_write_error = true; s->has_write_error = true;
s->close(s); s->close(s);
@ -170,55 +158,46 @@ static int local_socket_enqueue(asocket *s, apacket *p)
} }
} }
if(p->len == 0) { if (p->len == 0) {
put_apacket(p); put_apacket(p);
return 0; /* ready for more data */ return 0; /* ready for more data */
} }
enqueue: enqueue:
p->next = 0; p->next = 0;
if(s->pkt_first) { if (s->pkt_first) {
s->pkt_last->next = p; s->pkt_last->next = p;
} else { } else {
s->pkt_first = p; s->pkt_first = p;
} }
s->pkt_last = p; s->pkt_last = p;
/* make sure we are notified when we can drain the queue */ /* make sure we are notified when we can drain the queue */
fdevent_add(&s->fde, FDE_WRITE); fdevent_add(&s->fde, FDE_WRITE);
return 1; /* not ready (backlog) */ return 1; /* not ready (backlog) */
} }
static void local_socket_ready(asocket *s) static void local_socket_ready(asocket* s) {
{
/* far side is ready for data, pay attention to /* far side is ready for data, pay attention to
readable events */ readable events */
fdevent_add(&s->fde, FDE_READ); fdevent_add(&s->fde, FDE_READ);
} }
static void local_socket_close(asocket *s)
{
adb_mutex_lock(&socket_list_lock);
local_socket_close_locked(s);
adb_mutex_unlock(&socket_list_lock);
}
// be sure to hold the socket list lock when calling this // be sure to hold the socket list lock when calling this
static void local_socket_destroy(asocket *s) static void local_socket_destroy(asocket* s) {
{
apacket *p, *n; apacket *p, *n;
int exit_on_close = s->exit_on_close; int exit_on_close = s->exit_on_close;
D("LS(%d): destroying fde.fd=%d", s->id, s->fde.fd); D("LS(%d): destroying fde.fd=%d", s->id, s->fde.fd);
/* IMPORTANT: the remove closes the fd /* IMPORTANT: the remove closes the fd
** that belongs to this socket ** that belongs to this socket
*/ */
fdevent_remove(&s->fde); fdevent_remove(&s->fde);
/* dispose of any unwritten data */ /* dispose of any unwritten data */
for(p = s->pkt_first; p; p = n) { for (p = s->pkt_first; p; p = n) {
D("LS(%d): discarding %d bytes", s->id, p->len); D("LS(%d): discarding %d bytes", s->id, p->len);
n = p->next; n = p->next;
put_apacket(p); put_apacket(p);
@ -232,41 +211,35 @@ static void local_socket_destroy(asocket *s)
} }
} }
static void local_socket_close(asocket* s) {
static void local_socket_close_locked(asocket *s) D("entered local_socket_close. LS(%d) fd=%d", s->id, s->fd);
{ std::lock_guard<std::recursive_mutex> lock(local_socket_list_lock);
D("entered local_socket_close_locked. LS(%d) fd=%d", s->id, s->fd); if (s->peer) {
if(s->peer) { D("LS(%d): closing peer. peer->id=%d peer->fd=%d", s->id, s->peer->id, s->peer->fd);
D("LS(%d): closing peer. peer->id=%d peer->fd=%d",
s->id, s->peer->id, s->peer->fd);
/* Note: it's important to call shutdown before disconnecting from /* Note: it's important to call shutdown before disconnecting from
* the peer, this ensures that remote sockets can still get the id * the peer, this ensures that remote sockets can still get the id
* of the local socket they're connected to, to send a CLOSE() * of the local socket they're connected to, to send a CLOSE()
* protocol event. */ * protocol event. */
if (s->peer->shutdown) if (s->peer->shutdown) {
s->peer->shutdown(s->peer); s->peer->shutdown(s->peer);
s->peer->peer = 0;
// tweak to avoid deadlock
if (s->peer->close == local_socket_close) {
local_socket_close_locked(s->peer);
} else {
s->peer->close(s->peer);
} }
s->peer = 0; s->peer->peer = nullptr;
s->peer->close(s->peer);
s->peer = nullptr;
} }
/* If we are already closing, or if there are no /* If we are already closing, or if there are no
** pending packets, destroy immediately ** pending packets, destroy immediately
*/ */
if (s->closing || s->has_write_error || s->pkt_first == NULL) { if (s->closing || s->has_write_error || s->pkt_first == NULL) {
int id = s->id; int id = s->id;
local_socket_destroy(s); local_socket_destroy(s);
D("LS(%d): closed", id); D("LS(%d): closed", id);
return; return;
} }
/* otherwise, put on the closing list /* otherwise, put on the closing list
*/ */
D("LS(%d): closing", s->id); D("LS(%d): closing", s->id);
s->closing = 1; s->closing = 1;
fdevent_del(&s->fde, FDE_READ); fdevent_del(&s->fde, FDE_READ);
@ -276,8 +249,7 @@ static void local_socket_close_locked(asocket *s)
CHECK_EQ(FDE_WRITE, s->fde.state & FDE_WRITE); CHECK_EQ(FDE_WRITE, s->fde.state & FDE_WRITE);
} }
static void local_socket_event_func(int fd, unsigned ev, void* _s) static void local_socket_event_func(int fd, unsigned ev, void* _s) {
{
asocket* s = reinterpret_cast<asocket*>(_s); asocket* s = reinterpret_cast<asocket*>(_s);
D("LS(%d): event_func(fd=%d(==%d), ev=%04x)", s->id, s->fd, fd, ev); D("LS(%d): event_func(fd=%d(==%d), ev=%04x)", s->id, s->fd, fd, ev);
@ -334,10 +306,9 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s)
s->peer->ready(s->peer); s->peer->ready(s->peer);
} }
if (ev & FDE_READ) { if (ev & FDE_READ) {
apacket *p = get_apacket(); apacket* p = get_apacket();
unsigned char *x = p->data; unsigned char* x = p->data;
const size_t max_payload = s->get_max_payload(); const size_t max_payload = s->get_max_payload();
size_t avail = max_payload; size_t avail = max_payload;
int r = 0; int r = 0;
@ -345,8 +316,8 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s)
while (avail > 0) { while (avail > 0) {
r = adb_read(fd, x, avail); r = adb_read(fd, x, avail);
D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", D("LS(%d): post adb_read(fd=%d,...) r=%d (errno=%d) avail=%zu", s->id, s->fd, r,
s->id, s->fd, r, r < 0 ? errno : 0, avail); r < 0 ? errno : 0, avail);
if (r == -1) { if (r == -1) {
if (errno == EAGAIN) { if (errno == EAGAIN) {
break; break;
@ -361,8 +332,8 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s)
is_eof = 1; is_eof = 1;
break; break;
} }
D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", D("LS(%d): fd=%d post avail loop. r=%d is_eof=%d forced_eof=%d", s->id, s->fd, r, is_eof,
s->id, s->fd, r, is_eof, s->fde.force_eof); s->fde.force_eof);
if ((avail == max_payload) || (s->peer == 0)) { if ((avail == max_payload) || (s->peer == 0)) {
put_apacket(p); put_apacket(p);
} else { } else {
@ -376,48 +347,48 @@ static void local_socket_event_func(int fd, unsigned ev, void* _s)
D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r); D("LS(%u): fd=%d post peer->enqueue(). r=%d", saved_id, saved_fd, r);
if (r < 0) { if (r < 0) {
/* error return means they closed us as a side-effect /* error return means they closed us as a side-effect
** and we must return immediately. ** and we must return immediately.
** **
** note that if we still have buffered packets, the ** note that if we still have buffered packets, the
** socket will be placed on the closing socket list. ** socket will be placed on the closing socket list.
** this handler function will be called again ** this handler function will be called again
** to process FDE_WRITE events. ** to process FDE_WRITE events.
*/ */
return; return;
} }
if (r > 0) { if (r > 0) {
/* if the remote cannot accept further events, /* if the remote cannot accept further events,
** we disable notification of READs. They'll ** we disable notification of READs. They'll
** be enabled again when we get a call to ready() ** be enabled again when we get a call to ready()
*/ */
fdevent_del(&s->fde, FDE_READ); fdevent_del(&s->fde, FDE_READ);
} }
} }
/* Don't allow a forced eof if data is still there */ /* Don't allow a forced eof if data is still there */
if ((s->fde.force_eof && !r) || is_eof) { if ((s->fde.force_eof && !r) || is_eof) {
D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", D(" closing because is_eof=%d r=%d s->fde.force_eof=%d", is_eof, r, s->fde.force_eof);
is_eof, r, s->fde.force_eof);
s->close(s); s->close(s);
return; return;
} }
} }
if (ev & FDE_ERROR){ if (ev & FDE_ERROR) {
/* this should be caught be the next read or write /* this should be caught be the next read or write
** catching it here means we may skip the last few ** catching it here means we may skip the last few
** bytes of readable data. ** bytes of readable data.
*/ */
D("LS(%d): FDE_ERROR (fd=%d)", s->id, s->fd); D("LS(%d): FDE_ERROR (fd=%d)", s->id, s->fd);
return; return;
} }
} }
asocket *create_local_socket(int fd) asocket* create_local_socket(int fd) {
{ asocket* s = reinterpret_cast<asocket*>(calloc(1, sizeof(asocket)));
asocket *s = reinterpret_cast<asocket*>(calloc(1, sizeof(asocket))); if (s == NULL) {
if (s == NULL) fatal("cannot allocate socket"); fatal("cannot allocate socket");
}
s->fd = fd; s->fd = fd;
s->enqueue = local_socket_enqueue; s->enqueue = local_socket_enqueue;
s->ready = local_socket_ready; s->ready = local_socket_ready;
@ -430,32 +401,33 @@ asocket *create_local_socket(int fd)
return s; return s;
} }
asocket *create_local_service_socket(const char *name, asocket* create_local_service_socket(const char* name, const atransport* transport) {
const atransport* transport)
{
#if !ADB_HOST #if !ADB_HOST
if (!strcmp(name,"jdwp")) { if (!strcmp(name, "jdwp")) {
return create_jdwp_service_socket(); return create_jdwp_service_socket();
} }
if (!strcmp(name,"track-jdwp")) { if (!strcmp(name, "track-jdwp")) {
return create_jdwp_tracker_service_socket(); return create_jdwp_tracker_service_socket();
} }
#endif #endif
int fd = service_to_fd(name, transport); int fd = service_to_fd(name, transport);
if(fd < 0) return 0; if (fd < 0) {
return 0;
}
asocket* s = create_local_socket(fd); asocket* s = create_local_socket(fd);
D("LS(%d): bound to '%s' via %d", s->id, name, fd); D("LS(%d): bound to '%s' via %d", s->id, name, fd);
#if !ADB_HOST #if !ADB_HOST
char debug[PROPERTY_VALUE_MAX]; char debug[PROPERTY_VALUE_MAX];
if (!strncmp(name, "root:", 5)) if (!strncmp(name, "root:", 5)) {
property_get("ro.debuggable", debug, ""); property_get("ro.debuggable", debug, "");
}
if ((!strncmp(name, "root:", 5) && getuid() != 0 && strcmp(debug, "1") == 0) if ((!strncmp(name, "root:", 5) && getuid() != 0 && strcmp(debug, "1") == 0) ||
|| (!strncmp(name, "unroot:", 7) && getuid() == 0) (!strncmp(name, "unroot:", 7) && getuid() == 0) ||
|| !strncmp(name, "usb:", 4) !strncmp(name, "usb:", 4) ||
|| !strncmp(name, "tcpip:", 6)) { !strncmp(name, "tcpip:", 6)) {
D("LS(%d): enabling exit_on_close", s->id); D("LS(%d): enabling exit_on_close", s->id);
s->exit_on_close = 1; s->exit_on_close = 1;
} }
@ -465,9 +437,8 @@ asocket *create_local_service_socket(const char *name,
} }
#if ADB_HOST #if ADB_HOST
static asocket *create_host_service_socket(const char *name, const char* serial) static asocket* create_host_service_socket(const char* name, const char* serial) {
{ asocket* s;
asocket *s;
s = host_service_to_socket(name, serial); s = host_service_to_socket(name, serial);
@ -480,10 +451,8 @@ static asocket *create_host_service_socket(const char *name, const char* serial)
} }
#endif /* ADB_HOST */ #endif /* ADB_HOST */
static int remote_socket_enqueue(asocket *s, apacket *p) static int remote_socket_enqueue(asocket* s, apacket* p) {
{ D("entered remote_socket_enqueue RS(%d) WRITE fd=%d peer.fd=%d", s->id, s->fd, s->peer->fd);
D("entered remote_socket_enqueue RS(%d) WRITE fd=%d peer.fd=%d",
s->id, s->fd, s->peer->fd);
p->msg.command = A_WRTE; p->msg.command = A_WRTE;
p->msg.arg0 = s->peer->id; p->msg.arg0 = s->peer->id;
p->msg.arg1 = s->id; p->msg.arg1 = s->id;
@ -492,40 +461,35 @@ static int remote_socket_enqueue(asocket *s, apacket *p)
return 1; return 1;
} }
static void remote_socket_ready(asocket *s) static void remote_socket_ready(asocket* s) {
{ D("entered remote_socket_ready RS(%d) OKAY fd=%d peer.fd=%d", s->id, s->fd, s->peer->fd);
D("entered remote_socket_ready RS(%d) OKAY fd=%d peer.fd=%d", apacket* p = get_apacket();
s->id, s->fd, s->peer->fd);
apacket *p = get_apacket();
p->msg.command = A_OKAY; p->msg.command = A_OKAY;
p->msg.arg0 = s->peer->id; p->msg.arg0 = s->peer->id;
p->msg.arg1 = s->id; p->msg.arg1 = s->id;
send_packet(p, s->transport); send_packet(p, s->transport);
} }
static void remote_socket_shutdown(asocket *s) static void remote_socket_shutdown(asocket* s) {
{ D("entered remote_socket_shutdown RS(%d) CLOSE fd=%d peer->fd=%d", s->id, s->fd,
D("entered remote_socket_shutdown RS(%d) CLOSE fd=%d peer->fd=%d", s->peer ? s->peer->fd : -1);
s->id, s->fd, s->peer?s->peer->fd:-1); apacket* p = get_apacket();
apacket *p = get_apacket();
p->msg.command = A_CLSE; p->msg.command = A_CLSE;
if(s->peer) { if (s->peer) {
p->msg.arg0 = s->peer->id; p->msg.arg0 = s->peer->id;
} }
p->msg.arg1 = s->id; p->msg.arg1 = s->id;
send_packet(p, s->transport); send_packet(p, s->transport);
} }
static void remote_socket_close(asocket *s) static void remote_socket_close(asocket* s) {
{
if (s->peer) { if (s->peer) {
s->peer->peer = 0; s->peer->peer = 0;
D("RS(%d) peer->close()ing peer->id=%d peer->fd=%d", D("RS(%d) peer->close()ing peer->id=%d peer->fd=%d", s->id, s->peer->id, s->peer->fd);
s->id, s->peer->id, s->peer->fd);
s->peer->close(s->peer); s->peer->close(s->peer);
} }
D("entered remote_socket_close RS(%d) CLOSE fd=%d peer->fd=%d", D("entered remote_socket_close RS(%d) CLOSE fd=%d peer->fd=%d", s->id, s->fd,
s->id, s->fd, s->peer?s->peer->fd:-1); s->peer ? s->peer->fd : -1);
D("RS(%d): closed", s->id); D("RS(%d): closed", s->id);
free(s); free(s);
} }
@ -534,12 +498,15 @@ static void remote_socket_close(asocket *s)
// |t|. Where |id| is the socket id of the corresponding service on the other // |t|. Where |id| is the socket id of the corresponding service on the other
// side of the transport (it is allocated by the remote side and _cannot_ be 0). // side of the transport (it is allocated by the remote side and _cannot_ be 0).
// Returns a new non-NULL asocket handle. // Returns a new non-NULL asocket handle.
asocket *create_remote_socket(unsigned id, atransport *t) asocket* create_remote_socket(unsigned id, atransport* t) {
{ if (id == 0) {
if (id == 0) fatal("invalid remote socket id (0)"); fatal("invalid remote socket id (0)");
}
asocket* s = reinterpret_cast<asocket*>(calloc(1, sizeof(asocket))); asocket* s = reinterpret_cast<asocket*>(calloc(1, sizeof(asocket)));
if (s == NULL) fatal("cannot allocate socket"); if (s == NULL) {
fatal("cannot allocate socket");
}
s->id = id; s->id = id;
s->enqueue = remote_socket_enqueue; s->enqueue = remote_socket_enqueue;
s->ready = remote_socket_ready; s->ready = remote_socket_ready;
@ -551,13 +518,12 @@ asocket *create_remote_socket(unsigned id, atransport *t)
return s; return s;
} }
void connect_to_remote(asocket *s, const char *destination) void connect_to_remote(asocket* s, const char* destination) {
{
D("Connect_to_remote call RS(%d) fd=%d", s->id, s->fd); D("Connect_to_remote call RS(%d) fd=%d", s->id, s->fd);
apacket *p = get_apacket(); apacket* p = get_apacket();
size_t len = strlen(destination) + 1; size_t len = strlen(destination) + 1;
if(len > (s->get_max_payload()-1)) { if (len > (s->get_max_payload() - 1)) {
fatal("destination oversized"); fatal("destination oversized");
} }
@ -565,15 +531,13 @@ void connect_to_remote(asocket *s, const char *destination)
p->msg.command = A_OPEN; p->msg.command = A_OPEN;
p->msg.arg0 = s->id; p->msg.arg0 = s->id;
p->msg.data_length = len; p->msg.data_length = len;
strcpy((char*) p->data, destination); strcpy((char*)p->data, destination);
send_packet(p, s->transport); send_packet(p, s->transport);
} }
/* this is used by magic sockets to rig local sockets to /* this is used by magic sockets to rig local sockets to
send the go-ahead message when they connect */ send the go-ahead message when they connect */
static void local_socket_ready_notify(asocket *s) static void local_socket_ready_notify(asocket* s) {
{
s->ready = local_socket_ready; s->ready = local_socket_ready;
s->shutdown = NULL; s->shutdown = NULL;
s->close = local_socket_close; s->close = local_socket_close;
@ -584,8 +548,7 @@ static void local_socket_ready_notify(asocket *s)
/* this is used by magic sockets to rig local sockets to /* this is used by magic sockets to rig local sockets to
send the failure message if they are closed before send the failure message if they are closed before
connected (to avoid closing them without a status message) */ connected (to avoid closing them without a status message) */
static void local_socket_close_notify(asocket *s) static void local_socket_close_notify(asocket* s) {
{
s->ready = local_socket_ready; s->ready = local_socket_ready;
s->shutdown = NULL; s->shutdown = NULL;
s->close = local_socket_close; s->close = local_socket_close;
@ -593,28 +556,41 @@ static void local_socket_close_notify(asocket *s)
s->close(s); s->close(s);
} }
static unsigned unhex(unsigned char *s, int len) static unsigned unhex(unsigned char* s, int len) {
{
unsigned n = 0, c; unsigned n = 0, c;
while(len-- > 0) { while (len-- > 0) {
switch((c = *s++)) { switch ((c = *s++)) {
case '0': case '1': case '2': case '0':
case '3': case '4': case '5': case '1':
case '6': case '7': case '8': case '2':
case '9': case '3':
c -= '0'; case '4':
break; case '5':
case 'a': case 'b': case 'c': case '6':
case 'd': case 'e': case 'f': case '7':
c = c - 'a' + 10; case '8':
break; case '9':
case 'A': case 'B': case 'C': c -= '0';
case 'D': case 'E': case 'F': break;
c = c - 'A' + 10; case 'a':
break; case 'b':
default: case 'c':
return 0xffffffff; case 'd':
case 'e':
case 'f':
c = c - 'a' + 10;
break;
case 'A':
case 'B':
case 'C':
case 'D':
case 'E':
case 'F':
c = c - 'A' + 10;
break;
default:
return 0xffffffff;
} }
n = (n << 4) | c; n = (n << 4) | c;
@ -671,31 +647,29 @@ char* skip_host_serial(char* service) {
} // namespace internal } // namespace internal
#endif // ADB_HOST #endif // ADB_HOST
static int smart_socket_enqueue(asocket *s, apacket *p) static int smart_socket_enqueue(asocket* s, apacket* p) {
{
unsigned len; unsigned len;
#if ADB_HOST #if ADB_HOST
char *service = nullptr; char* service = nullptr;
char* serial = nullptr; char* serial = nullptr;
TransportType type = kTransportAny; TransportType type = kTransportAny;
#endif #endif
D("SS(%d): enqueue %d", s->id, p->len); D("SS(%d): enqueue %d", s->id, p->len);
if(s->pkt_first == 0) { if (s->pkt_first == 0) {
s->pkt_first = p; s->pkt_first = p;
s->pkt_last = p; s->pkt_last = p;
} else { } else {
if((s->pkt_first->len + p->len) > s->get_max_payload()) { if ((s->pkt_first->len + p->len) > s->get_max_payload()) {
D("SS(%d): overflow", s->id); D("SS(%d): overflow", s->id);
put_apacket(p); put_apacket(p);
goto fail; goto fail;
} }
memcpy(s->pkt_first->data + s->pkt_first->len, memcpy(s->pkt_first->data + s->pkt_first->len, p->data, p->len);
p->data, p->len);
s->pkt_first->len += p->len; s->pkt_first->len += p->len;
put_apacket(p); put_apacket(p);
@ -703,7 +677,9 @@ static int smart_socket_enqueue(asocket *s, apacket *p)
} }
/* don't bother if we can't decode the length */ /* don't bother if we can't decode the length */
if(p->len < 4) return 0; if (p->len < 4) {
return 0;
}
len = unhex(p->data, 4); len = unhex(p->data, 4);
if ((len < 1) || (len > MAX_PAYLOAD_V1)) { if ((len < 1) || (len > MAX_PAYLOAD_V1)) {
@ -711,27 +687,27 @@ static int smart_socket_enqueue(asocket *s, apacket *p)
goto fail; goto fail;
} }
D("SS(%d): len is %d", s->id, len ); D("SS(%d): len is %d", s->id, len);
/* can't do anything until we have the full header */ /* can't do anything until we have the full header */
if((len + 4) > p->len) { if ((len + 4) > p->len) {
D("SS(%d): waiting for %d more bytes", s->id, len+4 - p->len); D("SS(%d): waiting for %d more bytes", s->id, len + 4 - p->len);
return 0; return 0;
} }
p->data[len + 4] = 0; p->data[len + 4] = 0;
D("SS(%d): '%s'", s->id, (char*) (p->data + 4)); D("SS(%d): '%s'", s->id, (char*)(p->data + 4));
#if ADB_HOST #if ADB_HOST
service = (char *)p->data + 4; service = (char*)p->data + 4;
if(!strncmp(service, "host-serial:", strlen("host-serial:"))) { if (!strncmp(service, "host-serial:", strlen("host-serial:"))) {
char* serial_end; char* serial_end;
service += strlen("host-serial:"); service += strlen("host-serial:");
// serial number should follow "host:" and could be a host:port string. // serial number should follow "host:" and could be a host:port string.
serial_end = internal::skip_host_serial(service); serial_end = internal::skip_host_serial(service);
if (serial_end) { if (serial_end) {
*serial_end = 0; // terminate string *serial_end = 0; // terminate string
serial = service; serial = service;
service = serial_end + 1; service = serial_end + 1;
} }
@ -749,42 +725,42 @@ static int smart_socket_enqueue(asocket *s, apacket *p)
} }
if (service) { if (service) {
asocket *s2; asocket* s2;
/* some requests are handled immediately -- in that /* some requests are handled immediately -- in that
** case the handle_host_request() routine has sent ** case the handle_host_request() routine has sent
** the OKAY or FAIL message and all we have to do ** the OKAY or FAIL message and all we have to do
** is clean up. ** is clean up.
*/ */
if(handle_host_request(service, type, serial, s->peer->fd, s) == 0) { if (handle_host_request(service, type, serial, s->peer->fd, s) == 0) {
/* XXX fail message? */ /* XXX fail message? */
D( "SS(%d): handled host service '%s'", s->id, service ); D("SS(%d): handled host service '%s'", s->id, service);
goto fail; goto fail;
} }
if (!strncmp(service, "transport", strlen("transport"))) { if (!strncmp(service, "transport", strlen("transport"))) {
D( "SS(%d): okay transport", s->id ); D("SS(%d): okay transport", s->id);
p->len = 0; p->len = 0;
return 0; return 0;
} }
/* try to find a local service with this name. /* try to find a local service with this name.
** if no such service exists, we'll fail out ** if no such service exists, we'll fail out
** and tear down here. ** and tear down here.
*/ */
s2 = create_host_service_socket(service, serial); s2 = create_host_service_socket(service, serial);
if(s2 == 0) { if (s2 == 0) {
D( "SS(%d): couldn't create host service '%s'", s->id, service ); D("SS(%d): couldn't create host service '%s'", s->id, service);
SendFail(s->peer->fd, "unknown host service"); SendFail(s->peer->fd, "unknown host service");
goto fail; goto fail;
} }
/* we've connected to a local host service, /* we've connected to a local host service,
** so we make our peer back into a regular ** so we make our peer back into a regular
** local socket and bind it to the new local ** local socket and bind it to the new local
** service socket, acknowledge the successful ** service socket, acknowledge the successful
** connection, and close this smart socket now ** connection, and close this smart socket now
** that its work is done. ** that its work is done.
*/ */
SendOkay(s->peer->fd); SendOkay(s->peer->fd);
s->peer->ready = local_socket_ready; s->peer->ready = local_socket_ready;
@ -793,10 +769,10 @@ static int smart_socket_enqueue(asocket *s, apacket *p)
s->peer->peer = s2; s->peer->peer = s2;
s2->peer = s->peer; s2->peer = s->peer;
s->peer = 0; s->peer = 0;
D( "SS(%d): okay", s->id ); D("SS(%d): okay", s->id);
s->close(s); s->close(s);
/* initial state is "ready" */ /* initial state is "ready" */
s2->ready(s2); s2->ready(s2);
return 0; return 0;
} }
@ -811,53 +787,50 @@ static int smart_socket_enqueue(asocket *s, apacket *p)
} }
#endif #endif
if(!(s->transport) || (s->transport->connection_state == kCsOffline)) { if (!(s->transport) || (s->transport->connection_state == kCsOffline)) {
/* if there's no remote we fail the connection /* if there's no remote we fail the connection
** right here and terminate it ** right here and terminate it
*/ */
SendFail(s->peer->fd, "device offline (x)"); SendFail(s->peer->fd, "device offline (x)");
goto fail; goto fail;
} }
/* instrument our peer to pass the success or fail
/* instrument our peer to pass the success or fail ** message back once it connects or closes, then
** message back once it connects or closes, then ** detach from it, request the connection, and
** detach from it, request the connection, and ** tear down
** tear down */
*/
s->peer->ready = local_socket_ready_notify; s->peer->ready = local_socket_ready_notify;
s->peer->shutdown = nullptr; s->peer->shutdown = nullptr;
s->peer->close = local_socket_close_notify; s->peer->close = local_socket_close_notify;
s->peer->peer = 0; s->peer->peer = 0;
/* give him our transport and upref it */ /* give him our transport and upref it */
s->peer->transport = s->transport; s->peer->transport = s->transport;
connect_to_remote(s->peer, (char*) (p->data + 4)); connect_to_remote(s->peer, (char*)(p->data + 4));
s->peer = 0; s->peer = 0;
s->close(s); s->close(s);
return 1; return 1;
fail: fail:
/* we're going to close our peer as a side-effect, so /* we're going to close our peer as a side-effect, so
** return -1 to signal that state to the local socket ** return -1 to signal that state to the local socket
** who is enqueueing against us ** who is enqueueing against us
*/ */
s->close(s); s->close(s);
return -1; return -1;
} }
static void smart_socket_ready(asocket *s) static void smart_socket_ready(asocket* s) {
{
D("SS(%d): ready", s->id); D("SS(%d): ready", s->id);
} }
static void smart_socket_close(asocket *s) static void smart_socket_close(asocket* s) {
{
D("SS(%d): closed", s->id); D("SS(%d): closed", s->id);
if(s->pkt_first){ if (s->pkt_first) {
put_apacket(s->pkt_first); put_apacket(s->pkt_first);
} }
if(s->peer) { if (s->peer) {
s->peer->peer = 0; s->peer->peer = 0;
s->peer->close(s->peer); s->peer->close(s->peer);
s->peer = 0; s->peer = 0;
@ -865,10 +838,9 @@ static void smart_socket_close(asocket *s)
free(s); free(s);
} }
static asocket *create_smart_socket(void) static asocket* create_smart_socket(void) {
{
D("Creating smart socket"); D("Creating smart socket");
asocket *s = reinterpret_cast<asocket*>(calloc(1, sizeof(asocket))); asocket* s = reinterpret_cast<asocket*>(calloc(1, sizeof(asocket)));
if (s == NULL) fatal("cannot allocate socket"); if (s == NULL) fatal("cannot allocate socket");
s->enqueue = smart_socket_enqueue; s->enqueue = smart_socket_enqueue;
s->ready = smart_socket_ready; s->ready = smart_socket_ready;
@ -879,10 +851,9 @@ static asocket *create_smart_socket(void)
return s; return s;
} }
void connect_to_smartsocket(asocket *s) void connect_to_smartsocket(asocket* s) {
{
D("Connecting to smart socket"); D("Connecting to smart socket");
asocket *ss = create_smart_socket(); asocket* ss = create_smart_socket();
s->peer = ss; s->peer = ss;
ss->peer = s; ss->peer = s;
s->ready(s); s->ready(s);

107
adb/sysdeps/mutex.h Normal file
View file

@ -0,0 +1,107 @@
#pragma once
/*
* Copyright (C) 2016 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if defined(_WIN32)
#include <windows.h>
#include <android-base/macros.h>
#include "adb.h"
// The prebuilt version of mingw we use doesn't support mutex or recursive_mutex.
// Therefore, implement our own using the Windows primitives.
// Put them directly into the std namespace, so that when they're actually available, the build
// breaks until they're removed.
#include <mutex>
namespace std {
// CRITICAL_SECTION is recursive, so just wrap it in a Mutex-compatible class.
class recursive_mutex {
public:
recursive_mutex() {
InitializeCriticalSection(&mutex_);
}
~recursive_mutex() {
DeleteCriticalSection(&mutex_);
}
void lock() {
EnterCriticalSection(&mutex_);
}
bool try_lock() {
return TryEnterCriticalSection(&mutex_);
}
void unlock() {
LeaveCriticalSection(&mutex_);
}
private:
CRITICAL_SECTION mutex_;
DISALLOW_COPY_AND_ASSIGN(recursive_mutex);
};
class mutex {
public:
mutex() {
}
~mutex() {
}
void lock() {
mutex_.lock();
if (++lock_count_ != 1) {
fatal("non-recursive mutex locked reentrantly");
}
}
void unlock() {
if (--lock_count_ != 0) {
fatal("non-recursive mutex unlock resulted in unexpected lock count: %d", lock_count_);
}
mutex_.unlock();
}
bool try_lock() {
if (!mutex_.try_lock()) {
return false;
}
if (lock_count_ != 0) {
mutex_.unlock();
return false;
}
++lock_count_;
return true;
}
private:
recursive_mutex mutex_;
size_t lock_count_ = 0;
};
}
#endif

View file

@ -244,3 +244,60 @@ TEST_F(sysdeps_poll, fd_count) {
adb_close(fd); adb_close(fd);
} }
} }
#include "sysdeps/mutex.h"
TEST(sysdeps_mutex, mutex_smoke) {
static std::atomic<bool> finished(false);
static std::mutex &m = *new std::mutex();
m.lock();
ASSERT_FALSE(m.try_lock());
adb_thread_create([](void*) {
ASSERT_FALSE(m.try_lock());
m.lock();
finished.store(true);
adb_sleep_ms(200);
m.unlock();
}, nullptr);
ASSERT_FALSE(finished.load());
adb_sleep_ms(100);
ASSERT_FALSE(finished.load());
m.unlock();
adb_sleep_ms(100);
m.lock();
ASSERT_TRUE(finished.load());
m.unlock();
}
// Our implementation on Windows aborts on double lock.
#if defined(_WIN32)
TEST(sysdeps_mutex, mutex_reentrant_lock) {
std::mutex &m = *new std::mutex();
m.lock();
ASSERT_FALSE(m.try_lock());
EXPECT_DEATH(m.lock(), "non-recursive mutex locked reentrantly");
}
#endif
TEST(sysdeps_mutex, recursive_mutex_smoke) {
static std::recursive_mutex &m = *new std::recursive_mutex();
m.lock();
ASSERT_TRUE(m.try_lock());
m.unlock();
adb_thread_create([](void*) {
ASSERT_FALSE(m.try_lock());
m.lock();
adb_sleep_ms(500);
m.unlock();
}, nullptr);
adb_sleep_ms(100);
m.unlock();
adb_sleep_ms(100);
ASSERT_FALSE(m.try_lock());
m.lock();
m.unlock();
}