Class: AMQP::Client::Connection::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/amqp/client/channel.rb

Overview

AMQP Channel

Defined Under Namespace

Classes: QueueOk

Instance Attribute Summary collapse

Exchange collapse

Queue collapse

Basic collapse

Confirm collapse

Transaction collapse

Instance Method Summary collapse

Instance Attribute Details

#idInteger (readonly)

Channel ID

Returns:

  • (Integer)


39
40
41
# File 'lib/amqp/client/channel.rb', line 39

def id
  @id
end

Instance Method Details

#basic_ack(delivery_tag, multiple: false) ⇒ nil

Acknowledge a message

Parameters:

  • delivery_tag (Integer)

    The delivery tag of the message to acknowledge

Returns:

  • (nil)


362
363
364
365
# File 'lib/amqp/client/channel.rb', line 362

def basic_ack(delivery_tag, multiple: false)
  write_bytes FrameBytes.basic_ack(@id, delivery_tag, multiple)
  nil
end

#basic_cancel(consumer_tag, no_wait: false) ⇒ nil

Cancel/abort/stop a consumer

Parameters:

  • consumer_tag (String)

    Tag of the consumer to cancel

  • no_wait (Boolean) (defaults to: false)

    Will wait for a confirmation from the broker that the consumer is cancelled

Returns:

  • (nil)


338
339
340
341
342
343
344
345
346
# File 'lib/amqp/client/channel.rb', line 338

def basic_cancel(consumer_tag, no_wait: false)
  consumer = @consumers.fetch(consumer_tag)
  return if consumer.closed?

  write_bytes FrameBytes.basic_cancel(@id, consumer_tag)
  expect(:basic_cancel_ok) unless no_wait
  consumer.close
  nil
end

#basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1) {|Message| ... } ⇒ Array<(String, Array<Thread>)>?

Consume messages from a queue

Parameters:

  • queue (String)

    Name of the queue to subscribe to

  • tag (String) (defaults to: "")

    Custom consumer tag, will be auto assigned by the broker if empty. Has to be uniqe among this channel’s consumers only

  • no_ack (Boolean) (defaults to: true)

    When false messages have to be manually acknowledged (or rejected)

  • exclusive (Boolean) (defaults to: false)

    When true only a single consumer can consume from the queue at a time

  • arguments (Hash) (defaults to: {})

    Custom arguments for the consumer

  • worker_threads (Integer) (defaults to: 1)

    Number of threads processing messages, 0 means that the thread calling this method will process the messages and thus this method will block

Yields:

  • (Message)

    Delivered message from the queue

