class Client(AbstractContextManager):
"""Synchronous client for creating and managing Jobs"""
client: BaseClient
req: RequestBuilder
res: ResponseBuilder
def __init__(self, credentials: Credentials, *, url: str = "https://q3as.aqora.io"):
"""Create a new client instance"""
self.req = RequestBuilder(url)
self.res = ResponseBuilder()
self.client = BaseClient(auth=credentials.auth())
def close(self):
"""
Close the client
"""
self.client.close()
def __enter__(self):
return self
def __exit__(self, *_):
self.close()
def _send[T: BaseModel](self, request: Tuple[Type[T], Request]) -> T:
return self.res.parse(request[0], self.client.send(request[1]))
def _create_job(self, job: JobRequest) -> JobInfo:
return self._send(self.req.create_job(job))
def _get_job_info(self, slug: str) -> JobInfo:
return self._send(self.req.get_job_info(slug))
def _get_job_request(self, slug: str) -> JobRequest:
return self._send(self.req.get_job_request(slug))
def _get_job_result(self, slug: str) -> JobResult:
return self._send(self.req.get_job_result(slug))
def _pause_job(self, slug: str) -> JobInfo:
return self._send(self.req.pause_job(slug))
def _resume_job(self, slug: str) -> JobInfo:
return self._send(self.req.resume_job(slug))
def _delete_job(self, slug: str) -> JobInfo:
return self._send(self.req.delete_job(slug))
def _wait_for_job(
self,
name: str,
polling_interval: float = 1.0,
max_wait: Optional[float] = None,
) -> Tuple[JobInfo, JobResult]:
started = time.time()
while True:
job = self._get_job_info(name)
if job.status is not JobStatus.STARTED:
result = self._get_job_result(name)
if result.is_some():
return (job, result)
if max_wait is not None and time.time() - started >= max_wait:
raise TimeoutError("Job did not finish in time")
time.sleep(polling_interval)
def create_job(self, request: JobRequest) -> Job:
"""
Create a new Job
"""
info = self._create_job(request)
modified_request = self._get_job_request(info.slug)
return Job(self, info, modified_request)
def get_job(self, name: str) -> Job:
"""
Get a job by name
"""
info = self._get_job_info(name)
request = self._get_job_request(name)
return Job(self, info, request)