數(shù)據(jù)同步
數(shù)據(jù)同步
數(shù)據(jù)同步是工業(yè)物聯(lián)網的典型需求,通過數(shù)據(jù)同步機制,可實現(xiàn) IoTDB 之間的數(shù)據(jù)共享,搭建完整的數(shù)據(jù)鏈路來滿足內網外網數(shù)據(jù)互通、端邊云同步、數(shù)據(jù)遷移、數(shù)據(jù)備份等需求。
功能概述
數(shù)據(jù)同步
一個數(shù)據(jù)同步任務包含 3 個階段:

- 抽取(Source)階段:該部分用于從源 IoTDB 抽取數(shù)據(jù),在 SQL 語句中的 source 部分定義
- 處理(Process)階段:該部分用于處理從源 IoTDB 抽取出的數(shù)據(jù),在 SQL 語句中的 processor 部分定義
- 發(fā)送(Sink)階段:該部分用于向目標 IoTDB 發(fā)送數(shù)據(jù),在 SQL 語句中的 sink 部分定義
通過 SQL 語句聲明式地配置 3 個部分的具體內容,可實現(xiàn)靈活的數(shù)據(jù)同步能力。目前數(shù)據(jù)同步支持以下信息的同步,您可以在創(chuàng)建同步任務時對同步范圍進行選擇(默認選擇 data.insert,即同步新寫入的數(shù)據(jù)):
| 同步范圍 | 同步內容 | 說明 |
|---|---|---|
| all | 所有范圍 | |
| data(數(shù)據(jù)) | insert(增量) | 同步新寫入的數(shù)據(jù) |
| delete(刪除) | 同步被刪除的數(shù)據(jù) | |
| schema(元數(shù)據(jù)) | database(數(shù)據(jù)庫) | 同步數(shù)據(jù)庫的創(chuàng)建、修改或刪除操作 |
| timeseries(時間序列) | 同步時間序列的定義和屬性 | |
| TTL(數(shù)據(jù)到期時間) | 同步數(shù)據(jù)的存活時間 | |
| auth(權限) | - | 同步用戶權限和訪問控制 |
功能限制及說明
元數(shù)據(jù)(schema)、權限(auth)同步功能存在如下限制:
使用元數(shù)據(jù)同步時,要求
Schema region、ConfigNode的共識協(xié)議必須為默認的 ratis 協(xié)議,即iotdb-system.properties配置文件中是否包含config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus、schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus,不包含即為默認值ratis 協(xié)議。為了防止?jié)撛诘臎_突,請在開啟元數(shù)據(jù)同步時關閉接收端自動創(chuàng)建元數(shù)據(jù)功能。可通過修改
iotdb-system.properties配置文件中的enable_auto_create_schema配置項為 false,關閉元數(shù)據(jù)自動創(chuàng)建功能。開啟元數(shù)據(jù)同步時,不支持使用自定義插件。
雙活集群中元數(shù)據(jù)同步需避免兩端同時操作。
在進行數(shù)據(jù)同步任務時,請避免執(zhí)行任何刪除操作,防止兩端狀態(tài)不一致。
使用說明
數(shù)據(jù)同步任務有三種狀態(tài):RUNNING、STOPPED 和 DROPPED。任務狀態(tài)轉換如下圖所示:

