#!/usr/bin/env python3
"""
Wazuh → Microsoft Teams Integration (Workflows)

- Sends an Adaptive Card to a Teams Workflows webhook
- Adds "IP Source" extracted from the alert JSON or from the full_log text
- Does NOT send full_log to Teams (privacy / noise)
"""

import json
import logging
import re
import sys
from datetime import datetime
from urllib.parse import urlparse

try:
    import requests
except ImportError:
    # Keep the module importable so the failure is reported through the
    # integration log by ``Integration._send`` instead of an import traceback.
    requests = None

LOG_FILE = "/var/ossec/logs/integrations.log"
USER_AGENT = "Wazuh-Teams-Integration/2.1"
ALLOWED_SUFFIXES = ("logic.azure.com", "powerplatform.com")

# Matches an `"ipAddress": "<value>"` pair embedded in the raw log text.
_FULL_LOG_IP_RE = re.compile(r'"ipAddress"\s*:\s*"([^"]+)"')
# Matches a compact UTC offset ("+0100") at the end of a timestamp.
_COMPACT_OFFSET_RE = re.compile(r"([+-]\d{2})(\d{2})$")

# Alert keys that may carry the source IP, in lookup order.
_IP_FIELD_PATHS = (
    ("data", "ipAddress"),
    ("data", "srcip"),
    ("data", "src_ip"),
    ("data", "source_ip"),
    ("srcip",),
    ("src_ip",),
)


def _as_dict(value):
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _dig(obj, path):
    """Follow *path* through nested dicts; return None if any step is missing."""
    for key in path:
        if not isinstance(obj, dict) or key not in obj:
            return None
        obj = obj[key]
    return obj


