Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement mathematical operators #553

Merged
merged 1 commit into from
May 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
<artifactId>mutiny-test-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-math</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
33 changes: 33 additions & 0 deletions math/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Mutiny Math Operators

This library provides a set of _operators_ for [Mutiny](https://smallrye.io/smallrye-mutiny).

You can use these operators using the `plug` method offered by Mutiny:

```java
Multi<Long> counts = Multi.createFrom().items("a", "b", "c", "d", "e")
.plug(Math.count())
```

These operators provide streams (`Multi`) emitting new values when the computed value changes. To get the last value, use `collect().last()`:

```java
String max = Multi.createFrom().items("e", "b", "c", "f", "g", "e")
.plug(Math.max())
.collect().last()
.await().indefinitely();
```

The `io.smallrye.math.Math` class provides the entry point to the offered operators:

* count - emits the number of items emitted by the upstream
* index - emits `Tuple2<Long, T>` for each item from the upstream. The first element of the tuple is the index (0-based), and the second if the item
* min / max - emits the min/max item from the upstream, using Java comparator
* top(x) - emits the top x items from the upstream, a new ranking is emitted every time it changes.
* sum - emits the sum of all the items emitted by the upstream
* average - emits the average of all the items emitted by the upstream
* median - emits the median of all the items emitted by the upstream
* statistics - emits statistics about the emitted items



149 changes: 149 additions & 0 deletions math/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny-project</artifactId>
<version>999-SNAPSHOT</version>
</parent>

<name>SmallRye Mutiny :: Math Operators</name>
<artifactId>mutiny-math</artifactId>

<dependencies>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny</artifactId>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>reactive-streams-junit5-tck</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>${reactive-streams.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<id>default-test</id>
<goals>
<goal>test</goal>
</goals>
<configuration>
<!-- Disable TestNG -->
<testNGArtifactName>none:none</testNGArtifactName>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.moditect</groupId>
<artifactId>moditect-maven-plugin</artifactId>
<executions>
<execution>
<id>add-module-infos</id>
<phase>package</phase>
<goals>
<goal>add-module-info</goal>
</goals>
<configuration>
<module>
<moduleInfoFile>src/main/module/module-info.java</moduleInfoFile>
</module>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jboss.jandex</groupId>
<artifactId>jandex-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.revapi</groupId>
<artifactId>revapi-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<activation>
<jdk>[9,)</jdk>
</activation>
<id>java-9+</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<configuration>
<!-- wonderful reliability of javadoc used with modules breaking on non exposed packages -->
<source>8</source>
<release>8</release>
<detectJavaApiLink>false</detectJavaApiLink>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
37 changes: 37 additions & 0 deletions math/revapi.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[ {
"extension" : "revapi.java",
"id" : "java",
"configuration" : {
"missing-classes" : {
"behavior" : "report",
"ignoreMissingAnnotations" : false
},
"filter" : {
"packages" : {
"regex" : true,
"include" : [ "io\\.smallrye\\.mutiny\\.math(\\..+)?" ]
}
}
}
}, {
"extension" : "revapi.differences",
"id" : "breaking-changes",
"configuration" : {
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [

]
}
}, {
"extension" : "revapi.reporter.json",
"configuration" : {
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"output" : "target/compatibility.json",
"indent" : true,
"append" : false,
"keepEmptyFile" : true
}
} ]
31 changes: 31 additions & 0 deletions math/src/main/java/io/smallrye/mutiny/math/AverageOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.smallrye.mutiny.math;

import java.util.function.Function;

import io.smallrye.mutiny.Multi;

/**
* Average operator emitting the average of the items emitted by the upstream.
* <p>
* Everytime it gets an item from upstream, it emits the <em>average</em> of the already received items.
* If the stream emits the completion event without having emitting any item before, 0 is emitted, followed by the
* completion event.
* If the upstream emits a failure, then, the failure is propagated.
*/
public class AverageOperator<T extends Number>
implements Function<Multi<T>, Multi<Double>> {

private double sum = 0.0d;
private long count = 0L;

@Override
public Multi<Double> apply(Multi<T> multi) {
return multi
.onItem().transform(x -> {
count = count + 1L;
sum = sum + x.doubleValue();
return sum / count;
})
.onCompletion().ifEmpty().continueWith(0.0d);
}
}
30 changes: 30 additions & 0 deletions math/src/main/java/io/smallrye/mutiny/math/CountOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.smallrye.mutiny.math;

import java.util.function.Function;

import io.smallrye.mutiny.Multi;

/**
* Count operator emitting the current count.
* Everytime it gets an item from upstream, it emits the <em>count</em>.
* If the stream emits the completion event without having emitting any item before, 0 is emitted, followed by the
* completion event.
* If the upstream emits a failure, the failure is propagated.
*
* @param <T> type of the incoming items.
*/
public class CountOperator<T>
implements Function<Multi<T>, Multi<Long>> {

private long count = 0L;

@Override
public Multi<Long> apply(Multi<T> multi) {
return multi
.onItem().transform(x -> {
count = count + 1L;
return count;
})
.onCompletion().ifEmpty().continueWith(0L);
}
}
26 changes: 26 additions & 0 deletions math/src/main/java/io/smallrye/mutiny/math/IndexOperator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.smallrye.mutiny.math;

import java.util.function.Function;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.tuples.Tuple2;

/**
* Index operator emitting a {@link io.smallrye.mutiny.tuples.Tuple2 Tuple2&lt;Long, T&gt;}, with the index (0-based) of the
* element and the element from upstream.
*
* If the upstream emits a failure, then, the failure is propagated.
*
* @param <T> type of the incoming items.
*/
public class IndexOperator<T>
implements Function<Multi<T>, Multi<Tuple2<Long, T>>> {

private long index = 0L;

@Override
public Multi<Tuple2<Long, T>> apply(Multi<T> multi) {
return multi
.onItem().transform(x -> Tuple2.of(index++, x));
}
}
Loading