forked from Selig/openclaw-skill
Initial commit: OpenClaw Skill Collection
6 custom skills (assign-task, dispatch-webhook, daily-briefing, task-capture, qmd-brain, tts-voice) with technical documentation. Compatible with Claude Code, OpenClaw, Codex CLI, and OpenCode.
This commit is contained in:
402
openclaw-knowhow-skill/patches/add_fetch_and_save.py
Executable file
402
openclaw-knowhow-skill/patches/add_fetch_and_save.py
Executable file
@@ -0,0 +1,402 @@
|
||||
#!/usr/bin/env python3
"""
Patch script to add fetch_and_save tool to Skill Seekers MCP.

This tool fetches web pages and saves directly to files WITHOUT passing content
through the Claude API, saving ~97% of tokens compared to WebFetch.

Usage:
    python add_fetch_and_save.py

After running:
- Restart Claude Code to reload MCP servers
- Use: mcp__skill-seeker__fetch_and_save(url="...", output="...")
"""

# NOTE(review): `re` is imported but never used in this script — kept to avoid
# changing module-level behavior; confirm before removing.
import re
import sys
from pathlib import Path

# Locate the installed skill_seekers package; the files patched below live
# under its installation directory. Abort early if it is not installed.
try:
    import skill_seekers
    # Directory of the installed skill_seekers package (root of patch targets).
    SKILL_SEEKERS_PATH = Path(skill_seekers.__file__).parent
except ImportError:
    print("❌ skill-seekers not installed. Run: pip install skill-seekers")
    sys.exit(1)

print(f"📁 Found skill-seekers at: {SKILL_SEEKERS_PATH}")
|
||||
|
||||
# ============================================================================
# 1. Add fetch_and_save_tool to scraping_tools.py
# ============================================================================

