first commit

This commit is contained in:
Chevallier
2025-07-23 09:39:15 +02:00
commit ab7611a923
6 changed files with 1778 additions and 0 deletions

57
README.md Normal file
View File

@@ -0,0 +1,57 @@
# Parseur de Logs HPE 5130
Ce projet permet d'extraire automatiquement des informations pertinentes depuis les logs de switchs **HPE 5130**, notamment les connexions entre les switchs d'accès et le cœur de réseau. Les résultats sont exportés sous deux formats :
- Un **fichier Excel** listant les interfaces côté accès et cœur.
- Un **diagramme Mermaid** simplifié représentant les connexions.
---
## 🧰 Fonctionnalités
- Extraction des données utiles depuis les fichiers `.log` au format **JSON**.
- Génération dun **rapport Excel (`uplink_report.xlsx`)** contenant :
- Les informations des interfaces côté accès connectées au cœur.
- Les informations des interfaces du cœur.
- Création dun **fichier Mermaid (`mermaid.md`)** représentant graphiquement les connexions.
---
## 🚀 Lancement
Pour lancer loutil, placez-vous dans le dossier du projet et exécutez la commande suivante :
```bash
python .\src\main.py [fichier logs du cœur]
```
#### 💡 Exemple : python .\src\main.py .\src\logs\core_switch.log
---
## 📁 Arborescence du projet
```makefile
C:.
src
logs # Contient les fichiers .log des switchs (à déposer ici)
├───output # Contient les fichiers générés (Excel et Mermaid)
├───scripts # Contient les scripts de traitement
│ ├───extract_json.py # Extraction des données en JSON
│ ├───format.py # Formatage des données
│ ├───mermaid.py # Formatage et génération du fichier Mermaid
│ └───parse_uplinks.py # Création du fichier Excel
└───main.py # Programme de lancement
```
---
## 📌 Instructions d'utilisation
1. Déposer les fichiers .log dans le dossier src/logs.
2. Lancer le script main.py avec en argument le log du switch cœur.
3. Les résultats seront disponibles dans le dossier src/output :
- uplink_report.xlsx
- mermaid.md
---
## ⚠️ Remarque
- Bien penser à **mettre les fichiers de logs avec l'extension `.log`** dans le dossier `src/logs`.

21
src/main.py Normal file
View File

@@ -0,0 +1,21 @@
import sys
import os
from pathlib import Path
sys.path.append(os.path.dirname(__file__))
from scripts.mermaid import NetworkAnalyzer
from scripts.parse_uplinks import UplinkReportGenerator
def main():
"""Fonction principale du programme d'analyse."""
if len(sys.argv) != 2:
print("Usage: python main.py <fichier_log_coeur>")
sys.exit(1)
coeur_log_filename = sys.argv[1]
coeur_log_filename = os.path.basename(coeur_log_filename)
UplinkReportGenerator().generate_report(coeur_log_filename)
NetworkAnalyzer(base_dir=Path(__file__).parent).analyze(coeur_log_filename)
if __name__ == "__main__":
main()

640
src/scripts/extract_json.py Normal file
View File

