C#•4w ago
Rory&

Optimising read-and-process loop

21 Replies
Rory&OP•4w ago
was going to test, but it seems wildly unstable...
Rory&OP•4w ago
thanks :)
canton7•4w ago
And yeah, filesystem caches will play havoc with your testing if you're not careful
Rory&OP•4w ago
await foreach (var res in GetSerializedUnoptimisedResponses()) {
if (res.resp is null) continue;
}
Result: 2m17s
List<string> serialisedKeys = new(4000000);
await foreach (var res in GetSerializedUnoptimisedResponses()) {
if (res.resp is null) continue;
serialisedKeys.Add(res.key);
if (serialisedKeys.Count % 1000 == 0) _ = Console.Out.WriteAsync($"{serialisedKeys.Count}\r");
}

var chunkSize = serialisedKeys.Count / Environment.ProcessorCount;
var chunks = serialisedKeys.Chunk(chunkSize+1).Select(x => (First: x.First(), Length: x.Length)).ToList();
Console.WriteLine($"Got {chunks.Count} chunks:");
foreach (var chunk in chunks) {
Console.WriteLine($"Chunk {chunk.First} with length {chunk.Length}");
}
Result: 1m24s
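Aside: the `Chunk(chunkSize + 1)` above is effectively a ceiling division, so the key list never splits into more than `Environment.ProcessorCount` chunks. A standalone sketch with a synthetic key list and a hard-coded processor count (neither is from the thread):

```csharp
using System;
using System.Linq;

// Synthetic stand-ins for the real key list and Environment.ProcessorCount.
var keys = Enumerable.Range(0, 4_000_001).Select(i => $"key{i}").ToList();
var processorCount = 16;

// Floor division alone would leave a 17th leftover chunk; chunking by
// (chunkSize + 1) guarantees at most processorCount chunks.
var chunkSize = keys.Count / processorCount;     // 250_000
var chunks = keys.Chunk(chunkSize + 1).ToList(); // 16 chunks of <= 250_001 keys
Console.WriteLine($"{chunks.Count} chunks, last has {chunks[^1].Length} keys");
```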
canton7•4w ago
Now run the first again 😛
Rory&OP•4w ago
i wonder if i can somehow make the directory a tmpfs to bypass caching?
sure, 1m27
canton7•4w ago
Yeah there we go
Rory&OP•4w ago
i probably should be testing without filesystem cache though, as that's the most likely case :p
canton7•4w ago
I'm not sure how you can get rid of it tbh -- it tends to be an OS thing
Rory&OP•4w ago
echo 3 > /proc/sys/vm/drop_caches is what im going to try
oh yeah, its hella slow now
not going to do any intermediate flushes, just once before starting my test
2m58 on the second one though
i highly doubt some parallelisation would hurt, with multiqueue access etc
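That cache-drop step can also be scripted from the benchmark itself. A sketch, assuming Linux and root; `DropFsCaches` is a hypothetical helper, not part of the code in this thread:

```csharp
using System.Diagnostics;
using System.IO;

// Drop the Linux page cache between benchmark runs (Linux-only, needs root).
static void DropFsCaches()
{
    // Flush dirty pages first; drop_caches only evicts clean pages.
    Process.Start("sync").WaitForExit();
    // "3" = free page cache + dentries + inodes.
    File.WriteAllText("/proc/sys/vm/drop_caches", "3");
}
```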
canton7•4w ago
Doing parallel disk access tends to hurt
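One way to soften that is to bound the concurrency rather than let `Parallel.ForEachAsync` default to `Environment.ProcessorCount`. A sketch reusing the names from the `Parallel.ForEachAsync` snippet below (`storageProvider`, `unoptimisedKeys`, `pairs`); the cap of 4 is an arbitrary example:

```csharp
// Cap concurrent reads so the disk isn't servicing ProcessorCount seeks
// at once. storageProvider, unoptimisedKeys and pairs are the names from
// the snippet in this thread, assumed here.
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
await Parallel.ForEachAsync(unoptimisedKeys, options, async (key, _) => {
    var data = await storageProvider.LoadObjectAsync<SyncResponse>(key, SyncResponseSerializerContext.Default.SyncResponse);
    if (data is null) return;
    pairs.TryAdd(key, data.NextBatch);
});
```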
Rory&OP•4w ago
private async Task<List<string>> GetSerializedUnoptimisedKeysParallel(string start = "init") {
ConcurrentDictionary<string, string> pairs = [];
var unoptimisedKeys = (await storageProvider.GetAllKeysAsync()).Where(static x => !x.StartsWith("old/")).ToFrozenSet();
await Parallel.ForEachAsync(unoptimisedKeys, async (key, _) => {
var data = await storageProvider.LoadObjectAsync<SyncResponse>(key, SyncResponseSerializerContext.Default.SyncResponse);
if (data is null) return;
pairs.TryAdd(key, data.NextBatch);
});

var serializedKeys = new List<string>();
var currentKey = start;
while (pairs.TryGetValue(currentKey, out var nextKey)) {
serializedKeys.Add(currentKey);
currentKey = nextKey;
}

return serializedKeys;
}
just wrote this up real quick, so guess we'll find out
isnt that what multiqueueing is for?
canton7•4w ago
"multiqueueing"?
Rory&OP•4w ago
https://docs.kernel.org/block/blk-mq.html
huh... didnt expect to get a... JsonReaderException?
fail: ModerationClient.ViewModels.ClientViewModel[0]
Error running client view model.
System.AggregateException: One or more errors occurred. ('s' is an invalid start of a value. Path: $ | LineNumber: 0 | BytePositionInLine: 0.)
---> System.Text.Json.JsonException: 's' is an invalid start of a value. Path: $ | LineNumber: 0 | BytePositionInLine: 0.
---> System.Text.Json.JsonReaderException: 's' is an invalid start of a value. LineNumber: 0 | BytePositionInLine: 0.
at System.Text.Json.ThrowHelper.ThrowJsonReaderException(Utf8JsonReader& json, ExceptionResource resource, Byte nextByte, ReadOnlySpan`1 bytes)
at System.Text.Json.Utf8JsonReader.ConsumeValue(Byte marker)
at System.Text.Json.Utf8JsonReader.ReadFirstToken(Byte first)
at System.Text.Json.Utf8JsonReader.ReadSingleSegment()
at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, T& value, JsonSerializerOptions options, ReadStack& state)
--- End of inner exception stack trace ---
at System.Text.Json.ThrowHelper.ReThrowWithPath(ReadStack& state, JsonReaderException ex)
at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, T& value, JsonSerializerOptions options, ReadStack& state)
at System.Text.Json.Serialization.Metadata.JsonTypeInfo`1.Deserialize(Stream utf8Json)
at ModerationClient.Services.FileStorageProvider.LoadObjectAsync[T](String key, JsonTypeInfo`1 jsonTypeInfo) in /home/Rory/git/matrix/ModerationClient/ModerationClient/Services/FileStorageProvider.cs:line 64
at LibMatrix.Helpers.SyncStateResolver.<>c__DisplayClass31_0.<<GetSerializedUnoptimisedKeysParallel>b__0>d.MoveNext() in /home/Rory/git/matrix/ModerationClient/LibMatrix/LibMatrix/Helpers/SyncStateResolver.cs:line 85
--- End of stack trace from previous location ---
at System.Threading.Tasks.Parallel.<>c__53`1.<<ForEachAsync>b__53_0>d.MoveNext()
--- End of stack trace from previous location ---
at LibMatrix.Helpers.SyncStateResolver.GetSerializedUnoptimisedKeysParallel(String start) in /home/Rory/git/matrix/ModerationClient/LibMatrix/LibMatrix/Helpers/SyncStateResolver.cs:line 84
at LibMatrix.Helpers.SyncStateResolver.OptimiseStore(Action`2 progressCallback) in /home/Rory/git/matrix/ModerationClient/LibMatrix/LibMatrix/Helpers/SyncStateResolver.cs:line 124
at ModerationClient.ViewModels.ClientViewModel.Run() in /home/Rory/git/matrix/ModerationClient/ModerationClient/ViewModels/ClientViewModel.cs:line 99
--- End of inner exception stack trace ---
oh, i see. it was reading metadata files i was writing into subdirs, because i was getting root level keys wrong
result: 32s
Rory&OP•4w ago
(image attachment)
Rory&OP•4w ago
lets try without FS cache
Rory&OP•4w ago
(image attachment)
Rory&OP•4w ago
1m16s without filesystem cache
private async Task<List<string>> GetSerializedUnoptimisedKeysParallel(string start = "init") {
ConcurrentDictionary<string, string> pairs = [];
var unoptimisedKeys = (await storageProvider.GetAllKeysAsync()).Where(static x => !x.Contains('/')).ToFrozenSet();
await Parallel.ForEachAsync(unoptimisedKeys, async (key, _) => {
try {
var data = await storageProvider.LoadObjectAsync<SyncResponse>(key, SyncResponseSerializerContext.Default.SyncResponse);
if (data is null) return;
pairs.TryAdd(key, data.NextBatch);
}
catch (Exception e) {
Console.WriteLine($"Failed to read {key}: {e.Message}");
throw;
}
});

var serializedKeys = new List<string>();
var currentKey = start;
while (pairs.TryGetValue(currentKey, out var nextKey)) {
serializedKeys.Add(currentKey);
currentKey = nextKey;
}

return serializedKeys;
}

// Usage:
List<string> serialisedKeys = await GetSerializedUnoptimisedKeysParallel();

var chunkSize = serialisedKeys.Count / Environment.ProcessorCount;
var chunks = serialisedKeys.Chunk(chunkSize+1).Select(x => (First: x.First(), Length: x.Length)).ToList();
Console.WriteLine($"Got {chunks.Count} chunks:");
foreach (var chunk in chunks) {
Console.WriteLine($"Chunk {chunk.First} with length {chunk.Length}");
}
so if im seeing it right, that's > 2x as fast
37s without filesystem cache, if i replace the ConcurrentDictionary with a regular Dictionary + lock
so what, 6x faster?
i'd assume it'd start thrashing on HDDs though... but that should be solved by using a decent scheduler at the OS kernel level
hm, with the merge logic, that gives me 12m42s, and that doesnt even do the final merge
implemented the merging recursively, its quite fast now but allocates a bunch of ReadStackFrames
testing without filesystem cache:
- chunked+threading approach: 13m27s
- recursive with async (div2): 2m20s
both depend on GetSerializedUnoptimisedKeysParallel as defined above
honestly im pretty happy with that result
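The "regular Dictionary + lock" variant mentioned above is not shown in the thread; a sketch of what it might look like, with `storageProvider` and `unoptimisedKeys` as in the earlier snippets:

```csharp
// Sketch of the Dictionary + lock variant: a plain Dictionary guarded by
// one lock instead of a ConcurrentDictionary. storageProvider and
// unoptimisedKeys are assumed from the snippets in this thread.
Dictionary<string, string> pairs = new();
var gate = new object();
await Parallel.ForEachAsync(unoptimisedKeys, async (key, _) => {
    var data = await storageProvider.LoadObjectAsync<SyncResponse>(key, SyncResponseSerializerContext.Default.SyncResponse);
    if (data is null) return;
    lock (gate) pairs[key] = data.NextBatch;
});
```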
Rory&OP•4w ago
something seems a bit odd here ngl lol
though i probably should reorder the data to be chronological...
