Skip to content

Commit

Permalink
Implement Twitter::Streaming::Client
Browse files Browse the repository at this point in the history
  • Loading branch information
sferik committed Sep 11, 2013
1 parent 67de147 commit 23afe90
Show file tree
Hide file tree
Showing 10 changed files with 329 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ gem 'yard'
group :development do
gem 'kramdown'
gem 'pry'
gem 'pry-rescue'
gem 'pry-stack_explorer'
gem 'pry-debugger', :platforms => :mri_19
end

Expand Down
1 change: 1 addition & 0 deletions lib/twitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
require 'twitter/settings'
require 'twitter/size'
require 'twitter/source_user'
require 'twitter/streaming/client'
require 'twitter/suggestion'
require 'twitter/target_user'
require 'twitter/trend'
Expand Down
75 changes: 75 additions & 0 deletions lib/twitter/streaming/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
require 'twitter/client'
require 'twitter/streaming/connection'
require 'twitter/streaming/proxy'
require 'twitter/streaming/request'
require 'twitter/streaming/response'

module Twitter
module Streaming
class Client < Twitter::Client
attr_writer :connection

def initialize
super
@connection = Twitter::Streaming::Connection.new
@request_options = {
:auto_reconnect => true,
:content_type => 'application/x-www-form-urlencoded',
:headers => {},
:oauth => credentials,
:port => 443,
:ssl => true,
:timeout => 0,
:user_agent => user_agent,
}
end

def user(&block)
user!(&block).value
end

def user!(&block)
request({
:method => 'GET',
:host => 'userstream.twitter.com',
:path => '/1.1/user.json',
:params => {},
}) do |data|
begin
block.call(Tweet.new(data))
rescue StandardError => error
p(error)
end
end
end

def track(*keywords, &block)
track!(*keywords, &block).value
end

def track!(*keywords, &block)
options = {
:method => 'POST',
:host => 'stream.twitter.com',
:path => '/1.1/statuses/filter.json',
:params => {'track' => keywords.join(',')},
}
request(options) do |data|
begin
block.call(Tweet.new(data))
rescue StandardError => error
p(error)
end
end
end

def request(options, &block)
# TODO: consider HTTP::Request
request = Twitter::Streaming::Request.new(@request_options.merge(options))
response = Twitter::Streaming::Response.new(block)
@connection.future.stream(request, response)
end

end
end
end
29 changes: 29 additions & 0 deletions lib/twitter/streaming/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
require 'celluloid/io'
require 'http/parser'
require 'openssl'
require 'resolv'

module Twitter
module Streaming
class Connection
include Celluloid::IO

def stream(request, response)
client_context = OpenSSL::SSL::SSLContext.new
parser = Http::Parser.new(response)
client = Celluloid::IO::TCPSocket.new(Resolv.getaddress(request.host), request.port)
ssl_client = Celluloid::IO::SSLSocket.new(client, client_context)
ssl_client.connect
# TODO: HTTP::Request#stream
ssl_client.write(request.to_s)

while body = ssl_client.readpartial(1024)
parser << body
end
rescue EOFError
puts "Stream ended"
end

end
end
end
25 changes: 25 additions & 0 deletions lib/twitter/streaming/proxy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module Twitter
module Streaming
class Proxy

attr_reader :user, :password, :uri

def initialize(options = {})
@user = options.delete(:user)
@password = options.delete(:password)
@uri = options.delete(:uri)
end

def header
["#{@user}:#{@password}"].pack('m').delete("\r\n") if credentials?
end

private

def credentials?
@user && @password
end

end
end
end
134 changes: 134 additions & 0 deletions lib/twitter/streaming/request.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
require 'uri'
require 'simple_oauth'

module Twitter
module Streaming
class Request
attr_reader :proxy, :options

def initialize(options = {})
@options = options
@proxy = Proxy.new(@options.delete(:proxy)) if @options[:proxy]
end

def host
options[:host]
end

def port
options[:port]
end

def to_s
content = query

data = []
data << "#{request_method} #{request_uri} HTTP/1.1"
data << "Host: #{@options[:host]}"

