#ifndef lint
static char vcid[] = "$Id: mpidense.c,v 1.51 1996/11/07 15:09:14 bsmith Exp bsmith $";
#endif

/*
   Basic functions for parallel dense matrices.
*/

#include "src/mat/impls/dense/mpi/mpidense.h"
#include "src/vec/vecimpl.h"

static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
{
  Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
  int          roworiented = A->roworiented;

  if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
  }
  A->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
    if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      if (roworiented) {
        ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else {
        for ( j=0; j<n; j++ ) {
          if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
          if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
          ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
        }
      }
    }
    else {
      if (roworiented) {
        ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else { /* must stash each entry separately */
        row = idxm[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}

static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
    if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
        if (idxn[j] >= mdn->N)
          SETERRQ(1,"MatGetValues_MPIDense:Column too large");
        ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
    }
  }
  return 0;
}

static int MatGetArray_MPIDense(Mat A,Scalar **array)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int ierr;

  ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
  return 0;
}

static int MatRestoreArray_MPIDense(Mat A,Scalar **array)
{
  return 0;
}
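
/*
   Note: callers access the local dense storage through the pair
   MatGetArray()/MatRestoreArray().  A minimal sketch of the intended
   usage (error checking omitted for brevity):

       Scalar *array;
       MatGetArray(mat,&array);
       ... operate on the local m x N column-major block ...
       MatRestoreArray(mat,&array);

   MatRestoreArray_MPIDense() is a no-op because MatGetArray() above
   hands out a pointer to the local matrix storage itself, not a copy.
*/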

static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Comm     comm = mat->comm;
  int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
  int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int          tag = mat->tag, *owner,*starts,count,ierr;
  InsertMode   addv;
  MPI_Request  *send_waits,*recv_waits;
  Scalar       *rvalues,*svalues;

  /* make sure all processors are using the same mode (INSERT_VALUES or ADD_VALUES) */
  MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
  }
  mdn->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<mdn->stash.n; i++ ) {
    idx = mdn->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  if (nreceives > size) SETERRQ(1,"MatAssemblyBegin_MPIDense:Internal PETSc error");
  MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered triples
     (global row index,global column index,value); the indices are stored
     as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
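  /* For example, a message carrying the two stashed entries (i0,j0,v0) and
     (i1,j1,v1) is packed as six Scalars: [i0, j0, v0, i1, j1, v1]; each
     receive buffer therefore needs room for 3*nmax Scalars. */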
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<mdn->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);

  mdn->svalues    = svalues;    mdn->rvalues = rvalues;
  mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
  mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
  mdn->rmax       = nmax;

  return 0;
}
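/*
   Assembly thus proceeds in two phases: MatAssemblyBegin() above posts the
   nonblocking sends/receives of the stashed off-processor entries, and
   MatAssemblyEnd() below waits on them and inserts the received triples
   into the local matrix.  For example, with two processes where process 0
   stashes one entry owned by process 1, nsends is 1 on process 0 and
   nreceives is 1 on process 1, and the single 3-Scalar message is unpacked
   in MatAssemblyEnd_MPIDense().
*/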
extern int MatSetUpMultiply_MPIDense(Mat);

static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Status   *send_status,recv_status;
  int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
  Scalar       *values,val;
  InsertMode   addv = mdn->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = mdn->rvalues + 3*imdex*mdn->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - mdn->rstart;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= 0 && col < mdn->N) {
        MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
      }
      else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
    }
    count--;
  }
  PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);

  /* wait on sends */
  if (mdn->nsends) {
    send_status = (MPI_Status *) PetscMalloc(mdn->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(mdn->send_waits); PetscFree(mdn->svalues);

  mdn->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
  }
  return 0;
}

static int MatZeroEntries_MPIDense(Mat A)
{
  Mat_MPIDense *l = (Mat_MPIDense *) A->data;
  return MatZeroEntries(l->A);
}

static int MatGetBlockSize_MPIDense(Mat A,int *bs)
{
  *bs = 1;
  return 0;
}

/* the code does not handle the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   the local matrix l->A directly rather than going through the
   MatZeroRows() routine.
*/
static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
{
  Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts     = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0]  = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex] = recv_status.MPI_SOURCE;
    lens[imdex]   = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* convert the received global row indices into a list of local rows */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}

static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
  ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
  return 0;
}

static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
  ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
  return 0;
}

static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;
  Scalar       zero = 0.0;

  ierr = VecSet(&zero,yy); CHKERRQ(ierr);
  ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;

  ierr = VecCopy(yy,zz); CHKERRQ(ierr);
  ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatGetDiagonal_MPIDense(Mat A,Vec v)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
  int          ierr, len, i, n, m = a->m, radd;
  Scalar       *x, zero = 0.0;

  VecSet(&zero,v);
  ierr = VecGetArray(v,&x); CHKERRQ(ierr);
  ierr = VecGetSize(v,&n); CHKERRQ(ierr);
  if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
  len = PetscMin(aloc->m,aloc->n);
  radd = a->rstart*m;
  for ( i=0; i<len; i++ ) {
    x[i] = aloc->v[radd + i*m + i];
  }
  return 0;
}

static int MatDestroy_MPIDense(PetscObject obj)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
#endif
  PetscFree(mdn->rowners);
  ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
  if (mdn->lvec)   VecDestroy(mdn->lvec);
  if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
  if (mdn->factor) {
    if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
    if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
    if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
    PetscFree(mdn->factor);
  }
  PetscFree(mdn);
  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

#include "pinclude/pviewer.h"

static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  if (mdn->size == 1) {
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
  return 0;
}

static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, format;
  FILE         *fd;
  ViewerType   vtype;

  ViewerGetType(viewer,&vtype);
  ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
  ierr = ViewerGetFormat(viewer,&format);
  if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
    int rank;
    MatInfo info;
    MPI_Comm_rank(mat->comm,&rank);
    ierr = MatGetInfo(mat,MAT_LOCAL,&info);
    PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
         (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
      fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
    ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
    return 0;
  }
  else if (format == VIEWER_FORMAT_ASCII_INFO) {
    return 0;
  }
  if (vtype == ASCII_FILE_VIEWER) {
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
             mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = mdn->size, rank = mdn->rank;
    if (size == 1) {
      ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat          A;
      int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
      Scalar       *vals;
      Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;

      /* the first processor holds all M rows, the others none */
      if (!rank) {
        ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* Copy the matrix ... This isn't the most efficient means,
         but it's quick for now */
      row = mdn->rstart; m = Amdn->m;
      for ( i=0; i<m; i++ ) {
        ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
        ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        row++;
      }

      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatView_MPIDense(PetscObject obj,Viewer viewer)
{
  Mat          mat = (Mat) obj;
  int          ierr;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER) {
    ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
  }
  else if (vtype == BINARY_FILE_VIEWER) {
    return MatView_MPIDense_Binary(mat,viewer);
  }
  return 0;
}

static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  Mat          mdn = mat->A;
  int          ierr;
  double       isend[5], irecv[5];

  info->rows_global    = (double)mat->M;
  info->columns_global = (double)mat->N;
  info->rows_local     = (double)mat->m;
  info->columns_local  = (double)mat->N;
  info->block_size     = 1.0;
  ierr = MatGetInfo(mdn,MAT_LOCAL,info); CHKERRQ(ierr);
  isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
  isend[3] = info->memory;  isend[4] = info->mallocs;
  if (flag == MAT_LOCAL) {
    info->nz_used      = isend[0];
    info->nz_allocated = isend[1];
    info->nz_unneeded  = isend[2];
    info->memory       = isend[3];
    info->mallocs      = isend[4];
  } else if (flag == MAT_GLOBAL_MAX) {
    MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  } else if (flag == MAT_GLOBAL_SUM) {
    MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  }
  info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
  info->fill_ratio_needed = 0;
  info->factor_mallocs    = 0;
  return 0;
}

/* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
   extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
   extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
   extern int MatSolve_MPIDense(Mat,Vec,Vec);
   extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
   extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
   extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */

static int MatSetOption_MPIDense(Mat A,MatOption op)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;

  if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
      op == MAT_YES_NEW_NONZERO_LOCATIONS ||
      op == MAT_COLUMNS_SORTED ||
      op == MAT_ROW_ORIENTED) {
        MatSetOption(a->A,op);
  }
  else if (op == MAT_ROWS_SORTED ||
           op == MAT_SYMMETRIC ||
           op == MAT_STRUCTURALLY_SYMMETRIC ||
           op == MAT_YES_NEW_DIAGONALS)
    PLogInfo(A,"Info:MatSetOption_MPIDense:Option ignored\n");
  else if (op == MAT_COLUMN_ORIENTED)
    { a->roworiented = 0; MatSetOption(a->A,op);}
  else if (op == MAT_NO_NEW_DIAGONALS)
    {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:MAT_NO_NEW_DIAGONALS");}
  else
    {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
  return 0;
}

static int MatGetSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->m; *n = mat->N;
  return 0;
}

static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->rstart; *n = mat->rend;
  return 0;
}

static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  int          lrow, rstart = mat->rstart, rend = mat->rend;

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
  lrow = row - rstart;
  return MatGetRow(mat->A,lrow,nz,idx,v);
}

static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  if (idx) PetscFree(*idx);
  if (v) PetscFree(*v);
  return 0;
}

static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
  Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
  int          ierr, i, j;
  double       sum = 0.0;
  Scalar       *v = mat->v;

  if (mdn->size == 1) {
    ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      for (i=0; i<mat->n*mat->m; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
      *norm = sqrt(*norm);
      PLogFlops(2*mat->n*mat->m);
    }
    else if (type == NORM_1) { /* max column sum */
      double *tmp, *tmp2;
      tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = tmp + mdn->N;
      PetscMemzero(tmp,2*mdn->N*sizeof(double));
      *norm = 0.0;
      v = mat->v;
      for ( j=0; j<mat->n; j++ ) {
        for ( i=0; i<mat->m; i++ ) {
          tmp[j] += PetscAbsScalar(*v);  v++;
        }
      }
      MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
      for ( j=0; j<mdn->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp);
      PLogFlops(mat->n*mat->m);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp;
      ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
      MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
    }
  }
  return 0;
}

static int MatTranspose_MPIDense(Mat A,Mat *matout)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
  Mat          B;
  int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
  int          j, i, ierr;
  Scalar       *v;

  if (matout == PETSC_NULL && M != N) {
    SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
  }
  ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);

  m = Aloc->m; n = Aloc->n; v = Aloc->v;
  rwork = (int *) PetscMalloc(m*sizeof(int)); CHKPTRQ(rwork);
  for ( i=0; i<m; i++ ) rwork[i] = rstart + i; /* global rows of A = columns of B */
  for ( j=0; j<n; j++ ) {
    ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
    v   += m;
  }
  PetscFree(rwork);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose, but free data struct from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}

#include "pinclude/plapack.h"
static int MatScale_MPIDense(Scalar *alpha,Mat inA)
{
  Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
  Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
  int          one = 1, nz;

  nz = a->m*a->n;
  BLscal_( &nz, alpha, a->v, &one );
  PLogFlops(nz);
  return 0;
}

static int MatConvertSameType_MPIDense(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {MatSetValues_MPIDense,
       MatGetRow_MPIDense,MatRestoreRow_MPIDense,
       MatMult_MPIDense,MatMultAdd_MPIDense,
       MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
/*       MatSolve_MPIDense,0, */
       0,0,
       0,0,
       0,0,
/*       MatLUFactor_MPIDense,0, */
       0,MatTranspose_MPIDense,
       MatGetInfo_MPIDense,0,
       MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
       MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
       0,
       MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
       0,0,
/*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
       0,0,
       MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
       MatGetOwnershipRange_MPIDense,
       0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
       0,MatConvertSameType_MPIDense,
       0,0,0,0,0,
       0,0,MatGetValues_MPIDense,0,0,MatScale_MPIDense,
       0,0,0,MatGetBlockSize_MPIDense};

/*@C
   MatCreateMPIDense - Creates a parallel matrix in dense format.

   Input Parameters:
.  comm - MPI communicator
.  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have calculated
           if N is given)
.  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have calculated
           if n is given)
.  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
   to control all matrix memory allocation.

   Output Parameter:
.  A - the matrix

   Notes:
   The dense format is fully compatible with standard Fortran 77
   storage by columns.

   The data input variable is intended primarily for Fortran programmers
   who wish to allocate their own matrix memory space.  Most users should
   set data=PETSC_NULL.

   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Currently, the only parallel dense matrix decomposition is by rows,
   so that n=N and each submatrix owns all of the global columns.

.keywords: matrix, dense, parallel

.seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
@*/
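/*
   Example of typical usage (a minimal sketch; CHKERRQ() error checking
   omitted for brevity):

       Mat    A;
       int    row = 0, col = 0;
       Scalar one = 1.0;

       MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,10,10,PETSC_NULL,&A);
       MatSetValues(A,1,&row,1,&col,&one,INSERT_VALUES);
       MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
       MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
       ...
       MatDestroy(A);
*/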
int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
{
  Mat          mat;
  Mat_MPIDense *a;
  int          ierr, i,flg;

  /* Note:  For now, when data is specified above, this assumes the user correctly
   allocates the local dense storage space.  We should add error checking. */

  *A = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIDense;
  mat->view       = MatView_MPIDense;
  mat->factor     = 0;
  mat->mapping    = 0;

  a->factor       = 0;
  a->insertmode   = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&a->rank);
  MPI_Comm_size(comm,&a->size);

  if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
  if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}

  /* each row stores all columns */
  if (N == PETSC_DECIDE) N = n;
  if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
  /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
  a->N = mat->N = N;
  a->M = mat->M = M;
  a->m = mat->m = m;
  a->n = mat->n = n;

  /* build local table of row and column ownerships */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  a->cowners = a->rowners + a->size + 1;
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
  MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
  a->rowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->rowners[i] += a->rowners[i-1];
  }
  a->rstart = a->rowners[a->rank];
  a->rend   = a->rowners[a->rank+1];
  MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
  a->cowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->cowners[i] += a->cowners[i-1];
  }

  ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);

  /* build cache for off-processor entries generated during MatSetValues() */
  ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);

  /* stuff used for matrix vector multiply */
  a->lvec        = 0;
  a->Mvctx       = 0;
  a->roworiented = 1;

  *A = mat;
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  return 0;
}

static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
{
  Mat          mat;
  Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
  int          ierr;
  FactorCtx    *factor;

  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
  PLogObjectCreate(mat);
  mat->data      = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy   = MatDestroy_MPIDense;
  mat->view      = MatView_MPIDense;
  mat->factor    = A->factor;
  mat->assembled = PETSC_TRUE;

  a->m = mat->m = oldmat->m;
  a->n = mat->n = oldmat->n;
  a->M = mat->M = oldmat->M;
  a->N = mat->N = oldmat->N;
  if (oldmat->factor) {
    a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
    /* copy factor contents ... add this code! */
  } else a->factor = 0;

  a->rstart     = oldmat->rstart;
  a->rend       = oldmat->rend;
  a->size       = oldmat->size;
  a->rank       = oldmat->rank;
  a->insertmode = NOT_SET_VALUES;

  a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
  PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);

  ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  *newmat = mat;
  return 0;
}

#include "sys.h"

int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
{
  int        *rowners, i,size,rank,m,rstart,rend,ierr,nz,j;
  Scalar     *array,*vals,*vals_ptr;
  MPI_Status status;

  MPI_Comm_rank(comm,&rank);
  MPI_Comm_size(comm,&size);

  /* determine ownership of all rows */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
  ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);

  if (!rank) {
    vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    ierr = PetscBinaryRead(fd,vals,m*N,BINARY_SCALAR); CHKERRQ(ierr);

    /* insert into matrix by row (this is why we cannot read directly
       into the column-major local array) */
    vals_ptr = vals;
    for ( i=0; i<m; i++ ) {
      for ( j=0; j<N; j++ ) {
        array[i + j*m] = *vals_ptr++;
      }
    }

    /* read in other processors' parts and ship them out */
    for ( i=1; i<size; i++ ) {
      nz   = (rowners[i+1] - rowners[i])*N;
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);
    }
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values*/
    MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);

    /* insert into matrix by row (this is why we cannot read directly
       into the column-major local array) */
    vals_ptr = vals;
    for ( i=0; i<m; i++ ) {
      for ( j=0; j<N; j++ ) {
        array[i + j*m] = *vals_ptr++;
      }
    }
  }
  PetscFree(rowners);
  PetscFree(vals);
  ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
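/*
   The transposition above is needed because the file stores each process's
   block by rows while the local Mat_SeqDense stores it by columns (Fortran
   order).  For example, with m=2 local rows and N=3 columns the file record
   [a00 a01 a02 a10 a11 a12] must land in the local array as
   [a00 a10 a01 a11 a02 a12], i.e. array[i + j*m] = a(i,j).
*/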

int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*svals;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)viewer)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2]; nz = header[3];

  /*
       Handle case where matrix is stored on disk as a dense matrix
  */
  if (nz == MATRIX_BINARY_FORMAT_DENSE) {
    return MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);
  }

  /* determine ownership of all rows */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);

    /* read in everyone else's and ship it off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
  A = *newmat;
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors' parts and ship them out */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values*/
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with file");

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}