Background jobs queue management and cleanup?

I couldn't find any useful info in the docs about this. Basically, I am reproducing the Background jobs example, and I am wondering: 1. Is the queue in this example basically useless? 2. Will the jobs record grow indefinitely? How would you recommend cleaning it up? Is there anything native to rivetkit for this? For reference, here is my first implementation:
import { sendEmail } from "@typebot.io/emails/helpers/sendEmail";
import { uploadFileToBucket } from "@typebot.io/lib/s3/uploadFileToBucket";
import { streamAllResultsToCsv } from "@typebot.io/results/streamAllResultsToCsv";
import { readFileSync } from "fs";
import { actor } from "rivetkit";

/**
 * State of a single export job.
 *
 * The `status` tag discriminates the lifecycle stage; the optional
 * `sendToEmailAfterCompletion` address may accompany any stage.
 */
type JobState = (
  | { status: "pending" } // registered, not started yet
  | { status: "fetching"; progress: number } // results are being streamed to CSV
  | { status: "uploading" } // CSV is being uploaded
  | { status: "completed"; downloadUrl: string } // upload finished
  | { status: "failed"; error: string } // export stopped with an error
) & { sendToEmailAfterCompletion?: string };

/**
 * Actor that tracks CSV export jobs in `state.jobs`, keyed by job id.
 *
 * Each job streams results to `<jobId>.csv`, uploads that file and broadcasts
 * a `jobUpdate` event on every status change.
 */
export const exportAllResultsActor = actor({
  state: {
    // NOTE(review): nothing in this actor ever removes an entry, so this
    // record grows with every enqueued job.
    jobs: {} as Record<string, JobState>,
  },

  actions: {
    /**
     * Registers a job as "pending" and schedules the `process` action for it.
     *
     * @param jobId Key of the job in `state.jobs`.
     * @param typebotId Forwarded to `process`. Optional so existing
     *   single-argument callers still type-check; `process` marks the job as
     *   failed when it is missing.
     * @returns `jobId`, or `undefined` when a job with that id already exists.
     */
    enqueue: (c, jobId: string, typebotId?: string) => {
      if (c.state.jobs[jobId]) return;
      c.state.jobs[jobId] = { status: "pending" };
      // Forward typebotId: `process` needs it to know what to export.
      c.schedule.after(0, "process", jobId, typebotId);
      return jobId;
    },

    /**
     * Runs the export for one job: stream results to CSV, upload the file,
     * then mark the job "completed" (or "failed").
     *
     * @param jobId Key of the job in `state.jobs`; also names the CSV file.
     * @param typebotId Passed to `streamAllResultsToCsv`.
     */
    process: async (c, jobId: string, typebotId?: string) => {
      // Replace the job's status while carrying over an e-mail address that
      // may have been registered in the meantime; a plain assignment of the
      // new status object would silently drop it.
      const setStatus = (next: JobState) => {
        const previousEmail = c.state.jobs[jobId]?.sendToEmailAfterCompletion;
        c.state.jobs[jobId] =
          previousEmail === undefined
            ? next
            : { sendToEmailAfterCompletion: previousEmail, ...next };
      };
      // Record a terminal failure and tell connected clients about it.
      const fail = (error: string) => {
        setStatus({ status: "failed", error });
        c.broadcast("jobUpdate", { status: "failed", error });
      };

      if (!typebotId) {
        fail("Missing typebotId");
        return;
      }

      const csvPath = `${jobId}.csv`;
      try {
        setStatus({ status: "fetching", progress: 0 });
        const result = await streamAllResultsToCsv(typebotId, {
          writeStreamPath: csvPath,
          onProgressUpdate: (progress) => {
            setStatus({ status: "fetching", progress });
            c.broadcast("jobUpdate", { status: "fetching", progress });
          },
        });
        if (result.status === "error") {
          fail(result.message);
          return;
        }

        setStatus({ status: "uploading" });
        c.broadcast("jobUpdate", { status: "uploading" });
        // NOTE(review): the local CSV file is read but never deleted here.
        const downloadUrl = await uploadFileToBucket({
          key: `public/results/${jobId}.csv`,
          file: readFileSync(csvPath),
          mimeType: "text/csv",
        });

        setStatus({ status: "completed", downloadUrl });
        c.broadcast("jobUpdate", { status: "completed", downloadUrl });
        // An address registered before completion is only notified from here;
        // `setSendToEmailAfterCompletion` covers addresses added afterwards.
        if (c.state.jobs[jobId]?.sendToEmailAfterCompletion)
          c.schedule.after(0, "sendByEmail", jobId);
      } catch (error: unknown) {
        // Without this the job would stay in "fetching"/"uploading" forever.
        fail(error instanceof Error ? error.message : String(error));
      }
    },

    /**
     * Sends the "results are ready" e-mail for a job.
     *
     * Does nothing when the job is unknown, not completed, or has no address.
     *
     * @param jobId Key of the job in `state.jobs`.
     */
    sendByEmail: async (c, jobId: string) => {
      const job = c.state.jobs[jobId];
      if (!job || job.status !== "completed" || !job.sendToEmailAfterCompletion)
        return;
      await sendEmail({
        to: job.sendToEmailAfterCompletion,
        subject: "Your results are ready",
        //TODO: add body
        text: "Your results are ready",
      });
    },

    /**
     * Stores the address to notify for a job and, if the job is already
     * completed, schedules `sendByEmail` right away.
     *
     * @param jobId Key of the job in `state.jobs`.
     * @param email Address to notify.
     * @throws Error when no job with `jobId` exists.
     */
    setSendToEmailAfterCompletion: (c, jobId: string, email: string) => {
      const job = c.state.jobs[jobId];
      if (!job) throw new Error(`Unknown job: ${jobId}`);
      job.sendToEmailAfterCompletion = email;
      if (job.status === "completed") c.schedule.after(0, "sendByEmail", jobId);
    },
  },
});
import { sendEmail } from "@typebot.io/emails/helpers/sendEmail";
import { uploadFileToBucket } from "@typebot.io/lib/s3/uploadFileToBucket";
import { streamAllResultsToCsv } from "@typebot.io/results/streamAllResultsToCsv";
import { readFileSync } from "fs";
import { actor } from "rivetkit";

