diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b738e60aa..4f99bfd56 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,15 +1,14 @@ name: Test all the things - on: [ push, pull_request ] +env: + JAVA_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 + JVM_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 jobs: ci: - runs-on: ubuntu-latest - env: - JAVA_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 - JVM_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 + runs-on: self-hosted steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v4 - run: git fetch -f --depth=1 origin '+refs/tags/*:refs/tags/*' - uses: coursier/cache-action@v6 - uses: coursier/setup-action@v1 @@ -19,9 +18,13 @@ jobs: - name: Test run: sbt -v ";+core/test;+instrumentation/test;+reporters/test" lint: - runs-on: ubuntu-latest + runs-on: self-hosted steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v4 - uses: coursier/cache-action@v6 + - uses: coursier/setup-action@v1 + with: + jvm: adopt:8 + apps: sbt - name: Test run: sbt -v "+scalafmtCheckAll;scalafmtSbtCheck" diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 366f676e2..4b0ef597c 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -6,12 +6,12 @@ on: jobs: release: - runs-on: ubuntu-latest + runs-on: self-hosted env: JAVA_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 JVM_OPTS: -Xms5120M -Xmx5120M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v4 - run: git fetch -f --depth=1 origin '+refs/tags/*:refs/tags/*' - uses: coursier/cache-action@v6 - uses: coursier/setup-action@v1 diff --git a/build.sbt b/build.sbt index ba9f40b5b..d742aa012 100644 --- a/build.sbt +++ b/build.sbt @@ -414,6 +414,7 @@ lazy val `kamon-spring` = (project in file("instrumentation/kamon-spring")) "org.springframework.boot" % "spring-boot-starter-webflux" % "2.4.2" % "provided", okHttp % "test", "com.h2database" % "h2" % "1.4.200" % "test", + "javax.xml.bind" % "jaxb-api" % "2.3.1" % "test", "org.springframework.boot" % "spring-boot-starter-data-jpa" % "2.4.2" % "test", scalatest % "test", logbackClassic % "test" @@ -575,6 +576,21 @@ lazy val `kamon-pekko-grpc` = (project in file("instrumentation/kamon-pekko-grpc ) )).dependsOn(`kamon-pekko-http`, `kamon-testkit` % "test") +lazy val `kamon-pekko-connectors-kafka` = (project in file("instrumentation/kamon-pekko-connectors-kafka")) + .disablePlugins(AssemblyPlugin) + .enablePlugins(JavaAgent) + .settings(instrumentationSettings) + .settings( + crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version), + libraryDependencies ++= Seq( + kanelaAgent % "provided", + "org.apache.pekko" %% "pekko-connectors-kafka" % "1.0.0" % "provided", + "org.apache.pekko" %% "pekko-stream" % "1.0.1" % "provided", + scalatest % "test", + logbackClassic % "test" + ) + ).dependsOn(`kamon-core`, `kamon-pekko`, `kamon-testkit` % "test") + lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc")) .enablePlugins(JavaAgent, AkkaGrpcPlugin) .disablePlugins(AssemblyPlugin) @@ -821,6 +837,7 @@ lazy val `kamon-apache-cxf` = (project in file("instrumentation/kamon-apache-cxf "org.mock-server" % "mockserver-client-java" % "5.13.2" % "test", "com.dimafeng" %% "testcontainers-scala" % "0.41.0" % "test", "com.dimafeng" %% "testcontainers-scala-mockserver" % "0.41.0" % "test", + "javax.xml.bind" % "jaxb-api" % "2.3.1" % "test", "org.apache.cxf" % "cxf-rt-frontend-jaxws" % "3.3.6" % "test", "org.apache.cxf" % "cxf-rt-transports-http" % "3.3.6" % "test" ) @@ -1106,6 +1123,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo `kamon-pekko`, `kamon-pekko-http`, `kamon-pekko-grpc`, + `kamon-pekko-connectors-kafka`, `kamon-tapir`, `kamon-alpakka-kafka` ) diff --git a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/BooleanTag.java b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/BooleanTag.java index c46301b83..7124e6a9b 100644 --- a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/BooleanTag.java +++ b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/BooleanTag.java @@ -1,23 +1,8 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package kamon.context.generated.binary.context; // Code generated by colf(1); DO NOT EDIT. +// The compiler used schema file Context.colf. import static java.lang.String.format; @@ -40,20 +25,16 @@ * @author generated by colf(1) * @see Colfer's home */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf") public class BooleanTag implements Serializable { /** The upper limit for serial byte sizes. */ public static int colferSizeMax = 16 * 1024 * 1024; - - public String key; public boolean value; - /** Default constructor */ public BooleanTag() { init(); @@ -88,7 +69,6 @@ public static class Unmarshaller { * @param buf the initial buffer or {@code null}. */ public Unmarshaller(InputStream in, byte[] buf) { - // TODO: better size estimation if (buf == null || buf.length == 0) buf = new byte[Math.min(BooleanTag.colferSizeMax, 2048)]; this.buf = buf; @@ -133,7 +113,6 @@ public BooleanTag next() throws IOException { this.i = 0; } else if (i == buf.length) { byte[] src = this.buf; - // TODO: better size estimation if (offset == 0) this.buf = new byte[Math.min(BooleanTag.colferSizeMax, this.buf.length * 4)]; System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); this.i -= this.offset; @@ -154,6 +133,16 @@ public BooleanTag next() throws IOException { } + /** + * Gets the serial size estimate as an upper boundary, whereby + * {@link #marshal(byte[],int)} ≤ {@link #marshalFit()} ≤ {@link #colferSizeMax}. + * @return the number of bytes. + */ + public int marshalFit() { + long n = 1L + 6 + (long)this.key.length() * 3 + 1; + if (n < 0 || n > (long)BooleanTag.colferSizeMax) return BooleanTag.colferSizeMax; + return (int) n; + } /** * Serializes the object. @@ -165,22 +154,16 @@ public BooleanTag next() throws IOException { * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. */ public byte[] marshal(OutputStream out, byte[] buf) throws IOException { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(BooleanTag.colferSizeMax, 2048)]; - - while (true) { - int i; - try { - i = marshal(buf, 0); - } catch (BufferOverflowException e) { - buf = new byte[Math.min(BooleanTag.colferSizeMax, buf.length * 4)]; - continue; - } - - out.write(buf, 0, i); - return buf; + int n = 0; + if (buf != null && buf.length != 0) try { + n = marshal(buf, 0); + } catch (BufferOverflowException e) {} + if (n == 0) { + buf = new byte[marshalFit()]; + n = marshal(buf, 0); } + out.write(buf, 0, n); + return buf; } /** @@ -323,16 +306,8 @@ public int unmarshal(byte[] buf, int offset, int end) { // {@link Serializable} Colfer extension. private void writeObject(ObjectOutputStream out) throws IOException { - // TODO: better size estimation - byte[] buf = new byte[1024]; - int n; - while (true) try { - n = marshal(buf, 0); - break; - } catch (BufferUnderflowException e) { - buf = new byte[4 * buf.length]; - } - + byte[] buf = new byte[marshalFit()]; + int n = marshal(buf, 0); out.writeInt(n); out.write(buf, 0, n); } @@ -371,7 +346,7 @@ public void setKey(String value) { /** * Sets kamon/context/generated/binary/context.BooleanTag.key. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public BooleanTag withKey(String value) { this.key = value; @@ -397,7 +372,7 @@ public void setValue(boolean value) { /** * Sets kamon/context/generated/binary/context.BooleanTag.value. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public BooleanTag withValue(boolean value) { this.value = value; @@ -420,8 +395,8 @@ public final boolean equals(Object o) { public final boolean equals(BooleanTag o) { if (o == null) return false; if (o == this) return true; - return o.getClass() == BooleanTag.class - && (this.key == null ? o.key == null : this.key.equals(o.key)) + + return (this.key == null ? o.key == null : this.key.equals(o.key)) && this.value == o.value; } diff --git a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java index 746287276..14425e2ae 100644 --- a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java +++ b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java @@ -1,23 +1,8 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package kamon.context.generated.binary.context; // Code generated by colf(1); DO NOT EDIT. +// The compiler used schema file Context.colf. import static java.lang.String.format; @@ -39,7 +24,6 @@ * @author generated by colf(1) * @see Colfer's home */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf") public class Context implements Serializable { /** The upper limit for serial byte sizes. */ @@ -49,13 +33,10 @@ public class Context implements Serializable { public static int colferListMax = 64 * 1024; - - public Tags tags; public Entry[] entries; - /** Default constructor */ public Context() { init(); @@ -91,7 +72,6 @@ public static class Unmarshaller { * @param buf the initial buffer or {@code null}. */ public Unmarshaller(InputStream in, byte[] buf) { - // TODO: better size estimation if (buf == null || buf.length == 0) buf = new byte[Math.min(Context.colferSizeMax, 2048)]; this.buf = buf; @@ -136,7 +116,6 @@ public Context next() throws IOException { this.i = 0; } else if (i == buf.length) { byte[] src = this.buf; - // TODO: better size estimation if (offset == 0) this.buf = new byte[Math.min(Context.colferSizeMax, this.buf.length * 4)]; System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); this.i -= this.offset; @@ -157,6 +136,21 @@ public Context next() throws IOException { } + /** + * Gets the serial size estimate as an upper boundary, whereby + * {@link #marshal(byte[],int)} ≤ {@link #marshalFit()} ≤ {@link #colferSizeMax}. + * @return the number of bytes. + */ + public int marshalFit() { + long n = 1L + 6; + if (this.tags != null) n += 1 + (long)this.tags.marshalFit(); + for (Entry o : this.entries) { + if (o == null) n++; + else n += o.marshalFit(); + } + if (n < 0 || n > (long)Context.colferSizeMax) return Context.colferSizeMax; + return (int) n; + } /** * Serializes the object. @@ -169,22 +163,16 @@ public Context next() throws IOException { * @throws IllegalStateException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. */ public byte[] marshal(OutputStream out, byte[] buf) throws IOException { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(Context.colferSizeMax, 2048)]; - - while (true) { - int i; - try { - i = marshal(buf, 0); - } catch (BufferOverflowException e) { - buf = new byte[Math.min(Context.colferSizeMax, buf.length * 4)]; - continue; - } - - out.write(buf, 0, i); - return buf; + int n = 0; + if (buf != null && buf.length != 0) try { + n = marshal(buf, 0); + } catch (BufferOverflowException e) {} + if (n == 0) { + buf = new byte[marshalFit()]; + n = marshal(buf, 0); } + out.write(buf, 0, n); + return buf; } /** @@ -311,16 +299,8 @@ public int unmarshal(byte[] buf, int offset, int end) { // {@link Serializable} Colfer extension. private void writeObject(ObjectOutputStream out) throws IOException { - // TODO: better size estimation - byte[] buf = new byte[1024]; - int n; - while (true) try { - n = marshal(buf, 0); - break; - } catch (BufferUnderflowException e) { - buf = new byte[4 * buf.length]; - } - + byte[] buf = new byte[marshalFit()]; + int n = marshal(buf, 0); out.writeInt(n); out.write(buf, 0, n); } @@ -359,7 +339,7 @@ public void setTags(Tags value) { /** * Sets kamon/context/generated/binary/context.Context.tags. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Context withTags(Tags value) { this.tags = value; @@ -385,7 +365,7 @@ public void setEntries(Entry[] value) { /** * Sets kamon/context/generated/binary/context.Context.entries. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Context withEntries(Entry[] value) { this.entries = value; @@ -408,8 +388,8 @@ public final boolean equals(Object o) { public final boolean equals(Context o) { if (o == null) return false; if (o == this) return true; - return o.getClass() == Context.class - && (this.tags == null ? o.tags == null : this.tags.equals(o.tags)) + + return (this.tags == null ? o.tags == null : this.tags.equals(o.tags)) && java.util.Arrays.equals(this.entries, o.entries); } diff --git a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java index 040a0f280..ae8f903ec 100644 --- a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java +++ b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java @@ -1,23 +1,8 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package kamon.context.generated.binary.context; // Code generated by colf(1); DO NOT EDIT. +// The compiler used schema file Context.colf. import static java.lang.String.format; @@ -40,20 +25,16 @@ * @author generated by colf(1) * @see Colfer's home */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf") public class Entry implements Serializable { /** The upper limit for serial byte sizes. */ public static int colferSizeMax = 16 * 1024 * 1024; - - public String key; public byte[] value; - /** Default constructor */ public Entry() { init(); @@ -90,7 +71,6 @@ public static class Unmarshaller { * @param buf the initial buffer or {@code null}. */ public Unmarshaller(InputStream in, byte[] buf) { - // TODO: better size estimation if (buf == null || buf.length == 0) buf = new byte[Math.min(Entry.colferSizeMax, 2048)]; this.buf = buf; @@ -135,7 +115,6 @@ public Entry next() throws IOException { this.i = 0; } else if (i == buf.length) { byte[] src = this.buf; - // TODO: better size estimation if (offset == 0) this.buf = new byte[Math.min(Entry.colferSizeMax, this.buf.length * 4)]; System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); this.i -= this.offset; @@ -156,6 +135,16 @@ public Entry next() throws IOException { } + /** + * Gets the serial size estimate as an upper boundary, whereby + * {@link #marshal(byte[],int)} ≤ {@link #marshalFit()} ≤ {@link #colferSizeMax}. + * @return the number of bytes. + */ + public int marshalFit() { + long n = 1L + 6 + (long)this.key.length() * 3 + 6 + (long)this.value.length; + if (n < 0 || n > (long)Entry.colferSizeMax) return Entry.colferSizeMax; + return (int) n; + } /** * Serializes the object. @@ -167,22 +156,16 @@ public Entry next() throws IOException { * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. */ public byte[] marshal(OutputStream out, byte[] buf) throws IOException { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(Entry.colferSizeMax, 2048)]; - - while (true) { - int i; - try { - i = marshal(buf, 0); - } catch (BufferOverflowException e) { - buf = new byte[Math.min(Entry.colferSizeMax, buf.length * 4)]; - continue; - } - - out.write(buf, 0, i); - return buf; + int n = 0; + if (buf != null && buf.length != 0) try { + n = marshal(buf, 0); + } catch (BufferOverflowException e) {} + if (n == 0) { + buf = new byte[marshalFit()]; + n = marshal(buf, 0); } + out.write(buf, 0, n); + return buf; } /** @@ -353,16 +336,8 @@ public int unmarshal(byte[] buf, int offset, int end) { // {@link Serializable} Colfer extension. private void writeObject(ObjectOutputStream out) throws IOException { - // TODO: better size estimation - byte[] buf = new byte[1024]; - int n; - while (true) try { - n = marshal(buf, 0); - break; - } catch (BufferUnderflowException e) { - buf = new byte[4 * buf.length]; - } - + byte[] buf = new byte[marshalFit()]; + int n = marshal(buf, 0); out.writeInt(n); out.write(buf, 0, n); } @@ -401,7 +376,7 @@ public void setKey(String value) { /** * Sets kamon/context/generated/binary/context.Entry.key. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Entry withKey(String value) { this.key = value; @@ -427,7 +402,7 @@ public void setValue(byte[] value) { /** * Sets kamon/context/generated/binary/context.Entry.value. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Entry withValue(byte[] value) { this.value = value; @@ -450,8 +425,8 @@ public final boolean equals(Object o) { public final boolean equals(Entry o) { if (o == null) return false; if (o == this) return true; - return o.getClass() == Entry.class - && (this.key == null ? o.key == null : this.key.equals(o.key)) + + return (this.key == null ? o.key == null : this.key.equals(o.key)) && java.util.Arrays.equals(this.value, o.value); } diff --git a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/LongTag.java b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/LongTag.java index 39f8ff8bc..f211ae8a2 100644 --- a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/LongTag.java +++ b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/LongTag.java @@ -1,23 +1,8 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package kamon.context.generated.binary.context; // Code generated by colf(1); DO NOT EDIT. +// The compiler used schema file Context.colf. import static java.lang.String.format; @@ -40,20 +25,16 @@ * @author generated by colf(1) * @see Colfer's home */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf") public class LongTag implements Serializable { /** The upper limit for serial byte sizes. */ public static int colferSizeMax = 16 * 1024 * 1024; - - public String key; public long value; - /** Default constructor */ public LongTag() { init(); @@ -88,7 +69,6 @@ public static class Unmarshaller { * @param buf the initial buffer or {@code null}. */ public Unmarshaller(InputStream in, byte[] buf) { - // TODO: better size estimation if (buf == null || buf.length == 0) buf = new byte[Math.min(LongTag.colferSizeMax, 2048)]; this.buf = buf; @@ -133,7 +113,6 @@ public LongTag next() throws IOException { this.i = 0; } else if (i == buf.length) { byte[] src = this.buf; - // TODO: better size estimation if (offset == 0) this.buf = new byte[Math.min(LongTag.colferSizeMax, this.buf.length * 4)]; System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); this.i -= this.offset; @@ -154,6 +133,16 @@ public LongTag next() throws IOException { } + /** + * Gets the serial size estimate as an upper boundary, whereby + * {@link #marshal(byte[],int)} ≤ {@link #marshalFit()} ≤ {@link #colferSizeMax}. + * @return the number of bytes. + */ + public int marshalFit() { + long n = 1L + 6 + (long)this.key.length() * 3 + 10; + if (n < 0 || n > (long)LongTag.colferSizeMax) return LongTag.colferSizeMax; + return (int) n; + } /** * Serializes the object. @@ -165,22 +154,16 @@ public LongTag next() throws IOException { * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. */ public byte[] marshal(OutputStream out, byte[] buf) throws IOException { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(LongTag.colferSizeMax, 2048)]; - - while (true) { - int i; - try { - i = marshal(buf, 0); - } catch (BufferOverflowException e) { - buf = new byte[Math.min(LongTag.colferSizeMax, buf.length * 4)]; - continue; - } - - out.write(buf, 0, i); - return buf; + int n = 0; + if (buf != null && buf.length != 0) try { + n = marshal(buf, 0); + } catch (BufferOverflowException e) {} + if (n == 0) { + buf = new byte[marshalFit()]; + n = marshal(buf, 0); } + out.write(buf, 0, n); + return buf; } /** @@ -354,16 +337,8 @@ public int unmarshal(byte[] buf, int offset, int end) { // {@link Serializable} Colfer extension. private void writeObject(ObjectOutputStream out) throws IOException { - // TODO: better size estimation - byte[] buf = new byte[1024]; - int n; - while (true) try { - n = marshal(buf, 0); - break; - } catch (BufferUnderflowException e) { - buf = new byte[4 * buf.length]; - } - + byte[] buf = new byte[marshalFit()]; + int n = marshal(buf, 0); out.writeInt(n); out.write(buf, 0, n); } @@ -402,7 +377,7 @@ public void setKey(String value) { /** * Sets kamon/context/generated/binary/context.LongTag.key. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public LongTag withKey(String value) { this.key = value; @@ -428,7 +403,7 @@ public void setValue(long value) { /** * Sets kamon/context/generated/binary/context.LongTag.value. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public LongTag withValue(long value) { this.value = value; @@ -451,8 +426,8 @@ public final boolean equals(Object o) { public final boolean equals(LongTag o) { if (o == null) return false; if (o == this) return true; - return o.getClass() == LongTag.class - && (this.key == null ? o.key == null : this.key.equals(o.key)) + + return (this.key == null ? o.key == null : this.key.equals(o.key)) && this.value == o.value; } diff --git a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/StringTag.java b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/StringTag.java index 43e5f60e2..ee5f5007a 100644 --- a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/StringTag.java +++ b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/StringTag.java @@ -1,23 +1,8 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package kamon.context.generated.binary.context; // Code generated by colf(1); DO NOT EDIT. +// The compiler used schema file Context.colf. import static java.lang.String.format; @@ -40,20 +25,16 @@ * @author generated by colf(1) * @see Colfer's home */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf") public class StringTag implements Serializable { /** The upper limit for serial byte sizes. */ public static int colferSizeMax = 16 * 1024 * 1024; - - public String key; public String value; - /** Default constructor */ public StringTag() { init(); @@ -89,7 +70,6 @@ public static class Unmarshaller { * @param buf the initial buffer or {@code null}. */ public Unmarshaller(InputStream in, byte[] buf) { - // TODO: better size estimation if (buf == null || buf.length == 0) buf = new byte[Math.min(StringTag.colferSizeMax, 2048)]; this.buf = buf; @@ -134,7 +114,6 @@ public StringTag next() throws IOException { this.i = 0; } else if (i == buf.length) { byte[] src = this.buf; - // TODO: better size estimation if (offset == 0) this.buf = new byte[Math.min(StringTag.colferSizeMax, this.buf.length * 4)]; System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); this.i -= this.offset; @@ -155,6 +134,16 @@ public StringTag next() throws IOException { } + /** + * Gets the serial size estimate as an upper boundary, whereby + * {@link #marshal(byte[],int)} ≤ {@link #marshalFit()} ≤ {@link #colferSizeMax}. + * @return the number of bytes. + */ + public int marshalFit() { + long n = 1L + 6 + (long)this.key.length() * 3 + 6 + (long)this.value.length() * 3; + if (n < 0 || n > (long)StringTag.colferSizeMax) return StringTag.colferSizeMax; + return (int) n; + } /** * Serializes the object. @@ -166,22 +155,16 @@ public StringTag next() throws IOException { * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. */ public byte[] marshal(OutputStream out, byte[] buf) throws IOException { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(StringTag.colferSizeMax, 2048)]; - - while (true) { - int i; - try { - i = marshal(buf, 0); - } catch (BufferOverflowException e) { - buf = new byte[Math.min(StringTag.colferSizeMax, buf.length * 4)]; - continue; - } - - out.write(buf, 0, i); - return buf; + int n = 0; + if (buf != null && buf.length != 0) try { + n = marshal(buf, 0); + } catch (BufferOverflowException e) {} + if (n == 0) { + buf = new byte[marshalFit()]; + n = marshal(buf, 0); } + out.write(buf, 0, n); + return buf; } /** @@ -377,16 +360,8 @@ public int unmarshal(byte[] buf, int offset, int end) { // {@link Serializable} Colfer extension. private void writeObject(ObjectOutputStream out) throws IOException { - // TODO: better size estimation - byte[] buf = new byte[1024]; - int n; - while (true) try { - n = marshal(buf, 0); - break; - } catch (BufferUnderflowException e) { - buf = new byte[4 * buf.length]; - } - + byte[] buf = new byte[marshalFit()]; + int n = marshal(buf, 0); out.writeInt(n); out.write(buf, 0, n); } @@ -425,7 +400,7 @@ public void setKey(String value) { /** * Sets kamon/context/generated/binary/context.StringTag.key. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public StringTag withKey(String value) { this.key = value; @@ -451,7 +426,7 @@ public void setValue(String value) { /** * Sets kamon/context/generated/binary/context.StringTag.value. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public StringTag withValue(String value) { this.value = value; @@ -474,8 +449,8 @@ public final boolean equals(Object o) { public final boolean equals(StringTag o) { if (o == null) return false; if (o == this) return true; - return o.getClass() == StringTag.class - && (this.key == null ? o.key == null : this.key.equals(o.key)) + + return (this.key == null ? o.key == null : this.key.equals(o.key)) && (this.value == null ? o.value == null : this.value.equals(o.value)); } diff --git a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Tags.java b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Tags.java index 4d08e87b6..65440960e 100644 --- a/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Tags.java +++ b/core/kamon-core/src/main/java/kamon/context/generated/binary/context/Tags.java @@ -1,23 +1,8 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package kamon.context.generated.binary.context; // Code generated by colf(1); DO NOT EDIT. +// The compiler used schema file Context.colf. import static java.lang.String.format; @@ -39,7 +24,6 @@ * @author generated by colf(1) * @see Colfer's home */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf") public class Tags implements Serializable { /** The upper limit for serial byte sizes. */ @@ -49,15 +33,12 @@ public class Tags implements Serializable { public static int colferListMax = 64 * 1024; - - public StringTag[] strings; public LongTag[] longs; public BooleanTag[] booleans; - /** Default constructor */ public Tags() { init(); @@ -97,7 +78,6 @@ public static class Unmarshaller { * @param buf the initial buffer or {@code null}. */ public Unmarshaller(InputStream in, byte[] buf) { - // TODO: better size estimation if (buf == null || buf.length == 0) buf = new byte[Math.min(Tags.colferSizeMax, 2048)]; this.buf = buf; @@ -142,7 +122,6 @@ public Tags next() throws IOException { this.i = 0; } else if (i == buf.length) { byte[] src = this.buf; - // TODO: better size estimation if (offset == 0) this.buf = new byte[Math.min(Tags.colferSizeMax, this.buf.length * 4)]; System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); this.i -= this.offset; @@ -163,6 +142,28 @@ public Tags next() throws IOException { } + /** + * Gets the serial size estimate as an upper boundary, whereby + * {@link #marshal(byte[],int)} ≤ {@link #marshalFit()} ≤ {@link #colferSizeMax}. + * @return the number of bytes. + */ + public int marshalFit() { + long n = 1L + 6 + 6 + 6; + for (StringTag o : this.strings) { + if (o == null) n++; + else n += o.marshalFit(); + } + for (LongTag o : this.longs) { + if (o == null) n++; + else n += o.marshalFit(); + } + for (BooleanTag o : this.booleans) { + if (o == null) n++; + else n += o.marshalFit(); + } + if (n < 0 || n > (long)Tags.colferSizeMax) return Tags.colferSizeMax; + return (int) n; + } /** * Serializes the object. @@ -177,22 +178,16 @@ public Tags next() throws IOException { * @throws IllegalStateException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. */ public byte[] marshal(OutputStream out, byte[] buf) throws IOException { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(Tags.colferSizeMax, 2048)]; - - while (true) { - int i; - try { - i = marshal(buf, 0); - } catch (BufferOverflowException e) { - buf = new byte[Math.min(Tags.colferSizeMax, buf.length * 4)]; - continue; - } - - out.write(buf, 0, i); - return buf; + int n = 0; + if (buf != null && buf.length != 0) try { + n = marshal(buf, 0); + } catch (BufferOverflowException e) {} + if (n == 0) { + buf = new byte[marshalFit()]; + n = marshal(buf, 0); } + out.write(buf, 0, n); + return buf; } /** @@ -396,16 +391,8 @@ public int unmarshal(byte[] buf, int offset, int end) { // {@link Serializable} Colfer extension. private void writeObject(ObjectOutputStream out) throws IOException { - // TODO: better size estimation - byte[] buf = new byte[1024]; - int n; - while (true) try { - n = marshal(buf, 0); - break; - } catch (BufferUnderflowException e) { - buf = new byte[4 * buf.length]; - } - + byte[] buf = new byte[marshalFit()]; + int n = marshal(buf, 0); out.writeInt(n); out.write(buf, 0, n); } @@ -444,7 +431,7 @@ public void setStrings(StringTag[] value) { /** * Sets kamon/context/generated/binary/context.Tags.strings. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Tags withStrings(StringTag[] value) { this.strings = value; @@ -470,7 +457,7 @@ public void setLongs(LongTag[] value) { /** * Sets kamon/context/generated/binary/context.Tags.longs. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Tags withLongs(LongTag[] value) { this.longs = value; @@ -496,7 +483,7 @@ public void setBooleans(BooleanTag[] value) { /** * Sets kamon/context/generated/binary/context.Tags.booleans. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Tags withBooleans(BooleanTag[] value) { this.booleans = value; @@ -520,8 +507,8 @@ public final boolean equals(Object o) { public final boolean equals(Tags o) { if (o == null) return false; if (o == this) return true; - return o.getClass() == Tags.class - && java.util.Arrays.equals(this.strings, o.strings) + + return java.util.Arrays.equals(this.strings, o.strings) && java.util.Arrays.equals(this.longs, o.longs) && java.util.Arrays.equals(this.booleans, o.booleans); } diff --git a/core/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java b/core/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java index d333e0520..099a859ed 100644 --- a/core/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java +++ b/core/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java @@ -1,23 +1,8 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package kamon.context.generated.binary.span; // Code generated by colf(1); DO NOT EDIT. +// The compiler used schema file Span.colf. import static java.lang.String.format; @@ -39,15 +24,12 @@ * @author generated by colf(1) * @see Colfer's home */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Span.colf") public class Span implements Serializable { /** The upper limit for serial byte sizes. */ public static int colferSizeMax = 16 * 1024 * 1024; - - public byte[] traceID; public byte[] spanID; @@ -56,7 +38,6 @@ public class Span implements Serializable { public byte samplingDecision; - /** Default constructor */ public Span() { init(); @@ -94,7 +75,6 @@ public static class Unmarshaller { * @param buf the initial buffer or {@code null}. */ public Unmarshaller(InputStream in, byte[] buf) { - // TODO: better size estimation if (buf == null || buf.length == 0) buf = new byte[Math.min(Span.colferSizeMax, 2048)]; this.buf = buf; @@ -139,7 +119,6 @@ public Span next() throws IOException { this.i = 0; } else if (i == buf.length) { byte[] src = this.buf; - // TODO: better size estimation if (offset == 0) this.buf = new byte[Math.min(Span.colferSizeMax, this.buf.length * 4)]; System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); this.i -= this.offset; @@ -160,6 +139,16 @@ public Span next() throws IOException { } + /** + * Gets the serial size estimate as an upper boundary, whereby + * {@link #marshal(byte[],int)} ≤ {@link #marshalFit()} ≤ {@link #colferSizeMax}. + * @return the number of bytes. + */ + public int marshalFit() { + long n = 1L + 6 + (long)this.traceID.length + 6 + (long)this.spanID.length + 6 + (long)this.parentID.length + 2; + if (n < 0 || n > (long)Span.colferSizeMax) return Span.colferSizeMax; + return (int) n; + } /** * Serializes the object. @@ -171,22 +160,16 @@ public Span next() throws IOException { * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. */ public byte[] marshal(OutputStream out, byte[] buf) throws IOException { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(Span.colferSizeMax, 2048)]; - - while (true) { - int i; - try { - i = marshal(buf, 0); - } catch (BufferOverflowException e) { - buf = new byte[Math.min(Span.colferSizeMax, buf.length * 4)]; - continue; - } - - out.write(buf, 0, i); - return buf; + int n = 0; + if (buf != null && buf.length != 0) try { + n = marshal(buf, 0); + } catch (BufferOverflowException e) {} + if (n == 0) { + buf = new byte[marshalFit()]; + n = marshal(buf, 0); } + out.write(buf, 0, n); + return buf; } /** @@ -379,16 +362,8 @@ public int unmarshal(byte[] buf, int offset, int end) { // {@link Serializable} Colfer extension. private void writeObject(ObjectOutputStream out) throws IOException { - // TODO: better size estimation - byte[] buf = new byte[1024]; - int n; - while (true) try { - n = marshal(buf, 0); - break; - } catch (BufferUnderflowException e) { - buf = new byte[4 * buf.length]; - } - + byte[] buf = new byte[marshalFit()]; + int n = marshal(buf, 0); out.writeInt(n); out.write(buf, 0, n); } @@ -427,7 +402,7 @@ public void setTraceID(byte[] value) { /** * Sets kamon/context/generated/binary/span.Span.traceID. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Span withTraceID(byte[] value) { this.traceID = value; @@ -453,7 +428,7 @@ public void setSpanID(byte[] value) { /** * Sets kamon/context/generated/binary/span.Span.spanID. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Span withSpanID(byte[] value) { this.spanID = value; @@ -479,7 +454,7 @@ public void setParentID(byte[] value) { /** * Sets kamon/context/generated/binary/span.Span.parentID. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Span withParentID(byte[] value) { this.parentID = value; @@ -505,7 +480,7 @@ public void setSamplingDecision(byte value) { /** * Sets kamon/context/generated/binary/span.Span.samplingDecision. * @param value the replacement. - * @return {link this}. + * @return {@code this}. */ public Span withSamplingDecision(byte value) { this.samplingDecision = value; @@ -530,8 +505,8 @@ public final boolean equals(Object o) { public final boolean equals(Span o) { if (o == null) return false; if (o == this) return true; - return o.getClass() == Span.class - && java.util.Arrays.equals(this.traceID, o.traceID) + + return java.util.Arrays.equals(this.traceID, o.traceID) && java.util.Arrays.equals(this.spanID, o.spanID) && java.util.Arrays.equals(this.parentID, o.parentID) && this.samplingDecision == o.samplingDecision; diff --git a/core/kamon-core/src/main/java/kamon/jsr166/LongAdder.java b/core/kamon-core/src/main/java/kamon/jsr166/LongAdder.java deleted file mode 100644 index 4408f7ac5..000000000 --- a/core/kamon-core/src/main/java/kamon/jsr166/LongAdder.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kamon.jsr166; - -import java.io.Serializable; - -/** - * - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - * - * - * One or more variables that together maintain an initially zero - * {@code long} sum. When updates (method {@link #add}) are contended - * across threads, the set of variables may grow dynamically to reduce - * contention. Method {@link #sum} (or, equivalently, {@link - * #longValue}) returns the current total combined across the - * variables maintaining the sum. - * - *

This class is usually preferable to {@link java.util.concurrent.atomic.AtomicLong} when - * multiple threads update a common sum that is used for purposes such - * as collecting statistics, not for fine-grained synchronization - * control. Under low update contention, the two classes have similar - * characteristics. But under high contention, expected throughput of - * this class is significantly higher, at the expense of higher space - * consumption. - * - *

LongAdders can be used with a {@link - * java.util.concurrent.ConcurrentHashMap} to maintain a scalable - * frequency map (a form of histogram or multiset). For example, to - * add a count to a {@code ConcurrentHashMap freqs}, - * initializing if not already present, you can use {@code - * freqs.computeIfAbsent(key, k -> new LongAdder()).increment();} - * - *

This class extends {@link Number}, but does not define - * methods such as {@code equals}, {@code hashCode} and {@code - * compareTo} because instances are expected to be mutated, and so are - * not useful as collection keys. - * - * @since 1.8 - * @author Doug Lea - */ -public class LongAdder extends Striped64 implements Serializable { - private static final long serialVersionUID = 7249069246863182397L; - - /** - * Creates a new adder with initial sum of zero. - */ - public LongAdder() { - } - - /** - * Adds the given value. - * - * @param x the value to add - */ - public void add(long x) { - Cell[] as; long b, v; int m; Cell a; - if ((as = cells) != null || !casBase(b = base, b + x)) { - boolean uncontended = true; - if (as == null || (m = as.length - 1) < 0 || - (a = as[getProbe() & m]) == null || - !(uncontended = a.cas(v = a.value, v + x))) - longAccumulate(x, null, uncontended); - } - } - - /** - * Equivalent to {@code add(1)}. - */ - public void increment() { - add(1L); - } - - /** - * Equivalent to {@code add(-1)}. - */ - public void decrement() { - add(-1L); - } - - /** - * Returns the current sum. The returned value is NOT an - * atomic snapshot; invocation in the absence of concurrent - * updates returns an accurate result, but concurrent updates that - * occur while the sum is being calculated might not be - * incorporated. - * - * @return the sum - */ - public long sum() { - Cell[] as = cells; - long sum = base; - if (as != null) { - for (Cell a : as) - if (a != null) - sum += a.value; - } - return sum; - } - - /** - * Resets variables maintaining the sum to zero. This method may - * be a useful alternative to creating a new adder, but is only - * effective if there are no concurrent updates. Because this - * method is intrinsically racy, it should only be used when it is - * known that no threads are concurrently updating. - */ - public void reset() { - Cell[] as = cells; - base = 0L; - if (as != null) { - for (Cell a : as) - if (a != null) - a.reset(); - } - } - - /** - * Equivalent in effect to {@link #sum} followed by {@link - * #reset}. This method may apply for example during quiescent - * points between multithreaded computations. If there are - * updates concurrent with this method, the returned value is - * not guaranteed to be the final value occurring before - * the reset. - * - * @return the sum - */ - public long sumThenReset() { - Cell[] as = cells; - long sum = base; - base = 0L; - if (as != null) { - for (Cell a : as) { - if (a != null) { - sum += a.value; - a.reset(); - } - } - } - return sum; - } - - - /** - * Atomic variant of {@link #sumThenReset} - * - * @return the sum - */ - public long sumAndReset() { - long sum = getAndSetBase(0L); - Cell[] as = cells; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) { - sum += a.getAndSet(0L); - } - } - } - return sum; - } - - /** - * Returns the String representation of the {@link #sum}. - * @return the String representation of the {@link #sum} - */ - public String toString() { - return Long.toString(sum()); - } - - /** - * Equivalent to {@link #sum}. - * - * @return the sum - */ - public long longValue() { - return sum(); - } - - /** - * Returns the {@link #sum} as an {@code int} after a narrowing - * primitive conversion. - */ - public int intValue() { - return (int)sum(); - } - - /** - * Returns the {@link #sum} as a {@code float} - * after a widening primitive conversion. - */ - public float floatValue() { - return (float)sum(); - } - - /** - * Returns the {@link #sum} as a {@code double} after a widening - * primitive conversion. - */ - public double doubleValue() { - return (double)sum(); - } - - /** - * Serialization proxy, used to avoid reference to the non-public - * Striped64 superclass in serialized forms. - * @serial include - */ - private static class SerializationProxy implements Serializable { - private static final long serialVersionUID = 7249069246863182397L; - - /** - * The current value returned by sum(). - * @serial - */ - private final long value; - - SerializationProxy(LongAdder a) { - value = a.sum(); - } - - /** - * Returns a {@code LongAdder} object with initial state - * held by this proxy. - * - * @return a {@code LongAdder} object with initial state - * held by this proxy - */ - private Object readResolve() { - LongAdder a = new LongAdder(); - a.base = value; - return a; - } - } - - /** - * Returns a - * - * SerializationProxy - * representing the state of this instance. - * - * @return a {@link SerializationProxy} - * representing the state of this instance - */ - private Object writeReplace() { - return new SerializationProxy(this); - } - - /** - * @param s the stream - * @throws java.io.InvalidObjectException always - */ - private void readObject(java.io.ObjectInputStream s) - throws java.io.InvalidObjectException { - throw new java.io.InvalidObjectException("Proxy required"); - } - -} \ No newline at end of file diff --git a/core/kamon-core/src/main/java/kamon/jsr166/Striped64.java b/core/kamon-core/src/main/java/kamon/jsr166/Striped64.java deleted file mode 100644 index 60e870d0e..000000000 --- a/core/kamon-core/src/main/java/kamon/jsr166/Striped64.java +++ /dev/null @@ -1,427 +0,0 @@ -/* - * Copyright 2013-2021 The Kamon Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kamon.jsr166; - -import java.util.Arrays; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.DoubleBinaryOperator; -import java.util.function.LongBinaryOperator; - -/** - * - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - * - * - * A package-local class holding common representation and mechanics - * for classes supporting dynamic striping on 64bit values. The class - * extends Number so that concrete subclasses must publicly do so. - */ -@SuppressWarnings("serial") -abstract class Striped64 extends Number { - /* - * This class maintains a lazily-initialized table of atomically - * updated variables, plus an extra "base" field. The table size - * is a power of two. Indexing uses masked per-thread hash codes. - * Nearly all declarations in this class are package-private, - * accessed directly by subclasses. - * - * Table entries are of class Cell; a variant of AtomicLong padded - * (via @Contended) to reduce cache contention. Padding is - * overkill for most Atomics because they are usually irregularly - * scattered in memory and thus don't interfere much with each - * other. But Atomic objects residing in arrays will tend to be - * placed adjacent to each other, and so will most often share - * cache lines (with a huge negative performance impact) without - * this precaution. - * - * In part because Cells are relatively large, we avoid creating - * them until they are needed. When there is no contention, all - * updates are made to the base field. Upon first contention (a - * failed CAS on base update), the table is initialized to size 2. - * The table size is doubled upon further contention until - * reaching the nearest power of two greater than or equal to the - * number of CPUS. Table slots remain empty (null) until they are - * needed. - * - * A single spinlock ("cellsBusy") is used for initializing and - * resizing the table, as well as populating slots with new Cells. - * There is no need for a blocking lock; when the lock is not - * available, threads try other slots (or the base). During these - * retries, there is increased contention and reduced locality, - * which is still better than alternatives. - * - * The Thread probe fields maintained via ThreadLocalRandom serve - * as per-thread hash codes. We let them remain uninitialized as - * zero (if they come in this way) until they contend at slot - * 0. They are then initialized to values that typically do not - * often conflict with others. Contention and/or table collisions - * are indicated by failed CASes when performing an update - * operation. Upon a collision, if the table size is less than - * the capacity, it is doubled in size unless some other thread - * holds the lock. If a hashed slot is empty, and lock is - * available, a new Cell is created. Otherwise, if the slot - * exists, a CAS is tried. Retries proceed by "double hashing", - * using a secondary hash (Marsaglia XorShift) to try to find a - * free slot. - * - * The table size is capped because, when there are more threads - * than CPUs, supposing that each thread were bound to a CPU, - * there would exist a perfect hash function mapping threads to - * slots that eliminates collisions. When we reach capacity, we - * search for this mapping by randomly varying the hash codes of - * colliding threads. Because search is random, and collisions - * only become known via CAS failures, convergence can be slow, - * and because threads are typically not bound to CPUS forever, - * may not occur at all. However, despite these limitations, - * observed contention rates are typically low in these cases. - * - * It is possible for a Cell to become unused when threads that - * once hashed to it terminate, as well as in the case where - * doubling the table causes no thread to hash to it under - * expanded mask. We do not try to detect or remove such cells, - * under the assumption that for long-running instances, observed - * contention levels will recur, so the cells will eventually be - * needed again; and for short-lived ones, it does not matter. - */ - - /** - * Padded variant of AtomicLong supporting only raw accesses plus CAS. - * - * JVM intrinsics note: It would be possible to use a release-only - * form of CAS here, if it were provided. - */ - @sun.misc.Contended static final class Cell { - volatile long value; - Cell(long x) { value = x; } - final boolean cas(long cmp, long val) { - return U.compareAndSwapLong(this, VALUE, cmp, val); - } - final void reset() { - U.putLongVolatile(this, VALUE, 0L); - } - final void reset(long identity) { - U.putLongVolatile(this, VALUE, identity); - } - - // Unsafe mechanics - private static final sun.misc.Unsafe U; - private static final long VALUE; - static { - try { - U = getUnsafe(); - VALUE = U.objectFieldOffset - (Cell.class.getDeclaredField("value")); - } catch (ReflectiveOperationException e) { - throw new Error(e); - } - } - - final long getAndSet(long val) { - return U.getAndSetLong(this, VALUE, val); - } - - } - - /** Number of CPUS, to place bound on table size */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** - * Table of cells. When non-null, size is a power of 2. - */ - transient volatile Cell[] cells; - - /** - * Base value, used mainly when there is no contention, but also as - * a fallback during table initialization races. Updated via CAS. - */ - transient volatile long base; - - /** - * Spinlock (locked via CAS) used when resizing and/or creating Cells. - */ - transient volatile int cellsBusy; - - /** - * Package-private default constructor. - */ - Striped64() { - } - - - /** - * CASes the base field. - */ - final long getAndSetBase(long val) { - return U.getAndSetLong(this, BASE, val); - } - - - /** - * CASes the base field. - */ - final boolean casBase(long cmp, long val) { - return U.compareAndSwapLong(this, BASE, cmp, val); - } - - /** - * CASes the cellsBusy field from 0 to 1 to acquire lock. - */ - final boolean casCellsBusy() { - return U.compareAndSwapInt(this, CELLSBUSY, 0, 1); - } - - /** - * Returns the probe value for the current thread. - * Duplicated from ThreadLocalRandom because of packaging restrictions. - */ - static final int getProbe() { - return U.getInt(Thread.currentThread(), PROBE); - } - - /** - * Pseudo-randomly advances and records the given probe value for the - * given thread. - * Duplicated from ThreadLocalRandom because of packaging restrictions. - */ - static final int advanceProbe(int probe) { - probe ^= probe << 13; // xorshift - probe ^= probe >>> 17; - probe ^= probe << 5; - U.putInt(Thread.currentThread(), PROBE, probe); - return probe; - } - - /** - * Handles cases of updates involving initialization, resizing, - * creating new Cells, and/or contention. See above for - * explanation. This method suffers the usual non-modularity - * problems of optimistic retry code, relying on rechecked sets of - * reads. - * - * @param x the value - * @param fn the update function, or null for add (this convention - * avoids the need for an extra field or function in LongAdder). - * @param wasUncontended false if CAS failed before call - */ - final void longAccumulate(long x, LongBinaryOperator fn, - boolean wasUncontended) { - int h; - if ((h = getProbe()) == 0) { - ThreadLocalRandom.current(); // force initialization - h = getProbe(); - wasUncontended = true; - } - boolean collide = false; // True if last slot nonempty - done: for (;;) { - Cell[] as; Cell a; int n; long v; - if ((as = cells) != null && (n = as.length) > 0) { - if ((a = as[(n - 1) & h]) == null) { - if (cellsBusy == 0) { // Try to attach new Cell - Cell r = new Cell(x); // Optimistically create - if (cellsBusy == 0 && casCellsBusy()) { - try { // Recheck under lock - Cell[] rs; int m, j; - if ((rs = cells) != null && - (m = rs.length) > 0 && - rs[j = (m - 1) & h] == null) { - rs[j] = r; - break done; - } - } finally { - cellsBusy = 0; - } - continue; // Slot is now non-empty - } - } - collide = false; - } - else if (!wasUncontended) // CAS already known to fail - wasUncontended = true; // Continue after rehash - else if (a.cas(v = a.value, - (fn == null) ? v + x : fn.applyAsLong(v, x))) - break; - else if (n >= NCPU || cells != as) - collide = false; // At max size or stale - else if (!collide) - collide = true; - else if (cellsBusy == 0 && casCellsBusy()) { - try { - if (cells == as) // Expand table unless stale - cells = Arrays.copyOf(as, n << 1); - } finally { - cellsBusy = 0; - } - collide = false; - continue; // Retry with expanded table - } - h = advanceProbe(h); - } - else if (cellsBusy == 0 && cells == as && casCellsBusy()) { - try { // Initialize table - if (cells == as) { - Cell[] rs = new Cell[2]; - rs[h & 1] = new Cell(x); - cells = rs; - break done; - } - } finally { - cellsBusy = 0; - } - } - // Fall back on using base - else if (casBase(v = base, - (fn == null) ? v + x : fn.applyAsLong(v, x))) - break done; - } - } - - private static long apply(DoubleBinaryOperator fn, long v, double x) { - double d = Double.longBitsToDouble(v); - d = (fn == null) ? d + x : fn.applyAsDouble(d, x); - return Double.doubleToRawLongBits(d); - } - - /** - * Same as longAccumulate, but injecting long/double conversions - * in too many places to sensibly merge with long version, given - * the low-overhead requirements of this class. So must instead be - * maintained by copy/paste/adapt. - */ - final void doubleAccumulate(double x, DoubleBinaryOperator fn, - boolean wasUncontended) { - int h; - if ((h = getProbe()) == 0) { - ThreadLocalRandom.current(); // force initialization - h = getProbe(); - wasUncontended = true; - } - boolean collide = false; // True if last slot nonempty - done: for (;;) { - Cell[] as; Cell a; int n; long v; - if ((as = cells) != null && (n = as.length) > 0) { - if ((a = as[(n - 1) & h]) == null) { - if (cellsBusy == 0) { // Try to attach new Cell - Cell r = new Cell(Double.doubleToRawLongBits(x)); - if (cellsBusy == 0 && casCellsBusy()) { - try { // Recheck under lock - Cell[] rs; int m, j; - if ((rs = cells) != null && - (m = rs.length) > 0 && - rs[j = (m - 1) & h] == null) { - rs[j] = r; - break done; - } - } finally { - cellsBusy = 0; - } - continue; // Slot is now non-empty - } - } - collide = false; - } - else if (!wasUncontended) // CAS already known to fail - wasUncontended = true; // Continue after rehash - else if (a.cas(v = a.value, apply(fn, v, x))) - break; - else if (n >= NCPU || cells != as) - collide = false; // At max size or stale - else if (!collide) - collide = true; - else if (cellsBusy == 0 && casCellsBusy()) { - try { - if (cells == as) // Expand table unless stale - cells = Arrays.copyOf(as, n << 1); - } finally { - cellsBusy = 0; - } - collide = false; - continue; // Retry with expanded table - } - h = advanceProbe(h); - } - else if (cellsBusy == 0 && cells == as && casCellsBusy()) { - try { // Initialize table - if (cells == as) { - Cell[] rs = new Cell[2]; - rs[h & 1] = new Cell(Double.doubleToRawLongBits(x)); - cells = rs; - break done; - } - } finally { - cellsBusy = 0; - } - } - // Fall back on using base - else if (casBase(v = base, apply(fn, v, x))) - break done; - } - } - - // Unsafe mechanics - private static final sun.misc.Unsafe U; - private static final long BASE; - private static final long CELLSBUSY; - private static final long PROBE; - static { - try { - U = getUnsafe(); - BASE = U.objectFieldOffset - (Striped64.class.getDeclaredField("base")); - CELLSBUSY = U.objectFieldOffset - (Striped64.class.getDeclaredField("cellsBusy")); - - PROBE = U.objectFieldOffset - (Thread.class.getDeclaredField("threadLocalRandomProbe")); - } catch (ReflectiveOperationException e) { - throw new Error(e); - } - } - - - /** - * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. - * Replace with a simple call to Unsafe.getUnsafe when integrating - * into a jdk. - * - * @return a sun.misc.Unsafe - */ - private static sun.misc.Unsafe getUnsafe() { - try { - return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException tryReflectionInstead) {} - try { - return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - Class k = sun.misc.Unsafe.class; - for (java.lang.reflect.Field f : k.getDeclaredFields()) { - f.setAccessible(true); - Object x = f.get(null); - if (k.isInstance(x)) - return k.cast(x); - } - throw new NoSuchFieldError("the Unsafe"); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } - } - -} \ No newline at end of file diff --git a/core/kamon-core/src/main/scala/kamon/metric/Counter.scala b/core/kamon-core/src/main/scala/kamon/metric/Counter.scala index 1cb7d1887..d9c41d58e 100644 --- a/core/kamon-core/src/main/scala/kamon/metric/Counter.scala +++ b/core/kamon-core/src/main/scala/kamon/metric/Counter.scala @@ -81,7 +81,7 @@ object Counter { extends Counter with Instrument.Snapshotting[Long] with BaseMetricAutoUpdate[Counter, Metric.Settings.ForValueInstrument, Long] { - private val _adder = new kamon.jsr166.LongAdder() + private val _adder = new java.util.concurrent.atomic.LongAdder() override def increment(): Counter = { _adder.increment() @@ -99,7 +99,7 @@ object Counter { override def snapshot(resetState: Boolean): Long = if (resetState) - _adder.sumAndReset() + _adder.sumThenReset() else _adder.sum() diff --git a/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala b/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala index 97eae1caa..26b548e6c 100644 --- a/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala +++ b/core/kamon-core/src/main/scala/kamon/trace/AdaptiveSampler.scala @@ -20,7 +20,7 @@ package trace import java.util.concurrent.ThreadLocalRandom import com.typesafe.config.Config -import kamon.jsr166.LongAdder +import java.util.concurrent.atomic.LongAdder import kamon.trace.AdaptiveSampler.{Allocation, OperationSampler, Settings} import kamon.trace.Trace.SamplingDecision import kamon.util.EWMA @@ -363,7 +363,7 @@ object AdaptiveSampler { * special logic is used to smooth the process during startup of each individual operation. */ private def decisionHistory(): Long = { - val decisions = _decisions.sumAndReset() + val decisions = _decisions.sumThenReset() _decisionsPerTickPos = if (_decisionsPerTickPos == (_decisionsHistorySize - 1)) 0 else _decisionsPerTickPos + 1 _decisionsPerTick.update(_decisionsPerTickPos, decisions) _tickCount += 1 diff --git a/instrumentation/kamon-akka/src/akka-2.5/scala/kamon/instrumentation/akka/instrumentations/akka_25/DispatcherInstrumentation.scala b/instrumentation/kamon-akka/src/akka-2.5/scala/kamon/instrumentation/akka/instrumentations/akka_25/DispatcherInstrumentation.scala index ac73a7364..cca0f2898 100644 --- a/instrumentation/kamon-akka/src/akka-2.5/scala/kamon/instrumentation/akka/instrumentations/akka_25/DispatcherInstrumentation.scala +++ b/instrumentation/kamon-akka/src/akka-2.5/scala/kamon/instrumentation/akka/instrumentations/akka_25/DispatcherInstrumentation.scala @@ -117,13 +117,12 @@ object InstrumentNewExecutorServiceOnAkka24 { def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = { val executor = callable.call() - val actorSystemName = factory.dispatcherPrerequisites.settings.name val dispatcherName = factory.dispatcherName - val scheduledActionName = actorSystemName + "/" + dispatcherName - val systemTags = TagSet.of("akka.system", actorSystemName) - if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) { + val actorSystemName = factory.dispatcherPrerequisites.settings.name + val scheduledActionName = actorSystemName + "/" + dispatcherName + val systemTags = TagSet.of("akka.system", actorSystemName) val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) { @@ -142,12 +141,12 @@ object InstrumentNewExecutorServiceOnAkka25 { def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = { val executor = callable.call() - val actorSystemName = factory.dispatcherPrerequisites.settings.name val dispatcherName = factory.dispatcherName - val scheduledActionName = actorSystemName + "/" + dispatcherName - val systemTags = TagSet.of("akka.system", actorSystemName) if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) { + val actorSystemName = factory.dispatcherPrerequisites.settings.name + val scheduledActionName = actorSystemName + "/" + dispatcherName + val systemTags = TagSet.of("akka.system", actorSystemName) val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) { diff --git a/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/DispatcherInstrumentation.scala b/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/DispatcherInstrumentation.scala index 1c2166847..41d3f09c8 100644 --- a/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/DispatcherInstrumentation.scala +++ b/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/DispatcherInstrumentation.scala @@ -109,12 +109,12 @@ object InstrumentNewExecutorServiceOnAkka26 { @static def around(@This factory: HasDispatcherPrerequisites with HasDispatcherName, @SuperCall callable: Callable[ExecutorService]): ExecutorService = { val executor = callable.call() - val actorSystemName = factory.dispatcherPrerequisites.settings.name val dispatcherName = factory.dispatcherName - val scheduledActionName = actorSystemName + "/" + dispatcherName - val systemTags = TagSet.of("akka.system", actorSystemName) if(Kamon.filter(AkkaInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) { + val actorSystemName = factory.dispatcherPrerequisites.settings.name + val scheduledActionName = actorSystemName + "/" + dispatcherName + val systemTags = TagSet.of("akka.system", actorSystemName) val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext if(dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) { diff --git a/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/remote/internal/AkkaPduProtobufCodecDecodeMessageMethodAdvisor.scala b/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/remote/internal/AkkaPduProtobufCodecDecodeMessageMethodAdvisor.scala index 06a8ba93b..217102c21 100644 --- a/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/remote/internal/AkkaPduProtobufCodecDecodeMessageMethodAdvisor.scala +++ b/instrumentation/kamon-akka/src/akka-2.6/scala/kamon/instrumentation/akka/instrumentations/akka_26/remote/internal/AkkaPduProtobufCodecDecodeMessageMethodAdvisor.scala @@ -20,7 +20,7 @@ object AkkaPduProtobufCodecDecodeMessage { @OnMethodEnter @static def enter(@Argument(0) bs: ByteString, @Argument(1) provider: RemoteActorRefProvider, @Argument(2) localAddress: Address): Unit = { - val ackAndEnvelope = AckAndContextAwareEnvelopeContainer.parseFrom(bs.toArray) + val ackAndEnvelope = AckAndContextAwareEnvelopeContainer.parseFrom(bs.toArrayUnsafe()) if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) { val remoteCtx = ackAndEnvelope.getEnvelope.getTraceContext diff --git a/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala index 9ddf9c379..613a64ef8 100644 --- a/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala +++ b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala @@ -91,7 +91,8 @@ class MessageSpec } override val container: MockServerContainer = MockServerContainer() - lazy val clientExpectation: MockServerExpectations = new MockServerExpectations("localhost", container.serverPort) + lazy val clientExpectation: MockServerExpectations = + new MockServerExpectations(container.container.getHost, container.serverPort) override protected def beforeAll(): Unit = { super.beforeAll() diff --git a/instrumentation/kamon-apache-httpclient/src/test/scala/kamon/instrumentation/apache/httpclient/HttpRequestSpec.scala b/instrumentation/kamon-apache-httpclient/src/test/scala/kamon/instrumentation/apache/httpclient/HttpRequestSpec.scala index 729d65e58..3a221deb6 100644 --- a/instrumentation/kamon-apache-httpclient/src/test/scala/kamon/instrumentation/apache/httpclient/HttpRequestSpec.scala +++ b/instrumentation/kamon-apache-httpclient/src/test/scala/kamon/instrumentation/apache/httpclient/HttpRequestSpec.scala @@ -161,7 +161,8 @@ class HttpRequestSpec } override val container: MockServerContainer = MockServerContainer() - lazy val clientExpectation: MockServerExpectations = new MockServerExpectations("localhost", container.serverPort) + lazy val clientExpectation: MockServerExpectations = + new MockServerExpectations(container.container.getHost, container.serverPort) override protected def beforeAll(): Unit = { super.beforeAll() diff --git a/instrumentation/kamon-apache-httpclient/src/test/scala/kamon/instrumentation/apache/httpclient/HttpUriRequestSpec.scala b/instrumentation/kamon-apache-httpclient/src/test/scala/kamon/instrumentation/apache/httpclient/HttpUriRequestSpec.scala index 1eadbed25..a0daa250d 100644 --- a/instrumentation/kamon-apache-httpclient/src/test/scala/kamon/instrumentation/apache/httpclient/HttpUriRequestSpec.scala +++ b/instrumentation/kamon-apache-httpclient/src/test/scala/kamon/instrumentation/apache/httpclient/HttpUriRequestSpec.scala @@ -156,7 +156,8 @@ class HttpUriRequestSpec } override val container: MockServerContainer = MockServerContainer() - lazy val clientExpectation: MockServerExpectations = new MockServerExpectations("localhost", container.serverPort) + lazy val clientExpectation: MockServerExpectations = + new MockServerExpectations(container.container.getHost, container.serverPort) override protected def beforeAll(): Unit = { super.beforeAll() diff --git a/instrumentation/kamon-cassandra/src/testCas3/scala/kamon/instrumentation/instrumentation/CassandraClientTracingInstrumentationSpec.scala b/instrumentation/kamon-cassandra/src/testCas3/scala/kamon/instrumentation/instrumentation/CassandraClientTracingInstrumentationSpec.scala index 247ef207e..1c53abb49 100644 --- a/instrumentation/kamon-cassandra/src/testCas3/scala/kamon/instrumentation/instrumentation/CassandraClientTracingInstrumentationSpec.scala +++ b/instrumentation/kamon-cassandra/src/testCas3/scala/kamon/instrumentation/instrumentation/CassandraClientTracingInstrumentationSpec.scala @@ -77,24 +77,6 @@ class CassandraClientTracingInstrumentationSpec } } - "not swallow exceptions" in { - val query = QueryBuilder - .select("name") - .from("illegaltablename") - .where(QueryBuilder.eq("name", "kamon")) - .allowFiltering() - .setFetchSize(5) - - assertThrows[DriverException] { - session.execute(query) - } - - eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan() - span should not be empty - } - } - "trace individual page executions" in { val query = QueryBuilder .select("name") @@ -118,6 +100,24 @@ class CassandraClientTracingInstrumentationSpec clientSpan.get.tags.get(plainBoolean("cassandra.driver.rs.has-more")) shouldBe true } } + + "not swallow exceptions" in { + val query = QueryBuilder + .select("name") + .from("illegaltablename") + .where(QueryBuilder.eq("name", "kamon")) + .allowFiltering() + .setFetchSize(5) + + assertThrows[DriverException] { + session.execute(query) + } + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan() + span should not be empty + } + } } var session: Session = _ diff --git a/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala b/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala index 82e80ecc7..5a1a45eb8 100644 --- a/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala +++ b/instrumentation/kamon-executors/src/main/scala/kamon/instrumentation/executor/ExecutorInstrumentation.scala @@ -29,9 +29,9 @@ import java.util.concurrent.{ TimeUnit, ForkJoinPool => JavaForkJoinPool } +import java.util.concurrent.atomic.LongAdder import com.typesafe.config.Config import kamon.Kamon -import kamon.jsr166.LongAdder import kamon.metric.Counter import kamon.module.ScheduledAction import kamon.tag.TagSet diff --git a/instrumentation/kamon-pekko-connectors-kafka/src/main/resources/reference.conf b/instrumentation/kamon-pekko-connectors-kafka/src/main/resources/reference.conf new file mode 100644 index 000000000..ff30765f8 --- /dev/null +++ b/instrumentation/kamon-pekko-connectors-kafka/src/main/resources/reference.conf @@ -0,0 +1,22 @@ +# ==================================================== # +# Kamon Pekko Connectors Kafka Reference Configuration # +# ==================================================== # + +kanela { + modules { + pekko-connectors-kafka { + + name = "Apache Pekko Connectors Kafka Instrumentation" + description = "PREVIEW. Provides context propagation for Apache Pekko Connectors Kafka applications" + instrumentations = [ + "kamon.instrumentation.pekko.connectors.kafka.ProducerMessageInstrumentation" + ] + + within = [ + "org.apache.pekko.kafka.ProducerMessage\\$Message", + "org.apache.pekko.kafka.ProducerMessage\\$MultiMessage", + "org.apache.pekko.kafka.internal.DefaultProducerStageLogic" + ] + } + } +} diff --git a/instrumentation/kamon-pekko-connectors-kafka/src/main/scala/kamon/instrumentation/pekko/connectors/kafka/ProducerMessageInstrumentation.scala b/instrumentation/kamon-pekko-connectors-kafka/src/main/scala/kamon/instrumentation/pekko/connectors/kafka/ProducerMessageInstrumentation.scala new file mode 100644 index 000000000..a406d728c --- /dev/null +++ b/instrumentation/kamon-pekko-connectors-kafka/src/main/scala/kamon/instrumentation/pekko/connectors/kafka/ProducerMessageInstrumentation.scala @@ -0,0 +1,59 @@ +/* + * ========================================================================================== + * Copyright © 2013-2022 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================== + */ + +package kamon +package instrumentation +package pekko +package connectors +package kafka + +import kamon.Kamon +import kamon.context.Storage +import kamon.context.Storage.Scope +import kamon.instrumentation.context.HasContext +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.asm.Advice + +class ProducerMessageInstrumentation extends InstrumentationBuilder { + + /** + * Captures the current context the a Message or MultiMessage is created and restores it while + * the ProducerLogic is running, so the proper context gets propagated to the Kafka Producer. + */ + onTypes("org.apache.pekko.kafka.ProducerMessage$Message", "org.apache.pekko.kafka.ProducerMessage$MultiMessage") + .mixin(classOf[HasContext.MixinWithInitializer]) + + onTypes( + "org.apache.pekko.kafka.internal.DefaultProducerStageLogic", + "org.apache.pekko.kafka.internal.CommittingProducerSinkStageLogic" + ) + .advise(method("produce"), ProduceWithEnvelopeContext) +} + +object ProduceWithEnvelopeContext { + + @Advice.OnMethodEnter + def enter(@Advice.Argument(0) envelope: Any): Storage.Scope = { + envelope match { + case hasContext: HasContext => Kamon.storeContext(hasContext.context) + case _ => Scope.Empty + } + } + + @Advice.OnMethodExit(onThrowable = classOf[Throwable]) + def exit(@Advice.Enter scope: Storage.Scope): Unit = + scope.close() +} diff --git a/instrumentation/kamon-pekko-http/src/main/scala/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala b/instrumentation/kamon-pekko-http/src/main/scala/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala index 473184aa2..89606e73f 100644 --- a/instrumentation/kamon-pekko-http/src/main/scala/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala +++ b/instrumentation/kamon-pekko-http/src/main/scala/kamon/instrumentation/pekko/http/PekkoHttpServerInstrumentation.scala @@ -55,9 +55,10 @@ class PekkoHttpServerInstrumentation extends InstrumentationBuilder { * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free * of variables) and take a Sampling Decision in case none has been taken so far. */ - onType("org.apache.pekko.http.scaladsl.server.RequestContextImpl") - .mixin(classOf[HasMatchingContext.Mixin]) - .intercept(method("copy"), RequestContextCopyInterceptor) + Try(Class.forName("org.apache.pekko.http.scaladsl.server.RequestContext")).toOption + .foldLeft( + onType("org.apache.pekko.http.scaladsl.server.RequestContextImpl").mixin(classOf[HasMatchingContext.Mixin]) + )((advice, klass) => advice.intercept(withReturnTypes(klass), RequestContextCopyInterceptor)) onType("org.apache.pekko.http.scaladsl.server.directives.PathDirectives") .intercept(method("rawPathPrefix"), classOf[PathDirectivesRawPathPrefixInterceptor]) @@ -262,7 +263,7 @@ object RequestContextCopyInterceptor { @RuntimeType def copy(@This context: RequestContext, @SuperCall copyCall: Callable[RequestContext]): RequestContext = { val copiedRequestContext = copyCall.call() - copiedRequestContext.asInstanceOf[HasMatchingContext].setMatchingContext( + if (copiedRequestContext ne context) copiedRequestContext.asInstanceOf[HasMatchingContext].setMatchingContext( context.asInstanceOf[HasMatchingContext].matchingContext ) copiedRequestContext diff --git a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/instrumentations/DispatcherInstrumentation.scala b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/instrumentations/DispatcherInstrumentation.scala index 31eee18c5..4607473e3 100644 --- a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/instrumentations/DispatcherInstrumentation.scala +++ b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/instrumentations/DispatcherInstrumentation.scala @@ -127,13 +127,18 @@ object InstrumentNewExecutorServiceOnPekko { @SuperCall callable: Callable[ExecutorService] ): ExecutorService = { val executor = callable.call() - val actorSystemName = factory.dispatcherPrerequisites.settings.name val dispatcherName = factory.dispatcherName - val scheduledActionName = actorSystemName + "/" + dispatcherName - val systemTags = TagSet.of("pekko.system", actorSystemName) if (Kamon.filter(PekkoInstrumentation.TrackDispatcherFilterName).accept(dispatcherName)) { - val defaultEcOption = factory.dispatcherPrerequisites.defaultExecutionContext + val actorSystemName = if (factory.dispatcherPrerequisites != null) { + factory.dispatcherPrerequisites.settings.name + } else { + "unknown" + } + val scheduledActionName = actorSystemName + "/" + dispatcherName + val systemTags = TagSet.of("pekko.system", actorSystemName) + val defaultEcOption = Option(factory.dispatcherPrerequisites) + .flatMap(_.defaultExecutionContext) if (dispatcherName == Dispatchers.DefaultDispatcherId && defaultEcOption.isDefined) { ExecutorInstrumentation.instrumentExecutionContext( diff --git a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/remote/internal/PekkoPduProtobufCodecDecodeMessageMethodAdvisor.scala b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/remote/internal/PekkoPduProtobufCodecDecodeMessageMethodAdvisor.scala index 10f28b66c..232c88ce7 100644 --- a/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/remote/internal/PekkoPduProtobufCodecDecodeMessageMethodAdvisor.scala +++ b/instrumentation/kamon-pekko/src/main/scala/kamon/instrumentation/pekko/remote/internal/PekkoPduProtobufCodecDecodeMessageMethodAdvisor.scala @@ -24,7 +24,7 @@ object PekkoPduProtobufCodecDecodeMessage { @Argument(1) provider: RemoteActorRefProvider, @Argument(2) localAddress: Address ): Unit = { - val ackAndEnvelope = AckAndContextAwareEnvelopeContainer.parseFrom(bs.toArray) + val ackAndEnvelope = AckAndContextAwareEnvelopeContainer.parseFrom(bs.toArrayUnsafe()) if (ackAndEnvelope.hasEnvelope && ackAndEnvelope.getEnvelope.hasTraceContext) { val remoteCtx = ackAndEnvelope.getEnvelope.getTraceContext diff --git a/reporters/kamon-datadog/src/main/resources/reference.conf b/reporters/kamon-datadog/src/main/resources/reference.conf index 0b2f2ccff..c61d2fa5b 100644 --- a/reporters/kamon-datadog/src/main/resources/reference.conf +++ b/reporters/kamon-datadog/src/main/resources/reference.conf @@ -40,6 +40,14 @@ kamon { connect-timeout = 5 seconds read-timeout = 5 seconds write-timeout = 5 seconds + + # Try this number of times to submit metrics to the Datadog API. + # Only in case of HTTP response status of 408, 429, 502, 503 or 504 is the request attempted again. + # A `0` value disables retries. + retries = 3 + + # The initial retry delay that gets exponentially increased after each retry attempt. + init-retry-delay = 500 milliseconds } # @@ -65,8 +73,23 @@ kamon { # Use 'Deflate' compression when posting to the Datadog API compression = false + + # Try this number of times to submit metrics to the Datadog API. + # Only in case of HTTP response status of 408, 429, 502, 503 or 504 is the request attempted again. + # A `0` value disables retries. + retries = 3 + + # The initial retry delay that gets exponentially increased after each retry attempt. + init-retry-delay = 500 milliseconds } + # The log level in which to log failures to submit metrics. + failure-log-level = "error" + + # For histograms, which percentiles to submit. + # Each value configured here will correspond to a different custom metric submitted to Datadog. + # Currently only applicable to the API reporter. + percentiles = [95.0] # All time values are collected in nanoseconds, # to scale before sending to datadog set "time-units" to "s" or "ms" or "µs". diff --git a/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogAPIReporter.scala b/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogAPIReporter.scala index 4c07b0899..598ed6cd8 100644 --- a/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogAPIReporter.scala +++ b/reporters/kamon-datadog/src/main/scala/kamon/datadog/DatadogAPIReporter.scala @@ -21,17 +21,17 @@ import java.nio.charset.StandardCharsets import java.text.{DecimalFormat, DecimalFormatSymbols} import java.time.Duration import java.util.Locale - import com.typesafe.config.Config import kamon.metric.MeasurementUnit.Dimension.{Information, Time} import kamon.metric.{MeasurementUnit, MetricSnapshot, PeriodSnapshot} import kamon.tag.{Tag, TagSet} import kamon.util.{EnvironmentTags, Filter} -import kamon.{module, Kamon} +import kamon.{Kamon, module} import kamon.datadog.DatadogAPIReporter.Configuration import kamon.module.{MetricReporter, ModuleFactory} import org.slf4j.LoggerFactory - +import org.slf4j.event.Level +import scala.collection.JavaConverters._ import scala.util.{Failure, Success} class DatadogAPIReporterFactory extends ModuleFactory { @@ -68,7 +68,7 @@ class DatadogAPIReporter( override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = { httpClient.doPost("application/json; charset=utf-8", buildRequestBody(snapshot)) match { case Failure(e) => - logger.error(e.getMessage) + logger.logAtLevel(configuration.failureLogLevel, e.getMessage) case Success(response) => logger.trace(response) } @@ -81,6 +81,12 @@ class DatadogAPIReporter( val interval = Math.round(Duration.between(snapshot.from, snapshot.to).toMillis() / 1000d) val seriesBuilder = new StringBuilder() + @inline + def doubleToPercentileString(double: Double) = { + if (double == double.toLong) f"${double.toLong}%d" + else f"$double%s" + } + def addDistribution(metric: MetricSnapshot.Distributions): Unit = { val unit = metric.settings.unit metric.instruments.foreach { d => @@ -90,12 +96,14 @@ class DatadogAPIReporter( addMetric(metric.name + ".avg", valueFormat.format(scale(average, unit)), gauge, d.tags) addMetric(metric.name + ".count", valueFormat.format(dist.count), count, d.tags) addMetric(metric.name + ".median", valueFormat.format(scale(dist.percentile(50d).value, unit)), gauge, d.tags) - addMetric( - metric.name + ".95percentile", - valueFormat.format(scale(dist.percentile(95d).value, unit)), - gauge, - d.tags - ) + configuration.percentiles.foreach { p => + addMetric( + metric.name + s".${doubleToPercentileString(p)}percentile", + valueFormat.format(scale(dist.percentile(p).value, unit)), + gauge, + d.tags + ) + } addMetric(metric.name + ".max", valueFormat.format(scale(dist.max, unit)), gauge, d.tags) addMetric(metric.name + ".min", valueFormat.format(scale(dist.min, unit)), gauge, d.tags) } @@ -163,10 +171,12 @@ private object DatadogAPIReporter { case class Configuration( httpConfig: Config, + percentiles: Set[Double], timeUnit: MeasurementUnit, informationUnit: MeasurementUnit, extraTags: Seq[(String, String)], - tagFilter: Filter + tagFilter: Filter, + failureLogLevel: Level ) implicit class QuoteInterp(val sc: StringContext) extends AnyVal { @@ -175,15 +185,23 @@ private object DatadogAPIReporter { def readConfiguration(config: Config): Configuration = { val datadogConfig = config.getConfig("kamon.datadog") + + // Remove the "host" tag since it gets added to the datadog payload separately + val extraTags = EnvironmentTags + .from(Kamon.environment, datadogConfig.getConfig("environment-tags")) + .without("host") + .all() + .map(p => p.key -> Tag.unwrapValue(p).toString) + Configuration( datadogConfig.getConfig("api"), + percentiles = datadogConfig.getDoubleList("percentiles").asScala.toList.map(_.toDouble).toSet, timeUnit = readTimeUnit(datadogConfig.getString("time-unit")), informationUnit = readInformationUnit(datadogConfig.getString("information-unit")), // Remove the "host" tag since it gets added to the datadog payload separately - EnvironmentTags.from(Kamon.environment, datadogConfig.getConfig("environment-tags")).without("host").all().map( - p => p.key -> Tag.unwrapValue(p).toString - ), - Kamon.filter("kamon.datadog.environment-tags.filter") + extraTags = extraTags, + tagFilter = Kamon.filter("kamon.datadog.environment-tags.filter"), + failureLogLevel = readLogLevel(datadogConfig.getString("failure-log-level")) ) } } diff --git a/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala b/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala index 9f1c2ca7a..5b2289c7b 100644 --- a/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala +++ b/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala @@ -19,12 +19,14 @@ package kamon import java.nio.charset.StandardCharsets import java.time.{Duration, Instant} import java.util.concurrent.TimeUnit - import com.typesafe.config.Config import kamon.metric.MeasurementUnit import kamon.metric.MeasurementUnit.{information, time} import okhttp3._ +import org.slf4j.Logger +import org.slf4j.event.Level +import scala.annotation.tailrec import scala.util.{Failure, Success, Try} package object datadog { @@ -36,6 +38,23 @@ package object datadog { } } + implicit class LoggerExtras(val logger: Logger) extends AnyVal { + def logAtLevel(level: Level, msg: String): Unit = { + level match { + case Level.TRACE => + logger.trace(msg) + case Level.DEBUG => + logger.debug(msg) + case Level.INFO => + logger.info(msg) + case Level.WARN => + logger.warn(msg) + case Level.ERROR => + logger.error(msg) + } + } + } + private[datadog] case class HttpClient( apiUrl: String, apiKey: Option[String], @@ -43,10 +62,13 @@ package object datadog { usingAgent: Boolean, connectTimeout: Duration, readTimeout: Duration, - writeTimeout: Duration + writeTimeout: Duration, + retries: Int, + initRetryDelay: Duration ) { - val httpClient: OkHttpClient = createHttpClient() + private val httpClient: OkHttpClient = createHttpClient() + private val retryableStatusCodes: Set[Int] = Set(408, 429, 502, 503, 504) def this(config: Config, usingAgent: Boolean) = { this( @@ -56,12 +78,33 @@ package object datadog { usingAgent, config.getDuration("connect-timeout"), config.getDuration("read-timeout"), - config.getDuration("write-timeout") + config.getDuration("write-timeout"), + config.getInt("retries"), + config.getDuration("init-retry-delay") ) } - private def doRequest(request: Request): Try[Response] = { - Try(httpClient.newCall(request).execute()) + @tailrec + private def doRequestWithRetries(request: Request, attempt: Int = 0): Try[Response] = { + // Try executing the request + val responseAttempt = Try(httpClient.newCall(request).execute()) + + if (attempt >= retries - 1) { + responseAttempt + } else { + responseAttempt match { + // If the request succeeded but with a retryable HTTP status code. + case Success(response) if retryableStatusCodes.contains(response.code) => + response.close() + Thread.sleep(initRetryDelay.toMillis * Math.pow(2, attempt).toLong) + doRequestWithRetries(request, attempt + 1) + + // Either the request succeeded with an HTTP status not included in `retryableStatusCodes` + // or we have an unknown failure + case _ => + responseAttempt + } + } } def doMethodWithBody(method: String, contentType: String, contentBody: Array[Byte]): Try[String] = { @@ -69,7 +112,7 @@ package object datadog { val url = apiUrl + apiKey.map(key => "?api_key=" + key).getOrElse("") val request = new Request.Builder().url(url).method(method, body).build - doRequest(request) match { + doRequestWithRetries(request) match { case Success(response) => val responseBody = response.body().string() response.close() @@ -133,4 +176,14 @@ package object datadog { case "gb" => information.gigabytes case other => sys.error(s"Invalid time unit setting [$other], the possible values are [b, kb, mb, gb]") } + + def readLogLevel(level: String): Level = level match { + case "trace" => Level.TRACE + case "debug" => Level.DEBUG + case "info" => Level.INFO + case "warn" => Level.WARN + case "error" => Level.ERROR + case other => + sys.error(s"Invalid log level setting [$other], the possible values are [trace, debug, info, warn, error]") + } } diff --git a/reporters/kamon-datadog/src/test/scala/kamon/datadog/AbstractHttpReporter.scala b/reporters/kamon-datadog/src/test/scala/kamon/datadog/AbstractHttpReporter.scala index 8905a86fc..b5741e6df 100644 --- a/reporters/kamon-datadog/src/test/scala/kamon/datadog/AbstractHttpReporter.scala +++ b/reporters/kamon-datadog/src/test/scala/kamon/datadog/AbstractHttpReporter.scala @@ -22,6 +22,10 @@ abstract class AbstractHttpReporter extends AnyWordSpec with BeforeAndAfterAll { server.url(path).toString } + protected def mockResponse(response: MockResponse): Unit = { + server.enqueue(response) + } + override protected def afterAll(): Unit = { super.afterAll() server.shutdown() diff --git a/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala b/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala index 8676bb6c8..39ffbd20f 100644 --- a/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala +++ b/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala @@ -18,9 +18,89 @@ import scala.concurrent.ExecutionContext class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Reconfigure { "the DatadogAPIReporter" should { + val reporter = new DatadogAPIReporterFactory().create(ModuleFactory.Settings(Kamon.config(), ExecutionContext.global)) val now = Instant.ofEpochMilli(1523395554) + val examplePeriod = PeriodSnapshot.apply( + now.minusMillis(1000), + now, + MetricSnapshot.ofValues[Long]( + "test.counter", + "test", + Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO), + Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil + ) :: Nil, + Nil, + Nil, + Nil, + Nil + ) + + "handle retries on retriable HTTP status codes" in { + val baseUrl = mockResponse("/test", new MockResponse().setResponseCode(429)) + applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"") + applyConfig("kamon.datadog.api.api-key = \"dummy\"") + applyConfig("kamon.datadog.api.compression = false") + applyConfig("kamon.datadog.api.init-retry-delay = 100 milliseconds") + applyConfig("kamon.datadog.api.retries = 0") + reporter.reconfigure(Kamon.config()) + + reporter.reportPeriodSnapshot(examplePeriod) + + server.getRequestCount shouldEqual 1 + server.takeRequest() + + applyConfig("kamon.datadog.api.retries = 3") + reporter.reconfigure(Kamon.config()) + + mockResponse(new MockResponse().setResponseCode(429)) + mockResponse(new MockResponse().setResponseCode(503)) + mockResponse(new MockResponse().setResponseCode(504)) + reporter.reportPeriodSnapshot(examplePeriod) + Thread.sleep(1000) + server.takeRequest() + server.takeRequest() + server.takeRequest() + server.getRequestCount shouldEqual 4 + } + + val examplePeriodWithDistributions: PeriodSnapshot = { + val distributionExample = new Distribution { + override def dynamicRange: DynamicRange = ??? + override def min: Long = 0 + override def max: Long = 10 + override def sum: Long = 100 + override def count: Long = 5 + override def percentile(rank: Double): Distribution.Percentile = new Percentile { + override def rank: Double = 0 + override def value: Long = 0 + override def countAtRank: Long = 0 + } + override def percentiles: Seq[Distribution.Percentile] = ??? + override def percentilesIterator: Iterator[Distribution.Percentile] = ??? + override def buckets: Seq[Distribution.Bucket] = ??? + override def bucketsIterator: Iterator[Distribution.Bucket] = ??? + } + PeriodSnapshot.apply( + now.minusMillis(1000), + now, + Nil, + Nil, + Nil, + MetricSnapshot.ofDistributions( + "test.timer", + "test", + Metric.Settings.ForDistributionInstrument( + MeasurementUnit.none, + java.time.Duration.ZERO, + DynamicRange.Default + ), + Instrument.Snapshot.apply(TagSet.Empty, distributionExample) :: Nil + ) :: Nil, + Nil + ) + } "sends metrics - compressed" in { val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK")) @@ -29,22 +109,7 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec applyConfig("kamon.datadog.api.compression = true") reporter.reconfigure(Kamon.config()) - reporter.reportPeriodSnapshot( - PeriodSnapshot.apply( - now.minusMillis(1000), - now, - MetricSnapshot.ofValues[Long]( - "test.counter", - "test", - Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO), - Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil - ) :: Nil, - Nil, - Nil, - Nil, - Nil - ) - ) + reporter.reportPeriodSnapshot(examplePeriod) val request = server.takeRequest() @@ -64,22 +129,7 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec applyConfig("kamon.datadog.api.compression = false") reporter.reconfigure(Kamon.config()) - reporter.reportPeriodSnapshot( - PeriodSnapshot.apply( - now.minusMillis(1000), - now, - MetricSnapshot.ofValues[Long]( - "test.counter", - "test", - Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO), - Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil - ) :: Nil, - Nil, - Nil, - Nil, - Nil - ) - ) + reporter.reportPeriodSnapshot(examplePeriod) val request = server.takeRequest() request.getRequestUrl.toString shouldEqual baseUrl + "?api_key=dummy" request.getMethod shouldEqual "POST" @@ -90,50 +140,38 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec } - "send timer metrics" in { + "send timer metrics with the p95 percentile by default" in { val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK")) applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"") applyConfig("kamon.datadog.api.api-key = \"dummy\"") applyConfig("kamon.datadog.api.compression = false") reporter.reconfigure(Kamon.config()) - val distribution = new Distribution { - override def dynamicRange: DynamicRange = ??? - override def min: Long = 0 - override def max: Long = 10 - override def sum: Long = 100 - override def count: Long = 5 - override def percentile(rank: Double): Distribution.Percentile = new Percentile { - override def rank: Double = 0 - override def value: Long = 0 - override def countAtRank: Long = 0 - } - override def percentiles: Seq[Distribution.Percentile] = ??? - override def percentilesIterator: Iterator[Distribution.Percentile] = ??? - override def buckets: Seq[Distribution.Bucket] = ??? - override def bucketsIterator: Iterator[Distribution.Bucket] = ??? - } - - reporter.reportPeriodSnapshot( - PeriodSnapshot.apply( - now.minusMillis(1000), - now, - Nil, - Nil, - Nil, - MetricSnapshot.ofDistributions( - "test.timer", - "test", - Metric.Settings.ForDistributionInstrument( - MeasurementUnit.none, - java.time.Duration.ZERO, - DynamicRange.Default - ), - Instrument.Snapshot.apply(TagSet.Empty, distribution) :: Nil - ) :: Nil, - Nil + reporter.reportPeriodSnapshot(examplePeriodWithDistributions) + val request = server.takeRequest() + request.getRequestUrl.toString shouldEqual baseUrl + "?api_key=dummy" + request.getMethod shouldEqual "POST" + Json.parse(request.getBody.readUtf8()) shouldEqual Json + .parse( + """{"series":[ + |{"metric":"test.timer.avg","interval":1,"points":[[1523394,20]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.count","interval":1,"points":[[1523394,5]],"type":"count","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.median","interval":1,"points":[[1523394,0]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.95percentile","interval":1,"points":[[1523394,0]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.max","interval":1,"points":[[1523394,10]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.min","interval":1,"points":[[1523394,0]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}]}""".stripMargin ) - ) + } + + "send timer metrics allowing configuration of percentiles to submit" in { + val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK")) + applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"") + applyConfig("kamon.datadog.api.api-key = \"dummy\"") + applyConfig("kamon.datadog.api.compression = false") + applyConfig("kamon.datadog.percentiles = [95.0, 99, 94.5]") + reporter.reconfigure(Kamon.config()) + + reporter.reportPeriodSnapshot(examplePeriodWithDistributions) val request = server.takeRequest() request.getRequestUrl.toString shouldEqual baseUrl + "?api_key=dummy" request.getMethod shouldEqual "POST" @@ -144,6 +182,31 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec |{"metric":"test.timer.count","interval":1,"points":[[1523394,5]],"type":"count","host":"test","tags":["env:staging","service:kamon-application"]}, |{"metric":"test.timer.median","interval":1,"points":[[1523394,0]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, |{"metric":"test.timer.95percentile","interval":1,"points":[[1523394,0]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.99percentile","interval":1,"points":[[1523394,0]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.94.5percentile","interval":1,"points":[[1523394,0]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.max","interval":1,"points":[[1523394,10]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.min","interval":1,"points":[[1523394,0]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}]}""".stripMargin + ) + } + + "send timer metrics without percentiles" in { + val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK")) + applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"") + applyConfig("kamon.datadog.api.api-key = \"dummy\"") + applyConfig("kamon.datadog.api.compression = false") + applyConfig("kamon.datadog.percentiles = []") + reporter.reconfigure(Kamon.config()) + + reporter.reportPeriodSnapshot(examplePeriodWithDistributions) + val request = server.takeRequest() + request.getRequestUrl.toString shouldEqual baseUrl + "?api_key=dummy" + request.getMethod shouldEqual "POST" + Json.parse(request.getBody.readUtf8()) shouldEqual Json + .parse( + """{"series":[ + |{"metric":"test.timer.avg","interval":1,"points":[[1523394,20]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.count","interval":1,"points":[[1523394,5]],"type":"count","host":"test","tags":["env:staging","service:kamon-application"]}, + |{"metric":"test.timer.median","interval":1,"points":[[1523394,0]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, |{"metric":"test.timer.max","interval":1,"points":[[1523394,10]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}, |{"metric":"test.timer.min","interval":1,"points":[[1523394,0]],"type":"gauge","host":"test","tags":["env:staging","service:kamon-application"]}]}""".stripMargin )