Skip to content

Commit

Permalink
core: Implement parallel relabeling
Browse files Browse the repository at this point in the history
This is another big task just like importing that greatly benefits
from being parallel.  While here I hit the issue that on error
we didn't wait for pending async tasks to complete; I changed things
for importing so that we do that, and used it here too.

This was almost straightforward except I spent a *lot* of time
debugging what turned out to be calling `dnf_package_get_nevra()`
in the worker threads 😢.

I'm mostly writing this to speed up unified core/jigdo, but it's also obviously
relevant on the client side.
  • Loading branch information
cgwalters committed Dec 8, 2017
1 parent 752166c commit 2586e39
Showing 1 changed file with 181 additions and 74 deletions.
255 changes: 181 additions & 74 deletions src/libpriv/rpmostree-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,14 @@ struct _RpmOstreeContext {
char *passwd_dir;

gboolean async_running;
GCancellable *async_cancellable;
GError *async_error;
DnfState *async_dnfstate;
GPtrArray *pkgs_to_download;
GPtrArray *pkgs_to_import;
guint n_async_pkgs_imported;
GPtrArray *pkgs_to_relabel;
guint n_async_pkgs_relabeled;

GHashTable *pkgs_to_remove; /* pkgname --> gv_nevra */
GHashTable *pkgs_to_replace; /* new gv_nevra --> old gv_nevra */
Expand Down Expand Up @@ -1381,6 +1383,7 @@ sort_packages (RpmOstreeContext *self,
self->n_async_pkgs_imported = 0;
g_assert (!self->pkgs_to_relabel);
self->pkgs_to_relabel = g_ptr_array_new_with_free_func ((GDestroyNotify)g_object_unref);
self->n_async_pkgs_relabeled = 0;

GPtrArray *sources = dnf_context_get_repos (dnfctx);
for (guint i = 0; i < packages->len; i++)
Expand Down Expand Up @@ -1828,6 +1831,7 @@ rpmostree_context_set_packages (RpmOstreeContext *self,
g_clear_pointer (&self->pkgs_to_import, (GDestroyNotify)g_ptr_array_unref);
self->n_async_pkgs_imported = 0;
g_clear_pointer (&self->pkgs_to_relabel, (GDestroyNotify)g_ptr_array_unref);
self->n_async_pkgs_relabeled = 0;
return sort_packages (self, packages, cancellable, error);
}

Expand Down Expand Up @@ -2047,17 +2051,15 @@ on_async_import_done (GObject *obj,
self->async_error ? NULL : &self->async_error);
if (!rev)
{
self->async_running = FALSE;
g_cancellable_cancel (self->async_cancellable);
g_assert (self->async_error != NULL);
}
else
{
g_assert_cmpint (self->n_async_pkgs_imported, <, self->pkgs_to_import->len);
self->n_async_pkgs_imported++;
dnf_state_assert_done (self->async_dnfstate);
if (self->n_async_pkgs_imported == self->pkgs_to_import->len)
self->async_running = FALSE;
}

g_assert_cmpint (self->n_async_pkgs_imported, <, self->pkgs_to_import->len);
self->n_async_pkgs_imported++;
dnf_state_assert_done (self->async_dnfstate);
if (self->n_async_pkgs_imported == self->pkgs_to_import->len)
self->async_running = FALSE;
}

gboolean
Expand Down Expand Up @@ -2093,6 +2095,7 @@ rpmostree_context_import_jigdo (RpmOstreeContext *self,

self->async_dnfstate = hifstate;
self->async_running = TRUE;
self->async_cancellable = g_cancellable_new ();

for (guint i = 0; i < self->pkgs_to_import->len; i++)
{
Expand Down Expand Up @@ -2148,6 +2151,7 @@ rpmostree_context_import_jigdo (RpmOstreeContext *self,
self->async_error = NULL;
while (self->async_running)
g_main_context_iteration (mainctx, TRUE);
g_clear_object (&self->async_cancellable);
if (self->async_error)
{
g_propagate_error (error, g_steal_pointer (&self->async_error));
Expand Down Expand Up @@ -2394,25 +2398,39 @@ break_hardlinks_at (int dfd,
return TRUE;
}

typedef struct {
int tmpdir_dfd;
const char *name;
const char *evr;
const char *arch;
} RelabelTaskData;

static gboolean
relabel_one_package (RpmOstreeContext *self,
int tmpdir_dfd,
OstreeRepo *repo,
DnfPackage *pkg,
OstreeSePolicy *sepolicy,
gboolean *changed,
GCancellable *cancellable,
GError **error)
{
g_autofree char *nevra = g_strdup (dnf_package_get_nevra (pkg));
relabel_in_thread_impl (RpmOstreeContext *self,
const char *name,
const char *evr,
const char *arch,
int tmpdir_dfd,
gboolean *out_changed,
GCancellable *cancellable,
GError **error)
{
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;

const char *nevra = glnx_strjoina (name, "-", evr, ".", arch);
const char *errmsg = glnx_strjoina ("Relabeling ", nevra);
GLNX_AUTO_PREFIX_ERROR (errmsg, error);
const char *pkg_dirname = nevra;
g_autofree char *cachebranch = rpmostree_get_cache_branch_pkg (pkg);

OstreeRepo *repo = get_pkgcache_repo (self);
g_autofree char *cachebranch = rpmostree_get_cache_branch_for_n_evr_a (name, evr, arch);
g_autofree char *commit_csum = NULL;
if (!ostree_repo_resolve_rev (repo, cachebranch, FALSE,
&commit_csum, error))
return FALSE;

/* Compute the original content checksum */
g_autoptr(GVariant) orig_commit = NULL;
if (!ostree_repo_load_commit (repo, commit_csum, &orig_commit, NULL, error))
return FALSE;
Expand All @@ -2427,71 +2445,146 @@ relabel_one_package (RpmOstreeContext *self,
return FALSE;

/* write to the tree */
g_autoptr(OstreeRepoCommitModifier) modifier =
ostree_repo_commit_modifier_new (OSTREE_REPO_COMMIT_MODIFIER_FLAGS_CONSUME,
NULL, NULL, NULL);
ostree_repo_commit_modifier_set_devino_cache (modifier, cache);
ostree_repo_commit_modifier_set_sepolicy (modifier, self->sepolicy);

g_autoptr(OstreeMutableTree) mtree = ostree_mutable_tree_new ();
if (!ostree_repo_write_dfd_to_mtree (repo, tmpdir_dfd, pkg_dirname, mtree,
modifier, cancellable, error))
return glnx_prefix_error (error, "Writing dfd");

g_autoptr(GFile) root = NULL;
{
glnx_unref_object OstreeMutableTree *mtree = ostree_mutable_tree_new ();
if (!ostree_repo_write_mtree (repo, mtree, &root, cancellable, error))
return FALSE;

g_autoptr(OstreeRepoCommitModifier) modifier =
ostree_repo_commit_modifier_new (OSTREE_REPO_COMMIT_MODIFIER_FLAGS_CONSUME,
NULL, NULL, NULL);
/* build metadata and commit */
g_autoptr(GVariant) commit_var = NULL;
g_autoptr(GVariantDict) meta_dict = NULL;

ostree_repo_commit_modifier_set_devino_cache (modifier, cache);
ostree_repo_commit_modifier_set_sepolicy (modifier, self->sepolicy);
if (!ostree_repo_load_commit (repo, commit_csum, &commit_var, NULL, error))
return FALSE;

if (!ostree_repo_write_dfd_to_mtree (repo, tmpdir_dfd, pkg_dirname, mtree,
modifier, cancellable, error))
return FALSE;
/* let's just copy the metadata from the previous commit and only change the
* rpmostree.sepolicy value */
g_autoptr(GVariant) meta = g_variant_get_child_value (commit_var, 0);
meta_dict = g_variant_dict_new (meta);

if (!ostree_repo_write_mtree (repo, mtree, &root, cancellable, error))
return FALSE;
}
g_variant_dict_insert (meta_dict, "rpmostree.sepolicy", "s",
ostree_sepolicy_get_csum (self->sepolicy));

/* build metadata and commit */
{
g_autoptr(GVariant) commit_var = NULL;
g_autoptr(GVariantDict) meta_dict = NULL;
g_autofree char *new_commit_csum = NULL;
if (!ostree_repo_write_commit (repo, NULL, "", "",
g_variant_dict_end (meta_dict),
OSTREE_REPO_FILE (root), &new_commit_csum,
cancellable, error))
return FALSE;

if (!ostree_repo_load_commit (repo, commit_csum, &commit_var, NULL, error))
return FALSE;
/* Compute new content checksum */
g_autoptr(GVariant) new_commit = NULL;
if (!ostree_repo_load_commit (repo, new_commit_csum, &new_commit, NULL, error))
return FALSE;
g_autofree char *new_content_checksum = rpmostree_commit_content_checksum (new_commit);

/* let's just copy the metadata from the previous commit and only change the
* rpmostree.sepolicy value */
{
g_autoptr(GVariant) meta = g_variant_get_child_value (commit_var, 0);
meta_dict = g_variant_dict_new (meta);
/* Queue an update to the ref */
ostree_repo_transaction_set_ref (repo, NULL, cachebranch, new_commit_csum);

g_variant_dict_insert (meta_dict, "rpmostree.sepolicy", "s",
ostree_sepolicy_get_csum (sepolicy));
}
/* Return whether or not we actually changed content */
*out_changed = !g_str_equal (orig_content_checksum, new_content_checksum);

{
g_autofree char *new_commit_csum = NULL;
if (!ostree_repo_write_commit (repo, NULL, "", "",
g_variant_dict_end (meta_dict),
OSTREE_REPO_FILE (root), &new_commit_csum,
cancellable, error))
return FALSE;
return TRUE;
}

g_autoptr(GVariant) new_commit = NULL;
if (!ostree_repo_load_commit (repo, new_commit_csum, &new_commit, NULL, error))
return FALSE;
g_autofree char *new_content_checksum = rpmostree_commit_content_checksum (new_commit);
static void
relabel_in_thread (GTask *task,
gpointer source,
gpointer task_data,
GCancellable *cancellable)
{
g_autoptr(GError) local_error = NULL;
RpmOstreeContext *self = source;
RelabelTaskData *tdata = task_data;

gboolean changed;
if (!relabel_in_thread_impl (self, tdata->name, tdata->evr, tdata->arch,
tdata->tmpdir_dfd, &changed,
cancellable, &local_error))
g_task_return_error (task, g_steal_pointer (&local_error));
else
g_task_return_int (task, changed ? 1 : 0);
}

*changed = !g_str_equal (orig_content_checksum, new_content_checksum);
static void
relabel_package_async (RpmOstreeContext *self,
DnfPackage *pkg,
int tmpdir_dfd,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
g_autoptr(GTask) task = g_task_new (self, cancellable, callback, user_data);
RelabelTaskData *tdata = g_new (RelabelTaskData, 1);
/* We can assume lifetime is greater than the task */
tdata->tmpdir_dfd = tmpdir_dfd;
tdata->name = dnf_package_get_name (pkg);
tdata->evr = dnf_package_get_evr (pkg);
tdata->arch = dnf_package_get_arch (pkg);
g_task_set_task_data (task, tdata, g_free);
g_task_run_in_thread (task, relabel_in_thread);
}

static gssize
relabel_package_async_finish (RpmOstreeContext *self,
GAsyncResult *result,
GError **error)
{
g_return_val_if_fail (g_task_is_valid (result, self), FALSE);
return g_task_propagate_int ((GTask*)result, error);
}

typedef struct {
RpmOstreeContext *self;
guint n_changed_files;
guint n_changed_pkgs;
} RpmOstreeAsyncRelabelData;

ostree_repo_transaction_set_ref (repo, NULL, cachebranch,
new_commit_csum);
static void
on_async_relabel_done (GObject *obj,
GAsyncResult *res,
gpointer user_data)
{
RpmOstreeAsyncRelabelData *data = user_data;
RpmOstreeContext *self = data->self;
gssize n_relabeled =
relabel_package_async_finish (self, res, self->async_error ? NULL : &self->async_error);
if (n_relabeled < 0)
{
g_assert (self->async_error != NULL);
g_cancellable_cancel (self->async_cancellable);
}
}

return TRUE;
g_assert_cmpint (self->n_async_pkgs_relabeled, <, self->pkgs_to_relabel->len);
self->n_async_pkgs_relabeled++;
if (n_relabeled > 0)
{
data->n_changed_files += n_relabeled;
data->n_changed_pkgs++;
}
dnf_state_assert_done (self->async_dnfstate);
if (self->n_async_pkgs_relabeled == self->pkgs_to_relabel->len)
self->async_running = FALSE;
}

gboolean
rpmostree_context_relabel (RpmOstreeContext *self,
GCancellable *cancellable,
GError **error)
{
if (!self->pkgs_to_relabel)
return TRUE;

const int n = self->pkgs_to_relabel->len;
OstreeRepo *ostreerepo = get_pkgcache_repo (self);

Expand Down Expand Up @@ -2520,20 +2613,31 @@ rpmostree_context_relabel (RpmOstreeContext *self,
&relabel_tmpdir, error))
return FALSE;

guint n_changed_pkgs = 0;
self->async_dnfstate = hifstate;
self->async_running = TRUE;
self->async_cancellable = g_cancellable_new ();

RpmOstreeAsyncRelabelData data = { self, 0, };
const guint n_to_relabel = self->pkgs_to_relabel->len;
for (guint i = 0; i < n_to_relabel; i++)
{
DnfPackage *pkg = self->pkgs_to_relabel->pdata[i];
gboolean pkg_changed = 0;
if (!relabel_one_package (self, relabel_tmpdir.fd, ostreerepo,
pkg, self->sepolicy,
&pkg_changed, cancellable, error))
return FALSE;
if (pkg_changed)
n_changed_pkgs++;
dnf_state_assert_done (hifstate);
relabel_package_async (self, pkg, relabel_tmpdir.fd, cancellable,
on_async_relabel_done, &data);
}

/* Wait for all of the relabeling to complete */
GMainContext *mainctx = g_main_context_get_thread_default ();
self->async_error = NULL;
while (self->async_running)
g_main_context_iteration (mainctx, TRUE);
if (self->async_error)
{
g_propagate_error (error, g_steal_pointer (&self->async_error));
return FALSE;
}
g_clear_object (&self->async_cancellable);
self->async_dnfstate = NULL;

/* Commit */
if (!ostree_repo_commit_transaction (ostreerepo, NULL, cancellable, error))
Expand All @@ -2543,10 +2647,13 @@ rpmostree_context_relabel (RpmOstreeContext *self,
rpmostree_output_percent_progress_end ();

sd_journal_send ("MESSAGE_ID=" SD_ID128_FORMAT_STR, SD_ID128_FORMAT_VAL(RPMOSTREE_MESSAGE_SELINUX_RELABEL),
"MESSAGE=Relabeled %u/%u pkgs", n_changed_pkgs, n_to_relabel,
"RELABELED_PKGS=%u/%u", n_changed_pkgs, n_to_relabel,
"MESSAGE=Relabeled %u/%u pkgs", data.n_changed_pkgs, n_to_relabel,
"RELABELED_PKGS=%u/%u", data.n_changed_pkgs, n_to_relabel,
NULL);

g_clear_pointer (&self->pkgs_to_relabel, (GDestroyNotify)g_ptr_array_unref);
self->n_async_pkgs_relabeled = 0;

return TRUE;
}

Expand Down

0 comments on commit 2586e39

Please sign in to comment.