Skip to content

Commit

Permalink
Record methods
Browse files Browse the repository at this point in the history
  • Loading branch information
infeeeee committed Dec 31, 2024
1 parent e7713df commit 2227e27
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 70 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/unittests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ name: Unit tests
on:
# push:
# branches: ["main"]
pull_request_target:
types: [labeled]
pull_request:
branches: ["main"]
workflow_dispatch:

permissions:
Expand All @@ -13,7 +13,6 @@ permissions:
jobs:
tests:
name: Unit tests on Cloud
if: contains(github.event.pull_request.labels.*.name, 'safe to test')
strategy:
max-parallel: 1
matrix:
Expand Down
1 change: 1 addition & 0 deletions nocodb/Column.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def get_id_metadata() -> list[dict]:
"ai": True,
"dtx": "integer",
"dtxp": "11",
"system": True
},
{
"title": "Title",
Expand Down
43 changes: 37 additions & 6 deletions nocodb/Record.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from pathlib import Path

from nocodb.Column import Column

Expand Down Expand Up @@ -57,23 +58,53 @@ def get_linked_records(self, column: Column) -> list[Record]:
record_ids = [r.json()["Id"]]

linked_table = self.noco_db.get_table(column.linked_table_id)
return [linked_table.get_record(i) for i in record_ids]
return linked_table.get_records_by_id(record_ids)

def get_value(self, field: str) -> Any:
try:
return self.metadata[field]
except KeyError:
raise Exception(f"Value for {field} not found!")
return self.get_values([field])[field]

def get_column_value(self, column: Column) -> Any:
return self.get_value(column.title)

def get_values(self, fields: list[str] | None = None, include_system: bool = True) -> dict:
if not include_system:
cols = [c.title for c in self.table.get_columns(include_system)]
if fields:
fields = [f for f in fields if f in cols]
else:
fields = cols

field_str = ",".join(fields) if fields else ""
r = self.noco_db.call_noco(
path=f"tables/{self.table.table_id}/records/{self.record_id}",
params={"fields": field_str}
)
return r.json()

def get_attachments(self, field: str, encoding: str = "utf-8") -> list[str]:
value_list = self.get_value(field)
if not isinstance(value_list, list):
raise Exception("Invalid field value")

return [
self.noco_db.get_file(p["signedPath"], encoding=encoding)
self.noco_db.get_file(p["signedUrl"], encoding=encoding)
for p in value_list
]

def update(self, **kwargs) -> Record:
kwargs["Id"] = self.record_id
r = self.noco_db.call_noco(
path=f"tables/{self.table.table_id}/records",
method="PATCH",
json=kwargs,
)
return self.table.get_record(record_id=r.json()["Id"])

def upload_attachment(
self, field: str, filepath: Path, mimetype: str = ""
) -> Record:
value = self.get_value(field=field) or []
value.append(self.noco_db.upload_file(
filepath=filepath, mimetype=mimetype))

return self.update(**{field: value})
101 changes: 59 additions & 42 deletions nocodb/Table.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,53 @@ def get_basic_metadata(self) -> dict:
return {k: v for k, v in m.items() if not k in extra_keys}

def get_number_of_records(self) -> int:
r = self.noco_db.call_noco(path=f"tables/{self.table_id}/records/count")
r = self.noco_db.call_noco(
path=f"tables/{self.table_id}/records/count")
return r.json()["count"]

def get_base(self) -> Base:
return self.noco_db.get_base(self.base_id)

def duplicate(self, exclude_data: bool = True, exclude_views: bool = True) -> None:
r = self.noco_db.call_noco(
path=f"meta/duplicate/{self.base_id}/table/{self.table_id}",
method="POST",
json={"excludeData": exclude_data, "excludeViews": exclude_views},
)
_logger.info(f"Table {self.title} duplicated")
return

# Bug in noco API, wrong Id response

def get_duplicates(self) -> list["Table"]:
duplicates = {}
for t in self.get_base().get_tables():
if re.match(f"^{self.title} copy(_\\d+)?$", t.title):
nr = re.findall("_(\\d+)", t.title)
if nr:
duplicates[int(nr[0])] = t
else:
duplicates[0] = t

return list(dict(sorted(duplicates.items(), reverse=True)).values())

def delete(self) -> bool:
r = self.noco_db.call_noco(
path=f"meta/tables/{self.table_id}", method="DELETE")
_logger.info(f"Table {self.title} deleted")
return r.json()

def get_columns(self, include_system: bool = False) -> list[Column]:
r = self.noco_db.call_noco(path=f"meta/tables/{self.table_id}")
cols = [Column(noco_db=self.noco_db, **f) for f in r.json()["columns"]]
if include_system:
return cols
else:
return [c for c in cols if not c.system and not c.primary_key]
return [c for c in cols if not c.system]

def get_columns_hash(self) -> str:
r = self.noco_db.call_noco(path=f"meta/tables/{self.table_id}/columns/hash")
r = self.noco_db.call_noco(
path=f"meta/tables/{self.table_id}/columns/hash")
return r.json()["hash"]

def get_column_by_title(self, title: str) -> Column:
Expand All @@ -69,34 +103,6 @@ def create_column(
)
return self.get_column_by_title(title=title)

def duplicate(self, exclude_data: bool = True, exclude_views: bool = True) -> None:
r = self.noco_db.call_noco(
path=f"meta/duplicate/{self.base_id}/table/{self.table_id}",
method="POST",
json={"excludeData": exclude_data, "excludeViews": exclude_views},
)
_logger.info(f"Table {self.title} duplicated")
return

# Bug in noco API, wrong Id response

def get_duplicates(self) -> list["Table"]:
duplicates = {}
for t in self.get_base().get_tables():
if re.match(f"^{self.title} copy(_\\d+)?$", t.title):
nr = re.findall("_(\\d+)", t.title)
if nr:
duplicates[int(nr[0])] = t
else:
duplicates[0] = t

return list(dict(sorted(duplicates.items(), reverse=True)).values())

def delete(self) -> bool:
r = self.noco_db.call_noco(path=f"meta/tables/{self.table_id}", method="DELETE")
_logger.info(f"Table {self.title} deleted")
return r.json()

def get_records(self, params: dict | None = None) -> list[Record]:
params = params or {}

Expand Down Expand Up @@ -128,9 +134,14 @@ def get_records(self, params: dict | None = None) -> list[Record]:
return records

def get_record(self, record_id: int) -> Record:
r = self.noco_db.call_noco(path=f"tables/{self.table_id}/records/{record_id}")
r = self.noco_db.call_noco(
path=f"tables/{self.table_id}/records/{record_id}")
return Record(self, **r.json())

def get_records_by_id(self, record_ids: list[int]) -> list[Record]:
ids_string = ",".join(map(str, record_ids))
return self.get_records(params={"where": f"(Id,in,{ids_string})"})

def get_records_by_field_value(self, field: str, value) -> list[Record]:
return self.get_records(params={"where": f"({field},eq,{value})"})

Expand All @@ -144,26 +155,32 @@ def create_records(self, records: list[dict]) -> list[Record]:
r = self.noco_db.call_noco(
path=f"tables/{self.table_id}/records", method="POST", json=records
)
ids_string = ",".join([str(d["Id"]) for d in r.json()])
return self.get_records(params={"where": f"(Id,in,{ids_string})"})

def get_base(self) -> Base:
return self.noco_db.get_base(self.base_id)
return self.get_records_by_id([r_id["Id"] for r_id in r.json()])

def delete_record(self, record_id: int) -> bool:
def delete_record(self, record_id: int) -> int:
r = self.noco_db.call_noco(
path=f"tables/{self.table_id}/records",
method="DELETE",
json={"Id": record_id},
)
return r.json()["Id"]

return r.json()
def delete_records_by_id(self, record_ids: list[int]) -> list[int]:
r = self.noco_db.call_noco(
path=f"tables/{self.table_id}/records",
method="DELETE",
json=[{"Id": r_id} for r_id in record_ids]
)
return [r_id["Id"] for r_id in r.json()]

def delete_records(self, records: list[Record]) -> list[int]:
return self.delete_records_by_id([rec.record_id for rec in records])

def update_record(self, **kwargs) -> None:
def update_records(self, records: list[dict]) -> list[Record]:
r = self.noco_db.call_noco(
path=f"tables/{self.table_id}/records",
method="PATCH",
json=kwargs,
json=records,
)

return r.json()
return self.get_records_by_id([r_id["Id"] for r_id in r.json()])
54 changes: 35 additions & 19 deletions nocodb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations
import requests
from urllib.parse import urlsplit, urljoin
from pathlib import Path
import mimetypes

from nocodb.Base import Base
from nocodb.Column import Column
Expand Down Expand Up @@ -34,25 +36,6 @@ def _get_base_url(url) -> str:
url = url[:i]
return urlsplit(url).geturl() + "/"

def get_file(self, path, encoding: str = "utf-8") -> str:
"""Get a file from the noco server
Args:
path (_type_): _description_
encoding (str, optional): Encoding of the response. Defaults to "utf-8".
Returns:
str: _description_
About encoding: https://requests.readthedocs.io/en/latest/user/quickstart/#response-content
"""
headers = {"xc-token": self.api_key}
url = urljoin(self.base_url, path)
r = requests.get(url=url, headers=headers)
r.encoding = encoding

return r.text

def call_noco(self, path: str, method: str = "GET", **kwargs) -> requests.Response:
headers = {"xc-token": self.api_key}
url = urljoin(self.api_url, path)
Expand All @@ -70,6 +53,39 @@ def call_noco(self, path: str, method: str = "GET", **kwargs) -> requests.Respon

return r

def get_file(self, path, encoding: str = "utf-8") -> str:
"""Get a file from the noco server
Args:
path (_type_): _description_
encoding (str, optional): Encoding of the response. Defaults to "utf-8".
Returns:
str: _description_
About encoding: https://requests.readthedocs.io/en/latest/user/quickstart/#response-content
"""

r = self.call_noco(path=path)
r.encoding = encoding

return r.text

def upload_file(self, filepath: Path, mimetype: str = "") -> dict:
if not mimetype:
mt = mimetypes.guess_type(filepath, strict=False)
if mt and mt[0]:
mimetype = mt[0]
else:
mimetype = "text/plain"

r = self.call_noco(
path="storage/upload",
method="POST",
files={"file": (filepath.name, open(filepath, "rb"), mimetype)},
)
return r.json()[0]

def get_bases(self) -> list[Base]:
r = self.call_noco(path="meta/bases")
return [Base(noco_db=self, **f) for f in r.json()["list"]]
Expand Down

0 comments on commit 2227e27

Please sign in to comment.