124 lines
3.5 KiB
Python
124 lines
3.5 KiB
Python
from scraper.block import *
|
|
from scraper.config import config
|
|
from pathlib import Path
|
|
|
|
|
|
# Width, in entry ids, of the aligned window used when a new block is created;
# read once from the "scraper" section of the config at import time.
BLOCK_SIZE = config["scraper"]["block-size"]
|
|
|
|
|
|
class BlockListing:
    """Lightweight description of one block: its start id, size and full flag."""

    def __init__(self, start: int, size: int, full: bool):
        # Normalise the argument types up front so later consumers always see
        # two ints and a bool, whatever was passed in.
        self.start, self.size, self.full = int(start), int(size), bool(full)

    def to_dict(self):
        """Return the listing as a plain dict with ``start``, ``size`` and ``full`` keys."""
        return dict(start=self.start, size=self.size, full=self.full)
|
|
|
|
|
|
def _block_dir() -> Path:
    """Return the block directory configured under ``config["server"]["block-dir"]``."""
    server_settings = config["server"]
    return Path(server_settings["block-dir"])
|
|
|
|
def _ensure_block_dir() -> Path:
    """Return the block directory, creating it (and missing parents) if it is absent."""
    directory = _block_dir()
    if not directory.exists():
        directory.mkdir(parents=True, exist_ok=True)
    return directory
|
|
|
|
|
|
def list_blocks() -> list[BlockListing]:
    """Describe every file in the block directory, ordered by start id.

    Block files are named ``<start>-<size>-<full>`` with three integer
    fields.  The listing is built from the file names alone; no file is
    opened.

    Returns:
        One ``BlockListing`` per file, sorted by ascending ``start``.

    Raises:
        ValueError: if a field of a file name is not an integer.
        TypeError: if a file name does not consist of exactly three fields.
    """
    directory = _ensure_block_dir()
    listings = [
        BlockListing(*map(int, block_file.name.split("-")))
        for block_file in directory.glob("*")
    ]
    listings.sort(key=lambda listing: listing.start)
    return listings
|
|
|
|
|
|
def load_blocks() -> list[Block]:
    """Pass every file in the block directory through ``load`` and collect the results."""

    def _read(block_file):
        # Binary mode: the file handle is handed straight to ``load``.
        with open(block_file, "rb") as handle:
            return load(handle)

    return [_read(block_file) for block_file in _ensure_block_dir().glob("*")]
|
|
|
|
|
|
def load_block_stats() -> list[BlockHeader]:
    """Pass every file in the block directory through ``load_header`` and collect the results."""

    def _read_header(block_file):
        # Binary mode: the file handle is handed straight to ``load_header``.
        with open(block_file, "rb") as handle:
            return load_header(handle)

    return [_read_header(block_file) for block_file in _ensure_block_dir().glob("*")]
|
|
|
|
|
|
def load_block(entry_id: int) -> Block:
    """Return the block that should hold ``entry_id``.

    Block files are named ``<start>-<size>-<full>`` and cover the half-open
    id range ``[start, start + size)``.  If a file covering ``entry_id``
    exists it is loaded and returned.  Otherwise a new ``Block(start, size)``
    is created for the ``BLOCK_SIZE``-wide window around ``entry_id``
    (windows start at 1, 1 + BLOCK_SIZE, ...), shrunk so that it does not
    overlap the nearest stored block on either side.

    Args:
        entry_id: Id of the entry whose block is wanted.

    Returns:
        The stored block covering ``entry_id``, or a newly constructed
        ``Block`` for the gap that ``entry_id`` falls into.

    Raises:
        ValueError: if a file in the block directory is not named with
            three dash-separated integers.
    """
    directory = _ensure_block_dir()

    lower_start = None  # start of the closest stored block at or below entry_id
    lower_size = 0
    upper_start = None  # start of the closest stored block at or above entry_id

    # Try to find an existing block file, remembering the nearest neighbours
    # on the way in case there is none.
    for block_file in directory.glob("*"):
        start, size, _full = map(int, block_file.name.split("-"))

        # Found the matching block: load it and stop looking.
        if start <= entry_id < start + size:
            with open(block_file, "rb") as handle:
                return load(handle)

        # Closest non-matching lower block (on equal starts the later file wins).
        if start <= entry_id and (lower_start is None or start >= lower_start):
            lower_start = start
            lower_size = size

        # Closest non-matching higher block (on equal starts the later file wins).
        if start >= entry_id and (upper_start is None or start <= upper_start):
            upper_start = start

    # No stored block covers entry_id: work out the aligned window for it.
    # Windows are 1-based, so subtract 1 before flooring; without that an id
    # that is an exact multiple of BLOCK_SIZE would land in the *next* window,
    # which does not contain it.
    window_start = (entry_id - 1) // BLOCK_SIZE * BLOCK_SIZE + 1
    window_end = window_start + BLOCK_SIZE  # Not inclusive

    # Clamp around the lower block so the new block starts after it ends.
    if lower_start is not None:
        window_start = max(window_start, lower_start + lower_size)

    # Clamp around the higher block so the new block ends where it begins.
    if upper_start is not None:
        window_end = min(window_end, upper_start)

    # Create a new block for the remaining gap.
    return Block(window_start, window_end - window_start)
|
|
|
|
|
|
def save_block(new_block: Block):
    """Distribute the entries of ``new_block`` over the on-disk blocks.

    Every entry is added to the block that ``load_block`` returns for its
    ``post`` id, and each touched block is written back to a file named
    ``<start>-<size>-<full>``, where ``<full>`` is 1 when the block holds
    ``size`` entries and 0 otherwise.  Nothing is written when ``new_block``
    has no entries.

    Only the upper bound of the current block is checked before switching to
    the next one, so entries are expected in ascending ``post`` order.

    Args:
        new_block: Block whose entries should be persisted.
    """
    # Don't bother saving empty blocks
    if len(new_block.entries) == 0:
        return

    def write_block(b: Block):
        """Write ``b`` to disk and remove the file left from its other fill state."""
        path = _ensure_block_dir()
        is_full = len(b.entries) == b.size
        with open(path / f"{b.start}-{b.size}-{int(is_full)}", "wb") as file:
            # Serialize the block that was passed in rather than relying on a
            # variable captured from the enclosing scope.
            dump(b, file)
        # The fill flag is part of the file name, so a block whose state
        # changed would otherwise leave a stale copy under the old name.
        stale_file = path / f"{b.start}-{b.size}-{int(not is_full)}"
        if stale_file.exists():
            stale_file.unlink()

    block = load_block(new_block.entries[0].post)
    for entry in new_block.entries:
        # Entry lies past the current block: flush it and fetch the next one.
        if block.start + block.size <= entry.post:
            write_block(block)
            block = load_block(entry.post)
        block.add_entry(entry)
    # Flush the block that received the last entries.
    write_block(block)
|