Skip to content

Commit

Permalink
core: Implement parallel importing
Browse files Browse the repository at this point in the history
Depends: ostreedev/ostree#1358

For jigdo ♲📦 in order to get true image speed like libostree has we need to
interleave and parallelize downloading and importing.

The messy part about this is having sync API do the "invoke and wait on various
async tasks" pattern. It's the same thing in `ostree_repo_pull_with_options()`.

Importing is pretty dramatically faster with this, I can only imagine the speed
win if we actually interleaved with downloads. However doing that requires
libdnf/librepo work.
  • Loading branch information
cgwalters committed Dec 1, 2017
1 parent 32678fc commit ab7012f
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 58 deletions.
136 changes: 78 additions & 58 deletions src/libpriv/rpmostree-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,13 @@ struct _RpmOstreeContext {
OstreeSePolicy *sepolicy;
char *passwd_dir;

gboolean async_running;
gboolean async_error_set;
GError **async_error;
DnfState *async_dnfstate;
GPtrArray *pkgs_to_download;
GPtrArray *pkgs_to_import;
guint n_async_pkgs_imported;
GPtrArray *pkgs_to_relabel;

GHashTable *pkgs_to_remove; /* pkgname --> gv_nevra */
Expand Down Expand Up @@ -1374,6 +1379,7 @@ sort_packages (RpmOstreeContext *self,
self->pkgs_to_download = g_ptr_array_new_with_free_func ((GDestroyNotify)g_object_unref);
g_assert (!self->pkgs_to_import);
self->pkgs_to_import = g_ptr_array_new_with_free_func ((GDestroyNotify)g_object_unref);
self->n_async_pkgs_imported = 0;
g_assert (!self->pkgs_to_relabel);
self->pkgs_to_relabel = g_ptr_array_new_with_free_func ((GDestroyNotify)g_object_unref);

Expand Down Expand Up @@ -1821,6 +1827,7 @@ rpmostree_context_set_packages (RpmOstreeContext *self,
{
g_clear_pointer (&self->pkgs_to_download, (GDestroyNotify)g_ptr_array_unref);
g_clear_pointer (&self->pkgs_to_import, (GDestroyNotify)g_ptr_array_unref);
self->n_async_pkgs_imported = 0;
g_clear_pointer (&self->pkgs_to_relabel, (GDestroyNotify)g_ptr_array_unref);
return sort_packages (self, packages, cancellable, error);
}
Expand Down Expand Up @@ -2023,64 +2030,35 @@ rpmostree_context_download (RpmOstreeContext *self,
return TRUE;
}

static gboolean
import_one_package (RpmOstreeContext *self,
DnfContext *dnfctx,
DnfPackage *pkg,
OstreeSePolicy *sepolicy,
GVariant *jigdo_xattr_table,
GVariant *jigdo_xattrs,
GCancellable *cancellable,
GError **error)
static inline void
dnf_state_assert_done (DnfState *dnfstate)
{
glnx_fd_close int fd = -1;
if (!rpmostree_context_consume_package (self, pkg, &fd, error))
return FALSE;

/* Only set SKIP_EXTRANEOUS for packages we know need it, so that
* people doing custom composes don't have files silently discarded.
* (This will also likely need to be configurable).
*/
const char *pkg_name = dnf_package_get_name (pkg);

int flags = 0;
if (g_str_equal (pkg_name, "filesystem") ||
g_str_equal (pkg_name, "rootfiles"))
flags |= RPMOSTREE_IMPORTER_FLAGS_SKIP_EXTRANEOUS;

{ gboolean docs;
g_assert (g_variant_dict_lookup (self->spec->dict, "documentation", "b", &docs));
if (!docs)
flags |= RPMOSTREE_IMPORTER_FLAGS_NODOCS;
}

/* TODO - tweak the unpacker flags for containers */
OstreeRepo *ostreerepo = get_pkgcache_repo (self);
g_autoptr(RpmOstreeImporter) unpacker =
rpmostree_importer_new_take_fd (&fd, ostreerepo, pkg, flags, sepolicy, error);
if (!unpacker)
return FALSE;

if (jigdo_xattrs)
{
g_assert (!sepolicy);
rpmostree_importer_set_jigdo_mode (unpacker, jigdo_xattr_table, jigdo_xattrs);
}

g_autofree char *ostree_commit = NULL;
if (!rpmostree_importer_run (unpacker, &ostree_commit, cancellable, error))
return glnx_prefix_error (error, "Unpacking %s",
dnf_package_get_nevra (pkg));

return TRUE;
g_assert (dnf_state_done (dnfstate, NULL));
}

static inline void
dnf_state_assert_done (DnfState *hifstate)
static void
on_async_import_done (GObject *obj,
GAsyncResult *res,
gpointer user_data)
{
gboolean r;
r = dnf_state_done (hifstate, NULL);
g_assert (r);
RpmOstreeImporter *importer = (RpmOstreeImporter*) obj;
RpmOstreeContext *self = user_data;
g_autofree char *rev =
rpmostree_importer_run_async_finish (importer, res,
self->async_error_set ? NULL : self->async_error);
if (!rev)
{
self->async_running = FALSE;
self->async_error_set = TRUE;
}
else
{
g_assert_cmpint (self->n_async_pkgs_imported, <, self->pkgs_to_import->len);
self->n_async_pkgs_imported++;
dnf_state_assert_done (self->async_dnfstate);
if (self->n_async_pkgs_imported == self->pkgs_to_import->len)
self->async_running = FALSE;
}
}

gboolean
Expand Down Expand Up @@ -2114,6 +2092,9 @@ rpmostree_context_import_jigdo (RpmOstreeContext *self,
G_CALLBACK (on_hifstate_percentage_changed),
"Importing:");

self->async_dnfstate = hifstate;
self->async_running = TRUE;

for (guint i = 0; i < self->pkgs_to_import->len; i++)
{
DnfPackage *pkg = self->pkgs_to_import->pdata[i];
Expand All @@ -2124,13 +2105,52 @@ rpmostree_context_import_jigdo (RpmOstreeContext *self,
if (!jigdo_xattrs)
g_error ("Failed to find jigdo xattrs for %s", dnf_package_get_nevra (pkg));
}
if (!import_one_package (self, dnfctx, pkg, self->sepolicy,
jigdo_xattr_table, jigdo_xattrs,
cancellable, error))

glnx_fd_close int fd = -1;
if (!rpmostree_context_consume_package (self, pkg, &fd, error))
return FALSE;

/* Only set SKIP_EXTRANEOUS for packages we know need it, so that
* people doing custom composes don't have files silently discarded.
* (This will also likely need to be configurable).
*/
const char *pkg_name = dnf_package_get_name (pkg);

int flags = 0;
if (g_str_equal (pkg_name, "filesystem") ||
g_str_equal (pkg_name, "rootfiles"))
flags |= RPMOSTREE_IMPORTER_FLAGS_SKIP_EXTRANEOUS;

{ gboolean docs;
g_assert (g_variant_dict_lookup (self->spec->dict, "documentation", "b", &docs));
if (!docs)
flags |= RPMOSTREE_IMPORTER_FLAGS_NODOCS;
}

/* TODO - tweak the unpacker flags for containers */
OstreeRepo *ostreerepo = get_pkgcache_repo (self);
g_autoptr(RpmOstreeImporter) unpacker =
rpmostree_importer_new_take_fd (&fd, ostreerepo, pkg, flags,
self->sepolicy, error);
if (!unpacker)
return FALSE;
dnf_state_assert_done (hifstate);

if (jigdo_xattrs)
{
g_assert (!self->sepolicy);
rpmostree_importer_set_jigdo_mode (unpacker, jigdo_xattr_table, jigdo_xattrs);
}

rpmostree_importer_run_async (unpacker, cancellable, on_async_import_done, self);
}

/* Wait for all of the imports to complete */
GMainContext *mainctx = g_main_context_get_thread_default ();
while (self->async_running)
g_main_context_iteration (mainctx, TRUE);
if (self->async_error)
return FALSE;

g_signal_handler_disconnect (hifstate, progress_sigid);
rpmostree_output_percent_progress_end ();
}
Expand Down
35 changes: 35 additions & 0 deletions src/libpriv/rpmostree-importer.c
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,41 @@ rpmostree_importer_run (RpmOstreeImporter *self,
return TRUE;
}

static void
import_in_thread (GTask *task,
gpointer source,
gpointer task_data,
GCancellable *cancellable)
{
GError *local_error = NULL;
RpmOstreeImporter *self = source;
g_autofree char *rev = NULL;

if (!rpmostree_importer_run (self, &rev, cancellable, &local_error))
g_task_return_error (task, local_error);
else
g_task_return_pointer (task, g_steal_pointer (&rev), g_free);
}

void
rpmostree_importer_run_async (RpmOstreeImporter *self,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
g_autoptr(GTask) task = g_task_new (self, cancellable, callback, user_data);
g_task_run_in_thread (task, import_in_thread);
}

char *
rpmostree_importer_run_async_finish (RpmOstreeImporter *self,
GAsyncResult *result,
GError **error)
{
g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
return g_task_propagate_pointer ((GTask*)result, error);
}

char *
rpmostree_importer_get_nevra (RpmOstreeImporter *self)
{
Expand Down
11 changes: 11 additions & 0 deletions src/libpriv/rpmostree-importer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ rpmostree_importer_run (RpmOstreeImporter *unpacker,
GCancellable *cancellable,
GError **error);

void
rpmostree_importer_run_async (RpmOstreeImporter *unpacker,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data);

char *
rpmostree_importer_run_async_finish (RpmOstreeImporter *self,
GAsyncResult *res,
GError **error);

char *
rpmostree_importer_get_nevra (RpmOstreeImporter *self);

Expand Down

0 comments on commit ab7012f

Please sign in to comment.