Skip to content

Commit

Permalink
JobVerificationEngine DAG construction (no driver assigned partition …
Browse files Browse the repository at this point in the history
…IDs) (#131)

* Support for multiple branched CaseWhen

* Interval (#116)

* add date_add, interval sql still running into issues

* Add Interval SQL support

* uncomment out the other tests

* resolve comments

* change interval equality

Co-authored-by: Eric Feng <[email protected]>

* Remove partition ID argument from enclaves

* Fix comments

* updates

* Modifications to integrate crumb, log-mac, and all-outputs_mac, wip

* Store log mac after each output buffer, add all-outputs-mac to each encryptedblocks wip

* Add all_outputs_mac to all EncryptedBlocks once all log_macs have been generated

* Almost builds

* cpp builds

* Use ubyte for all_outputs_mac

* use Mac for all_outputs_mac

* Hopefully this works for flatbuffers all_outputs_mac mutation, cpp builds

* Scala builds now too, running into error with union

* Stuff builds, error with all outputs mac serialization. this commit uses all_outputs_mac as Mac table

* Fixed bug, basic encryption / show works

* All single partition tests pass, multiple partiton passes until tpch-9

* All tests pass except tpch-9 and skew join

* comment tpch back in

* Check same number of ecalls per partition - exception for scanCollectLastPrimary(?)

* First attempt at constructing executed DAG

* Fix typos

* Rework graph

* Add log macs to graph nodes

* Construct expected DAG and refactor JobNode.
Refactor construction of executed DAG.

* Implement 'paths to sink' for a DAG

* add crumb for last ecall

* Fix NULL handling for aggregation (#130)

* Modify COUNT and SUM to correctly handle NULL values

* Change average to support NULL values

* Fix

* Changing operator matching from logical to physical (#129)

* WIP

* Fix

* Unapply change

* Aggregation rewrite (#132)

* Merge new aggregate

* Uncomment log_mac_lst clear

* Clean up comments

* Clean up comments in other files

* Remove print statements from unit tests

Co-authored-by: Andrew Law <[email protected]>
Co-authored-by: Eric Feng <[email protected]>
Co-authored-by: Eric Feng <[email protected]>
Co-authored-by: Chester Leung <[email protected]>
Co-authored-by: Wenting Zheng <[email protected]>
  • Loading branch information
6 people authored Feb 9, 2021
1 parent 9c9dae1 commit a95f2c7
Show file tree
Hide file tree
Showing 24 changed files with 1,552 additions and 1,107 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ concurrentRestrictions in Global := Seq(
fork in Test := true
fork in run := true

testOptions in Test += Tests.Argument("-oF")
javaOptions in Test ++= Seq("-Xmx2048m", "-XX:ReservedCodeCacheSize=384m")
javaOptions in run ++= Seq(
"-Xmx2048m", "-XX:ReservedCodeCacheSize=384m", "-Dspark.master=local[1]")
Expand Down Expand Up @@ -217,7 +218,7 @@ buildFlatbuffersTask := {
if (gen.isEmpty || fbsLastMod > gen.map(_.lastModified).max) {
for (fbs <- flatbuffers) {
streams.value.log.info(s"Generating flatbuffers for ${fbs}")
if (Seq(flatc.getPath, "--cpp", "-o", flatbuffersGenCppDir.value.getPath, fbs.getPath).! != 0
if (Seq(flatc.getPath, "--cpp", "--gen-mutable", "-o", flatbuffersGenCppDir.value.getPath, fbs.getPath).! != 0
|| Seq(flatc.getPath, "--java", "-o", javaOutDir.getPath, fbs.getPath).! != 0) {
sys.error("Flatbuffers build failed.")
}
Expand Down
155 changes: 34 additions & 121 deletions src/enclave/App/App.cpp

Large diffs are not rendered by default.

32 changes: 14 additions & 18 deletions src/enclave/App/SGXEnclave.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ extern "C" {
JNIEnv *, jobject, jlong);

JNIEXPORT jbyteArray JNICALL Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_Project(
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jint);
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray);

JNIEXPORT jbyteArray JNICALL Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_Filter(
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jint);
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray);

JNIEXPORT jbyteArray JNICALL Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_Encrypt(
JNIEnv *, jobject, jlong, jbyteArray);
Expand All @@ -24,51 +24,47 @@ extern "C" {
JNIEnv *, jobject, jlong, jbyteArray);

JNIEXPORT jbyteArray JNICALL Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_Sample(
JNIEnv *, jobject, jlong, jbyteArray, jint);
JNIEnv *, jobject, jlong, jbyteArray);

JNIEXPORT jbyteArray JNICALL
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_FindRangeBounds(
JNIEnv *, jobject, jlong, jbyteArray, jint, jbyteArray, jint);
JNIEnv *, jobject, jlong, jbyteArray, jint, jbyteArray);

JNIEXPORT jobjectArray JNICALL
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_PartitionForSort(
JNIEnv *, jobject, jlong, jbyteArray, jint, jbyteArray, jbyteArray, jint);
JNIEnv *, jobject, jlong, jbyteArray, jint, jbyteArray, jbyteArray);

JNIEXPORT jbyteArray JNICALL Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_ExternalSort(
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jint);
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray);

JNIEXPORT jbyteArray JNICALL
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_ScanCollectLastPrimary(
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jint);
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray);

JNIEXPORT jbyteArray JNICALL
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_NonObliviousSortMergeJoin(
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray, jint);
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray);

JNIEXPORT jobject JNICALL
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_NonObliviousAggregateStep1(
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jint);

JNIEXPORT jbyteArray JNICALL
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_NonObliviousAggregateStep2(
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jint);
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_NonObliviousAggregate(
JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jboolean);

JNIEXPORT jbyteArray JNICALL
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_CountRowsPerPartition(
JNIEnv *, jobject, jlong, jbyteArray, jint);
JNIEnv *, jobject, jlong, jbyteArray);


JNIEXPORT jbyteArray JNICALL
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_ComputeNumRowsPerPartition(
JNIEnv *, jobject, jlong, jint, jbyteArray, jint);
JNIEnv *, jobject, jlong, jint, jbyteArray);

