Compare commits
3 Commits
8ec4f25db3
...
8fe4b94ab8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8fe4b94ab8 | ||
|
|
247141be14 | ||
|
|
3e2820e54c |
18
README.md
18
README.md
@@ -5,6 +5,7 @@
|
||||
Cet outil permet de **parser les configurations de différents types de firewalls** (Palo Alto, Stormshield, Forcepoint) et de **convertir ces informations en un format JSON normalisé basé sur des modèles OpenConfig en YANG**.
|
||||
|
||||
Il fournit également la possibilité de générer une **matrice de flux au format Excel** pour visualiser les communications et règles de trafic dans l’infrastructure.
|
||||
Il founit également la possibilité de générer une **matrice de routage au format Excel** pour visualiser les routes statiques dans l’infrastructure.
|
||||
|
||||
## Fonctionnalités principales
|
||||
|
||||
@@ -41,6 +42,10 @@ Il fournit également la possibilité de générer une **matrice de flux au form
|
||||
- les règles de sécurité
|
||||
- les communications entre objets et groupes d’adresses/services.
|
||||
|
||||
6. **Génération de matrices de routage**
|
||||
- Script Python qui utilise le JSON normalisé pour générer automatiquement une matrice Excel détaillant :
|
||||
- les routes statiques
|
||||
|
||||
## Utilisation
|
||||
|
||||
### Pré-requis
|
||||
@@ -48,23 +53,22 @@ Il fournit également la possibilité de générer une **matrice de flux au form
|
||||
python -m venv .venv
|
||||
.\.venv\Scripts\activate
|
||||
|
||||
pip install -r .\src\requierements.txt
|
||||
pip install -r .\src\requirements.txt
|
||||
```
|
||||
- Mettre le/les fichier(s) de configurations dans le dossier `/src/input/`
|
||||
- Modifier le fichier `site.json` de données dans `/src/data/`
|
||||
|
||||
#### Commandes principales
|
||||
```bash
|
||||
python3 .\src\main.py stormshield .\src\input\backup\ -m
|
||||
python3 .\src\main.py paloalto .\src\input\nomfichier -m
|
||||
python3 .\src\main.py forcepoint .\src\input\nomfichier -m
|
||||
python3 .\src\main.py stormshield .\src\input\backup\ -f -r
|
||||
python3 .\src\main.py paloalto .\src\input\nomfichier -f -r
|
||||
python3 .\src\main.py forcepoint .\src\input\nomfichier -f -r
|
||||
```
|
||||
#### Options
|
||||
|
||||
| Option | Description |
|
||||
|--------|-------------|
|
||||
| -o [nom_fichier] | Spécifie le nom du fichier JSON de sortie (optionnel)
|
||||
| -m | Génère un rapport Excel de type matrice de flux (optionnel)
|
||||
| -f | Génère un rapport Excel de type matrice de flux (optionnel)
|
||||
| -r | Génère un rapport Excel de type matrice de routage (optionnel)
|
||||
---
|
||||
|
||||
## Arborescence du projet
|
||||
|
||||
46
src/main.py
46
src/main.py
@@ -4,21 +4,33 @@ import time
|
||||
from scripts.json_PaloAlto import generate_json_paloalto
|
||||
from scripts.json_Stormshield import generate_json_stormshield
|
||||
from scripts.json_Forcepoint import generate_json_forcepoint
|
||||
from scripts.export_excel import export_to_excel
|
||||
from scripts.export_matrice_flux import export_to_excel as export_flux_to_excel
|
||||
from scripts.export_matrice_routage import export_to_excel as export_routing_to_excel
|
||||
|
||||
def verify_if_file_exists(name):
|
||||
base, ext = os.path.splitext(name)
|
||||
counter = 1
|
||||
new_name = name
|
||||
while os.path.exists(new_name):
|
||||
new_name = f"{base}_{counter}{ext}"
|
||||
counter += 1
|
||||
return new_name
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: python3 src/main.py <firewall_type>[paloalto|stormshield|forcepoint] <input_directory/file> [-o <output_file>] [-m]")
|
||||
print("Usage: python3 src/main.py <firewall_type>[paloalto|stormshield|forcepoint] <input_directory/file> [-o <output_file>] [-f] [-r]")
|
||||
print("Options:")
|
||||
print(" <firewall_type> Type of firewall to process (paloalto|stormshield|forcepoint)")
|
||||
print(" -o <output_file> Specify output JSON file name (optional)")
|
||||
print(" -m Generate Excel report (optional)")
|
||||
print(" -f Generate matrix flux report (optional)")
|
||||
print(" -r Generate routing matrix report (optional)")
|
||||
sys.exit(1)
|
||||
|
||||
firewall_type = sys.argv[1].lower()
|
||||
input_data = sys.argv[2]
|
||||
input_path = "src/input/"
|
||||
output_path = "src/output/"
|
||||
os.makedirs(output_path, exist_ok=True)
|
||||
if "-o" in sys.argv:
|
||||
o_index = sys.argv.index("-o")
|
||||
if o_index + 1 < len(sys.argv):
|
||||
@@ -29,7 +41,7 @@ def main():
|
||||
else:
|
||||
timestamp = time.strftime("%Y%m%d")
|
||||
output_file_json = f"{output_path}{firewall_type}_{timestamp}.json"
|
||||
os.makedirs(output_path, exist_ok=True)
|
||||
output_file_json = verify_if_file_exists(output_file_json)
|
||||
|
||||
if firewall_type == "paloalto":
|
||||
generate_json_paloalto(input_data, output_file_json)
|
||||
@@ -41,20 +53,36 @@ def main():
|
||||
print("Erreur: type de firewall inconnu. Utilisez 'paloalto', 'stormshield' ou 'forcepoint'.")
|
||||
sys.exit(1)
|
||||
|
||||
if "-m" in sys.argv:
|
||||
print(f"\nGénération de l'export Excel...")
|
||||
if "-f" in sys.argv:
|
||||
print(f"\nGénération de l'export la matrice flux...")
|
||||
if "-o" in sys.argv:
|
||||
o_index = sys.argv.index("-o")
|
||||
if o_index + 1 < len(sys.argv):
|
||||
output_file_excel = os.path.join(f"{output_path}matrice_{firewall_type}_{sys.argv[o_index + 1]}.xlsx")
|
||||
output_file_excel = os.path.join(f"{output_path}matrice_flux_{firewall_type}_{sys.argv[o_index + 1]}.xlsx")
|
||||
else:
|
||||
print("Erreur: nom de fichier de sortie manquant après '-o'.")
|
||||
sys.exit(1)
|
||||
else:
|
||||
timestamp = time.strftime("%Y%m%d")
|
||||
output_file_excel = f"{output_path}matrice_{firewall_type}_{timestamp}.xlsx"
|
||||
output_file_excel = f"{output_path}matrice_flux_{firewall_type}_{timestamp}.xlsx"
|
||||
output_file_excel = verify_if_file_exists(output_file_excel)
|
||||
excel_file = export_flux_to_excel(output_file_json, output_file_excel)
|
||||
print(f"✓ Processus terminé. Fichiers générés:\n - JSON: {output_file_json}\n - Excel: {excel_file}")
|
||||
|
||||
excel_file = export_to_excel(output_file_json, output_file_excel)
|
||||
if "-r" in sys.argv:
|
||||
print(f"\nGénération de l'export la matrice routage...")
|
||||
if "-o" in sys.argv:
|
||||
o_index = sys.argv.index("-o")
|
||||
if o_index + 1 < len(sys.argv):
|
||||
output_file_excel = os.path.join(f"{output_path}matrice_routage_{firewall_type}_{sys.argv[o_index + 1]}.xlsx")
|
||||
else:
|
||||
print("Erreur: nom de fichier de sortie manquant après '-o'.")
|
||||
sys.exit(1)
|
||||
else:
|
||||
timestamp = time.strftime("%Y%m%d")
|
||||
output_file_excel = f"{output_path}matrice_routage_{firewall_type}_{timestamp}.xlsx"
|
||||
output_file_excel = verify_if_file_exists(output_file_excel)
|
||||
excel_file = export_routing_to_excel(output_file_json, output_file_excel)
|
||||
print(f"✓ Processus terminé. Fichiers générés:\n - JSON: {output_file_json}\n - Excel: {excel_file}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -11,7 +11,7 @@ module custom-firewall-objects {
|
||||
description
|
||||
"Custom firewall objects model depending on the firewall type.";
|
||||
|
||||
revision "2025-12-22" {
|
||||
revision "2025-10-21" {
|
||||
description "Initial version with conditional structure.";
|
||||
reference "Internal reference FW-OBJ-001.";
|
||||
}
|
||||
@@ -70,16 +70,16 @@ module custom-firewall-objects {
|
||||
}
|
||||
}
|
||||
|
||||
case forcepoint {
|
||||
when "/fwd:firewall-device/fwd:type = 'forcepoint'";
|
||||
description "Objects for forcepoint firewalls.";
|
||||
case fortinet {
|
||||
when "/fwd:firewall-device/fwd:type = 'fortinet'";
|
||||
description "Objects for Fortinet firewalls.";
|
||||
|
||||
container forcepoint-objects {
|
||||
description "Container for forcepoint specific objects.";
|
||||
container fortinet-objects {
|
||||
description "Container for Fortinet specific objects.";
|
||||
|
||||
list address {
|
||||
key "name";
|
||||
description "forcepoint address objects.";
|
||||
description "Fortinet address objects.";
|
||||
leaf name { type string; description "Object name."; }
|
||||
leaf subnet { type inet:ipv4-prefix; description "IPv4 subnet."; }
|
||||
leaf interface { type string; description "Associated interface."; }
|
||||
@@ -88,7 +88,7 @@ module custom-firewall-objects {
|
||||
|
||||
list service {
|
||||
key "name";
|
||||
description "forcepoint service objects.";
|
||||
description "Fortinet service objects.";
|
||||
leaf name { type string; description "Service name."; }
|
||||
leaf protocol {
|
||||
type enumeration {
|
||||
|
||||
97
src/scripts/export_matrice_routage.py
Normal file
97
src/scripts/export_matrice_routage.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import json
|
||||
import re
|
||||
import pandas as pd
|
||||
import ipaddress
|
||||
import time
|
||||
from collections import deque
|
||||
from openpyxl import load_workbook
|
||||
from scripts.style_excel.style_matrice_routage import style_matrice_routage
|
||||
|
||||
def is_pure_cidr(value):
|
||||
try:
|
||||
ipaddress.ip_network(value, strict=False)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def export_to_excel(json_file_path, output_file_excel):
|
||||
"""
|
||||
Export firewall data from JSON to Excel
|
||||
Args:
|
||||
json_file_path: Path to the JSON file to process
|
||||
output_file_excel: Path to the output Excel file
|
||||
"""
|
||||
|
||||
with open(json_file_path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
|
||||
address_objects = {}
|
||||
addr_objs = data.get(
|
||||
"custom-firewall-objects:firewall-objects", {}
|
||||
).get("address", [])
|
||||
|
||||
for obj in addr_objs:
|
||||
name = obj.get("name")
|
||||
ip_netmask = obj.get("config", {}).get("ip_netmask")
|
||||
|
||||
if name and ip_netmask:
|
||||
address_objects[name] = ip_netmask
|
||||
|
||||
rows = []
|
||||
network_instances = data.get(
|
||||
"openconfig-network-instance:network-instances", {}
|
||||
).get("network-instance", [])
|
||||
|
||||
for ni in network_instances:
|
||||
equipement = ni.get("name", "")
|
||||
|
||||
protocols = ni.get("protocols", {}).get("protocol", [])
|
||||
for proto in protocols:
|
||||
if proto.get("identifier") != "STATIC":
|
||||
continue
|
||||
|
||||
static_routes = proto.get("static-routes", {}).get("static", [])
|
||||
for route in static_routes:
|
||||
prefix = route.get("prefix", "")
|
||||
|
||||
# Découpage réseau / masque
|
||||
resolved_prefix = prefix
|
||||
if not is_pure_cidr(prefix):
|
||||
resolved_prefix = address_objects.get(prefix, prefix)
|
||||
|
||||
|
||||
try:
|
||||
net = ipaddress.ip_network(resolved_prefix, strict=False)
|
||||
reseau = str(net.network_address)
|
||||
masque = str(net.netmask)
|
||||
except ValueError:
|
||||
reseau = resolved_prefix
|
||||
masque = ""
|
||||
|
||||
next_hops = route.get("next-hops", {}).get("next-hop", [])
|
||||
for nh in next_hops:
|
||||
nh_config = nh.get("config", {})
|
||||
|
||||
rows.append({
|
||||
"Equipement": equipement,
|
||||
"Réseau destination": reseau,
|
||||
"Masque": masque,
|
||||
"Next Hop": nh_config.get("next-hop", ""),
|
||||
"Metrique": nh_config.get("metric", ""),
|
||||
"Commentaire": prefix
|
||||
})
|
||||
|
||||
df = pd.DataFrame(rows)
|
||||
|
||||
with pd.ExcelWriter(output_file_excel, engine="openpyxl") as writer:
|
||||
df.to_excel(writer,sheet_name="Routes statiques",index=False,startrow=1)
|
||||
|
||||
wb = load_workbook(output_file_excel)
|
||||
if "Sheet1" in wb.sheetnames:
|
||||
del wb["Sheet1"]
|
||||
|
||||
style_matrice_routage(wb["Routes statiques"])
|
||||
|
||||
wb.save(output_file_excel)
|
||||
print(f"✓ Export Excel OK: {output_file_excel}")
|
||||
return output_file_excel
|
||||
@@ -103,12 +103,43 @@ class ForcepointParser(ParserMixin):
|
||||
self.config['interfaces'].append(Interface(name=name, ip=ip_addr, netmask=netmask, comment=comment))
|
||||
|
||||
def _parse_virtual_routers(self):
|
||||
"""Parse routeurs simples"""
|
||||
for router in self.root.findall(".//router"):
|
||||
name = router.get("name")
|
||||
ip = router.find("mvia_address")
|
||||
ip_addr = ip.get("address") if ip is not None else None
|
||||
self.config['virtual_routers'].append(VirtualRouter(name=name, interfaces=[ip_addr] if ip_addr else [], static_routes=[]))
|
||||
"""Parse les virtual routers avec interfaces et routes statiques"""
|
||||
for router in self.root.findall(".//routing_node"):
|
||||
vr_name = router.get("name")
|
||||
interfaces = []
|
||||
static_routes = []
|
||||
|
||||
for iface in router.findall(".//interface_rn_level"):
|
||||
iface_name = iface.get("name", None)
|
||||
|
||||
for network in iface.findall(".//network_rn_level"):
|
||||
for gw in network.findall(".//gateway_rn_level"):
|
||||
|
||||
for ip_entry in gw.findall(".//ipaddress_behind_router"):
|
||||
gw_ip = ip_entry.get("gateway")
|
||||
ip_cidr = ip_entry.get("ipaddress")
|
||||
static_routes.append(
|
||||
StaticRoute(
|
||||
name=f"{vr_name}_{ip_cidr}",
|
||||
destination=ip_cidr,
|
||||
metric=0,
|
||||
next_hop_ip=gw_ip,
|
||||
interface=iface_name
|
||||
)
|
||||
)
|
||||
|
||||
if gw_ip:
|
||||
interfaces.append(gw_ip)
|
||||
|
||||
interfaces = list(set(interfaces))
|
||||
|
||||
self.config['virtual_routers'].append(
|
||||
VirtualRouter(
|
||||
name=vr_name,
|
||||
interfaces=interfaces,
|
||||
static_routes=static_routes
|
||||
)
|
||||
)
|
||||
|
||||
def _parse_security_rules(self):
|
||||
"""Parse règles FW Forcepoint"""
|
||||
|
||||
@@ -196,59 +196,95 @@ class StormshieldParser(ParserMixin):
|
||||
if not os.path.exists(route_path):
|
||||
return
|
||||
|
||||
virtual_routers = {}
|
||||
default_route = None
|
||||
|
||||
section = None
|
||||
with open(route_path, encoding="utf-8", errors="ignore") as f:
|
||||
for raw_line in f:
|
||||
line = raw_line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
if not line or line.startswith("off") or line.startswith("#"):
|
||||
continue
|
||||
|
||||
if line.startswith("[") and line.endswith("]"):
|
||||
section = line.strip("[]")
|
||||
continue
|
||||
|
||||
# ===== Default route =====
|
||||
if section == "Config" and line.startswith("DefaultRoute="):
|
||||
self.config["default_route"] = line.split("=", 1)[1].strip()
|
||||
default_route = line.split("=", 1)[1].strip()
|
||||
|
||||
elif section == "StaticRoutes" and not line.startswith("#"):
|
||||
parts = line.split(",")
|
||||
if len(parts) >= 2:
|
||||
self.config["static_routes"].append({
|
||||
"destination": parts[0],
|
||||
"interface": parts[1],
|
||||
"extra": parts[2:] if len(parts) > 2 else []
|
||||
})
|
||||
# ===== Static routes =====
|
||||
elif section == "StaticRoutes":
|
||||
parts = [p.strip() for p in line.split(",")]
|
||||
if len(parts) < 2:
|
||||
continue
|
||||
|
||||
static_routes = []
|
||||
if self.config["default_route"]:
|
||||
static_routes.append(
|
||||
destination = parts[0]
|
||||
|
||||
vr_name = None
|
||||
interface = None
|
||||
next_hop_ip = None
|
||||
|
||||
# Format : VR->NextHop
|
||||
if "->" in parts[1]:
|
||||
vr_name, next_hop_ip = map(str.strip, parts[1].split("->", 1))
|
||||
interface = vr_name
|
||||
else:
|
||||
vr_name = parts[1]
|
||||
interface = parts[1]
|
||||
|
||||
# Création VR si absent
|
||||
if vr_name not in virtual_routers:
|
||||
virtual_routers[vr_name] = {
|
||||
"interfaces": set(),
|
||||
"static_routes": []
|
||||
}
|
||||
|
||||
virtual_routers[vr_name]["interfaces"].add(interface)
|
||||
|
||||
static_route = StaticRoute(
|
||||
name=f"{vr_name}-static-{len(virtual_routers[vr_name]['static_routes']) + 1}",
|
||||
destination=destination,
|
||||
metric=None,
|
||||
next_vr=None,
|
||||
next_hop_ip=next_hop_ip,
|
||||
interface=interface,
|
||||
bfd_profile=None
|
||||
)
|
||||
|
||||
virtual_routers[vr_name]["static_routes"].append(static_route)
|
||||
|
||||
# ===== Default VR (route par défaut) =====
|
||||
if default_route:
|
||||
vr_name = "default"
|
||||
virtual_routers.setdefault(vr_name, {
|
||||
"interfaces": set(),
|
||||
"static_routes": []
|
||||
})
|
||||
|
||||
virtual_routers[vr_name]["static_routes"].insert(
|
||||
0,
|
||||
StaticRoute(
|
||||
name="default-route",
|
||||
destination="0.0.0.0/0",
|
||||
metric=1,
|
||||
next_hop_ip=self.config["default_route"],
|
||||
interface=None
|
||||
next_vr=None,
|
||||
next_hop_ip=default_route,
|
||||
interface=None,
|
||||
bfd_profile=None
|
||||
)
|
||||
)
|
||||
|
||||
for idx, route in enumerate(self.config["static_routes"]):
|
||||
static_routes.append(
|
||||
StaticRoute(
|
||||
name=f"static-{idx+1}",
|
||||
destination=route["destination"],
|
||||
metric=1,
|
||||
next_hop_ip=None,
|
||||
interface=route["interface"]
|
||||
)
|
||||
# ===== Construction finale =====
|
||||
self.config["virtual_routers"] = [
|
||||
VirtualRouter(
|
||||
name=vr_name,
|
||||
interfaces=list(data["interfaces"]),
|
||||
static_routes=data["static_routes"]
|
||||
)
|
||||
|
||||
vr = VirtualRouter(
|
||||
name="default-vr",
|
||||
interfaces=[r["interface"] for r in self.config["static_routes"]],
|
||||
static_routes=static_routes
|
||||
)
|
||||
|
||||
self.config["virtual_routers"] = [vr]
|
||||
for vr_name, data in virtual_routers.items()
|
||||
]
|
||||
|
||||
# def _parse_slotinfo_file(self):
|
||||
# path = os.path.join(self.base_dir, "Filter", "slotinfo")
|
||||
|
||||
58
src/scripts/style_excel/style_matrice_routage.py
Normal file
58
src/scripts/style_excel/style_matrice_routage.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from openpyxl.styles import Alignment, PatternFill, Font, Border, Side
|
||||
from openpyxl.utils import get_column_letter
|
||||
|
||||
def style_matrice_routage(ws):
|
||||
|
||||
headers = [
|
||||
"Equipement",
|
||||
"Réseau destination",
|
||||
"Masque",
|
||||
"Next Hop",
|
||||
"Metrique",
|
||||
"Commentaire"
|
||||
]
|
||||
|
||||
header_fill = PatternFill("solid", fgColor="1F4E78")
|
||||
header_font = Font(color="FFFFFF", bold=True)
|
||||
header_alignment = Alignment(horizontal="center", vertical="center")
|
||||
|
||||
cell_alignment_left = Alignment(horizontal="left", vertical="center")
|
||||
cell_alignment_center = Alignment(horizontal="center", vertical="center")
|
||||
|
||||
thin_border = Border(
|
||||
left=Side(style="thin"),
|
||||
right=Side(style="thin"),
|
||||
top=Side(style="thin"),
|
||||
bottom=Side(style="thin")
|
||||
)
|
||||
|
||||
for col_idx, header in enumerate(headers, start=1):
|
||||
cell = ws.cell(row=1, column=col_idx, value=header)
|
||||
cell.fill = header_fill
|
||||
cell.font = header_font
|
||||
cell.alignment = header_alignment
|
||||
cell.border = thin_border
|
||||
|
||||
column_widths = {
|
||||
"A": 22, # Equipement
|
||||
"B": 22, # Réseau destination
|
||||
"C": 18, # Masque
|
||||
"D": 20, # Next Hop
|
||||
"E": 12, # Metrique
|
||||
"F": 30 # Commentaire
|
||||
}
|
||||
|
||||
for col_letter, width in column_widths.items():
|
||||
ws.column_dimensions[col_letter].width = width
|
||||
|
||||
for row in ws.iter_rows(min_row=2, max_row=ws.max_row, max_col=len(headers)):
|
||||
for cell in row:
|
||||
cell.border = thin_border
|
||||
|
||||
if cell.column in (1, 2, 4, 6):
|
||||
cell.alignment = cell_alignment_left
|
||||
else:
|
||||
cell.alignment = cell_alignment_center
|
||||
|
||||
ws.freeze_panes = "A2"
|
||||
ws.auto_filter.ref = ws.dimensions
|
||||
Reference in New Issue
Block a user