```python
            # Skip the download if the file already exists and its size matches
            if os.path.exists(local_filename) and os.path.getsize(local_filename) == content_length:
                logging.info(f"{local_filename} already exists and is complete. Skipping...")
                return

            # Enable resuming a partial download via an HTTP Range header
            resume_header = {}
            if os.path.exists(local_filename):
                resume_header['Range'] = f'bytes={os.path.getsize(local_filename)}-'

            # Stream the file to disk and show a progress bar
            with session.get(file_url, headers=resume_header, stream=True) as r:
                r.raise_for_status()
                mode = 'ab' if 'Range' in resume_header else 'wb'
                with open(local_filename, mode) as f, tqdm(
                    desc=local_filename,
                    total=content_length,
                    initial=os.path.getsize(local_filename) if 'Range' in resume_header else 0,
                    unit='B',
                    unit_scale=True,
                    unit_divisor=1024,
                ) as bar:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
                        bar.update(len(chunk))
            logging.info(f"Saved {local_filename}")
            return
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to download {file_url} (attempt {attempt + 1}): {e}")
        except Exception as e:
            logging.error(f"Unexpected error while downloading {file_url}: {e}")
    logging.error(f"Failed to download {file_url} after {retries} attempts.")
```
```python
        links_to_download = []
        for link in soup.find_all("a", href=True):
            href = link['href']
            if '.' in href:
                file_url = href if href.startswith("http") else urljoin(url, href)
                if file_types:
                    if any(file_url.endswith(file_type) for file_type in file_types):
                        links_to_download.append(file_url)
                else:
                    links_to_download.append(file_url)

        return links_to_download
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to process {url}: {e}")
        return []
```
```python
    # Extract links using a thread pool
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(extract_links, url, file_types) for url in urls]
        for future in as_completed(futures):
            result = future.result()
            if result:
                all_links_to_download.extend(result)

    # Report how many files were found
    logging.info(f"Found {len(all_links_to_download)} files to download.")

    total_size = 0
    downloaded_files = 0

    # Download the files using a thread pool
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(download_file, file_url) for file_url in all_links_to_download]
        for future in as_completed(futures):
            future.result()
            downloaded_files += 1
```
```python
def read_urls_from_xlsx(filename):
    urls = []
    wb = openpyxl.load_workbook(filename)
    ws = wb.active
    for row in ws.iter_rows(min_row=2, values_only=True):
        if row[0]:
            urls.append(row[0])
    return urls
```
This function assumes that the first row of the Excel file is a header row, so it starts reading URLs from the second row.
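If you do not already have a spreadsheet in that layout, a minimal test file can be generated with openpyxl. This is only a sketch; the file name `urls.xlsx`, the header text, and the example URLs are placeholders:

```python
import openpyxl

# Create a one-column workbook: header in row 1, URLs from row 2 onward,
# which is the layout read_urls_from_xlsx() expects.
wb = openpyxl.Workbook()
ws = wb.active
ws.append(["URL"])                              # header row (skipped by min_row=2)
ws.append(["https://example.com/downloads/"])   # placeholder URL
ws.append(["https://example.com/reports/"])     # placeholder URL
wb.save("urls.xlsx")
```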
Running the script
Finally, we add the following code to the script to run the whole program and support passing arguments from the command line:
```python
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Batch download files from web pages.")
    parser.add_argument("xlsx_filename", help="Path to the Excel file containing URLs.")
    parser.add_argument("--max_workers", type=int, default=5, help="Maximum number of threads to use.")
    parser.add_argument("--file_types", nargs="*", help="List of file types to download (e.g. pdf, jpg).")
    args = parser.parse_args()

    if os.path.exists(args.xlsx_filename):
        urls = read_urls_from_xlsx(args.xlsx_filename)
        if urls:
            main(urls, args.max_workers, args.file_types)
        else:
            logging.error("No URLs found in the Excel file.")
    else:
        logging.error("The specified Excel file does not exist.")
```
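Assuming the script is saved as, say, `downloader.py` (the file name, thread count, and extensions here are only illustrative), it could then be run as `python downloader.py urls.xlsx --max_workers 8 --file_types pdf jpg`, which reads the URLs from `urls.xlsx` and downloads only PDF and JPG files with up to eight concurrent threads.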