Kategorie
coding

W 5 minut – Kafka

W branży tak zmiennej jak IT, szybkie przyswajanie wiedzy jest niezwykle istotne. Każdy, w mniejszym lub większym stopniu jest do tego motywowany, zarówno ze względu na „konkurencję” jak i szersze spojrzenie na problemy jakie rozwiązujemy. Jednym z tematów już nie tak nowym ale ciągle niezwykle aktualnych jest integracja pomiędzy systemami/serwisami. Jest wiele narzędzi, które pozwolą nam to osiągnąć i robić to dobrze, a jednym z takich narzędzi jest Kafka. Natomiast trzeba też przyznać, że nazwać Kafkę zaledwie brokerem wiadomości to powiedzieć mało.

Czym jest Kafka?

Przekazywanie danych czy informacji pomiędzy fragmentami systemu/serwisami jest typowym „dniem jak codzień” w świecie IT. Wokół ecosystemu javy, w szczególności w „dużych i poważnych” organizacjach, bardzo mocno zadomowił się w tej niszy JMS (Java Messaging System). Dodając do tego relacyjną bazę danych Oracle otrzymujemy lekką ręką przepis na większość systemów bankowych jakie te firmy wytworzyły i które wspierały ich biznes przez lata. Nie ma w tym naturalnie nic złego, ale wraz z postępem informatyzacji, powstaje naturalne zapotrzebowanie na bardziej skalowalne rozwiązania, większą elastyczność i naturalnie większa przepustowość danych.

Tutaj wkracza Kafka, którą można postrzegać jako zamiennik dla JMSa, ale jest to spore uproszczenie. Sami twórcy tego narzędzia nie nazywają Kafki brokerem wiadomości, a raczej bliższe jest postrzeganie go jako platforma do strumieniowego przekazywania zdarzeń (event streaming platform), a co za tym idzie, proponują nieco inny model przepływu danych. Nie będę dzielił włosa na czworo czy zdarzenie jest zasadniczo różne od wiadomości, bo i tu i tu myślimy o przekazywaniu pewnego zestawu informacji w ustalonym formacie (np. json), różnica tutaj jest nieco semantyczna. Sposób działania systemu natomiast ma dość dostrzegalne różnice. Systemy oparte o strumienie danych reprezentowanych jako zdarzenia, nie zamykają się na prostych kalkulacjach w ramach jednej wiadomości, jak to ma miejsce przy typowym modelu kolejkowym. Ewolucja systemu i jego stanu przebiega w bardziej ciągły sposób, przetwarzając wiele strumieni w tym samym momencie. Sposób tworzenia i jak i ich odczytu zdarzeń w Kafce ukrywa się za podobną abstrakcją jaka funkcjonuje w JMSie, mianowicie używany jest topic. Mając go, możemy dokonać publikacji (zapisu) lub też subskrybować (odczytywać) wszystkie zdarzenia jakie w jego ramach się pojawiają i następnie je przetwarzać zgodnie z ich naturą.

Pojedyńcze wiadomości w tradycyjnym brokerze po udanym przetwarzaniu zostają usuwane z kolejki bądź topicu. Kafka natomiast tworzy specjalny dziennik zdarzeń (commit log) w ramach jednego topicu wraz z zadanym oknem czasowym, jak długo ów zdarzenia mają być osiągalne i dostępne dla kolejnych konsumentów. Pozwala to na reagowanie i przetwarzanie zdarzeń w czasie rzeczywistym jak i na powtórne przetwarzanie zdarzeń z przeszłości.

Mnogość zastosowań tak zdefiniowanego problemu jest ogromna, a architektura oparta o mikroserwisy z pewnością do takiej kategorii należy.

Gdzie tkwi siła Kafki?

Przede wszystkim należy na początku zauważyć, że Kafka reprezentuje zupełnie inny model integracji niż rozwiązania pokroju JMS i nie opiera się na centralizacji zarządzania. W tradycyjnym brokerze, to on decyduje co i komu wysłać, podążając za podejściem push. To powoduje sporą duplikację bo wymusza utrzymywanie informacji odnośnie wysłanych wiadomości do każdego klienta, włącznie z samą wiadomością. Jeśli naszym zamiarem jest obsługa dużego ruchu, a także stworzenie namiastki high-availability, musimy liczyć się z dość nietrywialnym sprzętem i już na wstępie widać, że skalowanie może nie przebiegać zbyt gładko. Kafka wybrała inną drogę, w ramach której dane są z definicji rozproszone i co do zasady tworzone raz, niezależnie od ilości klientów, którym zostaną udostepnione.

