Stream Processing Framework
The IoTDB stream processing framework allows users to implement custom stream processing logic: listening for and capturing changes in the storage engine, transforming the changed data, and pushing the transformed data to external systems.
A stream processing task (Pipe) consists of three subtasks:
- Extraction (Source)
- Processing (Process)
- Sending (Sink)
The stream processing framework allows users to write the logic of these three subtasks in Java and process data in a way similar to UDFs.
Within a Pipe, the three subtasks are carried out by three kinds of plugins, and data passes through them in order:
the Pipe Source extracts data, the Pipe Processor processes it, and the Pipe Sink sends it, ultimately delivering the data to an external system.
The model of a Pipe task is as follows:

Describing a stream processing task is essentially describing the attributes of its Pipe Source, Pipe Processor, and Pipe Sink plugins.
Users can declaratively configure the attributes of the three subtasks with SQL statements; by combining different attributes, flexible data ETL pipelines can be built.
With the stream processing framework, complete data links can be set up to serve scenarios such as edge-to-cloud synchronization, remote disaster recovery, and separating read/write workloads across databases.
Custom Stream Processing Plugin Development
Programming Dependencies
We recommend building the project with Maven and adding the following dependency to pom.xml. Make sure the dependency version matches your IoTDB server version.
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>pipe-api</artifactId>
<version>1.3.1</version>
<scope>provided</scope>
</dependency>
Event-Driven Programming Model
The user programming interfaces of the stream processing plugins follow the common design philosophy of the event-driven programming model. An event (Event) is the data abstraction of the programming interfaces, and the interfaces are decoupled from the concrete execution: users only need to describe how the system should handle an event (i.e., the data) once it arrives.
In these interfaces, an event is the abstraction of a database write operation. Events are captured by the single-node stream processing engine and, following the three stages of stream processing, passed in turn to the PipeSource, PipeProcessor, and PipeSink plugins, triggering the user-defined logic in each of them.
To balance low processing latency under light loads on the edge with high processing throughput under heavy loads, the stream processing engine dynamically chooses between operation logs and data files as its processing objects. The user programming interfaces therefore require handlers for two kinds of events: the operation log write event TabletInsertionEvent and the data file write event TsFileInsertionEvent.
Operation Log Write Event (TabletInsertionEvent)
The operation log write event (TabletInsertionEvent) is a high-level data abstraction of a user write request. By providing a unified set of operations, it gives users the ability to manipulate the underlying data of write requests.
The underlying storage structure behind an operation log write event differs across database deployments: in a standalone deployment it wraps a write-ahead log (WAL) entry, while in a distributed deployment it wraps an operation log entry of a single node's consensus protocol.
The request structures behind write operations also differ across the database's write interfaces. IoTDB provides many write interfaces, such as InsertRecord, InsertRecords, InsertTablet, and InsertTablets; each kind of write request uses a different serialization format and produces different binary entries.
The operation log write event therefore gives users a unified view of data operations. It hides the implementation differences of the underlying data structures, greatly lowering the programming threshold and improving usability.
/** TabletInsertionEvent is used to define the event of data insertion. */
public interface TabletInsertionEvent extends Event {
/**
* The consumer processes the data row by row and collects the results by RowCollector.
*
* @return {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the
* results collected by the RowCollector
*/
Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer);
/**
* The consumer processes the Tablet directly and collects the results by RowCollector.
*
* @return {@code Iterable<TabletInsertionEvent>} a list of new TabletInsertionEvent contains the
* results collected by the RowCollector
*/
Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer);
}
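For example, a plugin that wants to inspect data row by row can use processRowByRow as sketched below. This is a minimal sketch rather than an official example; it assumes RowCollector exposes a collectRow(Row) method for emitting result rows.
// A minimal sketch: forward every row unchanged; real logic would filter or transform rows here.
// Assumption: RowCollector#collectRow(Row) is the method for emitting a result row.
Iterable<TabletInsertionEvent> results =
    tabletInsertionEvent.processRowByRow(
        (row, rowCollector) -> {
          try {
            rowCollector.collectRow(row);
          } catch (Exception e) {
            // BiConsumer cannot throw checked exceptions, so wrap them
            throw new RuntimeException(e);
          }
        });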
Data File Write Event (TsFileInsertionEvent)
The data file write event (TsFileInsertionEvent) is a high-level abstraction of the database's file flush operation; it is a collection of several operation log write events (TabletInsertionEvent).
IoTDB's storage engine is LSM-based. When data is written, the write operation is first persisted to a log-structured file while the written data is kept in memory. When memory usage reaches its limit, a flush is triggered: the in-memory data is converted into a database file, and the previously written operation logs are deleted. During this conversion the data goes through two rounds of compression, encoding compression and general-purpose compression, so the data in a database file occupies less space than the raw data in memory.
Under extreme network conditions, transferring data files directly is more economical than transferring individual write operations: it uses less network bandwidth and achieves higher transfer speed. Of course, there is no free lunch: computing over data in a file incurs extra file I/O compared with computing over data in memory. But precisely because on-disk data files and in-memory write operations each have their strengths and weaknesses, the system has the opportunity to make dynamic trade-offs between them, and it is based on this observation that the data file write event was introduced into the plugin event model.
In summary, a data file write event appears in a plugin's event stream in the following two cases:
(1) Historical data extraction: before a stream processing task starts, all previously flushed data exists in the form of TsFiles. When historical data is collected after the task starts, it is abstracted as TsFileInsertionEvent;
(2) Real-time data extraction: while a task is running, if the speed of processing operation log write events falls sufficiently behind the write request rate, the operation log write events that cannot be processed in time are persisted to disk in the form of TsFiles. When the stream processing engine later extracts this data, it is abstracted as TsFileInsertionEvent.
/**
* TsFileInsertionEvent is used to define the event of writing TsFile. Event data stores in disks,
* which is compressed and encoded, and requires IO cost for computational processing.
*/
public interface TsFileInsertionEvent extends Event {
/**
* The method is used to convert the TsFileInsertionEvent into several TabletInsertionEvents.
*
* @return {@code Iterable<TabletInsertionEvent>} the list of TabletInsertionEvent
*/
Iterable<TabletInsertionEvent> toTabletInsertionEvents();
}
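As a short usage sketch (illustrative, derived directly from the interface above), a plugin that only understands row-level events can expand a data file write event like this:
// Expand a TsFileInsertionEvent into its row-level TabletInsertionEvents.
for (final TabletInsertionEvent tabletEvent : tsFileInsertionEvent.toTabletInsertionEvents()) {
  // handle each tabletEvent with processRowByRow(...) or processTablet(...)
}
This is the same pattern used by the default implementations of PipeProcessor#process and PipeSink#transfer shown later in this section.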
Custom Stream Processing Plugin Programming Interfaces
Based on the custom stream processing plugin programming interfaces, users can easily write data extraction plugins, data processing plugins, and data sending plugins, allowing stream processing to adapt flexibly to all kinds of industrial scenarios.
Data Extraction Plugin Interface
Data extraction is the first of the three stages of stream processing, from data extraction to data sending. The data extraction plugin (PipeSource) is the bridge between the stream processing engine and the storage engine: it captures various data write events by listening to the behavior of the storage engine.
/**
* PipeSource
*
* <p>PipeSource is responsible for capturing events from sources.
*
* <p>Various data sources can be supported by implementing different PipeSource classes.
*
* <p>The lifecycle of a PipeSource is as follows:
*
* <ul>
* <li>When a collaboration task is created, the KV pairs of `WITH SOURCE` clause in SQL are
* parsed and the validation method {@link PipeSource#validate(PipeParameterValidator)} will
* be called to validate the parameters.
* <li>Before the collaboration task starts, the method {@link
* PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} will be called to
* config the runtime behavior of the PipeSource.
* <li>Then the method {@link PipeSource#start()} will be called to start the PipeSource.
* <li>While the collaboration task is in progress, the method {@link PipeSource#supply()} will be
* called to capture events from sources and then the events will be passed to the
* PipeProcessor.
* <li>The method {@link PipeSource#close()} will be called when the collaboration task is
* cancelled (the `DROP PIPE` command is executed).
* </ul>
*/
public interface PipeSource extends PipePlugin {
/**
* This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
* PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} is called.
*
* @param validator the validator used to validate {@link PipeParameters}
* @throws Exception if any parameter is not valid
*/
void validate(PipeParameterValidator validator) throws Exception;
/**
* This method is mainly used to customize PipeSource. In this method, the user can do the
* following things:
*
* <ul>
* <li>Use PipeParameters to parse key-value pair attributes entered by the user.
* <li>Set the running configurations in PipeSourceRuntimeConfiguration.
* </ul>
*
* <p>This method is called after the method {@link PipeSource#validate(PipeParameterValidator)}
* is called.
*
* @param parameters used to parse the input parameters entered by the user
* @param configuration used to set the required properties of the running PipeSource
* @throws Exception the user can throw errors if necessary
*/
void customize(PipeParameters parameters, PipeSourceRuntimeConfiguration configuration)
throws Exception;
/**
* Start the Source. After this method is called, events should be ready to be supplied by
* {@link PipeSource#supply()}. This method is called after {@link
* PipeSource#customize(PipeParameters, PipeSourceRuntimeConfiguration)} is called.
*
* @throws Exception the user can throw errors if necessary
*/
void start() throws Exception;
/**
* Supply single event from the Source and the caller will send the event to the processor.
* This method is called after {@link PipeSource#start()} is called.
*
* @return the event to be supplied. the event may be null if the Source has no more events at
* the moment, but the Source is still running for more events.
* @throws Exception the user can throw errors if necessary
*/
Event supply() throws Exception;
}
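To make the lifecycle concrete, here is a hypothetical PipeSource skeleton (the class name and behavior are illustrative only, not a built-in plugin). The close() method comes from the parent PipePlugin interface and, as the javadoc above notes, is called when DROP PIPE is executed:
public class ExampleSource implements PipeSource {

  @Override
  public void validate(final PipeParameterValidator validator) throws Exception {
    // validate the WITH SOURCE parameters here; this sketch accepts anything
  }

  @Override
  public void customize(
      final PipeParameters parameters, final PipeSourceRuntimeConfiguration configuration)
      throws Exception {
    // parse user-supplied key-value attributes and set runtime configurations here
  }

  @Override
  public void start() throws Exception {
    // open resources; after this returns, supply() may be called at any time
  }

  @Override
  public Event supply() throws Exception {
    // return a captured event, or null if no event is available right now
    return null;
  }

  @Override
  public void close() throws Exception {
    // release resources when the pipe is dropped
  }
}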
Data Processing Plugin Interface
Data processing is the second of the three stages of stream processing, from data extraction to data sending. The data processing plugin (PipeProcessor) is mainly used to filter and transform the various events captured by the data extraction plugin (PipeSource).
/**
* PipeProcessor
*
* <p>PipeProcessor is used to filter and transform the Event formed by the PipeSource.
*
* <p>The lifecycle of a PipeProcessor is as follows:
*
* <ul>
* <li>When a collaboration task is created, the KV pairs of `WITH PROCESSOR` clause in SQL are
* parsed and the validation method {@link PipeProcessor#validate(PipeParameterValidator)}
* will be called to validate the parameters.
* <li>Before the collaboration task starts, the method {@link
* PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} will be called
* to config the runtime behavior of the PipeProcessor.
* <li>While the collaboration task is in progress:
* <ul>
* <li>PipeSource captures the events and wraps them into three types of Event instances.
* <li>PipeProcessor processes the event and then passes them to the PipeSink. The
* following 3 methods will be called: {@link
* PipeProcessor#process(TabletInsertionEvent, EventCollector)}, {@link
* PipeProcessor#process(TsFileInsertionEvent, EventCollector)} and {@link
* PipeProcessor#process(Event, EventCollector)}.
* <li>PipeSink serializes the events into binaries and send them to sinks.
* </ul>
* <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
* PipeProcessor#close() } method will be called.
* </ul>
*/
public interface PipeProcessor extends PipePlugin {
/**
* This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
* PipeProcessor#customize(PipeParameters, PipeProcessorRuntimeConfiguration)} is called.
*
* @param validator the validator used to validate {@link PipeParameters}
* @throws Exception if any parameter is not valid
*/
void validate(PipeParameterValidator validator) throws Exception;
/**
* This method is mainly used to customize PipeProcessor. In this method, the user can do the
* following things:
*
* <ul>
* <li>Use PipeParameters to parse key-value pair attributes entered by the user.
* <li>Set the running configurations in PipeProcessorRuntimeConfiguration.
* </ul>
*
* <p>This method is called after the method {@link
* PipeProcessor#validate(PipeParameterValidator)} is called and before the beginning of the
* events processing.
*
* @param parameters used to parse the input parameters entered by the user
* @param configuration used to set the required properties of the running PipeProcessor
* @throws Exception the user can throw errors if necessary
*/
void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration)
throws Exception;
/**
* This method is called to process the TabletInsertionEvent.
*
* @param tabletInsertionEvent TabletInsertionEvent to be processed
* @param eventCollector used to collect result events after processing
* @throws Exception the user can throw errors if necessary
*/
void process(TabletInsertionEvent tabletInsertionEvent, EventCollector eventCollector)
throws Exception;
/**
* This method is called to process the TsFileInsertionEvent.
*
* @param tsFileInsertionEvent TsFileInsertionEvent to be processed
* @param eventCollector used to collect result events after processing
* @throws Exception the user can throw errors if necessary
*/
default void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector eventCollector)
throws Exception {
for (final TabletInsertionEvent tabletInsertionEvent :
tsFileInsertionEvent.toTabletInsertionEvents()) {
process(tabletInsertionEvent, eventCollector);
}
}
/**
* This method is called to process the Event.
*
* @param event Event to be processed
* @param eventCollector used to collect result events after processing
* @throws Exception the user can throw errors if necessary
*/
void process(Event event, EventCollector eventCollector) throws Exception;
}
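Correspondingly, a hypothetical PipeProcessor skeleton that forwards everything unchanged might look as follows (illustrative only; it assumes EventCollector exposes a collect(Event) method). Note that the TsFileInsertionEvent overload need not be overridden thanks to its default implementation:
public class ExampleProcessor implements PipeProcessor {

  @Override
  public void validate(final PipeParameterValidator validator) throws Exception {
    // validate the WITH PROCESSOR parameters here
  }

  @Override
  public void customize(
      final PipeParameters parameters, final PipeProcessorRuntimeConfiguration configuration)
      throws Exception {
    // parse user-supplied key-value attributes here
  }

  @Override
  public void process(
      final TabletInsertionEvent tabletInsertionEvent, final EventCollector eventCollector)
      throws Exception {
    // filtering or transformation would happen here; this sketch passes the event through
    eventCollector.collect(tabletInsertionEvent);
  }

  @Override
  public void process(final Event event, final EventCollector eventCollector) throws Exception {
    eventCollector.collect(event);
  }

  @Override
  public void close() throws Exception {
    // release resources when the pipe is dropped
  }
}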
Data Sending Plugin Interface
Data sending is the third of the three stages of stream processing, from data extraction to data sending. The data sending plugin (PipeSink) is mainly used to send the various events processed by the data processing plugin (PipeProcessor). It serves as the network layer of the stream processing framework, and its interface should allow access to multiple real-time communication protocols and connectors.
/**
* PipeSink
*
* <p>PipeSink is responsible for sending events to sinks.
*
* <p>Various network protocols can be supported by implementing different PipeSink classes.
*
* <p>The lifecycle of a PipeSink is as follows:
*
* <ul>
* <li>When a collaboration task is created, the KV pairs of `WITH SINK` clause in SQL are
* parsed and the validation method {@link PipeSink#validate(PipeParameterValidator)} will be
* called to validate the parameters.
* <li>Before the collaboration task starts, the method {@link PipeSink#customize(PipeParameters,
* PipeSinkRuntimeConfiguration)} will be called to config the runtime behavior of the
* PipeSink and the method {@link PipeSink#handshake()} will be called to create a connection
* with sink.
* <li>While the collaboration task is in progress:
* <ul>
* <li>PipeSource captures the events and wraps them into three types of Event instances.
* <li>PipeProcessor processes the event and then passes them to the PipeSink.
* <li>PipeSink serializes the events into binaries and send them to sinks. The following 3
* methods will be called: {@link PipeSink#transfer(TabletInsertionEvent)}, {@link
* PipeSink#transfer(TsFileInsertionEvent)} and {@link PipeSink#transfer(Event)}.
* </ul>
* <li>When the collaboration task is cancelled (the `DROP PIPE` command is executed), the {@link
* PipeSink#close() } method will be called.
* </ul>
*
* <p>In addition, the method {@link PipeSink#heartbeat()} will be called periodically to check
* whether the connection with sink is still alive. The method {@link PipeSink#handshake()} will be
* called to create a new connection with the sink when the method {@link PipeSink#heartbeat()}
* throws exceptions.
*/
public interface PipeSink extends PipePlugin {
/**
* This method is mainly used to validate {@link PipeParameters} and it is executed before {@link
* PipeSink#customize(PipeParameters, PipeSinkRuntimeConfiguration)} is called.
*
* @param validator the validator used to validate {@link PipeParameters}
* @throws Exception if any parameter is not valid
*/
void validate(PipeParameterValidator validator) throws Exception;
/**
* This method is mainly used to customize PipeSink. In this method, the user can do the following
* things:
*
* <ul>
* <li>Use PipeParameters to parse key-value pair attributes entered by the user.
* <li>Set the running configurations in PipeSinkRuntimeConfiguration.
* </ul>
*
* <p>This method is called after the method {@link PipeSink#validate(PipeParameterValidator)} is
* called and before the method {@link PipeSink#handshake()} is called.
*
* @param parameters used to parse the input parameters entered by the user
* @param configuration used to set the required properties of the running PipeSink
* @throws Exception the user can throw errors if necessary
*/
void customize(PipeParameters parameters, PipeSinkRuntimeConfiguration configuration)
throws Exception;
/**
* This method is used to create a connection with sink. This method will be called after the
* method {@link PipeSink#customize(PipeParameters, PipeSinkRuntimeConfiguration)} is called or
* will be called when the method {@link PipeSink#heartbeat()} throws exceptions.
*
* @throws Exception if the connection is failed to be created
*/
void handshake() throws Exception;
/**
* This method will be called periodically to check whether the connection with sink is still
* alive.
*
* @throws Exception if the connection dies
*/
void heartbeat() throws Exception;
/**
* This method is used to transfer the TabletInsertionEvent.
*
* @param tabletInsertionEvent TabletInsertionEvent to be transferred
* @throws PipeConnectionException if the connection is broken
* @throws Exception the user can throw errors if necessary
*/
void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception;
/**
* This method is used to transfer the TsFileInsertionEvent.
*
* @param tsFileInsertionEvent TsFileInsertionEvent to be transferred
* @throws PipeConnectionException if the connection is broken
* @throws Exception the user can throw errors if necessary
*/
default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
try {
for (final TabletInsertionEvent tabletInsertionEvent :
tsFileInsertionEvent.toTabletInsertionEvents()) {
transfer(tabletInsertionEvent);
}
} finally {
tsFileInsertionEvent.close();
}
}
/**
* This method is used to transfer the generic events, including HeartbeatEvent.
*
* @param event Event to be transferred
* @throws PipeConnectionException if the connection is broken
* @throws Exception the user can throw errors if necessary
*/
void transfer(Event event) throws Exception;
}
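Finally, a hypothetical PipeSink skeleton (illustrative only) that "sends" events by printing them, which is enough to show where each lifecycle method fits. The TsFileInsertionEvent overload again relies on its default implementation, and close() is inherited from PipePlugin:
public class ExampleSink implements PipeSink {

  @Override
  public void validate(final PipeParameterValidator validator) throws Exception {
    // validate the WITH SINK parameters here
  }

  @Override
  public void customize(
      final PipeParameters parameters, final PipeSinkRuntimeConfiguration configuration)
      throws Exception {
    // parse user-supplied key-value attributes here
  }

  @Override
  public void handshake() throws Exception {
    // establish the connection with the external system here
  }

  @Override
  public void heartbeat() throws Exception {
    // throw an exception here if the connection has died; handshake() will then be called again
  }

  @Override
  public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
    // a real sink would serialize and send the event; this sketch just prints it
    System.out.println(tabletInsertionEvent);
  }

  @Override
  public void transfer(final Event event) throws Exception {
    System.out.println(event);
  }

  @Override
  public void close() throws Exception {
    // close the connection when the pipe is dropped
  }
}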
Custom Stream Processing Plugin Management
To keep user-defined plugins flexible and easy to use in production, the system also needs the ability to manage plugins dynamically and uniformly.
The plugin management statements introduced in this section provide the entry point for such management.
Load Plugin Statement
In IoTDB, to dynamically load a user-defined plugin into the system, first implement a concrete plugin class based on PipeSource, PipeProcessor, or PipeSink,
then compile and package the plugin class into a JAR file, and finally load the plugin into IoTDB with the plugin loading statement.
The syntax of the statement is as follows.
CREATE PIPEPLUGIN [IF NOT EXISTS] <alias>
AS <full class name>
USING <URI of the JAR package>
IF NOT EXISTS semantics: used in creation operations to ensure that the creation command executes only when the specified Pipe Plugin does not exist, preventing errors caused by attempting to create an already existing Pipe Plugin.
Example: suppose a user has implemented a data processing plugin with the full class name edu.tsinghua.iotdb.pipe.ExampleProcessor, packaged it into pipe-plugin.jar, and wants to use this plugin in the stream processing engine under the name example. The plugin package can be made available in either of two ways: uploading it to a URI server, or placing it in a local directory of the cluster; either method works.
[Method 1] Upload to a URI server
Preparation: to register the plugin this way, upload the JAR package to a URI server in advance and make sure the IoTDB instance executing the registration statement can access that server, e.g. https://example.com:8080/iotdb/pipe-plugin.jar.
Creation statement:
CREATE PIPEPLUGIN IF NOT EXISTS example
AS 'edu.tsinghua.iotdb.pipe.ExampleProcessor'
USING URI <https://example.com:8080/iotdb/pipe-plugin.jar>
[Method 2] Place in a local directory of the cluster
Preparation: to register the plugin this way, place the JAR package in advance under any path on the machine hosting a DataNode. We recommend putting it in the ext/pipe directory under the IoTDB installation path (this directory ships with the distribution; no need to create it), e.g. iotdb-1.x.x-bin/ext/pipe/pipe-plugin.jar. (Note: in a cluster, the JAR package must be placed under that path on every machine hosting a DataNode.)
Creation statement:
CREATE PIPEPLUGIN IF NOT EXISTS example
AS 'edu.tsinghua.iotdb.pipe.ExampleProcessor'
USING URI <file:/IoTDB installation path/iotdb-1.x.x-bin/ext/pipe/pipe-plugin.jar>
Drop Plugin Statement
When a plugin is no longer needed and should be unloaded from the system, use the following drop statement.
DROP PIPEPLUGIN [IF EXISTS] <alias>
IF EXISTS semantics: used in drop operations to ensure that the drop command executes only when the specified Pipe Plugin exists, preventing errors caused by attempting to drop a nonexistent Pipe Plugin.
Show Plugins Statement
Users can also view the plugins in the system on demand, using the following statement.
SHOW PIPEPLUGINS
System Preset Stream Processing Plugins
Preset source plugin
iotdb-source
Function: extracts historical or real-time data inside IoTDB into a pipe.
| key | value | value range | required or optional with default |
|---|---|---|---|
| source | iotdb-source | String: iotdb-source | required |
| source.pattern | path prefix used to filter time series | String: any time series prefix | optional: root |
| source.history.start-time | start event time of the extracted historical data, inclusive of start-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MIN_VALUE |
| source.history.end-time | end event time of the extracted historical data, inclusive of end-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MAX_VALUE |
| start-time(V1.3.1+) | start event time for synchronizing all data, inclusive of start-time; disables "history.start-time" and "history.end-time" if configured | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MIN_VALUE |
| end-time(V1.3.1+) | end event time for synchronizing all data, inclusive of end-time; disables "history.start-time" and "history.end-time" if configured | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | optional: Long.MAX_VALUE |
| source.realtime.mode | extraction mode for real-time data | String: hybrid, log, file | optional: hybrid |
| source.forwarding-pipe-requests | whether to extract data written by another pipe (usually data synchronization) | Boolean: true, false | optional: true |
Notes on the source.pattern parameter:
The pattern must use backquotes to escape illegal characters or illegal path nodes; for example, to filter root.`a@b` or root.`123`, set the pattern to root.`a@b` or root.`123` (for details, see the documentation on when to use single quotes, double quotes, and backquotes)
In the underlying implementation, extraction is most efficient when the pattern is root (the default value); any other pattern reduces performance
The path prefix does not need to form a complete path. For example, when creating a pipe with the parameter 'source.pattern'='root.aligned.1':
- root.aligned.1TS
- root.aligned.1TS.`1`
- root.aligned.100T
will have their data extracted, while
- root.aligned.`1`
- root.aligned.`123`
will not.
Notes on the start-time and end-time parameters of source.history:
- start-time and end-time should be in ISO format, e.g. 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00
From production to persistence in IoTDB, each piece of data involves two key notions of time:
- event time: the time at which the data was actually produced (or the generation time assigned to it by the producing system; it is the time field of the data point), also called the event time.
- arrival time: the time at which the data arrives at the IoTDB system.
What we usually call out-of-order data is data whose event time lags far behind the current system time (or the largest event time already persisted) when it arrives. On the other hand, whether data is out of order or not, as long as it newly arrives, its arrival time increases monotonically with the order in which data reaches IoTDB.
The work of iotdb-source can be split into two stages:
- Historical data extraction: all data whose arrival time is earlier than the system time at pipe creation is called historical data
- Real-time data extraction: all data whose arrival time is at or after the system time at pipe creation is called real-time data
The historical data transfer stage and the real-time data transfer stage run serially: the real-time stage starts only after the historical stage completes.
source.realtime.mode: the extraction mode for real-time data
- log: in this mode, the task uses only operation logs to process and send data
- file: in this mode, the task uses only data files to process and send data
- hybrid: this mode accounts for the fact that sending entries one by one from operation logs gives low latency but low throughput, while sending data files in batches gives high throughput but high latency. It automatically switches to the extraction method suited to the current write load: it starts with log-based extraction to keep sending latency low, switches to file-based extraction when a backlog builds up to keep sending throughput high, and switches back to log-based extraction once the backlog is cleared. This avoids the difficulty a single extraction algorithm has in balancing sending latency and throughput.
source.forwarding-pipe-requests: whether to forward data that was itself transferred in by another pipe
- To build A -> B -> C data synchronization with pipes, the B -> C pipe must set this parameter to true; only then will the data that A writes into B via its pipe be correctly forwarded to C
- To build bidirectional A <-> B data synchronization (active-active) with pipes, both the A -> B and B -> A pipes must set this parameter to false; otherwise data will be forwarded endlessly in a loop between the clusters
Preset processor plugin
do-nothing-processor
Function: does nothing with the events passed in by the source.
| key | value | value range | required or optional with default |
|---|---|---|---|
| processor | do-nothing-processor | String: do-nothing-processor | required |
Preset sink plugin
do-nothing-sink
Function: does nothing with the events passed in by the processor.
| key | value | value range | required or optional with default |
|---|---|---|---|
| sink | do-nothing-sink | String: do-nothing-sink | required |
Stream Processing Task Management
Creating a Stream Processing Task
Use the CREATE PIPE statement to create a stream processing task. Taking the creation of a data synchronization task as an example, the SQL statement is as follows:
CREATE PIPE <PipeId> -- PipeId is the name that uniquely identifies the stream processing task
WITH SOURCE (
-- the default IoTDB data extraction plugin
'source' = 'iotdb-source',
-- path prefix: only data matching this prefix will be extracted for subsequent processing and sending
'source.pattern' = 'root.timecho',
-- whether to extract historical data
'source.history.enable' = 'true',
-- time range of the extracted historical data: the earliest time
'source.history.start-time' = '2011.12.03T10:15:30+01:00',
-- time range of the extracted historical data: the latest time
'source.history.end-time' = '2022.12.03T10:15:30+01:00',
-- whether to extract real-time data
'source.realtime.enable' = 'true',
-- extraction mode for real-time data
'source.realtime.mode' = 'hybrid',
)
WITH PROCESSOR (
-- the default data processing plugin, which does nothing
'processor' = 'do-nothing-processor',
)
WITH SINK (
-- IoTDB data sending plugin; the target is another IoTDB
'sink' = 'iotdb-thrift-sink',
-- data service IP of one of the DataNodes in the target IoTDB
'sink.ip' = '127.0.0.1',
-- data service port of one of the DataNodes in the target IoTDB
'sink.port' = '6667',
)
When creating a stream processing task, you need to configure the PipeId and the parameters of the three plugin parts:
| Item | Description | Required | Default implementation | Notes on default implementation | Custom implementation allowed |
|---|---|---|---|---|---|
| PipeId | name that globally and uniquely identifies a stream processing task | required | - | - | - |
| source | Pipe Source plugin, responsible for extracting stream data at the bottom of the database | optional | iotdb-source | feeds the database's full historical data and subsequently arriving real-time data into the stream processing task | no |
| processor | Pipe Processor plugin, responsible for processing data | optional | do-nothing-processor | does nothing to the incoming data | yes |
| sink | Pipe Sink plugin, responsible for sending data | required | - | - | yes |
The example uses the iotdb-source, do-nothing-processor, and iotdb-thrift-sink plugins to build the task. IoTDB also ships with other stream processing plugins; see the section "System Preset Stream Processing Plugins".
A minimal CREATE PIPE statement example is as follows:
CREATE PIPE <PipeId> -- PipeId is the name that uniquely identifies the stream processing task
WITH SINK (
-- IoTDB data sending plugin; the target is another IoTDB
'sink' = 'iotdb-thrift-sink',
-- data service IP of one of the DataNodes in the target IoTDB
'sink.ip' = '127.0.0.1',
-- data service port of one of the DataNodes in the target IoTDB
'sink.port' = '6667',
)
Its semantics: synchronize the full historical data of this database instance, together with real-time data arriving afterwards, to the IoTDB instance at 127.0.0.1:6667.
Note:
SOURCE and PROCESSOR are optional; if their configuration is omitted, the system uses the corresponding default implementations
SINK is mandatory and must be configured declaratively in the CREATE PIPE statement
SINKs are self-reusable. If different stream processing tasks declare SINKs with exactly the same KV attributes (the value of every key is identical), the system creates only one SINK instance, so connection resources are reused.
- For example, consider the declarations of the two tasks pipe1 and pipe2 below:
CREATE PIPE pipe1
WITH SINK (
'sink' = 'iotdb-thrift-sink',
'sink.ip' = 'localhost',
'sink.port' = '9999',
)
CREATE PIPE pipe2
WITH SINK (
'sink' = 'iotdb-thrift-sink',
'sink.port' = '9999',
'sink.ip' = 'localhost',
)
- Since their SINK declarations are identical (even though some attributes are declared in a different order), the framework automatically reuses the SINK, and pipe1 and pipe2 end up sharing one SINK instance.
When source is the default iotdb-source and source.forwarding-pipe-requests has its default value true, do not build application scenarios that synchronize data in a cycle (this causes an infinite loop):
- IoTDB A -> IoTDB B -> IoTDB A
- IoTDB A -> IoTDB A
Starting a Stream Processing Task
After the CREATE PIPE statement executes successfully, the task's instances are created, but the task's running state is set to STOPPED, i.e. the task does not process data immediately (V1.3.0). In version 1.3.1 and later, the running state is set to RUNNING immediately after creation.
Use the START PIPE statement to make the task start processing data:
START PIPE <PipeId>
Stopping a Stream Processing Task
Use the STOP PIPE statement to make a stream processing task stop processing data:
STOP PIPE <PipeId>
Dropping a Stream Processing Task
Use the DROP PIPE statement to stop a stream processing task's data processing (when its state is RUNNING) and then delete the entire task:
DROP PIPE <PipeId>
You do not need to STOP a stream processing task before dropping it.
Showing Stream Processing Tasks
Use the SHOW PIPES statement to view all stream processing tasks:
SHOW PIPES
The query result is as follows:
+-----------+-----------------------+-------+----------+-------------+--------+----------------+
| ID| CreationTime | State|PipeSource|PipeProcessor|PipeSink|ExceptionMessage|
+-----------+-----------------------+-------+----------+-------------+--------+----------------+
|iotdb-kafka|2022-03-30T20:58:30.689|RUNNING| ...| ...| ...| {}|
+-----------+-----------------------+-------+----------+-------------+--------+----------------+
|iotdb-iotdb|2022-03-31T12:55:28.129|STOPPED| ...| ...| ...| TException: ...|
+-----------+-----------------------+-------+----------+-------------+--------+----------------+
You can use <PipeId> to view the status of a specific stream processing task:
SHOW PIPE <PipeId>
You can also use a WHERE clause to check whether the Pipe Sink used by a given <PipeId> is being reused:
SHOW PIPES
WHERE SINK USED BY <PipeId>
Stream Processing Task State Transitions
A stream processing pipe goes through multiple states over its lifecycle:
- RUNNING: the pipe is working normally
  - after a pipe is successfully created, its initial state is RUNNING (V1.3.1+)
- STOPPED: the pipe is stopped. When a pipe is in this state, one of the following may apply:
  - after a pipe is successfully created, its initial state is STOPPED (V1.3.0)
  - the user manually paused a normally running pipe, moving its state from RUNNING to STOPPED
  - an unrecoverable error occurred while the pipe was running, automatically moving its state from RUNNING to STOPPED
- DROPPED: the pipe has been permanently deleted
The diagram below shows all states and the transitions between them:

Permission Management
Stream processing tasks
| Permission | Description |
|---|---|
| USE_PIPE | Register a stream processing task. Path-independent. |
| USE_PIPE | Start a stream processing task. Path-independent. |
| USE_PIPE | Stop a stream processing task. Path-independent. |
| USE_PIPE | Drop a stream processing task. Path-independent. |
| USE_PIPE | Query stream processing tasks. Path-independent. |
Stream processing task plugins
| Permission | Description |
|---|---|
| USE_PIPE | Register a stream processing task plugin. Path-independent. |
| USE_PIPE | Drop a stream processing task plugin. Path-independent. |
| USE_PIPE | Query stream processing task plugins. Path-independent. |
Configuration Parameters
In iotdb-system.properties:
V1.3.0+:
####################
### Pipe Configuration
####################
# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# pipe_connector_timeout_ms=900000
# The maximum number of selectors that can be used in the async connector.
# pipe_async_connector_selector_number=1
# The core number of clients that can be used in the async connector.
# pipe_async_connector_core_client_number=8
# The maximum number of clients that can be used in the async connector.
# pipe_async_connector_max_client_number=16
# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# pipe_air_gap_receiver_enabled=false
# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780
V1.3.1+:
# Uncomment the following field to configure the pipe lib directory.
# For Windows platform
# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is
# absolute. Otherwise, it is relative.
# pipe_lib_dir=ext\\pipe
# For Linux platform
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
# The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor.
# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)).
# pipe_subtask_executor_max_thread_num=5
# The connection timeout (in milliseconds) for the thrift client.
# pipe_sink_timeout_ms=900000
# The maximum number of selectors that can be used in the sink.
# Recommend to set this value to less than or equal to pipe_sink_max_client_number.
# pipe_sink_selector_number=4
# The maximum number of clients that can be used in the sink.
# pipe_sink_max_client_number=16
# Whether to enable receiving pipe data through air gap.
# The receiver can only return 0 or 1 in tcp mode to indicate whether the data is received successfully.
# pipe_air_gap_receiver_enabled=false
# The port for the server to receive pipe data through air gap.
# pipe_air_gap_receiver_port=9780