Skip to content

Commit

Permalink
Extending ParquetFilters
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreSchumacher committed May 16, 2014
1 parent 93e8192 commit 6d22666
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,6 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends Binar
override def eval(input: Row): Any = c2(input, left, right, _.gteq(_, _))
}

// A simple filter condition on a single column
/*case class ColumnFilterPredicate(val comparison: BinaryComparison) extends BinaryComparison {
override def eval(input: Row): Any = comparison.eval(input)
} */

case class If(predicate: Expression, trueValue: Expression, falseValue: Expression)
extends Expression {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,125 @@

package org.apache.spark.sql.parquet

import org.apache.hadoop.conf.Configuration

import parquet.filter._
import parquet.filter.ColumnPredicates._
import parquet.column.ColumnReader
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Equals
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.types.{IntegerType, BooleanType, NativeType}
import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.reflect.ClassTag

import com.google.common.io.BaseEncoding
import parquet.filter
import parquet.filter.ColumnPredicates.BooleanPredicateFunction

// Implicits
import collection.JavaConversions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkSqlSerializer

object ParquetFilters {
val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"

def createFilter(filterExpressions: Seq[Expression]): UnboundRecordFilter = {
def createEqualityFilter(name: String, literal: Literal) = literal.dataType match {
case BooleanType => new ComparisonFilter(name, literal.value.asInstanceOf[Boolean])
case IntegerType => new ComparisonFilter(name, _ == literal.value.asInstanceOf[Int])
case BooleanType =>
ComparisonFilter.createBooleanFilter(name, literal.value.asInstanceOf[Boolean])
case IntegerType =>
ComparisonFilter.createIntFilter(name, (x: Int) => x == literal.value.asInstanceOf[Int])
case LongType =>
ComparisonFilter.createLongFilter(name, (x: Long) => x == literal.value.asInstanceOf[Long])
case DoubleType =>
ComparisonFilter.createDoubleFilter(
name,
(x: Double) => x == literal.value.asInstanceOf[Double])
case FloatType =>
ComparisonFilter.createFloatFilter(
name,
(x: Float) => x == literal.value.asInstanceOf[Float])
case StringType =>
ComparisonFilter.createStringFilter(name, literal.value.asInstanceOf[String])
}

val filters: Seq[UnboundRecordFilter] = filterExpressions.map {
case Equals(left: Literal, right: NamedExpression) => {
val name: String = right.name
createEqualityFilter(name, left)
}
case Equals(left: NamedExpression, right: Literal) => {
val name: String = left.name
createEqualityFilter(name, right)
}
def createLessThanFilter(name: String, literal: Literal) = literal.dataType match {
case IntegerType =>
ComparisonFilter.createIntFilter(name, (x: Int) => x < literal.value.asInstanceOf[Int])
case LongType =>
ComparisonFilter.createLongFilter(name, (x: Long) => x < literal.value.asInstanceOf[Long])
case DoubleType =>
ComparisonFilter.createDoubleFilter(
name,
(x: Double) => x < literal.value.asInstanceOf[Double])
case FloatType =>
ComparisonFilter.createFloatFilter(
name,
(x: Float) => x < literal.value.asInstanceOf[Float])
}

def createLessThanOrEqualFilter(name: String, literal: Literal) = literal.dataType match {
case IntegerType =>
ComparisonFilter.createIntFilter(name, (x: Int) => x <= literal.value.asInstanceOf[Int])
case LongType =>
ComparisonFilter.createLongFilter(name, (x: Long) => x <= literal.value.asInstanceOf[Long])
case DoubleType =>
ComparisonFilter.createDoubleFilter(
name,
(x: Double) => x <= literal.value.asInstanceOf[Double])
case FloatType =>
ComparisonFilter.createFloatFilter(
name,
(x: Float) => x <= literal.value.asInstanceOf[Float])
}
// TODO: combine these two types somehow?
def createGreaterThanFilter(name: String, literal: Literal) = literal.dataType match {
case IntegerType =>
ComparisonFilter.createIntFilter(name, (x: Int) => x > literal.value.asInstanceOf[Int])
case LongType =>
ComparisonFilter.createLongFilter(name, (x: Long) => x > literal.value.asInstanceOf[Long])
case DoubleType =>
ComparisonFilter.createDoubleFilter(
name,
(x: Double) => x > literal.value.asInstanceOf[Double])
case FloatType =>
ComparisonFilter.createFloatFilter(
name,
(x: Float) => x > literal.value.asInstanceOf[Float])
}
def createGreaterThanOrEqualFilter(name: String, literal: Literal) = literal.dataType match {
case IntegerType =>
ComparisonFilter.createIntFilter(name, (x: Int) => x >= literal.value.asInstanceOf[Int])
case LongType =>
ComparisonFilter.createLongFilter(name, (x: Long) => x >= literal.value.asInstanceOf[Long])
case DoubleType =>
ComparisonFilter.createDoubleFilter(
name,
(x: Double) => x >= literal.value.asInstanceOf[Double])
case FloatType =>
ComparisonFilter.createFloatFilter(
name,
(x: Float) => x >= literal.value.asInstanceOf[Float])
}
// TODO: can we actually rely on the predicate being normalized as in expression < literal?
// That would simplify this pattern matching
// TODO: we currently only filter on non-nullable (Parquet REQUIRED) attributes until
// https://github.com/Parquet/parquet-mr/issues/371
// has been resolved
val filters: Seq[UnboundRecordFilter] = filterExpressions.collect {
case Equals(left: Literal, right: NamedExpression) if !right.nullable =>
createEqualityFilter(right.name, left)
case Equals(left: NamedExpression, right: Literal) if !left.nullable =>
createEqualityFilter(left.name, right)
case LessThan(left: Literal, right: NamedExpression) if !right.nullable =>
createLessThanFilter(right.name, left)
case LessThan(left: NamedExpression, right: Literal) if !left.nullable =>
createLessThanFilter(left.name, right)
case LessThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
createLessThanOrEqualFilter(right.name, left)
case LessThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
createLessThanOrEqualFilter(left.name, right)
case GreaterThan(left: Literal, right: NamedExpression) if !right.nullable =>
createGreaterThanFilter(right.name, left)
case GreaterThan(left: NamedExpression, right: Literal) if !left.nullable =>
createGreaterThanFilter(left.name, right)
case GreaterThanOrEqual(left: Literal, right: NamedExpression) if !right.nullable =>
createGreaterThanOrEqualFilter(right.name, left)
case GreaterThanOrEqual(left: NamedExpression, right: Literal) if !left.nullable =>
createGreaterThanOrEqualFilter(left.name, right)
}
// TODO: How about disjunctions? (Or-ed)
if (filters.length > 0) filters.reduce(AndRecordFilter.and) else null
}

Expand Down Expand Up @@ -83,47 +166,72 @@ class ComparisonFilter(
private val columnName: String,
private var filter: UnboundRecordFilter)
extends UnboundRecordFilter {
def this(columnName: String, value: Boolean) =
this(
override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = {
filter.bind(readers)
}
}

object ComparisonFilter {
def createBooleanFilter(columnName: String, value: Boolean): UnboundRecordFilter =
new ComparisonFilter(
columnName,
ColumnRecordFilter.column(
columnName,
ColumnPredicates.applyFunctionToBoolean(
new ColumnPredicates.BooleanPredicateFunction {
new BooleanPredicateFunction {
def functionToApply(input: Boolean): Boolean = input == value
})))
def this(columnName: String, func: Int => Boolean) =
this(
}
)))
def createStringFilter(columnName: String, value: String): UnboundRecordFilter =
new ComparisonFilter(
columnName,
ColumnRecordFilter.column(
columnName,
ColumnPredicates.applyFunctionToString (
new ColumnPredicates.PredicateFunction[String] {
def functionToApply(input: String): Boolean = input == value
}
)))
def createIntFilter(columnName: String, func: Int => Boolean): UnboundRecordFilter =
new ComparisonFilter(
columnName,
ColumnRecordFilter.column(
columnName,
ColumnPredicates.applyFunctionToInteger(
new ColumnPredicates.IntegerPredicateFunction {
def functionToApply(input: Int) = if (input != null) func(input) else false
})))
override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = {
filter.bind(readers)
}
new IntegerPredicateFunction {
def functionToApply(input: Int) = func(input)
}
)))
def createLongFilter(columnName: String, func: Long => Boolean): UnboundRecordFilter =
new ComparisonFilter(
columnName,
ColumnRecordFilter.column(
columnName,
ColumnPredicates.applyFunctionToLong(
new LongPredicateFunction {
def functionToApply(input: Long) = func(input)
}
)))
def createDoubleFilter(columnName: String, func: Double => Boolean): UnboundRecordFilter =
new ComparisonFilter(
columnName,
ColumnRecordFilter.column(
columnName,
ColumnPredicates.applyFunctionToDouble(
new DoublePredicateFunction {
def functionToApply(input: Double) = func(input)
}
)))
def createFloatFilter(columnName: String, func: Float => Boolean): UnboundRecordFilter =
new ComparisonFilter(
columnName,
ColumnRecordFilter.column(
columnName,
ColumnPredicates.applyFunctionToFloat(
new FloatPredicateFunction {
def functionToApply(input: Float) = func(input)
}
)))
}