/**
 * State of a single export job.
 *
 * The `status` tag discriminates the lifecycle stage; the optional
 * `sendToEmailAfterCompletion` address may accompany any stage.
 */
type JobState = (
  | { status: "pending" } // registered, not started yet
  | { status: "fetching"; progress: number } // results are being streamed to CSV
  | { status: "uploading" } // CSV is being uploaded
  | { status: "completed"; downloadUrl: string } // upload finished
  | { status: "failed"; error: string } // export stopped with an error
) & { sendToEmailAfterCompletion?: string };

/**
 * Actor that tracks CSV export jobs in `state.jobs`, keyed by job id.
 *
 * Each job streams results to `<jobId>.csv`, uploads that file and broadcasts
 * a `jobUpdate` event on every status change.
 */
export const exportAllResultsActor = actor({
  state: {
    // NOTE(review): nothing in this actor ever removes an entry, so this
    // record grows with every enqueued job.
    jobs: {} as Record<string, JobState>,
  },

  actions: {
    /**
     * Registers a job as "pending" and schedules the `process` action for it.
     *
     * @param jobId Key of the job in `state.jobs`.
     * @param typebotId Forwarded to `process`. Optional so existing
     *   single-argument callers still type-check; `process` marks the job as
     *   failed when it is missing.
     * @returns `jobId`, or `undefined` when a job with that id already exists.
     */
    enqueue: (c, jobId: string, typebotId?: string) => {
      if (c.state.jobs[jobId]) return;
      c.state.jobs[jobId] = { status: "pending" };
      // Forward typebotId: `process` needs it to know what to export.
      c.schedule.after(0, "process", jobId, typebotId);
      return jobId;
    },

    /**
     * Runs the export for one job: stream results to CSV, upload the file,
     * then mark the job "completed" (or "failed").
     *
     * @param jobId Key of the job in `state.jobs`; also names the CSV file.
     * @param typebotId Passed to `streamAllResultsToCsv`.
     */
    process: async (c, jobId: string, typebotId?: string) => {
      // Replace the job's status while carrying over an e-mail address that
      // may have been registered in the meantime; a plain assignment of the
      // new status object would silently drop it.
      const setStatus = (next: JobState) => {
        const previousEmail = c.state.jobs[jobId]?.sendToEmailAfterCompletion;
        c.state.jobs[jobId] =
          previousEmail === undefined
            ? next
            : { sendToEmailAfterCompletion: previousEmail, ...next };
      };
      // Record a terminal failure and tell connected clients about it.
      const fail = (error: string) => {
        setStatus({ status: "failed", error });
        c.broadcast("jobUpdate", { status: "failed", error });
      };

      if (!typebotId) {
        fail("Missing typebotId");
        return;
      }

      const csvPath = `${jobId}.csv`;
      try {
        setStatus({ status: "fetching", progress: 0 });
        const result = await streamAllResultsToCsv(typebotId, {
          writeStreamPath: csvPath,
          onProgressUpdate: (progress) => {
            setStatus({ status: "fetching", progress });
            c.broadcast("jobUpdate", { status: "fetching", progress });
          },
        });
        if (result.status === "error") {
          fail(result.message);
          return;
        }

        setStatus({ status: "uploading" });
        c.broadcast("jobUpdate", { status: "uploading" });
        // NOTE(review): the local CSV file is read but never deleted here.
        const downloadUrl = await uploadFileToBucket({
          key: `public/results/${jobId}.csv`,
          file: readFileSync(csvPath),
          mimeType: "text/csv",
        });

        setStatus({ status: "completed", downloadUrl });
        c.broadcast("jobUpdate", { status: "completed", downloadUrl });
        // An address registered before completion is only notified from here;
        // `setSendToEmailAfterCompletion` covers addresses added afterwards.
        if (c.state.jobs[jobId]?.sendToEmailAfterCompletion)
          c.schedule.after(0, "sendByEmail", jobId);
      } catch (error: unknown) {
        // Without this the job would stay in "fetching"/"uploading" forever.
        fail(error instanceof Error ? error.message : String(error));
      }
    },

    /**
     * Sends the "results are ready" e-mail for a job.
     *
     * Does nothing when the job is unknown, not completed, or has no address.
     *
     * @param jobId Key of the job in `state.jobs`.
     */
    sendByEmail: async (c, jobId: string) => {
      const job = c.state.jobs[jobId];
      if (!job || job.status !== "completed" || !job.sendToEmailAfterCompletion)
        return;
      await sendEmail({
        to: job.sendToEmailAfterCompletion,
        subject: "Your results are ready",
        //TODO: add body
        text: "Your results are ready",
      });
    },

    /**
     * Stores the address to notify for a job and, if the job is already
     * completed, schedules `sendByEmail` right away.
     *
     * @param jobId Key of the job in `state.jobs`.
     * @param email Address to notify.
     * @throws Error when no job with `jobId` exists.
     */
    setSendToEmailAfterCompletion: (c, jobId: string, email: string) => {
      const job = c.state.jobs[jobId];
      if (!job) throw new Error(`Unknown job: ${jobId}`);
      job.sendToEmailAfterCompletion = email;
      if (job.status === "completed") c.schedule.after(0, "sendByEmail", jobId);
    },
  },
});
3 Replies
Nathan
Nathan2mo ago
Oops, that code snippet was meant to be a vibe-coded placeholder, not something to ship – this page is brand new. It should be 1 actor = 1 job. See https://github.com/rivet-dev/rivet/blob/main/examples/background-jobs/src/registry.ts
GitHub
rivet/examples/background-jobs/src/registry.ts at main · rivet-dev...
An open-source library for long-lived processes with realtime, persistence, and hibernation - rivet-dev/rivet
Nathan
Nathan2mo ago
Points #1 and #2 that you raised are correct. It should look something like this (untested):
import { sendEmail } from "@typebot.io/emails/helpers/sendEmail";
import { uploadFileToBucket } from "@typebot.io/lib/s3/uploadFileToBucket";
import { streamAllResultsToCsv } from "@typebot.io/results/streamAllResultsToCsv";
import { readFileSync } from "fs";
import { actor } from "rivetkit";

