Files
py34/scraper/scraper.py
2025-07-26 22:52:02 +02:00

97 lines
2.7 KiB
Python

from bs4 import BeautifulSoup
import os
import base64
from threading import Thread
from retry import retry
from PIL import Image
import cloudscraper
import io
# Toggle for on-disk response caching in get(); off by default.
enable_cache = False
# Shared Cloudflare-bypassing HTTP session reused by every get() call.
scraper = cloudscraper.CloudScraper()
def bs4(*args, **kwargs) -> BeautifulSoup:
    """Build a BeautifulSoup document pinned to the stdlib html.parser backend."""
    return BeautifulSoup(*args, **kwargs, features="html.parser")
@retry(Exception, tries=5, delay=3)
def get(url: str) -> bytes:
    """Fetch *url* through the shared cloudscraper session and return the raw body.

    When ``enable_cache`` is set, responses are persisted under ``cache/``
    keyed by the base32-encoded URL, and repeat calls are served from disk.

    Raises:
        Exception: when the HTTP status is not 200; the ``retry`` decorator
            re-attempts up to 5 times with a 3-second delay.
    """
    if enable_cache:
        # exist_ok avoids the check-then-create race of the original
        # os.path.exists + os.mkdir pair.
        os.makedirs("cache", exist_ok=True)
        # base32 keeps the cache filename filesystem-safe for any URL.
        path = "cache/" + base64.b32encode(url.encode()).decode()
        if os.path.exists(path):
            with open(path, "rb") as file:
                return file.read()
    res = scraper.get(url)
    if res.status_code != 200:
        raise Exception(f"Failed to get {url}")
    data = res.content
    if enable_cache:
        with open(path, "wb") as file:
            file.write(data)
    return data
class Post:
    """A single gallery post.

    Constructing a Post immediately starts downloading its thumbnail on a
    background thread; call ``_join()`` to wait for that download before
    reading ``thumbnail_data`` / ``thumbnail``.
    """

    def __init__(self, id: int, image_dir: int, image_id: str, tags: list[str]):
        self.id = id
        self.image_dir = image_dir
        self.image_id = image_id
        # Copy so later mutation of the caller's list cannot affect this post.
        self.tags = tags.copy()
        # Filled in asynchronously by the download thread below; they remain
        # None if the download has not finished (or failed).
        self.thumbnail_data: bytes | None = None
        # Image.open returns a PIL.Image.Image subclass; the public base type
        # is the reliable annotation (Image.ImageFile is not a stable
        # attribute path on the PIL.Image module).
        self.thumbnail: Image.Image | None = None

        def _download():
            self.thumbnail_data = get(
                f"https://wimg.rule34.xxx/thumbnails/{self.image_dir}/thumbnail_{self.image_id}.jpg"
            )
            self.thumbnail = Image.open(io.BytesIO(self.thumbnail_data))

        self._thread = Thread(target=_download)
        self._thread.start()

    def _join(self):
        """Block until the thumbnail download thread has finished."""
        self._thread.join()

    def __str__(self):
        return f"<Post {self.id}:{self.image_dir}/{self.image_id}>"

    def __repr__(self):
        return self.__str__()
def get_posts(url: str) -> list[Post]:
    """Scrape one listing page at *url* and return its posts.

    Each Post starts downloading its thumbnail on construction; this function
    joins every download thread before returning, so thumbnails are settled
    (loaded or failed) by the time the list is handed back.

    On any parse error the fetched document is dumped to
    ``errored-document.html`` for inspection and the exception re-raised.
    """
    posts: list[Post] = []
    document = get(url)
    try:
        for entry in bs4(document).find_all("div", {"class": "image-list"})[0].children:
            # Skip whitespace-only nodes and anything that is not a <span>.
            if str(entry).strip() == "": continue
            if entry.name != "span": continue
            img = entry.find_all("img")[0]
            # Cloudflare sometimes rewrites src into data-cfsrc; take the last
            # two path segments of whichever is present (dir / filename).
            if "src" in img.attrs:
                img_src = img["src"].split("?")[0].split("/")[-2:]
            else:
                img_src = img["data-cfsrc"].split("?")[0].split("/")[-2:]
            posts.append(Post(
                int(entry["id"][1:]),                     # "p12345" -> 12345
                int(img_src[0]),                          # image directory number
                img_src[1].split("_")[1].split(".")[0],   # thumbnail_[id].jpg -> id
                img["alt"].split(" "),                    # alt text is the tag list
            ))
        # Wait for every thumbnail download before returning.
        for post in posts:
            post._join()
        return posts
    except Exception:
        # Keep the failing page for debugging, then re-raise with a bare
        # `raise` (idiomatic; the original `raise e` needlessly rebinds).
        with open("errored-document.html", "wb") as file:
            file.write(document)
        raise