Class: AMQP::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/amqp/client.rb,
lib/amqp/client/queue.rb,
lib/amqp/client/table.rb,
lib/amqp/client/errors.rb,
lib/amqp/client/channel.rb,
lib/amqp/client/message.rb,
lib/amqp/client/version.rb,
lib/amqp/client/consumer.rb,
lib/amqp/client/exchange.rb,
lib/amqp/client/connection.rb,
lib/amqp/client/properties.rb,
lib/amqp/client/rpc_client.rb,
lib/amqp/client/frame_bytes.rb,
lib/amqp/client/configuration.rb,
lib/amqp/client/message_codecs.rb,
lib/amqp/client/message_codec_registry.rb

Overview

AMQP 0-9-1 Client

See Also:

Defined Under Namespace

Modules: Coders, Parsers Classes: Configuration, Connection, Consumer, Error, Exchange, Message, MessageCodecRegistry, Properties, Queue, RPCClient, ReturnMessage

Constant Summary collapse

VERSION =

Version of the client library

"2.0.0"

RPC collapse

Connect and disconnect collapse

High level objects collapse

Publish collapse

Queue actions collapse

Exchange actions collapse

RPC collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri = "", **options) ⇒ Client

Create a new Client object, this won’t establish a connection yet, use #connect or #start for that

Parameters:

  • uri (String) (defaults to: "")

    URL on the format amqp://username:password@hostname/vhost, use amqps:// for encrypted connection

  • options (Hash)

    a customizable set of options

Options Hash (**options):

  • connection_name (Boolean) — default: PROGRAM_NAME

    Set a name for the connection to be able to identify the client from the broker

  • verify_peer (Boolean) — default: true

    Verify broker’s TLS certificate, set to false for self-signed certs

  • heartbeat (Integer) — default: 0

    Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead

  • frame_max (Integer) — default: 131_072

    Maximum frame size, the smallest of the client’s and the broker’s values will be used

  • channel_max (Integer) — default: 2048

    Maximum number of channels the client will be allowed to have open. Maximum allowed is 65_536. The smallest of the client’s and the broker’s value will be used.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/amqp/client.rb', line 35

def initialize(uri = "", **options)
  @uri = uri
  @options = options
  @queues = {}
  @exchanges = {}
  @consumers = {}
  @next_consumer_id = 0
  @connq = SizedQueue.new(1)
  @codec_registry = self.class.codec_registry.dup
  @strict_coding = self.class.config.strict_coding
  @default_content_encoding = self.class.config.default_content_encoding
  @default_content_type = self.class.config.default_content_type
  @start_lock = Mutex.new
  @supervisor_started = false
  @stopped = false
end

Class Attribute Details

.codec_registryMessageCodecRegistry (readonly)

Get the class-level codec registry



506
507
508
# File 'lib/amqp/client.rb', line 506

def codec_registry
  @codec_registry
end

.configConfiguration (readonly)

Get the class-level configuration

Returns:



502
503
504
# File 'lib/amqp/client.rb', line 502

def config
  @config
end

Instance Attribute Details

#codec_registryMessageCodecRegistry (readonly)

Get the codec registry for this instance



525
526
527
# File 'lib/amqp/client.rb', line 525

def codec_registry
  @codec_registry
end

#default_content_encodingString?

Get/set the default content_encoding to use when publishing messages

Returns:

  • (String, nil)


536
537
538
# File 'lib/amqp/client.rb', line 536

def default_content_encoding
  @default_content_encoding
end

#default_content_typeString?

Get/set the default content_type to use when publishing messages

Returns:

  • (String, nil)


532
533
534
# File 'lib/amqp/client.rb', line 532

def default_content_type
  @default_content_type
end

#strict_codingObject

Get/set if condig should be strict, i.e. if the client should raise on unknown codecs



528
529
530
# File 'lib/amqp/client.rb', line 528

def strict_coding
  @strict_coding
end

Class Method Details

.configure {|Configuration| ... } ⇒ Configuration

Configure the AMQP::Client class-level settings

Examples:

AMQP::Client.configure do |config|
  config.default_content_type = "application/json"
  config.strict_coding = true
end

Yields:

  • (Configuration)

    Yields the configuration object for modification

Returns:



495
496
497
498
# File 'lib/amqp/client.rb', line 495

def configure
  yield @config if block_given?
  @config
end

Instance Method Details

#bind(queue:, exchange:, binding_key: "", arguments: {}) ⇒ nil

Bind a queue to an exchange

Parameters:

  • queue (String)

    Name of the queue to bind

  • exchange (String)

    Name of the exchange to bind to

  • binding_key (String) (defaults to: "")

    Binding key on which messages that match might be routed (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on (only relevant for header exchanges)

Returns:

  • (nil)


345
346
347
348
349
# File 'lib/amqp/client.rb', line 345

def bind(queue:, exchange:, binding_key: "", arguments: {})
  with_connection do |conn|
    conn.channel(1).queue_bind(queue, exchange:, binding_key:, arguments:)
  end
end

#connect(read_loop_thread: true) ⇒ Connection

Establishes and returns a new AMQP connection

Examples:

connection = AMQP::Client.new("amqps://server.rmq.cloudamqp.com", connection_name: "My connection").connect

Returns:

See Also:



59
60
61
# File 'lib/amqp/client.rb', line 59

def connect(read_loop_thread: true)
  Connection.new(@uri, read_loop_thread:, codec_registry: @codec_registry, strict_coding: @strict_coding, **@options)
end

#default_exchangeExchange Also known as: default

Return a high level Exchange object for the default direct exchange

Returns:

See Also:



201
202
203
# File 'lib/amqp/client.rb', line 201

def default_exchange(**)
  direct("", **)
end

#delete_exchange(name) ⇒ nil

Delete an exchange

Parameters:

  • name (String)

    Name of the exchange

Returns:

  • (nil)


415
416
417
418
419
420
421
# File 'lib/amqp/client.rb', line 415

def delete_exchange(name)
  with_connection do |conn|
    conn.channel(1).exchange_delete(name)
    @exchanges.delete(name)
    nil
  end
end

#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer

Delete a queue

Parameters:

  • name (String)

    Name of the queue

  • if_unused (Boolean) (defaults to: false)

    Only delete if the queue doesn’t have consumers, raises a ChannelClosed error otherwise

  • if_empty (Boolean) (defaults to: false)

    Only delete if the queue is empty, raises a ChannelClosed error otherwise

Returns:

  • (Integer)

    Number of messages in the queue when deleted



377
378
379
380
381
382
383
# File 'lib/amqp/client.rb', line 377

def delete_queue(name, if_unused: false, if_empty: false)
  with_connection do |conn|
    msgs = conn.channel(1).queue_delete(name, if_unused:, if_empty:)
    @queues.delete(name)
    msgs
  end
end

#direct_exchange(name = "amq.direct") ⇒ Exchange Also known as: direct

Declare a direct exchange and return a high level Exchange object

Parameters:

  • name (String) (defaults to: "amq.direct")

    Name of the exchange (defaults to “amq.direct”)

Returns:

See Also:



185
186
187
188
189
190
191
192
# File 'lib/amqp/client.rb', line 185

def direct_exchange(name = "amq.direct", **)
  return exchange(name, type: "direct", **) unless name.empty?

  # Return the default exchange
  @exchanges.fetch(name) do
    @exchanges[name] = Exchange.new(self, name)
  end
end

#exchange(name, type:, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange

Declare an exchange and return a high level Exchange object

Examples:

amqp = AMQP::Client.new.start
x = amqp.exchange("my.hash.exchange", type: "x-consistent-hash")
x.publish("body", routing_key: "routing-key")

Parameters:

  • name (String)

    Name of the exchange

  • type (String)

    Type of the exchange, one of “direct”, “fanout”, “topic”, “headers” or custom exchange type

  • durable (Boolean) (defaults to: true)

    If true the exchange will survive broker restarts

  • auto_delete (Boolean) (defaults to: false)

    If true the exchange will be deleted when the last queue is unbound

  • internal (Boolean) (defaults to: false)

    If true the exchange will not accept directly published messages

  • arguments (Hash) (defaults to: {})

    Custom arguments such as alternate-exchange etc.

Returns:



172
173
174
175
176
177
178
179
# File 'lib/amqp/client.rb', line 172

def exchange(name, type:, durable: true, auto_delete: false, internal: false, arguments: {})
  @exchanges.fetch(name) do
    with_connection do |conn|
      conn.channel(1).exchange_declare(name, type:, durable:, auto_delete:, internal:, arguments:)
    end
    @exchanges[name] = Exchange.new(self, name)
  end
end

#exchange_bind(source:, destination:, binding_key: "", arguments: {}) ⇒ nil

Bind an exchange to an exchange

Parameters:

  • source (String)

    Name of the exchange to bind to

  • destination (String)

    Name of the exchange to bind

  • binding_key (String) (defaults to: "")

    Binding key on which messages that match might be routed (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on (only relevant for header exchanges)

Returns:

  • (nil)


394
395
396
397
398
# File 'lib/amqp/client.rb', line 394

def exchange_bind(source:, destination:, binding_key: "", arguments: {})
  with_connection do |conn|
    conn.channel(1).exchange_bind(destination:, source:, binding_key:, arguments:)
  end
end

#exchange_unbind(source:, destination:, binding_key: "", arguments: {}) ⇒ nil

Unbind an exchange from an exchange

Parameters:

  • source (String)

    Name of the exchange to unbind from

  • destination (String)

    Name of the exchange to unbind

  • binding_key (String) (defaults to: "")

    Binding key which the exchange is bound to the exchange with

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (nil)


406
407
408
409
410
# File 'lib/amqp/client.rb', line 406

def exchange_unbind(source:, destination:, binding_key: "", arguments: {})
  with_connection do |conn|
    conn.channel(1).exchange_unbind(destination:, source:, binding_key:, arguments:)
  end
end

#fanout_exchange(name = "amq.fanout") ⇒ Exchange Also known as: fanout

Declare a fanout exchange and return a high level Exchange object

Parameters:

  • name (String) (defaults to: "amq.fanout")

    Name of the exchange (defaults to “amq.fanout”)

Returns:

See Also:



213
214
215
# File 'lib/amqp/client.rb', line 213

def fanout_exchange(name = "amq.fanout", **)
  exchange(name, type: "fanout", **)
end

#get(queue, no_ack: false) ⇒ Message?

Get a message from a queue

Parameters:

  • queue (String)

    Name of the queue to get the message from

  • no_ack (Boolean) (defaults to: false)

    When false the message has to be manually acknowledged (or rejected) (default: false)

Returns:

  • (Message, nil)

    The message from the queue or nil if the queue is empty



332
333
334
335
336
337
# File 'lib/amqp/client.rb', line 332

def get(queue, no_ack: false)
  with_connection do |conn|
    ch = conn.channel
    ch.basic_get(queue, no_ack:)
  end
end

#headers_exchange(name = "amq.headers") ⇒ Exchange Also known as: headers

Declare a headers exchange and return a high level Exchange object

Parameters:

  • name (String) (defaults to: "amq.headers")

    Name of the exchange (defaults to “amq.headers”)

Returns:

See Also:



237
238
239
# File 'lib/amqp/client.rb', line 237

def headers_exchange(name = "amq.headers", **)
  exchange(name, type: "headers", **)
end

#publish(body, exchange:, routing_key: "", **properties) ⇒ nil

Publish a (persistent) message and wait for confirmation

Parameters:

  • body (Object)

    The message body will be encoded if any matching codec is found in the client’s codec registry

  • exchange (String)

    Name of the exchange to publish to

  • routing_key (String) (defaults to: "")

    Routing key for the message

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persisted message, transient messages for all other values

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicates which app that generated the message

Returns:

  • (nil)

Raises:



259
260
261
262
263
264
265
266
267
268
269
# File 'lib/amqp/client.rb', line 259

def publish(body, exchange:, routing_key: "", **properties)
  with_connection do |conn|
    properties[:delivery_mode] ||= 2
    properties = default_content_properties.merge(properties)
    body = serialize_and_encode_body(body, properties)
    result = conn.channel(1).basic_publish_confirm(body, exchange:, routing_key:, **properties)
    raise Error::PublishNotConfirmed unless result

    nil
  end
end

#publish_and_forget(body, exchange:, routing_key: "", **properties) ⇒ nil

Publish a (persistent) message but don’t wait for a confirmation

Parameters:

  • body (String)

    The body

  • exchange (String)

    Name of the exchange to publish to

  • routing_key (String) (defaults to: "")

    The routing key that the exchange might use to route the message to a queue

  • properties (Properties)

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persisted message, transient messages for all other values

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicates which app that generated the message

Returns:

  • (nil)

Raises:



278
279
280
281
282
283
284
285
# File 'lib/amqp/client.rb', line 278

def publish_and_forget(body, exchange:, routing_key: "", **properties)
  with_connection do |conn|
    properties[:delivery_mode] ||= 2
    properties = default_content_properties.merge(properties)
    body = serialize_and_encode_body(body, properties)
    conn.channel(1).basic_publish(body, exchange:, routing_key:, **properties)
  end
end

#purge(queue) ⇒ nil

Purge a queue

Parameters:

  • queue (String)

    Name of the queue

Returns:

  • (nil)


366
367
368
369
370
# File 'lib/amqp/client.rb', line 366

def purge(queue)
  with_connection do |conn|
    conn.channel(1).queue_purge(queue)
  end
end

#queue(name, durable: true, auto_delete: false, exclusive: false, passive: false, arguments: {}) ⇒ Queue

Declare a queue

Examples:

amqp = AMQP::Client.new.start
q = amqp.queue("foobar")
q.publish("body")

Parameters:

  • name (String)

    Name of the queue

  • durable (Boolean) (defaults to: true)

    If true the queue will survive broker restarts, messages in the queue will only survive if they are published as persistent

  • auto_delete (Boolean) (defaults to: false)

    If true the queue will be deleted when the last consumer stops consuming (it won’t be deleted until at least one consumer has consumed from it)

  • exclusive (Boolean) (defaults to: false)

    If true the queue will be deleted when the connection is closed

  • passive (Boolean) (defaults to: false)

    If true an exception will be raised if the queue doesn’t already exists

  • arguments (Hash) (defaults to: {})

    Custom arguments, such as queue-ttl etc.

Returns:

Raises:

  • (ArgumentError)


149
150
151
152
153
154
155
156
157
158
# File 'lib/amqp/client.rb', line 149

def queue(name, durable: true, auto_delete: false, exclusive: false, passive: false, arguments: {})
  raise ArgumentError, "Currently only supports named, durable queues" if name.empty?

  @queues.fetch(name) do
    with_connection do |conn|
      conn.channel(1).queue_declare(name, durable:, auto_delete:, exclusive:, passive:, arguments:)
    end
    @queues[name] = Queue.new(self, name)
  end
end

#rpc_call(method, arguments, timeout: nil, **properties) ⇒ String

Do a RPC call, sends a messages, waits for a response

Parameters:

  • method (String, Symbol)

    name of the RPC method to call (i.e. queue name on the server side)

  • arguments (String)

    arguments/body to the call

  • timeout (Numeric, nil) (defaults to: nil)

    Number of seconds to wait for a response

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persisted message, transient messages for all other values

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicates which app that generated the message

Returns:

  • (String)

    Returns the result from the call

Raises:

  • (Timeout::Error)

    if no response is received within the timeout period



461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'lib/amqp/client.rb', line 461

def rpc_call(method, arguments, timeout: nil, **properties)
  ch = with_connection(&:channel)
  begin
    msg = ch.basic_consume_once("amq.rabbitmq.reply-to", timeout:) do
      properties = default_content_properties.merge(properties)
      body = serialize_and_encode_body(arguments, properties)
      ch.basic_publish(body, exchange: "", routing_key: method.to_s,
                             reply_to: "amq.rabbitmq.reply-to", **properties)
    end
    msg.parse
  ensure
    ch.close
  end
end

#rpc_clientRPCClient

Create a reusable RPC client

Returns:



478
479
480
481
# File 'lib/amqp/client.rb', line 478

def rpc_client
  ch = with_connection(&:channel)
  RPCClient.new(ch).start
end

#rpc_server(method, worker_threads: 1, durable: true, auto_delete: false, arguments: {}) {|The| ... } ⇒ Consumer

Create a RPC server for a single method/function/procedure

Parameters:

  • method (String, Symbol)

    name of the RPC method to host (i.e. queue name on the server side)

  • worker_threads (Integer) (defaults to: 1)

    number of threads that process requests

  • durable (Boolean) (defaults to: true)

    If true the queue will survive broker restarts

  • auto_delete (Boolean) (defaults to: false)

    If true the queue will be deleted when the last consumer stops consuming (it won’t be deleted until at least one consumer has consumed from it)

  • arguments (Hash) (defaults to: {})

    Custom arguments, such as queue-ttl etc.

Yields:

  • Block that processes the RPC request messages

Yield Parameters:

  • The (String)

    body of the request message

Yield Returns:

  • (String)

    The response message body

Returns:

  • (Consumer)

    The consumer object, which can be used to cancel the consumer



437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
# File 'lib/amqp/client.rb', line 437

def rpc_server(method, worker_threads: 1, durable: true, auto_delete: false, arguments: {}, &_)
  queue(method.to_s, durable:, auto_delete:, arguments:)
    .subscribe(prefetch: worker_threads, worker_threads:) do |msg|
      result = yield msg.parse
      properties = { content_type: msg.properties.content_type,
                     content_encoding: msg.properties.content_encoding }
      result_body = serialize_and_encode_body(result, properties)

      msg.channel.basic_publish(result_body, exchange: "", routing_key: msg.properties.reply_to,
                                             correlation_id: msg.properties.correlation_id, **properties)
      msg.ack
    rescue StandardError
      msg.reject(requeue: false)
      raise
    end
end

#startself

Opens an AMQP connection using the high level API, will try to reconnect if successfully connected at first

Examples:

amqp = AMQP::Client.new("amqps://server.rmq.cloudamqp.com")
amqp.start
amqp.queue("foobar")

Returns:

  • (self)


69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/amqp/client.rb', line 69

def start
  return self if started?

  @start_lock.synchronize do # rubocop:disable Metrics/BlockLength
    return self if started?

    @supervisor_started = true
    @stopped = false
    Thread.new(connect(read_loop_thread: false)) do |conn|
      Thread.current.abort_on_exception = true # Raising an unhandled exception is a bug
      loop do
        break if @stopped

        conn ||= connect(read_loop_thread: false)

        Thread.new do
          # restore connection in another thread, read_loop have to run
          conn.channel(1) # reserve channel 1 for publishes
          @consumers.each_value do |consumer|
            ch = conn.channel
            ch.basic_qos(consumer.prefetch)
            consume_ok = ch.basic_consume(consumer.queue,
                                          **consumer.basic_consume_args,
                                          &consumer.block)
            # Update the consumer with new channel and consume_ok metadata
            consumer.update_consume_ok(consume_ok)
          end
          @connq << conn
          # Remove consumers whose internal queues were already closed (e.g. cancelled during reconnect window)
          @consumers.delete_if { |_, c| c.closed? }
        end
        conn.read_loop # blocks until connection is closed, then reconnect
      rescue Error => e
        warn "AMQP-Client reconnect error: #{e.inspect}"
        sleep @options[:reconnect_interval] || 1
      ensure
        @connq.clear
        conn = nil
      end
    end
  end
  self
end

#started?Boolean

Check if the client is connected

Returns:

  • (Boolean)

    true if connected or currently trying to connect, false otherwise



128
129
130
# File 'lib/amqp/client.rb', line 128

def started?
  @supervisor_started && !@stopped
end

#stopnil

Close the currently open connection and stop the supervision / reconnection logic.

Returns:

  • (nil)


115
116
117
118
119
120
121
122
123
124
# File 'lib/amqp/client.rb', line 115

def stop
  return if @stopped && !@supervisor_started

  @stopped = true
  return unless @connq.size.positive?

  conn = @connq.pop
  conn.close
  nil
end

#subscribe(queue, exclusive: false, no_ack: false, prefetch: 1, worker_threads: 1, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer

Consume messages from a queue

Parameters:

  • queue (String)

    Name of the queue to subscribe to

  • no_ack (Boolean) (defaults to: false)

    When false messages have to be manually acknowledged (or rejected) (default: false)

  • prefetch (Integer) (defaults to: 1)

    Specify how many messages to prefetch for consumers with no_ack is false (default: 1)

  • worker_threads (Integer) (defaults to: 1)

    Number of threads processing messages (default: 1)

  • on_cancel (Proc) (defaults to: nil)

    Optional proc that will be called if the consumer is cancelled by the broker The proc will be called with the consumer tag as the only argument

  • arguments (Hash) (defaults to: {})

    Custom arguments to the consumer

Yields:

  • (Message)

    Delivered message from the queue

Returns:

  • (Consumer)

    The consumer object, which can be used to cancel the consumer

Raises:

  • (ArgumentError)


307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
# File 'lib/amqp/client.rb', line 307

def subscribe(queue, exclusive: false, no_ack: false, prefetch: 1, worker_threads: 1,
              on_cancel: nil, arguments: {}, &blk)
  raise ArgumentError, "worker_threads have to be > 0" if worker_threads <= 0

  with_connection do |conn|
    ch = conn.channel
    ch.basic_qos(prefetch)
    consumer_id = @next_consumer_id += 1
    on_cancel_proc = proc do |tag|
      @consumers.delete(consumer_id)
      on_cancel&.call(tag)
    end
    basic_consume_args = { exclusive:, no_ack:, worker_threads:, on_cancel: on_cancel_proc, arguments: }
    consume_ok = ch.basic_consume(queue, **basic_consume_args, &blk)
    consumer = Consumer.new(client: self, channel_id: ch.id, id: consumer_id, block: blk,
                            queue:, consume_ok:, prefetch:, basic_consume_args:)
    @consumers[consumer_id] = consumer
    consumer
  end
end

#topic_exchange(name = "amq.topic") ⇒ Exchange Also known as: topic

Declare a topic exchange and return a high level Exchange object

Parameters:

  • name (String) (defaults to: "amq.topic")

    Name of the exchange (defaults to “amq.topic”)

Returns:

See Also:



225
226
227
# File 'lib/amqp/client.rb', line 225

def topic_exchange(name = "amq.topic", **)
  exchange(name, type: "topic", **)
end

#unbind(queue:, exchange:, binding_key: "", arguments: {}) ⇒ nil

Unbind a queue from an exchange

Parameters:

  • queue (String)

    Name of the queue to unbind

  • exchange (String)

    Name of the exchange to unbind from

  • binding_key (String) (defaults to: "")

    Binding key which the queue is bound to the exchange with

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (nil)


357
358
359
360
361
# File 'lib/amqp/client.rb', line 357

def unbind(queue:, exchange:, binding_key: "", arguments: {})
  with_connection do |conn|
    conn.channel(1).queue_unbind(queue, exchange:, binding_key:, arguments:)
  end
end

#wait_for_confirmsBoolean

Wait for unconfirmed publishes

Returns:

  • (Boolean)

    True if successful, false if any message negatively acknowledged



289
290
291
292
293
# File 'lib/amqp/client.rb', line 289

def wait_for_confirms
  with_connection do |conn|
    conn.channel(1).wait_for_confirms
  end
end

#with_connectionObject



540
541
542
543
544
545
546
547
548
549
550
551
552
553
# File 'lib/amqp/client.rb', line 540

def with_connection
  conn = nil
  loop do
    conn = @connq.pop
    next if conn.closed?

    break
  end
  begin
    yield conn
  ensure
    @connq << conn unless conn.closed?
  end
end