Kazoo AMQP
- Maintaining connections to brokers
- Creating exchanges, queues, bindings
- Publishing messages
- gen_listener behaviour for message reception
Erlang App Architecture
The application’s main supervisor, kz_amqp_sup, starts the following children:
[?WORKER('kz_amqp_connections')
,?SUPER('kz_amqp_connection_sup')
,?SUPER('kz_amqp_federated_listeners_sup')
,?WORKER('kz_amqp_assignments')
,?WORKER('kz_amqp_bootstrap')
]
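The child list above can be read as ordinary OTP child specifications. The sketch below is a minimal, self-contained approximation: the macro expansions, the module name `amqp_sup_sketch`, and the `one_for_one` restart settings are assumptions made for illustration, not the definitions KAZOO actually ships.

```erlang
-module(amqp_sup_sketch).
-behaviour(supervisor).

-export([start_link/0, init/1, children/0]).

%% Assumed expansions of the child-spec macros; the real definitions live in
%% KAZOO's shared headers and may differ.
-define(WORKER(I), {I, {I, 'start_link', []}, 'permanent', 5000, 'worker', [I]}).
-define(SUPER(I), {I, {I, 'start_link', []}, 'permanent', 'infinity', 'supervisor', [I]}).

start_link() ->
    supervisor:start_link({'local', ?MODULE}, ?MODULE, []).

%% Same children, in the same start order, as listed in the text.
children() ->
    [?WORKER('kz_amqp_connections')
    ,?SUPER('kz_amqp_connection_sup')
    ,?SUPER('kz_amqp_federated_listeners_sup')
    ,?WORKER('kz_amqp_assignments')
    ,?WORKER('kz_amqp_bootstrap')
    ].

init([]) ->
    %% Assumed strategy: a crashed child is restarted without touching its
    %% siblings, with at most 5 restarts in 10 seconds.
    {'ok', {{'one_for_one', 5, 10}, children()}}.
```

Children start in list order, so kz_amqp_connections is running before kz_amqp_bootstrap asks it to add connections.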
kz_amqp_connections
Maintains an ETS table for AMQP connections.
kz_amqp_connection_sup
Supervisor for processes that manage a single AMQP connection.
kz_amqp_connection
Starts a connection process.
On init, assumes the connection is disconnected.
When disconnecting
1. If the connection is available, flag the connection as unavailable in kz_amqp_connections
2. If the connection has a channel reference (a monitor ref to the amqp_client’s channel process), demonitor the ref
3. If the connection has a channel process (the amqp_client channel process), send that PID a close message
4. If the connection has a connection reference (a monitor ref to the amqp_client’s connection process), demonitor the ref
5. If the connection has a connection process (the amqp_client connection process), close the connection
6. If the connection pre-initializes channels, set the pre-initialized flag to false (not pre-initialized yet)
7. Spawn and monitor a reconnect PID to send a connect message to the kz_amqp_connection process
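The monitor cleanup and the reconnect helper from the disconnect steps above can be sketched in plain Erlang. This is only an illustration under stated assumptions: the state is a map whose key names are hypothetical, the reconnect delay is a parameter invented for the example, the "unavailable" flag is kept locally instead of in kz_amqp_connections, and closing the amqp_client channel and connection processes is left out.

```erlang
-module(amqp_disconnect_sketch).
-export([disconnect/2]).

%% State is a plain map here; every key name is hypothetical.
disconnect(State, DelayMs) ->
    Owner = self(),
    %% Drop the monitors on the amqp_client channel and connection processes.
    ok = drop_monitor(maps:get(channel_ref, State, undefined)),
    ok = drop_monitor(maps:get(connection_ref, State, undefined)),
    %% Closing the amqp_client channel and connection is omitted in this sketch.
    %% Spawn and monitor a helper that asks the owner to connect again.
    {Pid, Ref} = spawn_monitor(fun() ->
                                       timer:sleep(DelayMs),
                                       Owner ! connect
                               end),
    State#{available => false
          ,channel_ref => undefined
          ,connection_ref => undefined
          ,prechannels_initialized => false
          ,reconnect_pid => Pid
          ,reconnect_ref => Ref
          }.

drop_monitor(undefined) -> ok;
drop_monitor(Ref) when is_reference(Ref) ->
    %% 'flush' also removes a 'DOWN' message that may already be queued.
    true = erlang:demonitor(Ref, [flush]),
    ok.
```

Keeping the reconnect monitor ref in the state is what lets the connect path cancel it later.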
When connecting
1. If the connection is not available, try to start (via amqp_connection:start/1) the AMQP connection
   a. If a failure occurs, go to disconnected/2 (see When disconnecting)
2. Monitor the amqp_client connection PID
3. Register as the blocked handler (for connection.blocked and connection.unblocked messages)
4. Cancel the reconnect ref (see When disconnecting #7)
5. Create the control channel
   a. Set self as the default consumer of the channel
   b. Monitor the amqp_client channel PID
6. Declare any pre-defined AMQP exchanges
7. Mark the connection as available in kz_amqp_connections
8. Initialize pre-channels
   a. Add each channel to kz_amqp_assignments
kz_amqp_federated_listeners_sup
Manages listener_federator processes (which are gen_listeners) that connect to federated (non-local) brokers.
kz_amqp_assignments
Creates an ETS table to track AMQP channel assignments.
An assignment tracks:
- The timestamp of the assignment
- The consumer PID (typically the KAZOO PID, like a gen_listener process)
- The KAZOO application of the consumer PID (if possible)
- The type of channel
  - float channels will be created on whichever broker is available at the time
  - sticky channels will only be created on the specified broker when it is available
- The amqp_client channel PID
- The amqp_client connection PID
- The broker URI
- The assigned timestamp
- reconnect: boolean, true if reconnecting the assignment
- watchers: set of consumer processes waiting on a channel, notified when they get assigned a channel
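One way to picture the assignment table is a record stored in an ETS table keyed on the assignment timestamp. The sketch below is an assumption-laden illustration: the record name, field names, table options, key format, and the `{assigned, Channel}` notification message are all hypothetical and are not taken from kz_amqp_assignments.

```erlang
-module(amqp_assignments_sketch).
-export([new_table/0, reserve/3, add_watcher/3, assign/5]).

%% Hypothetical record mirroring the fields listed in the text.
-record(assignment, {timestamp            %% key: when the assignment was made
                    ,consumer             %% consumer PID
                    ,application          %% KAZOO app of the consumer, if known
                    ,type = float         %% float | sticky
                    ,channel              %% amqp_client channel PID
                    ,connection           %% amqp_client connection PID
                    ,broker               %% broker URI
                    ,assigned             %% when a channel was assigned
                    ,reconnect = false
                    ,watchers = sets:new()
                    }).

new_table() ->
    %% ordered_set keeps the oldest reservation first (an assumed choice).
    ets:new(assignments_sketch, [ordered_set, {keypos, #assignment.timestamp}]).

%% Record that Consumer wants a channel; no channel is attached yet.
reserve(Tab, Consumer, Type) when Type =:= float; Type =:= sticky ->
    %% The unique integer only guards against two reservations in the same tick.
    Key = {erlang:system_time(microsecond), erlang:unique_integer([monotonic])},
    true = ets:insert(Tab, #assignment{timestamp = Key, consumer = Consumer, type = Type}),
    Key.

%% Register a process that wants to hear when the reservation gets a channel.
add_watcher(Tab, Key, Watcher) ->
    [#assignment{watchers = Ws} = A] = ets:lookup(Tab, Key),
    true = ets:insert(Tab, A#assignment{watchers = sets:add_element(Watcher, Ws)}),
    ok.

%% Attach a channel to the reservation and notify every watcher once.
assign(Tab, Key, Channel, Connection, Broker) ->
    [#assignment{watchers = Ws} = A] = ets:lookup(Tab, Key),
    true = ets:insert(Tab, A#assignment{channel = Channel
                                       ,connection = Connection
                                       ,broker = Broker
                                       ,assigned = erlang:system_time(microsecond)
                                       ,watchers = sets:new()
                                       }),
    _ = [W ! {assigned, Channel} || W <- sets:to_list(Ws)],
    ok.
```

A consumer would call `reserve/3`, register itself with `add_watcher/3`, and then wait for the notification that `assign/5` sends once a channel exists.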
kz_amqp_bootstrap
- Reads AMQP config from config.ini
- For each {Zone, Brokers} section, asks kz_amqp_connections to add {Broker, Zone} connections
- Blocks in init/1 until kz_amqp_connections:wait_for_available/0 returns
  a. This waits until a primary broker (zone = 'local') is connected
- Creates the targeted exchange
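The bootstrap flow above amounts to flattening the zone sections into {Broker, Zone} pairs and then blocking until a connection is usable. A minimal sketch follows, assuming the config has already been parsed into a list of {Zone, Brokers} tuples; the module and function names are hypothetical, and the polling loop is only a stand-in for kz_amqp_connections:wait_for_available/0, whose real mechanism is not described here.

```erlang
-module(amqp_bootstrap_sketch).
-export([broker_zone_pairs/1, wait_until/2]).

%% Turn [{Zone, [Broker]}] into [{Broker, Zone}], preserving order.
broker_zone_pairs(Sections) ->
    [{Broker, Zone} || {Zone, Brokers} <- Sections, Broker <- Brokers].

%% Block the caller until IsAvailable() returns true, polling every PollMs.
wait_until(IsAvailable, PollMs) when is_function(IsAvailable, 0) ->
    case IsAvailable() of
        true -> ok;
        false ->
            timer:sleep(PollMs),
            wait_until(IsAvailable, PollMs)
    end.
```

Because the wait happens inside init/1, the supervisor does not finish starting this child until a primary broker is connected.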