Skip to content

Client

Bases: AbstractContextManager

Synchronous client for creating and managing Jobs

Source code in q3as/client.py
class Client(AbstractContextManager):
    """Synchronous client for creating and managing Jobs"""

    client: BaseClient
    req: RequestBuilder
    res: ResponseBuilder

    def __init__(self, credentials: Credentials, *, url: str = "https://q3as.aqora.io"):
        """Create a new client instance"""
        self.req = RequestBuilder(url)
        self.res = ResponseBuilder()
        self.client = BaseClient(auth=credentials.auth())

    def close(self):
        """
        Close the client
        """
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()

    def _send[T: BaseModel](self, request: Tuple[Type[T], Request]) -> T:
        return self.res.parse(request[0], self.client.send(request[1]))

    def _create_job(self, job: JobRequest) -> JobInfo:
        return self._send(self.req.create_job(job))

    def _get_job_info(self, slug: str) -> JobInfo:
        return self._send(self.req.get_job_info(slug))

    def _get_job_request(self, slug: str) -> JobRequest:
        return self._send(self.req.get_job_request(slug))

    def _get_job_result(self, slug: str) -> JobResult:
        return self._send(self.req.get_job_result(slug))

    def _pause_job(self, slug: str) -> JobInfo:
        return self._send(self.req.pause_job(slug))

    def _resume_job(self, slug: str) -> JobInfo:
        return self._send(self.req.resume_job(slug))

    def _delete_job(self, slug: str) -> JobInfo:
        return self._send(self.req.delete_job(slug))

    def _wait_for_job(
        self,
        name: str,
        polling_interval: float = 1.0,
        max_wait: Optional[float] = None,
    ) -> Tuple[JobInfo, JobResult]:
        started = time.time()
        while True:
            job = self._get_job_info(name)
            if job.status is not JobStatus.STARTED:
                result = self._get_job_result(name)
                if result.is_some():
                    return (job, result)
            if max_wait is not None and time.time() - started >= max_wait:
                raise TimeoutError("Job did not finish in time")
            time.sleep(polling_interval)

    def create_job(self, request: JobRequest) -> Job:
        """
        Create a new Job
        """
        info = self._create_job(request)
        modified_request = self._get_job_request(info.slug)
        return Job(self, info, modified_request)

    def get_job(self, name: str) -> Job:
        """
        Get a job by name
        """
        info = self._get_job_info(name)
        request = self._get_job_request(name)
        return Job(self, info, request)

__init__(credentials, *, url='https://q3as.aqora.io')

Create a new client instance

Source code in q3as/client.py
def __init__(self, credentials: Credentials, *, url: str = "https://q3as.aqora.io"):
    """Create a new client instance"""
    self.req = RequestBuilder(url)
    self.res = ResponseBuilder()
    self.client = BaseClient(auth=credentials.auth())

close()

Close the client

Source code in q3as/client.py
def close(self):
    """
    Close the client
    """
    self.client.close()

create_job(request)

Create a new Job

Source code in q3as/client.py
def create_job(self, request: JobRequest) -> Job:
    """
    Create a new Job
    """
    info = self._create_job(request)
    modified_request = self._get_job_request(info.slug)
    return Job(self, info, modified_request)

get_job(name)

Get a job by name

Source code in q3as/client.py
def get_job(self, name: str) -> Job:
    """
    Get a job by name
    """
    info = self._get_job_info(name)
    request = self._get_job_request(name)
    return Job(self, info, request)