if gzip?
data << 'Connection: Keep-Alive'
data << 'Accept-Encoding: deflate, gzip'
else
data << 'Accept: */*'
end

data << "User-Agent: #{@options[:user_agent]}" if @options[:user_agent]
if put_or_post?
data << "Content-Type: #{@options[:content_type]}"
data << "Content-Length: #{content.bytesize}"
end
data << "Authorization: #{oauth_header}" if oauth?
data << "Proxy-Authorization: Basic #{proxy.header}" if proxy?

@options[:headers].each do |name, value|
data << "#{name}: #{value}"
end

data << "\r\n"
data = data.join("\r\n")
data << content if post? || put?
data
end

def proxy?
@proxy
end

private

def get?
request_method == 'GET'
end

def post?
request_method == 'POST'
end

def put?
request_method == 'PUT'
end

def put_or_post?
put? || post?
end

def gzip?
@options[:encoding] && @options[:encoding] == 'gzip'
end

def request_method
@options[:method].to_s.upcase
end

def params
flat = {}
@options[:params].each do |param, val|
next if val.to_s.empty? || (val.respond_to?(:empty?) && val.empty?)
val = val.join(",") if val.respond_to?(:join)
flat[param.to_s] = val.to_s
end
flat
end

def query
params.map do |param, value|
[param, SimpleOAuth::Header.encode(value)].join("=")
end.sort.join("&")
end

def oauth?
@options[:oauth] && !@options[:oauth].empty?
end

def oauth_header
SimpleOAuth::Header.new(@options[:method], full_uri, params, @options[:oauth])
end

def proxy_uri
"#{uri_base}:#{@options[:port]}#{path}"
end

def request_uri
proxy? ? proxy_uri : path
end

def path
get? ? "#{@options[:path]}?#{query}" : @options[:path]
end

def uri_base
"#{protocol}://#{@options[:host]}"
end

def protocol
@options[:ssl] ? 'https' : 'http'
end

def full_uri
proxy? ? proxy_uri : "#{uri_base}#{request_uri}"
end

end
end
end
25 changes: 25 additions & 0 deletions lib/twitter/streaming/response.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
require 'buftok'

module Twitter
module Streaming
class Response
def initialize(block)
@block = block
@tokenizer = BufferedTokenizer.new("\r\n")
end

def on_headers_complete(headers)
puts headers
# handle response codes
end

def on_body(data)
@tokenizer.extract(data).each do |line|
next if line.empty?
@block.call(JSON.parse(line, :symbolize_names => true))
end
end

