From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Wed, 19 Apr 2017 16:24:54 +0100 Subject: [spice-server] stream-device: Limit sending queue from guest to server Do not allow the guest to fill host memory. Also having a huge queue mainly cause to have a higher video latency. Signed-off-by: Frediano Ziglio Acked-by: Jonathon Jongsma --- server/stream-channel.c | 41 ++++++++++++++++++++++++++++++++++++++++- server/stream-channel.h | 10 ++++++++++ server/stream-device.c | 35 ++++++++++++++++++++++++++++++++++- 3 files changed, 84 insertions(+), 2 deletions(-) diff --git a/server/stream-channel.c b/server/stream-channel.c index 51b8badf9..ec4bf021d 100644 --- a/server/stream-channel.c +++ b/server/stream-channel.c @@ -71,9 +71,15 @@ struct StreamChannel { /* size of the current video stream */ unsigned width, height; + StreamQueueStat queue_stat; + /* callback to notify when a stream should be started or stopped */ stream_channel_start_proc start_cb; void *start_opaque; + + /* callback to notify when queue statistics changes */ + stream_channel_queue_stat_proc queue_cb; + void *queue_opaque; }; struct StreamChannelClass { @@ -98,6 +104,7 @@ typedef struct StreamCreateItem { typedef struct StreamDataItem { RedPipeItem base; + StreamChannel *channel; // NOTE: this must be the last field in the structure SpiceMsgDisplayStreamData data; } StreamDataItem; @@ -454,6 +461,27 @@ stream_channel_change_format(StreamChannel *channel, const StreamMsgFormat *fmt) red_channel_pipes_add(red_channel, &item->base); } +static inline void +stream_channel_update_queue_stat(StreamChannel *channel, + int32_t num_diff, int32_t size_diff) +{ + channel->queue_stat.num_items += num_diff; + channel->queue_stat.size += size_diff; + if (channel->queue_cb) { + channel->queue_cb(channel->queue_opaque, &channel->queue_stat, channel); + } +} + +static void +data_item_free(RedPipeItem *base) +{ + StreamDataItem *pipe_item = SPICE_UPCAST(StreamDataItem, base); + + stream_channel_update_queue_stat(pipe_item->channel, -1, -pipe_item->data.data_size); + + g_free(pipe_item); +} + void stream_channel_send_data(StreamChannel *channel, const void *data, size_t size, uint32_t mm_time) { @@ -467,10 +495,13 @@ stream_channel_send_data(StreamChannel *channel, const void *data, size_t size, RedChannel *red_channel = RED_CHANNEL(channel); StreamDataItem *item = g_malloc(sizeof(*item) + size); - red_pipe_item_init(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA); + red_pipe_item_init_full(&item->base, RED_PIPE_ITEM_TYPE_STREAM_DATA, + data_item_free); item->data.base.id = channel->stream_id; item->data.base.multi_media_time = mm_time; item->data.data_size = size; + item->channel = channel; + stream_channel_update_queue_stat(channel, 1, size); // TODO try to optimize avoiding the copy memcpy(item->data.data, data, size); red_channel_pipes_add(red_channel, &item->base); @@ -485,6 +516,14 @@ stream_channel_register_start_cb(StreamChannel *channel, } void +stream_channel_register_queue_stat_cb(StreamChannel *channel, + stream_channel_queue_stat_proc cb, void *opaque) +{ + channel->queue_cb = cb; + channel->queue_opaque = opaque; +} + +void stream_channel_reset(StreamChannel *channel) { struct { diff --git a/server/stream-channel.h b/server/stream-channel.h index bd075a951..f961d7157 100644 --- a/server/stream-channel.h +++ b/server/stream-channel.h @@ -67,6 +67,16 @@ typedef void (*stream_channel_start_proc)(void *opaque, struct StreamMsgStartSto void stream_channel_register_start_cb(StreamChannel *channel, stream_channel_start_proc cb, void *opaque); +typedef struct StreamQueueStat { + uint32_t num_items; + uint32_t size; +} StreamQueueStat; + +typedef void (*stream_channel_queue_stat_proc)(void *opaque, const StreamQueueStat *stats, + StreamChannel *channel); +void stream_channel_register_queue_stat_cb(StreamChannel *channel, + stream_channel_queue_stat_proc cb, void *opaque); + G_END_DECLS #endif /* STREAM_CHANNEL_H_ */ diff --git a/server/stream-device.c b/server/stream-device.c index ae108788b..f87538d49 100644 --- a/server/stream-device.c +++ b/server/stream-device.c @@ -44,6 +44,7 @@ struct StreamDevice { uint8_t hdr_pos; bool has_error; bool opened; + bool flow_stopped; StreamChannel *stream_channel; }; @@ -72,7 +73,7 @@ stream_device_read_msg_from_dev(RedCharDevice *self, SpiceCharDeviceInstance *si int n; bool handled = false; - if (dev->has_error || !dev->stream_channel) { + if (dev->has_error || dev->flow_stopped || !dev->stream_channel) { return NULL; } @@ -181,6 +182,9 @@ handle_msg_data(StreamDevice *dev, SpiceCharDeviceInstance *sin) if (n <= 0) { break; } + // TODO collect all message ?? + // up: we send a single frame together + // down: guest can cause a crash stream_channel_send_data(dev->stream_channel, buf, n, reds_get_mm_time()); dev->hdr.size -= n; } @@ -233,6 +237,33 @@ stream_device_stream_start(void *opaque, StreamMsgStartStop *start, red_char_device_write_buffer_add(char_dev, buf); } +static void +stream_device_stream_queue_stat(void *opaque, const StreamQueueStat *stats G_GNUC_UNUSED, + StreamChannel *stream_channel G_GNUC_UNUSED) +{ + StreamDevice *dev = (StreamDevice *) opaque; + + if (!dev->opened) { + return; + } + + // very easy control flow... if any data stop + // this seems a very small queue but as we use tcp + // there's already that queue + if (stats->num_items) { + dev->flow_stopped = true; + return; + } + + if (dev->flow_stopped) { + dev->flow_stopped = false; + // TODO resume flow... + // avoid recursion if we need to call get data from data handling from + // data handling + red_char_device_wakeup(&dev->parent); + } +} + RedCharDevice * stream_device_connect(RedsState *reds, SpiceCharDeviceInstance *sin) { @@ -277,6 +308,7 @@ allocate_channels(StreamDevice *dev) dev->stream_channel = stream_channel; stream_channel_register_start_cb(stream_channel, stream_device_stream_start, dev); + stream_channel_register_queue_stat_cb(stream_channel, stream_device_stream_queue_stat, dev); } static void @@ -303,6 +335,7 @@ stream_device_port_event(RedCharDevice *char_dev, uint8_t event) } dev->hdr_pos = 0; dev->has_error = false; + dev->flow_stopped = false; red_char_device_reset(char_dev); reset_channels(dev); }