Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 182 additions & 16 deletions src/UserGuide/Master/Table/API/Programming-MQTT.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,20 @@

## 1. Overview

MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).
[MQTT](Message Queuing Telemetry Transport)(http://mqtt.org/) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).

IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages.

![](/img/mqtt-table-en-1.png)
<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">


## 2. Configuration

By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`.

| **Property** | **Description** | **Default** |
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- | ------------------- |
| `enable_mqtt_service` | Enable/ disable the MQTT service. | FALSE |
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​**​**Options: `json` (tree model), `line` (table model).** | **json** |
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |

## 3. Write Protocol
## 2. Built-in MQTT Service
The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
and then write the data into storage immediately.
The MQTT topic corresponds to IoTDB timeseries.The first segment of the MQTT topic (split by `/`) is used as the database name.The table name is derived from the `<measurement>` in the line protocol.
The messages payload can be format to events by `PayloadFormatter` which loaded by java SPI, and the implementation of `PayloadFormatter` for table is `LinePayloadFormatter`.
The following is the line protocol syntax of MQTT message payload and an example:

* Line Protocol Syntax

Expand All @@ -56,7 +49,23 @@ By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IO
myMeasurement,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1556813561098000000
```

![](/img/mqtt-table-en-2.png)


## 3. MQTT Configurations

By default, the IoTDB MQTT service loads configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties`.

Configurations are as follows:

| **Property** | **Description** | **Default** |
| ------------------------ | ---------------------------------------------------------------------------------------------------------------------- |-------------|
| `enable_mqtt_service` | Enable/ disable the MQTT service. | false |
| `mqtt_host` | Host address bound to the MQTT service. | 127.0.0.1 |
| `mqtt_port` | Port bound to the MQTT service. | 1883 |
| `mqtt_handler_pool_size` | Thread pool size for processing MQTT messages. | 1 |
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​**​**Options: `json` (tree model), `line` (table model).** | **json** |
| `mqtt_max_message_size` | Maximum allowed MQTT message size (bytes). | 1048576 |


## 4. Naming Conventions

Expand Down Expand Up @@ -88,3 +97,160 @@ The table name is derived from the `<measurement>` in the line protocol.
| 1`i32`<br>123`i32` | INT32 |
| `"xxx"` | TEXT |
| `t`,`T`,`true`,`True`,`TRUE`<br> `f`,`F`,`false`,`False`,`FALSE` | BOOLEAN |


## 5. Coding Examples
The following is an example which a mqtt client send messages to IoTDB server.

```java
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
mqtt.setUserName("root");
mqtt.setPassword("root");

BlockingConnection connection = mqtt.blockingConnection();
String DATABASE = "myMqttTest";
connection.connect();

String payload =
"test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

payload = "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2";
connection.publish(DATABASE, payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

//batch write example
payload =
"test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n "
+ "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

//batch write example
payload =
"test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n "
+ "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5";
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);

connection.disconnect();
```



## 6. Customize your MQTT Message Format

If you do not like the above Line format, you can customize your MQTT Message format by just writing several lines
of codes. An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/master/example/mqtt-customize) project.

Steps:
1. Create a java project, and add dependency:
```xml
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>2.0.4-SNAPSHOT</version>
</dependency>
```
2. Define your implementation which implements `org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`
e.g.,

```java
package org.apache.iotdb.mqtt.server;

import io.netty.buffer.ByteBuf;
import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CustomizedLinePayloadFormatter implements PayloadFormatter {

@Override
public List<Message> format(String topic, ByteBuf payload) {
// Suppose the payload is a line format
if (payload == null) {
return null;
}

String line = payload.toString(StandardCharsets.UTF_8);
// parse data from the line and generate Messages and put them into List<Meesage> ret
List<Message> ret = new ArrayList<>();
// this is just an example, so we just generate some Messages directly
for (int i = 0; i < 3; i++) {
long ts = i;
TableMessage message = new TableMessage();

// Parsing Database Name
message.setDatabase("db" + i);

//Parsing Table Names
message.setTable("t" + i);

// Parsing Tags
List<String> tagKeys = new ArrayList<>();
tagKeys.add("tag1" + i);
tagKeys.add("tag2" + i);
List<Object> tagValues = new ArrayList<>();
tagValues.add("t_value1" + i);
tagValues.add("t_value2" + i);
message.setTagKeys(tagKeys);
message.setTagValues(tagValues);

// Parsing Attributes
List<String> attributeKeys = new ArrayList<>();
List<Object> attributeValues = new ArrayList<>();
attributeKeys.add("attr1" + i);
attributeKeys.add("attr2" + i);
attributeValues.add("a_value1" + i);
attributeValues.add("a_value2" + i);
message.setAttributeKeys(attributeKeys);
message.setAttributeValues(attributeValues);

// Parsing Fields
List<String> fields = Arrays.asList("field1" + i, "field2" + i);
List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT);
List<Object> values = Arrays.asList("4.0" + i, "5.0" + i);
message.setFields(fields);
message.setDataTypes(dataTypes);
message.setValues(values);

//// Parsing timestamp
message.setTimestamp(ts);
ret.add(message);
}
return ret;
}

@Override
public String getName() {
// set the value of mqtt_payload_formatter in iotdb-system.properties as the following string:
return "CustomizedLine";
}
}
```
3. modify the file in `src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`:
clean the file and put your implementation class name into the file.
In this example, the content is: `org.apache.iotdb.mqtt.server.CustomizedLinePayloadFormatter`
4. compile your implementation as a jar file: `mvn package -DskipTests`


Then, in your server:
1. Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder.
2. Update configuration to enable MQTT service. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`)
3. Set the value of `mqtt_payload_formatter` in `conf/iotdb-system.properties` as the value of getName() in your implementation
, in this example, the value is `CustomizedLine`
4. Launch the IoTDB server.
5. Now IoTDB will use your implementation to parse the MQTT message.

