streamedQuery increase default timeout
Hello, is there a way to increase the
streamedQuery or useQuery default timeout? I've got an SSE response which takes more than 30 seconds. (6 Replies)
fascinating-indigo•2mo ago
Sounds like a timeout in whatever you're using to fetch. Query is agnostic
optimistic-goldOP•2mo ago
The thing is I don’t have any timeout
I’m using a Next.js route handler and useQuery with streamedQuery.
fascinating-indigo•2mo ago
I haven't used Next in a while, but I assume they define a default max duration; in this case it sounds like 30 seconds.
optimistic-goldOP•2mo ago
Maybe Next.js is limiting my request?
I've got something like this in route.ts:
/** Dynamic route parameters received by this handler. */
type Params = { session_id: string };

/**
 * Forwards a chat request to the upstream streaming endpoint and pipes the
 * upstream body back to the caller as a Server-Sent Events response.
 *
 * The incoming JSON body is re-serialised and POSTed to
 * `streamChatUrl(session_id)` through `fetchAuth`; the upstream status code is
 * passed through unchanged.
 *
 * NOTE(review): this handler sets no timeout of its own. If streams are cut
 * off after a fixed time, check the route duration limit of the hosting
 * platform / Next.js (e.g. the `maxDuration` route segment config) — TODO confirm.
 *
 * @param req Incoming request whose JSON body is the chat payload.
 * @param context Route context; `params` resolves to the `session_id`.
 * @returns A streaming response backed by the upstream body.
 */
export async function POST(
  req: NextRequest,
  { params }: { params: Promise<Params> },
) {
  const sessionId = (await params).session_id;
  const payload: ChatStreamBodyRequest = await req.json();

  const upstreamResponse = await fetchAuth(streamChatUrl(sessionId), {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Accept: "text/event-stream",
    },
    body: JSON.stringify(payload),
    // Opt out of caching for the proxied request.
    cache: "no-store",
    next: { revalidate: 0 },
  });

  // Headers that mark the response as an uncached, unbuffered event stream.
  const eventStreamHeaders = {
    "Content-Type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-cache, no-transform",
    Connection: "keep-alive",
    "X-Accel-Buffering": "no",
  };

  return new NextResponse(upstreamResponse.body, {
    status: upstreamResponse.status,
    headers: eventStreamHeaders,
  });
}
/** Dynamic route parameters received by this handler. */
type Params = { session_id: string };

/**
 * Forwards a chat request to the upstream streaming endpoint and pipes the
 * upstream body back to the caller as a Server-Sent Events response.
 *
 * The incoming JSON body is re-serialised and POSTed to
 * `streamChatUrl(session_id)` through `fetchAuth`; the upstream status code is
 * passed through unchanged.
 *
 * NOTE(review): this handler sets no timeout of its own. If streams are cut
 * off after a fixed time, check the route duration limit of the hosting
 * platform / Next.js (e.g. the `maxDuration` route segment config) — TODO confirm.
 *
 * @param req Incoming request whose JSON body is the chat payload.
 * @param context Route context; `params` resolves to the `session_id`.
 * @returns A streaming response backed by the upstream body.
 */
export async function POST(
  req: NextRequest,
  { params }: { params: Promise<Params> },
) {
  const sessionId = (await params).session_id;
  const payload: ChatStreamBodyRequest = await req.json();

  const upstreamResponse = await fetchAuth(streamChatUrl(sessionId), {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Accept: "text/event-stream",
    },
    body: JSON.stringify(payload),
    // Opt out of caching for the proxied request.
    cache: "no-store",
    next: { revalidate: 0 },
  });

  // Headers that mark the response as an uncached, unbuffered event stream.
  const eventStreamHeaders = {
    "Content-Type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-cache, no-transform",
    Connection: "keep-alive",
    "X-Accel-Buffering": "no",
  };

  return new NextResponse(upstreamResponse.body, {
    status: upstreamResponse.status,
    headers: eventStreamHeaders,
  });
}
rare-sapphire•2mo ago
there is no timeout in react-query
optimistic-goldOP•2mo ago
I'm not sure, but is the way I stream the messages okay, or should I use a mutation?
/**
 * Events carried by the chat stream, discriminated by `type`:
 * `thought` and `error` carry a text `content`; `tool_call` carries the tool
 * name and its (untyped) arguments.
 */
