// BatchProcessor.cs
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.Storage.Auth;
using Newtonsoft.Json;
using System.IO;
using System.Linq;
namespace LastFM.ReaderCore
{
/// <summary>
/// Accumulates de-duplicated tracks in memory across one or more batches and,
/// on <see cref="FinalizeAsync"/>, uploads them as a single JSON blob named
/// <c>data/{username}.json</c> in the <c>lastfmdata</c> container.
/// </summary>
public class BatchProcessor : IDisposable
{
    private readonly CloudBlobContainer _container;
    private readonly string _username;
    private readonly IErrorLogger _errorLogger;
    private readonly RetryPolicy _retryPolicy;
    private readonly HashSet<string> _processedTrackIds;
    private readonly List<Track> _allTracks;

    // Guards _processedTrackIds and _allTracks. ProcessBatchAsync mutates them on
    // thread-pool threads, and neither HashSet<T> nor List<T> tolerates concurrent writers.
    private readonly object _gate = new object();

    /// <summary>
    /// Creates a processor bound to the <c>lastfmdata</c> container of the given storage account.
    /// </summary>
    /// <param name="storageAccount">Storage account name; also used to build the blob endpoint URI.</param>
    /// <param name="storageKey">Access key passed to <see cref="StorageCredentials"/>.</param>
    /// <param name="username">User name that becomes part of the uploaded blob name.</param>
    /// <param name="errorLogger">Optional logger; when null, nothing is logged by this class.</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="storageAccount"/> or <paramref name="username"/> is null, empty or whitespace.
    /// </exception>
    public BatchProcessor(string storageAccount, string storageKey, string username, IErrorLogger errorLogger = null)
    {
        // Fail fast: an empty account yields an invalid endpoint URI, and an empty
        // user name would silently target the blob "data/.json".
        if (string.IsNullOrWhiteSpace(storageAccount))
        {
            throw new ArgumentException("Storage account name must be provided.", nameof(storageAccount));
        }
        if (string.IsNullOrWhiteSpace(username))
        {
            throw new ArgumentException("User name must be provided.", nameof(username));
        }

        var blobCreds = new StorageCredentials(storageAccount, storageKey);
        var storageUri = new Uri($"https://{storageAccount}.blob.core.windows.net/");
        var blobClient = new CloudBlobClient(storageUri, blobCreds);
        _container = blobClient.GetContainerReference("lastfmdata");
        _username = username;
        _errorLogger = errorLogger;
        _retryPolicy = new RetryPolicy(errorLogger: errorLogger);
        _allTracks = new List<Track>();
        _processedTrackIds = new HashSet<string>();
    }

    /// <summary>
    /// Ensures the target blob container exists, running the call through the retry policy.
    /// </summary>
    public async Task InitializeAsync()
    {
        await _retryPolicy.ExecuteWithRetryAsync(
            async () =>
            {
                await _container.CreateIfNotExistsAsync();
                return true;
            },
            "Initialize container"
        );
    }

    /// <summary>
    /// Adds the tracks of one batch to the in-memory collection, skipping null entries
    /// and tracks whose artist/name/timestamp key has already been seen.
    /// </summary>
    /// <param name="tracks">Tracks to merge; enumerated once on a thread-pool thread.</param>
    /// <returns>A task that completes when the batch has been merged.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="tracks"/> is null.</exception>
    public Task ProcessBatchAsync(IEnumerable<Track> tracks)
    {
        if (tracks == null)
        {
            throw new ArgumentNullException(nameof(tracks));
        }

        return Task.Run(() =>
        {
            // Serialize merges so overlapping batches cannot corrupt the collections.
            lock (_gate)
            {
                foreach (var track in tracks)
                {
                    // FinalizeAsync already tolerates null tracks; do the same here
                    // instead of failing the whole batch with a NullReferenceException.
                    if (track == null)
                    {
                        continue;
                    }

                    // Create a unique ID for the track using artist, name, and timestamp
                    var trackId = $"{track.artist?.name}_{track.name}_{track.date?.uts}";

                    // HashSet.Add returns false for duplicates, so one lookup is enough.
                    if (_processedTrackIds.Add(trackId))
                    {
                        _allTracks.Add(track);
                    }
                }
            }
        });
    }

    /// <summary>
    /// Sorts the collected tracks by <c>scrobbleTime</c> (descending), serializes them to
    /// indented JSON and uploads the result to <c>data/{username}.json</c> through the retry policy.
    /// Returns without uploading when no tracks were collected.
    /// </summary>
    /// <remarks>
    /// Any exception is logged (when a logger is configured) and rethrown unchanged.
    /// </remarks>
    public async Task FinalizeAsync()
    {
        try
        {
            var blobName = $"data/{_username}.json";
            var blobRef = _container.GetBlockBlobReference(blobName);

            // Take a snapshot under the lock so a batch still being merged cannot
            // modify the list while it is being enumerated.
            List<Track> collected;
            lock (_gate)
            {
                collected = _allTracks
                    .Where(t => t != null) // Filter out any null tracks
                    .ToList();
            }

            if (collected.Count == 0)
            {
                _errorLogger?.LogError(null, "No tracks to upload");
                return;
            }

            // Tracks without a scrobble time sort last (descending order). The fallback is
            // computed once, and the comparison is ordinal so the order does not depend on
            // the current culture.
            // NOTE(review): assumes scrobbleTime is a sortable timestamp string (e.g. ISO 8601) - confirm.
            var missingTimeKey = DateTime.MinValue.ToString("o");
            var sortedTracks = collected
                .OrderByDescending(t => t.scrobbleTime ?? missingTimeKey, StringComparer.Ordinal)
                .ToList();

            // Serialize the data
            var jsonData = JsonConvert.SerializeObject(sortedTracks, new JsonSerializerSettings
            {
                Formatting = Formatting.Indented,
                NullValueHandling = NullValueHandling.Ignore,
                ReferenceLoopHandling = ReferenceLoopHandling.Ignore
            });

            await _retryPolicy.ExecuteWithRetryAsync(
                async () =>
                {
                    await blobRef.UploadTextAsync(jsonData);
                    return true;
                },
                "Upload final tracks file"
            );

            // NOTE(review): this success message goes through LogError because that is the only
            // logger member used in this file - switch to an informational channel if one exists.
            _errorLogger?.LogError(null, $"Successfully uploaded {sortedTracks.Count} tracks to {blobName}");
        }
        catch (Exception ex)
        {
            _errorLogger?.LogError(ex, $"Error uploading final tracks file for user {_username}");
            throw;
        }
    }

    /// <summary>
    /// Releases the accumulated tracks and de-duplication keys.
    /// </summary>
    public void Dispose()
    {
        lock (_gate)
        {
            _allTracks.Clear();
            _processedTrackIds.Clear();
        }
    }
}
}