Class: AMQP::Client::Queue
- Inherits: Object
- Inheritance hierarchy: Object → AMQP::Client::Queue
- Defined in:
- lib/amqp/client/queue.rb
Overview
Queue abstraction
Instance Attribute Summary
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary
-
#bind(exchange, binding_key, arguments: {}) ⇒ self
Bind the queue to an exchange.
-
#delete ⇒ nil
Delete the queue.
-
#publish(body, **properties) ⇒ self
Publish to the queue, wait for confirm.
-
#purge ⇒ self
Purge/empty the queue.
-
#subscribe(no_ack: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, arguments: {}) {|Message| ... } ⇒ self
Subscribe/consume from the queue.
-
#unbind(exchange, binding_key, arguments: {}) ⇒ self
Unbind the queue from an exchange.
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
7 8 9 |
# File 'lib/amqp/client/queue.rb', line 7
# Returns the value of attribute name.
# @return [Object] whatever is currently stored in @name
def name
  @name
end
Instance Method Details
#bind(exchange, binding_key, arguments: {}) ⇒ self
Bind the queue to an exchange
54 55 56 57 58 |
# File 'lib/amqp/client/queue.rb', line 54
# Bind the queue to an exchange.
# @param exchange [String, #name] exchange name, or any non-String object
#   whose #name is used in its place
# @param binding_key [String] forwarded unchanged to the client
# @param arguments [Hash] forwarded unchanged to the client
# @return [self]
def bind(exchange, binding_key, arguments: {})
  # Anything that is not already a String is asked for its name.
  exchange = exchange.name unless exchange.is_a?(String)
  @client.bind(@name, exchange, binding_key, arguments:)
  self
end
#delete ⇒ nil
Delete the queue
80 81 82 83 |
# File 'lib/amqp/client/queue.rb', line 80
# Delete the queue.
# @return [nil] always nil; the client's own return value is discarded
def delete
  @client.delete_queue(@name)
  nil
end
#publish(body, **properties) ⇒ self
Publish to the queue, wait for confirm
21 22 23 24 |
# File 'lib/amqp/client/queue.rb', line 21
# Publish to the queue, wait for confirm.
# @param body [Object] message payload, forwarded unchanged to the client
# @param properties [Hash] keyword properties forwarded unchanged to the client
# @return [self]
def publish(body, **properties)
  # NOTE(review): "" is presumably the default exchange, with the queue name
  # acting as the routing key - confirm against the client's #publish signature.
  @client.publish(body, "", @name, **properties)
  self
end
#purge ⇒ self
Purge/empty the queue
73 74 75 76 |
# File 'lib/amqp/client/queue.rb', line 73
# Purge/empty the queue.
# @return [self]
def purge
  @client.purge(@name)
  self
end
#subscribe(no_ack: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, arguments: {}) {|Message| ... } ⇒ self
Subscribe/consume from the queue
38 39 40 41 42 43 44 45 46 47 |
# File 'lib/amqp/client/queue.rb', line 38
# Subscribe/consume from the queue.
#
# Each delivered message is yielded to the caller's block. Unless no_ack is
# set, the message is acked after the block returns, or rejected if the block
# raises a StandardError; the error is re-raised either way.
#
# @param no_ack [Boolean] when true, messages are neither acked nor rejected here
# @param prefetch [Integer] forwarded unchanged to the client
# @param worker_threads [Integer] forwarded unchanged to the client
# @param requeue_on_reject [Boolean] passed as requeue: when rejecting a message
# @param arguments [Hash] forwarded unchanged to the client
# @yield [Message] each message handed over by the client
# @return [self]
def subscribe(no_ack: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, arguments: {})
  @client.subscribe(@name, no_ack:, prefetch:, worker_threads:, arguments:) do |message|
    yield message
    message.ack unless no_ack
  rescue StandardError
    # Reject before re-raising so the failure still surfaces to the client.
    message.reject(requeue: requeue_on_reject) unless no_ack
    raise
  end
  self
end
#unbind(exchange, binding_key, arguments: {}) ⇒ self
Unbind the queue from an exchange
65 66 67 68 69 |
# File 'lib/amqp/client/queue.rb', line 65
# Unbind the queue from an exchange.
# @param exchange [String, #name] exchange name, or any non-String object
#   whose #name is used in its place
# @param binding_key [String] forwarded unchanged to the client
# @param arguments [Hash] forwarded unchanged to the client
# @return [self]
def unbind(exchange, binding_key, arguments: {})
  # Anything that is not already a String is asked for its name.
  exchange = exchange.name unless exchange.is_a?(String)
  @client.unbind(@name, exchange, binding_key, arguments:)
  self
end