Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement proposed k8s-stream-file format #265

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 136 additions & 5 deletions src/ctr_logging.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ static inline int sd_journal_sendv(G_GNUC_UNUSED const struct iovec *iov, G_GNUC
/* Different types of container logging */
static gboolean use_journald_logging = FALSE;
static gboolean use_k8s_logging = FALSE;
static gboolean use_k8s_stream_logging = FALSE;

/* Value the user must input for each log driver */
static const char *const K8S_FILE_STRING = "k8s-file";
static const char *const K8S_STREAM_FILE_STRING = "k8s-stream-file";
static const char *const JOURNALD_FILE_STRING = "journald";

/* Max log size for any log file types */
Expand Down Expand Up @@ -64,6 +66,7 @@ static void parse_log_path(char *log_config);
static const char *stdpipe_name(stdpipe_t pipe);
static int write_journald(int pipe, char *buf, ssize_t num_read);
static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen);
static int write_k8s_stream_log(stdpipe_t pipe, const char *buf, ssize_t buflen);
static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen);
static ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *data, ssize_t len);
static ssize_t writev_buffer_flush(int fd, writev_buffer_t *buf);
Expand Down Expand Up @@ -134,9 +137,10 @@ void configure_log_drivers(gchar **log_drivers, int64_t log_size_max_, char *cuu
* parse_log_path branches on log driver type the user inputted.
* log_config will either be a ':' delimited string containing:
* <DRIVER_NAME>:<PATH_NAME> or <PATH_NAME>
* in the case of no colon, the driver will be kubernetes-log-file,
* in the case the log driver is 'k8s-stream-file', the <PATH_NAME> must be present.
* in the case of no colon, the driver will be k8s-file,
* in the case the log driver is 'journald', the <PATH_NAME> is ignored.
* exits with error if <DRIVER_NAME> isn't 'journald' or 'kubernetes-log-file'
* exits with error if <DRIVER_NAME> isn't 'journald', 'k8s-file', or 'k8s-stream-file'.
*/
static void parse_log_path(char *log_config)
{
Expand Down Expand Up @@ -165,6 +169,17 @@ static void parse_log_path(char *log_config)
return;
}

// Driver is k8s-file, k8s-stream-file, or empty
if (!strcmp(driver, K8S_STREAM_FILE_STRING)) {
if (path == NULL) {
nexitf("k8s-stream-file requires a filename");
}
use_k8s_logging = TRUE;
use_k8s_stream_logging = TRUE;
k8s_log_path = path;
return;
}

// Driver is k8s-file or empty
if (!strcmp(driver, K8S_FILE_STRING)) {
if (path == NULL) {
Expand All @@ -188,9 +203,18 @@ static void parse_log_path(char *log_config)
/* write container output to all logs the user defined */
bool write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read)
{
if (use_k8s_logging && write_k8s_log(pipe, buf, num_read) < 0) {
nwarn("write_k8s_log failed");
return G_SOURCE_CONTINUE;
if (use_k8s_logging) {
if (use_k8s_stream_logging) {
if (write_k8s_log(pipe, buf, num_read) < 0) {
nwarn("write_k8s_log failed");
return G_SOURCE_CONTINUE;
}
} else {
if (write_k8s_stream_log(pipe, buf, num_read) < 0) {
nwarn("write_k8s_stream_log failed");
return G_SOURCE_CONTINUE;
}
}
}
if (use_journald_logging && write_journald(pipe, buf, num_read) < 0) {
nwarn("write_journald failed");
Expand Down Expand Up @@ -540,3 +564,110 @@ void sync_logs(void)
if (fsync(k8s_log_fd) < 0)
pwarn("Failed to sync log file before exit");
}


/* strlen("1997-03-25T13:20:42.999999999+01:00 stdout 9999999999 999999999 ") + 1 */
#define TSSTREAMBUFLEN 128

/*
* PROPOSED: CRI Stream Format, variable length file format
*/
static int set_k8s_stream_timestamp(char *buf, ssize_t bufsiz, ssize_t *tsbuflen, const char *pipename, uint64_t offset, ssize_t buflen,
ssize_t *btbw)
{
char off_sign = '+';
int off, len, err = -1;

struct timespec ts;
if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
/* If CLOCK_REALTIME is not supported, we set nano seconds to 0 */
if (errno == EINVAL) {
ts.tv_nsec = 0;
} else {
return err;
}
}

struct tm current_tm;
if (localtime_r(&ts.tv_sec, &current_tm) == NULL)
return err;

off = (int)current_tm.tm_gmtoff;
if (current_tm.tm_gmtoff < 0) {
off_sign = '-';
off = -off;
}

len = snprintf(buf, bufsiz, "%d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %s %lud %ld ", current_tm.tm_year + 1900,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(A drive-by comment with little context:)

If the goal is to offload the CPU processing to consumers, shouldn’t this just be a timestamp instead of all the timezone lookups and formatting?

Either way, UTC would be better than ambiguous local time — does this one need custom parser code to get a struct timespec back?

current_tm.tm_mon + 1, current_tm.tm_mday, current_tm.tm_hour, current_tm.tm_min, current_tm.tm_sec, ts.tv_nsec,
off_sign, off / 3600, off % 3600, pipename, offset, buflen);

if (len < bufsiz)
err = 0;

*tsbuflen = len;
*btbw = len + buflen;
return err;
}


/*
* PROPOSED: CRI Stream Format, variable length file format
*
* %d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %(stream)s %(offset)lud %(buflen)ld %(buf)s
*
* The CRI stream fromat requires us to write each buffer read with a
* timestamp, stream, length (human readable ascii), and the buffer contents
* read (with a space character separating the buffer length string from the
* buffer.
*/
static int write_k8s_stream_log(stdpipe_t pipe, const char *buf, ssize_t buflen)
{
writev_buffer_t bufv = {0};
char tsbuf[TSSTREAMBUFLEN];
static ssize_t bytes_written = 0;
static uint64_t offset = 0;
ssize_t bytes_to_be_written = 0;
ssize_t tsbuflen = 0;

/*
* Use the same timestamp for every line of the log in this buffer.
* There is no practical difference in the output since write(2) is
* fast.
*/
if (set_k8s_stream_timestamp(tsbuf, sizeof tsbuf, &tsbuflen, stdpipe_name(pipe), buflen, offset, &bytes_to_be_written))
/* TODO: We should handle failures much more cleanly than this. */
return -1;

/*
* We re-open the log file if writing out the bytes will exceed the max
* log size. We also reset the state so that the new file is started with
* a timestamp.
*/
if ((opt_log_size_max > 0) && (bytes_written + bytes_to_be_written) > opt_log_size_max) {
bytes_written = 0;

reopen_k8s_file();
}

/* Output the timestamp, stream, and length */
if (writev_buffer_append_segment(k8s_log_fd, &bufv, tsbuf, tsbuflen) < 0) {
nwarn("failed to write (timestamp, stream) to log");
goto stream_next;
}

/* Output the actual contents. */
if (writev_buffer_append_segment(k8s_log_fd, &bufv, buf, buflen) < 0) {
nwarn("failed to write buffer to log");
}

stream_next:
bytes_written += bytes_to_be_written;
offset += (uint64_t)bytes_to_be_written;

if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) {
nwarn("failed to flush buffer to log");
}

return 0;
}