end
end
end
3 changes: 3 additions & 0 deletions spec/fixtures/track_streaming.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"created_at":"Wed Apr 06 19:13:37 +0000 2011","id":55709764298092545,"id_str":"55709764298092545","text":"The problem with your code is that it's doing exactly what you told it to do.","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":7505382,"id_str":"7505382","name":"Erik Michaels-Ober","screen_name":"sferik","location":"San Francisco","description":"Write code. Not too much. Mostly Ruby.","url":"https:\/\/github.com\/sferik","entities":{"url":{"urls":[{"url":"https:\/\/github.com\/sferik","expanded_url":null,"indices":[0,25]}]},"description":{"urls":[]}},"protected":false,"followers_count":2479,"friends_count":200,"listed_count":132,"created_at":"Mon Jul 16 12:59:01 +0000 2007","favourites_count":4421,"utc_offset":-28800,"time_zone":"Pacific Time (US & Canada)","geo_enabled":true,"verified":false,"statuses_count":8730,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"000000","profile_background_image_url":"http:\/\/a0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_tile":false,"profile_image_url":"http:\/\/a0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_banner_url":"https:\/\/si0.twimg.com\/profile_banners\/7505382\/1349499693","profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"default_profile":false,"default_profile_image":false,"following":false,"follow_request_sent":false,"notifications":false},"geo":{"type":"Point","coordinates":[37.78349999,-122.39362884]},"coordinates":{"type":"Point","coordinates":[-122.39362884,37.78349999]},"place":{"id":"5c92ab5379de3839","url":"https:\/\/api.twitter.com\/1.1\/geo\/id\/5c92ab5379de3839.json","place_type":"neighborhood","name":"South Beach","full_name":"South Beach, San Francisco","country_code":"US","country":"United States","bounding_box":{"type":"Polygon","coordinates":[[[-122.403482,37.777529],[-122.387436,37.777529],[-122.387436,37.794486],[-122.403482,37.794486]]]},"attributes":{}},"contributors":null,"retweet_count":316,"entities":{"hashtags":[],"urls":[],"user_mentions":[]},"favorited":false,"retweeted":false}
{"created_at":"Wed Apr 06 19:13:37 +0000 2011","id":55709764298092545,"id_str":"55709764298092545","text":"The problem with your code is that it's doing exactly what you told it to do.","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":7505382,"id_str":"7505382","name":"Erik Michaels-Ober","screen_name":"sferik","location":"San Francisco","description":"Write code. Not too much. Mostly Ruby.","url":"https:\/\/github.com\/sferik","entities":{"url":{"urls":[{"url":"https:\/\/github.com\/sferik","expanded_url":null,"indices":[0,25]}]},"description":{"urls":[]}},"protected":false,"followers_count":2479,"friends_count":200,"listed_count":132,"created_at":"Mon Jul 16 12:59:01 +0000 2007","favourites_count":4421,"utc_offset":-28800,"time_zone":"Pacific Time (US & Canada)","geo_enabled":true,"verified":false,"statuses_count":8730,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"000000","profile_background_image_url":"http:\/\/a0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_tile":false,"profile_image_url":"http:\/\/a0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_banner_url":"https:\/\/si0.twimg.com\/profile_banners\/7505382\/1349499693","profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"default_profile":false,"default_profile_image":false,"following":false,"follow_request_sent":false,"notifications":false},"geo":{"type":"Point","coordinates":[37.78349999,-122.39362884]},"coordinates":{"type":"Point","coordinates":[-122.39362884,37.78349999]},"place":{"id":"5c92ab5379de3839","url":"https:\/\/api.twitter.com\/1.1\/geo\/id\/5c92ab5379de3839.json","place_type":"neighborhood","name":"South Beach","full_name":"South Beach, San Francisco","country_code":"US","country":"United States","bounding_box":{"type":"Polygon","coordinates":[[[-122.403482,37.777529],[-122.387436,37.777529],[-122.387436,37.794486],[-122.403482,37.794486]]]},"attributes":{}},"contributors":null,"retweet_count":316,"entities":{"hashtags":[],"urls":[],"user_mentions":[]},"favorited":false,"retweeted":false}

31 changes: 31 additions & 0 deletions spec/twitter/streaming/client_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
require 'helper'

describe Twitter::Streaming::Client do
before do
@client = Twitter::Streaming::Client.new
end

class FakeConnection
include Celluloid::IO
def initialize(body)
@body = body
end

def stream(request, response)
# TODO: assert request is valid
@body.each_line do |line|
response.on_body(line)
end
end
end

it "supports tracking keywords" do
@client.connection = FakeConnection.new(fixture("track_streaming.json"))

tweets = []
@client.track("india") do |tweet|
tweets << tweet
end
expect(tweets).to have(2).entries
end
end
4 changes: 4 additions & 0 deletions twitter.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require 'twitter/version'

Gem::Specification.new do |spec|
spec.add_dependency 'buftok', '~> 0.1.0'
spec.add_dependency 'celluloid', '~> 0.14.0'
spec.add_dependency 'celluloid-io', '~> 0.14.0'
spec.add_dependency 'faraday', ['~> 0.8', '< 0.10']
spec.add_dependency 'http_parser.rb', '~> 0.5'
spec.add_dependency 'simple_oauth', '~> 0.2'
spec.add_development_dependency 'bundler', '~> 1.0'
spec.authors = ["Erik Michaels-Ober", "John Nunemaker", "Wynn Netherland", "Steve Richert", "Steve Agalloco"]
Expand Down

0 comments on commit 23afe90

Please sign in to comment.