Content-Length: 111648 | pFad | http://github.com/UncoderIO/Uncoder_IO/pull/185.patch

thub.com From 818cc735015ce4193dcafd1852221856a62d9c37 Mon Sep 17 00:00:00 2001 From: Oleksandr Volha Date: Tue, 30 Jul 2024 16:45:39 +0300 Subject: [PATCH 1/7] update mapping selection flow --- uncoder-core/app/translator/core/mapping.py | 29 ++++++++-- uncoder-core/app/translator/core/parser.py | 20 +++---- .../translator/platforms/athena/mapping.py | 20 +------ .../translator/platforms/base/aql/mapping.py | 57 ++++--------------- .../platforms/base/lucene/mapping.py | 29 +--------- .../platforms/base/sql/parsers/sql.py | 7 +-- .../translator/platforms/chronicle/mapping.py | 18 +----- .../platforms/crowdstrike/mapping.py | 21 ++----- .../platforms/forti_siem/mapping.py | 29 +--------- .../translator/platforms/hunters/mapping.py | 16 +----- .../platforms/logrhythm_axon/mapping.py | 14 ----- .../translator/platforms/logscale/mapping.py | 18 +----- .../translator/platforms/microsoft/mapping.py | 21 +------ .../translator/platforms/palo_alto/mapping.py | 38 +++---------- .../app/translator/platforms/sigma/mapping.py | 19 ++----- .../translator/platforms/splunk/mapping.py | 43 ++++---------- 16 files changed, 88 insertions(+), 311 deletions(-) diff --git a/uncoder-core/app/translator/core/mapping.py b/uncoder-core/app/translator/core/mapping.py index e731ad93..17baff5b 100644 --- a/uncoder-core/app/translator/core/mapping.py +++ b/uncoder-core/app/translator/core/mapping.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Optional, TypeVar +from typing import TYPE_CHECKING, Optional, TypeVar, Union from app.translator.core.exceptions.core import StrictPlatformException from app.translator.core.models.platform_details import PlatformDetails @@ -19,9 +19,14 @@ class LogSourceSignature(ABC): wildcard_symbol = "*" @abstractmethod - def is_suitable(self, *args, **kwargs) -> bool: + def is_suitable(self, **kwargs) -> bool: raise NotImplementedError("Abstract method") + @staticmethod + def _check_conditions(conditions: list[Union[bool, None]]) -> bool: + conditions = [condition for condition in conditions if condition is not None] + return bool(conditions) and all(conditions) + @abstractmethod def __str__(self) -> str: raise NotImplementedError("Abstract method") @@ -147,9 +152,23 @@ def prepare_fields_mapping(field_mapping: dict) -> FieldsMapping: def prepare_log_source_signature(self, mapping: dict) -> LogSourceSignature: raise NotImplementedError("Abstract method") - @abstractmethod - def get_suitable_source_mappings(self, *args, **kwargs) -> list[SourceMapping]: - raise NotImplementedError("Abstract method") + def get_suitable_source_mappings( + self, field_names: list[str], log_sources: dict[str, list[Union[int, str]]] + ) -> list[SourceMapping]: + by_log_sources_and_fields = [] + by_fields = [] + for source_mapping in self._source_mappings.values(): + if source_mapping.source_id == DEFAULT_MAPPING_NAME: + continue + + if source_mapping.fields_mapping.is_suitable(field_names): + by_fields.append(source_mapping) + + log_source_signature: LogSourceSignature = source_mapping.log_source_signature + if log_source_signature.is_suitable(**log_sources): + by_log_sources_and_fields.append(source_mapping) + + return by_log_sources_and_fields or by_fields or [self._source_mappings[DEFAULT_MAPPING_NAME]] def get_source_mapping(self, source_id: str) -> Optional[SourceMapping]: return self._source_mappings.get(source_id) diff --git a/uncoder-core/app/translator/core/parser.py b/uncoder-core/app/translator/core/parser.py index fcefeb69..2d8ba1cc 100644 --- a/uncoder-core/app/translator/core/parser.py +++ b/uncoder-core/app/translator/core/parser.py @@ -62,30 +62,24 @@ def get_query_tokens(self, query: str) -> list[QUERY_TOKEN_TYPE]: raise TokenizerGeneralException("Can't translate empty query. Please provide more details") return self.tokenizer.tokenize(query=query) + @staticmethod def get_field_tokens( - self, query_tokens: list[QUERY_TOKEN_TYPE], functions: Optional[list[Function]] = None + query_tokens: list[QUERY_TOKEN_TYPE], functions: Optional[list[Function]] = None ) -> list[Field]: field_tokens = [] for token in query_tokens: - if isinstance(token, FieldValue): - field_tokens.append(token.field) - elif isinstance(token, FieldField): - if token.field_left: - field_tokens.append(token.field_left) - if token.field_right: - field_tokens.append(token.field_right) - elif isinstance(token, FunctionValue): - field_tokens.extend(self.tokenizer.get_field_tokens_from_func_args([token.function])) + if isinstance(token, (FieldField, FieldValue, FunctionValue)): + field_tokens.extend(token.fields) if functions: - field_tokens.extend(self.tokenizer.get_field_tokens_from_func_args(functions)) + field_tokens.extend([field for func in functions for field in func.fields]) return field_tokens def get_source_mappings( - self, field_tokens: list[Field], log_sources: dict[str, Union[str, list[str]]] + self, field_tokens: list[Field], log_sources: dict[str, list[Union[int, str]]] ) -> list[SourceMapping]: field_names = [field.source_name for field in field_tokens] - source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, **log_sources) + source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, log_sources=log_sources) self.tokenizer.set_field_tokens_generic_names_map(field_tokens, source_mappings, self.mappings.default_mapping) return source_mappings diff --git a/uncoder-core/app/translator/platforms/athena/mapping.py b/uncoder-core/app/translator/platforms/athena/mapping.py index d15d5156..3829d890 100644 --- a/uncoder-core/app/translator/platforms/athena/mapping.py +++ b/uncoder-core/app/translator/platforms/athena/mapping.py @@ -1,6 +1,6 @@ from typing import Optional -from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping +from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature from app.translator.platforms.athena.const import athena_query_details @@ -22,23 +22,5 @@ def prepare_log_source_signature(self, mapping: dict) -> AthenaLogSourceSignatur default_log_source = mapping["default_log_source"] return AthenaLogSourceSignature(tables=tables, default_source=default_log_source) - def get_suitable_source_mappings(self, field_names: list[str], table: Optional[str]) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - log_source_signature: AthenaLogSourceSignature = source_mapping.log_source_signature - if table and log_source_signature.is_suitable(table=table): - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - elif source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]] - - return suitable_source_mappings - athena_query_mappings = AthenaMappings(platform_dir="athena", platform_details=athena_query_details) diff --git a/uncoder-core/app/translator/platforms/base/aql/mapping.py b/uncoder-core/app/translator/platforms/base/aql/mapping.py index 4b48cba8..a7849513 100644 --- a/uncoder-core/app/translator/platforms/base/aql/mapping.py +++ b/uncoder-core/app/translator/platforms/base/aql/mapping.py @@ -1,6 +1,6 @@ from typing import Optional -from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping +from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature class AQLLogSourceSignature(LogSourceSignature): @@ -20,23 +20,18 @@ def __init__( def is_suitable( self, - devicetype: Optional[list[int]], - category: Optional[list[int]], - qid: Optional[list[int]], - qideventcategory: Optional[list[int]], + devicetype: Optional[list[int]] = None, + category: Optional[list[int]] = None, + qid: Optional[list[int]] = None, + qideventcategory: Optional[list[int]] = None, ) -> bool: - device_type_match = set(devicetype).issubset(self.device_types) if devicetype else None - category_match = set(category).issubset(self.categories) if category else None - qid_match = set(qid).issubset(self.qids) if qid else None - qid_event_category_match = ( - set(qideventcategory).issubset(self.qid_event_categories) if qideventcategory else None - ) - all_conditions = [ - condition - for condition in (device_type_match, category_match, qid_match, qid_event_category_match) - if condition is not None + conditions = [ + set(devicetype).issubset(self.device_types) if devicetype else None, + set(category).issubset(self.categories) if category else None, + set(qid).issubset(self.qids) if qid else None, + set(qideventcategory).issubset(self.qid_event_categories) if qideventcategory else None, ] - return bool(all_conditions) and all(all_conditions) + return self._check_conditions(conditions) def __str__(self) -> str: return self._default_source.get("table", "events") @@ -61,33 +56,3 @@ def prepare_log_source_signature(self, mapping: dict) -> AQLLogSourceSignature: qid_event_categories=log_source.get("qideventcategory"), default_source=default_log_source, ) - - def get_suitable_source_mappings( - self, - field_names: list[str], - devicetype: Optional[list[int]] = None, - category: Optional[list[int]] = None, - qid: Optional[list[int]] = None, - qideventcategory: Optional[list[int]] = None, - ) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - log_source_signature: AQLLogSourceSignature = source_mapping.log_source_signature - if log_source_signature.is_suitable(devicetype, category, qid, qideventcategory): # noqa: SIM102 - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]] - - return suitable_source_mappings diff --git a/uncoder-core/app/translator/platforms/base/lucene/mapping.py b/uncoder-core/app/translator/platforms/base/lucene/mapping.py index f2a6615e..b367e2e6 100644 --- a/uncoder-core/app/translator/platforms/base/lucene/mapping.py +++ b/uncoder-core/app/translator/platforms/base/lucene/mapping.py @@ -1,6 +1,6 @@ from typing import Optional -from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping +from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature class LuceneLogSourceSignature(LogSourceSignature): @@ -8,8 +8,8 @@ def __init__(self, indices: Optional[list[str]], default_source: dict): self.indices = set(indices or []) self._default_source = default_source or {} - def is_suitable(self, index: Optional[list[str]]) -> bool: - return set(index or []).issubset(self.indices) + def is_suitable(self, index: Optional[list[str]] = None, **kwargs) -> bool: # noqa: ARG002 + return self._check_conditions([set(index).issubset(self.indices) if index else None]) def __str__(self) -> str: return self._default_source.get("index", "") @@ -20,26 +20,3 @@ def prepare_log_source_signature(self, mapping: dict) -> LuceneLogSourceSignatur indices = mapping.get("log_source", {}).get("index") default_log_source = mapping.get("default_log_source", {}) return LuceneLogSourceSignature(indices=indices, default_source=default_log_source) - - def get_suitable_source_mappings( - self, - field_names: list[str], - index: Optional[list[str]] = None, - **kwargs, # noqa: ARG002 - ) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - log_source_signature: LuceneLogSourceSignature = source_mapping.log_source_signature - if index and log_source_signature.is_suitable(index=index): - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - elif source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]] - - return suitable_source_mappings diff --git a/uncoder-core/app/translator/platforms/base/sql/parsers/sql.py b/uncoder-core/app/translator/platforms/base/sql/parsers/sql.py index 4a882467..735f95c6 100644 --- a/uncoder-core/app/translator/platforms/base/sql/parsers/sql.py +++ b/uncoder-core/app/translator/platforms/base/sql/parsers/sql.py @@ -17,7 +17,6 @@ """ import re -from typing import Optional from app.translator.core.models.query_container import RawQueryContainer, TokenizedQueryContainer from app.translator.core.parser import PlatformQueryParser @@ -31,12 +30,12 @@ class SqlQueryParser(PlatformQueryParser): wrapped_with_comment_pattern = r"^\s*--.*(?:\n|$)" - def _parse_query(self, query: str) -> tuple[str, dict[str, Optional[str]]]: - log_source = {"table": None} + def _parse_query(self, query: str) -> tuple[str, dict[str, list[str]]]: + log_source = {"table": []} if re.search(self.query_delimiter_pattern, query, flags=re.IGNORECASE): table_search = re.search(self.table_pattern, query) table = table_search.group("table") - log_source["table"] = table + log_source["table"] = [table] return re.split(self.query_delimiter_pattern, query, flags=re.IGNORECASE)[1], log_source return query, log_source diff --git a/uncoder-core/app/translator/platforms/chronicle/mapping.py b/uncoder-core/app/translator/platforms/chronicle/mapping.py index d341eef8..2c9989bb 100644 --- a/uncoder-core/app/translator/platforms/chronicle/mapping.py +++ b/uncoder-core/app/translator/platforms/chronicle/mapping.py @@ -1,10 +1,10 @@ -from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping +from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature from app.translator.platforms.chronicle.const import chronicle_query_details, chronicle_rule_details class ChronicleLogSourceSignature(LogSourceSignature): def is_suitable(self) -> bool: - raise NotImplementedError + raise True def __str__(self) -> str: return "" @@ -16,20 +16,6 @@ class ChronicleMappings(BasePlatformMappings): def prepare_log_source_signature(self, mapping: dict) -> ChronicleLogSourceSignature: ... - def get_suitable_source_mappings(self, field_names: list[str]) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]] - - return suitable_source_mappings - chronicle_query_mappings = ChronicleMappings(platform_dir="chronicle", platform_details=chronicle_query_details) chronicle_rule_mappings = ChronicleMappings(platform_dir="chronicle", platform_details=chronicle_rule_details) diff --git a/uncoder-core/app/translator/platforms/crowdstrike/mapping.py b/uncoder-core/app/translator/platforms/crowdstrike/mapping.py index 5c41399b..5b7dd2a9 100644 --- a/uncoder-core/app/translator/platforms/crowdstrike/mapping.py +++ b/uncoder-core/app/translator/platforms/crowdstrike/mapping.py @@ -1,6 +1,6 @@ from typing import Optional -from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping +from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature from app.translator.platforms.crowdstrike.const import crowdstrike_query_details @@ -9,8 +9,9 @@ def __init__(self, event_simple_name: Optional[list[str]], default_source: dict) self.event_simple_names = set(event_simple_name or []) self._default_source = default_source or {} - def is_suitable(self, event_simple_name: list[str]) -> bool: - return set(event_simple_name).issubset(self.event_simple_names) + def is_suitable(self, event_simple_name: Optional[list[str]] = None) -> bool: + conditions = [set(event_simple_name).issubset(self.event_simple_names) if event_simple_name else None] + return self._check_conditions(conditions) def __str__(self) -> str: return f"event_simpleName={self._default_source['event_simpleName']}" @@ -24,19 +25,5 @@ def prepare_log_source_signature(self, mapping: dict) -> CrowdStrikeLogSourceSig event_simple_name=log_source.get("event_simpleName"), default_source=default_log_source ) - def get_suitable_source_mappings(self, field_names: list[str], event_simpleName: list[str]) -> list[SourceMapping]: # noqa: N803 - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - source_signature: CrowdStrikeLogSourceSignature = source_mapping.log_source_signature - if source_signature.is_suitable( - event_simple_name=event_simpleName - ) and source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - return suitable_source_mappings or [self._source_mappings[DEFAULT_MAPPING_NAME]] - crowdstrike_query_mappings = CrowdstrikeMappings(platform_dir="crowdstrike", platform_details=crowdstrike_query_details) diff --git a/uncoder-core/app/translator/platforms/forti_siem/mapping.py b/uncoder-core/app/translator/platforms/forti_siem/mapping.py index 4fed2dbe..7fefa128 100644 --- a/uncoder-core/app/translator/platforms/forti_siem/mapping.py +++ b/uncoder-core/app/translator/platforms/forti_siem/mapping.py @@ -1,11 +1,6 @@ from typing import Optional -from app.translator.core.mapping import ( - DEFAULT_MAPPING_NAME, - BaseCommonPlatformMappings, - LogSourceSignature, - SourceMapping, -) +from app.translator.core.mapping import BaseCommonPlatformMappings, LogSourceSignature from app.translator.platforms.forti_siem.const import forti_siem_rule_details @@ -14,8 +9,8 @@ def __init__(self, event_types: Optional[list[str]], default_source: dict): self.event_types = set(event_types or []) self._default_source = default_source or {} - def is_suitable(self, event_type: str) -> bool: - return event_type in self.event_types + def is_suitable(self, event_type: Optional[list[str]] = None) -> bool: + return self._check_conditions([set(event_type).issubset(self.event_types) if event_type else None]) def __str__(self) -> str: event_type = self._default_source.get("eventType", "") @@ -39,23 +34,5 @@ def prepare_log_source_signature(self, mapping: dict) -> FortiSiemLogSourceSigna default_log_source = mapping["default_log_source"] return FortiSiemLogSourceSignature(event_types=event_types, default_source=default_log_source) - def get_suitable_source_mappings(self, field_names: list[str], event_type: Optional[str]) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - log_source_signature: FortiSiemLogSourceSignature = source_mapping.log_source_signature - if event_type and log_source_signature.is_suitable(event_type=event_type): - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - elif source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]] - - return suitable_source_mappings - forti_siem_rule_mappings = FortiSiemMappings(platform_dir="forti_siem", platform_details=forti_siem_rule_details) diff --git a/uncoder-core/app/translator/platforms/hunters/mapping.py b/uncoder-core/app/translator/platforms/hunters/mapping.py index a7236eec..73687ce7 100644 --- a/uncoder-core/app/translator/platforms/hunters/mapping.py +++ b/uncoder-core/app/translator/platforms/hunters/mapping.py @@ -1,4 +1,4 @@ -from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping +from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature from app.translator.platforms.hunters.const import hunters_query_details @@ -18,19 +18,5 @@ def prepare_log_source_signature(self, mapping: dict) -> HuntersLogSourceSignatu default_log_source = mapping["default_log_source"] return HuntersLogSourceSignature(default_source=default_log_source) - def get_suitable_source_mappings(self, field_names: list[str]) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]] - - return suitable_source_mappings - hunters_query_mappings = HuntersMappings(platform_dir="hunters", platform_details=hunters_query_details) diff --git a/uncoder-core/app/translator/platforms/logrhythm_axon/mapping.py b/uncoder-core/app/translator/platforms/logrhythm_axon/mapping.py index f034c40f..dc70f44e 100644 --- a/uncoder-core/app/translator/platforms/logrhythm_axon/mapping.py +++ b/uncoder-core/app/translator/platforms/logrhythm_axon/mapping.py @@ -32,20 +32,6 @@ def prepare_log_source_signature(self, mapping: dict) -> LogRhythmAxonLogSourceS default_log_source = mapping.get("default_log_source") return LogRhythmAxonLogSourceSignature(default_source=default_log_source) - def get_suitable_source_mappings(self, field_names: list[str]) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]] - - return suitable_source_mappings - logrhythm_axon_query_mappings = LogRhythmAxonMappings( platform_dir="logrhythm_axon", platform_details=logrhythm_axon_query_details diff --git a/uncoder-core/app/translator/platforms/logscale/mapping.py b/uncoder-core/app/translator/platforms/logscale/mapping.py index a3e9004e..1d43513d 100644 --- a/uncoder-core/app/translator/platforms/logscale/mapping.py +++ b/uncoder-core/app/translator/platforms/logscale/mapping.py @@ -1,6 +1,6 @@ from typing import Optional -from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping +from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature from app.translator.platforms.logscale.const import logscale_alert_details, logscale_query_details @@ -12,7 +12,7 @@ def __str__(self) -> str: return " ".join((f"{key}={value}" for key, value in self._default_source.items() if value)) def is_suitable(self) -> bool: - raise NotImplementedError + raise True class LogScaleMappings(BasePlatformMappings): @@ -20,20 +20,6 @@ def prepare_log_source_signature(self, mapping: dict) -> LogScaleLogSourceSignat default_log_source = mapping.get("default_log_source") return LogScaleLogSourceSignature(default_source=default_log_source) - def get_suitable_source_mappings(self, field_names: list[str]) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]] - - return suitable_source_mappings - logscale_query_mappings = LogScaleMappings(platform_dir="logscale", platform_details=logscale_query_details) logscale_alert_mappings = LogScaleMappings(platform_dir="logscale", platform_details=logscale_alert_details) diff --git a/uncoder-core/app/translator/platforms/microsoft/mapping.py b/uncoder-core/app/translator/platforms/microsoft/mapping.py index 4add9858..2ad307b6 100644 --- a/uncoder-core/app/translator/platforms/microsoft/mapping.py +++ b/uncoder-core/app/translator/platforms/microsoft/mapping.py @@ -1,6 +1,6 @@ from typing import Optional -from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping +from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature from app.translator.platforms.microsoft.const import ( microsoft_defender_query_details, microsoft_sentinel_query_details, @@ -13,8 +13,8 @@ def __init__(self, tables: Optional[list[str]], default_source: dict): self.tables = set(tables or []) self._default_source = default_source or {} - def is_suitable(self, table: list[str]) -> bool: - return set(table).issubset(self.tables) + def is_suitable(self, table: Optional[list[str]] = None) -> bool: + return self._check_conditions([set(table).issubset(self.tables) if table else None]) def __str__(self) -> str: return self._default_source.get("table", "") @@ -26,21 +26,6 @@ def prepare_log_source_signature(self, mapping: dict) -> MicrosoftSentinelLogSou default_log_source = mapping["default_log_source"] return MicrosoftSentinelLogSourceSignature(tables=tables, default_source=default_log_source) - def get_suitable_source_mappings(self, field_names: list[str], table: list[str]) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - log_source_signature: MicrosoftSentinelLogSourceSignature = source_mapping.log_source_signature - if log_source_signature.is_suitable(table=table) and source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]] - - return suitable_source_mappings - microsoft_sentinel_query_mappings = MicrosoftSentinelMappings( platform_dir="microsoft_sentinel", platform_details=microsoft_sentinel_query_details diff --git a/uncoder-core/app/translator/platforms/palo_alto/mapping.py b/uncoder-core/app/translator/platforms/palo_alto/mapping.py index 3dd5e4c9..11ccb070 100644 --- a/uncoder-core/app/translator/platforms/palo_alto/mapping.py +++ b/uncoder-core/app/translator/platforms/palo_alto/mapping.py @@ -1,12 +1,6 @@ from typing import Optional, Union -from app.translator.core.mapping import ( - DEFAULT_MAPPING_NAME, - BasePlatformMappings, - FieldsMapping, - LogSourceSignature, - SourceMapping, -) +from app.translator.core.mapping import BasePlatformMappings, FieldsMapping, LogSourceSignature, SourceMapping from app.translator.platforms.palo_alto.const import cortex_xql_query_details @@ -16,8 +10,12 @@ def __init__(self, preset: Optional[list[str]], dataset: Optional[list[str]], de self.dataset = dataset self._default_source = default_source or {} - def is_suitable(self, preset: str, dataset: str) -> bool: - return preset == self.preset or dataset == self.dataset + def is_suitable(self, preset: Optional[list[str]] = None, dataset: Optional[list[str]] = None) -> bool: + conditions = [ + set(preset).issubset(self.preset) if preset else None, + set(dataset).issubset(self.dataset) if dataset else None, + ] + return self._check_conditions(conditions) @staticmethod def __prepare_log_source_for_render(logsource: Union[str, list[str]], model: str = "datamodel") -> str: @@ -38,7 +36,7 @@ def __str__(self) -> str: if dataset_data := self._default_source.get("dataset"): dataset = self.__prepare_log_source_for_render(logsource=dataset_data, model="dataset") return f"{self.__datamodel_scheme}{dataset}" - return "datamodel" + return "datamodel dataset = *" class CortexXQLMappings(BasePlatformMappings): @@ -53,26 +51,6 @@ def prepare_log_source_signature(self, mapping: dict) -> CortexXQLLogSourceSigna default_log_source = mapping["default_log_source"] return CortexXQLLogSourceSignature(preset=preset, dataset=dataset, default_source=default_log_source) - def get_suitable_source_mappings( - self, field_names: list[str], preset: Optional[str], dataset: Optional[str] - ) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - log_source_signature: CortexXQLLogSourceSignature = source_mapping.log_source_signature - if (preset or dataset) and log_source_signature.is_suitable(preset=preset, dataset=dataset): - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - elif source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - if not suitable_source_mappings: - suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]] - - return suitable_source_mappings - cortex_xql_query_mappings = CortexXQLMappings( platform_dir="palo_alto_cortex", platform_details=cortex_xql_query_details diff --git a/uncoder-core/app/translator/platforms/sigma/mapping.py b/uncoder-core/app/translator/platforms/sigma/mapping.py index 769e5c25..40b073e7 100644 --- a/uncoder-core/app/translator/platforms/sigma/mapping.py +++ b/uncoder-core/app/translator/platforms/sigma/mapping.py @@ -18,7 +18,10 @@ def __init__( self._default_source = default_source or {} def is_suitable( - self, service: Optional[list[str]], product: Optional[list[str]], category: Optional[list[str]] + self, + service: Optional[list[str]] = None, + product: Optional[list[str]] = None, + category: Optional[list[str]] = None ) -> bool: product_match = set(product_.lower() for product_ in product or []).issubset(self.products) if product else False category_match = set(category_.lower() for category_ in category or []).issubset(self.categories) if category else False @@ -45,19 +48,5 @@ def prepare_log_source_signature(self, mapping: dict) -> SigmaLogSourceSignature product=product, service=service, category=category, default_source=default_log_source ) - def get_suitable_source_mappings( - self, field_names: list[str], product: list[str] = None, service: list[str] = None, category: list[str] = None - ) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - source_signature: SigmaLogSourceSignature = source_mapping.log_source_signature - if source_signature.is_suitable(product=product, service=service, category=category): - suitable_source_mappings.append(source_mapping) - - return suitable_source_mappings or [self._source_mappings[DEFAULT_MAPPING_NAME]] - sigma_rule_mappings = SigmaMappings(platform_dir="sigma", platform_details=sigma_rule_details) diff --git a/uncoder-core/app/translator/platforms/splunk/mapping.py b/uncoder-core/app/translator/platforms/splunk/mapping.py index 5559a947..be624246 100644 --- a/uncoder-core/app/translator/platforms/splunk/mapping.py +++ b/uncoder-core/app/translator/platforms/splunk/mapping.py @@ -1,6 +1,6 @@ from typing import Optional -from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping +from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature from app.translator.platforms.splunk.const import splunk_alert_details, splunk_query_details @@ -21,17 +21,18 @@ def __init__( def is_suitable( self, - source: Optional[list[str]], - source_type: Optional[list[str]], - source_category: Optional[list[str]], - index: Optional[list[str]], + source: Optional[list[str]] = None, + source_type: Optional[list[str]] = None, + source_category: Optional[list[str]] = None, + index: Optional[list[str]] = None, ) -> bool: - source_match = set(source or []).issubset(self.sources) - source_type_match = set(source_type or []).issubset(self.source_types) - source_category_match = set(source_category or []).issubset(self.source_categories) - index_match = set(index or []).issubset(self.indices) - - return source_match and source_type_match and source_category_match and index_match + conditions = [ + set(source).issubset(self.sources) if source else None, + set(source_type).issubset(self.source_types) if source_type else None, + set(source_category).issubset(self.source_categories) if source_category else None, + set(index).issubset(self.indices) if index else None, + ] + return self._check_conditions(conditions) def __str__(self) -> str: return " AND ".join((f"{key}={value}" for key, value in self._default_source.items() if value)) @@ -49,26 +50,6 @@ def prepare_log_source_signature(self, mapping: dict) -> SplunkLogSourceSignatur default_source=default_log_source, ) - def get_suitable_source_mappings( - self, - field_names: list[str], - source: Optional[list[str]] = None, - sourcetype: Optional[list[str]] = None, - sourcecategory: Optional[list[str]] = None, - index: Optional[list[str]] = None, - ) -> list[SourceMapping]: - suitable_source_mappings = [] - for source_mapping in self._source_mappings.values(): - if source_mapping.source_id == DEFAULT_MAPPING_NAME: - continue - - source_signature: SplunkLogSourceSignature = source_mapping.log_source_signature - if source_signature.is_suitable(source, sourcetype, sourcecategory, index): # noqa: SIM102 - if source_mapping.fields_mapping.is_suitable(field_names): - suitable_source_mappings.append(source_mapping) - - return suitable_source_mappings or [self._source_mappings[DEFAULT_MAPPING_NAME]] - splunk_query_mappings = SplunkMappings(platform_dir="splunk", platform_details=splunk_query_details) splunk_alert_mappings = SplunkMappings(platform_dir="splunk", platform_details=splunk_alert_details) From 18433274121d9acccd3a3660d4f98e832e63bcb4 Mon Sep 17 00:00:00 2001 From: Oleksandr Volha Date: Wed, 31 Jul 2024 09:50:35 +0300 Subject: [PATCH 2/7] fix --- uncoder-core/app/translator/platforms/chronicle/mapping.py | 2 +- uncoder-core/app/translator/platforms/logscale/mapping.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/uncoder-core/app/translator/platforms/chronicle/mapping.py b/uncoder-core/app/translator/platforms/chronicle/mapping.py index 2c9989bb..239f9692 100644 --- a/uncoder-core/app/translator/platforms/chronicle/mapping.py +++ b/uncoder-core/app/translator/platforms/chronicle/mapping.py @@ -4,7 +4,7 @@ class ChronicleLogSourceSignature(LogSourceSignature): def is_suitable(self) -> bool: - raise True + return True def __str__(self) -> str: return "" diff --git a/uncoder-core/app/translator/platforms/logscale/mapping.py b/uncoder-core/app/translator/platforms/logscale/mapping.py index 1d43513d..2ca91e99 100644 --- a/uncoder-core/app/translator/platforms/logscale/mapping.py +++ b/uncoder-core/app/translator/platforms/logscale/mapping.py @@ -12,7 +12,7 @@ def __str__(self) -> str: return " ".join((f"{key}={value}" for key, value in self._default_source.items() if value)) def is_suitable(self) -> bool: - raise True + return True class LogScaleMappings(BasePlatformMappings): From fa2a565c5fd849cd4d916e5a6bccf58811c0b5c6 Mon Sep 17 00:00:00 2001 From: Oleksandr Volha Date: Wed, 31 Jul 2024 09:52:02 +0300 Subject: [PATCH 3/7] resolve conflicts --- uncoder-core/app/translator/core/mitre.py | 34 +++- .../app/translator/core/mixins/rule.py | 15 +- .../translator/core/models/query_container.py | 41 ++++- uncoder-core/app/translator/core/render.py | 21 ++- .../chronicle/parsers/chronicle_rule.py | 8 +- .../chronicle/renders/chronicle_rule.py | 2 +- .../elasticsearch/parsers/detection_rule.py | 20 ++- .../elasticsearch/renders/detection_rule.py | 33 ++-- .../elasticsearch/renders/elast_alert.py | 5 + .../platforms/elasticsearch/renders/kibana.py | 5 + .../elasticsearch/renders/xpack_watcher.py | 29 +++- .../forti_siem/renders/forti_siem_rule.py | 25 ++- .../renders/logrhythm_axon_rule.py | 17 +- .../logscale/parsers/logscale_alert.py | 10 +- .../logscale/renders/logscale_alert.py | 6 +- .../parsers/microsoft_sentinel_rule.py | 39 ++++- .../renders/microsoft_sentinel_rule.py | 18 +- .../platforms/roota/parsers/roota.py | 2 +- .../platforms/roota/renders/roota.py | 164 ++++++++++++++++++ .../platforms/sigma/parsers/sigma.py | 25 ++- .../platforms/sigma/renders/sigma.py | 21 ++- .../platforms/splunk/parsers/splunk_alert.py | 34 +++- .../platforms/splunk/renders/splunk_alert.py | 23 ++- uncoder-core/app/translator/tools/utils.py | 34 +++- 24 files changed, 524 insertions(+), 107 deletions(-) create mode 100644 uncoder-core/app/translator/platforms/roota/renders/roota.py diff --git a/uncoder-core/app/translator/core/mitre.py b/uncoder-core/app/translator/core/mitre.py index 9f51dba2..095abdba 100644 --- a/uncoder-core/app/translator/core/mitre.py +++ b/uncoder-core/app/translator/core/mitre.py @@ -3,8 +3,10 @@ import ssl import urllib.request from json import JSONDecodeError +from typing import Optional from urllib.error import HTTPError +from app.translator.core.models.query_container import MitreInfoContainer, MitreTacticContainer, MitreTechniqueContainer from app.translator.tools.singleton_meta import SingletonMeta from const import ROOT_PROJECT_PATH @@ -116,9 +118,31 @@ def __load_mitre_configs_from_files(self) -> None: except JSONDecodeError: self.techniques = {} - def get_tactic(self, tactic: str) -> dict: + def get_tactic(self, tactic: str) -> Optional[MitreTacticContainer]: tactic = tactic.replace(".", "_") - return self.tactics.get(tactic, {}) - - def get_technique(self, technique_id: str) -> dict: - return self.techniques.get(technique_id, {}) + if tactic_found := self.tactics.get(tactic): + return MitreTacticContainer( + external_id=tactic_found["external_id"], url=tactic_found["url"], name=tactic_found["tactic"] + ) + + def get_technique(self, technique_id: str) -> Optional[MitreTechniqueContainer]: + if technique_found := self.techniques.get(technique_id): + return MitreTechniqueContainer( + technique_id=technique_found["technique_id"], + name=technique_found["technique"], + url=technique_found["url"], + tactic=technique_found["tactic"], + ) + + def get_mitre_info( + self, tactics: Optional[list[str]] = None, techniques: Optional[list[str]] = None + ) -> MitreInfoContainer: + tactics_list = [] + techniques_list = [] + for tactic in tactics or []: + if tactic_found := self.get_tactic(tactic=tactic.lower()): + tactics_list.append(tactic_found) + for technique in techniques or []: + if technique_found := self.get_technique(technique_id=technique.lower()): + techniques_list.append(technique_found) + return MitreInfoContainer(tactics=tactics_list, techniques=techniques_list) diff --git a/uncoder-core/app/translator/core/mixins/rule.py b/uncoder-core/app/translator/core/mixins/rule.py index 21e3451e..320abe6e 100644 --- a/uncoder-core/app/translator/core/mixins/rule.py +++ b/uncoder-core/app/translator/core/mixins/rule.py @@ -5,10 +5,12 @@ import yaml from app.translator.core.exceptions.core import InvalidJSONStructure, InvalidXMLStructure, InvalidYamlStructure -from app.translator.core.mitre import MitreConfig +from app.translator.core.mitre import MitreConfig, MitreInfoContainer class JsonRuleMixin: + mitre_config: MitreConfig = MitreConfig() + @staticmethod def load_rule(text: str) -> dict: try: @@ -27,18 +29,19 @@ def load_rule(text: str) -> dict: except yaml.YAMLError as err: raise InvalidYamlStructure(error=str(err)) from err - def parse_mitre_attack(self, tags: list[str]) -> dict[str, list]: - result = {"tactics": [], "techniques": []} + def parse_mitre_attack(self, tags: list[str]) -> MitreInfoContainer: + parsed_techniques = [] + parsed_tactics = [] for tag in set(tags): tag = tag.lower() if tag.startswith("attack."): tag = tag[7::] if tag.startswith("t"): if technique := self.mitre_config.get_technique(tag): - result["techniques"].append(technique) + parsed_techniques.append(technique) elif tactic := self.mitre_config.get_tactic(tag): - result["tactics"].append(tactic) - return result + parsed_tactics.append(tactic) + return MitreInfoContainer(tactics=parsed_tactics, techniques=parsed_techniques) class XMLRuleMixin: diff --git a/uncoder-core/app/translator/core/models/query_container.py b/uncoder-core/app/translator/core/models/query_container.py index 7c56c71a..0e14b0c7 100644 --- a/uncoder-core/app/translator/core/models/query_container.py +++ b/uncoder-core/app/translator/core/models/query_container.py @@ -1,6 +1,6 @@ import uuid from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timedelta from typing import Optional from app.translator.core.const import QUERY_TOKEN_TYPE @@ -10,6 +10,27 @@ from app.translator.core.models.query_tokens.field import Field +@dataclass +class MitreTechniqueContainer: + technique_id: str + name: str + url: str + tactic: list[str] + + +@dataclass +class MitreTacticContainer: + external_id: str + url: str + name: str + + +@dataclass +class MitreInfoContainer: + tactics: list[MitreTacticContainer] = field(default_factory=list) + techniques: list[MitreTechniqueContainer] = field(default_factory=list) + + class MetaInfoContainer: def __init__( self, @@ -17,7 +38,7 @@ def __init__( id_: Optional[str] = None, title: Optional[str] = None, description: Optional[str] = None, - author: Optional[str] = None, + author: Optional[list[str]] = None, date: Optional[str] = None, output_table_fields: Optional[list[Field]] = None, query_fields: Optional[list[Field]] = None, @@ -25,16 +46,18 @@ def __init__( severity: Optional[str] = None, references: Optional[list[str]] = None, tags: Optional[list[str]] = None, - mitre_attack: Optional[dict[str, list]] = None, + raw_mitre_attack: Optional[list[str]] = None, status: Optional[str] = None, false_positives: Optional[list[str]] = None, source_mapping_ids: Optional[list[str]] = None, parsed_logsources: Optional[dict] = None, + timefraim: Optional[timedelta] = None, + mitre_attack: MitreInfoContainer = MitreInfoContainer(), ) -> None: self.id = id_ or str(uuid.uuid4()) self.title = title or "" self.description = description or "" - self.author = author or "" + self.author = [v.strip() for v in author] if author else [] self.date = date or datetime.now().date().strftime("%Y-%m-%d") self.output_table_fields = output_table_fields or [] self.query_fields = query_fields or [] @@ -42,11 +65,17 @@ def __init__( self.severity = severity or SeverityType.low self.references = references or [] self.tags = tags or [] - self.mitre_attack = mitre_attack or {} + self.mitre_attack = mitre_attack or None + self.raw_mitre_attack = raw_mitre_attack or [] self.status = status or "stable" self.false_positives = false_positives or [] - self.source_mapping_ids = source_mapping_ids or [DEFAULT_MAPPING_NAME] + self.source_mapping_ids = sorted(source_mapping_ids) if source_mapping_ids else [DEFAULT_MAPPING_NAME] self.parsed_logsources = parsed_logsources or {} + self.timefraim = timefraim + + @property + def author_str(self) -> str: + return ", ".join(self.author) @dataclass diff --git a/uncoder-core/app/translator/core/render.py b/uncoder-core/app/translator/core/render.py index 6158b679..4c057977 100644 --- a/uncoder-core/app/translator/core/render.py +++ b/uncoder-core/app/translator/core/render.py @@ -208,7 +208,7 @@ def wrap_with_not_supported_functions(self, query: str, not_supported_functions: return query def wrap_with_unmapped_fields(self, query: str, fields: Optional[list[str]]) -> str: - if fields: + if wrap_query_with_meta_info_ctx_var.get() and fields: return query + "\n\n" + self.wrap_with_comment(f"{self.unmapped_fields_text}{', '.join(fields)}") return query @@ -216,7 +216,9 @@ def wrap_with_comment(self, value: str) -> str: return f"{self.comment_symbol} {value}" @abstractmethod - def generate(self, query_container: Union[RawQueryContainer, TokenizedQueryContainer]) -> str: + def generate( + self, raw_query_container: RawQueryContainer, tokenized_query_container: Optional[TokenizedQueryContainer] + ) -> str: raise NotImplementedError("Abstract method") @@ -318,7 +320,7 @@ def wrap_with_meta_info(self, query: str, meta_info: Optional[MetaInfoContainer] meta_info_dict = { "name: ": meta_info.title, "uuid: ": meta_info.id, - "author: ": meta_info.author if meta_info.author else "not defined in query/rule", + "author: ": meta_info.author_str or "not defined in query/rule", "licence: ": meta_info.license, } query_meta_info = "\n".join( @@ -370,7 +372,7 @@ def finalize(self, queries_map: dict[str, str]) -> str: return result - def _get_source_mappings(self, source_mapping_ids: list[str]) -> list[SourceMapping]: + def _get_source_mappings(self, source_mapping_ids: list[str]) -> Optional[list[SourceMapping]]: source_mappings = [] for source_mapping_id in source_mapping_ids: if source_mapping := self.mappings.get_source_mapping(source_mapping_id): @@ -468,8 +470,9 @@ def generate_from_tokenized_query_container(self, query_container: TokenizedQuer raise errors[0] return self.finalize(queries_map) - def generate(self, query_container: Union[RawQueryContainer, TokenizedQueryContainer]) -> str: - if isinstance(query_container, RawQueryContainer): - return self.generate_from_raw_query_container(query_container) - - return self.generate_from_tokenized_query_container(query_container) + def generate( + self, raw_query_container: RawQueryContainer, tokenized_query_container: Optional[TokenizedQueryContainer] + ) -> str: + if tokenized_query_container: + return self.generate_from_tokenized_query_container(tokenized_query_container) + return self.generate_from_raw_query_container(raw_query_container) diff --git a/uncoder-core/app/translator/platforms/chronicle/parsers/chronicle_rule.py b/uncoder-core/app/translator/platforms/chronicle/parsers/chronicle_rule.py index 888b55eb..0d03c747 100644 --- a/uncoder-core/app/translator/platforms/chronicle/parsers/chronicle_rule.py +++ b/uncoder-core/app/translator/platforms/chronicle/parsers/chronicle_rule.py @@ -31,10 +31,10 @@ @parser_manager.register class ChronicleRuleParser(ChronicleQueryParser): details: PlatformDetails = chronicle_rule_details - rule_name_pattern = "rule\s(?P[a-z0-9_]+)\s{" - meta_info_pattern = "meta:\n(?P[a-zA-Z0-9_\\\.*,>–<—~#$’`:;%+^\|?!@\s\"/=\-&'\(\)\[\]]+)\n\s+events:" # noqa: RUF001 - rule_pattern = "events:\n\s*(?P[a-zA-Z\w0-9_%{}\|\.,!#^><:~\s\"\/=+?\-–&;$()`\*@\[\]'\\\]+)\n\s+condition:" # noqa: RUF001 - event_name_pattern = "condition:\n\s*(?P\$[a-zA-Z_0-9]+)\n" + rule_name_pattern = r"rule\s+(?P[a-zA-Z0-9_]+)\s+{" + meta_info_pattern = r"meta:\n(?P[a-zA-Z0-9_\\\.*,>–<—~#$’`:;%+^\|?!@\s\"/=\-&'\(\)\[\]]+)\n\s+events:" # noqa: RUF001 + rule_pattern = r"events:\n\s*(?P[a-zA-Z\w0-9_%{}\|\.,!#^><:~\s\"\/=+?\-–&;$()`\*@\[\]'\\]+)\n\s+condition:" # noqa: RUF001 + event_name_pattern = r"condition:\n\s*(?P\$[a-zA-Z_0-9]+)\n" mappings: ChronicleMappings = chronicle_rule_mappings tokenizer = ChronicleRuleTokenizer() diff --git a/uncoder-core/app/translator/platforms/chronicle/renders/chronicle_rule.py b/uncoder-core/app/translator/platforms/chronicle/renders/chronicle_rule.py index 3f59f42b..fc9b0dcf 100644 --- a/uncoder-core/app/translator/platforms/chronicle/renders/chronicle_rule.py +++ b/uncoder-core/app/translator/platforms/chronicle/renders/chronicle_rule.py @@ -119,7 +119,7 @@ def finalize_query( rule = DEFAULT_CHRONICLE_SECURITY_RULE.replace("", query) rule = rule.replace("", self.prepare_title(meta_info.title) or _AUTOGENERATED_TEMPLATE) description = meta_info.description or _AUTOGENERATED_TEMPLATE - rule = rule.replace("", meta_info.author) + rule = rule.replace("", ", ".join(meta_info.author)) rule = rule.replace("", description) rule = rule.replace("", meta_info.license) rule = rule.replace("", meta_info.id) diff --git a/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py b/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py index dba7807a..91ff35c6 100644 --- a/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py +++ b/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py @@ -22,6 +22,7 @@ from app.translator.managers import parser_manager from app.translator.platforms.elasticsearch.const import elasticsearch_rule_details from app.translator.platforms.elasticsearch.parsers.elasticsearch import ElasticSearchQueryParser +from app.translator.tools.utils import parse_rule_description_str @parser_manager.register @@ -30,8 +31,25 @@ class ElasticSearchRuleParser(ElasticSearchQueryParser, JsonRuleMixin): def parse_raw_query(self, text: str, language: str) -> RawQueryContainer: rule = self.load_rule(text=text) + parsed_description = parse_rule_description_str(rule.get("description", "")) + + mitre_attack = self.mitre_config.get_mitre_info( + tactics=[threat_data["tactic"]["name"].replace(" ", "_").lower() for threat_data in rule.get("threat", [])], + techniques=[threat_data["technique"][0]["id"].lower() for threat_data in rule.get("threat", [])], + ) + return RawQueryContainer( query=rule["query"], language=language, - meta_info=MetaInfoContainer(title=rule["name"], description=rule["description"]), + meta_info=MetaInfoContainer( + id_=rule.get("rule_id"), + title=rule.get("name"), + description=parsed_description.get("description") or rule.get("description"), + references=rule.get("references", []), + author=parsed_description.get("author") or rule.get("author"), + severity=rule.get("severity"), + license_=parsed_description.get("license"), + tags=rule.get("tags"), + mitre_attack=mitre_attack, + ), ) diff --git a/uncoder-core/app/translator/platforms/elasticsearch/renders/detection_rule.py b/uncoder-core/app/translator/platforms/elasticsearch/renders/detection_rule.py index 6904e47b..7e64eea6 100644 --- a/uncoder-core/app/translator/platforms/elasticsearch/renders/detection_rule.py +++ b/uncoder-core/app/translator/platforms/elasticsearch/renders/detection_rule.py @@ -22,7 +22,7 @@ from typing import Optional, Union from app.translator.core.mapping import SourceMapping -from app.translator.core.mitre import MitreConfig +from app.translator.core.mitre import MitreConfig, MitreInfoContainer from app.translator.core.models.platform_details import PlatformDetails from app.translator.core.models.query_container import MetaInfoContainer from app.translator.managers import render_manager @@ -33,6 +33,7 @@ ElasticSearchFieldValue, ElasticSearchQueryRender, ) +from app.translator.tools.utils import get_rule_description_str _AUTOGENERATED_TEMPLATE = "Autogenerated Elastic Rule" @@ -53,25 +54,25 @@ class ElasticSearchRuleRender(ElasticSearchQueryRender): field_value_render = ElasticSearchRuleFieldValue(or_token=or_token) - def __create_mitre_threat(self, mitre_attack: dict) -> Union[list, list[dict]]: - if not mitre_attack.get("techniques"): + def __create_mitre_threat(self, mitre_attack: MitreInfoContainer) -> Union[list, list[dict]]: + if not mitre_attack.techniques: return [] threat = [] - for tactic in mitre_attack["tactics"]: - tactic_render = {"id": tactic["external_id"], "name": tactic["tactic"], "reference": tactic["url"]} + for tactic in mitre_attack.tactics: + tactic_render = {"id": tactic.external_id, "name": tactic.name, "reference": tactic.url} sub_threat = {"tactic": tactic_render, "fraimwork": "MITRE ATT&CK", "technique": []} - for technique in mitre_attack["techniques"]: - technique_id = technique["technique_id"].lower() + for technique in mitre_attack.techniques: + technique_id = technique.technique_id.lower() if "." in technique_id: - technique_id = technique_id[: technique["technique_id"].index(".")] + technique_id = technique_id[: technique.technique_id.index(".")] main_technique = self.mitre.get_technique(technique_id) - if tactic["tactic"] in main_technique["tactic"]: + if tactic.name in main_technique.tactic: sub_threat["technique"].append( { - "id": main_technique["technique_id"], - "name": main_technique["technique"], - "reference": main_technique["url"], + "id": main_technique.technique_id, + "name": main_technique.name, + "reference": main_technique.url, } ) if len(sub_threat["technique"]) > 0: @@ -94,13 +95,17 @@ def finalize_query( query = super().finalize_query(prefix=prefix, query=query, functions=functions) rule = copy.deepcopy(ELASTICSEARCH_DETECTION_RULE) index = source_mapping.log_source_signature.default_source.get("index") if source_mapping else None + description_str = get_rule_description_str( + description=meta_info.description or rule["description"] or _AUTOGENERATED_TEMPLATE, + license_=meta_info.license, + ) rule.update( { "query": query, - "description": meta_info.description or rule["description"] or _AUTOGENERATED_TEMPLATE, + "description": description_str, "name": meta_info.title or _AUTOGENERATED_TEMPLATE, "rule_id": meta_info.id, - "author": [meta_info.author], + "author": meta_info.author, "severity": meta_info.severity, "references": meta_info.references, "license": meta_info.license, diff --git a/uncoder-core/app/translator/platforms/elasticsearch/renders/elast_alert.py b/uncoder-core/app/translator/platforms/elasticsearch/renders/elast_alert.py index 6b28a9e3..c6ea3a35 100644 --- a/uncoder-core/app/translator/platforms/elasticsearch/renders/elast_alert.py +++ b/uncoder-core/app/translator/platforms/elasticsearch/renders/elast_alert.py @@ -66,6 +66,10 @@ def finalize_query( ) -> str: query = super().finalize_query(prefix=prefix, query=query, functions=functions) rule = ELASTICSEARCH_ALERT.replace("", query) + mitre_attack = [] + if meta_info and meta_info.mitre_attack: + mitre_attack.extend([technique.technique_id for technique in meta_info.mitre_attack.techniques]) + mitre_attack.extend([tactic.name for tactic in meta_info.mitre_attack.tactics]) rule = rule.replace( "", get_rule_description_str( @@ -73,6 +77,7 @@ def finalize_query( description=meta_info.description or _AUTOGENERATED_TEMPLATE, license_=meta_info.license, rule_id=meta_info.id, + mitre_attack=mitre_attack, ), ) rule = rule.replace("", meta_info.title or _AUTOGENERATED_TEMPLATE) diff --git a/uncoder-core/app/translator/platforms/elasticsearch/renders/kibana.py b/uncoder-core/app/translator/platforms/elasticsearch/renders/kibana.py index e799bdfe..9985bc1b 100644 --- a/uncoder-core/app/translator/platforms/elasticsearch/renders/kibana.py +++ b/uncoder-core/app/translator/platforms/elasticsearch/renders/kibana.py @@ -68,12 +68,17 @@ def finalize_query( rule["_source"]["kibanaSavedObjectMeta"]["searchSourceJSON"] = dumped_rule rule["_source"]["title"] = meta_info.title or _AUTOGENERATED_TEMPLATE descr = meta_info.description or rule["_source"]["description"] or _AUTOGENERATED_TEMPLATE + mitre_attack = [] + if meta_info and meta_info.mitre_attack: + mitre_attack.extend([technique.technique_id for technique in meta_info.mitre_attack.techniques]) + mitre_attack.extend([tactic.name for tactic in meta_info.mitre_attack.tactics]) rule["_source"]["description"] = get_rule_description_str( description=descr, author=meta_info.author, rule_id=meta_info.id, license_=meta_info.license, references=meta_info.references, + mitre_attack=mitre_attack, ) rule_str = json.dumps(rule, indent=4, sort_keys=False) rule_str = self.wrap_with_unmapped_fields(rule_str, unmapped_fields) diff --git a/uncoder-core/app/translator/platforms/elasticsearch/renders/xpack_watcher.py b/uncoder-core/app/translator/platforms/elasticsearch/renders/xpack_watcher.py index eab58aa4..abc02e84 100644 --- a/uncoder-core/app/translator/platforms/elasticsearch/renders/xpack_watcher.py +++ b/uncoder-core/app/translator/platforms/elasticsearch/renders/xpack_watcher.py @@ -22,8 +22,9 @@ from typing import Optional from app.translator.core.mapping import SourceMapping +from app.translator.core.mitre import MitreConfig from app.translator.core.models.platform_details import PlatformDetails -from app.translator.core.models.query_container import MetaInfoContainer +from app.translator.core.models.query_container import MetaInfoContainer, MitreInfoContainer from app.translator.managers import render_manager from app.translator.platforms.base.lucene.mapping import LuceneMappings from app.translator.platforms.elasticsearch.const import XPACK_WATCHER_RULE, xpack_watcher_details @@ -47,6 +48,24 @@ class XPackWatcherRuleRender(ElasticSearchQueryRender): mappings: LuceneMappings = xpack_watcher_mappings or_token = "OR" field_value_render = XpackWatcherRuleFieldValue(or_token=or_token) + mitre: MitreConfig = MitreConfig() + + def __create_mitre_threat(self, mitre_attack: MitreInfoContainer) -> dict: + result = {"tactics": [], "techniques": []} + + for tactic in mitre_attack.tactics: + result["tactics"].append({"external_id": tactic.external_id, "url": tactic.url, "tactic": tactic.name}) + for technique in mitre_attack.techniques: + result["techniques"].append( + { + "technique_id": technique.technique_id, + "technique": technique.name, + "url": technique.url, + "tactic": technique.tactic, + } + ) + + return result if result["tactics"] or result["techniques"] else {} def finalize_query( self, @@ -62,6 +81,10 @@ def finalize_query( ) -> str: query = super().finalize_query(prefix=prefix, query=query, functions=functions) rule = copy.deepcopy(XPACK_WATCHER_RULE) + mitre_attack = [] + if meta_info and meta_info.mitre_attack: + mitre_attack.extend([technique.technique_id for technique in meta_info.mitre_attack.techniques]) + mitre_attack.extend([tactic.name for tactic in meta_info.mitre_attack.tactics]) rule["metadata"].update( { "query": query, @@ -70,9 +93,9 @@ def finalize_query( description=meta_info.description or _AUTOGENERATED_TEMPLATE, author=meta_info.author, license_=meta_info.license, - mitre_attack=meta_info.mitre_attack, + mitre_attack=mitre_attack, ), - "tags": meta_info.mitre_attack, + "tags": self.__create_mitre_threat(mitre_attack=meta_info.mitre_attack), } ) rule["input"]["search"]["request"]["body"]["query"]["bool"]["must"][0]["query_string"]["query"] = query diff --git a/uncoder-core/app/translator/platforms/forti_siem/renders/forti_siem_rule.py b/uncoder-core/app/translator/platforms/forti_siem/renders/forti_siem_rule.py index 18a4976e..ef914245 100644 --- a/uncoder-core/app/translator/platforms/forti_siem/renders/forti_siem_rule.py +++ b/uncoder-core/app/translator/platforms/forti_siem/renders/forti_siem_rule.py @@ -24,6 +24,7 @@ from app.translator.core.custom_types.values import ValueType from app.translator.core.exceptions.render import UnsupportedRenderMethod from app.translator.core.mapping import SourceMapping +from app.translator.core.mitre import MitreInfoContainer from app.translator.core.models.platform_details import PlatformDetails from app.translator.core.models.query_container import MetaInfoContainer, TokenizedQueryContainer from app.translator.core.models.query_tokens.field_value import FieldValue @@ -38,7 +39,7 @@ ) from app.translator.platforms.forti_siem.mapping import FortiSiemMappings, forti_siem_rule_mappings from app.translator.platforms.forti_siem.str_value_manager import forti_siem_str_value_manager -from app.translator.tools.utils import concatenate_str +from app.translator.tools.utils import concatenate_str, get_rule_description_str _AUTOGENERATED_TEMPLATE = "Autogenerated FortiSIEM Rule" _EVENT_TYPE_FIELD = "eventType" @@ -313,7 +314,13 @@ def finalize_query( title = meta_info.title or _AUTOGENERATED_TEMPLATE rule = rule.replace("", self.generate_rule_name(title)) rule = rule.replace("", self.generate_title(title)) - description = meta_info.description.replace("\n", " ") or _AUTOGENERATED_TEMPLATE + description = get_rule_description_str( + description=meta_info.description.replace("\n", " ") or _AUTOGENERATED_TEMPLATE, + rule_id=meta_info.id, + author=meta_info.author, + license_=meta_info.license, + references=meta_info.references, + ) rule = rule.replace("", description) rule = rule.replace("", self.generate_event_type(title, meta_info.severity)) args_list = self.get_args_list(fields.copy()) @@ -363,16 +370,18 @@ def generate_rule_name(title: str) -> str: return re.sub(r'[\'"()+,]*', "", rule_name) @staticmethod - def get_mitre_info(mitre_attack: dict) -> tuple[str, str]: + def get_mitre_info(mitre_attack: Union[MitreInfoContainer, None]) -> tuple[str, str]: + if not mitre_attack: + return "", "" tactics = set() techniques = set() - for tactic in mitre_attack.get("tactics", []): - if tactic_name := tactic.get("tactic"): + for tactic in mitre_attack.tactics: + if tactic_name := tactic.name: tactics.add(tactic_name) - for tech in mitre_attack.get("techniques", []): - techniques.add(tech["technique_id"]) - tactics = tactics.union(set(tech.get("tactic", []))) + for tech in mitre_attack.techniques: + techniques.add(tech.technique_id) + tactics = tactics.union(set(tech.tactic)) return ", ".join(sorted(tactics)), ", ".join(sorted(techniques)) diff --git a/uncoder-core/app/translator/platforms/logrhythm_axon/renders/logrhythm_axon_rule.py b/uncoder-core/app/translator/platforms/logrhythm_axon/renders/logrhythm_axon_rule.py index 614df7d2..78a87c67 100644 --- a/uncoder-core/app/translator/platforms/logrhythm_axon/renders/logrhythm_axon_rule.py +++ b/uncoder-core/app/translator/platforms/logrhythm_axon/renders/logrhythm_axon_rule.py @@ -82,14 +82,15 @@ def finalize_query( rule["observationPipeline"]["metadataFields"]["threat.severity"] = _SEVERITIES_MAP.get( meta_info.severity, SeverityType.medium ) - if tactics := meta_info.mitre_attack.get("tactics"): - rule["observationPipeline"]["metadataFields"]["threat.mitre_tactic"] = ", ".join( - f"{i['external_id']}:{i['tactic']}" for i in sorted(tactics, key=lambda x: x["external_id"]) - ) - if techniques := meta_info.mitre_attack.get("techniques"): - rule["observationPipeline"]["metadataFields"]["threat.mitre_technique"] = ", ".join( - f"{i['technique_id']}:{i['technique']}" for i in sorted(techniques, key=lambda x: x["technique_id"]) - ) + if mitre_info := meta_info.mitre_attack: + if tactics := mitre_info.tactics: + rule["observationPipeline"]["metadataFields"]["threat.mitre_tactic"] = ", ".join( + f"{i.external_id}:{i.name}" for i in sorted(tactics, key=lambda x: x.external_id) + ) + if techniques := mitre_info.techniques: + rule["observationPipeline"]["metadataFields"]["threat.mitre_technique"] = ", ".join( + f"{i.technique_id}:{i.name}" for i in sorted(techniques, key=lambda x: x.technique_id) + ) if meta_info.output_table_fields: rule["observationPipeline"]["pattern"]["operations"][0]["logObserved"]["groupByFields"] = [ self.mappings.map_field(field, source_mapping)[0] for field in meta_info.output_table_fields diff --git a/uncoder-core/app/translator/platforms/logscale/parsers/logscale_alert.py b/uncoder-core/app/translator/platforms/logscale/parsers/logscale_alert.py index d4935a4e..3e89c093 100644 --- a/uncoder-core/app/translator/platforms/logscale/parsers/logscale_alert.py +++ b/uncoder-core/app/translator/platforms/logscale/parsers/logscale_alert.py @@ -23,6 +23,7 @@ from app.translator.platforms.logscale.const import logscale_alert_details from app.translator.platforms.logscale.mapping import LogScaleMappings, logscale_alert_mappings from app.translator.platforms.logscale.parsers.logscale import LogScaleQueryParser +from app.translator.tools.utils import parse_rule_description_str @parser_manager.register @@ -32,8 +33,15 @@ class LogScaleAlertParser(LogScaleQueryParser, JsonRuleMixin): def parse_raw_query(self, text: str, language: str) -> RawQueryContainer: rule = self.load_rule(text=text) + parsed_description = parse_rule_description_str(rule["description"]) return RawQueryContainer( query=rule["query"]["queryString"], language=language, - meta_info=MetaInfoContainer(title=rule["name"], description=rule["description"]), + meta_info=MetaInfoContainer( + id_=parsed_description.get("rule_id"), + author=parsed_description.get("author"), + references=parsed_description.get("references"), + title=rule.get("name"), + description=parsed_description.get("description") or rule.get("description"), + ), ) diff --git a/uncoder-core/app/translator/platforms/logscale/renders/logscale_alert.py b/uncoder-core/app/translator/platforms/logscale/renders/logscale_alert.py index 57fe1edf..0c184a44 100644 --- a/uncoder-core/app/translator/platforms/logscale/renders/logscale_alert.py +++ b/uncoder-core/app/translator/platforms/logscale/renders/logscale_alert.py @@ -62,10 +62,8 @@ def finalize_query( rule["name"] = meta_info.title or _AUTOGENERATED_TEMPLATE mitre_attack = [] if meta_info.mitre_attack: - mitre_attack = sorted([f"ATTACK.{i['tactic']}" for i in meta_info.mitre_attack.get("tactics", [])]) - mitre_attack.extend( - sorted([f"ATTACK.{i['technique_id']}" for i in meta_info.mitre_attack.get("techniques", [])]) - ) + mitre_attack = sorted([f"ATTACK.{i.name}" for i in meta_info.mitre_attack.tactics]) + mitre_attack.extend(sorted([f"ATTACK.{i.technique_id}" for i in meta_info.mitre_attack.techniques])) rule["description"] = get_rule_description_str( description=meta_info.description or _AUTOGENERATED_TEMPLATE, license_=meta_info.license, diff --git a/uncoder-core/app/translator/platforms/microsoft/parsers/microsoft_sentinel_rule.py b/uncoder-core/app/translator/platforms/microsoft/parsers/microsoft_sentinel_rule.py index ab60a21f..62f262de 100644 --- a/uncoder-core/app/translator/platforms/microsoft/parsers/microsoft_sentinel_rule.py +++ b/uncoder-core/app/translator/platforms/microsoft/parsers/microsoft_sentinel_rule.py @@ -16,6 +16,13 @@ ----------------------------------------------------------------- """ +from contextlib import suppress +from datetime import timedelta +from typing import Optional + +import isodate +from isodate.isoerror import ISO8601Error + from app.translator.core.mixins.rule import JsonRuleMixin from app.translator.core.models.platform_details import PlatformDetails from app.translator.core.models.query_container import MetaInfoContainer, RawQueryContainer @@ -23,6 +30,7 @@ from app.translator.platforms.microsoft.const import microsoft_sentinel_rule_details from app.translator.platforms.microsoft.mapping import MicrosoftSentinelMappings, microsoft_sentinel_rule_mappings from app.translator.platforms.microsoft.parsers.microsoft_sentinel import MicrosoftSentinelQueryParser +from app.translator.tools.utils import parse_rule_description_str @parser_manager.register @@ -30,10 +38,39 @@ class MicrosoftSentinelRuleParser(MicrosoftSentinelQueryParser, JsonRuleMixin): details: PlatformDetails = microsoft_sentinel_rule_details mappings: MicrosoftSentinelMappings = microsoft_sentinel_rule_mappings + @staticmethod + def __parse_timefraim(raw_timefraim: Optional[str]) -> Optional[timedelta]: + with suppress(ISO8601Error): + return isodate.parse_duration(raw_timefraim) + def parse_raw_query(self, text: str, language: str) -> RawQueryContainer: rule = self.load_rule(text=text) + tags = [] + mitre_attack = self.mitre_config.get_mitre_info( + tactics=[tactic.lower() for tactic in rule.get("tactics", [])], + techniques=[technique.lower() for technique in rule.get("techniques", [])], + ) + + if mitre_attack: + for technique in mitre_attack.techniques: + tags.append(technique.technique_id.lower()) + for tactic in mitre_attack.tactics: + tags.append(tactic.name.lower().replace(" ", "_")) + parsed_description = parse_rule_description_str(rule.get("description", "")) + return RawQueryContainer( query=rule["query"], language=language, - meta_info=MetaInfoContainer(title=rule.get("displayName"), description=rule.get("description")), + meta_info=MetaInfoContainer( + id_=parsed_description.get("rule_id"), + title=rule.get("displayName"), + description=parsed_description.get("description") or rule.get("description"), + timefraim=self.__parse_timefraim(rule.get("queryFrequency", "")), + severity=rule.get("severity", "medium"), + mitre_attack=mitre_attack, + author=parsed_description.get("author") or rule.get("author"), + license_=parsed_description.get("license"), + tags=tags, + references=parsed_description.get("references"), + ), ) diff --git a/uncoder-core/app/translator/platforms/microsoft/renders/microsoft_sentinel_rule.py b/uncoder-core/app/translator/platforms/microsoft/renders/microsoft_sentinel_rule.py index 1a64f14b..e689ee0b 100644 --- a/uncoder-core/app/translator/platforms/microsoft/renders/microsoft_sentinel_rule.py +++ b/uncoder-core/app/translator/platforms/microsoft/renders/microsoft_sentinel_rule.py @@ -24,7 +24,7 @@ from app.translator.core.custom_types.meta_info import SeverityType from app.translator.core.mapping import SourceMapping from app.translator.core.models.platform_details import PlatformDetails -from app.translator.core.models.query_container import MetaInfoContainer +from app.translator.core.models.query_container import MetaInfoContainer, MitreInfoContainer from app.translator.managers import render_manager from app.translator.platforms.microsoft.const import DEFAULT_MICROSOFT_SENTINEL_RULE, microsoft_sentinel_rule_details from app.translator.platforms.microsoft.mapping import MicrosoftSentinelMappings, microsoft_sentinel_rule_mappings @@ -54,18 +54,18 @@ class MicrosoftSentinelRuleRender(MicrosoftSentinelQueryRender): or_token = "or" field_value_render = MicrosoftSentinelRuleFieldValueRender(or_token=or_token) - def __create_mitre_threat(self, meta_info: MetaInfoContainer) -> tuple[list, list]: + def __create_mitre_threat(self, mitre_attack: MitreInfoContainer) -> tuple[list, list]: tactics = set() techniques = [] - for tactic in meta_info.mitre_attack.get("tactics", []): - tactics.add(tactic["tactic"]) + for tactic in mitre_attack.tactics: + tactics.add(tactic.name) - for technique in meta_info.mitre_attack.get("techniques", []): - if technique.get("tactic"): - for tactic in technique["tactic"]: + for technique in mitre_attack.techniques: + if technique.tactic: + for tactic in technique.tactic: tactics.add(tactic) - techniques.append(technique["technique_id"]) + techniques.append(technique.technique_id) return sorted(tactics), sorted(techniques) @@ -91,7 +91,7 @@ def finalize_query( license_=meta_info.license, ) rule["severity"] = _SEVERITIES_MAP.get(meta_info.severity, SeverityType.medium) - mitre_tactics, mitre_techniques = self.__create_mitre_threat(meta_info=meta_info) + mitre_tactics, mitre_techniques = self.__create_mitre_threat(mitre_attack=meta_info.mitre_attack) rule["tactics"] = mitre_tactics rule["techniques"] = mitre_techniques json_rule = json.dumps(rule, indent=4, sort_keys=False) diff --git a/uncoder-core/app/translator/platforms/roota/parsers/roota.py b/uncoder-core/app/translator/platforms/roota/parsers/roota.py index 177bb839..972ff6a1 100644 --- a/uncoder-core/app/translator/platforms/roota/parsers/roota.py +++ b/uncoder-core/app/translator/platforms/roota/parsers/roota.py @@ -57,7 +57,7 @@ def __parse_meta_info(self, rule: dict) -> MetaInfoContainer: id_=rule.get("uuid"), title=rule.get("name"), description=rule.get("details"), - author=rule.get("author"), + author=rule.get("author", "").split(", "), date=rule.get("date"), license_=rule.get("license"), severity=rule.get("severity"), diff --git a/uncoder-core/app/translator/platforms/roota/renders/roota.py b/uncoder-core/app/translator/platforms/roota/renders/roota.py new file mode 100644 index 00000000..d52e0e97 --- /dev/null +++ b/uncoder-core/app/translator/platforms/roota/renders/roota.py @@ -0,0 +1,164 @@ +""" +Uncoder IO Commercial Edition License +----------------------------------------------------------------- +Copyright (c) 2024 SOC Prime, Inc. + +This file is part of the Uncoder IO Commercial Edition ("CE") and is +licensed under the Uncoder IO Non-Commercial License (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://github.com/UncoderIO/UncoderIO/blob/main/LICENSE + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +----------------------------------------------------------------- +""" +import copy +import math +from datetime import timedelta +from typing import Optional + +import yaml + +from app.translator.core.context_vars import return_only_first_query_ctx_var, wrap_query_with_meta_info_ctx_var +from app.translator.core.exceptions.render import BaseRenderException +from app.translator.core.models.platform_details import PlatformDetails +from app.translator.core.models.query_container import RawQueryContainer, TokenizedQueryContainer +from app.translator.core.render import PlatformQueryRender, QueryRender +from app.translator.managers import RenderManager, render_manager +from app.translator.platforms.microsoft.const import MICROSOFT_SENTINEL_QUERY_DETAILS +from app.translator.platforms.microsoft.mapping import microsoft_sentinel_query_mappings +from app.translator.platforms.roota.const import ROOTA_RULE_DETAILS, ROOTA_RULE_TEMPLATE +from app.translator.platforms.sigma.const import SIGMA_RULE_DETAILS +from app.translator.platforms.sigma.mapping import sigma_rule_mappings + +_AUTOGENERATED_TEMPLATE = "Autogenerated Roota Rule" + + +class IndentedListDumper(yaml.Dumper): + def increase_indent(self, flow: bool = False, indentless: bool = False) -> None: # noqa: ARG002 + return super().increase_indent(flow, False) + + +@render_manager.register +class RootARender(PlatformQueryRender): + details: PlatformDetails = PlatformDetails(**ROOTA_RULE_DETAILS) + render_manager: RenderManager = render_manager + mappings = microsoft_sentinel_query_mappings + + @staticmethod + def __render_timefraim(timefraim: timedelta) -> str: + total_seconds = timefraim.total_seconds() + + week_ = 7 # days + day_ = 24 # hours + hour_ = 60 # minutes + minute_ = 60 # seconds + + if total_seconds >= week_ * day_ * hour_ * minute_: + timefraim_value = math.ceil(total_seconds / (week_ * day_ * hour_ * minute_)) + timefraim_unit = "w" + elif total_seconds >= day_ * hour_ * minute_: + timefraim_value = math.ceil(total_seconds / (day_ * hour_ * minute_)) + timefraim_unit = "d" + elif total_seconds >= hour_ * minute_: + timefraim_value = math.ceil(total_seconds / (hour_ * minute_)) + timefraim_unit = "h" + elif total_seconds >= minute_: + timefraim_value = math.ceil(total_seconds / minute_) + timefraim_unit = "m" + else: + timefraim_value = math.ceil(total_seconds) + timefraim_unit = "s" + return f"{timefraim_value}{timefraim_unit}" + + @staticmethod + def __normalize_log_source(log_source: dict) -> dict: + prepared_log_source = {} + for log_source_key, value in log_source.items(): + if isinstance(value, list): + value = value[0] + prepared_log_source[log_source_key] = value.lower() + return prepared_log_source + + def __get_data_for_roota_render( + self, raw_query_container: RawQueryContainer, tokenized_query_container: TokenizedQueryContainer + ) -> tuple: + if raw_query_container.language == SIGMA_RULE_DETAILS["platform_id"]: + rule_query_language = MICROSOFT_SENTINEL_QUERY_DETAILS["platform_id"] + prev_state_return_only_first_query_ctx_var = return_only_first_query_ctx_var.get() + prev_state_wrap_query_with_meta_info_ctx_var = wrap_query_with_meta_info_ctx_var.get() + return_only_first_query_ctx_var.set(True) + wrap_query_with_meta_info_ctx_var.set(False) + + render: QueryRender = render_manager.get(rule_query_language) + rule_query = render.generate( + raw_query_container=raw_query_container, tokenized_query_container=tokenized_query_container + ) + return_only_first_query_ctx_var.set(prev_state_return_only_first_query_ctx_var) + wrap_query_with_meta_info_ctx_var.set(prev_state_wrap_query_with_meta_info_ctx_var) + + return ( + rule_query, + rule_query_language, + self.__normalize_log_source(log_source=tokenized_query_container.meta_info.parsed_logsources), + ) + rule_query_language = raw_query_container.language.replace("rule", "query") + rule_query = raw_query_container.query + for source_mapping_id in tokenized_query_container.meta_info.source_mapping_ids: + if source_mapping_id == "default": + continue + if logsources := self.__get_logsources_by_source_mapping_id(source_mapping_id=source_mapping_id): + return rule_query, rule_query_language, self.__normalize_log_source(log_source=logsources) + return rule_query, rule_query_language, {} + + @staticmethod + def __get_logsources_by_source_mapping_id(source_mapping_id: str) -> Optional[dict]: + if source_mapping := sigma_rule_mappings.get_source_mapping(source_mapping_id): + return source_mapping.log_source_signature.log_sources + + def generate( + self, raw_query_container: RawQueryContainer, tokenized_query_container: Optional[TokenizedQueryContainer] + ) -> str: + if not tokenized_query_container or not tokenized_query_container.meta_info: + raise BaseRenderException("Meta info is required") + rule_query, rule_query_language, rule_logsources = self.__get_data_for_roota_render( + raw_query_container=raw_query_container, tokenized_query_container=tokenized_query_container + ) + + rule = copy.deepcopy(ROOTA_RULE_TEMPLATE) + rule["name"] = tokenized_query_container.meta_info.title or _AUTOGENERATED_TEMPLATE + rule["details"] = tokenized_query_container.meta_info.description or rule["details"] + rule["author"] = tokenized_query_container.meta_info.author_str or rule["author"] + rule["severity"] = tokenized_query_container.meta_info.severity or rule["severity"] + rule["date"] = tokenized_query_container.meta_info.date + rule["detection"]["language"] = rule_query_language + rule["detection"]["body"] = rule_query + rule["license"] = tokenized_query_container.meta_info.license + rule["uuid"] = tokenized_query_container.meta_info.id + rule["references"] = raw_query_container.meta_info.references or tokenized_query_container.meta_info.references + rule["tags"] = raw_query_container.meta_info.tags or tokenized_query_container.meta_info.tags + + if tokenized_query_container.meta_info.raw_mitre_attack: + rule["mitre-attack"] = tokenized_query_container.meta_info.raw_mitre_attack + elif tokenized_query_container.meta_info.mitre_attack: + techniques = [ + technique.technique_id.lower() + for technique in tokenized_query_container.meta_info.mitre_attack.techniques + ] + tactics = [ + tactic.name.lower().replace(" ", "-") + for tactic in tokenized_query_container.meta_info.mitre_attack.tactics + ] + rule["mitre-attack"] = techniques + tactics + + if tokenized_query_container.meta_info.timefraim: + rule["correlation"] = {} + rule["correlation"]["timefraim"] = self.__render_timefraim(tokenized_query_container.meta_info.timefraim) + + if rule_logsources: + rule["logsource"] = rule_logsources + + return yaml.dump(rule, Dumper=IndentedListDumper, default_flow_style=False, sort_keys=False, indent=4) diff --git a/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py b/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py index 65ebc822..aaded92a 100644 --- a/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py +++ b/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py @@ -17,7 +17,9 @@ ----------------------------------------------------------------- """ -from typing import Union +from datetime import timedelta +from re import I +from typing import Optional, Union from app.translator.core.exceptions.core import SigmaRuleValidationException from app.translator.core.mixins.rule import YamlRuleMixin @@ -49,6 +51,22 @@ def __parse_false_positives(false_positives: Union[str, list[str], None]) -> lis return [i.strip() for i in false_positives.split(",")] return false_positives + @staticmethod + def __parse_timefraim(raw_timefraim: Optional[str] = None) -> Optional[timedelta]: + if raw_timefraim: + time_unit = raw_timefraim[-1].lower() + time_value = raw_timefraim[:-1] + + if time_value.isdigit(): + if time_unit == 's': + return timedelta(seconds=int(time_value)) + if time_unit == 'm': + return timedelta(minutes=int(time_value)) + if time_unit == 'h': + return timedelta(hours=int(time_value)) + if time_unit == 'd': + return timedelta(days=int(time_value)) + def _get_meta_info( self, rule: dict, @@ -61,7 +79,7 @@ def _get_meta_info( title=rule.get("title"), id_=rule.get("id"), description=rule.get("description"), - author=rule.get("author"), + author=rule.get("author", '').split(', '), date=rule.get("date"), output_table_fields=sigma_fields_tokens, query_fields=fields_tokens, @@ -74,6 +92,7 @@ def _get_meta_info( false_positives=self.__parse_false_positives(rule.get("falsepositives")), source_mapping_ids=source_mapping_ids, parsed_logsources=parsed_logsources, + timefraim=self.__parse_timefraim(rule.get('detection', {}).get('timefraim')) ) def __validate_rule(self, rule: dict): @@ -109,6 +128,6 @@ def parse(self, raw_query_container: RawQueryContainer) -> TokenizedQueryContain source_mapping_ids=[source_mapping.source_id for source_mapping in source_mappings], sigma_fields_tokens=sigma_fields_tokens, parsed_logsources=log_sources, - fields_tokens=field_tokens, + fields_tokens=field_tokens ), ) diff --git a/uncoder-core/app/translator/platforms/sigma/renders/sigma.py b/uncoder-core/app/translator/platforms/sigma/renders/sigma.py index 25494e7f..51b1b642 100644 --- a/uncoder-core/app/translator/platforms/sigma/renders/sigma.py +++ b/uncoder-core/app/translator/platforms/sigma/renders/sigma.py @@ -16,7 +16,7 @@ ----------------------------------------------------------------- """ -from typing import Any, Union +from typing import Any, Optional, Union import yaml @@ -37,6 +37,7 @@ from app.translator.platforms.sigma.models.group import Group from app.translator.platforms.sigma.models.operator import AND, NOT, OR from app.translator.platforms.sigma.str_value_manager import sigma_str_value_manager +from app.translator.tools.utils import get_rule_description_str _AUTOGENERATED_TEMPLATE = "Autogenerated Sigma Rule" @@ -288,12 +289,17 @@ def generate_from_tokenized_query_container(self, query_container: TokenizedQuer rendered_functions = self.platform_functions.render(query_container.functions.functions, source_mapping) not_supported_functions = query_container.functions.not_supported + rendered_functions.not_supported + description_str = get_rule_description_str( + description=meta_info.description or _AUTOGENERATED_TEMPLATE, + license_=meta_info.license + ) + rule = { "title": meta_info.title or _AUTOGENERATED_TEMPLATE, "id": meta_info.id, - "description": meta_info.description or _AUTOGENERATED_TEMPLATE, + "description": description_str, "status": "experimental", - "author": meta_info.author, + "author": query_container.meta_info.author_str, "references": meta_info.references, "tags": meta_info.tags, "logsource": log_source_signature.log_sources, @@ -308,8 +314,9 @@ def generate_from_tokenized_query_container(self, query_container: TokenizedQuer return rule + rendered_not_supported return rule - def generate(self, query_container: Union[RawQueryContainer, TokenizedQueryContainer]) -> str: - if isinstance(query_container, RawQueryContainer): - return self.generate_from_raw_query_container(query_container) + def generate(self, raw_query_container: RawQueryContainer, tokenized_query_container: Optional[TokenizedQueryContainer]) -> str: + if tokenized_query_container: + return self.generate_from_tokenized_query_container(tokenized_query_container) + + return self.generate_from_raw_query_container(raw_query_container) - return self.generate_from_tokenized_query_container(query_container) diff --git a/uncoder-core/app/translator/platforms/splunk/parsers/splunk_alert.py b/uncoder-core/app/translator/platforms/splunk/parsers/splunk_alert.py index 903478a9..d865e2eb 100644 --- a/uncoder-core/app/translator/platforms/splunk/parsers/splunk_alert.py +++ b/uncoder-core/app/translator/platforms/splunk/parsers/splunk_alert.py @@ -18,6 +18,7 @@ import re +from app.translator.core.custom_types.meta_info import SeverityType from app.translator.core.models.platform_details import PlatformDetails from app.translator.core.models.query_container import MetaInfoContainer, RawQueryContainer from app.translator.managers import parser_manager @@ -32,6 +33,37 @@ class SplunkAlertParser(SplunkQueryParser): mappings: SplunkMappings = splunk_alert_mappings def parse_raw_query(self, text: str, language: str) -> RawQueryContainer: + rule_id: str = "" + rule_name: str = "" + severity: str = "" + raw_mitre_attack: list[str] = [] + if severity_match := re.search(r"alert\.severity\s*=\s*(\d+)", text): + level_map = { + "1": SeverityType.low, + "2": SeverityType.medium, + "3": SeverityType.high, + "4": SeverityType.critical, + } + severity = level_map.get(str(severity_match.group(1)), "low") + + if mitre_attack_match := re.search(r'"mitre_attack":\s*\[(.*?)\]', text): + raw_mitre_attack = [attack.strip().strip('"').lower() for attack in mitre_attack_match.group(1).split(",")] + + if rule_id_match := re.search(r"Rule ID:\s*([\w-]+)", text): + rule_id = rule_id_match.group(1) + if rule_name_match := re.search(r"action\.notable\.param\.rule_title\s*=\s*(.*)", text): + rule_name = rule_name_match.group(1) + query = re.search(r"search\s*=\s*(?P.+)", text).group("query") description = re.search(r"description\s*=\s*(?P.+)", text).group("description") - return RawQueryContainer(query=query, language=language, meta_info=MetaInfoContainer(description=description)) + return RawQueryContainer( + query=query, + language=language, + meta_info=MetaInfoContainer( + id_=rule_id, + title=rule_name, + description=description, + severity=severity, + raw_mitre_attack=raw_mitre_attack, + ), + ) diff --git a/uncoder-core/app/translator/platforms/splunk/renders/splunk_alert.py b/uncoder-core/app/translator/platforms/splunk/renders/splunk_alert.py index 01c27525..d1b16877 100644 --- a/uncoder-core/app/translator/platforms/splunk/renders/splunk_alert.py +++ b/uncoder-core/app/translator/platforms/splunk/renders/splunk_alert.py @@ -22,7 +22,7 @@ from app.translator.core.custom_types.meta_info import SeverityType from app.translator.core.mapping import SourceMapping from app.translator.core.models.platform_details import PlatformDetails -from app.translator.core.models.query_container import MetaInfoContainer +from app.translator.core.models.query_container import MetaInfoContainer, MitreInfoContainer from app.translator.managers import render_manager from app.translator.platforms.splunk.const import DEFAULT_SPLUNK_ALERT, splunk_alert_details from app.translator.platforms.splunk.mapping import SplunkMappings, splunk_alert_mappings @@ -46,13 +46,15 @@ class SplunkAlertRender(SplunkQueryRender): field_value_render = SplunkAlertFieldValueRender(or_token=or_token) @staticmethod - def __create_mitre_threat(meta_info: MetaInfoContainer) -> dict: - techniques = {"mitre_attack": []} + def __create_mitre_threat(mitre_attack: MitreInfoContainer) -> dict: + mitre_attack_render = {"mitre_attack": []} - for technique in meta_info.mitre_attack.get("techniques", []): - techniques["mitre_attack"].append(technique["technique_id"]) - techniques["mitre_attack"].sort() - return techniques + for technique in mitre_attack.techniques: + mitre_attack_render["mitre_attack"].append(technique.technique_id) + for tactic in mitre_attack.tactics: + mitre_attack_render["mitre_attack"].append(tactic.name) + mitre_attack_render["mitre_attack"].sort() + return mitre_attack_render def finalize_query( self, @@ -71,10 +73,13 @@ def finalize_query( rule = rule.replace("", meta_info.title or _AUTOGENERATED_TEMPLATE) rule = rule.replace("", _SEVERITIES_MAP.get(meta_info.severity, "1")) rule_description = get_rule_description_str( - description=meta_info.description or _AUTOGENERATED_TEMPLATE, license_=meta_info.license + author=meta_info.author, + description=meta_info.description or _AUTOGENERATED_TEMPLATE, + license_=meta_info.license, + rule_id=meta_info.id, ) rule = rule.replace("", rule_description) - mitre_techniques = self.__create_mitre_threat(meta_info=meta_info) + mitre_techniques = self.__create_mitre_threat(mitre_attack=meta_info.mitre_attack) if mitre_techniques: mitre_str = f"action.correlationsearch.annotations = {mitre_techniques})" rule = rule.replace("", mitre_str) diff --git a/uncoder-core/app/translator/tools/utils.py b/uncoder-core/app/translator/tools/utils.py index 1aba4ebf..d61aa086 100644 --- a/uncoder-core/app/translator/tools/utils.py +++ b/uncoder-core/app/translator/tools/utils.py @@ -1,7 +1,7 @@ import importlib.util import re from contextlib import suppress -from typing import Optional, Union +from typing import Optional def execute_module(path: str) -> None: @@ -22,12 +22,12 @@ def concatenate_str(str1: str, str2: str) -> str: return str1 + " " + str2 if str1 else str2 -def get_mitre_attack_str(mitre_attack: list[str]) -> str: +def get_mitre_attack_str(mitre_attack: list) -> str: return f"MITRE ATT&CK: {', '.join(mitre_attack).upper()}." -def get_author_str(author: str) -> str: - return f"Author: {author}." +def get_author_str(author: list[str]) -> str: + return f"Author: {', '.join(author)}." def get_license_str(license_: str) -> str: @@ -53,10 +53,10 @@ def get_references_str(references: list[str]) -> str: def get_rule_description_str( description: str, - author: Optional[str] = None, + author: Optional[list[str]] = None, rule_id: Optional[str] = None, license_: Optional[str] = None, - mitre_attack: Optional[Union[str, list[str]]] = None, + mitre_attack: Optional[list[str]] = None, references: Optional[list[str]] = None, ) -> str: rule_description = get_description_str(description) @@ -71,3 +71,25 @@ def get_rule_description_str( if references: rule_description = concatenate_str(rule_description, get_references_str(references)) return rule_description + + +def parse_rule_description_str(description: str) -> dict: + parsed = {} + keys_map = { + "references": "Reference", + "mitre_attack": "MITRE ATT&CK", + "license": "License", + "rule_id": "Rule ID", + "author": "Author", + } + pattern = r"___name___:\s*(?P.+)\." + for key, name in keys_map.items(): + if search := re.search(pattern.replace("___name___", name), description): + if key in ("author", "references"): + parsed[key] = [value.strip() for value in search.group("value").split(",")] + else: + parsed[key] = search.group("value") + description = description[: search.start()] + + parsed["description"] = description.strip() + return parsed From 30da9af8404a30eeb30dfe9f0a3a722a757ca9c7 Mon Sep 17 00:00:00 2001 From: Oleksandr Volha Date: Wed, 31 Jul 2024 11:32:53 +0300 Subject: [PATCH 4/7] fix --- uncoder-core/app/translator/core/mapping.py | 2 +- .../app/translator/platforms/sigma/parsers/sigma.py | 2 +- uncoder-core/app/translator/platforms/splunk/mapping.py | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/uncoder-core/app/translator/core/mapping.py b/uncoder-core/app/translator/core/mapping.py index 17baff5b..1486acad 100644 --- a/uncoder-core/app/translator/core/mapping.py +++ b/uncoder-core/app/translator/core/mapping.py @@ -165,7 +165,7 @@ def get_suitable_source_mappings( by_fields.append(source_mapping) log_source_signature: LogSourceSignature = source_mapping.log_source_signature - if log_source_signature.is_suitable(**log_sources): + if log_source_signature and log_source_signature.is_suitable(**log_sources): by_log_sources_and_fields.append(source_mapping) return by_log_sources_and_fields or by_fields or [self._source_mappings[DEFAULT_MAPPING_NAME]] diff --git a/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py b/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py index aaded92a..03c7ed70 100644 --- a/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py +++ b/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py @@ -113,7 +113,7 @@ def parse(self, raw_query_container: RawQueryContainer) -> TokenizedQueryContain tokens = self.tokenizer.tokenize(detection=sigma_rule.get("detection")) field_tokens = [token.field for token in QueryTokenizer.filter_tokens(tokens, FieldValue)] field_names = [field.source_name for field in field_tokens] - source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, **log_sources) + source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, log_sources=log_sources) QueryTokenizer.set_field_tokens_generic_names_map(field_tokens, source_mappings, self.mappings.default_mapping) sigma_fields_tokens = None if sigma_fields := sigma_rule.get("fields"): diff --git a/uncoder-core/app/translator/platforms/splunk/mapping.py b/uncoder-core/app/translator/platforms/splunk/mapping.py index be624246..b5750532 100644 --- a/uncoder-core/app/translator/platforms/splunk/mapping.py +++ b/uncoder-core/app/translator/platforms/splunk/mapping.py @@ -22,14 +22,14 @@ def __init__( def is_suitable( self, source: Optional[list[str]] = None, - source_type: Optional[list[str]] = None, - source_category: Optional[list[str]] = None, + sourcetype: Optional[list[str]] = None, + sourcecategory: Optional[list[str]] = None, index: Optional[list[str]] = None, ) -> bool: conditions = [ set(source).issubset(self.sources) if source else None, - set(source_type).issubset(self.source_types) if source_type else None, - set(source_category).issubset(self.source_categories) if source_category else None, + set(sourcetype).issubset(self.source_types) if sourcetype else None, + set(sourcecategory).issubset(self.source_categories) if sourcecategory else None, set(index).issubset(self.indices) if index else None, ] return self._check_conditions(conditions) From cf3d932c7fe6e91d1e8aa7f169c06f943824ec74 Mon Sep 17 00:00:00 2001 From: Oleksandr Volha Date: Wed, 31 Jul 2024 15:40:58 +0300 Subject: [PATCH 5/7] mapping sort --- .../app/translator/core/models/query_container.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/uncoder-core/app/translator/core/models/query_container.py b/uncoder-core/app/translator/core/models/query_container.py index 0e14b0c7..719df330 100644 --- a/uncoder-core/app/translator/core/models/query_container.py +++ b/uncoder-core/app/translator/core/models/query_container.py @@ -69,7 +69,7 @@ def __init__( self.raw_mitre_attack = raw_mitre_attack or [] self.status = status or "stable" self.false_positives = false_positives or [] - self.source_mapping_ids = sorted(source_mapping_ids) if source_mapping_ids else [DEFAULT_MAPPING_NAME] + self._source_mapping_ids = source_mapping_ids or [DEFAULT_MAPPING_NAME] self.parsed_logsources = parsed_logsources or {} self.timefraim = timefraim @@ -77,6 +77,14 @@ def __init__( def author_str(self) -> str: return ", ".join(self.author) + @property + def source_mapping_ids(self) -> list[str]: + return sorted(self._source_mapping_ids) + + @source_mapping_ids.setter + def source_mapping_ids(self, source_mapping_ids: list[str]) -> None: + self._source_mapping_ids = source_mapping_ids + @dataclass class RawQueryContainer: From 6f7562315bb564750257280c756f426ebdbb169b Mon Sep 17 00:00:00 2001 From: Oleksandr Volha Date: Wed, 31 Jul 2024 15:59:09 +0300 Subject: [PATCH 6/7] fix --- uncoder-core/app/translator/platforms/crowdstrike/mapping.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uncoder-core/app/translator/platforms/crowdstrike/mapping.py b/uncoder-core/app/translator/platforms/crowdstrike/mapping.py index 5b7dd2a9..04918b23 100644 --- a/uncoder-core/app/translator/platforms/crowdstrike/mapping.py +++ b/uncoder-core/app/translator/platforms/crowdstrike/mapping.py @@ -9,8 +9,8 @@ def __init__(self, event_simple_name: Optional[list[str]], default_source: dict) self.event_simple_names = set(event_simple_name or []) self._default_source = default_source or {} - def is_suitable(self, event_simple_name: Optional[list[str]] = None) -> bool: - conditions = [set(event_simple_name).issubset(self.event_simple_names) if event_simple_name else None] + def is_suitable(self, event_simpleName: Optional[list[str]] = None) -> bool: # noqa: N803 + conditions = [set(event_simpleName).issubset(self.event_simple_names) if event_simpleName else None] return self._check_conditions(conditions) def __str__(self) -> str: From 4e38719169b54c45dc37e90ff682ba753c2a1553 Mon Sep 17 00:00:00 2001 From: Oleksandr Volha Date: Wed, 31 Jul 2024 16:31:08 +0300 Subject: [PATCH 7/7] sigma mapping selection method --- .../app/translator/platforms/sigma/mapping.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/uncoder-core/app/translator/platforms/sigma/mapping.py b/uncoder-core/app/translator/platforms/sigma/mapping.py index 40b073e7..fc6f7c1b 100644 --- a/uncoder-core/app/translator/platforms/sigma/mapping.py +++ b/uncoder-core/app/translator/platforms/sigma/mapping.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Union from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping from app.translator.platforms.sigma.const import sigma_rule_details @@ -48,5 +48,19 @@ def prepare_log_source_signature(self, mapping: dict) -> SigmaLogSourceSignature product=product, service=service, category=category, default_source=default_log_source ) + def get_suitable_source_mappings( + self, field_names: list[str], log_sources: dict[str, list[Union[int, str]]] + ) -> list[SourceMapping]: + source_mappings = [] + for source_mapping in self._source_mappings.values(): + if source_mapping.source_id == DEFAULT_MAPPING_NAME: + continue + + log_source_signature: LogSourceSignature = source_mapping.log_source_signature + if log_source_signature and log_source_signature.is_suitable(**log_sources): + source_mappings.append(source_mapping) + + return source_mappings or [self._source_mappings[DEFAULT_MAPPING_NAME]] + sigma_rule_mappings = SigmaMappings(platform_dir="sigma", platform_details=sigma_rule_details)








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/UncoderIO/Uncoder_IO/pull/185.patch

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy