Runpod2y ago
ribbit

How do I handle both streaming and non-streaming requests in a serverless pod?

How can I handle both effectively? Is it okay to have a handler with both yield and return? i.e.
def handler(endpoint):
    if endpoint == "stream_response":
        yield stream_response()
    elif endpoint == "get_response":
        return get_response()

runpod.serverless.start(
    {
        "handler": handler,
        "return_aggregate_stream": True
    }
)
Will this work?
agentpietrucha
Seems it should work. Have you tried testing it locally? Runpod works really well locally, so it's easy to test
J.
J.2y ago
Eh. runpod local stuff is pretty limited imo. It works for basic use case validation, but i tend to just run it against a GPU Pod instead, since there it actually works by just directly calling the function. I just skip the runpod.serverless.start({}) call when im on a GPU Pod. In terms of yield and return you should be able to, but I think it doesn't matter if you use the yield syntax for both. https://github.com/justinwlin/Runpod-OpenLLM-Pod-and-Serverless/blob/main/handler.py
GitHub
Runpod-OpenLLM-Pod-and-Serverless/handler.py at main · justinwlin/R...
A repo for OpenLLM to run pod. Contribute to justinwlin/Runpod-OpenLLM-Pod-and-Serverless development by creating an account on GitHub.
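A minimal sketch of the pattern J. describes, assuming the runpod Python SDK's generator-handler support and "return_aggregate_stream"; stream_response / get_response and the RUNPOD_DEBUG_LOCAL flag are hypothetical placeholders:

import os
import runpod

def handler(job):
    inp = job["input"]
    if inp.get("mode") == "stream":
        # streaming path: yield tokens as they are produced
        for token in stream_response(inp):  # hypothetical streaming helper
            yield token
    else:
        # non-streaming path: yield the full result once; with
        # "return_aggregate_stream": True the /run output is the aggregated stream
        yield get_response(inp)  # hypothetical non-streaming helper

if os.environ.get("RUNPOD_DEBUG_LOCAL") == "1":  # hypothetical flag for GPU Pod debugging
    # call the handler directly, skipping the serverless worker, as J. does on a GPU Pod
    for chunk in handler({"id": "debug", "input": {"mode": "stream"}}):
        print(chunk)
else:
    runpod.serverless.start({
        "handler": handler,
        "return_aggregate_stream": True,
    })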
agentpietrucha
Nice way to run the same handler in both pod and serverless!
J.
J.2y ago
Thx! Yeah, I love being able to debug on GPU Pod, makes it much easier.
ribbit
ribbitOP2y ago
no unfortunately it's not easy to test locally, but I tried deploying it anyway and it turns out that whenever there's a yield in the handler function, everything I return becomes a generator? I can't get it to work yet
Unknown User
Unknown User2y ago
Message Not Public
J.
J.2y ago
https://discord.com/channels/912829806415085598/948767517332107274/1235352455995199500 Maybe check if you updated your runpod version? lol just a random guess
Unknown User
Unknown User2y ago
Message Not Public
J.
J.2y ago
but my code that i linked works with both streaming / nonstreaming. i feel the docs surprisingly don't show streaming 👁️, at least on the client side, that there is an endpoint you gotta hit called /stream
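For context, a sketch of the client side J. is referring to: submit the job to /run, then read partial results from /stream/{id} (endpoint_id, api_key and the input payload below are placeholders):

import requests

endpoint_id = "YOUR_ENDPOINT_ID"  # placeholder
api_key = "YOUR_API_KEY"          # placeholder
headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json",
}

# queue the job asynchronously
run = requests.post(
    f"https://api.runpod.ai/v2/{endpoint_id}/run",
    headers=headers,
    json={"input": {"prompt": "hello"}},  # whatever your handler expects
)
task_id = run.json()["id"]
# partial output is then polled from /stream/{task_id},
# as in the snippet J. pastes further down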
Unknown User
Unknown User2y ago
Message Not Public
J.
J.2y ago
oh interesting maybe im just bad at looking
Unknown User
Unknown User2y ago
Message Not Public
J.
J.2y ago
ah found it
J.
J.2y ago
Endpoint operations | RunPod Documentation
Comprehensive guide on interacting with models using RunPod's API Endpoints without managing the pods yourself.
Unknown User
Unknown User2y ago
Message Not Public
J.
J.2y ago
yeaaaaa, not the best doc xD
Unknown User
Unknown User2y ago
Message Not Public
J.
J.2y ago
The only problem with the doc imo is that it's a bit more involved:
# Stream output
status_url = f"https://api.runpod.ai/v2/{endpoint_id}/stream/{task_id}"

try:
    while True:  # Adjust the range or use a while loop for continuous polling
        time.sleep(1)  # Polling interval
        get_status = requests.get(status_url, headers=headers)
        if get_status.status_code == 200:
            status_response = get_status.json()
            if 'stream' in status_response and len(status_response['stream']) > 0:
                for item in status_response['stream']:
                    print(item['output']['text'], end='')  # Adjust based on the actual structure
            if status_response.get('status') == 'COMPLETED':
                print("\nJob completed.")
                break
        else:
            print(f"Error streaming job output: {get_status.text}")
            break
except requests.exceptions.RequestException as e:  # the original snippet leaves the try unclosed
    print(f"Request failed: {e}")
ribbit
ribbitOP2y ago
sorry, I mean whenever a yield is present in the handler, the output becomes a generator regardless of whether I use yield or return. for example
def handler():
    if something:
        yield value  # output is a generator
    else:
        return value  # this also outputs a generator
Unknown User
Unknown User2y ago
Message Not Public
ribbit
ribbitOP2y ago
yeah that's not how it actually is, i just wrote the example that way for convenience
Unknown User
Unknown User2y ago
Message Not Public
ribbit
ribbitOP2y ago
i hit the /run endpoint first, then retrieve the stream by hitting /stream. sorry i can't produce a screenshot right now, but in the local testing library, the output is somewhat like this:
...runpod logs...
output: <generator object ...>
...runpod logs...
assume that this is my code
def handler():
    value = 1
    something = False
    if something:
        yield value  # output is a generator
    else:
        return value  # this also outputs a generator
it is expected if something is True, but somehow when something is False, it returns a generator as well
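This is standard Python behavior rather than anything RunPod-specific: any def whose body contains yield anywhere is a generator function, so calling it always produces a generator object, and a return inside it only stops iteration (the value is attached to StopIteration, not returned normally). A standalone sketch of that:

def handler(job):
    value = 1
    something = False
    if something:
        yield value
    else:
        return value  # stops the generator early; the value is never yielded

print(handler(None))        # <generator object handler at 0x...>
print(list(handler(None)))  # [] -- the returned value does not appear in the output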
Unknown User
Unknown User2y ago
Message Not Public
ribbit
ribbitOP2y ago
haha sorry was dizzy, i meant /run. that's from the runpod local dev server log, the one that shows the output of the handler function
Unknown User
Unknown User2y ago
Message Not Public
ribbit
ribbitOP2y ago
STREAM_ENDPOINTS = {"stream_answer"}

def handler(job):
    validated_input = validate_input(job)
    if "errors" in validated_input:
        return {
            "error": "\n".join(validated_input["errors"])
        }

    validated_api = validate_api(job)
    if "errors" in validated_api:
        return {
            "error": "\n".join(validated_api["errors"])
        }

    endpoint, validated_payload = validate_payload(job)
    if "errors" in validated_payload:
        return {
            "error": "\n".join(validated_payload["errors"])
        }

    if "validated_input" in validated_payload:
        payload = validated_payload["validated_input"]
    else:
        payload = validated_payload

    if payload.get("emu_error", False):
        return {
            "error": "Error emulated. If this is unintended, please check your environment variables"
        }

    LOGGER.info(f"[INVOKING] {endpoint}", job["id"])
    LOGGER.info(f"[SUCCESS] Returning")

    response = invoke(endpoint, payload)

    if endpoint in STREAM_ENDPOINTS:
        for token in response:
            yield token
    else:
        return response
output
INFO | local_test | Started.
DEBUG | local_test | Handler output: <generator object handler at 0x7d0f40748890>
DEBUG | local_test | run_job return: {'output': <generator object handler at 0x7d0f40748890>}
INFO | Job local_test completed successfully.
INFO | Job result: {'output': <generator object handler at 0x7d0f40748890>}
INFO | Local testing complete, exiting.
the endpoint's value is not stream_answer, yet it always returns a generator
Unknown User
Unknown User2y ago
Message Not Public
ribbit
ribbitOP2y ago
def invoke(
    endpoint: str,
    payload: dict
) -> Any:
    if endpoint == "...":
        response = some_func()  # this does not return a generator
    elif endpoint == "...":
        response = some_func()  # this does not return a generator
    elif endpoint == "answer_stream":
        response = stream_func()  # this returns a generator

    return response
this is how the invoke function is, i omitted irrelevant code but that's generally how the function looks.
endpoint, validated_payload = validate_payload(job)
I got endpoint from this line in the handler function, which passes job into a validator function that simply checks if my payload schema is ok. basically the inputted payload should look like this:
{
    "input": {
        "api": {
            "endpoint": "...",
        },
        ...rest of the json
    }
}
basically that function validates and extracts the endpoint variable from my input payload. i am very sure that this is not the case, i tried and checked every other function. this stream function is a new addition to the code, and before that (all the functions above stream_func are the original code) no function was ever coded to return a generator
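A hypothetical reconstruction of the dispatch ribbit describes, just to make the shape concrete (the real validate_payload is not shown in the thread):

def validate_payload(job):
    # pull the target endpoint out of job["input"]["api"]["endpoint"]
    data = job.get("input", {})
    api = data.get("api", {})
    endpoint = api.get("endpoint")
    if not endpoint:
        return None, {"errors": ["missing input.api.endpoint"]}
    # ...schema checks on the rest of the json would go here...
    return endpoint, {"validated_input": data}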
J.
J.2y ago
why does it matter if it's a generator or not? If there is nothing else being sent, then the response is just done anyway, whether you use yield or return
Unknown User
Unknown User2y ago
Message Not Public
ribbit
ribbitOP2y ago
because the change of the response type would disrupt other running services, i am trying to avoid that
def dummy(job):
    something = False
    if something:
        yield 1
    else:
        return 1


if __name__ == "__main__":
    runpod.serverless.start(
        {
            "handler": dummy,
            "return_aggregate_stream": True
        }
    )
this code also returns a generator
INFO | local_test | Started.
DEBUG | local_test | Handler output: <generator object dummy at 0x7376fcbdedc0>
DEBUG | local_test | run_job return: {'output': <generator object dummy at 0x7376fcbdedc0>}
INFO | Job local_test completed successfully.
INFO | Job result: {'output': <generator object dummy at 0x7376fcbdedc0>}
INFO | Local testing complete, exiting.
I think it's just the way it is. thank you all
Unknown User
Unknown User2y ago
Message Not Public
J.
J.2y ago
i think this is a problem with local testing, it shouldn't return a generator, it should just be 1
ribbit
ribbitOP2y ago
{
    "delayTime": 9289,
    "executionTime": 54,
    "id": "bc6aaadd-0970-4100-99a0-f4230d18be4f-e1",
    "output": [],
    "status": "COMPLETED"
}
it still returned a generator, same code ran on serverless. no log to prove that it's a generator tho, but it has the same behavior as before when it cast all my outputs to generators: it returns a [] and an empty stream when streamed.
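One way to read that [] result: because handler is a generator function, the `return response` branch yields nothing, so the aggregated stream is empty. A sketch of a single-handler workaround (not what ribbit ultimately did, which was two endpoints) is to yield on both paths; note that /run then returns output as a list, which may not match an existing response shape:

import runpod

STREAM_ENDPOINTS = {"stream_answer"}

def handler(job):
    endpoint = job["input"]["api"]["endpoint"]  # payload shape from earlier in the thread
    response = invoke(endpoint, job["input"])   # ribbit's dispatcher shown above
    if endpoint in STREAM_ENDPOINTS:
        for token in response:
            yield token
    else:
        yield response  # yielded once, so /run returns output: [response]

runpod.serverless.start({
    "handler": handler,
    "return_aggregate_stream": True,
})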
Unknown User
Unknown User2y ago
Message Not Public
ribbit
ribbitOP2y ago
the normal streaming works well. now we have 2 separate endpoints, one to handle all the non-streaming and one to handle all the streaming. hahah will try to do so
Unknown User
Unknown User2y ago
Message Not Public
Dio the Debugger
did this ever get resolved @nerdylive @here? I am facing a similar issue
Dio the Debugger
the generator object is what i get, and on the deployment i get an empty object @Madiator2011 (Work)
