Parallel list
I'm trying to write to a list in parallel due to vendor SDK performance issues with sequential operations. I have the following code. Is this a proper approach?
c#
private readonly ConcurrentQueue<DRT> _recordQueue = new ConcurrentQueue<DRT>();
private readonly SemaphoreSlim _flushLock = new SemaphoreSlim(1, 1);
public async Task Execute() {
try {
// external/irrelevant to the list question
await PerformWorkerSetup();
IEnumerable<SRT> sourceRecords = ExtractRecords();
//
await TransformRecords(sourceRecords);
await FlushRecords(1);
} catch (Exception ex) {
Log.Error(ex.Message);
}
}
public async Task TransformRecordsParallel(IEnumerable<SRT> sourceRecords) {
IEnumerable<Task> tasks = sourceRecords.Select(async (record) => {
_recordQueue.Enqueue(this.FormatRecord(record));
await FlushRecords(Config.Threshold);
});
await Task.WhenAll(tasks);
}
public async Task FlushRecords(int threshold) {
if (_recordQueue.Count >= threshold) {
await _flushLock.WaitAsync();
try {
IList<DRT> recordList = new List<DRT>();
// Pull records out of the bag for processing.
while (recordList.Count <= threshold && _recordQueue.TryDequeue(out DRT record)) {
recordList.Add(record);
}
// write the recordList to file or SQL
} catch {
throw;
} finally {
_flushLock.Release();
}
}
}c#
private readonly ConcurrentQueue<DRT> _recordQueue = new ConcurrentQueue<DRT>();
private readonly SemaphoreSlim _flushLock = new SemaphoreSlim(1, 1);
public async Task Execute() {
try {
// external/irrelevant to the list question
await PerformWorkerSetup();
IEnumerable<SRT> sourceRecords = ExtractRecords();
//
await TransformRecords(sourceRecords);
await FlushRecords(1);
} catch (Exception ex) {
Log.Error(ex.Message);
}
}
public async Task TransformRecordsParallel(IEnumerable<SRT> sourceRecords) {
IEnumerable<Task> tasks = sourceRecords.Select(async (record) => {
_recordQueue.Enqueue(this.FormatRecord(record));
await FlushRecords(Config.Threshold);
});
await Task.WhenAll(tasks);
}
public async Task FlushRecords(int threshold) {
if (_recordQueue.Count >= threshold) {
await _flushLock.WaitAsync();
try {
IList<DRT> recordList = new List<DRT>();
// Pull records out of the bag for processing.
while (recordList.Count <= threshold && _recordQueue.TryDequeue(out DRT record)) {
recordList.Add(record);
}
// write the recordList to file or SQL
} catch {
throw;
} finally {
_flushLock.Release();
}
}
}