From ce59072c5c89209029e515918db2a33ed4313891 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Sun, 10 Nov 2024 08:10:11 -0600 Subject: [PATCH] Improvements to Stream implementation and simplification of Task --- benchmark/benchmarks.json | 1121 ----------------- .../scala/benchmark/StreamBenchmark.scala | 28 +- cats/src/main/scala/rapid/cats/package.scala | 22 +- core/src/main/scala/rapid/ChainedTask.scala | 7 - core/src/main/scala/rapid/Fiber.scala | 2 +- core/src/main/scala/rapid/SimpleTask.scala | 3 - core/src/main/scala/rapid/Stream.scala | 152 +-- core/src/main/scala/rapid/Task.scala | 35 +- core/src/test/scala/spec/StreamSpec.scala | 5 +- 9 files changed, 101 insertions(+), 1274 deletions(-) delete mode 100644 benchmark/benchmarks.json delete mode 100644 core/src/main/scala/rapid/ChainedTask.scala delete mode 100644 core/src/main/scala/rapid/SimpleTask.scala diff --git a/benchmark/benchmarks.json b/benchmark/benchmarks.json deleted file mode 100644 index cc28489..0000000 --- a/benchmark/benchmarks.json +++ /dev/null @@ -1,1121 +0,0 @@ -[ - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.fs2StreamFilter", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "1000" - }, - "primaryMetric" : { - "score" : 64574.14109753256, - "scoreError" : 22665.627534642677, - "scoreConfidence" : [ - 41908.513562889886, - 87239.76863217524 - ], - "scorePercentiles" : { - "0.0" : 63428.4629358893, - "50.0" : 64399.26830113731, - "90.0" : 65894.6920555711, - "95.0" : 65894.6920555711, - "99.0" : 65894.6920555711, - "99.9" : 65894.6920555711, - "99.99" : 65894.6920555711, - "99.999" : 65894.6920555711, - "99.9999" : 65894.6920555711, - "100.0" : 65894.6920555711 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 63428.4629358893, - 64399.26830113731, - 65894.6920555711 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.fs2StreamFilter", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "10000" - }, - "primaryMetric" : { - "score" : 13711.478275069092, - "scoreError" : 1939.0670459629177, - "scoreConfidence" : [ - 11772.411229106174, - 15650.54532103201 - ], - "scorePercentiles" : { - "0.0" : 13623.66246095326, - "50.0" : 13681.135440566693, - "90.0" : 13829.636923687323, - "95.0" : 13829.636923687323, - "99.0" : 13829.636923687323, - "99.9" : 13829.636923687323, - "99.99" : 13829.636923687323, - "99.999" : 13829.636923687323, - "99.9999" : 13829.636923687323, - "100.0" : 13829.636923687323 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 13829.636923687323, - 13681.135440566693, - 13623.66246095326 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.fs2StreamFilter", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "100000" - }, - "primaryMetric" : { - "score" : 2758.7985524273486, - "scoreError" : 59.07507067912862, - "scoreConfidence" : [ - 2699.72348174822, - 2817.873623106477 - ], - "scorePercentiles" : { - "0.0" : 2756.719895233284, - "50.0" : 2757.1462819675594, - "90.0" : 2762.529480081203, - "95.0" : 2762.529480081203, - "99.0" : 2762.529480081203, - "99.9" : 2762.529480081203, - "99.99" : 2762.529480081203, - "99.999" : 2762.529480081203, - "99.9999" : 2762.529480081203, - "100.0" : 2762.529480081203 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 2757.1462819675594, - 2762.529480081203, - 2756.719895233284 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.fs2StreamMap", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "1000" - }, - "primaryMetric" : { - "score" : 63295.07269129075, - "scoreError" : 12300.615217642213, - "scoreConfidence" : [ - 50994.45747364854, - 75595.68790893296 - ], - "scorePercentiles" : { - "0.0" : 62856.77776703328, - "50.0" : 62956.97661976627, - "90.0" : 64071.46368707268, - "95.0" : 64071.46368707268, - "99.0" : 64071.46368707268, - "99.9" : 64071.46368707268, - "99.99" : 64071.46368707268, - "99.999" : 64071.46368707268, - "99.9999" : 64071.46368707268, - "100.0" : 64071.46368707268 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 62856.77776703328, - 64071.46368707268, - 62956.97661976627 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.fs2StreamMap", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "10000" - }, - "primaryMetric" : { - "score" : 14243.939492287878, - "scoreError" : 1368.0634067346343, - "scoreConfidence" : [ - 12875.876085553244, - 15612.002899022513 - ], - "scorePercentiles" : { - "0.0" : 14199.6937934667, - "50.0" : 14201.603327676155, - "90.0" : 14330.521355720779, - "95.0" : 14330.521355720779, - "99.0" : 14330.521355720779, - "99.9" : 14330.521355720779, - "99.99" : 14330.521355720779, - "99.999" : 14330.521355720779, - "99.9999" : 14330.521355720779, - "100.0" : 14330.521355720779 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 14201.603327676155, - 14330.521355720779, - 14199.6937934667 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.fs2StreamMap", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "100000" - }, - "primaryMetric" : { - "score" : 1059.206164205553, - "scoreError" : 1003.1660070563736, - "scoreConfidence" : [ - 56.040157149179436, - 2062.3721712619267 - ], - "scorePercentiles" : { - "0.0" : 1014.3856065783126, - "50.0" : 1042.6690566061402, - "90.0" : 1120.5638294322061, - "95.0" : 1120.5638294322061, - "99.0" : 1120.5638294322061, - "99.9" : 1120.5638294322061, - "99.99" : 1120.5638294322061, - "99.999" : 1120.5638294322061, - "99.9999" : 1120.5638294322061, - "100.0" : 1120.5638294322061 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 1120.5638294322061, - 1014.3856065783126, - 1042.6690566061402 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.fs2StreamToList", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "1000" - }, - "primaryMetric" : { - "score" : 83362.399871348, - "scoreError" : 24149.33355649607, - "scoreConfidence" : [ - 59213.06631485192, - 107511.73342784407 - ], - "scorePercentiles" : { - "0.0" : 82476.87441290758, - "50.0" : 82726.23614782427, - "90.0" : 84884.08905331213, - "95.0" : 84884.08905331213, - "99.0" : 84884.08905331213, - "99.9" : 84884.08905331213, - "99.99" : 84884.08905331213, - "99.999" : 84884.08905331213, - "99.9999" : 84884.08905331213, - "100.0" : 84884.08905331213 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 82726.23614782427, - 84884.08905331213, - 82476.87441290758 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.fs2StreamToList", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "10000" - }, - "primaryMetric" : { - "score" : 18764.7653656699, - "scoreError" : 1476.9901860449029, - "scoreConfidence" : [ - 17287.775179624998, - 20241.755551714803 - ], - "scorePercentiles" : { - "0.0" : 18703.317006746405, - "50.0" : 18734.47803577768, - "90.0" : 18856.501054485623, - "95.0" : 18856.501054485623, - "99.0" : 18856.501054485623, - "99.9" : 18856.501054485623, - "99.99" : 18856.501054485623, - "99.999" : 18856.501054485623, - "99.9999" : 18856.501054485623, - "100.0" : 18856.501054485623 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 18703.317006746405, - 18734.47803577768, - 18856.501054485623 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.fs2StreamToList", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "100000" - }, - "primaryMetric" : { - "score" : 2318.1294463569425, - "scoreError" : 144.99976649912443, - "scoreConfidence" : [ - 2173.1296798578182, - 2463.1292128560667 - ], - "scorePercentiles" : { - "0.0" : 2309.8351231989277, - "50.0" : 2318.8747425844163, - "90.0" : 2325.6784732874835, - "95.0" : 2325.6784732874835, - "99.0" : 2325.6784732874835, - "99.9" : 2325.6784732874835, - "99.99" : 2325.6784732874835, - "99.999" : 2325.6784732874835, - "99.9999" : 2325.6784732874835, - "100.0" : 2325.6784732874835 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 2318.8747425844163, - 2309.8351231989277, - 2325.6784732874835 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.rapidStreamFilter", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "1000" - }, - "primaryMetric" : { - "score" : 2470.525131131241, - "scoreError" : 82.42299839208422, - "scoreConfidence" : [ - 2388.102132739157, - 2552.948129523325 - ], - "scorePercentiles" : { - "0.0" : 2465.3194703395015, - "50.0" : 2472.832858878632, - "90.0" : 2473.423064175589, - "95.0" : 2473.423064175589, - "99.0" : 2473.423064175589, - "99.9" : 2473.423064175589, - "99.99" : 2473.423064175589, - "99.999" : 2473.423064175589, - "99.9999" : 2473.423064175589, - "100.0" : 2473.423064175589 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 2472.832858878632, - 2465.3194703395015, - 2473.423064175589 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.rapidStreamFilter", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "10000" - }, - "primaryMetric" : { - "score" : 25.088463463377526, - "scoreError" : 1.9638277179767856, - "scoreConfidence" : [ - 23.12463574540074, - 27.05229118135431 - ], - "scorePercentiles" : { - "0.0" : 24.981814355836054, - "50.0" : 25.086500479956875, - "90.0" : 25.197075554339644, - "95.0" : 25.197075554339644, - "99.0" : 25.197075554339644, - "99.9" : 25.197075554339644, - "99.99" : 25.197075554339644, - "99.999" : 25.197075554339644, - "99.9999" : 25.197075554339644, - "100.0" : 25.197075554339644 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 25.086500479956875, - 24.981814355836054, - 25.197075554339644 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.rapidStreamMap", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "1000" - }, - "primaryMetric" : { - "score" : 641.6873030834682, - "scoreError" : 42.87649825026216, - "scoreConfidence" : [ - 598.810804833206, - 684.5638013337305 - ], - "scorePercentiles" : { - "0.0" : 640.1388526565165, - "50.0" : 640.5314526298528, - "90.0" : 644.3916039640353, - "95.0" : 644.3916039640353, - "99.0" : 644.3916039640353, - "99.9" : 644.3916039640353, - "99.99" : 644.3916039640353, - "99.999" : 644.3916039640353, - "99.9999" : 644.3916039640353, - "100.0" : 644.3916039640353 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 640.1388526565165, - 640.5314526298528, - 644.3916039640353 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.StreamBenchmark.rapidStreamToList", - "mode" : "thrpt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "params" : { - "size" : "1000" - }, - "primaryMetric" : { - "score" : 623.6401268724171, - "scoreError" : 55.578749715678356, - "scoreConfidence" : [ - 568.0613771567388, - 679.2188765880954 - ], - "scorePercentiles" : { - "0.0" : 620.5337618682299, - "50.0" : 623.7636997596106, - "90.0" : 626.6229189894106, - "95.0" : 626.6229189894106, - "99.0" : 626.6229189894106, - "99.9" : 626.6229189894106, - "99.99" : 626.6229189894106, - "99.999" : 626.6229189894106, - "99.9999" : 626.6229189894106, - "100.0" : 626.6229189894106 - }, - "scoreUnit" : "ops/s", - "rawData" : [ - [ - 620.5337618682299, - 623.7636997596106, - 626.6229189894106 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.ManySleepsBenchmark.ioBenchmark", - "mode" : "avgt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "primaryMetric" : { - "score" : 18279.974318333334, - "scoreError" : 2518.0313627622772, - "scoreConfidence" : [ - 15761.942955571056, - 20798.005681095612 - ], - "scorePercentiles" : { - "0.0" : 18130.049503, - "50.0" : 18308.118691, - "90.0" : 18401.754761, - "95.0" : 18401.754761, - "99.0" : 18401.754761, - "99.9" : 18401.754761, - "99.99" : 18401.754761, - "99.999" : 18401.754761, - "99.9999" : 18401.754761, - "100.0" : 18401.754761 - }, - "scoreUnit" : "ms/op", - "rawData" : [ - [ - 18308.118691, - 18401.754761, - 18130.049503 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.ManySleepsBenchmark.rapidBenchmark", - "mode" : "avgt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "primaryMetric" : { - "score" : 20210.000168666666, - "scoreError" : 14644.893932415484, - "scoreConfidence" : [ - 5565.106236251182, - 34854.89410108215 - ], - "scorePercentiles" : { - "0.0" : 19568.556238, - "50.0" : 19951.24182, - "90.0" : 21110.202448, - "95.0" : 21110.202448, - "99.0" : 21110.202448, - "99.9" : 21110.202448, - "99.99" : 21110.202448, - "99.999" : 21110.202448, - "99.9999" : 21110.202448, - "100.0" : 21110.202448 - }, - "scoreUnit" : "ms/op", - "rawData" : [ - [ - 19568.556238, - 19951.24182, - 21110.202448 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.ManySleepsBenchmark.zioBenchmark", - "mode" : "avgt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "primaryMetric" : { - "score" : 75171.56530566666, - "scoreError" : 15551.93437688822, - "scoreConfidence" : [ - 59619.63092877844, - 90723.49968255489 - ], - "scorePercentiles" : { - "0.0" : 74259.610008, - "50.0" : 75306.714379, - "90.0" : 75948.37153, - "95.0" : 75948.37153, - "99.0" : 75948.37153, - "99.9" : 75948.37153, - "99.99" : 75948.37153, - "99.999" : 75948.37153, - "99.9999" : 75948.37153, - "100.0" : 75948.37153 - }, - "scoreUnit" : "ms/op", - "rawData" : [ - [ - 75948.37153, - 75306.714379, - 74259.610008 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.ManyTasksBenchmark.ioBenchmark", - "mode" : "avgt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "primaryMetric" : { - "score" : 384.7470023504274, - "scoreError" : 155.52300075145328, - "scoreConfidence" : [ - 229.22400159897413, - 540.2700031018807 - ], - "scorePercentiles" : { - "0.0" : 378.535644, - "50.0" : 381.23940166666665, - "90.0" : 394.4659613846154, - "95.0" : 394.4659613846154, - "99.0" : 394.4659613846154, - "99.9" : 394.4659613846154, - "99.99" : 394.4659613846154, - "99.999" : 394.4659613846154, - "99.9999" : 394.4659613846154, - "100.0" : 394.4659613846154 - }, - "scoreUnit" : "ms/op", - "rawData" : [ - [ - 378.535644, - 394.4659613846154, - 381.23940166666665 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.ManyTasksBenchmark.rapidBenchmark", - "mode" : "avgt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "primaryMetric" : { - "score" : 155.23103954147916, - "scoreError" : 130.34371178006953, - "scoreConfidence" : [ - 24.887327761409637, - 285.57475132154866 - ], - "scorePercentiles" : { - "0.0" : 147.53102453623188, - "50.0" : 156.516416265625, - "90.0" : 161.64567782258064, - "95.0" : 161.64567782258064, - "99.0" : 161.64567782258064, - "99.9" : 161.64567782258064, - "99.99" : 161.64567782258064, - "99.999" : 161.64567782258064, - "99.9999" : 161.64567782258064, - "100.0" : 161.64567782258064 - }, - "scoreUnit" : "ms/op", - "rawData" : [ - [ - 147.53102453623188, - 161.64567782258064, - 156.516416265625 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.ManyTasksBenchmark.zioBenchmark", - "mode" : "avgt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "primaryMetric" : { - "score" : 68.63331723105847, - "scoreError" : 8.143481268374051, - "scoreConfidence" : [ - 60.48983596268442, - 76.77679849943252 - ], - "scorePercentiles" : { - "0.0" : 68.21858176190476, - "50.0" : 68.57564996575343, - "90.0" : 69.10571996551724, - "95.0" : 69.10571996551724, - "99.0" : 69.10571996551724, - "99.9" : 69.10571996551724, - "99.99" : 69.10571996551724, - "99.999" : 69.10571996551724, - "99.9999" : 69.10571996551724, - "100.0" : 69.10571996551724 - }, - "scoreUnit" : "ms/op", - "rawData" : [ - [ - 68.57564996575343, - 69.10571996551724, - 68.21858176190476 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.OverheadBenchmark.ioBenchmark", - "mode" : "avgt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "primaryMetric" : { - "score" : 26.786694023970217, - "scoreError" : 2.3941136544565658, - "scoreConfidence" : [ - 24.392580369513652, - 29.18080767842678 - ], - "scorePercentiles" : { - "0.0" : 26.661846348404254, - "50.0" : 26.774747427807487, - "90.0" : 26.923488295698924, - "95.0" : 26.923488295698924, - "99.0" : 26.923488295698924, - "99.9" : 26.923488295698924, - "99.99" : 26.923488295698924, - "99.999" : 26.923488295698924, - "99.9999" : 26.923488295698924, - "100.0" : 26.923488295698924 - }, - "scoreUnit" : "ms/op", - "rawData" : [ - [ - 26.774747427807487, - 26.661846348404254, - 26.923488295698924 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.OverheadBenchmark.rapidBenchmark", - "mode" : "avgt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "primaryMetric" : { - "score" : 13.742979314139594, - "scoreError" : 1.057130572603235, - "scoreConfidence" : [ - 12.68584874153636, - 14.80010988674283 - ], - "scorePercentiles" : { - "0.0" : 13.68222714637483, - "50.0" : 13.749075956043956, - "90.0" : 13.79763484, - "95.0" : 13.79763484, - "99.0" : 13.79763484, - "99.9" : 13.79763484, - "99.99" : 13.79763484, - "99.999" : 13.79763484, - "99.9999" : 13.79763484, - "100.0" : 13.79763484 - }, - "scoreUnit" : "ms/op", - "rawData" : [ - [ - 13.68222714637483, - 13.79763484, - 13.749075956043956 - ] - ] - }, - "secondaryMetrics" : { - } - }, - { - "jmhVersion" : "1.37", - "benchmark" : "benchmark.OverheadBenchmark.zioBenchmark", - "mode" : "avgt", - "threads" : 1, - "forks" : 1, - "jvm" : "/home/mhicks/.sdkman/candidates/java/22.0.2.crac-zulu/bin/java", - "jvmArgs" : [ - ], - "jdkVersion" : "22.0.2", - "vmName" : "OpenJDK 64-Bit Server VM", - "vmVersion" : "22.0.2+9", - "warmupIterations" : 3, - "warmupTime" : "10 s", - "warmupBatchSize" : 1, - "measurementIterations" : 3, - "measurementTime" : "10 s", - "measurementBatchSize" : 1, - "primaryMetric" : { - "score" : 1110.165445788889, - "scoreError" : 198.81440890985587, - "scoreConfidence" : [ - 911.3510368790331, - 1308.979854698745 - ], - "scorePercentiles" : { - "0.0" : 1099.7220882, - "50.0" : 1109.3075005, - "90.0" : 1121.4667486666667, - "95.0" : 1121.4667486666667, - "99.0" : 1121.4667486666667, - "99.9" : 1121.4667486666667, - "99.99" : 1121.4667486666667, - "99.999" : 1121.4667486666667, - "99.9999" : 1121.4667486666667, - "100.0" : 1121.4667486666667 - }, - "scoreUnit" : "ms/op", - "rawData" : [ - [ - 1109.3075005, - 1099.7220882, - 1121.4667486666667 - ] - ] - }, - "secondaryMetrics" : { - } - } -] - - diff --git a/benchmark/src/main/scala/benchmark/StreamBenchmark.scala b/benchmark/src/main/scala/benchmark/StreamBenchmark.scala index 87ce8d6..c508199 100644 --- a/benchmark/src/main/scala/benchmark/StreamBenchmark.scala +++ b/benchmark/src/main/scala/benchmark/StreamBenchmark.scala @@ -14,11 +14,10 @@ import java.util.concurrent.TimeUnit @OutputTimeUnit(TimeUnit.SECONDS) @State(Scope.Thread) class StreamBenchmark { - @Param(Array("1000")) //, "10000", "100000")) + @Param(Array("1000", "10000", "100000")) var size: Int = _ lazy val rapidStream: rapid.Stream[Int] = rapid.Stream.fromList((1 to size).toList) - lazy val rapidFs2Stream: fs2.Stream[Task, Int] = fs2.Stream.emits(1 to size) lazy val fs2Stream: fs2.Stream[IO, Int] = fs2.Stream.emits(1 to size) @Setup(Level.Trial) @@ -33,32 +32,27 @@ class StreamBenchmark { } @Benchmark - def rapidFs2StreamToList(): List[Int] = { - rapidFs2Stream.compile.toList.sync() + def fs2StreamToList(): List[Int] = { + fs2Stream.compile.toList.unsafeRunSync() } -// @Benchmark -// def fs2StreamToList(): List[Int] = { -// fs2Stream.compile.toList.unsafeRunSync() -// } - @Benchmark def rapidStreamFilter(): List[Int] = { rapidStream.filter(_ % 2 == 0).toList.sync() } -// @Benchmark -// def fs2StreamFilter(): List[Int] = { -// fs2Stream.filter(_ % 2 == 0).compile.toList.unsafeRunSync() -// } + @Benchmark + def fs2StreamFilter(): List[Int] = { + fs2Stream.filter(_ % 2 == 0).compile.toList.unsafeRunSync() + } @Benchmark def rapidStreamMap(): List[Int] = { rapidStream.map(_ * 2).toList.sync() } -// @Benchmark -// def fs2StreamMap(): List[Int] = { -// fs2Stream.map(_ * 2).compile.toList.unsafeRunSync() -// } + @Benchmark + def fs2StreamMap(): List[Int] = { + fs2Stream.map(_ * 2).compile.toList.unsafeRunSync() + } } \ No newline at end of file diff --git a/cats/src/main/scala/rapid/cats/package.scala b/cats/src/main/scala/rapid/cats/package.scala index 8b85f23..7b7969d 100644 --- a/cats/src/main/scala/rapid/cats/package.scala +++ b/cats/src/main/scala/rapid/cats/package.scala @@ -7,15 +7,15 @@ package object cats { def toIO: IO[Return] = IO.blocking(task.sync()) } - implicit class StreamExtras[Return](val stream: Stream[Return]) extends AnyVal { - def toFS2: fs2.Stream[IO, Return] = { - def loop(stream: Stream[Return]): fs2.Stream[IO, Return] = { - fs2.Stream.eval(stream.pull.toTask.toIO).flatMap { - case Some((head, tail)) => fs2.Stream.emit(head) ++ loop(tail) - case None => fs2.Stream.empty - } - } - loop(stream) - } - } +// implicit class StreamExtras[Return](val stream: Stream[Return]) extends AnyVal { +// def toFS2: fs2.Stream[IO, Return] = { +// def loop(stream: Stream[Return]): fs2.Stream[IO, Return] = { +// fs2.Stream.eval(stream.pull.toTask.toIO).flatMap { +// case Some((head, tail)) => fs2.Stream.emit(head) ++ loop(tail) +// case None => fs2.Stream.empty +// } +// } +// loop(stream) +// } +// } } \ No newline at end of file diff --git a/core/src/main/scala/rapid/ChainedTask.scala b/core/src/main/scala/rapid/ChainedTask.scala deleted file mode 100644 index 3565404..0000000 --- a/core/src/main/scala/rapid/ChainedTask.scala +++ /dev/null @@ -1,7 +0,0 @@ -package rapid - -case class ChainedTask[Return](list: List[Any => Task[Any]]) extends Task[Return] { - override protected def f: () => Return = () => list.reverse.foldLeft((): Any)((value, f) => f(value).sync()).asInstanceOf[Return] - - override def flatMap[T](f: Return => Task[T]): Task[T] = copy(f.asInstanceOf[Any => Task[Any]] :: list) -} diff --git a/core/src/main/scala/rapid/Fiber.scala b/core/src/main/scala/rapid/Fiber.scala index 002b891..b9c77e8 100644 --- a/core/src/main/scala/rapid/Fiber.scala +++ b/core/src/main/scala/rapid/Fiber.scala @@ -14,7 +14,7 @@ class Fiber[Return](val task: Task[Return]) extends Task[Return] { case t: Throwable => result = Left(t) }) - override protected lazy val f: () => Return = () => await() + override protected def invoke(): Return = await() override def start(): Fiber[Return] = this diff --git a/core/src/main/scala/rapid/SimpleTask.scala b/core/src/main/scala/rapid/SimpleTask.scala deleted file mode 100644 index 6d438e1..0000000 --- a/core/src/main/scala/rapid/SimpleTask.scala +++ /dev/null @@ -1,3 +0,0 @@ -package rapid - -class SimpleTask[Return](val f: () => Return) extends AnyVal with Task[Return] diff --git a/core/src/main/scala/rapid/Stream.scala b/core/src/main/scala/rapid/Stream.scala index e379bcf..fecfeb1 100644 --- a/core/src/main/scala/rapid/Stream.scala +++ b/core/src/main/scala/rapid/Stream.scala @@ -7,13 +7,22 @@ import java.util.concurrent.Semaphore * * @tparam Return the type of the values produced by this stream */ -trait Stream[Return] { stream => +class Stream[Return](private val task: Task[Iterator[Return]]) extends AnyVal { /** - * Produces the next value in the stream, if any. + * Filters the values in the stream using the given predicate. * - * @return a `Pull` that produces an optional pair of the next value and the remaining stream + * @param p the predicate to test the values + * @return a new stream with the values that satisfy the predicate */ - def pull: Pull[Option[(Return, Stream[Return])]] + def filter(p: Return => Boolean): Stream[Return] = new Stream(task.map(_.filter(p))) + + /** + * Takes values from the stream while the given predicate holds. + * + * @param p the predicate to test the values + * @return a new stream with the values that satisfy the predicate + */ + def takeWhile(p: Return => Boolean): Stream[Return] = new Stream(task.map(_.takeWhile(p))) /** * Transforms the values in the stream using the given function. @@ -22,12 +31,7 @@ trait Stream[Return] { stream => * @tparam T the type of the transformed values * @return a new stream with the transformed values */ - def map[T](f: Return => T): Stream[T] = new Stream[T] { - def pull: Pull[Option[(T, Stream[T])]] = stream.pull.flatMap { - case Some((head, tail)) => Pull.pure(Some(f(head) -> tail.map(f))) - case None => Pull.pure(None) - } - } + def map[T](f: Return => T): Stream[T] = new Stream(task.map(_.map(f))) /** * Transforms the values in the stream using the given function that returns a new stream. @@ -36,18 +40,20 @@ trait Stream[Return] { stream => * @tparam T the type of the values in the new streams * @return a new stream with the transformed values */ - def flatMap[T](f: Return => Stream[T]): Stream[T] = new Stream[T] { - def pull: Pull[Option[(T, Stream[T])]] = { - def go(s: Stream[Return]): Pull[Option[(T, Stream[T])]] = s.pull.flatMap { - case Some((head, tail)) => f(head).pull.flatMap { - case Some((fh, ft)) => Pull.pure(Some(fh -> ft.append(tail.flatMap(f)))) - case None => go(tail) - } - case None => Pull.pure(None) - } - go(stream) - } - } + def flatMap[T](f: Return => Stream[T]): Stream[T] = new Stream(task.map { iterator => + iterator.flatMap(r => f(r).task.sync()) + }) + + /** + * Transforms the values in the stream using the given function that returns a task. + * + * @param f the function to transform the values into tasks + * @tparam T the type of the values in the tasks + * @return a new stream with the transformed values + */ + def evalMap[T](f: Return => Task[T]): Stream[T] = new Stream(task.map { iterator => + iterator.map(f).map(_.sync()) + }) /** * Appends another stream to this stream. @@ -56,64 +62,34 @@ trait Stream[Return] { stream => * @tparam T the type of the values in the appended stream * @return a new stream with the values from both streams */ - def append[T >: Return](that: => Stream[T]): Stream[T] = new Stream[T] { - def pull: Pull[Option[(T, Stream[T])]] = stream.pull.flatMap { - case Some((head, tail)) => Pull.pure(Some(head -> tail.append(that))) - case None => that.pull - } - } + def append[T >: Return](that: => Stream[T]): Stream[T] = new Stream(Task { + val iterator1 = task.sync() + val iterator2 = that.task.sync() + iterator1 ++ iterator2 + }) /** - * Takes values from the stream while the given predicate holds. + * Converts the stream to a list. * - * @param p the predicate to test the values - * @return a new stream with the values that satisfy the predicate + * @return a task that produces a list of the values in the stream */ - def takeWhile(p: Return => Boolean): Stream[Return] = new Stream[Return] { - def pull: Pull[Option[(Return, Stream[Return])]] = stream.pull.flatMap { - case Some((head, tail)) => - if (p(head)) Pull.pure(Some(head -> tail.takeWhile(p))) - else Pull.pure(None) - case None => Pull.pure(None) - } - } + def toList: Task[List[Return]] = task.map(_.toList) /** - * Filters the values in the stream using the given predicate. + * Counts the number of elements in the stream and fully evaluates it. * - * @param p the predicate to test the values - * @return a new stream with the values that satisfy the predicate + * @return a `Task[Int]` representing the total number of entries evaluated */ - def filter(p: Return => Boolean): Stream[Return] = new Stream[Return] { - def pull: Pull[Option[(Return, Stream[Return])]] = { - def go(s: Stream[Return]): Pull[Option[(Return, Stream[Return])]] = s.pull.flatMap { - case Some((head, tail)) => - if (p(head)) Pull.pure(Some(head -> new Stream[Return] { - def pull: Pull[Option[(Return, Stream[Return])]] = go(tail) - })) - else go(tail) - case None => Pull.pure(None) - } - go(stream) - } - } + def count: Task[Int] = task.map(_.size) +} +/*trait Stream[Return] { stream => /** - * Transforms the values in the stream using the given function that returns a task. + * Produces the next value in the stream, if any. * - * @param f the function to transform the values into tasks - * @tparam T the type of the values in the tasks - * @return a new stream with the transformed values + * @return a `Pull` that produces an optional pair of the next value and the remaining stream */ - def evalMap[T](f: Return => Task[T]): Stream[T] = new Stream[T] { - def pull: Pull[Option[(T, Stream[T])]] = stream.pull.flatMap { - case Some((head, tail)) => - Pull.suspend { - f(head).map(result => Option(result -> tail.evalMap(f))).toPull - } - case None => Pull.pure(None) - } - } + def pull: Pull[Option[(Return, Stream[Return])]] /** * Transforms the values in the stream using the given function that returns a task, with a maximum concurrency. @@ -146,37 +122,7 @@ trait Stream[Return] { stream => } } } - - /** - * Converts the stream to a list. - * - * @return a task that produces a list of the values in the stream - */ - def toList: Task[List[Return]] = { - def loop(stream: Stream[Return], acc: List[Return]): Pull[List[Return]] = { - stream.pull.flatMap { - case Some((head, tail)) => Pull.suspend(loop(tail, acc :+ head)) - case None => Pull.pure(acc) - } - } - loop(this, List.empty).toTask - } - - /** - * Counts the number of elements in the stream and fully evaluates it. - * - * @return a `Task[Int]` representing the total number of entries evaluated - */ - def count: Task[Int] = { - def loop(stream: Stream[Return], acc: Int): Pull[Int] = { - stream.pull.flatMap { - case Some((_, tail)) => Pull.suspend(loop(tail, acc + 1)) - case None => Pull.pure(acc) - } - } - loop(this, 0).toTask - } -} +}*/ object Stream { /** @@ -186,9 +132,7 @@ object Stream { * @tparam Return the type of the value * @return a new stream that emits the value */ - def emit[Return](value: Return): Stream[Return] = new Stream[Return] { - def pull: Pull[Option[(Return, Stream[Return])]] = Pull.pure(Some(value -> empty)) - } + def emit[Return](value: Return): Stream[Return] = new Stream[Return](Task.pure(List(value).iterator)) /** * Creates an empty stream. @@ -196,9 +140,7 @@ object Stream { * @tparam Return the type of the values in the stream * @return a new empty stream */ - def empty[Return]: Stream[Return] = new Stream[Return] { - def pull: Pull[Option[(Return, Stream[Return])]] = Pull.pure(None) - } + def empty[Return]: Stream[Return] = new Stream[Return](Task.pure(Nil.iterator)) /** * Creates a stream from a list of values. diff --git a/core/src/main/scala/rapid/Task.scala b/core/src/main/scala/rapid/Task.scala index 5f4441a..d79b683 100644 --- a/core/src/main/scala/rapid/Task.scala +++ b/core/src/main/scala/rapid/Task.scala @@ -8,14 +8,14 @@ import scala.concurrent.duration.FiniteDuration * @tparam Return the type of the result produced by this task */ trait Task[Return] extends Any { - protected def f: () => Return + protected def invoke(): Return /** * Synchronously (blocking) executes the task and returns the result. * * @return the result of the task */ - def sync(): Return = f() + def sync(): Return = invoke() /** * Starts the task and returns a `Fiber` representing the running task. @@ -45,7 +45,7 @@ trait Task[Return] extends Any { * @tparam T the type of the transformed result * @return a new task with the transformed result */ - def map[T](f: Return => T): Task[T] = Task(f(this.f())) + def map[T](f: Return => T): Task[T] = Task(f(invoke())) /** * Flat maps the result of the task using the given function. @@ -54,7 +54,7 @@ trait Task[Return] extends Any { * @tparam T the type of the result of the new task * @return a new task with the transformed result */ - def flatMap[T](f: Return => Task[T]): Task[T] = ChainedTask(List( + def flatMap[T](f: Return => Task[T]): Task[T] = Task.Chained(List( v => f(v.asInstanceOf[Return]).asInstanceOf[Task[Any]], (_: Any) => this.asInstanceOf[Task[Any]], )) @@ -85,10 +85,33 @@ trait Task[Return] extends Any { } object Task { + case class Pure[Return](value: Return) extends AnyVal with Task[Return] { + override protected def invoke(): Return = value + } + + case class Single[Return](f: () => Return) extends AnyVal with Task[Return] { + override protected def invoke(): Return = f() + } + + case class Chained[Return](list: List[Any => Task[Any]]) extends AnyVal with Task[Return] { + override protected def invoke(): Return = list.reverse.foldLeft((): Any)((value, f) => f(value).sync()).asInstanceOf[Return] + + override def flatMap[T](f: Return => Task[T]): Task[T] = copy(f.asInstanceOf[Any => Task[Any]] :: list) + } + /** * A task that returns `Unit`. */ - lazy val unit: Task[Unit] = apply(()) + lazy val unit: Task[Unit] = pure(()) + + /** + * Creates a new task with the given value pre-evaluated. + * + * @param value the return value of this Task + * @tparam Return the type of the result produced by the task + * @return a new task + */ + def pure[Return](value: Return): Task[Return] = Pure(value) /** * Creates a new task with the given function. @@ -97,7 +120,7 @@ object Task { * @tparam Return the type of the result produced by the task * @return a new task */ - def apply[Return](f: => Return): Task[Return] = new SimpleTask(() => f) + def apply[Return](f: => Return): Task[Return] = Single(() => f) /** * Creates a new task that sleeps for the given duration. diff --git a/core/src/test/scala/spec/StreamSpec.scala b/core/src/test/scala/spec/StreamSpec.scala index 3c9bc97..adc4fb9 100644 --- a/core/src/test/scala/spec/StreamSpec.scala +++ b/core/src/test/scala/spec/StreamSpec.scala @@ -31,7 +31,7 @@ class StreamSpec extends AnyWordSpec with Matchers { val result = stream.evalMap(x => Task(x * 2)).toList.sync() result shouldEqual List(2, 4, 6) } - "evaluate elements in parallel with parEvalMap" in { + /*"evaluate elements in parallel with parEvalMap" in { val stream = Stream.fromList(List(1, 2, 3, 4)) val result = stream.parEvalMap(2)(x => Task(x * 2)).toList.sync() result.sorted shouldEqual List(2, 4, 6, 8) // Sorting to account for parallel execution order @@ -40,8 +40,7 @@ class StreamSpec extends AnyWordSpec with Matchers { val stream = Stream.fromList(List(1, 2, 3, 4, 5, 6)) val result = stream.parEvalMap(3)(x => Task(x * 2)).toList.sync() result.sorted shouldEqual List(2, 4, 6, 8, 10, 12) // Sorting to account for parallel execution order - } - + }*/ "append two streams" in { val stream1 = Stream.fromList(List(1, 2, 3)) val stream2 = Stream.fromList(List(4, 5, 6))