Class: AMQP::Client
- Inherits:
-
Object
- Object
- AMQP::Client
- Defined in:
- lib/amqp/client.rb,
lib/amqp/client/queue.rb,
lib/amqp/client/table.rb,
lib/amqp/client/errors.rb,
lib/amqp/client/channel.rb,
lib/amqp/client/message.rb,
lib/amqp/client/version.rb,
lib/amqp/client/consumer.rb,
lib/amqp/client/exchange.rb,
lib/amqp/client/connection.rb,
lib/amqp/client/properties.rb,
lib/amqp/client/rpc_client.rb,
lib/amqp/client/frame_bytes.rb,
lib/amqp/client/configuration.rb,
lib/amqp/client/message_codecs.rb,
lib/amqp/client/message_codec_registry.rb
Overview
AMQP 0-9-1 Client
Defined Under Namespace
Modules: Coders, Parsers Classes: Configuration, Connection, Consumer, Error, Exchange, Message, MessageCodecRegistry, Properties, Queue, RPCClient, ReturnMessage
Constant Summary collapse
- VERSION =
Version of the client library
"2.0.0"
RPC collapse
-
.codec_registry ⇒ MessageCodecRegistry
readonly
Get the class-level codec registry.
-
.config ⇒ Configuration
readonly
Get the class-level configuration.
-
#codec_registry ⇒ MessageCodecRegistry
readonly
Get the codec registry for this instance.
-
#default_content_encoding ⇒ String?
Get/set the default content_encoding to use when publishing messages.
-
#default_content_type ⇒ String?
Get/set the default content_type to use when publishing messages.
-
#strict_coding ⇒ Object
Get/set whether coding should be strict, i.e. whether the client should raise on unknown codecs.
Connect and disconnect collapse
-
#connect(read_loop_thread: true) ⇒ Connection
Establishes and returns a new AMQP connection.
-
#start ⇒ self
Opens an AMQP connection using the high-level API; once the initial connection has succeeded, the client will try to reconnect whenever the connection is lost.
-
#started? ⇒ Boolean
Check if the client is connected.
-
#stop ⇒ nil
Close the currently open connection and stop the supervision / reconnection logic.
High level objects collapse
-
#default_exchange ⇒ Exchange
(also: #default)
Return a high level Exchange object for the default direct exchange.
-
#direct_exchange(name = "amq.direct") ⇒ Exchange
(also: #direct)
Declare a direct exchange and return a high level Exchange object.
-
#exchange(name, type:, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange
Declare an exchange and return a high level Exchange object.
-
#fanout_exchange(name = "amq.fanout") ⇒ Exchange
(also: #fanout)
Declare a fanout exchange and return a high level Exchange object.
-
#headers_exchange(name = "amq.headers") ⇒ Exchange
(also: #headers)
Declare a headers exchange and return a high level Exchange object.
-
#queue(name, durable: true, auto_delete: false, exclusive: false, passive: false, arguments: {}) ⇒ Queue
Declare a queue.
-
#topic_exchange(name = "amq.topic") ⇒ Exchange
(also: #topic)
Declare a topic exchange and return a high level Exchange object.
Publish collapse
-
#publish(body, exchange:, routing_key: "", **properties) ⇒ nil
Publish a (persistent) message and wait for confirmation.
-
#publish_and_forget(body, exchange:, routing_key: "", **properties) ⇒ nil
Publish a (persistent) message but don’t wait for a confirmation.
-
#wait_for_confirms ⇒ Boolean
Wait for unconfirmed publishes.
Queue actions collapse
-
#bind(queue:, exchange:, binding_key: "", arguments: {}) ⇒ nil
Bind a queue to an exchange.
-
#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer
Delete a queue.
-
#get(queue, no_ack: false) ⇒ Message?
Get a message from a queue.
-
#purge(queue) ⇒ nil
Purge a queue.
-
#subscribe(queue, exclusive: false, no_ack: false, prefetch: 1, worker_threads: 1, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer
Consume messages from a queue.
-
#unbind(queue:, exchange:, binding_key: "", arguments: {}) ⇒ nil
Unbind a queue from an exchange.
Exchange actions collapse
-
#delete_exchange(name) ⇒ nil
Delete an exchange.
-
#exchange_bind(source:, destination:, binding_key: "", arguments: {}) ⇒ nil
Bind an exchange to an exchange.
-
#exchange_unbind(source:, destination:, binding_key: "", arguments: {}) ⇒ nil
Unbind an exchange from an exchange.
RPC collapse
-
.configure {|Configuration| ... } ⇒ Configuration
Configure the AMQP::Client class-level settings.
-
#rpc_call(method, arguments, timeout: nil, **properties) ⇒ String
Do an RPC call: sends a message and waits for a response.
-
#rpc_client ⇒ RPCClient
Create a reusable RPC client.
-
#rpc_server(method, worker_threads: 1, durable: true, auto_delete: false, arguments: {}) {|The| ... } ⇒ Consumer
Create a RPC server for a single method/function/procedure.
Instance Method Summary collapse
Constructor Details
#initialize(uri = "", **options) ⇒ Client
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/amqp/client.rb', line 35 def initialize(uri = "", **options) @uri = uri @options = options @queues = {} @exchanges = {} @consumers = {} @next_consumer_id = 0 @connq = SizedQueue.new(1) @codec_registry = self.class.codec_registry.dup @strict_coding = self.class.config.strict_coding @default_content_encoding = self.class.config.default_content_encoding @default_content_type = self.class.config.default_content_type @start_lock = Mutex.new @supervisor_started = false @stopped = false end |
Class Attribute Details
.codec_registry ⇒ MessageCodecRegistry (readonly)
Get the class-level codec registry
506 507 508 |
# File 'lib/amqp/client.rb', line 506 def codec_registry @codec_registry end |
.config ⇒ Configuration (readonly)
Get the class-level configuration
502 503 504 |
# File 'lib/amqp/client.rb', line 502 def config @config end |
Instance Attribute Details
#codec_registry ⇒ MessageCodecRegistry (readonly)
Get the codec registry for this instance
525 526 527 |
# File 'lib/amqp/client.rb', line 525 def codec_registry @codec_registry end |
#default_content_encoding ⇒ String?
Get/set the default content_encoding to use when publishing messages
536 537 538 |
# File 'lib/amqp/client.rb', line 536 def default_content_encoding @default_content_encoding end |
#default_content_type ⇒ String?
Get/set the default content_type to use when publishing messages
532 533 534 |
# File 'lib/amqp/client.rb', line 532 def default_content_type @default_content_type end |
#strict_coding ⇒ Object
Get/set whether coding should be strict, i.e. whether the client should raise on unknown codecs
528 529 530 |
# File 'lib/amqp/client.rb', line 528 def strict_coding @strict_coding end |
Class Method Details
.configure {|Configuration| ... } ⇒ Configuration
Configure the AMQP::Client class-level settings
495 496 497 498 |
# File 'lib/amqp/client.rb', line 495 def configure yield @config if block_given? @config end |
Instance Method Details
#bind(queue:, exchange:, binding_key: "", arguments: {}) ⇒ nil
Bind a queue to an exchange
345 346 347 348 349 |
# File 'lib/amqp/client.rb', line 345 def bind(queue:, exchange:, binding_key: "", arguments: {}) with_connection do |conn| conn.channel(1).queue_bind(queue, exchange:, binding_key:, arguments:) end end |
#connect(read_loop_thread: true) ⇒ Connection
Establishes and returns a new AMQP connection
59 60 61 |
# File 'lib/amqp/client.rb', line 59 def connect(read_loop_thread: true) Connection.new(@uri, read_loop_thread:, codec_registry: @codec_registry, strict_coding: @strict_coding, **@options) end |
#default_exchange ⇒ Exchange Also known as: default
Return a high level Exchange object for the default direct exchange
201 202 203 |
# File 'lib/amqp/client.rb', line 201 def default_exchange(**) direct("", **) end |
#delete_exchange(name) ⇒ nil
Delete an exchange
415 416 417 418 419 420 421 |
# File 'lib/amqp/client.rb', line 415 def delete_exchange(name) with_connection do |conn| conn.channel(1).exchange_delete(name) @exchanges.delete(name) nil end end |
#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer
Delete a queue
377 378 379 380 381 382 383 |
# File 'lib/amqp/client.rb', line 377 def delete_queue(name, if_unused: false, if_empty: false) with_connection do |conn| msgs = conn.channel(1).queue_delete(name, if_unused:, if_empty:) @queues.delete(name) msgs end end |
#direct_exchange(name = "amq.direct") ⇒ Exchange Also known as: direct
Declare a direct exchange and return a high level Exchange object
185 186 187 188 189 190 191 192 |
# File 'lib/amqp/client.rb', line 185 def direct_exchange(name = "amq.direct", **) return exchange(name, type: "direct", **) unless name.empty? # Return the default exchange @exchanges.fetch(name) do @exchanges[name] = Exchange.new(self, name) end end |
#exchange(name, type:, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange
Declare an exchange and return a high level Exchange object
172 173 174 175 176 177 178 179 |
# File 'lib/amqp/client.rb', line 172 def exchange(name, type:, durable: true, auto_delete: false, internal: false, arguments: {}) @exchanges.fetch(name) do with_connection do |conn| conn.channel(1).exchange_declare(name, type:, durable:, auto_delete:, internal:, arguments:) end @exchanges[name] = Exchange.new(self, name) end end |
#exchange_bind(source:, destination:, binding_key: "", arguments: {}) ⇒ nil
Bind an exchange to an exchange
394 395 396 397 398 |
# File 'lib/amqp/client.rb', line 394 def exchange_bind(source:, destination:, binding_key: "", arguments: {}) with_connection do |conn| conn.channel(1).exchange_bind(destination:, source:, binding_key:, arguments:) end end |
#exchange_unbind(source:, destination:, binding_key: "", arguments: {}) ⇒ nil
Unbind an exchange from an exchange
406 407 408 409 410 |
# File 'lib/amqp/client.rb', line 406 def exchange_unbind(source:, destination:, binding_key: "", arguments: {}) with_connection do |conn| conn.channel(1).exchange_unbind(destination:, source:, binding_key:, arguments:) end end |
#fanout_exchange(name = "amq.fanout") ⇒ Exchange Also known as: fanout
Declare a fanout exchange and return a high level Exchange object
213 214 215 |
# File 'lib/amqp/client.rb', line 213 def fanout_exchange(name = "amq.fanout", **) exchange(name, type: "fanout", **) end |
#get(queue, no_ack: false) ⇒ Message?
Get a message from a queue
332 333 334 335 336 337 |
# File 'lib/amqp/client.rb', line 332 def get(queue, no_ack: false) with_connection do |conn| ch = conn.channel ch.basic_get(queue, no_ack:) end end |
#headers_exchange(name = "amq.headers") ⇒ Exchange Also known as: headers
Declare a headers exchange and return a high level Exchange object
237 238 239 |
# File 'lib/amqp/client.rb', line 237 def headers_exchange(name = "amq.headers", **) exchange(name, type: "headers", **) end |
#publish(body, exchange:, routing_key: "", **properties) ⇒ nil
Publish a (persistent) message and wait for confirmation
259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/amqp/client.rb', line 259 def publish(body, exchange:, routing_key: "", **properties) with_connection do |conn| properties[:delivery_mode] ||= 2 properties = default_content_properties.merge(properties) body = serialize_and_encode_body(body, properties) result = conn.channel(1).basic_publish_confirm(body, exchange:, routing_key:, **properties) raise Error::PublishNotConfirmed unless result nil end end |
#publish_and_forget(body, exchange:, routing_key: "", **properties) ⇒ nil
Publish a (persistent) message but don’t wait for a confirmation
278 279 280 281 282 283 284 285 |
# File 'lib/amqp/client.rb', line 278 def publish_and_forget(body, exchange:, routing_key: "", **properties) with_connection do |conn| properties[:delivery_mode] ||= 2 properties = default_content_properties.merge(properties) body = serialize_and_encode_body(body, properties) conn.channel(1).basic_publish(body, exchange:, routing_key:, **properties) end end |
#purge(queue) ⇒ nil
Purge a queue
366 367 368 369 370 |
# File 'lib/amqp/client.rb', line 366 def purge(queue) with_connection do |conn| conn.channel(1).queue_purge(queue) end end |
#queue(name, durable: true, auto_delete: false, exclusive: false, passive: false, arguments: {}) ⇒ Queue
Declare a queue
149 150 151 152 153 154 155 156 157 158 |
# File 'lib/amqp/client.rb', line 149 def queue(name, durable: true, auto_delete: false, exclusive: false, passive: false, arguments: {}) raise ArgumentError, "Currently only supports named, durable queues" if name.empty? @queues.fetch(name) do with_connection do |conn| conn.channel(1).queue_declare(name, durable:, auto_delete:, exclusive:, passive:, arguments:) end @queues[name] = Queue.new(self, name) end end |
#rpc_call(method, arguments, timeout: nil, **properties) ⇒ String
Do an RPC call: sends a message and waits for a response
461 462 463 464 465 466 467 468 469 470 471 472 473 474 |
# File 'lib/amqp/client.rb', line 461 def rpc_call(method, arguments, timeout: nil, **properties) ch = with_connection(&:channel) begin msg = ch.basic_consume_once("amq.rabbitmq.reply-to", timeout:) do properties = default_content_properties.merge(properties) body = serialize_and_encode_body(arguments, properties) ch.basic_publish(body, exchange: "", routing_key: method.to_s, reply_to: "amq.rabbitmq.reply-to", **properties) end msg.parse ensure ch.close end end |
#rpc_client ⇒ RPCClient
Create a reusable RPC client
478 479 480 481 |
# File 'lib/amqp/client.rb', line 478 def rpc_client ch = with_connection(&:channel) RPCClient.new(ch).start end |
#rpc_server(method, worker_threads: 1, durable: true, auto_delete: false, arguments: {}) {|The| ... } ⇒ Consumer
Create a RPC server for a single method/function/procedure
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 |
# File 'lib/amqp/client.rb', line 437 def rpc_server(method, worker_threads: 1, durable: true, auto_delete: false, arguments: {}, &_) queue(method.to_s, durable:, auto_delete:, arguments:) .subscribe(prefetch: worker_threads, worker_threads:) do |msg| result = yield msg.parse properties = { content_type: msg.properties.content_type, content_encoding: msg.properties.content_encoding } result_body = serialize_and_encode_body(result, properties) msg.channel.basic_publish(result_body, exchange: "", routing_key: msg.properties.reply_to, correlation_id: msg.properties.correlation_id, **properties) msg.ack rescue StandardError msg.reject(requeue: false) raise end end |
#start ⇒ self
Opens an AMQP connection using the high-level API; once the initial connection has succeeded, the client will try to reconnect whenever the connection is lost
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/amqp/client.rb', line 69 def start return self if started? @start_lock.synchronize do # rubocop:disable Metrics/BlockLength return self if started? @supervisor_started = true @stopped = false Thread.new(connect(read_loop_thread: false)) do |conn| Thread.current.abort_on_exception = true # Raising an unhandled exception is a bug loop do break if @stopped conn ||= connect(read_loop_thread: false) Thread.new do # restore connection in another thread, read_loop have to run conn.channel(1) # reserve channel 1 for publishes @consumers.each_value do |consumer| ch = conn.channel ch.basic_qos(consumer.prefetch) consume_ok = ch.basic_consume(consumer.queue, **consumer.basic_consume_args, &consumer.block) # Update the consumer with new channel and consume_ok metadata consumer.update_consume_ok(consume_ok) end @connq << conn # Remove consumers whose internal queues were already closed (e.g. cancelled during reconnect window) @consumers.delete_if { |_, c| c.closed? } end conn.read_loop # blocks until connection is closed, then reconnect rescue Error => e warn "AMQP-Client reconnect error: #{e.inspect}" sleep @options[:reconnect_interval] || 1 ensure @connq.clear conn = nil end end end self end |
#started? ⇒ Boolean
Check if the client is connected
128 129 130 |
# File 'lib/amqp/client.rb', line 128 def started? @supervisor_started && !@stopped end |
#stop ⇒ nil
Close the currently open connection and stop the supervision / reconnection logic.
115 116 117 118 119 120 121 122 123 124 |
# File 'lib/amqp/client.rb', line 115 def stop return if @stopped && !@supervisor_started @stopped = true return unless @connq.size.positive? conn = @connq.pop conn.close nil end |
#subscribe(queue, exclusive: false, no_ack: false, prefetch: 1, worker_threads: 1, on_cancel: nil, arguments: {}) {|Message| ... } ⇒ Consumer
Consume messages from a queue
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 |
# File 'lib/amqp/client.rb', line 307 def subscribe(queue, exclusive: false, no_ack: false, prefetch: 1, worker_threads: 1, on_cancel: nil, arguments: {}, &blk) raise ArgumentError, "worker_threads have to be > 0" if worker_threads <= 0 with_connection do |conn| ch = conn.channel ch.basic_qos(prefetch) consumer_id = @next_consumer_id += 1 on_cancel_proc = proc do |tag| @consumers.delete(consumer_id) on_cancel&.call(tag) end basic_consume_args = { exclusive:, no_ack:, worker_threads:, on_cancel: on_cancel_proc, arguments: } consume_ok = ch.basic_consume(queue, **basic_consume_args, &blk) consumer = Consumer.new(client: self, channel_id: ch.id, id: consumer_id, block: blk, queue:, consume_ok:, prefetch:, basic_consume_args:) @consumers[consumer_id] = consumer consumer end end |
#topic_exchange(name = "amq.topic") ⇒ Exchange Also known as: topic
Declare a topic exchange and return a high level Exchange object
225 226 227 |
# File 'lib/amqp/client.rb', line 225 def topic_exchange(name = "amq.topic", **) exchange(name, type: "topic", **) end |
#unbind(queue:, exchange:, binding_key: "", arguments: {}) ⇒ nil
Unbind a queue from an exchange
357 358 359 360 361 |
# File 'lib/amqp/client.rb', line 357 def unbind(queue:, exchange:, binding_key: "", arguments: {}) with_connection do |conn| conn.channel(1).queue_unbind(queue, exchange:, binding_key:, arguments:) end end |
#wait_for_confirms ⇒ Boolean
Wait for unconfirmed publishes
289 290 291 292 293 |
# File 'lib/amqp/client.rb', line 289 def wait_for_confirms with_connection do |conn| conn.channel(1).wait_for_confirms end end |
#with_connection ⇒ Object
540 541 542 543 544 545 546 547 548 549 550 551 552 553 |
# File 'lib/amqp/client.rb', line 540 def with_connection conn = nil loop do conn = @connq.pop next if conn.closed? break end begin yield conn ensure @connq << conn unless conn.closed? end end |