So, you're an Elixir developer who are in need of the power and scalability of MQTT alongside your existing backend infrastructure? Have you started thinking about implementing your own MQTT broker in Elixir? If that's the case you can drop what you're doing and instead use VerneMQ. VerneMQ is written in Erlang but can be extended with plugins written in Elixir.
And that's the motivation for this blog post! Since we are huge fans of Elixir, we thought we'd write this blog post to show, from start to finish, how to write a VerneMQ plugin in Elixir and show that it's in fact quite easy.
We'll be building a relatively simple plugin which will call an HTTP endpoint on three different events: When a client has been successfully authenticated (on_client_wakeup hook), when the client is no longer online (on_client_gone and on_client_offline hooks) and whenever a client publishes a message (on_publish hook).
One could easily imagine a use case where it could be interesting to store all published messages somewhere as well as the online/offline status of the clients.
Aside: Why do we have two hooks for when a client goes offline? It's because of the clean_session
connection flag. Depending of the value of this flag the client may or may not have left session state on the broker, and for this reason there are two hooks: on_client_offline
is called when clean_session=false
and the client has state on the broker and on_client_gone
when it doesn't. See the clean session section in our MQTT primer for more information.
In VerneMQ a plugin is an OTP application (it may be helpful to read the documentation on Supervisors and Applications if you're not familiar with the concept). A short (but incomplete) summary of what an OTP application is follows. An OPT application is
VerneMQ is built up of a bunch of OTP applications, and in fact Elixir itself is a set of OTP applications! Also, it's important to stress that there is no difference between an Erlang and an Elixir application. They are the exact same thing.
This has some interesting implications: when we develop a plugin in Elixir it is really an OTP application that depends on another applications, namely Elixir itself. We now know how to make Elixir available inside VerneMQ: We can load it as a plugin since it's just another OTP application! We'll use this information later when we will be testing our plugin.
This blog post was written on a Linux system, but the steps should be the same on Apple OSX or other unix-like systems.
At the time of writing the newest stable version of Elixir is version 1.3.3 and this blog post is based on that version. To install Elixir into your system consult the official documentation which can be found here.
Note that part of the process of installing Elixir is to install Erlang as well. It's important that the installed Erlang version is compatible with the Erlang version used to build VerneMQ. If you installed VerneMQ as a package you should install Erlang 18.x as that is what we currently use to build our packages. If you've built VerneMQ from source, just use the same Erlang version as you used to build VerneMQ.
We used the latest version of VerneMQ as well which at the time of writing is 0.14.2 which you can find here.
We assume that both Elixir and VerneMQ have been properly installed and the various executables (mix
, iex
for Elixir and vernemq
and vmq-admin
for VerneMQ) are available in the environment paths.
We'll be creating a test HTTP endpoint using Python and we'll be testing everything using the command line tools mosquitto_pub
and mosquitto_pub
(from the mosquitto-clients
package on Debian).
Let's create a basic Elixir project to get started by using the Elixir mix
tool:
$ mix new my_vernemq_plugin --module MyVMQPlugin
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/my_vernemq_plugin.ex
* creating test
* creating test/test_helper.exs
* creating test/my_vernemq_plugin_test.exs
Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:
cd my_vernemq_plugin
mix test
Run "mix help" for more commands.
Since we'll be calling some endpoints using HTTP we need an HTTP client and for fun let's encode the payload data as json. As our HTTP client we'll use httpoison and to encode json we'll use exjsx. Add these to the dependencies in mix.exs
:
defp deps do
[{:httpoison, "~> 0.9.0"},
{:exjsx, "~> 3.2"}]
end
And to our list of applications in mix.exs
:
def application do
[applications: [:logger, :httpoison, :exjsx]]
end
And then run mix deps.get
to fetch the new dependencies so they'll be available to us.
Now let's open up the lib/my_vernemq_plugin.ex
which looks like this:
defmodule MyVMQPlugin do
end
Let's add here the different callbacks handling the hooks mentioned above. Let's start on_client_wakeup
hook which is called after a client has been successfully authenticated. The callback looks like this:
@doc """
Callback function which is called after a client has been
successfully authenticated.
"""
def on_client_wakeup({mountpoint, clientid}) do
endpoint = Application.get_env(:my_vernemq_plugin, :endpoint_on_client_wakeup)
headers = [{"hook", "on_client_wakeup"}]
body = JSX.encode!(%{"mountpoint" => mountpoint,
"clientid" => clientid})
HTTPoison.post!(endpoint, body, headers)
:ok
end
The function retrives the endpoint information from the :my_vernemq_plugin
application environment, then creates a header with the hook name, then json encodes the subscriberid (which is made up of a mountpoint and a client id) and finally do a post with the endpoint, headers and body.
That's all that's needed to implement an VerneMQ hook. Of course we still need to tell VerneMQ to call this function when the on_client_wakeup
hook is called. We'll get to this later, but first let's implement the remaining hooks: on_client_offline, on_client_gone and on_publish and add them to the my_vernemq_plugin.ex
file so it in its complete form looks like this:
defmodule MyVMQPlugin do
@doc """
Callback function which is called after a client has been
successfully authenticated.
"""
def on_client_wakeup({mountpoint, clientid}) do
endpoint = Application.get_env(:my_vernemq_plugin, :endpoint_on_client_wakeup)
headers = [{"hook", "on_client_wakeup"}]
body = JSX.encode!(%{"mountpoint" => mountpoint,
"clientid" => clientid})
HTTPoison.post!(endpoint, body, headers)
:ok
end
@doc """
Callback function which is called after a client using
`clean_session=false` has been disconnected.
"""
def on_client_offline({mountpoint, clientid}) do
endpoint = Application.get_env(:my_vernemq_plugin, :endpoint_on_client_offline)
headers = [{"hook", "on_client_offline"}]
body = JSX.encode!(%{"mountpoint" => mountpoint,
"clientid" => clientid})
HTTPoison.post!(endpoint, body, headers)
:ok
end
@doc """
Callback function which is called after a client using
`clean_session=true` has been disconnected.
"""
def on_client_gone({mountpoint, clientid}) do
endpoint = Application.get_env(:my_vernemq_plugin, :endpoint_on_client_gone)
headers = [{"hook", "on_client_gone"}]
body = JSX.encode!(%{"mountpoint" => mountpoint,
"clientid" => clientid})
HTTPoison.post!(endpoint, body, headers)
:ok
end
@doc """
Callback function which is called whenever a message has been
authorized and is about to be published.
"""
def on_publish(username, {mountpoint, clientid}, qos, topic, payload, isretain) do
endpoint = Application.get_env(:my_vernemq_plugin, :endpoint_on_publish)
headers = [{"hook", "on_publish"}]
body = JSX.encode!(%{"username" => username,
"mountpoint" => mountpoint,
"clientid" => clientid,
"qos" => qos,
"topic" => topic,
"payload" => payload,
"isretain" => isretain})
HTTPoison.post!(endpoint, body, headers)
:ok
end
end
All the callback functions look alike, the only one that stands out a bit is the on_publish
call back function since it has some more arguments.
For more information about other hooks, see the Plugin Development Guide which explains (in Erlang terms) when and how each hook is called.
We still have two things to do before the Elixir plugin is ready for action. The first is that we need to configure the endpoints we want to use in the application environment. The second is to let VerneMQ know about the hooks we have implemented in our plugin. We do this by adding another entry to the application environment which VerneMQ looks for when the plugin is loaded. To do this we change the application function in the mix.exs
file so it looks like the following:
def application do
[applications: [:logger, :httpoison, :exjsx],
env: [
vmq_plugin_hooks:
[{:on_client_wakeup, MyVMQPlugin, :on_client_wakeup,1,[]},
{:on_client_offline, MyVMQPlugin, :on_client_offline,1,[]},
{:on_client_gone, MyVMQPlugin, :on_client_gone,1,[]},
{:on_publish, MyVMQPlugin, :on_publish,6,[]}],
endpoint_on_client_wakeup: "http://localhost:1234",
endpoint_on_client_offline: "http://localhost:1234",
endpoint_on_client_gone: "http://localhost:1234",
endpoint_on_publish: "http://localhost:1234"]]
end
We configured all our endpoints to call out to localhost on port 1234 and we also added the vmq_plugin_hooks
entry containing five-tuples with the following elements: the name of the hook, the module, function and arity and finally an empty list (which is for options used by the plugin system and not relevant in this context).
The final step before our plugin is ready is to compile it, so let's do that:
$ mix compile
Before we enable our new plugin we need an HTTP endpoint which the plugin can call. So let's make a very simple one which just prints out the data received to the console:
import web
import json
urls = ('/.*', 'hooks')
app = web.application(urls, globals())
class hooks:
def POST(self):
# fetch hook and request data
hook = web.ctx.env.get('HTTP_HOOK')
data = json.loads(web.data())
# print the hook and request data to the console
print 'hook:', hook
print 'data:', data
print
if __name__ == '__main__':
app.run()
Save this in the file my_vernemq_plugin_endpoint.py
and start it like this:
$ python my_vernemq_plugin_endpoint.py 1234
When loading a plugin we sometimes have dependencies we need to load as well. For instance our plugin will be depending on a bunch of other applications (like our HTTP client and json libraries). If we take a peak at the applications that make up our plugin we see the following:
$ tree _build/dev/lib -L 1
_build/dev/lib
├── certifi
├── exjsx
├── hackney
├── httpoison
├── idna
├── jsx
├── metrics
├── mimerl
├── my_vernemq_plugin
└── ssl_verify_fun
10 directories, 0 files
Instead of having to load all of these applications one by one we can give VerneMQ the path where it can find the lib
folder which contains the plugin and dependency applications. This is a standard OTP convention and VerneMQ will look for this folder and will take care of loading the plugin application together with any dependencies for you.
Your Elixir installation also contains a lib
folder containing all the applications that make up elixir. This folder resides in the root folder of your Elixir installation.
This means we can pass the elixir root folder directly to VerneMQ when enabling Elixir as a plugin and VerneMQ will figure out the rest. We also need to specify the name of the plugin which is just the name of the application.
Enabling Elixir as a plugin then looks like this:
$ vmq-admin plugin enable --name=elixir --path=/path/to/elixir/root/folder
Done
We're now ready to enable our Elixir plugin. Again we just need to pass the path to where the lib folder can be found as well as the application name of our plugin:
$ vmq-admin plugin enable --name=my_vernemq_plugin --path=/path/to/my_vernemq_plugin/_build/dev
Done
To check that the the plugins where loaded and started correctly we can issue the following command:
$ vmq-admin plugin show +-----------------+-----------+-----------------+----------------------------------------+ | Plugin | Type | Hook(s) | M:F/A | +-----------------+-----------+-----------------+----------------------------------------+ | vmq_passwd |application|auth_on_register | vmq_passwd:auth_on_register/5 | | vmq_acl |application| auth_on_publish | vmq_acl:auth_on_publish/6 | | | |auth_on_subscribe| vmq_acl:auth_on_subscribe/3 | | elixir |application| | | |my_vernemq_plugin|application|on_client_wakeup |'Elixir.MyVMQPlugin':on_client_wakeup/1 | | | |on_client_offline|'Elixir.MyVMQPlugin':on_client_offline/1| | | | on_client_gone | 'Elixir.MyVMQPlugin':on_client_gone/1 | | | | on_publish | 'Elixir.MyVMQPlugin':on_publish/6 | +-----------------+-----------+-----------------+----------------------------------------+
All looks good, both the elixir
and my_vernemq_plugin
have been loaded and the hooks for my_vernemq_plugin
registered as well.
Since we haven't configured any authentication or authorization yet, we'll disable it completely, otherwise our MQTT clients would be rejected. We can do that like this:
$ vmq-admin set allow_anonymous=on
Now everything is ready and if we connect and try to publish a message using for instance the great mosquitto_pub
tool, like this:
$ mosquitto_pub -i myclientid -t "mytopic" -m "hello world"
we'll see the following output from our python HTTP endpoint:
hook: on_client_wakeup
data: {u'mountpoint': [], u'clientid': u'myclientid'}
127.0.0.1:17111 - - [29/Sep/2016 11:44:34] "HTTP/1.1 POST /" - 200 OK
hook: on_publish
data: {u'username': u'undefined', u'qos': 0, u'clientid': u'myclientid', u'isretain': False, u'topic': [u'mytopic'], u'mountpoint': [], u'payload': u'hello world'}
127.0.0.1:17111 - - [29/Sep/2016 11:44:34] "HTTP/1.1 POST /" - 200 OK
hook: on_client_gone
data: {u'mountpoint': [], u'clientid': u'myclientid'}
127.0.0.1:17111 - - [29/Sep/2016 11:44:34] "HTTP/1.1 POST /" - 200 OK
It's working! Time to take a break and celebrate!
We hope with this tutorial that we've shown how to develop a small plugin in Elixir - and that it's quite easy to get started.
The hooks we selected are a small subset of the ones available, we recommend going through the general plugin development guide to see which hooks exist and what you can do with the plugin system in general.
Note that you besides Elixir and Erlang also can develop plugins in Lua using vmq_diversity or as HTTP endpoints using vmq_webhooks.
Questions, comments, feedback? Don't hesitate to get in touch!
Cheers, Lars and the VerneMQ Mission Engineers