diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java
index 8438e4ac509e..34cd02471ba4 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java
@@ -48,6 +48,7 @@ public final class SchemaUtil {
 
   public static final String ROWKEY_NAME = "ROWKEY";
   public static final String ROWTIME_NAME = "ROWTIME";
+  public static final String WINDOWSTART_NAME = "WINDOWSTART";
 
   public static final int ROWKEY_INDEX = 1;
 
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
index 52278fa96c08..190c5fddc1c5 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
@@ -131,7 +131,7 @@ Analysis analyze(
       final Query query,
       final Optional<Sink> sink
   ) {
-    final Visitor visitor = new Visitor();
+    final Visitor visitor = new Visitor(query.isStatic());
     visitor.process(query, null);
 
     sink.ifPresent(visitor::analyzeNonStdOutSink);
@@ -146,9 +146,14 @@ private final class Visitor extends DefaultTraversalVisitor<AstNode, Void> {
     // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling
 
     private final Analysis analysis = new Analysis();
+    private final boolean staticQuery;
     private boolean isJoin = false;
     private boolean isGroupBy = false;
 
+    Visitor(final boolean staticQuery) {
+      this.staticQuery = staticQuery;
+    }
+
     private void analyzeNonStdOutSink(final Sink sink) {
       analysis.setProperties(sink.getProperties());
       sink.getPartitionBy().ifPresent(analysis::setPartitionBy);
@@ -301,19 +306,26 @@ private void throwOnUnknownColumnReference() {
           new ExpressionAnalyzer(analysis.getFromSourceSchemas());
 
       for (final Expression selectExpression : analysis.getSelectExpressions()) {
-        expressionAnalyzer.analyzeExpression(selectExpression);
+        expressionAnalyzer.analyzeExpression(selectExpression, false);
       }
 
       if (analysis.getWhereExpression() != null) {
-        expressionAnalyzer.analyzeExpression(analysis.getWhereExpression());
+        final boolean allowWindowMetaFields = staticQuery
+            && analysis.getFromDataSources().get(0)
+            .getDataSource()
+            .getKsqlTopic()
+            .getKeyFormat()
+            .isWindowed();
+
+        expressionAnalyzer.analyzeExpression(analysis.getWhereExpression(), allowWindowMetaFields);
       }
 
       for (final Expression expression : analysis.getGroupByExpressions()) {
-        expressionAnalyzer.analyzeExpression(expression);
+        expressionAnalyzer.analyzeExpression(expression, false);
       }
 
       if (analysis.getHavingExpression() != null) {
-        expressionAnalyzer.analyzeExpression(analysis.getHavingExpression());
+        expressionAnalyzer.analyzeExpression(analysis.getHavingExpression(), false);
       }
     }
 
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ContinuousQueryValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ContinuousQueryValidator.java
new file mode 100644
index 000000000000..57061aba217f
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ContinuousQueryValidator.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License.  You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.analyzer;
+
+import io.confluent.ksql.parser.tree.Query;
+import io.confluent.ksql.parser.tree.ResultMaterialization;
+import io.confluent.ksql.parser.tree.Sink;
+import io.confluent.ksql.util.KsqlException;
+import java.util.Optional;
+
+public class ContinuousQueryValidator implements QueryValidator {
+
+  @Override
+  public void preValidate(
+      final Query query,
+      final Optional<Sink> sink
+  ) {
+    if (query.isStatic()) {
+      throw new IllegalArgumentException("static");
+    }
+
+    if (query.getResultMaterialization() != ResultMaterialization.CHANGES) {
+      throw new KsqlException("Continuous queries do not yet support `EMIT FINAL`. "
+          + "Consider changing to `EMIT CHANGES`."
+          + QueryAnalyzer.NEW_QUERY_SYNTAX_HELP
+      );
+    }
+  }
+
+  @Override
+  public void postValidate(final Analysis analysis) {
+  }
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java
index 758c583b3829..0cded2fceb21 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java
@@ -45,34 +45,18 @@ class ExpressionAnalyzer {
     this.sourceSchemas = Objects.requireNonNull(sourceSchemas, "sourceSchemas");
   }
 
-  void analyzeExpression(final Expression expression) {
-    final Visitor visitor = new Visitor();
+  void analyzeExpression(final Expression expression, final boolean allowWindowMetaFields) {
+    final Visitor visitor = new Visitor(allowWindowMetaFields);
     visitor.process(expression, null);
   }
 
-  private void throwOnUnknownField(final QualifiedName name) {
-    final Set<String> sourcesWithField = sourceSchemas.sourcesWithField(name.name());
-    if (sourcesWithField.isEmpty()) {
-      throw new KsqlException("Field '" + name + "' cannot be resolved.");
-    }
+  private final class Visitor extends VisitParentExpressionVisitor<Object, Object> {
 
-    if (name.qualifier().isPresent()) {
-      if (!sourcesWithField.contains(name.qualifier().get())) {
-        throw new KsqlException("Source '" + name.qualifier() + "', "
-            + "used in '" + name + "' cannot be resolved.");
-      }
-    } else if (sourcesWithField.size() > 1) {
-      final String possibilities = sourcesWithField.stream()
-          .sorted()
-          .map(source -> SchemaUtil.buildAliasedFieldName(source, name.name()))
-          .collect(Collectors.joining(","));
-
-      throw new KsqlException("Field '" + name + "' is ambiguous. "
-          + "Could be any of: " + possibilities);
-    }
-  }
+    private final boolean allowWindowMetaFields;
 
-  private class Visitor extends VisitParentExpressionVisitor<Object, Object> {
+    Visitor(final boolean allowWindowMetaFields) {
+      this.allowWindowMetaFields = allowWindowMetaFields;
+    }
 
     public Object visitLikePredicate(final LikePredicate node, final Object context) {
       process(node.getValue(), null);
@@ -138,5 +122,32 @@ public Object visitQualifiedNameReference(
       throwOnUnknownField(node.getName());
       return null;
     }
+
+    private void throwOnUnknownField(final QualifiedName name) {
+      final Set<String> sourcesWithField = sourceSchemas.sourcesWithField(name.name());
+      if (sourcesWithField.isEmpty()) {
+        if (allowWindowMetaFields && name.name().equals(SchemaUtil.WINDOWSTART_NAME)) {
+          return;
+        }
+
+        throw new KsqlException("Field '" + name + "' cannot be resolved.");
+      }
+
+      if (name.qualifier().isPresent()) {
+        final String qualifier = name.qualifier().get();
+        if (!sourcesWithField.contains(qualifier)) {
+          throw new KsqlException("Source '" + qualifier + "', "
+              + "used in '" + name + "' cannot be resolved.");
+        }
+      } else if (sourcesWithField.size() > 1) {
+        final String possibilities = sourcesWithField.stream()
+            .sorted()
+            .map(source -> SchemaUtil.buildAliasedFieldName(source, name.name()))
+            .collect(Collectors.joining(", "));
+
+        throw new KsqlException("Field '" + name + "' is ambiguous. "
+            + "Could be any of: " + possibilities);
+      }
+    }
   }
 }
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java
index 9d5d42d37cb2..46dd0f4545c3 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryAnalyzer.java
@@ -17,6 +17,7 @@
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
@@ -27,7 +28,6 @@
 import io.confluent.ksql.metastore.MetaStore;
 import io.confluent.ksql.parser.rewrite.ExpressionTreeRewriter;
 import io.confluent.ksql.parser.tree.Query;
-import io.confluent.ksql.parser.tree.ResultMaterialization;
 import io.confluent.ksql.parser.tree.Sink;
 import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.util.AggregateExpressionRewriter;
@@ -40,33 +40,49 @@
 
 public class QueryAnalyzer {
 
-  private static final String NEW_QUERY_SYNTAX_HELP =
-      "'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes."
-          + System.lineSeparator()
-          + "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, "
-          + "static queries, i.e. they query the current state of the system and return a final "
-          + "result."
-          + System.lineSeparator()
-          + "To turn a static query into a streaming query, as was the default in older versions "
-          + "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause."
-          + System.lineSeparator()
-          + "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit "
-          + "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements "
-          + "as a this will be required in a future release.";
-
+  static final String NEW_QUERY_SYNTAX_HELP = System.lineSeparator()
+      + "'EMIT CHANGES' is used to indicate a query is continuous and outputs all changes."
+      + System.lineSeparator()
+      + "'Bare queries, e.g. those in the format 'SELECT * FROM X ...' are now, by default, "
+      + "static queries, i.e. they query the current state of the system and return a final "
+      + "result."
+      + System.lineSeparator()
+      + "To turn a static query into a streaming query, as was the default in older versions "
+      + "of KSQL, add `EMIT CHANGES` to the end of the statement, before any limit clause."
+      + System.lineSeparator()
+      + "Persistent queries, e.g. `CREATE STREAM AS ...`, currently have an implicit "
+      + "`EMIT CHANGES`. However, it is recommended to add `EMIT CHANGES` to such statements "
+      + "as a this will be required in a future release.";
+
+  private final Analyzer analyzer;
   private final MetaStore metaStore;
-  private final String outputTopicPrefix;
-  private final Set<SerdeOption> defaultSerdeOptions;
+  private final QueryValidator continuousValidator;
+  private final QueryValidator staticValidator;
 
   public QueryAnalyzer(
       final MetaStore metaStore,
       final String outputTopicPrefix,
       final Set<SerdeOption> defaultSerdeOptions
+  ) {
+    this(
+        metaStore,
+        new Analyzer(metaStore, outputTopicPrefix, defaultSerdeOptions),
+        new ContinuousQueryValidator(),
+        new StaticQueryValidator()
+    );
+  }
+
+  @VisibleForTesting
+  QueryAnalyzer(
+      final MetaStore metaStore,
+      final Analyzer analyzer,
+      final QueryValidator continuousValidator,
+      final QueryValidator staticValidator
   ) {
     this.metaStore = requireNonNull(metaStore, "metaStore");
-    this.outputTopicPrefix = requireNonNull(outputTopicPrefix, "outputTopicPrefix");
-    this.defaultSerdeOptions = ImmutableSet.copyOf(
-        requireNonNull(defaultSerdeOptions, "defaultSerdeOptions"));
+    this.analyzer = requireNonNull(analyzer, "analyzer");
+    this.continuousValidator = requireNonNull(continuousValidator, "continuousValidator");
+    this.staticValidator = requireNonNull(staticValidator, "staticValidator");
   }
 
   public Analysis analyze(
@@ -74,23 +90,20 @@ public Analysis analyze(
       final Optional<Sink> sink
   ) {
     if (query.isStatic()) {
-      throw new KsqlException("Static queries are not yet supported. "
-          + "Consider adding 'EMIT CHANGES' to any bare query, "
-          + System.lineSeparator()
-          + NEW_QUERY_SYNTAX_HELP
-      );
+      staticValidator.preValidate(query, sink);
+    } else {
+      continuousValidator.preValidate(query, sink);
     }
 
-    if (query.getResultMaterialization() != ResultMaterialization.CHANGES) {
-      throw new KsqlException("Continous queries do not yet support `EMIT FINAL`. "
-          + "Consider changing to `EMIT CHANGES`."
-          + System.lineSeparator()
-          + NEW_QUERY_SYNTAX_HELP
-      );
+    final Analysis analysis = analyzer.analyze(query, sink);
+
+    if (query.isStatic()) {
+      staticValidator.postValidate(analysis);
+    } else {
+      continuousValidator.postValidate(analysis);
     }
 
-    return new Analyzer(metaStore, outputTopicPrefix, defaultSerdeOptions)
-        .analyze(query, sink);
+    return analysis;
   }
 
   public AggregateAnalysis analyzeAggregate(final Query query, final Analysis analysis) {
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java
new file mode 100644
index 000000000000..9147f90182f5
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/QueryValidator.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License.  You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.analyzer;
+
+import io.confluent.ksql.parser.tree.Query;
+import io.confluent.ksql.parser.tree.Sink;
+import java.util.Optional;
+
+/**
+ * Validator used by {@link QueryAnalyzer}.
+ */
+interface QueryValidator {
+
+  void preValidate(Query query, Optional<Sink> sink);
+
+  void postValidate(Analysis analysis);
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java
new file mode 100644
index 000000000000..a8edafa8b36e
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/StaticQueryValidator.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License.  You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.analyzer;
+
+import io.confluent.ksql.parser.tree.Query;
+import io.confluent.ksql.parser.tree.ResultMaterialization;
+import io.confluent.ksql.parser.tree.Sink;
+import io.confluent.ksql.util.KsqlException;
+import java.util.Optional;
+
+public class StaticQueryValidator implements QueryValidator {
+
+  @Override
+  public void preValidate(
+      final Query query,
+      final Optional<Sink> sink
+  ) {
+    if (!query.isStatic()) {
+      throw new IllegalArgumentException("not static");
+    }
+
+    if (query.getResultMaterialization() != ResultMaterialization.FINAL) {
+      throw new KsqlException("Static queries do not yet support `EMIT CHANGES`. "
+          + "Consider removing 'EMIT CHANGES' to any bare query."
+          + QueryAnalyzer.NEW_QUERY_SYNTAX_HELP
+      );
+    }
+
+    if (sink.isPresent()) {
+      throw new IllegalArgumentException("static queries should not have a sink");
+    }
+  }
+
+  @Override
+  public void postValidate(final Analysis analysis) {
+    if (analysis.getInto().isPresent()) {
+      throw new KsqlException("Static queries do not support outputting to sinks.");
+    }
+
+    if (analysis.isJoin()) {
+      throw new KsqlException("Static queries do not support joins.");
+    }
+
+    if (analysis.getWindowExpression() != null) {
+      throw new KsqlException("Static queries do not support WINDOW clauses.");
+    }
+
+    if (!analysis.getGroupByExpressions().isEmpty()) {
+      throw new KsqlException("Static queries do not support GROUP BY clauses.");
+    }
+
+    if (analysis.getPartitionBy().isPresent()) {
+      throw new KsqlException("Static queries do not support PARTITION BY clauses.");
+    }
+
+    if (analysis.getHavingExpression() != null) {
+      throw new KsqlException("Static queries do not support HAVING clauses.");
+    }
+
+    if (analysis.getLimitClause().isPresent()) {
+      throw new KsqlException("Static queries do not support LIMIT clauses.");
+    }
+  }
+}
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java
similarity index 98%
rename from ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerTest.java
rename to ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java
index e4143e430416..dc0d143d8830 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java
@@ -75,9 +75,16 @@
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+/**
+ * DO NOT ADD NEW TESTS TO THIS FILE
+ *
+ * <p>Instead add new JSON based tests to QueryTranslationTest.
+ *
+ * <p>This test file is more of a functional test, which is better implemented using QTT.
+ */
 @SuppressWarnings("OptionalGetWithoutIsPresent")
 @RunWith(MockitoJUnitRunner.class)
-public class AnalyzerTest {
+public class AnalyzerFunctionalTest {
 
   private static final Set<SerdeOption> DEFAULT_SERDE_OPTIONS = SerdeOption.none();
 
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java
new file mode 100644
index 000000000000..eefa57980263
--- /dev/null
+++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ContinuousQueryValidatorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License.  You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.analyzer;
+
+import static org.mockito.Mockito.when;
+
+import io.confluent.ksql.parser.tree.Query;
+import io.confluent.ksql.parser.tree.ResultMaterialization;
+import io.confluent.ksql.parser.tree.Sink;
+import io.confluent.ksql.util.KsqlException;
+import java.util.Optional;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ContinuousQueryValidatorTest {
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Mock
+  private Query query;
+  @Mock
+  private Sink sink;
+
+  private QueryValidator validator;
+
+  @Before
+  public void setUp() {
+    validator = new ContinuousQueryValidator();
+
+    when(query.isStatic()).thenReturn(false);
+  }
+
+  @Test
+  public void shouldThrowOnContinuousQueryThatIsFinal() {
+    // Given:
+    when(query.getResultMaterialization()).thenReturn(ResultMaterialization.FINAL);
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Continuous queries do not yet support `EMIT FINAL`.");
+
+    // When:
+    validator.preValidate(query, Optional.empty());
+  }
+}
\ No newline at end of file
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java
new file mode 100644
index 000000000000..21207a20de91
--- /dev/null
+++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/ExpressionAnalyzerTest.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License.  You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.analyzer;
+
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableSet;
+import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
+import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type;
+import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.execution.expression.tree.QualifiedName;
+import io.confluent.ksql.execution.expression.tree.QualifiedNameReference;
+import io.confluent.ksql.execution.expression.tree.StringLiteral;
+import io.confluent.ksql.util.KsqlException;
+import io.confluent.ksql.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ExpressionAnalyzerTest {
+
+  private static final Expression WINDOW_START_EXP = new QualifiedNameReference(
+      QualifiedName.of("something", SchemaUtil.WINDOWSTART_NAME)
+  );
+
+  private static final Expression OTHER_EXP = new StringLiteral("foo");
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Mock
+  private SourceSchemas sourceSchemas;
+  private ExpressionAnalyzer analyzer;
+
+  @Before
+  public void setUp() {
+    analyzer = new ExpressionAnalyzer(sourceSchemas);
+  }
+
+  @Test
+  public void shouldNotThrowOnWindowStartIfAllowed() {
+    // Given:
+    final Expression expression = new ComparisonExpression(
+        Type.EQUAL,
+        WINDOW_START_EXP,
+        OTHER_EXP
+    );
+
+    // When:
+    analyzer.analyzeExpression(expression, true);
+
+    // Then: did not throw
+  }
+
+  @Test
+  public void shouldThrowOnWindowStartIfNotAllowed() {
+    // Given:
+    final Expression expression = new ComparisonExpression(
+        Type.EQUAL,
+        WINDOW_START_EXP,
+        OTHER_EXP
+    );
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Field 'something.WINDOWSTART' cannot be resolved.");
+
+    // When:
+    analyzer.analyzeExpression(expression, false);
+  }
+
+  @Test
+  public void shouldNotThrowOnMultipleSourcesIfFullyQualified() {
+    // Given:
+    final Expression expression = new QualifiedNameReference(
+        QualifiedName.of("fully", "qualified")
+    );
+
+    when(sourceSchemas.sourcesWithField("qualified"))
+        .thenReturn(ImmutableSet.of("multiple", "sources", "fully"));
+
+    // When:
+    analyzer.analyzeExpression(expression, true);
+
+    // Then: did not throw
+  }
+
+  @Test
+  public void shouldThrowOnMultipleSourcesIfFullyQualifiedButNoMatch() {
+    // Given:
+    final Expression expression = new QualifiedNameReference(
+        QualifiedName.of("fully", "qualified")
+    );
+
+    when(sourceSchemas.sourcesWithField("qualified"))
+        .thenReturn(ImmutableSet.of("not-fully", "also-not-fully"));
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Source 'fully', used in 'fully.qualified' cannot be resolved.");
+
+    // When:
+    analyzer.analyzeExpression(expression, true);
+  }
+
+  @Test
+  public void shouldThrowOnMultipleSourcesIfNotFullyQualified() {
+    // Given:
+    final Expression expression = new QualifiedNameReference(
+        QualifiedName.of("just-name")
+    );
+
+    when(sourceSchemas.sourcesWithField("just-name"))
+        .thenReturn(ImmutableSet.of("multiple", "sources"));
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Field 'just-name' is ambiguous. Could be any of: multiple.just-name, sources.just-name");
+
+    // When:
+    analyzer.analyzeExpression(expression, true);
+  }
+
+  @Test
+  public void shouldThrowOnNoSources() {
+    // Given:
+    final Expression expression = new QualifiedNameReference(
+        QualifiedName.of("just-name")
+    );
+
+    when(sourceSchemas.sourcesWithField("just-name"))
+        .thenReturn(ImmutableSet.of());
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Field 'just-name' cannot be resolved.");
+
+    // When:
+    analyzer.analyzeExpression(expression, true);
+  }
+}
\ No newline at end of file
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java
new file mode 100644
index 000000000000..8e1792183d6f
--- /dev/null
+++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerFunctionalTest.java
@@ -0,0 +1,477 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License.  You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.analyzer;
+
+import static io.confluent.ksql.util.ExpressionMatchers.qualifiedNameExpressions;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+
+import io.confluent.ksql.analyzer.Analysis.AliasedDataSource;
+import io.confluent.ksql.analyzer.Analysis.Into;
+import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
+import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.execution.expression.tree.IntegerLiteral;
+import io.confluent.ksql.execution.expression.tree.QualifiedName;
+import io.confluent.ksql.execution.expression.tree.QualifiedNameReference;
+import io.confluent.ksql.function.InternalFunctionRegistry;
+import io.confluent.ksql.metastore.MetaStore;
+import io.confluent.ksql.metastore.model.DataSource;
+import io.confluent.ksql.metastore.model.KsqlStream;
+import io.confluent.ksql.metastore.model.KsqlTable;
+import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
+import io.confluent.ksql.parser.KsqlParserTestUtil;
+import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
+import io.confluent.ksql.parser.tree.CreateTableAsSelect;
+import io.confluent.ksql.parser.tree.InsertInto;
+import io.confluent.ksql.parser.tree.Query;
+import io.confluent.ksql.parser.tree.Sink;
+import io.confluent.ksql.serde.Format;
+import io.confluent.ksql.serde.SerdeOption;
+import io.confluent.ksql.util.KsqlException;
+import io.confluent.ksql.util.MetaStoreFixture;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * DO NOT ADD NEW TESTS TO THIS FILE
+ *
+ * <p>Instead add new JSON based tests to QueryTranslationTest
+ *
+ * <p>This test file is more of a functional test, which is better implemented using QTT.
+ */
+@SuppressWarnings("OptionalGetWithoutIsPresent")
+public class QueryAnalyzerFunctionalTest {
+
+  private static final QualifiedNameReference ITEM_ID =
+      new QualifiedNameReference(QualifiedName.of("ORDERS", "ITEMID"));
+
+  private static final QualifiedNameReference ORDER_ID =
+      new QualifiedNameReference(QualifiedName.of("ORDERS", "ORDERID"));
+
+  private static final QualifiedNameReference ORDER_UNITS =
+      new QualifiedNameReference(QualifiedName.of("ORDERS", "ORDERUNITS"));
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  private final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
+  private final QueryAnalyzer queryAnalyzer =
+      new QueryAnalyzer(metaStore, "prefix-~", SerdeOption.none());
+
+  @Test
+  public void shouldCreateAnalysisForSimpleQuery() {
+    // Given:
+    final Query query = givenQuery("select orderid from orders EMIT CHANGES;");
+
+    // When:
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    // Then:
+    final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0);
+    assertThat(analysis.getSelectExpressions(), equalTo(Collections.singletonList(ORDER_ID)));
+    assertThat(analysis.getFromDataSources(), hasSize(1));
+    assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class));
+    assertThat(fromDataSource.getAlias(), equalTo("ORDERS"));
+  }
+
+  @Test
+  public void shouldCreateAnalysisForCsas() {
+    // Given:
+    final PreparedStatement<CreateStreamAsSelect> statement = KsqlParserTestUtil.buildSingleAst(
+        "create stream s as select col1 from test1 EMIT CHANGES;", metaStore);
+    final Query query = statement.getStatement().getQuery();
+    final Optional<Sink> sink = Optional.of(statement.getStatement().getSink());
+
+    // When:
+    final Analysis analysis = queryAnalyzer.analyze(query, sink);
+
+    // Then:
+    assertThat(analysis.getSelectExpressions(), contains(
+        new QualifiedNameReference(QualifiedName.of("TEST1", "COL1"))));
+
+    assertThat(analysis.getFromDataSources(), hasSize(1));
+
+    final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0);
+    assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class));
+    assertThat(fromDataSource.getAlias(), equalTo("TEST1"));
+    assertThat(analysis.getInto().get().getName(), is("S"));
+  }
+
+  @Test
+  public void shouldCreateAnalysisForCtas() {
+    // Given:
+    final PreparedStatement<CreateTableAsSelect> statement = KsqlParserTestUtil.buildSingleAst(
+        "create table t as select col1 from test2 EMIT CHANGES;", metaStore);
+    final Query query = statement.getStatement().getQuery();
+    final Optional<Sink> sink = Optional.of(statement.getStatement().getSink());
+
+    // When:
+    final Analysis analysis = queryAnalyzer.analyze(query, sink);
+
+    // Then:
+    assertThat(analysis.getSelectExpressions(), contains(
+        new QualifiedNameReference(QualifiedName.of("TEST2", "COL1"))));
+
+    assertThat(analysis.getFromDataSources(), hasSize(1));
+
+    final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0);
+    assertThat(fromDataSource.getDataSource(), instanceOf(KsqlTable.class));
+    assertThat(fromDataSource.getAlias(), equalTo("TEST2"));
+    assertThat(analysis.getInto().get().getName(), is("T"));
+  }
+
+  @Test
+  public void shouldCreateAnalysisForInsertInto() {
+    // Given:
+    final PreparedStatement<InsertInto> statement = KsqlParserTestUtil.buildSingleAst(
+        "insert into test0 select col1 from test1 EMIT CHANGES;", metaStore);
+    final Query query = statement.getStatement().getQuery();
+    final Optional<Sink> sink = Optional.of(statement.getStatement().getSink());
+
+    // When:
+    final Analysis analysis = queryAnalyzer.analyze(query, sink);
+
+    // Then:
+    assertThat(analysis.getSelectExpressions(), contains(
+        new QualifiedNameReference(QualifiedName.of("TEST1", "COL1"))));
+
+    assertThat(analysis.getFromDataSources(), hasSize(1));
+
+    final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0);
+    assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class));
+    assertThat(fromDataSource.getAlias(), equalTo("TEST1"));
+    assertThat(analysis.getInto(), is(not(Optional.empty())));
+    final Into into = analysis.getInto().get();
+    final DataSource<?> test0 = metaStore.getSource("TEST0");
+    assertThat(into.getName(), is(test0.getName()));
+    assertThat(into.getKsqlTopic(), is(test0.getKsqlTopic()));
+  }
+
+  @Test
+  public void shouldAnalyseWindowedAggregate() {
+    // Given:
+    final Query query = givenQuery(
+        "select itemid, sum(orderunits) from orders window TUMBLING ( size 30 second) " +
+            "where orderunits > 5 group by itemid EMIT CHANGES;");
+
+    // When:
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
+
+    // Then:
+    assertThat(aggregateAnalysis.getNonAggregateSelectExpressions().get(ITEM_ID), contains(ITEM_ID));
+    assertThat(aggregateAnalysis.getFinalSelectExpressions(), equalTo(Arrays.asList(ITEM_ID, new QualifiedNameReference(QualifiedName.of("KSQL_AGG_VARIABLE_0")))));
+    assertThat(aggregateAnalysis.getAggregateFunctionArguments(), equalTo(Collections.singletonList(ORDER_UNITS)));
+    assertThat(aggregateAnalysis.getRequiredColumns(), containsInAnyOrder(ITEM_ID, ORDER_UNITS));
+  }
+
+  @Test
+  public void shouldThrowIfAggregateAnalysisDoesNotHaveGroupBy() {
+    // Given:
+    final Query query = givenQuery("select itemid, sum(orderunits) from orders EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Use of aggregate functions requires a GROUP BY clause. Aggregate function(s): SUM");
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+  }
+
+  @Test
+  public void shouldThrowOnAdditionalNonAggregateSelects() {
+    // Given:
+    final Query query = givenQuery(
+        "select itemid, orderid, sum(orderunits) from orders group by itemid EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Non-aggregate SELECT expression(s) not part of GROUP BY: [ORDERS.ORDERID]");
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+  }
+
+  @Test
+  public void shouldThrowOnAdditionalNonAggregateHavings() {
+    // Given:
+    final Query query = givenQuery(
+        "select sum(orderunits) from orders group by itemid having orderid = 1 EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    expectedException.expect(KsqlException.class);
+    expectedException
+        .expectMessage("Non-aggregate HAVING expression not part of GROUP BY: [ORDERS.ORDERID]");
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+  }
+
+  @Test
+  public void shouldProcessGroupByExpression() {
+    // Given:
+    final Query query = givenQuery(
+        "select sum(orderunits) from orders group by itemid EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    // When:
+    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
+
+    // Then:
+    assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID));
+  }
+
+  @Test
+  public void shouldProcessGroupByArithmetic() {
+    // Given:
+    final Query query = givenQuery(
+        "select sum(orderunits) from orders group by itemid + 1 EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    // When:
+    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
+
+    // Then:
+    assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID));
+  }
+
+  @Test
+  public void shouldProcessGroupByFunction() {
+    // Given:
+    final Query query = givenQuery(
+        "select sum(orderunits) from orders group by ucase(itemid) EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    // When:
+    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
+
+    // Then:
+    assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID));
+  }
+
+  @Test
+  public void shouldProcessGroupByConstant() {
+    // Given:
+    final Query query = givenQuery(
+        "select sum(orderunits) from orders group by 1 EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+
+    // Then: did not throw.
+  }
+
+  @Test
+  public void shouldThrowIfGroupByAggFunction() {
+    // Given:
+    final Query query = givenQuery(
+        "select sum(orderunits) from orders group by sum(orderid) EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "GROUP BY does not support aggregate functions: SUM is an aggregate function.");
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+  }
+
+  @Test
+  public void shouldProcessHavingExpression() {
+    // Given:
+    final Query query = givenQuery(
+        "select itemid, sum(orderunits) from orders window TUMBLING ( size 30 second) " +
+            "where orderunits > 5 group by itemid having count(itemid) > 10 EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    // When:
+    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
+
+    // Then:
+    final Expression havingExpression = aggregateAnalysis.getHavingExpression();
+    assertThat(havingExpression, equalTo(new ComparisonExpression(
+        ComparisonExpression.Type.GREATER_THAN,
+        new QualifiedNameReference(QualifiedName.of("KSQL_AGG_VARIABLE_1")),
+        new IntegerLiteral(10))));
+  }
+
+  @Test
+  public void shouldFailOnSelectStarWithGroupBy() {
+    // Given:
+    final Query query = givenQuery("select *, count() from orders group by itemid EMIT CHANGES;");
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Non-aggregate SELECT expression(s) not part of GROUP BY: "
+            + "[ORDERS.ADDRESS, ORDERS.ARRAYCOL, ORDERS.ITEMINFO, ORDERS.MAPCOL, ORDERS.ORDERID, "
+            + "ORDERS.ORDERTIME, ORDERS.ORDERUNITS, ORDERS.ROWKEY, ORDERS.ROWTIME]"
+    );
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+  }
+
+  @Test
+  public void shouldHandleSelectStarWithCorrectGroupBy() {
+    // Given:
+    final Query query = givenQuery("select *, count() from orders group by "
+        + "ROWTIME, ROWKEY, ITEMID, ORDERTIME, ORDERUNITS, MAPCOL, ORDERID, ITEMINFO, ARRAYCOL, ADDRESS"
+        + " EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    // When:
+    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
+
+    // Then:
+    assertThat(aggregateAnalysis.getNonAggregateSelectExpressions().keySet(), containsInAnyOrder(
+        qualifiedNameExpressions(
+            "ORDERS.ROWTIME", "ORDERS.ROWKEY", "ORDERS.ITEMID", "ORDERS.ORDERTIME",
+            "ORDERS.ORDERUNITS", "ORDERS.MAPCOL", "ORDERS.ORDERID", "ORDERS.ITEMINFO",
+            "ORDERS.ARRAYCOL", "ORDERS.ADDRESS")
+    ));
+  }
+
+  @Test
+  public void shouldThrowIfSelectContainsUdfNotInGroupBy() {
+    // Given:
+    final Query query = givenQuery("select substring(orderid, 1, 2), count(*) "
+        + "from orders group by substring(orderid, 2, 5) EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Non-aggregate SELECT expression(s) not part of GROUP BY: [SUBSTRING(ORDERS.ORDERID, 1, 2)]"
+    );
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+  }
+
+  @Test
+  public void shouldThrowIfSelectContainsReversedStringConcatExpression() {
+    // Given:
+    final Query query = givenQuery("select itemid + address->street, count(*) "
+        + "from orders group by address->street + itemid EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Non-aggregate SELECT expression(s) not part of GROUP BY: "
+            + "[(ORDERS.ITEMID + FETCH_FIELD_FROM_STRUCT(ORDERS.ADDRESS, 'STREET'))]"
+    );
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+  }
+
+  @Test
+  public void shouldThrowIfSelectContainsFieldsUsedInExpressionInGroupBy() {
+    // Given:
+    final Query query = givenQuery("select orderId, count(*) "
+        + "from orders group by orderid + orderunits EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Non-aggregate SELECT expression(s) not part of GROUP BY: [ORDERS.ORDERID]"
+    );
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+  }
+
+  @Test
+  public void shouldThrowIfSelectContainsIncompatibleBinaryArithmetic() {
+    // Given:
+    final Query query = givenQuery("SELECT orderId - ordertime, COUNT(*) "
+        + "FROM ORDERS GROUP BY ordertime - orderId EMIT CHANGES;");
+
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "Non-aggregate SELECT expression(s) not part of GROUP BY: "
+            + "[(ORDERS.ORDERID - ORDERS.ORDERTIME)]"
+    );
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+  }
+
+  @Test
+  public void shouldThrowIfGroupByMissingAggregateSelectExpressions() {
+    // Given:
+    final Query query = givenQuery("select orderid from orders group by orderid EMIT CHANGES;");
+    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
+
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage(
+        "GROUP BY requires columns using aggregate functions in SELECT clause."
+    );
+
+    // When:
+    queryAnalyzer.analyzeAggregate(query, analysis);
+  }
+
+  @Test
+  public void shouldHandleValueFormat() {
+    // Given:
+    final PreparedStatement<CreateStreamAsSelect> statement = KsqlParserTestUtil.buildSingleAst(
+        "create stream s with(value_format='delimited') as select * from test1;", metaStore);
+    final Query query = statement.getStatement().getQuery();
+    final Optional<Sink> sink = Optional.of(statement.getStatement().getSink());
+
+    // When:
+    final Analysis analysis = queryAnalyzer.analyze(query, sink);
+
+    // Then:
+    assertThat(analysis.getInto().get().getKsqlTopic().getValueFormat().getFormat(),
+        is(Format.DELIMITED));
+  }
+
+  private Query givenQuery(final String sql) {
+    return KsqlParserTestUtil.<Query>buildSingleAst(sql, metaStore).getStatement();
+  }
+}
\ No newline at end of file
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java
index fafe47ce1868..665542871de5 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018 Confluent Inc.
+ * Copyright 2019 Confluent Inc.
  *
  * Licensed under the Confluent Community License (the "License"); you may not use
  * this file except in compliance with the License.  You may obtain a copy of the
@@ -15,472 +15,85 @@
 
 package io.confluent.ksql.analyzer;
 
-import static io.confluent.ksql.util.ExpressionMatchers.qualifiedNameExpressions;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.mockito.Mockito.mock;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import io.confluent.ksql.analyzer.Analysis.AliasedDataSource;
-import io.confluent.ksql.analyzer.Analysis.Into;
-import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
-import io.confluent.ksql.execution.expression.tree.Expression;
-import io.confluent.ksql.execution.expression.tree.IntegerLiteral;
-import io.confluent.ksql.execution.expression.tree.QualifiedName;
-import io.confluent.ksql.execution.expression.tree.QualifiedNameReference;
-import io.confluent.ksql.function.InternalFunctionRegistry;
 import io.confluent.ksql.metastore.MetaStore;
-import io.confluent.ksql.metastore.model.DataSource;
-import io.confluent.ksql.metastore.model.KsqlStream;
-import io.confluent.ksql.metastore.model.KsqlTable;
-import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
-import io.confluent.ksql.parser.KsqlParserTestUtil;
-import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
-import io.confluent.ksql.parser.tree.CreateTableAsSelect;
-import io.confluent.ksql.parser.tree.InsertInto;
 import io.confluent.ksql.parser.tree.Query;
 import io.confluent.ksql.parser.tree.Sink;
-import io.confluent.ksql.serde.Format;
-import io.confluent.ksql.serde.SerdeOption;
-import io.confluent.ksql.util.KsqlException;
-import io.confluent.ksql.util.MetaStoreFixture;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Optional;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
-@SuppressWarnings("OptionalGetWithoutIsPresent")
+@RunWith(MockitoJUnitRunner.class)
 public class QueryAnalyzerTest {
 
-  private static final QualifiedNameReference ITEM_ID =
-      new QualifiedNameReference(QualifiedName.of("ORDERS", "ITEMID"));
-
-  private static final QualifiedNameReference ORDER_ID =
-      new QualifiedNameReference(QualifiedName.of("ORDERS", "ORDERID"));
-
-  private static final QualifiedNameReference ORDER_UNITS =
-      new QualifiedNameReference(QualifiedName.of("ORDERS", "ORDERUNITS"));
-
   @Rule
   public final ExpectedException expectedException = ExpectedException.none();
 
-  private final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
-  private final QueryAnalyzer queryAnalyzer =
-      new QueryAnalyzer(metaStore, "prefix-~", SerdeOption.none());
-
-  @Test
-  public void shouldCreateAnalysisForSimpleQuery() {
-    // Given:
-    final Query query = givenQuery("select orderid from orders EMIT CHANGES;");
-
-    // When:
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    // Then:
-    final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0);
-    assertThat(analysis.getSelectExpressions(), equalTo(Collections.singletonList(ORDER_ID)));
-    assertThat(analysis.getFromDataSources(), hasSize(1));
-    assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class));
-    assertThat(fromDataSource.getAlias(), equalTo("ORDERS"));
-  }
-
-  @Test
-  public void shouldCreateAnalysisForCsas() {
-    // Given:
-    final PreparedStatement<CreateStreamAsSelect> statement = KsqlParserTestUtil.buildSingleAst(
-        "create stream s as select col1 from test1 EMIT CHANGES;", metaStore);
-    final Query query = statement.getStatement().getQuery();
-    final Optional<Sink> sink = Optional.of(statement.getStatement().getSink());
-
-    // When:
-    final Analysis analysis = queryAnalyzer.analyze(query, sink);
-
-    // Then:
-    assertThat(analysis.getSelectExpressions(), contains(
-        new QualifiedNameReference(QualifiedName.of("TEST1", "COL1"))));
-
-    assertThat(analysis.getFromDataSources(), hasSize(1));
-
-    final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0);
-    assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class));
-    assertThat(fromDataSource.getAlias(), equalTo("TEST1"));
-    assertThat(analysis.getInto().get().getName(), is("S"));
-  }
-
-  @Test
-  public void shouldCreateAnalysisForCtas() {
-    // Given:
-    final PreparedStatement<CreateTableAsSelect> statement = KsqlParserTestUtil.buildSingleAst(
-        "create table t as select col1 from test2 EMIT CHANGES;", metaStore);
-    final Query query = statement.getStatement().getQuery();
-    final Optional<Sink> sink = Optional.of(statement.getStatement().getSink());
-
-    // When:
-    final Analysis analysis = queryAnalyzer.analyze(query, sink);
-
-    // Then:
-    assertThat(analysis.getSelectExpressions(), contains(
-        new QualifiedNameReference(QualifiedName.of("TEST2", "COL1"))));
-
-    assertThat(analysis.getFromDataSources(), hasSize(1));
-
-    final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0);
-    assertThat(fromDataSource.getDataSource(), instanceOf(KsqlTable.class));
-    assertThat(fromDataSource.getAlias(), equalTo("TEST2"));
-    assertThat(analysis.getInto().get().getName(), is("T"));
-  }
-
-  @Test
-  public void shouldCreateAnalysisForInsertInto() {
-    // Given:
-    final PreparedStatement<InsertInto> statement = KsqlParserTestUtil.buildSingleAst(
-        "insert into test0 select col1 from test1 EMIT CHANGES;", metaStore);
-    final Query query = statement.getStatement().getQuery();
-    final Optional<Sink> sink = Optional.of(statement.getStatement().getSink());
-
-    // When:
-    final Analysis analysis = queryAnalyzer.analyze(query, sink);
-
-    // Then:
-    assertThat(analysis.getSelectExpressions(), contains(
-        new QualifiedNameReference(QualifiedName.of("TEST1", "COL1"))));
-
-    assertThat(analysis.getFromDataSources(), hasSize(1));
-
-    final AliasedDataSource fromDataSource = analysis.getFromDataSources().get(0);
-    assertThat(fromDataSource.getDataSource(), instanceOf(KsqlStream.class));
-    assertThat(fromDataSource.getAlias(), equalTo("TEST1"));
-    assertThat(analysis.getInto(), is(not(Optional.empty())));
-    final Into into = analysis.getInto().get();
-    final DataSource<?> test0 = metaStore.getSource("TEST0");
-    assertThat(into.getName(), is(test0.getName()));
-    assertThat(into.getKsqlTopic(), is(test0.getKsqlTopic()));
-  }
-
-  @Test
-  public void shouldAnalyseWindowedAggregate() {
-    // Given:
-    final Query query = givenQuery(
-        "select itemid, sum(orderunits) from orders window TUMBLING ( size 30 second) " +
-            "where orderunits > 5 group by itemid EMIT CHANGES;");
-
-    // When:
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
-
-    // Then:
-    assertThat(aggregateAnalysis.getNonAggregateSelectExpressions().get(ITEM_ID), contains(ITEM_ID));
-    assertThat(aggregateAnalysis.getFinalSelectExpressions(), equalTo(Arrays.asList(ITEM_ID, new QualifiedNameReference(QualifiedName.of("KSQL_AGG_VARIABLE_0")))));
-    assertThat(aggregateAnalysis.getAggregateFunctionArguments(), equalTo(Collections.singletonList(ORDER_UNITS)));
-    assertThat(aggregateAnalysis.getRequiredColumns(), containsInAnyOrder(ITEM_ID, ORDER_UNITS));
-  }
-
-  @Test
-  public void shouldThrowIfAggregateAnalysisDoesNotHaveGroupBy() {
-    // Given:
-    final Query query = givenQuery("select itemid, sum(orderunits) from orders EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    expectedException.expect(KsqlException.class);
-    expectedException.expectMessage(
-        "Use of aggregate functions requires a GROUP BY clause. Aggregate function(s): SUM");
-
-    // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
-  }
-
-  @Test
-  public void shouldThrowOnAdditionalNonAggregateSelects() {
-    // Given:
-    final Query query = givenQuery(
-        "select itemid, orderid, sum(orderunits) from orders group by itemid EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    expectedException.expect(KsqlException.class);
-    expectedException.expectMessage(
-        "Non-aggregate SELECT expression(s) not part of GROUP BY: [ORDERS.ORDERID]");
-
-    // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
-  }
-
-  @Test
-  public void shouldThrowOnAdditionalNonAggregateHavings() {
-    // Given:
-    final Query query = givenQuery(
-        "select sum(orderunits) from orders group by itemid having orderid = 1 EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    expectedException.expect(KsqlException.class);
-    expectedException
-        .expectMessage("Non-aggregate HAVING expression not part of GROUP BY: [ORDERS.ORDERID]");
-
-    // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
-  }
-
-  @Test
-  public void shouldProcessGroupByExpression() {
-    // Given:
-    final Query query = givenQuery(
-        "select sum(orderunits) from orders group by itemid EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    // When:
-    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
-
-    // Then:
-    assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID));
-  }
-
-  @Test
-  public void shouldProcessGroupByArithmetic() {
-    // Given:
-    final Query query = givenQuery(
-        "select sum(orderunits) from orders group by itemid + 1 EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    // When:
-    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
-
-    // Then:
-    assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID));
-  }
-
-  @Test
-  public void shouldProcessGroupByFunction() {
-    // Given:
-    final Query query = givenQuery(
-        "select sum(orderunits) from orders group by ucase(itemid) EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    // When:
-    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
-
-    // Then:
-    assertThat(aggregateAnalysis.getRequiredColumns(), hasItem(ITEM_ID));
-  }
-
-  @Test
-  public void shouldProcessGroupByConstant() {
-    // Given:
-    final Query query = givenQuery(
-        "select sum(orderunits) from orders group by 1 EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
-
-    // Then: did not throw.
-  }
-
-  @Test
-  public void shouldThrowIfGroupByAggFunction() {
-    // Given:
-    final Query query = givenQuery(
-        "select sum(orderunits) from orders group by sum(orderid) EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    // Then:
-    expectedException.expect(KsqlException.class);
-    expectedException.expectMessage(
-        "GROUP BY does not support aggregate functions: SUM is an aggregate function.");
-
-    // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
-  }
-
-  @Test
-  public void shouldProcessHavingExpression() {
-    // Given:
-    final Query query = givenQuery(
-        "select itemid, sum(orderunits) from orders window TUMBLING ( size 30 second) " +
-            "where orderunits > 5 group by itemid having count(itemid) > 10 EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    // When:
-    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
-
-    // Then:
-    final Expression havingExpression = aggregateAnalysis.getHavingExpression();
-    assertThat(havingExpression, equalTo(new ComparisonExpression(
-        ComparisonExpression.Type.GREATER_THAN,
-        new QualifiedNameReference(QualifiedName.of("KSQL_AGG_VARIABLE_1")),
-        new IntegerLiteral(10))));
-  }
-
-  @Test
-  public void shouldFailOnSelectStarWithGroupBy() {
-    // Given:
-    final Query query = givenQuery("select *, count() from orders group by itemid EMIT CHANGES;");
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    expectedException.expect(KsqlException.class);
-    expectedException.expectMessage(
-        "Non-aggregate SELECT expression(s) not part of GROUP BY: "
-            + "[ORDERS.ADDRESS, ORDERS.ARRAYCOL, ORDERS.ITEMINFO, ORDERS.MAPCOL, ORDERS.ORDERID, "
-            + "ORDERS.ORDERTIME, ORDERS.ORDERUNITS, ORDERS.ROWKEY, ORDERS.ROWTIME]"
-    );
-
-    // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
-  }
-
-  @Test
-  public void shouldHandleSelectStarWithCorrectGroupBy() {
-    // Given:
-    final Query query = givenQuery("select *, count() from orders group by "
-        + "ROWTIME, ROWKEY, ITEMID, ORDERTIME, ORDERUNITS, MAPCOL, ORDERID, ITEMINFO, ARRAYCOL, ADDRESS"
-        + " EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    // When:
-    final AggregateAnalysis aggregateAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
-
-    // Then:
-    assertThat(aggregateAnalysis.getNonAggregateSelectExpressions().keySet(), containsInAnyOrder(
-        qualifiedNameExpressions(
-            "ORDERS.ROWTIME", "ORDERS.ROWKEY", "ORDERS.ITEMID", "ORDERS.ORDERTIME",
-            "ORDERS.ORDERUNITS", "ORDERS.MAPCOL", "ORDERS.ORDERID", "ORDERS.ITEMINFO",
-            "ORDERS.ARRAYCOL", "ORDERS.ADDRESS")
-    ));
-  }
-
-  @Test
-  public void shouldThrowIfSelectContainsUdfNotInGroupBy() {
-    // Given:
-    final Query query = givenQuery("select substring(orderid, 1, 2), count(*) "
-        + "from orders group by substring(orderid, 2, 5) EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    expectedException.expect(KsqlException.class);
-    expectedException.expectMessage(
-        "Non-aggregate SELECT expression(s) not part of GROUP BY: [SUBSTRING(ORDERS.ORDERID, 1, 2)]"
-    );
-
-    // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
-  }
-
-  @Test
-  public void shouldThrowIfSelectContainsReversedStringConcatExpression() {
-    // Given:
-    final Query query = givenQuery("select itemid + address->street, count(*) "
-        + "from orders group by address->street + itemid EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    expectedException.expect(KsqlException.class);
-    expectedException.expectMessage(
-        "Non-aggregate SELECT expression(s) not part of GROUP BY: "
-            + "[(ORDERS.ITEMID + FETCH_FIELD_FROM_STRUCT(ORDERS.ADDRESS, 'STREET'))]"
+  @Mock
+  private MetaStore metaStore;
+  @Mock
+  private Analyzer analyzer;
+  @Mock
+  private Query query;
+  @Mock
+  private Analysis analysis;
+  @Mock
+  private QueryValidator continuousValidator;
+  @Mock
+  private QueryValidator staticValidator;
+  @Mock
+  private Sink sink;
+  private QueryAnalyzer queryAnalyzer;
+
+  @Before
+  public void setUp() {
+    queryAnalyzer = new QueryAnalyzer(
+        metaStore,
+        analyzer,
+        continuousValidator,
+        staticValidator
     );
 
-    // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
+    when(analyzer.analyze(any(), any())).thenReturn(analysis);
   }
 
   @Test
-  public void shouldThrowIfSelectContainsFieldsUsedInExpressionInGroupBy() {
+  public void shouldPreThenPostValidateContinuousQueries() {
     // Given:
-    final Query query = givenQuery("select orderId, count(*) "
-        + "from orders group by orderid + orderunits EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    expectedException.expect(KsqlException.class);
-    expectedException.expectMessage(
-        "Non-aggregate SELECT expression(s) not part of GROUP BY: [ORDERS.ORDERID]"
-    );
+    when(query.isStatic()).thenReturn(false);
 
     // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
-  }
-
-  @Test
-  public void shouldThrowIfSelectContainsIncompatibleBinaryArithmetic() {
-    // Given:
-    final Query query = givenQuery("SELECT orderId - ordertime, COUNT(*) "
-        + "FROM ORDERS GROUP BY ordertime - orderId EMIT CHANGES;");
-
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    expectedException.expect(KsqlException.class);
-    expectedException.expectMessage(
-        "Non-aggregate SELECT expression(s) not part of GROUP BY: "
-            + "[(ORDERS.ORDERID - ORDERS.ORDERTIME)]"
-    );
-
-    // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
-  }
-
-  @Test
-  public void shouldThrowIfGroupByMissingAggregateSelectExpressions() {
-    // Given:
-    final Query query = givenQuery("select orderid from orders group by orderid EMIT CHANGES;");
-    final Analysis analysis = queryAnalyzer.analyze(query, Optional.empty());
-
-    expectedException.expect(KsqlException.class);
-    expectedException.expectMessage(
-        "GROUP BY requires columns using aggregate functions in SELECT clause."
-    );
-
-    // When:
-    queryAnalyzer.analyzeAggregate(query, analysis);
-  }
-
-  @Test
-  public void shouldHandleValueFormat() {
-    // Given:
-    final PreparedStatement<CreateStreamAsSelect> statement = KsqlParserTestUtil.buildSingleAst(
-        "create stream s with(value_format='delimited') as select * from test1;", metaStore);
-    final Query query = statement.getStatement().getQuery();
-    final Optional<Sink> sink = Optional.of(statement.getStatement().getSink());
-
-    // When:
-    final Analysis analysis = queryAnalyzer.analyze(query, sink);
+    queryAnalyzer.analyze(query, Optional.of(sink));
 
     // Then:
-    assertThat(analysis.getInto().get().getKsqlTopic().getValueFormat().getFormat(),
-        is(Format.DELIMITED));
+    final InOrder inOrder = Mockito.inOrder(continuousValidator);
+    inOrder.verify(continuousValidator).preValidate(query, Optional.of(sink));
+    inOrder.verify(continuousValidator).postValidate(analysis);
+    verifyNoMoreInteractions(staticValidator);
   }
 
   @Test
-  public void shouldRejectStaticQueries() {
+  public void shouldPreValidateStaticQueries() {
     // Given:
-    final Query query = mock(Query.class);
     when(query.isStatic()).thenReturn(true);
 
-    // Then:
-    expectedException.expect(KsqlException.class);
-    expectedException.expectMessage("Static queries are not yet supported");
-
     // When:
-    queryAnalyzer.analyze(query, Optional.empty());
-  }
+    queryAnalyzer.analyze(query, Optional.of(sink));
 
-  private Query givenQuery(final String sql) {
-    return KsqlParserTestUtil.<Query>buildSingleAst(sql, metaStore).getStatement();
+    // Then:
+    final InOrder inOrder = Mockito.inOrder(staticValidator);
+    inOrder.verify(staticValidator).preValidate(query, Optional.of(sink));
+    inOrder.verify(staticValidator).postValidate(analysis);
+    verifyNoMoreInteractions(continuousValidator);
   }
 }
\ No newline at end of file
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java
new file mode 100644
index 000000000000..ce50e0df6b85
--- /dev/null
+++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/StaticQueryValidatorTest.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License.  You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.analyzer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import io.confluent.ksql.execution.expression.tree.Expression;
+import io.confluent.ksql.parser.tree.Query;
+import io.confluent.ksql.parser.tree.ResultMaterialization;
+import io.confluent.ksql.parser.tree.Sink;
+import io.confluent.ksql.parser.tree.WindowExpression;
+import io.confluent.ksql.util.KsqlException;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StaticQueryValidatorTest {
+
+  private static final Expression AN_EXPRESSION = mock(Expression.class);
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Mock
+  private Query query;
+  @Mock
+  private Analysis analysis;
+  @Mock
+  private WindowExpression windowExpression;
+  @Mock
+  private Sink sink;
+
+  private QueryValidator validator;
+
+  @Before
+  public void setUp() {
+    validator = new StaticQueryValidator();
+
+    when(query.isStatic()).thenReturn(true);
+    when(query.getResultMaterialization()).thenReturn(ResultMaterialization.FINAL);
+  }
+
+  @Test
+  public void shouldThrowOnStaticQueryThatIsNotFinal() {
+    // Given:
+    when(query.getResultMaterialization()).thenReturn(ResultMaterialization.CHANGES);
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Static queries do not yet support `EMIT CHANGES`");
+
+    // When:
+    validator.preValidate(query, Optional.empty());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void shouldThrowOnStaticQueryIfSinkSupplied() {
+    validator.preValidate(query, Optional.of(sink));
+  }
+
+  @Test
+  public void shouldThrowOnStaticQueryThatIsJoin() {
+    // Given:
+    when(analysis.isJoin()).thenReturn(true);
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Static queries do not support joins.");
+
+    // When:
+    validator.postValidate(analysis);
+  }
+
+  @Test
+  public void shouldThrowOnStaticQueryThatIsWindowed() {
+    // Given:
+
+    when(analysis.getWindowExpression()).thenReturn(windowExpression);
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Static queries do not support WINDOW clauses.");
+
+    // When:
+    validator.postValidate(analysis);
+  }
+
+  @Test
+  public void shouldThrowOnStaticQueryThatHasGroupBy() {
+    // Given:
+    when(analysis.getGroupByExpressions()).thenReturn(ImmutableList.of(AN_EXPRESSION));
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Static queries do not support GROUP BY clauses.");
+
+    // When:
+    validator.postValidate(analysis);
+  }
+
+  @Test
+  public void shouldThrowOnStaticQueryThatHasPartitionBy() {
+    // Given:
+    when(analysis.getPartitionBy()).thenReturn(Optional.of("Something"));
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Static queries do not support PARTITION BY clauses.");
+
+    // When:
+    validator.postValidate(analysis);
+  }
+
+  @Test
+  public void shouldThrowOnStaticQueryThatHasHavingClause() {
+    // Given:
+    when(analysis.getHavingExpression()).thenReturn(AN_EXPRESSION);
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Static queries do not support HAVING clauses.");
+
+    // When:
+    validator.postValidate(analysis);
+  }
+
+  @Test
+  public void shouldThrowOnStaticQueryThatHasLimitClause() {
+    // Given:
+    when(analysis.getLimitClause()).thenReturn(OptionalInt.of(1));
+
+    // Then:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Static queries do not support LIMIT clauses.");
+
+    // When:
+    validator.postValidate(analysis);
+  }
+}
\ No newline at end of file
diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json
index 9b33d7581318..5112adbe74fd 100644
--- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json
+++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json
@@ -24,6 +24,45 @@
         {"@type": "row", "rows": []}
       ]
     },
+    {
+      "name": "non-windowed with projection",
+      "statements": [
+        "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
+        "CREATE TABLE AGGREGATE AS SELECT ROWKEY AS ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;",
+        "SELECT COUNT, CONCAT(ID, 'x') AS ID, COUNT * 2 FROM AGGREGATE WHERE ROWKEY='10';"
+      ],
+      "inputs": [
+        {"topic": "test_topic", "key": "11", "value": {}},
+        {"topic": "test_topic", "key": "10", "value": {}}
+      ],
+      "responses": [
+        {"@type": "currentStatus"},
+        {"@type": "currentStatus"},
+        {"@type": "row", "rows": [
+          {"window": null, "key": {"ROWKEY": "10"}, "value": {"COUNT": 1, "ID": "10x", "KSQL_COL_2": 2}}
+        ]}
+      ]
+    },
+    {
+      "name": "windowed with projection",
+      "statements": [
+        "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
+        "CREATE TABLE AGGREGATE AS SELECT ROWKEY AS ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;",
+        "SELECT COUNT, CONCAT(ID, 'x') AS ID, COUNT * 2 FROM AGGREGATE WHERE ROWKEY='10' AND WindowStart=12000;"
+      ],
+      "inputs": [
+        {"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {}},
+        {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}},
+        {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}
+      ],
+      "responses": [
+        {"@type": "currentStatus"},
+        {"@type": "currentStatus"},
+        {"@type": "row", "rows": [
+          {"window": {"start": 12000, "end": null}, "key": {"ROWKEY": "10"}, "value": {"COUNT": 1, "ID": "10x", "KSQL_COL_2": 2}}
+        ]}
+      ]
+    },
     {
       "name": "tumbling windowed single key lookup with exact window start",
       "statements": [
@@ -248,19 +287,6 @@
         {"@type": "row", "rows": []}
       ]
     },
-    {
-      "name": "fail on unsupported query feature: non-select-star projections",
-      "statements": [
-        "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
-        "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;",
-        "SELECT COUNT FROM AGGREGATE WHERE ROWKEY='10';"
-      ],
-      "expectedError": {
-        "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage",
-        "message": "Static queries currently only support a 'SELECT *' projections",
-        "status": 400
-      }
-    },
     {
       "name": "fail on unsupported query feature: join",
       "statements": [
@@ -389,6 +415,19 @@
         "message": "WHERE clause missing WINDOWSTART",
         "status": 400
       }
+    },
+    {
+      "name": "fail if WINDOWSTART used in non-windowed static query",
+      "statements": [
+        "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
+        "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;",
+        "SELECT * FROM AGGREGATE WHERE ROWKEY='10' AND WINDOWSTART=10;"
+      ],
+      "expectedError": {
+        "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage",
+        "message": "Field 'AGGREGATE.WINDOWSTART' cannot be resolved",
+        "status": 400
+      }
     }
   ]
 }
\ No newline at end of file
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java
index 0d66858a626b..611cdbcab880 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java
@@ -26,6 +26,8 @@
 import com.google.common.collect.Sets.SetView;
 import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.KsqlExecutionContext;
+import io.confluent.ksql.analyzer.Analysis;
+import io.confluent.ksql.analyzer.QueryAnalyzer;
 import io.confluent.ksql.execution.context.QueryContext;
 import io.confluent.ksql.execution.context.QueryContext.Stacker;
 import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
@@ -37,6 +39,11 @@
 import io.confluent.ksql.execution.expression.tree.LongLiteral;
 import io.confluent.ksql.execution.expression.tree.QualifiedNameReference;
 import io.confluent.ksql.execution.expression.tree.StringLiteral;
+import io.confluent.ksql.execution.plan.SelectExpression;
+import io.confluent.ksql.execution.streams.SelectValueMapper;
+import io.confluent.ksql.execution.streams.SelectValueMapperFactory;
+import io.confluent.ksql.execution.util.ExpressionTypeManager;
+import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
 import io.confluent.ksql.materialization.Locator;
 import io.confluent.ksql.materialization.Locator.KsqlNode;
 import io.confluent.ksql.materialization.Materialization;
@@ -44,14 +51,9 @@
 import io.confluent.ksql.materialization.Window;
 import io.confluent.ksql.metastore.MetaStore;
 import io.confluent.ksql.metastore.model.DataSource;
-import io.confluent.ksql.parser.tree.AliasedRelation;
-import io.confluent.ksql.parser.tree.Join;
+import io.confluent.ksql.parser.tree.AllColumns;
 import io.confluent.ksql.parser.tree.Query;
-import io.confluent.ksql.parser.tree.Relation;
-import io.confluent.ksql.parser.tree.Select;
 import io.confluent.ksql.parser.tree.SelectItem;
-import io.confluent.ksql.parser.tree.SingleColumn;
-import io.confluent.ksql.parser.tree.Table;
 import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.rest.Errors;
 import io.confluent.ksql.rest.client.KsqlRestClient;
@@ -61,9 +63,13 @@
 import io.confluent.ksql.rest.entity.QueryResultEntity;
 import io.confluent.ksql.rest.entity.QueryResultEntityFactory;
 import io.confluent.ksql.rest.server.resources.KsqlRestException;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.PhysicalSchema;
+import io.confluent.ksql.schema.ksql.types.SqlType;
+import io.confluent.ksql.serde.SerdeOption;
 import io.confluent.ksql.services.ServiceContext;
 import io.confluent.ksql.statement.ConfiguredStatement;
+import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.KsqlConstants;
 import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.KsqlServerException;
@@ -73,9 +79,12 @@
 import io.confluent.ksql.util.timestamp.StringToTimestampParser;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -108,20 +117,11 @@ public static void validate(
     }
 
     try {
-      validateSelects(queryStmt.getSelect());
+      final Analysis analysis = analyze(statement, executionContext);
 
-      final PersistentQueryMetadata query = findMaterializingQuery(executionContext, queryStmt);
-
-      extractWhereInfo(queryStmt.getWhere(), query);
-
-      if (queryStmt.getWindow().isPresent()) {
-        throw new KsqlException("Static queries do not support WINDOW clauses.");
-      }
-
-      if (queryStmt.getGroupBy().isPresent()) {
-        throw new KsqlException("Static queries do not support GROUP BY clauses.");
-      }
+      final PersistentQueryMetadata query = findMaterializingQuery(executionContext, analysis);
 
+      extractWhereInfo(analysis, query);
     } catch (final Exception e) {
       throw new KsqlStatementException(
           e.getMessage(),
@@ -137,17 +137,17 @@ public static Optional<KsqlEntity> execute(
       final ServiceContext serviceContext
   ) {
     try {
-      final Query queryStmt = statement.getStatement();
+      final Analysis analysis = analyze(statement, executionContext);
 
-      final PersistentQueryMetadata query = findMaterializingQuery(executionContext, queryStmt);
+      final PersistentQueryMetadata query = findMaterializingQuery(executionContext, analysis);
 
-      final WhereInfo whereInfo = extractWhereInfo(queryStmt.getWhere(), query);
+      final WhereInfo whereInfo = extractWhereInfo(analysis, query);
 
       final QueryContext.Stacker contextStacker = new Stacker(new QueryId("static-query"));
 
       final Materialization mat = query
           .getMaterialization(contextStacker)
-          .orElseThrow(() -> notMaterializedException(getSourceRelation(queryStmt.getFrom())));
+          .orElseThrow(() -> notMaterializedException(getSourceName(analysis)));
 
       final Struct rowKey = asKeyStruct(whereInfo.rowkey, query.getPhysicalSchema());
 
@@ -156,7 +156,7 @@ public static Optional<KsqlEntity> execute(
         return Optional.of(proxyTo(owner, statement));
       }
 
-      final Map<Optional<Window>, GenericRow> result;
+      Result result;
       if (whereInfo.windowBounds.isPresent()) {
         final WindowBounds windowBounds = whereInfo.windowBounds.get();
 
@@ -164,18 +164,29 @@ public static Optional<KsqlEntity> execute(
         mat.windowed().get(rowKey, windowBounds.lower, windowBounds.upper)
             .forEach((k, v) -> builder.put(Optional.of(k), v));
 
-        result = builder.build();
+        result = new Result(
+            mat.schema(),
+            builder.build()
+        );
       } else {
-        result = mat.nonWindowed().get(rowKey)
+        final ImmutableMap<Optional<Window>, GenericRow> rows = mat
+            .nonWindowed().get(rowKey)
             .map(v -> ImmutableMap.of(Optional.<Window>empty(), v))
             .orElse(ImmutableMap.of());
+
+        result = new Result(
+            mat.schema(),
+            rows
+        );
       }
 
+      result = handleSelects(result, statement, executionContext, analysis);
+
       final QueryResultEntity entity = new QueryResultEntity(
           statement.getStatementText(),
           mat.windowType(),
-          mat.schema(),
-          QueryResultEntityFactory.createRows(rowKey, result, mat.schema())
+          result.schema,
+          QueryResultEntityFactory.createRows(rowKey, result.rows, result.schema)
       );
 
       return Optional.of(entity);
@@ -188,6 +199,19 @@ public static Optional<KsqlEntity> execute(
     }
   }
 
+  private static Analysis analyze(
+      final ConfiguredStatement<Query> statement,
+      final KsqlExecutionContext executionContext
+  ) {
+    final QueryAnalyzer queryAnalyzer = new QueryAnalyzer(
+        executionContext.getMetaStore(),
+        "",
+        SerdeOption.none()
+    );
+
+    return queryAnalyzer.analyze(statement.getStatement(), Optional.empty());
+  }
+
   private static final class WindowBounds {
 
     private final Instant lower;
@@ -217,14 +241,30 @@ private WhereInfo(
     }
   }
 
+  private static final class Result {
+
+    private final LogicalSchema schema;
+    private final Map<Optional<Window>, GenericRow> rows;
+
+    private Result(
+        final LogicalSchema schema,
+        final Map<Optional<Window>, GenericRow> rows
+    ) {
+      this.schema = Objects.requireNonNull(schema, "schema");
+      this.rows = Objects.requireNonNull(rows, "rows");
+    }
+  }
+
   private static WhereInfo extractWhereInfo(
-      final Optional<Expression> possibleWhere,
+      final Analysis analysis,
       final PersistentQueryMetadata query
   ) {
     final boolean windowed = query.getResultTopic().getKeyFormat().isWindowed();
 
-    final Expression where = possibleWhere
-        .orElseThrow(() -> invalidWhereClauseException("missing WHERE clause", windowed));
+    final Expression where = analysis.getWhereExpression();
+    if (where == null) {
+      throw invalidWhereClauseException("Missing WHERE clause", windowed);
+    }
 
     final Map<ComparisonTarget, List<ComparisonExpression>> comparisons = extractComparisons(where);
 
@@ -237,7 +277,7 @@ private static WhereInfo extractWhereInfo(
 
     if (!windowed) {
       if (comparisons.size() > 1) {
-        throw invalidWhereClauseException("Unsupported WHERE clause", windowed);
+        throw invalidWhereClauseException("Unsupported WHERE clause", false);
       }
 
       return new WhereInfo(rowKey, Optional.empty());
@@ -249,7 +289,7 @@ private static WhereInfo extractWhereInfo(
     if (windowBoundsComparison == null) {
       throw invalidWhereClauseException(
           "WHERE clause missing " + ComparisonTarget.WINDOWSTART,
-          windowed
+          true
       );
     }
 
@@ -465,32 +505,73 @@ private static ComparisonTarget extractWhereClauseTarget(final ComparisonExpress
     }
   }
 
-  private static void validateSelects(final Select select) {
-    final List<SelectItem> selectItems = select.getSelectItems();
+  private static boolean isSelectStar(final List<SelectItem> selects) {
+    return selects.size() == 1 && selects.get(0) instanceof AllColumns;
+  }
 
-    if (selectItems.size() != 1
-        || selectItems.get(0) instanceof SingleColumn
-    ) {
-      throw new KsqlException("Static queries currently only support a 'SELECT *' projections");
+  private static Result handleSelects(
+      final Result input,
+      final ConfiguredStatement<Query> statement,
+      final KsqlExecutionContext executionContext,
+      final Analysis analysis
+  ) {
+    final List<SelectItem> selectItems = statement.getStatement().getSelect().getSelectItems();
+    if (input.rows.isEmpty() || isSelectStar(selectItems)) {
+      return input;
+    }
+
+    final LogicalSchema.Builder schema = LogicalSchema.builder();
+    schema.keyColumns(input.schema.key());
+
+    final ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager(
+        input.schema,
+        executionContext.getMetaStore()
+    );
+
+    final List<SelectExpression> selects = new ArrayList<>(selectItems.size());
+    for (int idx = 0; idx < analysis.getSelectExpressions().size(); idx++) {
+      final Expression exp = analysis.getSelectExpressions().get(idx);
+      final String alias = analysis.getSelectExpressionAlias().get(idx);
+      selects.add(SelectExpression.of(alias, exp));
+      final SqlType type = expressionTypeManager.getExpressionSqlType(exp);
+      schema.valueColumn(alias, type);
     }
+
+    final String sourceName = getSourceName(analysis);
+
+    final KsqlConfig ksqlConfig = statement.getConfig()
+        .cloneWithPropertyOverwrite(statement.getOverrides());
+
+    final SelectValueMapper mapper = SelectValueMapperFactory.create(
+        selects,
+        input.schema.withAlias(sourceName),
+        ksqlConfig,
+        executionContext.getMetaStore(),
+        NoopProcessingLogContext.INSTANCE.getLoggerFactory().getLogger("any")
+    );
+
+    final Map<Optional<Window>, GenericRow> output = new LinkedHashMap<>();
+    input.rows.forEach((k, v) -> output.put(k, mapper.apply(v)));
+    return new Result(
+        schema.build(),
+        output
+    );
   }
 
   private static PersistentQueryMetadata findMaterializingQuery(
       final KsqlExecutionContext executionContext,
-      final Query query
+      final Analysis analysis
   ) {
     final MetaStore metaStore = executionContext.getMetaStore();
 
-    final Table sourceTable = getSourceRelation(query.getFrom());
+    final String sourceName = getSourceName(analysis);
 
-    final DataSource<?> source = getSource(sourceTable, metaStore);
-
-    final Set<String> queries = metaStore.getQueriesWithSink(source.getName());
+    final Set<String> queries = metaStore.getQueriesWithSink(sourceName);
     if (queries.isEmpty()) {
-      throw notMaterializedException(sourceTable);
+      throw notMaterializedException(sourceName);
     }
     if (queries.size() > 1) {
-      throw new KsqlException("Multiple queries currently materialize '" + sourceTable + "'."
+      throw new KsqlException("Multiple queries currently materialize '" + sourceName + "'."
           + " KSQL currently only supports static queries when the table has only been"
           + " materialized once.");
     }
@@ -501,32 +582,9 @@ private static PersistentQueryMetadata findMaterializingQuery(
         .orElseThrow(() -> new KsqlException("Materializing query has been stopped"));
   }
 
-  private static DataSource<?> getSource(
-      final Table sourceTable,
-      final MetaStore metaStore
-  ) {
-    final DataSource<?> source = metaStore.getSource(sourceTable.getName().toString());
-    if (source == null) {
-      throw new KsqlException("Unknown source: " + sourceTable.getName());
-    }
-
-    return source;
-  }
-
-  private static Table getSourceRelation(final Relation from) {
-    if (from instanceof Join) {
-      throw new KsqlException("Static queries do not support joins.");
-    }
-
-    if (from instanceof Table) {
-      return (Table) from;
-    }
-
-    if (from instanceof AliasedRelation) {
-      return getSourceRelation(((AliasedRelation) from).getRelation());
-    }
-
-    throw new KsqlException("Unsupported source type: " + from.getClass().getSimpleName());
+  private static String getSourceName(final Analysis analysis) {
+    final DataSource<?> source = analysis.getFromDataSources().get(0).getDataSource();
+    return source.getName();
   }
 
   private static KsqlNode getOwner(final Struct rowKey, final Materialization mat) {
@@ -567,7 +625,7 @@ private static KsqlEntity proxyTo(
     }
   }
 
-  private static KsqlException notMaterializedException(final Table sourceTable) {
+  private static KsqlException notMaterializedException(final String sourceTable) {
     return new KsqlException(
         "Table '" + sourceTable + "' is not materialized."
             + " KSQL currently only supports static queries on materialized aggregate tables."