238 lines
6.8 KiB
Ruby
238 lines
6.8 KiB
Ruby
module Faraday
|
|
class Adapter
|
|
# EventMachine adapter is useful for either asynchronous requests
|
|
# when in EM reactor loop or for making parallel requests in
|
|
# synchronous code.
|
|
class EMHttp < Faraday::Adapter
|
|
module Options
|
|
def connection_config(env)
|
|
options = {}
|
|
configure_proxy(options, env)
|
|
configure_timeout(options, env)
|
|
configure_socket(options, env)
|
|
configure_ssl(options, env)
|
|
options
|
|
end
|
|
|
|
def request_config(env)
|
|
options = {
|
|
:body => read_body(env),
|
|
:head => env[:request_headers],
|
|
# :keepalive => true,
|
|
# :file => 'path/to/file', # stream data off disk
|
|
}
|
|
configure_compression(options, env)
|
|
options
|
|
end
|
|
|
|
def read_body(env)
|
|
body = env[:body]
|
|
body.respond_to?(:read) ? body.read : body
|
|
end
|
|
|
|
def configure_proxy(options, env)
|
|
if proxy = request_options(env)[:proxy]
|
|
options[:proxy] = {
|
|
:host => proxy[:uri].host,
|
|
:port => proxy[:uri].port,
|
|
:authorization => [proxy[:user], proxy[:password]]
|
|
}
|
|
end
|
|
end
|
|
|
|
def configure_socket(options, env)
|
|
if bind = request_options(env)[:bind]
|
|
options[:bind] = {
|
|
:host => bind[:host],
|
|
:port => bind[:port]
|
|
}
|
|
end
|
|
end
|
|
|
|
def configure_ssl(options, env)
|
|
if env[:url].scheme == 'https' && env[:ssl]
|
|
options[:ssl] = {
|
|
:cert_chain_file => env[:ssl][:ca_file],
|
|
:verify_peer => env[:ssl].fetch(:verify, true)
|
|
}
|
|
end
|
|
end
|
|
|
|
def configure_timeout(options, env)
|
|
timeout, open_timeout = request_options(env).values_at(:timeout, :open_timeout)
|
|
options[:connect_timeout] = options[:inactivity_timeout] = timeout
|
|
options[:connect_timeout] = open_timeout if open_timeout
|
|
end
|
|
|
|
def configure_compression(options, env)
|
|
if env[:method] == :get and not options[:head].key? 'accept-encoding'
|
|
options[:head]['accept-encoding'] = 'gzip, compressed'
|
|
end
|
|
end
|
|
|
|
def request_options(env)
|
|
env[:request]
|
|
end
|
|
end
|
|
|
|
include Options
|
|
|
|
dependency 'em-http'
|
|
|
|
self.supports_parallel = true
|
|
|
|
def self.setup_parallel_manager(options = nil)
|
|
Manager.new
|
|
end
|
|
|
|
def call(env)
|
|
super
|
|
perform_request env
|
|
@app.call env
|
|
end
|
|
|
|
def perform_request(env)
|
|
if parallel?(env)
|
|
manager = env[:parallel_manager]
|
|
manager.add {
|
|
perform_single_request(env).
|
|
callback { env[:response].finish(env) }
|
|
}
|
|
else
|
|
unless EventMachine.reactor_running?
|
|
error = nil
|
|
# start EM, block until request is completed
|
|
EventMachine.run do
|
|
perform_single_request(env).
|
|
callback { EventMachine.stop }.
|
|
errback { |client|
|
|
error = error_message(client)
|
|
EventMachine.stop
|
|
}
|
|
end
|
|
raise_error(error) if error
|
|
else
|
|
# EM is running: instruct upstream that this is an async request
|
|
env[:parallel_manager] = true
|
|
perform_single_request(env).
|
|
callback { env[:response].finish(env) }.
|
|
errback {
|
|
# TODO: no way to communicate the error in async mode
|
|
raise NotImplementedError
|
|
}
|
|
end
|
|
end
|
|
rescue EventMachine::Connectify::CONNECTError => err
|
|
if err.message.include?("Proxy Authentication Required")
|
|
raise Error::ConnectionFailed, %{407 "Proxy Authentication Required "}
|
|
else
|
|
raise Error::ConnectionFailed, err
|
|
end
|
|
rescue => err
|
|
if defined?(OpenSSL) && OpenSSL::SSL::SSLError === err
|
|
raise Faraday::SSLError, err
|
|
else
|
|
raise
|
|
end
|
|
end
|
|
|
|
# TODO: reuse the connection to support pipelining
|
|
def perform_single_request(env)
|
|
req = EventMachine::HttpRequest.new(env[:url], connection_config(env))
|
|
req.setup_request(env[:method], request_config(env)).callback { |client|
|
|
save_response(env, client.response_header.status, client.response) do |resp_headers|
|
|
client.response_header.each do |name, value|
|
|
resp_headers[name.to_sym] = value
|
|
end
|
|
end
|
|
}
|
|
end
|
|
|
|
def error_message(client)
|
|
client.error or "request failed"
|
|
end
|
|
|
|
def raise_error(msg)
|
|
errklass = Faraday::Error::ClientError
|
|
if msg == Errno::ETIMEDOUT
|
|
errklass = Faraday::Error::TimeoutError
|
|
msg = "request timed out"
|
|
elsif msg == Errno::ECONNREFUSED
|
|
errklass = Faraday::Error::ConnectionFailed
|
|
msg = "connection refused"
|
|
elsif msg == "connection closed by server"
|
|
errklass = Faraday::Error::ConnectionFailed
|
|
end
|
|
raise errklass, msg
|
|
end
|
|
|
|
def parallel?(env)
|
|
!!env[:parallel_manager]
|
|
end
|
|
|
|
# The parallel manager is designed to start an EventMachine loop
|
|
# and block until all registered requests have been completed.
|
|
class Manager
|
|
def initialize
|
|
reset
|
|
end
|
|
|
|
def reset
|
|
@registered_procs = []
|
|
@num_registered = 0
|
|
@num_succeeded = 0
|
|
@errors = []
|
|
@running = false
|
|
end
|
|
|
|
def running?() @running end
|
|
|
|
def add
|
|
if running?
|
|
perform_request { yield }
|
|
else
|
|
@registered_procs << Proc.new
|
|
end
|
|
@num_registered += 1
|
|
end
|
|
|
|
def run
|
|
if @num_registered > 0
|
|
@running = true
|
|
EventMachine.run do
|
|
@registered_procs.each do |proc|
|
|
perform_request(&proc)
|
|
end
|
|
end
|
|
if @errors.size > 0
|
|
raise Faraday::Error::ClientError, @errors.first || "connection failed"
|
|
end
|
|
end
|
|
ensure
|
|
reset
|
|
end
|
|
|
|
def perform_request
|
|
client = yield
|
|
client.callback { @num_succeeded += 1; check_finished }
|
|
client.errback { @errors << client.error; check_finished }
|
|
end
|
|
|
|
def check_finished
|
|
if @num_succeeded + @errors.size == @num_registered
|
|
EventMachine.stop
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
begin
|
|
require 'openssl'
|
|
rescue LoadError
|
|
warn "Warning: no such file to load -- openssl. Make sure it is installed if you want HTTPS support"
|
|
else
|
|
require 'faraday/adapter/em_http_ssl_patch'
|
|
end if Faraday::Adapter::EMHttp.loaded?
|