Class: AMQP::Client
- Inherits:
-
Object
- Object
- AMQP::Client
- Defined in:
- lib/amqp/client.rb,
lib/amqp/client/queue.rb,
lib/amqp/client/table.rb,
lib/amqp/client/errors.rb,
lib/amqp/client/channel.rb,
lib/amqp/client/message.rb,
lib/amqp/client/version.rb,
lib/amqp/client/exchange.rb,
lib/amqp/client/connection.rb,
lib/amqp/client/properties.rb,
lib/amqp/client/frame_bytes.rb
Overview
AMQP 0-9-1 Client
Defined Under Namespace
Classes: Connection, Error, Exchange, Message, Properties, Queue, ReturnMessage
Constant Summary collapse
- VERSION =
Version of the client library
"1.2.1"
Connect and disconnect collapse
-
#connect(read_loop_thread: true) ⇒ Connection
Establishes and returns a new AMQP connection.
-
#start ⇒ self
Opens an AMQP connection using the high level API. Once the first connection has succeeded, the client will try to reconnect whenever the connection is lost.
-
#stop ⇒ nil
Close the currently open connection.
High level objects collapse
-
#default_exchange ⇒ Exchange
(also: #default)
Return a high level Exchange object for the default direct exchange.
-
#direct_exchange(name = "amq.direct") ⇒ Exchange
(also: #direct)
Declare a direct exchange and return a high level Exchange object.
-
#exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange
Declare an exchange and return a high level Exchange object.
-
#fanout_exchange(name = "amq.fanout") ⇒ Exchange
(also: #fanout)
Declare a fanout exchange and return a high level Exchange object.
-
#headers_exchange(name = "amq.headers") ⇒ Exchange
(also: #headers)
Declare a headers exchange and return a high level Exchange object.
-
#queue(name, durable: true, auto_delete: false, arguments: {}) ⇒ Queue
Declare a queue.
-
#topic_exchange(name = "amq.topic") ⇒ Exchange
(also: #topic)
Declare a topic exchange and return a high level Exchange object.
Publish collapse
-
#publish(body, exchange, routing_key, **properties) ⇒ Boolean
Publish a (persistent) message and wait for confirmation.
-
#publish_and_forget(body, exchange, routing_key, **properties) ⇒ nil
Publish a (persistent) message but don’t wait for a confirmation.
-
#wait_for_confirms ⇒ Boolean
Wait for unconfirmed publishes.
Queue actions collapse
-
#bind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Bind a queue to an exchange.
-
#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer
Delete a queue.
-
#purge(queue) ⇒ nil
Purge a queue.
-
#subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}) {|Message| ... } ⇒ Array<(String, Array<Thread>)>?
Consume messages from a queue.
-
#unbind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Unbind a queue from an exchange.
Exchange actions collapse
-
#delete_exchange(name) ⇒ nil
Delete an exchange.
-
#exchange_bind(destination, source, binding_key, arguments: {}) ⇒ nil
Bind an exchange to an exchange.
-
#exchange_unbind(destination, source, binding_key, arguments: {}) ⇒ nil
Unbind an exchange from an exchange.
Instance Method Summary collapse
- #initialize(uri = "", **options) ⇒ Client constructor
Constructor Details
#initialize(uri = "", **options) ⇒ Client
25 26 27 28 29 30 31 32 |
# File 'lib/amqp/client.rb', line 25
# Create a client. No connection is opened here; see #connect and #start.
# @param uri [String] stored as-is and passed to Connection.new by #connect
# @param options [Hash] keyword options, stored and forwarded to Connection.new by #connect
def initialize(uri = "", **options)
  @uri = uri
  @options = options         # must be its own Hash, never shared with the queue cache
  @queues = {}               # name => Queue, filled by #queue
  @exchanges = {}            # name => Exchange, filled by #exchange / #direct_exchange
  @subscriptions = Set.new   # consumer definitions recorded by #subscribe
  @connq = SizedQueue.new(1) # holds at most one connection at a time
end
Instance Method Details
#bind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Bind a queue to an exchange
269 270 271 272 273 |
# File 'lib/amqp/client.rb', line 269
# Binds +queue+ to +exchange+ using +binding_key+; the request is sent on channel 1.
# @param arguments [Hash] passed through to queue_bind
def bind(queue, exchange, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_bind(queue, exchange, binding_key, arguments:)
  end
end
#connect(read_loop_thread: true) ⇒ Connection
Establishes and returns a new AMQP connection
41 42 43 |
# File 'lib/amqp/client.rb', line 41
# Builds a new Connection from the stored URI and options.
# A :read_loop_thread entry in the stored options takes precedence over the argument.
# @param read_loop_thread [Boolean] forwarded to Connection.new
# @return [Connection]
def connect(read_loop_thread: true)
  settings = { read_loop_thread: }.merge(@options)
  Connection.new(@uri, **settings)
end
#default_exchange ⇒ Exchange Also known as: default
Return a high level Exchange object for the default direct exchange
160 161 162 |
# File 'lib/amqp/client.rb', line 160
# Returns the Exchange object for the default exchange (the empty name), via #direct.
# @return [Exchange]
def default_exchange(**) = direct("", **)
#delete_exchange(name) ⇒ nil
Delete an exchange
339 340 341 342 343 344 345 |
# File 'lib/amqp/client.rb', line 339
# Deletes the exchange on channel 1 and drops its cached Exchange object.
# @param name [String] exchange name
# @return [nil]
def delete_exchange(name)
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_delete(name)
    @exchanges.delete(name) # forget the cached high level object
    nil
  end
end
#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer
Delete a queue
301 302 303 304 305 306 307 |
# File 'lib/amqp/client.rb', line 301
# Deletes the queue on channel 1, drops its cached Queue object and
# returns whatever queue_delete returned.
# @param name [String] queue name
# @param if_unused [Boolean] forwarded to queue_delete
# @param if_empty [Boolean] forwarded to queue_delete
def delete_queue(name, if_unused: false, if_empty: false)
  with_connection do |connection|
    connection.channel(1).queue_delete(name, if_unused:, if_empty:).tap do
      @queues.delete(name) # only reached when the delete did not raise
    end
  end
end
#direct_exchange(name = "amq.direct") ⇒ Exchange Also known as: direct
Declare a direct exchange and return a high level Exchange object
144 145 146 147 148 149 150 151 |
# File 'lib/amqp/client.rb', line 144
# Returns an Exchange for a direct exchange.
# A non-empty name is declared through #exchange; the empty name is the
# default exchange, which is only cached locally and never declared.
# @param name [String] exchange name
# @return [Exchange]
def direct_exchange(name = "amq.direct", **)
  if name.empty?
    @exchanges.fetch(name) { @exchanges[name] = Exchange.new(self, name) }
  else
    exchange(name, "direct", **)
  end
end
#exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange
Declare an exchange and return a high level Exchange object
131 132 133 134 135 136 137 138 |
# File 'lib/amqp/client.rb', line 131
# Declares an exchange on channel 1 and returns its Exchange object.
# An exchange already cached under +name+ is returned without declaring again.
# @param name [String] exchange name
# @param type [String] exchange type, forwarded to exchange_declare
# @return [Exchange]
def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {})
  return @exchanges[name] if @exchanges.key?(name)

  with_connection do |connection|
    connection.channel(1).exchange_declare(name, type, durable:, auto_delete:, internal:, arguments:)
  end
  @exchanges[name] = Exchange.new(self, name)
end
#exchange_bind(destination, source, binding_key, arguments: {}) ⇒ nil
Bind an exchange to an exchange
318 319 320 321 322 |
# File 'lib/amqp/client.rb', line 318
# Binds the +destination+ exchange to the +source+ exchange; sent on channel 1.
# @param arguments [Hash] passed through to exchange_bind on the channel
def exchange_bind(destination, source, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_bind(destination, source, binding_key, arguments:)
  end
end
#exchange_unbind(destination, source, binding_key, arguments: {}) ⇒ nil
Unbind an exchange from an exchange
330 331 332 333 334 |
# File 'lib/amqp/client.rb', line 330
# Removes a binding between the +destination+ and +source+ exchanges; sent on channel 1.
# @param arguments [Hash] passed through to exchange_unbind on the channel
def exchange_unbind(destination, source, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_unbind(destination, source, binding_key, arguments:)
  end
end
#fanout_exchange(name = "amq.fanout") ⇒ Exchange Also known as: fanout
Declare a fanout exchange and return a high level Exchange object
172 173 174 |
# File 'lib/amqp/client.rb', line 172
# Shortcut for #exchange with type "fanout"; extra keywords are forwarded.
# @param name [String] exchange name
# @return [Exchange]
def fanout_exchange(name = "amq.fanout", **) = exchange(name, "fanout", **)
#headers_exchange(name = "amq.headers") ⇒ Exchange Also known as: headers
Declare a headers exchange and return a high level Exchange object
196 197 198 |
# File 'lib/amqp/client.rb', line 196
# Shortcut for #exchange with type "headers"; extra keywords are forwarded.
# @param name [String] exchange name
# @return [Exchange]
def headers_exchange(name = "amq.headers", **) = exchange(name, "headers", **)
#publish(body, exchange, routing_key, **properties) ⇒ Boolean
Publish a (persistent) message and wait for confirmation
212 213 214 215 216 217 |
# File 'lib/amqp/client.rb', line 212
# Publishes on channel 1 through basic_publish_confirm.
# delivery_mode defaults to 2 unless the caller supplies its own value.
# @param properties [Hash] message properties, merged over the default
def publish(body, exchange, routing_key, **properties)
  with_connection do |connection|
    message_properties = { delivery_mode: 2, **properties }
    connection.channel(1).basic_publish_confirm(body, exchange, routing_key, **message_properties)
  end
end
#publish_and_forget(body, exchange, routing_key, **properties) ⇒ nil
Publish a (persistent) message but don’t wait for a confirmation
224 225 226 227 228 229 |
# File 'lib/amqp/client.rb', line 224
# Publishes on channel 1 through basic_publish (no confirm variant).
# delivery_mode defaults to 2 unless the caller supplies its own value.
# @param properties [Hash] message properties, merged over the default
def publish_and_forget(body, exchange, routing_key, **properties)
  with_connection do |connection|
    message_properties = { delivery_mode: 2, **properties }
    connection.channel(1).basic_publish(body, exchange, routing_key, **message_properties)
  end
end
#purge(queue) ⇒ nil
Purge a queue
290 291 292 293 294 |
# File 'lib/amqp/client.rb', line 290
# Purges +queue+ by calling queue_purge on channel 1.
# @param queue [String] queue name
def purge(queue)
  with_connection { |connection| connection.channel(1).queue_purge(queue) }
end
#queue(name, durable: true, auto_delete: false, arguments: {}) ⇒ Queue
Declare a queue
108 109 110 111 112 113 114 115 116 117 |
# File 'lib/amqp/client.rb', line 108
# Declares a queue on channel 1 and returns its Queue object.
# A queue already cached under +name+ is returned without declaring again.
# @param name [String] queue name, must not be empty
# @raise [ArgumentError] if +name+ is empty
# @return [Queue]
def queue(name, durable: true, auto_delete: false, arguments: {})
  raise ArgumentError, "Currently only supports named, durable queues" if name.empty?
  return @queues[name] if @queues.key?(name)

  with_connection do |connection|
    connection.channel(1).queue_declare(name, durable:, auto_delete:, arguments:)
  end
  @queues[name] = Queue.new(self, name)
end
#start ⇒ self
Opens an AMQP connection using the high level API. Once the first connection has succeeded, the client will try to reconnect whenever the connection is lost.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/amqp/client.rb', line 51
# Opens a connection and keeps it alive from a background thread.
#
# The first connection is created in the calling thread (it is the argument
# to Thread.new), so a failure there is raised to the caller. After that the
# background thread runs the read loop and reconnects when it returns.
# @return [self]
def start
  @stopped = false
  Thread.new(connect(read_loop_thread: false)) do |conn|
    Thread.current.abort_on_exception = true # Raising an unhandled exception is a bug
    loop do
      break if @stopped

      conn ||= connect(read_loop_thread: false)
      # Pass the connection as a thread argument: `conn` is reset to nil in
      # the ensure below and reassigned on the next pass, so a closure over
      # it could see nil or a different connection while setup is running.
      Thread.new(conn) do |c|
        # restore connection in another thread, read_loop has to run
        c.channel(1) # reserve channel 1 for publishes
        @subscriptions.each do |queue_name, no_ack, prefetch, wt, args, blk|
          ch = c.channel
          ch.basic_qos(prefetch)
          ch.basic_consume(queue_name, no_ack:, worker_threads: wt, arguments: args, &blk)
        end
        @connq << c
      end
      conn.read_loop # blocks until connection is closed, then reconnect
    rescue Error => e
      warn "AMQP-Client reconnect error: #{e.inspect}"
      sleep @options[:reconnect_interval] || 1
    ensure
      conn = nil # force a fresh connect on the next pass
    end
  end
  self
end
#stop ⇒ nil
Close the currently open connection
82 83 84 85 86 87 88 89 90 91 |
# File 'lib/amqp/client.rb', line 82
# Closes the connection currently held in the connection queue, if there is one.
# Calling it again while the client is already stopped does nothing.
# @return [nil]
def stop
  return if @stopped

  @stopped = true # the reconnect loop in #start checks this flag and exits
  return if @connq.empty?

  @connq.pop.close
  nil
end
#subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}) {|Message| ... } ⇒ Array<(String, Array<Thread>)>?
Consume messages from a queue
251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/amqp/client.rb', line 251
# Starts consuming from +queue+ on a new channel and records the
# subscription so that #start can set it up again on a new connection.
# @param prefetch [Integer] passed to basic_qos on the consumer channel
# @param worker_threads [Integer] must be greater than zero
# @raise [ArgumentError] if +worker_threads+ is zero or negative
def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &blk)
  raise ArgumentError, "worker_threads have to be > 0" if worker_threads <= 0

  subscription = [queue, no_ack, prefetch, worker_threads, arguments, blk]
  @subscriptions << subscription

  with_connection do |connection|
    consumer_channel = connection.channel
    consumer_channel.basic_qos(prefetch)
    consumer_channel.basic_consume(queue, no_ack:, worker_threads:, arguments:, &blk)
  end
end
#topic_exchange(name = "amq.topic") ⇒ Exchange Also known as: topic
Declare a topic exchange and return a high level Exchange object
184 185 186 |
# File 'lib/amqp/client.rb', line 184
# Shortcut for #exchange with type "topic"; extra keywords are forwarded.
# @param name [String] exchange name
# @return [Exchange]
def topic_exchange(name = "amq.topic", **) = exchange(name, "topic", **)
#unbind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Unbind a queue from an exchange
281 282 283 284 285 |
# File 'lib/amqp/client.rb', line 281
# Removes the binding between +queue+ and +exchange+ for +binding_key+; sent on channel 1.
# @param arguments [Hash] passed through to queue_unbind
def unbind(queue, exchange, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_unbind(queue, exchange, binding_key, arguments:)
  end
end
#wait_for_confirms ⇒ Boolean
Wait for unconfirmed publishes
233 234 235 236 237 |
# File 'lib/amqp/client.rb', line 233
# Delegates to wait_for_confirms on channel 1 and returns its result.
def wait_for_confirms
  with_connection { |connection| connection.channel(1).wait_for_confirms }
end