/** Input handed to the `exportJob` actor when it is created. */
type ExportJobInput = {
  /** Identifier of the job; also used to name the CSV file. */
  jobId: string;
  /** Identifier passed to `streamAllResultsToCsv`. */
  typebotId: string;
  /** Optional address to e-mail once the export has completed. */
  sendToEmail?: string;
};

/**
 * Actor representing exactly one export job (1 actor = 1 job).
 *
 * The export runs from `onCreate`: results are streamed to `<jobId>.csv`,
 * the file is uploaded, and a `jobUpdate` event is broadcast on every status
 * change.
 */
export const exportJob = actor({
  /**
   * Builds the initial state from the creation input.
   *
   * `error` is kept in state so that the failure reason stays readable after
   * the `jobUpdate` broadcast has gone by.
   */
  createState: (c, input: ExportJobInput) => ({
    status: "pending",
    typebotId: input.typebotId,
    jobId: input.jobId,
    sendToEmail: input.sendToEmail,
    downloadUrl: undefined as string | undefined,
    error: undefined as string | undefined,
  }),

  /**
   * Runs the export and records the outcome in state.
   *
   * On success `status` becomes "completed" and `downloadUrl` is set; on any
   * failure `status` becomes "failed" and `error` holds the message.
   */
  onCreate: async (c, input: ExportJobInput) => {
    // Record a terminal failure and tell connected clients about it.
    const fail = (message: string) => {
      c.state.status = "failed";
      c.state.error = message;
      c.broadcast("jobUpdate", { status: "failed", error: message });
    };

    const csvPath = `${c.state.jobId}.csv`;
    try {
      c.state.status = "fetching";

      const result = await streamAllResultsToCsv(c.state.typebotId, {
        writeStreamPath: csvPath,
        onProgressUpdate: (progress) => {
          c.state.status = "fetching";
          c.broadcast("jobUpdate", { status: "fetching", progress });
        },
      });

      if (result.status === "error") {
        fail(result.message);
        return;
      }

      c.state.status = "uploading";
      c.broadcast("jobUpdate", { status: "uploading" });

      // NOTE(review): the local CSV file is read but never deleted here.
      const downloadUrl = await uploadFileToBucket({
        key: `public/results/${c.state.jobId}.csv`,
        file: readFileSync(csvPath),
        mimeType: "text/csv",
      });

      c.state.status = "completed";
      c.state.downloadUrl = downloadUrl;
      c.broadcast("jobUpdate", { status: "completed", downloadUrl });

      // If email was provided, send it
      if (c.state.sendToEmail) {
        c.schedule.after(0, "sendByEmail");
      }
    } catch (error: unknown) {
      // Without this a thrown error would leave status stuck at
      // "fetching"/"uploading".
      fail(error instanceof Error ? error.message : String(error));
    }
  },

  actions: {
    /**
     * E-mails the download link to `state.sendToEmail`.
     *
     * Does nothing unless the job is completed and an address is set.
     */
    sendByEmail: async (c) => {
      if (c.state.status !== "completed" || !c.state.sendToEmail) return;

      await sendEmail({
        to: c.state.sendToEmail,
        subject: "Your results are ready",
        text: `Your results are ready. Download them here: ${c.state.downloadUrl}`,
      });
    },
  },
});
import { sendEmail } from "@typebot.io/emails/helpers/sendEmail";
import { uploadFileToBucket } from "@typebot.io/lib/s3/uploadFileToBucket";
import { streamAllResultsToCsv } from "@typebot.io/results/streamAllResultsToCsv";
import { readFileSync } from "fs";
import { actor } from "rivetkit";

