found on GitHub: http://github.com/scemama/tccm2014 Warning You are not expected to be able to do by yourself everything I will show! My goal: • Show you different visions of parallel computing • Introduce some words you will here in the future • Show you what exists, what can be done, and how Don't panic and consider this class as general knowledge. If you don't understand something, please STOP ME! 1
be carried out concurrently. If multiple computing hardware is used, concurrent computing is called parallel computing. Many levels of parallelism: • Distributed, Loosely-coupled : Computing grids : shell scripts • Distributed, Tightly-coupled : Supercomputers : MPI, sockets, CoArray Fortran, UPC,... • Hybrid: wth accelerators like GPUs, FPGAs, Xeon Phi, etc • Shared memory : OpenMP, threads • Socket-level : Shared cache • Instruction-level : superscalar processors • Bit-level : vectorization All levels of parallelism can be exploited in the same code, but every problem is not parallelizable at all levels. 2
Potential energy surface 7 GNU Parallel 9 Potential energy surface 14 Links 20 Problem 2 : Computation of Pi 21 Inter-process communication 26 Processes vs threads 26 Communication with named pipes 27 Communication with unnamed pipes 31 Sockets 46 Remote procedure call (RPC) 62 3
• We are allowed to use 100 CPU cores simultaneously • We like to use Gaussian09 to calculate the CCSD(T) energy But: • The grid points are completely independent • Any CPU core can calculate any point Optimal solution: work stealing • One grid point is E(r1,r2,angle) • Dress the list of all the arguments (r1,r2,angle) : [ (0.8,0.8,70.), ..., (1.1,1.1,140.) ] (the queue) • Each CPU core, when idle, pops out the head of the queue and computes E(r1,r2,angle) • All the results are stored in a single file • The results are sorted for plotting 8
can guarantee that the output is the same as if the commands were executed sequentially. Example: $ parallel echo ::: A B C A B C is equivalent to: $ echo A ; echo B ; echo C Multiple input sources can be given: $ parallel echo ::: A B ::: C D A C A D 9
parallel the arguments are treated as commands: $ parallel ::: pwd hostname "echo $TMPDIR" /home/scemama lpqdh82 /tmp Jobs can be run on remote servers: $ parallel ::: echo hostname lpqdh82.ups-tlse.fr $ parallel -S lpqlx139.ups-tlse.fr ::: echo hostname lpqlx139.ups-tlse.fr File can be transfered to the remote hosts: 10
file Running a CCSD(T) calculation with Gaussian09 gives the energy somewhere in the output: CCSD(T)= -0.76329294074D+02 To get only the energy in the output, we can use the following command: g09 < input | grep "^ CCSD(T)=" | cut -d "=" -f 2 2. Script that takes r1, r2 and angle as arguments We create a script run_h2o.sh that runs Gaussian09 for the water molecule taking r 1 , r 2 , and angle as command-line parameters, and prints the CCSD(T) energy: #!/bin/bash r1=$1 14
the names of the machines and their number of CPUs: 2//usr/bin/ssh compute-0-10.local 2//usr/bin/ssh compute-0-6.local 16//usr/bin/ssh compute-0-12.local 16//usr/bin/ssh compute-0-5.local 16//usr/bin/ssh compute-0-7.local 6//usr/bin/ssh compute-0-1.local 2//usr/bin/ssh compute-0-13.local 4//usr/bin/ssh compute-0-8.local 17
the value of with a Monte Carlo algorithm. • The surface of the circle is => For a unit circle, the surface is • The function in the red square is (the circle is ) • The surface in grey corresponds to 21
integral, a Monte Carlo algorithm can be used: • Points are drawn randomly in the unit square. • Count how many times the points are inside the circle • The ratio (inside)/(inside+outside) is . 22
be computed ( ) • We are allowed to use 100 CPU cores simultaneously • We stop when the statistical error is below a given threshold ( ) Optimal algorithm: • Each CPU core computes the its own average over a smaller number of Monte Carlo steps ( ) compute_pi() { result := 0 for i=1 to NMAX { x = random() ; y = random() if ( x^2 + y^2 <= 1 ) { result := result + 1 } } return 4*result/NMAX 23
independent, so they are Gaussian-distributed random variables (central-limit theorem) • The are sent to a central server • The central server computes the running average ¼ » ¹ X = 1 M M X i =1 Xi and the variance ¾2 = 1 M¡1 M X i =1 (Xi ¡ ¹ X)2 to compute the statistical error as • The clients compute blocks as long as the central server asks them to do so when is above the target error 24
the calculations are no more independent: the stopping criterion depends on the results of all previous runs. We have introduced very simple inter-process communications. 25
memory address space • Context switching between processes is slow • Processes interact only through system-provided communication mechanisms • Fork: creates a copy of the current process • Exec: switches to running another binary executable • Spawn: Fork and exec on the child Theads: • Exist as subsets of a process • Context switching between threads is fast • Share the same memory address space : interact via shared memory 26
mkfifo command mkfifo /tmp/pipe /tmp/pipe2 # Unzip the input file and write the result # in the 1st pipe echo "Run gunzip" gunzip --to-stdout input.gz > /tmp/pipe # Zip what comes from the second pipe echo "Run gzip" gzip < /tmp/pipe2 > output.gz # Clear the pipes in the filesystem rm /tmp/pipe /tmp/pipe2 Process 2: p2.sh 29
write # in the 2nd pipe echo "Run sort" sort < /tmp/pipe > /tmp/pipe2 Execution: $ ./p1.sh & Run gunzip $ ./p2.sh Run sort Run gzip [1]+ Done ./p1.sh This simple example is equivalent to: gunzip --to-stdout input.gz | sort | gzip > output.gz 30
started independently : p1 waits for p2 (blocking) • can be run in different shells • named pipes allow multiple processes to write in the same pipe Communication with unnamed pipes Unnamed pipes are equivalent to pipes, but they are opened/closed in the programs themselves. They imply a modification of the source files (apart from using unnamed pipes in the shell with the | operator). 31
(PID) of the current process print "PID: %d" % (os.getpid()) # Open the pipe for inter-process communication r, w = os.pipe() new_pid = os.fork() if new_pid != 0: # This is the parent process print "I am the parent, my PID is %d"%(os.getpid()) print "and the PID of my child is %d"%(new_pid) # Close write and open read file descriptors os.close(w) 33
"Reading from the child" s = r.read() r.close() print "Read '%s' from the child"%(s) else: # This is the child process print " I am the child, my PID is %d"%(os.getpid()) # Close read and open write file descriptors os.close(r) w = os.fdopen(w,'w') print " Sending 'Hello' to the parent" # Send 'Hello' to the parent w.write( "Hello!" ) w.close() print " Sent 'Hello'" 34
am the parent, my PID is 5804 and the PID of my child is 5805 I am the child, my PID is 5805 Reading from the child Sending 'Hello' to the parent Sent 'Hello' Read 'Hello!' from the child 35
pipe(i) := create_pipe() fork() if ( Child process ) { close(pipe(i).read ) open (pipe(i).write) do { X := compute_pi() write X into pipe if ( failure ) { exit process } } } close(pipe(i).write) 36
do { for i=1 to NPROC { X := pipe(i).read() data := data+[X] N := N+1 ave := average(data) err := error (data) if (error < error_threshold) { print ave and err exit process } } } 37
MC steps/process NMAX_inv = 1.e-7 error_threshold = 1.0e-4 # Stopping criterion NPROC=4 # Use 4 processes import os from random import random, seed from math import sqrt def compute_pi(): """Local Monte Carlo calculation of pi""" # Initialize random number generator seed(None) result = 0. 38
2 random numbers x and y x = random() y = random() # Check if (x,y) is in the circle if x*x + y*y <= 1.: result += 1 # X = estimation of pi result = 4.* float(result)*NMAX_inv return result import sys def main(): # Reading edges of the pipes r = [None]*NPROC 39
# Create the pipe r[i], w = os.pipe() # Save the PIDs pid[i] = os.fork() if pid[i] == 0: # This is the child process os.close(r[i]) w = os.fdopen(w,'w') while True: # Compute pi on this process X = compute_pi() # Write the result in the pipe try: 40
else: # This is the parent process os.close(w) r[i] = os.fdopen(r[i],'r') data = [] while True: for i in range(NPROC): # Read in the pipe of the corresponding process X = float( r[i].readline() ) data.append( float(X) ) N = len(data) 41
N > 2: l = [ (x-average)*(x-average) for x in data ] variance = sum(l)/(N-1.) else: variance = 0. # Compute error error = sqrt(variance)/sqrt(N) print '%f +/- %f'%(average,error) # Stopping condition if N > 2 and error < error_threshold: 42
ends of the pipe to be on different machines connected by a network interface. An Internet socket is characterized by a unique combination of : • A transport protocol: TCP, UDP, raw IP, ... • A local socket address: Local IP address and port number, for example 192.168.2.2:22 • A remote socket address: Optional (TCP) 46
import sys,os import socket import datetime # For printing the time now = datetime.datetime.now def main(): # Get host name HOSTNAME = socket.gethostname() PORT = 11279 49
an INET socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Bind the socket to the address and port s.bind( (HOSTNAME, PORT) ) # Wait for incoming connections s.listen(5) # Accept connection conn, addr = s.accept() print now(), "Connected by", addr # Buffered read of the socket print now(), "Reading from socket" data = "" 50
datetime.datetime.now def main(): # Get host name HOSTNAME = sys.argv[1] PORT = int(sys.argv[2]) print now(), "The target server is : %s:%d"%(HOSTNAME,PORT) # Create an INET socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Connect the socket to the address and port of the server 52
"Hello, world !!!!!!" print now(), "Sending : "+message s.send(message) # Read the reply of the server data = s.recv(1024) s.close() print now(), 'Received: ', data if __name__ == "__main__": main() Server execution: $ ./sock_server.py 2014-09-04 01:13:49.903443 I am the server : lpqdh82:11279 53
(HOSTNAME, PORT) ) while True: # Wait for incoming connections s.listen(5) # Accept connection conn, addr = s.accept() # Buffered read of the socket X = "" while True: message = conn.recv(128) X += message if message == "" or len(message) < 128: break 56
import sys def compute_pi(): """Local Monte Carlo calculation of pi""" # Initialize random number generator seed(None) result = 0. # Loop 10^7 times for i in xrange(NMAX): # Draw 2 random numbers x and y x = random() y = random() # Check if (x,y) is in the circle 59
X = estimation of pi result = 4.* float(result)*NMAX_inv return result def main(): while True: X = compute_pi() # Create an INET socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Connect the socket to the address and port of the server try: s.connect( (HOSTNAME, PORT) ) except socket.error: 60
languages and running on different computers to work with each other seamlessly. One program running in a process (the client) calls a function belonging to another program running in another process (the server). All the inter-process communication is hidden. 62
converted to a standard representation (de-referencing pointers, big/little endian, etc) 2. The client stub marshals the parameters : they are packed together in a message. 3. The message is sent to the server 4. The server transmits the message to the server stub 5. The server stub unmarshals the message 6. The server calls its subroutine with the parameters 7. The output is sent back to the client using the same mechanism 63
and HTTP is the transport protocol • JSON-RPC: JSON is the encoding format and HTTP is the transport protocol • SOAP: Simple Object Access Protocol. Uses XML for encoding, but can use HTTP, HTTPS, SMTP, UDP, ... transport protocols • CORBA: Common Object Request Broker Architecture • etc... 64
def hostname(self): """Returns the name of the host on which the server runs""" return socket.gethostname() def split(self, string): """Splits a string in a list of words""" return string.split() 66
the standard output host = socket.gethostbyname( socket.gethostname() ) port = 8000 print "Server URL is http://%s:%d"%(host,port) # Create an instance of the server server = SimpleXMLRPCServer.SimpleXMLRPCServer( (host, port) ) # Associate all functions of MyServer with the server server.register_instance( MyServer() ) # Start the server server.serve_forever() if __name__ == '__main__': main() Client code 67
# XML-RPC library def main(): host = gethostname() print 'This host is: %s'%(host) # The URL of the server is the 1st argument of the command line url = sys.argv[1] # Create a proxy object for the server server = xmlrpclib.Server(url) # Run the 'hostname' function on the server and print the output remote = server.hostname() 68
on the server and print the output s = "This is the string to split" splitted = server.split(s) print 'Splitted string has type:', type(splitted) print str(splitted) if __name__ == '__main__': main() Execution scemama@lpqdh82 $ ./xmlrpc_server.py Server URL is http://192.168.2.8:8000 lpqdh82 - - [29/Jul/2014 01:08:06] "POST /RPC2 HTTP/1.1" 200 - lpqdh82 - - [29/Jul/2014 01:08:06] "POST /RPC2 HTTP/1.1" 200 - 69
from math import sqrt from time import gmtime, strftime # Termination condition error_threshold = 1.e-4 class PiServer(object): def __init__(self): """Initialization of the server""" # Data is stored in a list self.data = [] 74
0 def set_result(self,value,address): """Adds a value coming from a given host""" self.data.append( value ) self.N += 1 # Termination condition is calculated now if self.N > 4 and self.error() < error_threshold: self.terminate() result = 0 else: result = 1 # Each time a new event is added, display the # current average and error self.print_status(address) return result 75
the server host = gethostbyname( gethostname() ) port = 8000 print >>sys.stderr, "Server URL is http://%s:%d"%(host,port) # Create the server server = SimpleXMLRPCServer( (host, port), logRequests=False ) # All functions of PiServer are accessible via XML-RPC server.register_instance( PiServer() ) # Run while the global variable 'running' is True while running: server.handle_request() if __name__ == '__main__': main() Client code: 78
MC steps NMAX = 10000000 NMAX_inv = 1.e-7 from random import random, seed def compute_pi(): """Local Monte Carlo calculation of pi""" # Initialize random number generator seed(None) result = 0. # Loop 10^7 times for i in xrange(NMAX): # Draw 2 random numbers x and y x = random() 79
circle if x*x + y*y <= 1.: result += 1 # X = estimation of pi result = 4.* float(result)*NMAX_inv return result import sys import xmlrpclib from socket import gethostbyname, gethostname def main(): # The URL of the server is the 1st command line argument url = sys.argv[1] address = gethostbyname(gethostname()) # Proxy for the XML-RPC server 80
a new estimate of pi pi = compute_pi() # If it is not possible to set the result on the # server, the server is down so stop the calculation try: cont = server.set_result(pi,address) loop = (cont == 1) except: loop = False if __name__ == '__main__': main() Example fo execution using a single client: 81
want to compute numerically the value of the following integral: Á1 Á2 jÁ3 Á4 ® = ZZ Á1 (r1 )Á2 (r2 ) 1 r12 Á3 (r1 )Á4 (r2 )dr1 dr2 Constraints: • We need to use Fortran • A large number of points will be computed ( ) Simple solution: • Compute the sum over a fixed number of grid points per CPU • Use the Message Passing Interface (MPI) to communicate 84
(API) which specifies how processes can communicate together. • Each process has a rank and belongs to a group of processes. • Processes can do point-to-point or collective communications There is no need to pass the IP address and port number. All low-level communication is handled. MPI programs start with a call to the MPI_Init function ! Fortran integer :: ierr call MPI_Init(ierr) // C #include <mpi.h> int MPI_Init(int *argc, char ***argv) 87
MPI::Init() MPI programs end with a call to the MPI_Finalize function integer :: ierr call MPI_Finalize(ierr) The rank of the current process is obtained with call MPI_COMM_RANK(MPI_COMM_WORLD, myid, ierr) and the total number of processes is obtained with call MPI_COMM_SIZE(MPI_COMM_WORLD, nproc, ierr) 88
are all at this point. They wait for each other. Point-to-point send/receive operation include ’mpif.h’ <type> :: BUF(*) integer :: n, datatype, tag, comm, ierr integer :: status(MPI_STATUS_SIZE) integer :: sender, receiver if (my_id == sender) then call MPI_SEND(buffer, n, datatype, receiver, tag, comm, ierr) else if (my_id == receiver) then call MPI_RECV(buffer, n, datatype, sender, tag, comm, status, ierr) endif 89
• receiver : Rank of the process receiving the data • <type> : Type of data (double precision, integer, etc) • buffer : array of type <type> • n : number of elements to send • datatype : MPI type of data (MPI_DOUBLE_PRECISION, MPI_INTEGER4, etc) • tag : Message tag. Used to identify the message. • comm : Communicator. Usually MPI_COMM_WORLD • ierr : if ierr == MPI_SUCCESS, everything went fine • status : Contains some information about the incoming message to track failures Collective communications Broadcast : one-to-all communication. Send the same data to all processes. 90
comm, ierr call MPI_BCAST(buffer, n, datatype, sender, comm, ierr) • buffer : Data to send to all processes • n : Number of elements in buffer Reductions: all-to-one communication. include ’mpif.h’ <type> :: sendbuf(*), recvbuf(*) integer :: n, datatype, op, sender, comm, ierr call MPI_REDUCE(sendbuf, recvbuf, n, datatype, op, sender, comm, ierr) • sendbuf : Buffer of data to send • recvbuf : Buffer in which the data will be received • op : Reduction operation to perform. Examples: MPI_SUM, MPI_MAX, MPI_PROD, etc 91
double precision, parameter :: xmin = -2.d0, xmax = 2.d0 double precision, external :: f double precision :: r1(3), r2(3) double precision :: local_result, result double precision :: dx,dv ! Initialize the MPI library call MPI_Init(ierr) ! Get the rank of the current process call MPI_COMM_RANK(MPI_COMM_WORLD, myid, ierr) ! Get the the total number of processes 98
then print *, result endif ! Terminate the MPI library call MPI_Finalize(ierr) end Links • Open MPI : Open source MPI implementation : http://www.open-mpi.org/ • Open MPI documentation : http://www.open-mpi.org/doc/v1.8/ 101
Each running process is called an image. • The number of images is obtained with the built-in num_image() function • The rank of the current process is obtained with this_image() A codimension can be given to arrays in square brackets, for example: integer :: i[*] double precision :: A(10)[*] For any image, • i[2] : value of i in image number 2 • A(5)[4] : value of A(5) in image number 4 Any image can directly have access an element in the memory of another image. PGAS : Partitioned Global Address Space. 102
MPI • Types, message sizes, tags, etc are known by the compiler • Compiler can place the communication instructions where it is the best (asynchronous comm) • Better performance obtained by non-experts But: • Experts can get more performance with MPI : more flexibility (lower level) • Having knowledge of how MPI works helps to write efficient (CAF) code 103
r2(2) = dble(m-1) * dx + xmin + dx/2 do n=1,nmax r2(3) = dble(n-1) * dx + xmin + dx/2 local_result = local_result + f(r1,r2) * dv enddo enddo enddo enddo enddo enddo ! Sum the local results of all processes do i=1,num_images() result = result + local_result[i] enddo 108
usually not written by the user. It is preferable to use optimized libraries to perform linear algebra. A standardized API exists (Lapack) on top of the BLAS API. Every CPU manufacturer provides optimized libraries (MKL, ATLAS, NAG, ACML, CULA, etc). For matrix products, we use DGEMM: • D : double precision • Ge : General • MM : Matrix Multiplication NAME DGEMM - perform one of the matrix-matrix operations C := alpha*op( A )*op( B ) + beta*C 110
C21 =A21 ¢B11 +A22 ¢B21 C22 =A21 ¢B12 +A22 ¢B22 The large N x N matrix product can be performed by doing 8 smaller matrix products of size N/2 x N/2, that can be done simultaneously by 8 CPUs. 114
: ~95% of the peak performance • data access and flops -> High arithmetic intensity -> Compute bound. • (2 x N²) data reads, (N) data writes and (N³) flops • Arithmetic intensity = N/2 4-way parallel algorithm: • Here, the data can not be disjoint between the CPUs • To build one block, 4 blocks are needed • The same block will be read by different CPUs • (2 x N x N/2) data reads, (N/2 x N/2 x N) flops • Arithmetic intensity = N/4 : less than sequential algorithm Difficulty: • A modern CPU can perform 8 FP ADD and 8 FP MUL per cycle (!!!) • A random memory access takes ~300 cycles (4 800 flops!) 116
• To benefit from distributed parallelism, the matrices have to be very large Proposed solution: Use shared-memory parallelism • Avoids network bottleneck (~10x slower than RAM) • L3 cache sharing optimizes data access (~6x faster than RAM) • Hardware memory prefetchers will mask the RAM latencies 117
execution of a function is started in the same memory domain. • A private memory domain is created for the thread • The parent process can wait until all the children threads have finished their work • Fork/join model Example in pseudo-code function f() { ... } t = pthread_create(f); Example in Python 118
f(x): global A time.sleep(1.) A = x print x, "written by thread" def main(): t = threading.Thread(target=f, args = [2] ) print "Before thread starts, A= ", A t.start() time.sleep(0.5) 119
t.join() print "After join, A=", A if __name__ == '__main__': main() What happens when 2 threads read from the same memory address at the same time? Nothing special What happens when 2 threads write at the same memory address at the same time? If you are lucky, the program crashes. Otherwise, it is unpredictible. 120
we introduce Locks: acquire_lock(L) if L is free, the current thread gets the lock. Otherwise, block until the lock can be acquired release_lock(L) the lock is released by the current thread Example of wrong code #!/usr/bin/env python import threading import time A = 0 def f(x): 121
main(): t = [None for i in range(10)] for i in range(10): t[i] = threading.Thread(target=f, args = [100000] ) for i in range(10): t[i].start() for i in range(10): t[i].join() print A if __name__ == '__main__': main() Using a lock: 122
= threading.Lock() def f(x): global A a = 0 for i in range(x): a = a+1 lock.acquire() A = A+a lock.release() def main(): t = [None for i in range(10)] 123
) for i in range(10): t[i].start() for i in range(10): t[i].join() print A if __name__ == '__main__': main() A semaphore is more general than a lock : it can be taken simultaneously by more than 1 thread. 124
the use of multi-threading to parallelize the code using directives given as comments. The same source code can be compiled with/without OpenMP. For example: !$OMP PARALLEL DEFAULT(SHARED) PRIVATE(i) !$OMP DO do i=1,n A(i) = B(i) + C(i) end do !$OMP END DO !$OMP END PARALLEL • !$OMP PARALLEL starts a new multi-threaded section. Everything inside this block is executed by all the threads 125
among the different threads (by changing the loop boundaries for instance) • !$OMP END DO marks the end of the parallel loop. It contains an implicit synchronization. After this line, all the threads have finished executing the loop. • !$OMP END PARALLEL marks the end of the parallel section. Contains also an implicit barrier. • DEFAULT(SHARED) : all the variables (A,B,C) are in shared memory by default • PRIVATE(i) : the variable i is private to every thread Other important directives: • !$OMP CRITICAL ... !$OMP END CRITICAL : all the statements in this block are protected by a lock • !$OMP TASK ... !$OMP END TASK : define a new task to execute • !$OMP BARRIER : synchronization barrier 126
statements in this block are executed by a single thread • !$OMP MASTER ... !$OMP END MASTER : all the statements in this block are executed by the master thread • omp_get_thread_num() : returns the ID of the current running thread • omp_get_num_threads() : returns the total number of running threads • OMP_NUM_THREADS : Environment variable (shell) that fixes the number of threads to run 127
create_matrix() B = create_matrix() // parallelize loop over i and j for i=1 to N using a step of N/2 { for j=1 to N using a step of N/2 { for k=1 to N using a step of N/2 { // C_ij = A_ik.B_kj DGEMM ( C(i,j), A(i,k), B(k,j), (N/2, N/2) ) } } } 128
iend(1),jstart(1),jend(1)) enddo enddo !$OMP END DO !$OMP MASTER call wall_time(cpu_1) write(0,*) 'Matrix build time : ', cpu_1-cpu_0, 's' call wall_time(cpu_0) !$OMP END MASTER !$OMP DO COLLAPSE(2) do i1=1,sze,step do j2=1,sze,step 131
write(0,*) 'Compute Time : ', cpu_1-cpu_0, 's' !$OMP END MASTER !$OMP END PARALLEL ! Print the sum of the elements s = 0.d0 do j=1,sze do i=1,sze s = s+C(i,j) enddo enddo 133
create_matrix() queue= [] for i=1 to N using a step of N/2 { for j=1 to N using a step of N/2 { for k=1 to N using a step of N/2 { // C_ij = A_ik.B_kj queue = queue + [ ( i, j, k ) ] } } } sem = semaphore(nproc) 135
while queue is not empty { acquire_semaphore(sem) // Pop out the 1st element of the queue params = queue.pop() pthread_create( do_work, params ) } program submatrix_openmp implicit none integer, parameter :: sze = 5000 double precision, allocatable, dimension (:,:) :: A, B, C 136
is divided in sub-problems that are solved in the same way as the large problem. Example : Sum Suppose you want to compute the sum of all the elements of the array A(1:16). This sum can be expressed as the sum of the two halves of the array : S[ A(1:16) ] = S[ A(1:8) ] + S[ A(9:16) ] The S function will be applied recursively. 142
sze ) !$OMP END SINGLE NOWAIT !$OMP TASKWAIT !$OMP END PARALLEL call wall_time(wall_1) write(0,*) 'Compute Time : ', wall_1-wall_0, 's' ! Print the sum of the elements s = 0.d0 do j=1,sze do i=1,sze s = s+C(i,j) enddo enddo 154
: Single Instruction, Multiple Data Execute the same instruction in parallel on all the elements of a vector: Example : AVX vector ADD in double precision: Different instruction sets exist in the x86 micro-architecture: • MMX : Integer (64-bit wide) • SSE -> SSE4.2 : Integer and Floating-point (128-bit) 159
Integer and Floating-point (512-bit) Requirements: 1. The elements of each SIMD vector must be contiguous in memory 2. The first element of each SIMD vector must be aligned on a proper boundary (64, 128, 256 or 512-bit). Automatic vectorization The compiler can generate automatically vector instructions when possible. A double precision AVX auto-vectorized loop generates 3 loops: Peel loop (scalar) First elements until the 256-bit boundary is met Vector loop Vectorized version until the last vector of 4 elements Tail loop (scalar) Last elements 160
can tell the compiler to align the arrays on a 32 byte boundary using: double precision, allocatable :: A(:), B(:) !DIR$ ATTRIBUTES ALIGN : 32 :: A, B Then, before using the arrays in a loop, you can tell the compiler that the arrays are aligned. Be careful: if one array is not aligned, this may cause a segmentation fault. !DIR$ VECTOR ALIGNED do i=1,n A(i) = A(i) + B(i) end do To remove the tail loop, you can allocate A such that its dimension is a multiple of 4 elements: 162
n else n_4 = n - n_4 + 4 endif allocate ( A(n_4), B(n_4) ) and rewrite the loop as follows: do i=1,n,4 !DIR$ VECTOR ALIGNED !DIR$ VECTOR ALWAYS do k=0,3 A(i+k) = A(i+k) + B(i+k) end do end do 163
cycle can be transformed safely into only vector instructions, and it will not produce the tail and peel loops with the branching. For small arrays, the gain can be significant. For multi-dimensional arrays, if the 1st dimension is a multiple of 4 elements, all the columns are aligned: double precision, allocatable :: A(:,:) !DIR$ ATTRIBUTES ALIGN : 32 :: A allocate( A(n_4,m) ) do j=1,m do i=1,n,4 !DIR$ VECTOR ALIGNED !DIR$ VECTOR ALWAYS do k=0,3 A(i+k,j) = A(i+k,j) * B(i+k,j) end do end do end do 164
always the best choice. Using multiples of 8 or 16 elements can be better because the inner-most loop may be unrolled by the compiler to improve the efficiency of the pipeline. 165
ILP, different execution units are used in parallel. For example, Sandy-Bridge (2011) x86 CPUs can perform simultaneously: • 1 vector ADD • 1 vector MUL • 2 vector LOADs • 1 vector STORE • 1 integer ADD Ideal for a scalar product (or a matrix product): do i=1,N x = x + B(i)*C(i) end do 166
8 flops/cycle. For a 10-core CPU at 2.8GHz: 8 x 2.8E9 x 10 = 224 Gflops/s in double precision Example: do i=1,N A(i) = X(i) + Y(i) end do and do i=1,N A(i) = 2.d0*(X(i) + Y(i)) end do take the same amount of time. Pipelining Here we consider a typical RISC processor with 4 different stages to perform an operation: 167
to perform one single operation Throughput 1 cycle. We get one result every cycle Out of order execution Inside the CPU, the instructions are not executed in the exact sequence of the code, provided that it does not affect the result: independent instructions can be executed in any order. The CPU can choose an execution order that improves the efficiency of the pipeline. Branch prediction When an if statement occurs, two paths can be taken by the program: it is a branch. The pipeline has to be filled differently depending on the branch. 170
likely to be chosen, and fills the pipeline for it (speculative execution). If the branch is mispredicted, the pipeline is emptied and the calculation is rolled back. Branch mispredictions can have a large penalty on the execution. Many branch predictors exist: • Static predictor : always assume the condition is true • Saturating counter : 1. Strongly not taken 2. Weakly not taken 3. Weakly taken 4. Strongly taken • Two-level adaptive predictor : a branch might be taken depending upon whether the previous two were taken • Local branch prediction : one history buffer (~4 bits) for each conditional • Global branch prediction : keep a global history buffer for all branches • Loop predictor • etc... Example: 171
image was created with Inkscape.. Licensed under Creative Commons Attribution-Share Alike 3.0 via Wikimedia Commons - http://commons.wikimedia.org/wiki/File:Pipeline,_4_stage.svg 173
• Coarse-grained will give the highest level of parallel efficiency (lowest Communication/Computation ratio) • Different levels of parallelism can be combined 174
For example • doing a Monte Carlo calculation using OpenMP is a bad choice: • Shared memory is not required • Communication is generally low • Synchronization barriers can be avoided • Scaling would be limited to the number of cores/node • diagonalizing a matrix with XML-RPC would not give a good scaling: • A lot of communication (matrix products) • Synchronizations necessary If you need to do a Monte Carlo calculation where every Monte Carlo step diagonalizes a very large matrix, you can use OpenMP for the diagonalization and XML-RPC for the distribution of the MC steps. 175