#!/usr/bin/env python3
"""Download the media referenced by post, user-board and playlist links.

The script reads one or more text files containing whitespace-separated
links and dispatches each link on a marker found in it ('/post/', '/user/'
or '/playlists/'). Files are written into ``dirOut``.
"""
import argparse
import csv
import json
import os
import re
import sys
from pathlib import Path
from urllib.parse import urljoin, urlparse

import lxml
import lxml.html
import requests

DEBUG_MODE = True
dirOut = Path("r34")

# Header values sent with every media request.
_ACCEPT = "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8"
_ACCEPT_ENCODING = "gzip, deflate, br"

# Number of items requested per playlist API page.
_PLAYLIST_PAGE_SIZE = 60

# URL fragments naming a reduced-size rendition, mapped to the fragment used
# for the full-size file.
_QUALITY_REPLACEMENTS = {
    'mov480.': 'mov.',
    'mov720.': 'mov.',
    'mov256.': 'mov.',
    'pic256.': 'pic.',
    'pic480.': 'pic.',
    'pic720.': 'pic.',
    'picsmall.': 'pic.',
}

# NOTE(review): the pattern literal originally used by
# downloadImagesFromUserboard was truncated in the reviewed source and could
# not be recovered. This stand-in assumes board pages embed <img> tags whose
# src contains the numeric post id (6+ digits, like the id matched in
# downloadImageFromPage) -- confirm against a live page before relying on it.
_USERBOARD_IMAGE_RE = re.compile(
    r'<img[^>]+src="(?P<url>[^"]*?(?P<id>\d{6,})[^"]*)"'
)


class Spinner(object):
    """Callable that returns the next frame of a braille spinner on each call."""

    def __init__(self):
        self.SPINNER = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
        self.SPINNER_status = 1

    def __call__(self):
        """Advance to the next frame (wrapping around) and return it."""
        self.SPINNER_status = (self.SPINNER_status + 1) % len(self.SPINNER)
        return self.SPINNER[self.SPINNER_status]


quickSpinner = Spinner()


def pretty_print_Request_header(req):
    """Print the method, URL and headers of a request object, one per line."""
    print(f"{req.method} {req.url}")
    for k, v in req.headers.items():
        print(f"{k}: {v}")


def _buildHeaders(u):
    """Return the request headers used for media downloads.

    ``u`` is the parsed URL of the page the media was found on; its scheme
    and hostname form the Referer.
    """
    return {
        "Referer": f"{u.scheme}://{u.hostname}/",
        "Accept": _ACCEPT,
        "Accept-encoding": _ACCEPT_ENCODING,
    }


def outputFileFromURL(url):
    """Map a URL to a destination path inside ``dirOut``.

    Returns a tuple ``(alreadyExists, path)``. ``alreadyExists`` is True when
    a file with the URL's base name is already present; in that case ``path``
    is the first free ``<stem>_<n><suffix>`` variant instead of the plain name.
    """
    remote = Path(urlparse(url).path)
    stem, suffix = remote.stem, remote.suffix
    finalDest = dirOut.joinpath(Path(remote.name))
    alreadyExists = False
    i = 1
    while finalDest.exists():
        alreadyExists = True
        finalDest = dirOut.joinpath(Path(f"{stem}_{i}{suffix}"))
        i += 1
    return alreadyExists, str(finalDest)


def _discardPartial(partial):
    """Remove a leftover partial download, ignoring a missing file."""
    try:
        os.remove(partial)
    except FileNotFoundError:
        pass


