Skip to content

Commit

Permalink
[Storage-Blob] Quick Query API (#11991)
Browse files Browse the repository at this point in the history
* Renamed query error

* Renamed query reader

* Updated config models

* Updated format request params

* Updated iterator

* fix the bug which caused only showing fatal error

* Updated Error message

* Fixed query helper

* Started test conversion

* small fix

* Fixed tests

* Updated error handling + json model

* Updated recordings

* Removed old recording

* Added iter tests

* Iter test recordings

* Fix test

* Remove extra recording

* Fix pylint

* Some docs cleanup

* Renamed iter_records -> iter_stream

* Review feedback

* Updated tests

* Missing commas

* Fix syntax

* Fix pylint

Co-authored-by: xiafu <[email protected]>
  • Loading branch information
annatisch and xiafu-msft authored Jun 30, 2020
1 parent c5f1413 commit 17c1284
Show file tree
Hide file tree
Showing 26 changed files with 4,406 additions and 464 deletions.
14 changes: 8 additions & 6 deletions sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ._blob_service_client import BlobServiceClient
from ._lease import BlobLeaseClient
from ._download import StorageStreamDownloader
from ._quick_query_helper import QuickQueryReader
from ._quick_query_helper import BlobQueryReader
from ._shared_access_signature import generate_account_sas, generate_container_sas, generate_blob_sas
from ._shared.policies import ExponentialRetry, LinearRetry
from ._shared.response_handlers import PartialBatchErrorException
Expand Down Expand Up @@ -51,8 +51,9 @@
BlobSasPermissions,
CustomerProvidedEncryptionKey,
ContainerEncryptionScope,
QuickQueryError,
DelimitedTextConfiguration,
BlobQueryError,
DelimitedJSON,
DelimitedTextDialect,
ObjectReplicationPolicy,
ObjectReplicationRule
)
Expand Down Expand Up @@ -213,9 +214,10 @@ def download_blob_from_url(
'generate_blob_sas',
'PartialBatchErrorException',
'ContainerEncryptionScope',
'QuickQueryError',
'DelimitedTextConfiguration',
'QuickQueryReader',
'BlobQueryError',
'DelimitedJSON',
'DelimitedTextDialect',
'BlobQueryReader',
'ObjectReplicationPolicy',
'ObjectReplicationRule'
]
102 changes: 67 additions & 35 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,17 @@
StorageErrorException,
QueryRequest,
CpkInfo)
from ._serialize import get_modify_conditions, get_source_conditions, get_cpk_scope_info, get_api_version, \
serialize_blob_tags_header, serialize_blob_tags, get_quick_query_serialization_info
from ._serialize import (
get_modify_conditions,
get_source_conditions,
get_cpk_scope_info,
get_api_version,
serialize_blob_tags_header,
serialize_blob_tags,
serialize_query_format
)
from ._deserialize import get_page_ranges_result, deserialize_blob_properties, deserialize_blob_stream
from ._quick_query_helper import QuickQueryReader
from ._quick_query_helper import BlobQueryReader
from ._upload_helpers import (
upload_block_blob,
upload_append_blob,
Expand Down Expand Up @@ -637,11 +644,24 @@ def download_blob(self, offset=None, length=None, **kwargs):
def _quick_query_options(self, query_expression,
**kwargs):
# type: (str, **Any) -> Dict[str, Any]
input_serialization = kwargs.pop('input_serialization', None)
output_serialization = kwargs.pop('output_serialization', None)
query_request = QueryRequest(expression=query_expression,
input_serialization=get_quick_query_serialization_info(input_serialization),
output_serialization=get_quick_query_serialization_info(output_serialization))
delimiter = '\n'
input_format = kwargs.pop('blob_format', None)
if input_format:
try:
delimiter = input_format.lineterminator
except AttributeError:
delimiter = input_format.delimiter
output_format = kwargs.pop('output_format', None)
if output_format:
try:
delimiter = output_format.lineterminator
except AttributeError:
delimiter = output_format.delimiter
query_request = QueryRequest(
expression=query_expression,
input_serialization=serialize_query_format(input_format),
output_serialization=serialize_query_format(output_format)
)
access_conditions = get_access_conditions(kwargs.pop('lease', None))
mod_conditions = get_modify_conditions(kwargs)

Expand All @@ -650,44 +670,43 @@ def _quick_query_options(self, query_expression,
if cpk:
if self.scheme.lower() != 'https':
raise ValueError("Customer provided encryption key must be used over HTTPS.")
cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
encryption_algorithm=cpk.algorithm)
cpk_info = CpkInfo(
encryption_key=cpk.key_value,
encryption_key_sha256=cpk.key_hash,
encryption_algorithm=cpk.algorithm
)
options = {
'query_request': query_request,
'lease_access_conditions': access_conditions,
'modified_access_conditions': mod_conditions,
'cpk_info': cpk_info,
'progress_callback': kwargs.pop('progress_callback', None),
'snapshot': self.snapshot,
'timeout': kwargs.pop('timeout', None),
'cls': return_headers_and_deserialized,
'client': self._client,
'name': self.blob_name,
'container': self.container_name}
}
options.update(kwargs)
return options
return options, delimiter

