Skip to content

Commit

Permalink
Created test case for parallel execution (Issue #10)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Jun 2, 2015
1 parent 4872e47 commit 25eb0f8
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ protected void begin() {
if (cfgMaxRetries != null)
maxRetries = cfgMaxRetries;

final int dumpEveryMs = (Integer) context.getVariable("dumpEveryMs");
if (dumpEveryMs > 0) {
final Integer dumpEveryMs = (Integer) context.getVariable("dumpEveryMs");
if (dumpEveryMs != null && dumpEveryMs > 0) {
dumpTask = new TimerTask() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.orientechnologies.orient.etl;

import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.tinkerpop.blueprints.impls.orient.OrientGraph;
import junit.framework.TestCase;
Expand Down Expand Up @@ -53,9 +54,15 @@ protected List<ODocument> getResult() {
return ((TestLoader) proc.getLoader()).loadedRecords;
}

protected void process(String cfgJson) {
protected void process(final String cfgJson) {
ODocument cfg = new ODocument().fromJSON(cfgJson, "noMap");
proc.parse(cfg, null);
proc.execute();
}

protected void process(final String cfgJson, final OBasicCommandContext iContext) {
ODocument cfg = new ODocument().fromJSON(cfgJson, "noMap");
proc.parse(cfg, iContext);
proc.execute();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/

package com.orientechnologies.orient.etl;

import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.etl.extractor.OAbstractExtractor;

import java.io.Reader;
import java.util.Random;

/**
* ETL Mock loader to check the result in tests.
*
* @author Luca Garulli on 27/11/14.
*/
public class RandomExtractor extends OAbstractExtractor {
private long current = 0;
private int fields;
private long items;
private int delay = 0;

@Override
public void configure(OETLProcessor iProcessor, ODocument iConfiguration, OBasicCommandContext iContext) {
super.configure(iProcessor, iConfiguration, iContext);

if (iConfiguration.containsField("items"))
items = ((Number) iConfiguration.field("items")).longValue();
if (iConfiguration.containsField("fields"))
fields = iConfiguration.field("fields");
if (iConfiguration.containsField("delay"))
delay = iConfiguration.field("delay");
}

@Override
public void extract(final Reader iReader) {
}

@Override
public String getUnit() {
return "row";
}

@Override
public boolean hasNext() {
return current < items;
}

@Override
public OExtractedItem next() {
final ODocument doc = new ODocument();

for (int i = 0; i < fields; ++i) {
doc.field("field" + i, "value_" + new Random().nextInt(30));
}

if (delay > 0)
// SIMULATE A SLOW DOWN
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
}

return new OExtractedItem(current++, doc);
}

@Override
public String getName() {
return "random";
}
}
11 changes: 9 additions & 2 deletions src/test/java/com/orientechnologies/orient/etl/TestLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,19 @@ public TestLoader() {

@Override
public void load(Object input, OCommandContext context) {
loadedRecords.add((ODocument) input);
synchronized (loadedRecords) {
loadedRecords.add((ODocument) input);
}
}

@Override
public long getProgress() {
return loadedRecords.size();
}

@Override
public String getUnit() {
return null;
return "document";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
*
* * Copyright 2010-2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package com.orientechnologies.orient.etl.extractor;

import org.junit.Ignore;
import org.junit.Test;

import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.etl.ETLBaseTest;
import com.orientechnologies.orient.etl.RandomExtractor;

/**
* Tests ETL JSON Extractor.
*
* @author Luca Garulli
*/
public class OJsonRandomExtractorTest extends ETLBaseTest {

private final static int TOTAL = 1000000;

@Ignore
public void testNonParallel() {
proc.getFactory().registerExtractor(RandomExtractor.class);

process("{extractor : { random: {items: " + TOTAL + ", fields: 10} }, "
+ "loader: { orientdb: { dbURL: 'memory:ETLBaseTest', dbType:'graph', class: 'Person', useLightweightEdges:false, "
+ "classes: [{name: 'Person', extends: 'V'}] } } }");

assertEquals(TOTAL, graph.countVertices("Person"));

int i = 0;
for (ODocument doc : graph.getRawGraph().browseClass("Person")) {
assertEquals(10, doc.fields());
i++;
}
}

@Test
public void testParallel() {
proc.getFactory().registerExtractor(RandomExtractor.class);

process("{extractor : { random: {items: " + TOTAL + ", fields: 10, delay: 0} }, "
+ "loader: { orientdb: { dbURL: 'memory:ETLBaseTest', dbType:'graph', class: 'Person', useLightweightEdges:false, "
+ "classes: [{name: 'Person', extends: 'V', clusters: 8 }] } } }",
(OBasicCommandContext) new OBasicCommandContext().setVariable("parallel", Boolean.TRUE).setVariable("dumpEveryMs", 1000));

assertEquals(TOTAL, graph.countVertices("Person"));

int i = 0;
for (ODocument doc : graph.getRawGraph().browseClass("Person")) {
assertEquals(10, doc.fields());
i++;
}
}
}

0 comments on commit 25eb0f8

Please sign in to comment.