385 lines
12 KiB
Python
385 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
import os
|
|
import subprocess
|
|
import datetime
|
|
import html
|
|
import shutil
|
|
import smtplib
|
|
import re
|
|
from email.message import EmailMessage
|
|
from typing import Optional
|
|
|
|
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
|
|
|
import config
|
|
|
|
|
|
# ---------------------- PARSING HELPERS ---------------------- #
|
|
|
|
def extract_metrics(raw: str) -> dict:
|
|
"""Extrae valores básicos del informe de Lynis."""
|
|
def extract_int(pattern: str) -> Optional[int]:
|
|
m = re.search(pattern, raw)
|
|
if m:
|
|
try:
|
|
return int(m.group(1))
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
|
|
metrics = {
|
|
"hardening_index": extract_int(r"Hardening index\s*:\s*([0-9]+)"),
|
|
"tests_performed": extract_int(r"Tests performed\s*:\s*([0-9]+)"),
|
|
"warnings": extract_int(r"Warnings\s*\((\d+)\)"),
|
|
"suggestions": extract_int(r"Suggestions\s*\((\d+)\)"),
|
|
}
|
|
|
|
# Componentes de software (Firewall / IDS / Malware)
|
|
if "Firewall [V]" in raw:
|
|
metrics["firewall"] = "enabled"
|
|
else:
|
|
metrics["firewall"] = "missing"
|
|
|
|
if "Intrusion software [V]" in raw:
|
|
metrics["ids"] = "enabled"
|
|
else:
|
|
metrics["ids"] = "missing"
|
|
|
|
if "Malware scanner [V]" in raw:
|
|
metrics["malware_scanner"] = "enabled"
|
|
elif "Malware scanner [X]" in raw:
|
|
metrics["malware_scanner"] = "missing"
|
|
else:
|
|
metrics["malware_scanner"] = "unknown"
|
|
|
|
return metrics
|
|
|
|
|
|
def extract_warning_and_suggestion_blocks(raw: str):
|
|
"""
|
|
Extrae las secciones de Warnings y Suggestions del informe
|
|
y las devuelve como listas de líneas.
|
|
"""
|
|
warnings_lines: list[str] = []
|
|
suggestions_lines: list[str] = []
|
|
state: Optional[str] = None
|
|
|
|
for line in raw.splitlines():
|
|
stripped = line.rstrip("\n")
|
|
s = stripped.lstrip()
|
|
|
|
if s.startswith("Warnings ("):
|
|
state = "warnings"
|
|
continue
|
|
if s.startswith("Suggestions ("):
|
|
state = "suggestions"
|
|
continue
|
|
if s.startswith("Follow-up:"):
|
|
state = None
|
|
continue
|
|
|
|
# Fuera de sección
|
|
if state is None:
|
|
continue
|
|
|
|
# Saltar líneas vacías y las de guiones -------------------
|
|
if not s:
|
|
continue
|
|
if set(s) <= {"-", " "}:
|
|
continue
|
|
|
|
if state == "warnings":
|
|
warnings_lines.append(stripped)
|
|
elif state == "suggestions":
|
|
suggestions_lines.append(stripped)
|
|
|
|
return warnings_lines, suggestions_lines
|
|
|
|
|
|
def group_entries(lines: list[str], marker: str) -> list[str]:
|
|
"""
|
|
Agrupa bloques que empiezan por "marker " (ej: "* " o "! ")
|
|
en entradas independientes (cada una puede ocupar varias líneas).
|
|
"""
|
|
entries: list[list[str]] = []
|
|
current: list[str] = []
|
|
|
|
for line in lines:
|
|
s = line.lstrip()
|
|
if s.startswith(marker + " "):
|
|
# Nuevo bloque
|
|
if current:
|
|
entries.append(current)
|
|
# Guardamos la línea sin el marcador inicial
|
|
current = [s[len(marker) + 1:]]
|
|
else:
|
|
if s:
|
|
current.append(s)
|
|
|
|
if current:
|
|
entries.append(current)
|
|
|
|
# Convertimos a texto multi-línea
|
|
return ["\n".join(entry) for entry in entries]
|
|
|
|
|
|
def text_block_to_html(entry: str) -> str:
|
|
"""
|
|
Convierte un bloque de texto en HTML sencillo:
|
|
- Escapa caracteres especiales.
|
|
- Sustituye saltos de línea por <br>.
|
|
"""
|
|
escaped = html.escape(entry)
|
|
return escaped.replace("\n", "<br>")
|
|
|
|
|
|
# ---------------------- LYNIS + REPORT ---------------------- #
|
|
|
|
def run_lynis(text_path: str) -> None:
|
|
"""Ejecuta Lynis y guarda la salida en un TXT."""
|
|
os.makedirs(os.path.dirname(text_path), exist_ok=True)
|
|
print(f"[+] Ejecutando Lynis: {' '.join(config.LYNIS_CMD)}")
|
|
result = subprocess.run(
|
|
config.LYNIS_CMD,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
check=False,
|
|
)
|
|
with open(text_path, "w", encoding="utf-8") as f:
|
|
f.write(result.stdout)
|
|
print(f"[+] Informe TXT generado: {text_path}")
|
|
|
|
|
|
def generate_html(text_path: str, html_path: str, hostname: str, now_str: str) -> str:
|
|
"""
|
|
Genera el HTML usando una plantilla Jinja2 y lo devuelve como string
|
|
(además de guardarlo en html_path).
|
|
"""
|
|
with open(text_path, "r", encoding="utf-8") as f:
|
|
raw = f.read()
|
|
|
|
metrics = extract_metrics(raw)
|
|
warnings_lines, suggestions_lines = extract_warning_and_suggestion_blocks(raw)
|
|
warning_entries_text = group_entries(warnings_lines, marker="!")
|
|
suggestion_entries_text = group_entries(suggestions_lines, marker="*")
|
|
|
|
warning_entries_html = [text_block_to_html(e) for e in warning_entries_text]
|
|
suggestion_entries_html = [text_block_to_html(e) for e in suggestion_entries_text]
|
|
|
|
hi = metrics.get("hardening_index") or 0
|
|
if hi >= 80:
|
|
hi_class = "good"
|
|
elif hi >= 60:
|
|
hi_class = "medium"
|
|
else:
|
|
hi_class = "bad"
|
|
|
|
def safe_int(v: Optional[int]) -> str:
|
|
return str(v) if v is not None else "N/A"
|
|
|
|
def status_to_icon(status: str) -> str:
|
|
if status == "enabled":
|
|
return "✅"
|
|
if status == "missing":
|
|
return "⚠️"
|
|
return "❓"
|
|
|
|
firewall_icon = status_to_icon(metrics.get("firewall", "unknown"))
|
|
ids_icon = status_to_icon(metrics.get("ids", "unknown"))
|
|
malware_icon = status_to_icon(metrics.get("malware_scanner", "unknown"))
|
|
|
|
context = {
|
|
"hostname": hostname,
|
|
"now_str": now_str,
|
|
"hardening_index": safe_int(metrics.get("hardening_index")),
|
|
"hi_class": hi_class,
|
|
"warnings_count": safe_int(metrics.get("warnings")),
|
|
"suggestions_count": safe_int(metrics.get("suggestions")),
|
|
"tests_performed": safe_int(metrics.get("tests_performed")),
|
|
"firewall_icon": firewall_icon,
|
|
"ids_icon": ids_icon,
|
|
"malware_icon": malware_icon,
|
|
"warning_entries": warning_entries_html,
|
|
"suggestion_entries": suggestion_entries_html,
|
|
}
|
|
|
|
template_dir = os.path.join(os.path.dirname(__file__), "templates")
|
|
env = Environment(
|
|
loader=FileSystemLoader(template_dir),
|
|
autoescape=select_autoescape(["html", "xml"]),
|
|
)
|
|
template = env.get_template("report.html.j2")
|
|
html_content = template.render(**context)
|
|
|
|
os.makedirs(os.path.dirname(html_path), exist_ok=True)
|
|
with open(html_path, "w", encoding="utf-8") as f:
|
|
f.write(html_content)
|
|
|
|
print(f"[+] Informe HTML generado: {html_path}")
|
|
return html_content
|
|
|
|
|
|
def generate_pdf(html_path: str, pdf_path: str) -> bool:
|
|
"""
|
|
Genera un PDF visual a partir del HTML renderizado.
|
|
Requiere wkhtmltopdf o weasyprint para generar PDFs con formato.
|
|
"""
|
|
# Intentar con wkhtmltopdf primero (mejor calidad y soporte CSS)
|
|
if shutil.which("wkhtmltopdf"):
|
|
cmd = [
|
|
"wkhtmltopdf",
|
|
"--enable-local-file-access",
|
|
"--print-media-type",
|
|
"--no-stop-slow-scripts",
|
|
"--enable-javascript",
|
|
"--javascript-delay", "1000",
|
|
"--margin-top", "5mm",
|
|
"--margin-bottom", "5mm",
|
|
"--margin-left", "5mm",
|
|
"--margin-right", "5mm",
|
|
"--page-size", "A4",
|
|
"--encoding", "UTF-8",
|
|
html_path,
|
|
pdf_path,
|
|
]
|
|
print(f"[+] Generando PDF con wkhtmltopdf: {pdf_path}")
|
|
try:
|
|
subprocess.run(cmd, check=True, capture_output=True)
|
|
print(f"[+] Informe PDF generado: {pdf_path}")
|
|
return True
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"[!] Error al generar PDF con wkhtmltopdf: {e}")
|
|
|
|
# Intentar con weasyprint (segunda mejor opción)
|
|
try:
|
|
from weasyprint import HTML
|
|
print(f"[+] Generando PDF con WeasyPrint: {pdf_path}")
|
|
HTML(filename=html_path).write_pdf(pdf_path)
|
|
print(f"[+] Informe PDF generado: {pdf_path}")
|
|
return True
|
|
except ImportError:
|
|
pass
|
|
except Exception as e:
|
|
print(f"[!] Error al generar PDF con WeasyPrint: {e}")
|
|
|
|
# Intentar con chromium/chrome headless como alternativa
|
|
for browser in ["chromium", "chromium-browser", "google-chrome", "chrome"]:
|
|
if shutil.which(browser):
|
|
cmd = [
|
|
browser,
|
|
"--headless",
|
|
"--disable-gpu",
|
|
"--print-to-pdf=" + pdf_path,
|
|
"--no-margins",
|
|
html_path,
|
|
]
|
|
print(f"[+] Generando PDF con {browser} headless: {pdf_path}")
|
|
try:
|
|
subprocess.run(cmd, check=True, capture_output=True, timeout=30)
|
|
print(f"[+] Informe PDF generado: {pdf_path}")
|
|
return True
|
|
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
|
|
print(f"[!] Error al generar PDF con {browser}: {e}")
|
|
continue
|
|
|
|
print("[!] No se encontró ninguna herramienta para generar PDFs con formato visual.")
|
|
print("[!] Instala alguna de las siguientes:")
|
|
print(" - wkhtmltopdf (recomendado): apt install wkhtmltopdf")
|
|
print(" - weasyprint: pip install weasyprint")
|
|
print(" - chromium: apt install chromium-browser")
|
|
return False
|
|
|
|
|
|
def attach_file(msg: EmailMessage, path: str, mime_type: str, subtype: str) -> None:
|
|
with open(path, "rb") as f:
|
|
data = f.read()
|
|
filename = os.path.basename(path)
|
|
msg.add_attachment(data, maintype=mime_type, subtype=subtype, filename=filename)
|
|
|
|
|
|
def send_email(
|
|
text_path: str,
|
|
pdf_path: Optional[str],
|
|
hostname: str,
|
|
now_str: str,
|
|
html_body: str,
|
|
) -> None:
|
|
subject = f"{config.SUBJECT_PREFIX} {hostname} - {now_str}"
|
|
|
|
msg = EmailMessage()
|
|
msg["From"] = config.FROM_ADDR
|
|
msg["To"] = ", ".join(config.TO_ADDRS)
|
|
msg["Subject"] = subject
|
|
|
|
plain_body = (
|
|
f"Lynis security report\n\n"
|
|
f"Host : {hostname}\n"
|
|
f"Date : {now_str}\n\n"
|
|
f"This email contains an HTML version of the Lynis report in the body.\n"
|
|
f"Full report (TXT) and PDF versions are attached.\n"
|
|
)
|
|
msg.set_content(plain_body)
|
|
|
|
msg.add_alternative(html_body, subtype="html")
|
|
|
|
attach_file(msg, text_path, "text", "plain")
|
|
|
|
if pdf_path and os.path.exists(pdf_path):
|
|
attach_file(msg, pdf_path, "application", "pdf")
|
|
|
|
print(f"[+] Enviando correo a: {config.TO_ADDRS} via {config.SMTP_HOST}:{config.SMTP_PORT}")
|
|
with smtplib.SMTP(config.SMTP_HOST, config.SMTP_PORT) as server:
|
|
server.starttls()
|
|
if config.SMTP_USER:
|
|
server.login(config.SMTP_USER, config.SMTP_PASS)
|
|
server.send_message(msg)
|
|
print("[+] Correo enviado correctamente.")
|
|
|
|
|
|
def main() -> None:
|
|
now = datetime.datetime.now()
|
|
now_str = now.strftime("%Y-%m-%d %H:%M:%S")
|
|
stamp = now.strftime("%Y%m%d-%H%M")
|
|
hostname = os.uname().nodename
|
|
|
|
os.makedirs(config.BASE_DIR, exist_ok=True)
|
|
|
|
text_path = os.path.join(config.BASE_DIR, f"lynis-{hostname}-{stamp}.txt")
|
|
html_path = os.path.join(config.BASE_DIR, f"lynis-{hostname}-{stamp}.html")
|
|
pdf_path = os.path.join(config.BASE_DIR, f"lynis-{hostname}-{stamp}.pdf")
|
|
|
|
# Ejecutar Lynis y guardar salida TXT original
|
|
run_lynis(text_path)
|
|
|
|
# Generar HTML formateado desde el TXT
|
|
html_body = generate_html(text_path, html_path, hostname, now_str)
|
|
|
|
# Generar PDF formateado desde el HTML
|
|
pdf_ok = generate_pdf(html_path, pdf_path)
|
|
if not pdf_ok:
|
|
pdf_path = None
|
|
|
|
send_email(text_path, pdf_path, hostname, now_str, html_body)
|
|
|
|
# Limpiar archivos después del envío
|
|
print("[+] Limpiando archivos temporales...")
|
|
files_to_remove = [text_path, html_path]
|
|
if pdf_path and os.path.exists(pdf_path):
|
|
files_to_remove.append(pdf_path)
|
|
|
|
for file_path in files_to_remove:
|
|
try:
|
|
if os.path.exists(file_path):
|
|
os.remove(file_path)
|
|
print(f" - Eliminado: {os.path.basename(file_path)}")
|
|
except Exception as e:
|
|
print(f"[!] Error al eliminar {file_path}: {e}")
|
|
|
|
print("[+] Proceso completado.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|