import { sendEmail } from "@typebot.io/emails/helpers/sendEmail";
import { uploadFileToBucket } from "@typebot.io/lib/s3/uploadFileToBucket";
import { streamAllResultsToCsv } from "@typebot.io/results/streamAllResultsToCsv";
import { readFileSync } from "fs";
import { actor } from "rivetkit";
/**
 * State of a single export job, discriminated by `status`.
 *
 * Each variant carries only the data that is meaningful for that status
 * (`progress` while fetching, `downloadUrl` once completed, `error` on
 * failure). Any variant may additionally hold the address to e-mail once
 * the job is completed.
 */
type JobState = (
  | { status: "pending" }
  | { status: "fetching"; progress: number }
  | { status: "uploading" }
  | { status: "completed"; downloadUrl: string }
  | { status: "failed"; error: string }
) & {
  /** Optional recipient to notify by e-mail after completion. */
  sendToEmailAfterCompletion?: string;
};
/**
 * Actor that runs "export all results" jobs.
 *
 * A job streams the results of a typebot to a local CSV file, uploads that
 * file to the bucket and exposes the resulting download URL. Every status
 * change is stored in `state.jobs` and broadcast as a `jobUpdate` event.
 * A job may also carry an e-mail address to notify once it is completed.
 */
export const exportAllResultsActor = actor({
  state: {
    jobs: {} as Record<string, JobState>,
  },
  actions: {
    /**
     * Registers a new job and schedules it for processing.
     *
     * @param jobId - Identifier of the job to create.
     * @param typebotId - Typebot whose results are exported; forwarded to
     *   `process`. Optional only so existing single-argument callers keep
     *   compiling; a job enqueued without it ends up `failed`.
     * @returns `jobId` when the job was created, `undefined` when a job with
     *   that id already exists.
     */
    enqueue: (c, jobId: string, typebotId?: string) => {
      if (c.state.jobs[jobId]) return;
      c.state.jobs[jobId] = { status: "pending" };
      // `process` needs the typebot id as its second argument.
      c.schedule.after(0, "process", jobId, typebotId);
      return jobId;
    },
    /**
     * Runs the export: stream results to CSV, upload the file, publish the
     * download URL. Failures (returned or thrown) mark the job as `failed`.
     *
     * @param jobId - Identifier of the job being processed.
     * @param typebotId - Typebot whose results are exported.
     */
    process: async (c, jobId: string, typebotId: string) => {
      const csvPath = `${jobId}.csv`;

      // Replace the job status while keeping a previously registered e-mail
      // address, which a plain reassignment would drop.
      const setJob = (next: JobState) => {
        const email = c.state.jobs[jobId]?.sendToEmailAfterCompletion;
        c.state.jobs[jobId] = email
          ? { ...next, sendToEmailAfterCompletion: email }
          : next;
      };
      const fail = (error: string) => {
        setJob({ status: "failed", error });
        c.broadcast("jobUpdate", { status: "failed", error });
      };

      // Scheduled arguments are not type-checked, so validate at runtime.
      if (!typebotId) {
        fail("Missing typebotId");
        return;
      }

      setJob({ status: "fetching", progress: 0 });
      try {
        const result = await streamAllResultsToCsv(typebotId, {
          writeStreamPath: csvPath,
          onProgressUpdate: (progress) => {
            setJob({ status: "fetching", progress });
            c.broadcast("jobUpdate", { status: "fetching", progress });
          },
        });
        if (result.status === "error") {
          fail(result.message);
          return;
        }

        setJob({ status: "uploading" });
        c.broadcast("jobUpdate", { status: "uploading" });
        // NOTE(review): the local CSV file is never removed after upload and
        // `jobId` is used verbatim in the path — confirm both are acceptable.
        const downloadUrl = await uploadFileToBucket({
          key: `public/results/${jobId}.csv`,
          file: readFileSync(csvPath),
          mimeType: "text/csv",
        });

        setJob({ status: "completed", downloadUrl });
        c.broadcast("jobUpdate", { status: "completed", downloadUrl });

        // Honor an e-mail address registered before the job completed.
        if (c.state.jobs[jobId]?.sendToEmailAfterCompletion)
          c.schedule.after(0, "sendByEmail", jobId);
      } catch (err: unknown) {
        // Do not leave the job stuck in "fetching"/"uploading" on a throw.
        fail(err instanceof Error ? err.message : String(err));
      }
    },
    /**
     * Sends the "results are ready" e-mail for a completed job.
     * Does nothing when the job is unknown, not completed, or has no
     * recipient registered.
     *
     * @param jobId - Identifier of the job to notify about.
     */
    sendByEmail: async (c, jobId: string) => {
      const job = c.state.jobs[jobId];
      if (!job || job.status !== "completed") return;
      const recipient = job.sendToEmailAfterCompletion;
      if (!recipient) return;
      await sendEmail({
        to: recipient,
        subject: "Your results are ready",
        //TODO: add body
        text: "Your results are ready",
      });
    },
    /**
     * Registers the address to e-mail once the job is completed. If the job
     * is already completed, the e-mail is scheduled immediately.
     *
     * @param jobId - Identifier of the job.
     * @param email - Recipient address.
     * @throws Error when no job with `jobId` exists.
     */
    setSendToEmailAfterCompletion: (c, jobId: string, email: string) => {
      const job = c.state.jobs[jobId];
      if (!job) throw new Error(`Unknown export job: ${jobId}`);
      job.sendToEmailAfterCompletion = email;
      if (job.status === "completed")
        c.schedule.after(0, "sendByEmail", jobId);
    },
  },
});
import { sendEmail } from "@typebot.io/emails/helpers/sendEmail";
import { uploadFileToBucket } from "@typebot.io/lib/s3/uploadFileToBucket";
import { streamAllResultsToCsv } from "@typebot.io/results/streamAllResultsToCsv";
import { readFileSync } from "fs";
import { actor } from "rivetkit";
/**
 * State of a single export job, discriminated by `status`.
 *
 * Each variant carries only the data that is meaningful for that status
 * (`progress` while fetching, `downloadUrl` once completed, `error` on
 * failure). Any variant may additionally hold the address to e-mail once
 * the job is completed.
 */
