Offline Queueing with MQTT

For a project I’m working on, I want to use MQTT as a lightweight message queue. In particular, I want messages to queue up while the backend processor is offline, whether because it crashed or because it is being upgraded.

I installed Mosquitto and wrote a pair of publish/subscribe scripts in Python using the paho-mqtt library. I quickly discovered that messages were not being queued if I shut down the subscriber. I tried setting the Retain flag on the messages, but then only the last message sent was kept.
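
The Retain result makes sense if you picture the broker keeping a single retained slot per topic, so each retained publish overwrites the one before it. Here is a toy model of that idea in plain Python. The `RetainedStore` class and its method names are made up for illustration; this is a sketch of the behaviour, not Mosquitto's code.

```python
# Toy model of retained-message storage; not how Mosquitto is implemented.
class RetainedStore:
    def __init__(self):
        self._retained = {}  # topic -> most recent retained payload

    def publish(self, topic, payload, retain=False):
        if retain:
            # A new retained message replaces the previous one for this topic.
            self._retained[topic] = payload

    def on_subscribe(self, topic):
        # A new subscriber gets at most one retained message per topic.
        payload = self._retained.get(topic)
        return [] if payload is None else [payload]


store = RetainedStore()
for payload in ("one", "two", "three"):
    store.publish("jobs", payload, retain=True)

delivered = store.on_subscribe("jobs")
print(delivered)
```

Only the last payload survives, which is why Retain is a poor fit for queueing.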

It turns out there are several things you need to do in both the publisher and the subscriber to enable offline queueing:

  1. Set the QoS level to 1 or 2 in both the publisher and subscriber.
  2. Set a client ID in the subscriber.
  3. Connect the subscriber with “Clean Session” set to false.

In addition, the subscriber must connect at least once before messages will start queueing up for it.
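
To see why each of these conditions matters, here is a toy in-memory model of a broker's session handling. Everything in it (`ToyBroker`, its methods, the `jobs` topic) is hypothetical and simplified. It assumes that a message is delivered at the lower of the publish QoS and the subscription QoS, and that QoS 0 messages are not queued for offline clients.

```python
from collections import deque


class ToyBroker:
    """Toy model of persistent-session queueing; not a real broker."""

    def __init__(self):
        self.sessions = {}  # client_id -> session state

    def connect(self, client_id, clean_session):
        # A clean session (or an unknown client ID) starts from scratch.
        if clean_session or client_id not in self.sessions:
            self.sessions[client_id] = {"subs": {}, "queue": deque()}
        session = self.sessions[client_id]
        session["online"] = True
        session["persistent"] = not clean_session
        # Hand over whatever was queued while the client was away.
        backlog = list(session["queue"])
        session["queue"].clear()
        return backlog

    def subscribe(self, client_id, topic, qos):
        self.sessions[client_id]["subs"][topic] = qos

    def disconnect(self, client_id):
        session = self.sessions[client_id]
        if session["persistent"]:
            session["online"] = False  # keep subscriptions and queue
        else:
            del self.sessions[client_id]  # clean session: forget everything

    def publish(self, topic, payload, qos):
        for session in self.sessions.values():
            sub_qos = session["subs"].get(topic)
            if sub_qos is None or session["online"]:
                continue  # live delivery is not modelled here
            # Effective QoS is the lower of publish and subscription QoS.
            if min(qos, sub_qos) >= 1:
                session["queue"].append(payload)


broker = ToyBroker()
broker.publish("jobs", "early", qos=1)  # no session exists yet, so it is lost
broker.connect("test-client", clean_session=False)
broker.subscribe("test-client", "jobs", qos=1)
broker.disconnect("test-client")

broker.publish("jobs", "a", qos=1)
broker.publish("jobs", "b", qos=0)  # dropped: effective QoS is 0
broker.publish("jobs", "c", qos=1)

backlog = broker.connect("test-client", clean_session=False)
print(backlog)
```

In this model the message published before the first connect is lost, as is the QoS 0 message, while the two QoS 1 messages are waiting when the subscriber comes back.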

Here are the updated sample scripts:

publisher.py

#!/usr/bin/env python

import sys
import paho.mqtt.publish as mqtt_publish

if len(sys.argv) < 3:
    print("Help: publisher.py topic-name payload")
    sys.exit(1)

topic = sys.argv[1]
message = sys.argv[2]

mqtt_publish.single(topic, message, hostname="localhost", qos=1)

subscriber.py

#!/usr/bin/env python

import paho.mqtt.client as mqtt
import sys

if len(sys.argv) < 2:
    print("Help: subscriber.py topic-name")
    sys.exit(1)

topic = sys.argv[1]

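# paho-mqtt 1.x passes a flags dict before the result code.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    # Subscribe at QoS 1 so the broker queues messages while we are offline.
    client.subscribe(topic, qos=1)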

def on_message(client, userdata, msg):
    # msg.payload is bytes under Python 3; decode it for readable output.
    print(msg.topic + " " + msg.payload.decode("utf-8", "replace"))

# A fixed client ID plus clean_session=False lets the broker keep our session.
client = mqtt.Client(client_id="test-client", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message

client.connect("localhost")

client.loop_forever()
