Class: AMQP::Client::Queue
- Inherits: Object
  - Object
  - AMQP::Client::Queue
- Defined in: lib/amqp/client/queue.rb
Overview
Queue abstraction
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
Instance Method Summary
- #bind(exchange, binding_key: "", arguments: {}) ⇒ self
  Bind the queue to an exchange.
- #delete ⇒ nil
  Delete the queue.
- #get(no_ack: false) ⇒ Message?
  Get a message from the queue.
- #publish(body, **properties) ⇒ Queue
  Publish to the queue, wait for confirm.
- #publish_and_forget(body, **properties) ⇒ Queue
  Publish to the queue, without waiting for confirm.
- #purge ⇒ self
  Purge/empty the queue.
- #subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer
  Subscribe/consume from the queue.
- #unbind(exchange, binding_key: "", arguments: {}) ⇒ self
  Unbind the queue from an exchange.
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/amqp/client/queue.rb', line 9
    def name
      @name
    end
Instance Method Details
#bind(exchange, binding_key: "", arguments: {}) ⇒ self
Bind the queue to an exchange
    # File 'lib/amqp/client/queue.rb', line 78
    def bind(exchange, binding_key: "", arguments: {})
      exchange = exchange.name unless exchange.is_a?(String)
      @client.bind(queue: @name, exchange:, binding_key:, arguments:)
      self
    end
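The listing above accepts either an exchange name or any object that responds to #name, and returns self so calls can be chained. The sketch below illustrates that coercion without a broker. `BindRecorder`, `ExchangeStub`, and `BindSketchQueue` are hypothetical stand-ins written for this example, not part of the library; only the body of `bind` mirrors the documented source.

```ruby
# Hypothetical stand-in for the real client; it only records bind calls.
class BindRecorder
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(queue:, exchange:, binding_key:, arguments:)
    @calls << { queue: queue, exchange: exchange, binding_key: binding_key, arguments: arguments }
  end
end

# Hypothetical exchange-like object; anything responding to #name would do.
ExchangeStub = Struct.new(:name)

# Trimmed copy of the documented method, for illustration only.
class BindSketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def bind(exchange, binding_key: "", arguments: {})
    # Non-String arguments are reduced to their name before reaching the client.
    exchange = exchange.name unless exchange.is_a?(String)
    @client.bind(queue: @name, exchange: exchange, binding_key: binding_key, arguments: arguments)
    self
  end
end

def bind_demo
  client = BindRecorder.new
  queue = BindSketchQueue.new(client, "orders")
  # First call passes a String, second passes an object; self is returned, so they chain.
  queue.bind("amq.topic", binding_key: "orders.#")
       .bind(ExchangeStub.new("amq.topic"), binding_key: "orders.#")
  client.calls
end

bind_demo.each { |call| p call }
```

Both recorded calls carry the same exchange name, which suggests callers can pass whichever form they already hold. The same coercion appears in #unbind.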
#delete ⇒ nil
Delete the queue
    # File 'lib/amqp/client/queue.rb', line 104
    def delete
      @client.delete_queue(@name)
      nil
    end
#get(no_ack: false) ⇒ Message?
Get a message from the queue
    # File 'lib/amqp/client/queue.rb', line 69
    def get(no_ack: false)
      @client.get(@name, no_ack:)
    end
#publish(body, **properties) ⇒ Queue
Publish to the queue, wait for confirm
    # File 'lib/amqp/client/queue.rb', line 24
    def publish(body, **properties)
      @client.publish(body, exchange: "", routing_key: @name, **properties)
      self
    end
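As the listing shows, publishing "to the queue" means publishing to the default exchange ("") with the queue name as routing key, and forwarding any extra keyword arguments as message properties. A minimal sketch of that delegation follows. `PublishRecorder` and `PublishSketchQueue` are hypothetical stand-ins for this example, and `content_type` is used only as a sample property name.

```ruby
# Hypothetical stand-in for the real client; it records what it was asked to publish.
class PublishRecorder
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body, exchange:, routing_key:, **properties)
    @published << { body: body, exchange: exchange, routing_key: routing_key, properties: properties }
  end
end

# Trimmed copy of the documented method, for illustration only.
class PublishSketchQueue
  def initialize(client, name)
    @client = client
    @name = name
  end

  def publish(body, **properties)
    # Default exchange plus routing key equal to the queue name.
    @client.publish(body, exchange: "", routing_key: @name, **properties)
    self
  end
end

def publish_demo
  client = PublishRecorder.new
  PublishSketchQueue.new(client, "orders")
                    .publish("first", content_type: "text/plain")
                    .publish("second")
  client.published
end

publish_demo.each { |entry| p entry }
```

In this sketch every recorded publish targets the empty exchange name with routing key "orders"; only the body and properties vary between calls.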
#publish_and_forget(body, **properties) ⇒ Queue
Publish to the queue, without waiting for confirm
    # File 'lib/amqp/client/queue.rb', line 34
    def publish_and_forget(body, **properties)
      @client.publish_and_forget(body, exchange: "", routing_key: @name, **properties)
      self
    end
#purge ⇒ self
Purge/empty the queue
    # File 'lib/amqp/client/queue.rb', line 97
    def purge
      @client.purge(@name)
      self
    end
#subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer
Subscribe/consume from the queue
    # File 'lib/amqp/client/queue.rb', line 55
    def subscribe(no_ack: false, exclusive: false, prefetch: 1, worker_threads: 1, requeue_on_reject: true,
                  on_cancel: nil, arguments: {})
      @client.subscribe(@name, no_ack:, exclusive:, prefetch:, worker_threads:, on_cancel:, arguments:) do |message|
        yield message
        message.ack unless no_ack
      rescue StandardError => e
        message.reject(requeue: requeue_on_reject) unless no_ack
        raise e
      end
    end
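The block wrapper in the listing above settles each message on the caller's behalf: it acks after the handler returns, rejects (optionally requeueing) if the handler raises, and does neither when no_ack is set. The sketch below isolates that logic so the outcomes can be inspected without a broker. `MessageStub`, `settle`, and `outcome_for` are hypothetical helpers written for this example; only the ack/rescue/reject shape is taken from the documented source.

```ruby
# Hypothetical message double that records how it was settled.
class MessageStub
  attr_reader :body, :outcome

  def initialize(body)
    @body = body
    @outcome = :unsettled
  end

  def ack
    @outcome = :acked
  end

  def reject(requeue:)
    @outcome = requeue ? :requeued : :rejected
  end
end

# Mirrors the block that the documented #subscribe passes to the client.
def settle(message, no_ack: false, requeue_on_reject: true)
  yield message
  message.ack unless no_ack
rescue StandardError => e
  message.reject(requeue: requeue_on_reject) unless no_ack
  raise e
end

# Runs one handler against one stub message and reports the resulting state.
def outcome_for(body, **options, &handler)
  message = MessageStub.new(body)
  begin
    settle(message, **options, &handler)
  rescue StandardError
    # The wrapper re-raises after rejecting; swallow it here to inspect the outcome.
  end
  message.outcome
end

puts outcome_for("ok") { |m| m.body.upcase }                               # acked
puts outcome_for("bad") { |_m| raise "boom" }                              # requeued
puts outcome_for("bad", requeue_on_reject: false) { |_m| raise "boom" }    # rejected
puts outcome_for("bad", no_ack: true) { |_m| raise "boom" }                # unsettled
```

One consequence worth noting: the handler's exception is re-raised after the reject, so how the consumer reacts to a failing handler depends on the client, which this sketch does not model.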
#unbind(exchange, binding_key: "", arguments: {}) ⇒ self
Unbind the queue from an exchange
    # File 'lib/amqp/client/queue.rb', line 89
    def unbind(exchange, binding_key: "", arguments: {})
      exchange = exchange.name unless exchange.is_a?(String)
      @client.unbind(queue: @name, exchange:, binding_key:, arguments:)
      self
    end