Returns:

  • (Array<(String, Array<Thread>)>)

    Returns consumer_tag and an array of worker threads

  • (nil)

    When ‘worker_threads` is 0 the method will return when the consumer is cancelled



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/amqp/client/channel.rb', line 313

def basic_consume(queue, tag: "", no_ack: true, exclusive: false, arguments: {}, worker_threads: 1)
  write_bytes FrameBytes.basic_consume(@id, queue, tag, no_ack, exclusive, arguments)
  tag, = expect(:basic_consume_ok)
  q = @consumers[tag] = ::Queue.new
  if worker_threads.zero?
    loop do
      yield (q.pop || break)
    end
    nil
  else
    threads = Array.new(worker_threads) do
      Thread.new do
        loop do
          yield (q.pop || break)
        end
      end
    end
    [tag, threads]
  end
end

#basic_get(queue_name, no_ack: true) ⇒ Message?

Get a message from a queue (by polling)

Parameters:

  • queue_name (String)
  • no_ack (Boolean) (defaults to: true)

    When false the message have to be manually acknowledged

Returns:

  • (Message)

    If the queue had a message

  • (nil)

    If the queue doesn’t have any messages



230
231
232
233
234
235
236
237
# File 'lib/amqp/client/channel.rb', line 230

def basic_get(queue_name, no_ack: true)
  write_bytes FrameBytes.basic_get(@id, queue_name, no_ack)
  case (msg = @basic_gets.pop)
  when Message then msg
  when :basic_get_empty then nil
  when nil              then raise Error::Closed.new(@id, *@closed)
  end
end

#basic_nack(delivery_tag, multiple: false, requeue: false) ⇒ nil

Negatively acknowledge a message

Parameters:

  • delivery_tag (Integer)

    The delivery tag of the message to acknowledge

  • multiple (Boolean) (defaults to: false)

    Nack all messages up to this message

  • requeue (Boolean) (defaults to: false)

    Requeue the message

Returns:

  • (nil)


372
373
374
375
# File 'lib/amqp/client/channel.rb', line 372

def basic_nack(delivery_tag, multiple: false, requeue: false)
  write_bytes FrameBytes.basic_nack(@id, delivery_tag, multiple, requeue)
  nil
end

#basic_publish(body, exchange, routing_key, **properties) ⇒ nil

Publishes a message to an exchange

Parameters:

  • body (String)

    The body, can be a string or a byte array

  • exchange (String)

    Name of the exchange to publish to

  • routing_key (String)

    The routing key that the exchange might use to route the message to a queue

  • properties (Properties)

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persisted message, transient messages for all other values

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (Integer)

    A correlation id, most often used used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicates which app that generated the message

Returns:

  • (nil)


260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/amqp/client/channel.rb', line 260

def basic_publish(body, exchange, routing_key, **properties)
  body_max = @connection.frame_max - 8
  id = @id
  mandatory = properties.delete(:mandatory) || false
  case properties.delete(:persistent)
  when true then properties[:delivery_mode] = 2
  when false then properties[:delivery_mode] = 1
  end

  if body.bytesize.between?(1, body_max)
    write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
                FrameBytes.header(id, body.bytesize, properties),
                FrameBytes.body(id, body)
    @unconfirmed.push @confirm += 1 if @confirm
    return
  end

  write_bytes FrameBytes.basic_publish(id, exchange, routing_key, mandatory),
              FrameBytes.header(id, body.bytesize, properties)
  pos = 0
  while pos < body.bytesize # split body into multiple frame_max frames
    len = [body_max, body.bytesize - pos].min
    body_part = body.byteslice(pos, len)
    write_bytes FrameBytes.body(id, body_part)
    pos += len
  end
  @unconfirmed.push @confirm += 1 if @confirm
  nil
end

#basic_publish_confirm(body, exchange, routing_key, **properties) ⇒ Boolean

Publish a message and block until the message has confirmed it has received it

Parameters:

  • body (String)

    The body, can be a string or a byte array

  • exchange (String)

    Name of the exchange to publish to

  • routing_key (String)

    The routing key that the exchange might use to route the message to a queue

  • properties (Properties)

Options Hash (**properties):

  • mandatory (Boolean)

    The message will be returned if the message can’t be routed to a queue

  • persistent (Boolean)

    Same as delivery_mode: 2

  • content_type (String)

    Content type of the message body

  • content_encoding (String)

    Content encoding of the body

  • headers (Hash<String, Object>)

    Custom headers

  • delivery_mode (Integer)

    2 for persisted message, transient messages for all other values

  • priority (Integer)

    A priority of the message (between 0 and 255)

  • correlation_id (Integer)

    A correlation id, most often used used for RPC communication

  • reply_to (String)

    Queue to reply RPC responses to

  • expiration (Integer, String)

    Number of seconds the message will stay in the queue

  • message_id (String)

    Can be used to uniquely identify the message, e.g. for deduplication

  • timestamp (Date)

    Often used for the time the message was originally generated

  • type (String)

    Can indicate what kind of message this is

  • user_id (String)

    Can be used to verify that this is the user that published the message

  • app_id (String)

    Can be used to indicates which app that generated the message

Returns:

  • (Boolean)

    True if the message was successfully published



295
296
297
298
299
# File 'lib/amqp/client/channel.rb', line 295

def basic_publish_confirm(body, exchange, routing_key, **properties)
  confirm_select(no_wait: true)
  basic_publish(body, exchange, routing_key, **properties)
  wait_for_confirms
end

#basic_qos(prefetch_count, prefetch_size: 0, global: false) ⇒ nil

Specify how many messages to prefetch for consumers with ‘no_ack: false`