export type ChatStreamEvent =
| { type: "thought"; content: string }
| { type: "tool_call"; tool_name: string; tool_args: unknown }
| { type: "error"; content: string };
/**
 * Arguments for `chatAnswer`.
 * `streamId` is interpolated into the stream endpoint URL and `payload` is
 * JSON-serialised as the request body.
 */
interface ChatAnswerParams {
streamId: number;
payload: ChatStreamBodyRequest;
}
/**
 * Reads a Server-Sent Events byte stream and yields the JSON payload of each
 * complete event.
 *
 * Bytes are decoded incrementally as UTF-8 and buffered until a blank line
 * terminates an event. The blank line may use LF, CRLF or CR line endings, so
 * streams produced by servers that emit `\r\n` are parsed too. Within an
 * event every `data:` line is collected and the values are joined with "\n";
 * all other lines (`event:`, `id:`, `retry:`, `:` comments) are ignored.
 * Events without any `data:` line are skipped. Payloads that are not valid
 * JSON are reported with `console.warn` and skipped. Text still buffered when
 * the stream ends (an event without its terminating blank line) is not
 * yielded.
 *
 * @param reader Reader over the response body bytes.
 * @returns An async generator of parsed events. The parsed JSON is cast to
 *   `ChatStreamEvent` without runtime validation.
 */
async function* parseSSEStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
) {
  const decoder = new TextDecoder("utf-8");
  // A blank line ends an event; accept every line-ending style SSE permits.
  const eventBoundary = /\r\n\r\n|\n\n|\r\r/;
  const lineBreak = /\r\n|\n|\r/;
  const dataPrefix = /^data:\s*/;
  let buffer = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // `stream: true` keeps multi-byte characters split across reads intact.
    buffer += decoder.decode(value, { stream: true });
    for (
      let boundary = eventBoundary.exec(buffer);
      boundary !== null;
      boundary = eventBoundary.exec(buffer)
    ) {
      const chunk = buffer.slice(0, boundary.index).trim();
      buffer = buffer.slice(boundary.index + boundary[0].length);
      const dataLines: string[] = [];
      for (const line of chunk.split(lineBreak)) {
        if (line.startsWith("data:")) {
          dataLines.push(line.replace(dataPrefix, ""));
        }
      }
      // Comment-only / keep-alive events carry no payload.
      if (dataLines.length === 0) continue;
      const json = dataLines.join("\n");
      try {
        const event = JSON.parse(json);
        yield event as ChatStreamEvent;
      } catch (err) {
        // biome-ignore lint/suspicious/noConsole: <debug>
        console.warn("Failed to parse SSE chunk:", chunk, err);
      }
    }
  }
}
/**
 * Creates the stream function for one chat request.
 *
 * The returned async generator POSTs `payload` as JSON to
 * `/insights/chat/api/stream/{streamId}`, then yields every event that
 * `parseSSEStream` produces from the response body. The query's abort
 * `signal` is handed to `fetch`.
 *
 * @param params `streamId` selects the endpoint; `payload` is the request body.
 * @returns An async generator function taking the query function context.
 * @throws Error when the response status is not OK or the response has no body.
 */
function chatAnswer({ streamId, payload }: ChatAnswerParams) {
  return async function* ({ signal }: QueryFunctionContext) {
    const endpoint = `/insights/chat/api/stream/${streamId}`;
    const res = await fetch(endpoint, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Accept: "text/event-stream",
      },
      body: JSON.stringify(payload),
      signal,
    });

    if (!res.ok) {
      throw new Error(`HTTP error! status: ${res.status}`);
    }
    if (!res.body) {
      throw new Error("No reader available");
    }

    const bodyReader = res.body.getReader();
    try {
      for await (const streamEvent of parseSSEStream(bodyReader)) {
        yield streamEvent;
      }
    } finally {
      // Always give the lock back, whether the stream finished, failed or
      // the consumer stopped early.
      bodyReader.releaseLock();
    }
  };
}
/**
 * Events carried by the chat stream, discriminated by `type`:
 * `thought` and `error` carry a text `content`; `tool_call` carries the tool
 * name and its (untyped) arguments.
 */
export type ChatStreamEvent =
| { type: "thought"; content: string }
| { type: "tool_call"; tool_name: string; tool_args: unknown }
| { type: "error"; content: string };
/**
 * Arguments for `chatAnswer`.
 * `streamId` is interpolated into the stream endpoint URL and `payload` is
 * JSON-serialised as the request body.
 */
