Limitar el número de subprocesos en la biblioteca paralela de tareas

Tengo algunos cientos de archivos que necesito cargar en Azure Blob Storage.
Quiero usar la biblioteca de tareas paralelas.
Pero en lugar de ejecutar todos los 100 subprocesos para cargar en un foreach en la lista de archivos, ¿cómo puedo poner un límite al número máximo de subprocesos que puede usar y terminar el trabajo en paralelo? ¿O equilibra las cosas automáticamente?

No deberías usar hilos para esto en absoluto. Hay una API basada en Task para esto, que es naturalmente asíncrona: CloudBlockBlob.UploadFromFileAsync . Úselo con async/await y SemaphoreSlim para acelerar el número de subidas paralelas.

Ejemplo (no probado):

 const MAX_PARALLEL_UPLOADS = 5; async Task UploadFiles() { var files = new List(); // ... add files to the list // init the blob block and // upload files asynchronously using (var blobBlock = new CloudBlockBlob(url, credentials)) using (var semaphore = new SemaphoreSlim(MAX_PARALLEL_UPLOADS)) { var tasks = files.Select(async(filename) => { await semaphore.WaitAsync(); try { await blobBlock.UploadFromFileAsync(filename, FileMode.Create); } finally { semaphore.Release(); } }).ToArray(); await Task.WhenAll(tasks); } } 

¿Intentaste usar MaxDegreeOfParallelism? Me gusta esto:

 System.Threading.Tasks.Parallel.Invoke( new Tasks.ParallelOptions {MaxDegreeOfParallelism = 5 }, actionsArray) 

Esencialmente, querrá crear una Acción o Tarea para cada archivo a cargar, colocarlos en una Lista y luego procesar esa lista, limitando el número que puede procesarse en paralelo.

La publicación de mi blog muestra cómo hacer esto con Tareas y con Acciones, y proporciona un proyecto de muestra que puede descargar y ejecutar para ver ambas en acción.

Con acciones

Si usa Acciones, puede usar la función incorporada .Net Parallel.Invoke. Aquí lo limitamos a correr como máximo 5 hilos en paralelo.

 var listOfActions = new List(); foreach (var file in files) { var localFile = file; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(() => blobBlock.UploadFromFileAsync(localFile, FileMode.Create))); } var options = new ParallelOptions {MaxDegreeOfParallelism = 5}; Parallel.Invoke(options, listOfActions.ToArray()); 

Sin embargo, esta opción no hace uso de la naturaleza asíncrona de UploadFromFileAsync, por lo que es posible que desee utilizar el siguiente ejemplo de tarea.

Con tareas

Con Tareas no hay función incorporada. Sin embargo, puedes usar el que proporciono en mi blog.

  ///  /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// NOTE: If one of the given tasks has already been started, an exception will be thrown. ///  /// The tasks to run. /// The maximum number of tasks to run in parallel. /// The cancellation token. public static async Task StartAndWaitAllThrottledAsync(IEnumerable tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) { await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); } ///  /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. /// NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed. /// NOTE: If one of the given tasks has already been started, an exception will be thrown. ///  /// The tasks to run. /// The maximum number of tasks to run in parallel. /// The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely. /// The cancellation token. public static async Task StartAndWaitAllThrottledAsync(IEnumerable tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) { // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. var tasks = tasksToRun.ToList(); using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) { var postTaskTasks = new List(); // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); // Start running each task. foreach (var task in tasks) { // Increment the number of tasks currently running and wait if too many are running. await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); cancellationToken.ThrowIfCancellationRequested(); task.Start(); } // Wait for all of the provided tasks to complete. // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. await Task.WhenAll(postTaskTasks.ToArray()); } } 

Y luego, creando su lista de tareas y llamando a la función para que se ejecuten, con un máximo de 5 simultáneas a la vez, puede hacer esto:

 var listOfTasks = new List(); foreach (var file in files) { var localFile = file; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(async () => await blobBlock.UploadFromFileAsync(localFile, FileMode.Create))); } await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 5); 

Puedes averiguarlo ejecutando esto:

 class Program { static void Main(string[] args) { var list = new List(); for (int i = 0; i < 100; i++) { list.Add(i); } var runningIndex = 0; Task.Factory.StartNew(() => Action(ref runningIndex)); Parallel.ForEach(list, i => { runningIndex ++; Console.WriteLine(i); Thread.Sleep(3000); }); Console.ReadKey(); } private static void Action(ref int number) { while (true) { Console.WriteLine("worked through {0}", number); Thread.Sleep(2900); } } } 

Como puede ver, la cantidad de paralelismo es más pequeña al principio, se hace más grande y se hace más pequeña hacia el final. Así que definitivamente hay algún tipo de optimización automática en marcha.