package com.yahoo.labs.samoa.examples; import com.github.javacliparser.Configurable; import com.github.javacliparser.IntOption; import com.github.javacliparser.StringOption; import com.yahoo.labs.samoa.core.TopologyStarter; import com.yahoo.labs.samoa.examples.HelloWorldSourceProcessor.HelloWorldTopologyStarter; import com.yahoo.labs.samoa.tasks.Task; import com.yahoo.labs.samoa.topology.ComponentFactory; import com.yahoo.labs.samoa.topology.Stream; import com.yahoo.labs.samoa.topology.Topology; import com.yahoo.labs.samoa.topology.TopologyBuilder; import java.text.SimpleDateFormat; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HelloWorldTask implements Task, Configurable { /** * */ private static final long serialVersionUID = -5134935141154021352L; private static Logger logger = LoggerFactory.getLogger(HelloWorldTask.class); private TopologyBuilder builder; private Topology helloWorldTopology; private HelloWorldSourceProcessor sourceProcessor; private HelloWorldTopologyStarter starter; private HelloWorldDestinationProcessor destProcessor; public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', "Maximum number of instances to test/train on (-1 = no limit).", 1000000, -1, Integer.MAX_VALUE); public StringOption evaluationNameOption = new StringOption("evalutionName", 'n', "Identifier of the evaluation", "HelloWorldTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); public IntOption helloWorldParallelismOption = new IntOption( "parallelismOption", 'p', "The number of destination PI", 1, 1, Integer.MAX_VALUE); @Override public void init() { if (builder == null) { builder = new TopologyBuilder(); logger.debug("Sucessfully instantiating TopologyBuilder"); builder.initTopology(evaluationNameOption.getValue()); logger.debug("Sucessfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); } //create sourceProcesor sourceProcessor = new HelloWorldSourceProcessor(instanceLimitOption.getValue()); starter = new HelloWorldTopologyStarter(sourceProcessor); builder.addEntranceProcessor(sourceProcessor, starter); //create Stream Stream stream = builder.createStream(sourceProcessor); sourceProcessor.setHelloWorldStream(stream); //create Destination Processor destProcessor = new HelloWorldDestinationProcessor(); builder.addProcessor(destProcessor, helloWorldParallelismOption.getValue()); builder.connectInputShuffleStream(stream, destProcessor); //build the topology helloWorldTopology = builder.build(); logger.debug("Sucessfully building the topology"); } @Override public Topology getTopology() { return helloWorldTopology; } @Override public TopologyStarter getTopologyStarter() { return starter; } @Override public void setFactory(ComponentFactory factory) { // for now use this code first, will be removed after dynamic binding is done builder = new TopologyBuilder(factory); logger.debug("Sucessfully instantiating TopologyBuilder"); builder.initTopology(evaluationNameOption.getValue()); logger.debug("Sucessfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); } }