interface ChatAnswerParams {
streamId: number;
payload: ChatStreamBodyRequest;
}
/**
 * Reads a Server-Sent Events byte stream and yields the JSON payload of each
 * complete event.
 *
 * Bytes are decoded incrementally as UTF-8 and buffered until a blank line
 * terminates an event. The blank line may use LF, CRLF or CR line endings, so
 * streams produced by servers that emit `\r\n` are parsed too. Within an
 * event every `data:` line is collected and the values are joined with "\n";
 * all other lines (`event:`, `id:`, `retry:`, `:` comments) are ignored.
 * Events without any `data:` line are skipped. Payloads that are not valid
 * JSON are reported with `console.warn` and skipped. Text still buffered when
 * the stream ends (an event without its terminating blank line) is not
 * yielded.
 *
 * @param reader Reader over the response body bytes.
 * @returns An async generator of parsed events. The parsed JSON is cast to
 *   `ChatStreamEvent` without runtime validation.
 */
async function* parseSSEStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
) {
  const decoder = new TextDecoder("utf-8");
  // A blank line ends an event; accept every line-ending style SSE permits.
  const eventBoundary = /\r\n\r\n|\n\n|\r\r/;
  const lineBreak = /\r\n|\n|\r/;
  const dataPrefix = /^data:\s*/;
  let buffer = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // `stream: true` keeps multi-byte characters split across reads intact.
    buffer += decoder.decode(value, { stream: true });
    for (
      let boundary = eventBoundary.exec(buffer);
      boundary !== null;
      boundary = eventBoundary.exec(buffer)
    ) {
      const chunk = buffer.slice(0, boundary.index).trim();
      buffer = buffer.slice(boundary.index + boundary[0].length);
      const dataLines: string[] = [];
      for (const line of chunk.split(lineBreak)) {
        if (line.startsWith("data:")) {
          dataLines.push(line.replace(dataPrefix, ""));
        }
      }
      // Comment-only / keep-alive events carry no payload.
      if (dataLines.length === 0) continue;
      const json = dataLines.join("\n");
      try {
        const event = JSON.parse(json);
        yield event as ChatStreamEvent;
      } catch (err) {
        // biome-ignore lint/suspicious/noConsole: <debug>
        console.warn("Failed to parse SSE chunk:", chunk, err);
      }
    }
  }
}
/**
 * Creates the stream function for one chat request.
 *
 * The returned async generator POSTs `payload` as JSON to
 * `/insights/chat/api/stream/{streamId}`, then yields every event that
 * `parseSSEStream` produces from the response body. The query's abort
 * `signal` is handed to `fetch`.
 *
 * @param params `streamId` selects the endpoint; `payload` is the request body.
 * @returns An async generator function taking the query function context.
 * @throws Error when the response status is not OK or the response has no body.
 */
function chatAnswer({ streamId, payload }: ChatAnswerParams) {
  return async function* ({ signal }: QueryFunctionContext) {
    const endpoint = `/insights/chat/api/stream/${streamId}`;
    const res = await fetch(endpoint, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Accept: "text/event-stream",
      },
      body: JSON.stringify(payload),
      signal,
    });

    if (!res.ok) {
      throw new Error(`HTTP error! status: ${res.status}`);
    }
    if (!res.body) {
      throw new Error("No reader available");
    }

    const bodyReader = res.body.getReader();
    try {
      for await (const streamEvent of parseSSEStream(bodyReader)) {
        yield streamEvent;
      }
    } finally {
      // Always give the lock back, whether the stream finished, failed or
      // the consumer stopped early.
      bodyReader.releaseLock();
    }
  };
}
/**
 * Builds the query key for a chat stream.
 *
 * @param id Stream identifier.
 * @param content Optional message content; appended to the key only when it
 *   is a non-empty string.
 * @returns `["chat-stream", id, content]`, or `["chat-stream", id]` when
 *   `content` is missing or empty.
 */
export const chatStreamKey = (id: number, content?: string) => {
  if (!content) {
    return ["chat-stream", id];
  }
  return ["chat-stream", id, content];
};
/**
 * Query options for streaming the answer to one chat message.
 *
 * The key is derived from the stream id and the message content; the query
 * function wraps `chatAnswer` in `streamedQuery`. Results never go stale, are
 * never garbage-collected (both times are infinite) and failures are not
 * retried.
 *
 * @param streamId Stream identifier used for the key and the endpoint.
 * @param message Message whose `content` and `role` form the request payload.
 */
