1 /*
2 *
3 * This is the SyncIO library that uses MPI-IO collective functions to
4 * implement a flexible I/O checkpoint solution for a large number of
5 * processors.
6 *
7 * Previous developer: Ning Liu (liun2@cs.rpi.edu)
8 * Jing Fu (fuj@cs.rpi.edu)
9 * Current developers: Michel Rasquin (Michel.Rasquin@colorado.edu),
10 * Ben Matthews (benjamin.a.matthews@colorado.edu)
11 *
12 */
13
14 #include <map>
15 #include <vector>
16 #include <string>
17 #include <cstdarg>
18 #include <string.h>
19 #include <ctype.h>
20 #include <stdlib.h>
21 #include <stdio.h>
22 #include <math.h>
23 #include <sstream>
24 #include "phastaIO.h"
25 #include "phiotmrc.h"
26 #include "phiompi.h"
27 #include "mpi.h"
28 #include "phiostats.h"
29 #include "phiotimer.h"
30 #include <assert.h>
31
32 /* OS-specific things try to stay here */
33 #include <sys/stat.h>
34 #include <unistd.h>
35 #include <errno.h>
36
37
/* ---- generic sizes ---- */
#define ONE_MEGABYTE 1048576
#define TWO_MEGABYTE 2097152
#define inv1024sq 953.674316406e-9 // = 1/1024/1024

/* ---- file bookkeeping ---- */
#define MAX_PHASTA_FILES 64              // capacity of the open-file handle table
#define MAX_PHASTA_FILE_NAME_LENGTH 1024
#define DB_HEADER_SIZE 1024

/* ---- syncIO master header layout ---- */
#define VERSION_INFO_HEADER_SIZE 8192
#define MAX_FIELDS_NAME_LENGTH 128
// The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf
// -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles)
#define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4)
#define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!!
#define DefaultMHSize (4*ONE_MEGABYTE)
//#define DefaultMHSize (8350) //For test
#define LATEST_WRITE_VERSION 1
52 int MasterHeaderSize = -1;
53
54 bool PRINT_PERF = false; // default print no perf results
55 int irank = -1; // global rank, should never be manually manipulated
56 int mysize = -1;
57
58 // Static variables are bad but used here to store the subcommunicator and associated variables
59 // Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it)
60 static int s_assign_local_comm = 0;
61 static MPI_Comm s_local_comm;
62 static int s_local_size = -1;
63 static int s_local_rank = -1;
64
65 // the following defines are for debug printf
// Compile-time switch for the debug printf helpers: 0 (default) prints nothing.
#define PHASTAIO_DEBUG 0

/**
 * printf-style debug trace.
 *
 * When PHASTAIO_DEBUG is non-zero the message is written to stdout prefixed
 * with "phastaIO debug: "; otherwise the call does nothing.
 */
void phprintf(const char* fmt, ...) {
#if PHASTAIO_DEBUG
  char prefixed[1024];
  snprintf(prefixed, sizeof(prefixed), "phastaIO debug: %s", fmt);
  va_list args;
  va_start(args, fmt);
  vprintf(prefixed, args);
  va_end(args);
#else
  (void)fmt; // nothing to print; keeps the parameter "used"
#endif
}
79
/**
 * printf-style debug trace emitted by rank 0 of MPI_COMM_WORLD only.
 *
 * When PHASTAIO_DEBUG is non-zero, rank 0 writes the message to stdout
 * prefixed with "phastaIO debug: irank=0 "; every other rank, and every build
 * with PHASTAIO_DEBUG == 0, does nothing.
 */
