replaced GcsUploader and GcsDownloader
BjornPrime committed Mar 23, 2023
1 parent 8a4e029 commit 08bfd4f
Showing 2 changed files with 8 additions and 197 deletions.
@@ -4790,10 +4790,6 @@ GBKTransform:
  - from_runner_api_parameter
  - to_runner_api_parameter
GcpTestIOError: {}
GcsDownloader:
  methods:
  - get_range
  - size
GCSFileSystem:
  methods:
  - checksum
@@ -4837,10 +4833,6 @@ GcsIOError: {}
GcsIOOverrides:
  methods:
  - retry_func
GcsUploader:
  methods:
  - finish
  - put
GeneralPurposeConsumerSet:
  methods:
  - flush
197 changes: 8 additions & 189 deletions sdks/python/apache_beam/io/gcp/gcsio.py
@@ -30,24 +30,12 @@
# pytype: skip-file

import errno
import io
import logging
import multiprocessing
import re
import threading
import time
import traceback
from itertools import islice
from google.cloud import storage

from apache_beam.internal.metrics.metric import ServiceCallMetric
from apache_beam.io.filesystemio import Downloader
from apache_beam.io.filesystemio import DownloaderStream
from apache_beam.io.filesystemio import PipeStream
from apache_beam.io.filesystemio import Uploader
from apache_beam.io.filesystemio import UploaderStream
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import monitoring_infos
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated
@@ -63,7 +51,6 @@
  # pylint: disable=ungrouped-imports
  from apitools.base.py.batch import BatchApiRequest
  from apitools.base.py.exceptions import HttpError
  from apitools.base.py import transfer
  from apache_beam.internal.gcp import auth
except ImportError:
  raise ImportError(
@@ -229,24 +216,15 @@ def open(
    Raises:
      ValueError: Invalid open file mode.
    """
    bucket_name, blob_name = parse_gcs_path(filename)
    bucket = self.client.get_bucket(bucket_name)
    blob = bucket.get_blob(blob_name)

    if mode == 'r' or mode == 'rb':
      downloader = GcsDownloader(
          self.client,
          filename,
          buffer_size=read_buffer_size,
          get_project_number=self.get_project_number)
      return io.BufferedReader(
          DownloaderStream(
              downloader, read_buffer_size=read_buffer_size, mode=mode),
          buffer_size=read_buffer_size)
      return storage.fileio.BlobReader(blob, chunk_size=read_buffer_size)
    elif mode == 'w' or mode == 'wb':
      uploader = GcsUploader(
          self.client,
          filename,
          mime_type,
          get_project_number=self.get_project_number)
      return io.BufferedWriter(
          UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
      return storage.fileio.BlobWriter(
          blob, chunk_size=read_buffer_size, content_type=mime_type)
    else:
      raise ValueError('Invalid file open mode: %s.' % mode)

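For context, here is a minimal sketch (not part of the diff) of the google-cloud-storage file-like API this commit migrates to; the client setup, bucket, and object names are placeholder assumptions:

from google.cloud import storage
from google.cloud.storage.fileio import BlobReader, BlobWriter

client = storage.Client()
bucket = client.get_bucket('example-bucket')  # hypothetical bucket

# Reading: get_blob() fetches the object's metadata, then BlobReader
# streams its content in chunk_size pieces.
blob = bucket.get_blob('path/to/object')
with BlobReader(blob, chunk_size=16 * 1024 * 1024) as reader:
  data = reader.read()

# Writing: bucket.blob() builds a reference without a metadata fetch
# (the object may not exist yet), and BlobWriter streams a resumable
# upload. Write mode needs a BlobWriter, not a BlobReader, which is why
# the 'w'/'wb' branch above returns one.
out = bucket.blob('path/to/new-object')
with BlobWriter(out, chunk_size=16 * 1024 * 1024,
                content_type='application/octet-stream') as writer:
  writer.write(data)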
@@ -260,7 +238,7 @@ def delete(self, path):
"""
bucket_name, target_name = parse_gcs_path(path)
try:
bucket = self.get_bucket(bucket_name)
bucket = self.client.get_bucket(bucket_name)
bucket.delete_blob(target_name)
except HttpError as http_error:
if http_error.status_code == 404:
@@ -605,162 +583,3 @@ def _updated_to_seconds(updated):
  return (
      time.mktime(updated.timetuple()) - time.timezone +
      updated.microsecond / 1000000.0)


class GcsDownloader(Downloader):
  def __init__(self, client, path, buffer_size, get_project_number):
    self._client = client
    self._path = path
    self._bucket, self._name = parse_gcs_path(path)
    self._buffer_size = buffer_size
    self._get_project_number = get_project_number

    # Create a request count metric
    resource = resource_identifiers.GoogleCloudStorageBucket(self._bucket)
    labels = {
        monitoring_infos.SERVICE_LABEL: 'GCS Client',
        monitoring_infos.METHOD_LABEL: 'BlobReader.read',
        monitoring_infos.RESOURCE_LABEL: resource,
        monitoring_infos.GCS_BUCKET_LABEL: self._bucket
    }
    project_number = self._get_project_number(self._bucket)
    if project_number:
      labels[monitoring_infos.GCS_PROJECT_ID_LABEL] = str(project_number)
    else:
      _LOGGER.debug(
          'Possibly missing storage.get_bucket permission to '
          'bucket %s. Label %s is not added to the counter because it '
          'cannot be identified.',
          self._bucket,
          monitoring_infos.GCS_PROJECT_ID_LABEL)

    service_call_metric = ServiceCallMetric(
        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
        base_labels=labels)

    try:
      bucket = self._client.get_bucket(self._bucket)
      metadata = bucket.get_blob(self._name)
    except HttpError as http_error:
      service_call_metric.call(http_error)
      if http_error.status_code == 404:
        raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
      else:
        _LOGGER.error(
            'HTTP error while requesting file %s: %s', self._path, http_error)
        raise
    else:
      service_call_metric.call('ok')

    # Blob.size is a property, not a method.
    self._size = metadata.size

    reader = storage.fileio.BlobReader(metadata, chunk_size=self._buffer_size)
    try:
      reader.read()
      service_call_metric.call('ok')
    except HttpError as e:
      service_call_metric.call(e)
      raise
    finally:
      reader.close()

  @property
  @retry.with_exponential_backoff(
      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
  def size(self):
    return self._size

  def get_range(self, start, end):
    self._download_stream.seek(0)
    self._download_stream.truncate(0)
    self._downloader.GetRange(start, end - 1)
    return self._download_stream.getvalue()

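The deleted get_range above rewinds a scratch buffer and delegates to the apitools downloader's GetRange. A hedged sketch of the equivalent ranged read with the google-cloud-storage client (names are placeholders), using Blob.download_as_bytes and its inclusive end offset:

from google.cloud import storage

def get_range(client, bucket_name, blob_name, start, end):
  """Return bytes [start, end) of a GCS object."""
  blob = client.bucket(bucket_name).blob(blob_name)
  # download_as_bytes treats `end` as inclusive, hence end - 1, mirroring
  # the GetRange(start, end - 1) call in the deleted code.
  return blob.download_as_bytes(start=start, end=end - 1)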

class GcsUploader(Uploader):
  def __init__(self, client, path, mime_type, get_project_number):
    self._client = client
    self._path = path
    self._bucket, self._name = parse_gcs_path(path)
    self._mime_type = mime_type
    self._get_project_number = get_project_number

    # Set up communication with child thread.
    parent_conn, child_conn = multiprocessing.Pipe()
    self._child_conn = child_conn
    self._conn = parent_conn

    # Set up uploader.
    self._insert_request = (
        storage.StorageObjectsInsertRequest(
            bucket=self._bucket, name=self._name))
    self._upload = transfer.Upload(
        PipeStream(self._child_conn),
        self._mime_type,
        chunksize=WRITE_CHUNK_SIZE)
    self._upload.strategy = transfer.RESUMABLE_UPLOAD

    # Start uploading thread.
    self._upload_thread = threading.Thread(target=self._start_upload)
    self._upload_thread.daemon = True
    self._upload_thread.last_error = None
    self._upload_thread.start()

  # TODO(silviuc): Refactor so that retry logic can be applied.
  # There is retry logic in the underlying transfer library but we should make
  # it more explicit so we can control the retry parameters.
  @retry.no_retries  # Using no_retries marks this as an integration point.
  def _start_upload(self):
    # This starts the uploader thread. We are forced to run the uploader in
    # another thread because the apitools uploader insists on taking a stream
    # as input. Happily, this also means we get asynchronous I/O to GCS.
    #
    # The uploader by default transfers data in chunks of 1024 * 1024 bytes at
    # a time, buffering writes until that size is reached.

    project_number = self._get_project_number(self._bucket)

    # Create a request count metric
    resource = resource_identifiers.GoogleCloudStorageBucket(self._bucket)
    labels = {
        monitoring_infos.SERVICE_LABEL: 'Storage',
        monitoring_infos.METHOD_LABEL: 'Objects.insert',
        monitoring_infos.RESOURCE_LABEL: resource,
        monitoring_infos.GCS_BUCKET_LABEL: self._bucket,
        monitoring_infos.GCS_PROJECT_ID_LABEL: str(project_number)
    }
    service_call_metric = ServiceCallMetric(
        request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
        base_labels=labels)
    try:
      self._client.objects.Insert(self._insert_request, upload=self._upload)
      service_call_metric.call('ok')
    except Exception as e:  # pylint: disable=broad-except
      service_call_metric.call(e)
      _LOGGER.error(
          'Error in _start_upload while inserting file %s: %s',
          self._path,
          traceback.format_exc())
      self._upload_thread.last_error = e
    finally:
      self._child_conn.close()

  def put(self, data):
    try:
      self._conn.send_bytes(data.tobytes())
    except EOFError:
      if self._upload_thread.last_error is not None:
        raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
      raise

  def finish(self):
    self._conn.close()
    # TODO(udim): Add timeout=DEFAULT_HTTP_TIMEOUT_SECONDS * 2 and raise if
    # isAlive is True.
    self._upload_thread.join()
    # Check for exception since the last put() call.
    if self._upload_thread.last_error is not None:
      raise type(self._upload_thread.last_error)(
          "Error while uploading file %s: %s",
          self._path,
          self._upload_thread.last_error.message)  # pylint: disable=raising-bad-type

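The comments in _start_upload describe the shape the deleted uploader relied on: put() feeds bytes into one end of a multiprocessing.Pipe while a daemon thread drains the other end as a stream and uploads it. A stripped-down sketch of that producer/consumer pattern, with the actual upload stubbed out:

import multiprocessing
import threading

class PipedWriter:
  def __init__(self):
    self._conn, self._child_conn = multiprocessing.Pipe()
    self._thread = threading.Thread(target=self._drain, daemon=True)
    self._thread.start()

  def _drain(self):
    # Stand-in for the resumable upload consuming a PipeStream.
    try:
      while True:
        chunk = self._child_conn.recv_bytes()
        print('would upload %d bytes' % len(chunk))  # stub
    except EOFError:
      pass  # the producer closed its end: nothing left to upload

  def put(self, data):
    self._conn.send_bytes(bytes(data))

  def finish(self):
    self._conn.close()  # signals EOF to the drain thread
    self._thread.join()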