add project
This commit is contained in:
405
src/scripts/json_Stormshield.py
Normal file
405
src/scripts/json_Stormshield.py
Normal file
@@ -0,0 +1,405 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Parser pour configuration Stormshield vers un format json normalisé OpenConfig YANG
|
||||
Extrait les groupes d'objets, objets, groupes de services, services, routes, interfaces et règles
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
from scripts.export_modele import ParserMixin
|
||||
from scripts.objets.data import AddressObject, AddressGroup, ServiceObject, ServiceGroup, Interface, SecurityRule, StaticRoute, VirtualRouter
|
||||
from pathlib import Path
|
||||
|
||||
class StormshieldParser(ParserMixin):
|
||||
"""Parser pour dossier Stormshield"""
|
||||
def __init__(self, base_dir: str):
|
||||
"""Initialise le parser avec le chemin du dossier Stormshield"""
|
||||
self.base_dir = base_dir
|
||||
|
||||
self.config = {
|
||||
"address_objects": [],
|
||||
"address_groups": [],
|
||||
"service_objects": [],
|
||||
"service_groups": [],
|
||||
"interfaces": [],
|
||||
"virtual_routers": [],
|
||||
'static_routes': [],
|
||||
"security_rules": [],
|
||||
"default_route": None # Ajout Stormshield
|
||||
}
|
||||
|
||||
|
||||
def _parse_address_objects(self):
|
||||
"""Parse le fichier object contenant [Host], [Network], [Service], etc."""
|
||||
current_section = None
|
||||
path = os.path.join(self.base_dir, "object")
|
||||
|
||||
with open(path, "r", encoding="utf-8", errors="ignore") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
|
||||
if line.startswith("[") and line.endswith("]"):
|
||||
current_section = line.strip("[]")
|
||||
continue
|
||||
|
||||
if current_section in ("Host", "Network", "Fqdn"):
|
||||
match = re.match(r"([^=]+)=([^,#]+)", line)
|
||||
if match:
|
||||
name = match.group(1).strip()
|
||||
ip = match.group(2).strip()
|
||||
self.config["address_objects"].append(
|
||||
AddressObject(name=name, ip_netmask=ip)
|
||||
)
|
||||
|
||||
elif current_section == "Service":
|
||||
match = re.match(r"([^=]+)=([\d\-]+)/(\w+)", line)
|
||||
if match:
|
||||
name = match.group(1).strip()
|
||||
port = match.group(2)
|
||||
proto = match.group(3).lower()
|
||||
self.config["service_objects"].append(
|
||||
ServiceObject(name=name, protocol=proto, port=port)
|
||||
)
|
||||
|
||||
def _parse_address_groups(self):
|
||||
"""Parse le fichier objectgroup pour groupes d’adresses"""
|
||||
current_group = None
|
||||
members = []
|
||||
path = os.path.join(self.base_dir, "objectgroup")
|
||||
|
||||
with open(path, "r", encoding="utf-8", errors="ignore") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
|
||||
if line.startswith("[") and "]" in line:
|
||||
if current_group:
|
||||
self.config["address_groups"].append(
|
||||
AddressGroup(name=current_group, members=members)
|
||||
)
|
||||
current_group = line.split("]")[0][1:].strip()
|
||||
members = []
|
||||
continue
|
||||
|
||||
if current_group:
|
||||
member = line.split("#")[0].strip()
|
||||
if member:
|
||||
members.append(member)
|
||||
|
||||
if current_group:
|
||||
self.config["address_groups"].append(
|
||||
AddressGroup(name=current_group, members=members)
|
||||
)
|
||||
|
||||
def _parse_service_groups(self):
|
||||
"""Parse le fichier servicegroup pour groupes de services"""
|
||||
current_group = None
|
||||
members = []
|
||||
path = os.path.join(self.base_dir, "servicegroup")
|
||||
|
||||
with open(path, "r", encoding="utf-8", errors="ignore") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
|
||||
if line.startswith("[") and "]" in line:
|
||||
if current_group:
|
||||
self.config["service_groups"].append(
|
||||
ServiceGroup(name=current_group, members=members)
|
||||
)
|
||||
current_group = line.split("]")[0][1:].strip()
|
||||
members = []
|
||||
continue
|
||||
|
||||
if current_group:
|
||||
member = line.split("#")[0].strip()
|
||||
if member:
|
||||
members.append(member)
|
||||
|
||||
if current_group:
|
||||
self.config["service_groups"].append(
|
||||
ServiceGroup(name=current_group, members=members)
|
||||
)
|
||||
|
||||
def _parse_interfaces(self):
|
||||
"""Analyse du fichier network et extraction des interfaces"""
|
||||
current_section = None
|
||||
current_data = {}
|
||||
path = os.path.join(self.base_dir, "network")
|
||||
with open(path, "r", encoding="utf-8", errors="ignore") as f:
|
||||
for raw_line in f:
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
|
||||
if line.startswith("[") and line.endswith("]"):
|
||||
if current_section and current_section.lower() != "config":
|
||||
self._add_interface(current_section, current_data)
|
||||
current_section = line.strip("[]")
|
||||
current_data = {}
|
||||
continue
|
||||
|
||||
if "=" in line:
|
||||
key, value = line.split("=", 1)
|
||||
key = key.strip()
|
||||
value = value.split("#")[0].strip()
|
||||
current_data[key] = value
|
||||
|
||||
if current_section and current_section.lower() != "config":
|
||||
self._add_interface(current_section, current_data)
|
||||
|
||||
def _add_interface(self, section_name, data):
|
||||
"""Crée un objet Interface à partir d’une section complète"""
|
||||
if section_name.lower().startswith("vlan"):
|
||||
iface_type = "vlan"
|
||||
elif section_name.lower().startswith("bridge"):
|
||||
iface_type = "bridge"
|
||||
elif section_name.lower().startswith("wifi"):
|
||||
iface_type = "wifi"
|
||||
elif section_name.lower().startswith("loopback"):
|
||||
iface_type = "loopback"
|
||||
elif section_name.lower().startswith("agg"):
|
||||
iface_type = "aggregate"
|
||||
else:
|
||||
iface_type = "ethernet"
|
||||
|
||||
enabled = data.get("State", "0") == "1"
|
||||
name = data.get("Name", section_name)
|
||||
ip = data.get("Address")
|
||||
netmask = data.get("Mask")
|
||||
misc = data.get("Misc")
|
||||
comment = data.get("Comment", "")
|
||||
|
||||
if not name or name.lower() == "dynamic":
|
||||
return
|
||||
|
||||
interface = Interface(
|
||||
name=name,
|
||||
ip=ip if ip and ip.upper() != "DHCP" else None,
|
||||
netmask=netmask,
|
||||
misc=int(misc) if misc and misc.isdigit() else None,
|
||||
comment=comment,
|
||||
interface_type=iface_type,
|
||||
enabled=enabled,
|
||||
)
|
||||
|
||||
self.config["interfaces"].append(interface)
|
||||
|
||||
def _parse_virtual_routers(self):
|
||||
route_path = os.path.join(self.base_dir, "route")
|
||||
if not os.path.exists(route_path):
|
||||
return
|
||||
|
||||
section = None
|
||||
with open(route_path, encoding="utf-8", errors="ignore") as f:
|
||||
for raw_line in f:
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
|
||||
if line.startswith("[") and line.endswith("]"):
|
||||
section = line.strip("[]")
|
||||
continue
|
||||
|
||||
if section == "Config" and line.startswith("DefaultRoute="):
|
||||
self.config["default_route"] = line.split("=", 1)[1].strip()
|
||||
|
||||
elif section == "StaticRoutes" and not line.startswith("#"):
|
||||
parts = line.split(",")
|
||||
if len(parts) >= 2:
|
||||
self.config["static_routes"].append({
|
||||
"destination": parts[0],
|
||||
"interface": parts[1],
|
||||
"extra": parts[2:] if len(parts) > 2 else []
|
||||
})
|
||||
|
||||
static_routes = []
|
||||
if self.config["default_route"]:
|
||||
static_routes.append(
|
||||
StaticRoute(
|
||||
name="default-route",
|
||||
destination="0.0.0.0/0",
|
||||
metric=1,
|
||||
next_hop_ip=self.config["default_route"],
|
||||
interface=None
|
||||
)
|
||||
)
|
||||
|
||||
for idx, route in enumerate(self.config["static_routes"]):
|
||||
static_routes.append(
|
||||
StaticRoute(
|
||||
name=f"static-{idx+1}",
|
||||
destination=route["destination"],
|
||||
metric=1,
|
||||
next_hop_ip=None,
|
||||
interface=route["interface"]
|
||||
)
|
||||
)
|
||||
|
||||
vr = VirtualRouter(
|
||||
name="default-vr",
|
||||
interfaces=[r["interface"] for r in self.config["static_routes"]],
|
||||
static_routes=static_routes
|
||||
)
|
||||
|
||||
self.config["virtual_routers"] = [vr]
|
||||
|
||||
# def _parse_slotinfo_file(self):
|
||||
# path = os.path.join(self.base_dir, "Filter", "slotinfo")
|
||||
# if not os.path.exists(path):
|
||||
# return
|
||||
# current = None
|
||||
# with open(path, encoding="utf-8", errors="ignore") as f:
|
||||
# for raw_line in f:
|
||||
# line = raw_line.strip()
|
||||
# if not line or line.startswith("#"):
|
||||
# continue
|
||||
# if line.startswith("[") and line.endswith("]"):
|
||||
# current = line.strip("[]")
|
||||
# continue
|
||||
# if current and line.startswith("Name="):
|
||||
# name = line.split("=", 1)[1].strip('"')
|
||||
# if name:
|
||||
# self.filter_slots[current] = name
|
||||
|
||||
def _parse_security_rules(self):
|
||||
filter_dir = os.path.join(self.base_dir, "Filter")
|
||||
if not os.path.exists(filter_dir):
|
||||
return
|
||||
|
||||
rule_regex = re.compile(
|
||||
r'^(?P<disabled>off\s+)?'
|
||||
r'(?P<action>pass|block|nat)\b'
|
||||
r'(?:\s+noconnlog)?'
|
||||
r'(?:\s+all)?'
|
||||
r'(?:\s+inspection)?'
|
||||
r'(?:\s+firewall)?'
|
||||
r'(?:\s+route\s+[^\s]+)?'
|
||||
r'(?:\s+ipproto\s+[^\s]+)?'
|
||||
r'(?:\s+type\s+\d+)?'
|
||||
r'(?:\s+code\s+\d+)?'
|
||||
r'(?:\s+proto\s+[^\s]+)?'
|
||||
r'\s+from\s+(?P<src>[^\s]+)'
|
||||
r'(?:\s+on\s+(?P<zone>[^\s]+))?'
|
||||
r'\s+to\s+(?P<dst>[^\s]+)'
|
||||
r'(?:\s+port\s+(?P<ports>[^\s]+))?'
|
||||
r'(?:\s+rulename\s+"(?P<rulename>[^"]+)")?',
|
||||
re.IGNORECASE
|
||||
)
|
||||
|
||||
slotinfo_path = os.path.join(filter_dir, "slotinfo")
|
||||
active_slot = "Global"
|
||||
if os.path.exists(slotinfo_path):
|
||||
with open(slotinfo_path, encoding="utf-8", errors="ignore") as f:
|
||||
for raw_line in f:
|
||||
line = raw_line.strip()
|
||||
if line.startswith("Active="):
|
||||
active_slot = line.split("=", 1)[1].strip()
|
||||
break
|
||||
|
||||
for filename in sorted(os.listdir(filter_dir)):
|
||||
if not re.fullmatch(r"\d{2}", filename):
|
||||
continue
|
||||
|
||||
if filename.lstrip("0") != active_slot:
|
||||
enabled = False
|
||||
else:
|
||||
enabled = True
|
||||
|
||||
path = os.path.join(filter_dir, filename)
|
||||
with open(path, encoding="utf-8", errors="ignore") as f:
|
||||
for raw_line in f:
|
||||
line = raw_line.strip()
|
||||
|
||||
if not line or line.startswith("separator") or line.startswith("["):
|
||||
continue
|
||||
|
||||
match = rule_regex.match(line)
|
||||
if not match:
|
||||
continue
|
||||
|
||||
data = match.groupdict()
|
||||
|
||||
# ----- Détermination de l'action finale -----
|
||||
action_raw = data["action"].lower()
|
||||
|
||||
if action_raw == "nat":
|
||||
final_action = "nat"
|
||||
elif action_raw == "pass":
|
||||
final_action = "allow"
|
||||
elif action_raw == "block":
|
||||
final_action = "block"
|
||||
else:
|
||||
final_action = "unknown"
|
||||
|
||||
rule = SecurityRule(
|
||||
name=data.get("rulename")
|
||||
or f"rule_{filename}_{len(self.config['security_rules'])}",
|
||||
from_zones=[data.get("zone")] if data.get("zone") else [],
|
||||
to_zones=[],
|
||||
source_addresses=[data.get("src") or "any"],
|
||||
destination_addresses=[data.get("dst") or "any"],
|
||||
services=data["ports"].split("|") if data.get("ports") else [],
|
||||
applications=["any"],
|
||||
action=final_action,
|
||||
description=self._extract_comment(raw_line),
|
||||
enabled=enabled
|
||||
)
|
||||
|
||||
self.config["security_rules"].append(rule)
|
||||
|
||||
|
||||
def _extract_comment(self, line: str) -> str:
|
||||
"""Extrait un commentaire après #"""
|
||||
if "#" in line:
|
||||
return line.split("#", 1)[1].strip()
|
||||
return ""
|
||||
|
||||
def parse_all(self):
|
||||
print("🔍 Parsing de la configuration Stormshield...")
|
||||
self._parse_virtual_routers()
|
||||
self._parse_address_objects()
|
||||
self._parse_address_groups()
|
||||
self._parse_service_groups()
|
||||
self._parse_interfaces()
|
||||
self._parse_security_rules()
|
||||
print("✓ Parsing terminé avec succès!")
|
||||
|
||||
def export_to_json(self, output_file: str):
|
||||
"""Exporte la configuration au format JSON OpenConfig"""
|
||||
openconfig_data = self.to_openconfig_yang("stormshield")
|
||||
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(openconfig_data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
print(f"✓ Configuration exportée vers: {output_file}")
|
||||
|
||||
def generate_json_stormshield(input_dir: str, output_file: str):
|
||||
input_dir = input_dir + "/usr/Firewall/ConfigFiles/"
|
||||
parser = StormshieldParser(input_dir)
|
||||
parser.parse_all()
|
||||
parser.print_summary()
|
||||
output_file = output_file
|
||||
parser.export_to_json(output_file)
|
||||
|
||||
print(f"\n✓ Conversion terminée! Vérifiez le fichier: {output_file}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
if len(sys.argv) != 2:
|
||||
print("Usage: python json_Stormshield.py <dossier>")
|
||||
sys.exit(1)
|
||||
|
||||
input_dir = sys.argv[1]
|
||||
if not Path(input_dir).exists():
|
||||
print(f"Erreur : dossier '{input_dir}' inexistant")
|
||||
sys.exit(1)
|
||||
|
||||
output_path = Path("output_Stormshield.json")
|
||||
generate_json_stormshield(input_dir, str(output_path))
|
||||
Reference in New Issue
Block a user