refactor(repo): reorganize repo into skill package and script directories
This commit is contained in:
332
script/package_skills.py
Executable file
332
script/package_skills.py
Executable file
@@ -0,0 +1,332 @@
|
||||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from zipfile import ZIP_DEFLATED, ZipFile
|
||||
|
||||
|
||||
# Directory names under the skills root that must never be packaged as skills
# (tooling, outputs, and auxiliary content).
EXCLUDED_DIRS = {"evals", "packages", "upload-md", "scripts"}
# Matches one flat `key: value` line inside the SKILL.md YAML frontmatter.
KEY_VALUE_RE = re.compile(r"^([A-Za-z0-9_-]+)\s*:\s*(.*)\s*$")
|
||||
|
||||
|
||||
@dataclass
class SkillRecord:
    """Metadata describing one packageable skill directory."""

    # Name of the directory the skill was read from.
    source_dir_name: str
    # Path to that directory.
    source_path: Path
    # `name` value declared in the SKILL.md frontmatter.
    declared_name: str
    # `description` value declared in the SKILL.md frontmatter.
    description: str
    # Basename used for the output archives (declared name, possibly
    # suffixed to resolve collisions).
    package_name: str
|
||||
|
||||
|
||||
def parse_frontmatter(skill_md: Path) -> tuple[str | None, str | None, str | None]:
    """Extract (name, description, error) from a SKILL.md YAML frontmatter.

    Returns ``(name, description, None)`` on success, or ``(None, None,
    message)`` describing why the frontmatter is invalid. Only flat
    ``key: value`` pairs are understood; other lines inside the frontmatter
    are silently ignored.
    """
    content = skill_md.read_text(encoding="utf-8")
    lines = content.splitlines()
    if not lines or lines[0].strip() != "---":
        return None, None, "missing YAML frontmatter opening delimiter '---'"

    # Find the closing delimiter, tolerating surrounding whitespace to match
    # the opening-delimiter check above. (The previous exact
    # `lines.index("---", 1)` wrongly rejected e.g. "--- " with a trailing
    # space even though the opening check accepted it.)
    end_idx = next(
        (i for i, line in enumerate(lines[1:], start=1) if line.strip() == "---"),
        None,
    )
    if end_idx is None:
        return None, None, "missing YAML frontmatter closing delimiter '---'"

    yaml_lines = lines[1:end_idx]
    values: dict[str, str] = {}
    for line in yaml_lines:
        match = KEY_VALUE_RE.match(line)
        if not match:
            continue
        key = match.group(1).strip()
        # Drop one layer of surrounding single/double quotes, if present.
        value = match.group(2).strip().strip("'\"")
        values[key] = value

    name = values.get("name")
    description = values.get("description")
    if not name or not description:
        return None, None, "frontmatter must include both 'name' and 'description'"
    return name, description, None
|
||||
|
||||
|
||||
def iter_skill_dirs(skills_root: Path) -> list[Path]:
    """Return the sorted child directories of *skills_root* that may hold skills.

    Hidden directories and names listed in EXCLUDED_DIRS are skipped.
    """
    candidates: list[Path] = []
    for entry in sorted(skills_root.iterdir()):
        if not entry.is_dir():
            continue
        if entry.name.startswith(".") or entry.name in EXCLUDED_DIRS:
            continue
        candidates.append(entry)
    return candidates
|
||||
|
||||
|
||||
def ensure_clean_output(output_dir: Path) -> tuple[Path, Path]:
    """Reset and return the per-skill and bundle output directories.

    Any previous contents of ``<output_dir>/per-skill`` and
    ``<output_dir>/bundles`` are removed so each run starts clean.
    """
    per_skill_dir = output_dir / "per-skill"
    bundles_dir = output_dir / "bundles"

    for directory in (per_skill_dir, bundles_dir):
        if directory.exists():
            shutil.rmtree(directory)
        directory.mkdir(parents=True, exist_ok=True)

    return per_skill_dir, bundles_dir
|
||||
|
||||
|
||||
def add_directory_to_zip(zip_file: ZipFile, source_dir: Path, arc_prefix: str | None = None) -> None:
    """Write every regular file under *source_dir* into *zip_file*.

    Archive member names are POSIX-style paths relative to *source_dir*;
    a truthy *arc_prefix* is prepended as a leading directory. Files are
    added in sorted order so archive layout is deterministic.
    """
    for entry in sorted(source_dir.rglob("*")):
        if not entry.is_file():
            continue
        arcname = entry.relative_to(source_dir).as_posix()
        zip_file.write(entry, f"{arc_prefix}/{arcname}" if arc_prefix else arcname)
|
||||
|
||||
|
||||
def create_skill_archives(skill: SkillRecord, per_skill_dir: Path) -> tuple[Path, Path]:
    """Package one skill as both a ``.skill`` and a ``.zip`` archive.

    Both archives have identical contents (the skill directory's files at
    the archive root). Returns ``(skill_path, zip_path)``.
    """
    skill_file = per_skill_dir / f"{skill.package_name}.skill"
    zip_file = per_skill_dir / f"{skill.package_name}.zip"

    # Same payload, two container extensions expected by different importers.
    for target in (skill_file, zip_file):
        with ZipFile(target, "w", compression=ZIP_DEFLATED) as archive:
            add_directory_to_zip(archive, skill.source_path)

    return skill_file, zip_file
