Class: AMQP::Client
- Inherits:
-
Object
- Object
- AMQP::Client
- Defined in:
- lib/amqp/client.rb,
lib/amqp/client/queue.rb,
lib/amqp/client/table.rb,
lib/amqp/client/errors.rb,
lib/amqp/client/channel.rb,
lib/amqp/client/message.rb,
lib/amqp/client/version.rb,
lib/amqp/client/exchange.rb,
lib/amqp/client/connection.rb,
lib/amqp/client/properties.rb,
lib/amqp/client/frame_bytes.rb
Overview
AMQP 0-9-1 Client
Defined Under Namespace
Classes: Connection, Error, Exchange, Message, Properties, Queue, ReturnMessage
Constant Summary collapse
- VERSION =
Version of the client library
"1.2.1"
Connect and disconnect collapse
-
#connect(read_loop_thread: true) ⇒ Connection
Establishes and returns a new AMQP connection.
-
#start ⇒ self
Opens an AMQP connection using the high-level API. If the initial connection attempt succeeds, the client automatically reconnects whenever the connection is later lost.
-
#stop ⇒ nil
Close the currently open connection.
High level objects collapse
-
#direct(name = "") ⇒ Exchange
Declare a direct exchange and return a high level Exchange object.
-
#exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange
Declare an exchange and return a high level Exchange object.
-
#fanout(name = "amq.fanout") ⇒ Exchange
Declare a fanout exchange and return a high level Exchange object.
-
#headers(name = "amq.headers") ⇒ Exchange
Declare a headers exchange and return a high level Exchange object.
-
#queue(name, durable: true, auto_delete: false, arguments: {}) ⇒ Queue
Declare a queue.
-
#topic(name = "amq.topic") ⇒ Exchange
Declare a topic exchange and return a high level Exchange object.
Publish collapse
-
#publish(body, exchange, routing_key, **properties) ⇒ Boolean
Publish a (persistent) message and wait for confirmation.
-
#publish_and_forget(body, exchange, routing_key, **properties) ⇒ nil
Publish a (persistent) message but don’t wait for a confirmation.
-
#wait_for_confirms ⇒ Boolean
Wait for unconfirmed publishes.
Queue actions collapse
-
#bind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Bind a queue to an exchange.
-
#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer
Delete a queue.
-
#purge(queue) ⇒ nil
Purge a queue.
-
#subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}) {|Message| ... } ⇒ Array<(String, Array<Thread>)>?
Consume messages from a queue.
-
#unbind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Unbind a queue from an exchange.
Exchange actions collapse
-
#delete_exchange(name) ⇒ nil
Delete an exchange.
-
#exchange_bind(destination, source, binding_key, arguments: {}) ⇒ nil
Bind an exchange to an exchange.
-
#exchange_unbind(destination, source, binding_key, arguments: {}) ⇒ nil
Unbind an exchange from an exchange.
Instance Method Summary collapse
- #initialize(uri = "", **options) ⇒ Client constructor
Constructor Details
#initialize(uri = "", **options) ⇒ Client
25 26 27 28 29 30 31 32 |
# File 'lib/amqp/client.rb', line 25

# Set up the client's state; no connection is opened here.
#
# @param uri [String] stored as-is and later passed to Connection.new by #connect
# @param options [Hash] keyword options, stored and splatted into Connection.new
#   by #connect; #start additionally reads :reconnect_interval from them
def initialize(uri = "", **options)
  @uri = uri
  # Keep the caller's options in their own hash; they must not alias @queues,
  # otherwise declared queue names would leak into the connection options.
  @options = options
  @queues = {}               # cache used by #queue / #delete_queue
  @exchanges = {}            # cache used by #exchange / #direct / #delete_exchange
  @subscriptions = Set.new   # consumer definitions replayed by #start
  @connq = SizedQueue.new(1) # receives the open connection from #start, drained by #stop
end
Instance Method Details
#bind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Bind a queue to an exchange
241 242 243 244 245 |
# File 'lib/amqp/client.rb', line 241