def downloadURL(s, url, idLink, output, headers=None):
    """Stream ``url`` into the file ``output`` using session ``s``.

    ``idLink`` is only used in progress and error messages. ``headers`` is an
    optional dict of extra request headers.

    The body is first written to ``<output>.part`` and renamed once complete,
    so an interrupted transfer never leaves a truncated file under the final
    name (which later runs would report as already downloaded).

    Returns True on success, False when the server answered with an error
    status or the transfer/write failed. Failures are reported on stdout and
    never raised.
    """
    name = Path(urlparse(url).path).name
    partial = f"{output}.part"
    try:
        # The context manager releases the streamed connection in all cases.
        with s.get(url, stream=True, headers=headers) as r:
            if not r.ok:
                print(f"\r☠️ error {name} - {idLink}")
                if DEBUG_MODE:
                    print(f"HTTP {r.status_code}")
                return False
            if DEBUG_MODE:
                print(f"\n{url} >> {output}")
            print(f"\r⬇️ {name}", end='')
            with open(partial, "wb") as f:
                print(f"\r⬇️ {quickSpinner()} {name}", end='')
                for chunk in r.iter_content(chunk_size=4 * 1024):
                    f.write(chunk)
        os.replace(partial, output)
    except (requests.RequestException, OSError) as err:
        print(f"\r☠️ error {name} - {idLink}")
        if DEBUG_MODE:
            print(err)
        _discardPartial(partial)
        return False
    print(f"\rDownloaded {name} - {idLink}")
    return True


def downloadFromItems(items, s, u):
    """Download one file for each playlist item.

    ``items`` is a list of dicts as read from the playlist API response; the
    code uses the keys 'duration' (None for non-video items) and 'imageLinks'
    (a list of dicts with a 'url'). ``s`` is the session to download with and
    ``u`` the parsed playlist URL, used for the Referer header.

    For every item the first link that contains a known reduced-size marker
    is rewritten to the full-size variant and downloaded. For video items
    only links ending in '.mp4' are considered. An item whose file already
    exists is skipped and processing continues with the next item.
    """
    headers = _buildHeaders(u)
    for item in items:
        isVideo = item['duration'] is not None
        for file in item['imageLinks']:
            url = file['url']
            if isVideo and not url.endswith('.mp4'):
                continue
            marker = next((m for m in _QUALITY_REPLACEMENTS if m in url), None)
            if marker is None:
                # Links without a known size marker are not downloaded.
                continue
            url = url.replace(marker, _QUALITY_REPLACEMENTS[marker])
            exists, outputFile = outputFileFromURL(url)
            if exists:
                print("Already downloaded")
            else:
                downloadURL(s, url, "???", outputFile, headers)
            # One file per item: stop scanning this item's links.
            break


def downloadImagesFromPlaylist(index, link):
    """Download every item of the playlist designated by ``link``.

    The playlist id is the last path component of ``link``. Items are fetched
    page by page from the playlist API until a page comes back empty or a
    request fails; ``index`` is only used in the progress message.
    """
    s = requests.Session()
    u = urlparse(link)
    playlistId = Path(u.path).stem
    print(f"Link n°{index} - {playlistId}")
    nbPage = 0
    while True:
        urlPlaylistFormat = (
            f"https://rule34.world/api/playlist-item?playlistId={playlistId}"
            f"&Skip={nbPage * _PLAYLIST_PAGE_SIZE}&Take={_PLAYLIST_PAGE_SIZE}"
            f"&DisableTotal=true"
        )
        try:
            resp = s.get(urlPlaylistFormat)
        except requests.RequestException as err:
            print(f"\r☠️ error {playlistId} - page {nbPage}")
            if DEBUG_MODE:
                print(err)
            break
        if resp.status_code != 200:
            # Stop instead of requesting ever-increasing offsets forever.
            print(f"\r☠️ error {playlistId} - page {nbPage}")
            if DEBUG_MODE:
                print(f"HTTP {resp.status_code}")
            break
        data = json.loads(resp.content)
        items = data['items']
        if not items:
            break
        print(f"Ok ! {len(items)}")
        downloadFromItems(items, s, u)
        nbPage += 1


