[Python] Easy multithreading
Heute möchte ich euch mal einen Weg zeigen, wie es relativ einfach möglich ist, Multithreading in Python zu realisieren.
Wie immer, hat Python da schon nützliche Module parat, die nur darauf warten genutzt zu werden. :)
Für diejenigen, die nicht genau wissen was ein Thread bzw. Multithreading ist:
Crashkurs Multithreading
Wird ein Programm gestartet, wird dementsprechend ein "Prozess" gestartet, welcher vom "Prozess-Scheduler" des Betriebssystems verwaltet wird und die Ablaufumgebung des Programms als Binärcode darstellt.
Der Prozess-Scheduler wechselt zwischen allen aktiven Prozessen (also allen laufenden Programmen) hin und her, wodurch der Eindruck entsteht, dass mehrere Programme gleichzeitig laufen. Dem ist jedoch nicht so, denn zu jedem Zeitpunkt, wird immer nur ein Prozess verarbeitet.
Bei einem Programm wird i.d.R. immer nur ein Befehl nach dem anderen ausgeführt, das heißt, dass der nächste Befehl solange bis zu seiner Verarbeitung warten muss , bis der aktuelle Befehl verarbeitet wurde.
Bei einem Programm wird i.d.R. immer nur ein Befehl nach dem anderen ausgeführt, das heißt, dass der nächste Befehl solange bis zu seiner Verarbeitung warten muss , bis der aktuelle Befehl verarbeitet wurde.
Ein "Thread" jedoch, ist eine nebenläufige Ausführungseinheit innerhalb eines Prozesses. Jeder Prozess hat mindestens einen Thread der den Programmcode ausführt und kann aber bis zu hundert oder mehr Threads auf einmal haben.
Threads ermöglichen es also, gleichzeitig mehrere Befehle auszuführen. Dies funktioniert so, dass ein Thread erstellt wird und ihm dann eine Aufgabe zugewiesen wird, welche dann asynchron neben dem Hauptthread verarbeitet wird.
Bitte mit Klasse
Um alles organisiert zu halten, schreiben wir uns dafür am besten eine Klasse.
Lasst uns also mal überlegen, was diese können soll:
- Threads erstellen
- Aufgaben den Threads zuweisen
- Threads ausführen
- Rückgabewerte der Aufgaben entgegennehmen
- Maximale Anzahl an Threads überwachen bzw. begrenzen
- Threads schließen
Threads erstellen
Threads werden in einem Pool an Threads erstellt, welcher sicherstellt, dass die vorher definierte Anzahl an Threads nicht überschritten wird. Darüber stellen wir sicher, dass nicht blind versucht wird, mehrere hundert Threads zu erstellen, was definitiv zum Absturz des Programms führen würde.
Aufgaben den Threads zuweisen
Hier müssen wir uns schon ein bisschen mehr Gedanken machen wie wir das am besten realisieren. Eine Aufgabe wird über eine Funktion definiert, welche wir dann an den Konstruktor der Klasse übergeben werden. Da wir es ebenfalls ermöglichen möchten, dass eine solche Funktion nicht an eine bestimmte Anzahl an Parametern begrenzt ist, werden wir zusätzlich an den Konstruktor ein Tupel mit Argumenten übergeben. Dieser Tupel wiederum, wird dann beim Aufruf der Aufgabenfunktion als Argument übergeben. So ist sichergestellt, dass jede beliebige Funktion mit beliebig vielen Parametern als Aufgabenfunktion genutzt werden kann.
Threads ausführen
Wie oben schon erwähnt, wird die dem Konstruktor übergebene Funktion mit dem ebenfalls übergebenen Argumententupel aufgerufen und an einen Thread übergeben.
Rückgabewerte der Aufgaben entgegen nehmen
Wenn in der Aufgabenfunktion ein Rückgabewert definiert wurde, soll dieser natürlich von der Klasse entgegen genommen bzw. weitergereicht werden.
Maximale Anzahl an Threads überwachen bzw. begrenzen
Darüber wurde eigentlich schon alles in "Threads erstellen" gesagt.
Threads schließen
Dies geht mit der Begrenzung der Anzahl an Threads einher. Ist ein Thread fertig mit seiner Aufgabe, wird er geschlossen um so Platz für einen neuen Thread und seine Aufgabe zu machen.
Wurden alle Aufgaben abgearbeitet, kann der Pool geschlossen werden.
Die Module
Wie ich bereits erwähnt hatte, bietet Python schon Module, die uns den Umgang mit Threads vereinfachen, binden wir es also ein:
1 | from multiprocessing.pool import ThreadPool |
Dieses Modul stellt uns "ThreadPool" zur Verfügung.
Das ist auch schon alles was wir an Modulen benötigen.
Das Grundgerüst
Erstellen wir uns also ein Grundgerüst der Klasse:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | class MultiThread(): @classmethod def begin(cls): pass @classmethod def end(cls): pass def __init__(self): pass def run(self): pass |
Nicht grad viel, aber mehr brauchen wir auch nicht :)
Gehen wir das ganze mal durch:
@classmethod def begin(cls): passDiese Methode wird den Threadpool initialisieren und die maximale Anzahl an Threads definieren.
Da diese Informationen nicht an einen einzelnen Thread gebunden sind, machen wir eine Klassenmethode daraus
@classmethod def end(cls): passHier werden wir den ThreadPool schließen und auf den Abschluss aller Threads warten.
Wie bei der Methode "begin", ist diese Funktionalität ebenfalls nicht an einen Thread gebunden, daher wird auch diese Methode eine Klassenmethode
def __init__(self): passDer Konstruktor. Diesem werden wir einen Handle der Aufgabenfunktion und die Argumente als Tupel übergeben.
def run(self): passUnd schließlich die Funktion, welche den Thread startet und ihm die Aufgabenfunktion und die dazugehörigen Argumente übergibt. Zugleich wird diese die Rückgabewerte entgegennehmen und diese ebenfalls zurückgeben.
Zusätzlich brauchen wir noch eine Klassenvariable, welche den Threadpool speichert:
1 | __thread_pool = None |
Nun können wir uns an das Ausprogrammieren der einzelnen Methoden machen:
Die begin Methode
In der Methode "begin" erstellen wir einen Threadpool, in dem alle aktiven Threads hinterlegt werden.
1 2 3 | @classmethod def begin(cls, max_threads): MultiThread.__thread_pool = ThreadPool(max_threads) |
MultiThread.__thread_pool = ThreadPool(max_threads)Hier rufen wir die Klassenvariable "__thread_pool" auf, und übergeben ihr den mit "ThreadPool" erstellten Threadpool. Gleichzeitig übergeben wir dieser Funktion die Anzahl der maximal möglichen Threads.
Die end Methode
In der Methode "end" sorgen wir dafür, dass nach Vergabe aller Aufgaben, der Threadpool geschlossen wird und auf das erledigen aller Aufgaben gewartet wird.
1 2 3 4 | @classmethod def end(cls): MultiThread.__thread_pool.close() MultiThread.__thread_pool.join() |
Der Konstruktor
In der "__init__" Methode, oder auch oft Konstruktor genannt, wird ein Objekt unserer Klasse "Multithread" erstellt.
1 2 3 | def __init__(self, target=None, args:tuple=()): self.__target = target self.__args = args |
Die run Methode
Die "run" Methode erstellt einen neuen Thread, übergibt diesem die im Konstruktor übergebene Aufgabenfunktion und die dazugehörigen Argumente. Anschließend gibt sie den Rückgabewert der Aufgabenfunktion zurück.
1 2 3 4 5 6 | def run(self): try: result = MultiThread.__thread_pool.apply_async(self.__target, args=self.__args) return result.get() except: pass |
Der komplette Code sieht dann wie folgt aus:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | from multiprocessing.pool import ThreadPool class MultiThread(): __thread_pool = None @classmethod def begin(cls, max_threads): MultiThread.__thread_pool = ThreadPool(max_threads) @classmethod def end(cls): MultiThread.__thread_pool.close() MultiThread.__thread_pool.join() def __init__(self, target=None, args:tuple=()): self.__target = target self.__args = args def run(self): try: result = MultiThread.__thread_pool.apply_async(self.__target, args=self.__args) return result.get() except: pass |
Damit wären wir auch schon fertig mit unserer Klasse.
Der Test
Um sicherzustellen, dass auch alles funktioniert, schreiben wir uns einen kleinen Test.
Zuerst brauchen wir also eine Aufgabenfunktion, welche als Thread aufgerufen werden kann:
1 2 3 4 | def call_me(test1, test2): print(test1 + test2) if test2 == "": return False return True |
Nun können wir einen kleinen Code schreiben, der die Funktion als Thread mithilfe unserer Klasse "Multithread" erstellt .
1 2 3 4 5 | MultiThread.begin(50) for i in range(0, 100): t = MultiThread(target=call_me, args=("This is string 1 ", "This is string 2")) t.run() MultiThread.end() |
Gehen wir das mal durch:
MultiThread.begin(50)Hier starten wir das Multithreading und legen fest, dass maximal 50 Threads auf einmal laufen können.
for i in range(0, 100):Eine "for" Schleife welche 100 mal durchlaufen werden soll
t = MultiThread(target=call_me, args=("This is string 1 ", "This is string 2"))Hier erstellen wir ein neues Objekt unserer Klasse "MultiThread" und übergeben dem intern aufgerufenen Konstruktor (__init__) unsere Testfunktion "call_me" mit 2 Strings als Argumente.
t.run()Anschließend geben wir den Startschuss den Thread zu erstellen und laufen zu lassen.
MultiThread.end()Wurden alle 100 Threads erstellt, kann der Pool geschlossen werden und auf das beenden aller Threads gewartet werden.
Der Output sieh wie folgt aus:
... This is string 1 This is string 2 This is string 1 This is string 2 This is string 1 This is string 2 This is string 1 This is string 2 This is string 1 This is string 2 This is string 1 This is string 2 This is string 1 This is string 2 ...
Sieht so aus, als wenn der Aufruf der Funktion klappt, lasst uns nun das Zurückgeben des Rückgabewertes testen.
Dazu ändern wir den Aufruf der "run" Methode ein klein wenig ab:
1 2 3 4 5 6 7 8 | MultiThread.begin(50) for i in range(0, 99): t = MultiThread(target=call_me, args=("This is string 1 ", "This is string 2")) result = t.run() if result == False print("Success " + str(i)) break MultiThread.end() |
result = t.run()Hier speichern wir den Rückgabewert der Methode "run" in der Variable "result"
if result == FalseUnd überprüfen ihren Wert
print("Success " + str(i)) breakIst dieser Wert "False", dann geben wir "Success" und den Wert der Iteratorvariable "i" aus, um überprüfen zu können, in welcher Iterationstiefe die Schleife beendet wurde. Anschließend beenden wir die Schleife sofort über den Befehl "break".
Würden wir diesen Code jetzt ausführen, würde die if-Bedingung gleich beim ersten Schleifendurchlauf erfüllt sein, denn unsere "call_me" Funktion gibt dann "True" zurück, wenn der zweite Parameter kein leerer String ist. Und momentan übergeben wir als zweiten Parameter "This is string 2.
Also lasst uns das durch eine zweite Bedingung ändern:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | MultiThread.begin(50) for i in range(0, 100): t = None if i == 50: t = MultiThread(target=call_me, args=("This is string 1 ", "")) else: t = MultiThread(target=call_me, args=("This is string 1 ", "This is string 2")) result = t.run() if result == False: print("Success " + str(i)) break MultiThread.end() |
t = NoneDa "t" nun in einem "if"-Block definiert wird, müssen wir "t" auch außerhalb dieses Blocks bekannt machen
if i == 50:Hier überprüfen wir, ob der 50ste Schleifendurchlauf erreicht wurde
t = MultiThread(target=call_me, args=("This is string 1 ", ""))Wenn ja, dann übergebe als zweiten Parameter einen leeren String
else: t = MultiThread(target=call_me, args=("This is string 1 ", "This is string 2"))Andernfalls übergebe wie gehabt den String "This is string 2"
result = t.run()Speichere den Rückgabewert in der Variable "result"
if result == False: print("Success " + str(i)) breakUnd überprüfe den Rückgabewert
Führen wir das nun aus, bekommen wir folgenden Output:
... ... This is string 1 This is string 2 This is string 1 This is string 2 This is string 1 This is string 2 This is string 1 This is string 2 This is string 1 This is string 2 This is string 1 This is string 2 This is string 1 Success 50
Hier können wir schön erkennen, dass das Zurückgeben des Rückgabewertes funktioniert und dementsprechend die Schleife schon nach dem 50sten Durchlauf abgebrochen wurde.
Wo sind all' die Threads?
Bisher hat alles schön funktioniert was wir getestet haben.
Was wir anhand dieser Tests aber nicht sehen konnten, ist, wie viele Threads nun tatsächlich erstellt wurden.
Also lasst uns das ebenfalls noch testen - sicher ist sicher :)
Um herauszufinden, wie viele Threads im aktuellen Prozess so nebenher laufen, können wir das Modul "threading" verwenden.
Lasst es uns zu Testzwecken importieren:
Dieses Modul stellt uns unter anderem die Funktion "active_count()" bereit, welche uns die Anzahl aller laufenden Threads in diesem Prozess zurückgibt. Eingenommen des Hauptthreads und der von ThreadPool verwendeten Threads.
Ändern wir also unseren Testcode so ab, dass uns die Anzahl an Threads ausgegeben wird:
Nun müssen 10 mal 10 Threads erstellt werden, um die 100 Schleifendurchläufe zu beenden. Vorher waren es nur 2 mal 50 für die 100 Schleifendurchläufe.
Und der Output sieht folgendermaßen aus:
Hier können wir erkennen, dass maximal 14 Threads erstellt wurden.1 | import threading |
Dieses Modul stellt uns unter anderem die Funktion "active_count()" bereit, welche uns die Anzahl aller laufenden Threads in diesem Prozess zurückgibt. Eingenommen des Hauptthreads und der von ThreadPool verwendeten Threads.
Ändern wir also unseren Testcode so ab, dass uns die Anzahl an Threads ausgegeben wird:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | MultiThread.begin(10) for i in range(0, 100): t = None if i == 50: t = MultiThread(target=call_me, args=("This is string 1 ", "")) else: t = MultiThread(target=call_me, args=("Active Threads: ", str(threading.active_count()))) result = t.run() if result == False: print("Success " + str(i)) break MultiThread.end() |
t = MultiThread(target=call_me, args=("Active Threads: ", str(threading.active_count())))In dieser Zeile ändern wir einfach nur die beiden Übergabeparameter zu "Active Threads: " und "str(threading.active_count())))"
MultiThread.begin(10)Hier reduzieren wir die Anzahl der Threads von 50 auf 10, um den Unterschied deutlicher sehen zu können.
Nun müssen 10 mal 10 Threads erstellt werden, um die 100 Schleifendurchläufe zu beenden. Vorher waren es nur 2 mal 50 für die 100 Schleifendurchläufe.
Und der Output sieht folgendermaßen aus:
... ... Active Threads: 14 Active Threads: 14 Active Threads: 14 Active Threads: 14 Active Threads: 14 Active Threads: 14 Active Threads: 14 This is string 1 Success 50
Warum 14 und nicht 10?
Nun ja, der Hauptthread ist auch ein Thread und die Funktion "ThreadPool" aus dem Modul "multiprocessing.pool" verwendet intern auch noch 3 Threads.
Also sehen wir, dass nun alles funktioniert.
Zum Abschluss nochmal der ganze Code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | import threading from multiprocessing.pool import ThreadPool class MultiThread(): __thread_pool = None @classmethod def begin(cls, max_threads): MultiThread.__thread_pool = ThreadPool(max_threads) @classmethod def end(cls): MultiThread.__thread_pool.close() MultiThread.__thread_pool.join() def __init__(self, target=None, args:tuple=()): self.__target = target self.__args = args def run(self): try: result = MultiThread.__thread_pool.apply_async(self.__target, args=self.__args) return result.get() except: pass def call_me(test1, test2): print(test1 + test2) if test2 == "": return False return True MultiThread.begin(10) for i in range(0, 100): t = None if i == 50: t = MultiThread(target=call_me, args=("This is string 1 ", "")) else: t = MultiThread(target=call_me, args=("Active Threads: ", str(threading.active_count()))) result = t.run() if result == False: print("Success " + str(i)) break MultiThread.end() |
Comments
Post a Comment