From c0bc805cc3409baa99e50831529a7b1c1aba9e59 Mon Sep 17 00:00:00 2001 From: Peter Portante Date: Sun, 30 May 2021 23:35:28 -0400 Subject: [PATCH] Implement proposed `k8s-stream-file` format Instead of parsing the contents of the data read from the `stdout` and `stderr` pipes, this commit adds support for a "stream" format, named `k8s-stream-file`, which just records what is read from a pipe to disk. It significantly saves on CPU spend processing the buffer read, uses only 2 I/O vectors, and never touches the memory read from the pipe. This is an updated implementation of https://github.com/cri-o/cri-o/pull/1605. --- src/ctr_logging.c | 140 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 135 insertions(+), 5 deletions(-) diff --git a/src/ctr_logging.c b/src/ctr_logging.c index c3fd5d25..27c3416a 100644 --- a/src/ctr_logging.c +++ b/src/ctr_logging.c @@ -24,9 +24,11 @@ static inline int sd_journal_sendv(G_GNUC_UNUSED const struct iovec *iov, G_GNUC /* Different types of container logging */ static gboolean use_journald_logging = FALSE; static gboolean use_k8s_logging = FALSE; +static gboolean use_k8s_stream_logging = FALSE; /* Value the user must input for each log driver */ static const char *const K8S_FILE_STRING = "k8s-file"; +static const char *const K8S_STREAM_FILE_STRING = "k8s-stream-file"; static const char *const JOURNALD_FILE_STRING = "journald"; /* Max log size for any log file types */ @@ -64,6 +66,7 @@ static void parse_log_path(char *log_config); static const char *stdpipe_name(stdpipe_t pipe); static int write_journald(int pipe, char *buf, ssize_t num_read); static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen); +static int write_k8s_stream_log(stdpipe_t pipe, const char *buf, ssize_t buflen); static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen); static ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *data, ssize_t len); static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf); @@ -134,9 +137,10 @@ void configure_log_drivers(gchar **log_drivers, int64_t log_size_max_, char *cuu * parse_log_path branches on log driver type the user inputted. * log_config will either be a ':' delimited string containing: * : or - * in the case of no colon, the driver will be kubernetes-log-file, + * in the case the log driver is 'k8s-stream-file', the must be present. + * in the case of no colon, the driver will be k8s-file, * in the case the log driver is 'journald', the is ignored. - * exits with error if isn't 'journald' or 'kubernetes-log-file' + * exits with error if isn't 'journald' or 'k8s-file' */ static void parse_log_path(char *log_config) { @@ -165,6 +169,17 @@ static void parse_log_path(char *log_config) return; } + // Driver is k8s-file or empty + if (!strcmp(driver, K8S_STREAM_FILE_STRING)) { + if (path == NULL) { + nexitf("k8s-stream-file requires a filename"); + } + use_k8s_logging = TRUE; + use_k8s_stream_logging = TRUE; + k8s_log_path = path; + return; + } + // Driver is k8s-file or empty if (!strcmp(driver, K8S_FILE_STRING)) { if (path == NULL) { @@ -188,9 +203,19 @@ static void parse_log_path(char *log_config) /* write container output to all logs the user defined */ bool write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read) { - if (use_k8s_logging && write_k8s_log(pipe, buf, num_read) < 0) { - nwarn("write_k8s_log failed"); - return G_SOURCE_CONTINUE; + if (use_k8s_logging) { + if (use_k8s_stream_logging) { + if (write_k8s_log(pipe, buf, num_read) < 0) { + nwarn("write_k8s_log failed"); + return G_SOURCE_CONTINUE; + } + } + else { + if (write_k8s_stream_log(pipe, buf, num_read) < 0) { + nwarn("write_k8s_stream_log failed"); + return G_SOURCE_CONTINUE; + } + } } if (use_journald_logging && write_journald(pipe, buf, num_read) < 0) { nwarn("write_journald failed"); @@ -540,3 +565,108 @@ void sync_logs(void) if (fsync(k8s_log_fd) < 0) pwarn("Failed to sync log file before exit"); } + + +/* strlen("1997-03-25T13:20:42.999999999+01:00 stdout 9999999999 999999999 ") + 1 */ +#define TSSTREAMBUFLEN 128 + +/* + * PROPOSED: CRI Stream Format, variable length file format + */ +static int set_k8s_stream_timestamp(char *buf, ssize_t bufsiz, ssize_t *tsbuflen, const char *pipename, uint64_t offset, ssize_t buflen, ssize_t *btbw) +{ + struct tm *tm; + struct timespec ts; + char off_sign = '+'; + int off, len, err = -1; + + if (clock_gettime(CLOCK_REALTIME, &ts) < 0) { + /* If CLOCK_REALTIME is not supported, we set nano seconds to 0 */ + if (errno == EINVAL) { + ts.tv_nsec = 0; + } else { + return err; + } + } + + if ((tm = localtime(&ts.tv_sec)) == NULL) + return err; + + off = (int)tm->tm_gmtoff; + if (tm->tm_gmtoff < 0) { + off_sign = '-'; + off = -off; + } + + len = snprintf(buf, bufsiz, "%d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %s %lud %ld ", tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, + tm->tm_hour, tm->tm_min, tm->tm_sec, ts.tv_nsec, off_sign, off / 3600, off % 3600, pipename, offset, buflen); + + if (len < bufsiz) + err = 0; + + *tsbuflen = len; + *btbw = len + buflen; + return err; +} + + +/* + * PROPOSED: CRI Stream Format, variable length file format + * + * %d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %(stream)s %(offset)lud %(buflen)ld %(buf)s + * + * The CRI stream fromat requires us to write each buffer read with a + * timestamp, stream, length (human readable ascii), and the buffer contents + * read (with a space character separating the buffer length string from the + * buffer. + */ +static int write_k8s_stream_log(stdpipe_t pipe, const char *buf, ssize_t buflen) +{ + writev_buffer_t bufv = {0}; + char tsbuf[TSSTREAMBUFLEN]; + static ssize_t bytes_written = 0; + static uint64_t offset = 0; + ssize_t bytes_to_be_written = 0; + ssize_t tsbuflen = 0; + + /* + * Use the same timestamp for every line of the log in this buffer. + * There is no practical difference in the output since write(2) is + * fast. + */ + if (set_k8s_stream_timestamp(tsbuf, sizeof tsbuf, &tsbuflen, stdpipe_name(pipe), buflen, offset, &bytes_to_be_written)) + /* TODO: We should handle failures much more cleanly than this. */ + return -1; + + /* + * We re-open the log file if writing out the bytes will exceed the max + * log size. We also reset the state so that the new file is started with + * a timestamp. + */ + if ((opt_log_size_max > 0) && (bytes_written + bytes_to_be_written) > opt_log_size_max) { + bytes_written = 0; + + reopen_k8s_file(); + } + + /* Output the timestamp, stream, and length */ + if (writev_buffer_append_segment(k8s_log_fd, &bufv, tsbuf, tsbuflen) < 0) { + nwarn("failed to write (timestamp, stream) to log"); + goto stream_next; + } + + /* Output the actual contents. */ + if (writev_buffer_append_segment(k8s_log_fd, &bufv, buf, buflen) < 0) { + nwarn("failed to write buffer to log"); + } + +stream_next: + bytes_written += bytes_to_be_written; + offset += (uint64_t)bytes_to_be_written; + + if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) { + nwarn("failed to flush buffer to log"); + } + + return 0; +}