C# • 8mo ago
engineertdog

BulkInsert to SQL, parallel processing

I'm trying to improve the processing speed of a 3rd party integration (via SDK), where the processed data is written to SQL. I was able to reduce the total processing time of the 3rd party system from 40 hours to 3 by improving the methods used to process the data and by turning the sequential processing into parallel processing. The challenge I face now is writing the data to SQL. What I currently have is the following:
```cs
private ConcurrentBag<DataRow> _dataTableRows = new ConcurrentBag<DataRow>();
private DataTable _dataTable;

IList<PIPoint> points = PIPoint.FindPIPoints(_piServer, pointQuery).ToList();
PIPointList pointValues = new PIPointList(points);
AFListResults<PIPoint, AFValue> values = pointValues.CurrentValue();

Parallel.ForEach(values, value => {
    IDictionary<string, object> pointAttributes = value.PIPoint.GetAttributes();
    DataRow row = ConfigureRow(pointAttributes, value); // this simply converts the 55 attributes & value (with timestamp) into the string columns for the data row

    _dataTableRows.Add(row);
    this.AddDataTableRecords(100000);
});

public void AddDataTableRecords(int threshold) {
    if (_dataTableRows.Count >= threshold) {
        lock (_dataTable.Rows.SyncRoot) {
            foreach (DataRow row in _dataTableRows.Take(_dataTableRows.Count)) {
                _dataTable.Rows.Add(row);
            }

            bool success = _dataController.UpsertDataTableRecords(_dataTable);

            if (success) {
                _dataTable.Rows.Clear();
            }
        }
    }
}
```
With this method, I'll use a temp table with a merge / insert strategy to add or update the records. The problem is that the original Parallel.ForEach sometimes runs twice for the same record, even when there is truly only one record available. There are 8 million records I need to process in this manner, and without using the Parallel method on the input records it takes far too long to process all the data. Any thoughts / help?
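For context, a minimal sketch of what that temp-table + MERGE upsert could look like on the SQL side. This is not the actual UpsertDataTableRecords; the _connectionString field, dbo.PointValues, #PointValuesStaging, and the TagName/Value/Timestamp columns are all placeholders.

```cs
using System.Data;
using System.Data.SqlClient;

public bool UpsertDataTableRecords(DataTable table) {
    using (var connection = new SqlConnection(_connectionString)) {
        connection.Open();

        // Clone the target's schema into a session-scoped temp table.
        using (var create = new SqlCommand(
            "SELECT TOP 0 * INTO #PointValuesStaging FROM dbo.PointValues;", connection)) {
            create.ExecuteNonQuery();
        }

        // Bulk load the batch into the staging table on the same connection,
        // so the temp table stays visible to the MERGE that follows.
        using (var bulk = new SqlBulkCopy(connection)) {
            bulk.DestinationTableName = "#PointValuesStaging";
            bulk.BatchSize = 10000;
            bulk.WriteToServer(table);
        }

        // Merge staged rows into the target: update matches, insert the rest.
        const string mergeSql = @"
            MERGE dbo.PointValues AS target
            USING #PointValuesStaging AS source
                ON target.TagName = source.TagName
            WHEN MATCHED THEN
                UPDATE SET target.Value = source.Value, target.Timestamp = source.Timestamp
            WHEN NOT MATCHED THEN
                INSERT (TagName, Value, Timestamp)
                VALUES (source.TagName, source.Value, source.Timestamp);";

        using (var merge = new SqlCommand(mergeSql, connection)) {
            merge.CommandTimeout = 300;
            merge.ExecuteNonQuery();
        }
    }

    return true;
}
```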
2 Replies
viceroypenguin • 8mo ago
Couple things:
* Stop using ConcurrentBag. The concurrency mechanisms it uses are not as efficient as other ConcurrentXxx data structures. Look at using Channels instead.
* Stop using DataTable. It's an old data format that is no longer useful in the modern .NET development system. Use a proper ORM, such as EFC, Linq2DB, or even Dapper.
* For loading massive amounts of data into a SQL server, you want to use SqlBulkCopy. Here's a starting point for some research on it: https://learn.microsoft.com/en-us/dotnet/api/microsoft.data.sqlclient.sqlbulkcopy?view=sqlclient-dotnet-standard-5.1
* Alternatively, you can use the .BulkCopy() method in linq2db, which will be simpler.
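A rough sketch of how the Channels suggestion could replace the ConcurrentBag batching, assuming a hypothetical PointRecord type and a writeBatch callback that does the actual SqlBulkCopy / upsert work (neither is from the thread):

```cs
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public class PointRecord { /* 55 attributes + value + timestamp */ }

public static class BulkPipeline {
    public static async Task RunAsync(
        IEnumerable<PointRecord> source,
        Action<List<PointRecord>> writeBatch,
        int batchSize = 10000) {

        // Unbounded channel: producers never block, a single reader drains it.
        var channel = Channel.CreateUnbounded<PointRecord>(
            new UnboundedChannelOptions { SingleReader = true });

        // Consumer: accumulate records into batches and hand each batch to the
        // SQL writer; only this task ever touches the batch list, so no locking.
        var consumer = Task.Run(async () => {
            var batch = new List<PointRecord>(batchSize);
            while (await channel.Reader.WaitToReadAsync()) {
                while (channel.Reader.TryRead(out var record)) {
                    batch.Add(record);
                    if (batch.Count >= batchSize) {
                        writeBatch(batch);
                        batch.Clear();
                    }
                }
            }
            if (batch.Count > 0) writeBatch(batch); // flush the final partial batch
        });

        // Producers: the parallel processing loop just writes into the channel.
        Parallel.ForEach(source, record => channel.Writer.TryWrite(record));

        channel.Writer.Complete(); // no more records coming
        await consumer;            // wait for the last batch to be written
    }
}
```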
engineertdog • 7mo ago
Yeah, I was already using SqlBulkCopy. I've transitioned away from DataTable to a native C# class along with FastMember (to generate the reader I pass to bulk.WriteToServer(reader)). Overall, it's working fairly well.
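A minimal sketch of that FastMember + SqlBulkCopy path, assuming a hypothetical _connectionString field, a dbo.PointValues destination table, and TagName/Value/Timestamp members on MyClass (all placeholders for the poster's actual schema):

```cs
using System.Collections.Generic;
using System.Data.SqlClient;
using FastMember;

public bool UpsertDataTableRecords(IList<MyClass> records) {
    using (var connection = new SqlConnection(_connectionString)) {
        connection.Open();

        // ObjectReader exposes the POCO list as an IDataReader that
        // SqlBulkCopy can stream to the server without a DataTable.
        using (var bulk = new SqlBulkCopy(connection))
        using (var reader = ObjectReader.Create(records, "TagName", "Value", "Timestamp")) {
            bulk.DestinationTableName = "dbo.PointValues";
            bulk.BatchSize = 10000;

            // Map each listed member to an identically named destination column.
            foreach (var member in new[] { "TagName", "Value", "Timestamp" }) {
                bulk.ColumnMappings.Add(member, member);
            }

            bulk.WriteToServer(reader);
        }
    }

    return true;
}
```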
```cs
private ConcurrentBag<MyClass> _recordBag = new ConcurrentBag<MyClass>();
private IList<MyClass> _recordList = new List<MyClass>();
private MyClass _row;

public void AddDataTableRecords(int threshold) {
    // multi threaded here
    if (_recordBag.Count >= threshold) {
        lock (_recordList) {
            while (_recordBag.TryTake(out _row)) {
                _recordList.Add(_row);
            }

            bool success = _dataController.UpsertDataTableRecords(_recordList);

            if (success) {
                _recordList.Clear();
            }
        }
    }
}
```
I'll have to look at the other Concurrent data structures, and look at Channels.

Aside from the 3rd party's IEnumerable being slow as crap, this has worked fairly well (about 4 hours to process everything, compared to my original 40 hours with single-threaded processing for everything). The only issue is that the line _recordBag.Count >= threshold somehow reports true when the actual values are, say, 532 >= 50000.

Also, I should have mentioned: I'm restricted to .NET Framework due to the 3rd party SDK.
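One plausible explanation, offered as a hedged guess rather than something confirmed in the thread: the Count >= threshold test is a check-then-act race, so several threads can pass it together before any of them drains the bag, and the later ones then observe a nearly empty bag. A sketch of the same method with the count re-checked inside the lock:

```cs
public void AddDataTableRecords(int threshold) {
    if (_recordBag.Count < threshold) return; // cheap pre-check, may be stale

    lock (_recordList) {
        // Re-check under the lock: another thread may have already drained the bag.
        if (_recordBag.Count < threshold) return;

        while (_recordBag.TryTake(out MyClass record)) {
            _recordList.Add(record);
        }

        if (_dataController.UpsertDataTableRecords(_recordList)) {
            _recordList.Clear();
        }
    }
}
```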