# models/hpe5130.py
"""Parser for the text output of HPE 5130 (Comware) ``display`` commands."""

import re

from models.base_switch import Switch, normalize_port_name
from models.base_switch import Fan, PowerSupply, MACEntry, Port, VLAN

# Patterns are compiled once at import time instead of on every parsed line.
_FIRMWARE_RE = re.compile(r"Version ([^,]+, Release [^\s]+)")
_UPTIME_RE = re.compile(r"uptime is (.+)$")
_BOOT_IMAGE_RE = re.compile(r"(flash:.+\.bin)")
_FAN_ID_RE = re.compile(r"Fan\s+(\d+):")
_FAN_STATE_RE = re.compile(r"State\s*:\s*(\w+)")
_PSU_ROW_RE = re.compile(r"\s*(\d+)\s+(\w+)\s+(AC|DC)\s+")
_MAC_ROW_RE = re.compile(r"([0-9a-fA-F\-]{14})\s+(\d+)\s+\w+\s+(\S+)")
# Trailing blanks / "\r" are tolerated so that output captured with CRLF line
# endings still yields interface headers.
_INTERFACE_HEADER_RE = re.compile(
    r"^(Bridge-Aggregation\d+|\S*Ethernet\d+/\d+/\d+|InLoopBack\d+|NULL\d+)"
    r"[ \t\r]*$",
    re.MULTILINE,
)
_BANDWIDTH_RE = re.compile(r"Bandwidth:\s+(\d+) kbps")
_PVID_RE = re.compile(r"PVID:\s+(\d+)")
_ACCESS_VLAN_RE = re.compile(r"port access vlan (\d+)")
_PVID_CONFIG_RE = re.compile(r"pvid (\d+)")
_VLAN_RANGE_RE = re.compile(r"(\d+)(?:\s+to\s+(\d+))?")
_VLAN_HEADER_RE = re.compile(r"vlan (\d+)")
_IRF_MEMBER_RE = re.compile(r"interface (\S+)")


def _format_bandwidth(kbps: int) -> str:
    """Render a bandwidth given in kbps with the largest fitting unit."""
    if kbps >= 1_000_000:
        return f"{kbps / 1_000_000:.1f} Gbps"
    if kbps >= 1_000:
        return f"{kbps / 1_000:.1f} Mbps"
    return f"{kbps} kbps"


def _expand_vlan_ranges(text: str) -> list[int]:
    """Return every VLAN id named in *text*, expanding ``A to B`` ranges."""
    vlans: list[int] = []
    for found in _VLAN_RANGE_RE.finditer(text):
        first = int(found.group(1))
        last = int(found.group(2)) if found.group(2) else first
        vlans.extend(range(first, last + 1))
    return vlans