Parameters:

  • prefetch_count (Integer)

    Number of messages to maxium keep in flight

  • prefetch_size (Integer) (defaults to: 0)

    Number of bytes to maxium keep in flight

  • global (Boolean) (defaults to: false)

    If true the limit will apply to channel rather than the consumer

Returns:

  • (nil)


353
354
355
356
357
# File 'lib/amqp/client/channel.rb', line 353

def basic_qos(prefetch_count, prefetch_size: 0, global: false)
  write_bytes FrameBytes.basic_qos(@id, prefetch_size, prefetch_count, global)
  expect :basic_qos_ok
  nil
end

#basic_recover(requeue: false) ⇒ nil

Recover all the unacknowledge messages

Parameters:

  • requeue (Boolean) (defaults to: false)

    If false the currently unack:ed messages will be deliviered to this consumer again, if true to any consumer

Returns:

  • (nil)


390
391
392
393
394
# File 'lib/amqp/client/channel.rb', line 390

def basic_recover(requeue: false)
  write_bytes FrameBytes.basic_recover(@id, requeue: requeue)
  expect :basic_recover_ok
  nil
end

#basic_reject(delivery_tag, requeue: false) ⇒ nil

Reject a message

Parameters:

  • delivery_tag (Integer)

    The delivery tag of the message to acknowledge

  • requeue (Boolean) (defaults to: false)

    Requeue the message into the queue again

Returns:

  • (nil)


381
382
383
384
# File 'lib/amqp/client/channel.rb', line 381

def basic_reject(delivery_tag, requeue: false)
  write_bytes FrameBytes.basic_reject(@id, delivery_tag, requeue)
  nil
end

#close(reason: "", code: 200) ⇒ nil

Gracefully close a connection

Returns:

  • (nil)


55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/amqp/client/channel.rb', line 55

def close(reason: "", code: 200)
  return if @closed

  write_bytes FrameBytes.channel_close(@id, reason, code)
  @closed = [:channel, code, reason]
  expect :channel_close_ok
  @replies.close
  @basic_gets.close
  @unconfirmed_empty.close
  @consumers.each_value(&:close)
  nil
end

#confirm_select(no_wait: false) ⇒ nil

Put the channel in confirm mode, each published message will then be confirmed by the broker

Parameters:

  • no_wait (Boolean) (defaults to: false)

    If false the method will block until the broker has confirmed the request

Returns:

  • (nil)


402
403
404
405
406
407
408
409
# File 'lib/amqp/client/channel.rb', line 402

def confirm_select(no_wait: false)
  return if @confirm

  write_bytes FrameBytes.confirm_select(@id, no_wait)
  expect :confirm_select_ok unless no_wait
  @confirm = 0
  nil
end

#exchange_bind(destination, source, binding_key, arguments: {}) ⇒ nil

Bind an exchange to another exchange

