Disclaimer: los métodos y ejemplos de código de este artículo son fruto de mi propia investigación y aprendizaje autónomo, y en ningún caso están preparados para su utilización en código real de producción. El autor no se hace responsable del uso de estos ejemplos. Para más información sobre programación paralela y asíncrona y sus connotaciones, recomiendo acceder a la documentación oficial: https://docs.microsoft.com/es-es/dotnet/standard/parallel-processing-and-concurrency 

Este post forma parte de una serie de 3 artículos sobre mejoras de rendimiento utilizando programación paralela:

1. Mejorar el rendimiento con funciones asíncronas para la ejecución de procesos en paralelo
2. Utilizar ConcurrentBag para almacenar el resultado de métodos asíncronos (Este artículo)
3. Programación paralela en C# con la clase Parallel

 

Al hilo del post anterior sobre ejecución paralela de tareas, voy a ampliar el ejemplo para demostrar como podemos conseguir que diferentes procesos en paralelo puedan combinar el resultado en un mismo objeto, sin que la ejecución asíncrona sea un problema. En este caso, vamos a obtener los resultados del procesamiento en una misma lista de datos.

El código fuente de ejemplo mostrado lo podéis encontrar en Github: https://github.com/sgisbert/parallelization

 

Punto de partida

En el post anterior conseguimos reducir el tiempo de procesamiento de 2 segundos a 0,6 segundos, aunque en realidad nuestra función no devolvía ningún resultado. Para este ejemplo, vamos a modificarla para que devuelva un número aleatorio entre 1 y 100, mientras mantenemos el retardo para ilustrar el tiempo de ejecución.

Modificamos el método Process() para devolver un entero (ver archivo completo en Github):

private static async Task<int> Process(int id)
{
    var number = await Task<int>.Run(() =>
    {
        Stopwatch timer = new Stopwatch();
        timer.Start();
        Random random = new Random();
        Thread.Sleep(200);

        int number = random.Next(1,10);

        Console.WriteLine($"Process {id}: {timer.Elapsed}");
        return number;
    });
    return number;
}

Y añadimos el código necesario para recoger los resultados en el hilo principal:

List<Task<int>> tasks = new List<Task<int>>();
for (int i = 0; i < 10; i++)
{
    tasks.Add(Process(i));
}
Task.WaitAll(tasks.ToArray());

List<int> results = new List<int>();
foreach (var task in tasks)
{
    results.Add(task.Result);
}

Notar cómo al cambiar el tipo devuelto por Process()Task<int>, también tenemos que cambiar el tipo de datos de la lista donde guardamos las tareas que se van ejecutando a List<Task<int>>, para poder tener acceso al resultado posteriomente mediante task.Result.

El resultado de la ejecución es el siguiente:

Process 0: 00:00:00.2077483
Process 1: 00:00:00.2076874
Process 3: 00:00:00.2074275
Process 2: 00:00:00.2057897
Process 7: 00:00:00.2097799
Process 6: 00:00:00.2101881
Process 5: 00:00:00.2103866
Process 4: 00:00:00.2109039
Process 9: 00:00:00.2004370
Process 8: 00:00:00.2006408

Result: 76,2,79,86,12,45,27,17,55,5
Completed: 00:00:00.6971794

En este ejemplo, hemos tenido que procesar todos los resultados a posteriori, una vez todos los procesos han finalizado la ejecución. Para este ejemplo sencillo, este segundo bucle foreach ha incrementado el tiempo en casi una décima, pasando de los 0,6s originales a casi 0,7s. Imaginemos un caso complejo, donde hubiera que analizar cientos o miles de resultados, y es posible que estuviéramos perdiendo gran parte del rendimiento conseguido con la paralelización del proceso.

 

Devolver el resultado ya procesado

Para no tener que recuperar los resultados de los procesos asíncronos en un bucle posterior, podemos hacer uso listas concurrentes que son thread-safe, como ConcurrentBag<T>. También existen versiones concurrentes de Dictionary, Queue y Stack. Con este tipo de datos, cada proceso podrá añadir su resultado directamente a la ConcurrentBag, de manera que al volver al hilo principal, ya no será necesario recorrer todas las tareas para obtener los resultados.

Para ello, añadimos la ConcurrentBag como parámetro a nuestro método Process() (ver archivo completo en Github):

private static async Task Process(int id, ConcurrentBag<int> cb)
{
    await Task.Run(() =>
    {
        Stopwatch timer = new Stopwatch();
        timer.Start();
        Random random = new Random();
        Thread.Sleep(200);

        int number = random.Next(1,100);
        cb.Add(number);

        Console.WriteLine($"Process {id}: {timer.Elapsed}");
    });
}

Con lo que ya no es necesario devolver un Task<int>, ni volver a procesar los resultados en el hilo principal, sino que podemos hacer uso directamente de los resultados que tenemos en la ConcurrentBag:

static async Task Main(string[] args)
{
    ConcurrentBag<int> cb = new ConcurrentBag<int>();
    Stopwatch timer = new Stopwatch();
    timer.Start();

    List<Task> tasks = new List<Task>();
    for (int i = 0; i < 10; i++)
    {
        tasks.Add(Process(i, cb));
    }
    Task.WaitAll(tasks.ToArray());

    Console.WriteLine();
    Console.WriteLine($"Result: {string.Join(",", cb)}");
    Console.WriteLine($"Completed: {timer.Elapsed}");
}

Volviendo a los resultados:

Process 1: 00:00:00.2029103
Process 2: 00:00:00.2028785
Process 0: 00:00:00.2029188
Process 3: 00:00:00.2028485
Process 5: 00:00:00.2007397
Process 7: 00:00:00.2007120
Process 6: 00:00:00.2007211
Process 4: 00:00:00.2008456
Process 9: 00:00:00.2001557
Process 8: 00:00:00.2007588

Result: 82,83,88,99,57,71,13,54,18,40
Completed: 00:00:00.6276547

de nuevo comprobamos que estamos más cerca de los 0,6 segundos que antes, al eliminar el post procesado innecesario de los resultados.

 

Conclusiones

  • Es posible ejecutar procesos en paralelo y obtener los resultados agrupados en un mismo objeto. Esto nos evitará un post procesado posterios de las tareas que añade tiempo innecesario de ejecución.
  • Existen tipos de datos thread-safe que podemos utilizar para almacenar resultados de tareas asíncronas, como ConcurrentBagConcurrentDictionaryConcurrentQueueConcurrentStack. Aprender a usarlos nos abre nuevas opciones de mejora de nuestros procesos.