#ifndef lint
static char vcid[] = "$Id: mpidense.c,v 1.35 1996/03/19 21:25:49 bsmith Exp bsmith $";
#endif

/*
   Basic functions for parallel dense matrices.
*/

#include "mpidense.h"
#include "vec/vecimpl.h"

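/*
   Layout: each process stores its rows of the global matrix as a
   sequential dense matrix (Mat_MPIDense->A) holding all N columns;
   values set for rows owned by another process are cached in a stash
   and shipped to their owners during assembly.
*/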
static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
                               int *idxn,Scalar *v,InsertMode addv)
{
  Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
  int          roworiented = A->roworiented;

  if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
  }
  A->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
    if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      if (roworiented) {
        ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else {
        for ( j=0; j<n; j++ ) {
          if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
          if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
          ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
        }
      }
    }
    else {
      if (roworiented) {
        ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else { /* must stash each entry separately */
        row = idxm[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
          CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}

static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
    if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
        if (idxn[j] >= mdn->N)
          SETERRQ(1,"MatGetValues_MPIDense:Column too large");
        ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
    }
  }
  return 0;
}

static int MatGetArray_MPIDense(Mat A,Scalar **array)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;

  ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
  return 0;
}

static int MatRestoreArray_MPIDense(Mat A,Scalar **array)
{
  return 0;
}

static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Comm     comm = mat->comm;
  int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
  int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int          tag = mat->tag, *owner,*starts,count,ierr;
  InsertMode   addv;
  MPI_Request  *send_waits,*recv_waits;
  Scalar       *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
  }
  mdn->insertmode = addv; /* in case this processor had no cache */
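  /* (the bitwise-OR reduction works because NOT_SET_VALUES is zero: a
     process with nothing cached leaves the result unchanged, while mixed
     modes OR together to ADD_VALUES|INSERT_VALUES and are caught above) */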

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<mdn->stash.n; i++ ) {
    idx = mdn->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message consists of ordered triples
     (global row,global column,value); the indices are stored as Scalars
     to simplify the message passing.
       2) since we don't know how long each individual message is, we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
  CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<mdn->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
  }
  PetscFree(owner);
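  /* the packing loop above advanced each starts[i] to the end of its
     block, so the offsets are rebuilt before posting the sends */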
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);

  mdn->svalues    = svalues;    mdn->rvalues = rvalues;
  mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
  mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
  mdn->rmax       = nmax;

  return 0;
}
extern int MatSetUpMultiply_MPIDense(Mat);

static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Status   *send_status,recv_status;
  int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
  Scalar       *values,val;
  InsertMode   addv = mdn->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = mdn->rvalues + 3*imdex*mdn->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - mdn->rstart;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= 0 && col < mdn->N) {
        ierr = MatSetValues(mdn->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      }
      else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
    }
    count--;
  }
  PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);

  /* wait on sends */
  if (mdn->nsends) {
    send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
    CHKPTRQ(send_status);
    MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(mdn->send_waits); PetscFree(mdn->svalues);

  mdn->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);

  if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
  }
  return 0;
}

static int MatZeroEntries_MPIDense(Mat A)
{
  Mat_MPIDense *l = (Mat_MPIDense *) A->data;
  return MatZeroEntries(l->A);
}

/* The code does not handle the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   the local matrix mdn->A directly and not through the MatZeroRows()
   routine.
*/
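/* MatZeroRows for the parallel case: each process scans the index set,
   keeps the rows it owns, and ships the remaining row numbers to their
   owning processes; each process then zeros its local rows through
   MatZeroRows() on the local matrix. */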
static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
{
  Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* +1 avoids zero-length malloc */
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives: */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /* +1's avoid zero-length malloc */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ierr = ISRestoreIndices(is,&rows); CHKERRQ(ierr);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex] = recv_status.MPI_SOURCE;
    lens[imdex]   = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp); CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}

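/* Each process owns complete rows but needs the entire input vector x
   for y = A*x, since every global column is stored locally; lvec and
   Mvctx gather x onto each process before the purely local multiply. */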
static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
  return 0;
}

static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx); CHKERRQ(ierr);
  ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
  return 0;
}

static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;
  Scalar       zero = 0.0;

  ierr = VecSet(&zero,yy); CHKERRQ(ierr);
  ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;

  ierr = VecCopy(yy,zz); CHKERRQ(ierr);
  ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatGetDiagonal_MPIDense(Mat A,Vec v)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
  int          ierr, i, n, m = a->m, radd;
  Scalar       *x;

  ierr = VecGetArray(v,&x); CHKERRQ(ierr);
  ierr = VecGetSize(v,&n); CHKERRQ(ierr);
  if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
  radd = a->rstart*m; /* offset of column rstart in the column-major local array */
  for ( i=0; i<m; i++ ) {
    x[i] = aloc->v[radd + i*m + i];
  }
  return 0;
}

static int MatDestroy_MPIDense(PetscObject obj)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
#endif
  PetscFree(mdn->rowners);
  ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
  if (mdn->lvec)   VecDestroy(mdn->lvec);
  if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
  if (mdn->factor) {
    if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
    if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
    if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
    PetscFree(mdn->factor);
  }
  PetscFree(mdn);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

#include "pinclude/pviewer.h"

static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  if (mdn->size == 1) {
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
  return 0;
}

static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, format;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
  ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
  if (format == ASCII_FORMAT_INFO_DETAILED) {
    int nz, nzalloc, mem, rank;
    MPI_Comm_rank(mat->comm,&rank);
    ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
          rank,mdn->m,nz,nzalloc,mem);
      fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
    ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
    return 0;
  }
  else if (format == ASCII_FORMAT_INFO) {
    return 0;
  }
  if (vtype == ASCII_FILE_VIEWER) {
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
             mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = mdn->size, rank = mdn->rank;
    if (size == 1) {
      ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor */
      Mat          A;
      int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
      Scalar       *vals;
      Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;

      /* process 0 is given all M rows of the assembled copy */
      if (!rank) {
        ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* Copy the matrix ... This isn't the most efficient means,
         but it's quick for now */
      row = mdn->rstart; m = Amdn->m;
      for ( i=0; i<m; i++ ) {
        ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
        ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        row++;
      }

      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatView_MPIDense(PetscObject obj,Viewer viewer)
{
  Mat          mat = (Mat) obj;
  int          ierr;
  ViewerType   vtype;

  if (!viewer) {
    viewer = STDOUT_VIEWER_SELF;
  }
  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER) {
    ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
  }
  else if (vtype == BINARY_FILE_VIEWER) {
    return MatView_MPIDense_Binary(mat,viewer);
  }
  return 0;
}

static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
                               int *nzalloc,int *mem)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  Mat          mdn = mat->A;
  int          ierr, isend[3], irecv[3];

  ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
  if (flag == MAT_LOCAL) {
    if (nz)      *nz      = isend[0];
    if (nzalloc) *nzalloc = isend[1];
    if (mem)     *mem     = isend[2];
  } else if (flag == MAT_GLOBAL_MAX) {
    MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
    if (nz)      *nz      = irecv[0];
    if (nzalloc) *nzalloc = irecv[1];
    if (mem)     *mem     = irecv[2];
  } else if (flag == MAT_GLOBAL_SUM) {
    MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
    if (nz)      *nz      = irecv[0];
    if (nzalloc) *nzalloc = irecv[1];
    if (mem)     *mem     = irecv[2];
  }
  return 0;
}

/* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
   extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
   extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
   extern int MatSolve_MPIDense(Mat,Vec,Vec);
   extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
   extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
   extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */

static int MatSetOption_MPIDense(Mat A,MatOption op)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;

  if (op == NO_NEW_NONZERO_LOCATIONS ||
      op == YES_NEW_NONZERO_LOCATIONS ||
      op == COLUMNS_SORTED ||
      op == ROW_ORIENTED) {
    MatSetOption(a->A,op);
  }
  else if (op == ROWS_SORTED ||
           op == SYMMETRIC_MATRIX ||
           op == STRUCTURALLY_SYMMETRIC_MATRIX ||
           op == YES_NEW_DIAGONALS) {
    PLogInfo(A,"Info:MatSetOption_MPIDense:Option ignored\n");
  }
  else if (op == COLUMN_ORIENTED) {
    a->roworiented = 0; MatSetOption(a->A,op);
  }
  else if (op == NO_NEW_DIAGONALS) {
    SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");
  }
  else {
    SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");
  }
  return 0;
}

static int MatGetSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->m; *n = mat->N;
  return 0;
}

static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->rstart; *n = mat->rend;
  return 0;
}

static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  int          lrow, rstart = mat->rstart, rend = mat->rend;

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
  lrow = row - rstart;
  return MatGetRow(mat->A,lrow,nz,idx,v);
}

static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  if (idx) PetscFree(*idx);
  if (v) PetscFree(*v);
  return 0;
}

static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
  Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
  int          ierr, i, j;
  double       sum = 0.0;
  Scalar       *v = mat->v;

  if (mdn->size == 1) {
    ierr = MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
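      /* sum the squares of the local entries; the global sum of squares
         is formed with an Allreduce before taking the square root */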
      for ( i=0; i<mat->n*mat->m; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
      *norm = sqrt(*norm);
      PLogFlops(2*mat->n*mat->m);
    }
    else if (type == NORM_1) {
      double *tmp, *tmp2;
      tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = tmp + mdn->N;
      PetscMemzero(tmp,2*mdn->N*sizeof(double));
      *norm = 0.0;
      v = mat->v;
      for ( j=0; j<mat->n; j++ ) {
        for ( i=0; i<mat->m; i++ ) {
          tmp[j] += PetscAbsScalar(*v);  v++;
        }
      }
      MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
      for ( j=0; j<mdn->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp);
      PLogFlops(mat->n*mat->m);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp;
      ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
      MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
    }
  }
  return 0;
}

static int MatTranspose_MPIDense(Mat A,Mat *matout)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
  Mat          B;
  int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
  int          j, i, ierr;
  Scalar       *v;

  if (matout == PETSC_NULL && M != N)
    SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
  ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
         CHKERRQ(ierr);

  m = Aloc->m; n = Aloc->n; v = Aloc->v;
  rwork = (int *) PetscMalloc(m*sizeof(int)); CHKPTRQ(rwork);
  for ( i=0; i<m; i++ ) rwork[i] = rstart + i;
  for ( j=0; j<n; j++ ) {
    ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
    v += m;
  }
  PetscFree(rwork);
  ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose, but free data struct from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}

static int MatConvertSameType_MPIDense(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {MatSetValues_MPIDense,
       MatGetRow_MPIDense,MatRestoreRow_MPIDense,
       MatMult_MPIDense,MatMultAdd_MPIDense,
       MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
/*       MatSolve_MPIDense,0, */
       0,0,
       0,0,
       0,0,
/*       MatLUFactor_MPIDense,0, */
       0,MatTranspose_MPIDense,
       MatGetInfo_MPIDense,0,
       MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
       MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
       0,
       MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
       0,0,0,
/*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
       0,0,
       MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
       MatGetOwnershipRange_MPIDense,
       0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
       0,0,0,MatConvertSameType_MPIDense,
       0,0,0,0,0,
       0,0,MatGetValues_MPIDense};

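/*
   A minimal usage sketch for the constructor documented below (not part
   of the original source; comm, M, and N are assumed to be set up by the
   caller, with the usual ierr/CHKERRQ error handling of this file):

      Mat    C;
      int    row = 0, col = 0, ierr;
      Scalar val = 1.0;

      ierr = MatCreateMPIDense(comm,PETSC_DECIDE,PETSC_DECIDE,M,N,
                               PETSC_NULL,&C); CHKERRQ(ierr);
      ierr = MatSetValues(C,1,&row,1,&col,&val,INSERT_VALUES); CHKERRQ(ierr);
      ierr = MatAssemblyBegin(C,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(C,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatDestroy(C); CHKERRQ(ierr);
*/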
/*@C
   MatCreateMPIDense - Creates a parallel matrix in dense format.

   Input Parameters:
.  comm - MPI communicator
.  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
.  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
.  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
   to control all matrix memory allocation.

   Output Parameter:
.  newmat - the matrix

   Notes:
   The dense format is fully compatible with standard Fortran 77
   storage by columns.

   The data input variable is intended primarily for Fortran programmers
   who wish to allocate their own matrix memory space.  Most users should
   set data=PETSC_NULL.

   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Currently, the only parallel dense matrix decomposition is by rows,
   so that n=N and each submatrix owns all of the global columns.

.keywords: matrix, dense, parallel

.seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
@*/
int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
{
  Mat          mat;
  Mat_MPIDense *a;
  int          ierr, i,flg;

  /* Note:  For now, when data is specified above, this assumes the user correctly
     allocates the local dense storage space.  We should add error checking. */

  *newmat         = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIDense;
  mat->view       = MatView_MPIDense;
  mat->factor     = 0;

  a->factor = 0;
  a->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&a->rank);
  MPI_Comm_size(comm,&a->size);

  if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
  if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}

  /* each row stores all columns */
  if (N == PETSC_DECIDE) N = n;
  if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
  /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
  a->N = N;
  a->M = M;
  a->m = m;
  a->n = n;

  /* build local table of row and column ownerships */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  a->cowners = a->rowners + a->size + 1;
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
                       sizeof(Mat_MPIDense));
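  /* rowners[i] ends up holding the first global row of process i
     (prefix sums of the local row counts gathered from all processes);
     cowners holds the analogous column ownership */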
  MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
  a->rowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->rowners[i] += a->rowners[i-1];
  }
  a->rstart = a->rowners[a->rank];
  a->rend   = a->rowners[a->rank+1];
  MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
  a->cowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->cowners[i] += a->cowners[i-1];
  }

  ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);

  /* build cache for off-processor entries formed during MatSetValues */
  ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);

  /* stuff used for matrix vector multiply */
  a->lvec        = 0;
  a->Mvctx       = 0;
  a->roworiented = 1;

  *newmat = mat;
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  return 0;
}

static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
{
  Mat          mat;
  Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
  int          ierr;
  FactorCtx    *factor;

  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
  PLogObjectCreate(mat);
  mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy  = MatDestroy_MPIDense;
  mat->view     = MatView_MPIDense;
  mat->factor   = A->factor;
  mat->assembled = PETSC_TRUE;

  a->m          = oldmat->m;
  a->n          = oldmat->n;
  a->M          = oldmat->M;
  a->N          = oldmat->N;
  if (oldmat->factor) {
    a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
    /* copy factor contents ... add this code! */
  } else a->factor = 0;

  a->rstart     = oldmat->rstart;
  a->rend       = oldmat->rend;
  a->size       = oldmat->size;
  a->rank       = oldmat->rank;
  a->insertmode = NOT_SET_VALUES;

  a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
  PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);

  ierr = VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr = VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr = MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  *newmat = mat;
  return 0;
}

#include "sys.h"

int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
{
  int        *rowners, i,size,rank,m,rstart,rend,ierr,nz,j;
  Scalar     *array,*vals,*vals_ptr;
  MPI_Status status;

  MPI_Comm_rank(comm,&rank);
  MPI_Comm_size(comm,&size);

  /* determine ownership of all rows */
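  /* each process gets M/size rows; the first M%size processes take one extra */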
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat); CHKERRQ(ierr);
  ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);

  if (!rank) {
    vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values */
    ierr = PetscBinaryRead(fd,vals,m*N,BINARY_SCALAR); CHKERRQ(ierr);

    /* insert into the matrix row by row (the file is stored by rows while
       the array is stored by columns, so we cannot read directly into it) */
    vals_ptr = vals;
    for ( i=0; i<m; i++ ) {
      for ( j=0; j<N; j++ ) {
        array[i + j*m] = *vals_ptr++;
      }
    }

    /* read in the parts owned by the other processors and ship them out */
    for ( i=1; i<size; i++ ) {
      nz   = (rowners[i+1] - rowners[i])*N;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);
    }
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values */
    MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);

    /* insert into the matrix row by row (the file is stored by rows while
       the array is stored by columns, so we cannot read directly into it) */
    vals_ptr = vals;
    for ( i=0; i<m; i++ ) {
      for ( j=0; j<N; j++ ) {
        array[i + j*m] = *vals_ptr++;
      }
    }
  }
  PetscFree(rowners);
  PetscFree(vals);
  ierr = MatAssemblyBegin(*newmat,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(*newmat,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}

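/*
   Loads a matrix stored in PETSc binary format: a header of four ints
   (cookie, M, N, nz) followed either by a dense array of Scalars (when
   nz == MATRIX_BINARY_FORMAT_DENSE, handled by the routine above) or by
   row lengths, column indices, and values in the sparse binary layout.
   Process 0 reads the file and ships each process its rows.
*/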
int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*svals;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)viewer)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2]; nz = header[3];

  /*
       Handle case where matrix is stored on disk as a dense matrix
  */
  if (nz == MATRIX_BINARY_FORMAT_DENSE) {
    return MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);
  }

  /* determine ownership of all rows */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);

    /* read in everyone else's part and ship it off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat); CHKERRQ(ierr);
  A = *newmat;
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in everyone else's part and ship it off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values */
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with file");

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}