Skip to content

Commit

Permalink
Revert "HIVE-11402 : HS2 - add an option to disallow parallel query e…
Browse files Browse the repository at this point in the history
…xecution within a single Session (Sergey Shelukhin, reviewed by Aihua Xu, Thejas Nair)"

This reverts commit 48f3297.
  • Loading branch information
prongs committed Jul 26, 2016
1 parent b4f3754 commit d6807ce
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 228 deletions.
2 changes: 0 additions & 2 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -2148,8 +2148,6 @@ public static enum ConfVars {
new TimeValidator(TimeUnit.SECONDS),
"Number of seconds a request will wait to acquire the compile lock before giving up. " +
"Setting it to 0s disables the timeout."),
HIVE_SERVER2_PARALLEL_OPS_IN_SESSION("hive.server2.parallel.ops.in.session", true,
"Whether to allow several parallel operations (such as SQL statements) in one session."),

// HiveServer2 WebUI
HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public static void setUpBeforeClass() throws SQLException, ClassNotFoundExceptio
System.setProperty(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname, "verbose");
System.setProperty(ConfVars.HIVEMAPREDMODE.varname, "nonstrict");
System.setProperty(ConfVars.HIVE_AUTHORIZATION_MANAGER.varname, "org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider");
System.setProperty(ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION.varname, "false");

Statement stmt1 = con1.createStatement();
assertNotNull("Statement is null", stmt1);
Expand Down Expand Up @@ -328,23 +327,6 @@ private void checkBadUrl(String url) throws SQLException {
}
}

@Test
public void testSerializedExecution() throws Exception {
// Test running parallel queries (with parallel queries disabled).
// Should be serialized in the order of execution.
HiveStatement stmt1 = (HiveStatement) con.createStatement();
HiveStatement stmt2 = (HiveStatement) con.createStatement();
stmt1.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'");
stmt1.execute("create table test_ser_1(i int)");
stmt1.executeAsync("insert into test_ser_1 select sleepMsUDF(under_col, 500) from "
+ tableName + " limit 1");
boolean isResultSet = stmt2.executeAsync("select * from test_ser_1");
assertTrue(isResultSet);
ResultSet rs = stmt2.getResultSet();
assertTrue(rs.next());
assertFalse(rs.next());
}

@Test
public void testParentReferences() throws Exception {
/* Test parent references from Statement */
Expand Down Expand Up @@ -2552,19 +2534,6 @@ public Integer evaluate(final Integer value) {
}
}


// A udf which sleeps for some number of ms to simulate a long running query
public static class SleepMsUDF extends UDF {
public Integer evaluate(final Integer value, final Integer ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
// No-op
}
return value;
}
}

