A simple tool for archiving media files from an XML (RSS) feed.
Just run: python i.py
after installing Python and the dependencies, e.g. python -m pip install requests tqdm
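A typical session looks like this (the feed URL below is just a placeholder):

    python -m pip install requests tqdm
    python i.py
    Enter URL to scrape (XML feed): https://example.com/feed.xml

Downloaded files end up under ./media/<feed host>/.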
i.py
import os
import requests
import xml.etree.ElementTree as ET
from urllib.parse import urlparse
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import json
# Constants
OUTPUT_DIR = "./media"
LOG_FILE = "./log.txt"
MAX_RETRIES = 3
MAX_THREADS = 4  # Number of simultaneous downloads
# Custom headers so requests look like a regular browser; some servers
# reject clients that send no User-Agent
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
# Function to sanitize file names by removing or replacing invalid characters
def sanitize_filename(filename):
    return re.sub(r'[\\/*?:"<>|]', '_', filename)
def fetch_url_content(url):
    """Fetch content from a URL with custom headers."""
    print(f"Fetching XML from: {url}")
    response = requests.get(url, headers=HEADERS, timeout=30)
    response.raise_for_status()
    return response.content
def parse_xml(content):
    """Extract media links and metadata from XML."""
    print("Parsing XML content...")
    root = ET.fromstring(content)
    items = []
    for item in root.findall('channel/item'):
        title = item.findtext('title', '').strip()
        enclosure = item.find('enclosure')
        if enclosure is not None:
            url = enclosure.get('url', '').strip()
            if url:
                items.append({
                    'url': url,
                    'title': title,
                })
    print(f"Found {len(items)} media files.")
    return items
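# parse_xml assumes a plain (non-namespaced) RSS 2.0 layout; a minimal,
# hypothetical feed it can handle looks like this:
#
#   <rss version="2.0">
#     <channel>
#       <item>
#         <title>Episode 1</title>
#         <enclosure url="https://example.com/episode1.mp3" type="audio/mpeg"/>
#       </item>
#     </channel>
#   </rss>
#
# Feeds that wrap their elements in XML namespaces will not match the
# 'channel/item' search above.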
def download_media_item(item, output_dir):
    """Download a media file and save it to the specified directory."""
    url = item['url']
    title = item['title']
    sanitized_title = sanitize_filename(title)  # Avoid invalid characters in the filename
    # Build a Windows-safe file path: prefer the sanitized title for .mp3 links,
    # otherwise fall back to the file name from the URL path (query string stripped)
    if '.mp3' in url:
        file_name = os.path.join(output_dir, f"{sanitized_title}.mp3")
    else:
        file_name = os.path.join(output_dir, os.path.basename(urlparse(url).path))
    print(f"Downloading: {title}\nURL: {url}\nSaving to: {file_name}")
    attempt = 0
    success = False
    # Retry on network errors, backing off 2, 4, then 8 seconds between attempts
    while attempt < MAX_RETRIES and not success:
        try:
            response = requests.get(url, stream=True, headers=HEADERS, timeout=30)
            response.raise_for_status()
            with open(file_name, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # Filter out keep-alive chunks
                        f.write(chunk)
            print(f"Download successful: {file_name}")
            success = True
        except requests.exceptions.RequestException as e:
            attempt += 1
            print(f"Error downloading {url}: {e}. Attempt {attempt}/{MAX_RETRIES}")
            time.sleep(2 ** attempt)  # Exponential backoff
        except Exception as e:
            print(f"General error: {e}")
            break
    return success, item
def download_media(media, output_dir):
    """Download media files in parallel threads with progress tracking."""
    os.makedirs(output_dir, exist_ok=True)
    completed = []
    # Progress bar tracks finished files across all worker threads
    with tqdm(total=len(media), desc="Downloading", unit="file") as pbar:
        with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
            futures = {executor.submit(download_media_item, item, output_dir): item for item in media}
            for future in as_completed(futures):
                success, item = future.result()
                pbar.update(1)
                if success:
                    completed.append(item)
                else:
                    # Log failures so they can be retried on the next run
                    with open(LOG_FILE, 'a') as log:
                        log.write(f"Failed to download: {item['title']} - {item['url']}\n")
    return completed
def save_progress(media, filename="download_progress.json"):
    """Save download progress to a file."""
    progress = {"downloads": media}
    with open(filename, "w") as f:
        json.dump(progress, f)
def load_progress(filename="download_progress.json"):
    """Load download progress from a file."""
    if os.path.exists(filename):
        with open(filename, "r") as f:
            return json.load(f).get("downloads", [])
    return []
def main(url):
    """Main function to handle XML sources and download media."""
    try:
        content = fetch_url_content(url)
        media = parse_xml(content)
        # Create the output directory based on the feed's host
        parsed_url = urlparse(url)
        host_dir = os.path.join(OUTPUT_DIR, parsed_url.netloc)
        # Load previous progress (if available) and resume where we left off
        progress = load_progress()
        remaining_media = [item for item in media if item not in progress]
        if remaining_media:
            completed = download_media(remaining_media, host_dir)
            # Record only the items that actually downloaded, so failures
            # are retried on the next run
            save_progress(progress + completed)
        else:
            print("All media files have already been downloaded.")
    except Exception as e:
        print(f"An error occurred: {e}")
if __name__ == "__main__":
    url = input("Enter URL to scrape (XML feed): ").strip()
    main(url)
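Progress is saved to download_progress.json next to the script, and failed
downloads are appended to ./log.txt. To force a full re-download, delete
download_progress.json before running again.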