Does anyone else have any ideas about my issue here? I would be so grateful for any other insight into what might be causing this.
3 Replies
Sid, 2mo ago
Hey, sorry I haven’t had time to look at Discord as much lately, but what does your consumer look like? There’s nothing special about 5 messages, and the fact that they’re ending up in your DLQ generally means your consumer is rejecting them. Maybe looking at your consumer will give me some ideas
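For reference, here is a minimal sketch of a consumer that makes each message's outcome explicit, assuming the standard Queues consumer methods `message.ack()` and `message.retry()`. The `handleMessage` function and its `recipient` check are hypothetical stand-ins, not taken from the code in this thread.

```javascript
// Hypothetical processing step; throws to simulate a failure.
async function handleMessage(body) {
  if (!body || !body.recipient) {
    throw new Error('missing recipient');
  }
}

// Sketch of a queue handler. In a Worker this would be the `queue`
// method on the default export; it is a plain function here so it
// can be run on its own.
async function queue(batch) {
  for (const message of batch.messages) {
    try {
      await handleMessage(message.body);
      // Success: the message is removed from the queue.
      message.ack();
    } catch (error) {
      console.error('Processing failed:', error.message);
      // Failure: the message is redelivered, or sent to the dead letter
      // queue (if one is configured) once retries are exhausted.
      message.retry();
    }
  }
}
```

If every path through the handler ends in `ack()`, the consumer itself should not be what sends messages to the DLQ, so that is the first thing worth checking.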
tom.owen (OP), 2mo ago
@Sid this is essentially a slightly simplified version of what I have. For the messages that get picked up, the console.log 'starting to process message' is executed. For the messages that are dropped, that line is not executed
export default {
  async fetch(req, env, ctx) {
    // ... The producer code is here
  },

  async queue(batch, env) {
    // Process each message in the batch
    for (let message of batch.messages) {
      let { communicationEmissionID, fromAddress, recipient, text } = message.body;

      console.log(`Starting to process message`, {
        messageBody: message.body
      });

      try {
        // ... Processing the message
        message.ack();

      } catch (error) {
        const errorMessage = error.message || 'Unknown error';
        console.error(`Error processing message for ${recipient}:`, errorMessage, {
          communicationEmissionID,
          error: error.stack
        });

        // Safely send failure notification
        try {
          // ... Putting it into a different queue to return the error to my application
        } catch (queueError) {
          console.error('Failed to send failure notification to COMMUNICATION_RESULT_QUEUE:', queueError.message);
        }

        message.ack(); // Acknowledge the message even if there's an error because we have already logged it
      }
    }
    await sleep(10000); // Sleep for 10 seconds (10000 ms) for rate limiting
  },
};

// Resolve after the given number of milliseconds
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
This is a much more simplified version of what I have, containing the parts that I think are the most important:
async queue(batch, env) {
  // Process each message in the batch
  for (let message of batch.messages) {
    let { communicationEmissionID, fromAddress, recipient, text } = message.body;

    console.log(`Starting to process message`, {
      messageBody: message.body
    });

  }
  await sleep(10000); // Sleep for 10 seconds (10000 ms) for rate limiting
},
Murmeltier, 2mo ago
The only thing that fixes this is setting max_retries to something higher than 0; otherwise messages get deleted without any trace of a consumer Worker invocation.