Parameters:

  • destination (String)

    Name of the exchange to bind

  • source (String)

    Name of the exchange to bind to

  • binding_key (String)

    Binding key on which messages that match might be routed (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on, but only when bound to header exchanges

Returns:

  • (nil)


124
125
126
127
128
# File 'lib/amqp/client/channel.rb', line 124

def exchange_bind(destination, source, binding_key, arguments: {})
  write_bytes FrameBytes.exchange_bind(@id, destination, source, binding_key, false, arguments)
  expect :exchange_bind_ok
  nil
end

#exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ nil

Declare an exchange

Parameters:

  • name (String)

    Name of the exchange

  • type (String)

    Type of exchange (amq.direct, amq.fanout, amq.topic, amq.headers, etc.)

  • passive (Boolean) (defaults to: false)

    If true raise an exception if the exchange doesn’t already exists

  • durable (Boolean) (defaults to: true)

    If true the exchange will persist between broker restarts, also a requirement for persistent messages

  • auto_delete (Boolean) (defaults to: false)

    If true the exchange will be deleted when the last queue/exchange is unbound

  • internal (Boolean) (defaults to: false)

    If true the exchange can’t be published to directly

  • arguments (Hash) (defaults to: {})

    Custom arguments

Returns:

  • (nil)


101
102
103
104
105
# File 'lib/amqp/client/channel.rb', line 101

def exchange_declare(name, type, passive: false, durable: true, auto_delete: false, internal: false, arguments: {})
  write_bytes FrameBytes.exchange_declare(@id, name, type, passive, durable, auto_delete, internal, arguments)
  expect :exchange_declare_ok
  nil
end

#exchange_delete(name, if_unused: false, no_wait: false) ⇒ nil

Delete an exchange

Parameters:

  • name (String)

    Name of the exchange

  • if_unused (Boolean) (defaults to: false)

    If true raise an exception if queues/exchanges is bound to this exchange

  • no_wait (Boolean) (defaults to: false)

    If true don’t wait for a broker confirmation

Returns:

  • (nil)


112
113
114
115
116
# File 'lib/amqp/client/channel.rb', line 112

def exchange_delete(name, if_unused: false, no_wait: false)
  write_bytes FrameBytes.exchange_delete(@id, name, if_unused, no_wait)
  expect :exchange_delete_ok unless no_wait
  nil
end

#exchange_unbind(destination, source, binding_key, arguments: {}) ⇒ nil

Unbind an exchange from another exchange

Parameters:

  • destination (String)

    Name of the exchange to unbind

  • source (String)

    Name of the exchange to unbind from

  • binding_key (String)

    Binding key which the queue is bound to the exchange with

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (nil)


136
137
138
139
140
# File 'lib/amqp/client/channel.rb', line 136

def exchange_unbind(destination, source, binding_key, arguments: {})
  write_bytes FrameBytes.exchange_unbind(@id, destination, source, binding_key, false, arguments)
  expect :exchange_unbind_ok
  nil
end

#on_return {|ReturnMessage| ... } ⇒ Object

Handle returned messages in this block. If not set the message will just be logged to STDERR

Yields:

  • (ReturnMessage)

    Messages returned by the broker when a publish has failed

Returns:

  • nil



84
85
86
87
# File 'lib/amqp/client/channel.rb', line 84

def on_return(&block)
  @on_return = block
  nil
end

#queue_bind(name, exchange, binding_key, arguments: {}) ⇒ nil

Bind a queue to an exchange

Parameters:

  • name (String)

    Name of the queue to bind

  • exchange (String)

    Name of the exchange to bind to

  • binding_key (String)

    Binding key on which messages that match might be routed (depending on exchange type)

  • arguments (Hash) (defaults to: {})

    Message headers to match on, but only when bound to header exchanges

Returns:

  • (nil)


194
195
196
197
198
# File 'lib/amqp/client/channel.rb', line 194

def queue_bind(name, exchange, binding_key, arguments: {})
  write_bytes FrameBytes.queue_bind(@id, name, exchange, binding_key, false, arguments)
  expect :queue_bind_ok
  nil
end

#queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {}) ⇒ QueueOk

Create a queue (operation is idempotent)

Parameters:

  • name (String) (defaults to: "")

    Name of the queue, can be empty, but will then be generated by the broker

  • passive (Boolean) (defaults to: false)

    If true an exception will be raised if the queue doesn’t already exists

  • durable (Boolean) (defaults to: true)

    If true the queue will survive broker restarts, messages in the queue will only survive if they are published as persistent

  • exclusive (Boolean) (defaults to: false)

    If true the queue will be deleted when the channel is closed

  • auto_delete (Boolean) (defaults to: false)

    If true the queue will be deleted when the last consumer stops consuming (it won’t be deleted until at least one consumer has consumed from it)

  • arguments (Hash) (defaults to: {})

    Custom arguments, such as queue-ttl etc.

Returns:

  • (QueueOk)

    The QueueOk struct got ‘queue_name`, `message_count` and `consumer_count` properties



164
165
166
167
168
169
170
171
172
173
# File 'lib/amqp/client/channel.rb', line 164

def queue_declare(name = "", passive: false, durable: true, exclusive: false, auto_delete: false, arguments: {})
  durable = false if name.empty?
  exclusive = true if name.empty?
  auto_delete = true if name.empty?

  write_bytes FrameBytes.queue_declare(@id, name, passive, durable, exclusive, auto_delete, arguments)
  name, message_count, consumer_count = expect(:queue_declare_ok)

  QueueOk.new(name, message_count, consumer_count)
end

#queue_delete(name, if_unused: false, if_empty: false, no_wait: false) ⇒ Integer?

Delete a queue

Parameters:

  • name (String)

    Name of the queue

  • if_unused (Boolean) (defaults to: false)

    Only delete if the queue doesn’t have consumers, raises a ChannelClosed error otherwise

  • if_empty (Boolean) (defaults to: false)

    Only delete if the queue is empty, raises a ChannelClosed error otherwise

  • no_wait (Boolean) (defaults to: false)

    Don’t wait for a broker confirmation if true

Returns:

  • (Integer)

    Number of messages in queue when deleted

  • (nil)

    If no_wait was set true



182
183
184
185
186
# File 'lib/amqp/client/channel.rb', line 182

def queue_delete(name, if_unused: false, if_empty: false, no_wait: false)
  write_bytes FrameBytes.queue_delete(@id, name, if_unused, if_empty, no_wait)
  message_count, = expect :queue_delete unless no_wait
  message_count
end

#queue_purge(name, no_wait: false) ⇒ nil

Purge a queue

Parameters:

  • name (String)

    Name of the queue

  • no_wait (Boolean) (defaults to: false)

    Don’t wait for a broker confirmation if true

Returns:

  • (nil)


204
205
206
207
208
# File 'lib/amqp/client/channel.rb', line 204

def queue_purge(name, no_wait: false)
  write_bytes FrameBytes.queue_purge(@id, name, no_wait)
  expect :queue_purge_ok unless no_wait
  nil
end

#queue_unbind(name, exchange, binding_key, arguments: {}) ⇒ nil

Unbind a queue from an exchange

Parameters:

  • name (String)

    Name of the queue to unbind

  • exchange (String)

    Name of the exchange to unbind from

  • binding_key (String)

    Binding key which the queue is bound to the exchange with

  • arguments (Hash) (defaults to: {})

    Arguments matching the binding that’s being removed

Returns:

  • (nil)


216
217
218
219
220
# File 'lib/amqp/client/channel.rb', line 216

def queue_unbind(name, exchange, binding_key, arguments: {})
  write_bytes FrameBytes.queue_unbind(@id, name, exchange, binding_key, arguments)
  expect :queue_unbind_ok
  nil
end

#tx_commitnil

Commmit a transaction, requires that the channel is in transaction mode

Returns:

  • (nil)


454
455
456
457
458
# File 'lib/amqp/client/channel.rb', line 454

def tx_commit
  write_bytes FrameBytes.tx_commit(@id)
  expect :tx_commit_ok
  nil
end

#tx_rollbacknil

Rollback a transaction, requires that the channel is in transaction mode

Returns:

  • (nil)


462
463
464
465
466
# File 'lib/amqp/client/channel.rb', line 462

def tx_rollback
  write_bytes FrameBytes.tx_rollback(@id)
  expect :tx_rollback_ok
  nil
end

#tx_selectnil

Put the channel in transaction mode, make sure that you #tx_commit or #tx_rollback after publish

Returns:

  • (nil)


446
447
448
449
450
# File 'lib/amqp/client/channel.rb', line 446

def tx_select
  write_bytes FrameBytes.tx_select(@id)
  expect :tx_select_ok
  nil
end

#wait_for_confirmsBoolean

Block until all publishes messages are confirmed

Returns:

  • (Boolean)

    True if all message where positivly acknowledged, false if not

Raises:



413
414
415
416
417
418
419
420
# File 'lib/amqp/client/channel.rb', line 413

def wait_for_confirms
  return true if @unconfirmed.empty?

  ok = @unconfirmed_empty.pop
  raise Error::Closed.new(@id, *@closed) if ok.nil?

  ok
end