Skip to content

Commit

Permalink
feat: caches watermark for Spanner change streams
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagotnunes committed Jan 13, 2025
1 parent 0b2e57c commit d8b47c9
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.CacheFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
Expand Down Expand Up @@ -1594,6 +1595,8 @@ public abstract static class ReadChangeStream
@Deprecated
abstract @Nullable Double getTraceSampleProbability();

/**
 * Returns the refresh rate handed to the {@code CacheFactory} that builds the watermark cache,
 * or {@code null} when none was configured. With a {@code null} value the factory creates a
 * non-caching implementation that delegates every lookup to the DAO.
 */
abstract @Nullable Duration getWatermarkRefreshRate();

abstract Builder toBuilder();

@AutoValue.Builder
Expand All @@ -1617,6 +1620,8 @@ abstract static class Builder {

abstract Builder setTraceSampleProbability(Double probability);

/** Sets the refresh rate that is later passed to the {@code CacheFactory} for the watermark cache. */
abstract Builder setWatermarkRefreshRate(Duration refreshRate);

abstract ReadChangeStream build();
}

Expand Down Expand Up @@ -1703,6 +1708,10 @@ public ReadChangeStream withTraceSampleProbability(Double probability) {
return toBuilder().setTraceSampleProbability(probability).build();
}

/**
 * Returns a copy of this transform that caches the minimum watermark of the unfinished
 * partitions and refreshes it at the given rate, instead of reading it from the metadata table
 * on every lookup.
 *
 * @param refreshRate how long a cached watermark is served before it is reloaded; must be
 *     strictly positive when non-null
 * @return a new {@link ReadChangeStream} with the refresh rate set
 * @throws IllegalArgumentException if {@code refreshRate} is zero or negative
 */
public ReadChangeStream withWatermarkRefreshRate(Duration refreshRate) {
  // Fail fast at pipeline construction time: the value is eventually handed to Guava's
  // CacheBuilder#refreshAfterWrite, which rejects non-positive durations only once the cache is
  // built. A null value is passed through untouched so existing behavior is preserved.
  checkArgument(
      refreshRate == null || refreshRate.getMillis() > 0,
      "Watermark refresh rate must be positive, but was %s",
      refreshRate);
  return toBuilder().setWatermarkRefreshRate(refreshRate).build();
}

