diff --git a/custom-teams.py b/custom-teams.py index ab925ce..292ea77 100755 --- a/custom-teams.py +++ b/custom-teams.py @@ -129,29 +129,128 @@ class Integration: return None + def _get_nested(self, data, path, default=None): + """Safely read a nested value in a dict.""" + cur = data + for key in path: + if isinstance(cur, dict) and key in cur: + cur = cur[key] + else: + return default + return cur + + def _is_present(self, value): + if value is None: + return False + if isinstance(value, str): + return bool(value.strip()) + return True + + def _add_fact(self, facts, title, value): + """Add a fact only if its value is usable.""" + if self._is_present(value): + facts.append({"title": title, "value": str(value)}) + + def _base_facts(self, alert, pr): + """Facts that are shared by all alert types.""" + rule = alert.get("rule", {}) or {} + agent = alert.get("agent", {}) or {} + facts = [] + + self._add_fact(facts, "Level", f"{pr['txt']} ({pr['lvl']})") + self._add_fact(facts, "Rule ID", rule.get("id", "N/A")) + self._add_fact(facts, "Description", rule.get("description", "N/A")) + self._add_fact(facts, "Agent", f"{agent.get('name', '?')} ({agent.get('ip', '?')})") + self._add_fact(facts, "Timestamp", self._format_time(str(alert.get("timestamp", "")))) + + ip_source = self._extract_ip_source(alert) + self._add_fact(facts, "IP Source", ip_source) + return facts + + def _rule_groups(self, alert): + rule = alert.get("rule", {}) or {} + groups = rule.get("groups", []) + if isinstance(groups, str): + groups = [groups] + if not isinstance(groups, list): + return [] + return [str(g).strip().lower() for g in groups if self._is_present(g)] + + def _is_windows_alert(self, alert): + groups = self._rule_groups(alert) + has_win_data = isinstance(self._get_nested(alert, ("data", "win"), default=None), dict) + return has_win_data or ("windows" in groups) + + def _is_suricata_alert(self, alert): + groups = self._rule_groups(alert) + decoder_name = str(self._get_nested(alert, ("decoder", "name"), default="")).lower() + rule_desc = str(self._get_nested(alert, ("rule", "description"), default="")).lower() + return ("suricata" in groups) or ("suricata" in decoder_name) or ("suricata" in rule_desc) + + def _specific_facts_windows(self, alert): + """Windows-specific fields (eventdata).""" + facts = [] + win = self._get_nested(alert, ("data", "win", "eventdata"), default={}) or {} + + self._add_fact(facts, "Utilisateur", win.get("targetUserName")) + self._add_fact(facts, "Ordinateur", win.get("workstationName")) + self._add_fact(facts, "Event ID", self._get_nested(alert, ("data", "win", "system", "eventID"))) + self._add_fact(facts, "Process", win.get("processName")) + self._add_fact(facts, "Source IP", win.get("ipAddress")) + return facts + + def _specific_facts_suricata(self, alert): + """Suricata-specific fields (signature + network flow).""" + facts = [] + data = alert.get("data", {}) or {} + alert_data = data.get("alert", {}) or {} + flow = data.get("flow", {}) or {} + + self._add_fact(facts, "Signature", alert_data.get("signature")) + self._add_fact(facts, "Category", alert_data.get("category")) + self._add_fact(facts, "Severity", alert_data.get("severity")) + self._add_fact(facts, "Source", f"{flow.get('src_ip', '?')}:{flow.get('src_port', '?')}") + self._add_fact(facts, "Destination", f"{flow.get('dest_ip', '?')}:{flow.get('dest_port', '?')}") + self._add_fact(facts, "Proto", flow.get("proto") or data.get("proto")) + self._add_fact(facts, "App Proto", flow.get("app_proto")) + return facts + + def _specific_facts_generic(self, alert): + """Fallback: extract useful scalar fields without hard-coding one schema.""" + facts = [] + data = alert.get("data", {}) or {} + decoder_name = self._get_nested(alert, ("decoder", "name")) + location = alert.get("location") + + self._add_fact(facts, "Decoder", decoder_name) + self._add_fact(facts, "Location", location) + + # Keep only flat scalar fields to avoid unreadable nested dumps. + for key in ("srcip", "dstip", "srcport", "dstport", "protocol", "action"): + self._add_fact(facts, key, data.get(key)) + + return facts + + def _specific_facts(self, alert): + """ + Route alerts to a dedicated builder. + Order matters: first match wins. + """ + builders = [ + (self._is_suricata_alert, self._specific_facts_suricata), + (self._is_windows_alert, self._specific_facts_windows), + ] + + for predicate, builder in builders: + if predicate(alert): + return builder(alert) + return self._specific_facts_generic(alert) + def _make_card(self, alert): pr = self._priority(alert) rule = alert.get("rule", {}) or {} - agent = alert.get("agent", {}) or {} - - - ip_source = self._extract_ip_source(alert) - - facts = [] - if ip_source: - facts.append({"title": "IP Source", "value": ip_source}) - - facts.extend( - [ - {"title": "Utilisateur", "value": str(alert.get("data",{}).get("win",{}).get("eventdata",{}).get("targetUserName", "N/A3"))}, - {"title": "Ordinateur", "value": str(alert.get("data",{}).get("win",{}).get("eventdata",{}).get("workstationName", "N/A3"))}, - {"title": "Level", "value": f"{pr['txt']} ({pr['lvl']})"}, - {"title": "Rule ID", "value": str(rule.get("id", "N/A"))}, - {"title": "Description", "value": str(rule.get("description", "N/A"))}, - {"title": "Agent", "value": f"{agent.get('name','?')} ({agent.get('ip','?')})"}, - {"title": "Timestamp", "value": self._format_time(str(alert.get("timestamp", "")))}, - ] - ) + facts = self._base_facts(alert, pr) + facts.extend(self._specific_facts(alert)) payload = { "type": "message",