diff --git a/pom.xml b/pom.xml index 3e76c19..c4f1ba4 100644 --- a/pom.xml +++ b/pom.xml @@ -11,10 +11,7 @@ true - TODO - TODO - TODO - TODO + out @@ -49,6 +46,7 @@ ${consumer.secret} ${access.token} ${access.token.secret} + ${output.directory} diff --git a/src/main/java/edu/georgetown/cs/ir/hbc/NorthAmericaFilter.java b/src/main/java/edu/georgetown/cs/ir/hbc/NorthAmericaFilter.java index 29607cf..d53cc33 100644 --- a/src/main/java/edu/georgetown/cs/ir/hbc/NorthAmericaFilter.java +++ b/src/main/java/edu/georgetown/cs/ir/hbc/NorthAmericaFilter.java @@ -26,14 +26,29 @@ import com.twitter.hbc.httpclient.auth.Authentication; import com.twitter.hbc.httpclient.auth.OAuth1; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; + import java.util.ArrayList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.zip.GZIPOutputStream; public class NorthAmericaFilter { + String outDir = null; + + public NorthAmericaFilter(String outDir) { + this.outDir = outDir; + + File f = new File(this.outDir); + f.mkdir(); + } public void oauth(String consumerKey, String consumerSecret, String token, String secret) throws InterruptedException { - BlockingQueue queue = new LinkedBlockingQueue(10000); + BlockingQueue queue = new LinkedBlockingQueue(100000); StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint(); // add bounding box for US + Canada (erring on the side of including extra regions) @@ -53,26 +68,54 @@ public void oauth(String consumerKey, String consumerSecret, String token, Strin .processor(new StringDelimitedProcessor(queue)) .build(); - // Establish a connection - client.connect(); + Thread t = spawnOutputThread(outDir, queue); - // Do whatever needs to be done with messages - for (int msgRead = 0; msgRead < 1000; msgRead++) { - String msg = queue.take(); - System.out.println(msg); - } + client.connect(); + t.join(); client.stop(); + } + + public Thread spawnOutputThread(final String outDir, final BlockingQueue queue) { + Thread t = new Thread() { + public void run() { + try { + // create a new compressed file every 400k messages + while (true) { + long time = System.currentTimeMillis() / 1000L; + String fn = outDir + "/" + time + ".gz"; + + Writer writer = new OutputStreamWriter(new GZIPOutputStream( + new FileOutputStream(fn)), "UTF-8"); + + for (int i=0; i<400000; i++) { + String msg = queue.take(); + writer.write(msg); + } + + writer.close(); + } + } catch (InterruptedException ie) { + System.err.println("Thread interrupted:" + ie); + } catch(Exception e) { + System.err.println("Thread exception:" + e); + System.exit(3); + } + } + }; + + t.start(); + return t; } public static void main(String[] args) { - if (args.length != 4) { - System.err.println("usage: "); System.exit(1); } - NorthAmericaFilter naf = new NorthAmericaFilter(); + NorthAmericaFilter naf = new NorthAmericaFilter(args[4]); try { naf.oauth(args[0], args[1], args[2], args[3]);