英特尔多核平台编码优化大赛就顺便试试身手了

    技术2022-05-11  147

    这段正好在看《并行编程模式》这本书,看见英特尔多核平台编码优化大赛就顺便试试身手了,由于本人对汇编不熟,intel的编译器和工具以是临时抱佛脚,所以比起其他高手应该还差得多了,所以只将重点是放在并行性的改造上了。 原程序: #include  < stdio.h > #include  < stdlib.h > #include  < math.h > #include  < windows.h > #include  < time.h >   #define  NPARTS 1000 #define  NITER 201 #define  DIMS 3   int  rand(  void  ); int  computePot( void ); void  initPositions( void ); void  updatePositions( void );  double  r[DIMS][NPARTS]; double  pot; double  distx, disty, distz, dist; int  main()  {   int i;   clock_t start, stop;   initPositions();   updatePositions();    start=clock();   for( i=0; i<NITER; i++ ) {      pot = 0.0;      computePot();      if (i%10 == 0) printf("]: Potential:  .7f ", i, pot);      updatePositions();   }   stop=clock();   printf ("Seconds = .9f ",(double)(stop-start)/ CLOCKS_PER_SEC);}    void  initPositions()  {   int i, j;    for( i=0; i<DIMS; i++ )      for( j=0; j<NPARTS; j++ )          r[i][j] = 0.5 + ( (double) rand() / (double) RAND_MAX );}    void  updatePositions()  {   int i, j;    for( i=0; i<DIMS; i++ )      for( j=0; j<NPARTS; j++ )         r[i][j] -= 0.5 + ( (double) rand() / (double) RAND_MAX );}    int  computePot()  {   int i, j;   for( i=0; i<NPARTS; i++ ) {      for( j=0; j<i-1; j++ ) {        distx = pow( (r[0][j] - r[0][i]), 2 );        disty = pow( (r[1][j] - r[1][i]), 2 );        distz = pow( (r[2][j] - r[2][i]), 2 );        dist = sqrt( distx + disty + distz );        pot += 1.0 / dist;      }   }   return 0;} 优化报告:

    系统:

    程序:

    potential_serial.cpp

    目的:

    1.       好的多线程平发性

    2.       高效的计算性

    使用工具:

    Intel® C++ Compiler 9.1

    Intel® VTune Analyzers 8.0

    Intel® Math Kernel Library 9.0

    Microsoft Visual Studio* .NET 2005

    编译选项:

    /O3 /Qopenmp /QaxT /QxT /Qc99 /Qparallel

     

     

    优化步骤:

    1.       使用VTune Analyzers分析程序的关健点.

    a)         将程序编译后,使用VTune Analyzers分析程序中的关键点,发现计算时间总要用的computePot函数中的powsqrt调用上,说明在进行优化时,这两个地方应该是优化的重点所在。

    2.       单线程优化

    a)         在找到了程序的中关键点后,就可能先进行单线程中关键计算点的优化了,pow为计算xy次方的函数,在这里只需要计算x2次方,由于的double型的数,不能用移位的方法的计算平方,改用进行一次自乘操作,这样比pow应该会快不少。

    b)        使用VTune Analyzers分析程序,证实以上的优化达到有预想的效果。

    c)         initPositionsupdatePositions两个函数为相式的代码,在此使用相同的方法。这两个函数都是双重循环,先用如下的方法将双循环改为单循环,减少调用。

    3.       并行改造

    a)         本程序主要是对数据的循环迭代操作,自然首选循环并行模式来改造,且具有很好的串行等价性。本程序中主要的计算点在computePot函数,所以只有让computePot函数能并行工作,要使程序有好的并行性,必须尽量减少多个线程之间的同步开销,最好是在多个线程中没有共享数据需要操作,这样多个线程之间就不需要同步操作了。

    b)        要得到好的并行性,必须使computePot函数是线程安全的,而在computePot函数中使用了全局数组r、全局变量pot, distx, disty, distz, dist,而这几个变量只是冲当了局部变量的功能,只要将它改为一个局部变量,然后结果返回值来交给调用者。

    c)         经过上面的computePot函数以可是线程安全的了,下一步就是让多个线程来同时调用它了,而它是在main()中在一个for循环中调用的,在每次调用后,还调用了一次updatePositions()函数来对全局数组r中的值进行更新,在updatePositions()函数中每次都调用rand()函数来产生一个值后更新数组,所以要想得到和初始程序一至的结果,必须以按相同的顺序来更新数组中的值,这样又限制了用并行的方式调用computePot函数。

    d)        所以要使程序能并行的调用computePot函数,必须用别外的方式来更新数组中的值,而不是计算一次更新一次。在更新数组时,每次都根据上一次的值来计算的,在此,我使用一个3维数组来保存每一次的更新值,这个数组中一共NITER个原来的数组r,在这个数据中第一维中的每一个数组,保存一次更新的结果。

                        double r[NITER+1][DIMS][NPARTS];

           然后在改变initPositionsupdatePositions函数使它们能更新新的数组。

    void initPositions() {

       int i, j;

       for( i=0; i<DIMS; i++ )

          for( j=0; j<NPARTS; j++ )

             r[0][i][j] = 0.5 + ( (double) rand() / (double) RAND_MAX );

    }

     

    void updatePositions() {

         int i, j;

    for( int idx=1; idx<= NITER; idx++ )

             for( i=0; i<DIMS; i++ )

                  for( j=0; j<NPARTS; j++ )

                       r[idx][i][j] =  r[idx-1][i][j] - (0.5 + ( (double) rand() / (double) RAND_MAX ));

    改变computePot函数为double computePot( double t[][NPARTS] ) 这样就可以在main中的for中使用每个i对应的数组来调用它就行了。至此,程序computePot的每次调用都是独立的了。

    e)         经过上次的改造后,就可能使用openmp来使main中的for循环并行的工作了,为了在多个cpu中得到一个好的负载平衡,使用使用omenmpguided调试。

    #pragma omp parallel private( i )

            #pragma omp for schedule(static,10) nowait

            for( i=0; i<NITER; i++ ) {

               ret[i] = computePot( r[i+1] );

            }

    4.       使用MKL库优化

    a)         通过上面的几步优化后,程序以有很好的并行性,程序现在的计算量主要表现在进行sqrt和浮点除法上了,在MKL库中的vdInvSqrt函数正好是完成这项工作的最佳选择; 但是, vdInvSqrt函数只能对数组进行计算,而程序中每个循环就要计算一次,所以必须修循环中的计算方法。

    b)        先将内层循环中每一次distx, disty, distz的乘积放到一个数组dtA中,待内层循环退出以后,就可以使用vdInvSqrt函数来对这个数组进行计算;然后在将vdInvSqrt函数计算的结果求合就得到了最终结果。

         for( j=0; j<i-1; j++ ) {

             distx = ( (t[0][j] - t[0][i] ) );

             disty = ( (t[1][j] - t[1][i] ) );

             distz = ( (t[2][j] - t[2][i] ) );

             dtA[j] = ( distx * distx + disty * disty + distz * distz );

         }

         vdInvSqrt( j, dtA, dtB );

         for( j=0; j< i-1; j++ ) {

             m_pot += dtB[j];

         }

     

    优化后的代码: #include  < stdio.h > #include  < stdlib.h > // #include <math.h> #include  < mathimf.h > #include  < windows.h > #include  < time.h > #include  < omp.h > #include  < mkl_vml_defines.h > #include  < mkl.h >   #define  NPARTS 1000 #define  NITER 201 #define  DIMS 3 int  rand(  void  ); double  computePot(  const   double  t[][NPARTS] ); void  initPositions( void ); void  updatePositions( void );  double  r[NITER + 1 ][DIMS][NPARTS]; double  ret[NITER]; int  main()  {    int i;    clock_t start, stop;    initPositions();    updatePositions();    vmlSetMode( VML_LA | VML_FLOAT_CONSISTENT );;    start=clock();    #pragma omp parallel private( i )    {        #pragma omp for schedule(guided,1) nowait        for( i=0; i<NITER; i++ ) {            ret[i] = computePot( r[i+1] );            //if (i == 0) printf("]: Potential: .3f ", i, pot);            //updatePositions();        }    }    for( i=0; i<NITER; i+=10)        printf("]: Potential:  .7f ", i, ret[i]);    stop=clock();    printf ("Seconds = .9f ",(double)(stop-start)/ CLOCKS_PER_SEC);}    void  initPositions()  {    int i, j;    for( i=0; i<DIMS; i++ )        for( j=0; j<NPARTS; j++ )             r[0][i][j] = 0.5 + ( (double) rand() / (double) RAND_MAX );}    void  updatePositions()  {    int i, j;    forint idx=1; idx<= NITER; idx++ )         for( i=0; i<DIMS; i++ )            for( j=0; j<NPARTS; j++ )                r[idx][i][j] =  r[idx-1][i][j] - (0.5 + ( (double) rand() / (double) RAND_MAX ));}    double  computePot(  const   double  t[][NPARTS] )  {    int i, j;    double distx, disty, distz;    double dtA[NPARTS];    double dtB[NPARTS];    double m_pot =0.0;    for( i=0; i<NPARTS; i++ ) {        for( j=0; j<i-1; j++ ) {            distx = ( (t[0][j] - t[0][i] ) );            disty = ( (t[1][j] - t[1][i] ) );            distz = ( (t[2][j] - t[2][i] ) );            dtA[j] = ( distx * distx + disty * disty + distz * distz );        }        vdInvSqrt( j, dtA, dtB );        for( j=0; j< i-1; j++ ) {            m_pot += dtB[j];        }    }    return m_pot;}

    最新回复(0)