I don't think that works for the case of retry limits being reached
I don't think that works for the case of retry limits being reached
… `console.log` messages emitted during that workflow run.
/**
 * Base class that wraps a workflow's execution in structured log entries.
 *
 * `run` logs a "started" entry, validates the event payload, delegates the
 * real work to `runStep`, and then logs either a "completed" entry (with the
 * output) or a "failed" entry (with the error) before rethrowing.
 *
 * @typeParam T - Payload type carried by the workflow event.
 * @typeParam R - Value produced by `runStep`.
 */
export abstract class AbstractWorkflow<T, R> extends WorkflowEntrypoint<Env, T> {
  /** Name used as the prefix of every log message emitted by `run`. */
  protected abstract getName(): string;

  /**
   * Checks the event payload. It is invoked inside `run`'s try block, so any
   * exception it throws is logged as a failure and rethrown.
   */
  protected abstract validateInput(input: T): void;

  /** Performs the workflow's actual work and resolves with its output. */
  protected abstract runStep(event: WorkflowEvent<T>, step: WorkflowStep): Promise<R>;

  /**
   * Entry point: logs, validates, delegates to `runStep`, logs the outcome.
   *
   * @param event - Workflow event; its `payload` is passed to `validateInput`.
   * @param step - Step handle forwarded unchanged to `runStep`.
   * @returns The value `runStep` resolved with.
   * @throws Whatever `validateInput` or `runStep` threw, after logging it.
   */
  async run(event: WorkflowEvent<T>, step: WorkflowStep): Promise<unknown> {
    const startedEntry = {
      message: `${this.getName()} started.`,
      level: "info",
      event,
    };
    console.log(startedEntry);

    try {
      this.validateInput(event.payload);
      const result = await this.runStep(event, step);

      const completedEntry = {
        message: `${this.getName()} completed.`,
        level: "info",
        event,
        output: result,
      };
      console.log(completedEntry);

      return result;
    } catch (cause) {
      const failedEntry = {
        message: `${this.getName()} failed.`,
        level: "error",
        event,
        error: cause,
      };
      console.log(failedEntry);

      // Propagate the original error untouched after recording it.
      throw cause;
    }
  }
}

… `waitForEvent` step, but when we use `sendEvent` in our route to continue the workflow, we are sometimes waiting up to 45 minutes for the workflow to continue. Some extra info:
- The route where `sendEvent` is being called is deployed to a Cloudflare Worker.
- `sendEvent` is called, and the types match.
- When the `waitForEvent` step has only been waiting for a few minutes, I haven't recreated the issue yet, but when the `waitForEvent` step has been waiting 20+ minutes we run into the issue the majority of the time.

workflows-starter@0.0.1 deploy
wrangler deploy
The `do something with ids` step runs multiple times even though it doesn't fail or anything.

`fetch(req: Request, env: Env): Promise<Response>` … for the root and the favicon requests; that explains why each step is being called multiple times.

`await env.WORKFLOW_NAME.create()` … with an ambiguous "internal error", whereas some succeeded.

`"Error: internal error"` … as far as I can tell.

`Promise.all` … if I recall correctly.
/**
 * Base class that wraps a workflow's execution in structured log entries.
 *
 * `run` logs a "started" entry, validates the event payload, delegates the
 * real work to `runStep`, and then logs either a "completed" entry (with the
 * output) or a "failed" entry (with the error) before rethrowing.
 *
 * @typeParam T - Payload type carried by the workflow event.
 * @typeParam R - Value produced by `runStep`.
 */
export abstract class AbstractWorkflow<T, R> extends WorkflowEntrypoint<Env, T> {
  /** Name used as the prefix of every log message emitted by `run`. */
  protected abstract getName(): string;

  /**
   * Checks the event payload. It is invoked inside `run`'s try block, so any
   * exception it throws is logged as a failure and rethrown.
   */
  protected abstract validateInput(input: T): void;

  /** Performs the workflow's actual work and resolves with its output. */
  protected abstract runStep(event: WorkflowEvent<T>, step: WorkflowStep): Promise<R>;

