First commit
This commit is contained in:
223
models/hpe5130.py
Normal file
223
models/hpe5130.py
Normal file
@@ -0,0 +1,223 @@
|
||||
# models/hpe5130.py
|
||||
from models.base_switch import Switch
|
||||
from models.base_switch import Fan, PowerSupply, MACEntry, Port, VLAN
|
||||
import re
|
||||
|
||||
|
||||
class SwitchHPE5130(Switch):
|
||||
def parse_block(self, section: str, content: str):
|
||||
section = section.strip().lower()
|
||||
if section == "version":
|
||||
self._parse_display_version(content)
|
||||
elif section == "fan":
|
||||
self._parse_display_fan(content)
|
||||
elif section == "power":
|
||||
self._parse_display_power(content)
|
||||
elif section == "mac-address":
|
||||
self._parse_display_mac_address(content)
|
||||
elif section == "interface":
|
||||
self._parse_display_interface(content)
|
||||
elif section == "current-configuration":
|
||||
self._parse_display_current_config(content)
|
||||
|
||||
def _parse_display_version(self, content: str):
|
||||
lines = content.splitlines()
|
||||
|
||||
for line in lines:
|
||||
if "Comware Software" in line:
|
||||
match = re.search(r"Version ([^,]+, Release [^\s]+)", line)
|
||||
if match:
|
||||
self.firmware_version = match.group(1).strip()
|
||||
elif "uptime is" in line:
|
||||
match = re.search(r"uptime is (.+)$", line)
|
||||
if match:
|
||||
self.uptime = match.group(1).strip()
|
||||
elif "Boot image:" in line:
|
||||
match = re.search(r"(flash:.+\.bin)", line)
|
||||
if match:
|
||||
self.boot_image = match.group(1)
|
||||
elif "Release Version:" in line:
|
||||
self.model = line.replace("Release Version:", "").strip()
|
||||
|
||||
parts = []
|
||||
if self.model:
|
||||
parts.append(self.model)
|
||||
if self.firmware_version:
|
||||
parts.append(self.firmware_version)
|
||||
if self.uptime:
|
||||
parts.append(f"uptime: {self.uptime}")
|
||||
|
||||
self.summary = ", ".join(parts)
|
||||
|
||||
def _parse_display_fan(self, content: str):
|
||||
fan_id = None
|
||||
for line in content.splitlines():
|
||||
match_id = re.search(r"Fan\s+(\d+):", line)
|
||||
match_state = re.search(r"State\s*:\s*(\w+)", line)
|
||||
if match_id:
|
||||
fan_id = int(match_id.group(1))
|
||||
elif match_state and fan_id is not None:
|
||||
state = match_state.group(1)
|
||||
self.fans.append(Fan(fan_id, state))
|
||||
fan_id = None
|
||||
|
||||
def _parse_display_power(self, content: str):
|
||||
lines = content.splitlines()
|
||||
for line in lines:
|
||||
match = re.match(r"\s*(\d+)\s+(\w+)\s+(AC|DC)\s+", line)
|
||||
if match:
|
||||
psu_id = int(match.group(1))
|
||||
state = match.group(2)
|
||||
mode = match.group(3)
|
||||
self.power_supplies.append(PowerSupply(psu_id, state, mode))
|
||||
|
||||
def _parse_display_mac_address(self, content: str):
|
||||
for line in content.splitlines():
|
||||
match = re.match(r"([0-9a-fA-F\-]{14})\s+(\d+)\s+\w+\s+(\S+)", line)
|
||||
if match:
|
||||
mac = match.group(1).replace("-", ":").lower()
|
||||
vlan = int(match.group(2))
|
||||
port = match.group(3)
|
||||
self.mac_table.append(MACEntry(mac, vlan, port, type_="dynamic"))
|
||||
|
||||
def _parse_display_interface(self, content: str):
|
||||
pattern = re.compile(r"^(Bridge-Aggregation\d+|\S*Ethernet\d+/\d+/\d+|InLoopBack\d+|NULL\d+)$", re.MULTILINE)
|
||||
matches = list(pattern.finditer(content))
|
||||
|
||||
for idx, match in enumerate(matches):
|
||||
start = match.start()
|
||||
end = matches[idx + 1].start() if idx + 1 < len(matches) else len(content)
|
||||
block = content[start:end].strip()
|
||||
lines = block.splitlines()
|
||||
if not lines:
|
||||
continue
|
||||
name = lines[0].strip()
|
||||
port = next((p for p in self.ports if p.name == name), None)
|
||||
if not port:
|
||||
port = Port(name)
|
||||
self.ports.append(port)
|
||||
for line in lines:
|
||||
if "Current state:" in line:
|
||||
port.status = line.split(":")[-1].strip().lower()
|
||||
elif "Bandwidth:" in line:
|
||||
match_speed = re.search(r"Bandwidth:\s+(\d+) kbps", line)
|
||||
if match_speed:
|
||||
kbps = int(match_speed.group(1))
|
||||
if kbps >= 1_000_000:
|
||||
port.speed = f"{kbps / 1_000_000:.1f} Gbps"
|
||||
elif kbps >= 1_000:
|
||||
port.speed = f"{kbps / 1_000:.1f} Mbps"
|
||||
else:
|
||||
port.speed = f"{kbps} kbps"
|
||||
elif "PVID:" in line:
|
||||
match = re.search(r"PVID:\s+(\d+)", line)
|
||||
if match:
|
||||
port.vlan = int(match.group(1))
|
||||
|
||||
port.summary = f"Port {port.name} est {port.status.upper()} à {port.speed or 'vitesse inconnue'}, VLAN {port.vlan or '?'}"
|
||||
|
||||
def _apply_interface_config(self, ifname: str, lines: list):
|
||||
ifname = ifname.strip()
|
||||
port = next((p for p in self.ports if p.name == ifname), None)
|
||||
if not port:
|
||||
port = Port(ifname)
|
||||
self.ports.append(port)
|
||||
|
||||
is_trunk = False
|
||||
permit_vlans = []
|
||||
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
match line:
|
||||
case l if l.startswith("port access vlan"):
|
||||
if m := re.search(r"port access vlan (\d+)", l):
|
||||
port.vlan = int(m.group(1))
|
||||
port.link_type = "access"
|
||||
|
||||
case l if l.startswith("pvid"):
|
||||
if m := re.search(r"pvid (\d+)", l):
|
||||
port.vlan = int(m.group(1))
|
||||
|
||||
case l if l.startswith("port link-type trunk"):
|
||||
is_trunk = True
|
||||
port.link_type = "trunk"
|
||||
|
||||
case l if l.startswith("undo port trunk permit vlan"):
|
||||
continue # ignorer
|
||||
|
||||
case l if l.startswith("port trunk permit vlan"):
|
||||
vlan_groups = re.findall(r"\d+(?:\s+to\s+\d+)?", l)
|
||||
for item in vlan_groups:
|
||||
parts = item.strip().split()
|
||||
if len(parts) == 3 and parts[1] == "to":
|
||||
start, end = int(parts[0]), int(parts[2])
|
||||
permit_vlans.extend(range(start, end + 1))
|
||||
elif len(parts) == 1:
|
||||
permit_vlans.append(int(parts[0]))
|
||||
|
||||
case l if "stp edged-port" in l:
|
||||
port.portfast = True
|
||||
|
||||
case l if "bpdu" in l:
|
||||
port.bpdu_protection = True
|
||||
|
||||
case l if "loopback-detection enable" in l:
|
||||
port.loopback_detection = True
|
||||
|
||||
if is_trunk and permit_vlans:
|
||||
port.trunk_vlans = sorted(set(permit_vlans))
|
||||
|
||||
if port.link_type == "trunk" and hasattr(port, "trunk_vlans"):
|
||||
vlan_info = f"TRUNK ({len(port.trunk_vlans)} VLANs)"
|
||||
else:
|
||||
vlan_info = f"VLAN {port.vlan or '?'}"
|
||||
|
||||
status = port.status.upper() if port.status else "INCONNU"
|
||||
speed = port.speed or "vitesse inconnue"
|
||||
port.summary = f"Port {port.name} est {status} à {speed}, {vlan_info}"
|
||||
|
||||
def _parse_display_current_config(self, content: str):
|
||||
current_interface = None
|
||||
block_lines = []
|
||||
vlan_id = None
|
||||
for line in content.splitlines():
|
||||
line = line.strip()
|
||||
match line:
|
||||
case l if l.startswith("sysname"):
|
||||
parts = l.split()
|
||||
if len(parts) >= 2:
|
||||
self.hostname = parts[1]
|
||||
|
||||
case l if l.startswith("vlan") and re.match(r"vlan \d+", l):
|
||||
vlan_id = int(l.split()[1])
|
||||
|
||||
case l if l.startswith("name") and vlan_id is not None:
|
||||
name = l.split("name")[-1].strip()
|
||||
self.vlans.append(VLAN(vlan_id, name))
|
||||
vlan_id = None
|
||||
|
||||
case l if l.startswith("irf-port"):
|
||||
continue
|
||||
|
||||
case l if "port group interface" in l:
|
||||
match = re.search(r"interface (\S+)", l)
|
||||
if match:
|
||||
portname = match.group(1)
|
||||
port = next((p for p in self.ports if p.name == portname), None)
|
||||
if not port:
|
||||
port = Port(portname)
|
||||
self.ports.append(port)
|
||||
port.link_type = "irf"
|
||||
port.summary = f"Port {port.name} est utilisé pour un lien IRF"
|
||||
|
||||
case l if l.startswith("interface"):
|
||||
if current_interface and block_lines:
|
||||
self._apply_interface_config(current_interface, block_lines)
|
||||
current_interface = l.split()[1]
|
||||
block_lines = []
|
||||
|
||||
case _ if current_interface:
|
||||
block_lines.append(line)
|
||||
|
||||
if current_interface and block_lines:
|
||||
self._apply_interface_config(current_interface, block_lines)
|
||||
Reference in New Issue
Block a user