Documentation

MqttClient
implements MqttClient
uses GeneratesRandomClientIds, OffersHooks, ValidatesConfiguration

An MQTT client implementing protocol version 3.1.

Interfaces, Classes, Traits and Enums

MqttClient
An interface for the MQTT client.

Table of Contents

MQTT_3_1  = '3.1'
MQTT_3_1_1  = '3.1.1'
QOS_AT_LEAST_ONCE  = 1
QOS_AT_MOST_ONCE  = 0
QOS_EXACTLY_ONCE  = 2
SOCKET_READ_BUFFER_SIZE  = 8192
$socket  : resource|null
$buffer  : string
$bytesReceived  : int
$bytesSent  : int
$clientId  : string
$connected  : bool
$connectedEventHandlers  : SplObjectStorage|array<string|int, Closure>
$host  : string
$interrupted  : bool
$lastPingAt  : float|null
$logger  : LoggerInterface
$loopEventHandlers  : SplObjectStorage|array<string|int, Closure>
$messageProcessor  : MessageProcessor
$messageReceivedEventHandlers  : SplObjectStorage|array<string|int, Closure>
$port  : int
$publishEventHandlers  : SplObjectStorage|array<string|int, Closure>
$repository  : Repository
$settings  : ConnectionSettings
__construct()  : mixed
Constructs a new MQTT client which subsequently supports publishing and subscribing.
connect()  : void
Connect to the MQTT broker using the given settings.
disconnect()  : void
Sends a disconnect message to the broker and closes the socket.
getClientId()  : string
Returns the identifier used by the client.
getHost()  : string
Returns the host the client connects to.
getPort()  : int
Returns the port the client connects to.
getReceivedBytes()  : int
Returns the total number of received bytes, across reconnects.
getSentBytes()  : int
Returns the total number of sent bytes, across reconnects.
interrupt()  : void
Sets the interrupted signal. Doing so instructs the client to exit the loop, if it is actually looping.
isConnected()  : bool
Indicates whether the client is supposed to be connected.
loop()  : void
Runs an event loop that handles messages from the server and calls the registered callbacks for published messages.
loopOnce()  : void
Runs an event loop iteration that handles messages from the server and calls the registered callbacks for published messages. Also resends pending messages and calls loop event handlers.
publish()  : void
Publishes the given message on the given topic. If the additional quality of service and retention flags are set, the message will be published using these settings.
registerConnectedEventHandler()  : MqttClient
Registers an event handler which is called when the client has established a connection to the broker.
registerLoopEventHandler()  : MqttClient
Registers a loop event handler which is called each iteration of the loop.
registerMessageReceivedEventHandler()  : MqttClient
Registers an event handler which is called when a message is received from the broker.
registerPublishEventHandler()  : MqttClient
Registers an event handler which is called when a message is published.
subscribe()  : void
Subscribe to the given topic with the given quality of service.
unregisterConnectedEventHandler()  : MqttClient
Unregisters a connected event handler which prevents it from being called in the future.
unregisterLoopEventHandler()  : MqttClient
Unregisters a loop event handler which prevents it from being called in the future.
unregisterMessageReceivedEventHandler()  : MqttClient
Unregisters a message received event handler which prevents it from being called in the future.
unregisterPublishEventHandler()  : MqttClient
Unregisters a publish event handler which prevents it from being called in the future.
unsubscribe()  : void
Unsubscribe from the given topic.
allQueuesAreEmpty()  : bool
Determines if all queues are empty.
closeSocket()  : void
Closes the socket connection immediately, without flushing queued data.
connectInternal()  : void
Connect to the MQTT broker using the configured settings.
deliverPublishedMessage()  : void
Delivers a published message to subscribed callbacks.
ensureConnected()  : void
Ensures the client is connected to a broker (or at least thinks it is).
ensureConnectionSettingsAreValid()  : void
Ensures the given connection settings are valid. If they are not valid, which means they are misconfigured, an exception containing information about the configuration error is thrown.
establishSocketConnection()  : void
Opens a socket that connects to the host and port set on the object.
generateRandomClientId()  : string
Generates a random client id in the form of an md5 hash.
handleMessage()  : void
Handles the given message according to its contents.
initializeEventHandlers()  : void
Needs to be called in order to initialize the trait.
nextPingAt()  : float
Returns the next time the broker expects to be pinged.
performConnectionHandshake()  : void
Performs the connection handshake with the help of the configured message processor.
ping()  : void
Sends a ping message to the broker to keep the connection alive.
publishMessage()  : void
Actually publishes a message after using the configured message processor to build it.
readAllAvailableDataFromSocket()  : string
Reads all the available data from the socket using non-blocking mode. Essentially this means that MqttClient::readFromSocketWithAutoReconnect() is called over and over again, as long as data is returned.
readFromSocket()  : string
Reads data from the socket. If the second parameter $withoutBlocking is set to true, a maximum of $limit bytes will be read and returned. If $withoutBlocking is set to false, the method will wait until $limit bytes have been received.
readFromSocketWithAutoReconnect()  : string
Reads data from the socket. If the second parameter $withoutBlocking is set to true, a maximum of $limit bytes will be read and returned. If $withoutBlocking is set to false, the method will wait until $limit bytes have been received.
reconnect()  : void
Attempts to reconnect to the broker. If a connection cannot be established within the configured number of retries, the last caught exception is thrown.
resendPendingMessages()  : void
Republishes pending messages.
sendDisconnect()  : void
Sends a disconnect message to the broker. Does not close the socket.
sendPublishAcknowledgement()  : void
Sends a publish acknowledgement for the given message identifier.
sendPublishComplete()  : void
Sends a publish complete message for the given message identifier.
sendPublishReceived()  : void
Sends a publish received message for the given message identifier.
sendPublishRelease()  : void
Sends a publish release message for the given message identifier.
writeToSocket()  : void
Writes some data to the socket. If a $length is given and it is shorter than the data, only $length bytes will be sent.
writeToSocketWithAutoReconnect()  : void
Writes some data to the socket. If a $length is given and it is shorter than the data, only $length bytes will be sent.
parseTlsErrorMessage()  : void
Internal parser for SSL-related PHP error messages.
processMessageBuffer()  : void
Processes the incoming message buffer by parsing and handling the messages, until the buffer is empty.
runConnectedEventHandlers()  : void
Runs all the registered connected event handlers.
runLoopEventHandlers()  : void
Runs all registered loop event handlers with the given parameters.
runMessageReceivedEventHandlers()  : void
Runs all the registered message received event handlers with the given parameters.
runPublishEventHandlers()  : void
Runs all the registered publish event handlers with the given parameters.

Constants

QOS_AT_LEAST_ONCE

public mixed QOS_AT_LEAST_ONCE = 1

QOS_AT_MOST_ONCE

public mixed QOS_AT_MOST_ONCE = 0

QOS_EXACTLY_ONCE

public mixed QOS_EXACTLY_ONCE = 2

SOCKET_READ_BUFFER_SIZE

public mixed SOCKET_READ_BUFFER_SIZE = 8192

Properties

$connectedEventHandlers

private SplObjectStorage|array<string|int, Closure> $connectedEventHandlers

$lastPingAt

private float|null $lastPingAt = null

$loopEventHandlers

private SplObjectStorage|array<string|int, Closure> $loopEventHandlers

$messageReceivedEventHandlers

private SplObjectStorage|array<string|int, Closure> $messageReceivedEventHandlers

$publishEventHandlers

private SplObjectStorage|array<string|int, Closure> $publishEventHandlers

Methods

__construct()

Constructs a new MQTT client which subsequently supports publishing and subscribing.

public __construct(string $host[, int $port = 1883 ][, string|null $clientId = null ][, string $protocol = self::MQTT_3_1 ][, Repository|null $repository = null ][, LoggerInterface|null $logger = null ]) : mixed

Notes:

  • If no client id is given, a random one is generated, forcing a clean session implicitly.
  • If no protocol is given, MQTT 3.1 is used by default.
  • If no repository is given, an in-memory repository is created for you. Once you terminate your script, all stored data (like resend queues) is lost.
  • If no logger is given, log messages are dropped. Any PSR-3 logger will work.
Parameters
$host : string
$port : int = 1883
$clientId : string|null = null
$protocol : string = self::MQTT_3_1
$repository : Repository|null = null
$logger : LoggerInterface|null = null
Tags
throws
ProtocolNotSupportedException
Return values
mixed

connect()

Connect to the MQTT broker using the given settings.

public connect([ConnectionSettings $settings = null ][, bool $useCleanSession = false ]) : void
Parameters
$settings : ConnectionSettings = null
$useCleanSession : bool = false
Return values
void

disconnect()

Sends a disconnect message to the broker and closes the socket.

public disconnect() : void
Return values
void

getClientId()

Returns the identifier used by the client.

public getClientId() : string
Return values
string

getHost()

Returns the host the client connects to.

public getHost() : string
Return values
string

getPort()

Returns the port the client connects to.

public getPort() : int
Return values
int

getReceivedBytes()

Returns the total number of received bytes, across reconnects.

public getReceivedBytes() : int
Return values
int

getSentBytes()

Returns the total number of sent bytes, across reconnects.

public getSentBytes() : int
Return values
int

interrupt()

Sets the interrupted signal. Doing so instructs the client to exit the loop, if it is actually looping.

public interrupt() : void
Return values
void

isConnected()

Indicates whether the client is supposed to be connected.

public isConnected() : bool
Return values
bool

loop()

Runs an event loop that handles messages from the server and calls the registered callbacks for published messages.

public loop([bool $allowSleep = true ][, bool $exitWhenQueuesEmpty = false ][, int $queueWaitLimit = null ]) : void
Parameters
$allowSleep : bool = true
$exitWhenQueuesEmpty : bool = false
$queueWaitLimit : int = null
Return values
void

loopOnce()

Runs an event loop iteration that handles messages from the server and calls the registered callbacks for published messages. Also resends pending messages and calls loop event handlers.

public loopOnce(float $loopStartedAt[, bool $allowSleep = false ][, int $sleepMicroseconds = 100000 ]) : void
Parameters
$loopStartedAt : float
$allowSleep : bool = false
$sleepMicroseconds : int = 100000
Return values
void

publish()

Publishes the given message on the given topic. If the additional quality of service and retention flags are set, the message will be published using these settings.

public publish(string $topic, string $message, int $qualityOfService[, bool $retain = false ]) : void
Parameters
$topic : string
$message : string
$qualityOfService : int
$retain : bool = false
Return values
void
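
A minimal usage sketch of the publish flow, assuming a client object that exposes the documented connect(), publish() and disconnect() methods. The wrapper name examplePublishOnce is hypothetical, and the parameter is typed as plain object only so that the sketch runs without the library installed.

```php
<?php

// Hypothetical wrapper, not part of the documented API. $mqtt is expected to
// expose the documented connect(), publish() and disconnect() methods.
function examplePublishOnce(object $mqtt, string $topic, string $message): void
{
    // If connecting fails, the exception propagates and nothing is published.
    $mqtt->connect();

    try {
        // 0 is the documented value of QOS_AT_MOST_ONCE; the message is not retained.
        $mqtt->publish($topic, $message, 0, false);
    } finally {
        // Send the disconnect message and close the socket even if publishing throws.
        $mqtt->disconnect();
    }
}
```

With a real client, the first argument would be the MqttClient instance created through the constructor documented earlier on this page.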

registerConnectedEventHandler()

Registers an event handler which is called when the client has established a connection to the broker.

public registerConnectedEventHandler(Closure $callback) : MqttClient

This includes manual reconnects as well as auto-reconnects by the client itself.

The event handler is passed the MQTT client as first argument, followed by a flag which indicates whether an auto-reconnect occurred as second argument.

Example:

$mqtt->registerConnectedEventHandler(function (
    MqttClient $mqtt,
    bool $isAutoReconnect
) use ($logger) {
    if ($isAutoReconnect) {
        $logger->info("Client successfully auto-reconnected to the broker.");
    } else {
        $logger->info("Client successfully connected to the broker.");
    }
});

Multiple event handlers can be registered at the same time.

Parameters
$callback : Closure
Return values
MqttClient

registerLoopEventHandler()

Registers a loop event handler which is called each iteration of the loop.

public registerLoopEventHandler(Closure $callback) : MqttClient

This event handler can be used for example to interrupt the loop under certain conditions.

The loop event handler is passed the MQTT client instance as first parameter and the time the loop has been running for as second parameter. The elapsed time is a float containing seconds.

Example:

$mqtt->registerLoopEventHandler(function (
    MqttClient $mqtt,
    float $elapsedTime
) use ($logger) {
    $logger->info("Running for [{$elapsedTime}] seconds already.");
});

Multiple event handlers can be registered at the same time.

Parameters
$callback : Closure
Return values
MqttClient
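
The note about interrupting the loop under certain conditions could be sketched as follows. The helper name exampleLoopFor and the time limit are assumptions for illustration. The client is typed as plain object so the sketch runs without the library installed, and only the documented registerLoopEventHandler(), loop(), interrupt() and unregisterLoopEventHandler() methods are called on it.

```php
<?php

// Hypothetical helper: runs the client's loop until roughly $maxSeconds have elapsed.
function exampleLoopFor(object $mqtt, float $maxSeconds): void
{
    // The handler receives the client and the elapsed time in seconds.
    $handler = function (object $client, float $elapsedTime) use ($maxSeconds): void {
        if ($elapsedTime >= $maxSeconds) {
            // Sets the interrupted signal so that the loop exits.
            $client->interrupt();
        }
    };

    $mqtt->registerLoopEventHandler($handler);

    try {
        $mqtt->loop(true);
    } finally {
        // Remove only this handler; other registered handlers are unaffected.
        $mqtt->unregisterLoopEventHandler($handler);
    }
}
```

Keeping a reference to the closure is what makes it possible to unregister exactly this handler afterwards.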

registerMessageReceivedEventHandler()

Registers an event handler which is called when a message is received from the broker.

public registerMessageReceivedEventHandler(Closure $callback) : MqttClient

The message received event handler is passed the MQTT client as first, the topic as second and the message as third parameter. As fourth parameter, the QoS level will be passed and the retained flag as fifth.

Example:

$mqtt->registerMessageReceivedEventHandler(function (
    MqttClient $mqtt,
    string $topic,
    string $message,
    int $qualityOfService,
    bool $retained
) use ($logger) {
    $logger->info("Received message on topic [{$topic}]: {$message}");
});

Multiple event handlers can be registered at the same time.

Parameters
$callback : Closure
Return values
MqttClient

registerPublishEventHandler()

Registers an event handler which is called when a message is published.

public registerPublishEventHandler(Closure $callback) : MqttClient

The publish event handler is passed the MQTT client as first, the topic as second and the message as third parameter. As fourth parameter, the message identifier will be passed, which can be null in case of QoS 0. The QoS level as well as the retained flag will also be passed as fifth and sixth parameters.

Example:

$mqtt->registerPublishEventHandler(function (
    MqttClient $mqtt,
    string $topic,
    string $message,
    ?int $messageId,
    int $qualityOfService,
    bool $retain
) use ($logger) {
    $logger->info("Sending message on topic [{$topic}]: {$message}");
});

Multiple event handlers can be registered at the same time.

Parameters
$callback : Closure
Return values
MqttClient

subscribe()

Subscribe to the given topic with the given quality of service.

public subscribe(string $topicFilter[, callable $callback = null ][, int $qualityOfService = self::QOS_AT_MOST_ONCE ]) : void
Parameters
$topicFilter : string
$callback : callable = null
$qualityOfService : int = self::QOS_AT_MOST_ONCE
Return values
void

unregisterConnectedEventHandler()

Unregisters a connected event handler which prevents it from being called in the future.

public unregisterConnectedEventHandler([Closure|null $callback = null ]) : MqttClient

This does not affect other registered event handlers. It is possible to unregister all registered event handlers by passing null as callback.

Parameters
$callback : Closure|null = null
Return values
MqttClient
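
The register and unregister semantics described for the event handlers can be illustrated with a small stand-alone registry. ExampleHookRegistry and its method names are hypothetical. The sketch only assumes what this page states: handlers are closures kept in an SplObjectStorage, the methods return the instance for chaining, and passing null removes all handlers.

```php
<?php

// Hypothetical stand-in for the client's handler storage; not the library's code.
final class ExampleHookRegistry
{
    /** @var SplObjectStorage<Closure, null> */
    private SplObjectStorage $handlers;

    public function __construct()
    {
        $this->handlers = new SplObjectStorage();
    }

    public function register(Closure $callback): self
    {
        // Array access on SplObjectStorage attaches the object.
        $this->handlers[$callback] = null;

        return $this;
    }

    public function unregister(?Closure $callback = null): self
    {
        if ($callback === null) {
            // Passing null drops every registered handler.
            $this->handlers = new SplObjectStorage();
        } else {
            // Removes only the given handler; the others stay registered.
            unset($this->handlers[$callback]);
        }

        return $this;
    }

    public function count(): int
    {
        return count($this->handlers);
    }
}

$registry = new ExampleHookRegistry();
$first = function (): void {};
$second = function (): void {};

$registry->register($first)->register($second);
$registry->unregister($first);
$remainingAfterOne = $registry->count();

$registry->unregister();
$remainingAfterAll = $registry->count();
```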

unregisterLoopEventHandler()

Unregisters a loop event handler which prevents it from being called in the future.

public unregisterLoopEventHandler([Closure|null $callback = null ]) : MqttClient

This does not affect other registered event handlers. It is possible to unregister all registered event handlers by passing null as callback.

Parameters
$callback : Closure|null = null
Return values
MqttClient

unregisterMessageReceivedEventHandler()

Unregisters a message received event handler which prevents it from being called in the future.

public unregisterMessageReceivedEventHandler([Closure|null $callback = null ]) : MqttClient

This does not affect other registered event handlers. It is possible to unregister all registered event handlers by passing null as callback.

Parameters
$callback : Closure|null = null
Return values
MqttClient

unregisterPublishEventHandler()

Unregisters a publish event handler which prevents it from being called in the future.

public unregisterPublishEventHandler([Closure|null $callback = null ]) : MqttClient

This does not affect other registered event handlers. It is possible to unregister all registered event handlers by passing null as callback.

Parameters
$callback : Closure|null = null
Return values
MqttClient

unsubscribe()

Unsubscribe from the given topic.

public unsubscribe(string $topicFilter) : void
Parameters
$topicFilter : string
Return values
void

allQueuesAreEmpty()

Determines if all queues are empty.

protected allQueuesAreEmpty() : bool
Return values
bool

closeSocket()

Closes the socket connection immediately, without flushing queued data.

protected closeSocket() : void
Return values
void

connectInternal()

Connect to the MQTT broker using the configured settings.

protected connectInternal([bool $useCleanSession = false ][, bool $isAutoReconnect = false ]) : void
Parameters
$useCleanSession : bool = false
$isAutoReconnect : bool = false
Tags
throws
ConnectingToBrokerFailedException
Return values
void

deliverPublishedMessage()

Delivers a published message to subscribed callbacks.

protected deliverPublishedMessage(string $topic, string $message, int $qualityOfServiceLevel[, bool $retained = false ]) : void
Parameters
$topic : string
$message : string
$qualityOfServiceLevel : int
$retained : bool = false
Return values
void

ensureConnected()

Ensures the client is connected to a broker (or at least thinks it is).

protected ensureConnected() : void

This method does not account for closed sockets.

Tags
throws
ClientNotConnectedToBrokerException
Return values
void

establishSocketConnection()

Opens a socket that connects to the host and port set on the object.

protected establishSocketConnection() : void

When this method is called, all connection settings have been validated.

Tags
throws
ConnectingToBrokerFailedException
Return values
void

generateRandomClientId()

Generates a random client id in the form of an md5 hash.

protected generateRandomClientId() : string
Return values
string
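
As a rough illustration of what a random client id in the form of an md5 hash can look like, the sketch below hashes random bytes into a 32-character hexadecimal string. The function name exampleRandomClientId and the use of random_bytes() as the source of randomness are assumptions for this example and may differ from what the trait does.

```php
<?php

// Hypothetical helper, not the trait's implementation.
function exampleRandomClientId(): string
{
    // md5() yields 32 lowercase hexadecimal characters.
    return md5(random_bytes(16));
}

$clientId = exampleRandomClientId();
```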

initializeEventHandlers()

Needs to be called in order to initialize the trait.

protected initializeEventHandlers() : void
Return values
void

nextPingAt()

Returns the next time the broker expects to be pinged.

protected nextPingAt() : float
Return values
float
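
One plausible way to derive the next ping time is to add a keep-alive interval to the time of the last ping. The sketch below assumes exactly that. The helper name and the $keepAliveSeconds parameter are hypothetical, since this page does not show where the interval comes from.

```php
<?php

// Hypothetical helper; the keep-alive interval is an assumed connection setting.
function exampleNextPingAt(float $lastPingAt, int $keepAliveSeconds): float
{
    return $lastPingAt + $keepAliveSeconds;
}

$lastPingAt = 1000.5;                        // timestamp of the last ping, in seconds
$nextPingAt = exampleNextPingAt($lastPingAt, 10);
$pingIsDue = microtime(true) >= $nextPingAt; // compare against the current time
```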

performConnectionHandshake()

Performs the connection handshake with the help of the configured message processor.

protected performConnectionHandshake([bool $useCleanSession = false ]) : void

The connection handshake is expected to have the same flow all the time:

  • Connect request with variable length
  • Connect acknowledgement with variable length
Parameters
$useCleanSession : bool = false
Tags
throws
ConnectingToBrokerFailedException
Return values
void

ping()

Sends a ping message to the broker to keep the connection alive.

protected ping() : void
Tags
throws
DataTransferException
Return values
void

publishMessage()

Actually publishes a message after using the configured message processor to build it.

protected publishMessage(string $topic, string $message, int $qualityOfService, bool $retain[, int|null $messageId = null ][, bool $isDuplicate = false ]) : void

This is an internal method used both for the initial publishing of messages and for re-publishing in case of timeouts.

Parameters
$topic : string
$message : string
$qualityOfService : int
$retain : bool
$messageId : int|null = null
$isDuplicate : bool = false
Tags
throws
DataTransferException
Return values
void

readAllAvailableDataFromSocket()

Reads all the available data from the socket using non-blocking mode. Essentially this means that MqttClient::readFromSocketWithAutoReconnect() is called over and over again, as long as data is returned.

protected readAllAvailableDataFromSocket([bool $withAutoReconnectIfConfigured = false ]) : string
Parameters
$withAutoReconnectIfConfigured : bool = false
Tags
throws
DataTransferException
Return values
string

readFromSocket()

Reads data from the socket. If the second parameter $withoutBlocking is set to true, a maximum of $limit bytes will be read and returned. If $withoutBlocking is set to false, the method will wait until $limit bytes have been received.

protected readFromSocket([int $limit = self::SOCKET_READ_BUFFER_SIZE ][, bool $withoutBlocking = false ]) : string
Parameters
$limit : int = self::SOCKET_READ_BUFFER_SIZE
$withoutBlocking : bool = false
Tags
throws
DataTransferException
Return values
string
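
The difference between the two read modes can be sketched with plain PHP stream functions. exampleRead is a hypothetical helper written for this illustration, not the library's code. It throws a generic RuntimeException where the documented method throws DataTransferException, and it uses a local socket pair in place of a broker connection.

```php
<?php

// Hypothetical helper mirroring the documented $limit / $withoutBlocking semantics.
function exampleRead($stream, int $limit, bool $withoutBlocking = false): string
{
    if ($withoutBlocking) {
        // Return whatever is available right now, up to $limit bytes (possibly nothing).
        stream_set_blocking($stream, false);
        $data = fread($stream, $limit);

        return $data === false ? '' : $data;
    }

    // Blocking mode: keep reading until exactly $limit bytes have arrived.
    stream_set_blocking($stream, true);
    $data = '';
    while (strlen($data) < $limit) {
        $chunk = fread($stream, $limit - strlen($data));
        if ($chunk === false || $chunk === '') {
            throw new RuntimeException('Reading from the stream failed or timed out.');
        }
        $data .= $chunk;
    }

    return $data;
}

// A connected local socket pair stands in for the broker connection.
$domain = PHP_OS_FAMILY === 'Windows' ? STREAM_PF_INET : STREAM_PF_UNIX;
[$writer, $reader] = stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

fwrite($writer, 'hello');
$first = exampleRead($reader, 3);         // blocking: exactly 3 bytes
$second = exampleRead($reader, 10, true); // non-blocking: at most 10 bytes
$third = exampleRead($reader, 10, true);  // non-blocking: nothing left to read

fclose($writer);
fclose($reader);
```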

readFromSocketWithAutoReconnect()

Reads data from the socket. If the second parameter $withoutBlocking is set to true, a maximum of $limit bytes will be read and returned. If $withoutBlocking is set to false, the method will wait until $limit bytes have been received.

protected readFromSocketWithAutoReconnect([int $limit = self::SOCKET_READ_BUFFER_SIZE ][, bool $withoutBlocking = false ]) : string

If configured, this method will try to reconnect in case of transmission errors.

Parameters
$limit : int = self::SOCKET_READ_BUFFER_SIZE
$withoutBlocking : bool = false
Tags
throws
DataTransferException
Return values
string

reconnect()

Attempts to reconnect to the broker. If a connection cannot be established within the configured number of retries, the last caught exception is thrown.

protected reconnect() : void
Tags
throws
ConnectingToBrokerFailedException
Return values
void

sendDisconnect()

Sends a disconnect message to the broker. Does not close the socket.

protected sendDisconnect() : void
Tags
throws
DataTransferException
Return values
void

sendPublishAcknowledgement()

Sends a publish acknowledgement for the given message identifier.

protected sendPublishAcknowledgement(int $messageId) : void
Parameters
$messageId : int
Tags
throws
DataTransferException
Return values
void

sendPublishComplete()

Sends a publish complete message for the given message identifier.

protected sendPublishComplete(int $messageId) : void
Parameters
$messageId : int
Tags
throws
DataTransferException
Return values
void

sendPublishReceived()

Sends a publish received message for the given message identifier.

protected sendPublishReceived(int $messageId) : void
Parameters
$messageId : int
Tags
throws
DataTransferException
Return values
void

sendPublishRelease()

Sends a publish release message for the given message identifier.

protected sendPublishRelease(int $messageId) : void
Parameters
$messageId : int
Tags
throws
DataTransferException
Return values
void

writeToSocket()

Writes some data to the socket. If a $length is given and it is shorter than the data, only $length bytes will be sent.

protected writeToSocket(string $data[, int|null $length = null ]) : void
Parameters
$data : string
$length : int|null = null
Tags
throws
DataTransferException
Return values
void
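
The length rule above can be sketched with fwrite(), which accepts an optional byte count. exampleWrite is a hypothetical helper, not the library's code. Unlike the documented method it returns the number of bytes written, and it writes to an in-memory stream so the result can be inspected.

```php
<?php

// Hypothetical helper mirroring the documented truncation rule.
function exampleWrite($stream, string $data, ?int $length = null): int
{
    // Send all data by default, or only the first $length bytes if that is shorter.
    $toSend = $length === null ? strlen($data) : min($length, strlen($data));

    $written = fwrite($stream, $data, $toSend);
    if ($written === false) {
        throw new RuntimeException('Writing to the stream failed.');
    }

    return $written;
}

$stream = fopen('php://memory', 'w+');
exampleWrite($stream, 'abcdef', 3); // only the first 3 bytes are written
exampleWrite($stream, 'XY');        // no length given: all data is written

rewind($stream);
$contents = stream_get_contents($stream);
fclose($stream);
```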

writeToSocketWithAutoReconnect()

Writes some data to the socket. If a $length is given and it is shorter than the data, only $length bytes will be sent.

protected writeToSocketWithAutoReconnect(string $data[, int|null $length = null ]) : void

If configured, this method will try to reconnect in case of transmission errors.

Parameters
$data : string
$length : int|null = null
Tags
throws
DataTransferException
Return values
void

parseTlsErrorMessage()

Internal parser for SSL-related PHP error messages.

private parseTlsErrorMessage(array<string|int, mixed>|null $phpError[, string|null &$tlsErrorCode = null ][, string|null &$tlsErrorMessage = null ]) : void
Parameters
$phpError : array<string|int, mixed>|null
$tlsErrorCode : string|null = null
$tlsErrorMessage : string|null = null
Return values
void

runConnectedEventHandlers()

Runs all the registered connected event handlers.

private runConnectedEventHandlers(bool $isAutoReconnect) : void

Each event handler is executed in a try-catch block to avoid spilling exceptions.

Parameters
$isAutoReconnect : bool
Return values
void
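
Running each handler inside its own try-catch block means that one failing handler cannot prevent the remaining ones from running. The sketch below shows that pattern in isolation. exampleRunHandlers and its $onError callback are hypothetical, and how the real method reports caught exceptions is not shown on this page.

```php
<?php

// Hypothetical helper demonstrating per-handler exception isolation.
function exampleRunHandlers(iterable $handlers, array $arguments, callable $onError): void
{
    foreach ($handlers as $handler) {
        try {
            $handler(...$arguments);
        } catch (Throwable $e) {
            // Report the failure and continue with the next handler.
            $onError($e);
        }
    }
}

$calls = [];
$errors = [];

exampleRunHandlers(
    [
        function (bool $isAutoReconnect): void {
            throw new RuntimeException('handler failed');
        },
        function (bool $isAutoReconnect) use (&$calls): void {
            $calls[] = $isAutoReconnect;
        },
    ],
    [true],
    function (Throwable $e) use (&$errors): void {
        $errors[] = $e->getMessage();
    }
);
```

The second handler still runs after the first one throws, and the exception message is collected instead of propagating to the caller.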

runLoopEventHandlers()

Runs all registered loop event handlers with the given parameters.

private runLoopEventHandlers(float $elapsedTime) : void

Each event handler is executed in a try-catch block to avoid spilling exceptions.

Parameters
$elapsedTime : float
Return values
void

runMessageReceivedEventHandlers()

Runs all the registered message received event handlers with the given parameters.

private runMessageReceivedEventHandlers(string $topic, string $message, int $qualityOfService, bool $retained) : void

Each event handler is executed in a try-catch block to avoid spilling exceptions.

Parameters
$topic : string
$message : string
$qualityOfService : int
$retained : bool
Return values
void

runPublishEventHandlers()

Runs all the registered publish event handlers with the given parameters.

private runPublishEventHandlers(string $topic, string $message, int|null $messageId, int $qualityOfService, bool $retain) : void

Each event handler is executed in a try-catch block to avoid spilling exceptions.

Parameters
$topic : string
$message : string
$messageId : int|null
$qualityOfService : int
$retain : bool
Return values
void
