How do I handle both streaming and non-streaming requests in a serverless pod?
How can I handle both effectively? Is it okay to have a handler with both yield and return? i.e.
Will this work?
def handler(endpoint):
    """Serve both streaming and non-streaming endpoints from one handler.

    A function containing `yield` anywhere is compiled as a generator
    function, so a bare `return get_response()` would NOT deliver the value
    to the caller — it only stops the generator (the value is stashed on
    StopIteration and discarded). Yield in BOTH branches instead; with
    "return_aggregate_stream": True the SDK aggregates the yielded chunks
    for non-streaming /run callers.
    """
    if endpoint == "stream_response":
        # `yield stream_response()` would yield the generator OBJECT as a
        # single item; `yield from` re-yields each chunk it produces.
        yield from stream_response()
    elif endpoint == "get_response":
        # Single-shot result: yield it once.
        yield get_response()
# Register the handler with the RunPod serverless worker.
# "return_aggregate_stream": True tells the SDK to collect the handler's
# yielded chunks into one aggregated result for non-streaming (/run)
# callers, so a single generator handler can serve both request styles.
runpod.serverless.start(
{
"handler": handler,
"return_aggregate_stream": True
}
)
`something` is True, but somehow when `something` is False, it returns a generator as well. `endpoint`'s value is not `stream_answer`, yet it always returns a generator (`stream_func` is, in the original code, the only function ever coded to return a generator).

# Stream output
status_url = f"https://api.runpod.ai/v2/{endpoint_id}/stream/{task_id}"
try:
while True: # Adjust the range or use a while loop for continuous polling
time.sleep(1) # Polling interval
get_status = requests.get(status_url, headers=headers)
if get_status.status_code == 200:
status_response = get_status.json()
if 'stream' in status_response and len(status_response['stream']) > 0:
for item in status_response['stream']:
print(item['output']['text'], end='') # Adjust based on the actual structure
if status_response.get('status') == 'COMPLETED':
print("\nJob completed.")
break
else:
print(f"Error streaming job output: {get_status.text}")
break...runpod logs...
output: <generator object ...>
...runpod logs...

Even with `something` set to `False`:

INFO | local_test | Started.
DEBUG | local_test | Handler output: <generator object handler at 0x7d0f40748890>
DEBUG | local_test | run_job return: {'output': <generator object handler at 0x7d0f40748890>}
INFO | Job local_test completed successfully.
INFO | Job result: {'output': <generator object handler at 0x7d0f40748890>}
INFO | Local testing complete, exiting.

(The names involved: `endpoint`, `stream_answer`, `stream_func`.)

import runpod # Required.
# Minimal RunPod handler template.
def handler(job):
    job_input = job["input"]  # Access the input from the request.
    # Add your custom code here.
    return "Your job results"


# A `yield` anywhere makes the whole function a generator function,
# so BOTH branches produce a generator when called.
def handler():
    if something:
        yield value  # output is a generator
    else:
        return value  # this also outputs a generator


# Same demonstration with concrete values: even with something = False,
# calling handler() still returns a generator object.
def handler():
    value = 1
    something = False
    if something:
        yield value  # output is a generator
    else:
        return value  # this also outputs a generator


STREAM_ENDPOINTS = {"stream_answer"}
def handler(job):
    # Because `yield` appears in this function, Python compiles it as a
    # generator function: EVERY call returns a generator object, including
    # the error paths — a bare `return {...}` inside a generator does not
    # deliver the dict, it raises StopIteration({...}) and the value is
    # discarded. That is why the logs always show
    # `<generator object handler ...>`. Fix: yield in every branch and let
    # "return_aggregate_stream": True aggregate chunks for /run callers.
    validated_input = validate_input(job)
    if "errors" in validated_input:
        yield {"error": "\n".join(validated_input["errors"])}
        return
    validated_api = validate_api(job)
    if "errors" in validated_api:
        yield {"error": "\n".join(validated_api["errors"])}
        return
    endpoint, validated_payload = validate_payload(job)
    if "errors" in validated_payload:
        yield {"error": "\n".join(validated_payload["errors"])}
        return
    # Unwrap the validated payload if the validator nested it.
    if "validated_input" in validated_payload:
        payload = validated_payload["validated_input"]
    else:
        payload = validated_payload
    if payload.get("emu_error", False):
        yield {"error": "Error emulated. If this is unintended, please check your environment variables"}
        return
    LOGGER.info(f"[INVOKING] {endpoint}", job["id"])
    LOGGER.info(f"[SUCCESS] Returning")
    response = invoke(endpoint, payload)
    # NOTE(review): STREAM_ENDPOINTS holds "stream_answer" but invoke()
    # checks "answer_stream" — confirm which spelling is intended; with
    # the mismatch the streaming branch may never match its endpoint.
    if endpoint in STREAM_ENDPOINTS:
        for token in response:
            yield token
    else:
        # Previously `return response`, which in a generator function
        # discards the value. Yield the single result instead.
        yield response


def invoke(
    endpoint: str,
    payload: dict
) -> Any:
    # Dispatch to the endpoint implementation. Only "answer_stream"
    # produces a generator; the other endpoints return plain values.
    if endpoint == "...":
        response = some_func()  # this does not return a generator
    elif endpoint == "...":
        response = some_func()  # this does not return a generator
    elif endpoint == "answer_stream":
        response = stream_func()  # this returns a generator
    return response

(`endpoint, validated_payload = validate_payload(job)` — the request JSON that produces it looks like:)

{
"input": {
"api": {
"endpoint": "...",
},
...rest of the json
}
}