Skip to content

Commit

Permalink
classes: Add OCIImage class and use image_layout
Browse files Browse the repository at this point in the history
This is work towards tern-tools#948

This change adds a new class - OCIImage which represents the
OCI image schema version 2 available on disk when downloaded by
skopeo. This change also adds a skopeo.py script to the list
of loading methods. Skopeo.py adds functions to check if the skopeo
binary exists on disk and a function to pull images using skopeo.
A test suite for the OCIImage class is also added. This test suit
is similar to the test_class_docker_image.py test suit.

Additionally, we have changed the DockerImage class and associated tests
to set the layer's image_layout property to "docker".

We've switched the test for changes to the oci class to use a container
image and the changes to the docker class to use the dockerfile, as
we need the docker daemon to test Dockerfile builds.

Signed-off-by: Mukul Taneja <[email protected]>
Signed-off-by: Nisha K <[email protected]>
  • Loading branch information
Nisha K committed Dec 15, 2021
1 parent 6fcdcb9 commit c6c4617
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 1 deletion.
6 changes: 5 additions & 1 deletion ci/test_files_touched.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@
# tern/classes
re.compile('tern/classes/command.py'):
['python tests/test_class_command.py'],
re.compile('tern/classes/oci_image.py'):
['tern report -i photon:3.0',
'python tests/test_class_oci_image.py'],
re.compile('tern/classes/docker_image.py'):
['tern report -i photon:3.0'],
['tern report -d samples/alpine_python/Dockerfile',
'python tests/test_class_docker_image.py'],
re.compile('tern/classes/file_data.py'):
['python tests/test_class_file_data.py'],
re.compile('tern/classes/image.py'):
Expand Down
1 change: 1 addition & 0 deletions tern/classes/docker_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def load_image(self, load_until_layer=0):
if (self.load_until_layer >= layer_count
or self.load_until_layer == 0):
layer.set_checksum(checksum_type, layer.diff_id)
layer.image_layout = "docker"
layer.gen_fs_hash()
layer.layer_index = layer_count
self._layers.append(layer)
Expand Down
136 changes: 136 additions & 0 deletions tern/classes/oci_image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 VMware, Inc. All Rights Reserved.
# SPDX-License-Identifier: BSD-2-Clause

import json
import subprocess # nosec

from tern.utils import rootfs
from tern.utils import general
from tern.classes.image import Image
from tern.utils.constants import manifest_file
from tern.classes.image_layer import ImageLayer


class OCIImage(Image):
"""A representation of an OCI compatible image that exists on disk"""
def __init__(self, repotag=None):
super().__init__(repotag)
# In case the OCI image corresponds with an image built by Docker
# we also include the history
self.__history = None
if self.repotag is None:
raise NameError("Image object initialized with no repotag")

# parse the repotag
repo_dict = general.parse_image_string(self._repotag)
self._name = repo_dict.get('name')
self._tag = repo_dict.get('tag')
self.set_checksum(
repo_dict.get('digest_type'), repo_dict.get('digest'))

@property
def history(self):
return self.__history

def to_dict(self, template=None):
# this should take care of 'origins' and 'layers'
oci_dict = super().to_dict(template)
return oci_dict

def get_image_manifest(self):
temp_path = rootfs.get_working_dir()
with general.pushd(temp_path):
with open(manifest_file, encoding='utf-8') as f:
json_obj = json.loads(f.read())
return json_obj

def get_image_layers(self, manifest):
layers = []
for layer in manifest.get('layers'):
layers.append(layer.get("digest").split(":")[1])
return layers

def get_image_config_file(self, manifest):
return manifest.get('config').get("digest").split(":")[1]

def get_image_config(self, manifest):
config_file = self.get_image_config_file(manifest)
temp_path = rootfs.get_working_dir()
with general.pushd(temp_path):
with open(config_file, encoding='utf-8') as f:
json_obj = json.loads(f.read())
return json_obj

def get_image_history(self, config):
if 'history' in config.keys():
return config['history']
return None

