Programmierung von Parallelrechnern mit gemeinsamem und
Transcrição
Programmierung von Parallelrechnern mit gemeinsamem und
Programmierung von Parallelrechnern mit gemeinsamem und verteiltem Speicher Programmierpraktikum WS 1995/96 Heinz Kredel, Akitoshi Yoshida Rechenzentrum Universitat Mannheim RUM 46/96 Vorwort Das folgende Skript ist aus einer Ausarbeitung von Teilen eines Programmierpraktikums an dem KSR1 Parallelrechner in Mannheim im Wintersemester 1993/94 entstanden. Er umfat hauptsachlich die Pthread und PVM Bibliotheken und Konzepte fur die Programmierung von Parallelrechnern. In dieser Fassung vom Wintersemester 1995/95 wurden diverse Fehler verbessert und die meisten Programmbeispiele werden nun in C++ angegeben. Zu dem FORTRAN, KAP und Optimierungsteil des Praktikums gibt es ein Script vom LRZ Munchen. Fur die Benutzung der Visualisierungstools und der Compiler mussen wir auf die entsprechenden man pages und auf das info System verweisen. Im Anhang benden sich kurze Hinweise zu der Benutzung der Compiler und Auistungen der wichtigsten Header Dateien, Denition Moduln und der verfugbaren Pthread und PVM Funktionen, sowie ein Index. Dieser Text bendet sich auch auf dem FTP Server des RUM ftp.uni-mannheim.de in dem Verzeichniss pub/info/rumdoc in der Datei rum4696.ps.Z. Die Beispielprogramme benden sich auf der KSR1 (suparum) in dem Verzeichniss /home/sw/ppkurs. Verbesserungsvorschlage und Fehlerhinweise nehmen wir dankbar entgegen. Wir sind unter der e-Mail Adresse [email protected] zuerreichen. Die erste Version des Skripts wurde von Frau Schnell erstellt. Diese Fassung des Skriptes wurde von Christoph Kuhmunch uberarbeitet, der auch mehrere Modula-2 Beispiele nach C++ portiert hat. Mannheim, 24. Mai 1996 Heinz Kredel 1 Inhaltsverzeichnis 1 Einleitung 5 2 Shared Memory Computer 7 2.1 Nebenlauge Ausfuhrung : : : : : : : : : : 2.2 Atomare Bestandteile und Synchronisation 2.2.1 Annahmen zum Prozessormodell : : 2.3 Explizite Synchronisation : : : : : : : : : : 2.3.1 Interferenz : : : : : : : : : : : : : : 2.3.2 Programmeigenschaften : : : : : : : 3 POSIX threads : : : : : : : : : : : : 3.1 Threads und Prozesse : : : : : : : : : : : : : : 3.1.1 Allgemeines Benutzungsschema : : : : : 3.2 U berblick uber die Pthreads : : : : : : : : : : : 3.2.1 Header Dateien und Denition Moduln 3.3 Erzeugung und Beendigung von Pthreads : : : 3.3.1 Losung in C++ : : : : : : : : : : : : : : 3.3.2 Losung in Modula-2 : : : : : : : : : : : 3.3.3 Losung in FORTRAN : : : : : : : : : : 3.4 Beginn und Ende von Mutual exclusion : : : : 3.5 Bedingungsvariablen und Mutual exclusion : : 3.6 Semaphore : : : : : : : : : : : : : : : : : : : : 3.6.1 Semaphoren Implementierung : : : : : : 3.6.2 Barriers mit Semaphoren : : : : : : : : 3.6.3 await mit Semaphoren : : : : : : : : : 3.7 Das Bounded Buer Problem : : : : : : : : : : 4 Distributed Memory Computer : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 7 : 8 : 9 : 10 : 11 : 11 : : : : : : : : : : : : : : : 14 14 15 15 16 16 18 19 21 23 27 28 30 32 33 33 39 4.1 Kanale, Senden und Empfangen : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 39 4.1.1 Programmeigenschaften : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 40 4.2 Nachrichtenaustausch bei gemeinsamem Speicher : : : : : : : : : : : : : : : : : : : : : : : 41 2 5 PVM Parallel Virtual Machine 5.1 5.2 5.3 5.4 5.5 5.6 U berblick uber PVM und die Message Passing Routinen Denition und Erzeugung von Kanalen : : : : : : : : : : Packen und Versenden : : : : : : : : : : : : : : : : : : : Empfangen und Entpacken : : : : : : : : : : : : : : : : Produzenten Konsumenten Beispiel : : : : : : : : : : : : Verteilte Semaphore : : : : : : : : : : : : : : : : : : : : 5.6.1 Logische Uhren : : : : : : : : : : : : : : : : : : : 5.6.2 User und Helper Pthread : : : : : : : : : : : : : A Benutzung der Compiler A.1 A.2 A.3 A.4 A.5 A.6 A.7 C:: ::: : C++ : : : : Modula-2 : FORTRAN KAP : : : : PVM : : : : Makeles : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : B Header Dateien und Denition Moduln B.1 B.2 B.3 B.4 B.5 B.6 B.7 Thread Denition Modul Pthread Denition Modul PVM C Header Datei : : PVM Denition Modul : : Header-Datei ERROR.h : Header-Datei TYPES.h : Header-Datei sema.h : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 50 50 52 54 55 55 58 59 59 67 67 67 68 68 68 69 69 71 71 72 76 82 87 88 88 C Verzeichnis der POSIX thread Funktionen 90 Index 96 C.1 POSIX threads : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 90 C.2 Mach threads : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 95 3 Tabellenverzeichnis 2.1 Matrix Multiplikation : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 8 3.1 Prozesse und Threads : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 14 3.2 Ringpuer : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 34 5.1 Funktionsschema von PVM : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 51 5.2 Request Message Queue : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : : 59 4 Kapitel 1 Einleitung Die Hardware Entwicklung der letzten Jahre und Jahrzehnte hat die Unterstutzung von Timesharing Betriebssystemen, Vektorrechner, Multiprozessor Rechner und Netzwerke von Workstations und PCs gebracht. Eine Konsequenz dieser Entwicklung war es, da nun verschiedene (Berechnungs-) Aufgaben gleichzeitig bzw. nebeneinander bearbeitet werden konnen. Gleichzeitig wurde Software entwickelt, um die gleichzeitige Bearbeitung auszudrucken: Erzeugen und Anstoen von Aufgaben Koordinierung der Ressourcen Synchronisierung des Zugris auf Ressourcen Wir wollen in diesem Kurs eine kompakte Notation (Sprachkonstrukte) zur parallelen Programmierung einfuhren, zusammen mit einer Diskussion der wichtigsten damit verbundenen Probleme (atomare Aktionen, warten auf Bedingungen, Interferenz, Deadlock, Scheduling). Danach besprechen wir die Umsetzung der Sprachkonstrukte mit Pthreads. Im zweiten Teil werden wir eine kompakte Notation zum Nachrichtenaustausch einfuhren und danach die Umsetzung der Sprachkonstrukte mit PVM (Parallel Virtual Machine) besprechen. Voraussetzungen In den Programm Beipielen setzen wir Kenntnisse in einer Programmiersprache (C, C++, Modula-2, FORTRAN) voraus. Literatur Als weiterfuhrende und erganzende Literatur sind die folgenden Bucher zu empfehlen. 1. G.R. Andrews: Concurrent programming: Principles and Practice, Benjamin / Cummings, USA. 2. M. Ben-Ari, Grundlagen der Parallel-Programmierung, Hanser, 1984. engl. Principles of Concurrent Programming, Prentice / Hall Int., London, 1982. 3. Th. Braunl: Parallele Programmierung, Vieweg, 1993. 5 4. M. Brehm, (ed.), E. Hahn, H-D. Steinhofer, Ch. Schaller, R. Schumacher, Nutzung des Parallelrechners SNI-KSR1, LRZ Munchen, 1993. Erhaltlich von ftp.lrz-muenchen.de in pub/comp/platforms/ksr, Datei ksr-umdruck. 5. M. Brehm, W. Kratzer, Ch. Schaller, Optimierung und Parallelisierung auf dem Parallelrechner SNI-KSR1, LRZ Munchen, 1993. Erhaltlich von ftp.lrz-muenchen.de in pub/comp/platforms/ksr, Datei par_guide.ps.Z oder http://suparum.rz.unimannheim.de/manuals/docs.html 6. Geist et. al.: PVM 3.3 Usersguide, Oak Ridge, USA. 7. Brinch Hansen: Konstruktion von Mehrproze-programmen, Oldenburg, 1981. engl. The Architecture of Concurent Programms, 8. Kendall Square Research: KSR1 C and FORTRAN Programming Manuals, Waltham, USA. 9. A. Schill: DCE Das OSF Distributed Computing Environment, Einfuhrung und Grundlagen, Springer, Heidelberg, 1993. 10. A.S. Tanenbaum: Modern Operating Systems, Prentice-Hall, USA. 11. Zobel, Hogenkamp: Konzepte der parallelen Programmierung, Teubner Stuttgart, 1988. 6 Kapitel 2 Shared Memory Computer Sequentielle Programme werden in Modula-2, C oder C++ notiert. 2.1 Nebenlauge Ausfuhrung Parallele oder nebenlauge (concurrent) Ausfuhrung von Programm-Satements S1 bis Sn bzw. der Statement-Sequenz S(i) wird durch con S ; : : :; Sn end 1 bzw. con i := 1 to n do S i end ( ) notiert. Dabei werden die Bestandteile der Si bzw. S(i) uberlappend ausgefuhrt. Das con Statement terminiert, wenn alle Si bzw. S(i) Statements terminiert sind. Im folgenden Beispiel werden die Variablen x und y zuerst initialisiert und dann nebenlaug um 1 incrementiert, bevor ihre Summe an z zugewiesen wird. x := 0; y := 0; con x := x + 1; y := y + 1 end; z := x + y; Variablen, die auerhalb von con deklariert werden, sind global, Variablen, die innerhalb von con deklariert werden, sind lokal zu dem jeweiligen sequentiellen Programmteil (auch wenn sie den gleichen Namen haben). Eine dritte Art von Variablen sind sogenante private Variablen, die global sind zu den parallelen Programmteilen, die aber in jedem parallelen Programmteilen verschieden sind. Beispiel: Matrix Multiplikation Es seien A; B; C jeweils n n Matrizen,Pgesucht ist das Produkt C := A B. Die Matrix C = fcij g berechnet sich nach der Vorschrift cij = nk=1 aik bkj i; j = 1; : : :; n. Stellt man die Matrixelemente xij durch die Arrayelemente x[i; j] dar, so ergibt sich der Algorithmus in Tabelle 2.1. 7 con i := 1 to n do con j := 1 to n do VAR s: REAL; s := 0:0. for k := 1 to n do s := s + a[i; k] b[k; j] end. c[i; j] := s end. end Tabelle 2.1: Matrix Multiplikation 2.2 Atomare Bestandteile und Synchronisation Um unerwunschte U berlappungen zu vermeiden, ist es erforderlich, Programmteile gegenseitig auszuschlieen (mutual exclusion) und gegebenenfalls die Abarbeitungsschritte aufeinander abzustimmen (condition synchronization). Zum Beispiel kann in dem Programmstuck y := 0; z := 0; con x := y + z; begin y := 1; z := 2 end end der Wert von x nach der Ausfuhrung 0, 1, 2 oder 3 sein ! Die Grund dafur liegt darin, da eine Zuweisung (in der Regel) als eine Folge von `load register' und `store register' Maschinenoperationen implementiert ist. x:=y+z; y:=1; z:=2; konnte z.B. durch die folgenden Maschineninstruktionen implementiert sein. load load add store y z y, z x load imediate 1 store y load imediate 2 store z Der Wert in der Variablen z hangt somit von der zeitlichen Versetzung der beiden Instruktionssequenzen ab. Dieses Beispiel fuhrt uns zu der Denition von elementaren oder atomaren Programmteilen. Denition: Ein Programmteil heit Atomare Aktion falls kein Zwischenschritt, kein Zwischenergebnis und kein Zwischenzustand in einem anderen Proze sichtbar ist. Notation: atomic S ; : : :; Sn end; Bezeichnung: Atomarer Bereich: Bereich innerhalb einer atomaren Aktion. Kritischer Bereich: auch fur 1 atomarer Bereich. Beispiel: Programmlauf auf der KSR1 Programmlauf in einem Interpreter von obigem Beispiel mit und ohne atomic. In den Beispielen werden jeweils die Variablen x und y um 1 bzw. um 2 incrementiert und die Summe x+y wird der Variablen z zugewiesen. 8 VAR x, y, z, i: ANY. BEGIN x:=0; y:=0; z:=0 END. i:=0. WHILE i < 50 DO i:=i+1; CON z:=x+y; x:=x+1; y:=y+2 END; CLOUT("x="); IWRITE(x); CLOUT(", y="); IWRITE(y); CLOUT(", z="); IWRITE(z); BLINES(0); END. Output: x=1, y=2, z=3 x=2, y=4, z=3 x=3, y=6, z=8 x=4, y=8, z=9 x=5, y=10, z=12 x=6, y=12, z=15 x=7, y=14, z=18 ... Ohne atomic treten alle moglichen Werte auf. In der ersten Zeile zeigt z=3, da sowohl x als auch y incrementiert wurden bevor die Summe x+y gebildet wurde. In der zweiten Zeile zeigt z=3, da die Summe x+y gebildet wurde bevor x oder y incrementiert wurden. In der dritten Zeile zeigt z=8, da zuerst y incrementiert wurde, dann die Summe x+y gebildet wurde und danach erst x incrementiert wurde. i:=0. WHILE i < 50 DO i:=i+1; BEGIN x:=0; y:=0; z:=0 END; CON ATOMIC z:=x+y END; ATOMIC x:=x+1; y:=y+2 END END; CLOUT("x="); IWRITE(x); CLOUT(", y="); IWRITE(y); CLOUT(", z="); IWRITE(z); BLINES(0); END. Output: x=1, y=2, x=1, y=2, x=1, y=2, x=1, y=2, x=1, y=2, x=1, y=2, x=1, y=2, x=1, y=2, x=1, y=2, x=1, y=2, ... z=0 z=0 z=0 z=0 z=0 z=3 z=0 z=0 z=3 z=0 Mit atomic treten nur noch zwei mogliche Werte auf. In der ersten bis funften Zeile zeigt z=0, da die Summe x+y gebildet wurde bevor x als auch y incrementiert wurden. In der sechsten Zeile zeigt z=3, da die Summe x+y gebildet wurde nachdem x und y incrementiert wurden. 2.2.1 Annahmen zum Prozessormodell In den meisten Registerbasierten CPU's sind die elementaren Maschinenoperationen oft atomar. Genauer werden wir folgende Annahmen machen. 1. Die Werte der Grunddatentypen (INTEGER, REAL, ...) werden als atomare Programmteile geladen bzw. geschrieben. 9 2. Werte werden in Register geladen, dann dort bearbeitet und schlielich wieder gespeichert. 3. Jeder Proze hat seinen eigenen Registersatz. (realisiert durch eigene Prozessoren, verschiedene Registersatze oder Kontextswitch). 4. Zwischenergebnisse werden im lokalen Speicher gehalten (eigene Register oder eigener Stack) Diese Annahmen sind oft bei CPU's mit parallelen Ausfuhrungseinheiten fur Berechnungen und Adressdekodierungen nicht erfullt, aber es ist sichergestellt, da das die obigen Annahmen aus Programmsicht erfullt sind. Zum Beispiel auf der KSR1 gilt folgendes: das Laden von Werten in ein Register setzt sich z.B. zusammen aus einem Adreberechnungsschritt und dem eigentlichen Ladeschritt. Diese Operationen werden durch Pipelining uberlappt und es ist nur sichergestellt, da Werte, wenn sie benotigt werden, sich am richtigen Ort benden. Allerdings ist der globale virtuelle Speicher atomar, das heit, nur ein Prozessor kann auf eine Speicherstelle schreiben, erst danach konnen beliebig viele Prozessoren wieder diese Speicherstelle lesen. Fur die Programmierung von Parallelrechnern ergeben sich somit folgende Konsequenzen: 1. Keine Annahmen uber den zeitlichen Ablauf der Operationen machen! Alle Abhangigkeiten explizit abwarten. 2. Falls in einem Programmteil keine globalen Variablen gelesen oder geschrieben werden, ist der Programmteil atomar. 3. Falls bei der Auswertung eines Ausdrucks eine einzige globale Variable nur einmal gelesen oder geschrieben wird, ist die Auswertung atomar. 4. Die Zuweisung zu einer lokalen Variablen x := expr ist atomar, falls die Auswertung von expr atomar ist. 5. Die Zuweisung zu einer globalen Variablen x := expr ist atomar, falls die Auswertung von expr nur lokale Variablen benotigt oder die enthaltenen globalen Variablen nicht von einem anderen Proze beschrieben werden. Zum Beispiel sind die Auswertungen in der Matrix Multiplikation atomar, da die Elemente der Matrizen A und B nur gelesen werden, die Variable s jeweils lokal deklariert ist und die Elemente der Matrix C nur einmal geschrieben werden. 2.3 Explizite Synchronisation Falls explizite Synchronisation erforderlich ist, kann das await Statement verwendet werden. Notation: await B [then S ; : : :; Sn] end 1 Das await Statement ist per Denition ein atomarer Programmteil, das heit, wir konnten ausfuhrlich schreiben atomic await B [then S ; : : :; Sn] end end. 1 10 Der Boolesche Ausdruck B bezeichnet die (Verzogerungs-)bedingung, die erfullt sein mu, bevor die Statements S1 ; : : :; Sn ausgefuhrt werden. Die Ausfuhrung von await mu als atomar verlangt werden, damit in den Variablen, von denen B abhangt, keine A nderungen wahrend der Ausfuhrung der Si sichtbar sind. Insbesondere ist B wahrend der gesammten Ausfuhrung der Si wahr. Das await Statement ist sehr machtig, da B ein beliebiger Ausdruck sein kann, der von dem Zustand vieler Prozesse abhangen kann. Das heit await ist nur sehr aufwendig implementierbar und in der vollen Allgemeinheit nicht ezient. (Eine Implementierung wurde Unterstutzung des Betriebssystems und des Compilers erfordern.) Beispiel: await s > 0 then s := s ; 1 end atomic if B then s := s + 1 end end 2.3.1 Interferenz Falls Programmteile nicht atomar sind, konnen die Programmzustande in anderen Prozessen den Zustand in einem Proze beeinussen. Dieses Verhalten nennt man Interferenz. Das Ziel ist es diese Beeinuung zu vermeiden oder die wechselseitigen Beeinussungen zu synchronisieren. Moglichkeiten zur Vermeidung von Interferenz: Wahl von disjunkten Variablen () lokale und private Variablen). Modikation von globalen Variablen nur in atomaren Programmabschnitten. z.B. atomic sglobal := sglobal + slocal end. Datenduplikation Explizite Synchronisation mit await Statement. z.B. falls zur Ausfuhrung von S die Bedingung B erfullt sein mu: await B then S end. Einfuhrung von Hilfsvariablen, um die Reihenfolge der Zustande zu charakterisieren. (Hilfsvariablen nur mit Zuweisung.) 2.3.2 Programmeigenschaften Um uber die Korrektheit von Programmen zu diskutieren, benutzt man die Begrie \Sicherheit" und \Lebendigkeit". Ein Programm ist Sicher, falls keine schlechten (fehlerhaften) Zustande eintreten konnen. Ein Programm ist Lebendig, falls irgendwann alle gewunchten Zustande eintreten. Bei sequentiellen Programmen haben diese Eigenschaften folgende Bedeutung. Sicherheit: partielle Korrektheit, Lebendigkeit: Terminierung. Bei parallelen Programmen kommen folgende Bedingungen hinzu. Sicherheit: keine Interferenz in kritischen Bereichen (mutual exclusion), keine Verklemmung (Deadlock-Freiheit). Lebendigkeit: jeder Programmteil erhalt eine `faire' Chance ausgefuhrt zu werden (fair scheduling). Ein Korrektheitsbeweis fur ein paralleles Programm setzt sich dann zusammen aus Korrektheitsbeweisen fur die sequentiellen Abschnitte (partielle Korrektheit und Terminierung); sowie aus Beweisen fur Interferenz-Freiheit und Deadlock-Freiheit von nebenlaugen Abschnitten, sowie Beweisen fur Fairness des benutzten oder implementierten Schedulings. 11 Sicherheit Der Beweis von deadlock-Freiheit kann durch Techniken erfolgen, wie sie z.B. in dem Buch von Andrews beschrieben sind. Wenn etwa BAD ein Pradikat bezeichnet, das einen deadlock charakterisiert, so konnte man zeigen, da fur alle Bedingungen (Preconditions) in allen Programmteilen c: c =) : BAD. Womit dann bewiesen ware, da die Bedingung BAD nicht eintreten konnte, das Programm also deadlock frei ware. Eine andere Technik benutzt eine Invariante I, die in allen Programmteilen gilt und zeigt I =) : BAD. Wodurch dann ebenfalls sichergestellt ist, da BAD nicht eintreten kann. Ist das nicht moglich, kann eventuell noch durch sogenanntes exclusion of congurations (u.U. durch explizites await) sichergestellt werden, da keine Interferenz auftritt. Lebendigkeit Bei den Scheduling Strategien werden folgende Begrie unterschieden 1. unconditionally fair for atomic actions. 2. weak fair, unconditionally fair und bei await werden die Bedingungen auch einmal wahr. 3. strong fair, unconditionally fair und bei await werden die Bedingungen unendlich oft wahr. Zur Verdeutlichung der Strategien betrachten wir einige Beispiele. VAR cout: boolean; cont:=true; con while cont do (* nothing *) end; cont:=false; end; Bei `unconditional fair scheduling' wird auch das Statement cont:=false einmal ausgefuhrt, wodurch dann die while Schleife beendet wird und das con Statement terminiert. Ein unconditional fair scheduling ist z.B. durch round robin Verfahren einfach realisierbar. VAR cont, try: boolean; cont:=true; try:=false; con while cont do try:=true; try:=false end; await try then cont:=false end; end Bei `weak fair scheduling' folgt u.U. keine Terminierung, denn das await Statement wird zwar unendlich oft ausgefuhrt, aber es ist nicht sichergestellt, da in diesem Moment try den Wert true hat. In diesem Fall wird cont nie auf false gesetzt und das while Statement terminiert nicht. Nur bei `strong fair scheduling' folgt die Terminierung, da dann garantiert wird, da das await Statement unendlich oft ausgefuhrt wird und die Bedingung auch unendlich oft den Wert true annimmt. Wie man 12 sieht, kann das nur der Fall sein, wenn das await Statement genau zwichen den Statements try:=true und try:=false ausgefuhrt wird. Die Bestimmung dieses Zeitpunkts ist aber nur nach eingehender Analyse des Programmtextes moglich und kann praktisch nicht vom Betriebssystem und dem Laufzeitsystem durchgefuhrt werden. (Ist z.B. nur mit FIFO Queues in Spezialfallen realisierbar.) Eine letzte Lebendigkeitseigenschaft ist der sogenannte live lock. Dabei tut jeder nicht terminierte Proze etwas, aber ohne die Arbeit voranzubringen. Aufgaben Aufgabe 1 Betrachte das Programm VAR u; v; w; x: INTEGER; u := 0; v := 1; w := 2; con x := u + v + w; u := 3; v := 4; w := 5; end Wieviele Werte kann x annehmen ? (unter den obigen Annahmen zum Prozessormodell). Aufgabe 2 Produzenten-Konsumenten Kopierprogramm. Kopiere zwei Vektoren a und b der Lange n 0 mit zwei nebenlaugen Prozessen mit Hilfe eines gemeinsamen Puers `buf'. Mit anderen Worten implementiere buf := a[i], b[i] := buf , fur i von 1 bis n. Losungsvorschlag: VAR buf, p; c: INTEGER; p := 0; c := 0; con producer; consumer end; wobei producer: VAR a[1::n] INTEGER; while p < n do await p = c end; buf := a[p + 1]; p := p + 1 end consumer: VAR b[1::n] INTEGER; while c < n do await p > c end; b[c + 1] := buf; c := c + 1 end Ist das Programm deadlock frei ? Wenn ja, warum ? Hinweis: Zeige, da die Bedingung p c eine Invariante des Programms ist. Zeige dann, da nur die sich ausschlieenden Falle (p = c) und (p > c) moglich sind. 13 Kapitel 3 POSIX threads In diesem Kapitel wollen wir die entwickelten Sprachkonstrukte mit POSIX threads implementieren. Die POSIX threads Bibliothek ist enthalten in OSF DCE (Open Software Foundation, Distributed Computing Environment). POSIX bedeutet Portable Operating System IX, ein Standardisierungskomitee von IEEE. POSIX threads sind z.T. implementiert auf Mach (threads) oder auch OS/2 und Windows NT. Wichtig fur eine eziente Implementierung von Threads ist, da threads schon vom Betriebssystem bereitgestellt werden und nicht aufgesetzt sind. 3.1 Threads und Prozesse Threads werden manchmal als \leichtgewichtige" Prozesse bezeichnet, was aber das Konzept nicht genau trit, da es sich bei Threads und Prozessen eigentlich um orthogonale Konzepte handelt: Prozesse \besitzen" und kontrollieren externe Ressourcen (alles auerhalb der CPU, d.h. die processing resource) und Threads \besitzen" und kontrollieren die CPU. Eine Gegenuberstellung von Prozessen und Threads zeigt Tabelle 3.1. Prozesse Adreraum virtueller Anteil am Hauptspeicher globale Variablen synchronisat. Var. oene Dateien Kind-Prozesse Timer Signals Abrechnungsinformation Threads Programmzahler PC Stack Registersatz Kind-Threads Zustandsinformation Tabelle 3.1: Prozesse und Threads Wie Prozesse konnen sich Threads in folgenden Zustanden benden: running blocked der thread wird gerade ausgefuhrt, der thread wartet auf etwas, 14 ready der thread ist bereit zur Ausfuhrung terminated der thread ist beendet. 3.1.1 Allgemeines Benutzungsschema Als allgemeines Benutzungsschema werden in Prozessen diverse Operationen in einer Schleife abgearbeitet. Bei Threads werden meist verschiedene Threads gestartet, die dann diverse Operationen ausfuhren. Prozesse: Threads: repeat lese Datei bearbeite Datei until fertig var buffer, start lese thread start Arbeits thread wait thread termination Anstelle des Lesens einer Datei kommen auch diverse andere langsame Operationen in Frage, z.B. RPC Remote Procedure Call Socket communication, Message Passing Pipe I/O Terminal I/O Hauptspeicher Zugrie. Auch auf Maschinen mit nur einer CPU konnen Threads mit Erfolg eingesetzt werden. Bei 20 Benutzern pro Maschine sind Prozesse gut brauchbar und der Prozessor ist meist ausgelastet. Wenn ein Benutzer von einer Datei liet, kann ein anderer Rechenoperationen durchfuhren. Aber wenn nur noch 1 Benutzer pro Maschine vorhanden ist, ist der Prozessor nicht mehr ausgelastet wenn von einer Datei gelesen wird. Durch den Einsatz von Threads kann man auch hier den Durchsatz erhohen. Daher gewinnt auch bei 1 Prozessormaschinen das Threadkonzept und ihre Programmierung standig an Bedeutung. 3.2 U berblick uber die Pthreads Die DCE Pthread Software deniert etwa 51 Bibliotheksroutinen. Auf dem KSR1 Rechner besteht die Pthread Software aus etwa 127 Bibliotheksroutinen und C-Preprozessor Makros; zusatzlich gibt es noch etwa 25 Betriebssystemroutinen, die die zugrundeliegenden Mach threads implementieren. Ein Verzeichniss aller Pthread und MACH thread Funktionen auf der KSR1 bendet sich im Anhang C. Im wesentlichen besprechen wir drei Gruppen von Routinen: 1. con ... end: Erzeugung und Beendung von pthreads: pthread_create, pthread_join, (pthread_exit, pthread_detach) 2. atomic ... end: Begin und Ende von mutual exclusion: pthread_mutex_init, pthread_mutex_lock, pthread_mutex_unlock, (pthread_mutex_trylock) 3. await cond ... end: Bedingungsvariablen pthread_cond_init, pthread_cond_wait, pthread_cond_signal (pthread_cond_broadcast) 15 Pthreads, Mutexe und Bedingungsvariablen verfugen uber eine ganze Reihe von sogenannten Attributen: bei Pthreads sind das z.B.: Stack Size Priority Scheduling Strategie bei Mutexen z.B.: spinlimit: busy wait befor block und bei Bedingungsvariablen z.B.: div. Flags. Diese Attribute konnen mit weiteren Routinen erzeugt und modiziert werden. Eine letzte Gruppe von Funktionen dient dem Scheduling von Pthreads: , liefert den Prozessor auf dem der Pthread lauft. pthread_getthread, liefert den zum Pthread zugeh origen Mach thread. pthread_move, verschiebt oder binded einen Pthread zu oder auf einen Prozessor. pthread_procsetinfo, liefert Informationen u ber verfugbare Prozessoren. pthread_getproc 3.2.1 Header Dateien und Denition Moduln Bei der Programmierung mit Pthreads werden in C und in C++ Header-Dateien verwendet, in denen die Datentypen und Funktionen der Pthread Bibliothek deniert sind. Die Datei heit pthread.h und mu in jedem C bzw. C++ Programm mit include angesprochen werden. #include <pthread.h> Bei Modula-2 Programmen werden die Denitionen aus dem pthread.md Modul verwendet. Z.B. FROM pthread IMPORT ... Das pthread.md Modul ist im Anhang B aufgelistet. 3.3 Erzeugung und Beendigung von Pthreads Im folgenden wollen wir das con S ; : : :; Sn end 1 Sprachkonstrukt implementieren. Zur Verfugung stehen uns die Pthread-Funktionen create und join mit den folgenden Spezikationen: 16 int pthread_create (pthread_t *thread, pthread_attr_t attr, pthread_func start_routine, void *arg); Erzeugen eines neuen Pthreads. int pthread_join (pthread_t thread, void **status); Warten auf die Beendigung des angegebenen Pthreads. Die Datentypen pthread_t und pthread_attr_t sowie die Funktionen werden im pthread.h Header-File deklariert. Die start-routine mu eine einparametrige Funktion sein: typedef void *(*pthread_func)(void *) Als Attribut kann man ein default-Attribut verwenden, das ebenfalls in pthread.h deklariert wird. pthread_attr_t pthread_attr_default und schon initialisiert ist. Nur falls man spezielle Eigenschaften der Pthreads haben mochte, mu man sich diese Attribute mit den entsprechenden Funktionen selbst erzeugen. pthread_create nimmt als Eingabe ein Attribut attr, eine Prozedur start_routine und einen Pointer auf einen Parameter arg, der der start_routine beim Aufruf ubergeben wird. pthread_create hat als Ausgabe einen Pthread Identikator thread, (der z.B. in join wieder gebraucht wird) und einen Error code als Funktionswert vom Typ int (=32 bit int). pthread_join nimmt als Eingabe den entsprechenden Pthread Identikator thread und liefert in status den Funktionswert der start_routine zuruck. Der Funktionswert von pthread_join ist wieder ein Error code. Falls die Aufrufe erfolgreich sind, wird als Error code 0 zuruckgegeben, sonst ein Wert < 0. Die Semantik von e= pthread_create (t, a, f, x); e= pthread_join (t, y); ist somit gleich der Semantik von y= f(x); falls f keine globalen Variablen verwendet oder nur dieser eine Pthread erzeugt wird. Zur Implementierung des con Sprachkonstruktes mussen wir nur noch die Statements Si (Si) in Prozeduren Pi packen. void *Pi (void *i) { Si; RETURN NULL; }; Damit ist con S1 ; : : :; Sn end 17 e=pthread_create (t1, a, P1, x); ... e=pthread_create (tn, a, Pn, x); e=pthread_join (t1, y); ... e=pthread_join (tn, y); Wobei t1, ..., tn als Variablen vom Typ pthread_t deklariert werden mussen und x und y vom Typ (void *). Aufgabe 3 Implementierung des Programms aus Aufgabe 1. Wir geben im folgenden ausfurlich eine Losung in C++, in Modula-2 und in FORTRAN an. Bei den weiteren Aufgaben werden wir uns allerdings wieder nur auf die Diskussion der Losungen in einer Programmiersprache beschranken. 3.3.1 Losung in C++ Das C++ Programm ist wie folgt. Die Variablen u, v, w, x werden global deklariert. Die Statements x := u + v + w, u := 3, v := 4 und w := 5 werden in den Funktionen P1, P2, P3, P4 abgearbeitet. Im Hauptprogramm main werden dann die Pthread Variablen deklariert pthread_t t1, t2, t3, t4; und mit pthread_create wird dann die Ausfuhrung der 4 Funktionen angestoen. Anschliessend wird mit pthread_join auf die Beendigung der 4 Funktionen gewartet und danach werden die Werte der Variablen mit cout angezeigt. #include <iostream.h> #include <pthread.h> int x, u, v, w; void *P1 (void * arg) { x = u + v + w; return arg; } void *P2 (void * arg) { u = 3; return arg; } void *P3 (void * arg) { v = 4; return arg; } void *P4 (void * arg) { w = 5; return arg; } int main() { 18 pthread_t t1, t2, t3, t4; long int i1, i2, i3, i4; void *o1, *o2, *o3, *o4; i1 = 1; i2 = 2; i3 = 3; i4 =4; int i = 0; int e; while (i < 10) { i++; u = 0; v = 1; w = 2; e e e e = = = = pthread_create(&t1, pthread_create(&t2, pthread_create(&t3, pthread_create(&t4, e e e e = = = = pthread_join(t1, pthread_join(t2, pthread_join(t3, pthread_join(t4, cout cout cout cout cout << << << << << " u = " v = " w = " x = "\n"; " " " " << << << << pthread_attr_default, pthread_attr_default, pthread_attr_default, pthread_attr_default, P1, P2, P3, P4, &i1); &i2); &i3); &i4); &o1); &o2); &o3); &o4); u; v; w; x; }; return 0; } Das Programm wird mit >CC ppauf3.C -o ppauf3 -para compiliert und mit >ppauf3 ausgefuhrt. Die Ausgabe konnte etwa wie folgt aussehen. Die Ausgabe x = 3, in der ersten Zeile, zeigt, da die Summe u + v + w vor den Zuweisungen an die Variablen berechnet wurde. Die Ausgabe x = 9, in der zweiten Zeile, zeigt, da zu erst die Zuweisungen an v und w ausgefuhrt wurden, dann die Summe u + v + w berechnet wurde und schliesslich die Zuweisung an u ausgef uhrt wurde. Die Ausgabe x = 12, in der dritten Zeile, zeigt, da zu erst alle Zuweisungen an u, v und w ausgefuhrt wurden und dann erst die Summe u + v + w berechnet wurde. u u u u u u u u u u = = = = = = = = = = 3 3 3 3 3 3 3 3 3 3 v v v v v v v v v v = = = = = = = = = = 4 4 4 4 4 4 4 4 4 4 w w w w w w w w w w = = = = = = = = = = 5 5 5 5 5 5 5 5 5 5 x x x x x x x x x x = = = = = = = = = = 3 9 12 3 3 3 12 3 3 3 3.3.2 Losung in Modula-2 Das Modula-2 Programm ist wie folgt. Wie im C++ Programm werden die Variablen u, v, w, x global deklariert. Ebenso werden die Statements x := u + v + w, u := 3, v := 4 und w := 5 in den Funktionen P1, P2, P3, P4 abgearbeitet. Im Hauptprogramm tuwas werden dann die Pthread Variablen mit 19 deklariert und mit pthread_create wird dann die Ausfuhrung der 4 Funktionen angestoen. Anschliessend wird mit pthread_join auf die Beendigung der 4 Funktionen gewartet und danach werden die Werte der Varaiblen mit WriteLong angezeigt. VAR t1, t2, t3, t4: pthread_t; MODULE ppauf3; FROM SYSTEM IMPORT ADDRESS; FROM pthread IMPORT pthread_t, attr_default, pthread_create, pthread_join; FROM StdIO IMPORT WriteS, WriteLong, WriteNl, WriteFlush; VAR u, v, w, x: INTEGER; PROCEDURE P1(VAR arg: ADDRESS): ADDRESS; (* statement 1. *) BEGIN (*1*) x:=u+v+w; RETURN( arg ) (*9*) END P1; PROCEDURE P2(VAR arg: ADDRESS): ADDRESS; (* statement 2. *) BEGIN (*1*) u:=3; RETURN( arg ) (*9*) END P2; PROCEDURE P3(VAR arg: ADDRESS): ADDRESS; (* statement 3. *) BEGIN (*1*) v:=4; RETURN( arg ) (*9*) END P3; PROCEDURE P4(VAR arg: ADDRESS): ADDRESS; (* statement 4. *) BEGIN (*1*) w:=5; RETURN( arg ) (*9*) END P4; PROCEDURE tuwas; (* wie der name schon sagt. *) VAR t1, t2, t3, t4: pthread_t; i1, i2, i3, i4: INTEGER; o1, o2, o3, o4: INTEGER; BEGIN (*1*) (*initialize*) i1:=1; i2:=2; i3:=3; i4:=4; u:=0; v:=1; w:=2; (*2*) (*start procedures *) IF pthread_create(t1,attr_default,P1,i1) < 0 IF pthread_create(t2,attr_default,P2,i2) < 0 IF pthread_create(t3,attr_default,P3,i3) < 0 IF pthread_create(t4,attr_default,P4,i4) < 0 (*3*) (*collect procedures *) IF pthread_join(t1,o1) < 0 THEN (*...*) END; IF pthread_join(t2,o2) < 0 THEN END; IF pthread_join(t3,o3) < 0 THEN END; IF pthread_join(t4,o4) < 0 THEN END; (*4*) (*finalize*) WriteS(" u = "); WriteLong(u,0); WriteS(" v = "); WriteLong(v,0); WriteS(" w = "); WriteLong(w,0); 20 THEN THEN THEN THEN END; END; END; END; WriteS(" x = "); WriteLong(x,0); WriteNl; WriteFlush; (*9*) END tuwas; VAR i: INTEGER; BEGIN FOR i:=1 TO 10 DO tuwas; END; END ppauf3. Das Programm wird mit >make ppauf3 :mtc ppauf3.mi :cc ppauf3.c -para compiliert und mit >ppauf3 ausgefuhrt. Das zugehorige Makefile bendet sich im Anhang. Die Ausgabe konnte etwa wie folgt aussehen. Die Ausgabe x = 3, in der ersten Zeile, zeigt, da die Summe u + v + w vor den Zuweisungen an die Variablen berechnet wurde. Die Ausgabe x = 9, in der zweiten Zeile, zeigt, da zu erst die Zuweisungen an v und w ausgefuhrt wurden, dann die Summe u + v + w berechnet wurde und schliesslich die Zuweisung an u ausgefuhrt wurde. Die Ausgabe x = 6, in der dritten Zeile, zeigt, da zu erst die Zuweisung an u ausgefuhrt wurde und dann die Summe u + v + w berechnet wurde. Anschlieed wurden die Zuweisungen an v und w ausgefuhrt. u u u u u u u u u u = = = = = = = = = = 3 3 3 3 3 3 3 3 3 3 v v v v v v v v v v = = = = = = = = = = 4 4 4 4 4 4 4 4 4 4 w w w w w w w w w w = = = = = = = = = = 5 5 5 5 5 5 5 5 5 5 x x x x x x x x x x = = = = = = = = = = 3 9 6 3 3 3 3 3 3 3 3.3.3 Losung in FORTRAN Das FORTRAN Programm ist wie folgt. Wie im C++ und im Modula-2 Programm werden die Variablen u, v, w, x global deklariert, das bedeutet, da sie in einem common-Block abgelegt werden. Die Statements x = u + v + w, u = 3, v = 4 und w = 5 werden in den (externen) Funktionen if1, if2, if3, if4 abgearbeitet (siehe weiter unten). Die default-Attribute f ur die Pthreads werden aus dem externen common-Block pthread_defaults genommen. Im Hauptprogramm werden die Pthread Variablen it1, it2, it3, it4 als Integer deklariert. In einer do-Loop wird dann 10 mal mit pthread_create die Ausf uhrung der 4 Funktionen angestoen und mit pthread_join auf die Beendigung der 4 Funktionen gewartet. Anschlieend werden dann die Werte der Varaiblen mit print* angezeigt. program ppauf3 integer i1, i2, i3, i4, istat, i, it1, it2, it3, it4, X ij, iad, imd, icd, ibd, u, v, w, x common /data/ u, v, w, x common /pthread_defaults/iad, imd, icd, ibd external if1, if2, if3, if4 21 do 10 i=1,10 u=0 v=1 w=2 call pthread_create(it1,iad,if1,i1,istat) if (istat .ne. 0) print*, 'istat= ',istat call pthread_create(it2,iad,if2,i2,istat) if (istat .ne. 0) print*, 'istat= ',istat call pthread_create(it3,iad,if3,i3,istat) if (istat .ne. 0) print*, 'istat= ',istat call pthread_create(it4,iad,if4,i4,istat) if (istat .ne. 0) print*, 'istat= ',istat call pthread_join(it1,ij,istat) if (istat .ne. 0) print*, 'istat= call pthread_join(it2,ij,istat) if (istat .ne. 0) print*, 'istat= call pthread_join(it3,ij,istat) if (istat .ne. 0) print*, 'istat= call pthread_join(it4,ij,istat) if (istat .ne. 0) print*, 'istat= 10 ',istat ',istat ',istat ',istat print*, 'u = ',u, 'v = ',v, 'w = ',w, 'x = ',x continue end Die Statement-Funktionen werden hier als externe Funktionen deklariert und sind z.B. in der Datei ppauf3sub.f enthalten. integer function if1(iarg) integer iarg, u, v, w, x common /data/u, v, w, x x=u+v+w return end integer function if2(iarg) integer iarg, u, v, w, x common /data/u, v, w, x u=3 return end integer function if3(iarg) integer iarg, u, v, w, x common /data/u, v, w, x v=4 return end integer function if4(iarg) integer iarg, u, v, w, x common /data/u, v, w, x w=5 return end Das Programm wird mit f77 -u -para -o a3 ppauf3.f ppauf3sub.f compiliert und mit 22 >a3 ausgefuhrt. Die Ausgabe konnte etwa wie folgt aussehen. Die Ausgabe x = 3, in der ersten Zeile, zeigt, da die Summe u + v + w vor den Zuweisungen an die Variablen berechnet wurde. Die Ausgabe x = 12, in der zweiten Zeile, zeigt, da zu erst die Zuweisungen an u, v und w ausgefuhrt wurden und dann die Summe u + v + w berechnet wurde. Die Ausgabe x = 6, in der dritten Zeile, zeigt, da zu erst die Zuweisung an u ausgefuhrt wurde und dann die Summe u + v + w berechnet wurde. Anschlieed wurden die Zuweisungen an v und w ausgefuhrt. u u u u u u u u u u = = = = = = = = = = 3 3 3 3 3 3 3 3 3 3 v v v v v v v v v v = = = = = = = = = = 4 4 4 4 4 4 4 4 4 4 w w w w w w w w w w = = = = = = = = = = 5 5 5 5 5 5 5 5 5 5 x x x x x x x x x x = = = = = = = = = = 3 12 6 3 3 3 3 3 3 3 Damit ist die Besprechung der Losungen der Aufgabe in den Programmiersprachen C++, Modula-2 und FORTRAN beendet. Im Folgenden werden wir nur wieder eine Sprache verwenden. 3.4 Beginn und Ende von Mutual exclusion Im folgenden wollen wir das Sprachkonstrukt atomic S ; : : :; Sn end 1 implementieren. Zur Verfugung stehen uns die Pthread-Funktionen lock und unlock mit den folgenden Spezikationen: int pthread_mutex_lock (pthread_mutex_t* mux); locken eines mutexes. int pthread_mutex_unlock(pthread_mutex_t* mux); unlocken eines mutexes. int pthread_mutex_init (pthread_mutex_t* mux, pthread_mutexattr_t attr); Initialisierung eines mutex mit gew. Attributen. Die Datentypen pthread_mutex_t und pthread_mutexattr_t sowie die Funktionen werden im Header File pthread.h deklariert. Als Attribut kann man ein default-Attribut verwenden. pthread_mutexattr_t pthread_mutexattr_default; und schon initialisiert ist. Nur falls man spezielle Eigenschaften der Mutexe haben mochte, mu man sich diese Attribute mit den entsprechenden Funktionen selbst erzeugen. Im Gegensatz zu Pthreads mussen Mutexe vor der Benutzung initialisiert werden. Mutexe dienen der Serialisierung des Zugangs verschiedener Pthreads zu atomaren Bereichen. lock beginnt einen atomaren Bereich und unlock verlat den atomaren Bereich. Falls ein Pthread sich in einem 23 atomaren Bereich bendet (zwischen lock und unlock) wird ein anderer Pthread, der einen lock verlangt, blockiert. Nach Ausfuhrung von unlock erhalt genau ein anderer Pthread, der auf einen lock wartet, den lock. Welcher das ist, wird vom Betriebssystem festgelegt. Falls die Aufrufe erfolgreich sind, wird als Error code 0 zuruckgegeben, sonst ein Wert < 0. Im Gegensatz zu unserer ursprunglichen Denition vom atomaren Bereich, bieten die Mutexe die Freiheit, verschiedene kritische Bereiche zu denieren, die sich uberlappen konnen. Einem atomaren Bereich (in der ursprunglichen Denition) entsprache die Verwendung von genau einem Mutex fur alle kritischen Bereiche. Bei der Implementierung des atomic Sprachkonstrukts haben wir somit die Freiheit, verschiedene Mutexe zu verwenden und somit verschiedene kritische Bereiche zu denieren. Damit ist atomic S1 ; : : :; Sn end pthread_mutex_t globalmux; e= pthread_mutex_init (&globalmux, pthread_mutexattr_default); e= pthread_mutex_lock (&globalmux); S1; ...; Sn; e= pthread_mutex_unlock(&globalmux); Diese Implementierung ist allerdings nicht ganz korrekt, z.B. mute con z := x + y; atomic x := 1; y := 2 end end als con atomic z := x + y end; atomic x := 1; y := 2 end end implementiert werden, damit die Zwischenzustande wirklich nicht sichtbar sind. Wichtig: Bei der Implementierung besteht also zusatzlich die Aufgabe, den auszuschlieenden Gegenpart zu identizieren und mit dem Mutex auszuschlieen. Aufgabe 4 Implementiere das folgende Program mit Hilfe von Mutexen. u := 0; v := 1; w := 2; con atomic x := u + v + w end; atomic u := 3; v := 4; w := 5; end end; Losungsvorschlag in C++. Die Statements werden hier auf die zwei Funktionen P1 und P2 aufgeteilt. Innerhalb dieser Funktionen erfolgt jeweils ein lock bevor das Statement ausgefuhrt wird. Und nach der Ausfuhrung wird der lock wieder freigegeben. Im Hauptprogramm wird (nach den Deklarationen) zu erst der Mutex mux initialisiert und dann werden wie schon bekannt, die Pthreads gestartet. Nach der Beendigung der Pthreads werden dann wieder die Ergebnisse ausgegeben. #include <iostream.h> #include <pthread.h> int x, u, v, w; pthread_mutex_t mux; void *P1 (void * arg) { int e; e = pthread_mutex_lock(&mux); 24 } x = u + v + w; e = pthread_mutex_unlock(&mux); return arg; void *P2 (void * arg) { int e; e = pthread_mutex_lock(&mux); u = 3; v = 4; w = 5; e = pthread_mutex_unlock(&mux); return arg; } int main() { pthread_t t1, t2; long int i1, i2; void *o1, *o2; i1 = 1; i2 = 2; int i = 0; int e; e = pthread_mutex_init(&mux, pthread_mutexattr_default); while (i < 10) { i++; u = 0; v = 1; w = 2; e = pthread_create(&t1, pthread_attr_default, P1, &i1); e = pthread_create(&t2, pthread_attr_default, P2, &i2); e = pthread_join(t1, &o1); e = pthread_join(t2, &o2); cout cout cout cout cout } << << << << << " u = " v = " w = " x = "\n"; " " " " << << << << u; v; w; x; }; return 0; Die Ausgabe konnte etwa wie folgt aussehen. Es kommen jetzt nur die beiden Werte x = 3 und x = 12 vor. Die Ausgabe x = 3 zeigt, da die Summe u + v + w vor den Zuweisungen an die Variablen berechnet wurde. Die Ausgabe x = 12 zeigt, da zu erst die Zuweisungen an die Variablen ausgefuhrt wurden und dann die Summe u + v + w berechnet wurde. u u u u u u u u u u u u u u = = = = = = = = = = = = = = 3 3 3 3 3 3 3 3 3 3 3 3 3 3 v v v v v v v v v v v v v v = = = = = = = = = = = = = = 4 4 4 4 4 4 4 4 4 4 4 4 4 4 w w w w w w w w w w w w w w = = = = = = = = = = = = = = 5 5 5 5 5 5 5 5 5 5 5 5 5 5 x x x x x x x x x x x x x x = = = = = = = = = = = = = = 3 3 3 12 3 3 3 3 12 3 12 3 3 3 25 Aufgabe 5 Summiere die Elemente eines Arrays mit drei parallel arbeitenden Funktionen. Benutze eine globale Variable sum, die am Ende die Summe enthalten soll. Wobei S(i) VAR slocal : INTEGER; b; e; j: INTEGER; slocal := 0; b := b m3 c (i ; 1) + 1; e := b m3 c i; if i = 3 then e := m end; for j := b to e do slocal := slocal + A[j]; end; atomic sglobal := sglobal + slocal end; Losungsvorschlag in C++. Der Programmteil S(i) wird von der Funktion sfunc ausgefuhrt. In sfunc werden zunachst die Arbeitsbereiche (abangig von i) bestimmt (Berechnung von e und b). Dann wird lokal die Summe uber den entsprechenden Bereich gebildet und anschliessend (mit lock auf den Mutex pmt) die lokale Summe zu der globalen Summe hinzuaddiert. Im Hauptprogramm wird zun achst der Mutex pmt und das Array a initialisiert (mit Bildung einer Kontrollsumme). Dann werden 7 Pthreads gestartet (in dem Feld ia wird der Arbeitsindex i mitgeteilt) und wieder auf ihre Beendigung gewartet. D.h. implementiere VAR A: ARRAY [1..m] OF INTEGER; sglobal : INTEGER; (* Initialisiere A *) sglobal := 0; con S(1); S(2); S(3) end; print(sglobal ); #include <iostream.h> #include <pthread.h> const int ANZ_PTHREADS=7; const wsize=2000; long int a[wsize]; long int sum=0; long int xsum=0; pthread_mutex_t pmt; pthread_t pt[ANZ_PTHREADS]; void *sfunc(void *arg) { long int lsum=0; long int j, b, e, i =*((long int *)arg); b=(wsize/ANZ_PTHREADS)*(i-1); e=(wsize/ANZ_PTHREADS)*i; if (i==ANZ_PTHREADS) e=wsize; cout << " i=" << i << " b=" << b << " e=" << e << "\n"; for ( j=b; j<e; j++) lsum+=a[j]; } pthread_mutex_lock(&pmt); sum+=lsum; pthread_mutex_unlock(&pmt); return arg; main(){ void* return_pointer[ANZ_PTHREADS]; long int ia[ANZ_PTHREADS]; long int j, l; pthread_mutex_init(&pmt,pthread_mutexattr_default); for ( j=0; j<wsize; j++) { a[j]=j; xsum+=j; 26 } for ( l=0; l<ANZ_PTHREADS; l++) { ia[l]=l+1; pthread_create(&pt[l], pthread_attr_default, sfunc, (void *) &ia[l]); } for ( l=0; l<ANZ_PTHREADS; l++) { pthread_join(pt[l], &return_pointer[l]); } cout << "Summe ist " << sum << "\n"; cout << "Summe soll " << xsum << "\n"; return 0; } Die Ausgabe konnte etwa wie folgt aussehen. Die Anzeige von i = zeigt, da die Arbeitsschritte in der Reihenfolge 1; 4; 3; 2; 6;7 und 5 begonnen wurden. Das Ergebnis ist wie erwartet. i=1 b=0 e=285 i=4 b=855 e=1140 i=3 b=570 e=855 i=2 b=285 e=570 i=6 b=1425 e=1710 i=7 b=1710 e=2000 i=5 b=1140 e=1425 Summe ist 1999000 Summe soll 1999000 3.5 Bedingungsvariablen und Mutual exclusion Zur Implementierung des letzten Sprachkonstrukts await B then S ; : : :; Sn end 1 benotigen wir einige Vorbereitungen. Zur Verfugung stehen uns die Pthread-Funktionen wait und signal mit den folgenden Spezikationen: int pthread_cond_wait (pthread_cond_t* cond, pthread_mutex_t* mutex); Warten auf eine Bedingung bez. eines Mutexes. int pthread_cond_signal(pthread_cond_t* cond); Signalisieren einer Bedingung. int pthread_cond_init (pthread_cond_t* cond, pthread_condattr_t attr); Initialisieren einer Bedingungsvariablen. int pthread_cond_broadcast (pthread_cond_t* cond); Signalisieren einer Bedingung an alle wartenden Pthreads. Die Datentypen pthread_cond_t und pthread_condattr_t sowie die Funktionen sind im Header File pthread.h deklariert. Als Attribut kann man ein default-Attribut, verwenden 27 pthread_condattr_t pthread_condattr_default; das schon initialisiert ist. Nur falls man spezielle Eigenschaften der Bedingungsvariablen haben mochte, mu man sich diese Attribute mit den entsprechenden Funktionen selbst erzeugen. Die Bedingungsvariablen mussen wie Mutexe vor der Benutzung initialisiert werden. Falls die Funktionsaufrufe erfolgreich sind, wird als Error code 0 zuruckgegeben, sonst ein Wert < 0. Eine Variante von wait ist das `timed wait', bei dem eine Zeit mit angegeben wird, wie lange maximal auf das Eintreten einer Bedingung gewartet werden soll. Bedingungsvariablen dienen dazu, den Eintritt bestimmter Bedingungen abzuwarten bzw. die Erfullung oder den Eintritt einer Bedingung anzuzeigen. Bedingungsvariablen sind an Mutexe gekoppelt. Beim Warten auf eine Bedingung wird gleichzeitig der lock auf den gekoppelten Mutex freigegeben (zuvor mu naturlich ein lock auf diesen Mutex erfolgt sein). Umgekehrt sollte vor dem Signal auf die Bedingung ein unlock auf den gekoppelten Mutex erfolgen, damit nach dem wait auch der lock auf den Mutex wieder vorhanden ist. Ist kein unlock vor dem signal erfolgt, wartet wait wieder bis der lock auf den Mutex moglich ist. Das Problem bei der Implementierung des await-Statements ist, da die Bedingung ein beliebiger Boolescher Ausdruck sein darf, wir aber nur eine Boolesche Variable zur Verfugung haben. Die Grundidee bei der Losung ist zunachst fur jede semantisch verschiedene Bedingung eine eigene Bedingungsvariable einzufuhren und dann den Test, ob die Bedingung wahr ist an alle (wichtigen) Stellen im Programm zu verlegen, an denen die Bedingung wahr geworden sein konnte und dort, falls ja, mit signal (oder broadcast) den Eintritt der Bedingung zu signalisieren. Damit ergibt sich das folgende Schema: await B then S1 ; : : :; Sn end // globale Variablen: pthread_mutex_t globalmux; pthread_cont_t condb; /* init */ e= pthread_mutex_init (globalmux, pthread_mutexattr_default) e= pthread_cond_init (condb, pthread_condattr_default) /* waiter */ e= pthread_mutex_lock (globalmux); while (! B) e= pthread_cond_wait (condb, globalmux); S1; ...; Sn; e= pthread_mutex_unlock(globalmux); /* tester */ // B koennte wahr werden if (B) pthread_cond_signal(condb); Diese Losung ist allerdings nicht vollstandig, da z.B. signal vor wait ausgefuhrt werden konnte, was den einen Pthread auf immer blockieren wurde (sogenannte \lost signals"). Eine Losung zu diesem Problem werden wir im nachsten Abschnitt besprechen. 3.6 Semaphore Um die Verwendung von Mutexen und Bedingungsvariablen deutlicher zu machen, wollen wir eine allgemeinere Variante von Mutexen besprechen und implementieren. Dieses allgemeinere Konzept sind die sog. Semaphoren (Signalmasten, Leuchtturme), die im Gegensatz zu Mutexen nicht nur die Zustande blocked und unblocked einnehmen konnen, sondern beliebig viele (abzahlbar viele). Semaphoren wurden zuerst entwickelt, um das Problem der \lost signals" zu losen. Es gibt zwei Operationen `P' und `V' auf Semaphoren `sem': 28 P(sem): entspricht dem lock bei Mutexen, wobei mitgezahlt wird, der wievielte lock-Versuch es ist. V(sem): entspricht dem unlock bei Mutexen, wobei ebenfalls mitgezahlt wird, wieviele unlocks stattgefunden haben. Es wird sichergestellt, da die Anzahl der locks der Anzahl der unlocks plus ein Initialisierungswert ist. #P #V + init: Das Zahlen der P- und V-Operationen kann mit einer Variablen s #V +init ; #P und der Bedingung s 0 erledigt werden. Damit erhalten wir folgende (abstrakte) Implementierung der Operationen: P(sem): await s > 0 then s := s ; 1 end; V(sem): atomic s := s + 1 end; Man sieht, da jetzt keine V-Operation (signal) verloren geht: auch wenn V vor P ausgefuhrt wird, ist sichergestellt, da P ohne weitere Verzogerung beendet wird. Ein Spezialfall der Semaphoren sind die Mutexe, die wir schon kennen. Sie sind sog. binare Semaphoren, bei denen der Zahler s nur die Werte 0 und 1 annimmt. Mutexe werden meist als P(bin): await b > 0 then b := b ; 1 end V(bin): atomic b := b + 1 end implementiert, was zu den \lost signals" fuhrt. Die korrekte Fassung fur V ware V(bin): await b < 1 then b := b + 1 end; Ein Problem bei der Implementierung von V(sem) bleibt naturlich, da s in der Regel als Integer nur Werte bis 231 bzw. 263 annehmen kann. Korrekterweise mute daher auch hier die Implementierung von V(sem) sein V(sem): await s < 231 then s := s + 1 end. Die Implementierung von Semaphoren mit Hilfe von Mutexen und Bedingungsvariablen gibt uns auch ein erstes Beispiel, wie das allgemeine await Statement zu implementieren ist. Implementierungsskizze: P(sem): lock(sem.mux); WHILE sem.s <= 0 DO "waiting:=true" wait(sem.pos, sem.mux) END; DEC(sem.s) unlock(sem.mux): V(sem): lock(sem.mux): INC(sem.s); IF some are waiting THEN signal(sem.pos) END; unlock(sem.mux); Das Prinzip zur Implementierung von await wird an diesem Beispiel auch deutlich: aus \await s > 0 ..." wird IF not (s>0) THEN wait(sem.pos) ... 29 Nach dem s > 0 geworden sein kann, namlich nach INC(s) in V(sem), wird bei Bedarf signalisiert, da s nun positiv ist INC(s); ...; signal(sem.pos); Bemerkung: Das Zahlen der wartenden Pthreads konnte man einsparen, falls in V(sem) die SignalOperation bedingungslos nach jedem INC(s) ausgefuhrt wurde. ) u.U. schlechtere Performance 3.6.1 Semaphoren Implementierung Es folgt ein Listing einer vollstandigen Implementierung von Semaphoren mit Hilfe von Pthread Funktionen. Die Variablen der Klasse \sema" haben folgende Bedeutung. s: bezeichnet den Semaphorenzahler, wie beschrieben. mux: bezeichnet einen Mutex, der den Zugang zu den Semaphor-Variablen serialisiert, d.h. garantiert, da die Modikationen als atomare Operationen ausgefuhrt werden. pos: Bedingungsvariable, die signalisiert, da s > 0 ist, und auf die gewartet wird, falls s 0 ist. del: Anzahl der wartenden Pthreads. Notwendig um festzustellen, ob eine Signal-Operation erforderlich ist. Wenn sich signal darauf beschrankte, genau einen Pthread \aufzuwecken", wahre das Statement IF s <= 0 THEN ... bereits ausreichend. So ist aber das Statement WHILE s <= 0 DO erforderlich um sicherzustellen, da nicht ein anderer Thread den Wert von s in der Zwischenzeit verandert hat. // semaphore class #include <pthread.h> #include "TYPES.h" #include "ERROR.h" ////////////////////////////////////////////////////////////////////////// // Interface class sema ////////////////////////////////////////////////////////////////////////// class sema { private: INTEGER init; INTEGER s; INTEGER del; pthread_mutex_t mux; pthread_cond_t pos; public: sema(); sema(INTEGER); ~sema(); void P(); void V(); }; ////////////////////////////////////////////////////////////////////////// // Implementierung der Methoden class sem ////////////////////////////////////////////////////////////////////////// sema::sema(INTEGER i = 0) 30 { if (pthread_mutex_init(&mux, pthread_mutexattr_default) < 0) { ERROR(fatal, "sema init: Cannot initialize sem.mux"); } if (pthread_cond_init(&pos, pthread_condattr_default) < 0) { ERROR(fatal, "sema init: Cannot initialize sem.pos"); } if (i >= 0) { s = i; } else { s = 0; } init = s; del = 0; } sema::~sema() { if (s != init) { GWRITE( s-init ); ERROR(fatal, "sema destruction: pending operations "); } } void sema::P() { if (pthread_mutex_lock(&mux) < 0) { ERROR(fatal, "P: Cannot lock sem.mux"); } while (s <= 0) { del++; if (pthread_cond_wait(&pos, &mux) < 0) { ERROR(fatal, "P: Cannot wait on sem.pos"); } del--; } s--; if (pthread_mutex_unlock(&mux) < 0) { ERROR(fatal, "seminit: Cannot unlock sem.mux"); } } void sema::V() { if (pthread_mutex_lock(&mux) < 0) { ERROR(fatal, "P: Cannot lock sem.mux"); } s++; if (del > 0) { if (pthread_cond_signal(&pos) < 0) { ERROR(fatal, "P: Cannot signal sem.pos"); } } if (pthread_mutex_unlock(&mux) < 0) { ERROR(fatal, "seminit: Cannot unlock sem.mux"); } } 31 3.6.2 Barriers mit Semaphoren Aufgabe 6 Implementierung des Barrierkonstrukts mit Hilfe von Semaphoren. Barrier ist ein Sprachkonstrukt, mit dem man warten kann, bis sich eine vorgegebene Anzahl von Teilnehmern angemeldet haben und das dann terminiert. Eine (inkorrekte, warum ?) Losung mit await ware etwa VAR b; n: INTEGER; b := 0; n :=AnzahlTeilnehmer; Barrier: atomic b := b + 1; end; await b n then b := 0 end Tip: Was passiert, wenn ein Prozess ein zweites Mal die Barrier benutzt, wahrend die ubrigen Prozesse noch den then-Zweig ausfuhren? Eine korrekte Losung mit Semaphoren ist etwa VAR bs, bl: Semaphore; VAR b, n: INTEGER; seminit(bs, 0); seminit(bl, 1); PROCEDURE Barrier(); VAR i: INTEGER; BEGIN P(bl); b:= b + 1; IF b < n THEN V(bl); P(bs); ELSE b:=0; V(bl); FOR i:=2 TO n DO V(bs) END; END; END Barrier; Aufgabe: Verbessern Sie die erste Losung mit await. Ist die folgende Losung korrekt ? VAR b; n: INTEGER; b := 0; n :=AnzahlTeilnehmer; Barrier: VAR l: INTEGER; atomic b := b + 1; l := b; end; await b n end; if l = n then b := 0 end; await b = 0 end; Aufgabe 7 Ist die folgende Losung korrekt ? VAR bs, bl: Semaphore; VAR b, n: INTEGER; seminit(bs, 0); seminit(bl, 1); PROCEDURE Barrier(); BEGIN P(bl); b:= b + 1; IF b < n THEN V(bl); P(bs); 32 ELSE b:=0; V(bl); END; V(bs); END Barrier; Was sind die Vor- und Nachteile der beiden Losungen mit Semaphoren ? 3.6.3 await mit Semaphoren Die Implementierung von await mit Semaphoren bringt keine Verbesserung gegenuber der obigen Losung mit Mutexen und Bedingungsvariablen. Es mussen auf jeden Fall die Anzahl der wartenden Threads gezahlt werden um die richtige Anzahl der V Operationen ausfuhren zu konnen. await B then S1; : : :; Sn end; VAR sb, mb: semaphore; nb: INTEGER; seminit(sb, 0); seminit(mb, 0); nb:=0; Waiter: P(mb); WHILE not B DO INC(nb); V(mb); P(sb); P(mb); DEC(nb); END; S1; ...; Sn; V(mb); Tester: P(mb); IF B THEN FOR i:=1 TO nb DO V(sb) END; END; V(mb); 3.7 Das Bounded Buer Problem Mit Hilfe von Semaphoren wollen wir das folgende Problem losen: n Produzenten erzeugen Daten, die an m Konsumenten weitergegeben werden sollen. Zur Weitergabe der Daten dient ein Puer, der in der Lage sein soll, k Daten zu speichern. (Wobei 1 m, 1 k). Falls der Puer voll ist, sollen die Produzenten warten, bis wieder Platz ist und falls der Puer leer ist, sollen die Konsumenten warten, bis wieder Daten vorhanden sind. Falls keine Arbeit mehr anliegt, sollen die Produzenten und die Konsumenten terminieren. Der Puer wird als Ringpuer implementiert, siehe Abbildung 3.2. Die Variablen front und rear zeigen jeweils auf den nachsten vollen Platz zum Lesen des Puers bzw. auf den nachsten freien Platz zum Schreiben. Falls der Puer nicht voll ist, kann ein Produzent in den Puer schreiben und falls der Puer nicht leer ist, kann ein Konsument aus dem Puer lesen. Die Synchronisation mu somit sicherstellen, da der Produzent wartet, falls der Puer voll ist und der Konsument wartet, falls der Puer leer ist. D.h. sei x die Anzahl der Elemente im Puer, dann mu immer gelten 1 x k. Insbesondere mu gelten x k, damit der Produzent eine Nachricht schreiben kann und es mu gelten 1 x, damit der Konsument eine Nachricht lesen kann. Damit ergibt sich folgender Ansatz: 33 z 1. used }| x x {z rear . free }| . . . {z front x used }| x x { Tabelle 3.2: Ringpuer Producer (i): LOOP (* erzeuge *) await x < k then (*put to buffer*); x:= x + 1 end END Consumer (i): LOOP await x >= 1 then (*pop from buffer*); x:= x - 1 end (* verbrauchen *) END. zusammen mit dem Hauptprogramm x:=0; con Producer (1); ...; Producer (n); Consumer (1); ...; Consumer (m) end; Der Nachteil dieser Losung ist, da Produzenten und Konsumenten nur abwechselnd auf den Puer zugreifen konnen. Eine Abhilfe schat die Verwendung von zwei Zahlern, z.B. e und f (empty und full), die wie folgt verwendet werden. Wobei e den Anfangswert k und f den Anfangswert 0 haben mu. Producer (i): LOOP (* erzeuge *) await e >= 1 then e:= e - 1 end; (* put to buffer *) atomic f:= f + 1 end END; Consumer (i): LOOP await f >= 1 then f:= f - 1 end; (* pop from buffer *) atomic e:= e + 1 end; (* verbrauchen *) END; Zusatzlich mu noch sichergestellt werden, da nicht mehrere Produzenten bzw. mehrere Konsumenten gleichzeitig auf den Puer zugreifen. D.h. (* put to buffer *) mu auf Produzentenseite atomar sein und (* pop from buffer *) mu auf Konsumentenseite atomar sein. Die beiden await Statements sind nun schon direkt mit Semaphoren zu implementieren: Producer: await e >= 1 then e:= e - 1 end Consumer: atomic e:= e + 1 end wird zu Producer: P(empty) Consumer: V(empty) 34 wobei empty als Semaphor mit k initialisiert wird. Und Consumer: await f >= 1 then f:= f - 1 end Producer: atomic f:= f + 1 end wird zu Consumer: P (full) Producer: V (full) wobei full als Semaphor mit 0 initialisiert wird. Damit ergibt sich die folgende Losungsskizze Producer (i): LOOP (* erzeugen *) P(empty); atomic_producer (* put to buffer *) end; V(full); END Consumer (i): LOOP P(full): atomic_consumer (* get from buffer *) end; V (empty); (* verbrauchen *) END; Zur Implementierung ist noch die Frage zu klaren, wie die Arbeit verteilt werden soll. Wir nehmen an, da am Anfang feststeht, wieviel Arbeit insgesamt zu tun ist. Diese Arbeit wird dann zu gleichen Teilen auf die Produzenten bzw. die Konsumenten aufgeteilt, wobei jeweils der 1. die verbleibenden Reste bearbeiten soll. (Unter der Annahme, da fur jeden mindestens eine Arbeit anfallt.) Falls die Arbeit dynamisch anfallt, mute man Synchronisationspunkte einfuhren, nach denen jeweils die Arbeit neu verteilt wird. // Producer-Consumer Problem mit Semaphoren (Bounded Buffer) /***************************************************************************** * pcsem Februar 96 Parallelrechnerpraktikum * * ---------------------------- * * * * Producer-Consumer, Bounded Buffer * * * * Implementierung des Producer-Consumer-Problems mit Hilfe von Semaphoren. * * * * * * * *****************************************************************************/ #include <pthread.h> #include "sema.h" #define MAXPROD 3 #define MAXCONS 7 #define MAXWORK 100000 typedef unsigned int uint; class BoundedBuffer 35 { private: int *b; uint siz; uint anf, end; sema e, f; sema w, r; // Der Ringbuffer. // Groesse des Buffers. // Zeiger auf Anfang und Ende des Buffers. // Semaphoren fuer Buffer leer bzw voll. // Semaphoren fuer atomares Schreiben/Lesen. BoundedBuffer(const BoundedBuffer& bb) = 0; public: BoundedBuffer(uint s =16); // ~BoundedBuffer(); void write(int v); int read(); }; BoundedBuffer::BoundedBuffer(uint s =16) : b(new int[s]), siz(s), anf(0), end(0), e(s), f(0), w(1), r(1) { if (b == NULL) { cerr << "Ringbuffer: Konnte keinen Speicher allokieren" << endl; exit(1); } } void BoundedBuffer::write(int v) { e.P(); // Platz im Buffer reservieren. w.P(); // atomares Schreiben. b[end]= v; end= (end+1) % siz; w.V(); } int { f.V(); // ein gefuellter Platz mehr. BoundedBuffer::read() int result; f.P(); // Platz im Buffer loeschen. r.P(); // atomares Lesen. result= b[anf]; anf= (anf+1) % siz; r.V(); e.V(); } // ein freier Platz mehr. return result; ///////////////////////////////////////////////////////////// ////// --- globale Variablen --////// ///////////////////////////////////////////////////////////// BoundedBuffer bb(MAXBUF); 36 ///////////////////////////////////////////////////////////// ////// --Producer und Consumer --////// ///////////////////////////////////////////////////////////// ADDRESS Producer(ADRESS x) { INTEGER j, k; INTEGER maxtodo, t; j= *((INTEGER *) x); SWRITE("Producer start", 15L); GWRITE(j); BLINES(0); maxtodo = MAXWORK / MAXPROD; t = MAXWORK % MAXPROD; if (j == 1) maxtodo = maxtodo + t; // my number. // Arbeit Aufteilen. // Der 1. macht a bisle mehr. for (int cnt= 0; cnt < maxtodo; cnt++) { k = 0; for (int i= 1; i < j; i++) // Work berechnen. k+= i*j; bb.write(k); // und in den Buffer schreiben. } k = 0; SWRITE("Producer done ", 14L); GWRITE(j); BLINES(0); return (ADDRESS)k; }; ADDRESS Consumer(ADDRESS x) { INTEGER j, k= 0; INTEGER maxtodo, t; j= *((INTEGER *) x); SWRITE("Consumer start", 15L); GWRITE(j); BLINES(0); maxtodo = MAXWORK / MAXCONS; t = MAXWORK % MAXCONS; if (j == 1) maxtodo = maxtodo + t; // Arbeit Aufteilen. // Der 1. macht a bisle mehr. for (int cnt= 0; cnt < maxtodo; cnt++) k+= bb.read(); k = 0; SWRITE("Consumer done", 14L); GWRITE(j); BLINES(0); return (ADDRESS)k; }; ///////////////////////////////////////////////////////////// ////// --M A I N --////// ///////////////////////////////////////////////////////////// struct S_1 { 37 pthread_t A[MAXPROD]; }; struct S_2 { pthread_t A[MAXCONS]; }; struct S_3 { ADDRESS A[MAXPROD]; }; struct S_4 { ADDRESS A[MAXCONS]; }; int main() { struct struct struct struct int i; S_1 S_2 S_3 S_4 pt; ct; pi; ci; for (i= 0; i < MAXPROD; i++) { pi.A[i]= (ADDRESS)i; if (pthread_create(&pt.A[i], pthread_attr_default, &Producer, &pi.A[i]) < 0) ERROR(severe, "Cannot create Producer.", 23L); } for (i= 0; i < MAXCONS; i++) { ci.A[i]= (ADDRESS)i; if (pthread_create(&ct.A[i], pthread_attr_default, &Consumer, &ci.A[i - 1]) < 0) ERROR(severe, "Cannot create Consumer.", 23L); } for (i= 0; i < MAXPROD; i++) if (pthread_join(pt.A[i], &pi.A[i]) < 0) ERROR(severe, "Cannot join Producer.", 21L); for (i= 0; i < MAXCONS; i++) if (pthread_join(ct.A[i - 1], &ci.A[i - 1]) < 0) ERROR(severe, "Cannot join Consumer.", 21L); return 0; }; Damit ist die Besprechung des Beispiels beendet. 38 Kapitel 4 Distributed Memory Computer In Multiprozessorsystemen tritt grundsatzlich das Problem der Datenubermittlung zwischen den einzelnen Prozessoren auf. Mehrprozessorsysteme mit gemeinsamen Hauptspeicher tauschen Informationen erfolgt die uber diesen Speicher aus, wobei die Zugrie darauf mit Hilfe des Betriebssystems synchronisiert werden. Mehrprozessorsysteme ohne gemeinsamen Hauptspeicher (z.B. Transputer, Workstation-Cluster, : : :) benoetigen spezielle Leitungen zur Kommunikation. (Links bei Transputern, Ethernet und TCP/IP bei Workstation-Clustern, Dateien (bzw. pipes) bei Unixprozessen). Die Programmierung solcher Systeme erfordert daher die Denition von geeigneten Leitungen und U bertragungsprotokollen. Zur Kommunikation werden explizite Befehle zum Senden und Empfangen von Daten benoetigt. Der Schutz von gemeinsamen Variablen ist nicht mehr erforderlich, da alle Programmteile als atomare Aktionen ausgefuhrt werden. Wir werden im folgenden zunachst wieder eine kompakte Notation vorstellen. Das nachste Kapitel befat sich dann mit der Umsetzung dieser Notation in ein Message Passing System PVM (Parallel Virtual Machine). 4.1 Kanale, Senden und Empfangen Physikalische Leitungen und U bertragungsprotokolle werden durch Kanale notiert: channel ch (v : t ; :::; vn : tn). 1 1 Dabei ist ch der Name des Kanals. vi bezeichnen (optionale) Variablennamen vom jeweiligen Typ ti . In der Implementierung konnen die channels sowohl direkte Hardware bezeichnen (z.B. Transputer-Links), aber auch Softwareprotokolle (pipes, sockets und bind bei Unix und TCP/IP). Beispielsweise deniert channel Eingabe (CHAR); channel Ausgabe (INTEGER); einen Eingabekanal fur Zeichen und einen Ausgabekanal fur Integer. Zur Darstellung der eigentlichen Datenubertragung dienen die Konstrukte send und receive. Der Aufruf von send besitzt folgende Syntax: send ch (e ; : : :; en). 1 39 Der Parameter ch bezeichnet einen U bertragungskanal, und ei stellen Ausdrucke vom Typ ti dar. Die Typen Ti entsprechen den Typen der Variablen des Kanals ch. Die Ausdrucke ei werden ausgewertet und das Ergebnis wird auf dem angegebenen Kanal ubertragen. Der Receive-Aufruf wird analog zum Send-Befehl folgendermaen deniert: receive ch (v ; : : :; vn). 1 ch ist wiederum der Name des U bertragungskanals, und die vi bezeichnen Variablen vom Typ ti entsprechend der Denition des Kanals ch. Der Befehl liest Daten aus dem Kanal und schreibt sie in die Variablen vi . Sind keine Daten vorhanden, blockiert receive die weitere Ausfuhrung des Programms, bis die erforderlichen Daten eingetroen sind. Die Semantik eines send/receive-Paares entspricht somit der Semantik von n Zuweisungen v1 := val(e1 ); : : :; vn := val(en ); Voraussetzung ist, da Daten ubertragen wurden (val, da im Empfanger keine Nebeneekte sichtbar werden.). Wahrend der Aufruf von receive immer blockierend ist, kann der Sendbefehl sowohl synchron als auch asynchron arbeiten: asynchronous send = non blocking send, und synchronous send = blocking send. Im ersten Fall wird das Programm nach dem Versenden der Daten ohne weitere Verzogerung fortgesetzt. Im zweiten Fall wird die weitere Programmausfurung nach dem Send so lange verzogert bis ein entsprechendes receive ausgefuhrt wurde. Das asynchrone send erfordert einen potentiell unbeschrankt groen Puer, wahrend das synchrone send nur einen Puer fester Groe erfordert. Die Programmierung des synchronen send ist aber aufwendiger, da es sehr genau auf die zeitlich richtige Reihenfolge aller send / receive Operationen ankommt. Das synchrone send wird daher fast nur mit sogenannten `selected receives' verwendet, bei denen mehrere receives zusammengefat werden und dasjenige receive zur Ausfuhrung ausgewahlt wird, fur das ein send wartet. Fur diesen Fall ist auch die Funktion empty(ch): BOOLEAN; wichtig, die testet, ob Daten in einem Kanal vorhanden sind. Bei der Verwendung von empty ist aber Vorsicht geboten, da die Funktion nur Momentaufnahmen vom Zustand des Kanals liefert. So konnen beispielsweise kurz nachdem die Funktion den Wert TRUE zuruckgeliefert hat, Daten eintreen. Umgekehrt konnen die Daten kurz nachdem die Funktion FALSE zuruckgeliefert hat, bereits durch einen anderen Proze abgeholt worden sein (falls mehrere Prozesse von diesem Kanal Daten empfangen). 4.1.1 Programmeigenschaften Wie bei der Kommunikation uber einen gemeinsamen Speicher, sind bei Message Passing Programmen die Eigenschaften: Korrektheit, Deadlock Freiheit, Mutual Exclusion (bei gem. Zugri auf externe Ressourcen) sowie Lebendigkeit (liveness): strongly fair scheduling (jeder Proze bekommt genugend Chancen, die Daten zu empfangen) von Bedeutung. Korrektheitsbeweise kann man dadurch fuhren, da man den Inhalt der Kanalpuer durch Hilfsvariablen beschreibt und mit in Betracht zieht. 40 Beispiel: Datenduplizierung Die Daten (einzelne Zeichen) sollen von einem Eingabekanal `input' gelesen und auf zwei Ausgabekanale `out1' und `out2' geschrieben werden. channel input (CHAR); channel out1 (CHAR); channel out2 (CHAR); dup: VAR c: CHAR; repeat receive input(c); send out1(c); send out2(c); until c = EOF; 4.2 Nachrichtenaustausch bei gemeinsamem Speicher Mit Hilfe von Semaphoren und Shared Memory wollen wir die Message Passing Primitive channel, send und receive implementieren. channel soll als globale (shared) Datenstruktur implementiert werden, auf die (nur) mittels send und receive zugegrien wird. Falls sonst keine shared Variablen benutzt werden, erhalten wir genau die Funktionalitat des Distributed Memory Message Passing. Wir werden zunachst asynchrones Message Passing implementieren, d.h. send soll nicht blockieren und receive soll blockieren, falls keine Message vorliegt. Zur Vereinfachung werden wir allerdings den Kanal mit einer xen Groe implementieren und send blockieren, falls der channel voll ist. Danach werden wir noch synchrones Message Passing implementieren, d.h. send blockiert, bis ein entsprechendes receive die Nachricht abgeholt hat. Wenn wir einen channel als Puer mit xer Groe implementieren wollen, konnen wir auf die Vorarbeiten zum Bounded Buer Problem zuruckgreifen. channel wird dann eine Datenstruktur, die den Puer enthalt, sowie die Zahler front und rear und die Semaphoren empty und full. Als Vorlage fur send nehmen wir die Produzenten Prozedur und als Vorlage fur receive nehmen wir die Konsumenten Prozedur. Damit ergibt sich folgende Losungsskizze. template<class T> class channel { private: T buffer[asyncmsg_maxbuf]; INTEGER front, rear; pthread_mutex_t s_mux, r_mux; sema *empty; sema *full; public: channel(); ~channel(); void send(T); void receive(T*); }; template<class T> channel<T>::channel() { 41 if (pthread_mutex_init(&s_mux, pthread_mutexattr_default) < 0) { ERROR(severe, "Cannot initialize s_mux."); } if (pthread_mutex_init(&r_mux, pthread_mutexattr_default) < 0) { ERROR(severe, "Cannot initialize r_mux."); } front = 0; rear = 0; empty = new sema((INTEGER)asyncmsg_maxbuf); full = new sema((INTEGER)0); } template<class T> channel<T>::~channel() { if ( front != rear ) { ERROR(severe, "channel not empty."); } delete empty; delete full; } template<class T> void channel<T>::send(T v) { empty->P(); if (pthread_mutex_lock(&s_mux) < 0) { ERROR(severe, "send: Cannot lock s_mux."); } buffer[rear] = v; rear++; if (rear >= asyncmsg_maxbuf) { rear = 0; } if (pthread_mutex_unlock(&s_mux) < 0) { ERROR(severe, "send: Cannot unlock s_mux."); } full->V(); } template<class T> void channel<T>::receive(T* v) { (*full).P(); if (pthread_mutex_lock(&r_mux) < 0) { ERROR(severe, "receive: Cannot lock r_mux."); } *v = buffer[front]; buffer[front] = 0; front++; if (front >= asyncmsg_maxbuf) { front = 0; } if (pthread_mutex_unlock(&r_mux) < 0) { ERROR(severe, "receive: Cannot unlock r_mux."); 42 } (*empty).V(); } Falls der Puer des Kanals voll ist, fuhrt ch.empty->P() zum Blockieren von send. Dies birgt eine Gefahr von Deadlocks, falls damit ein nachfolgendes send auf einen anderen Kanal nicht ausgefuhrt werden kann. Z.B. P1: send (x, 1); send (y, 2); P2: receive (y, v); receive (x, w). Falls send (x, 1) in P1 blockiert, wird receive (x, w) in P2 nie ausgefuhrt, falls nicht ein anderer Prozess einmal ein receive (y, v) ausfuhrt, um den Sendepuer zu leeren. Mit einer leichten Modikation von send konnen wir nun auch synchrones Message Passing implementieren. Der Puer braucht dafur naturlich nur aus einem Element zu bestehen. Die Modikation besteht darin, die Operation ch.empty->P() von Beginn der send Prozedur ans Ende nach ch.full->P() zu verlegen. Der Semaphore empty mu dann auch mit 0 initialisiert werden. send blockiert dann solange, bis ein anderer Proze ein receive ausgefuhrt hat. Zusatzlich mu der gleichzeitige Zugri auf send ausgeschlossen werden. Zur vollstandigen Implementierung ist noch eine Funktion int channel<T>::empty(); erforderlich, mit der festgestellt werden kann, auf welchem Kanal Eingabedaten vorhanden sind, bevor das blockierende receive verwendet wird. Die empty-Funktion wird oft auch bei asynchronem Message Passing implementiert, sie wird dort aber nur aus Performance Grunden angeboten. Beim synchronen Message Passing ist empty aber sehr wichtig, um Deadlocks zu vermeiden. Implementierungsskizze: template<class T> class channel { private: T buffer; int data; // boolean. pthread_mutex_t s_mux; sema *empty; sema *full; public: channel(); ~channel(); void send(T); void receive(T*); int empty(); }; template<class T> channel<T>::channel() { 43 if (pthread_mutex_init(&s_mux, pthread_mutexattr_default) < 0) { ERROR(severe, "Cannot initialize s_mux."); } empty = new sema((INTEGER)asyncmsg_maxbuf); full = new sema((INTEGER)0); } template<class T> channel<T>::~channel() { delete empty; delete full; } template<class T> channel<T>::empty() { return data; } template<class T> void channel<T>::send(T v) { if (pthread_mutex_lock(&s_mux) < 0) { ERROR(severe, "send: Cannot lock s_mux."); } buffer= v; data= 1; // true. full->V(); empty->P(); if (pthread_mutex_unlock(&s_mux) < 0) { ERROR(severe, "send: Cannot unlock s_mux."); } } template<class T> void channel<T>::receive(T* v) { (*full).P(); *v = buffer; data= 0; // false. (*empty).V(); } Warum wird kein atomic in receive benotigt ? Es folgen die Listings der C++ Versionen der Programme. Die Listings der Header-Dateien \ERROR.h" und \TYPES.h" benden sich im Anhang. // Async Message-Passing #include #include #include #include <pthread.h> "ERROR.h" "TYPES.h" "sema.h" 44 #define asyncmsg_maxbuf 200 template<class T> class channel { private: T buffer[asyncmsg_maxbuf]; INTEGER front, rear; pthread_mutex_t s_mux, r_mux; sema *empty; sema *full; public: channel(); ~channel(); void send(T); void receive(T*); }; template<class T> channel<T>::channel() { if (pthread_mutex_init(&s_mux, pthread_mutexattr_default) < 0) { ERROR(severe, "Cannot initialize s_mux."); } if (pthread_mutex_init(&r_mux, pthread_mutexattr_default) < 0) { ERROR(severe, "Cannot initialize r_mux."); } front = 0; rear = 0; empty = new sema((INTEGER)asyncmsg_maxbuf); full = new sema((INTEGER)0); } template<class T> channel<T>::~channel() { if ( front != rear ) { ERROR(severe, "channel not empty."); } delete empty; delete full; } template<class T> void channel<T>::send(T v) { empty->P(); if (pthread_mutex_lock(&s_mux) < 0) { ERROR(severe, "send: Cannot lock s_mux."); } buffer[rear] = v; rear++; if (rear >= asyncmsg_maxbuf) { rear = 0; } if (pthread_mutex_unlock(&s_mux) < 0) { ERROR(severe, "send: Cannot unlock s_mux."); } full->V(); } template<class T> void channel<T>::receive(T* v) { (*full).P(); if (pthread_mutex_lock(&r_mux) < 0) { ERROR(severe, "receive: Cannot lock r_mux."); } *v = buffer[front]; buffer[front] = 0; front++; 45 if (front >= asyncmsg_maxbuf) { front = 0; } if (pthread_mutex_unlock(&r_mux) < 0) { ERROR(severe, "receive: Cannot unlock r_mux."); } (*empty).V(); } // Synchrones Message-Passing #include "ERROR.h" #include <pthread.h> #include "TYPES.h" #include "sema.h" template<class T> class channel { private: T buffer; int data; // boolean. pthread_mutex_t s_mux; sema *empty; sema *full; public: channel(); ~channel(); void send(T); void receive(T*); int empty(); }; template<class T> channel<T>::channel() { if (pthread_mutex_init(&s_mux, pthread_mutexattr_default) < 0) { ERROR(severe, "Cannot initialize s_mux."); } empty = new sema((INTEGER)0); full = new sema((INTEGER)0); } template<class T> channel<T>::~channel() { delete empty; delete full; } template<class T> channel<T>::empty() { return data; } template<class T> void channel<T>::send(T v) { if (pthread_mutex_lock(&s_mux) < 0) { ERROR(severe, "send: Cannot lock s_mux."); } buffer= v; data= 1; // true. full->V(); empty->P(); 46 if (pthread_mutex_unlock(&s_mux) < 0) { ERROR(severe, "send: Cannot unlock s_mux."); } } template<class T> void channel<T>::receive(T* v) { (*full).P(); *v = buffer; data= 0; // false. (*empty).V(); } Aufgabe 8 Implementierung des Duplizierungs-Filters mit shared memory Message Passing. Aufgabe 9 Implementierung des Produzenten-Konsumenten Beispiels mit Message Passing. Die Losung ergibt sich wie im alten Beispiel, nur entfallt die Puer Verwaltung und die Synchronisation, da nun alles von chaninit, send und receive erledigt wird. // producer consumer mit message passing #include #include #include #include #include "TYPES.h" <pthread.h> "asyncmsg.h" "ERROR.h" <iostream.h> #define maxprod 3 #define maxcons 7 #define maxwork 100000 channel<LONGINT> work; struct S_1 { pthread_t A[maxprod }; struct S_2 { pthread_t A[maxcons }; struct S_3 { ADDRESS A[maxprod }; struct S_4 { ADDRESS A[maxcons }; - 1 + 1]; - 1 + 1]; 1 + 1]; 1 + 1]; ADDRESS Producer(ADDRESS x) { INTEGER i, j, k, m; INTEGER maxtodo, t; j = * (INTEGER*) x; SWRITE("Producer start ", 15L); GWRITE(j); BLINES(0); maxtodo = maxwork / maxprod; t = maxwork % maxprod; if (j == 1) { 47 maxtodo = maxtodo + t; } m = 1; for (;;) { k = 0; { LONGINT B_1 = 1, B_2 = j; if (B_1 <= B_2) for (i = B_1;; i += 1) { k = k + i * j; if (i >= B_2) break; } } work.send(k); m = m + 1; if (m > maxtodo) { goto EXIT_1; } } EXIT_1:; k = 0; SWRITE("Producer done ", 14L); GWRITE(j); BLINES(0); return (ADDRESS)k; } ADDRESS Consumer(ADDRESS x) { INTEGER i, j, k, m; INTEGER maxtodo, t; j = * (INTEGER*) x; SWRITE("Consumer start ", 15L); GWRITE(j); BLINES(0); maxtodo = maxwork / maxcons; t = maxwork % maxcons; if (j == 1) { maxtodo = maxtodo + t; } m = 1; i = 0; for (;;) { work.receive( &k); i = i + k; m = m + 1; if (m > maxtodo) { goto EXIT_2; } } EXIT_2:; k = 0; SWRITE("Consumer done ", 14L); GWRITE(j); BLINES(0); return (ADDRESS)k; } static void tuwas() { struct S_1 pt; struct S_2 ct; struct S_3 pi; 48 struct S_4 ci; INTEGER i; // asyncmsg_chaninit(&work); i = 1; while (i <= maxprod) { pi.A[i - 1] = (ADDRESS)i; if (pthread_create(&pt.A[i - 1], pthread_attr_default, &Producer, &pi.A[i - 1]) < 0) { ERROR(severe, "Cannot create Producer.", 23L); } i = i + 1; } i = 1; while (i <= maxcons) { ci.A[i - 1] = (ADDRESS)i; if (pthread_create(&ct.A[i - 1], pthread_attr_default, &Consumer, &ci.A[i - 1]) < 0) { ERROR(severe, "Cannot create Consumer.", 23L); } i = i + 1; } i = 1; while (i <= maxprod) { if (pthread_join(pt.A[i - 1], &pi.A[i - 1]) < 0) { ERROR(severe, "Cannot join Producer.", 21L); } i = i + 1; } i = 1; while (i <= maxcons) { if (pthread_join(ct.A[i - 1], &ci.A[i - 1]) < 0) { ERROR(severe, "Cannot join Consumer.", 21L); } i = i + 1; } } int main() { tuwas(); } Aufgabe 10 Message Passing Produzenten-Konsumenten Beispiel mit 1. sovielen Kanalen wie Produzenten, 2. sovielen Kanalen wie Konsumenten. 49 Kapitel 5 PVM Parallel Virtual Machine PVM (Parallel Virtual Machine) ist ein Programmpaket, das es erlaubt, ein Netzwerk von heterogenen Rechnern als einen virtuellen parallelen Rechner zu benutzen. PVM benutzt im wesentlichen Unix Sockets und TCP/IP zur Kommunikation zwischen Rechnern. Jeder parallele virtuelle Rechner besteht aus einem Daemonproze, der auf jedem physikalischen Rechner lauft und einem Satz von Bibliotheksroutinen, mittels derer die Anwendungsprogramme mit den Daemons kommunizieren. A hnliche Pakete sind MPI eine Message Passing Bibliothek, die der zukunftige Standart werden soll; ermoglicht den Aufbau von Unterprogrammbibliotheken, derzeit noch keine Spezikation von Prozessverwaltungsfunktionen. TCGMSG eine Message Passing Bibliothek; optimiert fur shared memory Rechner, z.Z. nur synchrones send. PARMACS Macrobibliothek besonders fur Fortran (zuerst fur Suprenum entwickelt), P4 Message Passing Bibliothok, Macros und Funktionen fur shared und distributed memory Rechner. PVM hat eine hohe Funktionalitat und ist fur sehr viele Maschinen (praktisch alle mit Unix und TCP/IP) verfugbar. PVM benutzt das Standard XDR Dataexchange-Format, um Datenkonversionen zwischen diversen Rechnerarchitekturen durchzufuhren. Innerhalb von Shared Memory Rechnern (z.B. KSR1) ist PVM nicht das schnellste Paket. Fur verschiedene Architekturen, z.B. iPSC/860 wird versucht, direkt die Hardware Message Passing Primitive zu benutzen. Es gibt keine Vorkehrungen fur Fehlertoleranz, z.B. beim Absturz eines Rechners und es gibt keine dynamische Lastverteilung innerhalb der parallelen virtuellen Maschine. PVM wird gemeinsam von der Universitat von Tennessee, dem Oak Ridge National Laboratory und der Emory Universitat entwickelt. 5.1 U berblick uber PVM und die Message Passing Routinen PVM besteht aus etwa 64 Bibliotheksroutinen in libpvm3.a, einem Daemon Proze pvmd sowie einer Konsole pvm zur U berwachung der virtuellen Maschine. Die prinzipielle Funktionsweise von PVM wird in Abbildung 5.1 gezeigt. Auf jedem an PVM beteiligten Rechener lauft ein Daemonprozess pvmd, der mit den Benutzerprogrammen mittels der PVM Bibliotheksroutinen kommuniziert. Die PVM Daemone kommunizieren ihrerseits untereinander und vermitteln die Datenubertragung zwischen den Benutzerprogrammen (via TCP/IP oder UDP). 50 PVM daemon - PVM daemon 6 6 ? ? User Prog 1 User Prog 2 Computer A Computer B Tabelle 5.1: Funktionsschema von PVM Der PVM Demonprozess pvmd, wird etwa wie folgt (im Hintergrund) gestartet pvmd mytopology & Der Daemon lauft als normaler Benutzerprozess und benotigt keine besonderen Rechte. Als Argument benotigt pvmd ein sogenanntes Hostle mytopology in dem die an dieser parallelen virtuellen Maschine beteiligten Rechner deniert sind. Mit dem Starten von pvmd werden alle benotigten PVM Daemons auf allen verlangten Rechnern gestartet. Ein Hostle konnte z.B. so aussehen # pvm hosts file for Praktikum # defaults * ep=/home/kredel/pvm3/bin/%:/home/sw/pvm3/bin/% # hostnames suparum ep=/home/kredel/pvm3/bin/%:/home/kredel/cvs/mas/ksr1/machine Zeilen die mit # beginnen sind Kommentare. Jede sonstige Zeile deniert einen Hostrechner. z.B. die Zeile mit suparum deniert, da der Rechner mit dem Internet Namen suparum an PVM teilnimmt. Die auf den Hostnamen folgenden Argumente bezeichnen im obigen Beispiel die Pfadnamen in denen PVM die ausfuhrbaren Programme suchen soll (ep=). Ein % im Pfadnamen wird durch die aktuelle Architektur ersetzt (hier also durch KSR1). Die Zeile, die mit * als Hostnamen beginnt deniert defaults fur die verschiedenen Argumente. Weitere mogliche Argumente in einer Host Zeile sind lo=userid (deniert den Loginnamen auf dem Rechner), pw (prompted nach dem Password) und dx= (Pfadname fur den pvmd). U ber den Zustand der Daemone und der PVM tasks kann man sich mit der PVM Konsole informieren. Die PVM Konsole wird einfach mit pvm gestartet. Dann stehen z.B. die Kommandos help, conf, ps und halt zurverfugung. conf zeigt die Konguration der parallelen virtuellen Maschine an, d.h. die beteiligten Hosts. ps zeigt die gestarteten PVM Tasks an, d.h. die gestarteten und bei PVM angemeldeten Benutzerprogramme. halt stoppt alle PVM Tasks und alle PVM Daemone. 51 Von den PVM Routinen besprechen wir im Folgenden drei Gruppen: 1. channel name ( ): Denition und Erzeugung von Kanalen: pvm_spawn, pvm_my tid, pvm_parent, (pvm_exit, pvm_kill), message tags, 2. send (...): Packen und versenden: pvm_initsend, pvm_pk[type], pvm_send, (pvm_mcast), 3. receive (...): Empfangen und auspacken: pvm_recv, pvm_nrecv, pvm_upk[type]. Daruber hinaus gibt es noch eine Reihe weiterer Funktionen zur Beschaung von Informationen und zur Konguration der virtuellen Maschine, sowie Funktionen zur Bearbeitung mehrer Message Buer. Zusatzlich existiert noch ein experimentelles Paket zur Gruppenkommunikation. 5.2 Denition und Erzeugung von Kanalen Im folgenden wollen wir das channel ch (v : t , ...vn: tn); 1 1 Sprachkonstrukt implementieren. Die Identikation von Kanalen erfolgt in PVM zum einen mittels einer sogenannten Taskid (tid) und zum anderen mittels eines sog. Message Tags (msgtag). Eine taskid wird jeder PVM task ( Unix Proze) bei der Erzeugung oder bei der Registrierung zugeordnet. Damit ist eine eindeutige Bezeichnung von Sendern und Empfangern von Nachrichten moglich. Ein msgtag wird bei send und receive benutzt, um zwischen mehreren Kanalen zwischen Sendern und Empfangern unterscheiden zu konnen. Die msgtags sind frei wahlbar. Ein Kanal konnte somit durch folgenden Datentyp beschrieben werden. TYPE chan = RECORD tid: int; msgtag: int; v1: t1; ...; vn: tn; END; (32 bit init) ( " ) Da die msgtags von der Anwendung frei wahlbar sind, bleibt noch die Bestimmung der taskid. Dazu stehen die PVM-Funktionen mytid, parent und spawn mit den folgenden Spezikationen zur Verfugung: int pvm_mytid ( ); Bestimmung der eigenen Taskid und Registrierung beim PVM Daemon, falls noch nicht erfolgt. int pvm_parent ( ); Bestimmung der Taskid der Eltern Task. int pvm_exit ( ); Verlassen von PVM, d.h. Beenden der Verbindung zum PVM Daemon. int pvm_spawn ( char *file; char * argv[]; int flags; 52 char *where; int count; int tids[] ); Erzeugen von neuen PVM tasks. Spawn erzeugt count-viele neue Unix-Prozesse vom Programm file mit Argumenten argv auf Rechnern / Architekturen where mit Flags flags. In dem Array tids werden die Taskids aller erzeugten Tasks zuruckgeliefert, die Anzahl der erfolgreichen Tasks ist der Funktionsruckgabewert. Wichtig fur die Erzeugung neuer Tasks ist der file Parameter, der den Dateinamen des Unix-Prozesses angibt, der gestartet werden soll. Die Datei wird in $PVM_ROOT/bin/$PVM_ARCH gesucht bzw. in den Directories, die bei der Konguration der virtuellen Maschine mit dem `ep=' Parameter speziziert werden. Der Default fur $PVM_ROOT ist $HOME/pvm3 und $PVM_ARCH die entsprechende Architektur. Die Parameter, die der i-ten Task ubergeben werden, stehen in argv[i]. Mit Hilfe der `flags' und dem Parameter `where' kann festgelegt werden, ob die Task auf einen beliebigen Rechner (innerhalb der virtuellen Maschine), auf Rechnern einer bestimmten Architektur oder auf explizit genannten Rechnern gestartet werden sollen. Das Array `tids' enthalt nach Beendigung von Spawn die Taskids der erzeugten Tasks bzw. den jeweiligen Fehlercode, falls eine bestimmte Task nicht erzeugt werden konnte. Der spawn-Funktionswert ist die Anzahl der erfolgreich gestarteten Tasks. Eine mogliche Implementierung ware somit durch die Deklaration der Datentypen task und channel zusammen mit taskinit und chaninit. TYPE task = RECORD tid: int; flag: int; file, arg, where: string; END; TYPE channel = RECORD t: task msgtag: int; mode: int; END; PROCEDURE taskinit (VAR t: task); BEGIN (* initialize fields *) mytid:= pvm.mytid(); IF pvm_parent THEN t.tid:= parent. ELSE pvmspawn (t.file,...) t.tid:= ... END END taskinit; PROCEDURE chaninit (VAR ch: channel; t: task; no: init); BEGIN ch.t:=t; ch.msgtag:= no; ch.mode:= PVMDataDefault; END chaninit. 53 erzeugt entweder eine neue PVM Task oder es stellt die Verbindung zur Elterntask her. (1 Level). Damit wurde taskinit channel aus (v: INTEGER); channel ein (v: INTEGER); zum Beispiel als VAR t:task; aus: channel; ein: channel; taskinit(t); chaninit(aus, t, 1); chaninit(ein, t, 2); implementiert. 5.3 Packen und Versenden Im folgenden wollen wir das Sprachkonstrukt send chan (e ; ::::; en); 1 implementieren. Zur Verfugung stehen uns die PVM-Funktionen initsend, pk[typ] und send (u.U. mcast) mit den folgenden Spezikationen. int pvm_initsend (int encoding); Initialisiert den Sendepuffer mit der Packungsmethode encoding. (Default ist Raw) int pvm_pk[typ] (x *typ, int n, int s); Packt n Elemente mit stride s von dem Array x vom Typ typ in den aktuellen Sendepuffer. int pvm_send (tid: int; msgtag; int): int; Senden des aktuellen Sendepuffers an task tid mit message tag msgtag. Im Fehlerfall geben alle Routinen einen Wert < 0 zuruck, sonst 0. typ kann sein: byte, int, long, float, complex, short, string und double. Damit ist send chan (e1 ; : : :; en) VAR x1: ARRAY [0..1] OF t1; ... xn: ARRAY [0..1] OF tn; e:=pvm_initsend (PVMDataDefault): x1[0]:=v1; 54 e:=pvm_pk[t1] (x1, 1, 1); ... xn[0]:=vn; e:=pvm_pk[t1] (xn, 1, 1); e:= pvm_send (chan.task.tid, chan.msgtag); 5.4 Empfangen und Entpacken Im folgenden wollen wir das Sprachkonstrukt receive chan (v ; :::; vn); 1 implementieren. Zur Verfugung stehen uns die PVM-Funktionen recv, nrcv und upk[typ] mit den folgenden Spezikationen. int pvm_upk[typ] (typ *x; int n, int s); Entpackt n Elemente mit stride s von dem Array x vom Typ typ in den aktuellen Sendepuffer. int pvm_recv (int tid, int msgtag); Empfangen des aktuellen Sendepuffers von task tid mit message tag msgtag. Die Version pvm_nrcv ist eine nicht blockierende Variante von pvm_recv. Der Ruckgabewert ist hierbei 0 falls keine Nachricht vorliegt, sonst ein Wert > 0 (ein buer identier). Im Fehlerfall geben alle Routinen einen Wert < 0 zuruck, sonst 0. typ kann wie oben bei pvm_pk sein: byte, int, long, float, complex, short, string und double. Damit ist receive chan (v1 ; :::; vn) VAR x1: ARRAY [0..1] OF t1; ... xn: ARRAY [0..1] OF tn; e:= pvm_recv (chan.task.tid, chan.msgtag); e:= pvm_upk[t1] (x1, 1, 1); v1:=x1[0]; ... e:= pvm_upk[tn] (xn, 1, 1); vn:=xn[0]; 5.5 Produzenten Konsumenten Beispiel Aufgabe 11 Implementiere channel, send und receive mit PVM. Losungsskizze: 55 DEFINITION MODULE MsgPas; (* Message Passing with PVM Definition Module. *) FROM KSR1 IMPORT int; FROM pvm IMPORT ptrtchr; TYPE task = RECORD tid: int; flag: int; file: ptrtchr; arg: ptrtchr; where: ptrtchr; END; TYPE channel = RECORD t: task; msgtag: int; mode: int; END; PROCEDURE taskinit(VAR t: task); (* Task initialization. *) PROCEDURE chaninit(VAR ch: channel; VAR t: task; no: int); (* Channel initialization. *) PROCEDURE send(VAR ch: channel; v: INTEGER); (* Send v to channel ch. *) PROCEDURE receive(VAR ch: channel; VAR v: INTEGER); (* Receive v from channel ch. *) END MsgPas. IMPLEMENTATION MODULE MsgPas; (* Message Passing with PVM Implementation Module. *) FROM SYSTEM IMPORT ADR, ADDRESS; FROM KSR1 IMPORT int; FROM pvm IMPORT pvm_mytid, pvm_parent, pvm_exit, pvm_initsend, pvm_send, pvm_recv, pvm_pklong, pvm_upklong, PvmTaskDefault, PvmDataDefault, PvmDataRaw, PvmNoParent, ptrtchr, pvm_spawn; FROM MASERR IMPORT ERROR, severe, fatal; CONST file = "pcmsg"; CONST PARM = " -m 400 "; CONST where = ""; VAR mytid: int; PROCEDURE taskinit(VAR t: task); (* Task initialization. *) VAR argv: ARRAY [0..1] OF ptrtchr; tids: ARRAY [0..1] OF int; f: int; tid: int; BEGIN 56 (*1*) (*initialize fields*) t.tid:=-1; t.flag:=PvmTaskDefault; t.file:=ADDRESS(file); t.arg:=ADR(PARM); t.where:=ADDRESS(where); IF mytid = -1 THEN mytid:=pvm_mytid(); END; (*2*) (*see if we have a parent *) tid:=pvm_parent(); IF tid <> PvmNoParent THEN t.tid:=tid; RETURN; END; (*3*) (*initialize spawn parms*) argv[0]:=t.arg; argv[1]:=ADDRESS(0); (*4*) (* create task or connect to parent *) f:=pvm_spawn( t.file, argv, t.flag, t.where, 1, tids); IF f <> 1 THEN ERROR(severe,"taskinit: Cannot spawn pvm task."); RETURN; END; t.tid:=tids[0]; (*9*) END taskinit; PROCEDURE chaninit(VAR ch: channel; VAR t: task; no: int); (* Channel initialization. *) BEGIN (*1*) (*initialize *) ch.t:=t; ch.msgtag:=no; ch.mode:=PvmDataRaw; (*9*) END chaninit; PROCEDURE send(VAR ch: channel; v: INTEGER); (* Send v to channel ch. *) VAR val: ARRAY [0..1] OF int; BEGIN (*1*) (*init buffer*) IF pvm_initsend( ch.mode ) < 0 THEN ERROR(severe,"send: Cannot initialize buffer."); END; (*2*) (*deposit message*) val[0]:=v; IF pvm_pklong( val, 1, 1 ) < 0 THEN ERROR(severe,"send: Cannot put into buffer."); END; (*3*) (*send*) IF pvm_send( ch.t.tid, ch.msgtag ) < 0 THEN ERROR(severe,"send: Cannot send on channel."); END; (*9*) END send; PROCEDURE receive(VAR ch: channel; VAR v: INTEGER); (* Receive v from channel ch. *) VAR val: ARRAY [0..1] OF int; BEGIN (*1*) (*receive*) IF pvm_recv( ch.t.tid, ch.msgtag ) < 0 THEN ERROR(severe,"receive: Cannot receive on channel."); END; (*2*) (*deposit message*) IF pvm_upklong( val, 1, 1 ) < 0 THEN ERROR(severe,"receive: Cannot get from buffer."); END; v:=val[0]; (*3*) END receive; BEGIN mytid:=-1; END MsgPas. Aufgabe 12 Implementiere das Produzenten Konsumenten Problem mit Message Passing mit PVM. 57 MODULE pc1msg; (* Producer Consumer with PVM MP Module. *) FROM SYSTEM IMPORT ADDRESS, TSIZE; FROM MsgPas IMPORT task, taskinit, channel, chaninit, send, receive; FROM pvm IMPORT pvm_parent, pvm_exit; FROM MASELEM IMPORT GAMMAINT; FROM MASERR IMPORT harmless, severe, fatal, ERROR; FROM MASBIOS IMPORT SWRITE, GWRITE, BLINES; CONST maxprod = 1; CONST maxcons = 1; CONST maxwork = 10000; VAR co: task; work: channel; PROCEDURE Producer(VAR x: ADDRESS): ADDRESS; (* same as in pcmsg.mi. *) PROCEDURE Consumer(VAR x: ADDRESS): ADDRESS; (* same as in pcmsg.mi. *) PROCEDURE tuwas; (* wie der name schon sagt. *) VAR i, o: INTEGER; BEGIN (*0*) (*initialize*) i:=1; taskinit(co); chaninit(work, co, 1); (*2*) (*start*) IF pvm_parent() < 0 THEN o:=Producer(i); ELSE o:=Consumer(i) END; (*3*) (*exit*) pvm_exit(); (*9*) END tuwas; BEGIN tuwas; END pc1msg. 5.6 Verteilte Semaphore Als nachstes Problem wollen wir mit PVM und Pthreads verteilte dezentrale Semaphore implementieren. Das Problem entsteht wenn mehrere verteilte Prozesse auf eine gemeinsame (externe) Resource mit mutual exclusion zugreifen wollen. Als Alternativen zur Losung gibt es 1. Ein Prozess verwaltet den Semaphor und teilt den anderen Prozessen mittels send und receive den Zustand des Semaphores mit. 2. Jeder Prozess macht eine lokale Buchfuhrung uber den Zustand des Semaphores und die Zustandsinformation wird (auch mittels send und receive) in einem synchronisierten Zustand gehalten. Die 1. Variante hat den Vorteil, da sie sehr einfach zu implementieren ist. Der Nachteil besteht aber darin, da ein Ausfall des Verwaltungsprozesses zum Zusammenbruch des ganzen Systems fuhrt. Der 58 Nachteil der 2. Variante besteht in dem groeren Kommunikationsoverhead der zur Verwaltung des Semaphors notwendig ist. Aber da alle Informationen dezentral verwaltet werden, ist es leichter ein Protokoll einzubauen, das den Ausfall eines Prozesses erkennt und es den anderen erlaubt weiter zu arbeiten. Zur Losung der 2. Variante benotigen wir einige Vorbereitungen. 5.6.1 Logische Uhren In einem Netz von Rechnern mit verschiedenen Hardwareuhren ist es zunachst erforderlich die Ereignisse (send und receive) geeignet zu ordnen. D.h. wenn ein Prozess eine Nachricht erhalt, so soll sicher gestellt werden, da seine lokale Zeit groer der lokalen Zeit des Senders (zum Sendezeitpunkt) ist. Ein Verfahren, das dies garantiert sind die sogenannten logischen Uhren. Dabei verwendet jeder beteiligte Prozess eine lokale (Integer) Variable, in der die aktuelle Zeit gezahlt wird. Bei jeder send und receive Operation wird die lokale Zeit weitergezahlt. Zusatzlich wird bei jedem send die lokale Zeit mitgeschickt. Der Empfanger bildet dann nach einem receive seine neue Zeit als das Maximum aus seiner bisherigen Zeit und der Zeit, zu der die Nachricht abgeschickt wurde. Genauer ergibt sich das folgende Schema. lc bezeichnet die lokale Zeitvariable und ts bezeichnet den `Timestamp' der Nachricht. send: send semch ( , , lc ); INC(lc); receive: receive semch ( , , ts ); lc:=MAX(lc, ts + 1); INC(lc); Damit ergibt sich eine partielle Ordnung auf den (send und receive) Ereignissen. 5.6.2 User und Helper Pthread Jeder beteiligte Prozess fuhrt lokal einen Semaphorenzahler und eine Tabelle (request message queue) der verlangten Semaphoroperationen, siehe Abbildung 5.2. Die Tabelle wird nach der Sendezeit (ts) und dem Sender (s) der Anforderungen sortiert gehalten. z fully ack. }| ts,s ts,s ts,s {z some}|ack. ts,s ts,s ts,s ts,s ts,s {z * unused }| * * { Tabelle 5.2: Request Message Queue Jede Anforderung von einer P bzw. V Operation wird lokal vermerkt und an alle Partner ein `Acknowledgement' geschickt, da man die Anforderung vermerkt hat. Falls von allen Partnern wieder die Acknowledgements eingetroen sind wird die angeforderte Operation lokal an dem Semaphorenzahler ausgefuhrt und gegebenenfalls die weitere Verarbeitung angestossen. Im einzelnen sind folgende Schritte zu beachten. Mit PVM mussen in einem Vorbereitungsschritt die taskids aller beteiligten Prozesse verteilt und empfangen werden. Die Kanale sind als channel semch ( sender: INTEGER, action: Action, timestmp: INTEGER) deniert, mit 59 Action = f P, V, Ack, Fin g. Die lokale Bearbeitung in einem Prozess wird von je einem `User' und einem `Helper' Pthread durchgefuhrt. await soll mit einem lokalen Semaphore implementiert werden. Damit ergibt sich folgende Losungsskizze fur distributed semaphores. User(i): P(): broadcast c-sem( ..., P, ...); await go end; V(): broadcast c-sem( ..., V, ...); Helper(i): receive c-sem[i]( ..., action, ... ); CASE action = P: / insert into queue, action = V: \ broadcast c-sem( ..., Ack, ... ) action = Ack: at:=minimal Ack-time; FORALL acknowledged V DO INC(sem); remove from queue END; FORALL acknowledged P DO DEC(sem); IF my-request THEN signal go END; remove from queue END; action = Fin: EXIT Loop; Es folgt das Listing des Programms. MODULE dsema; (* Distributed Semaphores with PVM MP Module. *) FROM SYSTEM IMPORT ADDRESS, TSIZE; FROM KSR1 IMPORT int; FROM MsgPas IMPORT task, taskinit, channel, chaninit, send, receive; FROM pvm IMPORT pvm_mytid, pvm_parent, pvm_exit, pvm_initsend, pvm_send, pvm_recv, pvm_pklong, pvm_upklong, PvmTaskDefault, PvmDataDefault, PvmDataRaw, PvmNoParent, ptrtchr, pvm_spawn; FROM pthread IMPORT pthread_t, pthread_mutex_t, pthread_cond_t, attr_default, mutexattr_default, condattr_default, pthread_cond_init, pthread_cond_wait, pthread_cond_broadcast, pthread_mutex_init, pthread_mutex_lock, pthread_mutex_unlock, pthread_create, pthread_join; IMPORT sema; FROM MASELEM IMPORT GAMMAINT; FROM MASERR IMPORT harmless, severe, fatal, ERROR; FROM MASBIOS IMPORT SWRITE, GWRITE, BLINES; 60 CONST maxco = 3; CONST maxq = 100; CONST P V Ack Fin Undef TYPE event send action ts = = = = = 10; 20; 99; 0; -1; = : : : RECORD INTEGER; INTEGER; INTEGER; END; VAR mq: ARRAY[0..maxq] OF event; VAR valid: INTEGER; lc : INTEGER; PROCEDURE insert(s, a, t: INTEGER); (* Insert event in queue. *) VAR i, j, k: INTEGER; BEGIN (*1*) (*search location *) i:=0; j:=-1; k:=-1; WHILE (i <= valid) AND ( (mq[i].ts < t) OR ((mq[i].ts = t) AND (mq[i].send < s)) ) DO IF mq[i].action = Undef THEN k:=i END; i:=i+1 END; (*2*) (*insert *) IF i > valid THEN (*enter new*) valid:=valid+1; mq[valid].send:=s; mq[valid].action:=a; mq[valid].ts:=t; RETURN; END; (* now i <= valid *) (* and (mq[i].ts > t) OR ((mq[i].ts = t) AND (mq[i].send > s)) *) (* = unmoeglich*) valid:=valid+1; j:=valid; WHILE i < j DO (*move up*) mq[j].send:=mq[j-1].send; mq[j].action:=mq[j-1].action; mq[j].ts:=mq[j-1].ts; j:=j-1 END; mq[i].send:=s; mq[i].action:=a; mq[i].ts:=t; (*3*) (*compress *) IF k < 0 THEN RETURN END; i:=0; WHILE (i <= valid) AND (mq[i].action <> Undef) DO i:=i+1 END; j:=i; i:=i-1; WHILE j <= valid DO IF mq[j].action = Undef THEN j:=j+1 ELSE mq[i].send:=mq[j].send; mq[i].action:=mq[j].action; mq[i].ts:=mq[j].ts; j:=j+1; i:=i+1 END; END; valid:=i-1; (*9*) END insert; PROCEDURE Acktime(s, t: INTEGER): INTEGER; 61 (* Determine fully acknowledgement time. *) VAR i, j: INTEGER; BEGIN (*1*) (*update *) IF mintime[s] < t THEN mintime[s]:=t END; (*2*) (*find minimum *) j:=mintime[0]; i:=1; WHILE i <= maxco DO IF mintime[i] < j THEN j:=mintime[i] END; i:=i+1; END; RETURN(j); (*9*) END Acktime; PROCEDURE broadcast3(VAR cha: ARRAY OF channel; n, v1, v2, v3: INTEGER); (* Send v1, v2, v3 to first n channels of cha. *) VAR i: INTEGER; BEGIN (*1*) (*loop *) i:=0; WHILE i <= n DO send3(cha[i], v1, v2, v3); i:=i+1 END; (*9*) END broadcast3; PROCEDURE send3(VAR ch: channel; v1, v2, v3: INTEGER); (* Send v1, v2, v3 to channel ch. *) VAR val: ARRAY [0..2] OF int; BEGIN (*1*) (*init buffer*) sema.P(bmux); IF pvm_initsend( ch.mode ) < 0 THEN ERROR(severe,"send3: Cannot initialize buffer."); END; (*2*) (*deposit message*) val[0]:=v1; val[1]:=v2; val[2]:=v3; IF pvm_pklong( val, 3, 1 ) < 0 THEN ERROR(severe,"send3: Cannot put into buffer."); END; (*3*) (*send*) IF pvm_send( ch.t.tid, ch.msgtag ) < 0 THEN ERROR(severe,"send3: Cannot send on channel."); END; sema.V(bmux); (*9*) END send3; PROCEDURE receive3(VAR ch: channel; VAR v1, v2, v3: INTEGER); (* Receive v1, v2, v3 from channel ch. *) VAR val: ARRAY [0..2] OF int; BEGIN (*1*) (*receive*) IF pvm_recv( ch.t.tid, ch.msgtag ) < 0 THEN ERROR(severe,"receive3: Cannot receive on channel."); END; (*2*) (*deposit message*) IF pvm_upklong( val, 3, 1 ) < 0 THEN ERROR(severe,"receive3: Cannot get from buffer."); END; v1:=val[0]; v2:=val[1]; v3:=val[2]; (*3*) END receive3; VAR co: ARRAY [0..maxco+1] OF task; semch: ARRAY [0..maxco+1] OF channel; mintime: ARRAY [0..maxco] OF INTEGER; PROCEDURE Master(VAR x: ADDRESS): ADDRESS; (* Send coworkers all tids. *) VAR i, j, k, m: INTEGER; t: INTEGER; 62 BEGIN (*1*) (*my number *) j:=INTEGER(x); SWRITE("Master start "); GWRITE(j); BLINES(0); (*2*) (*initialize coworkers*) i:=0; co[i].tid:=mytid; (*hack*) chaninit(semch[i], co[i], 1); i:=1; WHILE i <= maxco DO taskinit(co[i]); chaninit(semch[i], co[i], 1); i:=i+1; END; i:=maxco+1; co[i].tid:=-1; (*hack*) chaninit(semch[i], co[i], 1); (*3*) (*broadcast tids except of master*) i:=1; WHILE i <= maxco DO m:=1; WHILE m <= maxco DO send(semch[i],co[m].tid); m:=m+1; END; i:=i+1; END; (*5*) (*finish*) k:=0; SWRITE("Master done "); GWRITE(j); BLINES(0); RETURN( ADDRESS(k) ) (*9*) END Master; PROCEDURE Coworker(VAR x: ADDRESS): ADDRESS; (* Receive tids of coworkers. *) VAR i, j, k, m: INTEGER; t: INTEGER; BEGIN (*1*) (*my number *) j:=INTEGER(x); SWRITE("Coworker start "); GWRITE(j); BLINES(0); (*2*) (*initialize channel to master*) i:=0; taskinit(co[i]); chaninit(semch[i], co[i], 1); (*2*) (*receive tids of coworkers*) k:=-1; m:=1; WHILE m <= maxco DO receive(semch[i],t); IF t = mytid THEN k:=m END; co[m].tid:=t; (*hack*) chaninit(semch[m], co[m], 1); m:=m+1; END; m:=maxco+1; co[m].tid:=-1; (*hack*) chaninit(semch[m], co[m], 1); (*5*) (*finish*) SWRITE("Coworker done "); GWRITE(j); BLINES(0); RETURN( ADDRESS(k) ) (*9*) END Coworker; PROCEDURE DoWork(VAR x: ADDRESS): ADDRESS; (* Do some work. *) VAR i, j, k: INTEGER; ts: INTEGER; BEGIN (*1*) (*my number *) j:=INTEGER(x); SWRITE("Do work start "); GWRITE(j); BLINES(0); lc:=0; (*2*) (*get lock*) broadcast3(semch,maxco,j,P,lc); INC(lc); sema.P(smux); SWRITE("Lock now held by "); GWRITE(j); BLINES(0); 63 (*3*) (*release lock*) broadcast3(semch,maxco,j,V,lc); INC(lc); (*5*) (*finish*) k:=0; IF j = 0 THEN ERROR(severe,"DoWork: need a break."); broadcast3(semch,maxco,j,Fin,lc); INC(lc); ELSE ERROR(harmless,"DoWork: need a break."); END; SWRITE("Do work done "); GWRITE(j); BLINES(0); RETURN( ADDRESS(k) ) (*9*) END DoWork; PROCEDURE DoHelp(VAR x: ADDRESS): ADDRESS; (* Do some help. *) VAR i, j, k: INTEGER; lc, ts, at: INTEGER; sem: INTEGER; sender, action: INTEGER; BEGIN (*1*) (*my number *) j:=INTEGER(x); SWRITE("Do help start "); GWRITE(j); BLINES(0); lc:=0; sem:=1; (*one may get the lock*) (*2*) (*process requests *) LOOP receive3(semch[maxco+1],sender,action,ts); IF ts >= lc THEN lc:=ts+1 END; INC(lc); SWRITE("DoHelp: sender = "); GWRITE(sender); SWRITE(", action = "); GWRITE(action); SWRITE(", timestamp = "); GWRITE(ts); IF action <> Ack THEN BLINES(0); END; CASE action OF | P : insert(sender,action,ts); broadcast3(semch,maxco,j,Ack,lc); INC(lc); | V : insert(sender,action,ts); broadcast3(semch,maxco,j,Ack,lc); INC(lc); | Ack : at:=Acktime(sender,ts); SWRITE(", Acktime = "); GWRITE(at); BLINES(0); i:=0; WHILE (i <= valid) AND (mq[i].ts < at) DO IF mq[i].action = V THEN sem:=sem+1; mq[i].action:=Undef; (* = remove *) END; i:=i+1 END; i:=0; WHILE (i <= valid) AND (mq[i].ts < at) DO IF mq[i].action = P THEN IF sem > 0 THEN sem:=sem-1; IF mq[i].send = j THEN (*my request*) sema.V(smux); END; mq[i].action:=Undef; (* = remove *) END; END; i:=i+1 END; | Fin : IF sem = 1 THEN EXIT; ELSE ERROR(harmless,"DoHelp: semaphore <> 1."); EXIT; END; ELSE ERROR(severe,"DoHelp: invalid action"); EXIT END; END (*loop*); (*5*) (*finish*) SWRITE("Do help done "); GWRITE(j); BLINES(0); RETURN( ADDRESS(k) ) (*9*) END DoHelp; VAR mytid: int; 64 VAR bmux, smux: sema.semaphore; PROCEDURE tuwas; (* wie der name schon sagt. *) VAR i, o, j: INTEGER; pt: pthread_t; BEGIN (*1*) (*initialization*) mytid:=pvm_mytid(); valid:=-1; j:=0; WHILE j <= maxco DO mintime[j]:=-1; j:=j+1 END; sema.seminit(bmux,1); sema.seminit(smux,0); (*2*) (*start*) i:=1; IF pvm_parent() < 0 THEN o:=Master(i); ELSE o:=Coworker(i) END; i:=o; (*3*) (*do work and help *) IF pthread_create(pt,attr_default,DoHelp,i) < 0 THEN ERROR(severe,"Cannot create DoHelp."); END; o:=DoWork(i); IF pthread_join(pt,o) < 0 THEN ERROR(severe,"Cannot join DoHelp."); END; (*4*) (*exit*) pvm_exit(); (*9*) END tuwas; BEGIN tuwas; END dsema. Die Ausgabe konnte zum Beispiel wie folgt aussehen. Es wird nur die Ausgabe des Master Prozesses gezeigt. Master start 1 Master done 1 Do work start 0 Do help start 0 DoHelp: sender = 0, action = 10, timestamp = 0 DoHelp: sender = 0, action = 99, timestamp = 2, Acktime = -1 DoHelp: sender = 1, action = 10, timestamp = 0 DoHelp: sender = 2, action = 10, timestamp = 0 DoHelp: sender = 2, action = 99, timestamp = 2, Acktime = -1 DoHelp: sender = 0, action = 99, timestamp = 5, Acktime = -1 DoHelp: sender = 2, action = 99, timestamp = 5, Acktime = -1 DoHelp: sender = 1, action = 99, timestamp = 2, Acktime = 2 DoHelp: sender = 2, action = 99, timestamp = 7, Acktime = 2 DoHelp: sender = 0, action = 99, timestamp = 7, Acktime = 2 DoHelp: Lock now held by 0 ** severe error: DoWork: need a break. (a)bort, (b)reak, (c)ontinue, (d)ebug, <ENTER> ? sender = 1, action = 99, timestamp = 5, Acktime = 5 DoHelp: sender = 0, action = 20, timestamp = 1 (Bem. *) DoHelp: sender = 2, action = 99, timestamp = 16, Acktime = 5 DoHelp: sender = 0, action = 99, timestamp = 16, Acktime = 5 DoHelp: sender = 1, action = 99, timestamp = 7, Acktime = 7 DoHelp: sender = 1, action = 99, timestamp = 16, Acktime = 16 DoHelp: sender = 1, action = 20, timestamp = 1 DoHelp: sender = 0, action = 99, timestamp = 22, Acktime = 16 DoHelp: sender = 2, action = 99, timestamp = 22, Acktime = 16 DoHelp: sender = 2, action = 20, timestamp = 1 DoHelp: sender = 2, action = 99, timestamp = 26, Acktime = 16 DoHelp: sender = 1, action = 99, timestamp = 22, Acktime = 22 65 DoHelp: sender = 0, action = 99, timestamp = 26, Acktime = 22 DoHelp: sender = 1, action = 99, timestamp = 26, Acktime = 26 continue. Do work done 0 DoHelp: sender = 0, action = 0, timestamp = 2 Do help done 0 Nach Master done sind die Subtasks mit spawn gestartet und die Kanale zwischen allen den Tasks initialisiert, d.h. auch die Subtasks kennen sich gegenseitig. Von den Subtasks werden entsprechend die Coworker Prozeduren abgearbeitet. Danach werden (von allen drei Tasks 0 1 2) die eigentlichen Arbeitsund Hilfstasks gestartet (DoWork und DoHelp). Die Zeilen, die mit DoHelp: beginnen, zeigen nun die Aktivitaten in der Hilfstask nach dem Empfang eines Requests. Es wird jeweils der Sender, die verlangte Aktion und die logische Zeit des Requests angegeben. Die letzte Angabe ist die minimale Acknowledgement Zeit, d.h. die Zeit zu der sicher jede beteiligte Task die Requests registriert hat. Die Aktionen sind wie folgt kodiert: 10 P Operation, 20 V Operation, 99 Acknowledge Operation, 0 Terminiere. Die einzelnen Aktivitaten sind wie folgt. P request von 0, Ack dazu von 0, P request von 1, P request von 2, Ack von 2, zweites Ack von 0, zweites Ack von 2, schlielich das noch fehlende erste Ack von 1. Ab diesem Zeitpunkt ist der erste P Request von allen Tasks registriert worden und die minimale Acknowledgement Zeit springt auf 2 (2 logische Ticks, da bis dahin 2 send/receives durchgefuhrt wurden). Warend noch 2 Acks eintreen (von 2 und 0) bekommt die Task 0 den Lock zugesprochen (Lock now held by 0). Die nachfolgende Fehlermeldung dient nur dazu die Programmausfuhrung eine Weile aufzuhalten. Dann wird von Task 0 der Semaphore wieder freigegeben, d.h. es wird eine V Operation requested. Nachdem alle Tasks diesen Request registriert haben bekommt Task 1 den Lock zugesprochen. Dies ist an dem nachfolgenden V Request von 1 ersichtlich. Warum bekommt die Task 1 den Lock und nicht Task 2, obwohl beide Tasks auf einen Lock warten ? Kann es sein, da sich zwei Task beide den Lock zusprechen ? Nachdem von Task 1 der Semaphore wieder freigegeben wurde, und alle anderen Tasks den Request registriert haben, bekommt endlich auch Task 2 den Lock zugesprochen. Nach der Freigabe des Locks durch Task 2 terminieren alle Arbeits- und Hilfstasks. Bemerkung: An der Stelle (Bem. *) bendet sich ein Fehler, der Timestamp mute groer als 1 sein. Der Fehler entstand durch die Verwendung der lc Variablen als lokale Variable in DoWork und DoHelp, lc h atte eine globale Variable sein mussen. Der Fehler ist in dem Programm-Listing beseitigt, nicht aber in dem Beispiel. Allerdings muten die A nderungen von lc atomar erfolgen, d.h. lc mute durch einen Mutex geschutzt werden. Damit ist die Besprechung des Beispiels beendet. 66 Anhang A Benutzung der Compiler In diesem Kapitel stellen wir einige Hinweise zur Benutzung der verwendeten Compiler zusammen. Fur die vollstandigen Informationen mussen wir auf die entsprechenden man pages und auf das info System verweisen. Weitere nutzliche Hinweise benden sich in dem Skript Nutung des Parallelrechners SNI-KSR des LRZ Munchen. A.1 C Header Dateien: #include <pthread.h> Aufruf: cc -para -o name name.c : Name der Datei mit dem C Programm. -o name: Name der Datei mit dem ausfuhrbaren Programm (executable). -para: bereitstellen aller benotigten Bibliotheken. name.c A.2 C++ Header Dateien: #include <pthread.h> Aufruf: CC -para -o name name.C name.C: Name der Datei mit dem C++ Programm. -o name: Name der Datei mit dem ausf uhrbaren Programm (executable). -para: bereitstellen aller benotigten Bibliotheken. 67 A.3 Modula-2 Denition Module: IMPORT KSR1; IMPORT pthread; Aufruf: mtc name.mi $(IMPORTS) cc -para -o name name.c $(LIBS) : Name der Datei mit dem Modula-2 Programm. name.c: Name der Datei mit dem erzeugten C Programm. -o name: Name der Datei mit dem ausfuhrbaren Programm (executable). $(IMPORTS): Variable mit den Namen der Import Directories. $(LIBS): Variable mit den Namen der Linkbibliotheken. name.mi Siehe auch den Abschnitt uber Makeles. A.4 FORTRAN Header Dateien, z. B.: include '/usr/local/include/fpvm3.h' Externe Common-Blocke, z. B.: & & & common /pthread_defaults/ ipthread_attr_default, ipthread_mutexattr_default, ipthread_condattr_default, ipthread_barrierattr_default Aufruf: f77 -para -o name name.f : Name der Datei mit dem FORTRAN Programm. -o name: Name der Datei mit dem ausfuhrbaren Programm (executable). -para: bereitstellen aller benotigten Bibliotheken. name.f A.5 KAP Aufruf: KAP name.F >name.f : Name der Datei mit dem KAP Programm. name.f: Name der Datei mit dem FORTRAN Programm. name.F 68 A.6 PVM Header Dateien: #include <pvm3.h> Aufruf: cc -para -o name name.c $(PVMLIB) : Name der Datei mit dem C Programm. -o name: Name der Datei mit dem ausfuhrbaren Programm (executable). -para: bereitstellen aller benotigten Bibliotheken. $(PVMLIB): Variable mit den Namen der PVM Linkbibliotheken. name.c A.7 Makeles Inhalt, z.B.: PVMLIB=$(PVM_ROOT)/lib/$(PVMARCH)/libpvm3.a MTCFLAGS=-d/usr/local/lib/reuse -d/usr/local/lib/src/gmd/ksr/lib name.h: name.md <TAB> /usr/local/bin/mtc $(MTCFLAGS) $(<) name.c: name.mi <TAB> /usr/local/bin/mtc $(MTCFLAGS) $(<) name.o: name.c <TAB> cc -c $(<) $(&) name: name.o <TAB> cc -para $(<) $(PVMLIB) Aufruf: make name Beispiel zu Aufgabe 3. # # # # # -------------------------------------------------------$Id: Makefile,v 1.1.1.1 1993/06/11 12:54:41 kredel Exp $ -------------------------------------------------------$Log: $ -------------------------------------------------------- GMD=/usr/local MTC=$(GMD)/bin/mtc MTCFLAGS=-d$(GMD)/lib/reuse -d$(GMD)/lib/src/gmd/ksr/lib %.h:%.md 69 $(MTC) $(MTCFLAGS) $< %.c:%.mi $(MTC) $(MTCFLAGS) $< CFLAGS=-I$(GMD)/include/m2c -I$(GMD)/lib/src/gmd/ksr/include LLIB=$(GMD)/lib/src/gmd/ksr/lib/libksrmtc.a all: ppauf3: ppauf3.o cc -para $(CFLAGS) -o $@ $? $(LLIB) $(GMD)/lib/mtc/libreuse.a ppauf3.c: ppauf3.mi clean: /bin/rm -f *.o *.c 70 Anhang B Header Dateien und Denition Moduln In diesem Anhang werden die wichtigsten C Header Dateien und Modula-2 Denitionsmoduln aufgefuhrt. B.1 Thread Denition Modul FOREIGN MODULE thread; (* Mach thread Foreign Module. *) FROM SYSTEM IMPORT ADDRESS; FROM KSR1 IMPORT int, card; (* For the definitions below see /usr/include/mach.h *) TYPE port_t TYPE processor_name_t TYPE thread_t TYPE ksr_thread_state ceu_regs ipu_regs fpu_regs xui_timers ceu_context ipu_context fpu_context ceu_trap ipu_trap fpu_trap xiu_trap reserved = ADDRESS; = port_t; = port_t; = : : : : : : : : : : : : RECORD ARRAY [0..31] OF LONGINT; ARRAY [0..31] OF LONGINT; ARRAY [0..63] OF LONGINT; ARRAY [0..1] OF LONGINT; LONGINT; LONGINT; LONGINT; LONGINT; LONGINT; LONGINT; LONGINT; ARRAY [0..63] OF LONGINT; (*for safety*) END; 71 TYPE thread_state_t = (*POINTER TO*) ksr_thread_state; CONST KSR_THREAD_STATE = 1; (* Mach thread status functions. *) PROCEDURE thread_get_state(thread: thread_t; flavor: int; old_state: thread_state_t; VAR old_stateCnt: card): LONGINT; (*Get thread state. *) END thread. B.2 Pthread Denition Modul FOREIGN MODULE pthread; (* Pthread Foreign Module. *) FROM SYSTEM IMPORT ADDRESS; FROM KSR1 IMPORT int, card, align128int, align128long; FROM thread IMPORT thread_t, processor_name_t; (* For the definitions below see /usr/include/pthread.h *) TYPE any_t TYPE slot_t TYPE timespec = = = : : tv_sec tv_nsec TYPE pthread_mutexattr flags logfile loginfo spinlimit LONGCARD; int; RECORD (* from <timers.h> *) CARDINAL; INTEGER; END; RECORD LONGINT; POINTER TO ADDRESS; int; int; END; TYPE pthread_mutexattr_t = POINTER TO pthread_mutex; TYPE pthread_mutex lock loginfo flags name spinlimit yieldcount = : : : : = : : : : : : RECORD align128int; int; LONGINT; POINTER TO CHAR; int; int; 72 logfile locker unlocker timestamp attr dummy1 dummy2 dummy3 dummy4 : : : : : : : : : TYPE pthread_mutex_t POINTER TO ADDRESS; POINTER TO pthread; POINTER TO pthread; timespec; pthread_mutexattr_t; ADDRESS; ADDRESS; ADDRESS; ADDRESS; END; = (*POINTER TO*) (*???*) pthread_mutex; TYPE pthread_condattr flags logfile loginfo = : : : RECORD LONGINT; POINTER TO ADDRESS; int; END; TYPE pthread_condattr_t = POINTER TO pthread_condattr; TYPE pthread_func TYPE pthread_func_t = PROCEDURE(VAR ADDRESS): ADDRESS; = POINTER TO pthread_func; TYPE pthread_attr flags stacksize logfile loginfo = : : : : TYPE pthread_attr_t TYPE p_queue RECORD LONGINT; LONGINT; POINTER TO ADDRESS; int; END; = POINTER TO pthread_attr; TYPE pthread_queue = RECORD : POINTER TO p_queue; : POINTER TO p_queue; END; = POINTER TO p_queue; TYPE pthread_condition lock waiters flags name loginfo logfile timestamp attr = : : : : : : : : next prev TYPE pthread_cond_t RECORD (*must fit in subpage*) align128int; pthread_queue; LONGINT; POINTER TO CHAR; int; POINTER TO ADDRESS; timespec; pthread_condattr_t; END; = (*POINTER TO*) (*???*) pthread_condition; 73 TYPE pthread = link : all_thread_link: flags : state : vp : name : lock : join_count : loginfo : logfile : done : func : arg : returned : attr : specific_data : cancel_state : cleanup_queue : reserved : TYPE pthread_t RECORD pthread_queue; POINTER TO pthread; card; card; POINTER TO ADDRESS; POINTER TO CHAR; ADDRESS; card; int; POINTER TO ADDRESS; pthread_cond_t; pthread_func_t; any_t; any_t; pthread_attr_t; POINTER TO ADDRESS; (*specific_data*) ADDRESS; (*pthread_cancel_t*) POINTER TO ADDRESS; (*pthread_cleanup_handler_t*) ARRAY [0..7] OF LONGINT; (*for safety*) END; = POINTER TO pthread; CONST rcsid = "$Id: pthread.md,v 1.1.1.1 1993/06/11 12:54:54 kredel Exp $"; CONST copyright = "Copyright (c) 1993 - 1994 Universitaet Mannheim"; CONST TH_BOUND_PROCESSOR = 2; (* TH_BOUND_RING0 = 1; TH_BOUND_NOWHERE = 0; *) VAR (*pthread_*) condattr_default: pthread_condattr_t; VAR (*pthread_*) mutexattr_default: pthread_mutexattr_t; VAR (*pthread_*) attr_default: pthread_attr_t; (* POSIX thread functions for mutexes. *) PROCEDURE pthread_mutex_init(VAR mutex: pthread_mutex_t; attr: pthread_mutexattr_t): int; (*Initialize a mutex. *) PROCEDURE pthread_mutex_destroy(VAR mutex: pthread_mutex_t): int; (*Destroy a mutex. *) PROCEDURE pthread_mutex_lock(VAR mutex: pthread_mutex_t): int; 74 (*Lock a mutex. *) PROCEDURE pthread_mutex_trylock(VAR mutex: pthread_mutex_t): int; (*Try to lock a mutex. *) PROCEDURE pthread_mutex_unlock(VAR mutex: pthread_mutex_t): int; (*Unlock a mutex. *) (* POSIX thread functions for condition variables. *) PROCEDURE pthread_cond_init(VAR cond: pthread_cond_t; attr: pthread_condattr_t): int; (*Initialize a condition variable. *) PROCEDURE pthread_cond_destroy(VAR cond: pthread_cond_t): int; (*Destroy a condition variable. *) PROCEDURE pthread_cond_wait(VAR cond: pthread_cond_t; VAR mutex: pthread_mutex_t): int; (*Wait on a condition wrt mutex. *) PROCEDURE pthread_cond_timedwait(VAR cond: pthread_cond_t; VAR mutex: pthread_mutex_t; VAR timeout: timespec): int; (*Wait with timeout on a condition wrt mutex. *) PROCEDURE pthread_cond_signal(VAR cond: pthread_cond_t): int; (*Signal a condition. *) PROCEDURE pthread_cond_broadcast(VAR cond: pthread_cond_t): int; (*Broadcast a condition. *) (* POSIX thread functions for threads. *) PROCEDURE pthread_self(): pthread_t; (*Get thread id of myself. *) PROCEDURE pthread_create( VAR thread: attr: start_routine: VAR arg: (*Create and start a thread. *) pthread_t; pthread_attr_t; pthread_func; ADDRESS ): int; PROCEDURE pthread_detach( VAR thread: pthread_t): int; (*Detach a thread. *) PROCEDURE pthread_join(thread: pthread_t; VAR status: ADDRESS): int; (*Join a thread. *) PROCEDURE pthread_exit(VAR status: ADDRESS); 75 (*Exit a thread. *) PROCEDURE pthread_yield(status: ADDRESS); (*Yield. *) PROCEDURE pthread_cancel(thread: pthread_t); (*Cancel a thread. *) (* POSIX thread scheduling for threads. *) PROCEDURE pthread_getproc(thread: pthread_t; VAR processor: processor_name_t) : int; (*Get the processor a thread runs on. *) PROCEDURE pthread_move(thread: pthread_t; destproc: processor_name_t; bind: int): int; (*Move a thread to a processor. *) PROCEDURE pthread_procsetinfo(pcell: ARRAY OF processor_name_t; len: int): int; (*Information about processors in procset. *) PROCEDURE pthread_getthread(thread: pthread_t; VAR machthread: thread_t): int; (*Get the MACH thread id of a pthread. *) END pthread. B.3 PVM C Header Datei /* * * * * * * * * * * * * * * * * * * * PVM version 3.3: Parallel Virtual Machine System University of Tennessee, Knoxville TN. Oak Ridge National Laboratory, Oak Ridge TN. Emory University, Atlanta GA. Authors: A. L. Beguelin, J. J. Dongarra, G. A. Geist, W. C. Jiang, R. J. Manchek, B. K. Moore, and V. S. Sunderam (C) 1992 All Rights Reserved NOTICE Permission to use, copy, modify, and distribute this software and its documentation for any purpose and without fee is hereby granted provided that the above copyright notice appear in all copies and that both the copyright notice and this permission notice appear in supporting documentation. Neither the Institutions (Emory University, Oak Ridge National Laboratory, and University of Tennessee) nor the Authors make any representations about the suitability of this software for any 76 * purpose. This software is provided ``as is'' without express or * implied warranty. * * PVM version 3 was funded in part by the U.S. Department of Energy, * the National Science Foundation and the State of Tennessee. */ /* * pvm3.h * * Libpvm3 includes. * $Log$ */ #ifndef _PVM3_H_ #define _PVM3_H_ #include <sys/time.h> /* * Data packing styles for pvm_initsend() */ #define #define #define #define PvmDataDefault 0 PvmDataRaw 1 PvmDataInPlace 2 PvmDataFoo 3 /* * pvm_spawn options */ #define PvmTaskDefault 0 #define PvmTaskHost 1 /* specify host */ #define PvmTaskArch 2 /* specify architecture */ #define PvmTaskDebug 4 /* start task in debugger */ #define PvmTaskTrace 8 /* process generates trace data */ /* for MPP ports */ #define PvmMppFront 16 /* spawn task on service node */ #define PvmHostCompl 32 /* complement host set */ /* * pvm_notify types */ #define PvmTaskExit 1 /* on task exit */ #define PvmHostDelete 2 /* on host fail/delete */ #define PvmHostAdd 3 /* on host startup */ /* 77 * for pvm_setopt and pvm_getopt */ #define #define #define #define #define #define #define #define #define #define #define #define #define #define #define #define PvmRoute 1 /* routing policy */ PvmDontRoute 1 /* don't allow direct task-task links */ PvmAllowDirect 2 /* allow direct links, but don't request */ PvmRouteDirect 3 /* request direct links */ PvmDebugMask 2 /* debugmask */ PvmAutoErr 3 /* auto error reporting */ PvmOutputTid 4 /* stdout destination for children */ PvmOutputCode 5 /* stdout message tag */ PvmTraceTid 6 /* trace destination for children */ PvmTraceCode 7 /* trace message tag */ PvmFragSize 8 /* message fragment size */ PvmResvTids 9 /* allow reserved message tids and codes */ PvmSelfOutputTid 10 /* stdout destination for task */ PvmSelfOutputCode 11 /* stdout message tag */ PvmSelfTraceTid 12 /* trace destination for task */ PvmSelfTraceCode 13 /* trace message tag */ /* * for pvm_[sg]ettmask */ #define PvmTaskSelf 0 /* this task */ #define PvmTaskChild 1 /* (future) child tasks */ /* * Libpvm error codes */ #define #define #define #define #define #define #define #define #define #define #define #define #define #define #define #define #define #define #define #define #define PvmOk 0 /* Error 0 */ PvmBadParam -2 /* Bad parameter */ PvmMismatch -3 /* Count mismatch */ PvmNoData -5 /* End of buffer */ PvmNoHost -6 /* No such host */ PvmNoFile -7 /* No such file */ PvmNoMem -10 /* Malloc failed */ PvmBadMsg -12 /* Can't decode message */ PvmSysErr -14 /* Can't contact local daemon */ PvmNoBuf -15 /* No current buffer */ PvmNoSuchBuf -16 /* No such buffer */ PvmNullGroup -17 /* Null group name */ PvmDupGroup -18 /* Already in group */ PvmNoGroup -19 /* No such group */ PvmNotInGroup -20 /* Not in group */ PvmNoInst -21 /* No such instance */ PvmHostFail -22 /* Host failed */ PvmNoParent -23 /* No parent task */ PvmNotImpl -24 /* Not implemented */ PvmDSysErr -25 /* Pvmd system error */ PvmBadVersion -26 /* Version mismatch */ 78 #define #define #define #define #define #define #define PvmOutOfRes -27 /* Out of resources */ PvmDupHost -28 /* Duplicate host */ PvmCantStart -29 /* Can't start pvmd */ PvmAlready -30 /* Already in progress */ PvmNoTask -31 /* No such task */ PvmNoEntry -32 /* No such entry */ PvmDupEntry -33 /* Duplicate entry */ /* * Data types for pvm_reduce(), pvm_psend(), pvm_precv() */ #define #define #define #define #define #define #define #define #define #define #define #define PVM_STR 0 /* string */ PVM_BYTE 1 /* byte */ PVM_SHORT 2 /* short */ PVM_INT 3 /* int */ PVM_FLOAT 4 /* real */ PVM_CPLX 5 /* complex */ PVM_DOUBLE 6 /* double */ PVM_DCPLX 7 /* double complex */ PVM_LONG 8 /* long integer */ PVM_USHORT 9 /* unsigned short int */ PVM_UINT 10 /* unsigned int */ PVM_ULONG 11 /* unsigned long int */ /* * returned by pvm_config() */ struct pvmhostinfo { int hi_tid; /* pvmd tid */ char *hi_name; /* host name */ char *hi_arch; /* host arch */ int hi_speed; /* cpu relative speed */ }; /* * returned by pvm_tasks() */ struct pvmtaskinfo { int ti_tid; /* task id */ int ti_ptid; /* parent tid */ int ti_host; /* pvmd tid */ int ti_flag; /* status flags */ char *ti_a_out; /* a.out name */ int ti_pid; /* task (O/S dependent) process id */ }; #ifdef __ProtoGlarp__ #undef __ProtoGlarp__ 79 #endif #if defined(__STDC__) || defined(__cplusplus) #define __ProtoGlarp__(x) x #else #define __ProtoGlarp__(x) () #endif #ifdef __cplusplus extern "C" { #endif int pvm_addhosts __ProtoGlarp__(( char **, int, int * )); int pvm_archcode __ProtoGlarp__(( char * )); int pvm_barrier __ProtoGlarp__(( char *, int )); int pvm_bcast __ProtoGlarp__(( char *, int )); int pvm_bufinfo __ProtoGlarp__(( int, int *, int *, int * )); /* int pvm_catchout __ProtoGlarp__(( FILE * )); */ int pvm_catchout(); int pvm_config __ProtoGlarp__(( int *, int *, struct pvmhostinfo ** )); int pvm_delete __ProtoGlarp__(( char *, int )); int pvm_delhosts __ProtoGlarp__(( char **, int, int * )); int pvm_exit __ProtoGlarp__(( void )); int pvm_freebuf __ProtoGlarp__(( int )); int pvm_gather __ProtoGlarp__(( void*, void*, int, int, int, char*, int)); int pvm_getfds __ProtoGlarp__(( int ** )); int pvm_getinst __ProtoGlarp__(( char *, int )); int pvm_getmwid __ProtoGlarp__(( int )); int pvm_getopt __ProtoGlarp__(( int )); int pvm_getrbuf __ProtoGlarp__(( void )); int pvm_getsbuf __ProtoGlarp__(( void )); int pvm_gettid __ProtoGlarp__(( char *, int )); int pvm_gsize __ProtoGlarp__(( char * )); int pvm_halt __ProtoGlarp__(( void )); int pvm_hostsync __ProtoGlarp__(( int, struct timeval *, struct timeval * )); int pvm_initsend __ProtoGlarp__(( int )); int pvm_insert __ProtoGlarp__(( char *, int, int )); int pvm_joingroup __ProtoGlarp__(( char * )); int pvm_kill __ProtoGlarp__(( int )); int pvm_lookup __ProtoGlarp__(( char *, int, int * )); int pvm_lvgroup __ProtoGlarp__(( char * )); int pvm_mcast __ProtoGlarp__(( int *, int, int )); int pvm_mkbuf __ProtoGlarp__(( int )); int pvm_mstat __ProtoGlarp__(( char * )); int pvm_mytid __ProtoGlarp__(( void )); int pvm_notify __ProtoGlarp__(( int, int, int, int * )); int pvm_nrecv __ProtoGlarp__(( int, int )); 80 int pvm_packf __ProtoGlarp__(( const char *, ... )); int pvm_parent __ProtoGlarp__(( void )); int pvm_perror __ProtoGlarp__(( char * )); int pvm_pkbyte __ProtoGlarp__(( char *, int, int )); int pvm_pkcplx __ProtoGlarp__(( float *, int, int )); int pvm_pkdcplx __ProtoGlarp__(( double *, int, int )); int pvm_pkdouble __ProtoGlarp__(( double *, int, int )); int pvm_pkfloat __ProtoGlarp__(( float *, int, int )); int pvm_pkint __ProtoGlarp__(( int *, int, int )); int pvm_pklong __ProtoGlarp__(( long *, int, int )); int pvm_pkshort __ProtoGlarp__(( short *, int, int )); int pvm_pkstr __ProtoGlarp__(( char * )); int pvm_pkuint __ProtoGlarp__(( unsigned int *, int, int )); int pvm_pkulong __ProtoGlarp__(( unsigned long *, int, int )); int pvm_pkushort __ProtoGlarp__(( unsigned short *, int, int )); int pvm_precv __ProtoGlarp__(( int, int, void *, int, int, int *, int *, int * )); int pvm_probe __ProtoGlarp__(( int, int )); int pvm_psend __ProtoGlarp__(( int, int, void *, int, int )); int pvm_pstat __ProtoGlarp__(( int )); int pvm_recv __ProtoGlarp__(( int, int )); int (*pvm_recvf __ProtoGlarp__(( int (*)(int, int, int) )) )(); int pvm_reduce __ProtoGlarp__(( void (*)(int*, void*, void*, int*, int*), void *, int, int, int, char *, int )); /* * Predefined pvm_reduce functions */ void PvmMax __ProtoGlarp__(( int *, void *, void *, int *, int * )); void PvmMin __ProtoGlarp__(( int *, void *, void *, int *, int * )); void PvmSum __ProtoGlarp__(( int *, void *, void *, int *, int * )); void PvmProduct __ProtoGlarp__(( int *, void *, void *, int *, int * )); int pvm_reg_hoster __ProtoGlarp__(( void )); int pvm_reg_rm __ProtoGlarp__(( struct pvmhostinfo ** )); int pvm_reg_tasker __ProtoGlarp__(( void )); int pvm_scatter __ProtoGlarp__(( void*, void*, int, int, int, char*, int)); int pvm_send __ProtoGlarp__(( int, int )); int pvm_sendsig __ProtoGlarp__(( int, int )); int pvm_setmwid __ProtoGlarp__(( int, int )); int pvm_setopt __ProtoGlarp__(( int, int )); int pvm_setrbuf __ProtoGlarp__(( int )); int pvm_setsbuf __ProtoGlarp__(( int )); 81 int pvm_spawn __ProtoGlarp__(( char *, char **, int, char *, int, int * )); int pvm_start_pvmd __ProtoGlarp__(( int, char **, int )); int pvm_tasks __ProtoGlarp__(( int, int *, struct pvmtaskinfo ** )); int pvm_tickle __ProtoGlarp__(( int, int *, int *, int * )); int pvm_tidtohost __ProtoGlarp__(( int )); int pvm_trecv __ProtoGlarp__(( int, int, struct timeval * )); int pvm_unpackf __ProtoGlarp__(( const char *, ... )); int pvm_upkbyte __ProtoGlarp__(( char *, int, int )); int pvm_upkcplx __ProtoGlarp__(( float *, int, int )); int pvm_upkdcplx __ProtoGlarp__(( double *, int, int )); int pvm_upkdouble __ProtoGlarp__(( double *, int, int )); int pvm_upkfloat __ProtoGlarp__(( float *, int, int )); int pvm_upkint __ProtoGlarp__(( int *, int, int )); int pvm_upklong __ProtoGlarp__(( long *, int, int )); int pvm_upkshort __ProtoGlarp__(( short *, int, int )); int pvm_upkstr __ProtoGlarp__(( char * )); int pvm_upkuint __ProtoGlarp__(( unsigned int *, int, int )); int pvm_upkulong __ProtoGlarp__(( unsigned long *, int, int )); int pvm_upkushort __ProtoGlarp__(( unsigned short *, int, int )); char *pvm_version __ProtoGlarp__(( void )); #ifdef __cplusplus } #endif #endif /*_PVM3_H_*/ B.4 PVM Denition Modul (* ---------------------------------------------------------------------------* Id: pvm33.md,v 1.1 1994/10/06 14:13:39 kredel Exp * ---------------------------------------------------------------------------* Copyright (c) 1993 - 1994 Universitaet Mannheim * ---------------------------------------------------------------------------* Log: pvm33.md,v * Revision 1.1 1994/10/06 14:13:39 kredel * Intermediate pvm33 interface * * ---------------------------------------------------------------------------*) FOREIGN MODULE pvm33; (* PVM Version 3.3 Foreign Module. *) FROM KSR1 IMPORT int; 82 FROM SYSTEM IMPORT ADDRESS; CONST rcsid = "$Id: pvm33.md,v 1.1 1994/10/06 14:13:39 kredel Exp $"; CONST copyright = "Copyright (c) 1993 - 1994 Universitaet Mannheim"; (*For the definitions compare pvm3.3/include/pvm3.h *) (* Data packing styles for pvm_initsend *) CONST CONST CONST CONST PvmDataDefault PvmDataRaw PvmDataInPlace PvmDataFoo = = = = 0; 1; 2; 3; (* pvm_spawn options *) CONST PvmTaskDefault = 0; CONST PvmTaskHost = 1; (* specify host *) CONST PvmTaskArch = 2; (* specify architecture *) CONST PvmTaskDebug = 4; (* start task in debugger *) CONST PvmTaskTrace = 8; (* process generates trace data *) (* for MPP ports *) CONST PvmMppFront = 16; (* spawn task on service node *) CONST PvmHostCompl = 32; (* complement host set *) (* pvm_notify types *) CONST CONST CONST PvmTaskExit PvmHostDelete PvmHostAdd = 1; = 2; = 3; (* on task exit *) (* on host fail/delete *) (* on host startup *) (* for pvm_setopt and pvm_getopt *) CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST PvmRoute = 1; (* routing policy *) PvmDontRoute = 1; (* don't allow direct task-task links *) PvmAllowDirect = 2; (* allow direct links, but don't request *) PvmRouteDirect = 3; (* request direct links *) PvmDebugMask = 2; (* debugmask *) PvmAutoErr = 3; (* auto error reporting *) PvmOutputTid = 4; (* stdout destination for children *) PvmOutputCode = 5; (* stdout message tag *) PvmTraceTid = 6; (* trace destination for children *) PvmTraceCode = 7; (* trace message tag *) PvmFragSize = 8; (* message fragment size *) PvmResvTids = 9; (* allow reserved message tids and codes *) PvmSelfOutputTid = 10; (* stdout destination for task *) 83 CONST PvmSelfOutputCode = 11; (* stdout message tag *) CONST PvmSelfTraceTid = 12; (* trace destination for task *) CONST PvmSelfTraceCode = 13; (* trace message tag *) (* for pvm_[sg]ettmask *) CONST PvmTaskSelf = 0; (* this task *) CONST PvmTaskChild = 1; (* (future) child tasks *) (* Libpvm error codes *) CONST PvmOk = 0; CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST CONST PvmBadParam PvmMismatch PvmNoData PvmNoHost PvmNoFile PvmNoMem PvmBadMsg PvmSysErr PvmNoBuf PvmNoSuchBuf PvmNullGroup PvmDupGroup PvmNoGroup PvmNotInGroup PvmNoInst PvmHostFail PvmNoParent PvmNotImpl PvmDSysErr PvmBadVersion PvmOutOfRes PvmDupHost PvmCantStart PvmAlready PvmNoTask PvmNoEntry PvmDupEntry = = = = = = = = = = = = = = = = = = = = = = = = = = = -2; -3; -5; -6; -7; -10; -12; -14; -15; -16; -17; -18; -19; -20; -21; -22; -23; -24; -25; -26; -27; -28; -29; -30; -31; -32; -33; (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* (* okay *) reserve -1 *) bad parameter (neg msg id, etc) *) barrier count mismatch *) read past end of buffer *) no such host *) no such executable *) can't get memory *) received msg can't be decoded *) can't contact our pvmd/some system error *) no current buffer *) bad message id *) null group name is illegal *) already in group *) no group with name *) task not in group *) no such instance in group *) host failed *) no parent task *) function not implemented *) pvmd system error *) pvmd-pvmd protocol version mismatch *) out of resources *) host already configured *) failed to exec new slave pvmd *) already doing operation *) no such task *) no such name, index pair *) name, index pair already exists *) (* Data types for pvm_reduce(), pvm_psend(), pvm_precv() *) CONST CONST CONST CONST CONST CONST PVM_STR = 0; (* string *) PVM_BYTE = 1; (* byte *) PVM_SHORT = 2; (* short *) PVM_INT = 3; (* int *) PVM_FLOAT = 4; (* real *) PVM_CPLX = 5; (* complex *) 84 CONST CONST CONST CONST CONST CONST PVM_DOUBLE = 6; (* double *) PVM_DCPLX = 7; (* double complex *) PVM_LONG = 8; (* long integer *) PVM_USHORT = 9; (* unsigned short int *) PVM_UINT = 10; (* unsigned int *) PVM_ULONG = 11; (* unsigned long int *) (* PVM type definitions *) TYPE pvmhostinfo = RECORD hi_tid : int; hi_name : POINTER TO CHAR; hi_arch : POINTER TO CHAR; hi_speed : int; END; (* (* (* (* (* returned by pvm_config *) pvmd tid *) host name *) host arch *) cpu relative speed *) TYPE pvmtaskinfo = RECORD ti_tid : int; ti_ptid : int; ti_host : int; ti_flag : int; ti_a_out : POINTER TO CHAR; ti_pid : int; END; (* (* (* (* (* (* (* returned by pvm_tasks *) task id *) parent tid *) pvmd tid *) status flags *) a.out name *) task (O/S dependent) process id *) TYPE compfunc = PROCEDURE ((*mid:*) int, (*tid:*) int, (*msgtag:*) int): int; (* comparison function to accept messages *) TYPE redufunc = PROCEDURE ( VAR (*datatype:*) int, VAR (*x:*) ADDRESS, VAR (*y:*) ADDRESS, VAR (*num:*) int, VAR (*info:*) int ); (* function for pvm_reduce *) TYPE ptrtchr = POINTER TO CHAR; (* PVM functions *) PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE pvm_addhosts pvm_archcode pvm_barrier pvm_bcast pvm_bufinfo ( ( ( ( ( PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE pvm_catchout pvm_config pvm_delete pvm_delhosts pvm_exit pvm_freebuf pvm_gather ( ( ( ( ( ( ( names: ADDRESS; count: int; svp: ADDRESS ): xx: ptrtchr ): int; group: ptrtchr; count: int ): int; group: ptrtchr; msgtag: int ): int; mid: int; VAR len: int; VAR msgtag: int; VAR tid: int ): int; (* VAR ff: FILE *) ): int; VAR nhost: int; VAR narchp: int; VAR hostp: name: ptrtchr; req: int ): int; names: ADDRESS; count: int; svp: ADDRESS ): ): int; mid: int ): int; x1: ADDRESS; x2: ADDRESS; i1: int; i2: int; s: ptrtchr; i4: int): int; 85 int; ADDRESS ): int; int; i3: int; PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE pvm_getfds pvm_getinst pvm_getmwid pvm_getopt pvm_getrbuf pvm_getsbuf pvm_gettid pvm_gsize pvm_halt pvm_hostsync ( ( ( ( ( ( ( ( ( VAR x: ADDRESS ): int; group: ptrtchr; tid: int ): int; i: int ): int; what: int ): int; ): int; ): int; group: ptrtchr; inst: int ): int; group: ptrtchr ): int; ): int; ( host: int; VAR clk: ADDRESS (*struct timeval*); VAR delta: ADDRESS (*struct timeval*) ): int; PROCEDURE pvm_initsend ( encod: int ): int; PROCEDURE pvm_insert ( name: ptrtchr; req: int; data: int ): int; PROCEDURE pvm_joingroup ( group: ptrtchr ): int; PROCEDURE pvm_kill ( tid: int ): int; PROCEDURE pvm_lookup ( name: ptrtchr; req: int; VAR datap: int ): int; PROCEDURE pvm_lvgroup ( group: ptrtchr ): int; PROCEDURE pvm_mcast ( tids: ADDRESS; count: int; msgtag: int ): int; PROCEDURE pvm_mkbuf ( encod: int ): int; PROCEDURE pvm_mstat ( host: ptrtchr ): int; PROCEDURE pvm_mytid ( ): int; PROCEDURE pvm_notify ( what: int; msgtag: int; count: int; vals: ADDRESS ): int; PROCEDURE pvm_nrecv ( tid: int; msgtag: int ): int; PROCEDURE pvm_parent ( ): int; PROCEDURE pvm_perror ( msg: ptrtchr ): int; PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE pvm_packf pvm_pkbyte pvm_pkcplx pvm_pkdcplx pvm_pkdouble pvm_pkfloat pvm_pkint pvm_pklong pvm_pkshort pvm_pkstr pvm_pkuint pvm_pkulong pvm_pkushort ( ( ( ( ( ( ( ( ( ( ( ( ( PROCEDURE pvm_precv PROCEDURE pvm_probe PROCEDURE pvm_psend PROCEDURE PROCEDURE PROCEDURE PROCEDURE pvm_pstat pvm_recv pvm_recvf pvm_reduce cp: cp: xp: zp: dp: fp: np: np: np: cp: np: np: np: ptrtchr (* ... *) ): int; ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ptrtchr ): int; ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ): ): ): ): ): ): ): ): int; int; int; int; int; int; int; int; ): int; ): int; ): int; ( tid: int; msgtag: int; VAR buf: ADDRESS; len: int; datatype: int; VAR sid: int; VAR atag: int; VAR alen: int ): int; ( tid: int; msgtag: int ): int; ( tid: int; msgtag: int; VAR buf: ADDRESS; len: int; datatype: int ): int; ( tid: int ): int; ( tid: int; msgtag: int ): int; ( new: compfunc ): compfunc; ( func: redufunc; VAR data: ADDRESS; count: int; datatype: int; msgtag: int; group: ptrtchr; root: int ): int; 86 PROCEDURE PvmMax ( VAR datatype: int; VAR x: ADDRESS; VAR y: ADDRESS; VAR num: int; VAR info: int ); PROCEDURE PvmMin ( VAR datatype: int; VAR x: ADDRESS; VAR y: ADDRESS; VAR num: int; VAR info: int ); PROCEDURE PvmSum ( VAR datatype: int; VAR x: ADDRESS; VAR y: ADDRESS; VAR num: int; VAR info: int ); PROCEDURE PvmProduct ( VAR datatype: int; VAR x: ADDRESS; VAR y: ADDRESS; VAR num: int; VAR info: int ); PROCEDURE pvm_reg_hoster ( ): int; PROCEDURE pvm_reg_rm ( VAR hi: ADDRESS (* pvmhostinfo *) ): int; PROCEDURE pvm_reg_tasker ( ): int; PROCEDURE pvm_scatter ( x1: ADDRESS; x1: ADDRESS; i1: int; i2: int; i3: int; s: ptrtchr; i4: int): int; PROCEDURE pvm_send ( tid: int; msgtag: int ): int; PROCEDURE pvm_sendsig ( tid: int; signum: int ): int; PROCEDURE pvm_setmwid ( x: int; y: int ): int; PROCEDURE pvm_setopt ( what: int; val: int ): int; PROCEDURE pvm_setrbuf ( mid: int ): int; PROCEDURE pvm_setsbuf ( mid: int ): int; PROCEDURE pvm_spawn ( file: ptrtchr; argv: ADDRESS; flags: int; where: ptrtchr; count: int; tids: ADDRESS ): int; PROCEDURE pvm_start_pvmd ( argc: int; argv: ADDRESS ): int; PROCEDURE pvm_tasks ( where: int; VAR ntaskp: int; VAR taskp: ADDRESS ): int; PROCEDURE pvm_tickle ( narg: int; VAR argp: int; nresp: ADDRESS; VAR resp: int ): int; PROCEDURE pvm_tidtohost ( tid: int ): int; PROCEDURE pvm_trecv ( tid: int; msgtag: int; VAR tim: ADDRESS (*timeval*) ): int; PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE PROCEDURE pvm_unpackf pvm_upkbyte pvm_upkcplx pvm_upkdcplx pvm_upkdouble pvm_upkfloat pvm_upkint pvm_upklong pvm_upkshort pvm_upkstr pvm_upkuint pvm_upkulong pvm_upkushort PROCEDURE pvm_version ( ( ( ( ( ( ( ( ( ( ( ( ( cp: cp: xp: zp: dp: fp: np: np: np: cp: np: np: np: ADDRESS (* ... *) ): int; ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ptrtchr ): int; ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ADDRESS; cnt: int; std: int ( ): ptrtchr; END pvm33. (* -EOF- *) B.5 Header-Datei ERROR.h 87 ): ): ): ): ): ): ): ): int; int; int; int; int; int; int; int; ): int; ): int; ): int; // errors #ifndef _ERROR_H #define _ERROR_H #include "TYPES.h" enum kinds { spotless, harmless, severe, fatal, confusion = 9 } ; void ERROR( kinds, char* ); void ERROR( kinds, char*, long int ); void SWRITE( char* ); void SWRITE( char*, long int ); void GWRITE( GAMMAINT ); void BLINES( GAMMAINT ); #endif _ERROR_H B.6 Header-Datei TYPES.h // types #ifndef _TYPES_H #define _TYPES_H typedef typedef typedef typedef typedef typedef long int INTEGER; long int INT8; long int LONGINT; int INT4; short int INT2; LONGINT GAMMAINT; typedef typedef typedef typedef typedef unsigned unsigned unsigned unsigned unsigned long int CARDINAL; long int CARD8; long int LONGCARD; int CARD4; short int CARD2; typedef void* ADDRESS; #endif _TYPES_H B.7 Header-Datei sema.h // semaphore class 88 #ifndef _SEMA_H #define _SEMA_H #include <pthread.h> #include "TYPES.h" class sema { private: INTEGER init; INTEGER s; INTEGER del; pthread_mutex_t mux; pthread_cond_t pos; public: sema(); sema(INTEGER); ~sema(); void P(); void V(); }; #endif _SEMA_H 89 Anhang C Verzeichnis der POSIX thread Funktionen C.1 POSIX threads pthread_attr_create (3) - Creates a thread attributes object pthread_attr_delete (3) - Deletes a thread attributes object pthread_attr_getstacksize (3) - Returns the value of the stack size attribute of a thread attributes object pthread_attr_setstacksize (3) - Sets the value of the stack size attribute of a thread attributes object pthread_cancel (3) - Initiates termination of a thread pthread_cleanup_pop (3) - Removes a routine from the top of the cleanup stack of the calling thread and optionally executes it pthread_cleanup_push (3) - Pushes a routine onto the cleanup stack of the calling thread pthread_cond_broadcast (3) - Wakes up all threads that are waiting on a condition variable pthread_cond_destroy (3) - Destroys a condition variable pthread_cond_init (3) - Creates a condition variable pthread_cond_signal (3) - Wakes up a thread that is waiting on a condition variable pthread_cond_timedwait (3) - Waits on a condition variable for a specified period of time pthread_cond_wait (3) - Waits on a condition variable pthread_condattr_create (3) - Creates a condition variable attributes object 90 pthread_condattr_delete (3) - Deletes a condition variable attributes object pthread_create (3) - Creates a thread pthread_detach (3) - Detaches a thread pthread_equal (3) - Compares two thread identifiers pthread_exit (3) - Terminates the calling thread pthread_getspecific (3) - Returns the value bound to a key pthread_join (3) - Waits for a thread to terminate pthread_keycreate (3) - Creates a key to be used with thread-specific data pthread_mutex_destroy (3) - Deletes a mutex pthread_mutex_init (3) - Creates a mutex pthread_mutex_lock (3) - Locks a mutex pthread_mutex_trylock (3) - Tries once to lock a mutex pthread_mutex_unlock (3) - Unlocks a mutex pthread_mutexattr_create (3) - Creates a mutex attributes object pthread_mutexattr_delete (3) - Deletes a mutex attributes object pthread_once (3) - Calls an initialization routine pthread_self (3) - Returns the ID of the calling thread pthread_setasynccancel (3) - Enables or disables the asynchronous cancelability of the calling thread pthread_setcancel (3) - Enables or disables the general cancelability of the calling thread pthread_setspecific (3) - Binds a thread-specific value to a key pthread_testcancel (3) - Creates a cancellation point in the calling thread pthread_yield (3) - Allows the scheduler to run another thread instead of the current one pthread_attr_getflags (3) - Returns the value of the flags attribute of a thread attributes object pthread_attr_getinheritsched (3) - Returns the value of the inheritsched attribute of a thread 91 attributes object pthread_attr_getlogfile (3) - Returns the value of the logfile attribute of a thread attributes object pthread_attr_getloginfo (3) - Returns the value of the loginfo attribute of a thread attributes object pthread_attr_getprio (3) - Returns the value of the prio attribute of a thread attributes object pthread_attr_getsched (3) - Returns the value of the sched attribute of a thread attributes object pthread_attr_setflags (3) - Sets the flags of a thread attributes object pthread_attr_setinheritsched (3) - Sets the inheritsched field of a thread attributes object pthread_attr_setlogfile (3) - Sets the logfile field of a thread attributes object pthread_attr_setloginfo (3) - Sets the loginfo field of a thread attributes object pthread_attr_setprio (3) - Sets the prio field of a thread attributes object pthread_attr_setsched (3) - Sets the sched field of a thread attributes object pthread_barrier_checkin (3) - Check into a barrier pthread_barrier_checkout (3) - Check out of a barrier pthread_barrier_destroy (3) - Deletes a barrier pthread_barrier_getflags (3) - Determines the flag settings of a barrier pthread_barrier_getloginfo (3) - Returns the value of the loginfo field of a barrier object loginfo pthread_barrier_init (3) - Creates a barrier pthread_barrier_reinit (3) - Modify an existing barrier pthread_barrier_setflags (3) - Sets the flags of a barrier pthread_barrier_setloginfo (3) - Sets the loginfo field of a barrier object loginfo pthread_barrier_size (3) - Report the number of members in a barrier pthread_barrierattr_create (3) - Creates a barrier attributes object pthread_barrierattr_delete (3) - Deletes a barrier attributes object pthread_barrierattr_getdelayperiod (3) - Returns the value of the delayperiod attribute of a barrier 92 attributes object pthread_barrierattr_getflags (3) - Returns the value of the flags attribute of a barrier attributes object pthread_barrierattr_getlogfile (3) - Returns the value of the logfile attribute of a barrier attributes object pthread_barrierattr_getloginfo (3) - Returns the value of the loginfo attribute of a barrier attributes object pthread_barrierattr_getmaxmembers (3) - Returns the value of the maxmembers attribute of a barrier attributes object pthread_barrierattr_gettreewidth (3) - Returns the value of the treewidth attribute of a barrier attributes object pthread_barrierattr_setdelayperiod (3) - Sets the delayperiod field of a barrier attributes object pthread_barrierattr_setflags (3) - Sets the flags of a barrier attributes object pthread_barrierattr_setlogfile (3) - Sets the logfile field of a barrier attributes object pthread_barrierattr_setloginfo (3) - Sets the loginfo field of a barrier attributes object pthread_barrierattr_setmaxmembers (3) - Sets the maxmembers field of a barrier attributes object pthread_barrierattr_settreewidth (3) - Sets the treewidth field of a barrier attributes object pthread_cond_getflags (3) - Determines the flag settings of a condition variable pthread_cond_getloginfo (3) - Returns the value of the loginfo field of a condition variable object loginfo pthread_cond_setflags (3) - Sets the flags of a condition variable pthread_cond_setloginfo (3) - Sets the loginfo field of a condition variable object loginfo pthread_condattr_getflags (3) - Returns the value of the flags attribute of a condition variable attributes object pthread_condattr_getlogfile (3) - Returns the value of the logfile attribute of a condition variable attributes object pthread_condattr_getloginfo (3) - Returns the value of the loginfo attribute of a condition variable attributes object pthread_condattr_setflags (3) - Sets the flags of a condition variable attributes object pthread_condattr_setlogfile (3) - Sets the logfile field of a condition variable attributes object pthread_condattr_setloginfo (3) 93 - Sets the loginfo field of a condition variable attributes object pthread_dumpall, pthread_dump, pthread_cond_dump, pthread_mutex_dump, pthread_barrier_dump (3) - Report diagnostic information about the threads or related objects in a thread application. pthread_getflags (3) - Determines the flag settings of a thread pthread_getloginfo (3) - Returns the value of the loginfo field of a thread object loginfo pthread_getprio (3) - Determines the scheduling priority of a thread pthread_getproc (3) - Determines the processor a thread is running on pthread_getthread (3) - Determines the kernel thread a pthread is using pthread_log, pthread_cond_log, pthread_mutex_log, pthread_barrier_log (3) - Report a summary of diagnostic information concerning a thread or related object in a thread application. pthread_move (3) - Move and/or bind a thread to a processor pthread_mutex_getflags (3) - Determines the flag settings of a mutex pthread_mutex_getloginfo (3) - Returns the value of the loginfo field of a mutex object loginfo pthread_mutex_getowner (3) - Gets the ID of the thread that owns a mutex pthread_mutex_setflags (3) - Sets the flags of a mutex pthread_mutex_setloginfo (3) - Sets the loginfo field of a mutex object loginfo pthread_mutexattr_getflags (3) - Returns the value of the flags attribute of a mutex attributes object pthread_mutexattr_getlogfile (3) - Returns the value of the logfile attribute of a mute attributes object pthread_mutexattr_getloginfo (3) - Returns the value of the loginfo attribute of a mutex attributes object pthread_mutexattr_getspinlimit (3) - Returns the value of the spinlimit attribute of a mutex attributes object pthread_mutexattr_setflags (3) - Sets the flags of a mutex attributes object pthread_mutexattr_setlogfile (3) - Sets the logfile field of a mutex attributes object pthread_mutexattr_setloginfo (3) - Sets the loginfo field of a mutex attributes object pthread_mutexattr_setspinlimit (3) 94 - Sets the spinlimit field of a mutex attributes object pthread_procdistance (3) - Determines the communication distance between two processors pthread_procsetinfo (3) - Determine the names of all processors in the application's processor set pthread_setflags (3) - Sets the flags of a thread pthread_setlogfile, pthread_getlogfile (3) - Set or get the file pointer of the application-wide thread diagnostic file. pthread_setloginfo (3) - Sets the loginfo field of a thread object loginfo pthread_setlogname, pthread_getlogname (3) - Set or get the filename of the application-wide thread diagnostic file pthread_setname, pthread_getname, pthread_mutex_setname, pthread_mutex_getname, pthread_cond_setname, pthread_cond_getname, pthread_barrier_setname, pthread_barrier_getname (3) - Set or get the name of a thread or thread-related object pthread_setprio (3) - Sets the scheduling priority of a thread C.2 Mach threads processor_set_threads /(2) - gets thread ports for threads assigned to a processor set sigwait (3) - Suspends a calling thread task_threads (2) - Returns a list of the threads within a task thread_abort (2) - Aborts a thread thread_assign (2) - assign thread to processor set thread_assign_default - assign thread to default processor set thread_get_assignment - get processor set to which thread is assigned thread_basic_info (4) - Defines basic information for threads thread_create (2) - Creates a thread within a task thread_get_special_port (2) - Returns a send right to a special port thread_get_state (2) - Returns the execution state for a thread thread_info (2) - Returns information about a thread thread_policy (2) - set schedulng policy for a thread 95 thread_priority (2) - set scheduling priority for thread thread_reply (2) - Returns rights to the calling thread's reply port thread_resume (2) - Resumes a thread thread_self (2) - Returns a send right to the calling thread's kernel port thread_set_special_port (2) - Sets a special port for a thread thread_set_state (2) - Sets the execution state for a thread thread_suspend (2) - Suspends a thread thread_switch (2) - cause context switch with options thread_restore_priority - restore priority depressed by context switch thread_depress_abort - abort priority depression thread_terminate (2) - Destroys a thread thread_get_binding (2) - find out about thread binding thread_set_binding (2) - move and/or bind a thread to a cell or Search Group 0 96 Index FORTRAN Compiler, 68 FORTRAN Programm, 21 Adressraum, 14 asynchronous send, 40 atomare Aktion, 8 atomare Bestandteile, 8 atomarer Bereich, 23 atomic, 8, 23 Implementierung, 24 Attribute, 16 await, 10, 27 Implementierung, 28 Gegenpart, 24 gekoppelter Mutex, 28 Grunddatentypen, 9 Hardware, 5 Hauptspeicher, 15 Header-Dateien, 16 Hostle, 51 Barrier, 32 Bedingungsvariablen, 28 blocked, 14 Bounded Buer Problem, 33 IEEE, 14 Interferenz, 11 Interferenz-Freiheit, 11 Invariante, 12 iPSC/860, 50 C Compiler, 67 C++ Compiler, 67 C++ Programm, 18, 24, 26 channel, 39, 52 Implementierung, 54 con, 7, 16 Implementierung, 18 concurrent, 7 condition synchronization, 8 CPU, 14 create, 16 join, 16 Kanal, 39 KAP, 1 KAP Preprozessor, 68 Konsumenten, 33, 57 Korrektheitsbeweis, 11, 40 Kritischer Bereich, 8 KSR1 Parallelrechner, 1 Lastverteilung, 50 Lebendigkeit, 11, 12 leichtgewichtige Prozesse, 14 Leitung, 39 Leuchtturm, 28 Link, 39 live lock, 13 lock, 23, 28, 29 Logische Uhr, 59 lokale Zeit, 59 lost signals, 28 Daemon, 50 Dataexchange-Format, 50 DCE, 14 Deadlock, 43 Deadlock-Freiheit, 11 default-Attribut, 17, 23, 27 dezentrale Semaphore, 58 distributed memory, 39 distributed semaphores, 60 Ethernet, 39 exclusion of congurations, 12 Mach, 14 Makeles, 69 Maschineninstruktionen, 8 Maschinenoperationen, 9 fair scheduling, 11 Fairness, 11 Fehlertoleranz, 50 97 PVM task, 52 pvm exit, 53 pvm initsend, 54 pvm mytid, 53 pvm nrcv, 55 pvm parent, 53 pvm pk[typ], 54 pvm recv, 55 pvm send, 54 pvm spawn, 53 pvm upk[typ], 55 ready, 15 receive, 40, 55 Implementierung, 55 Register, 10 Registersatz, 10, 14 Registrierung, 52 remote procedure call, 15 Request Queue, 59 Ringpuer, 33 RPC, 15 running, 14 Scheduling, 12, 16 selected receives, 40 Semaphore, 28 Implementierung, 30 send, 40, 54 Implementierung, 55 shared memory, 7 Sicherheit, 11, 12 Signal, 28 signal, 27 Signalmast, 28 signals lost, 28 socket communication, 15 Software, 5 Stack, 10 strong fair, 12 suparum, 1 Synchronisation, 8, 10 synchronous send, 40 taskid, 52, 59 TCGMSG, 50 TCP/IP, 39 terminal, 15 terminated, 15 Terminierung, 11 thread Zustaende, 14 threads, 14 Matrix Multiplikation, 8 Message Passing, 39 Modula-2 Compiler, 68 Modula-2 Programm, 19 MPI, 50 msgtag, 52 Mutex, 23, 28, 29 gekoppelter, 28 mutual exclusion, 8, 11 Optimierungen, 1 OS/2, 14 OSF DCE, 14 P(sem), 29 P4, 50 Parallel Virtual Machine, 39, 50 PARMACS, 50 partielle Korrektheit, 11 pipe, 15, 39 POSIX, 14 private, 7 processingresource, 14 Produzenten, 33, 57 Programmeigenschaften, 11, 40 Protokoll, 39 Prozess, 14 Prozessormodell, 9 Pthread Bibliothek, 15 pthread attr t, 17 pthread cond broadcast, 27 pthread cond init, 27 pthread cond signal, 27 pthread cond t, 27 pthread cond wait, 27 pthread condattr t, 27 pthread create, 17 pthread func, 17 pthread join, 17 pthread mutex init, 23 pthread mutex lock, 23 pthread mutex t, 23 pthread mutex unlock, 23 pthread mutexattr t, 23 pthread t, 17 Pthreads, 23 Puer, 33, 40 PVM, 39, 50 PVM Bibliothek, 69 PVM Demonprozess, 51 PVM Konsole, 51 PVM Routinen, 52 98 timed wait, 28 Timestamp, 59 Transputer, 39 Ueberlappungen, 8 unconditionally fair, 12 unlock, 23, 28, 29 V(sem), 29 verteilte Semaphore, 58 wait, 27, 28 timed, 28 weak fair, 12 Windows NT, 14 Workstation-Cluster, 39 XDR, 50 Zwischenergebnisse, 10 99