Skip to content

Commit

Permalink
Events notification system for Nessie - VersionStore changes
Browse files Browse the repository at this point in the history
  • Loading branch information
adutra committed Apr 21, 2023
1 parent 74b8f0a commit 1764e4f
Show file tree
Hide file tree
Showing 84 changed files with 3,015 additions and 872 deletions.
6 changes: 4 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ logback = "1.2.11"
maven = "3.9.1"
mavenResolver = "1.9.8"
micrometer = "1.10.6"
mockito="4.11.0"
nessieClientVersion = "0.51.1" # Must be in sync with Nessie version in the Iceberg release.
opentelemetry = "1.25.0"
opentelemetryAlpha = "1.20.1-alpha"
Expand Down Expand Up @@ -70,7 +71,7 @@ managed-slf4j = ["slf4j-jcl-over-slf4j", "slf4j-log4j-over-slf4j", "slf4j-jcl-ov
managed-testcontainers = ["testcontainers-cockroachdb", "testcontainers-mongodb",
"testcontainers-postgresql", "testcontainers-testcontainers"]

junit-testing = ["assertj-core", "mockito-core", "junit-jupiter-api", "junit-jupiter-params"]
junit-testing = ["assertj-core", "mockito-core", "mockito-junit-jupiter", "junit-jupiter-api", "junit-jupiter-params"]

[libraries]
agroal-pool = { module = "io.agroal:agroal-pool", version = "2.1" }
Expand Down Expand Up @@ -175,7 +176,8 @@ maven-resolver-transport-file = { module = "org.apache.maven.resolver:maven-reso
maven-resolver-transport-http = { module = "org.apache.maven.resolver:maven-resolver-transport-http", version.ref = "mavenResolver" }
micrometer-core = { module = "io.micrometer:micrometer-core", version.ref = "micrometer" }
microprofile-openapi = { module = "org.eclipse.microprofile.openapi:microprofile-openapi-api", version = "3.1" }
mockito-core = { module = "org.mockito:mockito-core", version = "4.11.0" }
mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" }
mockito-junit-jupiter = { module = "org.mockito:mockito-junit-jupiter", version.ref = "mockito" }
mongodb-driver-sync = { module = "org.mongodb:mongodb-driver-sync", version = "4.9.1" }
openapi-generator-cli = { module = "org.openapitools:openapi-generator-cli", version = "6.5.0" }
opentelemetry-api = { module = "io.opentelemetry:opentelemetry-api" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@
import org.projectnessie.quarkus.cli.ExportRepository.Format;
import org.projectnessie.versioned.BranchName;
import org.projectnessie.versioned.CommitMetaSerializer;
import org.projectnessie.versioned.Hash;
import org.projectnessie.versioned.CommitResult;
import org.projectnessie.versioned.ReferenceAlreadyExistsException;
import org.projectnessie.versioned.ReferenceConflictException;
import org.projectnessie.versioned.ReferenceNotFoundException;
import org.projectnessie.versioned.persist.adapter.CommitLogEntry;
import org.projectnessie.versioned.persist.adapter.ContentId;
import org.projectnessie.versioned.persist.adapter.DatabaseAdapter;
import org.projectnessie.versioned.persist.adapter.ImmutableCommitParams;
Expand Down Expand Up @@ -232,7 +233,7 @@ private static void populateRepository(DatabaseAdapter adapter)
ContentKey key = ContentKey.of("namespace123", "table123");
String namespaceId = UUID.randomUUID().toString();
String tableId = UUID.randomUUID().toString();
Hash main =
CommitResult<CommitLogEntry> main =
adapter.commit(
ImmutableCommitParams.builder()
.toBranch(branchMain)
Expand All @@ -256,7 +257,7 @@ private static void populateRepository(DatabaseAdapter adapter)
.toStoreOnReferenceState(
IcebergTable.of("meta", 42, 43, 44, 45, tableId))))
.build());
adapter.create(branchFoo, main);
adapter.create(branchFoo, main.getCommit().getHash());
adapter.commit(
ImmutableCommitParams.builder()
.toBranch(branchFoo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
import org.projectnessie.versioned.storage.common.indexes.StoreKey;
import org.projectnessie.versioned.storage.common.logic.CommitLogic;
import org.projectnessie.versioned.storage.common.logic.ReferenceLogic;
import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
import org.projectnessie.versioned.storage.common.objtypes.ContentValueObj;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.Persist;
import org.projectnessie.versioned.storage.common.persist.Reference;
import org.projectnessie.versioned.store.DefaultStoreWorker;
Expand Down Expand Up @@ -251,7 +251,7 @@ private void populateRepository(Persist persist) throws Exception {

soft.assertThat(persist.storeObj(valueMain)).isTrue();
StoreKey key = key("namespace123", "table123");
ObjId main =
CommitObj main =
commitLogic.doCommit(
newCommitBuilder()
.parentCommitId(EMPTY_OBJ_ID)
Expand All @@ -260,14 +260,14 @@ private void populateRepository(Persist persist) throws Exception {
.headers(EMPTY_COMMIT_HEADERS)
.build(),
emptyList());
referenceLogic.assignReference(refMain, requireNonNull(main));
referenceLogic.assignReference(refMain, requireNonNull(main).id());

Reference refFoo = referenceLogic.createReference("refs/heads/branch-foo", main);
Reference refFoo = referenceLogic.createReference("refs/heads/branch-foo", main.id());
soft.assertThat(persist.storeObj(valueFoo)).isTrue();
ObjId foo =
CommitObj foo =
commitLogic.doCommit(
newCommitBuilder()
.parentCommitId(main)
.parentCommitId(main.id())
.addAdds(
commitAdd(
key,
Expand All @@ -279,6 +279,6 @@ private void populateRepository(Persist persist) throws Exception {
.headers(EMPTY_COMMIT_HEADERS)
.build(),
emptyList());
referenceLogic.assignReference(refFoo, requireNonNull(foo));
referenceLogic.assignReference(refFoo, requireNonNull(foo).id());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ public void testNoAncestorHash(LaunchResult result, DatabaseAdapter adapter) {
public void testMainHash(QuarkusMainLauncher launcher, DatabaseAdapter adapter)
throws ReferenceNotFoundException, ReferenceConflictException {
Hash hash =
adapter.commit(
ImmutableCommitParams.builder()
.toBranch(BranchName.of("main"))
.commitMetaSerialized(ByteString.copyFrom(new byte[] {1, 2, 3}))
.build());
adapter
.commit(
ImmutableCommitParams.builder()
.toBranch(BranchName.of("main"))
.commitMetaSerialized(ByteString.copyFrom(new byte[] {1, 2, 3}))
.build())
.getCommit()
.getHash();

LaunchResult result = launcher.launch("info");
assertThat(result.getOutput()).contains(hash.asString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException;
import org.projectnessie.versioned.storage.common.exceptions.RefConditionFailedException;
import org.projectnessie.versioned.storage.common.exceptions.RefNotFoundException;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
import org.projectnessie.versioned.storage.common.persist.Persist;

@QuarkusMainTest
Expand Down Expand Up @@ -71,7 +71,7 @@ public void testMainHash(QuarkusMainLauncher launcher, Persist persist)
ObjNotFoundException,
RefNotFoundException,
RefConditionFailedException {
ObjId head =
CommitObj head =
commitLogic(persist)
.doCommit(
newCommitBuilder()
Expand All @@ -81,12 +81,13 @@ public void testMainHash(QuarkusMainLauncher launcher, Persist persist)
.build(),
emptyList());
referenceLogic(persist)
.assignReference(reference("refs/heads/main", EMPTY_OBJ_ID, false), requireNonNull(head));
.assignReference(
reference("refs/heads/main", EMPTY_OBJ_ID, false), requireNonNull(head).id());

LaunchResult result = launcher.launch("info");
assertThat(result.getOutput())
.contains("Repository created:")
.contains("Default branch head commit ID: " + head)
.contains("Default branch head commit ID: " + head.id())
.contains("Default branch commit count: 1")
.contains("Version-store type: " + MONGODB.name())
.contains("Default branch: main")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ private Hash commit(
commitMetaUpdate(null).rewriteSingle(CommitMeta.fromMessage(commitMsg)),
Collections.singletonList(contentOperation),
validator,
(k, c) -> {});
(k, c) -> {})
.getCommit()
.getHash();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.projectnessie.versioned.GetNamedRefsParams.RetrieveOptions;
import org.projectnessie.versioned.Hash;
import org.projectnessie.versioned.KeyEntry;
import org.projectnessie.versioned.LazyPut;
import org.projectnessie.versioned.MergeConflictException;
import org.projectnessie.versioned.MergeResult;
import org.projectnessie.versioned.NamedRef;
Expand Down Expand Up @@ -275,7 +276,7 @@ public Reference createReference(
check.checkAndThrow();

try {
Hash hash = getStore().create(namedReference, toHash(targetHash, false));
Hash hash = getStore().create(namedReference, toHash(targetHash, false)).getHash();
return RefUtil.toReference(namedReference, hash);
} catch (ReferenceNotFoundException e) {
throw new NessieReferenceNotFoundException(e.getMessage(), e);
Expand Down Expand Up @@ -346,7 +347,7 @@ public Reference deleteReference(

startAccessCheck().canDeleteReference(ref).checkAndThrow();

Hash deletedAthash = getStore().delete(ref, toHash(expectedHash, true));
Hash deletedAthash = getStore().delete(ref, toHash(expectedHash, true)).getHash();
return RefUtil.toReference(ref, deletedAthash);
} catch (ReferenceNotFoundException e) {
throw new NessieReferenceNotFoundException(e.getMessage(), e);
Expand Down Expand Up @@ -482,8 +483,8 @@ private ImmutableLogEntry commitToLogEntry(boolean fetchAll, Commit commit) {
.forEach(
op -> {
ContentKey key = op.getKey();
if (op instanceof Put) {
Content content = ((Put) op).getValue();
if (op instanceof LazyPut) {
Content content = ((LazyPut) op).getValue();
logEntry.addOperations(Operation.Put.of(key, content));
}
if (op instanceof Delete) {
Expand Down Expand Up @@ -589,6 +590,7 @@ public MergeResponse transplantCommitsIntoBranch(
MergeResult<Commit> result =
getStore()
.transplant(
BranchName.of(fromRefName),
targetBranch,
toHash(expectedHash, true),
transplants,
Expand Down Expand Up @@ -637,6 +639,7 @@ public MergeResponse mergeRefIntoBranch(
MergeResult<Commit> result =
getStore()
.merge(
BranchName.of(fromRefName),
toHash(fromRefName, fromHash),
targetBranch,
toHash(expectedHash, true),
Expand Down Expand Up @@ -903,7 +906,9 @@ public CommitResponse commitMultipleOperations(
() -> null,
(key, cid) -> {
commitResponse.addAddedContents(addedContent(key, cid));
});
})
.getCommit()
.getHash();

return commitResponse.targetBranch(Branch.of(branch, newHash.asString())).build();
} catch (ReferenceNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@
import javax.annotation.Nonnull;
import org.projectnessie.model.ContentKey;
import org.projectnessie.nessie.relocated.protobuf.ByteString;
import org.projectnessie.versioned.CommitResult;
import org.projectnessie.versioned.Diff;
import org.projectnessie.versioned.GetNamedRefsParams;
import org.projectnessie.versioned.Hash;
import org.projectnessie.versioned.MergeResult;
import org.projectnessie.versioned.NamedRef;
import org.projectnessie.versioned.RefLogNotFoundException;
import org.projectnessie.versioned.ReferenceAlreadyExistsException;
import org.projectnessie.versioned.ReferenceAssignedResult;
import org.projectnessie.versioned.ReferenceConflictException;
import org.projectnessie.versioned.ReferenceCreatedResult;
import org.projectnessie.versioned.ReferenceDeletedResult;
import org.projectnessie.versioned.ReferenceInfo;
import org.projectnessie.versioned.ReferenceNotFoundException;

Expand Down Expand Up @@ -148,7 +152,7 @@ Stream<KeyListEntry> keys(Hash commit, KeyFilterPredicate keyFilter)
* branch due to a conflicting change or if the expected hash in {@link
* CommitParams#getToBranch()}is not its expected hEAD
*/
Hash commit(CommitParams commitParams)
CommitResult<CommitLogEntry> commit(CommitParams commitParams)
throws ReferenceConflictException, ReferenceNotFoundException;

/**
Expand Down Expand Up @@ -218,11 +222,11 @@ Stream<ReferenceInfo<ByteString>> namedRefs(GetNamedRefsParams params)
* @param target The already existing named reference with an optional hash on that branch. This
* parameter can be {@code null} for the edge case when the default branch is re-created after
* it has been dropped.
* @return the current HEAD of the created branch or tag
* @return A {@link ReferenceCreatedResult} containing the head of the created reference
* @throws ReferenceAlreadyExistsException if the reference {@code ref} already exists.
* @throws ReferenceNotFoundException if {@code target} does not exist.
*/
Hash create(NamedRef ref, Hash target)
ReferenceCreatedResult create(NamedRef ref, Hash target)
throws ReferenceAlreadyExistsException, ReferenceNotFoundException;

/**
Expand All @@ -231,12 +235,12 @@ Hash create(NamedRef ref, Hash target)
* @param reference named-reference to delete. If a value for the hash is specified, it must be
* equal to the current HEAD.
* @param expectedHead if present, {@code reference}'s current HEAD must be equal to this value
* @return head of deleted reference
* @return A {@link ReferenceDeletedResult} containing the head of the deleted reference
* @throws ReferenceNotFoundException if the named reference in {@code reference} does not exist.
* @throws ReferenceConflictException if the named reference's HEAD is not equal to the expected
* HEAD
*/
Hash delete(NamedRef reference, Optional<Hash> expectedHead)
ReferenceDeletedResult delete(NamedRef reference, Optional<Hash> expectedHead)
throws ReferenceNotFoundException, ReferenceConflictException;

/**
Expand All @@ -245,13 +249,15 @@ Hash delete(NamedRef reference, Optional<Hash> expectedHead)
* @param assignee named reference to re-assign
* @param expectedHead if present, {@code assignee}'s current HEAD must be equal to this value
* @param assignTo commit to update {@code assignee}'s HEAD to
* @return A {@link ReferenceAssignedResult} containing the previous and current head of the
* reference
* @throws ReferenceNotFoundException if either the named reference in {@code assignTo} or the
* commit on that reference, if specified, does not exist or if the named reference specified
* in {@code assignee} does not exist.
* @throws ReferenceConflictException if the HEAD of the named reference {@code assignee} is not
* equal to the expected HEAD
*/
void assign(NamedRef assignee, Optional<Hash> expectedHead, Hash assignTo)
ReferenceAssignedResult assign(NamedRef assignee, Optional<Hash> expectedHead, Hash assignTo)
throws ReferenceNotFoundException, ReferenceConflictException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
import org.immutables.value.Value;
import org.projectnessie.model.ContentKey;
import org.projectnessie.nessie.relocated.protobuf.ByteString;
import org.projectnessie.versioned.BranchName;
import org.projectnessie.versioned.MergeType;
import org.projectnessie.versioned.MetadataRewriter;

public interface MetadataRewriteParams extends ToBranchParams {

/** Branch to merge or transplant from. */
BranchName getFromBranch();

/** Whether to keep the individual commits and do not squash the commits to merge. */
@Value.Default
default boolean keepIndividualCommits() {
Expand All @@ -47,6 +51,8 @@ default MergeType getDefaultMergeType() {

@SuppressWarnings({"override", "UnusedReturnValue"})
interface Builder<B> extends ToBranchParams.Builder<B> {
B fromBranch(BranchName fromBranch);

B keepIndividualCommits(boolean keepIndividualCommits);

B defaultMergeType(MergeType defaultMergeType);
Expand Down
Loading

0 comments on commit 1764e4f

Please sign in to comment.