HIVE-1534. Join filters do not work correctly with outer joins
(Amareshwari Sriramadasu via namit)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/hive/trunk@999637 13f79535-47bb-0310-9956-ffa450edef68
Namit Jain committed Sep 21, 2010
1 parent e0ed293 commit 0645936
Showing 33 changed files with 6,914 additions and 308 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
@@ -282,6 +282,9 @@ Trunk - Unreleased
property is added in run config
(Steven Wong via jvs)

HIVE-1534. Join filters do not work correctly with outer joins
(Amareshwari Sriramadasu via namit)

TESTS

HIVE-1464. improve test query performance
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -286,6 +286,7 @@ public static enum ConfVars {
// For har files
HIVEARCHIVEENABLED("hive.archive.enabled", false),
HIVEHARPARENTDIRSETTABLE("hive.archive.har.parentdir.settable", false),
HIVEOUTERJOINSUPPORTSFILTERS("hive.outerjoin.supports.filters", true),

;

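The flag above defaults to true, so the corrected filter behavior is on unless explicitly disabled (presumably per session via set hive.outerjoin.supports.filters=false, as with other conf vars). A minimal sketch of reading it, assuming the standard getBoolVar accessor and a Configuration named hconf (both assumptions, not part of this diff):

// Sketch only: reads the new flag from the job configuration.
boolean outerJoinFilters =
    HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEOUTERJOINSUPPORTSFILTERS);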
4 changes: 4 additions & 0 deletions data/files/in3.txt
@@ -0,0 +1,4 @@
1235
40
48
100100
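The four rows above presumably feed the new join-filter tests added elsewhere in this commit. For context, here is a small self-contained illustration (hypothetical plain Java, not from the patch) of the semantics HIVE-1534 restores for a query like a LEFT OUTER JOIN b ON (a.key = b.key AND b.val > 20): a left row whose only match fails the ON-clause filter must still be emitted with a null-padded right side, not dropped as an inner join would do.

import java.util.HashMap;
import java.util.Map;

public class LeftOuterJoinFilterSketch {
  public static void main(String[] args) {
    Map<Integer, Integer> right = new HashMap<Integer, Integer>();
    right.put(1, 10); // key matches, but fails the filter val > 20
    right.put(2, 30); // key matches and passes the filter

    for (int key : new int[] {1, 2, 3}) {
      Integer val = right.get(key);
      // the ON-clause filter participates in matching, not in row removal
      boolean matches = val != null && val > 20;
      System.out.println(key + "\t" + (matches ? val : "NULL"));
    }
    // prints: 1 NULL / 2 30 / 3 NULL; every left row survives
  }
}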
135 changes: 127 additions & 8 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
@@ -44,8 +44,12 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.ReflectionUtils;

@@ -94,17 +94,34 @@ public Object topObj() {

protected transient int numAliases; // number of aliases
/**
- * The expressions for join outputs.
+ * The expressions for join inputs.
*/
protected transient Map<Byte, List<ExprNodeEvaluator>> joinValues;

/**
* The filters for join
*/
protected transient Map<Byte, List<ExprNodeEvaluator>> joinFilters;

/**
* The ObjectInspectors for the join inputs.
*/
protected transient Map<Byte, List<ObjectInspector>> joinValuesObjectInspectors;

/**
* The ObjectInspectors for join filters.
*/
protected transient
Map<Byte, List<ObjectInspector>> joinFilterObjectInspectors;
/**
* The standard ObjectInspectors for the join inputs.
*/
protected transient Map<Byte, List<ObjectInspector>> joinValuesStandardObjectInspectors;
/**
* The standard ObjectInspectors for the row container.
*/
protected transient
Map<Byte, List<ObjectInspector>> rowContainerStandardObjectInspectors;

protected static transient Byte[] order; // order in which the results should
// be output
@@ -183,6 +204,8 @@ public CommonJoinOperator(CommonJoinOperator<T> clone) {
this.posToAliasMap = clone.posToAliasMap;
this.spillTableDesc = clone.spillTableDesc;
this.statsMap = clone.statsMap;
this.joinFilters = clone.joinFilters;
this.joinFilterObjectInspectors = clone.joinFilterObjectInspectors;
}

protected int populateJoinKeyValue(Map<Byte, List<ExprNodeEvaluator>> outMap,
@@ -282,18 +305,40 @@ protected void initializeOp(Configuration hconf) throws HiveException {

joinValues = new HashMap<Byte, List<ExprNodeEvaluator>>();

joinFilters = new HashMap<Byte, List<ExprNodeEvaluator>>();

if (order == null) {
order = conf.getTagOrder();
}
condn = conf.getConds();
noOuterJoin = conf.isNoOuterJoin();

totalSz = populateJoinKeyValue(joinValues, conf.getExprs());
populateJoinKeyValue(joinFilters, conf.getFilters());

joinValuesObjectInspectors = getObjectInspectorsFromEvaluators(joinValues,
inputObjInspectors);
joinFilterObjectInspectors = getObjectInspectorsFromEvaluators(joinFilters,
inputObjInspectors);
joinValuesStandardObjectInspectors = getStandardObjectInspectors(joinValuesObjectInspectors);

if (noOuterJoin) {
rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors;
} else {
Map<Byte, List<ObjectInspector>> rowContainerObjectInspectors =
new HashMap<Byte, List<ObjectInspector>>();
for (Byte alias : order) {
ArrayList<ObjectInspector> rcOIs = new ArrayList<ObjectInspector>();
rcOIs.addAll(joinValuesObjectInspectors.get(alias));
// for each alias, add object inspector for boolean as the last element
rcOIs.add(
PrimitiveObjectInspectorFactory.writableBooleanObjectInspector);
rowContainerObjectInspectors.put(alias, rcOIs);
}
rowContainerStandardObjectInspectors =
getStandardObjectInspectors(rowContainerObjectInspectors);
}

dummyObj = new Object[numAliases];
dummyObjVectors = new RowContainer[numAliases];
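The core storage change set up by this hunk: when any outer join is present, each buffered row carries one extra trailing boolean recording whether it failed its join filters, and every alias's row-container inspector list gets a matching boolean inspector. A hypothetical helper showing just that layout (the name storedRow is illustrative, not from the patch):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.BooleanWritable;

class StoredRowSketch {
  // Illustrative: the evaluated join values for one alias, followed,
  // only when outer joins exist, by a BooleanWritable "filtered" flag.
  static List<Object> storedRow(List<Object> values, boolean filtered,
      boolean noOuterJoin) {
    List<Object> row = new ArrayList<Object>(values);
    if (!noOuterJoin) {
      row.add(new BooleanWritable(filtered)); // always the last element
    }
    return row;
  }
}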

@@ -313,6 +358,13 @@ protected void initializeOp(Configuration hconf) throws HiveException {
for (int j = 0; j < sz; j++) {
nr.add(null);
}

if (!noOuterJoin) {
// add whether the row is filtered or not
// this value does not matter for the dummyObj
// because the join values are already null
nr.add(new BooleanWritable(false));
}
dummyObj[pos] = nr;
// there should be only 1 dummy object in the RowContainer
RowContainer<ArrayList<Object>> values = getRowContainer(hconf, pos,
@@ -354,7 +406,7 @@ RowContainer getRowContainer(Configuration hconf, byte pos, Byte alias,
List<String> colNames = Utilities.getColumnNames(tblDesc.getProperties());
// object inspector for serializing input tuples
rcOI = ObjectInspectorFactory.getStandardStructObjectInspector(colNames,
- joinValuesStandardObjectInspectors.get(pos));
+ rowContainerStandardObjectInspectors.get(pos));
}

rc.setSerDe(serde, rcOI);
@@ -413,6 +465,12 @@ private void initSpillTables() {
colTypes.append(valueCols.get(k).getTypeString());
colTypes.append(',');
}
if (!noOuterJoin) {
colNames.append("filtered");
colNames.append(',');
colTypes.append(TypeInfoFactory.booleanTypeInfo.getTypeName());
colTypes.append(',');
}
// remove the last ','
colNames.setLength(colNames.length() - 1);
colTypes.setLength(colTypes.length() - 1);
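The spill-table schema is widened the same way: a trailing "filtered" boolean column follows the value columns. A hypothetical rendering of the string building with two stand-in value columns:

public class SpillSchemaSketch {
  public static void main(String[] args) {
    StringBuilder colNames = new StringBuilder("key,value,");
    StringBuilder colTypes = new StringBuilder("string,string,");
    boolean noOuterJoin = false;
    if (!noOuterJoin) {
      colNames.append("filtered").append(',');
      colTypes.append("boolean").append(',');
    }
    colNames.setLength(colNames.length() - 1); // drop the trailing ','
    colTypes.setLength(colTypes.length() - 1);
    // prints "key,value,filtered" and "string,string,boolean"
    System.out.println(colNames + " " + colTypes);
  }
}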
@@ -449,22 +507,46 @@ protected int getNextSize(int sz) {

protected transient Byte alias;

/**
* Return the key as a standard object. StandardObject can be inspected by a
* standard ObjectInspector.
*/
protected static ArrayList<Object> computeKeys(Object row,
List<ExprNodeEvaluator> keyFields, List<ObjectInspector> keyFieldsOI)
throws HiveException {

// Compute the keys
ArrayList<Object> nr = new ArrayList<Object>(keyFields.size());
for (int i = 0; i < keyFields.size(); i++) {

nr.add(ObjectInspectorUtils.copyToStandardObject(keyFields.get(i)
.evaluate(row), keyFieldsOI.get(i),
ObjectInspectorCopyOption.WRITABLE));
}

return nr;
}

/**
* Return the value as a standard object. StandardObject can be inspected by a
* standard ObjectInspector.
*/
protected static ArrayList<Object> computeValues(Object row,
List<ExprNodeEvaluator> valueFields, List<ObjectInspector> valueFieldsOI)
throws HiveException {
List<ExprNodeEvaluator> valueFields, List<ObjectInspector> valueFieldsOI,
List<ExprNodeEvaluator> filters, List<ObjectInspector> filtersOI,
boolean noOuterJoin) throws HiveException {

// Compute the values
ArrayList<Object> nr = new ArrayList<Object>(valueFields.size());
for (int i = 0; i < valueFields.size(); i++) {

nr.add(ObjectInspectorUtils.copyToStandardObject(valueFields.get(i)
.evaluate(row), valueFieldsOI.get(i),
ObjectInspectorCopyOption.WRITABLE));
}
if (!noOuterJoin) {
// add whether the row is filtered or not.
nr.add(new BooleanWritable(isFiltered(row, filters, filtersOI)));
}

return nr;
}
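All the call sites updated later in this commit pass the new arguments the same way; the shape, reproduced here for reference:

// values now carry the filter evaluators and their inspectors, so the
// trailing flag can be computed; the result is
// [v0, ..., vn, BooleanWritable(filtered)] when outer joins exist
ArrayList<Object> value = computeValues(row,
    joinValues.get(alias), joinValuesObjectInspectors.get(alias),
    joinFilters.get(alias), joinFilterObjectInspectors.get(alias),
    noOuterJoin);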
@@ -548,6 +630,12 @@ private ArrayList<boolean[]> joinObjectsLeftOuterJoin(
ArrayList<boolean[]> resNulls, ArrayList<boolean[]> inputNulls,
ArrayList<Object> newObj, IntermediateObject intObj, int left,
boolean newObjNull) {
// newObj is null if is already null or
// if the row corresponding to the left alias does not pass through filter
newObjNull = newObjNull ||
((BooleanWritable) (intObj.getObjs()[left].get(
joinValues.get(order[left]).size()))).get();

Iterator<boolean[]> nullsIter = inputNulls.iterator();
while (nullsIter.hasNext()) {
boolean[] oldNulls = nullsIter.next();
@@ -593,10 +681,14 @@ private ArrayList<boolean[]> joinObjectsRightOuterJoin(
}
}

// if the row does not pass through filter, all old Objects are null
if (((BooleanWritable)newObj.get(newObj.size()-1)).get()) {
allOldObjsNull = true;
}
nullsIter = inputNulls.iterator();
while (nullsIter.hasNext()) {
boolean[] oldNulls = nullsIter.next();
- boolean oldObjNull = oldNulls[left];
+ boolean oldObjNull = oldNulls[left] || allOldObjsNull;

if (!oldObjNull) {
boolean[] newNulls = new boolean[intObj.getCurSize()];
@@ -652,13 +744,21 @@ private ArrayList<boolean[]> joinObjectsFullOuterJoin(
break;
}
}

// if the row does not pass through filter, all old Objects are null
if (((BooleanWritable)newObj.get(newObj.size()-1)).get()) {
allOldObjsNull = true;
}
boolean rhsPreserved = false;

nullsIter = inputNulls.iterator();
while (nullsIter.hasNext()) {
boolean[] oldNulls = nullsIter.next();
- boolean oldObjNull = oldNulls[left];

+ // old obj is null even if the row corresponding to the left alias
+ // does not pass through filter
+ boolean oldObjNull = oldNulls[left] || ((BooleanWritable)
+ (intObj.getObjs()[left].get(joinValues.get(order[left]).size()))).get()
+ || allOldObjsNull;
if (!oldObjNull) {
boolean[] newNulls = new boolean[intObj.getCurSize()];
copyOldArray(oldNulls, newNulls);
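Taken together, the three join routines consult the stored flag as follows (a condensed, hypothetical restatement; flagIdx and the two boolean names are illustrative, not from the patch):

// the flag sits immediately after the value columns of its alias
int flagIdx = joinValues.get(order[left]).size();
boolean leftRowFiltered =
    ((BooleanWritable) intObj.getObjs()[left].get(flagIdx)).get();
boolean newRowFiltered =
    ((BooleanWritable) newObj.get(newObj.size() - 1)).get();
// left outer:  null-pad the new (right-hand) values when the left row
//              failed its filter (newObjNull |= leftRowFiltered)
// right outer: treat all old (left-hand) objects as null when the new
//              row failed its filter (allOldObjsNull |= newRowFiltered)
// full outer:  apply both rules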
@@ -900,6 +1000,25 @@ protected void reportProgress() {
}
}

/**
* Returns true if the row does not pass through filters.
*/
protected static Boolean isFiltered(Object row,
List<ExprNodeEvaluator> filters, List<ObjectInspector> ois)
throws HiveException {
// apply join filters on the row.
Boolean ret = false;
for (int j = 0; j < filters.size(); j++) {
Object condition = filters.get(j).evaluate(row);
ret = (Boolean) ((PrimitiveObjectInspector)
ois.get(j)).getPrimitiveJavaObject(condition);
if (ret == null || !ret) {
return true;
}
}
return false;
}
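Note the three-valued logic: a filter that evaluates to NULL rejects the row exactly like false, matching SQL ON-clause semantics; only an explicit true passes. A self-contained restatement with plain Java types (no Hive inspectors), for clarity:

import java.util.List;
import java.util.function.Function;

class IsFilteredSketch {
  // Mirrors the isFiltered() contract above: a row counts as filtered
  // when any predicate yields null or false.
  static <T> boolean isFiltered(T row, List<Function<T, Boolean>> filters) {
    for (Function<T, Boolean> filter : filters) {
      Boolean pass = filter.apply(row);
      if (pass == null || !pass) {
        return true;
      }
    }
    return false;
  }
}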

/**
* All done.
*
3 changes: 2 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -79,7 +79,8 @@ public void processOp(Object row, int tag) throws HiveException {
}

ArrayList<Object> nr = computeValues(row, joinValues.get(alias),
- joinValuesObjectInspectors.get(alias));
+ joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
+ joinFilterObjectInspectors.get(alias), noOuterJoin);

if (handleSkewJoin) {
skewJoinKeyContext.handleSkew(tag);
(changed file: name not shown in this view)
@@ -174,10 +174,11 @@ public void processOp(Object row, int tag) throws HiveException {
}

// compute keys and values as StandardObjects
- ArrayList<Object> key = computeValues(row, joinKeys.get(alias),
+ ArrayList<Object> key = computeKeys(row, joinKeys.get(alias),
joinKeysObjectInspectors.get(alias));
ArrayList<Object> value = computeValues(row, joinValues.get(alias),
- joinValuesObjectInspectors.get(alias));
+ joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
+ joinFilterObjectInspectors.get(alias), noOuterJoin);

// does this source need to be stored in the hash map
if (tag != posBigTable) {
(changed file: name not shown in this view)
@@ -208,10 +208,11 @@ public void processOp(Object row, int tag) throws HiveException {

byte alias = (byte) tag;
// compute keys and values as StandardObjects
- ArrayList<Object> key = computeValues(row, joinKeys.get(alias),
+ ArrayList<Object> key = computeKeys(row, joinKeys.get(alias),
joinKeysObjectInspectors.get(alias));
ArrayList<Object> value = computeValues(row, joinValues.get(alias),
- joinValuesObjectInspectors.get(alias));
+ joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
+ joinFilterObjectInspectors.get(alias), noOuterJoin);

//have we reached a new key group?
boolean nextKeyGroup = processKey(alias, key);
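Note that both map-side operators switch key computation from computeValues to the new computeKeys. The separation appears deliberate: the filtered flag is appended only to values, and keys presumably must stay flag-free so that equal keys from different tables still compare equal. The corrected call, as wired above:

// keys stay flag-free; only values carry the trailing filtered boolean
ArrayList<Object> key = computeKeys(row, joinKeys.get(alias),
    joinKeysObjectInspectors.get(alias));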
(changed file: name not shown in this view)
@@ -206,7 +206,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
needed_columns.add(position);
}
}

desc.setVirtualCols(newVirtualCols);
scanOp.setNeededColumnIDs(needed_columns);
return null;
@@ -600,6 +600,20 @@ private static void pruneJoinOperator(NodeProcessorCtx ctx,

List<String> childColLists = cppCtx.genColLists(op);

//add the columns in join filters
Set<Map.Entry<Byte, List<ExprNodeDesc>>> filters =
conf.getFilters().entrySet();
Iterator<Map.Entry<Byte, List<ExprNodeDesc>>> iter = filters.iterator();
while (iter.hasNext()) {
Map.Entry<Byte, List<ExprNodeDesc>> entry = iter.next();
Byte tag = entry.getKey();
for (ExprNodeDesc desc : entry.getValue()) {
List<String> cols = prunedColLists.get(tag);
cols = Utilities.mergeUniqElems(cols, desc.getCols());
prunedColLists.put(tag, cols);
}
}

RowResolver joinRR = cppCtx.getOpToParseCtxMap().get(op).getRR();
RowResolver newJoinRR = new RowResolver();
ArrayList<String> outputCols = new ArrayList<String>();
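Without this pruner change, a column referenced only in a join filter (for example b.value in ON (a.key = b.key AND b.value > 20)) could be pruned away before the join ran. A hypothetical miniature of the merge, using plain collections in place of Hive's descriptors:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FilterColumnMergeSketch {
  // Mirrors the loop above: fold each tag's filter columns into its pruned
  // column list, keeping entries unique (as Utilities.mergeUniqElems does).
  static void merge(Map<Byte, List<String>> prunedColLists,
      Map<Byte, List<String>> filterCols) {
    for (Map.Entry<Byte, List<String>> entry : filterCols.entrySet()) {
      List<String> cols = prunedColLists.get(entry.getKey());
      if (cols == null) {
        cols = new ArrayList<String>();
      }
      for (String col : entry.getValue()) {
        if (!cols.contains(col)) {
          cols.add(col); // add only columns not already present
        }
      }
      prunedColLists.put(entry.getKey(), cols);
    }
  }
}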
(remaining changed files not shown in this view)
