add python scraper using claude ai
commit 907dc82938
scrape.py (new file, 187 lines)

@@ -0,0 +1,187 @@
import os
import requests
from urllib.parse import urljoin, urlparse, urlunparse
from bs4 import BeautifulSoup
import mimetypes
import re
import logging


class WebflowScraper:
    def __init__(self, base_url, output_dir="website"):
        self.base_url = base_url.rstrip('/')
        self.output_dir = output_dir
        self.visited_urls = set()
        self.session = requests.Session()
        self.encoding = 'utf-8'

        # Set up logging (use a named logger rather than the logging module itself)
        logging.basicConfig(level=logging.INFO, format='%(message)s')
        self.logger = logging.getLogger(__name__)

    def clean_url(self, url):
        """Remove query parameters and fragments from URL"""
        parsed = urlparse(url)
        path = parsed.path
        if not path or path == '/':
            path = '/index.html'
        return urlunparse((parsed.scheme, parsed.netloc, path, '', '', ''))

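    # Illustrative example (not part of the original file): clean_url() above maps
    # 'https://example.com/about?utm_source=x#team' to 'https://example.com/about',
    # and the bare root 'https://example.com/' to 'https://example.com/index.html'.
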
    def is_same_domain(self, url):
        """Check if URL belongs to the same domain"""
        return urlparse(self.base_url).netloc == urlparse(url).netloc

    def get_local_path(self, url):
        """Convert URL to local file path"""
        clean_url = self.clean_url(url)
        parsed = urlparse(clean_url)
        path = parsed.path.lstrip('/')

        # Handle root URL
        if not path:
            return os.path.join(self.output_dir, 'index.html')

        # Handle paths without extensions
        if '.' not in os.path.basename(path):
            if path.endswith('/'):
                path = os.path.join(path, 'index.html')
            else:
                path = f"{path}.html"

        return os.path.join(self.output_dir, path)

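    # Illustrative example (not part of the original file): with the default output_dir,
    # get_local_path() above maps 'https://example.com/blog/post' to
    # 'website/blog/post.html' and 'https://example.com/' to 'website/index.html'.
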
    def get_relative_path(self, from_path, to_path):
        """Get relative path from one file to another"""
        from_dir = os.path.dirname(from_path)
        return os.path.relpath(to_path, from_dir)

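    # Illustrative example (not part of the original file): from 'website/blog/post.html'
    # to 'website/css/site.css', get_relative_path() above returns '../css/site.css',
    # which is the value written back into the rewritten HTML.
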
    def save_file(self, url, content, content_type):
        """Save content to appropriate file"""
        local_path = self.get_local_path(url)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)

        # Write bytes as-is; write text explicitly as UTF-8 rather than the platform default
        mode = 'wb' if isinstance(content, bytes) else 'w'
        encoding = None if mode == 'wb' else self.encoding
        with open(local_path, mode, encoding=encoding) as f:
            f.write(content)

        self.logger.info(f"Saved file: {local_path}")
        return local_path

    def download_asset(self, url):
        """Download and save an asset file"""
        try:
            response = self.session.get(url)
            response.raise_for_status()

            content_type = response.headers.get('content-type', '').split(';')[0]
            local_path = self.save_file(url, response.content, content_type)
            return local_path
        except Exception as e:
            self.logger.error(f"Error downloading asset {url}: {str(e)}")
            return None

    def process_html(self, url, html_content):
        """Process HTML content and download all referenced assets"""
        soup = BeautifulSoup(html_content, 'html.parser')
        current_file_path = self.get_local_path(url)

        # Remove any <base> tag so relative paths resolve against the local files
        base_tag = soup.find('base')
        if base_tag:
            base_tag.decompose()

        # Process different types of assets
        asset_selectors = {
            'img': 'src',
            'link': 'href',
            'script': 'src',
            'video': 'src',
            'source': 'src',
            'audio': 'src',
            'iframe': 'src',
        }

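        # Editorial note: this tag-to-attribute map covers the common single-URL
        # references; responsive 'srcset' attributes and URLs inside inline CSS are
        # not rewritten by the loop below.
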
        # Process assets
        for tag, attr in asset_selectors.items():
            for element in soup.find_all(tag):
                if attr in element.attrs:
                    asset_url = element[attr]
                    if not asset_url or asset_url.startswith(('data:', 'blob:', 'javascript:', '#')):
                        continue

                    asset_url = urljoin(url, asset_url)

                    if self.is_same_domain(asset_url):
                        if asset_url not in self.visited_urls:
                            self.visited_urls.add(asset_url)
                            local_path = self.download_asset(asset_url)
                            if local_path:
                                relative_path = self.get_relative_path(current_file_path, local_path)
                                element[attr] = relative_path
                        else:
                            local_path = self.get_local_path(asset_url)
                            relative_path = self.get_relative_path(current_file_path, local_path)
                            element[attr] = relative_path

        # Process internal links
        for a in soup.find_all('a', href=True):
            href = urljoin(url, a['href'])
            if href.startswith(('mailto:', 'tel:', '#')):
                continue

            if self.is_same_domain(href):
                clean_href = self.clean_url(href)
                if clean_href not in self.visited_urls:
                    self.crawl(clean_href)

                local_path = self.get_local_path(clean_href)
                relative_path = self.get_relative_path(current_file_path, local_path)
                a['href'] = relative_path

        # Update form actions
        for form in soup.find_all('form', action=True):
            action_url = urljoin(url, form['action'])
            if self.is_same_domain(action_url):
                local_path = self.get_local_path(action_url)
                relative_path = self.get_relative_path(current_file_path, local_path)
                form['action'] = relative_path

        # Handle UTF-8 encoding issues
        html_content = str(soup)
        html_content = html_content.encode('utf-8', 'replace').decode('utf-8')

        return html_content

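    # Editorial note: process_html() above does double duty: it rewrites asset, link,
    # and form URLs to local relative paths, and it calls self.crawl() on every
    # same-domain page link it finds, which is what drives the recursive crawl.
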
    def crawl(self, url):
        """Crawl the website starting from the given URL"""
        if url in self.visited_urls:
            return

        self.visited_urls.add(url)
        self.logger.info(f"Crawling: {url}")

        try:
            response = self.session.get(url)
            response.raise_for_status()

            content_type = response.headers.get('content-type', '').split(';')[0]

            if 'text/html' in content_type:
                processed_html = self.process_html(url, response.text)
                self.save_file(url, processed_html, content_type)
            else:
                self.save_file(url, response.content, content_type)

        except Exception as e:
            self.logger.error(f"Error crawling {url}: {str(e)}")

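    # Editorial note: crawl() and process_html() recurse into each other, so a site
    # with very deep link chains could, in principle, hit Python's default recursion
    # limit (about 1000 frames); an explicit queue would avoid that if it ever matters.
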
    def scrape(self):
        """Start the scraping process"""
        self.logger.info(f"Starting to scrape {self.base_url}")
        if os.path.exists(self.output_dir):
            self.logger.warning(f"Warning: Output directory {self.output_dir} already exists. Files may be overwritten.")
        self.crawl(self.base_url)
        self.logger.info("Scraping completed")


# Usage
scraper = WebflowScraper('https://laconic-staging.webflow.io')
scraper.scrape()
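
# A minimal sketch (not part of the original commit) of how the same entry point could
# be wrapped in a __main__ guard, so that importing scrape.py elsewhere would not start
# a crawl; the URL and output directory are the same values used above:
#
#     if __name__ == "__main__":
#         scraper = WebflowScraper('https://laconic-staging.webflow.io', output_dir='website')
#         scraper.scrape()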