package com.yahoo.labs.samoa.examples; import com.yahoo.labs.samoa.core.ContentEvent; import com.yahoo.labs.samoa.core.Processor; import com.yahoo.labs.samoa.core.TopologyStarter; import com.yahoo.labs.samoa.topology.Stream; import java.util.Random; public class HelloWorldSourceProcessor implements Processor { /** * */ private static final long serialVersionUID = 6212296305865604747L; private Random rnd; private Stream helloWorldStream; private final long maxInst; public HelloWorldSourceProcessor(long maxInst) { this.maxInst = maxInst; } @Override public boolean process(ContentEvent event) { //do nothing, API will be refined further return false; } @Override public void onCreate(int id) { rnd = new Random(id); } @Override public Processor newProcessor(Processor p) { HelloWorldSourceProcessor hwsp = (HelloWorldSourceProcessor) p; return new HelloWorldSourceProcessor(hwsp.maxInst); } public void setHelloWorldStream(Stream hwStream) { this.helloWorldStream = hwStream; } public void sendInstance() { int count = 0; while (count < maxInst) { this.helloWorldStream.put(new HelloWorldContentEvent(rnd.nextInt(), false)); count++; } this.helloWorldStream.put(new HelloWorldContentEvent(-1, true)); } public static class HelloWorldTopologyStarter implements TopologyStarter { /** * */ private static final long serialVersionUID = 5445314667316145715L; private final HelloWorldSourceProcessor hwsp; public HelloWorldTopologyStarter(HelloWorldSourceProcessor hwsp) { this.hwsp = hwsp; } @Override public void start() { this.hwsp.sendInstance(); } } }