/*class EqualityFilter(
private val columnName: String,
private var filter: UnboundRecordFilter)
extends UnboundRecordFilter {
def this(columnName: String, value: Boolean) =
this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value)))
def this(columnName: String, value: Int) =
this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value)))
def this(columnName: String, value: Long) =
this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value)))
def this(columnName: String, value: Double) =
this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value)))
def this(columnName: String, value: Float) =
this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value)))
def this(columnName: String, value: String) =
this(columnName, ColumnRecordFilter.column(columnName, ColumnPredicates.equalTo(value)))
override def bind(readers: java.lang.Iterable[ColumnReader]): RecordFilter = {
filter.bind(readers)
}
}*/

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ import parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.util.Utils
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.api.WriteSupport
import parquet.example.data.{GroupWriter, Group}
import parquet.io.api.RecordConsumer
import parquet.hadoop.api.WriteSupport.WriteContext
import parquet.example.data.simple.SimpleGroup

// Write support class for nested groups:
// ParquetWriter initializes GroupWriteSupport with an empty configuration
// (it is after all not intended to be used in this way?)
// and members are private so we need to make our own
// Minimal WriteSupport[Group] for tests: parquet-mr's own GroupWriteSupport is
// initialized with an empty Configuration by ParquetWriter and keeps its members
// private, so this reimplements just enough to write SimpleGroup records against
// a caller-supplied MessageType schema.
private class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] {
// Set by prepareForWrite; calling write() before prepareForWrite() would NPE.
// NOTE(review): a var initialized to null mirrors parquet-mr's lifecycle contract
// (init -> prepareForWrite -> write), which guarantees ordering — TODO confirm.
var groupWriter: GroupWriter = null
// Binds the record consumer for the current file/block; invoked by ParquetWriter.
override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
groupWriter = new GroupWriter(recordConsumer, schema)
}
// Returns the write context carrying the schema; no extra key/value metadata
// is attached (empty HashMap).
override def init(configuration: Configuration): WriteContext = {
new WriteContext(schema, new java.util.HashMap[String, String]())
}
// Delegates each record to the GroupWriter created in prepareForWrite.
override def write(record: Group) {
groupWriter.write(record)
}
}

