Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make control socket more robust and support larger messages #67

Merged
merged 8 commits into from
Aug 29, 2024
221 changes: 151 additions & 70 deletions communications.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ listen_for_commands(void)
struct sockaddr_un sun;

if ((rp_glob_screen.control_socket_fd = socket(AF_UNIX,
SOCK_STREAM | SOCK_NONBLOCK, 0)) == -1)
SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0)) == -1)
err(1, "socket");

if (strlen(rp_glob_screen.control_socket_path) >= sizeof(sun.sun_path))
Expand Down Expand Up @@ -82,17 +82,115 @@ listen_for_commands(void)
rp_glob_screen.control_socket_path));
}

static ssize_t
recv_unix(int fd, char **callerbuf)
{
int firstloop;
char *message;
ssize_t len, count;

int flags = 0x0;
int retries = 0;

#ifdef SENDCMD_DEBUG
pid_t pid = getpid();
char *dpfx = xsprintf("recv_unix_%d", pid);
#endif
WARNX_DEBUG("%s: enter\n", dpfx);

message = xmalloc(BUFSZ);
memset(message, 0, BUFSZ);

len = 0;
firstloop = 1;

while ((count = recv(fd, message + len, BUFSZ, flags))) {
if (count == -1) {
switch (errno) {
/*
* message is complete
*/
case ECONNRESET: /* sender finished and closed */
case EAGAIN: /* no more left to read */
WARNX_DEBUG("%s: done: e%d\n", dpfx, errno);
break;
/*
* transient conditions for retrying
*/
case ECONNREFUSED:
case ENOMEM:
if (retries++ >= 5) {
warn("recv_unix: retries exhausted");
len = -1;
break;
}
usleep(200);
/* fallthrough */
case EINTR:
warn("recv_unix: trying again");
continue;
/*
* usage error or untenable situation:
* EBADF, EFAULT, EINVAL, ENOTCONN, ENOTSOCK
*/
default:
warn("unanticipated receive error");
len = -1;
}
break;
}
if (firstloop) {
WARNX_DEBUG("%s: first recv: %zd\n", dpfx, count);
/*
* after blocking for the first buffer, we can
* keep reading until it blocks again, which
* should exhaust the message.
*/
flags += MSG_DONTWAIT;
}
len += count;
message = xrealloc(message, len + BUFSZ);
memset(message + len, 0, BUFSZ);
WARNX_DEBUG("%s: looping after count %zd\n", dpfx, count);
firstloop = 0;
}
#ifdef SENDCMD_DEBUG
free(dpfx);
#endif
*callerbuf = message;
return len;
smemsh marked this conversation as resolved.
Show resolved Hide resolved
}

static ssize_t
send_unix(int fd, char *buf, ssize_t sz)
{
ssize_t ret, off = 0;

WARNX_DEBUG("entered send_unix with sz %zd\n", sz);

while (sz > 0) {
if (((ret = write(fd, buf + off, sz)) != sz) && ret == -1) {
if (errno == EINTR)
continue;
warn("send_unix: bad write");
break;
}
sz -= ret;
off += ret;
}

WARNX_DEBUG("leaving send_unix, off %zd, errno %d\n", off, errno);
return off;
}