def downloadImagesFromUserboard(index, link):
    """Download the images found on the user board page ``link``.

    The board name is the last path component of ``link``; ``index`` is only
    used in progress messages.

    NOTE(review): the original image-matching pattern and the head of the
    loop were truncated in the reviewed source. The matching below relies on
    the stand-in ``_USERBOARD_IMAGE_RE`` and skips files that already exist,
    as the other download functions do -- confirm both against the intended
    behavior.
    """
    s = requests.Session()
    u = urlparse(link)
    boardName = Path(u.path).stem
    print(f"Link n°{index} - {boardName}", end='')
    try:
        resp = s.get(link)
    except requests.RequestException as err:
        print(f"\r☠️ error {boardName}")
        if DEBUG_MODE:
            print(err)
        return
    if resp.status_code != 200:
        print(f"\r☠️ error {boardName}")
        if DEBUG_MODE:
            print(f"HTTP {resp.status_code}")
        return
    headers = _buildHeaders(u)
    for subIndex, match in enumerate(_USERBOARD_IMAGE_RE.finditer(resp.text)):
        urlImg = urljoin(link, match.group('url'))
        exists, outputFile = outputFileFromURL(urlImg)
        if exists:
            print("Already downloaded")
            continue
        if DEBUG_MODE:
            print(f"\n{urlImg} >> {outputFile}")
        idLink = match.group('id')
        print(f"Link n°{index}.{subIndex} - {boardName}>{idLink}", end='')
        downloadURL(s, urlImg, idLink, outputFile, headers)


def downloadVideoFromSource(index, urlBase, idVid, urlVideo):
    """Download the video at ``urlVideo`` found on the page ``urlBase``.

    The 'mov480' rendition marker is replaced by 'mov' before downloading.
    ``index`` and ``idVid`` are only used in messages. Nothing is downloaded
    when a file with the same name already exists.
    """
    s = requests.Session()
    print(f"Link n°{index} - Vidéo {idVid}", end='')
    print(f"Link : {urlVideo} !", end='\n')
    u = urlparse(urlBase)
    urlVideo = urlVideo.replace('mov480', 'mov')
    exists, outputFile = outputFileFromURL(urlVideo)
    if exists:
        print("Already downloaded")
        return
    headers = _buildHeaders(u)
    print(f"Download to {outputFile}")
    downloadURL(s, urlVideo, idVid, outputFile, headers)


def downloadImageFromPage(index, link):
    """Download the image, or failing that the video, shown on a post page.

    The post id is the first run of six or more digits in ``link``; links
    without one are ignored. ``index`` is only used in progress messages.
    Errors while fetching or parsing the page are reported on stdout and
    never raised.
    """
    match = re.search(r'(\d{6,})', link)
    if not match:
        return
    idLink = match.group(1)
    print(f"Link n°{index} - {idLink}\n", end='')
    s = requests.Session()
    # Parsed up front so the error handler below can always use it.
    u = urlparse(link)
    try:
        resp = s.get(link)
        if resp.status_code != 200:
            return
        html = lxml.html.fromstring(resp.content)
        imgs = html.cssselect("img.img.shadow-base")
        if not imgs:
            vids = html.cssselect("video.video.shadow-base source")
            if vids:
                url = urljoin(link, vids[0].attrib['src'])
                downloadVideoFromSource(index, link, idLink, url)
            return
        src = imgs[0].attrib.get('src')
        if not src:
            return
        url = urljoin(link, src).replace('picsmall', 'pic')
        exists, outputFile = outputFileFromURL(url)
        if exists:
            print("Already downloaded")
            return
        downloadURL(s, url, idLink, outputFile, _buildHeaders(u))
    except Exception as err:
        # Best-effort boundary: one broken page must not stop the whole run.
        print(f"\r☠️ error {Path(u.path).name} - {idLink}")
        if DEBUG_MODE:
            print(err)


def main():
    """Parse the command line and process every link of every listed file."""
    global dirOut
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--directory", help="Output directory")
    parser.add_argument('files', nargs=argparse.REMAINDER)
    args = parser.parse_args()
    if args.directory:
        dirOut = Path(args.directory)
    if not dirOut.exists():
        print(f"Output dir \"{dirOut}\" doesn't exists")
        return
    if not dirOut.is_dir():
        print(f"Output dir \"{dirOut}\" isn't a directory")
        return
    # Every handler whose marker occurs in the link is run, in this order.
    handlers = (
        ('/post/', downloadImageFromPage),
        ('/user/', downloadImagesFromUserboard),
        ('/playlists/', downloadImagesFromPlaylist),
    )
    for file in args.files:
        with open(file) as f_listLink:
            links = f_listLink.read().split()
        for index, link in enumerate(links):
            for marker, handler in handlers:
                if marker in link:
                    handler(index, link)


if __name__ == "__main__":
    main()