type JobState = (
  | { status: "pending" }
  | { status: "fetching"; progress: number }
  | { status: "uploading" }
  | { status: "completed"; downloadUrl: string }
  | { status: "failed"; error: string }
) & {
  /** Optional recipient to notify by e-mail after completion. */
  sendToEmailAfterCompletion?: string;
};
/**
 * Actor that runs "export all results" jobs.
 *
 * A job streams the results of a typebot to a local CSV file, uploads that
 * file to the bucket and exposes the resulting download URL. Every status
 * change is stored in `state.jobs` and broadcast as a `jobUpdate` event.
 * A job may also carry an e-mail address to notify once it is completed.
 */
export const exportAllResultsActor = actor({
  state: {
    jobs: {} as Record<string, JobState>,
  },
  actions: {
    /**
     * Registers a new job and schedules it for processing.
     *
     * @param jobId - Identifier of the job to create.
     * @param typebotId - Typebot whose results are exported; forwarded to
     *   `process`. Optional only so existing single-argument callers keep
     *   compiling; a job enqueued without it ends up `failed`.
     * @returns `jobId` when the job was created, `undefined` when a job with
     *   that id already exists.
     */
    enqueue: (c, jobId: string, typebotId?: string) => {
      if (c.state.jobs[jobId]) return;
      c.state.jobs[jobId] = { status: "pending" };
      // `process` needs the typebot id as its second argument.
      c.schedule.after(0, "process", jobId, typebotId);
      return jobId;
    },
    /**
     * Runs the export: stream results to CSV, upload the file, publish the
     * download URL. Failures (returned or thrown) mark the job as `failed`.
     *
     * @param jobId - Identifier of the job being processed.
     * @param typebotId - Typebot whose results are exported.
     */
    process: async (c, jobId: string, typebotId: string) => {
      const csvPath = `${jobId}.csv`;

      // Replace the job status while keeping a previously registered e-mail
      // address, which a plain reassignment would drop.
      const setJob = (next: JobState) => {
        const email = c.state.jobs[jobId]?.sendToEmailAfterCompletion;
        c.state.jobs[jobId] = email
          ? { ...next, sendToEmailAfterCompletion: email }
          : next;
      };
      const fail = (error: string) => {
        setJob({ status: "failed", error });
        c.broadcast("jobUpdate", { status: "failed", error });
      };

      // Scheduled arguments are not type-checked, so validate at runtime.
      if (!typebotId) {
        fail("Missing typebotId");
        return;
      }

      setJob({ status: "fetching", progress: 0 });
      try {
        const result = await streamAllResultsToCsv(typebotId, {
          writeStreamPath: csvPath,
          onProgressUpdate: (progress) => {
            setJob({ status: "fetching", progress });
            c.broadcast("jobUpdate", { status: "fetching", progress });
          },
        });
        if (result.status === "error") {
          fail(result.message);
          return;
        }

        setJob({ status: "uploading" });
        c.broadcast("jobUpdate", { status: "uploading" });
        // NOTE(review): the local CSV file is never removed after upload and
        // `jobId` is used verbatim in the path — confirm both are acceptable.
        const downloadUrl = await uploadFileToBucket({
          key: `public/results/${jobId}.csv`,
          file: readFileSync(csvPath),
          mimeType: "text/csv",
        });

        setJob({ status: "completed", downloadUrl });
        c.broadcast("jobUpdate", { status: "completed", downloadUrl });

        // Honor an e-mail address registered before the job completed.
        if (c.state.jobs[jobId]?.sendToEmailAfterCompletion)
          c.schedule.after(0, "sendByEmail", jobId);
      } catch (err: unknown) {
        // Do not leave the job stuck in "fetching"/"uploading" on a throw.
        fail(err instanceof Error ? err.message : String(err));
      }
    },
    /**
     * Sends the "results are ready" e-mail for a completed job.
     * Does nothing when the job is unknown, not completed, or has no
     * recipient registered.
     *
     * @param jobId - Identifier of the job to notify about.
     */
    sendByEmail: async (c, jobId: string) => {
      const job = c.state.jobs[jobId];
      if (!job || job.status !== "completed") return;
      const recipient = job.sendToEmailAfterCompletion;
      if (!recipient) return;
      await sendEmail({
        to: recipient,
        subject: "Your results are ready",
        //TODO: add body
        text: "Your results are ready",
      });
    },
    /**
     * Registers the address to e-mail once the job is completed. If the job
     * is already completed, the e-mail is scheduled immediately.
     *
     * @param jobId - Identifier of the job.
     * @param email - Recipient address.
     * @throws Error when no job with `jobId` exists.
     */
    setSendToEmailAfterCompletion: (c, jobId: string, email: string) => {
      const job = c.state.jobs[jobId];
      if (!job) throw new Error(`Unknown export job: ${jobId}`);
      job.sendToEmailAfterCompletion = email;
      if (job.status === "completed")
        c.schedule.after(0, "sendByEmail", jobId);
    },
  },
});