Can you clear all of your cookies? cc @Diogo Ferreira
Can you clear all of your cookies? cc @Diogo Ferreira
internal error before the workflow tries to execute any steps. Any ideas? 

WorkflowEntrypoint and bindings are accessed via this.env rather than just env^[a-zA-Z0-9_][a-zA-Z0-9-_]*$ (https://developers.cloudflare.com/workflows/reference/limits)5e3dea7b807381cba36b9292bde14920
1ac93aea0842f2d3ef10111b5bf7bb11
npx wrangler types supposed to work for workflows?services = [{ binding = "WORKFLOW_PROCESS_TRANSACTIONS", service = "money" }]wrangler 3.107.3
wrangler workflows instances list <YOUR_WORKFLOW_NAME> | grep "▶ Running" -B1 | grep -Eo '([a-f0-9\-]{36})' | xargs -I {} wrangler workflows instances terminate <YOUR_WORKFLOW_NAME> {} definitly helped me. You can also do xargs -I {} -P 10 sh -c to parallelize
9 running in the screenshot above, no workflows had run in since Friday. Workflow metrics show 0 instances and 0 steps. Please, help!producer worker where you send the events, and it then places them into the queue.main step, which contains all the rest of the steps as substep. That is - it iterates over the batch of received events one by one and all the rest is the same as before. 
9 processing.
await before step.dointernal errorWorkflowEntrypointthis.envenv^[a-zA-Z0-9_][a-zA-Z0-9-_]*$5e3dea7b807381cba36b9292bde149201ac93aea0842f2d3ef10111b5bf7bb11npx wrangler typesservices = [{ binding = "WORKFLOW_PROCESS_TRANSACTIONS", service = "money" }]wrangler 3.107.3interface Env {
SESSION_SECRET: string;
DB: D1Database;
WORKFLOW_PROCESS_TRANSACTIONS: Fetcher;
}wrangler workflows instances list <YOUR_WORKFLOW_NAME> | grep "▶ Running" -B1 | grep -Eo '([a-f0-9\-]{36})' | xargs -I {} wrangler workflows instances terminate <YOUR_WORKFLOW_NAME> {}xargs -I {} -P 10 sh -c9 runningproducerexport class WORKFLOW_WITH_BATCHED_EVENTS extends WorkflowEntrypoint<Env, BatchPayload> {
async run(event: WorkflowEvent<BatchPayload>, step: WorkflowStep) {
// Can access bindings on `this.env`
// Can access params on `event.payload`
const ctx = this.ctx;
const events = event.payload.data;
// THIS IS THE MAIN STEP, WHICH CONTAINS THE ACTUAL SUB-STEPS, WHAT I MENTIONED BEFORE.
const process_events = await step.do("process_events",
async () => {
/// ITERATE OVER THE EVENTS ONE-BY-ONE
for(const key in events){
const e = events[key];
//SUBSTEPS - Process as usual.
const fbcapi = await step.do(`your_substep_name`,
async () => { ... }
)
//.... the rest of your wf logic
}
//...
})
}9export class RAGWorkflow {
async run(event, step) {
const { text } = event.params;
const record = await step.do('create database record', async () => {
const query = 'INSERT INTO notes (text) VALUES (?) RETURNING *';
const { results } = await env.DB.prepare(query).bind(text).run();
const record = results[0];
if (!record) throw new Error('Failed to create note');
return record;
});
const embedding = await step.do('generate embedding', async () => {
const embeddings = await env.AI.run('@cf/baai/bge-base-en-v1.5', { text: text });
const values = embeddings.data[0];
if (!values) throw new Error('Failed to generate vector embedding');
return values;
});
await step.do('insert vector', async () => {
return env.VECTOR_INDEX.upsert([
{
id: record.id.toString(),
values: embedding,
},
]);
});
}
}interface CycleParams<
I extends Rpc.Serializable<I> = Rpc.Serializable<any>,
O extends Rpc.Serializable<O> = Rpc.Serializable<any>,
> {
name: string;
do: {
name: string;
callback: (data: I) => Promise<O>;
};
sleep?: {
name: string;
duration: WorkflowSleepDuration;
};
}
class Cycle<T> extends WorkflowEntrypoint<Env, CycleParams[]> {
async run(event: WorkflowEvent<CycleParams[]>, step: WorkflowStep) {
const { instanceId, payload, timestamp } = event;
const data: any[] = [null];
for (const [index, cycleStep] of payload.entries()) {
if (cycleStep.sleep)
await step.sleep(cycleStep.sleep.name, cycleStep.sleep.duration);
data[index + 1] = await step.do(cycleStep.do.name, () => {
if (data.length > index + 1) return data[index + 1];
return cycleStep.do.callback(data[index]);
});
}
return data[data.length - 1] as T;
}
}step.do('Validate params', async () => {
if (!event.payload.email) {
throw new NonRetryableError('No email provided');
}
if (!event.payload.url) {
throw new NonRetryableError('No url provided');
}
if (!isValidUrl(event.payload.url)) {
throw new NonRetryableError('Invalid URL');
}
if (!isValidEmail(event.payload.email)) {
throw new NonRetryableError('Invalid email provided');
}
});#!/bin/bash
# Exit immediately if a command exits with a non-zero status
set -e
# Check if workflow name and status are provided
if [ -z "$1" ] || [ -z "$2" ]; then
echo "Usage: $0 <workflow-name> <status>"
echo "Status can be: queued, running, succeeded, failed"
exit 1
fi
WORKFLOW_NAME="$1"
STATUS="$2"
while true; do
# Get workflow instances with specified status
output=$(npx wrangler workflows instances list "$WORKFLOW_NAME" --status="$STATUS" 2>/dev/null)
ids=$(echo "$output" | awk -F '│' '{gsub(/ /, "", $2); if ($2 ~ /^[a-f0-9-]+$/) print $2}')
instance_count=$(echo "$ids" | wc -w | tr -d ' ')
if [ "$instance_count" -eq 0 ]; then
echo "No more instances found with status '$STATUS'"
break
fi
echo "Found $instance_count instances with status '$STATUS'"
# Process in batches
BATCH_SIZE=5
echo "$ids" | xargs -n$BATCH_SIZE -P$BATCH_SIZE -I{} sh -c '
id="$1"
workflow="$2"
if ! npx wrangler workflows instances terminate "$workflow" "$id" >/dev/null 2>&1; then
echo "Failed to terminate instance $id"
exit 1
fi
echo "Terminated $id"
' -- {} "$WORKFLOW_NAME"
echo "Batch complete, checking for remaining instances..."
done
echo "Done terminating all instances"