Files
py34/scraper/block.py
2025-08-09 17:15:20 +02:00

326 lines
8.8 KiB
Python

from py34 import Post
from py34.url import ImageURL, IMAGE_FORMATS
from py34.scraper import scraper
from typing import IO
from io import BytesIO
from enum import IntEnum
from bisect import insort
import math
import zlib
import time
class BlockEntryType(IntEnum):
    """Kind of a block entry: a status marker or a media file format.

    Status markers occupy the low values; media formats are grouped by
    hundreds (1xx still images, 2xx animated images, 4xx videos). The
    names of the media members are upper-cased file extensions.
    """

    # Status
    ERROR = 0
    MISSING = 1
    # Did not appear on the listing, but the post was never checked
    MAYBE_MISSING = 2

    # Image
    JPG = 100
    JPEG = 101
    PNG = 102

    # Animated Image
    GIF = 200

    # Audio (no members defined)

    # Video
    MP4 = 400
    WEBM = 401
def ext2enttype(ext: str) -> BlockEntryType:
    """Map a file extension such as ``"png"`` to its entry type.

    The lookup is done by member name after upper-casing *ext*, so an
    extension with no matching ``BlockEntryType`` member raises
    ``KeyError``.
    """
    member_name = ext.upper()
    return BlockEntryType[member_name]
def enttype2ext(enttype: BlockEntryType) -> str:
    """Return the lower-case file extension named by *enttype*.

    Raises:
        Exception: if the numeric value of *enttype* is below 50, which
            this function treats as "not a file" (the status markers).
    """
    if enttype.value >= 50:
        return enttype.name.lower()
    raise Exception("Entry does not refer to a file")
