[Python] Easy multithreading


Heute möchte ich euch mal einen Weg zeigen, wie es relativ einfach möglich ist, Multithreading in Python zu realisieren.

Wie immer, hat Python da schon nützliche Module parat, die nur darauf warten genutzt zu werden. :)



Für diejenigen, die nicht genau wissen was ein Thread bzw. Multithreading ist:



Crashkurs Multithreading



Wird ein Programm gestartet, wird dementsprechend ein "Prozess" gestartet, welcher vom "Prozess-Scheduler" des Betriebssystems verwaltet wird und die Ablaufumgebung des Programms als Binärcode darstellt.
Der Prozess-Scheduler wechselt zwischen allen aktiven Prozessen (also allen laufenden Programmen) hin und her, wodurch der Eindruck entsteht, dass mehrere Programme gleichzeitig laufen. Dem ist jedoch nicht so, denn zu jedem Zeitpunkt, wird immer nur ein Prozess verarbeitet.

Bei einem Programm wird i.d.R. immer nur ein Befehl nach dem anderen ausgeführt, das heißt, dass der nächste Befehl solange bis zu seiner Verarbeitung warten muss , bis der aktuelle Befehl verarbeitet wurde.

Ein "Thread" jedoch, ist eine nebenläufige Ausführungseinheit innerhalb eines Prozesses. Jeder Prozess hat mindestens einen Thread der den Programmcode ausführt und kann aber bis zu hundert oder mehr Threads auf einmal haben.

Threads ermöglichen es also, gleichzeitig mehrere Befehle auszuführen. Dies funktioniert so, dass ein Thread erstellt wird und ihm dann eine Aufgabe zugewiesen wird, welche dann asynchron neben dem Hauptthread verarbeitet wird.



Bitte mit Klasse


Um alles organisiert zu halten, schreiben wir uns dafür am besten eine Klasse.

Lasst uns also mal überlegen, was diese können soll:

  • Threads erstellen
  • Aufgaben den Threads zuweisen
  • Threads ausführen
  • Rückgabewerte der Aufgaben entgegennehmen
  • Maximale Anzahl an Threads überwachen bzw. begrenzen
  • Threads schließen

Threads erstellen

Threads werden in einem Pool an Threads erstellt, welcher sicherstellt, dass die vorher definierte Anzahl an Threads nicht überschritten wird. Darüber stellen wir sicher, dass nicht blind versucht wird, mehrere hundert Threads zu erstellen, was definitiv zum Absturz des Programms führen würde.


Aufgaben den Threads zuweisen

Hier müssen wir uns schon ein bisschen mehr Gedanken machen wie wir das am besten realisieren. Eine Aufgabe wird über eine Funktion definiert, welche wir dann an den Konstruktor der Klasse übergeben werden. Da wir es ebenfalls ermöglichen möchten, dass eine solche Funktion nicht an eine bestimmte Anzahl an Parametern begrenzt ist, werden wir zusätzlich an den Konstruktor ein Tupel mit Argumenten übergeben. Dieser Tupel wiederum, wird dann beim Aufruf der Aufgabenfunktion als Argument übergeben. So ist sichergestellt, dass jede beliebige Funktion mit beliebig vielen Parametern als Aufgabenfunktion genutzt werden kann.


Threads ausführen

Wie oben schon erwähnt, wird die dem Konstruktor übergebene Funktion mit dem ebenfalls übergebenen Argumententupel aufgerufen und an einen Thread übergeben.


Rückgabewerte der Aufgaben entgegen nehmen

Wenn in der Aufgabenfunktion ein Rückgabewert definiert wurde, soll dieser natürlich von der Klasse entgegen genommen bzw. weitergereicht werden.


Maximale Anzahl an Threads überwachen bzw. begrenzen

Darüber wurde eigentlich schon alles in "Threads erstellen" gesagt.


Threads schließen

Dies geht mit der Begrenzung der Anzahl an Threads einher. Ist ein Thread fertig mit seiner Aufgabe, wird er geschlossen um so Platz für einen neuen Thread und seine Aufgabe zu machen.
Wurden alle Aufgaben abgearbeitet, kann der Pool geschlossen werden.





Die Module


Wie ich bereits erwähnt hatte, bietet Python schon Module, die uns den Umgang mit Threads vereinfachen, binden wir es also ein:


1
from multiprocessing.pool import ThreadPool

Dieses Modul stellt uns "ThreadPool" zur Verfügung.
Das ist auch schon alles was wir an Modulen benötigen.





Das Grundgerüst