export const chatQueryOptions = (
  streamId: number,
  message: ChatSessionMessagechema,
) => {
  const queryKey = chatStreamKey(streamId, message.content);
  const streamFn = chatAnswer({
    streamId,
    payload: { content: message.content, role: message.role },
  });

  return queryOptions({
    queryKey,
    queryFn: streamedQuery({ streamFn }),
    staleTime: Number.POSITIVE_INFINITY,
    gcTime: Number.POSITIVE_INFINITY,
    retry: false,
  });
};
/**
 * Subscribes to the streamed answer for a chat message.
 *
 * The query only runs when `enabled` is true, `streamId` is positive and the
 * message was written by the user; otherwise it is disabled and its query
 * function resolves to an empty list.
 *
 * All options from `chatQueryOptions` are applied, not just its `queryFn`.
 * Previously `retry: false`, `staleTime` and `gcTime` were dropped, so the
 * hook fell back to the library defaults, under which a failed or stale
 * query may run again and POST the chat message a second time.
 *
 * @param streamId Stream identifier.
 * @param message Message to send; only `user` messages start a stream.
 * @param enabled Extra switch to turn the query off. Defaults to `true`.
 * @returns The query result holding the events received so far.
 */
export function useChatStream(
  streamId: number,
  message: ChatSessionMessagechema,
  enabled = true,
) {
  const isActive = enabled && streamId > 0 && message.role === "user";
  const options = chatQueryOptions(streamId, message);
  return useQuery<ChatStreamEvent[], Error>({
    // Brings in queryKey, staleTime, gcTime and retry alongside queryFn.
    ...options,
    queryFn: isActive ? options.queryFn : async () => [],
    enabled: isActive,
  });
}
/**
 * Builds the query key for a chat stream.
 *
 * @param id Stream identifier.
 * @param content Optional message content; appended to the key only when it
 *   is a non-empty string.
 * @returns `["chat-stream", id, content]`, or `["chat-stream", id]` when
 *   `content` is missing or empty.
 */
export const chatStreamKey = (id: number, content?: string) => {
  if (!content) {
    return ["chat-stream", id];
  }
  return ["chat-stream", id, content];
};
/**
 * Query options for streaming the answer to one chat message.
 *
 * The key is derived from the stream id and the message content; the query
 * function wraps `chatAnswer` in `streamedQuery`. Results never go stale, are
 * never garbage-collected (both times are infinite) and failures are not
 * retried.
 *
 * @param streamId Stream identifier used for the key and the endpoint.
 * @param message Message whose `content` and `role` form the request payload.
 */
export const chatQueryOptions = (
  streamId: number,
  message: ChatSessionMessagechema,
) => {
  const queryKey = chatStreamKey(streamId, message.content);
  const streamFn = chatAnswer({
    streamId,
    payload: { content: message.content, role: message.role },
  });

  return queryOptions({
    queryKey,
    queryFn: streamedQuery({ streamFn }),
    staleTime: Number.POSITIVE_INFINITY,
    gcTime: Number.POSITIVE_INFINITY,
    retry: false,
  });
};
/**
 * Subscribes to the streamed answer for a chat message.
 *
 * The query only runs when `enabled` is true, `streamId` is positive and the
 * message was written by the user; otherwise it is disabled and its query
 * function resolves to an empty list.
 *
 * All options from `chatQueryOptions` are applied, not just its `queryFn`.
 * Previously `retry: false`, `staleTime` and `gcTime` were dropped, so the
 * hook fell back to the library defaults, under which a failed or stale
 * query may run again and POST the chat message a second time.
 *
 * @param streamId Stream identifier.
 * @param message Message to send; only `user` messages start a stream.
 * @param enabled Extra switch to turn the query off. Defaults to `true`.
 * @returns The query result holding the events received so far.
 */
export function useChatStream(
  streamId: number,
  message: ChatSessionMessagechema,
  enabled = true,
) {
  const isActive = enabled && streamId > 0 && message.role === "user";
  const options = chatQueryOptions(streamId, message);
  return useQuery<ChatStreamEvent[], Error>({
    // Brings in queryKey, staleTime, gcTime and retry alongside queryFn.
    ...options,
    queryFn: isActive ? options.queryFn : async () => [],
    enabled: isActive,
  });
}