class Integration:
    """Build an Adaptive Card from one alert file and post it to a webhook."""

    def __init__(self, alert_file, webhook_url, level):
        """Store the run parameters and configure logging.

        Args:
            alert_file: Path of the JSON alert file (must end with ``.alert``).
            webhook_url: Target webhook URL.
            level: Integer level passed on the command line.
        """
        self.alert_file = alert_file
        self.webhook_url = webhook_url
        self.level = level
        self._setup_logging()

    def _setup_logging(self):
        """Log to LOG_FILE and stdout; fall back to stdout only if the file
        cannot be opened, so an unwritable log never aborts the integration."""
        handlers = [logging.StreamHandler(sys.stdout)]
        file_error = None
        try:
            handlers.insert(0, logging.FileHandler(LOG_FILE))
        except OSError as exc:
            file_error = exc

        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s %(levelname)s %(message)s",
            handlers=handlers,
        )
        self.logger = logging.getLogger("wazuh-teams")
        if file_error is not None:
            self.logger.warning(
                "Cannot open log file %s (%s); logging to stdout only",
                LOG_FILE,
                file_error,
            )

    def _validate(self):
        """Check the alert path, webhook URL and level.

        Returns:
            True when every parameter is acceptable, False otherwise (the
            reason is logged).
        """
        if not isinstance(self.alert_file, str) or not self.alert_file.endswith(".alert"):
            self.logger.error(
                "Invalid alert file: %r (must end with .alert)", self.alert_file
            )
            return False

        if not isinstance(self.webhook_url, str) or not self.webhook_url.startswith(
            ("http://", "https://")
        ):
            self.logger.error(
                "Invalid webhook URL: %r (must start with http/https)",
                self.webhook_url,
            )
            return False

        # ``hostname`` drops userinfo and port, so a URL such as
        # "https://logic.azure.com:x@evil.example/" is judged by the host the
        # request would really go to.
        try:
            host = (urlparse(self.webhook_url).hostname or "").lower()
        except ValueError as exc:
            self.logger.error("Invalid webhook URL: %r (%s)", self.webhook_url, exc)
            return False

        # Match on a label boundary: "evillogic.azure.com" must not pass.
        if not any(host == sfx or host.endswith("." + sfx) for sfx in ALLOWED_SUFFIXES):
            self.logger.error(
                "Invalid webhook host: %r (expected domain ending with: %s)",
                host,
                ALLOWED_SUFFIXES,
            )
            return False

        if not isinstance(self.level, int):
            self.logger.error("Invalid level: %r (must be int)", self.level)
            return False

        return True

    def _load_alert(self):
        """Read the alert file as JSON.

        Returns:
            The decoded alert dict, or ``{}`` when the file cannot be read,
            is not valid JSON, or does not contain a JSON object.
        """
        try:
            with open(self.alert_file, "r", encoding="utf-8") as f:
                alert = json.load(f)
        except (OSError, ValueError) as e:
            self.logger.error("Cannot load alert JSON: %s", e)
            return {}

        if not isinstance(alert, dict):
            self.logger.error(
                "Cannot load alert JSON: expected an object, got %s",
                type(alert).__name__,
            )
            return {}
        return alert

    def _priority(self, alert):
        """Map the alert's ``rule.level`` to a label and card colour.

        Returns:
            Dict with ``txt`` (label), ``clr`` (Adaptive Card colour name)
            and ``lvl`` (numeric level; 0 when missing or not numeric).
        """
        rule = _as_dict(_as_dict(alert).get("rule"))
        try:
            level = int(rule.get("level") or 0)
        except (TypeError, ValueError):
            level = 0

        if level >= 12:
            return {"txt": "CRITICAL", "clr": "Attention", "lvl": level}
        if level >= 7:
            return {"txt": "HIGH", "clr": "Warning", "lvl": level}
        if level >= 4:
            return {"txt": "MEDIUM", "clr": "Good", "lvl": level}
        return {"txt": "LOW", "clr": "Accent", "lvl": level}

    def _format_time(self, ts):
        """Render an ISO-8601 timestamp as local ``YYYY-MM-DD HH:MM:SS``.

        Returns *ts* unchanged when it cannot be parsed.
        """
        if not isinstance(ts, str) or not ts:
            return ts

        normalized = ts.strip()
        if normalized.endswith(("Z", "z")):
            # fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
            normalized = normalized[:-1] + "+00:00"
        else:
            # Convert "+0100" → "+01:00" for fromisoformat.
            normalized = _COMPACT_OFFSET_RE.sub(r"\1:\2", normalized)

        try:
            local_dt = datetime.fromisoformat(normalized).astimezone()
        except (ValueError, OverflowError, OSError):
            return ts
        return local_dt.strftime("%Y-%m-%d %H:%M:%S")

    def _extract_ip_source(self, alert):
        """Return the source IP found in the alert, or None.

        Looks first at the known alert fields, then for an ``"ipAddress"``
        pair inside the ``full_log`` text.
        """
        alert = _as_dict(alert)

        # 1) Common fields in Wazuh alerts
        for path in _IP_FIELD_PATHS:
            value = _dig(alert, path)
            if isinstance(value, str) and value.strip():
                return value.strip()

        # 2) Extract from full_log string (if present)
        full_log = alert.get("full_log", "")
        if isinstance(full_log, str) and full_log:
            match = _FULL_LOG_IP_RE.search(full_log)
            if match:
                return match.group(1).strip()

        return None

    def _make_card(self, alert):
        """Build the webhook payload (one Adaptive Card) for *alert*.

        Missing or malformed sections of the alert are rendered as
        placeholders instead of raising.
        """
        alert = _as_dict(alert)
        pr = self._priority(alert)
        rule = _as_dict(alert.get("rule"))
        agent = _as_dict(alert.get("agent"))
        eventdata = _as_dict(_dig(alert, ("data", "win", "eventdata")))
        ip_source = self._extract_ip_source(alert)

        facts = []
        if ip_source:
            facts.append({"title": "IP Source", "value": ip_source})

        facts.extend(
            [
                {"title": "Utilisateur", "value": str(eventdata.get("targetUserName", "N/A"))},
                {"title": "Ordinateur", "value": str(eventdata.get("workstationName", "N/A"))},
                {"title": "Level", "value": f"{pr['txt']} ({pr['lvl']})"},
                {"title": "Rule ID", "value": str(rule.get("id", "N/A"))},
                {"title": "Description", "value": str(rule.get("description", "N/A"))},
                {"title": "Agent", "value": f"{agent.get('name', '?')} ({agent.get('ip', '?')})"},
                {"title": "Timestamp", "value": self._format_time(str(alert.get("timestamp", "")))},
            ]
        )

        # full_log is intentionally NOT included in the payload.
        return {
            "type": "message",
            "attachments": [
                {
                    "contentType": "application/vnd.microsoft.card.adaptive",
                    "content": {
                        "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                        "type": "AdaptiveCard",
                        "version": "1.4",
                        "body": [
                            {
                                "type": "TextBlock",
                                "text": f"{pr['txt']} WAZUH ALERT - {rule.get('description', 'N/A')}",
                                "weight": "Bolder",
                                "size": "Large",
                                "color": pr["clr"],
                            },
                            {"type": "FactSet", "facts": facts},
                        ],
                    },
                }
            ],
        }

    def _send(self, card):
        """POST *card* to the webhook.

        Returns:
            True on HTTP 200/202, False on any other status or on error.
        """
        if requests is None:
            self.logger.error("Cannot send: the 'requests' package is not installed")
            return False

        headers = {"Content-Type": "application/json", "User-Agent": USER_AGENT}
        try:
            resp = requests.post(self.webhook_url, json=card, headers=headers, timeout=30)
        except Exception as e:  # best-effort boundary: log and report failure
            self.logger.error("Exception while sending: %s", e)
            return False

        if resp.status_code in (200, 202):
            self.logger.info("Sent ok (status %s)", resp.status_code)
            return True

        self.logger.error("Send failed: %s %s", resp.status_code, resp.text)
        return False

    def run(self):
        """Validate, load, build and send; exit 0 on success, 1 on failure."""
        if not self._validate():
            self.logger.error("Validation failed")
            sys.exit(1)

        alert = self._load_alert()
        card = self._make_card(alert)

        if not self._send(card):
            sys.exit(1)
        sys.exit(0)


def parse_args(argv):
    """Classify command-line arguments by shape rather than by position.

    Args:
        argv: Full argument vector; ``argv[0]`` is skipped.

    Returns:
        Tuple ``(alert_file, webhook, level)``; each item is None when no
        argument of that shape was given. Unrecognised arguments are ignored.
    """
    alert_file = None
    webhook = None
    level = None

    for arg in argv[1:]:
        if arg.startswith("/tmp/") and arg.endswith(".alert"):
            alert_file = arg
        elif arg.startswith("http"):
            webhook = arg
        else:
            try:
                level = int(arg)
            except ValueError:
                # Not a level: ignore the extra argument.
                continue

    return alert_file, webhook, level


def main():
    """Script entry point: parse ``sys.argv`` and run the integration."""
    af, wh, lv = parse_args(sys.argv)
    if not (af and wh and lv is not None):
        print("Usage: custom-teams.py <alert_file> <webhook_url> <level>")
        sys.exit(1)
    Integration(af, wh, lv).run()


if __name__ == "__main__":
    main()