mirror of
https://github.com/varun-r-mallya/sysprof.git
synced 2025-12-31 20:36:25 +00:00
libsysprof-capture: remove framing data from MappedRingBuffer
This removes the 8 bytes of framing data from the MappedRingBuffer which means we can write more data without racing. But also this means that we can eventually use the mapped ring buffer as our normal buffer for capture writing (to be done later).
This commit is contained in:
@ -40,18 +40,6 @@ enum {
|
|||||||
MODE_WRITER = 2,
|
MODE_WRITER = 2,
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
|
||||||
* MappedRingFrame is the header on each buffer entry so that
|
|
||||||
* we can stay 8-byte aligned.
|
|
||||||
*/
|
|
||||||
typedef struct _MappedRingFrame
|
|
||||||
{
|
|
||||||
guint64 len : 32;
|
|
||||||
guint64 padding : 32;
|
|
||||||
} MappedRingFrame;
|
|
||||||
|
|
||||||
G_STATIC_ASSERT (sizeof (MappedRingFrame) == 8);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MappedRingHeader is the header of the first page of the
|
* MappedRingHeader is the header of the first page of the
|
||||||
* buffer. We use the whole buffer so that we can double map
|
* buffer. We use the whole buffer so that we can double map
|
||||||
@ -90,7 +78,7 @@ static inline gpointer
|
|||||||
get_body_at_pos (MappedRingBuffer *self,
|
get_body_at_pos (MappedRingBuffer *self,
|
||||||
gsize pos)
|
gsize pos)
|
||||||
{
|
{
|
||||||
g_assert (pos < (self->body_size + self->body_size - sizeof (MappedRingFrame)));
|
g_assert (pos < (self->body_size + self->body_size));
|
||||||
|
|
||||||
return (guint8 *)self->map + self->page_size + pos;
|
return (guint8 *)self->map + self->page_size + pos;
|
||||||
}
|
}
|
||||||
@ -380,8 +368,6 @@ mapped_ring_buffer_allocate (MappedRingBuffer *self,
|
|||||||
g_return_val_if_fail (length < self->body_size, NULL);
|
g_return_val_if_fail (length < self->body_size, NULL);
|
||||||
g_return_val_if_fail ((length & 0x7) == 0, NULL);
|
g_return_val_if_fail ((length & 0x7) == 0, NULL);
|
||||||
|
|
||||||
length += sizeof (MappedRingFrame);
|
|
||||||
|
|
||||||
header = get_header (self);
|
header = get_header (self);
|
||||||
headpos = g_atomic_int_get (&header->head);
|
headpos = g_atomic_int_get (&header->head);
|
||||||
tailpos = g_atomic_int_get (&header->tail);
|
tailpos = g_atomic_int_get (&header->tail);
|
||||||
@ -396,13 +382,13 @@ mapped_ring_buffer_allocate (MappedRingBuffer *self,
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
if (tailpos == headpos)
|
if (tailpos == headpos)
|
||||||
return get_body_at_pos (self, tailpos + sizeof (MappedRingFrame));
|
return get_body_at_pos (self, tailpos);
|
||||||
|
|
||||||
if (headpos < tailpos)
|
if (headpos < tailpos)
|
||||||
headpos += self->body_size;
|
headpos += self->body_size;
|
||||||
|
|
||||||
if (tailpos + length < headpos)
|
if (tailpos + length < headpos)
|
||||||
return get_body_at_pos (self, tailpos + sizeof (MappedRingFrame));
|
return get_body_at_pos (self, tailpos);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -430,7 +416,6 @@ mapped_ring_buffer_advance (MappedRingBuffer *self,
|
|||||||
gsize length)
|
gsize length)
|
||||||
{
|
{
|
||||||
MappedRingHeader *header;
|
MappedRingHeader *header;
|
||||||
MappedRingFrame *fr;
|
|
||||||
guint32 tail;
|
guint32 tail;
|
||||||
|
|
||||||
g_return_if_fail (self != NULL);
|
g_return_if_fail (self != NULL);
|
||||||
@ -442,13 +427,8 @@ mapped_ring_buffer_advance (MappedRingBuffer *self,
|
|||||||
header = get_header (self);
|
header = get_header (self);
|
||||||
tail = header->tail;
|
tail = header->tail;
|
||||||
|
|
||||||
/* First write the frame header with the data length */
|
/* Calculate the new tail position */
|
||||||
fr = get_body_at_pos (self, tail);
|
tail = tail + length;
|
||||||
fr->len = length;
|
|
||||||
fr->padding = 0;
|
|
||||||
|
|
||||||
/* Now calculate the new tail position */
|
|
||||||
tail = tail + sizeof *fr + length;
|
|
||||||
if (tail >= self->body_size)
|
if (tail >= self->body_size)
|
||||||
tail -= self->body_size;
|
tail -= self->body_size;
|
||||||
|
|
||||||
@ -508,13 +488,16 @@ mapped_ring_buffer_drain (MappedRingBuffer *self,
|
|||||||
|
|
||||||
while (headpos < tailpos)
|
while (headpos < tailpos)
|
||||||
{
|
{
|
||||||
const MappedRingFrame *fr = get_body_at_pos (self, headpos);
|
gconstpointer data = get_body_at_pos (self, headpos);
|
||||||
gconstpointer data = (guint8 *)fr + sizeof *fr;
|
gsize len = tailpos - headpos;
|
||||||
|
|
||||||
headpos = headpos + sizeof *fr + fr->len;
|
if (!callback (data, &len, user_data))
|
||||||
|
|
||||||
if (!callback (data, fr->len, user_data))
|
|
||||||
goto short_circuit;
|
goto short_circuit;
|
||||||
|
|
||||||
|
if (len > (tailpos - headpos))
|
||||||
|
goto short_circuit;
|
||||||
|
|
||||||
|
headpos += len;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = TRUE;
|
ret = TRUE;
|
||||||
@ -605,9 +588,10 @@ static GSourceFuncs mapped_ring_source_funcs = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
guint
|
guint
|
||||||
mapped_ring_buffer_create_source (MappedRingBuffer *self,
|
mapped_ring_buffer_create_source_full (MappedRingBuffer *self,
|
||||||
MappedRingBufferCallback source_func,
|
MappedRingBufferCallback source_func,
|
||||||
gpointer user_data)
|
gpointer user_data,
|
||||||
|
GDestroyNotify destroy)
|
||||||
{
|
{
|
||||||
MappedRingSource *source;
|
MappedRingSource *source;
|
||||||
|
|
||||||
@ -618,8 +602,16 @@ mapped_ring_buffer_create_source (MappedRingBuffer *self,
|
|||||||
|
|
||||||
source = (MappedRingSource *)g_source_new (&mapped_ring_source_funcs, sizeof (MappedRingSource));
|
source = (MappedRingSource *)g_source_new (&mapped_ring_source_funcs, sizeof (MappedRingSource));
|
||||||
source->self = mapped_ring_buffer_ref (self);
|
source->self = mapped_ring_buffer_ref (self);
|
||||||
g_source_set_callback ((GSource *)source, (GSourceFunc)source_func, user_data, NULL);
|
g_source_set_callback ((GSource *)source, (GSourceFunc)source_func, user_data, destroy);
|
||||||
g_source_set_name ((GSource *)source, "MappedRingSource");
|
g_source_set_name ((GSource *)source, "MappedRingSource");
|
||||||
|
|
||||||
return g_source_attach ((GSource *)source, g_main_context_default ());
|
return g_source_attach ((GSource *)source, g_main_context_default ());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
guint
|
||||||
|
mapped_ring_buffer_create_source (MappedRingBuffer *self,
|
||||||
|
MappedRingBufferCallback source_func,
|
||||||
|
gpointer user_data)
|
||||||
|
{
|
||||||
|
return mapped_ring_buffer_create_source_full (self, source_func, user_data, NULL);
|
||||||
|
}
|
||||||
|
|||||||
@ -29,25 +29,29 @@ typedef struct _MappedRingBuffer MappedRingBuffer;
|
|||||||
/**
|
/**
|
||||||
* MappedRingBufferCallback:
|
* MappedRingBufferCallback:
|
||||||
* @data: a pointer into the mapped buffer containing the data frame
|
* @data: a pointer into the mapped buffer containing the data frame
|
||||||
* @length: the size of @data in bytes
|
* @length: (inout): the number of bytes to advance
|
||||||
* @user_data: closure data provided to mapped_ring_buffer_drain()
|
* @user_data: closure data provided to mapped_ring_buffer_drain()
|
||||||
*
|
*
|
||||||
* Functions matching this prototype will be called from the
|
* Functions matching this prototype will be called from the
|
||||||
* mapped_ring_buffer_drain() function for each data frame read from
|
* mapped_ring_buffer_drain() function for each data frame read from the
|
||||||
* the underlying memory mapping.
|
* underlying memory mapping.
|
||||||
*
|
*
|
||||||
* If frames were lost because the reader could not keep up, then
|
* @length is initially set to the max bytes that @data could contain, but
|
||||||
* @data will be NULL and @length will be the number of frames that
|
* should be set by the caller to the amount of bytes known to have been
|
||||||
* were known to be lost by the peer.
|
* consumed. This allows MappedRingBuffer to avoid storing length data or
|
||||||
|
* framing information as that can come from the buffer content.
|
||||||
|
*
|
||||||
|
* The callback should shorten @length if it knows the frame is less than
|
||||||
|
* what was provided.
|
||||||
*
|
*
|
||||||
* This function can also be used with mapped_ring_buffer_create_source()
|
* This function can also be used with mapped_ring_buffer_create_source()
|
||||||
* to automatically drain the ring buffer as part of the main loop progress.
|
* to automatically drain the ring buffer as part of the main loop progress.
|
||||||
*
|
*
|
||||||
* Returns: %TRUE to coninue draining, otherwise %FALSE and draining stops
|
* Returns: %TRUE to coninue draining, otherwise %FALSE and draining stops
|
||||||
*/
|
*/
|
||||||
typedef gboolean (*MappedRingBufferCallback) (gconstpointer data,
|
typedef gboolean (*MappedRingBufferCallback) (gconstpointer data,
|
||||||
gsize length,
|
gsize *length,
|
||||||
gpointer user_data);
|
gpointer user_data);
|
||||||
|
|
||||||
G_GNUC_INTERNAL
|
G_GNUC_INTERNAL
|
||||||
MappedRingBuffer *mapped_ring_buffer_new_reader (gsize buffer_size);
|
MappedRingBuffer *mapped_ring_buffer_new_reader (gsize buffer_size);
|
||||||
@ -70,9 +74,14 @@ gboolean mapped_ring_buffer_drain (MappedRingBuffer *se
|
|||||||
MappedRingBufferCallback callback,
|
MappedRingBufferCallback callback,
|
||||||
gpointer user_data);
|
gpointer user_data);
|
||||||
G_GNUC_INTERNAL
|
G_GNUC_INTERNAL
|
||||||
guint mapped_ring_buffer_create_source (MappedRingBuffer *self,
|
guint mapped_ring_buffer_create_source (MappedRingBuffer *self,
|
||||||
MappedRingBufferCallback callback,
|
MappedRingBufferCallback callback,
|
||||||
gpointer user_data);
|
gpointer user_data);
|
||||||
|
G_GNUC_INTERNAL
|
||||||
|
guint mapped_ring_buffer_create_source_full (MappedRingBuffer *self,
|
||||||
|
MappedRingBufferCallback callback,
|
||||||
|
gpointer user_data,
|
||||||
|
GDestroyNotify destroy);
|
||||||
|
|
||||||
G_DEFINE_AUTOPTR_CLEANUP_FUNC (MappedRingBuffer, mapped_ring_buffer_unref)
|
G_DEFINE_AUTOPTR_CLEANUP_FUNC (MappedRingBuffer, mapped_ring_buffer_unref)
|
||||||
|
|
||||||
|
|||||||
@ -58,11 +58,24 @@ struct _SysprofControlSource
|
|||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
SysprofControlSource *self;
|
||||||
|
guint id;
|
||||||
|
} RingData;
|
||||||
|
|
||||||
static void source_iface_init (SysprofSourceInterface *iface);
|
static void source_iface_init (SysprofSourceInterface *iface);
|
||||||
|
|
||||||
G_DEFINE_TYPE_WITH_CODE (SysprofControlSource, sysprof_control_source, G_TYPE_OBJECT,
|
G_DEFINE_TYPE_WITH_CODE (SysprofControlSource, sysprof_control_source, G_TYPE_OBJECT,
|
||||||
G_IMPLEMENT_INTERFACE (SYSPROF_TYPE_SOURCE, source_iface_init))
|
G_IMPLEMENT_INTERFACE (SYSPROF_TYPE_SOURCE, source_iface_init))
|
||||||
|
|
||||||
|
static void
|
||||||
|
ring_data_free (RingData *rd)
|
||||||
|
{
|
||||||
|
g_clear_object (&rd->self);
|
||||||
|
g_slice_free (RingData, rd);
|
||||||
|
}
|
||||||
|
|
||||||
SysprofControlSource *
|
SysprofControlSource *
|
||||||
sysprof_control_source_new (void)
|
sysprof_control_source_new (void)
|
||||||
{
|
{
|
||||||
@ -110,55 +123,42 @@ sysprof_control_source_init (SysprofControlSource *self)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
event_frame_cb (gconstpointer data,
|
event_frame_cb (gconstpointer data,
|
||||||
gsize length,
|
gsize *length,
|
||||||
gpointer user_data)
|
gpointer user_data)
|
||||||
{
|
{
|
||||||
SysprofControlSource *self = user_data;
|
|
||||||
const SysprofCaptureFrame *fr = data;
|
const SysprofCaptureFrame *fr = data;
|
||||||
|
RingData *rd = user_data;
|
||||||
|
|
||||||
g_assert (SYSPROF_IS_CONTROL_SOURCE (self));
|
g_assert (rd != NULL);
|
||||||
|
g_assert (SYSPROF_IS_CONTROL_SOURCE (rd->self));
|
||||||
|
g_assert (rd->id > 0);
|
||||||
|
|
||||||
if (self->writer != NULL)
|
if G_UNLIKELY (rd->self->writer == NULL ||
|
||||||
_sysprof_capture_writer_add_raw (self->writer, fr);
|
*length < sizeof *fr ||
|
||||||
|
*length < fr->len ||
|
||||||
|
fr->type >= SYSPROF_CAPTURE_FRAME_LAST)
|
||||||
|
goto remove_source;
|
||||||
|
|
||||||
|
_sysprof_capture_writer_add_raw (rd->self->writer, fr);
|
||||||
|
|
||||||
return G_SOURCE_CONTINUE;
|
return G_SOURCE_CONTINUE;
|
||||||
|
|
||||||
|
remove_source:
|
||||||
|
for (guint i = 0; i < rd->self->source_ids->len; i++)
|
||||||
|
{
|
||||||
|
guint id = g_array_index (rd->self->source_ids, guint, i);
|
||||||
|
|
||||||
|
if (id == rd->id)
|
||||||
|
{
|
||||||
|
g_array_remove_index (rd->self->source_ids, i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return G_SOURCE_REMOVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
g_autoptr(GUnixFDList) out_fd_list = NULL;
|
|
||||||
g_autoptr(GError) error = NULL;
|
|
||||||
MappedRingBuffer *reader = NULL;
|
|
||||||
guint id;
|
|
||||||
gint fd;
|
|
||||||
gint handle;
|
|
||||||
|
|
||||||
g_assert (IPC_IS_COLLECTOR (collector));
|
|
||||||
g_assert (G_IS_DBUS_METHOD_INVOCATION (invocation));
|
|
||||||
g_assert (SYSPROF_IS_CONTROL_SOURCE (self));
|
|
||||||
|
|
||||||
if (self->stopped)
|
|
||||||
goto failure;
|
|
||||||
|
|
||||||
if (!(reader = mapped_ring_buffer_new_reader (0)))
|
|
||||||
goto failure;
|
|
||||||
|
|
||||||
fd = mapped_ring_buffer_get_fd (reader);
|
|
||||||
out_fd_list = g_unix_fd_list_new ();
|
|
||||||
handle = g_unix_fd_list_append (out_fd_list, fd, &error);
|
|
||||||
if (handle == -1)
|
|
||||||
goto failure;
|
|
||||||
|
|
||||||
id = mapped_ring_buffer_create_source (reader, event_frame_cb, self);
|
|
||||||
g_array_append_val (self->source_ids, id);
|
|
||||||
g_clear_pointer (&reader, mapped_ring_buffer_unref);
|
|
||||||
|
|
||||||
ipc_collector_complete_create_writer (collector,
|
|
||||||
g_steal_pointer (&invocation),
|
|
||||||
out_fd_list,
|
|
||||||
g_variant_new_handle (handle));
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef G_OS_UNIX
|
#ifdef G_OS_UNIX
|
||||||
static void
|
static void
|
||||||
sysprof_control_source_read_cb (GObject *object,
|
sysprof_control_source_read_cb (GObject *object,
|
||||||
@ -184,9 +184,16 @@ sysprof_control_source_read_cb (GObject *object,
|
|||||||
if ((buffer = mapped_ring_buffer_new_reader (0)))
|
if ((buffer = mapped_ring_buffer_new_reader (0)))
|
||||||
{
|
{
|
||||||
int fd = mapped_ring_buffer_get_fd (buffer);
|
int fd = mapped_ring_buffer_get_fd (buffer);
|
||||||
guint id = mapped_ring_buffer_create_source (buffer, event_frame_cb, self);
|
RingData *rd;
|
||||||
|
|
||||||
g_array_append_val (self->source_ids, id);
|
rd = g_slice_new0 (RingData);
|
||||||
|
rd->self = g_object_ref (self);
|
||||||
|
rd->id = mapped_ring_buffer_create_source_full (buffer,
|
||||||
|
event_frame_cb,
|
||||||
|
rd,
|
||||||
|
(GDestroyNotify)ring_data_free);
|
||||||
|
|
||||||
|
g_array_append_val (self->source_ids, rd->id);
|
||||||
g_unix_connection_send_fd (self->conn, fd, NULL, NULL);
|
g_unix_connection_send_fd (self->conn, fd, NULL, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,25 +5,27 @@
|
|||||||
static gsize real_count;
|
static gsize real_count;
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
drain_nth_cb (gconstpointer data,
|
drain_nth_cb (gconstpointer data,
|
||||||
gsize len,
|
gsize *len,
|
||||||
gpointer user_data)
|
gpointer user_data)
|
||||||
{
|
{
|
||||||
const gint64 *v64 = data;
|
const gint64 *v64 = data;
|
||||||
g_assert_cmpint (len, ==, 8);
|
g_assert_cmpint (*len, >=, 8);
|
||||||
g_assert_cmpint (*v64, ==, GPOINTER_TO_SIZE (user_data));
|
g_assert_cmpint (*v64, ==, GPOINTER_TO_SIZE (user_data));
|
||||||
|
*len = sizeof *v64;
|
||||||
return G_SOURCE_CONTINUE;
|
return G_SOURCE_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
drain_count_cb (gconstpointer data,
|
drain_count_cb (gconstpointer data,
|
||||||
gsize len,
|
gsize *len,
|
||||||
gpointer user_data)
|
gpointer user_data)
|
||||||
{
|
{
|
||||||
const gint64 *v64 = data;
|
const gint64 *v64 = data;
|
||||||
g_assert_cmpint (len, ==, 8);
|
g_assert_cmpint (*len, >=, 8);
|
||||||
++real_count;
|
++real_count;
|
||||||
g_assert_cmpint (real_count, ==, *v64);
|
g_assert_cmpint (real_count, ==, *v64);
|
||||||
|
*len = sizeof *v64;
|
||||||
return G_SOURCE_CONTINUE;
|
return G_SOURCE_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,13 +93,14 @@ typedef struct
|
|||||||
} ThreadedMessage;
|
} ThreadedMessage;
|
||||||
|
|
||||||
static gboolean
|
static gboolean
|
||||||
handle_msg (gconstpointer data,
|
handle_msg (gconstpointer data,
|
||||||
gsize length,
|
gsize *length,
|
||||||
gpointer user_data)
|
gpointer user_data)
|
||||||
{
|
{
|
||||||
const ThreadedMessage *msg = data;
|
const ThreadedMessage *msg = data;
|
||||||
gboolean *done = user_data;
|
gboolean *done = user_data;
|
||||||
*done = msg->done;
|
*done = msg->done;
|
||||||
|
*length = sizeof *msg;
|
||||||
return G_SOURCE_CONTINUE;
|
return G_SOURCE_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user