def _compute_tag_bits(tags: list[str]) -> list[dict[int]]:
# Compute tag bits
tag_bits = [{} for _ in range(math.ceil(len(tags) / 32))]
for n in range(len(tags)):
bit = n % 32
rank = n // 32
tag_bits[rank][tags[n]] = 1 << bit
return tag_bits
def _pad_bytes(size: int, pad_addr: int) -> int:
addr_bytes = pad_addr // 8
overflow = size % addr_bytes
if overflow == 0:
return 0
else:
return addr_bytes - overflow
class Block:
    """A range of post ids together with the entries recorded for it.

    A block covers the ids ``start`` (inclusive) to ``start + size``
    (exclusive). ``entries`` holds at most one ``BlockEntry`` per post id
    and is kept ordered by post id.
    """

    def __init__(self, start: int, size: int):
        self.start: int = start
        self.size: int = size
        self.entries: list[BlockEntry] = []

    def add_post(self, post: Post, enttype: BlockEntryType | None = None):
        """Wrap *post* in a ``BlockEntry`` and add it to the block."""
        self.add_entry(BlockEntry(post, enttype))

    def add_entry(self, entry: "BlockEntry"):
        """Insert *entry*, replacing any existing entry for the same post.

        Raises:
            Exception: if the entry's post id is outside this block's range.
        """
        # Check if entry belongs to this block
        if not self.belongs(entry.post):
            raise Exception("Given post does not belong to this block")
        # Remove entry with matching post-id, if exists
        self.entries = [e for e in self.entries if e.post != entry.post]
        # Insert the entry at its sorted position
        insort(self.entries, entry)

    def dump(self, fd: IO[bytes]):
        """Write the block to *fd* in the binary ``34BK`` format.

        All integers are little-endian and unsigned. Layout:

        * header: magic ``34BK`` (4B), entry count (2B), block size (2B),
          block start (4B), number of distinct tags (2B), length of the
          compressed tag data (2B);
        * zlib-compressed tag table (sorted tags, each NUL-terminated),
          zero-padded so its length is a multiple of 4 bytes;
        * per entry: post id (4B), entry type (2B), image id length (2B),
          image dir (4B), thumbnail length (4B), one 4-byte tag bit mask
          per group of 32 tags, the raw image id bytes, the thumbnail
          bytes, then zero padding so that image id plus thumbnail is a
          multiple of 4 bytes.

        ``int.to_bytes`` raises ``OverflowError`` if a value does not fit
        its field; bytes already written to *fd* are not rolled back.
        """
        # Sort all entries
        self.entries.sort()

        # Combine all tags into one sorted table
        tags = sorted({tag for entry in self.entries for tag in entry.tags})

        # Compute tag bits
        tag_bits = _compute_tag_bits(tags)

        # Compress tag data
        tag_data = zlib.compress(
            b"\0".join(map(str.encode, tags)) + b"\0", level=9
        )

        # Magic 4B
        fd.write(b"34BK")
        # Entry amount 2B
        fd.write(len(self.entries).to_bytes(2, "little", signed=False))
        # Size 2B
        fd.write(self.size.to_bytes(2, "little", signed=False))
        # Start 4B
        fd.write(self.start.to_bytes(4, "little", signed=False))
        # Amount of different tags 2B
        fd.write(len(tags).to_bytes(2, "little", signed=False))
        # Size of tag data 2B
        fd.write(len(tag_data).to_bytes(2, "little", signed=False))

        # Write all the tags
        fd.write(tag_data)
        # Pad to the nearest 32bit address
        padding = _pad_bytes(len(tag_data), 32)
        if padding:
            fd.write(b"\0" * padding)

        # Dump entries
        for entry in self.entries:
            # The image id is stored as raw bytes decoded from its hex form
            image = bytearray.fromhex(entry.image)

            # Post ID 4B
            fd.write(entry.post.to_bytes(4, "little", signed=False))
            # Entry type enum 2B
            fd.write(entry.type.to_bytes(2, "little", signed=False))
            # Image id length 2B
            fd.write(len(image).to_bytes(2, "little", signed=False))
            # Image dir 4B
            fd.write(entry.dir.to_bytes(4, "little", signed=False))
            # Thumbnail size 4B
            fd.write(len(entry.thumbnail).to_bytes(4, "little", signed=False))

            # Tag bits 4B*ranks
            for rank in tag_bits:
                word = 0
                for tag in entry.tags:
                    word |= rank.get(tag, 0)
                fd.write(word.to_bytes(4, "little", signed=False))

            # Image ID
            fd.write(image)
            # Thumbnail
            fd.write(entry.thumbnail)

            # Pad to the nearest 32bit address
            padding = _pad_bytes(len(image) + len(entry.thumbnail), 32)
            if padding:
                fd.write(b"\0" * padding)

    def dumps(self) -> bytes:
        """Return the serialized block as a ``bytes`` object."""
        buffer = BytesIO()
        self.dump(buffer)
        return buffer.getvalue()

    def entry(self, post_id: int) -> "BlockEntry":
        """Return the entry for *post_id*.

        Raises:
            IndexError: if the block holds no entry for that post id.
        """
        for candidate in self.entries:
            if candidate.post == post_id:
                return candidate
        raise IndexError("Entry not found")

    def belongs(self, post_id: int) -> bool:
        """Tell whether *post_id* lies in ``[start, start + size)``."""
        return self.start <= post_id < self.start + self.size

    def to_dict(self):
        """Return a plain-dict view of the block and its entries."""
        return {
            "start": self.start,
            "size": self.size,
            "entries": [BlockEntry.to_dict(e) for e in self.entries],
        }
class BlockEntry:
    """Record kept for one post inside a ``Block``.

    Copies the post's id, image dir, image id, tags and thumbnail data.
    When no entry type is supplied, the type is determined by probing the
    image URL for each known format.
    """

    def __init__(self, post: Post, enttype: BlockEntryType | None):
        self.post = post.id
        self.dir = post.image_dir
        self.image = post.image_id
        self.type = enttype
        self.tags = post.tags.copy()
        self.thumbnail = post.get_thumbnail_data()

        if self.type is not None:
            return

        # No type given: try every known format and keep the first one the
        # server answers 200 for.
        for ext in IMAGE_FORMATS:
            image_url = ImageURL(post.image_dir, post.image_id, ext)
            # CDN tends to return 503 : Service Unavailable, so keep asking
            # until the answer is a definitive 200 or 404.
            while True:
                status = scraper.get(image_url, body=False).status_code
                if status in (200, 404):
                    break
                scraper.reset()
                time.sleep(1)
            if status == 200:
                self.type = ext2enttype(ext)
                break
        else:
            # Every format was rejected (or there were none to try)
            self.type = BlockEntryType.ERROR

    def to_dict(self):
        """Return a plain-dict view of the entry (thumbnail excluded)."""
        summary = {}
        summary["post"] = self.post
        summary["dir"] = self.dir
        summary["image"] = self.image
        summary["type"] = self.type.name
        summary["tags"] = self.tags
        return summary

    def __lt__(self, other):
        """Order entries by post id."""
        return other.post > self.post
