Files
Parseurs_config_firewall/src/scripts/json_Stormshield.py

441 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Parser pour configuration Stormshield vers un format json normalisé OpenConfig YANG
Extrait les groupes d'objets, objets, groupes de services, services, routes, interfaces et règles
"""
from dataclasses import dataclass
import os
import re
import json
from scripts.export_modele import ParserMixin
from scripts.objets.data import AddressObject, AddressGroup, ServiceObject, ServiceGroup, Interface, SecurityRule, StaticRoute, VirtualRouter
from pathlib import Path
class StormshieldParser(ParserMixin):
"""Parser pour dossier Stormshield"""
def __init__(self, base_dir: str):
    """Prepare an empty parse result for one Stormshield configuration folder.

    Args:
        base_dir: Directory containing the Stormshield configuration files;
            the ``_parse_*`` methods join their file names onto it.
    """
    self.base_dir = base_dir
    # Every collection starts as its own empty list; the key order below is
    # the order the keys are stored in ``self.config``.
    collection_keys = (
        "address_objects",
        "address_groups",
        "service_objects",
        "service_groups",
        "interfaces",
        "virtual_routers",
        "static_routes",
        "security_rules",
    )
    self.config = {key: [] for key in collection_keys}
    # Stormshield-specific entry, unset until something assigns it.
    self.config["default_route"] = None
def _parse_address_objects(self):
    """Read the ``object`` file and collect address and service objects.

    Entries found under ``[Host]``, ``[Network]`` or ``[Fqdn]`` are stored
    as ``AddressObject`` items in ``self.config["address_objects"]``;
    entries under ``[Service]`` are stored as ``ServiceObject`` items in
    ``self.config["service_objects"]``. Blank lines, lines starting with
    ``#``, other sections and entries that do not match the expected shape
    are skipped.

    Raises:
        OSError: If ``<base_dir>/object`` cannot be opened.
    """
    address_sections = ("Host", "Network", "Fqdn")
    object_file = os.path.join(self.base_dir, "object")
    section = None
    with open(object_file, "r", encoding="utf-8", errors="ignore") as handle:
        for raw in handle:
            entry = raw.strip()
            if entry == "" or entry.startswith("#"):
                continue
            if entry.startswith("[") and entry.endswith("]"):
                section = entry.strip("[]")
                continue
            if section in address_sections:
                # "<name>=<value>": the value stops at the first "," or "#".
                found = re.match(r"([^=]+)=([^,#]+)", entry)
                if found is None:
                    continue
                obj_name, address = (part.strip() for part in found.groups())
                self.config["address_objects"].append(
                    AddressObject(name=obj_name, ip_netmask=address)
                )
            elif section == "Service":
                # "<name>=<port or port-range>/<protocol>".
                found = re.match(r"([^=]+)=([\d\-]+)/(\w+)", entry)
                if found is None:
                    continue
                raw_name, port_spec, protocol = found.groups()
                self.config["service_objects"].append(
                    ServiceObject(
                        name=raw_name.strip(),
                        protocol=protocol.lower(),
                        port=port_spec,
                    )
                )
def _parse_address_groups(self):
    """Read the ``objectgroup`` file and collect address groups.

    A line that starts with ``[`` and contains ``]`` opens a group named by
    the text between the brackets. Each following line adds one member (the
    text before any ``#``) to that group. Blank lines, ``#`` lines, members
    seen before the first header and groups with an empty name are ignored.
    Groups are appended to ``self.config["address_groups"]`` in file order.

    Raises:
        OSError: If ``<base_dir>/objectgroup`` cannot be opened.
    """
    group_file = os.path.join(self.base_dir, "objectgroup")
    # (name, members) pairs in file order; converted to AddressGroup below.
    sections = []
    with open(group_file, "r", encoding="utf-8", errors="ignore") as handle:
        for raw in handle:
            text = raw.strip()
            if text == "" or text.startswith("#"):
                continue
            if text.startswith("[") and "]" in text:
                sections.append((text[1:text.index("]")].strip(), []))
                continue
            if not sections:
                continue
            group_name, group_members = sections[-1]
            entry = text.partition("#")[0].strip()
            if group_name and entry:
                group_members.append(entry)
    self.config["address_groups"].extend(
        AddressGroup(name=group_name, members=group_members)
        for group_name, group_members in sections
        if group_name
    )
def _parse_service_groups(self):
    """Read the ``servicegroup`` file and collect service groups.

    A line that starts with ``[`` and contains ``]`` opens a group named by
    the text between the brackets; every following line contributes one
    member (the text before any ``#``). Blank lines, ``#`` lines, members
    seen before the first header and groups with an empty name are ignored.
    Each finished group is appended to ``self.config["service_groups"]``.

    Raises:
        OSError: If ``<base_dir>/servicegroup`` cannot be opened.
    """

    def store(name, entries):
        # Only named groups are kept.
        if name:
            self.config["service_groups"].append(
                ServiceGroup(name=name, members=entries)
            )

    group_file = os.path.join(self.base_dir, "servicegroup")
    open_name = None
    open_members = []
    with open(group_file, "r", encoding="utf-8", errors="ignore") as handle:
        for raw in handle:
            text = raw.strip()
            if not text or text[0] == "#":
                continue
            if text[0] == "[" and text.find("]") != -1:
                # A new header closes the group that was being filled.
                store(open_name, open_members)
                open_name = text[1:text.find("]")].strip()
                open_members = []
            elif open_name:
                entry = text.split("#", 1)[0].strip()
                if entry:
                    open_members.append(entry)
    # The last group has no following header to close it.
    store(open_name, open_members)
def _parse_interfaces(self):
    """Read the ``network`` file and hand each section to ``_add_interface``.

    The file is split into ``[section]`` blocks of ``key=value`` lines.
    Keys and values are stripped, and anything after ``#`` in a value is
    dropped. When a section ends (next header or end of file) its name and
    collected pairs are passed to ``self._add_interface``, except for the
    section named ``Config`` (any letter case) and sections with an empty
    name.

    Raises:
        OSError: If ``<base_dir>/network`` cannot be opened.
    """
    network_file = os.path.join(self.base_dir, "network")
    section = None
    fields = {}

    def emit():
        # Reads the section currently being filled at call time.
        if section and section.lower() != "config":
            self._add_interface(section, fields)

    with open(network_file, "r", encoding="utf-8", errors="ignore") as handle:
        for raw_line in handle:
            text = raw_line.strip()
            if text == "" or text.startswith("#"):
                continue
            if text.startswith("[") and text.endswith("]"):
                emit()
                section, fields = text.strip("[]"), {}
                continue
            key, separator, value = text.partition("=")
            if separator:
                fields[key.strip()] = value.partition("#")[0].strip()
    emit()
def _add_interface(self, section_name, data):
    """Build an ``Interface`` from one parsed section and store it.

    Args:
        section_name: Section header text. Its prefix selects the interface
            type, and it is the fallback name when ``data`` has no ``Name``.
        data: Key/value pairs collected for the section.

    Nothing is stored when the resolved name is empty or equals ``dynamic``
    (any letter case). Otherwise the interface is appended to
    ``self.config["interfaces"]``.
    """
    label = data.get("Name", section_name)
    if not label or label.lower() == "dynamic":
        return

    # Section-name prefix -> interface type; anything else is "ethernet".
    prefix_types = (
        ("vlan", "vlan"),
        ("bridge", "bridge"),
        ("wifi", "wifi"),
        ("loopback", "loopback"),
        ("agg", "aggregate"),
    )
    lowered = section_name.lower()
    kind = next(
        (itype for prefix, itype in prefix_types if lowered.startswith(prefix)),
        "ethernet",
    )

    # A missing address or the literal "DHCP" (any case) is stored as None.
    address = data.get("Address")
    if not address or address.upper() == "DHCP":
        address = None

    misc_raw = data.get("Misc")
    misc_value = int(misc_raw) if misc_raw and misc_raw.isdigit() else None

    self.config["interfaces"].append(
        Interface(
            name=label,
            ip=address,
            netmask=data.get("Mask"),
            misc=misc_value,
            comment=data.get("Comment", ""),
            interface_type=kind,
            enabled=data.get("State", "0") == "1",
        )
    )
def _parse_virtual_routers(self):
    """Read the ``route`` file and build virtual routers with static routes.

    Returns without touching ``self.config`` when ``<base_dir>/route`` does
    not exist. Otherwise:

    * ``DefaultRoute=<value>`` in the ``[Config]`` section becomes a
      ``0.0.0.0/0`` route (metric 1) placed first in a virtual router named
      ``default``.
    * Each ``[StaticRoutes]`` entry is ``<destination>,<target>`` where the
      target is either ``<name>`` or ``<name>-><next hop>``. ``<name>`` is
      used both as the virtual router name and as the interface.

    Inline ``#`` comments are removed before a line is interpreted, and
    entries whose first token is ``off`` are skipped. The result replaces
    ``self.config["virtual_routers"]``.
    """
    route_path = os.path.join(self.base_dir, "route")
    if not os.path.exists(route_path):
        return

    virtual_routers = {}
    default_route = None
    section = None
    # "off" marks a skipped entry only as a whole token, so a name that
    # merely begins with those letters (e.g. "office_lan") is still parsed.
    off_token = re.compile(r"off\b")

    with open(route_path, encoding="utf-8", errors="ignore") as f:
        for raw_line in f:
            # Drop inline comments so they cannot leak into a next hop,
            # interface name or default route value.
            line = raw_line.split("#", 1)[0].strip()
            if not line or off_token.match(line):
                continue
            if line.startswith("[") and line.endswith("]"):
                section = line.strip("[]")
                continue

            if section == "Config" and line.startswith("DefaultRoute="):
                default_route = line.split("=", 1)[1].strip()
            elif section == "StaticRoutes":
                parts = [p.strip() for p in line.split(",")]
                if len(parts) < 2:
                    continue
                destination = parts[0]
                next_hop_ip = None
                if "->" in parts[1]:
                    # "<name>-><next hop>"
                    vr_name, next_hop_ip = map(
                        str.strip, parts[1].split("->", 1)
                    )
                else:
                    vr_name = parts[1]
                interface = vr_name

                router = virtual_routers.setdefault(
                    vr_name, {"interfaces": set(), "static_routes": []}
                )
                router["interfaces"].add(interface)
                router["static_routes"].append(
                    StaticRoute(
                        name=f"{vr_name}-static-{len(router['static_routes']) + 1}",
                        destination=destination,
                        metric=None,
                        next_vr=None,
                        next_hop_ip=next_hop_ip,
                        interface=interface,
                        bfd_profile=None,
                    )
                )

    # NOTE(review): self.config["default_route"] is never assigned here even
    # though the value is parsed — confirm whether it should mirror it.
    if default_route:
        router = virtual_routers.setdefault(
            "default", {"interfaces": set(), "static_routes": []}
        )
        # The default route goes ahead of any other route of that router.
        router["static_routes"].insert(
            0,
            StaticRoute(
                name="default-route",
                destination="0.0.0.0/0",
                metric=1,
                next_vr=None,
                next_hop_ip=default_route,
                interface=None,
                bfd_profile=None,
            ),
        )

    self.config["virtual_routers"] = [
        VirtualRouter(
            name=vr_name,
            interfaces=list(data["interfaces"]),
            static_routes=data["static_routes"],
        )
        for vr_name, data in virtual_routers.items()
    ]
# def _parse_slotinfo_file(self):
# path = os.path.join(self.base_dir, "Filter", "slotinfo")
# if not os.path.exists(path):
# return
# current = None
# with open(path, encoding="utf-8", errors="ignore") as f:
# for raw_line in f:
# line = raw_line.strip()
# if not line or line.startswith("#"):
# continue
# if line.startswith("[") and line.endswith("]"):
# current = line.strip("[]")
# continue
# if current and line.startswith("Name="):
# name = line.split("=", 1)[1].strip('"')
# if name:
# self.filter_slots[current] = name
def _parse_security_rules(self):
    """Read the ``Filter`` directory and collect filter/NAT rules.

    Returns without touching ``self.config`` when ``<base_dir>/Filter`` does
    not exist. The active slot is taken from the first ``Active=`` line of
    ``Filter/slotinfo`` (``"Global"`` when absent). Every file whose name is
    exactly two digits is read as a slot; each line matching the rule
    pattern becomes a ``SecurityRule`` appended to
    ``self.config["security_rules"]``.

    A rule is reported as enabled only when its slot is the active one and
    the line does not carry the ``off`` prefix. Slot numbers are compared
    with leading zeros removed on both sides, so ``Active=05`` and
    ``Active=5`` both select file ``05``.
    """
    filter_dir = os.path.join(self.base_dir, "Filter")
    if not os.path.exists(filter_dir):
        return

    # Optional "off" prefix, action, a run of optional modifiers, then
    # "from <src> [on <zone>] to <dst> [port <ports>] [rulename "<name>"]".
    rule_regex = re.compile(
        r'^(?P<disabled>off\s+)?'
        r'(?P<action>pass|block|nat)\b'
        r'(?:\s+noconnlog)?'
        r'(?:\s+all)?'
        r'(?:\s+inspection)?'
        r'(?:\s+firewall)?'
        r'(?:\s+route\s+[^\s]+)?'
        r'(?:\s+ipproto\s+[^\s]+)?'
        r'(?:\s+type\s+\d+)?'
        r'(?:\s+code\s+\d+)?'
        r'(?:\s+proto\s+[^\s]+)?'
        r'\s+from\s+(?P<src>[^\s]+)'
        r'(?:\s+on\s+(?P<zone>[^\s]+))?'
        r'\s+to\s+(?P<dst>[^\s]+)'
        r'(?:\s+port\s+(?P<ports>[^\s]+))?'
        r'(?:\s+rulename\s+"(?P<rulename>[^"]+)")?',
        re.IGNORECASE
    )
    # Keyword found in the file -> action name stored on the rule.
    action_names = {"nat": "nat", "pass": "allow", "block": "block"}

    slotinfo_path = os.path.join(filter_dir, "slotinfo")
    active_slot = "Global"
    if os.path.exists(slotinfo_path):
        with open(slotinfo_path, encoding="utf-8", errors="ignore") as f:
            for raw_line in f:
                line = raw_line.strip()
                if line.startswith("Active="):
                    active_slot = line.split("=", 1)[1].strip()
                    break
    # Normalise like the file names below so "05" and "5" compare equal.
    active_slot = active_slot.lstrip("0")

    for filename in sorted(os.listdir(filter_dir)):
        if not re.fullmatch(r"\d{2}", filename):
            continue
        slot_is_active = filename.lstrip("0") == active_slot
        path = os.path.join(filter_dir, filename)
        with open(path, encoding="utf-8", errors="ignore") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith(("separator", "[")):
                    continue
                match = rule_regex.match(line)
                if not match:
                    continue
                data = match.groupdict()
                final_action = action_names.get(data["action"].lower(), "unknown")
                rule = SecurityRule(
                    # Unnamed rules get a name built from the slot file and
                    # the number of rules collected so far.
                    name=data.get("rulename")
                    or f"rule_{filename}_{len(self.config['security_rules'])}",
                    from_zones=[data.get("zone")] if data.get("zone") else [],
                    to_zones=[],
                    source_addresses=[data.get("src") or "any"],
                    destination_addresses=[data.get("dst") or "any"],
                    services=data["ports"].split("|") if data.get("ports") else [],
                    applications=["any"],
                    action=final_action,
                    description=self._extract_comment(raw_line),
                    # A rule prefixed with "off" is disabled even when it
                    # lives in the active slot.
                    enabled=slot_is_active and not data["disabled"],
                )
                self.config["security_rules"].append(rule)
def _extract_comment(self, line: str) -> str:
    """Return the stripped text after the first ``#`` in ``line``.

    Returns an empty string when ``line`` contains no ``#``.
    """
    _, marker, trailing = line.partition("#")
    return trailing.strip() if marker else ""
def parse_all(self):
    """Run every section parser in a fixed order, with progress messages.

    Order: virtual routers, address objects, address groups, service
    groups, interfaces, security rules.
    """
    print("🔍 Parsing de la configuration Stormshield...")
    steps = (
        "_parse_virtual_routers",
        "_parse_address_objects",
        "_parse_address_groups",
        "_parse_service_groups",
        "_parse_interfaces",
        "_parse_security_rules",
    )
    for step in steps:
        # Looked up right before the call, one parser at a time.
        getattr(self, step)()
    print("✓ Parsing terminé avec succès!")
def export_to_json(self, output_file: str):
    """Write the converted configuration to ``output_file`` as JSON.

    The payload is whatever ``self.to_openconfig_yang("stormshield")``
    returns. It is written UTF-8 encoded with a two-space indent and
    non-ASCII characters kept as-is, then a confirmation is printed.

    Args:
        output_file: Path of the JSON file to create or overwrite.
    """
    payload = self.to_openconfig_yang("stormshield")
    with open(output_file, "w", encoding="utf-8") as handle:
        handle.write(json.dumps(payload, indent=2, ensure_ascii=False))
    print(f"✓ Configuration exportée vers: {output_file}")
def generate_json_stormshield(input_dir: str, output_file: str):
    """Convert one extracted Stormshield backup into a JSON file.

    Args:
        input_dir: Root of the extracted backup; the configuration files
            are read from ``<input_dir>/usr/Firewall/ConfigFiles/``.
        output_file: Path of the JSON file to write.
    """
    config_subdir = "/usr/Firewall/ConfigFiles/"
    converter = StormshieldParser(input_dir + config_subdir)
    converter.parse_all()
    converter.print_summary()
    converter.export_to_json(output_file)
    print(f"\n✓ Conversion terminée! Vérifiez le fichier: {output_file}")
if __name__ == "__main__":
    # Command-line entry point: expects exactly one argument, the backup
    # folder, and writes output_Stormshield.json in the current directory.
    import sys

    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Usage: python json_Stormshield.py <dossier>")
        sys.exit(1)

    (input_dir,) = cli_args
    if not Path(input_dir).exists():
        print(f"Erreur : dossier '{input_dir}' inexistant")
        sys.exit(1)

    output_path = Path("output_Stormshield.json")
    generate_json_stormshield(input_dir, str(output_path))