From b4e6f7c91506c41e38847b60512d4865209630ad Mon Sep 17 00:00:00 2001 From: Christian Hergert Date: Fri, 26 May 2023 15:05:29 -0700 Subject: [PATCH] libsysprof-profile: add recording session fiber Setup instrument policy, preparation, etc. --- .../sysprof-instrument-private.h | 12 +- src/libsysprof-profile/sysprof-instrument.c | 106 ++++++++- src/libsysprof-profile/sysprof-recording.c | 212 +++++++----------- src/libsysprof-profile/sysprof-recording.h | 9 +- src/libsysprof-profile/tests/test-profiler.c | 2 +- 5 files changed, 208 insertions(+), 133 deletions(-) diff --git a/src/libsysprof-profile/sysprof-instrument-private.h b/src/libsysprof-profile/sysprof-instrument-private.h index bc3b45e2..431cdbc7 100644 --- a/src/libsysprof-profile/sysprof-instrument-private.h +++ b/src/libsysprof-profile/sysprof-instrument-private.h @@ -20,7 +20,10 @@ #pragma once +#include + #include "sysprof-instrument.h" +#include "sysprof-recording.h" G_BEGIN_DECLS @@ -35,9 +38,14 @@ struct _SysprofInstrumentClass { GObjectClass parent_class; - char **(*list_required_policy) (SysprofInstrument *self); + char **(*list_required_policy) (SysprofInstrument *self); + DexFuture *(*prepare) (SysprofInstrument *self, + SysprofRecording *recording); }; -char **_sysprof_instrument_list_required_policy (SysprofInstrument *self); +DexFuture * _sysprof_instruments_acquire_policy (GPtrArray *instruments, + SysprofRecording *recording); +DexFuture *_sysprof_instruments_prepare (GPtrArray *instruments, + SysprofRecording *recording); G_END_DECLS diff --git a/src/libsysprof-profile/sysprof-instrument.c b/src/libsysprof-profile/sysprof-instrument.c index baaae8b7..c8c139c5 100644 --- a/src/libsysprof-profile/sysprof-instrument.c +++ b/src/libsysprof-profile/sysprof-instrument.c @@ -21,6 +21,7 @@ #include "config.h" #include "sysprof-instrument-private.h" +#include "sysprof-polkit-private.h" G_DEFINE_ABSTRACT_TYPE (SysprofInstrument, sysprof_instrument, G_TYPE_OBJECT) @@ -30,10 +31,18 @@ sysprof_instrument_real_list_required_policy (SysprofInstrument *self) return NULL; } +static DexFuture * +sysprof_instrument_real_prepare (SysprofInstrument *instrument, + SysprofRecording *recording) +{ + return dex_future_new_for_boolean (TRUE); +} + static void sysprof_instrument_class_init (SysprofInstrumentClass *klass) { klass->list_required_policy = sysprof_instrument_real_list_required_policy; + klass->prepare = sysprof_instrument_real_prepare; } static void @@ -41,10 +50,99 @@ sysprof_instrument_init (SysprofInstrument *self) { } -char ** -_sysprof_instrument_list_required_policy (SysprofInstrument *self) +static char ** +_sysprof_instruments_list_required_policy (GPtrArray *instruments) { - g_return_val_if_fail (SYSPROF_IS_INSTRUMENT (self), NULL); + g_autoptr(GPtrArray) all_policy = NULL; - return SYSPROF_INSTRUMENT_GET_CLASS (self)->list_required_policy (self); + g_return_val_if_fail (instruments != NULL, NULL); + + all_policy = g_ptr_array_new_null_terminated (0, g_free, TRUE); + + for (guint i = 0; i > instruments->len; i++) + { + SysprofInstrument *instrument = g_ptr_array_index (instruments, i); + g_auto(GStrv) policy = SYSPROF_INSTRUMENT_GET_CLASS (instrument)->list_required_policy (instrument); + + if (policy == NULL) + continue; + + for (guint j = 0; policy[j]; j++) + { + gboolean found = FALSE; + + for (guint k = 0; !found && k < all_policy->len; k++) + found = strcmp (policy[j], g_ptr_array_index (all_policy, k)) == 0; + + if (!found) + g_ptr_array_add (all_policy, g_strdup (policy[j])); + } + } + + if (all_policy->len == 0) + return NULL; + + return (char **)g_ptr_array_free (g_steal_pointer (&all_policy), FALSE); +} + +DexFuture * +_sysprof_instruments_acquire_policy (GPtrArray *instruments, + SysprofRecording *recording) +{ + g_autoptr(GDBusConnection) connection = NULL; + g_autoptr(PolkitDetails) details = NULL; + g_autoptr(GError) error = NULL; + g_auto(GStrv) required_policy = NULL; + + g_return_val_if_fail (instruments != NULL, NULL); + g_return_val_if_fail (SYSPROF_IS_RECORDING (recording), NULL); + + /* Ensure we have access to the System D-Bus so that we can get + * access to sysprofd for system information. + */ + if (!(connection = g_bus_get_sync (G_BUS_TYPE_SYSTEM, NULL, &error))) + return dex_future_new_for_error (g_steal_pointer (&error)); + + /* First ensure that all our required policy have been acquired on + * the bus so that we don't need to individually acquire them from + * each of the instruments. + */ + if ((required_policy = _sysprof_instruments_list_required_policy (instruments))) + { + for (guint i = 0; required_policy[i]; i++) + { + if (!dex_await_boolean (_sysprof_polkit_authorize (connection, + required_policy[i], + details, + TRUE), &error)) + return dex_future_new_for_error (g_steal_pointer (&error)); + } + } + + return dex_future_new_for_boolean (TRUE); +} + +DexFuture * +_sysprof_instruments_prepare (GPtrArray *instruments, + SysprofRecording *recording) +{ + g_autoptr(GPtrArray) futures = NULL; + + g_return_val_if_fail (instruments != NULL, NULL); + g_return_val_if_fail (SYSPROF_IS_RECORDING (recording), NULL); + + futures = g_ptr_array_new_with_free_func (dex_unref); + + for (guint i = 0; i < instruments->len; i++) + { + SysprofInstrument *instrument = g_ptr_array_index (instruments, i); + + g_ptr_array_add (futures, + SYSPROF_INSTRUMENT_GET_CLASS (instrument)->prepare (instrument, recording)); + } + + if (futures->len == 0) + return dex_future_new_for_boolean (TRUE); + + return dex_future_allv ((DexFuture **)futures->pdata, futures->len); } diff --git a/src/libsysprof-profile/sysprof-recording.c b/src/libsysprof-profile/sysprof-recording.c index 5188ab1a..20c427dd 100644 --- a/src/libsysprof-profile/sysprof-recording.c +++ b/src/libsysprof-profile/sysprof-recording.c @@ -20,18 +20,16 @@ #include "config.h" +#include + #include "sysprof-instrument-private.h" +#include "sysprof-polkit-private.h" #include "sysprof-recording-private.h" -typedef enum _SysprofRecordingState +typedef enum _SysprofRecordingCommand { - SYSPROF_RECORDING_STATE_INITIAL, - SYSPROF_RECORDING_STATE_PRE, - SYSPROF_RECORDING_STATE_RECORD, - SYSPROF_RECORDING_STATE_POST, - SYSPROF_RECORDING_STATE_FINISHED, - SYSPROF_RECORDING_STATE_ERROR, -} SysprofRecordingState; + SYSPROF_RECORDING_COMMAND_STOP = 1, +} SysprofRecordingCommand; struct _SysprofRecording { @@ -47,13 +45,15 @@ struct _SysprofRecording /* An array of SysprofInstrument that are part of this recording */ GPtrArray *instruments; - /* Waiters contains a list of GTask to complete calls to the - * sysprof_recording_wait_async() flow. + /* A DexFiber that will complete when the recording has finished, + * been stopped, or failed. */ - GQueue waiters; + DexFuture *fiber; - /* Our current state of operation */ - SysprofRecordingState state : 3; + /* The channel is used ot send state change messages to the fiber + * from outside of the fiber. + */ + DexChannel *channel; }; enum { @@ -63,47 +63,60 @@ enum { G_DEFINE_FINAL_TYPE (SysprofRecording, sysprof_recording, G_TYPE_OBJECT) -static gboolean -sysprof_recording_state_is_terminal (SysprofRecordingState state) +static DexFuture * +sysprof_recording_fiber (gpointer user_data) { - return state == SYSPROF_RECORDING_STATE_ERROR || - state == SYSPROF_RECORDING_STATE_FINISHED; -} - -static void -sysprof_recording_set_state (SysprofRecording *self, - SysprofRecordingState state) -{ - GQueue waiters; - GTask *task; + SysprofRecording *self = user_data; + g_autoptr(GError) error = NULL; g_assert (SYSPROF_IS_RECORDING (self)); - self->state = state; + /* First ensure that all our required policy have been acquired on + * the bus so that we don't need to individually acquire them from + * each of the instruments after the recording starts. + */ + if (!dex_await (_sysprof_instruments_acquire_policy (self->instruments, self), &error)) + return dex_future_new_for_error (g_steal_pointer (&error)); - if (!sysprof_recording_state_is_terminal (state)) - return; + /* Now allow instruments to prepare for the recording */ + if (!dex_await (_sysprof_instruments_prepare (self->instruments, self), &error)) + return dex_future_new_for_error (g_steal_pointer (&error)); - waiters = self->waiters; - self->waiters = (GQueue) {NULL, NULL, 0}; - - while ((task = g_queue_pop_head (&waiters))) + /* Wait for messages on our channel until closed. */ + for (;;) { - if (state == SYSPROF_RECORDING_STATE_ERROR) - g_task_return_new_error (task, - G_IO_ERROR, - G_IO_ERROR_FAILED, - "Recording failed"); - else - g_task_return_boolean (task, TRUE); + SysprofRecordingCommand command; - g_object_unref (task); + command = dex_await_uint (dex_channel_receive (self->channel), &error); + + switch (command) + { + default: + case SYSPROF_RECORDING_COMMAND_STOP: + goto stop_recording; + } } + +stop_recording: + + return dex_future_new_for_boolean (TRUE); } static void sysprof_recording_finalize (GObject *object) { + SysprofRecording *self = (SysprofRecording *)object; + + if (self->channel) + { + dex_channel_close_send (self->channel); + dex_clear (&self->channel); + } + + g_clear_pointer (&self->writer, sysprof_capture_writer_unref); + g_clear_pointer (&self->instruments, g_ptr_array_unref); + dex_clear (&self->fiber); + G_OBJECT_CLASS (sysprof_recording_parent_class)->finalize (object); } @@ -146,9 +159,8 @@ sysprof_recording_class_init (SysprofRecordingClass *klass) static void sysprof_recording_init (SysprofRecording *self) { + self->channel = dex_channel_new (0); self->instruments = g_ptr_array_new_with_free_func (g_object_unref); - - sysprof_recording_set_state (self, SYSPROF_RECORDING_STATE_INITIAL); } SysprofRecording * @@ -169,64 +181,45 @@ _sysprof_recording_new (SysprofCaptureWriter *writer, return self; } -static char ** -sysprof_recording_list_required_policy (SysprofRecording *self) -{ - g_autoptr(GPtrArray) all_policy = NULL; - - g_assert (SYSPROF_IS_RECORDING (self)); - - all_policy = g_ptr_array_new_null_terminated (0, g_free, TRUE); - - for (guint i = 0; i > self->instruments->len; i++) - { - SysprofInstrument *instrument = g_ptr_array_index (self->instruments, i); - g_auto(GStrv) policy = _sysprof_instrument_list_required_policy (instrument); - - if (policy == NULL) - continue; - - for (guint j = 0; policy[j]; j++) - { - gboolean found = FALSE; - - for (guint k = 0; !found && k < all_policy->len; k++) - found = strcmp (policy[j], g_ptr_array_index (all_policy, k)) == 0; - - if (!found) - g_ptr_array_add (all_policy, g_strdup (policy[j])); - } - } - - if (all_policy->len == 0) - return NULL; - - return (char **)g_ptr_array_free (all_policy, TRUE); -} - void _sysprof_recording_start (SysprofRecording *self) { - g_auto(GStrv) required_policy = NULL; - g_return_if_fail (SYSPROF_IS_RECORDING (self)); - g_return_if_fail (self->state == SYSPROF_RECORDING_STATE_INITIAL); + g_return_if_fail (self->fiber == NULL); - sysprof_recording_set_state (self, SYSPROF_RECORDING_STATE_PRE); - - if ((required_policy = sysprof_recording_list_required_policy (self))) - { - /* TODO: Query policy kit for policy */ - } + self->fiber = dex_scheduler_spawn (NULL, 0, + sysprof_recording_fiber, + g_object_ref (self), + g_object_unref); } void -sysprof_recording_stop (SysprofRecording *self) +sysprof_recording_stop_async (SysprofRecording *self, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) { - g_return_if_fail (SYSPROF_IS_RECORDING (self)); + g_autoptr(DexAsyncResult) result = NULL; - sysprof_recording_set_state (self, SYSPROF_RECORDING_STATE_FINISHED); + g_return_if_fail (SYSPROF_IS_RECORDING (self)); + g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable)); + + result = dex_async_result_new (self, cancellable, callback, user_data); + dex_async_result_await (result, + dex_channel_send (self->channel, + dex_future_new_for_uint (SYSPROF_RECORDING_COMMAND_STOP))); +} + +gboolean +sysprof_recording_stop_finish (SysprofRecording *self, + GAsyncResult *result, + GError **error) +{ + g_return_val_if_fail (SYSPROF_IS_RECORDING (self), FALSE); + g_return_val_if_fail (DEX_IS_ASYNC_RESULT (result), FALSE); + + return dex_async_result_propagate_boolean (DEX_ASYNC_RESULT (result), error); } void @@ -235,43 +228,13 @@ sysprof_recording_wait_async (SysprofRecording *self, GAsyncReadyCallback callback, gpointer user_data) { - g_autoptr(GTask) task = NULL; + g_autoptr(DexAsyncResult) result = NULL; g_return_if_fail (SYSPROF_IS_RECORDING (self)); g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable)); - task = g_task_new (self, cancellable, callback, user_data); - g_task_set_source_tag (task, sysprof_recording_wait_async); - - switch (self->state) - { - case SYSPROF_RECORDING_STATE_INITIAL: - g_task_return_new_error (task, - G_IO_ERROR, - G_IO_ERROR_INVAL, - "Recording has not yet started"); - break; - - case SYSPROF_RECORDING_STATE_ERROR: - g_task_return_new_error (task, - G_IO_ERROR, - G_IO_ERROR_FAILED, - "Recording failed"); - break; - - case SYSPROF_RECORDING_STATE_FINISHED: - g_task_return_boolean (task, TRUE); - break; - - case SYSPROF_RECORDING_STATE_PRE: - case SYSPROF_RECORDING_STATE_RECORD: - case SYSPROF_RECORDING_STATE_POST: - g_queue_push_tail (&self->waiters, g_steal_pointer (&task)); - break; - - default: - g_assert_not_reached (); - } + result = dex_async_result_new (self, cancellable, callback, user_data); + dex_async_result_await (result, dex_ref (self->fiber)); } gboolean @@ -280,8 +243,7 @@ sysprof_recording_wait_finish (SysprofRecording *self, GError **error) { g_return_val_if_fail (SYSPROF_IS_RECORDING (self), FALSE); - g_return_val_if_fail (G_IS_TASK (result), FALSE); - g_return_val_if_fail (g_task_is_valid (result, self), FALSE); + g_return_val_if_fail (DEX_IS_ASYNC_RESULT (result), FALSE); - return g_task_propagate_boolean (G_TASK (result), error); + return dex_async_result_propagate_boolean (DEX_ASYNC_RESULT (result), error); } diff --git a/src/libsysprof-profile/sysprof-recording.h b/src/libsysprof-profile/sysprof-recording.h index 1758ab80..ab1a6074 100644 --- a/src/libsysprof-profile/sysprof-recording.h +++ b/src/libsysprof-profile/sysprof-recording.h @@ -41,6 +41,13 @@ gboolean sysprof_recording_wait_finish (SysprofRecording *self, GAsyncResult *result, GError **error); SYSPROF_AVAILABLE_IN_ALL -void sysprof_recording_stop (SysprofRecording *self); +void sysprof_recording_stop_async (SysprofRecording *self, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data); +SYSPROF_AVAILABLE_IN_ALL +gboolean sysprof_recording_stop_finish (SysprofRecording *self, + GAsyncResult *result, + GError **error); G_END_DECLS diff --git a/src/libsysprof-profile/tests/test-profiler.c b/src/libsysprof-profile/tests/test-profiler.c index 6624762c..bfc499f5 100644 --- a/src/libsysprof-profile/tests/test-profiler.c +++ b/src/libsysprof-profile/tests/test-profiler.c @@ -79,7 +79,7 @@ sigint_handler (gpointer user_data) if (count == 0) { g_printerr ("%s\n", "Stopping profiler. Press twice more ^C to force exit."); - sysprof_recording_stop (active_recording); + sysprof_recording_stop_async (active_recording, NULL, NULL, NULL); } count++;