From f9f7e29e54ce98f1efbebfcf6b6c95df95864a9b Mon Sep 17 00:00:00 2001 From: Christian Hergert Date: Wed, 31 Jul 2019 13:08:42 -0700 Subject: [PATCH] turbostat: use kill() to force sample by turbostat This is more reliable than using a PTY and allows us to use a regular pipe to output data into a GIOChannel. This also changes the design to use async IO watches for sample delivery. --- src/sysprofd/ipc-rapl-profiler.c | 49 ++++- src/sysprofd/sysprof-turbostat.c | 341 ++++++++++--------------------- src/sysprofd/sysprof-turbostat.h | 5 +- 3 files changed, 149 insertions(+), 246 deletions(-) diff --git a/src/sysprofd/ipc-rapl-profiler.c b/src/sysprofd/ipc-rapl-profiler.c index 648c1e7b..f9dd511c 100644 --- a/src/sysprofd/ipc-rapl-profiler.c +++ b/src/sysprofd/ipc-rapl-profiler.c @@ -77,6 +77,8 @@ ipc_rapl_profiler_stop_locked (IpcRaplProfiler *self) { g_assert (IPC_IS_RAPL_PROFILER (self)); + g_message ("Stopping RAPL monitor"); + g_clear_handle_id (&self->poll_source, g_source_remove); if (self->turbostat != NULL) @@ -84,6 +86,13 @@ ipc_rapl_profiler_stop_locked (IpcRaplProfiler *self) g_clear_pointer (&self->turbostat, sysprof_turbostat_free); g_clear_pointer (&self->counter_ids, g_array_unref); + + if (self->writer != NULL) + { + sysprof_capture_writer_flush (self->writer); + sysprof_capture_writer_unref (self->writer); + self->writer = NULL; + } } static guint @@ -263,7 +272,6 @@ ipc_rapl_profiler_poll_cb (gpointer data) { IpcRaplProfiler *self = data; g_autoptr(GMutexLocker) locker = NULL; - g_autoptr(GArray) samples = NULL; g_autoptr(GError) error = NULL; g_assert (IPC_IS_RAPL_PROFILER (self)); @@ -273,13 +281,35 @@ ipc_rapl_profiler_poll_cb (gpointer data) locker = g_mutex_locker_new (&self->mutex); if (self->turbostat == NULL) - return G_SOURCE_REMOVE; + goto failure; g_assert (self->counter_ids != NULL); g_assert (self->writer != NULL); - if (!(samples = sysprof_turbostat_sample (self->turbostat, &error))) - return G_SOURCE_REMOVE; + if (!sysprof_turbostat_sample (self->turbostat, &error)) + { + ipc_rapl_profiler_stop_locked (self); + goto failure; + } + + return G_SOURCE_CONTINUE; + +failure: + self->poll_source = 0; + + return G_SOURCE_REMOVE; +} + +static void +on_sample_cb (gpointer data, + gpointer user_data) +{ + IpcRaplProfiler *self = user_data; + GArray *samples = data; + + g_assert (IPC_IS_RAPL_PROFILER (self)); + g_assert (samples != NULL); + g_assert (samples->len > 0); for (guint i = 0; i < samples->len; i++) { @@ -305,11 +335,11 @@ ipc_rapl_profiler_poll_cb (gpointer data) if (r == FALSE) { ipc_rapl_profiler_stop_locked (self); - return G_SOURCE_REMOVE; + return; } } - return G_SOURCE_CONTINUE; + sysprof_capture_writer_flush (self->writer); } static gboolean @@ -369,7 +399,7 @@ ipc_rapl_profiler_handle_start (IpcProfiler *profiler, return TRUE; } - turbostat = sysprof_turbostat_new (); + turbostat = sysprof_turbostat_new (on_sample_cb, self); if (!sysprof_turbostat_start (turbostat, &error)) { @@ -380,10 +410,11 @@ ipc_rapl_profiler_handle_start (IpcProfiler *profiler, return TRUE; } - /* A small buffer size is fine for our use case. */ - self->writer = sysprof_capture_writer_new_from_fd (fd, 4096); + self->writer = sysprof_capture_writer_new_from_fd (fd, 0); self->counter_ids = g_array_new (FALSE, FALSE, sizeof (CounterId)); + g_message ("Starting RAPL monitor"); + self->turbostat = g_steal_pointer (&turbostat); self->poll_source = g_timeout_add_seconds (DEFAULT_POLL_FREQ_SECONDS, ipc_rapl_profiler_poll_cb, diff --git a/src/sysprofd/sysprof-turbostat.c b/src/sysprofd/sysprof-turbostat.c index c84758b1..7c3fc735 100644 --- a/src/sysprofd/sysprof-turbostat.c +++ b/src/sysprofd/sysprof-turbostat.c @@ -37,15 +37,13 @@ # include #endif -#define PTY_FD_INVALID (-1) - -typedef int PtyFd; - struct _SysprofTurbostat { GPid pid; - GIOChannel *stdin; - GIOChannel *stdout; + GIOChannel *channel; + guint channel_watch; + GFunc sample_func; + gpointer sample_data; }; enum { @@ -54,140 +52,17 @@ enum { KIND_INT, }; -static inline PtyFd -pty_fd_steal (PtyFd *fd) -{ - PtyFd ret = *fd; - *fd = -1; - return ret; -} - -static void -pty_fd_clear (PtyFd *fd) -{ - if (fd != NULL && *fd != -1) - { - int rfd = *fd; - *fd = -1; - close (rfd); - } -} - -G_DEFINE_AUTO_CLEANUP_CLEAR_FUNC (PtyFd, pty_fd_clear) - -PtyFd -pty_create_slave (PtyFd master_fd, - gboolean blocking) -{ - g_auto(PtyFd) ret = PTY_FD_INVALID; - gint extra = blocking ? 0 : O_NONBLOCK; -#if defined(HAVE_PTSNAME_R) || defined(__FreeBSD__) - char name[256]; -#else - const char *name; -#endif - - g_assert (master_fd != -1); - - if (grantpt (master_fd) != 0) - return PTY_FD_INVALID; - - if (unlockpt (master_fd) != 0) - return PTY_FD_INVALID; - -#ifdef HAVE_PTSNAME_R - if (ptsname_r (master_fd, name, sizeof name - 1) != 0) - return PTY_FD_INVALID; - name[sizeof name - 1] = '\0'; -#elif defined(__FreeBSD__) - if (fdevname_r (master_fd, name + 5, sizeof name - 6) == NULL) - return PTY_FD_INVALID; - memcpy (name, "/dev/", 5); - name[sizeof name - 1] = '\0'; -#else - if (NULL == (name = ptsname (master_fd))) - return PTY_FD_INVALID; -#endif - - ret = open (name, O_NOCTTY | O_RDWR | O_CLOEXEC | extra); - - if (ret == PTY_FD_INVALID && errno == EINVAL) - { - gint flags; - - ret = open (name, O_NOCTTY | O_RDWR | O_CLOEXEC); - if (ret == PTY_FD_INVALID && errno == EINVAL) - ret = open (name, O_NOCTTY | O_RDWR); - - if (ret == PTY_FD_INVALID) - return PTY_FD_INVALID; - - /* Add FD_CLOEXEC if O_CLOEXEC failed */ - flags = fcntl (ret, F_GETFD, 0); - if ((flags & FD_CLOEXEC) == 0) - { - if (fcntl (ret, F_SETFD, flags | FD_CLOEXEC) < 0) - return PTY_FD_INVALID; - } - - if (!blocking) - { - if (!g_unix_set_fd_nonblocking (ret, TRUE, NULL)) - return PTY_FD_INVALID; - } - } - - return pty_fd_steal (&ret); -} - -PtyFd -pty_create_master (void) -{ - g_auto(PtyFd) master_fd = PTY_FD_INVALID; - - master_fd = posix_openpt (O_RDWR | O_NOCTTY | O_NONBLOCK | O_CLOEXEC); - -#ifndef __linux__ - /* Fallback for operating systems that don't support - * O_NONBLOCK and O_CLOEXEC when opening. - */ - if (master_fd == PTY_FD_INVALID && errno == EINVAL) - { - master_fd = posix_openpt (O_RDWR | O_NOCTTY | O_CLOEXEC); - - if (master_fd == PTY_FD_INVALID && errno == EINVAL) - { - gint flags; - - master_fd = posix_openpt (O_RDWR | O_NOCTTY); - if (master_fd == -1) - return PTY_FD_INVALID; - - flags = fcntl (master_fd, F_GETFD, 0); - if (flags < 0) - return PTY_FD_INVALID; - - if (fcntl (master_fd, F_SETFD, flags | FD_CLOEXEC) < 0) - return PTY_FD_INVALID; - } - - if (!g_unix_set_fd_nonblocking (master_fd, TRUE, NULL)) - return PTY_FD_INVALID; - } -#endif - - return pty_fd_steal (&master_fd); -} - SysprofTurbostat * -sysprof_turbostat_new (void) +sysprof_turbostat_new (GFunc sample_func, + gpointer sample_data) { SysprofTurbostat *self; self = g_rc_box_new0 (SysprofTurbostat); - self->stdin = NULL; - self->stdout = NULL; self->pid = 0; + self->channel = NULL; + self->sample_func = sample_func; + self->sample_data = sample_data; return g_steal_pointer (&self); } @@ -201,8 +76,7 @@ sysprof_turbostat_finalize (gpointer data) sysprof_turbostat_stop (self); g_assert (self->pid == 0); - g_assert (self->stdin == NULL); - g_assert (self->stdout == NULL); + g_assert (self->channel == NULL); } void @@ -219,108 +93,20 @@ child_setup_cb (gpointer data) #endif } -gboolean -sysprof_turbostat_start (SysprofTurbostat *self, - GError **error) -{ - /* We use a long interval and just send \n to force a sample */ - static const gchar *argv[] = { "turbostat", "-T", "Celcius", "-i", "100000", NULL }; - g_auto(GStrv) env = NULL; - g_auto(PtyFd) stdin_master = PTY_FD_INVALID; - g_auto(PtyFd) stdin_slave = PTY_FD_INVALID; - g_auto(PtyFd) stdout_read = PTY_FD_INVALID; - g_auto(PtyFd) stdout_write = PTY_FD_INVALID; - gint pipes[2]; - gboolean ret; - - g_return_val_if_fail (self != NULL, FALSE); - g_return_val_if_fail (self->pid == 0, FALSE); - g_return_val_if_fail (self->stdin == NULL, FALSE); - g_return_val_if_fail (self->stdout == NULL, FALSE); - - env = g_get_environ (); - env = g_environ_setenv (env, "LANG", "C", TRUE); - - if (-1 == (stdin_master = pty_create_master ()) || - -1 == (stdin_slave = pty_create_slave (stdin_master, FALSE)) || - 0 != pipe2 (pipes, O_CLOEXEC | O_NONBLOCK)) - { - g_set_error (error, - G_FILE_ERROR, - g_file_error_from_errno (errno), - "%s", g_strerror (errno)); - return FALSE; - } - - stdout_read = pipes[0]; - stdout_write = pipes[1]; - - ret = g_spawn_async_with_fds (NULL, - (gchar **)argv, - env, - (G_SPAWN_SEARCH_PATH | G_SPAWN_STDERR_TO_DEV_NULL), - child_setup_cb, - NULL, - &self->pid, - stdin_slave, - stdout_write, - -1, - error); - - if (ret) - { - self->stdin = g_io_channel_unix_new (pty_fd_steal (&stdin_master)); - g_io_channel_set_close_on_unref (self->stdin, TRUE); - g_io_channel_set_buffer_size (self->stdin, 4096); - - self->stdout = g_io_channel_unix_new (pty_fd_steal (&stdout_read)); - g_io_channel_set_close_on_unref (self->stdout, TRUE); - g_io_channel_set_buffer_size (self->stdout, 4096); - g_io_channel_set_flags (self->stdout, G_IO_FLAG_NONBLOCK, NULL); - } - - return ret; -} - -void -sysprof_turbostat_stop (SysprofTurbostat *self) -{ - GPid pid; - - g_return_if_fail (self != NULL); - - if (self->pid == 0) - return; - - pid = self->pid; - self->pid = 0; - kill (pid, SIGTERM); - - g_clear_pointer (&self->stdin, g_io_channel_unref); - g_clear_pointer (&self->stdout, g_io_channel_unref); -} - -GArray * -sysprof_turbostat_sample (SysprofTurbostat *self, - GError **error) +static gboolean +sysprof_turbostat_watch_cb (GIOChannel *channel, + GIOCondition cond, + gpointer data) { + SysprofTurbostat *self = data; g_autoptr(GArray) ret = NULL; - g_auto(GStrv) columns = NULL; g_autoptr(GString) str = NULL; + g_auto(GStrv) columns = NULL; GIOStatus r; gint lineno = 0; - g_return_val_if_fail (self != NULL, NULL); - g_return_val_if_fail (self->stdin != NULL, NULL); - g_return_val_if_fail (self->stdout != NULL, NULL); - - r = g_io_channel_write_chars (self->stdin, "\n", 1, NULL, error) && - g_io_channel_flush (self->stdin, error); - if (r != G_IO_STATUS_NORMAL) - return NULL; - - /* Sleep for just a bit to wait for all results */ - g_usleep (G_USEC_PER_SEC * 0.01); + g_assert (channel != NULL); + g_assert (cond & G_IO_IN); ret = g_array_new (FALSE, FALSE, sizeof (SysprofTurbostatSample)); str = g_string_new (NULL); @@ -328,12 +114,13 @@ sysprof_turbostat_sample (SysprofTurbostat *self, for (;;) { SysprofTurbostatSample sample = {0}; + g_autoptr(GError) lerror = NULL; g_auto(GStrv) parts = NULL; gsize pos = 0; lineno++; - r = g_io_channel_read_line_string (self->stdout, str, &pos, NULL); + r = g_io_channel_read_line_string (self->channel, str, &pos, &lerror); if (r != G_IO_STATUS_NORMAL || str->len == 0 || pos == 0) break; @@ -347,7 +134,7 @@ sysprof_turbostat_sample (SysprofTurbostat *self, continue; } - g_return_val_if_fail (columns != NULL, NULL); + g_assert (columns != NULL); for (guint i = 0; columns[i] != NULL && parts[i] != NULL; i++) { @@ -423,5 +210,89 @@ sysprof_turbostat_sample (SysprofTurbostat *self, g_array_append_val (ret, sample); } - return g_steal_pointer (&ret); + if (ret->len > 0) + self->sample_func (ret, self->sample_data); + + return G_SOURCE_CONTINUE; +} + +gboolean +sysprof_turbostat_start (SysprofTurbostat *self, + GError **error) +{ + /* We use a long interval and kill(..., SIGUSR1) to force a sample */ + static const gchar *argv[] = { "turbostat", "-T", "Celcius", "-i", "100000", NULL }; + g_auto(GStrv) env = NULL; + gboolean ret; + gint stdout_fd = -1; + + g_return_val_if_fail (self != NULL, FALSE); + g_return_val_if_fail (self->pid == 0, FALSE); + g_return_val_if_fail (self->channel == NULL, FALSE); + + env = g_get_environ (); + env = g_environ_setenv (env, "LANG", "C", TRUE); + + ret = g_spawn_async_with_pipes (NULL, + (gchar **)argv, + env, + (G_SPAWN_SEARCH_PATH | G_SPAWN_STDERR_TO_DEV_NULL), + child_setup_cb, + NULL, + &self->pid, + NULL, + &stdout_fd, + NULL, + error); + + + if (ret) + { + if (!g_unix_set_fd_nonblocking (stdout_fd, TRUE, error)) + { + ret = FALSE; + close (stdout_fd); + } + + self->channel = g_io_channel_unix_new (stdout_fd); + g_io_channel_set_close_on_unref (self->channel, TRUE); + g_io_channel_set_buffer_size (self->channel, 4096); + g_io_channel_set_flags (self->channel, G_IO_FLAG_NONBLOCK, NULL); + self->channel_watch = + g_io_add_watch (self->channel, + G_IO_IN, + sysprof_turbostat_watch_cb, + self); + } + + return ret; +} + +void +sysprof_turbostat_stop (SysprofTurbostat *self) +{ + g_return_if_fail (self != NULL); + + if (self->pid != 0) + { + GPid pid = self->pid; + self->pid = 0; + kill (pid, SIGTERM); + } + + g_clear_handle_id (&self->channel_watch, g_source_remove); + g_clear_pointer (&self->channel, g_io_channel_unref); +} + +gboolean +sysprof_turbostat_sample (SysprofTurbostat *self, + GError **error) +{ + g_return_val_if_fail (self != NULL, FALSE); + g_return_val_if_fail (self->channel != NULL, FALSE); + g_return_val_if_fail (self->pid != 0, FALSE); + + kill (self->pid, SIGUSR1); + + return TRUE; } diff --git a/src/sysprofd/sysprof-turbostat.h b/src/sysprofd/sysprof-turbostat.h index f988e575..3b94e75b 100644 --- a/src/sysprofd/sysprof-turbostat.h +++ b/src/sysprofd/sysprof-turbostat.h @@ -36,11 +36,12 @@ typedef struct gdouble ram_watt; } SysprofTurbostatSample; -SysprofTurbostat *sysprof_turbostat_new (void); +SysprofTurbostat *sysprof_turbostat_new (GFunc sample_func, + gpointer sample_data); gboolean sysprof_turbostat_start (SysprofTurbostat *self, GError **error); void sysprof_turbostat_stop (SysprofTurbostat *self); -GArray *sysprof_turbostat_sample (SysprofTurbostat *self, +gboolean sysprof_turbostat_sample (SysprofTurbostat *self, GError **error); void sysprof_turbostat_free (SysprofTurbostat *self);