/**
* Loads data from a table containing non-ascii value column
* Runs a query and compares the return value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ public void testLeakOperationHandle() throws HiveSQLException {
HiveSessionImpl session = new HiveSessionImpl(protocol, username, password,
serverhiveConf, ipAddress) {
@Override
protected synchronized void acquire(boolean userAccess, boolean isOperation) {
protected synchronized void acquire(boolean userAccess) {
}

@Override
protected synchronized void release(boolean userAccess, boolean isOperation) {
protected synchronized void release(boolean userAccess) {
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract class ExecuteStatementOperation extends Operation {

public ExecuteStatementOperation(HiveSession parentSession, String statement,
Map<String, String> confOverlay, boolean runInBackground) {
super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT);
super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT, runInBackground);
this.statement = statement;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public abstract class Operation {
public static final long DEFAULT_FETCH_MAX_ROWS = 100;
protected boolean hasResultSet;
protected volatile HiveSQLException operationException;
protected final boolean runAsync;
protected volatile Future<?> backgroundHandle;
protected OperationLog operationLog;
protected boolean isOperationLogEnabled;
Expand All @@ -84,29 +85,23 @@ public abstract class Operation {
protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);


protected Operation(HiveSession parentSession, OperationType opType) {
this(parentSession, null, opType);
}

protected Operation(HiveSession parentSession, Map<String, String> confOverlay,
OperationType opType) {
this(parentSession, confOverlay, opType, false);
}
this(parentSession, null, opType, false);
}

protected Operation(HiveSession parentSession,
Map<String, String> confOverlay, OperationType opType, boolean isAsyncQueryState) {
protected Operation(HiveSession parentSession, Map<String, String> confOverlay, OperationType opType, boolean runInBackground) {
this.parentSession = parentSession;
if (confOverlay != null) {
this.confOverlay = confOverlay;
}
this.runAsync = runInBackground;
this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
beginTime = System.currentTimeMillis();
lastAccessTime = beginTime;
operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
setMetrics(state);
queryState = new QueryState(parentSession.getHiveConf(), confOverlay, isAsyncQueryState);
queryState = new QueryState(parentSession.getHiveConf(), confOverlay, runAsync);
}

public Future<?> getBackgroundHandle() {
Expand All @@ -118,9 +113,10 @@ protected void setBackgroundHandle(Future<?> backgroundHandle) {
}

public boolean shouldRunAsync() {
return false; // Most operations cannot run asynchronously.
return runAsync;
}


public HiveSession getParentSession() {
return parentSession;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
*/
@SuppressWarnings("deprecation")
public class SQLOperation extends ExecuteStatementOperation {

private Driver driver = null;
private CommandProcessorResponse response;
private TableSchema resultSchema = null;
Expand All @@ -100,7 +101,6 @@ public class SQLOperation extends ExecuteStatementOperation {
private SQLOperationDisplay sqlOpDisplay;
private long queryTimeout;
private ScheduledExecutorService timeoutExecutor;
private final boolean runAsync;

/**
* A map to track query count running by each user
Expand All @@ -112,7 +112,6 @@ public SQLOperation(HiveSession parentSession, String statement, Map<String, Str
boolean runInBackground, long queryTimeout) {
// TODO: call setRemoteUser in ExecuteStatementOperation or higher.
super(parentSession, statement, confOverlay, runInBackground);
this.runAsync = runInBackground;
this.queryTimeout = queryTimeout;
setupSessionIO(parentSession.getSessionState());
try {
Expand All @@ -122,11 +121,6 @@ public SQLOperation(HiveSession parentSession, String statement, Map<String, Str
}
}

@Override
public boolean shouldRunAsync() {
return runAsync;
}

private void setupSessionIO(SessionState sessionState) {
try {
sessionState.in = null; // hive server's session input stream is not used
Expand Down Expand Up @@ -278,16 +272,70 @@ public void runInternal() throws HiveSQLException {
if (!runAsync) {
runQuery();
} else {
// We'll pass ThreadLocals in the background thread from the foreground (handler) thread.
// 1) ThreadLocal Hive object needs to be set in background thread
// 2) The metastore client in Hive is associated with right user.
// 3) Current UGI will get used by metastore when metastore is in embedded mode
Runnable work = new BackgroundWork(getCurrentUGI(), parentSession.getSessionHive(),
SessionState.getPerfLogger(), SessionState.get(), asyncPrepare);
// We'll pass ThreadLocals in the background thread from the foreground (handler) thread
final SessionState parentSessionState = SessionState.get();
// ThreadLocal Hive object needs to be set in background thread.
// The metastore client in Hive is associated with right user.
final Hive parentHive = parentSession.getSessionHive();
final PerfLogger parentPerfLogger = SessionState.getPerfLogger();
// Current UGI will get used by metastore when metsatore is in embedded mode
// So this needs to get passed to the new background thread
final UserGroupInformation currentUGI = getCurrentUGI();
// Runnable impl to call runInternal asynchronously,
// from a different thread
Runnable backgroundOperation = new Runnable() {
@Override
public void run() {
PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws HiveSQLException {
Hive.set(parentHive);
// TODO: can this result in cross-thread reuse of session state?
SessionState.setCurrentSessionState(parentSessionState);
PerfLogger.setPerfLogger(parentPerfLogger);
// Set current OperationLog in this async thread for keeping on saving query log.
registerCurrentOperationLog();
registerLoggingContext();
try {
if (asyncPrepare) {
prepare(queryState);
}
runQuery();
} catch (HiveSQLException e) {
setOperationException(e);
LOG.error("Error running hive query: ", e);
} finally {
unregisterLoggingContext();
unregisterOperationLog();
}
return null;
}
};

try {
currentUGI.doAs(doAsAction);
} catch (Exception e) {
setOperationException(new HiveSQLException(e));
LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
}
finally {
/**
* We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
* when this thread is garbage collected later.
* @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
*/
if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
ThreadWithGarbageCleanup currentThread =
(ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
currentThread.cacheThreadLocalRawStore();
}
}
}
};
try {
// This submit blocks if no background threads are available to run this operation
Future<?> backgroundHandle = getParentSession().submitBackgroundOperation(work);
Future<?> backgroundHandle =
getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
setBackgroundHandle(backgroundHandle);
} catch (RejectedExecutionException rejected) {
setState(OperationState.ERROR);
Expand All @@ -297,74 +345,6 @@ public void runInternal() throws HiveSQLException {
}
}


private final class BackgroundWork implements Runnable {
private final UserGroupInformation currentUGI;
private final Hive parentHive;
private final PerfLogger parentPerfLogger;
private final SessionState parentSessionState;
private final boolean asyncPrepare;

private BackgroundWork(UserGroupInformation currentUGI,
Hive parentHive, PerfLogger parentPerfLogger,
SessionState parentSessionState, boolean asyncPrepare) {
this.currentUGI = currentUGI;
this.parentHive = parentHive;
this.parentPerfLogger = parentPerfLogger;
this.parentSessionState = parentSessionState;
this.asyncPrepare = asyncPrepare;
}

@Override
public void run() {
PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws HiveSQLException {
Hive.set(parentHive);
// TODO: can this result in cross-thread reuse of session state?
SessionState.setCurrentSessionState(parentSessionState);
PerfLogger.setPerfLogger(parentPerfLogger);
// Set current OperationLog in this async thread for keeping on saving query log.
registerCurrentOperationLog();
registerLoggingContext();
try {
if (asyncPrepare) {
prepare(queryState);
}
runQuery();
} catch (HiveSQLException e) {
setOperationException(e);
LOG.error("Error running hive query: ", e);
} finally {
unregisterLoggingContext();
unregisterOperationLog();
}
return null;
}
};

try {
currentUGI.doAs(doAsAction);
} catch (Exception e) {
setOperationException(new HiveSQLException(e));
LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
}
finally {
/**
* We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
* when this thread is garbage collected later.
* @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
*/
if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
ThreadWithGarbageCleanup currentThread =
(ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
currentThread.cacheThreadLocalRawStore();
}
}
}
}


/**
* Returns the current UGI on the stack
* @param opConfig
Expand Down Expand Up @@ -683,5 +663,4 @@ private void decrementUserQueries(Metrics metrics) throws IOException {
public String getExecutionEngine() {
return queryState.getConf().getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Hive;
Expand Down Expand Up @@ -212,6 +211,4 @@ void renewDelegationToken(HiveAuthFactory authFactory, String tokenStr)
void closeExpiredOperations();

long getNoOperationTime();

Future<?> submitBackgroundOperation(Runnable work);
}
Loading

0 comments on commit d6807ce

Please sign in to comment.