Erstellen wir uns also ein Grundgerüst der Klasse:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
class MultiThread():

    @classmethod
    def begin(cls):
        pass
    
    @classmethod
    def end(cls):
        pass
    
    def __init__(self):
        pass
    
    def run(self):
        pass


Nicht grad viel, aber mehr brauchen wir auch nicht :)


Gehen wir das ganze mal durch:

@classmethod
def begin(cls):
    pass
Diese Methode wird den Threadpool initialisieren und die maximale Anzahl an Threads definieren.
Da diese Informationen nicht an einen einzelnen Thread gebunden sind, machen wir eine Klassenmethode daraus

@classmethod
def end(cls):
    pass
Hier werden wir den ThreadPool schließen und auf den Abschluss aller Threads warten.
Wie bei der Methode "begin", ist diese Funktionalität ebenfalls nicht an einen Thread gebunden, daher wird auch diese Methode eine Klassenmethode

def __init__(self):
    pass
Der Konstruktor. Diesem werden wir einen Handle der Aufgabenfunktion und die Argumente als Tupel übergeben.

def run(self):
    pass
Und schließlich die Funktion, welche den Thread startet und ihm die Aufgabenfunktion und die dazugehörigen Argumente übergibt. Zugleich wird diese die Rückgabewerte entgegennehmen und diese ebenfalls zurückgeben.

Zusätzlich brauchen wir noch eine Klassenvariable, welche den Threadpool speichert:

1
__thread_pool = None
Diese ist mit den 2 Unterstrichen als "private" markiert, damit der Zugriff außerhalb der Klasse nicht möglich ist




Nun können wir uns an das Ausprogrammieren der einzelnen Methoden machen:



Die begin Methode

In der Methode "begin" erstellen wir einen Threadpool, in dem alle aktiven Threads hinterlegt werden.

1
2
3
@classmethod
def begin(cls, max_threads):
    MultiThread.__thread_pool = ThreadPool(max_threads)
Der Methode "begin" geben wir einen Parameter "max_threads" um beim Aufruf die maximale Anzahl an Threads festlegen zu können.

MultiThread.__thread_pool = ThreadPool(max_threads)
Hier rufen wir die Klassenvariable "__thread_pool" auf, und übergeben ihr den mit "ThreadPool" erstellten Threadpool. Gleichzeitig übergeben wir dieser Funktion die Anzahl der maximal möglichen Threads.


Die end Methode

In der Methode "end" sorgen wir dafür, dass nach Vergabe aller Aufgaben, der Threadpool geschlossen wird und auf das erledigen aller Aufgaben gewartet wird.

1
2
3
4
@classmethod
def end(cls):
    MultiThread.__thread_pool.close()
    MultiThread.__thread_pool.join()
Die "close" Methode schließt den Threadpool und die Methode "join" wartet darauf, dass alle Threads ihre Aufgabe erledigt haben



Der Konstruktor

In der "__init__" Methode, oder auch oft Konstruktor genannt, wird ein Objekt unserer Klasse "Multithread" erstellt.

1
2
3
def __init__(self, target=None, args:tuple=()):
    self.__target = target
    self.__args = args
Diesem Objekt geben wir zwei private Instanzvariablen: "__target" und "__args". Zusätzlich geben wir dieser Methode zwei Parameter: "target" und "args". Somit können wir beim erstellen eines Objektes gleich die Aufgabenfunktion als "target" und die dazugehörigen Argumente als "args" übergeben.



Die run Methode

Die "run" Methode erstellt einen neuen Thread, übergibt diesem die im Konstruktor übergebene Aufgabenfunktion und die dazugehörigen Argumente. Anschließend gibt sie den Rückgabewert der Aufgabenfunktion zurück.

1
2
3
4
5
6
def run(self):
    try:
        result = MultiThread.__thread_pool.apply_async(self.__target, args=self.__args)
        return result.get()
    except:
        pass
Hier wird auf dem in der "begin" Methode definierten Threadpool "__thread_pool" die Methode "apply_async" Aufgerufen. Diese erstellt einen neuen Thread mit der dem Konstruktor übergebenen Aufgabenfunktion und der übergebenen Argumente. Der Rückgabewert der Aufgabenfunktion wird in "result" gespeichert und anschließend über den Befehl "return result.get()" zurückgegeben.


Der komplette Code sieht dann wie folgt aus:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from multiprocessing.pool import ThreadPool


