Skip to content

Commit

Permalink
feat(PubSub): Add CloudStorageConfig to subscribe function (#6482)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajupazhamayil authored Aug 2, 2023
1 parent 4801b80 commit bcb075f
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 18 deletions.
8 changes: 8 additions & 0 deletions PubSub/src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
use Google\Protobuf\Duration;
use Google\Protobuf\FieldMask;
use Google\Protobuf\Timestamp;
use Google\Cloud\PubSub\V1\CloudStorageConfig;

/**
* Implementation of the
Expand Down Expand Up @@ -324,6 +325,13 @@ public function createSubscription(array $args)
);
}

if (isset($args['cloudStorageConfig'])) {
$args['cloudStorageConfig'] = $this->serializer->decodeMessage(
new CloudStorageConfig(),
$args['cloudStorageConfig']
);
}

return $this->send([$this->getSubscriberClient(), 'createSubscription'], [
$this->pluck('name', $args),
$this->pluck('topic', $args),
Expand Down
69 changes: 69 additions & 0 deletions PubSub/src/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,35 @@ public function detached(array $options = [])
* be between 0 and 600 seconds. Defaults to 600 seconds.
* @type bool $enableExactlyOnceDelivery Indicates whether to enable
* 'Exactly Once Delivery' on the subscription.
* @type array $cloudStorageConfig If provided, messages will be delivered to Google Cloud Storage.
* @type string $cloudStorageConfig.bucket User-provided name for the Cloud Storage bucket.
* The bucket must be created by the user. The bucket name must be without
* any prefix like "gs://". See the [bucket naming
* requirements] (https://cloud.google.com/storage/docs/buckets#naming).
* @type string $cloudStorageConfig.filenamePrefix
* User-provided prefix for Cloud Storage filename. See the [object naming
* requirements](https://cloud.google.com/storage/docs/objects#naming).
* @type string $cloudStorageConfig.filenameSuffix
* User-provided suffix for Cloud Storage filename. See the [object naming
* requirements](https://cloud.google.com/storage/docs/objects#naming). Must
* not end in "/".
* @type array $cloudStorageConfig.textConfig If present, payloads will be written
* to Cloud Storage as raw text, separated by a newline.
* @type array $cloudStorageConfig.avroConfig If set, message payloads and metadata
* will be written to Cloud Storage in Avro format.
* @type bool $cloudStorageConfig.avroConfig.writeMetadata
* When true, write the subscription name, message_id, publish_time,
* attributes, and ordering_key as additional fields in the output.
* @type Duration|string $cloudStorageConfig.maxDuration The maximum duration
* that can elapse before a new Cloud Storage file is created.
* Min 1 minute, max 10 minutes, default 5 minutes. May not exceed the
* subscription's acknowledgement deadline. If a string is provided,
* it should be as a duration in seconds with up to nine fractional digits,
* terminated by 's', e.g "3.5s"
* @type int|string $cloudStorageConfig.maxBytes The maximum bytes that can be
* written to a Cloud Storage file before a new file is created.
* Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded in cases where
* messages are larger than the limit.
* }
* @return array An array of subscription info
* @throws \InvalidArgumentException
Expand Down Expand Up @@ -499,6 +528,35 @@ public function create(array $options = [])
* be between 0 and 600 seconds. Defaults to 600 seconds.
* @type bool $enableExactlyOnceDelivery Indicates whether to enable
* 'Exactly Once Delivery' on the subscription.
* @type array $cloudStorageConfig If provided, messages will be delivered to Google Cloud Storage.
* @type string $cloudStorageConfig.bucket User-provided name for the Cloud Storage bucket.
* The bucket must be created by the user. The bucket name must be without
* any prefix like "gs://". See the [bucket naming
* requirements] (https://cloud.google.com/storage/docs/buckets#naming).
* @type string $cloudStorageConfig.filenamePrefix
* User-provided prefix for Cloud Storage filename. See the [object naming
* requirements](https://cloud.google.com/storage/docs/objects#naming).
* @type string $cloudStorageConfig.filenameSuffix
* User-provided suffix for Cloud Storage filename. See the [object naming
* requirements](https://cloud.google.com/storage/docs/objects#naming). Must
* not end in "/".
* @type array $cloudStorageConfig.textConfig If present, payloads will be written
* to Cloud Storage as raw text, separated by a newline.
* @type array $cloudStorageConfig.avroConfig If set, message payloads and metadata
* will be written to Cloud Storage in Avro format.
* @type bool $cloudStorageConfig.avroConfig.writeMetadata
* When true, write the subscription name, message_id, publish_time,
* attributes, and ordering_key as additional fields in the output.
* @type Duration|string $cloudStorageConfig.maxDuration The maximum duration
* that can elapse before a new Cloud Storage file is created.
* Min 1 minute, max 10 minutes, default 5 minutes. May not exceed the
* subscription's acknowledgement deadline. If a string is provided,
* it should be as a duration in seconds with up to nine fractional digits,
* terminated by 's', e.g "3.5s"
* @type int|string $cloudStorageConfig.maxBytes The maximum bytes that can be
* written to a Cloud Storage file before a new file is created.
* Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded in cases where
* messages are larger than the limit.
* }
* @param array $options [optional] {
* Configuration options.
Expand Down Expand Up @@ -1263,6 +1321,17 @@ private function formatSubscriptionDurations(array $options)
);
}

if (isset($options['cloudStorageConfig']['maxDuration']) &&
$options['cloudStorageConfig']['maxDuration'] instanceof Duration
) {
$duration = $options['cloudStorageConfig']['maxDuration']->get();
$options['cloudStorageConfig']['maxDuration'] = sprintf(
'%s.%ss',
$duration['seconds'],
$this->convertNanoSecondsToFraction($duration['nanos'], false)
);
}

return $options;
}

Expand Down
114 changes: 96 additions & 18 deletions PubSub/tests/System/ManageSubscriptionsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,71 @@ public function testCreateAndListSubscriptions($client)
$this->assertSubsFound($topic, $subsToCreate);
}

/**
* @dataProvider clientProvider
*/
public function testCreateSubscriptionWithCloudStorageConfig($client)
{
$gcsBucket = getenv('GCP_PHP_PUBSUB_TEST_CLOUD_STORAGE_BUCKET');
if (!$gcsBucket) {
$this->markTestSkipped(
'Must provide `GCP_PHP_PUBSUB_TEST_CLOUD_STORAGE_BUCKET` to run this test.'
);
return;
}

$topic = self::topic($client);
$bucket = [
'bucket' => $gcsBucket,
'avroConfig' => ['writeMetadata' => false],
'maxDuration' => new Duration(150, 1e+9),
'maxBytes' => '2000'
];

$subsToCreate = [
uniqid(self::TESTING_PREFIX),
];

foreach ($subsToCreate as $subToCreate) {
self::$deletionQueue->add($client->subscribe(
$subToCreate,
$topic,
['cloudStorageConfig' => $bucket]
));
}

$this->assertSubsFound($client, $subsToCreate, true);
}

/**
* @dataProvider clientProvider
*/
public function testUpdateSubscriptionWithCloudStorageConfig($client)
{
$gcsBucket = getenv('GCP_PHP_PUBSUB_TEST_CLOUD_STORAGE_BUCKET');
if (!$gcsBucket) {
$this->markTestSkipped(
'Must provide `GCP_PHP_PUBSUB_TEST_CLOUD_STORAGE_BUCKET` to run this test.'
);
return;
}

$topic = self::topic($client);
$subToCreate = uniqid(self::TESTING_PREFIX);
$sub = $client->subscribe($subToCreate, $topic);
self::$deletionQueue->add($sub);

$isSetCloudStorageConfig = isset($sub->info()['cloudStorageConfig']) ?? false;
$bucket = ['bucket' => $gcsBucket];

$sub->update([
'cloudStorageConfig' => $bucket
]);

$this->assertEquals(false, $isSetCloudStorageConfig);
$this->assertEquals(true, $sub->info()['cloudStorageConfig'] ? true : false);
}

/**
* @dataProvider clientProvider
*/
Expand Down Expand Up @@ -399,29 +464,42 @@ public function testDetach($client)
$this->assertTrue($sub->detached());
}