JNIEXPORT jbyteArray JNICALL
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_LocalLimit(
JNIEnv *, jobject, jlong, jint, jbyteArray, jint);
JNIEnv *, jobject, jlong, jint, jbyteArray);

JNIEXPORT jbyteArray JNICALL
Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_LimitReturnRows(
JNIEnv *, jobject, jlong, jlong, jbyteArray, jbyteArray, jint);
JNIEnv *, jobject, jlong, jlong, jbyteArray, jbyteArray);

JNIEXPORT jbyteArray JNICALL Java_edu_berkeley_cs_rise_opaque_execution_SGXEnclave_GenerateReport(
JNIEnv *, jobject, jlong);
Expand Down
121 changes: 16 additions & 105 deletions src/enclave/Enclave/Aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,127 +5,38 @@
#include "FlatbuffersWriters.h"
#include "common.h"

void non_oblivious_aggregate_step1(
void non_oblivious_aggregate(
uint8_t *agg_op, size_t agg_op_length,
uint8_t *input_rows, size_t input_rows_length,
uint8_t **first_row, size_t *first_row_length,
uint8_t **last_group, size_t *last_group_length,
uint8_t **last_row, size_t *last_row_length) {
uint8_t **output_rows, size_t *output_rows_length,
bool is_partial) {

FlatbuffersAggOpEvaluator agg_op_eval(agg_op, agg_op_length);
RowReader r(BufferRefView<tuix::EncryptedBlocks>(input_rows, input_rows_length));
RowWriter first_row_writer;
RowWriter last_group_writer;
RowWriter last_row_writer;
RowWriter w;

FlatbuffersTemporaryRow prev, cur;
size_t count = 0;

while (r.has_next()) {
prev.set(cur.get());
cur.set(r.next());

if (prev.get() == nullptr) {
EnclaveContext::getInstance().set_curr_row_writer(std::string("first_row"));
first_row_writer.append(cur.get());
}

if (!r.has_next()) {
EnclaveContext::getInstance().set_curr_row_writer(std::string("last_row"));
last_row_writer.append(cur.get());
}


if (prev.get() != nullptr && !agg_op_eval.is_same_group(prev.get(), cur.get())) {
w.append(agg_op_eval.evaluate());
agg_op_eval.reset_group();
}
agg_op_eval.aggregate(cur.get());
count += 1;
}

EnclaveContext::getInstance().set_curr_row_writer(std::string("last_group"));
last_group_writer.append(agg_op_eval.get_partial_agg());

std::string ecall = std::string("nonObliviousAggregateStep1");

EnclaveContext::getInstance().set_curr_row_writer(std::string("first_row"));
first_row_writer.output_buffer(first_row, first_row_length, ecall);

EnclaveContext::getInstance().set_curr_row_writer(std::string("last_group"));
last_group_writer.output_buffer(last_group, last_group_length, ecall);

EnclaveContext::getInstance().set_curr_row_writer(std::string("last_row"));
last_row_writer.output_buffer(last_row, last_row_length, ecall);
}

void non_oblivious_aggregate_step2(
uint8_t *agg_op, size_t agg_op_length,
uint8_t *input_rows, size_t input_rows_length,
uint8_t *next_partition_first_row, size_t next_partition_first_row_length,
uint8_t *prev_partition_last_group, size_t prev_partition_last_group_length,
uint8_t *prev_partition_last_row, size_t prev_partition_last_row_length,
uint8_t **output_rows, size_t *output_rows_length) {

FlatbuffersAggOpEvaluator agg_op_eval(agg_op, agg_op_length);
RowReader r(BufferRefView<tuix::EncryptedBlocks>(input_rows, input_rows_length));
RowReader next_partition_first_row_reader(
BufferRefView<tuix::EncryptedBlocks>(
next_partition_first_row, next_partition_first_row_length));
RowReader prev_partition_last_group_reader(
BufferRefView<tuix::EncryptedBlocks>(
prev_partition_last_group, prev_partition_last_group_length));
RowReader prev_partition_last_row_reader(
BufferRefView<tuix::EncryptedBlocks>(
prev_partition_last_row, prev_partition_last_row_length));
RowWriter w;

if (next_partition_first_row_reader.num_rows() > 1) {
throw std::runtime_error(
std::string("Incorrect number of starting rows from next partition passed: expected 0 or 1, got ")
+ std::to_string(next_partition_first_row_reader.num_rows()));
}
if (prev_partition_last_group_reader.num_rows() > 1) {
throw std::runtime_error(
std::string("Incorrect number of ending groups from prev partition passed: expected 0 or 1, got ")
+ std::to_string(prev_partition_last_group_reader.num_rows()));
}
if (prev_partition_last_row_reader.num_rows() > 1) {
throw std::runtime_error(
std::string("Incorrect number of ending rows from prev partition passed: expected 0 or 1, got ")
+ std::to_string(prev_partition_last_row_reader.num_rows()));
}

const tuix::Row *next_partition_first_row_ptr =
next_partition_first_row_reader.has_next() ? next_partition_first_row_reader.next() : nullptr;
agg_op_eval.set(prev_partition_last_group_reader.has_next() ?
prev_partition_last_group_reader.next() : nullptr);
const tuix::Row *prev_partition_last_row_ptr =
prev_partition_last_row_reader.has_next() ? prev_partition_last_row_reader.next() : nullptr;

FlatbuffersTemporaryRow prev, cur(prev_partition_last_row_ptr), next;
bool stop = false;
if (r.has_next()) {
next.set(r.next());
} else {
stop = true;
// Skip outputting the final row if the number of input rows is 0 AND
// 1. It's a grouping aggregation, OR
// 2. It's a global aggregation, the mode is final
if (!(count == 0 && (agg_op_eval.get_num_grouping_keys() > 0 || (agg_op_eval.get_num_grouping_keys() == 0 && !is_partial)))) {
w.append(agg_op_eval.evaluate());
}
while (!stop) {
// Populate prev, cur, next to enable lookbehind and lookahead
prev.set(cur.get());
cur.set(next.get());
if (r.has_next()) {
next.set(r.next());
} else {
next.set(next_partition_first_row_ptr);
stop = true;
}

if (prev.get() != nullptr && !agg_op_eval.is_same_group(prev.get(), cur.get())) {
agg_op_eval.reset_group();
}
agg_op_eval.aggregate(cur.get());

// Output the current aggregate if it is the last aggregate for its run
if (next.get() == nullptr || !agg_op_eval.is_same_group(cur.get(), next.get())) {
w.append(agg_op_eval.evaluate());
}
}

w.output_buffer(output_rows, output_rows_length, std::string("nonObliviousAggregateStep2"));
w.output_buffer(output_rows, output_rows_length, std::string("nonObliviousAggregate"));
}

15 changes: 3 additions & 12 deletions src/enclave/Enclave/Aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,10 @@
#ifndef AGGREGATE_H
#define AGGREGATE_H

void non_oblivious_aggregate_step1(
void non_oblivious_aggregate(
uint8_t *agg_op, size_t agg_op_length,
uint8_t *input_rows, size_t input_rows_length,
uint8_t **first_row, size_t *first_row_length,
uint8_t **last_group, size_t *last_group_length,
uint8_t **last_row, size_t *last_row_length);

void non_oblivious_aggregate_step2(
uint8_t *agg_op, size_t agg_op_length,
uint8_t *input_rows, size_t input_rows_length,
uint8_t *next_partition_first_row, size_t next_partition_first_row_length,
uint8_t *prev_partition_last_group, size_t prev_partition_last_group_length,
uint8_t *prev_partition_last_row, size_t prev_partition_last_row_length,
uint8_t **output_rows, size_t *output_rows_length);
uint8_t **output_rows, size_t *output_rows_length,
bool is_partial);

#endif // AGGREGATE_H
Loading

0 comments on commit a95f2c7

Please sign in to comment.