- agency-archivist/SKILL.md: Skill per upload, estrazione, catalogazione
- scripts/extract_archive.py: Estrazione zip/URL in clients/{client}/assets/
- scripts/scan_resources.py: Scansione metadata (2 passate: base + vision)
- scripts/generate_catalog.py: Generazione catalogo.md con tag e use case
- references/resource_types.md: Tipologie risorse e use case per skill
- agency-orchestrator/SKILL.md: Integrazione archivist in Fase 1
- Step opzionale upload risorse
- Sezione dedicata gestione risorse
- Comportamento proattivo (richiesta risorse mancanti)
- Pattern per altre skill
Integrazione completa: orchestrator → archivist → visual-generator/design/web/social
316 lines
11 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
extract_archive.py — Extract archives (zip, tar.gz/tgz, rar) and organize resources into clients/{client}/assets/

Usage:
    python extract_archive.py <path_or_url> --client <client_name>
    python extract_archive.py brand_assets.zip --client demo_co_srl
    python extract_archive.py https://example.com/assets.zip --client demo_co_srl

Options:
    --keep-archive   Keep the original file (default: delete after extraction)
    --verbose        Detailed logging
    --dry-run        Simulate without extracting
"""
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import zipfile
|
|
import tarfile
|
|
import shutil
|
|
import hashlib
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# Try to import rarfile (optional, requires unrar)
|
|
try:
|
|
import rarfile
|
|
HAS_RAR = True
|
|
except ImportError:
|
|
HAS_RAR = False
|
|
|
|
# Keyword -> destination folder mapping.
# Each key is a "<type>/<subfolder>" path; each value lists the lowercase
# substrings that, when found in a file name, route the file to that folder.
CATEGORY_KEYWORDS = {
    # Images
    "images/logo": ["logo", "marchio", "brand", "logotipo"],
    "images/prodotto": ["prodotto", "product", "item", "articolo"],
    "images/team": ["team", "staff", "ufficio", "office", "persone", "people"],
    "images/stock": ["sfondo", "background", "texture", "stock"],
    # Videos
    "videos/promo": ["promo", "reel", "trailer", "advertisement"],
    "videos/tutorial": ["tutorial", "howto", "demo", "dimostrazione", "guida"],
    # Documents
    "documents/brand": ["brand", "guideline", "manual", "linee guida"],
    "documents/product": ["scheda", "datasheet", "spec", "specifiche"],
}
|
|
|
|
def get_file_type(filename):
    """Classify a file by its extension.

    The extension is the text after the last dot, compared
    case-insensitively.

    Args:
        filename: File name (with or without directory components).

    Returns:
        'images', 'videos' or 'documents' for known extensions, otherwise
        'other'. A name that contains no dot has no extension and is
        always 'other'.
    """
    # rpartition tells us whether a dot was present at all; a plain
    # split('.')[-1] would treat a dot-less name such as "pdf" as its own
    # extension and misclassify it.
    _stem, dot, ext = filename.lower().rpartition('.')
    if not dot:
        return 'other'

    image_exts = {'jpg', 'jpeg', 'png', 'gif', 'webp', 'svg', 'bmp', 'tiff'}
    video_exts = {'mp4', 'mov', 'avi', 'mkv', 'webm', 'wmv'}
    doc_exts = {'pdf', 'doc', 'docx', 'txt', 'md', 'ppt', 'pptx', 'xls', 'xlsx'}

    if ext in image_exts:
        return 'images'
    if ext in video_exts:
        return 'videos'
    if ext in doc_exts:
        return 'documents'
    return 'other'
|
|
|
|
def categorize_file(filename, file_type):
    """Pick the destination category for a file from keywords in its name.

    Only CATEGORY_KEYWORDS entries whose top-level folder equals file_type
    are considered; the first entry with a keyword contained in the
    lowercased file name wins.

    Returns:
        The matching category key, or the fallback "<file_type>/"
        ('misc/' when file_type is 'other') if no keyword matches.
    """
    lowered_name = filename.lower()

    for category, keywords in CATEGORY_KEYWORDS.items():
        # Skip categories that belong to a different file type.
        if category.split('/')[0] != file_type:
            continue
        if any(keyword in lowered_name for keyword in keywords):
            return category

    # Fallback: base folder for the type.
    if file_type == 'other':
        return 'misc/'
    return f"{file_type}/"
|
|
|
|
def get_file_size(path):
    """Return the size in bytes of the file at path."""
    file_stat = os.stat(path)
    return file_stat.st_size
|
|
|
|
def format_size(size_bytes):
    """Render a byte count as a one-decimal string in B, KB, MB, GB or TB.

    The value is divided by 1024 until it drops below 1024 or the GB unit
    has been passed; anything larger is reported in TB.
    """
    units = ('B', 'KB', 'MB', 'GB')
    value = size_bytes
    index = 0
    while index < len(units):
        if value < 1024:
            return f"{value:.1f} {units[index]}"
        value = value / 1024
        index += 1
    return f"{value:.1f} TB"
|
|
|
|
def download_file(url, dest_path, verbose=False):
    """Download url to dest_path.

    The data is first written to a sibling "<dest_path>.part" file and
    moved into place only after the transfer finished, so a failed or
    interrupted download never leaves a truncated file at dest_path.

    Args:
        url: Source URL, passed to urllib.request.urlretrieve.
        dest_path: Destination file path (str or path-like).
        verbose: When true, print progress messages.

    Returns:
        True on success; False if the download failed (the error is
        printed, not raised).
    """
    import urllib.request

    if verbose:
        print(f"📥 Download: {url}")

    partial_path = f"{os.fspath(dest_path)}.part"

    try:
        urllib.request.urlretrieve(url, partial_path)
        os.replace(partial_path, dest_path)
    except Exception as e:
        # Deliberately broad: this is the CLI's error boundary for network,
        # filesystem and malformed-URL failures alike.
        print(f"❌ Errore download: {e}")
        return False
    finally:
        # Still present only when the transfer did not complete.
        if os.path.exists(partial_path):
            os.remove(partial_path)

    if verbose:
        print(f"✅ Download completato: {dest_path}")
    return True
|
|
|
|
def extract_archive(archive_path, extract_to, verbose=False):
    """Extract an archive into extract_to and return its member names.

    Supported formats, detected case-insensitively from the file name:
    zip, tar (plain, gzip, bzip2 or xz compressed) and, when the optional
    rarfile module is available, rar.

    Tar and rar members that would be written outside extract_to cause
    the whole extraction to be reported as failed.

    Args:
        archive_path: Path of the archive file.
        extract_to: Directory that receives the extracted content.
        verbose: When true, print the number of extracted members.

    Returns:
        The archive's member names (directories included), or an empty
        list if the format is unsupported or extraction failed. Errors are
        printed, not raised.
    """
    extracted_files = []

    # Determine the format from the lowercased name so that "A.ZIP" and
    # "A.TAR.GZ" are treated alike.
    filename = os.path.basename(archive_path)
    name_lower = filename.lower()
    ext = name_lower.split('.')[-1]
    tar_suffixes = ('.tar', '.tar.gz', '.tgz', '.tar.bz2', '.tbz2', '.tar.xz', '.txz')

    def _escapes_target(member_name):
        """Return True if member_name would resolve outside extract_to."""
        base = os.path.realpath(extract_to)
        target = os.path.realpath(os.path.join(base, member_name))
        return os.path.commonpath([base, target]) != base

    try:
        if ext == 'zip':
            with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                zip_ref.extractall(extract_to)
                extracted_files = zip_ref.namelist()

        elif name_lower.endswith(tar_suffixes):
            # 'r:*' lets tarfile detect the compression transparently.
            with tarfile.open(archive_path, 'r:*') as tar_ref:
                if hasattr(tarfile, 'data_filter'):
                    # Extraction filters reject absolute paths, '..'
                    # traversal and links that point outside the target.
                    tar_ref.extractall(extract_to, filter='data')
                else:
                    # Older interpreters without extraction filters:
                    # validate by hand and refuse links altogether.
                    members = tar_ref.getmembers()
                    for member in members:
                        if member.issym() or member.islnk() or _escapes_target(member.name):
                            raise ValueError(f"percorso non sicuro nell'archivio: {member.name}")
                    tar_ref.extractall(extract_to, members=members)
                extracted_files = tar_ref.getnames()

        elif ext == 'rar':
            if not HAS_RAR:
                print("❌ Supporto RAR non disponibile. Installa: pip install rarfile unrar")
                return []
            with rarfile.RarFile(archive_path, 'r') as rar_ref:
                names = rar_ref.namelist()
                for name in names:
                    if _escapes_target(name):
                        raise ValueError(f"percorso non sicuro nell'archivio: {name}")
                rar_ref.extractall(extract_to)
                extracted_files = names

        else:
            print(f"❌ Formato .{ext} non supportato. Usa zip, tar.gz, o rar.")
            return []

        if verbose:
            print(f"✅ Estratti {len(extracted_files)} file")

        return extracted_files

    except Exception as e:
        # Deliberately broad: corrupt archives, I/O errors and rejected
        # members are all reported the same way to the CLI caller.
        print(f"❌ Errore estrazione: {e}")
        return []
|
|
|
|
def organize_files(temp_dir, assets_dir, client, verbose=False):
    """Copy extracted files from temp_dir into category folders under assets_dir.

    Every file found while walking temp_dir is classified with
    get_file_type and categorize_file, then copied (metadata included)
    into assets_dir/<category>/. Hidden files (names starting with '.')
    and 'Thumbs.db' are skipped. If a file with the same name already
    exists at the destination, a numeric suffix ("name_1.ext",
    "name_2.ext", ...) is appended instead of overwriting.

    Args:
        temp_dir: Directory holding the freshly extracted archive content.
        assets_dir: Root of the client's assets directory.
        client: Client name; accepted for interface compatibility and not
            used by this function.
        verbose: When true, print one line per organized file.

    Returns:
        A list of dicts, one per copied file, with keys 'original' (source
        file name), 'destination' (path relative to assets_dir), 'type',
        'category' and 'size' (bytes).
    """
    organized = []

    # Pre-create the standard folder structure; the keyword table is the
    # single source of truth for the category folders.
    for folder in CATEGORY_KEYWORDS:
        os.makedirs(os.path.join(assets_dir, folder), exist_ok=True)

    for root, _dirs, files in os.walk(temp_dir):
        for filename in files:
            # Skip hidden and system files.
            if filename.startswith('.') or filename == 'Thumbs.db':
                continue

            src_path = os.path.join(root, filename)
            file_type = get_file_type(filename)
            category = categorize_file(filename, file_type)

            dest_folder = os.path.join(assets_dir, category)
            # Fallback categories such as 'misc/' are not part of the
            # pre-created structure; without this the copy below fails
            # with FileNotFoundError for every unrecognised file type.
            os.makedirs(dest_folder, exist_ok=True)
            dest_path = os.path.join(dest_folder, filename)

            # Resolve name collisions with a numeric suffix.
            base, ext = os.path.splitext(filename)
            counter = 1
            while os.path.exists(dest_path):
                dest_path = os.path.join(dest_folder, f"{base}_{counter}{ext}")
                counter += 1

            shutil.copy2(src_path, dest_path)
            organized.append({
                'original': filename,
                'destination': os.path.relpath(dest_path, assets_dir),
                'type': file_type,
                'category': category,
                'size': get_file_size(dest_path),
            })

            if verbose:
                print(f" 📁 {filename} → {category}/")

    return organized
|
|
|
|
def log_operation(client, archive_name, organized_files, ops_log_path):
    """Append a Markdown summary of an upload to the run log.

    The entry contains a timestamp (local time, minute precision), the
    archive name, the number of organized files and a table with count
    and total size for images, videos and documents.

    Args:
        client: Client name; accepted for interface compatibility and not
            used by this function.
        archive_name: Name of the processed archive.
        organized_files: List of dicts with at least 'type' and 'size'
            keys, one per organized file.
        ops_log_path: Path of the log file; it is opened in append mode.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M')

    def _stats(type_name):
        """Return (count, formatted total size) for files of one type."""
        sizes = [f['size'] for f in organized_files if f['type'] == type_name]
        return len(sizes), format_size(sum(sizes))

    image_count, image_size = _stats('images')
    video_count, video_size = _stats('videos')
    doc_count, doc_size = _stats('documents')

    log_entry = f"""
## {timestamp} — Archivist Upload

- **Archivio:** `{archive_name}`
- **File estratti:** {len(organized_files)}
- **Status:** ✅ Completato

### Dettagli

| Tipo | Count | Dimensione Totale |
|------|-------|-------------------|
| Immagini | {image_count} | {image_size} |
| Video | {video_count} | {video_size} |
| Documenti | {doc_count} | {doc_size} |

"""

    # Explicit UTF-8: the entry contains non-ASCII characters that the
    # platform default encoding may not be able to represent.
    with open(ops_log_path, 'a', encoding='utf-8') as f:
        f.write(log_entry)