創(chuàng)建后任務會直接啟動,同時當任務發(fā)生異常停止后,系統(tǒng)會自動嘗試重啟任務。
提供以下 SQL 語句對同步任務進行狀態(tài)管理。
創(chuàng)建任務
使用 CREATE PIPE 語句來創(chuàng)建一條數(shù)據(jù)同步任務,下列屬性中PipeId和sink必填,source和processor為選填項,輸入 SQL 時注意 SOURCE與 SINK 插件順序不能替換。
SQL 示例如下:
CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId 是能夠唯一標定任務的名字
-- 數(shù)據(jù)抽取插件,可選插件
WITH SOURCE (
[<parameter> = <value>,],
)
-- 數(shù)據(jù)處理插件,可選插件
WITH PROCESSOR (
[<parameter> = <value>,],
)
-- 數(shù)據(jù)連接插件,必填插件
WITH SINK (
[<parameter> = <value>,],
)IF NOT EXISTS 語義:用于創(chuàng)建操作中,確保當指定 Pipe 不存在時,執(zhí)行創(chuàng)建命令,防止因嘗試創(chuàng)建已存在的 Pipe 而導致報錯。
注意:V1.3.6 起,創(chuàng)建一個全量數(shù)據(jù)同步 Pipe (例如 Pipeid : alldatapipe)時,系統(tǒng)會自動將其拆分為兩個獨立的 Pipe:
歷史 Pipe:PipeId 為原名稱加 _history后綴(如
alldatapipe_history),source 參數(shù)默認攜帶'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''實時 Pipe:PipeId 為原名稱加 _realtime后綴(如
alldatapipe_realtime),source 參數(shù)默認攜帶'history.enable'='false',若配置了元數(shù)據(jù)同步,則由實時 Pipe 負責發(fā)送
創(chuàng)建成功后,原 PipeId(如 alldatapipe)將不再作為有效標識符。在進行啟動、停止、刪除、查看等任務操作時,必須使用拆分后的獨立 PipeId(即 *_history或 *_realtime)。操作示例見查看任務小節(jié)
開始任務
開始處理數(shù)據(jù):
START PIPE<PipeId>停止任務
停止處理數(shù)據(jù):
STOP PIPE <PipeId>刪除任務
刪除指定任務:
DROP PIPE [IF EXISTS] <PipeId>IF EXISTS 語義:用于刪除操作中,確保當指定 Pipe 存在時,執(zhí)行刪除命令,防止因嘗試刪除不存在的 Pipe 而導致報錯。
刪除任務不需要先停止同步任務。
查看任務
查看全部任務:
SHOW PIPES查看指定任務:
SHOW PIPE <PipeId>pipe 的 show pipes 結果示例:
+--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+
| ID| CreationTime| State|PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds|
+--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+
|59abf95db892428b9d01c5fa318014ea|2024-06-17T14:03:44.189|RUNNING| {}| {}|{sink=iotdb-thrift-sink, sink.ip=127.0.0.1, sink.port=6668}| | 128| 1.03|
+--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+其中各列含義如下:
- ID:同步任務的唯一標識符
- CreationTime:同步任務的創(chuàng)建的時間
- State:同步任務的狀態(tài)
- PipeSource:同步數(shù)據(jù)流的來源
- PipeProcessor:同步數(shù)據(jù)流在傳輸過程中的處理邏輯
- PipeSink:同步數(shù)據(jù)流的目的地
- ExceptionMessage:顯示同步任務的異常信息
- RemainingEventCount(統(tǒng)計存在延遲):剩余 event 數(shù),當前數(shù)據(jù)同步任務中的所有 event 總數(shù),包括數(shù)據(jù)和元數(shù)據(jù)同步的 event,以及系統(tǒng)和用戶自定義的 event。
- EstimatedRemainingSeconds(統(tǒng)計存在延遲):剩余時間,基于當前 event 個數(shù)和 pipe 處速率,預估完成傳輸?shù)氖S鄷r間。
示例:
在 V1.3.6 及之后的版本中,創(chuàng)建一個全量數(shù)據(jù)同步任務,并查看該任務詳情
IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668')
IoTDB> show pipe alldatapipe_history
+-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+
| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds|
+-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+
|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00|
+-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+
IoTDB> show pipe alldatapipe_realtime
+--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+
| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds|
+--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+
|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00|
+--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+同步插件
為了使得整體架構更加靈活以匹配不同的同步場景需求,我們支持在同步任務框架中進行插件組裝。系統(tǒng)為您預置了一些常用插件可直接使用,同時您也可以自定義 processor 插件 和 Sink 插件,并加載至 IoTDB 系統(tǒng)進行使用。查看系統(tǒng)中的插件(含自定義與內置插件)可以用以下語句:
SHOW PIPEPLUGINS返回結果如下:
IoTDB> SHOW PIPEPLUGINS
+------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+
| PluginName|PluginType| ClassName| PluginJar|
+------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+
| DO-NOTHING-PROCESSOR| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor| |
| DO-NOTHING-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector| |
| IOTDB-AIR-GAP-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.airgap.IoTDBAirGapConnector| |
| IOTDB-SOURCE| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtractor| |
| IOTDB-THRIFT-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector| |
| IOTDB-THRIFT-SSL-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector| |
+------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+預置插件詳細介紹如下(各插件的詳細參數(shù)可參考本文參數(shù)說明):
| 類型 | 自定義插件 | 插件名稱 | 介紹 | 適用版本 |
|---|---|---|---|---|
| source 插件 | 不支持 | iotdb-source | 默認的 extractor 插件,用于抽取 IoTDB 歷史或實時數(shù)據(jù) | 1.2.x |
| processor 插件 | 支持 | do-nothing-processor | 默認的 processor 插件,不對傳入的數(shù)據(jù)做任何的處理 | 1.2.x |
| sink 插件 | 支持 | do-nothing-sink | 不對發(fā)送出的數(shù)據(jù)做任何的處理 | 1.2.x |
| iotdb-thrift-sink | 默認的 sink 插件(V1.3.1及以上),用于 IoTDB(V1.2.0 及以上)與 IoTDB(V1.2.0 及以上)之間的數(shù)據(jù)傳輸。使用 Thrift RPC 框架傳輸數(shù)據(jù),多線程 async non-blocking IO 模型,傳輸性能高,尤其適用于目標端為分布式時的場景 | 1.2.x | ||
| iotdb-air-gap-sink | 用于 IoTDB(V1.2.2 及以上)向 IoTDB(V1.2.2 及以上)跨單向數(shù)據(jù)網閘的數(shù)據(jù)同步。支持的網閘型號包括南瑞 Syskeeper 2000 等 | 1.2.x | ||
| iotdb-thrift-ssl-sink | 用于 IoTDB(V1.3.1 及以上)與 IoTDB(V1.2.0 及以上)之間的數(shù)據(jù)傳輸。使用 Thrift RPC 框架傳輸數(shù)據(jù),單線程 sync blocking IO 模型,適用于安全需求較高的場景 | 1.3.1+ |
導入自定義插件可參考流處理框架章節(jié)。
使用示例
全量數(shù)據(jù)同步
本例子用來演示將一個 IoTDB 的所有數(shù)據(jù)同步至另一個 IoTDB,數(shù)據(jù)鏈路如下圖所示:
據(jù)同步1.png)
在這個例子中,我們可以創(chuàng)建一個名為 A2B 的同步任務,用來同步 A IoTDB 到 B IoTDB 間的全量數(shù)據(jù),這里需要用到用到 sink 的 iotdb-thrift-sink 插件(內置插件),需通過 node-urls 配置目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url,如下面的示例語句:
create pipe A2B
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
)部分數(shù)據(jù)同步
本例子用來演示同步某個歷史時間范圍( 2023 年 8 月 23 日 8 點到 2023 年 10 月 23 日 8 點)的數(shù)據(jù)至另一個 IoTDB,數(shù)據(jù)鏈路如下圖所示:
據(jù)同步1.png)
在這個例子中,我們可以創(chuàng)建一個名為 A2B 的同步任務。首先我們需要在 source 中定義傳輸數(shù)據(jù)的范圍,由于傳輸?shù)氖菤v史數(shù)據(jù)(歷史數(shù)據(jù)是指同步任務創(chuàng)建之前存在的數(shù)據(jù)),需要配置數(shù)據(jù)的起止時間 start-time 和 end-time 以及傳輸?shù)哪J?mode。通過 node-urls 配置目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url。
詳細語句如下:
create pipe A2B
WITH SOURCE (
'source'= 'iotdb-source',
'realtime.mode' = 'stream' -- 新插入數(shù)據(jù)(pipe創(chuàng)建后)的抽取模式
'path' = 'root.vehicle.**', -- 同步數(shù)據(jù)的范圍
'start-time' = '2023.08.23T08:00:00+00:00', -- 同步所有數(shù)據(jù)的開始 event time,包含 start-time
'end-time' = '2023.10.23T08:00:00+00:00' -- 同步所有數(shù)據(jù)的結束 event time,包含 end-time
)
with SINK (
'sink'='iotdb-thrift-async-sink',
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
)雙向數(shù)據(jù)傳輸
本例子用來演示兩個 IoTDB 之間互為雙活的場景,數(shù)據(jù)鏈路如下圖所示:

