3. Előadás - Szálkezelés
Szálak
A segítségükkel ki lehet használni a számítógépben rendelkezésre álló több processzormagot.
Az I/O-ra várakozás is hasznosan tölthető.
Gazdaságosabb, mint új processzt indítani.
A szálak egy közös névtéren osztoznak.
A szálak nem tartanak nyilvántartást arról, hogy milyen szálakat indítottak, azt nekünk kell megoldani.
A szálak egy processzen belül a következőket osztják meg egymással.
Memória címtér
Megnyitott fájlok (fájlleírók)
Szignálok kezelése
Az aktuális munkajegyzéket (cwd, pwd)
Felhasználó név és csoport (jogosultságok)
Minden szálban viszont egyediek az alábbiak:
Szál azonosító (Thread ID)
Regiszterek, verem mutató
Lokális változók, visszatérési címek
Prioritások
Visszatérési hibakódok
Szálbiztonság
Thread safety
Egy implementációt szálbiztosnak nevezünk, hogy ha garantált, hogy helyesen működik abban az esetben is, hogy ha több szál próbálja meg elérni.
Fork
A nevét az elágazásról, mint villáról kapta.
A végrehajtás ezen pontján a programból egy új futási szál jön létre.
Ugyanazon kód fut tovább mindkét szálban.
A szál ezt követően el tudja dönteni magáról, hogy az eredeti vagy az újonnan létrehozott szál-e.
Join
Egy szinkronizációs primitív.
A program végrehajtása ezen a ponton bevárja, hogy egy másik szál végrehajtásra kerüljön.
Barrier
Egy szinkronizációs eszköz. A program ezen a ponton bevárja, hogy az összes szál elvégezze a számításait.
Egymás után kiadott join műveletekkel megoldható abban az esetben is, hogy ha nincsen rá kész elem.
Alapvető műveletek
Szál létrehozása
Szál indítása
Paraméterek átadása
Várakozás a szál végrehajtására
Szálak lekérdezése, státuszok
Aktuális szál adatainak lekérdezése
Mutex-ek, zárolás
Szál futásának leállítása
\(\rhd\) Hogyan tünne kézenfekvőnek ezeket egy programozási nyelv segítségével megoldani?
\(\rhd\) Gondoljuk át, hogy milyen problémák jelentkezhetnek OOP nyelvek esetében! (Például scope-ok kezelése, kivételkezelés.)
POSIX szálak
Portable Operating System Interface for UNIX
https://hu.wikipedia.org/wiki/POSIX
Szál létrehozása és indítása
A szál elindítása a létrehozáskor automatikusan megtörténik.
int pthread_create(
pthread_t* thread,
const pthread_attr_t* attr,
void *(*start_routine)(void *),
void *arg);
Paraméterek:
thread
: A szál azonosítója (unsigned long int
típus)attr
: Szál beállításához szükséges attribútumok. (AlapértelmezéshezNULL
érték.)start_routine
: A függvény, amelyet a szálban el kell indítani.arg
: Paraméterek, amelyek átadásra kerülnek a szálban indított függvénynek.
Visszatérési érték:
Amennyiben a szál sikeresen elindult, úgy 0 értékkel tér vissza, egyébként pedig hibakóddal.
Paraméterek átadása
A szálnak
void*
típusú paraméterként lehet átadni értékeket.Ez praktikusan jelentheti például egy struktúrának az átadását.
Várakozás a szál végrehajtására
int pthread_join(
pthread_t thread,
void **retval);
Paraméterek:
thread
: A szál azonosítója, amelyikre várni kell majd a főszálnak.retval
: A szál által visszaadott érték (pthread_exit
hívásban). Amennyiben nincs rá szükség, úgyNULL
-al jelölhető.
Visszatérési érték:
Sikeres végrehajtás esetén 0, egyébként hibakód.
Szálak lekérdezése
Nincs rá egységes, platformfüggetlen megoldás.
UNIX-szerű rendszerek esetében a
/proc
jegyzék tartalmaz erre vonatkozó információkat.
Aktuális szál adatainak lekérdezése
Az aktuális szál azonosítóját a
#include <unistd.h>
pid_t gettid(void);
#include <pthread.h>
pthread_t pthread_self(void);
függvényekkel lehet lekérdezni.
Mutexek
Mutex: Mutual Exclusion
Inicializálás:
Amennyiben a mutex-et statikus változóként (globális névtérben) szeretnénk használni, akkor a következő makró formájában megadható:
pthread_mutex_t value_mutex = PTHREAD_MUTEX_INITIALIZER;
Amennyiben dinamikusan (például stack-en allokált) változóról van szó, akkor az előbbivel ekvivalens az alábbi hívás:
pthread_mutex_init(&value_mutex, NULL);
Kritikus szakasz kijelölése:
pthread_mutex_lock(&value_mutex);
// Critical section
pthread_mutex_unlock(&value_mutex);
Szál végrehajtásának félbeszakítása
int pthread_cancel(pthread_t thread);
Visszatérési érték:
Sikeres végrehajtás esetén 0, egyébként hibakód.
forrás:
C++11/std::thread
A szálkezelést beemelték a szabványos függvénykönyvtárba.
Platformfüggetlen megoldást igyekszik adni a szálkezelési problémára. (Ahogy anno a POSIX is próbálta.)
#include <thread>
https://thispointer.com/c-11-multithreading-part-1-three-different-ways-to-create-threads/
Szál létrehozása
Egy std::thread
konstruktora a következőkkel inicializálható:
függvény mutató,
függvény objektum,
lambda függvény.
#include <thread>
void worker() { ... }
int main(int argc, char* argv[]) {
std::thread myThread(worker);
myThread.join();
return 0;
}
#include <thread>
class MyThread {
public:
operator()() {
// TODO: Calc here!
}
};
int main(int argc, char* argv[]) {
std::thread myThread(MyThread());
myThread.join();
return 0;
}
#include <thread>
int main(int argc, char* argv[]) {
std::thread myThread([]{
// TODO: Calc here!
});
myThread.join();
return 0;
}
Paraméterek átadása
Az std::thread
konstruktorában átadhatók.
#include <thread>
void worker(int x) {
// TODO: Calc here!
}
int main(int argc, char* argv[]) {
std::thread myThread(worker, 1234);
myThread.join();
return 0;
}
Várakozás a szál végrehajtására
void std::thread::join();
Szál adatainak lekérdezése
Egyedi azonosító lekérdezése:
std::thread::get_id()
Az aktuális szál azonosítójának a lekérdezése:
std::this_thread::get_id()
Zárolás
#include <mutex>
std::mutex myMutex;
// ...
myMutex.lock();
// ...
myMutex.unlock();
Future, Promise
Tudunk kezelni olyan objektumokat, amelyek csak később tesznek elérhetővé bizonyos értékeket.
#include <future>
#include <iostream>
#include <thread>
void worker(std::promise<int>* prom)
{
prom->set_value(1234);
}
int main(int argc, char* argv[])
{
std::promise<int> myPromise;
std::future<int> myFuture = myPromise.get_future();
std::thread myThread(worker, &myPromise);
std::cout << myFuture.get() << std::endl;
myThread.join();
return 0;
}
C++/Qt
Szál létrehozása és indítása
A
QThread
osztályt lehet példányosítani vagy származtatni.A
QThread::run
metódust kell felüldefiniálni.A szálakat külön el kell indítani.
class MyThread : public QThread
{
Q_OBJECT
private:
void run();
};
int main(int argc, char* argv[])
{
MyThread thread;
thread.run();
// ...
thread.wait();
return 0;
}
Paraméterek átadása
Mivel saját osztály definiálható, ezért kézenfekvő módon adódik.
Szálak adatainak lekérdezése
A QThread
osztály metódusain keresztül lehet hozzáférni.
bool isFinished();
bool isRunning();
QThread* currentThread();
Qt::HANDLE currentThreadId();
Zárolás
#include <QMutex>
// ...
QMutex mutex;
mutex.lock()
mutex.tryLock();
mutex.unlock();
A mutex scope-hoz is köthető a QMutexLocker
osztály segítségével.
Java, IRunnable, Thread
https://docs.oracle.com/javase/tutorial/essential/concurrency/runthread.html
Szál létrehozása és elindítása
A Runnable
interfész implementálásával:
public class MyRunnable implements Runnable {
public void run() {
System.out.println("Run!");
}
public static void main(String args[]) {
(new Thread(new MyRunnable())).start();
}
}
A Thread
osztály származtatásával:
public class MyThread extends Thread {
public void run() {
System.out.println("Run!");
}
public static void main(String args[]) {
(new MyThread()).start();
}
}
Paraméterek átadása
Mivel saját osztály definiálható, ezért kézenfekvő módon adódik.
Várakozás a szál végrehajtására
Thread.join();
Thread.join(long milliseconds)
Szálak lekérdezése, státusz
A Thread
osztályon keresztül hozzáférhetők a következő metódusok.
static Thread currentThread();
long getId();
String getName();
int getPriority();
boolean isAlive();
A szálakat a Java csoportokban tartja nyilván. A ThreadGroup
-okon keresztül hozzá lehet férni az összes szálhoz.
Zárolás
Lock lock = new ReentrantLock();
// ...
lock.lock();
lock.tryLock();
lock.unlock();
C#, System.Threading
Szál létrehozása és indítása
using System;
using System.Threading;
namespace MyApplication {
class Sample {
public static void Worker() {
// TODO: Calc here!
}
static void Main(string[] args) {
ThreadStart threadStart = new ThreadStart(Worker);
Thread thread = new Thread(threadStart);
thread.start();
thread.Name = "MainThread";
Console.WriteLine("Name: {0}", thread.Name);
Console.ReadKey();
}
}
}
Várakozás a szál végrehajtására
public void Join();
Szál futásának félbeszakítása
Thread.Abort();
Python
https://docs.python.org/3/library/threading.html
https://realpython.com/intro-to-python-threading/
Szálak létrehozása és indítása
import threading
def worker():
print('Work here!')
if __name__ == '__main__':
thread = threading.Thread(target=worker)
thread.start()
Paraméterek átadása
import threading
def worker(x, y):
print(f'Work here on {x} and {y}!')
if __name__ == '__main__':
thread = threading.Thread(target=worker, args=(12, 34))
thread.start()
Várakozás a szál végrehajtására
Thread.join()
Szálak adatainak lekérdezése
threading.get_ident()
threading.get_native_id()
Zárolás
class Sample:
def __init__(self):
self._lock = threading.Lock()
def work(self):
with self._lock():
pass
A Lock
objektum műveletei:
acquire
: zárásrelease
: feloldáslocked
: lekérdezés
Kérdések
Mi az a POSIX?
Milyen előnyei vannak a szálaknak a folyamatokhoz képest?
Min osztoznak a szálak egy folyamaton belül?
Mi az ami minden szál esetében egyedi?
Mit jelent a szálbiztonság/szálbiztos implementáció?
Milyen esetben van szükség reentrant lock-ra?
Feladatok
POSIX szálak
Készítsünk olyan programot, amelynél a fő szál 8 másodpercnyi számítást végez el, míg az általa indított csak 4-et! Vizsgáljuk meg a fordított esetet is!
Készítsünk egy programot, amelyik elindít 60 szálat. Mindegyik szál végezzen 1 másodpercnyi számítást! Vizsgáljuk meg a teljes program futási idejét!
Készítsünk egy programot, amelyik 10 szál segítségével meghatározza, hogy mennyi prímszám van a [0, 99], [100, 199], … intervallumokon! A szálaknak adja át az intervallumok indexét, majd az eredményt így írják közvetlenül egy globális tömbbe!
Vizsgáljuk meg, hogy mi történik, hogy ha egy szálban futás idejű hiba keletkezik!
Algoritmusok párhuzamosítása
Készítsen egy programot, amely egy tömb elemeinek összegét számítja ki több szál felhasználásával!
Oldja meg, hogy a program működjön tetszőleges méretű tömbökre is!
Írjon egy programot, amely a tömböt közel egyenlő részekre oszt oldja meg az összegzést!
Írjon egy programot, amely rekurzív felosztásra épül!
A programban a szálak száma paraméterként szerepeljen!
Hasonlítsa össze a \(p = 1\) esetet a szekvenciális változattal futási idő tekintetében!
Mérje le a futási időket különböző \(p\) értékek és különböző bemenetméretek mellett!
Adjon becslést a \(T(n, p)\) függvényre (ahol \(n\) a bemeneti tömb mérete, \(p\) pedig a szálak száma).
Adjon becslést a gyorsításra (mint \(S(p)\) függvényre)!
A kapott eredményeket foglalja össze táblázatban, és ábrázolja grafikonon!
Készítsen olyan programokat, amelyek meghatározzák, hogy bizonyos tulajdonságú elemből mennyi található egy sorozatban. Ilyen tulajdonság lehet például, hogy
egész értékek esetén páros vagy páratlan számok-e,
0 értékűek-e,
előjelesek-e,
lebegőpontos számok esetén 1-nél kisebb abszolút értékűek-e, vagy hogy
adott intervallumba esnek-e.
A feladat megoldásánál használjon
nagy méretű, véletlenszerűen generált adatokat,
dinamikusan allokált tömböket.
Hasonlítsa össze a szekvenciális és a többszálú megvalósítást!
Mérje le a futási időket a bemenet méretének és a szálak számának függvényében!
Összesítse a kapott eredményeket táblázatos formában!
Ábrázolja az eredményeket grafikonok!
Készítsen programot egy tömb prefixeinek a kiszámításához!
Adja meg a szekvenciális változatot!
Implementálja a
CREW_PREFIX
algoritmust!Implementálja az
EREW_PREFIX
algoritmust!Implementálja az
OPTIMAL_PREFIX
algoritmust!Mérje le a futási időket különböző számú szál és különböző bemenetméretek mellett!
Adjon becslést a gyorsításra (mint \(S(p)\) függvényre)!
A kapott eredményeket foglalja össze táblázatban, és ábrázolja grafikonon!