Skip to content

Commit

Permalink
out_s3: update for getting upload time from timestamp.
Browse files Browse the repository at this point in the history
Signed-off-by: Clay Cheng <[email protected]>
  • Loading branch information
Clay Cheng authored and Claych committed Nov 30, 2022
1 parent 760956f commit 8e8f3e7
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 32 deletions.
100 changes: 71 additions & 29 deletions plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ static int s3_put_object(struct flb_s3 *ctx, const char *tag, time_t create_time

static int put_all_chunks(struct flb_s3 *ctx);

static void cb_s3_upload(struct flb_config *ctx, void *data);
static void cb_s3_upload(struct flb_config *ctx, void *data, struct flb_time *tms);

static struct multipart_upload *get_upload(struct flb_s3 *ctx,
const char *tag, int tag_len);

static struct multipart_upload *create_upload(struct flb_s3 *ctx,
const char *tag, int tag_len);
const char *tag, int tag_len,
struct flb_time *tms);

static void remove_from_queue(struct upload_queue *entry);

Expand Down Expand Up @@ -946,7 +947,7 @@ static int cb_s3_init(struct flb_output_instance *ins,
* we don't need to worry if this fails; it will retry each
* time the upload callback is called
*/
cb_s3_upload(config, ctx);
cb_s3_upload(config, ctx, NULL);
}

