Skip to content

Commit

Permalink
Fix tests IV
Browse files Browse the repository at this point in the history
  • Loading branch information
mrvanes committed Nov 17, 2023
1 parent 5461f99 commit 74750a3
Showing 1 changed file with 30 additions and 23 deletions.
53 changes: 30 additions & 23 deletions tests/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def check_people(rdn, people, context_checks):

for u in people:
# Suspended users don't appear in LDAP
if u['user']['suspended'] is True:
if u['user'].get('suspended') is True:
continue

user_object = check_object(f"uid={u['user']['username']},{rdn}")
Expand Down Expand Up @@ -81,9 +81,10 @@ def check_group(rdn, group, members):

active_members = []
for m in members:
logger.debug(f"member: {m['user']['username']}, status: {m['status']}")
if m['status'] == 'active' and m['user']['suspended'] is not True:
active_members.append(m)
# logger.debug(f"member: {m['user']['username']}, status: {m['status']}")
logger.debug(f"member: {m['user']['username']}")
# if m['status'] == 'active' and m['user'].get('suspended') is not True:
active_members.append(m)

if len(active_members) > 0:
member_element = group_object[list(group_object)[0]]['member']
Expand All @@ -108,15 +109,21 @@ def check_ldap(rdn, people, groups, group_name_function, context_checks=[]):
for g in groups:
check_group(f"ou=groups,{rdn}", group_name_function(g['short_name']), g['collaboration_memberships'])

for c in self.src.collaborations():
logger.info(f"* Checking collaboration: {c['name']}")
services = self.src.service_collaborations()

detail = self.src.collaboration(c['id'])
for s in detail['services']:
for s_id, s in services.items():
logger.info(f"* checking service: {s_id}")

for co_id, c in s.get('cos', {}).items():
logger.info(f"* Checking collaboration: {c['name']}")

# detail = self.src.collaboration(c['id'])
# detail = cos(c['id'])
# for s_id, s in services.items():

def check_ordered_person_expiry(person, _):
# When CO is expired, the person in de Ordered Subtree should mut be expired as well
if detail['status'] == 'expired':
if c['status'] == 'expired':
logger.debug(f"Checking expiry status of {person['user']['username']}")
self.assertEqual([person['status'], 'expired'])

Expand Down Expand Up @@ -161,14 +168,14 @@ def group_name_ordered(g):
def group_name_flat(g):
return f"{org_sname}.{c['short_name']}.{g}"

logger.info(f"** Checking Service: {s['ldap_identifier']}, Enabled: {s['ldap_enabled']}")
logger.info(f"** Checking Service: {s_id}, Enabled: {s['enabled']}")

if s['ldap_enabled']:
if s['enabled']:
check_ldap(
f"o={org_sname}.{c['short_name']},dc=ordered,dc={s['ldap_identifier']},\
f"o={org_sname}.{c['short_name']},dc=ordered,dc={s_id},\
{self.dst_conf['basedn']}",
detail['collaboration_memberships'],
detail['groups'],
c['collaboration_memberships'],
c['groups'],
group_name_ordered,
context_checks=[
check_ordered_person_expiry,
Expand All @@ -177,32 +184,32 @@ def group_name_flat(g):
)

check_ldap(
f"dc=flat,dc={s['ldap_identifier']},{self.dst_conf['basedn']}",
detail['collaboration_memberships'],
detail['groups'],
f"dc=flat,dc={s_id},{self.dst_conf['basedn']}",
c['collaboration_memberships'],
c['groups'],
group_name_flat
)
elif object_count(f"dc={s['ldap_identifier']},{self.dst_conf['basedn']}") > 0:
elif object_count(f"dc={s_id},{self.dst_conf['basedn']}") > 0:
# in case the service 'exists' in LDAP but is not enabled, make sure
# people and group are 'empty'
check_ldap(
f"o={org_sname}.{c['short_name']},dc=ordered,dc={s['ldap_identifier']},\
f"o={org_sname}.{c['short_name']},dc=ordered,dc={s_id},\
{self.dst_conf['basedn']}",
[],
[],
group_name_ordered
)
check_ldap(
f"dc=flat,dc={s['ldap_identifier']},{self.dst_conf['basedn']}",
f"dc=flat,dc={s_id},{self.dst_conf['basedn']}",
[],
[],
group_name_flat
)

if object_count(f"dc={s['ldap_identifier']},{self.dst_conf['basedn']}") > 0:
logger.info(f"*** Checking Admin account: {s['ldap_identifier']}")
if object_count(f"dc={s_id},{self.dst_conf['basedn']}") > 0:
logger.info(f"*** Checking Admin account: {s_id}")
self.assertTrue('ldap_password' in s)
admin_object = check_object(f"cn=admin,dc={s['ldap_identifier']},"
admin_object = check_object(f"cn=admin,dc={s_id},"
f"{self.dst_conf['basedn']}", expected_count=1)
ldap_password = s['ldap_password']
if ldap_password:
Expand Down

0 comments on commit 74750a3

Please sign in to comment.