Skip to content

Commit

Permalink
Make shard cleanup more efficient
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksei Statkevich committed Oct 11, 2016
1 parent 960edc5 commit 5da44f2
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public interface H2ShardDao

@SqlUpdate("DELETE FROM transactions\n" +
"WHERE end_time < :maxEndTime\n" +
" AND successful IS NOT NULL\n" +
" AND successful IN (TRUE, FALSE)\n" +
" AND transaction_id NOT IN (SELECT transaction_id FROM created_shards)\n" +
"LIMIT " + CLEANUP_TRANSACTIONS_BATCH_SIZE)
void deleteOldCompletedTransactions(@Bind("maxEndTime") Timestamp maxEndTime);
int deleteOldCompletedTransactions(@Bind("maxEndTime") Timestamp maxEndTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public interface MySqlShardDao
// 'order by' is needed in this statement in order to make it compatible with statement-based replication
@SqlUpdate("DELETE FROM transactions\n" +
"WHERE end_time < :maxEndTime\n" +
" AND successful IS NOT NULL\n" +
" AND successful IN (TRUE, FALSE)\n" +
" AND transaction_id NOT IN (SELECT transaction_id FROM created_shards)\n" +
"ORDER BY transaction_id\n" +
"ORDER BY end_time, transaction_id\n" +
"LIMIT " + CLEANUP_TRANSACTIONS_BATCH_SIZE)
void deleteOldCompletedTransactions(@Bind("maxEndTime") Timestamp maxEndTime);
int deleteOldCompletedTransactions(@Bind("maxEndTime") Timestamp maxEndTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.presto.raptor.metadata.ShardDao.CLEANABLE_SHARDS_BATCH_SIZE;
import static com.facebook.presto.raptor.metadata.ShardDao.CLEANUP_TRANSACTIONS_BATCH_SIZE;
import static com.google.common.collect.Sets.difference;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
Expand Down Expand Up @@ -298,7 +299,12 @@ void deleteOldShards()
@VisibleForTesting
void deleteOldCompletedTransactions()
{
dao.deleteOldCompletedTransactions(maxTimestamp(maxCompletedTransactionAge));
while (!Thread.currentThread().isInterrupted()) {
int deleted = dao.deleteOldCompletedTransactions(maxTimestamp(maxCompletedTransactionAge));
if (deleted < CLEANUP_TRANSACTIONS_BATCH_SIZE) {
break;
}
}
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public interface ShardDao
{
int CLEANABLE_SHARDS_BATCH_SIZE = 1000;
int CLEANUP_TRANSACTIONS_BATCH_SIZE = 1_000_000;
int CLEANUP_TRANSACTIONS_BATCH_SIZE = 10_000;

@SqlUpdate("INSERT INTO nodes (node_identifier) VALUES (:nodeIdentifier)")
@GetGeneratedKeys
Expand Down Expand Up @@ -206,5 +206,5 @@ void updateBucketNode(
@Bind("bucketNumber") int bucketNumber,
@Bind("nodeId") int nodeId);

void deleteOldCompletedTransactions(@Bind("maxEndTime") Timestamp maxEndTime);
int deleteOldCompletedTransactions(@Bind("maxEndTime") Timestamp maxEndTime);
}

0 comments on commit 5da44f2

Please sign in to comment.