async def _sim_runsync(self, job_request: DefaultRequest) -> JobOutput:
""" Development endpoint to simulate runsync behavior. """
assigned_job_id = f"test-{uuid.uuid4()}"
job = TestJob(id=assigned_job_id, input=job_request.input)
if is_generator(self.config["handler"]):
generator_output = run_job_generator(self.config["handler"], job.__dict__)
job_output = {"output": []}
async for stream_output in generator_output:
job_output['output'].append(stream_output["output"])
else:
job_output = await run_job(self.config["handler"], job.__dict__)
if job_output.get('error', None):
return jsonable_encoder({
"id": job.id,
"status": "FAILED",
"error": job_output['error']
})
if job_request.webhook:
thread = threading.Thread(
target=_send_webhook,
args=(job_request.webhook, job_output), daemon=True)
thread.start()
return jsonable_encoder({
"id": job.id,
"status": "COMPLETED",
"output": job_output['output']
})
async def _sim_runsync(self, job_request: DefaultRequest) -> JobOutput:
""" Development endpoint to simulate runsync behavior. """
assigned_job_id = f"test-{uuid.uuid4()}"
job = TestJob(id=assigned_job_id, input=job_request.input)
if is_generator(self.config["handler"]):
generator_output = run_job_generator(self.config["handler"], job.__dict__)
job_output = {"output": []}
async for stream_output in generator_output:
job_output['output'].append(stream_output["output"])
else:
job_output = await run_job(self.config["handler"], job.__dict__)
if job_output.get('error', None):
return jsonable_encoder({
"id": job.id,
"status": "FAILED",
"error": job_output['error']
})
if job_request.webhook:
thread = threading.Thread(
target=_send_webhook,
args=(job_request.webhook, job_output), daemon=True)
thread.start()
return jsonable_encoder({
"id": job.id,
"status": "COMPLETED",
"output": job_output['output']
})