'Y'

Idiomul ForEachAsync

In trecut am vorbit despre idiomul care iti permite sa procesezi rezultatele in ordinea completarii taskurilor si nu in ordinea lansarii. Dar am trecut peste un aspect important. Sa presupunem ca avem toti acelasi serviciu meteo si vrem sa obtinem rezultatele pentru fiecare oras cat mai repede posibil.

Feb 26, 2016 1744
In trecut am vorbit despre idiomul care iti permite sa procesezi rezultatele in ordinea completarii taskurilor si nu in ordinea lansarii. Dar am trecut peste un aspect important. Sa presupunem ca avem toti acelasi serviciu meteo si vrem sa obtinem rezultatele pentru fiecare oras cat mai repede posibil. Asta inseamna ca trebuie sa facem mii de cereri pentru fiecare oras din lume? Cei care ofera prognoza s-ar putea sa creada ca exista anumite probleme cu clientul care a cerut asta si s-ar putea sa incerce sa limiteze o parte din cererile care depasesc anumite limite (acest tip de limitare este o provocare majora pentru toate serviciile de cloud, atat pentru cei care ofera aceste servicii cat si pentru clienti).

Astfel, ca trebuie sa limitam intr-un fel numarul de cereri sau alte operatiuni asincron.

In general cand vine vorba de limitarea numarului de sarcini apare o problema majora. In cazul operatiunilor care utilizeaza foarte mult CPU-ul (number crunchers, operatiuni care incarca CPU/GPU) exista anumite solutii cunoscute – numarul de sarcini trebuie sa fie limitat in functie de numarul dispozitivelor. Insa in cazul unor operatiuni care afecteaza IO nu exista astfel de restrictii. Mai mult, nu exista instrumente interne de verificare a numarului de astfel de operatiuni.

NOTA

Thread pool, in teorie, ar putea sa ajute in situatiile in care o combinatie de operatiuni care folosesc IO sau CPU sunt executate in sistem. Exista anumite metode care incearca sa descopere numarul optim de sarcini care lucreaza simultan pentru a a asigura un nivel maxim de randament pentru procesarea sarcinilor. Insa aceste metode nu ne ajuta foarte mult atunci cand avem un numar mare de operatiuni care folosesc IO si doar noi stim cand partea de backend va incepe sa dea rateuri din cauza supraincarcarii. Daca vrei sa afli mai multe poti sa gasesti un articol mai detaliat aici - Throttling Concurrency in the CLR 4.0 ThreadPool.

Astfel ca avem nevoie de o metoda cum ar fi ForEachAsync, care sa ia o secventa de elemente si metoda factory pentru lansarea task-ului. Acest lucru va limita si numarul de operatiuni simultane.

ForEachAsync_Idiom_1.jpg

Arata infricosator dar nu este atat de rau!

Exista peste 9000 de metode de a limita numarul de operatiuni simultane. Si prima din lista este semaforul.
Aceasta solutie este in regula si poate fi folosita pentru aceasta sarcina, dar poti alege ceva mai high-level – clasa Partitioner din TPL.

Partitioner.Create (source) returneaza un obiect care poate sa imparta secventa de input pentru procesarea paralela. Algoritmul “divizare” poate sa fie diferit; depinde de tipul de secventa/colectie si nu este de interes pentru situatia noastra. Elementul important este ca partitioner iti permite sa obtii mai multi “iteratori” care pot lucra in paralel, fiecare cu propria secventa de input.

Procesarea paralela este buna, dar avem nevoie de un loc unde putem stoca rezultatele. Pentru asta, la inceputul metodei, dezvoltam o “materializare” a secventei in lista, si un array de obiecte TaskCompletionSource pentru fiecare sarcina viitoare creata. Apoi rulam procesarea paralela pentru fiecare partitie si “adaugam” rezultatele intr-un array TaskCompletionSources pe masura ce devin disponibile.

ForEachAsync_Idiom_2.jpg

Si iata rezultatele operatiunii:

[1:22:09 PM]: Obtinem vremea pentru 'Moscow'
[1:22:09 PM]: Obtinem vremea pentru 'Seattle'
-- Limitarea sarcinilor este activata! Se asteapta finalizarea primei sarcini!
[1:22:10 PM]: Obtinem vremea pentru 'Moscow': 'Temp: 6C'
-- Lansarea urmatoarei sarcini imediat dupa terminarea sarcinii anterioare
[1:22:10 PM]: Obtinem vremea pentru 'New York'
-- Cea mai noua sarcina este finalizata
[1:22:15 PM]: Obtinem vremea pentru 'New York': 'Temp: 8C'
-- Lansam urmatoare sarcina
[1:22:15 PM]: Obtinem vremea pentru 'Kiev'
-- A doua sarcina a fost finalizata
[1:22:16 PM]: Obtinem vremea pentru 'Seattle': 'Temp: 7C'
-- Si acum ultima
[1:22:20 PM]: Obtinem vremea pentru 'Kiev': 'Temp: 4C'

Sub forma de grafic ar arata:

ForEachAsync_Idiom_3.jpg

Se pare ca aceasta solutie nu doar ca limiteaza numarul de operatiuni simultane, dar iti permite sa si procesezi rezultatele in ordinea completarii nu in ordinea lansarii!

Ce sa returnam? Task sau IEnumerable?

Comrade Taub descrie implementarea ForEachAsync in doua parti (
Implementing a simple ForEachAsync si Implementing a simple ForEachAsync, part 2). Dar implementarea sa este oarecum diferita. Principala diferenta este ca metoda ForEachAsync returneaza Task in locul IEnumerable. Aceasta diferenta este importanta:

ForEachAsync_Idiom_4.jpg

Codul este mai scurt (ceea ce este bine) dar se comporta diferit (si nu stim daca este bine sau nu). In primul rand, aceasta abordare functioneaza doar pentru comenzi dar nu functioneaza pentru cereri (command este un mutator, request este un getter). In al doilea rand prezenta unui singur rezultat complica destul de mult procesul de management al erorilor, chiar si in cazul restrictiilor impuse asupra operatiunilor – cum ar fi salvarea datelor.

In acest caz, actiunea de management al erorilor se va opri complet atunci cand are loc o exceptie in fiecare din partitiile existente! Atunci cand are loc prima eroare si await body se termina cu o eroare, bucla while se inchide si Dispose va fi apelat pe obiectul din partitie. Partitionarea se implementeaza cu ajutorul unui idiom stealing work – acest lucru inseamna ca elementele curente vor fi (in mod logic) adaugate la coada pentru a fi procesate de o partitie diferita. Daca procesarea pe o alta partitie esueaza, numarul de handleri va scadea si mai mult. Si acest lucru se va intampla pana cand toate partitiile au erori. In cel mai bun caz vei obtine doar o parte din datele procesate si in cel mai rau caz vei avea probleme legate de eficienta, datorita faptului ca vei avea mai putini task handleri activi decat ai crezut.

Problema poate fi rezolvata prin acumularea de erori si generarea de AggregateException:

ForEachAsync_Idiom_5.jpg

Aceasta abordare este complet operationala desi problema legata de asocierea dintre sarcina si eroarea care a avut loc ramane.

Sergey Teplyakov
Expert in .Net, С++ and Application Architecture

Daca iti place acest articol, distribuie-l si prietenilor tai!




Mai ai intrebari?
Contacteaza-ne.
Thank you.
Your request has been received.