Dzisiejszy wpis poświęcę jednemu z moich ulubionych tematów programistycznych, szybkości przetwarzania. Jest to aspekt programowania, który przez ostatnie lata nieco się zmienił. Dużo większy nacisk kładzie się na poprawne zamodelowanie problemu kosztem szybkości przetwarzania i w wielu miejscach ma to sens. Będąc w pracy, poświęcamy coraz więcej czasu na tworzenie kolejnych mikroserwisów i w momencie problemów z wydajnością, skalujemy się poziomo. Używając chmury, dokładamy kolejne serwery wraz z kolejnymi instancjami serwisu w nadziei, że rozwiąże to nasz problem. Ani moce procesora, ani pamięć na maszynie nie ograniczają nas, ponieważ w najgorszym wypadku zawsze dołożymy kolejną. Jest to z pewnością zalecane podejście, mające wiele dobrych skutków ubocznych, np. odporność na śmierć pojedyńczej maszyny. Jeśli tylko nasze zadanie jest podzielne i nie musi być wykonane w obrębie jednej instancji, to powinna być nasza droga.
Z chwilą, gdy zadanie musi zostać wykonane w całości lokalnie, możemy tę dedykowaną maszynę dopieścić i skalować pionowo, tworząc poważną machinę z wyśrubowaną ilością pamięci i rdzeni. Niestety tutaj możemy się spotkać z fizycznymi ograniczeniami, więc tę operację możemy przeprowadzać tylko do pewnego momentu. Przykładem takiego lokalnego problemu, może być scentralizowane źródło prawdy, np. zbiór zleceń na giełdzie (określany najczęściej jako orderbook). Na ogół charakterystyka tego problemu ma pewne cechy:
- musi być lokalny, utrzymanie spójności bliźniaczych zbiorów w trybie read/write daje ogromny narzut związany z synchronizacją,
- musi być deterministyczny i poprawnie porządkować przetwarzanie kolejnych instrukcji, w nielokalnym ustawieniu, czekanie i sprawdzanie kolejności generuje ogromne straty, wpływając negatywnie na ostatnią, najistotniejszą cechę, mianowicie,
- musi być szybki, jeśli ten zbiór jest jedynym źródłem prawdy, musi być w stanie obsłużyć wszystkie interakcje najszybciej jak to możliwe, wszelkie przestoje i opóźnienia będą propagowane przenikając cały system
W tym miejscu, nie będę poruszał tematu różnorakich podręcznych i rozproszonych cache, bo choć rozwiązują problem centralizacji danych, to co do zasady, prędkość nie jest ich głównym i nadrzędnym celem. Mogą być szybkie, ale odbywa się to kosztem ewentualnej spójności. Jeśli są spójne, nie mogą być zarazem szybkie, ponieważ zapis prawdopodobnie jest obciążony mechanizmem odporności na utratę danych, np. dodatkową replikacją przy zapisie. Co nam zatem pozostaje?
Stare dobre wątki
W przypadku takich problemów, najczęściej przychodzą do głowy wszelkie śmiałe pomysły, wliczając kolejne narzędzie z programistycznej skrzynki – wątki. W Javie jest to prosta klasa, choć daleko jej od bycią zwykłą. Osoby mniej zaznajomione z tematem wątków, zachęcam do sięgnięcia po dokumentację Oracle, która jest bardzo dobrym źródłem informacji. Cytując B. Parkera:
With great power comes great responsibility.
Ich użycie niesie bardzo wiele korzyści, z czego nadrzędnym jest możliwość przetwarzania współbieżnego, mówiąc potocznie „możemy przetwarzać parę rzeczy naraz”. Mając do wykonania parę zadań w ramach naszej aplikacji, możemy odpowiednio je podzielić i przypisać poszczególnym wątkom, osiągając w każdym z nich pewien postęp. Wątki zabiegają o dostęp do zasobów danej maszyny, a w szczególności o dostęp do procesora. Im częściej się to udaje, tym większy postęp zachodzi w ramach jednego wątku. Przy coraz bardziej zaawansowanych architekturach wieloprocesorowych zbliżamy się wręcz do przetwarzania równoległego, choć nadal mogą występować inne ograniczenia, np. współdzielona pamięć.
Ponieważ „darmowe obiady” nie występują przy tworzeniu oprogramowania, naturalnie jest też druga strona medalu. Systemy tworzone w oparciu o wielowątkową architekturę są dużo bardziej skomplikowane i ciągną za sobą problemy, czasami nietrywialne, jak zakleszczenia (deadlock) czy też zagłodzenia wątku.
Nakreślenie problemu
Rozpatrzmy zatem pewien wyimaginowany i trywialny na pozór problem. Nasz system generuje pewne zdarzenia, przekazując do dalszego przetwarzania wartości long opakowane w prostą klasę:
public class DataTransferEvent { @Getter @Setter private long value; }
System jest głównym i jedynym źródłem tych zdarzeń, zamknięty w obrębie interfejsu Runnable
, a nam zależy na możliwie najszybszym ich odczytywaniu. Sensownym więc wydaje się użycie wielu wątków. Zdarzenia przekazywane są do medium, które pozwala na ich bezpieczne przekazywanie do wielu wątków:
public class DefaultEventProducer implements Runnable { ... public void run() { while (true) { transport.send(random.nextLong()); numberOfMessagesSent.set(numberOfMessagesSent.incrementAndGet()); } } }
Wspomniane medium to implementacja prostego interfejsu BetweenProcessTransport
, opierająca się na kolejce ArrayBlockingQueue
. Jest to jedna z szybszych opcji w ramach jdk do tego typu zadań. Używając implementacji BlockingQueue
, dbamy, aby każdy wątek na równych prawach mógł pobierać zdarzenia do przetwarzania, a także w prosty sposób synchronizować to pobieranie pomiędzy wątkami.
public class BlockingQueueBasedTransport implements BetweenProcessTransport { private final ArrayBlockingQueue<DataTransferEvent> queue; ... @Override public void send(long message) { DataTransferEvent valueEvent = new DataTransferEvent(); valueEvent.setValue(message); try { queue.put(valueEvent); } catch (InterruptedException exception) { throw new RuntimeException("Something went wrong here", exception); } } ... }
Nasze nowo utworzone wątki, mając dostęp do powyższego medium, pobierają kolejne zdarzenia i przetwarzają je używając DefaultEventConsumer
.
public class BlockingQueueBasedTransport implements BetweenProcessTransport { private final DefaultEventConsumer[] handlers; ... @Override public void start() { ExecutorService executorService = Executors.newFixedThreadPool(handlers.length); Arrays.asList(handlers) .forEach(handler -> executorService.execute(eventFetchingWrapperAround(handler))); } ... private Runnable eventFetchingWrapperAround(DefaultEventConsumer handler) { return () -> { while (true) { try { DataTransferEvent take = queue.poll(100L, TimeUnit.MILLISECONDS); if (!isNull(take)) { handler.process(take); } } catch (InterruptedException exception) { throw new RuntimeException("Something went wrong here", exception); } } }; } }
Tak poukładany system pozwala na współbieżne przetwarzanie i można wręcz odnieść wrażenie, że im więcej wątków będziemy używać, tym lepiej.
Warto także zaznaczyć, że wszelkie poniższe testy są wykonywane na moim prywatnym laptopie z relatywnie skromnym procesorem Intel Core i7-4720HQ z architekturą Haswell, a nie na serwerowych potworach ze stajni Xeon.
Sprawdźmy, jak nasz przykładowy system radzi sobie z 3 wątkami po stronie odbiorcy. Do monitorowania i analizy używam Grafany ze specjalnym dashboardem zdefiniowanym tutaj. Dzięki temu, możemy obserwować, ile zbiorczo system wysłał zdarzeń, ile każdy z wątków przeprocesował zdarzeń samodzielnie, a także ile zdarzeń przeprocesowano w sumie.
Powyższy wykres daje nam pierwsze bazowe zrozumienie jak system się zachowuje. Możemy mentalnie odnotować, że jesteśmy w stanie przetworzyć średnio po 1,1M zdarzeń na wątek. Jednym wychodzi to lepiej, innym gorzej, ale jako całość, 3,3M zdarzeń udaje się przetworzyć. Wygląda zachęcająco, ale być może, jesteśmy w stanie wycisnąć z naszego systemu nieco więcej? Może udałoby się obsłużyć co najmniej 5M zdarzeń? Rozwiązanie jest w miarę dobrze zdekomponowane, więc dodając kolejne 2 wątki, nasze oczekiwania powinny być spełnione.
Patrząc na powyższy wykres, możemy być lekko zaskoczeni, że wbrew obiecywanych 5M obsłużonych zdarzeń, mamy 2,4M. Każdy z wątków zdołał obsłużyć niecałe 0,5M zdarzeń. Dobrze, że monitorujemy zachowanie naszego systemu i jesteśmy w stanie dokładnie zauważyć, jak wprowadzona zmiana w ustawieniach bezpośrednio wpływa na cały system. W tym wypadku negatywnie, więc ten kierunek warto szybko porzucić.
Skoro zwiększenie ilości wątków mocno zdegradowało osiągi systemu, być może odwrotny kierunek pozwoli na uzyskanie nieco lepszych wyników. Co może się stać, gdy użyjemy po prostu jednego wątku?
Na ogół niespodziewamy się dwóch rzeczy, hiszpańskiej inkwizycji oraz tego, że jeden wątek poradzi sobie z zadaniem lepiej niż trzy (lub pięć). Dzięki zastosowaniu jednego wątku, uzyskaliśmy najlepszy wynik, jaki nasz system może zaoferować przy aktualnej konstrukcji. Wynik jest niezły, nawet zbliżyliśmy się do wcześniej obranego celu, ale pozostaje pytanie…
Mniej znaczy więcej?
Nie jest niespodzianką, że przy rozwiązywaniu problemów, gdzie zamierzamy użyć wielu wątków, otwieramy się na nową gamę czynników, które musimy brać pod uwagę. Nie ma darmowych obiadów. W naszym wypadku, ogromne znaczenie ma synchronizacja między wątkami i fakt, że każdy z nich musi konkurować o dostęp do kolejki, z której pobierane są kolejne zdarzenia. Im więcej wątków, tym więcej walki o ten dostęp. W takim razie, eliminując przepychanki przy pobieraniu zdarzenia, zyskujemy na szybkości przetwarzania. Także ogromne znaczenie ma tutaj fakt, że samo przetworzenie zdarzenia jest na tyle banalne, że nie zajmuje dużo czasu. W związku z tym, każdy jeden wątek będzie spędzał relatywnie dużo czasu na operacji pobieraniu kolejnych zdarzeń. Warto więc zwracać uwagę, jak dużo pracy chcemy wykonać w obrębie jednego wątku, czasami nie opłaci się skórka za wyprawkę.
A może niesekretny sos?
Problemy z jakimi się często mierzymy, najczęściej już zostały rozwiązane przez kogoś innego. Poprawa szybkości przetwarzania nie jest tutaj wyjątkiem, podobnie jak powyżej przytoczony problem producenta i konsumenta, na którym opieraliśmy nasze rozważania. Parę lat temu, pewna firma chcąc spełnić swoje ambitne plany, ulubowała się w powyższym schemacie, czego efektem było powstanie disruptora. Biblioteka ta oferuje niesamowitą szybkość w przekazywaniu i przetwarzaniu danych w ramach jednego wątku. Sprawdzona w boju, stała się popularnym narzędziem wspierającym programistów w tworzeniu piekielnie szybkich systemów.
Skoro tak, być może zastąpienie medium z prostą kolejką nową implementacją, spowoduje poprawę naszych wyników?
Klasa DisruptorBasedTransport
korzysta z głównych mechanizmów oferowanych przez bibliotekę, czyli: Disruptor
oraz RingBuffer
. RingBuffer
to nieblokująca struktura zastępująca kolejkę, natomiast Disruptor
oprócz udzielenia nazwy dla całej biblioteki, umożliwia podpięcie przetwarzania.
public class DisruptorBasedTransport implements BetweenProcessTransport { ... @Override public void start() { disruptor.handleEventsWith(handlers); ringBuffer = disruptor.start(); } @Override public void send(long message) { long sequence = ringBuffer.next(); try { DataTransferEvent valueEvent = ringBuffer.get(sequence); valueEvent.setValue(message); } finally { ringBuffer.publish(sequence); } } }
To natomiast prowadzi do bardzo ciekawych wyników.
W powyższym rozwiązaniu używamy jednego wątku do przetwarzania, więc nie występują żadne przepychanki przy pobieraniu zdarzenia. Do pewnego stopnia, podobny charakter działania miała użyta kolejka w pierwszej wersji. O czym jednak nie wspominałem, to fakt, że wątki pobierające zdarzenia, konkurują o dostęp do kolejki, nie tylko między sobą. Część aplikacji (będącą osobnym wątkiem), która zapisuje zdarzenie do kolejki, również wymaga pewnego dostępu i synchronizacja między wątkami tutaj jest nie do uniknięcia. W nowej implementacji, użycie disruptora minimalizuje wpływ tego czynnika praktycznie do zera. Ten, i parę innych niuansów wewnątrz biblioteki, pozwala nam uzyskać blisko 17M obsłużonych zdarzeń, czyli ponad trzy razy więcej niż przy użyciu zwykłej kolejki. Wniosek, czego użyć, wydaje się oczywisty.
Słowo na koniec
Powyższe rozważania nie mają na celu pokazania rozwiązania będącego panaceum na absolutnie wszystkie problemy związane z szybkością przetwarzania, bo używanie wielu wątków czasami faktycznie się opłaca. Disruptor, mimo swoich niezaprzeczalnych zalet, ma swoje zastosowanie tylko w niektórych przypadkach, a w miejscach, gdzie faktycznie zostaje użyty, wymusza pewne konwencje i architekturę systemu. Jest to konieczne, aby móc wykorzystać w pełni drzemiące w nim możliwości, co jednak nie zawsze jest nam na rękę lub wręcz jest niemożliwe.
Trzeba także pamiętać, że powyższe przykłady są jednak swoistą piaskownicą. Niektóre rozwiązania lubią poddawać się zabiegom, które je znacząco przyspieszają, podczas gdy inne wręcz przeciwnie, będą bardzo szybko degradować szybkość przetwarzania i negatywnie wpływać na cały system. Charakter naszego problemu określa metody, jakich możemy użyć. Tym bardziej widoczne staje się, jak bardzo istotne jest zbieranie statystyk dotyczących pracy systemu i ich późniejsza analiza. Tylko w ten sposób, bez cienia wątpliwości, wiemy, że nasze zmiany faktycznie stoją za poprawą szybkości naszego systemu.
Cały projekt, który posłużył jako baza do tego wpisu możesz znaleźć tutaj: https://bitbucket.org/PawelRebisz/lmax-disruptor-usage/src/master/, więc jeśli chcesz samemu sprawdzić jakie pomiary uzyskasz na swoim komputerze, wszystko znajduje się w tym repozytorium. Znajdziesz tam zarówno skonfigurowaną Grafanę opakowaną w docker-compose jak i konfigurowalny generator ruchu.
2 odpowiedzi na “W poszukiwaniu szybkości”
Bardzo fajny artykuł, chyba najciekawszy do tej pory. Mimo, że ma całkiem całkiem konkurencję 😉
A co gdyby zrobić takiego SuperDisruptora ™, który sklejałby Disruptory i wykorzystując splątanie kwantowe w pokojowy sposób zapisuje/pobiera zdarzenia 🤔 Aż mnie ciarki przeszły
+ za fajne linki do zewnętrznych źródeł.
Dzięki wielkie za pochlebną opinię 🙂 Co do samego disruptora, to systemy tworzone w oparciu właśnie o niego mają tendencję do łączenia kolejnych ring bufferów ze sobą, więc pewne splątanie osiągniesz. Jeśli chcesz być naprawdę „edgy” to tworzysz je javassist’em w runtime poprzez anotacje 😉