import project

This commit is contained in:
Chevallier
2026-01-08 13:51:16 +01:00
commit df6334febc
40 changed files with 17440 additions and 0 deletions

View File

@@ -0,0 +1,214 @@
import json
import re
import pandas as pd
import ipaddress
import time
from collections import deque
from openpyxl import load_workbook
from scripts.style_excel.style_address_groups import style_address_groups
from scripts.style_excel.style_service_groups import style_service_groups
from scripts.style_excel.style_matrice_flux import style_matrice_flux
def export_to_excel(json_file_path, output_file_excel):
"""
Export firewall data from JSON to Excel
Args:
json_file_path: Path to the JSON file to process
output_file_excel: Path to the output Excel file
"""
# === Load JSON ===
with open(json_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
acls = data.get("openconfig-acl:acl", {}).get("acl-sets", {}).get("acl-set", [])
objects = data.get("custom-firewall-objects:firewall-objects", {})
addresses = {a["name"]: a["config"] for a in objects.get("address", [])}
address_groups = {g["name"]: g["config"] for g in objects.get("address-group", [])}
services = {s["name"]: s["config"] for s in objects.get("service", [])}
service_groups = {g["name"]: g["config"] for g in objects.get("service-group", [])}
# === Load site mapping ===
with open("src/data/site.json", "r", encoding="utf-8") as f:
site_mapping = json.load(f)
# === Build flow matrix helpers ===
def get_tier_from_tags(tags):
s = ""
for t in ["T0","T1A","T1B","T2A","T2B"]:
if any(t in str(tag) for tag in tags): s += t + ","
return s[:-1] if s else ""
# === Extract tags without tier info ===
def get_tags_without_tier(text):
if not text: return ""
tags = re.findall(r"Misc\s*:\s*(.*)", text)
if not tags: return ""
tag_list = [tag.strip() for tag in tags[0].split(",")]
filtered_tags = [tag for tag in tag_list if tag not in ["T0","T1A","T1B","T2A","T2B"]]
return ", ".join(filtered_tags)
# === Resolve equipment to members ===
def resolve_equipment(name):
if str(name).lower() == "any": return ["any"]
if name in address_groups: return address_groups[name].get("members", [])
return [name]
# === Resolve equipment to IP/mask ===
def resolve_ip_masks(equipment_list):
if not equipment_list:
return ""
clean = str(equipment_list).replace("[","").replace("]","").replace("'","").replace('"',"")
eq_list = [eq.strip() for eq in clean.split(",") if eq.strip()]
cidrs = []
for eq in eq_list:
if eq in addresses:
ip = addresses[eq].get("ip_netmask","")
if ip:
cidrs.append(ip if "/" in ip else ip+"/32")
return ", ".join(cidrs)
# === Recursive address resolver ===
def resolve_all_addresses(equipment_list):
resolved = set()
eq_list = []
for item in equipment_list:
if isinstance(item, str):
if ',' in item:
eq_list.extend([eq.strip() for eq in item.split(',') if eq.strip()])
else:
eq_list.append(item.strip())
else:
eq_list.append(item)
to_process = deque(eq_list)
processed = set()
while to_process:
eq = to_process.popleft()
if eq in processed: continue
processed.add(eq)
if eq in addresses:
resolved.add(eq)
elif eq in address_groups:
members = address_groups[eq].get("members", [])
new_members = [m for m in members if m not in processed]
to_process.extend(new_members)
return list(resolved)
# === Map addresses to sites ===
def get_sites_for_addresses(address_list):
sites = set()
for addr in address_list:
ipmask = addresses.get(addr, {}).get("ip_netmask", "")
if not ipmask:
continue
ipmask = ipmask if "/" in ipmask else ipmask + "/32"
# cas particulier d'une plage d'IP format "CIDR1-CIDR2"
if "-" in ipmask:
start_ip, end_ip = ipmask.split("-")
end_ip = end_ip.split("/")[0]
for ip in range(int(ipaddress.ip_address(start_ip)), int(ipaddress.ip_address(end_ip)) + 1):
ip_obj = ipaddress.ip_network(f"{ipaddress.ip_address(ip)}/32", strict=False)
for cidr, site in site_mapping.items():
site_cidr = ipaddress.ip_network(cidr, strict=False)
if ip_obj.network_address in site_cidr or ip_obj.subnet_of(site_cidr):
sites.add(site)
break
continue
ip_obj = ipaddress.ip_network(ipmask, strict=False)
for cidr, site in site_mapping.items():
site_cidr = ipaddress.ip_network(cidr, strict=False)
if ip_obj.network_address in site_cidr or ip_obj.subnet_of(site_cidr):
sites.add(site)
break
return ", ".join(sorted(sites))
rows=[]
for acl in acls:
acl_name = acl.get("name","")
acl_desc = acl.get("config",{}).get("description","")
enabled = acl.get("state",{}).get("description","")
for entry in acl.get("acl-entries",{}).get("acl-entry",[]):
cfg = entry.get("config",{})
ipv4 = entry.get("ipv4",{}).get("config",{})
transport = entry.get("transport",{}).get("config",{})
actions = entry.get("actions",{}).get("config",{})
state = entry.get("state",{})
src_name = ipv4.get("source-address","any")
src_e = resolve_equipment(src_name)
src_c = resolve_ip_masks(src_e)
dst_name = ipv4.get("destination-address","any")
dst_e = resolve_equipment(dst_name)
dst_c = resolve_ip_masks(dst_e)
src_sites = get_sites_for_addresses(resolve_all_addresses(src_e))
dst_sites = get_sites_for_addresses(resolve_all_addresses(dst_e))
rows.append({
"Nom du flux": acl_name,
"Description": acl_desc,
"Src Equipement": ", ".join(src_e),
"Src CIDR": src_c,
"Src VLAN":"",
"Src Site": src_sites,
"Dst Equipement": ", ".join(dst_e),
"Dst CIDR": dst_c,
"Dst VLAN":"",
"Dst Site": dst_sites,
"Application": cfg.get("description",""),
"Port": transport.get("destination-port",""),
"Miscellaneous": get_tags_without_tier(state.get("description","")),
"Action": actions.get("forwarding-action","").replace("openconfig-acl:",""),
"Tier": get_tier_from_tags([state.get("description","")]),
"Active": "Oui" if enabled is True else "Non"
})
df = pd.DataFrame(rows)
df_addr = pd.DataFrame([
{
"Group Name": g,
"Member": m,
"IP": addresses[m]["ip_netmask"] if m in addresses and "/" in addresses[m].get("ip_netmask","") else
(addresses[m]["ip_netmask"]+"/32" if m in addresses else "")
}
for g, cfg in address_groups.items()
for m in cfg.get("members", [])
])
df_srv = pd.DataFrame([
{
"Group Name": g,
"Member": m,
"Protocol": services[m]["protocol"] if m in services and "protocol" in services[m] else "",
"Port": services[m]["port"] if m in services and "port" in services[m] else ""
}
for g, cfg in service_groups.items()
for m in cfg.get("members", [])
])
# === Export Excel ===
firewall_type = data.get("firewall-device", {}).get("type", "firewall").replace(" ","_").lower()
date = time.strftime("%Y%m%d")
with pd.ExcelWriter(output_file_excel, engine="openpyxl") as writer:
df_addr.to_excel(writer,sheet_name="Address-Groups",index=False)
df_srv.to_excel(writer,sheet_name="Service-Groups",index=False)
df.to_excel(writer,sheet_name="Matrice Flux",index=False,startrow=1)
wb = load_workbook(output_file_excel)
if "Sheet1" in wb.sheetnames:
del wb["Sheet1"]
style_address_groups(wb["Address-Groups"], address_groups)
style_service_groups(wb["Service-Groups"], service_groups)
style_matrice_flux(wb["Matrice Flux"])
wb.save(output_file_excel)
print(f"✓ Export Excel OK: {output_file_excel}")
return output_file_excel

View File

@@ -0,0 +1,296 @@
#!/usr/bin/env python3
"""
Modèle de base pour les parsers de configurations de firewalls vers un format json normalisé OpenConfig YANG
Fournit des méthodes communes pour la conversion et l'exportation des données
"""
from typing import Dict, Any
class ParserMixin:
def to_openconfig_yang(self, type) -> Dict[str, Any]:
"""Convertit la configuration au format OpenConfig YANG avec les nouveaux modèles"""
openconfig = {
"firewall-device": {
"type": type
},
"openconfig-interfaces:interfaces": {
"interface": []
},
"openconfig-network-instance:network-instances": {
"network-instance": []
},
"openconfig-acl:acl": {
"acl-sets": {
"acl-set": []
}
},
"custom-firewall-objects:firewall-objects": {
"address": [],
"address-group": [],
"service": [],
"service-group": []
}
}
# Conversion des interfaces
for interface in self.config['interfaces']:
intf_config = {
"name": interface.name,
"config": {
"name": interface.name,
"type": "iana-if-type:ethernetCsmacd" if interface.interface_type == "ethernet" else "iana-if-type:ieee8023adLag",
"enabled": interface.enabled,
"description": interface.comment or ""
}
}
if interface.ip:
intf_config["subinterfaces"] = {
"subinterface": [
{
"index": interface.misc or 0,
"config": {
"index": interface.misc or 0,
"enabled": interface.enabled
},
"oc-ip:ipv4": {
"oc-ip:addresses": {
"oc-ip:address": [
{
"oc-ip:ip": interface.ip,
"oc-ip:config": {
"oc-ip:ip": interface.ip,
"oc-ip:prefix-length": interface.netmask if interface.netmask else 24
}
}
]
}
}
}
]
}
openconfig["openconfig-interfaces:interfaces"]["interface"].append(intf_config)
# Conversion des virtual-routers et routes statiques
for vr in self.config.get('virtual_routers', []):
vr_entry = {
"name": vr.name,
"config": {
"name": vr.name,
"type": "L3VRF"
},
"interfaces": {
"interface": [{"id": iface} for iface in vr.interfaces]
},
"protocols": {
"protocol": [
{
"identifier": "STATIC",
"name": "STATIC",
"static-routes": {
"static": []
}
}
]
}
}
static_list = vr_entry["protocols"]["protocol"][0]["static-routes"]["static"]
for route in vr.static_routes:
prefix = route.destination
next_hops = []
nh_config = {
"index": route.name,
"config": {
"index": route.name,
"next-hop": route.next_vr if route.next_vr else route.next_hop_ip,
"metric": route.metric if getattr(route, "metric", None) is not None else 0
}
}
if route.next_vr:
nh_config["config"]["oc-loc-rt-netinst:next-network-instance"] = route.next_vr
nh_config["config"]["oc-loc-rt-netinst:nh-network-instance"] = None
elif route.next_hop_ip:
nh_config["config"]["oc-loc-rt-netinst:next-network-instance"] = None
nh_config["config"]["oc-loc-rt-netinst:nh-network-instance"] = vr.name
if route.interface:
nh_config["interface-ref"] = {
"config": {
"interface": route.interface,
}
}
next_hops.append(nh_config)
static_entry = {
"prefix": prefix,
"next-hops": {
"next-hop": next_hops
}
}
static_list.append(static_entry)
openconfig["openconfig-network-instance:network-instances"]["network-instance"].append(vr_entry)
# Conversion des règles de sécurité en ACL
for rule in self.config['security_rules']:
acl_entry = {
"name": rule.name,
"type": "openconfig-acl:ACL_IPV4",
"config": {
"name": rule.name,
"type": "openconfig-acl:ACL_IPV4",
"description": getattr(rule, 'description', "")
},
"state": {
"description": getattr(rule, 'enabled', "")
},
"acl-entries": {
"acl-entry": []
}
}
if getattr(rule, "action", "").lower() == "nat":
action_base = "openconfig-acl:NAT"
else:
action_base = (
"openconfig-acl:ACCEPT" if getattr(rule, "action", "").lower() == "allow"
else "openconfig-acl:DROP"
)
if hasattr(rule, "enabled") and not rule.enabled:
action_value = f"{action_base}_DISABLED"
else:
action_value = action_base
miscs_text = (
f"Misc: {', '.join(rule.miscs)}"
if hasattr(rule, "miscs") and rule.miscs
else ""
)
ports = []
for s in getattr(rule, "services", []):
if s.lower().startswith("port"):
ports.append(s[4:])
else:
ports.append(s)
desc_parts = []
if getattr(rule, "action", "").lower() == "nat":
if getattr(rule, "to_interface", ""):
desc_parts.append(rule.to_interface)
if getattr(rule, "translated_address", ""):
desc_parts.append(rule.translated_address)
if getattr(rule, "translated_port", ""):
desc_parts.append(rule.translated_port)
entry = {
"sequence-id": getattr(rule, "sequence_id", 1),
"config": {
"sequence-id": getattr(rule, "sequence_id", 1),
"description": desc_parts if desc_parts else getattr(rule, "applications", "")
},
"state": {
"sequence-id": getattr(rule, "sequence_id", 1),
"description": miscs_text
},
"ipv4": {
"config": {
"source-address": ",".join(rule.source_addresses) if hasattr(rule, "source_addresses") else "any",
"destination-address": ",".join(rule.destination_addresses) if hasattr(rule, "destination_addresses") else "any",
# "source-address-prefix-set": ",".join(rule.from_zones) if hasattr(rule, "from_zones") else "",
# "destination-address-prefix-set": ",".join(rule.to_zones) if hasattr(rule, "to_zones") else ""
}
},
"transport": {
"config": {
"source-port": "any",
"destination-port": ",".join(ports) if ports else "any",
}
},
"actions": {
"config": {
"forwarding-action": action_value,
"log-action": getattr(rule, "log_setting", "default")
}
}
}
acl_entry["acl-entries"]["acl-entry"].append(entry)
openconfig["openconfig-acl:acl"]["acl-sets"]["acl-set"].append(acl_entry)
# Conversion des objets firewall personnalisés
# Address objects
for addr_obj in self.config['address_objects']:
address_config = {
"name": addr_obj.name,
"config": {
"name": addr_obj.name,
"ip_netmask": addr_obj.ip_netmask,
"description": addr_obj.description,
"misc": addr_obj.misc
}
}
openconfig["custom-firewall-objects:firewall-objects"]["address"].append(address_config)
# Address groups
for addr_group in self.config['address_groups']:
address_group_config = {
"name": addr_group.name,
"config": {
"name": addr_group.name,
"members": addr_group.members,
"description": addr_group.description,
"misc": addr_group.misc
}
}
openconfig["custom-firewall-objects:firewall-objects"]["address-group"].append(address_group_config)
# Service objects
for svc_obj in self.config['service_objects']:
service_config = {
"name": svc_obj.name,
"config": {
"name": svc_obj.name,
"protocol": svc_obj.protocol,
"port": svc_obj.port,
"source_port": svc_obj.source_port,
"description": svc_obj.description,
"misc": svc_obj.misc
}
}
openconfig["custom-firewall-objects:firewall-objects"]["service"].append(service_config)
# Service groups
for svc_group in self.config['service_groups']:
service_group_config = {
"name": svc_group.name,
"config": {
"name": svc_group.name,
"members": svc_group.members,
"description": svc_group.description,
"misc": svc_group.misc
}
}
openconfig["custom-firewall-objects:firewall-objects"]["service-group"].append(service_group_config)
return openconfig
def print_summary(self):
"""Affiche un résumé de la configuration parsée"""
print("\n" + "="*50)
print("RÉSUMÉ DE LA CONFIGURATION")
print("="*50)
print(f"Objets adresse: {len(self.config['address_objects'])}")
print(f"Groupes d'adresses: {len(self.config['address_groups'])}")
print(f"Objets service: {len(self.config['service_objects'])}")
print(f"Groupes de services: {len(self.config['service_groups'])}")
print(f"Interfaces: {len(self.config['interfaces'])}")
print(f"Règles de sécurité: {len(self.config['security_rules'])}")

View File

@@ -0,0 +1,215 @@
#!/usr/bin/env python3
"""
Parser pour configuration Forcepoint XML vers un format json normalisé OpenConfig YANG
Extrait les groupes d'objets, objets, groupes de services, services, routes, interfaces et règles
"""
import xml.etree.ElementTree as ET
import json
from scripts.export_modele import ParserMixin
from scripts.objets.data import AddressObject, AddressGroup, ServiceObject, ServiceGroup, Interface, SecurityRule, StaticRoute, VirtualRouter
from pathlib import Path
class ForcepointParser(ParserMixin):
"""Parser pour fichier XML Forcepoint"""
def __init__(self, xml_file_path: str):
"""Initialise le parser avec le chemin du fichier XML"""
self.xml_file = xml_file_path
self.tree = None
self.root = None
self.config = {
'address_objects': [],
'address_groups': [],
'service_objects': [],
'service_groups': [],
'interfaces': [],
'virtual_routers': [],
'static_routes': [],
'security_rules': []
}
def load_xml(self):
"""Charge le fichier XML"""
try:
self.tree = ET.parse(self.xml_file)
self.root = self.tree.getroot()
print(f"✓ Fichier XML chargé: {self.xml_file}")
except ET.ParseError as e:
print(f"✗ Erreur de parsing XML: {e}")
raise
except FileNotFoundError:
print(f"✗ Fichier non trouvé: {self.xml_file}")
raise
def _parse_address_objects(self):
"""Parse hosts, networks, address ranges et ip_lists"""
for obj in self.root.findall(".//host"):
name = obj.get("name")
ip = obj.find("mvia_address")
ip_addr = ip.get("address") if ip is not None else None
desc = obj.get("comment") or None
self.config['address_objects'].append(AddressObject(name=name, ip_netmask=ip_addr, description=desc))
for obj in self.root.findall(".//network"):
name = obj.get("name")
ip_netmask = obj.get("ipv4_network")
comment = obj.get("comment")
self.config['address_objects'].append(AddressObject(name=name, ip_netmask=ip_netmask, description=comment))
for obj in self.root.findall(".//address_range"):
name = obj.get("name")
ip_range = obj.get("ip_range")
comment = obj.get("comment")
self.config['address_objects'].append(AddressObject(name=name, ip_netmask=ip_range, description=comment))
for obj in self.root.findall(".//ip_list"):
name = obj.get("name")
ips = [ip.get("value") for ip in obj.findall(".//ip")]
for ip in ips:
self.config['address_objects'].append(AddressObject(name=f"{name}_{ip}", ip_netmask=ip))
def _parse_address_groups(self):
"""Parse les groupes d'adresses"""
for group in self.root.findall(".//group"):
name = group.get("name")
desc = group.get("comment") or None
members = [m.get("ref") for m in group.findall(".//ne_list")]
self.config['address_groups'].append(AddressGroup(name=name, members=members, description=desc))
def _parse_service_objects(self):
"""Parse services TCP/UDP"""
for svc in self.root.findall(".//service_tcp"):
name = svc.get("name")
port = svc.get("min_dst_port")
proto = "tcp"
self.config['service_objects'].append(ServiceObject(name=name, protocol=proto, port=port))
for svc in self.root.findall(".//service_udp"):
name = svc.get("name")
port = svc.get("min_dst_port")
proto = "udp"
self.config['service_objects'].append(ServiceObject(name=name, protocol=proto, port=port))
def _parse_interfaces(self):
"""Parse interfaces de cluster"""
for iface in self.root.findall(".//cluster_virtual_interface"):
name = iface.get("name")
iptmp = iface.find("mvia_address")
ip_addr = iptmp.get("address") if iptmp is not None else None
net = iface.get("network_value")
netmask = net.split("/")[-1] if net else None
comment = iface.get("comment")
self.config['interfaces'].append(Interface(name=name, ip=ip_addr, netmask=netmask, comment=comment))
def _parse_virtual_routers(self):
"""Parse routeurs simples"""
for router in self.root.findall(".//router"):
name = router.get("name")
ip = router.find("mvia_address")
ip_addr = ip.get("address") if ip is not None else None
self.config['virtual_routers'].append(VirtualRouter(name=name, interfaces=[ip_addr] if ip_addr else [], static_routes=[]))
def _parse_security_rules(self):
"""Parse règles FW Forcepoint"""
for rule_entry in self.root.findall(".//fw_policy//rule_entry"):
rule_type = rule_entry.find("./access_rule") or rule_entry.find("./nat_rule")
if rule_type is None:
continue
name = rule_entry.get("tag")
action = rule_type.find("./action")
action_type = action.get("type") if action is not None else "undefined"
sources = [s.get("value") for s in rule_type.findall(".//match_sources//match_source_ref")]
destinations = [d.get("value") for d in rule_type.findall(".//match_destinations//match_destination_ref")]
services = [s.get("value") for s in rule_type.findall(".//match_services//match_service_ref")]
enabled_attr = rule_entry.get("is_disabled")
enabled = False if enabled_attr == "true" else True
desc = rule_entry.get("comment") or None
self.config['security_rules'].append(
SecurityRule(
name=f"Rule_{name}",
source_addresses=sources,
destination_addresses=destinations,
services=services,
action=action_type,
from_zones=[],
to_zones=[],
applications=[],
description=desc,
enabled=enabled
)
)
def parse_all(self):
"""Parse tous les éléments de configuration"""
print("Début du parsing de la configuration Forcepoint...")
self.load_xml()
self._parse_address_objects()
self._parse_address_groups()
self._parse_service_objects()
self._parse_interfaces()
self._parse_virtual_routers()
self._parse_security_rules()
print("✓ Parsing terminé avec succès!")
def export_to_json(self, output_file: str):
"""Exporte la configuration au format JSON OpenConfig"""
openconfig_data = self.to_openconfig_yang("forcepoint")
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(openconfig_data, f, indent=2, ensure_ascii=False)
print(f"✓ Configuration exportée vers: {output_file}")
def generate_json_forcepoint(input_file: str, output_file: str):
"""Génère le fichier JSON OpenConfig à partir du fichier XML Forcepoint"""
xml_file = Path(input_file)
if not xml_file.exists():
print(f"✗ Erreur: Le fichier '{xml_file}' n'existe pas")
return
try:
parser = ForcepointParser(str(xml_file))
parser.parse_all()
parser.print_summary()
output_file = output_file
parser.export_to_json(output_file)
print(f"\n✓ Conversion terminée! Vérifiez le fichier: {output_file}")
except Exception as e:
print(f"✗ Erreur: {e}")
return
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("Usage: python json_Forcepoint.py <fichier_config.xml>")
print("Exemple: python json_Forcepoint.py config_Forcepoint.xml")
sys.exit(1)
xml_file = sys.argv[1]
if not Path(xml_file).exists():
print(f"✗ Erreur: Le fichier '{xml_file}' n'existe pas")
sys.exit(1)
try:
parser = ForcepointParser(xml_file)
parser.parse_all()
parser.print_summary()
input_path = Path(xml_file)
output_file = 'output_Forcepoint.json'
parser.export_to_json(output_file)
print(f"\n✓ Conversion terminée! Vérifiez le fichier: {output_file}")
except Exception as e:
print(f"✗ Erreur: {e}")
sys.exit(1)

View File

@@ -0,0 +1,508 @@
#!/usr/bin/env python3
"""
Parser pour configuration Palo Alto XML vers un format json normalisé OpenConfig YANG
Extrait les groupes d'objets, objets, groupes de services, services, routes, interfaces et règles
"""
import xml.etree.ElementTree as ET
import json
from scripts.export_modele import ParserMixin
from scripts.objets.data import AddressObject, AddressGroup, ServiceObject, ServiceGroup, Interface, SecurityRule, StaticRoute, VirtualRouter
from pathlib import Path
class PaloAltoParser(ParserMixin):
"""Parser pour fichier XML Palo Alto"""
def __init__(self, xml_file_path: str):
"""Initialise le parser avec le chemin du fichier XML"""
self.xml_file = xml_file_path
self.tree = None
self.root = None
self.config = {
'address_objects': [],
'address_groups': [],
'service_objects': [],
'service_groups': [],
'interfaces': [],
'virtual_routers': [],
'static_routes': [],
'security_rules': []
}
def load_xml(self):
"""Charge le fichier XML"""
try:
self.tree = ET.parse(self.xml_file)
self.root = self.tree.getroot()
print(f"✓ Fichier XML chargé: {self.xml_file}")
except ET.ParseError as e:
print(f"✗ Erreur de parsing XML: {e}")
raise
except FileNotFoundError:
print(f"✗ Fichier non trouvé: {self.xml_file}")
raise
def _parse_address_objects(self):
"""Parse les objets adresse"""
address_objects = []
paths = [
".//shared/address/entry",
".//devices/entry/vsys/entry/address/entry"
]
for path in paths:
for addr in self.root.findall(path):
name = addr.get('name')
obj = AddressObject(name=name)
# IP/Netmask
ip_netmask = addr.find('ip-netmask')
if ip_netmask is not None:
obj.ip_netmask = ip_netmask.text
# Description
desc = addr.find('description')
if desc is not None:
obj.description = desc.text
# Misc
misc_elem = addr.find('tag')
if misc_elem is not None:
for member in misc_elem.findall('member'):
obj.misc.append(member.text)
address_objects.append(obj)
self.config['address_objects'] = address_objects
print(f"✓ Trouvé {len(address_objects)} objets adresse")
def _parse_address_groups(self):
"""Parse les groupes d'adresses"""
address_groups = []
paths = [
".//shared/address-group/entry",
".//devices/entry/vsys/entry/address-group/entry"
]
for path in paths:
for group in self.root.findall(path):
name = group.get('name')
members = []
# Membres statiques
static = group.find('static')
if static is not None:
for member in static.findall('member'):
members.append(member.text)
obj = AddressGroup(name=name, members=members)
# Description
desc = group.find('description')
if desc is not None:
obj.description = desc.text
# Misc
misc_elem = group.find('tag')
if misc_elem is not None:
for member in misc_elem.findall('member'):
obj.misc.append(member.text)
address_groups.append(obj)
self.config['address_groups'] = address_groups
print(f"✓ Trouvé {len(address_groups)} groupes d'adresses")
def _parse_service_objects(self):
"""Parse les objets service"""
service_objects = []
paths = [
".//shared/service/entry",
".//devices/entry/vsys/entry/service/entry"
]
for path in paths:
for service in self.root.findall(path):
name = service.get('name')
obj = ServiceObject(name=name)
# Protocol TCP
tcp = service.find('protocol/tcp')
if tcp is not None:
obj.protocol = 'tcp'
port = tcp.find('port')
if port is not None:
obj.port = port.text
source_port = tcp.find('source-port')
if source_port is not None:
obj.source_port = source_port.text
# Protocol UDP
udp = service.find('protocol/udp')
if udp is not None:
obj.protocol = 'udp'
port = udp.find('port')
if port is not None:
obj.port = port.text
source_port = udp.find('source-port')
if source_port is not None:
obj.source_port = source_port.text
# Description
desc = service.find('description')
if desc is not None:
obj.description = desc.text
# Misc
misc_elem = service.find('tag')
if misc_elem is not None:
for member in misc_elem.findall('member'):
obj.misc.append(member.text)
service_objects.append(obj)
self.config['service_objects'] = service_objects
print(f"✓ Trouvé {len(service_objects)} objets service")
def _parse_service_groups(self):
"""Parse les groupes de services"""
service_groups = []
paths = [
".//shared/service-group/entry",
".//devices/entry/vsys/entry/service-group/entry"
]
for path in paths:
for group in self.root.findall(path):
name = group.get('name')
members = []
# Membres
for member in group.findall('members/member'):
members.append(member.text)
obj = ServiceGroup(name=name, members=members)
# Description
desc = group.find('description')
if desc is not None:
obj.description = desc.text
# Misc
misc_elem = group.find('tag')
if misc_elem is not None:
for member in misc_elem.findall('member'):
obj.misc.append(member.text)
service_groups.append(obj)
self.config['service_groups'] = service_groups
print(f"✓ Trouvé {len(service_groups)} groupes de services")
def _parse_interfaces(self):
"""Parse les interfaces réseau"""
interfaces = []
# Interfaces Ethernet
for interface in self.root.findall(".//devices/entry/network/interface/ethernet/entry"):
name = interface.get('name')
obj = Interface(name=name, interface_type='ethernet')
# Adresse IP
layer3 = interface.find('layer3')
if layer3 is not None:
ip_elem = layer3.find('ip/entry')
if ip_elem is not None:
ip_name = ip_elem.get('name')
if '/' in ip_name:
obj.ip, netmask = ip_name.split('/')
obj.netmask = netmask
else:
obj.ip = ip_name
# VLAN misc
misc_elem = interface.find('layer3/tag')
if misc_elem is not None:
obj.misc = int(misc_elem.text)
# Commentaire
comment = interface.find('comment')
if comment is not None:
obj.comment = comment.text
interfaces.append(obj)
# Interfaces Aggregate Ethernet
for interface in self.root.findall(".//devices/entry/network/interface/aggregate-ethernet/entry"):
name = interface.get('name')
obj = Interface(name=name, interface_type='aggregate-ethernet')
# Adresse IP
layer3 = interface.find('layer3')
if layer3 is not None:
ip_elem = layer3.find('ip/entry')
if ip_elem is not None:
ip_name = ip_elem.get('name')
if '/' in ip_name:
obj.ip, netmask = ip_name.split('/')
obj.netmask = netmask
else:
obj.ip = ip_name
# VLAN misc
misc_elem = interface.find('layer3/tag')
if misc_elem is not None:
obj.misc = int(misc_elem.text)
# Commentaire
comment = interface.find('comment')
if comment is not None:
obj.comment = comment.text
interfaces.append(obj)
self.config['interfaces'] = interfaces
print(f"✓ Trouvé {len(interfaces)} interfaces")
def _parse_virtual_routers(self):
"""Parse les virtual-routers et leurs routes statiques"""
virtual_routers = []
for vr_entry in self.root.findall(".//virtual-router/entry"):
vr_name = vr_entry.get("name")
interfaces = [iface.text for iface in vr_entry.findall(".//interface/member")]
static_routes = []
for route_entry in vr_entry.findall(".//routing-table/ip/static-route/entry"):
name = route_entry.get("name")
destination = route_entry.findtext("destination")
metric = int(route_entry.findtext("metric", "0"))
next_vr = route_entry.findtext("nexthop/next-vr")
next_hop_ip = route_entry.findtext("nexthop/ip-address")
interface = route_entry.findtext("interface")
bfd_profile = route_entry.findtext("bfd/profile")
static_route = StaticRoute(
name=name,
destination=destination,
metric=metric,
next_vr=next_vr,
next_hop_ip=next_hop_ip,
interface=interface,
bfd_profile=bfd_profile
)
static_routes.append(static_route)
virtual_routers.append(
VirtualRouter(
name=vr_name,
interfaces=interfaces,
static_routes=static_routes
)
)
self.config['virtual_routers'] = virtual_routers
print(f"✓ Trouvé {len(virtual_routers)} virtual-routers")
def _parse_security_rules(self):
"""Parse les règles de sécurité"""
security_rules = []
rule_types = [
("security", ".//devices/entry/vsys/entry/rulebase/security/rules/entry"),
("nat", ".//devices/entry/vsys/entry/rulebase/nat/rules/entry"),
("default", ".//devices/entry/vsys/entry/rulebase/default-security-rules/rules/entry")
]
for rule_type, xpath in rule_types:
for rule in self.root.findall(xpath):
name = rule.get('name')
# Zones source
from_zones = []
from_elem = rule.find('from')
if from_elem is not None:
for member in from_elem.findall('member'):
from_zones.append(member.text)
# Zones destination
to_zones = []
to_elem = rule.find('to')
if to_elem is not None:
for member in to_elem.findall('member'):
to_zones.append(member.text)
# Adresses source
source_addresses = []
source_elem = rule.find('source')
if source_elem is not None:
for member in source_elem.findall('member'):
source_addresses.append(member.text)
# Adresses destination
destination_addresses = []
destination_elem = rule.find('destination')
if destination_elem is not None:
for member in destination_elem.findall('member'):
destination_addresses.append(member.text)
# Applications
applications = []
app_elem = rule.find('application')
if app_elem is not None:
for member in app_elem.findall('member'):
applications.append(member.text)
# Services
services = []
service_elem = rule.find('service')
if rule_type != "nat":
if service_elem is not None:
for member in service_elem.findall('member'):
services.append(member.text)
else:
services.append(service_elem.text)
# Statut (enabled/disabled)
enabled = True
rule.find('disabled')
if rule.find('disabled') is not None and rule.find('disabled').text == 'yes':
enabled = False
# Misc
miscs = [m.text for m in rule.findall('tag/member')]
# Log setting
log_elem = rule.find('log-setting')
log_setting = log_elem.text if log_elem is not None else "default"
# Action
if rule_type == "nat":
action = 'nat'
else:
action = 'deny'
action_elem = rule.find('action')
if action_elem is not None:
action = action_elem.text
obj = SecurityRule(
name=name,
from_zones=from_zones,
to_zones=to_zones,
source_addresses=source_addresses,
destination_addresses=destination_addresses,
applications=applications,
services=services,
miscs=miscs,
action=action,
log_setting=log_setting,
enabled=enabled
)
# Description
desc = rule.find('description')
if desc is not None:
obj.description = desc.text
# Statut (enabled/disabled)
disabled = rule.find('disabled')
if disabled is not None and disabled.text == 'yes':
obj.enabled = False
if rule_type == "nat":
dt = rule.find('destination-translation')
translated_address = dt.findtext('translated-address', "") if dt is not None else ""
translated_port = dt.findtext('translated-port', "any") if dt is not None else "any"
to_interface = rule.find('to-interface')
else:
translated_address = None
translated_port = None
if rule_type == "nat":
obj.translated_address = translated_address
obj.translated_port = translated_port
obj.to_interface = to_interface.text if to_interface is not None else ""
security_rules.append(obj)
self.config['security_rules'] = security_rules
print(f"✓ Trouvé {len(security_rules)} règles de sécurité")
def parse_all(self):
"""Parse tous les éléments de configuration"""
print("Début du parsing de la configuration Palo Alto...")
self.load_xml()
self._parse_address_objects()
self._parse_address_groups()
self._parse_service_objects()
self._parse_service_groups()
self._parse_interfaces()
self._parse_virtual_routers()
self._parse_security_rules()
print("✓ Parsing terminé avec succès!")
def export_to_json(self, output_file: str):
"""Exporte la configuration au format JSON OpenConfig"""
openconfig_data = self.to_openconfig_yang("palo-alto")
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(openconfig_data, f, indent=2, ensure_ascii=False)
print(f"✓ Configuration exportée vers: {output_file}")
def generate_json_paloalto(input_file: str, output_file: str):
"""Génère le fichier JSON OpenConfig à partir du fichier XML Palo Alto"""
xml_file = Path(input_file)
if not xml_file.exists():
print(f"✗ Erreur: Le fichier '{xml_file}' n'existe pas")
return
try:
parser = PaloAltoParser(str(xml_file))
parser.parse_all()
parser.print_summary()
output_file = output_file
parser.export_to_json(output_file)
print(f"\n✓ Conversion terminée! Vérifiez le fichier: {output_file}")
except Exception as e:
print(f"✗ Erreur: {e}")
return
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("Usage: python json_PaloAlto.py <fichier_config.xml>")
print("Exemple: python json_PaloAlto.py config_palo_alto.xml")
sys.exit(1)
xml_file = sys.argv[1]
if not Path(xml_file).exists():
print(f"✗ Erreur: Le fichier '{xml_file}' n'existe pas")
sys.exit(1)
try:
parser = PaloAltoParser(xml_file)
parser.parse_all()
parser.print_summary()
input_path = Path(xml_file)
output_file = 'output_PaloAlto.json'
parser.export_to_json(output_file)
print(f"\n✓ Conversion terminée! Vérifiez le fichier: {output_file}")
except Exception as e:
print(f"✗ Erreur: {e}")
sys.exit(1)

View File

@@ -0,0 +1,405 @@
#!/usr/bin/env python3
"""
Parser pour configuration Stormshield vers un format json normalisé OpenConfig YANG
Extrait les groupes d'objets, objets, groupes de services, services, routes, interfaces et règles
"""
from dataclasses import dataclass
import os
import re
import json
from scripts.export_modele import ParserMixin
from scripts.objets.data import AddressObject, AddressGroup, ServiceObject, ServiceGroup, Interface, SecurityRule, StaticRoute, VirtualRouter
from pathlib import Path
class StormshieldParser(ParserMixin):
"""Parser pour dossier Stormshield"""
def __init__(self, base_dir: str):
"""Initialise le parser avec le chemin du dossier Stormshield"""
self.base_dir = base_dir
self.config = {
"address_objects": [],
"address_groups": [],
"service_objects": [],
"service_groups": [],
"interfaces": [],
"virtual_routers": [],
'static_routes': [],
"security_rules": [],
"default_route": None # Ajout Stormshield
}
def _parse_address_objects(self):
"""Parse le fichier object contenant [Host], [Network], [Service], etc."""
current_section = None
path = os.path.join(self.base_dir, "object")
with open(path, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if line.startswith("[") and line.endswith("]"):
current_section = line.strip("[]")
continue
if current_section in ("Host", "Network", "Fqdn"):
match = re.match(r"([^=]+)=([^,#]+)", line)
if match:
name = match.group(1).strip()
ip = match.group(2).strip()
self.config["address_objects"].append(
AddressObject(name=name, ip_netmask=ip)
)
elif current_section == "Service":
match = re.match(r"([^=]+)=([\d\-]+)/(\w+)", line)
if match:
name = match.group(1).strip()
port = match.group(2)
proto = match.group(3).lower()
self.config["service_objects"].append(
ServiceObject(name=name, protocol=proto, port=port)
)
def _parse_address_groups(self):
"""Parse le fichier objectgroup pour groupes dadresses"""
current_group = None
members = []
path = os.path.join(self.base_dir, "objectgroup")
with open(path, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if line.startswith("[") and "]" in line:
if current_group:
self.config["address_groups"].append(
AddressGroup(name=current_group, members=members)
)
current_group = line.split("]")[0][1:].strip()
members = []
continue
if current_group:
member = line.split("#")[0].strip()
if member:
members.append(member)
if current_group:
self.config["address_groups"].append(
AddressGroup(name=current_group, members=members)
)
def _parse_service_groups(self):
"""Parse le fichier servicegroup pour groupes de services"""
current_group = None
members = []
path = os.path.join(self.base_dir, "servicegroup")
with open(path, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if line.startswith("[") and "]" in line:
if current_group:
self.config["service_groups"].append(
ServiceGroup(name=current_group, members=members)
)
current_group = line.split("]")[0][1:].strip()
members = []
continue
if current_group:
member = line.split("#")[0].strip()
if member:
members.append(member)
if current_group:
self.config["service_groups"].append(
ServiceGroup(name=current_group, members=members)
)
def _parse_interfaces(self):
"""Analyse du fichier network et extraction des interfaces"""
current_section = None
current_data = {}
path = os.path.join(self.base_dir, "network")
with open(path, "r", encoding="utf-8", errors="ignore") as f:
for raw_line in f:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
if line.startswith("[") and line.endswith("]"):
if current_section and current_section.lower() != "config":
self._add_interface(current_section, current_data)
current_section = line.strip("[]")
current_data = {}
continue
if "=" in line:
key, value = line.split("=", 1)
key = key.strip()
value = value.split("#")[0].strip()
current_data[key] = value
if current_section and current_section.lower() != "config":
self._add_interface(current_section, current_data)
def _add_interface(self, section_name, data):
"""Crée un objet Interface à partir dune section complète"""
if section_name.lower().startswith("vlan"):
iface_type = "vlan"
elif section_name.lower().startswith("bridge"):
iface_type = "bridge"
elif section_name.lower().startswith("wifi"):
iface_type = "wifi"
elif section_name.lower().startswith("loopback"):
iface_type = "loopback"
elif section_name.lower().startswith("agg"):
iface_type = "aggregate"
else:
iface_type = "ethernet"
enabled = data.get("State", "0") == "1"
name = data.get("Name", section_name)
ip = data.get("Address")
netmask = data.get("Mask")
misc = data.get("Misc")
comment = data.get("Comment", "")
if not name or name.lower() == "dynamic":
return
interface = Interface(
name=name,
ip=ip if ip and ip.upper() != "DHCP" else None,
netmask=netmask,
misc=int(misc) if misc and misc.isdigit() else None,
comment=comment,
interface_type=iface_type,
enabled=enabled,
)
self.config["interfaces"].append(interface)
def _parse_virtual_routers(self):
route_path = os.path.join(self.base_dir, "route")
if not os.path.exists(route_path):
return
section = None
with open(route_path, encoding="utf-8", errors="ignore") as f:
for raw_line in f:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
if line.startswith("[") and line.endswith("]"):
section = line.strip("[]")
continue
if section == "Config" and line.startswith("DefaultRoute="):
self.config["default_route"] = line.split("=", 1)[1].strip()
elif section == "StaticRoutes" and not line.startswith("#"):
parts = line.split(",")
if len(parts) >= 2:
self.config["static_routes"].append({
"destination": parts[0],
"interface": parts[1],
"extra": parts[2:] if len(parts) > 2 else []
})
static_routes = []
if self.config["default_route"]:
static_routes.append(
StaticRoute(
name="default-route",
destination="0.0.0.0/0",
metric=1,
next_hop_ip=self.config["default_route"],
interface=None
)
)
for idx, route in enumerate(self.config["static_routes"]):
static_routes.append(
StaticRoute(
name=f"static-{idx+1}",
destination=route["destination"],
metric=1,
next_hop_ip=None,
interface=route["interface"]
)
)
vr = VirtualRouter(
name="default-vr",
interfaces=[r["interface"] for r in self.config["static_routes"]],
static_routes=static_routes
)
self.config["virtual_routers"] = [vr]
# def _parse_slotinfo_file(self):
# path = os.path.join(self.base_dir, "Filter", "slotinfo")
# if not os.path.exists(path):
# return
# current = None
# with open(path, encoding="utf-8", errors="ignore") as f:
# for raw_line in f:
# line = raw_line.strip()
# if not line or line.startswith("#"):
# continue
# if line.startswith("[") and line.endswith("]"):
# current = line.strip("[]")
# continue
# if current and line.startswith("Name="):
# name = line.split("=", 1)[1].strip('"')
# if name:
# self.filter_slots[current] = name
def _parse_security_rules(self):
filter_dir = os.path.join(self.base_dir, "Filter")
if not os.path.exists(filter_dir):
return
rule_regex = re.compile(
r'^(?P<disabled>off\s+)?'
r'(?P<action>pass|block|nat)\b'
r'(?:\s+noconnlog)?'
r'(?:\s+all)?'
r'(?:\s+inspection)?'
r'(?:\s+firewall)?'
r'(?:\s+route\s+[^\s]+)?'
r'(?:\s+ipproto\s+[^\s]+)?'
r'(?:\s+type\s+\d+)?'
r'(?:\s+code\s+\d+)?'
r'(?:\s+proto\s+[^\s]+)?'
r'\s+from\s+(?P<src>[^\s]+)'
r'(?:\s+on\s+(?P<zone>[^\s]+))?'
r'\s+to\s+(?P<dst>[^\s]+)'
r'(?:\s+port\s+(?P<ports>[^\s]+))?'
r'(?:\s+rulename\s+"(?P<rulename>[^"]+)")?',
re.IGNORECASE
)
slotinfo_path = os.path.join(filter_dir, "slotinfo")
active_slot = "Global"
if os.path.exists(slotinfo_path):
with open(slotinfo_path, encoding="utf-8", errors="ignore") as f:
for raw_line in f:
line = raw_line.strip()
if line.startswith("Active="):
active_slot = line.split("=", 1)[1].strip()
break
for filename in sorted(os.listdir(filter_dir)):
if not re.fullmatch(r"\d{2}", filename):
continue
if filename.lstrip("0") != active_slot:
enabled = False
else:
enabled = True
path = os.path.join(filter_dir, filename)
with open(path, encoding="utf-8", errors="ignore") as f:
for raw_line in f:
line = raw_line.strip()
if not line or line.startswith("separator") or line.startswith("["):
continue
match = rule_regex.match(line)
if not match:
continue
data = match.groupdict()
# ----- Détermination de l'action finale -----
action_raw = data["action"].lower()
if action_raw == "nat":
final_action = "nat"
elif action_raw == "pass":
final_action = "allow"
elif action_raw == "block":
final_action = "block"
else:
final_action = "unknown"
rule = SecurityRule(
name=data.get("rulename")
or f"rule_{filename}_{len(self.config['security_rules'])}",
from_zones=[data.get("zone")] if data.get("zone") else [],
to_zones=[],
source_addresses=[data.get("src") or "any"],
destination_addresses=[data.get("dst") or "any"],
services=data["ports"].split("|") if data.get("ports") else [],
applications=["any"],
action=final_action,
description=self._extract_comment(raw_line),
enabled=enabled
)
self.config["security_rules"].append(rule)
def _extract_comment(self, line: str) -> str:
"""Extrait un commentaire après #"""
if "#" in line:
return line.split("#", 1)[1].strip()
return ""
def parse_all(self):
print("🔍 Parsing de la configuration Stormshield...")
self._parse_virtual_routers()
self._parse_address_objects()
self._parse_address_groups()
self._parse_service_groups()
self._parse_interfaces()
self._parse_security_rules()
print("✓ Parsing terminé avec succès!")
def export_to_json(self, output_file: str):
"""Exporte la configuration au format JSON OpenConfig"""
openconfig_data = self.to_openconfig_yang("stormshield")
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(openconfig_data, f, indent=2, ensure_ascii=False)
print(f"✓ Configuration exportée vers: {output_file}")
def generate_json_stormshield(input_dir: str, output_file: str):
input_dir = input_dir + "/usr/Firewall/ConfigFiles/"
parser = StormshieldParser(input_dir)
parser.parse_all()
parser.print_summary()
output_file = output_file
parser.export_to_json(output_file)
print(f"\n✓ Conversion terminée! Vérifiez le fichier: {output_file}")
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
print("Usage: python json_Stormshield.py <dossier>")
sys.exit(1)
input_dir = sys.argv[1]
if not Path(input_dir).exists():
print(f"Erreur : dossier '{input_dir}' inexistant")
sys.exit(1)
output_path = Path("output_Stormshield.json")
generate_json_stormshield(input_dir, str(output_path))

View File

@@ -0,0 +1,95 @@
from dataclasses import dataclass
from typing import List
@dataclass
class AddressObject:
"""Objet adresse"""
name: str
ip_netmask: str = None
description: str = None
misc: List[str] = None
def __post_init__(self):
if self.misc is None:
self.misc = []
@dataclass
class AddressGroup:
"""Groupe d'adresses"""
name: str
members: List[str]
description: str = None
misc: List[str] = None
def __post_init__(self):
if self.misc is None:
self.misc = []
@dataclass
class ServiceObject:
"""Objet service"""
name: str
protocol: str = None
port: str = None
source_port: str = None
description: str = None
misc: List[str] = None
def __post_init__(self):
if self.misc is None:
self.misc = []
@dataclass
class ServiceGroup:
"""Groupe de services"""
name: str
members: List[str]
description: str = None
misc: List[str] = None
def __post_init__(self):
if self.misc is None:
self.misc = []
@dataclass
class Interface:
"""Interface réseau"""
name: str
ip: str = None
netmask: str = None
misc: int = None
comment: str = None
interface_type: str = None
enabled: bool = True
@dataclass
class SecurityRule:
"""Règle de sécurité"""
name: str
from_zones: List[str]
to_zones: List[str]
source_addresses: List[str]
destination_addresses: List[str]
applications: List[str]
services: List[str]
action: str
description: str = None
enabled: bool = True
log_setting: str = "default"
miscs: List[str] = None
@dataclass
class StaticRoute:
name: str
destination: str
metric: int
next_vr: str = None
next_hop_ip: str = None
interface: str = None
bfd_profile: str = None
@dataclass
class VirtualRouter:
name: str
interfaces: List[str]
static_routes: List[StaticRoute]

View File

@@ -0,0 +1,62 @@
from openpyxl.styles import PatternFill, Border, Side, Alignment, Font
from openpyxl.utils import get_column_letter
center = Alignment(horizontal="center", vertical="center")
left = Alignment(horizontal="left", vertical="center")
red_fill = PatternFill(start_color="FF9999", fill_type="solid")
orange_fill = PatternFill(start_color="FFCC99", fill_type="solid")
yellow_fill = PatternFill(start_color="FFFF99", fill_type="solid")
thick_border = Border(*(Side(style="thick"),)*4)
thin_border = Border(*(Side(style="thin"),)*4)
def style_address_groups(ws, address_groups):
col_group, col_member, col_ip = 1, 2, 3
start_row = 2
max_width = {1: 0, 2: 0, 3: 0}
for row in range(2, ws.max_row + 1):
g = ws.cell(row=row, column=col_group).value
m = ws.cell(row=row, column=col_member).value
ip = ws.cell(row=row, column=col_ip).value
ws.cell(row=row, column=col_group).fill = red_fill
ws.cell(row=row, column=col_group).font = Font(bold=True)
ws.cell(row=row, column=col_group).border = thin_border
member_fill = orange_fill if m in address_groups else yellow_fill
ws.cell(row=row, column=col_member).fill = member_fill
ws.cell(row=row, column=col_ip).fill = member_fill
ws.cell(row=row, column=col_member).alignment = left
ws.cell(row=row, column=col_ip).alignment = left
ws.cell(row=row, column=col_member).border = thin_border
ws.cell(row=row, column=col_ip).border = thin_border
max_width[1] = max(max_width[1], len(str(g)))
max_width[2] = max(max_width[2], len(str(m)))
max_width[3] = max(max_width[3], len(str(ip)))
next_val = ws.cell(row=row+1, column=col_group).value if row < ws.max_row else None
if g != next_val:
ws.merge_cells(start_row=start_row, start_column=col_group,
end_row=row, end_column=col_group)
for r in range(start_row, row + 1):
ws.cell(r, col_group).border = thick_border
ws.cell(r, col_member).border = thick_border
ws.cell(r, col_ip).border = thick_border
ws.cell(r, col_group).alignment = center
start_row = row + 1
for col in max_width:
ws.column_dimensions[get_column_letter(col)].width = max_width[col] + 2
for row in range(2, ws.max_row + 1):
ws.row_dimensions[row].height = 18

View File

@@ -0,0 +1,65 @@
from openpyxl.styles import Alignment, PatternFill, Font, Border, Side
from openpyxl.utils import get_column_letter
center = Alignment(horizontal="center", vertical="center")
left = Alignment(horizontal="left", vertical="center")
wrap_left = Alignment(horizontal="left", vertical="center", wrap_text=True)
header_fill = PatternFill(start_color="B4C6E7", fill_type="solid")
section_fill = PatternFill(start_color="D9E1F2", fill_type="solid")
thin_border = Border(*(Side(style="thin"),)*4)
def format_cell_multiline(value):
"""Convert comma-separated or list-style text into multiline text"""
if value is None:
return ""
val = str(value).strip()
if val.startswith("[") and val.endswith("]"):
val = val[1:-1]
val = val.replace("'", "").replace('"', "")
return val.replace(", ", "\n").replace(";", "\n").replace(",", "\n").replace(";", "\n")
def style_matrice_flux(ws):
headers = [
("Description",2),("Source",4),("Destination",4),
("Application",1),("Port",1),("Miscellaneous",1),("Action",1),("Tier",1),("Active",1)
]
col = 1
for label, span in headers:
ws.merge_cells(start_row=1, start_column=col, end_row=1, end_column=col + span - 1)
c = ws.cell(row=1, column=col, value=label)
c.alignment = center
c.fill = section_fill
c.font = Font(bold=True)
col += span
wrap_cols = ["Description", "Src Equipement", "Dst Equipement",
"Application", "Port", "Src CIDR", "Dst CIDR", "Src Site", "Dst Site", "Miscellaneous", "Action", "Tier", "Active"]
wrap_indexes = []
for col_idx, cell in enumerate(ws[2], start=1):
if cell.value in wrap_cols:
wrap_indexes.append(col_idx)
for row in ws.iter_rows(min_row=2, max_row=ws.max_row):
for cell in row:
if cell.row > 2:
if cell.col_idx in wrap_indexes:
cell.value = format_cell_multiline(cell.value)
cell.alignment = wrap_left
else:
cell.alignment = left
else:
cell.font = Font(bold=True)
cell.alignment = center
cell.fill = header_fill
cell.border = thin_border
col_widths = {
"A": 50, "B": 35, "C": 35, "D": 18, "E": 15, "F": 15,
"G": 35, "H": 18, "I": 15, "J": 15, "K": 12, "L": 20,
"M": 25, "N": 9, "O": 7, "P": 7
}
for col_letter, width in col_widths.items():
ws.column_dimensions[col_letter].width = width

View File

@@ -0,0 +1,59 @@
from openpyxl.styles import PatternFill, Border, Side, Alignment, Font
from openpyxl.utils import get_column_letter
center = Alignment(horizontal="center", vertical="center")
left = Alignment(horizontal="left", vertical="center")
red_fill = PatternFill(start_color="FF9999", fill_type="solid")
orange_fill = PatternFill(start_color="FFCC99", fill_type="solid")
yellow_fill = PatternFill(start_color="FFFF99", fill_type="solid")
thick_border = Border(*(Side(style="thick"),)*4)
thin_border = Border(*(Side(style="thin"),)*4)
def style_service_groups(ws, service_groups):
col_group, col_member, col_proto, col_port = 1, 2, 3, 4
start_row = 2
max_width = {1: 0, 2: 0, 3: 0, 4: 0}
for row in range(2, ws.max_row + 1):
g = ws.cell(row=row, column=col_group).value
m = ws.cell(row=row, column=col_member).value
proto = ws.cell(row=row, column=col_proto).value
port = ws.cell(row=row, column=col_port).value
ws.cell(row=row, column=col_group).fill = red_fill
ws.cell(row=row, column=col_group).font = Font(bold=True)
ws.cell(row=row, column=col_member).fill = (
orange_fill if m in service_groups else yellow_fill
)
ws.cell(row=row, column=col_proto).fill = yellow_fill
ws.cell(row=row, column=col_port).fill = yellow_fill
for col in range(1, 5):
ws.cell(row=row, column=col).border = thin_border
max_width[1] = max(max_width[1], len(str(g or "")))
max_width[2] = max(max_width[2], len(str(m or "")))
max_width[3] = max(max_width[3], len(str(proto or "")))
max_width[4] = max(max_width[4], len(str(port or "")))
next_val = ws.cell(row=row + 1, column=col_group).value if row < ws.max_row else None
if g != next_val:
if row > start_row:
ws.merge_cells(f"A{start_row}:A{row}")
for r in range(start_row, row + 1):
for c in range(1, 5):
ws.cell(r, c).border = thick_border
ws.cell(r, c).alignment = center if c in [1, 3, 4] else left
start_row = row + 1
for col, width in max_width.items():
ws.column_dimensions[get_column_letter(col)].width = width + 4
for row in range(2, ws.max_row + 1):
ws.row_dimensions[row].height = 18