import os import re import sys import json import logging from typing import Dict, List, Optional, Union, Any from dataclasses import dataclass from pathlib import Path import pandas as pd from openpyxl import load_workbook from scripts.extract_json import process_file_return_json mac_coeur=[] @dataclass class Config: """Configuration centralisée pour l'application.""" LOGS_DIR: str = './src/logs' OUTPUT_DIR: str = './src/output' DATA_JSON_PATH: str = './src/data.json' OUTPUT_FILE_NAME: str = 'uplink_report.xlsx' @property def output_file_path(self) -> str: return os.path.join(self.OUTPUT_DIR, self.OUTPUT_FILE_NAME) @dataclass class InterfaceResult: """Résultat d'analyse d'une interface.""" switch: str logfile: str interface: str status: str speed_gbps: Optional[Union[int, float, str]] port_channel: Optional[str] description: str issues: str mac_local: Optional[List[str]] mac_destination: Optional[str] bridge_name: Optional[str] bridge_speed: Optional[Union[int, float, str]] nb_liens: Optional[int] @dataclass class LCInterfaceResult: """Résultat d'analyse d'une interface LC.""" switch: str interface: str status: str speed_gbps: Optional[Union[int, float, str]] port_channel: Optional[str] description: str type_lien: str nb_liens: Optional[int] bridge_utilise: Optional[str] mac_destination: Optional[str] switch_destination: str class InterfaceAnalyzer: """Analyseur d'interfaces réseau.""" CORE_INTERLINK_PATTERN = re.compile(r'/0/(49|50)$') SPEED_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*Gbps", re.I) SHORT_NAME_PATTERN = re.compile(r'(\d+/\d+/\d+)$') INTERCORE_PATTERN = re.compile(r'[1-4]/0/4[3-8]$') CORE_INTERFACE_PATTERN = re.compile(r'/0/(4[89]|50)$') @classmethod def is_core_interlink(cls, interface_name: str) -> bool: """Détermine si l'interface est un lien inter-cœur.""" return cls.CORE_INTERLINK_PATTERN.search(interface_name) is not None @classmethod def is_intercore_link(cls, interface_name: str) -> bool: """Détermine si l'interface est un lien inter-cœur (LC).""" return cls.INTERCORE_PATTERN.search(interface_name) is not None @classmethod def guess_speed(cls, speed_mode: Optional[str], interface_name: Optional[str] = None) -> Optional[Union[int, float, str]]: """Devine la vitesse d'une interface.""" if speed_mode: match = cls.SPEED_PATTERN.match(speed_mode) if match: return float(match.group(1)) if interface_name and cls.CORE_INTERFACE_PATTERN.search(interface_name): if "Ten-GigabitEthernet" in interface_name: return 10 elif "GigabitEthernet" in interface_name: return 1 elif "HundredGigE" in interface_name: return 100 elif "M-GigabitEthernet" in interface_name: return "?" return None @classmethod def extract_short_name(cls, interface_name: str) -> str: """Extrait le nom court d'une interface.""" match = cls.SHORT_NAME_PATTERN.search(interface_name) return match.group(1) if match else interface_name @classmethod def identify_issues(cls, info: Dict[str, Any], speed: Optional[Union[int, float, str]]) -> List[str]: """Identifie les problèmes potentiels d'une interface.""" issues = [] if info.get("current_state") is not None: if info.get("current_state", "").upper() != "UP": issues.append("DOWN") if isinstance(speed, (int, float)) and speed < 10: issues.append(f"Poss. goulot ({speed} Gbps)") if info.get("port_channel"): issues.append(f"Membre Port-Channel {info['port_channel']}") return issues class SwitchLogAnalyzer: """Analyseur de logs de switchs.""" def __init__(self, config: Config): self.config = config self.analyzer = InterfaceAnalyzer() self.logger = self._setup_logger() def _setup_logger(self) -> logging.Logger: """Configure le logger.""" logger = logging.getLogger(__name__) if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger def analyze_switch_log( self, filename: str, switch_name: str, interfaces: Dict[str, Any], mac_addresses: Optional[List[str]] = None, is_core: bool = False, filter_mac_destination: bool = True ) -> List[InterfaceResult]: """Analyse les interfaces d'un switch.""" results = [] for iface_name, info in interfaces.items(): if not self._should_process_interface(info, filter_mac_destination, is_core, iface_name): continue result = self._create_interface_result( filename, switch_name, iface_name, info, interfaces, mac_addresses ) results.append(result) return results def _should_process_interface( self, info: Dict[str, Any], filter_mac_destination: bool, is_core: bool, iface_name: str ) -> bool: """Détermine si une interface doit être traitée.""" global mac_coeur if filter_mac_destination and info.get("mac_destination") not in mac_coeur: return False if is_core and not self.analyzer.is_core_interlink(iface_name): return False return True def _create_interface_result( self, filename: str, switch_name: str, iface_name: str, info: Dict[str, Any], interfaces: Dict[str, Any], mac_addresses: Optional[List[str]] ) -> InterfaceResult: """Crée un résultat d'interface.""" mac_destination = info.get("mac_destination") bridge_name = info.get("bridge_name") bridge_info = interfaces.get(bridge_name, {}) if bridge_name else {} speed = self.analyzer.guess_speed(info.get("speed_mode"), iface_name) bridge_speed = self.analyzer.guess_speed(bridge_info.get("speed_mode"), bridge_name) issues = self.analyzer.identify_issues(info, speed) description = info.get("description") or info.get("config_description") or "" return InterfaceResult( switch=switch_name, logfile=filename, interface=iface_name, status=info.get("current_state", "unknown"), speed_gbps=speed, port_channel=info.get("port_channel"), description=description, issues=", ".join(issues) if issues else "RAS", mac_local=mac_addresses, mac_destination=mac_destination, bridge_name=bridge_name, bridge_speed=bridge_speed, nb_liens=bridge_info.get("nb_liens") ) def analyze_lc_log(self, filename: str, switch_name: str, interfaces: Dict[str, Any]) -> List[LCInterfaceResult]: """Analyse les interfaces LC.""" results = [] for iface_name, info in interfaces.items(): if not info or not isinstance(info, dict): continue result = self._create_lc_interface_result(filename, switch_name, iface_name, info) results.append(result) return results def _create_lc_interface_result( self, filename: str, switch_name: str, iface_name: str, info: Dict[str, Any] ) -> LCInterfaceResult: """Crée un résultat d'interface LC.""" speed = self.analyzer.guess_speed(info.get("speed_mode"), iface_name) type_lien = "Lien inter-coeur" if self.analyzer.is_intercore_link(iface_name) else "Vers Accès" description = info.get("description") or info.get("config_description") or "" nb_liens = info.get("nb_liens") if iface_name.startswith("Bridge-Aggregation") else None bridge_utilise = None if iface_name.startswith("Bridge-Aggregation") else info.get("bridge_name") mac_destination = None if iface_name.startswith("Bridge-Aggregation") else info.get("mac_destination") return LCInterfaceResult( switch=switch_name, interface=iface_name, status=info.get("current_state", "unknown"), speed_gbps=speed, port_channel=info.get("port_channel"), description=description, type_lien=type_lien, nb_liens=nb_liens, bridge_utilise=bridge_utilise, mac_destination=mac_destination, switch_destination="" ) class ExcelReportGenerator: """Générateur de rapports Excel.""" def __init__(self, config: Config): self.config = config self.logger = logging.getLogger(__name__) def generate_report(self, uplink_results: List[InterfaceResult], lc_results: List[LCInterfaceResult]) -> None: """Génère le rapport Excel.""" df_uplink = self._create_uplink_dataframe(uplink_results) df_lc = self._create_lc_dataframe(lc_results) os.makedirs(self.config.OUTPUT_DIR, exist_ok=True) self._write_excel_file(df_uplink, df_lc) self._format_excel_file() print(f"✅ Rapport Excel généré : {self.config.output_file_path}") def _create_uplink_dataframe(self, results: List[InterfaceResult]) -> pd.DataFrame: """Crée le DataFrame pour les uplinks.""" data = [] for result in results: data.append({ "switch": result.switch, "logfile": result.logfile, "interface": result.interface, "status": result.status, "speed_gbps": result.speed_gbps, "port_channel": result.port_channel, "description": result.description, "issues": result.issues, "mac_local": result.mac_local, "mac_destination": result.mac_destination, "bridge_name": result.bridge_name, "bridge_speed": result.bridge_speed, "nb_liens": result.nb_liens }) return pd.DataFrame(data) def _create_lc_dataframe(self, results: List[LCInterfaceResult]) -> pd.DataFrame: """Crée le DataFrame pour les interfaces LC.""" data = [] for result in results: data.append({ "Switch": result.switch, "Interface": result.interface, "Status": result.status, "Vitesse (Gbps)": result.speed_gbps, "Port-Channel": result.port_channel, "Description": result.description, "Type de lien": result.type_lien, "Nb liens": result.nb_liens, "Bridge utilisé": result.bridge_utilise, "MAC destination": result.mac_destination, "Switch destination": result.switch_destination }) return pd.DataFrame(data) def _write_excel_file(self, df_uplink: pd.DataFrame, df_lc: pd.DataFrame) -> None: """Écrit le fichier Excel.""" with pd.ExcelWriter(self.config.output_file_path, engine='openpyxl') as writer: df_uplink.to_excel(writer, index=False, sheet_name="Uplinks") df_lc.to_excel(writer, index=False, sheet_name="LC_Interfaces") def _format_excel_file(self) -> None: """Met en forme le fichier Excel.""" wb = load_workbook(self.config.output_file_path) self._add_formulas(wb) self._format_columns(wb) wb.save(self.config.output_file_path) def _add_formulas(self, workbook) -> None: """Ajoute les formules Excel.""" ws = workbook["LC_Interfaces"] for row in range(2, 400): formula = ( f'=IF(J{row}="","",IFERROR(INDEX(Uplinks!A$2:A$400,' f'MATCH(TRUE,ISNUMBER(SEARCH(J{row},Uplinks!I$2:I$400)),0)),""))' ) ws[f'K{row}'] = formula def _format_columns(self, workbook) -> None: """Met en forme les colonnes.""" column_widths = { "LC_Interfaces": [15, 25, 25, 15, 12, 45, 15, 10, 22, 20, 20], "Uplinks": [15, 30, 25, 7, 12, 12, 30, 22, 47, 15, 20, 13, 10] } for sheet_name, widths in column_widths.items(): ws = workbook[sheet_name] for col_idx, width in enumerate(widths, start=1): ws.column_dimensions[chr(64 + col_idx)].width = width class UplinkReportGenerator: """Générateur principal de rapports d'uplink.""" def __init__(self, config: Optional[Config] = None): self.config = config or Config() self.analyzer = SwitchLogAnalyzer(self.config) self.report_generator = ExcelReportGenerator(self.config) self.logger = logging.getLogger(__name__) def generate_report(self, lc_filename_prefix: str) -> None: """Génère le rapport complet.""" try: uplink_results, lc_results = self._process_log_files(lc_filename_prefix) self.report_generator.generate_report(uplink_results, lc_results) except Exception as e: self.logger.error(f"Erreur lors de la génération du rapport : {e}") raise def _process_log_files(self, lc_filename_prefix: str) -> tuple[List[InterfaceResult], List[LCInterfaceResult]]: """Traite tous les fichiers de logs.""" global mac_coeur uplink_results = [] lc_results = [] log_files = self._get_log_files() for filename in log_files: try: data = self._process_single_log_file(filename) if not data: continue interfaces = data.get("data", {}).get("interfaces", {}) switch_name = data.get("metadata", {}).get("switch_name", "unknown") if filename.startswith(lc_filename_prefix): mac_coeur = data.get("metadata", {}).get("mac_addresses", []) mac_coeur = [mac.lower() for mac in mac_coeur] lc_results.extend(self.analyzer.analyze_lc_log(filename, switch_name, interfaces)) else: mac_addresses = data.get("metadata", {}).get("mac_addresses", []) uplink_results.extend( self.analyzer.analyze_switch_log( filename, switch_name, interfaces, mac_addresses=mac_addresses, filter_mac_destination=True ) ) except Exception as e: self.logger.error(f"Erreur lors du traitement de {filename}: {e}") continue return uplink_results, lc_results def _get_log_files(self) -> List[str]: """Récupère la liste des fichiers de logs.""" if not os.path.exists(self.config.LOGS_DIR): raise FileNotFoundError(f"Le répertoire {self.config.LOGS_DIR} n'existe pas") return [f for f in os.listdir(self.config.LOGS_DIR) if f.endswith('.log')] def _process_single_log_file(self, filename: str) -> Optional[Dict[str, Any]]: """Traite un fichier de log individuel.""" filepath = os.path.join(self.config.LOGS_DIR, filename) return process_file_return_json(filepath) def main(): """Fonction principale.""" if len(sys.argv) < 2: print("Usage: python parse_uplinks.py (sans chemin)") sys.exit(1) lc_filename_prefix = sys.argv[1] try: generator = UplinkReportGenerator() generator.generate_report(lc_filename_prefix) except Exception as e: print(f"❌ Erreur : {e}") sys.exit(1) if __name__ == "__main__": main()