int
send_command(int interactive, unsigned char *cmd)
send_command(int interactive, char *cmd)
{
struct sockaddr_un sun;
char *wcmd, *bufstart;
char ret[BUFSZ+1];
char *wcmd, *response;
char success = 0;
size_t len;
ssize_t count;
int fd, firstloop;
int flags = 0x0;
ssize_t len;
int fd;
FILE *outf = NULL;

#ifdef SENDCMD_DEBUG
Expand All @@ -101,10 +199,10 @@ send_command(int interactive, unsigned char *cmd)
#endif
WARNX_DEBUG("%s: enter\n", dpfx);

len = 1 + strlen((char *)cmd) + 2;
wcmd = malloc(len);
if (snprintf(wcmd, len, "%c%s\n", interactive, cmd) != (len - 1))
errx(1, "snprintf");
len = 1 + strlen(cmd) + 1;
wcmd = xmalloc(len);
*wcmd = (unsigned char)interactive;
strncpy(wcmd + 1, cmd, len - 1);

if ((fd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
err(1, "socket");
Expand All @@ -122,43 +220,30 @@ send_command(int interactive, unsigned char *cmd)
err(1, "failed to connect to control socket at %s",
rp_glob_screen.control_socket_path);

if (write(fd, wcmd, len) != len)
err(1, "short write to control socket");
if (send_unix(fd, wcmd, len) != len)
err(1, "%s: aborting after bad write", __func__);

free(wcmd);

firstloop = 1;
while ((count = recv(fd, &ret, BUFSZ, flags))) {
bufstart = ret;
if (firstloop) {
WARNX_DEBUG("%s: first recv: %zu\n", dpfx, count);
/* first byte is exit status */
success = *ret;
outf = success ? stdout : stderr;
bufstart++;
if (count == 2 && *bufstart == '\n')
/* commands that had no output */
return success;
/*
* after blocking for the first buffer, we can keep
* reading until it blocks again, which should exhaust
* the response. don't want to block when the message
* is finished: sometimes connection is closed, other
* times it blocks, not sure why? both end a response
*/
flags += MSG_DONTWAIT;
}
if (count == -1) {
WARNX_DEBUG("%s: finish errno: %d\n", dpfx, errno);
if (errno == EAGAIN || errno == ECONNRESET)
return success;
if ((len = recv_unix(fd, &response)) == -1)
warnx("send_message: aborted reply from receiver");

/* first byte is exit status */
success = *response;
outf = success ? stdout : stderr;

if (len > 1) {
/* command had some output */
if (response[len] != '\0') {
/* should not be possible, TODO remove */
warnx("%s\n", "last byte of response not null");
response[len] = '\0';
}
ret[count] = '\0';
fprintf(outf, "%s", bufstart);
fprintf(outf, "%s", response + 1);
fflush(outf);
firstloop = 0;
WARNX_DEBUG("%s: looping\n", dpfx);
}
free(response);

WARNX_DEBUG("%s: no more bytes\n", dpfx);
#ifdef SENDCMD_DEBUG
free(dpfx);
Expand All @@ -171,8 +256,7 @@ void
receive_command(void)
{
cmdret *cmd_ret;
char cmd[BUFSZ] = { 0 }, c;
char *result, *rcmd;
char *result, *rcmd, *cmd;
int cl, len = 0, interactive = 0;

PRINT_DEBUG(("have connection waiting on command socket\n"));
Expand All @@ -181,27 +265,23 @@ receive_command(void)
warn("accept");
return;
}

while (len <= sizeof(cmd)) {
if (len == sizeof(cmd)) {
warn("%s: bogus command length", __func__);
close(cl);
return;
}

if (read(cl, &c, 1) == 1) {
if (c == '\n') {
cmd[len++] = '\0';
break;
}
cmd[len++] = c;
} else if (errno != EAGAIN) {
PRINT_DEBUG(("bad read result on control socket: %s\n",
strerror(errno)));
break;
}
if ((fcntl(cl, F_SETFD, FD_CLOEXEC)) == -1) {
warn("cloexec");
close(cl);
return;
}

if ((len = recv_unix(cl, &cmd)) <= 1) {
warnx("receive_command: %s\n",
(len == -1 ? "encountered error during receive"
: "received command was malformed"));
goto done;
}
if (cmd[len] != '\0') {
/* should not be possible, TODO remove */
warnx("%s\n", "last byte of sent command not null");
cmd[len] = '\0';
}
interactive = cmd[0];
rcmd = cmd + 1;

Expand All @@ -212,22 +292,23 @@ receive_command(void)
/* notify the client of any text that was returned by the command */
len = 2;
if (cmd_ret->output) {
result = xsprintf("%c%s\n", cmd_ret->success ? 1 : 0,
result = xsprintf("%c%s", cmd_ret->success ? 1 : 0,
cmd_ret->output);
len = 1 + strlen(cmd_ret->output) + 1;
} else if (cmd_ret->success)
result = xsprintf("%c\n", 1);
result = xsprintf("%c", 1);
else
result = xsprintf("%c\n", 0);
result = xsprintf("%c", 0);

cmdret_free(cmd_ret);

PRINT_DEBUG(("writing back %d to command client: %s", len, result + 1));

if (write(cl, result, len) != len)
warn("%s: short write", __func__);
if (send_unix(cl, result, len) != len)
warnx("%s: proceeding after bad write", __func__);

PRINT_DEBUG(("receive_command: write finished, closing\n"));

done:
free(cmd);
close(cl);
}
2 changes: 1 addition & 1 deletion communications.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

void init_control_socket_path(void);
void listen_for_commands(void);
int send_command(int interactive, unsigned char *cmd);
int send_command(int interactive, char *cmd);
void receive_command(void);

#endif /* ! _SDORFEHS_COMMUNICATIONS_H */
2 changes: 1 addition & 1 deletion sdorfehs.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ main(int argc, char *argv[])
int j, exit_status = 0;

for (j = 0; j < cmd_count; j++) {
if (!send_command(interactive, (unsigned char *)cmd[j]))
if (!send_command(interactive, cmd[j]))
exit_status = 1;
free(cmd[j]);
}
Expand Down
Loading