/** Input handed to the `exportJob` actor when it is created. */
type ExportJobInput = {
  /** Identifier of the job; also used to name the CSV file. */
  jobId: string;
  /** Identifier passed to `streamAllResultsToCsv`. */
  typebotId: string;
  /** Optional address to e-mail once the export has completed. */
  sendToEmail?: string;
};

/**
 * Actor representing exactly one export job (1 actor = 1 job).
 *
 * The export runs from `onCreate`: results are streamed to `<jobId>.csv`,
 * the file is uploaded, and a `jobUpdate` event is broadcast on every status
 * change.
 */
export const exportJob = actor({
  /**
   * Builds the initial state from the creation input.
   *
   * `error` is kept in state so that the failure reason stays readable after
   * the `jobUpdate` broadcast has gone by.
   */
  createState: (c, input: ExportJobInput) => ({
    status: "pending",
    typebotId: input.typebotId,
    jobId: input.jobId,
    sendToEmail: input.sendToEmail,
    downloadUrl: undefined as string | undefined,
    error: undefined as string | undefined,
  }),

  /**
   * Runs the export and records the outcome in state.
   *
   * On success `status` becomes "completed" and `downloadUrl` is set; on any
   * failure `status` becomes "failed" and `error` holds the message.
   */
  onCreate: async (c, input: ExportJobInput) => {
    // Record a terminal failure and tell connected clients about it.
    const fail = (message: string) => {
      c.state.status = "failed";
      c.state.error = message;
      c.broadcast("jobUpdate", { status: "failed", error: message });
    };

    const csvPath = `${c.state.jobId}.csv`;
    try {
      c.state.status = "fetching";

      const result = await streamAllResultsToCsv(c.state.typebotId, {
        writeStreamPath: csvPath,
        onProgressUpdate: (progress) => {
          c.state.status = "fetching";
          c.broadcast("jobUpdate", { status: "fetching", progress });
        },
      });

      if (result.status === "error") {
        fail(result.message);
        return;
      }

      c.state.status = "uploading";
      c.broadcast("jobUpdate", { status: "uploading" });

      // NOTE(review): the local CSV file is read but never deleted here.
      const downloadUrl = await uploadFileToBucket({
        key: `public/results/${c.state.jobId}.csv`,
        file: readFileSync(csvPath),
        mimeType: "text/csv",
      });

      c.state.status = "completed";
      c.state.downloadUrl = downloadUrl;
      c.broadcast("jobUpdate", { status: "completed", downloadUrl });

      // If email was provided, send it
      if (c.state.sendToEmail) {
        c.schedule.after(0, "sendByEmail");
      }
    } catch (error: unknown) {
      // Without this a thrown error would leave status stuck at
      // "fetching"/"uploading".
      fail(error instanceof Error ? error.message : String(error));
    }
  },

  actions: {
    /**
     * E-mails the download link to `state.sendToEmail`.
     *
     * Does nothing unless the job is completed and an address is set.
     */
    sendByEmail: async (c) => {
      if (c.state.status !== "completed" || !c.state.sendToEmail) return;

      await sendEmail({
        to: c.state.sendToEmail,
        subject: "Your results are ready",
        text: `Your results are ready. Download them here: ${c.state.downloadUrl}`,
      });
    },
  },
});
one cool idea might be to save the email so it gets sent once complete instead of having to wait for status == complete
Baptiste
BaptisteOP2mo ago
I appreciate the example, thanks @Nathan will try and report back 🙂

Did you find this page helpful?