Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ChangeStreamMutation which is a ChangeStreamRecord #1324

Merged
merged 7 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,351 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;

/**
* A ChangeStreamMutation represents a list of mods(represented by List<{@link Entry}>) targeted at
* a single row, which is concatenated by (TODO:ChangeStreamRecordMerger). It represents a logical
* row mutation and can be converted to the original write request(i.e. {@link RowMutation} or
* {@link RowMutationEntry}.
*
* <p>A ChangeStreamMutation can be constructed in two ways, depending on whether it's a user
* initiated mutation or a Garbage Collection mutation. Either way, the caller should explicitly set
* `token` and `lowWatermark` before build(), otherwise it'll raise an error.
*
* <p>Case 1) User initiated mutation.
*
* <pre>{@code
* ChangeStreamMutation.Builder builder = ChangeStreamMutation.createUserMutation(...);
* builder.setCell(...);
mutianf marked this conversation as resolved.
Show resolved Hide resolved
* builder.deleteFamily(...);
* builder.deleteCells(...);
* ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build();
* }</pre>
*
* Case 2) Garbage Collection mutation.
*
* <pre>{@code
* ChangeStreamMutation.Builder builder = ChangeStreamMutation.createGcMutation(...);
* builder.setCell(...);
* builder.deleteFamily(...);
* builder.deleteCells(...);
* ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build();
* }</pre>
*/
mutianf marked this conversation as resolved.
Show resolved Hide resolved
public final class ChangeStreamMutation implements ChangeStreamRecord, Serializable {
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
private static final long serialVersionUID = 8419520253162024218L;

private final ByteString rowKey;

/** Possible values: USER/GARBAGE_COLLECTION. */
private final Type type;

/** This should only be set when type==USER. */
private final String sourceClusterId;

private final Timestamp commitTimestamp;

private final int tieBreaker;

private transient ImmutableList.Builder<Entry> entries = ImmutableList.builder();

private String token;

private Timestamp lowWatermark;

private ChangeStreamMutation(Builder builder) {
this.rowKey = builder.rowKey;
this.type = builder.type;
this.sourceClusterId = builder.sourceClusterId;
this.commitTimestamp = builder.commitTimestamp;
this.tieBreaker = builder.tieBreaker;
this.token = builder.token;
this.lowWatermark = builder.lowWatermark;
this.entries = builder.entries;
}

/**
* Creates a new instance of a user initiated mutation. It returns a builder instead of a
* ChangeStreamMutation because `token` and `loWatermark` must be set later when we finish
* building the logical mutation.
*/
static Builder createUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
int tieBreaker) {
return new Builder(rowKey, Type.USER, sourceClusterId, commitTimestamp, tieBreaker);
}

/**
* Creates a new instance of a GC mutation. It returns a builder instead of a ChangeStreamMutation
* because `token` and `loWatermark` must be set later when we finish building the logical
* mutation.
*/
static Builder createGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker);
}

private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
input.defaultReadObject();

@SuppressWarnings("unchecked")
ImmutableList<Entry> deserialized = (ImmutableList<Entry>) input.readObject();
this.entries = ImmutableList.<Entry>builder().addAll(deserialized);
}

private void writeObject(ObjectOutputStream output) throws IOException {
output.defaultWriteObject();
output.writeObject(entries.build());
}

/** Get the row key of the current mutation. */
@Nonnull
public ByteString getRowKey() {
return this.rowKey;
}

/** Get the type of the current mutation. */
@Nonnull
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
public Type getType() {
return this.type;
}

/** Get the source cluster id of the current mutation. Null for Garbage collection mutation. */
public String getSourceClusterId() {
return this.sourceClusterId;
}

/** Get the commit timestamp of the current mutation. */
@Nonnull
public Timestamp getCommitTimestamp() {
return this.commitTimestamp;
}

/**
* Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple
* mutations are applied to different clusters at the same time.
*/
public int getTieBreaker() {
return this.tieBreaker;
}

/** Get the token of the current mutation, which can be used to resume the changestream. */
public String getToken() {
return this.token;
}

/** Get the low watermark of the current mutation. */
public Timestamp getLowWatermark() {
return this.lowWatermark;
}

/** Get the list of mods of the current mutation. */
@Nonnull
public List<Entry> getEntries() {
return this.entries.build();
}

/** Returns a builder containing all the values of this ChangeStreamMutation class. */
Builder toBuilder() {
return new Builder(this);
}

