Class: AMQP::Client::Connection

Inherits:
Object
Defined in:
lib/amqp/client/connection.rb,
lib/amqp/client/channel.rb

Overview

Represents a single established AMQP connection

Defined Under Namespace

Classes: Channel

Constructor Details

#initialize(uri = "", read_loop_thread: true, **options) ⇒ Connection

Establish a connection to an AMQP broker

Parameters:

  • uri (String) (defaults to: "")

    URL in the format amqp://username:password@hostname/vhost; use amqps:// for an encrypted connection

  • read_loop_thread (Boolean) (defaults to: true)

    If true, run #read_loop in a background thread; otherwise the caller has to run it explicitly. Without #read_loop the connection won’t function.

  • options (Hash)

    a customizable set of options

Options Hash (**options):

  • connection_name (String) — default: PROGRAM_NAME

    Set a name for the connection so the client can be identified from the broker

  • verify_peer (Boolean) — default: true

    Verify the broker’s TLS certificate; set to false for self-signed certs

  • connect_timeout (Integer) — default: 30

    TCP connection timeout

  • heartbeat (Integer) — default: 0

    Heartbeat timeout, defaults to 0 and relies on TCP keepalive instead

  • frame_max (Integer) — default: 131_072

    Maximum frame size; the smaller of the client’s and the broker’s values will be used

  • channel_max (Integer) — default: 2048

    Maximum number of channels the client will be allowed to have open. The maximum allowed is 65_536. The smaller of the client’s and the broker’s values will be used.

  • keepalive (String) — default: 60:10:3

    TCP keepalive setting, 60s idle, 10s interval between probes, 3 probes
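
As an illustration of the keepalive format, the sketch below splits a value such as 60:10:3 into its three fields. The helper name parse_keepalive is hypothetical and the library's own parsing may differ; only the field order (idle seconds, probe interval, probe count) is taken from the description above.

```ruby
# Hypothetical helper, not part of the library: split "idle:interval:count"
# into integers, following the field order given in the option description.
def parse_keepalive(value)
  idle, interval, count = value.split(":", 3).map { |field| Integer(field) }
  { idle: idle, interval: interval, count: count }
end

settings = parse_keepalive("60:10:3")
# settings[:idle] is the idle time in seconds before the first probe is sent
```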



# File 'lib/amqp/client/connection.rb', line 29

def initialize(uri = "", read_loop_thread: true, **options)
  uri = URI.parse(uri)
  tls = uri.scheme == "amqps"
  port = port_from_env || uri.port || (tls ? 5671 : 5672)
  host = uri.host || "localhost"
  user = uri.user || "guest"
  password = uri.password || "guest"
  vhost = URI.decode_www_form_component(uri.path[1..] || "/")
  options = URI.decode_www_form(uri.query || "").map! { |k, v| [k.to_sym, v] }.to_h.merge(options)

  socket = open_socket(host, port, tls, options)
  channel_max, frame_max, heartbeat = establish(socket, user, password, vhost, options)

  @socket = socket
  @channel_max = channel_max.zero? ? 65_536 : channel_max
  @frame_max = frame_max
  @heartbeat = heartbeat
  @channels = {}
  @closed = nil
  @replies = ::Queue.new
  @write_lock = Mutex.new
  @blocked = nil
  @on_blocked = ->(reason) { warn "AMQP-Client blocked by broker: #{reason}" }
  @on_unblocked = -> { warn "AMQP-Client unblocked by broker" }

  Thread.new { read_loop } if read_loop_thread
end
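
To make the defaults in the constructor easier to follow, here is a minimal stand-alone sketch of the same URI handling using only Ruby's standard library. The helper name parse_amqp_uri and the example hostname are hypothetical, and the private port_from_env lookup is left out because its behaviour is not shown on this page.

```ruby
require "uri"

# Hypothetical helper, not part of the library: resolve connection settings
# from a URI string the same way the constructor listing above does
# (minus the port_from_env lookup).
def parse_amqp_uri(str)
  uri = URI.parse(str)
  tls = uri.scheme == "amqps"
  {
    tls: tls,
    host: uri.host || "localhost",
    port: uri.port || (tls ? 5671 : 5672),
    user: uri.user || "guest",
    password: uri.password || "guest",
    # the path is percent-decoded, so a vhost containing "/" is written as %2F
    vhost: URI.decode_www_form_component(uri.path[1..] || "/"),
    # query parameters become symbol-keyed options; their values stay strings
    options: URI.decode_www_form(uri.query || "").to_h { |k, v| [k.to_sym, v] }
  }
end

full = parse_amqp_uri("amqps://user:secret@broker.example.com/my%2Fvhost?heartbeat=30")
bare = parse_amqp_uri("")
```

Note that options taken from the query string arrive as strings (for example "30"), while options passed as keyword arguments keep their Ruby types and, because of the merge order, take precedence.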

Instance Attribute Details

#frame_max ⇒ Integer (readonly)

The max frame size negotiated between the client and the broker

Returns:

  • (Integer)


# File 'lib/amqp/client/connection.rb', line 66

def frame_max
  @frame_max
end

Class Method Details

.connect(uri, read_loop_thread: true, **options) ⇒ Object

Deprecated.

Alias for #initialize

See Also:

  • #initialize


# File 'lib/amqp/client/connection.rb', line 60

def self.connect(uri, read_loop_thread: true, **options)
  new(uri, read_loop_thread: read_loop_thread, **options)
end

Instance Method Details

#channel(id = nil) ⇒ Channel

Open an AMQP channel

Parameters:

  • id (Integer, nil) (defaults to: nil)

    If nil a new channel will be opened, otherwise an already open channel might be reused

Returns:

  • (Channel)

Raises:

  • (ArgumentError)


# File 'lib/amqp/client/connection.rb', line 78

def channel(id = nil)
  raise ArgumentError, "Channel ID cannot be 0" if id&.zero?
  raise ArgumentError, "Channel ID higher than connection's channel max #{@channel_max}" if id && id > @channel_max

  if id
    ch = @channels[id] ||= Channel.new(self, id)
  else
    1.upto(@channel_max) do |i|
      break id = i unless @channels.key? i
    end
    raise Error, "Max channels reached" if id.nil?

    ch = @channels[id] = Channel.new(self, id)
  end
  ch.open
end
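
When no ID is given, the method above picks the lowest ID that is not already in use. A stand-alone sketch of that search, with a hypothetical helper name and a plain array standing in for the keys of @channels:

```ruby
# Hypothetical helper, not part of the library: return the lowest channel ID
# in 1..channel_max that is not in open_ids, or nil when all are taken.
def next_free_channel_id(open_ids, channel_max)
  1.upto(channel_max) do |i|
    return i unless open_ids.include?(i)
  end
  nil
end

reused = next_free_channel_id([1, 2, 4], 10)   # the gap left by a closed channel is filled first
exhausted = next_free_channel_id([1, 2], 2)    # nil corresponds to the "Max channels reached" error above
```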

#close(reason: "", code: 200) ⇒ nil

Gracefully close a connection

Parameters:

  • reason (String) (defaults to: "")

    The reason for closing the connection; the broker may log it

  • code (Integer) (defaults to: 200)

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 111

def close(reason: "", code: 200)
  return if @closed

  @closed = [code, reason]
  @channels.each_value { |ch| ch.closed!(:connection, code, reason, 0, 0) }
  if @blocked
    @socket.close
  else
    write_bytes FrameBytes.connection_close(code, reason)
    expect(:close_ok)
  end
  nil
end

#closed? ⇒ Boolean

True if the connection is closed

Returns:

  • (Boolean)


# File 'lib/amqp/client/connection.rb', line 127

def closed?
  !@closed.nil?
end

#on_blocked {|String| ... } ⇒ nil

Callback called when client is blocked by the broker

Yields:

  • (String)

    reason why the connection is being blocked

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 136

def on_blocked(&blk)
  @on_blocked = blk
  nil
end

#on_unblocked { ... } ⇒ nil

Callback called when client is unblocked by the broker

Yields:

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 144

def on_unblocked(&blk)
  @on_unblocked = blk
  nil
end
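
Both callbacks simply replace the default warn lambda set in the constructor with the given block. The sketch below shows that pattern in isolation; BlockedNotifier and its simulate_* methods are hypothetical stand-ins so the example runs without a broker, and the reason string is made up.

```ruby
# Hypothetical stand-in for the callback handling in Connection.
class BlockedNotifier
  def initialize
    # defaults mirror the constructor listing: warn on stderr
    @on_blocked = ->(reason) { warn "blocked by broker: #{reason}" }
    @on_unblocked = -> { warn "unblocked by broker" }
  end

  def on_blocked(&blk)
    @on_blocked = blk
    nil
  end

  def on_unblocked(&blk)
    @on_unblocked = blk
    nil
  end

  # Assumption: stands in for the broker sending blocked/unblocked notifications
  def simulate_blocked(reason)
    @on_blocked.call(reason)
  end

  def simulate_unblocked
    @on_unblocked.call
  end
end

events = []
notifier = BlockedNotifier.new
notifier.on_blocked { |reason| events << [:blocked, reason] }
notifier.on_unblocked { events << [:unblocked] }
notifier.simulate_blocked("low on memory")
notifier.simulate_unblocked
```

Registering a callback a second time overwrites the previous one, since only a single block is stored per event.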

#read_loop ⇒ nil

Reads from the socket, required for any kind of progress. Blocks until the connection is closed. Normally run as a background thread automatically.

Returns:

  • (nil)


# File 'lib/amqp/client/connection.rb', line 168

def read_loop
  # read more often than write so that channel errors crop up early
  Thread.current.priority += 1
  socket = @socket
  frame_max = @frame_max
  frame_start = String.new(capacity: 7)
  frame_buffer = String.new(capacity: frame_max)
  loop do
    socket.read(7, frame_start) || raise(IOError)
    type, channel_id, frame_size = frame_start.unpack("C S> L>")
    frame_max >= frame_size || raise(Error, "Frame size #{frame_size} larger than negotiated max frame size #{frame_max}")

    # read the frame content
    socket.read(frame_size, frame_buffer) || raise(IOError)

    # make sure that the frame end is correct
    frame_end = socket.readchar.ord
    raise Error::UnexpectedFrameEnd, frame_end if frame_end != 206

    # parse the frame, will return false if a close frame was received
    parse_frame(type, channel_id, frame_buffer) || return
  end
  nil
rescue *READ_EXCEPTIONS => e
  @closed ||= [400, "read error: #{e.message}"]
  nil # ignore read errors
ensure
  @closed ||= [400, "unknown"]
  @replies.close
  begin
    if @write_lock.owned? # if connection is blocked
      @socket.close
    else
      @write_lock.synchronize do
        @socket.close
      end
    end
  rescue *READ_EXCEPTIONS
    nil
  end
end
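
The framing logic in the loop above can be tried without a socket. The following sketch reads a single frame from any IO-like object: a 7-byte header (type, channel ID, payload size), the payload, and the frame-end octet 206. The helper name read_frame and its error types are hypothetical, and it allocates new strings instead of reusing buffers as the library does.

```ruby
require "stringio"

# Hypothetical helper, not part of the library: read one frame and return
# [type, channel_id, payload], applying the same checks as the loop above.
def read_frame(io, frame_max)
  header = io.read(7) || raise(IOError, "stream closed")
  # "C S> L>": 1-byte type, 2-byte big-endian channel ID, 4-byte big-endian size
  type, channel_id, frame_size = header.unpack("C S> L>")
  raise ArgumentError, "frame size #{frame_size} larger than max #{frame_max}" if frame_size > frame_max

  payload = io.read(frame_size) || raise(IOError, "stream closed")
  frame_end = io.readbyte
  raise "unexpected frame end #{frame_end}" if frame_end != 206

  [type, channel_id, payload]
end

# Build a frame by hand: type 1, channel 5, 3-byte payload, frame-end octet.
frame = [1, 5, 3].pack("C S> L>") + "abc".b + 206.chr
type, channel_id, payload = read_frame(StringIO.new(frame), 131_072)
```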

#with_channel {|Channel| ... } ⇒ Object

Open a new channel, yield it to the block, and then close the channel

Yields:

  • (Channel)

Returns:

  • (Object)

    Whatever was returned by the block



# File 'lib/amqp/client/connection.rb', line 98

def with_channel
  ch = channel
  begin
    yield ch
  ensure
    ch.close
  end
end
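
Because the close happens in an ensure clause, the channel is closed even when the block raises. A minimal sketch of that behaviour, using a hypothetical FakeChannel and with_fake_channel in place of a real connection:

```ruby
# Hypothetical stand-in for Connection::Channel that only tracks its state.
class FakeChannel
  def initialize
    @closed = false
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

# Same shape as #with_channel, but the channel is passed in for illustration.
def with_fake_channel(ch)
  yield ch
ensure
  ch.close
end

ch = FakeChannel.new
# The block's value is returned; the channel is still open inside the block.
result = with_fake_channel(ch) { |c| c.closed? ? :closed : :open }

failing = FakeChannel.new
begin
  with_fake_channel(failing) { raise "publish failed" }
rescue RuntimeError
  # the error still propagates to the caller, after the channel was closed
end
```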