private function assertSubsFound($class, $expectedSubs)
{
private function assertSubsFound(
$class,
$expectedSubs,
$assertForStorageConfig = false
) {
$backoff = new ExponentialBackoff(8);
$hasFoundSubs = $backoff->execute(function () use ($class, $expectedSubs) {
$foundSubs = [];
$subs = $class->subscriptions();

foreach ($subs as $sub) {
$nameParts = explode('/', $sub->name());
$sName = end($nameParts);
foreach ($expectedSubs as $key => $expectedSub) {
if ($sName === $expectedSub) {
$foundSubs[$key] = $sName;
$hasFoundSubs = $backoff->execute(
function () use ($class, $expectedSubs, $assertForStorageConfig) {
$foundSubs = [];
$subs = $class->subscriptions();

foreach ($subs as $sub) {
$nameParts = explode('/', $sub->name());
$sName = end($nameParts);
foreach ($expectedSubs as $key => $expectedSub) {
if ($sName === $expectedSub) {
if ($assertForStorageConfig) {
if (isset($sub->info()['cloudStorageConfig'])) {
$foundSubs[$key] = $sName;
}
} else {
$foundSubs[$key] = $sName;
}
}
}
}
}

if (sort($foundSubs) === sort($expectedSubs)) {
return true;
}
if (sort($foundSubs) === sort($expectedSubs)) {
return true;
}

throw new \Exception('Items not found in the allotted number of attempts.');
});
throw new \Exception(
'Items not found in the allotted number of attempts.'
);
}
);

$this->assertTrue($hasFoundSubs);
}
Expand Down
63 changes: 63 additions & 0 deletions PubSub/tests/Unit/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,69 @@ public function testDetach()
$this->assertEquals([], $this->subscription->detach());
}

public function testCreateSubscriptionWithCloudStorageConfig()
{
$bucket = [
'bucket' => 'pubsub-test-bucket',
'maxDuration' => new Duration(3, 1e+9)
];
$bucketString = [
'bucket' => 'pubsub-test-bucket',
'maxDuration' => '3.1s'
];
$this->connection->createSubscription(Argument::allOf(
Argument::withEntry('foo', 'bar'),
Argument::withEntry('cloudStorageConfig', $bucketString)
))->willReturn([
'name' => self::SUBSCRIPTION,
'topic' => self::TOPIC
])->shouldBeCalledTimes(1);

$this->connection->getSubscription()->shouldNotBeCalled();

$this->subscription->___setProperty('connection', $this->connection->reveal());

$sub = $this->subscription->create([
'foo' => 'bar',
'cloudStorageConfig' => $bucket
]);

$this->assertEquals($sub['name'], self::SUBSCRIPTION);
$this->assertEquals($sub['topic'], self::TOPIC);
}

public function testUpdateSubscriptionWithCloudStorageConfig()
{
$bucket = [
'bucket' => 'pubsub-test-bucket',
'maxDuration' => new Duration(3, 1e+9)
];
$bucketString = [
'name' => 'projects/project-id/subscriptions/subscription-name',
'cloudStorageConfig' => [
'bucket' => 'pubsub-test-bucket',
'maxDuration' => '3.1s'
]
];
$this->connection->updateSubscription(
Argument::containing($bucketString)
)->willReturn([
'name' => self::SUBSCRIPTION,
'topic' => self::TOPIC
])->shouldBeCalledTimes(1);

$this->connection->getSubscription()->shouldNotBeCalled();

$this->subscription->___setProperty('connection', $this->connection->reveal());

$sub = $this->subscription->update([
'cloudStorageConfig' => $bucket
]);

$this->assertEquals($sub['name'], self::SUBSCRIPTION);
$this->assertEquals($sub['topic'], self::TOPIC);
}

// Helper method to generate the exception sent during an invalid EOD operation
// like acknowledge or modifyAckDeadline
private function generateEodException($metadata, $failureReason = 'EXACTLY_ONCE_ACKID_FAILURE')
Expand Down

0 comments on commit bcb075f

Please sign in to comment.