Batch loading sometimes missing records #188

Open · wants to merge 2 commits into master
BigQuerySinkTask.java
@@ -89,6 +89,7 @@ public class BigQuerySinkTask extends SinkTask {
private GCSToBQWriter gcsToBQWriter;
private BigQuerySinkTaskConfig config;
private SinkRecordConverter recordConverter;
private Map<String, TableId> topicsToBaseTableIds = new HashMap<>();

private boolean useMessageTimeDatePartitioning;
private boolean usePartitionDecorator;
@@ -198,6 +199,7 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
if (sanitize) {
tableName = FieldNameSanitizer.sanitizeName(tableName);
}

TableId baseTableId = TableId.of(dataset, tableName);
if (upsertDelete) {
TableId intermediateTableId = mergeBatches.intermediateTableFor(baseTableId);
@@ -247,7 +249,8 @@ public void put(Collection<SinkRecord> records) {
TableWriterBuilder tableWriterBuilder;
if (config.getList(BigQuerySinkConfig.ENABLE_BATCH_CONFIG).contains(record.topic())) {
String topic = record.topic();
String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli();
long offset = record.kafkaOffset();
String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli() + "_" + records.size() + "_" + offset;
Would having a test case for validating that parallel puts create different files with the right offset help?

Copy link
Author

@jeonguihyeong jeonguihyeong Apr 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GCP side

  • You can enable GCS object versioning to see how many times the same blob name has been written.

BigQuery sink

  • The concept is borrowed from the S3 sink connector and adapted to the BigQuery sink: the blob name includes the offset of a record in the batch, and names cannot collide because Kafka message offsets are unique within a partition.

If you need more details, feel free to contact me.
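A minimal standalone sketch of the naming scheme described above (illustrative only, not part of this PR's diff; the topic, offset, and batch-size values are placeholders):

```java
import java.time.Instant;
import java.util.UUID;

// Illustrative sketch only: shows how the new GCS blob name is built and why
// the appended offset keeps concurrent put() calls from writing the same blob.
public class BlobNameSketch {
  public static void main(String[] args) {
    String topic = "my-topic";                  // placeholder topic name
    String uuid = UUID.randomUUID().toString(); // per-task UUID, as in the connector
    long offset = 42L;                          // record.kafkaOffset() of a record in the batch
    int batchSize = 500;                        // records.size()

    // Old format: topic_uuid_epochMillis. Two batches flushed by the same task
    // in the same millisecond can produce identical names and overwrite each other.
    String oldName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli();

    // New format appends the batch size and a record offset. Offsets are unique
    // within a topic partition, so the names can no longer collide.
    String newName = oldName + "_" + batchSize + "_" + offset;

    System.out.println("old: " + oldName);
    System.out.println("new: " + newName);
  }
}
```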

String gcsFolderName = config.getString(BigQuerySinkConfig.GCS_FOLDER_NAME_CONFIG);
if (gcsFolderName != null && !"".equals(gcsFolderName)) {
gcsBlobName = gcsFolderName + "/" + gcsBlobName;
@@ -467,6 +470,33 @@ public void start(Map<String, String> properties) {
config.getBoolean(BigQuerySinkConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG);
sanitize =
config.getBoolean(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG);

List<String> loadGCS = config.getList(BigQuerySinkConfig.ENABLE_BATCH_CONFIG);

for (String sLoadGCS : loadGCS) {

String tableName;
String dataset = config.getString(BigQuerySinkConfig.DEFAULT_DATASET_CONFIG);
String[] smtReplacement = sLoadGCS.split(":");

if (smtReplacement.length == 2) {
dataset = smtReplacement[0];
tableName = smtReplacement[1];
} else if (smtReplacement.length == 1) {
tableName = smtReplacement[0];
} else {
throw new ConnectException(String.format(
"Incorrect format in batch load config entry '%s'. "
+ "Entries should either use the <dataset>:<tableName> format "
+ "or just the <tableName> format.",
sLoadGCS
));
}

TableId baseTableId = TableId.of(dataset, tableName);
topicsToBaseTableIds.put(sLoadGCS, baseTableId);
}

if (config.getBoolean(BigQuerySinkTaskConfig.GCS_BQ_TASK_CONFIG)) {
startGCSToBQLoadTask();
} else if (upsertDelete) {
@@ -499,7 +529,8 @@ private void startGCSToBQLoadTask() {
));
}
}
GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(getBigQuery(), bucket);

GCSToBQLoadRunnable loadRunnable = new GCSToBQLoadRunnable(getBigQuery(), bucket, topicsToBaseTableIds);

int intervalSec = config.getInt(BigQuerySinkConfig.BATCH_LOAD_INTERVAL_SEC_CONFIG);
loadExecutor.scheduleAtFixedRate(loadRunnable, intervalSec, intervalSec, TimeUnit.SECONDS);
GCSToBQLoadRunnable.java
@@ -27,11 +27,13 @@
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.StorageException;


import com.wepay.kafka.connect.bigquery.write.row.GCSToBQWriter;

import org.slf4j.Logger;
@@ -75,15 +77,18 @@ public class GCSToBQLoadRunnable implements Runnable {
private static String SOURCE_URI_FORMAT = "gs://%s/%s";
public static final Pattern METADATA_TABLE_PATTERN =
Pattern.compile("((?<project>[^:]+):)?(?<dataset>[^.]+)\\.(?<table>.+)");

private final Set<TableId> targetTableIds;
/**
* Create a {@link GCSToBQLoadRunnable} with the given bigquery, bucket, and ms wait interval.
* Create a {@link GCSToBQLoadRunnable} with the given bigquery, bucket, target tables and ms wait interval.
* @param bigQuery the {@link BigQuery} instance.
* @param bucket the GCS bucket to read from.
* @param topicsToBaseTableIds map of topics to the base tables they load into.
*/
public GCSToBQLoadRunnable(BigQuery bigQuery, Bucket bucket) {
public GCSToBQLoadRunnable(BigQuery bigQuery, Bucket bucket, Map<String, TableId> topicsToBaseTableIds) {
this.bigQuery = bigQuery;
this.bucket = bucket;
this.targetTableIds = topicsToBaseTableIds.values().stream().collect(Collectors.toSet());
this.activeJobs = new HashMap<>();
this.claimedBlobIds = new HashSet<>();
this.deletableBlobIds = new HashSet<>();
@@ -111,12 +116,16 @@ private Map<TableId, List<Blob>> getBlobsUpToLimit() {
TableId table = getTableFromBlob(blob);
logger.debug("Checking blob bucket={}, name={}, table={} ", blob.getBucket(), blob.getName(), table );

if (table == null || claimedBlobIds.contains(blobId) || deletableBlobIds.contains(blobId)) {
if (table == null
|| claimedBlobIds.contains(blobId)
|| deletableBlobIds.contains(blobId)
|| !targetTableIds.contains(table)) {
// don't do anything if:
// 1. we don't know what table this should be uploaded to or
// 2. this blob is already claimed by a currently-running job or
// 3. this blob is up for deletion.
continue;
// 4. this blob's table is not one of our target tables.
continue;
}

if (!tableToURIs.containsKey(table)) {
@@ -233,7 +242,13 @@ private void checkJobs() {
logger.debug("Checking next job: {}", job.getJobId());

try {
// wait for the load job to finish
job = job.waitFor();
if (job.isDone()) {
// log the job's output row count
LoadStatistics stats = job.getStatistics();
logger.trace("Job output row count: id={}, count={}", job.getJobId(), stats.getOutputRows());

logger.trace("Job is marked done: id={}, status={}", job.getJobId(), job.getStatus());
List<BlobId> blobIdsToDelete = jobEntry.getValue();
jobIterator.remove();
@@ -244,7 +259,7 @@
deletableBlobIds.addAll(blobIdsToDelete);
logger.trace("Completed blobs marked as deletable: {}", blobIdsToDelete);
}
} catch (BigQueryException ex) {
} catch (BigQueryException | InterruptedException ex) {
// log a message.
logger.warn("GCS to BQ load job failed", ex);
// remove job from active jobs (it's not active anymore)