在這個例子中,為了避免數(shù)據(jù)無限循環(huán),需要將 A 和 B 上的參數(shù)forwarding-pipe-requests 均設置為 false,表示不轉發(fā)從另一 pipe 傳輸而來的數(shù)據(jù),以及要保持兩側的數(shù)據(jù)一致 pipe 需要配置inclusion=all來同步全量數(shù)據(jù)和元數(shù)據(jù)。
詳細語句如下:
在 A IoTDB 上執(zhí)行下列語句:
create pipe AB
with source (
'inclusion'='all', -- 表示同步全量數(shù)據(jù)、元數(shù)據(jù)和權限
'forwarding-pipe-requests' = 'false' --不轉發(fā)由其他 Pipe 寫入的數(shù)據(jù)
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
)在 B IoTDB 上執(zhí)行下列語句:
create pipe BA
with source (
'inclusion'='all', -- 表示同步全量數(shù)據(jù)、元數(shù)據(jù)和權限
'forwarding-pipe-requests' = 'false' --是否轉發(fā)由其他 Pipe 寫入的數(shù)據(jù)
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6667', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
)邊云數(shù)據(jù)傳輸
本例子用來演示多個 IoTDB 之間邊云傳輸數(shù)據(jù)的場景,數(shù)據(jù)由 B 、C、D 集群分別都同步至 A 集群,數(shù)據(jù)鏈路如下圖所示:

