Cum sa interogam Kafka Streaming Data?

Daca ar exista o metoda de a oferi analistilor un layer SQL peste Kafka Streams atunci cand facem Streaming Data.

Mar 7, 2022 256
In unul din proiectele noastre, am avut de-a face cu o situatie unde echipa de analisti aveam nevoie sa lucreze cu Streaming Data insa nu aveau competente de programare. Erau insa familiari cu interogarile SQL. Asa ca ne-am uitat la cum sa le oferim acestora un layer de SQL peste Kafka Streams.

KSQL este un streaming SQL engine pentru Kfk, care ofera o interfata SQL interactiva care ne permite sa scriem wer strem ressing queries fara a fi nevoiti sa scriem cod. KSQL este util in special la detectarea fraudelor si aplicatii in timp real.

KSQL ofera stream processing scalabil si distribuit care include si ggregtins, jins, windwing si multe altele. Mai mult decat atat, spre deosebire de SQL, care ruleaza pe o baza de date sau un bth ressing system, rezultatele unei interogari KSQL sunt continue. Inainte sa incepe sa discutam despre scrierea de streming queries, sa luam cateva minute pentru revizui cateva concepte fundamentale ale KSQL.


KSQL Strems si Tbles

Un event strem este un unbunded strem de evenimente individuale si independente, in timp ce udte sau rerd strem este un stream de actualizari la nivelul inregistrarilor anterioare cu aceeasi cheie.

KSQL are un concept similar de interogare a unui Strem sau Tble. Unde un Stream este o serie infinita de evenimente sau date, dar care sunt imutabile. Insa cu o interogare pe un Table datele pot sa fie actualizare sau chiar sterse.
Desi unele terminologii s-ar putea sa fie diferite, conceptele sunt cat de cat asemanatoare, si daca esti confortabil cu Kfk Strems, te vei obisnui cu rapid KSQL.


Arhitectura KSQL

KSQL foloseste Kfk Strems pentru a construi si prelua rezultatele interogarii. KSQL este format din doua componente, KSQL LI si KSQL server. Cei care utilizeaza instrumentele standard SQL precum MySql, rle, sau chiar si Hive se vor simti foarte ok cu LI cand scriu interogari in KSQL. Si mai bine, KSQL este en-sure (he 2.0 liensed).

LI este de asemenea si clientul care se conecteaza cu KSQL Server. Serverul KSQL este responsabil cu procesarea interogarilor si preluarea datelor din Kfk, precum si scrierea rezultatelor in Kfk.

KSQL ruleaza in doua moduri. De sine statator, util pentru prototipare si modul dezvoltare sau distribuit care este modul in care ai folosi KSQL cand lucrezi intr-un mediu cu date mai realist.
Pe cat de interesant este KSQL si ce promite sa faca pentru SQL si stremingul de dte, la momentul scrierii acestui articol, KSQL este considerat in modul develer review si nu este recomandat sa ruleze impreuna cu clustere din productie.

Listing 1. Cum pornim KSQL in ll mde

./bin/ksql-cli local

Dupa ce rulam comanda de mai sus ar trebui sa vedem ceva asemanator consolei de mai jos:

KSQL in ll mde.png



Crearea unui KSQL Strem

Astfel ca acum daca trebuie sa facem niste modificari la aplicatie, pornim consola KSQL si il lasam pe analist sa reconstruiasc aplicatia ca o formulare SQL!

Exemplul pe care il folosi ca sa aratam cum convertim este ultimul windwed strem din exemplul interogarilor interactive pe care il gasim in

sr/min/jv/bbejek/hter_9/StkerfrmneIntertiveQuerylitin.jv frm lines 96103.

In respectiva aplicatie, urmaresti numarul de actiuni vandute la fiecare 10 secunde de simbolul aferent unei companii.

Deja ai subiectul definit (acesta este mapat pe un tabel dintr-o baza de date) si ai un mdel bjet StkTrnstin unde campurile obiectului sunt mapate pe coloanele unui tabel. Chiar daca subiectul este definit, trebuie sa inregistram aceasta informatie cu KSQL folosind RETE STREM:

Listing 2. Crearea unui Strem fund

SQL fr erfrming stk nlysis.png

  1. CREATE STREAM statement a numit stock_txn_stream
  2. Inregistrarea campurilor obiectului StockTransaction ca coloane
  3. Specificarea formatului datelor si subiectului Kafka care va functiona ca sursa pentru stream (ambii sunt parametri necesari)

Cu aceasta declaratie creezi o instanta a KSQL Strem pe care o poti folosi pentru interogare. In clauza WITH vei observa doi parametri necesari VLUE_FRMT care ii spune lui KSQL care este formatul datelor si parametrul KFK_TI, care ii spune lui KSQL de unde sa ia datele.

Mai sunt inca doi parametri aditionali pe care ii poti folosi cu clauza WITH atunci cand creezi un stream. Unul este TIMESTM care asociaza timestampul mesajului cu o coloana din KSQL Strem. ertiunile care necesita timestm, cum ar fi windwing, folosesc aceasta coloana pentru a procesa inregistrarile.

Celalalt parametru este KEY care asociaza cheia mesajului cu o coloana pe defined strem. In cazul nostru messge key pentru subiectul tranzactii actiuni se potriveste cu campul simbol din valoarea JSN, si nu trebuie sa specificam cheia.

Dar daca nu era asta cazul, atunci ar fi trebui sa mapam cheia pe o anumita coloana deoarece vei avea nevoie mereu de o cheie pentru a efectua operatiuni de grupare, pe care le vei vedea cand executam streamul SQL intr-un articol urmator.

In ceea ce priveste o lista a comenzilor KSQL o vei vedea pe brokerul spre care KSQL LI arata si daca subiectele sunt inregistrate sau nu.

Dupa ce ti-ai creat un nou stream poti sa vezi toate stream-urile si sa verifici daca KSQL a creat noul stream asa cum te asteptai folosind urmatoarele comenzi:

Listing 3 Listarea tuturor stream-urilor si descrierea stream-ului pe care tocmai l-ai creat

show streams;
describestock_txn_stream;

Rezultatele folosirii acestei comenzi iti ofera informatiile de mai jos:

Listing ll Strems.png


Vei observa doua coloane in plus RWTIME si RWKEY care au fost inserate de. Coloana RWTIME este timestm-ul pus pe mesaj (fie de la producator, fie de la broker), si RWKEY este cheia (daca exista) mesajului. Acum ca ai creat streamul, putem rula interogarea noastra.

Articolul original poate fi gasit aici.

Descopera calendarul nostru de cursuri.

Siddharth Garg
Software Development Engineer

Daca iti place acest articol, distribuie-l si prietenilor tai!




Mai ai intrebari?
Contacteaza-ne.
Thank you.
Your request has been received.