From 3866f388b1c7a476be1ae5fd1cd044e5a112feec Mon Sep 17 00:00:00 2001 From: Bernd Rederlechner Date: Thu, 8 Jul 2021 15:19:39 +0200 Subject: [PATCH] Refactor writeObject to only use MultipartUpload when required MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Bernd Rederlechner Co-authored-by: Julius Härtl --- .../Files/ObjectStore/S3ConnectionTrait.php | 6 +- .../Files/ObjectStore/S3ObjectTrait.php | 75 ++++++++++++---- .../lib/Files/ObjectStore/ObjectStoreTest.php | 19 ++++ tests/lib/Files/ObjectStore/S3Test.php | 86 +++++++++++++++++++ 4 files changed, 166 insertions(+), 20 deletions(-) diff --git a/lib/private/Files/ObjectStore/S3ConnectionTrait.php b/lib/private/Files/ObjectStore/S3ConnectionTrait.php index 47c20d96d53ec..c99ebdbcd5cb1 100644 --- a/lib/private/Files/ObjectStore/S3ConnectionTrait.php +++ b/lib/private/Files/ObjectStore/S3ConnectionTrait.php @@ -74,9 +74,9 @@ protected function parseParams($params) { $this->test = isset($params['test']); $this->bucket = $params['bucket']; - $this->proxy = isset($params['proxy']) ? $params['proxy'] : false; - $this->timeout = !isset($params['timeout']) ? 15 : $params['timeout']; - $this->uploadPartSize = !isset($params['uploadPartSize']) ? 524288000 : $params['uploadPartSize']; + $this->proxy = $params['proxy'] ?? false; + $this->timeout = $params['timeout'] ?? 15; + $this->uploadPartSize = $params['uploadPartSize'] ?? 524288000; $params['region'] = empty($params['region']) ? 'eu-west-1' : $params['region']; $params['hostname'] = empty($params['hostname']) ? 's3.' . $params['region'] . '.amazonaws.com' : $params['hostname']; if (!isset($params['port']) || $params['port'] === '') { diff --git a/lib/private/Files/ObjectStore/S3ObjectTrait.php b/lib/private/Files/ObjectStore/S3ObjectTrait.php index bb71306c17d0f..c88246094ed81 100644 --- a/lib/private/Files/ObjectStore/S3ObjectTrait.php +++ b/lib/private/Files/ObjectStore/S3ObjectTrait.php @@ -28,10 +28,11 @@ use Aws\S3\Exception\S3MultipartUploadException; use Aws\S3\MultipartUploader; -use Aws\S3\ObjectUploader; use Aws\S3\S3Client; -use Icewind\Streams\CallbackWrapper; +use GuzzleHttp\Psr7\Utils; use OC\Files\Stream\SeekableHttpStream; +use GuzzleHttp\Psr7; +use Psr\Http\Message\StreamInterface; trait S3ObjectTrait { /** @@ -80,37 +81,77 @@ public function readObject($urn) { } /** + * Single object put helper + * * @param string $urn the unified resource name used to identify the object - * @param resource $stream stream with the data to write + * @param StreamInterface $stream stream with the data to write * @param string|null $mimetype the mimetype to set for the remove object @since 22.0.0 * @throws \Exception when something goes wrong, message will be logged - * @since 7.0.0 */ - public function writeObject($urn, $stream, string $mimetype = null) { - $count = 0; - $countStream = CallbackWrapper::wrap($stream, function ($read) use (&$count) { - $count += $read; - }); + protected function writeSingle(string $urn, StreamInterface $stream, string $mimetype = null): void { + $this->getConnection()->putObject([ + 'Bucket' => $this->bucket, + 'Key' => $urn, + 'Body' => $stream, + 'ACL' => 'private', + 'ContentType' => $mimetype, + ]); + } - $uploader = new MultipartUploader($this->getConnection(), $countStream, [ + + /** + * Multipart upload helper that tries to avoid orphaned fragments in S3 + * + * @param string $urn the unified resource name used to identify the object + * @param StreamInterface $stream stream with the data to write + * @param string|null $mimetype the mimetype to set for the remove object + * @throws \Exception when something goes wrong, message will be logged + */ + protected function writeMultiPart(string $urn, StreamInterface $stream, string $mimetype = null): void { + $uploader = new MultipartUploader($this->getConnection(), $stream, [ 'bucket' => $this->bucket, 'key' => $urn, 'part_size' => $this->uploadPartSize, 'params' => [ 'ContentType' => $mimetype - ] + ], ]); try { $uploader->upload(); } catch (S3MultipartUploadException $e) { - // This is an empty file so just touch it then - if ($count === 0 && feof($countStream)) { - $uploader = new ObjectUploader($this->getConnection(), $this->bucket, $urn, ''); - $uploader->upload(); - } else { - throw $e; + // if anything goes wrong with multipart, make sure that you don´t poison and + // slow down s3 bucket with orphaned fragments + $uploadInfo = $e->getState()->getId(); + if ($e->getState()->isInitiated() && (array_key_exists('UploadId', $uploadInfo))) { + $this->getConnection()->abortMultipartUpload($uploadInfo); } + throw $e; + } + } + + + /** + * @param string $urn the unified resource name used to identify the object + * @param resource $stream stream with the data to write + * @param string|null $mimetype the mimetype to set for the remove object @since 22.0.0 + * @throws \Exception when something goes wrong, message will be logged + * @since 7.0.0 + */ + public function writeObject($urn, $stream, string $mimetype = null) { + $psrStream = Utils::streamFor($stream); + + // ($psrStream->isSeekable() && $psrStream->getSize() !== null) evaluates to true for a On-Seekable stream + // so the optimisation does not apply + $buffer = new Psr7\Stream(fopen("php://memory", 'rwb+')); + Utils::copyToStream($psrStream, $buffer, MultipartUploader::PART_MIN_SIZE); + $buffer->seek(0); + if ($buffer->getSize() < MultipartUploader::PART_MIN_SIZE) { + // buffer is fully seekable, so use it directly for the small upload + $this->writeSingle($urn, $buffer, $mimetype); + } else { + $loadStream = new Psr7\AppendStream([$buffer, $psrStream]); + $this->writeMultiPart($urn, $loadStream, $mimetype); } } diff --git a/tests/lib/Files/ObjectStore/ObjectStoreTest.php b/tests/lib/Files/ObjectStore/ObjectStoreTest.php index 4ec44eb410de2..a245f0ae366e1 100644 --- a/tests/lib/Files/ObjectStore/ObjectStoreTest.php +++ b/tests/lib/Files/ObjectStore/ObjectStoreTest.php @@ -26,11 +26,27 @@ abstract class ObjectStoreTest extends TestCase { + /** @var string[] */ + private $cleanup = []; + /** * @return \OCP\Files\ObjectStore\IObjectStore */ abstract protected function getInstance(); + protected function cleanupAfter(string $urn) { + $this->cleanup[] = $urn; + } + + public function tearDown(): void { + parent::tearDown(); + + $instance = $this->getInstance(); + foreach ($this->cleanup as $urn) { + $instance->deleteObject($urn); + } + } + protected function stringToStream($data) { $stream = fopen('php://temp', 'w+'); fwrite($stream, $data); @@ -110,6 +126,9 @@ public function testExists() { } public function testCopy() { + $this->cleanupAfter('source'); + $this->cleanupAfter('target'); + $stream = $this->stringToStream('foobar'); $instance = $this->getInstance(); diff --git a/tests/lib/Files/ObjectStore/S3Test.php b/tests/lib/Files/ObjectStore/S3Test.php index 25bee9cbdd8de..a7a95d5337576 100644 --- a/tests/lib/Files/ObjectStore/S3Test.php +++ b/tests/lib/Files/ObjectStore/S3Test.php @@ -60,6 +60,12 @@ public function stream_seek($offset, $whence = SEEK_SET) { * @group PRIMARY-s3 */ class S3Test extends ObjectStoreTest { + public function setUp(): void { + parent::setUp(); + $s3 = $this->getInstance(); + $s3->deleteObject('multiparttest'); + } + protected function getInstance() { $config = \OC::$server->getConfig()->getSystemValue('objectstore'); if (!is_array($config) || $config['class'] !== S3::class) { @@ -70,6 +76,8 @@ protected function getInstance() { } public function testUploadNonSeekable() { + $this->cleanupAfter('multiparttest'); + $s3 = $this->getInstance(); $s3->writeObject('multiparttest', NonSeekableStream::wrap(fopen(__FILE__, 'r'))); @@ -80,6 +88,8 @@ public function testUploadNonSeekable() { } public function testSeek() { + $this->cleanupAfter('seek'); + $data = file_get_contents(__FILE__); $instance = $this->getInstance(); @@ -94,4 +104,80 @@ public function testSeek() { fseek($read, 100, SEEK_CUR); $this->assertEquals(substr($data, 210, 100), fread($read, 100)); } + + public function assertNoUpload($objectUrn) { + $s3 = $this->getInstance(); + $s3client = $s3->getConnection(); + $uploads = $s3client->listMultipartUploads([ + 'Bucket' => $s3->getBucket(), + 'Prefix' => $objectUrn, + ]); + $this->assertArrayNotHasKey('Uploads', $uploads); + } + + public function testEmptyUpload() { + $s3 = $this->getInstance(); + + $emptyStream = fopen("php://memory", "r"); + fwrite($emptyStream, null); + + $s3->writeObject('emptystream', $emptyStream); + + $this->assertNoUpload('emptystream'); + $this->assertTrue($s3->objectExists('emptystream')); + + $thrown = false; + try { + self::assertFalse($s3->readObject('emptystream')); + } catch (\Exception $e) { + // An exception is expected here since 0 byte files are wrapped + // to be read from an empty memory stream in the ObjectStoreStorage + $thrown = true; + } + self::assertTrue($thrown, 'readObject with range requests are not expected to work on empty objects'); + + $s3->deleteObject('emptystream'); + } + + /** File size to upload in bytes */ + public function dataFileSizes() { + return [ + [1000000], [2000000], [5242879], [5242880], [5242881], [10000000] + ]; + } + + /** @dataProvider dataFileSizes */ + public function testFileSizes($size) { + $this->cleanupAfter('testfilesizes'); + $s3 = $this->getInstance(); + + $sourceStream = fopen('php://memory', 'wb+'); + $writeChunkSize = 1024; + $chunkCount = $size / $writeChunkSize; + for ($i = 0; $i < $chunkCount; $i++) { + fwrite($sourceStream, str_repeat('A', + ($i < $chunkCount - 1) ? $writeChunkSize : $size - ($i * $writeChunkSize) + )); + } + rewind($sourceStream); + $s3->writeObject('testfilesizes', $sourceStream); + + $this->assertNoUpload('testfilesizes'); + self::assertTrue($s3->objectExists('testfilesizes')); + + $result = $s3->readObject('testfilesizes'); + + // compare first 100 bytes + self::assertEquals(str_repeat('A', 100), fread($result, 100)); + + // compare 100 bytes + fseek($result, $size - 100); + self::assertEquals(str_repeat('A', 100), fread($result, 100)); + + // end of file reached + fseek($result, $size); + self:self::assertTrue(feof($result)); + + $this->assertNoUpload('testfilesizes'); + } }