Skip to content

Commit

Permalink
[GOBBLIN-1697]Have a separate resource handler to rely on CDC stream …
Browse files Browse the repository at this point in the history
…to do message forwarding (apache#3549)

* address comments

* use connectionmanager when httpclient is not cloesable

* fix test case to test orchestor as one listener of flow spec

* remove unintentional change

* [GOBBLIN-1697]Have a separate resource handler to rely on CDC stream to do message forwarding

* fix compilation error

* address comments

* address comments

* address comments

* update outdated javadoc

Co-authored-by: Zihan Li <[email protected]>
  • Loading branch information
2 people authored and phet committed Sep 22, 2022
1 parent eea5ce8 commit ace81f7
Show file tree
Hide file tree
Showing 9 changed files with 667 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,14 @@ public CreateResponse createFlowConfig(FlowConfig flowConfig) throws FlowConfigL
return this.createFlowConfig(flowConfig, true);
}

public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boolean triggerListener) {
// Set the max version to be the largest value so that we blindly update the flow spec in this case
return updateFlowConfig(flowId, flowConfig, triggerListener, Long.MAX_VALUE);
}
/**
* Update flowConfig locally and trigger all listeners iff @param triggerListener is set to true
*/
public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boolean triggerListener) {
public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boolean triggerListener, long modifiedWatermark) {
log.info("[GAAS-REST] Update called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());

if (!flowId.getFlowGroup().equals(flowConfig.getId().getFlowGroup()) || !flowId.getFlowName().equals(flowConfig.getId().getFlowName())) {
Expand All @@ -185,7 +189,7 @@ public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, boo
flowConfig = originalFlowConfig;
}
try {
this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener);
this.flowCatalog.update(createFlowSpecForConfig(flowConfig), triggerListener, modifiedWatermark);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
return this.updateTimer.invokeMayThrowBoth(() -> updateSpecImpl(spec));
}

@Override
public Spec updateSpec(Spec spec, long modifiedWatermark) throws IOException, SpecNotFoundException {
return this.updateTimer.invokeMayThrowBoth(() -> updateSpecImpl(spec, modifiedWatermark));
}

@Override
public Collection<Spec> getSpecs() throws IOException {
return this.getAllTimer.invokeMayThrowIO(() -> getSpecsImpl());
Expand All @@ -175,6 +180,10 @@ public int getSize() throws IOException {
return this.getSizeTimer.invokeMayThrowIO(() -> getSizeImpl());
}

public Spec updateSpecImpl(Spec spec, long modifiedWatermark) throws IOException, SpecNotFoundException{
return updateSpecImpl(spec);
}

public abstract void addSpecImpl(Spec spec) throws IOException;
public abstract Spec updateSpecImpl(Spec spec) throws IOException, SpecNotFoundException;
public abstract boolean existsImpl(URI specUri) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ public interface SpecStore {
*/
Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException;

/***
* Update {@link Spec} in the {@link SpecStore} when modification time of current entry is smaller than {@link modifiedWatermark}.
* @param spec {@link Spec} to be updated.
* @param modifiedWatermark largest modifiedWatermark that current spec should be
* @throws IOException Exception in updating the {@link Spec}.
* @return Updated {@link Spec}.
* @throws SpecNotFoundException If {@link Spec} being updated is not present in store.
*/
default Spec updateSpec(Spec spec, long modifiedWatermark) throws IOException, SpecNotFoundException {return updateSpec(spec);};

/***
* Retrieve the latest version of the {@link Spec} by URI from the {@link SpecStore}.
* @param specUri URI for the {@link Spec} to be retrieved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,21 +349,33 @@ public Spec getSpecWrapper(URI uri) {
return spec;
}


public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) throws Throwable {
return updateOrAddSpecHelper(spec, triggerListener, false, Long.MAX_VALUE);
}

public Map<String, AddSpecResponse> update(Spec spec, boolean triggerListener, long modifiedWatermark) throws Throwable {
return updateOrAddSpecHelper(spec, triggerListener, true, modifiedWatermark);
}

/**
* Persist {@link Spec} into {@link SpecStore} and notify {@link SpecCatalogListener} if triggerListener
* is set to true.
* If the {@link Spec} is a {@link FlowSpec} it is persisted if it can be compiled at the time this method received
* the spec. `explain` specs are not persisted. The logic of this method is tightly coupled with the logic of
* {@link GobblinServiceJobScheduler#onAddSpec()}, which is one of the listener of {@link FlowCatalog}.
* {@link GobblinServiceJobScheduler#onAddSpec()} or {@link Orchestrator#onAddSpec()} in warm standby mode,
* which is one of the listener of {@link FlowCatalog}.
* We use condition variables {@link #specSyncObjects} to achieve synchronization between
* {@link GobblinServiceJobScheduler#NonScheduledJobRunner} thread and this thread to ensure deletion of
* {@link FlowSpec} happens after the corresponding run once flow is submitted to the orchestrator.
*
* @param spec The Spec to be added
* @param triggerListener True if listeners should be notified.
* @param isUpdate Whether this is update or add operation, it will call different method in spec store to persist the spec
* @param modifiedWatermark If it's update operation, the largest modifiedWatermark that it can modify, or in other word, the timestamp which old spec should be modified before
* @return a map of listeners and their {@link AddSpecResponse}s
*/
public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) throws Throwable {
private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec, boolean triggerListener, boolean isUpdate, long modifiedWatermark) throws Throwable {
Map<String, AddSpecResponse> responseMap = new HashMap<>();
FlowSpec flowSpec = (FlowSpec) spec;
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
Expand All @@ -384,7 +396,6 @@ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) thro
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
throw entry.getValue().getError().getCause();
}
return responseMap;
}
}
AddSpecResponse<String> compileResponse;
Expand All @@ -402,12 +413,17 @@ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) thro
try {
if (!flowSpec.isExplain()) {
long startTime = System.currentTimeMillis();
specStore.addSpec(spec);
if (isUpdate) {
specStore.updateSpec(spec, modifiedWatermark);
} else {
specStore.addSpec(spec);
}
metrics.updatePutSpecTime(startTime);
}
responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
} catch (IOException e) {
throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e);
String operation = isUpdate ? "update" : "add";
throw new RuntimeException("Cannot " + operation + " Spec to Spec store: " + flowSpec, e);
} finally {
syncObject.notifyAll();
this.specSyncObjects.remove(flowSpec.getUri().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ protected interface CheckedFunction<T, R> {
private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, tag, spec) "
+ "VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec)";
// Keep previous syntax that update only update spec and spec_json
//todo: do we need change this behavior so that everything can be updated
protected static final String UPDATE_STATEMENT = "UPDATE %s SET spec=?,spec_json=? WHERE spec_uri=? AND UNIX_TIMESTAMP(modified_time) < ?";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
private static final String GET_STATEMENT_BASE = "SELECT spec_uri, spec FROM %s WHERE ";
private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
Expand All @@ -91,6 +94,7 @@ protected interface CheckedFunction<T, R> {
* between statements, collect them within this inner class that enables selective, per-statement override, and delivers them as a unit.
*/
protected class SqlStatements {
public final String updateStatement = String.format(getTablelessUpdateStatement(), MysqlBaseSpecStore.this.tableName);
public final String existsStatement = String.format(getTablelessExistsStatement(), MysqlBaseSpecStore.this.tableName);
public final String insertStatement = String.format(getTablelessInsertStatement(), MysqlBaseSpecStore.this.tableName);
public final String deleteStatement = String.format(getTablelessDeleteStatement(), MysqlBaseSpecStore.this.tableName);
Expand All @@ -115,6 +119,7 @@ public Spec extractSpec(ResultSet rs) throws SQLException, IOException {
}

protected String getTablelessExistsStatement() { return MysqlBaseSpecStore.EXISTS_STATEMENT; }
protected String getTablelessUpdateStatement() { return MysqlBaseSpecStore.UPDATE_STATEMENT; }
protected String getTablelessInsertStatement() { return MysqlBaseSpecStore.INSERT_STATEMENT; }
protected String getTablelessDeleteStatement() { return MysqlBaseSpecStore.DELETE_STATEMENT; }
protected String getTablelessGetStatementBase() { return MysqlBaseSpecStore.GET_STATEMENT_BASE; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.runtime.spec_store;

import com.google.common.base.Charsets;
import com.typesafe.config.Config;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecSerDe;


public class MysqlSpecStoreWithUpdate extends MysqlSpecStore{
// In this case, when we try to insert but key is existed, we will throw exception
protected static final String INSERT_STATEMENT_WITHOUT_UPDATE = "INSERT INTO %s (spec_uri, flow_group, flow_name, template_uri, "
+ "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, owning_group, spec, spec_json) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
public MysqlSpecStoreWithUpdate(Config config, SpecSerDe specSerDe) throws IOException {
super(config, specSerDe);
}

/** Bundle all changes following from schema differences against the base class. */
protected class SpecificSqlStatementsWithUpdate extends SpecificSqlStatements {
public void completeUpdatePreparedStatement(PreparedStatement statement, Spec spec, long modifiedWatermark) throws
SQLException {
FlowSpec flowSpec = (FlowSpec) spec;
URI specUri = flowSpec.getUri();

int i = 0;

statement.setBlob(++i, new ByteArrayInputStream(MysqlSpecStoreWithUpdate.this.specSerDe.serialize(flowSpec)));
statement.setString(++i, new String(MysqlSpecStoreWithUpdate.this.specSerDe.serialize(flowSpec), Charsets.UTF_8));
statement.setString(++i, specUri.toString());
statement.setLong(++i, modifiedWatermark);
}

@Override
protected String getTablelessInsertStatement() { return INSERT_STATEMENT_WITHOUT_UPDATE; }
}

@Override
protected SqlStatements createSqlStatements() {
return new SpecificSqlStatementsWithUpdate();
}

@Override
// TODO: fix to obey the `SpecStore` contract of returning the *updated* `Spec`
public Spec updateSpecImpl(Spec spec) throws IOException {
updateSpecImpl(spec, Long.MAX_VALUE);
return spec;
}

@Override
// TODO: fix to obey the `SpecStore` contract of returning the *updated* `Spec`
// Update {@link Spec} in the {@link SpecStore} when current modification time is smaller than {@link modifiedWatermark}.
public Spec updateSpecImpl(Spec spec, long modifiedWatermark) throws IOException {
withPreparedStatement(this.sqlStatements.updateStatement, statement -> {
((SpecificSqlStatementsWithUpdate)this.sqlStatements).completeUpdatePreparedStatement(statement, spec, modifiedWatermark);
int i = statement.executeUpdate();
if (i == 0) {
throw new IOException("Spec does not exist or concurrent update happens, please check current spec and update again");
}
return null; // (type: `Void`)
}, true);
return spec;
}



}
Loading

0 comments on commit ace81f7

Please sign in to comment.