xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 43a90d8472dff9fb76015431d971d6cf7e7ec88a)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.54 1996/11/20 19:59:48 curfman Exp bsmith $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
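
/*
   Storage overview (see also the notes with MatCreateMPIDense() below): each
   process owns a contiguous block of rows and all N global columns; the local
   block is kept in a sequential dense matrix (the A field of Mat_MPIDense),
   stored by columns, Fortran style.  Values set for rows owned by another
   process are stashed and shipped to their owners during assembly.
*/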
8 
9 #include "src/mat/impls/dense/mpi/mpidense.h"
10 #include "src/vec/vecimpl.h"
11 
12 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
13 {
14   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
15   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
16   int          roworiented = A->roworiented;
17 
18   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
19     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
20   }
21   A->insertmode = addv;
22   for ( i=0; i<m; i++ ) {
23     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
24     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
25     if (idxm[i] >= rstart && idxm[i] < rend) {
26       row = idxm[i] - rstart;
27       if (roworiented) {
28         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
29       }
30       else {
31         for ( j=0; j<n; j++ ) {
32           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
33           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
34           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
35         }
36       }
37     }
38     else {
39       if (roworiented) {
40         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
41       }
42       else { /* must stash each entry separately */
43         row = idxm[i];
44         for ( j=0; j<n; j++ ) {
45           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
46         }
47       }
48     }
49   }
50   return 0;
51 }
52 
53 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
54 {
55   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
56   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
57 
58   for ( i=0; i<m; i++ ) {
59     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
60     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
61     if (idxm[i] >= rstart && idxm[i] < rend) {
62       row = idxm[i] - rstart;
63       for ( j=0; j<n; j++ ) {
64         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
65         if (idxn[j] >= mdn->N)
66           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
67         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
68       }
69     }
70     else {
71       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
72     }
73   }
74   return 0;
75 }
76 
77 static int MatGetArray_MPIDense(Mat A,Scalar **array)
78 {
79   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
80   int ierr;
81 
82   ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
83   return 0;
84 }
85 
86 static int MatRestoreArray_MPIDense(Mat A,Scalar **array)
87 {
88   return 0;
89 }
90 
91 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
92 {
93   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
94   MPI_Comm     comm = mat->comm;
95   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
96   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
97   int          tag = mat->tag, *owner,*starts,count,ierr;
98   InsertMode   addv;
99   MPI_Request  *send_waits,*recv_waits;
100   Scalar       *rvalues,*svalues;
101 
102   /* make sure all processors are using the same insert mode (INSERT_VALUES or ADD_VALUES); the bitwise OR detects a mix */
103   MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
104   if (addv == (ADD_VALUES|INSERT_VALUES)) {
105     SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
106   }
107   mdn->insertmode = addv; /* in case this processor had no cache */
108 
109   /*  first count number of contributors to each processor */
110   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
111   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
112   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
113   for ( i=0; i<mdn->stash.n; i++ ) {
114     idx = mdn->stash.idx[i];
115     for ( j=0; j<size; j++ ) {
116       if (idx >= owners[j] && idx < owners[j+1]) {
117         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
118       }
119     }
120   }
121   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
122 
123   /* inform other processors of number of messages and max length*/
124   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
125   MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
126   nreceives = work[rank];
127   if (nreceives > size) SETERRQ(1,"MatAssemblyBegin_MPIDense:Internal PETSc error");
128   MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
129   nmax = work[rank];
130   PetscFree(work);
131 
132   /* post receives:
133        1) each message will consist of ordered triples
134      (global row index, global column index, value); we store the indices
135      as Scalars to simplify the message passing.
136        2) since we don't know how long each individual message is we
137      allocate the largest needed buffer for each receive. Potentially
138      this is a lot of wasted space.
139 
140        This could be done better.
141   */
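  /*
     Packing layout used below and unpacked in MatAssemblyEnd_MPIDense():
     the k-th stash entry destined for a given processor occupies three
     consecutive Scalars in svalues,
         svalues[3*k]   = (Scalar) global row index
         svalues[3*k+1] = (Scalar) global column index
         svalues[3*k+2] = numerical value
  */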
142   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
143   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
144   for ( i=0; i<nreceives; i++ ) {
145     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
146   }
147 
148   /* do sends:
149       1) starts[i] gives the starting index in svalues for stuff going to
150          the ith processor
151   */
152   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
153   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
154   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
155   starts[0] = 0;
156   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
157   for ( i=0; i<mdn->stash.n; i++ ) {
158     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
159     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
160     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
161   }
162   PetscFree(owner);
163   starts[0] = 0;
164   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
165   count = 0;
166   for ( i=0; i<size; i++ ) {
167     if (procs[i]) {
168       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);
169     }
170   }
171   PetscFree(starts); PetscFree(nprocs);
172 
173   /* Free cache space */
174   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
175 
176   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
177   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
178   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
179   mdn->rmax       = nmax;
180 
181   return 0;
182 }
183 extern int MatSetUpMultiply_MPIDense(Mat);
184 
185 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
186 {
187   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
188   MPI_Status   *send_status,recv_status;
189   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
190   Scalar       *values,val;
191   InsertMode   addv = mdn->insertmode;
192 
193   /*  wait on receives */
194   while (count) {
195     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
196     /* unpack receives into our local space */
197     values = mdn->rvalues + 3*imdex*mdn->rmax;
198     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
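    /* each entry is a (row,col,value) triple of Scalars, so the number of entries is the count divided by 3 */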
199     n = n/3;
200     for ( i=0; i<n; i++ ) {
201       row = (int) PetscReal(values[3*i]) - mdn->rstart;
202       col = (int) PetscReal(values[3*i+1]);
203       val = values[3*i+2];
204       if (col >= 0 && col < mdn->N) {
205         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
206       }
207       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
208     }
209     count--;
210   }
211   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
212 
213   /* wait on sends */
214   if (mdn->nsends) {
215     send_status = (MPI_Status *) PetscMalloc(mdn->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
216     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
217     PetscFree(send_status);
218   }
219   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
220 
221   mdn->insertmode = NOT_SET_VALUES;
222   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
223   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
224 
225   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
226     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
227   }
228   return 0;
229 }
230 
231 static int MatZeroEntries_MPIDense(Mat A)
232 {
233   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
234   return MatZeroEntries(l->A);
235 }
236 
237 static int MatGetBlockSize_MPIDense(Mat A,int *bs)
238 {
239   *bs = 1;
240   return 0;
241 }
242 
243 /* the code does not do the diagonal entries correctly unless the
244    matrix is square and the column and row ownerships are identical.
245    This is a BUG. The only way to fix it seems to be to access
246    the local matrix mdn->A directly and not go through the MatZeroRows()
247    routine.
248 */
249 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
250 {
251   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
252   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
253   int            *procs,*nprocs,j,found,idx,nsends,*work;
254   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
255   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
256   int            *lens,imdex,*lrows,*values;
257   MPI_Comm       comm = A->comm;
258   MPI_Request    *send_waits,*recv_waits;
259   MPI_Status     recv_status,*send_status;
260   IS             istmp;
261 
262   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
263   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
264 
265   /*  first count number of contributors to each processor */
266   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
267   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
268   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
269   for ( i=0; i<N; i++ ) {
270     idx = rows[i];
271     found = 0;
272     for ( j=0; j<size; j++ ) {
273       if (idx >= owners[j] && idx < owners[j+1]) {
274         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
275       }
276     }
277     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
278   }
279   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
280 
281   /* inform other processors of number of messages and max length*/
282   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
283   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
284   nrecvs = work[rank];
285   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
286   nmax = work[rank];
287   PetscFree(work);
288 
289   /* post receives:   */
290   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
291   CHKPTRQ(rvalues);
292   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
293   CHKPTRQ(recv_waits);
294   for ( i=0; i<nrecvs; i++ ) {
295     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
296   }
297 
298   /* do sends:
299       1) starts[i] gives the starting index in svalues for stuff going to
300          the ith processor
301   */
302   svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
303   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
304   starts     = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
305   starts[0]  = 0;
306   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
307   for ( i=0; i<N; i++ ) {
308     svalues[starts[owner[i]]++] = rows[i];
309   }
310   ISRestoreIndices(is,&rows);
311 
312   starts[0] = 0;
313   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
314   count = 0;
315   for ( i=0; i<size; i++ ) {
316     if (procs[i]) {
317       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
318     }
319   }
320   PetscFree(starts);
321 
322   base = owners[rank];
323 
324   /*  wait on receives */
325   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
326   source = lens + nrecvs;
327   count  = nrecvs; slen = 0;
328   while (count) {
329     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
330     /* unpack receives into our local space */
331     MPI_Get_count(&recv_status,MPI_INT,&n);
332     source[imdex]  = recv_status.MPI_SOURCE;
333     lens[imdex]  = n;
334     slen += n;
335     count--;
336   }
337   PetscFree(recv_waits);
338 
339   /* move the data into the send scatter */
340   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
341   count = 0;
342   for ( i=0; i<nrecvs; i++ ) {
343     values = rvalues + i*nmax;
344     for ( j=0; j<lens[i]; j++ ) {
345       lrows[count++] = values[j] - base;
346     }
347   }
348   PetscFree(rvalues); PetscFree(lens);
349   PetscFree(owner); PetscFree(nprocs);
350 
351   /* actually zap the local rows */
352   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
353   PLogObjectParent(A,istmp);
354   PetscFree(lrows);
355   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
356   ierr = ISDestroy(istmp); CHKERRQ(ierr);
357 
358   /* wait on sends */
359   if (nsends) {
360     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
361     MPI_Waitall(nsends,send_waits,send_status);
362     PetscFree(send_status);
363   }
364   PetscFree(send_waits); PetscFree(svalues);
365 
366   return 0;
367 }
368 
369 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
370 {
371   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
372   int          ierr;
373 
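  /* gather the full-length input vector into lvec (the local block stores all
     N columns), then do the purely local dense multiply for this process's
     rows of yy */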
374   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
375   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
376   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
377   return 0;
378 }
379 
380 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
381 {
382   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
383   int          ierr;
384 
385   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
386   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
387   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
388   return 0;
389 }
390 
391 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
392 {
393   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
394   int          ierr;
395   Scalar       zero = 0.0;
396 
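  /* yy = A' xx: apply the transpose of the local m x N block to the local part
     of xx, giving a full-length partial result in lvec, then sum the partial
     results from all processes into the distributed yy with a reverse
     scatter-add */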
397   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
398   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
399   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
400   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
401   return 0;
402 }
403 
404 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
405 {
406   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
407   int          ierr;
408 
409   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
410   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
411   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
412   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
413   return 0;
414 }
415 
416 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
417 {
418   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
419   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
420   int          ierr, len, i, n, m = a->m, radd;
421   Scalar       *x, zero = 0.0;
422 
423   VecSet(&zero,v);
424   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
425   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
426   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
427   len = PetscMin(aloc->m,aloc->n);
428   radd = a->rstart*m;
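  /* the local block is stored by columns with leading dimension m, so the
     global diagonal entry (rstart+i,rstart+i) is found at v[radd + i*m + i] */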
429   for ( i=0; i<len; i++ ) {
430     x[i] = aloc->v[radd + i*m + i];
431   }
432   return 0;
433 }
434 
435 static int MatDestroy_MPIDense(PetscObject obj)
436 {
437   Mat          mat = (Mat) obj;
438   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
439   int          ierr;
440 
441 #if defined(PETSC_LOG)
442   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
443 #endif
444   PetscFree(mdn->rowners);
445   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
446   if (mdn->lvec)   VecDestroy(mdn->lvec);
447   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
448   if (mdn->factor) {
449     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
450     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
451     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
452     PetscFree(mdn->factor);
453   }
454   PetscFree(mdn);
455   if (mat->mapping) {
456     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
457   }
458   PLogObjectDestroy(mat);
459   PetscHeaderDestroy(mat);
460   return 0;
461 }
462 
463 #include "pinclude/pviewer.h"
464 
465 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
466 {
467   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
468   int          ierr;
469 
470   if (mdn->size == 1) {
471     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
472   }
473   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
474   return 0;
475 }
476 
477 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
478 {
479   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
480   int          ierr, format;
481   FILE         *fd;
482   ViewerType   vtype;
483 
484   ViewerGetType(viewer,&vtype);
485   ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
486   ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
487   if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
488     int rank;
489     MatInfo info;
490     MPI_Comm_rank(mat->comm,&rank);
491     ierr = MatGetInfo(mat,MAT_LOCAL,&info); CHKERRQ(ierr);
492     PetscSequentialPhaseBegin(mat->comm,1);
493       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
494          (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
495       fflush(fd);
496     PetscSequentialPhaseEnd(mat->comm,1);
497     ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
498     return 0;
499   }
500   else if (format == VIEWER_FORMAT_ASCII_INFO) {
501     return 0;
502   }
503   if (vtype == ASCII_FILE_VIEWER) {
504     PetscSequentialPhaseBegin(mat->comm,1);
505     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
506              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
507     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
508     fflush(fd);
509     PetscSequentialPhaseEnd(mat->comm,1);
510   }
511   else {
512     int size = mdn->size, rank = mdn->rank;
513     if (size == 1) {
514       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
515     }
516     else {
517       /* assemble the entire matrix onto first processor. */
518       Mat          A;
519       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
520       Scalar       *vals;
521       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
522 
523       if (!rank) {
524         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
525       }
526       else {
527         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
528       }
529       PLogObjectParent(mat,A);
530 
531       /* Copy the matrix ... This isn't the most efficient means,
532          but it's quick for now */
533       row = mdn->rstart; m = Amdn->m;
534       for ( i=0; i<m; i++ ) {
535         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
536         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
537         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
538         row++;
539       }
540 
541       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
542       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
543       if (!rank) {
544         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
545       }
546       ierr = MatDestroy(A); CHKERRQ(ierr);
547     }
548   }
549   return 0;
550 }
551 
552 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
553 {
554   Mat          mat = (Mat) obj;
555   int          ierr;
556   ViewerType   vtype;
557 
558   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
559   if (vtype == ASCII_FILE_VIEWER) {
560     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
561   }
562   else if (vtype == ASCII_FILES_VIEWER) {
563     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
564   }
565   else if (vtype == BINARY_FILE_VIEWER) {
566     return MatView_MPIDense_Binary(mat,viewer);
567   }
568   return 0;
569 }
570 
571 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
572 {
573   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
574   Mat          mdn = mat->A;
575   int          ierr;
576   double       isend[5], irecv[5];
577 
578   info->rows_global    = (double)mat->M;
579   info->columns_global = (double)mat->N;
580   info->rows_local     = (double)mat->m;
581   info->columns_local  = (double)mat->N;
582   info->block_size     = 1.0;
583   ierr = MatGetInfo(mdn,MAT_LOCAL,info); CHKERRQ(ierr);
584   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
585   isend[3] = info->memory;  isend[4] = info->mallocs;
586   if (flag == MAT_LOCAL) {
587     info->nz_used      = isend[0];
588     info->nz_allocated = isend[1];
589     info->nz_unneeded  = isend[2];
590     info->memory       = isend[3];
591     info->mallocs      = isend[4];
592   } else if (flag == MAT_GLOBAL_MAX) {
593     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);
594     info->nz_used      = irecv[0];
595     info->nz_allocated = irecv[1];
596     info->nz_unneeded  = irecv[2];
597     info->memory       = irecv[3];
598     info->mallocs      = irecv[4];
599   } else if (flag == MAT_GLOBAL_SUM) {
600     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);
601     info->nz_used      = irecv[0];
602     info->nz_allocated = irecv[1];
603     info->nz_unneeded  = irecv[2];
604     info->memory       = irecv[3];
605     info->mallocs      = irecv[4];
606   }
607   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
608   info->fill_ratio_needed = 0;
609   info->factor_mallocs    = 0;
610   return 0;
611 }
612 
613 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
614    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
615    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
616    extern int MatSolve_MPIDense(Mat,Vec,Vec);
617    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
618    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
619    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
620 
621 static int MatSetOption_MPIDense(Mat A,MatOption op)
622 {
623   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
624 
625   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
626       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
627       op == MAT_COLUMNS_SORTED) {
628         MatSetOption(a->A,op);
629   } else if (op == MAT_ROW_ORIENTED) {
630         a->roworiented = 1;
631         MatSetOption(a->A,op);
632   } else if (op == MAT_ROWS_SORTED ||
633              op == MAT_SYMMETRIC ||
634              op == MAT_STRUCTURALLY_SYMMETRIC ||
635              op == MAT_YES_NEW_DIAGONALS)
636     PLogInfo(A,"Info:MatSetOption_MPIDense:Option ignored\n");
637   else if (op == MAT_COLUMN_ORIENTED)
638     {a->roworiented = 0; MatSetOption(a->A,op);}
639   else if (op == MAT_NO_NEW_DIAGONALS)
640     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:MAT_NO_NEW_DIAGONALS");}
641   else
642     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
643   return 0;
644 }
645 
646 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
647 {
648   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
649   *m = mat->M; *n = mat->N;
650   return 0;
651 }
652 
653 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
654 {
655   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
656   *m = mat->m; *n = mat->N;
657   return 0;
658 }
659 
660 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
661 {
662   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
663   *m = mat->rstart; *n = mat->rend;
664   return 0;
665 }
666 
667 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
668 {
669   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
670   int          lrow, rstart = mat->rstart, rend = mat->rend;
671 
672   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
673   lrow = row - rstart;
674   return MatGetRow(mat->A,lrow,nz,idx,v);
675 }
676 
677 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
678 {
679   if (idx) PetscFree(*idx);
680   if (v) PetscFree(*v);
681   return 0;
682 }
683 
684 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
685 {
686   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
687   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
688   int          ierr, i, j;
689   double       sum = 0.0;
690   Scalar       *v = mat->v;
691 
692   if (mdn->size == 1) {
693     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
694   } else {
695     if (type == NORM_FROBENIUS) {
696       for (i=0; i<mat->n*mat->m; i++ ) {
697 #if defined(PETSC_COMPLEX)
698         sum += real(conj(*v)*(*v)); v++;
699 #else
700         sum += (*v)*(*v); v++;
701 #endif
702       }
703       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
704       *norm = sqrt(*norm);
705       PLogFlops(2*mat->n*mat->m);
706     }
707     else if (type == NORM_1) {
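      /* one norm = max over columns of the sums of absolute values: sum the
         local rows column by column, add the column sums across processes
         with MPI_Allreduce(), then take the maximum */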
708       double *tmp, *tmp2;
709       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
710       tmp2 = tmp + mdn->N;
711       PetscMemzero(tmp,2*mdn->N*sizeof(double));
712       *norm = 0.0;
713       v = mat->v;
714       for ( j=0; j<mat->n; j++ ) {
715         for ( i=0; i<mat->m; i++ ) {
716           tmp[j] += PetscAbsScalar(*v);  v++;
717         }
718       }
719       MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
720       for ( j=0; j<mdn->N; j++ ) {
721         if (tmp2[j] > *norm) *norm = tmp2[j];
722       }
723       PetscFree(tmp);
724       PLogFlops(mat->n*mat->m);
725     }
726     else if (type == NORM_INFINITY) { /* max row norm */
727       double ntemp;
728       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
729       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
730     }
731     else {
732       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
733     }
734   }
735   return 0;
736 }
737 
738 static int MatTranspose_MPIDense(Mat A,Mat *matout)
739 {
740   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
741   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
742   Mat          B;
743   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
744   int          j, i, ierr;
745   Scalar       *v;
746 
747   if (matout == PETSC_NULL && M != N) {
748     SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
749   }
750   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);
751 
752   m = Aloc->m; n = Aloc->n; v = Aloc->v;
753   rwork = (int *) PetscMalloc((m+1)*sizeof(int)); CHKPTRQ(rwork);
754   for ( i=0; i<m; i++ ) rwork[i] = rstart + i;  /* global row indices of A = column indices of B */
755   for ( j=0; j<n; j++ ) {
756     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
757     v   += m;
758   }
759   PetscFree(rwork);
760   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
761   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
762   if (matout != PETSC_NULL) {
763     *matout = B;
764   } else {
765     /* This isn't really an in-place transpose, but free data struct from a */
766     PetscFree(a->rowners);
767     ierr = MatDestroy(a->A); CHKERRQ(ierr);
768     if (a->lvec) VecDestroy(a->lvec);
769     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
770     PetscFree(a);
771     PetscMemcpy(A,B,sizeof(struct _Mat));
772     PetscHeaderDestroy(B);
773   }
774   return 0;
775 }
776 
777 #include "pinclude/plapack.h"
778 static int MatScale_MPIDense(Scalar *alpha,Mat inA)
779 {
780   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
781   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
782   int          one = 1, nz;
783 
784   nz = a->m*a->n;
785   BLscal_( &nz, alpha, a->v, &one );
786   PLogFlops(nz);
787   return 0;
788 }
789 
790 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
791 
792 /* -------------------------------------------------------------------*/
793 static struct _MatOps MatOps = {MatSetValues_MPIDense,
794        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
795        MatMult_MPIDense,MatMultAdd_MPIDense,
796        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
797 /*       MatSolve_MPIDense,0, */
798        0,0,
799        0,0,
800        0,0,
801 /*       MatLUFactor_MPIDense,0, */
802        0,MatTranspose_MPIDense,
803        MatGetInfo_MPIDense,0,
804        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
805        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
806        0,
807        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
808        0,0,
809 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
810        0,0,
811        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
812        MatGetOwnershipRange_MPIDense,
813        0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
814        0,MatConvertSameType_MPIDense,
815        0,0,0,0,0,
816        0,0,MatGetValues_MPIDense,0,0,MatScale_MPIDense,
817        0,0,0,MatGetBlockSize_MPIDense};
818 
819 /*@C
820    MatCreateMPIDense - Creates a parallel matrix in dense format.
821 
822    Input Parameters:
823 .  comm - MPI communicator
824 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
825 .  n - number of local columns (or PETSC_DECIDE to have it calculated
826            if N is given)
827 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
828 .  N - number of global columns (or PETSC_DECIDE to have it calculated
829            if n is given)
830 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
831    to control all matrix memory allocation.
832 
833    Output Parameter:
834 .  A - the matrix
835 
836    Notes:
837    The dense format is fully compatible with standard Fortran 77
838    storage by columns.
839 
840    The data input variable is intended primarily for Fortran programmers
841    who wish to allocate their own matrix memory space.  Most users should
842    set data=PETSC_NULL.
843 
844    The user MUST specify either the local or global matrix dimensions
845    (possibly both).
846 
847    Currently, the only parallel dense matrix decomposition is by rows,
848    so that n=N and each submatrix owns all of the global columns.
849 
850 .keywords: matrix, dense, parallel
851 
852 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
853 @*/
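/*
   Minimal usage sketch (assumes a program that has already called
   PetscInitialize(); the 10 x 10 global size and the single inserted entry
   are illustrative only, and CHKERRQ() checks are omitted for brevity):

       Mat    C;
       int    ierr, row = 0, col = 0;
       Scalar one = 1.0;

       ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,10,10,PETSC_NULL,&C);
       ierr = MatSetValues(C,1,&row,1,&col,&one,INSERT_VALUES);
       ierr = MatAssemblyBegin(C,MAT_FINAL_ASSEMBLY);
       ierr = MatAssemblyEnd(C,MAT_FINAL_ASSEMBLY);
       ierr = MatDestroy(C);
*/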
854 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
855 {
856   Mat          mat;
857   Mat_MPIDense *a;
858   int          ierr, i,flg;
859 
860   /* Note:  For now, when data is specified above, this assumes the user correctly
861    allocates the local dense storage space.  We should add error checking. */
862 
863   *A = 0;
864   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
865   PLogObjectCreate(mat);
866   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
867   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
868   mat->destroy    = MatDestroy_MPIDense;
869   mat->view       = MatView_MPIDense;
870   mat->factor     = 0;
871   mat->mapping    = 0;
872 
873   a->factor       = 0;
874   a->insertmode   = NOT_SET_VALUES;
875   MPI_Comm_rank(comm,&a->rank);
876   MPI_Comm_size(comm,&a->size);
877 
878   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
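  /* default row split: M/size rows per process, with the first M % size
     processes taking one extra row */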
879   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
880 
881   /* each row stores all columns */
882   if (N == PETSC_DECIDE) N = n;
883   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
884   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
885   a->N = mat->N = N;
886   a->M = mat->M = M;
887   a->m = mat->m = m;
888   a->n = mat->n = n;
889 
890   /* build local table of row and column ownerships */
891   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
892   a->cowners = a->rowners + a->size + 1;
893   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
894   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
895   a->rowners[0] = 0;
896   for ( i=2; i<=a->size; i++ ) {
897     a->rowners[i] += a->rowners[i-1];
898   }
899   a->rstart = a->rowners[a->rank];
900   a->rend   = a->rowners[a->rank+1];
901   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
902   a->cowners[0] = 0;
903   for ( i=2; i<=a->size; i++ ) {
904     a->cowners[i] += a->cowners[i-1];
905   }
906 
907   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
908   PLogObjectParent(mat,a->A);
909 
910   /* build cache for off-processor entries generated during MatSetValues() */
911   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
912 
913   /* stuff used for matrix vector multiply */
914   a->lvec        = 0;
915   a->Mvctx       = 0;
916   a->roworiented = 1;
917 
918   *A = mat;
919   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
920   if (flg) {
921     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
922   }
923   return 0;
924 }
925 
926 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
927 {
928   Mat          mat;
929   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
930   int          ierr;
931   FactorCtx    *factor;
932 
933   *newmat       = 0;
934   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
935   PLogObjectCreate(mat);
936   mat->data      = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
937   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
938   mat->destroy   = MatDestroy_MPIDense;
939   mat->view      = MatView_MPIDense;
940   mat->factor    = A->factor;
941   mat->assembled = PETSC_TRUE;
942 
943   a->m = mat->m = oldmat->m;
944   a->n = mat->n = oldmat->n;
945   a->M = mat->M = oldmat->M;
946   a->N = mat->N = oldmat->N;
947   if (oldmat->factor) {
948     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
949     /* copy factor contents ... add this code! */
950   } else a->factor = 0;
951 
952   a->rstart     = oldmat->rstart;
953   a->rend       = oldmat->rend;
954   a->size       = oldmat->size;
955   a->rank       = oldmat->rank;
956   a->insertmode = NOT_SET_VALUES;
957 
958   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
959   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
960   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
961   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
962 
963   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
964   PLogObjectParent(mat,a->lvec);
965   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
966   PLogObjectParent(mat,a->Mvctx);
967   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
968   PLogObjectParent(mat,a->A);
969   *newmat = mat;
970   return 0;
971 }
972 
973 #include "sys.h"
974 
975 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
976 {
977   int        *rowners, i,size,rank,m,rstart,rend,ierr,nz,j;
978   Scalar     *array,*vals,*vals_ptr;
979   MPI_Status status;
980 
981   MPI_Comm_rank(comm,&rank);
982   MPI_Comm_size(comm,&size);
983 
984   /* determine ownership of all rows */
985   m = M/size + ((M % size) > rank);
986   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
987   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
988   rowners[0] = 0;
989   for ( i=2; i<=size; i++ ) {
990     rowners[i] += rowners[i-1];
991   }
992   rstart = rowners[rank];
993   rend   = rowners[rank+1];
994 
995   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
996   ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);
997 
998   if (!rank) {
999     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1000 
1001     /* read in my part of the matrix numerical values  */
1002     ierr = PetscBinaryRead(fd,vals,m*N,BINARY_SCALAR); CHKERRQ(ierr);
1003 
1004     /* copy into the local array by rows: the file stores values row by row, while the array is column major, so we cannot read the file directly into it */
1005     vals_ptr = vals;
1006     for ( i=0; i<m; i++ ) {
1007       for ( j=0; j<N; j++ ) {
1008         array[i + j*m] = *vals_ptr++;
1009       }
1010     }
1011 
1012     /* read in the other processors' parts and ship them out */
1013     for ( i=1; i<size; i++ ) {
1014       nz   = (rowners[i+1] - rowners[i])*N;
1015       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1016       MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);
1017     }
1018   }
1019   else {
1020     /* receive numeric values */
1021     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1022 
1023     /* receive message of values*/
1024     MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);
1025 
1026     /* copy into the local array by rows: the file stores values row by row, while the array is column major, so we cannot read the file directly into it */
1027     vals_ptr = vals;
1028     for ( i=0; i<m; i++ ) {
1029       for ( j=0; j<N; j++ ) {
1030         array[i + j*m] = *vals_ptr++;
1031       }
1032     }
1033   }
1034   PetscFree(rowners);
1035   PetscFree(vals);
1036   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1037   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1038   return 0;
1039 }
1040 
1041 
1042 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1043 {
1044   Mat          A;
1045   int          i, nz, ierr, j,rstart, rend, fd;
1046   Scalar       *vals,*svals;
1047   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1048   MPI_Status   status;
1049   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1050   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1051   int          tag = ((PetscObject)viewer)->tag;
1052 
1053   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1054   if (!rank) {
1055     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1056     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1057     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:not matrix object");
1058   }
1059 
1060   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1061   M = header[1]; N = header[2]; nz = header[3];
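  /* binary format handled below: header = [cookie, M, N, nz], then the M row
     lengths, then all the column indices, then all the numerical values;
     nz == MATRIX_BINARY_FORMAT_DENSE instead signals that the values are
     stored as a dense matrix, row by row */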
1062 
1063   /*
1064        Handle case where matrix is stored on disk as a dense matrix
1065   */
1066   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1067     return MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);
1068   }
1069 
1070   /* determine ownership of all rows */
1071   m = M/size + ((M % size) > rank);
1072   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1073   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1074   rowners[0] = 0;
1075   for ( i=2; i<=size; i++ ) {
1076     rowners[i] += rowners[i-1];
1077   }
1078   rstart = rowners[rank];
1079   rend   = rowners[rank+1];
1080 
1081   /* distribute row lengths to all processors */
1082   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1083   offlens = ourlens + (rend-rstart);
1084   if (!rank) {
1085     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1086     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1087     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1088     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1089     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1090     PetscFree(sndcounts);
1091   }
1092   else {
1093     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1094   }
1095 
1096   if (!rank) {
1097     /* calculate the number of nonzeros on each processor */
1098     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1099     PetscMemzero(procsnz,size*sizeof(int));
1100     for ( i=0; i<size; i++ ) {
1101       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1102         procsnz[i] += rowlengths[j];
1103       }
1104     }
1105     PetscFree(rowlengths);
1106 
1107     /* determine max buffer needed and allocate it */
1108     maxnz = 0;
1109     for ( i=0; i<size; i++ ) {
1110       maxnz = PetscMax(maxnz,procsnz[i]);
1111     }
1112     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1113 
1114     /* read in my part of the matrix column indices  */
1115     nz = procsnz[0];
1116     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1117     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1118 
1119     /* read in everyone else's and ship it off */
1120     for ( i=1; i<size; i++ ) {
1121       nz = procsnz[i];
1122       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1123       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1124     }
1125     PetscFree(cols);
1126   }
1127   else {
1128     /* determine buffer space needed for message */
1129     nz = 0;
1130     for ( i=0; i<m; i++ ) {
1131       nz += ourlens[i];
1132     }
1133     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1134 
1135     /* receive message of column indices*/
1136     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1137     MPI_Get_count(&status,MPI_INT,&maxnz);
1138     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1139   }
1140 
1141   /* loop over local rows, determining number of off diagonal entries */
1142   PetscMemzero(offlens,m*sizeof(int));
1143   jj = 0;
1144   for ( i=0; i<m; i++ ) {
1145     for ( j=0; j<ourlens[i]; j++ ) {
1146       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1147       jj++;
1148     }
1149   }
1150 
1151   /* create our matrix */
1152   for ( i=0; i<m; i++ ) {
1153     ourlens[i] -= offlens[i];
1154   }
1155   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1156   A = *newmat;
1157   for ( i=0; i<m; i++ ) {
1158     ourlens[i] += offlens[i];
1159   }
1160 
1161   if (!rank) {
1162     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1163 
1164     /* read in my part of the matrix numerical values  */
1165     nz = procsnz[0];
1166     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1167 
1168     /* insert into matrix */
1169     jj      = rstart;
1170     smycols = mycols;
1171     svals   = vals;
1172     for ( i=0; i<m; i++ ) {
1173       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1174       smycols += ourlens[i];
1175       svals   += ourlens[i];
1176       jj++;
1177     }
1178 
1179     /* read in the other processors' parts and ship them out */
1180     for ( i=1; i<size; i++ ) {
1181       nz = procsnz[i];
1182       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1183       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1184     }
1185     PetscFree(procsnz);
1186   }
1187   else {
1188     /* receive numeric values */
1189     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1190 
1191     /* receive message of values*/
1192     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1193     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1194     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1195 
1196     /* insert into matrix */
1197     jj      = rstart;
1198     smycols = mycols;
1199     svals   = vals;
1200     for ( i=0; i<m; i++ ) {
1201       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1202       smycols += ourlens[i];
1203       svals   += ourlens[i];
1204       jj++;
1205     }
1206   }
1207   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1208 
1209   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1210   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1211   return 0;
1212 }
1213 
1214 
1215 
1216 
1217 
1218