Skip to content

Commit

Permalink
Change nextUri slug for every request
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Oct 9, 2019
1 parent 19dfa28 commit 52614f7
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.prestosql.server.SessionContext;
import io.prestosql.server.SessionPropertyDefaults;
import io.prestosql.server.SessionSupplier;
import io.prestosql.server.protocol.Slug;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.QueryId;
import io.prestosql.spi.resourcegroups.SelectionContext;
Expand Down Expand Up @@ -131,7 +132,7 @@ public QueryId createQueryId()
return queryIdGenerator.createNextQueryId();
}

public ListenableFuture<?> createQuery(QueryId queryId, String slug, SessionContext sessionContext, String query)
public ListenableFuture<?> createQuery(QueryId queryId, Slug slug, SessionContext sessionContext, String query)
{
requireNonNull(queryId, "queryId is null");
requireNonNull(sessionContext, "sessionFactory is null");
Expand All @@ -155,7 +156,7 @@ public ListenableFuture<?> createQuery(QueryId queryId, String slug, SessionCont
* Creates and registers a dispatch query with the query tracker. This method will never fail to register a query with the query
* tracker. If an error occurs while creating a dispatch query, a failed dispatch will be created and registered.
*/
private <C> void createQueryInternal(QueryId queryId, String slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
private <C> void createQueryInternal(QueryId queryId, Slug slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
{
Session session = null;
PreparedQuery preparedQuery = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.prestosql.Session;
import io.prestosql.execution.QueryPreparer.PreparedQuery;
import io.prestosql.server.protocol.Slug;
import io.prestosql.spi.resourcegroups.ResourceGroupId;

public interface DispatchQueryFactory
Expand All @@ -23,6 +24,6 @@ DispatchQuery createDispatchQuery(
Session session,
String query,
PreparedQuery preparedQuery,
String slug,
Slug slug,
ResourceGroupId resourceGroup);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.prestosql.execution.warnings.WarningCollectorFactory;
import io.prestosql.metadata.Metadata;
import io.prestosql.security.AccessControl;
import io.prestosql.server.protocol.Slug;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.resourcegroups.ResourceGroupId;
import io.prestosql.sql.tree.Statement;
Expand Down Expand Up @@ -90,7 +91,7 @@ public DispatchQuery createDispatchQuery(
Session session,
String query,
PreparedQuery preparedQuery,
String slug,
Slug slug,
ResourceGroupId resourceGroup)
{
WarningCollector warningCollector = warningCollectorFactory.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.prestosql.execution.QueryState;
import io.prestosql.server.HttpRequestSessionContext;
import io.prestosql.server.SessionContext;
import io.prestosql.server.protocol.Slug;
import io.prestosql.spi.ErrorCode;
import io.prestosql.spi.QueryId;

Expand Down Expand Up @@ -70,10 +71,10 @@
import static io.airlift.http.server.AsyncResponseHandler.bindAsyncResponse;
import static io.prestosql.execution.QueryState.FAILED;
import static io.prestosql.execution.QueryState.QUEUED;
import static io.prestosql.server.protocol.Slug.Context.EXECUTING_QUERY;
import static io.prestosql.server.protocol.Slug.Context.QUEUED_QUERY;
import static io.prestosql.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -175,7 +176,7 @@ public void getStatus(
@Context UriInfo uriInfo,
@Suspended AsyncResponse asyncResponse)
{
Query query = getQuery(queryId, slug);
Query query = getQuery(queryId, slug, token);

// wait for query to be dispatched, up to the wait timeout
ListenableFuture<?> futureStateChange = addTimeout(
Expand Down Expand Up @@ -206,15 +207,15 @@ public Response cancelQuery(
@PathParam("slug") String slug,
@PathParam("token") long token)
{
getQuery(queryId, slug)
getQuery(queryId, slug, token)
.cancel();
return Response.noContent().build();
}

private Query getQuery(QueryId queryId, String slug)
private Query getQuery(QueryId queryId, String slug, long token)
{
Query query = queries.get(queryId);
if (query == null || !query.getSlug().equals(slug)) {
if (query == null || !query.getSlug().isValid(QUEUED_QUERY, slug, token)) {
throw badRequest(NOT_FOUND, "Query not found");
}
return query;
Expand All @@ -229,13 +230,13 @@ private static URI getQueryHtmlUri(QueryId queryId, UriInfo uriInfo, String xFor
.build();
}

private static URI getQueuedUri(QueryId queryId, String slug, long token, UriInfo uriInfo, String xForwardedProto)
private static URI getQueuedUri(QueryId queryId, Slug slug, long token, UriInfo uriInfo, String xForwardedProto)
{
return uriInfo.getBaseUriBuilder()
.scheme(getScheme(xForwardedProto, uriInfo))
.replacePath("/v1/statement/queued/")
.path(queryId.toString())
.path(slug)
.path(slug.makeSlug(QUEUED_QUERY, token))
.path(String.valueOf(token))
.replaceQuery("")
.build();
Expand Down Expand Up @@ -290,7 +291,7 @@ private static final class Query
private final SessionContext sessionContext;
private final DispatchManager dispatchManager;
private final QueryId queryId;
private final String slug = "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
private final Slug slug = Slug.createNew();
private final AtomicLong lastToken = new AtomicLong();

@GuardedBy("this")
Expand All @@ -309,7 +310,7 @@ public QueryId getQueryId()
return queryId;
}

public String getSlug()
public Slug getSlug()
{
return slug;
}
Expand Down Expand Up @@ -412,7 +413,7 @@ private URI getRedirectUri(CoordinatorLocation coordinatorLocation, UriInfo uriI
return uriBuilderFrom(coordinatorUri)
.appendPath("/v1/statement/executing")
.appendPath(queryId.toString())
.appendPath(slug)
.appendPath(slug.makeSlug(EXECUTING_QUERY, 0))
.appendPath("0")
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.prestosql.metadata.Metadata;
import io.prestosql.security.AccessControl;
import io.prestosql.server.BasicQueryInfo;
import io.prestosql.server.protocol.Slug;
import io.prestosql.spi.QueryId;
import io.prestosql.sql.planner.Plan;
import io.prestosql.sql.tree.Expression;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class DataDefinitionExecution<T extends Statement>
{
private final DataDefinitionTask<T> task;
private final T statement;
private final String slug;
private final Slug slug;
private final TransactionManager transactionManager;
private final Metadata metadata;
private final AccessControl accessControl;
Expand All @@ -63,7 +64,7 @@ public class DataDefinitionExecution<T extends Statement>
private DataDefinitionExecution(
DataDefinitionTask<T> task,
T statement,
String slug,
Slug slug,
TransactionManager transactionManager,
Metadata metadata,
AccessControl accessControl,
Expand All @@ -81,7 +82,7 @@ private DataDefinitionExecution(
}

@Override
public String getSlug()
public Slug getSlug()
{
return slug;
}
Expand Down Expand Up @@ -300,7 +301,7 @@ public DataDefinitionExecutionFactory(
public DataDefinitionExecution<?> createQueryExecution(
PreparedQuery preparedQuery,
QueryStateMachine stateMachine,
String slug,
Slug slug,
WarningCollector warningCollector)
{
return createDataDefinitionExecution(preparedQuery.getStatement(), preparedQuery.getParameters(), stateMachine, slug);
Expand All @@ -310,7 +311,7 @@ private <T extends Statement> DataDefinitionExecution<T> createDataDefinitionExe
T statement,
List<Expression> parameters,
QueryStateMachine stateMachine,
String slug)
Slug slug)
{
@SuppressWarnings("unchecked")
DataDefinitionTask<T> task = (DataDefinitionTask<T>) tasks.get(statement.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.prestosql.execution.warnings.WarningCollector;
import io.prestosql.memory.VersionedMemoryPoolId;
import io.prestosql.server.BasicQueryInfo;
import io.prestosql.server.protocol.Slug;
import io.prestosql.spi.type.Type;
import io.prestosql.sql.planner.Plan;

Expand Down Expand Up @@ -51,7 +52,7 @@ public interface QueryExecution

QueryInfo getQueryInfo();

String getSlug();
Slug getSlug();

Duration getTotalCpuTime();

Expand Down Expand Up @@ -80,7 +81,7 @@ public interface QueryExecution

interface QueryExecutionFactory<T extends QueryExecution>
{
T createQueryExecution(PreparedQuery preparedQuery, QueryStateMachine stateMachine, String slug, WarningCollector warningCollector);
T createQueryExecution(PreparedQuery preparedQuery, QueryStateMachine stateMachine, Slug slug, WarningCollector warningCollector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.prestosql.Session;
import io.prestosql.execution.StateMachine.StateChangeListener;
import io.prestosql.server.BasicQueryInfo;
import io.prestosql.server.protocol.Slug;
import io.prestosql.spi.QueryId;

import java.util.List;
Expand Down Expand Up @@ -73,7 +74,7 @@ QueryInfo getFullQueryInfo(QueryId queryId)
/**
* @throws NoSuchElementException if query does not exist
*/
boolean isQuerySlugValid(QueryId queryId, String slug);
Slug getQuerySlug(QueryId queryId);

/**
* @throws NoSuchElementException if query does not exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.prestosql.operator.ForScheduler;
import io.prestosql.security.AccessControl;
import io.prestosql.server.BasicQueryInfo;
import io.prestosql.server.protocol.Slug;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.QueryId;
import io.prestosql.spi.connector.ConnectorTableHandle;
Expand Down Expand Up @@ -99,7 +100,7 @@ public class SqlQueryExecution
private static final OutputBufferId OUTPUT_BUFFER_ID = new OutputBufferId(0);

private final QueryStateMachine stateMachine;
private final String slug;
private final Slug slug;
private final Metadata metadata;
private final SqlParser sqlParser;
private final SplitManager splitManager;
Expand All @@ -126,7 +127,7 @@ public class SqlQueryExecution
private SqlQueryExecution(
PreparedQuery preparedQuery,
QueryStateMachine stateMachine,
String slug,
Slug slug,
Metadata metadata,
AccessControl accessControl,
SqlParser sqlParser,
Expand Down Expand Up @@ -225,7 +226,7 @@ private Analysis analyze(
}

@Override
public String getSlug()
public Slug getSlug()
{
return slug;
}
Expand Down Expand Up @@ -703,7 +704,7 @@ public static class SqlQueryExecutionFactory
public QueryExecution createQueryExecution(
PreparedQuery preparedQuery,
QueryStateMachine stateMachine,
String slug,
Slug slug,
WarningCollector warningCollector)
{
String executionPolicyName = SystemSessionProperties.getExecutionPolicy(stateMachine.getSession());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.prestosql.execution.StateMachine.StateChangeListener;
import io.prestosql.memory.ClusterMemoryManager;
import io.prestosql.server.BasicQueryInfo;
import io.prestosql.server.protocol.Slug;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.QueryId;
import io.prestosql.sql.planner.Plan;
Expand Down Expand Up @@ -187,9 +188,9 @@ public Session getQuerySession(QueryId queryId)
}

@Override
public boolean isQuerySlugValid(QueryId queryId, String slug)
public Slug getQuerySlug(QueryId queryId)
{
return queryTracker.getQuery(queryId).getSlug().equals(slug);
return queryTracker.getQuery(queryId).getSlug();
}

public Plan getQueryPlan(QueryId queryId)
Expand Down
Loading

0 comments on commit 52614f7

Please sign in to comment.