Skip to content

Commit

Permalink
Simplify select_object_content() response stream (#1018)
Browse files Browse the repository at this point in the history
  • Loading branch information
balamurugana authored Dec 2, 2020
1 parent 470d1c5 commit 08901d5
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 408 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
check:
@pip install --user --upgrade pylint
@if python --version | grep -qi 'python 3'; then pylint --reports=no --score=no --disable=R0401,R0801 minio/*py; fi
@if python --version | grep -qi 'python 3'; then pylint --reports=no --score=no minio/credentials minio/select tests/functional; fi
@if python --version | grep -qi 'python 3'; then pylint --reports=no --score=no minio/credentials tests/functional; fi

@isort --diff --recursive .

Expand Down
25 changes: 13 additions & 12 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -889,18 +889,19 @@ __Return Value__
__Example__

```py
request = SelectRequest(
"select * from s3object",
CSVInputSerialization(),
CSVOutputSerialization(),
request_progress=True,
)
data = client.select_object_content('my-bucket', 'my-object', request)
with open('my-record-file', 'w') as record_data:
for d in data.stream(10*1024):
record_data.write(d)
# Get the stats
print(data.stats())
with client.select_object_content(
"my-bucket",
"my-object.csv",
SelectRequest(
"select * from S3Object",
CSVInputSerialization(),
CSVOutputSerialization(),
request_progress=True,
),
) as result:
for data in result.stream():
print(data.decode())
print(result.stats())
```

<a name="fget_object"></a>
Expand Down
37 changes: 20 additions & 17 deletions examples/select_object_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@


from minio import Minio
from minio.selectrequest import (CSVInputSerialization, CSVOutputSerialization,
SelectRequest)
from minio.select import (CSVInputSerialization, CSVOutputSerialization,
SelectRequest)

client = Minio('s3.amazonaws.com',
access_key='YOUR-ACCESSKEY',
secret_key='YOUR-SECRETKEY')

request = SelectRequest(
"select * from s3object",
CSVInputSerialization(),
CSVOutputSerialization(),
request_progress=True,
client = Minio(
"s3.amazonaws.com",
access_key="YOUR-ACCESSKEY",
secret_key="YOUR-SECRETKEY",
)
data = client.select_object_content('my-bucket', 'my-object', request)
with open('my-record-file', 'w') as record_data:
for d in data.stream(10*1024):
record_data.write(d)
# Get the stats
print(data.stats())

with client.select_object_content(
"my-bucket",
"my-object.csv",
SelectRequest(
"select * from S3Object",
CSVInputSerialization(),
CSVOutputSerialization(),
request_progress=True,
),
) as result:
for data in result.stream():
print(data.decode())
print(result.stats())
23 changes: 14 additions & 9 deletions minio/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@
from .objectlockconfig import ObjectLockConfig
from .replicationconfig import ReplicationConfig
from .retention import Retention
from .select import SelectObjectReader
from .selectrequest import SelectRequest
from .select import SelectObjectReader, SelectRequest
from .signer import presign_v4, sign_v4_s3
from .sse import SseCustomerKey
from .sseconfig import SSEConfig
Expand Down Expand Up @@ -546,13 +545,19 @@ def select_object_content(self, bucket_name, object_name, request):
:return: A reader contains requested records and progress information.
Example::
request = SelectRequest(
"select * from s3object",
CSVInputSerialization(),
CSVOutputSerialization(),
request_progress=True,
)
data = client.select_object_content('foo', 'test.csv', request)
with client.select_object_content(
"my-bucket",
"my-object.csv",
SelectRequest(
"select * from S3Object",
CSVInputSerialization(),
CSVOutputSerialization(),
request_progress=True,
),
) as result:
for data in result.stream():
print(data.decode())
print(result.stats())
"""
check_bucket_name(bucket_name)
check_non_empty_string(object_name)
Expand Down
178 changes: 174 additions & 4 deletions minio/selectrequest.py → minio/select.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Request/response of PutBucketReplication and GetBucketReplication APIs."""
"""Request/response of SelectObjectContent API."""

from __future__ import absolute_import

from abc import ABCMeta
from binascii import crc32
from io import BytesIO
from xml.etree import ElementTree as ET

from .xml import Element, SubElement
from .error import MinioException
from .xml import Element, SubElement, findtext

COMPRESSION_TYPE_NONE = "NONE"
COMPRESSION_TYPE_GZIP = "GZIP"
Expand Down Expand Up @@ -154,8 +158,8 @@ def toxml(self, element):
class ParquetInputSerialization(InputSerialization):
"""Parquet input serialization."""

def __init__(self, compression_type=None):
super().__init__(compression_type)
def __init__(self):
super().__init__(None)

def toxml(self, element):
"""Convert to XML."""
Expand Down Expand Up @@ -273,3 +277,169 @@ def toxml(self, element):
if self._scan_end_range:
SubElement(tag, "End", self._scan_end_range)
return element


def _read(reader, size):
"""Wrapper to RawIOBase.read() to error out on short reads."""
data = reader.read(size)
if len(data) != size:
raise IOError("insufficient data")
return data


def _int(data):
"""Convert byte data to big-endian int."""
return int.from_bytes(data, byteorder="big")


def _crc32(data):
"""Wrapper to binascii.crc32()."""
return crc32(data) & 0xffffffff


def _decode_header(data):
"""Decode header data."""
reader = BytesIO(data)
headers = {}
while True:
length = reader.read(1)
if not length:
break
name = _read(reader, _int(length))
if _int(_read(reader, 1)) != 7:
raise IOError("header value type is not 7")
value = _read(reader, _int(_read(reader, 2)))
headers[name.decode()] = value.decode()
return headers


class Stats:
"""Progress/Stats information."""

def __init__(self, data):
element = ET.fromstring(data.decode())
self._bytes_scanned = findtext(element, "BytesScanned")
self._bytes_processed = findtext(element, "BytesProcessed")
self._bytes_returned = findtext(element, "BytesReturned")

@property
def bytes_scanned(self):
"""Get bytes scanned."""
return self._bytes_scanned

@property
def bytes_processed(self):
"""Get bytes processed."""
return self._bytes_processed

@property
def bytes_returned(self):
"""Get bytes returned."""
return self._bytes_returned


class SelectObjectReader:
"""
BufferedIOBase compatible reader represents response data of
Minio.select_object_content() API.
"""

def __init__(self, response):
self._response = response
self._stats = None
self._payload = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
return self.close()

def readable(self): # pylint: disable=no-self-use
"""Return this is readable."""
return True

def writeable(self): # pylint: disable=no-self-use
"""Return this is not writeable."""
return False

def close(self):
"""Close response and release network resources."""
self._response.close()
self._response.release_conn()

def stats(self):
"""Get stats information."""
return self._stats

def _read(self):
"""Read and decode response."""
if self._response.isclosed():
return 0

prelude = _read(self._response, 8)
prelude_crc = _read(self._response, 4)
if _crc32(prelude) != _int(prelude_crc):
raise IOError(
"prelude CRC mismatch; expected: {0}, got: {1}".format(
_crc32(prelude), _int(prelude_crc),
),
)

total_length = _int(prelude[:4])
data = _read(self._response, total_length - 8 - 4 - 4)
message_crc = _int(_read(self._response, 4))
if _crc32(prelude + prelude_crc + data) != message_crc:
raise IOError(
"message CRC mismatch; expected: {0}, got: {1}".format(
_crc32(prelude + prelude_crc + data),
message_crc,
),
)

header_length = _int(prelude[4:])
headers = _decode_header(data[:header_length])

if headers.get(":message-type") == "error":
raise MinioException(
"{0}: {1}".format(
headers.get(":error-code"), headers.get(":error-message"),
),
)

if headers.get(":event-type") == "End":
return 0

payload_length = total_length - header_length - 16
if headers.get(":event-type") == "Cont" or payload_length < 1:
return self._read()

payload = data[header_length:header_length+payload_length]

if headers.get(":event-type") in ["Progress", "Stats"]:
self._stats = Stats(payload)
return self._read()

if headers.get(":event-type") == "Records":
self._payload = payload
return len(payload)

raise MinioException(
"unknown event-type {0}".format(headers.get(":event-type")),
)

def stream(self, num_bytes=32*1024):
"""
Stream extracted payload from response data. Upon completion, caller
should call self.close() to release network resources.
"""
while True:
if self._payload:
result = self._payload
if num_bytes < len(self._payload):
result = self._payload[:num_bytes]
self._payload = self._payload[len(result):]
yield result

if self._read() <= 0:
break
32 changes: 0 additions & 32 deletions minio/select/__init__.py

This file was deleted.

38 changes: 0 additions & 38 deletions minio/select/errors.py

This file was deleted.

Loading

0 comments on commit 08901d5

Please sign in to comment.