W Kafce, zasada działania klienta subskrybującego topic polega na pobieraniu danych przy ciągłym precyzowaniu „numeru” zdarzenia (lub kilku kolejnych zdarzeń), które chce otrzymać. Jest to spore przesunięcie odpowiedzialności ze strony brokera do klienta w kierunku podejścia pull. To minimalizuje ilość informacji po stronie brokera odnośnie podłączonych klientów, a ponieważ każdy nowy klient nie wymaga dużej ilości systemowych zasobów, takich klientów możemy podłączyć i obsłużyć więcej bez znaczącego wpływu na wydajność. Przy tradycyjnym podejściu, ze wzrostem konsumentów następuje niestety powolna degradacja wydajności.

W tym miejscu warto także poruszyć bardzo istotny aspekt Kafki, strukturę topicu, który jest podzielony na fragmenty nazywane partycjami. Jest to uporządkowana sekwencja zdarzeń, do której są dopisywane kolejne zdarzenia przy publikacji, a całość tworzy wspomniany wcześniej dziennik zdarzeń (commit log). Każdemu ze zdarzeń jest przyporządkowany unikalny „numer” nazywany offsetem, który pozwala na identyfikację w ramach partycji, którą wybiera się w momencie zapisu. W tymi miejscu warto się zatrzymać na moment i przyswoić ta informację, że topic jest strukturą, która nie jest „liniowa” i nie gwarantuje kolejności zdarzeń w swojej całej objętości. Ta jest zapewniana na poziomie partycji, jeśli więc potrzebujesz kolejki, topic z jedną partycją będzie pierwszą rzeczą na jaką zwrócisz uwagę. Jest to kompromis jaki twórcy Kafki wybrali, ale dzięki niemu byli w stanie rozproszyć dane na więcej węzłów klastra i uzyskać poziome skalowanie swojego rozwiązania, a im więcej partycji tym wyższa skalowalność. Jeśli więc zdecydujemy się na migrację i użycie Kafki w aplikacji, widać dokładnie, że nie jest to zwykła zamiana i nie uciekniemy od dokładniejszej znajomości procesów, które wspiera nasz system.

Rozproszona struktura pozwala na wydajną replikację partycji, a co za tym idzie wysoką dostępność oraz automatyczną organizację klastra, w momencie gdy jeden z węzłów z jakiegoś powodu stał się niedostępny.

Klient mając kontrolę nad tym jakie zdarzenie odczytuje, zyskuje dodatkową możliwość cofania się w czasie i ponownego przetwarzania zdarzeń, jeśli zajdzie taka potrzeba bez konieczności jakichkolwiek zmian po stronie brokera. Jeśli nasz system z jakiegoś powodu niewłaściwie obsłużył serię zdarzeń, po skutecznej naprawie jesteśmy w stanie zresetować offset zdarzenia, od którego mamy ponowić przetwarzanie i tym razem, już poprawnie, je obsłużyć.

Małym choć miłym dodatkiem do powyższych jest fakt, że Kafka jest wspierana u większości dostawców chmurowych bez większego wysiłku, archaiczni choć ciągle szeroko używani brokerzy jak IBM MQ czy też Tibco JMS nie mogą się tym pochwalić.

Integracja przy użyciu Javy

Sporo już zostało powiedziane o Kafce, ale nic tak nie tłumaczy zagadnienia jak faktyczny działający kod. Poniższe przykłady bardziej dobitnie pokazują, jaki wpływ ma ilość partycji na odczytywanie topicu, ale aby móc zacząć o tym w ogóle mówić, potrzebujemy działającej instancji. Zapewni to testcontainers, który wprowadza do testów możliwość uruchamiania obrazów dockera, przez co możemy testować integrację z faktyczną implementacją przeróżnych narzędzi, w naszym wypadku, instancję Kafki.

@Testcontainers
class ScenarioTest {

    public static final String TOPIC = "topic-1";

    @Container
    private final KafkaContainer container = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    @BeforeEach
    void beforeEach() {
        assertTrue(container.isRunning());
    }
    ...
}