def get_diff_ids(self, config):
diff_ids = []
for item in config['rootfs']['diff_ids']:
diff_ids.append(item.split(':').pop())
return diff_ids

def get_diff_checksum_type(self, config):
'''Get the checksum type that was used to calculate the diff_id
of the image'''
return config['rootfs']['diff_ids'][0].split(':')[0]

def set_layer_created_by(self):
# the history is ordered according to the order of the layers
# so the first non-empty history corresponds with the first layer
index = 0
for item in self.__history:
if 'empty_layer' not in item.keys():
if 'created_by' in item.keys():
self._layers[index].created_by = item['created_by']
else:
self._layers[index].created_by = ''
index = index + 1

def load_image(self, load_until_layer=0):
if load_until_layer > 0:
self._load_until_layer = load_until_layer
try:
self._manifest = self.get_image_manifest()
self._config = self.get_image_config(self._manifest)
self.__history = self.get_image_history(self._config)
layer_paths = self.get_image_layers(self._manifest)
layer_diffs = self.get_diff_ids(self._config)
# if the digest isn't in the repotag, get it from the config
if not self.checksum:
repo_dict = general.parse_image_string(
self._config.get("config").get("Image"))
self.set_checksum(repo_dict.get("digest_type"),
repo_dict.get("digest"))
layer_checksum_type = self.get_diff_checksum_type(self._config)
layer_count = 1
while layer_diffs and layer_paths:
layer = ImageLayer(layer_diffs.pop(0), layer_paths.pop(0))
if (self.load_until_layer >= layer_count
or self.load_until_layer == 0):
layer.set_checksum(layer_checksum_type, layer.diff_id)
layer.image_layout = "oci"
# take care to set the layer index as it will be used
# to create the directory where the layer contents will
# be untarred
layer.layer_index = layer_count
layer.gen_fs_hash()
self._layers.append(layer)
layer_count = layer_count + 1
self._total_layers = layer_count - 1
if self.load_until_layer > self.total_layers:
# if user asked to analyze more layers than image has
# turn off the load_until_layer feature
self._load_until_layer = 0
self.set_layer_created_by()
except NameError as e:
raise NameError(e) from e
except subprocess.CalledProcessError as e:
raise subprocess.CalledProcessError(
e.returncode, cmd=e.cmd, output=e.output, stderr=e.stderr)
except IOError as e:
raise IOError(e) from e
42 changes: 42 additions & 0 deletions tern/load/skopeo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 VMware, Inc. All Rights Reserved.
# SPDX-License-Identifier: BSD-2-Clause

"""
Interactions with remote container images using skopeo
"""

import logging
import sys
import shutil

from tern.utils import constants
from tern.utils import rootfs

# global logger
logger = logging.getLogger(constants.logger_name)


def check_skopeo_setup():
"""Check if the skopeo tool is installed"""
if not shutil.which('skopeo'):
logger.critical('Skopeo is not installed')
logger.critical('Exiting...')
sys.exit(1)