/** Helper class to create a ChangeStreamMutation. */
public static class Builder {
private final ByteString rowKey;

private final Type type;

private final String sourceClusterId;

private final Timestamp commitTimestamp;

private final int tieBreaker;

private transient ImmutableList.Builder<Entry> entries = ImmutableList.builder();

private String token;

private Timestamp lowWatermark;

private Builder(
ByteString rowKey,
Type type,
String sourceClusterId,
Timestamp commitTimestamp,
int tieBreaker) {
this.rowKey = rowKey;
this.type = type;
this.sourceClusterId = sourceClusterId;
this.commitTimestamp = commitTimestamp;
this.tieBreaker = tieBreaker;
}

private Builder(ChangeStreamMutation changeStreamMutation) {
this.rowKey = changeStreamMutation.rowKey;
this.type = changeStreamMutation.type;
this.sourceClusterId = changeStreamMutation.sourceClusterId;
this.commitTimestamp = changeStreamMutation.commitTimestamp;
this.tieBreaker = changeStreamMutation.tieBreaker;
this.entries = changeStreamMutation.entries;
this.token = changeStreamMutation.token;
this.lowWatermark = changeStreamMutation.lowWatermark;
}

Builder setCell(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
long timestamp,
@Nonnull ByteString value) {
this.entries.add(SetCell.create(familyName, qualifier, timestamp, value));
return this;
}

Builder deleteCells(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
@Nonnull TimestampRange timestampRange) {
this.entries.add(DeleteCells.create(familyName, qualifier, timestampRange));
return this;
}

Builder deleteFamily(@Nonnull String familyName) {
this.entries.add(DeleteFamily.create(familyName));
return this;
}

public Builder setToken(@Nonnull String token) {
this.token = token;
return this;
}

public Builder setLowWatermark(@Nonnull Timestamp lowWatermark) {
this.lowWatermark = lowWatermark;
return this;
}

public ChangeStreamMutation build() {
Preconditions.checkArgument(
token != null && lowWatermark != null,
"ChangeStreamMutation must have a continuation token and low watermark.");
return new ChangeStreamMutation(this);
}
}

mutianf marked this conversation as resolved.
Show resolved Hide resolved
public RowMutation toRowMutation(@Nonnull String tableId) {
RowMutation rowMutation = RowMutation.create(tableId, rowKey);
for (Entry entry : this.entries.build()) {
if (entry instanceof DeleteFamily) {
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
rowMutation.deleteFamily(((DeleteFamily) entry).getFamilyName());
} else if (entry instanceof DeleteCells) {
DeleteCells deleteCells = (DeleteCells) entry;
rowMutation.deleteCells(
deleteCells.getFamilyName(),
deleteCells.getQualifier(),
deleteCells.getTimestampRange());
} else if (entry instanceof SetCell) {
SetCell setCell = (SetCell) entry;
rowMutation.setCell(
setCell.getFamilyName(),
setCell.getQualifier(),
setCell.getTimestamp(),
setCell.getValue());
} else {
throw new IllegalArgumentException("Unexpected Entry type.");
}
}
return rowMutation;
}

public RowMutationEntry toRowMutationEntry() {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
RowMutationEntry rowMutationEntry = RowMutationEntry.create(rowKey);
for (Entry entry : this.entries.build()) {
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
if (entry instanceof DeleteFamily) {
rowMutationEntry.deleteFamily(((DeleteFamily) entry).getFamilyName());
} else if (entry instanceof DeleteCells) {
DeleteCells deleteCells = (DeleteCells) entry;
rowMutationEntry.deleteCells(
deleteCells.getFamilyName(),
deleteCells.getQualifier(),
deleteCells.getTimestampRange());
} else if (entry instanceof SetCell) {
SetCell setCell = (SetCell) entry;
rowMutationEntry.setCell(
setCell.getFamilyName(),
setCell.getQualifier(),
setCell.getTimestamp(),
setCell.getValue());
} else {
throw new IllegalArgumentException("Unexpected Entry type.");
}
}
return rowMutationEntry;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ChangeStreamMutation otherChangeStreamMutation = (ChangeStreamMutation) o;
return Objects.equal(this.hashCode(), otherChangeStreamMutation.hashCode());
}

@Override
public int hashCode() {
return Objects.hashCode(
rowKey, type, sourceClusterId, commitTimestamp, tieBreaker, token, lowWatermark, entries);
}

@Override
public String toString() {
List<String> entriesAsStrings = new ArrayList<>();
for (Entry entry : this.entries.build()) {
entriesAsStrings.add(entry.toString());
}
String entryString = "[" + String.join(";\t", entriesAsStrings) + "]";
return MoreObjects.toStringHelper(this)
.add("rowKey", this.rowKey.toStringUtf8())
.add("type", this.type)
.add("sourceClusterId", this.sourceClusterId)
.add("commitTimestamp", this.commitTimestamp.toString())
.add("token", this.token)
.add("lowWatermark", this.lowWatermark)
.add("entries", entryString)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.auto.value.AutoValue;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import javax.annotation.Nonnull;

/** Representation of a DeleteCells mod in a data change. */
@AutoValue
mutianf marked this conversation as resolved.
Show resolved Hide resolved
public abstract class DeleteCells implements Entry, Serializable {
private static final long serialVersionUID = 851772158721462017L;

public static DeleteCells create(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
@Nonnull TimestampRange timestampRange) {
return new AutoValue_DeleteCells(familyName, qualifier, timestampRange);
}

/** Get the column family of the current DeleteCells. */
@Nonnull
public abstract String getFamilyName();

/** Get the column qualifier of the current DeleteCells. */
@Nonnull
public abstract ByteString getQualifier();

/** Get the timestamp range of the current DeleteCells. */
@Nonnull
public abstract TimestampRange getTimestampRange();
}
Loading