From 6fcdcb99f508e06b3c56c2affc044b4ee59b6309 Mon Sep 17 00:00:00 2001 From: Nisha K Date: Fri, 3 Dec 2021 13:54:56 -0800 Subject: [PATCH] classes: Move get_untar_dir to ImageLayer This is work towards #948 OCI images on disk may be represented in different ways. As such the layer tarballs may exist at the root of the working directory rather than in their own path. This change allows alternative directory paths to be used based on the image layout. - Added a new property: image_layout to the ImageLayer class. The image_layout property will contain either "docker" or "oci" with the default being "oci" - Moved get_untar_dir from rootfs.py to be a method in the ImageLayer class. get_untar_dir will now return a directory path to the expected path where the layer's contents are untarred based on the image layout. Removed an unnecessary function in rootfs.py as a result. - Modified the mount_diff_layers to accept untarred directory paths instead of tar files. - Modified the function mount_overlay_fs in multi_layer.py to collect the directory paths instead of the tar file to pass to the mount_diff_layers function. - Added a new get_untar_dir test to the ImageLayer test suite. Signed-off-by: Nisha K --- tern/analyze/default/container/multi_layer.py | 6 +-- tern/classes/image_layer.py | 42 ++++++++++++++++--- tern/utils/rootfs.py | 23 +++------- tests/test_class_image_layer.py | 20 +++++++++ 4 files changed, 65 insertions(+), 26 deletions(-) diff --git a/tern/analyze/default/container/multi_layer.py b/tern/analyze/default/container/multi_layer.py index 10b98726..1f8ca8e7 100644 --- a/tern/analyze/default/container/multi_layer.py +++ b/tern/analyze/default/container/multi_layer.py @@ -30,10 +30,10 @@ def mount_overlay_fs(image_obj, top_layer, driver): '''Given the image object and the top most layer, mount all the layers until the top layer using overlayfs''' - tar_layers = [] + layer_paths = [] for index in range(0, top_layer + 1): - tar_layers.append(image_obj.layers[index].tar_file) - target = rootfs.mount_diff_layers(tar_layers, driver) + layer_paths.append(image_obj.layers[index].get_untar_dir()) + target = rootfs.mount_diff_layers(layer_paths, driver) return target diff --git a/tern/classes/image_layer.py b/tern/classes/image_layer.py index 2108a02c..83fea108 100644 --- a/tern/classes/image_layer.py +++ b/tern/classes/image_layer.py @@ -9,6 +9,7 @@ from tern.classes.file_data import FileData from tern.classes.origins import Origins from tern.utils import rootfs +from tern.utils import constants from tern.utils.general import prop_names @@ -32,6 +33,8 @@ class ImageLayer: layer_index: The index position of the layer in relationship to the other layers in the image. The base OS would be layer 1. created this layer by importing it from another image + image_layout: Indicates the image layout on disk in order to read + the layer filesystem. This is either "oci" or "docker". files_analyzed: whether the files in this layer are analyzed or not analyzed_output: the result of the file analysis files: a list of files included in the image layer @@ -46,6 +49,8 @@ class ImageLayer: remove_package: removes a package from the layer to_dict: returns a dict representation of the instance get_package_names: returns a list of package names + get_untar_dir: returns the path where the contents of the layer are + untarred. gen_fs_hash: calculate the filesystem hash add_file: adds a file to the layer remove_file: given the file path, remove a file object from the @@ -67,6 +72,7 @@ def __init__(self, diff_id, tar_file=None, created_by=None): self.__import_image = None self.__import_str = '' self.__layer_index = '' + self.__image_layout = 'oci' self.__pkg_format = '' self.__os_guess = '' self.__files_analyzed = False @@ -150,7 +156,18 @@ def layer_index(self): @layer_index.setter def layer_index(self, layer_index): - self.__layer_index = layer_index + self.__layer_index = str(layer_index) + + @property + def image_layout(self): + return self.__image_layout + + @image_layout.setter + def image_layout(self, image_layout): + if image_layout in ('oci', 'docker'): + self.__image_layout = image_layout + else: + self.__image_layout = 'oci' @property def pkg_format(self): @@ -297,19 +314,34 @@ def get_package_names(self): pkg_list.append(pkg.name) return pkg_list + def get_untar_dir(self): + """Get the directory where contents of the image layer are untarred""" + # the untar directory is based on the image layout + if self.image_layout == 'docker': + return os.path.join(rootfs.get_working_dir(), + os.path.dirname(self.tar_file), + constants.untar_dir) + # For OCI layouts, the tar file may be at the root of the directory + # So we will return a path one level deeper + return os.path.join( + rootfs.get_working_dir(), self.layer_index, constants.untar_dir) + def gen_fs_hash(self): '''Get the filesystem hash if the image class was created with a tar_file''' - if self.__tar_file: - fs_dir = rootfs.get_untar_dir(self.__tar_file) - tar_file = rootfs.get_layer_tar_path(self.__tar_file) + if self.tar_file: + fs_dir = self.get_untar_dir() + # make directory structure if it doesn't exist + if not os.path.exists(fs_dir): + os.makedirs(fs_dir) + tar_file = os.path.join(rootfs.get_working_dir(), self.tar_file) rootfs.extract_tarfile(tar_file, fs_dir) self.__fs_hash = rootfs.calc_fs_hash(fs_dir) def add_files(self): '''Get all the files present in a layer and store them as a list of FileData objects''' - fs_path = rootfs.get_untar_dir(self.__tar_file) + fs_path = self.get_untar_dir() hash_file = os.path.join(os.path.dirname(fs_path), self.__fs_hash) + '.txt' with open(hash_file, encoding='utf-8') as f: diff --git a/tern/utils/rootfs.py b/tern/utils/rootfs.py index a162d89a..31d91727 100644 --- a/tern/utils/rootfs.py +++ b/tern/utils/rootfs.py @@ -121,17 +121,6 @@ def get_working_dir(): return os.path.join(working_dir, constants.temp_folder) -def get_untar_dir(layer_tarfile): - '''get the directory to untar the layer tar file''' - return os.path.join(get_working_dir(), os.path.dirname( - layer_tarfile), constants.untar_dir) - - -def get_layer_tar_path(layer_tarfile): - '''get the full path of the layer tar file''' - return os.path.join(get_working_dir(), layer_tarfile) - - def set_up(): '''Create required directories''' op_dir = get_working_dir() @@ -204,14 +193,12 @@ def prep_base_layer(base_layer_tar): return target_dir_path -def mount_diff_layers(diff_layers_tar, driver='overlay2'): - '''Using overlayfs, mount all the layer tarballs''' +def mount_diff_layers(diff_layer_paths, driver='overlay2'): + """Given a list of diff filesystem layer paths, mount all the layers + using an overlay driver. Supported drivers are 'fuse' and 'overlay2'""" # make a list of directory paths to give to lowerdir - lower_dir_paths = [] - for layer_tar in diff_layers_tar: - lower_dir_paths.append(get_untar_dir(layer_tar)) - upper_dir = lower_dir_paths.pop() - lower_dir = ':'.join(list(reversed(lower_dir_paths))) + upper_dir = diff_layer_paths.pop() + lower_dir = ':'.join(list(reversed(diff_layer_paths))) merge_dir_path = os.path.join(get_working_dir(), constants.mergedir) workdir_path = os.path.join(get_working_dir(), constants.workdir) args = 'lowerdir=' + lower_dir + ',upperdir=' + upper_dir + \ diff --git a/tests/test_class_image_layer.py b/tests/test_class_image_layer.py index f0b20c94..53071be2 100644 --- a/tests/test_class_image_layer.py +++ b/tests/test_class_image_layer.py @@ -4,10 +4,12 @@ # SPDX-License-Identifier: BSD-2-Clause import unittest +import os from tern.classes.image_layer import ImageLayer from tern.classes.package import Package from tern.classes.file_data import FileData +from tern.utils import rootfs from test_fixtures import TestTemplate1 from test_fixtures import TestTemplate2 @@ -16,6 +18,7 @@ class TestClassImageLayer(unittest.TestCase): def setUp(self): self.layer = ImageLayer('123abc', 'path/to/tar') + rootfs.set_working_dir() def tearDown(self): del self.layer @@ -146,6 +149,23 @@ def testSetExtensionInfo(self): header = self.layer.extension_info.get("header").pop() self.assertEqual(header, "Test Header") + def testGetUntarDir(self): + self.layer.image_layout = "oci" + self.assertEqual(self.layer.image_layout, "oci") + self.layer.image_layout = "docker" + self.assertEqual(self.layer.image_layout, "docker") + self.layer.image_layout = "" + self.assertEqual(self.layer.image_layout, "oci") + self.layer.layer_index = 1 + self.assertEqual(self.layer.layer_index, "1") + expected_path = os.path.join(rootfs.get_working_dir(), + '1/contents') + self.assertEqual(self.layer.get_untar_dir(), expected_path) + self.layer.image_layout = "docker" + expected_path = os.path.join(rootfs.get_working_dir(), + 'path/to/contents') + self.assertEqual(self.layer.get_untar_dir(), expected_path) + if __name__ == '__main__': unittest.main()