diff --git a/sigma/backends/elasticsearch/elasticsearch_esql.py b/sigma/backends/elasticsearch/elasticsearch_esql.py index 66e2cff..a659831 100644 --- a/sigma/backends/elasticsearch/elasticsearch_esql.py +++ b/sigma/backends/elasticsearch/elasticsearch_esql.py @@ -247,6 +247,19 @@ def __init__( "CRITICAL": 99, } + def flatten_list_of_indices( + self, nested_list: List[Union[str, List[str]]] + ) -> List[str]: + flat_list = [] + for item in nested_list: + if isinstance(item, list): + flat_list.extend( + self.flatten_list_of_indices(item) + ) # Recursively flatten the sublist + else: + flat_list.append(item) # Append the string + return flat_list + def preprocess_indices(self, indices: List[str]) -> str: if not indices: return self.state_defaults["index"] @@ -254,11 +267,11 @@ def preprocess_indices(self, indices: List[str]) -> str: if self.wildcard_multi in indices: return self.wildcard_multi + indices = self.flatten_list_of_indices(nested_list=indices) if len(indices) == 1: return indices[0] - # Deduplicate sources using a set - indices = list(set(indices)) + indices = list(set(indices)) # Deduplicate # Sort the indices to ensure a consistent order as sets are arbitrary ordered indices.sort() @@ -274,11 +287,15 @@ def finalize_query( output_format: str, ) -> Union[str, DeferredQueryExpression]: # If set, load the index from the processing state - index_state = state.processing_state.get("index", self.state_defaults["index"]) if isinstance(rule, SigmaRule) else [ + index_state = ( state.processing_state.get("index", self.state_defaults["index"]) - for rule_reference in rule.rules - for state in rule_reference.rule.get_conversion_states() - ] + if isinstance(rule, SigmaRule) + else [ + state.processing_state.get("index", self.state_defaults["index"]) + for rule_reference in rule.rules + for state in rule_reference.rule.get_conversion_states() + ] + ) # If the non-default index is not a string, preprocess it if not isinstance(index_state, str): index_state = self.preprocess_indices(index_state) @@ -501,8 +518,7 @@ def finalize_query_siem_rule_ndjson( ), "severity": ( "low" - if rule.level is None - or str(rule.level.name).lower() == "informational" + if rule.level is None or str(rule.level.name).lower() == "informational" else str(rule.level.name).lower() ), "note": "", diff --git a/tests/test_backend_elasticsearch_esql.py b/tests/test_backend_elasticsearch_esql.py index d27a8f1..489d31e 100644 --- a/tests/test_backend_elasticsearch_esql.py +++ b/tests/test_backend_elasticsearch_esql.py @@ -3,6 +3,7 @@ from sigma.backends.elasticsearch.elasticsearch_esql import ESQLBackend from sigma.processing.pipeline import ProcessingPipeline + @pytest.fixture def esql_backend(): return ESQLBackend() @@ -26,7 +27,9 @@ def test_elasticsearch_esql_and_expression(esql_backend: ESQLBackend): """ ) ) - == ['from * metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"'] + == [ + 'from * metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"' + ] ) @@ -49,7 +52,9 @@ def test_elasticsearch_esql_or_expression(esql_backend: ESQLBackend): """ ) ) - == ['from * metadata _id, _index, _version | where fieldA=="valueA" or fieldB=="valueB"'] + == [ + 'from * metadata _id, _index, _version | where fieldA=="valueA" or fieldB=="valueB"' + ] ) @@ -128,7 +133,9 @@ def test_elasticsearch_esql_in_expression(esql_backend: ESQLBackend): """ ) ) - == ['from * metadata _id, _index, _version | where fieldA in ("valueA", "valueB", "valueC")'] + == [ + 'from * metadata _id, _index, _version | where fieldA in ("valueA", "valueB", "valueC")' + ] ) @@ -176,7 +183,9 @@ def test_elasticsearch_esql_regex_query(esql_backend: ESQLBackend): """ ) ) - == ['from * metadata _id, _index, _version | where fieldA rlike "foo.*bar" and fieldB=="foo"'] + == [ + 'from * metadata _id, _index, _version | where fieldA rlike "foo.*bar" and fieldB=="foo"' + ] ) @@ -197,7 +206,9 @@ def test_elasticsearch_esql_cidr_query(esql_backend: ESQLBackend): """ ) ) - == ['from * metadata _id, _index, _version | where cidr_match(field, "192.168.0.0/16")'] + == [ + 'from * metadata _id, _index, _version | where cidr_match(field, "192.168.0.0/16")' + ] ) @@ -221,6 +232,7 @@ def test_elasticsearch_esql_field_name_with_whitespace(esql_backend: ESQLBackend == ['from * metadata _id, _index, _version | where `field name`=="value"'] ) + def test_elasticsearch_esql_set_state_index_string(esql_backend: ESQLBackend): assert ( ESQLBackend( @@ -238,7 +250,8 @@ def test_elasticsearch_esql_set_state_index_string(esql_backend: ESQLBackend): category: test_category product: test_product """ - )).convert( + ) + ).convert( SigmaCollection.from_yaml( """ title: Test @@ -254,9 +267,12 @@ def test_elasticsearch_esql_set_state_index_string(esql_backend: ESQLBackend): """ ) ) - == ['from logs-test-* metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"'] + == [ + 'from logs-test-* metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"' + ] ) + def test_elasticsearch_esql_set_state_index_list(esql_backend: ESQLBackend): assert ( ESQLBackend( @@ -276,7 +292,8 @@ def test_elasticsearch_esql_set_state_index_list(esql_backend: ESQLBackend): category: test_category product: test_product """ - )).convert( + ) + ).convert( SigmaCollection.from_yaml( """ title: Test @@ -292,7 +309,82 @@ def test_elasticsearch_esql_set_state_index_list(esql_backend: ESQLBackend): """ ) ) - == ['from logs-test1-*,logs-test2-* metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"'] + == [ + 'from logs-test1-*,logs-test2-* metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"' + ] + ) + + +def test_elasticsearch_esql_set_state_index_list_correlation_rule( + esql_backend: ESQLBackend, +): + assert ( + ESQLBackend( + processing_pipeline=ProcessingPipeline.from_yaml( + """ + name: test-pipeline + priority: 30 + transformations: + - id: set_state_index_one_logsource + type: set_state + key: index + val: + - logs-test1-* + - logs-test2-* + rule_conditions: + - type: logsource + category: test_category + product: test_product + - id: set_state_index_other_logsource + type: set_state + key: index + val: "logs-test3-*" + rule_conditions: + - type: logsource + category: other_log_source + """ + ) + ).convert( + SigmaCollection.from_yaml( + """ +title: Correlation_Test_Multiple_Log_Sources +correlation: + type: value_count + rules: + - rule_for_one_log_source + - rule_for_other_source + group-by: + - field + timespan: 15m + condition: + field: User + gt: 5 +--- +title: Test_Rule_One +status: test +name: rule_for_one_log_source +logsource: + category: test_category + product: test_product +detection: + sel: + fieldA: valueA + condition: sel +--- +title: Test_Rule_Two +name: rule_for_other_source +logsource: + category: other_log_source +detection: + selection: + fieldB: valueB + condition: selection + """ + ) + ) + == [ + 'from logs-test1-*,logs-test2-*,logs-test3-* metadata _id, _index, _version | where (fieldA=="valueA") or (fieldB=="valueB")\n| eval timebucket=date_trunc(15minutes, @timestamp) | stats value_count=count_distinct(User) by timebucket, field\n| where value_count > 5' + ] ) @@ -314,7 +406,8 @@ def test_elasticsearch_esql_set_state_index_list_single(esql_backend: ESQLBacken category: test_category product: test_product """ - )).convert( + ) + ).convert( SigmaCollection.from_yaml( """ title: Test @@ -330,9 +423,12 @@ def test_elasticsearch_esql_set_state_index_list_single(esql_backend: ESQLBacken """ ) ) - == ['from logs-test-* metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"'] + == [ + 'from logs-test-* metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"' + ] ) + def test_elasticsearch_esql_set_state_index_list_deduplicate(esql_backend: ESQLBackend): assert ( ESQLBackend( @@ -352,7 +448,8 @@ def test_elasticsearch_esql_set_state_index_list_deduplicate(esql_backend: ESQLB category: test_category product: test_product """ - )).convert( + ) + ).convert( SigmaCollection.from_yaml( """ title: Test @@ -368,9 +465,12 @@ def test_elasticsearch_esql_set_state_index_list_deduplicate(esql_backend: ESQLB """ ) ) - == ['from logs-test-* metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"'] + == [ + 'from logs-test-* metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"' + ] ) + def test_elasticsearch_esql_set_state_index_list_wildcard(esql_backend: ESQLBackend): assert ( ESQLBackend( @@ -390,7 +490,8 @@ def test_elasticsearch_esql_set_state_index_list_wildcard(esql_backend: ESQLBack category: test_category product: test_product """ - )).convert( + ) + ).convert( SigmaCollection.from_yaml( """ title: Test @@ -406,9 +507,12 @@ def test_elasticsearch_esql_set_state_index_list_wildcard(esql_backend: ESQLBack """ ) ) - == ['from * metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"'] + == [ + 'from * metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"' + ] ) + def test_elasticsearch_esql_ndjson(esql_backend: ESQLBackend): """Test for NDJSON output with embedded query string query.""" rule = SigmaCollection.from_yaml( @@ -427,25 +531,25 @@ def test_elasticsearch_esql_ndjson(esql_backend: ESQLBackend): ) result = esql_backend.convert(rule, output_format="kibana_ndjson") assert result[0] == { - 'attributes': { - 'columns': [], - 'description': 'No description', - 'grid': {}, - 'hideChart': False, - 'isTextBasedQuery': True, - 'kibanaSavedObjectMeta': { - 'searchSourceJSON': '{"query": {"esql": "from * metadata _id, _index, _version | where fieldA==\\"valueA\\" and fieldB==\\"valueB\\""}, "index": {"title": "*", "timeFieldName": "@timestamp", "sourceFilters": [], "type": "esql", "fieldFormats": {}, "runtimeFieldMap": {}, "allowNoIndex": false, "name": "*", "allowHidden": false}, "filter": []}' + "attributes": { + "columns": [], + "description": "No description", + "grid": {}, + "hideChart": False, + "isTextBasedQuery": True, + "kibanaSavedObjectMeta": { + "searchSourceJSON": '{"query": {"esql": "from * metadata _id, _index, _version | where fieldA==\\"valueA\\" and fieldB==\\"valueB\\""}, "index": {"title": "*", "timeFieldName": "@timestamp", "sourceFilters": [], "type": "esql", "fieldFormats": {}, "runtimeFieldMap": {}, "allowNoIndex": false, "name": "*", "allowHidden": false}, "filter": []}' }, - 'sort': [['@timestamp', 'desc']], - 'timeRestore': False, - 'title': 'SIGMA - Test', - 'usesAdHocDataView': False + "sort": [["@timestamp", "desc"]], + "timeRestore": False, + "title": "SIGMA - Test", + "usesAdHocDataView": False, }, - 'id': 'None', - 'managed': False, - 'references': [], - 'type': 'search', - 'typeMigrationVersion': '10.2.0' + "id": "None", + "managed": False, + "references": [], + "type": "search", + "typeMigrationVersion": "10.2.0", } @@ -468,46 +572,42 @@ def test_elasticsearch_esql_siemrule(esql_backend: ESQLBackend): ) result = esql_backend.convert(rule, output_format="siem_rule") assert result[0] == { - 'name': 'SIGMA - Test', - 'tags': [], - 'enabled': True, - 'consumer': 'siem', - 'throttle': None, - 'schedule': { - 'interval': '5m' - }, - 'params': { - 'author': [], - 'description': 'No description', - 'ruleId': 'c277adc0-f0c4-42e1-af9d-fab062992156', - 'falsePositives': [], - 'from': 'now-5m', - 'immutable': False, - 'license': 'DRL', - 'outputIndex': '', - 'meta': { - 'from': '1m' - }, - 'maxSignals': 100, - 'relatedIntegrations': [], - 'requiredFields': [], - 'riskScore': 21, - 'riskScoreMapping': [], - 'setup': '', - 'severity': 'low', - 'severityMapping': [], - 'threat': [], - 'to': 'now', - 'references': [], - 'version': 1, - 'exceptionsList': [], - 'type': 'esql', - 'language': 'esql', - 'query': 'from * metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"' + "name": "SIGMA - Test", + "tags": [], + "enabled": True, + "consumer": "siem", + "throttle": None, + "schedule": {"interval": "5m"}, + "params": { + "author": [], + "description": "No description", + "ruleId": "c277adc0-f0c4-42e1-af9d-fab062992156", + "falsePositives": [], + "from": "now-5m", + "immutable": False, + "license": "DRL", + "outputIndex": "", + "meta": {"from": "1m"}, + "maxSignals": 100, + "relatedIntegrations": [], + "requiredFields": [], + "riskScore": 21, + "riskScoreMapping": [], + "setup": "", + "severity": "low", + "severityMapping": [], + "threat": [], + "to": "now", + "references": [], + "version": 1, + "exceptionsList": [], + "type": "esql", + "language": "esql", + "query": 'from * metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"', }, - 'rule_type_id': 'siem.esqlRule', - 'notify_when': 'onActiveAlert', - 'actions': [] + "rule_type_id": "siem.esqlRule", + "notify_when": "onActiveAlert", + "actions": [], } @@ -530,40 +630,38 @@ def test_elasticsearch_esql_siemrule_ndjson(esql_backend: ESQLBackend): ) result = esql_backend.convert(rule, output_format="siem_rule_ndjson") assert result[0] == { - 'id': 'c277adc0-f0c4-42e1-af9d-fab062992156', - 'name': 'SIGMA - Test', - 'tags': [], - 'interval': '5m', - 'enabled': True, - 'description': 'No description', - 'risk_score': 21, - 'severity': 'low', - 'note': '', - 'license': 'DRL', - 'output_index': '', - 'meta': { - 'from': '1m' - }, - 'author': [], - 'false_positives': [], - 'from': 'now-5m', - 'rule_id': 'c277adc0-f0c4-42e1-af9d-fab062992156', - 'max_signals': 100, - 'risk_score_mapping': [], - 'severity_mapping': [], - 'threat': [], - 'to': 'now', - 'references': [], - 'version': 1, - 'exceptions_list': [], - 'immutable': False, - 'related_integrations': [], - 'required_fields': [], - 'setup': '', - 'type': 'esql', - 'language': 'esql', - 'query': 'from * metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"', - 'actions': [] + "id": "c277adc0-f0c4-42e1-af9d-fab062992156", + "name": "SIGMA - Test", + "tags": [], + "interval": "5m", + "enabled": True, + "description": "No description", + "risk_score": 21, + "severity": "low", + "note": "", + "license": "DRL", + "output_index": "", + "meta": {"from": "1m"}, + "author": [], + "false_positives": [], + "from": "now-5m", + "rule_id": "c277adc0-f0c4-42e1-af9d-fab062992156", + "max_signals": 100, + "risk_score_mapping": [], + "severity_mapping": [], + "threat": [], + "to": "now", + "references": [], + "version": 1, + "exceptions_list": [], + "immutable": False, + "related_integrations": [], + "required_fields": [], + "setup": "", + "type": "esql", + "language": "esql", + "query": 'from * metadata _id, _index, _version | where fieldA=="valueA" and fieldB=="valueB"', + "actions": [], } @@ -593,7 +691,12 @@ def test_elasticsearch_esql_siemrule_ndjson_with_threat(esql_backend: ESQLBacken assert result[0] == { "id": "c277adc0-f0c4-42e1-af9d-fab062992156", "name": "SIGMA - Test", - "tags": ["attack-execution", "attack-t1059.001", "attack-defense_evasion", "attack-t1027"], + "tags": [ + "attack-execution", + "attack-t1059.001", + "attack-defense_evasion", + "attack-t1027", + ], "interval": "5m", "enabled": True, "description": "No description", @@ -602,9 +705,7 @@ def test_elasticsearch_esql_siemrule_ndjson_with_threat(esql_backend: ESQLBacken "note": "", "license": "DRL", "output_index": "", - "meta": { - "from": "1m" - }, + "meta": {"from": "1m"}, "author": [], "false_positives": [], "from": "now-5m",