diff --git a/.github/workflows/integration-testing.yml b/.github/workflows/integration-testing.yml index c884cce4..229a4140 100644 --- a/.github/workflows/integration-testing.yml +++ b/.github/workflows/integration-testing.yml @@ -60,9 +60,9 @@ jobs: uses: convictional/trigger-workflow-and-wait@v1.6.5 with: owner: opencybersecurityalliance - repo: hunting-stack-testing + repo: federated-search-end-to-end-testing github_token: ${{ secrets.KESTREL_STIXSHIFTER_INTEGRATION_TESTING_TOKEN }} - workflow_file_name: kestrel-integration-testing-flow.yml + workflow_file_name: kestrel-end-to-end-testing-flow.yml ref: main wait_interval: 10 propagate_failure: true diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3baf537c..ed1ebac4 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -9,6 +9,39 @@ The format is based on `Keep a Changelog`_. Unreleased ========== +1.6.1 (2023-05-31) +================== + +Changed +------- + +- Kestrel variable definition syntax changed back to ``CNAME`` from ``ECNAME`` +- stix-shifter data source profile config changes + + - Replace ``result_limit`` with ``retrieval_batch_size`` + - Replace ``timeout`` with ``single_batch_timeout`` + - Add default values for the configs + - Document updates + - The new fields will be processed by Kestrel before being given to stix-shifter + +- Replace stix-shifter sync APIs with async APIs + +Added +----- + +- Scalability end-to-end testing for large query with multiple pages +- Test cases for new stix-shifter data source configs + +Fixed +----- + +- Temporary fix of stix-shifter/issues/1493 + + - Add retry-once logic if server timeout (busy CPU on the client side) + - Nullify the pipelining; need better long-term fix to enable it + +- Fixed bugs and reimplemented ``transmission_complete()`` in stix-shifter data source interface + 1.6.0 (2023-05-17) ================== @@ -183,6 +216,7 @@ Added ----- - Kestrel doc for v1.5 syntax, mostly the language specification chapter + - New section on the Kestrel patterning: Extended 
Centered Graph Pattern (ECGP) - New section on entity, attribute, and related mechanisms - Commands section updated with v1.5 syntax diff --git a/docs/installation/runtime.rst b/docs/installation/runtime.rst index 07a7ec82..b0e4769d 100644 --- a/docs/installation/runtime.rst +++ b/docs/installation/runtime.rst @@ -8,7 +8,7 @@ please use Python inside Windows Subsystem for Linux (WSL). General Requirements ==================== -Python 3.8 is required. Refer to the `Python installation guide`_ if you need to install or upgrade Python. +Python 3.8 is required. Follow the `Python installation guide`_ to install or upgrade Python. OS-specific Requirements ======================== diff --git a/setup.cfg b/setup.cfg index d9b4b08a..f3c12071 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = kestrel-lang -version = 1.6.0 +version = 1.6.1 description = Kestrel Threat Hunting Language long_description = file:README.rst long_description_content_type = text/x-rst @@ -37,7 +37,7 @@ install_requires = docker>=5.0.0 stix-shifter>=5.3.0 stix-shifter-utils>=5.3.0 - firepit>=2.3.19 + firepit>=2.3.20 typeguard tests_require = pytest diff --git a/src/kestrel/syntax/kestrel.lark b/src/kestrel/syntax/kestrel.lark index e3cbceee..6bdb1a14 100644 --- a/src/kestrel/syntax/kestrel.lark +++ b/src/kestrel/syntax/kestrel.lark @@ -64,7 +64,7 @@ save: "SAVE"i VARIABLE "TO"i stdpath variables: VARIABLE ("," VARIABLE)* -VARIABLE: ECNAME +VARIABLE: CNAME // // Expression @@ -293,7 +293,7 @@ SIMPLE_STRING: ECNAME // check Lark example of Python parser for reference ADVANCED_STRING: /(r?)("(?!"").*?(? 
COMMENT %ignore WS diff --git a/src/kestrel_datasource_stixshifter/config.py b/src/kestrel_datasource_stixshifter/config.py index e1625b03..ce5754e0 100644 --- a/src/kestrel_datasource_stixshifter/config.py +++ b/src/kestrel_datasource_stixshifter/config.py @@ -14,6 +14,7 @@ STIXSHIFTER_DEBUG_ENV_VAR = "KESTREL_STIXSHIFTER_DEBUG" # debug mode for stix-shifter if the environment variable exists ENV_VAR_PREFIX = "STIXSHIFTER_" RETRIEVAL_BATCH_SIZE = 2000 +SINGLE_BATCH_TIMEOUT = 60 FAST_TRANSLATE_CONNECTORS = [] # Suggested: ["qradar", "elastic_ecs"] ASYNC_TRANSLATION_WORKERS_CNT = 1 @@ -135,13 +136,47 @@ def get_datasource_from_profiles(profile_name, profiles): f'invalid {profile_name} configuration section: no "auth" field', ) + if "options" not in connection: + connection["options"] = {} + retrieval_batch_size = RETRIEVAL_BATCH_SIZE - if "options" in connection and "retrieval_batch_size" in connection["options"]: - # need to remove the non-stix-shifter field "retrieval_batch_size" to avoid stix-shifter error - retrieval_batch_size = connection["options"].pop("retrieval_batch_size") + if "retrieval_batch_size" in connection["options"]: + # remove the non-stix-shifter field "retrieval_batch_size" to avoid stix-shifter error + try: + retrieval_batch_size = int( + connection["options"].pop("retrieval_batch_size") + ) + except: + raise InvalidDataSource( + profile_name, + "stixshifter", + f"invalid {profile_name} connection section: options.retrieval_batch_size", + ) + # rename this field for stix-shifter use; x2 the size to ensure retrieval _logger.debug( f"profile-loaded retrieval_batch_size: {retrieval_batch_size}" ) + connection["options"]["result_limit"] = retrieval_batch_size * 2 + + single_batch_timeout = SINGLE_BATCH_TIMEOUT + if "single_batch_timeout" in connection["options"]: + # remove the non-stix-shifter field "single_batch_timeout" to avoid stix-shifter error + try: + single_batch_timeout = int( + connection["options"].pop("single_batch_timeout") + ) + 
except: + raise InvalidDataSource( + profile_name, + "stixshifter", + f"invalid {profile_name} connection section: options.single_batch_timeout", + ) + # rename this field for stix-shifter use + _logger.debug( + f"profile-loaded single_batch_timeout: {single_batch_timeout}" + ) + connection["options"]["timeout"] = single_batch_timeout + return connector_name, connection, configuration, retrieval_batch_size diff --git a/src/kestrel_datasource_stixshifter/interface.py b/src/kestrel_datasource_stixshifter/interface.py index f4fc1800..1612cccd 100644 --- a/src/kestrel_datasource_stixshifter/interface.py +++ b/src/kestrel_datasource_stixshifter/interface.py @@ -29,9 +29,8 @@ selfSignedCert: false # this means do NOT check cert indices: host101 options: # use any of this section when needed - result_limit: 500000 # stix-shifter default: 10000 - retrieval_batch_size: 10000 # safe to set to 10000 to match default Elasticsearch page size; Kestrel default: 2000 - timeout: 300 # allow a query to run for 5 minutes (300 seconds) before timing out; stix-shifter default: 30 + retrieval_batch_size: 10000 # set to 10000 to match default Elasticsearch page size; Kestrel default across connectors: 2000 + single_batch_timeout: 120 # increase it if you hit the 60-second (Kestrel default) timeout error for each batch of retrieval dialects: # more info: https://github.com/opencybersecurityalliance/stix-shifter/tree/develop/stix_shifter_modules/elastic_ecs#dialects - beats # need it if the index is created by Filebeat/Winlogbeat/*beat config: @@ -103,7 +102,6 @@ import asyncio import json -import time import copy import logging @@ -209,7 +207,7 @@ async def query(uri, pattern, session_id, config, store=None): ) translation_options = copy.deepcopy(connection_dict.get("options", {})) - dsl = translation.translate( + dsl = await translation.translate_async( connector_name, "query", query_metadata, pattern, translation_options ) @@ -259,30 +257,15 @@ async def query(uri, pattern, session_id, config, 
store=None): batch_index = 0 for query in dsl["queries"]: - search_meta_result = transmission.query(query) + search_meta_result = await transmission.query_async(query) if search_meta_result["success"]: search_id = search_meta_result["search_id"] - if transmission.is_async(): - time.sleep(1) - status = transmission.status(search_id) - if status["success"]: - while ( - status["progress"] < 100 - and status["status"] == "RUNNING" - ): - status = transmission.status(search_id) - else: - stix_shifter_error_msg = ( - status["error"] - if "error" in status - else "details not avaliable" - ) - raise DataSourceError( - f"STIX-shifter transmission.status() failed with message: {stix_shifter_error_msg}" - ) + + await transmission_complete(transmission, search_id) # run the producer and wait for completion batch_index = await transmission_produce( + connector_name, transmission_queue, transmission, search_id, @@ -329,14 +312,37 @@ def ingest_stixbundle_filepath(batch_index): return ingest_stixbundle_filepath +async def transmission_complete(transmission, search_id): + status = None + while True: + status = await transmission.status_async(search_id) + if not status["success"]: + stix_shifter_error_msg = ( + status["error"] if "error" in status else "details not available" + ) + raise DataSourceError( + f"STIX-shifter transmission.status() failed with message: {stix_shifter_error_msg}" + ) + elif status["progress"] < 100 and status["status"] == "RUNNING": + await asyncio.sleep(1) + else: + break + + async def transmission_produce( - transmission_queue, transmission, search_id, retrieval_batch_size, batch_index + connector_name, + transmission_queue, + transmission, + search_id, + retrieval_batch_size, + batch_index, ): result_retrieval_offset = 0 has_remaining_results = True metadata = None + is_retry_cycle = False while has_remaining_results: - result_batch = transmission.results( + result_batch = await transmission.results_async( search_id, result_retrieval_offset, 
retrieval_batch_size, metadata ) if result_batch["success"]: @@ -350,15 +356,31 @@ async def transmission_produce( has_remaining_results = False if "metadata" in result_batch: metadata = result_batch["metadata"] + is_retry_cycle = False else: stix_shifter_error_msg = ( result_batch["error"] if "error" in result_batch else "details not avaliable" ) - raise DataSourceError( - f"STIX-shifter transmission.results() failed with message: {stix_shifter_error_msg}" - ) + if ( + stix_shifter_error_msg.startswith( + f"{connector_name} connector error => server timeout_error" + ) + and not is_retry_cycle + ): + # mitigate https://github.com/opencybersecurityalliance/stix-shifter/issues/1493 + # only give it one retry to mitigate high CPU occupation + # otherwise, it could be a real server connection issue + # /stix_shifter_utils/stix_transmission/utils/RestApiClientAsync.py + _logger.info( + f"busy CPU; hit stix-shifter transmission aiohttp connection timeout; retry" + ) + is_retry_cycle = True + else: + raise DataSourceError( + f"STIX-shifter transmission.results() failed with message: {stix_shifter_error_msg}" + ) return batch_index @@ -376,7 +398,7 @@ async def translation_ingest_consume( # wait for an item from the producer result_batch = await transmission_queue.get() - stixbundle = translation.translate( + stixbundle = await translation.translate_async( connector_name, "results", query_metadata, @@ -416,19 +438,17 @@ async def fast_translate_ingest_consume( while True: # wait for an item from the producer result_batch = await transmission_queue.get() - try: - await fast_translate( - connector_name, - result_batch["data"], - translation, - translation_options, - identity, - query_id, - store, - ) - finally: - # Notify the queue that the item has been processed - transmission_queue.task_done() + await fast_translate( + connector_name, + result_batch["data"], + translation, + translation_options, + identity, + query_id, + store, + ) + # Notify the queue that the item has 
been processed + transmission_queue.task_done() async def fast_translate( @@ -443,7 +463,7 @@ async def fast_translate( # Use the alternate, faster DataFrame-based translation (in firepit) _logger.debug("Using fast translation for connector %s", connector_name) transformers = get_module_transformers(connector_name) - mapping = translation.translate( + mapping = await translation.translate_async( connector_name, stix_translation.MAPPING, None, diff --git a/tests/test_stixshifter.py b/tests/test_stixshifter.py index 6c3cb843..a67d0526 100644 --- a/tests/test_stixshifter.py +++ b/tests/test_stixshifter.py @@ -49,6 +49,7 @@ def test_yaml_profiles_refresh(tmp_path): indices: host101 options: retrieval_batch_size: 10000 + single_batch_timeout: 120 dialects: - beats config: @@ -81,6 +82,8 @@ def test_yaml_profiles_refresh(tmp_path): assert connector_name == "elastic_ecs" assert configuration["auth"]["id"] == "profileA" assert configuration["auth"]["api_key"] == "qwer" + assert connection["options"]["timeout"] == 60 + assert connection["options"]["result_limit"] == 2000 * 2 assert retrieval_batch_size == 2000 with open(profile_file, "w") as pf: @@ -94,4 +97,6 @@ def test_yaml_profiles_refresh(tmp_path): assert connector_name == "elastic_ecs" assert configuration["auth"]["id"] == "profileB" assert configuration["auth"]["api_key"] == "asdf" + assert connection["options"]["timeout"] == 120 + assert connection["options"]["result_limit"] == 10000 * 2 assert retrieval_batch_size == 10000