You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
486 lines
14 KiB
486 lines
14 KiB
7 years ago
|
From 3d08f71f576a381955f07a91198f5dcb320026ba Mon Sep 17 00:00:00 2001
|
||
|
From: Alexander Scheel <ascheel@redhat.com>
|
||
|
Date: Wed, 2 Aug 2017 15:11:49 -0400
|
||
|
Subject: [PATCH] [client] Switch to non-blocking sockets
|
||
|
|
||
|
Switch the gssproxy client library to non-blocking sockets, allowing
|
||
|
for timeout and retry operations. The client will automatically retry
|
||
|
both send() and recv() operations three times on ETIMEDOUT. If the
|
||
|
combined send() and recv() hit the three time limit, ETIMEDOUT will be
|
||
|
exposed to the caller in the minor status.
|
||
|
|
||
|
Signed-off-by: Alexander Scheel <ascheel@redhat.com>
|
||
|
Reviewed-by: Simo Sorce <simo@redhat.com>
|
||
|
[rharwood@redhat.com: commit message cleanups, rebased]
|
||
|
Reviewed-by: Robbie Harwood <rharwood@redhat.com>
|
||
|
(cherry picked from commit d035646c8feb0b78f0c157580ca02c46cd00dd7e)
|
||
|
---
|
||
|
proxy/src/client/gpm_common.c | 317 +++++++++++++++++++++++++++++++++++++++---
|
||
|
1 file changed, 295 insertions(+), 22 deletions(-)
|
||
|
|
||
|
diff --git a/proxy/src/client/gpm_common.c b/proxy/src/client/gpm_common.c
|
||
|
index 2133618..dba23a6 100644
|
||
|
--- a/proxy/src/client/gpm_common.c
|
||
|
+++ b/proxy/src/client/gpm_common.c
|
||
|
@@ -7,9 +7,15 @@
|
||
|
#include <stdlib.h>
|
||
|
#include <time.h>
|
||
|
#include <pthread.h>
|
||
|
+#include <sys/epoll.h>
|
||
|
+#include <fcntl.h>
|
||
|
+#include <sys/timerfd.h>
|
||
|
|
||
|
#define FRAGMENT_BIT (1 << 31)
|
||
|
|
||
|
+#define RESPONSE_TIMEOUT 15
|
||
|
+#define MAX_TIMEOUT_RETRY 3
|
||
|
+
|
||
|
struct gpm_ctx {
|
||
|
pthread_mutex_t lock;
|
||
|
int fd;
|
||
|
@@ -20,6 +26,9 @@ struct gpm_ctx {
|
||
|
gid_t gid;
|
||
|
|
||
|
int next_xid;
|
||
|
+
|
||
|
+ int epollfd;
|
||
|
+ int timerfd;
|
||
|
};
|
||
|
|
||
|
/* a single global struct is not particularly efficient,
|
||
|
@@ -39,6 +48,8 @@ static void gpm_init_once(void)
|
||
|
pthread_mutex_init(&gpm_global_ctx.lock, &attr);
|
||
|
|
||
|
gpm_global_ctx.fd = -1;
|
||
|
+ gpm_global_ctx.epollfd = -1;
|
||
|
+ gpm_global_ctx.timerfd = -1;
|
||
|
|
||
|
seedp = time(NULL) + getpid() + pthread_self();
|
||
|
gpm_global_ctx.next_xid = rand_r(&seedp);
|
||
|
@@ -69,6 +80,7 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
|
||
|
struct sockaddr_un addr = {0};
|
||
|
char name[PATH_MAX];
|
||
|
int ret;
|
||
|
+ unsigned flags;
|
||
|
int fd = -1;
|
||
|
|
||
|
ret = get_pipe_name(name);
|
||
|
@@ -86,6 +98,18 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
|
||
|
goto done;
|
||
|
}
|
||
|
|
||
|
+ ret = fcntl(fd, F_GETFD, &flags);
|
||
|
+ if (ret != 0) {
|
||
|
+ ret = errno;
|
||
|
+ goto done;
|
||
|
+ }
|
||
|
+
|
||
|
+ ret = fcntl(fd, F_SETFD, flags | O_NONBLOCK);
|
||
|
+ if (ret != 0) {
|
||
|
+ ret = errno;
|
||
|
+ goto done;
|
||
|
+ }
|
||
|
+
|
||
|
ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
|
||
|
if (ret == -1) {
|
||
|
ret = errno;
|
||
|
@@ -163,6 +187,158 @@ static int gpm_release_sock(struct gpm_ctx *gpmctx)
|
||
|
return pthread_mutex_unlock(&gpmctx->lock);
|
||
|
}
|
||
|
|
||
|
+/*
+ * Close the context's timer descriptor if one is open and mark it unused
+ * by resetting it to -1. Does nothing when the descriptor is already
+ * negative.
+ */
+static void gpm_timer_close(struct gpm_ctx *gpmctx)
+{
+    if (gpmctx->timerfd >= 0) {
+        close(gpmctx->timerfd);
+        gpmctx->timerfd = -1;
+    }
+}
|
||
|
+
|
||
|
+/*
+ * (Re)create the context's timer as a non-blocking, close-on-exec
+ * CLOCK_MONOTONIC timerfd whose first expiry and repeat interval are both
+ * timeout_seconds.
+ *
+ * Returns 0 on success, otherwise the errno reported by timerfd_create()
+ * or timerfd_settime(). On failure gpmctx->timerfd is left negative.
+ */
+static int gpm_timer_setup(struct gpm_ctx *gpmctx, int timeout_seconds)
+{
+    struct itimerspec spec = {
+        .it_interval = { .tv_sec = timeout_seconds, .tv_nsec = 0 },
+        .it_value = { .tv_sec = timeout_seconds, .tv_nsec = 0 },
+    };
+    int saved_errno;
+
+    /* Drop a timer left over from an earlier call; no-op when none is open */
+    gpm_timer_close(gpmctx);
+
+    gpmctx->timerfd = timerfd_create(CLOCK_MONOTONIC,
+                                     TFD_NONBLOCK | TFD_CLOEXEC);
+    if (gpmctx->timerfd < 0) {
+        return errno;
+    }
+
+    if (timerfd_settime(gpmctx->timerfd, 0, &spec, NULL) != 0) {
+        /* Save errno first: closing the descriptor may overwrite it */
+        saved_errno = errno;
+        gpm_timer_close(gpmctx);
+        return saved_errno;
+    }
+
+    return 0;
+}
|
||
|
+
|
||
|
+/*
+ * Close the context's epoll descriptor if one is open and mark it unused
+ * by resetting it to -1. Does nothing when the descriptor is already
+ * negative.
+ */
+static void gpm_epoll_close(struct gpm_ctx *gpmctx)
+{
+    if (gpmctx->epollfd >= 0) {
+        close(gpmctx->epollfd);
+        gpmctx->epollfd = -1;
+    }
+}
|
||
|
+
|
||
|
+/*
+ * (Re)create the context's epoll instance (close-on-exec) and register the
+ * context's timer descriptor on it for EPOLLIN.
+ *
+ * Returns 0 on success, otherwise the errno reported by epoll_create1() or
+ * epoll_ctl(). On failure gpmctx->epollfd is left negative.
+ */
+static int gpm_epoll_setup(struct gpm_ctx *gpmctx)
+{
+    struct epoll_event timer_ev;
+    int rc;
+
+    /* Drop an instance left over from an earlier call; no-op when none */
+    gpm_epoll_close(gpmctx);
+
+    gpmctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
+    if (gpmctx->epollfd == -1) {
+        return errno;
+    }
+
+    /* Add timer */
+    timer_ev.events = EPOLLIN;
+    timer_ev.data.fd = gpmctx->timerfd;
+    rc = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->timerfd,
+                   &timer_ev);
+    if (rc != -1) {
+        return rc;
+    }
+
+    /* Save errno first: closing the descriptor may overwrite it */
+    rc = errno;
+    gpm_epoll_close(gpmctx);
+    return rc;
+}
|
||
|
+
|
||
|
+/*
+ * Block until the context's socket reports one of event_flags or the
+ * context's timer fires.
+ *
+ * The epoll instance is created on demand. The socket is registered for
+ * the duration of this call only and removed again before returning.
+ *
+ * Returns 0 when the socket is ready, ETIMEDOUT when only the timer fired
+ * (or epoll_wait() unexpectedly reported no events), otherwise an errno
+ * value. On every non-zero return the epoll instance has been closed.
+ */
+static int gpm_epoll_wait(struct gpm_ctx *gpmctx, uint32_t event_flags)
+{
+    struct epoll_event sock_ev;
+    struct epoll_event ready[2];
+    uint64_t expirations;
+    ssize_t rn;
+    int nready;
+    int ret;
+
+    if (gpmctx->epollfd < 0) {
+        ret = gpm_epoll_setup(gpmctx);
+        if (ret)
+            return ret;
+    }
+
+    sock_ev.events = event_flags;
+    sock_ev.data.fd = gpmctx->fd;
+    if (epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->fd,
+                  &sock_ev) == -1) {
+        ret = errno;
+        gpm_epoll_close(gpmctx);
+        return ret;
+    }
+
+    /* Wait without an epoll timeout; the timerfd provides the deadline.
+     * Restart the wait whenever a signal interrupts it. */
+    for (;;) {
+        nready = epoll_wait(gpmctx->epollfd, ready, 2, -1);
+        if (nready >= 0 || errno != EINTR)
+            break;
+    }
+
+    if (nready < 0) {
+        /* Error while waiting that isn't EINTR */
+        ret = errno;
+    } else if (nready == 0) {
+        /* Shouldn't happen as timeout == -1; treat it like a timeout
+         * occurred. */
+        ret = ETIMEDOUT;
+    } else if (nready == 1 && ready[0].data.fd == gpmctx->timerfd) {
+        /* The only event is our timer */
+        rn = read(gpmctx->timerfd, &expirations, sizeof(uint64_t));
+        if (rn == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+            /* A genuine failure reading the timer is reported as itself
+             * rather than as ETIMEDOUT, so that it isn't retried */
+            ret = errno;
+        } else {
+            /* Either the read succeeded (we definitely timed out) or epoll
+             * reported the timer readable but the read would block; treat
+             * both as a timeout, since nothing around us would retry an
+             * EAGAIN from the timer. */
+            ret = ETIMEDOUT;
+        }
+    } else {
+        /* Either the socket alone is ready, or both socket and timer are.
+         * In the latter case the timer is deliberately left unread: if the
+         * next operation cannot be performed immediately, we time out and
+         * retry. */
+        ret = 0;
+    }
+
+    /* Every failure above invalidates the epoll instance */
+    if (ret != 0) {
+        gpm_epoll_close(gpmctx);
+    }
+
+    /* Unregister the socket. After a failure above the instance is already
+     * closed, so this call fails as well; the earlier error is kept. */
+    if (epoll_ctl(gpmctx->epollfd, EPOLL_CTL_DEL, gpmctx->fd, NULL) == -1) {
+        /* Expose a previous error instead of clobbering it with errno; if
+         * there was none, report the failed removal rather than continue. */
+        if (ret == 0)
+            ret = errno;
+        gpm_epoll_close(gpmctx);
+    }
+
+    return ret;
+}
|
||
|
+
|
||
|
+/*
+ * Discard the current epoll instance and socket, then open a fresh socket.
+ * Returns whatever gpm_open_socket() returns.
+ */
+static int gpm_retry_socket(struct gpm_ctx *gpmctx)
+{
+    int ret;
+
+    /* The old socket is registered nowhere once the epoll instance is gone */
+    gpm_epoll_close(gpmctx);
+    gpm_close_socket(gpmctx);
+
+    ret = gpm_open_socket(gpmctx);
+    return ret;
+}
|
||
|
+
|
||
|
/* must be called after the lock has been grabbed */
|
||
|
static int gpm_send_buffer(struct gpm_ctx *gpmctx,
|
||
|
char *buffer, uint32_t length)
|
||
|
@@ -183,8 +359,13 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
|
||
|
retry = false;
|
||
|
do {
|
||
|
do {
|
||
|
+ ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
|
||
|
+ if (ret != 0) {
|
||
|
+ goto done;
|
||
|
+ }
|
||
|
+
|
||
|
ret = 0;
|
||
|
- wn = send(gpmctx->fd, &size, sizeof(uint32_t), MSG_NOSIGNAL);
|
||
|
+ wn = write(gpmctx->fd, &size, sizeof(uint32_t));
|
||
|
if (wn == -1) {
|
||
|
ret = errno;
|
||
|
}
|
||
|
@@ -192,8 +373,7 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
|
||
|
if (wn != 4) {
|
||
|
/* reopen and retry once */
|
||
|
if (retry == false) {
|
||
|
- gpm_close_socket(gpmctx);
|
||
|
- ret = gpm_open_socket(gpmctx);
|
||
|
+ ret = gpm_retry_socket(gpmctx);
|
||
|
if (ret == 0) {
|
||
|
retry = true;
|
||
|
continue;
|
||
|
@@ -208,9 +388,14 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
|
||
|
|
||
|
pos = 0;
|
||
|
while (length > pos) {
|
||
|
- wn = send(gpmctx->fd, buffer + pos, length - pos, MSG_NOSIGNAL);
|
||
|
+ ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
|
||
|
+ if (ret) {
|
||
|
+ goto done;
|
||
|
+ }
|
||
|
+
|
||
|
+ wn = write(gpmctx->fd, buffer + pos, length - pos);
|
||
|
if (wn == -1) {
|
||
|
- if (errno == EINTR) {
|
||
|
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
|
||
|
continue;
|
||
|
}
|
||
|
ret = errno;
|
||
|
@@ -231,7 +416,7 @@ done:
|
||
|
|
||
|
/* must be called after the lock has been grabbed */
|
||
|
static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
|
||
|
- char *buffer, uint32_t *length)
|
||
|
+ char **buffer, uint32_t *length)
|
||
|
{
|
||
|
uint32_t size;
|
||
|
ssize_t rn;
|
||
|
@@ -239,6 +424,11 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
|
||
|
int ret;
|
||
|
|
||
|
do {
|
||
|
+ ret = gpm_epoll_wait(gpmctx, EPOLLIN);
|
||
|
+ if (ret) {
|
||
|
+ goto done;
|
||
|
+ }
|
||
|
+
|
||
|
ret = 0;
|
||
|
rn = read(gpmctx->fd, &size, sizeof(uint32_t));
|
||
|
if (rn == -1) {
|
||
|
@@ -258,11 +448,22 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
|
||
|
goto done;
|
||
|
}
|
||
|
|
||
|
+ *buffer = malloc(*length);
|
||
|
+ if (*buffer == NULL) {
|
||
|
+ ret = ENOMEM;
|
||
|
+ goto done;
|
||
|
+ }
|
||
|
+
|
||
|
pos = 0;
|
||
|
while (*length > pos) {
|
||
|
- rn = read(gpmctx->fd, buffer + pos, *length - pos);
|
||
|
+ ret = gpm_epoll_wait(gpmctx, EPOLLIN);
|
||
|
+ if (ret) {
|
||
|
+ goto done;
|
||
|
+ }
|
||
|
+
|
||
|
+ rn = read(gpmctx->fd, *buffer + pos, *length - pos);
|
||
|
if (rn == -1) {
|
||
|
- if (errno == EINTR) {
|
||
|
+ if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
|
||
|
continue;
|
||
|
}
|
||
|
ret = errno;
|
||
|
@@ -281,6 +482,7 @@ done:
|
||
|
if (ret) {
|
||
|
/* on errors we can only close the fd and return */
|
||
|
gpm_close_socket(gpmctx);
|
||
|
+ gpm_epoll_close(gpmctx);
|
||
|
}
|
||
|
return ret;
|
||
|
}
|
||
|
@@ -309,6 +511,63 @@ static struct gpm_ctx *gpm_get_ctx(void)
|
||
|
return &gpm_global_ctx;
|
||
|
}
|
||
|
|
||
|
+/*
+ * Send one request and wait for its reply, retrying the whole exchange
+ * when either half times out.
+ *
+ * gpmctx       client context
+ * send_buffer  encoded request, send_length bytes long
+ * recv_buffer  out: reply buffer allocated by gpm_recv_buffer(). It is only
+ *              left non-NULL when 0 is returned; the caller then owns it
+ *              and must free() it. *recv_buffer must be NULL or a heap
+ *              pointer on entry.
+ * recv_length  out: length of the reply
+ *
+ * The timer is armed once with RESPONSE_TIMEOUT. At most MAX_TIMEOUT_RETRY
+ * attempts are made in total across send and receive; after each timeout
+ * the socket is closed and reopened before the request is sent again.
+ *
+ * Returns 0 on success, ETIMEDOUT when every attempt timed out, otherwise
+ * the first non-timeout error encountered.
+ */
+static int gpm_send_recv_loop(struct gpm_ctx *gpmctx, char *send_buffer,
+                              uint32_t send_length, char **recv_buffer,
+                              uint32_t *recv_length)
+{
+    int ret;
+    int retry_count;
+
+    /* setup timer */
+    ret = gpm_timer_setup(gpmctx, RESPONSE_TIMEOUT);
+    if (ret)
+        return ret;
+
+    for (retry_count = 0; retry_count < MAX_TIMEOUT_RETRY; retry_count++) {
+        /* send to proxy */
+        ret = gpm_send_buffer(gpmctx, send_buffer, send_length);
+        if (ret == ETIMEDOUT) {
+            /* Close and reopen socket before trying again */
+            ret = gpm_retry_socket(gpmctx);
+            if (ret != 0)
+                return ret;
+            ret = ETIMEDOUT;
+
+            /* RETRY entire send */
+            continue;
+        }
+        if (ret != 0) {
+            /* Other error */
+            return ret;
+        }
+
+        /* receive answer */
+        ret = gpm_recv_buffer(gpmctx, recv_buffer, recv_length);
+        if (ret == 0) {
+            /* No error */
+            break;
+        }
+
+        /* The receive failed, possibly after the reply buffer had been
+         * allocated. Release the buffer itself (not the caller's pointer
+         * variable) and reset it to NULL, so the caller neither sees a
+         * buffer for an incomplete reply nor destroys a reply XDR context
+         * it never created. */
+        free(*recv_buffer);
+        *recv_buffer = NULL;
+
+        if (ret != ETIMEDOUT) {
+            /* Other error */
+            return ret;
+        }
+
+        /* Close and reopen socket before trying again */
+        ret = gpm_retry_socket(gpmctx);
+        if (ret != 0)
+            return ret;
+        ret = ETIMEDOUT;
+    }
+
+    return ret;
+}
|
||
|
+
|
||
|
OM_uint32 gpm_release_buffer(OM_uint32 *minor_status,
|
||
|
gss_buffer_t buffer)
|
||
|
{
|
||
|
@@ -399,15 +658,20 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
|
||
|
gp_rpc_msg msg;
|
||
|
XDR xdr_call_ctx;
|
||
|
XDR xdr_reply_ctx;
|
||
|
- char buffer[MAX_RPC_SIZE];
|
||
|
- uint32_t length;
|
||
|
+ char *send_buffer = NULL;
|
||
|
+ char *recv_buffer = NULL;
|
||
|
+ uint32_t send_length;
|
||
|
+ uint32_t recv_length;
|
||
|
uint32_t xid;
|
||
|
bool xdrok;
|
||
|
bool sockgrab = false;
|
||
|
int ret;
|
||
|
|
||
|
- xdrmem_create(&xdr_call_ctx, buffer, MAX_RPC_SIZE, XDR_ENCODE);
|
||
|
- xdrmem_create(&xdr_reply_ctx, buffer, MAX_RPC_SIZE, XDR_DECODE);
|
||
|
+ send_buffer = malloc(MAX_RPC_SIZE);
|
||
|
+ if (send_buffer == NULL)
|
||
|
+ return ENOMEM;
|
||
|
+
|
||
|
+ xdrmem_create(&xdr_call_ctx, send_buffer, MAX_RPC_SIZE, XDR_ENCODE);
|
||
|
|
||
|
memset(&msg, 0, sizeof(gp_rpc_msg));
|
||
|
msg.header.type = GP_RPC_CALL;
|
||
|
@@ -450,22 +714,22 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
|
||
|
goto done;
|
||
|
}
|
||
|
|
||
|
- /* send to proxy */
|
||
|
- ret = gpm_send_buffer(gpmctx, buffer, xdr_getpos(&xdr_call_ctx));
|
||
|
- if (ret) {
|
||
|
- goto done;
|
||
|
- }
|
||
|
+ /* set send_length */
|
||
|
+ send_length = xdr_getpos(&xdr_call_ctx);
|
||
|
|
||
|
- /* receive answer */
|
||
|
- ret = gpm_recv_buffer(gpmctx, buffer, &length);
|
||
|
- if (ret) {
|
||
|
+ /* Send request, receive response with timeout */
|
||
|
+ ret = gpm_send_recv_loop(gpmctx, send_buffer, send_length, &recv_buffer,
|
||
|
+ &recv_length);
|
||
|
+ if (ret)
|
||
|
goto done;
|
||
|
- }
|
||
|
|
||
|
/* release the lock */
|
||
|
gpm_release_sock(gpmctx);
|
||
|
sockgrab = false;
|
||
|
|
||
|
+ /* Create the reply context */
|
||
|
+ xdrmem_create(&xdr_reply_ctx, recv_buffer, recv_length, XDR_DECODE);
|
||
|
+
|
||
|
/* decode header */
|
||
|
memset(&msg, 0, sizeof(gp_rpc_msg));
|
||
|
xdrok = xdr_gp_rpc_msg(&xdr_reply_ctx, &msg);
|
||
|
@@ -489,12 +753,21 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
|
||
|
}
|
||
|
|
||
|
done:
|
||
|
+ gpm_timer_close(gpmctx);
|
||
|
+ gpm_epoll_close(gpmctx);
|
||
|
+
|
||
|
if (sockgrab) {
|
||
|
gpm_release_sock(gpmctx);
|
||
|
}
|
||
|
xdr_free((xdrproc_t)xdr_gp_rpc_msg, (char *)&msg);
|
||
|
xdr_destroy(&xdr_call_ctx);
|
||
|
- xdr_destroy(&xdr_reply_ctx);
|
||
|
+
|
||
|
+ if (recv_buffer != NULL)
|
||
|
+ xdr_destroy(&xdr_reply_ctx);
|
||
|
+
|
||
|
+ free(send_buffer);
|
||
|
+ free(recv_buffer);
|
||
|
+
|
||
|
return ret;
|
||
|
}
|
||
|
|