"""Flask-MQTT Package.
:author: Stefan Lehmann <stlm@posteo.de>
:license: MIT, see license file or https://opensource.org/licenses/MIT
"""
import sys
import ssl
import logging
from collections import namedtuple
from flask import Flask
from typing import Dict, Any, Callable, Tuple, Optional, List
# noinspection PyUnresolvedReferences
from paho.mqtt.client import (
Client,
MQTT_ERR_SUCCESS,
MQTT_ERR_ACL_DENIED,
MQTT_ERR_AGAIN,
MQTT_ERR_AUTH,
MQTT_ERR_CONN_LOST,
MQTT_ERR_CONN_REFUSED,
MQTT_ERR_ERRNO,
MQTT_ERR_INVAL,
MQTT_ERR_NO_CONN,
MQTT_ERR_NOMEM,
MQTT_ERR_NOT_FOUND,
MQTT_ERR_NOT_SUPPORTED,
MQTT_ERR_PAYLOAD_SIZE,
MQTT_ERR_PROTOCOL,
MQTT_ERR_QUEUE_SIZE,
MQTT_ERR_TLS,
MQTT_ERR_UNKNOWN,
MQTT_LOG_DEBUG,
MQTT_LOG_ERR,
MQTT_LOG_INFO,
MQTT_LOG_NOTICE,
MQTT_LOG_WARNING,
MQTTv311,
MQTTv31,
)
# define some alias for python2 compatibility
if sys.version_info[0] >= 3:
unicode = str
# current Flask-MQTT version
__version__ = "1.1.2"
#: Container for topic + qos
TopicQos = namedtuple("TopicQos", ["topic", "qos"])
# Init logger
logger = logging.getLogger(__name__)
[docs]class Mqtt:
"""Main Mqtt class.
:param app: flask application object
:param connect_async: if True then connect_aync will be used to connect to MQTT broker
:param mqtt_logging: if True then messages from MQTT client will be logged
"""
def __init__(
self, app: Flask = None, connect_async: bool = False, mqtt_logging: bool = False, config_prefix: str = "MQTT"
) -> None:
self._connect_async: bool = connect_async
self._connect_handler: Optional[Callable] = None
self._disconnect_handler: Optional[Callable] = None
self.app = app
self.client = Client()
self.connected = False
self.topics: Dict[str, TopicQos] = {}
# configuration parameters
self.client_id: str = ""
self.config_prefix = config_prefix
self.clean_session: bool = True
self.username: Optional[str] = None
self.password: Optional[str] = None
self.broker_url: str = "localhost"
self.broker_port: int = 1883
self.tls_enabled: bool = False
self.keepalive: int = 60
self.last_will_topic: Optional[str] = None
self.last_will_message: Optional[str] = None
self.last_will_qos: int = 0
self.last_will_retain: bool = False
self.tls_ca_certs: Optional[List[str]] = None
self.tls_certfile: Optional[str] = None
self.tls_keyfile: Optional[str] = None
self.tls_cert_reqs: int = ssl.CERT_NONE
self.tls_version: int = ssl.PROTOCOL_TLSv1
self.tls_ciphers: Optional[List[str]] = None
self.tls_insecure: bool = False
if mqtt_logging:
self.client.enable_logger(logger)
if app is not None:
self.init_app(app, self.config_prefix)
[docs] def init_app(self, app: Flask, config_prefix : str = "MQTT") -> None:
"""Init the Flask-MQTT addon."""
if self.app is None:
self.app = app
if config_prefix + "_CLIENT_ID" in app.config:
self.client_id = app.config["MQTT_CLIENT_ID"]
if isinstance(self.client_id, unicode):
self.client._client_id = self.client_id.encode("utf-8")
else:
self.client._client_id = self.client_id
self.client._transport = app.config.get(config_prefix + "_TRANSPORT", "tcp").lower()
self.client._protocol = app.config.get(config_prefix + "_PROTOCOL_VERSION", MQTTv311)
self.client._clean_session = self.clean_session
self.client.on_connect = self._handle_connect
self.client.on_disconnect = self._handle_disconnect
if config_prefix + "_CLEAN_SESSION" in app.config:
self.clean_session = app.config[config_prefix + "_CLEAN_SESSION"]
if config_prefix + "_USERNAME" in app.config:
self.username = app.config[ config_prefix + "_USERNAME"]
if config_prefix + "_PASSWORD" in app.config:
self.password = app.config[config_prefix + "_PASSWORD"]
if config_prefix + "_BROKER_URL" in app.config:
self.broker_url = app.config[config_prefix + "_BROKER_URL"]
if config_prefix + "_BROKER_PORT" in app.config:
self.broker_port = app.config[config_prefix + "_BROKER_PORT"]
if config_prefix + "_TLS_ENABLED" in app.config:
self.tls_enabled = app.config[config_prefix + "_TLS_ENABLED"]
if config_prefix + "_KEEPALIVE" in app.config:
self.keepalive = app.config[config_prefix + "_KEEPALIVE"]
if config_prefix + "_LAST_WILL_TOPIC" in app.config:
self.last_will_topic = app.config[config_prefix + "_LAST_WILL_TOPIC"]
if config_prefix + "_LAST_WILL_MESSAGE" in app.config:
self.last_will_message = app.config[config_prefix + "_LAST_WILL_MESSAGE"]
if config_prefix + "_LAST_WILL_QOS" in app.config:
self.last_will_qos = app.config[config_prefix + "_LAST_WILL_QOS"]
if config_prefix + "_LAST_WILL_RETAIN" in app.config:
self.last_will_retain = app.config[ config_prefix + "_LAST_WILL_RETAIN"]
if self.tls_enabled:
if config_prefix + "_TLS_CA_CERTS" in app.config:
self.tls_ca_certs = app.config[config_prefix + "_TLS_CA_CERTS"]
if config_prefix + "_TLS_CERTFILE" in app.config:
self.tls_certfile = app.config[config_prefix + "_TLS_CERTFILE"]
if config_prefix + "_TLS_KEYFILE" in app.config:
self.tls_keyfile = app.config[config_prefix + "_TLS_KEYFILE"]
if config_prefix + "_TLS_CIPHERS" in app.config:
self.tls_ciphers = app.config[config_prefix + "_TLS_CIPHERS"]
if config_prefix + "_TLS_INSECURE" in app.config:
self.tls_insecure = app.config[config_prefix + "_TLS_INSECURE"]
self.tls_cert_reqs = app.config.get(config_prefix + "_TLS_CERT_REQS", ssl.CERT_REQUIRED)
self.tls_version = app.config.get(config_prefix + "_TLS_VERSION", ssl.PROTOCOL_TLSv1)
# set last will message
if self.last_will_topic is not None:
self.client.will_set(
self.last_will_topic,
self.last_will_message,
self.last_will_qos,
self.last_will_retain,
)
self._connect()
def _connect(self) -> None:
if self.username is not None:
self.client.username_pw_set(self.username, self.password)
# security
if self.tls_enabled:
self.client.tls_set(
ca_certs=self.tls_ca_certs,
certfile=self.tls_certfile,
keyfile=self.tls_keyfile,
cert_reqs=self.tls_cert_reqs,
tls_version=self.tls_version,
ciphers=self.tls_ciphers,
)
if self.tls_insecure:
self.client.tls_insecure_set(self.tls_insecure)
if self._connect_async:
# if connect_async is used
self.client.connect_async(
self.broker_url, self.broker_port, keepalive=self.keepalive
)
else:
res = self.client.connect(
self.broker_url, self.broker_port, keepalive=self.keepalive
)
if res == 0:
logger.debug(
"Connected client '{0}' to broker {1}:{2}".format(
self.client_id, self.broker_url, self.broker_port
)
)
else:
logger.error(
"Could not connect to MQTT Broker, Error Code: {0}".format(res)
)
self.client.loop_start()
def _disconnect(self) -> None:
self.client.loop_stop()
self.client.disconnect()
logger.debug("Disconnected from Broker")
def _handle_connect(
self, client: Client, userdata: Any, flags: Dict[str, Any], rc: int
) -> None:
if rc == MQTT_ERR_SUCCESS:
self.connected = True
for key, item in self.topics.items():
self.client.subscribe(topic=item.topic, qos=item.qos)
if self._connect_handler is not None:
self._connect_handler(client, userdata, flags, rc)
def _handle_disconnect(self, client: Client, userdata: Any, rc: int) -> None:
self.connected = False
if self._disconnect_handler is not None:
self._disconnect_handler()
[docs] def on_topic(self, topic: str) -> Callable:
"""Decorator.
Decorator to add a callback function that is called when a certain
topic has been published. The callback function is expected to have the
following form: `handle_topic(client, userdata, message)`
:parameter topic: a string specifying the subscription topic to
subscribe to
The topic still needs to be subscribed via mqtt.subscribe() before the
callback function can be used to handle a certain topic. This way it is
possible to subscribe and unsubscribe during runtime.
**Example usage:**::
app = Flask(__name__)
mqtt = Mqtt(app)
mqtt.subscribe('home/mytopic')
@mqtt.on_topic('home/mytopic')
def handle_mytopic(client, userdata, message):
print('Received message on topic {}: {}'
.format(message.topic, message.payload.decode()))
"""
def decorator(handler: Callable[[str], None]) -> Callable[[str], None]:
self.client.message_callback_add(topic, handler)
return handler
return decorator
[docs] def subscribe(self, topic, qos: int = 0) -> Tuple[int, int]:
"""
Subscribe to a certain topic.
:param topic: a string specifying the subscription topic to
subscribe to.
:param qos: the desired quality of service level for the subscription.
Defaults to 0.
:rtype: (int, int)
:result: (result, mid)
A topic is a UTF-8 string, which is used by the broker to filter
messages for each connected client. A topic consists of one or more
topic levels. Each topic level is separated by a forward slash
(topic level separator).
The function returns a tuple (result, mid), where result is
MQTT_ERR_SUCCESS to indicate success or (MQTT_ERR_NO_CONN, None) if the
client is not currently connected. mid is the message ID for the
subscribe request. The mid value can be used to track the subscribe
request by checking against the mid argument in the on_subscribe()
callback if it is defined.
**Topic example:** `myhome/groundfloor/livingroom/temperature`
"""
# TODO: add support for list of topics
# don't subscribe if already subscribed
# try to subscribe
result, mid = self.client.subscribe(topic=topic, qos=qos)
# if successful add to topics
if result == MQTT_ERR_SUCCESS:
if isinstance(topic, tuple):
topic, qos = topic
self.topics[topic] = TopicQos(topic=topic, qos=qos)
elif isinstance(topic, list):
for t, q in topic:
self.topics[t] = TopicQos(topic=t, qos=q)
else:
self.topics[topic] = TopicQos(topic=topic, qos=qos)
logger.debug("Subscribed to topic: {0}, qos: {1}".format(topic, qos))
else:
logger.error("Error {0} subscribing to topic: {1}".format(result, topic))
return result, mid
[docs] def unsubscribe(self, topic: str) -> Optional[Tuple[int, int]]:
"""
Unsubscribe from a single topic.
:param topic: a single string that is the subscription topic to
unsubscribe from
:rtype: (int, int)
:result: (result, mid)
Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS
to indicate success or (MQTT_ERR_NO_CONN, None) if the client is not
currently connected.
mid is the message ID for the unsubscribe request. The mid value can be
used to track the unsubscribe request by checking against the mid
argument in the on_unsubscribe() callback if it is defined.
"""
# don't unsubscribe if not in topics
if topic in self.topics:
result, mid = self.client.unsubscribe(topic)
if result == MQTT_ERR_SUCCESS:
self.topics.pop(topic)
logger.debug("Unsubscribed from topic: {0}".format(topic))
else:
logger.debug(
"Error {0} unsubscribing from topic: {1}".format(result, topic)
)
# if successful remove from topics
return result, mid
return None
[docs] def unsubscribe_all(self) -> None:
"""
Unsubscribe from all topics.
Returns True if all topics are unsubscribed from self.topics, otherwise False
"""
topics = list(self.topics.keys())
for topic in topics:
self.unsubscribe(topic)
if not len(self.topics):
return True
return False
[docs] def publish(
self,
topic: str,
payload: Optional[bytes] = None,
qos: int = 0,
retain: bool = False,
) -> Tuple[int, int]:
"""
Send a message to the broker.
:param topic: the topic that the message should be published on
:param payload: the actual message to send. If not given, or set to
None a zero length message will be used. Passing an
int or float will result in the payload being
converted to a string representing that number.
If you wish to send a true int/float, use struct.pack()
to create the payload you require.
:param qos: the quality of service level to use
:param retain: if set to True, the message will be set as the
"last known good"/retained message for the topic
:returns: Returns a tuple (result, mid), where result is
MQTT_ERR_SUCCESS to indicate success or MQTT_ERR_NO_CONN
if the client is not currently connected. mid is the message
ID for the publish request.
"""
result, mid = self.client.publish(topic, payload, qos, retain)
if result == MQTT_ERR_SUCCESS:
logger.debug("Published topic {0}: {1}".format(topic, payload))
else:
logger.error("Error {0} publishing topic {1}".format(result, topic))
return result, mid
[docs] def on_connect(self) -> Callable:
"""Decorator.
Decorator to handle the event when the broker responds to a connection
request. Only the last decorated function will be called.
"""
def decorator(handler: Callable) -> Callable:
self._connect_handler = handler
return handler
return decorator
[docs] def on_disconnect(self) -> Callable:
"""Decorator.
Decorator to handle the event when client disconnects from broker. Only
the last decorated function will be called.
"""
def decorator(handler: Callable) -> Callable:
self._disconnect_handler = handler
return handler
return decorator
[docs] def on_message(self) -> Callable:
"""Decorator.
Decorator to handle all messages that have been subscribed and that
are not handled via the `on_message` decorator.
**Note:** Unlike as written in the paho mqtt documentation this
callback will not be called if there exists an topic-specific callback
added by the `on_topic` decorator.
**Example Usage:**::
@mqtt.on_message()
def handle_messages(client, userdata, message):
print('Received message on topic {}: {}'
.format(message.topic, message.payload.decode()))
"""
def decorator(handler: Callable) -> Callable:
self.client.on_message = handler
return handler
return decorator
[docs] def on_publish(self) -> Callable:
"""Decorator.
Decorator to handle all messages that have been published by the
client.
**Example Usage:**::
@mqtt.on_publish()
def handle_publish(client, userdata, mid):
print('Published message with mid {}.'
.format(mid))
"""
def decorator(handler: Callable) -> Callable:
self.client.on_publish = handler
return handler
return decorator
[docs] def on_subscribe(self) -> Callable:
"""Decorate a callback function to handle subscritions.
**Usage:**::
@mqtt.on_subscribe()
def handle_subscribe(client, userdata, mid, granted_qos):
print('Subscription id {} granted with qos {}.'
.format(mid, granted_qos))
"""
def decorator(handler: Callable) -> Callable:
self.client.on_subscribe = handler
return handler
return decorator
[docs] def on_unsubscribe(self) -> Callable:
"""Decorate a callback funtion to handle unsubscribtions.
**Usage:**::
@mqtt.unsubscribe()
def handle_unsubscribe(client, userdata, mid)
print('Unsubscribed from topic (id: {})'
.format(mid)')
"""
def decorator(handler: Callable) -> Callable:
self.client.on_unsubscribe = handler
return handler
return decorator
[docs] def on_log(self) -> Callable:
"""Decorate a callback function to handle MQTT logging.
**Example Usage:**
::
@mqtt.on_log()
def handle_logging(client, userdata, level, buf):
print(client, userdata, level, buf)
"""
def decorator(handler: Callable) -> Callable:
self.client.on_log = handler
return handler
return decorator