C
C#3w ago
AllMight

Thread safety when mutating an object

I have two IAsyncEnumerables I want to join into one dictionary of result objects without buffering them into memory - which means build the results objects as they come from the IAsyncEnumerable. Will this be thread safe and work correctly or do I need to ensure better thread safety?
public class Result
{
public string Name { get; set; }
public int Age { get; set; }
}

public record NameResult(string Id, string Name);
public record AgeResult(string Id, int Age);

public class Service
{
public async Task<IDictionary<string, Result>> ProcessAsync(
IAsyncEnumerable<NameResult> namesSource,
IAsyncEnumerable<AgeResult> agesSource)
{
var results = new ConcurrentDictionary<string, Result>();

async Task ConsumeNamesSource()
{
await foreach (NameResult nameResult in namesSource)
{
var result = results.GetOrAdd(nameResult.Id, _ => new Result { Name = nameResult.Name });
result.Name = nameResult.Name;
}
}

async Task ConsumeAgesSource()
{
await foreach (AgeResult ageResult in agesSource)
{
var result = results.GetOrAdd(ageResult.Id, _ => new Result { Age = ageResult.Age });
result.Age = ageResult.Age;
}
}

Task namesConsumer = ConsumeNamesSource();
Task agesConsumer = ConsumeAgesSource();

await namesConsumer;
await agesConsumer;

return results;
}
}
public class Result
{
public string Name { get; set; }
public int Age { get; set; }
}

public record NameResult(string Id, string Name);
public record AgeResult(string Id, int Age);

public class Service
{
public async Task<IDictionary<string, Result>> ProcessAsync(
IAsyncEnumerable<NameResult> namesSource,
IAsyncEnumerable<AgeResult> agesSource)
{
var results = new ConcurrentDictionary<string, Result>();

async Task ConsumeNamesSource()
{
await foreach (NameResult nameResult in namesSource)
{
var result = results.GetOrAdd(nameResult.Id, _ => new Result { Name = nameResult.Name });
result.Name = nameResult.Name;
}
}

async Task ConsumeAgesSource()
{
await foreach (AgeResult ageResult in agesSource)
{
var result = results.GetOrAdd(ageResult.Id, _ => new Result { Age = ageResult.Age });
result.Age = ageResult.Age;
}
}

Task namesConsumer = ConsumeNamesSource();
Task agesConsumer = ConsumeAgesSource();

await namesConsumer;
await agesConsumer;

return results;
}
}
19 Replies
canton7
canton73w ago
That depends entirely on whether the thread which runs ProcessAsync is something like a UI thread, which has a SynchronizationContext installed on it, which posts messages back to a dedicated thread's message queue (also, note, you can do await Task.WhenAll(namesConsumer, agesConsumer) and save yourself an await
AllMight
AllMightOP3w ago
Ah forgot to mention... dotnet 8 we service
canton7
canton73w ago
"we"?
AllMight
AllMightOP3w ago
web* Rest API web server
canton7
canton73w ago
So, you might have two threads calling GetOrAdd at the same time. But I've just noticed it's a ConcurrentDictionary so that should be fine The guarantees made by GetOrAdd ensure that you can't get a situation where two threads call GetOrAdd at the same time with different IDs, and GetOrAdd ends up returning different objects to both I think you'd be better off with AddOrUpdate anyway: that stops you setting Name/Age twice if you insert the object
AllMight
AllMightOP3w ago
So the update fn will mutate the object and return itself? And also is it safe to mutate an object from different threads like what's happening here?
canton7
canton73w ago
Yes and yes. The two threads are mutating different fields, and not reading the other field, so it's OK
AllMight
AllMightOP3w ago
And will be OK no matter the field type inside?
canton7
canton73w ago
If one thread was making decisions based on whether the other thread had written the other field, that would be racy, potentially But it's safe for one thread to be manipulating one field, and another thread to be manipulating another field, as long as they just limit themselves to "their" field
AllMight
AllMightOP3w ago
And just to make sure GetOrAdd is wrong here or just that GetOrUpdate is better?
canton7
canton73w ago
The field type might matter if you have two different threads doing stuff to a single field at the same time. But since each field is only interacted with by a single thread, you can't have two things happening to a single field at the same time GetOrAdd is fine. Just in the case where you insert an object, you set e.g. Name twice, as you do new Result { Name = nameResult.Name } then result.Name = nameResult.Name
AllMight
AllMightOP3w ago
Ah got it makes sense This is better or just a stylistic preference?
canton7
canton73w ago
Slightly better. With two awaits, a thread has to do a bit of work once the first await completes, just to start the second await
AllMight
AllMightOP3w ago
Interesting I was pretty sure when all does await in a loop or similar Good to know @canton7 Anyway thanks a lot!
AllMight
AllMightOP3w ago
@canton7 btw is it a valid approach for the problem or will you do it differently?
canton7
canton73w ago
I've been trying to think of a better way and tbh I'm struggling. I was wondering whether there's something in System.Linq.AsyncEnumerable, but anything I can come up with is probably slower and less obvious than your approach It might be quicker to just:
var namesTask = namesSource.ToListAsync();
var dict = await agesSource.ToDictionaryAsync(x => x.Id, x => new Result { Age = x.Age });
foreach (var nameResult in await namesTask)
{
dict[nameResult.Id].Name = nameResult.Name;
}
var namesTask = namesSource.ToListAsync();
var dict = await agesSource.ToDictionaryAsync(x => x.Id, x => new Result { Age = x.Age });
foreach (var nameResult in await namesTask)
{
dict[nameResult.Id].Name = nameResult.Name;
}
It means that you can't do both names and ages in parallel, and you need to buffer all the names in memory, but you do cut out the overhead of a ConcurrentDictionary, so it might be quicker? You'd need to test it
AllMight
AllMightOP3w ago
Ok got it thanks... I doubt it as the IAsyncEnumerable can return 100K items and have more properties that what's shown here Still worth the benchmark though
canton7
canton73w ago
Fair!

Did you find this page helpful?