Class: AMQP::Client::Queue

Inherits:
Object
Defined in:
lib/amqp/client/queue.rb

Overview

Queue abstraction

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/amqp/client/queue.rb', line 9

def name
  @name
end

Instance Method Details

#bind(exchange, binding_key: "", arguments: {}) ⇒ self

Bind the queue to an exchange

Parameters:

  • exchange (String, Exchange)

    Name of the exchange to bind to, or the exchange object itself

  • binding_key (String) (defaults to: "")

    Binding key; messages whose routing key matches it may be routed to the queue (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on (only relevant for headers exchanges)

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 78

def bind(exchange, binding_key: "", arguments: {})
  exchange = exchange.name unless exchange.is_a?(String)
  @client.bind(queue: @name, exchange:, binding_key:, arguments:)
  self
end
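
Because `bind` normalises its first argument, an exchange name and an exchange object are interchangeable. The following is a minimal sketch of that behaviour, not the library's own code: `BindRecordingClient`, `ExchangeStub` and `BindableQueue` are hypothetical stand-ins so the example runs without a broker, and only the body of `bind` mirrors the source above.

```ruby
# Hypothetical stand-in for the real client: it only records bind calls.
class BindRecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(queue:, exchange:, binding_key:, arguments:)
    @calls << { queue: queue, exchange: exchange, binding_key: binding_key, arguments: arguments }
  end
end

# Hypothetical exchange object; all bind needs is that it responds to #name.
ExchangeStub = Struct.new(:name)

# Hypothetical queue whose bind body mirrors the documented source.
class BindableQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def bind(exchange, binding_key: "", arguments: {})
    # Anything that is not a String is asked for its name.
    exchange = exchange.name unless exchange.is_a?(String)
    @client.bind(queue: @name, exchange: exchange, binding_key: binding_key, arguments: arguments)
    self
  end
end

client = BindRecordingClient.new
queue = BindableQueue.new(client, "orders")

# bind returns self, so calls can be chained.
queue.bind("amq.topic", binding_key: "order.*")
     .bind(ExchangeStub.new("amq.headers"), arguments: { "x-match" => "all", "type" => "invoice" })

client.calls.each { |call| puts call.inspect }
```

Both recorded calls carry a plain exchange name, which is what the underlying client receives in either case.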

#delete ⇒ nil

Delete the queue

Returns:

  • (nil)


# File 'lib/amqp/client/queue.rb', line 104

def delete
  @client.delete_queue(@name)
  nil
end

#get(no_ack: false) ⇒ Message?

Get a message from the queue

Parameters:

  • no_ack (Boolean) (defaults to: false)

    When false, the message has to be manually acknowledged (or rejected)

Returns:

  • (Message, nil)

    The message from the queue or nil if the queue is empty



# File 'lib/amqp/client/queue.rb', line 69

def get(no_ack: false)
  @client.get(@name, no_ack:)
end
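
Since `get` returns `nil` for an empty queue, it lends itself to a simple polling loop. The sketch below illustrates that pattern under stated assumptions: `MessageStub`, `InMemoryClient`, `PollableQueue` and the `drain` helper are hypothetical and exist only so the example runs without a broker; the `ack` call on the message is the same one used in the `subscribe` source on this page.

```ruby
# Hypothetical message: records whether it has been acknowledged.
class MessageStub
  attr_reader :body

  def initialize(body)
    @body = body
    @acked = false
  end

  def ack
    @acked = true
  end

  def acked?
    @acked
  end
end

# Hypothetical client that keeps messages in memory.
class InMemoryClient
  def initialize(bodies)
    @messages = bodies.map { |body| MessageStub.new(body) }
  end

  # Returns the next message, or nil when nothing is left.
  def get(_queue_name, no_ack:)
    @messages.shift
  end
end

# Hypothetical queue whose get body mirrors the documented source.
class PollableQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def get(no_ack: false)
    @client.get(@name, no_ack: no_ack)
  end
end

# Hypothetical helper: fetch until the queue is empty, acking each message
# manually because no_ack is left at its default of false.
def drain(queue)
  handled = []
  while (message = queue.get)
    handled << message
    message.ack
  end
  handled
end

queue = PollableQueue.new(InMemoryClient.new(%w[first second]), "jobs")
drain(queue).each { |message| puts "#{message.body} acked=#{message.acked?}" }
```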

#publish(body, **properties) ⇒ Queue

Publish to the queue and wait for the confirm

Parameters:

  • body (Object)

    The message body will be encoded if any matching codec is found in the client’s codec registry

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if it can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for a persistent message; any other value makes the message transient

  • priority (Integer)

    Priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Returns:

  • (Queue)

    self

Raises:



# File 'lib/amqp/client/queue.rb', line 24

def publish(body, **properties)
  @client.publish(body, exchange: "", routing_key: @name, **properties)
  self
end
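
The source above shows that publishing to a queue is a publish to the default exchange (`""`) with the queue name as routing key, with all properties forwarded unchanged. A minimal sketch of that delegation follows; `PublishRecordingClient` and `PublishableQueue` are hypothetical stand-ins so the example runs without a broker, and the recording client does not model confirms.

```ruby
# Hypothetical stand-in for the real client: it only records what was published.
class PublishRecordingClient
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body, exchange:, routing_key:, **properties)
    @published << { body: body, exchange: exchange, routing_key: routing_key, properties: properties }
  end
end

# Hypothetical queue whose publish body mirrors the documented source.
class PublishableQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def publish(body, **properties)
    # Default exchange plus queue name as routing key targets this queue.
    @client.publish(body, exchange: "", routing_key: @name, **properties)
    self
  end
end

client = PublishRecordingClient.new
queue = PublishableQueue.new(client, "emails")

# publish returns the queue, so several publishes can be chained.
queue.publish("welcome", persistent: true, content_type: "text/plain")
     .publish("reminder", headers: { "attempt" => 1 })

client.published.each { |entry| puts entry.inspect }
```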

#publish_and_forget(body, **properties) ⇒ Queue

Publish to the queue without waiting for the confirm

Parameters:

  • body (Object)

    The message body will be encoded if any matching codec is found in the client’s codec registry

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if it can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for a persistent message; any other value makes the message transient

  • priority (Integer)

    Priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Returns:

  • (Queue)

    self

Raises:



# File 'lib/amqp/client/queue.rb', line 34

def publish_and_forget(body, **properties)
  @client.publish_and_forget(body, exchange: "", routing_key: @name, **properties)
  self
end

#purge ⇒ self

Purge/empty the queue

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 97

def purge
  @client.purge(@name)
  self
end

#subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer

Subscribe/consume from the queue

Parameters:

  • no_ack (Boolean) (defaults to: false)

    If true, messages are automatically acknowledged by the server upon delivery. If false, messages are acknowledged only after the block completes successfully; if the block raises an exception, the message is rejected and can optionally be requeued. You can also handle the ack/reject in the block yourself.

  • exclusive (Boolean) (defaults to: false)

    When true only a single consumer can consume from the queue at a time

  • prefetch (Integer) (defaults to: 1)

    Number of messages to prefetch for consumers when no_ack is false

  • worker_threads (Integer) (defaults to: 1)

    Number of threads processing messages, 0 means that the thread calling this method will be blocked

  • requeue_on_reject (Boolean) (defaults to: true)

    If true, messages that are rejected due to an exception in the block will be requeued. Only relevant if no_ack is false.

  • on_cancel (Proc) (defaults to: nil)

    Optional proc that will be called if the consumer is cancelled by the broker. The proc will be called with the consumer tag as the only argument.

  • arguments (Hash) (defaults to: {})

    Custom arguments to the consumer

Yields:

  • (Message)

    Delivered message from the queue

Returns:

  • (Consumer)

    The consumer object, which can be used to cancel the consumer



# File 'lib/amqp/client/queue.rb', line 55

def subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true,
              on_cancel: nil, arguments: {})
  @client.subscribe(@name, no_ack:, exclusive:, prefetch:, worker_threads:, on_cancel:, arguments:) do |message|
    yield message
    message.ack unless no_ack
  rescue StandardError => e
    message.reject(requeue: requeue_on_reject) unless no_ack
    raise e
  end
end
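
The wrapper above acknowledges a message once the block returns and rejects it when the block raises, with `requeue_on_reject` deciding whether the rejected message is requeued. The sketch below reproduces just that logic under stated assumptions: `TrackedMessage`, `InlineClient` and `ConsumableQueue` are hypothetical stand-ins, delivery happens synchronously on the calling thread rather than on worker threads, and collecting the re-raised errors is this sketch's own choice, since the source shown here does not say what the real client does with them.

```ruby
# Hypothetical message that records how it was settled.
class TrackedMessage
  attr_reader :body, :outcome

  def initialize(body)
    @body = body
    @outcome = :pending
  end

  def ack
    @outcome = :acked
  end

  def reject(requeue:)
    @outcome = requeue ? :requeued : :rejected
  end
end

# Hypothetical client: delivers every message inline and collects errors
# re-raised by the queue's wrapper (an assumption of this sketch).
class InlineClient
  attr_reader :errors

  def initialize(messages)
    @messages = messages
    @errors = []
  end

  def subscribe(_queue_name, **_options)
    @messages.each do |message|
      begin
        yield message
      rescue StandardError => e
        @errors << e
      end
    end
    :consumer_stub
  end
end

# Hypothetical queue; the ack/reject wrapper mirrors the documented source.
class ConsumableQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def subscribe(no_ack: false, requeue_on_reject: true)
    @client.subscribe(@name, no_ack: no_ack) do |message|
      begin
        yield message
        message.ack unless no_ack
      rescue StandardError => e
        message.reject(requeue: requeue_on_reject) unless no_ack
        raise e
      end
    end
  end
end

messages = [TrackedMessage.new("ok"), TrackedMessage.new("boom")]
queue = ConsumableQueue.new(InlineClient.new(messages), "tasks")

queue.subscribe(requeue_on_reject: false) do |message|
  raise ArgumentError, "cannot process" if message.body == "boom"
end

messages.each { |message| puts "#{message.body}: #{message.outcome}" }
```

With `no_ack: true` the wrapper neither acks nor rejects, so in this sketch such messages stay `:pending`.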

#unbind(exchange, binding_key: "", arguments: {}) ⇒ self

Unbind the queue from an exchange

Parameters:

  • exchange (String, Exchange)

    Name of the exchange to unbind from, or the exchange object itself

  • binding_key (String) (defaults to: "")

    Binding key which the queue is bound to the exchange with

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 89

def unbind(exchange, binding_key: "", arguments: {})
  exchange = exchange.name unless exchange.is_a?(String)
  @client.unbind(queue: @name, exchange:, binding_key:, arguments:)
  self
end