def pull_image(image_tag_string):
"""Use skopeo to pull a remote image into the working directory"""
# Check if skopeo is set up
check_skopeo_setup()
# we will assume the docker transport for now
remote = f'docker://{image_tag_string}'
local = f'dir:{rootfs.get_working_dir()}'
logger.debug("Attempting to pull image \"%s\"", image_tag_string)
result, error = rootfs.shell_command(
False, ['skopeo', 'copy', remote, local])
if error:
logger.error("Error when downloading image: \"%s\"", error)
return None
return result
2 changes: 2 additions & 0 deletions tests/test_class_docker_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ def testGetLayerDiffIds(self):
def testLayerFiles(self):
self.image.load_image()
self.assertFalse(self.image.layers[0].files)
print("here")
print(self.image.layers[0].image_layout)
self.image.layers[0].add_files()
for file in self.image.layers[0].files:
self.assertTrue(
Expand Down
106 changes: 106 additions & 0 deletions tests/test_class_oci_image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 VMware, Inc. All Rights Reserved.
# SPDX-License-Identifier: BSD-2-Clause

import unittest

from tern.load import skopeo
from tern.classes.oci_image import OCIImage
from tern.utils import rootfs
from test_fixtures import create_working_dir
from test_fixtures import remove_working_dir


class TestClassOCIImage(unittest.TestCase):

def setUp(self):
'''Using a specific image here. If this test fails due to the image
not being found anymore, pick a different image to test against
For now use Docker to pull the image from Dockerhub'''
create_working_dir()
rootfs.set_working_dir()
# this should check if the docker image extraction is successful
skopeo.pull_image('vmware/tern@sha256:20b32a9a20752aa1ad7582c667704f'
'da9f004cc4bfd8601fac7f2656c7567bb4')
self.image = OCIImage('vmware/tern@sha256:20b32a9a20752aa1ad7582c6'
'67704fda9f004cc4bfd8601fac7f2656c7567bb4')
# constants for this image
self.layer = ('c1c3a87012e7ff5791b31e94515b661'
'cdf06f6d5dc2f9a6245eda8774d257a13')
self.no_layers = 1
self.created_by = ('/bin/sh -c #(nop) ADD '
'file:92137e724f46c720d8083a11290c67'
'd9daa387e523336b1757a0e3c4f5867cd5 '
'in / ')
self.file_info = [
('file2.txt', 'documents/test/file2.txt',
'9710f003d924890c7677b4dd91fd753f6ed71cc57d4f'
'9482261b6786d81957fa',
'sha256'),
('file2.txt', 'documents/test/test2/file2.txt',
'885000512dee8ac814641bbf6a7c887012ec23a2fb3e'
'3b2cff583c45a611317d',
'sha256'),
('file1.txt', 'documents/test/test2/file1.txt',
'885000512dee8ac814641bbf6a7c887012ec'
'23a2fb3e3b2cff583c45a611317d',
'sha256'),
('file1.txt', 'documents/test/file1.txt',
'a3cccbc52486d50a86ff0bc1e6ea0e0b701ac'
'4bb139f8713fa136ef9ec68e97e',
'sha256')
]

def tearDown(self):
del self.image
remove_working_dir()

def testInstance(self):
self.assertEqual(self.image.repotag, 'vmware/tern@sha256:20b32a9a2'
'0752aa1ad7582c667704fda9f004cc4'
'bfd8601fac7f2656c7567bb4')
self.assertEqual(self.image.name, 'vmware/tern')
self.assertEqual(self.image.tag, '')
self.assertTrue(self.image.checksum_type, 'sha256')
self.assertTrue(self.image.checksum, '20b32a9a20752aa1ad7582c66'
'7704fda9f004cc4bfd8601fac7'
'f2656c7567bb4')
self.assertFalse(self.image.manifest)
self.assertFalse(self.image.config)
self.assertFalse(self.image.layers)
self.assertFalse(self.image.history)
# test instantiating with a tag
o = OCIImage('vmware/tern:testimage')
self.assertEqual(o.name, 'vmware/tern')
self.assertEqual(o.tag, 'testimage')
self.assertFalse(o.checksum_type)
self.assertFalse(o.checksum)

def testLoadImage(self):
self.image.load_image()
self.assertEqual(self.image.layers[0].diff_id, self.layer)
self.assertEqual(len(self.image.layers), self.no_layers)
self.assertEqual(self.image.layers[0].created_by, self.created_by)
self.assertEqual(self.image.layers[0].checksum_type, 'sha256')
self.assertEqual(self.image.layers[0].checksum, self.layer)

def testGetLayerDiffIds(self):
self.image.load_image()
self.assertEqual(len(self.image.get_layer_diff_ids()), self.no_layers)
self.assertEqual(self.image.get_layer_diff_ids()[0], self.layer)

def testLayerFiles(self):
self.image.load_image()
self.assertFalse(self.image.layers[0].files)
self.image.layers[0].add_files()
for file in self.image.layers[0].files:
self.assertTrue(
(file.name, file.path, file.checksum,
file.checksum_type) in
self.file_info
)


if __name__ == '__main__':
unittest.main()

0 comments on commit c6c4617

Please sign in to comment.