You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
156 lines
6.8 KiB
156 lines
6.8 KiB
From 47ac92420da9ecbffaf3aa0046d170be358639a2 Mon Sep 17 00:00:00 2001 |
|
From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= <zbyszek@in.waw.pl> |
|
Date: Fri, 13 Mar 2015 00:02:28 -0400 |
|
Subject: [PATCH] journal-remote: process events without delay |
|
|
|
journal-remote buffers input, and then parses it handling one journal entry at a time. |
|
It was possible for useful data to be left in the buffer after some entries were |
|
processed. But all data would already have been read from the fd, so there would be |
|
no reason for the event loop to call the handler again. After some new data came in, |
|
the handler would be called again, and would then process the "old" data in the buffer. |
|
|
|
Fix this by enabling a handler wherever we process input data and do not exhaust data |
|
from the input buffer (i.e. when EAGAIN was not encountered). The handler runs until |
|
we encounter EAGAIN. |
|
|
|
Looping over the input data is done in this roundabout way to allow the event loop |
|
to dispatch other events in the meanwhile. If the loop were inside the handler, a |
|
source which produced data fast enough could completely monopolize the process. |
|
|
|
https://bugs.freedesktop.org/show_bug.cgi?id=89516 |
|
(cherry picked from commit 043945b93824e33e040954612aaa934cd1a43a1b) |
|
--- |
|
src/journal-remote/journal-remote-parse.c | 1 + |
|
src/journal-remote/journal-remote-parse.h | 1 + |
|
src/journal-remote/journal-remote.c | 65 +++++++++++++++++++++++++++---- |
|
3 files changed, 59 insertions(+), 8 deletions(-) |
|
|
|
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c |
|
index 6c096de03..7e6295435 100644 |
|
--- a/src/journal-remote/journal-remote-parse.c |
|
+++ b/src/journal-remote/journal-remote-parse.c |
|
@@ -41,6 +41,7 @@ void source_free(RemoteSource *source) { |
|
writer_unref(source->writer); |
|
|
|
sd_event_source_unref(source->event); |
|
+ sd_event_source_unref(source->buffer_event); |
|
|
|
free(source); |
|
} |
|
diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h |
|
index 22db55091..06a50296a 100644 |
|
--- a/src/journal-remote/journal-remote-parse.h |
|
+++ b/src/journal-remote/journal-remote-parse.h |
|
@@ -54,6 +54,7 @@ typedef struct RemoteSource { |
|
Writer *writer; |
|
|
|
sd_event_source *event; |
|
+ sd_event_source *buffer_event; |
|
} RemoteSource; |
|
|
|
RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer); |
|
diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c |
|
index d1486e7cd..b7cc6d717 100644 |
|
--- a/src/journal-remote/journal-remote.c |
|
+++ b/src/journal-remote/journal-remote.c |
|
@@ -289,6 +289,8 @@ static int dispatch_raw_source_event(sd_event_source *event, |
|
int fd, |
|
uint32_t revents, |
|
void *userdata); |
|
+static int dispatch_raw_source_until_block(sd_event_source *event, |
|
+ void *userdata); |
|
static int dispatch_blocking_source_event(sd_event_source *event, |
|
void *userdata); |
|
static int dispatch_raw_connection_event(sd_event_source *event, |
|
@@ -376,8 +378,15 @@ static int add_source(RemoteServer *s, int fd, char* name, bool own_name) { |
|
|
|
r = sd_event_add_io(s->events, &source->event, |
|
fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI, |
|
- dispatch_raw_source_event, s); |
|
- if (r == -EPERM) { |
|
+ dispatch_raw_source_event, source); |
|
+ if (r == 0) { |
|
+ /* Add additional source for buffer processing. It will be |
|
+ * enabled later. */ |
|
+ r = sd_event_add_defer(s->events, &source->buffer_event, |
|
+ dispatch_raw_source_until_block, source); |
|
+ if (r == 0) |
|
+ sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF); |
|
+ } else if (r == -EPERM) { |
|
log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name); |
|
r = sd_event_add_defer(s->events, &source->event, |
|
dispatch_blocking_source_event, source); |
|
@@ -997,15 +1006,18 @@ static void server_destroy(RemoteServer *s) { |
|
********************************************************************** |
|
**********************************************************************/ |
|
|
|
-static int dispatch_raw_source_event(sd_event_source *event, |
|
- int fd, |
|
- uint32_t revents, |
|
- void *userdata) { |
|
+static int handle_raw_source(sd_event_source *event, |
|
+ int fd, |
|
+ uint32_t revents, |
|
+ RemoteServer *s) { |
|
|
|
- RemoteServer *s = userdata; |
|
RemoteSource *source; |
|
int r; |
|
|
|
+ /* Returns 1 if there might be more data pending, |
|
+ * 0 if data is currently exhausted, negative on error. |
|
+ */ |
|
+ |
|
assert(fd >= 0 && fd < (ssize_t) s->sources_size); |
|
source = s->sources[fd]; |
|
assert(source->fd == fd); |
|
@@ -1036,11 +1048,48 @@ static int dispatch_raw_source_event(sd_event_source *event, |
|
return 1; |
|
} |
|
|
|
+static int dispatch_raw_source_until_block(sd_event_source *event, |
|
+ void *userdata) { |
|
+ RemoteSource *source = userdata; |
|
+ int r; |
|
+ |
|
+ /* Make sure event stays around even if source is destroyed */ |
|
+ sd_event_source_ref(event); |
|
+ |
|
+ r = handle_raw_source(event, source->fd, EPOLLIN, server); |
|
+ if (r != 1) |
|
+ /* No more data for now */ |
|
+ sd_event_source_set_enabled(event, SD_EVENT_OFF); |
|
+ |
|
+ sd_event_source_unref(event); |
|
+ |
|
+ return r; |
|
+} |
|
+ |
|
+static int dispatch_raw_source_event(sd_event_source *event, |
|
+ int fd, |
|
+ uint32_t revents, |
|
+ void *userdata) { |
|
+ RemoteSource *source = userdata; |
|
+ int r; |
|
+ |
|
+ assert(source->event); |
|
+ assert(source->buffer_event); |
|
+ |
|
+ r = handle_raw_source(event, fd, EPOLLIN, server); |
|
+ if (r == 1) |
|
+ /* Might have more data. We need to rerun the handler |
|
+ * until we are sure the buffer is exhausted. */ |
|
+ sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON); |
|
+ |
|
+ return r; |
|
+} |
|
+ |
|
static int dispatch_blocking_source_event(sd_event_source *event, |
|
void *userdata) { |
|
RemoteSource *source = userdata; |
|
|
|
- return dispatch_raw_source_event(event, source->fd, EPOLLIN, server); |
|
+ return handle_raw_source(event, source->fd, EPOLLIN, server); |
|
} |
|
|
|
static int accept_connection(const char* type, int fd,
|
|
|