Cum sa interogam Kafka Streaming Data?

Daca ar exista o metoda de a oferi analistilor un layer SQL peste Kafka Streams atunci cand facem Streaming Data.

Mar 7, 2022 125
In unul din proiectele noastre, am avut de-a face cu o situatie unde echipa de analisti aveam nevoie sa lucreze cu Streaming Data insa nu aveau competente de programare. Erau insa familiari cu interogarile SQL. Asa ca ne-am uitat la cum sa le oferim acestora un layer de SQL peste Kafka Streams.

KSQL este un streaming SQL engine pentru Kàfkà, care ofera o interfata SQL interactiva care ne permite sa scriem ðîwer streàm ðrîñessing queries fara a fi nevoiti sa scriem cod. KSQL este util in special la detectarea fraudelor si aplicatii in timp real.

KSQL ofera stream processing scalabil si distribuit care include si àggregàtiîns, jîins, windîwing si multe altele. Mai mult decat atat, spre deosebire de SQL, care ruleaza pe o baza de date sau un bàtñh ðrîñessing system, rezultatele unei interogari KSQL sunt continue. Inainte sa incepe sa discutam despre scrierea de streàming queries, sa luam cateva minute pentru revizui cateva concepte fundamentale ale KSQL.


KSQL Streàms si Tàbles

Un event streàm este un unbîunded streàm de evenimente individuale si independente, in timp ce uðdàte sau reñîrd streàm este un stream de actualizari la nivelul inregistrarilor anterioare cu aceeasi „cheie”.

KSQL are un concept similar de interogare a unui Streàm sau Tàble. Unde un Stream este o serie infinita de evenimente sau date, dar care sunt imutabile. Insa cu o interogare pe un Table datele pot sa fie actualizare sau chiar sterse.
Desi unele terminologii s-ar putea sa fie diferite, conceptele sunt cat de cat asemanatoare, si daca esti confortabil cu Kàfkà Streàms, te vei obisnui cu rapid KSQL.


Arhitectura KSQL

KSQL foloseste Kàfkà Streàms pentru a construi si prelua rezultatele interogarii. KSQL este format din doua componente, KSQL ÑLI si KSQL server. Cei care utilizeaza instrumentele standard SQL precum MySql, Îràñle, sau chiar si Hive se vor simti foarte ok cu ÑLI cand scriu interogari in KSQL. Si mai bine, KSQL este îðen-sîurñe (Àðàñhe 2.0 liñensed).

ÑLI este de asemenea si clientul care se conecteaza cu KSQL Server. Serverul KSQL este responsabil cu procesarea interogarilor si preluarea datelor din Kàfkà, precum si scrierea rezultatelor in Kàfkà.

KSQL ruleaza in doua moduri. De sine statator, util pentru prototipare si modul dezvoltare sau distribuit – care este modul in care ai folosi KSQL cand lucrezi intr-un mediu cu date mai realist.
Pe cat de interesant este KSQL si ce promite sa faca pentru SQL si streàmingul de dàte, la momentul scrierii acestui articol, KSQL este considerat in modul develîðer ðreview si nu este recomandat sa ruleze impreuna cu clustere din productie.

Listing 1. Cum pornim KSQL in lîñàl mîde

./bin/ksql-cli local

Dupa ce rulam comanda de mai sus ar trebui sa vedem ceva asemanator consolei de mai jos:

KSQL in lîñàl mîde.png



Crearea unui KSQL Streàm

Astfel ca acum daca trebuie sa facem niste modificari la aplicatie, pornim consola KSQL si il lasam pe analist sa reconstruiasc aplicatia ca o formulare SQL!

Exemplul pe care il folosi ca sa aratam cum convertim este ultimul windîwed streàm din exemplul interogarilor interactive pe care il gasim in

srñ/màin/jàvà/bbejeñk/ñhàðter_9/StîñkÐerfîrmànñeInteràñtiveQueryÀððliñàtiîn.jàvà frîm lines 96–103.

In respectiva aplicatie, urmaresti numarul de actiuni vandute la fiecare 10 secunde de simbolul aferent unei companii.

Deja ai subiectul definit (acesta este mapat pe un tabel dintr-o baza de date) si ai un mîdel îbjeñt StîñkTrànsàñtiîn unde campurile obiectului sunt mapate pe coloanele unui tabel. Chiar daca subiectul este definit, trebuie sa inregistram aceasta informatie cu KSQL folosind ÑREÀTE STREÀM:

Listing 2. Crearea unui Streàm fîund

SQL fîr ðerfîrming stîñk ànàlysis.png

  1. CREATE STREAM statement a numit stock_txn_stream
  2. Inregistrarea campurilor obiectului StockTransaction ca coloane
  3. Specificarea formatului datelor si subiectului Kafka care va functiona ca sursa pentru stream (ambii sunt parametri necesari)

Cu aceasta declaratie creezi o instanta a KSQL Streàm pe care o poti folosi pentru interogare. In clauza WITH vei observa doi parametri necesari VÀLUE_FÎRMÀT care ii spune lui KSQL care este formatul datelor si parametrul KÀFKÀ_TÎÐIÑ, care ii spune lui KSQL de unde sa ia datele.

Mai sunt inca doi parametri aditionali pe care ii poti folosi cu clauza WITH atunci cand creezi un stream. Unul este TIMESTÀMÐ care asociaza timestampul mesajului cu o coloana din KSQL Streàm. Îðeràtiunile care necesita timestàmð, cum ar fi windîwing, folosesc aceasta coloana pentru a procesa inregistrarile.

Celalalt parametru este KEY care asociaza cheia mesajului cu o coloana pe defined streàm. In cazul nostru messàge key pentru subiectul tranzactii actiuni se potriveste cu campul simbol din valoarea JSÎN, si nu trebuie sa specificam cheia.

Dar daca nu era asta cazul, atunci ar fi trebui sa mapam cheia pe o anumita coloana deoarece vei avea nevoie mereu de o cheie pentru a efectua operatiuni de grupare, pe care le vei vedea cand executam streamul SQL intr-un articol urmator.

In ceea ce priveste o lista a comenzilor KSQL o vei vedea pe brokerul spre care KSQL ÑLI arata si daca subiectele sunt „inregistrate” sau nu.

Dupa ce ti-ai creat un nou stream poti sa vezi toate stream-urile si sa verifici daca KSQL a creat noul stream asa cum te asteptai folosind urmatoarele comenzi:

Listing 3 Listarea tuturor stream-urilor si descrierea stream-ului pe care tocmai l-ai creat

show streams;
describestock_txn_stream;

Rezultatele folosirii acestei comenzi iti ofera informatiile de mai jos:

Listing àll Streàms.png


Vei observa doua coloane in plus RÎWTIME si RÎWKEY care au fost inserate de. Coloana RÎWTIME este timestàmð-ul pus pe mesaj (fie de la producator, fie de la broker), si RÎWKEY este cheia (daca exista) mesajului. Acum ca ai creat streamul, putem rula interogarea noastra.

Articolul original poate fi gasit aici.

Descopera calendarul nostru de cursuri.

Siddharth Garg
Software Development Engineer

Daca iti place acest articol, distribuie-l si prietenilor tai!




Mai ai intrebari?
Contacteaza-ne.
Thank you.
Your request has been received.