-
Notifications
You must be signed in to change notification settings - Fork 641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Google cloud storage support #1340
Conversation
Hi @josipgrgurica, Thank you for your contribution! We really value the time you've taken to put this together. Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement: |
I signed the CLA. Sorry, I totally forgot about that. |
Thanks for picking this up! |
@francisdb you're welcome. Thank you for doing great work on this connector 👏. |
Sorry for being slow on reviewing. We focused on Alpakka Kafka for a long while, but are back on this part of Alpakka now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great so far I got today. I haven't looked into the internals, yet.
We've recently tried out to use Akka Extensions for connectors that normally have a single incarnation (just as Akka Http contains Http()
), that would flip the usage around to do things via methods on a CloudStorage
object which implicitly takes the extension. What do you think about such a design? (see #1323 which would change S3 to that style)
...storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/GoogleCloudStorageModel.scala
Outdated
Show resolved
Hide resolved
...storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/GoogleCloudStorageModel.scala
Outdated
Show resolved
Hide resolved
...storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/GoogleCloudStorageModel.scala
Outdated
Show resolved
Hide resolved
...a/akka/stream/alpakka/googlecloud/storage/impl/GoogleCloudStorageClientIntegrationSpec.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks really great, I've nothing big to object.
google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/Chunker.scala
Outdated
Show resolved
Hide resolved
google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/Formats.scala
Show resolved
Hide resolved
google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/Formats.scala
Outdated
Show resolved
Hide resolved
google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/Formats.scala
Outdated
Show resolved
Hide resolved
...e/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GoogleCloudStorageClient.scala
Outdated
Show resolved
Hide resolved
} | ||
} | ||
|
||
private[storage] abstract class GoogleCloudStorage { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This cannot be private as it is part of the public API. You should make it a final class by passing in the client and ec.
* Gets information on a bucket | ||
* | ||
* @param bucketName the name of the bucket to look up | ||
* @return a [[CompletionStage]] containing [[Bucket]] if it exists */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scaladoc requires the full package path for classes not in the same package (importing doesn't help). But I think its enough to back-tick those as they show clickable in the message signatures.
} | ||
} | ||
|
||
private[storage] trait GoogleCloudStorage { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can't be private. You should make it a final class by passing in the client.
new StorageSettings(projectId, clientEmail, privateKey) | ||
|
||
override def toString: String = | ||
s"StorageSettings(projectId=$projectId, clientEmail=$clientEmail, privateKey=$privateKey" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't print out the key by mistake...
s"StorageSettings(projectId=$projectId, clientEmail=$clientEmail, privateKey=$privateKey" | |
s"StorageSettings(projectId=$projectId, clientEmail=$clientEmail, privateKey=***)" |
val chunkSize = 5 * 1024 * 1024 | ||
|
||
val uploadSink: Sink[ByteString, Future[StorageObject]] = | ||
storage.resumableUpload(bucketName, objectName, ContentTypes.`text/plain(UTF-8)`, chunkSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a perfect world, we'd pull these from sources that are run within the tests.
Is there any plans to continue working on this PR? |
Yes there are, I plan to continue by the end of this week. 😄 |
Hi @josipgrgurica, Do you still have plans to continue this? Or would you like anybody else to carry on? |
Hi, I'm sorry about not pushing anything, I got stuck in my daily job 😞 I pushed code with changes for most comments. |
That makes sense to me, is this PR that I should use for reference #1395 ? |
private def expiresSoon(g: AccessTokenExpiry): Boolean = | ||
g.expiresAt < (tokenApi.now + 60) | ||
|
||
def getToken()(implicit materializer: Materializer): Future[String] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm using this lib in production for few months and I noticed that this call can fail, it is transient issue. Does it make sense to retry it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly, but it is important to back off appropriately.
I'm not aware that someone had that issue with the other Google Cloud connectors, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, we should be careful with back off.
What do you suggest for defaults?
@ennru I refactored code to use Akka Extensions, I took current S3 api design as a guide. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had another look, still impressed. Great that you added the extension.
google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/Formats.scala
Show resolved
Hide resolved
private def expiresSoon(g: AccessTokenExpiry): Boolean = | ||
g.expiresAt < (tokenApi.now + 60) | ||
|
||
def getToken()(implicit materializer: Materializer): Future[String] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly, but it is important to back off appropriately.
I'm not aware that someone had that issue with the other Google Cloud connectors, though.
...ud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala
Outdated
Show resolved
Hide resolved
contentType: ContentType, | ||
chunkSize: Int): Flow[ByteString, (HttpRequest, (MultiPartUpload, Int)), NotUsed] = { | ||
|
||
assert( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assertion could go all the way up to resumableUpload
AFAICS.
val projectId = c.getString("project-id") | ||
val clientEmail = c.getString("client-email") | ||
val privateKey = c.getString("private-key") | ||
val baseUrl = if (c.hasPath("base-url")) c.getString("base-url") else defaultBaseUrl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea of using config is to have all defaults in reference.conf
so that they are always set.
The build fails on MiMa, you need to add |
Hi @jkobejs, Thank you for your contribution! We really value the time you've taken to put this together. Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking really good. Just some nitpick comments.
GCStorageStream.listBucket(bucket, Option(prefix)).asJava | ||
|
||
/** | ||
* Downloads object from bucket. Returns an empty Source if the object was not found. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is contradictory with the comment in the "@return" section.
import akka.actor.{ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider} | ||
|
||
/** | ||
* Manages one [[S3Settings]] per `ActorSystem`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Manages one [[S3Settings]] per `ActorSystem`. | |
* Manages one [[GCStorageSettings]] per `ActorSystem`. |
@@ -0,0 +1,10 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should be moved to impl
package, as it is only accessed from there.
@@ -0,0 +1,21 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should be moved to impl
package, as it is only accessed from there.
|
||
import scala.collection.immutable.Seq | ||
|
||
final case class FailedUpload(reasons: Seq[Throwable]) extends Exception(reasons.map(_.getMessage).mkString(", ")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be converted to non-case class and introduced a Java API getter for reasons (as other model classes have).
@@ -0,0 +1,10 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should be moved to impl
package, as it is only accessed from there.
@@ -0,0 +1,22 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should be moved to impl
package, as it is only accessed from there.
@@ -0,0 +1,17 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should be moved to impl
package, as it is only accessed from there.
@@ -0,0 +1,6 @@ | |||
alpakka.googlecloud.storage { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other Google Cloud connector (Pub/Sub) has used alpakka.google.cloud.pubsub
namespace. So this one should follow suit.
project/Dependencies.scala
Outdated
@@ -22,6 +22,8 @@ object Dependencies { | |||
val CouchbaseVersion = "2.7.2" | |||
val CouchbaseVersionForDocs = "2.7" | |||
|
|||
val JwtCoreVersion = "2.1.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The 2.13 build failed because we upgraded from Scala 2.13.0-M5 to 2.13.0 in master and jwt-core 2.1.0 is not available for Scala 2.13.0. It is a bit tricky to upgrade jwt-core for libraries that are using it already (#1762 and #1763), but since this is a new connector, this can be upgraded to 3.0.1 safely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated JwtCoreVersion to 3.0.1
together with failing code.
I noticed these warnings when compiling against Scala 2.13.0
[warn] /Users/jgrgurica/Development/FOOS/alpakka/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/FailedUpload.scala:15:47: object JavaConverters in package collection is deprecated (since 2.13.0): Use `scala.jdk.CollectionConverters` instead
[warn] def getReasons: java.util.List[Throwable] = reasons.asJava
[warn] ^
[warn] /Users/jgrgurica/Development/FOOS/alpakka/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/FailedUpload.scala:23:65: object JavaConverters in package collection is deprecated (since 2.13.0): Use `scala.jdk.CollectionConverters` instead
[warn] def create(reasons: java.util.List[Throwable]) = FailedUpload(reasons.asScala.toList)
[warn] ^
[warn] /Users/jgrgurica/Development/FOOS/alpakka/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala:598:27: value Stream in package scala is deprecated (since 2.13.0): Use LazyList instead of Stream
[warn] .mapConcat(r => Stream.continually(r))
[warn] ^
[warn] three warnings found
Can you advise me how to deal with these warnings since scala.jdk.CollenctionConverters
and scala.collection.immutable.LazyList
doesn't exist in previous versions of Scala?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will have to deal with warnings for now. It will be possible to get rid of those when a new scala-collection-compat release is out with scala/scala-collection-compat#217
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one looks good to be merged in. I'll squash the commits to one, that contains all @francisdb work, and to another one with all @jkobejs work. And after last Travis validation run wel'll merge it.
Implement missing apis Add documentation Improve java/scala dsl's Add mocked http tests Write tests for client facing apis Add stroage test command into travis script Refactor api to static methods remove VS settings, format dependencies, add project info add reference conf, log status codes, disable mima Bump jwt-core to 3.0.1
Failure was #1417 |
Great to have this in. Thanks a lot @francisdb and @jkobejs! |
Great to see @jkobejs took the time to complete this, thanks! |
I couldn't agree more, this is what open source is about. Work together. |
@francisdb thanks for opportunity to finish your work, it has been a pleasure working on this! |
Fixes #588
Finished work that @francisdb started on #650.
I didn't add documentation for all api methods, should add documentation for remaining ones?