cómo utilizar el rendimiento para devolver la colección de artículo en bloque paralelo o tarea

Busco ayuda sobre cómo hacer uso de la palabra clave yield para devolver IEnumberable en bloques paralelos o bloque de tareas. A continuación se muestra el pseudo código

public IEnumerable<List> ReadFile( ) { foreach (string filepath in lstOfFiles) { var stream = new FileStream(filepath , FileMode.Open, FileAccess.Read); foreach (var item in ReadStream(stream)) yield return item; //where item is of type List } } 

Quiero convertir el código anterior al bloque paralelo como abajo

 lstOfFiles.AsParallel() .ForAll(filepath => { var stream = new FileStream(filepath , FileMode.Open, FileAccess.Read); foreach (var item in ReadStream(Stream)) yield return item; }); 

pero el comstackdor arroja el error de que el rendimiento no se puede utilizar en bloques paralelos o delegates anónimos. Intenté con el bloque de tareas también, el rendimiento no está permitido en la tarea delegado anónimo

Cualquiera me sugiere una forma simple y mejor de obtener rendimiento para devolver la recostackción de datos en bloques o tareas paralelos.

Leí que RX 2.0 o TPL son buenos para usar en el escenario anterior. Tengo una duda sobre si utilizar la biblioteca RX o TPL para obtener un rendimiento asíncrono del rendimiento de los valores. ¿Puede alguien sugerirme cuál es mejor Rx o TPL.

Si utilizo Rx, ¿es necesario crear suscripciones y convertir el bloque paralelo AsObservable?

Parece que quieres usar SelectMany . No puede utilizar el yield en un método anónimo, pero puede dividirlo en un nuevo método, como por ejemplo:

 IEnumerable items = lstOfFiles.AsParallel() .SelectMany(( filepath ) => ReadItems(filepath)); IEnumerable ReadItems(string filePath) { using(var Stream = new FileStream(filePath, FileMode.Open, FileAccess.Read)) { foreach (var item in ReadStream(Stream)) yield return item; } } 

Para usar Rx, tendrás que usar IObservable lugar de IEnumerable .

 public IObservable ReadFiles() { return from filepath in lstOfFiles.ToObservable() from item in Observable.Using(() => File.OpenRead(filepath), ReadStream) select item; } 

Cada vez que llame a Subscribe en el observable devuelto por ReadFiles , iterará sobre todas las cadenas en lstOfFiles y, en paralelo *, leerá cada flujo de archivos.

Secuencialmente, la consulta abre cada flujo de archivos y lo pasa a ReadStream , que es responsable de generar la secuencia asíncrona de elementos para un flujo dado.

La consulta ReadFiles , que utiliza el operador SelectMany escrito en la syntax de comprensión de consulta, combina cada “elemento” generado por todos los observables ReadStream en una única secuencia observable, respetando la asincronía de la fuente.

Debería considerar la posibilidad de escribir un iterador asíncrono para su método ReadStream como se muestra aquí; de lo contrario, si debe devolver IEnumerable , deberá convertirlo aplicando el ToObservable(scheduler) con un planificador de introducción de concurrencia, que puede ser menos eficiente.

 public IObservable ReadStream(Stream stream) { return Observable.Create(async (observer, cancel) => { // Here's one example of reading a stream with fixed item lengths. var buffer = new byte[itemLength]; // TODO: Define itemLength var remainder = itemLength; int read; do { read = await stream.ReadAsync(buffer, itemLength - remainder, remainder, cancel) .ConfigureAwait(false); remainder -= read; if (read == 0) { if (remainder < itemLength) { throw new InvalidOperationException("End of stream unexpected."); } else { break; } } else if (remainder == 0) { observer.OnNext(ReadItem(buffer)); // TODO: Define ReadItem remainder = itemLength; } } while (true); }); } 

* Rx no introduce ninguna concurrencia aquí. La paralelización es simplemente un resultado de la naturaleza asíncrona de la API subyacente, por lo que es muy eficiente. La lectura de un flujo de archivos de forma asíncrona puede hacer que Windows use un puerto de finalización de E / S como una optimización, notificando en un subproceso agrupado cuando cada búfer esté disponible. Esto garantiza que Windows sea totalmente responsable de progtwigr las devoluciones de llamada a su aplicación, en lugar de a la TPL o a usted mismo.

Rx es un hilo libre, por lo que cada notificación a su observador puede estar en un hilo agrupado diferente; sin embargo, debido al contrato de serialización de Rx (§4.2 Pautas de diseño de Rx ), no recibirá notificaciones superpuestas en su observador cuando llame a Subscribe , por lo que no es necesario proporcionar una sincronización explícita, como el locking.

Sin embargo, debido a la naturaleza paralelizada de esta consulta, puede observar notificaciones alternativas con respecto a cada archivo, pero nunca las notificaciones superpuestas.

Si prefiere recibir todos los elementos para un archivo dado a la vez, como se indicó en su pregunta, entonces simplemente puede aplicar el operador de la ToList a la consulta y cambiar el tipo de devolución:

 public IObservable> ReadFiles() { return from filepath in lstOfFiles.ToObservable() from items in Observable.Using(() => File.OpenRead(filepath), ReadStream) .ToList() select items; } 

Si necesita observar notificaciones con afinidad de subprocesos (en un subproceso de la GUI, por ejemplo), debe reunir las notificaciones porque llegarán a un subproceso agrupado. Como esta consulta no presenta la concurrencia en sí misma, la mejor manera de lograrlo es aplicar el operador ObserveOnDispatcher (WPF, Store Apps, Phone, Silverlight) o la ObserveOn(SynchronizationContext) (WinForms, ASP.NET, etc.). Simplemente no olvide agregar una referencia al paquete NuGet apropiado para la plataforma apropiada; por ejemplo, Rx-Wpf, Rx-WinForms, Rx-WindowsStore, etc.

Puede tener la tentación de convertir lo observable nuevamente en un IEnumerable lugar de llamar Subscribe . No hagas esto. En la mayoría de los casos es innecesario, puede ser ineficiente y, en el peor de los casos, podría causar lockings muertos. Una vez que entras en el mundo de la asincronía, debes intentar permanecer en él. Esto no solo es cierto para Rx sino también para async/await .