Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-13724 Add additional metadata to Registry events #9239

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,58 +28,59 @@
import org.apache.nifi.registry.hook.EventType;
import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils;

import java.util.Objects;

/**
* Factory to create Events from domain objects.
*/
public class EventFactory {

public static Event bucketCreated(final Bucket bucket) {
return new StandardEvent.Builder()
.eventType(EventType.CREATE_BUCKET)
.addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
return bucketEvent(bucket, EventType.CREATE_BUCKET);
}

public static Event bucketUpdated(final Bucket bucket) {
return new StandardEvent.Builder()
.eventType(EventType.UPDATE_BUCKET)
.addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
return bucketEvent(bucket, EventType.UPDATE_BUCKET);
}

public static Event bucketDeleted(final Bucket bucket) {
return bucketEvent(bucket, EventType.DELETE_BUCKET);
}

private static Event bucketEvent(final Bucket bucket, EventType eventType) {
return new StandardEvent.Builder()
.eventType(EventType.DELETE_BUCKET)
.eventType(eventType)
.addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
.addField(EventFieldName.BUCKET_NAME, bucket.getName())
.addField(EventFieldName.BUCKET_DESCRIPTION, Objects.requireNonNullElse(bucket.getDescription(), "")) //Empty string if Null
.addField(EventFieldName.CREATED_TIMESTAMP, String.valueOf(bucket.getCreatedTimestamp()))
.addField(EventFieldName.ALLOW_PUBLIC_READ, (bucket.isAllowPublicRead() == null) ? "" : String.valueOf(bucket.isAllowPublicRead())) //Empty string if Null
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
}

public static Event flowCreated(final VersionedFlow versionedFlow) {
return new StandardEvent.Builder()
.eventType(EventType.CREATE_FLOW)
.addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
.addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
return flowEvent(versionedFlow, EventType.CREATE_FLOW);
}

public static Event flowUpdated(final VersionedFlow versionedFlow) {
return new StandardEvent.Builder()
.eventType(EventType.UPDATE_FLOW)
.addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
.addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
return flowEvent(versionedFlow, EventType.UPDATE_FLOW);
}

public static Event flowDeleted(final VersionedFlow versionedFlow) {
return flowEvent(versionedFlow, EventType.DELETE_FLOW);
}

private static Event flowEvent(final VersionedFlow versionedFlow, EventType eventType) {
return new StandardEvent.Builder()
.eventType(EventType.DELETE_FLOW)
.eventType(eventType)
.addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
.addField(EventFieldName.BUCKET_NAME, Objects.requireNonNullElse(versionedFlow.getBucketName(), "")) //Empty string if Null
.addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
.addField(EventFieldName.FLOW_NAME, Objects.requireNonNullElse(versionedFlow.getName(), ""))
.addField(EventFieldName.FLOW_DESCRIPTION, Objects.requireNonNullElse(versionedFlow.getDescription(), ""))
.addField(EventFieldName.CREATED_TIMESTAMP, String.valueOf(versionedFlow.getCreatedTimestamp()))
.addField(EventFieldName.MODIFIED_TIMESTAMP, String.valueOf(versionedFlow.getModifiedTimestamp()))
.addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
.build();
}
Expand All @@ -91,8 +92,11 @@ public static Event flowVersionCreated(final VersionedFlowSnapshot versionedFlow
return new StandardEvent.Builder()
.eventType(EventType.CREATE_FLOW_VERSION)
.addField(EventFieldName.BUCKET_ID, versionedFlowSnapshot.getSnapshotMetadata().getBucketIdentifier())
.addField(EventFieldName.BUCKET_NAME, (versionedFlowSnapshot.getBucket() == null) ? "" : Objects.requireNonNullElse(versionedFlowSnapshot.getBucket().getName(), ""))
.addField(EventFieldName.FLOW_ID, versionedFlowSnapshot.getSnapshotMetadata().getFlowIdentifier())
.addField(EventFieldName.FLOW_NAME, (versionedFlowSnapshot.getBucket() == null) ? "" : Objects.requireNonNullElse(versionedFlowSnapshot.getFlow().getName(), ""))
.addField(EventFieldName.VERSION, String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getVersion()))
.addField(EventFieldName.MODIFIED_TIMESTAMP, String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getTimestamp()))
.addField(EventFieldName.USER, versionedFlowSnapshot.getSnapshotMetadata().getAuthor())
.addField(EventFieldName.COMMENT, versionComments)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.nifi.registry.event;

import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.authorization.User;
import org.apache.nifi.registry.authorization.UserGroup;
import org.apache.nifi.registry.bucket.Bucket;
Expand All @@ -26,10 +27,10 @@
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.registry.hook.Event;
import org.apache.nifi.registry.hook.EventFieldName;
import org.apache.nifi.registry.hook.EventType;
import org.apache.nifi.registry.revision.entity.RevisionInfo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -46,19 +47,28 @@ public class TestEventFactory {
private BundleVersion bundleVersion;
private User user;
private UserGroup userGroup;
private RevisionInfo revisionInfo;

@BeforeEach
public void setup() {
bucket = new Bucket();
bucket.setName("Bucket1");
bucket.setIdentifier(UUID.randomUUID().toString());
bucket.setCreatedTimestamp(System.currentTimeMillis());
bucket.setDescription("Bucket 1 Description");

revisionInfo = new RevisionInfo();
revisionInfo.setVersion(1L);

versionedFlow = new VersionedFlow();
versionedFlow.setIdentifier(UUID.randomUUID().toString());
versionedFlow.setName("Flow 1");
versionedFlow.setDescription("Flow 1 Description");
versionedFlow.setBucketIdentifier(bucket.getIdentifier());
versionedFlow.setBucketName(bucket.getName());
versionedFlow.setCreatedTimestamp(System.currentTimeMillis());
versionedFlow.setModifiedTimestamp(System.currentTimeMillis());
versionedFlow.setRevision(revisionInfo);

VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
metadata.setAuthor("user1");
Expand All @@ -70,6 +80,8 @@ public void setup() {
versionedFlowSnapshot = new VersionedFlowSnapshot();
versionedFlowSnapshot.setSnapshotMetadata(metadata);
versionedFlowSnapshot.setFlowContents(new VersionedProcessGroup());
versionedFlowSnapshot.setFlow(versionedFlow);
versionedFlowSnapshot.setBucket(bucket);

bundle = new Bundle();
bundle.setIdentifier(UUID.randomUUID().toString());
Expand Down Expand Up @@ -105,19 +117,74 @@ public void testBucketCreatedEvent() {
event.validate();

assertEquals(EventType.CREATE_BUCKET, event.getEventType());
assertEquals(2, event.getFields().size());
assertEquals(6, event.getFields().size());

assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
}

@Test
public void testBucketCreatedEventWithNulls() {
Bucket bucket = new Bucket();
bucket.setName("test-bucket");
bucket.setIdentifier(UUID.randomUUID().toString());
final Event event = EventFactory.bucketCreated(bucket);

assertEquals(EventType.CREATE_BUCKET, event.getEventType());
assertEquals(6, event.getFields().size());

//Assert null values are empty Strings.
assertEquals( "", event.getField(EventFieldName.BUCKET_DESCRIPTION).getValue());
assertEquals("", event.getField(EventFieldName.ALLOW_PUBLIC_READ).getValue());
assertEquals( "0", event.getField(EventFieldName.CREATED_TIMESTAMP).getValue());
}

@Test
public void testFlowCreatedEventWithNulls() {
VersionedFlow versionedFlow = new VersionedFlow();
versionedFlow.setIdentifier(UUID.randomUUID().toString());
versionedFlow.setBucketIdentifier(UUID.randomUUID().toString());
versionedFlow.setBucketIdentifier(UUID.randomUUID().toString());
final Event event = EventFactory.flowCreated(versionedFlow);

assertEquals(EventType.CREATE_FLOW, event.getEventType());
assertEquals(8, event.getFields().size());

//Assert null values are empty Strings.
assertEquals( "", event.getField(EventFieldName.BUCKET_NAME).getValue());
assertEquals( "", event.getField(EventFieldName.FLOW_NAME).getValue());
assertEquals( "0", event.getField(EventFieldName.CREATED_TIMESTAMP).getValue());
assertEquals( "0", event.getField(EventFieldName.MODIFIED_TIMESTAMP).getValue());
}

@Test
public void testFlowMetaDataCreatedEventWithNulls() {
VersionedFlowSnapshot flowSnapshot = new VersionedFlowSnapshot();
VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
snapshotMetadata.setComments("");
snapshotMetadata.setFlowIdentifier(UUID.randomUUID().toString());
snapshotMetadata.setVersion(0);
snapshotMetadata.setBucketIdentifier(UUID.randomUUID().toString());
snapshotMetadata.setAuthor("");
flowSnapshot.setSnapshotMetadata(snapshotMetadata);
final Event event = EventFactory.flowVersionCreated(flowSnapshot);

assertEquals(EventType.CREATE_FLOW_VERSION, event.getEventType());
assertEquals(8, event.getFields().size());

//Assert null values are empty Strings.
assertEquals( "", event.getField(EventFieldName.BUCKET_NAME).getValue());
assertEquals( "", event.getField(EventFieldName.FLOW_NAME).getValue());
assertEquals( "0", event.getField(EventFieldName.MODIFIED_TIMESTAMP).getValue());
}

@Test
public void testBucketUpdatedEvent() {
final Event event = EventFactory.bucketUpdated(bucket);
event.validate();

assertEquals(EventType.UPDATE_BUCKET, event.getEventType());
assertEquals(2, event.getFields().size());
assertEquals(6, event.getFields().size());

assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
Expand All @@ -129,7 +196,7 @@ public void testBucketDeletedEvent() {
event.validate();

assertEquals(EventType.DELETE_BUCKET, event.getEventType());
assertEquals(2, event.getFields().size());
assertEquals(6, event.getFields().size());

assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals("unknown", event.getField(EventFieldName.USER).getValue());
Expand All @@ -141,7 +208,7 @@ public void testFlowCreated() {
event.validate();

assertEquals(EventType.CREATE_FLOW, event.getEventType());
assertEquals(3, event.getFields().size());
assertEquals(8, event.getFields().size());

assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
Expand All @@ -154,7 +221,7 @@ public void testFlowUpdated() {
event.validate();

assertEquals(EventType.UPDATE_FLOW, event.getEventType());
assertEquals(3, event.getFields().size());
assertEquals(8, event.getFields().size());

assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
Expand All @@ -167,7 +234,7 @@ public void testFlowDeleted() {
event.validate();

assertEquals(EventType.DELETE_FLOW, event.getEventType());
assertEquals(3, event.getFields().size());
assertEquals(8, event.getFields().size());

assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
Expand All @@ -180,7 +247,7 @@ public void testFlowVersionedCreated() {
event.validate();

assertEquals(EventType.CREATE_FLOW_VERSION, event.getEventType());
assertEquals(5, event.getFields().size());
assertEquals(8, event.getFields().size());

assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue());
assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public void teardown() throws Exception {
@Test
public void testPublishConsume() throws InterruptedException {
final Bucket bucket = new Bucket();
bucket.setName("bucket1");
bucket.setDescription("bucketDescription");
bucket.setIdentifier(UUID.randomUUID().toString());

final Event bucketCreatedEvent = EventFactory.bucketCreated(bucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,15 @@
*/
public enum EventFieldName {

ALLOW_PUBLIC_READ,
BUCKET_ID,
BUCKET_NAME,
BUCKET_DESCRIPTION,
FLOW_ID,
FLOW_NAME,
FLOW_DESCRIPTION,
CREATED_TIMESTAMP,
MODIFIED_TIMESTAMP,
EXTENSION_BUNDLE_ID,
VERSION,
USER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,27 @@ public enum EventType {

CREATE_BUCKET(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.BUCKET_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.ALLOW_PUBLIC_READ,
EventFieldName.USER),
CREATE_FLOW(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.FLOW_ID,
EventFieldName.FLOW_NAME,
EventFieldName.FLOW_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.MODIFIED_TIMESTAMP,
EventFieldName.USER),
CREATE_FLOW_VERSION(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.FLOW_ID,
EventFieldName.FLOW_NAME,
EventFieldName.VERSION,
EventFieldName.MODIFIED_TIMESTAMP,
EventFieldName.USER,
EventFieldName.COMMENT),
CREATE_EXTENSION_BUNDLE(
Expand All @@ -55,17 +67,35 @@ public enum EventType {
REGISTRY_START(),
UPDATE_BUCKET(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.BUCKET_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.ALLOW_PUBLIC_READ,
EventFieldName.USER),
UPDATE_FLOW(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.FLOW_ID,
EventFieldName.FLOW_NAME,
EventFieldName.FLOW_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.MODIFIED_TIMESTAMP,
EventFieldName.USER),
DELETE_BUCKET(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.BUCKET_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.ALLOW_PUBLIC_READ,
EventFieldName.USER),
DELETE_FLOW(
EventFieldName.BUCKET_ID,
EventFieldName.BUCKET_NAME,
EventFieldName.FLOW_ID,
EventFieldName.FLOW_NAME,
EventFieldName.FLOW_DESCRIPTION,
EventFieldName.CREATED_TIMESTAMP,
EventFieldName.MODIFIED_TIMESTAMP,
EventFieldName.USER),
DELETE_EXTENSION_BUNDLE(
EventFieldName.BUCKET_ID,
Expand Down