Wazuh-Teams-Workflow/custom-teams.py

339 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Wazuh → Microsoft Teams Integration (Workflows)
- Sends Adaptive Card to Teams Workflows webhook
- Adds "IP Source" extracted from alert JSON or from full_log text
- Does NOT send full_log in Teams (privacy / noise)
"""
import json
import logging
import re
import sys
from datetime import datetime
from urllib.parse import urlparse
import requests
# File that receives this integration's log output (see Integration._setup_logging).
LOG_FILE = "/var/ossec/logs/integrations.log"
# Value sent in the User-Agent header of the webhook POST request.
USER_AGENT = "Wazuh-Teams-Integration/2.1"
# The webhook URL's host must end with one of these domains to pass validation.
ALLOWED_SUFFIXES = ("logic.azure.com", "powerplatform.com")
class Integration:
    """Forward a single Wazuh alert to a Microsoft Teams Workflows webhook.

    The alert is read from a JSON file, turned into an Adaptive Card
    (headline + FactSet) and POSTed to the webhook. ``full_log`` is never
    copied into the card; it is only scanned for an ``ipAddress`` value.

    Args:
        alert_file: Path of the alert JSON file; must end with ``.alert``.
        webhook_url: http(s) URL whose host belongs to ``ALLOWED_SUFFIXES``.
        level: Integer level from the command line. It is validated here but
            not otherwise used by this class.
    """

    def __init__(self, alert_file, webhook_url, level):
        self.alert_file = alert_file
        self.webhook_url = webhook_url
        self.level = level
        self._setup_logging()

    def _setup_logging(self):
        """Configure root logging (file + stdout) and create ``self.logger``.

        If the log file cannot be opened, logging falls back to stdout only
        instead of aborting the whole integration.
        """
        handlers = [logging.StreamHandler(sys.stdout)]
        file_error = None
        try:
            handlers.insert(0, logging.FileHandler(LOG_FILE))
        except OSError as exc:
            file_error = exc
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s %(levelname)s %(message)s",
            handlers=handlers,
        )
        self.logger = logging.getLogger("wazuh-teams")
        if file_error is not None:
            self.logger.warning(
                "Cannot open log file %s: %s (logging to stdout only)",
                LOG_FILE,
                file_error,
            )

    @staticmethod
    def _as_dict(value):
        """Return *value* if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _host_allowed(host, suffixes):
        """Tell whether *host* is one of *suffixes* or a sub-domain of one.

        The match is done on a label boundary, so ``evilpowerplatform.com``
        does not pass for the suffix ``powerplatform.com``.
        """
        host = (host or "").lower()
        return any(host == sfx or host.endswith("." + sfx) for sfx in suffixes)

    def _validate(self):
        """Check the constructor arguments; log the first problem found.

        Returns:
            True when the alert file name, webhook URL and level are all
            acceptable, False otherwise.
        """
        if not isinstance(self.alert_file, str) or not self.alert_file.endswith(".alert"):
            self.logger.error("Invalid alert file: %r (must end with .alert)", self.alert_file)
            return False
        if not isinstance(self.webhook_url, str) or not self.webhook_url.startswith(
            ("http://", "https://")
        ):
            self.logger.error(
                "Invalid webhook URL: %r (must start with http/https)", self.webhook_url
            )
            return False
        try:
            # .hostname is lower-cased and free of userinfo, port and brackets.
            host = urlparse(self.webhook_url).hostname or ""
        except ValueError as exc:
            self.logger.exception("Validation exception: %s", exc)
            return False
        if not self._host_allowed(host, ALLOWED_SUFFIXES):
            self.logger.error(
                "Invalid webhook host: %r (expected domain ending with: %s)",
                host,
                ALLOWED_SUFFIXES,
            )
            return False
        if not isinstance(self.level, int):
            self.logger.error("Invalid level: %r (must be int)", self.level)
            return False
        return True

    def _load_alert(self):
        """Read the alert file and return its JSON object.

        Returns:
            The decoded dict, or ``{}`` (after logging an error) when the file
            cannot be read, is not valid JSON, or does not hold a JSON object.
        """
        try:
            with open(self.alert_file, "r", encoding="utf-8") as f:
                alert = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            self.logger.error("Cannot load alert JSON: %s", exc)
            return {}
        if not isinstance(alert, dict):
            self.logger.error(
                "Cannot load alert JSON: expected an object, got %s", type(alert).__name__
            )
            return {}
        return alert

    def _priority(self, alert):
        """Map ``rule.level`` to a label and an Adaptive Card colour.

        Returns:
            Dict with ``txt`` (label), ``clr`` (card colour) and ``lvl`` (int).
            A missing, null or non-numeric level counts as 0.
        """
        rule = self._as_dict(alert.get("rule"))
        try:
            lvl = int(rule.get("level") or 0)
        except (TypeError, ValueError):
            lvl = 0
        thresholds = (
            (12, "CRITICAL", "Attention"),
            (7, "HIGH", "Warning"),
            (4, "MEDIUM", "Good"),
        )
        for floor, txt, clr in thresholds:
            if lvl >= floor:
                return {"txt": txt, "clr": clr, "lvl": lvl}
        return {"txt": "LOW", "clr": "Accent", "lvl": lvl}

    def _format_time(self, ts):
        """Render an ISO-8601 timestamp as local ``YYYY-MM-DD HH:MM:SS``.

        Returns *ts* unchanged when it cannot be parsed.
        """
        try:
            # Convert "+0100" -> "+01:00" so fromisoformat accepts it on older Pythons.
            has_compact_offset = bool(ts) and len(ts) > 5 and ts[-5] in "+-"
            ts_fixed = ts[:-2] + ":" + ts[-2:] if has_compact_offset else ts
            local_dt = datetime.fromisoformat(ts_fixed).astimezone()
            return local_dt.strftime("%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError, OverflowError, OSError):
            return ts

    def _extract_ip_source(self, alert):
        """Find a source IP in the alert fields or, failing that, in ``full_log``.

        Returns:
            The stripped IP string, or None when nothing usable is found.
        """
        # 1) Common fields in Wazuh alerts, first non-empty string wins.
        candidates = (
            ("data", "ipAddress"),
            ("data", "srcip"),
            ("data", "src_ip"),
            ("data", "source_ip"),
            ("srcip",),
            ("src_ip",),
        )
        for path in candidates:
            value = self._get_nested(alert, path)
            if isinstance(value, str) and value.strip():
                return value.strip()
        # 2) Extract from full_log string (if present)
        full_log = alert.get("full_log", "")
        if isinstance(full_log, str) and full_log:
            m = re.search(r'"ipAddress"\s*:\s*"([^"]+)"', full_log)
            if m:
                return m.group(1).strip()
        return None

    def _get_nested(self, data, path, default=None):
        """Safely read a nested value in a dict."""
        cur = data
        for key in path:
            if isinstance(cur, dict) and key in cur:
                cur = cur[key]
            else:
                return default
        return cur

    def _is_present(self, value):
        """Return False for None and blank strings, True for anything else."""
        if value is None:
            return False
        if isinstance(value, str):
            return bool(value.strip())
        return True

    def _add_fact(self, facts, title, value):
        """Add a fact only if its value is usable."""
        if self._is_present(value):
            facts.append({"title": title, "value": str(value)})

    def _base_facts(self, alert, pr):
        """Facts that are shared by all alert types."""
        rule = self._as_dict(alert.get("rule"))
        agent = self._as_dict(alert.get("agent"))
        facts = []
        self._add_fact(facts, "Level", f"{pr['txt']} ({pr['lvl']})")
        self._add_fact(facts, "Rule ID", rule.get("id", "N/A"))
        self._add_fact(facts, "Description", rule.get("description", "N/A"))
        self._add_fact(facts, "Agent", f"{agent.get('name', '?')} ({agent.get('ip', '?')})")
        # Skip the fact entirely for a missing/null timestamp instead of showing "None".
        raw_ts = alert.get("timestamp")
        if self._is_present(raw_ts):
            self._add_fact(facts, "Timestamp", self._format_time(str(raw_ts)))
        self._add_fact(facts, "IP Source", self._extract_ip_source(alert))
        return facts

    def _rule_groups(self, alert):
        """Return ``rule.groups`` as a list of lower-cased, stripped strings."""
        groups = self._as_dict(alert.get("rule")).get("groups", [])
        if isinstance(groups, str):
            groups = [groups]
        if not isinstance(groups, list):
            return []
        return [str(g).strip().lower() for g in groups if self._is_present(g)]

    def _is_windows_alert(self, alert):
        """True when the alert has a ``data.win`` dict or the "windows" group."""
        has_win_data = isinstance(self._get_nested(alert, ("data", "win")), dict)
        return has_win_data or ("windows" in self._rule_groups(alert))

    def _is_suricata_alert(self, alert):
        """True when "suricata" appears in the groups, decoder name or rule description."""
        groups = self._rule_groups(alert)
        decoder_name = str(self._get_nested(alert, ("decoder", "name"), default="")).lower()
        rule_desc = str(self._get_nested(alert, ("rule", "description"), default="")).lower()
        return ("suricata" in groups) or ("suricata" in decoder_name) or ("suricata" in rule_desc)

    def _specific_facts_windows(self, alert):
        """Windows-specific fields (eventdata)."""
        facts = []
        win = self._as_dict(self._get_nested(alert, ("data", "win", "eventdata")))
        self._add_fact(facts, "Utilisateur", win.get("targetUserName"))
        self._add_fact(facts, "Ordinateur", win.get("workstationName"))
        self._add_fact(facts, "Event ID", self._get_nested(alert, ("data", "win", "system", "eventID")))
        self._add_fact(facts, "Process", win.get("processName"))
        self._add_fact(facts, "Source IP", win.get("ipAddress"))
        return facts

    def _specific_facts_suricata(self, alert):
        """Suricata-specific fields (signature + network flow).

        Network fields are read from ``data.flow`` first and fall back to the
        same key directly under ``data`` (the original code already did this
        for ``proto``). An endpoint with neither address nor port is omitted
        rather than rendered as ``?:?``.
        """
        facts = []
        data = self._as_dict(alert.get("data"))
        alert_data = self._as_dict(data.get("alert"))
        flow = self._as_dict(data.get("flow"))

        def pick(key):
            value = flow.get(key)
            return value if self._is_present(value) else data.get(key)

        def endpoint(ip_key, port_key):
            ip, port = pick(ip_key), pick(port_key)
            if not (self._is_present(ip) or self._is_present(port)):
                return None
            shown_ip = ip if self._is_present(ip) else "?"
            shown_port = port if self._is_present(port) else "?"
            return f"{shown_ip}:{shown_port}"

        self._add_fact(facts, "Signature", alert_data.get("signature"))
        self._add_fact(facts, "Category", alert_data.get("category"))
        self._add_fact(facts, "Severity", alert_data.get("severity"))
        self._add_fact(facts, "Source", endpoint("src_ip", "src_port"))
        self._add_fact(facts, "Destination", endpoint("dest_ip", "dest_port"))
        self._add_fact(facts, "Proto", pick("proto"))
        self._add_fact(facts, "App Proto", pick("app_proto"))
        return facts

    def _specific_facts_generic(self, alert):
        """Fallback: extract useful scalar fields without hard-coding one schema."""
        facts = []
        data = self._as_dict(alert.get("data"))
        self._add_fact(facts, "Decoder", self._get_nested(alert, ("decoder", "name")))
        self._add_fact(facts, "Location", alert.get("location"))
        # Keep only flat scalar fields to avoid unreadable nested dumps.
        for key in ("srcip", "dstip", "srcport", "dstport", "protocol", "action"):
            self._add_fact(facts, key, data.get(key))
        return facts

    def _specific_facts(self, alert):
        """
        Route alerts to a dedicated builder.
        Order matters: first match wins.
        """
        builders = [
            (self._is_suricata_alert, self._specific_facts_suricata),
            (self._is_windows_alert, self._specific_facts_windows),
        ]
        for predicate, builder in builders:
            if predicate(alert):
                return builder(alert)
        return self._specific_facts_generic(alert)

    def _make_card(self, alert):
        """Build the webhook payload: one Adaptive Card with a headline and a FactSet."""
        pr = self._priority(alert)
        rule = self._as_dict(alert.get("rule"))
        facts = self._base_facts(alert, pr)
        facts.extend(self._specific_facts(alert))
        payload = {
            "type": "message",
            "attachments": [
                {
                    "contentType": "application/vnd.microsoft.card.adaptive",
                    "content": {
                        "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                        "type": "AdaptiveCard",
                        "version": "1.4",
                        "body": [
                            {
                                "type": "TextBlock",
                                "text": f"{pr['txt']} WAZUH ALERT - {rule.get('description', 'N/A')}",
                                "weight": "Bolder",
                                "size": "Large",
                                "color": pr["clr"],
                            },
                            {"type": "FactSet", "facts": facts},
                        ],
                    },
                }
            ],
        }
        # full_log intentionally NOT included
        return payload

    def _send(self, card):
        """POST *card* to the webhook.

        Returns:
            True when the webhook answers 200 or 202, False on any other
            status or on any exception (which is logged, not raised).
        """
        headers = {"Content-Type": "application/json", "User-Agent": USER_AGENT}
        try:
            resp = requests.post(self.webhook_url, json=card, headers=headers, timeout=30)
            if resp.status_code in (200, 202):
                self.logger.info("Sent ok (status %s)", resp.status_code)
                return True
            self.logger.error("Send failed: %s %s", resp.status_code, resp.text)
        except Exception as e:
            # Process boundary: any transport failure becomes a logged error + exit code.
            self.logger.error("Exception while sending: %s", e)
        return False

    def run(self):
        """Validate, load, build and send; always ends with ``sys.exit``.

        Exit status is 0 when the card was accepted by the webhook and 1 on
        validation failure, an unloadable/empty alert, or a failed send.
        """
        if not self._validate():
            self.logger.error("Validation failed")
            sys.exit(1)
        alert = self._load_alert()
        if not alert:
            # Do not post a blank "LOW ... N/A" card for an alert that could not be read.
            self.logger.error("No alert data loaded; nothing sent")
            sys.exit(1)
        card = self._make_card(alert)
        sys.exit(0 if self._send(card) else 1)
def parse_args(argv):
    """Classify command-line arguments by shape rather than by position.

    Args:
        argv: Full argument vector; ``argv[0]`` (the program name) is skipped.

    Returns:
        Tuple ``(alert_file, webhook, level)``. Each element is None when no
        matching argument was found; when several arguments match the same
        slot, the last one wins.
    """
    alert_file = None
    webhook = None
    level = None
    for arg in argv[1:]:
        if arg.startswith("/tmp/") and arg.endswith(".alert"):
            alert_file = arg
        elif arg.startswith("http"):
            webhook = arg
        else:
            try:
                level = int(arg)
            except ValueError:
                # Deliberately skipped: any other argument is not ours to interpret.
                continue
    return alert_file, webhook, level
def main():
    """Command-line entry point: parse ``sys.argv`` and run the integration.

    Prints a usage line and exits with status 1 when the alert file, the
    webhook URL or the level is missing from the command line.
    """
    alert_path, hook_url, alert_level = parse_args(sys.argv)
    incomplete = not alert_path or not hook_url or alert_level is None
    if incomplete:
        print("Usage: custom-teams.py <alert_file.alert> <webhook_url> <level>")
        sys.exit(1)
    integration = Integration(alert_path, hook_url, alert_level)
    integration.run()


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()