Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: async write prec where DEFAULT_PRECISION should not be used #675

Merged
merged 5 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
## 1.47.0 [unreleased]

### Bug Fixes
1. [#672](https://github.com/influxdata/influxdb-client-python/pull/672): Add type validation to url attribute in client object
1. [#674](https://github.com/influxdata/influxdb-client-python/pull/674): Add type linting to client.flux_table.FluxTable, remove duplicated `from pathlib import Path` at setup.py

1. [#672](https://github.com/influxdata/influxdb-client-python/pull/672): Adding type validation to url attribute in client object
2. [#674](https://github.com/influxdata/influxdb-client-python/pull/674): Add type linting to client.flux_table.FluxTable, remove duplicated `from pathlib import Path` at setup.py
3. [#675](https://github.com/influxdata/influxdb-client-python/pull/675): Ensures WritePrecision in Point is preferred to `DEFAULT_PRECISION`

## 1.46.0 [2024-09-13]

Expand Down
27 changes: 18 additions & 9 deletions influxdb_client/client/write_api_async.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Collect and async write time series data to InfluxDB Cloud or InfluxDB OSS."""
import logging
from asyncio import ensure_future, gather
from collections import defaultdict
from typing import Union, Iterable, NamedTuple

Expand Down Expand Up @@ -114,12 +115,20 @@ async def write(self, bucket: str, org: str = None,
self._append_default_tags(record)

payloads = defaultdict(list)
self._serialize(record, write_precision, payloads, precision_from_point=False, **kwargs)

# joint list by \n
body = b'\n'.join(payloads[write_precision])
response = await self._write_service.post_write_async(org=org, bucket=bucket, body=body,
precision=write_precision, async_req=False,
_return_http_data_only=False,
content_type="text/plain; charset=utf-8")
return response[1] in (201, 204)
self._serialize(record, write_precision, payloads, precision_from_point=True, **kwargs)

futures = []
for payload_precision, payload_line in payloads.items():
futures.append(ensure_future
(self._write_service.post_write_async(org=org, bucket=bucket,
body=b'\n'.join(payload_line),
precision=payload_precision, async_req=False,
_return_http_data_only=False,
content_type="text/plain; charset=utf-8")))

results = await gather(*futures, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
raise result

return False not in [re[1] in (201, 204) for re in results]
149 changes: 137 additions & 12 deletions tests/test_InfluxDBClientAsync.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import asyncio
import dateutil.parser
import logging
import math
import re
import time
import unittest
import os
from datetime import datetime, timezone
from io import StringIO

import pandas
import pytest
import warnings
from aioresponses import aioresponses
Expand Down Expand Up @@ -199,30 +203,151 @@ async def test_write_empty_data(self):

self.assertEqual(True, response)

def gen_fractional_utc(self, nano, precision) -> str:
raw_sec = nano / 1_000_000_000
if precision == WritePrecision.NS:
rem = f"{nano % 1_000_000_000}".rjust(9,"0").rstrip("0")
return (datetime.fromtimestamp(math.floor(raw_sec), tz=timezone.utc)
.isoformat()
.replace("+00:00", "") + f".{rem}Z")
#f".{rem}Z"))
elif precision == WritePrecision.US:
# rem = f"{round(nano / 1_000) % 1_000_000}"#.ljust(6,"0")
return (datetime.fromtimestamp(round(raw_sec,6), tz=timezone.utc)
.isoformat()
.replace("+00:00","")
.strip("0") + "Z"
)
elif precision == WritePrecision.MS:
#rem = f"{round(nano / 1_000_000) % 1_000}".rjust(3, "0")
return (datetime.fromtimestamp(round(raw_sec,3), tz=timezone.utc)
.isoformat()
.replace("+00:00","")
.strip("0") + "Z"
)
elif precision == WritePrecision.S:
return (datetime.fromtimestamp(round(raw_sec), tz=timezone.utc)
.isoformat()
.replace("+00:00","Z"))
else:
raise ValueError(f"Unknown precision: {precision}")


@async_test
async def test_write_points_different_precision(self):
now_ns = time.time_ns()
now_us = now_ns / 1_000
now_ms = now_us / 1_000
now_s = now_ms / 1_000

now_date_s = self.gen_fractional_utc(now_ns, WritePrecision.S)
now_date_ms = self.gen_fractional_utc(now_ns, WritePrecision.MS)
now_date_us = self.gen_fractional_utc(now_ns, WritePrecision.US)
now_date_ns = self.gen_fractional_utc(now_ns, WritePrecision.NS)

points = {
WritePrecision.S: [],
WritePrecision.MS: [],
WritePrecision.US: [],
WritePrecision.NS: []
}

expected = {}

measurement = generate_name("measurement")
_point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3) \
.time(datetime.fromtimestamp(0, tz=timezone.utc), write_precision=WritePrecision.S)
_point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3) \
.time(datetime.fromtimestamp(1, tz=timezone.utc), write_precision=WritePrecision.MS)
_point3 = Point(measurement).tag("location", "Berlin").field("temperature", 24.3) \
.time(datetime.fromtimestamp(2, tz=timezone.utc), write_precision=WritePrecision.NS)
await self.client.write_api().write(bucket="my-bucket", record=[_point1, _point2, _point3],
# basic date-time value
points[WritePrecision.S].append(Point(measurement).tag("method", "SecDateTime").field("temperature", 25.3) \
.time(datetime.fromtimestamp(round(now_s), tz=timezone.utc), write_precision=WritePrecision.S))
expected['SecDateTime'] = now_date_s
points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDateTime").field("temperature", 24.3) \
.time(datetime.fromtimestamp(round(now_s,3), tz=timezone.utc), write_precision=WritePrecision.MS))
expected['MilDateTime'] = now_date_ms
points[WritePrecision.US].append(Point(measurement).tag("method", "MicDateTime").field("temperature", 24.3) \
.time(datetime.fromtimestamp(round(now_s,6), tz=timezone.utc), write_precision=WritePrecision.US))
expected['MicDateTime'] = now_date_us
# N.B. datetime does not handle nanoseconds
# points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDateTime").field("temperature", 24.3) \
# .time(datetime.fromtimestamp(now_s, tz=timezone.utc), write_precision=WritePrecision.NS))

# long timestamps based on POSIX time
points[WritePrecision.S].append(Point(measurement).tag("method", "SecPosix").field("temperature", 24.3) \
.time(round(now_s), write_precision=WritePrecision.S))
expected['SecPosix'] = now_date_s
points[WritePrecision.MS].append(Point(measurement).tag("method", "MilPosix").field("temperature", 24.3) \
.time(round(now_ms), write_precision=WritePrecision.MS))
expected['MilPosix'] = now_date_ms
points[WritePrecision.US].append(Point(measurement).tag("method", "MicPosix").field("temperature", 24.3) \
.time(round(now_us), write_precision=WritePrecision.US))
expected['MicPosix'] = now_date_us
points[WritePrecision.NS].append(Point(measurement).tag("method", "NanPosix").field("temperature", 24.3) \
.time(now_ns, write_precision=WritePrecision.NS))
expected['NanPosix'] = now_date_ns

# ISO Zulu datetime with ms, us and ns e.g. "2024-09-27T13:17:16.412399728Z"
points[WritePrecision.S].append(Point(measurement).tag("method", "SecDTZulu").field("temperature", 24.3) \
.time(now_date_s, write_precision=WritePrecision.S))
expected['SecDTZulu'] = now_date_s
points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDTZulu").field("temperature", 24.3) \
.time(now_date_ms, write_precision=WritePrecision.MS))
expected['MilDTZulu'] = now_date_ms
points[WritePrecision.US].append(Point(measurement).tag("method", "MicDTZulu").field("temperature", 24.3) \
.time(now_date_us, write_precision=WritePrecision.US))
expected['MicDTZulu'] = now_date_us
# This keeps resulting in micro second resolution in response
# points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDTZulu").field("temperature", 24.3) \
# .time(now_date_ns, write_precision=WritePrecision.NS))

recs = [x for x in [v for v in points.values()]]

await self.client.write_api().write(bucket="my-bucket", record=recs,
write_precision=WritePrecision.NS)
query = f'''
from(bucket:"my-bucket")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> keep(columns: ["_time"])
|> keep(columns: ["method","_time"])
'''
query_api = self.client.query_api()

# ensure calls fully processed on server
await asyncio.sleep(1)

raw = await query_api.query_raw(query)
self.assertEqual(8, len(raw.splitlines()))
self.assertEqual(',,0,1970-01-01T00:00:02Z', raw.splitlines()[4])
self.assertEqual(',,0,1970-01-01T00:00:01Z', raw.splitlines()[5])
self.assertEqual(',,0,1970-01-01T00:00:00Z', raw.splitlines()[6])
linesRaw = raw.splitlines()[4:]

lines = []
for lnr in linesRaw:
lines.append(lnr[2:].split(","))

def get_time_for_method(lines, method):
for l in lines:
if l[2] == method:
return l[1]
return ""

self.assertEqual(15, len(raw.splitlines()))

for key in expected:
t = get_time_for_method(lines,key)
comp_time = dateutil.parser.isoparse(get_time_for_method(lines,key))
target_time = dateutil.parser.isoparse(expected[key])
self.assertEqual(target_time.date(), comp_time.date())
self.assertEqual(target_time.hour, comp_time.hour)
self.assertEqual(target_time.second,comp_time.second)
dif = abs(target_time.microsecond - comp_time.microsecond)
if key[:3] == "Sec":
# Already tested
pass
elif key[:3] == "Mil":
# may be slight rounding differences
self.assertLess(dif, 1500, f"failed to match timestamp for {key} {target_time} != {comp_time}")
elif key[:3] == "Mic":
# may be slight rounding differences
self.assertLess(dif, 150, f"failed to match timestamp for {key} {target_time} != {comp_time}")
elif key[:3] == "Nan":
self.assertEqual(expected[key], get_time_for_method(lines, key))
else:
raise Exception(f"Unhandled key {key}")

@async_test
async def test_delete_api(self):
Expand Down