class SwitchHPE5130(Switch):
    """Fill a ``Switch`` model from HPE 5130 ``display`` command output."""

    def parse_block(self, section: str, content: str) -> None:
        """Dispatch *content* to the parser registered for *section*.

        The section name is matched case-insensitively after stripping
        surrounding whitespace; unknown sections are silently ignored.
        """
        handlers = {
            "version": self._parse_display_version,
            "fan": self._parse_display_fan,
            "power": self._parse_display_power,
            "mac-address": self._parse_display_mac_address,
            "interface": self._parse_display_interface,
            "current-configuration": self._parse_display_current_config,
        }
        handler = handlers.get(section.strip().lower())
        if handler is not None:
            handler(content)

    def _get_or_create_port(self, name: str) -> Port:
        """Return the port called *name*, registering a new one if needed."""
        port = next((p for p in self.ports if p.name == name), None)
        if port is None:
            port = Port(name)
            self.ports.append(port)
        return port

    def _parse_display_version(self, content: str) -> None:
        """Extract firmware version, uptime, boot image and model.

        Afterwards ``self.summary`` is rebuilt from whichever of model,
        firmware version and uptime are set.
        """
        for line in content.splitlines():
            if "Comware Software" in line:
                if found := _FIRMWARE_RE.search(line):
                    self.firmware_version = found.group(1).strip()
            elif "uptime is" in line:
                if found := _UPTIME_RE.search(line):
                    self.uptime = found.group(1).strip()
            elif "Boot image:" in line:
                if found := _BOOT_IMAGE_RE.search(line):
                    self.boot_image = found.group(1)
            elif "Release Version:" in line:
                self.model = line.replace("Release Version:", "").strip()

        parts = []
        if self.model:
            parts.append(self.model)
        if self.firmware_version:
            parts.append(self.firmware_version)
        if self.uptime:
            parts.append(f"uptime: {self.uptime}")
        self.summary = ", ".join(parts)

    def _parse_display_fan(self, content: str) -> None:
        """Append one ``Fan`` per ``Fan N:`` header followed by a state line."""
        fan_id = None
        for line in content.splitlines():
            id_match = _FAN_ID_RE.search(line)
            state_match = _FAN_STATE_RE.search(line)
            if id_match:
                fan_id = int(id_match.group(1))
            elif state_match and fan_id is not None:
                self.fans.append(Fan(fan_id, state_match.group(1)))
                # A state line is consumed by the header that precedes it.
                fan_id = None

    def _parse_display_power(self, content: str) -> None:
        """Append one ``PowerSupply`` per ``<id> <state> <AC|DC>`` row."""
        for line in content.splitlines():
            row = _PSU_ROW_RE.match(line)
            if row:
                psu_id = int(row.group(1))
                self.power_supplies.append(
                    PowerSupply(psu_id, row.group(2), row.group(3))
                )

    def _parse_display_mac_address(self, content: str) -> None:
        """Fill ``self.mac_table`` and attach each MAC to its known port.

        NOTE(review): every entry is recorded as ``dynamic`` although the
        table's state column is matched and discarded - confirm intended.
        """
        for line in content.splitlines():
            row = _MAC_ROW_RE.match(line)
            if not row:
                continue
            mac = row.group(1).replace("-", ":").lower()
            vlan = int(row.group(2))
            port = row.group(3)
            self.mac_table.append(MACEntry(mac, vlan, port, type_="dynamic"))

            # Attach the address to the matching port, looked up by its
            # normalized name.
            # NOTE(review): assumes normalize_port_name() returns an
            # upper-case name - confirm against models.base_switch.
            normalized_port = normalize_port_name(port)
            port_obj = next(
                (p for p in self.ports if p.name.upper() == normalized_port),
                None,
            )
            if port_obj:
                port_obj.mac_addresses.append(mac)

    def _parse_display_interface(self, content: str) -> None:
        """Update status, speed and PVID of every interface in *content*.

        The text is cut into blocks at lines consisting solely of an
        interface name; ports that are not known yet are created.
        """
        headers = list(_INTERFACE_HEADER_RE.finditer(content))
        for idx, header in enumerate(headers):
            # A block runs up to the next header, or to the end of the text.
            if idx + 1 < len(headers):
                end = headers[idx + 1].start()
            else:
                end = len(content)
            port = self._get_or_create_port(header.group(1))

            for line in content[header.end():end].splitlines():
                if "Current state:" in line:
                    port.status = line.split(":")[-1].strip().lower()
                elif "Bandwidth:" in line:
                    if found := _BANDWIDTH_RE.search(line):
                        port.speed = _format_bandwidth(int(found.group(1)))
                elif "PVID:" in line:
                    if found := _PVID_RE.search(line):
                        port.vlan = int(found.group(1))

            # Same fallback as _apply_interface_config: a block without a
            # "Current state:" line must not crash on a missing status.
            status = port.status.upper() if port.status else "INCONNU"
            port.summary = (
                f"Port {port.name} est {status} à "
                f"{port.speed or 'vitesse inconnue'}, VLAN {port.vlan or '?'}"
            )

    def _apply_interface_config(self, ifname: str, lines: list) -> None:
        """Apply the configuration lines of one ``interface`` stanza.

        Args:
            ifname: Interface name; the port is created when unknown.
            lines: Configuration lines belonging to that interface.
        """
        port = self._get_or_create_port(ifname.strip())

        is_trunk = False
        permit_vlans: list[int] = []
        for raw in lines:
            line = raw.strip()
            if line.startswith("port access vlan"):
                if found := _ACCESS_VLAN_RE.search(line):
                    port.vlan = int(found.group(1))
                    port.link_type = "access"
            elif line.startswith("pvid"):
                if found := _PVID_CONFIG_RE.search(line):
                    port.vlan = int(found.group(1))
            elif line.startswith("port link-type trunk"):
                is_trunk = True
                port.link_type = "trunk"
            elif line.startswith("undo port trunk permit vlan"):
                continue  # VLAN removals are ignored
            elif line.startswith("port trunk permit vlan"):
                permit_vlans.extend(_expand_vlan_ranges(line))
            elif "stp edged-port" in line:
                port.portfast = True
            elif "bpdu" in line:
                port.bpdu_protection = True
            elif "loopback-detection enable" in line:
                port.loopback_detection = True

        if is_trunk and permit_vlans:
            port.trunk_vlans = sorted(set(permit_vlans))

        # Guard against the attribute existing but being None.
        trunk_vlans = getattr(port, "trunk_vlans", None)
        if port.link_type == "trunk" and trunk_vlans is not None:
            vlan_info = f"TRUNK ({len(trunk_vlans)} VLANs)"
        else:
            vlan_info = f"VLAN {port.vlan or '?'}"

        status = port.status.upper() if port.status else "INCONNU"
        speed = port.speed or "vitesse inconnue"
        port.summary = f"Port {port.name} est {status} à {speed}, {vlan_info}"

    def _parse_display_current_config(self, content: str) -> None:
        """Read hostname, named VLANs, IRF member ports and interfaces.

        Interface stanzas are collected line by line and handed to
        ``_apply_interface_config`` once the stanza ends.
        """
        current_interface = None
        block_lines: list[str] = []
        vlan_id = None

        for raw in content.splitlines():
            line = raw.strip()

            if line == "#":
                # A lone "#" closes the current section, so later global
                # lines are not attributed to the last interface.
                # NOTE(review): assumes "#" is the Comware section separator.
                if current_interface:
                    self._apply_interface_config(current_interface, block_lines)
                current_interface = None
                block_lines = []
                vlan_id = None
            elif line.startswith("sysname"):
                parts = line.split()
                if len(parts) >= 2:
                    self.hostname = parts[1]
            elif found := _VLAN_HEADER_RE.match(line):
                vlan_id = int(found.group(1))
            elif line.startswith("name") and vlan_id is not None:
                # Strip only the leading keyword: splitting on "name" would
                # truncate VLAN names that themselves contain "name".
                self.vlans.append(VLAN(vlan_id, line.removeprefix("name").strip()))
                vlan_id = None
            elif line.startswith("irf-port"):
                continue
            elif "port group interface" in line:
                if found := _IRF_MEMBER_RE.search(line):
                    port = self._get_or_create_port(found.group(1))
                    port.link_type = "irf"
                    port.summary = f"Port {port.name} est utilisé pour un lien IRF"
            elif line.startswith("interface"):
                if current_interface and block_lines:
                    self._apply_interface_config(current_interface, block_lines)
                parts = line.split()
                # A bare "interface" line carries no name; skip its stanza.
                current_interface = parts[1] if len(parts) >= 2 else None
                block_lines = []
            elif current_interface:
                block_lines.append(line)

        if current_interface and block_lines:
            self._apply_interface_config(current_interface, block_lines)