Skip to content

Commit

Permalink
[#4309] feat(core): support tag events for event listener (#5847)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

support tag events for event listener

### Why are the changes needed?

Fix: #4309 

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests
  • Loading branch information
TungYuChiang authored Dec 18, 2024
1 parent 92cf3c5 commit c727df5
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 34 deletions.
17 changes: 10 additions & 7 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.gravitino.listener.PartitionEventDispatcher;
import org.apache.gravitino.listener.SchemaEventDispatcher;
import org.apache.gravitino.listener.TableEventDispatcher;
import org.apache.gravitino.listener.TagEventDispatcher;
import org.apache.gravitino.listener.TopicEventDispatcher;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.metalake.MetalakeDispatcher;
Expand All @@ -71,6 +72,7 @@
import org.apache.gravitino.metrics.source.JVMMetricsSource;
import org.apache.gravitino.storage.IdGenerator;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.tag.TagDispatcher;
import org.apache.gravitino.tag.TagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -108,6 +110,8 @@ public class GravitinoEnv {

private CredentialManager credentialManager;

private TagDispatcher tagDispatcher;

private AccessControlDispatcher accessControlDispatcher;

private IdGenerator idGenerator;
Expand All @@ -122,7 +126,6 @@ public class GravitinoEnv {

private AuditLogManager auditLogManager;

private TagManager tagManager;
private EventBus eventBus;
private OwnerManager ownerManager;
private FutureGrantManager futureGrantManager;
Expand Down Expand Up @@ -321,12 +324,12 @@ public AccessControlDispatcher accessControlDispatcher() {
}

/**
* Get the TagManager associated with the Gravitino environment.
* Get the tagDispatcher associated with the Gravitino environment.
*
* @return The TagManager instance.
* @return The tagDispatcher instance.
*/
public TagManager tagManager() {
return tagManager;
public TagDispatcher tagDispatcher() {
return tagDispatcher;
}

/**
Expand Down Expand Up @@ -497,7 +500,7 @@ private void initGravitinoServerComponents() {
// Tree lock
this.lockManager = new LockManager(config);

// Tag manager
this.tagManager = new TagManager(idGenerator, entityStore);
// Create and initialize Tag related modules
this.tagDispatcher = new TagEventDispatcher(eventBus, new TagManager(idGenerator, entityStore));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.listener;

import java.util.Map;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.exceptions.NoSuchTagException;
import org.apache.gravitino.tag.Tag;
import org.apache.gravitino.tag.TagChange;
import org.apache.gravitino.tag.TagDispatcher;

/**
* {@code TagEventDispatcher} is a decorator for {@link TagDispatcher} that not only delegates tag
* operations to the underlying tag dispatcher but also dispatches corresponding events to an {@link
* EventBus} after each operation is completed. This allows for event-driven workflows or monitoring
* of tag operations.
*/
public class TagEventDispatcher implements TagDispatcher {
@SuppressWarnings("unused")
private final EventBus eventBus;

@SuppressWarnings("unused")
private final TagDispatcher dispatcher;

public TagEventDispatcher(EventBus eventBus, TagDispatcher dispatcher) {
this.eventBus = eventBus;
this.dispatcher = dispatcher;
}

@Override
public String[] listTags(String metalake) {
// TODO: listTagsPreEvent
try {
// TODO: listTagsEvent
return dispatcher.listTags(metalake);
} catch (Exception e) {
// TODO: listTagFailureEvent
throw e;
}
}

@Override
public Tag[] listTagsInfo(String metalake) {
// TODO: listTagsInfoPreEvent
try {
// TODO: listTagsInfoEvent
return dispatcher.listTagsInfo(metalake);
} catch (Exception e) {
// TODO: listTagsInfoFailureEvent
throw e;
}
}

@Override
public Tag getTag(String metalake, String name) throws NoSuchTagException {
// TODO: getTagPreEvent
try {
// TODO: getTagEvent
return dispatcher.getTag(metalake, name);
} catch (NoSuchTagException e) {
// TODO: getTagFailureEvent
throw e;
}
}

@Override
public Tag createTag(
String metalake, String name, String comment, Map<String, String> properties) {
// TODO: createTagPreEvent
try {
// TODO: createTagEvent
return dispatcher.createTag(metalake, name, comment, properties);
} catch (Exception e) {
// TODO: createTagFailureEvent
throw e;
}
}

@Override
public Tag alterTag(String metalake, String name, TagChange... changes) {
// TODO: alterTagPreEvent
try {
// TODO: alterTagEvent
return dispatcher.alterTag(metalake, name, changes);
} catch (Exception e) {
// TODO: alterTagFailureEvent
throw e;
}
}

@Override
public boolean deleteTag(String metalake, String name) {
// TODO: deleteTagPreEvent
try {
// TODO: deleteTagEvent
return dispatcher.deleteTag(metalake, name);
} catch (Exception e) {
// TODO: deleteTagFailureEvent
throw e;
}
}

@Override
public MetadataObject[] listMetadataObjectsForTag(String metalake, String name) {
// TODO: listMetadataObjectsForTagPreEvent
try {
// TODO: listMetadataObjectsForTagEvent
return dispatcher.listMetadataObjectsForTag(metalake, name);
} catch (Exception e) {
// TODO: listMetadataObjectsForTagFailureEvent
throw e;
}
}

@Override
public String[] listTagsForMetadataObject(String metalake, MetadataObject metadataObject) {
// TODO: listTagsForMetadataObjectPreEvent
try {
// TODO: listTagsForMetadataObjectEvent
return dispatcher.listTagsForMetadataObject(metalake, metadataObject);
} catch (Exception e) {
// TODO: listTagsForMetadataObjectFailureEvent
throw e;
}
}

@Override
public Tag[] listTagsInfoForMetadataObject(String metalake, MetadataObject metadataObject) {
// TODO: listTagsInfoForMetadataObjectPreEvent
try {
// TODO: listTagsInfoForMetadataObjectEvent
return dispatcher.listTagsInfoForMetadataObject(metalake, metadataObject);
} catch (Exception e) {
// TODO: listTagsInfoForMetadataObjectFailureEvent
throw e;
}
}

@Override
public String[] associateTagsForMetadataObject(
String metalake, MetadataObject metadataObject, String[] tagsToAdd, String[] tagsToRemove) {
// TODO: associateTagsForMetadataObjectPreEvent
try {
// TODO: associateTagsForMetadataObjectEvent
return dispatcher.associateTagsForMetadataObject(
metalake, metadataObject, tagsToAdd, tagsToRemove);
} catch (Exception e) {
// TODO: associateTagsForMetadataObjectFailureEvent
throw e;
}
}

@Override
public Tag getTagForMetadataObject(String metalake, MetadataObject metadataObject, String name) {
// TODO: getTagForMetadataObjectPreEvent
try {
// TODO: getTagForMetadataObjectEvent
return dispatcher.getTagForMetadataObject(metalake, metadataObject, name);
} catch (Exception e) {
// TODO: getTagForMetadataObjectFailureEvent
throw e;
}
}
}
133 changes: 133 additions & 0 deletions core/src/main/java/org/apache/gravitino/tag/TagDispatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.tag;

import java.util.Map;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.exceptions.NoSuchTagException;

/**
* {@code TagDispatcher} interface provides functionalities for managing tags within a metalake. It
* includes a comprehensive set of operations such as listing, creating, retrieving, updating, and
* deleting tags, as well as associating tags with other objects.
*/
public interface TagDispatcher {
/**
* List all the tag names for the specific object.
*
* @return The list of tag names.
*/
String[] listTags(String metalake);

/**
* List all the tags with details for the specific object.
*
* @return The list of tags.
*/
Tag[] listTagsInfo(String metalake);

/**
* Get a tag by its name for the specific object.
*
* @param name The name of the tag.
* @param metalake The name of the metalake
* @return The tag.
* @throws NoSuchTagException If the tag does not associate with the object.
*/
Tag getTag(String metalake, String name) throws NoSuchTagException;

/**
* Create a new tag in the specified metalake.
*
* @param metalake The name of the metalake
* @param name The name of the tag
* @param comment A comment for the new tag.
* @param properties The properties of the tag.
* @return The created tag.
*/
Tag createTag(String metalake, String name, String comment, Map<String, String> properties);

/**
* Alter an existing tag in the specified metalake
*
* @param metalake The name of the metalake.
* @param name The name of the tag.
* @param changes The changes to apply to the tag.
* @return The updated tag.
*/
Tag alterTag(String metalake, String name, TagChange... changes);

/**
* delete an existing tag in the specified metalake
*
* @param metalake The name of the metalake.
* @param name The name of the tag.
* @return True if the tag was successfully deleted, false otherwise
*/
boolean deleteTag(String metalake, String name);

/**
* List all metadata objects associated with the specified tag.
*
* @param metalake The name of the metalake.
* @param name The name of the tag.
* @return The array of metadata objects associated with the specified tag.
*/
MetadataObject[] listMetadataObjectsForTag(String metalake, String name);

/**
* List all tag names associated with the specified metadata object.
*
* @param metalake The name of the metalake
* @param metadataObject The metadata object for which associated tags
* @return The list of tag names associated with the given metadata object.
*/
String[] listTagsForMetadataObject(String metalake, MetadataObject metadataObject);

/**
* List detailed information for all tags associated with the specified metadata object.
*
* @param metalake The name of the metalake
* @param metadataObject The metadata object to query tag details for.
* @return An array of tags with detailed information.
*/
Tag[] listTagsInfoForMetadataObject(String metalake, MetadataObject metadataObject);

/**
* Associate or disassociate tags with the specified metadata object.
*
* @param metalake The name of the metalake.
* @param metadataObject The metadata object to update tags for.
* @param tagsToAdd Tags to associate with the object.
* @param tagsToRemove Tags to disassociate from the object.
* @return An array of updated tag names.
*/
String[] associateTagsForMetadataObject(
String metalake, MetadataObject metadataObject, String[] tagsToAdd, String[] tagsToRemove);

/**
* Retrieve a specific tag associated with the specified metadata object.
*
* @param metalake The name of the metalake.
* @param metadataObject The metadata object to query the tag for.
* @param name The name of the tag to retrieve.
* @return The tag associated with the metadata object.
*/
Tag getTagForMetadataObject(String metalake, MetadataObject metadataObject, String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TagManager {
public class TagManager implements TagDispatcher {

private static final Logger LOG = LoggerFactory.getLogger(TagManager.class);

Expand Down
Loading

0 comments on commit c727df5

Please sign in to comment.