diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java index 5118bd0bbc27..fdbb454de413 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConfForTest; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -50,9 +51,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -64,12 +63,6 @@ * Superclass for Test[Crud|Mm]CompactorOnTez, for setup and helper classes. */ public abstract class CompactorOnTezTest { - private static final AtomicInteger RANDOM_INT = new AtomicInteger(new Random().nextInt()); - private static final String TEST_DATA_DIR = new File( - System.getProperty("java.io.tmpdir") + File.separator + TestCrudCompactorOnTez.class - .getCanonicalName() + "-" + System.currentTimeMillis() + "_" + RANDOM_INT - .getAndIncrement()).getPath().replaceAll("\\\\", "/"); - private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; static final String CUSTOM_COMPACTION_QUEUE = "my_compaction_test_queue"; protected HiveConf conf; @@ -85,7 +78,7 @@ public abstract class CompactorOnTezTest { @Before // Note: we create a new conf and driver object before every test public void setup() throws Exception { - HiveConf hiveConf = new HiveConf(this.getClass()); + HiveConfForTest hiveConf = new HiveConfForTest(this.getClass()); setupWithConf(hiveConf); } @@ -94,19 +87,20 @@ public static void setupClass() throws Exception { tmpFolder = folder.newFolder().getAbsolutePath(); } - protected void setupWithConf(HiveConf hiveConf) throws Exception { - File f = new File(TEST_WAREHOUSE_DIR); + protected void setupWithConf(HiveConfForTest hiveConf) throws Exception { + String testWarehouseDir = hiveConf.getTestDataDir() + "/warehouse"; + File f = new File(testWarehouseDir); if (f.exists()) { FileUtil.fullyDelete(f); } - if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) { - throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR); + if (!(new File(testWarehouseDir).mkdirs())) { + throw new RuntimeException("Could not create " + testWarehouseDir); } hiveConf.setVar(HiveConf.ConfVars.PRE_EXEC_HOOKS, ""); hiveConf.setVar(HiveConf.ConfVars.POST_EXEC_HOOKS, ""); - hiveConf.setVar(HiveConf.ConfVars.METASTORE_WAREHOUSE, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVE_INPUT_FORMAT, HiveInputFormat.class.getName()); hiveConf.setVar(HiveConf.ConfVars.HIVE_FETCH_TASK_CONVERSION, "none"); + MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.WAREHOUSE, testWarehouseDir); MetastoreConf.setTimeVar(hiveConf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS); MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true); @@ -116,27 +110,23 
@@ protected void setupWithConf(HiveConf hiveConf) throws Exception { TestTxnDbUtil.prepDb(hiveConf); conf = hiveConf; // Use tez as execution engine for this test class - setupTez(conf); + setupTez(hiveConf); msClient = new HiveMetaStoreClient(conf); driver = DriverFactory.newDriver(conf); SessionState.start(new CliSessionState(conf)); } - private void setupTez(HiveConf conf) { + private void setupTez(HiveConfForTest conf) { conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez"); - conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR); conf.set("tez.am.resource.memory.mb", "128"); conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled"); - conf.setBoolean("tez.local.mode", true); - conf.setBoolean("tez.local.mode.without.network", true); conf.set("fs.defaultFS", "file:///"); conf.setBoolean("tez.runtime.optimize.local.fetch", true); - conf.set("tez.staging-dir", TEST_DATA_DIR); + conf.set("tez.staging-dir", conf.getTestDataDir()); conf.setBoolean("tez.ignore.lib.uris", true); conf.set("hive.tez.container.size", "128"); conf.setBoolean("hive.merge.tezfiles", false); - conf.setBoolean("hive.in.tez.test", true); if (!mmCompaction) { // We need these settings to create a table which is not bucketed, but contains multiple files. // If these parameters are set when inserting 100 rows into the table, the rows will diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 0a174a6651a2..c145eb7dbdef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -279,7 +279,9 @@ public int monitorExecution() { } catch (IOException | TezException tezException) { // best effort } - console.printError("Execution has failed. 
stack trace: " + ExceptionUtils.getStackTrace(e)); + String reportedException = "Execution has failed, stack trace: " + ExceptionUtils.getStackTrace(e); + console.printError(reportedException); + diagnostics.append(reportedException); rc = 1; done = true; } else { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java index d07a2281a142..59766990be60 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java @@ -63,15 +63,15 @@ protected String getTestDataDir() { @Test - public void addPartition() throws Exception { - - addPartition(false); + public void testAddPartition() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + addPartition(); } @Test - public void addPartitionVectorized() throws Exception { + public void testAddPartitionVectorized() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); - addPartition(true); + addPartition(); } /** @@ -80,7 +80,7 @@ public void addPartitionVectorized() throws Exception { * adding partition when it already exists * adding partition when it already exists with "if not exists" */ - private void addPartition(boolean isVectorized) throws Exception { + private void addPartition() throws Exception { runStatementOnDriver("drop table if exists T"); runStatementOnDriver("drop table if exists Tstage"); runStatementOnDriver("create table T (a int, b int) partitioned by (p int) stored as orc" + @@ -97,8 +97,7 @@ private void addPartition(boolean isVectorized) throws Exception { " PARTITION (p=1) location '" + getWarehouseDir() + "/2/data'" + " PARTITION (p=2)"); - String testQuery = isVectorized ? 
"select ROW__ID, p, a, b from T order by p, ROW__ID" : - "select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"; + String testQuery = "select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"; String[][] expected = new String[][]{ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t2", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, @@ -108,7 +107,7 @@ private void addPartition(boolean isVectorized) throws Exception { "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}, {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}}; - checkResult(expected, testQuery, isVectorized, "add 2 parts w/data and 1 empty", LOG); + checkResultAndVectorization(expected, testQuery, "add 2 parts w/data and 1 empty", LOG); runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/3'"); //should be an error since p=3 exists @@ -136,18 +135,19 @@ private void addPartition(boolean isVectorized) throws Exception { "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}, {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t3\t0\t4", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}}; - checkResult(expected2, testQuery, isVectorized, "add 2 existing parts and 1 empty", LOG); + checkResultAndVectorization(expected2, testQuery, "add 2 existing parts and 1 empty", LOG); } @Test - public void addPartitionMM() throws Exception { - addPartitionMM(false); + public void testAddPartitionMM() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + addPartitionMM(); } @Test - public void addPartitionMMVectorized() throws Exception { + public void testAddPartitionMMVectorized() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); - addPartitionMM(true); + addPartitionMM(); } /** @@ -157,7 +157,7 @@ public void addPartitionMMVectorized() throws Exception { * adding partition when it already exists * adding partition when it already exists with "if not exists" */ - private void addPartitionMM(boolean isVectorized) throws Exception { + private void addPartitionMM() throws Exception { runStatementOnDriver("drop table if exists T"); runStatementOnDriver("drop table if exists Tstage"); @@ -175,14 +175,13 @@ private void addPartitionMM(boolean isVectorized) throws Exception { " PARTITION (p=1) location '" + getWarehouseDir() + "/2/data'" + " PARTITION (p=2)"); - String testQuery = isVectorized ? 
"select p, a, b from T order by p, a, b" : - "select p, a, b, INPUT__FILE__NAME from T order by p, a, b"; + String testQuery = "select p, a, b, INPUT__FILE__NAME from T order by p, a, b"; String[][] expected = new String[][]{ {"0\t0\t2", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, {"0\t0\t4", "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, {"1\t0\t2", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}, {"1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}}; - checkResult(expected, testQuery, isVectorized, "add 2 parts w/data and 1 empty", LOG); + checkResultAndVectorization(expected, testQuery, "add 2 parts w/data and 1 empty", LOG); runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/3'"); //should be an error since p=3 exists @@ -203,7 +202,7 @@ private void addPartitionMM(boolean isVectorized) throws Exception { {"1\t0\t4", "warehouse/t/p=1/delta_0000001_0000001_0000/000000_0"}, {"3\t0\t2", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}, {"3\t0\t4", "warehouse/t/p=3/delta_0000003_0000003_0000/000000_0"}}; - checkResult(expected2, testQuery, isVectorized, "add 2 existing parts and 1 empty", LOG); + checkResultAndVectorization(expected2, testQuery, "add 2 existing parts and 1 empty", LOG); } @Test @@ -232,7 +231,7 @@ public void addPartitionBucketed() throws Exception { } private void checkExpected(List rs, String[][] expected, String msg) { - super.checkExpected(rs, expected, msg, LOG, true); + super.checkExpected(rs, expected, msg, LOG); } /** @@ -249,7 +248,7 @@ public void addPartitionRename() throws Exception { runStatementOnDriver("create table Tstage (a int, b int) clustered by (a) into 2 " + "buckets stored as orc tblproperties('transactional'='false')"); - runStatementOnDriver("insert into Tstage values(0,2),(1,4)"); + runStatementOnDriver("insert into Tstage values(0,2),(2,4)"); runStatementOnDriver("export table Tstage to '" + getWarehouseDir() + "/1'"); FileSystem fs = FileSystem.get(hiveConf); fs.rename(new Path(getWarehouseDir() + "/1/data/000000_0"), new Path(getWarehouseDir() + "/1/data/part-m000")); @@ -261,9 +260,9 @@ public void addPartitionRename() throws Exception { List rs = runStatementOnDriver( "select ROW__ID, p, a, b, INPUT__FILE__NAME from T order by p, ROW__ID"); String[][] expected = new String[][]{ + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2\t4", + "warehouse/t/p=0/delta_0000001_0000001_0000/000000_0"}, {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t0\t0\t2", - "warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"}, - {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t0\t1\t4", "warehouse/t/p=0/delta_0000001_0000001_0000/000001_0"}}; checkExpected(rs, expected, "add partition (p=0)"); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 2344b908cebc..9e21a19dbe4b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -304,6 +304,7 @@ private static final class QueryRunnable implements Runnable { this.cdlIn = cdlIn; this.cdlOut = cdlOut; this.hiveConf = new HiveConf(hiveConf); + this.hiveConf.unset(HiveConf.ConfVars.HIVE_SESSION_ID.varname); } @Override @@ -1424,15 +1425,14 @@ public void testNonAcidToAcidConversion01() throws Exception { //create a delta directory runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,17)"); - boolean isVectorized = 
hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); - String query = "select ROW__ID, a, b" + (isVectorized ? " from " : ", INPUT__FILE__NAME from ") + Table.NONACIDORCTBL + " order by ROW__ID"; + String query = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID"; String[][] expected = new String[][] { {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/000001_0"}, {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/000001_0_copy_1"}, {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/000001_0_copy_1"}, {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/delta_10000001_10000001_0000/bucket_00001_0"} }; - checkResult(expected, query, isVectorized, "before compact", LOG); + checkResultAndVectorization(expected, query, "before compact", LOG); Assert.assertEquals(536870912, BucketCodec.V1.encode(new AcidOutputFormat.Options(hiveConf).bucket(0))); @@ -1443,15 +1443,14 @@ public void testNonAcidToAcidConversion01() throws Exception { runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " compact 'major'"); runWorker(hiveConf); - query = "select ROW__ID, a, b" + (isVectorized ? "" : ", INPUT__FILE__NAME") + " from " - + Table.NONACIDORCTBL + " order by ROW__ID"; + query = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by ROW__ID"; String[][] expected2 = new String[][] { {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/base_10000001_v0000009/bucket_00001"}, {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/base_10000001_v0000009/bucket_00001"}, {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/base_10000001_v0000009/bucket_00001"}, {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/base_10000001_v0000009/bucket_00001"} }; - checkResult(expected2, query, isVectorized, "after major compact", LOG); + checkResultAndVectorization(expected2, query, "after major compact", LOG); //make sure they are the same before and after compaction } //@Ignore("see bucket_num_reducers_acid.q") diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 3386404cbf0c..483d8521440e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -205,7 +205,8 @@ public void testOrcNoPPD() throws Exception { * @throws Exception */ private void testOrcPPD(boolean enablePPD) throws Exception { - boolean originalPpd = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_OPT_INDEX_FILTER); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPT_PPD, enablePPD);//enables PPD hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPT_INDEX_FILTER, enablePPD);//enables ORC PPD //create delta_0001_0001_0000 (should push predicate here) runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}})); @@ -264,7 +265,6 @@ private void testOrcPPD(boolean enablePPD) throws Exception { List rs1 = runStatementOnDriver(query); int [][] resultData = new int[][] {{3, 5}, {5, 6}, {9, 10}}; Assert.assertEquals("Update failed", stringifyValues(resultData), rs1); - hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPT_INDEX_FILTER, originalPpd); } static void assertExplainHasString(String string, List 
queryPlan, String errMsg) { @@ -365,13 +365,13 @@ public void testNonAcidToAcidConversion02() throws Exception { * Note: order of rows in a file ends up being the reverse of order in values clause (why?!) */ String[][] expected = { - {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t0\t13", "bucket_00001"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":3}\t0\t13", "bucket_00001"}, {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t0\t15", "bucket_00001"}, {"{\"writeid\":10000003,\"bucketid\":536936448,\"rowid\":0}\t0\t17", "bucket_00001"}, {"{\"writeid\":10000002,\"bucketid\":536936449,\"rowid\":0}\t0\t120", "bucket_00001"}, - {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "bucket_00001"}, - {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":5}\t1\t4", "bucket_00001"}, - {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t5", "bucket_00001"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "bucket_00001"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":4}\t1\t4", "bucket_00001"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":5}\t1\t5", "bucket_00001"}, {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":6}\t1\t6", "bucket_00001"}, {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":1}\t1\t16", "bucket_00001"} }; @@ -426,11 +426,9 @@ public void testNonAcidToAcidConversion1() throws Exception { runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); - // There should be 2 original bucket files in the location (000000_0 and 000001_0) - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } + // There should be 1 original bucket file in the location (000001_0) + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("000001_0")); List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); int [][] resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); @@ -443,10 +441,8 @@ public void testNonAcidToAcidConversion1() throws Exception { status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Everything should be same as before - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("000001_0")); rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); @@ -458,9 +454,9 @@ public void testNonAcidToAcidConversion1() throws Exception { runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); - // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory. + // There should be 1 original bucket file (000001_0), plus a new delta directory. 
// The delta directory should also have only 1 bucket file (bucket_00001) - Assert.assertEquals(3, status.length); + Assert.assertEquals(2, status.length); boolean sawNewDelta = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("delta_.*")) { @@ -484,10 +480,10 @@ public void testNonAcidToAcidConversion1() throws Exception { runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); runWorker(hiveConf); // There should be 1 new directory: base_xxxxxxx. - // Original bucket files and delta directory should stay until Cleaner kicks in. + // Original bucket file and delta directory should stay until Cleaner kicks in. status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(4, status.length); + Assert.assertEquals(3, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { @@ -517,8 +513,8 @@ public void testNonAcidToAcidConversion1() throws Exception { status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Before Cleaner, there should be 5 items: - // 2 original files, 1 original directory, 1 base directory and 1 delta directory - Assert.assertEquals(5, status.length); + // 1 original file, 1 original directory, 1 base directory and 1 delta directory + Assert.assertEquals(4, status.length); runCleaner(hiveConf); // There should be only 1 directory left: base_xxxxxxx. // Original bucket files and delta directory should have been cleaned up. @@ -556,11 +552,9 @@ public void testNonAcidToAcidConversion2() throws Exception { runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); - // There should be 2 original bucket files in the location (000000_0 and 000001_0) - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } + // There should be 1 original bucket file in the location (000001_0) + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("000001_0")); List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); int [][] resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); @@ -573,10 +567,8 @@ public void testNonAcidToAcidConversion2() throws Exception { status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Everything should be same as before - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("000001_0")); rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); @@ -588,12 +580,12 @@ public void testNonAcidToAcidConversion2() throws Exception { runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); status = fs.listStatus(new 
Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); - // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory + // There should be 1 original bucket file (000001_0), plus one delta directory // and one delete_delta directory. When split-update is enabled, an update event is split into // a combination of delete and insert, that generates the delete_delta directory. // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001) // and so should the delete_delta directory. - Assert.assertEquals(4, status.length); + Assert.assertEquals(3, status.length); boolean sawNewDelta = false; boolean sawNewDeleteDelta = false; for (int i = 0; i < status.length; i++) { @@ -624,10 +616,10 @@ public void testNonAcidToAcidConversion2() throws Exception { runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); runWorker(hiveConf); // There should be 1 new directory: base_0000001. - // Original bucket files and delta directory should stay until Cleaner kicks in. + // Original bucket file and delta directory should stay until Cleaner kicks in. status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(5, status.length); + Assert.assertEquals(4, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { @@ -649,8 +641,8 @@ public void testNonAcidToAcidConversion2() throws Exception { status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Before Cleaner, there should be 5 items: - // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory - Assert.assertEquals(5, status.length); + // 1 original file, 1 delta directory, 1 delete_delta directory and 1 base directory + Assert.assertEquals(4, status.length); runCleaner(hiveConf); // There should be only 1 directory left: base_0000001. // Original bucket files, delta directory and delete_delta should have been cleaned up. 
@@ -688,11 +680,9 @@ public void testNonAcidToAcidConversion3() throws Exception { runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); - // There should be 2 original bucket files in the location (000000_0 and 000001_0) - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } + // There should be 1 original bucket file in the location (000001_0) + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("000001_0")); List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); int [][] resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); @@ -705,10 +695,8 @@ public void testNonAcidToAcidConversion3() throws Exception { status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Everything should be same as before - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("000001_0")); rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); @@ -720,10 +708,10 @@ public void testNonAcidToAcidConversion3() throws Exception { runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); runWorker(hiveConf); // There should be 1 new directory: base_-9223372036854775808 - // Original bucket files should stay until Cleaner kicks in. + // Original bucket file should stay until Cleaner kicks in. status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); - Assert.assertEquals(3, status.length); + Assert.assertEquals(2, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { @@ -750,10 +738,10 @@ public void testNonAcidToAcidConversion3() throws Exception { status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000 - // There should be 2 original bucket files (000000_0 and 000001_0), a base directory, + // There should be 1 original bucket file (000001_0), a base directory, // plus two new delta directories and one delete_delta directory that would be created due to // the update statement (remember split-update U=D+I)! 
- Assert.assertEquals(6, status.length); + Assert.assertEquals(5, status.length); int numDelta = 0; int numDeleteDelta = 0; sawNewBase = false; @@ -805,12 +793,12 @@ public void testNonAcidToAcidConversion3() throws Exception { runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); runWorker(hiveConf); // There should be 1 new base directory: base_00000016 - // Original bucket files, delta directories, delete_delta directories and the + // Original bucket file, delta directories, delete_delta directories and the // previous base directory should stay until Cleaner kicks in. status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); Arrays.sort(status); - Assert.assertEquals(7, status.length); + Assert.assertEquals(6, status.length); int numBase = 0; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { @@ -841,8 +829,8 @@ public void testNonAcidToAcidConversion3() throws Exception { status = fs.listStatus(new Path(getWarehouseDir() + "/" + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.HIDDEN_FILES_PATH_FILTER); // Before Cleaner, there should be 6 items: - // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories - Assert.assertEquals(7, status.length); + // 1 original file, 2 delta directories, 1 delete_delta directory and 2 base directories + Assert.assertEquals(6, status.length); runCleaner(hiveConf); runCleaner(hiveConf); // There should be only 1 directory left: base_00000016 @@ -883,11 +871,9 @@ public void testNonAcidToAcidConversion4() throws Exception { runStatementOnDriver("insert into " + Table.NONACIDNESTEDPART + " partition(p='p1',q='q1') " + makeValuesClause(targetVals)); status = listFilesByTable(fs, Table.NONACIDNESTEDPART); - // There should be 2 original bucket files in the location (000000_0 and 000001_0) - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } + // There should be 1 original bucket file in the location (000001_0) + Assert.assertEquals(BUCKET_COUNT - 1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("000001_0")); List rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART); Assert.assertEquals(stringifyValues(targetVals), rs); rs = runStatementOnDriver("select count(*) from " + Table.NONACIDNESTEDPART); @@ -898,10 +884,8 @@ public void testNonAcidToAcidConversion4() throws Exception { runStatementOnDriver("alter table " + Table.NONACIDNESTEDPART + " SET TBLPROPERTIES ('transactional'='true')"); status = listFilesByTable(fs, Table.NONACIDNESTEDPART); // Everything should be same as before - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } + Assert.assertEquals(BUCKET_COUNT - 1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("000001_0")); rs = runStatementOnDriver("select a,b from " + Table.NONACIDNESTEDPART); Assert.assertEquals(stringifyValues(targetVals), rs); rs = runStatementOnDriver("select count(*) from " + Table.NONACIDNESTEDPART); @@ -914,7 +898,7 @@ public void testNonAcidToAcidConversion4() throws Exception { // There should be 1 new directory: base_-9223372036854775808 // Original bucket files should stay until Cleaner kicks in. 
status = listFilesByTable(fs, Table.NONACIDNESTEDPART); - Assert.assertEquals(3, status.length); + Assert.assertEquals(2, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { Path parent = status[i].getPath().getParent(); @@ -938,10 +922,10 @@ public void testNonAcidToAcidConversion4() throws Exception { runStatementOnDriver("insert into " + Table.NONACIDNESTEDPART + "(a,b,p,q) values(3,4,'p1','q1')"); status = listFilesByTable(fs, Table.NONACIDNESTEDPART); Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000 - // There should be 2 original bucket files (000000_0 and 000001_0), a base directory, + // There should be 1 original bucket file (000001_0), a base directory, // plus two new delta directories and one delete_delta directory that would be created due to // the update statement (remember split-update U=D+I)! - Assert.assertEquals(6, status.length); + Assert.assertEquals(5, status.length); int numDelta = 0; int numDeleteDelta = 0; sawNewBase = false; @@ -992,11 +976,11 @@ public void testNonAcidToAcidConversion4() throws Exception { runStatementOnDriver("alter table "+ Table.NONACIDNESTEDPART + " partition(p='p1',q='q1') compact 'MAJOR'"); runWorker(hiveConf); // There should be 1 new base directory: base_00000016 - // Original bucket files, delta directories, delete_delta directories and the + // Original bucket file, delta directories, delete_delta directories and the // previous base directory should stay until Cleaner kicks in. status = listFilesByTable(fs, Table.NONACIDNESTEDPART); Arrays.sort(status); - Assert.assertEquals(8, status.length); + Assert.assertEquals(7, status.length); int numBase = 0; Set bases = new HashSet<>(); for (int i = 0; i < status.length; i++) { @@ -1029,9 +1013,9 @@ public void testNonAcidToAcidConversion4() throws Exception { // 6. 
Let Cleaner delete obsolete files/dirs status = listFilesByTable(fs, Table.NONACIDNESTEDPART); // Before Cleaner, there should be 8 items: - // 2 original files, 2 delta directories (1 files each), 1 delete_delta directory (1 file) and 2 base directories (with one and two files respectively) + // 1 original file, 2 delta directories (1 files each), 1 delete_delta directory (1 file) and 2 base directories (with one and two files respectively) - Assert.assertEquals(8, status.length); + Assert.assertEquals(7, status.length); runCleaner(hiveConf); runCleaner(hiveConf); // There should be only 1 directory left: base_00000016 @@ -1331,49 +1315,6 @@ public void testInitiatorWithMinorCompactionForInsertOnlyTable() throws Exceptio verifyBaseDir(1, tblName, ""); } - /** - * Make sure there's no FileSystem$Cache$Key leak due to UGI use - * @throws Exception - */ - @Test - public void testFileSystemUnCaching() throws Exception { - int cacheSizeBefore; - int cacheSizeAfter; - - // get the size of cache BEFORE - cacheSizeBefore = getFileSystemCacheSize(); - - // Insert a row to ACID table - runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); - - // Perform a major compaction - runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'major'"); - runWorker(hiveConf); - runCleaner(hiveConf); - - // get the size of cache AFTER - cacheSizeAfter = getFileSystemCacheSize(); - - Assert.assertEquals(cacheSizeBefore, cacheSizeAfter); - } - - private int getFileSystemCacheSize() throws Exception { - try { - Field cache = FileSystem.class.getDeclaredField("CACHE"); - cache.setAccessible(true); - Object o = cache.get(null); // FileSystem.CACHE - - Field mapField = o.getClass().getDeclaredField("map"); - mapField.setAccessible(true); - Map map = (HashMap)mapField.get(o); // FileSystem.CACHE.map - - return map.size(); - } catch (NoSuchFieldException e) { - System.out.println(e); - } - return 0; - } - private static class CompactionsByState { private int didNotInitiate; private int failed; @@ -2450,6 +2391,9 @@ public void testCleanerForTxnToWriteId() throws Exception { // Keep an open txn which refers to the aborted txn. 
Context ctx = new Context(hiveConf); HiveTxnManager txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf); + // Txn is not considered committed or aborted until TXN_OPENTXN_TIMEOUT expires + // See MinOpenTxnIdWaterMarkFunction, OpenTxnTimeoutLowBoundaryTxnIdHandler + waitUntilAllTxnFinished(); txnMgr.openTxn(ctx, "u1"); txnMgr.getValidTxns(); @@ -2805,7 +2749,7 @@ public void testDynPartUpdateWithAborts() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_FAIL_LOAD_DYNAMIC_PARTITION, true); runStatementOnDriverWithAbort("update " + Table.ACIDTBLPART + " set b=a+2 where a<5"); hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_FAIL_LOAD_DYNAMIC_PARTITION, false); - verifyDeltaDirAndResult(2, Table.ACIDTBLPART.toString(), "p=p1", resultData1); + verifyDeltaDirAndResult(3, Table.ACIDTBLPART.toString(), "p=p1", resultData1); verifyDeleteDeltaDir(1, Table.ACIDTBLPART.toString(), "p=p1"); int count = TestTxnDbUtil diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index 563b86a471fd..a2446d63ad15 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -91,8 +91,7 @@ public void testRenameTable() throws Exception { "s/delta_0000001_0000001_0000/bucket_00000_0"}, {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":0}\t4\t6", "s/delta_0000002_0000002_0001/bucket_00000_0"}}; - checkResult(expected, testQuery, false, "check data", LOG); - + checkResult(expected, testQuery, "check data", LOG); Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='t'")); @@ -172,17 +171,13 @@ private void testDeleteEventPruning() throws Exception { List rs = runStatementOnDriver( "select ROW__ID, a, b from T order by a, b"); - boolean isVectorized = - hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); - String testQuery = isVectorized ? 
- "select ROW__ID, a, b from T order by a, b" : - "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b"; + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b"; String[][] expected = new String[][]{ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5", - "warehouse/t/delta_0000001_0000001_0000/bucket_00000"}, + "warehouse/t/delta_0000001_0000001_0000/bucket_00000_0"}, {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", - "warehouse/t/delta_0000002_0000002_0000/bucket_00000"}}; - checkResult(expected, testQuery, isVectorized, "after delete", LOG); + "warehouse/t/delta_0000002_0000002_0000/bucket_00000_0"}}; + checkResultAndVectorization(expected, testQuery, "after delete", LOG); runStatementOnDriver("alter table T compact 'MAJOR'"); runWorker(hiveConf); @@ -197,10 +192,10 @@ private void testDeleteEventPruning() throws Exception { String[][] expected2 = new String[][]{ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t4\t5", - "warehouse/t/base_0000001/bucket_00000"}, + "warehouse/t/base_0000003_v0000012/bucket_00000"}, {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t4\t6", - "warehouse/t/base_0000002/bucket_00000"}}; - checkResult(expected2, testQuery, isVectorized, "after compaction", LOG); + "warehouse/t/base_0000003_v0000012/bucket_00000"}}; + checkResultAndVectorization(expected2, testQuery, "after compaction", LOG); } /** * HIVE-19985 @@ -256,32 +251,26 @@ public void testAcidMetaColumsDecode() throws Exception { */ @Test public void testSdpoBucketed() throws Exception { - testSdpoBucketed(true, true, 1); - testSdpoBucketed(true, false, 1); - testSdpoBucketed(false, true, 1); - testSdpoBucketed(false, false,1); - - testSdpoBucketed(true, true, 2); - testSdpoBucketed(true, false, 2); - testSdpoBucketed(false, true, 2); - testSdpoBucketed(false, false,2); + testSdpoBucketed(true, 1); + testSdpoBucketed(false, 1); + + testSdpoBucketed(true, 2); + testSdpoBucketed(false, 2); } - private void testSdpoBucketed(boolean isVectorized, boolean isSdpo, int bucketing_version) + private void testSdpoBucketed(boolean isVectorized, int bucketingVersion) throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); runStatementOnDriver("drop table if exists acid_uap"); runStatementOnDriver("create transactional table acid_uap(a int, b varchar(128)) " + "partitioned by (ds string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES " + - "('bucketing_version'='" + bucketing_version + "')"); + "('bucketing_version'='" + bucketingVersion + "')"); runStatementOnDriver("insert into table acid_uap partition (ds='tomorrow') " + "values (1, 'bah'),(2, 'yah')"); runStatementOnDriver("insert into table acid_uap partition (ds='today') " + "values (1, 'bah'),(2, 'yah')"); runStatementOnDriver("select a,b, ds from acid_uap order by a,b, ds"); - String testQuery = isVectorized ? 
- "select ROW__ID, a, b, ds from acid_uap order by ds, a, b" : - "select ROW__ID, a, b, ds, INPUT__FILE__NAME from acid_uap order by ds, a, b"; + String testQuery = "select ROW__ID, a, b, ds, INPUT__FILE__NAME from acid_uap order by ds, a, b"; String[][] expected = new String[][]{ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttoday", "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00001_0"}, @@ -292,7 +281,7 @@ private void testSdpoBucketed(boolean isVectorized, boolean isSdpo, int bucketin "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00001_0"}, {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttomorrow", "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00000_0"}}; - checkResult(expected, testQuery, isVectorized, "after insert", LOG); + checkResultAndVectorization(expected, testQuery, "after insert", LOG); runStatementOnDriver("update acid_uap set b = 'fred'"); @@ -306,7 +295,7 @@ private void testSdpoBucketed(boolean isVectorized, boolean isSdpo, int bucketin "warehouse/acid_uap/ds=tomorrow/delta_0000003_0000003_0001/bucket_00001_0"}, {"{\"writeid\":3,\"bucketid\":536870913,\"rowid\":0}\t2\tfred\ttomorrow", "warehouse/acid_uap/ds=tomorrow/delta_0000003_0000003_0001/bucket_00000_0"}}; - checkResult(expected2, testQuery, isVectorized, "after update", LOG); + checkResultAndVectorization(expected2, testQuery, "after update", LOG); } @Test public void testCleaner2() throws Exception { @@ -338,8 +327,7 @@ public void testCleaner2() throws Exception { "t/delta_0000001_0000001_0000/bucket_00000_0"}, {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t4", "t/delta_0000002_0000002_0000/bucket_00000_0"}}; - checkResult(expected, testQuery, false, "check data", LOG); - + checkResult(expected, testQuery, "check data", LOG); txnMgr2 = swapTxnManager(txnMgr1); driver2 = swapDrivers(driver1); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java index 89483bae5658..1fabfcb2e52c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java @@ -62,7 +62,7 @@ public void testConcatenate() throws Exception { "acidtbl/delta_0000003_0000003_0000/bucket_00001_0"}, {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t8\t8", "acidtbl/delta_0000003_0000003_0000/bucket_00001_0"}}; - checkResult(expected, testQuery, false, "check data", LOG); + checkResultAndVectorization(expected, testQuery, "check data", LOG); /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker() but in normal usage 'concatenate' is blocking, */ @@ -86,7 +86,7 @@ public void testConcatenate() throws Exception { "acidtbl/base_0000003_v0000011/bucket_00001"}, {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t8\t8", "acidtbl/base_0000003_v0000011/bucket_00001"}}; - checkResult(expected2, testQuery, false, "check data after concatenate", LOG); + checkResultAndVectorization(expected2, testQuery, "check data after concatenate", LOG); } @Test public void testConcatenatePart() throws Exception { @@ -103,7 +103,7 @@ public void testConcatenatePart() throws Exception { "acidtblpart/p=p1/delta_0000003_0000003_0000/bucket_00001_0"}, {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001_0"}}; - checkResult(expected, testQuery, false, "check data", LOG); + checkResultAndVectorization(expected, 
testQuery, "check data", LOG); /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker() but in normal usage 'concatenate' is blocking, */ @@ -128,11 +128,15 @@ public void testConcatenatePart() throws Exception { {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8", "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001_0"}}; - checkResult(expected2, testQuery, false, "check data after concatenate", LOG); + checkResultAndVectorization(expected2, testQuery, "check data after concatenate", LOG); } @Test public void testConcatenateMM() throws Exception { + // Only one bucket is expected in this test + hiveConf.set("tez.grouping.max-size", "1024"); + hiveConf.set("tez.grouping.min-size", "1"); + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); dropTables("T"); runStatementOnDriver("create table T(a int, b int)"); @@ -144,7 +148,7 @@ public void testConcatenateMM() throws Exception { {"4\t5", "t/delta_0000001_0000001_0000/000000_0"}, {"5\t6", "t/delta_0000002_0000002_0000/000000_0"}, {"8\t8", "t/delta_0000002_0000002_0000/000000_0"}}; - checkResult(expected, testQuery, false, "check data", LOG); + checkResultAndVectorization(expected, testQuery, "check data", LOG); /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker() but in normal usage 'concatenate' is blocking, */ @@ -164,6 +168,6 @@ public void testConcatenateMM() throws Exception { {"4\t5", "t/base_0000003_v0000011/000000_0"}, {"5\t6", "t/base_0000003_v0000011/000000_0"}, {"8\t8", "t/base_0000003_v0000011/000000_0"}}; - checkResult(expected2, testQuery, false, "check data after concatenate", LOG); + checkResultAndVectorization(expected2, testQuery, "check data after concatenate", LOG); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java index 6636ff252bd2..ac2966463011 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java @@ -280,7 +280,7 @@ public void testCTLT() throws Exception { */ @Test public void testImport() throws Exception { - testImport(false, true); + testImport(true); } /** * tests import where target table already exists. @@ -288,14 +288,15 @@ public void testImport() throws Exception { @Test public void testImportVectorized() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); - testImport(true, true); + testImport(true); } /** * tests import where target table does not exists. */ @Test public void testImportNoTarget() throws Exception { - testImport(false, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); + testImport(false); } /** * MM tables already work - mm_exim.q @@ -304,7 +305,7 @@ public void testImportNoTarget() throws Exception { * If importing into existing table (un-partitioned) it must be empty. * If Import is creating a table it will be exactly like exported one except for the name. 
*/ - private void testImport(boolean isVectorized, boolean existingTarget) throws Exception { + private void testImport(boolean existingTarget) throws Exception { dropTables("T", "Tstage"); if(existingTarget) { runStatementOnDriver("create table T (a int, b int) stored as orc"); @@ -320,8 +321,7 @@ private void testImport(boolean isVectorized, boolean existingTarget) throws Exc //load into existing empty table T runStatementOnDriver("import table T from '" + getWarehouseDir() + "/1'"); - String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : - "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][] { {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"}, @@ -329,7 +329,7 @@ private void testImport(boolean isVectorized, boolean existingTarget) throws Exc "t/delta_0000001_0000001_0000/000000_0"}, {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "t/delta_0000001_0000001_0000/000000_0"}}; - checkResult(expected, testQuery, isVectorized, "import existing table"); + checkResultAndVectorization(expected, testQuery, "import existing table", LOG); runStatementOnDriver("update T set a = 0 where b = 6"); String[][] expected2 = new String[][] { @@ -339,7 +339,7 @@ private void testImport(boolean isVectorized, boolean existingTarget) throws Exc "t/delta_0000001_0000001_0000/000000_0"}, {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":0}\t0\t6", "t/delta_0000002_0000002_0001/bucket_00000_0"}}; - checkResult(expected2, testQuery, isVectorized, "update imported table"); + checkResultAndVectorization(expected2, testQuery, "update imported table", LOG); runStatementOnDriver("alter table T compact 'minor'"); TestTxnCommands2.runWorker(hiveConf); @@ -350,7 +350,7 @@ private void testImport(boolean isVectorized, boolean existingTarget) throws Exc ".*t/delta_0000001_0000002_v000001[4-5]/bucket_00000"}, {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":0}\t0\t6", ".*t/delta_0000001_0000002_v000001[4-5]/bucket_00000"}}; - checkResult(expected3, testQuery, isVectorized, "minor compact imported table"); + checkResultAndVectorization(expected3, testQuery, "minor compact imported table", LOG); } @@ -383,7 +383,7 @@ public void testImportPartitioned() throws Exception { "t/p=11/delta_0000002_0000002_0000/000000_0"}, {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/p=12/delta_0000003_0000003_0000/000000_0"}}; - checkResult(expected, testQuery, isVectorized, "import existing table"); + checkResultAndVectorization(expected, testQuery, "import existing table", LOG); } @Test @@ -563,10 +563,6 @@ private void testMM(boolean existingTable, boolean isSourceMM) throws Exception Assert.assertTrue(s, s.endsWith("/000000_0")); } } - private void checkResult(String[][] expectedResult, String query, boolean isVectorized, - String msg) throws Exception{ - checkResult(expectedResult, query, isVectorized, msg, LOG); - } /** * This test will fail - MM export doesn't filter out aborted transaction data. 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java index d5d6a330f486..8bd40171e9ac 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java @@ -58,42 +58,42 @@ protected String getTestDataDir() { } @Test - public void loadData() throws Exception { - loadData(false); + public void loadDataNotVectorized() throws Exception { + loadData(); } @Test public void loadDataVectorized() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); - loadData(true); + loadData(); } @Test - public void loadDataUpdate() throws Exception { - loadDataUpdate(false); + public void loadDataUpdateNotVectorized() throws Exception { + loadDataUpdate(); } @Test public void loadDataUpdateVectorized() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); - loadDataUpdate(true); + loadDataUpdate(); } @Test - public void loadDataNonAcid2AcidConversion() throws Exception { - loadDataNonAcid2AcidConversion(false); + public void loadDataNonAcid2AcidConversionNotVectorized() throws Exception { + loadDataNonAcid2AcidConversion(); } @Test public void loadDataNonAcid2AcidConversionVectorized() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); - loadDataNonAcid2AcidConversion(true); + loadDataNonAcid2AcidConversion(); } @Test - public void testMultiStatement() throws Exception { - testMultiStatement(false); + public void testMultiStatementNotVectorized() throws Exception { + testMultiStatement(); } @Test public void testMultiStatementVectorized() throws Exception { hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); - testMultiStatement(true); + testMultiStatement(); } - private void loadDataUpdate(boolean isVectorized) throws Exception { + private void loadDataUpdate() throws Exception { dropTables("T", "Tstage"); runStatementOnDriver( "create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); @@ -108,18 +108,17 @@ private void loadDataUpdate(boolean isVectorized) throws Exception { // 'data' is created by export command/ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); - String testQuery = isVectorized ? 
"select ROW__ID, a, b from T order by ROW__ID" : - "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][]{ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/000000_0"}, {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}}; - checkResult(expected, testQuery, isVectorized, "load data inpath"); + checkResult(expected, testQuery, "load data inpath"); runStatementOnDriver("update T set b = 17 where a = 1"); String[][] expected2 = new String[][]{ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/000000_0"}, {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0001/bucket_00000_0"} }; - checkResult(expected2, testQuery, isVectorized, "update"); + checkResult(expected2, testQuery, "update"); runStatementOnDriver("insert into T values(2,2)"); String[][] expectedInter2 = new String[][] { @@ -127,13 +126,13 @@ private void loadDataUpdate(boolean isVectorized) throws Exception { {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0001/bucket_00000_0"}, {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000_0"} }; - checkResult(expectedInter2, testQuery, isVectorized, "insert"); + checkResult(expectedInter2, testQuery, "insert"); runStatementOnDriver("delete from T where a = 3"); String[][] expectedInter3 = new String[][] { {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":0}\t1\t17", "t/delta_0000002_0000002_0001/bucket_00000_0"}, {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000003_0000003_0000/bucket_00000_0"} }; - checkResult(expectedInter3, testQuery, isVectorized, "delete"); + checkResult(expectedInter3, testQuery, "delete"); //test minor compaction runStatementOnDriver("alter table T compact 'minor'"); TestTxnCommands2.runWorker(hiveConf); @@ -141,13 +140,13 @@ private void loadDataUpdate(boolean isVectorized) throws Exception { {"{\"writeid\":2,\"bucketid\":536870913,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004_v0000018/bucket_00000"}, {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004_v0000018/bucket_00000"} }; - checkResult(expected3, testQuery, isVectorized, "delete compact minor"); + checkResult(expected3, testQuery, "delete compact minor"); runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' overwrite into table T"); String[][] expected4 = new String[][]{ {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000005/000000_0"}, {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000005/000000_0"}}; - checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite"); + checkResult(expected4, testQuery, "load data inpath overwrite"); //load same data again (additive) runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); @@ -157,7 +156,7 @@ private void loadDataUpdate(boolean isVectorized) throws Exception { {"{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000006_0000006_0000/000000_0"}, {"{\"writeid\":6,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000006_0000006_0000/000000_0"} }; - checkResult(expectedInt1, testQuery, isVectorized, "load data local inpath"); + checkResult(expectedInt1, testQuery, "load data 
local inpath"); runStatementOnDriver("update T set b = 17 where a = 1");//matches 2 rows runStatementOnDriver("delete from T where a = 3");//matches 2 rows runStatementOnDriver("insert into T values(2,2)"); @@ -166,7 +165,7 @@ private void loadDataUpdate(boolean isVectorized) throws Exception { {"{\"writeid\":7,\"bucketid\":536936449,\"rowid\":0}\t1\t17", "t/delta_0000007_0000007_0001/bucket_00001_0"}, {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000009_0000009_0000/bucket_00000_0"} }; - checkResult(expected5, testQuery, isVectorized, "load data inpath overwrite update"); + checkResult(expected5, testQuery, "load data inpath overwrite update"); //test major compaction runStatementOnDriver("alter table T compact 'major'"); @@ -176,9 +175,9 @@ private void loadDataUpdate(boolean isVectorized) throws Exception { {"{\"writeid\":7,\"bucketid\":536936449,\"rowid\":0}\t1\t17", "t/base_0000009_v0000033/bucket_00001"}, {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009_v0000033/bucket_00000"} }; - checkResult(expected6, testQuery, isVectorized, "load data inpath compact major"); + checkResult(expected6, testQuery, "load data inpath compact major"); } - private void loadData(boolean isVectorized) throws Exception { + private void loadData() throws Exception { dropTables("T", "Tstage"); runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); runStatementOnDriver("insert into T values(0,2),(0,4)"); @@ -193,8 +192,7 @@ private void loadData(boolean isVectorized) throws Exception { // 'data' is created by export command/ runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); - String testQuery = isVectorized ? "select ROW__ID, a, b from T order by ROW__ID" : - "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][] { //normal insert {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"}, @@ -202,7 +200,7 @@ private void loadData(boolean isVectorized) throws Exception { //Load Data {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000002_0000002_0000/000000_0"}, {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000002_0000002_0000/000000_0"}}; - checkResult(expected, testQuery, isVectorized, "load data inpath"); + checkResult(expected, testQuery, "load data inpath"); //test minor compaction runStatementOnDriver("alter table T compact 'minor'"); @@ -213,7 +211,7 @@ private void loadData(boolean isVectorized) throws Exception { {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002_v0000010/bucket_00000"}, {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002_v0000010/bucket_00000"} }; - checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)"); + checkResult(expected1, testQuery, "load data inpath (minor)"); //test major compaction runStatementOnDriver("insert into T values(2,2)"); @@ -226,7 +224,7 @@ private void loadData(boolean isVectorized) throws Exception { {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003_v0000015/bucket_00000"}, {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003_v0000015/bucket_00000"} }; - checkResult(expected2, testQuery, isVectorized, "load data inpath (major)"); + 
checkResult(expected2, testQuery, "load data inpath (major)"); //create more staging data and test Load Data Overwrite runStatementOnDriver("insert into Tstage values(5,6),(7,8)"); @@ -235,7 +233,7 @@ private void loadData(boolean isVectorized) throws Exception { String[][] expected3 = new String[][] { {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000004/000000_0"}, {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000004/000000_0"}}; - checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite"); + checkResult(expected3, testQuery, "load data inpath overwrite"); //one more major compaction runStatementOnDriver("insert into T values(6,6)"); @@ -245,12 +243,12 @@ private void loadData(boolean isVectorized) throws Exception { {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005_v0000023/bucket_00000"}, {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005_v0000023/bucket_00000"}, {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005_v0000023/bucket_00000"}}; - checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)"); + checkResult(expected4, testQuery, "load data inpath overwrite (major)"); } /** * Load Data [overwrite] in to an (un-)partitioned acid converted table */ - private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Exception { + private void loadDataNonAcid2AcidConversion() throws Exception { dropTables("T", "Tstage"); runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='false')"); //per acid write to test nonAcid2acid conversion mixed with load data @@ -271,8 +269,7 @@ private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Excepti // (with 000000_0, 000000_0_copy_1, 000000_0_copy_2) runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); - String testQuery = isVectorized ? 
"select ROW__ID, a, b from T order by ROW__ID" : - "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; /* {"writeid":0,"bucketid":536870912,"rowid":0} 0 2/000000_0 {"writeid":0,"bucketid":536870912,"rowid":1} 0 4/000000_0 @@ -297,7 +294,7 @@ private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Excepti {"{\"writeid\":10000001,\"bucketid\":537001984,\"rowid\":1}\t5\t5", "t/delta_10000001_10000001_0000/000002_0"}, }; - checkResult(expected, testQuery, isVectorized, "load data inpath"); + checkResult(expected, testQuery, "load data inpath"); //create more staging data with copy_N files and do LD+Overwrite runStatementOnDriver("insert into Tstage values(5,6),(7,8)"); @@ -313,7 +310,7 @@ private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Excepti "t/base_10000002/000001_0"} }; - checkResult(expected2, testQuery, isVectorized, "load data inpath overwrite"); + checkResult(expected2, testQuery, "load data inpath overwrite"); //create 1 more delta_x_x so that compactor has > dir file to compact runStatementOnDriver("insert into T values(9,9)"); @@ -330,7 +327,7 @@ private void loadDataNonAcid2AcidConversion(boolean isVectorized) throws Excepti {"{\"writeid\":10000003,\"bucketid\":536870912,\"rowid\":0}\t9\t9", "t/base_10000003_v0000013/bucket_00000"} }; - checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)"); + checkResult(expected3, testQuery, "load data inpath overwrite (major)"); } /** * Load Data [overwrite] in to a partitioned transactional table @@ -404,7 +401,7 @@ public void testValidations() throws Exception { } private void checkExpected(List rs, String[][] expected, String msg) { - super.checkExpected(rs, expected, msg, LOG, true); + super.checkExpected(rs, expected, msg, LOG); } @Test @@ -421,7 +418,7 @@ public void testMMOrcTable() throws Exception { * Make sure Load Data assigns ROW_IDs correctly when there is statementId suffix on delta dir * For example, delta_x_x_0001. */ - private void testMultiStatement(boolean isVectorized) throws Exception { + private void testMultiStatement() throws Exception { dropTables("T", "Tstage"); runStatementOnDriver("create table T (a int, b int) stored as orc tblproperties('transactional'='true')"); //Tstage is just a simple way to generate test data @@ -438,15 +435,14 @@ private void testMultiStatement(boolean isVectorized) throws Exception { runStatementOnDriver("load data local inpath '" + getWarehouseDir() + "/1/data' into table T"); runStatementOnDriver("COMMIT"); - String testQuery = isVectorized ? 
"select ROW__ID, a, b from T order by ROW__ID" : - "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; + String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"; String[][] expected = new String[][] { {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"}, {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"}, {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/delta_0000001_0000001_0001/000000_0"}, {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/delta_0000001_0000001_0001/000000_0"} }; - checkResult(expected, testQuery, isVectorized, "load data inpath"); + checkResult(expected, testQuery, "load data inpath"); runStatementOnDriver("alter table T compact 'major'"); TestTxnCommands2.runWorker(hiveConf); @@ -456,7 +452,7 @@ private void testMultiStatement(boolean isVectorized) throws Exception { {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001_v0000009/bucket_00000"}, {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001_v0000009/bucket_00000"} }; - checkResult(expected2, testQuery, isVectorized, "load data inpath (major)"); + checkResult(expected2, testQuery, "load data inpath (major)"); //at lest for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154 } @@ -484,11 +480,10 @@ public void testAbort() throws Exception { {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000001_0000/bucket_00000_0"}, {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000001_0000/bucket_00000_0"} }; - checkResult(expected, testQuery, isVectorized, "load data inpath"); + checkResult(expected, testQuery, "load data inpath"); } - void checkResult(String[][] expectedResult, String query, boolean isVectorized, - String msg) throws Exception{ - checkResult(expectedResult, query, isVectorized, msg, LOG); + void checkResult(String[][] expectedResult, String query, String msg) throws Exception { + checkResultAndVectorization(expectedResult, query, msg, LOG); } @Test public void testLoadAcidFile() throws Exception { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index 40a0a169744b..7e8c5e7e1b8f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -63,10 +63,6 @@ public void setUp() throws Exception { //see TestTxnNoBucketsVectorized for vectorized version hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); } - - private boolean shouldVectorize() { - return hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); - } /** * Tests that Acid can work with un-bucketed tables. */ @@ -169,10 +165,9 @@ public void testNoBuckets() throws Exception { {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000011/bucket_00001"}, {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000011/bucket_00000"} }; - checkResult(expected, - "select ROW__ID, c1, c2, c3" + (shouldVectorize() ? 
"" : ", INPUT__FILE__NAME") + checkResultAndVectorization(expected, + "select ROW__ID, c1, c2, c3, INPUT__FILE__NAME" + " from " + NO_BUCKETS_TBL_NAME + " order by c1, c2, c3", - shouldVectorize(), "After Major Compaction", LOG); expectedFiles.clear(); @@ -270,10 +265,10 @@ public void testCTAS() throws Exception { " union all select a, b from " + Table.ACIDTBL); rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas3 order by ROW__ID"); String expected3[][] = { - {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00000_0"}, - {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00001_0"}, - {"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00002_0"}, - {"{\"writeid\":1,\"bucketid\":537067520,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00003_0"}, + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0001/bucket_00000_0"}, + {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0002/bucket_00000_0"}, + {"{\"writeid\":1,\"bucketid\":536936449,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0001/bucket_00001_0"}, + {"{\"writeid\":1,\"bucketid\":536936450,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0002/bucket_00001_0"}, }; checkExpected(rs, expected3, "Unexpected row count after ctas from union all query"); @@ -384,14 +379,14 @@ public void testToAcidConversionMultiBucket() throws Exception { List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); //previous insert+union creates 3 data files (0-3) - //insert (12,12) creates 000000_0_copy_1 + //insert (12,12) creates 000000_0 String expected[][] = { - {"1\t2", "warehouse/t/000002_0"}, - {"2\t4", "warehouse/t/000002_0"}, - {"5\t6", "warehouse/t/000000_0"}, - {"6\t8", "warehouse/t/000001_0"}, - {"9\t10", "warehouse/t/000000_0"}, - {"12\t12", "warehouse/t/000000_0_copy_1"} + {"1\t2", "warehouse/t/HIVE_UNION_SUBDIR_1/000000_0"}, + {"2\t4", "warehouse/t/HIVE_UNION_SUBDIR_1/000000_0"}, + {"5\t6", "warehouse/t/HIVE_UNION_SUBDIR_2/000000_0"}, + {"6\t8", "warehouse/t/HIVE_UNION_SUBDIR_2/000001_0"}, + {"9\t10", "warehouse/t/HIVE_UNION_SUBDIR_3/000000_0"}, + {"12\t12", "warehouse/t/000000_0"} }; checkExpected(rs, expected,"before converting to acid"); @@ -405,13 +400,13 @@ public void testToAcidConversionMultiBucket() throws Exception { " where a between 5 and 7"); //now we have a table with data files at multiple different levels. 
String expected1[][] = { - {"1\t2", "warehouse/t/000002_0"}, - {"2\t4", "warehouse/t/000002_0"}, - {"5\t6", "warehouse/t/000000_0"}, - {"6\t8", "warehouse/t/000001_0"}, - {"9\t10", "warehouse/t/000000_0"}, + {"1\t2", "warehouse/t/HIVE_UNION_SUBDIR_1/000000_0"}, + {"2\t4", "warehouse/t/HIVE_UNION_SUBDIR_1/000000_0"}, + {"5\t6", "warehouse/t/HIVE_UNION_SUBDIR_2/000000_0"}, + {"6\t8", "warehouse/t/HIVE_UNION_SUBDIR_2/000001_0"}, + {"9\t10", "warehouse/t/HIVE_UNION_SUBDIR_3/000000_0"}, {"10\t20", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, - {"12\t12", "warehouse/t/000000_0_copy_1"}, + {"12\t12", "warehouse/t/000000_0"}, {"20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, {"50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"}, {"60\t80", "warehouse/t/HIVE_UNION_SUBDIR_16/000001_0"} @@ -429,36 +424,35 @@ now that T is Acid, data for each writerId is treated like a logical bucket (tho logical bucket (tranche) */ String expected2[][] = { - {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/000002_0"}, - {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/000002_0"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "warehouse/t/000000_0"}, - {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t6\t8", "warehouse/t/000001_0"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t9\t10", "warehouse/t/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2", "warehouse/t/HIVE_UNION_SUBDIR_1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t2\t4", "warehouse/t/HIVE_UNION_SUBDIR_1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":6}\t5\t6", "warehouse/t/HIVE_UNION_SUBDIR_2/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t6\t8", "warehouse/t/HIVE_UNION_SUBDIR_2/000001_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":7}\t9\t10", "warehouse/t/HIVE_UNION_SUBDIR_3/000000_0"}, {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t10\t20", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", "warehouse/t/000000_0_copy_1"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t12\t12", "warehouse/t/000000_0"}, {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"}, - {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t60\t80", "warehouse/t/HIVE_UNION_SUBDIR_16/000001_0"}, + {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t60\t80", "warehouse/t/HIVE_UNION_SUBDIR_16/000001_0"}, }; checkExpected(rs, expected2,"after converting to acid (no compaction)"); Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); Assert.assertEquals(2, BucketCodec.determineVersion(537001984).decodeWriterId(537001984)); Assert.assertEquals(1, BucketCodec.determineVersion(536936448).decodeWriterId(536936448)); - assertVectorized(shouldVectorize(), "update T set b = 88 where b = 80"); - runStatementOnDriver("update T set b = 88 where b = 80"); - assertVectorized(shouldVectorize(), "delete from T where b = 8"); + assertMappersAreVectorized("update T set b = 88 where b = 80 or b = 60"); + runStatementOnDriver("update T set b = 88 where b = 80 or b = 60"); + assertMappersAreVectorized("delete from T where b = 8"); runStatementOnDriver("delete from T where b = 8"); String expected3[][] = { - 
{"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/000002_0"}, - {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/000002_0"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "warehouse/t/000000_0"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t9\t10", "warehouse/t/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2", "warehouse/t/HIVE_UNION_SUBDIR_1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t2\t4", "warehouse/t/HIVE_UNION_SUBDIR_1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":6}\t5\t6", "warehouse/t/HIVE_UNION_SUBDIR_2/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":7}\t9\t10", "warehouse/t/HIVE_UNION_SUBDIR_3/000000_0"}, {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t10\t20", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", "warehouse/t/000000_0_copy_1"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t12\t12", "warehouse/t/000000_0"}, {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"}, - // update for "{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t60\t80" + {"{\"writeid\":10000001,\"bucketid\":537067521,\"rowid\":0}\t50\t88", "warehouse/t/delta_10000001_10000001_0001/bucket_00003_0"}, {"{\"writeid\":10000001,\"bucketid\":536870913,\"rowid\":0}\t60\t88", "warehouse/t/delta_10000001_10000001_0001/bucket_00000_0"}, }; rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); @@ -471,27 +465,28 @@ logical bucket (tranche) /*Compaction preserves location of rows wrt buckets/tranches (for now)*/ String expected4[][] = { - {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", - "warehouse/t/base_10000002_v0000015/bucket_00002"}, - {"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", - "warehouse/t/base_10000002_v0000015/bucket_00002"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6", + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2", + "warehouse/t/base_10000002_v0000015/bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t2\t4", + "warehouse/t/base_10000002_v0000015/bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":6}\t5\t6", "warehouse/t/base_10000002_v0000015/bucket_00000"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t9\t10", + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":7}\t9\t10", "warehouse/t/base_10000002_v0000015/bucket_00000"}, {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t10\t20", "warehouse/t/base_10000002_v0000015/bucket_00000"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12", + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t12\t12", "warehouse/t/base_10000002_v0000015/bucket_00000"}, {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40", "warehouse/t/base_10000002_v0000015/bucket_00000"}, - {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60", - "warehouse/t/base_10000002_v0000015/bucket_00000"}, + {"{\"writeid\":10000001,\"bucketid\":537067521,\"rowid\":0}\t50\t88", + "warehouse/t/base_10000002_v0000015/bucket_00003"}, {"{\"writeid\":10000001,\"bucketid\":536870913,\"rowid\":0}\t60\t88", "warehouse/t/base_10000002_v0000015/bucket_00000"}, }; checkExpected(rs, expected4,"after major 
compact"); } + @Test public void testInsertFromUnion() throws Exception { int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; @@ -678,7 +673,7 @@ public void testNonAcidToAcidVectorzied() throws Exception { checkExpected(rs, expected, "After conversion"); Assert.assertEquals(Integer.toString(6), rs.get(0)); Assert.assertEquals(Integer.toString(9), rs.get(1)); - assertVectorized(shouldVectorize(), query); + assertMappersAreVectorized(query); //why isn't PPD working.... - it is working but storage layer doesn't do row level filtering; only row group level //this uses VectorizedOrcAcidRowBatchReader @@ -689,7 +684,7 @@ public void testNonAcidToAcidVectorzied() throws Exception { {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"} }; checkExpected(rs, expected1, "After conversion with VC1"); - assertVectorized(shouldVectorize(), query); + assertMappersAreVectorized(query); //this uses VectorizedOrcAcidRowBatchReader query = "select ROW__ID, a from T where b > 0 order by a"; @@ -702,9 +697,10 @@ public void testNonAcidToAcidVectorzied() throws Exception { {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}", "9"} }; checkExpected(rs, expected2, "After conversion with VC2"); - assertVectorized(shouldVectorize(), query); + assertMappersAreVectorized(query); //doesn't vectorize (uses neither of the Vectorzied Acid readers) + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); query = "select ROW__ID, a, INPUT__FILE__NAME from T where b > 6 order by a"; rs = runStatementOnDriver(query); Assert.assertEquals("", 2, rs.size()); @@ -715,8 +711,9 @@ public void testNonAcidToAcidVectorzied() throws Exception { checkExpected(rs, expected3, "After non-vectorized read"); Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); //vectorized because there is INPUT__FILE__NAME - assertVectorized(false, query); + assertMappersAreVectorized(query); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); runStatementOnDriver("update T set b = 17 where a = 1"); //this should use VectorizedOrcAcidRowReader query = "select ROW__ID, b from T where b > 0 order by a"; @@ -729,7 +726,7 @@ public void testNonAcidToAcidVectorzied() throws Exception { {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}","10"} }; checkExpected(rs, expected4, "After conversion with VC4"); - assertVectorized(shouldVectorize(), query); + assertMappersAreVectorized(query); runStatementOnDriver("alter table T compact 'major'"); TestTxnCommands2.runWorker(hiveConf); @@ -756,10 +753,10 @@ public void testNonAcidToAcidVectorzied() throws Exception { }; checkExpected(rs, expected5, "After major compaction"); //vectorized because there is INPUT__FILE__NAME - assertVectorized(false, query); + assertMappersAreVectorized(query); } private void checkExpected(List rs, String[][] expected, String msg) { - super.checkExpected(rs, expected, msg, LOG, true); + super.checkExpected(rs, expected, msg, LOG); } /** * HIVE-17900 diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBucketsVectorized.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBucketsVectorized.java index 8aa967f501a4..033b22e2d762 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBucketsVectorized.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBucketsVectorized.java @@ -22,6 +22,7 @@ public class TestTxnNoBucketsVectorized extends TestTxnNoBuckets { @Before + @Override public void setUp() throws Exception { setUpInternal(); 
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index a0ae8f860371..21f7ddfadf3b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -23,12 +23,14 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -37,7 +39,10 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConfForTest; +import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -60,9 +65,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct; +import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn; + public abstract class TxnCommandsBaseForTests { private static final Logger LOG = LoggerFactory.getLogger(TxnCommandsBaseForTests.class); - + //bucket count for test tables; set it to 1 for easier debugging final static int BUCKET_COUNT = 2; @Rule @@ -71,6 +80,7 @@ public abstract class TxnCommandsBaseForTests { protected HiveConf hiveConf; protected Driver d; protected TxnStore txnHandler; + private DatabaseProduct databaseProduct; public enum Table { ACIDTBL("acidTbl"), @@ -106,10 +116,15 @@ public void setUp() throws Exception { } } void initHiveConf() { - hiveConf = new HiveConf(this.getClass()); - //TODO: HIVE-28029: Make unit tests based on TxnCommandsBaseForTests run on Tez - hiveConf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr"); + hiveConf = new HiveConfForTest(this.getClass()); + // Multiple tests require more than one bucket per write. Use a very small value for the grouping size to create + // multiple mapper instances with FileSinkOperators. The number of buckets depends on the size of the data + // written and the grouping size. Most test cases expect 2 buckets.
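+ // Note: tez.grouping.max-size and tez.grouping.min-size are byte limits on grouped splits, so values this small effectively disable split grouping and give each input split its own mapper.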
+ hiveConf.set("tez.grouping.max-size", "10"); + hiveConf.set("tez.grouping.min-size", "1"); + databaseProduct = determineDatabaseProduct(DatabaseProduct.DERBY_NAME, hiveConf); } + void setUpInternal() throws Exception { initHiveConf(); Path workDir = new Path(System.getProperty("test.tmp.dir", @@ -304,19 +319,41 @@ protected CommandProcessorException runStatementOnDriverNegative(String stmt) { throw new RuntimeException("Didn't get expected failure!"); } - /** - * Runs Vectorized Explain on the query and checks if the plan is vectorized as expected - * @param vectorized {@code true} - assert that it's vectorized - */ - void assertVectorized(boolean vectorized, String query) throws Exception { - List rs = runStatementOnDriver("EXPLAIN VECTORIZATION DETAIL " + query); - for(String line : rs) { - if(line != null && line.contains("Execution mode: vectorized")) { - Assert.assertTrue("Was vectorized when it wasn't expected", vectorized); - return; + protected void assertMappersAreVectorized(String query) + throws Exception { + List rs = runStatementOnDriver("EXPLAIN FORMATTED VECTORIZATION DETAIL " + query); + ObjectMapper objectMapper = new ObjectMapper(); + Map plan = objectMapper.readValue(rs.get(0), Map.class); + Map stages = (Map) plan.get("STAGE PLANS"); + Map tezStage = null; + if (stages == null) { + Assert.fail("Execution plan of query does not have have stages: " + rs.get(0)); + } + for (Map.Entry stageEntry : stages.entrySet()) { + Map stage = (Map) stageEntry.getValue(); + tezStage = (Map) stage.get("Tez"); + if (tezStage != null) { + break; + } + } + if (tezStage == null) { + Assert.fail("Execution plan of query does not contain a Tez stage: " + rs.get(0)); + } + Map vertices = (Map) tezStage.get("Vertices:"); + if (vertices == null) { + Assert.fail("Execution plan of query does not contain Tez vertices: " + rs.get(0)); + } + for (Map.Entry vertexEntry : stages.entrySet()) { + if (vertexEntry.getKey() == null || !vertexEntry.getKey().startsWith("Map")) { + continue; } + Map mapVertex = (Map) vertexEntry.getValue(); + String executionMode = (String) mapVertex.get("Execution mode"); + boolean vectorized = isNotBlank(executionMode) && executionMode.contains("vectorized"); + String message = "Mapper was " + (shouldVectorized() ? "not vectorized: " : "vectorized but was not expected: "); + Assert.assertTrue(message + rs.get(0), + shouldVectorized() ^ vectorized); } - Assert.assertTrue("Din't find expected 'vectorized' in plan", !vectorized); } /** * Will assert that actual files match expected. 
@@ -340,7 +377,7 @@ void assertExpectedFileSet(Set expectedFiles, String rootPath, String ta } Assert.assertEquals("Unexpected file list", expectedFiles, actualFiles); } - void checkExpected(List rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) { + void checkExpected(List rs, String[][] expected, String msg, Logger LOG) { LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); logResult(LOG, rs); Assert.assertEquals(testName.getMethodName() + ": " + msg + "; " + rs, @@ -348,9 +385,9 @@ void checkExpected(List rs, String[][] expected, String msg, Logger LOG, //verify data and layout for(int i = 0; i < expected.length; i++) { Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i) + "; expected " + expected[i][0], rs.get(i).startsWith(expected[i][0])); - if(checkFileName) { + if (expected[i].length == 2) { Assert.assertTrue("Actual line(file) " + i + " file: " + rs.get(i), - rs.get(i).endsWith(expected[i][1]) || rs.get(i).matches(expected[i][1])); + rs.get(i).endsWith(expected[i][1]) || rs.get(i).matches(expected[i][1])); } } } @@ -367,10 +404,15 @@ void logResult(Logger LOG, List rs) { * which will currently make the query non-vectorizable. This means we can't check the file name * for vectorized version of the test. */ - protected void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg, Logger LOG) throws Exception{ + protected void checkResultAndVectorization(String[][] expectedResult, String query, String msg, Logger LOG) + throws Exception { + checkResult(expectedResult, query, msg, LOG); + assertMappersAreVectorized(query); + } + protected void checkResult(String[][] expectedResult, String query, String msg, Logger LOG) + throws Exception { List rs = runStatementOnDriver(query); - checkExpected(rs, expectedResult, msg + (isVectorized ? " vect" : ""), LOG, !isVectorized); - assertVectorized(isVectorized, query); + checkExpected(rs, expectedResult, msg + (shouldVectorized() ? " vect" : ""), LOG); } void dropTables(String...
tables) throws Exception { HiveConf queryConf = d.getQueryState().getConf(); @@ -385,4 +427,23 @@ Driver swapDrivers(Driver otherDriver) { d = otherDriver; return tmp; } + + protected void waitUntilAllTxnFinished() throws Exception { + long openTxnTimeOutMillis = MetastoreConf.getTimeVar( + hiveConf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS); + while (getOpenTxnCount(openTxnTimeOutMillis) > 0) { + Thread.sleep(openTxnTimeOutMillis); + } + } + + protected int getOpenTxnCount(long openTxnTimeOutMillis) throws Exception { + return TestTxnDbUtil.countQueryAgent(hiveConf, + "select count(*) from TXNS where TXN_STATE = '" + TxnStatus.OPEN.getSqlConst() + "' " + + "or TXN_STARTED >= (" + getEpochFn(databaseProduct) + + " - " + openTxnTimeOutMillis + ")"); + } + + protected boolean shouldVectorized() { + return hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java index 8f62c598932c..5787951a3bf0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConfForTest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -42,20 +43,13 @@ */ public abstract class DbTxnManagerEndToEndTestBase { - private static final String TEST_DATA_DIR = new File( - System.getProperty("java.io.tmpdir") + File.separator + - DbTxnManagerEndToEndTestBase.class.getCanonicalName() + "-" + System.currentTimeMillis()) - .getPath().replaceAll("\\\\", "/"); - - protected static HiveConf conf = new HiveConf(Driver.class); + protected static HiveConfForTest conf = new HiveConfForTest(DbTxnManagerEndToEndTestBase.class); protected HiveTxnManager txnMgr; protected Context ctx; protected Driver driver, driver2; protected TxnStore txnHandler; public DbTxnManagerEndToEndTestBase() { - //TODO: HIVE-28029: Make unit tests based on DbTxnManagerEndToEndTestBase run on Tez - conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr"); HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false); @@ -104,7 +98,7 @@ public void setUp() throws Exception { throw new RuntimeException("Could not create " + getWarehouseDir()); } } - + @After public void tearDown() throws Exception { driver.close(); @@ -114,10 +108,10 @@ public void tearDown() throws Exception { if (txnMgr != null) { txnMgr.closeTxnManager(); } - FileUtils.deleteDirectory(new File(TEST_DATA_DIR)); + FileUtils.deleteDirectory(new File(conf.getTestDataDir())); } protected String getWarehouseDir() { - return TEST_DATA_DIR + "/warehouse"; + return conf.getTestDataDir() + "/warehouse"; } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/util/TestUpgradeTool.java b/ql/src/test/org/apache/hadoop/hive/ql/util/TestUpgradeTool.java index 4f24454056b7..af70710618b1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/util/TestUpgradeTool.java +++ 
b/ql/src/test/org/apache/hadoop/hive/ql/util/TestUpgradeTool.java @@ -121,7 +121,7 @@ public void testPostUpgrade() throws Exception { {"4\t5",""}, {"5\t6",""}, }; - checkResult(expected0, testQuery0, true, "TFlat pre-check", LOG); + checkResultAndVectorization(expected0, testQuery0, "TFlat pre-check", LOG); //should be converted to MM @@ -184,7 +184,7 @@ public void testPostUpgrade() throws Exception { {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t5\t6", "tacid/delta_0000002_0000002/000000_0"} }; - checkResult(expected, testQuery, false, "TAcid post-check", LOG); + checkResultAndVectorization(expected, testQuery, "TAcid post-check", LOG); testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from TAcidPart order by a, b, p, ROW__ID"; @@ -202,7 +202,7 @@ public void testPostUpgrade() throws Exception { {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "tacidpart/p=12/delta_0000001_0000001/000000_0"} }; - checkResult(expected2, testQuery, false, "TAcidPart post-check", LOG); + checkResultAndVectorization(expected2, testQuery, "TAcidPart post-check", LOG); /* Verify that we re-arranged/renamed so that files names follow hive naming convention and are spread among deltas/buckets @@ -220,7 +220,7 @@ public void testPostUpgrade() throws Exception { {"4\t5"}, {"5\t6"} }; - checkResult(expectedData, testQuery, true, "TFlat post-check data", LOG); + checkResultAndVectorization(expectedData, testQuery, "TFlat post-check data", LOG); testQuery = "select ROW__ID, INPUT__FILE__NAME from TFlat order by INPUT__FILE__NAME"; String[][] expectedMetaData = new String[][] { @@ -235,7 +235,7 @@ public void testPostUpgrade() throws Exception { {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}", "tflat/delta_0000005_0000005/00000_0"} }; - checkResult(expectedMetaData, testQuery, false, "TFlat post-check files", LOG); + checkResultAndVectorization(expectedMetaData, testQuery, "TFlat post-check files", LOG); } @Test public void testGuessNumBuckets() {