class BlockHeader:
    """Leading header fields of a block: start, size and entry count."""

    def __init__(self, start: int, size: int, entries: int):
        # Plain value holder; the three fields are stored unchanged.
        self.start, self.size, self.entries = start, size, entries

    def to_dict(self):
        """Return the header fields as a plain dict."""
        return dict(
            start=self.start,
            size=self.size,
            entries=self.entries,
        )
def dump(block: Block, fd: IO[bytes]):
    """Serialize *block* to the binary stream *fd*.

    Module-level convenience that delegates to ``block.dump``.
    """
    block.dump(fd)
def dumps(block: Block) -> bytes:
    """Return the serialized form of *block*, as given by ``block.dumps``."""
    serialized = block.dumps()
    return serialized
def load_header(fd: IO[bytes]) -> "BlockHeader":
    """Read only the leading header fields of a serialized block.

    Consumes the 4-byte magic followed by the entry count (2 bytes), the
    block size (2 bytes) and the block start (4 bytes), all little-endian
    and unsigned. Nothing past the block start field is read.

    Args:
        fd: Binary stream positioned at the start of a block.

    Returns:
        A ``BlockHeader`` holding the start, size and entry count.

    Raises:
        Exception: if the stream does not begin with the ``34BK`` magic.
    """
    if fd.read(4) != b"34BK":
        raise Exception("Stream is not Block data")

    def read_uint(width: int) -> int:
        # Little-endian unsigned integer of *width* bytes from the stream
        return int.from_bytes(fd.read(width), "little", signed=False)

    entries_len = read_uint(2)
    block_size = read_uint(2)
    block_start = read_uint(4)
    return BlockHeader(block_start, block_size, entries_len)
def load(fd: IO[bytes]) -> Block:
    """Read a complete block in the binary ``34BK`` format from *fd*.

    The stream is expected to hold what ``Block.dump`` writes: a header,
    a zlib-compressed NUL-terminated tag table padded to 4 bytes, then one
    record per entry. All integers are little-endian and unsigned.

    Args:
        fd: Binary stream positioned at the start of a block.

    Returns:
        The reconstructed ``Block`` with one entry per stored record.

    Raises:
        Exception: if the stream does not begin with the ``34BK`` magic.
    """
    if fd.read(4) != b"34BK":
        raise Exception("Stream is not Block data")

    def read_dword() -> int:
        return int.from_bytes(fd.read(4), "little", signed=False)

    def read_word() -> int:
        return int.from_bytes(fd.read(2), "little", signed=False)

    # Read header
    entries_len = read_word()
    block_size = read_word()
    block_start = read_dword()
    tags_len = read_word()
    tags_data_len = read_word()

    # Read tags
    tags_data = zlib.decompress(fd.read(tags_data_len))
    # Every tag is NUL-terminated, so splitting leaves a trailing empty
    # piece. Trust the stored tag count to decide how many pieces are real:
    # stripping NULs instead turns an empty tag table into one phantom ""
    # tag, which adds a tag rank the writer never emitted and shifts every
    # following entry by 4 bytes.
    tags = [raw.decode() for raw in tags_data.split(b"\0")[:tags_len]]

    # Slurp padding bytes
    fd.read(_pad_bytes(tags_data_len, 32))

    # Compute tag bits
    tag_bits = _compute_tag_bits(tags)

    # Load entries
    block = Block(block_start, block_size)
    for _ in range(entries_len):
        # Read entry header
        post_id = read_dword()
        enttype = BlockEntryType(read_word())
        image_id_len = read_word()
        image_dir = read_dword()
        thumbnail_size = read_dword()

        # Read tags: one 32-bit mask per rank
        post_tags = []
        for rank in tag_bits:
            bits = read_dword()
            post_tags.extend(tag for tag, mask in rank.items() if mask & bits)

        # Read image id (stored raw, exposed as hex text)
        image_id = fd.read(image_id_len).hex()

        # Read image thumbnail
        thumbnail = fd.read(thumbnail_size)

        # Slurp padding bytes
        fd.read(_pad_bytes(image_id_len + thumbnail_size, 32))

        block.add_post(
            Post(post_id, image_dir, image_id, post_tags, thumbnail),
            enttype,
        )
    return block
def loads(data: bytes) -> Block:
    """Parse a block from an in-memory ``bytes`` object via ``load``."""
    stream = BytesIO(data)
    return load(stream)