void phprintf_0(const char* fmt, ...) {
#if PHASTAIO_DEBUG
  int world_rank = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
  if (world_rank != 0)
    return;
  char prefixed[1024];
  snprintf(prefixed, sizeof(prefixed), "phastaIO debug: irank=0 %s", fmt);
  va_list args;
  va_start(args, fmt);
  vprintf(prefixed, args);
  va_end(args);
#else
  (void)fmt; // nothing to print; keeps the parameter "used"
#endif
}
95
/* Negative status codes used by the syncIO entry points. */
enum PhastaIO_Errors
{
  MAX_PHASTA_FILES_EXCEEDED = -1,  /* returned by the init functions when the handle table is full */
  UNABLE_TO_OPEN_FILE       = -2,  /* stored in the descriptor by openfile() when MPI_File_open fails */
  NOT_A_MPI_FILE            = -3,
  GPID_EXCEEDED             = -4,
  DATA_TYPE_ILLEGAL         = -5
};
104
105 using namespace std;
106
107 namespace{
108
109 map<int, std::string> LastHeaderKey;
110 vector< FILE* > fileArray;
111 vector< bool > byte_order;
112 vector< int > header_type;
113 int DataSize=0;
114 bool LastHeaderNotFound = false;
115 bool Wrong_Endian = false ;
116 bool Strict_Error = false ;
117 bool binary_format = true;
118
119 /***********************************************************************/
120 /***************** NEW PHASTA IO CODE STARTS HERE **********************/
121 /***********************************************************************/
122
123 typedef struct
124 {
125 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */
126 unsigned long my_offset;
127 unsigned long next_start_address;
128 unsigned long **my_offset_table;
129 unsigned long **my_read_table;
130
131 double * double_chunk;
132 double * read_double_chunk;
133
134 int field_count;
135 int part_count;
136 int read_field_count;
137 int read_part_count;
138 int GPid;
139 int start_id;
140
141 int mhsize;
142
143 int myrank;
144 int numprocs;
145 int local_myrank;
146 int local_numprocs;
147
148 int nppp;
149 int nPPF;
150 int nFiles;
151 int nFields;
152
153 int * int_chunk;
154 int * read_int_chunk;
155
156 int Wrong_Endian; /* default to false */
157 char * master_header;
158 MPI_File file_handle;
159 MPI_Comm local_comm;
160 } phastaio_file_t;
161
162 typedef struct
163 {
164 int nppf, nfields;
165 char * masterHeader;
166 }serial_file;
167
168 serial_file *SerialFile;
169 phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES];
170 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */
171
172 // the caller has the responsibility to delete the returned string
173 // TODO: StringStipper("nbc value? ") returns NULL?
StringStripper(const char istring[])174 char* StringStripper( const char istring[] ) {
175 int length = strlen( istring );
176 char* dest = (char *)malloc( length + 1 );
177 strcpy( dest, istring );
178 dest[ length ] = '\0';
179 if ( char* p = strpbrk( dest, " ") )
180 *p = '\0';
181 return dest;
182 }
183
cscompare(const char teststring[],const char targetstring[])184 inline int cscompare( const char teststring[], const char targetstring[] ) {
185 char* s1 = const_cast<char*>(teststring);
186 char* s2 = const_cast<char*>(targetstring);
187
188 while( *s1 == ' ') s1++;
189 while( *s2 == ' ') s2++;
190 while( ( *s1 )
191 && ( *s2 )
192 && ( *s2 != '?')
193 && ( tolower( *s1 )==tolower( *s2 ) ) ) {
194 s1++;
195 s2++;
196 while( *s1 == ' ') s1++;
197 while( *s2 == ' ') s2++;
198 }
199 if ( !( *s1 ) || ( *s1 == '?') ) return 1;
200 else return 0;
201 }
202
isBinary(const char iotype[])203 inline void isBinary( const char iotype[] ) {
204 char* fname = StringStripper( iotype );
205 if ( cscompare( fname, "binary" ) ) binary_format = true;
206 else binary_format = false;
207 free (fname);
208
209 }
210
typeSize(const char typestring[])211 inline size_t typeSize( const char typestring[] ) {
212 char* ts1 = StringStripper( typestring );
213 if ( cscompare( "integer", ts1 ) ) {
214 free (ts1);
215 return sizeof(int);
216 } else if ( cscompare( "double", ts1 ) ) {
217 free (ts1);
218 return sizeof( double );
219 } else {
220 free (ts1);
221 fprintf(stderr,"unknown type : %s\n",ts1);
222 return 0;
223 }
224 }
225
readHeader(FILE * fileObject,const char phrase[],int * params,int expect)226 int readHeader(
227 FILE* fileObject,
228 const char phrase[],
229 int* params,
230 int expect ) {
231 char* text_header;
232 char* token;
233 char Line[1024] = "\0";
234 char junk;
235 bool FOUND = false ;
236 int real_length;
237 int skip_size, integer_value;
238 int rewind_count=0;
239
240 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) {
241 rewind( fileObject );
242 clearerr( fileObject );
243 rewind_count++;
244 fgets( Line, 1024, fileObject );
245 }
246
247 while( !FOUND && ( rewind_count < 2 ) ) {
248 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ) {
249 text_header = (char *)malloc( real_length + 1 );
250 strncpy( text_header, Line, real_length );
251 text_header[ real_length ] =static_cast<char>(NULL);
252 token = strtok ( text_header, ":" );
253 assert(token);
254 if( cscompare( phrase , token ) ) {
255 FOUND = true ;
256 token = strtok( NULL, " ,;<>" );
257 assert(token);
258 skip_size = atoi( token );
259 int i;
260 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) {
261 assert(token);
262 params[i] = atoi( token );
263 }
264 if ( i < expect ) {
265 fprintf(stderr,
266 "Aloha Expected # of ints not found for: %s\n",phrase );
267 }
268 } else if ( cscompare(token,"byteorder magic number") ) {
269 if ( binary_format ) {
270 fread((void*)&integer_value,sizeof(int),1,fileObject);
271 fread( &junk, sizeof(char), 1 , fileObject );
272 if ( 362436 != integer_value ) Wrong_Endian = true;
273 } else{
274 fscanf(fileObject, "%d\n", &integer_value );
275 }
276 } else {
277 /* some other header, so just skip over */
278 token = strtok( NULL, " ,;<>" );
279 assert(token);
280 skip_size = atoi( token );
281 if ( binary_format)
282 fseek( fileObject, skip_size, SEEK_CUR );
283 else
284 for( int gama=0; gama < skip_size; gama++ )
285 fgets( Line, 1024, fileObject );
286 }
287 free (text_header);
288 } // end of if before while loop
289
290 if ( !FOUND )
291 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) {
292 rewind( fileObject );
293 clearerr( fileObject );
294 rewind_count++;
295 fgets( Line, 1024, fileObject );
296 }
297 }
298
299 if ( !FOUND ) {
300 //fprintf(stderr, "Error: Could not find: %s\n", phrase);
301 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase);
302 return 1;
303 }
304 return 0;
305 }
306
307 } // end unnamed namespace
308
309
310 // begin of publicly visible functions
311
312 /**
313 * This function takes a long long pointer and assign (start) phiotmrc value to it
314 */
startTimer(double * start)315 void startTimer(double* start) {
316 if( !PRINT_PERF ) return;
317 MPI_Barrier(MPI_COMM_WORLD);
318 *start = phiotmrc();
319 }
320
321 /**
322 * This function takes a long long pointer and assign (end) phiotmrc value to it
323 */
endTimer(double * end)324 void endTimer(double* end) {
325 if( !PRINT_PERF ) return;
326 *end = phiotmrc();
327 MPI_Barrier(MPI_COMM_WORLD);
328 }
329
330 /**
331 * choose to print some performance results (or not) according to
332 * the PRINT_PERF macro
333 */
printPerf(const char * func_name,double start,double end,unsigned long datasize,int printdatainfo,const char * extra_msg)334 void printPerf(
335 const char* func_name,
336 double start,
337 double end,
338 unsigned long datasize,
339 int printdatainfo,
340 const char* extra_msg) {
341 if( !PRINT_PERF ) return;
342 unsigned long data_size = datasize;
343 double time = end - start;
344 unsigned long isizemin,isizemax,isizetot;
345 double sizemin,sizemax,sizeavg,sizetot,rate;
346 double tmin, tmax, tavg, ttot;
347
348 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD);
349 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD);
350 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
351 tavg = ttot/mysize;
352
353 if(irank == 0) {
354 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP ");
355 else printf("** syncIO ");
356 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg);
357 }
358
359 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size
360 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD);
361 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD);
362 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD);
363
364 sizemin=(double)(isizemin*inv1024sq);
365 sizemax=(double)(isizemax*inv1024sq);
366 sizetot=(double)(isizetot*inv1024sq);
367 sizeavg=(double)(1.0*sizetot/mysize);
368 rate=(double)(1.0*sizetot/tmax);
369
370 if( irank == 0) {
371 printf(", Rate = %f MB/s [%s] \n \t\t\t"
372 " block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n",
373 rate, extra_msg, sizemin,sizemax,sizeavg,sizetot);
374 }
375 }
376 else {
377 if(irank == 0) {
378 printf(" \n");
379 //printf(" (%s) \n", extra_msg);
380 }
381 }
382 }
383
384 /**
385 * This function is normally called at the beginning of a read operation, before
386 * init function.
387 * This function (uses rank 0) reads out nfields, nppf, master header size,
388 * endianess and allocates for masterHeader string.
389 * These values are essential for following read operations. Rank 0 will bcast
390 * these values to other ranks in the commm world
391 *
392 * If the file set is of old POSIX format, it would throw error and exit
393 */
queryphmpiio(const char filename[],int * nfields,int * nppf)394 void queryphmpiio(const char filename[],int *nfields, int *nppf)
395 {
396 MPI_Comm_rank(MPI_COMM_WORLD, &irank);
397 MPI_Comm_size(MPI_COMM_WORLD, &mysize);
398
399 if(irank == 0) {
400 FILE * fileHandle;
401 char* fname = StringStripper( filename );
402
403 PHASTAIO_OPENTIME(fileHandle = fopen (fname,"rb");)
404 if (fileHandle == NULL ) {
405 printf("\nError: File %s doesn't exist! Please check!\n",fname);
406 }
407 else {
408 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) );
409 int meta_size_limit = VERSION_INFO_HEADER_SIZE;
410 SerialFile->masterHeader = (char *)malloc( meta_size_limit );
411 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle);
412
413 char read_out_tag[MAX_FIELDS_NAME_LENGTH];
414 char version[MAX_FIELDS_NAME_LENGTH/4];
415 int mhsize;
416 char * token;
417 int magic_number;
418
419 memcpy( read_out_tag,
420 SerialFile->masterHeader,
421 MAX_FIELDS_NAME_LENGTH-1 );
422
423 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) {
424 // Test endianess ...
425 memcpy (&magic_number,
426 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0"
427 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format
428
429 if ( magic_number != ENDIAN_TEST_NUMBER ) {
430 printf("Endian is different!\n");
431 // Will do swap later
432 }
433
434 // test version, old version, default masterheader size is 4M
435 // newer version masterheader size is read from first line
436 memcpy(version,
437 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2,
438 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1?
439
440 if( cscompare ("version",version) ) {
441 // if there is "version" tag in the file, then it is newer format
442 // read master header size from here, otherwise use default
443 // Note: if version is "1", we know mhsize is at 3/4 place...
444
445 token = strtok(version, ":");
446 token = strtok(NULL, " ,;<>" );
447 int iversion = atoi(token);
448
449 if( iversion == 1) {
450 memcpy( &mhsize,
451 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1,
452 sizeof(int));
453 if ( magic_number != ENDIAN_TEST_NUMBER )
454 SwapArrayByteOrder(&mhsize, sizeof(int), 1);
455
456 if( mhsize > DefaultMHSize ) {
457 //if actual headersize is larger than default, let's re-read
458 free(SerialFile->masterHeader);
459 SerialFile->masterHeader = (char *)malloc(mhsize);
460 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position
461 fread(SerialFile->masterHeader,1,mhsize,fileHandle);
462 }
463 }
464 //TODO: check if this is a valid int??
465 MasterHeaderSize = mhsize;
466 }
467 else { // else it's version 0's format w/o version tag, implicating MHSize=4M
468 MasterHeaderSize = DefaultMHSize;
469 }
470
471 memcpy( read_out_tag,
472 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1,
473 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1
474
475 // Read in # fields ...
476 token = strtok( read_out_tag, ":" );
477 token = strtok( NULL," ,;<>" );
478 *nfields = atoi( token );
479 if ( *nfields > MAX_FIELDS_NUMBER) {
480 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n");
481 }
482 SerialFile->nfields=*nfields; //TODO: sanity check of this int?
483
484 memcpy( read_out_tag,
485 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2
486 + *nfields * MAX_FIELDS_NAME_LENGTH,
487 MAX_FIELDS_NAME_LENGTH);
488
489 token = strtok( read_out_tag, ":" );
490 token = strtok( NULL," ,;<>" );
491 *nppf = atoi( token );
492 SerialFile->nppf=*nppf; //TODO: sanity check of int
493 } // end of if("MPI_IO_TAG")
494 else {
495 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag);
496 exit(1);
497 }
498 PHASTAIO_CLOSETIME(fclose(fileHandle);)
499 free(SerialFile->masterHeader);
500 free(SerialFile);
501 } //end of else
502 free(fname);
503 }
504
505 // Bcast value to every one
506 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD );
507 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD );
508 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD );
509 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize);
510 }
511
512 /**
513 * This function computes the right master header size (round to size of 2^n).
514 * This is only needed for file format version 1 in "write" mode.
515 */
computeMHSize(int nfields,int nppf,int version)516 int computeMHSize(int nfields, int nppf, int version) {
517 int mhsize=0;
518 if(version == 1) {
519 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf
520 int meta_info_size = VERSION_INFO_HEADER_SIZE;
521 int actual_size = nfields * nppf * sizeof(unsigned long) + meta_info_size;
522 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long));
523 if (actual_size > DefaultMHSize) {
524 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value
525 mhsize *= DefaultMHSize;
526 }
527 else {
528 mhsize = DefaultMHSize;
529 }
530 } else {
531 int rank = 0;
532 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
533 if(!rank) {
534 fprintf(stderr,
535 "ERROR invalid version passed to %s... exiting\n", __func__);
536 exit(EXIT_FAILURE);
537 }
538 }
539 return mhsize;
540 }
541
/**
 * Computes the color (file group index) of a rank: ranks are grouped in
 * consecutive runs of numprocs/nfiles ranks (integer division) and the color
 * is the index of the run that contains myrank.
 */
