-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Feature] temporary table(part-0): introduce TemporaryTableMgr (#43306)
Signed-off-by: silverbullet233 <[email protected]>
- Loading branch information
1 parent
654fac6
commit 97dd7ea
Showing
2 changed files
with
254 additions
and
0 deletions.
There are no files selected for viewing
168 changes: 168 additions & 0 deletions
168
fe/fe-core/src/main/java/com/starrocks/server/TemporaryTableMgr.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package com.starrocks.server; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.collect.HashBasedTable; | ||
import com.google.common.collect.Lists; | ||
import com.google.common.collect.Maps; | ||
import com.google.common.collect.Table; | ||
import com.starrocks.common.CloseableLock; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.UUID; | ||
import java.util.concurrent.locks.ReadWriteLock; | ||
import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
||
// TemporaryTableMgr is used to manage all temporary tables in the cluster, | ||
// all interfaces are thread-safe. | ||
public class TemporaryTableMgr { | ||
private static final Logger LOG = LogManager.getLogger(TemporaryTableMgr.class); | ||
|
||
// TemporaryTableTable is used to manage all temporary tables created by a session, | ||
// all interfaces are thread-safe | ||
private static class TemporaryTableTable { | ||
// database id, table name, table id | ||
private Table<Long, String, Long> temporaryTables = HashBasedTable.create(); | ||
|
||
private ReadWriteLock rwLock = new ReentrantReadWriteLock(); | ||
|
||
public Long getTableId(Long databaseId, String tableName) { | ||
try (CloseableLock ignored = CloseableLock.lock(this.rwLock.readLock())) { | ||
return temporaryTables.get(databaseId, tableName); | ||
} | ||
} | ||
|
||
public void addTable(Long databaseId, String tableName, Long tableId) { | ||
try (CloseableLock ignored = CloseableLock.lock(this.rwLock.writeLock())) { | ||
Preconditions.checkArgument(!temporaryTables.contains(databaseId, tableName), "table already exists"); | ||
temporaryTables.put(databaseId, tableName, tableId); | ||
} | ||
} | ||
|
||
public void removeTable(Long databaseId, String tableName) { | ||
try (CloseableLock ignored = CloseableLock.lock(this.rwLock.writeLock())) { | ||
temporaryTables.remove(databaseId, tableName); | ||
} | ||
} | ||
|
||
public Table<Long, String, Long> getAllTables() { | ||
try (CloseableLock ignored = CloseableLock.lock(this.rwLock.readLock())) { | ||
return HashBasedTable.create(temporaryTables); | ||
} | ||
} | ||
|
||
public List<String> listTables(long databaseId) { | ||
try (CloseableLock ignored = CloseableLock.lock(this.rwLock.readLock())) { | ||
Map<String, Long> row = temporaryTables.row(databaseId); | ||
if (row == null) { | ||
return Lists.newArrayList(); | ||
} | ||
return new ArrayList<>(row.keySet()); | ||
} | ||
} | ||
|
||
} | ||
|
||
// session id -> TemporaryTableTable | ||
private Map<UUID, TemporaryTableTable> tablesMap = Maps.newConcurrentMap(); | ||
|
||
public void addTemporaryTable(UUID sessionId, long databaseId, String tableName, long tableId) { | ||
tablesMap.putIfAbsent(sessionId, new TemporaryTableTable()); | ||
TemporaryTableTable tables = tablesMap.get(sessionId); | ||
tables.addTable(databaseId, tableName, tableId); | ||
LOG.info("add temporary table, session[{}], db id[{}], table name[{}], table id[{}]", | ||
sessionId.toString(), databaseId, tableName, tableId); | ||
} | ||
|
||
public Long getTable(UUID sessionId, long databaseId, String tableName) { | ||
if (!tablesMap.containsKey(sessionId)) { | ||
return null; | ||
} | ||
return tablesMap.get(sessionId).getTableId(databaseId, tableName); | ||
} | ||
|
||
public boolean tableExists(UUID sessionId, long databaseId, String tblName) { | ||
if (!tablesMap.containsKey(sessionId)) { | ||
return false; | ||
} | ||
return tablesMap.get(sessionId).getTableId(databaseId, tblName) != null; | ||
} | ||
|
||
public void dropTemporaryTable(UUID sessionId, long databaseId, String tableName) { | ||
TemporaryTableTable tables = tablesMap.get(sessionId); | ||
if (tables == null) { | ||
return; | ||
} | ||
tables.removeTable(databaseId, tableName); | ||
LOG.info("drop temporary table, session[{}], db id[{}], table name[{}]", | ||
sessionId.toString(), databaseId, tableName); | ||
} | ||
|
||
public Table<Long, String, Long> getTemporaryTables(UUID sessionId) { | ||
TemporaryTableTable tables = tablesMap.get(sessionId); | ||
if (tables == null) { | ||
return HashBasedTable.create(); | ||
} | ||
return tables.getAllTables(); | ||
} | ||
|
||
public void removeTemporaryTables(UUID sessionId) { | ||
tablesMap.remove(sessionId); | ||
LOG.info("remove all temporary tables in session[{}]", sessionId.toString()); | ||
} | ||
|
||
public List<String> listTemporaryTables(UUID sessionId, long databaseId) { | ||
TemporaryTableTable tables = tablesMap.get(sessionId); | ||
if (tables == null) { | ||
return Lists.newArrayList(); | ||
} | ||
return tables.listTables(databaseId); | ||
} | ||
|
||
// get all temporary tables under specific databases, return a Table<databaseId, sessionId, tableId> | ||
public Table<Long, UUID, Long> getAllTemporaryTables(Set<Long> requiredDatabaseIds) { | ||
Table<Long, UUID, Long> result = HashBasedTable.create(); | ||
tablesMap.forEach((sessionId, tables) -> { | ||
// db id -> table name -> table id | ||
Table<Long, String, Long> allTables = tables.getAllTables(); | ||
for (Table.Cell<Long, String, Long> cell : allTables.cellSet()) { | ||
if (requiredDatabaseIds.contains(cell.getRowKey())) { | ||
result.put(cell.getRowKey(), sessionId, cell.getValue()); | ||
} | ||
} | ||
|
||
}); | ||
return result; | ||
} | ||
|
||
public Set<UUID> listSessions() { | ||
return tablesMap.keySet(); | ||
} | ||
|
||
|
||
@VisibleForTesting | ||
public void clear() { | ||
if (tablesMap != null) { | ||
tablesMap.clear(); | ||
} | ||
} | ||
} |
86 changes: 86 additions & 0 deletions
86
fe/fe-core/src/test/java/com/starrocks/server/TemporaryTableMgrTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package com.starrocks.server; | ||
|
||
|
||
import com.google.common.collect.Table; | ||
import com.starrocks.common.util.UUIDUtil; | ||
import org.junit.Assert; | ||
import org.junit.Test; | ||
|
||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.UUID; | ||
|
||
public class TemporaryTableMgrTest { | ||
|
||
@Test | ||
public void testBasicOperations() throws Exception { | ||
TemporaryTableMgr temporaryTableMgr = new TemporaryTableMgr(); | ||
UUID sessionId1 = UUIDUtil.genUUID(); | ||
UUID sessionId2 = UUIDUtil.genUUID(); | ||
|
||
temporaryTableMgr.addTemporaryTable(sessionId1, 1L, "table1", 1L); | ||
temporaryTableMgr.addTemporaryTable(sessionId1, 2L, "table2", 2L); | ||
temporaryTableMgr.addTemporaryTable(sessionId2, 1L, "table1", 3L); | ||
|
||
Assert.assertTrue(temporaryTableMgr.tableExists(sessionId1, 1L, "table1")); | ||
long tableId = temporaryTableMgr.getTable(sessionId1, 1L, "table1"); | ||
Assert.assertEquals(tableId, 1L); | ||
|
||
Assert.assertFalse(temporaryTableMgr.tableExists(sessionId1, 1L, "table2")); | ||
Assert.assertEquals(temporaryTableMgr.getTable(sessionId1, 1L, "table2"), null); | ||
|
||
{ | ||
List<String> tables = temporaryTableMgr.listTemporaryTables(sessionId1, 1L); | ||
List<String> expected = Arrays.asList("table1"); | ||
Assert.assertTrue(tables.size() == expected.size() && tables.containsAll(expected)); | ||
|
||
tables = temporaryTableMgr.listTemporaryTables(UUIDUtil.genUUID(), 1L); | ||
Assert.assertTrue(tables.isEmpty()); | ||
} | ||
|
||
{ | ||
List<String> tables = temporaryTableMgr.listTemporaryTables(sessionId1, 3L); | ||
Assert.assertTrue(tables.isEmpty()); | ||
} | ||
|
||
{ | ||
Set<Long> dbIds = new HashSet<>(Arrays.asList(1L)); | ||
Table<Long, UUID, Long> actual = temporaryTableMgr.getAllTemporaryTables(dbIds); | ||
|
||
Assert.assertEquals(actual.size(), 2); | ||
Assert.assertTrue(actual.containsRow(1L)); | ||
Assert.assertTrue(actual.row(1L).size() == 2); | ||
Assert.assertTrue(actual.row(1L).get(sessionId1) == 1L); | ||
Assert.assertTrue(actual.row(1L).get(sessionId2) == 3L); | ||
} | ||
|
||
Assert.assertEquals(temporaryTableMgr.listSessions().size(), 2); | ||
|
||
temporaryTableMgr.dropTemporaryTable(sessionId1, 1L, "table1"); | ||
Assert.assertFalse(temporaryTableMgr.tableExists(sessionId1, 1L, "table1")); | ||
|
||
temporaryTableMgr.dropTemporaryTable(UUIDUtil.genUUID(), 1L, "table1"); | ||
|
||
temporaryTableMgr.removeTemporaryTables(sessionId1); | ||
|
||
Assert.assertEquals(temporaryTableMgr.listSessions().size(), 1); | ||
|
||
temporaryTableMgr.clear(); | ||
} | ||
} |