if (ctx->use_put_object == FLB_TRUE) {
Expand Down Expand Up @@ -974,7 +975,8 @@ static int cb_s3_init(struct flb_output_instance *ins,
static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
struct multipart_upload *m_upload,
char *body, size_t body_size,
const char *tag, int tag_len)
const char *tag, int tag_len,
struct flb_time *tms)
{
int init_upload = FLB_FALSE;
int complete_upload = FLB_FALSE;
Expand Down Expand Up @@ -1055,8 +1057,12 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
if (chunk) {
create_time = chunk->create_time;
}
else {
else if (tms == NULL) {
create_time = time(NULL);
flb_plg_error(ctx->ins, "Failed to get timestamp used for the S3 key, "
"use the current time instead.");
} else {
create_time = tms->tm.tv_sec;
}

ret = s3_put_object(ctx, tag, create_time, body, body_size);
Expand All @@ -1081,7 +1087,7 @@ static int upload_data(struct flb_s3 *ctx, struct s3_file *chunk,
multipart:

if (init_upload == FLB_TRUE) {
m_upload = create_upload(ctx, tag, tag_len);
m_upload = create_upload(ctx, tag, tag_len, tms);
if (!m_upload) {
flb_plg_error(ctx->ins, "Could not find or create upload for tag %s", tag);
if (chunk) {
Expand Down Expand Up @@ -1509,8 +1515,8 @@ static struct multipart_upload *get_upload(struct flb_s3 *ctx,
return m_upload;
}

static struct multipart_upload *create_upload(struct flb_s3 *ctx,
const char *tag, int tag_len)
static struct multipart_upload *create_upload(struct flb_s3 *ctx, const char *tag,
int tag_len, struct flb_time *tms)
{
int ret;
struct multipart_upload *m_upload = NULL;
Expand Down Expand Up @@ -1540,7 +1546,13 @@ static struct multipart_upload *create_upload(struct flb_s3 *ctx,
m_upload->tag = tmp_sds;
m_upload->upload_state = MULTIPART_UPLOAD_STATE_NOT_CREATED;
m_upload->part_number = 1;
m_upload->init_time = time(NULL);
if (tms == NULL) {
m_upload->init_time = time(NULL);
flb_plg_error(ctx->ins, "Failed to get timestamp used for the S3 key, "
"use the current time instead.");
} else {
m_upload->init_time = tms->tm.tv_sec;
}
mk_list_add(&m_upload->_head, &ctx->uploads);

/* Update file and increment index value right before request */
Expand Down Expand Up @@ -1641,7 +1653,8 @@ static int upload_queue_valid(struct upload_queue *upload_contents, time_t now,
static int send_upload_request(void *out_context, flb_sds_t chunk,
struct s3_file *upload_file,
struct multipart_upload *m_upload_file,
const char *tag, int tag_len)
const char *tag, int tag_len,
struct flb_time *tms)
{
int ret;
char *buffer;
Expand All @@ -1658,19 +1671,22 @@ static int send_upload_request(void *out_context, flb_sds_t chunk,
}

/* Upload to S3 */
ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len);
ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len, tms);
flb_free(buffer);

return ret;
}

static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_t chunk,
int chunk_size, const char *tag, int tag_len)
static int buffer_chunk(void *out_context, struct s3_file *upload_file,
flb_sds_t chunk, int chunk_size,
const char *tag, int tag_len,
struct flb_time *tms)
{
int ret;
struct flb_s3 *ctx = out_context;

ret = s3_store_buffer_put(ctx, upload_file, tag, tag_len, chunk, (size_t) chunk_size);
ret = s3_store_buffer_put(ctx, upload_file, tag,
tag_len, chunk, (size_t) chunk_size, tms);
flb_sds_destroy(chunk);
if (ret < 0) {
flb_plg_warn(ctx->ins, "Could not buffer chunk. Data order preservation "
Expand All @@ -1681,7 +1697,8 @@ static int buffer_chunk(void *out_context, struct s3_file *upload_file, flb_sds_
}

/* Uploads all chunk files in queue synchronously */
static void s3_upload_queue(struct flb_config *config, void *out_context)
static void s3_upload_queue(struct flb_config *config, void *out_context,
struct flb_time *tms)
{
int ret;
int async_flags;
Expand All @@ -1697,7 +1714,7 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
if (mk_list_size(&ctx->upload_queue) == 0) {
flb_plg_debug(ctx->ins, "No files found in upload_queue. Scanning for timed "
"out chunks");
cb_s3_upload(config, out_context);
cb_s3_upload(config, out_context, tms);
}

/* upload timer must use sync mode */
Expand All @@ -1721,7 +1738,8 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
/* Try to upload file. Return value can be -1, FLB_OK, FLB_ERROR, FLB_RETRY. */
ret = send_upload_request(ctx, NULL, upload_contents->upload_file,
upload_contents->m_upload_file,
upload_contents->tag, upload_contents->tag_len);
upload_contents->tag, upload_contents->tag_len,
tms);
if (ret < 0) {
goto exit;
}
Expand Down Expand Up @@ -1761,7 +1779,7 @@ static void s3_upload_queue(struct flb_config *config, void *out_context)
}
}

static void cb_s3_upload(struct flb_config *config, void *data)
static void cb_s3_upload(struct flb_config *config, void *data, struct flb_time *tms)
{
struct flb_s3 *ctx = data;
struct s3_file *chunk = NULL;
Expand Down Expand Up @@ -1811,7 +1829,7 @@ static void cb_s3_upload(struct flb_config *config, void *data)

/* FYI: if construct_request_buffer() succeedeed, the s3_file is locked */
ret = upload_data(ctx, chunk, m_upload, buffer, buffer_size,
(const char *) fsf->meta_buf, fsf->meta_size);
(const char *) fsf->meta_buf, fsf->meta_size, tms);
flb_free(buffer);
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "Could not send chunk with tag %s",
Expand Down Expand Up @@ -2017,22 +2035,25 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, const char

static void unit_test_flush(void *out_context, struct s3_file *upload_file,
const char *tag, int tag_len, flb_sds_t chunk,
int chunk_size, struct multipart_upload *m_upload_file)
int chunk_size, struct multipart_upload *m_upload_file,
struct flb_time *tms)
{
int ret;
char *buffer;
size_t buffer_size;
struct flb_s3 *ctx = out_context;

s3_store_buffer_put(ctx, upload_file, tag, tag_len, chunk, (size_t) chunk_size);
s3_store_buffer_put(ctx, upload_file, tag, tag_len, chunk, (size_t) chunk_size, tms);
ret = construct_request_buffer(ctx, chunk, upload_file, &buffer, &buffer_size);
if (ret < 0) {
flb_plg_error(ctx->ins, "Could not construct request buffer for %s",
upload_file->file_path);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

ret = upload_data(ctx, upload_file, m_upload_file, buffer, buffer_size, tag, tag_len);
ret = upload_data(ctx, upload_file, m_upload_file,
buffer, buffer_size, tag,
tag_len, tms);
flb_free(buffer);

FLB_OUTPUT_RETURN(ret);
Expand Down Expand Up @@ -2104,10 +2125,29 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
struct s3_file *upload_file = NULL;
struct flb_s3 *ctx = out_context;
struct multipart_upload *m_upload_file = NULL;
msgpack_unpacked result;
msgpack_object *obj;
size_t off = 0;
struct flb_time tms;

/* Cleanup old buffers and initialize upload timer */
flush_init(ctx);

/* unpack msgpack */
msgpack_unpacked_init(&result);

/* Get the first record timestamp */
while (msgpack_unpack_next(&result,
event_chunk->data,
event_chunk->size, &off) == MSGPACK_UNPACK_SUCCESS) {
flb_time_pop_from_msgpack(&tms, &result, &obj);
if (&tms.tm.tv_sec != 0) {
break;
}
}

msgpack_unpacked_destroy(&result);

/* Process chunk */
if (ctx->log_key) {
chunk = flb_pack_msgpack_extract_log_key(ctx,
Expand Down Expand Up @@ -2136,7 +2176,8 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
if (s3_plugin_under_test() == FLB_TRUE) {
unit_test_flush(ctx, upload_file,
event_chunk->tag, flb_sds_len(event_chunk->tag),
chunk, chunk_size, m_upload_file);
chunk, chunk_size,
m_upload_file, &tms);
}

/* Discard upload_file if it has failed to upload MAX_UPLOAD_ERRORS times */
Expand Down Expand Up @@ -2175,7 +2216,8 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
if (ctx->preserve_data_ordering == FLB_TRUE) {
/* Buffer last chunk in file and lock file to prevent further changes */
ret = buffer_chunk(ctx, upload_file, chunk, chunk_size,
event_chunk->tag, flb_sds_len(event_chunk->tag));
event_chunk->tag, flb_sds_len(event_chunk->tag),
&tms);
if (ret < 0) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
Expand All @@ -2189,7 +2231,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
}

/* Go through upload queue and return error if something went wrong */
s3_upload_queue(config, ctx);
s3_upload_queue(config, ctx, &tms);
if (ctx->upload_queue_success == FLB_FALSE) {
ctx->upload_queue_success = FLB_TRUE;
FLB_OUTPUT_RETURN(FLB_ERROR);
Expand All @@ -2198,9 +2240,9 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,
}
else {
/* Send upload directly without upload queue */
ret = send_upload_request(ctx, chunk, upload_file, m_upload_file,
event_chunk->tag,
flb_sds_len(event_chunk->tag));
ret = send_upload_request(ctx, chunk, upload_file,
m_upload_file,event_chunk->tag,
flb_sds_len(event_chunk->tag), &tms);
if (ret < 0) {
FLB_OUTPUT_RETURN(FLB_ERROR);
}
Expand All @@ -2210,7 +2252,7 @@ static void cb_s3_flush(struct flb_event_chunk *event_chunk,

/* Buffer current chunk in filesystem and wait for next chunk from engine */
ret = buffer_chunk(ctx, upload_file, chunk, chunk_size,
event_chunk->tag, flb_sds_len(event_chunk->tag));
event_chunk->tag, flb_sds_len(event_chunk->tag), &tms);
if (ret < 0) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
Expand Down
11 changes: 9 additions & 2 deletions plugins/out_s3/s3_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ struct s3_file *s3_store_file_get(struct flb_s3 *ctx, const char *tag,
/* Append data to a new or existing fstore file */
int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
const char *tag, int tag_len,
char *data, size_t bytes)
char *data, size_t bytes,
struct flb_time *tms)
{
int ret;
flb_sds_t name;
Expand Down Expand Up @@ -175,7 +176,13 @@ int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
return -1;
}
s3_file->fsf = fsf;
s3_file->create_time = time(NULL);
if (tms == NULL) {
s3_file->create_time = time(NULL);
flb_plg_error(ctx->ins, "Failed to get timestamp used for the S3 key, "
"use the current time instead.");
} else {
s3_file->create_time = tms->tm.tv_sec;
}

/* Use fstore opaque 'data' reference to keep our context */
fsf->data = s3_file;
Expand Down
3 changes: 2 additions & 1 deletion plugins/out_s3/s3_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ struct s3_file {

int s3_store_buffer_put(struct flb_s3 *ctx, struct s3_file *s3_file,
const char *tag, int tag_len,
char *data, size_t bytes);
char *data, size_t bytes,
struct flb_time *tms);

int s3_store_init(struct flb_s3 *ctx);
int s3_store_exit(struct flb_s3 *ctx);
Expand Down

0 comments on commit 8e8f3e7

Please sign in to comment.