Class: AMQP::Client::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/amqp/client/connection.rb,
lib/amqp/client/channel.rb

Overview

Represents a single established AMQP connection

Defined Under Namespace

Classes: Channel

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri = "", read_loop_thread: true, **options) ⇒ Connection

Establish a connection to an AMQP broker

Parameters:

  • uri (String) (defaults to: "")

    URL on the format amqp://username:password@hostname/vhost, use amqps:// for encrypted connection

  • read_loop_thread (Boolean) (defaults to: true)

    If true run #read_loop in a background thread, otherwise the user have to run it explicitly, without #read_loop the connection won’t function

  • options (Hash)

    a customizable set of options

Options Hash (**options):

  • connection_name (Boolean) — default: PROGRAM_NAME

    Set a name for the connection to be able to identify the client from the broker

  • verify_peer (Boolean) — default: true

    Verify broker’s TLS certificate, set to false for self-signed certs

  • connect_timeout (Integer) — default: 30

    TCP connection timeout

  • heartbeat (Integer) — default: 0

    Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead

  • frame_max (Integer) — default: 131_072

    Maximum frame size, the smallest of the client’s and the broker’s values will be used

  • channel_max (Integer) — default: 2048

    Maxium number of channels the client will be allowed to have open. Maxium allowed is 65_536. The smallest of the client’s and the broker’s value will be used.

  • keepalive (String) — default: 60:10:3

    TCP keepalive setting, 60s idle, 10s interval between probes, 3 probes



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/amqp/client/connection.rb', line 29

def initialize(uri = "", read_loop_thread: true, **options)
  uri = URI.parse(uri)
  tls = uri.scheme == "amqps"
  port = port_from_env || uri.port || (tls ? 5671 : 5672)
  host = uri.host || "localhost"
  user = uri.user || "guest"
  password = uri.password || "guest"
  vhost = URI.decode_www_form_component(uri.path[1..] || "/")
  options = URI.decode_www_form(uri.query || "").map! { |k, v| [k.to_sym, v] }.to_h.merge(options)

  socket = open_socket(host, port, tls, options)
  channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, options)

  @socket = socket
  @channel_max = channel_max.zero? ? 65_536 : channel_max
  @frame_max = frame_max
  @heartbeat = heartbeat
  @channels = {}
  @closed = nil
  @replies = ::Queue.new
  @write_lock = Mutex.new
  @blocked = nil
  Thread.new { read_loop } if read_loop_thread
end

Instance Attribute Details

#frame_maxInteger (readonly)

The max frame size negotiated between the client and the broker

Returns:

  • (Integer)


63
64
65
# File 'lib/amqp/client/connection.rb', line 63

def frame_max
  @frame_max
end

Class Method Details

.connect(uri, read_loop_thread: true, **options) ⇒ Object

Deprecated.

Alias for #initialize

See Also:



57
58
59
# File 'lib/amqp/client/connection.rb', line 57

def self.connect(uri, read_loop_thread: true, **options)
  new(uri, read_loop_thread: read_loop_thread, **options)
end

Instance Method Details

#channel(id = nil) ⇒ Channel

Open an AMQP channel

Parameters:

  • id (Integer, nil) (defaults to: nil)

    If nil a new channel will be opened, otherwise an already open channel might be reused

Returns:

Raises:

  • (ArgumentError)


75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/amqp/client/connection.rb', line 75

def channel(id = nil)
  raise ArgumentError, "Channel ID cannot be 0" if id&.zero?
  raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max

  if id
    ch = @channels[id] ||= Channel.new(self, id)
  else
    1.upto(@channel_max) do |i|
      break id = i unless @channels.key? i
    end
    raise Error, "Max channels reached" if id.nil?

    ch = @channels[id] = Channel.new(self, id)
  end
  ch.open
end

#close(reason: "", code: 200) ⇒ nil

Gracefully close a connection

Parameters:

  • reason (String) (defaults to: "")

    A reason to close the connection can be logged by the broker

  • code (Integer) (defaults to: 200)

Returns:

  • (nil)


108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/amqp/client/connection.rb', line 108

def close(reason: "", code: 200)
  return if @closed

  @closed = [code, reason]
  @channels.each_value { |ch| ch.closed!(:connection, code, reason, 0, 0) }
  if @blocked
    @socket.close
  else
    write_bytes FrameBytes.connection_close(code, reason)
    expect(:close_ok)
  end
  nil
end

#closed?Boolean

True if the connection is closed

Returns:

  • (Boolean)


124
125
126
# File 'lib/amqp/client/connection.rb', line 124

def closed?
  !@closed.nil?
end

#read_loopnil

Reads from the socket, required for any kind of progress. Blocks until the connection is closed. Normally run as a background thread automatically.

Returns:

  • (nil)


152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/amqp/client/connection.rb', line 152

def read_loop
  # read more often than write so that channel errors crop up early
  Thread.current.priority += 1
  socket = @socket
  frame_max = @frame_max
  frame_start = String.new(capacity: 7)
  frame_buffer = String.new(capacity: frame_max)
  loop do
    socket.read(7, frame_start) || raise(IOError)
    type, channel_id, frame_size = frame_start.unpack("C S> L>")
    frame_max >= frame_size || raise(Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}")

    # read the frame content
    socket.read(frame_size, frame_buffer) || raise(IOError)

    # make sure that the frame end is correct
    frame_end = socket.readchar.ord
    raise UnexpectedFrameEnd, frame_end if frame_end != 206

    # parse the frame, will return false if a close frame was received
    parse_frame(type, channel_id, frame_buffer) || return
  end
  nil
rescue *READ_EXCEPTIONS => e
  @closed ||= [400, "read error: #{e.message}"]
  nil # ignore read errors
ensure
  @closed ||= [400, "unknown"]
  @replies.close
  begin
    @write_lock.synchronize do
      @socket.close
    end
  rescue *READ_EXCEPTIONS
    nil
  end
end

#with_channel {|Channel| ... } ⇒ Object

Declare a new channel, yield, and then close the channel

Yields:

Returns:

  • (Object)

    Whatever was returned by the block



95
96
97
98
99
100
101
102
# File 'lib/amqp/client/connection.rb', line 95

def with_channel
  ch = channel
  begin
    yield ch
  ensure
    ch.close
  end
end