Class: AMQP::Client::Connection::Channel
- Inherits:
-
Object
- Object
- AMQP::Client::Connection::Channel
- Defined in:
- lib/amqp/client/channel.rb
Overview
AMQP Channel
Defined Under Namespace
Classes: QueueOk
Instance Attribute Summary collapse
-
#id ⇒ Integer
readonly
Channel ID.
Exchange collapse
-
#exchange_bind(destination, source, binding_key, arguments: {}) ⇒ nil
Bind an exchange to another exchange.
-
#exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil
Declare an exchange.
-
#exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil
Delete an exchange.
-
#exchange_unbind(destination, source, binding_key, arguments: {}) ⇒ nil
Unbind an exchange from another exchange.
Queue collapse
-
#queue_bind(name, exchange, binding_key, arguments: {}) ⇒ nil
Bind a queue to an exchange.
-
#queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk
Create a queue (operation is idempotent).
-
#queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?
Delete a queue.
-
#queue_purge(name, no_wait: false) ⇒ Integer?
Purge a queue.
-
#queue_unbind(name, exchange, binding_key, arguments: {}) ⇒ nil
Unbind a queue from an exchange.
Basic collapse
-
#basic_ack(delivery_tag, multiple: false) ⇒ nil
Acknowledge a message.
-
#basic_cancel(consumer_tag, no_wait: false) ⇒ nil
Cancel/abort/stop a consumer.
-
#basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1) {|Message| ... } ⇒ Array<(String, Array<Thread>)>?
Consume messages from a queue.
-
#basic_get(queue_name, no_ack: true) ⇒ Message?
Get a message from a queue (by polling).
-
#basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil
Negatively acknowledge a message.
-
#basic_publish(body, exchange, routing_key, **properties) ⇒ nil
Publishes a message to an exchange.
-
#basic_publish_confirm(body, exchange, routing_key, **properties) ⇒ Boolean
Publish a message and block until the message has confirmed it has received it.
-
#basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil
Specify how many messages to prefetch for consumers with ‘no_ack: false`.
-
#basic_recover(requeue: false) ⇒ nil
Recover all the unacknowledge messages.
-
#basic_reject(delivery_tag, requeue: false) ⇒ nil
Reject a message.
Confirm collapse
-
#confirm_select(no_wait: false) ⇒ nil
Put the channel in confirm mode, each published message will then be confirmed by the broker.
-
#wait_for_confirms ⇒ Boolean
Block until all publishes messages are confirmed.
Transaction collapse
-
#tx_commit ⇒ nil
Commmit a transaction, requires that the channel is in transaction mode.
-
#tx_rollback ⇒ nil
Rollback a transaction, requires that the channel is in transaction mode.
-
#tx_select ⇒ nil
Put the channel in transaction mode, make sure that you #tx_commit or #tx_rollback after publish.
Instance Method Summary collapse
-
#close(reason: "", code: 200) ⇒ nil
Gracefully close a connection.
-
#on_return {|ReturnMessage| ... } ⇒ Object
Handle returned messages in this block.
Instance Attribute Details
#id ⇒ Integer (readonly)
Channel ID
39 40 41 |
# File 'lib/amqp/client/channel.rb', line 39 def id @id end |
Instance Method Details
#basic_ack(delivery_tag, multiple: false) ⇒ nil
Acknowledge a message
358 359 360 361 |
# File 'lib/amqp/client/channel.rb', line 358 def basic_ack(delivery_tag, multiple: false) write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple) nil end |
#basic_cancel(consumer_tag, no_wait: false) ⇒ nil
Cancel/abort/stop a consumer
334 335 336 337 338 339 340 341 342 |
# File 'lib/amqp/client/channel.rb', line 334 def basic_cancel(consumer_tag, no_wait: false) consumer = @consumers.fetch(consumer_tag) return if consumer.closed? write_bytes FrameBytes.basic_cancel(@id, consumer_tag) expect(:basic_cancel_ok) unless no_wait consumer.close nil end |
#basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1) {|Message| ... } ⇒ Array<(String, Array<Thread>)>?
Consume messages from a queue
315 316 317 318 319 320 321 322 323 324 325 326 327 328 |
# File 'lib/amqp/client/channel.rb', line 315 def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1, &blk) write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments) tag, = expect(:basic_consume_ok) @consumers[tag] = q = ::Queue.new if worker_threads.zero? consume_loop(q, tag, &blk) nil else threads = Array.new(worker_threads) do Thread.new { consume_loop(q, tag, &blk) } end [tag, threads] end end |
#basic_get(queue_name, no_ack: true) ⇒ Message?
Get a message from a queue (by polling)
232 233 234 235 236 237 238 239 |
# File 'lib/amqp/client/channel.rb', line 232 def basic_get(queue_name, no_ack: true) write_bytes FrameBytes.basic_get(@id, queue_name, no_ack) case (msg = @basic_gets.pop) when Message then msg when :basic_get_empty then nil when nil then raise Error::Closed.new(@id, *@closed) end end |
#basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil
Negatively acknowledge a message
368 369 370 371 |
# File 'lib/amqp/client/channel.rb', line 368 def basic_nack(delivery_tag, multiple: false, requeue: false) write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue) nil end |
#basic_publish(body, exchange, routing_key, **properties) ⇒ nil
Publishes a message to an exchange
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/amqp/client/channel.rb', line 262 def basic_publish(body, exchange, routing_key, **properties) body_max = @connection.frame_max - 8 id = @id mandatory = properties.delete(:mandatory) || false case properties.delete(:persistent) when true then properties[:delivery_mode] = 2 when false then properties[:delivery_mode] = 1 end if body.bytesize.between?(1, body_max) write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties), FrameBytes.body(id, body) @unconfirmed.push @confirm += 1 if @confirm return end write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties) pos = 0 while pos < body.bytesize # split body into multiple frame_max frames len = [body_max, body.bytesize - pos].min body_part = body.byteslice(pos, len) write_bytes FrameBytes.body(id, body_part) pos += len end @unconfirmed.push @confirm += 1 if @confirm nil end |
#basic_publish_confirm(body, exchange, routing_key, **properties) ⇒ Boolean
Publish a message and block until the message has confirmed it has received it
297 298 299 300 301 |
# File 'lib/amqp/client/channel.rb', line 297 def basic_publish_confirm(body, exchange, routing_key, **properties) confirm_select(no_wait: true) basic_publish(body, exchange, routing_key, **properties) wait_for_confirms end |
#basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil
Specify how many messages to prefetch for consumers with ‘no_ack: false`
349 350 351 352 353 |
# File 'lib/amqp/client/channel.rb', line 349 def basic_qos(prefetch_count, prefetch_size: 0, global: false) write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global) expect :basic_qos_ok nil end |
#basic_recover(requeue: false) ⇒ nil
Recover all the unacknowledge messages
386 387 388 389 390 |
# File 'lib/amqp/client/channel.rb', line 386 def basic_recover(requeue: false) write_bytes FrameBytes.basic_recover(@id, requeue: requeue) expect :basic_recover_ok nil end |
#basic_reject(delivery_tag, requeue: false) ⇒ nil
Reject a message
377 378 379 380 |
# File 'lib/amqp/client/channel.rb', line 377 def basic_reject(delivery_tag, requeue: false) write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue) nil end |
#close(reason: "", code: 200) ⇒ nil
Gracefully close a connection
55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/amqp/client/channel.rb', line 55 def close(reason: "", code: 200) return if @closed write_bytes FrameBytes.channel_close(@id, reason, code) @closed = [:channel, code, reason] expect :channel_close_ok @replies.close @basic_gets.close @unconfirmed_empty.close @consumers.each_value(&:close) nil end |
#confirm_select(no_wait: false) ⇒ nil
Put the channel in confirm mode, each published message will then be confirmed by the broker
398 399 400 401 402 403 404 405 |
# File 'lib/amqp/client/channel.rb', line 398 def confirm_select(no_wait: false) return if @confirm write_bytes FrameBytes.confirm_select(@id, no_wait) expect :confirm_select_ok unless no_wait @confirm = 0 nil end |
#exchange_bind(destination, source, binding_key, arguments: {}) ⇒ nil
Bind an exchange to another exchange
125 126 127 128 129 |
# File 'lib/amqp/client/channel.rb', line 125 def exchange_bind(destination, source, binding_key, arguments: {}) write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments) expect :exchange_bind_ok nil end |
#exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil
Declare an exchange
102 103 104 105 106 |
# File 'lib/amqp/client/channel.rb', line 102 def exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments) expect :exchange_declare_ok nil end |
#exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil
Delete an exchange
113 114 115 116 117 |
# File 'lib/amqp/client/channel.rb', line 113 def exchange_delete(name, if_unused: false, no_wait: false) write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait) expect :exchange_delete_ok unless no_wait nil end |
#exchange_unbind(destination, source, binding_key, arguments: {}) ⇒ nil
Unbind an exchange from another exchange
137 138 139 140 141 |
# File 'lib/amqp/client/channel.rb', line 137 def exchange_unbind(destination, source, binding_key, arguments: {}) write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments) expect :exchange_unbind_ok nil end |
#on_return {|ReturnMessage| ... } ⇒ Object
Handle returned messages in this block. If not set the message will just be logged to STDERR
85 86 87 88 |
# File 'lib/amqp/client/channel.rb', line 85 def on_return(&block) @on_return = block nil end |
#queue_bind(name, exchange, binding_key, arguments: {}) ⇒ nil
Bind a queue to an exchange
195 196 197 198 199 |
# File 'lib/amqp/client/channel.rb', line 195 def queue_bind(name, exchange, binding_key, arguments: {}) write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments) expect :queue_bind_ok nil end |
#queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk
Create a queue (operation is idempotent)
165 166 167 168 169 170 171 172 173 174 |
# File 'lib/amqp/client/channel.rb', line 165 def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) durable = false if name.empty? exclusive = true if name.empty? auto_delete = true if name.empty? write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments) name, , consumer_count = expect(:queue_declare_ok) QueueOk.new(name, , consumer_count) end |
#queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?
Delete a queue
183 184 185 186 187 |
# File 'lib/amqp/client/channel.rb', line 183 def queue_delete(name, if_unused: false, if_empty: false, no_wait: false) write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait) , = expect :queue_delete unless no_wait end |
#queue_purge(name, no_wait: false) ⇒ Integer?
Purge a queue
206 207 208 209 210 |
# File 'lib/amqp/client/channel.rb', line 206 def queue_purge(name, no_wait: false) write_bytes FrameBytes.queue_purge(@id, name, no_wait) , = expect :queue_purge_ok unless no_wait end |
#queue_unbind(name, exchange, binding_key, arguments: {}) ⇒ nil
Unbind a queue from an exchange
218 219 220 221 222 |
# File 'lib/amqp/client/channel.rb', line 218 def queue_unbind(name, exchange, binding_key, arguments: {}) write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments) expect :queue_unbind_ok nil end |
#tx_commit ⇒ nil
Commmit a transaction, requires that the channel is in transaction mode
450 451 452 453 454 |
# File 'lib/amqp/client/channel.rb', line 450 def tx_commit write_bytes FrameBytes.tx_commit(@id) expect :tx_commit_ok nil end |
#tx_rollback ⇒ nil
Rollback a transaction, requires that the channel is in transaction mode
458 459 460 461 462 |
# File 'lib/amqp/client/channel.rb', line 458 def tx_rollback write_bytes FrameBytes.tx_rollback(@id) expect :tx_rollback_ok nil end |
#tx_select ⇒ nil
Put the channel in transaction mode, make sure that you #tx_commit or #tx_rollback after publish
442 443 444 445 446 |
# File 'lib/amqp/client/channel.rb', line 442 def tx_select write_bytes FrameBytes.tx_select(@id) expect :tx_select_ok nil end |