一、Conway’s Game of Life描述
康威生命游戏(英语:Conway’s Game of Life),又称康威生命棋,是英国数学家约翰·何顿·康威在1970年发明的细胞自动机。
1、规则
生命游戏中,对于任意细胞,规则如下。每个细胞有两种状态:存活或死亡,每个细胞与以自身为中心的周围八格细胞产生互动。
当前细胞为存活状态时,当周围低于2个(不包含2个)存活细胞时, 该细胞变成死亡状态。(模拟生命数量稀少)当前细胞为存活状态时,当周围有2个或3个存活细胞时, 该细胞保持原样。当前细胞为存活状态时,当周围有3个以上的存活细胞时,该细胞变成死亡状态。(模拟生命数量过多)当前细胞为死亡状态时,当周围有3个存活细胞时,该细胞变成存活状态。 (模拟繁殖)
可以把最初的细胞结构定义为种子,当所有在种子中的细胞同时被以上规则处理后, 可以得到第一代细胞图。按规则继续处理当前的细胞图,可以得到下一代的细胞图,周而复始。随题给出了一个细胞图的不断演变过程,见Gospers_glider_gun.gif。
Gospers_glider_gun.gif二、实现思路准备先用java和C++各先实现一次串行的。因为java不支持SIMD所以并行版本只用C++实现。2.1基本实现思路输入数据格式输入数据格式如下所示:因为屏幕限制,不能显示全部,数据为50X100。以空格为间隔。数据结构及实现思路使用以为数组存储,在本例中采用一维数组存储。也可以采用二维数组实现,基本原理是一样的。遍历整个一维数组,对每个数组的周围的八个值进行判断并计算周围八个cell存活的个数count(即周围为1的值个数)。然后根据个数count以及自身是否存活(0 or 1),判断这次迭代中,本细胞该不该存活。依次类推,遍历50000次。输入输出的问题文件输入的话,读取出来的是字符,所以1变成了49,0变成了48。还要注意去除空格。输出数据的话,如果在程序内部把字符1(49)转成了数字1,那么输出的时候要想输出字符1,则要把数组里面的数字1转化成字符1(49)。(0亦然)具体细节请看博客:http://blog.csdn.net/anialy/article/details/7961179三、串行java实现
Conway2D
1 import java.io.*; 2 3 /** 4 * Java class for simulation of Conway's Game of Life. 5 */ 6 7 class Conway2D { 8 private final int width; 9 private final int height; 10 private final int size; 11 private byte[] data; 12 13 /** 14 * @param width 15 * @param height 16 */ 17 Conway2D(int width, int height) { 18 this.width = width; 19 this.height = height; 20 this.size = width * height; 21 data = new byte[size]; 22 } 23 24 25 /** 26 * 生命游戏迭代一次 27 */ 28 void iterate() { 29 byte[] prev = new byte[size]; 30 System.arraycopy(data, 0, prev, 0, size); 31 byte[] next = new byte[size]; 32 for (int i = 0; i < width; i++) { 33 for (int j = 0; j < height; j++) { 34 if (isAlive(i, j, prev)) { 35 next[j * width + i] = 49; 36 } else { 37 next[j * width + i] = 48; 38 } 39 } 40 } 41 System.arraycopy(next, 0, data, 0, size); 42 } 43 44 45 /** 46 * 检查cell是否存活 47 * 48 * @param x The x position 49 * @param y The y position 50 * @param d The grid data. 51 * @return 52 */ 53 54 private boolean isAlive(int x, int y, byte[] d) { 55 int count = 0; 56 int pos1 = y * width + x; 57 //循环该cell的周围,累计周围活细胞个数 58 for (int i = x - 1; i <= x + 1; i++) { 59 for (int j = y - 1; j <= y + 1; j++) { 60 int pos = j * width + i; 61 if (pos >= 0 && pos < size - 1 && pos != pos1) //如果点在圈内 并且 不是自己 62 { 63 if (d[pos] == 49) { 64 count++; 65 } 66 } 67 } 68 } 69 //如果该细胞死亡 70 if (d[pos1] == 48) { 71 return count == 3; 72 } else {//如果该细胞活着 73 return !(count < 2 || count > 3); 74 } 75 } 76 77 78 /** 79 * 读取文件中的数据 80 * 注意byte的不同,空格和换行在byte中的表示不同 81 * 82 * @param file 83 */ 84 void readData(String file) throws IOException { 85 BufferedInputStream in = new BufferedInputStream(new FileInputStream(file)); 86 ByteArrayOutputStream out = new ByteArrayOutputStream(1); 87 88 byte[] temp = new byte[1]; 89 int size; 90 while ((size = in.read(temp)) != -1) { 91 if (temp[0] != 32 && temp[0] != 10) { 92 out.write(temp, 0, size); 93 } 94 } 95 in.close(); 96 data = out.toByteArray(); 97 } 98 99 100 void writeToFile(String file) throws IOException {101 FileOutputStream fos =new FileOutputStream(file);102 BufferedInputStream bis = new BufferedInputStream(new ByteArrayInputStream(data));103 byte[] tmp = new byte[100];104 int size;105 while((size = bis.read(tmp)) != -1){106 fos.write(tmp);107 fos.write(new byte[]{10});108 }109 }110 }
View Code
main
1 import java.io.IOException; 2 3 /** 4 * Created by anyuan on 2017/1/4. 5 */ 6 public class Main { 7 public static final int MAXTIMES = 50000; 8 9 public static void main(String[] args) {10 Conway2D conway2D = new Conway2D(100, 50);11 try {12 conway2D.readData("C:\\Users\\anyuan\\IdeaProjects\\操作系统实验\\LifeGame\\input_50x100");13 double start = System.currentTimeMillis();14 for (int i = 0; i < MAXTIMES; i++) {15 conway2D.iterate();16 }17 double end = System.currentTimeMillis();18 System.out.println("运行时间:" + (end - start)/1000 + "秒");//应该是end - start19 conway2D.writeToFile("output_java串行");20 } catch (IOException e) {21 e.printStackTrace();22 }23 }24 }
View Code
结果:运行时间大概6秒左右。四、C++串行和并行实现C++串行思路和java差不多。只不过在并行代码中的步骤稍微有些不同。并行思路如下:
- 读入数据//去空格读取数据转换49to1//将48转换为0,49转换为1。迭代50000次//思路为将第一遍历,一次性读入八个数据到__m128i寄存器,然后对该寄存器的数据周边八个数值进行相加。作为count以判断是否该存活。所以实际上只需要迭代50000/8次。数据转换1to49//1转换成49,0转换成48输出数据
下面是C++的代码。关于SIMD的相关介绍,我会在下一节稍微介绍一下,并给出几个供参考的博客。下一节中也会介绍一下并行化的思路。Header.h
1 #pragma once 2 #include <algorithm> 3 #include <fstream> 4 #include <iostream> 5 #include <string> 6 #define D_SCL_SECURE_NO_WARNINGS 7 8 #define WIDTH 100 9 #define HEIGHT 50 10 #define MAXSIZE 50*100 11 12 class Conway2D 13 { 14 const int width; 15 const int height; 16 const int size; 17 int data[MAXSIZE]; 18 19 20 public: 21 Conway2D() : width(100), height(50), size(width * height) 22 { 23 } 24 25 Conway2D(int width, int height) 26 : width(width), 27 height(height), 28 size(width * height) 29 { 30 } 31 32 void iterate() 33 { 34 int prev[MAXSIZE]; 35 std::copy(data, data + size, prev); 36 int next[MAXSIZE]; 37 for (int i = 0; i < width; i++) 38 { 39 for (int j = 0; j < height; j++) 40 { 41 if (isAlive(i, j, prev)) 42 { 43 next[j * width + i] = 49; 44 } 45 else 46 { 47 next[j * width + i] = 48; 48 } 49 } 50 } 51 std::copy(next, next + size, data); 52 } 53 54 bool isAlive(int x, int y, int d[]) const 55 { 56 int count = 0; 57 int pos1 = y * width + x; 58 //循环该cell的周围,累计周围活细胞个数 59 for (int i = x - 1; i <= x + 1; i++) 60 { 61 for (int j = y - 1; j <= y + 1; j++) 62 { 63 int pos = j * width + i; 64 if (pos >= 0 && pos < size - 1 && pos != pos1) //如果点在圈内 并且 不是自己 65 if (d[pos] == 49) 66 ++count; 67 } 68 } 69 //如果该细胞死亡 70 if (d[pos1] == 48) 71 { 72 return count == 3; 73 } 74 //如果该细胞活着 75 return !(count < 2 || count > 3); 76 } 77 78 void readData(std::string file) 79 { 80 std::ifstream f1(file); 81 if (!f1) 82 std::cout << "文件打开失败" << std::endl; 83 char tmp[1]; 84 int count = 0; 85 while (count < MAXSIZE) 86 { 87 f1.read(tmp, 1); 88 if (tmp[0] == '1' || tmp[0] == '0') 89 { 90 data[count] = static_cast<int>(tmp[0]); 91 ++count; 92 } 93 } 94 } 95 96 void writeToFile(std::string file) 97 { 98 std::ofstream f1(file); 99 if (!f1)100 std::cout << "文件创建失败" << std::endl;101 char tmp[WIDTH];102 for (int i = 0; i < HEIGHT; ++i)103 {104 std::copy(data + i*WIDTH, data + (i + 1)*WIDTH, tmp);105 f1.write(tmp,WIDTH);106 f1.write("\n",1);107 }108 }109 };110 111 112
View Code
Header_simd_2.h
1 #pragma once 2 #include <fstream> 3 #include <iostream> 4 #include <emmintrin.h>//sse2 5 #define D_SCL_SECURE_NO_WARNINGS 6 #define WIDTH 100 7 #define HEIGHT 50 8 #define MAXSIZE 50*100 9 10 class Conway2D_simd_2 11 { 12 const __int16 width; 13 const __int16 height; 14 const __int16 size; 15 __int16 data_s[MAXSIZE + 2 * WIDTH + 2];//这样就可以考虑边界cell的四周的cell了 16 __int16* data = &data_s[WIDTH + 1];//从data_s的WIDTH+1开始,到MAXSIZE+WIDTH+1结束 17 //之后要使用的8个寄存器 18 __m128i _m128_1; 19 __m128i _m128_2; 20 __m128i _m128_r; 21 __m128i _m128_s; 22 23 public: 24 25 Conway2D_simd_2(__int16 width, __int16 height) 26 : width(width), 27 height(height), 28 size(width * height) 29 { 30 for (int i = 0; i < MAXSIZE + 2 * WIDTH + 2; ++i) 31 { 32 data_s[i] = 48; 33 } 34 // data = &data_s[WIDTH + 1]; 35 } 36 37 void iterate() 38 { 39 __int16 prev[MAXSIZE]; 40 // std::copy(data, data + size, prev); 41 __int16 pos_s = 0; 42 for (; pos_s < MAXSIZE; pos_s += 8) 43 { 44 _m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH - 1])); 45 _m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH])); 46 _m128_r = _mm_add_epi16(_m128_1, _m128_2); 47 _m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH + 1])); 48 _m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - 1])); 49 _m128_s = _mm_add_epi16(_m128_1, _m128_2); 50 _m128_r = _mm_add_epi16(_m128_r, _m128_s); 51 _m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + 1])); 52 _m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH - 1])); 53 _m128_s = _mm_add_epi16(_m128_1, _m128_2); 54 _m128_r = _mm_add_epi16(_m128_r, _m128_s); 55 _m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH])); 56 _m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH + 1])); 57 _m128_s = _mm_add_epi16(_m128_1, _m128_2); 58 _m128_r = _mm_add_epi16(_m128_r, _m128_s); 59 for (int i = 0; i < 8; ++i) 60 { 61 //如果该细胞死亡 62 if (data[pos_s+i] == 0) 63 { 64 // if (_m128_r.m128i_i16[i] == 3) 65 // { 66 // _m128_r.m128i_i16[i] = 1; 67 // } 68 // else 69 // { 70 // _m128_r.m128i_i16[i] = 0; 71 // } 72 _m128_r.m128i_i16[i] = (_m128_r.m128i_i16[i] == 3); 73 } 74 else 75 { 76 //如果该细胞活着 77 // if(_m128_r.m128i_i16[i] == 2 || _m128_r.m128i_i16[i] == 3) 78 // { 79 // _m128_r.m128i_i16[i] = 1; 80 // } 81 // else 82 // { 83 // _m128_r.m128i_i16[i] = 0; 84 // 85 // } 86 _m128_r.m128i_i16[i] = (!(_m128_r.m128i_i16[i] < 2 || _m128_r.m128i_i16[i] > 3)); 87 } 88 } 89 _mm_storeu_si128(reinterpret_cast<__m128i *>(&prev[pos_s]), _m128_r); 90 } 91 std::copy(prev, prev + size, data); 92 } 93 94 95 void readData(std::string file) const 96 { 97 std::ifstream f1(file); 98 if (!f1) 99 std::cout << "文件打开失败" << std::endl;100 char tmp[1];101 __int16 count = 0;102 while (count < MAXSIZE)103 {104 f1.read(tmp, 1);105 if (tmp[0] == '1' || tmp[0] == '0')106 {107 data[count] = static_cast<int>(tmp[0]);108 ++count;109 }110 }111 }112 113 void writeToFile(std::string file) const114 {115 std::ofstream f1(file);116 if (!f1)117 std::cout << "文件创建失败" << std::endl;118 char tmp[WIDTH];119 for (__int16 i = 0; i < HEIGHT; ++i)120 {121 std::copy(data + i * WIDTH, data + (i + 1) * WIDTH, tmp);122 f1.write(tmp, WIDTH);123 f1.write("\n", 1);124 }125 }126 127 void dataFrom1to49()128 {129 for (int i = 0; i < MAXSIZE + 2 * WIDTH + 2; ++i)130 {131 if (data_s[i] == 1)132 {133 data_s[i] = 49;134 }135 else136 {137 data_s[i] = 48;138 }139 }140 }141 142 void dataFrom49to1()143 {144 for (int i = 0; i < MAXSIZE + 2 * WIDTH + 2; ++i)145 {146 if (data_s[i] == 49)147 {148 data_s[i] = 1;149 }150 else151 {152 data_s[i] = 0;153 }154 }155 }156 };
View Code
Source.cpp
1 #include "Header.h" 2 #include <time.h> 3 //#include "Header_simd.h" 4 #include "Header_simd_2.h" 5 #define MAXTIMES 50000 6 void main() 7 { 8 //串行 9 Conway2D conway2_d = Conway2D(100, 50);10 conway2_d.readData("input_50x100");11 clock_t start_time = clock();12 for (int i = 0; i < MAXTIMES; ++i)13 {14 conway2_d.iterate();15 }16 clock_t end_time = clock();17 std::cout << "串行 Runing time is:" << static_cast<double>(end_time - start_time) / CLOCKS_PER_SEC << "s" << std::endl;18 conway2_d.writeToFile("output_Cpp串行");19 20 // //并行21 // Conway2D_simd conway2_d_simd = Conway2D_simd(100, 50);22 // conway2_d_simd.readData("input_50x100");23 // clock_t start_time_simd = clock();24 // for (int i = 0; i < MAXTIMES; ++i)25 // {26 // conway2_d_simd.iterate();27 // }28 // clock_t end_time_simd = clock();29 // std::cout << "Runing time is:" << static_cast<double>(end_time_simd - start_time_simd) / CLOCKS_PER_SEC << "s" << std::endl;30 // conway2_d_simd.writeToFile("output_Cpp并行");31 32 //并行233 Conway2D_simd_2 conway2_d_simd_2 = Conway2D_simd_2(100, 50);34 conway2_d_simd_2.readData("input_50x100");35 conway2_d_simd_2.dataFrom49to1();36 clock_t start_time_simd_2 = clock();37 for (int i = 0; i < MAXTIMES; ++i)38 {39 conway2_d_simd_2.iterate();40 }41 clock_t end_time_simd_2 = clock();42 std::cout << "并行2 Runing time is:" << static_cast<double>(end_time_simd_2 - start_time_simd_2) / CLOCKS_PER_SEC << "s" << std::endl;43 conway2_d_simd_2.dataFrom1to49();44 conway2_d_simd_2.writeToFile("output_Cpp并行_2");45 system("pause");46 }
View Code
五、SIMD介绍和C++并行思路SIMD简要介绍
SIMD intrinsics有一些类似于C语言中的函数,可以被其它代码直接调用,可以像其它函数一样给它传递参数,Intel C++编译器支持SIMD intrinsics(VS2005/2010也支持,GCC也支持),并且可以针对函数进行内联等优化。需要包含的头文件:
#include //MMX#include //SSE (include mmintrin.h)#include //SSE2 (include xmmintrin.h)#include //SSE3 (include emmintrin.h)
这些头文件定了一些数据类型对应于那些SIMD指令要适应的浮点数和整数。
这些数据类型名以两个下划线开始:__m64用于表示一个MMX寄存器的内容,用于64bi的MMX整数指令,可以封装正8个8bit,4个16bit,2个32bit和一个64bit的整数。__m128用于SSE,表示封装了四个32bit的单精度浮点数的SSE寄存器。__m128d可以封装2个64bit的双精度浮点数
__m128i用于表示支持128bi的内存变量,位于16B的边界。声明为__m64的内存变量位于8B的边界。
SSE(一种SIMD指令集)基本操作SSE的基本命令有以下几种:load系列命令(读取连续的内存内容到寄存器中)set系列命令(读取内存内容到寄存器中,与load的区别在于不要连续,可以传入多个参数)算数逻辑系列命令(这些命令和汇编比较类似,执行一些简单的算数和逻辑操作)store系列命令(把寄存器内容存储到内存中)SSE基本操作流程:SSE的操作流程比较单纯,所以目前使用SIMD编程的难度比较大。指令较少,要实现复杂功能比较困难。
- load/set指令把内存数据读取到寄存器中。算数指令对寄存器进行相应的操作。store指令将操作结果存回到内存中。
SSE指令格式 SMD intrinsics函数采用一个非常标准的命名格式,大部分采取:_mm__的格式,函数名以_mm开头,然后表示函数要执行的SIMD指令,比如,add,sub,srli分别表示加法,减法,以为运算,最后是后缀,后缀的一部分给出了药进行运算的函数的数据范围,其中p表示封装操作,s表示变量操作,而ep表示扩展操作,接下来的部分表示要进行运算的数据类型,其中s表示单精度操作,d表示双精度操作,i表示整数操作,u表示无符号整数,数字表示整数的比特数。例如:__m128 _mm_set_ss (float w),__m128 _mm_add_ss (__m128 a, __m128 b)。SSE另外要注意的事项u代表不需要对齐,没有u的命令代表需要对齐。(对齐和连续概念是不一样的)。本例中使用的命令例如:_mm_loadu_si128,是不需要对齐的。如要要求数据对齐,则在分配内存的时候要特别声明。如:
#include <intrin.h> __declspec(align(16)) float op1[4] = {1.0, 2.0, 3.0, 4.0};
关于SIMD的其他博客介绍:http://blog.csdn.net/hellochenlu/article/details/52370757http://www.cnblogs.com/dragon2012/p/5200698.html本例中使用的寄存器和数据类型
typedef union __declspec(intrin_type) __declspec(align(16)) __m128i { __int8 m128i_i8[16]; __int16 m128i_i16[8]; __int32 m128i_i32[4]; __int64 m128i_i64[2]; unsigned __int8 m128i_u8[16]; unsigned __int16 m128i_u16[8]; unsigned __int32 m128i_u32[4]; unsigned __int64 m128i_u64[2];} __m128i;
本例用__m128i存8个__int16数据。
一次迭代的操作
void iterate() { __int16 prev[MAXSIZE]; // std::copy(data, data + size, prev); __int16 pos_s = 0; for (; pos_s < MAXSIZE; pos_s += 8) { _m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH - 1])); _m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH])); _m128_r = _mm_add_epi16(_m128_1, _m128_2); _m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH + 1])); _m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - 1])); _m128_s = _mm_add_epi16(_m128_1, _m128_2); _m128_r = _mm_add_epi16(_m128_r, _m128_s); _m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + 1])); _m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH - 1])); _m128_s = _mm_add_epi16(_m128_1, _m128_2); _m128_r = _mm_add_epi16(_m128_r, _m128_s); _m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH])); _m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH + 1])); _m128_s = _mm_add_epi16(_m128_1, _m128_2); _m128_r = _mm_add_epi16(_m128_r, _m128_s); for (int i = 0; i < 8; ++i) { //如果该细胞死亡 if (data[pos_s+i] == 0) {// if (_m128_r.m128i_i16[i] == 3)// {// _m128_r.m128i_i16[i] = 1;// }// else// {// _m128_r.m128i_i16[i] = 0;// } _m128_r.m128i_i16[i] = (_m128_r.m128i_i16[i] == 3); } else { //如果该细胞活着// if(_m128_r.m128i_i16[i] == 2 || _m128_r.m128i_i16[i] == 3)// {// _m128_r.m128i_i16[i] = 1;// }// else// {// _m128_r.m128i_i16[i] = 0;//// } _m128_r.m128i_i16[i] = (!(_m128_r.m128i_i16[i] < 2 || _m128_r.m128i_i16[i] > 3)); } } _mm_storeu_si128(reinterpret_cast<__m128i *>(&prev[pos_s]), _m128_r); } std::copy(prev, prev + size, data); }
代码中一次取八个数,把这八个数的周边八个(逻辑上)数值依次相加,(//另外说一句,为了防止可能出现周边没有值的情况,也就是边界的cell。在分配数组时,特意多分配2*WIGHT+2个__int16的内存。而data取其中间区域,多余区域设置为0,从而避免数组越界问题。)相加的结果即为count,然后根据条件判断给寄存器赋值,最后写会内存中。因为每一次迭代中,处理的数据之间不能互相影响,所以也要有一次数组拷贝。具体细节请参考上一小节的源码。至此C++并行思路的介绍已经完毕。最后迭代50000次的结果和java一样。时间上并行要比串行块10倍左右(8次一处理,然后再加上是寄存器操作,这种加速比很正常)。但是目前我电脑的CPU被占满了。所以和上次跑的不一样。电脑空闲时候的时间是串行:19秒左右并行:2秒多一点(2.3左右)这次CPU被其他程序占满了90%以上的情况下结果如图:加速比倒是差不多。不过还是可以看出来SSE并行化真的比串行程序快很多啊!往事是尘封在记忆中的梦,而你是我唯一鲜明的记忆,