Exchange and Queue Federation
This guide covers various topics related to cluster federation, both of exchanges and queues:
- Federation overview
- What does federation do?
- Getting started
- A basic example
- Federation connections
- Federating clusters
- Federation support for TLS
- Monitoring federation link status
- Troubleshooting
Overview
The high-level goal of the Federation plugin is to replicate or move messages between brokers that have the plugin enabled but do not belong to the same cluster. This is useful for a number of reasons.
Loose Coupling of Nodes or Clusters
The federation plugin can transmit messages between brokers (or clusters) in different administrative domains:
- they may be hosted in different data centers, potentially on different continents
- they may have different users, virtual hosts, permissions and purpose
- they may run on different versions of RabbitMQ and Erlang
- they may be of different sizes
WAN friendliness
The federation plugin communication is entirely asynchronous and assumes that connections between clusters will fail from time to time. So it tolerates intermittent connectivity well and does not create coupling between remote clusters (in terms of availability).
Specificity
A broker can contain federated and local-only components to best fit the desired architecture of the system.
Scalability with Growing Connected Node Count
Federation does not require O(n^2) connections between N brokers (although this is the easiest way to set things up).
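As a rough worked example of the connection count, the sketch below compares a full mesh, where every broker uses every other broker as an upstream, with a layout where each broker has exactly one upstream. The single-upstream "ring" layout and the function names are assumptions made for illustration only; they are not prescribed by this guide.

```python
def full_mesh_links(n: int) -> int:
    """Links needed when each of n brokers federates from every other broker."""
    return n * (n - 1)


def ring_links(n: int) -> int:
    """Links needed when each of n brokers has exactly one upstream (hypothetical ring)."""
    return n if n > 1 else 0


for n in (3, 5, 10):
    print(f"{n} brokers: full mesh = {full_mesh_links(n)}, ring = {ring_links(n)}")
```

The full mesh grows quadratically while the single-upstream layout grows linearly, which is the trade-off the paragraph above refers to.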
What Does It Do?
The federation plugin makes it possible to federate exchanges and queues. A federated exchange or queue can receive messages from one or more remote clusters called upstreams (to be more precise: exchanges and queues that exist in remote clusters).
A federated exchange will "replay" a stream of messages published to its upstream counterpart, and publish them to a local queue or stream.
A federated queue lets a local consumer receive messages from an upstream queue when the remote queue itself does not have any local consumers online.
Federation links connect to upstreams largely the same way an application would. Therefore they can connect to a specific vhost, use TLS, and use multiple authentication mechanisms.
Typically, federation is used to connect remote clusters. However, it can also be used to move data between virtual hosts within the same cluster.
Federation documentation is organized as a number of more focussed guides:
- Exchange federation: for replicating a flow of messages through an exchange to a remote cluster
- Queue federation: to create a "logical queue" across N clusters that will move messages where consumers are (if there are no local consumers)
- Federation settings reference
How is Federation Set Up?
Two steps are involved in setting up federation:
- First, one or more upstreams must be defined. They provide federation with information about how to connect to other nodes. This can be done via runtime parameters or the federation management plugin which adds a federation management tab to the management UI.
- Second, to enable federation, one or more policies that match exchanges or queues must be declared. The policy will make the matched objects (e.g. exchanges) federated, and one federation link (connection to other nodes) will be started for every match.
Getting Started
The federation plugin is included in the RabbitMQ distribution. To enable it, use rabbitmq-plugins:
rabbitmq-plugins enable rabbitmq_federation
If the management UI is used, it is recommended that rabbitmq_federation_management is also enabled:
rabbitmq-plugins enable rabbitmq_federation_management
When using federation in a cluster, all the nodes of the cluster should have the federation plugin enabled.
Information about federation upstreams is stored in the RabbitMQ database, along with users, permissions, queues, etc. There are three levels of configuration involved in federation:
- Upstreams: each upstream defines a remote connection endpoint.
- Upstream sets: each upstream set groups together a set of upstreams to use for federation.
- Policies: each policy selects a set of exchanges, queues or both, and applies a single upstream or an upstream set to those objects.
In practice, for simple use cases you can almost ignore the
existence of upstream sets, since there is an implicitly-defined upstream set called all
to which all upstreams are added.
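To make the relationship between the three levels concrete, here is a minimal Python sketch of how a policy definition could be resolved to a list of upstream names. It is a simplified model, not the plugin's implementation: the function name and the sample upstream names are hypothetical, and upstream sets are represented as plain lists of names.

```python
from typing import Dict, List


def resolve_upstreams(
    definition: Dict[str, str],
    upstreams: Dict[str, dict],
    upstream_sets: Dict[str, List[str]],
) -> List[str]:
    """Return the upstream names a policy definition refers to (simplified model)."""
    # A policy may name a single upstream directly.
    if "federation-upstream" in definition:
        return [definition["federation-upstream"]]
    set_name = definition.get("federation-upstream-set")
    if set_name is None:
        return []
    # The implicit set "all" contains every defined upstream.
    if set_name == "all":
        return sorted(upstreams)
    return list(upstream_sets.get(set_name, []))


# Hypothetical upstreams, for illustration only.
upstreams = {
    "my-upstream": {"uri": "amqp://target.hostname"},
    "backup-upstream": {"uri": "amqp://backup.hostname"},
}

print(resolve_upstreams({"federation-upstream-set": "all"}, upstreams, {}))
print(resolve_upstreams({"federation-upstream": "my-upstream"}, upstreams, {}))
```

With only the implicit "all" set in play, a policy picks up every upstream defined in the virtual host, which is why simple setups rarely need explicit upstream sets.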
Upstreams and upstream sets are both defined using runtime parameters. Like exchanges and queues, each virtual host has its own distinct set of parameters and policies. For more generic information on parameters and policies, see the guide on parameters and policies. For full details on the parameters used by federation, see the federation reference.
Parameters and policies can be set in three ways:
- Using CLI tools
- In the management UI if an extension plugin (rabbitmq_federation_management) is enabled
- Using the HTTP API
The HTTP API has a limitation: it does not support management of upstream sets.
A Basic Example
Here we will federate all the built-in exchanges except for the default exchange, with a single upstream. The upstream will be defined to buffer messages when disconnected for up to one hour (3600000ms).
To define an upstream, use one of the following examples.

rabbitmqctl with bash:
# target.hostname is just an example, replace it with a URI
# of the target node (usually a member of a remote node/cluster,
# or a URI that connects to a different virtual host within the same cluster)
rabbitmqctl set_parameter federation-upstream my-upstream \
    '{"uri":"amqp://target.hostname","expires":3600000}'

rabbitmqadmin with bash:
# target.hostname is just an example, replace it with a URI
# of the target node (usually a member of a remote node/cluster,
# or a URI that connects to a different virtual host within the same cluster)
rabbitmqadmin federation declare_upstream --name my-upstream \
    --uri "amqp://target.hostname" \
    --ttl 3600000

rabbitmqctl with PowerShell:
# target.hostname is just an example, replace it with a URI
# of the target node (usually a member of a remote node/cluster,
# or a URI that connects to a different virtual host within the same cluster)
rabbitmqctl.bat set_parameter federation-upstream my-upstream `
    '"{""uri"":""amqp://target.hostname"",""expires"":3600000}"'

rabbitmqadmin.exe with PowerShell:
# target.hostname is just an example, replace it with a URI
# of the target node (usually a member of a remote node/cluster,
# or a URI that connects to a different virtual host within the same cluster)
rabbitmqadmin.exe federation declare_upstream --name my-upstream `
    --uri "amqp://target.hostname" `
    --ttl 3600000

Management UI:
Navigate to Admin > Federation Upstreams > Add a new upstream. Enter "my-upstream" next to Name, "amqp://target.hostname" next to URI, and 3600000 next to Expiry. Click Add upstream.

HTTP API:
PUT /api/parameters/federation-upstream/%2f/my-upstream
{"value":{"uri":"amqp://target.hostname","expires":3600000}}
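For readers scripting the HTTP API variant, the following Python sketch builds (but does not send) the corresponding PUT request using only the standard library. The base URL `http://localhost:15672` and the helper name are assumptions for illustration; authentication is omitted and would have to be added before sending the request.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def upstream_request(base_url: str, vhost: str, name: str, uri: str, expires_ms: int) -> Request:
    """Build a PUT request that defines a federation upstream (the request is not sent)."""
    # The virtual host name must be percent-encoded, so "/" becomes "%2F".
    path = "/api/parameters/federation-upstream/{}/{}".format(
        quote(vhost, safe=""), quote(name, safe="")
    )
    body = json.dumps({"value": {"uri": uri, "expires": expires_ms}}).encode("utf-8")
    return Request(
        base_url + path,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Assumed management API address; replace with the address of a real node.
req = upstream_request("http://localhost:15672", "/", "my-upstream",
                       "amqp://target.hostname", 3600000)
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
```

Building the request separately from sending it makes it easy to inspect the path and body before anything is applied to a node.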
Then define a policy that will match built-in exchanges and use this upstream:
rabbitmqctl with bash:
rabbitmqctl set_policy --apply-to exchanges federate-me "^amq\." \
    '{"federation-upstream-set":"all"}'

rabbitmqadmin with bash:
rabbitmqadmin policies declare \
    --name "federate-me" \
    --pattern "^amq\." \
    --definition '{"federation-upstream-set":"all"}' \
    --apply-to "exchanges"

rabbitmqctl with PowerShell:
rabbitmqctl.bat set_policy --apply-to exchanges federate-me "^amq\." `
    '"{""federation-upstream-set"":""all""}"'

rabbitmqadmin.exe with PowerShell:
rabbitmqadmin.exe policies declare `
    --name "federate-me" `
    --pattern "^amq\." `
    --definition "{""federation-upstream-set"":""all""}" `
    --apply-to "exchanges"

Management UI:
Navigate to Admin > Policies > Add / update a policy. Enter "federate-me" next to "Name", "^amq\." next to "Pattern", choose "Exchanges" from the "Apply to" drop-down list and enter "federation-upstream-set" = "all" in the first line next to "Policy". Click Add policy.

HTTP API:
PUT /api/policies/%2f/federate-me
{"pattern":"^amq\\.", "definition":{"federation-upstream-set":"all"}, "apply-to":"exchanges"}
The policy matches the exchanges whose names begin with "amq." (all the built-in exchanges except for the default one), has an (implicit) low priority, and federates them using the implicitly created upstream set "all", which includes our newly created upstream.
Any other matching policy with a priority greater than 0 will take precedence over this policy. Keep in mind that federate-me is just a name we used for this example; you can use any string you want there.
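The effect of the pattern can be checked offline. The sketch below uses Python's `re` module as a stand-in for the broker's own regular expression engine (an assumption that holds for a pattern this simple) and a hypothetical list of exchange names. It also shows how the backslash has to be doubled when the pattern is embedded in a JSON body.

```python
import json
import re

pattern = r"^amq\."

# Hypothetical exchange names; "" stands for the default exchange.
names = ["", "amq.direct", "amq.fanout", "amq.topic", "amq.headers", "amqp-events", "orders"]

matched = [n for n in names if re.search(pattern, n)]
print(matched)

# Without the backslash the dot matches any character, so "amqp-events" would match too.
loose = [n for n in names if re.search(r"^amq.", n)]
print(loose)

# json.dumps escapes the backslash, which JSON requires inside strings.
policy_body = json.dumps({
    "pattern": pattern,
    "definition": {"federation-upstream-set": "all"},
    "apply-to": "exchanges",
})
print(policy_body)
```

Only the names that literally start with "amq." are selected, and neither the default exchange nor the unrelated names are affected.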
The built-in exchanges should now be federated because they are matched by the policy. You can check that the policy has been applied to the exchanges by checking the exchanges list in the management UI or with:
rabbitmqctl list_exchanges name policy | grep federate-me
And you can check that federation links for each exchange have come up with Admin > Federation Status > Running Links or with:
# This command will be available only if federation plugin is enabled
rabbitmqctl federation_status
In general there will be one federation link for each upstream that is applied to an exchange. So for example with three exchanges and two upstreams for each there will be six links.
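The arithmetic in the example above can be spelled out as a Cartesian product of exchanges and upstreams. The upstream names below are hypothetical.

```python
from itertools import product

exchanges = ["amq.direct", "amq.fanout", "amq.topic"]
upstreams = ["upstream-a", "upstream-b"]  # hypothetical upstream names

# One link per (exchange, upstream) pair.
links = list(product(exchanges, upstreams))

for exchange, upstream in links:
    print(f"{exchange} <- {upstream}")
print(len(links))
```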
For simple use cases this should be all you need, although you will probably also want to look at the AMQP URI reference.
The federation reference contains more details on upstream parameters and upstream sets.
Federation Connection (Link) Failures
Inter-node connections used by Federation are based on AMQP 0-9-1 connections. Operators can treat federation links as a special kind of client.
Should a link fail, e.g. due to a network interruption, it will attempt to reconnect. The reconnection period is configurable and is set in the upstream definition. See the federation reference for more details on setting up upstreams and upstream sets.
Links generally try to recover ad infinitum but there are scenarios when they give up:
- The failure rate is too high (the maximum tolerated rate depends on the upstream's reconnect-delay but is generally a failure every few seconds by default).
- The link can no longer locate its "source" queue or exchange.
- The policy changes in such a way that the link considers itself no longer necessary.
By increasing reconnect-delay for upstreams it is possible to tolerate higher link failure rates. This is primarily relevant for RabbitMQ installations with a moderate or large number of active links.
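As an illustration, the sketch below assembles an upstream definition that includes reconnect-delay and prints the matching rabbitmqctl invocation. The value 30 is an arbitrary example, and the assumption that the value is expressed in seconds should be checked against the federation reference.

```python
import json
import shlex

# Assumed example values; reconnect-delay is assumed to be in seconds.
definition = {
    "uri": "amqp://target.hostname",
    "expires": 3600000,
    "reconnect-delay": 30,
}

command = [
    "rabbitmqctl", "set_parameter", "federation-upstream", "my-upstream",
    json.dumps(definition),
]

# shlex.join quotes the JSON argument so the line can be pasted into a POSIX shell.
command_line = shlex.join(command)
print(command_line)
```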
Federating Clusters
Clusters can be linked together with federation just as single brokers can. To summarise how clustering and federation interact:
- You can define policies and parameters on any node in the downstream cluster; once defined on one node they will apply on all nodes.
- Exchange federation links will start on any node in the downstream cluster. They will fail over to other nodes if the node they are running on crashes or stops.
- Queue federation links will start on the same node as the downstream queue. If the downstream queue is a replicated one, they will start on the same node as the leader, and will be recreated on the same node as the new leader after any future leader elections.
- To connect to an upstream cluster, you can specify multiple URIs in a single upstream. The federation link process will choose one of these URIs at random each time it attempts to connect.
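The last point can be illustrated with a small simulation. The sketch assumes an upstream definition whose uri key holds a list of URIs; the host names and the `pick_uri` helper are hypothetical and only mimic the random choice described above.

```python
import random
from typing import Union, List

# Hypothetical upstream cluster members.
definition = {
    "uri": [
        "amqp://rabbit1.upstream.example",
        "amqp://rabbit2.upstream.example",
        "amqp://rabbit3.upstream.example",
    ],
    "expires": 3600000,
}


def pick_uri(upstream: dict, rng: random.Random) -> str:
    """Pick one URI at random, as a link would on each connection attempt."""
    uris: Union[str, List[str]] = upstream["uri"]
    if isinstance(uris, str):
        uris = [uris]
    return rng.choice(uris)


rng = random.Random()
for attempt in range(1, 4):
    print(f"attempt {attempt}: {pick_uri(definition, rng)}")
```

Because a new URI is chosen on every attempt, a link whose current node becomes unavailable can reach another member of the upstream cluster on a later attempt.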
Securing Federation Connections with TLS
Starting with Erlang 26, TLS client peer verification is enabled by default by the TLS implementation.
If a client TLS certificate and key pair are not configured, TLS-enabled Federation links will fail to connect. A certificate (public key) and private key pair must be configured for TLS-enabled Federation links that need to use peer verification.
If peer verification is not necessary, it can be disabled.
Federation connections (links) can be secured with TLS. Because Federation uses a RabbitMQ client under the hood, it is necessary to both configure the target broker to listen for TLS connections and Federation to use TLS.
To configure Federation to use TLS, one needs to:
- In the upstream URI, use the amqps scheme instead of amqp and port 5671 instead of 5672 (assuming the default ports are used and the port is specified explicitly)
- In the same upstream URI, specify the CA certificate and client certificate/key pair, as well as other parameters (namely whether peer verification is enabled and the peer verification depth) via URI query parameters
- Optionally, configure TLS-related settings or defaults common for all links (plus, optionally, Shovel) via the Erlang client settings
In the following example the upstream URI is modified to use TLS with a client certificate (public key) and private key pair but with peer verification disabled (this is done only for simplicity; peer verification is encouraged for production use):
rabbitmqctl with bash:
# Note the TLS-related settings in the upstream URI field
rabbitmqctl set_parameter federation-upstream my-upstream \
    '{"uri":"amqps://target.hostname:5671?cacertfile=/path/to/ca_bundle.pem&certfile=/path/to/client_certificate.pem&keyfile=/path/to/client_key.pem&verify=verify_none","expires":3600000}'

rabbitmqadmin with bash:
# Note the TLS-related settings in the upstream URI field
rabbitmqadmin federation declare_upstream --name my-upstream \
    --uri "amqps://target.hostname:5671?cacertfile=/path/to/ca_bundle.pem&certfile=/path/to/client_certificate.pem&keyfile=/path/to/client_key.pem&verify=verify_none" \
    --ttl 3600000

rabbitmqctl with PowerShell:
# Note the TLS-related settings in the upstream URI field
rabbitmqctl.bat set_parameter federation-upstream my-upstream `
    '"{""uri"":""amqps://target.hostname:5671?cacertfile=drive:\path\to\ca_bundle.pem&certfile=drive:\path\to\client_certificate.pem&keyfile=drive:\path\to\client_key.pem&verify=verify_none"",""expires"":3600000}"'

rabbitmqadmin.exe with PowerShell:
# Note the TLS-related settings in the upstream URI field
rabbitmqadmin.exe federation declare_upstream --name my-upstream `
    --uri "amqps://target.hostname:5671?cacertfile=drive:\path\to\ca_bundle.pem&certfile=drive:\path\to\client_certificate.pem&keyfile=drive:\path\to\client_key.pem&verify=verify_none" `
    --ttl 3600000

Management UI:
Navigate to Admin > Federation Upstreams > Add a new upstream. Enter "my-upstream" next to Name, paste "amqps://target.hostname:5671?cacertfile=/path/to/ca_bundle.pem&certfile=/path/to/client_certificate.pem&keyfile=/path/to/client_key.pem&verify=verify_none" for URI, then enter 3600000 next to Expiry. Click Add upstream.

HTTP API:
PUT /api/parameters/federation-upstream/%2f/my-upstream
{"value":{"uri":"amqps://target.hostname:5671?cacertfile=/path/to/ca_bundle.pem&certfile=/path/to/client_certificate.pem&keyfile=/path/to/client_key.pem&verify=verify_none","expires":3600000}}
These examples use a URI with four additional URI query parameters:
- cacertfile: the CA certificate bundle file that includes one or more CA certificates that were used to sign the client certificate and private key pair
- certfile: the client certificate (public key)
- keyfile: the client private key
- verify: controls peer verification (in this specific example, disables it)
Just like with "regular" client connections, if TLS-enabled federation links need to perform peer verification then the server's CA must be trusted on the node where the federation link(s) run, and vice versa.
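Long TLS URIs are easy to mistype, so it can help to assemble them programmatically. The following Python sketch builds the URI used in the examples above from its parts; the helper name `tls_upstream_uri` is hypothetical, and the file paths are the same placeholders used throughout this section.

```python
from urllib.parse import urlencode


def tls_upstream_uri(host: str, cacertfile: str, certfile: str, keyfile: str,
                     verify: str = "verify_none", port: int = 5671) -> str:
    """Assemble an amqps:// upstream URI with TLS-related query parameters."""
    query = urlencode(
        {
            "cacertfile": cacertfile,
            "certfile": certfile,
            "keyfile": keyfile,
            "verify": verify,
        },
        safe="/",  # keep path separators readable instead of percent-encoding them
    )
    return f"amqps://{host}:{port}?{query}"


uri = tls_upstream_uri(
    "target.hostname",
    "/path/to/ca_bundle.pem",
    "/path/to/client_certificate.pem",
    "/path/to/client_key.pem",
)
print(uri)
```

Passing `verify="verify_peer"` instead would produce a URI that requests peer verification, which is the setting encouraged for production use.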
Federation Link Monitoring
Each combination of federated exchange or queue and upstream needs a
link to run. This is the process that retrieves messages from upstream
and republishes them downstream. You can monitor the status of
federation links using rabbitmqctl and the management
plugin.
Using CLI Tools
Federation link status can be inspected using RabbitMQ CLI tools.
Invoke:
# This command will be available only if federation plugin is enabled
rabbitmqctl federation_status
This will output a list of federation links running on the target node (not cluster-wide). It contains the following keys:
| Parameter Name | Description |
| type | the type of the federated resource: exchange or queue |
| name | the name of the federated exchange or queue |
| vhost | the virtual host containing the federated exchange or queue |
| upstream_name | the name of the upstream this link is connected to |
| status | status of the link, e.g. running (see the example below) |
| connection | the name of the connection for this link (from config) |
| timestamp | timestamp of the last status update |
Here's an example:
# This command will be available only if federation plugin is enabled
rabbitmqctl federation_status
# => [[{type,<<"exchange">>},
# =>   {name,<<"my-exchange">>},
# =>   {vhost,<<"/">>},
# =>   {connection,<<"upstream-server">>},
# =>   {upstream_name,<<"my-upstream-x">>},
# =>   {status,{running,<<"<rabbit@my-server.1.281.0>">>}},
# =>   {timestamp,{{2020,3,1},{12,3,28}}}]]
# => ...done.
Using the Management UI
Enable the rabbitmq_federation_management plugin, which extends the management UI with a new page that displays federation links in the cluster. The page can be found under Admin > Federation Status. The same information is available from the GET /api/federation-links HTTP API endpoint.
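When link status is collected by a script, a common follow-up is to list the links that are not running. The sketch below works on records shaped like the keys of the CLI output described earlier. The sample data is hypothetical, and the field names returned by the HTTP API endpoint may differ, so treat the keys as assumptions.

```python
from typing import Dict, List


def links_needing_attention(links: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Return the links whose status is anything other than "running"."""
    return [link for link in links if link.get("status") != "running"]


# Hypothetical sample data; a real response may use different field names.
sample = [
    {"type": "exchange", "name": "amq.topic", "vhost": "/",
     "upstream_name": "my-upstream", "status": "running"},
    {"type": "exchange", "name": "amq.fanout", "vhost": "/",
     "upstream_name": "my-upstream", "status": "starting"},
]

for link in links_needing_attention(sample):
    print(f'{link["vhost"]} {link["type"]} {link["name"]} '
          f'<- {link["upstream_name"]}: {link["status"]}')
```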
Troubleshooting
Federation Links Do Not Start
Federation links are started when
- There is a configured upstream (or a set of upstreams)
- There is a policy that matches some exchanges or queues
- Federation can connect to the target upstream
Therefore, in order to narrow down the problem, the recommended steps are:
- Inspect federation upstreams
- Inspect policies, in particular looking for policies with conflicting priorities
- Inspect node logs
Inspect Federation Upstreams
rabbitmq-diagnostics with bash:
rabbitmq-diagnostics list_parameters --formatter=pretty_table

rabbitmqadmin with bash:
rabbitmqadmin federation list_all_upstreams

rabbitmq-diagnostics with PowerShell:
rabbitmq-diagnostics.bat list_parameters --formatter=pretty_table

rabbitmqadmin.exe with PowerShell:
rabbitmqadmin.exe federation list_all_upstreams

Management UI:
Make sure that the rabbitmq_federation_management plugin is enabled. Navigate to Admin > Federation Upstreams.

HTTP API:
GET /api/parameters
Inspect Policies
Only one policy can apply to a given exchange or queue at a time, and out of N matching policies with equal priorities a random one will be selected.
In other words, when there are conflicting policies that match the exchanges or queues that are meant to be federated, the policy that enables federation is not guaranteed to be the effective one.
Giving policies explicit, distinct priorities and avoiding policies that use --apply-to all will reduce the risk of running into this problem.
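A conflict of this kind can be spotted before it causes trouble. The sketch below is a simplified model of policy selection, not RabbitMQ's implementation: it returns every matching policy that shares the highest priority, so a result with more than one name signals an ambiguous outcome. The `ttl-everywhere` policy and the helper name are hypothetical.

```python
import re
from typing import Dict, List


def top_priority_candidates(policies: List[Dict], name: str, kind: str) -> List[str]:
    """Names of matching policies that share the highest priority (simplified model)."""
    matching = [
        p for p in policies
        if p["apply-to"] in (kind, "all") and re.search(p["pattern"], name)
    ]
    if not matching:
        return []
    top = max(p.get("priority", 0) for p in matching)
    return [p["name"] for p in matching if p.get("priority", 0) == top]


policies = [
    {"name": "federate-me", "pattern": r"^amq\.", "apply-to": "exchanges", "priority": 0},
    # Hypothetical catch-all policy that competes with federate-me.
    {"name": "ttl-everywhere", "pattern": ".*", "apply-to": "all", "priority": 0},
]

# Two candidates: which one becomes effective is not predictable.
print(top_priority_candidates(policies, "amq.topic", "exchanges"))

# Raising the priority of the federation policy removes the ambiguity.
policies[0]["priority"] = 1
print(top_priority_candidates(policies, "amq.topic", "exchanges"))
```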