Hey team ! I'm trying to edit the concurrency on some of my queues, but the option doesn't exist anymore:

Moreover, the queue concurrency was at 250, but now it maxes out at 20

You set concurrency when adding a Consumer to the Worker

The actual concurrency value goes up over time based on how many unacknowledged messages you have and on whether your queue() handler is throwing errors. Maybe after 20 you just ran out of messages?
@Sid , unlikely since we had 100k messages in the queue and it was stalling. Sorry I forgot to add this detail in the thread
I'm currently testing several configurations, but it seems like from time to time the queue randomly hangs and will not try to add more consumers. The error rate is the same, so I don't think it comes from that?
The new queue I'm using has the following configuration:
The message retention is the default of 345600 seconds. I'll update the results in the thread.
With this setup the queue doesn't scale that much?
The behaviour we had before the problem was that the queue would scale to 100 concurrency and, with the same amount of messages, clear it without any issues. I'm doing all my tests with the exact same messages and the same Workers behind it. There is no notable difference




@1984 Ford Laser thanks btw for the feedback! I thought it wasn't in the UI anymore, but indeed you can change it by editing the attached consumer! Nice catch :cooldoge:
Can you give me a queue ID?
f024d80269db4736a73e60db222dec7d
I changed the batching to 1 instead of 50 and the concurrency went up to 130
Thanks for looking into this @Sid
Hey sorry for the delay. I looked into this and things do seem to be working as expected.
I will say though that there are a bunch of improvements we can make here. I’ve been doing some of that on the producer side recently, and I can probably get some quick wins on the consumer side as well.
I think the problem was the batching @Sid. When going from 50 down to just 1, the queue managed to empty normally. This is strange behaviour, because for a year we had a batch size of 100 with 20 concurrency and it worked like a charm. Maybe something changed on that side?
Hm, we haven’t changed anything on that front yet.
If you’re saying that lowering your batch size increased your concurrency, then I would expect that. Consumer concurrency calculation isn’t straightforward right now so it’s hard to explain here, but if you really do value concurrency that much then lowering the batch size is the right thing to do.
@Sid | Queues, R2 it's pretty random on our side still. The batch size of 1 fixed it for a few hours, but when trying it again today it takes 2 hours to do 70k messages, where we used to do 100k in 30 minutes (we didn't change the code or the infra). I've now reverted to 100-message batches to see if it changes anything, but it seems like the queues are not scaling like before
Hey @Sid | Queues, R2, I'm running a lot of different tests, but it seems like the queues are hanging every time. The consumer has minimal code where it just does 3 HTTP requests that shouldn't take too much time. Also, the queue was doing 250 concurrency on a batch of 100. Now the concurrency is dropping randomly, the backlog is not emptying as fast as before, and changing the parameters or spawning a new queue is not helping at all. Here are some screenshots:



Queue Id:
fa75809b889646e1bb7e87bd33fe735d
(we created a new one in case it solved something haha)
Can I get back to you on Monday? I'll check how long your queue() handler is actually taking
3 HTTP requests seem fine. Our consumer concurrency logic (which can be improved for sure), currently looks at the rate of success of your handlers, and bumps up concurrency every 5 seconds or so
I will say though, we should look at the lag time instead of focusing on concurrency in general. Are you expecting the 5K seconds we predict to go much lower?
The number of messages (140k) also does not seem like a big deal to me. I’m unfortunately away from my laptop right now, but I’ll get back to you.
Aside: on our end we should probably look at the backlog size as well when scaling consumers up; this is something I want to do when I get some time. That should also help you predict concurrency a lot better (plus it'll probably match your expectations better)
Yes of course @Sid ! Thanks for looking into it :D
"Are you expecting the 5K seconds we predict to go much lower?"
Yes, definitely. It was around 700 seconds before. I don't know what happened with the queues recently ^^' I just always assumed that the concurrency was based on the backlog + average lag time
For additional information, here is the state of the same queue since the previous screenshot

It's not the first time we've noticed that kind of weird curve (we've opened tickets in the past), but it seems like the problem is still not resolved ^^'
The stats for the last 12 hours look like this. Before we were doing 100k messages in 30 minutes, now it's 200k messages in 6 hours

@Sid I can confirm that unfortunately we are still having the same lag in all of our queues. Should I open a formal ticket, since it's impacting our PRD and STG at this point?

Yeah let’s open a ticket. It’s going to come back to me, but I can look into this. Sorry it’s been a really busy few days
Hi @Sid ! I took some time, but I finally opened the case since the problem is coming back again:
CaseId: 01630994
Thank you for your time and I'm looking forward to sort this out ! :D
Hi @Sid! I haven't heard back on the ticket and it seems that performance is still unstable:

The queue randomly dropped to 1 concurrency even though there were still messages in the queue
Is this still your Queue ID?
fa75809b889646e1bb7e87bd33fe735d
? I don't see any traffic on this one for the last 7 days
This problem is happening on all of our queues to be fair @Sid. I need to find the queue ID for this specific one
Can you use the /link command to link your account here? Then I can look at what queues you have
Generally I’m looking for one with some traffic / backlog
Consumer concurrency going down might be a result of what happens in your queue() handler. That’s what I wanted to check
Thanks @Sid for pointing that out ! I wasn't aware that we could link our Cloudflare accounts. Just did it :)
Hi @Sid! We are still experiencing the problem and I would say it's even worse than before
f024d80269db4736a73e60db222dec7d
Over the span of 12 hours the queue didn't move ^^'

Oh man I’m so sorry I’ve been absolutely swamped, let me catch back up on your case
No worries at all I totally get that. It's just that it's production breaking at this point and we are really out of ideas ^^'
Your issue was that you’re not seeing as much concurrency as you’d want, right?
As you can see, the lag time is growing, the queue is not shrinking, and the concurrency is around 10/15 instead of the 250 that it should scale to
And no matter how many messages are in the queue (20k or 2M), it stays backed up
So the first thing I see is that your queue consumer takes a while, a few minutes
IIRC you’ve already tried reducing your batch size, right?
Yes we've experimented with 1, 5, 50, 100
The consumer takes time because of the underlying HTTP requests it makes, but it uses barely any CPU time
I'm wondering if a rule changed on the queue concurrency calculation regarding this, because as outlined in the thread and the 2 tickets we created, we didn't have this problem before and it would scale to 250 in a few minutes
So definitely nothing has changed in that part of our code, but the consumer concurrency not scaling up is definitely the issue. If you say it used to go up to 250 before then I’m confused why it isn’t anymore. Looking
It’s probably because you stopped writing to the queue actually, now that I think about it
Mm maybe not
Yeah it worked fine before so I'm puzzled since we didn't change much on our side
The consumer doesn't need to see the response of the HTTP request, so we could drop the connection as long as the worker it pings can actually continue the process even if the client dropped. We tried to experiment with that but we didn't manage to make it work
I'm going afk for a few hours but I'll be on my phone to answer some questions. Feel free to DM me if you require information that can't be shared publicly :)
And thanks for your responsiveness again, I know it's a tricky case and I'm really glad you're looking into it
So I'm looking at our code and your Queue, and I think one issue that's preventing consumer concurrency from going up consistently is that your Worker regularly crashes (either because of an uncaught exception or memory issues). When you get back, can you confirm if that's what you see on your end as well?
Hey @Sid ! Thanks for your time. So I'm back and I'm looking into this. The consumer had 2k errors in the last 12 hours. And around 400 in the last 3 hours. I don't know how it is calculated on your side, but it seems like a low error count for this type of impact ?

The error that it logged is mostly a parsing error when the underlying worker has a 500 error, and that's it
I don't see a memory issue here, but indeed those are all uncaught exceptions
So there is 1 error per 10 executions if my math is right? Would that make the queue hang that much?
I'm deploying a new version attempting to address this by reworking the error handling and removing the Baselime telemetry, since it has been discontinued
I've deployed it at 16:32 UTC just to monitor the logs
👀

Seems like this was the error \o/
I'm wondering if I can further optimise by taking the 100 messages and sending 10 requests at a time asynchronously, waiting for those, and then continuing the loop, instead of doing them one by one
IIRC there's a limit of 15 subrequests at once per invocation, right?
I'm currently monitoring the consumer and the queue, but it seems that when reaching 250 it just stopped acking the messages :/

These are the logs from the consumer. It seems it stopped acknowledging at the same moment it stopped receiving messages. And that's when it reached 250 concurrency :NotLikeThis:

So this is the current state of the queue


It seems that the queue randomly exceeds CPU once in a while and then works correctly. I'm wondering if this has to do with the concurrency rising: it somehow creates a CPU exceeded error, so the concurrency goes down, and then it's just looping?

I've tried changing the batch time, hard-coding the ack in the consumer, catching all errors, and optimising the consumer, but I don't know what to do anymore. I'm really having a hard time understanding if it comes from the engineering or the infrastructure @Sid
So to be clear, when we see errors from your consumer, we halve your concurrency, so a regular rate of errors would prevent you from going super high. The reason this behavior exists is so that we don't chew through your backlog if you have a faulty release, but I'm beginning to question this decision since it makes it hard for people to predict concurrency
This is unexpected. You’re supposed to ACK messages BTW, by calling the .ack() method on your message / batch in the queue() handler
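Roughly like this, just a sketch (the URL, payload shape, and error handling here are placeholders, not your actual code):
```ts
// Minimal sketch: ack/retry each message explicitly instead of relying on the
// implicit ack that happens when queue() returns without throwing.
// Types (MessageBatch) come from @cloudflare/workers-types.
// UPSTREAM_URL is a placeholder, not the real endpoint.
const UPSTREAM_URL = "https://upstream.example.com/process";

export default {
  async queue(batch: MessageBatch<unknown>, env: unknown): Promise<void> {
    for (const message of batch.messages) {
      try {
        const res = await fetch(UPSTREAM_URL, {
          method: "POST",
          headers: { "content-type": "application/json" },
          body: JSON.stringify(message.body),
        });
        if (res.ok) {
          message.ack(); // acknowledge just this message
        } else {
          message.retry(); // redeliver this message per the retry settings
        }
      } catch {
        message.retry(); // network failure: retry only this message
      }
    }
  },
};
```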
Alright so I can confirm that there were periods of time when your consumer stopped crashing and as a result your concurrency shot up
From my understanding of the documentation, I thought we could just return from the queue() handler without acknowledging:
"These APIs allow a consumer Worker to consume messages from a Queue. To define a consumer Worker, add a queue() function to the default export of the Worker. This will allow it to receive messages from the Queue. By default, all messages in the batch will be acknowledged as soon as all of the following conditions are met:
1. The queue() function has returned.
2. If the queue() function returned a promise, the promise has resolved.
3. Any promises passed to waitUntil() have resolved.
If the queue() function throws, or the promise returned by it or any of the promises passed to waitUntil() were rejected, then the entire batch will be considered a failure and will be retried according to the consumer's retry settings."
This is true, I didn't realize you were depending on this
So on my end I see your queue handlers still taking a while, are you sure your promises are resolving in time?
Here’s what I’d recommend: if your ultimate goal is max concurrency, then use a really small batch size, and do little work at a time.
Yeah, I've currently set the batch size to 5 and I'm still experiencing lag
You said you’re making a few HTTP requests right? What batch size would let you make 1-2 requests per queue() invocation?
1 message = 1 HTTP request
Right now I’m seeing your handler running for 15 mins, after which we just cancel it
Let’s set your batch size to 1 then, and do your HTTP request without a waitUntil.
Let’s try this out and make sure things work the way you expect, then we can build from there
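Something like this, as a sketch (placeholder URL; the point is one message, one awaited fetch, no waitUntil):
```ts
// Sketch of a batch-size-1 consumer: a single message, a single awaited fetch,
// and no waitUntil. UPSTREAM_URL is a placeholder.
// Types (MessageBatch) come from @cloudflare/workers-types.
const UPSTREAM_URL = "https://upstream.example.com/process";

export default {
  async queue(batch: MessageBatch<unknown>): Promise<void> {
    const message = batch.messages[0]; // max_batch_size = 1
    await fetch(UPSTREAM_URL, {
      method: "POST",
      headers: { "content-type": "application/json" },
      body: JSON.stringify(message.body),
    });
    // Returning without throwing implicitly acks the batch;
    // throwing (or a rejected promise) retries it.
  },
};
```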
Good call. We don't have a waitUntil, just an await on the fetch
Ah okay!
Let’s do 1, and then let me know. If it’s just 1 HTTP request and you’re still seeing your handler run for 15 mins, then we can look into that
The weird thing is that I don't see my handler running for 15 minutes anywhere. With 5 requests the P999 was 37 seconds

In the meantime the handler is currently set to a batch of 1

I’ve got some metrics on my end that track how long your Worker is actually running in our infra. If you’re not doing any waitUntils, then that’d be kinda weird
That's so odd since my logs are not giving me this info ^^'
Here are the logs of the queue and the consumer for the last 15 minutes. No errors, but no read/write either. As if the consumer is not called anymore atm



Give it just a couple of minutes. Your concurrency went down to 1 and one of your handlers is still stuck, so we’ll have to wait 15 mins for it to time out. Should happen any minute now
Makes sense ! Thanks for the deep explanations it really helps me understand how to debug this more
In all honesty you shouldn’t have to understand all this haha, this is exactly why you offload this work to us
You should be starting to see concurrency go up
Yes I see the concurrency go up !
Okay let’s let it top out and then decide if the speed of processing is good enough for you. If it is, we should leave it as-is, otherwise we can start parallelizing your upstream requests by upping your batch size
Anything above 6 is going to cause problems because a Worker can only have 6 concurrent outgoing subrequests at a time, so if you try to make like 10 requests in a Promise.all, 4 of them will get queued up. So really there's no point in going above a batch size of 5 in use-cases like yours (AKA where you're using a queue to defer upstream requests, this is quite common)
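If we do get to parallelizing later, this is roughly what I mean, capped by the batch size (just a sketch; placeholder URL, and per-message ack/retry is only one way to handle the results):
```ts
// Sketch: parallelize the upstream calls across a small batch (max_batch_size <= 5),
// staying under the limit on simultaneous outgoing subrequests.
// Types (MessageBatch) come from @cloudflare/workers-types. UPSTREAM_URL is a placeholder.
const UPSTREAM_URL = "https://upstream.example.com/process";

export default {
  async queue(batch: MessageBatch<unknown>): Promise<void> {
    const results = await Promise.allSettled(
      batch.messages.map((message) =>
        fetch(UPSTREAM_URL, {
          method: "POST",
          headers: { "content-type": "application/json" },
          body: JSON.stringify(message.body),
        })
      )
    );
    // Ack or retry each message based on its own result so one failure
    // doesn't force a retry of the whole batch.
    results.forEach((result, i) => {
      const message = batch.messages[i];
      if (result.status === "fulfilled" && result.value.ok) {
        message.ack();
      } else {
        message.retry();
      }
    });
  },
};
```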
A few weeks ago we had 3000 messages / minute, so I guess that would be top notch
Good to know! So the max batch size would be 5, and we'd then do a Promise.all
And how long do your upstream requests take?
It really depends on the message. Here is the data for the last 15 minutes

P99 is 7 seconds max
I see. So I guess even if all of them take 7 seconds, with a concurrency of 250, you should be able to reach 2K/min at least
Roughly
Setting your batch size to 2 should get you what you want then. I’m trying to prioritize stability here, which is why I’m not asking you to set it to 5 if 2 works
I see your Queue is quite backed up. If you want, we can leave this for a bit and let your handlers run it back down. Then we can look at what your write rate looks like (I think you've probably stopped writing to this queue?)
If a batch size of 1 lets you keep up with writes, then that’s what you want
Ideally that’s what the perfect setup is, your read rate matching your write rate. Trying to over-optimize the read rate is not really worth the effort
I’m seeing your concurrency is still at 250, but your queue() handler is starting to take a while again
For our context, it's expected that the write is faster than the read, since on the write side we are just splitting data and putting it in the queue, and then the queue distributes to the Worker to do complex operations depending on what's in the message. So I don't expect to be able to match that
It's odd since the underlying worker has normal response time, and the consumer is really doing:
1. Getting message
2. Sending message to the worker
3. returning

Okay so I see a few more CPU Exceeded errors, which is going to hurt your concurrency
I don't get how we can have CPU Exceeded errors with a consumer that just waits for a single request 😅
It’s weird because your Worker doesn’t seem to be seeing these CPU errors?
I don't see any errors in the last 15 minutes in the logs section of the consumer
What is your Worker (that you’re calling from your queue() handler) doing?
A lot of different analysis
CPU heavy work?
However I confirm that the subworker has some exceeded limits
scratch that, it doesn't (I was checking the wrong time) ^^'
Not CPU heavy, it analyzes data by doing some HTTP requests too, but it takes different routes of requests depending on the message
Ok so on my side, the consumer and the subworker didn't run into any errors
And as you noticed the concurrency is now at 0
I can't look into this anymore today but I think this is a bug we've been trying to track down for a while. Generally things seem to resolve on their own, but not in your case. Maybe what I can do on Monday is force your consumer to stay at max concurrency.
Thanks for all your time today @Sid
Might have found the root cause. Sometimes the underlying worker is somehow taking 800k ms. I'm investigating
Seems like the Worker is calling an API that takes > 15 minutes to answer a single call. I'm currently adding a timeout of 30 seconds
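Roughly like this, just a sketch (the URL is a placeholder, and I'm assuming AbortSignal.timeout is available in the runtime; otherwise an AbortController plus setTimeout does the same thing):
```ts
// Sketch: cap the upstream call at 30 seconds so a slow API can't pin the
// handler for 15 minutes. UPSTREAM_URL is a placeholder.
const UPSTREAM_URL = "https://upstream.example.com/analyze";

async function callUpstream(payload: unknown): Promise<Response> {
  return fetch(UPSTREAM_URL, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify(payload),
    // Aborts the request (and rejects the promise) after 30 seconds.
    signal: AbortSignal.timeout(30_000),
  });
}
```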
Well that sorted the issue
We are doing between 4000 and 6000 messages a minute currently
:Peepo_TypingHappy:
Oh interesting, that didn’t show up in the Worker’s own metrics?
Nice, good stuff
Not on the consumer level. I had to check the observability and select the query by hand
Ah maybe because of sampling. I guess it was quite rare then
Hey @Sid! Here is some quick feedback a few weeks later: we don't have any problems with the queues anymore and they totally work as expected. We managed to optimise the queues even more and now we are achieving a 500% to 600% improvement in speed depending on the messages. It's insane! :D
Thank you so much for your help, you rock 🤟
Haha nice, glad it’s working the way you expect now!