from scraper.block import *
from scraper.config import config
from pathlib import Path

BLOCK_SIZE = config["scraper"]["block-size"]


class BlockListing:
    """Summary of one stored block, as encoded in its file name.

    Block files are named ``"<start>-<size>-<full>"`` (see ``save_block``);
    this class holds those three fields.
    """

    def __init__(self, start: int, size: int, full: bool):
        self.start = int(start)
        self.size = int(size)
        self.full = bool(full)

    def to_dict(self):
        """Return the listing as a plain dict with keys start/size/full."""
        return {
            "start": self.start,
            "size": self.size,
            "full": self.full,
        }


def _block_dir() -> Path:
    """Return the configured block directory (``server.block-dir``)."""
    return Path(config["server"]["block-dir"])


def _ensure_block_dir() -> Path:
    """Return the block directory, creating it (and parents) if missing."""
    path = _block_dir()
    if path.exists():
        return path
    path.mkdir(parents=True, exist_ok=True)
    return path


def list_blocks() -> list[BlockListing]:
    """List every block file in the block directory, sorted by start.

    Only file names are parsed; the files themselves are not opened.
    """
    path = _ensure_block_dir()
    listings = [
        BlockListing(*map(int, block_path.name.split("-")))
        for block_path in path.glob("*")
    ]
    return sorted(listings, key=lambda bl: bl.start)


def load_blocks() -> list[Block]:
    """Load every block file in the block directory.

    Blocks are returned in the order ``Path.glob`` yields them.
    """
    path = _ensure_block_dir()
    blocks = []
    for block_path in path.glob("*"):
        with open(block_path, "rb") as file:
            blocks.append(load(file))
    return blocks


def load_block_stats() -> list[BlockHeader]:
    """Load only the header of every block file in the block directory."""
    path = _ensure_block_dir()
    headers = []
    for block_path in path.glob("*"):
        with open(block_path, "rb") as file:
            headers.append(load_header(file))
    return headers


def load_block(entry_id: int) -> Block:
    """Return the block that covers ``entry_id``.

    If a stored block file whose range ``[start, start + size)`` contains
    ``entry_id`` exists, it is loaded and returned.  Otherwise a new, empty
    ``Block`` is created on the ``BLOCK_SIZE`` grid (blocks start at 1,
    ``BLOCK_SIZE + 1``, ...) and shrunk so that it does not overlap the
    nearest stored blocks below and above ``entry_id``.
    """
    path = _ensure_block_dir()

    low_block = None
    low_block_size = 0
    high_block = None
    high_block_size = 0

    # Try to find block file
    for block_path in path.glob("*"):
        start, size, _full = map(int, block_path.name.split("-"))

        # Find closest non-matching lower block
        if entry_id >= start and (low_block is None or start >= low_block):
            low_block = start
            low_block_size = size

        # Find closest non-matching higher block
        if entry_id <= start and (high_block is None or start <= high_block):
            high_block = start
            high_block_size = size

        # Find matching block
        if start <= entry_id < start + size:
            # Found it, return
            with open(block_path, "rb") as file:
                return load(file)

    # Failed...
    # Determine name of the block file
    block_size = BLOCK_SIZE
    # Blocks are aligned to 1, block_size + 1, 2 * block_size + 1, ...
    # Subtract 1 before the floor division so that an entry_id that is an
    # exact multiple of block_size stays in the block that ends with it
    # instead of being assigned to the following block.
    virtual_block = (entry_id - 1) // block_size * block_size + 1
    virtual_block_end = virtual_block + block_size  # Not inclusive

    # Clamp around lower block
    if low_block is not None:
        virtual_block = max(virtual_block, low_block + low_block_size)

    # Clamp around higher block
    if high_block is not None:
        virtual_block_end = min(virtual_block_end, high_block)

    # Create a new block
    return Block(virtual_block, virtual_block_end - virtual_block)


def save_block(new_block: Block):
    """Merge the entries of ``new_block`` into the stored blocks.

    Each entry is added to the block returned by ``load_block`` for its
    ``post`` id, and every touched block is written back to disk.  An entry
    is moved to the next block only when its ``post`` is at or past the end
    of the current one, so entries are expected in ascending ``post`` order.
    """
    # Don't bother saving empty blocks
    if len(new_block.entries) == 0:
        return

    def write_block(b: Block):
        """Write ``b`` to its file and drop the stale opposite-flag file."""
        path = _ensure_block_dir()
        is_full = len(b.entries) == b.size
        with open(path / f"{b.start}-{b.size}-{int(is_full)}", "wb") as file:
            dump(b, file)
        # The "full" flag is part of the file name, so a block whose flag
        # changed would otherwise leave its old file behind.
        badfile = path / f"{b.start}-{b.size}-{int(not is_full)}"
        if badfile.exists():
            badfile.unlink()

    block = load_block(new_block.entries[0].post)
    for entry in new_block.entries:
        if block.start + block.size <= entry.post:
            write_block(block)
            block = load_block(entry.post)
        block.add_entry(entry)
    write_block(block)