Skip to content

Commit

Permalink
Version upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacques committed May 23, 2017
1 parent d8941f8 commit 0ff83ec
Show file tree
Hide file tree
Showing 17 changed files with 196 additions and 517 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Changelog

This file summarizes the main changes for each release.

### <a name="0.11.0"></a>Version 0.11.0

- Remove dependency to the unmaintained `dwhjames/aws-wrap` by including the necessary code.
- Split the project in three : `commons-aws-cloudwatch`, `commons-aws-s3` and `commons-aws-sqs`
-
98 changes: 15 additions & 83 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,41 +1,41 @@
# Streaming / asynchronous Scala client for common AWS services

Streaming / asynchronous Scala client for common AWS services on top of [dwhjames/aws-wrap](https://github.com/dwhjames/aws-wrap)
. When possible, clients expose methods that return Akka Stream's Sources / Flows / Sinks to provide streaming facilities.
Streaming / asynchronous Scala client for common AWS services.
When possible, clients expose methods that return Akka Stream's Sources / Flows / Sinks to provide streaming facilities.

Clients use a pool of threads managed internally and optimized for blocking IO operations.

This library makes heavy use of our extension library for Akka Stream
This library makes heavy use of our extension library for Akka Stream
[MfgLabs/akka-stream-extensions](https://github.com/MfgLabs/akka-stream-extensions).

## Resolver

```scala
resolvers ++= Seq(
Resolver.bintrayRepo("mfglabs", "maven"),
Resolver.bintrayRepo("dwhjames", "maven")
Resolver.bintrayRepo("mfglabs", "maven")
)
```

## Dependencies

```scala
libraryDependencies += "com.mfglabs" %% "commons-aws" % "0.10.0"
libraryDependencies += "com.mfglabs" %% "commons-aws" % "0.11.0"
```

Changelog [here](CHANGELOG.md)

## Usage

> Scaladoc is available [there](http://mfglabs.github.io/commons-aws/api/current/)

### Commons

#### S3

```scala
import com.mfglabs.commons.aws.s3._

val builder = S3StreamBuilder(new AmazonS3AsyncClient()) // contains un-materialized composable Source / Flow / Sink
val builder = S3StreamBuilder(AmazonS3AsyncClient()()) // contains un-materialized composable Source / Flow / Sink

val fileStream: Source[ByteString, Unit] = builder.getFileAsStream(bucket, key)

Expand All @@ -47,9 +47,9 @@ someBinaryStream.via(

someBinaryStream.via(
builder.uploadStreamAsMultipartFile(
bucket,
prefix,
nbChunkPerFile = 10000,
bucket,
prefix,
nbChunkPerFile = 10000,
chunkUploadConcurrency = 2
)
)
Expand All @@ -71,11 +71,10 @@ There are also smart `AmazonS3Client` constructors that can be provided with cus
#### SQS

```scala
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.pellucid.wrap.sqs.AmazonSQSScalaClient
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.mfglabs.commons.aws.sqs._

val sqs = new AmazonSQSScalaClient(new AmazonSQSAsyncClient(), ec)
val sqs = new AmazonSQSClient(new AmazonSQSAsyncClient(), ec)
val builder = SQSStreamBuilder(sqs)

val sender: Flow[String, SendMessageResult, Unit] =
Expand All @@ -87,7 +86,7 @@ val sender: Flow[String, SendMessageResult, Unit] =
}
.via(builder.sendMessageAsStream())

val receiver: Source[Message, Unit] =
val receiver: Source[Message, Unit] =
builder.receiveMessageAsStream(queueUrl, autoAck = false)
```

Expand All @@ -100,7 +99,7 @@ import com.mfglabs.commons.aws.cloudwatch
import cloudwatch._ // brings implicit extensions

// Create the client
val CW = new cloudwatch.AmazonCloudwatchClient()
val CW = cloudwatch.AmazonCloudwatchClient()()

// Use it
for {
Expand All @@ -114,73 +113,6 @@ and managed by [[AmazonCloudwatchClient]] itself.
There are also smart `AmazonCloudwatchClient` constructors that can be provided with custom.
`java.util.concurrent.ExecutorService` if you want to manage your pools of threads.

### Extensions

#### Cloudwatch heartbeat

It provides a simple mechanism that sends periodically a heartbeat metric to AWS Cloudwatch.

If the heartbeat rate on a _configurable_ period falls under a _configurable_ threshold or the metrics isn't fed with sufficient data, a Cloudwatch `ALARM` status is triggered & sent to a given SQS endpoint.

When the rate goes above threshold again, an `OK` status is triggered & sent to the same SQS endpoint.

> **IMPORTANT**: the alarm is created by the API itself but due to a limitation (or a bug) in Amazon API, the status of this alarm will stay at `INSUFFICIENT_DATA` until you manually update it in the AWS console.
For that, wait 1/2 minutes after start so that Cloudwatch receives enough heartbeats and then select the alarm, click on `modify` and then click on `save`. The alarm should pass to `OK` status.

_Cloudwatch heartbeat is based on Cloudwatch service & Akka scheduler._

##### Low-level client

```scala
import com.mfglabs.commons.aws.cloudwatch
import com.mfglabs.commons.aws.extensions.cloudwatch.CloudwatchAkkaHeartbeat

import myExecutionCtx // an implicit custom execution context

val hb = new CloudwatchAkkaHeartbeat(
namespace = "Test/Heartbeat", // the namespace of the cloudwatch metrics
name = "test1", // the name of the cloudwatch
beatPeriod = 2.second, // the heart beat period in Scala.concurrent.duration.Duration string format
alarmPeriod = 120.seconds, // the period on which the metrics is analyzed to determine the heartbeat health
alarmPeriodNb = 1, // the number of "bad health" periods after which the alarm is triggered
alarmThreshold = 10, // the threshold counting the number of heartbeats on a period under which the "bad health" is detected
system = system, // the Akka system to create scheduler
client = CW, // the cloudwatch client
actionEndpoint = "arn:aws:sns:eu-west-1:896733075612:Cloudwatch-HeartBeat-Test" // the actionEndpoint (SQS) to which Cloudwatch will send the alarm
)

hb.start() // to start the heartbeat

hb.stop() // to stop the heartbeat
```

> Please note that you need to provide an implicit `ExecutionContext` for `CloudwatchAkkaHeartbeat.start/stop`
##### Cakable client

`CloudwatchHeartbeatLayer` is ready to be used in a cake pattern

```scala
object MyAkkaService extends CloudwatchHeartbeatLayer {
override val system = myAkkaSystem

override val heartbeatClient = myCloudClient

override val heartbeatName: String = ...
override val heartbeatNamespace: String = ...

override val heartbeatPeriod: FiniteDuration = ...
override val heartbeatAlarmPeriod: FiniteDuration = ...
override val heartbeatAlarmPeriodNb: Int = ...
override val heartbeatAlarmThreshold: Int = ...
override val heartbeatEndpoint: String = ...

...
// start the heartbeat
heartbeat.start()(myExeCtx)
}
```

## License

This software is licensed under the Apache 2 license, quoted below.
Expand Down
14 changes: 4 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import sbtunidoc.Plugin._
import bintray.Plugin._


organization in ThisBuild := "com.mfglabs"

scalaVersion in ThisBuild := "2.11.7"
scalaVersion in ThisBuild := "2.11.11"

version in ThisBuild := "0.11.0"

Expand Down Expand Up @@ -68,12 +66,8 @@ lazy val all = (project in file("."))
.aggregate(s3)
.aggregate(sqs)
.settings(name := "commons-aws-all")
.settings(site.settings ++ ghpages.settings: _*)
.settings(
name := "commons-aws-all",
site.addMappingsToSiteDir(mappings in (ScalaUnidoc, packageDoc), "api/" + version),
git.remoteRepo := "[email protected]:MfgLabs/commons-aws.git"
)
.enablePlugins(ScalaUnidocPlugin)
.settings(name := "commons-aws-all")
.settings(noPublishSettings)

lazy val commons = project.in(file("commons"))
Expand Down Expand Up @@ -110,7 +104,7 @@ lazy val s3 = project.in(file("s3"))
),
commonSettings,
publishSettings
)
).dependsOn(commons)

lazy val sqs = project.in(file("sqs"))
.settings (
Expand Down
Loading

0 comments on commit 0ff83ec

Please sign in to comment.