Class: AMQP::Client::Queue

Inherits:
Object
  • Object
Defined in:
lib/amqp/client/queue.rb

Overview

Queue abstraction

Instance Method Details

#bind(exchange, binding_key, arguments: {}) ⇒ self

Bind the queue to an exchange

Parameters:

  • exchange (String)

    Name of the exchange to bind to

  • binding_key (String)

    Binding key; messages whose routing key matches it may be routed to the queue (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on (only relevant for headers exchanges)

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 42

def bind(exchange, binding_key, arguments: {})
  @client.bind(@name, exchange, binding_key, arguments: arguments)
  self
end
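
Because #bind returns self, several bindings can be chained on one queue. The sketch below illustrates this under stated assumptions: `RecordingClient` and `SketchQueue` are hypothetical stand-ins written for this example (the stand-in only records calls and never talks to a broker), and the exchange names and binding keys are illustrative.

```ruby
# Hypothetical stand-in for the real client: it only records the calls it receives.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(queue, exchange, binding_key, arguments: {})
    @calls << [:bind, queue, exchange, binding_key, arguments]
  end
end

# Hypothetical queue wrapper that mirrors the method body shown above.
class SketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def bind(exchange, binding_key, arguments: {})
    @client.bind(@name, exchange, binding_key, arguments: arguments)
    self
  end
end

client = RecordingClient.new
queue = SketchQueue.new(client, "orders")

# Chain two bindings; the second one passes header-matching arguments.
result = queue
  .bind("amq.topic", "orders.created")
  .bind("amq.headers", "", arguments: { "x-match" => "all", "type" => "order" })
```

Each call in the chain is forwarded to the client with the queue's own name as the first argument, and the chain evaluates to the queue itself.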

#delete ⇒ nil

Delete the queue

Returns:

  • (nil)


# File 'lib/amqp/client/queue.rb', line 66

def delete
  @client.delete_queue(@name)
  nil
end

#publish(body, **properties) ⇒ self

Publish a message to the queue and wait for the confirm. The message is sent to the default exchange ("") with the queue's name as the routing key, so this method takes neither an exchange nor a routing key.

Parameters:

  • body (String)

    The message body; can be a string or a byte array

  • properties (Properties)

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persistent messages; any other value means the message is transient

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (Integer)

    A correlation id, most often used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Time-to-live of the message in the queue (RabbitMQ interprets the value as milliseconds)

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicate which app generated the message

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 19

def publish(body, **properties)
  @client.publish(body, "", @name, **properties)
  self
end
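
The method body above always passes "" (the default exchange) and the queue's own name as the routing key. The following sketch makes that visible with a hypothetical `RecordingClient` stand-in that stores what it is asked to publish instead of contacting a broker; `SketchQueue`, the queue name, and the message bodies are likewise assumptions made for illustration.

```ruby
# Hypothetical stand-in for the real client: it stores publishes instead of sending them.
class RecordingClient
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body, exchange, routing_key, **properties)
    @published << { body: body, exchange: exchange, routing_key: routing_key, properties: properties }
  end
end

# Hypothetical queue wrapper that mirrors the method body shown above.
class SketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def publish(body, **properties)
    @client.publish(body, "", @name, **properties)
    self
  end
end

client = RecordingClient.new
queue = SketchQueue.new(client, "orders")

# Properties are forwarded unchanged; publish returns self, so calls can be chained.
queue
  .publish("first", persistent: true, content_type: "text/plain")
  .publish("second", headers: { "attempt" => 1 })

first = client.published.first
```

In this sketch `first[:exchange]` is the empty string and `first[:routing_key]` is the queue name, while the keyword arguments arrive untouched in `first[:properties]`.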

#purge ⇒ self

Purge/empty the queue

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 59

def purge
  @client.purge(@name)
  self
end
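
The different return values matter when chaining: #purge returns self, while #delete returns nil and therefore has to come last in a chain. A minimal sketch, assuming a hypothetical `RecordingClient` stand-in that only records calls and a `SketchQueue` wrapper that mirrors the two method bodies documented on this page:

```ruby
# Hypothetical stand-in for the real client: it only records the calls it receives.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def purge(queue)
    @calls << [:purge, queue]
  end

  def delete_queue(queue)
    @calls << [:delete_queue, queue]
  end
end

# Hypothetical queue wrapper that mirrors the #purge and #delete bodies.
class SketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def purge
    @client.purge(@name)
    self
  end

  def delete
    @client.delete_queue(@name)
    nil
  end
end

client = RecordingClient.new
queue = SketchQueue.new(client, "orders")

# purge returns the queue, so delete can follow; delete ends the chain with nil.
result = queue.purge.delete
```

Reversing the order (`queue.delete.purge`) would raise NoMethodError, since `nil` does not respond to `purge`.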

#subscribe(no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}) {|Message| ... } ⇒ self

Subscribe/consume from the queue

Parameters:

  • no_ack (Boolean) (defaults to: false)

    When false, messages have to be manually acknowledged (or rejected)

  • prefetch (Integer) (defaults to: 1)

    Number of messages to prefetch for consumers where no_ack is false

  • worker_threads (Integer) (defaults to: 1)

    Number of threads processing messages; 0 means that the thread calling this method will be blocked

  • arguments (Hash) (defaults to: {})

    Custom arguments to the consumer

Yields:

  • (Message)

    Delivered message from the queue

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 32

def subscribe(no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
  @client.subscribe(@name, no_ack: no_ack, prefetch: prefetch, worker_threads: worker_threads, arguments: arguments, &blk)
  self
end
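
With the default no_ack: false, the block is responsible for acknowledging each delivered message. The sketch below shows that shape only. It rests on several assumptions: `InlineClient` is a hypothetical stand-in that hands canned messages to the block synchronously (the real client delivers from worker threads), and `FakeMessage` with its `ack` method is a hypothetical substitute for the yielded Message object.

```ruby
# Hypothetical message object; only the parts this sketch needs.
FakeMessage = Struct.new(:body, :acked) do
  def ack
    self.acked = true
  end
end

# Hypothetical stand-in for the real client: yields canned messages inline,
# ignoring the consumer options it is given.
class InlineClient
  def initialize(messages)
    @messages = messages
  end

  def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
    @messages.each(&blk)
  end
end

# Hypothetical queue wrapper that mirrors the method body shown above.
class SketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def subscribe(no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
    @client.subscribe(@name, no_ack: no_ack, prefetch: prefetch,
                      worker_threads: worker_threads, arguments: arguments, &blk)
    self
  end
end

messages = [FakeMessage.new("a", false), FakeMessage.new("b", false)]
queue = SketchQueue.new(InlineClient.new(messages), "orders")

bodies = []
# Process each message, then acknowledge it manually because no_ack is false.
returned = queue.subscribe(prefetch: 10) do |msg|
  bodies << msg.body
  msg.ack
end
```

Acknowledging only after the work is done means an exception raised mid-processing leaves the message unacknowledged, which is usually the reason to keep no_ack at its default.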

#unbind(exchange, binding_key, arguments: {}) ⇒ self

Unbind the queue from an exchange

Parameters:

  • exchange (String)

    Name of the exchange to unbind from

  • binding_key (String)

    Binding key with which the queue is bound to the exchange

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (self)


# File 'lib/amqp/client/queue.rb', line 52

def unbind(exchange, binding_key, arguments: {})
  @client.unbind(@name, exchange, binding_key, arguments: arguments)
  self
end