Skip to content

Commit

Permalink
pull: Rework delta superblock fetches to be async
Browse files Browse the repository at this point in the history
For the pending libcurl port, the backend is a bit more
sensitive to the main context setup.  The delta superblock
fetch here is a synchronous request in the section that's
supposed to be async.

Now, libsoup definitely supports mixing sync and async requests, but it wasn't
hard to help the libcurl port here by making this one async. Now fetchers are
either sync or async.

Closes: #636
Approved by: jlebon
  • Loading branch information
cgwalters authored and rh-atomic-bot committed Jan 4, 2017
1 parent ced22f6 commit d9f43cd
Showing 1 changed file with 120 additions and 98 deletions.
218 changes: 120 additions & 98 deletions src/libostree/ostree-repo-pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ typedef struct {
gint n_scanned_metadata;

gboolean gpg_verify;
gboolean require_static_deltas;
gboolean gpg_verify_summary;
gboolean has_tombstone_commits;

Expand Down Expand Up @@ -178,6 +179,10 @@ update_progress (gpointer user_data)
if (! pull_data->progress)
return FALSE;

/* In dry run, we only emit progress once metadata is done */
if (pull_data->dry_run && pull_data->n_outstanding_metadata_fetches > 0)
return TRUE;

outstanding_writes = pull_data->n_outstanding_content_write_requests +
pull_data->n_outstanding_metadata_write_requests +
pull_data->n_outstanding_deltapart_write_requests;
Expand Down Expand Up @@ -1416,75 +1421,6 @@ load_remote_repo_config (OtPullData *pull_data,
return ret;
}

static gboolean
request_static_delta_superblock_sync (OtPullData *pull_data,
const char *from_revision,
const char *to_revision,
GVariant **out_delta_superblock,
GCancellable *cancellable,
GError **error)
{
gboolean ret = FALSE;
g_autoptr(GVariant) ret_delta_superblock = NULL;
g_autofree char *delta_name =
_ostree_get_relative_static_delta_superblock_path (from_revision, to_revision);
g_autoptr(GBytes) delta_superblock_data = NULL;

if (!_ostree_fetcher_mirrored_request_to_membuf (pull_data->fetcher,
pull_data->content_mirrorlist,
delta_name, FALSE, TRUE,
&delta_superblock_data,
OSTREE_MAX_METADATA_SIZE,
pull_data->cancellable, error))
goto out;

if (delta_superblock_data)
{
{
g_autofree gchar *delta = NULL;
g_autofree guchar *ret_csum = NULL;
guchar *summary_csum;
g_autoptr (GInputStream) summary_is = NULL;

summary_is = g_memory_input_stream_new_from_data (g_bytes_get_data (delta_superblock_data, NULL),
g_bytes_get_size (delta_superblock_data),
NULL);

if (!ot_gio_checksum_stream (summary_is, &ret_csum, cancellable, error))
goto out;

delta = g_strconcat (from_revision ? from_revision : "", from_revision ? "-" : "", to_revision, NULL);
summary_csum = g_hash_table_lookup (pull_data->summary_deltas_checksums, delta);

/* At this point we've GPG verified the data, so in theory
* could trust that they provided the right data, but let's
* make this a hard error.
*/
if (pull_data->gpg_verify_summary && !summary_csum)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"GPG verification enabled, but no summary signatures found (use gpg-verify-summary=false in remote config to disable)");
goto out;
}

if (summary_csum && memcmp (summary_csum, ret_csum, 32))
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Invalid checksum for static delta %s", delta);
goto out;
}
}

ret_delta_superblock = g_variant_ref_sink (g_variant_new_from_bytes ((GVariantType*)OSTREE_STATIC_DELTA_SUPERBLOCK_FORMAT,
delta_superblock_data, FALSE));
}

ret = TRUE;
if (out_delta_superblock)
*out_delta_superblock = g_steal_pointer (&ret_delta_superblock);
out:
return ret;
}

