change cards generation

This commit is contained in:
Your Name 2026-02-18 16:21:50 +01:00
parent 778281f5c0
commit aa034adf58
1 changed files with 119 additions and 20 deletions

View File

@ -129,29 +129,128 @@ class Integration:
return None
def _get_nested(self, data, path, default=None):
"""Safely read a nested value in a dict."""
cur = data
for key in path:
if isinstance(cur, dict) and key in cur:
cur = cur[key]
else:
return default
return cur
def _is_present(self, value):
if value is None:
return False
if isinstance(value, str):
return bool(value.strip())
return True
def _add_fact(self, facts, title, value):
"""Add a fact only if its value is usable."""
if self._is_present(value):
facts.append({"title": title, "value": str(value)})
def _base_facts(self, alert, pr):
"""Facts that are shared by all alert types."""
rule = alert.get("rule", {}) or {}
agent = alert.get("agent", {}) or {}
facts = []
self._add_fact(facts, "Level", f"{pr['txt']} ({pr['lvl']})")
self._add_fact(facts, "Rule ID", rule.get("id", "N/A"))
self._add_fact(facts, "Description", rule.get("description", "N/A"))
self._add_fact(facts, "Agent", f"{agent.get('name', '?')} ({agent.get('ip', '?')})")
self._add_fact(facts, "Timestamp", self._format_time(str(alert.get("timestamp", ""))))
ip_source = self._extract_ip_source(alert)
self._add_fact(facts, "IP Source", ip_source)
return facts
def _rule_groups(self, alert):
rule = alert.get("rule", {}) or {}
groups = rule.get("groups", [])
if isinstance(groups, str):
groups = [groups]
if not isinstance(groups, list):
return []
return [str(g).strip().lower() for g in groups if self._is_present(g)]
def _is_windows_alert(self, alert):
groups = self._rule_groups(alert)
has_win_data = isinstance(self._get_nested(alert, ("data", "win"), default=None), dict)
return has_win_data or ("windows" in groups)
def _is_suricata_alert(self, alert):
groups = self._rule_groups(alert)
decoder_name = str(self._get_nested(alert, ("decoder", "name"), default="")).lower()
rule_desc = str(self._get_nested(alert, ("rule", "description"), default="")).lower()
return ("suricata" in groups) or ("suricata" in decoder_name) or ("suricata" in rule_desc)
def _specific_facts_windows(self, alert):
"""Windows-specific fields (eventdata)."""
facts = []
win = self._get_nested(alert, ("data", "win", "eventdata"), default={}) or {}
self._add_fact(facts, "Utilisateur", win.get("targetUserName"))
self._add_fact(facts, "Ordinateur", win.get("workstationName"))
self._add_fact(facts, "Event ID", self._get_nested(alert, ("data", "win", "system", "eventID")))
self._add_fact(facts, "Process", win.get("processName"))
self._add_fact(facts, "Source IP", win.get("ipAddress"))
return facts
def _specific_facts_suricata(self, alert):
"""Suricata-specific fields (signature + network flow)."""
facts = []
data = alert.get("data", {}) or {}
alert_data = data.get("alert", {}) or {}
flow = data.get("flow", {}) or {}
self._add_fact(facts, "Signature", alert_data.get("signature"))
self._add_fact(facts, "Category", alert_data.get("category"))
self._add_fact(facts, "Severity", alert_data.get("severity"))
self._add_fact(facts, "Source", f"{flow.get('src_ip', '?')}:{flow.get('src_port', '?')}")
self._add_fact(facts, "Destination", f"{flow.get('dest_ip', '?')}:{flow.get('dest_port', '?')}")
self._add_fact(facts, "Proto", flow.get("proto") or data.get("proto"))
self._add_fact(facts, "App Proto", flow.get("app_proto"))
return facts
def _specific_facts_generic(self, alert):
"""Fallback: extract useful scalar fields without hard-coding one schema."""
facts = []
data = alert.get("data", {}) or {}
decoder_name = self._get_nested(alert, ("decoder", "name"))
location = alert.get("location")
self._add_fact(facts, "Decoder", decoder_name)
self._add_fact(facts, "Location", location)
# Keep only flat scalar fields to avoid unreadable nested dumps.
for key in ("srcip", "dstip", "srcport", "dstport", "protocol", "action"):
self._add_fact(facts, key, data.get(key))
return facts
def _specific_facts(self, alert):
"""
Route alerts to a dedicated builder.
Order matters: first match wins.
"""
builders = [
(self._is_suricata_alert, self._specific_facts_suricata),
(self._is_windows_alert, self._specific_facts_windows),
]
for predicate, builder in builders:
if predicate(alert):
return builder(alert)
return self._specific_facts_generic(alert)
def _make_card(self, alert):
pr = self._priority(alert)
rule = alert.get("rule", {}) or {}
agent = alert.get("agent", {}) or {}
ip_source = self._extract_ip_source(alert)
facts = []
if ip_source:
facts.append({"title": "IP Source", "value": ip_source})
facts.extend(
[
{"title": "Utilisateur", "value": str(alert.get("data",{}).get("win",{}).get("eventdata",{}).get("targetUserName", "N/A3"))},
{"title": "Ordinateur", "value": str(alert.get("data",{}).get("win",{}).get("eventdata",{}).get("workstationName", "N/A3"))},
{"title": "Level", "value": f"{pr['txt']} ({pr['lvl']})"},
{"title": "Rule ID", "value": str(rule.get("id", "N/A"))},
{"title": "Description", "value": str(rule.get("description", "N/A"))},
{"title": "Agent", "value": f"{agent.get('name','?')} ({agent.get('ip','?')})"},
{"title": "Timestamp", "value": self._format_time(str(alert.get("timestamp", "")))},
]
)
facts = self._base_facts(alert, pr)
facts.extend(self._specific_facts(alert))
payload = {
"type": "message",