Skip to content

Commit

Permalink
Merge pull request #423 from atlanhq/FT-783
Browse files Browse the repository at this point in the history
FT-783: Changed fields to optional to ensure capture results without any Pydantic `ValidationError`
  • Loading branch information
Aryamanz29 authored Nov 20, 2024
2 parents 48322a9 + cbddde0 commit 340fa21
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 99 deletions.
90 changes: 64 additions & 26 deletions pyatlan/client/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
WorkflowSearchResult,
WorkflowSearchResultDetail,
)
from pyatlan.utils import validate_type

MONITOR_SLEEP_SECONDS = 5

Expand Down Expand Up @@ -98,7 +99,7 @@ def find_by_type(
request_obj=request,
)
response = WorkflowSearchResponse(**raw_json)
return response.hits.hits or []
return response.hits and response.hits.hits or []

@validate_arguments
def find_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
Expand All @@ -124,7 +125,7 @@ def find_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
request_obj=request,
)
response = WorkflowSearchResponse(**raw_json)
return results[0] if (results := response.hits.hits) else None
return results[0] if (results := response.hits and response.hits.hits) else None

@validate_arguments
def find_run_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
Expand All @@ -145,7 +146,7 @@ def find_run_by_id(self, id: str) -> Optional[WorkflowSearchResult]:
]
)
response = self._find_runs(query, size=1)
return results[0] if (results := response.hits.hits) else None
return results[0] if (results := response.hits and response.hits.hits) else None

@validate_arguments
def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
Expand All @@ -167,7 +168,7 @@ def _find_latest_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]
]
)
response = self._find_runs(query, size=1)
return results[0] if (results := response.hits.hits) else None
return results[0] if (results := response.hits and response.hits.hits) else None

