Python 原生接口
Python 原生接口
依賴
在使用 Python 原生接口包前,您需要安裝 thrift (>=0.13) 依賴。
如何使用 (示例)
首先下載包:pip3 install "apache-iotdb<2.0"
您可以從這里得到一個使用該包進(jìn)行數(shù)據(jù)讀寫的例子:Session Example
關(guān)于對齊時間序列讀寫的例子:Aligned Timeseries Session Example
(您需要在文件的頭部添加import iotdb)
或者:
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "root"
session = Session(ip, port_, username_, password_)
session.open(False)
zone = session.get_time_zone()
session.close()基本接口說明
下面將給出 Session 對應(yīng)的接口的簡要介紹和對應(yīng)參數(shù):
初始化
- 初始化 Session
session = Session(
ip="127.0.0.1",
port="6667",
user="root",
password="root",
fetch_size=1024,
zone_id="UTC+8",
enable_redirection=True
)- 初始化可連接多節(jié)點的 Session
session = Session.init_from_node_urls(
node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
user="root",
password="root",
fetch_size=1024,
zone_id="UTC+8",
enable_redirection=True
)- 開啟 Session,并決定是否開啟 RPC 壓縮
session.open(enable_rpc_compression=False)注意: 客戶端的 RPC 壓縮開啟狀態(tài)需和服務(wù)端一致
- 關(guān)閉 Session
session.close()通過SessionPool管理session連接
利用SessionPool管理session,不需要再考慮如何重用session。當(dāng)session連接到達(dá)pool的最大值時,獲取session的請求會被阻塞,可以通過參數(shù)設(shè)置阻塞等待時間。每次session使用完需要使用putBack方法將session歸還到SessionPool中管理。
創(chuàng)建SessionPool
pool_config = PoolConfig(host=ip,port=port, user_name=username,
password=password, fetch_size=1024,
time_zone="UTC+8", max_retry=3)
max_pool_size = 5
wait_timeout_in_ms = 3000
# 通過配置參數(shù)創(chuàng)建連接池
session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)通過分布式節(jié)點創(chuàng)建SessionPool
pool_config = PoolConfig(node_urls=node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"], user_name=username,
password=password, fetch_size=1024,
time_zone="UTC+8", max_retry=3)
max_pool_size = 5
wait_timeout_in_ms = 3000通過SessionPool獲取session,使用完手動調(diào)用PutBack
session = session_pool.get_session()
session.set_storage_group(STORAGE_GROUP_NAME)
session.create_time_series(
TIMESERIES_PATH, TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
)
# 使用完調(diào)用putBack歸還
session_pool.put_back(session)
# 關(guān)閉sessionPool時同時關(guān)閉管理的session
session_pool.close()SSL 連接
服務(wù)器端配置證書
conf/iotdb-system.properties 配置文件中查找或添加以下配置項:
enable_thrift_ssl=true
key_store_path=/path/to/your/server_keystore.jks
key_store_pwd=your_keystore_password配置 python 客戶端證書
- 設(shè)置 use_ssl 為 True 以啟用 SSL。
- 指定客戶端證書路徑,使用 ca_certs 參數(shù)。
use_ssl = True
ca_certs = "/path/to/your/server.crt" # 或 ca_certs = "/path/to/your//ca_cert.pem"示例代碼:使用 SSL 連接 IoTDB
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from iotdb.SessionPool import PoolConfig, SessionPool
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "root"
# Configure SSL enabled
use_ssl = True
# Configure certificate path
ca_certs = "/path/server.crt"
def get_data():
session = Session(
ip, port_, username_, password_, use_ssl=use_ssl, ca_certs=ca_certs
)
session.open(False)
result = session.execute_query_statement("select * from root.eg.etth")
df = result.todf()
df.rename(columns={"Time": "date"}, inplace=True)
session.close()
return df
def get_data2():
pool_config = PoolConfig(
host=ip,
port=port_,
user_name=username_,
password=password_,
fetch_size=1024,
time_zone="UTC+8",
max_retry=3,
use_ssl=use_ssl,
ca_certs=ca_certs,
)
max_pool_size = 5
wait_timeout_in_ms = 3000
session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)
session = session_pool.get_session()
result = session.execute_query_statement("select * from root.eg.etth")
df = result.todf()
df.rename(columns={"Time": "date"}, inplace=True)
session_pool.put_back(session)
session_pool.close()
if __name__ == "__main__":
df = get_data()數(shù)據(jù)定義接口 DDL
Database 管理
- 設(shè)置 database
session.set_storage_group(group_name)- 刪除單個或多個 database
session.delete_storage_group(group_name)
session.delete_storage_groups(group_name_lst)時間序列管理
- 創(chuàng)建單個或多個時間序列
session.create_time_series(ts_path, data_type, encoding, compressor,
props=None, tags=None, attributes=None, alias=None)
session.create_multi_time_series(
ts_path_lst, data_type_lst, encoding_lst, compressor_lst,
props_lst=None, tags_lst=None, attributes_lst=None, alias_lst=None
)- 創(chuàng)建對齊時間序列
session.create_aligned_time_series(
device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst
)注意:目前暫不支持使用傳感器別名。
- 刪除一個或多個時間序列
session.delete_time_series(paths_list)- 檢測時間序列是否存在
session.check_time_series_exists(path)數(shù)據(jù)操作接口 DML
數(shù)據(jù)寫入
推薦使用 insert_tablet 幫助提高寫入效率
- 插入一個 Tablet,Tablet 是一個設(shè)備若干行數(shù)據(jù)塊,每一行的列都相同
- 寫入效率高
- 支持寫入空值 (0.13 版本起)
Python API 里目前有兩種 Tablet 實現(xiàn)
- 普通 Tablet
values_ = [
[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"],
]
timestamps_ = [1, 2, 3, 4]
tablet_ = Tablet(
device_id, measurements_, data_types_, values_, timestamps_
)
session.insert_tablet(tablet_)
values_ = [
[None, 10, 11, 1.1, 10011.1, "test01"],
[True, None, 11111, 1.25, 101.0, "test02"],
[False, 100, None, 188.1, 688.25, "test03"],
[True, 0, 0, 0, None, None],
]
timestamps_ = [16, 17, 18, 19]
tablet_ = Tablet(
device_id, measurements_, data_types_, values_, timestamps_
)
session.insert_tablet(tablet_)- Numpy Tablet
相較于普通 Tablet,Numpy Tablet 使用 numpy.ndarray 來記錄數(shù)值型數(shù)據(jù)。
內(nèi)存占用和序列化耗時會降低很多,寫入效率也會有很大提升。
注意
- Tablet 中的每一列時間戳和值記錄為一個 ndarray
- Numpy Tablet 只支持大端類型數(shù)據(jù),ndarray 構(gòu)建時如果不指定數(shù)據(jù)類型會使用小端,因此推薦在構(gòu)建 ndarray 時指定下面例子中類型使用大端。如果不指定,IoTDB Python客戶端也會進(jìn)行大小端轉(zhuǎn)換,不影響使用正確性。
import numpy as np
data_types_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
np_values_ = [
np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
]
np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype())
np_tablet_ = NumpyTablet(
device_id, measurements_, data_types_, np_values_, np_timestamps_
)
session.insert_tablet(np_tablet_)
# insert one numpy tablet with None into the database.
np_values_ = [
np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
]
np_timestamps_ = np.array([98, 99, 100, 101], TSDataType.INT64.np_dtype())
np_bitmaps_ = []
for i in range(len(measurements_)):
np_bitmaps_.append(BitMap(len(np_timestamps_)))
np_bitmaps_[0].mark(0)
np_bitmaps_[1].mark(1)
np_bitmaps_[2].mark(2)
np_bitmaps_[4].mark(3)
np_bitmaps_[5].mark(3)
np_tablet_with_none = NumpyTablet(
device_id, measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_
)
session.insert_tablet(np_tablet_with_none)- 插入多個 Tablet
session.insert_tablets(tablet_lst)- 插入一個 Record,一個 Record 是一個設(shè)備一個時間戳下多個測點的數(shù)據(jù)。
session.insert_record(device_id, timestamp, measurements_, data_types_, values_)- 插入多個 Record
session.insert_records(
device_ids_, time_list_, measurements_list_, data_type_list_, values_list_
)- 插入同屬于一個 device 的多個 Record
session.insert_records_of_one_device(device_id, time_list, measurements_list, data_types_list, values_list)帶有類型推斷的寫入
當(dāng)數(shù)據(jù)均是 String 類型時,我們可以使用如下接口,根據(jù) value 的值進(jìn)行類型推斷。例如:value 為 "true" ,就可以自動推斷為布爾類型。value 為 "3.2" ,就可以自動推斷為數(shù)值類型。服務(wù)器需要做類型推斷,可能會有額外耗時,速度較無需類型推斷的寫入慢
session.insert_str_record(device_id, timestamp, measurements, string_values)對齊時間序列的寫入
對齊時間序列的寫入使用 insert_aligned_xxx 接口,其余與上述接口類似:
- insert_aligned_record
- insert_aligned_records
- insert_aligned_records_of_one_device
- insert_aligned_tablet
- insert_aligned_tablets
IoTDB-SQL 接口
- 執(zhí)行查詢語句
session.execute_query_statement(sql)- 執(zhí)行非查詢語句
session.execute_non_query_statement(sql)- 執(zhí)行語句
session.execute_statement(sql)元數(shù)據(jù)模版接口
構(gòu)建元數(shù)據(jù)模版
- 首先構(gòu)建 Template 類
- 添加子節(jié)點 MeasurementNode
- 調(diào)用創(chuàng)建元數(shù)據(jù)模版接口
template = Template(name=template_name, share_time=True)
m_node_x = MeasurementNode("x", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
m_node_y = MeasurementNode("y", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
m_node_z = MeasurementNode("z", TSDataType.FLOAT, TSEncoding.RLE, Compressor.SNAPPY)
template.add_template(m_node_x)
template.add_template(m_node_y)
template.add_template(m_node_z)
session.create_schema_template(template)修改模版節(jié)點信息
修改模版節(jié)點,其中修改的模版必須已經(jīng)被創(chuàng)建。以下函數(shù)能夠在已經(jīng)存在的模版中增加或者刪除物理量
- 在模版中增加實體
session.add_measurements_in_template(template_name, measurements_path, data_types, encodings, compressors, is_aligned)- 在模版中刪除物理量
session.delete_node_in_template(template_name, path)掛載元數(shù)據(jù)模板
session.set_schema_template(template_name, prefix_path)卸載元數(shù)據(jù)模版
session.unset_schema_template(template_name, prefix_path)查看元數(shù)據(jù)模版
- 查看所有的元數(shù)據(jù)模版
session.show_all_templates()- 查看元數(shù)據(jù)模版中的物理量個數(shù)
session.count_measurements_in_template(template_name)- 判斷某個節(jié)點是否為物理量,該節(jié)點必須已經(jīng)在元數(shù)據(jù)模版中
session.count_measurements_in_template(template_name, path)- 判斷某個路徑是否在元數(shù)據(jù)模版中,這個路徑有可能不在元數(shù)據(jù)模版中
session.is_path_exist_in_template(template_name, path)- 查看某個元數(shù)據(jù)模板下的物理量
session.show_measurements_in_template(template_name)- 查看掛載了某個元數(shù)據(jù)模板的路徑前綴
session.show_paths_template_set_on(template_name)- 查看使用了某個元數(shù)據(jù)模板(即序列已創(chuàng)建)的路徑前綴
session.show_paths_template_using_on(template_name)刪除元數(shù)據(jù)模版
刪除已經(jīng)存在的元數(shù)據(jù)模版,不支持刪除已經(jīng)掛載的模版
session.drop_schema_template("template_python")對 Pandas 的支持
我們支持將查詢結(jié)果輕松地轉(zhuǎn)換為 Pandas Dataframe。
SessionDataSet 有一個方法.todf(),它的作用是消費 SessionDataSet 中的數(shù)據(jù),并將數(shù)據(jù)轉(zhuǎn)換為 pandas dataframe。
例子:
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "root"
session = Session(ip, port_, username_, password_)
session.open(False)
result = session.execute_query_statement("SELECT ** FROM root")
# Transform to Pandas Dataset
df = result.todf()
session.close()
# Now you can work with the dataframe
df = ...IoTDB Testcontainer
Python 客戶端對測試的支持是基于testcontainers庫 (https://testcontainers-python.readthedocs.io/en/latest/index.html) 的,如果您想使用該特性,就需要將其安裝到您的項目中。
要在 Docker 容器中啟動(和停止)一個 IoTDB 數(shù)據(jù)庫,只需這樣做:
class MyTestCase(unittest.TestCase):
def test_something(self):
with IoTDBContainer() as c:
session = Session("localhost", c.get_exposed_port(6667), "root", "root")
session.open(False)
result = session.execute_query_statement("SHOW TIMESERIES")
print(result)
session.close()默認(rèn)情況下,它會拉取最新的 IoTDB 鏡像 apache/iotdb:latest進(jìn)行測試,如果您想指定待測 IoTDB 的版本,您只需要將版本信息像這樣聲明:IoTDBContainer("apache/iotdb:0.12.0"),此時,您就會得到一個0.12.0版本的 IoTDB 實例。
IoTDB DBAPI
IoTDB DBAPI 遵循 Python DB API 2.0 規(guī)范 (https://peps.python.org/pep-0249/),實現(xiàn)了通過Python語言訪問數(shù)據(jù)庫的通用接口。
例子
- 初始化
初始化的參數(shù)與Session部分保持一致(sqlalchemy_mode參數(shù)除外,該參數(shù)僅在SQLAlchemy方言中使用)
from iotdb.dbapi import connect
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "root"
conn = connect(ip, port_, username_, password_,fetch_size=1024,zone_id="UTC+8",sqlalchemy_mode=False)
cursor = conn.cursor()- 執(zhí)行簡單的SQL語句
cursor.execute("SELECT ** FROM root")
for row in cursor.fetchall():
print(row)- 執(zhí)行帶有參數(shù)的SQL語句
IoTDB DBAPI 支持pyformat風(fēng)格的參數(shù)
cursor.execute("SELECT ** FROM root WHERE time < %(time)s",{"time":"2017-11-01T00:08:00.000"})
for row in cursor.fetchall():
print(row)- 批量執(zhí)行帶有參數(shù)的SQL語句
seq_of_parameters = [
{"timestamp": 1, "temperature": 1},
{"timestamp": 2, "temperature": 2},
{"timestamp": 3, "temperature": 3},
{"timestamp": 4, "temperature": 4},
{"timestamp": 5, "temperature": 5},
]
sql = "insert into root.cursor(timestamp,temperature) values(%(timestamp)s,%(temperature)s)"
cursor.executemany(sql,seq_of_parameters)- 關(guān)閉連接
cursor.close()
conn.close()IoTDB SQLAlchemy Dialect(實驗性)
IoTDB的SQLAlchemy方言主要是為了適配Apache superset而編寫的,該部分仍在完善中,請勿在生產(chǎn)環(huán)境中使用!
元數(shù)據(jù)模型映射
SQLAlchemy 所使用的數(shù)據(jù)模型為關(guān)系數(shù)據(jù)模型,這種數(shù)據(jù)模型通過表格來描述不同實體之間的關(guān)系。
而 IoTDB 的數(shù)據(jù)模型為層次數(shù)據(jù)模型,通過樹狀結(jié)構(gòu)來對數(shù)據(jù)進(jìn)行組織。
為了使 IoTDB 能夠適配 SQLAlchemy 的方言,需要對 IoTDB 中原有的數(shù)據(jù)模型進(jìn)行重新組織,
把 IoTDB 的數(shù)據(jù)模型轉(zhuǎn)換成 SQLAlchemy 的數(shù)據(jù)模型。
IoTDB 中的元數(shù)據(jù)有:
- Database:數(shù)據(jù)庫
- Path:存儲路徑
- Entity:實體
- Measurement:物理量
SQLAlchemy 中的元數(shù)據(jù)有:
- Schema:數(shù)據(jù)模式
- Table:數(shù)據(jù)表
- Column:數(shù)據(jù)列
它們之間的映射關(guān)系為:
| SQLAlchemy中的元數(shù)據(jù) | IoTDB中對應(yīng)的元數(shù)據(jù) |
|---|---|
| Schema | Database |
| Table | Path ( from database to entity ) + Entity |
| Column | Measurement |
下圖更加清晰的展示了二者的映射關(guān)系:

數(shù)據(jù)類型映射
| IoTDB 中的數(shù)據(jù)類型 | SQLAlchemy 中的數(shù)據(jù)類型 |
|---|---|
| BOOLEAN | Boolean |
| INT32 | Integer |
| INT64 | BigInteger |
| FLOAT | Float |
| DOUBLE | Float |
| TEXT | Text |
| LONG | BigInteger |
Example
- 執(zhí)行語句
from sqlalchemy import create_engine
engine = create_engine("iotdb://root:root@127.0.0.1:6667")
connect = engine.connect()
result = connect.execute("SELECT ** FROM root")
for row in result.fetchall():
print(row)- ORM (目前只支持簡單的查詢)
from sqlalchemy import create_engine, Column, Float, BigInteger, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
metadata = MetaData(
schema='root.factory'
)
Base = declarative_base(metadata=metadata)
class Device(Base):
__tablename__ = "room2.device1"
Time = Column(BigInteger, primary_key=True)
temperature = Column(Float)
status = Column(Float)
engine = create_engine("iotdb://root:root@127.0.0.1:6667")
DbSession = sessionmaker(bind=engine)
session = DbSession()
res = session.query(Device.status).filter(Device.temperature > 1)
for row in res:
print(row)給開發(fā)人員
介紹
這是一個使用 thrift rpc 接口連接到 IoTDB 的示例。在 Windows 和 Linux 上操作幾乎是一樣的,但要注意路徑分隔符等不同之處。
依賴
首選 Python3.7 或更高版本。
必須安裝 thrift(0.11.0 或更高版本)才能將 thrift 文件編譯為 Python 代碼。下面是官方的安裝教程,最終,您應(yīng)該得到一個 thrift 可執(zhí)行文件。
http://thrift.apache.org/docs/install/在開始之前,您還需要在 Python 環(huán)境中安裝requirements_dev.txt中的其他依賴:
pip install -r requirements_dev.txt編譯 thrift 庫并調(diào)試
在 IoTDB 源代碼文件夾的根目錄下,運行mvn clean generate-sources -pl iotdb-client/client-py -am,
這個指令將自動刪除iotdb/thrift中的文件,并使用新生成的 thrift 文件重新填充該文件夾。
這個文件夾在 git 中會被忽略,并且永遠(yuǎn)不應(yīng)該被推到 git 中!
注意不要將iotdb/thrift上傳到 git 倉庫中 !
Session 客戶端 & 使用示例
我們將 thrift 接口打包到client-py/src/iotdb/session.py 中(與 Java 版本類似),還提供了一個示例文件client-py/src/SessionExample.py來說明如何使用 Session 模塊。請仔細(xì)閱讀。
另一個簡單的例子:
from iotdb.Session import Session
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "root"
session = Session(ip, port_, username_, password_)
session.open(False)
zone = session.get_time_zone()
session.close()測試
請在tests文件夾中添加自定義測試。
要運行所有的測試,只需在根目錄中運行pytest . 即可。
注意一些測試需要在您的系統(tǒng)上使用 docker,因為測試的 IoTDB 實例是使用 testcontainers 在 docker 容器中啟動的。
其他工具
black 和 flake8 分別用于自動格式化和 linting。
它們可以通過 black . 或 flake8 . 分別運行。
發(fā)版
要進(jìn)行發(fā)版,
只需確保您生成了正確的 thrift 代碼,
運行了 linting 并進(jìn)行了自動格式化,
然后,確保所有測試都正常通過(通過pytest . ),
最后,您就可以將包發(fā)布到 pypi 了。
準(zhǔn)備您的環(huán)境
首先,通過pip install -r requirements_dev.txt安裝所有必要的開發(fā)依賴。
發(fā)版
有一個腳本release.sh可以用來執(zhí)行發(fā)版的所有步驟。
這些步驟包括:
刪除所有臨時目錄(如果存在)
(重新)通過 mvn 生成所有必須的源代碼
運行 linting (flke8)
通過 pytest 運行測試
Build
發(fā)布到 pypi