From ab7611a923b541b67fb998fcab439d124a4eacd1 Mon Sep 17 00:00:00 2001 From: Chevallier Date: Wed, 23 Jul 2025 09:39:15 +0200 Subject: [PATCH] first commit --- README.md | 57 ++++ src/main.py | 21 ++ src/scripts/extract_json.py | 640 +++++++++++++++++++++++++++++++++++ src/scripts/format.py | 254 ++++++++++++++ src/scripts/mermaid.py | 358 ++++++++++++++++++++ src/scripts/parse_uplinks.py | 448 ++++++++++++++++++++++++ 6 files changed, 1778 insertions(+) create mode 100644 README.md create mode 100644 src/main.py create mode 100644 src/scripts/extract_json.py create mode 100644 src/scripts/format.py create mode 100644 src/scripts/mermaid.py create mode 100644 src/scripts/parse_uplinks.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..f809f88 --- /dev/null +++ b/README.md @@ -0,0 +1,57 @@ +# Parseur de Logs HPE 5130 + +Ce projet permet d'extraire automatiquement des informations pertinentes depuis les logs de switchs **HPE 5130**, notamment les connexions entre les switchs d'accès et le cœur de réseau. Les résultats sont exportés sous deux formats : + +- Un **fichier Excel** listant les interfaces côté accès et cœur. +- Un **diagramme Mermaid** simplifié représentant les connexions. + +--- + +## 🧰 Fonctionnalités + +- Extraction des données utiles depuis les fichiers `.log` au format **JSON**. +- Génération d’un **rapport Excel (`uplink_report.xlsx`)** contenant : + - Les informations des interfaces côté accès connectées au cœur. + - Les informations des interfaces du cœur. +- Création d’un **fichier Mermaid (`mermaid.md`)** représentant graphiquement les connexions. + +--- + +## 🚀 Lancement + +Pour lancer l’outil, placez-vous dans le dossier du projet et exécutez la commande suivante : + +```bash +python .\src\main.py [fichier logs du cœur] +``` +#### 💡 Exemple : python .\src\main.py .\src\logs\core_switch.log + +--- + +## 📁 Arborescence du projet +```makefile +C:. +└───src + ├───logs # Contient les fichiers .log des switchs (à déposer ici) + ├───output # Contient les fichiers générés (Excel et Mermaid) + ├───scripts # Contient les scripts de traitement + │ ├───extract_json.py # Extraction des données en JSON + │ ├───format.py # Formatage des données + │ ├───mermaid.py # Formatage et génération du fichier Mermaid + │ └───parse_uplinks.py # Création du fichier Excel + └───main.py # Programme de lancement +``` + +--- + +## 📌 Instructions d'utilisation +1. Déposer les fichiers .log dans le dossier src/logs. +2. Lancer le script main.py avec en argument le log du switch cœur. +3. Les résultats seront disponibles dans le dossier src/output : + - uplink_report.xlsx + - mermaid.md + +--- + +## ⚠️ Remarque +- Bien penser à **mettre les fichiers de logs avec l'extension `.log`** dans le dossier `src/logs`. \ No newline at end of file diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..78db894 --- /dev/null +++ b/src/main.py @@ -0,0 +1,21 @@ +import sys +import os +from pathlib import Path +sys.path.append(os.path.dirname(__file__)) +from scripts.mermaid import NetworkAnalyzer +from scripts.parse_uplinks import UplinkReportGenerator + +def main(): + """Fonction principale du programme d'analyse.""" + if len(sys.argv) != 2: + print("Usage: python main.py ") + sys.exit(1) + + coeur_log_filename = sys.argv[1] + coeur_log_filename = os.path.basename(coeur_log_filename) + + UplinkReportGenerator().generate_report(coeur_log_filename) + NetworkAnalyzer(base_dir=Path(__file__).parent).analyze(coeur_log_filename) + +if __name__ == "__main__": + main() diff --git a/src/scripts/extract_json.py b/src/scripts/extract_json.py new file mode 100644 index 0000000..37538b9 --- /dev/null +++ b/src/scripts/extract_json.py @@ -0,0 +1,640 @@ +import os +import re +import json +import glob +import sys +from datetime import datetime + +class InterfaceParser: + """ + Parses 'display interface' command output to extract interface details. + """ + def parse(self, content: str) -> dict: + interfaces = {} + current_interface = None + lines = content.split('\n') + + for line in lines: + line = line.strip() + if re.match(r'^(?!\s)(?:interface\s*)?(?:[A-Za-z-]*Ethernet|Bridge-Aggregation|Vlan-interface)\d+', line): + current_interface = line + interfaces[current_interface] = { + "name": current_interface, + "current_state": None, + "description": None, + "speed_mode": None, + "pvid": None, + "port_channel": None, + "bridge_name": None, + "type": None, + "access_vlan": None, + "trunk_vlans": [], + "ip_address": None, + "shutdown": False, + "config_description": None, + "nb_liens": None, + "mac_destination": None + } + continue + + if current_interface and line: + if line.startswith('Current state:'): + interfaces[current_interface]["current_state"] = line.split(':', 1)[1].strip() + + elif line.startswith('Description:'): + description = line.split(':', 1)[1].strip() + if description and description != f"{current_interface} Interface": + interfaces[current_interface]["description"] = description + else: + interfaces[current_interface]["description"] = None + + elif 'speed mode' in line.lower(): + speed_match = re.search(r'(\d+(?:\.\d+)?)\s*(Gbps|Mbps|kbps)[^\w]*speed\s+mode', line, re.I) + if speed_match: + speed_value = speed_match.group(1) + speed_unit = speed_match.group(2) + interfaces[current_interface]["speed_mode"] = f"{speed_value}{speed_unit}" + + elif 'pvid' in line.lower(): + pvid_match = re.search(r'pvid[:\s]*(\d+)', line, re.I) + if pvid_match: + interfaces[current_interface]["pvid"] = int(pvid_match.group(1)) + + elif 'port channel' in line.lower() or 'channel group' in line.lower(): + pc_match = re.search(r'(?:port channel|channel group)[:\s]*(\d+)', line, re.I) + if pc_match: + interfaces[current_interface]["port_channel"] = int(pc_match.group(1)) + elif 'member of bridge-aggregation' in line.lower(): + ba_match = re.search(r'member of bridge-aggregation\s*(\d+)', line, re.I) + if ba_match: + interfaces[current_interface]["port_channel"] = int(ba_match.group(1)) + return interfaces + +class LinkAggregationParser: + """ + Parses 'display link-aggregation verbose' command output. + Extracts link counts, MAC mappings, and bridge mappings. + """ + def _convert_port_name(self, port_name: str) -> str: + """Converts the port name to a standardized format.""" + port_name = re.sub(r'\([A-Z]\)', '', port_name) + patterns = [ + (r'^XGE(\d+/\d+/\d+)$', r'Ten-GigabitEthernet\1'), + (r'^GE(\d+/\d+/\d+)$', r'GigabitEthernet\1'), + (r'^FE(\d+/\d+/\d+)$', r'HundredGigE\1'), + (r'^TE(\d+/\d+/\d+)$', r'TenGigabitEthernet\1'), + ] + for pattern, replacement in patterns: + if re.match(pattern, port_name): + return re.sub(pattern, replacement, port_name) + return port_name + + def parse(self, content: str) -> tuple[dict, dict, dict]: + link_counts = {} + mac_mappings = {} + bridge_mappings = {} + + lines = content.split('\n') + current_aggregate = None + current_section = None + local_ports = [] + remote_ports = [] + + for line in lines: + line = line.strip() + + if line.startswith('Aggregate Interface:'): + interface_match = re.search(r'Aggregate Interface:\s*(.+)', line) + if interface_match: + current_aggregate = interface_match.group(1).strip() + link_counts[current_aggregate] = 0 + local_ports = [] + remote_ports = [] + continue + + if line.startswith('Local:'): + current_section = 'local' + continue + + if line.startswith('Remote:'): + current_section = 'remote' + continue + + if line.startswith('Aggregate Interface:'): + interface_match = re.search(r'Aggregate Interface:\s*(.+)', line) + if interface_match: + current_aggregate = interface_match.group(1).strip() + + if current_aggregate and current_section == 'local' and line: + port_match = re.match(r'^([A-Za-z]+\d+/\d+/\d+(?:\([A-Z]\))?)\s+', line) + if port_match: + if not any(keyword in line for keyword in ['Port', 'Status', 'Priority', 'Index', 'Oper-Key', 'Flag']): + local_port = port_match.group(1) + local_ports.append(local_port) + link_counts[current_aggregate] += 1 + + if current_aggregate and current_section == 'remote' and line: + remote_match = re.match(r'^([A-Za-z]+\d+/\d+/\d+(?:\([A-Z]\))?)\s+\d+\s+\d+\s+\d+\s+0x[0-9a-fA-F]+,\s*([0-9a-fA-F-]+)', line) + if remote_match: + if not any(keyword in line for keyword in ['Actor', 'Priority', 'Index', 'Oper-Key', 'SystemID', 'Flag']): + remote_port = remote_match.group(1) + system_id = remote_match.group(2) + remote_ports.append((remote_port, system_id)) + + if current_aggregate and local_ports and remote_ports and \ + (line.startswith('Aggregate Interface:') or line == lines[-1].strip()): + min_length = min(len(local_ports), len(remote_ports)) + for i in range(min_length): + local_port = local_ports[i] + remote_port, system_id = remote_ports[i] + + interface_name = self._convert_port_name(local_port) + mac_mappings[interface_name] = system_id + bridge_mappings[interface_name] = current_aggregate + if line.startswith('Aggregate Interface:'): + current_aggregate = None + local_ports = [] + remote_ports = [] + current_section = None + + if current_aggregate and local_ports and remote_ports: + min_length = min(len(local_ports), len(remote_ports)) + for i in range(min_length): + local_port = local_ports[i] + remote_port, system_id = remote_ports[i] + + interface_name = self._convert_port_name(local_port) + mac_mappings[interface_name] = system_id + bridge_mappings[interface_name] = current_aggregate + + for interface, count in link_counts.items(): + if 'Bridge-Aggregation' in interface: + link_counts[interface] = count + + return link_counts, mac_mappings, bridge_mappings + +class ConfigurationParser: + """ + Parses 'display current-configuration' command output. + Extracts interface configurations and general switch configurations. + """ + def _parse_vlan_ranges(self, vlan_string: str) -> list: + """Parses a VLAN string and returns a list of unique VLANs.""" + vlans = set() + parts = vlan_string.split() + i = 0 + while i < len(parts): + if i + 2 < len(parts) and parts[i + 1].lower() == 'to': + try: + start_vlan = int(parts[i]) + end_vlan = int(parts[i + 2]) + vlans.update(range(start_vlan, end_vlan + 1)) + i += 3 + except ValueError: + i += 1 + else: + try: + vlans.add(int(parts[i])) + except ValueError: + pass + i += 1 + return sorted(list(vlans)) + + def parse(self, content: str) -> tuple[dict, dict]: + interfaces_config = {} + switch_config = { + "version": None, + "lldp": {"enabled": False, "details": {}}, + "stp": {"enabled": False, "details": {}}, + "ssh": {"enabled": False, "details": {}}, + "snmp_contact": None, + "ntp_server": [], + "users": [], + "vlans": [] + } + + lines = content.split('\n') + current_interface = None + current_section = None + + i = 0 + while i < len(lines): + line = lines[i] + line_stripped = line.strip() + + if line_stripped.startswith('version '): + switch_config["version"] = line_stripped.split('version ', 1)[1].strip() + + elif 'lldp enable' in line_stripped.lower(): + switch_config["lldp"]["enabled"] = True + elif line_stripped.startswith('lldp '): + switch_config["lldp"]["details"][line_stripped] = True + + elif 'stp enable' in line_stripped.lower() or 'spanning-tree' in line_stripped.lower(): + switch_config["stp"]["enabled"] = True + switch_config["stp"]["details"][line_stripped] = True + + elif 'ssh server enable' in line_stripped.lower() or 'ssh user' in line_stripped.lower(): + switch_config["ssh"]["enabled"] = True + switch_config["ssh"]["details"][line_stripped] = True + + elif line_stripped.startswith('snmp-agent sys-info contact'): + switch_config["snmp_contact"] = line_stripped.split('contact', 1)[1].strip().strip('"') + + elif line_stripped.startswith('ntp-service unicast-server'): + ntp_server = line_stripped.split('unicast-server', 1)[1].strip() + switch_config["ntp_server"].append(ntp_server) + + elif line_stripped.startswith('local-user '): + username = line_stripped.split('local-user ', 1)[1].strip() + switch_config["users"].append(username) + + elif line_stripped.startswith('vlan '): + vlan_info = line_stripped.split('vlan ', 1)[1].strip() + if "vlans" not in switch_config: + switch_config["vlans"] = [] + + if 'to' in vlan_info: + parts = vlan_info.split(' to ') + if len(parts) == 2: + start_vlan = int(parts[0]) + end_vlan = int(parts[1]) + for vlan_id in range(start_vlan, end_vlan + 1): + if vlan_id not in switch_config["vlans"]: + switch_config["vlans"].append(vlan_id) + else: + vlan_id = int(vlan_info) + if vlan_id not in switch_config["vlans"]: + switch_config["vlans"].append(vlan_id) + + elif line_stripped.startswith('interface '): + interface_match = re.match(r'^interface\s+(.+)', line_stripped) + if interface_match: + current_interface = interface_match.group(1).strip() + interfaces_config[current_interface] = { + "name": current_interface, + "type": "access", + "access_vlan": None, + "trunk_vlans": [], + "ip_address": None, + "shutdown": False, + "port_channel": None, + "description": None + } + current_section = "interface" + + elif current_section == "interface" and current_interface: + if line_stripped != '' and not line.startswith(' ') and not line.startswith('\t'): + if not line_stripped.startswith('interface '): + current_interface = None + current_section = None + continue + else: + interface_match = re.match(r'^interface\s+(.+)', line_stripped) + if interface_match: + current_interface = interface_match.group(1).strip() + interfaces_config[current_interface] = { + "name": current_interface, + "type": "access", + "access_vlan": None, + "trunk_vlans": [], + "ip_address": None, + "shutdown": False, + "port_channel": None, + "description": None + } + + elif line.startswith(' ') or line.startswith('\t'): + command = line.strip() + + if command.startswith('description '): + interfaces_config[current_interface]["description"] = command.split('description ', 1)[1].strip() + + elif command == 'shutdown': + interfaces_config[current_interface]["shutdown"] = True + + elif command.startswith('ip address '): + ip_info = command.split('ip address ', 1)[1].strip() + interfaces_config[current_interface]["ip_address"] = ip_info + + elif 'link-aggregation' in command.lower() or 'port-group' in command.lower(): + pc_match = re.search(r'(\d+)', command) + if pc_match: + interfaces_config[current_interface]["port_channel"] = int(pc_match.group(1)) + + elif command.startswith('port access vlan '): + vlan_id = int(command.split('port access vlan ', 1)[1].strip()) + interfaces_config[current_interface]["type"] = "access" + interfaces_config[current_interface]["access_vlan"] = vlan_id + + elif command.startswith('port link-type trunk'): + interfaces_config[current_interface]["type"] = "trunk" + + elif command.startswith('port trunk permit vlan '): + interfaces_config[current_interface]["type"] = "trunk" + vlan_info = command.split('port trunk permit vlan ', 1)[1].strip() + if vlan_info == "all": + interfaces_config[current_interface]["trunk_vlans"] = "all" + else: + trunk_vlans = self._parse_vlan_ranges(vlan_info) + interfaces_config[current_interface]["trunk_vlans"].extend(trunk_vlans) + + elif command.startswith('undo port trunk permit vlan '): + vlan_info = command.split('undo port trunk permit vlan ', 1)[1].strip() + if vlan_info == "all": + interfaces_config[current_interface]["trunk_vlans"] = [] + else: + vlans_to_remove = self._parse_vlan_ranges(vlan_info) + current_trunk = interfaces_config[current_interface]["trunk_vlans"] + if current_trunk != "all": + for vlan in vlans_to_remove: + if vlan in current_trunk: + current_trunk.remove(vlan) + + elif command.startswith('port trunk pvid vlan '): + interfaces_config[current_interface]["type"] = "trunk" + pvid = int(command.split('port trunk pvid vlan ', 1)[1].strip()) + trunk_vlans = interfaces_config[current_interface]["trunk_vlans"] + if trunk_vlans != "all" and pvid not in trunk_vlans: + trunk_vlans.append(pvid) + i += 1 + + if isinstance(switch_config["vlans"], list): + switch_config["vlans"] = sorted(list(set(switch_config["vlans"]))) + + for interface in interfaces_config.values(): + if isinstance(interface["trunk_vlans"], list): + interface["trunk_vlans"] = sorted(list(set(interface["trunk_vlans"]))) + + return interfaces_config, switch_config + +class DeviceInfoParser: + """ + Parses 'display device manuinfo' command output to extract MAC addresses. + """ + def parse(self, content: str) -> list[str]: + mac_addresses = [] + lines = content.split('\n') + current_slot_cpu = None + + for line in lines: + line = line.strip() + if re.match(r'^\s*Slot\s+\d+\s+CPU\s+0\s*:', line, re.I): + current_slot_cpu = line.strip() + continue + if current_slot_cpu and line.startswith('MAC_ADDRESS'): + mac_match = re.search(r'MAC_ADDRESS\s*:\s*([A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4})', line) + if mac_match: + mac_address = mac_match.group(1) + mac_addresses.append(mac_address) + if line.startswith('Slot') and 'CPU' in line and current_slot_cpu: + if line.strip() != current_slot_cpu: + current_slot_cpu = None + return mac_addresses + +class LogFileProcessor: + """ + Manages the parsing of a single log file, coordinating different parsers + and merging their results into a unified JSON structure. + """ + def __init__(self): + self.interface_parser = InterfaceParser() + self.link_agg_parser = LinkAggregationParser() + self.config_parser = ConfigurationParser() + self.device_info_parser = DeviceInfoParser() + + def _extract_sections(self, file_content: str) -> dict: + """Extracts different sections from the log file content.""" + sections = {} + current_section = None + lines = file_content.splitlines() + + section_patterns = { + "display_interface": re.compile(r"^=+\s*display interface\s*=+$", re.I), + "display_link_aggregation_verbose": re.compile(r"^=+\s*display link-aggregation verbose\s*=+$", re.I), + "display_current_configuration": re.compile(r"^=+\s*display current-configuration\s*=+$", re.I), + "display_device_manuinfo": re.compile(r"^=+\s*display device manuinfo\s*=+$", re.I) + } + + for line in lines: + line_strip = line.strip() + matched_section = None + for section_name, pattern in section_patterns.items(): + if pattern.match(line_strip): + matched_section = section_name + break + + if matched_section: + current_section = matched_section + sections[current_section] = [] + continue + + if re.match(r"^=+$", line_strip): + current_section = None + continue + + if current_section: + sections[current_section].append(line) + + for key in sections: + sections[key] = "\n".join(sections[key]) + return sections + + def _extract_switch_name(self, content: str) -> str | None: + """Extracts the switch name from the log file content.""" + sysname_match = re.search(r"(?m)^\s*sysname\s+(.+)$", content) + if sysname_match: + return sysname_match.group(1).strip() + return None + + def _merge_configuration_into_interfaces(self, interfaces: dict, interfaces_config: dict): + """Merges configuration details into the parsed interface data.""" + for interface_name, config in interfaces_config.items(): + if interface_name in interfaces: + interfaces[interface_name].update({ + "type": config["type"], + "access_vlan": config["access_vlan"], + "trunk_vlans": config["trunk_vlans"], + "ip_address": config["ip_address"], + "shutdown": config["shutdown"], + "config_description": config["description"], + }) + if config["port_channel"] is not None: + interfaces[interface_name]["port_channel"] = config["port_channel"] + else: + interfaces[interface_name] = { + "name": interface_name, + "current_state": None, + "description": None, + "speed_mode": None, + "pvid": None, + "port_channel": config["port_channel"], + "type": config["type"], + "access_vlan": config["access_vlan"], + "trunk_vlans": config["trunk_vlans"], + "ip_address": config["ip_address"], + "shutdown": config["shutdown"], + "config_description": config["description"], + "nb_liens": None, + "mac_destination": None + } + + def _merge_link_aggregation_counts(self, interfaces: dict, link_counts: dict): + """Merges link aggregation counts into the interface data.""" + for interface_name, count in link_counts.items(): + if interface_name in interfaces: + interfaces[interface_name]["nb_liens"] = count + + def _merge_mac_mappings(self, interfaces: dict, mac_mappings: dict): + """Merges MAC address mappings into the interface data.""" + for interface_name, mac_address in mac_mappings.items(): + if interface_name in interfaces: + interfaces[interface_name]["mac_destination"] = mac_address + else: + interfaces[interface_name] = { + "name": interface_name, + "current_state": None, + "description": None, + "speed_mode": None, + "pvid": None, + "port_channel": None, + "type": None, + "access_vlan": None, + "trunk_vlans": [], + "ip_address": None, + "shutdown": False, + "config_description": None, + "nb_liens": None, + "mac_destination": mac_address + } + + def _merge_bridge_mappings(self, interfaces: dict, bridge_mappings: dict): + """Merges bridge aggregation mappings into existing interfaces.""" + for interface_name, bridge_name in bridge_mappings.items(): + if interface_name in interfaces: + interfaces[interface_name]["bridge_name"] = bridge_name + + def process_file(self, filepath: str) -> dict | None: + """ + Processes a single log file to extract and consolidate network device data. + Returns a dictionary containing metadata and parsed data, or None on error. + """ + try: + with open(filepath, 'r', encoding='latin-1', errors='ignore') as f: + content = f.read() + + sections = self._extract_sections(content) + + if not sections: + return None + + mac_addresses = [] + if "display_device_manuinfo" in sections: + mac_addresses = self.device_info_parser.parse(sections["display_device_manuinfo"]) + + result = { + "metadata": { + "filename": os.path.basename(filepath), + "switch_name": self._extract_switch_name(content), + "extraction_date": datetime.now().isoformat(), + "sections_found": list(sections.keys()), + "mac_addresses": mac_addresses + }, + "data": {} + } + + interfaces = {} + + if "display_interface" in sections: + interfaces = self.interface_parser.parse(sections["display_interface"]) + + if "display_current_configuration" in sections: + try: + interfaces_config, switch_config = self.config_parser.parse(sections["display_current_configuration"]) + self._merge_configuration_into_interfaces(interfaces, interfaces_config) + result["data"]["switch_config"] = switch_config + except Exception as e: + print(f" - Error parsing current-configuration for {filepath}: {e}", file=sys.stderr) + result["data"]["switch_config"] = { + "version": None, "lldp": {"enabled": False, "details": {}}, + "stp": {"enabled": False, "details": {}}, "ssh": {"enabled": False, "details": {}}, + "snmp_contact": None, "ntp_server": [], "users": [], "vlans": [] + } + + if "display_link_aggregation_verbose" in sections: + try: + link_counts, mac_mappings, bridge_mappings = self.link_agg_parser.parse(sections["display_link_aggregation_verbose"]) + self._merge_link_aggregation_counts(interfaces, link_counts) + self._merge_mac_mappings(interfaces, mac_mappings) + self._merge_bridge_mappings(interfaces, bridge_mappings) + except Exception as e: + print(f" - Error parsing link-aggregation verbose for {filepath}: {e}", file=sys.stderr) + + result["data"]["interfaces"] = interfaces + + return result + + except Exception as e: + print(f"Error processing file {filepath}: {e}", file=sys.stderr) + return None + +class DataExporter: + """ + Handles the export of processed data to a JSON file. + """ + def export(self, data: dict, output_filepath: str): + try: + output_dir = os.path.dirname(output_filepath) + if output_dir and not os.path.exists(output_dir): + os.makedirs(output_dir) + + with open(output_filepath, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + print(f"Data saved to: {output_filepath}") + except Exception as e: + print(f"Error saving data to {output_filepath}: {e}", file=sys.stderr) + +class MainApplication: + """ + Main application class to orchestrate the log file processing. + """ + def __init__(self): + self.processor = LogFileProcessor() + self.exporter = DataExporter() + + def run(self, log_file_pattern: str): + current_dir = os.getcwd() + log_files = glob.glob(os.path.join(current_dir, log_file_pattern)) + + if not log_files: + print(f"No log files found matching pattern: {log_file_pattern}", file=sys.stderr) + return + + results = {} + success_count = 0 + + for log_file in sorted(log_files): + result = self.processor.process_file(log_file) + if result: + results[os.path.basename(log_file)] = result + success_count += 1 + + output_file = "./src/data.json" + self.exporter.export(results, output_file) + + +def process_file_return_json(filepath: str) -> dict | None: + """ + Standalone function to process a single file and return JSON object, + useful for external calls that don't need the full CLI application. + """ + processor = LogFileProcessor() + return processor.process_file(filepath) + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("Usage: python extract_json.py ") + else: + app = MainApplication() + app.run(sys.argv[1]) \ No newline at end of file diff --git a/src/scripts/format.py b/src/scripts/format.py new file mode 100644 index 0000000..948212b --- /dev/null +++ b/src/scripts/format.py @@ -0,0 +1,254 @@ +import sys +import os +from pathlib import Path +from typing import Dict, List, Optional, Any +from dataclasses import dataclass +from contextlib import redirect_stdout +from scripts.extract_json import process_file_return_json + + +@dataclass +class BridgeInfo: + """Informations sur un bridge.""" + nb_liens: int + speed: str + + @classmethod + def from_interface_data(cls, interface_data: Dict[str, Any]) -> 'BridgeInfo': + """Crée une instance à partir des données d'interface.""" + return cls( + nb_liens=interface_data.get("nb_liens", 0), + speed=interface_data.get("speed_mode", "unknown") + ) + + +@dataclass +class InterfaceInfo: + """Informations sur une interface réseau.""" + name: str + mac_destination: Optional[str] + bridge_name: Optional[str] + + @classmethod + def from_interface_data(cls, interface_data: Dict[str, Any]) -> 'InterfaceInfo': + """Crée une instance à partir des données d'interface.""" + return cls( + name=interface_data.get("name", ""), + mac_destination=interface_data.get("mac_destination"), + bridge_name=interface_data.get("bridge_name") + ) + + +@dataclass +class DeviceInfo: + """Informations sur un équipement réseau.""" + switch_name: str + mac_addresses: List[str] + interfaces: Dict[str, Any] + + @property + def mac_addresses_upper(self) -> List[str]: + """Retourne les adresses MAC en majuscules.""" + return [mac.upper() for mac in self.mac_addresses] + + @property + def mac_addresses_str(self) -> str: + """Retourne les adresses MAC formatées en une chaîne.""" + return ','.join(self.mac_addresses_upper) + + +class LogDataExtractor: + """Classe pour extraire les données des fichiers de log.""" + + @staticmethod + def extract_log_data(filename: str) -> Optional[Dict[str, Any]]: + """Extrait les informations du fichier de log et retourne un dictionnaire.""" + try: + data = process_file_return_json(filename) + return data if data else None + except Exception as e: + print(f"Erreur lors du traitement de {filename} : {e}") + sys.exit(1) + + @classmethod + def create_device_info(cls, filename: str) -> Optional[DeviceInfo]: + """Crée un objet DeviceInfo à partir d'un fichier de log.""" + data = cls.extract_log_data(filename) + if not data: + return None + + metadata = data.get("metadata", {}) + return DeviceInfo( + switch_name=metadata.get("switch_name", "unknown"), + mac_addresses=metadata.get("mac_addresses", []), + interfaces=data.get("data", {}).get("interfaces", {}) + ) + + +class BridgeAnalyzer: + """Classe pour analyser les informations de bridge.""" + + @staticmethod + def extract_bridge_info(interfaces: Dict[str, Any]) -> Dict[str, BridgeInfo]: + """Extrait les informations de bridge des interfaces.""" + bridge_info = {} + + for interface_name, interface_data in interfaces.items(): + if interface_name.startswith("Bridge-Aggregation"): + bridge_info[interface_name] = BridgeInfo.from_interface_data(interface_data) + + return bridge_info + + @staticmethod + def get_bridge_details(bridge_info: Dict[str, BridgeInfo], bridge_name: str) -> tuple[str, int]: + """Récupère les détails d'un bridge spécifique.""" + bridge = bridge_info.get(bridge_name, BridgeInfo(0, "unknown")) + return bridge.speed, bridge.nb_liens + + +class NetworkFormatter: + """Classe pour formater les sorties réseau.""" + + @staticmethod + def format_device_output(device_info: DeviceInfo, interface_info: InterfaceInfo, + speed: str, nb_liens: int) -> str: + """Formate la sortie pour un équipement.""" + mac_dest = interface_info.mac_destination.upper() if interface_info.mac_destination else "" + return (f"{device_info.switch_name} [{device_info.mac_addresses_str}] " + f"{interface_info.name} -> {mac_dest} " + f"[{interface_info.bridge_name},{speed},{nb_liens}]") + + +class CoeurAnalyzer: + """Classe pour analyser les équipements coeur.""" + + def __init__(self): + self.mac_coeur = "" + + def set_mac_coeur(self, mac_addresses: List[str]) -> None: + """Met les MAC du coeur dans une variable d'instance.""" + self.mac_coeur = ', '.join(mac_addresses) + + def analyze(self, filename: str) -> None: + """Analyse l'extraction json du fichier log du coeur.""" + device_info = LogDataExtractor.create_device_info(filename) + if not device_info: + return + + # Stocker les MAC du coeur pour les switches + self.set_mac_coeur(device_info.mac_addresses_upper) + + bridge_info = BridgeAnalyzer.extract_bridge_info(device_info.interfaces) + + for interface_data in device_info.interfaces.values(): + interface_info = InterfaceInfo.from_interface_data(interface_data) + + if interface_info.mac_destination: + speed, nb_liens = BridgeAnalyzer.get_bridge_details( + bridge_info, interface_info.bridge_name + ) + + output = NetworkFormatter.format_device_output( + device_info, interface_info, speed, nb_liens + ) + print(output) + + +class SwitchAnalyzer: + """Classe pour analyser les switches.""" + + def __init__(self, coeur_analyzer: CoeurAnalyzer): + self.coeur_analyzer = coeur_analyzer + + def analyze(self, filename: str) -> None: + """Analyse l'extraction json du fichier log d'un switch.""" + device_info = LogDataExtractor.create_device_info(filename) + if not device_info: + return + + bridge_info = BridgeAnalyzer.extract_bridge_info(device_info.interfaces) + + for interface_data in device_info.interfaces.values(): + interface_info = InterfaceInfo.from_interface_data(interface_data) + + if (interface_info.mac_destination and + interface_info.mac_destination.upper() in self.coeur_analyzer.mac_coeur): + + speed, nb_liens = BridgeAnalyzer.get_bridge_details( + bridge_info, interface_info.bridge_name + ) + + output = NetworkFormatter.format_device_output( + device_info, interface_info, speed, nb_liens + ) + print(output) + + +class NetworkAnalysisOrchestrator: + """Classe principale pour orchestrer l'analyse réseau.""" + + def __init__(self, base_dir: Path = None): + if base_dir is None: + base_dir = Path(__file__).parent.parent + + self.base_dir = Path(base_dir) + self.log_dir = self.base_dir / 'logs' + self.output_file = self.base_dir / 'data.txt' + + self.coeur_analyzer = CoeurAnalyzer() + self.switch_analyzer = SwitchAnalyzer(self.coeur_analyzer) + + def _get_log_files(self, exclude_filename: str) -> List[Path]: + """Récupère tous les fichiers de log sauf celui exclu.""" + if not self.log_dir.exists(): + return [] + + return [ + log_file for log_file in self.log_dir.glob('*.log') + if log_file.name != exclude_filename + ] + + def run_analysis(self, filename: str) -> None: + """Lance l'analyse du coeur et des switches à partir d'un fichier log coeur donné.""" + coeur_log_path = self.log_dir / filename + + if not coeur_log_path.exists(): + print(f"Erreur: Le fichier {coeur_log_path} n'existe pas") + return + + # Rediriger la sortie vers le fichier + with open(self.output_file, 'w', encoding='utf-8') as f: + with redirect_stdout(f): + print("Coeur:") + self.coeur_analyzer.analyze(str(coeur_log_path)) + + print("Switches:") + switch_files = self._get_log_files(filename) + for switch_file in switch_files: + self.switch_analyzer.analyze(str(switch_file)) + + +class FormatMain: + """Classe principale pour le module format.""" + + def __init__(self, base_dir: Path = None): + self.orchestrator = NetworkAnalysisOrchestrator(base_dir) + + def run_analysis(self, filename: str) -> None: + """Point d'entrée public pour l'analyse.""" + self.orchestrator.run_analysis(filename) + + +def main(filename: str = None) -> None: + """Fonction principale.""" + if filename is None: + if len(sys.argv) != 2: + print("Usage: python format.py ") + sys.exit(1) + filename = sys.argv[1] + + format_main = FormatMain() + format_main.run_analysis(filename) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/scripts/mermaid.py b/src/scripts/mermaid.py new file mode 100644 index 0000000..457b7cd --- /dev/null +++ b/src/scripts/mermaid.py @@ -0,0 +1,358 @@ +import re +import sys +from collections import defaultdict +from pathlib import Path +from typing import List, Optional, Tuple +from dataclasses import dataclass +from scripts.format import FormatMain + + +@dataclass +class NetworkDevice: + """Classe de base pour représenter un équipement réseau.""" + name: str + macs: List[str] + interface: str + dest_mac: str + bridge: str + vitesse: str + nb_liens: str + + def __post_init__(self): + """Validation et nettoyage des données après initialisation.""" + if isinstance(self.macs, str): + self.macs = [mac.strip() for mac in self.macs.split(',')] + self.bridge = self.bridge.strip() + self.vitesse = self.vitesse.strip() + self.nb_liens = self.nb_liens.strip() + + @property + def macs_str(self) -> str: + """Retourne les MACs sous forme de chaîne formatée.""" + return ','.join(self.macs) + + def to_formatted_string(self) -> str: + """Retourne une représentation formatée de l'équipement.""" + return f"{self.name} [{self.macs_str}] {self.interface} -> {self.dest_mac} [{self.bridge},{self.vitesse},{self.nb_liens}]" + + +@dataclass +class Coeur(NetworkDevice): + """Classe représentant un équipement coeur.""" + pass + + +@dataclass +class Switch(NetworkDevice): + """Classe représentant un switch.""" + pass + + +class NetworkFileParser: + """Classe pour parser les fichiers de configuration réseau.""" + + ENCODINGS = ['utf-8', 'utf-16', 'utf-16-le', 'utf-16-be', 'latin-1', 'cp1252', 'iso-8859-1'] + + PATTERNS = [ + r'(\S+)\s+\[([^\]]+)\]\s+(\S+)\s+->\s+([A-Fa-f0-9:-]+)\s*\[([^,]+),([^,]+),([^\]]+)\]', + r'(\S+)\s+\[([^\]]+)\]\s+(\S+)\s*->\s*([A-Fa-f0-9:-]+)\s*\[([^,]+),([^,]+),([^\]]+)\]', + r'(\w+)\s*\[([^\]]+)\]\s*(\w+)\s*->\s*([A-Fa-f0-9:-]+)\s*\[([^,]+),([^,]+),([^\]]+)\]' + ] + + def __init__(self, file_path: str): + self.file_path = Path(file_path) + + def _read_file_with_encoding(self) -> Optional[str]: + """Lit le fichier en essayant différents encodages.""" + for encoding in self.ENCODINGS: + try: + return self.file_path.read_text(encoding=encoding) + except (UnicodeDecodeError, UnicodeError): + continue + except FileNotFoundError: + return None + return None + + def _parse_line(self, line: str, device_class) -> Optional[NetworkDevice]: + """Parse une ligne et retourne un objet NetworkDevice.""" + line = line.strip() + if not line: + return None + + for pattern in self.PATTERNS: + match = re.match(pattern, line) + if match: + return device_class( + name=match.group(1), + macs=match.group(2), + interface=match.group(3), + dest_mac=match.group(4), + bridge=match.group(5), + vitesse=match.group(6), + nb_liens=match.group(7) + ) + + print(f" -> Aucun pattern ne correspond à cette ligne: {line}") + return None + + def parse(self) -> Tuple[List[Coeur], List[Switch]]: + """Parse le fichier et retourne les coeurs et switches.""" + content = self._read_file_with_encoding() + if content is None: + return [], [] + + sections = content.split('Switches:') + if len(sections) < 2: + return [], [] + + coeur_section = sections[0].replace('Coeur:', '').strip() + switches_section = sections[1].strip() + + coeurs = [] + for line in coeur_section.split('\n'): + coeur = self._parse_line(line, Coeur) + if coeur: + coeurs.append(coeur) + + switches = [] + for line in switches_section.split('\n'): + switch = self._parse_line(line, Switch) + if switch: + switches.append(switch) + + return coeurs, switches + + +class NetworkDeviceGrouper: + """Classe pour grouper les équipements réseau.""" + + @staticmethod + def group_coeurs_by_dest_mac(coeurs: List[Coeur]) -> List[Coeur]: + """Groupe les coeurs par adresse MAC de destination.""" + grouped = defaultdict(list) + + for coeur in coeurs: + key = (coeur.dest_mac, coeur.name, coeur.macs_str, + coeur.bridge, coeur.vitesse, coeur.nb_liens) + grouped[key].append(coeur) + + result = [] + for key, group in grouped.items(): + dest_mac, name, macs_str, bridge, vitesse, nb_liens = key + interfaces = [item.interface for item in group] + + result.append(Coeur( + name=name, + macs=macs_str.split(','), + interface='-'.join(interfaces), + dest_mac=dest_mac, + bridge=bridge, + vitesse=vitesse, + nb_liens=nb_liens + )) + + return result + + @staticmethod + def group_switches_by_local_mac(switches: List[Switch]) -> List[Switch]: + """Groupe les switches par adresse MAC locale.""" + grouped = defaultdict(list) + + for switch in switches: + key = (switch.name, switch.macs_str, switch.dest_mac, + switch.bridge, switch.vitesse, switch.nb_liens) + grouped[key].append(switch) + + result = [] + for key, group in grouped.items(): + name, macs_str, dest_mac, bridge, vitesse, nb_liens = key + interfaces = [item.interface for item in group] + + result.append(Switch( + name=name, + macs=macs_str.split(','), + interface='-'.join(interfaces), + dest_mac=dest_mac, + bridge=bridge, + vitesse=vitesse, + nb_liens=nb_liens + )) + + return result + + +class MermaidDiagramGenerator: + """Classe pour générer les diagrammes Mermaid.""" + + def __init__(self, coeurs: List[Coeur], switches: List[Switch]): + self.coeurs = coeurs + self.switches = switches + + def _find_matching_switch(self, coeur: Coeur) -> Optional[Switch]: + """Trouve le switch correspondant à un coeur.""" + for switch in self.switches: + if coeur.dest_mac in switch.macs: + return switch + return None + + def _format_bridge_label(self, bridge: str, vitesse: str) -> str: + """Formate le label du bridge.""" + bridge_formatted = bridge.replace('Bridge-Aggregation', 'BAGG') + return f"{bridge_formatted}
{vitesse}" + + def generate_links_diagram(self) -> str: + """Génère un diagramme Mermaid des liaisons.""" + if not self.coeurs or not self.switches: + return "Aucune donnée à afficher" + + mermaid_lines = ["graph LR"] + + total_devices = min(len(self.coeurs), len(self.switches)) + + for i, coeur in enumerate(self.coeurs): + target_switch = self._find_matching_switch(coeur) + label = self._format_bridge_label(coeur.bridge, coeur.vitesse) + + if target_switch: + if i < total_devices // 2: + line = f' Coeur(("{coeur.name}")) <-->|{label}| {target_switch.name}' + else: + line = f' {target_switch.name} <-->|{label}| Coeur(("{coeur.name}"))' + mermaid_lines.append(line) + else: + line = f' Coeur(("{coeur.name}")) <-->|{label}| {coeur.dest_mac}["📄manquant {coeur.dest_mac}"]' + mermaid_lines.append(line) + mermaid_lines.append(f' class {coeur.dest_mac} error;') + + return '\n'.join(mermaid_lines) + + +class FileWriter: + """Classe pour l'écriture de fichiers.""" + + @staticmethod + def write_mermaid_diagram(mermaid_code: str, output_path: Path) -> None: + """Sauvegarde le diagramme Mermaid.""" + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + + content = [ + "# Diagramme des liaisons Coeur-Switch\n", + "```mermaid\n", + "---\n", + "config:\n", + " theme: 'base'\n", + " themeVariables:\n", + " primaryColor: '#25bb75ff'\n", + " primaryTextColor: '#ffffff'\n", + " primaryBorderColor: '#000000ff'\n", + " lineColor: '#f82929ff'\n", + " secondaryColor: '#5e0c1aff'\n", + " tertiaryColor: '#ffffff'\n", + "---\n", + mermaid_code, + "\nclass Coeur coeur;\n", + "classDef coeur fill:#0590e6;\n", + "classDef error fill:#e64c05;\n", + "\n```\n" + ] + + output_path.write_text(''.join(content), encoding='utf-8') + print(f"✅ Diagramme Mermaid généré : {output_path}") + + except Exception as e: + print(f"Erreur lors de la sauvegarde: {e}") + + @staticmethod + def write_grouped_file(coeurs: List[Coeur], switches: List[Switch], output_path: Path) -> None: + """Écrit les équipements groupés dans un fichier.""" + try: + lines = ["Coeur:\n"] + lines.extend(f"{coeur.to_formatted_string()}\n" for coeur in coeurs) + lines.append("Switches:\n") + lines.extend(f"{switch.to_formatted_string()}\n" for switch in switches) + + output_path.write_text(''.join(lines), encoding='utf-8') + + except Exception as e: + print(f"Erreur lors de la sauvegarde: {e}") + + @staticmethod + def write_links_file(coeurs: List[Coeur], switches: List[Switch], output_path: Path) -> None: + """Écrit les liens entre coeurs et switches.""" + try: + lines = [] + + for coeur in coeurs: + matching_switch = None + for switch in switches: + if coeur.dest_mac in switch.macs: + matching_switch = switch + break + + if matching_switch: + coeur_info = f"{coeur.name} [{coeur.macs_str}] {coeur.interface} [{coeur.bridge},{coeur.vitesse},{coeur.nb_liens}]" + switch_info = f"{matching_switch.name} [{matching_switch.macs_str}] {matching_switch.interface} [{matching_switch.bridge},{matching_switch.vitesse},{matching_switch.nb_liens}]" + lines.append(f"{coeur_info} -> {switch_info}\n") + else: + lines.append(f"{coeur.name} [{coeur.macs_str}] {coeur.interface} -> Aucune correspondance de switch pour MAC {coeur.dest_mac}\n") + + output_path.write_text(''.join(lines), encoding='utf-8') + + except Exception as e: + print(f"Erreur lors de l'écriture des liens: {e}") + + +class NetworkAnalyzer: + """Classe principale pour l'analyse réseau.""" + + def __init__(self, base_dir: Path = None): + if base_dir is None: + base_dir = Path(__file__).parent.parent + + self.base_dir = Path(base_dir) + self.data_file = self.base_dir / 'data.txt' + self.output_dir = self.base_dir / 'output' + self.mermaid_file = self.output_dir / 'mermaid.md' + + def analyze(self, filename: str) -> None: + """Analyse complète du réseau.""" + # Exécute l'analyse initiale + FormatMain().run_analysis(filename) + + # Parse le fichier + parser = NetworkFileParser(self.data_file) + coeurs, switches = parser.parse() + + if not coeurs and not switches: + print("Impossible de lire le fichier ou format incorrect") + return + + # Groupe les équipements + grouper = NetworkDeviceGrouper() + grouped_coeurs = grouper.group_coeurs_by_dest_mac(coeurs) + grouped_switches = grouper.group_switches_by_local_mac(switches) + + # Écrit les fichiers de sortie + writer = FileWriter() + writer.write_grouped_file(grouped_coeurs, grouped_switches, self.data_file) + writer.write_links_file(grouped_coeurs, grouped_switches, self.data_file) + + # Génère le diagramme Mermaid + diagram_generator = MermaidDiagramGenerator(grouped_coeurs, grouped_switches) + mermaid_code = diagram_generator.generate_links_diagram() + writer.write_mermaid_diagram(mermaid_code, self.mermaid_file) + + +def main(): + """Fonction principale.""" + if len(sys.argv) != 2: + print("Usage: python mermaid.py ") + sys.exit(1) + + analyzer = NetworkAnalyzer() + analyzer.analyze(filename=sys.argv[1]) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/scripts/parse_uplinks.py b/src/scripts/parse_uplinks.py new file mode 100644 index 0000000..3fae6b7 --- /dev/null +++ b/src/scripts/parse_uplinks.py @@ -0,0 +1,448 @@ +import os +import re +import sys +import json +import logging +from typing import Dict, List, Optional, Union, Any +from dataclasses import dataclass +from pathlib import Path +import pandas as pd +from openpyxl import load_workbook +from scripts.extract_json import process_file_return_json + +mac_coeur=[] + +@dataclass +class Config: + """Configuration centralisée pour l'application.""" + LOGS_DIR: str = './src/logs' + OUTPUT_DIR: str = './src/output' + DATA_JSON_PATH: str = './src/data.json' + OUTPUT_FILE_NAME: str = 'uplink_report.xlsx' + + @property + def output_file_path(self) -> str: + return os.path.join(self.OUTPUT_DIR, self.OUTPUT_FILE_NAME) + + +@dataclass +class InterfaceResult: + """Résultat d'analyse d'une interface.""" + switch: str + logfile: str + interface: str + status: str + speed_gbps: Optional[Union[int, float, str]] + port_channel: Optional[str] + description: str + issues: str + mac_local: Optional[List[str]] + mac_destination: Optional[str] + bridge_name: Optional[str] + bridge_speed: Optional[Union[int, float, str]] + nb_liens: Optional[int] + + +@dataclass +class LCInterfaceResult: + """Résultat d'analyse d'une interface LC.""" + switch: str + interface: str + status: str + speed_gbps: Optional[Union[int, float, str]] + port_channel: Optional[str] + description: str + type_lien: str + nb_liens: Optional[int] + bridge_utilise: Optional[str] + mac_destination: Optional[str] + switch_destination: str + + +class InterfaceAnalyzer: + """Analyseur d'interfaces réseau.""" + + CORE_INTERLINK_PATTERN = re.compile(r'/0/(49|50)$') + SPEED_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*Gbps", re.I) + SHORT_NAME_PATTERN = re.compile(r'(\d+/\d+/\d+)$') + INTERCORE_PATTERN = re.compile(r'[1-4]/0/4[3-8]$') + CORE_INTERFACE_PATTERN = re.compile(r'/0/(4[89]|50)$') + + @classmethod + def is_core_interlink(cls, interface_name: str) -> bool: + """Détermine si l'interface est un lien inter-cœur.""" + return cls.CORE_INTERLINK_PATTERN.search(interface_name) is not None + + @classmethod + def is_intercore_link(cls, interface_name: str) -> bool: + """Détermine si l'interface est un lien inter-cœur (LC).""" + return cls.INTERCORE_PATTERN.search(interface_name) is not None + + @classmethod + def guess_speed(cls, speed_mode: Optional[str], interface_name: Optional[str] = None) -> Optional[Union[int, float, str]]: + """Devine la vitesse d'une interface.""" + if speed_mode: + match = cls.SPEED_PATTERN.match(speed_mode) + if match: + return float(match.group(1)) + + if interface_name and cls.CORE_INTERFACE_PATTERN.search(interface_name): + if "Ten-GigabitEthernet" in interface_name: + return 10 + elif "GigabitEthernet" in interface_name: + return 1 + elif "HundredGigE" in interface_name: + return 100 + elif "M-GigabitEthernet" in interface_name: + return "?" + + return None + + @classmethod + def extract_short_name(cls, interface_name: str) -> str: + """Extrait le nom court d'une interface.""" + match = cls.SHORT_NAME_PATTERN.search(interface_name) + return match.group(1) if match else interface_name + + @classmethod + def identify_issues(cls, info: Dict[str, Any], speed: Optional[Union[int, float, str]]) -> List[str]: + """Identifie les problèmes potentiels d'une interface.""" + issues = [] + + if info.get("current_state") is not None: + if info.get("current_state", "").upper() != "UP": + issues.append("DOWN") + + if isinstance(speed, (int, float)) and speed < 10: + issues.append(f"Poss. goulot ({speed} Gbps)") + + if info.get("port_channel"): + issues.append(f"Membre Port-Channel {info['port_channel']}") + + return issues + + +class SwitchLogAnalyzer: + """Analyseur de logs de switchs.""" + + def __init__(self, config: Config): + self.config = config + self.analyzer = InterfaceAnalyzer() + self.logger = self._setup_logger() + + def _setup_logger(self) -> logging.Logger: + """Configure le logger.""" + logger = logging.getLogger(__name__) + if not logger.handlers: + handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + return logger + + def analyze_switch_log( + self, + filename: str, + switch_name: str, + interfaces: Dict[str, Any], + mac_addresses: Optional[List[str]] = None, + is_core: bool = False, + filter_mac_destination: bool = True + ) -> List[InterfaceResult]: + """Analyse les interfaces d'un switch.""" + results = [] + + for iface_name, info in interfaces.items(): + if not self._should_process_interface(info, filter_mac_destination, is_core, iface_name): + continue + + result = self._create_interface_result( + filename, switch_name, iface_name, info, interfaces, mac_addresses + ) + results.append(result) + + return results + + def _should_process_interface( + self, + info: Dict[str, Any], + filter_mac_destination: bool, + is_core: bool, + iface_name: str + ) -> bool: + """Détermine si une interface doit être traitée.""" + global mac_coeur + if filter_mac_destination and info.get("mac_destination") not in mac_coeur: + return False + + if is_core and not self.analyzer.is_core_interlink(iface_name): + return False + + return True + + def _create_interface_result( + self, + filename: str, + switch_name: str, + iface_name: str, + info: Dict[str, Any], + interfaces: Dict[str, Any], + mac_addresses: Optional[List[str]] + ) -> InterfaceResult: + """Crée un résultat d'interface.""" + mac_destination = info.get("mac_destination") + bridge_name = info.get("bridge_name") + bridge_info = interfaces.get(bridge_name, {}) if bridge_name else {} + + speed = self.analyzer.guess_speed(info.get("speed_mode"), iface_name) + bridge_speed = self.analyzer.guess_speed(bridge_info.get("speed_mode"), bridge_name) + + issues = self.analyzer.identify_issues(info, speed) + + description = info.get("description") or info.get("config_description") or "" + + return InterfaceResult( + switch=switch_name, + logfile=filename, + interface=iface_name, + status=info.get("current_state", "unknown"), + speed_gbps=speed, + port_channel=info.get("port_channel"), + description=description, + issues=", ".join(issues) if issues else "RAS", + mac_local=mac_addresses, + mac_destination=mac_destination, + bridge_name=bridge_name, + bridge_speed=bridge_speed, + nb_liens=bridge_info.get("nb_liens") + ) + + def analyze_lc_log(self, filename: str, switch_name: str, interfaces: Dict[str, Any]) -> List[LCInterfaceResult]: + """Analyse les interfaces LC.""" + results = [] + + for iface_name, info in interfaces.items(): + if not info or not isinstance(info, dict): + continue + + result = self._create_lc_interface_result(filename, switch_name, iface_name, info) + results.append(result) + + return results + + def _create_lc_interface_result( + self, + filename: str, + switch_name: str, + iface_name: str, + info: Dict[str, Any] + ) -> LCInterfaceResult: + """Crée un résultat d'interface LC.""" + speed = self.analyzer.guess_speed(info.get("speed_mode"), iface_name) + type_lien = "Lien inter-coeur" if self.analyzer.is_intercore_link(iface_name) else "Vers Accès" + description = info.get("description") or info.get("config_description") or "" + + nb_liens = info.get("nb_liens") if iface_name.startswith("Bridge-Aggregation") else None + bridge_utilise = None if iface_name.startswith("Bridge-Aggregation") else info.get("bridge_name") + mac_destination = None if iface_name.startswith("Bridge-Aggregation") else info.get("mac_destination") + + return LCInterfaceResult( + switch=switch_name, + interface=iface_name, + status=info.get("current_state", "unknown"), + speed_gbps=speed, + port_channel=info.get("port_channel"), + description=description, + type_lien=type_lien, + nb_liens=nb_liens, + bridge_utilise=bridge_utilise, + mac_destination=mac_destination, + switch_destination="" + ) + + +class ExcelReportGenerator: + """Générateur de rapports Excel.""" + + def __init__(self, config: Config): + self.config = config + self.logger = logging.getLogger(__name__) + + def generate_report(self, uplink_results: List[InterfaceResult], lc_results: List[LCInterfaceResult]) -> None: + """Génère le rapport Excel.""" + df_uplink = self._create_uplink_dataframe(uplink_results) + df_lc = self._create_lc_dataframe(lc_results) + + os.makedirs(self.config.OUTPUT_DIR, exist_ok=True) + + self._write_excel_file(df_uplink, df_lc) + self._format_excel_file() + + print(f"✅ Rapport Excel généré : {self.config.output_file_path}") + + def _create_uplink_dataframe(self, results: List[InterfaceResult]) -> pd.DataFrame: + """Crée le DataFrame pour les uplinks.""" + data = [] + for result in results: + data.append({ + "switch": result.switch, + "logfile": result.logfile, + "interface": result.interface, + "status": result.status, + "speed_gbps": result.speed_gbps, + "port_channel": result.port_channel, + "description": result.description, + "issues": result.issues, + "mac_local": result.mac_local, + "mac_destination": result.mac_destination, + "bridge_name": result.bridge_name, + "bridge_speed": result.bridge_speed, + "nb_liens": result.nb_liens + }) + return pd.DataFrame(data) + + def _create_lc_dataframe(self, results: List[LCInterfaceResult]) -> pd.DataFrame: + """Crée le DataFrame pour les interfaces LC.""" + data = [] + for result in results: + data.append({ + "Switch": result.switch, + "Interface": result.interface, + "Status": result.status, + "Vitesse (Gbps)": result.speed_gbps, + "Port-Channel": result.port_channel, + "Description": result.description, + "Type de lien": result.type_lien, + "Nb liens": result.nb_liens, + "Bridge utilisé": result.bridge_utilise, + "MAC destination": result.mac_destination, + "Switch destination": result.switch_destination + }) + return pd.DataFrame(data) + + def _write_excel_file(self, df_uplink: pd.DataFrame, df_lc: pd.DataFrame) -> None: + """Écrit le fichier Excel.""" + with pd.ExcelWriter(self.config.output_file_path, engine='openpyxl') as writer: + df_uplink.to_excel(writer, index=False, sheet_name="Uplinks") + df_lc.to_excel(writer, index=False, sheet_name="LC_Interfaces") + + def _format_excel_file(self) -> None: + """Met en forme le fichier Excel.""" + wb = load_workbook(self.config.output_file_path) + + self._add_formulas(wb) + self._format_columns(wb) + + wb.save(self.config.output_file_path) + + def _add_formulas(self, workbook) -> None: + """Ajoute les formules Excel.""" + ws = workbook["LC_Interfaces"] + + for row in range(2, 400): + formula = ( + f'=IF(J{row}="","",IFERROR(INDEX(Uplinks!A$2:A$400,' + f'MATCH(TRUE,ISNUMBER(SEARCH(J{row},Uplinks!I$2:I$400)),0)),""))' + ) + ws[f'K{row}'] = formula + + def _format_columns(self, workbook) -> None: + """Met en forme les colonnes.""" + column_widths = { + "LC_Interfaces": [15, 25, 25, 15, 12, 45, 15, 10, 22, 20, 20], + "Uplinks": [15, 30, 25, 7, 12, 12, 30, 22, 47, 15, 20, 13, 10] + } + + for sheet_name, widths in column_widths.items(): + ws = workbook[sheet_name] + for col_idx, width in enumerate(widths, start=1): + ws.column_dimensions[chr(64 + col_idx)].width = width + + +class UplinkReportGenerator: + """Générateur principal de rapports d'uplink.""" + + def __init__(self, config: Optional[Config] = None): + self.config = config or Config() + self.analyzer = SwitchLogAnalyzer(self.config) + self.report_generator = ExcelReportGenerator(self.config) + self.logger = logging.getLogger(__name__) + + def generate_report(self, lc_filename_prefix: str) -> None: + """Génère le rapport complet.""" + try: + uplink_results, lc_results = self._process_log_files(lc_filename_prefix) + self.report_generator.generate_report(uplink_results, lc_results) + except Exception as e: + self.logger.error(f"Erreur lors de la génération du rapport : {e}") + raise + + def _process_log_files(self, lc_filename_prefix: str) -> tuple[List[InterfaceResult], List[LCInterfaceResult]]: + """Traite tous les fichiers de logs.""" + global mac_coeur + uplink_results = [] + lc_results = [] + + log_files = self._get_log_files() + + for filename in log_files: + try: + data = self._process_single_log_file(filename) + if not data: + continue + + interfaces = data.get("data", {}).get("interfaces", {}) + switch_name = data.get("metadata", {}).get("switch_name", "unknown") + + if filename.startswith(lc_filename_prefix): + mac_coeur = data.get("metadata", {}).get("mac_addresses", []) + mac_coeur = [mac.lower() for mac in mac_coeur] + lc_results.extend(self.analyzer.analyze_lc_log(filename, switch_name, interfaces)) + else: + mac_addresses = data.get("metadata", {}).get("mac_addresses", []) + uplink_results.extend( + self.analyzer.analyze_switch_log( + filename, switch_name, interfaces, + mac_addresses=mac_addresses, + filter_mac_destination=True + ) + ) + + except Exception as e: + self.logger.error(f"Erreur lors du traitement de {filename}: {e}") + continue + + return uplink_results, lc_results + + def _get_log_files(self) -> List[str]: + """Récupère la liste des fichiers de logs.""" + if not os.path.exists(self.config.LOGS_DIR): + raise FileNotFoundError(f"Le répertoire {self.config.LOGS_DIR} n'existe pas") + + return [f for f in os.listdir(self.config.LOGS_DIR) if f.endswith('.log')] + + def _process_single_log_file(self, filename: str) -> Optional[Dict[str, Any]]: + """Traite un fichier de log individuel.""" + filepath = os.path.join(self.config.LOGS_DIR, filename) + return process_file_return_json(filepath) + + +def main(): + """Fonction principale.""" + if len(sys.argv) < 2: + print("Usage: python parse_uplinks.py (sans chemin)") + sys.exit(1) + + lc_filename_prefix = sys.argv[1] + + try: + generator = UplinkReportGenerator() + generator.generate_report(lc_filename_prefix) + except Exception as e: + print(f"❌ Erreur : {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file