This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Merge pull request #482 from maryannxue/master
Issue #20 - hashjoin implementation improvements
jtaylor-sfdc committed Oct 17, 2013
2 parents 1b506d2 + e21d1d6 commit 6f14383
Showing 204 changed files with 11,190 additions and 2,505 deletions.
47 changes: 47 additions & 0 deletions bin/csv-bulk-loader.sh
@@ -0,0 +1,47 @@
#!/bin/bash
############################################################################
# Copyright (c) 2013, Salesforce.com, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# Neither the name of Salesforce.com nor the names of its contributors may
# be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
############################################################################

# Phoenix client jar. To generate new jars: $ mvn package -DskipTests

# Command-line options
# -i      CSV data file path in HDFS (mandatory)
# -s      Phoenix schema name (mandatory if the table was created with a non-default schema name)
# -t      Phoenix table name (mandatory)
# -sql    Phoenix CREATE TABLE DDL file path (mandatory)
# -zk     ZooKeeper IP:<port> (mandatory)
# -o      Output directory path in HDFS (optional)
# -idx    Phoenix index table name (optional; index support is yet to be added)
# -error  Ignore errors while reading rows from the CSV? (1 = yes | 0 = no; defaults to 1) (optional)
# -help   Print all options (optional)

current_dir=$(cd "$(dirname "$0")"; pwd)
phoenix_jar_path="$current_dir/../target"
phoenix_client_jar=$(find "$phoenix_jar_path" -name 'phoenix-*-client.jar')

"$HADOOP_HOME"/bin/hadoop -cp "$phoenix_client_jar" com.salesforce.phoenix.map.reduce.CSVBulkLoader "$@"
10 changes: 4 additions & 6 deletions bin/hbase-site.xml
@@ -22,10 +22,8 @@
*/
-->
<configuration>
<!--
<property>
<name>phoenix.mutate.upsertBatchSize</name>
<value>50000</value>
</property>
-->
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
</configuration>
21 changes: 21 additions & 0 deletions bin/readme.txt
@@ -30,3 +30,24 @@ Usage: performance <zookeeper> <row count>

Example: Generates and upserts 1000000 rows and times basic queries on this data
./performance.sh localhost 1000000

csv-bulk-loader.sh
==================

Usage: csv-bulk-loader <option value>

<option> <value>
-i CSV data file path in hdfs (mandatory)
-s Phoenix schema name (mandatory if not default)
-t Phoenix table name (mandatory)
-sql Phoenix create table sql file path (mandatory)
-zk Zookeeper IP:<port> (mandatory)
-o Output directory path in hdfs (optional)
-idx Phoenix index table name (optional, not yet supported)
-error Ignore errors while reading rows from CSV? (1 = yes | 0 = no; defaults to 1) (optional)
-help Print all options (optional)

Example: Set the HADOOP_HOME variable and run as:
./csv-bulk-loader.sh -i data.csv -s Test -t Phoenix -sql ~/Documents/createTable.sql -zk localhost:2181
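
For reference, the file passed with -sql holds the DDL that creates the target table before the load runs. A minimal sketch (the Test schema, Phoenix table, and column names below are illustrative, chosen to match the example invocation above):

    -- createTable.sql (hypothetical contents)
    CREATE TABLE IF NOT EXISTS Test.Phoenix (
        id   BIGINT NOT NULL PRIMARY KEY,
        name VARCHAR,
        val  DECIMAL
    );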


5 changes: 3 additions & 2 deletions pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.salesforce</groupId>
<artifactId>phoenix</artifactId>
<version>2.2.0-SNAPSHOT</version>
<version>2.1.0</version>
<name>Phoenix</name>
<description>A SQL layer over HBase</description>

@@ -80,6 +80,7 @@
<commons-logging.version>1.1.1</commons-logging.version>
<sqlline.version>1.1.2</sqlline.version>
<guava.version>12.0.1</guava.version>
<jackson.version>1.8.8</jackson.version>

<!-- Test Dependencies -->
<mockito-all.version>1.8.5</mockito-all.version>
@@ -313,7 +314,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.13</version>
<configuration>
<argLine>-enableassertions -Xmx2500m -Djava.security.egd=file:/dev/./urandom</argLine>
<argLine>-enableassertions -Xmx2750m -Djava.security.egd=file:/dev/./urandom</argLine>
<redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
</configuration>
</plugin>
25 changes: 14 additions & 11 deletions src/build/client.xml
@@ -19,6 +19,7 @@
<include>commons-io:commons-io</include>
<include>commons-lang:commons-lang</include>
<include>commons-logging:commons-logging</include>
<include>commons-cli:commons-cli</include>
<include>com.google.guava:guava</include>
<include>org.apache.hadoop:hadoop*</include>
<include>com.google.protobuf:protobuf-java</include>
@@ -31,19 +32,13 @@
<include>org.antlr:antlr*</include>
<include>jline:jline</include>
<include>sqlline:sqlline</include>
<include>org.codehaus.jackson:jackson-mapper-asl</include>
<include>org.codehaus.jackson:jackson-core-asl</include>

<!-- <include>*:jar:*</include> -->
</includes>
</dependencySet>
<!-- Separate dependency set to just pull in the jackson stuff since it's test
scoped and we only include 'runtime' scoped (which includes compile) dependencies -->
<dependencySet>
<scope>test</scope>
<outputDirectory>/</outputDirectory>
<includes>
<include>org.codehaus.jackson:jackson-core-asl</include>
<include>org.codehaus.jackson:jackson-mapper-asl</include>
</includes>
</dependencySet>

<dependencySet>
<outputDirectory>/</outputDirectory>
<unpack>true</unpack>
@@ -65,7 +60,7 @@
<outputDirectory>/</outputDirectory>
</fileSet>
<fileSet>
<!--Get misc project files -->
<!--Get misc project files -->
<directory>${project.basedir}</directory>
<outputDirectory>/</outputDirectory>
<includes>
@@ -77,5 +72,13 @@
<exclude>build.txt</exclude>
</excludes>
</fileSet>
<fileSet>
<!--Get map-red-config properties files -->
<directory>${project.basedir}/src/main/config</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>csv-bulk-load-config.properties</include>
</includes>
</fileSet>
</fileSets>
</assembly>
71 changes: 50 additions & 21 deletions src/main/antlr3/PhoenixSQL.g
@@ -91,10 +91,12 @@
INDEX='index';
INCLUDE='include';
WITHIN='within';
ENABLE='enable';
DISABLE='disable';
SET='set';
CAST='cast';
USABLE='usable';
UNUSABLE='unusable';
DISABLE='disable';
REBUILD='rebuild';
}


@@ -367,7 +369,7 @@ create_table_node returns [CreateTableStatement ret]
: CREATE (tt=VIEW | TABLE) (IF NOT ex=EXISTS)? t=from_table_name
(LPAREN cdefs=column_defs (pk=pk_constraint)? RPAREN)
(p=fam_properties)?
(SPLIT ON v=values)?
(SPLIT ON v=list_expressions)?
{ret = factory.createTable(t, p, cdefs, pk, v, tt!=null ? PTableType.VIEW : PTableType.USER, ex!=null, getBindCount()); }
;
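
With SPLIT ON now routed through list_expressions instead of values, split points read as a parenthesized list of literals or bind values. A sketch of the accepted form (table and split keys illustrative):

    -- Pre-split the table into regions at the given row-key boundaries.
    CREATE TABLE t (k VARCHAR PRIMARY KEY, v INTEGER)
        SPLIT ON ('a', 'm', 'z');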

@@ -377,7 +379,7 @@ create_index_node returns [CreateIndexStatement ret]
(LPAREN pk=index_pk_constraint RPAREN)
(INCLUDE (LPAREN icrefs=column_names RPAREN))?
(p=fam_properties)?
(SPLIT ON v=values)?
(SPLIT ON v=list_expressions)?
{ret = factory.createIndex(i, factory.namedTable(null,t), pk, icrefs, v, p, ex!=null, getBindCount()); }
;

@@ -446,8 +448,8 @@ drop_index_node returns [DropIndexStatement ret]

// Parse an alter index statement
alter_index_node returns [AlterIndexStatement ret]
: ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name (ENABLE | d=DISABLE)
{ret = factory.alterIndex(factory.namedTable(null,factory.table(t.getSchemaName(),i.getName())), t.getTableName(), ex!=null, d==null ? PIndexState.ENABLE : PIndexState.DISABLE); }
: ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE)
{ret = factory.alterIndex(factory.namedTable(null,factory.table(t.getSchemaName(),i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(s.getText())); }
;
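
The trailing keyword is now handed straight to PIndexState.valueOf(s.getText()), so each of the four index states is reachable from SQL. For example (index and table names illustrative):

    ALTER INDEX my_idx ON my_table USABLE;
    ALTER INDEX IF EXISTS my_idx ON my_table REBUILD;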

// Parse an alter table statement.
@@ -458,7 +460,7 @@
;

prop_name returns [String ret]
: p=identifier {$ret = p; }
: p=identifier {$ret = SchemaUtil.normalizeIdentifier(p); }
;
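
Running prop_name through SchemaUtil.normalizeIdentifier means an unquoted property name in DDL is normalized like any other identifier, so its case should no longer matter. A sketch (assuming the usual upper-casing of unquoted identifiers):

    -- Both statements should set the same SALT_BUCKETS property.
    CREATE TABLE t1 (k VARCHAR PRIMARY KEY) salt_buckets = 4;
    CREATE TABLE t2 (k VARCHAR PRIMARY KEY) SALT_BUCKETS = 4;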

properties returns [Map<String,Object> ret]
@@ -662,7 +664,7 @@ boolean_expr returns [ParseNode ret]
| (BETWEEN r1=expression AND r2=expression {$ret = factory.between(l,r1,r2,n!=null); } )
| ((IN ((r=bind_expression {$ret = factory.inList(Arrays.asList(l,r),n!=null);} )
| (LPAREN r=select_expression RPAREN {$ret = factory.in(l,r,n!=null);} )
| (v=values {List<ParseNode> il = new ArrayList<ParseNode>(v.size() + 1); il.add(l); il.addAll(v); $ret = factory.inList(il,n!=null);})
| (v=list_expressions {List<ParseNode> il = new ArrayList<ParseNode>(v.size() + 1); il.add(l); il.addAll(v); $ret = factory.inList(il,n!=null);})
)))
))
| { $ret = l; } )
@@ -710,7 +712,7 @@ expression_term returns [ParseNode ret]
@init{ParseNode n;boolean isAscending=true;}
: field=identifier oj=OUTER_JOIN? {n = factory.column(field); $ret = oj==null ? n : factory.outer(n); }
| tableName=table_name DOT field=identifier oj=OUTER_JOIN? {n = factory.column(tableName, field); $ret = oj==null ? n : factory.outer(n); }
| field=identifier LPAREN l=expression_list RPAREN wg=(WITHIN GROUP LPAREN ORDER BY l2=expression_list (ASC {isAscending = true;} | DESC {isAscending = false;}) RPAREN)?
| field=identifier LPAREN l=expression_list RPAREN wg=(WITHIN GROUP LPAREN ORDER BY l2=expression_terms (ASC {isAscending = true;} | DESC {isAscending = false;}) RPAREN)?
{
FunctionParseNode f = wg==null ? factory.function(field, l) : factory.function(field,l,l2,isAscending);
contextStack.peek().setAggregate(f.isAggregate());
@@ -731,15 +733,28 @@
contextStack.peek().setAggregate(f.isAggregate());
$ret = f;
}
| e=expression_literal_bind oj=OUTER_JOIN? { n = e; $ret = oj==null ? n : factory.outer(n); }
| e=literal_or_bind_value oj=OUTER_JOIN? { n = e; $ret = oj==null ? n : factory.outer(n); }
| e=case_statement { $ret = e; }
| LPAREN e=expression RPAREN { $ret = e; }
| LPAREN l=expression_terms RPAREN
{
if(l.size() == 1) {
$ret = l.get(0);
}
else {
$ret = factory.rowValueConstructor(l);
}
}
| CAST e=expression AS dt=identifier { $ret = factory.cast(e, dt);}
;
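
The new parenthesized alternative turns an expression list of size greater than one into a rowValueConstructor node rather than a plain grouping, which is what admits tuple comparisons such as the following (table and column names illustrative):

    -- (a, b) parses via expression_terms into a row value constructor;
    -- the right-hand tuples come through list_expressions.
    SELECT * FROM t WHERE (a, b) IN ((1, 2), (3, 4));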

expression_terms returns [List<ParseNode> ret]
@init{ret = new ArrayList<ParseNode>(); }
: v = expression {$ret.add(v);} (COMMA v = expression {$ret.add(v);} )*
: e = expression {$ret.add(e);} (COMMA e = expression {$ret.add(e);} )*
;

expression_list returns [List<ParseNode> ret]
@init{ret = new ArrayList<ParseNode>(); }
: (v = expression {$ret.add(v);})? (COMMA v = expression {$ret.add(v);} )*
;

index_name returns [NamedNode ret]
@@ -759,11 +774,25 @@ from_table_name returns [TableName ret]
;

// The lowest level function, which includes literals, binds, but also parenthesized expressions, functions, and case statements.
expression_literal_bind returns [ParseNode ret]
literal_or_bind_value returns [ParseNode ret]
: e=literal { $ret = e; }
| b=bind_name { $ret = factory.bind(b); }
;

// The lowest level function, which includes literals, binds, but also parenthesized expressions, functions, and case statements.
literal_expression returns [ParseNode ret]
: e=literal_or_bind_value { $ret = e; }
| LPAREN l=literal_expressions RPAREN
{
if(l.size() == 1) {
$ret = l.get(0);
}
else {
$ret = factory.rowValueConstructor(l);
}
}
;

// Get a string, integer, double, date, boolean, or NULL value.
literal returns [LiteralParseNode ret]
: t=STRING_LITERAL { ret = factory.literal(t.getText()); }
@@ -821,9 +850,14 @@ double_literal returns [LiteralParseNode ret]
}
;

values returns [List<ParseNode> ret]
list_expressions returns [List<ParseNode> ret]
@init{ret = new ArrayList<ParseNode>(); }
: LPAREN v = expression_literal_bind {$ret.add(v);} (COMMA v = expression_literal_bind {$ret.add(v);} )* RPAREN
: LPAREN v = literal_expressions RPAREN { $ret = v; }
;

literal_expressions returns [List<ParseNode> ret]
@init{ret = new ArrayList<ParseNode>(); }
: v = literal_expression {$ret.add(v);} (COMMA v = literal_expression {$ret.add(v);} )*
;

// parse a field, if it might be a bind name.
@@ -847,11 +881,6 @@ parseNoReserved returns [String ret]
: n=NAME { $ret = n.getText(); }
;
expression_list returns [List<ParseNode> ret]
@init{$ret = new ArrayList<ParseNode>();}
: (e=expression {ret.add(e);})? ( COMMA e=expression {ret.add(e);} )*
;
case_statement returns [ParseNode ret]
@init{List<ParseNode> w = new ArrayList<ParseNode>(4);}
: CASE e1=expression (WHEN e2=expression THEN t=expression {w.add(t);w.add(factory.equal(e1,e2));})+ (ELSE el=expression {w.add(el);})? END {$ret = factory.caseWhen(w);}
5 changes: 5 additions & 0 deletions src/main/config/csv-bulk-load-config.properties
@@ -0,0 +1,5 @@
mapreduce.map.output.compress=true
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec
io.sort.record.percent=0.2
io.sort.factor=20
mapred.tasktracker.map.tasks.maximum=10
@@ -90,6 +90,7 @@ public void preLogArchive(Path oldPath, Path newPath) throws IOException {
//take a write lock on the index - any pending index updates will complete before we finish
LOG.debug("Taking INDEX_UPDATE writelock");
logArchiveLock.lock();
LOG.debug("Got the INDEX_UPDATE writelock");
}

@Override