I will upload in a moment the producer/consumer workers in GitHub.
9 processing.
await before step.do
pause the workflow from within a workflow instance, like calling the workflow binding and then resuming the workflow from a webhook or something like that
const wf = await MY_WORKFLOW.get(id) and then await wf.status() should return errors, but they are always empty.
An RPC stub was not disposed properly. You must call dispose() on all stubs in order to let the other side know that you are no longer using them. You cannot rely on the garbage collector for this because it may take arbitrarily long before actually collecting unreachable objects. As a shortcut, calling dispose() on the result of an RPC call disposes all stubs within it.
step.do expects to return Rpc.Serializable<unknown>, so I did a little bit of reading about the RPC lifecycle here and found that in the context of Worker Service Bindings, you should either use the experimental using syntax, or explicitly call object[Symbol.dispose]().
step.do, and not others. If someone could provide some insight as to what's going on, that would be great. Happy to provide more details also
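For what it's worth, here is a minimal sketch of the "always dispose, even on failure" pattern that the RPC error message above is asking for. Everything in it is an assumption for illustration: `FakeRpcResult` and `withDisposed` are hypothetical names, not Workers runtime APIs, and the fake uses a plain `dispose()` method so it runs anywhere. On a real stub you would call `stub[Symbol.dispose]()` or use the `using` syntax instead.

```typescript
// Hypothetical stand-in for an RPC result that holds a stub.
// NOT a Cloudflare API; it only tracks whether it was disposed.
class FakeRpcResult {
  disposed = false;

  constructor(public readonly value: string) {}

  // On a real stub this would be [Symbol.dispose]().
  dispose(): void {
    this.disposed = true;
  }
}

// Run `use` against the result and always dispose it afterwards,
// whether `use` resolves or throws.
async function withDisposed<T>(
  result: FakeRpcResult,
  use: (r: FakeRpcResult) => Promise<T>,
): Promise<T> {
  try {
    return await use(result);
  } finally {
    result.dispose();
  }
}
```

The point of the `try`/`finally` is only that disposal does not depend on the garbage collector or on the happy path; whether this is what trips the warning inside `step.do` is not something the sketch can confirm.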

sleep locally with a number value, it treats it like ms instead of seconds
sleep it's seconds not ms

await step.do('Validate params', async () => {
if (!event.payload.email) {
throw new NonRetryableError('No email provided');
}
if (!event.payload.url) {
throw new NonRetryableError('No url provided');
}
if (!isValidUrl(event.payload.url)) {
throw new NonRetryableError('Invalid URL');
}
if (!isValidEmail(event.payload.email)) {
throw new NonRetryableError('Invalid email provided');
}
});

#!/bin/bash
# Exit immediately if a command exits with a non-zero status
set -e
# Check if workflow name and status are provided
if [ -z "$1" ] || [ -z "$2" ]; then
echo "Usage: $0 <workflow-name> <status>"
echo "Status can be: queued, running, succeeded, failed"
exit 1
fi
WORKFLOW_NAME="$1"
STATUS="$2"
while true; do
# Get workflow instances with specified status
output=$(npx wrangler workflows instances list "$WORKFLOW_NAME" --status="$STATUS" 2>/dev/null)
ids=$(echo "$output" | awk -F '│' '{gsub(/ /, "", $2); if ($2 ~ /^[a-f0-9-]+$/) print $2}')
instance_count=$(echo "$ids" | wc -w | tr -d ' ')
if [ "$instance_count" -eq 0 ]; then
echo "No more instances found with status '$STATUS'"
break
fi
echo "Found $instance_count instances with status '$STATUS'"
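# Terminate up to BATCH_SIZE instances in parallel.
# -I{} already passes one ID per invocation, so -n is dropped (GNU xargs treats -n and -I as mutually exclusive).
BATCH_SIZE=5
echo "$ids" | xargs -P"$BATCH_SIZE" -I{} sh -c '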
id="$1"
workflow="$2"
if ! npx wrangler workflows instances terminate "$workflow" "$id" >/dev/null 2>&1; then
echo "Failed to terminate instance $id"
exit 1
fi
echo "Terminated $id"
' -- {} "$WORKFLOW_NAME"
echo "Batch complete, checking for remaining instances..."
done
echo "Done terminating all instances"

while (true) {
const isDone = await checkIfSomethingIsDone()
if (isDone) break;
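await step.sleep('wait for event', '5 minutes');
}

import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';
import { NonRetryableError } from 'cloudflare:workflows';

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {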
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
await step.do('Step 1', async () => { return 'Step 1 finished!'; });
await step.do('Step 2', {
retries: { limit: 2, delay: '1 second', backoff: 'linear', },
}, async () => {
console.log('Trying Step 2');
throw new NonRetryableError('Step 2 failed. Non-retryable');
});
}
}

An RPC stub was not disposed properly. You must call dispose() on all stubs in order to let the other side know that you are no longer using them. You cannot rely on the garbage collector for this because it may take arbitrarily long before actually collecting unreachable objects. As a shortcut, calling dispose() on the result of an RPC call disposes all stubs within it.

export class WORKFLOW_WITH_BATCHED_EVENTS extends WorkflowEntrypoint<Env, BatchPayload> {
async run(event: WorkflowEvent<BatchPayload>, step: WorkflowStep) {
// Can access bindings on `this.env`
// Can access params on `event.payload`
const ctx = this.ctx;
const events = event.payload.data;
// THIS IS THE MAIN STEP, WHICH CONTAINS THE ACTUAL SUB-STEPS, WHAT I MENTIONED BEFORE.
const process_events = await step.do("process_events",
async () => {
/// ITERATE OVER THE EVENTS ONE-BY-ONE
for(const key in events){
const e = events[key];
//SUBSTEPS - Process as usual.
const fbcapi = await step.do(`your_substep_name`,
async () => { ... }
)
//.... the rest of your wf logic
}
//...
})
}
}

interface CycleParams<
I extends Rpc.Serializable<I> = Rpc.Serializable<any>,
O extends Rpc.Serializable<O> = Rpc.Serializable<any>,
> {
name: string;
do: {
name: string;
callback: (data: I) => Promise<O>;
};
sleep?: {
name: string;
duration: WorkflowSleepDuration;
};
}
class Cycle<T> extends WorkflowEntrypoint<Env, CycleParams[]> {
async run(event: WorkflowEvent<CycleParams[]>, step: WorkflowStep) {
const { instanceId, payload, timestamp } = event;
const data: any[] = [null];
for (const [index, cycleStep] of payload.entries()) {
if (cycleStep.sleep)
await step.sleep(cycleStep.sleep.name, cycleStep.sleep.duration);
data[index + 1] = await step.do(cycleStep.do.name, () => {
if (data.length > index + 1) return data[index + 1];
return cycleStep.do.callback(data[index]);
});
}
return data[data.length - 1] as T;
}
}
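
If it helps to reason about the chaining in `Cycle.run` outside the Workflows runtime, here is a rough sketch of the same feed-forward loop against a fake step object. `FakeStep`, `ChainLink`, `makeFakeStep` and `runChain` are all hypothetical names made up for this example. The fake does no persistence, retries or real sleeping, so it only illustrates how each step's result becomes the next step's input.

```typescript
// Hypothetical stand-in for WorkflowStep; NOT the Cloudflare runtime.
interface FakeStep {
  do<T>(name: string, callback: () => Promise<T>): Promise<T>;
  sleep(name: string, duration: string | number): Promise<void>;
}

// One link of the chain: an optional sleep followed by a callback.
interface ChainLink {
  name: string;
  callback: (input: unknown) => Promise<unknown>;
  sleep?: { name: string; duration: string | number };
}

// Builds a fake step that records what ran; sleeps resolve immediately.
function makeFakeStep(log: string[]): FakeStep {
  return {
    async do<T>(name: string, callback: () => Promise<T>): Promise<T> {
      log.push(`do:${name}`);
      return callback();
    },
    async sleep(name: string, _duration: string | number): Promise<void> {
      log.push(`sleep:${name}`);
    },
  };
}

// Feed each step's result into the next one, like the loop in Cycle.run.
// The first callback receives null, matching the `[null]` seed above.
async function runChain(step: FakeStep, links: ChainLink[]): Promise<unknown> {
  let previous: unknown = null;
  for (const link of links) {
    if (link.sleep) await step.sleep(link.sleep.name, link.sleep.duration);
    const input = previous;
    previous = await step.do(link.name, () => link.callback(input));
  }
  return previous;
}
```

One caveat with the original design: the callbacks here live in ordinary memory, whereas `Cycle` receives them through `event.payload`, so whether functions survive that trip is something to check separately.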