from py34 import Post
from py34.url import ImageURL, IMAGE_FORMATS, VIDEO_FORMATS
from py34.scraper import scraper
from typing import IO
from io import BytesIO
from enum import IntEnum
from bisect import insort
import math
import zlib
import time


# Magic bytes that open every serialized block.
_MAGIC = b"34BK"


class BlockEntryType(IntEnum):
    """Kind of a block entry: either a status code or a media file format.

    Values below 50 are statuses that do not refer to a file (see
    ``enttype2ext``); the remaining member names double as file extensions.
    """

    # Status
    ERROR = 0
    MISSING = 1
    MAYBE_MISSING = 2  # Did not appear on the listing, but the post was never checked

    # Image
    JPG = 100
    JPEG = 101
    PNG = 102

    # Animated Image
    GIF = 200

    # Audio

    # Video
    MP4 = 400
    WEBM = 401


def ext2enttype(ext: str) -> BlockEntryType:
    """Map a file extension (any letter case) to its entry type.

    Raises:
        KeyError: if no enum member is named after the extension.
    """
    return BlockEntryType[ext.upper()]


def enttype2ext(enttype: BlockEntryType) -> str:
    """Return the lower-case file extension for a file entry type.

    Raises:
        Exception: if ``enttype`` is a status value (below 50), not a file.
    """
    if enttype.value < 50:
        raise Exception("Entry does not refer to a file")
    return enttype.name.lower()


