Merge pull request #349 from opencybersecurityalliance/develop
v1.6.1
subbyte authored May 31, 2023
2 parents 9adb547 + aecddb4 commit 4ef70ac
Showing 8 changed files with 148 additions and 54 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration-testing.yml
@@ -60,9 +60,9 @@ jobs:
uses: convictional/trigger-workflow-and-wait@v1.6.5
with:
owner: opencybersecurityalliance
repo: hunting-stack-testing
repo: federated-search-end-to-end-testing
github_token: ${{ secrets.KESTREL_STIXSHIFTER_INTEGRATION_TESTING_TOKEN }}
workflow_file_name: kestrel-integration-testing-flow.yml
workflow_file_name: kestrel-end-to-end-testing-flow.yml
ref: main
wait_interval: 10
propagate_failure: true
34 changes: 34 additions & 0 deletions CHANGELOG.rst
@@ -9,6 +9,39 @@ The format is based on `Keep a Changelog`_.
Unreleased
==========

1.6.1 (2023-05-31)
==================

Changed
-------

- Kestrel variable definition syntax changed back to ``CNAME`` from ``ECNAME``
- stix-shifter data source profile config changes

- Replace ``result_limit`` with ``retrieval_batch_size``
- Replace ``timeout`` with ``single_batch_timeout``
- Add default values for the configs
- Documentation updates
- The new fields are processed by Kestrel before being given to stix-shifter

- Replace stix-shifter sync APIs with async APIs
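The option handling described above can be sketched in a few lines of Python; ``preprocess_options`` is a hypothetical helper name for illustration, and the defaults (2000 and 60) are the Kestrel defaults introduced in this release:

```python
# Hypothetical sketch of the renaming Kestrel performs before handing a
# connection config to stix-shifter (field names per this changelog).
RETRIEVAL_BATCH_SIZE = 2000  # Kestrel default
SINGLE_BATCH_TIMEOUT = 60    # Kestrel default, in seconds

def preprocess_options(options):
    """Pop Kestrel-level fields; insert the stix-shifter equivalents."""
    batch = int(options.pop("retrieval_batch_size", RETRIEVAL_BATCH_SIZE))
    timeout = int(options.pop("single_batch_timeout", SINGLE_BATCH_TIMEOUT))
    options["result_limit"] = batch * 2  # doubled to ensure full retrieval
    options["timeout"] = timeout
    return options, batch

opts, batch = preprocess_options({"retrieval_batch_size": 10000})
# opts == {"result_limit": 20000, "timeout": 60}, batch == 10000
```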

Added
-----

- Scalability end-to-end testing for large queries with multiple pages
- Test cases for new stix-shifter data source configs

Fixed
-----

- Temporary fix for stix-shifter/issues/1493

- Add retry-once logic on server timeout (busy CPU on the client side)
- Disable pipelining; a better long-term fix is needed to re-enable it

- Fix bugs and reimplement ``transmission_complete()`` in the stix-shifter data source interface

1.6.0 (2023-05-17)
==================

@@ -183,6 +216,7 @@ Added
-----

- Kestrel doc for v1.5 syntax, mostly the language specification chapter

- New section on the Kestrel patterning: Extended Centered Graph Pattern (ECGP)
- New section on entity, attribute, and related mechanisms
- Commands section updated with v1.5 syntax
2 changes: 1 addition & 1 deletion docs/installation/runtime.rst
@@ -8,7 +8,7 @@ please use Python inside Windows Subsystem for Linux (WSL).
General Requirements
====================

Python 3.8 is required. Refer to the `Python installation guide`_ if you need to install or upgrade Python.
Python 3.8 is required. Follow the `Python installation guide`_ to install or upgrade Python.

OS-specific Requirements
========================
4 changes: 2 additions & 2 deletions setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = kestrel-lang
version = 1.6.0
version = 1.6.1
description = Kestrel Threat Hunting Language
long_description = file:README.rst
long_description_content_type = text/x-rst
@@ -37,7 +37,7 @@ install_requires =
docker>=5.0.0
stix-shifter>=5.3.0
stix-shifter-utils>=5.3.0
firepit>=2.3.19
firepit>=2.3.20
typeguard
tests_require =
pytest
4 changes: 2 additions & 2 deletions src/kestrel/syntax/kestrel.lark
@@ -64,7 +64,7 @@ save: "SAVE"i VARIABLE "TO"i stdpath

variables: VARIABLE ("," VARIABLE)*

VARIABLE: ECNAME
VARIABLE: CNAME

//
// Expression
@@ -293,7 +293,7 @@ SIMPLE_STRING: ECNAME
// check Lark example of Python parser for reference
ADVANCED_STRING: /(r?)("(?!"").*?(?<!\\)(\\\\)*?"|'(?!'').*?(?<!\\)(\\\\)*?')/

%import common (LETTER, DIGIT, WS, INT, WORD, NUMBER, _STRING_ESC_INNER)
%import common (LETTER, DIGIT, WS, INT, WORD, NUMBER, CNAME, _STRING_ESC_INNER)
%import common.SH_COMMENT -> COMMENT

%ignore WS
41 changes: 38 additions & 3 deletions src/kestrel_datasource_stixshifter/config.py
@@ -14,6 +14,7 @@
STIXSHIFTER_DEBUG_ENV_VAR = "KESTREL_STIXSHIFTER_DEBUG" # debug mode for stix-shifter if the environment variable exists
ENV_VAR_PREFIX = "STIXSHIFTER_"
RETRIEVAL_BATCH_SIZE = 2000
SINGLE_BATCH_TIMEOUT = 60
FAST_TRANSLATE_CONNECTORS = [] # Suggested: ["qradar", "elastic_ecs"]
ASYNC_TRANSLATION_WORKERS_CNT = 1

@@ -135,13 +136,47 @@ def get_datasource_from_profiles(profile_name, profiles):
f'invalid {profile_name} configuration section: no "auth" field',
)

if "options" not in connection:
connection["options"] = {}

retrieval_batch_size = RETRIEVAL_BATCH_SIZE
if "options" in connection and "retrieval_batch_size" in connection["options"]:
# need to remove the non-stix-shifter field "retrieval_batch_size" to avoid stix-shifter error
retrieval_batch_size = connection["options"].pop("retrieval_batch_size")
if "retrieval_batch_size" in connection["options"]:
# remove the non-stix-shifter field "retrieval_batch_size" to avoid stix-shifter error
try:
retrieval_batch_size = int(
connection["options"].pop("retrieval_batch_size")
)
except (ValueError, TypeError):
raise InvalidDataSource(
profile_name,
"stixshifter",
f"invalid {profile_name} connection section: options.retrieval_batch_size",
)
# rename this field for stix-shifter use; x2 the size to ensure retrieval
_logger.debug(
f"profile-loaded retrieval_batch_size: {retrieval_batch_size}"
)
connection["options"]["result_limit"] = retrieval_batch_size * 2

single_batch_timeout = SINGLE_BATCH_TIMEOUT
if "single_batch_timeout" in connection["options"]:
# remove the non-stix-shifter field "single_batch_timeout" to avoid stix-shifter error
try:
single_batch_timeout = int(
connection["options"].pop("single_batch_timeout")
)
except (ValueError, TypeError):
raise InvalidDataSource(
profile_name,
"stixshifter",
f"invalid {profile_name} connection section: options.single_batch_timeout",
)
# rename this field for stix-shifter use
_logger.debug(
f"profile-loaded single_batch_timeout: {single_batch_timeout}"
)
connection["options"]["timeout"] = single_batch_timeout

return connector_name, connection, configuration, retrieval_batch_size


108 changes: 64 additions & 44 deletions src/kestrel_datasource_stixshifter/interface.py
@@ -29,9 +29,8 @@
selfSignedCert: false # this means do NOT check cert
indices: host101
options: # use any of this section when needed
result_limit: 500000 # stix-shifter default: 10000
retrieval_batch_size: 10000 # safe to set to 10000 to match default Elasticsearch page size; Kestrel default: 2000
timeout: 300 # allow a query to run for 5 minutes (300 seconds) before timing out; stix-shifter default: 30
retrieval_batch_size: 10000 # set to 10000 to match default Elasticsearch page size; Kestrel default across connectors: 2000
single_batch_timeout: 120 # increase if you hit the 60-second (Kestrel default) timeout error for a retrieval batch
dialects: # more info: https://github.com/opencybersecurityalliance/stix-shifter/tree/develop/stix_shifter_modules/elastic_ecs#dialects
- beats # needed if the index is created by Filebeat/Winlogbeat/*beat
config:
@@ -103,7 +102,6 @@

import asyncio
import json
import time
import copy
import logging

@@ -209,7 +207,7 @@ async def query(uri, pattern, session_id, config, store=None):
)

translation_options = copy.deepcopy(connection_dict.get("options", {}))
dsl = translation.translate(
dsl = await translation.translate_async(
connector_name, "query", query_metadata, pattern, translation_options
)

@@ -259,30 +257,15 @@

batch_index = 0
for query in dsl["queries"]:
search_meta_result = transmission.query(query)
search_meta_result = await transmission.query_async(query)
if search_meta_result["success"]:
search_id = search_meta_result["search_id"]
if transmission.is_async():
time.sleep(1)
status = transmission.status(search_id)
if status["success"]:
while (
status["progress"] < 100
and status["status"] == "RUNNING"
):
status = transmission.status(search_id)
else:
stix_shifter_error_msg = (
status["error"]
if "error" in status
else "details not available"
)
raise DataSourceError(
f"STIX-shifter transmission.status() failed with message: {stix_shifter_error_msg}"
)

await transmission_complete(transmission, search_id)

# run the producer and wait for completion
batch_index = await transmission_produce(
connector_name,
transmission_queue,
transmission,
search_id,
@@ -329,14 +312,37 @@ def ingest_stixbundle_filepath(batch_index):
return ingest_stixbundle_filepath


async def transmission_complete(transmission, search_id):
status = None
while True:
status = await transmission.status_async(search_id)
if not status["success"]:
stix_shifter_error_msg = (
status["error"] if "error" in status else "details not available"
)
raise DataSourceError(
f"STIX-shifter transmission.status() failed with message: {stix_shifter_error_msg}"
)
elif status["progress"] < 100 and status["status"] == "RUNNING":
await asyncio.sleep(1)
else:
break


async def transmission_produce(
transmission_queue, transmission, search_id, retrieval_batch_size, batch_index
connector_name,
transmission_queue,
transmission,
search_id,
retrieval_batch_size,
batch_index,
):
result_retrieval_offset = 0
has_remaining_results = True
metadata = None
is_retry_cycle = False
while has_remaining_results:
result_batch = transmission.results(
result_batch = await transmission.results_async(
search_id, result_retrieval_offset, retrieval_batch_size, metadata
)
if result_batch["success"]:
@@ -350,15 +356,31 @@ async def transmission_produce(
has_remaining_results = False
if "metadata" in result_batch:
metadata = result_batch["metadata"]
is_retry_cycle = False
else:
stix_shifter_error_msg = (
result_batch["error"]
if "error" in result_batch
else "details not available"
)
raise DataSourceError(
f"STIX-shifter transmission.results() failed with message: {stix_shifter_error_msg}"
)
if (
stix_shifter_error_msg.startswith(
f"{connector_name} connector error => server timeout_error"
)
and not is_retry_cycle
):
# mitigate https://github.com/opencybersecurityalliance/stix-shifter/issues/1493
# only give it one retry to mitigate high CPU occupation
# otherwise, it could be a real server connection issue
# /stix_shifter_utils/stix_transmission/utils/RestApiClientAsync.py
_logger.info(
"busy CPU; hit stix-shifter transmission aiohttp connection timeout; retry"
)
is_retry_cycle = True
else:
raise DataSourceError(
f"STIX-shifter transmission.results() failed with message: {stix_shifter_error_msg}"
)
return batch_index
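The retry-once mitigation above follows a simple pattern: retry a failed batch exactly once when the error looks like a server timeout, and raise on any other failure or a second timeout. A sketch under that assumption (the name `fetch_batch_with_retry` and the `fetch` callable are hypothetical; the error-message prefix matches the one checked in the diff):

```python
import asyncio

async def fetch_batch_with_retry(fetch, offset, size, connector="elastic_ecs"):
    # Retry a failed batch exactly once on a server timeout (busy client
    # CPU); any other failure, or a second timeout, raises immediately.
    retried = False
    while True:
        result = await fetch(offset, size)
        if result["success"]:
            return result
        msg = result.get("error", "details not available")
        timeout_prefix = f"{connector} connector error => server timeout_error"
        if msg.startswith(timeout_prefix) and not retried:
            retried = True  # one retry only, as in the fix above
            continue
        raise RuntimeError(f"transmission.results() failed: {msg}")
```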


Expand All @@ -376,7 +398,7 @@ async def translation_ingest_consume(
# wait for an item from the producer
result_batch = await transmission_queue.get()

stixbundle = translation.translate(
stixbundle = await translation.translate_async(
connector_name,
"results",
query_metadata,
@@ -416,19 +438,17 @@ async def fast_translate_ingest_consume(
while True:
# wait for an item from the producer
result_batch = await transmission_queue.get()
try:
await fast_translate(
connector_name,
result_batch["data"],
translation,
translation_options,
identity,
query_id,
store,
)
finally:
# Notify the queue that the item has been processed
transmission_queue.task_done()
await fast_translate(
connector_name,
result_batch["data"],
translation,
translation_options,
identity,
query_id,
store,
)
# Notify the queue that the item has been processed
transmission_queue.task_done()
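The producer/consumer structure around `transmission_queue` is the standard `asyncio.Queue` pattern: the transmission side puts raw result batches on the queue, and the translation/ingestion side pops each batch, processes it, and calls `task_done()` so the producer's `join()` can unblock. A self-contained sketch with all names hypothetical (the `out.append` stands in for translate-and-ingest):

```python
import asyncio

async def produce(queue):
    # stand-in for transmission_produce: push batches, then wait for drain
    for batch in ({"data": [1]}, {"data": [2]}):
        await queue.put(batch)
    await queue.join()  # returns once every batch is marked done

async def consume(queue, out):
    # stand-in for translation_ingest_consume: process each batch forever
    while True:
        batch = await queue.get()
        out.append(batch["data"])
        queue.task_done()  # notify the queue the item has been processed

async def main():
    queue, out = asyncio.Queue(), []
    consumer = asyncio.create_task(consume(queue, out))
    await produce(queue)
    consumer.cancel()  # consumer loops forever; stop it once the queue drains
    return out

print(asyncio.run(main()))  # [[1], [2]]
```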


async def fast_translate(
@@ -443,7 +463,7 @@ async def fast_translate(
# Use the alternate, faster DataFrame-based translation (in firepit)
_logger.debug("Using fast translation for connector %s", connector_name)
transformers = get_module_transformers(connector_name)
mapping = translation.translate(
mapping = await translation.translate_async(
connector_name,
stix_translation.MAPPING,
None,
5 changes: 5 additions & 0 deletions tests/test_stixshifter.py
@@ -49,6 +49,7 @@ def test_yaml_profiles_refresh(tmp_path):
indices: host101
options:
retrieval_batch_size: 10000
single_batch_timeout: 120
dialects:
- beats
config:
@@ -81,6 +82,8 @@ def test_yaml_profiles_refresh(tmp_path):
assert connector_name == "elastic_ecs"
assert configuration["auth"]["id"] == "profileA"
assert configuration["auth"]["api_key"] == "qwer"
assert connection["options"]["timeout"] == 60
assert connection["options"]["result_limit"] == 2000 * 2
assert retrieval_batch_size == 2000

with open(profile_file, "w") as pf:
@@ -94,4 +97,6 @@
assert connector_name == "elastic_ecs"
assert configuration["auth"]["id"] == "profileB"
assert configuration["auth"]["api_key"] == "asdf"
assert connection["options"]["timeout"] == 120
assert connection["options"]["result_limit"] == 10000 * 2
assert retrieval_batch_size == 10000