|
||||
|
||||
|
||||
def validate_archive_contains_skill_md(archive_path: Path) -> tuple[bool, str]:
    """Spot-check one archive: SKILL.md must sit at the root and look valid.

    Returns ``(ok, human-readable message)``.
    """
    with ZipFile(archive_path, "r") as archive:
        members = set(archive.namelist())
        if "SKILL.md" not in members:
            return False, f"{archive_path.name}: SKILL.md not found at archive root"
        text = archive.read("SKILL.md").decode("utf-8", errors="replace")
    # A crude sanity check that the frontmatter keys survived packaging.
    if "name:" in text and "description:" in text:
        return True, f"{archive_path.name}: OK"
    return False, f"{archive_path.name}: SKILL.md missing 'name' or 'description'"
|
||||
|
||||
|
||||
def validate_bundle_contains_skills(bundle_path: Path, source_dir_names: list[str]) -> tuple[bool, str]:
    """Check the bundle has at least one entry under every skill directory.

    Returns ``(ok, human-readable message)`` listing any missing directories.
    """
    with ZipFile(bundle_path, "r") as archive:
        entries = archive.namelist()
    missing = [
        dir_name
        for dir_name in source_dir_names
        if not any(entry.startswith(f"{dir_name}/") for entry in entries)
    ]
    if missing:
        return False, f"bundle missing directories: {', '.join(missing)}"
    return True, "bundle contains all valid skill directories"
|
||||
|
||||
|
||||
def build_index(
    output_file: Path,
    skills_root: Path,
    valid_skills: list[SkillRecord],
    skipped: list[str],
    warnings: list[str],
    validations: list[tuple[bool, str]],
    bundle_path: Path,
) -> None:
    """Write a markdown summary of the packaging run to *output_file*."""
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    overall = "PASS" if all(ok for ok, _ in validations) else "FAIL"

    body: list[str] = [
        "# PACKAGES INDEX",
        "",
        f"- Generated: {timestamp}",
        f"- Skills root: `{skills_root}`",
        f"- Valid skills packaged: {len(valid_skills)}",
        "- Package formats: `.skill` + `.zip` (per skill)",
        f"- Bundle: `{bundle_path.name}`",
        f"- Validation status: {overall}",
        "",
        "## Included Skills",
        "",
    ]

    for record in valid_skills:
        body.append(
            f"- `{record.package_name}` (source dir: `{record.source_dir_name}`, "
            f"declared name: `{record.declared_name}`)"
        )

    # Warnings and skipped entries share the same "list or '- None'" shape.
    for heading, messages in (("## Warnings", warnings), ("## Skipped", skipped)):
        body.extend(["", heading, ""])
        body.extend([f"- {msg}" for msg in messages] if messages else ["- None"])

    body.extend(["", "## Validation", ""])
    body.extend(f"- [{'x' if ok else ' '}] {message}" for ok, message in validations)

    body.extend(["", "## Output Paths", "", "- `packages/per-skill/`", "- `packages/bundles/`"])
    output_file.write_text("\n".join(body) + "\n", encoding="utf-8")
