You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1029 lines
26 KiB

#include "cache.h"
#include "simple-ipc.h"
#include "strbuf.h"
#include "pkt-line.h"
#include "thread-utils.h"
#include "unix-socket.h"
#include "unix-stream-server.h"
#ifndef SUPPORTS_SIMPLE_IPC
/*
* This source file should only be compiled when Simple IPC is supported.
* See the top-level Makefile.
*/
#error SUPPORTS_SIMPLE_IPC not defined
#endif
enum ipc_active_state ipc_get_active_state(const char *path)
{
enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
struct ipc_client_connect_options options
= IPC_CLIENT_CONNECT_OPTIONS_INIT;
struct stat st;
struct ipc_client_connection *connection_test = NULL;
options.wait_if_busy = 0;
options.wait_if_not_found = 0;
if (lstat(path, &st) == -1) {
switch (errno) {
case ENOENT:
case ENOTDIR:
return IPC_STATE__NOT_LISTENING;
default:
return IPC_STATE__INVALID_PATH;
}
}
#ifdef __CYGWIN__
/*
* Cygwin emulates Unix sockets by writing special-crafted files whose
* `system` bit is set.
*
* If we are too fast, Cygwin might still be in the process of marking
* the underlying file as a system file. Until then, we will not see a
* Unix socket here, but a plain file instead. Just in case that this
* is happening, wait a little and try again.
*/
{
static const int delay[] = { 1, 10, 20, 40, -1 };
int i;
for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
sleep_millisec(delay[i]);
if (lstat(path, &st) == -1)
return IPC_STATE__INVALID_PATH;
}
}
#endif
/* also complain if a plain file is in the way */
if ((st.st_mode & S_IFMT) != S_IFSOCK)
return IPC_STATE__INVALID_PATH;
/*
* Just because the filesystem has a S_IFSOCK type inode
* at `path`, doesn't mean it that there is a server listening.
* Ping it to be sure.
*/
state = ipc_client_try_connect(path, &options, &connection_test);
ipc_client_close_connection(connection_test);
return state;
}
/*
* Retry frequency when trying to connect to a server.
*
* This value should be short enough that we don't seriously delay our
* caller, but not fast enough that our spinning puts pressure on the
* system.
*/
#define WAIT_STEP_MS (50)
/*
* Try to connect to the server. If the server is just starting up or
* is very busy, we may not get a connection the first time.
*/
static enum ipc_active_state connect_to_server(
const char *path,
int timeout_ms,
const struct ipc_client_connect_options *options,
int *pfd)
{
int k;
*pfd = -1;
for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
int fd = unix_stream_connect(path, options->uds_disallow_chdir);
if (fd != -1) {
*pfd = fd;
return IPC_STATE__LISTENING;
}
if (errno == ENOENT) {
if (!options->wait_if_not_found)
return IPC_STATE__PATH_NOT_FOUND;
goto sleep_and_try_again;
}
if (errno == ETIMEDOUT) {
if (!options->wait_if_busy)
return IPC_STATE__NOT_LISTENING;
goto sleep_and_try_again;
}
if (errno == ECONNREFUSED) {
if (!options->wait_if_busy)
return IPC_STATE__NOT_LISTENING;
goto sleep_and_try_again;
}
return IPC_STATE__OTHER_ERROR;
sleep_and_try_again:
sleep_millisec(WAIT_STEP_MS);
}
return IPC_STATE__NOT_LISTENING;
}
/*
* The total amount of time that we are willing to wait when trying to
* connect to a server.
*
* When the server is first started, it might take a little while for
* it to become ready to service requests. Likewise, the server may
* be very (temporarily) busy and not respond to our connections.
*
* We should gracefully and silently handle those conditions and try
* again for a reasonable time period.
*
* The value chosen here should be long enough for the server
* to reliably heal from the above conditions.
*/
#define MY_CONNECTION_TIMEOUT_MS (1000)
enum ipc_active_state ipc_client_try_connect(
const char *path,
const struct ipc_client_connect_options *options,
struct ipc_client_connection **p_connection)
{
enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
int fd = -1;
*p_connection = NULL;
trace2_region_enter("ipc-client", "try-connect", NULL);
trace2_data_string("ipc-client", NULL, "try-connect/path", path);
state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
options, &fd);
trace2_data_intmax("ipc-client", NULL, "try-connect/state",
(intmax_t)state);
trace2_region_leave("ipc-client", "try-connect", NULL);
if (state == IPC_STATE__LISTENING) {
(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
(*p_connection)->fd = fd;
}
return state;
}
void ipc_client_close_connection(struct ipc_client_connection *connection)
{
if (!connection)
return;
if (connection->fd != -1)
close(connection->fd);
free(connection);
}
int ipc_client_send_command_to_connection(
struct ipc_client_connection *connection,
const char *message, size_t message_len,
struct strbuf *answer)
{
int ret = 0;
strbuf_setlen(answer, 0);
trace2_region_enter("ipc-client", "send-command", NULL);
if (write_packetized_from_buf_no_flush(message, message_len,
connection->fd) < 0 ||
packet_flush_gently(connection->fd) < 0) {
ret = error(_("could not send IPC command"));
goto done;
}
if (read_packetized_to_strbuf(
connection->fd, answer,
PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
ret = error(_("could not read IPC response"));
goto done;
}
done:
trace2_region_leave("ipc-client", "send-command", NULL);
return ret;
}
int ipc_client_send_command(const char *path,
const struct ipc_client_connect_options *options,
const char *message, size_t message_len,
struct strbuf *answer)
{
int ret = -1;
enum ipc_active_state state;
struct ipc_client_connection *connection = NULL;
state = ipc_client_try_connect(path, options, &connection);
if (state != IPC_STATE__LISTENING)
return ret;
ret = ipc_client_send_command_to_connection(connection,
message, message_len,
answer);
ipc_client_close_connection(connection);
return ret;
}
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
int flags;
flags = fcntl(fd, F_GETFL, NULL);
if (flags < 0)
return -1;
if (make_nonblocking)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
return fcntl(fd, F_SETFL, flags);
}
/*
* Magic numbers used to annotate callback instance data.
* These are used to help guard against accidentally passing the
* wrong instance data across multiple levels of callbacks (which
* is easy to do if there are `void*` arguments).
*/
enum magic {
MAGIC_SERVER_REPLY_DATA,
MAGIC_WORKER_THREAD_DATA,
MAGIC_ACCEPT_THREAD_DATA,
MAGIC_SERVER_DATA,
};
struct ipc_server_reply_data {
enum magic magic;
int fd;
struct ipc_worker_thread_data *worker_thread_data;
};
struct ipc_worker_thread_data {
enum magic magic;
struct ipc_worker_thread_data *next_thread;
struct ipc_server_data *server_data;
pthread_t pthread_id;
};
struct ipc_accept_thread_data {
enum magic magic;
struct ipc_server_data *server_data;
struct unix_ss_socket *server_socket;
int fd_send_shutdown;
int fd_wait_shutdown;
pthread_t pthread_id;
};
/*
* With unix-sockets, the conceptual "ipc-server" is implemented as a single
* controller "accept-thread" thread and a pool of "worker-thread" threads.
* The former does the usual `accept()` loop and dispatches connections
* to an idle worker thread. The worker threads wait in an idle loop for
* a new connection, communicate with the client and relay data to/from
* the `application_cb` and then wait for another connection from the
* server thread. This avoids the overhead of constantly creating and
* destroying threads.
*/
struct ipc_server_data {
enum magic magic;
ipc_server_application_cb *application_cb;
void *application_data;
struct strbuf buf_path;
struct ipc_accept_thread_data *accept_thread;
struct ipc_worker_thread_data *worker_thread_list;
pthread_mutex_t work_available_mutex;
pthread_cond_t work_available_cond;
/*
* Accepted but not yet processed client connections are kept
* in a circular buffer FIFO. The queue is empty when the
* positions are equal.
*/
int *fifo_fds;
int queue_size;
int back_pos;
int front_pos;
int shutdown_requested;
int is_stopped;
};
/*
* Remove and return the oldest queued connection.
*
* Returns -1 if empty.
*/
static int fifo_dequeue(struct ipc_server_data *server_data)
{
/* ASSERT holding mutex */
int fd;
if (server_data->back_pos == server_data->front_pos)
return -1;
fd = server_data->fifo_fds[server_data->front_pos];
server_data->fifo_fds[server_data->front_pos] = -1;
server_data->front_pos++;
if (server_data->front_pos == server_data->queue_size)
server_data->front_pos = 0;
return fd;
}
/*
* Push a new fd onto the back of the queue.
*
* Drop it and return -1 if queue is already full.
*/
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
/* ASSERT holding mutex */
int next_back_pos;
next_back_pos = server_data->back_pos + 1;
if (next_back_pos == server_data->queue_size)
next_back_pos = 0;
if (next_back_pos == server_data->front_pos) {
/* Queue is full. Just drop it. */
close(fd);
return -1;
}
server_data->fifo_fds[server_data->back_pos] = fd;
server_data->back_pos = next_back_pos;
return fd;
}
/*
* Wait for a connection to be queued to the FIFO and return it.
*
* Returns -1 if someone has already requested a shutdown.
*/
static int worker_thread__wait_for_connection(
struct ipc_worker_thread_data *worker_thread_data)
{
/* ASSERT NOT holding mutex */
struct ipc_server_data *server_data = worker_thread_data->server_data;
int fd = -1;
pthread_mutex_lock(&server_data->work_available_mutex);
for (;;) {
if (server_data->shutdown_requested)
break;
fd = fifo_dequeue(server_data);
if (fd >= 0)
break;
pthread_cond_wait(&server_data->work_available_cond,
&server_data->work_available_mutex);
}
pthread_mutex_unlock(&server_data->work_available_mutex);
return fd;
}
/*
* Forward declare our reply callback function so that any compiler
* errors are reported when we actually define the function (in addition
* to any errors reported when we try to pass this callback function as
* a parameter in a function call). The former are easier to understand.
*/
static ipc_server_reply_cb do_io_reply_callback;
/*
* Relay application's response message to the client process.
* (We do not flush at this point because we allow the caller
* to chunk data to the client thru us.)
*/
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
const char *response, size_t response_len)
{
if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
BUG("reply_cb called with wrong instance data");
return write_packetized_from_buf_no_flush(response, response_len,
reply_data->fd);
}
/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)
/*
* If the client hangs up without sending any data on the wire, just
* quietly close the socket and ignore this client.
*
* This worker thread is committed to reading the IPC request data
* from the client at the other end of this fd. Wait here for the
* client to actually put something on the wire -- because if the
* client just does a ping (connect and hangup without sending any
* data), our use of the pkt-line read routines will spew an error
* message.
*
* Return -1 if the client hung up.
* Return 0 if data (possibly incomplete) is ready.
*/
static int worker_thread__wait_for_io_start(
struct ipc_worker_thread_data *worker_thread_data,
int fd)
{
struct ipc_server_data *server_data = worker_thread_data->server_data;
struct pollfd pollfd[1];
int result;
for (;;) {
pollfd[0].fd = fd;
pollfd[0].events = POLLIN;
result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
if (result < 0) {
if (errno == EINTR)
continue;
goto cleanup;
}
if (result == 0) {
/* a timeout */
int in_shutdown;
pthread_mutex_lock(&server_data->work_available_mutex);
in_shutdown = server_data->shutdown_requested;
pthread_mutex_unlock(&server_data->work_available_mutex);
/*
* If a shutdown is already in progress and this
* client has not started talking yet, just drop it.
*/
if (in_shutdown)
goto cleanup;
continue;
}
if (pollfd[0].revents & POLLHUP)
goto cleanup;
if (pollfd[0].revents & POLLIN)
return 0;
goto cleanup;
}
cleanup:
close(fd);
return -1;
}
/*
* Receive the request/command from the client and pass it to the
* registered request-callback. The request-callback will compose
* a response and call our reply-callback to send it to the client.
*/
static int worker_thread__do_io(
struct ipc_worker_thread_data *worker_thread_data,
int fd)
{
/* ASSERT NOT holding lock */
struct strbuf buf = STRBUF_INIT;
struct ipc_server_reply_data reply_data;
int ret = 0;
reply_data.magic = MAGIC_SERVER_REPLY_DATA;
reply_data.worker_thread_data = worker_thread_data;
reply_data.fd = fd;
ret = read_packetized_to_strbuf(
reply_data.fd, &buf,
PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
if (ret >= 0) {
ret = worker_thread_data->server_data->application_cb(
worker_thread_data->server_data->application_data,
buf.buf, buf.len, do_io_reply_callback, &reply_data);
packet_flush_gently(reply_data.fd);
}
else {
/*
* The client probably disconnected/shutdown before it
* could send a well-formed message. Ignore it.
*/
}
strbuf_release(&buf);
close(reply_data.fd);
return ret;
}
/*
* Block SIGPIPE on the current thread (so that we get EPIPE from
* write() rather than an actual signal).
*
* Note that using sigchain_push() and _pop() to control SIGPIPE
* around our IO calls is not thread safe:
* [] It uses a global stack of handler frames.
* [] It uses ALLOC_GROW() to resize it.
* [] Finally, according to the `signal(2)` man-page:
* "The effects of `signal()` in a multithreaded process are unspecified."
*/
static void thread_block_sigpipe(sigset_t *old_set)
{
sigset_t new_set;
sigemptyset(&new_set);
sigaddset(&new_set, SIGPIPE);
sigemptyset(old_set);
pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}
/*
* Thread proc for an IPC worker thread. It handles a series of
* connections from clients. It pulls the next fd from the queue
* processes it, and then waits for the next client.
*
* Block SIGPIPE in this worker thread for the life of the thread.
* This avoids stray (and sometimes delayed) SIGPIPE signals caused
* by client errors and/or when we are under extremely heavy IO load.
*
* This means that the application callback will have SIGPIPE blocked.
* The callback should not change it.
*/
static void *worker_thread_proc(void *_worker_thread_data)
{
struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
struct ipc_server_data *server_data = worker_thread_data->server_data;
sigset_t old_set;
int fd, io;
int ret;
trace2_thread_start("ipc-worker");
thread_block_sigpipe(&old_set);
for (;;) {
fd = worker_thread__wait_for_connection(worker_thread_data);
if (fd == -1)
break; /* in shutdown */
io = worker_thread__wait_for_io_start(worker_thread_data, fd);
if (io == -1)
continue; /* client hung up without sending anything */
ret = worker_thread__do_io(worker_thread_data, fd);
if (ret == SIMPLE_IPC_QUIT) {
trace2_data_string("ipc-worker", NULL, "queue_stop_async",
"application_quit");
/*
* The application layer is telling the ipc-server
* layer to shutdown.
*
* We DO NOT have a response to send to the client.
*
* Queue an async stop (to stop the other threads) and
* allow this worker thread to exit now (no sense waiting
* for the thread-pool shutdown signal).
*
* Other non-idle worker threads are allowed to finish
* responding to their current clients.
*/
ipc_server_stop_async(server_data);
break;
}
}
trace2_thread_exit();
return NULL;
}
/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)
/*
* Accept a new client connection on our socket. This uses non-blocking
* IO so that we can also wait for shutdown requests on our socket-pair
* without actually spinning on a fast timeout.
*/
static int accept_thread__wait_for_connection(
struct ipc_accept_thread_data *accept_thread_data)
{
struct pollfd pollfd[2];
int result;
for (;;) {
pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
pollfd[0].events = POLLIN;
pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
pollfd[1].events = POLLIN;
result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
if (result < 0) {
if (errno == EINTR)
continue;
return result;
}
if (result == 0) {
/* a timeout */
/*
* If someone deletes or force-creates a new unix
* domain socket at our path, all future clients
* will be routed elsewhere and we silently starve.
* If that happens, just queue a shutdown.
*/
if (unix_ss_was_stolen(
accept_thread_data->server_socket)) {
trace2_data_string("ipc-accept", NULL,
"queue_stop_async",
"socket_stolen");
ipc_server_stop_async(
accept_thread_data->server_data);
}
continue;
}
if (pollfd[0].revents & POLLIN) {
/* shutdown message queued to socketpair */
return -1;
}
if (pollfd[1].revents & POLLIN) {
/* a connection is available on server_socket */
int client_fd =
accept(accept_thread_data->server_socket->fd_socket,
NULL, NULL);
if (client_fd >= 0)
return client_fd;
/*
* An error here is unlikely -- it probably
* indicates that the connecting process has
* already dropped the connection.
*/
continue;
}
BUG("unandled poll result errno=%d r[0]=%d r[1]=%d",
errno, pollfd[0].revents, pollfd[1].revents);
}
}
/*
* Thread proc for the IPC server "accept thread". This waits for
* an incoming socket connection, appends it to the queue of available
* connections, and notifies a worker thread to process it.
*
* Block SIGPIPE in this thread for the life of the thread. This
* avoids any stray SIGPIPE signals when closing pipe fds under
* extremely heavy loads (such as when the fifo queue is full and we
* drop incomming connections).
*/
static void *accept_thread_proc(void *_accept_thread_data)
{
struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
struct ipc_server_data *server_data = accept_thread_data->server_data;
sigset_t old_set;
trace2_thread_start("ipc-accept");
thread_block_sigpipe(&old_set);
for (;;) {
int client_fd = accept_thread__wait_for_connection(
accept_thread_data);
pthread_mutex_lock(&server_data->work_available_mutex);
if (server_data->shutdown_requested) {
pthread_mutex_unlock(&server_data->work_available_mutex);
if (client_fd >= 0)
close(client_fd);
break;
}
if (client_fd < 0) {
/* ignore transient accept() errors */
}
else {
fifo_enqueue(server_data, client_fd);
pthread_cond_broadcast(&server_data->work_available_cond);
}
pthread_mutex_unlock(&server_data->work_available_mutex);
}
trace2_thread_exit();
return NULL;
}
/*
* We can't predict the connection arrival rate relative to the worker
* processing rate, therefore we allow the "accept-thread" to queue up
* a generous number of connections, since we'd rather have the client
* not unnecessarily timeout if we can avoid it. (The assumption is
* that this will be used for FSMonitor and a few second wait on a
* connection is better than having the client timeout and do the full
* computation itself.)
*
* The FIFO queue size is set to a multiple of the worker pool size.
* This value chosen at random.
*/
#define FIFO_SCALE (100)
/*
* The backlog value for `listen(2)`. This doesn't need to huge,
* rather just large enough for our "accept-thread" to wake up and
* queue incoming connections onto the FIFO without the kernel
* dropping any.
*
* This value chosen at random.
*/
#define LISTEN_BACKLOG (50)
static int create_listener_socket(
const char *path,
const struct ipc_server_opts *ipc_opts,
struct unix_ss_socket **new_server_socket)
{
struct unix_ss_socket *server_socket = NULL;
struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
int ret;
uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;
ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
if (ret)
return ret;
if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
int saved_errno = errno;
unix_ss_free(server_socket);
errno = saved_errno;
return -1;
}
*new_server_socket = server_socket;
trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
return 0;
}
static int setup_listener_socket(
const char *path,
const struct ipc_server_opts *ipc_opts,
struct unix_ss_socket **new_server_socket)
{
int ret, saved_errno;
trace2_region_enter("ipc-server", "create-listener_socket", NULL);
ret = create_listener_socket(path, ipc_opts, new_server_socket);
saved_errno = errno;
trace2_region_leave("ipc-server", "create-listener_socket", NULL);
errno = saved_errno;
return ret;
}
/*
* Start IPC server in a pool of background threads.
*/
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb,
void *application_data)
{
struct unix_ss_socket *server_socket = NULL;
struct ipc_server_data *server_data;
int sv[2];
int k;
int ret;
int nr_threads = opts->nr_threads;
*returned_server_data = NULL;
/*
* Create a socketpair and set sv[1] to non-blocking. This
* will used to send a shutdown message to the accept-thread
* and allows the accept-thread to wait on EITHER a client
* connection or a shutdown request without spinning.
*/
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
return -1;
if (set_socket_blocking_flag(sv[1], 1)) {
int saved_errno = errno;
close(sv[0]);
close(sv[1]);
errno = saved_errno;
return -1;
}
ret = setup_listener_socket(path, opts, &server_socket);
if (ret) {
int saved_errno = errno;
close(sv[0]);
close(sv[1]);
errno = saved_errno;
return ret;
}
server_data = xcalloc(1, sizeof(*server_data));
server_data->magic = MAGIC_SERVER_DATA;
server_data->application_cb = application_cb;
server_data->application_data = application_data;
strbuf_init(&server_data->buf_path, 0);
strbuf_addstr(&server_data->buf_path, path);
if (nr_threads < 1)
nr_threads = 1;
pthread_mutex_init(&server_data->work_available_mutex, NULL);
pthread_cond_init(&server_data->work_available_cond, NULL);
server_data->queue_size = nr_threads * FIFO_SCALE;
CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);
server_data->accept_thread =
xcalloc(1, sizeof(*server_data->accept_thread));
server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
server_data->accept_thread->server_data = server_data;
server_data->accept_thread->server_socket = server_socket;
server_data->accept_thread->fd_send_shutdown = sv[0];
server_data->accept_thread->fd_wait_shutdown = sv[1];
if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
accept_thread_proc, server_data->accept_thread))
die_errno(_("could not start accept_thread '%s'"), path);
for (k = 0; k < nr_threads; k++) {
struct ipc_worker_thread_data *wtd;
wtd = xcalloc(1, sizeof(*wtd));
wtd->magic = MAGIC_WORKER_THREAD_DATA;
wtd->server_data = server_data;
if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
wtd)) {
if (k == 0)
die(_("could not start worker[0] for '%s'"),
path);
/*
* Limp along with the thread pool that we have.
*/
break;
}
wtd->next_thread = server_data->worker_thread_list;
server_data->worker_thread_list = wtd;
}
*returned_server_data = server_data;
return 0;
}
/*
* Gently tell the IPC server treads to shutdown.
* Can be run on any thread.
*/
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
/* ASSERT NOT holding mutex */
int fd;
if (!server_data)
return 0;
trace2_region_enter("ipc-server", "server-stop-async", NULL);
pthread_mutex_lock(&server_data->work_available_mutex);
server_data->shutdown_requested = 1;
/*
* Write a byte to the shutdown socket pair to wake up the
* accept-thread.
*/
if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
error_errno("could not write to fd_send_shutdown");
/*
* Drain the queue of existing connections.
*/
while ((fd = fifo_dequeue(server_data)) != -1)
close(fd);
/*
* Gently tell worker threads to stop processing new connections
* and exit. (This does not abort in-process conversations.)
*/
pthread_cond_broadcast(&server_data->work_available_cond);
pthread_mutex_unlock(&server_data->work_available_mutex);
trace2_region_leave("ipc-server", "server-stop-async", NULL);
return 0;
}
/*
* Wait for all IPC server threads to stop.
*/
int ipc_server_await(struct ipc_server_data *server_data)
{
pthread_join(server_data->accept_thread->pthread_id, NULL);
if (!server_data->shutdown_requested)
BUG("ipc-server: accept-thread stopped for '%s'",
server_data->buf_path.buf);
while (server_data->worker_thread_list) {
struct ipc_worker_thread_data *wtd =
server_data->worker_thread_list;
pthread_join(wtd->pthread_id, NULL);
server_data->worker_thread_list = wtd->next_thread;
free(wtd);
}
server_data->is_stopped = 1;
return 0;
}
void ipc_server_free(struct ipc_server_data *server_data)
{
struct ipc_accept_thread_data * accept_thread_data;
if (!server_data)
return;
if (!server_data->is_stopped)
BUG("cannot free ipc-server while running for '%s'",
server_data->buf_path.buf);
accept_thread_data = server_data->accept_thread;
if (accept_thread_data) {
unix_ss_free(accept_thread_data->server_socket);
if (accept_thread_data->fd_send_shutdown != -1)
close(accept_thread_data->fd_send_shutdown);
if (accept_thread_data->fd_wait_shutdown != -1)
close(accept_thread_data->fd_wait_shutdown);
free(server_data->accept_thread);
}
while (server_data->worker_thread_list) {
struct ipc_worker_thread_data *wtd =
server_data->worker_thread_list;
server_data->worker_thread_list = wtd->next_thread;
free(wtd);
}
pthread_cond_destroy(&server_data->work_available_cond);
pthread_mutex_destroy(&server_data->work_available_mutex);
strbuf_release(&server_data->buf_path);
free(server_data->fifo_fds);
free(server_data);
}