# Source text appended verbatim to scraping_tools.py by patch_scraping_tools().
# This is data, not code executed here: it runs inside the patched MCP server,
# which is presumably expected to have TextContent and Path already in scope
# (TODO confirm against skill_seekers' scraping_tools.py imports). The `\\n`
# escapes below keep literal `\n` sequences intact in the emitted file.
FETCH_AND_SAVE_CODE = '''

async def fetch_and_save_tool(args: dict) -> list[TextContent]:
    """
    Fetch web pages and save directly to files without passing content through Claude API.

    This tool is optimized for token efficiency - it downloads content using Python's
    httpx library and saves directly to disk. Only metadata (status, bytes, path) is
    returned, NOT the actual content. This saves ~97% of tokens compared to WebFetch.

    Supports:
    - Single URL or batch of URLs
    - Automatic markdown extraction from HTML
    - Raw markdown file preservation
    - Configurable output paths

    Args:
        args: Dictionary containing:
            - url (str, optional): Single URL to fetch
            - urls (list, optional): List of {"url": str, "output": str} objects for batch
            - output (str, optional): Output file path (required if using single url)
            - extract_markdown (bool, optional): Extract markdown from HTML (default: True)
            - timeout (int, optional): Request timeout in seconds (default: 30)
            - rate_limit (float, optional): Delay between requests in seconds (default: 0.5)

    Returns:
        List[TextContent]: Summary of fetched files (status, bytes, errors) - NOT content

    Example:
        # Single file
        fetch_and_save(url="https://docs.example.com/guide.md", output="docs/guide.md")

        # Batch mode
        fetch_and_save(urls=[
            {"url": "https://docs.example.com/intro.md", "output": "docs/intro.md"},
            {"url": "https://docs.example.com/api.md", "output": "docs/api.md"}
        ])
    """
    import asyncio

    try:
        import httpx
        from bs4 import BeautifulSoup
    except ImportError as e:
        return [TextContent(type="text", text=f"❌ Missing dependency: {e}\\nInstall with: pip install httpx beautifulsoup4")]

    # Parse arguments
    single_url = args.get("url")
    urls_list = args.get("urls", [])
    single_output = args.get("output")
    extract_markdown = args.get("extract_markdown", True)
    timeout_val = args.get("timeout", 30)
    rate_limit = args.get("rate_limit", 0.5)

    # Build task list
    tasks = []
    if single_url and single_output:
        tasks.append({"url": single_url, "output": single_output})
    if urls_list:
        tasks.extend(urls_list)

    if not tasks:
        return [TextContent(type="text", text="❌ Error: Must provide 'url' + 'output' or 'urls' list")]

    # Results tracking
    results = {
        "success": [],
        "failed": [],
        "total_bytes": 0,
    }

    def extract_text_from_html(html_content: str) -> str:
        """Extract clean text content from HTML, preserving structure as markdown."""
        soup = BeautifulSoup(html_content, "html.parser")

        # Remove script and style elements
        for tag in soup(["script", "style", "nav", "footer", "header"]):
            tag.decompose()

        # Find main content area
        main = soup.select_one("main, article, [role='main'], .content, .markdown-body")
        if not main:
            main = soup.body or soup

        lines = []

        # Extract title
        title = soup.select_one("title")
        if title:
            lines.append(f"# {title.get_text().strip()}\\n")

        # Process content
        for elem in main.find_all(["h1", "h2", "h3", "h4", "h5", "h6", "p", "pre", "code", "li", "blockquote"]):
            text = elem.get_text().strip()
            if not text:
                continue

            if elem.name.startswith("h"):
                level = int(elem.name[1])
                lines.append(f"\\n{'#' * level} {text}\\n")
            elif elem.name == "pre":
                code = elem.get_text()
                lang = ""
                code_elem = elem.find("code")
                if code_elem:
                    classes = code_elem.get("class", [])
                    for cls in classes:
                        if cls.startswith("language-"):
                            lang = cls.replace("language-", "")
                            break
                lines.append(f"\\n```{lang}\\n{code}\\n```\\n")
            elif elem.name == "li":
                lines.append(f"- {text}")
            elif elem.name == "blockquote":
                lines.append(f"> {text}")
            elif elem.name == "p":
                lines.append(f"\\n{text}\\n")

        return "\\n".join(lines)

    async def fetch_single(client, task: dict) -> dict:
        """Fetch a single URL and save to file."""
        url = task["url"]
        output_path = Path(task["output"])

        try:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            response = await client.get(url, follow_redirects=True)
            response.raise_for_status()
            content = response.text

            if extract_markdown and not url.endswith(".md"):
                if content.strip().startswith("<!DOCTYPE") or content.strip().startswith("<html"):
                    content = extract_text_from_html(content)

            output_path.write_text(content, encoding="utf-8")

            return {
                "status": "ok",
                "url": url,
                "output": str(output_path),
                "bytes": len(content.encode("utf-8")),
            }

        except Exception as e:
            return {
                "status": "error",
                "url": url,
                "output": str(output_path),
                "error": str(e),
            }

    async def run_all():
        async with httpx.AsyncClient(
            timeout=timeout_val,
            headers={"User-Agent": "Mozilla/5.0 (Skill Seeker Documentation Fetcher)"},
            follow_redirects=True,
        ) as client:
            for i, task in enumerate(tasks):
                result = await fetch_single(client, task)

                if result["status"] == "ok":
                    results["success"].append(result)
                    results["total_bytes"] += result["bytes"]
                else:
                    results["failed"].append(result)

                if i < len(tasks) - 1 and rate_limit > 0:
                    await asyncio.sleep(rate_limit)

    await run_all()

    output_lines = [
        f"📥 Fetch and Save Complete",
        f"",
        f"✅ Success: {len(results['success'])} files",
        f"❌ Failed: {len(results['failed'])} files",
        f"📦 Total: {results['total_bytes']:,} bytes",
        f"",
    ]

    if results["success"]:
        output_lines.append("### Saved Files:")
        for r in results["success"][:20]:
            output_lines.append(f"  - {r['output']} ({r['bytes']:,} bytes)")
        if len(results["success"]) > 20:
            output_lines.append(f"  ... and {len(results['success']) - 20} more")

    if results["failed"]:
        output_lines.append("\\n### Failed:")
        for r in results["failed"][:10]:
            output_lines.append(f"  - {r['url']}: {r['error']}")
        if len(results["failed"]) > 10:
            output_lines.append(f"  ... and {len(results['failed']) - 10} more")

    return [TextContent(type="text", text="\\n".join(output_lines))]
'''
|
||||
|
||||
TOOL_REGISTRATION = '''
|
||||
|
||||
@safe_tool_decorator(
|
||||
description="Fetch web pages and save directly to files. Token-efficient: downloads via Python httpx, saves to disk, returns only metadata (NOT content). Saves ~97% tokens vs WebFetch. Supports batch mode and HTML-to-markdown conversion."
|
||||
)
|
||||
async def fetch_and_save(
|
||||
url: str | None = None,
|
||||
output: str | None = None,
|
||||
urls: list | None = None,
|
||||
extract_markdown: bool = True,
|
||||
timeout: int = 30,
|
||||
rate_limit: float = 0.5,
|
||||
) -> str:
|
||||
"""
|
||||
Fetch web pages and save directly to files without passing content through Claude API.
|
||||
|
||||
Args:
|
||||
url: Single URL to fetch (use with 'output')
|
||||
output: Output file path for single URL mode
|
||||
urls: List of {"url": str, "output": str} dicts for batch mode
|
||||
extract_markdown: Extract markdown from HTML pages (default: true)
|
||||
timeout: Request timeout in seconds (default: 30)
|
||||
rate_limit: Delay between requests in seconds (default: 0.5)
|
||||
|
||||
Returns:
|
||||
Summary with success/failure counts and file sizes - NOT content.
|
||||
"""
|
||||
args = {
|
||||
"extract_markdown": extract_markdown,
|
||||
"timeout": timeout,
|
||||
"rate_limit": rate_limit,
|
||||
}
|
||||
if url:
|
||||
args["url"] = url
|
||||
if output:
|
||||
args["output"] = output
|
||||
if urls:
|
||||
args["urls"] = urls
|
||||
|
||||
result = await fetch_and_save_impl(args)
|
||||
if isinstance(result, list) and result:
|
||||
return result[0].text if hasattr(result[0], "text") else str(result[0])
|
||||
return str(result)
|
||||
|
||||
'''
|
||||
|
||||
|
||||
def patch_scraping_tools(base_path=None, code=None):
    """Append the fetch_and_save_tool implementation to scraping_tools.py.

    Args:
        base_path: Root directory of the skill_seekers package. Defaults to
            the installed SKILL_SEEKERS_PATH; exposed as a parameter so the
            patch logic can be exercised against a throwaway tree.
        code: Source text to append. Defaults to FETCH_AND_SAVE_CODE.

    Returns:
        bool: True on success (or if the file was already patched),
        False if the target file does not exist.
    """
    base = Path(base_path) if base_path is not None else SKILL_SEEKERS_PATH
    if code is None:
        code = FETCH_AND_SAVE_CODE
    file_path = base / "mcp" / "tools" / "scraping_tools.py"

    if not file_path.exists():
        print(f"❌ File not found: {file_path}")
        return False

    content = file_path.read_text(encoding="utf-8")

    # Idempotency guard: never append the implementation twice.
    if "fetch_and_save_tool" in content:
        print("✓ scraping_tools.py already patched")
        return True

    # Normalize trailing whitespace, then append the new tool at end of file.
    file_path.write_text(content.rstrip() + "\n" + code, encoding="utf-8")
    print("✅ Patched scraping_tools.py")
    return True
