Skip to content

Commit

Permalink
Merge branch 'main' into FT-892
Browse files Browse the repository at this point in the history
  • Loading branch information
Aryamanz29 authored Jan 6, 2025
2 parents 2110c4f + bb17ccd commit e900993
Show file tree
Hide file tree
Showing 4 changed files with 356 additions and 229 deletions.
189 changes: 146 additions & 43 deletions pyatlan/client/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
AtlanDeleteType,
CertificateStatus,
EntityStatus,
SaveSemantic,
SortOrder,
)
from pyatlan.model.fields.atlan_fields import AtlanField
Expand Down Expand Up @@ -1330,29 +1331,64 @@ def append_terms(
:param qualified_name: the qualified_name of the asset to which to link the terms
:returns: the asset that was updated (note that it will NOT contain details of the appended terms)
"""
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.fluent_search import FluentSearch

client = AtlanClient.get_default_client()
if guid:
if qualified_name:
raise ErrorCode.QN_OR_GUID_NOT_BOTH.exception_with_parameters()
asset = self.get_by_guid(guid=guid, asset_type=asset_type)
results = (
FluentSearch()
.select()
.where(asset_type.GUID.eq(guid))
.execute(client=client)
)
elif qualified_name:
asset = self.get_by_qualified_name(
qualified_name=qualified_name, asset_type=asset_type
results = (
FluentSearch()
.select()
.where(asset_type.QUALIFIED_NAME.eq(qualified_name))
.execute(client=client)
)
else:
raise ErrorCode.QN_OR_GUID.exception_with_parameters()
if not terms:
return asset
replacement_terms: List[AtlasGlossaryTerm] = []
if existing_terms := asset.assigned_terms:
replacement_terms.extend(
term for term in existing_terms if term.relationship_status != "DELETED"
)
replacement_terms.extend(terms)
asset.assigned_terms = replacement_terms
response = self.save(entity=asset)

if results and results.current_page():
first_result = results.current_page()[0]
if not isinstance(first_result, asset_type):
if guid is None:
raise ErrorCode.ASSET_NOT_FOUND_BY_NAME.exception_with_parameters(
asset_type.__name__, qualified_name
)
else:
raise ErrorCode.ASSET_NOT_TYPE_REQUESTED.exception_with_parameters(
guid, asset_type.__name__
)
else:
if guid is None:
raise ErrorCode.ASSET_NOT_FOUND_BY_QN.exception_with_parameters(
qualified_name, asset_type.__name__
)
else:
raise ErrorCode.ASSET_NOT_FOUND_BY_GUID.exception_with_parameters(guid)
qualified_name = first_result.qualified_name
name = first_result.name
updated_asset = asset_type.updater(qualified_name=qualified_name, name=name)
for i, term in enumerate(terms):
if hasattr(term, "guid") and term.guid:
terms[i] = AtlasGlossaryTerm.ref_by_guid(
guid=term.guid, semantic=SaveSemantic.APPEND
)
elif hasattr(term, "qualified_name") and term.qualified_name:
terms[i] = AtlasGlossaryTerm.ref_by_qualified_name(
qualified_name=term.qualified_name, semantic=SaveSemantic.APPEND
)
updated_asset.assigned_terms = terms
response = self.save(entity=updated_asset)
if assets := response.assets_updated(asset_type=asset_type):
return assets[0]
return asset
return updated_asset

@validate_arguments
def replace_terms(
Expand All @@ -1372,25 +1408,64 @@ def replace_terms(
:param qualified_name: the qualified_name of the asset to which to replace the terms
:returns: the asset that was updated (note that it will NOT contain details of the replaced terms)
"""
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.fluent_search import FluentSearch

client = AtlanClient.get_default_client()
if guid:
if qualified_name:
raise ErrorCode.QN_OR_GUID_NOT_BOTH.exception_with_parameters()
asset = self.get_by_guid(
guid=guid, asset_type=asset_type, ignore_relationships=False
results = (
FluentSearch()
.select()
.where(asset_type.GUID.eq(guid))
.execute(client=client)
)
elif qualified_name:
asset = self.get_by_qualified_name(
qualified_name=qualified_name,
asset_type=asset_type,
ignore_relationships=False,
results = (
FluentSearch()
.select()
.where(asset_type.QUALIFIED_NAME.eq(qualified_name))
.execute(client=client)
)
else:
raise ErrorCode.QN_OR_GUID.exception_with_parameters()
asset.assigned_terms = terms
response = self.save(entity=asset)

if results and results.current_page():
first_result = results.current_page()[0]
if not isinstance(first_result, asset_type):
if guid is None:
raise ErrorCode.ASSET_NOT_FOUND_BY_NAME.exception_with_parameters(
asset_type.__name__, qualified_name
)
else:
raise ErrorCode.ASSET_NOT_TYPE_REQUESTED.exception_with_parameters(
guid, asset_type.__name__
)
else:
if guid is None:
raise ErrorCode.ASSET_NOT_FOUND_BY_QN.exception_with_parameters(
qualified_name, asset_type.__name__
)
else:
raise ErrorCode.ASSET_NOT_FOUND_BY_GUID.exception_with_parameters(guid)
qualified_name = first_result.qualified_name
name = first_result.name
updated_asset = asset_type.updater(qualified_name=qualified_name, name=name)
for i, term in enumerate(terms):
if hasattr(term, "guid") and term.guid:
terms[i] = AtlasGlossaryTerm.ref_by_guid(
guid=term.guid, semantic=SaveSemantic.REPLACE
)
elif hasattr(term, "qualified_name") and term.qualified_name:
terms[i] = AtlasGlossaryTerm.ref_by_qualified_name(
qualified_name=term.qualified_name, semantic=SaveSemantic.REPLACE
)
updated_asset.assigned_terms = terms
response = self.save(entity=updated_asset)
if assets := response.assets_updated(asset_type=asset_type):
return assets[0]
return asset
return updated_asset

@validate_arguments
def remove_terms(
Expand All @@ -1412,36 +1487,64 @@ def remove_terms(
:param qualified_name: the qualified_name of the asset from which to remove the terms
:returns: the asset that was updated (note that it will NOT contain details of the resulting terms)
"""
if not terms:
raise ErrorCode.MISSING_TERMS.exception_with_parameters()
from pyatlan.client.atlan import AtlanClient
from pyatlan.model.fluent_search import FluentSearch

client = AtlanClient.get_default_client()
if guid:
if qualified_name:
raise ErrorCode.QN_OR_GUID_NOT_BOTH.exception_with_parameters()
asset = self.get_by_guid(
guid=guid, asset_type=asset_type, ignore_relationships=False
results = (
FluentSearch()
.select()
.where(asset_type.GUID.eq(guid))
.execute(client=client)
)
elif qualified_name:
asset = self.get_by_qualified_name(
qualified_name=qualified_name,
asset_type=asset_type,
ignore_relationships=False,
results = (
FluentSearch()
.select()
.where(asset_type.QUALIFIED_NAME.eq(qualified_name))
.execute(client=client)
)
else:
raise ErrorCode.QN_OR_GUID.exception_with_parameters()
replacement_terms: List[AtlasGlossaryTerm] = []
guids_to_be_removed = {t.guid for t in terms}
if existing_terms := asset.assigned_terms:
replacement_terms.extend(
term
for term in existing_terms
if term.relationship_status != "DELETED"
and term.guid not in guids_to_be_removed
)
asset.assigned_terms = replacement_terms
response = self.save(entity=asset)

if results and results.current_page():
first_result = results.current_page()[0]
if not isinstance(first_result, asset_type):
if guid is None:
raise ErrorCode.ASSET_NOT_FOUND_BY_NAME.exception_with_parameters(
asset_type.__name__, qualified_name
)
else:
raise ErrorCode.ASSET_NOT_TYPE_REQUESTED.exception_with_parameters(
guid, asset_type.__name__
)
else:
if guid is None:
raise ErrorCode.ASSET_NOT_FOUND_BY_QN.exception_with_parameters(
qualified_name, asset_type.__name__
)
else:
raise ErrorCode.ASSET_NOT_FOUND_BY_GUID.exception_with_parameters(guid)
qualified_name = first_result.qualified_name
name = first_result.name
updated_asset = asset_type.updater(qualified_name=qualified_name, name=name)
for i, term in enumerate(terms):
if hasattr(term, "guid") and term.guid:
terms[i] = AtlasGlossaryTerm.ref_by_guid(
guid=term.guid, semantic=SaveSemantic.REMOVE
)
elif hasattr(term, "qualified_name") and term.qualified_name:
terms[i] = AtlasGlossaryTerm.ref_by_qualified_name(
qualified_name=term.qualified_name, semantic=SaveSemantic.REMOVE
)
updated_asset.assigned_terms = terms
response = self.save(entity=updated_asset)
if assets := response.assets_updated(asset_type=asset_type):
return assets[0]
return asset
return updated_asset

@validate_arguments
def find_connections_by_name(
Expand Down
10 changes: 8 additions & 2 deletions pyatlan/model/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ def process_relationship_attributes(cls, asset, attribute):
replace_attributes = []
exclude_attributes = set()

attribute_name, attribute_value = attribute, getattr(asset, attribute, None)
# Updated to use `asset.attribute` instead of `asset` to align with the API.
# This change ensures the correct value is retrieved regardless of the naming conventions.
attribute_name, attribute_value = attribute, getattr(
asset.attributes, attribute, None
)

# Process list of relationship attributes
if attribute_value and isinstance(attribute_value, list):
Expand All @@ -418,7 +422,9 @@ def process_relationship_attributes(cls, asset, attribute):
{to_camel_case(attribute_name): append_attributes}
)
if replace_attributes:
setattr(asset, attribute_name, replace_attributes)
# Updated to use `asset.attribute` instead of `asset` to align with the API.
# This change ensures the correct value is retrieved regardless of the naming conventions.
setattr(asset.attributes, attribute_name, replace_attributes)

# If 'remove', 'append', or both attributes are present and there are no 'replace' attributes,
# add the attribute to the set to exclude it from the bulk request payload.
Expand Down
12 changes: 11 additions & 1 deletion tests/integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,11 @@ def _test_remove_announcement(


def test_append_terms_with_guid(
client: AtlanClient, term1: AtlasGlossaryTerm, database: Database
client: AtlanClient,
term1: AtlasGlossaryTerm,
database: Database,
):
time.sleep(5)
assert (
database := client.asset.append_terms(
guid=database.guid, asset_type=Database, terms=[term1]
Expand All @@ -284,6 +287,7 @@ def test_append_terms_with_qualified_name(
term1: AtlasGlossaryTerm,
database: Database,
):
time.sleep(5)
assert (
database := client.asset.append_terms(
qualified_name=database.qualified_name, asset_type=Database, terms=[term1]
Expand All @@ -302,6 +306,7 @@ def test_append_terms_using_ref_by_guid_for_term(
term1: AtlasGlossaryTerm,
database: Database,
):
time.sleep(5)
assert (
database := client.asset.append_terms(
qualified_name=database.qualified_name,
Expand All @@ -323,6 +328,7 @@ def test_replace_a_term(
term2: AtlasGlossaryTerm,
database: Database,
):
time.sleep(5)
assert (
database := client.asset.append_terms(
qualified_name=database.qualified_name,
Expand Down Expand Up @@ -359,6 +365,7 @@ def test_replace_all_term(
term1: AtlasGlossaryTerm,
database: Database,
):
time.sleep(5)
assert (
database := client.asset.append_terms(
qualified_name=database.qualified_name,
Expand Down Expand Up @@ -391,6 +398,7 @@ def test_remove_term(
term2: AtlasGlossaryTerm,
database: Database,
):
time.sleep(5)
assert (
database := client.asset.append_terms(
qualified_name=database.qualified_name,
Expand Down Expand Up @@ -469,6 +477,7 @@ def test_get_asset_by_guid_when_table_specified_and_glossary_returned_raises_not


def test_get_by_guid_with_fs(client: AtlanClient, term: AtlasGlossaryTerm):
time.sleep(5)
# Default - should call `GET_ENTITY_BY_GUID` API
result = client.asset.get_by_guid(guid=term.guid, asset_type=AtlasGlossaryTerm)
assert isinstance(result, AtlasGlossaryTerm)
Expand Down Expand Up @@ -538,6 +547,7 @@ def test_get_by_guid_with_fs(client: AtlanClient, term: AtlasGlossaryTerm):


def test_get_by_qualified_name_with_fs(client: AtlanClient, term: AtlasGlossaryTerm):
time.sleep(5)
# Default - should call `GET_ENTITY_BY_GUID` API
assert term and term.qualified_name
result = client.asset.get_by_qualified_name(
Expand Down
Loading

0 comments on commit e900993

Please sign in to comment.