static gboolean
process_one_static_delta_fallback (OtPullData *pull_data,
gboolean delta_byteswap,
Expand Down Expand Up @@ -1749,6 +1685,100 @@ process_one_static_delta (OtPullData *pull_data,
return ret;
}

typedef struct {
OtPullData *pull_data;
char *from_revision;
char *to_revision;
} FetchDeltaSuperData;

static void
on_superblock_fetched (GObject *src,
GAsyncResult *res,
gpointer data)

{
FetchDeltaSuperData *fdata = data;
OtPullData *pull_data = fdata->pull_data;
GError *local_error = NULL;
GError **error = &local_error;
g_autoptr(GBytes) delta_superblock_data = NULL;
const char *from_revision = fdata->from_revision;
const char *to_revision = fdata->to_revision;

if (!_ostree_fetcher_request_to_membuf_finish ((OstreeFetcher*)src,
res,
&delta_superblock_data,
error))
{
if (!g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND))
goto out;

g_clear_error (&local_error);

if (pull_data->require_static_deltas)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Static deltas required, but none found for %s to %s",
from_revision, to_revision);
goto out;
}

queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0);
}
else
{
g_autofree gchar *delta = NULL;
g_autofree guchar *ret_csum = NULL;
guchar *summary_csum;
g_autoptr (GInputStream) summary_is = NULL;
g_autoptr(GVariant) delta_superblock = NULL;

summary_is = g_memory_input_stream_new_from_data (g_bytes_get_data (delta_superblock_data, NULL),
g_bytes_get_size (delta_superblock_data),
NULL);

if (!ot_gio_checksum_stream (summary_is, &ret_csum, pull_data->cancellable, error))
goto out;

delta = g_strconcat (from_revision ? from_revision : "", from_revision ? "-" : "", to_revision, NULL);
summary_csum = g_hash_table_lookup (pull_data->summary_deltas_checksums, delta);

/* At this point we've GPG verified the data, so in theory
* could trust that they provided the right data, but let's
* make this a hard error.
*/
if (pull_data->gpg_verify_summary && !summary_csum)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"GPG verification enabled, but no summary signatures found (use gpg-verify-summary=false in remote config to disable)");
goto out;
}

if (summary_csum && memcmp (summary_csum, ret_csum, 32))
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "Invalid checksum for static delta %s", delta);
goto out;
}

delta_superblock = g_variant_ref_sink (g_variant_new_from_bytes ((GVariantType*)OSTREE_STATIC_DELTA_SUPERBLOCK_FORMAT,
delta_superblock_data, FALSE));

g_ptr_array_add (pull_data->static_delta_superblocks, g_variant_ref (delta_superblock));
if (!process_one_static_delta (pull_data, from_revision, to_revision, delta_superblock,
pull_data->cancellable, error))
goto out;
}

out:
g_free (fdata->from_revision);
g_free (fdata->to_revision);
g_free (fdata);
g_assert (pull_data->n_outstanding_metadata_fetches > 0);
pull_data->n_outstanding_metadata_fetches--;
pull_data->n_fetched_metadata++;
check_outstanding_requests_handle_error (pull_data, local_error);
}

static gboolean
validate_variant_is_csum (GVariant *csum,
GError **error)
Expand Down Expand Up @@ -2344,7 +2374,6 @@ ostree_repo_pull_with_options (OstreeRepo *self,
g_autofree char **override_commit_ids = NULL;
GSource *update_timeout = NULL;
gboolean disable_static_deltas = FALSE;
gboolean require_static_deltas = FALSE;
gboolean opt_gpg_verify_set = FALSE;
gboolean opt_gpg_verify_summary_set = FALSE;
const char *url_override = NULL;
Expand All @@ -2368,7 +2397,7 @@ ostree_repo_pull_with_options (OstreeRepo *self,
g_variant_lookup (options, "gpg-verify-summary", "b", &pull_data->gpg_verify_summary);
(void) g_variant_lookup (options, "depth", "i", &pull_data->maxdepth);
(void) g_variant_lookup (options, "disable-static-deltas", "b", &disable_static_deltas);
(void) g_variant_lookup (options, "require-static-deltas", "b", &require_static_deltas);
(void) g_variant_lookup (options, "require-static-deltas", "b", &pull_data->require_static_deltas);
(void) g_variant_lookup (options, "override-commit-ids", "^a&s", &override_commit_ids);
(void) g_variant_lookup (options, "dry-run", "b", &pull_data->dry_run);
(void) g_variant_lookup (options, "override-url", "&s", &url_override);
Expand All @@ -2386,11 +2415,11 @@ ostree_repo_pull_with_options (OstreeRepo *self,
for (i = 0; dirs_to_pull != NULL && dirs_to_pull[i] != NULL; i++)
g_return_val_if_fail (dirs_to_pull[i][0] == '/', FALSE);

g_return_val_if_fail (!(disable_static_deltas && require_static_deltas), FALSE);
g_return_val_if_fail (!(disable_static_deltas && pull_data->require_static_deltas), FALSE);
/* We only do dry runs with static deltas, because we don't really have any
* in-advance information for bare fetches.
*/
g_return_val_if_fail (!pull_data->dry_run || require_static_deltas, FALSE);
g_return_val_if_fail (!pull_data->dry_run || pull_data->require_static_deltas, FALSE);

pull_data->is_mirror = (flags & OSTREE_REPO_PULL_FLAGS_MIRROR) > 0;
pull_data->is_commit_only = (flags & OSTREE_REPO_PULL_FLAGS_COMMIT_ONLY) > 0;
Expand Down Expand Up @@ -2655,7 +2684,7 @@ ostree_repo_pull_with_options (OstreeRepo *self,
/* For local pulls, default to disabling static deltas so that the
* exact object files are copied.
*/
if (pull_data->remote_repo_local && !require_static_deltas)
if (pull_data->remote_repo_local && !pull_data->require_static_deltas)
disable_static_deltas = TRUE;

pull_data->static_delta_superblocks = g_ptr_array_new_with_free_func ((GDestroyNotify)g_variant_unref);
Expand Down Expand Up @@ -2710,7 +2739,7 @@ ostree_repo_pull_with_options (OstreeRepo *self,
goto out;
}

if (!bytes_summary && require_static_deltas)
if (!bytes_summary && pull_data->require_static_deltas)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Fetch configured to require static deltas, but no summary found");
Expand Down Expand Up @@ -2939,7 +2968,6 @@ ostree_repo_pull_with_options (OstreeRepo *self,
g_autofree char *from_revision = NULL;
const char *ref = key;
const char *to_revision = value;
g_autoptr(GVariant) delta_superblock = NULL;

if (!ostree_repo_resolve_rev (pull_data->repo, ref, TRUE,
&from_revision, error))
Expand All @@ -2948,31 +2976,25 @@ ostree_repo_pull_with_options (OstreeRepo *self,
if (!(disable_static_deltas || mirroring_into_archive || pull_data->is_commit_only) &&
(from_revision == NULL || g_strcmp0 (from_revision, to_revision) != 0))
{
if (!request_static_delta_superblock_sync (pull_data, from_revision, to_revision,
&delta_superblock, cancellable, error))
goto out;
}

if (!delta_superblock)
{
if (require_static_deltas)
{
g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
"Static deltas required, but none found for %s to %s",
from_revision, to_revision);
goto out;
}
g_debug ("no delta superblock for %s-%s", from_revision ? from_revision : "empty", to_revision);
queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0);
g_autofree char *delta_name =
_ostree_get_relative_static_delta_superblock_path (from_revision, to_revision);
FetchDeltaSuperData *fdata = g_new0(FetchDeltaSuperData, 1);
fdata->pull_data = pull_data;
fdata->from_revision = g_strdup (from_revision);
fdata->to_revision = g_strdup (to_revision);

_ostree_fetcher_request_to_membuf (pull_data->fetcher,
pull_data->content_mirrorlist,
delta_name, 0,
OSTREE_MAX_METADATA_SIZE,
0, pull_data->cancellable,
on_superblock_fetched, fdata);
pull_data->n_outstanding_metadata_fetches++;
pull_data->n_requested_metadata++;
}
else
{
g_debug ("processing delta superblock for %s-%s", from_revision ? from_revision : "empty", to_revision);
g_ptr_array_add (pull_data->static_delta_superblocks, g_variant_ref (delta_superblock));
if (!process_one_static_delta (pull_data, from_revision, to_revision,
delta_superblock,
cancellable, error))
goto out;
queue_scan_one_metadata_object (pull_data, to_revision, OSTREE_OBJECT_TYPE_COMMIT, NULL, 0);
}
}

Expand Down

0 comments on commit d9f43cd

Please sign in to comment.