系统:
程序:
potential_serial.cpp
目的:
1. 好的多线程平发性
2. 高效的计算性
使用工具:
Intel® C++ Compiler 9.1
Intel® VTune Analyzers 8.0
Intel® Math Kernel Library 9.0
Microsoft Visual Studio* .NET 2005
编译选项:
/O3 /Qopenmp /QaxT /QxT /Qc99 /Qparallel
优化步骤:
1. 使用VTune Analyzers分析程序的关健点.
a) 将程序编译后,使用VTune Analyzers分析程序中的关键点,发现计算时间总要用的computePot函数中的pow和sqrt调用上,说明在进行优化时,这两个地方应该是优化的重点所在。
2. 单线程优化
a) 在找到了程序的中关键点后,就可能先进行单线程中关键计算点的优化了,pow为计算x的y次方的函数,在这里只需要计算x的2次方,由于的double型的数,不能用移位的方法的计算平方,改用进行一次自乘操作,这样比pow应该会快不少。
b) 使用VTune Analyzers分析程序,证实以上的优化达到有预想的效果。
c) initPositions与updatePositions两个函数为相式的代码,在此使用相同的方法。这两个函数都是双重循环,先用如下的方法将双循环改为单循环,减少调用。
3. 并行改造
a) 本程序主要是对数据的循环迭代操作,自然首选循环并行模式来改造,且具有很好的串行等价性。本程序中主要的计算点在computePot函数,所以只有让computePot函数能并行工作,要使程序有好的并行性,必须尽量减少多个线程之间的同步开销,最好是在多个线程中没有共享数据需要操作,这样多个线程之间就不需要同步操作了。
b) 要得到好的并行性,必须使computePot函数是线程安全的,而在computePot函数中使用了全局数组r、全局变量pot, distx, disty, distz, dist,而这几个变量只是冲当了局部变量的功能,只要将它改为一个局部变量,然后结果返回值来交给调用者。
c) 经过上面的computePot函数以可是线程安全的了,下一步就是让多个线程来同时调用它了,而它是在main()中在一个for循环中调用的,在每次调用后,还调用了一次updatePositions()函数来对全局数组r中的值进行更新,在updatePositions()函数中每次都调用rand()函数来产生一个值后更新数组,所以要想得到和初始程序一至的结果,必须以按相同的顺序来更新数组中的值,这样又限制了用并行的方式调用computePot函数。
d) 所以要使程序能并行的调用computePot函数,必须用别外的方式来更新数组中的值,而不是计算一次更新一次。在更新数组时,每次都根据上一次的值来计算的,在此,我使用一个3维数组来保存每一次的更新值,这个数组中一共NITER个原来的数组r,在这个数据中第一维中的每一个数组,保存一次更新的结果。
double r[NITER+1][DIMS][NPARTS];
然后在改变initPositions和updatePositions函数使它们能更新新的数组。
void initPositions() {
int i, j;
for( i=0; i<DIMS; i++ )
for( j=0; j<NPARTS; j++ )
r[0][i][j] = 0.5 + ( (double) rand() / (double) RAND_MAX );
}
void updatePositions() {
int i, j;
for( int idx=1; idx<= NITER; idx++ )
for( i=0; i<DIMS; i++ )
for( j=0; j<NPARTS; j++ )
r[idx][i][j] = r[idx-1][i][j] - (0.5 + ( (double) rand() / (double) RAND_MAX ));
改变computePot函数为double computePot( double t[][NPARTS] ), 这样就可以在main中的for中使用每个i对应的数组来调用它就行了。至此,程序computePot的每次调用都是独立的了。
e) 经过上次的改造后,就可能使用openmp来使main中的for循环并行的工作了,为了在多个cpu中得到一个好的负载平衡,使用使用omenmp的guided调试。
#pragma omp parallel private( i )
#pragma omp for schedule(static,10) nowait
for( i=0; i<NITER; i++ ) {
ret[i] = computePot( r[i+1] );
}
4. 使用MKL库优化
a) 通过上面的几步优化后,程序以有很好的并行性,程序现在的计算量主要表现在进行sqrt和浮点除法上了,在MKL库中的vdInvSqrt函数正好是完成这项工作的最佳选择; 但是, vdInvSqrt函数只能对数组进行计算,而程序中每个循环就要计算一次,所以必须修循环中的计算方法。
b) 先将内层循环中每一次distx, disty, distz的乘积放到一个数组dtA中,待内层循环退出以后,就可以使用vdInvSqrt函数来对这个数组进行计算;然后在将vdInvSqrt函数计算的结果求合就得到了最终结果。
for( j=0; j<i-1; j++ ) {
distx = ( (t[0][j] - t[0][i] ) );
disty = ( (t[1][j] - t[1][i] ) );
distz = ( (t[2][j] - t[2][i] ) );
dtA[j] = ( distx * distx + disty * disty + distz * distz );
}
vdInvSqrt( j, dtA, dtB );
for( j=0; j< i-1; j++ ) {
m_pot += dtB[j];
}优化后的代码: #include < stdio.h > #include < stdlib.h > // #include <math.h> #include < mathimf.h > #include < windows.h > #include < time.h > #include < omp.h > #include < mkl_vml_defines.h > #include < mkl.h > #define NPARTS 1000 #define NITER 201 #define DIMS 3 int rand( void ); double computePot( const double t[][NPARTS] ); void initPositions( void ); void updatePositions( void ); double r[NITER + 1 ][DIMS][NPARTS]; double ret[NITER]; int main() ... { int i; clock_t start, stop; initPositions(); updatePositions(); vmlSetMode( VML_LA | VML_FLOAT_CONSISTENT );; start=clock(); #pragma omp parallel private( i ) ...{ #pragma omp for schedule(guided,1) nowait for( i=0; i<NITER; i++ ) ...{ ret[i] = computePot( r[i+1] ); //if (i == 0) printf("]: Potential: .3f ", i, pot); //updatePositions(); } } for( i=0; i<NITER; i+=10) printf("]: Potential: .7f ", i, ret[i]); stop=clock(); printf ("Seconds = .9f ",(double)(stop-start)/ CLOCKS_PER_SEC);} void initPositions() ... { int i, j; for( i=0; i<DIMS; i++ ) for( j=0; j<NPARTS; j++ ) r[0][i][j] = 0.5 + ( (double) rand() / (double) RAND_MAX );} void updatePositions() ... { int i, j; for( int idx=1; idx<= NITER; idx++ ) for( i=0; i<DIMS; i++ ) for( j=0; j<NPARTS; j++ ) r[idx][i][j] = r[idx-1][i][j] - (0.5 + ( (double) rand() / (double) RAND_MAX ));} double computePot( const double t[][NPARTS] ) ... { int i, j; double distx, disty, distz; double dtA[NPARTS]; double dtB[NPARTS]; double m_pot =0.0; for( i=0; i<NPARTS; i++ ) ...{ for( j=0; j<i-1; j++ ) ...{ distx = ( (t[0][j] - t[0][i] ) ); disty = ( (t[1][j] - t[1][i] ) ); distz = ( (t[2][j] - t[2][i] ) ); dtA[j] = ( distx * distx + disty * disty + distz * distz ); } vdInvSqrt( j, dtA, dtB ); for( j=0; j< i-1; j++ ) ...{ m_pot += dtB[j]; } } return m_pot;}