Compare commits

...

8 Commits

Author SHA1 Message Date
Chevallier
fea80315ef correction Equipement Stormshield 2026-01-19 14:17:53 +01:00
Chevallier
bb4de4e855 ajout name dans json, correction champ equipement matrice de routage, correction doublon headers matrice de routage 2026-01-16 14:57:51 +01:00
Chevallier
e4a906652d correction README.md 2026-01-15 15:28:02 +01:00
Chevallier
8314c18c0f correction parsing route Forcepoint 2026-01-15 13:54:56 +01:00
Chevallier
0079cbe738 correction output file name 2026-01-15 11:37:51 +01:00
Chevallier
52c1f73121 correction venv 2026-01-15 09:57:43 +01:00
Chevallier
c429d2633a fin firewall 2026-01-15 09:38:37 +01:00
Chevallier
131f1e6735 multi firewall analyze 2026-01-15 09:37:42 +01:00
9 changed files with 444 additions and 20 deletions

View File

@@ -50,10 +50,11 @@ Il founit également la possibilité de générer une **matrice de routage au fo
### Pré-requis ### Pré-requis
```bash ```bash
cd ..\..\
python -m venv .venv python -m venv .venv
.\.venv\Scripts\activate .\.venv\Scripts\activate
pip install -r .\src\requierements.txt pip install -r .\src\requirements.txt
``` ```
#### Commandes principales #### Commandes principales

View File

@@ -7,6 +7,15 @@ from scripts.json_Forcepoint import generate_json_forcepoint
from scripts.export_matrice_flux import export_to_excel as export_flux_to_excel from scripts.export_matrice_flux import export_to_excel as export_flux_to_excel
from scripts.export_matrice_routage import export_to_excel as export_routing_to_excel from scripts.export_matrice_routage import export_to_excel as export_routing_to_excel
def verify_if_file_exists(name):
base, ext = os.path.splitext(name)
counter = 1
new_name = name
while os.path.exists(new_name):
new_name = f"{base}_{counter}{ext}"
counter += 1
return new_name
def main(): def main():
if len(sys.argv) < 3: if len(sys.argv) < 3:
print("Usage: python3 src/main.py <firewall_type>[paloalto|stormshield|forcepoint] <input_directory/file> [-o <output_file>] [-f] [-r]") print("Usage: python3 src/main.py <firewall_type>[paloalto|stormshield|forcepoint] <input_directory/file> [-o <output_file>] [-f] [-r]")
@@ -21,6 +30,7 @@ def main():
input_data = sys.argv[2] input_data = sys.argv[2]
input_path = "src/input/" input_path = "src/input/"
output_path = "src/output/" output_path = "src/output/"
os.makedirs(output_path, exist_ok=True)
if "-o" in sys.argv: if "-o" in sys.argv:
o_index = sys.argv.index("-o") o_index = sys.argv.index("-o")
if o_index + 1 < len(sys.argv): if o_index + 1 < len(sys.argv):
@@ -31,7 +41,7 @@ def main():
else: else:
timestamp = time.strftime("%Y%m%d") timestamp = time.strftime("%Y%m%d")
output_file_json = f"{output_path}{firewall_type}_{timestamp}.json" output_file_json = f"{output_path}{firewall_type}_{timestamp}.json"
os.makedirs(output_path, exist_ok=True) output_file_json = verify_if_file_exists(output_file_json)
if firewall_type == "paloalto": if firewall_type == "paloalto":
generate_json_paloalto(input_data, output_file_json) generate_json_paloalto(input_data, output_file_json)
@@ -55,7 +65,7 @@ def main():
else: else:
timestamp = time.strftime("%Y%m%d") timestamp = time.strftime("%Y%m%d")
output_file_excel = f"{output_path}matrice_flux_{firewall_type}_{timestamp}.xlsx" output_file_excel = f"{output_path}matrice_flux_{firewall_type}_{timestamp}.xlsx"
output_file_excel = verify_if_file_exists(output_file_excel)
excel_file = export_flux_to_excel(output_file_json, output_file_excel) excel_file = export_flux_to_excel(output_file_json, output_file_excel)
print(f"✓ Processus terminé. Fichiers générés:\n - JSON: {output_file_json}\n - Excel: {excel_file}") print(f"✓ Processus terminé. Fichiers générés:\n - JSON: {output_file_json}\n - Excel: {excel_file}")
@@ -71,7 +81,7 @@ def main():
else: else:
timestamp = time.strftime("%Y%m%d") timestamp = time.strftime("%Y%m%d")
output_file_excel = f"{output_path}matrice_routage_{firewall_type}_{timestamp}.xlsx" output_file_excel = f"{output_path}matrice_routage_{firewall_type}_{timestamp}.xlsx"
output_file_excel = verify_if_file_exists(output_file_excel)
excel_file = export_routing_to_excel(output_file_json, output_file_excel) excel_file = export_routing_to_excel(output_file_json, output_file_excel)
print(f"✓ Processus terminé. Fichiers générés:\n - JSON: {output_file_json}\n - Excel: {excel_file}") print(f"✓ Processus terminé. Fichiers générés:\n - JSON: {output_file_json}\n - Excel: {excel_file}")