More: the message format can be anything you want. For example, if it is a binary format,
just use `payload.forEachByte()` or `payload.array` to get bytes content.
74 changes: 52 additions & 22 deletions src/UserGuide/Master/Tree/API/Programming-MQTT.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@
-->
# MQTT Protocol

[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol.
It was designed as an extremely lightweight publish/subscribe messaging transport.
It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
## 1. Overview

IoTDB supports the MQTT v3.1(an OASIS Standard) protocol.
IoTDB server includes a built-in MQTT service that allows remote devices send messages into IoTDB server directly.
[MQTT](Message Queuing Telemetry Transport)(http://mqtt.org/) is a lightweight messaging protocol designed for IoT and low-bandwidth environments. It operates on a Publish/Subscribe (Pub/Sub) model, enabling efficient and reliable bidirectional communication between devices. Its core objectives are low power consumption, minimal bandwidth usage, and high real-time performance, making it ideal for unstable networks or resource-constrained scenarios (e.g., sensors, mobile devices).

<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">
IoTDB provides deep integration with the MQTT protocol, fully compliant with MQTT v3.1 (OASIS International Standard). The IoTDB server includes a built-in high-performance MQTT Broker module, eliminating the need for third-party middleware. Devices can directly write time-series data into the IoTDB storage engine via MQTT messages.

<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357432-0c71cf80-75e4-11ea-98aa-c43a54d469ce.png">

## 1. Built-in MQTT Service
[Programming-MQTT.md](Programming-MQTT.md)
## 2. Built-in MQTT Service
The Built-in MQTT Service provide the ability of direct connection to IoTDB through MQTT. It listen the publish messages from MQTT clients
and then write the data into storage immediately.
The MQTT topic corresponds to IoTDB timeseries.
Expand Down Expand Up @@ -58,22 +57,22 @@ or json array of the above two.

<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="/img/github/78357469-1bf11880-75e4-11ea-978f-a53996667a0d.png">

## 2. MQTT Configurations
## 3. MQTT Configurations
The IoTDB MQTT service load configurations from `${IOTDB_HOME}/${IOTDB_CONF}/iotdb-system.properties` by default.

Configurations are as follows:

| NAME | DESCRIPTION | DEFAULT |
| **Property** | DESCRIPTION | DEFAULT |
| ------------- |:-------------:|:------:|
| enable_mqtt_service | whether to enable the mqtt service | false |
| mqtt_host | the mqtt service binding host | 127.0.0.1 |
| mqtt_port | the mqtt service binding port | 1883 |
| mqtt_handler_pool_size | the handler pool size for handing the mqtt messages | 1 |
| mqtt_payload_formatter | the mqtt message payload formatter | json |
| mqtt_max_message_size | the max mqtt message size in byte| 1048576 |
| `enable_mqtt_service` | whether to enable the mqtt service | false |
| `mqtt_host` | the mqtt service binding host | 127.0.0.1 |
| `mqtt_port` | the mqtt service binding port | 1883 |
| `mqtt_handler_pool_size` | the handler pool size for handing the mqtt messages | 1 |
| **`mqtt_payload_formatter`** | **Formatting method for MQTT message payloads. ​**​**Options: `json` (tree model), `line` (table model).** | **json** |
| `mqtt_max_message_size` | the max mqtt message size in byte| 1048576 |


## 3. Coding Examples
## 4. Coding Examples
The following is an example which a mqtt client send messages to IoTDB server.

```java
Expand Down Expand Up @@ -101,18 +100,49 @@ connection.disconnect();

```

## 4. Customize your MQTT Message Format
## 5. Customize your MQTT Message Format

In a production environment, each device typically has its own MQTT client, and the message formats of these clients have been pre-defined. If communication is to be carried out in accordance with the MQTT message format supported by IoTDB, a comprehensive upgrade and transformation of all existing clients would be required, which would undoubtedly incur significant costs. However, we can easily achieve customization of the MQTT message format through simple programming means, without the need to modify the clients.
An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/master/example/mqtt-customize) project.

If you do not like the above Json format, you can customize your MQTT Message format by just writing several lines
of codes. An example can be found in [example/mqtt-customize](https://github.com/apache/iotdb/tree/master/example/mqtt-customize) project.
Assuming the MQTT client sends the following message format:
```json
{
"time":1586076045523,
"deviceID":"car_1",
"deviceType":"油车",
"point":"油量",
"value":10.0
}
```
或者JSON的数组形式:
```java
[
{
"time":1586076045523,
"deviceID":"car_1",
"deviceType":"油车",
"point":"油量",
"value":10.0
},
{
"time":1586076045524,
"deviceID":"car_2",
"deviceType":"新能源车",
"point":"速度",
"value":80.0
}
]
```

Then you can set up the custom MQTT message format through the following steps:
Steps:
1. Create a java project, and add dependency:
```xml
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>2.0.4-SNAPSHOT</version>
</dependency>
```
2. Define your implementation which implements `org.apache.iotdb.db.protocol.mqtt.PayloadFormatter`
Expand All @@ -133,7 +163,7 @@ import java.util.List;
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {

@Override
public List<Message> format(ByteBuf payload) {
public List<Message> format(String topic, ByteBuf payload) {
// Suppose the payload is a json format
if (payload == null) {
return null;
Expand Down Expand Up @@ -172,7 +202,7 @@ Then, in your server:
1. Create ${IOTDB_HOME}/ext/mqtt/ folder, and put the jar into this folder.
2. Update configuration to enable MQTT service. (`enable_mqtt_service=true` in `conf/iotdb-system.properties`)
3. Set the value of `mqtt_payload_formatter` in `conf/iotdb-system.properties` as the value of getName() in your implementation
, in this example, the value is `CustomizedJson`
, in this example, the value is `CustomizedJson2Table`
4. Launch the IoTDB server.
5. Now IoTDB will use your implementation to parse the MQTT message.

Expand Down
Loading