De-cola de elementos con hilos de trabajo

He estado tratando de averiguar cómo resolver un requisito que tengo, pero por mi vida simplemente no puedo encontrar una solución.

Tengo una base de datos de artículos que los almacena en una especie de cola. (La base de datos ya se ha implementado y otros procesos agregarán elementos a esta cola).

Los elementos requieren mucho trabajo / tiempo para “procesarlos”, por lo que debo poder: Desacoplar constantemente los elementos de la base de datos. Para cada elemento, ejecute un nuevo subproceso y procese el elemento y luego devuelva verdadero / falso si se procesó correctamente. (Esto se usará para volver a agregarlo a la cola de la base de datos o no)

Pero solo para hacer esto mientras el número actual de subprocesos activos (uno por elemento que se procesa) es menor que el número máximo de parámetros de subprocesos.

Una vez que se ha alcanzado el número máximo de subprocesos, necesito detener la eliminación de elementos de la cola de la base de datos hasta que el número actual de subprocesos sea inferior al número máximo de subprocesos. En qué momento debe continuar la eliminación de elementos en cola.

Siento que esto debería ser algo que se me ocurra, pero simplemente no viene a mí.

Para aclarar: solo necesito implementar el threading. La base de datos ya ha sido implementada.

Una forma realmente fácil de hacer esto es con un Semaphore . Tienes un hilo que saca de la cola los elementos y crea hilos para procesarlos. Por ejemplo:

 const int MaxThreads = 4; Semaphore sem = new Semaphore(MaxThreads, MaxThreads); while (Queue.HasItems()) { sem.WaitOne(); var item = Queue.Dequeue(); Threadpool.QueueUserWorkItem(ProcessItem, item); // see below } // When the queue is empty, you have to wait for all processing // threads to complete. // If you can acquire the semaphore MaxThreads times, all workers are done int count = 0; while (count < MaxThreads) { sem.WaitOne(); ++count; } // the code to process an item void ProcessItem(object item) { // cast the item to whatever type you need, // and process it. // when done processing, release the semaphore sem.Release(); } 

La técnica anterior funciona bastante bien. Es fácil de codificar, fácil de entender y muy efectivo.

Un cambio es que es posible que desee utilizar la API de Task lugar de Threadpool.QueueUserWorkItem . Task le da más control sobre el procesamiento asíncrono, incluida la cancelación. Utilicé QueueUserWorkItem en mi ejemplo porque estoy más familiarizado con él. Yo usaría Task en un progtwig de producción.

Aunque esto utiliza hilos N + 1 (donde N es el número de elementos que desea procesar simultáneamente), ese hilo adicional no suele hacer nada. La única vez que se ejecuta es cuando asigna trabajo a subprocesos de trabajo. De lo contrario, está haciendo una espera no ocupada en el semáforo.

¿No sabes por dónde empezar?

Considere un grupo de hilos con un número máximo de hilos. http://msdn.microsoft.com/en-us/library/y5htx827.aspx

Considere girar su número máximo de subprocesos inmediatamente y monitorear la base de datos. http://msdn.microsoft.com/en-us/library/system.threading.threadpool.queueuserworkitem.aspx es conveniente.

Recuerde que no puede garantizar que su proceso finalice de manera segura … los accidentes ocurren. Considere el registro del estado de procesamiento.

Recuerde que las operaciones de selección y eliminación de la cola deben ser atómicas.

Ok, entonces la architecture de la solución dependerá de una cosa: ¿el tiempo de procesamiento por elemento de la cola varía según los datos del elemento?

Si no, entonces puedes tener algo que simplemente redondea los hilos entre los hilos de procesamiento. Esto será bastante simple de implementar.

Si el tiempo de procesamiento varía, entonces va a necesitar algo con una sensación de “próxima disponibilidad”, de modo que cualquiera de sus hilos sea libre primero reciba la tarea de procesar el elemento de datos.

Una vez que lo haya resuelto, tendrá la oportunidad habitual de sincronizar entre un lector de cola y los subprocesos de procesamiento. La diferencia entre ‘next-available’ y ’round-robin’ es cómo haces esa sincronización.

No estoy demasiado familiarizado con C #, pero he oído hablar de una bestia llamada un trabajador de fondo. Es probable que sea un medio aceptable para lograr esto.

Para round robin, simplemente inicie un trabajador de fondo por elemento de la cola, almacenando las referencias de los trabajadores en una matriz. Limítese a, digamos, 16 trabajadores en proceso en proceso. La idea es que habiendo comenzado 16, esperarías a que se completara el primero antes de comenzar el 17, y así sucesivamente. Creo que los trabajadores en segundo plano realmente se ejecutan como trabajos en el grupo de subprocesos, por lo que limitará automáticamente la cantidad de subprocesos que se ejecutan en cualquier momento a algo apropiado para el hardware subyacente. Para esperar a un trabajador de fondo ver esto . Habiendo esperado a que un trabajador de fondo completara, usted manejaría su resultado y comenzaría otro.

Para el próximo enfoque disponible no es tan diferente. En lugar de esperar a que se complete el primero, utilizaría WaitAny () para esperar a que finalice cualquiera de los trabajadores. Usted maneja la devolución de la que haya completado, y luego inicia otra y vuelve a WaitAny ().

La filosofía general de ambos enfoques es mantener un número de hilos en ebullición todo el tiempo. Una característica del siguiente enfoque disponible es que el orden en el que se emiten los resultados no es necesariamente el mismo que el de los elementos de entrada. Si eso importa, entonces el enfoque de round robin con más trabajadores de fondo que núcleos de CPU será razonablemente eficiente (el conjunto de subprocesos simplemente comenzará a comisionarse pero aún no ejecutará trabajadores) Sin embargo, la latencia variará con el tiempo de procesamiento.

BTW 16 es un número arbitrario elegido sobre la base de cuántos núcleos crees que habrá en la PC que ejecuta el software. Más núcleos, mayor número.

Por supuesto, en el mundo aparentemente inquieto y siempre cambiante de .NET ahora puede haber una mejor manera de hacerlo.

¡Buena suerte!