View File

@@ -44,6 +44,9 @@ def export_to_excel(json_file_path, output_file_excel):
for ni in network_instances: for ni in network_instances:
equipement = ni.get("name", "") equipement = ni.get("name", "")
fw_names = data.get("firewall-device", {}).get("name", [])
if equipement not in fw_names:
equipement = ", ".join(fw_names)
protocols = ni.get("protocols", {}).get("protocol", []) protocols = ni.get("protocols", {}).get("protocol", [])
for proto in protocols: for proto in protocols:
@@ -84,7 +87,7 @@ def export_to_excel(json_file_path, output_file_excel):
df = pd.DataFrame(rows) df = pd.DataFrame(rows)
with pd.ExcelWriter(output_file_excel, engine="openpyxl") as writer: with pd.ExcelWriter(output_file_excel, engine="openpyxl") as writer:
df.to_excel(writer,sheet_name="Routes statiques",index=False,startrow=1) df.to_excel(writer,sheet_name="Routes statiques",index=False,startrow=1,header=False)
wb = load_workbook(output_file_excel) wb = load_workbook(output_file_excel)
if "Sheet1" in wb.sheetnames: if "Sheet1" in wb.sheetnames:

View File

@@ -6,11 +6,12 @@ Fournit des méthodes communes pour la conversion et l'exportation des données
from typing import Dict, Any from typing import Dict, Any
class ParserMixin: class ParserMixin:
def to_openconfig_yang(self, type) -> Dict[str, Any]: def to_openconfig_yang(self, type, name) -> Dict[str, Any]:
"""Convertit la configuration au format OpenConfig YANG avec les nouveaux modèles""" """Convertit la configuration au format OpenConfig YANG avec les nouveaux modèles"""
openconfig = { openconfig = {
"firewall-device": { "firewall-device": {
"type": type "type": type,
"name": name
}, },
"openconfig-interfaces:interfaces": { "openconfig-interfaces:interfaces": {
"interface": [] "interface": []

View File

@@ -103,12 +103,43 @@ class ForcepointParser(ParserMixin):
self.config['interfaces'].append(Interface(name=name, ip=ip_addr, netmask=netmask, comment=comment)) self.config['interfaces'].append(Interface(name=name, ip=ip_addr, netmask=netmask, comment=comment))
def _parse_virtual_routers(self): def _parse_virtual_routers(self):
"""Parse routeurs simples""" """Parse les virtual routers avec interfaces et routes statiques"""
for router in self.root.findall(".//router"): for router in self.root.findall(".//routing_node"):
name = router.get("name") vr_name = router.get("name")
ip = router.find("mvia_address") interfaces = []
ip_addr = ip.get("address") if ip is not None else None static_routes = []
self.config['virtual_routers'].append(VirtualRouter(name=name, interfaces=[ip_addr] if ip_addr else [], static_routes=[]))
for iface in router.findall(".//interface_rn_level"):
iface_name = iface.get("name", None)
for network in iface.findall(".//network_rn_level"):
for gw in network.findall(".//gateway_rn_level"):
for ip_entry in gw.findall(".//ipaddress_behind_router"):
gw_ip = ip_entry.get("gateway")
ip_cidr = ip_entry.get("ipaddress")
static_routes.append(
StaticRoute(
name=f"{vr_name}_{ip_cidr}",
destination=ip_cidr,
metric=0,
next_hop_ip=gw_ip,
interface=iface_name
)
)
if gw_ip:
interfaces.append(gw_ip)
interfaces = list(set(interfaces))
self.config['virtual_routers'].append(
VirtualRouter(
name=vr_name,
interfaces=interfaces,
static_routes=static_routes
)
)
def _parse_security_rules(self): def _parse_security_rules(self):
"""Parse règles FW Forcepoint""" """Parse règles FW Forcepoint"""
@@ -158,7 +189,8 @@ class ForcepointParser(ParserMixin):
def export_to_json(self, output_file: str): def export_to_json(self, output_file: str):
"""Exporte la configuration au format JSON OpenConfig""" """Exporte la configuration au format JSON OpenConfig"""
openconfig_data = self.to_openconfig_yang("forcepoint") fw_names = [cluster.get("name") for cluster in self.root.findall(".//fw_cluster")]
openconfig_data = self.to_openconfig_yang("forcepoint", fw_names)
with open(output_file, 'w', encoding='utf-8') as f: with open(output_file, 'w', encoding='utf-8') as f:
json.dump(openconfig_data, f, indent=2, ensure_ascii=False) json.dump(openconfig_data, f, indent=2, ensure_ascii=False)

View File

@@ -451,7 +451,11 @@ class PaloAltoParser(ParserMixin):
def export_to_json(self, output_file: str): def export_to_json(self, output_file: str):
"""Exporte la configuration au format JSON OpenConfig""" """Exporte la configuration au format JSON OpenConfig"""
openconfig_data = self.to_openconfig_yang("palo-alto") fw_name = ""
deviceconfig = self.root.find(".//deviceconfig/system/hostname")
if deviceconfig is not None and deviceconfig.text:
fw_name = deviceconfig.text.strip()
openconfig_data = self.to_openconfig_yang("palo-alto", [fw_name])
with open(output_file, 'w', encoding='utf-8') as f: with open(output_file, 'w', encoding='utf-8') as f:
json.dump(openconfig_data, f, indent=2, ensure_ascii=False) json.dump(openconfig_data, f, indent=2, ensure_ascii=False)

View File

@@ -390,6 +390,18 @@ class StormshieldParser(ParserMixin):
self.config["security_rules"].append(rule) self.config["security_rules"].append(rule)
def _parse_name(self):
"""Parse le nom du firewall depuis le fichier network"""
path = os.path.join(self.base_dir, "network")
fw_name = ""
with open(path, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
line = line.strip()
if line.startswith("Address="):
fw_name = line.split("=", 1)[1].strip()
break
return fw_name
def _extract_comment(self, line: str) -> str: def _extract_comment(self, line: str) -> str:
"""Extrait un commentaire après #""" """Extrait un commentaire après #"""
@@ -409,7 +421,7 @@ class StormshieldParser(ParserMixin):
def export_to_json(self, output_file: str): def export_to_json(self, output_file: str):
"""Exporte la configuration au format JSON OpenConfig""" """Exporte la configuration au format JSON OpenConfig"""
openconfig_data = self.to_openconfig_yang("stormshield") openconfig_data = self.to_openconfig_yang("stormshield", [self._parse_name()])
with open(output_file, 'w', encoding='utf-8') as f: with open(output_file, 'w', encoding='utf-8') as f:
json.dump(openconfig_data, f, indent=2, ensure_ascii=False) json.dump(openconfig_data, f, indent=2, ensure_ascii=False)

View File

@@ -1,3 +1,4 @@
from multiprocessing import process
import subprocess import subprocess
import os import os
import sys import sys
@@ -5,6 +6,32 @@ import tkinter as tk
from tkinter import ttk, filedialog, messagebox from tkinter import ttk, filedialog, messagebox
from datetime import datetime from datetime import datetime
from threading import Thread from threading import Thread
from pathlib import Path
def make_scrollable(parent):
canvas = tk.Canvas(parent, highlightthickness=0)
scrollbar = ttk.Scrollbar(parent, orient="vertical", command=canvas.yview)
scrollable_frame = ttk.Frame(canvas)
scrollable_frame.bind(
"<Configure>",
lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
)
canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
def on_configure(event):
canvas.itemconfig("frame", width=canvas.winfo_width())
canvas.create_window((0,0), window=scrollable_frame, anchor="nw", tags="frame")
canvas.bind("<Configure>", on_configure)
canvas.configure(yscrollcommand=scrollbar.set)
canvas.pack(side="left", fill="both", expand=True)
scrollbar.pack(side="right", fill="y")
canvas.bind_all("<MouseWheel>", lambda e: canvas.yview_scroll(int(-1*(e.delta/120)), "units"))
return scrollable_frame
class ToolTip: class ToolTip:
def __init__(self, widget, text): def __init__(self, widget, text):
@@ -44,6 +71,333 @@ class ToolTip:
self.tipwindow.destroy() self.tipwindow.destroy()
self.tipwindow = None self.tipwindow = None
# Contenu fenêtre Analyse Firewall Multi
def open_firewall_gui_multi(app, OUTPUT_DIR, HELP_FILE_FW, FIREWALL_DIR, FIREWALL_MAIN):
app2 = tk.Toplevel(app)
app2.title("Analyse Configuration Firewall - Plusieurs firewall")
app2.geometry("800x450")
app2.resizable(True, True)
matrice_flux = tk.BooleanVar()
matrice_routage = tk.BooleanVar()
content = make_scrollable(app2)
def open_output_folder():
if sys.platform == "win32":
os.startfile(OUTPUT_DIR)
elif sys.platform == "darwin":
subprocess.run(["open", OUTPUT_DIR])
else:
subprocess.run(["xdg-open", OUTPUT_DIR])
def open_help():
if not os.path.exists(HELP_FILE_FW):
messagebox.showerror("Erreur", "Le fichier d'aide est introuvable.")
return
if sys.platform == "win32":
os.startfile(HELP_FILE_FW)
elif sys.platform == "darwin":
subprocess.run(["open", HELP_FILE_FW])
else:
subprocess.run(["xdg-open", HELP_FILE_FW])
def get_selected_firewall(vars_dict):
for fw, var in vars_dict.items():
if var.get():
return fw
return None
def run_parser_multi():
selections = []
for name, vars_dict in file_selections.items():
fw = get_selected_firewall(vars_dict)
if fw:
selections.append({
"type": "file",
"name": name,
"firewall": fw
})
for name, vars_dict in dir_selections.items():
fw = get_selected_firewall(vars_dict)
if fw:
selections.append({
"type": "dir",
"name": name,
"firewall": fw
})
if not selections:
messagebox.showwarning(
"Aucune sélection",
"Veuillez sélectionner au moins un fichier ou dossier."
)
return
content.config(cursor="watch")
progress_bar = ttk.Progressbar(content, mode="indeterminate")
progress_bar.pack(pady=10, fill="x", padx=10)
progress_bar.start(10)
def process():
try:
for item in selections:
input_path = str(INPUT_DIR / item["name"])
cmd = [
sys.executable,
FIREWALL_MAIN,
item["firewall"],
input_path
]
if matrice_flux.get():
cmd.append("-f")
if matrice_routage.get():
cmd.append("-r")
cmd.append("-o")
if item["type"] == "file":
base_name = os.path.splitext(item["name"])[0]
cmd.append(base_name)
else:
cmd.append(item["name"])
print("Commande exécutée :", " ".join(cmd))
subprocess.run(cmd, cwd=FIREWALL_DIR, check=True)
def on_success():
progress_bar.stop()
progress_bar.destroy()
content.config(cursor="")
messagebox.showinfo("Succès", "Tous les traitements sont terminés.")
open_output_folder()
app2.destroy()
app2.after(0, on_success)
except subprocess.CalledProcessError as e:
def on_error():
progress_bar.stop()
progress_bar.destroy()
content.config(cursor="")
messagebox.showerror(
"Erreur",
f"Erreur lors de l'exécution du traitement.\n\n{e}"
)
app2.after(0, on_error)
Thread(target=process, daemon=True).start()
def create_table(parent, title, items, mode):
"""
mode = 'file' or 'dir'
"""
style = ttk.Style()
style.configure(
"Table.TFrame",
background="white",
borderwidth=1,
relief="solid"
)
style.configure(
"Table.TLabel",
background="white"
)
style.configure(
"Table.TCheckbutton",
background="white"
)
style.map(
"Table.TCheckbutton",
background=[("disabled", "#f0f0f0")],
foreground=[("disabled", "#a0a0a0")]
)
ttk.Label(
parent,
text=title,
font=("Arial", 10, "bold")
).pack(anchor="w", padx=10, pady=(0, 4))
table = ttk.Frame(parent, style="Table.TFrame")
table.pack(fill="x", padx=10)
table.columnconfigure(0, weight=1)
if mode == "file": n = "Fichier source"
else : n = "Dossier source"
headers = [n, "Stormshield", "Palo-Alto", "Forcepoint"]
for col, text in enumerate(headers):
ttk.Label(
table,
text=text,
style="Table.TLabel",
font=("Arial", 9, "bold")
).grid(row=0, column=col, padx=8, pady=6, sticky="e" if col > 0 else "w")
ttk.Separator(table, orient="horizontal").grid(
row=1, column=0, columnspan=4, sticky="ew", pady=(0, 2)
)
selections = {}
grid_row = 2
for name in items:
ttk.Label(
table,
text=name,
style="Table.TLabel"
).grid(row=grid_row, column=0, sticky="w", padx=8, pady=4)
var_storm = tk.BooleanVar()
var_palo = tk.BooleanVar()
var_force = tk.BooleanVar()
def make_command(active, others):
def cmd():
if active.get():
for var in others:
var.set(False)
return cmd
cb_storm = ttk.Checkbutton(
table,
variable=var_storm,
style="Table.TCheckbutton",
command=make_command(var_storm, [var_palo, var_force])
)
cb_palo = ttk.Checkbutton(
table,
variable=var_palo,
style="Table.TCheckbutton",
command=make_command(var_palo, [var_storm, var_force])
)
cb_force = ttk.Checkbutton(
table,
variable=var_force,
style="Table.TCheckbutton",
command=make_command(var_force, [var_storm, var_palo])
)
cb_storm.grid(row=grid_row, column=1, sticky="n", padx=8)
cb_palo.grid(row=grid_row, column=2, sticky="n", padx=8)
cb_force.grid(row=grid_row, column=3, sticky="n", padx=8)
if mode == "file":
cb_storm.state(["disabled"])
elif mode == "dir":
cb_palo.state(["disabled"])
cb_force.state(["disabled"])
selections[name] = {
"stormshield": var_storm,
"paloalto": var_palo,
"forcepoint": var_force
}
# Séparateur interligne
ttk.Separator(table, orient="horizontal").grid(
row=grid_row + 1,
column=0,
columnspan=4,
sticky="ew"
)
grid_row += 2
return selections
INPUT_DIR = Path(OUTPUT_DIR).parent / "input"
if not INPUT_DIR.exists():
messagebox.showerror("Erreur", f"Dossier input introuvable : {INPUT_DIR}")
return
header_frame = ttk.Frame(content)
header_frame.pack(fill="x", padx=10, pady=5)
ttk.Button(
header_frame,
text="Help",
width=6,
command=open_help
).pack(side="right")
ttk.Label(
header_frame,
text="Génération de fichiers JSON au format normalisé YANG à partir des configurations de plusieurs firewalls",
font=("Arial", 8, "bold")
).pack(side="left")
ttk.Label(
content,
text="/!\\ Mettre les fichiers/dossiers de configurations dans le dossier `./Parseurs_config_Firewall/src/input/`",
font=("Arial", 9, "italic"),
).pack(anchor="w", padx=10, pady=(0, 10))
ttk.Label(
content,
text="Sélectionnez les fichiers ou dossiers de configuration source à traiter :",
font=("Arial", 10)
).pack(anchor="w", padx=10)
files = [item.name for item in INPUT_DIR.iterdir() if item.is_file()]
dirs = [item.name for item in INPUT_DIR.iterdir() if item.is_dir()]
file_selections = create_table(
content,
"🗎 Liste des fichiers de configuration source",
files,
mode="file"
)
dir_selections = create_table(
content,
"🗁 Liste des dossiers de configuration source",
dirs,
mode="dir"
)
ttk.Checkbutton(
content,
text="Générer la matrice de flux en Excel",
variable=matrice_flux
).pack(anchor="w", padx=10, pady=(10, 0))
matrice_flux.set(True)
ttk.Checkbutton(
content,
text="Générer la matrice de routage en Excel (route statique uniquement)",
variable=matrice_routage
).pack(anchor="w", padx=10, pady=(0, 10))
matrice_routage.set(True)
ttk.Label(
content,
text="Dossier de sortie :",
).pack(anchor="w", padx=10)
ttk.Button(
content,
text="Ouvrir le dossier de sortie",
command=open_output_folder
).pack(anchor="w", padx=10)
ttk.Button(
content,
text="Lancer le traitement",
command=run_parser_multi
).pack(pady=15)
content.mainloop()
# Contenu fenêtre Analyse Firewall # Contenu fenêtre Analyse Firewall
def open_firewall_gui(root, BASE_DIR): def open_firewall_gui(root, BASE_DIR):
FIREWALL_DIR = os.path.join( FIREWALL_DIR = os.path.join(
@@ -64,8 +418,8 @@ def open_firewall_gui(root, BASE_DIR):
app = tk.Toplevel(root) app = tk.Toplevel(root)
app.title("Analyse Configuration Firewall") app.title("Analyse Configuration Firewall")
app.geometry("800x400") app.geometry("800x450")
app.resizable(False, False) app.resizable(True, True)
firewall_var = tk.StringVar() firewall_var = tk.StringVar()
input_var = tk.StringVar() input_var = tk.StringVar()
@@ -218,6 +572,13 @@ def open_firewall_gui(root, BASE_DIR):
font=("Arial", 8, "bold") font=("Arial", 8, "bold")
).pack(side="left") ).pack(side="left")
ttk.Button(
app,
text="Plusieurs firewall",
command=lambda: open_firewall_gui_multi(app, OUTPUT_DIR, HELP_FILE_FW, FIREWALL_DIR, FIREWALL_MAIN)
).pack(anchor="w", padx=10)
ttk.Label(app, text="Type de firewall").pack(anchor="w", padx=10, pady=5) ttk.Label(app, text="Type de firewall").pack(anchor="w", padx=10, pady=5)
ttk.Combobox( ttk.Combobox(
app, app,

View File

@@ -8,7 +8,7 @@ from gui_firewall import open_firewall_gui
from gui_switch import open_switch_gui from gui_switch import open_switch_gui
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) BASE_DIR = os.path.dirname(os.path.abspath(__file__))
VENV_DIR = os.path.join(BASE_DIR, "venv") VENV_DIR = os.path.join(BASE_DIR, ".venv")
REQ_FILE = os.path.join(BASE_DIR, "requirements.txt") REQ_FILE = os.path.join(BASE_DIR, "requirements.txt")
def venv_python(): def venv_python():
@@ -69,7 +69,7 @@ def open_main_gui():
root = tk.Tk() root = tk.Tk()
root.title("Analyse Réseau") root.title("Analyse Réseau")
root.geometry("650x350") root.geometry("650x350")
root.resizable(False, False) root.resizable(True, True)
ttk.Label( ttk.Label(
root, root,