-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathProgram.cs
More file actions
101 lines (84 loc) · 3.16 KB
/
Program.cs
File metadata and controls
101 lines (84 loc) · 3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/*
* TPL Dataflow Example for .NET Core
*
* This file demonstrates the implementation of a data processing pipeline using TPL Dataflow.
* The example shows how to process CSV data through multiple stages with different levels of parallelism.
*
* Author: AHT9
* GitHub: https://github.com/Aht9/TPLDataflow
* Email: Amirht97@gmail.com
*
* Key Components:
* - BufferBlock: Acts as a buffer for incoming data
* - TransformBlock: Transforms CSV lines into CustomerRecord objects
* - ActionBlock: Handles the processed records (e.g., saving to database)
*
* Last Updated: September 22, 2025
*/
using System.Threading.Tasks.Dataflow;
namespace TPLDataflow;
class Program
{
static async Task Main(string[] args)
{
string filePath = "bigdata.csv";
// مرحله 1: BufferBlock برای صف دادهها
var bufferBlock = new BufferBlock<string>(
new DataflowBlockOptions { BoundedCapacity = 500 }
);
// مرحله 2: TransformBlock برای تبدیل خط CSV به رکورد معتبر
var transformBlock = new TransformBlock<string, CustomerRecord>(
line =>
{
var columns = line.Split(',');
// اعتبارسنجی داده
if (columns.Length < 3) return null;
return new CustomerRecord
{
Id = int.Parse(columns[0]),
Name = columns[1],
Email = columns[2]
};
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
});
// مرحله 3: ActionBlock برای ذخیره رکورد در دیتابیس
var actionBlock = new ActionBlock<CustomerRecord>(
async record =>
{
if (record == null) return;
await SaveToDatabaseAsync(record);
Console.WriteLine($"Processed: {record.Id}");
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4, // کنترل همزمانی در DB
BoundedCapacity = 100
});
// اتصال بلاکها (Pipeline)
bufferBlock.LinkTo(transformBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
// خواندن خط به خط فایل و ارسال به BufferBlock
foreach (var line in File.ReadLines(filePath).Skip(1)) // Skip header
{
await bufferBlock.SendAsync(line);
}
// پایان پردازش
bufferBlock.Complete();
await actionBlock.Completion;
Console.WriteLine("Processing completed.");
}
static Task SaveToDatabaseAsync(CustomerRecord record)
{
// شبیهسازی ذخیره در DB
return Task.Delay(50);
}
}
public class CustomerRecord
{
public int Id { get; set; }
public string Name { get; set; }
public string Email { get; set; }
}