Merge branch 'master' into scraper

2025-08-09 17:23:49 +02:00
6 changed files with 144 additions and 44 deletions

View File

@@ -32,3 +32,11 @@ def is_nochick(data: BeautifulSoup | str | bytes) -> bool:
 def is_toodeep(data: BeautifulSoup | str | bytes) -> bool:
     return _is_header(bs4(data), "unable to search this deep in temporarily.")
+def is_view(data: BeautifulSoup | str | bytes) -> bool:
+    doc = bs4(data)
+    els = doc.find_all("img", attrs={"id": "image"})
+    if len(els) == 0:
+        return False
+    return True
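The predicates in this file all normalize their input through a bs4() helper that sits outside this hunk. A minimal sketch of what that helper presumably does, inferred from the call sites above (this implementation is an assumption, not part of the commit):

from bs4 import BeautifulSoup

def bs4(data: BeautifulSoup | str | bytes) -> BeautifulSoup:
    # Assumed behavior: pass already-parsed documents through unchanged...
    if isinstance(data, BeautifulSoup):
        return data
    # ...and parse raw str/bytes HTML otherwise
    return BeautifulSoup(data, "html.parser")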

View File

@@ -1,23 +1,24 @@
 from .post import Post
-from .scraper import scraper, ScraperException
-from .url import parse_thumbnail_url
-from .dockid import is_nochick, is_toodeep
+from .scraper import scraper, ScraperException, CaptchaException
+from .url import parse_thumbnail_url, ListURL
+from .dockid import is_nochick, is_toodeep, is_captcha
+from concurrent.futures import ThreadPoolExecutor
-import urllib.parse
 from threading import Thread
+from retry import retry
 class ListException(Exception):
     def __init__(self, document: bytes, *argv):
-        super(self, *argv)
+        super().__init__(*argv)
         self.document = document
+@retry(CaptchaException, tries=5, delay=3, jitter=2)
 class List:
     def __init__(self, tags: list[str], offset: int = 0, fetch_thumbnails: bool = True):
         self.posts: list[Post] = []
-        tags = "+".join(map(urllib.parse.quote_plus, tags))
-        document = scraper.get_html(f"https://rule34.xxx/index.php?page=post&s=list&tags={tags}&pid={offset}")
+        document = scraper.get_html(ListURL(tags, offset))
         if is_nochick(document):
             return
@@ -25,6 +26,9 @@ class List:
         if is_toodeep(document):
             raise ListException(document, "Search too deep")
+        if is_captcha(document):
+            raise CaptchaException("Received captcha")
         try:
             for entry in document.find_all("div", {"class": "image-list"})[0].children:
                 # Skip garbage
@@ -41,27 +45,31 @@ class List:
                 # Extract image
                 img = entry.find_all("img")[0]
                 if "src" in img.attrs:
-                    img_src = parse_thumbnail_url(img["src"])
+                    img_src = img["src"]
                 else:
-                    img_src = parse_thumbnail_url(img["data-cfsrc"])
+                    img_src = img["data-cfsrc"]
+                # Is it a deleted post?
+                if img_src.split('?')[0].endswith("thumbnail_.jpg"):
+                    # Post has been deleted, continue
+                    continue
+                # Parse thumbnail url
+                img_src = parse_thumbnail_url(img_src)
                 # Append post
                 self.posts.append(Post(
                     post_id,
                     img_src.dir,
                     img_src.id,
-                    img["alt"].split(" "),
+                    sorted(filter(bool, map(str.strip, img["alt"].split(" ")))),
                 ))
             # Download thumbnails
             if fetch_thumbnails:
-                threads = [Thread(target=Post.get_thumbnail, args=(post,)) for post in self.posts]
-                for thread in threads:
-                    thread.start()
-                for thread in threads:
-                    thread.join()
+                with ThreadPoolExecutor(max_workers=5) as pool:
+                    for post in self.posts:
+                        pool.submit(Post.get_thumbnail_data, post)
         except ScraperException as ex:
             raise ex
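A hedged usage sketch of the new List behavior (the r34 package/module names are assumptions; the class and exception names come from the diff): a captcha page now raises CaptchaException, which the @retry wrapper absorbs up to five times before letting it propagate, and thumbnails are fetched through a bounded ThreadPoolExecutor instead of one unbounded Thread per post.

from r34.list import List, ListException      # package/module paths are assumptions
from r34.scraper import CaptchaException

try:
    page = List(["tag_a", "tag_b"], offset=0, fetch_thumbnails=True)
    for post in page.posts:
        print(post.tags)                       # tags are stripped, blanks filtered out, sorted
except CaptchaException:
    # Only reached after the @retry(tries=5, delay=3, jitter=2) wrapper gives up
    print("captcha page persisted across retries")
except ListException as ex:
    # Raised for "too deep" searches; ex.document holds the offending HTML
    print("search rejected:", ex)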

View File

@@ -13,22 +13,26 @@ class Post:
         self.tags: list[str] = tags.copy()
         self._thumbnail_data: bytes | None = thumbnail
-        self._thumbnail: ImageFile | None = Image.open(BytesIO(thumbnail)) if thumbnail else None
+        self._thumbnail: ImageFile | None = None
         self._image_format: str | None = None
         self._image_data: bytes | None = None
         self._image: ImageFile | None = None
     def empty(id: int) -> "Post":
         return Post(id, 0, "00", [], b"")
     def get_thumbnail(self) -> ImageFile:
-        if self._thumbnail:
+        if self._thumbnail is not None:
             return self._thumbnail
         self._thumbnail = Image.open(BytesIO(self.get_thumbnail_data()))
         return self._thumbnail
     def get_thumbnail_data(self) -> bytes:
-        if self._thumbnail_data:
+        if self._thumbnail_data is not None:
             return self._thumbnail_data
         self._thumbnail_data = scraper.get(ThumbnailURL(self.image_dir, self.image_id))
         return self._thumbnail_data
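One subtlety worth noting: the switch from truthiness to "is not None" is not cosmetic. Post.empty() seeds _thumbnail_data with b"", which is falsy, so the old checks would treat a legitimately cached (empty) payload as missing and trigger a network fetch. A tiny illustration:

cached = b""                 # what Post.empty() stores as the thumbnail payload
if cached:                   # old check: False, so the cache would be bypassed
    print("cache hit (old check)")
if cached is not None:       # new check: True, the cached value is honored
    print("cache hit (new check)")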

View File

@@ -3,30 +3,57 @@ from cloudscraper import CloudScraper
 from requests import Response
 from retry import retry
 from bs4 import BeautifulSoup
+import time
 class ScraperException(Exception):
     def __init__(self, res: Response, *argv: any):
-        super(Exception, *argv)
+        super().__init__(*argv)
         self.response = res
+class CaptchaException(Exception):
+    def __init__(self, *argv: any):
+        # Reset scraper
+        global scraper
+        scraper.reset()
+        # Construct the exception
+        super().__init__(*argv)
 class Scraper:
     def __init__(self):
         self._scraper: CloudScraper = CloudScraper()
-    def _get(self, url: str, body: bool) -> bytes | Response:
-        res: Response = self._scraper.get(url)
-        if not body:
-            return res
-        if res.status_code != 200:
-            raise ScraperException(res, "Request did not succeed")
-        return res.content
     def __del__(self):
         self.close()
+    @retry(Exception, tries=5, delay=3)
+    def _get(self, url: str, body: bool) -> bytes | Response:
+        while True:
+            res: Response = self._scraper.get(url)
+            res.close()
+            if res.status_code == 429:
+                self.reset()
+                time.sleep(10)
+                continue
+            if not body:
+                return res
+            if res.status_code != 200:
+                raise ScraperException(res, "Request did not succeed")
+            return res.content
+    @retry(Exception, tries=5, delay=5)
+    def _retry_get(self, url: str, body: bool) -> bytes | Response:
+        return self._get(url, body=body)
     def close(self):
         self._scraper.close()
+    def reset(self):
+        self._scraper.close()
+        self._scraper = CloudScraper()
     def get(self, url: str, retry: bool = True, body: bool = True):
         if retry:
             return self._retry_get(url, body=body)
+        return self._get(url, body=body)
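The bottom of this module is not shown in the hunk, but other files import a ready-made instance (from .scraper import scraper), so the file presumably ends with a module-level singleton. A sketch of that assumption, plus the resulting call path (note that _get also carries its own @retry, so the two layers can stack to as many as 25 attempts in the worst case):

# Assumed bottom of the module, implied by "from .scraper import scraper" elsewhere
scraper = Scraper()

# Call path: get() -> _retry_get() (up to 5 attempts, 5 s apart) -> _get(),
# which additionally resets the CloudScraper session and sleeps 10 s on HTTP 429.
data = scraper.get("https://rule34.xxx/index.php?page=post&s=view&id=1", retry=True)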

View File

@@ -1,10 +1,28 @@
-from urllib.parse import urlparse
+from urllib.parse import urlparse, quote_plus
 from os.path import splitext
 IMAGE_FORMATS = ["jpeg", "jpg", "png", "gif", "mp4", "webm"]
+class ViewURL:
+    def __init__(self, post_id: int):
+        self.id = post_id
+    def __str__(self) -> str:
+        return f"https://rule34.xxx/index.php?page=post&s=view&id={self.id}"
+class ListURL:
+    def __init__(self, tags: list[str], offset: int):
+        self.tags = tags
+        self.offset = offset
+    def __str__(self) -> str:
+        tags = "+".join(map(quote_plus, self.tags))
+        return f"https://rule34.xxx/index.php?page=post&s=list&tags={tags}&pid={self.offset}"
 class ImageURL:
     def __init__(self, image_dir: int, image_id: str, image_format: str):
         self.dir: int = image_dir
@@ -15,6 +33,15 @@ class ImageURL:
         return f"https://wimg.rule34.xxx//images/{self.dir}/{self.id}.{self.format}"
+class SampleURL:
+    def __init__(self, image_dir: int, image_id: str):
+        self.dir: int = image_dir
+        self.id: str = image_id
+    def __str__(self) -> str:
+        return f"https://wimg.rule34.xxx//samples/{self.dir}/sample_{self.id}.jpg"
 class ThumbnailURL:
     def __init__(self, image_dir: int, image_id: str):
         self.dir: int = image_dir
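These URL objects render themselves through __str__, which is why the rest of the commit can hand them straight to the scraper (as in scraper.get_html(ListURL(tags, offset)) above). For example:

print(ViewURL(1234567))
# https://rule34.xxx/index.php?page=post&s=view&id=1234567
print(ListURL(["long hair", "solo"], 42))
# https://rule34.xxx/index.php?page=post&s=list&tags=long+hair+solo&pid=42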

View File

@@ -1,11 +1,17 @@
 from .post import Post
-from .url import ImageURL, ThumbnailURL, parse_image_url
+from .url import ViewURL, ImageURL, SampleURL, ThumbnailURL, parse_image_url
 from .scraper import scraper
+from .dockid import is_view
 from io import BytesIO
 from PIL import Image
 from PIL.ImageFile import ImageFile
+class ViewMissingException(Exception):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
 class ViewTags:
     def __init__(self, cpr: list[str], chr: list[str], art: list[str], gen: list[str], met: list[str]):
         self.copyright = cpr.copy()
@@ -35,13 +41,19 @@ class ViewTags:
 class View:
     def __init__(self, id: int):
         self.id = int(id)
-        self._image_data: bytes | None = None
-        self._image: ImageFile | None = None
-        self._image_url: ImageURL | None = None
-        self._thumb_data: bytes | None = None
-        self._thumb: ImageFile | None = None
-        self._thumb_url: ThumbnailURL | None = None
-        document = scraper.get_html(f"https://rule34.xxx/index.php?page=post&s=view&id={id}")
+        self._image_data: bytes | None = None
+        self._image: ImageFile | None = None
+        self.image_url: ImageURL | None = None
+        self._sample_data: bytes | None = None
+        self._sample: ImageFile | None = None
+        self.sample_url: SampleURL | None = None
+        self._thumb_data: bytes | None = None
+        self._thumb: ImageFile | None = None
+        self.thumb_url: ThumbnailURL | None = None
+        document = scraper.get_html(ViewURL(self.id))
+        if not is_view(document):
+            raise ViewMissingException("View does not exist")
         tag_bar = document.find_all("ul", attrs={"id": "tag-sidebar"})[0]
         cpr = []
@@ -71,9 +83,10 @@ class View:
                 label = ent.text.lower().strip()
                 match label:
                     case "original image":
-                        self._image_url = parse_image_url(ent.find_all("a")[0]["href"])
-                        self._thumb_url = ThumbnailURL(self._image_url.dir, self._image_url.id)
+                        self.image_url = parse_image_url(ent.find_all("a")[0]["href"])
+                        self.sample_url = SampleURL(self.image_url.dir, self.image_url.id)
+                        self.thumb_url = ThumbnailURL(self.image_url.dir, self.image_url.id)
     def get_image(self) -> ImageFile:
@@ -86,10 +99,23 @@ class View:
     def get_image_data(self) -> bytes:
         if self._image_data is not None:
             return self._image_data
-        self._image_data = scraper.get(self._image_url)
+        self._image_data = scraper.get(self.image_url)
         return self._image_data
+    def get_sample(self) -> ImageFile:
+        if self._sample is not None:
+            return self._sample
+        self._sample = Image.open(BytesIO(self.get_sample_data()))
+        return self._sample
+    def get_sample_data(self) -> bytes:
+        if self._sample_data is not None:
+            return self._sample_data
+        self._sample_data = scraper.get(self.sample_url)
+        return self._sample_data
     def get_thumbnail(self) -> ImageFile:
         if self._thumb is not None:
             return self._thumb
@@ -100,14 +126,14 @@ class View:
     def get_thumbnail_data(self) -> bytes:
         if self._thumb_data is not None:
             return self._thumb_data
-        self._thumb_data = scraper.get(self._thumb_url)
+        self._thumb_data = scraper.get(self.thumb_url)
         return self._thumb_data
     def to_post(self) -> Post:
         return Post(
             self.id,
-            self._image_url.dir,
-            self._image_url.id,
+            self.image_url.dir,
+            self.image_url.id,
             self.tags.to_list(),
         )
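A hedged usage sketch tying the View changes together (the post id is illustrative): the URL fields are now public attributes, a sample-resolution accessor exists alongside image and thumbnail, and a non-existent post fails fast with ViewMissingException instead of breaking later during parsing.

try:
    view = View(1234567)                      # fetches and parses the view page
    print(view.image_url, view.sample_url, view.thumb_url)
    sample = view.get_sample()                # PIL ImageFile, downloaded lazily and cached
    post = view.to_post()                     # Post built from the now-public image_url
except ViewMissingException:
    print("post does not exist")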