Mając działającego brokera, możemy napisać testy, sprawdzające kolejność odczytywanych wiadomości z topicu, który ma 3 partycje.

    ...
    @Test
    void consumerShouldFetchEventsFromMultiplePartitionsOutOfOrder() {
        // given
        String bootstrapServers = container.getBootstrapServers();
        createTopic(bootstrapServers, TOPIC, 3);
        try (EventPublisher producer = new EventPublisher(bootstrapServers)) {
            IntStream.range(1, 11)
                    .forEach(counter -> producer.send(TOPIC, "key" + (counter % 3), "msg:" + counter));
        }

        // when
        EventConsumer consumer = new EventConsumer("consumer-1", bootstrapServers);
        consumer.subscribe(TOPIC);

        // then
        await()
            .atMost(10, TimeUnit.SECONDS)
            .untilAsserted(() -> {
                List<String> messages = consumer.getAllEvents();
                assertThat(messages)
                    .containsExactlyInAnyOrder(
                        "msg:1","msg:2","msg:3","msg:4","msg:5",
                        "msg:6","msg:7","msg:8","msg:9","msg:10"
                    );
            });

        // cleanup
        consumer.close();
    }
	...

Mamy więc pewność, że otrzymaliśmy wszystkie nasze zdarzenia, ale nie zakładamy ich kolejności. I bardzo dobrze, bo gdy przyjrzymy się logom, zobaczymy jak faktycznie były odczytywane.