  /**
   * Entry point: logs, validates, delegates to `runStep`, logs the outcome.
   *
   * @param event - Workflow event; its `payload` is passed to `validateInput`.
   * @param step - Step handle forwarded unchanged to `runStep`.
   * @returns The value `runStep` resolved with.
   * @throws Whatever `validateInput` or `runStep` threw, after logging it.
   */
  async run(event: WorkflowEvent<T>, step: WorkflowStep): Promise<unknown> {
    const startedEntry = {
      message: `${this.getName()} started.`,
      level: "info",
      event,
    };
    console.log(startedEntry);

    try {
      this.validateInput(event.payload);
      const result = await this.runStep(event, step);

      const completedEntry = {
        message: `${this.getName()} completed.`,
        level: "info",
        event,
        output: result,
      };
      console.log(completedEntry);

      return result;
    } catch (cause) {
      const failedEntry = {
        message: `${this.getName()} failed.`,
        level: "error",
        event,
        error: cause,
      };
      console.log(failedEntry);

      // Propagate the original error untouched after recording it.
      throw cause;
    }
  }
}
// src/workers/my.worker.ts
// Worker entry snippet: re-exports the workflow class and exposes a Hono app
// whose POST / route calls create() on the workflow binding.
// NOTE(review): `Hono` is used below but no import for it is visible in this snippet.
import { MyWorkflow } from "../workflows/my.workflow";
// Re-export the workflow class from the entry module.
// NOTE(review): presumably needed so the runtime can resolve the class named by
// the workflow binding — confirm against the wrangler configuration.
export {
MyWorkflow
};
// Shape of `c.env`; only the workflow binding is declared here.
type Bindings = {
MY_WORKFLOW: Workflow;
};
const app = new Hono<{ Bindings: Bindings }>();
// POST / forwards to the workflow binding's create().
// NOTE(review): `..` is a placeholder for the real create() arguments, and the
// handler returns create()'s result directly instead of building a response —
// confirm that is intended.
app.post('/', async (c) => {
return c.env.MY_WORKFLOW.create(..);
})
// Default export: the Hono app's fetch handler is used as the Worker's fetch.
export default {
fetch: app.fetch,
};export default {
// Second pasted snippet (a separate Worker entry): each call to this handler
// creates a new DATA_WORKFLOW instance and responds with its id and current
// status as JSON.
// NOTE(review): there is no path or method check, so every incoming request
// (for example a browser's additional /favicon.ico request) starts another
// workflow instance.
async fetch(req: Request, env: Env): Promise<Response> {
let instance = await env.DATA_WORKFLOW.create();
return Response.json({
id: instance.id,
details: await instance.status(),
});
},
};async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Workflow body (pasted without its enclosing class): runs three steps in
// sequence and deletes a per-instance temp object on both the success and the
// failure path.
// Key of the temp object, derived from this instance's id.
const storageKey = `instructions-${event.instanceId}.json`;
// Step config passed to every step.do() call below.
// NOTE(review): `delay` is presumably in milliseconds — confirm against the Workflows docs.
const defaultConfig = {retries: { limit: 1, delay: 1000 }};
try {
// NOTE(review): `ids` is never read in the visible code; the step callbacks
// here are empty placeholders.
const ids = await step.do('download ids', defaultConfig, async () => {
//
});
// NOTE(review): this step and the next one share the name
// 'do something with ids' — confirm whether distinct names are needed.
await step.do('do something with ids', defaultConfig, async () => {
//
});
await step.do('do something with ids', defaultConfig, async () => {
//
});
// Cleanup after successful completion
await this.env.TEMP_STORAGE.delete(storageKey);
} catch (error) {
// Cleanup on failure
await this.env.TEMP_STORAGE.delete(storageKey);
// The caught error is discarded here: whatever went wrong, the only thing
// thrown onward is NonRetryableError("Unknown error").
throw new NonRetryableError("Unknown error");
}
// Reached only when the try block finished without throwing (the catch always
// throws): looks up this same instance by id and terminates it.
// NOTE(review): confirm that terminating an instance from inside its own run()
// is intended.
let workflow = await this.env.DATA_WORKFLOW.get(event.instanceId);
await workflow.terminate();
}

`fetch(req: Request, env: Env): Promise<Response>`

`await env.WORKFLOW_NAME.create()`

`"Error: internal error"`

details: { "request_id": "ae65c446-f13d-4f95-98b8-ed25ccfa47e6", "fileId": "a386a543-91f9-4a78-af85-257ee4ba2728", "error": "internal error", "stack": "Error: internal error\n at callFetcher (cloudflare-internal:workflows-api:24:15)\n at async WorkflowImpl.create (cloudflare-internal:workflows-api:78:24)\n at async fileUploadService (main.js:919559:11)\n at async fileUploadController (main.js:919663:32)\n at async dispatch (main.js:1116:21)\n at async main.js:859416:9\n at async dispatch (main.js:1116:21)\n at async main.js:859416:9\n at async dispatch (main.js:1116:21)\n at async main.js:967997:5" }

`Promise.all`

// ✅ Good: steps that are dynamically named are constructed in a deterministic way.
// In this case, `catList` is a step output, which is stable, and `catList` is
// traversed in a deterministic fashion (no shuffles or random accesses) so,
// it's fine to dynamically name steps (e.g: create a step per list entry).
// Read the list inside its own step; the step's return value is what the loop iterates.
let catList = await step.do("get cat list from KV", async () => env.KV.get("cat-list"));

// One step per entry, visited in iteration order, with the entry embedded in the step name.
for (const cat of catList) {
  await step.do(`get cat: ${cat}`, async () => env.KV.get(cat));
}