From 4337fb77ba4598cb98b2e08a2452c994ad34d50d Mon Sep 17 00:00:00 2001 From: "Dylan Wilson (bamhm182)" Date: Sun, 13 Nov 2022 01:50:21 +0000 Subject: [PATCH] Refactored target queries --- docs/src/usage/plugins/targets.md | 18 ++++- src/synack/plugins/targets.py | 60 ++++++--------- test/test_targets.py | 123 +++++++++++++++--------------- 3 files changed, 102 insertions(+), 99 deletions(-) diff --git a/docs/src/usage/plugins/targets.md b/docs/src/usage/plugins/targets.md index e406446..3544faf 100644 --- a/docs/src/usage/plugins/targets.md +++ b/docs/src/usage/plugins/targets.md @@ -16,7 +16,6 @@ >> 'DAPPERDINGO' >> ``` - ## targets.build_scope_host_db(slug, scope) > Prints a list of IPs ready to ingest into the Database @@ -147,6 +146,21 @@ >> [{"credentials": [{...},...],...}] >> ``` +## targets.get_query(status='registered', query_changes={}) + +> Pulls back a list of targets matching the specified query +> +> | Arguments | Type | Description +> | --- | --- | --- +> | `status` | string | The type of targets to pull back. (Ex: `registered`, `unregistered`, `upcoming`, `all`) +> | `query_changes` | dict() | Changes to make to the standard query. (Ex: `{"sorting['field']": "dateUploaded"}` +> +>> Examples +>> ```python3 +>> >>> h.targets.get_query(status='unregistered') +>> [{"codename": "SLEEPYSLUG", ...}, ...] +>> ``` + ## targets.get_registered_summary() > The Registered Summary is a short list of information about every target you have registered. @@ -230,7 +244,7 @@ >> Examples >> ```python3 >> >>> h.targets.get_upcoming() ->> [{'codename': 'SLEEPYSLUG', 'slug': '1o2h8o', 'category_name': 'Web Application', 'organization_name': 'SLEEPY Orgnization', 'upcoming_start_date': 1668430800}, ...] +>> [{'codename': 'SLEEPYSLUG', 'upcoming_start_date': 1668430800, ...}, ...] >> ``` ## targets.set_connected(target, **kwargs) diff --git a/src/synack/plugins/targets.py b/src/synack/plugins/targets.py index 7b4f967..b4adf9b 100644 --- a/src/synack/plugins/targets.py +++ b/src/synack/plugins/targets.py @@ -135,6 +135,26 @@ def get_credentials(self, **kwargs): if res.status_code == 200: return res.json() + def get_query(self, status='registered', query_changes={}): + """Get information about targets returned from a query""" + if not self.db.categories: + self.get_assessments() + categories = [] + for category in self.db.categories: + if category.passed_practical and category.passed_written: + categories.append(category.id) + query = { + 'filter[primary]': status, + 'filter[secondary]': 'all', + 'filter[industry]': 'all', + 'filter[category][]': categories + } + query.update(query_changes) + res = self.api.request('GET', 'targets', query=query) + if res.status_code == 200: + self.db.add_targets(res.json(), is_registered=True) + return res.json() + def get_registered_summary(self): """Get information on your registered targets""" res = self.api.request('GET', 'targets/registered_summary') @@ -199,45 +219,15 @@ def get_scope_web(self, target=None, **kwargs): def get_unregistered(self): """Get slugs of all unregistered targets""" - if not self.db.categories: - self.get_assessments() - categories = [] - for c in self.db.categories: - if c.passed_practical and c.passed_practical: - categories.append(c.id) - query = { - 'filter[primary]': 'unregistered', - 'filter[secondary]': 'all', - 'filter[industry]': 'all', - 'filter[category][]': categories - } - res = self.api.request('GET', 'targets', query=query) - ret = [] - if res.status_code == 200: - self.db.add_targets(res.json(), is_registered=True) - for t in res.json(): - ret.append({'codename': t['codename'], 'slug': t['slug']}) - return ret + return self.get_query(status='unregistered') def get_upcoming(self): """Get slugs and upcoming start dates of all upcoming targets""" - query = { - 'filter[primary]': 'upcoming', - 'filter[secondary]': 'all', - 'filter[industry]': 'all', - 'sorting[field]': 'upcomingStartDate', - 'sorting[direction]': 'asc' + query_changes = { + 'sorting[field]': 'upcomingStartDate', + 'sorting[direction]': 'asc' } - res = self.api.request('GET', 'targets', query=query) - ret = [] - if res.status_code == 200: - for t in res.json(): - ret.append({'codename': t['codename'], - 'slug': t['slug'], - 'category_name': t['category']['name'], - 'organization_name': t['organization']['name'], - 'upcoming_start_date': t['upcoming_start_date']}) - return ret + return self.get_query(status='upcoming', query_changes=query_changes) def set_connected(self, target=None, **kwargs): """Connect to a target""" diff --git a/test/test_targets.py b/test/test_targets.py index 4b8e80f..5584e39 100644 --- a/test/test_targets.py +++ b/test/test_targets.py @@ -271,6 +271,51 @@ def test_get_credentials(self): self.targets.get_credentials(codename='SLEEPYSLUG')) self.targets.api.request.assert_called_with('POST', url) + def test_get_query(self): + """Should get a list of targets""" + self.targets.db.categories = [ + Category(id=1, passed_practical=True, passed_written=True), + Category(id=2, passed_practical=True, passed_written=True), + Category(id=3, passed_practical=False, passed_written=False), + ] + query = { + 'filter[primary]': 'unregistered', + 'filter[secondary]': 'all', + 'filter[industry]': 'all', + 'filter[category][]': [1, 2] + } + self.targets.api.request.return_value.status_code = 200 + results = [ + { + "codename": "SLEEPYSLUG", + "slug": "1o2h8o" + } + ] + self.targets.api.request.return_value.json.return_value = results + self.assertEqual(results, self.targets.get_unregistered()) + self.targets.api.request.assert_called_with("GET", + "targets", + query=query) + + def test_get_query_assessments_empty(self): + """Should get a list of unregistered targets""" + self.targets.get_assessments = MagicMock() + self.targets.db.categories = [] + query = { + 'filter[primary]': 'unregistered', + 'filter[secondary]': 'all', + 'filter[industry]': 'all', + 'filter[category][]': [] + } + self.targets.api.request.return_value.status_code = 200 + results = [] + self.targets.api.request.return_value.json.return_value = results + self.assertEqual(results, self.targets.get_unregistered()) + self.targets.get_assessments.assert_called_with() + self.targets.api.request.assert_called_with("GET", + "targets", + query=query) + def test_get_registered_summary(self): """Should make a request to get basic info about registered targets""" t1 = { @@ -364,74 +409,28 @@ def test_get_scope_web(self): self.targets.api.request.return_value.json.assert_called() def test_get_unregistered(self): - """Should get a list unregistered targets""" - self.targets.db.categories = [ - Category(id=1, passed_practical=True, passed_written=True), - Category(id=2, passed_practical=True, passed_written=True), - Category(id=3, passed_practical=False, passed_written=False), - ] - query = { - 'filter[primary]': 'unregistered', - 'filter[secondary]': 'all', - 'filter[industry]': 'all', - 'filter[category][]': [1, 2] - } - self.targets.api.request.return_value.status_code = 200 - unreg = [ - { - "codename": "SLEEPYSLUG", - "slug": "1o2h8o" - } + """Should query for unregistered targets""" + results = [ + {'codename': 'SLEEPYSLUG', 'slug': '1283hi'} ] - self.targets.api.request.return_value.json.return_value = unreg - self.assertEqual(unreg, self.targets.get_unregistered()) - self.targets.api.request.assert_called_with("GET", - "targets", - query=query) + self.targets.get_query = MagicMock() + self.targets.get_query.return_value = results + self.assertEquals(results, self.targets.get_unregistered()) + self.targets.get_query.assert_called_with(status='unregistered') def test_get_upcoming(self): - """Should get a list upcoming targets""" - query = { - 'filter[primary]': 'upcoming', - 'filter[secondary]': 'all', - 'filter[industry]': 'all', + """Should query for upcoming targets""" + results = [ + {'codename': 'SLEEPYSLUG', 'slug': '1283hi'} + ] + query_changes = { 'sorting[field]': 'upcomingStartDate', 'sorting[direction]': 'asc' } - self.targets.api.request.return_value.status_code = 200 - upcoming = [ - { - "codename": "SLEEPYSLUG", - "slug": "1o2h8o", - 'category_name': 'Web Application', - 'organization_name': 'SLEEPY Orgnization', - 'upcoming_start_date': 1668430800 - } - ] - self.targets.api.request.return_value.json.return_value = upcoming - self.assertEqual(upcoming, self.targets.get_upcoming()) - self.targets.api.request.assert_called_with("GET", - "targets", - query=query) - - def test_get_unregistered_assessments_empty(self): - """Should get a list of unregistered targets""" - self.targets.get_assessments = MagicMock() - self.targets.db.categories = [] - query = { - 'filter[primary]': 'unregistered', - 'filter[secondary]': 'all', - 'filter[industry]': 'all', - 'filter[category][]': [] - } - self.targets.api.request.return_value.status_code = 200 - unreg = [] - self.targets.api.request.return_value.json.return_value = unreg - self.assertEqual(unreg, self.targets.get_unregistered()) - self.targets.get_assessments.assert_called_with() - self.targets.api.request.assert_called_with("GET", - "targets", - query=query) + self.targets.get_query = MagicMock() + self.targets.get_query.return_value = results + self.assertEquals(results, self.targets.get_upcoming()) + self.targets.get_query.assert_called_with(status='upcoming', query_changes=query_changes) def test_set_connected(self): """Should connect to a given target provided kwargs"""