|
||||
|
||||
|
||||
def package_skills(skills_root: Path, output_dir: Path, bundle_name: str, spot_check_count: int) -> int:
    """Discover, package, bundle, validate, and index all skills.

    Pipeline:
      1. Reset the output directories.
      2. Scan ``skills_root`` for skill directories; parse each SKILL.md,
         recording skipped dirs, warnings, and valid `SkillRecord`s.
      3. Create per-skill ``.skill``/``.zip`` archives and one bundle zip.
      4. Run validations (count check, SKILL.md spot checks, bundle check).
      5. Write PACKAGES_INDEX.md and print a summary.

    Returns 0 when all validations pass, 1 otherwise.
    """
    per_skill_dir, bundles_dir = ensure_clean_output(output_dir)

    skipped: list[str] = []      # "<dir>: <reason>" for directories not packaged
    warnings: list[str] = []     # non-fatal issues (name mismatches/collisions)
    valid_skills: list[SkillRecord] = []

    # Tracks how many times each package name has been claimed, so
    # collisions can be renamed deterministically ("name-2", "name-3", ...).
    used_names: dict[str, int] = {}
    for skill_dir in iter_skill_dirs(skills_root):
        skill_md = skill_dir / "SKILL.md"
        if not skill_md.exists():
            skipped.append(f"{skill_dir.name}: missing SKILL.md")
            continue

        declared_name, description, error = parse_frontmatter(skill_md)
        if error:
            skipped.append(f"{skill_dir.name}: {error}")
            continue
        # Narrowing for the type checker: error is None implies both values set.
        # NOTE(review): stripped under `python -O`; an explicit check would be safer.
        assert declared_name is not None and description is not None

        package_name = declared_name
        if declared_name != skill_dir.name:
            warnings.append(
                f"{skill_dir.name}: declared name '{declared_name}' differs from directory name"
            )

        if package_name in used_names:
            used_names[package_name] += 1
            new_name = f"{package_name}-{used_names[package_name]}"
            warnings.append(
                f"name collision for '{package_name}', renamed package to '{new_name}'"
            )
            package_name = new_name
        else:
            used_names[package_name] = 1

        valid_skills.append(
            SkillRecord(
                source_dir_name=skill_dir.name,
                source_path=skill_dir,
                declared_name=declared_name,
                description=description,
                package_name=package_name,
            )
        )

    # Per-skill archives: one .skill and one .zip each.
    for skill in valid_skills:
        create_skill_archives(skill, per_skill_dir)

    # Collection bundle: every skill under its own source-directory prefix.
    bundle_path = bundles_dir / bundle_name
    with ZipFile(bundle_path, "w", compression=ZIP_DEFLATED) as zf:
        for skill in valid_skills:
            add_directory_to_zip(zf, skill.source_path, arc_prefix=skill.source_dir_name)

    validations: list[tuple[bool, str]] = []

    # Count check: two archives per valid skill must exist on disk.
    expected = 2 * len(valid_skills)
    produced = len(list(per_skill_dir.glob("*.skill"))) + len(list(per_skill_dir.glob("*.zip")))
    validations.append(
        (
            expected == produced,
            f"package count check: expected {expected}, found {produced}",
        )
    )

    # Spot-check only the first N skills (both archive formats each).
    check_items = valid_skills[: max(0, spot_check_count)]
    for skill in check_items:
        skill_archive = per_skill_dir / f"{skill.package_name}.skill"
        zip_archive = per_skill_dir / f"{skill.package_name}.zip"
        validations.append(validate_archive_contains_skill_md(skill_archive))
        validations.append(validate_archive_contains_skill_md(zip_archive))

    validations.append(
        validate_bundle_contains_skills(
            bundle_path=bundle_path,
            source_dir_names=[record.source_dir_name for record in valid_skills],
        )
    )

    index_path = output_dir / "PACKAGES_INDEX.md"
    build_index(
        output_file=index_path,
        skills_root=skills_root,
        valid_skills=valid_skills,
        skipped=skipped,
        warnings=warnings,
        validations=validations,
        bundle_path=bundle_path,
    )

    # Human-readable run summary on stdout.
    print(f"Skills root: {skills_root}")
    print(f"Valid skills: {len(valid_skills)}")
    print(f"Skipped: {len(skipped)}")
    print(f"Warnings: {len(warnings)}")
    print(f"Per-skill output: {per_skill_dir}")
    print(f"Bundle output: {bundle_path}")
    print(f"Index: {index_path}")
    all_ok = all(ok for ok, _ in validations)
    print(f"Validation: {'PASS' if all_ok else 'FAIL'}")
    if not all_ok:
        for ok, message in validations:
            if not ok:
                print(f"  - {message}")
        return 1
    return 0
|
||||
|
||||
|
||||
def main() -> int:
    """CLI entry point: parse arguments and run the packager.

    Returns the process exit code: 0 on success, 1 on validation failure,
    2 when the skills root is missing or not a directory.
    """
    repo_root = Path(__file__).resolve().parents[1]
    default_skills_root = repo_root / "skill"
    parser = argparse.ArgumentParser(
        description="Create .skill/.zip import packages for each skill and a collection bundle."
    )
    parser.add_argument(
        "--skills-root",
        type=Path,
        default=default_skills_root,
        help="Path to skill collection root (default: <repo-root>/skill).",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=None,
        help="Output directory (default: <skills-root>/packages).",
    )
    parser.add_argument(
        "--bundle-name",
        type=str,
        default="mechanical-skills-collection.zip",
        help="Filename for the collection bundle.",
    )
    parser.add_argument(
        "--spot-check-count",
        type=int,
        default=3,
        help="How many skills to spot-check for SKILL.md in both archive formats.",
    )
    args = parser.parse_args()

    skills_root = args.skills_root.resolve()
    # Default matches the documented behavior ("<skills-root>/packages") and the
    # "packages" entry in EXCLUDED_DIRS. The previous default,
    # skills_root.parent / "package", contradicted both the --output-dir help
    # text and the index's "Output Paths" section.
    output_dir = (args.output_dir or (skills_root / "packages")).resolve()

    if not skills_root.exists() or not skills_root.is_dir():
        print(f"skills root does not exist or is not a directory: {skills_root}")
        return 2

    return package_skills(
        skills_root=skills_root,
        output_dir=output_dir,
        bundle_name=args.bundle_name,
        spot_check_count=max(0, args.spot_check_count),
    )
|
||||
|
||||
|
||||
# Run as a script: propagate the packager's exit code to the shell.
if __name__ == "__main__":
    sys.exit(main())
|
||||
Reference in New Issue
Block a user