HIVE-28029: Make unit tests based on TxnCommandsBaseForTests/DbTxnManagerEndToEndTestBase run on Tez #5559

Status: Open. Wants to merge 30 commits into base: master.

Commits (30):
5314aae  HIVE-28029: Make unit tests based on TxnCommandsBaseForTests/DbTxnMan…  (kasakrisz, Nov 25, 2024)
71994b5  clear session id stored in base hiveconf when executing statement in …  (kasakrisz, Nov 25, 2024)
f9b44fc  extract common tez setup and TEST_DATA_DIR  (kasakrisz, Nov 25, 2024)
395ebb7  adress review comments: use HiveConfForTest  (kasakrisz, Nov 26, 2024)
c0af530  update tests  (kasakrisz, Nov 28, 2024)
fb41ae8  fix assert vectorization  (kasakrisz, Nov 28, 2024)
7ca9475  fix grouping size  (kasakrisz, Dec 2, 2024)
115327a  inline asserVectorize to subclasses  (kasakrisz, Dec 2, 2024)
ed2bed3  fix testNonAcidToAcidConversion  (kasakrisz, Dec 2, 2024)
51b23c7  testFailHeartbeater  (kasakrisz, Dec 2, 2024)
c01da5f  testCleanerForTxnToWriteId  (kasakrisz, Dec 3, 2024)
f1e30e5  testDynPartUpdateWithAborts  (kasakrisz, Dec 3, 2024)
2c90b6f  testOrcPPD  (kasakrisz, Dec 3, 2024)
dd9fcaa  testFileSystemUnCaching  (kasakrisz, Dec 5, 2024)
1ddce00  testCleaner2  (kasakrisz, Dec 5, 2024)
7cc829c  testRenameTable  (kasakrisz, Dec 5, 2024)
e5cc0e2  addPartitionRename  (kasakrisz, Dec 5, 2024)
93a2d8d  testConcatenateMM  (kasakrisz, Dec 5, 2024)
d25dd5f  INPUT__FILE__NAME can be vectorized  (kasakrisz, Dec 5, 2024)
4606931  extract databaseProduct  (kasakrisz, Dec 5, 2024)
6cb6bbd  remove isVectorized flag  (kasakrisz, Dec 5, 2024)
f141ba2  move back HiveConfForTest  (kasakrisz, Dec 5, 2024)
094aad8  remove isVectorized flag #2  (kasakrisz, Dec 6, 2024)
c94cb72  rollback to debug logging  (kasakrisz, Dec 6, 2024)
fb96f80  remove testFileSystemUnCaching  (kasakrisz, Dec 6, 2024)
64b5419  add stacktrace to diagnostics  (kasakrisz, Dec 7, 2024)
7234ade  rename checkResult to checkResultAndVectorization and add new checkRe…  (kasakrisz, Dec 7, 2024)
eff1cb7  create more than one bucket when updating converted table  (kasakrisz, Dec 7, 2024)
fe6edd3  remove duplicates  (kasakrisz, Dec 9, 2024)
c6973f9  sonar  (kasakrisz, Dec 10, 2024)
CompactorOnTezTest.java

@@ -21,6 +21,7 @@
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfForTest;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -50,9 +51,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -64,12 +63,6 @@
* Superclass for Test[Crud|Mm]CompactorOnTez, for setup and helper classes.
*/
public abstract class CompactorOnTezTest {
private static final AtomicInteger RANDOM_INT = new AtomicInteger(new Random().nextInt());
private static final String TEST_DATA_DIR = new File(
System.getProperty("java.io.tmpdir") + File.separator + TestCrudCompactorOnTez.class
.getCanonicalName() + "-" + System.currentTimeMillis() + "_" + RANDOM_INT
.getAndIncrement()).getPath().replaceAll("\\\\", "/");
private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
static final String CUSTOM_COMPACTION_QUEUE = "my_compaction_test_queue";

protected HiveConf conf;
@@ -85,7 +78,7 @@ public abstract class CompactorOnTezTest {
@Before
// Note: we create a new conf and driver object before every test
public void setup() throws Exception {
HiveConf hiveConf = new HiveConf(this.getClass());
HiveConfForTest hiveConf = new HiveConfForTest(this.getClass());
setupWithConf(hiveConf);
}

@@ -94,19 +87,20 @@ public static void setupClass() throws Exception {
tmpFolder = folder.newFolder().getAbsolutePath();
}

protected void setupWithConf(HiveConf hiveConf) throws Exception {
File f = new File(TEST_WAREHOUSE_DIR);
protected void setupWithConf(HiveConfForTest hiveConf) throws Exception {
String testWarehouseDir = hiveConf.getTestDataDir() + "/warehouse";
File f = new File(testWarehouseDir);
if (f.exists()) {
FileUtil.fullyDelete(f);
}
if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
if (!(new File(testWarehouseDir).mkdirs())) {
throw new RuntimeException("Could not create " + testWarehouseDir);
}
hiveConf.setVar(HiveConf.ConfVars.PRE_EXEC_HOOKS, "");
hiveConf.setVar(HiveConf.ConfVars.POST_EXEC_HOOKS, "");
hiveConf.setVar(HiveConf.ConfVars.METASTORE_WAREHOUSE, TEST_WAREHOUSE_DIR);
hiveConf.setVar(HiveConf.ConfVars.HIVE_INPUT_FORMAT, HiveInputFormat.class.getName());
hiveConf.setVar(HiveConf.ConfVars.HIVE_FETCH_TASK_CONVERSION, "none");
MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.WAREHOUSE, testWarehouseDir);
MetastoreConf.setTimeVar(hiveConf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS);
MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
@@ -116,27 +110,23 @@ protected void setupWithConf(HiveConf hiveConf) throws Exception {
TestTxnDbUtil.prepDb(hiveConf);
conf = hiveConf;
// Use tez as execution engine for this test class
setupTez(conf);
setupTez(hiveConf);
msClient = new HiveMetaStoreClient(conf);
driver = DriverFactory.newDriver(conf);
SessionState.start(new CliSessionState(conf));
}

private void setupTez(HiveConf conf) {
private void setupTez(HiveConfForTest conf) {
conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR);
conf.set("tez.am.resource.memory.mb", "128");
conf.set("tez.am.dag.scheduler.class",
"org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled");
conf.setBoolean("tez.local.mode", true);
conf.setBoolean("tez.local.mode.without.network", true);
conf.set("fs.defaultFS", "file:///");
conf.setBoolean("tez.runtime.optimize.local.fetch", true);
conf.set("tez.staging-dir", TEST_DATA_DIR);
conf.set("tez.staging-dir", conf.getTestDataDir());
conf.setBoolean("tez.ignore.lib.uris", true);
conf.set("hive.tez.container.size", "128");
conf.setBoolean("hive.merge.tezfiles", false);
conf.setBoolean("hive.in.tez.test", true);
if (!mmCompaction) {
// We need these settings to create a table which is not bucketed, but contains multiple files.
// If these parameters are set when inserting 100 rows into the table, the rows will
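The substantive change in CompactorOnTezTest (diff above): the hand-rolled static
TEST_DATA_DIR (tmpdir + timestamp + AtomicInteger) and TEST_WAREHOUSE_DIR constants are
replaced by HiveConfForTest, which carries a per-test data directory exposed through
getTestDataDir(); the warehouse and Tez staging directories are now derived from it. A
minimal sketch of that pattern, assuming HiveConfForTest generates one unique directory
per test class (the class and field names below are illustrative, not Hive's actual
implementation):

import java.io.File;
import java.util.UUID;

public class PerTestDataDirSketch {

  // One unique directory per instance; no shared static state, so parallel or
  // repeated runs cannot collide on the same path.
  private final String testDataDir;

  public PerTestDataDirSketch(Class<?> testClass) {
    this.testDataDir = new File(System.getProperty("java.io.tmpdir"),
        testClass.getCanonicalName() + "-" + UUID.randomUUID())
        .getPath().replace('\\', '/');
  }

  public String getTestDataDir() {
    return testDataDir;
  }

  public static void main(String[] args) {
    PerTestDataDirSketch conf = new PerTestDataDirSketch(PerTestDataDirSketch.class);
    // setupWithConf() derives the warehouse the same way:
    String testWarehouseDir = conf.getTestDataDir() + "/warehouse";
    // setupTez() points tez.staging-dir at the same root:
    System.out.println(testWarehouseDir);
  }
}

Because every location hangs off one root, deleting getTestDataDir() cleans up
everything a test wrote, which is harder to guarantee with class-level static paths.
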
@@ -279,7 +279,9 @@ public int monitorExecution() {
} catch (IOException | TezException tezException) {
// best effort
}
console.printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e));
String reportedException = "Execution has failed, stack trace: " + ExceptionUtils.getStackTrace(e);
console.printError(reportedException);
diagnostics.append(reportedException);
rc = 1;
done = true;
} else {
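This hunk is commit 64b5419 (add stacktrace to diagnostics): the stack trace that the
job monitor previously only printed to the console is now also appended to its
diagnostics buffer, so callers such as the new Tez-based tests can inspect the failure
text. A sketch of the print-and-append pattern; the names here (SimpleMonitor,
getDiagnostics) are assumptions for illustration, and only the report-then-append idea
comes from the diff:

public class SimpleMonitor {
  private final StringBuilder diagnostics = new StringBuilder();

  public int monitor(Runnable work) {
    try {
      work.run();
      return 0;
    } catch (Exception e) {
      // Build the message once so the console and the diagnostics stay identical.
      String reported = "Execution has failed, stack trace: " + stackTraceOf(e);
      System.err.println(reported);  // console.printError(reported) in the real code
      diagnostics.append(reported);  // newly preserved for later inspection
      return 1;
    }
  }

  public String getDiagnostics() {
    return diagnostics.toString();
  }

  private static String stackTraceOf(Exception e) {
    java.io.StringWriter sw = new java.io.StringWriter();
    e.printStackTrace(new java.io.PrintWriter(sw, true));
    return sw.toString();  // ExceptionUtils.getStackTrace(e) in the real code
  }
}
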
45 changes: 22 additions & 23 deletions ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java
@@ -63,15 +63,15 @@ protected String getTestDataDir() {


@Test
public void addPartition() throws Exception {

addPartition(false);
public void testAddPartition() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
addPartition();
}

@Test
public void addPartitionVectorized() throws Exception {
public void testAddPartitionVectorized() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
addPartition(true);
addPartition();
}

/**
@@ -80,7 +80,7 @@ public void addPartitionVectorized() throws Exception {
* adding partition when it already exists
* adding partition when it already exists with "if not exists"
*/
private void addPartition(boolean isVectorized) throws Exception {
private void addPartition() throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");
runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc" +
@@ -97,8 +97,7 @@
" PARTITION (p=1) location '" + getWarehouseDir() + "/2/data'" +
" PARTITION (p=2)");

String testQuery = isVectorized ? "select ROW__ID, p, a, b from T order by p, ROW__ID" :
"select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID";
String testQuery = "select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID";
String[][] expected = new String[][]{
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2",
"warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
Expand All @@ -108,7 +107,7 @@ private void addPartition(boolean isVectorized) throws Exception {
"warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t0\t4",
"warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}};
checkResult(expected, testQuery, isVectorized, "add 2 parts w/data and 1 empty", LOG);
checkResultAndVectorization(expected, testQuery, "add 2 parts w/data and 1 empty", LOG);

runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/3'");
//should be an error since p=3 exists
@@ -136,18 +135,19 @@
"warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t3\t0\t4",
"warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}};
checkResult(expected2, testQuery, isVectorized, "add 2 existing parts and 1 empty", LOG);
checkResultAndVectorization(expected2, testQuery, "add 2 existing parts and 1 empty", LOG);
}

@Test
public void addPartitionMM() throws Exception {
addPartitionMM(false);
public void testAddPartitionMM() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
addPartitionMM();
}

@Test
public void addPartitionMMVectorized() throws Exception {
public void testAddPartitionMMVectorized() throws Exception {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
addPartitionMM(true);
addPartitionMM();
}

/**
@@ -157,7 +157,7 @@ public void addPartitionMMVectorized() throws Exception {
* adding partition when it already exists
* adding partition when it already exists with "if not exists"
*/
private void addPartitionMM(boolean isVectorized) throws Exception {
private void addPartitionMM() throws Exception {
runStatementOnDriver("drop table if exists T");
runStatementOnDriver("drop table if exists Tstage");

@@ -175,14 +175,13 @@ private void addPartitionMM(boolean isVectorized) throws Exception {
" PARTITION (p=1) location '" + getWarehouseDir() + "/2/data'" +
" PARTITION (p=2)");

String testQuery = isVectorized ? "select p, a, b from T order by p, a, b" :
"select p, a, b, INPUT__FILE__NAME from T order by p, a, b";
String testQuery = "select p, a, b, INPUT__FILE__NAME from T order by p, a, b";
String[][] expected = new String[][]{
{"0\t0\t2", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"0\t0\t4", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"1\t0\t2", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
{"1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}};
checkResult(expected, testQuery, isVectorized, "add 2 parts w/data and 1 empty", LOG);
checkResultAndVectorization(expected, testQuery, "add 2 parts w/data and 1 empty", LOG);

runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/3'");
//should be an error since p=3 exists
@@ -203,7 +202,7 @@
{"1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"},
{"3\t0\t2", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"},
{"3\t0\t4", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}};
checkResult(expected2, testQuery, isVectorized, "add 2 existing parts and 1 empty", LOG);
checkResultAndVectorization(expected2, testQuery, "add 2 existing parts and 1 empty", LOG);
}

@Test
@@ -232,7 +231,7 @@ public void addPartitionBucketed() throws Exception {
}

private void checkExpected(List<String> rs, String[][] expected, String msg) {
super.checkExpected(rs, expected, msg, LOG, true);
super.checkExpected(rs, expected, msg, LOG);
}

/**
@@ -249,7 +248,7 @@ public void addPartitionRename() throws Exception {
runStatementOnDriver("create table Tstage (a int, b int) clustered by (a) into 2 " +
"buckets stored as orc tblproperties('transactional'='false')");

runStatementOnDriver("insert into Tstage values(0,2),(1,4)");
runStatementOnDriver("insert into Tstage values(0,2),(2,4)");
runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'");
FileSystem fs = FileSystem.get(hiveConf);
fs.rename(new Path(getWarehouseDir() + "/1/data/000000_0"), new Path(getWarehouseDir() + "/1/data/part-m000"));
@@ -261,9 +260,9 @@
List<String> rs = runStatementOnDriver(
"select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID");
String[][] expected = new String[][]{
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2\t4",
"warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t0\t0\t2",
"warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t0\t1\t4",
"warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"}};
checkExpected(rs, expected, "add partition (p=0)");
}
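The recurring edit in TestTxnAddPartition above: the isVectorized parameter and the
query-variant branching are gone. Each @Test method now sets HIVE_VECTORIZATION_ENABLED
on the conf itself, the queries keep INPUT__FILE__NAME unconditionally (commit d25dd5f:
it can be vectorized), and the renamed checkResultAndVectorization helper verifies the
plan alongside the rows. A hedged sketch of what such a helper plausibly does; the real
one lives in the test base class and its body is not shown in this diff, so both the
plan-marker string and the row matching below are assumptions:

import java.util.List;
import org.junit.Assert;

abstract class VectorizationCheckSketch {

  // Stand-ins for the real base-class members (e.g. TxnCommandsBaseForTests).
  abstract List<String> runStatementOnDriver(String stmt) throws Exception;
  abstract boolean isVectorizationEnabled();

  void checkResultAndVectorization(String[][] expected, String query, String msg)
      throws Exception {
    boolean expectVectorized = isVectorizationEnabled();
    // Plan check: EXPLAIN VECTORIZATION describes vectorized work; the exact
    // marker string is an assumption about the plan text.
    List<String> plan = runStatementOnDriver("explain vectorization only " + query);
    boolean planVectorized =
        plan.stream().anyMatch(line -> line.contains("Execution mode: vectorized"));
    Assert.assertEquals(msg + " (vectorization)", expectVectorized, planVectorized);
    // Row check: expected[i][0] is the value part, expected[i][1] the
    // INPUT__FILE__NAME fragment (matching simplified for the sketch).
    List<String> rows = runStatementOnDriver(query);
    Assert.assertEquals(msg + " (row count)", expected.length, rows.size());
    for (int i = 0; i < expected.length; i++) {
      Assert.assertTrue(msg + " row " + i + ": " + rows.get(i),
          rows.get(i).startsWith(expected[i][0]) && rows.get(i).contains(expected[i][1]));
    }
  }
}

Folding the vectorization assertion into one helper is what lets the tests drop the
boolean plumbing: the expectation now travels with the conf instead of with every call.
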
11 changes: 5 additions & 6 deletions ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -304,6 +304,7 @@ private static final class QueryRunnable implements Runnable {
this.cdlIn = cdlIn;
this.cdlOut = cdlOut;
this.hiveConf = new HiveConf(hiveConf);
this.hiveConf.unset(HiveConf.ConfVars.HIVE_SESSION_ID.varname);
}

@Override
@@ -1424,15 +1425,14 @@ public void testNonAcidToAcidConversion01() throws Exception {
//create a delta directory
runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,17)");

boolean isVectorized = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
String query = "select ROW__ID, a, b" + (isVectorized ? " from " : ", INPUT__FILE__NAME from ") + Table.NONACIDORCTBL + " order by ROW__ID";
String query = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID";
String[][] expected = new String[][] {
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/000001_0"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/000001_0_copy_1"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/000001_0_copy_1"},
{"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/delta_10000001_10000001_0000/bucket_00001_0"}
};
checkResult(expected, query, isVectorized, "before compact", LOG);
checkResultAndVectorization(expected, query, "before compact", LOG);

Assert.assertEquals(536870912,
BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0)));
@@ -1443,15 +1443,14 @@
runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " compact 'major'");
runWorker(hiveConf);

query = "select ROW__ID, a, b" + (isVectorized ? "" : ", INPUT__FILE__NAME") + " from "
+ Table.NONACIDORCTBL + " order by ROW__ID";
query = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID";
String[][] expected2 = new String[][] {
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/base_10000001_v0000009/bucket_00001"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/base_10000001_v0000009/bucket_00001"},
{"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/base_10000001_v0000009/bucket_00001"},
{"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/base_10000001_v0000009/bucket_00001"}
};
checkResult(expected2, query, isVectorized, "after major compact", LOG);
checkResultAndVectorization(expected2, query, "after major compact", LOG);
//make sure they are the same before and after compaction
}
//@Ignore("see bucket_num_reducers_acid.q")
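The one-line change to QueryRunnable above is subtle: copying the test's HiveConf for a
concurrent query thread also copies the parent's hive.session.id, so a SessionState
started from the copy would silently reuse the same session identity. Unsetting the
property forces a fresh id when the thread starts its own session, which matters once
these tests run on Tez, where per-session state hangs off that id. A toy illustration
of the copy-then-unset idea using plain java.util.Properties instead of HiveConf; the
fallback id generation is an assumption about what session startup does:

import java.util.Properties;
import java.util.UUID;

public class SessionIdResetSketch {
  static final String HIVE_SESSION_ID = "hive.session.id";

  public static void main(String[] args) {
    Properties parentConf = new Properties();
    parentConf.setProperty(HIVE_SESSION_ID, UUID.randomUUID().toString());

    // Naive copy: the child inherits the parent's session id.
    Properties childConf = new Properties();
    childConf.putAll(parentConf);

    // What the PR does, in spirit: unset it so session start generates a new one.
    childConf.remove(HIVE_SESSION_ID);
    String childSessionId = childConf.getProperty(HIVE_SESSION_ID);
    if (childSessionId == null) {
      childSessionId = UUID.randomUUID().toString();  // fresh id for this thread
    }
    System.out.println("parent=" + parentConf.getProperty(HIVE_SESSION_ID));
    System.out.println("child =" + childSessionId);
  }
}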