#!/usr/bin/env python3 """ Parser pour configuration Stormshield vers un format json normalisé OpenConfig YANG Extrait les groupes d'objets, objets, groupes de services, services, routes, interfaces et règles """ from dataclasses import dataclass import os import re import json from scripts.export_modele import ParserMixin from scripts.objets.data import AddressObject, AddressGroup, ServiceObject, ServiceGroup, Interface, SecurityRule, StaticRoute, VirtualRouter from pathlib import Path class StormshieldParser(ParserMixin): """Parser pour dossier Stormshield""" def __init__(self, base_dir: str): """Initialise le parser avec le chemin du dossier Stormshield""" self.base_dir = base_dir self.config = { "address_objects": [], "address_groups": [], "service_objects": [], "service_groups": [], "interfaces": [], "virtual_routers": [], 'static_routes': [], "security_rules": [], "default_route": None # Ajout Stormshield } def _parse_address_objects(self): """Parse le fichier object contenant [Host], [Network], [Service], etc.""" current_section = None path = os.path.join(self.base_dir, "object") with open(path, "r", encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if line.startswith("[") and line.endswith("]"): current_section = line.strip("[]") continue if current_section in ("Host", "Network", "Fqdn"): match = re.match(r"([^=]+)=([^,#]+)", line) if match: name = match.group(1).strip() ip = match.group(2).strip() self.config["address_objects"].append( AddressObject(name=name, ip_netmask=ip) ) elif current_section == "Service": match = re.match(r"([^=]+)=([\d\-]+)/(\w+)", line) if match: name = match.group(1).strip() port = match.group(2) proto = match.group(3).lower() self.config["service_objects"].append( ServiceObject(name=name, protocol=proto, port=port) ) def _parse_address_groups(self): """Parse le fichier objectgroup pour groupes d’adresses""" current_group = None members = [] path = os.path.join(self.base_dir, "objectgroup") with open(path, "r", encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if line.startswith("[") and "]" in line: if current_group: self.config["address_groups"].append( AddressGroup(name=current_group, members=members) ) current_group = line.split("]")[0][1:].strip() members = [] continue if current_group: member = line.split("#")[0].strip() if member: members.append(member) if current_group: self.config["address_groups"].append( AddressGroup(name=current_group, members=members) ) def _parse_service_groups(self): """Parse le fichier servicegroup pour groupes de services""" current_group = None members = [] path = os.path.join(self.base_dir, "servicegroup") with open(path, "r", encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if line.startswith("[") and "]" in line: if current_group: self.config["service_groups"].append( ServiceGroup(name=current_group, members=members) ) current_group = line.split("]")[0][1:].strip() members = [] continue if current_group: member = line.split("#")[0].strip() if member: members.append(member) if current_group: self.config["service_groups"].append( ServiceGroup(name=current_group, members=members) ) def _parse_interfaces(self): """Analyse du fichier network et extraction des interfaces""" current_section = None current_data = {} path = os.path.join(self.base_dir, "network") with open(path, "r", encoding="utf-8", errors="ignore") as f: for raw_line in f: line = raw_line.strip() if not line or line.startswith("#"): continue if line.startswith("[") and line.endswith("]"): if current_section and current_section.lower() != "config": self._add_interface(current_section, current_data) current_section = line.strip("[]") current_data = {} continue if "=" in line: key, value = line.split("=", 1) key = key.strip() value = value.split("#")[0].strip() current_data[key] = value if current_section and current_section.lower() != "config": self._add_interface(current_section, current_data) def _add_interface(self, section_name, data): """Crée un objet Interface à partir d’une section complète""" if section_name.lower().startswith("vlan"): iface_type = "vlan" elif section_name.lower().startswith("bridge"): iface_type = "bridge" elif section_name.lower().startswith("wifi"): iface_type = "wifi" elif section_name.lower().startswith("loopback"): iface_type = "loopback" elif section_name.lower().startswith("agg"): iface_type = "aggregate" else: iface_type = "ethernet" enabled = data.get("State", "0") == "1" name = data.get("Name", section_name) ip = data.get("Address") netmask = data.get("Mask") misc = data.get("Misc") comment = data.get("Comment", "") if not name or name.lower() == "dynamic": return interface = Interface( name=name, ip=ip if ip and ip.upper() != "DHCP" else None, netmask=netmask, misc=int(misc) if misc and misc.isdigit() else None, comment=comment, interface_type=iface_type, enabled=enabled, ) self.config["interfaces"].append(interface) def _parse_virtual_routers(self): route_path = os.path.join(self.base_dir, "route") if not os.path.exists(route_path): return virtual_routers = {} default_route = None section = None with open(route_path, encoding="utf-8", errors="ignore") as f: for raw_line in f: line = raw_line.strip() if not line or line.startswith("off") or line.startswith("#"): continue if line.startswith("[") and line.endswith("]"): section = line.strip("[]") continue # ===== Default route ===== if section == "Config" and line.startswith("DefaultRoute="): default_route = line.split("=", 1)[1].strip() # ===== Static routes ===== elif section == "StaticRoutes": parts = [p.strip() for p in line.split(",")] if len(parts) < 2: continue destination = parts[0] vr_name = None interface = None next_hop_ip = None # Format : VR->NextHop if "->" in parts[1]: vr_name, next_hop_ip = map(str.strip, parts[1].split("->", 1)) interface = vr_name else: vr_name = parts[1] interface = parts[1] # Création VR si absent if vr_name not in virtual_routers: virtual_routers[vr_name] = { "interfaces": set(), "static_routes": [] } virtual_routers[vr_name]["interfaces"].add(interface) static_route = StaticRoute( name=f"{vr_name}-static-{len(virtual_routers[vr_name]['static_routes']) + 1}", destination=destination, metric=None, next_vr=None, next_hop_ip=next_hop_ip, interface=interface, bfd_profile=None ) virtual_routers[vr_name]["static_routes"].append(static_route) # ===== Default VR (route par défaut) ===== if default_route: vr_name = "default" virtual_routers.setdefault(vr_name, { "interfaces": set(), "static_routes": [] }) virtual_routers[vr_name]["static_routes"].insert( 0, StaticRoute( name="default-route", destination="0.0.0.0/0", metric=1, next_vr=None, next_hop_ip=default_route, interface=None, bfd_profile=None ) ) # ===== Construction finale ===== self.config["virtual_routers"] = [ VirtualRouter( name=vr_name, interfaces=list(data["interfaces"]), static_routes=data["static_routes"] ) for vr_name, data in virtual_routers.items() ] # def _parse_slotinfo_file(self): # path = os.path.join(self.base_dir, "Filter", "slotinfo") # if not os.path.exists(path): # return # current = None # with open(path, encoding="utf-8", errors="ignore") as f: # for raw_line in f: # line = raw_line.strip() # if not line or line.startswith("#"): # continue # if line.startswith("[") and line.endswith("]"): # current = line.strip("[]") # continue # if current and line.startswith("Name="): # name = line.split("=", 1)[1].strip('"') # if name: # self.filter_slots[current] = name def _parse_security_rules(self): filter_dir = os.path.join(self.base_dir, "Filter") if not os.path.exists(filter_dir): return rule_regex = re.compile( r'^(?Poff\s+)?' r'(?Ppass|block|nat)\b' r'(?:\s+noconnlog)?' r'(?:\s+all)?' r'(?:\s+inspection)?' r'(?:\s+firewall)?' r'(?:\s+route\s+[^\s]+)?' r'(?:\s+ipproto\s+[^\s]+)?' r'(?:\s+type\s+\d+)?' r'(?:\s+code\s+\d+)?' r'(?:\s+proto\s+[^\s]+)?' r'\s+from\s+(?P[^\s]+)' r'(?:\s+on\s+(?P[^\s]+))?' r'\s+to\s+(?P[^\s]+)' r'(?:\s+port\s+(?P[^\s]+))?' r'(?:\s+rulename\s+"(?P[^"]+)")?', re.IGNORECASE ) slotinfo_path = os.path.join(filter_dir, "slotinfo") active_slot = "Global" if os.path.exists(slotinfo_path): with open(slotinfo_path, encoding="utf-8", errors="ignore") as f: for raw_line in f: line = raw_line.strip() if line.startswith("Active="): active_slot = line.split("=", 1)[1].strip() break for filename in sorted(os.listdir(filter_dir)): if not re.fullmatch(r"\d{2}", filename): continue if filename.lstrip("0") != active_slot: enabled = False else: enabled = True path = os.path.join(filter_dir, filename) with open(path, encoding="utf-8", errors="ignore") as f: for raw_line in f: line = raw_line.strip() if not line or line.startswith("separator") or line.startswith("["): continue match = rule_regex.match(line) if not match: continue data = match.groupdict() # ----- Détermination de l'action finale ----- action_raw = data["action"].lower() if action_raw == "nat": final_action = "nat" elif action_raw == "pass": final_action = "allow" elif action_raw == "block": final_action = "block" else: final_action = "unknown" rule = SecurityRule( name=data.get("rulename") or f"rule_{filename}_{len(self.config['security_rules'])}", from_zones=[data.get("zone")] if data.get("zone") else [], to_zones=[], source_addresses=[data.get("src") or "any"], destination_addresses=[data.get("dst") or "any"], services=data["ports"].split("|") if data.get("ports") else [], applications=["any"], action=final_action, description=self._extract_comment(raw_line), enabled=enabled ) self.config["security_rules"].append(rule) def _extract_comment(self, line: str) -> str: """Extrait un commentaire après #""" if "#" in line: return line.split("#", 1)[1].strip() return "" def parse_all(self): print("🔍 Parsing de la configuration Stormshield...") self._parse_virtual_routers() self._parse_address_objects() self._parse_address_groups() self._parse_service_groups() self._parse_interfaces() self._parse_security_rules() print("✓ Parsing terminé avec succès!") def export_to_json(self, output_file: str): """Exporte la configuration au format JSON OpenConfig""" openconfig_data = self.to_openconfig_yang("stormshield") with open(output_file, 'w', encoding='utf-8') as f: json.dump(openconfig_data, f, indent=2, ensure_ascii=False) print(f"✓ Configuration exportée vers: {output_file}") def generate_json_stormshield(input_dir: str, output_file: str): input_dir = input_dir + "/usr/Firewall/ConfigFiles/" parser = StormshieldParser(input_dir) parser.parse_all() parser.print_summary() output_file = output_file parser.export_to_json(output_file) print(f"\n✓ Conversion terminée! Vérifiez le fichier: {output_file}") if __name__ == "__main__": import sys if len(sys.argv) != 2: print("Usage: python json_Stormshield.py ") sys.exit(1) input_dir = sys.argv[1] if not Path(input_dir).exists(): print(f"Erreur : dossier '{input_dir}' inexistant") sys.exit(1) output_path = Path("output_Stormshield.json") generate_json_stormshield(input_dir, str(output_path))