@Override
public PCollection<DataChangeRecord> expand(PBegin input) {
checkArgument(
Expand Down Expand Up @@ -1803,10 +1812,13 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
metadataDatabaseDialect);
final ActionFactory actionFactory = new ActionFactory();

final CacheFactory cacheFactory = new CacheFactory(daoFactory, getWatermarkRefreshRate());

final InitializeDoFn initializeDoFn =
new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics);
new DetectNewPartitionsDoFn(
daoFactory, mapperFactory, actionFactory, cacheFactory, metrics);
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.WatermarkCache;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator;
Expand Down Expand Up @@ -151,12 +152,17 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
public synchronized DetectNewPartitionsAction detectNewPartitionsAction(
PartitionMetadataDao partitionMetadataDao,
PartitionMetadataMapper partitionMetadataMapper,
WatermarkCache watermarkCache,
ChangeStreamMetrics metrics,
Duration resumeDuration) {
// The action is created only on the first call; later calls return the stored instance and
// their arguments are not used. The method is synchronized, so creation happens at most once.
if (detectNewPartitionsActionInstance == null) {
detectNewPartitionsActionInstance =
new DetectNewPartitionsAction(
// NOTE(review): the next line appears to be the pre-change argument list kept by the diff
// view; the five arguments that follow it (including watermarkCache) are its replacement.
partitionMetadataDao, partitionMetadataMapper, metrics, resumeDuration);
partitionMetadataDao,
partitionMetadataMapper,
watermarkCache,
metrics,
resumeDuration);
}
return detectNewPartitionsActionInstance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.WatermarkCache;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
Expand All @@ -50,17 +51,20 @@ public class DetectNewPartitionsAction {

private final PartitionMetadataDao dao;
private final PartitionMetadataMapper mapper;
// Supplies the minimum watermark of the unfinished partitions. Assigned once in the constructor,
// hence final like the other collaborators.
private final WatermarkCache cache;
private final ChangeStreamMetrics metrics;
private final Duration resumeDuration;

/**
 * Constructs an action class for detecting / scheduling new partitions.
 *
 * @param dao the partition metadata DAO used by this action
 * @param mapper the partition metadata mapper used by this action
 * @param cache the source queried for the minimum watermark of the unfinished partitions
 * @param metrics the change stream metrics handed to this action
 * @param resumeDuration the resume duration handed to this action
 */
public DetectNewPartitionsAction(
    PartitionMetadataDao dao,
    PartitionMetadataMapper mapper,
    WatermarkCache cache,
    ChangeStreamMetrics metrics,
    Duration resumeDuration) {
  this.dao = dao;
  this.mapper = mapper;
  this.cache = cache;
  this.metrics = metrics;
  this.resumeDuration = resumeDuration;
}
Expand Down Expand Up @@ -98,7 +102,7 @@ public ProcessContinuation run(

final Timestamp readTimestamp = tracker.currentRestriction().getFrom();
// Updates the current watermark as the min of the watermarks from all existing partitions
final Timestamp minWatermark = dao.getUnfinishedMinWatermark();
final Timestamp minWatermark = cache.getUnfinishedMinWatermark();

if (minWatermark != null) {
return processPartitions(tracker, receiver, watermarkEstimator, minWatermark, readTimestamp);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;

import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.joda.time.Duration;

/**
 * Serializable factory for the {@link WatermarkCache} used when detecting new partitions.
 *
 * <p>The cache itself is held in a transient field, so it is not serialized with the factory; it
 * is created lazily on the first call to {@link #getWatermarkCache()} and reused afterwards.
 */
@SuppressWarnings({"initialization.fields.uninitialized", "nullness"})
public class CacheFactory implements Serializable {

  private static final long serialVersionUID = -8722905670370252723L;

  private final DaoFactory daoFactory;
  private final @Nullable Duration refreshRate;
  // Lazily created by getWatermarkCache(); excluded from serialization.
  private transient WatermarkCache watermarkCacheInstance;

  /**
   * @param daoFactory factory providing the partition metadata DAO that backs the cache
   * @param watermarkRefreshRate refresh rate of the cached watermark, or {@code null} to disable
   *     caching
   */
  public CacheFactory(DaoFactory daoFactory, @Nullable Duration watermarkRefreshRate) {
    this.daoFactory = daoFactory;
    this.refreshRate = watermarkRefreshRate;
  }

  /**
   * Returns the watermark cache of this factory, creating it on first use.
   *
   * <p>Without a refresh rate a {@link NoOpWatermarkCache} is created; otherwise a {@link
   * LoadingWatermarkCache} configured with that refresh rate.
   *
   * @return the single cache instance held by this factory
   */
  public synchronized WatermarkCache getWatermarkCache() {
    if (watermarkCacheInstance != null) {
      return watermarkCacheInstance;
    }
    watermarkCacheInstance =
        refreshRate == null
            ? new NoOpWatermarkCache(daoFactory.getPartitionMetadataDao())
            : new LoadingWatermarkCache(daoFactory.getPartitionMetadataDao(), refreshRate);
    return watermarkCacheInstance;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;

import com.google.cloud.Timestamp;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;

/**
 * {@link WatermarkCache} that keeps the minimum watermark of the unfinished partitions in a
 * single-entry Guava {@link LoadingCache}, loading it from the {@link PartitionMetadataDao} and
 * making it eligible for reloading once the configured refresh rate has elapsed since it was
 * last written.
 */
public class LoadingWatermarkCache implements WatermarkCache {
  // The cache only ever holds one entry; this sentinel is its key.
  private static final Object MIN_WATERMARK_KEY = new Object();

  // The value is wrapped in an Optional so that an absent watermark (null from the DAO) can be
  // represented as a cache value.
  private final LoadingCache<Object, Optional<Timestamp>> cache;

  /**
   * @param dao DAO queried for the minimum watermark whenever the cache (re)loads its entry
   * @param refreshRate how long after a load the entry becomes eligible for refresh
   */
  public LoadingWatermarkCache(PartitionMetadataDao dao, org.joda.time.Duration refreshRate) {
    final CacheLoader<Object, Optional<Timestamp>> minWatermarkLoader =
        new CacheLoader<Object, Optional<Timestamp>>() {
          @Override
          public Optional<Timestamp> load(Object key) {
            return Optional.ofNullable(dao.getUnfinishedMinWatermark());
          }
        };
    // Joda durations are converted through their millisecond value.
    final Duration refreshInterval = Duration.ofMillis(refreshRate.getMillis());
    this.cache =
        CacheBuilder.newBuilder().refreshAfterWrite(refreshInterval).build(minWatermarkLoader);
  }

  /**
   * Returns the cached minimum watermark, loading it through the DAO when needed.
   *
   * @return the minimum watermark, or {@code null} when the DAO reported none
   * @throws RuntimeException wrapping the {@link ExecutionException} raised by the cache lookup
   */
  @Override
  public @Nullable Timestamp getUnfinishedMinWatermark() {
    final Optional<Timestamp> cachedMinWatermark;
    try {
      cachedMinWatermark = cache.get(MIN_WATERMARK_KEY);
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
    return cachedMinWatermark.orElse(null);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;

import com.google.cloud.Timestamp;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;

/**
 * {@link WatermarkCache} that does not cache anything: every lookup is forwarded to the {@link
 * PartitionMetadataDao}.
 */
public class NoOpWatermarkCache implements WatermarkCache {

  private final PartitionMetadataDao partitionMetadataDao;

  /** @param dao DAO that every lookup is delegated to */
  public NoOpWatermarkCache(PartitionMetadataDao dao) {
    partitionMetadataDao = dao;
  }

  /**
   * Queries the DAO for the minimum watermark of the unfinished partitions.
   *
   * @return whatever the DAO returns, which may be {@code null}
   */
  @Override
  public @Nullable Timestamp getUnfinishedMinWatermark() {
    return partitionMetadataDao.getUnfinishedMinWatermark();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;

import com.google.cloud.Timestamp;
import javax.annotation.Nullable;

/**
 * Source of the minimum watermark of the unfinished partitions. The implementations in this
 * package either forward every call to the partition metadata DAO ({@code NoOpWatermarkCache})
 * or serve a value that is reloaded at a configured refresh rate ({@code
 * LoadingWatermarkCache}).
 */
public interface WatermarkCache {
/**
 * Returns the minimum watermark of the unfinished partitions.
 *
 * @return the minimum watermark, or {@code null} when none is available (presumably when there
 *     is no unfinished partition; confirm against the DAO)
 */
@Nullable
Timestamp getUnfinishedMinWatermark();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Caching strategies for the minimum watermark of unfinished Spanner change stream partitions. */
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DetectNewPartitionsAction;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.CacheFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.cache.WatermarkCache;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
Expand Down Expand Up @@ -60,6 +62,7 @@ public class DetectNewPartitionsDoFn extends DoFn<PartitionMetadata, PartitionMe
private final MapperFactory mapperFactory;
private final ActionFactory actionFactory;
private final ChangeStreamMetrics metrics;
private final CacheFactory cacheFactory;
private long averagePartitionBytesSize;
private boolean averagePartitionBytesSizeSet;
private transient DetectNewPartitionsAction detectNewPartitionsAction;
Expand All @@ -81,10 +84,12 @@ public DetectNewPartitionsDoFn(
DaoFactory daoFactory,
MapperFactory mapperFactory,
ActionFactory actionFactory,
CacheFactory cacheFactory,
ChangeStreamMetrics metrics) {
this.daoFactory = daoFactory;
this.mapperFactory = mapperFactory;
this.actionFactory = actionFactory;
this.cacheFactory = cacheFactory;
this.metrics = metrics;
this.resumeDuration = DEFAULT_RESUME_DURATION;
this.averagePartitionBytesSizeSet = false;
Expand Down Expand Up @@ -143,9 +148,10 @@ public DetectNewPartitionsRangeTracker newTracker(@Restriction TimestampRange re
public void setup() {
// Resolve the collaborators from the factories given to this DoFn's constructor.
final PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao();
final PartitionMetadataMapper partitionMetadataMapper = mapperFactory.partitionMetadataMapper();
// The cache factory decides which WatermarkCache implementation is returned here.
final WatermarkCache watermarkCache = cacheFactory.getWatermarkCache();
this.detectNewPartitionsAction =
actionFactory.detectNewPartitionsAction(
// NOTE(review): the next line appears to be the pre-change argument list kept by the diff
// view; the line after it, which also passes watermarkCache, is its replacement.
partitionMetadataDao, partitionMetadataMapper, metrics, resumeDuration);
partitionMetadataDao, partitionMetadataMapper, watermarkCache, metrics, resumeDuration);
}

/**
Expand Down
Loading

0 comments on commit d8b47c9

Please sign in to comment.