rest-core — a modular Ruby REST client collection/infrastructure
Middleware

rest-more — ready-made clients built on rest-core, plus RestCore::RailsUtil:
RC::Twitter (via OAuth 1.0a)
RC::StackExchange (via OAuth 2)
RC::Linkedin (via OAuth 1.0a)
RC::Dropbox (via OAuth 1.0a)
RC::Facebook (via OAuth 2, most complete)
RC::Github (via OAuth 2)
RC::Instagram (via OAuth 2)

EventSource
Promise#then
Promise.claim
puts client.get('ruby')['name'] # -- this blocks
puts client.get('opal')['name'] # -- this blocks, too
puts %w[ruby opal].map{ |n| client.get(n) }.
     map{ |r| r['name'] }

a, b = client.get('ruby'), client.get('opal')
puts a['name'], b['name']
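What makes the last two forms concurrent: with an asynchronous backend, get returns a future right away and only blocks when a value is read out of it. A minimal sketch of that idea (TinyFuture is hypothetical, not rest-core's implementation):

# a tiny future: start the work in a thread,
# block only when the value is accessed
class TinyFuture
  def initialize &job
    @thread = Thread.new(&job)
  end

  def value
    @thread.value # blocks until the job finishes
  end
end

a = TinyFuture.new{ sleep 1; {'name' => 'Ruby'} }
b = TinyFuture.new{ sleep 1; {'name' => 'Opal'} }
puts a.value['name'], b.value['name'] # ~1 second in total, not 2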
client.post('ruby/cut', style: 'brilliant')   # blocks until done
client.post('ruby/cut', style: 'brilliant'){} # empty block: fire-and-forget
client.post('ruby/cut', style: 'brilliant') do |r|
  puts r['status']
end

client.post('ruby/cut', style: 'brilliant') do |r|
  puts r.kind_of?(Exception) # async errors arrive as values, not raises
end
c = client
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    c.get("#{f.first['name']}/friends") do |ff|
      c.post("#{ff.first['name']}/mails", 'Hi')
    end
  end
end
c = client
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    ff = c.get("#{f.first['name']}/friends")
    c.post("#{ff.first['name']}/mails", 'Hi')
  end
end
c = client
%w[ruby opal].map do |n|
  c.get("#{n}/friends")
end.map do |f|
  c.get("#{f.first['name']}/friends")
end.map do |ff|
  c.post("#{ff.first['name']}/mails", 'Hi')
end
RC::Client#wait
RC::Client#wait
r = []
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    ff = c.get("#{f.first['name']}/friends")
    r << c.post("#{ff.first['name']}/mails", 'Hi')
  end
end
c.wait # only for this client instance
puts r
RC::Client.wait
r = []
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    ff = c.get("#{f.first['name']}/friends")
    r << c.post("#{ff.first['name']}/mails", 'Hi')
  end
end
c.class.wait # for all requests from this class;
             # also useful for graceful shutdown
puts r
Promise#then
r = %w[ruby opal].map do |n|
  c.get("#{n}/friends", {}, RC::ASYNC => true,
                            RC::RESPONSE_KEY => RC::PROMISE).
    then do |res|
      f  = res[RC::RESPONSE_BODY]
      ff = c.get("#{f.first['name']}/friends")
      st = c.post("#{ff.first['name']}/mails", 'Hi')
      res.merge(RC::RESPONSE_BODY => st)
    end.future_response[RC::RESPONSE_BODY]
end
puts r
# RC::Github#all
def all p, query={}, o={}
  q = {:per_page => MAX_PER_PAGE}.merge(query)
  r = get(p, q, o.merge(RC::ASYNC => true,
                        RESPONSE_KEY => PROMISE)).then{ |res|
    b = res[RESPONSE_BODY] +
        page_range(res).map{ |page|
          get(p, q.merge(:page => page),
              o.merge(RESPONSE_KEY => RESPONSE_BODY))
        }.inject([], &:+)
    res.merge(RESPONSE_BODY => b)
  }.future_response
  if block_given?
    yield(r[response_key(o)]); self
  else
    r[response_key(o)]
  end
end
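For reference, a hypothetical call (the token is a placeholder, and the client construction follows rest-more's usual attribute style):

github = RC::Github.new(access_token: 'TOKEN')
repos  = github.all('users/godfat/repos') # extra pages are fetched concurrently
puts repos.size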
Promise.claim
RC::Cache
RC::Promise.
  claim(res, k, body, 200, 'Header' => 'Here').
  future_response
def RC::Promise.claim env, k=RC.id,
                      body, status, headers
  promise = new(env, k)
  promise.fulfill(body, status, headers)
  promise
end
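Put together, a caching middleware can short-circuit the whole stack by claiming the promise with a stored response. A rough sketch only (TinyCache is made up; RC::Cache's real lookup and key handling are more involved):

class TinyCache
  def initialize app, store={}
    @app, @store = app, store
  end

  def call env, &k
    if cached = @store[env[RC::REQUEST_PATH]]
      # respond without issuing any request at all
      RC::Promise.claim(env, k, cached, 200, 'X-Cache' => 'HIT').
        future_response
    else
      @app.call(env, &k)
    end
  end
end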
EventSource
Server-Sent Events (SSE) is a technology where a browser receives automatic updates from a server via an HTTP connection. The EventSource API is standardized as part of HTML5 by the W3C.
EventSource
var es = new EventSource('/api/sse');
es.onmessage = function (event) {
  console.log(event.data);
};
EventSource
get '/api/sse' do # (jellyfish's/raggi's example)
  headers_merge(
    'Content-Type' => 'text/event-stream',
    'rack.hijack' => lambda do |sock|
      loop do
        sock.write("data: Aloha!\n\n")
        sleep 5
      end
    end)
end
EventSource
require 'rest-firebase'

es = RestFirebase.new(auth: false).event_source(
       'https://SampleChat.firebaseIO-demo.com/')
EventSource
es.onerror do |error|
  puts "ERROR: #{error}"
end

es.onreconnect do
  !!@start # always reconnect unless stopping
end

es.onmessage do |event, data|
  puts "EVENT: #{event}, DATA: #{data}"
end
EventSource
@start = true
es.start

rd, wr = IO.pipe
Signal.trap('INT') do # intercept Ctrl-C
  @start = false      # stop reconnecting
  es.close            # close the socket
  es.wait             # wait for shutdown
  wr.puts             # unblock the main thread
end
rd.gets # the main thread blocks here
EventSource
curl -X POST -d '{"message": "Hi!"}' \
https://SampleChat.firebaseIO-demo.com/ruby.json
EVENT: put, DATA:
{"path"=>"/ruby/-JfOWDn1LQJn-ng9EcF6",
"data"=>{"message"=>"Hi!"}}
Mutex (mutual exclusion, lock)
ConditionVariable (monitor)
WeakRef (weak reference)

A race condition is behavior of a software system where the output depends on the sequence or timing of other uncontrollable events.

A Mutex (lock) is a synchronization mechanism for enforcing limits on access to a resource in an environment where there are many threads of execution.
Race Condition
@i = 0
#---thread a---#---thread b-----
tmp = @i       #
@i = tmp + 1   #
               # tmp = @i
               # @i = tmp + 1
#--------------#----------------
@i # => 2
Race Condition
@i = 0
#---thread a---#---thread b-----
tmp = @i       #
               # tmp = @i
@i = tmp + 1   #
               # @i = tmp + 1
#--------------#----------------
@i # => 1
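The same race, runnable: several threads doing the unlocked read-modify-write on one counter. (A sketch; the iteration counts are arbitrary, and how many updates get lost depends on thread scheduling.)

@i = 0
threads = 10.times.map do
  Thread.new do
    100_000.times do
      tmp = @i     # read...
      @i = tmp + 1 # ...write; another thread may have run in between
    end
  end
end
threads.each(&:join)
puts @i # often less than 1000000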
Mutex (lock)
@i = 0
#---thread a-----#---thread b------
synchronize do   #
  tmp = @i       #
  @i = tmp + 1   #
end              #
                 # synchronize do
                 #   tmp = @i
                 #   @i = tmp + 1
                 # end
#----------------#-----------------
@i # => 2
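And the fixed version of the runnable sketch, where each read-modify-write happens atomically with respect to the other threads:

@i = 0
lock = Mutex.new
threads = 10.times.map do
  Thread.new do
    100_000.times do
      lock.synchronize do # only one thread in here at a time
        tmp = @i
        @i = tmp + 1
      end
    end
  end
end
threads.each(&:join)
puts @i # => 1000000, every time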
ConditionVariable (monitor)

A ConditionVariable (monitor) is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become true.
#---thread a---------#---thread b------------------
synchronize do       # <= a acquires the lock
  condv.wait         # <= a releases the lock, sleeps
                     #    synchronize do # b acquires the lock
                     #      @i = 1
                     #      condv.signal # a wakes up
                     #    end            # b releases the lock
                     # <= a re-acquires the lock
  puts @i # => 1
end                  # <= a releases the lock
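The same choreography as a runnable sketch; the `until` guard also covers the case where the signal arrives before the wait does:

mutex = Mutex.new
condv = ConditionVariable.new
@i = nil

a = Thread.new do
  mutex.synchronize do
    condv.wait(mutex) until @i # releases the lock while sleeping
    puts @i # => 1
  end
end

mutex.synchronize do
  @i = 1
  condv.signal # wakes thread a if it is already waiting
end
a.join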
WeakRef (weak reference)

A WeakRef (weak reference) is a reference that does not protect the referenced object from collection by a garbage collector, unlike a strong reference.
require 'weakref'
ref = WeakRef.new(Object.new)
GC.start # hopefully collects the ^^^^^^ (collection is not guaranteed)
p ref.weakref_alive? # => nil once collected
p ref                # raises WeakRef::RefError:
                     # Invalid Reference - probably recycled
EventSource
Promise Detail
ThreadPool Detail
def f n=1
  if n.zero?
    # Thread.new do
    raise '' rescue p $!.backtrace
    # end.join
  else
    f(n-1)
  end
end
f # => ["-:4:in `f'", "-:7:in `f'",
  #     "-:10:in `<main>'"]
def f n=1
  if n.zero?
    Thread.new do
      raise '' rescue p $!.backtrace
    end.join
  else
    f(n-1)
  end
end
f # => ["-:4:in `block in f'"]
A new thread starts with a fresh stack, so the original call chain is lost. rest-core therefore captures the backtrace before deferring and stashes it in the worker thread:

def defer
  if pool_size < 0 # none / blocking
  else
    # retain the backtrace so far
    backtrace = caller + self.class.backtrace
    if pool_size > 0 # thread pool
    else
      self.thread = Thread.new do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
      end
    end
  end
end
ThreadPool
at_exit do
  RC::Universal.shutdown
end

def shutdown # RC::Client.shutdown
  thread_pool.shutdown
  wait
end
def shutdown # RC::ThreadPool#shutdown
  workers.size.times{ trim(true) }
  workers.first.join && trim(true) until
    workers.empty?
  queue.clear
end
RC::Universal.pool_size = 1
client = RC::Universal.new
client.get('ruby/friends') do |response|
  path = "#{response.first['name']}/friends"
  puts client.get(path).first['name'] # DEADLOCK!
  # the pool's only worker is busy running this very callback,
  # so the nested, blocking get can never be scheduled
end
RC::Universal.pool_size = 1
client = RC::Universal.new(:timeout => 10)
client.get('ruby/friends') do |response|
  path = "#{response.first['name']}/friends"
  puts client.get(path).first['name'] # TIMEOUT!
  # the nested get still cannot run, but now
  # the timer raises Timeout::Error after 10 seconds
end
RC::Universal.pool_size = 1
client = RC::Universal.new(:timeout => 10)
1000.times do |i|
  client.get("ruby/#{i}"){ |res| puts res }
end
10ms
RC::Universal.pool_size = 10
client = RC::Universal.new(:timeout => 10)
1000.times do |i|
  client.get("ruby/#{i}"){ |res| puts res }
end
100ms
Timeouts are only checked every Timer.interval (and are thus inaccurate):

RC::Timer.interval = 0.5 # default: 1 second
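A hypothetical sketch of why interval-based timeouts are inaccurate: a single watcher thread checks every registered deadline once per interval, so a timeout can fire up to one interval late. (IntervalTimer and its names are made up for illustration, not rest-core's implementation.)

require 'timeout'

class IntervalTimer
  def initialize interval=0.5
    @interval, @entries, @mutex = interval, [], Mutex.new
    @watcher = Thread.new do
      loop do
        sleep @interval # the granularity of every timeout
        now = Time.now
        @mutex.synchronize do
          due, @entries = @entries.partition{ |deadline, _| deadline <= now }
          due.each{ |_, thread| thread.raise(Timeout::Error) }
        end
      end
    end
  end

  # time the calling thread out after roughly `seconds`
  def on_timeout seconds, thread=Thread.current
    @mutex.synchronize{ @entries << [Time.now + seconds, thread] }
  end
end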
Promise Detail
ThreadPool Detail
Client.wait
Promise#wait
Promise#fulfilling
Promise#rejecting
Future#method_missing
Promise#yield
# Wait for all the requests
# to be done for this client
def wait ps=promises, m=mutex
  return self if ps.empty?
  current_promises = nil
  m.synchronize do
    current_promises = ps.dup
    ps.clear
  end
  current_promises.each do |p|
    next unless p.weakref_alive?
    begin
      p.wait
    rescue WeakRef::RefError
      # it got gc'ed after we checked it was alive
    end
  end
  wait(ps, m) # new promises might have been added meanwhile
end
Promise#wait
# called in the client thread (client.wait)
def wait
  # it might be awakened by some other future!
  mutex.synchronize do
    condv.wait(mutex) until done?
  end unless done?
end
Promise#fulfilling
def fulfilling body, status, headers, socket=nil
  self.body, self.status,
  self.headers, self.socket =
    body, status, headers, socket
  # under an ASYNC callback, we should call back immediately
  callback if immediate
ensure
  # the client or the response might be waiting
  condv.broadcast
end
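The wait/broadcast pairing in miniature, as a runnable sketch (MiniPromise is illustrative, not rest-core's Promise):

class MiniPromise
  def initialize
    @mutex, @condv, @done = Mutex.new, ConditionVariable.new, false
  end

  def fulfill value
    @mutex.synchronize do
      @value, @done = value, true
      @condv.broadcast # wake every waiter, not just one
    end
  end

  def wait
    @mutex.synchronize{ @condv.wait(@mutex) until @done }
    @value
  end
end

promise = MiniPromise.new
waiter  = Thread.new{ promise.wait }
promise.fulfill(42)
p waiter.value # => 42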
Promise#rejecting
def rejecting error
  self.error = if error.kind_of?(Exception)
                 error
               else
                 Error.new(error || 'unknown')
               end
  fulfilling('', 0, {})
end
Future#method_missing
def method_missing msg, *args, &block
  @promise.yield[@target].
    __send__(msg, *args, &block)
end
Promise#yield
# called in the client thread
# (from the future, e.g. body)
def yield
  wait
  callback
end
Promise Detail
Promise#defer
Promise#protected_yield
Promise#cancel_task
RC::Universal.pool_size = -1
def defer
  if pool_size < 0
    # set the working thread
    self.thread = Thread.current
    # avoid any exception and do the job
    protected_yield{ yield }
  else
    # ...
  end
end
RC::Universal.pool_size = 0
def defer
  if pool_size < 0 # none / blocking
  else
    # retain the backtrace so far
    backtrace = caller + self.class.backtrace
    if pool_size > 0 # thread pool
    else
      self.thread = Thread.new do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
      end
    end
  end
end
RC::Universal.pool_size = 10
if pool_size > 0
  mutex.synchronize do
    # still time it out if the task is never processed
    env[TIMER].on_timeout{ cancel_task } if env[TIMER]
    self.task = client_class.thread_pool.
      defer(mutex) do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
        Thread.current[:backtrace] = nil
      end
  end
else # thread spawn
end
def defer
  if pool_size < 0 # negative number means blocking call
    self.thread = Thread.current # set the working thread
    protected_yield{ yield }     # avoid any exception and do the job
  else
    backtrace = caller + self.class.backtrace # retain the backtrace so far
    if pool_size > 0
      mutex.synchronize do
        # still time it out if the task is never processed
        env[TIMER].on_timeout{ cancel_task } if env[TIMER]
        self.task = client_class.thread_pool.defer(mutex) do
          Thread.current[:backtrace] = backtrace
          protected_yield{ yield }
          Thread.current[:backtrace] = nil
        end
      end
    else
      self.thread = Thread.new do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
      end
    end
  end
end
Promise#protected_yield
def protected_yield
  if env[TIMER]
    timeout_protected_yield{ yield }
  else
    yield
  end
rescue Exception => e # could be Timeout::Error
  mutex.synchronize do
    self.class.set_backtrace(e)
    if done? # log user callback error
      callback_error(e)
    else # IOError, SystemCallError, etc.
      begin
        rejecting(e) # would call the user callback
      rescue Exception => f
        callback_error(f) # log user callback error
      end
    end
  end
end
Promise#timeout_protected_yield
def timeout_protected_yield
  # the timeout might already be set for
  # the thread pool (pool_size > 0)
  env[TIMER].on_timeout{ cancel_task } unless
    env[TIMER].timer
  yield
ensure
  env[TIMER].cancel
end
Promise#cancel_task
def cancel_task bt=nil
  mutex.synchronize do
    next if done? # don't cancel if it's done
    if t = thread || task.thread
      # raise Timeout::Error in the working thread
      t.raise(env[TIMER].error)
    else # the task was queued and never started,
      begin # so just cancel it and fulfill the
        task.cancel # promise with Timeout::Error
        rejecting(env[TIMER].error)
      rescue Exception => e
        # log user callback error
        callback_error(e) do
          e.set_backtrace(e.backtrace + (bt || []))
        end
      end
    end
  end
end
ThreadPool Detail
ThreadPool#defer
ThreadPool#spawn_worker
ThreadPool::Task
ThreadPool#defer
def defer mutex=nil, &job
  task = Task.new(job, mutex)
  queue << task
  spawn_worker if waiting == 0 &&
                  workers.size < max_size
  task
end
ThreadPool#spawn_worker
def spawn_worker
  workers << Thread.new{
    task = nil
    begin
      mutex.synchronize{ @waiting += 1 }
      task = queue.pop(idle_time)
      mutex.synchronize{ @waiting -= 1 }
    end while task.call(Thread.current)
    mutex.synchronize do
      workers.delete(Thread.current)
    end
  }
end
ThreadPool::Task
class Task < Struct.new(:job, :mutex, :thread,
                        :cancelled)
  # this should never fail
  def call working_thread
    mutex.synchronize do
      return if cancelled
      self.thread = working_thread
    end
    job.call
    true
  end

  # we should probably synchronize this, too!
  def cancel; self.cancelled = true; end
end