From 08901d53e66cefaad494609e8d1c2f85a51ff6e6 Mon Sep 17 00:00:00 2001 From: Bala FA Date: Wed, 2 Dec 2020 17:07:01 +0530 Subject: [PATCH] Simplify select_object_content() response stream (#1018) --- Makefile | 2 +- docs/API.md | 25 +-- examples/select_object_content.py | 37 +++-- minio/api.py | 23 ++- minio/{selectrequest.py => select.py} | 178 +++++++++++++++++++- minio/select/__init__.py | 32 ---- minio/select/errors.py | 38 ----- minio/select/helpers.py | 59 ------- minio/select/reader.py | 228 -------------------------- setup.py | 1 - tests/functional/tests.py | 14 +- 11 files changed, 229 insertions(+), 408 deletions(-) rename minio/{selectrequest.py => select.py} (65%) delete mode 100644 minio/select/__init__.py delete mode 100644 minio/select/errors.py delete mode 100644 minio/select/helpers.py delete mode 100644 minio/select/reader.py diff --git a/Makefile b/Makefile index fb004cb79..0739a68ff 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ check: @pip install --user --upgrade pylint @if python --version | grep -qi 'python 3'; then pylint --reports=no --score=no --disable=R0401,R0801 minio/*py; fi - @if python --version | grep -qi 'python 3'; then pylint --reports=no --score=no minio/credentials minio/select tests/functional; fi + @if python --version | grep -qi 'python 3'; then pylint --reports=no --score=no minio/credentials tests/functional; fi @isort --diff --recursive . diff --git a/docs/API.md b/docs/API.md index e1982933b..a78d87839 100644 --- a/docs/API.md +++ b/docs/API.md @@ -889,18 +889,19 @@ __Return Value__ __Example__ ```py -request = SelectRequest( - "select * from s3object", - CSVInputSerialization(), - CSVOutputSerialization(), - request_progress=True, -) -data = client.select_object_content('my-bucket', 'my-object', request) -with open('my-record-file', 'w') as record_data: - for d in data.stream(10*1024): - record_data.write(d) - # Get the stats - print(data.stats()) +with client.select_object_content( + "my-bucket", + "my-object.csv", + SelectRequest( + "select * from S3Object", + CSVInputSerialization(), + CSVOutputSerialization(), + request_progress=True, + ), +) as result: + for data in result.stream(): + print(data.decode()) + print(result.stats()) ``` diff --git a/examples/select_object_content.py b/examples/select_object_content.py index 8ebfd1ed2..20bbbefb2 100644 --- a/examples/select_object_content.py +++ b/examples/select_object_content.py @@ -16,22 +16,25 @@ from minio import Minio -from minio.selectrequest import (CSVInputSerialization, CSVOutputSerialization, - SelectRequest) +from minio.select import (CSVInputSerialization, CSVOutputSerialization, + SelectRequest) -client = Minio('s3.amazonaws.com', - access_key='YOUR-ACCESSKEY', - secret_key='YOUR-SECRETKEY') - -request = SelectRequest( - "select * from s3object", - CSVInputSerialization(), - CSVOutputSerialization(), - request_progress=True, +client = Minio( + "s3.amazonaws.com", + access_key="YOUR-ACCESSKEY", + secret_key="YOUR-SECRETKEY", ) -data = client.select_object_content('my-bucket', 'my-object', request) -with open('my-record-file', 'w') as record_data: - for d in data.stream(10*1024): - record_data.write(d) - # Get the stats - print(data.stats()) + +with client.select_object_content( + "my-bucket", + "my-object.csv", + SelectRequest( + "select * from S3Object", + CSVInputSerialization(), + CSVOutputSerialization(), + request_progress=True, + ), +) as result: + for data in result.stream(): + print(data.decode()) + print(result.stats()) diff --git a/minio/api.py b/minio/api.py index b24338d81..2455222c6 100644 --- a/minio/api.py +++ b/minio/api.py @@ -64,8 +64,7 @@ from .objectlockconfig import ObjectLockConfig from .replicationconfig import ReplicationConfig from .retention import Retention -from .select import SelectObjectReader -from .selectrequest import SelectRequest +from .select import SelectObjectReader, SelectRequest from .signer import presign_v4, sign_v4_s3 from .sse import SseCustomerKey from .sseconfig import SSEConfig @@ -546,13 +545,19 @@ def select_object_content(self, bucket_name, object_name, request): :return: A reader contains requested records and progress information. Example:: - request = SelectRequest( - "select * from s3object", - CSVInputSerialization(), - CSVOutputSerialization(), - request_progress=True, - ) - data = client.select_object_content('foo', 'test.csv', request) + with client.select_object_content( + "my-bucket", + "my-object.csv", + SelectRequest( + "select * from S3Object", + CSVInputSerialization(), + CSVOutputSerialization(), + request_progress=True, + ), + ) as result: + for data in result.stream(): + print(data.decode()) + print(result.stats()) """ check_bucket_name(bucket_name) check_non_empty_string(object_name) diff --git a/minio/selectrequest.py b/minio/select.py similarity index 65% rename from minio/selectrequest.py rename to minio/select.py index 6e4156d81..69927083b 100644 --- a/minio/selectrequest.py +++ b/minio/select.py @@ -14,13 +14,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Request/response of PutBucketReplication and GetBucketReplication APIs.""" +"""Request/response of SelectObjectContent API.""" from __future__ import absolute_import from abc import ABCMeta +from binascii import crc32 +from io import BytesIO +from xml.etree import ElementTree as ET -from .xml import Element, SubElement +from .error import MinioException +from .xml import Element, SubElement, findtext COMPRESSION_TYPE_NONE = "NONE" COMPRESSION_TYPE_GZIP = "GZIP" @@ -154,8 +158,8 @@ def toxml(self, element): class ParquetInputSerialization(InputSerialization): """Parquet input serialization.""" - def __init__(self, compression_type=None): - super().__init__(compression_type) + def __init__(self): + super().__init__(None) def toxml(self, element): """Convert to XML.""" @@ -273,3 +277,169 @@ def toxml(self, element): if self._scan_end_range: SubElement(tag, "End", self._scan_end_range) return element + + +def _read(reader, size): + """Wrapper to RawIOBase.read() to error out on short reads.""" + data = reader.read(size) + if len(data) != size: + raise IOError("insufficient data") + return data + + +def _int(data): + """Convert byte data to big-endian int.""" + return int.from_bytes(data, byteorder="big") + + +def _crc32(data): + """Wrapper to binascii.crc32().""" + return crc32(data) & 0xffffffff + + +def _decode_header(data): + """Decode header data.""" + reader = BytesIO(data) + headers = {} + while True: + length = reader.read(1) + if not length: + break + name = _read(reader, _int(length)) + if _int(_read(reader, 1)) != 7: + raise IOError("header value type is not 7") + value = _read(reader, _int(_read(reader, 2))) + headers[name.decode()] = value.decode() + return headers + + +class Stats: + """Progress/Stats information.""" + + def __init__(self, data): + element = ET.fromstring(data.decode()) + self._bytes_scanned = findtext(element, "BytesScanned") + self._bytes_processed = findtext(element, "BytesProcessed") + self._bytes_returned = findtext(element, "BytesReturned") + + @property + def bytes_scanned(self): + """Get bytes scanned.""" + return self._bytes_scanned + + @property + def bytes_processed(self): + """Get bytes processed.""" + return self._bytes_processed + + @property + def bytes_returned(self): + """Get bytes returned.""" + return self._bytes_returned + + +class SelectObjectReader: + """ + BufferedIOBase compatible reader represents response data of + Minio.select_object_content() API. + """ + + def __init__(self, response): + self._response = response + self._stats = None + self._payload = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + return self.close() + + def readable(self): # pylint: disable=no-self-use + """Return this is readable.""" + return True + + def writeable(self): # pylint: disable=no-self-use + """Return this is not writeable.""" + return False + + def close(self): + """Close response and release network resources.""" + self._response.close() + self._response.release_conn() + + def stats(self): + """Get stats information.""" + return self._stats + + def _read(self): + """Read and decode response.""" + if self._response.isclosed(): + return 0 + + prelude = _read(self._response, 8) + prelude_crc = _read(self._response, 4) + if _crc32(prelude) != _int(prelude_crc): + raise IOError( + "prelude CRC mismatch; expected: {0}, got: {1}".format( + _crc32(prelude), _int(prelude_crc), + ), + ) + + total_length = _int(prelude[:4]) + data = _read(self._response, total_length - 8 - 4 - 4) + message_crc = _int(_read(self._response, 4)) + if _crc32(prelude + prelude_crc + data) != message_crc: + raise IOError( + "message CRC mismatch; expected: {0}, got: {1}".format( + _crc32(prelude + prelude_crc + data), + message_crc, + ), + ) + + header_length = _int(prelude[4:]) + headers = _decode_header(data[:header_length]) + + if headers.get(":message-type") == "error": + raise MinioException( + "{0}: {1}".format( + headers.get(":error-code"), headers.get(":error-message"), + ), + ) + + if headers.get(":event-type") == "End": + return 0 + + payload_length = total_length - header_length - 16 + if headers.get(":event-type") == "Cont" or payload_length < 1: + return self._read() + + payload = data[header_length:header_length+payload_length] + + if headers.get(":event-type") in ["Progress", "Stats"]: + self._stats = Stats(payload) + return self._read() + + if headers.get(":event-type") == "Records": + self._payload = payload + return len(payload) + + raise MinioException( + "unknown event-type {0}".format(headers.get(":event-type")), + ) + + def stream(self, num_bytes=32*1024): + """ + Stream extracted payload from response data. Upon completion, caller + should call self.close() to release network resources. + """ + while True: + if self._payload: + result = self._payload + if num_bytes < len(self._payload): + result = self._payload[:num_bytes] + self._payload = self._payload[len(result):] + yield result + + if self._read() <= 0: + break diff --git a/minio/select/__init__.py b/minio/select/__init__.py deleted file mode 100644 index 4da89ef5f..000000000 --- a/minio/select/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -# -*- coding: utf-8 -*- -# MinIO Python Library for Amazon S3 Compatible Cloud Storage, -# (C) 2019 MinIO, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -:copyright: (c) 2019 by MinIO, Inc. -:license: Apache 2.0, see LICENSE for more details. -""" - -__title__ = 'minio-py' -__author__ = 'MinIO, Inc.' -__version__ = '0.1.0' -__license__ = 'Apache 2.0' -__copyright__ = 'Copyright 2019 MinIO, Inc.' - -# pylint: disable=unused-import -from .errors import SelectCRCValidationError, SelectMessageError -from .helpers import (byte_int, calculate_crc, # pylint: disable=unused-import - validate_crc) -from .reader import SelectObjectReader # pylint: disable=unused-import diff --git a/minio/select/errors.py b/minio/select/errors.py deleted file mode 100644 index b25867c5d..000000000 --- a/minio/select/errors.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C) -# 2019 MinIO, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -minio.select.errors -~~~~~~~~~~~~~~~ - -This module implements the error classes for SelectObject responses. - -:copyright: (c) 2019 by MinIO, Inc. -:license: Apache 2.0, see LICENSE for more details. - -""" - - -class SelectMessageError(Exception): - ''' - Raised in case of message type 'error' - ''' - - -class SelectCRCValidationError(Exception): - ''' - Raised in case of CRC mismatch - ''' diff --git a/minio/select/helpers.py b/minio/select/helpers.py deleted file mode 100644 index 6d0147a9e..000000000 --- a/minio/select/helpers.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- coding: utf-8 -*- -# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C) -# 2019 MinIO, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -minio.select.helpers -~~~~~~~~~~~~~~~ - -This module implements the helper functions for SelectObject responses. - -:copyright: (c) 2019 by MinIO, Inc. -:license: Apache 2.0, see LICENSE for more details. - -""" - -import codecs -from binascii import crc32 - -EVENT_RECORDS = 'Records' # Event Type is Records -EVENT_PROGRESS = 'Progress' # Event Type Progress -EVENT_STATS = 'Stats' # Event Type Stats -EVENT_CONT = 'Cont' # Event Type continue -EVENT_END = 'End' # Event Type is End -EVENT_CONTENT_TYPE = "text/xml" # Event content xml type -EVENT = 'event' # Message Type is event -ERROR = 'error' # Message Type is error - - -def calculate_crc(value): - ''' - Returns the CRC using crc32 - ''' - return crc32(value) & 0xffffffff - - -def validate_crc(current_value, expected_value): - ''' - Validate through CRC check - ''' - return calculate_crc(current_value) == byte_int(expected_value) - - -def byte_int(data_bytes): - ''' - Convert bytes to big-endian integer - ''' - return int(codecs.encode(data_bytes, 'hex'), 16) diff --git a/minio/select/reader.py b/minio/select/reader.py deleted file mode 100644 index e63fc638f..000000000 --- a/minio/select/reader.py +++ /dev/null @@ -1,228 +0,0 @@ -# -*- coding: utf-8 -*- -# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C) -# 2019 MinIO, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -minio.select.reader -~~~~~~~~~~~~~~~ - -This module implements the reader for SelectObject response body. - -:copyright: (c) 2019 by MinIO, Inc. -:license: Apache 2.0, see LICENSE for more details. - -""" - -from __future__ import absolute_import - -import io -import sys -from xml.etree import ElementTree - -from .errors import SelectCRCValidationError, SelectMessageError -from .helpers import (ERROR, EVENT, EVENT_CONTENT_TYPE, EVENT_RECORDS, - EVENT_STATS, byte_int, calculate_crc, validate_crc) - - -def _extract_header(header_bytes): - """ - populates the header map after reading the header in bytes - """ - header_map = {} - header_byte_parsed = 0 - # While loop ends when all the headers present are read - # header contains multipe headers - while header_byte_parsed < len(header_bytes): - header_name_byte_length = byte_int( - header_bytes[header_byte_parsed:header_byte_parsed+1]) - header_byte_parsed += 1 - header_name = header_bytes[ - header_byte_parsed:header_byte_parsed+header_name_byte_length - ] - header_byte_parsed += header_name_byte_length - # Header Value Type is of 1 bytes and is skipped - header_byte_parsed += 1 - value_string_byte_length = byte_int( - header_bytes[header_byte_parsed:header_byte_parsed+2] - ) - header_byte_parsed += 2 - header_value = header_bytes[ - header_byte_parsed:header_byte_parsed+value_string_byte_length - ] - header_byte_parsed += value_string_byte_length - header_map[header_name.decode( - "utf-8").lstrip(":")] = header_value.decode("utf-8").lstrip(":") - return header_map - - -def _parse_stats(stats): - """ - Parses stats XML and populates the stat dict. - """ - stat = {} - for attribute in ElementTree.fromstring(stats): - if attribute.tag == 'BytesScanned': - stat['BytesScanned'] = attribute.text - elif attribute.tag == 'BytesProcessed': - stat['BytesProcessed'] = attribute.text - elif attribute.tag == 'BytesReturned': - stat['BytesReturned'] = attribute.text - - return stat - - -class SelectObjectReader: - """ - SelectObjectReader returns a Reader that upon read - returns queried data, but stops when the response ends. - LimitedRandomReader is compatible with BufferedIOBase. - """ - - def __init__(self, response): - self.response = response - self.remaining_bytes = bytes() - self.stat = {} - self.prog = {} - - def readable(self): # pylint: disable=no-self-use - """Return this is readable.""" - return True - - def writeable(self): # pylint: disable=no-self-use - """Return this is not writeable.""" - return False - - def close(self): - """Close response.""" - self.response.close() - - def stats(self): - """Get stats information.""" - return self.stat - - def progress(self): - """Get progress information.""" - return self.prog - - def __extract_message(self): - """ - Process the response sent from server. - https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html - """ - - crc_bytes = io.BytesIO() - total_bytes_len = self.response.read(4) - if not total_bytes_len: - return {} - - total_length = byte_int(total_bytes_len) - header_bytes_len = self.response.read(4) - if not header_bytes_len: - return {} - - header_len = byte_int(header_bytes_len) - - crc_bytes.write(total_bytes_len) - crc_bytes.write(header_bytes_len) - - prelude_bytes_crc = self.response.read(4) - if not validate_crc(crc_bytes.getvalue(), prelude_bytes_crc): - raise SelectCRCValidationError( - {"Checksum Mismatch, PreludeCRC of " + - str(calculate_crc(crc_bytes.getvalue())) + - " does not equal expected CRC of " + - str(byte_int(prelude_bytes_crc))}) - - crc_bytes.write(prelude_bytes_crc) - - header_bytes = self.response.read(header_len) - if not header_bytes: - raise SelectMessageError( - "Premature truncation of select message header" + - ", server is sending corrupt message?") - - crc_bytes.write(header_bytes) - - header_map = _extract_header(header_bytes) - payload_length = total_length - header_len - int(16) - payload_bytes = b'' - event_type = header_map["event-type"] - - if header_map["message-type"] == ERROR: - raise SelectMessageError( - header_map["error-code"] + ":\"" + - header_map["error-message"] + "\"") - - if header_map["message-type"] != EVENT: - raise SelectMessageError( - "Unrecognized message-type {0}".format( - header_map["message-type"]) - ) - - if event_type == EVENT_STATS: - content_type = header_map["content-type"] - if content_type != EVENT_CONTENT_TYPE: - raise SelectMessageError( - "Unrecognized content-type {0}".format(content_type)) - - payload_bytes = self.response.read(payload_length) - self.stat = _parse_stats(payload_bytes) - elif event_type == EVENT_RECORDS: - payload_bytes = self.response.read(payload_length) - - crc_bytes.write(payload_bytes) - - message_crc = self.response.read(4) - if not message_crc: - return {} - - if not validate_crc(crc_bytes.getvalue(), message_crc): - raise SelectCRCValidationError( - {"Checksum Mismatch, MessageCRC of " + - str(calculate_crc(crc_bytes.getvalue())) + - " does not equal expected CRC of " + - str(byte_int(message_crc))}) - - message = {event_type: payload_bytes} - return message - - def stream(self, num_bytes=32*1024): - """ - extract each record from the response body ... and buffer it. - send only up to requested bytes such as message[:num_bytes] - rest is buffered and added to the next iteration. - - caller should call self.close() to close the stream. - """ - while not self.response.isclosed(): - if not self.remaining_bytes: - message = self.__extract_message() - if EVENT_RECORDS not in message: - continue - - self.remaining_bytes = message.get(EVENT_RECORDS, b'') - - result = self.remaining_bytes - if num_bytes < len(self.remaining_bytes): - result = self.remaining_bytes[:num_bytes] - self.remaining_bytes = self.remaining_bytes[len(result):] - - if result == b'': - break - if sys.version_info.major == 3: - yield result.decode('utf-8', errors='ignore') - else: - # Python 2.x needs explicit conversion. - yield result.decode('utf-8', errors='ignore').encode('utf-8') diff --git a/setup.py b/setup.py index 5e72da399..fa4f0f904 100644 --- a/setup.py +++ b/setup.py @@ -33,7 +33,6 @@ packages = [ 'minio', - 'minio.select', 'minio.credentials' ] diff --git a/tests/functional/tests.py b/tests/functional/tests.py index a3dbe6837..fae516966 100644 --- a/tests/functional/tests.py +++ b/tests/functional/tests.py @@ -31,6 +31,7 @@ import tempfile import time import traceback +from binascii import crc32 from datetime import datetime, timedelta, timezone from inspect import getfullargspec from threading import Thread @@ -44,9 +45,8 @@ from minio.datatypes import PostPolicy from minio.deleteobjects import DeleteObject from minio.error import S3Error -from minio.select.helpers import calculate_crc -from minio.selectrequest import (CSVInputSerialization, CSVOutputSerialization, - SelectRequest) +from minio.select import (CSVInputSerialization, CSVOutputSerialization, + SelectRequest) from minio.sse import SseCustomerKey from minio.time import to_http_header from minio.versioningconfig import VersioningConfig @@ -283,15 +283,15 @@ def test_select_object_content(log_entry): # Get the records records = io.BytesIO() for data_bytes in data.stream(10*KB): - records.write(data_bytes.encode('utf-8')) + records.write(data_bytes) - expected_crc = calculate_crc(content.getvalue()) - generated_crc = calculate_crc(records.getvalue()) + expected_crc = crc32(content.getvalue()) & 0xffffffff + generated_crc = crc32(records.getvalue()) & 0xffffffff if expected_crc != generated_crc: raise ValueError( 'Data mismatch Expected : ' '"col1,col2,col3\none,two,three\nX,Y,Z\n"', - 'Received {}', records) + 'Received {0}'.format(records.getvalue().decode())) finally: _CLIENT.remove_object(bucket_name, csvfile) _CLIENT.remove_bucket(bucket_name)