You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

485 lines
14 KiB

From 3d08f71f576a381955f07a91198f5dcb320026ba Mon Sep 17 00:00:00 2001
From: Alexander Scheel <ascheel@redhat.com>
Date: Wed, 2 Aug 2017 15:11:49 -0400
Subject: [PATCH] [client] Switch to non-blocking sockets
Switch the gssproxy client library to non-blocking sockets, allowing
for timeout and retry operations. The client will automatically retry
both send() and recv() operations three times on ETIMEDOUT. If the
combined send() and recv() hit the three time limit, ETIMEDOUT will be
exposed to the caller in the minor status.
Signed-off-by: Alexander Scheel <ascheel@redhat.com>
Reviewed-by: Simo Sorce <simo@redhat.com>
[rharwood@redhat.com: commit message cleanups, rebased]
Reviewed-by: Robbie Harwood <rharwood@redhat.com>
(cherry picked from commit d035646c8feb0b78f0c157580ca02c46cd00dd7e)
---
proxy/src/client/gpm_common.c | 317 +++++++++++++++++++++++++++++++++++++++---
1 file changed, 295 insertions(+), 22 deletions(-)
diff --git a/proxy/src/client/gpm_common.c b/proxy/src/client/gpm_common.c
index 2133618..dba23a6 100644
--- a/proxy/src/client/gpm_common.c
+++ b/proxy/src/client/gpm_common.c
@@ -7,9 +7,15 @@
#include <stdlib.h>
#include <time.h>
#include <pthread.h>
+#include <sys/epoll.h>
+#include <fcntl.h>
+#include <sys/timerfd.h>
#define FRAGMENT_BIT (1 << 31)
+#define RESPONSE_TIMEOUT 15
+#define MAX_TIMEOUT_RETRY 3
+
struct gpm_ctx {
pthread_mutex_t lock;
int fd;
@@ -20,6 +26,9 @@ struct gpm_ctx {
gid_t gid;
int next_xid;
+
+ int epollfd;
+ int timerfd;
};
/* a single global struct is not particularly efficient,
@@ -39,6 +48,8 @@ static void gpm_init_once(void)
pthread_mutex_init(&gpm_global_ctx.lock, &attr);
gpm_global_ctx.fd = -1;
+ gpm_global_ctx.epollfd = -1;
+ gpm_global_ctx.timerfd = -1;
seedp = time(NULL) + getpid() + pthread_self();
gpm_global_ctx.next_xid = rand_r(&seedp);
@@ -69,6 +80,7 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
struct sockaddr_un addr = {0};
char name[PATH_MAX];
int ret;
+ unsigned flags;
int fd = -1;
ret = get_pipe_name(name);
@@ -86,6 +98,18 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
goto done;
}
+ ret = fcntl(fd, F_GETFD, &flags);
+ if (ret != 0) {
+ ret = errno;
+ goto done;
+ }
+
+ ret = fcntl(fd, F_SETFD, flags | O_NONBLOCK);
+ if (ret != 0) {
+ ret = errno;
+ goto done;
+ }
+
ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
if (ret == -1) {
ret = errno;
@@ -163,6 +187,158 @@ static int gpm_release_sock(struct gpm_ctx *gpmctx)
return pthread_mutex_unlock(&gpmctx->lock);
}
+static void gpm_timer_close(struct gpm_ctx *gpmctx) {
+ if (gpmctx->timerfd < 0) {
+ return;
+ }
+
+ close(gpmctx->timerfd);
+ gpmctx->timerfd = -1;
+}
+
+static int gpm_timer_setup(struct gpm_ctx *gpmctx, int timeout_seconds) {
+ int ret;
+ struct itimerspec its;
+
+ if (gpmctx->timerfd >= 0) {
+ gpm_timer_close(gpmctx);
+ }
+
+ gpmctx->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
+ if (gpmctx->timerfd < 0) {
+ return errno;
+ }
+
+ its.it_interval.tv_sec = timeout_seconds;
+ its.it_interval.tv_nsec = 0;
+ its.it_value.tv_sec = timeout_seconds;
+ its.it_value.tv_nsec = 0;
+
+ ret = timerfd_settime(gpmctx->timerfd, 0, &its, NULL);
+ if (ret) {
+ ret = errno;
+ gpm_timer_close(gpmctx);
+ return ret;
+ }
+
+ return 0;
+}
+
+static void gpm_epoll_close(struct gpm_ctx *gpmctx) {
+ if (gpmctx->epollfd < 0) {
+ return;
+ }
+
+ close(gpmctx->epollfd);
+ gpmctx->epollfd = -1;
+}
+
+static int gpm_epoll_setup(struct gpm_ctx *gpmctx) {
+ struct epoll_event ev;
+ int ret;
+
+ if (gpmctx->epollfd >= 0) {
+ gpm_epoll_close(gpmctx);
+ }
+
+ gpmctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
+ if (gpmctx->epollfd == -1) {
+ return errno;
+ }
+
+ /* Add timer */
+ ev.events = EPOLLIN;
+ ev.data.fd = gpmctx->timerfd;
+ ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->timerfd, &ev);
+ if (ret == -1) {
+ ret = errno;
+ gpm_epoll_close(gpmctx);
+ return ret;
+ }
+
+ return ret;
+}
+
+static int gpm_epoll_wait(struct gpm_ctx *gpmctx, uint32_t event_flags) {
+ int ret;
+ int epoll_ret;
+ struct epoll_event ev;
+ struct epoll_event events[2];
+ uint64_t timer_read;
+
+ if (gpmctx->epollfd < 0) {
+ ret = gpm_epoll_setup(gpmctx);
+ if (ret)
+ return ret;
+ }
+
+ ev.events = event_flags;
+ ev.data.fd = gpmctx->fd;
+ epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->fd, &ev);
+ if (epoll_ret == -1) {
+ ret = errno;
+ gpm_epoll_close(gpmctx);
+ return ret;
+ }
+
+ do {
+ epoll_ret = epoll_wait(gpmctx->epollfd, events, 2, -1);
+ } while (epoll_ret < 0 && errno == EINTR);
+
+ if (epoll_ret < 0) {
+ /* Error while waiting that isn't EINTR */
+ ret = errno;
+ gpm_epoll_close(gpmctx);
+ } else if (epoll_ret == 0) {
+ /* Shouldn't happen as timeout == -1; treat it like a timeout
+ * occurred. */
+ ret = ETIMEDOUT;
+ gpm_epoll_close(gpmctx);
+ } else if (epoll_ret == 1 && events[0].data.fd == gpmctx->timerfd) {
+ /* Got an event which is only our timer */
+ ret = read(gpmctx->timerfd, &timer_read, sizeof(uint64_t));
+ if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ /* In the case when reading from the timer failed, don't hide the
+ * timer error behind ETIMEDOUT such that it isn't retried */
+ ret = errno;
+ } else {
+ /* If ret == 0, then we definitely timed out. Else, if ret == -1
+ * and errno == EAGAIN or errno == EWOULDBLOCK, we're in a weird
+ * edge case where epoll thinks the timer can be read, but it
+ * is blocking more; treat it like a TIMEOUT and retry, as
+ * nothing around us would handle EAGAIN from timer and retry
+ * it. */
+ ret = ETIMEDOUT;
+ }
+ gpm_epoll_close(gpmctx);
+ } else {
+ /* If ret == 2, then we ignore the timerfd; that way if the next
+ * operation cannot be performed immediately, we timeout and retry.
+ * If ret == 1 and data.fd == gpmctx->fd, return 0. */
+ ret = 0;
+ }
+
+ epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_DEL, gpmctx->fd, NULL);
+ if (epoll_ret == -1) {
+ /* If we previously had an error, expose that error instead of
+ * clobbering it with errno; else if no error, then assume it is
+ * better to notify of the error deleting the event than it is
+ * to continue. */
+ if (ret == 0)
+ ret = errno;
+ gpm_epoll_close(gpmctx);
+ }
+
+ return ret;
+}
+
+static int gpm_retry_socket(struct gpm_ctx *gpmctx)
+{
+ gpm_epoll_close(gpmctx);
+ gpm_close_socket(gpmctx);
+ return gpm_open_socket(gpmctx);
+}
+
/* must be called after the lock has been grabbed */
static int gpm_send_buffer(struct gpm_ctx *gpmctx,
char *buffer, uint32_t length)
@@ -183,8 +359,13 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
retry = false;
do {
do {
+ ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
+ if (ret != 0) {
+ goto done;
+ }
+
ret = 0;
- wn = send(gpmctx->fd, &size, sizeof(uint32_t), MSG_NOSIGNAL);
+ wn = write(gpmctx->fd, &size, sizeof(uint32_t));
if (wn == -1) {
ret = errno;
}
@@ -192,8 +373,7 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
if (wn != 4) {
/* reopen and retry once */
if (retry == false) {
- gpm_close_socket(gpmctx);
- ret = gpm_open_socket(gpmctx);
+ ret = gpm_retry_socket(gpmctx);
if (ret == 0) {
retry = true;
continue;
@@ -208,9 +388,14 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
pos = 0;
while (length > pos) {
- wn = send(gpmctx->fd, buffer + pos, length - pos, MSG_NOSIGNAL);
+ ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
+ if (ret) {
+ goto done;
+ }
+
+ wn = write(gpmctx->fd, buffer + pos, length - pos);
if (wn == -1) {
- if (errno == EINTR) {
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}
ret = errno;
@@ -231,7 +416,7 @@ done:
/* must be called after the lock has been grabbed */
static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
- char *buffer, uint32_t *length)
+ char **buffer, uint32_t *length)
{
uint32_t size;
ssize_t rn;
@@ -239,6 +424,11 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
int ret;
do {
+ ret = gpm_epoll_wait(gpmctx, EPOLLIN);
+ if (ret) {
+ goto done;
+ }
+
ret = 0;
rn = read(gpmctx->fd, &size, sizeof(uint32_t));
if (rn == -1) {
@@ -258,11 +448,22 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
goto done;
}
+ *buffer = malloc(*length);
+ if (*buffer == NULL) {
+ ret = ENOMEM;
+ goto done;
+ }
+
pos = 0;
while (*length > pos) {
- rn = read(gpmctx->fd, buffer + pos, *length - pos);
+ ret = gpm_epoll_wait(gpmctx, EPOLLIN);
+ if (ret) {
+ goto done;
+ }
+
+ rn = read(gpmctx->fd, *buffer + pos, *length - pos);
if (rn == -1) {
- if (errno == EINTR) {
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}
ret = errno;
@@ -281,6 +482,7 @@ done:
if (ret) {
/* on errors we can only close the fd and return */
gpm_close_socket(gpmctx);
+ gpm_epoll_close(gpmctx);
}
return ret;
}
@@ -309,6 +511,63 @@ static struct gpm_ctx *gpm_get_ctx(void)
return &gpm_global_ctx;
}
+static int gpm_send_recv_loop(struct gpm_ctx *gpmctx, char *send_buffer,
+ uint32_t send_length, char** recv_buffer,
+ uint32_t *recv_length)
+{
+ int ret;
+ int retry_count;
+
+ /* setup timer */
+ ret = gpm_timer_setup(gpmctx, RESPONSE_TIMEOUT);
+ if (ret)
+ return ret;
+
+ for (retry_count = 0; retry_count < MAX_TIMEOUT_RETRY; retry_count++) {
+ /* send to proxy */
+ ret = gpm_send_buffer(gpmctx, send_buffer, send_length);
+
+ if (ret == 0) {
+ /* No error, continue to recv */
+ } else if (ret == ETIMEDOUT) {
+ /* Close and reopen socket before trying again */
+ ret = gpm_retry_socket(gpmctx);
+ if (ret != 0)
+ return ret;
+ ret = ETIMEDOUT;
+
+ /* RETRY entire send */
+ continue;
+ } else {
+ /* Other error */
+ return ret;
+ }
+
+ /* receive answer */
+ ret = gpm_recv_buffer(gpmctx, recv_buffer, recv_length);
+ if (ret == 0) {
+ /* No error */
+ break;
+ } else if (ret == ETIMEDOUT) {
+ /* Close and reopen socket before trying again */
+ ret = gpm_retry_socket(gpmctx);
+
+ /* Free buffer and set it to NULL to prevent free(xdr_reply_ctx) */
+ free(recv_buffer);
+ recv_buffer = NULL;
+
+ if (ret != 0)
+ return ret;
+ ret = ETIMEDOUT;
+ } else {
+ /* Other error */
+ return ret;
+ }
+ }
+
+ return ret;
+}
+
OM_uint32 gpm_release_buffer(OM_uint32 *minor_status,
gss_buffer_t buffer)
{
@@ -399,15 +658,20 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
gp_rpc_msg msg;
XDR xdr_call_ctx;
XDR xdr_reply_ctx;
- char buffer[MAX_RPC_SIZE];
- uint32_t length;
+ char *send_buffer = NULL;
+ char *recv_buffer = NULL;
+ uint32_t send_length;
+ uint32_t recv_length;
uint32_t xid;
bool xdrok;
bool sockgrab = false;
int ret;
- xdrmem_create(&xdr_call_ctx, buffer, MAX_RPC_SIZE, XDR_ENCODE);
- xdrmem_create(&xdr_reply_ctx, buffer, MAX_RPC_SIZE, XDR_DECODE);
+ send_buffer = malloc(MAX_RPC_SIZE);
+ if (send_buffer == NULL)
+ return ENOMEM;
+
+ xdrmem_create(&xdr_call_ctx, send_buffer, MAX_RPC_SIZE, XDR_ENCODE);
memset(&msg, 0, sizeof(gp_rpc_msg));
msg.header.type = GP_RPC_CALL;
@@ -450,22 +714,22 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
goto done;
}
- /* send to proxy */
- ret = gpm_send_buffer(gpmctx, buffer, xdr_getpos(&xdr_call_ctx));
- if (ret) {
- goto done;
- }
+ /* set send_length */
+ send_length = xdr_getpos(&xdr_call_ctx);
- /* receive answer */
- ret = gpm_recv_buffer(gpmctx, buffer, &length);
- if (ret) {
+ /* Send request, receive response with timeout */
+ ret = gpm_send_recv_loop(gpmctx, send_buffer, send_length, &recv_buffer,
+ &recv_length);
+ if (ret)
goto done;
- }
/* release the lock */
gpm_release_sock(gpmctx);
sockgrab = false;
+ /* Create the reply context */
+ xdrmem_create(&xdr_reply_ctx, recv_buffer, recv_length, XDR_DECODE);
+
/* decode header */
memset(&msg, 0, sizeof(gp_rpc_msg));
xdrok = xdr_gp_rpc_msg(&xdr_reply_ctx, &msg);
@@ -489,12 +753,21 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
}
done:
+ gpm_timer_close(gpmctx);
+ gpm_epoll_close(gpmctx);
+
if (sockgrab) {
gpm_release_sock(gpmctx);
}
xdr_free((xdrproc_t)xdr_gp_rpc_msg, (char *)&msg);
xdr_destroy(&xdr_call_ctx);
- xdr_destroy(&xdr_reply_ctx);
+
+ if (recv_buffer != NULL)
+ xdr_destroy(&xdr_reply_ctx);
+
+ free(send_buffer);
+ free(recv_buffer);
+
return ret;
}