Class: AMQP::Client::Queue

Inherits:
Object
Defined in:
lib/amqp/client/queue.rb

Overview

Queue abstraction

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/amqp/client/queue.rb', line 7

def name
  @name
end

Instance Method Details

#bind(exchange, binding_key, arguments: {}) ⇒ self

Bind the queue to an exchange

Parameters:

  • exchange (String, Exchange)

    Name of the exchange to bind to, or the exchange object itself

  • binding_key (String)

    Binding key; messages whose routing key matches it may be routed to the queue (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on (only relevant for headers exchanges)

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 54

def bind(exchange, binding_key, arguments: {})
  exchange = exchange.name unless exchange.is_a?(String)
  @client.bind(@name, exchange, binding_key, arguments:)
  self
end
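
A minimal sketch of how the bind call above behaves, assuming stand-in objects rather than a live broker. RecordingClient, ExchangeStub and SketchQueue are hypothetical names introduced only for this illustration; SketchQueue copies the #bind listing above, and the stand-in client just records what it receives.

```ruby
# Hypothetical stand-in for the client: it records the arguments that
# Queue#bind forwards instead of talking to a broker.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(queue, exchange, binding_key, arguments: {})
    @calls << [:bind, queue, exchange, binding_key, arguments]
  end
end

# Hypothetical stand-in for an exchange object; only #name is needed.
ExchangeStub = Struct.new(:name)

# Trimmed copy of the #bind listing above, so the sketch runs on its own.
class SketchQueue
  attr_reader :name

  def initialize(client, name)
    @client = client
    @name = name
  end

  def bind(exchange, binding_key, arguments: {})
    exchange = exchange.name unless exchange.is_a?(String)
    @client.bind(@name, exchange, binding_key, arguments: arguments)
    self
  end
end

client = RecordingClient.new
queue = SketchQueue.new(client, "orders")

# Because #bind returns self, calls can be chained. The first call passes an
# exchange name, the second an exchange-like object plus header arguments.
result = queue
  .bind("amq.topic", "orders.#")
  .bind(ExchangeStub.new("amq.headers"), "", arguments: { "x-match" => "all", "type" => "order" })
```

In both calls the client receives the exchange name as a String, which is why either form is accepted.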

#delete ⇒ nil

Delete the queue

Returns:

  • (nil)


# File 'lib/amqp/client/queue.rb', line 80

def delete
  @client.delete_queue(@name)
  nil
end

#publish(body, **properties) ⇒ self

Publish to the queue through the default exchange (the queue name is used as the routing key) and wait for the publish confirm

Parameters:

  • body (String)

    The message body; can be a string or a byte array

  • properties (Properties)

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for a persistent message; any other value makes the message transient

  • priority (Integer)

    Priority of the message (between 0 and 255)

  • correlation_id (String)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to which RPC responses should be sent

  • expiration (Integer, String)

    How long the message may stay in the queue before it expires (RabbitMQ interprets the value as milliseconds)

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 21

def publish(body, **properties)
  @client.publish(body, "", @name, **properties)
  self
end
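
The listing above forwards to the client with an empty exchange name and the queue name as the routing key. The following sketch illustrates that, assuming a stand-in client instead of a broker connection. RecordingPublisher and PublishSketchQueue are hypothetical names used only here; publish confirms are not modelled.

```ruby
# Hypothetical stand-in for the client: it stores every publish call so the
# forwarded exchange, routing key and properties can be inspected.
class RecordingPublisher
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body, exchange, routing_key, **properties)
    @published << { body: body, exchange: exchange, routing_key: routing_key, properties: properties }
  end
end

# Trimmed copy of the #publish listing above.
class PublishSketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def publish(body, **properties)
    @client.publish(body, "", @name, **properties)
    self
  end
end

publisher = RecordingPublisher.new
orders = PublishSketchQueue.new(publisher, "orders")

# #publish returns self, so several messages can be sent in one chain.
orders
  .publish('{"id": 1}', content_type: "application/json", persistent: true)
  .publish("ping", headers: { "attempt" => 1 })

first_message = publisher.published.first
```

Here first_message[:exchange] is the empty string and first_message[:routing_key] is "orders", while the keyword arguments arrive unchanged in first_message[:properties].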

#purge ⇒ self

Purge/empty the queue

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 73

def purge
  @client.purge(@name)
  self
end

#subscribe(no_ack: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, arguments: {}) {|Message| ... } ⇒ self

Subscribe/consume from the queue

Parameters:

  • no_ack (Boolean) (defaults to: false)

    If true, the server considers messages acknowledged as soon as they are delivered. If false, each message is acknowledged after the block completes successfully; if the block raises an exception, the message is rejected and requeued only when requeue_on_reject is true.

  • prefetch (Integer) (defaults to: 1)

    Number of messages to prefetch for consumers when no_ack is false

  • worker_threads (Integer) (defaults to: 1)

    Number of threads processing messages; 0 means that the thread calling this method will be blocked

  • requeue_on_reject (Boolean) (defaults to: true)

    If true, messages rejected because the block raised an exception are requeued. Only relevant when no_ack is false.

  • arguments (Hash) (defaults to: {})

    Custom arguments to the consumer

Yields:

  • (Message)

    Delivered message from the queue

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 38

def subscribe(no_ack: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, arguments: {})
  @client.subscribe(@name, no_ack:, prefetch:, worker_threads:, arguments:) do |message|
    yield message
    message.ack unless no_ack
  rescue StandardError => e
    message.reject(requeue: requeue_on_reject) unless no_ack
    raise e
  end
  self
end
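
The ack/reject flow in the listing above can be traced with a small sketch that assumes in-memory stand-ins rather than a real connection. MessageStub, InlineClient and SubscribeSketchQueue are hypothetical names for this illustration. InlineClient delivers messages synchronously and swallows the re-raised exception so the trace can continue; how the real client handles that exception is not shown in this page and may differ.

```ruby
# Hypothetical message stand-in that remembers how it was settled.
class MessageStub
  attr_reader :body, :outcome

  def initialize(body)
    @body = body
    @outcome = nil
  end

  def ack
    @outcome = :acked
  end

  def reject(requeue: false)
    @outcome = requeue ? :requeued : :rejected
  end
end

# Hypothetical client stand-in: yields each message on the calling thread.
class InlineClient
  def initialize(messages)
    @messages = messages
  end

  def subscribe(_queue, no_ack:, prefetch:, worker_threads:, arguments:)
    @messages.each do |message|
      begin
        yield message
      rescue StandardError
        # Assumption for this sketch only: ignore the error and keep consuming.
      end
    end
  end
end

# Trimmed copy of the #subscribe listing above.
class SubscribeSketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def subscribe(no_ack: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, arguments: {})
    @client.subscribe(@name, no_ack: no_ack, prefetch: prefetch,
                             worker_threads: worker_threads, arguments: arguments) do |message|
      yield message
      message.ack unless no_ack
    rescue StandardError => e
      message.reject(requeue: requeue_on_reject) unless no_ack
      raise e
    end
    self
  end
end

messages = [MessageStub.new("ok"), MessageStub.new("boom")]
consumer_queue = SubscribeSketchQueue.new(InlineClient.new(messages), "orders")

# The block raises for the second message, so that message is rejected
# without requeueing because requeue_on_reject is false.
consumer_queue.subscribe(requeue_on_reject: false) do |message|
  raise ArgumentError, "cannot process" if message.body == "boom"
end

outcomes = messages.map(&:outcome)
```

With the default requeue_on_reject: true, the second message would be settled as :requeued in this sketch instead of :rejected.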

#unbind(exchange, binding_key, arguments: {}) ⇒ self

Unbind the queue from an exchange

Parameters:

  • exchange (String, Exchange)

    Name of the exchange to unbind from, or the exchange object itself

  • binding_key (String)

    Binding key with which the queue is bound to the exchange

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 65

def unbind(exchange, binding_key, arguments: {})
  exchange = exchange.name unless exchange.is_a?(String)
  @client.unbind(@name, exchange, binding_key, arguments:)
  self
end