extern "C" int computeColor( int myrank, int numprocs, int nfiles) {
  const int ranks_per_file = numprocs / nfiles;
  return myrank / ranks_per_file;
}
550
551
/**
 * Aborts the program when a file descriptor is negative.
 *
 * @param fctname         name of the calling function, used in the message
 * @param fileDescriptor  descriptor to check; a negative value prints an
 *                        error and exits with status 1
 */
void checkFileDescriptor(const char fctname[], int* fileDescriptor ) {
  const int descriptor = *fileDescriptor;
  if ( descriptor >= 0 )
    return;
  printf("Error: File descriptor = %d in %s\n",descriptor,fctname);
  exit(1);
}
561
562 /**
563 * Initialize the file struct members and allocate space for file struct
564 * buffers.
565 *
566 * Note: this function is only called when we are using new format. Old POSIX
567 * format should skip this routine and call openfile() directly instead.
568 */
initphmpiio(int * nfields,int * nppf,int * nfiles,int * filehandle,const char mode[])569 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[])
570 {
571 // we init irank again in case query not called (e.g. syncIO write case)
572 MPI_Comm_rank(MPI_COMM_WORLD, &irank);
573 MPI_Comm_size(MPI_COMM_WORLD, &mysize);
574
575 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d, nfields %d, nppf %d, nfiles %d\n", irank, MasterHeaderSize, *nfields, *nppf, *nfiles);
576
577 double timer_start, timer_end;
578 startTimer(&timer_start);
579
580 char* imode = StringStripper( mode );
581
582 // Note: if it's read, we presume query was called prior to init and
583 // MasterHeaderSize is already set to correct value from parsing header
584 // otherwise it's write then it needs some computation to be set
585 if ( cscompare( "read", imode ) ) {
586 // do nothing
587 }
588 else if( cscompare( "write", imode ) ) {
589 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION);
590 }
591 else {
592 printf("Error initphmpiio: can't recognize the mode %s", imode);
593 exit(1);
594 }
595 free ( imode );
596
597 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize);
598
599 int i, j;
600
601 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) {
602 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES");
603 endTimer(&timer_end);
604 printPerf("initphmpiio", timer_start, timer_end, 0, 0, "");
605 return MAX_PHASTA_FILES_EXCEEDED;
606 }
607 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid
608 // {
609 // for( i = 0; i < MAX_PHASTA_FILES; i++ );
610 // {
611 // PhastaIOActiveFiles[i] = NULL;
612 // }
613 // }
614
615
616 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) );
617
618 i = PhastaIONextActiveIndex;
619 PhastaIONextActiveIndex++;
620
621 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE;
622
623 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO
624
625 PhastaIOActiveFiles[i]->Wrong_Endian = false;
626
627 PhastaIOActiveFiles[i]->nFields = *nfields;
628 PhastaIOActiveFiles[i]->nPPF = *nppf;
629 PhastaIOActiveFiles[i]->nFiles = *nfiles;
630 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank));
631 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs));
632
633
634 if( *nfiles > 1 ) { // split the ranks according to each mpiio file
635
636 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time
637
638 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n");
639
640 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles);
641 MPI_Comm_split(MPI_COMM_WORLD,
642 color,
643 PhastaIOActiveFiles[i]->myrank,
644 &(PhastaIOActiveFiles[i]->local_comm));
645 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm,
646 &(PhastaIOActiveFiles[i]->local_numprocs));
647 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm,
648 &(PhastaIOActiveFiles[i]->local_myrank));
649
650 // back up now these variables so that we do not need to call comm_split again
651 s_local_comm = PhastaIOActiveFiles[i]->local_comm;
652 s_local_size = PhastaIOActiveFiles[i]->local_numprocs;
653 s_local_rank = PhastaIOActiveFiles[i]->local_myrank;
654 s_assign_local_comm = 1;
655 }
656 else { // recycle the subcommunicator
657 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n");
658 PhastaIOActiveFiles[i]->local_comm = s_local_comm;
659 PhastaIOActiveFiles[i]->local_numprocs = s_local_size;
660 PhastaIOActiveFiles[i]->local_myrank = s_local_rank;
661 }
662 }
663 else { // *nfiles == 1 here - no need to call mpi_comm_split here
664
665 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n");
666 PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD;
667 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs;
668 PhastaIOActiveFiles[i]->local_myrank = PhastaIOActiveFiles[i]->myrank;
669
670 }
671
672 PhastaIOActiveFiles[i]->nppp =
673 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs;
674
675 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF *
676 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) +
677 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp);
678
679 PhastaIOActiveFiles[i]->my_offset_table =
680 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) );
681
682 PhastaIOActiveFiles[i]->my_read_table =
683 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) );
684
685 for (j=0; j<*nfields; j++)
686 {
687 PhastaIOActiveFiles[i]->my_offset_table[j] =
688 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) );
689
690 PhastaIOActiveFiles[i]->my_read_table[j] =
691 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) );
692 }
693 *filehandle = i;
694
695 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char ));
696 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double ));
697 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int ));
698 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double ));
699 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int ));
700
701 // Time monitoring
702 endTimer(&timer_end);
703 printPerf("initphmpiio", timer_start, timer_end, 0, 0, "");
704
705 phprintf_0("Info initphmpiio: quiting function");
706
707 return i;
708 }
709
710 /**
711 * Destruct the file struct and free buffers allocated in init function.
712 */
finalizephmpiio(int * fileDescriptor)713 void finalizephmpiio( int *fileDescriptor )
714 {
715 double timer_start, timer_end;
716 startTimer(&timer_start);
717
718 int i, j;
719 i = *fileDescriptor;
720 //PhastaIONextActiveIndex--;
721
722 /* //free the offset table for this phasta file */
723 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL
724 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++)
725 {
726 free( PhastaIOActiveFiles[i]->my_offset_table[j]);
727 free( PhastaIOActiveFiles[i]->my_read_table[j]);
728 }
729 free ( PhastaIOActiveFiles[i]->my_offset_table );
730 free ( PhastaIOActiveFiles[i]->my_read_table );
731 free ( PhastaIOActiveFiles[i]->master_header );
732 free ( PhastaIOActiveFiles[i]->double_chunk );
733 free ( PhastaIOActiveFiles[i]->int_chunk );
734 free ( PhastaIOActiveFiles[i]->read_double_chunk );
735 free ( PhastaIOActiveFiles[i]->read_int_chunk );
736
737 if( PhastaIOActiveFiles[i]->nFiles > 1 && s_assign_local_comm ) { // the comm was split
738 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Freeing subcommunicator\n");
739 s_assign_local_comm = 0;
740 MPI_Comm_free(&(PhastaIOActiveFiles[i]->local_comm));
741 }
742
743 free( PhastaIOActiveFiles[i]);
744
745 endTimer(&timer_end);
746 printPerf("finalizempiio", timer_start, timer_end, 0, 0, "");
747
748 PhastaIONextActiveIndex--;
749 }
750
751
752 /**
753 * Special init for M2N in order to create a subcommunicator for the reduced solution (requires PRINT_PERF to be false for now)
754 * Initialize the file struct members and allocate space for file struct buffers.
755 *
756 * Note: this function is only called when we are using new format. Old POSIX
757 * format should skip this routine and call openfile() directly instead.
758 */
initphmpiiosub(int * nfields,int * nppf,int * nfiles,int * filehandle,const char mode[],MPI_Comm my_local_comm)759 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm)
760 {
761 // we init irank again in case query not called (e.g. syncIO write case)
762
763 MPI_Comm_rank(my_local_comm, &irank);
764 MPI_Comm_size(my_local_comm, &mysize);
765
766 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize);
767
768 double timer_start, timer_end;
769 startTimer(&timer_start);
770
771 char* imode = StringStripper( mode );
772
773 // Note: if it's read, we presume query was called prior to init and
774 // MasterHeaderSize is already set to correct value from parsing header
775 // otherwise it's write then it needs some computation to be set
776 if ( cscompare( "read", imode ) ) {
777 // do nothing
778 }
779 else if( cscompare( "write", imode ) ) {
780 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION);
781 }
782 else {
783 printf("Error initphmpiio: can't recognize the mode %s", imode);
784 exit(1);
785 }
786 free ( imode );
787
788 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize);
789
790 int i, j;
791
792 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) {
793 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES");
794 endTimer(&timer_end);
795 printPerf("initphmpiio", timer_start, timer_end, 0, 0, "");
796 return MAX_PHASTA_FILES_EXCEEDED;
797 }
798 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid
799 // {
800 // for( i = 0; i < MAX_PHASTA_FILES; i++ );
801 // {
802 // PhastaIOActiveFiles[i] = NULL;
803 // }
804 // }
805
806
807 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) );
808
809 i = PhastaIONextActiveIndex;
810 PhastaIONextActiveIndex++;
811
812 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE;
813
814 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO
815
816 PhastaIOActiveFiles[i]->Wrong_Endian = false;
817
818 PhastaIOActiveFiles[i]->nFields = *nfields;
819 PhastaIOActiveFiles[i]->nPPF = *nppf;
820 PhastaIOActiveFiles[i]->nFiles = *nfiles;
821 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank));
822 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs));
823
824 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles);
825 MPI_Comm_split(my_local_comm,
826 color,
827 PhastaIOActiveFiles[i]->myrank,
828 &(PhastaIOActiveFiles[i]->local_comm));
829 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm,
830 &(PhastaIOActiveFiles[i]->local_numprocs));
831 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm,
832 &(PhastaIOActiveFiles[i]->local_myrank));
833 PhastaIOActiveFiles[i]->nppp =
834 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs;
835
836 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF *
837 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) +
838 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp);
839
840 PhastaIOActiveFiles[i]->my_offset_table =
841 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) );
842
843 PhastaIOActiveFiles[i]->my_read_table =
844 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) );
845
846 for (j=0; j<*nfields; j++)
847 {
848 PhastaIOActiveFiles[i]->my_offset_table[j] =
849 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) );
850
851 PhastaIOActiveFiles[i]->my_read_table[j] =
852 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) );
853 }
854 *filehandle = i;
855
856 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char ));
857 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double ));
858 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int ));
859 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double ));
860 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int ));
861
862 // Time monitoring
863 endTimer(&timer_end);
864 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, "");
865
866 phprintf_0("Info initphmpiiosub: quiting function");
867
868 return i;
869 }
870
871 namespace {
872
873 enum {
874 DIR_MODE = S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH
875 };
876
my_mkdir(std::string name)877 bool my_mkdir(std::string name) {
878 if(name.empty())
879 return true;
880 errno = 0;
881 int err = mkdir(name.c_str(), DIR_MODE);
882 if ((err == -1) && (errno == EEXIST)) {
883 errno = 0;
884 err = 0;
885 return false;
886 }
887 assert(!err);
888 return true;
889 }
890
891 enum {
892 DIR_FANOUT = 2048
893 };
894
getSubDirPrefix()895 std::string getSubDirPrefix() {
896 if (phio_peers() <= DIR_FANOUT)
897 return string("");
898 int self = phio_self();
899 int subSelf = self % DIR_FANOUT;
900 int subGroup = self / DIR_FANOUT;
901 std::stringstream ss;
902 ss << subGroup << '/';
903 return ss.str();
904 }
905 }
906
/** Open a file in either the POSIX or the MPI-IO (syncIO) format.
 *
 * If it's the old POSIX format, simply call POSIX fopen().
 *
 * If it's the MPI-IO format:
 * in "read" mode, it builds the header table that points to the offset of
 * fields for parts;
 * in "write" mode, it opens the file with the MPI-IO open routine.
 */