class MultiThread():
    __thread_pool = None

    @classmethod
    def begin(cls, max_threads):
        MultiThread.__thread_pool = ThreadPool(max_threads)

    @classmethod
    def end(cls):
        MultiThread.__thread_pool.close()
        MultiThread.__thread_pool.join()

    def __init__(self, target=None, args:tuple=()):
        self.__target = target
        self.__args = args

    def run(self):
        try:
            result = MultiThread.__thread_pool.apply_async(self.__target, args=self.__args)
            return result.get()
        except:
            pass


Damit wären wir auch schon fertig mit unserer Klasse.




Der Test


Um sicherzustellen, dass auch alles funktioniert, schreiben wir uns einen kleinen Test.


Zuerst brauchen wir also eine Aufgabenfunktion, welche als Thread aufgerufen werden kann:

1
2
3
4
def call_me(test1, test2):
    print(test1 + test2)
    if test2 == "": return False
    return True
Diese Funktion macht nicht viel, sie gibt die beiden übergebenen Strings "test1" und "test2" aus. Damit können wir überprüfen, ob die Funktion abgearbeitet wurde. Zudem überprüfen wir, ober der übergebene String "test2" leer ist. Abhängig davon, geben wir entweder "True" oder "False" zurück.


Nun können wir einen kleinen Code schreiben, der die Funktion als Thread mithilfe unserer Klasse "Multithread" erstellt .


1
2
3
4
5
MultiThread.begin(50)
for i in range(0, 100):
    t = MultiThread(target=call_me, args=("This is string 1 ", "This is string 2"))
    t.run()
MultiThread.end()


Gehen wir das mal durch:

MultiThread.begin(50)
Hier starten wir das Multithreading und legen fest, dass maximal 50 Threads auf einmal laufen können.

for i in range(0, 100):
Eine "for" Schleife welche 100 mal durchlaufen werden soll

t = MultiThread(target=call_me, args=("This is string 1 ", "This is string 2"))
Hier erstellen wir ein neues Objekt unserer Klasse "MultiThread" und übergeben dem intern aufgerufenen Konstruktor (__init__) unsere Testfunktion "call_me" mit 2 Strings als Argumente.

t.run()
Anschließend geben wir den Startschuss den Thread zu erstellen und laufen zu lassen.

MultiThread.end()
Wurden alle 100 Threads erstellt, kann der Pool geschlossen werden und auf das beenden aller Threads gewartet werden.


Der Output sieh wie folgt aus:

...
This is string 1 This is string 2
This is string 1 This is string 2
This is string 1 This is string 2
This is string 1 This is string 2
This is string 1 This is string 2
This is string 1 This is string 2
This is string 1 This is string 2
...


Sieht so aus, als wenn der Aufruf der Funktion klappt, lasst uns nun das Zurückgeben des Rückgabewertes testen.

Dazu ändern wir den Aufruf der "run" Methode ein klein wenig ab:

1
2
3
4
5
6
7
8
MultiThread.begin(50)
for i in range(0, 99):
    t = MultiThread(target=call_me, args=("This is string 1 ", "This is string 2"))
    result = t.run()
    if result == False
        print("Success " + str(i))
        break
MultiThread.end()

result = t.run()
Hier speichern wir den Rückgabewert der Methode "run" in der Variable "result"


if result == False
Und überprüfen ihren Wert

print("Success " + str(i))
break
Ist dieser Wert "False", dann geben wir "Success" und den Wert der Iteratorvariable "i" aus, um überprüfen zu können, in welcher Iterationstiefe die Schleife beendet wurde. Anschließend beenden wir die Schleife sofort über den Befehl "break".


Würden wir diesen Code jetzt ausführen, würde die if-Bedingung gleich beim ersten Schleifendurchlauf erfüllt sein, denn unsere "call_me" Funktion gibt dann "True" zurück, wenn der zweite Parameter kein leerer String ist. Und momentan übergeben wir als zweiten Parameter "This is string 2.

Also lasst uns das durch eine zweite Bedingung ändern:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
MultiThread.begin(50)
for i in range(0, 100):
    t = None
    if i == 50:
        t = MultiThread(target=call_me, args=("This is string 1 ", ""))
    else:
        t = MultiThread(target=call_me, args=("This is string 1 ", "This is string 2"))

    result = t.run()

    if result == False:
        print("Success " + str(i))
        break
MultiThread.end()


t = None
Da "t" nun in einem "if"-Block definiert wird, müssen wir "t" auch außerhalb dieses Blocks bekannt machen

if i == 50:
Hier überprüfen wir, ob der 50ste Schleifendurchlauf erreicht wurde

t = MultiThread(target=call_me, args=("This is string 1 ", ""))
Wenn ja, dann übergebe als zweiten Parameter einen leeren String

