diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..1bfc2b1 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,9 @@ +# Changelog + +This file summarizes the main changes for each release. + +### Version 0.11.0 + + - Remove dependency to the unmaintained `dwhjames/aws-wrap` by including the necessary code. + - Split the project in three : `commons-aws-cloudwatch`, `commons-aws-s3` and `commons-aws-sqs` + - diff --git a/README.md b/README.md index 0ef75e0..2b78d37 100644 --- a/README.md +++ b/README.md @@ -1,33 +1,33 @@ # Streaming / asynchronous Scala client for common AWS services -Streaming / asynchronous Scala client for common AWS services on top of [dwhjames/aws-wrap](https://github.com/dwhjames/aws-wrap) -. When possible, clients expose methods that return Akka Stream's Sources / Flows / Sinks to provide streaming facilities. +Streaming / asynchronous Scala client for common AWS services. +When possible, clients expose methods that return Akka Stream's Sources / Flows / Sinks to provide streaming facilities. Clients use a pool of threads managed internally and optimized for blocking IO operations. -This library makes heavy use of our extension library for Akka Stream +This library makes heavy use of our extension library for Akka Stream [MfgLabs/akka-stream-extensions](https://github.com/MfgLabs/akka-stream-extensions). ## Resolver ```scala resolvers ++= Seq( - Resolver.bintrayRepo("mfglabs", "maven"), - Resolver.bintrayRepo("dwhjames", "maven") + Resolver.bintrayRepo("mfglabs", "maven") ) ``` ## Dependencies ```scala -libraryDependencies += "com.mfglabs" %% "commons-aws" % "0.10.0" +libraryDependencies += "com.mfglabs" %% "commons-aws" % "0.11.0" ``` +Changelog [here](CHANGELOG.md) + ## Usage > Scaladoc is available [there](http://mfglabs.github.io/commons-aws/api/current/) - ### Commons #### S3 @@ -35,7 +35,7 @@ libraryDependencies += "com.mfglabs" %% "commons-aws" % "0.10.0" ```scala import com.mfglabs.commons.aws.s3._ -val builder = S3StreamBuilder(new AmazonS3AsyncClient()) // contains un-materialized composable Source / Flow / Sink +val builder = S3StreamBuilder(AmazonS3AsyncClient()()) // contains un-materialized composable Source / Flow / Sink val fileStream: Source[ByteString, Unit] = builder.getFileAsStream(bucket, key) @@ -47,9 +47,9 @@ someBinaryStream.via( someBinaryStream.via( builder.uploadStreamAsMultipartFile( - bucket, - prefix, - nbChunkPerFile = 10000, + bucket, + prefix, + nbChunkPerFile = 10000, chunkUploadConcurrency = 2 ) ) @@ -71,11 +71,10 @@ There are also smart `AmazonS3Client` constructors that can be provided with cus #### SQS ```scala -import com.amazonaws.services.sqs.AmazonSQSAsyncClient -import com.pellucid.wrap.sqs.AmazonSQSScalaClient +import com.amazonaws.services.sqs.AmazonSQSAsyncClient import com.mfglabs.commons.aws.sqs._ -val sqs = new AmazonSQSScalaClient(new AmazonSQSAsyncClient(), ec) +val sqs = new AmazonSQSClient(new AmazonSQSAsyncClient(), ec) val builder = SQSStreamBuilder(sqs) val sender: Flow[String, SendMessageResult, Unit] = @@ -87,7 +86,7 @@ val sender: Flow[String, SendMessageResult, Unit] = } .via(builder.sendMessageAsStream()) -val receiver: Source[Message, Unit] = +val receiver: Source[Message, Unit] = builder.receiveMessageAsStream(queueUrl, autoAck = false) ``` @@ -100,7 +99,7 @@ import com.mfglabs.commons.aws.cloudwatch import cloudwatch._ // brings implicit extensions // Create the client -val CW = new cloudwatch.AmazonCloudwatchClient() +val CW = cloudwatch.AmazonCloudwatchClient()() // Use it for { @@ -114,73 +113,6 @@ and managed by [[AmazonCloudwatchClient]] itself. There are also smart `AmazonCloudwatchClient` constructors that can be provided with custom. `java.util.concurrent.ExecutorService` if you want to manage your pools of threads. -### Extensions - -#### Cloudwatch heartbeat - -It provides a simple mechanism that sends periodically a heartbeat metric to AWS Cloudwatch. - -If the heartbeat rate on a _configurable_ period falls under a _configurable_ threshold or the metrics isn't fed with sufficient data, a Cloudwatch `ALARM` status is triggered & sent to a given SQS endpoint. - -When the rate goes above threshold again, an `OK` status is triggered & sent to the same SQS endpoint. - -> **IMPORTANT**: the alarm is created by the API itself but due to a limitation (or a bug) in Amazon API, the status of this alarm will stay at `INSUFFICIENT_DATA` until you manually update it in the AWS console. -For that, wait 1/2 minutes after start so that Cloudwatch receives enough heartbeats and then select the alarm, click on `modify` and then click on `save`. The alarm should pass to `OK` status. - -_Cloudwatch heartbeat is based on Cloudwatch service & Akka scheduler._ - -##### Low-level client - -```scala -import com.mfglabs.commons.aws.cloudwatch -import com.mfglabs.commons.aws.extensions.cloudwatch.CloudwatchAkkaHeartbeat - -import myExecutionCtx // an implicit custom execution context - -val hb = new CloudwatchAkkaHeartbeat( - namespace = "Test/Heartbeat", // the namespace of the cloudwatch metrics - name = "test1", // the name of the cloudwatch - beatPeriod = 2.second, // the heart beat period in Scala.concurrent.duration.Duration string format - alarmPeriod = 120.seconds, // the period on which the metrics is analyzed to determine the heartbeat health - alarmPeriodNb = 1, // the number of "bad health" periods after which the alarm is triggered - alarmThreshold = 10, // the threshold counting the number of heartbeats on a period under which the "bad health" is detected - system = system, // the Akka system to create scheduler - client = CW, // the cloudwatch client - actionEndpoint = "arn:aws:sns:eu-west-1:896733075612:Cloudwatch-HeartBeat-Test" // the actionEndpoint (SQS) to which Cloudwatch will send the alarm -) - -hb.start() // to start the heartbeat - -hb.stop() // to stop the heartbeat -``` - -> Please note that you need to provide an implicit `ExecutionContext` for `CloudwatchAkkaHeartbeat.start/stop` - -##### Cakable client - -`CloudwatchHeartbeatLayer` is ready to be used in a cake pattern - -```scala -object MyAkkaService extends CloudwatchHeartbeatLayer { - override val system = myAkkaSystem - - override val heartbeatClient = myCloudClient - - override val heartbeatName: String = ... - override val heartbeatNamespace: String = ... - - override val heartbeatPeriod: FiniteDuration = ... - override val heartbeatAlarmPeriod: FiniteDuration = ... - override val heartbeatAlarmPeriodNb: Int = ... - override val heartbeatAlarmThreshold: Int = ... - override val heartbeatEndpoint: String = ... - - ... - // start the heartbeat - heartbeat.start()(myExeCtx) -} -``` - ## License This software is licensed under the Apache 2 license, quoted below. diff --git a/build.sbt b/build.sbt index 7dff209..9b23d6e 100644 --- a/build.sbt +++ b/build.sbt @@ -1,10 +1,8 @@ -import sbtunidoc.Plugin._ import bintray.Plugin._ - organization in ThisBuild := "com.mfglabs" -scalaVersion in ThisBuild := "2.11.7" +scalaVersion in ThisBuild := "2.11.11" version in ThisBuild := "0.11.0" @@ -68,12 +66,8 @@ lazy val all = (project in file(".")) .aggregate(s3) .aggregate(sqs) .settings(name := "commons-aws-all") - .settings(site.settings ++ ghpages.settings: _*) - .settings( - name := "commons-aws-all", - site.addMappingsToSiteDir(mappings in (ScalaUnidoc, packageDoc), "api/" + version), - git.remoteRepo := "git@github.com:MfgLabs/commons-aws.git" - ) + .enablePlugins(ScalaUnidocPlugin) + .settings(name := "commons-aws-all") .settings(noPublishSettings) lazy val commons = project.in(file("commons")) @@ -110,7 +104,7 @@ lazy val s3 = project.in(file("s3")) ), commonSettings, publishSettings - ) + ).dependsOn(commons) lazy val sqs = project.in(file("sqs")) .settings ( diff --git a/cloudwatch/src/main/scala/cloudwatch.scala b/cloudwatch/src/main/scala/cloudwatch.scala index 489ab93..fdae7d2 100644 --- a/cloudwatch/src/main/scala/cloudwatch.scala +++ b/cloudwatch/src/main/scala/cloudwatch.scala @@ -18,17 +18,36 @@ package com.mfglabs.commons.aws package cloudwatch -import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit} -import scala.concurrent.{Future, Promise} -import scala.util.Try +import java.util.concurrent.ExecutorService +import scala.concurrent.Future -import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain} -import com.amazonaws.{AmazonWebServiceRequest, ClientConfiguration} -import com.amazonaws.internal.StaticCredentialsProvider +import com.amazonaws.auth.{AWSCredentialsProvider, DefaultAWSCredentialsProviderChain} +import com.amazonaws.ClientConfiguration +import com.amazonaws.client.builder.ExecutorFactory -import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClient +import com.amazonaws.services.cloudwatch.{AmazonCloudWatchAsync, AmazonCloudWatchAsyncClient} import com.amazonaws.services.cloudwatch.model._ + +object AmazonCloudwatchClient { + import FutureHelper.defaultExecutorService + + def apply( + awsCredentialsProvider : AWSCredentialsProvider = new DefaultAWSCredentialsProviderChain, + clientConfiguration : ClientConfiguration = new ClientConfiguration() + )( + executorService : ExecutorService = defaultExecutorService(clientConfiguration, "aws.wrap.cloudwatch") + ): AmazonCloudwatchClient = new AmazonCloudwatchClient( + AmazonCloudWatchAsyncClient + .asyncBuilder() + .withCredentials(awsCredentialsProvider) + .withClientConfiguration(clientConfiguration) + .withExecutorFactory(new ExecutorFactory { def newExecutor() = executorService }) + .build() + ) + +} + /** * A lightweight wrapper for [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/dynamodbv2/AmazonDynamoDBAsyncClient.html AmazonCloudWatchAsyncClient]]. * @@ -37,120 +56,21 @@ import com.amazonaws.services.cloudwatch.model._ * the underlying [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/dynamodbv2/AmazonDynamoDBAsyncClient.html AmazonCloudWatchAsyncClient]]. * @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/dynamodbv2/AmazonDynamoDBAsyncClient.html AmazonCloudWatchAsyncClient]] */ -class AmazonCloudwatchClient(val client: AmazonCloudWatchAsyncClient) { +class AmazonCloudwatchClient(val client: AmazonCloudWatchAsync) { import FutureHelper._ /** - * make a client from a credentials provider, a config, and a default executor service. - * - * @param awsCredentialsProvider - * a provider of AWS credentials. - * @param clientConfiguration - * a client configuration. - */ - def this(awsCredentialsProvider: AWSCredentialsProvider, clientConfiguration: ClientConfiguration) = { - this(new AmazonCloudWatchAsyncClient( - awsCredentialsProvider, - clientConfiguration, - new ThreadPoolExecutor( - 0, clientConfiguration.getMaxConnections, - 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue[Runnable], - new AWSThreadFactory("aws.wrap.cloudwatch"))) - ) - } - - /** - * make a client from a credentials provider, a default config, and an executor service. - * - * @param awsCredentialsProvider - * a provider of AWS credentials. - * @param executorService - * an executor service for synchronous calls to the underlying AmazonS3Client. - */ - def this(awsCredentialsProvider: AWSCredentialsProvider, executorService: ExecutorService) = { - this(new AmazonCloudWatchAsyncClient( - awsCredentialsProvider, new ClientConfiguration(), executorService - )) - } - - /** - * make a client from a credentials provider, a default config, and a default executor service. - * - * @param awsCredentialsProvider - * a provider of AWS credentials. - */ - def this(awsCredentialsProvider: AWSCredentialsProvider) = { - this(awsCredentialsProvider, new ClientConfiguration()) - } - - /** - * make a client from credentials, a config, and an executor service. - * - * @param awsCredentials - * AWS credentials. - * @param clientConfiguration - * a client configuration. - * @param executorService - * an executor service for synchronous calls to the underlying AmazonS3Client. - */ - def this(awsCredentials: AWSCredentials, clientConfiguration: ClientConfiguration, executorService: ExecutorService) = { - this(new AmazonCloudWatchAsyncClient( - new StaticCredentialsProvider(awsCredentials), clientConfiguration, executorService - )) - } - - /** - * make a client from credentials, a default config, and an executor service. - * - * @param awsCredentials - * AWS credentials. - * @param executorService - * an executor service for synchronous calls to the underlying AmazonS3Client. - */ - def this(awsCredentials: AWSCredentials, executorService: ExecutorService) = { - this(awsCredentials, new ClientConfiguration(), executorService) - } - - /** - * make a client from credentials, a default config, and a default executor service. - * - * @param awsCredentials - * AWS credentials. - */ - def this(awsCredentials: AWSCredentials) = { - this(new StaticCredentialsProvider(awsCredentials)) - } - - /** - * make a client from a default credentials provider, a config, and a default executor service. - * - * @param clientConfiguration - * a client configuration. - */ - def this(clientConfiguration: ClientConfiguration) = { - this(new DefaultAWSCredentialsProviderChain(), clientConfiguration) - } - - /** - * make a client from a default credentials provider, a default config, and a default executor service. - */ - def this() = { - this(new DefaultAWSCredentialsProviderChain()) - } - -/** * @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/cloudwatch/AmazonCloudWatch.html#deleteAlarms(com.amazonaws.services.cloudwatch.model.DeleteAlarmsRequest) AWS Java SDK]] */ def deleteAlarms( deleteAlarmsRequest: DeleteAlarmsRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.deleteAlarmsAsync, deleteAlarmsRequest) + ): Future[DeleteAlarmsResult] = + wrapAsyncMethod(client.deleteAlarmsAsync, deleteAlarmsRequest) /** * @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/cloudwatch/AmazonCloudWatch.html#deleteAlarms(com.amazonaws.services.cloudwatch.model.DeleteAlarmsRequest) AWS Java SDK]] */ - def deleteAlarms(alarmNames: String*): Future[Unit] = + def deleteAlarms(alarmNames: String*): Future[DeleteAlarmsResult] = deleteAlarms(new DeleteAlarmsRequest().withAlarmNames(alarmNames: _*)) /** @@ -194,13 +114,13 @@ class AmazonCloudwatchClient(val client: AmazonCloudWatchAsyncClient) { */ def disableAlarmActions( disableAlarmActionsRequest: DisableAlarmActionsRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.disableAlarmActionsAsync, disableAlarmActionsRequest) + ): Future[DisableAlarmActionsResult] = + wrapAsyncMethod(client.disableAlarmActionsAsync, disableAlarmActionsRequest) /** * @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/cloudwatch/AmazonCloudWatch.html#disableAlarmActions(com.amazonaws.services.cloudwatch.model.DisableAlarmActionsRequest) AWS Java SDK]] */ - def disableAlarmActions(alarmNames: String*): Future[Unit] = + def disableAlarmActions(alarmNames: String*): Future[DisableAlarmActionsResult] = disableAlarmActions(new DisableAlarmActionsRequest().withAlarmNames(alarmNames: _*)) /** @@ -208,13 +128,13 @@ class AmazonCloudwatchClient(val client: AmazonCloudWatchAsyncClient) { */ def enableAlarmActions( enableAlarmActionsRequest: EnableAlarmActionsRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.enableAlarmActionsAsync, enableAlarmActionsRequest) + ): Future[EnableAlarmActionsResult] = + wrapAsyncMethod(client.enableAlarmActionsAsync, enableAlarmActionsRequest) /** * @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/cloudwatch/AmazonCloudWatch.html#enableAlarmActions(com.amazonaws.services.cloudwatch.model.EnableAlarmActionsRequest) AWS Java SDK]] */ - def enableAlarmActions(alarmNames: String*): Future[Unit] = + def enableAlarmActions(alarmNames: String*): Future[EnableAlarmActionsResult] = enableAlarmActions(new EnableAlarmActionsRequest().withAlarmNames(alarmNames: _*)) /** @@ -244,16 +164,16 @@ class AmazonCloudwatchClient(val client: AmazonCloudWatchAsyncClient) { */ def putMetricAlarm( putMetricAlarmRequest: PutMetricAlarmRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.putMetricAlarmAsync, putMetricAlarmRequest) + ): Future[PutMetricAlarmResult] = + wrapAsyncMethod(client.putMetricAlarmAsync, putMetricAlarmRequest) /** * @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/cloudwatch/AmazonCloudWatch.html#putMetricData(com.amazonaws.services.cloudwatch.model.PutMetricDataRequest) AWS Java SDK]] */ def putMetricData( putMetricDataRequest: PutMetricDataRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.putMetricDataAsync, putMetricDataRequest) + ): Future[PutMetricDataResult] = + wrapAsyncMethod(client.putMetricDataAsync, putMetricDataRequest) /** * @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/cloudwatch/AmazonCloudWatch.html#putMetricData(com.amazonaws.services.cloudwatch.model.PutMetricDataRequest) AWS Java SDK]] @@ -261,7 +181,7 @@ class AmazonCloudwatchClient(val client: AmazonCloudWatchAsyncClient) { def putMetricData( namespace: String, metricData: Iterable[MetricDatum] - ): Future[Unit] = { + ): Future[PutMetricDataResult] = { import scala.collection.JavaConversions.asJavaCollection putMetricData( @@ -276,8 +196,8 @@ class AmazonCloudwatchClient(val client: AmazonCloudWatchAsyncClient) { */ def setAlarmState( setAlarmStateRequest: SetAlarmStateRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.setAlarmStateAsync, setAlarmStateRequest) + ): Future[SetAlarmStateResult] = + wrapAsyncMethod(client.setAlarmStateAsync, setAlarmStateRequest) /** * @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/cloudwatch/AmazonCloudWatch.html#setAlarmState(com.amazonaws.services.cloudwatch.model.SetAlarmStateRequest) AWS Java SDK]] @@ -287,7 +207,7 @@ class AmazonCloudwatchClient(val client: AmazonCloudWatchAsyncClient) { stateReason: String, stateValue: StateValue, stateReasonData: String = "" - ): Future[Unit] = + ): Future[SetAlarmStateResult] = setAlarmState( new SetAlarmStateRequest() .withAlarmName(alarmName) diff --git a/cloudwatch/src/test/scala/CloudwatchSpec.scala b/cloudwatch/src/test/scala/CloudwatchSpec.scala index 878b4e2..1b8f7bc 100644 --- a/cloudwatch/src/test/scala/CloudwatchSpec.scala +++ b/cloudwatch/src/test/scala/CloudwatchSpec.scala @@ -1,19 +1,14 @@ package com.mfglabs.commons.aws -import collection.mutable.Stack import org.scalatest._ import concurrent.ScalaFutures -import org.scalatest.time.{Minutes, Millis, Seconds, Span} -import scala.concurrent.Future +import org.scalatest.time.{Minutes, Millis, Span} class CloudwatchSpec extends FlatSpec with Matchers with ScalaFutures { - import scala.concurrent.ExecutionContext.Implicits.global - implicit override val patienceConfig = PatienceConfig(timeout = Span(2, Minutes), interval = Span(5, Millis)) - // val cred = new com.amazonaws.auth.BasicAWSCredentials("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY") - val CW = new cloudwatch.AmazonCloudwatchClient() + val CW = cloudwatch.AmazonCloudwatchClient()() "Cloudwatch client" should "retrieve all metrics" in { whenReady(CW.listMetrics()) { s => s.getMetrics should not be 'empty } diff --git a/commons/src/main/scala/FutureHelper.scala b/commons/src/main/scala/FutureHelper.scala index 23773cf..715cfbe 100644 --- a/commons/src/main/scala/FutureHelper.scala +++ b/commons/src/main/scala/FutureHelper.scala @@ -1,10 +1,11 @@ package com.mfglabs.commons.aws import com.amazonaws.AmazonWebServiceRequest +import com.amazonaws.ClientConfiguration import com.amazonaws.handlers.AsyncHandler import scala.concurrent.{Future, Promise} -import java.util.concurrent.{Future => JFuture} +import java.util.concurrent.{Future => JFuture, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} package object FutureHelper { @@ -14,12 +15,6 @@ package object FutureHelper { override def onSuccess(request: Request, result: Result): Unit = { p.success(result); () } } - def promiseToVoidAsyncHandler[Request <: AmazonWebServiceRequest](p: Promise[Unit]) = - new AsyncHandler[Request, Void] { - override def onError(exception: Exception): Unit = { p.failure(exception); () } - override def onSuccess(request: Request, result: Void): Unit = { p.success(()); () } - } - @inline def wrapAsyncMethod[Request <: AmazonWebServiceRequest, Result]( f: (Request, AsyncHandler[Request, Result]) => JFuture[Result], @@ -30,13 +25,11 @@ package object FutureHelper { p.future } - @inline - def wrapVoidAsyncMethod[Request <: AmazonWebServiceRequest]( - f: (Request, AsyncHandler[Request, Void]) => JFuture[Void], - request: Request - ): Future[Unit] = { - val p = Promise[Unit] - f(request, promiseToVoidAsyncHandler(p)) - p.future - } + def defaultExecutorService(clientConfiguration: ClientConfiguration, factoryName: String) = new ThreadPoolExecutor( + 0, clientConfiguration.getMaxConnections, + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue[Runnable], + new AWSThreadFactory(factoryName) + ) + } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5e326fb..69a56f2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -3,9 +3,9 @@ import sbt._ object Dependencies { object V { - val awsJavaSDK = "1.10.77" - val akkaStreamExt = "0.10.0" - val scalaTest = "2.2.1" + val awsJavaSDK = "1.11.132" + val akkaStreamExt = "0.11.1" + val scalaTest = "3.0.3" val slf4j = "1.7.12" } diff --git a/project/build.properties b/project/build.properties index a6e117b..64317fd 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.8 +sbt.version=0.13.15 diff --git a/project/plugins.sbt b/project/plugins.sbt index f152391..ed978bc 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,3 @@ -addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.1") +addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.0") -addSbtPlugin("com.frugalmechanic" % "fm-sbt-s3-resolver" % "0.4.0") - -addSbtPlugin("me.lessis" % "bintray-sbt" % "0.2.1") \ No newline at end of file +addSbtPlugin("me.lessis" % "bintray-sbt" % "0.2.1") diff --git a/project/site.sbt b/project/site.sbt deleted file mode 100644 index 36d93d2..0000000 --- a/project/site.sbt +++ /dev/null @@ -1,5 +0,0 @@ -resolvers += "jgit-repo" at "http://download.eclipse.org/jgit/maven" - -addSbtPlugin("com.typesafe.sbt" % "sbt-site" % "0.8.1") - -addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.3") diff --git a/s3/src/main/scala/futureTransfer.scala b/s3/src/main/scala/futureTransfer.scala index bab260d..5afdbd4 100644 --- a/s3/src/main/scala/futureTransfer.scala +++ b/s3/src/main/scala/futureTransfer.scala @@ -38,7 +38,7 @@ import org.slf4j.{Logger, LoggerFactory} */ object FutureTransfer { - private val logger: Logger = LoggerFactory.getLogger("com.github.dwhjames.awswrap.s3.FutureTransfer") + private val logger: Logger = LoggerFactory.getLogger("com.mfglabs.commons.aws.s3.FutureTransfer") /** * Attach a listener to an S3 Transfer and return it as a Future. diff --git a/s3/src/main/scala/s3.scala b/s3/src/main/scala/s3.scala index 2010fae..6d0bd04 100644 --- a/s3/src/main/scala/s3.scala +++ b/s3/src/main/scala/s3.scala @@ -20,19 +20,29 @@ package s3 import java.io.{InputStream, File} import java.net.URL -import java.util.concurrent.{Executors, ExecutorService, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit, Future => JFuture} +import java.util.concurrent.ExecutorService import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.Try -import com.amazonaws.{AmazonWebServiceRequest, ClientConfiguration} -import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain} -import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.ClientConfiguration +import com.amazonaws.auth.{AWSCredentialsProvider, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.s3._ import com.amazonaws.services.s3.model._ -import org.slf4j.{Logger, LoggerFactory} +object AmazonS3AsyncClient { + import FutureHelper.defaultExecutorService + + def apply( + awsCredentialsProvider : AWSCredentialsProvider = new DefaultAWSCredentialsProviderChain, + clientConfiguration : ClientConfiguration = new ClientConfiguration() + )( + executorService : ExecutorService = defaultExecutorService(clientConfiguration, "aws.wrap.s3") + ): AmazonS3AsyncClient = { + new AmazonS3AsyncClient(awsCredentialsProvider, clientConfiguration, executorService) + } +} /** * A lightweight wrapper for [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/AmazonS3Client.html AmazonS3Client]] @@ -69,112 +79,14 @@ class AmazonS3AsyncClient( * * @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/AmazonS3Client.html AmazonS3Client]] */ - val client = new AmazonS3Client(awsCredentialsProvider, clientConfiguration) - - /** - * make a client from a credentials provider, a config, and a default executor service. - * - * @param awsCredentialsProvider - * a provider of AWS credentials. - * @param clientConfiguration - * a client configuration. - */ - def this(awsCredentialsProvider: AWSCredentialsProvider, clientConfiguration: ClientConfiguration) = { - this(awsCredentialsProvider, clientConfiguration, - new ThreadPoolExecutor( - 0, clientConfiguration.getMaxConnections, - 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue[Runnable], - new AWSThreadFactory("aws.wrap.s3") - ) - ) - } - - /** - * make a client from a credentials provider, a default config, and an executor service. - * - * @param awsCredentialsProvider - * a provider of AWS credentials. - * @param executorService - * an executor service for synchronous calls to the underlying AmazonS3Client. - */ - def this(awsCredentialsProvider: AWSCredentialsProvider, executorService: ExecutorService) { - this(awsCredentialsProvider, new ClientConfiguration(), executorService) - } - - /** - * make a client from a credentials provider, a default config, and a default executor service. - * - * @param awsCredentialsProvider - * a provider of AWS credentials. - */ - def this(awsCredentialsProvider: AWSCredentialsProvider) { - this(awsCredentialsProvider, new ClientConfiguration()) + val client = { + AmazonS3ClientBuilder + .standard() + .withCredentials(awsCredentialsProvider) + .withClientConfiguration(clientConfiguration) + .build() } - /** - * make a client from credentials, a config, and an executor service. - * - * @param awsCredentials - * AWS credentials. - * @param clientConfiguration - * a client configuration. - * @param executorService - * an executor service for synchronous calls to the underlying AmazonS3Client. - */ - def this(awsCredentials: AWSCredentials, clientConfiguration: ClientConfiguration, executorService: ExecutorService) { - this(new StaticCredentialsProvider(awsCredentials), clientConfiguration, executorService) - } - - /** - * make a client from credentials, a default config, and an executor service. - * - * @param awsCredentials - * AWS credentials. - * @param clientConfiguration - * a client configuration. - * @param executorService - * an executor service for synchronous calls to the underlying AmazonS3Client. - */ - def this(awsCredentials: AWSCredentials, executorService: ExecutorService) { - this(awsCredentials, new ClientConfiguration(), executorService) - } - - /** - * make a client from credentials, a default config, and a default executor service. - * - * @param awsCredentials - * AWS credentials. - */ - def this(awsCredentials: AWSCredentials) { - this(new StaticCredentialsProvider(awsCredentials)) - } - - /** - * make a client from a default credentials provider, a config, and a default executor service. - * - * @param clientConfiguration - * a client configuration. - */ - def this(clientConfiguration: ClientConfiguration) { - this(new DefaultAWSCredentialsProviderChain(), clientConfiguration) - } - - /** - * make a client from a default credentials provider, a default config, and a default executor service. - */ - def this() { - this(new DefaultAWSCredentialsProviderChain()) - } - - /** - * Return the underlying executor service, through which all client - * API calls are made. - * - * @return the underlying executor service - */ - def getExecutorsService(): ExecutorService = executorService - /** * Shutdown the executor service. * @@ -186,7 +98,6 @@ class AmazonS3AsyncClient( () } - @inline def wrapMethod[Request, Result]( f: Request => Result, diff --git a/s3/src/main/scala/s3Stream.scala b/s3/src/main/scala/s3Stream.scala index 1de1190..c212bb2 100644 --- a/s3/src/main/scala/s3Stream.scala +++ b/s3/src/main/scala/s3Stream.scala @@ -1,28 +1,30 @@ package com.mfglabs.commons.aws package s3 -import java.io.{File, ByteArrayInputStream, InputStream} +import java.io.{ByteArrayInputStream, InputStream} import java.util.Date import java.util.zip.GZIPInputStream -import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl._ import akka.util.ByteString import com.amazonaws.services.s3.model._ -import com.mfglabs.stream.{ExecutionContextForBlockingOps, SinkExt, FlowExt, SourceExt} +import com.mfglabs.stream.{ExecutionContextForBlockingOps, FlowExt, SourceExt} -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future trait S3StreamBuilder { + import scala.collection.immutable.Seq + val client: AmazonS3AsyncClient import client.ec implicit lazy val ecForBlockingOps = ExecutionContextForBlockingOps(client.ec) // Ops class contains materialized methods (returning Futures) - class MaterializedOps(flowMaterializer: ActorMaterializer) - extends AmazonS3AsyncClient(client.awsCredentialsProvider, client.clientConfiguration, client.executorService) { + class MaterializedOps(flowMaterializer: ActorMaterializer) extends AmazonS3AsyncClient( + client.awsCredentialsProvider, client.clientConfiguration, client.executorService + ) { implicit val fm = flowMaterializer @@ -60,20 +62,14 @@ trait S3StreamBuilder { case None => client.listObjects(bucket) } - SourceExt.seededLazyAsync(getFirstListing) { firstListing => - SourceExt.unfoldPullerAsync(firstListing) { listing => - val files = listing.getObjectSummaries.to[scala.collection.immutable.Seq] - if (listing.isTruncated) - client.listNextBatchOfObjects(listing).map { nextListing => - (Option(files), Option(nextListing)) - } - else Future.successful(Option(files) -> None) - } - } - .mapConcat(identity) - .map { file => - (file.getKey, file.getLastModified) - } + def unfold(listing: ObjectListing) = Some(Some(listing) -> listing.getObjectSummaries.to[Seq]) + + Source.unfoldAsync[Option[ObjectListing], Seq[S3ObjectSummary]](None) { + case None => getFirstListing.map(unfold) + case Some(oldListing) if oldListing.isTruncated => client.listNextBatchOfObjects(oldListing).map(unfold) + case Some(oldListing) => Future.successful(None) + }.mapConcat(identity) + .map { file => file.getKey -> file.getLastModified } } /** Stream a S3 file. diff --git a/s3/src/test/scala/S3Spec.scala b/s3/src/test/scala/S3Spec.scala index fa14acc..2767706 100644 --- a/s3/src/test/scala/S3Spec.scala +++ b/s3/src/test/scala/S3Spec.scala @@ -5,16 +5,16 @@ import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl._ import akka.util.ByteString -import com.amazonaws.services.s3.model.{AmazonS3Exception, CompleteMultipartUploadResult, DeleteObjectsRequest} +import com.amazonaws.services.s3.model.{AmazonS3Exception, CompleteMultipartUploadResult} import com.mfglabs.stream._ import org.scalatest._ import concurrent.ScalaFutures -import org.scalatest.time.{Minutes, Millis, Seconds, Span} +import org.scalatest.time.{Minutes, Millis, Span} import scala.concurrent._ import scala.concurrent.duration._ -class S3Spec extends FlatSpec with Matchers with ScalaFutures { +class S3Spec extends FlatSpec with Matchers with ScalaFutures with BeforeAndAfterAll { import s3._ import scala.concurrent.ExecutionContext.Implicits.global @@ -28,24 +28,22 @@ class S3Spec extends FlatSpec with Matchers with ScalaFutures { implicit val system = ActorSystem() implicit val fm = ActorMaterializer() - val streamBuilder = S3StreamBuilder(new s3.AmazonS3AsyncClient()) - val ops = new streamBuilder.MaterializedOps(fm) + val s3Client = s3.AmazonS3AsyncClient( + new com.amazonaws.auth.profile.ProfileCredentialsProvider("mfg") + )() - import streamBuilder.ecForBlockingOps + val streamBuilder = S3StreamBuilder(s3Client) + val ops = new streamBuilder.MaterializedOps(fm) it should "upload/list/delete small files" in { - whenReady( - for { - _ <- ops.deleteObjects(bucket, s"$keyPrefix") - _ <- ops.putObject(bucket, s"$keyPrefix/small.txt", new java.io.File(getClass.getResource("/small.txt").getPath)) - l <- ops.listFiles(bucket, Some(keyPrefix)) - _ <- ops.deleteObjects(bucket, s"$keyPrefix/small.txt") - l2 <- ops.listFiles(bucket, Some(keyPrefix)) - } yield (l, l2) - ) { case (l, l2) => - (l map (_._1)) should equal(List(s"$keyPrefix/small.txt")) - l2 should be('empty) - } + ops.deleteObjects(bucket, s"$keyPrefix").futureValue + ops.putObject(bucket, s"$keyPrefix/small.txt", new java.io.File(getClass.getResource("/small.txt").getPath)).futureValue + val l = ops.listFiles(bucket, Some(keyPrefix)).futureValue + ops.deleteObjects(bucket, s"$keyPrefix/small.txt").futureValue + val l2 = ops.listFiles(bucket, Some(keyPrefix)).futureValue + + (l map (_._1)) should equal(List(s"$keyPrefix/small.txt")) + l2 should be('empty) } it should "throw an exception when a file is non-existent" in { @@ -56,13 +54,13 @@ class S3Spec extends FlatSpec with Matchers with ScalaFutures { } it should "upload and download a big file as a single file" in { - val futBytes = FileIO.fromFile(new java.io.File(getClass.getResource("/big.txt").getPath)) + val futBytes = StreamConverters.fromInputStream(() => getClass.getResourceAsStream("/big.txt")) .via(streamBuilder.uploadStreamAsFile(bucket, s"$keyPrefix/big", chunkUploadConcurrency = 2)) .flatMapConcat(_ => streamBuilder.getFileAsStream(bucket, s"$keyPrefix/big")) .runFold(ByteString.empty)(_ ++ _) .map(_.compact) - val expectedBytes = FileIO.fromFile(new java.io.File(getClass.getResource("/big.txt").getPath)) + val expectedBytes = StreamConverters.fromInputStream(() => getClass.getResourceAsStream("/big.txt")) .runFold(ByteString.empty)(_ ++ _).map(_.compact) whenReady(futBytes zip expectedBytes) { case (bytes, expectedBytes) => @@ -75,10 +73,10 @@ class S3Spec extends FlatSpec with Matchers with ScalaFutures { .via(FlowExt.rechunkByteStringBySize(2 * 1024 * 1024)) .via(FlowExt.rechunkByteStringBySeparator(ByteString("\n"), 8 * 1024)) .map(_.utf8String) - .runWith(SinkExt.collect) + .runWith(Sink.seq) - val futExpectedLines = - FileIO.fromFile(new java.io.File(getClass.getResource("/big.txt").getPath)) + val futExpectedLines = StreamConverters + .fromInputStream(() => getClass.getResourceAsStream("/big.txt")) .runFold(ByteString.empty)(_ ++ _) .map(_.compact.utf8String) .map(_.split("\n").to[scala.collection.immutable.Seq]) @@ -90,20 +88,24 @@ class S3Spec extends FlatSpec with Matchers with ScalaFutures { it should "upload and download a big file as a multipart file" in { - val futBytes = FileIO - .fromFile(new java.io.File(getClass.getResource("/big.txt").getPath), chunkSize = 2 * 1024 * 1024) + val bytes = StreamConverters + .fromInputStream(() => getClass.getResourceAsStream("/big.txt"), chunkSize = 2 * 1024 * 1024) .via(streamBuilder.uploadStreamAsMultipartFile(bucket, s"$keyPrefix/big", nbChunkPerFile = 1, chunkUploadConcurrency = 2)) .via(FlowExt.fold[CompleteMultipartUploadResult, Vector[CompleteMultipartUploadResult]](Vector.empty)(_ :+ _)) .flatMapConcat(_ => streamBuilder.getMultipartFileAsStream(bucket, s"$keyPrefix/big.part")) .runFold(ByteString.empty)(_ ++ _) - .map(_.compact) + .map(_.compact).futureValue - val futExpectedBytes = FileIO.fromFile(new java.io.File(getClass.getResource("/big.txt").getPath)) - .runFold(ByteString.empty)(_ ++ _).map(_.compact) + val expectedBytes = StreamConverters + .fromInputStream(() => getClass.getResourceAsStream("/big.txt")) + .runFold(ByteString.empty)(_ ++ _).map(_.compact).futureValue - whenReady(futBytes zip futExpectedBytes) { case (bytes, expectedBytes) => - bytes shouldEqual expectedBytes - } + bytes shouldEqual expectedBytes } + + override def afterAll() = { + s3Client.shutdown() + val _ = system.terminate().futureValue + } } diff --git a/scaladoc.sbt b/scaladoc.sbt index 91ff75d..6cf730c 100644 --- a/scaladoc.sbt +++ b/scaladoc.sbt @@ -1,10 +1,5 @@ -import sbtunidoc.Plugin._, UnidocKeys._ import scala.util.matching.Regex.Match - -// substitue unidoc as the way to generate documentation -unidocSettings - packageDoc in Compile <<= packageDoc in ScalaUnidoc artifact in (ScalaUnidoc, packageDoc) := { @@ -12,64 +7,12 @@ artifact in (ScalaUnidoc, packageDoc) := { previous.copy(classifier = Some("javadoc")) } -scalacOptions in (Compile, doc) ++= - Seq( - "-implicits", - "-sourcepath", baseDirectory.value.getAbsolutePath, - "-doc-source-url", s"https://github.com/MfgLabs/commons-aws/tree/v${version.value}€{FILE_PATH}.scala") +scalacOptions in (Compile, doc) ++= Seq( + "-implicits", + "-sourcepath", baseDirectory.value.getAbsolutePath, + "-doc-source-url", s"https://github.com/MfgLabs/commons-aws/tree/${version.value}€{FILE_PATH}.scala" +) autoAPIMappings := true apiURL := Some(url("https://MfgLabs.github.io/commons-aws/api/current/")) - -apiMappings ++= { - val jarFiles = (dependencyClasspath in Compile).value.files - val jarMap = jarFiles.find(file => file.toString.contains("com.amazonaws/aws-java-sdk")) match { - case None => Map() - case Some(awsJarFile) => Map(awsJarFile -> url("http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/")) - } - def findManagedDependency(organization: String, name: String): Option[File] = { - (for { - entry <- (fullClasspath in Runtime).value ++ (fullClasspath in Test).value - module <- entry.get(moduleID.key) if module.organization == organization && module.name.startsWith(name) - } yield entry.data).headOption - } - val links = Seq( - findManagedDependency("org.scala-lang", "scala-library").map(d => d -> url(s"http://www.scala-lang.org/api/${scalaVersion.value}/")) - ) - val otherLinksMap = links.collect { case Some(d) => d }.toMap - jarMap ++ Map( - file("/Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/jre/lib/rt.jar") -> - url("http://docs.oracle.com/javase/7/docs/api") - ) ++ otherLinksMap -} - -lazy val transformJavaDocLinksTask = taskKey[Unit]( - "Transform JavaDoc links - replace #java.io.File with ?java/io/File.html" -) - -transformJavaDocLinksTask := { - val log = streams.value.log - log.info("Transforming JavaDoc links") - val t = (target in unidoc).value - (t ** "*.html").get.filter(hasAwsJavadocApiLink).foreach { f => - log.info("Transforming " + f) - val newContent = awsJavadocApiLink.replaceAllIn(IO.read(f), m => - "href=\"" + m.group(1) + "?" + m.group(2).replace(".", "/") + ".html") - IO.write(f, newContent) - } - (t ** "*.html").get.filter(hasJavadocApiLink).foreach { f => - log.info("Transforming " + f) - val newContent = javadocApiLink.replaceAllIn(IO.read(f), m => - "href=\"" + m.group(1) + "?" + m.group(2).replace(".", "/") + ".html") - IO.write(f, newContent) - } -} - -val awsJavadocApiLink = """href=\"(http://docs\.aws\.amazon\.com/AWSJavaSDK/latest/javadoc/index\.html)#([^"]*)""".r -def hasAwsJavadocApiLink(f: File): Boolean = (awsJavadocApiLink findFirstIn IO.read(f)).nonEmpty - -val javadocApiLink = """href=\"(http://docs\.oracle\.com/javase/7/docs/api/index\.html)#([^"]*)""".r -def hasJavadocApiLink(f: File): Boolean = (javadocApiLink findFirstIn IO.read(f)).nonEmpty - -transformJavaDocLinksTask <<= transformJavaDocLinksTask triggeredBy (unidoc in Compile) diff --git a/sqs/src/main/scala/sqs.scala b/sqs/src/main/scala/sqs.scala index dc1ac29..12b2c51 100644 --- a/sqs/src/main/scala/sqs.scala +++ b/sqs/src/main/scala/sqs.scala @@ -21,39 +21,34 @@ package sqs import scala.concurrent.{Future, ExecutionContext} import scala.collection.JavaConverters._ -import java.util.concurrent.ExecutorService - -import com.amazonaws.services.sqs.AmazonSQSAsyncClient +import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder} import com.amazonaws.services.sqs.model._ class AmazonSQSClient( - val client: AmazonSQSAsyncClient, + val client: AmazonSQSAsync, implicit val execCtx: ExecutionContext ) { import FutureHelper._ /** * make a client from an ExecutionContext - * * @param ExecutionContext - * an ExecutionContext * @param clientConfiguration - * a client configuration. */ def this(execCtx: ExecutionContext) = { - this(new AmazonSQSAsyncClient(), execCtx) + this(AmazonSQSAsyncClientBuilder.defaultClient(), execCtx) } def addPermission( addPermissionRequest: AddPermissionRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.addPermissionAsync, addPermissionRequest) + ): Future[AddPermissionResult] = + wrapAsyncMethod(client.addPermissionAsync, addPermissionRequest) def addPermission( queueUrl: String, label: String, accountActions: Map[String, String] - ): Future[Unit] = { + ): Future[AddPermissionResult] = { val (accounts, actions) = accountActions.unzip addPermission( new AddPermissionRequest( @@ -67,14 +62,14 @@ class AmazonSQSClient( def changeMessageVisibility( changeMessageVisibilityRequest: ChangeMessageVisibilityRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.changeMessageVisibilityAsync, changeMessageVisibilityRequest) + ): Future[ChangeMessageVisibilityResult] = + wrapAsyncMethod(client.changeMessageVisibilityAsync, changeMessageVisibilityRequest) def changeMessageVisibility( queueUrl: String, receiptHandle: String, visibilityTimeout: Int - ): Future[Unit] = + ): Future[ChangeMessageVisibilityResult] = changeMessageVisibility(new ChangeMessageVisibilityRequest(queueUrl, receiptHandle, visibilityTimeout)) def changeMessageVisibilityBatch( @@ -116,13 +111,13 @@ class AmazonSQSClient( def deleteMessage( deleteMessageRequest: DeleteMessageRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.deleteMessageAsync, deleteMessageRequest) + ): Future[DeleteMessageResult] = + wrapAsyncMethod(client.deleteMessageAsync, deleteMessageRequest) def deleteMessage( queueUrl: String, receiptHandle: String - ): Future[Unit] = + ): Future[DeleteMessageResult] = deleteMessage(new DeleteMessageRequest(queueUrl, receiptHandle)) def deleteMessageBatch( @@ -145,17 +140,14 @@ class AmazonSQSClient( def deleteQueue( deleteQueueRequest: DeleteQueueRequest - ): Future[Unit] = - wrapVoidAsyncMethod[DeleteQueueRequest](client.deleteQueueAsync, deleteQueueRequest) + ): Future[DeleteQueueResult] = + wrapAsyncMethod[DeleteQueueRequest, DeleteQueueResult](client.deleteQueueAsync, deleteQueueRequest) def deleteQueue( queueUrl: String - ): Future[Unit] = + ): Future[DeleteQueueResult] = deleteQueue(new DeleteQueueRequest(queueUrl)) - def getExecutorService(): ExecutorService = - client.getExecutorService() - def getQueueAttributes( getQueueAttributesRequest: GetQueueAttributesRequest ): Future[GetQueueAttributesResult] = @@ -211,13 +203,13 @@ class AmazonSQSClient( def removePermission( removePermissionRequest: RemovePermissionRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.removePermissionAsync, removePermissionRequest) + ): Future[RemovePermissionResult] = + wrapAsyncMethod(client.removePermissionAsync, removePermissionRequest) def removePermission( queueUrl: String, label: String - ): Future[Unit] = + ): Future[RemovePermissionResult] = removePermission(new RemovePermissionRequest(queueUrl, label)) def sendMessage( @@ -251,13 +243,13 @@ class AmazonSQSClient( def setQueueAttributes( setQueueAttributesRequest: SetQueueAttributesRequest - ): Future[Unit] = - wrapVoidAsyncMethod(client.setQueueAttributesAsync, setQueueAttributesRequest) + ): Future[SetQueueAttributesResult] = + wrapAsyncMethod(client.setQueueAttributesAsync, setQueueAttributesRequest) def setQueueAttributes( queueUrl: String, attributes: Map[String, String] - ): Future[Unit] = + ): Future[SetQueueAttributesResult] = setQueueAttributes(new SetQueueAttributesRequest(queueUrl, attributes.asJava)) def shutdown(): Unit = diff --git a/sqs/src/test/scala/SQSSpec.scala b/sqs/src/test/scala/SQSSpec.scala index d46ea91..46c296d 100644 --- a/sqs/src/test/scala/SQSSpec.scala +++ b/sqs/src/test/scala/SQSSpec.scala @@ -4,10 +4,9 @@ package sqs import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ -import com.amazonaws.regions.{Region, Regions} -import com.amazonaws.services.sqs.AmazonSQSAsyncClient +import com.amazonaws.regions.Regions +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder import com.amazonaws.services.sqs.model.{CreateQueueRequest, MessageAttributeValue, SendMessageRequest} -import com.mfglabs.stream.SinkExt import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Millis, Seconds, Span} import org.scalatest.{FlatSpec, Matchers} @@ -25,7 +24,9 @@ class SQSSpec extends FlatSpec with Matchers with ScalaFutures { implicit val as = ActorSystem() implicit val fm = ActorMaterializer() - val sqs = new AmazonSQSClient( scala.concurrent.ExecutionContext.Implicits.global) + + val client = AmazonSQSAsyncClientBuilder.standard().withRegion(Regions.EU_WEST_1).build() + val sqs = new AmazonSQSClient(client, scala.concurrent.ExecutionContext.Implicits.global) val builder = SQSStreamBuilder(sqs) val testQueueName = "commons-aws-sqs-test-" + Random.nextInt() @@ -33,10 +34,11 @@ class SQSSpec extends FlatSpec with Matchers with ScalaFutures { val messageAttributeKey = "attribute_key" - "SQS client" should "send message and receive them as streams" in { - sqs.client.setRegion(Region.getRegion(Regions.EU_WEST_1)) - + "SQS client" should "delete all test queue if any" in { sqs.listQueues("commons-aws-sqs-test-").futureValue.headOption.foreach(queueUrl => sqs.deleteQueue(queueUrl).futureValue) + } + + it should "send message and receive them as streams" in { val newQueueReq = new CreateQueueRequest() newQueueReq.setAttributes(Map("VisibilityTimeout" -> 10.toString)) // 10 seconds newQueueReq.setQueueName(testQueueName) @@ -73,10 +75,7 @@ class SQSSpec extends FlatSpec with Matchers with ScalaFutures { } - "SQS client" should "send message and receive them as streams with retry mechanism" in { - sqs.client.setRegion(Region.getRegion(Regions.EU_WEST_1)) - - sqs.listQueues("commons-aws-sqs-test-").futureValue.headOption.foreach(queueUrl => sqs.deleteQueue(queueUrl).futureValue) + it should "send message and receive them as streams with retry mechanism" in { val newQueueReq = new CreateQueueRequest() newQueueReq.setAttributes(Map("VisibilityTimeout" -> 10.toString)) // 10 seconds newQueueReq.setQueueName(testQueueName2)