#!/bin/env python
"""Download the images and videos referenced by lists of page links.

Every file named on the command line is read as a whitespace-separated list
of links.  Links containing ``/pin/`` are treated as single media pages and
links containing ``/user/`` as user boards.  Downloads are written to the
output directory (``ss`` unless ``-d/--directory`` is given).
"""

# Provenance: recovered from a cgit-rendered patch whose whitespace had been
# collapsed (cgit v1.2.3):
#   commit  bbc7c8a62fe53543ce1faa72585c3b98d039ab06
#   author  ache
#   date    Fri, 13 Jan 2023 02:51:14 +0100
#   subject Make a script to download images
# The patch created down_imgs.py (mode 100755).

import argparse
import csv
import os
import re
import sys
from pathlib import Path
from urllib.parse import urlparse

import lxml
import lxml.html
import requests


DEBUG_MODE = False
dirOut = Path("ss")


class Spinner(object):
    """Callable that returns the next frame of a braille spinner on each call."""

    def __init__(self):
        self.SPINNER = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
        # The index is advanced *before* a frame is returned, so the first
        # call yields SPINNER[2].
        self.SPINNER_status = 1

    def __call__(self):
        """Advance to the next frame (wrapping around) and return it."""
        self.SPINNER_status = (self.SPINNER_status + 1) % len(self.SPINNER)
        return self.SPINNER[self.SPINNER_status]


quickSpinner = Spinner()


def pretty_print_Request_header(req):
    """Print the method, URL and headers of a prepared request (debug aid)."""
    print(f"{req.method} {req.url}")

    for k, v in req.headers.items():
        print(f"{k}: {v}")


def _browserHeaders(pageURL):
    """Return the request headers sent with every media download.

    The Referer is the root of the site hosting ``pageURL``.
    """
    u = urlparse(pageURL)
    # NOTE(review): "br" is advertised here; presumably requests only decodes
    # brotli when a brotli package is installed -- confirm before relying on it.
    return {
        "Referer": f"{u.scheme}://{u.hostname}/",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-encoding": "gzip, deflate, br",
    }


def outputFileFromURL(url):
    """Return a not-yet-existing path inside ``dirOut`` for ``url``.

    The file name is the last component of the URL path.  When that name is
    already taken, ``_1``, ``_2``, ... is inserted before the suffix until a
    free name is found.  The result is returned as a string.
    """
    urlPath = Path(urlparse(url).path)
    finalDest = dirOut.joinpath(urlPath.name)

    # Stem and suffix do not change between attempts; compute them once.
    stem, suffix = urlPath.stem, urlPath.suffix
    i = 1

    while finalDest.exists():
        finalDest = dirOut.joinpath(f"{stem}_{i}{suffix}")
        i += 1

    return str(finalDest)


def downloadURL(s, url, idLink, output, headers=None):
    """Stream ``url`` into the file ``output`` and report progress on stdout.

    Args:
        s: ``requests.Session`` used to perform the request.
        url: Address of the media to fetch.
        idLink: Identifier shown in the progress messages.
        output: Destination file path.
        headers: Optional extra request headers.

    Failures (non-OK status, network or file errors) are reported on stdout
    and never raised, so one bad link does not stop the whole run.
    """
    name = Path(urlparse(url).path).name

    # The context manager releases the streamed connection on every path.
    with s.get(url, stream=True, headers=headers) as r:
        if DEBUG_MODE:
            pretty_print_Request_header(r.request)

        if not r.ok:
            # Previously ignored silently, which left the progress line
            # without a newline.
            print(f"\r☠️ error {name} - {idLink}")
            if DEBUG_MODE:
                print(f"HTTP {r.status_code}")
            return

        print(f"\r⬇️ {name}", end='')

        try:
            with open(output, "wb") as f:
                print(f"\r⬇️ {quickSpinner()} {name}", end='')
                chunk_size = 4 * 1024

                for chunk in r.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
            print(f"\rDownloaded {name} - {idLink}")
        except Exception as e:
            # Per-download boundary: report the failure and carry on.
            print(f"\r☠️ error {name} - {idLink}")
            if DEBUG_MODE:
                print(e)


def _downloadUserboardImage(s, index, subIndex, boardName, link, match, urlImg):
    """Download one image found on a user board.

    This is the part of the per-match processing that survived in the
    recovered source.  ``match`` is a regex match whose group 1 is the image
    identifier; ``urlImg`` is the image address.

    NOTE(review): how ``urlImg`` was derived from ``match`` was lost, so the
    caller has to supply it.
    """
    # NOTE(review): this assignment and the left half of the debug message are
    # inferred by analogy with downloadImageFromPage; only `> {outputFile}")`
    # survived in the recovered text -- confirm against the real file.
    outputFile = outputFileFromURL(urlImg)

    if DEBUG_MODE:
        print(f"\n{urlImg} >> {outputFile}")

    idLink = match.group(1)
    print(f"Link n°{index}.{subIndex} - {boardName}>{idLink}", end='')

    downloadURL(s, urlImg, idLink, outputFile, _browserHeaders(link))


