Skip to content

Commit

Permalink
feat: add test
Browse files Browse the repository at this point in the history
  • Loading branch information
HashCookie committed Nov 25, 2024
1 parent c1ea804 commit 2fb4770
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 0 deletions.
5 changes: 5 additions & 0 deletions casbin/persist/adapters/filtered_file_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def load_filtered_policy(self, model, filter):

try:
filter_value = [filter.__dict__["P"]] + [filter.__dict__["G"]]
is_empty_filter = all(not f for f in filter_value) or all(
all(not x.strip() for x in f) if f else True for f in filter_value
)
if is_empty_filter:
return self.load_policy(model)
except:
raise RuntimeError("invalid filter type")

Expand Down
167 changes: 167 additions & 0 deletions tests/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
from unittest import TestCase
from tests.test_enforcer import get_examples
from casbin.persist.adapters import FilteredFileAdapter
from casbin.persist.adapters.filtered_file_adapter import filter_line, filter_words
import tempfile
import shutil
import os


class Filter:
Expand Down Expand Up @@ -141,3 +145,166 @@ def test_filtered_adapter_invalid_filepath(self):

with self.assertRaises(RuntimeError):
e.load_filtered_policy(None)

def test_empty_filter_array(self):
"""Test filter for empty array."""
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
filter = Filter()
filter.P = []
filter.G = []

e.load_filtered_policy(filter)
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))

def test_empty_string_filter(self):
"""Test the filter for all empty strings."""
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
filter = Filter()
filter.P = ["", "", ""]
filter.G = ["", "", ""]

e.load_filtered_policy(filter)
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))

def test_mixed_empty_filter(self):
"""测试混合空字符串和非空字符串的过滤器"""
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
filter = Filter()
filter.P = ["", "domain1", ""]
filter.G = ["", "", "domain1"]

e.load_filtered_policy(filter)
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
self.assertFalse(e.has_policy(["admin", "domain2", "data2", "read"]))

def test_nonexistent_domain_filter(self):
"""Testing the filter for a non-existent domain."""
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
filter = Filter()
filter.P = ["", "domain3"]
filter.G = ["", "", "domain3"]

e.load_filtered_policy(filter)
self.assertFalse(e.has_policy(["admin", "domain3", "data3", "read"]))

def test_empty_filter_array(self):
"""Test filter for empty array."""
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
filter = Filter()
filter.P = []
filter.G = []

try:
e.load_filtered_policy(filter)
except:
raise RuntimeError("unexpected error with empty filter arrays")

self.assertFalse(e.is_filtered(), "Adapter should not be marked as filtered with empty filters")

self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))

def test_empty_string_filter(self):
"""Test the filter for all empty strings."""
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
filter = Filter()
filter.P = ["", "", ""]
filter.G = ["", "", ""]

try:
e.load_filtered_policy(filter)
except:
raise RuntimeError("unexpected error with empty string filters")

self.assertFalse(e.is_filtered(), "Adapter should not be marked as filtered with empty string filters")

try:
e.save_policy()
except:
raise RuntimeError("unexpected error in SavePolicy with empty string filters")

self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))

def test_mixed_empty_filter(self):
"""Test the filter for mixed empty and non-empty strings."""
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
filter = Filter()
filter.P = ["", "domain1", ""]
filter.G = ["", "", "domain1"]

try:
e.load_filtered_policy(filter)
except:
raise RuntimeError("unexpected error with mixed empty filters")

self.assertTrue(e.is_filtered(), "Adapter should be marked as filtered")

with self.assertRaises(RuntimeError):
e.save_policy()

self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
self.assertFalse(e.has_policy(["admin", "domain2", "data2", "read"]))

def test_whitespace_filter(self):
"""Test the filter for all blank characters."""
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
filter = Filter()
filter.P = [" ", " ", "\t"]
filter.G = ["\n", " ", " "]

e.load_filtered_policy(filter)

self.assertFalse(e.is_filtered())
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))

def test_filter_line_edge_cases(self):
"""Test the boundary cases of the filter_line function."""
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))

self.assertFalse(filter_line("", [[""], [""]]))

self.assertFalse(filter_line("invalid_line", [[""], [""]]))

self.assertFalse(filter_line("p, admin, domain1, data1, read", None))

def test_filter_words_edge_cases(self):
"""Test the boundary cases of the filter_words function."""
self.assertTrue(filter_words(["p"], ["filter1", "filter2"]))

self.assertFalse(filter_words(["p", "admin", "domain1"], []))

line = ["admin", "domain1", "data*", "read"]
filter = ["", "", "data1", ""]
self.assertTrue(filter_words(line, filter))

def test_load_filtered_policy_with_comments(self):
"""Test loading filtering policies with comments."""
temp_file = tempfile.NamedTemporaryFile(delete=False)
try:
shutil.copyfile(get_examples("rbac_with_domains_policy.csv"), temp_file.name)

with open(temp_file.name, "a") as f:
f.write("\n# This is a comment\np, admin, domain1, data3, read")

adapter = FilteredFileAdapter(temp_file.name)
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
filter = Filter()
filter.P = ["", "domain1"]
filter.G = ["", "", "domain1"]

e.load_filtered_policy(filter)
self.assertTrue(e.has_policy(["admin", "domain1", "data3", "read"]))
finally:
os.unlink(temp_file.name)

0 comments on commit 2fb4770

Please sign in to comment.