@distributed_trace
def query(self, query_expression, # type: str
**kwargs):
# type: (str, **Any) -> QuickQueryReader
def query_blob(self, query_expression, **kwargs):
# type: (str, **Any) -> BlobQueryReader
"""Enables users to select/project on blob/or blob snapshot data by providing simple query expressions.
This operations returns a QuickQueryReader, users need to use readall() or readinto() to get query data.
This operation returns a BlobQueryReader; users need to call readall() or readinto() to get the query data.
:param str query_expression:
Required. a query statement.
:keyword func(~azure.storage.blob.QuickQueryError, int, int) progress_callback:
Callback where the caller can track progress of the operation as well as the quick query failures.
:keyword input_serialization:
Optional. Defines the input serialization for a blob quick query request.
This keyword arg could be set for delimited (CSV) serialization or JSON serialization.
When the input_serialization is set for JSON records, only a record separator in str format is needed.
:paramtype input_serialization: ~azure.storage.blob.DelimitedTextConfiguration or str
:keyword output_serialization:
Optional. Defines the output serialization for a blob quick query request.
This keyword arg could be set for delimited (CSV) serialization or JSON serialization.
When the input_serialization is set for JSON records, only a record separator in str format is needed.
:paramtype output_serialization: ~azure.storage.blob.DelimitedTextConfiguration or str.
:keyword Callable[Exception] on_error:
A function to be called on any processing errors returned by the service.
:keyword blob_format:
Optional. Defines the serialization of the data currently stored in the blob. The default is to
treat the blob data as CSV data formatted in the default dialect. This can be overridden with
a custom DelimitedTextDialect, or alternatively a DelimitedJSON.
:paramtype blob_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJSON
:keyword output_format:
Optional. Defines the output serialization for the data stream. By default the data will be returned
as it is represented in the blob. By providing an output format, the blob data will be reformatted
according to that profile. This value can be a DelimitedTextDialect or a DelimitedJSON.
:paramtype output_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJSON
:keyword lease:
Required if the blob has an active lease. Value can be a BlobLeaseClient object
or the lease ID as a string.
Expand Down Expand Up @@ -716,8 +735,8 @@ def query(self, query_expression, # type: str
a secure connection must be established to transfer the key.
:keyword int timeout:
The timeout parameter is expressed in seconds.
:returns: A streaming object (QuickQueryReader)
:rtype: ~azure.storage.blob.QuickQueryReader
:returns: A streaming object (BlobQueryReader)
:rtype: ~azure.storage.blob.BlobQueryReader
.. admonition:: Example:
Expand All @@ -728,8 +747,21 @@ def query(self, query_expression, # type: str
:dedent: 4
:caption: select/project on blob/or blob snapshot data by providing simple query expressions.
"""
options = self._quick_query_options(query_expression, **kwargs)
return QuickQueryReader(**options)
errors = kwargs.pop("on_error", None)
encoding = kwargs.pop("encoding", None)
options, delimiter = self._quick_query_options(query_expression, **kwargs)
try:
headers, raw_response_body = self._client.blob.query(**options)
except StorageErrorException as error:
process_storage_error(error)
return BlobQueryReader(
name=self.blob_name,
container=self.container_name,
errors=errors,
record_delimiter=delimiter,
encoding=encoding,
headers=headers,
response=raw_response_body)

@staticmethod
def _generic_delete_blob_options(delete_snapshots=False, **kwargs):
Expand Down
61 changes: 37 additions & 24 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from ._generated.models import StorageErrorException
from ._generated.models import BlobPrefix as GenBlobPrefix
from ._generated.models import BlobItemInternal
from ._generated.models import DelimitedTextConfiguration as GenDelimitedTextConfiguration


class BlobType(str, Enum):
Expand Down Expand Up @@ -1214,28 +1213,38 @@ def _from_generated(cls, generated):
return None


class DelimitedJSON(object):
    r"""Defines the input or output JSON serialization for a blob data query.

    The docstring is a raw string so that the documented default is shown as the
    two characters ``\n`` rather than being rendered as an actual line break.

    :keyword str delimiter: The line separator character, default value is '\n'
    """

    def __init__(self, **kwargs):
        # Only 'delimiter' is read; any other keyword argument is left untouched
        # in kwargs and has no effect on the instance.
        self.delimiter = kwargs.pop('delimiter', '\n')


class DelimitedTextDialect(object):
    r"""Defines the input or output delimited (CSV) serialization for a blob query request.

    The docstring is a raw string so that the documented record separator is
    shown as the two characters ``\n`` rather than as an actual line break.

    :keyword str delimiter:
        Column separator, defaults to ','.
    :keyword str quotechar:
        Field quote, defaults to '"'.
    :keyword str lineterminator:
        Record separator, defaults to '\n'.
    :keyword str escapechar:
        Escape char, defaults to empty.
    :keyword bool has_header:
        Whether the blob data includes headers in the first line. The default value is False, meaning that the
        data will be returned inclusive of the first line. If set to True, the data will be returned exclusive
        of the first line.
    """

    def __init__(self, **kwargs):
        # Each setting is taken from kwargs with its documented default; keyword
        # arguments not listed here are not read and have no effect.
        self.delimiter = kwargs.pop('delimiter', ',')
        self.quotechar = kwargs.pop('quotechar', '"')
        self.lineterminator = kwargs.pop('lineterminator', '\n')
        self.escapechar = kwargs.pop('escapechar', "")
        self.has_header = kwargs.pop('has_header', False)


class ObjectReplicationPolicy(DictMixin):
Expand Down Expand Up @@ -1267,10 +1276,10 @@ def __init__(self, **kwargs):
self.status = kwargs.pop('status', None)


class BlobQueryError(Exception):
    """An error that occurred during a blob query operation.

    The exception message is ``"<error>: <description>"`` when both values are
    given, or whichever one is present otherwise.

    :ivar str error:
        The name of the error.
    :ivar bool is_fatal:
        If true, this error prevents further query processing. More result data
        may still be returned.
    :ivar str description:
        A description of the error.
    :ivar int position:
        The blob offset at which the error occurred.
    """

    def __init__(self, error=None, is_fatal=False, description=None, position=None):
        self.error = error
        self.is_fatal = is_fatal
        self.description = description
        self.position = position
        message = self.error
        if self.description:
            if message is None:
                # No error name was supplied: use the description on its own
                # instead of failing on ``None += str``.
                message = self.description
            else:
                # format() rather than += so a non-str error name cannot raise.
                message = "{}: {}".format(message, self.description)
        super(BlobQueryError, self).__init__(message)
Loading

0 comments on commit 17c1284

Please sign in to comment.