For a project I’m working on, I want to use MQTT as a lightweight message queue. In particular, I want messages to queue up while the backend processor is offline, whether because it crashed or is being upgraded.
I installed Mosquitto and wrote a pair of publish/subscribe scripts in Python using the paho-mqtt library. I quickly discovered that messages were not being queued if I shut down the subscriber. I tried setting the Retain flag on the messages, but the broker then kept only the last message sent on the topic.
It turns out there are several things you need to do in the publisher and subscriber to enable offline queueing:
- Set the QoS level to 1 or 2 in both the publisher and subscriber.
- Set a client ID in the subscriber.
- Connect the subscriber with “Clean Session” set to false.
In addition, the subscriber must connect at least once before messages will start queueing up for it.
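The rules above can be sketched as a toy model. This is only an illustration of the behaviour described here, not how Mosquitto is implemented; the `ToyBroker` class and all of its methods are hypothetical names, and delivery to online clients is deliberately left out.

```python
class ToyBroker:
    """Hypothetical, simplified model of broker-side offline queueing."""

    def __init__(self):
        self.sessions = {}  # client_id -> session state

    def connect(self, client_id, clean_session):
        """Connect a client and return messages queued while it was offline."""
        session = self.sessions.get(client_id)
        if clean_session or session is None:
            # Fresh session: any previous subscriptions and backlog are dropped.
            session = {"subs": {}, "queue": [], "persistent": not clean_session}
            self.sessions[client_id] = session
        session["online"] = True
        backlog, session["queue"] = session["queue"], []
        return backlog

    def subscribe(self, client_id, topic, qos):
        self.sessions[client_id]["subs"][topic] = qos

    def disconnect(self, client_id):
        session = self.sessions[client_id]
        if session["persistent"]:
            session["online"] = False      # keep subscriptions for later
        else:
            del self.sessions[client_id]   # clean session: forget everything

    def publish(self, topic, payload, qos):
        for session in self.sessions.values():
            if session["online"] or topic not in session["subs"]:
                continue  # online delivery is not modelled here
            # The effective QoS is the lower of the publish and subscribe QoS.
            if min(qos, session["subs"][topic]) >= 1:
                session["queue"].append(payload)


broker = ToyBroker()
broker.publish("jobs", "m0", qos=1)   # subscriber has never connected: dropped
broker.connect("worker", clean_session=False)
broker.subscribe("worker", "jobs", qos=1)
broker.disconnect("worker")
broker.publish("jobs", "m1", qos=1)   # queued for the offline session
broker.publish("jobs", "m2", qos=0)   # QoS 0: not queued
backlog = broker.connect("worker", clean_session=False)
print(backlog)  # ['m1']
```

Only `m1` survives: `m0` was published before the subscriber had ever connected, and `m2` was sent at QoS 0.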
Here are the updated sample scripts:
publisher.py
#!/usr/bin/env python3
import sys
import paho.mqtt.publish as mqtt_publish

if len(sys.argv) < 3:
    print("Help: publisher.py topic-name payload")
    sys.exit(1)

topic = sys.argv[1]
message = sys.argv[2]

# QoS 1 lets the broker hold the message for an offline subscriber.
mqtt_publish.single(topic, message, hostname="localhost", qos=1)
subscriber.py
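#!/usr/bin/env python3
import sys
import paho.mqtt.client as mqtt

if len(sys.argv) < 2:
    print("Help: subscriber.py topic-name")
    sys.exit(1)

topic = sys.argv[1]

# paho-mqtt 1.x callback signature; flags carries the broker's response flags.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    # Subscribe at QoS 1 so messages are queued while this client is offline.
    client.subscribe(topic, qos=1)

def on_message(client, userdata, msg):
    # The payload arrives as bytes; decode it for printing.
    print(msg.topic + " " + msg.payload.decode("utf-8", errors="replace"))

# A fixed client ID plus clean_session=False gives a persistent session.
# With paho-mqtt 2.x, also pass mqtt.CallbackAPIVersion.VERSION1 as the first argument.
client = mqtt.Client(client_id="test-client", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost")
client.loop_forever()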
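To check whether the broker actually kept the session between runs, the subscriber can inspect the flags passed to its connect callback. The sketch below is a minimal example and not part of the scripts above: `session_resumed` and `TOPIC` are hypothetical names, and it assumes paho-mqtt 1.x passes `flags` as a dict with a `"session present"` key, while the 2.x VERSION2 callback API passes an object with a `session_present` attribute.

```python
TOPIC = "test/topic"  # hypothetical topic name, for illustration only


def session_resumed(flags):
    """Return True if the broker reports an existing session for this client ID."""
    if isinstance(flags, dict):
        # Assumed paho-mqtt 1.x style: a dict with a "session present" key.
        return bool(flags.get("session present", 0))
    # Assumed paho-mqtt 2.x VERSION2 style: an object with session_present.
    return bool(getattr(flags, "session_present", False))


def on_connect(client, userdata, flags, rc):
    if session_resumed(flags):
        print("Session resumed; queued messages should follow")
    else:
        print("New session; nothing was queued for this client ID")
    # Subscribing again is harmless and covers the new-session case.
    client.subscribe(TOPIC, qos=1)
```

On the first run the broker has no session for the client ID, so the new-session branch is taken. After a restart with the same client ID and Clean Session set to false, the flag should report a resumed session.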