Wazuh-Teams-Workflow/custom-teams.py

240 lines
7.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Wazuh → Microsoft Teams Integration (Workflows)
- Sends Adaptive Card to Teams Workflows webhook
- Adds "IP Source" extracted from alert JSON or from full_log text
- Does NOT send full_log in Teams (privacy / noise)
"""
import json
import logging
import re
import sys
from datetime import datetime
from urllib.parse import urlparse
import requests
# Path of the log file that Integration._setup_logging attaches a FileHandler to.
LOG_FILE = "/var/ossec/logs/integrations.log"
# Value sent as the User-Agent header by Integration._send.
USER_AGENT = "Wazuh-Teams-Integration/2.1"
# Domain suffixes Integration._validate accepts for the webhook host.
ALLOWED_SUFFIXES = ("logic.azure.com", "powerplatform.com")
class Integration:
    """Forward one Wazuh alert to a Microsoft Teams Workflows webhook.

    The alert is read from a JSON file, summarised into an Adaptive Card
    (rule, agent, timestamp and, when available, the source IP) and posted
    to the webhook. The alert's ``full_log`` is never included in the card.

    Args:
        alert_file: Path of the JSON alert file; must end with ``.alert``.
        webhook_url: Teams Workflows webhook URL (http/https, host under one
            of ``ALLOWED_SUFFIXES``).
        level: Integer level passed on the command line; it is only
            type-checked here, the card uses the level found in the alert.
    """

    def __init__(self, alert_file, webhook_url, level):
        self.alert_file = alert_file
        self.webhook_url = webhook_url
        self.level = level
        self._setup_logging()

    def _setup_logging(self):
        """Configure root logging (file + stdout) and create ``self.logger``.

        If ``LOG_FILE`` cannot be opened the integration keeps running with
        stdout logging only instead of dying before anything is sent.
        """
        handlers = [logging.StreamHandler(sys.stdout)]
        file_error = None
        try:
            handlers.insert(0, logging.FileHandler(LOG_FILE))
        except OSError as exc:
            file_error = exc
        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s %(levelname)s %(message)s",
            handlers=handlers,
        )
        self.logger = logging.getLogger("wazuh-teams")
        if file_error is not None:
            self.logger.warning("Cannot open log file %s: %s", LOG_FILE, file_error)

    @staticmethod
    def _dig(mapping, path, default=None):
        """Follow ``path`` (a sequence of keys) through nested dicts.

        Returns ``default`` as soon as a level is not a dict or lacks the key,
        so callers never hit ``AttributeError`` on null/odd-typed sections.
        """
        current = mapping
        for key in path:
            if not isinstance(current, dict) or key not in current:
                return default
            current = current[key]
        return current

    @staticmethod
    def _host_allowed(host, suffixes):
        """Return True when ``host`` equals, or is a subdomain of, a suffix.

        The match is anchored on a label boundary: ``x.powerplatform.com`` is
        accepted, ``evilpowerplatform.com`` is not.
        """
        if not host:
            return False
        return any(host == sfx or host.endswith("." + sfx) for sfx in suffixes)

    def _validate(self):
        """Check constructor arguments; log the reason and return False on failure."""
        try:
            if not isinstance(self.alert_file, str) or not self.alert_file.endswith(".alert"):
                self.logger.error("Invalid alert file: %r (must end with .alert)", self.alert_file)
                return False
            if not isinstance(self.webhook_url, str) or not self.webhook_url.startswith(
                ("http://", "https://")
            ):
                self.logger.error(
                    "Invalid webhook URL: %r (must start with http/https)", self.webhook_url
                )
                return False
            # .hostname strips userinfo and port and lower-cases the host.
            host = (urlparse(self.webhook_url).hostname or "").lower()
            if not self._host_allowed(host, ALLOWED_SUFFIXES):
                self.logger.error(
                    "Invalid webhook host: %r (expected domain ending with: %s)",
                    host,
                    ALLOWED_SUFFIXES,
                )
                return False
            if not isinstance(self.level, int):
                self.logger.error("Invalid level: %r (must be int)", self.level)
                return False
            return True
        except Exception as e:
            # Boundary catch: any unexpected error counts as a failed validation.
            self.logger.exception("Validation exception: %s", e)
            return False

    def _load_alert(self):
        """Read the alert file and return its JSON object.

        Returns an empty dict (after logging) when the file cannot be read,
        is not valid JSON, or its top-level value is not a JSON object.
        """
        try:
            with open(self.alert_file, "r", encoding="utf-8") as f:
                alert = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            self.logger.error("Cannot load alert JSON: %s", e)
            return {}
        if not isinstance(alert, dict):
            self.logger.error(
                "Cannot load alert JSON: expected an object, got %s", type(alert).__name__
            )
            return {}
        return alert

    def _priority(self, alert):
        """Map the alert's ``rule.level`` to a label, card colour and level.

        A missing, null or non-numeric level is treated as 0.

        Returns:
            dict with keys ``txt`` (label), ``clr`` (Adaptive Card colour
            name) and ``lvl`` (the integer level).
        """
        raw_level = self._dig(alert, ("rule", "level"), 0)
        try:
            level = int(raw_level or 0)
        except (TypeError, ValueError, OverflowError):
            level = 0
        if level >= 12:
            return {"txt": "CRITICAL", "clr": "Attention", "lvl": level}
        if level >= 7:
            return {"txt": "HIGH", "clr": "Warning", "lvl": level}
        if level >= 4:
            return {"txt": "MEDIUM", "clr": "Good", "lvl": level}
        return {"txt": "LOW", "clr": "Accent", "lvl": level}

    def _format_time(self, ts):
        """Render an ISO-8601 timestamp as local ``YYYY-MM-DD HH:MM:SS``.

        Returns ``ts`` unchanged when it cannot be parsed or converted.
        """
        try:
            # Convert "+0100" -> "+01:00" for fromisoformat
            has_compact_offset = bool(ts) and len(ts) > 5 and ts[-5] in ("+", "-")
            ts_fixed = ts[:-2] + ":" + ts[-2:] if has_compact_offset else ts
            parsed = datetime.fromisoformat(ts_fixed)
            return parsed.astimezone().strftime("%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError, OverflowError, OSError):
            return ts

    def _extract_ip_source(self, alert):
        """Return the alert's source IP as a stripped string, or None.

        Structured fields are tried first, in order; as a fallback an
        ``"ipAddress": "..."`` pair is searched for inside ``full_log``.
        """
        if not isinstance(alert, dict):
            return None
        # 1) Common fields in Wazuh alerts
        candidates = (
            ("data", "ipAddress"),
            ("data", "srcip"),
            ("data", "src_ip"),
            ("data", "source_ip"),
            ("srcip",),
            ("src_ip",),
        )
        for path in candidates:
            value = self._dig(alert, path)
            if isinstance(value, str) and value.strip():
                return value.strip()
        # 2) Extract from full_log string (if present)
        full_log = alert.get("full_log", "")
        if isinstance(full_log, str) and full_log:
            m = re.search(r'"ipAddress"\s*:\s*"([^"]+)"', full_log)
            if m:
                return m.group(1).strip()
        return None

    def _make_card(self, alert):
        """Build the Teams message payload (one Adaptive Card) for ``alert``.

        Missing or malformed sections of the alert are rendered as
        placeholders instead of raising. ``full_log`` is intentionally not
        included in the payload.
        """
        if not isinstance(alert, dict):
            alert = {}
        pr = self._priority(alert)
        rule = alert.get("rule")
        if not isinstance(rule, dict):
            rule = {}
        agent = alert.get("agent")
        if not isinstance(agent, dict):
            agent = {}
        eventdata = ("data", "win", "eventdata")
        user = self._dig(alert, eventdata + ("targetUserName",), "N/A")
        workstation = self._dig(alert, eventdata + ("workstationName",), "N/A")

        facts = []
        ip_source = self._extract_ip_source(alert)
        if ip_source:
            facts.append({"title": "IP Source", "value": ip_source})
        facts.extend(
            [
                {"title": "Utilisateur", "value": str(user)},
                {"title": "Ordinateur", "value": str(workstation)},
                {"title": "Level", "value": f"{pr['txt']} ({pr['lvl']})"},
                {"title": "Rule ID", "value": str(rule.get("id", "N/A"))},
                {"title": "Description", "value": str(rule.get("description", "N/A"))},
                {"title": "Agent", "value": f"{agent.get('name','?')} ({agent.get('ip','?')})"},
                {"title": "Timestamp", "value": self._format_time(str(alert.get("timestamp", "")))},
            ]
        )
        payload = {
            "type": "message",
            "attachments": [
                {
                    "contentType": "application/vnd.microsoft.card.adaptive",
                    "content": {
                        "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                        "type": "AdaptiveCard",
                        "version": "1.4",
                        "body": [
                            {
                                "type": "TextBlock",
                                "text": f"{pr['txt']} WAZUH ALERT - {rule.get('description', 'N/A')}",
                                "weight": "Bolder",
                                "size": "Large",
                                "color": pr["clr"],
                            },
                            {"type": "FactSet", "facts": facts},
                        ],
                    },
                }
            ],
        }
        # full_log intentionally NOT included
        return payload

    def _send(self, card):
        """POST ``card`` to the webhook; return True on HTTP 200 or 202.

        Any other status, or any exception raised while sending, is logged
        and reported as False.
        """
        headers = {"Content-Type": "application/json", "User-Agent": USER_AGENT}
        try:
            resp = requests.post(self.webhook_url, json=card, headers=headers, timeout=30)
            if resp.status_code in (200, 202):
                self.logger.info("Sent ok (status %s)", resp.status_code)
                return True
            self.logger.error("Send failed: %s %s", resp.status_code, resp.text)
        except Exception as e:
            # Boundary catch: a failed delivery must end in exit code 1, not a traceback.
            self.logger.error("Exception while sending: %s", e)
        return False

    def run(self):
        """Validate, load, build and send; always terminates via ``sys.exit``.

        Exit status is 0 when the card was delivered, 1 on validation
        failure, when no alert data could be loaded, or when sending failed.
        """
        if not self._validate():
            self.logger.error("Validation failed")
            sys.exit(1)
        alert = self._load_alert()
        if not alert:
            # Do not post an empty "LOW ... N/A" card for an unreadable alert.
            self.logger.error("No alert data loaded; nothing sent")
            sys.exit(1)
        card = self._make_card(alert)
        if not self._send(card):
            sys.exit(1)
        sys.exit(0)
def parse_args(argv):
    """Pick the alert file, webhook URL and level out of ``argv``.

    Arguments are classified by shape rather than by position, and
    ``argv[0]`` (the program name) is skipped:

    * starts with ``/tmp/`` and ends with ``.alert`` -> alert file
    * starts with ``http://`` or ``https://`` -> webhook URL (the same
      schemes ``Integration._validate`` accepts)
    * parses as an integer -> level
    * anything else is ignored

    When several arguments match the same category the last one wins.

    Returns:
        Tuple ``(alert_file, webhook, level)``; an element is ``None`` when
        no argument matched it.
    """
    alert_file = None
    webhook = None
    level = None
    for arg in argv[1:]:
        # NOTE(review): the "/tmp/" prefix is presumably where the caller
        # writes alert files - confirm before relaxing it.
        if arg.startswith("/tmp/") and arg.endswith(".alert"):
            alert_file = arg
        elif arg.startswith(("http://", "https://")):
            webhook = arg
        else:
            try:
                level = int(arg)
            except ValueError:
                # Not a recognised argument; skip it.
                continue
    return alert_file, webhook, level
def main():
    """Command-line entry point: parse ``sys.argv`` and run the integration.

    Prints a usage line and exits with status 1 when the alert file, the
    webhook URL or the level could not be found among the arguments.
    """
    alert_file, webhook, level = parse_args(sys.argv)
    if not alert_file or not webhook or level is None:
        print("Usage: custom-teams.py <alert_file.alert> <webhook_url> <level>")
        sys.exit(1)
    integration = Integration(alert_file, webhook, level)
    integration.run()
# Run the integration only when executed as a script, not when imported.
if __name__ == "__main__":
    main()