RunPod•6mo ago
Concept

Serverless Endpoint Streaming

I'm currently working with Llama.cpp for my inference and have set up my handler.py file to be similar to this guide: https://docs.runpod.io/docs/handler-generator

My input and handler files look like this:
{
  "input": {
    "query": "Temp",
    "stream": true
  }
}
import runpod
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.prompts import PromptTemplate
from langchain.llms import LlamaCpp
from langchain.chains import LLMChain


def load_llm():
    n_gpu_layers = 200
    n_batch = 500

    # Make sure the model path is correct for your system!
    llm = LlamaCpp(
        model_path="starling-lm-7b-alpha.Q4_K_M.gguf",
        n_gpu_layers=n_gpu_layers,
        use_mlock=True,
        use_mmap=True,
        max_tokens=1024,
        stop=["Q:", "Disclaimer:", "</s>", "Source:", "Legal Inquiry:", "\n\n ", "Summary:"],
        n_batch=n_batch,
        temp=0.5,
        n_ctx=8192,
        repeat_penalty=1.18,
    )
    print("LLM Loaded! ;)")
    return llm


def process_input(input):
    """
    Execute the application code
    """
    llm = load_llm()
    query = input['query']

    prompt = PromptTemplate(
        input_variables=["context"],
        template="""
GPT4 User: {context}

<|end_of_turn|>GPT4 Legal Assistant:
""",
    )

    llmchain = LLMChain(llm=llm, prompt=prompt)
    answer = llmchain.run(query)

    return {
        "answer": answer
    }


# ---------------------------------------------------------------------------- #
#                                RunPod Handler                                 #
# ---------------------------------------------------------------------------- #
def handler(event):
    """
    This is the handler function that will be called by RunPod serverless.
    """
    return process_input(event['input'])


if __name__ == '__main__':
    runpod.serverless.start({
        'handler': handler,
        "return_aggregate_stream": True
    })
My problem is that whenever I test this out in the Requests tab on the dashboard, it keeps saying the stream is empty. https://github.com/runpod-workers/worker-vllm
RunPod
Generator Handler: a handler that can stream fractional results.
GitHub
runpod-workers/worker-vllm: the RunPod worker template for serving large language model endpoints, powered by vLLM.
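Note: the linked Generator Handler doc is the relevant pattern here. To get anything on the /stream endpoint, the handler itself has to be a generator that yields partial results, rather than a function that returns a single dict. A minimal sketch of that pattern, using placeholder output instead of a real model, looks roughly like this:

import runpod

def handler(event):
    # Each yield becomes one partial result on the endpoint's /stream route.
    query = event["input"]["query"]
    for word in ["streamed", "chunks", "for:", query]:
        yield {"text": word}

runpod.serverless.start({
    "handler": handler,
    # Also aggregate the yielded chunks into the final job output.
    "return_aggregate_stream": True,
})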
Concept
Concept•6mo ago
Thank you!
Justin Merrell
Justin Merrell•6mo ago
No problem, let me know if you run into any other issues 🙂
Concept
Concept•6mo ago
I'm assuming this wouldn't really work if I wanted to test locally, because the output would just be async generator objects, right?
Justin Merrell
Justin Merrell•6mo ago
How are you testing locally? If you are starting the API server with the rp_serve_api flag, there is a stream test endpoint.
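For context, the flag is passed on the command line when running the handler file (python rp_handler.py --rp_serve_api). The sketch below shows one way to poke the resulting local API with Python's requests library; the port and the /run and /stream/<id> paths are assumptions based on this thread, so verify them against your SDK version before relying on them.

# Assumes the local test API was started in another terminal with:
#   python rp_handler.py --rp_serve_api
# NOTE: port 8000 and the /run and /stream/<id> paths are illustrative
# assumptions; check the local API docs for your runpod SDK version.
import requests

BASE = "http://localhost:8000"

job = requests.post(f"{BASE}/run", json={"input": {"query": "Temp", "stream": True}}).json()
job_id = job["id"]

chunks = requests.get(f"{BASE}/stream/{job_id}").json()
print(chunks)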
Concept
Concept•6mo ago
Just python rp_handler.py
Justin Merrell
Justin Merrell•6mo ago
Could you show your local test run to see what you are doing specifically?
Concept
Concept•6mo ago
import runpod
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.prompts import PromptTemplate
from langchain.llms import LlamaCpp
from langchain.chains import LLMChain


def load_llm():
    n_gpu_layers = 200  # Adjust based on your model and GPU VRAM pool.
    n_batch = 500  # Should be between 1 and n_ctx, considering your GPU VRAM.

    # Ensure the model path is correct for your system.
    llm = LlamaCpp(
        model_path="starling-lm-7b-alpha.Q4_K_M.gguf",
        n_gpu_layers=n_gpu_layers,
        use_mlock=True,
        use_mmap=True,
        max_tokens=1024,
        stop=["Q:", "Disclaimer:", "</s>", "Source:", "Legal Inquiry:", "\n\n ", "Summary:"],
        n_batch=n_batch,
        temp=0.5,
        n_ctx=8192,
        repeat_penalty=1.18,
    )
    print("LLM Loaded! ;)")
    return llm


async def process_input(input):
    """
    Execute the application code with streaming support.
    """
    llm = load_llm()
    query = input['query']

    prompt = PromptTemplate(
        input_variables=["context"],
        template="""
GPT4 User: {context}

GPT4 Legal Assistant:
""",
    )

    llmchain = LLMChain(llm=llm, prompt=prompt)

    # Assuming llmchain.run supports asynchronous iteration for streaming.
    async for part in llmchain.run(query, stream=True):
        yield {"part": part}


# ---------------------------------------------------------------------------- #
#                                RunPod Handler                                 #
# ---------------------------------------------------------------------------- #
async def handler(event):
    """
    Asynchronous handler function for RunPod serverless with streaming support.
    """
    async for output in process_input(event['input']):
        yield output


if __name__ == '__main__':
    runpod.serverless.start({
        'handler': handler,
        "return_aggregate_stream": True
    })
Not sure if I added yield correctly, but below is my output:

--- Starting Serverless Worker | Version 1.4.0 ---
INFO | Using test_input.json as job input.
DEBUG | Retrieved local job: {'input': {'query': 'Temp', 'stream': True}, 'id': 'local_test'}
INFO | local_test | Started.
DEBUG | local_test | Handler output: <async_generator object handler at 0x7f83774a6440>
DEBUG | local_test | run_job return: {'output': <async_generator object handler at 0x7f83774a6440>}
INFO | Job local_test completed successfully.
INFO | Job result: {'output': <async_generator object handler at 0x7f83774a6440>}
INFO | Local testing complete, exiting.
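The <async_generator object handler ...> in that log is the un-drained generator itself: the plain python rp_handler.py local run at this SDK version prints the handler's return value without iterating it. As a debugging aid only (this bypasses runpod.serverless.start and is not how RunPod actually invokes the handler), one way to sanity-check the async generator handler defined above is to drive it directly with asyncio:

# Quick local sanity check: iterate the async generator handler and print
# each yielded chunk. Purely for debugging outside the RunPod SDK.
import asyncio

async def drain():
    event = {"input": {"query": "Temp", "stream": True}}
    async for chunk in handler(event):
        print(chunk)

if __name__ == "__main__":
    asyncio.run(drain())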
Justin Merrell
Justin Merrell•6mo ago
I see, I will need to revisit the testing for streaming. Calling your program with the rp_serve_api flag is going to be the current option for testing.
Concept
Concept•6mo ago
Tried using it; /stream only returns 'detail not found'. Should I be expecting my result to look like this?
{
  "delayTime": 1234,
  "executionTime": 1234,
  "id": "...",
  "output": [
    [
      { "text": " Run",        "usage": { "input": 27, "output": 1 } },
      { "text": "Pod",         "usage": { "input": 27, "output": 2 } },
      { "text": " is",         "usage": { "input": 27, "output": 3 } },
      { "text": " considered", "usage": { "input": 27, "output": 4 } },
      { "text": " the",        "usage": { "input": 27, "output": 5 } },
      { "text": " best",       "usage": { "input": 27, "output": 6 } },
      { "text": " GPU",        "usage": { "input": 27, "output": 7 } },
      { "text": " provider",   "usage": { "input": 27, "output": 8 } },
      { "text": " for",        "usage": { "input": 27, "output": 9 } },
      { "text": " several",    "usage": { "input": 27, "output": 10 } }
    ]
  ],
  "status": "COMPLETED"
}
Justin Merrell
Justin Merrell•6mo ago
Correct. Could you provide a screenshot of your test? I'll double-check that it is working as expected.
Concept
Concept•6mo ago
Hey Justin! I followed the README above with the vLLM RunPod worker and got streaming to work. One question: the delay time on this new endpoint is significantly longer than on my original endpoint. It seems like adding more workers speeds it up?
Justin Merrell
Justin Merrell•6mo ago
When you sent the first request in, were there any workers ready?
Concept
Concept•6mo ago
I think it was a cold start. Now I'm struggling to get the workers to turn off after running. Ah, active workers, I'm dumb. I had turned on an active worker since the cold boot was 50 seconds haha
azeem5782
azeem5782•5mo ago
Could you please tell me how you were able to stream the output? What changes did you make to your previous code, which already had "yield" for streaming and still wasn't working?
Concept
Concept•5mo ago
GitHub
runpod-workers/worker-vllm: the RunPod worker template for serving large language model endpoints, powered by vLLM.
https://github.com/runpod-workers/worker-vllm
azeem5782
azeem5782•5mo ago
I checked this repo but wasn't able to figure out what was needed. Could you please be a little more specific about what exactly helped with streaming, apart from yield?
Concept
Concept•5mo ago
I would fork this repo and adapt it to what you need. Use this worker to build/init your LLM.
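Roughly, the relevant part of that worker is that the handler is an async generator over vLLM's streaming engine. The sketch below is a simplified illustration of that pattern, not the actual worker-vllm code; the model name is just the Starling checkpoint used earlier in this thread, and the vLLM API names follow the ~0.2.x releases and may differ in newer versions.

# Simplified illustration of the worker-vllm streaming pattern (not the real
# worker code): an async generator handler that yields token deltas as vLLM's
# AsyncLLMEngine produces them.
import uuid

import runpod
from vllm import AsyncEngineArgs, AsyncLLMEngine, SamplingParams

engine = AsyncLLMEngine.from_engine_args(
    AsyncEngineArgs(model="berkeley-nest/Starling-LM-7B-alpha")
)

async def handler(event):
    prompt = event["input"]["query"]
    sampling_params = SamplingParams(temperature=0.5, max_tokens=1024)
    previous_text = ""

    # engine.generate returns an async iterator of partial RequestOutputs.
    async for request_output in engine.generate(prompt, sampling_params, str(uuid.uuid4())):
        text = request_output.outputs[0].text
        # Yield only the newly generated portion, so each chunk is a delta.
        yield {"text": text[len(previous_text):]}
        previous_text = text

runpod.serverless.start({
    "handler": handler,
    "return_aggregate_stream": True,
})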