Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming API #49

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ client.get("/1.1/users/show.json", { "screen_name" => "sferik" })
client.post("/1.1/statuses/update.json", { "status" => "The world is your oyster." })
```

### Streaming

```crystal
client = Twitter::Streaming::Client.new(consumer_key, consumer_secret, access_token, access_token_secret)

# This will block the thread
# The block will be yielded each time a new tweet (or delete) received
@client.sample do |content|
p content
end
```

If you want to call the API directly, refer to the [API reference](https://dev.twitter.com/rest/reference).

## Contributing
Expand Down
12 changes: 12 additions & 0 deletions src/twitter/delete.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
require "json"
require "./ext/json/from_json"
require "./status"

module Twitter
class Delete
JSON.mapping(
status: Status,
timestamp_ms: {type: Time, converter: Time::EpochMillisConverterString},
)
end
end
6 changes: 6 additions & 0 deletions src/twitter/ext/json/from_json.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Converter for string timestamp_ms
module Time::EpochMillisConverterString
def self.from_json(value : JSON::PullParser) : Time
Time.epoch_ms(value.read_string.to_i64)
end
end
12 changes: 12 additions & 0 deletions src/twitter/status.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
require "json"

module Twitter
class Status
JSON.mapping(
id: Int64,
id_str: String,
user_id: Int64,
user_id_str: String,
)
end
end
9 changes: 9 additions & 0 deletions src/twitter/streaming/api.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
require "./statuses"

module Twitter
module Streaming
module API
include Twitter::Streaming::Statuses
end
end
end
69 changes: 69 additions & 0 deletions src/twitter/streaming/client.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
require "http/client"
require "json"
require "oauth"
require "uri"

module Twitter
module Streaming
class Client
include Twitter::Streaming::API

Host = "stream.twitter.com"

property access_token : String
property access_token_secret : String
property consumer_key : String
property consumer_secret : String
property user_agent : String?
property http_client : HTTP::Client

def initialize(@consumer_key, @consumer_secret, @access_token, @access_token_secret, @user_agent = nil, connect_timeout : Time::Span? = 30.seconds)
@user_agent ||= "CrystalTwitterClient/#{Twitter::Version.to_s}"
consumer = OAuth::Consumer.new(Host, consumer_key, consumer_secret)
access_token = OAuth::AccessToken.new(access_token, access_token_secret)
@http_client = HTTP::Client.new(Host, tls: true)
@http_client.connect_timeout = connect_timeout if connect_timeout
consumer.authenticate(http_client, access_token)
end

def get(path : String, params = {} of String => String, &block)
path += "?#{to_query_string(params)}" unless params.empty?
http_client.get(path) do |response|
yield handle_response(response)
end
end

def post(path : String, form = {} of String => String, &block)
http_client.post_form(path, form) do |response|
yield handle_response(response)
end
end

private def handle_response(response : HTTP::Client::Response)
case response.status_code
when 200..299
response.body_io
when 400..499
message = Twitter::Errors.from_json(response.body).errors.first.message
raise Twitter::Errors::ClientError.new(message)
when 502
raise Twitter::Errors::ServerError.new("Bad Gateway")
when 503
raise Twitter::Errors::ServerError.new("Service Unavailable")
when 504
raise Twitter::Errors::ServerError.new("Gateway Timeout")
else
raise Twitter::Errors::ServerError.new("Internal Server Error")
end
end

private def to_query_string(hash : Hash)
HTTP::Params.build do |form_builder|
hash.each do |key, value|
form_builder.add(key, value)
end
end
end
end
end
end
47 changes: 47 additions & 0 deletions src/twitter/streaming/statuses.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
require "../delete"
require "../tweet"

module Twitter
module Streaming
module Statuses
def filter(options = {} of String => String)
delimeted_length = options.fetch("delimited", false)

post("/1.1/statuses/filter.json", options) do |response|
loop do
json = if delimeted_length
bytes_to_read = response.gets.not_nil!.to_i
response.gets(bytes_to_read).not_nil!
else
response.gets.not_nil!
end

yield parse_result(json)
end
end
end

def sample(options = {} of String => String, &block)
delimeted_length = options.fetch("delimited", false)

get("/1.1/statuses/sample.json", options) do |response|
loop do
json = if delimeted_length
bytes_to_read = response.gets.not_nil!.to_i
response.gets(bytes_to_read).not_nil!
else
response.gets.not_nil!
end

yield parse_result(json)
end
end
end

def parse_result(json)
(return Twitter::Tweet.from_json(json)) rescue nil
(return Twitter::Delete.from_json(json, root: "delete")) rescue nil
end
end
end
end