@@ -0,0 +1,640 @@
import os
import re
import json
import glob
import sys
from datetime import datetime
class InterfaceParser:
"""
Parses 'display interface' command output to extract interface details.
"""
def parse(self, content: str) -> dict:
interfaces = {}
current_interface = None
lines = content.split('\n')
for line in lines:
line = line.strip()
if re.match(r'^(?!\s)(?:interface\s*)?(?:[A-Za-z-]*Ethernet|Bridge-Aggregation|Vlan-interface)\d+', line):
current_interface = line
interfaces[current_interface] = {
"name": current_interface,
"current_state": None,
"description": None,
"speed_mode": None,
"pvid": None,
"port_channel": None,
"bridge_name": None,
"type": None,
"access_vlan": None,
"trunk_vlans": [],
"ip_address": None,
"shutdown": False,
"config_description": None,
"nb_liens": None,
"mac_destination": None
}
continue
if current_interface and line:
if line.startswith('Current state:'):
interfaces[current_interface]["current_state"] = line.split(':', 1)[1].strip()
elif line.startswith('Description:'):
description = line.split(':', 1)[1].strip()
if description and description != f"{current_interface} Interface":
interfaces[current_interface]["description"] = description
else:
interfaces[current_interface]["description"] = None
elif 'speed mode' in line.lower():
speed_match = re.search(r'(\d+(?:\.\d+)?)\s*(Gbps|Mbps|kbps)[^\w]*speed\s+mode', line, re.I)
if speed_match:
speed_value = speed_match.group(1)
speed_unit = speed_match.group(2)
interfaces[current_interface]["speed_mode"] = f"{speed_value}{speed_unit}"
elif 'pvid' in line.lower():
pvid_match = re.search(r'pvid[:\s]*(\d+)', line, re.I)
if pvid_match:
interfaces[current_interface]["pvid"] = int(pvid_match.group(1))
elif 'port channel' in line.lower() or 'channel group' in line.lower():
pc_match = re.search(r'(?:port channel|channel group)[:\s]*(\d+)', line, re.I)
if pc_match:
interfaces[current_interface]["port_channel"] = int(pc_match.group(1))
elif 'member of bridge-aggregation' in line.lower():
ba_match = re.search(r'member of bridge-aggregation\s*(\d+)', line, re.I)
if ba_match:
interfaces[current_interface]["port_channel"] = int(ba_match.group(1))
return interfaces
class LinkAggregationParser:
"""
Parses 'display link-aggregation verbose' command output.
Extracts link counts, MAC mappings, and bridge mappings.
"""
def _convert_port_name(self, port_name: str) -> str:
"""Converts the port name to a standardized format."""
port_name = re.sub(r'\([A-Z]\)', '', port_name)
patterns = [
(r'^XGE(\d+/\d+/\d+)$', r'Ten-GigabitEthernet\1'),
(r'^GE(\d+/\d+/\d+)$', r'GigabitEthernet\1'),
(r'^FE(\d+/\d+/\d+)$', r'HundredGigE\1'),
(r'^TE(\d+/\d+/\d+)$', r'TenGigabitEthernet\1'),
]
for pattern, replacement in patterns:
if re.match(pattern, port_name):
return re.sub(pattern, replacement, port_name)
return port_name
def parse(self, content: str) -> tuple[dict, dict, dict]:
link_counts = {}
mac_mappings = {}
bridge_mappings = {}
lines = content.split('\n')
current_aggregate = None
current_section = None
local_ports = []
remote_ports = []
for line in lines:
line = line.strip()
if line.startswith('Aggregate Interface:'):
interface_match = re.search(r'Aggregate Interface:\s*(.+)', line)
if interface_match:
current_aggregate = interface_match.group(1).strip()
link_counts[current_aggregate] = 0
local_ports = []
remote_ports = []
continue
if line.startswith('Local:'):
current_section = 'local'
continue
if line.startswith('Remote:'):
current_section = 'remote'
continue
if line.startswith('Aggregate Interface:'):
interface_match = re.search(r'Aggregate Interface:\s*(.+)', line)
if interface_match:
current_aggregate = interface_match.group(1).strip()
if current_aggregate and current_section == 'local' and line:
port_match = re.match(r'^([A-Za-z]+\d+/\d+/\d+(?:\([A-Z]\))?)\s+', line)
if port_match:
if not any(keyword in line for keyword in ['Port', 'Status', 'Priority', 'Index', 'Oper-Key', 'Flag']):
local_port = port_match.group(1)
local_ports.append(local_port)
link_counts[current_aggregate] += 1
if current_aggregate and current_section == 'remote' and line:
remote_match = re.match(r'^([A-Za-z]+\d+/\d+/\d+(?:\([A-Z]\))?)\s+\d+\s+\d+\s+\d+\s+0x[0-9a-fA-F]+,\s*([0-9a-fA-F-]+)', line)
if remote_match:
if not any(keyword in line for keyword in ['Actor', 'Priority', 'Index', 'Oper-Key', 'SystemID', 'Flag']):
remote_port = remote_match.group(1)
system_id = remote_match.group(2)
remote_ports.append((remote_port, system_id))
if current_aggregate and local_ports and remote_ports and \
(line.startswith('Aggregate Interface:') or line == lines[-1].strip()):
min_length = min(len(local_ports), len(remote_ports))
for i in range(min_length):
local_port = local_ports[i]
remote_port, system_id = remote_ports[i]
interface_name = self._convert_port_name(local_port)
mac_mappings[interface_name] = system_id
bridge_mappings[interface_name] = current_aggregate
if line.startswith('Aggregate Interface:'):
current_aggregate = None
local_ports = []
remote_ports = []
current_section = None
if current_aggregate and local_ports and remote_ports:
min_length = min(len(local_ports), len(remote_ports))
for i in range(min_length):
local_port = local_ports[i]
remote_port, system_id = remote_ports[i]
interface_name = self._convert_port_name(local_port)
mac_mappings[interface_name] = system_id
bridge_mappings[interface_name] = current_aggregate
for interface, count in link_counts.items():
if 'Bridge-Aggregation' in interface:
link_counts[interface] = count
return link_counts, mac_mappings, bridge_mappings
class ConfigurationParser:
"""
Parses 'display current-configuration' command output.
Extracts interface configurations and general switch configurations.
"""
def _parse_vlan_ranges(self, vlan_string: str) -> list:
"""Parses a VLAN string and returns a list of unique VLANs."""
vlans = set()
parts = vlan_string.split()
i = 0
while i < len(parts):
if i + 2 < len(parts) and parts[i + 1].lower() == 'to':
try:
start_vlan = int(parts[i])
end_vlan = int(parts[i + 2])
vlans.update(range(start_vlan, end_vlan + 1))
i += 3
except ValueError:
i += 1
else:
try:
vlans.add(int(parts[i]))
except ValueError:
pass
i += 1
return sorted(list(vlans))
def parse(self, content: str) -> tuple[dict, dict]:
interfaces_config = {}
switch_config = {
"version": None,
"lldp": {"enabled": False, "details": {}},
"stp": {"enabled": False, "details": {}},
"ssh": {"enabled": False, "details": {}},
"snmp_contact": None,
"ntp_server": [],
"users": [],
"vlans": []
}
lines = content.split('\n')
current_interface = None
current_section = None
i = 0
while i < len(lines):
line = lines[i]
line_stripped = line.strip()
if line_stripped.startswith('version '):
switch_config["version"] = line_stripped.split('version ', 1)[1].strip()
elif 'lldp enable' in line_stripped.lower():
switch_config["lldp"]["enabled"] = True
elif line_stripped.startswith('lldp '):
switch_config["lldp"]["details"][line_stripped] = True
elif 'stp enable' in line_stripped.lower() or 'spanning-tree' in line_stripped.lower():
switch_config["stp"]["enabled"] = True
switch_config["stp"]["details"][line_stripped] = True
elif 'ssh server enable' in line_stripped.lower() or 'ssh user' in line_stripped.lower():
switch_config["ssh"]["enabled"] = True
switch_config["ssh"]["details"][line_stripped] = True
elif line_stripped.startswith('snmp-agent sys-info contact'):
switch_config["snmp_contact"] = line_stripped.split('contact', 1)[1].strip().strip('"')
elif line_stripped.startswith('ntp-service unicast-server'):
ntp_server = line_stripped.split('unicast-server', 1)[1].strip()
switch_config["ntp_server"].append(ntp_server)
elif line_stripped.startswith('local-user '):
username = line_stripped.split('local-user ', 1)[1].strip()
switch_config["users"].append(username)
elif line_stripped.startswith('vlan '):
vlan_info = line_stripped.split('vlan ', 1)[1].strip()
if "vlans" not in switch_config:
switch_config["vlans"] = []
if 'to' in vlan_info:
parts = vlan_info.split(' to ')
if len(parts) == 2:
start_vlan = int(parts[0])
end_vlan = int(parts[1])
for vlan_id in range(start_vlan, end_vlan + 1):
if vlan_id not in switch_config["vlans"]:
switch_config["vlans"].append(vlan_id)
else:
vlan_id = int(vlan_info)
if vlan_id not in switch_config["vlans"]:
switch_config["vlans"].append(vlan_id)
elif line_stripped.startswith('interface '):
interface_match = re.match(r'^interface\s+(.+)', line_stripped)
if interface_match:
current_interface = interface_match.group(1).strip()
interfaces_config[current_interface] = {
"name": current_interface,
"type": "access",
"access_vlan": None,
"trunk_vlans": [],
"ip_address": None,
"shutdown": False,
"port_channel": None,
"description": None
}
current_section = "interface"
elif current_section == "interface" and current_interface:
if line_stripped != '' and not line.startswith(' ') and not line.startswith('\t'):
if not line_stripped.startswith('interface '):
current_interface = None
current_section = None
continue
else:
interface_match = re.match(r'^interface\s+(.+)', line_stripped)
if interface_match:
current_interface = interface_match.group(1).strip()
interfaces_config[current_interface] = {
"name": current_interface,
"type": "access",
"access_vlan": None,
"trunk_vlans": [],
"ip_address": None,
"shutdown": False,
"port_channel": None,
"description": None
}
elif line.startswith(' ') or line.startswith('\t'):
command = line.strip()
if command.startswith('description '):
interfaces_config[current_interface]["description"] = command.split('description ', 1)[1].strip()
elif command == 'shutdown':
interfaces_config[current_interface]["shutdown"] = True
elif command.startswith('ip address '):
ip_info = command.split('ip address ', 1)[1].strip()
interfaces_config[current_interface]["ip_address"] = ip_info
elif 'link-aggregation' in command.lower() or 'port-group' in command.lower():
pc_match = re.search(r'(\d+)', command)
if pc_match:
interfaces_config[current_interface]["port_channel"] = int(pc_match.group(1))
elif command.startswith('port access vlan '):
vlan_id = int(command.split('port access vlan ', 1)[1].strip())
interfaces_config[current_interface]["type"] = "access"
interfaces_config[current_interface]["access_vlan"] = vlan_id
elif command.startswith('port link-type trunk'):
interfaces_config[current_interface]["type"] = "trunk"
elif command.startswith('port trunk permit vlan '):
interfaces_config[current_interface]["type"] = "trunk"
vlan_info = command.split('port trunk permit vlan ', 1)[1].strip()
if vlan_info == "all":
interfaces_config[current_interface]["trunk_vlans"] = "all"
else:
trunk_vlans = self._parse_vlan_ranges(vlan_info)
interfaces_config[current_interface]["trunk_vlans"].extend(trunk_vlans)
elif command.startswith('undo port trunk permit vlan '):
vlan_info = command.split('undo port trunk permit vlan ', 1)[1].strip()
if vlan_info == "all":
interfaces_config[current_interface]["trunk_vlans"] = []
else:
vlans_to_remove = self._parse_vlan_ranges(vlan_info)
current_trunk = interfaces_config[current_interface]["trunk_vlans"]
if current_trunk != "all":
for vlan in vlans_to_remove:
if vlan in current_trunk:
current_trunk.remove(vlan)
elif command.startswith('port trunk pvid vlan '):
interfaces_config[current_interface]["type"] = "trunk"
pvid = int(command.split('port trunk pvid vlan ', 1)[1].strip())
trunk_vlans = interfaces_config[current_interface]["trunk_vlans"]
if trunk_vlans != "all" and pvid not in trunk_vlans:
trunk_vlans.append(pvid)
i += 1
if isinstance(switch_config["vlans"], list):
switch_config["vlans"] = sorted(list(set(switch_config["vlans"])))
for interface in interfaces_config.values():
if isinstance(interface["trunk_vlans"], list):
interface["trunk_vlans"] = sorted(list(set(interface["trunk_vlans"])))
return interfaces_config, switch_config
class DeviceInfoParser:
"""
Parses 'display device manuinfo' command output to extract MAC addresses.
"""
def parse(self, content: str) -> list[str]:
mac_addresses = []
lines = content.split('\n')
current_slot_cpu = None
for line in lines:
line = line.strip()
if re.match(r'^\s*Slot\s+\d+\s+CPU\s+0\s*:', line, re.I):
current_slot_cpu = line.strip()
continue
if current_slot_cpu and line.startswith('MAC_ADDRESS'):
mac_match = re.search(r'MAC_ADDRESS\s*:\s*([A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4})', line)
if mac_match:
mac_address = mac_match.group(1)
mac_addresses.append(mac_address)
if line.startswith('Slot') and 'CPU' in line and current_slot_cpu:
if line.strip() != current_slot_cpu:
current_slot_cpu = None
return mac_addresses
class LogFileProcessor:
"""
Manages the parsing of a single log file, coordinating different parsers
and merging their results into a unified JSON structure.
"""
def __init__(self):
self.interface_parser = InterfaceParser()
self.link_agg_parser = LinkAggregationParser()
self.config_parser = ConfigurationParser()
self.device_info_parser = DeviceInfoParser()
def _extract_sections(self, file_content: str) -> dict:
"""Extracts different sections from the log file content."""
sections = {}
current_section = None
lines = file_content.splitlines()
section_patterns = {
"display_interface": re.compile(r"^=+\s*display interface\s*=+$", re.I),
"display_link_aggregation_verbose": re.compile(r"^=+\s*display link-aggregation verbose\s*=+$", re.I),
"display_current_configuration": re.compile(r"^=+\s*display current-configuration\s*=+$", re.I),
"display_device_manuinfo": re.compile(r"^=+\s*display device manuinfo\s*=+$", re.I)
}
for line in lines:
line_strip = line.strip()
matched_section = None
for section_name, pattern in section_patterns.items():
if pattern.match(line_strip):
matched_section = section_name
break
if matched_section:
current_section = matched_section
sections[current_section] = []
continue
if re.match(r"^=+$", line_strip):
current_section = None
continue
if current_section:
sections[current_section].append(line)
for key in sections:
sections[key] = "\n".join(sections[key])
return sections
def _extract_switch_name(self, content: str) -> str | None:
"""Extracts the switch name from the log file content."""
sysname_match = re.search(r"(?m)^\s*sysname\s+(.+)$", content)
if sysname_match:
return sysname_match.group(1).strip()
return None
def _merge_configuration_into_interfaces(self, interfaces: dict, interfaces_config: dict):
"""Merges configuration details into the parsed interface data."""
for interface_name, config in interfaces_config.items():
if interface_name in interfaces:
interfaces[interface_name].update({
"type": config["type"],
"access_vlan": config["access_vlan"],
"trunk_vlans": config["trunk_vlans"],
"ip_address": config["ip_address"],
"shutdown": config["shutdown"],
"config_description": config["description"],
})
if config["port_channel"] is not None:
interfaces[interface_name]["port_channel"] = config["port_channel"]
else:
interfaces[interface_name] = {
"name": interface_name,
"current_state": None,
"description": None,
"speed_mode": None,
"pvid": None,
"port_channel": config["port_channel"],
"type": config["type"],
"access_vlan": config["access_vlan"],
"trunk_vlans": config["trunk_vlans"],
"ip_address": config["ip_address"],
"shutdown": config["shutdown"],
"config_description": config["description"],
"nb_liens": None,
"mac_destination": None
}
def _merge_link_aggregation_counts(self, interfaces: dict, link_counts: dict):
"""Merges link aggregation counts into the interface data."""
for interface_name, count in link_counts.items():
if interface_name in interfaces:
interfaces[interface_name]["nb_liens"] = count
def _merge_mac_mappings(self, interfaces: dict, mac_mappings: dict):
"""Merges MAC address mappings into the interface data."""
for interface_name, mac_address in mac_mappings.items():
if interface_name in interfaces:
interfaces[interface_name]["mac_destination"] = mac_address
else:
interfaces[interface_name] = {
"name": interface_name,
"current_state": None,
"description": None,
"speed_mode": None,
"pvid": None,
"port_channel": None,
"type": None,
"access_vlan": None,
"trunk_vlans": [],
"ip_address": None,
"shutdown": False,
"config_description": None,
"nb_liens": None,
"mac_destination": mac_address
}
def _merge_bridge_mappings(self, interfaces: dict, bridge_mappings: dict):
"""Merges bridge aggregation mappings into existing interfaces."""
for interface_name, bridge_name in bridge_mappings.items():
if interface_name in interfaces:
interfaces[interface_name]["bridge_name"] = bridge_name
def process_file(self, filepath: str) -> dict | None:
"""
Processes a single log file to extract and consolidate network device data.
Returns a dictionary containing metadata and parsed data, or None on error.
"""
try:
with open(filepath, 'r', encoding='latin-1', errors='ignore') as f:
content = f.read()
sections = self._extract_sections(content)
if not sections:
return None
mac_addresses = []
if "display_device_manuinfo" in sections:
mac_addresses = self.device_info_parser.parse(sections["display_device_manuinfo"])
result = {
"metadata": {
"filename": os.path.basename(filepath),
"switch_name": self._extract_switch_name(content),
"extraction_date": datetime.now().isoformat(),
"sections_found": list(sections.keys()),
"mac_addresses": mac_addresses
},
"data": {}
}
interfaces = {}
if "display_interface" in sections:
interfaces = self.interface_parser.parse(sections["display_interface"])
if "display_current_configuration" in sections:
try:
interfaces_config, switch_config = self.config_parser.parse(sections["display_current_configuration"])
self._merge_configuration_into_interfaces(interfaces, interfaces_config)
result["data"]["switch_config"] = switch_config
except Exception as e:
print(f" - Error parsing current-configuration for {filepath}: {e}", file=sys.stderr)
result["data"]["switch_config"] = {
"version": None, "lldp": {"enabled": False, "details": {}},
"stp": {"enabled": False, "details": {}}, "ssh": {"enabled": False, "details": {}},
"snmp_contact": None, "ntp_server": [], "users": [], "vlans": []
}
if "display_link_aggregation_verbose" in sections:
try:
link_counts, mac_mappings, bridge_mappings = self.link_agg_parser.parse(sections["display_link_aggregation_verbose"])
self._merge_link_aggregation_counts(interfaces, link_counts)
self._merge_mac_mappings(interfaces, mac_mappings)
self._merge_bridge_mappings(interfaces, bridge_mappings)
except Exception as e:
print(f" - Error parsing link-aggregation verbose for {filepath}: {e}", file=sys.stderr)
result["data"]["interfaces"] = interfaces
return result
except Exception as e:
print(f"Error processing file {filepath}: {e}", file=sys.stderr)
return None
class DataExporter:
"""
Handles the export of processed data to a JSON file.
"""
def export(self, data: dict, output_filepath: str):
try:
output_dir = os.path.dirname(output_filepath)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
with open(output_filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Data saved to: {output_filepath}")
except Exception as e:
print(f"Error saving data to {output_filepath}: {e}", file=sys.stderr)
class MainApplication:
"""
Main application class to orchestrate the log file processing.
"""
def __init__(self):
self.processor = LogFileProcessor()
self.exporter = DataExporter()
def run(self, log_file_pattern: str):
current_dir = os.getcwd()
log_files = glob.glob(os.path.join(current_dir, log_file_pattern))
if not log_files:
print(f"No log files found matching pattern: {log_file_pattern}", file=sys.stderr)
return
results = {}
success_count = 0
for log_file in sorted(log_files):
result = self.processor.process_file(log_file)
if result:
results[os.path.basename(log_file)] = result
success_count += 1
output_file = "./src/data.json"
self.exporter.export(results, output_file)
def process_file_return_json(filepath: str) -> dict | None:
"""
Standalone function to process a single file and return JSON object,
useful for external calls that don't need the full CLI application.
"""
processor = LogFileProcessor()
return processor.process_file(filepath)
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python extract_json.py <log_file_pattern>")
else:
app = MainApplication()
app.run(sys.argv[1])

254
src/scripts/format.py Normal file
View File

@@ -0,0 +1,254 @@
import sys
import os
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from contextlib import redirect_stdout
from scripts.extract_json import process_file_return_json
@dataclass
class BridgeInfo:
"""Informations sur un bridge."""
nb_liens: int
speed: str
@classmethod
def from_interface_data(cls, interface_data: Dict[str, Any]) -> 'BridgeInfo':
"""Crée une instance à partir des données d'interface."""
return cls(
nb_liens=interface_data.get("nb_liens", 0),
speed=interface_data.get("speed_mode", "unknown")
)
@dataclass
class InterfaceInfo:
"""Informations sur une interface réseau."""
name: str
mac_destination: Optional[str]
bridge_name: Optional[str]
@classmethod
def from_interface_data(cls, interface_data: Dict[str, Any]) -> 'InterfaceInfo':
"""Crée une instance à partir des données d'interface."""
return cls(
name=interface_data.get("name", ""),
mac_destination=interface_data.get("mac_destination"),
bridge_name=interface_data.get("bridge_name")
)
@dataclass
class DeviceInfo:
"""Informations sur un équipement réseau."""
switch_name: str
mac_addresses: List[str]
interfaces: Dict[str, Any]
@property
def mac_addresses_upper(self) -> List[str]:
"""Retourne les adresses MAC en majuscules."""
return [mac.upper() for mac in self.mac_addresses]
@property
def mac_addresses_str(self) -> str:
"""Retourne les adresses MAC formatées en une chaîne."""
return ','.join(self.mac_addresses_upper)
class LogDataExtractor:
"""Classe pour extraire les données des fichiers de log."""
@staticmethod
def extract_log_data(filename: str) -> Optional[Dict[str, Any]]:
"""Extrait les informations du fichier de log et retourne un dictionnaire."""
try:
data = process_file_return_json(filename)
return data if data else None
except Exception as e:
print(f"Erreur lors du traitement de {filename} : {e}")
sys.exit(1)
@classmethod
def create_device_info(cls, filename: str) -> Optional[DeviceInfo]:
"""Crée un objet DeviceInfo à partir d'un fichier de log."""
data = cls.extract_log_data(filename)
if not data:
return None
metadata = data.get("metadata", {})
return DeviceInfo(
switch_name=metadata.get("switch_name", "unknown"),
mac_addresses=metadata.get("mac_addresses", []),
interfaces=data.get("data", {}).get("interfaces", {})
)
class BridgeAnalyzer:
"""Classe pour analyser les informations de bridge."""
@staticmethod
def extract_bridge_info(interfaces: Dict[str, Any]) -> Dict[str, BridgeInfo]:
"""Extrait les informations de bridge des interfaces."""
bridge_info = {}
for interface_name, interface_data in interfaces.items():
if interface_name.startswith("Bridge-Aggregation"):
bridge_info[interface_name] = BridgeInfo.from_interface_data(interface_data)
return bridge_info
@staticmethod
def get_bridge_details(bridge_info: Dict[str, BridgeInfo], bridge_name: str) -> tuple[str, int]:
"""Récupère les détails d'un bridge spécifique."""
bridge = bridge_info.get(bridge_name, BridgeInfo(0, "unknown"))
return bridge.speed, bridge.nb_liens
class NetworkFormatter:
"""Classe pour formater les sorties réseau."""
@staticmethod
def format_device_output(device_info: DeviceInfo, interface_info: InterfaceInfo,
speed: str, nb_liens: int) -> str:
"""Formate la sortie pour un équipement."""
mac_dest = interface_info.mac_destination.upper() if interface_info.mac_destination else ""
return (f"{device_info.switch_name} [{device_info.mac_addresses_str}] "
f"{interface_info.name} -> {mac_dest} "
f"[{interface_info.bridge_name},{speed},{nb_liens}]")
class CoeurAnalyzer:
"""Classe pour analyser les équipements coeur."""
def __init__(self):
self.mac_coeur = ""
def set_mac_coeur(self, mac_addresses: List[str]) -> None:
"""Met les MAC du coeur dans une variable d'instance."""
self.mac_coeur = ', '.join(mac_addresses)
def analyze(self, filename: str) -> None:
"""Analyse l'extraction json du fichier log du coeur."""
device_info = LogDataExtractor.create_device_info(filename)
if not device_info:
return
# Stocker les MAC du coeur pour les switches
self.set_mac_coeur(device_info.mac_addresses_upper)
bridge_info = BridgeAnalyzer.extract_bridge_info(device_info.interfaces)
for interface_data in device_info.interfaces.values():
interface_info = InterfaceInfo.from_interface_data(interface_data)
if interface_info.mac_destination:
speed, nb_liens = BridgeAnalyzer.get_bridge_details(
bridge_info, interface_info.bridge_name
)
output = NetworkFormatter.format_device_output(
device_info, interface_info, speed, nb_liens
)
print(output)
class SwitchAnalyzer:
"""Classe pour analyser les switches."""
def __init__(self, coeur_analyzer: CoeurAnalyzer):
self.coeur_analyzer = coeur_analyzer
def analyze(self, filename: str) -> None:
"""Analyse l'extraction json du fichier log d'un switch."""
device_info = LogDataExtractor.create_device_info(filename)
if not device_info:
return
bridge_info = BridgeAnalyzer.extract_bridge_info(device_info.interfaces)
for interface_data in device_info.interfaces.values():
interface_info = InterfaceInfo.from_interface_data(interface_data)
if (interface_info.mac_destination and
interface_info.mac_destination.upper() in self.coeur_analyzer.mac_coeur):
speed, nb_liens = BridgeAnalyzer.get_bridge_details(
bridge_info, interface_info.bridge_name
)
output = NetworkFormatter.format_device_output(
device_info, interface_info, speed, nb_liens
)
print(output)
class NetworkAnalysisOrchestrator:
"""Classe principale pour orchestrer l'analyse réseau."""
def __init__(self, base_dir: Path = None):
if base_dir is None:
base_dir = Path(__file__).parent.parent
self.base_dir = Path(base_dir)
self.log_dir = self.base_dir / 'logs'
self.output_file = self.base_dir / 'data.txt'
self.coeur_analyzer = CoeurAnalyzer()
self.switch_analyzer = SwitchAnalyzer(self.coeur_analyzer)
def _get_log_files(self, exclude_filename: str) -> List[Path]:
"""Récupère tous les fichiers de log sauf celui exclu."""
if not self.log_dir.exists():
return []
return [
log_file for log_file in self.log_dir.glob('*.log')
if log_file.name != exclude_filename
]
def run_analysis(self, filename: str) -> None:
"""Lance l'analyse du coeur et des switches à partir d'un fichier log coeur donné."""
coeur_log_path = self.log_dir / filename
if not coeur_log_path.exists():
print(f"Erreur: Le fichier {coeur_log_path} n'existe pas")
return
# Rediriger la sortie vers le fichier
with open(self.output_file, 'w', encoding='utf-8') as f:
with redirect_stdout(f):
print("Coeur:")
self.coeur_analyzer.analyze(str(coeur_log_path))
print("Switches:")
switch_files = self._get_log_files(filename)
for switch_file in switch_files:
self.switch_analyzer.analyze(str(switch_file))
class FormatMain:
"""Classe principale pour le module format."""
def __init__(self, base_dir: Path = None):
self.orchestrator = NetworkAnalysisOrchestrator(base_dir)
def run_analysis(self, filename: str) -> None:
"""Point d'entrée public pour l'analyse."""
self.orchestrator.run_analysis(filename)
def main(filename: str = None) -> None:
"""Fonction principale."""
if filename is None:
if len(sys.argv) != 2:
print("Usage: python format.py <filename>")
sys.exit(1)
filename = sys.argv[1]
format_main = FormatMain()
format_main.run_analysis(filename)
if __name__ == "__main__":
main()

358
src/scripts/mermaid.py Normal file
View File

@@ -0,0 +1,358 @@
import re
import sys
from collections import defaultdict
from pathlib import Path
from typing import List, Optional, Tuple
from dataclasses import dataclass
from scripts.format import FormatMain
@dataclass
class NetworkDevice:
"""Classe de base pour représenter un équipement réseau."""
name: str
macs: List[str]
interface: str
dest_mac: str
bridge: str
vitesse: str
nb_liens: str
def __post_init__(self):
"""Validation et nettoyage des données après initialisation."""
if isinstance(self.macs, str):
self.macs = [mac.strip() for mac in self.macs.split(',')]
self.bridge = self.bridge.strip()
self.vitesse = self.vitesse.strip()
self.nb_liens = self.nb_liens.strip()
@property
def macs_str(self) -> str:
"""Retourne les MACs sous forme de chaîne formatée."""
return ','.join(self.macs)
def to_formatted_string(self) -> str:
"""Retourne une représentation formatée de l'équipement."""
return f"{self.name} [{self.macs_str}] {self.interface} -> {self.dest_mac} [{self.bridge},{self.vitesse},{self.nb_liens}]"
@dataclass
class Coeur(NetworkDevice):
"""Classe représentant un équipement coeur."""
pass
@dataclass
class Switch(NetworkDevice):
"""Classe représentant un switch."""
pass
class NetworkFileParser:
"""Classe pour parser les fichiers de configuration réseau."""
ENCODINGS = ['utf-8', 'utf-16', 'utf-16-le', 'utf-16-be', 'latin-1', 'cp1252', 'iso-8859-1']
PATTERNS = [
r'(\S+)\s+\[([^\]]+)\]\s+(\S+)\s+->\s+([A-Fa-f0-9:-]+)\s*\[([^,]+),([^,]+),([^\]]+)\]',
r'(\S+)\s+\[([^\]]+)\]\s+(\S+)\s*->\s*([A-Fa-f0-9:-]+)\s*\[([^,]+),([^,]+),([^\]]+)\]',
r'(\w+)\s*\[([^\]]+)\]\s*(\w+)\s*->\s*([A-Fa-f0-9:-]+)\s*\[([^,]+),([^,]+),([^\]]+)\]'
]
def __init__(self, file_path: str):
self.file_path = Path(file_path)
def _read_file_with_encoding(self) -> Optional[str]:
"""Lit le fichier en essayant différents encodages."""
for encoding in self.ENCODINGS:
try:
return self.file_path.read_text(encoding=encoding)
except (UnicodeDecodeError, UnicodeError):
continue
except FileNotFoundError:
return None
return None
def _parse_line(self, line: str, device_class) -> Optional[NetworkDevice]:
"""Parse une ligne et retourne un objet NetworkDevice."""
line = line.strip()
if not line:
return None
for pattern in self.PATTERNS:
match = re.match(pattern, line)
if match:
return device_class(
name=match.group(1),
macs=match.group(2),
interface=match.group(3),
dest_mac=match.group(4),
bridge=match.group(5),
vitesse=match.group(6),
nb_liens=match.group(7)
)
print(f" -> Aucun pattern ne correspond à cette ligne: {line}")
return None
def parse(self) -> Tuple[List[Coeur], List[Switch]]:
"""Parse le fichier et retourne les coeurs et switches."""
content = self._read_file_with_encoding()
if content is None:
return [], []
sections = content.split('Switches:')
if len(sections) < 2:
return [], []
coeur_section = sections[0].replace('Coeur:', '').strip()
switches_section = sections[1].strip()
coeurs = []
for line in coeur_section.split('\n'):
coeur = self._parse_line(line, Coeur)
if coeur:
coeurs.append(coeur)
switches = []
for line in switches_section.split('\n'):
switch = self._parse_line(line, Switch)
if switch:
switches.append(switch)
return coeurs, switches
class NetworkDeviceGrouper:
"""Classe pour grouper les équipements réseau."""
@staticmethod
def group_coeurs_by_dest_mac(coeurs: List[Coeur]) -> List[Coeur]:
"""Groupe les coeurs par adresse MAC de destination."""
grouped = defaultdict(list)
for coeur in coeurs:
key = (coeur.dest_mac, coeur.name, coeur.macs_str,
coeur.bridge, coeur.vitesse, coeur.nb_liens)
grouped[key].append(coeur)
result = []
for key, group in grouped.items():
dest_mac, name, macs_str, bridge, vitesse, nb_liens = key
interfaces = [item.interface for item in group]
result.append(Coeur(
name=name,
macs=macs_str.split(','),
interface='-'.join(interfaces),
dest_mac=dest_mac,
bridge=bridge,
vitesse=vitesse,
nb_liens=nb_liens
))
return result
@staticmethod
def group_switches_by_local_mac(switches: List[Switch]) -> List[Switch]:
"""Groupe les switches par adresse MAC locale."""
grouped = defaultdict(list)
for switch in switches:
key = (switch.name, switch.macs_str, switch.dest_mac,
switch.bridge, switch.vitesse, switch.nb_liens)
grouped[key].append(switch)
result = []
for key, group in grouped.items():
name, macs_str, dest_mac, bridge, vitesse, nb_liens = key
interfaces = [item.interface for item in group]
result.append(Switch(
name=name,
macs=macs_str.split(','),
interface='-'.join(interfaces),
dest_mac=dest_mac,
bridge=bridge,
vitesse=vitesse,
nb_liens=nb_liens
))
return result
class MermaidDiagramGenerator:
"""Classe pour générer les diagrammes Mermaid."""
def __init__(self, coeurs: List[Coeur], switches: List[Switch]):
self.coeurs = coeurs
self.switches = switches
def _find_matching_switch(self, coeur: Coeur) -> Optional[Switch]:
"""Trouve le switch correspondant à un coeur."""
for switch in self.switches:
if coeur.dest_mac in switch.macs:
return switch
return None
def _format_bridge_label(self, bridge: str, vitesse: str) -> str:
"""Formate le label du bridge."""
bridge_formatted = bridge.replace('Bridge-Aggregation', 'BAGG')
return f"{bridge_formatted}<br/>{vitesse}"
def generate_links_diagram(self) -> str:
"""Génère un diagramme Mermaid des liaisons."""
if not self.coeurs or not self.switches:
return "Aucune donnée à afficher"
mermaid_lines = ["graph LR"]
total_devices = min(len(self.coeurs), len(self.switches))
for i, coeur in enumerate(self.coeurs):
target_switch = self._find_matching_switch(coeur)
label = self._format_bridge_label(coeur.bridge, coeur.vitesse)
if target_switch:
if i < total_devices // 2:
line = f' Coeur(("{coeur.name}")) <-->|{label}| {target_switch.name}'
else:
line = f' {target_switch.name} <-->|{label}| Coeur(("{coeur.name}"))'
mermaid_lines.append(line)
else:
line = f' Coeur(("{coeur.name}")) <-->|{label}| {coeur.dest_mac}["📄manquant {coeur.dest_mac}"]'
mermaid_lines.append(line)
mermaid_lines.append(f' class {coeur.dest_mac} error;')
return '\n'.join(mermaid_lines)
class FileWriter:
"""Classe pour l'écriture de fichiers."""
@staticmethod
def write_mermaid_diagram(mermaid_code: str, output_path: Path) -> None:
"""Sauvegarde le diagramme Mermaid."""
try:
output_path.parent.mkdir(parents=True, exist_ok=True)
content = [
"# Diagramme des liaisons Coeur-Switch\n",
"```mermaid\n",
"---\n",
"config:\n",
" theme: 'base'\n",
" themeVariables:\n",
" primaryColor: '#25bb75ff'\n",
" primaryTextColor: '#ffffff'\n",
" primaryBorderColor: '#000000ff'\n",
" lineColor: '#f82929ff'\n",
" secondaryColor: '#5e0c1aff'\n",
" tertiaryColor: '#ffffff'\n",
"---\n",
mermaid_code,
"\nclass Coeur coeur;\n",
"classDef coeur fill:#0590e6;\n",
"classDef error fill:#e64c05;\n",
"\n```\n"
]
output_path.write_text(''.join(content), encoding='utf-8')
print(f"✅ Diagramme Mermaid généré : {output_path}")
except Exception as e:
print(f"Erreur lors de la sauvegarde: {e}")
@staticmethod
def write_grouped_file(coeurs: List[Coeur], switches: List[Switch], output_path: Path) -> None:
"""Écrit les équipements groupés dans un fichier."""
try:
lines = ["Coeur:\n"]
lines.extend(f"{coeur.to_formatted_string()}\n" for coeur in coeurs)
lines.append("Switches:\n")
lines.extend(f"{switch.to_formatted_string()}\n" for switch in switches)
output_path.write_text(''.join(lines), encoding='utf-8')
except Exception as e:
print(f"Erreur lors de la sauvegarde: {e}")
@staticmethod
def write_links_file(coeurs: List[Coeur], switches: List[Switch], output_path: Path) -> None:
"""Écrit les liens entre coeurs et switches."""
try:
lines = []
for coeur in coeurs:
matching_switch = None
for switch in switches:
if coeur.dest_mac in switch.macs:
matching_switch = switch
break
if matching_switch:
coeur_info = f"{coeur.name} [{coeur.macs_str}] {coeur.interface} [{coeur.bridge},{coeur.vitesse},{coeur.nb_liens}]"
switch_info = f"{matching_switch.name} [{matching_switch.macs_str}] {matching_switch.interface} [{matching_switch.bridge},{matching_switch.vitesse},{matching_switch.nb_liens}]"
lines.append(f"{coeur_info} -> {switch_info}\n")
else:
lines.append(f"{coeur.name} [{coeur.macs_str}] {coeur.interface} -> Aucune correspondance de switch pour MAC {coeur.dest_mac}\n")
output_path.write_text(''.join(lines), encoding='utf-8')
except Exception as e:
print(f"Erreur lors de l'écriture des liens: {e}")
class NetworkAnalyzer:
"""Classe principale pour l'analyse réseau."""
def __init__(self, base_dir: Path = None):
if base_dir is None:
base_dir = Path(__file__).parent.parent
self.base_dir = Path(base_dir)
self.data_file = self.base_dir / 'data.txt'
self.output_dir = self.base_dir / 'output'
self.mermaid_file = self.output_dir / 'mermaid.md'
def analyze(self, filename: str) -> None:
"""Analyse complète du réseau."""
# Exécute l'analyse initiale
FormatMain().run_analysis(filename)
# Parse le fichier
parser = NetworkFileParser(self.data_file)
coeurs, switches = parser.parse()
if not coeurs and not switches:
print("Impossible de lire le fichier ou format incorrect")
return
# Groupe les équipements
grouper = NetworkDeviceGrouper()
grouped_coeurs = grouper.group_coeurs_by_dest_mac(coeurs)
grouped_switches = grouper.group_switches_by_local_mac(switches)
# Écrit les fichiers de sortie
writer = FileWriter()
writer.write_grouped_file(grouped_coeurs, grouped_switches, self.data_file)
writer.write_links_file(grouped_coeurs, grouped_switches, self.data_file)
# Génère le diagramme Mermaid
diagram_generator = MermaidDiagramGenerator(grouped_coeurs, grouped_switches)
mermaid_code = diagram_generator.generate_links_diagram()
writer.write_mermaid_diagram(mermaid_code, self.mermaid_file)
def main():
"""Fonction principale."""
if len(sys.argv) != 2:
print("Usage: python mermaid.py <filename>")
sys.exit(1)
analyzer = NetworkAnalyzer()
analyzer.analyze(filename=sys.argv[1])
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,448 @@
import os
import re
import sys
import json
import logging
from typing import Dict, List, Optional, Union, Any
from dataclasses import dataclass
from pathlib import Path
import pandas as pd
from openpyxl import load_workbook
from scripts.extract_json import process_file_return_json
mac_coeur=[]
@dataclass
class Config:
"""Configuration centralisée pour l'application."""
LOGS_DIR: str = './src/logs'
OUTPUT_DIR: str = './src/output'
DATA_JSON_PATH: str = './src/data.json'
OUTPUT_FILE_NAME: str = 'uplink_report.xlsx'
@property
def output_file_path(self) -> str:
return os.path.join(self.OUTPUT_DIR, self.OUTPUT_FILE_NAME)
@dataclass
class InterfaceResult:
"""Résultat d'analyse d'une interface."""
switch: str
logfile: str
interface: str
status: str
speed_gbps: Optional[Union[int, float, str]]
port_channel: Optional[str]
description: str
issues: str
mac_local: Optional[List[str]]
mac_destination: Optional[str]
bridge_name: Optional[str]
bridge_speed: Optional[Union[int, float, str]]
nb_liens: Optional[int]
@dataclass
class LCInterfaceResult:
"""Résultat d'analyse d'une interface LC."""
switch: str
interface: str
status: str
speed_gbps: Optional[Union[int, float, str]]
port_channel: Optional[str]
description: str
type_lien: str
nb_liens: Optional[int]
bridge_utilise: Optional[str]
mac_destination: Optional[str]
switch_destination: str
class InterfaceAnalyzer:
"""Analyseur d'interfaces réseau."""
CORE_INTERLINK_PATTERN = re.compile(r'/0/(49|50)$')
SPEED_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*Gbps", re.I)
SHORT_NAME_PATTERN = re.compile(r'(\d+/\d+/\d+)$')
INTERCORE_PATTERN = re.compile(r'[1-4]/0/4[3-8]$')
CORE_INTERFACE_PATTERN = re.compile(r'/0/(4[89]|50)$')
@classmethod
def is_core_interlink(cls, interface_name: str) -> bool:
"""Détermine si l'interface est un lien inter-cœur."""
return cls.CORE_INTERLINK_PATTERN.search(interface_name) is not None
@classmethod
def is_intercore_link(cls, interface_name: str) -> bool:
"""Détermine si l'interface est un lien inter-cœur (LC)."""
return cls.INTERCORE_PATTERN.search(interface_name) is not None
@classmethod
def guess_speed(cls, speed_mode: Optional[str], interface_name: Optional[str] = None) -> Optional[Union[int, float, str]]:
"""Devine la vitesse d'une interface."""
if speed_mode:
match = cls.SPEED_PATTERN.match(speed_mode)
if match:
return float(match.group(1))
if interface_name and cls.CORE_INTERFACE_PATTERN.search(interface_name):
if "Ten-GigabitEthernet" in interface_name:
return 10
elif "GigabitEthernet" in interface_name:
return 1
elif "HundredGigE" in interface_name:
return 100
elif "M-GigabitEthernet" in interface_name:
return "?"
return None
@classmethod
def extract_short_name(cls, interface_name: str) -> str:
"""Extrait le nom court d'une interface."""
match = cls.SHORT_NAME_PATTERN.search(interface_name)
return match.group(1) if match else interface_name
@classmethod
def identify_issues(cls, info: Dict[str, Any], speed: Optional[Union[int, float, str]]) -> List[str]:
"""Identifie les problèmes potentiels d'une interface."""
issues = []
if info.get("current_state") is not None:
if info.get("current_state", "").upper() != "UP":
issues.append("DOWN")
if isinstance(speed, (int, float)) and speed < 10:
issues.append(f"Poss. goulot ({speed} Gbps)")
if info.get("port_channel"):
issues.append(f"Membre Port-Channel {info['port_channel']}")
return issues
class SwitchLogAnalyzer:
"""Analyseur de logs de switchs."""
def __init__(self, config: Config):
self.config = config
self.analyzer = InterfaceAnalyzer()
self.logger = self._setup_logger()
def _setup_logger(self) -> logging.Logger:
"""Configure le logger."""
logger = logging.getLogger(__name__)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
def analyze_switch_log(
self,
filename: str,
switch_name: str,
interfaces: Dict[str, Any],
mac_addresses: Optional[List[str]] = None,
is_core: bool = False,
filter_mac_destination: bool = True
) -> List[InterfaceResult]:
"""Analyse les interfaces d'un switch."""
results = []
for iface_name, info in interfaces.items():
if not self._should_process_interface(info, filter_mac_destination, is_core, iface_name):
continue
result = self._create_interface_result(
filename, switch_name, iface_name, info, interfaces, mac_addresses
)
results.append(result)
return results
def _should_process_interface(
self,
info: Dict[str, Any],
filter_mac_destination: bool,
is_core: bool,
iface_name: str
) -> bool:
"""Détermine si une interface doit être traitée."""
global mac_coeur
if filter_mac_destination and info.get("mac_destination") not in mac_coeur:
return False
if is_core and not self.analyzer.is_core_interlink(iface_name):
return False
return True
def _create_interface_result(
self,
filename: str,
switch_name: str,
iface_name: str,
info: Dict[str, Any],
interfaces: Dict[str, Any],
mac_addresses: Optional[List[str]]
) -> InterfaceResult:
"""Crée un résultat d'interface."""
mac_destination = info.get("mac_destination")
bridge_name = info.get("bridge_name")
bridge_info = interfaces.get(bridge_name, {}) if bridge_name else {}
speed = self.analyzer.guess_speed(info.get("speed_mode"), iface_name)
bridge_speed = self.analyzer.guess_speed(bridge_info.get("speed_mode"), bridge_name)
issues = self.analyzer.identify_issues(info, speed)
description = info.get("description") or info.get("config_description") or ""
return InterfaceResult(
switch=switch_name,
logfile=filename,
interface=iface_name,
status=info.get("current_state", "unknown"),
speed_gbps=speed,
port_channel=info.get("port_channel"),
description=description,
issues=", ".join(issues) if issues else "RAS",
mac_local=mac_addresses,
mac_destination=mac_destination,
bridge_name=bridge_name,
bridge_speed=bridge_speed,
nb_liens=bridge_info.get("nb_liens")
)
def analyze_lc_log(self, filename: str, switch_name: str, interfaces: Dict[str, Any]) -> List[LCInterfaceResult]:
"""Analyse les interfaces LC."""
results = []
for iface_name, info in interfaces.items():
if not info or not isinstance(info, dict):
continue
result = self._create_lc_interface_result(filename, switch_name, iface_name, info)
results.append(result)
return results
def _create_lc_interface_result(
self,
filename: str,
switch_name: str,
iface_name: str,
info: Dict[str, Any]
) -> LCInterfaceResult:
"""Crée un résultat d'interface LC."""
speed = self.analyzer.guess_speed(info.get("speed_mode"), iface_name)
type_lien = "Lien inter-coeur" if self.analyzer.is_intercore_link(iface_name) else "Vers Accès"
description = info.get("description") or info.get("config_description") or ""
nb_liens = info.get("nb_liens") if iface_name.startswith("Bridge-Aggregation") else None
bridge_utilise = None if iface_name.startswith("Bridge-Aggregation") else info.get("bridge_name")
mac_destination = None if iface_name.startswith("Bridge-Aggregation") else info.get("mac_destination")
return LCInterfaceResult(
switch=switch_name,
interface=iface_name,
status=info.get("current_state", "unknown"),
speed_gbps=speed,
port_channel=info.get("port_channel"),
description=description,
type_lien=type_lien,
nb_liens=nb_liens,
bridge_utilise=bridge_utilise,
mac_destination=mac_destination,
switch_destination=""
)
class ExcelReportGenerator:
"""Générateur de rapports Excel."""
def __init__(self, config: Config):
self.config = config
self.logger = logging.getLogger(__name__)
def generate_report(self, uplink_results: List[InterfaceResult], lc_results: List[LCInterfaceResult]) -> None:
"""Génère le rapport Excel."""
df_uplink = self._create_uplink_dataframe(uplink_results)
df_lc = self._create_lc_dataframe(lc_results)
os.makedirs(self.config.OUTPUT_DIR, exist_ok=True)
self._write_excel_file(df_uplink, df_lc)
self._format_excel_file()
print(f"✅ Rapport Excel généré : {self.config.output_file_path}")
def _create_uplink_dataframe(self, results: List[InterfaceResult]) -> pd.DataFrame:
"""Crée le DataFrame pour les uplinks."""
data = []
for result in results:
data.append({
"switch": result.switch,
"logfile": result.logfile,
"interface": result.interface,
"status": result.status,
"speed_gbps": result.speed_gbps,
"port_channel": result.port_channel,
"description": result.description,
"issues": result.issues,
"mac_local": result.mac_local,
"mac_destination": result.mac_destination,
"bridge_name": result.bridge_name,
"bridge_speed": result.bridge_speed,
"nb_liens": result.nb_liens
})
return pd.DataFrame(data)
def _create_lc_dataframe(self, results: List[LCInterfaceResult]) -> pd.DataFrame:
"""Crée le DataFrame pour les interfaces LC."""
data = []
for result in results:
data.append({
"Switch": result.switch,
"Interface": result.interface,
"Status": result.status,
"Vitesse (Gbps)": result.speed_gbps,
"Port-Channel": result.port_channel,
"Description": result.description,
"Type de lien": result.type_lien,
"Nb liens": result.nb_liens,
"Bridge utilisé": result.bridge_utilise,
"MAC destination": result.mac_destination,
"Switch destination": result.switch_destination
})
return pd.DataFrame(data)
def _write_excel_file(self, df_uplink: pd.DataFrame, df_lc: pd.DataFrame) -> None:
"""Écrit le fichier Excel."""
with pd.ExcelWriter(self.config.output_file_path, engine='openpyxl') as writer:
df_uplink.to_excel(writer, index=False, sheet_name="Uplinks")
df_lc.to_excel(writer, index=False, sheet_name="LC_Interfaces")
def _format_excel_file(self) -> None:
"""Met en forme le fichier Excel."""
wb = load_workbook(self.config.output_file_path)
self._add_formulas(wb)
self._format_columns(wb)
wb.save(self.config.output_file_path)
def _add_formulas(self, workbook) -> None:
"""Ajoute les formules Excel."""
ws = workbook["LC_Interfaces"]
for row in range(2, 400):
formula = (
f'=IF(J{row}="","",IFERROR(INDEX(Uplinks!A$2:A$400,'
f'MATCH(TRUE,ISNUMBER(SEARCH(J{row},Uplinks!I$2:I$400)),0)),""))'
)
ws[f'K{row}'] = formula
def _format_columns(self, workbook) -> None:
"""Met en forme les colonnes."""
column_widths = {
"LC_Interfaces": [15, 25, 25, 15, 12, 45, 15, 10, 22, 20, 20],
"Uplinks": [15, 30, 25, 7, 12, 12, 30, 22, 47, 15, 20, 13, 10]
}
for sheet_name, widths in column_widths.items():
ws = workbook[sheet_name]
for col_idx, width in enumerate(widths, start=1):
ws.column_dimensions[chr(64 + col_idx)].width = width
class UplinkReportGenerator:
"""Générateur principal de rapports d'uplink."""
def __init__(self, config: Optional[Config] = None):
self.config = config or Config()
self.analyzer = SwitchLogAnalyzer(self.config)
self.report_generator = ExcelReportGenerator(self.config)
self.logger = logging.getLogger(__name__)
def generate_report(self, lc_filename_prefix: str) -> None:
"""Génère le rapport complet."""
try:
uplink_results, lc_results = self._process_log_files(lc_filename_prefix)
self.report_generator.generate_report(uplink_results, lc_results)
except Exception as e:
self.logger.error(f"Erreur lors de la génération du rapport : {e}")
raise
def _process_log_files(self, lc_filename_prefix: str) -> tuple[List[InterfaceResult], List[LCInterfaceResult]]:
"""Traite tous les fichiers de logs."""
global mac_coeur
uplink_results = []
lc_results = []
log_files = self._get_log_files()
for filename in log_files:
try:
data = self._process_single_log_file(filename)
if not data:
continue
interfaces = data.get("data", {}).get("interfaces", {})
switch_name = data.get("metadata", {}).get("switch_name", "unknown")
if filename.startswith(lc_filename_prefix):
mac_coeur = data.get("metadata", {}).get("mac_addresses", [])
mac_coeur = [mac.lower() for mac in mac_coeur]
lc_results.extend(self.analyzer.analyze_lc_log(filename, switch_name, interfaces))
else:
mac_addresses = data.get("metadata", {}).get("mac_addresses", [])
uplink_results.extend(
self.analyzer.analyze_switch_log(
filename, switch_name, interfaces,
mac_addresses=mac_addresses,
filter_mac_destination=True
)
)
except Exception as e:
self.logger.error(f"Erreur lors du traitement de {filename}: {e}")
continue
return uplink_results, lc_results
def _get_log_files(self) -> List[str]:
"""Récupère la liste des fichiers de logs."""
if not os.path.exists(self.config.LOGS_DIR):
raise FileNotFoundError(f"Le répertoire {self.config.LOGS_DIR} n'existe pas")
return [f for f in os.listdir(self.config.LOGS_DIR) if f.endswith('.log')]
def _process_single_log_file(self, filename: str) -> Optional[Dict[str, Any]]:
"""Traite un fichier de log individuel."""
filepath = os.path.join(self.config.LOGS_DIR, filename)
return process_file_return_json(filepath)
def main():
"""Fonction principale."""
if len(sys.argv) < 2:
print("Usage: python parse_uplinks.py <fichier_log_coeur> (sans chemin)")
sys.exit(1)
lc_filename_prefix = sys.argv[1]
try:
generator = UplinkReportGenerator()
generator.generate_report(lc_filename_prefix)
except Exception as e:
print(f"❌ Erreur : {e}")
sys.exit(1)
if __name__ == "__main__":
main()