Added list bucket method to S3 library. #253
Conversation
…s they are already retried on the underlying layer with max-retries
When running sbt test:compile locally, I get successful runs. I'm not sure what exactly is going wrong in the Travis build.
IMHO it is an "external" issue; I get the same error *sometimes*!
I tried to resolve the conflicts (hopefully correctly...).
All still looks good at first glance.
I added a few comments. Two things should be fixed:
- the recursion in fileSourceFromFuture
- the potential double encoding in listBucket
Have you tested that it works?
    case None => uri.withScheme("https")
    case Some(proxy) => uri.withPort(proxy.port).withScheme(proxy.scheme)
  }

private[this] def requestHost(bucket: String, region: String)(implicit conf: S3Settings): Uri.Host =
Good idea to extract that method.
@@ -43,6 +41,8 @@ final case class FailedUpload(reasons: Seq[Throwable]) extends Exception

final case class CompleteMultipartUploadResult(location: Uri, bucket: String, key: String, etag: String)

final case class ListBucketResult(is_truncated: Boolean, continuation_token: Option[String], keys: Seq[String])
Maybe we should keep the camel case style for parameters?
    continuation_token: Option[String] = None
)(implicit conf: S3Settings): HttpRequest = {

  val listTypeQuery: (String, String) = "list-type" -> "2"
I guess this implementation is fine as is. It could probably be simplified by collecting all parameters as a Seq[(String, Option[String])] and then converting that to a Map, filtering out the None values. Something like
Seq("list-type" -> Some("2"),
    "prefix" -> prefix,
    "continuation-token" -> continuation_token.map(_.replace...)
).collect { case (k, Some(v)) => k -> v }.toMap
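For reference, a self-contained sketch of that pattern, assuming Akka HTTP's Uri.Query; listBucketQuery and its parameters are illustrative names, not code from this PR:

import akka.http.scaladsl.model.Uri.Query

// A minimal sketch of the suggested pattern: collect all parameters as
// optional pairs, keep only the ones that are set, and build the Query.
def listBucketQuery(prefix: Option[String], continuationToken: Option[String]): Query = {
  val params: Seq[(String, Option[String])] = Seq(
    "list-type" -> Some("2"),
    "prefix" -> prefix,
    "continuation-token" -> continuationToken
  )
  Query(params.collect { case (k, Some(v)) => k -> v }.toMap)
}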
Yeah, this is a little nicer. I'm still leveling up on my use of the collections in Scala and using them properly.
val listTypeQuery: (String, String) = "list-type" -> "2"
val prefixQuery: Option[(String, String)] = prefix.map("prefix" -> _)
val continuationTokenQuery: Option[(String, String)] =
  continuation_token.map("continuation-token" -> _.replaceAll("=", "%3D"))
Why does only this character need to be replaced? Are you sure that's correct at all? Uri.query will also encode characters, so the percent character may then be double encoded.
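To make the concern concrete, a hedged illustration (assuming Akka HTTP's Uri and Query; the bucket URL is a placeholder):

import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Query

// Query expects *decoded* values and applies its own percent-encoding when
// the URI is rendered. Pre-encoding "=" as "%3D" therefore risks the "%"
// itself being encoded again, so "%3D" can come out as "%253D".
val token = "abc=="
val uri = Uri("https://bucket.s3.amazonaws.com/")
  .withQuery(Query("continuation-token" -> token.replaceAll("=", "%3D")))
// uri.toString may now contain "%253D" instead of the intended "%3D"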
    bucket: String,
    region: String,
    prefix: Option[String] = None,
    continuation_token: Option[String] = None
Let's use camel case there as well.
Will fix
def fileSourceFromFuture(f: Future[ListBucketResult]): Source[String, NotUsed] =
  Source
    .fromFuture(f)
    .flatMapConcat((res: ListBucketResult) => {
The recursive calls here look like a nice pattern, but this doesn't run in constant memory as expected for streams and may eventually lead to an OOM. See the comment akka/akka#22352 (comment) for a potential workaround.
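A hedged reconstruction of the recursive shape under discussion, not the PR's exact code (ListBucketResult and listBucketCall are from this diff):

// Every truncated page concatenates another source onto the stream, so the
// chain of nested sources grows with the number of pages instead of running
// in constant memory.
def fileSourceFromFuture(f: Future[ListBucketResult]): Source[String, NotUsed] =
  Source
    .fromFuture(f)
    .flatMapConcat { res =>
      val keys = Source(res.keys.toList)
      if (res.is_truncated)
        keys ++ fileSourceFromFuture(listBucketCall(res.continuation_token))
      else
        keys
    }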
Interesting. I'll dig in and try to get something working.
After digging a little here, I'm not quite sure how to use this technique to page through the results properly. I don't see how statefulMapConcat gives you access to the previous value and then continues to page until completed.
In the meantime I figured that using Source.unfoldAsync would probably be even simpler. It allows you to thread state through between each call. So, it should basically look similar to this:
def getNextBucket(continuationToken: Option[String]): Future[Option[(String, Seq[...])]] // needs to return a future of a tuple `Some(next_continuation_token, sequence of results)` (or `None` if complete)
Source.unfoldAsync(None)(getNextBucket)
  .mapConcat(identity) // flatten sequence of results
Can you try that?
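A self-contained version of that suggestion might look like the following sketch; Page, fetchPage, and listKeys are hypothetical stand-ins for the signed S3 call and its parsed result, not code from this PR:

import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for one parsed page of list-bucket results.
final case class Page(keys: immutable.Seq[String], nextToken: Option[String])

// State is Option[Option[String]]:
//   Some(None)      -> first request, no continuation token yet
//   Some(Some(tok)) -> fetch the next page with `tok`
//   None            -> previous page was the last one, complete the stream
def listKeys(fetchPage: Option[String] => Future[Page])(
    implicit ec: ExecutionContext): Source[String, NotUsed] =
  Source
    .unfoldAsync[Option[Option[String]], immutable.Seq[String]](Some(None)) {
      case None => Future.successful(None)
      case Some(token) =>
        fetchPage(token).map(page => Some((page.nextToken.map(t => Option(t)), page.keys)))
    }
    .mapConcat(identity) // flatten each page into individual keys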
import scala.collection.immutable.Seq
import scala.concurrent.{ExecutionContext, Future}

private[alpakka] object HttpRequests {

  def listBucket(
I wonder if it wouldn't make sense to use separate methods for the initial call and the continuation. It seems that you would want to add either a prefix or a token, but never both?
def listBucketCall(continuation_token: Option[String] = None): Future[ListBucketResult] =
  signAndGetAs[ListBucketResult](HttpRequests.listBucket(bucket, region, prefix, continuation_token))

def fileSourceFromFuture(f: Future[ListBucketResult]): Source[String, NotUsed] =
Why is it called fileSourceFromFuture? ;)
Not the best name now that you mention it. Hah. Got any better ideas?
Also, for what it's worth, I have been using a forked version of the repo for a while now using this method with success.
The increased memory usage will probably only show for particular use cases. @Scalahansolo, could you try
…get keys and used unfoldAsync to get all keys to run in constant memory.
The unfoldAsync ended up working out. That's a really cool API method that I was unaware of. The type on the listBucket call ended up being pretty brutal, but the logic seems sound.
 * and the first call Boolean is false, we know that we are done getting keys.
 * @return
 */
def listBucketCall(args: (Option[String], Boolean)): Future[Option[((Option[String], Boolean), Seq[String])]] =
Ah, I see, you need to distinguish "Start", "Running", and "End" states. Maybe it would make sense to create a three-state ADT instead? That would read a bit cleaner:
sealed trait State
case object Starting extends State
case class Running(continuationToken: String) extends State
case object Finished extends State
WDYT?
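Sketched end to end, hedged: keySource and the listBucketCall parameter are illustrative stand-ins for the signed request, while ListBucketResult mirrors the case class in this diff.

import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}

sealed trait ListBucketState
case object Starting extends ListBucketState
final case class Running(continuationToken: String) extends ListBucketState
case object Finished extends ListBucketState

// Shape of one parsed response, as in this PR's diff (camel-cased).
final case class ListBucketResult(isTruncated: Boolean,
                                  continuationToken: Option[String],
                                  keys: immutable.Seq[String])

def keySource(listBucketCall: Option[String] => Future[ListBucketResult])(
    implicit ec: ExecutionContext): Source[String, NotUsed] = {

  // Decide the next state from a response: keep paging while truncated.
  def nextState(res: ListBucketResult): ListBucketState =
    res.continuationToken match {
      case Some(token) if res.isTruncated => Running(token)
      case _ => Finished
    }

  Source
    .unfoldAsync[ListBucketState, immutable.Seq[String]](Starting) {
      case Finished => Future.successful(None)
      case Starting =>
        listBucketCall(None).map(res => Some((nextState(res), res.keys)))
      case Running(token) =>
        listBucketCall(Some(token)).map(res => Some((nextState(res), res.keys)))
    }
    .mapConcat(identity) // flatten each page into individual keys
}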
Yeah, that would be a little cleaner for sure. I'll see what I can come up with. Will update this soon.
…f the brutal type signature from earlier.
I think that you'll like this a bit better. I definitely feel this is cleaner and much easier to read.
}

Source
  .unfoldAsync[ListBucketState, Seq[String]](Starting) {
Yes, this is great now.
@@ -43,6 +41,13 @@ final case class FailedUpload(reasons: Seq[Throwable]) extends Exception

final case class CompleteMultipartUploadResult(location: Uri, bucket: String, key: String, etag: String)

final case class ListBucketResult(isTruncated: Boolean, continuationToken: Option[String], keys: Seq[String])

sealed trait ListBucketState
Can you move those into a private place so that they don't clutter the public namespace? It would be ok from my side to put the whole hierarchy into the listBucket method if it's only used there.
Done :)
Almost done. Can be merged from my side after the internal state hierarchy classes are moved into a more private place.
…he public namespace.
LGTM
Great one, thanks a lot @Scalahansolo.
S3: add listBucket method to S3 library (akka#253)
* Added recursive listBucket call to get all keys under a specific prefix.
* Properly using the request URI method and constructing queries with the Query type
* Added tests around query parsing
* Fixed formatting and removed recoverWithRetries on listBucket calls as they are already retried on the underlying layer with max-retries
* Using signAndGetAs instead of signAndGet so as not to duplicate logic.
* Implemented quick fixes based on comments. Removed recursive call to get keys and used unfoldAsync to get all keys to run in constant memory.
* Added execution context. Fixed broken test.
* Fixed formatting error.
* Cleaned up listBucket call by adding a ListBucketState object instead of the brutal type signature from earlier.
* Moved trait for listBucket into the def itself so as to remove it from the public namespace.
Adds the ability to get a source of all keys under a given prefix.