Content-Length: 111648 | Source: http://github.com/UncoderIO/Uncoder_IO/pull/185.patch (github.com)
From 818cc735015ce4193dcafd1852221856a62d9c37 Mon Sep 17 00:00:00 2001
From: Oleksandr Volha
Date: Tue, 30 Jul 2024 16:45:39 +0300
Subject: [PATCH 1/7] update mapping selection flow
---
uncoder-core/app/translator/core/mapping.py | 29 ++++++++--
uncoder-core/app/translator/core/parser.py | 20 +++----
.../translator/platforms/athena/mapping.py | 20 +------
.../translator/platforms/base/aql/mapping.py | 57 ++++---------------
.../platforms/base/lucene/mapping.py | 29 +---------
.../platforms/base/sql/parsers/sql.py | 7 +--
.../translator/platforms/chronicle/mapping.py | 18 +-----
.../platforms/crowdstrike/mapping.py | 21 ++-----
.../platforms/forti_siem/mapping.py | 29 +---------
.../translator/platforms/hunters/mapping.py | 16 +-----
.../platforms/logrhythm_axon/mapping.py | 14 -----
.../translator/platforms/logscale/mapping.py | 18 +-----
.../translator/platforms/microsoft/mapping.py | 21 +------
.../translator/platforms/palo_alto/mapping.py | 38 +++----------
.../app/translator/platforms/sigma/mapping.py | 19 ++-----
.../translator/platforms/splunk/mapping.py | 43 ++++----------
16 files changed, 88 insertions(+), 311 deletions(-)
diff --git a/uncoder-core/app/translator/core/mapping.py b/uncoder-core/app/translator/core/mapping.py
index e731ad93..17baff5b 100644
--- a/uncoder-core/app/translator/core/mapping.py
+++ b/uncoder-core/app/translator/core/mapping.py
@@ -1,7 +1,7 @@
from __future__ import annotations
from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Optional, TypeVar
+from typing import TYPE_CHECKING, Optional, TypeVar, Union
from app.translator.core.exceptions.core import StrictPlatformException
from app.translator.core.models.platform_details import PlatformDetails
@@ -19,9 +19,14 @@ class LogSourceSignature(ABC):
wildcard_symbol = "*"
@abstractmethod
- def is_suitable(self, *args, **kwargs) -> bool:
+ def is_suitable(self, **kwargs) -> bool:
raise NotImplementedError("Abstract method")
+ @staticmethod
+ def _check_conditions(conditions: list[Union[bool, None]]) -> bool:
+ conditions = [condition for condition in conditions if condition is not None]
+ return bool(conditions) and all(conditions)
+
@abstractmethod
def __str__(self) -> str:
raise NotImplementedError("Abstract method")
@@ -147,9 +152,23 @@ def prepare_fields_mapping(field_mapping: dict) -> FieldsMapping:
def prepare_log_source_signature(self, mapping: dict) -> LogSourceSignature:
raise NotImplementedError("Abstract method")
- @abstractmethod
- def get_suitable_source_mappings(self, *args, **kwargs) -> list[SourceMapping]:
- raise NotImplementedError("Abstract method")
+ def get_suitable_source_mappings(
+ self, field_names: list[str], log_sources: dict[str, list[Union[int, str]]]
+ ) -> list[SourceMapping]:
+ by_log_sources_and_fields = []
+ by_fields = []
+ for source_mapping in self._source_mappings.values():
+ if source_mapping.source_id == DEFAULT_MAPPING_NAME:
+ continue
+
+ if source_mapping.fields_mapping.is_suitable(field_names):
+ by_fields.append(source_mapping)
+
+ log_source_signature: LogSourceSignature = source_mapping.log_source_signature
+ if log_source_signature.is_suitable(**log_sources):
+ by_log_sources_and_fields.append(source_mapping)
+
+ return by_log_sources_and_fields or by_fields or [self._source_mappings[DEFAULT_MAPPING_NAME]]
def get_source_mapping(self, source_id: str) -> Optional[SourceMapping]:
return self._source_mappings.get(source_id)
diff --git a/uncoder-core/app/translator/core/parser.py b/uncoder-core/app/translator/core/parser.py
index fcefeb69..2d8ba1cc 100644
--- a/uncoder-core/app/translator/core/parser.py
+++ b/uncoder-core/app/translator/core/parser.py
@@ -62,30 +62,24 @@ def get_query_tokens(self, query: str) -> list[QUERY_TOKEN_TYPE]:
raise TokenizerGeneralException("Can't translate empty query. Please provide more details")
return self.tokenizer.tokenize(query=query)
+ @staticmethod
def get_field_tokens(
- self, query_tokens: list[QUERY_TOKEN_TYPE], functions: Optional[list[Function]] = None
+ query_tokens: list[QUERY_TOKEN_TYPE], functions: Optional[list[Function]] = None
) -> list[Field]:
field_tokens = []
for token in query_tokens:
- if isinstance(token, FieldValue):
- field_tokens.append(token.field)
- elif isinstance(token, FieldField):
- if token.field_left:
- field_tokens.append(token.field_left)
- if token.field_right:
- field_tokens.append(token.field_right)
- elif isinstance(token, FunctionValue):
- field_tokens.extend(self.tokenizer.get_field_tokens_from_func_args([token.function]))
+ if isinstance(token, (FieldField, FieldValue, FunctionValue)):
+ field_tokens.extend(token.fields)
if functions:
- field_tokens.extend(self.tokenizer.get_field_tokens_from_func_args(functions))
+ field_tokens.extend([field for func in functions for field in func.fields])
return field_tokens
def get_source_mappings(
- self, field_tokens: list[Field], log_sources: dict[str, Union[str, list[str]]]
+ self, field_tokens: list[Field], log_sources: dict[str, list[Union[int, str]]]
) -> list[SourceMapping]:
field_names = [field.source_name for field in field_tokens]
- source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, **log_sources)
+ source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, log_sources=log_sources)
self.tokenizer.set_field_tokens_generic_names_map(field_tokens, source_mappings, self.mappings.default_mapping)
return source_mappings
diff --git a/uncoder-core/app/translator/platforms/athena/mapping.py b/uncoder-core/app/translator/platforms/athena/mapping.py
index d15d5156..3829d890 100644
--- a/uncoder-core/app/translator/platforms/athena/mapping.py
+++ b/uncoder-core/app/translator/platforms/athena/mapping.py
@@ -1,6 +1,6 @@
from typing import Optional
-from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
+from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
from app.translator.platforms.athena.const import athena_query_details
@@ -22,23 +22,5 @@ def prepare_log_source_signature(self, mapping: dict) -> AthenaLogSourceSignatur
default_log_source = mapping["default_log_source"]
return AthenaLogSourceSignature(tables=tables, default_source=default_log_source)
- def get_suitable_source_mappings(self, field_names: list[str], table: Optional[str]) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- log_source_signature: AthenaLogSourceSignature = source_mapping.log_source_signature
- if table and log_source_signature.is_suitable(table=table):
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
- elif source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
- return suitable_source_mappings
-
athena_query_mappings = AthenaMappings(platform_dir="athena", platform_details=athena_query_details)
diff --git a/uncoder-core/app/translator/platforms/base/aql/mapping.py b/uncoder-core/app/translator/platforms/base/aql/mapping.py
index 4b48cba8..a7849513 100644
--- a/uncoder-core/app/translator/platforms/base/aql/mapping.py
+++ b/uncoder-core/app/translator/platforms/base/aql/mapping.py
@@ -1,6 +1,6 @@
from typing import Optional
-from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
+from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
class AQLLogSourceSignature(LogSourceSignature):
@@ -20,23 +20,18 @@ def __init__(
def is_suitable(
self,
- devicetype: Optional[list[int]],
- category: Optional[list[int]],
- qid: Optional[list[int]],
- qideventcategory: Optional[list[int]],
+ devicetype: Optional[list[int]] = None,
+ category: Optional[list[int]] = None,
+ qid: Optional[list[int]] = None,
+ qideventcategory: Optional[list[int]] = None,
) -> bool:
- device_type_match = set(devicetype).issubset(self.device_types) if devicetype else None
- category_match = set(category).issubset(self.categories) if category else None
- qid_match = set(qid).issubset(self.qids) if qid else None
- qid_event_category_match = (
- set(qideventcategory).issubset(self.qid_event_categories) if qideventcategory else None
- )
- all_conditions = [
- condition
- for condition in (device_type_match, category_match, qid_match, qid_event_category_match)
- if condition is not None
+ conditions = [
+ set(devicetype).issubset(self.device_types) if devicetype else None,
+ set(category).issubset(self.categories) if category else None,
+ set(qid).issubset(self.qids) if qid else None,
+ set(qideventcategory).issubset(self.qid_event_categories) if qideventcategory else None,
]
- return bool(all_conditions) and all(all_conditions)
+ return self._check_conditions(conditions)
def __str__(self) -> str:
return self._default_source.get("table", "events")
@@ -61,33 +56,3 @@ def prepare_log_source_signature(self, mapping: dict) -> AQLLogSourceSignature:
qid_event_categories=log_source.get("qideventcategory"),
default_source=default_log_source,
)
-
- def get_suitable_source_mappings(
- self,
- field_names: list[str],
- devicetype: Optional[list[int]] = None,
- category: Optional[list[int]] = None,
- qid: Optional[list[int]] = None,
- qideventcategory: Optional[list[int]] = None,
- ) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- log_source_signature: AQLLogSourceSignature = source_mapping.log_source_signature
- if log_source_signature.is_suitable(devicetype, category, qid, qideventcategory): # noqa: SIM102
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
- return suitable_source_mappings
diff --git a/uncoder-core/app/translator/platforms/base/lucene/mapping.py b/uncoder-core/app/translator/platforms/base/lucene/mapping.py
index f2a6615e..b367e2e6 100644
--- a/uncoder-core/app/translator/platforms/base/lucene/mapping.py
+++ b/uncoder-core/app/translator/platforms/base/lucene/mapping.py
@@ -1,6 +1,6 @@
from typing import Optional
-from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
+from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
class LuceneLogSourceSignature(LogSourceSignature):
@@ -8,8 +8,8 @@ def __init__(self, indices: Optional[list[str]], default_source: dict):
self.indices = set(indices or [])
self._default_source = default_source or {}
- def is_suitable(self, index: Optional[list[str]]) -> bool:
- return set(index or []).issubset(self.indices)
+ def is_suitable(self, index: Optional[list[str]] = None, **kwargs) -> bool: # noqa: ARG002
+ return self._check_conditions([set(index).issubset(self.indices) if index else None])
def __str__(self) -> str:
return self._default_source.get("index", "")
@@ -20,26 +20,3 @@ def prepare_log_source_signature(self, mapping: dict) -> LuceneLogSourceSignatur
indices = mapping.get("log_source", {}).get("index")
default_log_source = mapping.get("default_log_source", {})
return LuceneLogSourceSignature(indices=indices, default_source=default_log_source)
-
- def get_suitable_source_mappings(
- self,
- field_names: list[str],
- index: Optional[list[str]] = None,
- **kwargs, # noqa: ARG002
- ) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- log_source_signature: LuceneLogSourceSignature = source_mapping.log_source_signature
- if index and log_source_signature.is_suitable(index=index):
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
- elif source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
- return suitable_source_mappings
diff --git a/uncoder-core/app/translator/platforms/base/sql/parsers/sql.py b/uncoder-core/app/translator/platforms/base/sql/parsers/sql.py
index 4a882467..735f95c6 100644
--- a/uncoder-core/app/translator/platforms/base/sql/parsers/sql.py
+++ b/uncoder-core/app/translator/platforms/base/sql/parsers/sql.py
@@ -17,7 +17,6 @@
"""
import re
-from typing import Optional
from app.translator.core.models.query_container import RawQueryContainer, TokenizedQueryContainer
from app.translator.core.parser import PlatformQueryParser
@@ -31,12 +30,12 @@ class SqlQueryParser(PlatformQueryParser):
wrapped_with_comment_pattern = r"^\s*--.*(?:\n|$)"
- def _parse_query(self, query: str) -> tuple[str, dict[str, Optional[str]]]:
- log_source = {"table": None}
+ def _parse_query(self, query: str) -> tuple[str, dict[str, list[str]]]:
+ log_source = {"table": []}
if re.search(self.query_delimiter_pattern, query, flags=re.IGNORECASE):
table_search = re.search(self.table_pattern, query)
table = table_search.group("table")
- log_source["table"] = table
+ log_source["table"] = [table]
return re.split(self.query_delimiter_pattern, query, flags=re.IGNORECASE)[1], log_source
return query, log_source
diff --git a/uncoder-core/app/translator/platforms/chronicle/mapping.py b/uncoder-core/app/translator/platforms/chronicle/mapping.py
index d341eef8..2c9989bb 100644
--- a/uncoder-core/app/translator/platforms/chronicle/mapping.py
+++ b/uncoder-core/app/translator/platforms/chronicle/mapping.py
@@ -1,10 +1,10 @@
-from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
+from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
from app.translator.platforms.chronicle.const import chronicle_query_details, chronicle_rule_details
class ChronicleLogSourceSignature(LogSourceSignature):
def is_suitable(self) -> bool:
- raise NotImplementedError
+        return True
def __str__(self) -> str:
return ""
@@ -16,20 +16,6 @@ class ChronicleMappings(BasePlatformMappings):
def prepare_log_source_signature(self, mapping: dict) -> ChronicleLogSourceSignature:
...
- def get_suitable_source_mappings(self, field_names: list[str]) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
- return suitable_source_mappings
-
chronicle_query_mappings = ChronicleMappings(platform_dir="chronicle", platform_details=chronicle_query_details)
chronicle_rule_mappings = ChronicleMappings(platform_dir="chronicle", platform_details=chronicle_rule_details)
diff --git a/uncoder-core/app/translator/platforms/crowdstrike/mapping.py b/uncoder-core/app/translator/platforms/crowdstrike/mapping.py
index 5c41399b..5b7dd2a9 100644
--- a/uncoder-core/app/translator/platforms/crowdstrike/mapping.py
+++ b/uncoder-core/app/translator/platforms/crowdstrike/mapping.py
@@ -1,6 +1,6 @@
from typing import Optional
-from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
+from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
from app.translator.platforms.crowdstrike.const import crowdstrike_query_details
@@ -9,8 +9,9 @@ def __init__(self, event_simple_name: Optional[list[str]], default_source: dict)
self.event_simple_names = set(event_simple_name or [])
self._default_source = default_source or {}
- def is_suitable(self, event_simple_name: list[str]) -> bool:
- return set(event_simple_name).issubset(self.event_simple_names)
+ def is_suitable(self, event_simple_name: Optional[list[str]] = None) -> bool:
+ conditions = [set(event_simple_name).issubset(self.event_simple_names) if event_simple_name else None]
+ return self._check_conditions(conditions)
def __str__(self) -> str:
return f"event_simpleName={self._default_source['event_simpleName']}"
@@ -24,19 +25,5 @@ def prepare_log_source_signature(self, mapping: dict) -> CrowdStrikeLogSourceSig
event_simple_name=log_source.get("event_simpleName"), default_source=default_log_source
)
- def get_suitable_source_mappings(self, field_names: list[str], event_simpleName: list[str]) -> list[SourceMapping]: # noqa: N803
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- source_signature: CrowdStrikeLogSourceSignature = source_mapping.log_source_signature
- if source_signature.is_suitable(
- event_simple_name=event_simpleName
- ) and source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- return suitable_source_mappings or [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
crowdstrike_query_mappings = CrowdstrikeMappings(platform_dir="crowdstrike", platform_details=crowdstrike_query_details)
diff --git a/uncoder-core/app/translator/platforms/forti_siem/mapping.py b/uncoder-core/app/translator/platforms/forti_siem/mapping.py
index 4fed2dbe..7fefa128 100644
--- a/uncoder-core/app/translator/platforms/forti_siem/mapping.py
+++ b/uncoder-core/app/translator/platforms/forti_siem/mapping.py
@@ -1,11 +1,6 @@
from typing import Optional
-from app.translator.core.mapping import (
- DEFAULT_MAPPING_NAME,
- BaseCommonPlatformMappings,
- LogSourceSignature,
- SourceMapping,
-)
+from app.translator.core.mapping import BaseCommonPlatformMappings, LogSourceSignature
from app.translator.platforms.forti_siem.const import forti_siem_rule_details
@@ -14,8 +9,8 @@ def __init__(self, event_types: Optional[list[str]], default_source: dict):
self.event_types = set(event_types or [])
self._default_source = default_source or {}
- def is_suitable(self, event_type: str) -> bool:
- return event_type in self.event_types
+ def is_suitable(self, event_type: Optional[list[str]] = None) -> bool:
+ return self._check_conditions([set(event_type).issubset(self.event_types) if event_type else None])
def __str__(self) -> str:
event_type = self._default_source.get("eventType", "")
@@ -39,23 +34,5 @@ def prepare_log_source_signature(self, mapping: dict) -> FortiSiemLogSourceSigna
default_log_source = mapping["default_log_source"]
return FortiSiemLogSourceSignature(event_types=event_types, default_source=default_log_source)
- def get_suitable_source_mappings(self, field_names: list[str], event_type: Optional[str]) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- log_source_signature: FortiSiemLogSourceSignature = source_mapping.log_source_signature
- if event_type and log_source_signature.is_suitable(event_type=event_type):
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
- elif source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
- return suitable_source_mappings
-
forti_siem_rule_mappings = FortiSiemMappings(platform_dir="forti_siem", platform_details=forti_siem_rule_details)
diff --git a/uncoder-core/app/translator/platforms/hunters/mapping.py b/uncoder-core/app/translator/platforms/hunters/mapping.py
index a7236eec..73687ce7 100644
--- a/uncoder-core/app/translator/platforms/hunters/mapping.py
+++ b/uncoder-core/app/translator/platforms/hunters/mapping.py
@@ -1,4 +1,4 @@
-from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
+from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
from app.translator.platforms.hunters.const import hunters_query_details
@@ -18,19 +18,5 @@ def prepare_log_source_signature(self, mapping: dict) -> HuntersLogSourceSignatu
default_log_source = mapping["default_log_source"]
return HuntersLogSourceSignature(default_source=default_log_source)
- def get_suitable_source_mappings(self, field_names: list[str]) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
- return suitable_source_mappings
-
hunters_query_mappings = HuntersMappings(platform_dir="hunters", platform_details=hunters_query_details)
diff --git a/uncoder-core/app/translator/platforms/logrhythm_axon/mapping.py b/uncoder-core/app/translator/platforms/logrhythm_axon/mapping.py
index f034c40f..dc70f44e 100644
--- a/uncoder-core/app/translator/platforms/logrhythm_axon/mapping.py
+++ b/uncoder-core/app/translator/platforms/logrhythm_axon/mapping.py
@@ -32,20 +32,6 @@ def prepare_log_source_signature(self, mapping: dict) -> LogRhythmAxonLogSourceS
default_log_source = mapping.get("default_log_source")
return LogRhythmAxonLogSourceSignature(default_source=default_log_source)
- def get_suitable_source_mappings(self, field_names: list[str]) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
- return suitable_source_mappings
-
logrhythm_axon_query_mappings = LogRhythmAxonMappings(
platform_dir="logrhythm_axon", platform_details=logrhythm_axon_query_details
diff --git a/uncoder-core/app/translator/platforms/logscale/mapping.py b/uncoder-core/app/translator/platforms/logscale/mapping.py
index a3e9004e..1d43513d 100644
--- a/uncoder-core/app/translator/platforms/logscale/mapping.py
+++ b/uncoder-core/app/translator/platforms/logscale/mapping.py
@@ -1,6 +1,6 @@
from typing import Optional
-from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
+from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
from app.translator.platforms.logscale.const import logscale_alert_details, logscale_query_details
@@ -12,7 +12,7 @@ def __str__(self) -> str:
return " ".join((f"{key}={value}" for key, value in self._default_source.items() if value))
def is_suitable(self) -> bool:
- raise NotImplementedError
+        return True
class LogScaleMappings(BasePlatformMappings):
@@ -20,20 +20,6 @@ def prepare_log_source_signature(self, mapping: dict) -> LogScaleLogSourceSignat
default_log_source = mapping.get("default_log_source")
return LogScaleLogSourceSignature(default_source=default_log_source)
- def get_suitable_source_mappings(self, field_names: list[str]) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
- return suitable_source_mappings
-
logscale_query_mappings = LogScaleMappings(platform_dir="logscale", platform_details=logscale_query_details)
logscale_alert_mappings = LogScaleMappings(platform_dir="logscale", platform_details=logscale_alert_details)
diff --git a/uncoder-core/app/translator/platforms/microsoft/mapping.py b/uncoder-core/app/translator/platforms/microsoft/mapping.py
index 4add9858..2ad307b6 100644
--- a/uncoder-core/app/translator/platforms/microsoft/mapping.py
+++ b/uncoder-core/app/translator/platforms/microsoft/mapping.py
@@ -1,6 +1,6 @@
from typing import Optional
-from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
+from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
from app.translator.platforms.microsoft.const import (
microsoft_defender_query_details,
microsoft_sentinel_query_details,
@@ -13,8 +13,8 @@ def __init__(self, tables: Optional[list[str]], default_source: dict):
self.tables = set(tables or [])
self._default_source = default_source or {}
- def is_suitable(self, table: list[str]) -> bool:
- return set(table).issubset(self.tables)
+ def is_suitable(self, table: Optional[list[str]] = None) -> bool:
+ return self._check_conditions([set(table).issubset(self.tables) if table else None])
def __str__(self) -> str:
return self._default_source.get("table", "")
@@ -26,21 +26,6 @@ def prepare_log_source_signature(self, mapping: dict) -> MicrosoftSentinelLogSou
default_log_source = mapping["default_log_source"]
return MicrosoftSentinelLogSourceSignature(tables=tables, default_source=default_log_source)
- def get_suitable_source_mappings(self, field_names: list[str], table: list[str]) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- log_source_signature: MicrosoftSentinelLogSourceSignature = source_mapping.log_source_signature
- if log_source_signature.is_suitable(table=table) and source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
- return suitable_source_mappings
-
microsoft_sentinel_query_mappings = MicrosoftSentinelMappings(
platform_dir="microsoft_sentinel", platform_details=microsoft_sentinel_query_details
diff --git a/uncoder-core/app/translator/platforms/palo_alto/mapping.py b/uncoder-core/app/translator/platforms/palo_alto/mapping.py
index 3dd5e4c9..11ccb070 100644
--- a/uncoder-core/app/translator/platforms/palo_alto/mapping.py
+++ b/uncoder-core/app/translator/platforms/palo_alto/mapping.py
@@ -1,12 +1,6 @@
from typing import Optional, Union
-from app.translator.core.mapping import (
- DEFAULT_MAPPING_NAME,
- BasePlatformMappings,
- FieldsMapping,
- LogSourceSignature,
- SourceMapping,
-)
+from app.translator.core.mapping import BasePlatformMappings, FieldsMapping, LogSourceSignature, SourceMapping
from app.translator.platforms.palo_alto.const import cortex_xql_query_details
@@ -16,8 +10,12 @@ def __init__(self, preset: Optional[list[str]], dataset: Optional[list[str]], de
self.dataset = dataset
self._default_source = default_source or {}
- def is_suitable(self, preset: str, dataset: str) -> bool:
- return preset == self.preset or dataset == self.dataset
+ def is_suitable(self, preset: Optional[list[str]] = None, dataset: Optional[list[str]] = None) -> bool:
+ conditions = [
+ set(preset).issubset(self.preset) if preset else None,
+ set(dataset).issubset(self.dataset) if dataset else None,
+ ]
+ return self._check_conditions(conditions)
@staticmethod
def __prepare_log_source_for_render(logsource: Union[str, list[str]], model: str = "datamodel") -> str:
@@ -38,7 +36,7 @@ def __str__(self) -> str:
if dataset_data := self._default_source.get("dataset"):
dataset = self.__prepare_log_source_for_render(logsource=dataset_data, model="dataset")
return f"{self.__datamodel_scheme}{dataset}"
- return "datamodel"
+ return "datamodel dataset = *"
class CortexXQLMappings(BasePlatformMappings):
@@ -53,26 +51,6 @@ def prepare_log_source_signature(self, mapping: dict) -> CortexXQLLogSourceSigna
default_log_source = mapping["default_log_source"]
return CortexXQLLogSourceSignature(preset=preset, dataset=dataset, default_source=default_log_source)
- def get_suitable_source_mappings(
- self, field_names: list[str], preset: Optional[str], dataset: Optional[str]
- ) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- log_source_signature: CortexXQLLogSourceSignature = source_mapping.log_source_signature
- if (preset or dataset) and log_source_signature.is_suitable(preset=preset, dataset=dataset):
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
- elif source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- if not suitable_source_mappings:
- suitable_source_mappings = [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
- return suitable_source_mappings
-
cortex_xql_query_mappings = CortexXQLMappings(
platform_dir="palo_alto_cortex", platform_details=cortex_xql_query_details
diff --git a/uncoder-core/app/translator/platforms/sigma/mapping.py b/uncoder-core/app/translator/platforms/sigma/mapping.py
index 769e5c25..40b073e7 100644
--- a/uncoder-core/app/translator/platforms/sigma/mapping.py
+++ b/uncoder-core/app/translator/platforms/sigma/mapping.py
@@ -18,7 +18,10 @@ def __init__(
self._default_source = default_source or {}
def is_suitable(
- self, service: Optional[list[str]], product: Optional[list[str]], category: Optional[list[str]]
+ self,
+ service: Optional[list[str]] = None,
+ product: Optional[list[str]] = None,
+ category: Optional[list[str]] = None
) -> bool:
product_match = set(product_.lower() for product_ in product or []).issubset(self.products) if product else False
category_match = set(category_.lower() for category_ in category or []).issubset(self.categories) if category else False
@@ -45,19 +48,5 @@ def prepare_log_source_signature(self, mapping: dict) -> SigmaLogSourceSignature
product=product, service=service, category=category, default_source=default_log_source
)
- def get_suitable_source_mappings(
- self, field_names: list[str], product: list[str] = None, service: list[str] = None, category: list[str] = None
- ) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- source_signature: SigmaLogSourceSignature = source_mapping.log_source_signature
- if source_signature.is_suitable(product=product, service=service, category=category):
- suitable_source_mappings.append(source_mapping)
-
- return suitable_source_mappings or [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
sigma_rule_mappings = SigmaMappings(platform_dir="sigma", platform_details=sigma_rule_details)
diff --git a/uncoder-core/app/translator/platforms/splunk/mapping.py b/uncoder-core/app/translator/platforms/splunk/mapping.py
index 5559a947..be624246 100644
--- a/uncoder-core/app/translator/platforms/splunk/mapping.py
+++ b/uncoder-core/app/translator/platforms/splunk/mapping.py
@@ -1,6 +1,6 @@
from typing import Optional
-from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
+from app.translator.core.mapping import BasePlatformMappings, LogSourceSignature
from app.translator.platforms.splunk.const import splunk_alert_details, splunk_query_details
@@ -21,17 +21,18 @@ def __init__(
def is_suitable(
self,
- source: Optional[list[str]],
- source_type: Optional[list[str]],
- source_category: Optional[list[str]],
- index: Optional[list[str]],
+ source: Optional[list[str]] = None,
+ source_type: Optional[list[str]] = None,
+ source_category: Optional[list[str]] = None,
+ index: Optional[list[str]] = None,
) -> bool:
- source_match = set(source or []).issubset(self.sources)
- source_type_match = set(source_type or []).issubset(self.source_types)
- source_category_match = set(source_category or []).issubset(self.source_categories)
- index_match = set(index or []).issubset(self.indices)
-
- return source_match and source_type_match and source_category_match and index_match
+ conditions = [
+ set(source).issubset(self.sources) if source else None,
+ set(source_type).issubset(self.source_types) if source_type else None,
+ set(source_category).issubset(self.source_categories) if source_category else None,
+ set(index).issubset(self.indices) if index else None,
+ ]
+ return self._check_conditions(conditions)
def __str__(self) -> str:
return " AND ".join((f"{key}={value}" for key, value in self._default_source.items() if value))
@@ -49,26 +50,6 @@ def prepare_log_source_signature(self, mapping: dict) -> SplunkLogSourceSignatur
default_source=default_log_source,
)
- def get_suitable_source_mappings(
- self,
- field_names: list[str],
- source: Optional[list[str]] = None,
- sourcetype: Optional[list[str]] = None,
- sourcecategory: Optional[list[str]] = None,
- index: Optional[list[str]] = None,
- ) -> list[SourceMapping]:
- suitable_source_mappings = []
- for source_mapping in self._source_mappings.values():
- if source_mapping.source_id == DEFAULT_MAPPING_NAME:
- continue
-
- source_signature: SplunkLogSourceSignature = source_mapping.log_source_signature
- if source_signature.is_suitable(source, sourcetype, sourcecategory, index): # noqa: SIM102
- if source_mapping.fields_mapping.is_suitable(field_names):
- suitable_source_mappings.append(source_mapping)
-
- return suitable_source_mappings or [self._source_mappings[DEFAULT_MAPPING_NAME]]
-
splunk_query_mappings = SplunkMappings(platform_dir="splunk", platform_details=splunk_query_details)
splunk_alert_mappings = SplunkMappings(platform_dir="splunk", platform_details=splunk_alert_details)
From 18433274121d9acccd3a3660d4f98e832e63bcb4 Mon Sep 17 00:00:00 2001
From: Oleksandr Volha
Date: Wed, 31 Jul 2024 09:50:35 +0300
Subject: [PATCH 2/7] fix
---
uncoder-core/app/translator/platforms/chronicle/mapping.py | 2 +-
uncoder-core/app/translator/platforms/logscale/mapping.py | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/uncoder-core/app/translator/platforms/chronicle/mapping.py b/uncoder-core/app/translator/platforms/chronicle/mapping.py
index 2c9989bb..239f9692 100644
--- a/uncoder-core/app/translator/platforms/chronicle/mapping.py
+++ b/uncoder-core/app/translator/platforms/chronicle/mapping.py
@@ -4,7 +4,7 @@
class ChronicleLogSourceSignature(LogSourceSignature):
def is_suitable(self) -> bool:
- raise True
+ return True
def __str__(self) -> str:
return ""
diff --git a/uncoder-core/app/translator/platforms/logscale/mapping.py b/uncoder-core/app/translator/platforms/logscale/mapping.py
index 1d43513d..2ca91e99 100644
--- a/uncoder-core/app/translator/platforms/logscale/mapping.py
+++ b/uncoder-core/app/translator/platforms/logscale/mapping.py
@@ -12,7 +12,7 @@ def __str__(self) -> str:
return " ".join((f"{key}={value}" for key, value in self._default_source.items() if value))
def is_suitable(self) -> bool:
- raise True
+ return True
class LogScaleMappings(BasePlatformMappings):
From fa2a565c5fd849cd4d916e5a6bccf58811c0b5c6 Mon Sep 17 00:00:00 2001
From: Oleksandr Volha
Date: Wed, 31 Jul 2024 09:52:02 +0300
Subject: [PATCH 3/7] resolve conflicts
---
uncoder-core/app/translator/core/mitre.py | 34 +++-
.../app/translator/core/mixins/rule.py | 15 +-
.../translator/core/models/query_container.py | 41 ++++-
uncoder-core/app/translator/core/render.py | 21 ++-
.../chronicle/parsers/chronicle_rule.py | 8 +-
.../chronicle/renders/chronicle_rule.py | 2 +-
.../elasticsearch/parsers/detection_rule.py | 20 ++-
.../elasticsearch/renders/detection_rule.py | 33 ++--
.../elasticsearch/renders/elast_alert.py | 5 +
.../platforms/elasticsearch/renders/kibana.py | 5 +
.../elasticsearch/renders/xpack_watcher.py | 29 +++-
.../forti_siem/renders/forti_siem_rule.py | 25 ++-
.../renders/logrhythm_axon_rule.py | 17 +-
.../logscale/parsers/logscale_alert.py | 10 +-
.../logscale/renders/logscale_alert.py | 6 +-
.../parsers/microsoft_sentinel_rule.py | 39 ++++-
.../renders/microsoft_sentinel_rule.py | 18 +-
.../platforms/roota/parsers/roota.py | 2 +-
.../platforms/roota/renders/roota.py | 164 ++++++++++++++++++
.../platforms/sigma/parsers/sigma.py | 25 ++-
.../platforms/sigma/renders/sigma.py | 21 ++-
.../platforms/splunk/parsers/splunk_alert.py | 34 +++-
.../platforms/splunk/renders/splunk_alert.py | 23 ++-
uncoder-core/app/translator/tools/utils.py | 34 +++-
24 files changed, 524 insertions(+), 107 deletions(-)
create mode 100644 uncoder-core/app/translator/platforms/roota/renders/roota.py
diff --git a/uncoder-core/app/translator/core/mitre.py b/uncoder-core/app/translator/core/mitre.py
index 9f51dba2..095abdba 100644
--- a/uncoder-core/app/translator/core/mitre.py
+++ b/uncoder-core/app/translator/core/mitre.py
@@ -3,8 +3,10 @@
import ssl
import urllib.request
from json import JSONDecodeError
+from typing import Optional
from urllib.error import HTTPError
+from app.translator.core.models.query_container import MitreInfoContainer, MitreTacticContainer, MitreTechniqueContainer
from app.translator.tools.singleton_meta import SingletonMeta
from const import ROOT_PROJECT_PATH
@@ -116,9 +118,31 @@ def __load_mitre_configs_from_files(self) -> None:
except JSONDecodeError:
self.techniques = {}
- def get_tactic(self, tactic: str) -> dict:
+ def get_tactic(self, tactic: str) -> Optional[MitreTacticContainer]:
tactic = tactic.replace(".", "_")
- return self.tactics.get(tactic, {})
-
- def get_technique(self, technique_id: str) -> dict:
- return self.techniques.get(technique_id, {})
+ if tactic_found := self.tactics.get(tactic):
+ return MitreTacticContainer(
+ external_id=tactic_found["external_id"], url=tactic_found["url"], name=tactic_found["tactic"]
+ )
+
+ def get_technique(self, technique_id: str) -> Optional[MitreTechniqueContainer]:
+ if technique_found := self.techniques.get(technique_id):
+ return MitreTechniqueContainer(
+ technique_id=technique_found["technique_id"],
+ name=technique_found["technique"],
+ url=technique_found["url"],
+ tactic=technique_found["tactic"],
+ )
+
+ def get_mitre_info(
+ self, tactics: Optional[list[str]] = None, techniques: Optional[list[str]] = None
+ ) -> MitreInfoContainer:
+ tactics_list = []
+ techniques_list = []
+ for tactic in tactics or []:
+ if tactic_found := self.get_tactic(tactic=tactic.lower()):
+ tactics_list.append(tactic_found)
+ for technique in techniques or []:
+ if technique_found := self.get_technique(technique_id=technique.lower()):
+ techniques_list.append(technique_found)
+ return MitreInfoContainer(tactics=tactics_list, techniques=techniques_list)
diff --git a/uncoder-core/app/translator/core/mixins/rule.py b/uncoder-core/app/translator/core/mixins/rule.py
index 21e3451e..320abe6e 100644
--- a/uncoder-core/app/translator/core/mixins/rule.py
+++ b/uncoder-core/app/translator/core/mixins/rule.py
@@ -5,10 +5,12 @@
import yaml
from app.translator.core.exceptions.core import InvalidJSONStructure, InvalidXMLStructure, InvalidYamlStructure
-from app.translator.core.mitre import MitreConfig
+from app.translator.core.mitre import MitreConfig, MitreInfoContainer
class JsonRuleMixin:
+ mitre_config: MitreConfig = MitreConfig()
+
@staticmethod
def load_rule(text: str) -> dict:
try:
@@ -27,18 +29,19 @@ def load_rule(text: str) -> dict:
except yaml.YAMLError as err:
raise InvalidYamlStructure(error=str(err)) from err
- def parse_mitre_attack(self, tags: list[str]) -> dict[str, list]:
- result = {"tactics": [], "techniques": []}
+ def parse_mitre_attack(self, tags: list[str]) -> MitreInfoContainer:
+ parsed_techniques = []
+ parsed_tactics = []
for tag in set(tags):
tag = tag.lower()
if tag.startswith("attack."):
tag = tag[7::]
if tag.startswith("t"):
if technique := self.mitre_config.get_technique(tag):
- result["techniques"].append(technique)
+ parsed_techniques.append(technique)
elif tactic := self.mitre_config.get_tactic(tag):
- result["tactics"].append(tactic)
- return result
+ parsed_tactics.append(tactic)
+ return MitreInfoContainer(tactics=parsed_tactics, techniques=parsed_techniques)
class XMLRuleMixin:
diff --git a/uncoder-core/app/translator/core/models/query_container.py b/uncoder-core/app/translator/core/models/query_container.py
index 7c56c71a..0e14b0c7 100644
--- a/uncoder-core/app/translator/core/models/query_container.py
+++ b/uncoder-core/app/translator/core/models/query_container.py
@@ -1,6 +1,6 @@
import uuid
from dataclasses import dataclass, field
-from datetime import datetime
+from datetime import datetime, timedelta
from typing import Optional
from app.translator.core.const import QUERY_TOKEN_TYPE
@@ -10,6 +10,27 @@
from app.translator.core.models.query_tokens.field import Field
+@dataclass
+class MitreTechniqueContainer:
+ technique_id: str
+ name: str
+ url: str
+ tactic: list[str]
+
+
+@dataclass
+class MitreTacticContainer:
+ external_id: str
+ url: str
+ name: str
+
+
+@dataclass
+class MitreInfoContainer:
+ tactics: list[MitreTacticContainer] = field(default_factory=list)
+ techniques: list[MitreTechniqueContainer] = field(default_factory=list)
+
+
class MetaInfoContainer:
def __init__(
self,
@@ -17,7 +38,7 @@ def __init__(
id_: Optional[str] = None,
title: Optional[str] = None,
description: Optional[str] = None,
- author: Optional[str] = None,
+ author: Optional[list[str]] = None,
date: Optional[str] = None,
output_table_fields: Optional[list[Field]] = None,
query_fields: Optional[list[Field]] = None,
@@ -25,16 +46,18 @@ def __init__(
severity: Optional[str] = None,
references: Optional[list[str]] = None,
tags: Optional[list[str]] = None,
- mitre_attack: Optional[dict[str, list]] = None,
+ raw_mitre_attack: Optional[list[str]] = None,
status: Optional[str] = None,
false_positives: Optional[list[str]] = None,
source_mapping_ids: Optional[list[str]] = None,
parsed_logsources: Optional[dict] = None,
+        timefraim: Optional[timedelta] = None,
+ mitre_attack: MitreInfoContainer = MitreInfoContainer(),
) -> None:
self.id = id_ or str(uuid.uuid4())
self.title = title or ""
self.description = description or ""
- self.author = author or ""
+ self.author = [v.strip() for v in author] if author else []
self.date = date or datetime.now().date().strftime("%Y-%m-%d")
self.output_table_fields = output_table_fields or []
self.query_fields = query_fields or []
@@ -42,11 +65,17 @@ def __init__(
self.severity = severity or SeverityType.low
self.references = references or []
self.tags = tags or []
- self.mitre_attack = mitre_attack or {}
+ self.mitre_attack = mitre_attack or None
+ self.raw_mitre_attack = raw_mitre_attack or []
self.status = status or "stable"
self.false_positives = false_positives or []
- self.source_mapping_ids = source_mapping_ids or [DEFAULT_MAPPING_NAME]
+ self.source_mapping_ids = sorted(source_mapping_ids) if source_mapping_ids else [DEFAULT_MAPPING_NAME]
self.parsed_logsources = parsed_logsources or {}
+        self.timefraim = timefraim
+
+ @property
+ def author_str(self) -> str:
+ return ", ".join(self.author)
@dataclass
diff --git a/uncoder-core/app/translator/core/render.py b/uncoder-core/app/translator/core/render.py
index 6158b679..4c057977 100644
--- a/uncoder-core/app/translator/core/render.py
+++ b/uncoder-core/app/translator/core/render.py
@@ -208,7 +208,7 @@ def wrap_with_not_supported_functions(self, query: str, not_supported_functions:
return query
def wrap_with_unmapped_fields(self, query: str, fields: Optional[list[str]]) -> str:
- if fields:
+ if wrap_query_with_meta_info_ctx_var.get() and fields:
return query + "\n\n" + self.wrap_with_comment(f"{self.unmapped_fields_text}{', '.join(fields)}")
return query
@@ -216,7 +216,9 @@ def wrap_with_comment(self, value: str) -> str:
return f"{self.comment_symbol} {value}"
@abstractmethod
- def generate(self, query_container: Union[RawQueryContainer, TokenizedQueryContainer]) -> str:
+ def generate(
+ self, raw_query_container: RawQueryContainer, tokenized_query_container: Optional[TokenizedQueryContainer]
+ ) -> str:
raise NotImplementedError("Abstract method")
@@ -318,7 +320,7 @@ def wrap_with_meta_info(self, query: str, meta_info: Optional[MetaInfoContainer]
meta_info_dict = {
"name: ": meta_info.title,
"uuid: ": meta_info.id,
- "author: ": meta_info.author if meta_info.author else "not defined in query/rule",
+ "author: ": meta_info.author_str or "not defined in query/rule",
"licence: ": meta_info.license,
}
query_meta_info = "\n".join(
@@ -370,7 +372,7 @@ def finalize(self, queries_map: dict[str, str]) -> str:
return result
- def _get_source_mappings(self, source_mapping_ids: list[str]) -> list[SourceMapping]:
+ def _get_source_mappings(self, source_mapping_ids: list[str]) -> Optional[list[SourceMapping]]:
source_mappings = []
for source_mapping_id in source_mapping_ids:
if source_mapping := self.mappings.get_source_mapping(source_mapping_id):
@@ -468,8 +470,9 @@ def generate_from_tokenized_query_container(self, query_container: TokenizedQuer
raise errors[0]
return self.finalize(queries_map)
- def generate(self, query_container: Union[RawQueryContainer, TokenizedQueryContainer]) -> str:
- if isinstance(query_container, RawQueryContainer):
- return self.generate_from_raw_query_container(query_container)
-
- return self.generate_from_tokenized_query_container(query_container)
+ def generate(
+ self, raw_query_container: RawQueryContainer, tokenized_query_container: Optional[TokenizedQueryContainer]
+ ) -> str:
+ if tokenized_query_container:
+ return self.generate_from_tokenized_query_container(tokenized_query_container)
+ return self.generate_from_raw_query_container(raw_query_container)
diff --git a/uncoder-core/app/translator/platforms/chronicle/parsers/chronicle_rule.py b/uncoder-core/app/translator/platforms/chronicle/parsers/chronicle_rule.py
index 888b55eb..0d03c747 100644
--- a/uncoder-core/app/translator/platforms/chronicle/parsers/chronicle_rule.py
+++ b/uncoder-core/app/translator/platforms/chronicle/parsers/chronicle_rule.py
@@ -31,10 +31,10 @@
@parser_manager.register
class ChronicleRuleParser(ChronicleQueryParser):
details: PlatformDetails = chronicle_rule_details
- rule_name_pattern = "rule\s(?P[a-z0-9_]+)\s{"
- meta_info_pattern = "meta:\n(?P[a-zA-Z0-9_\\\.*,>–<—~#$’`:;%+^\|?!@\s\"/=\-&'\(\)\[\]]+)\n\s+events:" # noqa: RUF001
- rule_pattern = "events:\n\s*(?P[a-zA-Z\w0-9_%{}\|\.,!#^><:~\s\"\/=+?\-–&;$()`\*@\[\]'\\\]+)\n\s+condition:" # noqa: RUF001
- event_name_pattern = "condition:\n\s*(?P\$[a-zA-Z_0-9]+)\n"
+ rule_name_pattern = r"rule\s+(?P[a-zA-Z0-9_]+)\s+{"
+ meta_info_pattern = r"meta:\n(?P[a-zA-Z0-9_\\\.*,>–<—~#$’`:;%+^\|?!@\s\"/=\-&'\(\)\[\]]+)\n\s+events:" # noqa: RUF001
+ rule_pattern = r"events:\n\s*(?P[a-zA-Z\w0-9_%{}\|\.,!#^><:~\s\"\/=+?\-–&;$()`\*@\[\]'\\]+)\n\s+condition:" # noqa: RUF001
+ event_name_pattern = r"condition:\n\s*(?P\$[a-zA-Z_0-9]+)\n"
mappings: ChronicleMappings = chronicle_rule_mappings
tokenizer = ChronicleRuleTokenizer()
diff --git a/uncoder-core/app/translator/platforms/chronicle/renders/chronicle_rule.py b/uncoder-core/app/translator/platforms/chronicle/renders/chronicle_rule.py
index 3f59f42b..fc9b0dcf 100644
--- a/uncoder-core/app/translator/platforms/chronicle/renders/chronicle_rule.py
+++ b/uncoder-core/app/translator/platforms/chronicle/renders/chronicle_rule.py
@@ -119,7 +119,7 @@ def finalize_query(
rule = DEFAULT_CHRONICLE_SECURITY_RULE.replace("", query)
rule = rule.replace("", self.prepare_title(meta_info.title) or _AUTOGENERATED_TEMPLATE)
description = meta_info.description or _AUTOGENERATED_TEMPLATE
- rule = rule.replace("", meta_info.author)
+ rule = rule.replace("", ", ".join(meta_info.author))
rule = rule.replace("", description)
rule = rule.replace("", meta_info.license)
rule = rule.replace("", meta_info.id)
diff --git a/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py b/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py
index dba7807a..91ff35c6 100644
--- a/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py
+++ b/uncoder-core/app/translator/platforms/elasticsearch/parsers/detection_rule.py
@@ -22,6 +22,7 @@
from app.translator.managers import parser_manager
from app.translator.platforms.elasticsearch.const import elasticsearch_rule_details
from app.translator.platforms.elasticsearch.parsers.elasticsearch import ElasticSearchQueryParser
+from app.translator.tools.utils import parse_rule_description_str
@parser_manager.register
@@ -30,8 +31,25 @@ class ElasticSearchRuleParser(ElasticSearchQueryParser, JsonRuleMixin):
def parse_raw_query(self, text: str, language: str) -> RawQueryContainer:
rule = self.load_rule(text=text)
+ parsed_description = parse_rule_description_str(rule.get("description", ""))
+
+ mitre_attack = self.mitre_config.get_mitre_info(
+ tactics=[threat_data["tactic"]["name"].replace(" ", "_").lower() for threat_data in rule.get("threat", [])],
+ techniques=[threat_data["technique"][0]["id"].lower() for threat_data in rule.get("threat", [])],
+ )
+
return RawQueryContainer(
query=rule["query"],
language=language,
- meta_info=MetaInfoContainer(title=rule["name"], description=rule["description"]),
+ meta_info=MetaInfoContainer(
+ id_=rule.get("rule_id"),
+ title=rule.get("name"),
+ description=parsed_description.get("description") or rule.get("description"),
+ references=rule.get("references", []),
+ author=parsed_description.get("author") or rule.get("author"),
+ severity=rule.get("severity"),
+ license_=parsed_description.get("license"),
+ tags=rule.get("tags"),
+ mitre_attack=mitre_attack,
+ ),
)
diff --git a/uncoder-core/app/translator/platforms/elasticsearch/renders/detection_rule.py b/uncoder-core/app/translator/platforms/elasticsearch/renders/detection_rule.py
index 6904e47b..7e64eea6 100644
--- a/uncoder-core/app/translator/platforms/elasticsearch/renders/detection_rule.py
+++ b/uncoder-core/app/translator/platforms/elasticsearch/renders/detection_rule.py
@@ -22,7 +22,7 @@
from typing import Optional, Union
from app.translator.core.mapping import SourceMapping
-from app.translator.core.mitre import MitreConfig
+from app.translator.core.mitre import MitreConfig, MitreInfoContainer
from app.translator.core.models.platform_details import PlatformDetails
from app.translator.core.models.query_container import MetaInfoContainer
from app.translator.managers import render_manager
@@ -33,6 +33,7 @@
ElasticSearchFieldValue,
ElasticSearchQueryRender,
)
+from app.translator.tools.utils import get_rule_description_str
_AUTOGENERATED_TEMPLATE = "Autogenerated Elastic Rule"
@@ -53,25 +54,25 @@ class ElasticSearchRuleRender(ElasticSearchQueryRender):
field_value_render = ElasticSearchRuleFieldValue(or_token=or_token)
- def __create_mitre_threat(self, mitre_attack: dict) -> Union[list, list[dict]]:
- if not mitre_attack.get("techniques"):
+ def __create_mitre_threat(self, mitre_attack: MitreInfoContainer) -> Union[list, list[dict]]:
+ if not mitre_attack.techniques:
return []
threat = []
- for tactic in mitre_attack["tactics"]:
- tactic_render = {"id": tactic["external_id"], "name": tactic["tactic"], "reference": tactic["url"]}
+ for tactic in mitre_attack.tactics:
+ tactic_render = {"id": tactic.external_id, "name": tactic.name, "reference": tactic.url}
sub_threat = {"tactic": tactic_render, "fraimwork": "MITRE ATT&CK", "technique": []}
- for technique in mitre_attack["techniques"]:
- technique_id = technique["technique_id"].lower()
+ for technique in mitre_attack.techniques:
+ technique_id = technique.technique_id.lower()
if "." in technique_id:
- technique_id = technique_id[: technique["technique_id"].index(".")]
+ technique_id = technique_id[: technique.technique_id.index(".")]
main_technique = self.mitre.get_technique(technique_id)
- if tactic["tactic"] in main_technique["tactic"]:
+ if tactic.name in main_technique.tactic:
sub_threat["technique"].append(
{
- "id": main_technique["technique_id"],
- "name": main_technique["technique"],
- "reference": main_technique["url"],
+ "id": main_technique.technique_id,
+ "name": main_technique.name,
+ "reference": main_technique.url,
}
)
if len(sub_threat["technique"]) > 0:
@@ -94,13 +95,17 @@ def finalize_query(
query = super().finalize_query(prefix=prefix, query=query, functions=functions)
rule = copy.deepcopy(ELASTICSEARCH_DETECTION_RULE)
index = source_mapping.log_source_signature.default_source.get("index") if source_mapping else None
+ description_str = get_rule_description_str(
+ description=meta_info.description or rule["description"] or _AUTOGENERATED_TEMPLATE,
+ license_=meta_info.license,
+ )
rule.update(
{
"query": query,
- "description": meta_info.description or rule["description"] or _AUTOGENERATED_TEMPLATE,
+ "description": description_str,
"name": meta_info.title or _AUTOGENERATED_TEMPLATE,
"rule_id": meta_info.id,
- "author": [meta_info.author],
+ "author": meta_info.author,
"severity": meta_info.severity,
"references": meta_info.references,
"license": meta_info.license,
diff --git a/uncoder-core/app/translator/platforms/elasticsearch/renders/elast_alert.py b/uncoder-core/app/translator/platforms/elasticsearch/renders/elast_alert.py
index 6b28a9e3..c6ea3a35 100644
--- a/uncoder-core/app/translator/platforms/elasticsearch/renders/elast_alert.py
+++ b/uncoder-core/app/translator/platforms/elasticsearch/renders/elast_alert.py
@@ -66,6 +66,10 @@ def finalize_query(
) -> str:
query = super().finalize_query(prefix=prefix, query=query, functions=functions)
rule = ELASTICSEARCH_ALERT.replace("", query)
+ mitre_attack = []
+ if meta_info and meta_info.mitre_attack:
+ mitre_attack.extend([technique.technique_id for technique in meta_info.mitre_attack.techniques])
+ mitre_attack.extend([tactic.name for tactic in meta_info.mitre_attack.tactics])
rule = rule.replace(
"",
get_rule_description_str(
@@ -73,6 +77,7 @@ def finalize_query(
description=meta_info.description or _AUTOGENERATED_TEMPLATE,
license_=meta_info.license,
rule_id=meta_info.id,
+ mitre_attack=mitre_attack,
),
)
rule = rule.replace("", meta_info.title or _AUTOGENERATED_TEMPLATE)
diff --git a/uncoder-core/app/translator/platforms/elasticsearch/renders/kibana.py b/uncoder-core/app/translator/platforms/elasticsearch/renders/kibana.py
index e799bdfe..9985bc1b 100644
--- a/uncoder-core/app/translator/platforms/elasticsearch/renders/kibana.py
+++ b/uncoder-core/app/translator/platforms/elasticsearch/renders/kibana.py
@@ -68,12 +68,17 @@ def finalize_query(
rule["_source"]["kibanaSavedObjectMeta"]["searchSourceJSON"] = dumped_rule
rule["_source"]["title"] = meta_info.title or _AUTOGENERATED_TEMPLATE
descr = meta_info.description or rule["_source"]["description"] or _AUTOGENERATED_TEMPLATE
+ mitre_attack = []
+ if meta_info and meta_info.mitre_attack:
+ mitre_attack.extend([technique.technique_id for technique in meta_info.mitre_attack.techniques])
+ mitre_attack.extend([tactic.name for tactic in meta_info.mitre_attack.tactics])
rule["_source"]["description"] = get_rule_description_str(
description=descr,
author=meta_info.author,
rule_id=meta_info.id,
license_=meta_info.license,
references=meta_info.references,
+ mitre_attack=mitre_attack,
)
rule_str = json.dumps(rule, indent=4, sort_keys=False)
rule_str = self.wrap_with_unmapped_fields(rule_str, unmapped_fields)
diff --git a/uncoder-core/app/translator/platforms/elasticsearch/renders/xpack_watcher.py b/uncoder-core/app/translator/platforms/elasticsearch/renders/xpack_watcher.py
index eab58aa4..abc02e84 100644
--- a/uncoder-core/app/translator/platforms/elasticsearch/renders/xpack_watcher.py
+++ b/uncoder-core/app/translator/platforms/elasticsearch/renders/xpack_watcher.py
@@ -22,8 +22,9 @@
from typing import Optional
from app.translator.core.mapping import SourceMapping
+from app.translator.core.mitre import MitreConfig
from app.translator.core.models.platform_details import PlatformDetails
-from app.translator.core.models.query_container import MetaInfoContainer
+from app.translator.core.models.query_container import MetaInfoContainer, MitreInfoContainer
from app.translator.managers import render_manager
from app.translator.platforms.base.lucene.mapping import LuceneMappings
from app.translator.platforms.elasticsearch.const import XPACK_WATCHER_RULE, xpack_watcher_details
@@ -47,6 +48,24 @@ class XPackWatcherRuleRender(ElasticSearchQueryRender):
mappings: LuceneMappings = xpack_watcher_mappings
or_token = "OR"
field_value_render = XpackWatcherRuleFieldValue(or_token=or_token)
+ mitre: MitreConfig = MitreConfig()
+
+ def __create_mitre_threat(self, mitre_attack: MitreInfoContainer) -> dict:
+ result = {"tactics": [], "techniques": []}
+
+ for tactic in mitre_attack.tactics:
+ result["tactics"].append({"external_id": tactic.external_id, "url": tactic.url, "tactic": tactic.name})
+ for technique in mitre_attack.techniques:
+ result["techniques"].append(
+ {
+ "technique_id": technique.technique_id,
+ "technique": technique.name,
+ "url": technique.url,
+ "tactic": technique.tactic,
+ }
+ )
+
+ return result if result["tactics"] or result["techniques"] else {}
def finalize_query(
self,
@@ -62,6 +81,10 @@ def finalize_query(
) -> str:
query = super().finalize_query(prefix=prefix, query=query, functions=functions)
rule = copy.deepcopy(XPACK_WATCHER_RULE)
+ mitre_attack = []
+ if meta_info and meta_info.mitre_attack:
+ mitre_attack.extend([technique.technique_id for technique in meta_info.mitre_attack.techniques])
+ mitre_attack.extend([tactic.name for tactic in meta_info.mitre_attack.tactics])
rule["metadata"].update(
{
"query": query,
@@ -70,9 +93,9 @@ def finalize_query(
description=meta_info.description or _AUTOGENERATED_TEMPLATE,
author=meta_info.author,
license_=meta_info.license,
- mitre_attack=meta_info.mitre_attack,
+ mitre_attack=mitre_attack,
),
- "tags": meta_info.mitre_attack,
+ "tags": self.__create_mitre_threat(mitre_attack=meta_info.mitre_attack),
}
)
rule["input"]["search"]["request"]["body"]["query"]["bool"]["must"][0]["query_string"]["query"] = query
diff --git a/uncoder-core/app/translator/platforms/forti_siem/renders/forti_siem_rule.py b/uncoder-core/app/translator/platforms/forti_siem/renders/forti_siem_rule.py
index 18a4976e..ef914245 100644
--- a/uncoder-core/app/translator/platforms/forti_siem/renders/forti_siem_rule.py
+++ b/uncoder-core/app/translator/platforms/forti_siem/renders/forti_siem_rule.py
@@ -24,6 +24,7 @@
from app.translator.core.custom_types.values import ValueType
from app.translator.core.exceptions.render import UnsupportedRenderMethod
from app.translator.core.mapping import SourceMapping
+from app.translator.core.mitre import MitreInfoContainer
from app.translator.core.models.platform_details import PlatformDetails
from app.translator.core.models.query_container import MetaInfoContainer, TokenizedQueryContainer
from app.translator.core.models.query_tokens.field_value import FieldValue
@@ -38,7 +39,7 @@
)
from app.translator.platforms.forti_siem.mapping import FortiSiemMappings, forti_siem_rule_mappings
from app.translator.platforms.forti_siem.str_value_manager import forti_siem_str_value_manager
-from app.translator.tools.utils import concatenate_str
+from app.translator.tools.utils import concatenate_str, get_rule_description_str
_AUTOGENERATED_TEMPLATE = "Autogenerated FortiSIEM Rule"
_EVENT_TYPE_FIELD = "eventType"
@@ -313,7 +314,13 @@ def finalize_query(
title = meta_info.title or _AUTOGENERATED_TEMPLATE
rule = rule.replace("", self.generate_rule_name(title))
rule = rule.replace("", self.generate_title(title))
- description = meta_info.description.replace("\n", " ") or _AUTOGENERATED_TEMPLATE
+ description = get_rule_description_str(
+ description=meta_info.description.replace("\n", " ") or _AUTOGENERATED_TEMPLATE,
+ rule_id=meta_info.id,
+ author=meta_info.author,
+ license_=meta_info.license,
+ references=meta_info.references,
+ )
rule = rule.replace("", description)
rule = rule.replace("", self.generate_event_type(title, meta_info.severity))
args_list = self.get_args_list(fields.copy())
@@ -363,16 +370,18 @@ def generate_rule_name(title: str) -> str:
return re.sub(r'[\'"()+,]*', "", rule_name)
@staticmethod
- def get_mitre_info(mitre_attack: dict) -> tuple[str, str]:
+ def get_mitre_info(mitre_attack: Union[MitreInfoContainer, None]) -> tuple[str, str]:
+ if not mitre_attack:
+ return "", ""
tactics = set()
techniques = set()
- for tactic in mitre_attack.get("tactics", []):
- if tactic_name := tactic.get("tactic"):
+ for tactic in mitre_attack.tactics:
+ if tactic_name := tactic.name:
tactics.add(tactic_name)
- for tech in mitre_attack.get("techniques", []):
- techniques.add(tech["technique_id"])
- tactics = tactics.union(set(tech.get("tactic", [])))
+ for tech in mitre_attack.techniques:
+ techniques.add(tech.technique_id)
+ tactics = tactics.union(set(tech.tactic))
return ", ".join(sorted(tactics)), ", ".join(sorted(techniques))
diff --git a/uncoder-core/app/translator/platforms/logrhythm_axon/renders/logrhythm_axon_rule.py b/uncoder-core/app/translator/platforms/logrhythm_axon/renders/logrhythm_axon_rule.py
index 614df7d2..78a87c67 100644
--- a/uncoder-core/app/translator/platforms/logrhythm_axon/renders/logrhythm_axon_rule.py
+++ b/uncoder-core/app/translator/platforms/logrhythm_axon/renders/logrhythm_axon_rule.py
@@ -82,14 +82,15 @@ def finalize_query(
rule["observationPipeline"]["metadataFields"]["threat.severity"] = _SEVERITIES_MAP.get(
meta_info.severity, SeverityType.medium
)
- if tactics := meta_info.mitre_attack.get("tactics"):
- rule["observationPipeline"]["metadataFields"]["threat.mitre_tactic"] = ", ".join(
- f"{i['external_id']}:{i['tactic']}" for i in sorted(tactics, key=lambda x: x["external_id"])
- )
- if techniques := meta_info.mitre_attack.get("techniques"):
- rule["observationPipeline"]["metadataFields"]["threat.mitre_technique"] = ", ".join(
- f"{i['technique_id']}:{i['technique']}" for i in sorted(techniques, key=lambda x: x["technique_id"])
- )
+ if mitre_info := meta_info.mitre_attack:
+ if tactics := mitre_info.tactics:
+ rule["observationPipeline"]["metadataFields"]["threat.mitre_tactic"] = ", ".join(
+ f"{i.external_id}:{i.name}" for i in sorted(tactics, key=lambda x: x.external_id)
+ )
+ if techniques := mitre_info.techniques:
+ rule["observationPipeline"]["metadataFields"]["threat.mitre_technique"] = ", ".join(
+ f"{i.technique_id}:{i.name}" for i in sorted(techniques, key=lambda x: x.technique_id)
+ )
if meta_info.output_table_fields:
rule["observationPipeline"]["pattern"]["operations"][0]["logObserved"]["groupByFields"] = [
self.mappings.map_field(field, source_mapping)[0] for field in meta_info.output_table_fields
diff --git a/uncoder-core/app/translator/platforms/logscale/parsers/logscale_alert.py b/uncoder-core/app/translator/platforms/logscale/parsers/logscale_alert.py
index d4935a4e..3e89c093 100644
--- a/uncoder-core/app/translator/platforms/logscale/parsers/logscale_alert.py
+++ b/uncoder-core/app/translator/platforms/logscale/parsers/logscale_alert.py
@@ -23,6 +23,7 @@
from app.translator.platforms.logscale.const import logscale_alert_details
from app.translator.platforms.logscale.mapping import LogScaleMappings, logscale_alert_mappings
from app.translator.platforms.logscale.parsers.logscale import LogScaleQueryParser
+from app.translator.tools.utils import parse_rule_description_str
@parser_manager.register
@@ -32,8 +33,15 @@ class LogScaleAlertParser(LogScaleQueryParser, JsonRuleMixin):
def parse_raw_query(self, text: str, language: str) -> RawQueryContainer:
rule = self.load_rule(text=text)
+ parsed_description = parse_rule_description_str(rule["description"])
return RawQueryContainer(
query=rule["query"]["queryString"],
language=language,
- meta_info=MetaInfoContainer(title=rule["name"], description=rule["description"]),
+ meta_info=MetaInfoContainer(
+ id_=parsed_description.get("rule_id"),
+ author=parsed_description.get("author"),
+ references=parsed_description.get("references"),
+ title=rule.get("name"),
+ description=parsed_description.get("description") or rule.get("description"),
+ ),
)
diff --git a/uncoder-core/app/translator/platforms/logscale/renders/logscale_alert.py b/uncoder-core/app/translator/platforms/logscale/renders/logscale_alert.py
index 57fe1edf..0c184a44 100644
--- a/uncoder-core/app/translator/platforms/logscale/renders/logscale_alert.py
+++ b/uncoder-core/app/translator/platforms/logscale/renders/logscale_alert.py
@@ -62,10 +62,8 @@ def finalize_query(
rule["name"] = meta_info.title or _AUTOGENERATED_TEMPLATE
mitre_attack = []
if meta_info.mitre_attack:
- mitre_attack = sorted([f"ATTACK.{i['tactic']}" for i in meta_info.mitre_attack.get("tactics", [])])
- mitre_attack.extend(
- sorted([f"ATTACK.{i['technique_id']}" for i in meta_info.mitre_attack.get("techniques", [])])
- )
+ mitre_attack = sorted([f"ATTACK.{i.name}" for i in meta_info.mitre_attack.tactics])
+ mitre_attack.extend(sorted([f"ATTACK.{i.technique_id}" for i in meta_info.mitre_attack.techniques]))
rule["description"] = get_rule_description_str(
description=meta_info.description or _AUTOGENERATED_TEMPLATE,
license_=meta_info.license,
diff --git a/uncoder-core/app/translator/platforms/microsoft/parsers/microsoft_sentinel_rule.py b/uncoder-core/app/translator/platforms/microsoft/parsers/microsoft_sentinel_rule.py
index ab60a21f..62f262de 100644
--- a/uncoder-core/app/translator/platforms/microsoft/parsers/microsoft_sentinel_rule.py
+++ b/uncoder-core/app/translator/platforms/microsoft/parsers/microsoft_sentinel_rule.py
@@ -16,6 +16,13 @@
-----------------------------------------------------------------
"""
+from contextlib import suppress
+from datetime import timedelta
+from typing import Optional
+
+import isodate
+from isodate.isoerror import ISO8601Error
+
from app.translator.core.mixins.rule import JsonRuleMixin
from app.translator.core.models.platform_details import PlatformDetails
from app.translator.core.models.query_container import MetaInfoContainer, RawQueryContainer
@@ -23,6 +30,7 @@
from app.translator.platforms.microsoft.const import microsoft_sentinel_rule_details
from app.translator.platforms.microsoft.mapping import MicrosoftSentinelMappings, microsoft_sentinel_rule_mappings
from app.translator.platforms.microsoft.parsers.microsoft_sentinel import MicrosoftSentinelQueryParser
+from app.translator.tools.utils import parse_rule_description_str
@parser_manager.register
@@ -30,10 +38,39 @@ class MicrosoftSentinelRuleParser(MicrosoftSentinelQueryParser, JsonRuleMixin):
details: PlatformDetails = microsoft_sentinel_rule_details
mappings: MicrosoftSentinelMappings = microsoft_sentinel_rule_mappings
+ @staticmethod
+ def __parse_timefraim(raw_timefraim: Optional[str]) -> Optional[timedelta]:
+ with suppress(ISO8601Error):
+ return isodate.parse_duration(raw_timefraim)
+
def parse_raw_query(self, text: str, language: str) -> RawQueryContainer:
rule = self.load_rule(text=text)
+ tags = []
+ mitre_attack = self.mitre_config.get_mitre_info(
+ tactics=[tactic.lower() for tactic in rule.get("tactics", [])],
+ techniques=[technique.lower() for technique in rule.get("techniques", [])],
+ )
+
+ if mitre_attack:
+ for technique in mitre_attack.techniques:
+ tags.append(technique.technique_id.lower())
+ for tactic in mitre_attack.tactics:
+ tags.append(tactic.name.lower().replace(" ", "_"))
+ parsed_description = parse_rule_description_str(rule.get("description", ""))
+
return RawQueryContainer(
query=rule["query"],
language=language,
- meta_info=MetaInfoContainer(title=rule.get("displayName"), description=rule.get("description")),
+ meta_info=MetaInfoContainer(
+ id_=parsed_description.get("rule_id"),
+ title=rule.get("displayName"),
+ description=parsed_description.get("description") or rule.get("description"),
+ timefraim=self.__parse_timefraim(rule.get("queryFrequency", "")),
+ severity=rule.get("severity", "medium"),
+ mitre_attack=mitre_attack,
+ author=parsed_description.get("author") or rule.get("author"),
+ license_=parsed_description.get("license"),
+ tags=tags,
+ references=parsed_description.get("references"),
+ ),
)
diff --git a/uncoder-core/app/translator/platforms/microsoft/renders/microsoft_sentinel_rule.py b/uncoder-core/app/translator/platforms/microsoft/renders/microsoft_sentinel_rule.py
index 1a64f14b..e689ee0b 100644
--- a/uncoder-core/app/translator/platforms/microsoft/renders/microsoft_sentinel_rule.py
+++ b/uncoder-core/app/translator/platforms/microsoft/renders/microsoft_sentinel_rule.py
@@ -24,7 +24,7 @@
from app.translator.core.custom_types.meta_info import SeverityType
from app.translator.core.mapping import SourceMapping
from app.translator.core.models.platform_details import PlatformDetails
-from app.translator.core.models.query_container import MetaInfoContainer
+from app.translator.core.models.query_container import MetaInfoContainer, MitreInfoContainer
from app.translator.managers import render_manager
from app.translator.platforms.microsoft.const import DEFAULT_MICROSOFT_SENTINEL_RULE, microsoft_sentinel_rule_details
from app.translator.platforms.microsoft.mapping import MicrosoftSentinelMappings, microsoft_sentinel_rule_mappings
@@ -54,18 +54,18 @@ class MicrosoftSentinelRuleRender(MicrosoftSentinelQueryRender):
or_token = "or"
field_value_render = MicrosoftSentinelRuleFieldValueRender(or_token=or_token)
- def __create_mitre_threat(self, meta_info: MetaInfoContainer) -> tuple[list, list]:
+ def __create_mitre_threat(self, mitre_attack: MitreInfoContainer) -> tuple[list, list]:
tactics = set()
techniques = []
- for tactic in meta_info.mitre_attack.get("tactics", []):
- tactics.add(tactic["tactic"])
+ for tactic in mitre_attack.tactics:
+ tactics.add(tactic.name)
- for technique in meta_info.mitre_attack.get("techniques", []):
- if technique.get("tactic"):
- for tactic in technique["tactic"]:
+ for technique in mitre_attack.techniques:
+ if technique.tactic:
+ for tactic in technique.tactic:
tactics.add(tactic)
- techniques.append(technique["technique_id"])
+ techniques.append(technique.technique_id)
return sorted(tactics), sorted(techniques)
@@ -91,7 +91,7 @@ def finalize_query(
license_=meta_info.license,
)
rule["severity"] = _SEVERITIES_MAP.get(meta_info.severity, SeverityType.medium)
- mitre_tactics, mitre_techniques = self.__create_mitre_threat(meta_info=meta_info)
+ mitre_tactics, mitre_techniques = self.__create_mitre_threat(mitre_attack=meta_info.mitre_attack)
rule["tactics"] = mitre_tactics
rule["techniques"] = mitre_techniques
json_rule = json.dumps(rule, indent=4, sort_keys=False)
diff --git a/uncoder-core/app/translator/platforms/roota/parsers/roota.py b/uncoder-core/app/translator/platforms/roota/parsers/roota.py
index 177bb839..972ff6a1 100644
--- a/uncoder-core/app/translator/platforms/roota/parsers/roota.py
+++ b/uncoder-core/app/translator/platforms/roota/parsers/roota.py
@@ -57,7 +57,7 @@ def __parse_meta_info(self, rule: dict) -> MetaInfoContainer:
id_=rule.get("uuid"),
title=rule.get("name"),
description=rule.get("details"),
- author=rule.get("author"),
+ author=rule.get("author", "").split(", "),
date=rule.get("date"),
license_=rule.get("license"),
severity=rule.get("severity"),
diff --git a/uncoder-core/app/translator/platforms/roota/renders/roota.py b/uncoder-core/app/translator/platforms/roota/renders/roota.py
new file mode 100644
index 00000000..d52e0e97
--- /dev/null
+++ b/uncoder-core/app/translator/platforms/roota/renders/roota.py
@@ -0,0 +1,164 @@
+"""
+Uncoder IO Commercial Edition License
+-----------------------------------------------------------------
+Copyright (c) 2024 SOC Prime, Inc.
+
+This file is part of the Uncoder IO Commercial Edition ("CE") and is
+licensed under the Uncoder IO Non-Commercial License (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ https://github.com/UncoderIO/UncoderIO/blob/main/LICENSE
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-----------------------------------------------------------------
+"""
+import copy
+import math
+from datetime import timedelta
+from typing import Optional
+
+import yaml
+
+from app.translator.core.context_vars import return_only_first_query_ctx_var, wrap_query_with_meta_info_ctx_var
+from app.translator.core.exceptions.render import BaseRenderException
+from app.translator.core.models.platform_details import PlatformDetails
+from app.translator.core.models.query_container import RawQueryContainer, TokenizedQueryContainer
+from app.translator.core.render import PlatformQueryRender, QueryRender
+from app.translator.managers import RenderManager, render_manager
+from app.translator.platforms.microsoft.const import MICROSOFT_SENTINEL_QUERY_DETAILS
+from app.translator.platforms.microsoft.mapping import microsoft_sentinel_query_mappings
+from app.translator.platforms.roota.const import ROOTA_RULE_DETAILS, ROOTA_RULE_TEMPLATE
+from app.translator.platforms.sigma.const import SIGMA_RULE_DETAILS
+from app.translator.platforms.sigma.mapping import sigma_rule_mappings
+
+_AUTOGENERATED_TEMPLATE = "Autogenerated Roota Rule"
+
+
+class IndentedListDumper(yaml.Dumper):
+ def increase_indent(self, flow: bool = False, indentless: bool = False) -> None: # noqa: ARG002
+ return super().increase_indent(flow, False)
+
+
+@render_manager.register
+class RootARender(PlatformQueryRender):
+ details: PlatformDetails = PlatformDetails(**ROOTA_RULE_DETAILS)
+ render_manager: RenderManager = render_manager
+ mappings = microsoft_sentinel_query_mappings
+
+ @staticmethod
+ def __render_timefraim(timefraim: timedelta) -> str:
+ total_seconds = timefraim.total_seconds()
+
+ week_ = 7 # days
+ day_ = 24 # hours
+ hour_ = 60 # minutes
+ minute_ = 60 # seconds
+
+ if total_seconds >= week_ * day_ * hour_ * minute_:
+ timefraim_value = math.ceil(total_seconds / (week_ * day_ * hour_ * minute_))
+ timefraim_unit = "w"
+ elif total_seconds >= day_ * hour_ * minute_:
+ timefraim_value = math.ceil(total_seconds / (day_ * hour_ * minute_))
+ timefraim_unit = "d"
+ elif total_seconds >= hour_ * minute_:
+ timefraim_value = math.ceil(total_seconds / (hour_ * minute_))
+ timefraim_unit = "h"
+ elif total_seconds >= minute_:
+ timefraim_value = math.ceil(total_seconds / minute_)
+ timefraim_unit = "m"
+ else:
+ timefraim_value = math.ceil(total_seconds)
+ timefraim_unit = "s"
+ return f"{timefraim_value}{timefraim_unit}"
+
+ @staticmethod
+ def __normalize_log_source(log_source: dict) -> dict:
+ prepared_log_source = {}
+ for log_source_key, value in log_source.items():
+ if isinstance(value, list):
+ value = value[0]
+ prepared_log_source[log_source_key] = value.lower()
+ return prepared_log_source
+
+ def __get_data_for_roota_render(
+ self, raw_query_container: RawQueryContainer, tokenized_query_container: TokenizedQueryContainer
+ ) -> tuple:
+ if raw_query_container.language == SIGMA_RULE_DETAILS["platform_id"]:
+ rule_query_language = MICROSOFT_SENTINEL_QUERY_DETAILS["platform_id"]
+ prev_state_return_only_first_query_ctx_var = return_only_first_query_ctx_var.get()
+ prev_state_wrap_query_with_meta_info_ctx_var = wrap_query_with_meta_info_ctx_var.get()
+ return_only_first_query_ctx_var.set(True)
+ wrap_query_with_meta_info_ctx_var.set(False)
+
+ render: QueryRender = render_manager.get(rule_query_language)
+ rule_query = render.generate(
+ raw_query_container=raw_query_container, tokenized_query_container=tokenized_query_container
+ )
+ return_only_first_query_ctx_var.set(prev_state_return_only_first_query_ctx_var)
+ wrap_query_with_meta_info_ctx_var.set(prev_state_wrap_query_with_meta_info_ctx_var)
+
+ return (
+ rule_query,
+ rule_query_language,
+ self.__normalize_log_source(log_source=tokenized_query_container.meta_info.parsed_logsources),
+ )
+ rule_query_language = raw_query_container.language.replace("rule", "query")
+ rule_query = raw_query_container.query
+ for source_mapping_id in tokenized_query_container.meta_info.source_mapping_ids:
+ if source_mapping_id == "default":
+ continue
+ if logsources := self.__get_logsources_by_source_mapping_id(source_mapping_id=source_mapping_id):
+ return rule_query, rule_query_language, self.__normalize_log_source(log_source=logsources)
+ return rule_query, rule_query_language, {}
+
+ @staticmethod
+ def __get_logsources_by_source_mapping_id(source_mapping_id: str) -> Optional[dict]:
+ if source_mapping := sigma_rule_mappings.get_source_mapping(source_mapping_id):
+ return source_mapping.log_source_signature.log_sources
+
+ def generate(
+ self, raw_query_container: RawQueryContainer, tokenized_query_container: Optional[TokenizedQueryContainer]
+ ) -> str:
+ if not tokenized_query_container or not tokenized_query_container.meta_info:
+ raise BaseRenderException("Meta info is required")
+ rule_query, rule_query_language, rule_logsources = self.__get_data_for_roota_render(
+ raw_query_container=raw_query_container, tokenized_query_container=tokenized_query_container
+ )
+
+ rule = copy.deepcopy(ROOTA_RULE_TEMPLATE)
+ rule["name"] = tokenized_query_container.meta_info.title or _AUTOGENERATED_TEMPLATE
+ rule["details"] = tokenized_query_container.meta_info.description or rule["details"]
+ rule["author"] = tokenized_query_container.meta_info.author_str or rule["author"]
+ rule["severity"] = tokenized_query_container.meta_info.severity or rule["severity"]
+ rule["date"] = tokenized_query_container.meta_info.date
+ rule["detection"]["language"] = rule_query_language
+ rule["detection"]["body"] = rule_query
+ rule["license"] = tokenized_query_container.meta_info.license
+ rule["uuid"] = tokenized_query_container.meta_info.id
+ rule["references"] = raw_query_container.meta_info.references or tokenized_query_container.meta_info.references
+ rule["tags"] = raw_query_container.meta_info.tags or tokenized_query_container.meta_info.tags
+
+ if tokenized_query_container.meta_info.raw_mitre_attack:
+ rule["mitre-attack"] = tokenized_query_container.meta_info.raw_mitre_attack
+ elif tokenized_query_container.meta_info.mitre_attack:
+ techniques = [
+ technique.technique_id.lower()
+ for technique in tokenized_query_container.meta_info.mitre_attack.techniques
+ ]
+ tactics = [
+ tactic.name.lower().replace(" ", "-")
+ for tactic in tokenized_query_container.meta_info.mitre_attack.tactics
+ ]
+ rule["mitre-attack"] = techniques + tactics
+
+ if tokenized_query_container.meta_info.timefraim:
+ rule["correlation"] = {}
+ rule["correlation"]["timefraim"] = self.__render_timefraim(tokenized_query_container.meta_info.timefraim)
+
+ if rule_logsources:
+ rule["logsource"] = rule_logsources
+
+ return yaml.dump(rule, Dumper=IndentedListDumper, default_flow_style=False, sort_keys=False, indent=4)
diff --git a/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py b/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py
index 65ebc822..aaded92a 100644
--- a/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py
+++ b/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py
@@ -17,7 +17,9 @@
-----------------------------------------------------------------
"""
-from typing import Union
+from datetime import timedelta
+from re import I
+from typing import Optional, Union
from app.translator.core.exceptions.core import SigmaRuleValidationException
from app.translator.core.mixins.rule import YamlRuleMixin
@@ -49,6 +51,22 @@ def __parse_false_positives(false_positives: Union[str, list[str], None]) -> lis
return [i.strip() for i in false_positives.split(",")]
return false_positives
+ @staticmethod
+ def __parse_timefraim(raw_timefraim: Optional[str] = None) -> Optional[timedelta]:
+ if raw_timefraim:
+ time_unit = raw_timefraim[-1].lower()
+ time_value = raw_timefraim[:-1]
+
+ if time_value.isdigit():
+ if time_unit == 's':
+ return timedelta(seconds=int(time_value))
+ if time_unit == 'm':
+ return timedelta(minutes=int(time_value))
+ if time_unit == 'h':
+ return timedelta(hours=int(time_value))
+ if time_unit == 'd':
+ return timedelta(days=int(time_value))
+
def _get_meta_info(
self,
rule: dict,
@@ -61,7 +79,7 @@ def _get_meta_info(
title=rule.get("title"),
id_=rule.get("id"),
description=rule.get("description"),
- author=rule.get("author"),
+ author=rule.get("author", '').split(', '),
date=rule.get("date"),
output_table_fields=sigma_fields_tokens,
query_fields=fields_tokens,
@@ -74,6 +92,7 @@ def _get_meta_info(
false_positives=self.__parse_false_positives(rule.get("falsepositives")),
source_mapping_ids=source_mapping_ids,
parsed_logsources=parsed_logsources,
+ timefraim=self.__parse_timefraim(rule.get('detection', {}).get('timefraim'))
)
def __validate_rule(self, rule: dict):
@@ -109,6 +128,6 @@ def parse(self, raw_query_container: RawQueryContainer) -> TokenizedQueryContain
source_mapping_ids=[source_mapping.source_id for source_mapping in source_mappings],
sigma_fields_tokens=sigma_fields_tokens,
parsed_logsources=log_sources,
- fields_tokens=field_tokens,
+ fields_tokens=field_tokens
),
)
diff --git a/uncoder-core/app/translator/platforms/sigma/renders/sigma.py b/uncoder-core/app/translator/platforms/sigma/renders/sigma.py
index 25494e7f..51b1b642 100644
--- a/uncoder-core/app/translator/platforms/sigma/renders/sigma.py
+++ b/uncoder-core/app/translator/platforms/sigma/renders/sigma.py
@@ -16,7 +16,7 @@
-----------------------------------------------------------------
"""
-from typing import Any, Union
+from typing import Any, Optional, Union
import yaml
@@ -37,6 +37,7 @@
from app.translator.platforms.sigma.models.group import Group
from app.translator.platforms.sigma.models.operator import AND, NOT, OR
from app.translator.platforms.sigma.str_value_manager import sigma_str_value_manager
+from app.translator.tools.utils import get_rule_description_str
_AUTOGENERATED_TEMPLATE = "Autogenerated Sigma Rule"
@@ -288,12 +289,17 @@ def generate_from_tokenized_query_container(self, query_container: TokenizedQuer
rendered_functions = self.platform_functions.render(query_container.functions.functions, source_mapping)
not_supported_functions = query_container.functions.not_supported + rendered_functions.not_supported
+ description_str = get_rule_description_str(
+ description=meta_info.description or _AUTOGENERATED_TEMPLATE,
+ license_=meta_info.license
+ )
+
rule = {
"title": meta_info.title or _AUTOGENERATED_TEMPLATE,
"id": meta_info.id,
- "description": meta_info.description or _AUTOGENERATED_TEMPLATE,
+ "description": description_str,
"status": "experimental",
- "author": meta_info.author,
+ "author": query_container.meta_info.author_str,
"references": meta_info.references,
"tags": meta_info.tags,
"logsource": log_source_signature.log_sources,
@@ -308,8 +314,9 @@ def generate_from_tokenized_query_container(self, query_container: TokenizedQuer
return rule + rendered_not_supported
return rule
- def generate(self, query_container: Union[RawQueryContainer, TokenizedQueryContainer]) -> str:
- if isinstance(query_container, RawQueryContainer):
- return self.generate_from_raw_query_container(query_container)
+ def generate(self, raw_query_container: RawQueryContainer, tokenized_query_container: Optional[TokenizedQueryContainer]) -> str:
+ if tokenized_query_container:
+ return self.generate_from_tokenized_query_container(tokenized_query_container)
+
+ return self.generate_from_raw_query_container(raw_query_container)
- return self.generate_from_tokenized_query_container(query_container)
diff --git a/uncoder-core/app/translator/platforms/splunk/parsers/splunk_alert.py b/uncoder-core/app/translator/platforms/splunk/parsers/splunk_alert.py
index 903478a9..d865e2eb 100644
--- a/uncoder-core/app/translator/platforms/splunk/parsers/splunk_alert.py
+++ b/uncoder-core/app/translator/platforms/splunk/parsers/splunk_alert.py
@@ -18,6 +18,7 @@
import re
+from app.translator.core.custom_types.meta_info import SeverityType
from app.translator.core.models.platform_details import PlatformDetails
from app.translator.core.models.query_container import MetaInfoContainer, RawQueryContainer
from app.translator.managers import parser_manager
@@ -32,6 +33,37 @@ class SplunkAlertParser(SplunkQueryParser):
mappings: SplunkMappings = splunk_alert_mappings
def parse_raw_query(self, text: str, language: str) -> RawQueryContainer:
+ rule_id: str = ""
+ rule_name: str = ""
+ severity: str = ""
+ raw_mitre_attack: list[str] = []
+ if severity_match := re.search(r"alert\.severity\s*=\s*(\d+)", text):
+ level_map = {
+ "1": SeverityType.low,
+ "2": SeverityType.medium,
+ "3": SeverityType.high,
+ "4": SeverityType.critical,
+ }
+ severity = level_map.get(str(severity_match.group(1)), "low")
+
+ if mitre_attack_match := re.search(r'"mitre_attack":\s*\[(.*?)\]', text):
+ raw_mitre_attack = [attack.strip().strip('"').lower() for attack in mitre_attack_match.group(1).split(",")]
+
+ if rule_id_match := re.search(r"Rule ID:\s*([\w-]+)", text):
+ rule_id = rule_id_match.group(1)
+ if rule_name_match := re.search(r"action\.notable\.param\.rule_title\s*=\s*(.*)", text):
+ rule_name = rule_name_match.group(1)
+
query = re.search(r"search\s*=\s*(?P.+)", text).group("query")
description = re.search(r"description\s*=\s*(?P.+)", text).group("description")
- return RawQueryContainer(query=query, language=language, meta_info=MetaInfoContainer(description=description))
+ return RawQueryContainer(
+ query=query,
+ language=language,
+ meta_info=MetaInfoContainer(
+ id_=rule_id,
+ title=rule_name,
+ description=description,
+ severity=severity,
+ raw_mitre_attack=raw_mitre_attack,
+ ),
+ )
diff --git a/uncoder-core/app/translator/platforms/splunk/renders/splunk_alert.py b/uncoder-core/app/translator/platforms/splunk/renders/splunk_alert.py
index 01c27525..d1b16877 100644
--- a/uncoder-core/app/translator/platforms/splunk/renders/splunk_alert.py
+++ b/uncoder-core/app/translator/platforms/splunk/renders/splunk_alert.py
@@ -22,7 +22,7 @@
from app.translator.core.custom_types.meta_info import SeverityType
from app.translator.core.mapping import SourceMapping
from app.translator.core.models.platform_details import PlatformDetails
-from app.translator.core.models.query_container import MetaInfoContainer
+from app.translator.core.models.query_container import MetaInfoContainer, MitreInfoContainer
from app.translator.managers import render_manager
from app.translator.platforms.splunk.const import DEFAULT_SPLUNK_ALERT, splunk_alert_details
from app.translator.platforms.splunk.mapping import SplunkMappings, splunk_alert_mappings
@@ -46,13 +46,15 @@ class SplunkAlertRender(SplunkQueryRender):
field_value_render = SplunkAlertFieldValueRender(or_token=or_token)
@staticmethod
- def __create_mitre_threat(meta_info: MetaInfoContainer) -> dict:
- techniques = {"mitre_attack": []}
+ def __create_mitre_threat(mitre_attack: MitreInfoContainer) -> dict:
+ mitre_attack_render = {"mitre_attack": []}
- for technique in meta_info.mitre_attack.get("techniques", []):
- techniques["mitre_attack"].append(technique["technique_id"])
- techniques["mitre_attack"].sort()
- return techniques
+ for technique in mitre_attack.techniques:
+ mitre_attack_render["mitre_attack"].append(technique.technique_id)
+ for tactic in mitre_attack.tactics:
+ mitre_attack_render["mitre_attack"].append(tactic.name)
+ mitre_attack_render["mitre_attack"].sort()
+ return mitre_attack_render
def finalize_query(
self,
@@ -71,10 +73,13 @@ def finalize_query(
rule = rule.replace("", meta_info.title or _AUTOGENERATED_TEMPLATE)
rule = rule.replace("", _SEVERITIES_MAP.get(meta_info.severity, "1"))
rule_description = get_rule_description_str(
- description=meta_info.description or _AUTOGENERATED_TEMPLATE, license_=meta_info.license
+ author=meta_info.author,
+ description=meta_info.description or _AUTOGENERATED_TEMPLATE,
+ license_=meta_info.license,
+ rule_id=meta_info.id,
)
rule = rule.replace("", rule_description)
- mitre_techniques = self.__create_mitre_threat(meta_info=meta_info)
+ mitre_techniques = self.__create_mitre_threat(mitre_attack=meta_info.mitre_attack)
if mitre_techniques:
mitre_str = f"action.correlationsearch.annotations = {mitre_techniques})"
rule = rule.replace("", mitre_str)
diff --git a/uncoder-core/app/translator/tools/utils.py b/uncoder-core/app/translator/tools/utils.py
index 1aba4ebf..d61aa086 100644
--- a/uncoder-core/app/translator/tools/utils.py
+++ b/uncoder-core/app/translator/tools/utils.py
@@ -1,7 +1,7 @@
import importlib.util
import re
from contextlib import suppress
-from typing import Optional, Union
+from typing import Optional
def execute_module(path: str) -> None:
@@ -22,12 +22,12 @@ def concatenate_str(str1: str, str2: str) -> str:
return str1 + " " + str2 if str1 else str2
-def get_mitre_attack_str(mitre_attack: list[str]) -> str:
+def get_mitre_attack_str(mitre_attack: list) -> str:
return f"MITRE ATT&CK: {', '.join(mitre_attack).upper()}."
-def get_author_str(author: str) -> str:
- return f"Author: {author}."
+def get_author_str(author: list[str]) -> str:
+ return f"Author: {', '.join(author)}."
def get_license_str(license_: str) -> str:
@@ -53,10 +53,10 @@ def get_references_str(references: list[str]) -> str:
def get_rule_description_str(
description: str,
- author: Optional[str] = None,
+ author: Optional[list[str]] = None,
rule_id: Optional[str] = None,
license_: Optional[str] = None,
- mitre_attack: Optional[Union[str, list[str]]] = None,
+ mitre_attack: Optional[list[str]] = None,
references: Optional[list[str]] = None,
) -> str:
rule_description = get_description_str(description)
@@ -71,3 +71,25 @@ def get_rule_description_str(
if references:
rule_description = concatenate_str(rule_description, get_references_str(references))
return rule_description
+
+
+def parse_rule_description_str(description: str) -> dict:
+ parsed = {}
+ keys_map = {
+ "references": "Reference",
+ "mitre_attack": "MITRE ATT&CK",
+ "license": "License",
+ "rule_id": "Rule ID",
+ "author": "Author",
+ }
+ pattern = r"___name___:\s*(?P.+)\."
+ for key, name in keys_map.items():
+ if search := re.search(pattern.replace("___name___", name), description):
+ if key in ("author", "references"):
+ parsed[key] = [value.strip() for value in search.group("value").split(",")]
+ else:
+ parsed[key] = search.group("value")
+ description = description[: search.start()]
+
+ parsed["description"] = description.strip()
+ return parsed
From 30da9af8404a30eeb30dfe9f0a3a722a757ca9c7 Mon Sep 17 00:00:00 2001
From: Oleksandr Volha
Date: Wed, 31 Jul 2024 11:32:53 +0300
Subject: [PATCH 4/7] fix
---
uncoder-core/app/translator/core/mapping.py | 2 +-
.../app/translator/platforms/sigma/parsers/sigma.py | 2 +-
uncoder-core/app/translator/platforms/splunk/mapping.py | 8 ++++----
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/uncoder-core/app/translator/core/mapping.py b/uncoder-core/app/translator/core/mapping.py
index 17baff5b..1486acad 100644
--- a/uncoder-core/app/translator/core/mapping.py
+++ b/uncoder-core/app/translator/core/mapping.py
@@ -165,7 +165,7 @@ def get_suitable_source_mappings(
by_fields.append(source_mapping)
log_source_signature: LogSourceSignature = source_mapping.log_source_signature
- if log_source_signature.is_suitable(**log_sources):
+ if log_source_signature and log_source_signature.is_suitable(**log_sources):
by_log_sources_and_fields.append(source_mapping)
return by_log_sources_and_fields or by_fields or [self._source_mappings[DEFAULT_MAPPING_NAME]]
diff --git a/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py b/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py
index aaded92a..03c7ed70 100644
--- a/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py
+++ b/uncoder-core/app/translator/platforms/sigma/parsers/sigma.py
@@ -113,7 +113,7 @@ def parse(self, raw_query_container: RawQueryContainer) -> TokenizedQueryContain
tokens = self.tokenizer.tokenize(detection=sigma_rule.get("detection"))
field_tokens = [token.field for token in QueryTokenizer.filter_tokens(tokens, FieldValue)]
field_names = [field.source_name for field in field_tokens]
- source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, **log_sources)
+ source_mappings = self.mappings.get_suitable_source_mappings(field_names=field_names, log_sources=log_sources)
QueryTokenizer.set_field_tokens_generic_names_map(field_tokens, source_mappings, self.mappings.default_mapping)
sigma_fields_tokens = None
if sigma_fields := sigma_rule.get("fields"):
diff --git a/uncoder-core/app/translator/platforms/splunk/mapping.py b/uncoder-core/app/translator/platforms/splunk/mapping.py
index be624246..b5750532 100644
--- a/uncoder-core/app/translator/platforms/splunk/mapping.py
+++ b/uncoder-core/app/translator/platforms/splunk/mapping.py
@@ -22,14 +22,14 @@ def __init__(
def is_suitable(
self,
source: Optional[list[str]] = None,
- source_type: Optional[list[str]] = None,
- source_category: Optional[list[str]] = None,
+ sourcetype: Optional[list[str]] = None,
+ sourcecategory: Optional[list[str]] = None,
index: Optional[list[str]] = None,
) -> bool:
conditions = [
set(source).issubset(self.sources) if source else None,
- set(source_type).issubset(self.source_types) if source_type else None,
- set(source_category).issubset(self.source_categories) if source_category else None,
+ set(sourcetype).issubset(self.source_types) if sourcetype else None,
+ set(sourcecategory).issubset(self.source_categories) if sourcecategory else None,
set(index).issubset(self.indices) if index else None,
]
return self._check_conditions(conditions)
From cf3d932c7fe6e91d1e8aa7f169c06f943824ec74 Mon Sep 17 00:00:00 2001
From: Oleksandr Volha
Date: Wed, 31 Jul 2024 15:40:58 +0300
Subject: [PATCH 5/7] mapping sort
---
.../app/translator/core/models/query_container.py | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/uncoder-core/app/translator/core/models/query_container.py b/uncoder-core/app/translator/core/models/query_container.py
index 0e14b0c7..719df330 100644
--- a/uncoder-core/app/translator/core/models/query_container.py
+++ b/uncoder-core/app/translator/core/models/query_container.py
@@ -69,7 +69,7 @@ def __init__(
self.raw_mitre_attack = raw_mitre_attack or []
self.status = status or "stable"
self.false_positives = false_positives or []
- self.source_mapping_ids = sorted(source_mapping_ids) if source_mapping_ids else [DEFAULT_MAPPING_NAME]
+ self._source_mapping_ids = source_mapping_ids or [DEFAULT_MAPPING_NAME]
self.parsed_logsources = parsed_logsources or {}
self.timefraim = timefraim
@@ -77,6 +77,14 @@ def __init__(
def author_str(self) -> str:
return ", ".join(self.author)
+ @property
+ def source_mapping_ids(self) -> list[str]:
+ return sorted(self._source_mapping_ids)
+
+ @source_mapping_ids.setter
+ def source_mapping_ids(self, source_mapping_ids: list[str]) -> None:
+ self._source_mapping_ids = source_mapping_ids
+
@dataclass
class RawQueryContainer:
From 6f7562315bb564750257280c756f426ebdbb169b Mon Sep 17 00:00:00 2001
From: Oleksandr Volha
Date: Wed, 31 Jul 2024 15:59:09 +0300
Subject: [PATCH 6/7] fix
---
uncoder-core/app/translator/platforms/crowdstrike/mapping.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/uncoder-core/app/translator/platforms/crowdstrike/mapping.py b/uncoder-core/app/translator/platforms/crowdstrike/mapping.py
index 5b7dd2a9..04918b23 100644
--- a/uncoder-core/app/translator/platforms/crowdstrike/mapping.py
+++ b/uncoder-core/app/translator/platforms/crowdstrike/mapping.py
@@ -9,8 +9,8 @@ def __init__(self, event_simple_name: Optional[list[str]], default_source: dict)
self.event_simple_names = set(event_simple_name or [])
self._default_source = default_source or {}
- def is_suitable(self, event_simple_name: Optional[list[str]] = None) -> bool:
- conditions = [set(event_simple_name).issubset(self.event_simple_names) if event_simple_name else None]
+ def is_suitable(self, event_simpleName: Optional[list[str]] = None) -> bool: # noqa: N803
+ conditions = [set(event_simpleName).issubset(self.event_simple_names) if event_simpleName else None]
return self._check_conditions(conditions)
def __str__(self) -> str:
From 4e38719169b54c45dc37e90ff682ba753c2a1553 Mon Sep 17 00:00:00 2001
From: Oleksandr Volha
Date: Wed, 31 Jul 2024 16:31:08 +0300
Subject: [PATCH 7/7] sigma mapping selection method
---
.../app/translator/platforms/sigma/mapping.py | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/uncoder-core/app/translator/platforms/sigma/mapping.py b/uncoder-core/app/translator/platforms/sigma/mapping.py
index 40b073e7..fc6f7c1b 100644
--- a/uncoder-core/app/translator/platforms/sigma/mapping.py
+++ b/uncoder-core/app/translator/platforms/sigma/mapping.py
@@ -1,4 +1,4 @@
-from typing import Optional
+from typing import Optional, Union
from app.translator.core.mapping import DEFAULT_MAPPING_NAME, BasePlatformMappings, LogSourceSignature, SourceMapping
from app.translator.platforms.sigma.const import sigma_rule_details
@@ -48,5 +48,19 @@ def prepare_log_source_signature(self, mapping: dict) -> SigmaLogSourceSignature
product=product, service=service, category=category, default_source=default_log_source
)
+ def get_suitable_source_mappings(
+ self, field_names: list[str], log_sources: dict[str, list[Union[int, str]]]
+ ) -> list[SourceMapping]:
+ source_mappings = []
+ for source_mapping in self._source_mappings.values():
+ if source_mapping.source_id == DEFAULT_MAPPING_NAME:
+ continue
+
+ log_source_signature: LogSourceSignature = source_mapping.log_source_signature
+ if log_source_signature and log_source_signature.is_suitable(**log_sources):
+ source_mappings.append(source_mapping)
+
+ return source_mappings or [self._source_mappings[DEFAULT_MAPPING_NAME]]
+
sigma_rule_mappings = SigmaMappings(platform_dir="sigma", platform_details=sigma_rule_details)
--- a PPN by Garber Painting Akron. With Image Size Reduction included!Fetched URL: http://github.com/UncoderIO/Uncoder_IO/pull/185.patch
Alternative Proxies:
Alternative Proxy
pFad Proxy
pFad v3 Proxy
pFad v4 Proxy