BlobReader not buffering properly. #462

Closed
Megabytemb opened this issue Jun 10, 2021 · 8 comments · Fixed by #721
Labels:
  • api: storage (Issues related to the googleapis/python-storage API.)
  • status: investigating (The issue is under investigation, which is determined to be non-trivial.)
  • type: question (Request for information or clarification. Not an issue.)

Comments

@Megabytemb
Megabytemb commented Jun 10, 2021

Environment details

  • OS type and version: Mac 10.15.7
  • Python version: Python 3.9.5
  • pip version: pip 21.1.1
  • google-cloud-storage: Version: 1.38.0

Steps to reproduce

When trying to stream a file from Google Cloud Storage to Google Drive, the BlobReader doesn't appear to be buffering properly.

Reading through the BlobReader code, it should buffer the file according to chunk_size, then download a new chunk once that buffer is exhausted. However, in my experience, every time the BlobReader is read a second time, it invalidates the buffer and downloads a new chunk.

The Google API client's MediaIoBaseUpload appears to request the file in 8192-byte reads, and every time it requests the next block from the GCS BlobReader, the reader downloads another 40 MiB chunk rather than serving the read from its buffer.

My debugging found that the buffer is actually invalidated when the Python HTTP upload code seeks to the next block, and the offset arithmetic in BlobReader's seek logic fails at that point; however, I'm unsure what should be happening.
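The buffering behaviour expected above (fill a buffer of chunk_size bytes, serve small reads from it, and download again only when it is exhausted) can be sketched with an in-memory stand-in for the remote object. This is a minimal illustration under that assumption, not BlobReader's code; ChunkedReader and fetch are hypothetical names, with fetch(start, length) standing in for one ranged download request.

```python
from typing import Callable


class ChunkedReader:
    """Hypothetical sequential reader that downloads chunk_size bytes at a time."""

    def __init__(self, fetch: Callable[[int, int], bytes], chunk_size: int):
        # fetch(start, length) stands in for one ranged download request.
        self._fetch = fetch
        self._chunk_size = chunk_size
        self._buffer = b""
        self._buffer_start = 0  # absolute offset of self._buffer[0]
        self._pos = 0  # absolute offset of the next byte to return
        self._eof = False

    def read(self, size: int) -> bytes:
        out = bytearray()
        while len(out) < size:
            offset = self._pos - self._buffer_start
            if offset >= len(self._buffer):
                # Buffer exhausted: download the next chunk, unless already at EOF.
                if self._eof:
                    break
                chunk = self._fetch(self._pos, self._chunk_size)
                if not chunk:
                    self._eof = True
                    break
                self._buffer, self._buffer_start, offset = chunk, self._pos, 0
            piece = self._buffer[offset:offset + size - len(out)]
            out += piece
            self._pos += len(piece)
        return bytes(out)


# Usage: read a 100 KiB object in 8192-byte blocks with a 40 KiB chunk size.
data = bytes(range(256)) * 400
fetch_starts = []


def fetch(start, length):
    fetch_starts.append(start)
    return data[start:start + length]


reader = ChunkedReader(fetch, chunk_size=40 * 1024)
blocks = []
while True:
    block = reader.read(8192)
    if not block:
        break
    blocks.append(block)
```

Thirteen 8192-byte reads are served by only three data-bearing downloads (plus one empty request that detects EOF). That is the ratio the reporter expected, rather than one download per read.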

The code example below demonstrates the problem: provide your own client credentials for Drive, upload a CSV file to Google Cloud Storage, and note the bucket and blob names.

Code example

from google.cloud import storage
from googleapiclient.http import MediaIoBaseUpload
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import os
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

logger = logging.getLogger(__name__)

SCOPES = ['https://www.googleapis.com/auth/drive']


def getCreds():
    creds = None
    # The file token.json stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first
    # time.
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'client_secret.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Save the credentials for the next run
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    
    return creds

def gcsToDrive(bucketName, blobName):
    client = storage.Client()

    bucket = client.get_bucket(bucketName)
    blob = bucket.blob(blobName)

    creds = getCreds()

    service = build('drive', 'v3', credentials=creds)

    megabyte = (256 * 1024) * 4
    chunk_size: int = megabyte * 40

    with blob.open("rb", chunk_size=chunk_size) as stream:
        file_metadata = {
            "name": "My Report",
            "mimeType": "application/vnd.google-apps.spreadsheet",
        }
        media = MediaIoBaseUpload(stream, mimetype="text/csv", resumable=True, chunksize=chunk_size)
        file = (
            service.files()
            .create(body=file_metadata, media_body=media, fields="id")
            .execute()
        )

    logger.info(file)
    
    return file

if __name__ == "__main__":
    my_bucket = "my_bucket"
    my_blob = "my-blob.csv"

    gcsToDrive(my_bucket, my_blob)

Example Logs

DEBUG:google.auth._default:Checking /Users/<username>/code/python/gscFileError/sa.json for explicit credentials as part of auth process...
DEBUG:google.auth._default:Checking /Users/<username>/code/python/gscFileError/sa.json for explicit credentials as part of auth process...
DEBUG:urllib3.util.retry:Converted retries value: 3 -> Retry(total=3, connect=None, read=None, redirect=None, status=None)
DEBUG:google.auth.transport.requests:Making request: POST https://oauth2.googleapis.com/token
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): oauth2.googleapis.com:443
DEBUG:urllib3.connectionpool:https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): storage.googleapis.com:443
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /storage/v1/b/<my bucket>?projection=noAcl&prettyPrint=false HTTP/1.1" 200 574
INFO:googleapiclient.discovery_cache:file_cache is only supported with oauth2client<4.0.0
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /storage/v1/b/<my bucket>/o/my-blob.csv?projection=noAcl&prettyPrint=false HTTP/1.1" 200 738
DEBUG:googleapiclient.discovery:URL being requested: POST https://www.googleapis.com/upload/drive/v3/files?fields=id&alt=json&uploadType=resumable
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 2240745
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 2224361
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 2207977
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 2191593
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 2175209
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 2158825
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 2142441
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 2126057
...
...
...
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 94441
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 78057
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 61673
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 45289
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 28905
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 206 12521
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 416 29
DEBUG:urllib3.connectionpool:https://storage.googleapis.com:443 "GET /download/storage/v1/b/<my bucket>/o/my-blob.csv?generation=1623219180016524&alt=media HTTP/1.1" 416 29
INFO:__main__:{'id': '19GXqCAQDsRtK1czPSB_vX_SaBp4JtatlxCl4uQUU66o'}
product-auto-label bot added the "api: storage" label Jun 10, 2021
yoshi-automation added the "triage me" label Jun 10, 2021
shaffeeullah added the "type: question" label and removed the "triage me" label Jun 10, 2021
tseaver added the "status: investigating" label Jun 11, 2021
tseaver self-assigned this Jun 11, 2021
@tseaver
Contributor

tseaver commented Jun 11, 2021

@Megabytemb Thanks for the report!

As a first pass, I tried reproducing the issue without using googleapiclient (see this gist). That test succeeds as I expected: it reads the BlobReader stream in odd chunk sizes and gets the same result as reading the whole file.

FWIW, my test never needs to call seek() on the BlobReader stream. Your debugging suggests that the MediaIoBaseUpload is calling seek() -- I'm not sure why it would, but I can imagine that there are odd interactions there.

@Megabytemb
Author

Sorry for the silence on this issue, but I've been doing some more troubleshooting on exactly when the buffer is invalidated.

I think the issue comes down to a combination of seeking to the beginning and end of the file, and which whence value is used.
You're always seeking relatively on the buffer, but BlobReader supports all whence values.

As a walk-through example (difference and buffer_pos refer to the variables in BlobReader's seek logic):

  1. Seek to the end of the file using seek(0, whence=2).
  2. Seek to the beginning of the file using seek(0, whence=0).
    Because difference is -2240745 and buffer_pos is 0, the buffer is invalidated, when in fact it does hold those bytes and the reader was just seeking back to the beginning of the file.
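One way to avoid the invalidation in this walk-through is to convert every (offset, whence) pair to an absolute offset first. The reader then compares that offset with the absolute byte range the buffer covers, rather than reasoning about a relative difference from the previous position. The sketch below assumes the reader knows the object size and the buffer's start offset. absolute_target and buffer_has are hypothetical helpers, not google-cloud-storage APIs. For illustration, the walk-through also assumes the whole 2240745-byte object is already buffered from offset 0.

```python
import io


def absolute_target(offset: int, whence: int, pos: int, size: int) -> int:
    """Translate seek(offset, whence) into an absolute byte offset."""
    if whence == io.SEEK_SET:
        target = offset
    elif whence == io.SEEK_CUR:
        target = pos + offset
    elif whence == io.SEEK_END:
        target = size + offset
    else:
        raise ValueError(f"invalid whence: {whence}")
    if target < 0:
        raise ValueError("negative seek position")
    return target


def buffer_has(target: int, buffer_start: int, buffer_len: int) -> bool:
    """Return True if the absolute offset `target` is inside the buffered range."""
    return buffer_start <= target < buffer_start + buffer_len


# Walk-through (assumed): the whole object is buffered starting at offset 0.
size = 2240745
pos = absolute_target(0, io.SEEK_END, pos=0, size=size)    # 2240745
pos = absolute_target(0, io.SEEK_SET, pos=pos, size=size)  # 0
still_buffered = buffer_has(pos, buffer_start=0, buffer_len=size)  # True
```

Seeking to the end does not need to discard anything. It only moves the position, and the later seek back to 0 is still a buffer hit.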

I haven't worked out exactly how the seeking relationship should work yet, but I'm finding more reasons why it's failing.

@Megabytemb
Author

OK, getting closer still.

It looks like the built-in tell() inherited from io.BufferedIOBase does something odd with seek(), but I couldn't find any information on how BufferedIOBase's tell() is calculated.

I've implemented tell() by simply doing the following:

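from google.cloud.storage.fileio import BlobReader


class MyBlobReader(BlobReader):
    # Return the reader's own tracked position directly instead of
    # relying on the tell() inherited from io.BufferedIOBase.
    def tell(self):
        return self._pos

...

with MyBlobReader(myBlob) as stream:
    ...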

This stopped the GCS BlobReader buffer from being discarded and rebuilt on every chunk request from googleapiclient.

It's still downloading the file from Cloud Storage about four times rather than once, but I'm getting closer.

Any input would be really appreciated.

tseaver removed their assignment Dec 29, 2021
@andrewsg
Contributor

andrewsg commented Feb 15, 2022

Hi, sorry for the significant delay. It looks like GoogleAPIClient, as you've noted, seeks to the end of the file during __init__() to find out how long it is. This does erase the buffer, because once you seek to the end of the file we assume the buffer for an earlier position is no longer needed. I don't yet see why that causes a problem here, however, since your download should not have started at all at the time the MediaIoBaseUpload is created.
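Probing a stream's length by seeking to its end is a common idiom for seekable file objects. A minimal sketch of that idiom follows. stream_length is a hypothetical helper, not googleapiclient's actual code, which may not restore the position afterwards. Whatever the consumer does next, the reader it wraps sees a seek(0, 2), which is the call that made BlobReader discard its buffer.

```python
import io


def stream_length(stream) -> int:
    """Return the length of a seekable stream, leaving its position unchanged."""
    start = stream.tell()
    end = stream.seek(0, io.SEEK_END)  # seek() returns the new absolute position
    stream.seek(start, io.SEEK_SET)  # restore the caller's position
    return end


buf = io.BytesIO(b"a" * 100)
buf.seek(10)
length = stream_length(buf)  # 100
position_after = buf.tell()  # 10
```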

What MediaIoBaseUpload's constructor will do is force a reload() on the blob, because the blob's metadata has not yet been downloaded and so the blob's length is unknown. You can skip this reload() by using bucket.get_blob() to get the blob's metadata information ahead of time.

The built-in tell() should be using a relative seek() that moves the position by 0 bytes to find out where it is. This is our primary suspect. I don't think a seek() that doesn't change the position should ever invalidate our buffer, but if reimplementing tell() fixed it for you, maybe that is what is happening in this case.
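The zero-byte relative seek described above can be observed directly. In CPython, io.IOBase.tell() is implemented as self.seek(0, 1), and io.BufferedIOBase does not override it. A subclass that overrides seek() but not tell() therefore sends every tell() through its own seek(). The sketch below uses a hypothetical RecordingStream to make that visible. It illustrates standard-library behaviour, not BlobReader itself.

```python
import io


class RecordingStream(io.BufferedIOBase):
    """Hypothetical stream that overrides seek() but inherits tell()."""

    def __init__(self):
        self.seek_calls = []
        self._pos = 0

    def seekable(self):
        return True

    def seek(self, offset, whence=io.SEEK_SET):
        # Record every call so we can see what the inherited tell() does.
        self.seek_calls.append((offset, whence))
        if whence == io.SEEK_SET:
            self._pos = offset
        elif whence == io.SEEK_CUR:
            self._pos += offset
        else:
            raise ValueError("SEEK_END is not supported in this sketch")
        return self._pos


stream = RecordingStream()
stream.seek(42)
position = stream.tell()  # inherited from io.IOBase; calls self.seek(0, 1)
```

After these two calls, seek_calls holds (42, 0) from the explicit seek and (0, 1) from tell(). A reader whose seek() logic drops its buffer on that second call will therefore lose it on every tell().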

Given the delay in response I'll certainly understand if you have totally moved on, but if you are still interested, please let me know and we can resume digging.

@aabbccddeeffgghhii1438
Contributor

aabbccddeeffgghhii1438 commented Mar 1, 2022

@andrewsg I also ran into similar problems. The code below is an easy reproduction that causes a buffer invalidation on every loop iteration:

from google.cloud import storage
import tarfile
import time

with storage.Client().bucket("xxx").blob("xxx.tar").open("rb") as f:
    with tarfile.open(mode="r:", fileobj=f) as tar:
        t = time.time()
        for entry in tar:
            tar.extractfile(entry)
            print(f"{time.time() - t}")
            t = time.time()

The goal of the code is straightforward: read from a remote tar archive stored on GCS. The code above runs into the problem when tarfile internally calls tell() to get member offsets. You can substitute any bucket and tar file.
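tarfile's reliance on tell() and seek() can be observed without GCS. The sketch below iterates over an in-memory archive wrapped in a file object that counts those calls. CountingBytesIO and build_tar are hypothetical helpers. The exact counts vary between Python versions, so the point is only that iterating members issues both calls, each of which a BlobReader would receive.

```python
import io
import tarfile


class CountingBytesIO(io.BytesIO):
    """In-memory file that counts the tell() and seek() calls a consumer makes."""

    def __init__(self, data: bytes):
        self.tell_calls = 0
        self.seek_calls = 0
        super().__init__(data)

    def tell(self):
        self.tell_calls += 1
        return super().tell()

    def seek(self, offset, whence=io.SEEK_SET):
        self.seek_calls += 1
        return super().seek(offset, whence)


def build_tar(members: dict) -> bytes:
    """Build an uncompressed tar archive in memory from {name: payload}."""
    out = io.BytesIO()
    with tarfile.open(mode="w", fileobj=out) as tar:
        for name, payload in members.items():
            info = tarfile.TarInfo(name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    return out.getvalue()


archive = CountingBytesIO(build_tar({f"file{i}.txt": b"x" * 1000 for i in range(5)}))
with tarfile.open(mode="r:", fileobj=archive) as tar:
    names = [entry.name for entry in tar]
# archive.tell_calls and archive.seek_calls now record how often tarfile asked.
```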

The above code was run on the following:
  • google-cloud-storage 1.44.0
  • Python 3.7.12
  • pip 20.1.1
  • OS: 5.16.10-arch1-1

@Megabytemb
Author

Hey @andrewsg,

The sample code I provided is exactly what I implemented in my production code, and it has worked beautifully for me.

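from google.cloud.storage.fileio import BlobReader


class MyBlobReader(BlobReader):
    def tell(self):
        # Report the reader's tracked position directly.
        return self._pos

...

with MyBlobReader(myBlob) as stream:
    ...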

aabbccddeeffgghhii1438 pushed a commit to aabbccddeeffgghhii1438/python-storage that referenced this issue Mar 2, 2022
@aabbccddeeffgghhii1438
Contributor

@andrewsg I have opened a PR that fixes the issue and adds a test that fails against the current implementation.

aabbccddeeffgghhii1438 pushed a commit to aabbccddeeffgghhii1438/python-storage that referenced this issue Mar 2, 2022
andrewsg self-assigned this Mar 3, 2022
aabbccddeeffgghhii1438 pushed a commit to aabbccddeeffgghhii1438/python-storage that referenced this issue Mar 11, 2022
@andrewsg
Contributor

@Megabytemb Thanks again for your original report. I am looking at accepting @allenc97's PR, which should resolve the issue. However, I suspect that accepting this PR will break your workaround that accesses self._pos, so please be sure to remove it when you upgrade. It looks like the problem was ambiguity between read() and seek() over the meaning of _pos, which the PR should resolve.
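The diagnosis here (read() and seek() disagreeing about what _pos means) suggests a design with one attribute as the single source of truth for the logical position. The buffer is described separately by its own absolute start offset. The sketch below illustrates that idea with an in-memory fetch function. It is not the code from the fix in #721. PositionedBufferReader, fetch, and size are hypothetical names.

```python
import io
from typing import Callable


class PositionedBufferReader(io.BufferedIOBase):
    """Hypothetical buffered reader where _pos is the only logical position.

    _pos is always the absolute offset of the next byte read() returns;
    the buffer is described separately by its own absolute start offset.
    """

    def __init__(self, fetch: Callable[[int, int], bytes], size: int, chunk_size: int):
        self._fetch = fetch  # fetch(start, length) stands in for a ranged download
        self._size = size
        self._chunk_size = chunk_size
        self._buffer = b""
        self._buffer_start = 0
        self._pos = 0

    def readable(self):
        return True

    def seekable(self):
        return True

    def tell(self):
        # No seek() round trip: just report the logical position.
        return self._pos

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            target = offset
        elif whence == io.SEEK_CUR:
            target = self._pos + offset
        elif whence == io.SEEK_END:
            target = self._size + offset
        else:
            raise ValueError(f"invalid whence: {whence}")
        if target < 0:
            raise ValueError("negative seek position")
        # Only the logical position moves; read() decides whether the
        # buffer still covers it, so seeking never discards data by itself.
        self._pos = target
        return self._pos

    def read(self, size=-1):
        if size is None or size < 0:
            size = self._size - self._pos
        out = bytearray()
        while len(out) < size and self._pos < self._size:
            offset = self._pos - self._buffer_start
            if not 0 <= offset < len(self._buffer):
                # Position is outside the buffer: fetch a chunk starting there.
                self._buffer = self._fetch(self._pos, self._chunk_size)
                self._buffer_start = self._pos
                offset = 0
                if not self._buffer:
                    break
            piece = self._buffer[offset:offset + size - len(out)]
            out += piece
            self._pos += len(piece)
        return bytes(out)


data = bytes(range(256)) * 100  # 25600-byte stand-in for the object
fetches = []


def fetch(start, length):
    fetches.append(start)
    return data[start:start + length]


reader = PositionedBufferReader(fetch, size=len(data), chunk_size=len(data))
reader.seek(0, io.SEEK_END)  # e.g. a length probe
length = reader.tell()
reader.seek(0)
first = reader.read(8192)
pos_after = reader.tell()
second = reader.read(8192)
```

Because seek() only updates _pos and tell() never calls seek(), the length probe, the seek back to 0, and interleaved tell() calls all leave the buffer intact. Only one fetch is made.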

andrewsg added a commit that referenced this issue Mar 11, 2022
* tests (fileio): add tarfile based test case for reader seek

Background info: #462

* fix: Patch blob reader to return correct seek/tell values (#462)

Co-authored-by: Andrew Gorcester <[email protected]>