Skip to content

Commit

Permalink
feat: Added tests for async watcher
Browse files Browse the repository at this point in the history
I've added pytests to verify the functionality of async watcher callbacks.

Note: I used the existing pytest for synchronous calls as a reference for creating this async version.
  • Loading branch information
tanasecucliciu committed Jan 31, 2024
1 parent eabb1bd commit 43dba49
Showing 1 changed file with 179 additions and 1 deletion.
180 changes: 179 additions & 1 deletion tests/test_watcher_ex.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import casbin
from tests.test_enforcer import get_examples, TestCaseBase

from unittest import IsolatedAsyncioTestCase

class SampleWatcher:
def __init__(self):
Expand Down Expand Up @@ -113,6 +113,103 @@ def start_watch(self):
pass


class AsyncSampleWatcher():
def __init__(self):
self.callback = None
self.notify_message = None

async def close(self):
pass

async def set_update_callback(self, callback):
"""
sets the callback function to be called when the policy is updated
:param callable callback: callback(event)
- event: event received from the rabbitmq
:return:
"""
self.callback = callback

async def update(self, msg):
"""
update the policy
"""
self.notify_message = msg
return True

async def update_for_add_policy(self, section, ptype, *params):
"""
update for add policy
:param section: section
:param ptype: policy type
:param params: other params
:return: True if updated
"""
message = "called add policy"
return await self.update(message)

async def update_for_remove_policy(self, section, ptype, *params):
"""
update for remove policy
:param section: section
:param ptype: policy type
:param params: other params
:return: True if updated
"""
message = "called remove policy"
return await self.update(message)

async def update_for_remove_filtered_policy(self, section, ptype, field_index, *params):
"""
update for remove filtered policy
:param section: section
:param ptype: policy type
:param field_index: field index
:param params: other params
:return:
"""
message = "called remove filtered policy"
return await self.update(message)

async def update_for_save_policy(self, model: casbin.Model):
"""
update for save policy
:param model: casbin model
:return:
"""
message = "called save policy"
return await self.update(message)

async def update_for_add_policies(self, section, ptype, *params):
"""
update for add policies
:param section: section
:param ptype: policy type
:param params: other params
:return:
"""
message = "called add policies"
return await self.update(message)

async def update_for_remove_policies(self, section, ptype, *params):
"""
update for remove policies
:param section: section
:param ptype: policy type
:param params: other params
:return:
"""
message = "called remove policies"
return await self.update(message)

async def start_watch(self):
"""
starts the watch thread
:return:
"""
pass


class TestWatcherEx(TestCaseBase):
def get_enforcer(self, model=None, adapter=None):
return casbin.Enforcer(
Expand Down Expand Up @@ -187,3 +284,84 @@ def test_auto_notify_disabled(self):

e.remove_policies(rules)
self.assertEqual(w.notify_message, None)


class TestAsyncWatcherEx(IsolatedAsyncioTestCase):
def get_enforcer(self, model=None, adapter=None):
return casbin.AsyncEnforcer(
model,
adapter,
)

async def test_auto_notify_enabled(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
await e.load_policy()

w = AsyncSampleWatcher()
e.set_watcher(w)
e.enable_auto_notify_watcher(True)

await e.save_policy()
self.assertEqual(w.notify_message, "called save policy")

await e.add_policy("admin", "data1", "read")
self.assertEqual(w.notify_message, "called add policy")

await e.remove_policy("admin", "data1", "read")
self.assertEqual(w.notify_message, "called remove policy")

await e.remove_filtered_policy(1, "data1")
self.assertEqual(w.notify_message, "called remove filtered policy")

rules = [
["jack", "data4", "read"],
["katy", "data4", "write"],
["leyo", "data4", "read"],
["ham", "data4", "write"],
]
await e.add_policies(rules)
self.assertEqual(w.notify_message, "called add policies")

await e.remove_policies(rules)
self.assertEqual(w.notify_message, "called remove policies")


async def test_auto_notify_disabled(self):
e = self.get_enforcer(
get_examples("basic_model.conf"),
get_examples("basic_policy.csv"),
)
await e.load_policy()

w = SampleWatcher()
e.set_watcher(w)
e.enable_auto_notify_watcher(False)

await e.save_policy()
self.assertEqual(w.notify_message, "called save policy")

w.notify_message = None

await e.add_policy("admin", "data1", "read")
self.assertEqual(w.notify_message, None)

await e.remove_policy("admin", "data1", "read")
self.assertEqual(w.notify_message, None)

await e.remove_filtered_policy(1, "data1")
self.assertEqual(w.notify_message, None)

rules = [
["jack", "data4", "read"],
["katy", "data4", "write"],
["leyo", "data4", "read"],
["ham", "data4", "write"],
]
await e.add_policies(rules)
self.assertEqual(w.notify_message, None)

await e.remove_policies(rules)
self.assertEqual(w.notify_message, None)

0 comments on commit 43dba49

Please sign in to comment.