diff --git a/examples/task_example.py b/examples/task_example.py index 55595ba9..242dcf4d 100644 --- a/examples/task_example.py +++ b/examples/task_example.py @@ -25,3 +25,9 @@ task_request = TaskCreateRequest(flux=flux, org=org, description="Task Description", status="active") task = tasks_api.create_task(task_create_request=task_request) print(task) + + tasks = tasks_api.find_tasks_iter() + + # print all tasks id + for task in tasks: + print(task.id) diff --git a/influxdb_client/client/tasks_api.py b/influxdb_client/client/tasks_api.py index dd85683b..691604e9 100644 --- a/influxdb_client/client/tasks_api.py +++ b/influxdb_client/client/tasks_api.py @@ -10,6 +10,24 @@ from influxdb_client import TasksService, Task, TaskCreateRequest, TaskUpdateRequest, LabelResponse, LabelMapping, \ AddResourceMemberRequestBody, RunManually, Run, LogEvent +class TasksIterator: + def __init__(self, values, next) -> None: + self.values = values + self.next = next + self.no_values = False if values else True + + def __iter__(self): + return self + + def __next__(self): + if self.no_values: + raise StopIteration + if not self.values: + self.values, self.next = self.next() + if not self.values: + raise StopIteration + return self.values.pop(0) + class TasksApi(object): """Implementation for '/api/v2/tasks' endpoint.""" @@ -25,7 +43,7 @@ def find_task_by_id(self, task_id) -> Task: return task def find_tasks(self, **kwargs): - """List all tasks. + """List all tasks up to set limit (max 500). :key str name: only returns tasks with the specified name :key str after: returns tasks after specified ID @@ -37,6 +55,45 @@ def find_tasks(self, **kwargs): """ return self._service.get_tasks(**kwargs).tasks + def _find_tasks_paged(self, **kwargs): + """List all tasks with ability to list next tasks after limit. + + :key str name: only returns tasks with the specified name + :key str after: returns tasks after specified ID + :key str user: filter tasks to a specific user ID + :key str org: filter tasks to a specific organization name + :key str org_id: filter tasks to a specific organization ID + :key int limit: the number of tasks to return in one page + :return: Tasks, Next + """ + tasks = self._service.get_tasks(**kwargs).tasks + + last_id = tasks[-1].id if tasks else None + def next(): + if last_id is not None: + return self._find_tasks_paged(**{**kwargs, 'after': last_id}) + else: + def func(): + raise Exception("There are no additional pages remaining for tasks.") + return [], func + + return tasks, next + + def find_tasks_iter(self, **kwargs): + """Iterate over all tasks with pagination. + + :key str name: only returns tasks with the specified name + :key str after: returns tasks after specified ID + :key str user: filter tasks to a specific user ID + :key str org: filter tasks to a specific organization name + :key str org_id: filter tasks to a specific organization ID + :key int limit: the number of tasks in one page + :return: Tasks iterator + """ + tasks, next = self._find_tasks_paged(**kwargs) + + return iter(TasksIterator(tasks, next)) + def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task: """Create a new task.""" if task_create_request is not None: