forked from tern-tools/tern
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
classes: Add OCIImage class and use image_layout
This is work towards tern-tools#948 This change adds a new class - OCIImage which represents the OCI image schema version 2 available on disk when downloaded by skopeo. This change also adds a skopeo.py script to the list of loading methods. Skopeo.py adds functions to check if the skopeo binary exists on disk and a function to pull images using skopeo. A test suite for the OCIImage class is also added. This test suit is similar to the test_class_docker_image.py test suit. Additionally, we have changed the DockerImage class and associated tests to set the layer's image_layout property to "docker". We've switched the test for changes to the oci class to use a container image and the changes to the docker class to use the dockerfile, as we need the docker daemon to test Dockerfile builds. Signed-off-by: Mukul Taneja <[email protected]> Signed-off-by: Nisha K <[email protected]>
- Loading branch information
Nisha K
committed
Dec 15, 2021
1 parent
6fcdcb9
commit e340f66
Showing
6 changed files
with
293 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Copyright (c) 2021 VMware, Inc. All Rights Reserved. | ||
# SPDX-License-Identifier: BSD-2-Clause | ||
|
||
import json | ||
import subprocess # nosec | ||
|
||
from tern.utils import rootfs | ||
from tern.utils import general | ||
from tern.classes.image import Image | ||
from tern.utils.constants import manifest_file | ||
from tern.classes.image_layer import ImageLayer | ||
|
||
|
||
class OCIImage(Image): | ||
"""A representation of an OCI compatible image that exists on disk""" | ||
def __init__(self, repotag=None): | ||
super().__init__(repotag) | ||
# In case the OCI image corresponds with an image built by Docker | ||
# we also include the history | ||
self.__history = None | ||
if self.repotag is None: | ||
raise NameError("Image object initialized with no repotag") | ||
|
||
# parse the repotag | ||
repo_dict = general.parse_image_string(self._repotag) | ||
self._name = repo_dict.get('name') | ||
self._tag = repo_dict.get('tag') | ||
self.set_checksum( | ||
repo_dict.get('digest_type'), repo_dict.get('digest')) | ||
|
||
@property | ||
def history(self): | ||
return self.__history | ||
|
||
def to_dict(self, template=None): | ||
# this should take care of 'origins' and 'layers' | ||
oci_dict = super().to_dict(template) | ||
return oci_dict | ||
|
||
def get_image_manifest(self): | ||
temp_path = rootfs.get_working_dir() | ||
with general.pushd(temp_path): | ||
with open(manifest_file, encoding='utf-8') as f: | ||
json_obj = json.loads(f.read()) | ||
return json_obj | ||
|
||
def get_image_layers(self, manifest): | ||
layers = [] | ||
for layer in manifest.get('layers'): | ||
layers.append(layer.get("digest").split(":")[1]) | ||
return layers | ||
|
||
def get_image_config_file(self, manifest): | ||
return manifest.get('config').get("digest").split(":")[1] | ||
|
||
def get_image_config(self, manifest): | ||
config_file = self.get_image_config_file(manifest) | ||
temp_path = rootfs.get_working_dir() | ||
with general.pushd(temp_path): | ||
with open(config_file, encoding='utf-8') as f: | ||
json_obj = json.loads(f.read()) | ||
return json_obj | ||
|
||
def get_image_history(self, config): | ||
if 'history' in config.keys(): | ||
return config['history'] | ||
return None | ||
|
||
def get_diff_ids(self, config): | ||
diff_ids = [] | ||
for item in config['rootfs']['diff_ids']: | ||
diff_ids.append(item.split(':').pop()) | ||
return diff_ids | ||
|
||
def get_diff_checksum_type(self, config): | ||
'''Get the checksum type that was used to calculate the diff_id | ||
of the image''' | ||
return config['rootfs']['diff_ids'][0].split(':')[0] | ||
|
||
def set_layer_created_by(self): | ||
# the history is ordered according to the order of the layers | ||
# so the first non-empty history corresponds with the first layer | ||
index = 0 | ||
for item in self.__history: | ||
if 'empty_layer' not in item.keys(): | ||
if 'created_by' in item.keys(): | ||
self._layers[index].created_by = item['created_by'] | ||
else: | ||
self._layers[index].created_by = '' | ||
index = index + 1 | ||
|
||
def load_image(self, load_until_layer=0): | ||
if load_until_layer > 0: | ||
self._load_until_layer = load_until_layer | ||
try: | ||
self._manifest = self.get_image_manifest() | ||
self._config = self.get_image_config(self._manifest) | ||
self.__history = self.get_image_history(self._config) | ||
layer_paths = self.get_image_layers(self._manifest) | ||
layer_diffs = self.get_diff_ids(self._config) | ||
# if the digest isn't in the repotag, get it from the config | ||
if not self.checksum: | ||
repo_dict = general.parse_image_string( | ||
self._config.get("config").get("Image")) | ||
self.set_checksum(repo_dict.get("digest_type"), | ||
repo_dict.get("digest")) | ||
layer_checksum_type = self.get_diff_checksum_type(self._config) | ||
layer_count = 1 | ||
while layer_diffs and layer_paths: | ||
layer = ImageLayer(layer_diffs.pop(0), layer_paths.pop(0)) | ||
if (self.load_until_layer >= layer_count | ||
or self.load_until_layer == 0): | ||
layer.set_checksum(layer_checksum_type, layer.diff_id) | ||
layer.image_layout = "oci" | ||
# take care to set the layer index as it will be used | ||
# to create the directory where the layer contents will | ||
# be untarred | ||
layer.layer_index = layer_count | ||
layer.gen_fs_hash() | ||
self._layers.append(layer) | ||
layer_count = layer_count + 1 | ||
self._total_layers = layer_count - 1 | ||
if self.load_until_layer > self.total_layers: | ||
# if user asked to analyze more layers than image has | ||
# turn off the load_until_layer feature | ||
self._load_until_layer = 0 | ||
self.set_layer_created_by() | ||
except NameError as e: | ||
raise NameError(e) from e | ||
except subprocess.CalledProcessError as e: | ||
raise subprocess.CalledProcessError( | ||
e.returncode, cmd=e.cmd, output=e.output, stderr=e.stderr) | ||
except IOError as e: | ||
raise IOError(e) from e |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Copyright (c) 2021 VMware, Inc. All Rights Reserved. | ||
# SPDX-License-Identifier: BSD-2-Clause | ||
|
||
""" | ||
Interactions with remote container images using skopeo | ||
""" | ||
|
||
import logging | ||
import sys | ||
import shutil | ||
|
||
from tern.utils import constants | ||
from tern.utils import rootfs | ||
|
||
# global logger | ||
logger = logging.getLogger(constants.logger_name) | ||
|
||
|
||
def check_skopeo_setup(): | ||
"""Check if the skopeo tool is installed""" | ||
if not shutil.which('skopeo'): | ||
logger.critical('Skopeo is not installed') | ||
logger.critical('Exiting...') | ||
sys.exit(1) | ||
|
||
|
||
def pull_image(image_tag_string): | ||
"""Use skopeo to pull a remote image into the working directory""" | ||
# Check if skopeo is set up | ||
check_skopeo_setup() | ||
# we will assume the docker transport for now | ||
remote = f'docker://{image_tag_string}' | ||
local = f'dir:{rootfs.get_working_dir()}' | ||
logger.debug("Attempting to pull image \"%s\"", image_tag_string) | ||
result, error = rootfs.shell_command( | ||
False, ['skopeo', 'copy', remote, local]) | ||
if error: | ||
logger.error("Error when downloading image: \"%s\"", error) | ||
return None | ||
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Copyright (c) 2021 VMware, Inc. All Rights Reserved. | ||
# SPDX-License-Identifier: BSD-2-Clause | ||
|
||
import unittest | ||
|
||
from tern.load import skopeo | ||
from tern.classes.oci_image import OCIImage | ||
from tern.utils import rootfs | ||
from test_fixtures import create_working_dir | ||
from test_fixtures import remove_working_dir | ||
|
||
|
||
class TestClassOCIImage(unittest.TestCase): | ||
|
||
def setUp(self): | ||
'''Using a specific image here. If this test fails due to the image | ||
not being found anymore, pick a different image to test against | ||
For now use Docker to pull the image from Dockerhub''' | ||
create_working_dir() | ||
rootfs.set_working_dir() | ||
# this should check if the docker image extraction is successful | ||
skopeo.pull_image('vmware/tern@sha256:20b32a9a20752aa1ad7582c667704f' | ||
'da9f004cc4bfd8601fac7f2656c7567bb4') | ||
self.image = OCIImage('vmware/tern@sha256:20b32a9a20752aa1ad7582c6' | ||
'67704fda9f004cc4bfd8601fac7f2656c7567bb4') | ||
# constants for this image | ||
self.layer = ('c1c3a87012e7ff5791b31e94515b661' | ||
'cdf06f6d5dc2f9a6245eda8774d257a13') | ||
self.no_layers = 1 | ||
self.created_by = ('/bin/sh -c #(nop) ADD ' | ||
'file:92137e724f46c720d8083a11290c67' | ||
'd9daa387e523336b1757a0e3c4f5867cd5 ' | ||
'in / ') | ||
self.file_info = [ | ||
('file2.txt', 'documents/test/file2.txt', | ||
'9710f003d924890c7677b4dd91fd753f6ed71cc57d4f' | ||
'9482261b6786d81957fa', | ||
'sha256'), | ||
('file2.txt', 'documents/test/test2/file2.txt', | ||
'885000512dee8ac814641bbf6a7c887012ec23a2fb3e' | ||
'3b2cff583c45a611317d', | ||
'sha256'), | ||
('file1.txt', 'documents/test/test2/file1.txt', | ||
'885000512dee8ac814641bbf6a7c887012ec' | ||
'23a2fb3e3b2cff583c45a611317d', | ||
'sha256'), | ||
('file1.txt', 'documents/test/file1.txt', | ||
'a3cccbc52486d50a86ff0bc1e6ea0e0b701ac' | ||
'4bb139f8713fa136ef9ec68e97e', | ||
'sha256') | ||
] | ||
|
||
def tearDown(self): | ||
del self.image | ||
remove_working_dir() | ||
|
||
def testInstance(self): | ||
self.assertEqual(self.image.repotag, 'vmware/tern@sha256:20b32a9a2' | ||
'0752aa1ad7582c667704fda9f004cc4' | ||
'bfd8601fac7f2656c7567bb4') | ||
self.assertEqual(self.image.name, 'vmware/tern') | ||
self.assertEqual(self.image.tag, '') | ||
self.assertTrue(self.image.checksum_type, 'sha256') | ||
self.assertTrue(self.image.checksum, '20b32a9a20752aa1ad7582c66' | ||
'7704fda9f004cc4bfd8601fac7' | ||
'f2656c7567bb4') | ||
self.assertFalse(self.image.manifest) | ||
self.assertFalse(self.image.config) | ||
self.assertFalse(self.image.layers) | ||
self.assertFalse(self.image.history) | ||
# test instantiating with a tag | ||
o = OCIImage('vmware/tern:testimage') | ||
self.assertEqual(o.name, 'vmware/tern') | ||
self.assertEqual(o.tag, 'testimage') | ||
self.assertFalse(o.checksum_type) | ||
self.assertFalse(o.checksum) | ||
|
||
def testLoadImage(self): | ||
self.image.load_image() | ||
self.assertEqual(self.image.layers[0].diff_id, self.layer) | ||
self.assertEqual(len(self.image.layers), self.no_layers) | ||
self.assertEqual(self.image.layers[0].created_by, self.created_by) | ||
self.assertEqual(self.image.layers[0].checksum_type, 'sha256') | ||
self.assertEqual(self.image.layers[0].checksum, self.layer) | ||
|
||
def testGetLayerDiffIds(self): | ||
self.image.load_image() | ||
self.assertEqual(len(self.image.get_layer_diff_ids()), self.no_layers) | ||
self.assertEqual(self.image.get_layer_diff_ids()[0], self.layer) | ||
|
||
def testLayerFiles(self): | ||
self.image.load_image() | ||
self.assertFalse(self.image.layers[0].files) | ||
self.image.layers[0].add_files() | ||
for file in self.image.layers[0].files: | ||
self.assertTrue( | ||
(file.name, file.path, file.checksum, | ||
file.checksum_type) in | ||
self.file_info | ||
) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |