Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring event logs #821

Merged
merged 40 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
cfd675e
WIP http.idl
attilakreiner Feb 28, 2024
b49ded8
WIP kafka.idl
attilakreiner Feb 28, 2024
1c31b3a
WIP tcp.idl
attilakreiner Feb 28, 2024
b7955f1
WIP tls.idl
attilakreiner Feb 28, 2024
68190b3
WIP jwt.idl
attilakreiner Feb 28, 2024
be0a8ce
WIP sch_reg.idl
attilakreiner Feb 28, 2024
924a660
fix http originId
attilakreiner Feb 29, 2024
1d648ef
WIP HttpEventFormatter
attilakreiner Feb 29, 2024
aac1a96
WIP KafkaEventFormatter
attilakreiner Feb 29, 2024
c374119
WIP TcpEventFormatter
attilakreiner Feb 29, 2024
d12a0f7
WIP TlsEventFormatter
attilakreiner Feb 29, 2024
6cc773e
WIP JwtEventFormatter
attilakreiner Feb 29, 2024
77aa0ac
WIP SchRegEventFormatter
attilakreiner Feb 29, 2024
2aea525
fix http routedId
attilakreiner Mar 1, 2024
5523659
fix kafka identity
attilakreiner Mar 1, 2024
d6a126a
WIP EventFormatterSpi
attilakreiner Mar 1, 2024
e622e8b
WIP EventFormatter
attilakreiner Mar 1, 2024
30768c4
WIP EventFormatterFactorySpi
attilakreiner Mar 1, 2024
d97fecb
cleanup
attilakreiner Mar 4, 2024
49229a6
WIP testing
attilakreiner Mar 4, 2024
c72f6c5
fix engine test
attilakreiner Mar 4, 2024
36e5fe6
WIP http EventIT
attilakreiner Mar 4, 2024
d2e092c
fix engine
attilakreiner Mar 4, 2024
76c134d
WIP kafka EventIT
attilakreiner Mar 4, 2024
d4c9f2d
rm HttpEventFormatterTest
attilakreiner Mar 4, 2024
fcf8987
rm KafkaEventFormatterTest
attilakreiner Mar 4, 2024
0c30d4f
WIP tcp EventIT
attilakreiner Mar 4, 2024
2269d2c
WIP tls EventIT
attilakreiner Mar 4, 2024
ecdb38e
fix
attilakreiner Mar 4, 2024
d6b4ea2
WIP stdexp EventIT
attilakreiner Mar 4, 2024
2bd852e
fix stdexp cleanup
attilakreiner Mar 5, 2024
4ded5e6
WIP schreg EventIT
attilakreiner Mar 5, 2024
fd97e79
WIP jwt EventIT
attilakreiner Mar 5, 2024
ea473a8
fix engine.spec
attilakreiner Mar 5, 2024
548d48e
WIP tls IT
attilakreiner Mar 5, 2024
eed324c
cleanup
attilakreiner Mar 5, 2024
b2d3126
fix authorization
attilakreiner Mar 5, 2024
20e94b1
fix tls
attilakreiner Mar 5, 2024
a498bf1
fix stdexp
attilakreiner Mar 5, 2024
c9828f0
Merge branch 'develop' into event-ref
attilakreiner Mar 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ scope schema_registry
REMOTE_ACCESS_REJECTED (1)
}

struct SchemaRegistryRemoteAccessRejected extends core::event::Event
struct SchemaRegistryRemoteAccessRejectedEx extends core::stream::Extension
{
string8 method;
string16 url;
int16 status;
}

union SchemaRegistryEvent switch (SchemaRegistryEventType)
union SchemaRegistryEventEx switch (SchemaRegistryEventType)
{
case REMOTE_ACCESS_REJECTED: SchemaRegistryRemoteAccessRejected remoteAccessRejected;
case REMOTE_ACCESS_REJECTED: SchemaRegistryRemoteAccessRejectedEx remoteAccessRejected;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,22 @@
name: test
telemetry:
exporters:
stdout0:
type: stdout
exporter0:
type: test
options:
events:
- qname: test.catalog0
message: REMOTE_ACCESS_REJECTED GET http://localhost:8081/schemas/ids/0 0
catalogs:
catalog0:
type: schema-registry
options:
url: http://localhost:8081
bindings:
app0:
type: kafka
kind: client
exit: net0
net0:
type: test
kind: server
options:
catalogs:
- catalog0
exit: app0
2 changes: 1 addition & 1 deletion incubator/catalog-schema-registry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.80</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.90</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,28 @@
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;

import static io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.SchemaRegistryEventType.REMOTE_ACCESS_REJECTED;

import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.time.Clock;

import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.SchemaRegistryEventFW;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.SchemaRegistryEventExFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;

public class SchemaRegistryEventContext
{
private static final int EVENT_BUFFER_CAPACITY = 1024;

private final SchemaRegistryEventFW.Builder schemaRegistryEventRW = new SchemaRegistryEventFW.Builder();
private final MutableDirectBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final AtomicBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final AtomicBuffer extensionBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final EventFW.Builder eventRW = new EventFW.Builder();
private final SchemaRegistryEventExFW.Builder schemaRegistryEventExRW = new SchemaRegistryEventExFW.Builder();
private final int schemaRegistryTypeId;
private final MessageConsumer eventWriter;
private final Clock clock;
Expand All @@ -48,17 +53,22 @@ public void remoteAccessRejected(
HttpRequest httpRequest,
int status)
{
SchemaRegistryEventFW event = schemaRegistryEventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
SchemaRegistryEventExFW extension = schemaRegistryEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.remoteAccessRejected(e -> e
.timestamp(clock.millis())
.traceId(0L)
.namespacedId(catalogId)
.typeId(REMOTE_ACCESS_REJECTED.value())
.method(httpRequest.method())
.url(httpRequest.uri().toString())
.status((short) status)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.timestamp(clock.millis())
.traceId(0L)
.namespacedId(catalogId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(schemaRegistryTypeId, event.buffer(), event.offset(), event.limit());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;

import org.agrona.DirectBuffer;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.StringFW;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.SchemaRegistryEventExFW;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.SchemaRegistryRemoteAccessRejectedExFW;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi;

public final class SchemaRegistryEventFormatter implements EventFormatterSpi
{
private static final String REMOTE_ACCESS_REJECTED = "REMOTE_ACCESS_REJECTED %s %s %d";

private final EventFW eventRO = new EventFW();
private final SchemaRegistryEventExFW schemaRegistryEventExRO = new SchemaRegistryEventExFW();

SchemaRegistryEventFormatter(
Configuration config)
{
}

public String format(
DirectBuffer buffer,
int index,
int length)
{
final EventFW event = eventRO.wrap(buffer, index, index + length);
final SchemaRegistryEventExFW extension = schemaRegistryEventExRO
.wrap(event.extension().buffer(), event.extension().offset(), event.extension().limit());
String result = null;
switch (extension.kind())
{
case REMOTE_ACCESS_REJECTED:
{
SchemaRegistryRemoteAccessRejectedExFW ex = extension.remoteAccessRejected();
result = String.format(REMOTE_ACCESS_REJECTED, asString(ex.method()), asString(ex.url()),
ex.status());
break;
}
}
return result;
}

private static String asString(
StringFW stringFW)
{
String s = stringFW.asString();
return s == null ? "" : s;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;

import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi;

public final class SchemaRegistryEventFormatterFactory implements EventFormatterFactorySpi
{
@Override
public SchemaRegistryEventFormatter create(
Configuration config)
{
return new SchemaRegistryEventFormatter(config);
}

@Override
public String type()
{
return SchemaRegistryCatalog.NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@

provides io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi
with io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfigAdapter;

provides io.aklivity.zilla.runtime.engine.event.EventFormatterFactorySpi
with io.aklivity.zilla.runtime.catalog.schema.registry.internal.SchemaRegistryEventFormatterFactory;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.aklivity.zilla.runtime.catalog.schema.registry.internal.SchemaRegistryEventFormatterFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.rules.RuleChain.outerRule;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.DisableOnDebug;
import org.junit.rules.TestRule;
import org.junit.rules.Timeout;
import org.kaazing.k3po.junit.annotation.Specification;
import org.kaazing.k3po.junit.rules.K3poRule;

import io.aklivity.zilla.runtime.engine.test.EngineRule;
import io.aklivity.zilla.runtime.engine.test.annotation.Configuration;

public class EventIT
{
private final K3poRule k3po = new K3poRule()
.addScriptRoot("net", "io/aklivity/zilla/specs/engine/streams/network")
.addScriptRoot("app", "io/aklivity/zilla/specs/engine/streams/application");

private final TestRule timeout = new DisableOnDebug(new Timeout(10, SECONDS));

private final EngineRule engine = new EngineRule()
.directory("target/zilla-itests")
.countersBufferCapacity(4096)
.configurationRoot("io/aklivity/zilla/specs/catalog/schema/registry/config")
.external("app0")
.clean();

@Rule
public final TestRule chain = outerRule(engine).around(k3po).around(timeout);

@Test
@Configuration("event.yaml")
@Specification({
"${net}/event/client",
"${app}/event/server"
})
public void shouldLogEvents() throws Exception
{
k3po.finish();
}
}
12 changes: 0 additions & 12 deletions incubator/exporter-stdout.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,6 @@
<artifactId>engine.spec</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>binding-http.spec</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>binding-proxy.spec</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down

This file was deleted.

This file was deleted.

Loading