Class: AMQP::Client::RPCClient

Inherits:
Object
  • Object
show all
Defined in:
lib/amqp/client/rpc_client.rb

Overview

Reusable RPC client, when RPC performance is important

Instance Method Summary collapse

Constructor Details

#initialize(channel) ⇒ RPCClient

Returns a new instance of RPCClient.

Parameters:



8
9
10
11
12
13
# File 'lib/amqp/client/rpc_client.rb', line 8

def initialize(channel)
  @ch = channel
  @correlation_id = 0
  @lock = Mutex.new
  @messages = ::Queue.new
end

Instance Method Details

#call(method, arguments, timeout: nil, **properties) ⇒ String

Do a RPC call, sends a messages, waits for a response

Parameters:

  • method (String, Symbol)

    name of the method to call (i.e. queue name on the server side)

  • arguments (String)

    arguments/body to the call

  • timeout (Numeric, nil) (defaults to: nil)

    Number of seconds to wait for a response

  • properties (Hash)

    a customizable set of options

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persisted message, transient messages for all other values

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicates which app that generated the message

Returns:

  • (String)

    Returns the result from the call

Raises:

  • (Timeout::Error)

    if no response is received within the timeout period



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/amqp/client/rpc_client.rb', line 31

def call(method, arguments, timeout: nil, **properties)
  correlation_id = @lock.synchronize { @correlation_id += 1 }.to_s(36)
  @ch.basic_publish(arguments, exchange: "", routing_key: method.to_s,
                               reply_to: "amq.rabbitmq.reply-to", correlation_id:, **properties)
  Timeout.timeout(timeout) do # Timeout the whole loop if we never find the right correlation_id
    loop do
      msg = @messages.pop(timeout:) # Timeout individual pop to avoid blocking forever
      raise Timeout::Error if msg.nil? && timeout

      return msg.body if msg.properties.correlation_id == correlation_id

      @messages.push msg
    end
  end
rescue Timeout::Error
  raise Timeout::Error, "No response received in #{timeout} seconds"
end

#closeObject

Closes the channel used by the RPCClient



50
51
52
53
# File 'lib/amqp/client/rpc_client.rb', line 50

def close
  @ch.close
  @messages.close
end

#startself

Start listening for responses from the RPC calls

Returns:

  • (self)


17
18
19
20
21
22
# File 'lib/amqp/client/rpc_client.rb', line 17

def start
  @ch.basic_consume("amq.rabbitmq.reply-to") do |msg|
    @messages.push msg
  end
  self
end