private[sql] object ParquetTestData {

Expand Down Expand Up @@ -75,26 +98,42 @@ private[sql] object ParquetTestData {
val configuration: Configuration = ContextUtil.getConfiguration(job)
val schema: MessageType = MessageTypeParser.parseMessageType(testSchema)

val writeSupport = new RowWriteSupport()
writeSupport.setSchema(schema, configuration)
val writer = new ParquetWriter(path, writeSupport)
//val writeSupport = new MutableRowWriteSupport()
//writeSupport.setSchema(schema, configuration)
//val writer = new ParquetWriter(path, writeSupport)
val writeSupport = new TestGroupWriteSupport(schema)
//val writer = //new ParquetWriter[Group](path, writeSupport)
val writer = new ParquetWriter[Group](path, writeSupport)

for(i <- 0 until 15) {
val data = new Array[Any](6)
val record = new SimpleGroup(schema)
//val data = new Array[Any](6)
if (i % 3 == 0) {
data.update(0, true)
//data.update(0, true)
record.add(0, true)
} else {
data.update(0, false)
//data.update(0, false)
record.add(0, false)
}
//if (i % 5 == 0) {
if (i % 5 == 0) {
record.add(1, 5)
// data.update(1, 5)
//} else {
data.update(1, null) // optional
} else {
if (i % 5 == 1) record.add(1, 4)
}
//else {
// data.update(1, null) // optional
//}
data.update(2, "abc")
data.update(3, i.toLong << 33)
data.update(4, 2.5F)
data.update(5, 4.5D)
writer.write(new GenericRow(data.toArray))
//data.update(2, "abc")
record.add(2, "abc")
//data.update(3, i.toLong << 33)
record.add(3, i.toLong << 33)
//data.update(4, 2.5F)
record.add(4, 2.5F)
//data.update(5, 4.5D)
record.add(5, 4.5D)
//writer.write(new GenericRow(data.toArray))
writer.write(record)
}
writer.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,20 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {

test("SELECT WHERE") {
val result = sql("SELECT * FROM testsource WHERE myint = 5").collect()
/*test("SELECT WHERE") {
//val result = parquetFile("/home/andre/input.adam").registerAsTable("adamtable")
//sql("SELECT * FROM adamtable WHERE mapq = 0").collect()
//assert(result != null)
//val result = sql("SELECT * FROM testsource WHERE myint = 5").collect()
// TODO: ADD larger case SchemaRDD with filtering on REQUIRED field!
implicit def anyToRow(value: Any): Row = value.asInstanceOf[Row]
TestSQLContext
.parquetFile(ParquetTestData.testNestedDir1.toString)
.toSchemaRDD.registerAsTable("xtmptable")
val result = sql("SELECT * FROM xtmptable WHERE owner = \"Julien Le Dem\"").collect()
>>>>>>> Extending ParquetFilters
assert(result != null)
}*/
}
}

0 comments on commit 6d22666

Please sign in to comment.