|
||||
|
||||
|
||||
def patch_init(base_path=None):
    """Export fetch_and_save_impl from mcp/tools/__init__.py.

    Inserts an import alias after the build_how_to_guides import block and
    adds the name to __all__. Fixes a silent-failure bug in the original:
    if the anchor text was not found, str.replace changed nothing but the
    function still rewrote the file, printed success, and returned True.
    Now the required anchor is verified BEFORE writing.

    Args:
        base_path: Root directory of the skill_seekers package. Defaults to
            SKILL_SEEKERS_PATH; exposed as a parameter for testability.

    Returns:
        bool: True on success (or already patched), False if the file is
        missing or the import anchor could not be located.
    """
    base = Path(base_path) if base_path is not None else SKILL_SEEKERS_PATH
    file_path = base / "mcp" / "tools" / "__init__.py"

    if not file_path.exists():
        print(f"❌ File not found: {file_path}")
        return False

    content = file_path.read_text(encoding="utf-8")

    # Idempotency guard.
    if "fetch_and_save_impl" in content:
        print("✓ __init__.py already patched")
        return True

    import_anchor = (
        "from .scraping_tools import (\n"
        "    build_how_to_guides_tool as build_how_to_guides_impl,\n"
        ")"
    )
    import_line = (
        "from .scraping_tools import (\n"
        "    fetch_and_save_tool as fetch_and_save_impl,\n"
        ")"
    )
    if import_anchor not in content:
        # Bail out before writing anything: a blind replace would claim
        # success while patching nothing.
        print(f"❌ Import anchor not found in {file_path}; skill-seekers layout may have changed")
        return False
    content = content.replace(import_anchor, import_anchor + "\n" + import_line)

    # Add the export to __all__ (non-fatal: the server imports the name
    # explicitly, so a missing __all__ entry only affects star-imports).
    all_anchor = '"extract_config_patterns_impl",'
    if all_anchor in content:
        content = content.replace(all_anchor, all_anchor + '\n    "fetch_and_save_impl",')
    else:
        print("⚠️ __all__ anchor not found; export not added to __all__")

    file_path.write_text(content, encoding="utf-8")
    print("✅ Patched __init__.py")
    return True