|
|
|
|
def main():
    """Command-line entry point: fetch an archive, extract it and file its contents.

    Steps: parse the arguments, resolve the client's folders under the
    workspace, obtain the archive (download for http/https/ftp URLs, copy
    for local paths) into assets/archive/, extract it into a scratch
    directory, sort the files into category folders, append a summary to
    the client's run log and, unless --keep-archive is given, delete the
    stored copy of the archive.

    Exits with status 1 on any reported failure and with status 0 after a
    dry run, which performs no filesystem changes.
    """
    from urllib.parse import unquote, urlsplit

    parser = argparse.ArgumentParser(description='Estrae archivi e organizza risorse')
    parser.add_argument('path_or_url', help='Path archivio o URL')
    parser.add_argument('--client', required=True, help='Nome cliente (cartella clients/{client}/)')
    parser.add_argument('--keep-archive', action='store_true', help='Mantieni archivio originale')
    parser.add_argument('--verbose', action='store_true', help='Log dettagliato')
    parser.add_argument('--dry-run', action='store_true', help='Simula senza estrazione')

    args = parser.parse_args()

    # Workspace root
    workspace = Path.home() / '.openclaw' / 'workspace' / 'agency-skills-suite'
    clients_dir = workspace / 'clients'

    # Client folders
    client_dir = clients_dir / args.client
    assets_dir = client_dir / 'assets'
    archive_dir = assets_dir / 'archive'
    ops_log = client_dir / 'ops' / 'run_log.md'

    if not client_dir.exists():
        print(f"❌ Cartella cliente non trovata: {client_dir}")
        print(f" Crea prima il progetto con agency-orchestrator")
        sys.exit(1)

    is_url = args.path_or_url.startswith(('http://', 'https://', 'ftp://'))
    source_path = None

    if is_url:
        # Use only the URL's path component so query strings and fragments
        # never end up in the file name; decode percent-escapes.
        archive_name = os.path.basename(unquote(urlsplit(args.path_or_url).path))
        if archive_name in ('', '.', '..'):
            print(f"❌ Nome archivio non ricavabile dall'URL: {args.path_or_url}")
            sys.exit(1)
    else:
        source_path = Path(args.path_or_url)
        archive_name = source_path.name
        # is_file() also rejects directories, which cannot be copied below.
        if not source_path.is_file():
            print(f"❌ File non trovato: {source_path}")
            sys.exit(1)

    stored_archive = archive_dir / archive_name

    if args.dry_run:
        if is_url:
            print(f"🔍 [DRY-RUN] Download: {args.path_or_url} → {stored_archive}")
        else:
            print(f"🔍 [DRY-RUN] Estrai: {source_path} → {assets_dir}")
        sys.exit(0)

    # Create the folders only now, so a dry run leaves no trace.
    os.makedirs(archive_dir, exist_ok=True)
    os.makedirs(client_dir / 'ops', exist_ok=True)

    # True when the user's source file IS the stored archive; in that case
    # there is nothing to copy and the file must not be deleted afterwards,
    # since it is not a copy this script created.
    already_stored = False

    if is_url:
        if not download_file(args.path_or_url, str(stored_archive), args.verbose):
            sys.exit(1)
    else:
        already_stored = stored_archive.exists() and os.path.samefile(source_path, stored_archive)
        if not already_stored:
            shutil.copy2(source_path, stored_archive)

    if args.verbose:
        print(f"\n📦 Archivio: {archive_name}")
        print(f"📁 Destinazione: {assets_dir}")
        print()

    # Extract into a scratch directory. Remove leftovers of an interrupted
    # earlier run first, otherwise they would be organized again.
    temp_dir = archive_dir / '.temp_extract'
    shutil.rmtree(temp_dir, ignore_errors=True)
    os.makedirs(temp_dir, exist_ok=True)

    try:
        print("🔄 Estrazione in corso...")
        extracted = extract_archive(str(stored_archive), str(temp_dir), args.verbose)

        if not extracted:
            print("❌ Nessun file estratto")
            sys.exit(1)

        print("\n🗂️ Organizzazione file...")
        organized = organize_files(temp_dir, assets_dir, args.client, args.verbose)
    finally:
        # Runs on success, on sys.exit and on unexpected errors alike.
        shutil.rmtree(temp_dir, ignore_errors=True)

    log_operation(args.client, archive_name, organized, ops_log)

    # Delete the stored copy of the archive (unless --keep-archive).
    if not args.keep_archive and not already_stored:
        os.remove(stored_archive)
        if args.verbose:
            print("\n🗑️ Archivio originale eliminato")

    # Summary
    print("\n✅ Completato!")
    print(f" 📦 File estratti: {len(organized)}")
    print(f" 📁 Cartella: {assets_dir}")
    print(f" 📝 Log: {ops_log}")
    print(f"\n👉 Prossimo step: python scripts/scan_resources.py --client {args.client}")
|
|
|
|
# Run the CLI only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|