Skip to content

Commit

Permalink
write output to files
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewyates committed Nov 13, 2013
1 parent de754df commit 9c62219
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 15 deletions.
6 changes: 2 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
<!-- this makes maven-tools not bump us to snapshot versions -->
<stabilized>true</stabilized>
<!-- Fill these in via https://dev.twitter.com/apps -->
<consumer.key>TODO</consumer.key>
<consumer.secret>TODO</consumer.secret>
<access.token>TODO</access.token>
<access.token.secret>TODO</access.token.secret>
<output.directory>out</output.directory>
</properties>
<dependencies>
<dependency>
Expand Down Expand Up @@ -49,6 +46,7 @@
<argument>${consumer.secret}</argument>
<argument>${access.token}</argument>
<argument>${access.token.secret}</argument>
<argument>${output.directory}</argument>
</arguments>
</configuration>
</plugin>
Expand Down
65 changes: 54 additions & 11 deletions src/main/java/edu/georgetown/cs/ir/hbc/NorthAmericaFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,29 @@
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;

import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.GZIPOutputStream;

public class NorthAmericaFilter {
String outDir = null;

public NorthAmericaFilter(String outDir) {
this.outDir = outDir;

File f = new File(this.outDir);
f.mkdir();
}

public void oauth(String consumerKey, String consumerSecret, String token, String secret) throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(100000);
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();

// add bounding box for US + Canada (erring on the side of including extra regions)
Expand All @@ -53,26 +68,54 @@ public void oauth(String consumerKey, String consumerSecret, String token, Strin
.processor(new StringDelimitedProcessor(queue))
.build();

// Establish a connection
client.connect();
Thread t = spawnOutputThread(outDir, queue);

// Do whatever needs to be done with messages
for (int msgRead = 0; msgRead < 1000; msgRead++) {
String msg = queue.take();
System.out.println(msg);
}
client.connect();

t.join();
client.stop();
}

public Thread spawnOutputThread(final String outDir, final BlockingQueue<String> queue) {
Thread t = new Thread() {
public void run() {
try {
// create a new compressed file every 400k messages
while (true) {
long time = System.currentTimeMillis() / 1000L;
String fn = outDir + "/" + time + ".gz";

Writer writer = new OutputStreamWriter(new GZIPOutputStream(
new FileOutputStream(fn)), "UTF-8");

for (int i=0; i<400000; i++) {
String msg = queue.take();
writer.write(msg);
}

writer.close();
}
} catch (InterruptedException ie) {
System.err.println("Thread interrupted:" + ie);
} catch(Exception e) {
System.err.println("Thread exception:" + e);
System.exit(3);
}
}
};

t.start();

return t;
}

public static void main(String[] args) {
if (args.length != 4) {
System.err.println("usage: <consumer key> <consumer secret> <token> <secret");
if (args.length != 5) {
System.err.println("usage: <consumer key> <consumer secret> <token> <secret> <output directory>");
System.exit(1);
}

NorthAmericaFilter naf = new NorthAmericaFilter();
NorthAmericaFilter naf = new NorthAmericaFilter(args[4]);

try {
naf.oauth(args[0], args[1], args[2], args[3]);
Expand Down

0 comments on commit 9c62219

Please sign in to comment.