[Storage-Blob] Quick Query API #11991

Merged · 29 commits · Jun 30, 2020

Commits (diff shown from the first 23 of 29 commits)
09c2564
Renamed query error
annatisch Jun 11, 2020
2afff10
Renamed query reader
annatisch Jun 11, 2020
69c319a
Updated config models
annatisch Jun 11, 2020
e12c1c5
Updated format request params
annatisch Jun 11, 2020
a2e5c66
Updated iterator
annatisch Jun 11, 2020
cf088de
fix the bug which caused only showing fatal error
xiafu-msft Jun 12, 2020
0920887
Updated Error message
annatisch Jun 12, 2020
b09eddc
Fixed query helper
annatisch Jun 12, 2020
0ed1910
Started test conversion
annatisch Jun 12, 2020
b55a728
small fix
xiafu-msft Jun 12, 2020
a5752e5
Merge branch 'storage-stg73-query' of https://github.com/annatisch/az…
annatisch Jun 12, 2020
de6e434
Fixed tests
annatisch Jun 12, 2020
020cf45
Updated error handling + json model
annatisch Jun 12, 2020
fb17fcb
Updated recordings
annatisch Jun 12, 2020
bc3b153
Removed old recording
annatisch Jun 12, 2020
4ec52e1
Added iter tests
annatisch Jun 12, 2020
b70c252
Iter test recordings
annatisch Jun 12, 2020
1bb8177
Merge remote-tracking branch 'upstream/feature/storage-stg73' into st…
annatisch Jun 12, 2020
bdd3a55
Fix test
annatisch Jun 12, 2020
6cacb58
Remove extra recording
annatisch Jun 12, 2020
6b48df8
Fix pylint
annatisch Jun 12, 2020
e98bd19
Some docs cleanup
annatisch Jun 12, 2020
5baddd5
Renamed iter_records -> iter_stream
annatisch Jun 13, 2020
d485325
Review feedback
annatisch Jun 24, 2020
50964e5
Merge remote-tracking branch 'upstream/feature/storage-stg73' into st…
annatisch Jun 24, 2020
7431e1d
Updated tests
annatisch Jun 24, 2020
c56ed85
Missing commas
annatisch Jun 24, 2020
e2cb444
Fix syntax
annatisch Jun 24, 2020
1d6fd21
Fix pylint
annatisch Jun 24, 2020
14 changes: 8 additions & 6 deletions sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py
@@ -13,7 +13,7 @@
from ._blob_service_client import BlobServiceClient
from ._lease import BlobLeaseClient
from ._download import StorageStreamDownloader
from ._quick_query_helper import QuickQueryReader
from ._quick_query_helper import BlobQueryReader
from ._shared_access_signature import generate_account_sas, generate_container_sas, generate_blob_sas
from ._shared.policies import ExponentialRetry, LinearRetry
from ._shared.response_handlers import PartialBatchErrorException
@@ -52,8 +52,9 @@
BlobSasPermissions,
CustomerProvidedEncryptionKey,
ContainerEncryptionScope,
QuickQueryError,
DelimitedTextConfiguration
BlobQueryError,
DelimitedJSON,
CSVDialect
)

__version__ = VERSION
@@ -212,7 +213,8 @@ def download_blob_from_url(
'generate_blob_sas',
'PartialBatchErrorException',
'ContainerEncryptionScope',
'QuickQueryError',
'DelimitedTextConfiguration',
'QuickQueryReader'
'BlobQueryError',
'DelimitedJSON',
'CSVDialect',
'BlobQueryReader'
]
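
For orientation, a minimal sketch of how the renamed public surface is consumed by user code after this change. This is not part of the diff; the connection string, container, and blob names are placeholders:

```python
# Illustrative only: the renamed exports from azure.storage.blob after this PR.
from azure.storage.blob import (
    BlobServiceClient,
    BlobQueryReader,  # formerly QuickQueryReader
    BlobQueryError,   # formerly QuickQueryError
    DelimitedJSON,    # JSON format model (replaces DelimitedTextConfiguration)
    CSVDialect,       # CSV format model (replaces DelimitedTextConfiguration)
)

service = BlobServiceClient.from_connection_string("<connection-string>")
blob_client = service.get_blob_client(container="<container>", blob="<blob>")
```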
113 changes: 78 additions & 35 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py
@@ -37,10 +37,17 @@
StorageErrorException,
QueryRequest,
CpkInfo)
from ._serialize import get_modify_conditions, get_source_conditions, get_cpk_scope_info, get_api_version, \
serialize_blob_tags_header, serialize_blob_tags, get_quick_query_serialization_info
from ._serialize import (
get_modify_conditions,
get_source_conditions,
get_cpk_scope_info,
get_api_version,
serialize_blob_tags_header,
serialize_blob_tags,
serialize_query_format
)
from ._deserialize import get_page_ranges_result, deserialize_blob_properties, deserialize_blob_stream
from ._quick_query_helper import QuickQueryReader
from ._quick_query_helper import BlobQueryReader
from ._upload_helpers import (
upload_block_blob,
upload_append_blob,
@@ -637,11 +644,25 @@ def download_blob(self, offset=None, length=None, **kwargs):
def _quick_query_options(self, query_expression,
**kwargs):
# type: (str, **Any) -> Dict[str, Any]
input_serialization = kwargs.pop('input_serialization', None)
output_serialization = kwargs.pop('output_serialization', None)
query_request = QueryRequest(expression=query_expression,
input_serialization=get_quick_query_serialization_info(input_serialization),
output_serialization=get_quick_query_serialization_info(output_serialization))
delimiter = '\n'
input_format = kwargs.pop('blob_format', None)
if input_format:
try:
delimiter = input_format.lineterminator
except AttributeError:
delimiter = input_format.delimiter
output_format = kwargs.pop('output_format', None)
if output_format:
try:
delimiter = output_format.lineterminator
except AttributeError:
delimiter = output_format.delimiter
has_header = kwargs.pop('has_header', None)
query_request = QueryRequest(
expression=query_expression,
input_serialization=serialize_query_format(input_format, headers=has_header),
output_serialization=serialize_query_format(output_format)
)
[Review suggestion (Contributor)]: when no output_format is provided, fall back to the input format:

    else:
        output_format = input_format

access_conditions = get_access_conditions(kwargs.pop('lease', None))
mod_conditions = get_modify_conditions(kwargs)

@@ -650,44 +671,53 @@ def _quick_query_options(self, query_expression,
if cpk:
if self.scheme.lower() != 'https':
raise ValueError("Customer provided encryption key must be used over HTTPS.")
cpk_info = CpkInfo(encryption_key=cpk.key_value, encryption_key_sha256=cpk.key_hash,
encryption_algorithm=cpk.algorithm)
cpk_info = CpkInfo(
encryption_key=cpk.key_value,
encryption_key_sha256=cpk.key_hash,
encryption_algorithm=cpk.algorithm
)
options = {
'query_request': query_request,
'lease_access_conditions': access_conditions,
'modified_access_conditions': mod_conditions,
'cpk_info': cpk_info,
'progress_callback': kwargs.pop('progress_callback', None),
'snapshot': self.snapshot,
'timeout': kwargs.pop('timeout', None),
'cls': return_headers_and_deserialized,
'client': self._client,
'name': self.blob_name,
'container': self.container_name}
}
options.update(kwargs)
return options
return options, delimiter
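
The record delimiter above is resolved by duck typing: CSVDialect exposes the record separator as lineterminator, while DelimitedJSON only has delimiter. A standalone sketch of the same rule, using the model names from this PR:

```python
def resolve_record_delimiter(fmt, default='\n'):
    # CSVDialect stores the record separator as `lineterminator`;
    # DelimitedJSON stores it as `delimiter`. With no format, keep '\n'.
    if fmt is None:
        return default
    try:
        return fmt.lineterminator
    except AttributeError:
        return fmt.delimiter
```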

@distributed_trace
def query(self, query_expression, # type: str
**kwargs):
# type: (str, **Any) -> QuickQueryReader
def query_blob(self, query_expression, **kwargs):
# type: (str, **Any) -> BlobQueryReader
"""Enables users to select/project on blob/or blob snapshot data by providing simple query expressions.
This operations returns a QuickQueryReader, users need to use readall() or readinto() to get query data.
This operations returns a BlobQueryReader, users need to use readall() or readinto() to get query data.

:param str query_expression:
Required. A query statement.
:keyword func(~azure.storage.blob.QuickQueryError, int, int) progress_callback:
Callback where the caller can track progress of the operation as well as the quick query failures.
:keyword input_serialization:
Optional. Defines the input serialization for a blob quick query request.
This keyword arg could be set for delimited (CSV) serialization or JSON serialization.
When the input_serialization is set for JSON records, only a record separator in str format is needed.
:paramtype input_serialization: ~azure.storage.blob.DelimitedTextConfiguration or str
:keyword output_serialization:
Optional. Defines the output serialization for a blob quick query request.
This keyword arg could be set for delimited (CSV) serialization or JSON serialization.
When the input_serialization is set for JSON records, only a record separator in str format is needed.
:paramtype output_serialization: ~azure.storage.blob.DelimitedTextConfiguration or str.
:keyword Union[str, Callable[Exception]] errors:
[Contributor]: does operation_on_error sound better?

[Member, Author (annatisch)]: I've modelled the parameter naming based on this:
https://docs.python.org/3.8/library/codecs.html#error-handlers

[Contributor (xiafu-msft), Jun 19, 2020]: @kasobol-msft thoughts? how about error handler?

[Contributor]: Yes, I posted a comment next to the implementation below. I believe this change doesn't follow our feature specification and I'd recommend changing it back.
Determines the error behaviour. The default value is 'strict', where any non-fatal error will be
raised. Other possible values include:
- 'ignore': Non-fatal errors will be ignored; this may result in dropped records.
- Callable[Exception]: If a callable is provided, customer error handling can be defined.
:keyword blob_format:
Optional. Defines the serialization of the data currently stored in the blob. The default is to
treat the blob data as CSV data formatted in the default dialect. This can be overridden with
a custom CSVDialect, or alternatively a DelimitedJSON.
:paramtype blob_format: ~azure.storage.blob.CSVDialect or ~azure.storage.blob.DelimitedJSON
:keyword output_format:
Optional. Defines the output serialization for the data stream. By default the data will be returned
[Contributor]: Sorry that I told you wrong about this ("By default the data will be returned as it is represented in the blob"). I tried again and the default output is also delimited text data. I think it makes more sense to return the format the blob presently has, so I will add a commit to make the output format default to the existing blob format:

if output_format:
    # existing code
else:
    output_format = input_format
as it is represented in the blob. By providing an output format, the blob data will be reformatted
according to that profile. This value can be a CSVDialect or a DelimitedJSON.
:paramtype output_format: ~azure.storage.blob.CSVDialect or ~azure.storage.blob.DelimitedJSON
:keyword bool has_header:
Whether the blob data includes headers in the first line. The default value is False, meaning that the
data will be returned inclusive of the first line. If set to True, the data will be returned exclusive
of the first line.

.. note:: This parameter only applies to blob data formatted as CSV.

:keyword lease:
Required if the blob has an active lease. Value can be a BlobLeaseClient object
or the lease ID as a string.
@@ -716,8 +746,8 @@ def query(self, query_expression, # type: str
a secure connection must be established to transfer the key.
:keyword int timeout:
The timeout parameter is expressed in seconds.
:returns: A streaming object (QuickQueryReader)
:rtype: ~azure.storage.blob.QuickQueryReader
:returns: A streaming object (BlobQueryReader)
:rtype: ~azure.storage.blob.BlobQueryReader

.. admonition:: Example:

@@ -728,8 +758,21 @@
:dedent: 4
:caption: select/project on blob or blob snapshot data by providing simple query expressions.
"""
options = self._quick_query_options(query_expression, **kwargs)
return QuickQueryReader(**options)
errors = kwargs.pop("errors", None) or 'strict'
encoding = kwargs.pop("encoding", None)
options, delimiter = self._quick_query_options(query_expression, **kwargs)
try:
headers, raw_response_body = self._client.blob.query(**options)
except StorageErrorException as error:
process_storage_error(error)
return BlobQueryReader(
name=self.blob_name,
container=self.container_name,
errors=errors,
record_delimiter=delimiter,
encoding=encoding,
headers=headers,
response=raw_response_body)
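
An end-to-end usage sketch of query_blob as documented above. The query expression, blob names, and the error-handler signature follow the docstring; all concrete values are placeholders:

```python
# Hypothetical usage of BlobClient.query_blob (values are illustrative).
from azure.storage.blob import BlobServiceClient, CSVDialect, DelimitedJSON

service = BlobServiceClient.from_connection_string("<connection-string>")
blob_client = service.get_blob_client(container="<container>", blob="data.csv")

def on_error(error):
    # Per the docstring, a callable passed as `errors` receives each
    # non-fatal BlobQueryError instead of it being raised.
    print("Query error '{}' at offset {}".format(error.error, error.position))

reader = blob_client.query_blob(
    "SELECT * from BlobStorage WHERE _2 > 100",
    blob_format=CSVDialect(delimiter=','),        # how the stored data is read
    output_format=DelimitedJSON(delimiter='\n'),  # reformat results as JSON records
    has_header=True,    # skip the CSV header line
    errors=on_error,    # or 'strict' (default) / 'ignore'
)
content = reader.readall()  # BlobQueryReader also supports readinto()
```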

@staticmethod
def _generic_delete_blob_options(delete_snapshots=False, **kwargs):
50 changes: 27 additions & 23 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_models.py
@@ -23,7 +23,6 @@
from ._generated.models import StorageErrorException
from ._generated.models import BlobPrefix as GenBlobPrefix
from ._generated.models import BlobItemInternal
from ._generated.models import DelimitedTextConfiguration as GenDelimitedTextConfiguration


class BlobType(str, Enum):
@@ -1214,34 +1213,35 @@ def _from_generated(cls, generated):
return None


class DelimitedTextConfiguration(GenDelimitedTextConfiguration):
"""Defines the input or output delimited (CSV) serialization for a blob quick query request.
class DelimitedJSON(object):
"""Defines the input or output JSON serialization for a blob data query.

:keyword str column_separator: column separator, defaults to ','
:keyword str field_quote: field quote, defaults to '"'
:keyword str record_separator: record separator, defaults to '\n'
:keyword str escape_char: escape char, defaults to empty
:keyword bool headers_present: has headers, defaults to False
:keyword str delimiter: The line separator character, default value is '\n'
"""

def __init__(self, **kwargs):
column_separator = kwargs.pop('column_separator', ',')
field_quote = kwargs.pop('field_quote', '"')
record_separator = kwargs.pop('record_separator', '\n')
escape_char = kwargs.pop('escape_char', "")
headers_present = kwargs.pop('headers_present', False)
self.delimiter = kwargs.pop('delimiter', '\n')


class CSVDialect(object):
"""Defines the input or output delimited (CSV) serialization for a blob query request.

super(DelimitedTextConfiguration, self).__init__(
column_separator=column_separator,
field_quote=field_quote,
record_separator=record_separator,
escape_char=escape_char,
headers_present=headers_present)
:keyword str delimiter: column separator, defaults to ','
:keyword str quotechar: field quote, defaults to '"'
:keyword str lineterminator: record separator, defaults to '\n'
:keyword str escapechar: escape char, defaults to empty
"""
def __init__(self, **kwargs):
self.delimiter = kwargs.pop('delimiter', ',')
self.quotechar = kwargs.pop('quotechar', '"')
self.lineterminator = kwargs.pop('lineterminator', '\n')
self.escapechar = kwargs.pop('escapechar', "")
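
A short construction sketch for the two format models defined above; the values shown are just the documented defaults made explicit:

```python
# All keyword values below are the defaults from the classes above.
csv_format = CSVDialect(
    delimiter=',',        # column separator
    quotechar='"',        # field quote
    lineterminator='\n',  # record separator
    escapechar='',        # escape character (empty = none)
)
json_format = DelimitedJSON(delimiter='\n')  # separator between JSON records

# Either object can be passed to query_blob as blob_format= or output_format=.
```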


class QuickQueryError(object):
class BlobQueryError(Exception):
"""The error happened during quick query operation.

:ivar str name:
:ivar str error:
The name of the error.
:ivar bool is_fatal:
If true, this error prevents further query processing. More result data may be returned,
Expand All @@ -1252,8 +1252,12 @@ class QuickQueryError(object):
:ivar int position:
The blob offset at which the error occurred.
"""
def __init__(self, name=None, is_fatal=False, description=None, position=None):
self.name = name
def __init__(self, error=None, is_fatal=False, description=None, position=None):
self.error = error
self.is_fatal = is_fatal
self.description = description
self.position = position
message = self.error
if self.description:
message += ": {}".format(self.description)
super(BlobQueryError, self).__init__(message)
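
Because BlobQueryError now subclasses Exception and composes its message from the error name and description, an errors handler can re-raise it directly. A hedged sketch of one possible handler:

```python
def handle_query_error(error):
    # `error` is a BlobQueryError: it exposes .error, .is_fatal,
    # .description and .position; str(error) includes the description
    # when one is present.
    if error.is_fatal:
        raise error  # an Exception subclass, so re-raising works as-is
    print("Non-fatal query error at offset {}: {}".format(error.position, error))
```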