|
||||
|
||||
|
||||
def patch_server(base_path=None, registration=None):
    """Register the fetch_and_save tool in mcp/server_fastmcp.py.

    Adds fetch_and_save_impl to the tools import list and inserts the tool
    registration source before the PACKAGING TOOLS section. Fixes a
    silent-failure bug in the original: when either anchor was absent,
    str.replace changed nothing but the function still rewrote the file,
    printed success, and returned True. Both anchors are now verified
    BEFORE anything is written.

    Args:
        base_path: Root directory of the skill_seekers package. Defaults to
            SKILL_SEEKERS_PATH; exposed as a parameter for testability.
        registration: Registration source to insert. Defaults to
            TOOL_REGISTRATION.

    Returns:
        bool: True on success (or already patched), False if the file is
        missing or either anchor could not be located.
    """
    base = Path(base_path) if base_path is not None else SKILL_SEEKERS_PATH
    if registration is None:
        registration = TOOL_REGISTRATION
    file_path = base / "mcp" / "server_fastmcp.py"

    if not file_path.exists():
        print(f"❌ File not found: {file_path}")
        return False

    content = file_path.read_text(encoding="utf-8")

    # Idempotency guard.
    if "fetch_and_save_impl" in content:
        print("✓ server_fastmcp.py already patched")
        return True

    import_anchor = "extract_test_examples_impl,"
    section_anchor = (
        "# ============================================================================\n"
        "# PACKAGING TOOLS"
    )
    if import_anchor not in content or section_anchor not in content:
        print(f"❌ Expected anchors not found in {file_path}; skill-seekers layout may have changed")
        return False

    # Add the import alias after the existing impl import.
    content = content.replace(import_anchor, import_anchor + "\n    fetch_and_save_impl,")
    # Insert the registration code just before the PACKAGING TOOLS section.
    content = content.replace(section_anchor, registration + "\n" + section_anchor)

    file_path.write_text(content, encoding="utf-8")
    print("✅ Patched server_fastmcp.py")
    return True
|
||||
|
||||
|
||||
def main():
    """Apply all three patches and print follow-up instructions."""
    banner = "=" * 60
    print(banner)
    print("🔧 Adding fetch_and_save tool to Skill Seekers MCP")
    print(banner)
    print()

    # Run every patch step even when an earlier one fails, so all problems
    # are surfaced in a single pass.
    ok = True
    for step in (patch_scraping_tools, patch_init, patch_server):
        ok = step() and ok

    print()
    if not ok:
        print("❌ Some patches failed. Check errors above.")
        sys.exit(1)

    print(banner)
    print("✅ Patch complete!")
    print()
    print("Next steps:")
    print("  1. Restart Claude Code to reload MCP servers")
    print("  2. Use the tool:")
    print()
    print("  mcp__skill-seeker__fetch_and_save(")
    print('      url="https://example.com/doc.md",')
    print('      output="local/path/doc.md"')
    print("  )")
    print()
    print("  Or batch mode:")
    print()
    print("  mcp__skill-seeker__fetch_and_save(urls=[")
    print('      {"url": "...", "output": "..."},')
    print('      {"url": "...", "output": "..."}')
    print("  ])")
    print(banner)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user