def _compute_tag_bits(tags: list[str]) -> list[dict[str, int]]:
    """Assign every tag a bit inside a sequence of 32-bit ranks.

    The tag at index ``n`` gets bit ``n % 32`` of rank ``n // 32``.

    Returns:
        One dict per rank, mapping tag -> single-bit mask.
    """
    tag_bits: list[dict[str, int]] = [
        {} for _ in range(math.ceil(len(tags) / 32))
    ]
    for n, tag in enumerate(tags):
        tag_bits[n // 32][tag] = 1 << (n % 32)
    return tag_bits


def _pad_bytes(size: int, pad_addr: int) -> int:
    """Return how many bytes pad ``size`` up to a ``pad_addr``-bit boundary."""
    addr_bytes = pad_addr // 8
    overflow = size % addr_bytes
    if overflow == 0:
        return 0
    return addr_bytes - overflow


def _read_word(fd: IO[bytes]) -> int:
    """Read an unsigned little-endian 16-bit integer from ``fd``."""
    return int.from_bytes(fd.read(2), "little", signed=False)


def _read_dword(fd: IO[bytes]) -> int:
    """Read an unsigned little-endian 32-bit integer from ``fd``."""
    return int.from_bytes(fd.read(4), "little", signed=False)


class Block:
    """A contiguous range of post ids together with their entries.

    The block covers post ids in ``[start, start + size)``. Entries are kept
    sorted by post id, with at most one entry per post.
    """

    def __init__(self, start: int, size: int):
        self.start: int = start
        self.size: int = size
        self.entries: list[BlockEntry] = []

    def add_post(self, post: Post, enttype: BlockEntryType | None = None):
        """Wrap ``post`` in a ``BlockEntry`` and add it to the block."""
        self.add_entry(BlockEntry(post, enttype))

    def add_entry(self, entry: "BlockEntry"):
        """Insert ``entry``, replacing any existing entry for the same post.

        Raises:
            Exception: if the entry's post id is outside this block's range.
        """
        # Check if entry belongs to this block
        if not self.belongs(entry.post):
            raise Exception("Given post does not belong to this block")
        # Remove entry with matching post-id, if exists
        self.entries = [e for e in self.entries if e.post != entry.post]
        # Insert the entry, keeping the list ordered by post id
        insort(self.entries, entry)

    def dump(self, fd: IO[bytes]):
        """Serialize the block to ``fd``.

        All integers are unsigned little-endian. Layout:

        * header: magic ``34BK`` (4B), entry count (2B), block size (2B),
          block start (4B), number of distinct tags (2B), size of the
          compressed tag data (2B)
        * zlib-compressed, NUL-terminated tag names, zero-padded to 4 bytes
        * per entry: post id (4B), entry type (2B), image id length (2B),
          image dir (4B), thumbnail size (4B), one 4B tag bit word per rank,
          image id bytes, thumbnail bytes, zero padding to 4 bytes

        Raises:
            OverflowError: if a value does not fit its fixed-width field.
        """
        # Sort all entries
        self.entries.sort()

        # Combine all tags into one sorted table
        tags = sorted({tag for entry in self.entries for tag in entry.tags})

        # Compute tag bits
        tag_bits = _compute_tag_bits(tags)

        # Compress tag data
        tag_data = zlib.compress(
            b"\0".join(map(str.encode, tags)) + b"\0", level=9
        )

        # Magic 4B
        fd.write(_MAGIC)
        # Entry amount 2B
        fd.write(len(self.entries).to_bytes(2, "little", signed=False))
        # Size 2B
        fd.write(self.size.to_bytes(2, "little", signed=False))
        # Start 4B
        fd.write(self.start.to_bytes(4, "little", signed=False))
        # Amount of different tags 2B
        fd.write(len(tags).to_bytes(2, "little", signed=False))
        # Size of tag data 2B
        fd.write(len(tag_data).to_bytes(2, "little", signed=False))

        # Write all the tags
        fd.write(tag_data)
        # Pad to the nearest 32bit address
        padding = _pad_bytes(len(tag_data), 32)
        if padding:
            fd.write(b"\0" * padding)

        # Dump entries
        for entry in self.entries:
            image = bytes.fromhex(entry.image)

            # Post ID 4B
            fd.write(entry.post.to_bytes(4, "little", signed=False))
            # Entry type enum 2B
            fd.write(entry.type.to_bytes(2, "little", signed=False))
            # Image id length 2B
            fd.write(len(image).to_bytes(2, "little", signed=False))
            # Image dir 4B
            fd.write(entry.dir.to_bytes(4, "little", signed=False))
            # Thumbnail size 4B
            fd.write(len(entry.thumbnail).to_bytes(4, "little", signed=False))

            # Tag bits 4B*ranks
            for rank in tag_bits:
                word = 0
                for tag in entry.tags:
                    word |= rank.get(tag, 0)
                fd.write(word.to_bytes(4, "little", signed=False))

            # Image ID
            fd.write(image)
            # Thumbnail
            fd.write(entry.thumbnail)
            # Pad to the nearest 32bit address
            padding = _pad_bytes(len(image) + len(entry.thumbnail), 32)
            if padding:
                fd.write(b"\0" * padding)

    def dumps(self) -> bytes:
        """Serialize the block and return the resulting bytes."""
        io = BytesIO()
        self.dump(io)
        return io.getvalue()

    def entry(self, post_id: int) -> "BlockEntry":
        """Return the entry for ``post_id``.

        Raises:
            IndexError: if the block holds no entry for that post.
        """
        for entry in self.entries:
            if entry.post == post_id:
                return entry
        raise IndexError("Entry not found")

    def belongs(self, post_id: int) -> bool:
        """Tell whether ``post_id`` lies in ``[start, start + size)``."""
        return self.start <= post_id < self.start + self.size

    def to_dict(self):
        """Return a plain-dict view of the block and its entries."""
        return {
            "start": self.start,
            "size": self.size,
            "entries": [entry.to_dict() for entry in self.entries],
        }


class BlockEntry:
    """Snapshot of one post as stored inside a ``Block``.

    When no entry type is supplied, the constructor probes the CDN for each
    known extension to discover the file format, which may block on network
    I/O and retries.
    """

    def __init__(self, post: Post, enttype: BlockEntryType | None):
        self.post = post.id
        self.dir = post.image_dir
        self.image = post.image_id
        self.type = enttype
        self.tags = post.tags.copy()
        self.thumbnail = post.get_thumbnail_data()

        if self.type is None:
            for ext in VIDEO_FORMATS + IMAGE_FORMATS:
                image_url = ImageURL(post.image_dir, post.image_id, ext)
                status: int | None = None
                # CDN tends to return 503 : Service Unavailable
                while status != 200 and status != 404:
                    status = scraper.head(image_url, body=False).status_code
                    if status != 200 and status != 404:
                        scraper.reset()
                        time.sleep(1)
                # HEAD could fail, try with GET
                # NOTE(review): the HEAD loop above only exits on 200 or 404,
                # so this loop can never run as written - confirm whether a
                # 404 from HEAD was meant to trigger the GET fallback.
                while status != 200 and status != 404:
                    status = scraper.get(image_url, body=False).status_code
                    if status != 200 and status != 404:
                        scraper.reset()
                        time.sleep(1)
                if status == 200:
                    self.type = ext2enttype(ext)
                    break

        # No extension answered with 200
        if self.type is None:
            self.type = BlockEntryType.ERROR

    def to_dict(self):
        """Return a plain-dict view of the entry (thumbnail excluded)."""
        return {
            "post": self.post,
            "dir": self.dir,
            "image": self.image,
            "type": self.type.name,
            "tags": self.tags,
        }

    def __lt__(self, other):
        # Entries order by post id; used by sort() and insort().
        return self.post < other.post


class BlockHeader:
    """The fixed leading fields of a serialized block, without its entries."""

    def __init__(self, start: int, size: int, entries: int):
        self.start = start
        self.size = size
        self.entries = entries

    def to_dict(self):
        """Return a plain-dict view of the header."""
        return {
            "start": self.start,
            "size": self.size,
            "entries": self.entries,
        }


def dump(block: Block, fd: IO[bytes]):
    """Serialize ``block`` to the binary stream ``fd``."""
    block.dump(fd)


def dumps(block: Block) -> bytes:
    """Serialize ``block`` and return the resulting bytes."""
    return block.dumps()


def load_header(fd: IO[bytes]) -> BlockHeader:
    """Read only the start, size and entry count of a serialized block.

    Raises:
        Exception: if the stream does not begin with the block magic.
    """
    if fd.read(4) != _MAGIC:
        raise Exception("Stream is not Block data")

    entries_len = _read_word(fd)
    block_size = _read_word(fd)
    block_start = _read_dword(fd)

    return BlockHeader(block_start, block_size, entries_len)


def load(fd: IO[bytes]) -> Block:
    """Deserialize a complete block, including all entries, from ``fd``.

    Raises:
        Exception: if the stream does not begin with the block magic.
    """
    if fd.read(4) != _MAGIC:
        raise Exception("Stream is not Block data")

    # Read header
    entries_len = _read_word(fd)
    block_size = _read_word(fd)
    block_start = _read_dword(fd)
    tags_len = _read_word(fd)
    tags_data_len = _read_word(fd)

    # Read tags. Every tag is NUL-terminated, so splitting leaves a trailing
    # empty chunk; trusting the declared tag count drops it and, crucially,
    # yields an empty table (zero ranks) for a block without tags, matching
    # what dump() wrote per entry.
    tags_data = zlib.decompress(fd.read(tags_data_len))
    tags = [raw.decode() for raw in tags_data.split(b"\0")[:tags_len]]

    # Slurp padding bytes
    fd.read(_pad_bytes(tags_data_len, 32))

    # Compute tag bits
    tag_bits = _compute_tag_bits(tags)

    # Load entries
    block = Block(block_start, block_size)
    for _ in range(entries_len):
        # Read header
        post_id = _read_dword(fd)
        enttype = BlockEntryType(_read_word(fd))
        image_id_len = _read_word(fd)
        image_dir = _read_dword(fd)
        thumbnail_size = _read_dword(fd)

        # Read tags
        post_tags = []
        for rank in tag_bits:
            bits = _read_dword(fd)
            post_tags.extend(tag for tag, mask in rank.items() if mask & bits)

        # Read image id
        image_id = fd.read(image_id_len).hex()

        # Read image thumbnail
        thumbnail = fd.read(thumbnail_size)

        # Slurp padding bytes
        fd.read(_pad_bytes(image_id_len + thumbnail_size, 32))

        block.add_post(
            Post(post_id, image_dir, image_id, post_tags, thumbnail),
            enttype,
        )

    return block


def loads(data: bytes) -> Block:
    """Deserialize a block from an in-memory bytes object."""
    return load(BytesIO(data))