@validate_arguments
def _find_current_run(self, workflow_name: str) -> Optional[WorkflowSearchResult]:
Expand All @@ -191,7 +192,7 @@ def _find_current_run(self, workflow_name: str) -> Optional[WorkflowSearchResult
]
)
response = self._find_runs(query, size=50)
if results := response.hits.hits:
if results := response.hits and response.hits.hits:
for result in results:
if result.status in {
AtlanWorkflowPhase.PENDING,
Expand Down Expand Up @@ -230,7 +231,7 @@ def _add_schedule(
"""
Adds required schedule parameters to the workflow object.
"""
workflow.metadata.annotations and workflow.metadata.annotations.update(
workflow.metadata and workflow.metadata.annotations and workflow.metadata.annotations.update(
{
self._WORKFLOW_RUN_SCHEDULE: workflow_schedule.cron_schedule,
self._WORKFLOW_RUN_TIMEZONE: workflow_schedule.timezone,
Expand Down Expand Up @@ -266,7 +267,6 @@ def rerun(
self, workflow: WorkflowSearchResult, idempotent: bool = False
) -> WorkflowRunResponse: ...

@validate_arguments
def rerun(
self,
workflow: Union[
Expand All @@ -285,25 +285,37 @@ def rerun(
:raises InvalidRequestException: If no prior runs are available for the provided workflow
:raises AtlanError: on any API communication issue
"""
validate_type(
name="workflow",
_type=(WorkflowPackage, WorkflowSearchResultDetail, WorkflowSearchResult),
value=workflow,
)
request = None
detail = self._handle_workflow_types(workflow)
if idempotent and detail.metadata.name:
if idempotent and detail and detail.metadata and detail.metadata.name:
# Introducing a delay before checking the current workflow run
# since it takes some time to start or stop
sleep(10)
if (
current_run_details := self._find_current_run(
workflow_name=detail.metadata.name
(
current_run_details := self._find_current_run(
workflow_name=detail.metadata.name
)
)
) and current_run_details.source.status:
and current_run_details.source
and current_run_details.source.metadata
and current_run_details.source.spec
and current_run_details.source.status
):
return WorkflowRunResponse(
metadata=current_run_details.source.metadata,
spec=current_run_details.source.spec,
status=current_run_details.source.status,
)

request = ReRunRequest(
namespace=detail.metadata.namespace, resource_name=detail.metadata.name
)
if detail and detail.metadata:
request = ReRunRequest(
namespace=detail.metadata.namespace, resource_name=detail.metadata.name
)
raw_json = self._client._call_api(
WORKFLOW_RERUN,
request_obj=request,
Expand Down Expand Up @@ -350,7 +362,9 @@ def update(self, workflow: Workflow) -> WorkflowResponse:
:raises AtlanError: on any API communication issue.
"""
raw_json = self._client._call_api(
WORKFLOW_UPDATE.format_path({"workflow_name": workflow.metadata.name}),
WORKFLOW_UPDATE.format_path(
{"workflow_name": workflow.metadata and workflow.metadata.name}
),
request_obj=workflow,
)
return WorkflowResponse(**raw_json)
Expand Down Expand Up @@ -439,7 +453,7 @@ def get_runs(
filter=[Term(field="status.phase.keyword", value=workflow_phase.value)],
)
response = self._find_runs(query, from_=from_, size=size)
return results if (results := response.hits.hits) else None
return results if (results := response.hits and response.hits.hits) else None

@validate_arguments
def stop(
Expand Down Expand Up @@ -495,7 +509,6 @@ def add_schedule(
self, workflow: WorkflowSearchResultDetail, workflow_schedule: WorkflowSchedule
) -> WorkflowResponse: ...

@validate_arguments
def add_schedule(
self,
workflow: Union[
Expand All @@ -517,12 +530,24 @@ def add_schedule(
:returns: a scheduled workflow.
:raises AtlanError: on any API communication issue.
"""

validate_type(
name="workflow",
_type=(
WorkflowResponse,
WorkflowPackage,
WorkflowSearchResult,
WorkflowSearchResultDetail,
),
value=workflow,
)
workflow_to_update = self._handle_workflow_types(workflow)
self._add_schedule(workflow_to_update, workflow_schedule)
raw_json = self._client._call_api(
WORKFLOW_UPDATE.format_path(
{"workflow_name": workflow_to_update.metadata.name}
{
"workflow_name": workflow_to_update.metadata
and workflow_to_update.metadata.name
}
),
request_obj=workflow_to_update,
)
Expand All @@ -542,7 +567,6 @@ def remove_schedule(
self, workflow: WorkflowSearchResultDetail
) -> WorkflowResponse: ...

@validate_arguments
def remove_schedule(
self,
workflow: Union[
Expand All @@ -559,13 +583,27 @@ def remove_schedule(
:returns: a workflow.
:raises AtlanError: on any API communication issue.
"""
workflow_to_update = self._handle_workflow_types(workflow)
workflow_to_update.metadata.annotations and workflow_to_update.metadata.annotations.pop(
self._WORKFLOW_RUN_SCHEDULE, None
validate_type(
name="workflow",
_type=(
WorkflowResponse,
WorkflowPackage,
WorkflowSearchResult,
WorkflowSearchResultDetail,
),
value=workflow,
)
workflow_to_update = self._handle_workflow_types(workflow)
if workflow_to_update.metadata and workflow_to_update.metadata.annotations:
workflow_to_update.metadata.annotations.pop(
self._WORKFLOW_RUN_SCHEDULE, None
)
raw_json = self._client._call_api(
WORKFLOW_UPDATE.format_path(
{"workflow_name": workflow_to_update.metadata.name}
{
"workflow_name": workflow_to_update.metadata
and workflow_to_update.metadata.name
}
),
request_obj=workflow_to_update,
)
Expand Down Expand Up @@ -632,7 +670,7 @@ def find_schedule_query(
request_obj=request,
)
response = WorkflowSearchResponse(**raw_json)
return response.hits.hits or []
return response.hits and response.hits.hits or []

@validate_arguments
def re_run_schedule_query(self, schedule_query_id: str) -> WorkflowRunResponse:
Expand Down
79 changes: 41 additions & 38 deletions pyatlan/model/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@


class PackageParameter(AtlanObject):
parameter: str
type: str
body: Dict[str, Any]
parameter: Optional[str] = Field(default=None)
type: Optional[str] = Field(default=None)
body: Optional[Dict[str, Any]] = Field(default=None)


class WorkflowMetadata(AtlanObject):
Expand All @@ -29,36 +29,36 @@ class WorkflowMetadata(AtlanObject):


class WorkflowTemplateRef(AtlanObject):
name: str
template: str
cluster_scope: bool
name: Optional[str] = Field(default=None)
template: Optional[str] = Field(default=None)
cluster_scope: Optional[bool] = Field(default=None)


class NameValuePair(AtlanObject):
name: str
value: Any
name: Optional[str] = Field(default=None)
value: Optional[Any] = Field(default=None)


class WorkflowParameters(AtlanObject):
parameters: List[NameValuePair]
parameters: Optional[List[NameValuePair]] = Field(default=None)


class WorkflowTask(AtlanObject):
name: str
arguments: WorkflowParameters
template_ref: WorkflowTemplateRef
name: Optional[str] = Field(default=None)
arguments: Optional[WorkflowParameters] = Field(default=None)
template_ref: Optional[WorkflowTemplateRef] = Field(default=None)


class WorkflowDAG(AtlanObject):
tasks: List[WorkflowTask]
tasks: Optional[List[WorkflowTask]] = Field(default=None)


class WorkflowTemplate(AtlanObject):
name: str
name: Optional[str] = Field(default=None)
inputs: Any = Field(default=None)
outputs: Any = Field(default=None)
metadata: Any = Field(default=None)
dag: WorkflowDAG
dag: Optional[WorkflowDAG] = Field(default=None)


class WorkflowSpec(AtlanObject):
Expand All @@ -70,8 +70,8 @@ class WorkflowSpec(AtlanObject):


class Workflow(AtlanObject):
metadata: WorkflowMetadata
spec: WorkflowSpec
metadata: Optional[WorkflowMetadata] = Field(default=None)
spec: Optional[WorkflowSpec] = Field(default=None)
payload: List[PackageParameter] = Field(default_factory=list)


Expand All @@ -97,21 +97,21 @@ class WorkflowSearchResultStatus(AtlanObject):


class WorkflowSearchResultDetail(AtlanObject):
api_version: str
kind: str
metadata: WorkflowMetadata
spec: WorkflowSpec
api_version: Optional[str] = Field(default=None)
kind: Optional[str] = Field(default=None)
metadata: Optional[WorkflowMetadata] = Field(default=None)
spec: Optional[WorkflowSpec] = Field(default=None)
status: Optional[WorkflowSearchResultStatus] = Field(default=None)


class WorkflowSearchResult(AtlanObject):
index: str = Field(alias="_index")
type: str = Field(alias="_type")
id: str = Field(alias="_id")
seq_no: Any = Field(alias="_seq_no")
primary_term: Any = Field(alias="_primary_term")
sort: List[Any]
source: WorkflowSearchResultDetail = Field(alias="_source")
index: Optional[str] = Field(default=None, alias="_index")
type: Optional[str] = Field(default=None, alias="_type")
id: Optional[str] = Field(default=None, alias="_id")
seq_no: Optional[Any] = Field(default=None, alias="_seq_no")
primary_term: Optional[Any] = Field(default=None, alias="_primary_term")
sort: Optional[List[Any]] = Field(default=None)
source: Optional[WorkflowSearchResultDetail] = Field(default=None, alias="_source")

@property
def status(self) -> Optional[AtlanWorkflowPhase]:
Expand All @@ -121,38 +121,41 @@ def status(self) -> Optional[AtlanWorkflowPhase]:
return None

def to_workflow(self) -> Workflow:
return Workflow(spec=self.source.spec, metadata=self.source.metadata)
return Workflow(
spec=self.source and self.source.spec or None,
metadata=self.source and self.source.metadata or None,
)


class WorkflowSearchHits(AtlanObject):
total: Dict[str, Any]
total: Optional[Dict[str, Any]] = Field(default=None)
hits: Optional[List[WorkflowSearchResult]] = Field(default=None)


class WorkflowSearchResponse(AtlanObject):
took: Optional[int] = Field(default=None)
hits: WorkflowSearchHits
shards: Dict[str, Any] = Field(alias="_shards")
hits: Optional[WorkflowSearchHits] = Field(default=None)
shards: Optional[Dict[str, Any]] = Field(alias="_shards", default=None)


class ReRunRequest(AtlanObject):
namespace: str = "default"
resource_kind: str = "WorkflowTemplate"
resource_name: str
namespace: Optional[str] = Field(default="default")
resource_kind: Optional[str] = Field(default="WorkflowTemplate")
resource_name: Optional[str] = Field(default=None)

def __init__(__pydantic_self__, **data: Any) -> None:
super().__init__(**data)
__pydantic_self__.__fields_set__.update(["resource_kind"])


class WorkflowResponse(AtlanObject):
metadata: WorkflowMetadata
spec: WorkflowSpec
metadata: Optional[WorkflowMetadata] = Field(default=None)
spec: Optional[WorkflowSpec] = Field(default=None)
payload: Optional[List[Any]] = Field(default_factory=list)


class WorkflowRunResponse(WorkflowResponse):
status: WorkflowSearchResultStatus
status: Optional[WorkflowSearchResultStatus] = Field(default=None)


class ScheduleQueriesSearchRequest(AtlanObject):
Expand Down
6 changes: 3 additions & 3 deletions pyatlan/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,9 @@ def validate_type(name: str, _type, value):
return

type_name = (
", ".join(t.__name__ for t in _type)
if isinstance(_type, tuple)
else _type.__name__
", ".join(t.__name__ for t in _type[:-1]) + f" or {_type[-1].__name__}"
if isinstance(_type, tuple) and len(_type) > 1
else _type.__name__ if isinstance(_type, tuple) else _type.__name__ # type: ignore[attr-defined]
)

raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(name, type_name)
Expand Down
Loading

0 comments on commit 340fa21

Please sign in to comment.