From 470d1c5b39a81779441fe872f56da65986145f72 Mon Sep 17 00:00:00 2001 From: Bala FA Date: Wed, 2 Dec 2020 12:44:17 +0530 Subject: [PATCH] Add compose_object() and enhance copy_object() APIs (#1021) --- docs/API.md | 172 ++++++++---- examples/compose_object.py | 51 ++++ examples/copy_object.py | 40 +-- examples/put_and_get_object_sse-c.py | 6 +- minio/__init__.py | 1 - minio/api.py | 394 ++++++++++++++++++++++++--- minio/commonconfig.py | 214 +++++++++++++++ minio/copy_conditions.py | 92 ------- minio/datatypes.py | 12 + minio/helpers.py | 23 ++ tests/functional/tests.py | 61 +++-- tests/unit/copy_object_test.py | 40 +-- 12 files changed, 827 insertions(+), 279 deletions(-) create mode 100644 examples/compose_object.py delete mode 100644 minio/copy_conditions.py diff --git a/docs/API.md b/docs/API.md index c96f7759f..e1982933b 100644 --- a/docs/API.md +++ b/docs/API.md @@ -35,24 +35,24 @@ s3Client = Minio( | [`make_bucket`](#make_bucket) | [`get_object`](#get_object) | | [`list_buckets`](#list_buckets) | [`put_object`](#put_object) | | [`bucket_exists`](#bucket_exists) | [`copy_object`](#copy_object) | -| [`remove_bucket`](#remove_bucket) | [`stat_object`](#stat_object) | -| [`list_objects`](#list_objects) | [`remove_object`](#remove_object) | -| [`get_bucket_versioning`](#get_bucket_versioning) | [`remove_objects`](#remove_objects) | -| [`set_bucket_versioning`](#set_bucket_versioning) | [`fput_object`](#fput_object) | -| [`delete_bucket_replication`](#delete_bucket_replication) | [`fget_object`](#fget_object) | -| [`get_bucket_replication`](#get_bucket_replication) | [`select_object_content`](#select_object_content) | -| [`set_bucket_replication`](#set_bucket_replication) | [`delete_object_tags`](#delete_object_tags) | -| [`delete_bucket_lifecycle`](#delete_bucket_lifecycle) | [`get_object_tags`](#get_object_tags) | -| [`get_bucket_lifecycle`](#get_bucket_lifecycle) | [`set_object_tags`](#set_object_tags) | -| [`set_bucket_lifecycle`](#set_bucket_lifecycle) | [`enable_object_legal_hold`](#enable_object_legal_hold) | -| [`delete_bucket_tags`](#delete_bucket_tags) | [`disable_object_legal_hold`](#disable_object_legal_hold) | -| [`get_bucket_tags`](#get_bucket_tags) | [`is_object_legal_hold_enabled`](#is_object_legal_hold_enabled) | -| [`set_bucket_tags`](#set_bucket_tags) | [`get_object_retention`](#get_object_retention) | -| [`delete_bucket_policy`](#delete_bucket_policy) | [`set_object_retention`](#set_object_retention) | -| [`get_bucket_policy`](#get_bucket_policy) | [`presigned_get_object`](#presigned_get_object) | -| [`set_bucket_policy`](#set_bucket_policy) | [`presigned_put_object`](#presigned_put_object) | -| [`delete_bucket_notification`](#delete_bucket_notification) | [`presigned_post_policy`](#presigned_post_policy) | -| [`get_bucket_notification`](#get_bucket_notification) | | +| [`remove_bucket`](#remove_bucket) | [`compose_object`](#compose_object) | +| [`list_objects`](#list_objects) | [`stat_object`](#stat_object) | +| [`get_bucket_versioning`](#get_bucket_versioning) | [`remove_object`](#remove_object) | +| [`set_bucket_versioning`](#set_bucket_versioning) | [`remove_objects`](#remove_objects) | +| [`delete_bucket_replication`](#delete_bucket_replication) | [`fput_object`](#fput_object) | +| [`get_bucket_replication`](#get_bucket_replication) | [`fget_object`](#fget_object) | +| [`set_bucket_replication`](#set_bucket_replication) | [`select_object_content`](#select_object_content) | +| [`delete_bucket_lifecycle`](#delete_bucket_lifecycle) | [`delete_object_tags`](#delete_object_tags) | +| [`get_bucket_lifecycle`](#get_bucket_lifecycle) | [`get_object_tags`](#get_object_tags) | +| [`set_bucket_lifecycle`](#set_bucket_lifecycle) | [`set_object_tags`](#set_object_tags) | +| [`delete_bucket_tags`](#delete_bucket_tags) | [`enable_object_legal_hold`](#enable_object_legal_hold) | +| [`get_bucket_tags`](#get_bucket_tags) | [`disable_object_legal_hold`](#disable_object_legal_hold) | +| [`set_bucket_tags`](#set_bucket_tags) | [`is_object_legal_hold_enabled`](#is_object_legal_hold_enabled) | +| [`delete_bucket_policy`](#delete_bucket_policy) | [`get_object_retention`](#get_object_retention) | +| [`get_bucket_policy`](#get_bucket_policy) | [`set_object_retention`](#set_object_retention) | +| [`set_bucket_policy`](#set_bucket_policy) | [`presigned_get_object`](#presigned_get_object) | +| [`delete_bucket_notification`](#delete_bucket_notification) | [`presigned_put_object`](#presigned_put_object) | +| [`get_bucket_notification`](#get_bucket_notification) | [`presigned_post_policy`](#presigned_post_policy) | | [`set_bucket_notification`](#set_bucket_notification) | | | [`listen_bucket_notification`](#listen_bucket_notification) | | | [`delete_bucket_encryption`](#delete_bucket_encryption) | | @@ -147,7 +147,7 @@ s3Client = Minio( -### make_bucket(self, bucket_name, location='us-east-1', object_lock=False) +### make_bucket(bucket_name, location='us-east-1', object_lock=False) Create a bucket with region and object lock. @@ -938,21 +938,25 @@ minio.fget_object( -### copy_object(bucket_name, object_name, object_source, conditions=None, source_sse=None, sse=None, metadata=None) +### copy_object(bucket_name, object_name, source, sse=None, metadata=None, tags=None, retention=None, legal_hold=False, metadata_directive=None, tagging_directive=None) Create an object by server-side copying data from another object. In this API maximum supported source object size is 5GiB. __Parameters__ -| Param | Type | Description | -|:----------------|:-----------------|:----------------------------------------------------------------------| -| `bucket_name` | _str_ | Name of the bucket. | -| `object_name` | _str_ | Object name in the bucket. | -| `object_source` | _str_ | Source object to be copied. | -| `conditions` | _CopyConditions_ | Collection of supported CopyObject conditions. | -| `source_sse` | _SseCustomerKey_ | Server-side encryption customer key of source object. | -| `sse` | _Sse_ | Server-side encryption of destination object. | -| `metadata` | _dict_ | Any user-defined metadata to be copied along with destination object. | +| Param | Type | Description | +|:---------------------|:-------------|:----------------------------------------------------------------------| +| `bucket_name` | _str_ | Name of the bucket. | +| `object_name` | _str_ | Object name in the bucket. | +| `source` | _CopySource_ | Source object information. | +| `sse` | _Sse_ | Server-side encryption of destination object. | +| `metadata` | _dict_ | Any user-defined metadata to be copied along with destination object. | +| `tags` | _Tags_ | Tags for destination object. | +| `retention` | _Retention_ | Retention configuration. | +| `legal_hold` | _bool_ | Flag to set legal hold for destination object. | +| `metadata_directive` | _str_ | Directive used to handle user metadata for destination object. | +| `tagging_directive` | _str_ | Directive used to handle tags for destination object. | + __Return Value__ @@ -963,48 +967,98 @@ __Return Value__ __Example__ ```py -import time -from datetime import datetime -from minio import CopyConditions +from datetime import datetime, timezone +from minio.commonconfig import REPLACE, CopySource + +# copy an object from a bucket to another. +result = client.copy_object( + "my-bucket", + "my-object", + CopySource("my-sourcebucket", "my-sourceobject"), +) +print(result.object_name, result.version_id) -minio.copy_object( - "my-bucketname", - "my-objectname", - "my-source-bucketname/my-source-objectname", +# copy an object with condition. +result = client.copy_object( + "my-bucket", + "my-object", + CopySource( + "my-sourcebucket", + "my-sourceobject", + modified_since=datetime(2014, 4, 1, tzinfo=timezone.utc), + ), ) +print(result.object_name, result.version_id) -minio.copy_object( - "my-bucketname", - "my-objectname", - "my-source-bucketname/my-source-objectname" - "?versionId=b6602757-7c9c-449b-937f-fed504d04c94", +# copy an object from a bucket with replacing metadata. +metadata = {"test_meta_key": "test_meta_value"} +result = client.copy_object( + "my-bucket", + "my-object", + CopySource("my-sourcebucket", "my-sourceobject"), + metadata=metadata, + metadata_directive=REPLACE, ) +print(result.object_name, result.version_id) +``` -copy_conditions = CopyConditions() -# Set modified condition, copy object modified since 2014 April. -t = (2014, 4, 0, 0, 0, 0, 0, 0, 0) -mod_since = datetime.utcfromtimestamp(time.mktime(t)) -copy_conditions.set_modified_since(mod_since) + -# Set unmodified condition, copy object unmodified since 2014 April. -copy_conditions.set_unmodified_since(mod_since) +### compose_object(bucket_name, object_name, sources, sse=None, metadata=None, tags=None, retention=None, legal_hold=False) -# Set matching ETag condition, copy object which matches the following ETag. -copy_conditions.set_match_etag("31624deb84149d2f8ef9c385918b653a") +Create an object by combining data from different source objects using server-side copy. -# Set matching ETag except condition, copy object which does not match the following ETag. -copy_conditions.set_match_etag_except("31624deb84149d2f8ef9c385918b653a") +__Parameters__ + +| Param | Type | Description | +|:--------------|:------------|:----------------------------------------------------------------------| +| `bucket_name` | _str_ | Name of the bucket. | +| `object_name` | _str_ | Object name in the bucket. | +| `sources` | _list_ | List of _ComposeSource_ object. | +| `sse` | _Sse_ | Server-side encryption of destination object. | +| `metadata` | _dict_ | Any user-defined metadata to be copied along with destination object. | +| `tags` | _Tags_ | Tags for destination object. | +| `retention` | _Retention_ | Retention configuration. | +| `legal_hold` | _bool_ | Flag to set legal hold for destination object. | -# Set metadata, which will be copied along with the destination object. -metadata = {"test-key": "test-data"} -result = minioClient.copy_object( - "my-bucketname", - "my-objectname", - "my-source-bucketname/my-source-objectname", - copy_conditions,metadata=metadata, +__Return Value__ + +| Return | +|:----------------------------| +| _ObjectWriteResult_ object. | + +__Example__ + +```py +from minio.commonconfig import ComposeSource +from minio.sse import SseS3 + +sources = [ + ComposeSource("my-job-bucket", "my-object-part-one"), + ComposeSource("my-job-bucket", "my-object-part-two"), + ComposeSource("my-job-bucket", "my-object-part-three"), +] + +# Create my-bucket/my-object by combining source object +# list. +result = client.compose_object("my-bucket", "my-object", sources) +print(result.object_name, result.version_id) + +# Create my-bucket/my-object with user metadata by combining +# source object list. +result = client.compose_object( + "my-bucket", + "my-object", + sources, + metadata={"test_meta_key": "test_meta_value"}, ) print(result.object_name, result.version_id) + +# Create my-bucket/my-object with user metadata and +# server-side encryption by combining source object list. +client.compose_object("my-bucket", "my-object", sources, sse=SseS3()) +print(result.object_name, result.version_id) ``` diff --git a/examples/compose_object.py b/examples/compose_object.py new file mode 100644 index 000000000..265f2e6c4 --- /dev/null +++ b/examples/compose_object.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# MinIO Python Library for Amazon S3 Compatible Cloud Storage, +# (C) 2020 MinIO, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from minio import Minio +from minio.commonconfig import ComposeSource +from minio.sse import SseS3 + +client = Minio( + "play.min.io", + access_key="Q3AM3UQ867SPQQA43P2F", + secret_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG", +) + +sources = [ + ComposeSource("my-job-bucket", "my-object-part-one"), + ComposeSource("my-job-bucket", "my-object-part-two"), + ComposeSource("my-job-bucket", "my-object-part-three"), +] + +# Create my-bucket/my-object by combining source object +# list. +result = client.compose_object("my-bucket", "my-object", sources) +print(result.object_name, result.version_id) + +# Create my-bucket/my-object with user metadata by combining +# source object list. +result = client.compose_object( + "my-bucket", + "my-object", + sources, + metadata={"test_meta_key": "test_meta_value"}, +) +print(result.object_name, result.version_id) + +# Create my-bucket/my-object with user metadata and +# server-side encryption by combining source object list. +client.compose_object("my-bucket", "my-object", sources, sse=SseS3()) +print(result.object_name, result.version_id) diff --git a/examples/copy_object.py b/examples/copy_object.py index b3191da8d..06f6d136a 100644 --- a/examples/copy_object.py +++ b/examples/copy_object.py @@ -14,45 +14,34 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY, my-testfile, my-bucketname and -# my-objectname are dummy values, please replace them with original values. - from datetime import datetime, timezone -from minio import CopyConditions, Minio +from minio import Minio +from minio.commonconfig import REPLACE, CopySource -client = Minio('s3.amazonaws.com', - access_key='YOUR-ACCESSKEY', - secret_key='YOUR-SECRETKEY') +client = Minio( + "play.min.io", + access_key="Q3AM3UQ867SPQQA43P2F", + secret_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG", +) # copy an object from a bucket to another. result = client.copy_object( "my-bucket", "my-object", - "/my-sourcebucket/my-sourceobject", + CopySource("my-sourcebucket", "my-sourceobject"), ) print(result.object_name, result.version_id) # copy an object with condition. -copy_conditions = CopyConditions() -# Set modified condition, copy object modified since 1st April 2014. -mod_since = datetime(2014, 4, 1, tzinfo=timezone.utc) -copy_conditions.set_modified_since(mod_since) - -# Set unmodified condition, copy object unmodified since 1st April 2014. -# copy_conditions.set_unmodified_since(mod_since) - -# Set matching ETag condition, copy object which matches the following ETag. -# copy_conditions.set_match_etag("31624deb84149d2f8ef9c385918b653a") - -# Set matching ETag except condition, copy object which does not match the -# following ETag. -# copy_conditions.set_match_etag_except("31624deb84149d2f8ef9c385918b653a") result = client.copy_object( "my-bucket", "my-object", - "/my-sourcebucket/my-sourceobject", - copy_conditions, + CopySource( + "my-sourcebucket", + "my-sourceobject", + modified_since=datetime(2014, 4, 1, tzinfo=timezone.utc), + ), ) print(result.object_name, result.version_id) @@ -61,7 +50,8 @@ result = client.copy_object( "my-bucket", "my-object", - "/my-sourcebucket/my-sourceobject", + CopySource("my-sourcebucket", "my-sourceobject"), metadata=metadata, + metadata_directive=REPLACE, ) print(result.object_name, result.version_id) diff --git a/examples/put_and_get_object_sse-c.py b/examples/put_and_get_object_sse-c.py index e07c25d27..fb7192a64 100644 --- a/examples/put_and_get_object_sse-c.py +++ b/examples/put_and_get_object_sse-c.py @@ -20,6 +20,7 @@ from io import BytesIO from minio.api import Minio +from minio.commonconfig import CopySource from minio.sse import SseCustomerKey AWSAccessKeyId = 'YOUR-ACCESSKEYID' @@ -45,8 +46,9 @@ def main(): # Copy encrypted object on Server-Side from Source to Destination obj = minio.copy_object(STORAGE_BUCKET, 'test_crypt_copy.txt', - STORAGE_BUCKET + '/test_crypt.txt', - source_sse=ssec, + CopySource( + STORAGE_BUCKET, 'test_crypt.txt', ssec=ssec, + ), sse=ssec) # Get decrypted object with SSE_C object passed in as param diff --git a/minio/__init__.py b/minio/__init__.py index 269446e83..52ba9216c 100644 --- a/minio/__init__.py +++ b/minio/__init__.py @@ -39,5 +39,4 @@ # pylint: disable=unused-import from .api import Minio -from .copy_conditions import CopyConditions from .error import InvalidResponseError, S3Error, ServerError diff --git a/minio/api.py b/minio/api.py index 5217d5ff2..b24338d81 100644 --- a/minio/api.py +++ b/minio/api.py @@ -42,20 +42,22 @@ import urllib3 from urllib3._collections import HTTPHeaderDict -from . import __title__, __version__ -from . import time -from .commonconfig import Tags +from . import __title__, __version__, time +from .commonconfig import COPY, REPLACE, ComposeSource, CopySource, Tags from .credentials import StaticProvider from .datatypes import (CompleteMultipartUploadResult, ListAllMyBucketsResult, ListMultipartUploadsResult, ListPartsResult, Object, - Part, PostPolicy, parse_list_objects) + Part, PostPolicy, parse_copy_object, + parse_list_objects) from .deleteobjects import DeleteError, DeleteRequest, DeleteResult from .error import InvalidResponseError, S3Error, ServerError -from .helpers import (BaseURL, ObjectWriteResult, ThreadPool, - check_bucket_name, check_non_empty_string, check_sse, - check_ssec, get_part_info, headers_to_strings, - is_valid_policy_type, makedirs, md5sum_hash, - normalize_headers, quote, read_part_data, sha256_hash) +from .helpers import (MAX_MULTIPART_COUNT, MAX_MULTIPART_OBJECT_SIZE, + MAX_PART_SIZE, MIN_PART_SIZE, BaseURL, ObjectWriteResult, + ThreadPool, check_bucket_name, check_non_empty_string, + check_sse, check_ssec, genheaders, get_part_info, + headers_to_strings, is_valid_policy_type, makedirs, + md5sum_hash, normalize_headers, read_part_data, + sha256_hash) from .legalhold import LegalHold from .lifecycleconfig import LifecycleConfig from .notificationconfig import NotificationConfig @@ -1099,64 +1101,129 @@ def get_object(self, bucket_name, object_name, offset=0, length=0, preload_content=False, ) - def copy_object(self, bucket_name, object_name, object_source, - conditions=None, source_sse=None, sse=None, metadata=None): + def copy_object(self, bucket_name, object_name, source, + sse=None, metadata=None, tags=None, retention=None, + legal_hold=False, metadata_directive=None, + tagging_directive=None): """ Create an object by server-side copying data from another object. In this API maximum supported source object size is 5GiB. :param bucket_name: Name of the bucket. :param object_name: Object name in the bucket. - :param object_source: Source object to be copied. - :param conditions: :class:`CopyConditions` object. Collection of - supported CopyObject conditions. - :param source_sse: Server-side encryption customer key of source - object. + :param source: :class:`CopySource` object. :param sse: Server-side encryption of destination object. :param metadata: Any user-defined metadata to be copied along with destination object. + :param tags: Tags for destination object. + :param retention: :class:`Retention` configuration object. + :param legal_hold: Flag to set legal hold for destination object. + :param metadata_directive: Directive used to handle user metadata for + destination object. + :param tagging_directive: Directive used to handle tags for destination + object. :return: :class:`ObjectWriteResult ` object. Example:: - minio.copy_object( - "my-bucketname", - "my-objectname", - "my-source-bucketname/my-source-objectname", + # copy an object from a bucket to another. + result = client.copy_object( + "my-bucket", + "my-object", + CopySource("my-sourcebucket", "my-sourceobject"), ) - minio.copy_object( - "my-bucketname", - "my-objectname", - "my-source-bucketname/my-source-objectname" - "?versionId=b6602757-7c9c-449b-937f-fed504d04c94", + print(result.object_name, result.version_id) + + # copy an object with condition. + result = client.copy_object( + "my-bucket", + "my-object", + CopySource( + "my-sourcebucket", + "my-sourceobject", + modified_since=datetime(2014, 4, 1, tzinfo=timezone.utc), + ), + ) + print(result.object_name, result.version_id) + + # copy an object from a bucket with replacing metadata. + metadata = {"test_meta_key": "test_meta_value"} + result = client.copy_object( + "my-bucket", + "my-object", + CopySource("my-sourcebucket", "my-sourceobject"), + metadata=metadata, + metadata_directive=REPLACE, ) + print(result.object_name, result.version_id) """ check_bucket_name(bucket_name) check_non_empty_string(object_name) - check_non_empty_string(object_source) - check_ssec(source_sse) + if not isinstance(source, CopySource): + raise ValueError("source must be CopySource type") check_sse(sse) + if tags is not None and not isinstance(tags, Tags): + raise ValueError("tags must be Tags type") + if retention is not None and not isinstance(retention, Retention): + raise ValueError("retention must be Retention type") + if ( + metadata_directive is not None and + metadata_directive not in [COPY, REPLACE] + ): + raise ValueError( + "metadata directive must be {0} or {1}".format(COPY, REPLACE), + ) + if ( + tagging_directive is not None and + tagging_directive not in [COPY, REPLACE] + ): + raise ValueError( + "tagging directive must be {0} or {1}".format(COPY, REPLACE), + ) - headers = normalize_headers(metadata) - if metadata: - headers["x-amz-metadata-directive"] = "REPLACE" - if conditions: - headers.update(conditions) - headers.update(source_sse.copy_headers() if source_sse else {}) - headers.update(sse.headers() if sse else {}) - headers['X-Amz-Copy-Source'] = quote(object_source) + size = -1 + if source.offset is None and source.length is None: + stat = self.stat_object( + source.bucket_name, + source.object_name, + version_id=source.version_id, + ssec=source.ssec, + ) + size = stat.size + + if ( + source.offset is not None or + source.length is not None or + size > MAX_PART_SIZE + ): + if metadata_directive == COPY: + raise ValueError( + "COPY metadata directive is not applicable to source " + "object size greater than 5 GiB", + ) + if tagging_directive == COPY: + raise ValueError( + "COPY tagging directive is not applicable to source " + "object size greater than 5 GiB" + ) + return self.compose_object( + bucket_name, object_name, ComposeSource.of(source), + sse=sse, metadata=metadata, tags=tags, retention=retention, + legal_hold=legal_hold, + ) + + headers = genheaders(metadata, sse, tags, retention, legal_hold) + if metadata_directive: + headers["x-amz-metadata-directive"] = metadata_directive + if tagging_directive: + headers["x-amz-tagging-directive"] = tagging_directive + headers.update(source.gen_copy_headers()) response = self._execute( "PUT", bucket_name, object_name=object_name, headers=headers, ) - element = ET.fromstring(response.data.decode()) - etag = findtext(element, "ETag") - if etag: - etag = etag.replace('"', "") - last_modified = findtext(element, "LastModified") - if last_modified: - last_modified = time.from_iso8601utc(last_modified) + etag, last_modified = parse_copy_object(response) return ObjectWriteResult( bucket_name, object_name, @@ -1166,6 +1233,249 @@ def copy_object(self, bucket_name, object_name, object_source, last_modified=last_modified, ) + def _calc_part_count(self, sources): + """Calculate part count.""" + object_size = 0 + part_count = 0 + i = 0 + for src in sources: + i += 1 + stat = self.stat_object( + src.bucket_name, + src.object_name, + version_id=src.version_id, + ssec=src.ssec, + ) + src.build_headers(stat.size, stat.etag) + size = stat.size + if src.length is not None: + size = src.length + elif src.offset is not None: + size -= src.offset + + if ( + size < MIN_PART_SIZE and + len(sources) != 1 and + i != len(sources) + ): + raise ValueError( + "source {0}/{1}: size {2} must be greater than {3}".format( + src.bucket_name, src.object_name, size, MIN_PART_SIZE, + ), + ) + + object_size += size + if object_size > MAX_MULTIPART_OBJECT_SIZE: + raise ValueError( + "destination object size must be less than {0}".format( + MAX_MULTIPART_OBJECT_SIZE, + ), + ) + + if size > MAX_PART_SIZE: + count = int(size / MAX_PART_SIZE) + last_part_size = size - (count * MAX_PART_SIZE) + if last_part_size > 0: + count += 1 + else: + last_part_size = MAX_PART_SIZE + if ( + last_part_size < MIN_PART_SIZE and + len(sources) != 1 and + i != len(sources) + ): + raise ValueError( + ( + "source {0}/{1}: for multipart split upload of " + "{2}, last part size is less than {3}" + ).format( + src.bucket_name, src.object_name, size, + MIN_PART_SIZE, + ), + ) + part_count += count + else: + part_count += 1 + + if part_count > MAX_MULTIPART_COUNT: + raise ValueError( + ( + "Compose sources create more than allowed multipart " + "count {0}" + ).format(MAX_MULTIPART_COUNT), + ) + return part_count + + def _upload_part_copy(self, bucket_name, object_name, upload_id, + part_number, headers): + """Execute UploadPartCopy S3 API.""" + query_params = { + "partNumber": str(part_number), + "uploadId": upload_id, + } + response = self._execute( + "PUT", + bucket_name, + object_name, + headers=headers, + query_params=query_params, + ) + return parse_copy_object(response) + + def compose_object( # pylint: disable=too-many-branches + self, bucket_name, object_name, sources, + sse=None, metadata=None, tags=None, retention=None, + legal_hold=False, + ): + """ + Create an object by combining data from different source objects using + server-side copy. + + :param bucket_name: Name of the bucket. + :param object_name: Object name in the bucket. + :param sources: List of :class:`ComposeSource` object. + :param sse: Server-side encryption of destination object. + :param metadata: Any user-defined metadata to be copied along with + destination object. + :param tags: Tags for destination object. + :param retention: :class:`Retention` configuration object. + :param legal_hold: Flag to set legal hold for destination object. + :return: :class:`ObjectWriteResult ` object. + + Example:: + sources = [ + ComposeSource("my-job-bucket", "my-object-part-one"), + ComposeSource("my-job-bucket", "my-object-part-two"), + ComposeSource("my-job-bucket", "my-object-part-three"), + ] + + # Create my-bucket/my-object by combining source object + # list. + result = client.compose_object("my-bucket", "my-object", sources) + print(result.object_name, result.version_id) + + # Create my-bucket/my-object with user metadata by combining + # source object list. + result = client.compose_object( + "my-bucket", + "my-object", + sources, + metadata={"test_meta_key": "test_meta_value"}, + ) + print(result.object_name, result.version_id) + + # Create my-bucket/my-object with user metadata and + # server-side encryption by combining source object list. + client.compose_object( + "my-bucket", "my-object", sources, sse=SseS3(), + ) + print(result.object_name, result.version_id) + """ + check_bucket_name(bucket_name) + check_non_empty_string(object_name) + if not isinstance(sources, (list, tuple)) or not sources: + raise ValueError("sources must be non-empty list or tuple type") + i = 0 + for src in sources: + if not isinstance(src, ComposeSource): + raise ValueError( + "sources[{0}] must be ComposeSource type".format(i), + ) + i += 1 + check_sse(sse) + if tags is not None and not isinstance(tags, Tags): + raise ValueError("tags must be Tags type") + if retention is not None and not isinstance(retention, Retention): + raise ValueError("retention must be Retention type") + + part_count = self._calc_part_count(sources) + if ( + part_count == 1 and + sources[0].offset is None and + sources[0].length is None + ): + return self.copy_object( + bucket_name, object_name, CopySource.of(sources[0]), + sse=sse, metadata=metadata, tags=tags, retention=retention, + legal_hold=legal_hold, + metadata_directive=REPLACE if metadata else None, + tagging_directive=REPLACE if tags else None, + ) + + headers = genheaders(metadata, sse, tags, retention, legal_hold) + upload_id = self._create_multipart_upload( + bucket_name, object_name, headers, + ) + ssec_headers = sse.headers() if isinstance(sse, SseCustomerKey) else {} + try: + part_number = 0 + total_parts = [] + for src in sources: + size = src.object_size + if src.length is not None: + size = src.length + elif src.offset is not None: + size -= src.offset + offset = src.offset or 0 + headers = src.headers + headers.update(ssec_headers) + if size <= MAX_PART_SIZE: + part_number += 1 + if src.length is not None: + headers["x-amz-copy-source-range"] = ( + "bytes={0}-{1}".format(offset, offset+src.length-1) + ) + elif src.offset is not None: + headers["x-amz-copy-source-range"] = ( + "bytes={0}-{1}".format(offset, offset+size-1) + ) + etag, _ = self._upload_part_copy( + bucket_name, + object_name, + upload_id, + part_number, + headers, + ) + total_parts.append(Part(part_number, etag)) + continue + while size > 0: + part_number += 1 + start_bytes = offset + end_bytes = start_bytes + MAX_PART_SIZE + if size < MAX_PART_SIZE: + end_bytes = start_bytes + size + headers_copy = headers.copy() + headers_copy["x-amz-copy-source-range"] = ( + "bytes={0}-{1}".format(start_bytes, end_bytes) + ) + etag, _ = self._upload_part_copy( + bucket_name, + object_name, + upload_id, + part_number, + headers_copy, + ) + total_parts.append(Part(part_number, etag)) + offset = start_bytes + size -= end_bytes - start_bytes + result = self._complete_multipart_upload( + bucket_name, object_name, upload_id, total_parts, + ) + return ObjectWriteResult( + result.bucket_name, + result.object_name, + result.version_id, + result.etag, + result.http_headers, + location=result.location, + ) + except Exception as exc: + if upload_id: + self._abort_multipart_upload( + bucket_name, object_name, upload_id, + ) + raise exc + def _abort_multipart_upload(self, bucket_name, object_name, upload_id): """Execute AbortMultipartUpload S3 API.""" self._execute( diff --git a/minio/commonconfig.py b/minio/commonconfig.py index 355aa37a1..735dd4205 100644 --- a/minio/commonconfig.py +++ b/minio/commonconfig.py @@ -19,10 +19,17 @@ from __future__ import absolute_import +import datetime from abc import ABCMeta +from .error import MinioException +from .helpers import quote +from .sse import SseCustomerKey +from .time import to_http_header from .xml import SubElement, find, findall, findtext +COPY = "COPY" +REPLACE = "REPLACE" DISABLED = "Disabled" ENABLED = "Enabled" GOVERNANCE = "GOVERNANCE" @@ -262,3 +269,210 @@ def check_status(status): """Validate status.""" if status not in [ENABLED, DISABLED]: raise ValueError("status must be 'Enabled' or 'Disabled'") + + +class ObjectConditionalReadArgs: + """Base argument class holds condition properties for reading object.""" + __metaclass__ = ABCMeta + + def __init__(self, bucket_name, object_name, region=None, version_id=None, + ssec=None, offset=None, length=None, match_etag=None, + not_match_etag=None, modified_since=None, + unmodified_since=None): + if ssec is not None and not isinstance(ssec, SseCustomerKey): + raise ValueError("ssec must be SseCustomerKey type") + if offset is not None and offset < 0: + raise ValueError("offset should be zero or greater") + if length is not None and length <= 0: + raise ValueError("length should be greater than zero") + if match_etag is not None and match_etag == "": + raise ValueError("match_etag must not be empty") + if not_match_etag is not None and not_match_etag == "": + raise ValueError("not_match_etag must not be empty") + if ( + modified_since is not None and + not isinstance(modified_since, datetime.datetime) + ): + raise ValueError("modified_since must be datetime.datetime type") + if ( + unmodified_since is not None and + not isinstance(unmodified_since, datetime.datetime) + ): + raise ValueError("unmodified_since must be datetime.datetime type") + + self._bucket_name = bucket_name + self._object_name = object_name + self._region = region + self._version_id = version_id + self._ssec = ssec + self._offset = offset + self._length = length + self._match_etag = match_etag + self._not_match_etag = not_match_etag + self._modified_since = modified_since + self._unmodified_since = unmodified_since + + @property + def bucket_name(self): + """Get bucket name.""" + return self._bucket_name + + @property + def object_name(self): + """Get object name.""" + return self._object_name + + @property + def region(self): + """Get region.""" + return self._region + + @property + def version_id(self): + """Get version ID.""" + return self._version_id + + @property + def ssec(self): + """Get SSE-C.""" + return self._ssec + + @property + def offset(self): + """Get offset.""" + return self._offset + + @property + def length(self): + """Get length.""" + return self._length + + @property + def match_etag(self): + """Get match ETag condition.""" + return self._match_etag + + @property + def not_match_etag(self): + """Get not-match ETag condition.""" + return self._not_match_etag + + @property + def modified_since(self): + """Get modified since condition.""" + return self._modified_since + + @property + def unmodified_since(self): + """Get unmodified since condition.""" + return self._unmodified_since + + def gen_copy_headers(self): + """Generate copy source headers.""" + copy_source = quote("/" + self._bucket_name + "/" + self._object_name) + if self._version_id: + copy_source += "?versionId=" + quote(self._version_id) + + headers = {"x-amz-copy-source": copy_source} + if self._ssec: + headers.update(self._ssec.copy_headers()) + if self._match_etag: + headers["x-amz-copy-source-if-match"] = self._match_etag + if self._not_match_etag: + headers["x-amz-copy-source-if-none-match"] = self._not_match_etag + if self._modified_since: + headers["x-amz-copy-source-if-modified-since"] = ( + to_http_header(self._modified_since) + ) + if self._unmodified_since: + headers["x-amz-copy-source-if-unmodified-since"] = ( + to_http_header(self._unmodified_since) + ) + return headers + + +class CopySource(ObjectConditionalReadArgs): + """A source object defintion for copy_object method.""" + @classmethod + def of(cls, src): + """Create CopySource from another source.""" + return cls( + src.bucket_name, src.object_name, src.region, src.version_id, + src.ssec, src.offset, src.length, src.match_etag, + src.not_match_etag, src.modified_since, src.unmodified_since, + ) + + +class ComposeSource(ObjectConditionalReadArgs): + """A source object defintion for compose_object method.""" + + def __init__(self, bucket_name, object_name, region=None, version_id=None, + ssec=None, offset=None, length=None, match_etag=None, + not_match_etag=None, modified_since=None, + unmodified_since=None): + super().__init__( + bucket_name, object_name, region, version_id, ssec, offset, length, + match_etag, not_match_etag, modified_since, unmodified_since, + ) + self._object_size = None + self._headers = None + + def _validate_size(self, object_size): + """Validate object size with offset and length.""" + def _get_error(name, value): + ver = ("?versionId="+self._version_id) if self._version_id else "" + return ( + "Source {0}/{1}{2}: {3} {4} is beyond object size {5}".format( + self._bucket_name, + self._object_name, + ver, + name, + value, + object_size, + ) + ) + if self._offset is not None and self._offset >= object_size: + raise ValueError("offset", self._offset) + if self._length is not None: + if self._length > object_size: + raise ValueError("length", self._length) + offset = self._offset or 0 + if offset+self.length > object_size: + raise ValueError("compose size", offset+self._length) + + def build_headers(self, object_size, etag): + """Build headers.""" + self._validate_size(object_size) + self._object_size = object_size + headers = self.gen_copy_headers() + headers["x-amz-copy-source-if-match"] = self._match_etag or etag + self._headers = headers + + @property + def object_size(self): + """Get object size.""" + if self._object_size is None: + raise MinioException( + "build_headers() must be called prior to " + "this method invocation", + ) + return self._object_size + + @property + def headers(self): + """Get headers.""" + if self._headers is None: + raise MinioException( + "build_headers() must be called prior to " + "this method invocation", + ) + return self._headers.copy() + + @classmethod + def of(cls, src): + """Create ComposeSource from another source.""" + return cls( + src.bucket_name, src.object_name, src.region, src.version_id, + src.ssec, src.offset, src.length, src.match_etag, + src.not_match_etag, src.modified_since, src.unmodified_since, + ) diff --git a/minio/copy_conditions.py b/minio/copy_conditions.py deleted file mode 100644 index 45ac3f8af..000000000 --- a/minio/copy_conditions.py +++ /dev/null @@ -1,92 +0,0 @@ -# -*- coding: utf-8 -*- -# MinIO Python Library for Amazon S3 Compatible Cloud Storage, -# (C) 2016 MinIO, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -minio.copy_conditions -~~~~~~~~~~~~~~~ - -This module contains :class:`CopyConditions ` implementation. - -:copyright: (c) 2016 by MinIO, Inc. -:license: Apache 2.0, see LICENSE for more details. - -""" - -from collections.abc import MutableMapping - -from .helpers import check_non_empty_string -from .time import to_http_header - -# CopyCondition explanation: -# http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectCOPY.html -# -# Example: -# -# copyCondition { -# key: "x-amz-copy-if-modified-since", -# value: "Tue, 15 Nov 1994 12:45:26 GMT", -# - - -class CopyConditions(MutableMapping): - """ - A :class:`CopyConditions ` collection of - supported CopyObject conditions. - - - x-amz-copy-source-if-match - - x-amz-copy-source-if-none-match - - x-amz-copy-source-if-unmodified-since - - x-amz-copy-source-if-modified-since - - """ - - def __init__(self, *args, **kwargs): - self._store = dict(*args, **kwargs) - - def __getitem__(self, key): - return self._store[key] - - def __setitem__(self, key, value): - self._store[key] = value - - def __delitem__(self, key): - del self._store[key] - - def __iter__(self): - return iter(self._store) - - def __len__(self): - return len(self._store) - - def set_match_etag(self, etag): - """Set ETag match condition.""" - check_non_empty_string(etag) - self._store["X-Amz-Copy-Source-If-Match"] = etag - - def set_match_etag_except(self, etag): - """Set ETag not match condition.""" - check_non_empty_string(etag) - self._store["X-Amz-Copy-Source-If-None-Match"] = etag - - def set_unmodified_since(self, mod_time): - """Set unmodified since condition.""" - time = to_http_header(mod_time) - self._store["X-Amz-Copy-Source-If-Unmodified-Since"] = time - - def set_modified_since(self, mod_time): - """Set modified since condition.""" - time = to_http_header(mod_time) - self._store["X-Amz-Copy-Source-If-Modified-Since"] = time diff --git a/minio/datatypes.py b/minio/datatypes.py index 086e7b400..1143a1c1d 100644 --- a/minio/datatypes.py +++ b/minio/datatypes.py @@ -724,3 +724,15 @@ def form_data(self, creds, region): def bucket_name(self): """Get bucket name.""" return self._bucket_name + + +def parse_copy_object(response): + """Parse CopyObject/UploadPartCopy response.""" + element = ET.fromstring(response.data.decode()) + etag = findtext(element, "ETag") + if etag: + etag = etag.replace('"', "") + last_modified = findtext(element, "LastModified") + if last_modified: + last_modified = from_iso8601utc(last_modified) + return etag, last_modified diff --git a/minio/helpers.py b/minio/helpers.py index dc85206a6..d2c9948da 100644 --- a/minio/helpers.py +++ b/minio/helpers.py @@ -29,6 +29,7 @@ from threading import BoundedSemaphore, Thread from .sse import Sse, SseCustomerKey +from .time import to_iso8601utc # Constants MAX_MULTIPART_COUNT = 10000 # 10000 parts @@ -347,6 +348,28 @@ def guess_user_metadata(key): return headers +def genheaders(headers, sse, tags, retention, legal_hold): + """Generate headers for given parameters.""" + headers = normalize_headers(headers) + headers.update(sse.headers() if sse else {}) + tagging = "&".join( + [ + queryencode(key) + "=" + queryencode(value) + for key, value in (tags or {}).items() + ], + ) + if tagging: + headers["x-amz-tagging"] = tagging + if retention and retention.mode(): + headers["x-amz-object-lock-mode"] = retention.mode() + headers["x-amz-object-lock-retain-until-date"] = ( + to_iso8601utc(retention.retain_until_date()) + ) + if legal_hold: + headers["x-amz-object-lock-legal-hold"] = "ON" + return headers + + def _extract_region(host): """Extract region from Amazon S3 host.""" diff --git a/tests/functional/tests.py b/tests/functional/tests.py index 80bb56259..a3dbe6837 100644 --- a/tests/functional/tests.py +++ b/tests/functional/tests.py @@ -39,8 +39,8 @@ import certifi import urllib3 -from minio import CopyConditions, Minio -from minio.commonconfig import ENABLED +from minio import Minio +from minio.commonconfig import ENABLED, REPLACE, CopySource from minio.datatypes import PostPolicy from minio.deleteobjects import DeleteObject from minio.error import S3Error @@ -446,9 +446,10 @@ def test_copy_object_no_copy_condition( # pylint: disable=invalid-name size = 1 * KB reader = LimitedRandomReader(size) _CLIENT.put_object(bucket_name, object_source, reader, size, sse=ssec) - _CLIENT.copy_object(bucket_name, object_copy, - '/' + bucket_name + '/' + object_source, - source_sse=ssec_copy, sse=ssec) + _CLIENT.copy_object( + bucket_name, object_copy, sse=ssec, + source=CopySource(bucket_name, object_source, ssec=ssec_copy), + ) st_obj = _CLIENT.stat_object(bucket_name, object_copy, ssec=ssec) _validate_stat(st_obj, size, {}) finally: @@ -482,9 +483,10 @@ def test_copy_object_with_metadata(log_entry): reader = LimitedRandomReader(size) _CLIENT.put_object(bucket_name, object_source, reader, size) # Perform a server side copy of an object - _CLIENT.copy_object(bucket_name, object_copy, - '/' + bucket_name + '/' + object_source, - metadata=metadata) + _CLIENT.copy_object( + bucket_name, object_copy, CopySource(bucket_name, object_source), + metadata=metadata, metadata_directive=REPLACE, + ) # Verification st_obj = _CLIENT.stat_object(bucket_name, object_copy) expected_metadata = {'x-amz-meta-testing-int': '1', @@ -518,16 +520,16 @@ def test_copy_object_etag_match(log_entry): reader = LimitedRandomReader(size) _CLIENT.put_object(bucket_name, object_source, reader, size) # Perform a server side copy of an object - _CLIENT.copy_object(bucket_name, object_copy, - '/' + bucket_name + '/' + object_source) + _CLIENT.copy_object( + bucket_name, object_copy, CopySource(bucket_name, object_source), + ) # Verification source_etag = _CLIENT.stat_object(bucket_name, object_source).etag - copy_conditions = CopyConditions() - copy_conditions.set_match_etag(source_etag) log_entry["args"]["conditions"] = {'set_match_etag': source_etag} - _CLIENT.copy_object(bucket_name, object_copy, - '/' + bucket_name + '/' + object_source, - copy_conditions) + _CLIENT.copy_object( + bucket_name, object_copy, + CopySource(bucket_name, object_source, match_etag=source_etag), + ) finally: _CLIENT.remove_object(bucket_name, object_source) _CLIENT.remove_object(bucket_name, object_copy) @@ -560,12 +562,11 @@ def test_copy_object_negative_etag_match( # pylint: disable=invalid-name # Perform a server side copy of an object # with incorrect pre-conditions and fail etag = 'test-etag' - copy_conditions = CopyConditions() - copy_conditions.set_match_etag(etag) log_entry["args"]["conditions"] = {'set_match_etag': etag} - _CLIENT.copy_object(bucket_name, object_copy, - '/' + bucket_name + '/' + object_source, - copy_conditions) + _CLIENT.copy_object( + bucket_name, object_copy, + CopySource(bucket_name, object_source, match_etag=etag), + ) except S3Error as exc: if exc.code != "PreconditionFailed": raise @@ -597,16 +598,15 @@ def test_copy_object_modified_since(log_entry): reader = LimitedRandomReader(size) _CLIENT.put_object(bucket_name, object_source, reader, size) # Set up the 'modified_since' copy condition - copy_conditions = CopyConditions() mod_since = datetime(2014, 4, 1, tzinfo=timezone.utc) - copy_conditions.set_modified_since(mod_since) log_entry["args"]["conditions"] = { 'set_modified_since': to_http_header(mod_since)} # Perform a server side copy of an object # and expect the copy to complete successfully - _CLIENT.copy_object(bucket_name, object_copy, - '/' + bucket_name + '/' + object_source, - copy_conditions) + _CLIENT.copy_object( + bucket_name, object_copy, + CopySource(bucket_name, object_source, modified_since=mod_since), + ) finally: _CLIENT.remove_object(bucket_name, object_source) _CLIENT.remove_object(bucket_name, object_copy) @@ -636,18 +636,19 @@ def test_copy_object_unmodified_since( # pylint: disable=invalid-name reader = LimitedRandomReader(size) _CLIENT.put_object(bucket_name, object_source, reader, size) # Set up the 'unmodified_since' copy condition - copy_conditions = CopyConditions() unmod_since = datetime(2014, 4, 1, tzinfo=timezone.utc) - copy_conditions.set_unmodified_since(unmod_since) log_entry["args"]["conditions"] = { 'set_unmodified_since': to_http_header(unmod_since)} try: # Perform a server side copy of an object and expect # the copy to fail since the creation/modification # time is now, way later than unmodification time, April 1st, 2014 - _CLIENT.copy_object(bucket_name, object_copy, - '/' + bucket_name + '/' + object_source, - copy_conditions) + _CLIENT.copy_object( + bucket_name, object_copy, + CopySource( + bucket_name, object_source, unmodified_since=unmod_since, + ), + ) except S3Error as exc: if exc.code != "PreconditionFailed": raise diff --git a/tests/unit/copy_object_test.py b/tests/unit/copy_object_test.py index 544bbd4c2..178ab0df8 100644 --- a/tests/unit/copy_object_test.py +++ b/tests/unit/copy_object_test.py @@ -19,43 +19,27 @@ from nose.tools import raises from minio import Minio -from minio.copy_conditions import CopyConditions +from minio.commonconfig import CopySource class CopyObjectTest(TestCase): - @raises(TypeError) - def test_object_is_string(self): - client = Minio('localhost:9000') - client.copy_object('hello', 12, 12) - @raises(ValueError) - def test_object_is_not_empty_string(self): + def test_valid_copy_source(self): client = Minio('localhost:9000') - client.copy_object('hello', ' \t \n ', '') + client.copy_object('hello', '1', '/testbucket/object') @raises(ValueError) - def test_length_is_string(self): - client = Minio('localhost:9000') - client.copy_object('..hello', '1', '/testbucket/object') + def test_valid_match_etag(self): + CopySource("src-bucket", "src-object", match_etag='') + @raises(ValueError) + def test_not_match_etag(self): + CopySource("src-bucket", "src-object", not_match_etag='') -class CopyConditionTest(TestCase): @raises(ValueError) - def test_match_etag_is_not_empty(self): - conds = CopyConditions() - conds.set_match_etag('') + def test_valid_modified_since(self): + CopySource("src-bucket", "src-object", modified_since='') @raises(ValueError) - def test_match_etag_is_not_empty_except(self): - conds = CopyConditions() - conds.set_match_etag_except('') - - @raises(AttributeError) - def test_unmodified_since(self): - conds = CopyConditions() - conds.set_unmodified_since('') - - @raises(AttributeError) - def test_modified_since(self): - conds = CopyConditions() - conds.set_modified_since('') + def test_valid_unmodified_since(self): + CopySource("src-bucket", "src-object", unmodified_since='')