From a774b8f430a5196d55828e740b339a3380f44900 Mon Sep 17 00:00:00 2001 From: John Erickson Date: Mon, 7 Jul 2025 13:22:57 -0700 Subject: [PATCH 1/3] handle concurrent uploads --- .../PipelineCachingCacheClient.cs | 41 ++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/src/AzurePipelines/PipelineCachingCacheClient.cs b/src/AzurePipelines/PipelineCachingCacheClient.cs index 2155c57..2092e95 100644 --- a/src/AzurePipelines/PipelineCachingCacheClient.cs +++ b/src/AzurePipelines/PipelineCachingCacheClient.cs @@ -292,12 +292,22 @@ protected override async Task AddNodeAsync( result.ProofNodes, ContentFormatConstants.Files); - CreateResult createResult = await WithHttpRetries( - () => _cacheClient.CreatePipelineCacheArtifactAsync(entry, null, cancellationToken), + await WithHttpRetries( + async () => + { + try + { + CreateResult createResult = await _cacheClient.CreatePipelineCacheArtifactAsync(entry, null, cancellationToken); + Tracer.Debug(context, $"Cache entry for {fingerprint} stored in scope `{createResult.ScopeUsed}`"); + } + catch (PipelineCacheItemAlreadyExistsException) + { + Tracer.Debug(context, $"Cache entry for {fingerprint} already exists."); + } + }, cacheContext: context, message: $"Storing cache key for {fingerprint}", cancellationToken); - Tracer.Debug(context, $"Cache entry stored in scope `{createResult.ScopeUsed}`"); } finally { @@ -730,16 +740,27 @@ private string ComputeSelectorsKey(BuildXL.Cache.MemoizationStore.Interfaces.Ses cancellationToken); } + private Task WithHttpRetries(Func taskFactory, Context cacheContext, string message, CancellationToken token) + => WithHttpRetries( + async () => + { + await taskFactory(); + return 0; // we don't care about the result, just that it succeeded + }, + cacheContext, + message, + token); + private Task WithHttpRetries(Func> taskFactory, Context cacheContext, string message, CancellationToken token) { return AsyncHttpRetryHelper.InvokeAsync( - taskFactory, - maxRetries: 10, - tracer: _azureDevopsTracer, - canRetryDelegate: _ => true, // retry on any exception - cancellationToken: token, - continueOnCapturedContext: false, - context: EmbedCacheContext(cacheContext, message)); + taskFactory, + maxRetries: 10, + tracer: _azureDevopsTracer, + canRetryDelegate: _ => true, // retry on any exception + cancellationToken: token, + continueOnCapturedContext: false, + context: EmbedCacheContext(cacheContext, message)); } public override async ValueTask DisposeAsync() From 93838c72acdb4bf83cc72673e243cef9393674d2 Mon Sep 17 00:00:00 2001 From: John Erickson Date: Mon, 7 Jul 2025 16:30:16 -0700 Subject: [PATCH 2/3] supress code analysis --- src/AzurePipelines/PipelineCachingCacheClient.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/AzurePipelines/PipelineCachingCacheClient.cs b/src/AzurePipelines/PipelineCachingCacheClient.cs index 2092e95..6187a97 100644 --- a/src/AzurePipelines/PipelineCachingCacheClient.cs +++ b/src/AzurePipelines/PipelineCachingCacheClient.cs @@ -740,6 +740,7 @@ private string ComputeSelectorsKey(BuildXL.Cache.MemoizationStore.Interfaces.Ses cancellationToken); } +#pragma warning disable CA1859 // returning Task would be misleading private Task WithHttpRetries(Func taskFactory, Context cacheContext, string message, CancellationToken token) => WithHttpRetries( async () => @@ -750,6 +751,7 @@ private Task WithHttpRetries(Func taskFactory, Context cacheContext, strin cacheContext, message, token); +#pragma warning restore CA1859 // returning Task would be misleading private Task WithHttpRetries(Func> taskFactory, Context cacheContext, string message, CancellationToken token) { From c6877d880e2813308916ae6fb3602bc70ffae830 Mon Sep 17 00:00:00 2001 From: John Erickson Date: Tue, 8 Jul 2025 08:57:54 -0700 Subject: [PATCH 3/3] Align impls better. --- .../PipelineCachingCacheClient.cs | 49 ++++++++++--------- src/Common/Caching/CacheClient.cs | 16 ++++-- src/Common/Caching/CasCacheClient.cs | 6 ++- 3 files changed, 44 insertions(+), 27 deletions(-) diff --git a/src/AzurePipelines/PipelineCachingCacheClient.cs b/src/AzurePipelines/PipelineCachingCacheClient.cs index 6187a97..a4613fa 100644 --- a/src/AzurePipelines/PipelineCachingCacheClient.cs +++ b/src/AzurePipelines/PipelineCachingCacheClient.cs @@ -185,7 +185,7 @@ string s when int.TryParse(s, out int i) => i, _startupTask = Task.Run(() => QueryPipelineCaching(rootContext, new VisualStudio.Services.PipelineCache.WebApi.Fingerprint("init"), CancellationToken.None)); } - protected override async Task AddNodeAsync( + protected override async Task AddNodeAsync( Context context, StrongFingerprint fingerprint, IReadOnlyDictionary outputs, @@ -195,10 +195,11 @@ protected override async Task AddNodeAsync( { if (_remoteCacheIsReadOnly) { - return; + return AddNodeResult.Skipped; } // write the SFP -> manifest + bool sfpAddded; List tempFilePaths = new(); try { @@ -268,7 +269,7 @@ protected override async Task AddNodeAsync( infos = outputs.Keys.Select(f => new FileInfo(f)).ToArray(); } - var result = await WithHttpRetries( + PublishResult result = await WithHttpRetries( () => _manifestClient.PublishAsync(RepoRoot, infos, extras, new ArtifactPublishOptions(), manifestFileOutputPath: null, cancellationToken), cacheContext: context, message: $"Publishing content for {fingerprint}", @@ -292,17 +293,19 @@ protected override async Task AddNodeAsync( result.ProofNodes, ContentFormatConstants.Files); - await WithHttpRetries( + sfpAddded = await WithHttpRetries( async () => { try { CreateResult createResult = await _cacheClient.CreatePipelineCacheArtifactAsync(entry, null, cancellationToken); Tracer.Debug(context, $"Cache entry for {fingerprint} stored in scope `{createResult.ScopeUsed}`"); + return true; } catch (PipelineCacheItemAlreadyExistsException) { Tracer.Debug(context, $"Cache entry for {fingerprint} already exists."); + return false; } }, cacheContext: context, @@ -320,6 +323,7 @@ await WithHttpRetries( } // add the WFP -> Selector mapping + bool wfpAddded; List pathSetTempFiles = new(); try { @@ -373,13 +377,13 @@ await WithHttpRetries( } } - var result = await WithHttpRetries( + PublishResult result = await WithHttpRetries( () => _manifestClient.PublishAsync(TempFolder, infos, extras, new ArtifactPublishOptions(), manifestFileOutputPath: null, cancellationToken), cacheContext: context, message: $"Publishing content for {fingerprint}", cancellationToken); - var entry = new CreatePipelineCacheArtifactContract( + CreatePipelineCacheArtifactContract entry = new( DomainId, new VisualStudio.Services.PipelineCache.WebApi.Fingerprint(key.Split(KeySegmentSeperator)), result.ManifestId, @@ -387,13 +391,27 @@ await WithHttpRetries( result.ProofNodes, ContentFormatConstants.Files); - CreateResult createResult = await WithHttpRetries( - () => _cacheClient.CreatePipelineCacheArtifactAsync(entry, null, cancellationToken), + wfpAddded = await WithHttpRetries( + async () => + { + try + { + CreateResult createResult = await _cacheClient.CreatePipelineCacheArtifactAsync(entry, null, cancellationToken); + Tracer.Debug(context, $"SFP `{fingerprint}` stored in scope `{createResult.ScopeUsed}`"); + return true; + } + catch (PipelineCacheItemAlreadyExistsException) + { + return false; + } + }, cacheContext: context, message: $"Storing cache key for {fingerprint}", cancellationToken); - Tracer.Debug(context, $"SFP `{fingerprint}` stored in scope `{createResult.ScopeUsed}`"); + return wfpAddded || sfpAddded + ? AddNodeResult.Added + : AddNodeResult.AlreadyExists; } finally { @@ -740,19 +758,6 @@ private string ComputeSelectorsKey(BuildXL.Cache.MemoizationStore.Interfaces.Ses cancellationToken); } -#pragma warning disable CA1859 // returning Task would be misleading - private Task WithHttpRetries(Func taskFactory, Context cacheContext, string message, CancellationToken token) - => WithHttpRetries( - async () => - { - await taskFactory(); - return 0; // we don't care about the result, just that it succeeded - }, - cacheContext, - message, - token); -#pragma warning restore CA1859 // returning Task would be misleading - private Task WithHttpRetries(Func> taskFactory, Context cacheContext, string message, CancellationToken token) { return AsyncHttpRetryHelper.InvokeAsync( diff --git a/src/Common/Caching/CacheClient.cs b/src/Common/Caching/CacheClient.cs index 62a906b..222c1d5 100644 --- a/src/Common/Caching/CacheClient.cs +++ b/src/Common/Caching/CacheClient.cs @@ -28,6 +28,13 @@ namespace Microsoft.MSBuildCache.Caching; +public enum AddNodeResult +{ + Added, + AlreadyExists, + Skipped +} + public abstract class CacheClient : ICacheClient { private static readonly byte[] EmptySelectorOutput = new byte[1]; @@ -122,7 +129,8 @@ protected CacheClient( /* abstract methods for subclasses to implement */ protected abstract Task OpenStreamAsync(Context context, ContentHash contentHash, CancellationToken cancellationToken); - protected abstract Task AddNodeAsync( + /// True if added, false if already exists. Otherwise throws. + protected abstract Task AddNodeAsync( Context context, StrongFingerprint fingerprint, IReadOnlyDictionary outputs, @@ -342,9 +350,7 @@ public async Task AddNodeInternalAsync( StrongFingerprint cacheStrongFingerprint = new(cacheWeakFingerprint, selector); - Tracer.Debug(context, $"StrongFingerprint is {cacheStrongFingerprint} for {nodeContext.Id}"); - - await AddNodeAsync( + AddNodeResult addresult = await AddNodeAsync( context, cacheStrongFingerprint, outputsToCache, @@ -352,6 +358,8 @@ await AddNodeAsync( pathSetBytes, cancellationToken); + Tracer.Debug(context, $"Adding StrongFingerprint {cacheStrongFingerprint} for {nodeContext.Id} resulted in {addresult}."); + if (_localCacheStateManager is not null) { await _localCacheStateManager.WriteStateFileAsync(nodeContext, nodeBuildResult); diff --git a/src/Common/Caching/CasCacheClient.cs b/src/Common/Caching/CasCacheClient.cs index 1277575..f9ca237 100644 --- a/src/Common/Caching/CasCacheClient.cs +++ b/src/Common/Caching/CasCacheClient.cs @@ -111,7 +111,7 @@ protected override async IAsyncEnumerable GetSelectors(Context context protected override Task OpenStreamAsync(Context context, ContentHash contentHash, CancellationToken cancellationToken) => _twoLevelCacheSession.OpenStreamAsync(context, contentHash, cancellationToken); - protected override async Task AddNodeAsync( + protected override async Task AddNodeAsync( Context context, StrongFingerprint fingerprint, IReadOnlyDictionary outputs, @@ -253,6 +253,10 @@ static async Task checkUploadResultsAsync(List> uploadTas throw new CacheException($"{nameof(_twoLevelCacheSession.AddOrGetContentHashListAsync)} failed for {fingerprint}."); } + return contentHashList.Equals(addResult?.ContentHashListWithDeterminism.ContentHashList) + ? AddNodeResult.Added + : AddNodeResult.AlreadyExists; + // TODO dfederm: Handle CHL races }