Python 原生接口
Python 原生接口
1. 依賴
在使用 Python 原生接口包前,您需要安裝 thrift (>=0.13) 依賴。
2. 如何使用 (示例)
首先下載包:pip3 install apache-iotdb>=2.0
您可以從這里得到一個使用該包進行數據讀寫的例子:Session Example
關于對齊時間序列讀寫的例子:Aligned Timeseries Session Example
(您需要在文件的頭部添加import iotdb)
或者:
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "TimechoDB@2021" //V2.0.6.x 之前密碼默認值為root
session = Session(ip, port_, username_, password_)
session.open(False)
zone = session.get_time_zone()
session.close()3. 基本接口說明
下面將給出 Session 對應的接口的簡要介紹和對應參數:
3.1 初始化
- 初始化 Session
session = Session(
ip="127.0.0.1",
port="6667",
user="root",
password="TimechoDB@2021", //V2.0.6.x 之前密碼默認值為root
fetch_size=1024,
zone_id="UTC+8",
enable_redirection=True
)- 初始化可連接多節點的 Session
session = Session.init_from_node_urls(
node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
user="root",
password="TimechoDB@2021", //V2.0.6.x 之前密碼默認值為root
fetch_size=1024,
zone_id="UTC+8",
enable_redirection=True
)- 開啟 Session,并決定是否開啟 RPC 壓縮
session.open(enable_rpc_compression=False)注意: 客戶端的 RPC 壓縮開啟狀態需和服務端一致
- 關閉 Session
session.close()3.2 通過SessionPool管理session連接
利用SessionPool管理session,不需要再考慮如何重用session。當session連接到達pool的最大值時,獲取session的請求會被阻塞,可以通過參數設置阻塞等待時間。每次session使用完需要使用putBack方法將session歸還到SessionPool中管理。
創建SessionPool
pool_config = PoolConfig(host=ip,port=port, user_name=username,
password=password, fetch_size=1024,
time_zone="UTC+8", max_retry=3)
max_pool_size = 5
wait_timeout_in_ms = 3000
# 通過配置參數創建連接池
session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)通過分布式節點創建SessionPool
pool_config = PoolConfig(node_urls=node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"], user_name=username,
password=password, fetch_size=1024,
time_zone="UTC+8", max_retry=3)
max_pool_size = 5
wait_timeout_in_ms = 3000通過SessionPool獲取session,使用完手動調用PutBack
session = session_pool.get_session()
session.set_storage_group(STORAGE_GROUP_NAME)
session.create_time_series(
TIMESERIES_PATH, TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
)
# 使用完調用putBack歸還
session_pool.put_back(session)
# 關閉sessionPool時同時關閉管理的session
session_pool.close()3.3 SSL 連接
3.3.1 服務器端配置證書
conf/iotdb-system.properties 配置文件中查找或添加以下配置項:
enable_thrift_ssl=true
key_store_path=/path/to/your/server_keystore.jks
key_store_pwd=your_keystore_password3.3.2 配置 python 客戶端證書
- 設置 use_ssl 為 True 以啟用 SSL。
- 指定客戶端證書路徑,使用 ca_certs 參數。
use_ssl = True
ca_certs = "/path/to/your/server.crt" # 或 ca_certs = "/path/to/your//ca_cert.pem"示例代碼:使用 SSL 連接 IoTDB
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from iotdb.SessionPool import PoolConfig, SessionPool
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "TimechoDB@2021" //V2.0.6.x 之前密碼默認值為root
# Configure SSL enabled
use_ssl = True
# Configure certificate path
ca_certs = "/path/server.crt"
def get_data():
session = Session(
ip, port_, username_, password_, use_ssl=use_ssl, ca_certs=ca_certs
)
session.open(False)
result = session.execute_query_statement("select * from root.eg.etth")
df = result.todf()
df.rename(columns={"Time": "date"}, inplace=True)
session.close()
return df
def get_data2():
pool_config = PoolConfig(
host=ip,
port=port_,
user_name=username_,
password=password_,
fetch_size=1024,
time_zone="UTC+8",
max_retry=3,
use_ssl=use_ssl,
ca_certs=ca_certs,
)
max_pool_size = 5
wait_timeout_in_ms = 3000
session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)
session = session_pool.get_session()
result = session.execute_query_statement("select * from root.eg.etth")
df = result.todf()
df.rename(columns={"Time": "date"}, inplace=True)
session_pool.put_back(session)
session_pool.close()
if __name__ == "__main__":
df = get_data()4. 數據定義接口 DDL
4.1 Database 管理
- 設置 database
session.set_storage_group(group_name)- 刪除單個或多個 database
session.delete_storage_group(group_name)
session.delete_storage_groups(group_name_lst)4.2 時間序列管理
- 創建單個或多個時間序列
session.create_time_series(ts_path, data_type, encoding, compressor,
props=None, tags=None, attributes=None, alias=None)
session.create_multi_time_series(
ts_path_lst, data_type_lst, encoding_lst, compressor_lst,
props_lst=None, tags_lst=None, attributes_lst=None, alias_lst=None
)- 創建對齊時間序列
session.create_aligned_time_series(
device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
)注意:目前暫不支持使用傳感器別名。
- 刪除一個或多個時間序列
session.delete_time_series(paths_list)- 檢測時間序列是否存在
session.check_time_series_exists(path)5. 數據操作接口 DML
5.1 數據寫入
推薦使用 insert_tablet 幫助提高寫入效率
- 插入一個 Tablet,Tablet 是一個設備若干行數據塊,每一行的列都相同
- 寫入效率高
- 支持寫入空值 (0.13 版本起)
Python API 里目前有兩種 Tablet 實現
- 普通 Tablet
values_ = [
[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"],
]
timestamps_ = [1, 2, 3, 4]
tablet_ = Tablet(
device_id, measurements_, data_types_, values_, timestamps_
)
session.insert_tablet(tablet_)
values_ = [
[None, 10, 11, 1.1, 10011.1, "test01"],
[True, None, 11111, 1.25, 101.0, "test02"],
[False, 100, None, 188.1, 688.25, "test03"],
[True, 0, 0, 0, None, None],
]
timestamps_ = [16, 17, 18, 19]
tablet_ = Tablet(
device_id, measurements_, data_types_, values_, timestamps_
)
session.insert_tablet(tablet_)- Numpy Tablet
相較于普通 Tablet,Numpy Tablet 使用 numpy.ndarray 來記錄數值型數據。
內存占用和序列化耗時會降低很多,寫入效率也會有很大提升。
注意
- Tablet 中的每一列時間戳和值記錄為一個 ndarray
- Numpy Tablet 只支持大端類型數據,ndarray 構建時如果不指定數據類型會使用小端,因此推薦在構建 ndarray 時指定下面例子中類型使用大端。如果不指定,IoTDB Python客戶端也會進行大小端轉換,不影響使用正確性。
import numpy as np
data_types_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
np_values_ = [
np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
]
np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype())
np_tablet_ = NumpyTablet(
device_id, measurements_, data_types_, np_values_, np_timestamps_
)
session.insert_tablet(np_tablet_)
# insert one numpy tablet with None into the database.
np_values_ = [
np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
]
np_timestamps_ = np.array([98, 99, 100, 101], TSDataType.INT64.np_dtype())
np_bitmaps_ = []
for i in range(len(measurements_)):
np_bitmaps_.append(BitMap(len(np_timestamps_)))
np_bitmaps_[0].mark(0)
np_bitmaps_[1].mark(1)
np_bitmaps_[2].mark(2)
np_bitmaps_[4].mark(3)
np_bitmaps_[5].mark(3)
np_tablet_with_none = NumpyTablet(
device_id, measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
)
session.insert_tablet(np_tablet_with_none)- 插入多個 Tablet
session.insert_tablets(tablet_lst)- 插入一個 Record,一個 Record 是一個設備一個時間戳下多個測點的數據。
session.insert_record(device_id, timestamp, measurements_, data_types_, values_)- 插入多個 Record
session.insert_records(
device_ids_, time_list_, measurements_list_, data_type_list_, values_list_
)- 插入同屬于一個 device 的多個 Record
session.insert_records_of_one_device(device_id, time_list, measurements_list, data_types_list, values_list)5.2 帶有類型推斷的寫入
當數據均是 String 類型時,我們可以使用如下接口,根據 value 的值進行類型推斷。例如:value 為 "true" ,就可以自動推斷為布爾類型。value 為 "3.2" ,就可以自動推斷為數值類型。服務器需要做類型推斷,可能會有額外耗時,速度較無需類型推斷的寫入慢
session.insert_str_record(device_id, timestamp, measurements, string_values)5.3 對齊時間序列的寫入
對齊時間序列的寫入使用 insert_aligned_xxx 接口,其余與上述接口類似:
- insert_aligned_record
- insert_aligned_records
- insert_aligned_records_of_one_device
- insert_aligned_tablet
- insert_aligned_tablets
6. IoTDB-SQL 接口
- 執行查詢語句
session.execute_query_statement(sql)- 執行非查詢語句
session.execute_non_query_statement(sql)- 執行語句
session.execute_statement(sql)7. 元數據模版接口
7.1 構建元數據模版
- 首先構建 Template 類
- 添加子節點 MeasurementNode
- 調用創建元數據模版接口
template = Template(name=template_name, share_time=True)
m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
m_node_y = MeasurementNode("y", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
m_node_z = MeasurementNode("z", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
template.add_template(m_node_x)
template.add_template(m_node_y)
template.add_template(m_node_z)
session.create_schema_template(template)7.2 改模版節點信息
修改模版節點,其中修改的模版必須已經被創建。以下函數能夠在已經存在的模版中增加或者刪除物理量
- 在模版中增加實體
session.add_measurements_in_template(template_name, measurements_path, data_types, encodings, compressors, is_aligned)- 在模版中刪除物理量
session.delete_node_in_template(template_name, path)7.3 掛載元數據模板
session.set_schema_template(template_name, prefix_path)7.4 卸載元數據模版
session.unset_schema_template(template_name, prefix_path)7.5 查看元數據模版
- 查看所有的元數據模版
session.show_all_templates()- 查看元數據模版中的物理量個數
session.count_measurements_in_template(template_name)- 判斷某個節點是否為物理量,該節點必須已經在元數據模版中
session.count_measurements_in_template(template_name, path)- 判斷某個路徑是否在元數據模版中,這個路徑有可能不在元數據模版中
session.is_path_exist_in_template(template_name, path)- 查看某個元數據模板下的物理量
session.show_measurements_in_template(template_name)- 查看掛載了某個元數據模板的路徑前綴
session.show_paths_template_set_on(template_name)- 查看使用了某個元數據模板(即序列已創建)的路徑前綴
session.show_paths_template_using_on(template_name)7.6 刪除元數據模版
刪除已經存在的元數據模版,不支持刪除已經掛載的模版
session.drop_schema_template("template_python")8. 對 Pandas 的支持
我們支持將查詢結果輕松地轉換為 Pandas Dataframe。
SessionDataSet 有一個方法.todf(),它的作用是消費 SessionDataSet 中的數據,并將數據轉換為 pandas dataframe。
例子:
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "TimechoDB@2021" //V2.0.6.x 之前密碼默認值為root
session = Session(ip, port_, username_, password_)
session.open(False)
result = session.execute_query_statement("SELECT ** FROM root")
# Transform to Pandas Dataset
df = result.todf()
session.close()
# Now you can work with the dataframe
df = ...9. IoTDB Testcontainer
Python 客戶端對測試的支持是基于testcontainers庫 (https://testcontainers-python.readthedocs.io/en/latest/index.html) 的,如果您想使用該特性,就需要將其安裝到您的項目中。
要在 Docker 容器中啟動(和停止)一個 IoTDB 數據庫,只需這樣做:
class MyTestCase(unittest.TestCase):
def test_something(self):
with IoTDBContainer() as c:
session = Session("localhost", c.get_exposed_port(6667), "root", "TimechoDB@2021") //V2.0.6.x 之前密碼默認值為root
session.open(False)
result = session.execute_query_statement("SHOW TIMESERIES")
print(result)
session.close()默認情況下,它會拉取最新的 IoTDB 鏡像 apache/iotdb:latest進行測試,如果您想指定待測 IoTDB 的版本,您只需要將版本信息像這樣聲明:IoTDBContainer("apache/iotdb:0.12.0"),此時,您就會得到一個0.12.0版本的 IoTDB 實例。
10. IoTDB DBAPI
IoTDB DBAPI 遵循 Python DB API 2.0 規范 (https://peps.python.org/pep-0249/),實現了通過Python語言訪問數據庫的通用接口。
10.1 例子
- 初始化
初始化的參數與Session部分保持一致(sqlalchemy_mode參數除外,該參數僅在SQLAlchemy方言中使用)
from iotdb.dbapi import connect
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "TimechoDB@2021" //V2.0.6.x 之前密碼默認值為root
conn = connect(ip, port_, username_, password_,fetch_size=1024,zone_id="UTC+8",sqlalchemy_mode=False)
cursor = conn.cursor()- 執行簡單的SQL語句
cursor.execute("SELECT ** FROM root")
for row in cursor.fetchall():
print(row)- 執行帶有參數的SQL語句
IoTDB DBAPI 支持pyformat風格的參數
cursor.execute("SELECT ** FROM root WHERE time < %(time)s",{"time":"2017-11-01T00:08:00.000"})
for row in cursor.fetchall():
print(row)- 批量執行帶有參數的SQL語句
seq_of_parameters = [
{"timestamp": 1, "temperature": 1},
{"timestamp": 2, "temperature": 2},
{"timestamp": 3, "temperature": 3},
{"timestamp": 4, "temperature": 4},
{"timestamp": 5, "temperature": 5},
]
sql = "insert into root.cursor(timestamp,temperature) values(%(timestamp)s,%(temperature)s)"
cursor.executemany(sql,seq_of_parameters)- 關閉連接
cursor.close()
conn.close()11. IoTDB SQLAlchemy Dialect(實驗性)
IoTDB的SQLAlchemy方言主要是為了適配Apache superset而編寫的,該部分仍在完善中,請勿在生產環境中使用!
11.1 元數據模型映射
SQLAlchemy 所使用的數據模型為關系數據模型,這種數據模型通過表格來描述不同實體之間的關系。
而 IoTDB 的數據模型為層次數據模型,通過樹狀結構來對數據進行組織。
為了使 IoTDB 能夠適配 SQLAlchemy 的方言,需要對 IoTDB 中原有的數據模型進行重新組織,
把 IoTDB 的數據模型轉換成 SQLAlchemy 的數據模型。
IoTDB 中的元數據有:
- Database:數據庫
- Path:存儲路徑
- Entity:實體
- Measurement:物理量
SQLAlchemy 中的元數據有:
- Schema:數據模式
- Table:數據表
- Column:數據列
它們之間的映射關系為:
| SQLAlchemy中的元數據 | IoTDB中對應的元數據 |
|---|---|
| Schema | Database |
| Table | Path ( from database to entity ) + Entity |
| Column | Measurement |
下圖更加清晰的展示了二者的映射關系:

11.2 數據類型映射
| IoTDB 中的數據類型 | SQLAlchemy 中的數據類型 |
|---|---|
| BOOLEAN | Boolean |
| INT32 | Integer |
| INT64 | BigInteger |
| FLOAT | Float |
| DOUBLE | Float |
| TEXT | Text |
| LONG | BigInteger |
11.3 Example
- 執行語句
from sqlalchemy import create_engine
engine = create_engine("iotdb://root:TimechoDB@2021@127.0.0.1:6667") //V2.0.6.x 之前密碼默認值為root
connect = engine.connect()
result = connect.execute("SELECT ** FROM root")
for row in result.fetchall():
print(row)- ORM (目前只支持簡單的查詢)
from sqlalchemy import create_engine, Column, Float, BigInteger, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
metadata = MetaData(
schema='root.factory'
)
Base = declarative_base(metadata=metadata)
class Device(Base):
__tablename__ = "room2.device1"
Time = Column(BigInteger, primary_key=True)
temperature = Column(Float)
status = Column(Float)
engine = create_engine("iotdb://root:TimechoDB@2021@127.0.0.1:6667") //V2.0.6.x 之前密碼默認值為root
DbSession = sessionmaker(bind=engine)
session = DbSession()
res = session.query(Device.status).filter(Device.temperature > 1)
for row in res:
print(row)12. 給開發人員
12.1 介紹
這是一個使用 thrift rpc 接口連接到 IoTDB 的示例。在 Windows 和 Linux 上操作幾乎是一樣的,但要注意路徑分隔符等不同之處。
12.2 依賴
首選 Python3.7 或更高版本。
必須安裝 thrift(0.11.0 或更高版本)才能將 thrift 文件編譯為 Python 代碼。下面是官方的安裝教程,最終,您應該得到一個 thrift 可執行文件。
http://thrift.apache.org/docs/install/在開始之前,您還需要在 Python 環境中安裝requirements_dev.txt中的其他依賴:
pip install -r requirements_dev.txt12.3 編譯 thrift 庫并調試
在 IoTDB 源代碼文件夾的根目錄下,運行mvn clean generate-sources -pl iotdb-client/client-py -am,
這個指令將自動刪除iotdb/thrift中的文件,并使用新生成的 thrift 文件重新填充該文件夾。
這個文件夾在 git 中會被忽略,并且永遠不應該被推到 git 中!
注意不要將iotdb/thrift上傳到 git 倉庫中 !
12.4 Session 客戶端 & 使用示例
我們將 thrift 接口打包到client-py/src/iotdb/session.py 中(與 Java 版本類似),還提供了一個示例文件client-py/src/SessionExample.py來說明如何使用 Session 模塊。請仔細閱讀。
另一個簡單的例子:
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "TimechoDB@2021" //V2.0.6.x 之前密碼默認值為root
session = Session(ip, port_, username_, password_)
session.open(False)
zone = session.get_time_zone()
session.close()12.5 測試
請在tests文件夾中添加自定義測試。
要運行所有的測試,只需在根目錄中運行pytest . 即可。
注意一些測試需要在您的系統上使用 docker,因為測試的 IoTDB 實例是使用 testcontainers 在 docker 容器中啟動的。
12.6 其他工具
black 和 flake8 分別用于自動格式化和 linting。
它們可以通過 black . 或 flake8 . 分別運行。
13. 發版
要進行發版,
只需確保您生成了正確的 thrift 代碼,
運行了 linting 并進行了自動格式化,
然后,確保所有測試都正常通過(通過pytest . ),
最后,您就可以將包發布到 pypi 了。
13.1 準備您的環境
首先,通過pip install -r requirements_dev.txt安裝所有必要的開發依賴。
13.2 發版
有一個腳本release.sh可以用來執行發版的所有步驟。
這些步驟包括:
刪除所有臨時目錄(如果存在)
(重新)通過 mvn 生成所有必須的源代碼
運行 linting (flke8)
通過 pytest 運行測試
Build
發布到 pypi