Is there a recommended way to have a

Is there a recommended way to have a conditional step in a workflow? It seems like an "if" outside of a step causes an error, even if the workflow completes successfully. I want to pull a config from the KV store so I can toggle steps on and off without having to deploy code. Should I put the conditional inside a step?
7 Replies
avenceslau, 2w ago
Hey @Levi, you should be able to use an "if" outside of a step. Can you share what you are doing?
Levi (OP), 2w ago
Here is a slightly modified version of my code, but I think it shows what I'm doing. The workflow runs fine, but I see errors in the Workers observability.
export default class ProcessQuery extends WorkflowEntrypoint<Env, MyPayload>
{
    async run(event: WorkflowEvent<MyPayload>, step: WorkflowStep)
    {
        const config = await step.do("retrieve config", async () => {
            return await this.env.KV_CONFIG.get<Config>("config", {type: "json"});
        });

        if (!config) {
            throw new NonRetryableError("could not retrieve config");
        }

        await step.do("log payload", async () => {
            console.log(event.payload);
        });

        if (config.is_pipeline_enabled) {
            try {
                await step.do("send data to r2 catalog", async () => {
                    await this.env.STREAM.send([event.payload]);
                });
            } catch (e) {
                console.log(`failed to send data to r2 catalog.`, e);
            }
        }
    }
}
avenceslau, 2w ago
Can you share the error that you are seeing in Workers observability?
Levi (OP), 2w ago
{
  "level": "error",
  "message": "run",
  "$workers": {
    "diagnosticsChannelEvents": [],
    "truncated": false,
    "event": {
      "rpcMethod": "run"
    },
    "scriptName": "ns-dns-jobs",
    "outcome": "exception",
    "eventType": "rpc",
    "entrypoint": "ProcessQuery",
    "scriptVersion": {
      "id": "617cffcd-94ad-4264-afb9-e99eb516020a"
    },
    "executionModel": "stateless",
    "requestId": "FB5O7UL9UW2PXBN1",
    "wallTimeMs": 11430,
    "cpuTimeMs": 2
  },
  "$metadata": {
    "id": "01K9SDE9TS33JQ3JS6V5E5Z9M7",
    "requestId": "FB5O7UL9UW2PXBN1",
    "trigger": "default.run",
    "service": "ns-dns-jobs",
    "level": "error",
    "error": "run",
    "message": "run",
    "account": "redacted",
    "type": "cf-worker-event",
    "fingerprint": "032b89c947e3a5fa9ad692189dc61159",
    "origin": "rpc",
    "messageTemplate": "run"
  }
}
avenceslau, 2w ago
That's a known bug that we are working on fixing. It's not actually an error; it's just displayed as one.
Levi (OP), 2w ago
Thank you, @avenceslau! Is there a GitHub issue I can watch? I'll probably wait until it's fixed before making my steps conditional. I don't like errors 😄 fake ones or real ones.
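
For anyone landing on this thread later, here is a minimal sketch of the pattern discussed above: a plain "if" around a step, driven by a flag from config. It is an illustration under assumptions, not the Workflows API itself. StepLike is a hypothetical cut-down stand-in for WorkflowStep, stepIf and RecordingStep are made-up helper names, and Config only mirrors the is_pipeline_enabled flag from the code earlier in the thread. The fake step lets the sketch run outside the Workers runtime.

```typescript
// Hypothetical minimal shape of the step object. The real WorkflowStep
// has more methods and overloads; only a two-argument `do` is assumed here.
interface StepLike {
  do<T>(name: string, callback: () => Promise<T>): Promise<T>;
}

// Hypothetical config shape, mirroring the flag used in the thread.
interface Config {
  is_pipeline_enabled: boolean;
}

// Runs the named step only when `enabled` is true.
// Resolves to undefined when the step is skipped.
async function stepIf<T>(
  enabled: boolean,
  step: StepLike,
  name: string,
  callback: () => Promise<T>,
): Promise<T | undefined> {
  if (!enabled) {
    return undefined;
  }
  return step.do(name, callback);
}

// In-memory stand-in for the runtime, used only for this demo.
// It records the names of the steps that actually ran.
class RecordingStep implements StepLike {
  readonly executed: string[] = [];

  async do<T>(name: string, callback: () => Promise<T>): Promise<T> {
    this.executed.push(name);
    return callback();
  }
}

// Mimics the shape of the workflow in the thread: one unconditional step,
// then one step that is toggled by the config flag.
async function runDemo(config: Config): Promise<string[]> {
  const step = new RecordingStep();
  await step.do("log payload", async () => {});
  await stepIf(config.is_pipeline_enabled, step, "send data to r2 catalog", async () => {});
  return step.executed;
}
```

In a real workflow the flag would come from a step result, as with the "retrieve config" step in the original code, and the helper would be called with the actual step object passed to run.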
