protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting processing");
while (!stoppingToken.IsCancellationRequested)
{
if (_queueService.IsEmpty)
{
// Wait until some messages to process come through
await Task.Delay(_emptyQueueDelay, stoppingToken);
continue;
}
IEnumerable<StreamMessage<MessageType>> messages = _queueService
.DequeueUpTo(_processingBatchMaxSize);
await ProcessMessages(messages);
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting processing");
while (!stoppingToken.IsCancellationRequested)
{
if (_queueService.IsEmpty)
{
// Wait until some messages to process come through
await Task.Delay(_emptyQueueDelay, stoppingToken);
continue;
}
IEnumerable<StreamMessage<MessageType>> messages = _queueService
.DequeueUpTo(_processingBatchMaxSize);
await ProcessMessages(messages);
}
}