openfile(const char filename[],const char mode[],int * fileDescriptor)916 void openfile(const char filename[], const char mode[], int* fileDescriptor )
917 {
918 phprintf_0("Info: entering openfile");
919
920 double timer_start, timer_end;
921 startTimer(&timer_start);
922
923 if ( PhastaIONextActiveIndex == 0 )
924 {
925 FILE* file=NULL ;
926 *fileDescriptor = 0;
927 char* fname = StringStripper( filename );
928 char* imode = StringStripper( mode );
929
930 std::string posixname = getSubDirPrefix();
931 if (!phio_self())
932 my_mkdir(posixname);
933 phio_barrier();
934 posixname += string(fname);
935 if ( cscompare( "read", imode ) )
936 PHASTAIO_OPENTIME(file = fopen(posixname.c_str(), "rb" );)
937 else if( cscompare( "write", imode ) )
938 PHASTAIO_OPENTIME(file = fopen(posixname.c_str(), "wb" );)
939 else if( cscompare( "append", imode ) )
940 PHASTAIO_OPENTIME(file = fopen(posixname.c_str(), "ab" );)
941
942 if ( !file ){
943 fprintf(stderr,"Error openfile: unable to open file %s\n",fname);
944 } else {
945 fileArray.push_back( file );
946 byte_order.push_back( false );
947 header_type.push_back( sizeof(int) );
948 *fileDescriptor = fileArray.size();
949 }
950 free (fname);
951 free (imode);
952 }
953 else // else it would be parallel I/O, opposed to posix io
954 {
955 char* fname = StringStripper( filename );
956 char* imode = StringStripper( mode );
957 int rc;
958 int i = *fileDescriptor;
959 checkFileDescriptor("openfile",&i);
960 char* token;
961
962 if ( cscompare( "read", imode ) )
963 {
964 // if (PhastaIOActiveFiles[i]->myrank == 0)
965 // printf("\n **********\nRead open ... ... regular version\n");
966
967 PHASTAIO_OPENTIME(
968 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm,
969 fname,
970 MPI_MODE_RDONLY,
971 MPI_INFO_NULL,
972 &(PhastaIOActiveFiles[i]->file_handle) );
973 )
974
975 if(rc)
976 {
977 *fileDescriptor = UNABLE_TO_OPEN_FILE;
978 int error_string_length;
979 char error_string[4096];
980 MPI_Error_string(rc, error_string, &error_string_length);
981 fprintf(stderr, "Error openfile: Unable to open file %s! MPI reports \"%s\"\n",fname,error_string);
982 endTimer(&timer_end);
983 printPerf("openfile", timer_start, timer_end, 0, 0, "");
984 return;
985 }
986
987 MPI_Status read_tag_status;
988 char read_out_tag[MAX_FIELDS_NAME_LENGTH];
989 int j;
990 int magic_number;
991
992 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) {
993 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle,
994 0,
995 PhastaIOActiveFiles[i]->master_header,
996 MasterHeaderSize,
997 MPI_CHAR,
998 &read_tag_status );
999 }
1000
1001 MPI_Bcast( PhastaIOActiveFiles[i]->master_header,
1002 MasterHeaderSize,
1003 MPI_CHAR,
1004 0,
1005 PhastaIOActiveFiles[i]->local_comm );
1006
1007 memcpy( read_out_tag,
1008 PhastaIOActiveFiles[i]->master_header,
1009 MAX_FIELDS_NAME_LENGTH-1 );
1010
1011 if ( cscompare ("MPI_IO_Tag",read_out_tag) )
1012 {
1013 // Test endianess ...
1014 memcpy ( &magic_number,
1015 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0"
1016 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format
1017
1018 if ( magic_number != ENDIAN_TEST_NUMBER )
1019 {
1020 PhastaIOActiveFiles[i]->Wrong_Endian = true;
1021 }
1022
1023 memcpy( read_out_tag,
1024 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1???
1025 MAX_FIELDS_NAME_LENGTH );
1026
1027 // Read in # fields ...
1028 token = strtok ( read_out_tag, ":" );
1029 token = strtok( NULL," ,;<>" );
1030 PhastaIOActiveFiles[i]->nFields = atoi( token );
1031
1032 unsigned long **header_table;
1033 header_table = ( unsigned long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long *));
1034
1035 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ )
1036 {
1037 header_table[j]=( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long));
1038 }
1039
1040 // Read in the offset table ...
1041 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ )
1042 {
1043 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) {
1044 memcpy( header_table[j],
1045 PhastaIOActiveFiles[i]->master_header +
1046 VERSION_INFO_HEADER_SIZE +
1047 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long),
1048 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long) );
1049 }
1050
1051 MPI_Scatter( header_table[j],
1052 PhastaIOActiveFiles[i]->nppp,
1053 MPI_LONG_LONG_INT,
1054 PhastaIOActiveFiles[i]->my_read_table[j],
1055 PhastaIOActiveFiles[i]->nppp,
1056 MPI_LONG_LONG_INT,
1057 0,
1058 PhastaIOActiveFiles[i]->local_comm );
1059
1060 // Swap byte order if endianess is different ...
1061 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) {
1062 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j],
1063 sizeof(unsigned long),
1064 PhastaIOActiveFiles[i]->nppp );
1065 }
1066 }
1067
1068 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) {
1069 free ( header_table[j] );
1070 }
1071 free (header_table);
1072
1073 } // end of if MPI_IO_TAG
1074 else //else not valid MPI file
1075 {
1076 *fileDescriptor = NOT_A_MPI_FILE;
1077 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag);
1078 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP
1079 endTimer(&timer_end);
1080 printPerf("openfile", timer_start, timer_end, 0, 0, "");
1081 return;
1082 }
1083 } // end of if "read"
1084 else if( cscompare( "write", imode ) )
1085 {
1086 PHASTAIO_OPENTIME(
1087 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm,
1088 fname,
1089 MPI_MODE_WRONLY | MPI_MODE_CREATE,
1090 MPI_INFO_NULL,
1091 &(PhastaIOActiveFiles[i]->file_handle) );
1092 )
1093 if(rc != MPI_SUCCESS)
1094 {
1095 *fileDescriptor = UNABLE_TO_OPEN_FILE;
1096 int error_string_length;
1097 char error_string[4096];
1098 MPI_Error_string(rc, error_string, &error_string_length);
1099 fprintf(stderr, "Error openfile: Unable to open file %s! MPI reports \"%s\"\n",fname,error_string);
1100 return;
1101 }
1102 } // end of if "write"
1103 free (fname);
1104 free (imode);
1105 } // end of if FileIndex != 0
1106
1107 endTimer(&timer_end);
1108 printPerf("openfile", timer_start, timer_end, 0, 0, "");
1109 }
1110
1111 /** close file for both POSIX and MPI-IO syncIO format.
1112 *
1113 * If it's old POSIX format, simply call posix fclose().
1114 *
1115 * If it's MPI-IO foramt:
1116 * in "read" mode, it simply close file with MPI-IO close routine.
1117 * in "write" mode, rank 0 in each group will re-assemble the master header and
1118 * offset table and write to the beginning of file, then close the file.
1119 */
closefile(int * fileDescriptor,const char mode[])1120 void closefile( int* fileDescriptor, const char mode[] )
1121 {
1122 double timer_start, timer_end;
1123 startTimer(&timer_start);
1124
1125 int i = *fileDescriptor;
1126 checkFileDescriptor("closefile",&i);
1127
1128 if ( PhastaIONextActiveIndex == 0 ) {
1129 char* imode = StringStripper( mode );
1130
1131 if( cscompare( "write", imode )
1132 || cscompare( "append", imode ) ) {
1133 fflush( fileArray[ *fileDescriptor - 1 ] );
1134 }
1135
1136 PHASTAIO_CLOSETIME(fclose( fileArray[ *fileDescriptor - 1 ] );)
1137 free (imode);
1138 }
1139 else {
1140 char* imode = StringStripper( mode );
1141
1142 //write master header here:
1143 if ( cscompare( "write", imode ) ) {
1144 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED
1145 // MasterHeaderSize = 4*ONE_MEGABYTE;
1146 // else
1147 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE;
1148
1149 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION);
1150 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize);
1151
1152 MPI_Status write_header_status;
1153 char mpi_tag[MAX_FIELDS_NAME_LENGTH];
1154 char version[MAX_FIELDS_NAME_LENGTH/4];
1155 char mhsize[MAX_FIELDS_NAME_LENGTH/4];
1156 int magic_number = ENDIAN_TEST_NUMBER;
1157
1158 if ( PhastaIOActiveFiles[i]->local_myrank == 0 )
1159 {
1160 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH);
1161 sprintf(mpi_tag, "MPI_IO_Tag : ");
1162 memcpy(PhastaIOActiveFiles[i]->master_header,
1163 mpi_tag,
1164 MAX_FIELDS_NAME_LENGTH);
1165
1166 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4);
1167 // this version is "1", print version in ASCII
1168 sprintf(version, "version : %d",1);
1169 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2,
1170 version,
1171 MAX_FIELDS_NAME_LENGTH/4);
1172
1173 // master header size is computed using the formula above
1174 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4);
1175 sprintf(mhsize, "mhsize : ");
1176 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3,
1177 mhsize,
1178 MAX_FIELDS_NAME_LENGTH/4);
1179
1180 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH);
1181 sprintf(mpi_tag,
1182 "\nnFields : %d\n",
1183 PhastaIOActiveFiles[i]->nFields);
1184 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH,
1185 mpi_tag,
1186 MAX_FIELDS_NAME_LENGTH);
1187
1188 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH);
1189 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF);
1190 memcpy( PhastaIOActiveFiles[i]->master_header+
1191 PhastaIOActiveFiles[i]->nFields *
1192 MAX_FIELDS_NAME_LENGTH +
1193 MAX_FIELDS_NAME_LENGTH * 2,
1194 mpi_tag,
1195 MAX_FIELDS_NAME_LENGTH);
1196
1197 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0"
1198 &magic_number,
1199 sizeof(int));
1200
1201 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3,
1202 &MasterHeaderSize,
1203 sizeof(int));
1204 }
1205
1206 int j = 0;
1207 unsigned long **header_table;
1208 header_table = ( unsigned long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long *));
1209
1210 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) {
1211 header_table[j]=( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long));
1212 }
1213
1214 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank);
1215 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) {
1216 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j],
1217 PhastaIOActiveFiles[i]->nppp,
1218 MPI_LONG_LONG_INT,
1219 header_table[j],
1220 PhastaIOActiveFiles[i]->nppp,
1221 MPI_LONG_LONG_INT,
1222 0,
1223 PhastaIOActiveFiles[i]->local_comm );
1224 }
1225
1226 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) {
1227
1228 //if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank);
1229 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) {
1230 memcpy ( PhastaIOActiveFiles[i]->master_header +
1231 VERSION_INFO_HEADER_SIZE +
1232 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long),
1233 header_table[j],
1234 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long) );
1235 }
1236
1237 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank);
1238 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle,
1239 0,
1240 PhastaIOActiveFiles[i]->master_header,
1241 MasterHeaderSize,
1242 MPI_CHAR,
1243 &write_header_status );
1244 }
1245
1246 ////free(PhastaIOActiveFiles[i]->master_header);
1247
1248 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) {
1249 free ( header_table[j] );
1250 }
1251 free (header_table);
1252 }
1253
1254 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank);
1255 PHASTAIO_CLOSETIME(
1256 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) );
1257 )
1258 free ( imode );
1259 }
1260
1261 endTimer(&timer_end);
1262 printPerf("closefile_", timer_start, timer_end, 0, 0, "");
1263 }
1264
readHeader(FILE * f,const char phrase[],int * params,int numParams,const char * iotype)1265 int readHeader( FILE* f, const char phrase[],
1266 int* params, int numParams, const char* iotype) {
1267 isBinary(iotype);
1268 return readHeader(f,phrase,params,numParams);
1269 }
1270
readheader(int * fileDescriptor,const char keyphrase[],void * valueArray,int * nItems,const char datatype[],const char iotype[])1271 void readheader(
1272 int* fileDescriptor,
1273 const char keyphrase[],
1274 void* valueArray,
1275 int* nItems,
1276 const char datatype[],
1277 const char iotype[] )
1278 {
1279 double timer_start, timer_end;
1280
1281 startTimer(&timer_start);
1282
1283 int i = *fileDescriptor;
1284 checkFileDescriptor("readheader",&i);
1285
1286 if ( PhastaIONextActiveIndex == 0 ) {
1287 int filePtr = *fileDescriptor - 1;
1288 FILE* fileObject;
1289 int* valueListInt;
1290
1291 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) {
1292 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor);
1293 fprintf(stderr,"openfile_ function has to be called before \n") ;
1294 fprintf(stderr,"acessing the file\n ") ;
1295 fprintf(stderr,"fatal error: cannot continue, returning out of call\n");
1296 endTimer(&timer_end);
1297 printPerf("readheader", timer_start, timer_end, 0, 0, "");
1298 return;
1299 }
1300
1301 LastHeaderKey[filePtr] = std::string(keyphrase);
1302 LastHeaderNotFound = false;
1303
1304 fileObject = fileArray[ filePtr ] ;
1305 Wrong_Endian = byte_order[ filePtr ];
1306
1307 isBinary( iotype );
1308 typeSize( datatype ); //redundant call, just avoid a compiler warning.
1309
1310 // right now we are making the assumption that we will only write integers
1311 // on the header line.
1312
1313 valueListInt = static_cast< int* >( valueArray );
1314 int ierr = readHeader( fileObject ,
1315 keyphrase,
1316 valueListInt,
1317 *nItems ) ;
1318
1319 byte_order[ filePtr ] = Wrong_Endian ;
1320
1321 if ( ierr ) LastHeaderNotFound = true;
1322
1323 //return ; // don't return, go to the end to print perf
1324 }
1325 else {
1326 int* valueListInt;
1327 valueListInt = static_cast <int*>(valueArray);
1328 char* token = NULL;
1329 bool FOUND = false ;
1330 isBinary( iotype );
1331
1332 MPI_Status read_offset_status;
1333 char read_out_tag[MAX_FIELDS_NAME_LENGTH];
1334 memset(read_out_tag, '\0', MAX_FIELDS_NAME_LENGTH);
1335 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH];
1336 int j;
1337
1338 int string_length = strlen( keyphrase );
1339 char* buffer = (char*) malloc ( string_length+1 );
1340
1341 strcpy ( buffer, keyphrase );
1342 buffer[ string_length ] = '\0';
1343
1344 char* st2 = strtok ( buffer, "@" );
1345 st2 = strtok (NULL, "@");
1346 PhastaIOActiveFiles[i]->GPid = atoi(st2);
1347 if ( char* p = strpbrk(buffer, "@") )
1348 *p = '\0';
1349
1350 // Check if the user has input the right GPid
1351 if ( ( PhastaIOActiveFiles[i]->GPid <=
1352 PhastaIOActiveFiles[i]->myrank *
1353 PhastaIOActiveFiles[i]->nppp )||
1354 ( PhastaIOActiveFiles[i]->GPid >
1355 ( PhastaIOActiveFiles[i]->myrank + 1 ) *
1356 PhastaIOActiveFiles[i]->nppp ) )
1357 {
1358 *fileDescriptor = NOT_A_MPI_FILE;
1359 printf("Error readheader: The file is not in syncIO new format, please check! myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase);
1360 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase
1361 endTimer(&timer_end);
1362 printPerf("readheader", timer_start, timer_end, 0, 0, "");
1363 return;
1364 }
1365
1366 // Find the field we want ...
1367 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ )
1368 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ )
1369 {
1370 memcpy( readouttag[j],
1371 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1,
1372 MAX_FIELDS_NAME_LENGTH-1 );
1373 }
1374
1375 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ )
1376 {
1377 token = strtok ( readouttag[j], ":" );
1378
1379 //if ( cscompare( buffer, token ) )
1380 if ( cscompare( token , buffer ) && cscompare( buffer, token ) )
1381 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh").
1382 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc).
1383 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character.
1384 {
1385 PhastaIOActiveFiles[i]->read_field_count = j;
1386 FOUND = true;
1387 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j);
1388 break;
1389 }
1390 }
1391 free(buffer);
1392
1393 if (!FOUND)
1394 {
1395 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here.
1396 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase);
1397 endTimer(&timer_end);
1398 printPerf("readheader", timer_start, timer_end, 0, 0, "");
1399 return;
1400 }
1401
1402 // Find the part we want ...
1403 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid -
1404 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1;
1405
1406 PhastaIOActiveFiles[i]->my_offset =
1407 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count];
1408
1409 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset);
1410
1411 // Read each datablock header here ...
1412
1413 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle,
1414 PhastaIOActiveFiles[i]->my_offset+1,
1415 read_out_tag,
1416 MAX_FIELDS_NAME_LENGTH-1,
1417 MPI_CHAR,
1418 &read_offset_status );
1419 token = strtok ( read_out_tag, ":" );
1420
1421 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag);
1422
1423 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem.
1424 {
1425 FOUND = true ;
1426 token = strtok( NULL, " ,;<>" );
1427 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ )
1428 valueListInt[j] = atoi( token );
1429
1430 if ( j < *nItems )
1431 {
1432 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase );
1433 }
1434 }
1435 else {
1436 //if(irank==0)
1437 if(PhastaIOActiveFiles[i]->myrank == 0)
1438 // If we enter this if, there is a problem with the name of some fields
1439 {
1440 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token);
1441 }
1442 }
1443 }
1444
1445 endTimer(&timer_end);
1446 printPerf("readheader", timer_start, timer_end, 0, 0, "");
1447
1448 }
1449
readDataBlock(FILE * fileObject,void * valueArray,int nItems,const char datatype[],const char iotype[])1450 void readDataBlock(
1451 FILE* fileObject,
1452 void* valueArray,
1453 int nItems,
1454 const char datatype[],
1455 const char iotype[] )
1456 {
1457 isBinary(iotype);
1458 size_t type_size = typeSize( datatype );
1459 phastaioTime t0,t1;
1460 phastaio_time(&t0);
1461 if ( binary_format ) {
1462 char junk = '\0';
1463 fread( valueArray, type_size, nItems, fileObject );
1464 fread( &junk, sizeof(char), 1 , fileObject );
1465 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nItems );
1466 } else {
1467 char* ts1 = StringStripper( datatype );
1468 if ( cscompare( "integer", ts1 ) ) {
1469 for( int n=0; n < nItems ; n++ )
1470 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) );
1471 } else if ( cscompare( "double", ts1 ) ) {
1472 for( int n=0; n < nItems ; n++ )
1473 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) );
1474 }
1475 free (ts1);
1476 }
1477 phastaio_time(&t1);
1478 const size_t elapsed = phastaio_time_diff(&t0,&t1);
1479 phastaio_addReadTime(elapsed);
1480 phastaio_addReadBytes(nItems*type_size);
1481 }
1482
readdatablock(int * fileDescriptor,const char keyphrase[],void * valueArray,int * nItems,const char datatype[],const char iotype[])1483 void readdatablock(
1484 int* fileDescriptor,
1485 const char keyphrase[],
1486 void* valueArray,
1487 int* nItems,
1488 const char datatype[],
1489 const char iotype[] )
1490 {
1491 //if(irank == 0) printf("entering readdatablock()\n");
1492 unsigned long data_size = 0;
1493 double timer_start, timer_end;
1494 startTimer(&timer_start);
1495
1496 int i = *fileDescriptor;
1497 checkFileDescriptor("readdatablock",&i);
1498
1499 if ( PhastaIONextActiveIndex == 0 ) {
1500 int filePtr = *fileDescriptor - 1;
1501 FILE* fileObject;
1502
1503 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) {
1504 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor);
1505 fprintf(stderr,"openfile_ function has to be called before\n") ;
1506 fprintf(stderr,"acessing the file\n ") ;
1507 fprintf(stderr,"fatal error: cannot continue, returning out of call\n");
1508 endTimer(&timer_end);
1509 printPerf("readdatablock", timer_start, timer_end, 0, 0, "");
1510 return;
1511 }
1512
1513 // error check..
1514 // since we require that a consistant header always preceed the data block
1515 // let us check to see that it is actually the case.
1516
1517 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) {
1518 fprintf(stderr, "Header not consistant with data block\n");
1519 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() );
1520 fprintf(stderr, "DataBlock: %s\n ", keyphrase );
1521 fprintf(stderr, "Please recheck read sequence \n");
1522 if( Strict_Error ) {
1523 fprintf(stderr, "fatal error: cannot continue, returning out of call\n");
1524 endTimer(&timer_end);
1525 printPerf("readdatablock", timer_start, timer_end, 0, 0, "");
1526 return;
1527 }
1528 }
1529
1530 if ( LastHeaderNotFound ) {
1531 endTimer(&timer_end);
1532 printPerf("readdatablock", timer_start, timer_end, 0, 0, "");
1533 return;
1534 }
1535 fileObject = fileArray[ filePtr ];
1536 Wrong_Endian = byte_order[ filePtr ];
1537 LastHeaderKey.erase(filePtr);
1538 readDataBlock(fileObject,valueArray,*nItems,datatype,iotype);
1539
1540 //return;
1541 }
1542 else {
1543 // printf("read data block\n");
1544 MPI_Status read_data_status;
1545 size_t type_size = typeSize( datatype );
1546 int nUnits = *nItems;
1547 isBinary( iotype );
1548
1549 // read datablock then
1550 //MR CHANGE
1551 // if ( cscompare ( datatype, "double"))
1552 char* ts2 = StringStripper( datatype );
1553 if ( cscompare ( "double" , ts2))
1554 //MR CHANGE END
1555 {
1556
1557 phastaioTime t0,t1;
1558 phastaio_time(&t0);
1559 MPI_File_read_at_all_begin( PhastaIOActiveFiles[i]->file_handle,
1560 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE,
1561 valueArray,
1562 nUnits,
1563 MPI_DOUBLE );
1564 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle,
1565 valueArray,
1566 &read_data_status );
1567 data_size=8*nUnits;
1568 phastaio_time(&t1);
1569 const size_t elapsed = phastaio_time_diff(&t0,&t1);
1570 phastaio_addReadTime(elapsed);
1571 phastaio_addReadBytes(nUnits*sizeof(double));
1572 }
1573 //MR CHANGE
1574 // else if ( cscompare ( datatype, "integer"))
1575 else if ( cscompare ( "integer" , ts2))
1576 //MR CHANGE END
1577 {
1578 phastaioTime t0,t1;
1579 phastaio_time(&t0);
1580 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle,
1581 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE,
1582 valueArray,
1583 nUnits,
1584 MPI_INT );
1585 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle,
1586 valueArray,
1587 &read_data_status );
1588 data_size=4*nUnits;
1589 phastaio_time(&t1);
1590 const size_t elapsed = phastaio_time_diff(&t0,&t1);
1591 phastaio_addReadTime(elapsed);
1592 phastaio_addReadBytes(nUnits*sizeof(int));
1593 }
1594 else
1595 {
1596 *fileDescriptor = DATA_TYPE_ILLEGAL;
1597 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype);
1598 endTimer(&timer_end);
1599 printPerf("readdatablock", timer_start, timer_end, 0, 0, "");
1600 return;
1601 }
1602 free(ts2);
1603
1604
1605 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank);
1606
1607 // Swap data byte order if endianess is different ...
1608 if ( PhastaIOActiveFiles[i]->Wrong_Endian )
1609 {
1610 SwapArrayByteOrder( valueArray, type_size, nUnits );
1611 }
1612 }
1613
1614 endTimer(&timer_end);
1615 char extra_msg[1024];
1616 memset(extra_msg, '\0', 1024);
1617 char* key = StringStripper(keyphrase);
1618 sprintf(extra_msg, " field is %s ", key);
1619 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg);
1620 free(key);
1621
1622 }
1623
writeHeader(FILE * f,const char keyphrase[],const void * valueArray,const int nItems,const int ndataItems,const char datatype[],const char iotype[])1624 void writeHeader(
1625 FILE* f,
1626 const char keyphrase[],
1627 const void* valueArray,
1628 const int nItems,
1629 const int ndataItems,
1630 const char datatype[],
1631 const char iotype[])
1632 {
1633 isBinary( iotype );
1634
1635 const int _newline =
1636 ( ndataItems > 0 ) ? sizeof( char ) : 0;
1637 int size_of_nextblock =
1638 ( binary_format ) ? typeSize(datatype) * ndataItems + _newline : ndataItems;
1639
1640 fprintf( f, "%s : < %d > ", keyphrase, size_of_nextblock );
1641 for( int i = 0; i < nItems; i++ )
1642 fprintf( f, "%d ", *((int*)((int*)valueArray+i)));
1643 fprintf( f, "\n");
1644 }
1645
writeheader(const int * fileDescriptor,const char keyphrase[],const void * valueArray,const int * nItems,const int * ndataItems,const char datatype[],const char iotype[])1646 void writeheader(
1647 const int* fileDescriptor,
1648 const char keyphrase[],
1649 const void* valueArray,
1650 const int* nItems,
1651 const int* ndataItems,
1652 const char datatype[],
1653 const char iotype[])
1654 {
1655
1656 //if(irank == 0) printf("entering writeheader()\n");
1657
1658 double timer_start, timer_end;
1659 startTimer(&timer_start);
1660
1661 int i = *fileDescriptor;
1662 checkFileDescriptor("writeheader",&i);
1663
1664 if ( PhastaIONextActiveIndex == 0 ) {
1665 int filePtr = *fileDescriptor - 1;
1666 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) {
1667 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor);
1668 fprintf(stderr,"openfile_ function has to be called before \n") ;
1669 fprintf(stderr,"acessing the file\n ") ;
1670 fprintf(stderr,"fatal error: cannot continue, returning out of call\n");
1671 endTimer(&timer_end);
1672 printPerf("writeheader", timer_start, timer_end, 0, 0, "");
1673 return;
1674 }
1675
1676 LastHeaderKey[filePtr] = std::string(keyphrase);
1677 DataSize = *ndataItems;
1678 FILE* fileObject = fileArray[ filePtr ] ;
1679 header_type[ filePtr ] = typeSize( datatype );
1680 writeHeader(fileObject,keyphrase,valueArray,*nItems,
1681 *ndataItems,datatype,iotype);
1682 }
1683 else { // else it's parallel I/O
1684 DataSize = *ndataItems;
1685 size_t type_size = typeSize( datatype );
1686 isBinary( iotype );
1687 char mpi_tag[MAX_FIELDS_NAME_LENGTH];
1688
1689 int string_length = strlen( keyphrase );
1690 char* buffer = (char*) malloc ( string_length+1 );
1691
1692 strcpy ( buffer, keyphrase);
1693 buffer[ string_length ] = '\0';
1694
1695 char* st2 = strtok ( buffer, "@" );
1696 st2 = strtok (NULL, "@");
1697 PhastaIOActiveFiles[i]->GPid = atoi(st2);
1698
1699 if ( char* p = strpbrk(buffer, "@") )
1700 *p = '\0';
1701 assert(PhastaIOActiveFiles[i]->field_count < MAX_FIELDS_NUMBER);
1702 assert(PhastaIOActiveFiles[i]->part_count < PhastaIOActiveFiles[i]->nppp);
1703 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH);
1704 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count);
1705 unsigned long offset_value;
1706
1707 int temp = *ndataItems;
1708 unsigned long number_of_items = (unsigned long)temp;
1709 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm);
1710
1711 MPI_Scan( &number_of_items,
1712 &offset_value,
1713 1,
1714 MPI_LONG_LONG_INT,
1715 MPI_SUM,
1716 PhastaIOActiveFiles[i]->local_comm );
1717
1718 offset_value = (offset_value - number_of_items) * type_size;
1719
1720 offset_value += PhastaIOActiveFiles[i]->local_myrank *
1721 DB_HEADER_SIZE +
1722 PhastaIOActiveFiles[i]->next_start_address;
1723 // This offset is the starting address of each datablock header...
1724 PhastaIOActiveFiles[i]->my_offset = offset_value;
1725
1726 // Write in my offset table ...
1727 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = PhastaIOActiveFiles[i]->my_offset;
1728
1729 // Update the next-start-address ...
1730 PhastaIOActiveFiles[i]->next_start_address = offset_value +
1731 number_of_items * type_size +
1732 DB_HEADER_SIZE;
1733 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address),
1734 1,
1735 MPI_LONG_LONG_INT,
1736 PhastaIOActiveFiles[i]->local_numprocs-1,
1737 PhastaIOActiveFiles[i]->local_comm );
1738
1739 // Prepare datablock header ...
1740 int _newline = (*ndataItems>0)?sizeof(char):0;
1741 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline;
1742
1743 //char datablock_header[255];
1744 //bzero((void*)datablock_header,255);
1745 char datablock_header[DB_HEADER_SIZE];
1746 bzero((void*)datablock_header,DB_HEADER_SIZE);
1747
1748 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count;
1749 sprintf( datablock_header,
1750 "\n%s : < %u >",
1751 keyphrase,
1752 size_of_nextblock );
1753
1754 for ( int j = 0; j < *nItems; j++ )
1755 {
1756 sprintf( datablock_header,
1757 "%s %d ",
1758 datablock_header,
1759 *((int*)((int*)valueArray+j)));
1760 }
1761 sprintf( datablock_header,
1762 "%s\n ",
1763 datablock_header );
1764
1765 // Write datablock header ...
1766 //MR CHANGE
1767 // if ( cscompare(datatype,"double") )
1768 char* ts1 = StringStripper( datatype );
1769 if ( cscompare("double",ts1) )
1770 //MR CHANGE END
1771 {
1772 free ( PhastaIOActiveFiles[i]->double_chunk );
1773 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE));
1774
1775 double * aa = ( double * )datablock_header;
1776 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE);
1777 }
1778 //MR CHANGE
1779 // if ( cscompare(datatype,"integer") )
1780 else if ( cscompare("integer",ts1) )
1781 //MR CHANGE END
1782 {
1783 free ( PhastaIOActiveFiles[i]->int_chunk );
1784 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE));
1785
1786 int * aa = ( int * )datablock_header;
1787 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE);
1788 }
1789 else {
1790 // *fileDescriptor = DATA_TYPE_ILLEGAL;
1791 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype);
1792 endTimer(&timer_end);
1793 printPerf("writeheader", timer_start, timer_end, 0, 0, "");
1794 return;
1795 }
1796 free(ts1);
1797
1798 PhastaIOActiveFiles[i]->part_count++;
1799 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) {
1800 //A new field will be written
1801 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) {
1802 memcpy( PhastaIOActiveFiles[i]->master_header +
1803 PhastaIOActiveFiles[i]->field_count *
1804 MAX_FIELDS_NAME_LENGTH +
1805 MAX_FIELDS_NAME_LENGTH * 2,
1806 mpi_tag,
1807 MAX_FIELDS_NAME_LENGTH-1);
1808 }
1809 PhastaIOActiveFiles[i]->field_count++;
1810 PhastaIOActiveFiles[i]->part_count=0;
1811 }
1812 free(buffer);
1813 }
1814
1815 endTimer(&timer_end);
1816 printPerf("writeheader", timer_start, timer_end, 0, 0, "");
1817 }
1818
writeDataBlock(FILE * f,const void * valueArray,const int nItems,const char datatype[],const char iotype[])1819 void writeDataBlock(
1820 FILE* f,
1821 const void* valueArray,
1822 const int nItems,
1823 const char datatype[],
1824 const char iotype[] )
1825 {
1826 isBinary( iotype );
1827 size_t type_size = typeSize( datatype );
1828 phastaioTime t0,t1;
1829 phastaio_time(&t0);
1830 if ( binary_format ) {
1831 fwrite( valueArray, type_size, nItems, f );
1832 fprintf( f,"\n");
1833 } else {
1834 char* ts1 = StringStripper( datatype );
1835 if ( cscompare( "integer", ts1 ) ) {
1836 const int* vals = (int*) valueArray;
1837 for( int n=0; n < nItems ; n++ )
1838 fprintf(f,"%d\n",vals[n]);
1839 } else if ( cscompare( "double", ts1 ) ) {
1840 const double* vals = (double*) valueArray;
1841 for( int n=0; n < nItems ; n++ )
1842 fprintf(f,"%f\n",vals[n]);
1843 }
1844 free (ts1);
1845 }
1846 phastaio_time(&t1);
1847 const size_t elapsed = phastaio_time_diff(&t0,&t1);
1848 phastaio_addWriteTime(elapsed);
1849 phastaio_addWriteBytes(nItems*type_size);
1850 }
1851
writedatablock(const int * fileDescriptor,const char keyphrase[],const void * valueArray,const int * nItems,const char datatype[],const char iotype[])1852 void writedatablock(
1853 const int* fileDescriptor,
1854 const char keyphrase[],
1855 const void* valueArray,
1856 const int* nItems,
1857 const char datatype[],
1858 const char iotype[] )
1859 {
1860 //if(irank == 0) printf("entering writedatablock()\n");
1861
1862 unsigned long data_size = 0;
1863 double timer_start, timer_end;
1864 startTimer(&timer_start);
1865
1866 int i = *fileDescriptor;
1867 checkFileDescriptor("writedatablock",&i);
1868
1869 if ( PhastaIONextActiveIndex == 0 ) {
1870 int filePtr = *fileDescriptor - 1;
1871
1872 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) {
1873 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor);
1874 fprintf(stderr,"openfile_ function has to be called before \n") ;
1875 fprintf(stderr,"acessing the file\n ") ;
1876 fprintf(stderr,"fatal error: cannot continue, returning out of call\n");
1877 endTimer(&timer_end);
1878 printPerf("writedatablock", timer_start, timer_end, 0, 0, "");
1879 return;
1880 }
1881 // since we require that a consistant header always preceed the data block
1882 // let us check to see that it is actually the case.
1883
1884 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) {
1885 fprintf(stderr, "Header not consistant with data block\n");
1886 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() );
1887 fprintf(stderr, "DataBlock: %s\n ", keyphrase );
1888 fprintf(stderr, "Please recheck write sequence \n");
1889 if( Strict_Error ) {
1890 fprintf(stderr, "fatal error: cannot continue, returning out of call\n");
1891 endTimer(&timer_end);
1892 printPerf("writedatablock", timer_start, timer_end, 0, 0, "");
1893 return;
1894 }
1895 }
1896
1897 FILE* fileObject = fileArray[ filePtr ] ;
1898 size_t type_size=typeSize( datatype );
1899 isBinary( iotype );
1900
1901 LastHeaderKey.erase(filePtr);
1902
1903 if ( header_type[filePtr] != (int)type_size ) {
1904 fprintf(stderr,"header and datablock differ on typeof data in the block for\n");
1905 fprintf(stderr,"keyphrase : %s\n", keyphrase);
1906 if( Strict_Error ) {
1907 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" );
1908 endTimer(&timer_end);
1909 printPerf("writedatablock", timer_start, timer_end, 0, 0, "");
1910 return;
1911 }
1912 }
1913
1914 int nUnits = *nItems;
1915
1916 if ( nUnits != DataSize ) {
1917 fprintf(stderr,"header and datablock differ on number of data items for\n");
1918 fprintf(stderr,"keyphrase : %s\n", keyphrase);
1919 if( Strict_Error ) {
1920 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" );
1921 endTimer(&timer_end);
1922 printPerf("writedatablock", timer_start, timer_end, 0, 0, "");
1923 return;
1924 }
1925 }
1926 writeDataBlock(fileObject,valueArray,*nItems,datatype,iotype);
1927 }
1928 else { // syncIO case
1929 MPI_Status write_data_status;
1930 isBinary( iotype );
1931 int nUnits = *nItems;
1932
1933 //MR CHANGE
1934 // if ( cscompare(datatype,"double") )
1935 char* ts1 = StringStripper( datatype );
1936 if ( cscompare("double",ts1) )
1937 //MR CHANGE END
1938 {
1939 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), valueArray, nUnits*sizeof(double));
1940 phastaioTime t0,t1;
1941 phastaio_time(&t0);
1942 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle,
1943 PhastaIOActiveFiles[i]->my_offset,
1944 PhastaIOActiveFiles[i]->double_chunk,
1945 //BLOCK_SIZE/sizeof(double),
1946 nUnits+DB_HEADER_SIZE/sizeof(double),
1947 MPI_DOUBLE );
1948 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle,
1949 PhastaIOActiveFiles[i]->double_chunk,
1950 &write_data_status );
1951 data_size=8*nUnits;
1952 phastaio_time(&t1);
1953 const size_t elapsed = phastaio_time_diff(&t0,&t1);
1954 phastaio_addWriteTime(elapsed);
1955 phastaio_addWriteBytes((nUnits*sizeof(double))+DB_HEADER_SIZE);
1956 }
1957 //MR CHANGE
1958 // else if ( cscompare ( datatype, "integer"))
1959 else if ( cscompare("integer",ts1) )
1960 //MR CHANGE END
1961 {
1962 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int));
1963 phastaioTime t0,t1;
1964 phastaio_time(&t0);
1965 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle,
1966 PhastaIOActiveFiles[i]->my_offset,
1967 PhastaIOActiveFiles[i]->int_chunk,
1968 nUnits+DB_HEADER_SIZE/sizeof(int),
1969 MPI_INT );
1970 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle,
1971 PhastaIOActiveFiles[i]->int_chunk,
1972 &write_data_status );
1973 data_size=4*nUnits;
1974 phastaio_time(&t1);
1975 const size_t elapsed = phastaio_time_diff(&t0,&t1);
1976 phastaio_addWriteTime(elapsed);
1977 phastaio_addWriteBytes((nUnits*sizeof(int))+DB_HEADER_SIZE);
1978 }
1979 else {
1980 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype);
1981 endTimer(&timer_end);
1982 printPerf("writedatablock", timer_start, timer_end, 0, 0, "");
1983 return;
1984 }
1985 free(ts1);
1986 }
1987
1988 endTimer(&timer_end);
1989 char extra_msg[1024];
1990 memset(extra_msg, '\0', 1024);
1991 char* key = StringStripper(keyphrase);
1992 sprintf(extra_msg, " field is %s ", key);
1993 printPerf("writedatablock", timer_start, timer_end, data_size, 1, extra_msg);
1994 free(key);
1995
1996 }
1997
/**
 * Reverse, in place, the byte order of each of the nItems elements stored
 * contiguously in array, every element being nbytes wide. Only called
 * locally.
 *
 * @param array  start of the element storage
 * @param nbytes width of one element in bytes
 * @param nItems number of elements
 */
