Class: AMQP::Client::Connection

Inherits:
Object
Defined in:
lib/amqp/client/connection.rb,
lib/amqp/client/channel.rb

Overview

Represents a single established AMQP connection

Defined Under Namespace

Classes: Channel

Constructor Details

#initialize(uri = "", read_loop_thread: true, codec_registry: nil, strict_coding: false, **options) ⇒ Connection

Establish a connection to an AMQP broker

Parameters:

  • uri (String) (defaults to: "")

    URL in the format amqp://username:password@hostname/vhost; use amqps:// for an encrypted connection

  • read_loop_thread (Boolean) (defaults to: true)

    If true, run #read_loop in a background thread. Otherwise the caller has to run it explicitly; without #read_loop the connection won’t function

  • codec_registry (MessageCodecRegistry) (defaults to: nil)

    Registry for message codecs

  • strict_coding (Boolean) (defaults to: false)

    Whether to raise errors on unsupported codecs

  • options (Hash)

    a customizable set of options

Options Hash (**options):

  • connection_name (String) — default: PROGRAM_NAME

    Name for the connection, used to identify the client on the broker

  • verify_peer (Boolean) — default: true

    Verify the broker’s TLS certificate; set to false for self-signed certificates

  • connect_timeout (Float) — default: 30

    TCP connection timeout

  • heartbeat (Integer) — default: 0

    Heartbeat timeout in seconds; the default of 0 disables heartbeats and relies on TCP keepalive instead

  • frame_max (Integer) — default: 131_072

    Maximum frame size; the smaller of the client’s and the broker’s values will be used

  • channel_max (Integer) — default: 2048

    Maximum number of channels the client will be allowed to have open. Maximum allowed is 65_536. The smaller of the client’s and the broker’s values will be used.

  • keepalive (String) — default: 60:10:3

    TCP keepalive setting, 60s idle, 10s interval between probes, 3 probes
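As a rough illustration of how two of these options could be interpreted (a sketch, not the library's own code): the keepalive string splits into idle seconds, probe interval, and probe count, and the effective frame_max or channel_max is the smaller of the client's and the broker's values. The helper names parse_keepalive and negotiate are hypothetical. Treating 0 as "no limit" is an assumption based on the AMQP 0-9-1 convention, not something shown in this listing.

```ruby
# Hypothetical helpers; the names are illustrative and not part of amqp-client.

# Split an "idle:interval:count" setting such as "60:10:3" into integers.
def parse_keepalive(setting)
  idle, interval, count = setting.split(":").map { |part| Integer(part, 10) }
  { idle: idle, interval: interval, count: count }
end

# Pick the smaller of two limits. Assumption: 0 means "no limit"
# (AMQP 0-9-1 convention), so the other side's value wins.
def negotiate(client_value, broker_value)
  return broker_value if client_value.zero?
  return client_value if broker_value.zero?

  [client_value, broker_value].min
end

p parse_keepalive("60:10:3")
p negotiate(131_072, 4096)
p negotiate(2048, 0)
```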



# File 'lib/amqp/client/connection.rb', line 31

def initialize(uri = "", read_loop_thread: true, codec_registry: nil, strict_coding: false, **options)
  uri = URI.parse(uri)
  tls = uri.scheme == "amqps"
  port = port_from_env || uri.port || (tls ? 5671 : 5672)
  host = uri.host || "localhost"
  user = uri.user || "guest"
  password = uri.password || "guest"
  vhost = URI.decode_www_form_component(uri.path[1..] || "/")
  options = URI.decode_www_form(uri.query || "").map! { |k, v| [k.to_sym, v] }.to_h.merge(options)

  socket = open_socket(host, port, tls, options)
  channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, options)

  @socket = socket
  @channel_max = channel_max.zero? ? 65_536 : channel_max
  @frame_max = frame_max
  @heartbeat = heartbeat
  @codec_registry = codec_registry
  @strict_coding = strict_coding
  @channels = {}
  @channels_lock = Mutex.new
  @closed = nil
  @replies = ::Queue.new
  @write_lock = Mutex.new
  @blocked = nil
  @on_blocked = ->(reason) { warn "AMQP-Client blocked by broker: #{reason}" }
  @on_unblocked = -> { warn "AMQP-Client unblocked by broker" }

  # Only used with heartbeats
  @last_activity_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  Thread.new { read_loop } if read_loop_thread
end
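The URI handling at the top of the constructor can be tried in isolation with Ruby's standard library. The sketch below mirrors those parsing steps in a hypothetical helper, parse_amqp_uri (not part of the library), and leaves out port_from_env, whose behavior is not shown here. The hostname and credentials are made up.

```ruby
require "uri"

# Hypothetical helper that mirrors the parsing steps of #initialize.
# port_from_env is omitted because its implementation is not shown.
def parse_amqp_uri(str, **options)
  uri = URI.parse(str)
  tls = uri.scheme == "amqps"
  {
    tls: tls,
    host: uri.host || "localhost",
    port: uri.port || (tls ? 5671 : 5672),
    user: uri.user || "guest",
    password: uri.password || "guest",
    # The vhost is percent-decoded, so "%2Fprod" becomes "/prod"
    vhost: URI.decode_www_form_component(uri.path[1..] || "/"),
    # Query parameters become options; explicit keyword options override them
    options: URI.decode_www_form(uri.query || "").map { |k, v| [k.to_sym, v] }.to_h.merge(options)
  }
end

p parse_amqp_uri("amqps://user:pw@broker.example.com/%2Fprod?heartbeat=30")
p parse_amqp_uri("")
```

Values taken from the query string arrive as strings, while keyword options keep whatever type the caller passed.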

Instance Attribute Details

#codec_registry ⇒ MessageCodecRegistry? (readonly)

The codec registry for message encoding/decoding

Returns:

  • (MessageCodecRegistry, nil)

# File 'lib/amqp/client/connection.rb', line 88

def codec_registry
  @codec_registry
end

#frame_max ⇒ Integer (readonly)

The max frame size negotiated between the client and the broker

Returns:

  • (Integer)


# File 'lib/amqp/client/connection.rb', line 84

def frame_max
  @frame_max
end

#strict_coding ⇒ Boolean (readonly)

Whether to use strict coding (raise errors on unsupported codecs)

Returns:

  • (Boolean)


# File 'lib/amqp/client/connection.rb', line 92

def strict_coding
  @strict_coding
end

Class Method Details

.connect(uri, read_loop_thread: true) ⇒ Object

Deprecated.

Alias for #initialize

See Also:

  • #initialize

# File 'lib/amqp/client/connection.rb', line 78

def self.connect(uri, read_loop_thread: true, **)
  new(uri, read_loop_thread:, **)
end

Instance Method Details

#blocked? ⇒ Bool

Indicates that the server is blocking publishes. If the client keeps publishing the server will stop reading from the socket. Use the #on_blocked callback to get notified when the server is resource constrained.

Returns:

  • (Bool)

See Also:

  • #on_blocked

# File 'lib/amqp/client/connection.rb', line 71

def blocked?
  !@blocked.nil?
end

#channel(id = nil) ⇒ Channel

Open an AMQP channel

Parameters:

  • id (Integer, nil) (defaults to: nil)

    If nil, a new channel will be opened; otherwise an already open channel with that ID might be reused

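Returns:

  • (Channel)

Raises:

  • (ArgumentError)

    If id is 0 or higher than the connection’s channel max

  • (Error)

    If no channel ID is free ("Max channels reached")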


# File 'lib/amqp/client/connection.rb', line 104

def channel(id = nil)
  raise ArgumentError, "Channel ID cannot be 0" if id&.zero?

  raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max

  ch = @channels_lock.synchronize do
    if id
      @channels[id] ||= Channel.new(self, id)
    else
      1.upto(@channel_max) do |i|
        break id = i unless @channels.key? i
      end
      raise Error, "Max channels reached" if id.nil?

      @channels[id] = Channel.new(self, id)
    end
  end
  ch.open
end
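The ID selection in #channel picks the lowest ID that is not already in use, under a lock. A minimal stand-alone sketch of that idea follows. ChannelIdAllocator and its release method are hypothetical names used only for illustration; the real method stores Channel objects rather than a placeholder.

```ruby
# Illustrative stand-in, not part of amqp-client.
class ChannelIdAllocator
  def initialize(channel_max)
    @channel_max = channel_max
    @channels = {}
    @lock = Mutex.new
  end

  # Reserve and return the lowest free ID between 1 and channel_max.
  def allocate
    @lock.synchronize do
      id = 1.upto(@channel_max).find { |i| !@channels.key?(i) }
      raise "Max channels reached" if id.nil?

      @channels[id] = true # the real code stores a Channel object here
      id
    end
  end

  # Free an ID so that a later allocate call can hand it out again.
  def release(id)
    @lock.synchronize { @channels.delete(id) }
  end
end

allocator = ChannelIdAllocator.new(3)
p [allocator.allocate, allocator.allocate, allocator.allocate]
allocator.release(2)
p allocator.allocate
```

Channel 0 is never handed out, which matches the ArgumentError raised for an explicit ID of 0.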

#close(reason: "", code: 200) ⇒ nil

Gracefully close a connection

Parameters:

  • reason (String) (defaults to: "")

    The reason for closing the connection; it can be logged by the broker

  • code (Integer) (defaults to: 200)

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 140

def close(reason: "", code: 200)
  return if @closed

  @closed = [code, reason]
  @channels.each_value { |ch| ch.closed!(:connection, code, reason, 0, 0) }
  if @blocked
    @socket.close
  else
    write_bytes FrameBytes.connection_close(code, reason)
    expect(:close_ok)
  end
  nil
end

#closed? ⇒ Boolean

True if the connection is closed

Returns:

  • (Boolean)


# File 'lib/amqp/client/connection.rb', line 166

def closed?
  !@closed.nil?
end

#on_blocked {|String| ... } ⇒ nil

Callback called when client is blocked by the broker

Yields:

  • (String)

    reason why the connection is being blocked

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 175

def on_blocked(&blk)
  @on_blocked = blk
  nil
end

#on_unblocked { ... } ⇒ nil

Callback called when client is unblocked by the broker

Yields:

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 183

def on_unblocked(&blk)
  @on_unblocked = blk
  nil
end
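To show how the two callbacks and #blocked? fit together, here is a small stand-in class that can be run without a broker. BlockedNotifier, handle_blocked, and handle_unblocked are hypothetical names. The assumption that the read loop stores the reason and then invokes the callback is not confirmed by the source shown here.

```ruby
# Stand-in for illustration only; not the amqp-client Connection class.
class BlockedNotifier
  def initialize
    @blocked = nil
    @on_blocked = ->(reason) { warn "blocked by broker: #{reason}" }
    @on_unblocked = -> { warn "unblocked by broker" }
  end

  def on_blocked(&blk)
    @on_blocked = blk
    nil
  end

  def on_unblocked(&blk)
    @on_unblocked = blk
    nil
  end

  def blocked?
    !@blocked.nil?
  end

  # Assumption: invoked when the broker reports that it is blocking publishes.
  def handle_blocked(reason)
    @blocked = reason
    @on_blocked.call(reason)
  end

  # Assumption: invoked when the broker lifts the block.
  def handle_unblocked
    @blocked = nil
    @on_unblocked.call
  end
end

events = []
conn = BlockedNotifier.new
conn.on_blocked { |reason| events << "blocked: #{reason}" }
conn.on_unblocked { events << "unblocked" }

conn.handle_blocked("low on memory")
p conn.blocked?
conn.handle_unblocked
p conn.blocked?
p events
```

A publisher could check blocked? before each publish, or pause in the on_blocked block and resume in on_unblocked.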

#read_loop ⇒ nil

Reads from the socket, required for any kind of progress. Blocks until the connection is closed. Normally run as a background thread automatically.

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 208

def read_loop
  # read more often than write so that channel errors crop up early
  Thread.current.priority += 1
  socket = @socket
  frame_max = @frame_max
  frame_start = String.new(capacity: 7)
  frame_buffer = String.new(capacity: frame_max)
  loop do
    socket.read(7, frame_start) || raise(IOError)
    type, channel_id, frame_size = frame_start.unpack("C S> L>")
    frame_max >= frame_size ||
      raise(Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}")

    # read the frame content
    socket.read(frame_size, frame_buffer) || raise(IOError)

    # make sure that the frame end is correct
    frame_end = socket.readchar.ord
    raise Error::UnexpectedFrameTypeEnd, frame_end if frame_end != 206

    # parse the frame, will return false if a close frame was received
    parse_frame(type, channel_id, frame_buffer) || return
    update_last_activity
  end
  nil
rescue *READ_EXCEPTIONS => e
  @closed ||= [400, "read error: #{e.message}"]
  nil # ignore read errors
ensure
  @closed ||= [400, "unknown"]
  @replies.close
  begin
    if @write_lock.owned? # if connection is blocked
      @socket.close
    else
      @write_lock.synchronize do
        @socket.close
      end
    end
  rescue *READ_EXCEPTIONS
    nil
  end
end
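The framing that #read_loop relies on is a 7-byte header (type, channel ID, payload size, unpacked with "C S> L>"), the payload, and a frame-end byte of 206. It can be exercised against an in-memory stream. The helpers build_frame and read_frame below are hypothetical and only sketch the wire layout. They use readbyte where the source uses readchar.ord, and they allocate new strings instead of reusing buffers.

```ruby
require "stringio"

FRAME_END = 206 # frame-end octet checked by #read_loop

# Hypothetical helper: serialize one frame as header + payload + frame end.
def build_frame(type, channel_id, payload)
  body = payload.b # work on a binary copy of the payload
  [type, channel_id, body.bytesize].pack("C S> L>") + body + FRAME_END.chr
end

# Hypothetical helper: read one frame from any IO-like object.
def read_frame(io, frame_max)
  header = io.read(7) || raise(IOError, "stream closed")
  type, channel_id, size = header.unpack("C S> L>")
  raise ArgumentError, "frame size #{size} exceeds #{frame_max}" if size > frame_max

  body = io.read(size) || raise(IOError, "stream closed")
  raise IOError, "unexpected frame end" unless io.readbyte == FRAME_END

  [type, channel_id, body]
end

io = StringIO.new(build_frame(1, 3, "hello"))
p read_frame(io, 4096)
```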

#update_secret(secret, reason:) ⇒ nil

Update authentication secret, for example when an OAuth backend is used

Parameters:

  • secret (String)

    The new secret

  • reason (String)

    A reason to update it

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 158

def update_secret(secret, reason:)
  write_bytes FrameBytes.update_secret(secret, reason)
  expect(:update_secret_ok)
  nil
end

#with_channel {|Channel| ... } ⇒ Object

Open a new channel, yield it to the block, and then close the channel

Yields:

  • (Channel)

Returns:

  • (Object)

    Whatever was returned by the block



# File 'lib/amqp/client/connection.rb', line 127

def with_channel
  ch = channel
  begin
    yield ch
  ensure
    ch.close
  end
end
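The ensure block means the channel is closed whether the block returns normally or raises. The sketch below reproduces that shape with a stand-in channel so it can run without a broker. FakeChannel and with_fake_channel are hypothetical names, not part of the library.

```ruby
# Stand-in channel for illustration; a real Channel talks to the broker.
class FakeChannel
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Same shape as #with_channel: yield the channel, then always close it.
def with_fake_channel(channel)
  yield channel
ensure
  channel.close
end

ok = FakeChannel.new
result = with_fake_channel(ok) { |c| "used #{c.class}" }
p result    # the block's return value is passed through
p ok.closed

failing = FakeChannel.new
begin
  with_fake_channel(failing) { raise "publish failed" }
rescue RuntimeError
  # The error still propagates to the caller
end
p failing.closed # the channel was closed on the way out
```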