Skip to content

Commit

Permalink
Merge pull request #108 from Mat0vu:fix_set_state_index
Browse files Browse the repository at this point in the history
ESQL: flatten list of indices before converting to string
  • Loading branch information
thomaspatzke authored Dec 17, 2024
2 parents 483dfbe + e8a2f77 commit ba295d7
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 117 deletions.
32 changes: 24 additions & 8 deletions sigma/backends/elasticsearch/elasticsearch_esql.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,31 @@ def __init__(
"CRITICAL": 99,
}

def flatten_list_of_indices(
self, nested_list: List[Union[str, List[str]]]
) -> List[str]:
flat_list = []
for item in nested_list:
if isinstance(item, list):
flat_list.extend(
self.flatten_list_of_indices(item)
) # Recursively flatten the sublist
else:
flat_list.append(item) # Append the string
return flat_list

def preprocess_indices(self, indices: List[str]) -> str:
if not indices:
return self.state_defaults["index"]

if self.wildcard_multi in indices:
return self.wildcard_multi

indices = self.flatten_list_of_indices(nested_list=indices)
if len(indices) == 1:
return indices[0]

# Deduplicate sources using a set
indices = list(set(indices))
indices = list(set(indices)) # Deduplicate

# Sort the indices to ensure a consistent order as sets are arbitrary ordered
indices.sort()
Expand All @@ -274,11 +287,15 @@ def finalize_query(
output_format: str,
) -> Union[str, DeferredQueryExpression]:
# If set, load the index from the processing state
index_state = state.processing_state.get("index", self.state_defaults["index"]) if isinstance(rule, SigmaRule) else [
index_state = (
state.processing_state.get("index", self.state_defaults["index"])
for rule_reference in rule.rules
for state in rule_reference.rule.get_conversion_states()
]
if isinstance(rule, SigmaRule)
else [
state.processing_state.get("index", self.state_defaults["index"])
for rule_reference in rule.rules
for state in rule_reference.rule.get_conversion_states()
]
)
# If the non-default index is not a string, preprocess it
if not isinstance(index_state, str):
index_state = self.preprocess_indices(index_state)
Expand Down Expand Up @@ -501,8 +518,7 @@ def finalize_query_siem_rule_ndjson(
),
"severity": (
"low"
if rule.level is None
or str(rule.level.name).lower() == "informational"
if rule.level is None or str(rule.level.name).lower() == "informational"
else str(rule.level.name).lower()
),
"note": "",
Expand Down
Loading

0 comments on commit ba295d7

Please sign in to comment.