Class: AMQP::Client::Connection::Channel
- Inherits:
-
Object
- Object
- AMQP::Client::Connection::Channel
- Defined in:
- lib/amqp/client/channel.rb
Overview
AMQP Channel
Queue collapse
- QueueOk =
Response when declaring a Queue
Data.define(:queue_name, :message_count, :consumer_count)
Basic collapse
- ConsumeOk =
Response when subscribing (starting a consumer)
Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel)
Queue collapse
-
#consumer_count ⇒ Integer
Number of consumers subscribed to the queue at the time of declaration.
-
#message_count ⇒ Integer
Number of messages in the queue at the time of declaration.
-
#queue_name ⇒ String
The name of the queue.
Basic collapse
-
#channel_id ⇒ Integer
The channel ID.
-
#consumer_tag ⇒ String
The consumer tag.
-
#worker_threads ⇒ Array<Thread>
Array of worker threads.
Instance Attribute Summary collapse
-
#connection ⇒ Connection
readonly
Connection this channel belongs to.
-
#id ⇒ Integer
readonly
Channel ID.
Exchange collapse
-
#exchange_bind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
Bind an exchange to another exchange.
-
#exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil
Declare an exchange.
-
#exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil
Delete an exchange.
-
#exchange_unbind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
Unbind an exchange from another exchange.
Queue collapse
-
#queue_bind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
Bind a queue to an exchange.
-
#queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk
Create a queue (operation is idempotent).
-
#queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?
Delete a queue.
-
#queue_purge(name, no_wait: false) ⇒ Integer?
Purge a queue.
-
#queue_unbind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
Unbind a queue from an exchange.
Basic collapse
-
#basic_ack(delivery_tag, multiple: false) ⇒ nil
Acknowledge a message.
-
#basic_cancel(consumer_tag, no_wait: false) ⇒ nil
Cancel/abort/stop a consumer.
-
#basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false, arguments: {}, worker_threads: 1, on_cancel: nil) {|Message| ... } ⇒ ConsumeOk?
Consume messages from a queue.
-
#basic_consume_once(queue, timeout: nil) { ... } ⇒ Message
Consume a single message from a queue.
-
#basic_get(queue_name, no_ack: true) ⇒ Message?
Get a message from a queue (by polling).
-
#basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil
Negatively acknowledge a message.
-
#basic_publish(body, exchange:, routing_key: "", **properties) ⇒ nil
Publishes a message to an exchange.
-
#basic_publish_confirm(body, exchange:, routing_key: "", **properties) ⇒ Boolean
Publish a message and block until the message has confirmed it has received it.
-
#basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil
Specify how many messages to prefetch for consumers with ‘no_ack: false`.
-
#basic_recover(requeue: false) ⇒ nil
Recover all the unacknowledge messages.
-
#basic_reject(delivery_tag, requeue: false) ⇒ nil
Reject a message.
Confirm collapse
-
#confirm_select(no_wait: false) ⇒ nil
Put the channel in confirm mode, each published message will then be confirmed by the broker.
-
#wait_for_confirms ⇒ Boolean
Block until all publishes messages are confirmed.
Transaction collapse
-
#tx_commit ⇒ nil
Commmit a transaction, requires that the channel is in transaction mode.
-
#tx_rollback ⇒ nil
Rollback a transaction, requires that the channel is in transaction mode.
-
#tx_select ⇒ nil
Put the channel in transaction mode, make sure that you #tx_commit or #tx_rollback after publish.
Instance Method Summary collapse
-
#close(reason: "", code: 200) ⇒ nil
Gracefully close channel.
-
#on_return {|ReturnMessage| ... } ⇒ Object
Handle returned messages in this block.
Instance Attribute Details
#channel_id ⇒ Integer
Returns The channel ID.
322 |
# File 'lib/amqp/client/channel.rb', line 322 ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel) |
#connection ⇒ Connection (readonly)
Connection this channel belongs to
45 46 47 |
# File 'lib/amqp/client/channel.rb', line 45 def connection @connection end |
#consumer_count ⇒ Integer
Returns Number of consumers subscribed to the queue at the time of declaration.
163 |
# File 'lib/amqp/client/channel.rb', line 163 QueueOk = Data.define(:queue_name, :message_count, :consumer_count) |
#consumer_tag ⇒ String
Returns The consumer tag.
322 |
# File 'lib/amqp/client/channel.rb', line 322 ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel) |
#id ⇒ Integer (readonly)
Channel ID
41 42 43 |
# File 'lib/amqp/client/channel.rb', line 41 def id @id end |
#message_count ⇒ Integer
Returns Number of messages in the queue at the time of declaration.
163 |
# File 'lib/amqp/client/channel.rb', line 163 QueueOk = Data.define(:queue_name, :message_count, :consumer_count) |
#queue_name ⇒ String
Returns The name of the queue.
163 |
# File 'lib/amqp/client/channel.rb', line 163 QueueOk = Data.define(:queue_name, :message_count, :consumer_count) |
#worker_threads ⇒ Array<Thread>
Returns Array of worker threads.
322 |
# File 'lib/amqp/client/channel.rb', line 322 ConsumeOk = Data.define(:channel_id, :consumer_tag, :worker_threads, :msg_q, :on_cancel) |
Instance Method Details
#basic_ack(delivery_tag, multiple: false) ⇒ nil
Acknowledge a message
411 412 413 414 |
# File 'lib/amqp/client/channel.rb', line 411 def basic_ack(delivery_tag, multiple: false) write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple) nil end |
#basic_cancel(consumer_tag, no_wait: false) ⇒ nil
Cancel/abort/stop a consumer
385 386 387 388 389 390 391 392 393 394 |
# File 'lib/amqp/client/channel.rb', line 385 def basic_cancel(consumer_tag, no_wait: false) consumer = @consumers[consumer_tag] return unless consumer write_bytes FrameBytes.basic_cancel(@id, consumer_tag) expect(:basic_cancel_ok) unless no_wait @consumers.delete(consumer_tag) close_consumer(consumer) nil end |
#basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false, arguments: {}, worker_threads: 1, on_cancel: nil) {|Message| ... } ⇒ ConsumeOk?
Consume messages from a queue
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 |
# File 'lib/amqp/client/channel.rb', line 338 def basic_consume(queue, tag: "", no_ack: true, exclusive: false, no_wait: false, arguments: {}, worker_threads: 1, on_cancel: nil, &blk) raise ArgumentError, "consumer_tag required when no_wait" if no_wait && tag.empty? write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, no_wait, arguments) consumer_tag, = expect(:basic_consume_ok) unless no_wait msg_q = ::Queue.new if worker_threads.zero? @consumers[consumer_tag] = ConsumeOk.new(channel_id: @id, consumer_tag:, worker_threads: [], msg_q:, on_cancel:) consume_loop(msg_q, consumer_tag, &blk) nil else threads = Array.new(worker_threads) do Thread.new { consume_loop(msg_q, consumer_tag, &blk) } end @consumers[consumer_tag] = ConsumeOk.new(channel_id: @id, consumer_tag:, worker_threads: threads, msg_q:, on_cancel:) end end |
#basic_consume_once(queue, timeout: nil) { ... } ⇒ Message
Consume a single message from a queue
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 |
# File 'lib/amqp/client/channel.rb', line 365 def basic_consume_once(queue, timeout: nil, &) tag = "consume-once-#{rand(1024)}" write_bytes FrameBytes.basic_consume(@id, queue, tag, true, false, true, nil) msg_q = ::Queue.new @consumers[tag] = ConsumeOk.new(channel_id: @id, consumer_tag: tag, worker_threads: [], msg_q:, on_cancel: nil) yield if block_given? msg = msg_q.pop(timeout:) write_bytes FrameBytes.basic_cancel(@id, tag, no_wait: true) consumer = @consumers.delete(tag) close_consumer(consumer) raise Timeout::Error, "No message received in #{timeout} seconds" if timeout && msg.nil? msg end |
#basic_get(queue_name, no_ack: true) ⇒ Message?
Get a message from a queue (by polling)
242 243 244 245 246 247 248 249 |
# File 'lib/amqp/client/channel.rb', line 242 def basic_get(queue_name, no_ack: true) write_bytes FrameBytes.basic_get(@id, queue_name, no_ack) case (msg = @basic_gets.pop) when Message then msg when :basic_get_empty then nil when nil then raise Error::Closed.new(@id, *@closed) end end |
#basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil
Negatively acknowledge a message
421 422 423 424 |
# File 'lib/amqp/client/channel.rb', line 421 def basic_nack(delivery_tag, multiple: false, requeue: false) write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue) nil end |
#basic_publish(body, exchange:, routing_key: "", **properties) ⇒ nil
Publishes a message to an exchange
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/amqp/client/channel.rb', line 272 def basic_publish(body, exchange:, routing_key: "", **properties) body_max = @connection.frame_max - 8 id = @id mandatory = properties.delete(:mandatory) || false case properties.delete(:persistent) when true then properties[:delivery_mode] = 2 when false then properties[:delivery_mode] = 1 end if @confirm @unconfirmed_lock.synchronize do @unconfirmed.push @confirm += 1 end end if body.bytesize.between?(1, body_max) write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties), FrameBytes.body(id, body) return end write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory), FrameBytes.header(id, body.bytesize, properties) pos = 0 while pos < body.bytesize # split body into multiple frame_max frames len = [body_max, body.bytesize - pos].min body_part = body.byteslice(pos, len) write_bytes FrameBytes.body(id, body_part) pos += len end nil end |
#basic_publish_confirm(body, exchange:, routing_key: "", **properties) ⇒ Boolean
Publish a message and block until the message has confirmed it has received it
309 310 311 312 313 |
# File 'lib/amqp/client/channel.rb', line 309 def basic_publish_confirm(body, exchange:, routing_key: "", **properties) confirm_select(no_wait: true) basic_publish(body, exchange:, routing_key:, **properties) wait_for_confirms end |
#basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil
Specify how many messages to prefetch for consumers with ‘no_ack: false`
401 402 403 404 405 |
# File 'lib/amqp/client/channel.rb', line 401 def basic_qos(prefetch_count, prefetch_size: 0, global: false) write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global) expect :basic_qos_ok nil end |
#basic_recover(requeue: false) ⇒ nil
Recover all the unacknowledge messages
439 440 441 442 443 |
# File 'lib/amqp/client/channel.rb', line 439 def basic_recover(requeue: false) write_bytes FrameBytes.basic_recover(@id, requeue:) expect :basic_recover_ok nil end |
#basic_reject(delivery_tag, requeue: false) ⇒ nil
Reject a message
430 431 432 433 |
# File 'lib/amqp/client/channel.rb', line 430 def basic_reject(delivery_tag, requeue: false) write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue) nil end |
#close(reason: "", code: 200) ⇒ nil
Gracefully close channel
63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/amqp/client/channel.rb', line 63 def close(reason: "", code: 200) return if @closed write_bytes FrameBytes.channel_close(@id, reason, code) @closed = [:channel, code, reason] expect :channel_close_ok @replies.close @basic_gets.close @unconfirmed_lock.synchronize { @unconfirmed_empty.broadcast } @consumers.each_value { |c| close_consumer(c) } nil end |
#confirm_select(no_wait: false) ⇒ nil
Put the channel in confirm mode, each published message will then be confirmed by the broker
451 452 453 454 455 456 457 458 459 460 461 462 463 |
# File 'lib/amqp/client/channel.rb', line 451 def confirm_select(no_wait: false) return if @confirm # fast path @unconfirmed_lock.synchronize do # check again in case another thread already did this while we waited for the lock return if @confirm write_bytes FrameBytes.confirm_select(@id, no_wait) expect :confirm_select_ok unless no_wait @confirm = 0 end nil end |
#exchange_bind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
Bind an exchange to another exchange
135 136 137 138 139 |
# File 'lib/amqp/client/channel.rb', line 135 def exchange_bind(source:, destination:, binding_key:, arguments: {}) write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments) expect :exchange_bind_ok nil end |
#exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil
Declare an exchange
112 113 114 115 116 |
# File 'lib/amqp/client/channel.rb', line 112 def exchange_declare(name, type:, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments) expect :exchange_declare_ok nil end |
#exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil
Delete an exchange
123 124 125 126 127 |
# File 'lib/amqp/client/channel.rb', line 123 def exchange_delete(name, if_unused: false, no_wait: false) write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait) expect :exchange_delete_ok unless no_wait nil end |
#exchange_unbind(source:, destination:, binding_key:, arguments: {}) ⇒ nil
Unbind an exchange from another exchange
147 148 149 150 151 |
# File 'lib/amqp/client/channel.rb', line 147 def exchange_unbind(source:, destination:, binding_key:, arguments: {}) write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments) expect :exchange_unbind_ok nil end |
#on_return {|ReturnMessage| ... } ⇒ Object
Handle returned messages in this block. If not set the message will just be logged to STDERR
95 96 97 98 |
# File 'lib/amqp/client/channel.rb', line 95 def on_return(&block) @on_return = block nil end |
#queue_bind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
Bind a queue to an exchange
205 206 207 208 209 |
# File 'lib/amqp/client/channel.rb', line 205 def queue_bind(name, exchange:, binding_key: "", arguments: {}) write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments) expect :queue_bind_ok nil end |
#queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk
Create a queue (operation is idempotent)
175 176 177 178 179 180 181 182 183 184 |
# File 'lib/amqp/client/channel.rb', line 175 def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) durable = false if name.empty? exclusive = true if name.empty? auto_delete = true if name.empty? write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments) name, , consumer_count = expect(:queue_declare_ok) QueueOk.new(name, , consumer_count) end |
#queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?
Delete a queue
193 194 195 196 197 |
# File 'lib/amqp/client/channel.rb', line 193 def queue_delete(name, if_unused: false, if_empty: false, no_wait: false) write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait) , = expect :queue_delete unless no_wait end |
#queue_purge(name, no_wait: false) ⇒ Integer?
Purge a queue
216 217 218 219 220 |
# File 'lib/amqp/client/channel.rb', line 216 def queue_purge(name, no_wait: false) write_bytes FrameBytes.queue_purge(@id, name, no_wait) , = expect :queue_purge_ok unless no_wait end |
#queue_unbind(name, exchange:, binding_key: "", arguments: {}) ⇒ nil
Unbind a queue from an exchange
228 229 230 231 232 |
# File 'lib/amqp/client/channel.rb', line 228 def queue_unbind(name, exchange:, binding_key: "", arguments: {}) write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments) expect :queue_unbind_ok nil end |
#tx_commit ⇒ nil
Commmit a transaction, requires that the channel is in transaction mode
509 510 511 512 513 |
# File 'lib/amqp/client/channel.rb', line 509 def tx_commit write_bytes FrameBytes.tx_commit(@id) expect :tx_commit_ok nil end |
#tx_rollback ⇒ nil
Rollback a transaction, requires that the channel is in transaction mode
517 518 519 520 521 |
# File 'lib/amqp/client/channel.rb', line 517 def tx_rollback write_bytes FrameBytes.tx_rollback(@id) expect :tx_rollback_ok nil end |
#tx_select ⇒ nil
Put the channel in transaction mode, make sure that you #tx_commit or #tx_rollback after publish
501 502 503 504 505 |
# File 'lib/amqp/client/channel.rb', line 501 def tx_select write_bytes FrameBytes.tx_select(@id) expect :tx_select_ok nil end |
#wait_for_confirms ⇒ Boolean
Block until all publishes messages are confirmed
467 468 469 470 471 472 473 474 475 476 477 |
# File 'lib/amqp/client/channel.rb', line 467 def wait_for_confirms @unconfirmed_lock.synchronize do until @unconfirmed.empty? @unconfirmed_empty.wait(@unconfirmed_lock) raise Error::Closed.new(@id, *@closed) if @closed end result = !@nacked @nacked = false # Reset for next round of publishes result end end |