Skip to content

Commit

Permalink
core: sync scheduler resolve multi task user issue
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Fala <[email protected]>
  • Loading branch information
matthewfala authored and leonardo-albertovich committed Apr 13, 2023
1 parent 9efd353 commit ed67f03
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ static int flb_output_task_queue_flush_one(struct flb_task_queue *queue)
queued_task = mk_list_entry_first(&queue->pending, struct flb_task_enqueued, _head);
mk_list_del(&queued_task->_head);
mk_list_add(&queued_task->_head, &queue->in_progress);

/*
* Remove temporary user now that task is out of singleplex queue.
* Flush will add back the user representing queued_task->out_instance if it succeeds.
*/
flb_task_users_dec(queued_task->task, FLB_FALSE);
ret = flb_output_task_flush(queued_task->task,
queued_task->out_instance,
queued_task->config);
Expand Down Expand Up @@ -251,6 +257,16 @@ int flb_output_task_singleplex_enqueue(struct flb_task_queue *queue,
int ret;
int is_empty;

/*
* Add temporary user to preserve task while in singleplex queue.
* Temporary user will be removed when task is removed from queue.
*
* Note: if we fail to increment now, then the task may be prematurely
* deleted if the task's users go to 0 while we are waiting in the
* queue.
*/
flb_task_users_inc(task);

/* Enqueue task */
ret = flb_output_task_queue_enqueue(queue, retry, task, out_ins, config);
if (ret == -1) {
Expand Down

0 comments on commit ed67f03

Please sign in to comment.