# Bind a queue to an exchange using channel 1 of the current connection.
# All arguments are passed straight through to the channel's queue_bind.
# @return the value with_connection yields back from queue_bind
def bind(queue, exchange, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_bind(queue, exchange, binding_key, arguments: arguments)
  end
end
#connect(read_loop_thread: true) ⇒ Connection
Establishes and returns a new AMQP connection
41 42 43 |
# File 'lib/amqp/client.rb', line 41

# Build a new Connection from the stored URI and options.
# @param read_loop_thread [Boolean] forwarded to Connection.new; a
#   :read_loop_thread entry in the stored options takes precedence because the
#   options are splatted last
# @return [Connection]
def connect(read_loop_thread: true)
  Connection.new(@uri, read_loop_thread: read_loop_thread, **@options)
end
#delete_exchange(name) ⇒ nil
Delete an exchange
311 312 313 314 315 316 317 |
# File 'lib/amqp/client.rb', line 311

# Delete an exchange on channel 1 and drop it from the local exchange cache.
# @param name exchange name, also the cache key
# @return [nil] the block always ends with nil
def delete_exchange(name)
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_delete(name)
    # Only reached when the delete above did not raise.
    @exchanges.delete(name)
    nil
  end
end
#delete_queue(name, if_unused: false, if_empty: false) ⇒ Integer
Delete a queue
273 274 275 276 277 278 279 |
# File 'lib/amqp/client.rb', line 273

# Delete a queue on channel 1 and drop it from the local queue cache.
# @param name queue name, also the cache key
# @param if_unused forwarded to queue_delete
# @param if_empty forwarded to queue_delete
# @return the result of queue_delete (documented as Integer; presumably the
#   number of messages removed with the queue — confirm against Channel)
def delete_queue(name, if_unused: false, if_empty: false)
  with_connection do |connection|
    channel = connection.channel(1)
    # tap runs the cache cleanup after a successful delete and keeps the result.
    channel.queue_delete(name, if_unused: if_unused, if_empty: if_empty).tap do
      @queues.delete(name)
    end
  end
end
#direct(name = "") ⇒ Exchange
Declare a direct exchange and return a high level Exchange object
152 153 154 155 156 157 158 |
# File 'lib/amqp/client.rb', line 152

# Return a high level Exchange object for a direct exchange.
# With the empty (default) name nothing is declared: the Exchange object is
# created locally, cached, and any extra keyword options are ignored.
# With any other name the call is delegated to #exchange with type "direct".
# @return [Exchange]
def direct(name = "", **options)
  if name.empty?
    @exchanges.fetch(name) { @exchanges[name] = Exchange.new(self, name) }
  else
    exchange(name, "direct", **options)
  end
end
#exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {}) ⇒ Exchange
Declare an exchange and return a high level Exchange object
131 132 133 134 135 136 137 138 |
# File 'lib/amqp/client.rb', line 131

# Declare an exchange on channel 1 and return a high level Exchange object.
# A cached object for the same name is returned without redeclaring, so the
# keyword options only take effect on the first call for a given name.
# @return [Exchange]
def exchange(name, type, durable: true, auto_delete: false, internal: false, arguments: {})
  return @exchanges[name] if @exchanges.key?(name)

  with_connection do |connection|
    connection.channel(1).exchange_declare(
      name, type,
      durable: durable, auto_delete: auto_delete, internal: internal, arguments: arguments
    )
  end
  @exchanges[name] = Exchange.new(self, name)
end
#exchange_bind(destination, source, binding_key, arguments: {}) ⇒ nil
Bind an exchange to an exchange
290 291 292 293 294 |
# File 'lib/amqp/client.rb', line 290

# Bind one exchange to another using channel 1 of the current connection.
# All arguments are passed straight through to the channel's exchange_bind.
# @return the value with_connection yields back from exchange_bind
def exchange_bind(destination, source, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_bind(destination, source, binding_key, arguments: arguments)
  end
end
#exchange_unbind(destination, source, binding_key, arguments: {}) ⇒ nil
Unbind an exchange from an exchange
302 303 304 305 306 |
# File 'lib/amqp/client.rb', line 302

# Remove an exchange-to-exchange binding using channel 1 of the current connection.
# All arguments are passed straight through to the channel's exchange_unbind.
# @return the value with_connection yields back from exchange_unbind
def exchange_unbind(destination, source, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.exchange_unbind(destination, source, binding_key, arguments: arguments)
  end
end
#fanout(name = "amq.fanout") ⇒ Exchange
Declare a fanout exchange and return a high level Exchange object
144 145 146 |
# File 'lib/amqp/client.rb', line 144

# Declare a fanout exchange by delegating to #exchange with type "fanout".
# @param name exchange name, defaults to "amq.fanout"
# @param options keyword options forwarded unchanged to #exchange
# @return [Exchange]
def fanout(name = "amq.fanout", **options)
  exchange(name, "fanout", **options)
end
#headers(name = "amq.headers") ⇒ Exchange
Declare a headers exchange and return a high level Exchange object
172 173 174 |
# File 'lib/amqp/client.rb', line 172

# Declare a headers exchange by delegating to #exchange with type "headers".
# @param name exchange name, defaults to "amq.headers"
# @param options keyword options forwarded unchanged to #exchange
# @return [Exchange]
def headers(name = "amq.headers", **options)
  exchange(name, "headers", **options)
end
#publish(body, exchange, routing_key, **properties) ⇒ Boolean
Publish a (persistent) message and wait for confirmation
184 185 186 187 188 189 |
# File 'lib/amqp/client.rb', line 184

# Publish a message on channel 1 via basic_publish_confirm.
# delivery_mode defaults to 2 (the "persistent" of the method summary);
# a delivery_mode given by the caller overrides that default.
# @return the result of basic_publish_confirm (documented as Boolean)
def publish(body, exchange, routing_key, **properties)
  with_connection do |connection|
    message_properties = { delivery_mode: 2 }.merge(properties)
    connection.channel(1).basic_publish_confirm(body, exchange, routing_key, **message_properties)
  end
end
#publish_and_forget(body, exchange, routing_key, **properties) ⇒ nil
Publish a (persistent) message but don’t wait for a confirmation
196 197 198 199 200 201 |
# File 'lib/amqp/client.rb', line 196

# Publish a message on channel 1 via basic_publish, i.e. without the
# confirm-waiting variant used by #publish.
# delivery_mode defaults to 2 (the "persistent" of the method summary);
# a delivery_mode given by the caller overrides that default.
# @return the result of basic_publish
def publish_and_forget(body, exchange, routing_key, **properties)
  with_connection do |connection|
    message_properties = { delivery_mode: 2 }.merge(properties)
    connection.channel(1).basic_publish(body, exchange, routing_key, **message_properties)
  end
end
#purge(queue) ⇒ nil
Purge a queue
262 263 264 265 266 |
# File 'lib/amqp/client.rb', line 262

# Purge a queue using channel 1 of the current connection.
# @param queue passed straight through to the channel's queue_purge
# @return the value with_connection yields back from queue_purge
def purge(queue)
  with_connection { |connection| connection.channel(1).queue_purge(queue) }
end
#queue(name, durable: true, auto_delete: false, arguments: {}) ⇒ Queue
Declare a queue
108 109 110 111 112 113 114 115 116 117 |
# File 'lib/amqp/client.rb', line 108

# Declare a queue on channel 1 and return a high level Queue object.
# A cached object for the same name is returned without redeclaring, so the
# keyword options only take effect on the first call for a given name.
# @raise [ArgumentError] if name is empty
# @return [Queue]
def queue(name, durable: true, auto_delete: false, arguments: {})
  raise ArgumentError, "Currently only supports named, durable queues" if name.empty?
  return @queues[name] if @queues.key?(name)

  with_connection do |connection|
    connection.channel(1).queue_declare(
      name,
      durable: durable, auto_delete: auto_delete, arguments: arguments
    )
  end
  @queues[name] = Queue.new(self, name)
end
#start ⇒ self
Opens an AMQP connection using the high-level API. If the initial connection attempt succeeds, the client automatically reconnects whenever the connection is later lost.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/amqp/client.rb', line 51

# Open a connection and keep it alive from a background supervisor thread.
# The first connect runs in the calling thread (it is evaluated as the
# argument to Thread.new), so a failure there raises to the caller; later
# connects happen inside the supervisor loop.
# @return [self]
def start
  @stopped = false
  Thread.new(connect(read_loop_thread: false)) do |connection|
    # An unhandled exception in the supervisor is a bug, so let it abort.
    Thread.current.abort_on_exception = true
    loop do
      break if @stopped

      # nil after the first pass (see ensure below), so reconnect then.
      connection ||= connect(read_loop_thread: false)
      Thread.new do
        # Setup runs in its own thread because read_loop below has to be
        # running in this one while channels are opened.
        connection.channel(1) # reserve channel 1 for publishes
        @subscriptions.each do |queue_name, no_ack, prefetch, worker_count, consume_args, handler|
          channel = connection.channel
          channel.basic_qos(prefetch)
          channel.basic_consume(queue_name, no_ack: no_ack, worker_threads: worker_count,
                                            arguments: consume_args, &handler)
        end
        # Hand the ready connection over; #stop pops it from this queue.
        @connq << connection
      end
      connection.read_loop # blocks until the connection is closed, then loop to reconnect
    rescue Error => error
      warn "AMQP-Client reconnect error: #{error.inspect}"
      sleep @options[:reconnect_interval] || 1
    ensure
      # Force a fresh connect on the next iteration.
      connection = nil
    end
  end
  self
end
#stop ⇒ nil
Close the currently open connection
82 83 84 85 86 87 88 89 90 91 |
# File 'lib/amqp/client.rb', line 82

# Mark the client as stopped and close the connection waiting in @connq.
# Does nothing when already stopped; when no connection is queued it only
# sets the stopped flag, so it never blocks on an empty queue.
# @return [nil]
def stop
  return if @stopped

  @stopped = true
  return if @connq.empty?

  @connq.pop.close
  nil
end
#subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}) {|Message| ... } ⇒ Array<(String, Array<Thread>)>?
Consume messages from a queue
223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/amqp/client.rb', line 223

# Consume messages from a queue on a newly opened channel.
# The subscription is first recorded in @subscriptions, which #start iterates
# to set consumers up again on each connection it establishes.
# @raise [ArgumentError] if worker_threads is not greater than zero
# @return the result of basic_consume (documented as [consumer tag, threads] or nil)
def subscribe(queue, no_ack: false, prefetch: 1, worker_threads: 1, arguments: {}, &handler)
  raise ArgumentError, "worker_threads have to be > 0" unless worker_threads.positive?

  # A Set, so an identical subscription is stored only once.
  @subscriptions.add([queue, no_ack, prefetch, worker_threads, arguments, handler])

  with_connection do |connection|
    channel = connection.channel
    channel.basic_qos(prefetch)
    channel.basic_consume(queue, no_ack: no_ack, worker_threads: worker_threads,
                                 arguments: arguments, &handler)
  end
end
#topic(name = "amq.topic") ⇒ Exchange
Declare a topic exchange and return a high level Exchange object
164 165 166 |
# File 'lib/amqp/client.rb', line 164

# Declare a topic exchange by delegating to #exchange with type "topic".
# @param name exchange name, defaults to "amq.topic"
# @param options keyword options forwarded unchanged to #exchange
# @return [Exchange]
def topic(name = "amq.topic", **options)
  exchange(name, "topic", **options)
end
#unbind(queue, exchange, binding_key, arguments: {}) ⇒ nil
Unbind a queue from an exchange
253 254 255 256 257 |
# File 'lib/amqp/client.rb', line 253

# Remove a queue-to-exchange binding using channel 1 of the current connection.
# All arguments are passed straight through to the channel's queue_unbind.
# @return the value with_connection yields back from queue_unbind
def unbind(queue, exchange, binding_key, arguments: {})
  with_connection do |connection|
    channel = connection.channel(1)
    channel.queue_unbind(queue, exchange, binding_key, arguments: arguments)
  end
end
#wait_for_confirms ⇒ Boolean
Wait for unconfirmed publishes
205 206 207 208 209 |
# File 'lib/amqp/client.rb', line 205

# Wait for outstanding publishes by calling wait_for_confirms on channel 1,
# the channel #publish and #publish_and_forget send on.
# @return the result of the channel's wait_for_confirms (documented as Boolean)
def wait_for_confirms
  with_connection { |connection| connection.channel(1).wait_for_confirms }
end