else:
    t = MultiThread(target=call_me, args=("This is string 1 ", "This is string 2"))
Andernfalls übergebe wie gehabt den String "This is string 2"


result = t.run()
Speichere den Rückgabewert in der Variable "result"


if result == False:
    print("Success " + str(i))
    break
Und überprüfe den Rückgabewert


Führen wir das nun aus, bekommen wir folgenden Output:

...
...
This is string 1 This is string 2
This is string 1 This is string 2
This is string 1 This is string 2
This is string 1 This is string 2
This is string 1 This is string 2
This is string 1 This is string 2
This is string 1 
Success 50

Hier können wir schön erkennen, dass das Zurückgeben des Rückgabewertes funktioniert und dementsprechend die Schleife schon nach dem 50sten Durchlauf abgebrochen wurde.




Wo sind all' die Threads?



Bisher hat alles schön funktioniert was wir getestet haben.
Was wir anhand dieser Tests aber nicht sehen konnten, ist, wie viele Threads nun tatsächlich erstellt wurden.

Also lasst uns das ebenfalls noch testen - sicher ist sicher :)

Um herauszufinden, wie viele Threads im aktuellen Prozess so nebenher laufen, können wir das Modul "threading" verwenden.

Lasst es uns zu Testzwecken importieren:

1
import threading

Dieses Modul stellt uns unter anderem die Funktion "active_count()" bereit, welche uns die Anzahl aller laufenden Threads in diesem Prozess zurückgibt. Eingenommen des Hauptthreads und der von ThreadPool verwendeten Threads.

Ändern wir also unseren Testcode so ab, dass uns die Anzahl an Threads ausgegeben wird:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
MultiThread.begin(10)
for i in range(0, 100):
    t = None
    if i == 50:
        t = MultiThread(target=call_me, args=("This is string 1 ", ""))
    else:
        t = MultiThread(target=call_me, args=("Active Threads: ", str(threading.active_count())))

    result = t.run()

    if result == False:
        print("Success " + str(i))
        break
MultiThread.end()


t = MultiThread(target=call_me, args=("Active Threads: ", str(threading.active_count())))
In dieser Zeile ändern wir einfach nur die beiden Übergabeparameter zu "Active Threads: " und "str(threading.active_count())))"

MultiThread.begin(10)
Hier reduzieren wir die Anzahl der Threads von 50 auf 10, um den Unterschied deutlicher sehen zu können.
Nun müssen 10 mal 10 Threads erstellt werden, um die 100 Schleifendurchläufe zu beenden. Vorher waren es nur 2 mal 50 für die 100 Schleifendurchläufe.


Und der Output sieht folgendermaßen aus:

...
...
Active Threads: 14
Active Threads: 14
Active Threads: 14
Active Threads: 14
Active Threads: 14
Active Threads: 14
Active Threads: 14
This is string 1 
Success 50
Hier können wir erkennen, dass maximal 14 Threads erstellt wurden.
Warum 14 und nicht 10?
Nun ja, der Hauptthread ist auch ein Thread und die Funktion "ThreadPool" aus dem Modul "multiprocessing.pool" verwendet intern auch noch 3 Threads.


Also sehen wir, dass nun alles funktioniert.


Zum Abschluss nochmal der ganze Code:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import threading
from multiprocessing.pool import ThreadPool


class MultiThread():
    __thread_pool = None

    @classmethod
    def begin(cls, max_threads):
        MultiThread.__thread_pool = ThreadPool(max_threads)

    @classmethod
    def end(cls):
        MultiThread.__thread_pool.close()
        MultiThread.__thread_pool.join()

    def __init__(self, target=None, args:tuple=()):
        self.__target = target
        self.__args = args

    def run(self):
        try:
            result = MultiThread.__thread_pool.apply_async(self.__target, args=self.__args)
            return result.get()
        except:
            pass


def call_me(test1, test2):
    print(test1 + test2)
    if test2 == "": return False
    return True


MultiThread.begin(10)
for i in range(0, 100):
    t = None
    if i == 50:
        t = MultiThread(target=call_me, args=("This is string 1 ", ""))
    else:
        t = MultiThread(target=call_me, args=("Active Threads: ", str(threading.active_count())))

    result = t.run()

    if result == False:
        print("Success " + str(i))
        break
MultiThread.end()


Comments

Popular posts from this blog

[Python] Machine Learning Intro #1 - Hello World

[Python] Passwort cracker for Zip-archives

WebGL 2 Guide - #1 Einführung