Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add type checks for Authorization with new example. #682

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions examples/authorizations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import os

from influxdb_client import InfluxDBClient, BucketRetentionRules, PermissionResource, Permission, Authorization, \
WriteOptions
from influxdb_client.client.write_api import WriteType
from influxdb_client.rest import ApiException

HOST_URL = os.environ.get("INFLUX_HOST") if os.environ.get("INFLUX_HOST") is not None else "http://localhost:8086"
TOKEN = os.environ.get("INFLUX_TOKEN") if os.environ.get("INFLUX_TOKEN") is not None else "my-token"
ORG = os.environ.get("INFLUX_ORG") if os.environ.get("INFLUX_ORG") is not None else "my-org"
SYS_BUCKET = os.environ.get("INFLUX_DB") if os.environ.get("INFLUX_DB") is not None else "my-bucket"
BUCKET = "special-bucket"


def create_auths():
# Create authorizations with an initial client using all-access permissions
with InfluxDBClient(url=HOST_URL, token=TOKEN, org=ORG, debug=False) as globalClient:
bucket_rules = BucketRetentionRules(type="expire", every_seconds=3600)
bucket = globalClient.buckets_api().create_bucket(bucket_name=BUCKET,
retention_rules=bucket_rules,
org=ORG)

bucket_permission_resource_r = PermissionResource(org=ORG,
org_id=bucket.org_id,
type="buckets",
id=bucket.id)
bucket_permission_resource_w = PermissionResource(org=ORG,
org_id=bucket.org_id,
type="buckets",
id=bucket.id)
read_bucket = Permission(action="read", resource=bucket_permission_resource_r)
write_bucket = Permission(action="write", resource=bucket_permission_resource_w)
permissions = [read_bucket, write_bucket]
auth_payload = Authorization(org_id=bucket.org_id,
permissions=permissions,
description="Shared bucket auth from Authorization object",
id="auth1_base")
auth_api = globalClient.authorizations_api()
# use keyword arguments
auth1 = auth_api.create_authorization(authorization=auth_payload)
# or use positional arguments
auth2 = auth_api.create_authorization(bucket.org_id, permissions)

return auth1, auth2


def try_sys_bucket(client):
print("starting to write")

w_api = client.write_api(write_options=WriteOptions(write_type=WriteType.synchronous))
try:
w_api.write(bucket=SYS_BUCKET, record="cpu,host=r2d2 use=3.14")
except ApiException as ae:
print(f"Write to {SYS_BUCKET} failed (as expected) due to:")
print(ae)


def try_restricted_bucket(client):
print("starting to write")
w_api = client.write_api(write_options=WriteOptions(write_type=WriteType.synchronous))

w_api.write(bucket=BUCKET, record="cpu,host=r2d2 usage=3.14")
print("written")
print("now query")
q_api = client.query_api()
query = f'''
from(bucket:"{BUCKET}")
|> range(start: -5m)
|> filter(fn: (r) => r["_measurement"] == "cpu")'''

tables = q_api.query(query=query, org=ORG)
for table in tables:
for record in table.records:
print(record["_time"].isoformat(sep="T") + " | " + record["host"] + " | " + record["_field"] + "=" + str(record["_value"]))


def main():
"""
a1 is generated using a local Authorization instance
a2 is generated using local permissions and an internally created Authorization
:return: void
"""
print("=== Setting up authorizations ===")
a1, a2 = create_auths()

print("=== Using a1 authorization ===")
client1 = InfluxDBClient(url=HOST_URL, token=a1.token, org=ORG, debug=False)
print(" --- Try System Bucket ---")
try_sys_bucket(client1)
print(" --- Try Special Bucket ---")
try_restricted_bucket(client1)
print()

print("=== Using a2 authorization ===")
client2 = InfluxDBClient(url=HOST_URL, token=a2.token, org=ORG, debug=False)
print(" --- Try System Bucket ---")
try_sys_bucket(client2)
print(" --- Try Special Bucket ---")
try_restricted_bucket(client2)


if __name__ == "__main__":
main()
4 changes: 3 additions & 1 deletion influxdb_client/client/authorizations_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def __init__(self, influxdb_client):
self._influxdb_client = influxdb_client
self._authorizations_service = AuthorizationsService(influxdb_client.api_client)

def create_authorization(self, org_id=None, permissions: list = None,
def create_authorization(self, org_id: str = None, permissions: list = None,
authorization: Authorization = None) -> Authorization:
"""
Create an authorization.
Expand All @@ -23,6 +23,8 @@ def create_authorization(self, org_id=None, permissions: list = None,

"""
if authorization is not None:
if not isinstance(authorization, Authorization):
raise TypeError(f"Attempt to use non-Authorization value for authorization: {authorization}")
return self._authorizations_service.post_authorizations(authorization_post_request=authorization)

# if org_id is not None and permissions is not None:
Expand Down
4 changes: 4 additions & 0 deletions influxdb_client/domain/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,12 @@ def __init__(self, created_at=None, updated_at=None, org_id=None, permissions=No
if updated_at is not None:
self.updated_at = updated_at
if org_id is not None:
if not isinstance(org_id, str):
raise TypeError("org_id must be a string.")
self.org_id = org_id
if permissions is not None:
if not isinstance(permissions, list):
raise TypeError("permissions must be a list.")
self.permissions = permissions
if id is not None:
self.id = id
Expand Down
19 changes: 19 additions & 0 deletions tests/test_AuthorizationApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,25 @@ def test_createAuthorization(self):

self.assertEqual(authorization.links["user"], "/api/v2/users/" + self.user.id)

def test_AuthorizationTypeAssert(self):
self.assertRaisesRegex(TypeError, "org_id must be a string.", Authorization, org_id={})
self.assertRaisesRegex(TypeError, "permissions must be a list.", Authorization, permissions={})

def test_createAuthorizationWrongTypes(self):
user_resource = PermissionResource(org_id=self.organization.id, type="users")
read_users = Permission(action="read", resource=user_resource)

org_resource = PermissionResource(org_id=self.organization.id, type="orgs")
write_organizations = Permission(action="write", resource=org_resource)

permissions = [read_users, write_organizations]
self.assertRaisesRegex(TypeError, "org_id must be a string.",
self.authorizations_api.create_authorization, permissions)
self.assertRaisesRegex(TypeError, "permissions must be a list",
self.authorizations_api.create_authorization, "123456789ABCDEF0", "Foo")
self.assertRaisesRegex(TypeError, "Attempt to use non-Authorization value for authorization: Foo",
self.authorizations_api.create_authorization, "123456789ABCDEF0", permissions, "Foo")

def test_authorizationDescription(self):
organization = self.my_organization

Expand Down
Loading