8. AMQP and RabbitMQ¶
This chapter covers configuration and debugging of RabbitMQ, a popular AMQP message bus service.
8.1. RabbitMQ use in DIMS¶
AMQP (specifically RabbitMQ) is discussed in Sections DIMS architectural design and System Software Architecture of DIMS Architecture Design v 2.10.0, and the specifics of the server initially configured for use in DIMS is documented in Section dimsasbuilt:rabbitmq of dimsasbuilt:dimsasbuilt. Its use for processing logs within DIMS is discussed in Section dimsparselogs:introtologparsing of dimsparselogs:parsinglogswithdims.
Attention
While RabbitMQ is documented extensively on their web site, it is sometimes hard to interpret what it says. Another very useful resource is Chapter 8: Administering RabbitMQ from the Web from RabbitMQ in Action: Distributed messaging for everyone, by Alvaro Videla and Jason J. W. Williams.
8.2. Basic Service Administration¶
RabbitMQ is started/stopped/restarted/queried for status just like any other
Ubuntu service using the service
command as root. Its configuration files
and settings are found in /etc/rabbitmq
and
/etc/default/rabbitmq-server
, and its log files in /var/log/rabbitmq/
.
root@rabbitmq:~# cd /etc/rabbitmq
root@rabbitmq:/etc/rabbitmq# tree
.
├── enabled_plugins
├── rabbitmq.config
├── rabbitmq.conf.d
└── rabbitmq-env.conf
1 directory, 3 files
root@rabbitmq:/etc/rabbitmq# cat rabbitmq.config
[
{kernel,
[{inet_dist_listen_min, 45000},
{inet_dist_listen_max, 45000}
]
}
].
root@rabbitmq:/var/log/rabbitmq# cat /etc/default/rabbitmq-server
ulimit -n 1024
Note
The ulimit
setting here controls the number of open file
handles a process can have. A server with lots of connections
needs a higher limit than the default, hence this setting.
See [rabbitmq-discuss] Increasing the file descriptors limit
and mozilla/opsec-puppet and Increase RabbitMQ file descriptor limit
and memory watermark without restart.
root@b52:/etc/rabbitmq# rabbitmqctl status | grep -A 4 file_descriptors
{file_descriptors,
[{total_limit,924},{total_used,3},{sockets_limit,829},{sockets_used,1}]},
{processes,[{limit,1048576},{used,200}]},
{run_queue,0},
{uptime,82858}]
root@rabbitmq:/etc/rabbitmq# cd /var/log/rabbitmq
root@rabbitmq:/var/log/rabbitmq# tree
.
├── rabbit@rabbitmq.log
├── rabbit@rabbitmq-sasl.log
├── shutdown_log
└── startup_log
0 directories, 4 files
8.3. Managing RabbitMQ¶
RabbitMQ can be administered in two ways: (1) manually, using the built-in
web interface, or (2) using command line tools like rabbitmqctl
and
rabbitmqadmin
.
To get access to the management interface, you must enabled
rabbitmq_management
in the RabbitMQ configuration:
root@rabbitmq:/etc/rabbitmq# cat rabbitmq-env.conf
#RABBITMQ_NODE_IP_ADDRESS=10.142.29.170
RABBITMQ_NODE_PORT=5672
RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]"
# Source other environment files (that include ONLY variable settings,
# not RabbitMQ configuration
for ENVFILE in `ls /etc/rabbitmq/rabbitmq.conf.d |sort -r`; do
. /etc/rabbitmq/rabbitmq.conf.d/$ENVFILE
done
Once you do this, and restart the server, two things become
available. The first is a web interface, and the second is access to a
downloadable (from the RabbitMQ server itself) script named
rabbitmqadmin
.
8.3.1. Using the web interface¶
You can see the web management interface in Figure RabbitMQ Mangement Interface Login Screen and Figure Figure RabbitMQ Mangement Interface Home Screen.
8.3.2. Using the command line¶
The RabbitMQ service daemons are started like any other service on Ubuntu 14.04.
root@b52:~# service rabbitmq-server restart
* Restarting message broker rabbitmq-server
...done.
There are multiple ways with Linux to discover the listening port number. You
can identify the process names with ps
or pstree
to map to output of
netstat
, use lsof
, and the epmd
command:
root@b52:~# pstree -p | less
init(1)-+- ...
|-lightdm(2599)-+-Xorg(2648)
| ...
| |-lightdm(3363)-+-init(4946)-+-at-spi-bus-laun(5140)-+-dbus-daemon(5144)
| | | |-rabbitmq-server(19303)---beam.smp(19311)-+-inet_gethost(19492)---inet_gethos+
| | | | |-{beam.smp}(19408)
| | | | |-{beam.smp}(19409)
| | | | | ...
| | | | |-{beam.smp}(19451)
| | | | `-{beam.smp}(19452)
| ...
root@b52:~# netstat -pan | grep beam
tcp 0 0 0.0.0.0:45000 0.0.0.0:* LISTEN 19311/beam.smp
tcp 0 0 127.0.0.1:51156 127.0.0.1:4369 ESTABLISHED 19311/beam.smp
tcp6 0 0 :::5672 :::* LISTEN 19311/beam.smp
root@b52:~# lsof -i | grep beam
beam.smp 19311 rabbitmq 8u IPv4 27589259 0t0 TCP *:45000 (LISTEN)
beam.smp 19311 rabbitmq 9u IPv4 27589261 0t0 TCP localhost:51156->localhost:epmd (ESTABLISHED)
beam.smp 19311 rabbitmq 16u IPv6 27580219 0t0 TCP *:amqp (LISTEN)
root@b52:~# epmd -names
epmd: up and running on port 4369 with data:
name rabbit at port 45000
There are two ways of getting the exact same information on
the runtime status of RabbitMQ. The first uses rabbitmqctl
directly. The second uses service rabbitmq-server status
.
They are both shown here:
root@rabbitmq:/etc/rabbitmq# rabbitmqctl status
Status of node rabbit@rabbitmq ...
[{pid,8815},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","0.0.0"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","0.0.0"},
{amqp_client,"RabbitMQ AMQP Client","0.0.0"},
{rabbit,"RabbitMQ","2.7.1"},
{os_mon,"CPO CXC 138 46","2.2.7"},
{sasl,"SASL CXC 138 11","2.1.10"},
{rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","0.0.0"},
{webmachine,"webmachine","1.7.0-rmq0.0.0-hg"},
{mochiweb,"MochiMedia Web Server","1.3-rmq0.0.0-git"},
{inets,"INETS CXC 138 49","5.7.1"},
{mnesia,"MNESIA CXC 138 12","4.5"},
{stdlib,"ERTS CXC 138 10","1.17.5"},
{kernel,"ERTS CXC 138 10","2.14.5"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:16:16] [rq:16] [async-threads:30] [kernel-poll:true]\n"},
{memory,
[{total,31080064},
{processes,11445592},
{processes_used,11433880},
{system,19634472},
{atom,1336577},
{atom_used,1313624},
{binary,117880},
{code,14301212},
{ets,1142776}]},
{vm_memory_high_watermark,0.39999999996434304},
{vm_memory_limit,6730807705}]
...done.
root@rabbitmq:/etc/rabbitmq# service rabbitmq-server status
Status of node rabbit@rabbitmq ...
[{pid,8815},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","0.0.0"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","0.0.0"},
{amqp_client,"RabbitMQ AMQP Client","0.0.0"},
{rabbit,"RabbitMQ","2.7.1"},
{os_mon,"CPO CXC 138 46","2.2.7"},
{sasl,"SASL CXC 138 11","2.1.10"},
{rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","0.0.0"},
{webmachine,"webmachine","1.7.0-rmq0.0.0-hg"},
{mochiweb,"MochiMedia Web Server","1.3-rmq0.0.0-git"},
{inets,"INETS CXC 138 49","5.7.1"},
{mnesia,"MNESIA CXC 138 12","4.5"},
{stdlib,"ERTS CXC 138 10","1.17.5"},
{kernel,"ERTS CXC 138 10","2.14.5"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:16:16] [rq:16] [async-threads:30] [kernel-poll:true]\n"},
{memory,
[{total,31103832},
{processes,11469280},
{processes_used,11457568},
{system,19634552},
{atom,1336577},
{atom_used,1313689},
{binary,117880},
{code,14301212},
{ets,1142776}]},
{vm_memory_high_watermark,0.39999999996434304},
{vm_memory_limit,6730807705}]
...done.
The following shows how to get a copy of the rabbitmqadmin
script
and make it executable from the command line.
root@rabbitmq:/etc/rabbitmq# wget http://localhost:55672/cli/rabbitmqadmin
root@rabbitmq:/etc/rabbitmq# chmod +x rabbitmqadmin
Note
These steps should be done immediately after initial RabbitMQ
installation when creating Ansible playbooks, the script turned
into a Jinja2 template, and installed into the $PATH for
direct access from the command line (as opposed to being
run with a relative path after changing directory into
the /etc/rabbitmq
directory as shown here).
The rabbitmqadmin
script has a help
option that provides
information on how to use it.
root@rabbitmq:/etc/rabbitmq# ./rabbitmqadmin help subcommands
Usage
=====
rabbitmqadmin [options] subcommand
where subcommand is one of:
Display
=======
list users [<column>...]
list vhosts [<column>...]
list connections [<column>...]
list exchanges [<column>...]
list bindings [<column>...]
list permissions [<column>...]
list channels [<column>...]
list parameters [<column>...]
list queues [<column>...]
list policies [<column>...]
list nodes [<column>...]
show overview [<column>...]
Object Manipulation
===================
declare queue name=... [node=... auto_delete=... durable=... arguments=...]
declare vhost name=... [tracing=...]
declare user name=... password=... tags=...
declare exchange name=... type=... [auto_delete=... internal=... durable=... arguments=...]
declare policy name=... pattern=... definition=... [priority=... apply-to=...]
declare parameter component=... name=... value=...
declare permission vhost=... user=... configure=... write=... read=...
declare binding source=... destination=... [arguments=... routing_key=... destination_type=...]
delete queue name=...
delete vhost name=...
delete user name=...
delete exchange name=...
delete policy name=...
delete parameter component=... name=...
delete permission vhost=... user=...
delete binding source=... destination_type=... destination=... properties_key=...
close connection name=...
purge queue name=...
Broker Definitions
==================
export <file>
import <file>
Publishing and Consuming
========================
publish routing_key=... [payload=... payload_encoding=... exchange=...]
get queue=... [count=... requeue=... payload_file=... encoding=...]
* If payload is not specified on publish, standard input is used
* If payload_file is not specified on get, the payload will be shown on
standard output along with the message metadata
* If payload_file is specified on get, count must not be set
Here rabbitmqadmin
is used to get a list of the currently
defined exchanges:
root@rabbitmq:/etc/rabbitmq# ./rabbitmqadmin list exchanges
+-------+--------------------+---------+-------------+---------+----------+
| vhost | name | type | auto_delete | durable | internal |
+-------+--------------------+---------+-------------+---------+----------+
| / | | direct | False | True | False |
| / | amq.direct | direct | False | True | False |
| / | amq.fanout | fanout | False | True | False |
| / | amq.headers | headers | False | True | False |
| / | amq.match | headers | False | True | False |
| / | amq.rabbitmq.log | topic | False | True | False |
| / | amq.rabbitmq.trace | topic | False | True | False |
| / | amq.topic | topic | False | True | False |
| / | devops | fanout | False | True | False |
| / | log_task | direct | False | True | False |
| / | logs | fanout | False | False | False |
+-------+--------------------+---------+-------------+---------+----------+
We can now define a new fanout
exchange where we can direct log messages for
later processing using rabbitmqadmin
, rather than the web interface:
root@rabbitmq:/etc/rabbitmq# ./rabbitmqadmin declare exchange name=health type=fanout auto_delete=false durable=true internal=false
exchange declared
root@rabbitmq:/etc/rabbitmq# ./rabbitmqadmin list exchanges
+-------+--------------------+---------+-------------+---------+----------+
| vhost | name | type | auto_delete | durable | internal |
+-------+--------------------+---------+-------------+---------+----------+
| / | | direct | False | True | False |
| / | amq.direct | direct | False | True | False |
| / | amq.fanout | fanout | False | True | False |
| / | amq.headers | headers | False | True | False |
| / | amq.match | headers | False | True | False |
| / | amq.rabbitmq.log | topic | False | True | False |
| / | amq.rabbitmq.trace | topic | False | True | False |
| / | amq.topic | topic | False | True | False |
| / | devops | fanout | False | True | False |
| / | health | fanout | False | True | False |
| / | log_task | direct | False | True | False |
| / | logs | fanout | False | False | False |
+-------+--------------------+---------+-------------+---------+----------+
After creating all of the broker objects we wish to have in the
default server (using either the web interface and/or rabbitmqadmin
)
you can export
a JSON file that can be put under Ansible control
for later import into a newly instantiated RabbitMQ server.
(See Loading rabbitmq config at startup.)
Caution
There are passwords in this output (which are redacted here). Keep this file secure and do not put it in a public source repository without encryption or templating (e.g., with Jinja2).
root@rabbitmq:/etc/rabbitmq# ./rabbitmqadmin export broker-objects.json
Exported definitions for localhost to "broker-objects.json"
root@rabbitmq:/etc/rabbitmq# python -m json.tool broker-objects.json
{
"bindings": [
{
"arguments": {},
"destination": "log_task",
"destination_type": "queue",
"routing_key": "log_task",
"source": "log_task",
"vhost": "/"
},
{
"arguments": {},
"destination": "log_test_queue",
"destination_type": "queue",
"routing_key": "",
"source": "test_exchange",
"vhost": "/"
},
{
"arguments": {},
"destination": "taskqueue",
"destination_type": "queue",
"routing_key": "",
"source": "test_exchange",
"vhost": "/"
},
{
"arguments": {},
"destination": "test_exchange",
"destination_type": "queue",
"routing_key": "test_exchange",
"source": "test_exchange",
"vhost": "/"
}
],
"exchanges": [
{
"arguments": {},
"auto_delete": false,
"durable": true,
"internal": false,
"name": "test_exchange",
"type": "direct",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": true,
"internal": false,
"name": "devops",
"type": "fanout",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": true,
"internal": false,
"name": "test",
"type": "fanout",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": true,
"internal": false,
"name": "health",
"type": "fanout",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": false,
"internal": false,
"name": "logs",
"type": "fanout",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": true,
"internal": false,
"name": "log_task",
"type": "direct",
"vhost": "/"
}
],
"permissions": [
{
"configure": ".*",
"read": ".*",
"user": "rpc_user",
"vhost": "/",
"write": ".*"
},
{
"configure": ".*",
"read": ".*",
"user": "logmatrix",
"vhost": "/",
"write": ".*"
},
{
"configure": ".*",
"read": ".*",
"user": "hutchman",
"vhost": "/",
"write": ".*"
}
],
"queues": [
{
"arguments": {},
"auto_delete": false,
"durable": false,
"name": "crosscor_test_0.5.5",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": true,
"name": "taskqueue",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": false,
"name": "cifbulk_v1_0.5.5",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": true,
"name": "test_exchange",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": false,
"name": "anon_0.5.5",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": true,
"name": "log_task",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": false,
"name": "cifbulk_v1_test_0.5.5",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": false,
"name": "crosscor_0.5.5",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": true,
"name": "log_queue_test",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": true,
"name": "log_test_queue",
"vhost": "/"
},
{
"arguments": {},
"auto_delete": false,
"durable": false,
"name": "anon_test_0.5.5",
"vhost": "/"
}
],
"rabbit_version": "2.7.1",
"users": [
{
"name": "hutchman",
"password_hash": "REDACTED",
"tags": "administrator"
},
{
"name": "logmatrix",
"password_hash": "REDACTED",
"tags": "administrator"
},
{
"name": "rpc_user",
"password_hash": "REDACTED",
"tags": ""
}
],
"vhosts": [
{
"name": "/"
}
]
}