Files
Analyse_Reseaux/Parseurs_config_Firewall/src/scripts/json_Forcepoint.py
2026-01-15 13:54:56 +01:00

246 lines
9.6 KiB
Python

#!/usr/bin/env python3
"""
Parser pour configuration Forcepoint XML vers un format json normalisé OpenConfig YANG
Extrait les groupes d'objets, objets, groupes de services, services, routes, interfaces et règles
"""
import xml.etree.ElementTree as ET
import json
from scripts.export_modele import ParserMixin
from scripts.objets.data import AddressObject, AddressGroup, ServiceObject, ServiceGroup, Interface, SecurityRule, StaticRoute, VirtualRouter
from pathlib import Path
class ForcepointParser(ParserMixin):
"""Parser pour fichier XML Forcepoint"""
def __init__(self, xml_file_path: str):
"""Initialise le parser avec le chemin du fichier XML"""
self.xml_file = xml_file_path
self.tree = None
self.root = None
self.config = {
'address_objects': [],
'address_groups': [],
'service_objects': [],
'service_groups': [],
'interfaces': [],
'virtual_routers': [],
'static_routes': [],
'security_rules': []
}
def load_xml(self):
"""Charge le fichier XML"""
try:
self.tree = ET.parse(self.xml_file)
self.root = self.tree.getroot()
print(f"✓ Fichier XML chargé: {self.xml_file}")
except ET.ParseError as e:
print(f"✗ Erreur de parsing XML: {e}")
raise
except FileNotFoundError:
print(f"✗ Fichier non trouvé: {self.xml_file}")
raise
def _parse_address_objects(self):
"""Parse hosts, networks, address ranges et ip_lists"""
for obj in self.root.findall(".//host"):
name = obj.get("name")
ip = obj.find("mvia_address")
ip_addr = ip.get("address") if ip is not None else None
desc = obj.get("comment") or None
self.config['address_objects'].append(AddressObject(name=name, ip_netmask=ip_addr, description=desc))
for obj in self.root.findall(".//network"):
name = obj.get("name")
ip_netmask = obj.get("ipv4_network")
comment = obj.get("comment")
self.config['address_objects'].append(AddressObject(name=name, ip_netmask=ip_netmask, description=comment))
for obj in self.root.findall(".//address_range"):
name = obj.get("name")
ip_range = obj.get("ip_range")
comment = obj.get("comment")
self.config['address_objects'].append(AddressObject(name=name, ip_netmask=ip_range, description=comment))
for obj in self.root.findall(".//ip_list"):
name = obj.get("name")
ips = [ip.get("value") for ip in obj.findall(".//ip")]
for ip in ips:
self.config['address_objects'].append(AddressObject(name=f"{name}_{ip}", ip_netmask=ip))
def _parse_address_groups(self):
"""Parse les groupes d'adresses"""
for group in self.root.findall(".//group"):
name = group.get("name")
desc = group.get("comment") or None
members = [m.get("ref") for m in group.findall(".//ne_list")]
self.config['address_groups'].append(AddressGroup(name=name, members=members, description=desc))
def _parse_service_objects(self):
"""Parse services TCP/UDP"""
for svc in self.root.findall(".//service_tcp"):
name = svc.get("name")
port = svc.get("min_dst_port")
proto = "tcp"
self.config['service_objects'].append(ServiceObject(name=name, protocol=proto, port=port))
for svc in self.root.findall(".//service_udp"):
name = svc.get("name")
port = svc.get("min_dst_port")
proto = "udp"
self.config['service_objects'].append(ServiceObject(name=name, protocol=proto, port=port))
def _parse_interfaces(self):
"""Parse interfaces de cluster"""
for iface in self.root.findall(".//cluster_virtual_interface"):
name = iface.get("name")
iptmp = iface.find("mvia_address")
ip_addr = iptmp.get("address") if iptmp is not None else None
net = iface.get("network_value")
netmask = net.split("/")[-1] if net else None
comment = iface.get("comment")
self.config['interfaces'].append(Interface(name=name, ip=ip_addr, netmask=netmask, comment=comment))
def _parse_virtual_routers(self):
"""Parse les virtual routers avec interfaces et routes statiques"""
for router in self.root.findall(".//routing_node"):
vr_name = router.get("name")
interfaces = []
static_routes = []
for iface in router.findall(".//interface_rn_level"):
iface_name = iface.get("name", None)
for network in iface.findall(".//network_rn_level"):
for gw in network.findall(".//gateway_rn_level"):
for ip_entry in gw.findall(".//ipaddress_behind_router"):
gw_ip = ip_entry.get("gateway")
ip_cidr = ip_entry.get("ipaddress")
static_routes.append(
StaticRoute(
name=f"{vr_name}_{ip_cidr}",
destination=ip_cidr,
metric=0,
next_hop_ip=gw_ip,
interface=iface_name
)
)
if gw_ip:
interfaces.append(gw_ip)
interfaces = list(set(interfaces))
self.config['virtual_routers'].append(
VirtualRouter(
name=vr_name,
interfaces=interfaces,
static_routes=static_routes
)
)
def _parse_security_rules(self):
"""Parse règles FW Forcepoint"""
for rule_entry in self.root.findall(".//fw_policy//rule_entry"):
rule_type = rule_entry.find("./access_rule") or rule_entry.find("./nat_rule")
if rule_type is None:
continue
name = rule_entry.get("tag")
action = rule_type.find("./action")
action_type = action.get("type") if action is not None else "undefined"
sources = [s.get("value") for s in rule_type.findall(".//match_sources//match_source_ref")]
destinations = [d.get("value") for d in rule_type.findall(".//match_destinations//match_destination_ref")]
services = [s.get("value") for s in rule_type.findall(".//match_services//match_service_ref")]
enabled_attr = rule_entry.get("is_disabled")
enabled = False if enabled_attr == "true" else True
desc = rule_entry.get("comment") or None
self.config['security_rules'].append(
SecurityRule(
name=f"Rule_{name}",
source_addresses=sources,
destination_addresses=destinations,
services=services,
action=action_type,
from_zones=[],
to_zones=[],
applications=[],
description=desc,
enabled=enabled
)
)
def parse_all(self):
"""Parse tous les éléments de configuration"""
print("Début du parsing de la configuration Forcepoint...")
self.load_xml()
self._parse_address_objects()
self._parse_address_groups()
self._parse_service_objects()
self._parse_interfaces()
self._parse_virtual_routers()
self._parse_security_rules()
print("✓ Parsing terminé avec succès!")
def export_to_json(self, output_file: str):
"""Exporte la configuration au format JSON OpenConfig"""
openconfig_data = self.to_openconfig_yang("forcepoint")
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(openconfig_data, f, indent=2, ensure_ascii=False)
print(f"✓ Configuration exportée vers: {output_file}")
def generate_json_forcepoint(input_file: str, output_file: str):
"""Génère le fichier JSON OpenConfig à partir du fichier XML Forcepoint"""
xml_file = Path(input_file)
if not xml_file.exists():
print(f"✗ Erreur: Le fichier '{xml_file}' n'existe pas")
return
try:
parser = ForcepointParser(str(xml_file))
parser.parse_all()
parser.print_summary()
output_file = output_file
parser.export_to_json(output_file)
print(f"\n✓ Conversion terminée! Vérifiez le fichier: {output_file}")
except Exception as e:
print(f"✗ Erreur: {e}")
return
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("Usage: python json_Forcepoint.py <fichier_config.xml>")
print("Exemple: python json_Forcepoint.py config_Forcepoint.xml")
sys.exit(1)
xml_file = sys.argv[1]
if not Path(xml_file).exists():
print(f"✗ Erreur: Le fichier '{xml_file}' n'existe pas")
sys.exit(1)
try:
parser = ForcepointParser(xml_file)
parser.parse_all()
parser.print_summary()
input_path = Path(xml_file)
output_file = 'output_Forcepoint.json'
parser.export_to_json(output_file)
print(f"\n✓ Conversion terminée! Vérifiez le fichier: {output_file}")
except Exception as e:
print(f"✗ Erreur: {e}")
sys.exit(1)