xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 70f55243aafb320636e2a54ff30cab5d1e8d3d7b)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.44 1996/08/06 04:02:14 bsmith Exp bsmith $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "src/mat/impls/dense/mpi/mpidense.h"
10 #include "src/vec/vecimpl.h"
11 
12 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
13                                int *idxn,Scalar *v,InsertMode addv)
14 {
15   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
16   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
17   int          roworiented = A->roworiented;
18 
19   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
20     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
21   }
22   A->insertmode = addv;
23   for ( i=0; i<m; i++ ) {
24     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
25     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
26     if (idxm[i] >= rstart && idxm[i] < rend) {
27       row = idxm[i] - rstart;
28       if (roworiented) {
29         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
30       }
31       else {
32         for ( j=0; j<n; j++ ) {
33           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
34           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
35           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
36         }
37       }
38     }
39     else {
40       if (roworiented) {
41         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
42       }
43       else { /* must stash each separately */
44         row = idxm[i];
45         for ( j=0; j<n; j++ ) {
46           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
47                  CHKERRQ(ierr);
48         }
49       }
50     }
51   }
52   return 0;
53 }
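/*
   A minimal caller-side sketch (illustrative only) of how values reach
   MatSetValues_MPIDense().  Rows owned by this processor are inserted
   directly into the local SeqDense matrix; rows owned elsewhere are held in
   the stash and only communicated during assembly.  The variable names below
   are assumptions made for the example.
*/
#if 0
   Mat    mat;                          /* created with MatCreateMPIDense() */
   int    ierr, rstart, rend, M, N, row, col = 0;
   Scalar v = 1.0;

   ierr = MatGetSize(mat,&M,&N); CHKERRQ(ierr);
   ierr = MatGetOwnershipRange(mat,&rstart,&rend); CHKERRQ(ierr);
   row  = rstart;                       /* locally owned row: inserted directly */
   ierr = MatSetValues(mat,1,&row,1,&col,&v,INSERT_VALUES); CHKERRQ(ierr);
   row  = (rend < M) ? rend : 0;        /* off-processor row: goes into the stash */
   ierr = MatSetValues(mat,1,&row,1,&col,&v,INSERT_VALUES); CHKERRQ(ierr);
   ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
   ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
#endif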
54 
55 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
56 {
57   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
58   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
59 
60   for ( i=0; i<m; i++ ) {
61     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
62     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
63     if (idxm[i] >= rstart && idxm[i] < rend) {
64       row = idxm[i] - rstart;
65       for ( j=0; j<n; j++ ) {
66         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
67         if (idxn[j] >= mdn->N)
68           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
69         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
70       }
71     }
72     else {
73       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
74     }
75   }
76   return 0;
77 }
78 
79 static int MatGetArray_MPIDense(Mat A,Scalar **array)
80 {
81   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
82   int ierr;
83 
84   ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
85   return 0;
86 }
87 
88 static int MatRestoreArray_MPIDense(Mat A,Scalar **array)
89 {
90   return 0;
91 }
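/*
   A small sketch (illustrative only) of the array access that MatGetArray()
   permits: the local rows are stored column-major (Fortran style), so local
   entry (i,j) sits at array[i + j*m].  This zeroes the local block by hand.
*/
#if 0
   Mat    mat;                          /* assembled MPIDense matrix */
   Scalar *array;
   int    ierr, i, j, m, n;

   ierr = MatGetLocalSize(mat,&m,&n); CHKERRQ(ierr);   /* n is the global column count */
   ierr = MatGetArray(mat,&array); CHKERRQ(ierr);
   for ( j=0; j<n; j++ ) {
     for ( i=0; i<m; i++ ) {
       array[i + j*m] = 0.0;                           /* local entry (i,j) */
     }
   }
   ierr = MatRestoreArray(mat,&array); CHKERRQ(ierr);
#endif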
92 
93 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
94 {
95   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
96   MPI_Comm     comm = mat->comm;
97   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
98   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
99   int          tag = mat->tag, *owner,*starts,count,ierr;
100   InsertMode   addv;
101   MPI_Request  *send_waits,*recv_waits;
102   Scalar       *rvalues,*svalues;
103 
104   /* make sure all processors are in the same mode: either INSERT_VALUES or ADD_VALUES */
105   MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
106   if (addv == (ADD_VALUES|INSERT_VALUES)) { SETERRQ(1,
107     "MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
108     }
109   mdn->insertmode = addv; /* in case this processor had no cache */
110 
111   /*  first count number of contributors to each processor */
112   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
113   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
114   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
115   for ( i=0; i<mdn->stash.n; i++ ) {
116     idx = mdn->stash.idx[i];
117     for ( j=0; j<size; j++ ) {
118       if (idx >= owners[j] && idx < owners[j+1]) {
119         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
120       }
121     }
122   }
123   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
124 
125   /* inform other processors of number of messages and max length*/
126   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
127   MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
128   nreceives = work[rank];
129   MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
130   nmax = work[rank];
131   PetscFree(work);
132 
133   /* post receives:
134        1) each message consists of ordered triples
135      (global row, global column, value); the indices are stored as
136      Scalars to simplify the message passing.
137        2) since we don't know how long each individual message is, we
138      allocate the largest needed buffer for each receive. Potentially
139      this is a lot of wasted space.
140 
141        This could be done better.
142   */
143   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
144   CHKPTRQ(rvalues);
145   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
146   CHKPTRQ(recv_waits);
147   for ( i=0; i<nreceives; i++ ) {
148     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
149               comm,recv_waits+i);
150   }
151 
152   /* do sends:
153       1) starts[i] gives the starting index in svalues for stuff going to
154          the ith processor
155   */
156   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
157   CHKPTRQ(svalues);
158   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
159   CHKPTRQ(send_waits);
160   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
161   starts[0] = 0;
162   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
163   for ( i=0; i<mdn->stash.n; i++ ) {
164     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
165     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
166     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
167   }
168   PetscFree(owner);
169   starts[0] = 0;
170   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
171   count = 0;
172   for ( i=0; i<size; i++ ) {
173     if (procs[i]) {
174       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
175                 comm,send_waits+count++);
176     }
177   }
178   PetscFree(starts); PetscFree(nprocs);
179 
180   /* Free cache space */
181   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
182 
183   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
184   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
185   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
186   mdn->rmax       = nmax;
187 
188   return 0;
189 }
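/*
   A schematic fragment (illustrative only) of the message layout used above:
   every stashed entry occupies three consecutive Scalars in the send buffer,
   and MatAssemblyEnd_MPIDense() below reverses the encoding.  The variables
   mirror the locals of the two routines and are not initialized here.
*/
#if 0
   Scalar *svalues, *values, value, val;
   int    *starts, p, k, grow, gcol, row, col, rstart;

   /* sender: pack one entry headed for processor p */
   svalues[3*starts[p]]       = (Scalar) grow;     /* global row index    */
   svalues[3*starts[p]+1]     = (Scalar) gcol;     /* global column index */
   svalues[3*(starts[p]++)+2] = value;             /* numerical value     */

   /* receiver: unpack entry k from its buffer */
   row = (int) PetscReal(values[3*k])   - rstart;  /* convert to local row */
   col = (int) PetscReal(values[3*k+1]);
   val = values[3*k+2];
#endif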
190 extern int MatSetUpMultiply_MPIDense(Mat);
191 
192 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
193 {
194   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
195   MPI_Status   *send_status,recv_status;
196   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
197   Scalar       *values,val;
198   InsertMode   addv = mdn->insertmode;
199 
200   /*  wait on receives */
201   while (count) {
202     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
203     /* unpack receives into our local space */
204     values = mdn->rvalues + 3*imdex*mdn->rmax;
205     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
206     n = n/3;
207     for ( i=0; i<n; i++ ) {
208       row = (int) PetscReal(values[3*i]) - mdn->rstart;
209       col = (int) PetscReal(values[3*i+1]);
210       val = values[3*i+2];
211       if (col >= 0 && col < mdn->N) {
212         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
213       }
214       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
215     }
216     count--;
217   }
218   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
219 
220   /* wait on sends */
221   if (mdn->nsends) {
222     send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
223     CHKPTRQ(send_status);
224     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
225     PetscFree(send_status);
226   }
227   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
228 
229   mdn->insertmode = NOT_SET_VALUES;
230   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
231   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
232 
233   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
234     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
235   }
236   return 0;
237 }
238 
239 static int MatZeroEntries_MPIDense(Mat A)
240 {
241   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
242   return MatZeroEntries(l->A);
243 }
244 
245 /* the code does not do the diagonal entries correctly unless the
246    matrix is square and the column and row ownerships are identical.
247    This is a BUG. The only way to fix it seems to be to access
248    the local matrix (l->A) directly and not through the MatZeroRows()
249    routine.
250 */
251 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
252 {
253   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
254   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
255   int            *procs,*nprocs,j,found,idx,nsends,*work;
256   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
257   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
258   int            *lens,imdex,*lrows,*values;
259   MPI_Comm       comm = A->comm;
260   MPI_Request    *send_waits,*recv_waits;
261   MPI_Status     recv_status,*send_status;
262   IS             istmp;
263 
264   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
265   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
266 
267   /*  first count number of contributors to each processor */
268   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
269   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
270   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
271   for ( i=0; i<N; i++ ) {
272     idx = rows[i];
273     found = 0;
274     for ( j=0; j<size; j++ ) {
275       if (idx >= owners[j] && idx < owners[j+1]) {
276         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
277       }
278     }
279     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
280   }
281   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
282 
283   /* inform other processors of number of messages and max length*/
284   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
285   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
286   nrecvs = work[rank];
287   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
288   nmax = work[rank];
289   PetscFree(work);
290 
291   /* post receives:   */
292   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
293   CHKPTRQ(rvalues);
294   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
295   CHKPTRQ(recv_waits);
296   for ( i=0; i<nrecvs; i++ ) {
297     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
298   }
299 
300   /* do sends:
301       1) starts[i] gives the starting index in svalues for stuff going to
302          the ith processor
303   */
304   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
305   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
306   CHKPTRQ(send_waits);
307   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
308   starts[0] = 0;
309   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
310   for ( i=0; i<N; i++ ) {
311     svalues[starts[owner[i]]++] = rows[i];
312   }
313   ISRestoreIndices(is,&rows);
314 
315   starts[0] = 0;
316   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
317   count = 0;
318   for ( i=0; i<size; i++ ) {
319     if (procs[i]) {
320       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
321     }
322   }
323   PetscFree(starts);
324 
325   base = owners[rank];
326 
327   /*  wait on receives */
328   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
329   source = lens + nrecvs;
330   count  = nrecvs; slen = 0;
331   while (count) {
332     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
333     /* unpack receives into our local space */
334     MPI_Get_count(&recv_status,MPI_INT,&n);
335     source[imdex]  = recv_status.MPI_SOURCE;
336     lens[imdex]  = n;
337     slen += n;
338     count--;
339   }
340   PetscFree(recv_waits);
341 
342   /* gather the received row indices into a local list */
343   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
344   count = 0;
345   for ( i=0; i<nrecvs; i++ ) {
346     values = rvalues + i*nmax;
347     for ( j=0; j<lens[i]; j++ ) {
348       lrows[count++] = values[j] - base;
349     }
350   }
351   PetscFree(rvalues); PetscFree(lens);
352   PetscFree(owner); PetscFree(nprocs);
353 
354   /* actually zap the local rows */
355   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
356   PLogObjectParent(A,istmp);
357   PetscFree(lrows);
358   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
359   ierr = ISDestroy(istmp); CHKERRQ(ierr);
360 
361   /* wait on sends */
362   if (nsends) {
363     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
364     CHKPTRQ(send_status);
365     MPI_Waitall(nsends,send_waits,send_status);
366     PetscFree(send_status);
367   }
368   PetscFree(send_waits); PetscFree(svalues);
369 
370   return 0;
371 }
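/*
   A usage sketch (illustrative only) for MatZeroRows() on an assembled
   MPIDense matrix: each processor may list any global rows and the routine
   above ships them to their owners.  The caveat about diagonal entries in
   the comment preceding MatZeroRows_MPIDense() still applies.
*/
#if 0
   Mat    mat;                    /* assembled MPIDense matrix */
   IS     is;
   int    ierr, rows[2];
   Scalar diag = 1.0;             /* value to put on the zeroed diagonal */

   rows[0] = 0; rows[1] = 5;      /* global row numbers, owned by any processor */
   ierr = ISCreateSeq(MPI_COMM_SELF,2,rows,&is); CHKERRQ(ierr);
   ierr = MatZeroRows(mat,is,&diag); CHKERRQ(ierr);
   ierr = ISDestroy(is); CHKERRQ(ierr);
#endif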
372 
373 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
374 {
375   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
376   int          ierr;
377 
378   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
379   CHKERRQ(ierr);
380   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
381   CHKERRQ(ierr);
382   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
383   return 0;
384 }
385 
386 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
387 {
388   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
389   int          ierr;
390 
391   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
392   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
393   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
394   return 0;
395 }
396 
397 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
398 {
399   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
400   int          ierr;
401   Scalar       zero = 0.0;
402 
403   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
404   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
405   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
406          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
407   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
408          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
409   return 0;
410 }
411 
412 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
413 {
414   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
415   int          ierr;
416 
417   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
418   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
419   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
420          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
421   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
422          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
423   return 0;
424 }
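/*
   A sketch (illustrative only) of the vector layouts the multiply routines
   expect: x is distributed over the matrix columns and y over the matrix
   rows; internally x is gathered into mdn->lvec before the local dense
   multiply.  The VecCreateMPI() calling sequence shown is an assumption
   about the vector constructor of this PETSc version.
*/
#if 0
   MPI_Comm comm = MPI_COMM_WORLD;   /* communicator the matrix lives on */
   Mat      mat;                     /* MPIDense matrix on comm */
   Vec      x, y;
   int      ierr, m, n, M, N;

   ierr = MatGetLocalSize(mat,&m,&n); CHKERRQ(ierr);
   ierr = MatGetSize(mat,&M,&N); CHKERRQ(ierr);
   ierr = VecCreateMPI(comm,PETSC_DECIDE,N,&x); CHKERRQ(ierr);
   ierr = VecCreateMPI(comm,m,M,&y); CHKERRQ(ierr);
   ierr = MatMult(mat,x,y); CHKERRQ(ierr);          /* y = A x  */
   ierr = MatMultTrans(mat,y,x); CHKERRQ(ierr);     /* x = A' y */
   ierr = VecDestroy(x); CHKERRQ(ierr);
   ierr = VecDestroy(y); CHKERRQ(ierr);
#endif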
425 
426 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
427 {
428   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
429   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
430   int          ierr, len, i, n, m = a->m, radd;
431   Scalar       *x, zero = 0.0;
432 
433   VecSet(&zero,v);
434   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
435   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
436   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
437   len = PetscMin(aloc->m,aloc->n);
438   radd = a->rstart*m;
439   for ( i=0; i<len; i++ ) {
440     x[i] = aloc->v[radd + i*m + i];
441   }
442   return 0;
443 }
444 
445 static int MatDestroy_MPIDense(PetscObject obj)
446 {
447   Mat          mat = (Mat) obj;
448   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
449   int          ierr;
450 
451 #if defined(PETSC_LOG)
452   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
453 #endif
454   PetscFree(mdn->rowners);
455   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
456   if (mdn->lvec)   VecDestroy(mdn->lvec);
457   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
458   if (mdn->factor) {
459     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
460     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
461     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
462     PetscFree(mdn->factor);
463   }
464   PetscFree(mdn);
465   PLogObjectDestroy(mat);
466   PetscHeaderDestroy(mat);
467   return 0;
468 }
469 
470 #include "pinclude/pviewer.h"
471 
472 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
473 {
474   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
475   int          ierr;
476   if (mdn->size == 1) {
477     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
478   }
479   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
480   return 0;
481 }
482 
483 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
484 {
485   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
486   int          ierr, format;
487   FILE         *fd;
488   ViewerType   vtype;
489 
490   ViewerGetType(viewer,&vtype);
491   ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
492   ierr = ViewerGetFormat(viewer,&format);
493   if (format == ASCII_FORMAT_INFO_DETAILED) {
494     int nz, nzalloc, mem, rank;
495     MPI_Comm_rank(mat->comm,&rank);
496     ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
497     PetscSequentialPhaseBegin(mat->comm,1);
498       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
499           rank,mdn->m,nz,nzalloc,mem);
500       fflush(fd);
501     PetscSequentialPhaseEnd(mat->comm,1);
502     ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
503     return 0;
504   }
505   else if (format == ASCII_FORMAT_INFO) {
506     return 0;
507   }
508   if (vtype == ASCII_FILE_VIEWER) {
509     PetscSequentialPhaseBegin(mat->comm,1);
510     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
511              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
512     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
513     fflush(fd);
514     PetscSequentialPhaseEnd(mat->comm,1);
515   }
516   else {
517     int size = mdn->size, rank = mdn->rank;
518     if (size == 1) {
519       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
520     }
521     else {
522       /* assemble the entire matrix onto first processor. */
523       Mat          A;
524       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
525       Scalar       *vals;
526       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
527 
528       if (!rank) {
529         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
530       }
531       else {
532         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
533       }
534       PLogObjectParent(mat,A);
535 
536       /* Copy the matrix ... This isn't the most efficient means,
537          but it's quick for now */
538       row = mdn->rstart; m = Amdn->m;
539       for ( i=0; i<m; i++ ) {
540         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
541         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
542         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
543         row++;
544       }
545 
546       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
547       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
548       if (!rank) {
549         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
550       }
551       ierr = MatDestroy(A); CHKERRQ(ierr);
552     }
553   }
554   return 0;
555 }
556 
557 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
558 {
559   Mat          mat = (Mat) obj;
560   int          ierr;
561   ViewerType   vtype;
562 
563   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
564   if (vtype == ASCII_FILE_VIEWER) {
565     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
566   }
567   else if (vtype == ASCII_FILES_VIEWER) {
568     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
569   }
570   else if (vtype == BINARY_FILE_VIEWER) {
571     return MatView_MPIDense_Binary(mat,viewer);
572   }
573   return 0;
574 }
575 
576 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
577                              int *nzalloc,int *mem)
578 {
579   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
580   Mat          mdn = mat->A;
581   int          ierr, isend[3], irecv[3];
582 
583   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
584   if (flag == MAT_LOCAL) {
585     if (nz)      *nz      = isend[0];
586     if (nzalloc) *nzalloc = isend[1];
587     if (mem)     *mem     = isend[2];
588   } else if (flag == MAT_GLOBAL_MAX) {
589     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
590     if (nz)      *nz      = irecv[0];
591     if (nzalloc) *nzalloc = irecv[1];
592     if (mem)     *mem     = irecv[2];
593   } else if (flag == MAT_GLOBAL_SUM) {
594     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
595     if (nz)      *nz      = irecv[0];
596     if (nzalloc) *nzalloc = irecv[1];
597     if (mem)     *mem     = irecv[2];
598   }
599   return 0;
600 }
601 
602 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
603    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
604    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
605    extern int MatSolve_MPIDense(Mat,Vec,Vec);
606    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
607    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
608    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
609 
610 static int MatSetOption_MPIDense(Mat A,MatOption op)
611 {
612   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
613 
614   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
615       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
616       op == MAT_COLUMNS_SORTED ||
617       op == MAT_ROW_ORIENTED) {
618         MatSetOption(a->A,op);
619   }
620   else if (op == MAT_ROWS_SORTED ||
621            op == MAT_SYMMETRIC ||
622            op == MAT_STRUCTURALLY_SYMMETRIC ||
623            op == MAT_YES_NEW_DIAGONALS)
624     PLogInfo(A,"Info:MatSetOption_MPIDense:Option ignored\n");
625   else if (op == MAT_COLUMN_ORIENTED)
626     { a->roworiented = 0; MatSetOption(a->A,op);}
627   else if (op == MAT_NO_NEW_DIAGONALS)
628     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:MAT_NO_NEW_DIAGONALS");}
629   else
630     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
631   return 0;
632 }
633 
634 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
635 {
636   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
637   *m = mat->M; *n = mat->N;
638   return 0;
639 }
640 
641 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
642 {
643   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
644   *m = mat->m; *n = mat->N;
645   return 0;
646 }
647 
648 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
649 {
650   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
651   *m = mat->rstart; *n = mat->rend;
652   return 0;
653 }
654 
655 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
656 {
657   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
658   int          lrow, rstart = mat->rstart, rend = mat->rend;
659 
660   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
661   lrow = row - rstart;
662   return MatGetRow(mat->A,lrow,nz,idx,v);
663 }
664 
665 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
666 {
667   if (idx) PetscFree(*idx);
668   if (v) PetscFree(*v);
669   return 0;
670 }
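/*
   A sketch (illustrative only) of row access: MatGetRow() on this matrix
   type only serves locally owned rows, so loops are bounded by the
   ownership range.
*/
#if 0
   Mat    mat;
   int    ierr, row, rstart, rend, nz, *cols;
   Scalar *vals;

   ierr = MatGetOwnershipRange(mat,&rstart,&rend); CHKERRQ(ierr);
   for ( row=rstart; row<rend; row++ ) {
     ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
     /* ... examine the nz entries of this row ... */
     ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
   }
#endif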
671 
672 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
673 {
674   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
675   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
676   int          ierr, i, j;
677   double       sum = 0.0;
678   Scalar       *v = mat->v;
679 
680   if (mdn->size == 1) {
681     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
682   } else {
683     if (type == NORM_FROBENIUS) {
684       for (i=0; i<mat->n*mat->m; i++ ) {
685 #if defined(PETSC_COMPLEX)
686         sum += real(conj(*v)*(*v)); v++;
687 #else
688         sum += (*v)*(*v); v++;
689 #endif
690       }
691       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
692       *norm = sqrt(*norm);
693       PLogFlops(2*mat->n*mat->m);
694     }
695     else if (type == NORM_1) {
696       double *tmp, *tmp2;
697       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
698       tmp2 = tmp + mdn->N;
699       PetscMemzero(tmp,2*mdn->N*sizeof(double));
700       *norm = 0.0;
701       v = mat->v;
702       for ( j=0; j<mat->n; j++ ) {
703         for ( i=0; i<mat->m; i++ ) {
704           tmp[j] += PetscAbsScalar(*v);  v++;
705         }
706       }
707       MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
708       for ( j=0; j<mdn->N; j++ ) {
709         if (tmp2[j] > *norm) *norm = tmp2[j];
710       }
711       PetscFree(tmp);
712       PLogFlops(mat->n*mat->m);
713     }
714     else if (type == NORM_INFINITY) { /* max row norm */
715       double ntemp;
716       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
717       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
718     }
719     else {
720       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
721     }
722   }
723   return 0;
724 }
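/*
   The reductions above compute the usual matrix norms:
      NORM_FROBENIUS : sqrt of the global sum of |a_ij|^2
      NORM_1         : largest column sum of |a_ij|
      NORM_INFINITY  : largest row sum of |a_ij|
   Only the two norm is unsupported.  A usage sketch (illustrative only):
*/
#if 0
   Mat    mat;                    /* assembled MPIDense matrix */
   int    ierr;
   double nrm;

   ierr = MatNorm(mat,NORM_FROBENIUS,&nrm); CHKERRQ(ierr);
   ierr = MatNorm(mat,NORM_1,&nrm); CHKERRQ(ierr);
   ierr = MatNorm(mat,NORM_INFINITY,&nrm); CHKERRQ(ierr);
#endif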
725 
726 static int MatTranspose_MPIDense(Mat A,Mat *matout)
727 {
728   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
729   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
730   Mat          B;
731   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
732   int          j, i, ierr;
733   Scalar       *v;
734 
735   if (matout == PETSC_NULL && M != N)
736     SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
737   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
738          CHKERRQ(ierr);
739 
740   m = Aloc->m; n = Aloc->n; v = Aloc->v;
741   rwork = (int *) PetscMalloc(n*sizeof(int)); CHKPTRQ(rwork);
742   for ( j=0; j<n; j++ ) {
743     for (i=0; i<m; i++) rwork[i] = rstart + i;
744     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
745     v += m;
746   }
747   PetscFree(rwork);
748   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
749   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
750   if (matout != PETSC_NULL) {
751     *matout = B;
752   } else {
753     /* This isn't really an in-place transpose; instead we free a's data structures and copy B over A */
754     PetscFree(a->rowners);
755     ierr = MatDestroy(a->A); CHKERRQ(ierr);
756     if (a->lvec) VecDestroy(a->lvec);
757     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
758     PetscFree(a);
759     PetscMemcpy(A,B,sizeof(struct _Mat));
760     PetscHeaderDestroy(B);
761   }
762   return 0;
763 }
764 
765 #include "pinclude/plapack.h"
766 static int MatScale_MPIDense(Scalar *alpha,Mat inA)
767 {
768   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
769   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
770   int          one = 1, nz;
771 
772   nz = a->m*a->n;
773   BLscal_( &nz, alpha, a->v, &one );
774   PLogFlops(nz);
775   return 0;
776 }
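/*
   A usage sketch (illustrative only): MatScale() takes the scalar by
   address in this version of the interface.
*/
#if 0
   Mat    mat;
   int    ierr;
   Scalar alpha = 2.0;

   ierr = MatScale(&alpha,mat); CHKERRQ(ierr);      /* mat <- 2.0 * mat */
#endif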
777 
778 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
779 
780 /* -------------------------------------------------------------------*/
781 static struct _MatOps MatOps = {MatSetValues_MPIDense,
782        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
783        MatMult_MPIDense,MatMultAdd_MPIDense,
784        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
785 /*       MatSolve_MPIDense,0, */
786        0,0,
787        0,0,
788        0,0,
789 /*       MatLUFactor_MPIDense,0, */
790        0,MatTranspose_MPIDense,
791        MatGetInfo_MPIDense,0,
792        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
793        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
794        0,
795        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
796        0,0,0,
797 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
798        0,0,
799        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
800        MatGetOwnershipRange_MPIDense,
801        0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
802        0,MatConvertSameType_MPIDense,
803        0,0,0,0,0,
804        0,0,MatGetValues_MPIDense,0,0,MatScale_MPIDense};
805 
806 /*@C
807    MatCreateMPIDense - Creates a parallel matrix in dense format.
808 
809    Input Parameters:
810 .  comm - MPI communicator
811 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
812 .  n - number of local columns (or PETSC_DECIDE to have it calculated
813            if N is given)
814 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
815 .  N - number of global columns (or PETSC_DECIDE to have it calculated
816            if n is given)
817 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
818    to control all matrix memory allocation.
819 
820    Output Parameter:
821 .  A - the matrix
822 
823    Notes:
824    The dense format is fully compatible with standard Fortran 77
825    storage by columns.
826 
827    The data input variable is intended primarily for Fortran programmers
828    who wish to allocate their own matrix memory space.  Most users should
829    set data=PETSC_NULL.
830 
831    The user MUST specify either the local or global matrix dimensions
832    (possibly both).
833 
834    Currently, the only parallel dense matrix decomposition is by rows,
835    so that n=N and each submatrix owns all of the global columns.
836 
837 .keywords: matrix, dense, parallel
838 
839 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
840 @*/
841 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
842 {
843   Mat          mat;
844   Mat_MPIDense *a;
845   int          ierr, i,flg;
846 
847   /* Note:  For now, when data is specified above, this assumes the user correctly
848    allocates the local dense storage space.  We should add error checking. */
849 
850   *A = 0;
851   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
852   PLogObjectCreate(mat);
853   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
854   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
855   mat->destroy    = MatDestroy_MPIDense;
856   mat->view       = MatView_MPIDense;
857   mat->factor     = 0;
858 
859   a->factor = 0;
860   a->insertmode = NOT_SET_VALUES;
861   MPI_Comm_rank(comm,&a->rank);
862   MPI_Comm_size(comm,&a->size);
863 
864   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
865   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
866 
867   /* each row stores all columns */
868   if (N == PETSC_DECIDE) N = n;
869   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
870   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
871   a->N = mat->N = N;
872   a->M = mat->M = M;
873   a->m = mat->m = m;
874   a->n = mat->n = n;
875 
876   /* build local table of row and column ownerships */
877   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
878   a->cowners = a->rowners + a->size + 1;
879   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
880                        sizeof(Mat_MPIDense));
881   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
882   a->rowners[0] = 0;
883   for ( i=2; i<=a->size; i++ ) {
884     a->rowners[i] += a->rowners[i-1];
885   }
886   a->rstart = a->rowners[a->rank];
887   a->rend   = a->rowners[a->rank+1];
888   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
889   a->cowners[0] = 0;
890   for ( i=2; i<=a->size; i++ ) {
891     a->cowners[i] += a->cowners[i-1];
892   }
893 
894   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
895   PLogObjectParent(mat,a->A);
896 
897   /* build cache for off-processor entries formed during MatSetValues() */
898   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
899 
900   /* stuff used for matrix vector multiply */
901   a->lvec        = 0;
902   a->Mvctx       = 0;
903   a->roworiented = 1;
904 
905   *A = mat;
906   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
907   if (flg) {
908     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
909   }
910   return 0;
911 }
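/*
   A minimal creation sketch (illustrative only) matching the manual page
   above: the global sizes are fixed, the row distribution is left to PETSc,
   and data = PETSC_NULL lets PETSc allocate the local storage.  The sizes
   chosen are arbitrary.
*/
#if 0
   MPI_Comm comm = MPI_COMM_WORLD;
   Mat      A;
   int      ierr, M = 100, N = 50;

   ierr = MatCreateMPIDense(comm,PETSC_DECIDE,PETSC_DECIDE,M,N,PETSC_NULL,&A);
   CHKERRQ(ierr);
   /* ... MatSetValues(), MatAssemblyBegin(), MatAssemblyEnd(), use ... */
   ierr = MatDestroy(A); CHKERRQ(ierr);
#endif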
912 
913 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
914 {
915   Mat          mat;
916   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
917   int          ierr;
918   FactorCtx    *factor;
919 
920   *newmat       = 0;
921   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
922   PLogObjectCreate(mat);
923   mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
924   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
925   mat->destroy  = MatDestroy_MPIDense;
926   mat->view     = MatView_MPIDense;
927   mat->factor   = A->factor;
928   mat->assembled  = PETSC_TRUE;
929 
930   a->m = mat->m = oldmat->m;
931   a->n = mat->n = oldmat->n;
932   a->M = mat->M = oldmat->M;
933   a->N = mat->N = oldmat->N;
934   if (oldmat->factor) {
935     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
936     /* copy factor contents ... add this code! */
937   } else a->factor = 0;
938 
939   a->rstart     = oldmat->rstart;
940   a->rend       = oldmat->rend;
941   a->size       = oldmat->size;
942   a->rank       = oldmat->rank;
943   a->insertmode = NOT_SET_VALUES;
944 
945   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
946   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
947   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
948   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
949 
950   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
951   PLogObjectParent(mat,a->lvec);
952   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
953   PLogObjectParent(mat,a->Mvctx);
954   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
955   PLogObjectParent(mat,a->A);
956   *newmat = mat;
957   return 0;
958 }
959 
960 #include "sys.h"
961 
962 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
963 {
964   int        *rowners, i,size,rank,m,rstart,rend,ierr,nz,j;
965   Scalar     *array,*vals,*vals_ptr;
966   MPI_Status status;
967 
968   MPI_Comm_rank(comm,&rank);
969   MPI_Comm_size(comm,&size);
970 
971   /* determine ownership of all rows */
972   m = M/size + ((M % size) > rank);
973   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
974   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
975   rowners[0] = 0;
976   for ( i=2; i<=size; i++ ) {
977     rowners[i] += rowners[i-1];
978   }
979   rstart = rowners[rank];
980   rend   = rowners[rank+1];
981 
982   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
983   ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);
984 
985   if (!rank) {
986     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
987 
988     /* read in my part of the matrix numerical values  */
989     ierr = PetscBinaryRead(fd,vals,m*N,BINARY_SCALAR); CHKERRQ(ierr);
990 
991     /* insert into the matrix row by row (this is why we cannot read directly into the array) */
992     vals_ptr = vals;
993     for ( i=0; i<m; i++ ) {
994       for ( j=0; j<N; j++ ) {
995         array[i + j*m] = *vals_ptr++;
996       }
997     }
998 
999     /* read in the other processors' parts and ship them out */
1000     for ( i=1; i<size; i++ ) {
1001       nz   = (rowners[i+1] - rowners[i])*N;
1002       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1003       MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);
1004     }
1005   }
1006   else {
1007     /* receive numeric values */
1008     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1009 
1010     /* receive message of values*/
1011     MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);
1012 
1013     /* insert into the matrix row by row (this is why we cannot read directly into the array) */
1014     vals_ptr = vals;
1015     for ( i=0; i<m; i++ ) {
1016       for ( j=0; j<N; j++ ) {
1017         array[i + j*m] = *vals_ptr++;
1018       }
1019     }
1020   }
1021   PetscFree(rowners);
1022   PetscFree(vals);
1023   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1024   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1025   return 0;
1026 }
1027 
1028 
1029 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1030 {
1031   Mat          A;
1032   int          i, nz, ierr, j,rstart, rend, fd;
1033   Scalar       *vals,*svals;
1034   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1035   MPI_Status   status;
1036   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1037   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1038   int          tag = ((PetscObject)viewer)->tag;
1039 
1040   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1041   if (!rank) {
1042     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1043     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1044     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:not a matrix object");
1045   }
1046 
1047   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1048   M = header[1]; N = header[2]; nz = header[3];
1049 
1050   /*
1051        Handle case where matrix is stored on disk as a dense matrix
1052   */
1053   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1054     return MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);
1055   }
1056 
1057   /* determine ownership of all rows */
1058   m = M/size + ((M % size) > rank);
1059   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1060   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1061   rowners[0] = 0;
1062   for ( i=2; i<=size; i++ ) {
1063     rowners[i] += rowners[i-1];
1064   }
1065   rstart = rowners[rank];
1066   rend   = rowners[rank+1];
1067 
1068   /* distribute row lengths to all processors */
1069   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1070   offlens = ourlens + (rend-rstart);
1071   if (!rank) {
1072     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1073     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1074     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1075     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1076     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1077     PetscFree(sndcounts);
1078   }
1079   else {
1080     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1081   }
1082 
1083   if (!rank) {
1084     /* calculate the number of nonzeros on each processor */
1085     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1086     PetscMemzero(procsnz,size*sizeof(int));
1087     for ( i=0; i<size; i++ ) {
1088       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1089         procsnz[i] += rowlengths[j];
1090       }
1091     }
1092     PetscFree(rowlengths);
1093 
1094     /* determine max buffer needed and allocate it */
1095     maxnz = 0;
1096     for ( i=0; i<size; i++ ) {
1097       maxnz = PetscMax(maxnz,procsnz[i]);
1098     }
1099     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1100 
1101     /* read in my part of the matrix column indices  */
1102     nz = procsnz[0];
1103     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1104     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1105 
1106     /* read in everyone else's part and ship it off */
1107     for ( i=1; i<size; i++ ) {
1108       nz = procsnz[i];
1109       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1110       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1111     }
1112     PetscFree(cols);
1113   }
1114   else {
1115     /* determine buffer space needed for message */
1116     nz = 0;
1117     for ( i=0; i<m; i++ ) {
1118       nz += ourlens[i];
1119     }
1120     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1121 
1122     /* receive message of column indices*/
1123     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1124     MPI_Get_count(&status,MPI_INT,&maxnz);
1125     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with the file");
1126   }
1127 
1128   /* loop over local rows, determining number of off diagonal entries */
1129   PetscMemzero(offlens,m*sizeof(int));
1130   jj = 0;
1131   for ( i=0; i<m; i++ ) {
1132     for ( j=0; j<ourlens[i]; j++ ) {
1133       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1134       jj++;
1135     }
1136   }
1137 
1138   /* create our matrix */
1139   for ( i=0; i<m; i++ ) {
1140     ourlens[i] -= offlens[i];
1141   }
1142   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1143   A = *newmat;
1144   for ( i=0; i<m; i++ ) {
1145     ourlens[i] += offlens[i];
1146   }
1147 
1148   if (!rank) {
1149     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1150 
1151     /* read in my part of the matrix numerical values  */
1152     nz = procsnz[0];
1153     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1154 
1155     /* insert into matrix */
1156     jj      = rstart;
1157     smycols = mycols;
1158     svals   = vals;
1159     for ( i=0; i<m; i++ ) {
1160       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1161       smycols += ourlens[i];
1162       svals   += ourlens[i];
1163       jj++;
1164     }
1165 
1166     /* read in the other processors' parts and ship them out */
1167     for ( i=1; i<size; i++ ) {
1168       nz = procsnz[i];
1169       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1170       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1171     }
1172     PetscFree(procsnz);
1173   }
1174   else {
1175     /* receive numeric values */
1176     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1177 
1178     /* receive message of values*/
1179     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1180     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1181     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with the file");
1182 
1183     /* insert into matrix */
1184     jj      = rstart;
1185     smycols = mycols;
1186     svals   = vals;
1187     for ( i=0; i<m; i++ ) {
1188       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1189       smycols += ourlens[i];
1190       svals   += ourlens[i];
1191       jj++;
1192     }
1193   }
1194   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1195 
1196   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1197   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1198   return 0;
1199 }
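/*
   A sketch (illustrative only) of loading an MPIDense matrix from a PETSc
   binary file.  The ViewerFileOpenBinary()/ViewerDestroy() calls and the
   BINARY_RDONLY flag are assumptions about the binary-viewer interface of
   this PETSc version; the file name is arbitrary.
*/
#if 0
   MPI_Comm comm = MPI_COMM_WORLD;
   Viewer   viewer;
   Mat      A;
   int      ierr;

   ierr = ViewerFileOpenBinary(comm,"matrix.dat",BINARY_RDONLY,&viewer);
   CHKERRQ(ierr);
   ierr = MatLoad(viewer,MATMPIDENSE,&A); CHKERRQ(ierr);
   ierr = ViewerDestroy(viewer); CHKERRQ(ierr);
   ierr = MatDestroy(A); CHKERRQ(ierr);
#endif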
1200 
1201 
1202 
1203 
1204 
1205