Class: AMQP::Client::Connection

Inherits:
Object
  • Object
Defined in:
lib/amqp/client/connection.rb,
lib/amqp/client/channel.rb

Overview

Represents a single established AMQP connection

Defined Under Namespace

Classes: Channel

Constructor Details

#initialize(uri = "", read_loop_thread: true, **options) ⇒ Connection

Establish a connection to an AMQP broker

Parameters:

  • uri (String) (defaults to: "")

    URL in the format amqp://username:password@hostname/vhost; use amqps:// for an encrypted connection

  • read_loop_thread (Boolean) (defaults to: true)

    If true, run #read_loop in a background thread; otherwise the caller has to run it explicitly. Without #read_loop the connection won’t function.

  • options (Hash)

    a customizable set of options

Options Hash (**options):

  • connection_name (String) — default: PROGRAM_NAME

    Set a name for the connection to be able to identify the client from the broker

  • verify_peer (Boolean) — default: true

    Verify broker’s TLS certificate, set to false for self-signed certs

  • connect_timeout (Float) — default: 30

    TCP connection timeout

  • heartbeat (Integer) — default: 0

    Heartbeat timeout; the default of 0 disables heartbeats and relies on TCP keepalive instead

  • frame_max (Integer) — default: 131_072

    Maximum frame size; the smaller of the client’s and the broker’s values will be used

  • channel_max (Integer) — default: 2048

    Maximum number of channels the client is allowed to have open. The maximum allowed is 65_536. The smaller of the client’s and the broker’s values will be used.

  • keepalive (String) — default: 60:10:3

    TCP keepalive setting, 60s idle, 10s interval between probes, 3 probes
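
The "smaller value wins" rule for frame_max and channel_max, and the colon-separated keepalive format, can be illustrated with a minimal sketch. Both helpers below (`negotiate` and `parse_keepalive`) are hypothetical names invented for illustration; they are not part of amqp-client and only mirror what the option descriptions above state.

```ruby
# Hypothetical helpers, not part of amqp-client.

# Pick the effective limit from the client's and the broker's proposals:
# the smaller of the two values is used.
def negotiate(client_value, broker_value)
  [client_value, broker_value].min
end

# Split an "idle:interval:count" keepalive string into named integers.
def parse_keepalive(setting)
  idle, interval, count = setting.split(":", 3).map { |part| Integer(part) }
  { idle: idle, interval: interval, count: count }
end

effective_frame_max = negotiate(131_072, 4096)
keepalive = parse_keepalive("60:10:3")
```

With the defaults shown above, `keepalive` would describe 60 seconds of idle time, 10 seconds between probes, and 3 probes.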



# File 'lib/amqp/client/connection.rb', line 29

def initialize(uri = "", read_loop_thread: true, **options)
  uri = URI.parse(uri)
  tls = uri.scheme == "amqps"
  port = port_from_env || uri.port || (tls ? 5671 : 5672)
  host = uri.host || "localhost"
  user = uri.user || "guest"
  password = uri.password || "guest"
  vhost = URI.decode_www_form_component(uri.path[1..] || "/")
  options = URI.decode_www_form(uri.query || "").map! { |k, v| [k.to_sym, v] }.to_h.merge(options)

  socket = open_socket(host, port, tls, options)
  channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, options)

  @socket = socket
  @channel_max = channel_max.zero? ? 65_536 : channel_max
  @frame_max = frame_max
  @heartbeat = heartbeat
  @channels = {}
  @channels_lock = Mutex.new
  @closed = nil
  @replies = ::Queue.new
  @write_lock = Mutex.new
  @blocked = nil
  @on_blocked = ->(reason) { warn "AMQP-Client blocked by broker: #{reason}" }
  @on_unblocked = -> { warn "AMQP-Client unblocked by broker" }

  Thread.new { read_loop } if read_loop_thread
end
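
To make the parsing steps in the constructor easier to follow, here is a rough standalone sketch that decomposes a URI the same way using only Ruby's standard library. The helper name `decompose_amqp_uri` and the example hostname are assumptions made for illustration, and the sketch deliberately skips the library's private `port_from_env` lookup.

```ruby
require "uri"

# Hypothetical helper mirroring the constructor's parsing steps; it is not
# part of amqp-client and skips the library's private port_from_env lookup.
def decompose_amqp_uri(uri_string, **options)
  uri = URI.parse(uri_string)
  tls = uri.scheme == "amqps"
  query_options = URI.decode_www_form(uri.query || "").to_h { |k, v| [k.to_sym, v] }
  {
    tls: tls,
    host: uri.host || "localhost",
    port: uri.port || (tls ? 5671 : 5672),
    user: uri.user || "guest",
    password: uri.password || "guest",
    vhost: URI.decode_www_form_component(uri.path[1..] || "/"),
    # Keyword options win over query-string options with the same name.
    options: query_options.merge(options)
  }
end

parts = decompose_amqp_uri("amqps://alice:secret@broker.example/prod?heartbeat=30", heartbeat: 10)
defaults = decompose_amqp_uri("")
```

Note that values taken from the query string stay strings, while keyword options keep whatever type the caller passed.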

Instance Attribute Details

#frame_max ⇒ Integer (readonly)

The max frame size negotiated between the client and the broker

Returns:

  • (Integer)


# File 'lib/amqp/client/connection.rb', line 77

def frame_max
  @frame_max
end

Class Method Details

.connect(uri, read_loop_thread: true, **options) ⇒ Object

Deprecated.

Alias for #initialize

See Also:

  • #initialize


# File 'lib/amqp/client/connection.rb', line 71

def self.connect(uri, read_loop_thread: true, **options)
  new(uri, read_loop_thread: read_loop_thread, **options)
end

Instance Method Details

#blocked? ⇒ Bool

Indicates that the server is blocking publishes. If the client keeps publishing the server will stop reading from the socket. Use the #on_blocked callback to get notified when the server is resource constrained.

Returns:

  • (Bool)

See Also:

  • #on_blocked


# File 'lib/amqp/client/connection.rb', line 64

def blocked?
  !@blocked.nil?
end
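
The interplay between #blocked? and the blocked/unblocked callbacks can be sketched without a broker. `FakeConnection` below is a hypothetical stand-in that copies only the bookkeeping shown in this class; the `simulate_*` methods are invented for illustration and stand in for broker notifications that the real connection would presumably receive through its read loop.

```ruby
# Hypothetical stand-in that mimics only the blocked/unblocked bookkeeping
# described here; it is not the real AMQP::Client::Connection.
class FakeConnection
  def initialize
    @blocked = nil
    @on_blocked = ->(reason) { warn "blocked: #{reason}" }
    @on_unblocked = -> { warn "unblocked" }
  end

  def blocked?
    !@blocked.nil?
  end

  def on_blocked(&blk)
    @on_blocked = blk
    nil
  end

  def on_unblocked(&blk)
    @on_unblocked = blk
    nil
  end

  # Invented for this sketch: pretend the broker sent a blocked notification.
  def simulate_blocked(reason)
    @blocked = reason
    @on_blocked.call(reason)
  end

  # Invented for this sketch: pretend the broker lifted the block.
  def simulate_unblocked
    @blocked = nil
    @on_unblocked.call
  end
end

conn = FakeConnection.new
events = []
conn.on_blocked { |reason| events << "paused: #{reason}" }
conn.on_unblocked { events << "resumed" }

conn.simulate_blocked("low on memory")
# A publisher could check blocked? and hold back instead of writing.
published = conn.blocked? ? :deferred : :sent
conn.simulate_unblocked
```

Checking `blocked?` before publishing is one possible way for an application to avoid filling the socket while the broker is resource constrained.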

#channel(id = nil) ⇒ Channel

Open an AMQP channel

Parameters:

  • id (Integer, nil) (defaults to: nil)

    If nil a new channel will be opened, otherwise an already open channel might be reused

Returns:

  • (Channel)

Raises:

  • (ArgumentError)

    If id is 0 or higher than the connection’s channel max


# File 'lib/amqp/client/connection.rb', line 89

def channel(id = nil)
  raise ArgumentError, "Channel ID cannot be 0" if id&.zero?
  raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max

  ch = @channels_lock.synchronize do
    if id
      @channels[id] ||= Channel.new(self, id)
    else
      1.upto(@channel_max) do |i|
        break id = i unless @channels.key? i
      end
      raise Error, "Max channels reached" if id.nil?

      @channels[id] = Channel.new(self, id)
    end
  end
  ch.open
end
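
The ID selection in #channel (take the lowest free ID under a lock, or reuse the requested one) can be tried in isolation. `ChannelTable` below is a hypothetical class written for illustration; plain symbols stand in for Channel objects and nothing is sent to a broker.

```ruby
# Hypothetical table illustrating lowest-free-ID allocation; plain symbols
# stand in for Channel objects and nothing is sent to a broker.
class ChannelTable
  def initialize(channel_max)
    @channel_max = channel_max
    @channels = {}
    @lock = Mutex.new
  end

  # Reserve the given ID, or the lowest free one when id is nil.
  # Returns the ID that was reserved.
  def reserve(id = nil)
    raise ArgumentError, "Channel ID cannot be 0" if id&.zero?
    raise ArgumentError, "Channel ID above channel max #{@channel_max}" if id && id > @channel_max

    @lock.synchronize do
      id ||= (1..@channel_max).find { |i| !@channels.key?(i) }
      raise "Max channels reached" if id.nil?

      @channels[id] ||= :channel
      id
    end
  end

  def release(id)
    @lock.synchronize { @channels.delete(id) }
  end
end

table = ChannelTable.new(3)
first = table.reserve      # lowest free ID
second = table.reserve
table.release(first)
reused = table.reserve     # the freed ID is handed out again
```

Holding the mutex for both the lookup and the insert keeps two threads from reserving the same ID.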

#close(reason: "", code: 200) ⇒ nil

Gracefully close a connection

Parameters:

  • reason (String) (defaults to: "")

    The reason for closing the connection; the broker may log it

  • code (Integer) (defaults to: 200)

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 124

def close(reason: "", code: 200)
  return if @closed

  @closed = [code, reason]
  @channels.each_value { |ch| ch.closed!(:connection, code, reason, 0, 0) }
  if @blocked
    @socket.close
  else
    write_bytes FrameBytes.connection_close(code, reason)
    expect(:close_ok)
  end
  nil
end

#closed? ⇒ Boolean

True if the connection is closed

Returns:

  • (Boolean)


# File 'lib/amqp/client/connection.rb', line 150

def closed?
  !@closed.nil?
end

#on_blocked {|String| ... } ⇒ nil

Callback called when client is blocked by the broker

Yields:

  • (String)

    The reason why the connection is being blocked

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 159

def on_blocked(&blk)
  @on_blocked = blk
  nil
end

#on_unblocked { ... } ⇒ nil

Callback called when client is unblocked by the broker

Yields:

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 167

def on_unblocked(&blk)
  @on_unblocked = blk
  nil
end

#read_loop ⇒ nil

Reads from the socket, required for any kind of progress. Blocks until the connection is closed. Normally run as a background thread automatically.

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 191

def read_loop
  # read more often than write so that channel errors crop up early
  Thread.current.priority += 1
  socket = @socket
  frame_max = @frame_max
  frame_start = String.new(capacity: 7)
  frame_buffer = String.new(capacity: frame_max)
  loop do
    socket.read(7, frame_start) || raise(IOError)
    type, channel_id, frame_size = frame_start.unpack("C S> L>")
    frame_max >= frame_size || raise(Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}")

    # read the frame content
    socket.read(frame_size, frame_buffer) || raise(IOError)

    # make sure that the frame end is correct
    frame_end = socket.readchar.ord
    raise Error::UnexpectedFrameEnd, frame_end if frame_end != 206

    # parse the frame, will return false if a close frame was received
    parse_frame(type, channel_id, frame_buffer) || return
  end
  nil
rescue *READ_EXCEPTIONS => e
  @closed ||= [400, "read error: #{e.message}"]
  nil # ignore read errors
ensure
  @closed ||= [400, "unknown"]
  @replies.close
  begin
    if @write_lock.owned? # if connection is blocked
      @socket.close
    else
      @write_lock.synchronize do
        @socket.close
      end
    end
  rescue *READ_EXCEPTIONS
    nil
  end
end
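
The framing that the read loop relies on (a 7-byte header unpacked with "C S> L>", then the payload, then an end octet of 206) can be exercised against an in-memory buffer. The helpers `encode_frame` and `read_frame` below are hypothetical and written only for this sketch; they raise generic Ruby exceptions rather than the library's own error classes.

```ruby
require "stringio"

FRAME_END = 206 # frame-end octet checked by the read loop above

# Hypothetical helper: encode one frame as header + payload + end marker.
# Header layout: 1-byte type, 2-byte big-endian channel ID, 4-byte big-endian size.
def encode_frame(type, channel_id, payload)
  [type, channel_id, payload.bytesize].pack("C S> L>") + payload.b + FRAME_END.chr
end

# Hypothetical helper: read one frame from an IO-like object.
def read_frame(io, frame_max)
  header = io.read(7) || raise(IOError, "connection closed")
  type, channel_id, size = header.unpack("C S> L>")
  raise ArgumentError, "frame size #{size} exceeds #{frame_max}" if size > frame_max

  payload = io.read(size) || raise(IOError, "truncated frame")
  frame_end = io.readbyte
  raise ArgumentError, "unexpected frame end #{frame_end}" unless frame_end == FRAME_END

  [type, channel_id, payload]
end

io = StringIO.new(encode_frame(1, 5, "hello"))
type, channel_id, payload = read_frame(io, 4096)
```

Validating the size against the negotiated maximum before reading the payload means a corrupt header is rejected before any large read is attempted.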

#update_secret(secret, reason) ⇒ nil

Update authentication secret, for example when an OAuth backend is used

Parameters:

  • secret (String)

    The new secret

  • reason (String)

    A reason to update it

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 142

def update_secret(secret, reason)
  write_bytes FrameBytes.update_secret(secret, reason)
  expect(:update_secret_ok)
  nil
end

#with_channel {|Channel| ... } ⇒ Object

Open a new channel, yield it to the block, and close the channel afterwards, even if the block raises

Yields:

  • (Channel)

Returns:

  • (Object)

    Whatever was returned by the block



# File 'lib/amqp/client/connection.rb', line 111

def with_channel
  ch = channel
  begin
    yield ch
  ensure
    ch.close
  end
end
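
The begin/ensure shape of #with_channel means the channel is closed whether the block returns normally or raises. The sketch below demonstrates that behaviour with two hypothetical stand-ins, `StubChannel` and `StubConnection`, which exist only for this illustration and do not talk to a broker.

```ruby
# Hypothetical stand-ins; the real Channel talks to a broker.
class StubChannel
  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

class StubConnection
  attr_reader :last_channel

  def channel
    @last_channel = StubChannel.new
  end

  # Same shape as the method above: yield, then always close.
  def with_channel
    ch = channel
    begin
      yield ch
    ensure
      ch.close
    end
  end
end

conn = StubConnection.new
result = conn.with_channel { |ch| ch.closed? ? :closed : :open }
closed_after_success = conn.last_channel.closed?

begin
  conn.with_channel { |_ch| raise "boom" }
rescue RuntimeError
  closed_after_error = conn.last_channel.closed?
end
```

The block's return value is passed through unchanged, so `result` reflects the channel state as seen inside the block.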