void SwapArrayByteOrder( void* array, int nbytes, int nItems )
{
  unsigned char* item = (unsigned char*)array;
  for ( int done = 0; done < nItems; ++done, item += nbytes ) {
    // Walk inwards from both ends of the current element.
    for ( int lo = 0, hi = nbytes - 1; lo < hi; ++lo, --hi ) {
      std::swap( item[lo], item[hi] );
    }
  }
}
2010
writestring(int * fileDescriptor,const char inString[])2011 void writestring( int* fileDescriptor, const char inString[] )
2012 {
2013 int filePtr = *fileDescriptor - 1;
2014 FILE* fileObject = fileArray[filePtr] ;
2015 fprintf(fileObject,"%s",inString );
2016 return;
2017 }
2018
Gather_Headers(int * fileDescriptor,vector<string> & headers)2019 void Gather_Headers( int* fileDescriptor, vector< string >& headers )
2020 {
2021 FILE* fileObject;
2022 char Line[1024];
2023
2024 fileObject = fileArray[ (*fileDescriptor)-1 ];
2025
2026 while( !feof(fileObject) ) {
2027 fgets( Line, 1024, fileObject);
2028 if ( Line[0] == '#' ) {
2029 headers.push_back( Line );
2030 } else {
2031 break;
2032 }
2033 }
2034 rewind( fileObject );
2035 clearerr( fileObject );
2036 }
2037
isWrong(void)2038 void isWrong( void ) {
2039 (Wrong_Endian) ? fprintf(stdout,"YES\n") : fprintf(stdout,"NO\n");
2040 }
2041
togglestrictmode(void)2042 void togglestrictmode( void ) { Strict_Error = !Strict_Error; }
2043
/**
 * Report the byte order of the running architecture.
 *
 * A long holding the value 1 is inspected byte by byte: if its last byte
 * (highest address) holds the 1, the most significant byte comes first.
 *
 * @return 1 on a little-endian machine, 0 otherwise
 */
int isLittleEndian( void )
{
  const long probe = 1;
  const unsigned char* const bytes = reinterpret_cast<const unsigned char*>( &probe );
  const bool lastByteHoldsOne = ( bytes[sizeof( long ) - 1] == 1 );
  return lastByteHoldsOne ? 0 : 1;
}
2056
2057 namespace PHASTA {
2058 const char* const PhastaIO_traits<int>::type_string = "integer";
2059 const char* const PhastaIO_traits<double>::type_string = "double";
2060 }
2061