from .config import config from .block import Block from typing import IO from datetime import datetime, timedelta import json class Job: def __init__(self, start, size, deadline = None): self.start = start self.size = size if deadline is None: self.deadline = (datetime.now() + timedelta(seconds=config["scraper"]["job-deadline"])).timestamp() else: self.deadline = deadline def to_dict(self): return { "start": self.start, "size": self.size, "deadline": self.deadline, } def dump(job: Job, fd: IO[str | bytes]): json.dump(job.to_dict(), fd) def dumps(job: Job) -> str: return json.dumps(job.to_dict(), fd) def load(fd: IO[str | bytes]) -> Job: data = json.load(fd) return Job(data["start"], data["size"], data["deadline"]) def loads(data: str | bytes) -> Job: data = json.loads(data) return Job(data["start"], data["size"], data["deadline"])