Go 原生接口
大約 9 分鐘
Go 原生接口
Go 原生 API 支持通過 Session 和 SessionPool 兩種方式與數據庫進行交互。由于 Session 非線程安全,因此強烈推薦使用 SessionPool 編程。在多線程并發的情形下,SessionPool 能夠合理地管理和分配連接資源,以提升系統性能與資源利用效率。
本文將圍繞 SessionPool 的使用進行說明,涵蓋從環境準備、核心操作步驟到全量接口的完整內容。
1. 環境準備
1.1 前置依賴
- golang >= 1.13
- make >= 3.0
- curl >= 7.1.1
- thrift: 0.15.0
- Linux、Macos 或其他類 unix 系統
- Windows+bash (下載 IoTDB Go client 需要 git ,通過 WSL、cygwin、Git Bash 任意一種方式均可)
1.2 安裝方法
- 使用 go mod
# 切換到 GOPATH 的 HOME 路徑,啟用 Go Modules 功能
export GO111MODULE=on
# 配置 GOPROXY 環境變量
export GOPROXY=https://goproxy.io
# 創建命名的文件夾或目錄,并切換當前目錄
mkdir session_example && cd session_example
# 保存文件,自動跳轉到新的地址
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# 初始化 go module 環境
go mod init session_example
# 下載依賴包
go mod tidy
# 編譯并運行程序
go run session_example.go- 使用 GOPATH
# get thrift 0.13.0
go get github.com/apache/thrift@0.13.0
# 遞歸創建目錄
mkdir -p $GOPATH/src/iotdb-client-go-example/session_example
# 切換到當前目錄
cd $GOPATH/src/iotdb-client-go-example/session_example
# 保存文件,自動跳轉到新的地址
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# 初始化 go module 環境
go mod init
# 下載依賴包
go mod tidy
# 編譯并運行程序
go run session_example.go2. 核心步驟
使用 Go 原生接口操作 IoTDB 的三個核心步驟如下:
- 創建連接池實例:初始化一個
SessionPool對象,配置連接參數和池大小。 - 執行數據庫操作:從連接池中
GetSession(),執行數據寫入或查詢等操作,完成后必須PutBack(session)。 - 關閉連接池資源:程序結束時調用
sessionPool.Close(),釋放所有連接。
下面的章節用于說明開發的核心流程,并未演示所有的參數和接口,如需了解全部功能及參數請參見: 全量接口說明 或 查閱: SessionPool 示例源碼
2.1 創建連接池實例
- 單實例
config := &client.PoolConfig{
Host: host,
Port: port,
UserName: user,
Password: password,
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer? ?sessionPool.Close()- 分布式或雙活
config := &client.PoolConfig{
UserName: user,
Password: password,
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer? ?sessionPool.Close()2.2 數據庫操作
2.2.1 數據寫入
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
status, err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)2.2.2 數據查詢
var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
log.Print(err)
return
}
sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
defer sessionDataSet.Close()
printDataSet(sessionDataSet)
} else {
log.Println(err)
}2.3 使用示例
import (
"flag"
"fmt"
"log"
"math/rand"
"strings"
"time"
"github.com/apache/iotdb-client-go/v2/client"
"github.com/apache/iotdb-client-go/v2/common"
)
var (
host string
port string
user string
password string
)
var sessionPool client.SessionPool
func main() {
flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
flag.StringVar(&port, "port", "6667", "--port=6667")
flag.StringVar(&user, "user", "root", "--user=root")
flag.StringVar(&password, "password", "root", "--password=root")
flag.Parse()
//1.創建連接池
config := &client.PoolConfig{
Host: host,
Port: port,
UserName: user,
Password: password,
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
//2.創建存儲組
setStorageGroup("root.sg1")
//3. 創建時間序列
createTimeseries("root.sg1.dev1.temperature")
//4.數據寫入
insertTablet()
//5. 數據查詢
executeQueryStatement("select temperature from root.sg1.dev1")
//6. 刪除
deleteTimeseries("root.sg1.dev1.temperature")
deleteStorageGroup("root.sg1")
}
// 設置存儲組
func setStorageGroup(sg string) {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
session.SetStorageGroup(sg)
}
}
// 刪除存儲組
func deleteStorageGroup(sg string) {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
checkError(session.DeleteStorageGroup(sg))
}
}
// 創建時間序列
func createTimeseries(path string) {
var (
dataType = client.FLOAT
encoding = client.PLAIN
compressor = client.SNAPPY
)
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
}
}
// 刪除時間序列
func deleteTimeseries(paths ...string) {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
checkError(session.DeleteTimeseries(paths))
}
}
// 插入Tablet數據
func insertTablet() {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
if tablet, err := createTablet(12); err == nil {
status, err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)
} else {
log.Fatal(err)
}
}
}
//創建Tablet
func createTablet(rowCount int) (*client.Tablet, error) {
tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{
{
Measurement: "temperature",
DataType: client.FLOAT,
},
}, rowCount)
if err != nil {
return nil, err
}
ts := time.Now().UTC().UnixNano() / 1000000
for row := 0; row < int(rowCount); row++ {
ts++
tablet.SetTimestamp(ts, row)
tablet.SetValueAt(rand.Float32(), 0, row)
tablet.RowSize++
}
return tablet, nil
}
// 執行查詢語句
func executeQueryStatement(sql string) {
var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
log.Print(err)
return
}
sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
defer sessionDataSet.Close()
printDataSet(sessionDataSet)
} else {
log.Println(err)
}
}
// 打印查詢結果
func printDataSet(sds *client.SessionDataSet) {
columnNames := sds.GetColumnNames()
for _, value := range columnNames {
fmt.Printf("%s\t", value)
}
fmt.Println()
for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
for _, columnName := range columnNames {
isNull, _ := sds.IsNull(columnName)
if isNull {
fmt.Printf("%v\t\t", "null")
} else {
v, _ := sds.GetString(columnName)
fmt.Printf("%v\t\t", v)
}
}
fmt.Println()
}
}
// 檢查錯誤
func checkError(status *common.TSStatus, err error) {
if err != nil {
log.Fatal(err)
}
if status != nil {
if err = client.VerifySuccess(status); err != nil {
log.Println(err)
}
}
}3. 全量接口
3.1 SessionPool 管理接口
| 接口名稱 | 功能描述 | 參數說明 |
|---|---|---|
NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool | 創建并返回一個Session連接池實例。 | config: 連接池配置 maxSize: 最大連接數(≤0時取CPU數*5) connTimeoutMs: TCP連接超時(ms) waitTimeoutMs: 獲取Session等待超時(ms) enableComp: 是否啟用壓縮 |
GetSession() (Session, error) | 從池中獲取一個可用Session。若池滿則阻塞等待,超時返回錯誤。?必須與PutBack配對使用?。 | 無 |
PutBack(session Session) | 將使用完畢的Session歸還到連接池中。 | session: 從GetSession獲取的實例 |
Close() | 關閉連接池,釋放所有活躍連接。程序退出前必須調用。 | 無 |
3.2 數據寫入接口
以下接口需通過獲取的 Session 進行調用
| 接口名稱 | 功能描述 | 參數說明 |
|---|---|---|
InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error) | 插入單條記錄。 | deviceId: 設備ID measurements: 測點列表 dataTypes: 數據類型列表values: 值列表timestamp: 時間戳 |
InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error) | 插入單條對齊記錄。 | deviceId: 設備IDmeasurements: 測點列表dataTypes: 數據類型列表values: 值列表timestamp: 時間戳 |
InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r common.TSStatus, err error) | 插入字符串格式的單條記錄。 | deviceId: 設備IDmeasurements: 測點列表values: 字符串類型的值列表timestamp: 時間戳 |
InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error) | 插入多條記錄。 | deviceIds: 設備ID列表measurements:二維測點列表dataTypes: 二維數據類型列表values: 二維值列表timestamps: 時間戳列表 |
InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error) | 插入多個對齊設備的多條記錄。 | deviceIds: 設備ID列表measurements:二維測點列表dataTypes: 二維數據類型列表values: 二維值列表timestamps: 時間戳列表 |
InsertTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error) | 插入單個設備的多條數據。 | tablet: 要插入的Tablet數據sorted: 數據是否已排序 |
InsertAlignedTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error) | 插入單個對齊設備的多條數據。 | tablet: 要插入的Tablet數據sorted: 數據是否已排序 |
InsertTablets``(tablets []Tablet, sorted bool) (r common.TSStatus, err error) | 批量插入多個 Tablet 數據。 | tablets: 要插入的多個Tablet 數據sorted: 數據是否已排序 |
InsertAlignedTablets(tablets []Tablet, sorted bool) (r common.TSStatus, err error) | 批量插入多個對齊設備的數據。 | tablets: 要插入的多個Tablet 數據sorted: 數據是否已排序 |
3.3 SQL與查詢接口
以下接口需通過獲取的 Session 進行調用
| 接口名稱 | 功能描述 | 參數說明 |
|---|---|---|
ExecuteStatement(sql string)(SessionDataSet, error) | 執行SQL(主要查詢),返回SessionDataSet。 | sql:要執行的SQL查詢語句 |
ExecuteQueryStatement(sql string, timeoutMs int64) (SessionDataSet, error) | 執行查詢SQL,可指定超時,返回SessionDataSet。 | sql:要執行的SQL查詢語句timeoutMs: 查詢超時時間(毫秒) |
ExecuteNonQueryStatementExecuteNonQueryStatement(sql string) (r common.TSStatus, err error) | 執行不返回結果集的SQL(如INSERT, CREATE, DELETE)。 | sql:要執行的SQL語句 |
ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) | 查詢指定時間序列在時間范圍內的原始數據。 | paths: 查詢路徑列表startTime: 起始時間戳endTime: 結束時間戳 |
ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (SessionDataSet, error) | 執行聚合查詢(COUNT, AVG等)。 | paths: 查詢路徑列表aggregations: 聚合類型列表startTime, endTime, interval: 起始時間、結束時間和間隔時間timeoutMs: 查詢超時時間 |
ExecuteBatchStatement(sqls []string) (r common.TSStatus, err error) | 批量執行多條SQL語句。 | sqls:要執行的SQL語句 |
3.4 元數據操作接口
以下接口需通過獲取的 Session 進行調用
| 接口名稱 | 功能描述 | 參數說明 |
|---|---|---|
SetStorageGroup(storageGroupId string) (r common.TSStatus, err error) | 創建數據庫(存儲組)。 | storageGroupId:數據庫(存儲組)名稱 |
DeleteStorageGroup(storageGroupId string) (r common.TSStatus, err error) | 刪除一個數據庫(存儲組)。 | storageGroupId:要刪除的數據庫(存儲組)名稱 |
DeleteStorageGroups(storageGroupIds ...string) (r common.TSStatus, err error) | 刪除多個數據庫(存儲組)。 | storageGroupIds:要刪除的數據庫(存儲組)名稱列表 |
CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r common.TSStatus, err error) | 創建非對齊時間序列。 | path: 時間序列路徑dataType: 數據類型encoding: 編碼方式compressor: 壓縮算法attributes: (可選)序列屬性tags: (可選)序列標簽 |
CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r common.TSStatus, err error) | 創建一組對齊時間序列。 | prefixPath: 時間序列路徑前綴measurements: 測點名稱列表dataTypes, encodings, compressors: 每個測點對應的數據類型、編碼和壓縮算法列表measurementAlias: (可選)每個測點的別名列表 |
DeleteTimeseries(paths []string) (r common.TSStatus, err error) | 刪除多條時間序列(含數據)。 | paths:要刪除的時間序列路徑列表 |
DeleteData(paths []string, startTime int64, endTime int64) (r common.TSStatus, err error) | 刪除指定時間序列在時間段內的數據(保留元數據)。 | paths: 要刪除的時間序列路徑列表startTime: 起始時間戳endTime: 結束時間戳。 |
SetTimeZone(timeZone string) (r common.TSStatus, err error) | 設置當前會話時區。 | timeZone: 時區字符串,例如 ”UTC”, ”Asia/Shanghai”, ”GMT+8” |
GetTimeZone() (string, error) | 獲取當前會話時區。 | 無 |
3.5 關鍵配置結構 (PoolConfig)
| 字段 | 類型 | 必填 | 描述 |
|---|---|---|---|
Host | string | 與NodeUrls二選一 | 單節點主機地址。 |
Port | string | 與NodeUrls二選一 | 單節點端口。 |
NodeUrls | []string | 與Host/Port二選一 | 集群節點地址列表,格式為”host:port”。 |
UserName | string | 是 | 用戶名。 |
Password | string | 是 | 密碼。 |
FetchSize | int32 | 否 | 查詢結果集獲取大小,默認1024。 |
TimeZone | string | 否 | 會話時區,如”Asia/Shanghai”,默認使用服務端時區。 |
Database | string | 否 | 表模型適用,用于設置會話默認數據庫。 |