Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding OCI image and a respective test class #734

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions tern/classes/oci_image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 VMware, Inc. All Rights Reserved.
# SPDX-License-Identifier: BSD-2-Clause

import os
import re
import json
import subprocess # nosec
from tern.utils import general
from tern.classes.image import Image
from tern.analyze.oci import helpers
from tern.classes.image_layer import ImageLayer


class OCIImage(Image):
'''A representation of an image created using OCI format
See image.py for super class's attributes
OCI Image specific attributes:
repotag: the repotag associated with this image
history: a list of commands used to create the filesystem layers
to_dict: return a dict representation of the object
'''

def __init__(self, repotag=None):
super().__init__(repotag)
self.__history = None
if self.repotag is None:
raise NameError("Image object initialized with no repotag")

# parse the repotag
repo_dict = general.parse_oci_image_string(self._repotag)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I am expecting an addition of a function in general.py?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah

self._name = repo_dict.get('name')
self._tag = repo_dict.get('tag')
self._image_path = repo_dict.get('path')

Comment on lines +33 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what would be better is if this functionality got moved to the Image class so the DockerImage class can also take advantage of it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think self._name and self._tag are common so we can move it to Image class but self._image_path we have to keep it in oci_image.

@property
def history(self):
return self.__history

@property
def image_path(self):
return self._image_path

@image_path.setter
def image_path(self, image_path):
self._image_path = image_path

def to_dict(self, template=None):
'''Return a dictionary representation of the OCI image'''
# this should take care of 'origins' and 'layers'
di_dict = super().to_dict(template)
return di_dict

def get_image_index(self):
'''Returns OCI image index data'''
index_path = os.path.join(self.image_path, "index.json")
with open(index_path) as f:
return json.load(f)

def get_image_manifest(self, image_index):
'''Given image index, returns image manifest'''
manifest = image_index.get("manifests")[0].get("digest")
manifest = re.split(r'[:]', manifest)[1]
manifest = 'blobs/sha256/{0}'.format(manifest)
manifest = os.path.join(self.image_path, manifest)
with open(manifest) as f:
return json.load(f)

def get_image_layers(self, manifest):
'''Given the manifest, return the layers'''
layers = []
for layer in manifest.get('layers'):
layers.append(layer.get("digest").split(":")[1])
return layers

def get_image_config_file(self, manifest):
'''Given the manifest, return the config file'''
return manifest.get('config').get("digest").split(":")[1]

def get_layer_sha(self, layer_path):
'''OCI's layers are file paths starting with the ID.
Get just the sha'''
return os.path.dirname(layer_path)

def get_image_config(self, manifest):
'''Given the manifest, returns the config data'''
config_file = self.get_image_config_file(manifest)
config_file = 'blobs/sha256/{0}'.format(config_file)
config_file = os.path.join(self.image_path, config_file)
with open(config_file) as f:
return json.load(f)

def get_image_history(self, config):
'''If the config has the image history return it. Else return None'''
return config.get('history', None)

def get_diff_checksum_type(self, image_index):
'''Given image index, returns image checksum type'''
manifest = image_index.get("manifests")[0].get("digest")
return re.split(r'[:]', manifest)[0]

def set_layer_created_by(self):
'''OCI image history configuration consists of a list of commands
and indication of whether the command created a filesystem or not.
Set the created_by for each layer in the image'''
# the history is ordered according to the order of the layers
# so the first non-empty history corresponds with the first layer
index = 0
for item in self.__history:
if 'empty_layer' not in item.keys():
if 'created_by' in item.keys():
self._layers[index].created_by = item['created_by']
else:
self._layers[index].created_by = ''
index = index + 1

def load_image(self):
'''Load OCI image metadata using manifest'''
try:
# validate OCI iamge's directory layout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/iamge's/image's

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. its a typo. my bad!

helpers.validate_image_path(self.image_path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can put this function in here instead of helpers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. I am neutral to keep it here as well.

image_index = self.get_image_index()
self._manifest = self.get_image_manifest(image_index)
self._config = self.get_image_config(self._manifest)
self.__history = self.get_image_history(self._config)
layer_paths = self.get_image_layers(self._manifest)
# copy image layers to working dir for further analysis
helpers.copy_oci_image_layers(self.image_path, layer_paths)
checksum_type = self.get_diff_checksum_type(image_index_data)
while layer_paths:
layer = ImageLayer(None, layer_paths.pop(0))
layer.set_checksum(checksum_type, layer.diff_id)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here layer.diff_id will be None

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out, if you are using a tool to convert from a docker image to an OCI image, you may be able to get the layer's diff_id. But in this case, the layer's checksum can be found from the manifest's layers list.

layer.gen_fs_hash()
self._layers.append(layer)
self.set_layer_created_by()
except NameError: # pylint: disable=try-except-raise
raise
except subprocess.CalledProcessError: # pylint: disable=try-except-raise
raise
except IOError: # pylint: disable=try-except-raise
raise
except Exception:
raise
72 changes: 72 additions & 0 deletions tests/test_class_oci_image.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 VMware, Inc. All Rights Reserved.
# SPDX-License-Identifier: BSD-2-Clause

import os
import shutil
import unittest
import subprocess # nosec
from tern.utils import rootfs
from tern.report import report
from tern.utils import general
from tern.__main__ import create_top_dir
from tern.utils.rootfs import set_mount_dir
from tern.classes.oci_image import OCIImage


class TestClassOCIImage(unittest.TestCase):
def setUp(self):
'''Using a specific image here. It will pull a docker image
and convert that into OCI format using `skopeo` utility.'''
set_mount_dir()
create_top_dir()
oci_image_path = general.get_top_dir()
self.image_name = "test"
self.image_tag = "latest"
oci_image = os.path.join(oci_image_path, self.image_name)
image_string = "oci:{0}:{1}".format(oci_image, self.image_tag)
try:
if not os.path.exists(oci_image):
cmd = ["skopeo", "copy", "docker://photon:3.0-20200626", image_string]
rootfs.root_command(cmd)
except subprocess.CalledProcessError as error:
print(error.output)

report.setup(image_tag_string=oci_image, image_type="oci")
self.image = OCIImage(image_string)
self.layer = (
'c571a0f54bfedced3e82c91439f2b36f07d3f42ac3df20db88e242c100a6e3d6')
self.no_layers = 1
self.created_by = ('/bin/sh -c #(nop) ADD '
'file:a702523c66281e08513d982f9eb'
'abf71fefe0bfd756712719a10533d2ae71e37 in / ')

def tearDown(self):
del self.image

@classmethod
def tearDownClass(cls):
shutil.rmtree(general.get_top_dir())

def testInstance(self):
self.assertEqual(self.image.name, self.image_name)
self.assertEqual(self.image.tag, self.image_tag)
self.assertFalse(self.image.manifest)
self.assertFalse(self.image.config)
self.assertFalse(self.image.layers)
self.assertFalse(self.image.history)

def testLoadImage(self):
self.image.load_image()
self.assertEqual(len(self.image.layers), self.no_layers)
self.assertEqual(self.image.layers[0].created_by, self.created_by)
self.assertEqual(self.image.layers[0].checksum_type, 'sha256')

def testLayerFiles(self):
self.image.load_image()
self.assertFalse(self.image.layers[0].files)


if __name__ == '__main__':
unittest.main()