Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor writeObject to only use MultipartUpload when required #27877

Merged
merged 1 commit into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/private/Files/ObjectStore/S3ConnectionTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ protected function parseParams($params) {

$this->test = isset($params['test']);
$this->bucket = $params['bucket'];
$this->proxy = isset($params['proxy']) ? $params['proxy'] : false;
$this->timeout = !isset($params['timeout']) ? 15 : $params['timeout'];
$this->uploadPartSize = !isset($params['uploadPartSize']) ? 524288000 : $params['uploadPartSize'];
$this->proxy = $params['proxy'] ?? false;
$this->timeout = $params['timeout'] ?? 15;
$this->uploadPartSize = $params['uploadPartSize'] ?? 524288000;
$params['region'] = empty($params['region']) ? 'eu-west-1' : $params['region'];
$params['hostname'] = empty($params['hostname']) ? 's3.' . $params['region'] . '.amazonaws.com' : $params['hostname'];
if (!isset($params['port']) || $params['port'] === '') {
Expand Down
75 changes: 58 additions & 17 deletions lib/private/Files/ObjectStore/S3ObjectTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@

use Aws\S3\Exception\S3MultipartUploadException;
use Aws\S3\MultipartUploader;
use Aws\S3\ObjectUploader;
use Aws\S3\S3Client;
use Icewind\Streams\CallbackWrapper;
use GuzzleHttp\Psr7\Utils;
use OC\Files\Stream\SeekableHttpStream;
use GuzzleHttp\Psr7;
use Psr\Http\Message\StreamInterface;

trait S3ObjectTrait {
/**
Expand Down Expand Up @@ -80,37 +81,77 @@ public function readObject($urn) {
}

/**
* Single object put helper
juliusknorr marked this conversation as resolved.
Show resolved Hide resolved
*
* @param string $urn the unified resource name used to identify the object
* @param resource $stream stream with the data to write
* @param StreamInterface $stream stream with the data to write
* @param string|null $mimetype the mimetype to set for the remove object @since 22.0.0
* @throws \Exception when something goes wrong, message will be logged
* @since 7.0.0
*/
public function writeObject($urn, $stream, string $mimetype = null) {
$count = 0;
$countStream = CallbackWrapper::wrap($stream, function ($read) use (&$count) {
$count += $read;
});
protected function writeSingle(string $urn, StreamInterface $stream, string $mimetype = null): void {
$this->getConnection()->putObject([
'Bucket' => $this->bucket,
'Key' => $urn,
'Body' => $stream,
'ACL' => 'private',
'ContentType' => $mimetype,
]);
}

$uploader = new MultipartUploader($this->getConnection(), $countStream, [

/**
* Multipart upload helper that tries to avoid orphaned fragments in S3
juliusknorr marked this conversation as resolved.
Show resolved Hide resolved
*
* @param string $urn the unified resource name used to identify the object
* @param StreamInterface $stream stream with the data to write
* @param string|null $mimetype the mimetype to set for the remove object
* @throws \Exception when something goes wrong, message will be logged
*/
protected function writeMultiPart(string $urn, StreamInterface $stream, string $mimetype = null): void {
$uploader = new MultipartUploader($this->getConnection(), $stream, [
'bucket' => $this->bucket,
'key' => $urn,
'part_size' => $this->uploadPartSize,
'params' => [
'ContentType' => $mimetype
]
],
]);

try {
$uploader->upload();
} catch (S3MultipartUploadException $e) {
// This is an empty file so just touch it then
if ($count === 0 && feof($countStream)) {
$uploader = new ObjectUploader($this->getConnection(), $this->bucket, $urn, '');
$uploader->upload();
} else {
throw $e;
// if anything goes wrong with multipart, make sure that you don´t poison and
// slow down s3 bucket with orphaned fragments
$uploadInfo = $e->getState()->getId();
if ($e->getState()->isInitiated() && (array_key_exists('UploadId', $uploadInfo))) {
$this->getConnection()->abortMultipartUpload($uploadInfo);
}
throw $e;
}
}


/**
* @param string $urn the unified resource name used to identify the object
* @param resource $stream stream with the data to write
* @param string|null $mimetype the mimetype to set for the remove object @since 22.0.0
* @throws \Exception when something goes wrong, message will be logged
* @since 7.0.0
*/
public function writeObject($urn, $stream, string $mimetype = null) {
$psrStream = Utils::streamFor($stream);

// ($psrStream->isSeekable() && $psrStream->getSize() !== null) evaluates to true for a On-Seekable stream
// so the optimisation does not apply
$buffer = new Psr7\Stream(fopen("php://memory", 'rwb+'));
Utils::copyToStream($psrStream, $buffer, MultipartUploader::PART_MIN_SIZE);
$buffer->seek(0);
if ($buffer->getSize() < MultipartUploader::PART_MIN_SIZE) {
// buffer is fully seekable, so use it directly for the small upload
$this->writeSingle($urn, $buffer, $mimetype);
} else {
$loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We append buffer and psrStream because the psrStream might be non-seekable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

$this->writeMultiPart($urn, $loadStream, $mimetype);
}
}

Expand Down
19 changes: 19 additions & 0 deletions tests/lib/Files/ObjectStore/ObjectStoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,27 @@

abstract class ObjectStoreTest extends TestCase {

/** @var string[] */
private $cleanup = [];

/**
* @return \OCP\Files\ObjectStore\IObjectStore
*/
abstract protected function getInstance();

protected function cleanupAfter(string $urn) {
$this->cleanup[] = $urn;
}

public function tearDown(): void {
parent::tearDown();

$instance = $this->getInstance();
foreach ($this->cleanup as $urn) {
$instance->deleteObject($urn);
}
}

protected function stringToStream($data) {
$stream = fopen('php://temp', 'w+');
fwrite($stream, $data);
Expand Down Expand Up @@ -110,6 +126,9 @@ public function testExists() {
}

public function testCopy() {
$this->cleanupAfter('source');
$this->cleanupAfter('target');

$stream = $this->stringToStream('foobar');

$instance = $this->getInstance();
Expand Down
86 changes: 86 additions & 0 deletions tests/lib/Files/ObjectStore/S3Test.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public function stream_seek($offset, $whence = SEEK_SET) {
* @group PRIMARY-s3
*/
class S3Test extends ObjectStoreTest {
public function setUp(): void {
parent::setUp();
$s3 = $this->getInstance();
$s3->deleteObject('multiparttest');
}

protected function getInstance() {
$config = \OC::$server->getConfig()->getSystemValue('objectstore');
if (!is_array($config) || $config['class'] !== S3::class) {
Expand All @@ -70,6 +76,8 @@ protected function getInstance() {
}

public function testUploadNonSeekable() {
$this->cleanupAfter('multiparttest');

$s3 = $this->getInstance();

$s3->writeObject('multiparttest', NonSeekableStream::wrap(fopen(__FILE__, 'r')));
Expand All @@ -80,6 +88,8 @@ public function testUploadNonSeekable() {
}

public function testSeek() {
$this->cleanupAfter('seek');

$data = file_get_contents(__FILE__);

$instance = $this->getInstance();
Expand All @@ -94,4 +104,80 @@ public function testSeek() {
fseek($read, 100, SEEK_CUR);
$this->assertEquals(substr($data, 210, 100), fread($read, 100));
}

public function assertNoUpload($objectUrn) {
$s3 = $this->getInstance();
$s3client = $s3->getConnection();
$uploads = $s3client->listMultipartUploads([
'Bucket' => $s3->getBucket(),
'Prefix' => $objectUrn,
]);
$this->assertArrayNotHasKey('Uploads', $uploads);
}

public function testEmptyUpload() {
$s3 = $this->getInstance();

$emptyStream = fopen("php://memory", "r");
fwrite($emptyStream, null);

$s3->writeObject('emptystream', $emptyStream);

$this->assertNoUpload('emptystream');
$this->assertTrue($s3->objectExists('emptystream'));

$thrown = false;
try {
self::assertFalse($s3->readObject('emptystream'));
} catch (\Exception $e) {
// An exception is expected here since 0 byte files are wrapped
// to be read from an empty memory stream in the ObjectStoreStorage
$thrown = true;
}
self::assertTrue($thrown, 'readObject with range requests are not expected to work on empty objects');

$s3->deleteObject('emptystream');
}

/** File size to upload in bytes */
public function dataFileSizes() {
return [
[1000000], [2000000], [5242879], [5242880], [5242881], [10000000]
];
}

/** @dataProvider dataFileSizes */
public function testFileSizes($size) {
$this->cleanupAfter('testfilesizes');
$s3 = $this->getInstance();

$sourceStream = fopen('php://memory', 'wb+');
$writeChunkSize = 1024;
$chunkCount = $size / $writeChunkSize;
for ($i = 0; $i < $chunkCount; $i++) {
fwrite($sourceStream, str_repeat('A',
($i < $chunkCount - 1) ? $writeChunkSize : $size - ($i * $writeChunkSize)
));
}
rewind($sourceStream);
$s3->writeObject('testfilesizes', $sourceStream);

$this->assertNoUpload('testfilesizes');
self::assertTrue($s3->objectExists('testfilesizes'));

$result = $s3->readObject('testfilesizes');

// compare first 100 bytes
self::assertEquals(str_repeat('A', 100), fread($result, 100));

// compare 100 bytes
fseek($result, $size - 100);
self::assertEquals(str_repeat('A', 100), fread($result, 100));

// end of file reached
fseek($result, $size);
self:self::assertTrue(feof($result));

$this->assertNoUpload('testfilesizes');
}
}