Class: AMQP::Client::RPCClient
- Inherits: Object
  - Object
  - AMQP::Client::RPCClient
- Defined in: lib/amqp/client/rpc_client.rb
Overview
Reusable RPC client for cases where RPC performance is important: a single channel and a single reply consumer are shared across calls.
Instance Method Summary
- #call(method, arguments, timeout: nil, **properties) ⇒ String
  Makes an RPC call: sends a message and waits for the response.
- #close ⇒ Object
  Closes the channel used by the RPCClient.
- #initialize(channel) ⇒ RPCClient (constructor)
  A new instance of RPCClient.
- #start ⇒ self
  Starts listening for responses to the RPC calls.
Constructor Details
#initialize(channel) ⇒ RPCClient
Returns a new instance of RPCClient.
    # File 'lib/amqp/client/rpc_client.rb', line 8
    def initialize(channel)
      @ch = channel
      @correlation_id = 0
      @lock = Mutex.new
      @messages = ::Queue.new
    end
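The constructor sets up a counter and a Mutex, which #call later combines to produce correlation IDs. The following is a minimal standalone sketch of that pattern, not the library's code: the class name CorrelationIds and the method next_id are hypothetical and exist only for illustration.

```ruby
# Hypothetical helper (not part of amqp-client): a thread-safe counter
# that hands out short, unique correlation IDs.
class CorrelationIds
  def initialize
    @counter = 0
    @lock = Mutex.new
  end

  # Increment under the lock so concurrent callers never see the same
  # value, then encode the integer in base 36 to keep the ID short.
  def next_id
    @lock.synchronize { @counter += 1 }.to_s(36)
  end
end

ids = CorrelationIds.new
threads = 4.times.map do
  Thread.new { 250.times.map { ids.next_id } }
end
all_ids = threads.flat_map(&:value)
puts "generated #{all_ids.size} IDs, #{all_ids.uniq.size} unique"
```

Without the lock, two threads could read the same counter value and publish requests with identical correlation IDs, which would make their replies indistinguishable.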
Instance Method Details
#call(method, arguments, timeout: nil, **properties) ⇒ String
Makes an RPC call: sends a message and waits for the response. Returns the body of the reply whose correlation ID matches the request, and raises Timeout::Error if no matching reply arrives within timeout seconds.

    # File 'lib/amqp/client/rpc_client.rb', line 31
    def call(method, arguments, timeout: nil, **properties)
      correlation_id = @lock.synchronize { @correlation_id += 1 }.to_s(36)
      @ch.basic_publish(arguments, exchange: "", routing_key: method.to_s,
                        reply_to: "amq.rabbitmq.reply-to", correlation_id:, **properties)
      Timeout.timeout(timeout) do # Timeout the whole loop if we never find the right correlation_id
        loop do
          msg = @messages.pop(timeout:) # Timeout individual pop to avoid blocking forever
          raise Timeout::Error if msg.nil? && timeout

          return msg.body if msg.properties.correlation_id == correlation_id

          @messages.push msg
        end
      end
    rescue Timeout::Error
      raise Timeout::Error, "No response received in #{timeout} seconds"
    end
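The reply-matching loop in #call can be tried without a broker. The sketch below is a simplified illustration under stated assumptions: Reply and wait_for_reply are hypothetical names, a plain Struct stands in for the delivered message, and only the outer Timeout is used (the per-pop timeout from the listing above is omitted).

```ruby
require "timeout"

# Hypothetical stand-in for a delivered message; only the fields used here.
Reply = Struct.new(:correlation_id, :body)

# Pop replies until one carries the wanted correlation ID.
# Replies meant for other callers are pushed back onto the queue.
def wait_for_reply(queue, correlation_id, timeout: nil)
  Timeout.timeout(timeout) do # a nil timeout means wait indefinitely
    loop do
      reply = queue.pop
      return reply.body if reply.correlation_id == correlation_id

      queue.push(reply) # not ours, leave it for whoever is waiting on it
    end
  end
rescue Timeout::Error
  raise Timeout::Error, "No response received in #{timeout} seconds"
end

replies = Queue.new
replies.push(Reply.new("2", "second"))
replies.push(Reply.new("1", "first"))

puts wait_for_reply(replies, "1", timeout: 1)
puts wait_for_reply(replies, "2", timeout: 1)
```

Because replies can arrive in any order, matching on the correlation ID rather than on arrival order is what lets several threads share one reply queue.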
#close ⇒ Object
Closes the channel used by the RPCClient, along with its internal queue of received replies.

    # File 'lib/amqp/client/rpc_client.rb', line 50
    def close
      @ch.close
      @messages.close
    end
#start ⇒ self
Starts listening for responses to the RPC calls by consuming from amq.rabbitmq.reply-to. Call this before #call. It returns self, so it can be chained onto the constructor.

    # File 'lib/amqp/client/rpc_client.rb', line 17
    def start
      @ch.basic_consume("amq.rabbitmq.reply-to") do |msg|
        @messages.push msg
      end
      self
    end
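To illustrate how the consumer block hands replies over to waiting callers, here is a small sketch that runs without a broker. StubChannel, its deliver helper, and ReplyListener are all hypothetical and written only for this example; the stub imitates just enough of a channel to store the consumer block, and it is not the real amqp-client channel.

```ruby
# Hypothetical stand-in for a channel: it only remembers the consumer block.
class StubChannel
  def basic_consume(_queue_name, &block)
    @on_message = block
  end

  # Illustration-only helper: pretend the broker delivered a message.
  def deliver(msg)
    @on_message.call(msg)
  end
end

# Hypothetical, trimmed-down listener mirroring the start method above.
class ReplyListener
  attr_reader :messages

  def initialize(channel)
    @ch = channel
    @messages = Queue.new
  end

  def start
    @ch.basic_consume("amq.rabbitmq.reply-to") do |msg|
      @messages.push msg # hand the reply to whichever thread is waiting
    end
    self
  end
end

channel = StubChannel.new
listener = ReplyListener.new(channel).start # chaining works because start returns self
channel.deliver("pong")
puts listener.messages.pop
```

The thread-safe Queue decouples the thread that receives deliveries from the threads that wait for replies, so neither side needs additional locking.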