def downloadImagesFromUserboard(index, link):
    """Fetch a user board page and download the images it lists.

    NOTE(review): the recovered source is damaged in this function.  The
    regular expression passed to ``re.compile`` (``findImagesRegex``), the
    loop over its matches and the derivation of each image URL were stripped
    out.  They are deliberately not guessed here: the board is fetched as in
    the original and an error is reported.  Restore the pattern from the real
    file and call ``_downloadUserboardImage`` for every match.
    """
    u = urlparse(link)
    boardName = Path(u.path).stem

    print(f"Link n°{index} - {boardName}", end='')

    with requests.Session() as s:
        resp = s.get(link)

        if DEBUG_MODE:
            pretty_print_Request_header(resp.request)

        # The image pattern is unrecoverable (see the docstring); nothing can
        # be extracted from resp.text without it.
        print(f"\r☠️ error {boardName} - image pattern missing from recovered source")


def downloadVideoFromScript(index, urlBase, idVid, scriptElem):
    """Extract a video URL from a player ``<script>`` element and download it.

    Args:
        index: Position of the link in the list (progress messages only).
        urlBase: Address of the page holding the player; used for the Referer.
        idVid: Identifier shown in the progress messages.
        scriptElem: lxml element whose serialized text contains the
            ``player.updateSrc([`` call.
    """
    print(f"Link n°{index} - Vidéo {idVid}", end='')

    urlRegex = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    scriptHTML = lxml.html.tostring(scriptElem).decode('utf8')
    startsWithCode = 'player.updateSrc([\n'

    # find() returns -1 when the marker is absent; index() would raise
    # ValueError instead, so the original `>= 0` check could never be false.
    startIndex = scriptHTML.find(startsWithCode)
    if startIndex >= 0:
        scriptHTML = scriptHTML[startIndex + len(startsWithCode):]

    match = re.search(urlRegex, scriptHTML)
    if not match:
        print(f"No found in {scriptHTML}")
        return

    urlVideo = match.group(0)
    print(f"Link : {urlVideo} !", end='\n')

    # The `$-_` range in the pattern also matches `'`, so the match can run
    # past the closing quote of the string literal; cut it there.
    urlVideo = urlVideo.partition('\'')[0]

    outputFile = outputFileFromURL(urlVideo)
    headers = _browserHeaders(urlBase)

    print(f"Download to {outputFile}")
    with requests.Session() as s:
        downloadURL(s, urlVideo, idVid, outputFile, headers)


def downloadImageFromPage(index, link):
    """Download the image (or video) shown on a single media page.

    Links that do not contain a run of at least seven digits are ignored.
    The page is searched for ``.image_frame img``; when there is none, a
    ``.image_frame video`` is looked for and its script is handed to
    ``downloadVideoFromScript``.
    """
    match = re.search(r'(\d{7,})', link)

    if not match:
        return

    idLink = match.group(1)
    print(f"Link n°{index} - {idLink}", end='')

    with requests.Session() as s:
        resp = s.get(link)

        if not (resp.ok and resp.status_code == 200):
            return

        try:
            html = lxml.html.fromstring(resp.content)
            imgs = html.cssselect(".image_frame img")

            if not imgs:
                vids = html.cssselect(".image_frame video")
                vid_script = html.cssselect(".image_frame script")
                if vids:
                    print(": it's a video !")
                    # Without a script element there is nothing to extract;
                    # the original relied on a swallowed IndexError here.
                    if vid_script:
                        downloadVideoFromScript(index, link, idLink, vid_script[-1])
                return

            img = imgs[0]

            if DEBUG_MODE:
                print(lxml.html.tostring(img))

            if 'src' in img.attrib:
                url = img.attrib['src']
                outputFile = outputFileFromURL(url)

                if DEBUG_MODE:
                    print(f"\n{url} >> {outputFile}")

                downloadURL(s, url, idLink, outputFile, _browserHeaders(link))

        except Exception as e:
            # Best effort per page, but no longer a bare `except:` (which also
            # trapped KeyboardInterrupt) and no longer completely silent.
            print(f"\r☠️ error {idLink}")
            if DEBUG_MODE:
                print(e)


def main():
    """Parse the command line and process every link of every list file."""
    global dirOut

    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--directory", help="Output directory")
    parser.add_argument('files', nargs=argparse.REMAINDER)
    args = parser.parse_args()

    if args.directory:
        dirOut = Path(args.directory)

    # Validated for the default directory as well, so a missing "ss" is
    # reported once here rather than as one error per download.
    if not dirOut.exists():
        print(f"Output dir \"{dirOut}\" doesn't exists")
        return

    if not dirOut.is_dir():
        print(f"Output dir \"{dirOut}\" isn't a directory")
        return

    for file in args.files:
        # Read the whole list first so the file is closed before downloading.
        with open(file) as f_listLink:
            links = f_listLink.read().split()

        for index, link in enumerate(links):
            if '/pin/' in link:
                downloadImageFromPage(index, link)
            if '/user/' in link:
                downloadImagesFromUserboard(index, link)


if __name__ == "__main__":
    main()