在這個例子中,為了將 B 、C、D 集群的數(shù)據(jù)同步至 A,在 BA 、CA、DA 之間的 pipe 需要配置path限制范圍,以及要保持邊側和云側的數(shù)據(jù)一致 pipe 需要配置inclusion=all來同步全量數(shù)據(jù)和元數(shù)據(jù),詳細語句如下:
在 B IoTDB 上執(zhí)行下列語句,將 B 中數(shù)據(jù)同步至 A:
create pipe BA
with source (
'inclusion'='all', -- 表示同步全量數(shù)據(jù)、元數(shù)據(jù)和權限
'path'='root.db.**', -- 限制范圍
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6667', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
)在 C IoTDB 上執(zhí)行下列語句,將 C 中數(shù)據(jù)同步至 A:
create pipe CA
with source (
'inclusion'='all', -- 表示同步全量數(shù)據(jù)、元數(shù)據(jù)和權限
'path'='root.db.**', -- 限制范圍
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
)在 D IoTDB 上執(zhí)行下列語句,將 D 中數(shù)據(jù)同步至 A:
create pipe DA
with source (
'inclusion'='all', -- 表示同步全量數(shù)據(jù)、元數(shù)據(jù)和權限
'path'='root.db.**', -- 限制范圍
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6669', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
)級聯(lián)數(shù)據(jù)傳輸
本例子用來演示多個 IoTDB 之間級聯(lián)傳輸數(shù)據(jù)的場景,數(shù)據(jù)由 A 集群同步至 B 集群,再同步至 C 集群,數(shù)據(jù)鏈路如下圖所示:

在這個例子中,為了將 A 集群的數(shù)據(jù)同步至 C,在 BC 之間的 pipe 需要將 forwarding-pipe-requests 配置為true,詳細語句如下:
在 A IoTDB 上執(zhí)行下列語句,將 A 中數(shù)據(jù)同步至 B:
create pipe AB
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
)在 B IoTDB 上執(zhí)行下列語句,將 B 中數(shù)據(jù)同步至 C:
create pipe BC
with source (
'forwarding-pipe-requests' = 'true' --是否轉發(fā)由其他 Pipe 寫入的數(shù)據(jù)
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6669', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
)跨網閘數(shù)據(jù)傳輸
本例子用來演示將一個 IoTDB 的數(shù)據(jù),經過單向網閘,同步至另一個 IoTDB 的場景,數(shù)據(jù)鏈路如下圖所示:

在這個例子中,需要使用 sink 任務中的 iotdb-air-gap-sink 插件,配置網閘后,在 A IoTDB 上執(zhí)行下列語句,其中 node-urls 填寫網閘配置的目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url,詳細語句如下:
create pipe A2B
with sink (
'sink'='iotdb-air-gap-sink',
'node-urls' = '10.53.53.53:9780', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
)注意:目前支持的網閘型號
其他型號的網閘設備,請與天謀商務聯(lián)系確認是否支持。
| 網閘類型 | 網閘型號 | 回包限制 | 發(fā)送限制 |
|---|---|---|---|
| 正向型 | 南瑞 Syskeeper-2000 正向型 | 全 0 / 全 1 bytes | 無限制 |
| 正向型 | 許繼自研網閘 | 全 0 / 全 1 bytes | 無限制 |
| 未標記正反向 | 威努特安全隔離與信息交換系統(tǒng) | 無限制 | 無限制 |
| 正向型 | 科東 StoneWall-2000 網絡安全隔離設備(正向型) | 無限制 | 無限制 |
| 反向型 | 南瑞 Syskeeper-2000 反向型 | 全 0 / 全 1 bytes | 滿足 E 語言格式 |
| 未標記正反向 | 迪普科技ISG5000 | 無限制 | 無限制 |
| 未標記正反向 | 熙羚安全隔離與信息交換系統(tǒng)XL—GAP | 無限制 | 無限制 |
壓縮同步
IoTDB 支持在同步過程中指定數(shù)據(jù)壓縮方式。可通過配置 compressor 參數(shù),實現(xiàn)數(shù)據(jù)的實時壓縮和傳輸。compressor目前支持 snappy / gzip / lz4 / zstd / lzma2 5 種可選算法,且可以選擇多種壓縮算法組合,按配置的順序進行壓縮。rate-limit-bytes-per-second(V1.3.3 及以后版本支持)每秒最大允許傳輸?shù)腷yte數(shù),計算壓縮后的byte,若小于0則不限制。
如創(chuàng)建一個名為 A2B 的同步任務:
create pipe A2B
with sink (
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
'compressor' = 'snappy,lz4' --
'rate-limit-bytes-per-second'='1048576' -- 每秒最大允許傳輸?shù)腷yte數(shù)
)加密同步
IoTDB 支持在同步過程中使用 SSL 加密,從而在不同的 IoTDB 實例之間安全地傳輸數(shù)據(jù)。通過配置 SSL 相關的參數(shù),如證書地址和密碼(ssl.trust-store-path)、(ssl.trust-store-pwd)可以確保數(shù)據(jù)在同步過程中被 SSL 加密所保護。
如創(chuàng)建名為 A2B 的同步任務:
create pipe A2B
with sink (
'sink'='iotdb-thrift-ssl-sink',
'node-urls'='127.0.0.1:6667', -- 目標端 IoTDB 中 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url
'ssl.trust-store-path'='pki/trusted', -- 連接目標端 DataNode 所需的 trust store 證書路徑
'ssl.trust-store-pwd'='root' -- 連接目標端 DataNode 所需的 trust store 證書密碼
)參考:注意事項
可通過修改 IoTDB 配置文件(iotdb-system.properties)以調整數(shù)據(jù)同步的參數(shù),如同步數(shù)據(jù)存儲目錄等。完整配置如下::
V1.3.3+:
# pipe_receiver_file_dir
# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/${cn_system_dir}/pipe/receiver).
# If it is absolute, system will save the data in the exact location it points to.
# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
# Note: If pipe_receiver_file_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
# effectiveMode: restart
# For windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
# pipe_receiver_file_dir=data\\confignode\\system\\pipe\\receiver
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
pipe_receiver_file_dir=data/confignode/system/pipe/receiver
####################
### Pipe Configuration
####################
# Uncomment the following field to configure the pipe lib directory.
# effectiveMode: first_start
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# effectiveMode: restart
# Datatype: int
pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# effectiveMode: restart
# Datatype: int
pipe_sink_timeout_ms=900000
# The maximum number of selectors that can be used in the sink.
# Recommend to set this value to less than or equal to pipe_sink_max_client_number.
# effectiveMode: restart
# Datatype: int
pipe_sink_selector_number=4
# The maximum number of clients that can be used in the sink.
# effectiveMode: restart
# Datatype: int
pipe_sink_max_client_number=16
# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# effectiveMode: restart
# Datatype: Boolean
pipe_air_gap_receiver_enabled=false
# The port for the server to receive pipe data through air gap.
# Datatype: int
# effectiveMode: restart
pipe_air_gap_receiver_port=9780
# The total bytes that all pipe sinks can transfer per second.
# When given a value less than or equal to 0, it means no limit.
# default value is -1, which means no limit.
# effectiveMode: hot_reload
# Datatype: double
pipe_all_sinks_rate_limit_bytes_per_second=-1參考:參數(shù)說明
source 參數(shù)(V1.3.3)
| 參數(shù) | 描述 | value 取值范圍 | 是否必填 | 默認取值 |
|---|---|---|---|---|
| source | iotdb-source | String: iotdb-source | 必填 | - |
| inclusion | 用于指定數(shù)據(jù)同步任務中需要同步范圍,分為數(shù)據(jù)、元數(shù)據(jù)和權限 | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | 選填 | data.insert |
| inclusion.exclusion | 用于從 inclusion 指定的同步范圍內排除特定的操作,減少同步的數(shù)據(jù)量 | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | 選填 | 空字符串 |
| mode | 用于在每個 data region 發(fā)送完畢時分別發(fā)送結束事件,并在全部 data region 發(fā)送完畢后自動 drop pipe。query:結束,subscribe:不結束。 | String: query / subscribe | 選填 | subscribe |
| path | 用于篩選待同步的時間序列及其相關元數(shù)據(jù) / 數(shù)據(jù)的路徑模式元數(shù)據(jù)同步只能用pathpath 是精確匹配,參數(shù)必須為前綴路徑或完整路徑,即不能含有 "*",最多在 path參數(shù)的尾部含有一個 "**" | String:IoTDB 的 pattern | 選填 | root.** |
| pattern | 用于篩選時間序列的路徑前綴 | String: 任意的時間序列前綴 | 選填 | root |
| start-time | 同步所有數(shù)據(jù)的開始 event time,包含 start-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | 選填 | Long.MIN_VALUE |
| end-time | 同步所有數(shù)據(jù)的結束 event time,包含 end-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | 選填 | Long.MAX_VALUE |
| realtime.mode | 新插入數(shù)據(jù)(pipe創(chuàng)建后)的抽取模式 | String: stream, batch | 選填 | batch |
| forwarding-pipe-requests | 是否轉發(fā)由其他 Pipe (通常是數(shù)據(jù)同步)寫入的數(shù)據(jù) | Boolean: true, false | 選填 | true |
| history.loose-range | tsfile傳輸時,是否放寬歷史數(shù)據(jù)(pipe創(chuàng)建前)范圍。"":不放寬范圍,嚴格按照設置的條件挑選數(shù)據(jù)"time":放寬時間范圍,避免對TsFile進行拆分,可以提升同步效率"path":放寬路徑范圍,避免對TsFile進行拆分,可以提升同步效率"time, path" 、 "path, time" 、"all" : 放寬所有范圍,避免對TsFile進行拆分,可以提升同步效率 | String: "" 、 "time" 、 "path" 、 "time, path" 、 "path, time" 、 "all" | 選填 | "" |
| realtime.loose-range | tsfile傳輸時,是否放寬實時數(shù)據(jù)(pipe創(chuàng)建前)范圍。"":不放寬范圍,嚴格按照設置的條件挑選數(shù)據(jù)"time":放寬時間范圍,避免對TsFile進行拆分,可以提升同步效率"path":放寬路徑范圍,避免對TsFile進行拆分,可以提升同步效率"time, path" 、 "path, time" 、"all" : 放寬所有范圍,避免對TsFile進行拆分,可以提升同步效率 | String: "" 、 "time" 、 "path" 、 "time, path" 、 "path, time" 、 "all" | 選填 | "" |
| mods.enable | 是否發(fā)送 tsfile 的 mods 文件 | Boolean: true / false | 選填 | false |
?? 說明:為保持低版本兼容,history.enable、history.start-time、history.end-time、realtime.enable 仍可使用,但在新版本中不推薦。
?? 說明:數(shù)據(jù)抽取模式 stream 和 batch 的差異
- stream(推薦):該模式下,任務將對數(shù)據(jù)進行實時處理、發(fā)送,其特點是高時效、低吞吐
- batch:該模式下,任務將對數(shù)據(jù)進行批量(按底層數(shù)據(jù)文件)處理、發(fā)送,其特點是低時效、高吞吐
sink 參數(shù)
在 1.3.3 及以上的版本中,只包含sink的情況下,不再需要額外增加with sink 前綴
iotdb-thrift-sink
| key | value | value 取值范圍 | 是否必填 | 默認取值 |
|---|---|---|---|---|
| sink | iotdb-thrift-sink 或 iotdb-thrift-async-sink | String: iotdb-thrift-sink 或 iotdb-thrift-async-sink | 必填 | - |
| node-urls | 目標端 IoTDB 任意多個 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url(請注意同步任務不支持向自身服務進行轉發(fā)) | String. 例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - |
| batch.enable | 是否開啟日志攢批發(fā)送模式,用于提高傳輸吞吐,降低 IOPS | Boolean: true, false | 選填 | true |
| batch.max-delay-seconds | 在開啟日志攢批發(fā)送模式時生效,表示一批數(shù)據(jù)在發(fā)送前的最長等待時間(單位:s) | Integer | 選填 | 1 |
| batch.max-delay-ms | 在開啟日志攢批發(fā)送模式時生效,表示一批數(shù)據(jù)在發(fā)送前的最長等待時間(單位:ms)(V1.3.6及以后的V1.x版本支持) | Integer | 選填 | 1 |
| batch.size-bytes | 在開啟日志攢批發(fā)送模式時生效,表示一批數(shù)據(jù)最大的攢批大小(單位:byte) | Long | 選填 | 1610241024 |
| load-tsfile-strategy | 文件同步數(shù)據(jù)時,接收端請求返回發(fā)送端前,是否等待接收端本地的 load tsfile 執(zhí)行結果返回。 sync:等待本地的 load tsfile 執(zhí)行結果返回; async:不等待本地的 load tsfile 執(zhí)行結果返回。(V1.3.6及以后的V1.x版本支持) | String: sync / async | 選填 | sync |
iotdb-air-gap-sink
| key | value | value 取值范圍 | 是否必填 | 默認取值 |
|---|---|---|---|---|
| sink | iotdb-air-gap-sink | String: iotdb-air-gap-sink | 必填 | - |
| node-urls | 目標端 IoTDB 任意多個 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url | String. 例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - |
| air-gap.handshake-timeout-ms | 發(fā)送端與接收端在首次嘗試建立連接時握手請求的超時時長,單位:毫秒 | Integer | 選填 | 5000 |
| load-tsfile-strategy | 文件同步數(shù)據(jù)時,接收端請求返回發(fā)送端前,是否等待接收端本地的 load tsfile 執(zhí)行結果返回。 sync:等待本地的 load tsfile 執(zhí)行結果返回; async:不等待本地的 load tsfile 執(zhí)行結果返回。(V1.3.6及以后的V1.x版本支持) | String: sync / async | 選填 | sync |
iotdb-thrift-ssl-sink
| key | value | value 取值范圍 | 是否必填 | 默認取值 |
|---|---|---|---|---|
| sink | iotdb-thrift-ssl-sink | String: iotdb-thrift-ssl-sink | 必填 | - |
| node-urls | 目標端 IoTDB 任意多個 DataNode 節(jié)點的數(shù)據(jù)服務端口的 url(請注意同步任務不支持向自身服務進行轉發(fā)) | String. 例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - |
| batch.enable | 是否開啟日志攢批發(fā)送模式,用于提高傳輸吞吐,降低 IOPS | Boolean: true, false | 選填 | true |
| batch.max-delay-seconds | 在開啟日志攢批發(fā)送模式時生效,表示一批數(shù)據(jù)在發(fā)送前的最長等待時間(單位:s) | Integer | 選填 | 1 |
| batch.max-delay-ms | 在開啟日志攢批發(fā)送模式時生效,表示一批數(shù)據(jù)在發(fā)送前的最長等待時間(單位:ms)(V1.3.6及以后的V1.x版本支持) | Integer | 選填 | 1 |
| batch.size-bytes | 在開啟日志攢批發(fā)送模式時生效,表示一批數(shù)據(jù)最大的攢批大小(單位:byte) | Long | 選填 | 1610241024 |
| load-tsfile-strategy | 文件同步數(shù)據(jù)時,接收端請求返回發(fā)送端前,是否等待接收端本地的 load tsfile 執(zhí)行結果返回。 sync:等待本地的 load tsfile 執(zhí)行結果返回; async:不等待本地的 load tsfile 執(zhí)行結果返回。(V1.3.6及以后的V1.x版本支持) | String: sync / async | 選填 | sync |
| ssl.trust-store-path | 連接目標端 DataNode 所需的 trust store 證書路徑 | String.Example: '127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - |
| ssl.trust-store-pwd | 連接目標端 DataNode 所需的 trust store 證書密碼 | Integer | 必填 | - |