InfluxDB 2.0 python client
Contents:
- User Guide
- API Reference
- Async API Reference
- Migration Guide
- Development
This repository contains the Python client library for the InfluxDB 2.0.
Note: Use this client library with InfluxDB 2.x and InfluxDB 1.8+. For connecting to InfluxDB 1.7 or earlier instances, use the influxdb-python client library. The API of the influxdb-client-python is not the backwards-compatible with the old one - influxdb-python.
Documentation
This section contains links to the client library documentation.
InfluxDB 2.0 client features
- Querying data
using the Flux language
into csv, raw data, flux_table structure, Pandas DataFrame
- Writing data using
RxPY Observable
- InfluxDB 2.0 API client for management
the client is generated from the swagger by using the openapi-generator
organizations & users management
buckets management
tasks management
authorizations
health check
…
Installation
InfluxDB python library uses RxPY - The Reactive Extensions for Python (RxPY).
Python 3.7 or later is required.
Note
It is recommended to use ciso8601 with client for parsing dates. ciso8601 is much faster than built-in Python datetime. Since it’s written as a C module the best way is build it from sources:
Windows:
You have to install Visual C++ Build Tools 2015 to build ciso8601 by pip.
conda:
Install from sources: conda install -c conda-forge/label/cf202003 ciso8601.
pip install
The python package is hosted on PyPI, you can install latest version directly:
pip install 'influxdb-client[ciso]'
Then import the package:
import influxdb_client
If your application uses async/await in Python you can install with the async extra:
$ pip install influxdb-client[async]
For more info see How to use Asyncio.
Setuptools
Install via Setuptools.
python setup.py install --user
(or sudo python setup.py install to install the package for all users)
Getting Started
Please follow the Installation and then run the following:
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "my-bucket"
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
write_api.write(bucket=bucket, record=p)
## using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
for table in tables:
print(table)
for row in table.records:
print (row.values)
## using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
for cell in row:
val_count += 1
Client configuration
Via File
A client can be configured via *.ini file in segment influx2.
The following options are supported:
url- the url to connect to InfluxDBorg- default destination organization for writes and queriestoken- the token to use for the authorizationtimeout- socket timeout in ms (default value is 10000)verify_ssl- set this to false to skip verifying SSL certificate when calling API from https serverssl_ca_cert- set this to customize the certificate file to verify the peercert_file- path to the certificate that will be used for mTLS authenticationcert_key_file- path to the file contains private key for mTLS certificatecert_key_password- string or function which returns password for decrypting the mTLS private keyconnection_pool_maxsize- set the number of connections to save that can be reused by urllib3auth_basic- enable http basic authentication when talking to a InfluxDB 1.8.x without authentication but is accessed via reverse proxy with basic authentication (defaults to false)profilers- set the list of enabled Flux profilers
self.client = InfluxDBClient.from_config_file("config.ini")
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
verify_ssl=False
Via Environment Properties
A client can be configured via environment properties.
Supported properties are:
INFLUXDB_V2_URL- the url to connect to InfluxDBINFLUXDB_V2_ORG- default destination organization for writes and queriesINFLUXDB_V2_TOKEN- the token to use for the authorizationINFLUXDB_V2_TIMEOUT- socket timeout in ms (default value is 10000)INFLUXDB_V2_VERIFY_SSL- set this to false to skip verifying SSL certificate when calling API from https serverINFLUXDB_V2_SSL_CA_CERT- set this to customize the certificate file to verify the peerINFLUXDB_V2_CERT_FILE- path to the certificate that will be used for mTLS authenticationINFLUXDB_V2_CERT_KEY_FILE- path to the file contains private key for mTLS certificateINFLUXDB_V2_CERT_KEY_PASSWORD- string or function which returns password for decrypting the mTLS private keyINFLUXDB_V2_CONNECTION_POOL_MAXSIZE- set the number of connections to save that can be reused by urllib3INFLUXDB_V2_AUTH_BASIC- enable http basic authentication when talking to a InfluxDB 1.8.x without authentication but is accessed via reverse proxy with basic authentication (defaults to false)INFLUXDB_V2_PROFILERS- set the list of enabled Flux profilers
self.client = InfluxDBClient.from_env_properties()
Profile query
The Flux Profiler package provides performance profiling tools for Flux queries and operations.
You can enable printing profiler information of the Flux query in client library by:
set QueryOptions.profilers in QueryApi,
set
INFLUXDB_V2_PROFILERSenvironment variable,set
profilersoption in configuration file.
When the profiler is enabled, the result of flux query contains additional tables “profiler/*”.
In order to have consistent behaviour with enabled/disabled profiler, FluxCSVParser excludes “profiler/*” measurements
from result.
Example how to enable profilers using API:
q = '''
from(bucket: stringParam)
|> range(start: -5m, stop: now())
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
|> aggregateWindow(every: 1m, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
p = {
"stringParam": "my-bucket",
}
query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
csv_result = query_api.query(query=q, params=p)
Example of a profiler output:
===============
Profiler: query
===============
from(bucket: stringParam)
|> range(start: -5m, stop: now())
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
|> aggregateWindow(every: 1m, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
========================
Profiler: profiler/query
========================
result : _profiler
table : 0
_measurement : profiler/query
TotalDuration : 8924700
CompileDuration : 350900
QueueDuration : 33800
PlanDuration : 0
RequeueDuration : 0
ExecuteDuration : 8486500
Concurrency : 0
MaxAllocated : 2072
TotalAllocated : 0
flux/query-plan :
digraph {
ReadWindowAggregateByTime11
// every = 1m, aggregates = [mean], createEmpty = true, timeColumn = "_stop"
pivot8
generated_yield
ReadWindowAggregateByTime11 -> pivot8
pivot8 -> generated_yield
}
influxdb/scanned-bytes: 0
influxdb/scanned-values: 0
===========================
Profiler: profiler/operator
===========================
result : _profiler
table : 1
_measurement : profiler/operator
Type : *universe.pivotTransformation
Label : pivot8
Count : 3
MinDuration : 32600
MaxDuration : 126200
DurationSum : 193400
MeanDuration : 64466.666666666664
===========================
Profiler: profiler/operator
===========================
result : _profiler
table : 1
_measurement : profiler/operator
Type : *influxdb.readWindowAggregateSource
Label : ReadWindowAggregateByTime11
Count : 1
MinDuration : 940500
MaxDuration : 940500
DurationSum : 940500
MeanDuration : 940500.0
You can also use callback function to get profilers output. Return value of this callback is type of FluxRecord.
Example how to use profilers with callback:
class ProfilersCallback(object):
def __init__(self):
self.records = []
def __call__(self, flux_record):
self.records.append(flux_record.values)
callback = ProfilersCallback()
query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
for profiler in callback.records:
print(f'Custom processing of profiler result: {profiler}')
Example output of this callback:
Custom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\r\n ReadRange2\r\n generated_yield\r\n\r\n ReadRange2 -> generated_yield\r\n}\r\n\r\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0}
Custom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0}