18:26:31.678 [pool-5-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key1 - msg:1
18:26:31.678 [pool-5-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key2 - msg:2
18:26:31.678 [pool-5-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key1 - msg:4
18:26:31.678 [pool-5-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key2 - msg:5
18:26:31.678 [pool-5-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key1 - msg:7
18:26:31.678 [pool-5-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key2 - msg:8
18:26:31.678 [pool-5-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key1 - msg:10
18:26:31.678 [pool-5-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key0 - msg:3
18:26:31.678 [pool-5-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key0 - msg:6
18:26:31.678 [pool-5-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key0 - msg:9

W ramach topicu, klient odczytuje elementy z każdej partycji w tym samym czasie, co powoduje brak kolejności w jakiej zostały opublikowane. Jeśli użyjemy tylko jednej partycji, możemy oczekiwać poprawnej kolejności w bliźniaczym teście:

    ...
    @Test
    void consumerShouldFetchEventsFromOnePartitionInOrder() {
        // given
        String bootstrapServers = container.getBootstrapServers();
        createTopic(bootstrapServers, TOPIC, 1);
        ...

        // then
        await()
            .atMost(10, TimeUnit.SECONDS)
            .untilAsserted(() -> {
                List<String> messages = consumer.getAllEvents();
                assertThat(messages)
                    .containsExactly(
                        "msg:1","msg:2","msg:3","msg:4","msg:5",
                        "msg:6","msg:7","msg:8","msg:9","msg:10"
                    );
            });

        // cleanup
        consumer.close();
    }

Idąc dalej, nie musimy się ograniczać tylko do jednego klienta odczytującego topic, możemy tworzyć grupy zawierające większą liczbę klientów, które jako całość będą pobierać dane z podzielonego na partycje topicu.

    ...
    @Test
    void multipleConsumerShouldFetchEventsFromMultiplePartitions() {
        // given
        String bootstrapServers = container.getBootstrapServers();
        createTopic(bootstrapServers, TOPIC, 3);
        try (EventPublisher producer = new EventPublisher(bootstrapServers)) {
            IntStream.range(1, 11)
                    .forEach(counter -> producer.send(TOPIC, "key" + (counter % 3), "msg:" + counter));
        }

        // when
        List<EventConsumer> consumers = IntStream.range(1, 3)
                .mapToObj(operand -> {
                    EventConsumer consumer = new EventConsumer("consumer-" + operand, bootstrapServers);
                    consumer.subscribe(TOPIC);
                    return consumer;
                })
                .collect(toList());

        // then
        await()
            .atMost(10, TimeUnit.SECONDS)
            .untilAsserted(() -> {
                List<String> messages = consumers.stream()
                        .map(EventConsumer::getMessages)
                        .flatMap(Collection::stream)
                        .collect(toList());
                assertThat(messages)
                    .containsExactlyInAnyOrder(
                        "msg:1","msg:2","msg:3","msg:4","msg:5",
                        "msg:6","msg:7","msg:8","msg:9","msg:10"
                    );
            });

        // cleanup
        consumers.forEach(EventConsumer::close);
    }
    ...

Znacząco przyspieszając przetwarzanie dostępnych zdarzeń, oczywiście jeśli ich kolejność nie jest tak istotna.

18:33:06.309 [pool-2-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-2 < topic-1|key1 - msg:1
18:33:06.309 [pool-1-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key0 - msg:3
18:33:06.309 [pool-2-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-2 < topic-1|key2 - msg:2
18:33:06.309 [pool-1-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key0 - msg:6
18:33:06.309 [pool-2-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-2 < topic-1|key1 - msg:4
18:33:06.309 [pool-1-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-1 < topic-1|key0 - msg:9
18:33:06.309 [pool-2-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-2 < topic-1|key2 - msg:5
18:33:06.309 [pool-2-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-2 < topic-1|key1 - msg:7
18:33:06.309 [pool-2-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-2 < topic-1|key2 - msg:8
18:33:06.309 [pool-2-thread-1] INFO pl.niezawodnykod.in5mins.kafka.EventConsumer - < consumer-2 < topic-1|key1 - msg:10
Najczęstsze pytania na rekrutacji

Jak z każdą nową technologią, z która chcesz pracować, dobrze jest ją poznać poprzez własne miniprojekty, w ramach których odsłaniają się kolejne tajniki. Nie oszukujmy sie jednak, że to zastąpi doświadczenie zdobyte podczas pracy nad produkcyjnym systemem używającym Kafki. Niemniej, nie dając się złapać na typowe pytania, które padają na rozmowach rekrutacyjnych, zwiekszamy swoje szanse pokazując, że rozumiemy koncepty jakie kierują daną technologią. Natomiast kruczki z dokumentacji typu „Jak często log compactor dokonuje próby czyszczenia commit loga?” zostawiam tam gdzie ich miejsce, nadal w dokumentacji. Tutaj parę zagadnień, które również warto zgłębić, aby lepiej zrozumieć działanie Kafki.

  • Czym jest koncept leader and follower użyty w Kafce [link].
  • Jaką rolę w Kafce pełni ZooKeeper? [link1][link2]
  • Czym jest consumer group i jak działa w momencie, gdy mamy więcej niż jednego klienta? [link]
  • Jakie znaczenie dla wydajności ma offset? [link]
Słowo na koniec

Kafka jako narzędzie jest niezwykle elastyczna i oferuje ogromne możliwości dla systemów, które zdecydują się o nią oprzeć swoją komunikację. Ten jakże krótki wpis, nawet nie próbuje dać złudzenia, że całe zagadnienie jest prostym plug-and-play, bo jak z każdym narzędziem, także i to ma swój narzut (jeśli korzystasz z typowych chmurowych providerów, jak Confluent Cloud czy też Aiven, ten może być nieco zminimalizowany). Niemniej warto rozważyć swoje aktualne wymagania, a także przyszłe ambicje systemu, bo być może Kafka okaże się drogą godną uwagi, a przy tym zapewniającą odpowiednią skalowalność wraz ze wzrostem ilości danych. Miejmy też na uwadze, że Kafka to nie rozwiązanie na każdą bolączkę integracji systemu i tradycyjny broker nadal ma swoje miejsce w IT, tylko może kierujmy się do lżejszych rozwiązań jak ArtemisMQ, które mocno wspierają testowanie aplikacji.

Cały projekt, który posłużył jako baza do tego wpisu możesz znaleźć tutaj: https://bitbucket.org/PawelRebisz/in5mins-kafka/src/master/ i uruchomić własnoręcznie. Projekt korzysta z testcontainers, także zadbaj o uprzednią instalację, nim uruchomisz przykłady.

W odpowiedzi na “W 5 minut – Kafka”

Cześć! Dzięki za sprawne wyjaśnienie jak rozpocząć swoją przygodę z Kafką. Sam nie mam doświadczenia komercyjnego z tym narzędziem (taka specyfika zespołu, w którym pracuję), ale nie ukrywam, że spróbowałbym swoich sił w implementacji takiego brokera wiadomości. Nie wygląda to skomplikowanie, ale pewnie tak jak mówisz, to dopiero czubek góry lodowej!

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *