數據同步
數據同步
數據同步是工業物聯網的典型需求,通過數據同步機制,可實現 IoTDB 之間的數據共享,搭建完整的數據鏈路來滿足內網外網數據互通、端邊云同步、數據遷移、數據備份等需求。
1. 功能概述
1.1 數據同步
一個數據同步任務包含 3 個階段:

- 抽取(Source)階段:該部分用于從源 IoTDB 抽取數據,在 SQL 語句中的 source 部分定義
- 處理(Process)階段:該部分用于處理從源 IoTDB 抽取出的數據,在 SQL 語句中的 processor 部分定義
- 發送(Sink)階段:該部分用于向目標 IoTDB 發送數據,在 SQL 語句中的 sink 部分定義
通過 SQL 語句聲明式地配置 3 個部分的具體內容,可實現靈活的數據同步能力。目前數據同步支持以下信息的同步,您可以在創建同步任務時對同步范圍進行選擇(默認選擇 data.insert,即同步新寫入的數據):
| 同步范圍 | 同步內容 | 說明 |
|---|---|---|
| all | 所有范圍 | |
| data(數據) | insert(增量) | 同步新寫入的數據 |
| delete(刪除) | 同步被刪除的數據 | |
| schema(元數據) | database(數據庫) | 同步數據庫的創建、修改或刪除操作 |
| timeseries(時間序列) | 同步時間序列的定義和屬性 | |
| TTL(數據到期時間) | 同步數據的存活時間 | |
| auth(權限) | - | 同步用戶權限和訪問控制 |
1.2 功能限制及說明
元數據(schema)、權限(auth)同步功能存在如下限制:
使用元數據同步時,要求
Schema region、ConfigNode的共識協議必須為默認的 ratis 協議,即iotdb-system.properties配置文件中是否包含config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus、schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus,不包含即為默認值ratis 協議。為了防止潛在的沖突,請在開啟元數據同步時關閉接收端自動創建元數據功能。可通過修改
iotdb-system.properties配置文件中的enable_auto_create_schema配置項為 false,關閉元數據自動創建功能。開啟元數據同步時,不支持使用自定義插件。
雙活集群中元數據同步需避免兩端同時操作。
在進行數據同步任務時,請避免執行任何刪除操作,防止兩端狀態不一致。
2. 使用說明
數據同步任務有三種狀態:RUNNING、STOPPED 和 DROPPED。任務狀態轉換如下圖所示:

創建后任務會直接啟動,同時當任務發生異常停止后,系統會自動嘗試重啟任務。
提供以下 SQL 語句對同步任務進行狀態管理。
2.1 創建任務
使用 CREATE PIPE 語句來創建一條數據同步任務,下列屬性中PipeId和sink必填,source和processor為選填項,輸入 SQL 時注意 SOURCE與 SINK 插件順序不能替換。
SQL 示例如下:
CREATE PIPE [IF NOT EXISTS] <PipeId> -- PipeId 是能夠唯一標定任務的名字
-- 數據抽取插件,可選插件
WITH SOURCE (
[<parameter> = <value>,],
)
-- 數據處理插件,可選插件
WITH PROCESSOR (
[<parameter> = <value>,],
)
-- 數據連接插件,必填插件
WITH SINK (
[<parameter> = <value>,],
)IF NOT EXISTS 語義:用于創建操作中,確保當指定 Pipe 不存在時,執行創建命令,防止因嘗試創建已存在的 Pipe 而導致報錯。
注意:V2.0.8 起,創建一個全量數據同步 Pipe (例如 Pipeid : alldatapipe)時,系統會自動將其拆分為兩個獨立的 Pipe:
歷史 Pipe:PipeId 為原名稱加 _history后綴(如
alldatapipe_history),source 參數默認攜帶'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''實時 Pipe:PipeId 為原名稱加 _realtime后綴(如
alldatapipe_realtime),source 參數默認攜帶'history.enable'='false',若配置了元數據同步,則由實時 Pipe 負責發送
創建成功后,原 PipeId(如 alldatapipe)將不再作為有效標識符。在進行啟動、停止、刪除、查看等任務操作時,必須使用拆分后的獨立 PipeId(即 *_history或 *_realtime)。操作示例見查看任務小節
2.2 開始任務
開始處理數據:
START PIPE<PipeId>2.3 停止任務
停止處理數據:
STOP PIPE <PipeId>2.4 刪除任務
刪除指定任務:
DROP PIPE [IF EXISTS] <PipeId>IF EXISTS 語義:用于刪除操作中,確保當指定 Pipe 存在時,執行刪除命令,防止因嘗試刪除不存在的 Pipe 而導致報錯。
刪除任務不需要先停止同步任務。
2.5 查看任務
查看全部任務:
SHOW PIPES查看指定任務:
SHOW PIPE <PipeId>pipe 的 show pipes 結果示例:
+--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+
| ID| CreationTime| State|PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds|
+--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+
|59abf95db892428b9d01c5fa318014ea|2024-06-17T14:03:44.189|RUNNING| {}| {}|{sink=iotdb-thrift-sink, sink.ip=127.0.0.1, sink.port=6668}| | 128| 1.03|
+--------------------------------+-----------------------+-------+----------+-------------+-----------------------------------------------------------+----------------+-------------------+-------------------------+其中各列含義如下:
- ID:同步任務的唯一標識符
- CreationTime:同步任務的創建的時間
- State:同步任務的狀態
- PipeSource:同步數據流的來源
- PipeProcessor:同步數據流在傳輸過程中的處理邏輯
- PipeSink:同步數據流的目的地
- ExceptionMessage:顯示同步任務的異常信息
- RemainingEventCount(統計存在延遲):剩余 event 數,當前數據同步任務中的所有 event 總數,包括數據和元數據同步的 event,以及系統和用戶自定義的 event。
- EstimatedRemainingSeconds(統計存在延遲):剩余時間,基于當前 event 個數和 pipe 處速率,預估完成傳輸的剩余時間。
示例:
在 V2.0.8 及之后的版本中,創建一個全量數據同步任務,并查看該任務詳情
IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668')
IoTDB> show pipe alldatapipe_history
+-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+
| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds|
+-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+
|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00|
+-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+
IoTDB> show pipe alldatapipe_realtime
+--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+
| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds|
+--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+
|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00|
+--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+2.6 同步插件
為了使得整體架構更加靈活以匹配不同的同步場景需求,我們支持在同步任務框架中進行插件組裝。系統為您預置了一些常用插件可直接使用,同時您也可以自定義 processor 插件 和 Sink 插件,并加載至 IoTDB 系統進行使用。查看系統中的插件(含自定義與內置插件)可以用以下語句:
SHOW PIPEPLUGINS返回結果如下:
IoTDB> SHOW PIPEPLUGINS
+------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+
| PluginName|PluginType| ClassName| PluginJar|
+------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+
| DO-NOTHING-PROCESSOR| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.processor.donothing.DoNothingProcessor| |
| DO-NOTHING-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.donothing.DoNothingConnector| |
| IOTDB-AIR-GAP-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.airgap.IoTDBAirGapConnector| |
| IOTDB-SOURCE| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.extractor.iotdb.IoTDBExtractor| |
| IOTDB-THRIFT-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector| |
| IOTDB-THRIFT-SSL-SINK| Builtin| org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector| |
+------------------------------+----------+--------------------------------------------------------------------------------------------------+----------------------------------------------------+預置插件詳細介紹如下(各插件的詳細參數可參考本文參數說明):
| 類型 | 自定義插件 | 插件名稱 | 介紹 | 適用版本 |
|---|---|---|---|---|
| source 插件 | 不支持 | iotdb-source | 默認的 extractor 插件,用于抽取 IoTDB 歷史或實時數據 | 1.2.x |
| processor 插件 | 支持 | do-nothing-processor | 默認的 processor 插件,不對傳入的數據做任何的處理 | 1.2.x |
| sink 插件 | 支持 | do-nothing-sink | 不對發送出的數據做任何的處理 | 1.2.x |
| iotdb-thrift-sink | 默認的 sink 插件(V1.3.1及以上),用于 IoTDB(V1.2.0 及以上)與 IoTDB(V1.2.0 及以上)之間的數據傳輸。使用 Thrift RPC 框架傳輸數據,多線程 async non-blocking IO 模型,傳輸性能高,尤其適用于目標端為分布式時的場景 | 1.2.x | ||
| iotdb-air-gap-sink | 用于 IoTDB(V1.2.2 及以上)向 IoTDB(V1.2.2 及以上)跨單向數據網閘的數據同步。支持的網閘型號包括南瑞 Syskeeper 2000 等 | 1.2.x | ||
| iotdb-thrift-ssl-sink | 用于 IoTDB(V1.3.1 及以上)與 IoTDB(V1.2.0 及以上)之間的數據傳輸。使用 Thrift RPC 框架傳輸數據,單線程 sync blocking IO 模型,適用于安全需求較高的場景 | 1.3.1+ |
導入自定義插件可參考流處理框架章節。
3. 使用示例
3.1 全量數據同步
本例子用來演示將一個 IoTDB 的所有數據同步至另一個 IoTDB,數據鏈路如下圖所示:

在這個例子中,我們可以創建一個名為 A2B 的同步任務,用來同步 A IoTDB 到 B IoTDB 間的全量數據,這里需要用到用到 sink 的 iotdb-thrift-sink 插件(內置插件),需通過 node-urls 配置目標端 IoTDB 中 DataNode 節點的數據服務端口的 url,如下面的示例語句:
create pipe A2B
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
)3.2 部分數據同步
本例子用來演示同步某個歷史時間范圍( 2023 年 8 月 23 日 8 點到 2023 年 10 月 23 日 8 點)的數據至另一個 IoTDB,數據鏈路如下圖所示:

在這個例子中,我們可以創建一個名為 A2B 的同步任務。首先我們需要在 source 中定義傳輸數據的范圍,由于傳輸的是歷史數據(歷史數據是指同步任務創建之前存在的數據),需要配置數據的起止時間 start-time 和 end-time 以及傳輸的模式 mode。通過 node-urls 配置目標端 IoTDB 中 DataNode 節點的數據服務端口的 url。
詳細語句如下:
create pipe A2B
WITH SOURCE (
'source'= 'iotdb-source',
'realtime.mode' = 'stream' -- 新插入數據(pipe創建后)的抽取模式
'path' = 'root.vehicle.**', -- 同步數據的范圍
'start-time' = '2023.08.23T08:00:00+00:00', -- 同步所有數據的開始 event time,包含 start-time
'end-time' = '2023.10.23T08:00:00+00:00' -- 同步所有數據的結束 event time,包含 end-time
)
with SINK (
'sink'='iotdb-thrift-async-sink',
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
)3.3 雙向數據傳輸
本例子用來演示兩個 IoTDB 之間互為雙活的場景,數據鏈路如下圖所示:

在這個例子中,為了避免數據無限循環,需要將 A 和 B 上的參數forwarding-pipe-requests 均設置為 false,表示不轉發從另一 pipe 傳輸而來的數據,以及要保持兩側的數據一致 pipe 需要配置inclusion=all來同步全量數據和元數據。
詳細語句如下:
在 A IoTDB 上執行下列語句:
create pipe AB
with source (
'inclusion'='all', -- 表示同步全量數據、元數據和權限
'forwarding-pipe-requests' = 'false' --不轉發由其他 Pipe 寫入的數據
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
)在 B IoTDB 上執行下列語句:
create pipe BA
with source (
'inclusion'='all', -- 表示同步全量數據、元數據和權限
'forwarding-pipe-requests' = 'false' --是否轉發由其他 Pipe 寫入的數據
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6667', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
)3.4 邊云數據傳輸
本例子用來演示多個 IoTDB 之間邊云傳輸數據的場景,數據由 B 、C、D 集群分別都同步至 A 集群,數據鏈路如下圖所示:

在這個例子中,為了將 B 、C、D 集群的數據同步至 A,在 BA 、CA、DA 之間的 pipe 需要配置path限制范圍,以及要保持邊側和云側的數據一致 pipe 需要配置inclusion=all來同步全量數據和元數據,詳細語句如下:
在 B IoTDB 上執行下列語句,將 B 中數據同步至 A:
create pipe BA
with source (
'inclusion'='all', -- 表示同步全量數據、元數據和權限
'path'='root.db.**', -- 限制范圍
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6667', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
)在 C IoTDB 上執行下列語句,將 C 中數據同步至 A:
create pipe CA
with source (
'inclusion'='all', -- 表示同步全量數據、元數據和權限
'path'='root.db.**', -- 限制范圍
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
)在 D IoTDB 上執行下列語句,將 D 中數據同步至 A:
create pipe DA
with source (
'inclusion'='all', -- 表示同步全量數據、元數據和權限
'path'='root.db.**', -- 限制范圍
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6669', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
)3.5 級聯數據傳輸
本例子用來演示多個 IoTDB 之間級聯傳輸數據的場景,數據由 A 集群同步至 B 集群,再同步至 C 集群,數據鏈路如下圖所示:

在這個例子中,為了將 A 集群的數據同步至 C,在 BC 之間的 pipe 需要將 forwarding-pipe-requests 配置為true,詳細語句如下:
在 A IoTDB 上執行下列語句,將 A 中數據同步至 B:
create pipe AB
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
)在 B IoTDB 上執行下列語句,將 B 中數據同步至 C:
create pipe BC
with source (
'forwarding-pipe-requests' = 'true' --是否轉發由其他 Pipe 寫入的數據
)
with sink (
'sink'='iotdb-thrift-sink',
'node-urls' = '127.0.0.1:6669', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
)3.6 跨網閘數據傳輸
本例子用來演示將一個 IoTDB 的數據,經過單向網閘,同步至另一個 IoTDB 的場景,數據鏈路如下圖所示:

在這個例子中,需要使用 sink 任務中的 iotdb-air-gap-sink 插件,配置網閘后,在 A IoTDB 上執行下列語句,其中 node-urls 填寫網閘配置的目標端 IoTDB 中 DataNode 節點的數據服務端口的 url,詳細語句如下:
create pipe A2B
with sink (
'sink'='iotdb-air-gap-sink',
'node-urls' = '10.53.53.53:9780', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
)注意:目前支持的網閘型號
其他型號的網閘設備,請與天謀商務聯系確認是否支持。
| 網閘類型 | 網閘型號 | 回包限制 | 發送限制 |
|---|---|---|---|
| 正向型 | 南瑞 Syskeeper-2000 正向型 | 全 0 / 全 1 bytes | 無限制 |
| 正向型 | 許繼自研網閘 | 全 0 / 全 1 bytes | 無限制 |
| 未標記正反向 | 威努特安全隔離與信息交換系統 | 無限制 | 無限制 |
| 正向型 | 科東 StoneWall-2000 網絡安全隔離設備(正向型) | 無限制 | 無限制 |
| 反向型 | 南瑞 Syskeeper-2000 反向型 | 全 0 / 全 1 bytes | 滿足 E 語言格式 |
| 未標記正反向 | 迪普科技ISG5000 | 無限制 | 無限制 |
| 未標記正反向 | 熙羚安全隔離與信息交換系統XL—GAP | 無限制 | 無限制 |
3.7 壓縮同步
IoTDB 支持在同步過程中指定數據壓縮方式。可通過配置 compressor 參數,實現數據的實時壓縮和傳輸。compressor目前支持 snappy / gzip / lz4 / zstd / lzma2 5 種可選算法,且可以選擇多種壓縮算法組合,按配置的順序進行壓縮。rate-limit-bytes-per-second(V1.3.3 及以后版本支持)每秒最大允許傳輸的byte數,計算壓縮后的byte,若小于0則不限制。
如創建一個名為 A2B 的同步任務:
create pipe A2B
with sink (
'node-urls' = '127.0.0.1:6668', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
'compressor' = 'snappy,lz4' --
'rate-limit-bytes-per-second'='1048576' -- 每秒最大允許傳輸的byte數
)3.8 加密同步
IoTDB 支持在同步過程中使用 SSL 加密,從而在不同的 IoTDB 實例之間安全地傳輸數據。通過配置 SSL 相關的參數,如證書地址和密碼(ssl.trust-store-path)、(ssl.trust-store-pwd)可以確保數據在同步過程中被 SSL 加密所保護。
如創建名為 A2B 的同步任務:
create pipe A2B
with sink (
'sink'='iotdb-thrift-ssl-sink',
'node-urls'='127.0.0.1:6667', -- 目標端 IoTDB 中 DataNode 節點的數據服務端口的 url
'ssl.trust-store-path'='pki/trusted', -- 連接目標端 DataNode 所需的 trust store 證書路徑
'ssl.trust-store-pwd'='root' -- 連接目標端 DataNode 所需的 trust store 證書密碼
)4. 參考:注意事項
可通過修改 IoTDB 配置文件(iotdb-system.properties)以調整數據同步的參數,如同步數據存儲目錄等。完整配置如下::
V1.3.3+:
# pipe_receiver_file_dir
# If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/${cn_system_dir}/pipe/receiver).
# If it is absolute, system will save the data in the exact location it points to.
# If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder.
# Note: If pipe_receiver_file_dir is assigned an empty string(i.e.,zero-size), it will be handled as a relative path.
# effectiveMode: restart
# For windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative.
# pipe_receiver_file_dir=data\\confignode\\system\\pipe\\receiver
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
pipe_receiver_file_dir=data/confignode/system/pipe/receiver
####################
### Pipe Configuration
####################
# Uncomment the following field to configure the pipe lib directory.
# effectiveMode: first_start
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# effectiveMode: restart
# Datatype: int
pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# effectiveMode: restart
# Datatype: int
pipe_sink_timeout_ms=900000
# The maximum number of selectors that can be used in the sink.
# Recommend to set this value to less than or equal to pipe_sink_max_client_number.
# effectiveMode: restart
# Datatype: int
pipe_sink_selector_number=4
# The maximum number of clients that can be used in the sink.
# effectiveMode: restart
# Datatype: int
pipe_sink_max_client_number=16
# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# effectiveMode: restart
# Datatype: Boolean
pipe_air_gap_receiver_enabled=false
# The port for the server to receive pipe data through air gap.
# Datatype: int
# effectiveMode: restart
pipe_air_gap_receiver_port=9780
# The total bytes that all pipe sinks can transfer per second.
# When given a value less than or equal to 0, it means no limit.
# default value is -1, which means no limit.
# effectiveMode: hot_reload
# Datatype: double
pipe_all_sinks_rate_limit_bytes_per_second=-15. 參考:參數說明
5.1 source 參數
| 參數 | 描述 | value 取值范圍 | 是否必填 | 默認取值 |
|---|---|---|---|---|
| source | iotdb-source | String: iotdb-source | 必填 | - |
| inclusion | 用于指定數據同步任務中需要同步范圍,分為數據、元數據和權限 | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | 選填 | data.insert |
| inclusion.exclusion | 用于從 inclusion 指定的同步范圍內排除特定的操作,減少同步的數據量 | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | 選填 | 空字符串 |
| mode.streaming | 此參數指定時序數據寫入的捕獲來源。適用于 mode.streaming為 false 模式下的場景,決定inclusion中data.insert數據的捕獲來源。提供兩種捕獲策略:true: 動態選擇捕獲的類型。系統將根據下游處理速度,自適應地選擇是捕獲每個寫入請求還是僅捕獲 TsFile 文件的封口請求。當下游處理速度快時,優先捕獲寫入請求以減少延遲;當處理速度慢時,僅捕獲文件封口請求以避免處理堆積。這種模式適用于大多數場景,能夠實現處理延遲和吞吐量的最優平衡。false:固定按批捕獲方式。僅捕獲 TsFile 文件的封口請求,適用于資源受限的應用場景,以降低系統負載。注意,pipe 啟動時捕獲的快照數據只會以文件的方式供下游處理。 | Boolean: true / false | 否 | true |
| mode.strict | 在使用 time / path / database-name / table-name 參數過濾數據時,是否需要嚴格按照條件篩選:true: 嚴格篩選。系統將完全按照給定條件過濾篩選被捕獲的數據,確保只有符合條件的數據被選中。false:非嚴格篩選。系統在篩選被捕獲的數據時可能會包含一些額外的數據,適用于性能敏感的場景,可降低 CPU 和 IO 消耗。 | Boolean: true / false | 否 | true |
| mode.snapshot | 此參數決定時序數據的捕獲方式,影響inclusion中的data數據。提供兩種模式:true:靜態數據捕獲。啟動 pipe 時,會進行一次性的數據快照捕獲。當快照數據被完全消費后,pipe 將自動終止(DROP PIPE SQL 會自動執行)。false:動態數據捕獲。除了在 pipe 啟動時捕獲快照數據外,還會持續捕獲后續的數據變更。pipe 將持續運行以處理動態數據流。 | Boolean: true / false | 否 | false |
| path | 當用戶連接指定的sql_dialect為tree時可以指定。對于升級上來的用戶pipe,默認sql_dialect為tree。此參數決定時序數據的捕獲范圍,影響 inclusion中的data數據,以及部分序列相關的元數據。當數據的樹模型路徑能夠被path匹配時,數據會被篩選出來進入流處理pipe。 | String:IoTDB標準的樹路徑模式,可以帶通配符 | 選填 | root.** |
| start-time | 同步所有數據的開始 event time,包含 start-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | 選填 | Long.MIN_VALUE |
| end-time | 同步所有數據的結束 event time,包含 end-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | 選填 | Long.MAX_VALUE |
| forwarding-pipe-requests | 是否轉發由其他 Pipe (通常是數據同步)寫入的數據 | Boolean: true, false | 選填 | true |
| mods | 同 mods.enable,是否發送 tsfile 的 mods 文件 | Boolean: true / false | 選填 | false |
?? 說明:數據抽取模式 mode.streaming 取值 true 和 false 的差異
- true(推薦):該取值下,任務將對數據進行實時處理、發送,其特點是高時效、低吞吐
- false:該取值下,任務將對數據進行批量(按底層數據文件)處理、發送,其特點是低時效、高吞吐
5.2 sink 參數
iotdb-thrift-sink
| key | value | value 取值范圍 | 是否必填 | 默認取值 |
|---|---|---|---|---|
| sink | iotdb-thrift-sink 或 iotdb-thrift-async-sink | String: iotdb-thrift-sink 或 iotdb-thrift-async-sink | 必填 | - |
| node-urls | 目標端 IoTDB 任意多個 DataNode 節點的數據服務端口的 url(請注意同步任務不支持向自身服務進行轉發) | String. 例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - |
| user/username | 連接接收端使用的用戶名,同步要求該用戶具備相應的操作權限 | String | 選填 | root |
| password | 連接接收端使用的用戶名對應的密碼,同步要求該用戶具備相應的操作權限 | String | 選填 | TimechoDB@2021, V2.0.6.x 之前為root |
| batch.enable | 是否開啟日志攢批發送模式,用于提高傳輸吞吐,降低 IOPS | Boolean: true, false | 選填 | true |
| batch.max-delay-seconds | 在開啟日志攢批發送模式時生效,表示一批數據在發送前的最長等待時間(單位:s) | Integer | 選填 | 1 |
| batch.max-delay-ms | 在開啟日志攢批發送模式時生效,表示一批數據在發送前的最長等待時間(單位:ms)(V2.0.5及以后版本支持) | Integer | 選填 | 1 |
| batch.size-bytes | 在開啟日志攢批發送模式時生效,表示一批數據最大的攢批大小(單位:byte) | Long | 選填 | 1610241024 |
| compressor | 所選取的 rpc 壓縮算法,可配置多個,對每個請求順序采用 | String: snappy / gzip / lz4 / zstd / lzma2 | 選填 | "" |
| compressor.zstd.level | 所選取的 rpc 壓縮算法為 zstd 時,可使用該參數額外配置 zstd 算法的壓縮等級 | Int: [-131072, 22] | 選填 | 3 |
| rate-limit-bytes-per-second | 每秒最大允許傳輸的 byte 數,計算壓縮后的 byte(如壓縮),若小于 0 則不限制 | Double: [Double.MIN_VALUE, Double.MAX_VALUE] | 選填 | -1 |
| load-tsfile-strategy | 文件同步數據時,接收端請求返回發送端前,是否等待接收端本地的 load tsfile 執行結果返回。 sync:等待本地的 load tsfile 執行結果返回; async:不等待本地的 load tsfile 執行結果返回。 | String: sync / async | 選填 | sync |
| format | 數據傳輸的payload格式, 可選項包括: - hybrid: 取決于 processor 傳遞過來的格式(tsfile或tablet),sink不做任何轉換。 - tsfile:強制轉換成tsfile發送,可用于數據文件備份等場景。 - tablet:強制轉換成tsfile發送,可用于發送端/接收端數據類型不完全兼容時的數據同步(以減少報錯)。 | String: hybrid / tsfile / tablet | 選填 | hybrid |
iotdb-air-gap-sink
| key | value | value 取值范圍 | 是否必填 | 默認取值 |
|---|---|---|---|---|
| sink | iotdb-air-gap-sink | String: iotdb-air-gap-sink | 必填 | - |
| node-urls | 目標端 IoTDB 任意多個 DataNode 節點的數據服務端口的 url | String. 例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - |
| user/username | 連接接收端使用的用戶名,同步要求該用戶具備相應的操作權限 | String | 選填 | root |
| password | 連接接收端使用的用戶名對應的密碼,同步要求該用戶具備相應的操作權限 | String | 選填 | TimechoDB@2021, V2.0.6.x 之前為root |
| compressor | 所選取的 rpc 壓縮算法,可配置多個,對每個請求順序采用 | String: snappy / gzip / lz4 / zstd / lzma2 | 選填 | "" |
| compressor.zstd.level | 所選取的 rpc 壓縮算法為 zstd 時,可使用該參數額外配置 zstd 算法的壓縮等級 | Int: [-131072, 22] | 選填 | 3 |
| rate-limit-bytes-per-second | 每秒最大允許傳輸的 byte 數,計算壓縮后的 byte(如壓縮),若小于 0 則不限制 | Double: [Double.MIN_VALUE, Double.MAX_VALUE] | 選填 | -1 |
| load-tsfile-strategy | 文件同步數據時,接收端請求返回發送端前,是否等待接收端本地的 load tsfile 執行結果返回。 sync:等待本地的 load tsfile 執行結果返回; async:不等待本地的 load tsfile 執行結果返回。 | String: sync / async | 選填 | sync |
| air-gap.handshake-timeout-ms | 發送端與接收端在首次嘗試建立連接時握手請求的超時時長,單位:毫秒 | Integer | 選填 | 5000 |
iotdb-thrift-ssl-sink
| key | value | value 取值范圍 | 是否必填 | 默認取值 |
|---|---|---|---|---|
| sink | iotdb-thrift-ssl-sink | String: iotdb-thrift-ssl-sink | 必填 | - |
| node-urls | 目標端 IoTDB 任意多個 DataNode 節點的數據服務端口的 url(請注意同步任務不支持向自身服務進行轉發) | String. 例:'127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - |
| user/username | 連接接收端使用的用戶名,同步要求該用戶具備相應的操作權限 | String | 選填 | root |
| password | 連接接收端使用的用戶名對應的密碼,同步要求該用戶具備相應的操作權限 | String | 選填 | TimechoDB@2021, V2.0.6.x 之前為root |
| batch.enable | 是否開啟日志攢批發送模式,用于提高傳輸吞吐,降低 IOPS | Boolean: true, false | 選填 | true |
| batch.max-delay-seconds | 在開啟日志攢批發送模式時生效,表示一批數據在發送前的最長等待時間(單位:s) | Integer | 選填 | 1 |
| batch.max-delay-ms | 在開啟日志攢批發送模式時生效,表示一批數據在發送前的最長等待時間(單位:ms)(V2.0.5及以后版本支持) | Integer | 選填 | 1 |
| batch.size-bytes | 在開啟日志攢批發送模式時生效,表示一批數據最大的攢批大小(單位:byte) | Long | 選填 | 1610241024 |
| compressor | 所選取的 rpc 壓縮算法,可配置多個,對每個請求順序采用 | String: snappy / gzip / lz4 / zstd / lzma2 | 選填 | "" |
| compressor.zstd.level | 所選取的 rpc 壓縮算法為 zstd 時,可使用該參數額外配置 zstd 算法的壓縮等級 | Int: [-131072, 22] | 選填 | 3 |
| rate-limit-bytes-per-second | 每秒最大允許傳輸的 byte 數,計算壓縮后的 byte(如壓縮),若小于 0 則不限制 | Double: [Double.MIN_VALUE, Double.MAX_VALUE] | 選填 | -1 |
| load-tsfile-strategy | 文件同步數據時,接收端請求返回發送端前,是否等待接收端本地的 load tsfile 執行結果返回。 sync:等待本地的 load tsfile 執行結果返回; async:不等待本地的 load tsfile 執行結果返回。 | String: sync / async | 選填 | sync |
| ssl.trust-store-path | 連接目標端 DataNode 所需的 trust store 證書路徑 | String.Example: '127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669', '127.0.0.1:6667' | 必填 | - |
| ssl.trust-store-pwd | 連接目標端 DataNode 所需的 trust store 證書密碼 | Integer | 必填 | - |
| format | 數據傳輸的payload格式, 可選項包括: - hybrid: 取決于 processor 傳遞過來的格式(tsfile或tablet),sink不做任何轉換。 - tsfile:強制轉換成tsfile發送,可用于數據文件備份等場景。 - tablet:強制轉換成tsfile發送,可用于發送端/接收端數據類型不完全兼容時的數據同步(以減少報錯)。 | String: hybrid / tsfile / tablet | 選填 | hybrid |