Skip to content

Commit

Permalink
fetcher: Drop the libsoup queue
Browse files Browse the repository at this point in the history
Now that we have queuing in the higher level pull logic, we don't
need to do this anymore.

It's tempting to keep it since the code diff is so small (without
completely rewriting things), but dropping it here will make
it easier to see when things go wrong at a higher level.

Note that I kept an assertion.

Closes: ostreedev#654
Approved by: jlebon
  • Loading branch information
cgwalters authored and rh-atomic-bot committed Feb 7, 2017
1 parent c18628e commit f4d1334
Showing 1 changed file with 22 additions and 58 deletions.
80 changes: 22 additions & 58 deletions src/libostree/ostree-fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ typedef struct {
GVariant *extra_headers;
int max_outstanding;

/* Queue for libsoup, see bgo#708591 */
GQueue pending_queue;
/* Our active HTTP requests */
GHashTable *outstanding;

/* Shared across threads; be sure to lock. */
Expand All @@ -77,9 +76,6 @@ typedef struct {

} ThreadClosure;

static void
session_thread_process_pending_queue (ThreadClosure *thread_closure);

typedef struct {
volatile int ref_count;

Expand Down Expand Up @@ -187,18 +183,6 @@ idle_closure_free (IdleClosure *idle_closure)
g_slice_free (IdleClosure, idle_closure);
}

static int
pending_task_compare (gconstpointer a,
gconstpointer b,
gpointer unused)
{
gint priority_a = g_task_get_priority (G_TASK (a));
gint priority_b = g_task_get_priority (G_TASK (b));

return (priority_a == priority_b) ? 0 :
(priority_a < priority_b) ? -1 : 1;
}

static OstreeFetcherPendingURI *
pending_uri_ref (OstreeFetcherPendingURI *pending)
{
Expand Down Expand Up @@ -403,30 +387,23 @@ static void
on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);

static void
session_thread_process_pending_queue (ThreadClosure *thread_closure)
start_pending_request (ThreadClosure *thread_closure,
GTask *task)
{

while (g_queue_peek_head (&thread_closure->pending_queue) != NULL &&
g_hash_table_size (thread_closure->outstanding) < thread_closure->max_outstanding)
{
GTask *task;
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;

task = g_queue_pop_head (&thread_closure->pending_queue);

pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);
OstreeFetcherPendingURI *pending;
GCancellable *cancellable;

g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
g_assert_cmpint (g_hash_table_size (thread_closure->outstanding), <, thread_closure->max_outstanding);

soup_request_send_async (pending->request,
cancellable,
on_request_sent,
g_object_ref (task));
pending = g_task_get_task_data (task);
cancellable = g_task_get_cancellable (task);

g_object_unref (task);
}
g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
soup_request_send_async (pending->request,
cancellable,
on_request_sent,
g_object_ref (task));
}

static void
Expand Down Expand Up @@ -547,10 +524,7 @@ session_thread_request_uri (ThreadClosure *thread_closure,
pending->out_tmpfile = tmpfile;
tmpfile = NULL; /* Transfer ownership */

g_queue_insert_sorted (&thread_closure->pending_queue,
g_object_ref (task),
pending_task_compare, NULL);
session_thread_process_pending_queue (thread_closure);
start_pending_request (thread_closure, task);
}
}

Expand Down Expand Up @@ -600,8 +574,6 @@ ostree_fetcher_session_thread (gpointer data)
* unreference all data related to the SoupSession ourself to ensure
* it's freed in the same thread where it was created. */
g_clear_pointer (&closure->outstanding, g_hash_table_unref);
while (!g_queue_is_empty (&closure->pending_queue))
g_object_unref (g_queue_pop_head (&closure->pending_queue));
g_clear_pointer (&closure->session, g_object_unref);

thread_closure_unref (closure);
Expand Down Expand Up @@ -903,11 +875,6 @@ finish_stream (OstreeFetcherPendingURI *pending,

pending->state = OSTREE_FETCHER_STATE_COMPLETE;

/* Now that we've finished downloading, continue with other queued
* requests.
*/
session_thread_process_pending_queue (pending->thread_closure);

if (!pending->is_membuf)
{
if (stbuf.st_size < pending->content_length)
Expand Down Expand Up @@ -935,14 +902,13 @@ on_stream_read (GObject *object,
gpointer user_data);

static void
remove_pending_rerun_queue (OstreeFetcherPendingURI *pending)
remove_pending (OstreeFetcherPendingURI *pending)
{
/* Hold a temporary ref to ensure the reference to
* pending->thread_closure is valid.
*/
pending_uri_ref (pending);
g_hash_table_remove (pending->thread_closure->outstanding, pending);
session_thread_process_pending_queue (pending->thread_closure);
pending_uri_unref (pending);
}

Expand Down Expand Up @@ -976,7 +942,7 @@ on_out_splice_complete (GObject *object,
if (local_error)
{
g_task_return_error (task, local_error);
remove_pending_rerun_queue (pending);
remove_pending (pending);
}

g_object_unref (task);
Expand Down Expand Up @@ -1018,7 +984,7 @@ on_stream_read (GObject *object,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
}
remove_pending_rerun_queue (pending);
remove_pending (pending);
}
else
{
Expand Down Expand Up @@ -1057,7 +1023,7 @@ on_stream_read (GObject *object,
if (local_error)
{
g_task_return_error (task, local_error);
remove_pending_rerun_queue (pending);
remove_pending (pending);
}

g_object_unref (task);
Expand Down Expand Up @@ -1096,7 +1062,7 @@ on_request_sent (GObject *object,
g_task_return_pointer (task,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
remove_pending_rerun_queue (pending);
remove_pending (pending);
goto out;
}
else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
Expand All @@ -1110,10 +1076,8 @@ on_request_sent (GObject *object,
goto out;

(void) g_input_stream_close (pending->request_body, NULL, NULL);
g_queue_insert_sorted (&pending->thread_closure->pending_queue,
g_object_ref (task), pending_task_compare,
NULL);
remove_pending_rerun_queue (pending);

start_pending_request (pending->thread_closure, task);
}
else
{
Expand Down Expand Up @@ -1204,7 +1168,7 @@ on_request_sent (GObject *object,
if (pending->request_body)
(void) g_input_stream_close (pending->request_body, NULL, NULL);
g_task_return_error (task, local_error);
remove_pending_rerun_queue (pending);
remove_pending (pending);
}

g_object_unref (task);
Expand Down

0 comments on commit f4d1334

Please sign in to comment.