You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

157 lines
6.8 KiB

From 47ac92420da9ecbffaf3aa0046d170be358639a2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= <zbyszek@in.waw.pl>
Date: Fri, 13 Mar 2015 00:02:28 -0400
Subject: [PATCH] journal-remote: process events without delay
journal-remote buffers input, and then parses it handling one journal entry at a time.
It was possible for useful data to be left in the buffer after some entries were
processed. But all data would be already read from the fd, so there would be
no reason for the event loop to call the handler again. After some new data came in,
the handler would be called again, and would then process the "old" data in the buffer.
Fix this by enabling a handler wherever we process input data and do not exhaust data
from the input buffer (i.e. when EAGAIN was not encountered). The handler runs until
we encounter EAGAIN.
Looping over the input data is done in this roundabout way to allow the event loop
to dispatch other events in the meanwhile. If the loop was inside the handler, a
source which produced data fast enough could completely monopolize the process.
https://bugs.freedesktop.org/show_bug.cgi?id=89516
(cherry picked from commit 043945b93824e33e040954612aaa934cd1a43a1b)
---
src/journal-remote/journal-remote-parse.c | 1 +
src/journal-remote/journal-remote-parse.h | 1 +
src/journal-remote/journal-remote.c | 65 ++++++++++++++++++++---
3 files changed, 59 insertions(+), 8 deletions(-)
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
index 6c096de03a..7e62954351 100644
--- a/src/journal-remote/journal-remote-parse.c
+++ b/src/journal-remote/journal-remote-parse.c
@@ -41,6 +41,7 @@ void source_free(RemoteSource *source) {
writer_unref(source->writer);
sd_event_source_unref(source->event);
+ sd_event_source_unref(source->buffer_event);
free(source);
}
diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h
index 22db550913..06a50296a1 100644
--- a/src/journal-remote/journal-remote-parse.h
+++ b/src/journal-remote/journal-remote-parse.h
@@ -54,6 +54,7 @@ typedef struct RemoteSource {
Writer *writer;
sd_event_source *event;
+ sd_event_source *buffer_event;
} RemoteSource;
RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer);
diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c
index d1486e7cda..b7cc6d7172 100644
--- a/src/journal-remote/journal-remote.c
+++ b/src/journal-remote/journal-remote.c
@@ -289,6 +289,8 @@ static int dispatch_raw_source_event(sd_event_source *event,
int fd,
uint32_t revents,
void *userdata);
+static int dispatch_raw_source_until_block(sd_event_source *event,
+ void *userdata);
static int dispatch_blocking_source_event(sd_event_source *event,
void *userdata);
static int dispatch_raw_connection_event(sd_event_source *event,
@@ -376,8 +378,15 @@ static int add_source(RemoteServer *s, int fd, char* name, bool own_name) {
r = sd_event_add_io(s->events, &source->event,
fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
- dispatch_raw_source_event, s);
- if (r == -EPERM) {
+ dispatch_raw_source_event, source);
+ if (r == 0) {
+ /* Add additional source for buffer processing. It will be
+ * enabled later. */
+ r = sd_event_add_defer(s->events, &source->buffer_event,
+ dispatch_raw_source_until_block, source);
+ if (r == 0)
+ sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
+ } else if (r == -EPERM) {
log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
r = sd_event_add_defer(s->events, &source->event,
dispatch_blocking_source_event, source);
@@ -997,15 +1006,18 @@ static void server_destroy(RemoteServer *s) {
**********************************************************************
**********************************************************************/
-static int dispatch_raw_source_event(sd_event_source *event,
- int fd,
- uint32_t revents,
- void *userdata) {
+static int handle_raw_source(sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ RemoteServer *s) {
- RemoteServer *s = userdata;
RemoteSource *source;
int r;
+ /* Returns 1 if there might be more data pending,
+ * 0 if data is currently exhausted, negative on error.
+ */
+
assert(fd >= 0 && fd < (ssize_t) s->sources_size);
source = s->sources[fd];
assert(source->fd == fd);
@@ -1036,11 +1048,48 @@ static int dispatch_raw_source_event(sd_event_source *event,
return 1;
}
+static int dispatch_raw_source_until_block(sd_event_source *event,
+ void *userdata) {
+ RemoteSource *source = userdata;
+ int r;
+
+ /* Make sure event stays around even if source is destroyed */
+ sd_event_source_ref(event);
+
+ r = handle_raw_source(event, source->fd, EPOLLIN, server);
+ if (r != 1)
+ /* No more data for now */
+ sd_event_source_set_enabled(event, SD_EVENT_OFF);
+
+ sd_event_source_unref(event);
+
+ return r;
+}
+
+static int dispatch_raw_source_event(sd_event_source *event,
+ int fd,
+ uint32_t revents,
+ void *userdata) {
+ RemoteSource *source = userdata;
+ int r;
+
+ assert(source->event);
+ assert(source->buffer_event);
+
+ r = handle_raw_source(event, fd, EPOLLIN, server);
+ if (r == 1)
+ /* Might have more data. We need to rerun the handler
+ * until we are sure the buffer is exhausted. */
+ sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
+
+ return r;
+}
+
static int dispatch_blocking_source_event(sd_event_source *event,
void *userdata) {
RemoteSource *source = userdata;
- return dispatch_raw_source_event(event, source->fd, EPOLLIN, server);
+ return handle_raw_source(event, source->fd, EPOLLIN, server);
}
static int accept_connection(const char* type, int fd,