MqttClient
in package
implements
MqttClient
Uses
GeneratesRandomClientIds, OffersHooks, ValidatesConfiguration
An MQTT client implementing protocol version 3.1.
Interfaces, Classes, Traits and Enums
- MqttClient
- An interface for the MQTT client.
Table of Contents
- MQTT_3_1 = '3.1'
- MQTT_3_1_1 = '3.1.1'
- QOS_AT_LEAST_ONCE = 1
- QOS_AT_MOST_ONCE = 0
- QOS_EXACTLY_ONCE = 2
- SOCKET_READ_BUFFER_SIZE = 8192
- $socket : resource|null
- $buffer : string
- $bytesReceived : int
- $bytesSent : int
- $clientId : string
- $connected : bool
- $connectedEventHandlers : SplObjectStorage|array<string|int, Closure>
- $host : string
- $interrupted : bool
- $lastPingAt : float|null
- $logger : LoggerInterface
- $loopEventHandlers : SplObjectStorage|array<string|int, Closure>
- $messageProcessor : MessageProcessor
- $messageReceivedEventHandlers : SplObjectStorage|array<string|int, Closure>
- $port : int
- $publishEventHandlers : SplObjectStorage|array<string|int, Closure>
- $repository : Repository
- $settings : ConnectionSettings
- __construct() : mixed
- Constructs a new MQTT client which subsequently supports publishing and subscribing.
- connect() : void
- Connect to the MQTT broker using the given settings.
- disconnect() : void
- Sends a disconnect message to the broker and closes the socket.
- getClientId() : string
- Returns the identifier used by the client.
- getHost() : string
- Returns the host used by the client to connect to.
- getPort() : int
- Returns the port used by the client to connect to.
- getReceivedBytes() : int
- Returns the total number of received bytes, across reconnects.
- getSentBytes() : int
- Returns the total number of sent bytes, across reconnects.
- interrupt() : void
- Sets the interrupted signal. Doing so instructs the client to exit the loop, if it is actually looping.
- isConnected() : bool
- Indicates whether the client is supposed to be connected already.
- loop() : void
- Runs an event loop that handles messages from the server and calls the registered callbacks for published messages.
- loopOnce() : void
- Runs an event loop iteration that handles messages from the server and calls the registered callbacks for published messages. Also resends pending messages and calls loop event handlers.
- publish() : void
- Publishes the given message on the given topic. If the additional quality of service and retention flags are set, the message will be published using these settings.
- registerConnectedEventHandler() : MqttClient
- Registers an event handler which is called when the client established a connection to the broker.
- registerLoopEventHandler() : MqttClient
- Registers a loop event handler which is called each iteration of the loop.
- registerMessageReceivedEventHandler() : MqttClient
- Registers an event handler which is called when a message is received from the broker.
- registerPublishEventHandler() : MqttClient
- Registers an event handler which is called when a message is published.
- subscribe() : void
- Subscribe to the given topic with the given quality of service.
- unregisterConnectedEventHandler() : MqttClient
- Unregisters a connected event handler which prevents it from being called in the future.
- unregisterLoopEventHandler() : MqttClient
- Unregisters a loop event handler which prevents it from being called in the future.
- unregisterMessageReceivedEventHandler() : MqttClient
- Unregisters a message received event handler which prevents it from being called in the future.
- unregisterPublishEventHandler() : MqttClient
- Unregisters a publish event handler which prevents it from being called in the future.
- unsubscribe() : void
- Unsubscribe from the given topic.
- allQueuesAreEmpty() : bool
- Determines if all queues are empty.
- closeSocket() : void
- Closes the socket connection immediately, without flushing queued data.
- connectInternal() : void
- Connect to the MQTT broker using the configured settings.
- deliverPublishedMessage() : void
- Delivers a published message to subscribed callbacks.
- ensureConnected() : void
- Ensures the client is connected to a broker (or at least thinks it is).
- ensureConnectionSettingsAreValid() : void
- Ensures the given connection settings are valid. If they are not valid, which means they are misconfigured, an exception containing information about the configuration error is thrown.
- establishSocketConnection() : void
- Opens a socket that connects to the host and port set on the object.
- generateRandomClientId() : string
- Generates a random client id in the form of an md5 hash.
- handleMessage() : void
- Handles the given message according to its contents.
- initializeEventHandlers() : void
- Needs to be called in order to initialize the trait.
- nextPingAt() : float
- Returns the next time the broker expects to be pinged.
- performConnectionHandshake() : void
- Performs the connection handshake with the help of the configured message processor.
- ping() : void
- Sends a ping message to the broker to keep the connection alive.
- publishMessage() : void
- Actually publishes a message after using the configured message processor to build it.
- readAllAvailableDataFromSocket() : string
- Reads all the available data from the socket using non-blocking mode. Essentially this means that MqttClient::readFromSocketWithAutoReconnect() is called over and over again, as long as data is returned.
- readFromSocket() : string
- Reads data from the socket. If the second parameter $withoutBlocking is set to true, a maximum of $limit bytes will be read and returned. If $withoutBlocking is set to false, the method will wait until $limit bytes have been received.
- readFromSocketWithAutoReconnect() : string
- Reads data from the socket. If the second parameter $withoutBlocking is set to true, a maximum of $limit bytes will be read and returned. If $withoutBlocking is set to false, the method will wait until $limit bytes have been received.
- reconnect() : void
- Attempts to reconnect to the broker. If a connection cannot be established within the configured number of retries, the last caught exception is thrown.
- resendPendingMessages() : void
- Republishes pending messages.
- sendDisconnect() : void
- Sends a disconnect message to the broker. Does not close the socket.
- sendPublishAcknowledgement() : void
- Sends a publish acknowledgement for the given message identifier.
- sendPublishComplete() : void
- Sends a publish complete message for the given message identifier.
- sendPublishReceived() : void
- Sends a publish received message for the given message identifier.
- sendPublishRelease() : void
- Sends a publish release message for the given message identifier.
- writeToSocket() : void
- Writes some data to the socket. If a $length is given and it is shorter than the data, only $length bytes will be sent.
- writeToSocketWithAutoReconnect() : void
- Writes some data to the socket. If a $length is given and it is shorter than the data, only $length bytes will be sent.
- parseTlsErrorMessage() : void
- Internal parser for SSL-related PHP error messages.
- processMessageBuffer() : void
- Processes the incoming message buffer by parsing and handling the messages, until the buffer is empty.
- runConnectedEventHandlers() : void
- Runs all the registered connected event handlers.
- runLoopEventHandlers() : void
- Runs all registered loop event handlers with the given parameters.
- runMessageReceivedEventHandlers() : void
- Runs all the registered message received event handlers with the given parameters.
- runPublishEventHandlers() : void
- Runs all the registered publish event handlers with the given parameters.
Constants
MQTT_3_1
public
mixed
MQTT_3_1
= '3.1'
MQTT_3_1_1
public
mixed
MQTT_3_1_1
= '3.1.1'
QOS_AT_LEAST_ONCE
public
mixed
QOS_AT_LEAST_ONCE
= 1
QOS_AT_MOST_ONCE
public
mixed
QOS_AT_MOST_ONCE
= 0
QOS_EXACTLY_ONCE
public
mixed
QOS_EXACTLY_ONCE
= 2
SOCKET_READ_BUFFER_SIZE
public
mixed
SOCKET_READ_BUFFER_SIZE
= 8192
Properties
$socket
protected
resource|null
$socket
$buffer
private
string
$buffer
= ''
$bytesReceived
private
int
$bytesReceived
= 0
$bytesSent
private
int
$bytesSent
= 0
$clientId
private
string
$clientId
$connected
private
bool
$connected
= false
$connectedEventHandlers
private
SplObjectStorage|array<string|int, Closure>
$connectedEventHandlers
$host
private
string
$host
$interrupted
private
bool
$interrupted
= false
$lastPingAt
private
float|null
$lastPingAt
= null
$logger
private
LoggerInterface
$logger
$loopEventHandlers
private
SplObjectStorage|array<string|int, Closure>
$loopEventHandlers
$messageProcessor
private
MessageProcessor
$messageProcessor
$messageReceivedEventHandlers
private
SplObjectStorage|array<string|int, Closure>
$messageReceivedEventHandlers
$port
private
int
$port
$publishEventHandlers
private
SplObjectStorage|array<string|int, Closure>
$publishEventHandlers
$repository
private
Repository
$repository
$settings
private
ConnectionSettings
$settings
Methods
__construct()
Constructs a new MQTT client which subsequently supports publishing and subscribing.
public
__construct(string $host[, int $port = 1883 ][, string|null $clientId = null ][, string $protocol = self::MQTT_3_1 ][, Repository|null $repository = null ][, LoggerInterface|null $logger = null ]) : mixed
Notes:
- If no client id is given, a random one is generated, forcing a clean session implicitly.
- If no protocol is given, MQTT 3.1 (self::MQTT_3_1) is used by default.
- If no repository is given, an in-memory repository is created for you. Once you terminate your script, all stored data (like resend queues) is lost.
- If no logger is given, log messages are dropped. Any PSR-3 logger will work.
Parameters
- $host : string
- $port : int = 1883
- $clientId : string|null = null
- $protocol : string = self::MQTT_3_1
- $repository : Repository|null = null
- $logger : LoggerInterface|null = null
Tags
Return values
mixed
connect()
Connect to the MQTT broker using the given settings.
public
connect([ConnectionSettings $settings = null ][, bool $useCleanSession = false ]) : void
Parameters
- $settings : ConnectionSettings = null
- $useCleanSession : bool = false
Return values
void
disconnect()
Sends a disconnect message to the broker and closes the socket.
public
disconnect() : void
Return values
void
getClientId()
Returns the identifier used by the client.
public
getClientId() : string
Return values
string
getHost()
Returns the host used by the client to connect to.
public
getHost() : string
Return values
string
getPort()
Returns the port used by the client to connect to.
public
getPort() : int
Return values
int
getReceivedBytes()
Returns the total number of received bytes, across reconnects.
public
getReceivedBytes() : int
Return values
int
getSentBytes()
Returns the total number of sent bytes, across reconnects.
public
getSentBytes() : int
Return values
int
interrupt()
Sets the interrupted signal. Doing so instructs the client to exit the loop, if it is actually looping.
public
interrupt() : void
Return values
void
isConnected()
Indicates whether the client is supposed to be connected already.
public
isConnected() : bool
Return values
bool
loop()
Runs an event loop that handles messages from the server and calls the registered callbacks for published messages.
public
loop([bool $allowSleep = true ][, bool $exitWhenQueuesEmpty = false ][, int $queueWaitLimit = null ]) : void
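The interplay between loop(), interrupt() and a loop event handler can be illustrated with a small stand-in. This is only a sketch: the class LoopSketch and its $onIteration parameter are hypothetical and not part of the client, and the sketch models nothing but the interrupt flag and the per-iteration callback. The real loop additionally handles messages and takes the $exitWhenQueuesEmpty and $queueWaitLimit parameters, which are left out here.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in: only the interrupt flag and the per-iteration callback are modelled.
final class LoopSketch
{
    private bool $interrupted = false;

    public function interrupt(): void
    {
        $this->interrupted = true;
    }

    /** Runs until interrupt() is called and returns the number of completed iterations. */
    public function loop(callable $onIteration, bool $allowSleep = true): int
    {
        $loopStartedAt = microtime(true);
        $iterations = 0;

        while (!$this->interrupted) {
            $iterations++;

            // The callback receives the looping object and the elapsed time in seconds.
            $onIteration($this, microtime(true) - $loopStartedAt);

            if ($allowSleep) {
                usleep(1000); // yield briefly instead of busy-waiting
            }
        }

        $this->interrupted = false; // reset so the loop can be started again

        return $iterations;
    }
}

// Usage: interrupt the loop from inside the callback on the third iteration.
$sketch = new LoopSketch();
$count = 0;
$iterations = $sketch->loop(function (LoopSketch $loop, float $elapsedTime) use (&$count): void {
    if (++$count === 3) {
        $loop->interrupt();
    }
}, false);
```

Because the flag is only checked at the top of each iteration, the iteration that calls interrupt() still runs to completion in this sketch.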
Parameters
- $allowSleep : bool = true
- $exitWhenQueuesEmpty : bool = false
- $queueWaitLimit : int = null
Return values
void
loopOnce()
Runs an event loop iteration that handles messages from the server and calls the registered callbacks for published messages. Also resends pending messages and calls loop event handlers.
public
loopOnce(float $loopStartedAt[, bool $allowSleep = false ][, int $sleepMicroseconds = 100000 ]) : void
Parameters
- $loopStartedAt : float
- $allowSleep : bool = false
- $sleepMicroseconds : int = 100000
Return values
void
publish()
Publishes the given message on the given topic. If the additional quality of service and retention flags are set, the message will be published using these settings.
public
publish(string $topic, string $message, int $qualityOfService[, bool $retain = false ]) : void
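The $qualityOfService argument takes one of the three QOS_* constants. As a rough orientation, the sketch below maps each level to the control packets that the MQTT 3.1.1 specification defines as following a PUBLISH packet. The function acknowledgementFlow() is hypothetical and the constants are redeclared locally so the snippet runs on its own; it does not show how the client itself tracks these exchanges.

```php
<?php
declare(strict_types=1);

// Values mirror the QOS_* constants of the class; redeclared here for a standalone snippet.
const QOS_AT_MOST_ONCE = 0;
const QOS_AT_LEAST_ONCE = 1;
const QOS_EXACTLY_ONCE = 2;

/**
 * Returns the control packets that follow a PUBLISH packet, in order (hypothetical helper).
 *
 * @return string[]
 */
function acknowledgementFlow(int $qualityOfService): array
{
    switch ($qualityOfService) {
        case QOS_AT_MOST_ONCE:
            return []; // fire and forget, nothing is acknowledged
        case QOS_AT_LEAST_ONCE:
            return ['PUBACK']; // receiver acknowledges once
        case QOS_EXACTLY_ONCE:
            return ['PUBREC', 'PUBREL', 'PUBCOMP']; // receiver, sender, receiver
        default:
            throw new InvalidArgumentException("Invalid QoS level [{$qualityOfService}].");
    }
}
```

The packet names line up with the protected sendPublishAcknowledgement(), sendPublishReceived(), sendPublishRelease() and sendPublishComplete() methods documented further down.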
Parameters
- $topic : string
- $message : string
- $qualityOfService : int
- $retain : bool = false
Return values
void
registerConnectedEventHandler()
Registers an event handler which is called when the client established a connection to the broker.
public
registerConnectedEventHandler(Closure $callback) : MqttClient
This also includes manual reconnects as well as auto-reconnects by the client itself.
The event handler is passed the MQTT client as first argument, followed by a flag which indicates whether an auto-reconnect occurred as second argument.
Example:
$mqtt->registerConnectedEventHandler(function (
    MqttClient $mqtt,
    bool $isAutoReconnect
) use ($logger) {
    if ($isAutoReconnect) {
        $logger->info("Client successfully auto-reconnected to the broker.");
    } else {
        $logger->info("Client successfully connected to the broker.");
    }
});
Multiple event handlers can be registered at the same time.
Parameters
- $callback : Closure
Return values
MqttClient
registerLoopEventHandler()
Registers a loop event handler which is called each iteration of the loop.
public
registerLoopEventHandler(Closure $callback) : MqttClient
This event handler can be used for example to interrupt the loop under certain conditions.
The loop event handler is passed the MQTT client instance as first and the elapsed time which the loop is already running for as second parameter. The elapsed time is a float containing seconds.
Example:
$mqtt->registerLoopEventHandler(function (
    MqttClient $mqtt,
    float $elapsedTime
) use ($logger) {
    $logger->info("Running for [{$elapsedTime}] seconds already.");
});
Multiple event handlers can be registered at the same time.
Parameters
- $callback : Closure
Return values
MqttClient
registerMessageReceivedEventHandler()
Registers an event handler which is called when a message is received from the broker.
public
registerMessageReceivedEventHandler(Closure $callback) : MqttClient
The message received event handler is passed the MQTT client as first, the topic as second and the message as third parameter. As fourth parameter, the QoS level will be passed and the retained flag as fifth.
Example:
$mqtt->registerMessageReceivedEventHandler(function (
    MqttClient $mqtt,
    string $topic,
    string $message,
    int $qualityOfService,
    bool $retained
) use ($logger) {
    $logger->info("Received message on topic [{$topic}]: {$message}");
});
Multiple event handlers can be registered at the same time.
Parameters
- $callback : Closure
Return values
MqttClient
registerPublishEventHandler()
Registers an event handler which is called when a message is published.
public
registerPublishEventHandler(Closure $callback) : MqttClient
The publish event handler is passed the MQTT client as first, the topic as second and the message as third parameter. As fourth parameter, the message identifier will be passed, which can be null in case of QoS 0. The QoS level as well as the retained flag will also be passed as fifth and sixth parameters.
Example:
$mqtt->registerPublishEventHandler(function (
    MqttClient $mqtt,
    string $topic,
    string $message,
    ?int $messageId,
    int $qualityOfService,
    bool $retain
) use ($logger) {
    $logger->info("Sending message on topic [{$topic}]: {$message}");
});
Multiple event handlers can be registered at the same time.
Parameters
- $callback : Closure
Return values
MqttClient
subscribe()
Subscribe to the given topic with the given quality of service.
public
subscribe(string $topicFilter[, callable $callback = null ][, int $qualityOfService = self::QOS_AT_MOST_ONCE ]) : void
Parameters
- $topicFilter : string
- $callback : callable = null
- $qualityOfService : int = self::QOS_AT_MOST_ONCE
Return values
void
unregisterConnectedEventHandler()
Unregisters a connected event handler which prevents it from being called in the future.
public
unregisterConnectedEventHandler([Closure|null $callback = null ]) : MqttClient
This does not affect other registered event handlers. It is possible to unregister all registered event handlers by passing null as callback.
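The register and unregister semantics described here (one handler removed without touching the others, all handlers removed when null is passed) can be sketched with an SplObjectStorage, which is the type listed for the handler properties. The class HandlerRegistry and its method names are hypothetical; the sketch also wraps each call in a try-catch block, as the private run*EventHandlers() methods are documented to do, but it is not the client's actual code.

```php
<?php
declare(strict_types=1);

// Hypothetical registry illustrating the documented semantics.
final class HandlerRegistry
{
    private SplObjectStorage $handlers;

    public function __construct()
    {
        $this->handlers = new SplObjectStorage();
    }

    public function register(Closure $callback): self
    {
        $this->handlers[$callback] = null; // closures are objects, so they can be used as keys

        return $this;
    }

    /** Removes the given handler, or all handlers when null is passed. */
    public function unregister(?Closure $callback = null): self
    {
        if ($callback === null) {
            $this->handlers = new SplObjectStorage();
        } else {
            unset($this->handlers[$callback]);
        }

        return $this;
    }

    /** Calls every handler and returns how many of them threw. */
    public function run(...$arguments): int
    {
        $failures = 0;

        // Iterate over a snapshot so handlers may unregister themselves safely.
        foreach (iterator_to_array($this->handlers, false) as $handler) {
            try {
                $handler(...$arguments);
            } catch (Throwable $e) {
                $failures++; // a failing handler must not stop the remaining ones
            }
        }

        return $failures;
    }
}
```

Returning the registry from register() and unregister() mirrors the fluent MqttClient return type of the documented methods.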
Parameters
- $callback : Closure|null = null
Return values
MqttClient
unregisterLoopEventHandler()
Unregisters a loop event handler which prevents it from being called in the future.
public
unregisterLoopEventHandler([Closure|null $callback = null ]) : MqttClient
This does not affect other registered event handlers. It is possible to unregister all registered event handlers by passing null as callback.
Parameters
- $callback : Closure|null = null
Return values
MqttClient
unregisterMessageReceivedEventHandler()
Unregisters a message received event handler which prevents it from being called in the future.
public
unregisterMessageReceivedEventHandler([Closure|null $callback = null ]) : MqttClient
This does not affect other registered event handlers. It is possible to unregister all registered event handlers by passing null as callback.
Parameters
- $callback : Closure|null = null
Return values
MqttClient
unregisterPublishEventHandler()
Unregisters a publish event handler which prevents it from being called in the future.
public
unregisterPublishEventHandler([Closure|null $callback = null ]) : MqttClient
This does not affect other registered event handlers. It is possible to unregister all registered event handlers by passing null as callback.
Parameters
- $callback : Closure|null = null
Return values
MqttClient
unsubscribe()
Unsubscribe from the given topic.
public
unsubscribe(string $topicFilter) : void
Parameters
- $topicFilter : string
Return values
void
allQueuesAreEmpty()
Determines if all queues are empty.
protected
allQueuesAreEmpty() : bool
Return values
bool
closeSocket()
Closes the socket connection immediately, without flushing queued data.
protected
closeSocket() : void
Return values
void
connectInternal()
Connect to the MQTT broker using the configured settings.
protected
connectInternal([bool $useCleanSession = false ][, bool $isAutoReconnect = false ]) : void
Parameters
- $useCleanSession : bool = false
- $isAutoReconnect : bool = false
Tags
Return values
void
deliverPublishedMessage()
Delivers a published message to subscribed callbacks.
protected
deliverPublishedMessage(string $topic, string $message, int $qualityOfServiceLevel[, bool $retained = false ]) : void
Parameters
- $topic : string
- $message : string
- $qualityOfServiceLevel : int
- $retained : bool = false
Return values
void
ensureConnected()
Ensures the client is connected to a broker (or at least thinks it is).
protected
ensureConnected() : void
This method does not account for closed sockets.
Tags
Return values
void
ensureConnectionSettingsAreValid()
Ensures the given connection settings are valid. If they are not valid, which means they are misconfigured, an exception containing information about the configuration error is thrown.
protected
ensureConnectionSettingsAreValid(ConnectionSettings $settings) : void
Parameters
- $settings : ConnectionSettings
Tags
Return values
void
establishSocketConnection()
Opens a socket that connects to the host and port set on the object.
protected
establishSocketConnection() : void
When this method is called, all connection settings have been validated.
Tags
Return values
void
generateRandomClientId()
Generates a random client id in the form of an md5 hash.
protected
generateRandomClientId() : string
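One way to produce a random identifier "in the form of an md5 hash" is sketched below. The function name generateRandomClientIdSketch() is hypothetical, and the source of randomness is an assumption: this reference does not say how the client seeds the hash or whether the result is shortened afterwards.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch; the entropy source used by the client is not documented here.
function generateRandomClientIdSketch(): string
{
    // random_bytes() supplies the entropy; md5() only normalises it to 32 hex characters.
    return md5(random_bytes(16));
}
```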
Return values
string
handleMessage()
Handles the given message according to its contents.
protected
handleMessage(Message $message) : void
Parameters
- $message : Message
Tags
Return values
void
initializeEventHandlers()
Needs to be called in order to initialize the trait.
protected
initializeEventHandlers() : void
Return values
void
nextPingAt()
Returns the next time the broker expects to be pinged.
protected
nextPingAt() : float
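A plausible reading of "the next time the broker expects to be pinged" is the time of the last ping plus the keep-alive interval. The sketch below works under that assumption; the function nextPingAtSketch() and its $keepAliveInterval and $now parameters are hypothetical, since this page does not name the setting the client reads.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: the next ping is due one keep-alive interval after the last one.
 * If no ping was sent yet, the interval is counted from $now.
 */
function nextPingAtSketch(?float $lastPingAt, int $keepAliveInterval, float $now): float
{
    return ($lastPingAt ?? $now) + $keepAliveInterval;
}
```

A caller would compare the result with microtime(true) on each loop iteration and send a ping once the deadline has passed.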
Return values
float
performConnectionHandshake()
Performs the connection handshake with the help of the configured message processor.
protected
performConnectionHandshake([bool $useCleanSession = false ]) : void
The connection handshake is expected to have the same flow all the time:
- Connect request with variable length
- Connect acknowledgement with variable length
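Both packets carry their size in the "Remaining Length" field of the MQTT fixed header, which uses a variable-length encoding of one to four bytes. The following sketch implements that encoding as defined by the MQTT 3.1.1 specification; the function name encodeRemainingLength() is hypothetical, and in the client this work is delegated to the configured message processor.

```php
<?php
declare(strict_types=1);

/**
 * Encodes a length using the MQTT variable-length scheme:
 * seven data bits per byte, with the high bit set while more bytes follow.
 */
function encodeRemainingLength(int $length): string
{
    if ($length < 0 || $length > 268435455) {
        throw new InvalidArgumentException('Length must be between 0 and 268435455.');
    }

    $encoded = '';

    do {
        $digit = $length % 128;
        $length = intdiv($length, 128);

        if ($length > 0) {
            $digit |= 0x80; // continuation bit: another length byte follows
        }

        $encoded .= chr($digit);
    } while ($length > 0);

    return $encoded;
}
```

For example, a length of 321 is encoded as the two bytes 0xC1 0x02, because 321 = 65 + 2 * 128 and the first byte carries the continuation bit.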
Parameters
- $useCleanSession : bool = false
Tags
Return values
void
ping()
Sends a ping message to the broker to keep the connection alive.
protected
ping() : void
Tags
Return values
void
publishMessage()
Actually publishes a message after using the configured message processor to build it.
protected
publishMessage(string $topic, string $message, int $qualityOfService, bool $retain[, int|null $messageId = null ][, bool $isDuplicate = false ]) : void
This is an internal method used both for the initial publishing of messages and for re-publishing in case of timeouts.
Parameters
- $topic : string
- $message : string
- $qualityOfService : int
- $retain : bool
- $messageId : int|null = null
- $isDuplicate : bool = false
Tags
Return values
void
readAllAvailableDataFromSocket()
Reads all the available data from the socket using non-blocking mode. Essentially this means that MqttClient::readFromSocketWithAutoReconnect() is called over and over again, as long as data is returned.
protected
readAllAvailableDataFromSocket([bool $withAutoReconnectIfConfigured = false ]) : string
Parameters
- $withAutoReconnectIfConfigured : bool = false
Tags
Return values
string
readFromSocket()
Reads data from the socket. If the second parameter $withoutBlocking is set to true, a maximum of $limit bytes will be read and returned. If $withoutBlocking is set to false, the method will wait until $limit bytes have been received.
protected
readFromSocket([int $limit = self::SOCKET_READ_BUFFER_SIZE ][, bool $withoutBlocking = false ]) : string
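The difference between the two modes can be sketched on any PHP stream. The function readBytesSketch() below is hypothetical and operates on an in-memory stream so that it runs without a broker; a real socket would additionally be switched between modes with stream_set_blocking(), and the client's own error handling is not shown.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch of the two read modes.
 *
 * @param resource $stream
 */
function readBytesSketch($stream, int $limit, bool $withoutBlocking): string
{
    if ($withoutBlocking) {
        // Single read: return whatever is available, up to $limit bytes.
        $data = fread($stream, $limit);

        return $data === false ? '' : $data;
    }

    // Keep reading until $limit bytes have arrived or the stream has ended.
    $data = '';
    while (strlen($data) < $limit && !feof($stream)) {
        $chunk = fread($stream, $limit - strlen($data));
        if ($chunk === false) {
            throw new RuntimeException('Reading from the stream failed.');
        }
        $data .= $chunk;
    }

    return $data;
}
```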
Parameters
- $limit : int = self::SOCKET_READ_BUFFER_SIZE
- $withoutBlocking : bool = false
Tags
Return values
string
readFromSocketWithAutoReconnect()
Reads data from the socket. If the second parameter $withoutBlocking is set to true, a maximum of $limit bytes will be read and returned. If $withoutBlocking is set to false, the method will wait until $limit bytes have been received.
protected
readFromSocketWithAutoReconnect([int $limit = self::SOCKET_READ_BUFFER_SIZE ][, bool $withoutBlocking = false ]) : string
If configured, this method will try to reconnect in case of transmission errors.
Parameters
- $limit : int = self::SOCKET_READ_BUFFER_SIZE
- $withoutBlocking : bool = false
Tags
Return values
string
reconnect()
Attempts to reconnect to the broker. If a connection cannot be established within the configured number of retries, the last caught exception is thrown.
protected
reconnect() : void
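The retry behaviour described above (try a fixed number of times, then rethrow the last caught exception) follows a common pattern, sketched here in isolation. The function retrySketch() and its parameters are hypothetical; in the client, the number of retries and any delay between them come from the connection settings, whose names are not listed on this page.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical retry helper: calls $connect until it succeeds or $maxAttempts is reached,
 * then rethrows the last caught exception.
 */
function retrySketch(callable $connect, int $maxAttempts, int $delayMilliseconds = 0): void
{
    $lastException = null;

    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $connect($attempt);

            return; // connected, nothing left to do
        } catch (Exception $e) {
            $lastException = $e;

            // Wait before the next attempt, but not after the final one.
            if ($attempt < $maxAttempts && $delayMilliseconds > 0) {
                usleep($delayMilliseconds * 1000);
            }
        }
    }

    throw $lastException ?? new LogicException('At least one attempt is required.');
}
```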
Tags
Return values
void
resendPendingMessages()
Republishes pending messages.
protected
resendPendingMessages() : void
Tags
Return values
void
sendDisconnect()
Sends a disconnect message to the broker. Does not close the socket.
protected
sendDisconnect() : void
Tags
Return values
void
sendPublishAcknowledgement()
Sends a publish acknowledgement for the given message identifier.
protected
sendPublishAcknowledgement(int $messageId) : void
Parameters
- $messageId : int
Tags
Return values
void
sendPublishComplete()
Sends a publish complete message for the given message identifier.
protected
sendPublishComplete(int $messageId) : void
Parameters
- $messageId : int
Tags
Return values
void
sendPublishReceived()
Sends a publish received message for the given message identifier.
protected
sendPublishReceived(int $messageId) : void
Parameters
- $messageId : int
Tags
Return values
void
sendPublishRelease()
Sends a publish release message for the given message identifier.
protected
sendPublishRelease(int $messageId) : void
Parameters
- $messageId : int
Tags
Return values
void
writeToSocket()
Writes some data to the socket. If a $length is given and it is shorter than the data, only $length bytes will be sent.
protected
writeToSocket(string $data[, int|null $length = null ]) : void
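The effect of the optional $length can be sketched on an in-memory stream. The function writeBytesSketch() is hypothetical and only illustrates the truncation rule together with a loop for partial writes; it returns the number of bytes written for demonstration purposes, whereas the documented method returns void.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch: writes $data, or only its first $length bytes when $length is shorter.
 *
 * @param resource $stream
 */
function writeBytesSketch($stream, string $data, ?int $length = null): int
{
    // Truncate only when a length is given and it is shorter than the data.
    $toSend = ($length !== null && $length < strlen($data)) ? substr($data, 0, $length) : $data;

    $written = 0;
    $total = strlen($toSend);

    // fwrite() may write fewer bytes than requested, so continue until everything is sent.
    while ($written < $total) {
        $result = fwrite($stream, substr($toSend, $written));
        if ($result === false || $result === 0) {
            throw new RuntimeException('Writing to the stream failed.');
        }
        $written += $result;
    }

    return $written;
}
```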
Parameters
- $data : string
- $length : int|null = null
Tags
Return values
void
writeToSocketWithAutoReconnect()
Writes some data to the socket. If a $length is given and it is shorter than the data, only $length bytes will be sent.
protected
writeToSocketWithAutoReconnect(string $data[, int|null $length = null ]) : void
If configured, this method will try to reconnect in case of transmission errors.
Parameters
- $data : string
- $length : int|null = null
Tags
Return values
void
parseTlsErrorMessage()
Internal parser for SSL-related PHP error messages.
private
parseTlsErrorMessage(array<string|int, mixed>|null $phpError[, string|null &$tlsErrorCode = null ][, string|null &$tlsErrorMessage = null ]) : void
Parameters
- $phpError : array<string|int, mixed>|null
- $tlsErrorCode : string|null = null
- $tlsErrorMessage : string|null = null
Return values
void
processMessageBuffer()
Processes the incoming message buffer by parsing and handling the messages, until the buffer is empty.
private
processMessageBuffer() : void
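Parsing "until the buffer is empty" implies cutting complete packets off the front of the buffer and stopping when only a partial packet remains. The sketch below shows that framing step using the fixed header layout from the MQTT 3.1.1 specification. The function extractPacket() is hypothetical; in the client, parsing is the job of the configured message processor.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical framing helper: removes one complete MQTT packet from the front of $buffer.
 * Returns the packet bytes, or null if more data is needed.
 */
function extractPacket(string &$buffer): ?string
{
    $multiplier = 1;
    $remainingLength = 0;
    $offset = 1; // byte 0 is the packet type and flags

    // Decode the variable-length "Remaining Length" field (at most four bytes).
    do {
        if ($offset > 4) {
            throw new UnexpectedValueException('Malformed remaining length.');
        }
        if ($offset >= strlen($buffer)) {
            return null; // the length field itself is still incomplete
        }

        $byte = ord($buffer[$offset++]);
        $remainingLength += ($byte & 0x7F) * $multiplier;
        $multiplier *= 128;
    } while (($byte & 0x80) !== 0);

    $total = $offset + $remainingLength;
    if (strlen($buffer) < $total) {
        return null; // header is complete, but the body has not fully arrived
    }

    $packet = substr($buffer, 0, $total);
    $buffer = (string) substr($buffer, $total);

    return $packet;
}
```

A caller would invoke extractPacket() in a loop and handle each returned packet until it yields null, leaving any partial packet in the buffer for the next read.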
Tags
Return values
void
runConnectedEventHandlers()
Runs all the registered connected event handlers.
private
runConnectedEventHandlers(bool $isAutoReconnect) : void
Each event handler is executed in a try-catch block to avoid spilling exceptions.
Parameters
- $isAutoReconnect : bool
Return values
void
runLoopEventHandlers()
Runs all registered loop event handlers with the given parameters.
private
runLoopEventHandlers(float $elapsedTime) : void
Each event handler is executed in a try-catch block to avoid spilling exceptions.
Parameters
- $elapsedTime : float
Return values
void
runMessageReceivedEventHandlers()
Runs all the registered message received event handlers with the given parameters.
private
runMessageReceivedEventHandlers(string $topic, string $message, int $qualityOfService, bool $retained) : void
Each event handler is executed in a try-catch block to avoid spilling exceptions.
Parameters
- $topic : string
- $message : string
- $qualityOfService : int
- $retained : bool
Return values
void
runPublishEventHandlers()
Runs all the registered publish event handlers with the given parameters.
private
runPublishEventHandlers(string $topic, string $message, int|null $messageId, int $qualityOfService, bool $retain) : void
Each event handler is executed in a try-catch block to avoid spilling exceptions.
Parameters
- $topic : string
- $message : string
- $messageId : int|null
- $qualityOfService : int
- $retain : bool