xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 8d1cd36bbae09094f8343e481f39717c4ade5555)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.37 1996/04/07 22:45:20 curfman Exp curfman $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "mpidense.h"
10 #include "vec/vecimpl.h"
11 
12 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
13                                int *idxn,Scalar *v,InsertMode addv)
14 {
15   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
16   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
17   int          roworiented = A->roworiented;
18 
19   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
20     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
21   }
22   A->insertmode = addv;
23   for ( i=0; i<m; i++ ) {
24     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
25     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
26     if (idxm[i] >= rstart && idxm[i] < rend) {
27       row = idxm[i] - rstart;
28       if (roworiented) {
29         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
30       }
31       else {
32         for ( j=0; j<n; j++ ) {
33           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
34           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
35           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
36         }
37       }
38     }
39     else {
40       if (roworiented) {
41         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
42       }
43       else { /* must stash each entry separately */
44         row = idxm[i];
45         for ( j=0; j<n; j++ ) {
46           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
47                  CHKERRQ(ierr);
48         }
49       }
50     }
51   }
52   return 0;
53 }
54 
55 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
56 {
57   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
58   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
59 
60   for ( i=0; i<m; i++ ) {
61     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
62     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
63     if (idxm[i] >= rstart && idxm[i] < rend) {
64       row = idxm[i] - rstart;
65       for ( j=0; j<n; j++ ) {
66         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
67         if (idxn[j] >= mdn->N)
68           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
69         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
70       }
71     }
72     else {
73       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
74     }
75   }
76   return 0;
77 }
78 
79 static int MatGetArray_MPIDense(Mat A,Scalar **array)
80 {
81   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
82   int ierr;
83 
84   ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
85   return 0;
86 }
87 
88 static int MatRestoreArray_MPIDense(Mat A,Scalar **array)
89 {
90   return 0;
91 }
92 
93 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
94 {
95   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
96   MPI_Comm     comm = mat->comm;
97   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
98   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
99   int          tag = mat->tag, *owner,*starts,count,ierr;
100   InsertMode   addv;
101   MPI_Request  *send_waits,*recv_waits;
102   Scalar       *rvalues,*svalues;
103 
104   /* make sure all processors are using the same mode - either INSERT_VALUES or ADD_VALUES */
105   MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
106   if (addv == (ADD_VALUES|INSERT_VALUES)) {
107     SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
108   }
109   mdn->insertmode = addv; /* in case this processor had no cache */
110 
111   /*  first count number of contributors to each processor */
112   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
113   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
114   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
115   for ( i=0; i<mdn->stash.n; i++ ) {
116     idx = mdn->stash.idx[i];
117     for ( j=0; j<size; j++ ) {
118       if (idx >= owners[j] && idx < owners[j+1]) {
119         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
120       }
121     }
122   }
123   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
124 
125   /* inform other processors of number of messages and max length*/
126   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
127   MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
128   nreceives = work[rank];
129   MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
130   nmax = work[rank];
131   PetscFree(work);
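  /* at this point nreceives is the number of processes that will send stashed
     entries to this process, and nmax bounds the number of stashed entries in
     any single incoming message (used below to size each receive buffer) */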
132 
133   /* post receives:
134        1) each message will consist of ordered triples
135      (global row index, global column index, value); the indices are stored
136      as Scalars to simplify the message passing.
137        2) since we do not know how long each individual message is, we
138      allocate the largest needed buffer for each receive. Potentially
139      this is a lot of wasted space.
140 
141        This could be done better.
142   */
143   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
144   CHKPTRQ(rvalues);
145   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
146   CHKPTRQ(recv_waits);
147   for ( i=0; i<nreceives; i++ ) {
148     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
149               comm,recv_waits+i);
150   }
151 
152   /* do sends:
153       1) starts[i] gives the starting index in svalues for stuff going to
154          the ith processor
155   */
156   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
157   CHKPTRQ(svalues);
158   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
159   CHKPTRQ(send_waits);
160   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
161   starts[0] = 0;
162   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
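  /* pack the stashed (row,column,value) triples into svalues; starts[] is used as
     a moving cursor (incremented per entry) during packing and is therefore
     rebuilt below before the sends are posted */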
163   for ( i=0; i<mdn->stash.n; i++ ) {
164     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
165     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
166     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
167   }
168   PetscFree(owner);
169   starts[0] = 0;
170   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
171   count = 0;
172   for ( i=0; i<size; i++ ) {
173     if (procs[i]) {
174       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
175                 comm,send_waits+count++);
176     }
177   }
178   PetscFree(starts); PetscFree(nprocs);
179 
180   /* Free cache space */
181   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
182 
183   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
184   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
185   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
186   mdn->rmax       = nmax;
187 
188   return 0;
189 }
190 extern int MatSetUpMultiply_MPIDense(Mat);
191 
192 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
193 {
194   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
195   MPI_Status   *send_status,recv_status;
196   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
197   Scalar       *values,val;
198   InsertMode   addv = mdn->insertmode;
199 
200   /*  wait on receives */
201   while (count) {
202     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
203     /* unpack receives into our local space */
204     values = mdn->rvalues + 3*imdex*mdn->rmax;
205     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
206     n = n/3;
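      /* each received entry is a (global row, global column, value) triple of Scalars */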
207     for ( i=0; i<n; i++ ) {
208       row = (int) PetscReal(values[3*i]) - mdn->rstart;
209       col = (int) PetscReal(values[3*i+1]);
210       val = values[3*i+2];
211       if (col >= 0 && col < mdn->N) {
212         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
213       }
214       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
215     }
216     count--;
217   }
218   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
219 
220   /* wait on sends */
221   if (mdn->nsends) {
222     send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
223     CHKPTRQ(send_status);
224     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
225     PetscFree(send_status);
226   }
227   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
228 
229   mdn->insertmode = NOT_SET_VALUES;
230   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
231   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
232 
233   if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
234     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
235   }
236   return 0;
237 }
238 
239 static int MatZeroEntries_MPIDense(Mat A)
240 {
241   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
242   return MatZeroEntries(l->A);
243 }
244 
245 /* the code does not do the diagonal entries correctly unless the
246    matrix is square and the column and row ownerships are identical.
247    This is a BUG. The only way to fix it seems to be to access
248    the local matrix mdn->A directly and not to go through the
249    MatZeroRows() routine.
250 */
251 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
252 {
253   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
254   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
255   int            *procs,*nprocs,j,found,idx,nsends,*work;
256   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
257   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
258   int            *lens,imdex,*lrows,*values;
259   MPI_Comm       comm = A->comm;
260   MPI_Request    *send_waits,*recv_waits;
261   MPI_Status     recv_status,*send_status;
262   IS             istmp;
263 
264   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
265   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
266 
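  /* route each row index in is to its owning process; every process then zeroes
     its locally owned rows by calling MatZeroRows() on the sequential block l->A
     with the optional diagonal value */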
267   /*  first count number of contributors to each processor */
268   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
269   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
270   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
271   for ( i=0; i<N; i++ ) {
272     idx = rows[i];
273     found = 0;
274     for ( j=0; j<size; j++ ) {
275       if (idx >= owners[j] && idx < owners[j+1]) {
276         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
277       }
278     }
279     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
280   }
281   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
282 
283   /* inform other processors of number of messages and max length*/
284   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
285   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
286   nrecvs = work[rank];
287   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
288   nmax = work[rank];
289   PetscFree(work);
290 
291   /* post receives:   */
292   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
293   CHKPTRQ(rvalues);
294   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
295   CHKPTRQ(recv_waits);
296   for ( i=0; i<nrecvs; i++ ) {
297     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
298   }
299 
300   /* do sends:
301       1) starts[i] gives the starting index in svalues for stuff going to
302          the ith processor
303   */
304   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
305   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
306   CHKPTRQ(send_waits);
307   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
308   starts[0] = 0;
309   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
310   for ( i=0; i<N; i++ ) {
311     svalues[starts[owner[i]]++] = rows[i];
312   }
313   ISRestoreIndices(is,&rows);
314 
315   starts[0] = 0;
316   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
317   count = 0;
318   for ( i=0; i<size; i++ ) {
319     if (procs[i]) {
320       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
321     }
322   }
323   PetscFree(starts);
324 
325   base = owners[rank];
326 
327   /*  wait on receives */
328   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
329   source = lens + nrecvs;
330   count  = nrecvs; slen = 0;
331   while (count) {
332     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
333     /* unpack receives into our local space */
334     MPI_Get_count(&recv_status,MPI_INT,&n);
335     source[imdex]  = recv_status.MPI_SOURCE;
336     lens[imdex]  = n;
337     slen += n;
338     count--;
339   }
340   PetscFree(recv_waits);
341 
342   /* move the data into the send scatter */
343   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
344   count = 0;
345   for ( i=0; i<nrecvs; i++ ) {
346     values = rvalues + i*nmax;
347     for ( j=0; j<lens[i]; j++ ) {
348       lrows[count++] = values[j] - base;
349     }
350   }
351   PetscFree(rvalues); PetscFree(lens);
352   PetscFree(owner); PetscFree(nprocs);
353 
354   /* actually zap the local rows */
355   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
356   PLogObjectParent(A,istmp);
357   PetscFree(lrows);
358   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
359   ierr = ISDestroy(istmp); CHKERRQ(ierr);
360 
361   /* wait on sends */
362   if (nsends) {
363     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
364     CHKPTRQ(send_status);
365     MPI_Waitall(nsends,send_waits,send_status);
366     PetscFree(send_status);
367   }
368   PetscFree(send_waits); PetscFree(svalues);
369 
370   return 0;
371 }
372 
373 extern int MatMult_SeqDense(Mat A,Vec,Vec);
374 extern int MatMultAdd_SeqDense(Mat A,Vec,Vec,Vec);
375 extern int MatMultTrans_SeqDense(Mat A,Vec,Vec);
376 extern int MatMultTransAdd_SeqDense(Mat A,Vec,Vec,Vec);
377 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
378 {
379   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
380   int          ierr;
381 
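  /* every process stores its m local rows with all N columns, so the entire input
     vector is first gathered into the local work vector lvec; the local dense
     product then produces exactly the locally owned part of yy */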
382   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
383   CHKERRQ(ierr);
384   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
385   CHKERRQ(ierr);
386   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
387   return 0;
388 }
389 
390 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
391 {
392   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
393   int          ierr;
394 
395   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
396   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
397   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
398   return 0;
399 }
400 
401 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
402 {
403   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
404   int          ierr;
405   Scalar       zero = 0.0;
406 
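  /* for yy = A' xx each process multiplies the transpose of its local m x N block
     with its local piece of xx, giving an N-vector of partial sums in lvec; the
     reverse scatter with ADD_VALUES accumulates these partial sums into the
     distributed result yy */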
407   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
408   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
409   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
410          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
411   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
412          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
413   return 0;
414 }
415 
416 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
417 {
418   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
419   int          ierr;
420 
421   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
422   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
423   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
424          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
425   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
426          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
427   return 0;
428 }
429 
430 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
431 {
432   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
433   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
434   int          ierr, len, i, n, m = a->m, radd;
435   Scalar       *x, zero = 0.0;
436 
437   VecSet(&zero,v);
438   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
439   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
440   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
441   len = PetscMin(aloc->m,aloc->n);
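  /* the local block is stored column-major with leading dimension m, so the global
     diagonal entry (rstart+i,rstart+i) lives at v[rstart*m + i*m + i] */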
442   radd = a->rstart*m;
443   for ( i=0; i<len; i++ ) {
444     x[i] = aloc->v[radd + i*m + i];
445   }
446   return 0;
447 }
448 
449 static int MatDestroy_MPIDense(PetscObject obj)
450 {
451   Mat          mat = (Mat) obj;
452   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
453   int          ierr;
454 
455 #if defined(PETSC_LOG)
456   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
457 #endif
458   PetscFree(mdn->rowners);
459   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
460   if (mdn->lvec)   VecDestroy(mdn->lvec);
461   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
462   if (mdn->factor) {
463     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
464     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
465     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
466     PetscFree(mdn->factor);
467   }
468   PetscFree(mdn);
469   PLogObjectDestroy(mat);
470   PetscHeaderDestroy(mat);
471   return 0;
472 }
473 
474 #include "pinclude/pviewer.h"
475 
476 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
477 {
478   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
479   int          ierr;
480   if (mdn->size == 1) {
481     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
482   }
483   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
484   return 0;
485 }
486 
487 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
488 {
489   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
490   int          ierr, format;
491   FILE         *fd;
492   ViewerType   vtype;
493 
494   ViewerGetType(viewer,&vtype);
495   ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
496   ierr = ViewerGetFormat(viewer,&format);
497   if (format == ASCII_FORMAT_INFO_DETAILED) {
498     int nz, nzalloc, mem, rank;
499     MPI_Comm_rank(mat->comm,&rank);
500     ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
501     PetscSequentialPhaseBegin(mat->comm,1);
502       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
503           rank,mdn->m,nz,nzalloc,mem);
504       fflush(fd);
505     PetscSequentialPhaseEnd(mat->comm,1);
506     ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
507     return 0;
508   }
509   else if (format == ASCII_FORMAT_INFO) {
510     return 0;
511   }
512   if (vtype == ASCII_FILE_VIEWER) {
513     PetscSequentialPhaseBegin(mat->comm,1);
514     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
515              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
516     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
517     fflush(fd);
518     PetscSequentialPhaseEnd(mat->comm,1);
519   }
520   else {
521     int size = mdn->size, rank = mdn->rank;
522     if (size == 1) {
523       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
524     }
525     else {
526       /* assemble the entire matrix onto first processor. */
527       Mat          A;
528       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
529       Scalar       *vals;
530       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
531 
532       if (!rank) {
533         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
534       }
535       else {
536         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
537       }
538       PLogObjectParent(mat,A);
539 
540       /* Copy the matrix ... This isn't the most efficient means,
541          but it's quick for now */
542       row = mdn->rstart; m = Amdn->m;
543       for ( i=0; i<m; i++ ) {
544         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
545         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
546         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
547         row++;
548       }
549 
550       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
551       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
552       if (!rank) {
553         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
554       }
555       ierr = MatDestroy(A); CHKERRQ(ierr);
556     }
557   }
558   return 0;
559 }
560 
561 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
562 {
563   Mat          mat = (Mat) obj;
564   int          ierr;
565   ViewerType   vtype;
566 
567   if (!viewer) {
568     viewer = STDOUT_VIEWER_SELF;
569   }
570   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
571   if (vtype == ASCII_FILE_VIEWER) {
572     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
573   }
574   else if (vtype == ASCII_FILES_VIEWER) {
575     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
576   }
577   else if (vtype == BINARY_FILE_VIEWER) {
578     return MatView_MPIDense_Binary(mat,viewer);
579   }
580   return 0;
581 }
582 
583 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
584                              int *nzalloc,int *mem)
585 {
586   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
587   Mat          mdn = mat->A;
588   int          ierr, isend[3], irecv[3];
589 
590   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
591   if (flag == MAT_LOCAL) {
592     if (nz)      *nz      = isend[0];
593     if (nzalloc) *nzalloc = isend[1];
594     if (mem)     *mem     = isend[2];
595   } else if (flag == MAT_GLOBAL_MAX) {
596     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
597     if (nz)      *nz      = irecv[0];
598     if (nzalloc) *nzalloc = irecv[1];
599     if (mem)     *mem     = irecv[2];
600   } else if (flag == MAT_GLOBAL_SUM) {
601     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
602     if (nz)      *nz      = irecv[0];
603     if (nzalloc) *nzalloc = irecv[1];
604     if (mem)     *mem     = irecv[2];
605   }
606   return 0;
607 }
608 
609 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
610    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
611    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
612    extern int MatSolve_MPIDense(Mat,Vec,Vec);
613    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
614    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
615    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
616 
617 static int MatSetOption_MPIDense(Mat A,MatOption op)
618 {
619   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
620 
621   if (op == NO_NEW_NONZERO_LOCATIONS ||
622       op == YES_NEW_NONZERO_LOCATIONS ||
623       op == COLUMNS_SORTED ||
624       op == ROW_ORIENTED) {
625         MatSetOption(a->A,op);
626   }
627   else if (op == ROWS_SORTED ||
628            op == SYMMETRIC_MATRIX ||
629            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
630            op == YES_NEW_DIAGONALS)
631     PLogInfo(A,"Info:MatSetOption_MPIDense:Option ignored\n");
632   else if (op == COLUMN_ORIENTED)
633     { a->roworiented = 0; MatSetOption(a->A,op);}
634   else if (op == NO_NEW_DIAGONALS)
635     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
636   else
637     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
638   return 0;
639 }
640 
641 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
642 {
643   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
644   *m = mat->M; *n = mat->N;
645   return 0;
646 }
647 
648 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
649 {
650   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
651   *m = mat->m; *n = mat->N;
652   return 0;
653 }
654 
655 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
656 {
657   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
658   *m = mat->rstart; *n = mat->rend;
659   return 0;
660 }
661 
662 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
663 {
664   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
665   int          lrow, rstart = mat->rstart, rend = mat->rend;
666 
667   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:Only local rows currently supported");
668   lrow = row - rstart;
669   return MatGetRow(mat->A,lrow,nz,idx,v);
670 }
671 
672 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
673 {
674   if (idx) PetscFree(*idx);
675   if (v) PetscFree(*v);
676   return 0;
677 }
678 
679 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
680 {
681   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
682   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
683   int          ierr, i, j;
684   double       sum = 0.0;
685   Scalar       *v = mat->v;
686 
687   if (mdn->size == 1) {
688     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
689   } else {
690     if (type == NORM_FROBENIUS) {
691       for (i=0; i<mat->n*mat->m; i++ ) {
692 #if defined(PETSC_COMPLEX)
693         sum += real(conj(*v)*(*v)); v++;
694 #else
695         sum += (*v)*(*v); v++;
696 #endif
697       }
698       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
699       *norm = sqrt(*norm);
700       PLogFlops(2*mat->n*mat->m);
701     }
702     else if (type == NORM_1) {
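      /* the 1-norm is the maximum column sum: accumulate |a(i,j)| per column locally,
         sum the per-column totals across processes, then take the maximum */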
703       double *tmp, *tmp2;
704       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
705       tmp2 = tmp + mdn->N;
706       PetscMemzero(tmp,2*mdn->N*sizeof(double));
707       *norm = 0.0;
708       v = mat->v;
709       for ( j=0; j<mat->n; j++ ) {
710         for ( i=0; i<mat->m; i++ ) {
711           tmp[j] += PetscAbsScalar(*v);  v++;
712         }
713       }
714       MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
715       for ( j=0; j<mdn->N; j++ ) {
716         if (tmp2[j] > *norm) *norm = tmp2[j];
717       }
718       PetscFree(tmp);
719       PLogFlops(mat->n*mat->m);
720     }
721     else if (type == NORM_INFINITY) { /* max row norm */
722       double ntemp;
723       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
724       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
725     }
726     else {
727       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
728     }
729   }
730   return 0;
731 }
732 
733 static int MatTranspose_MPIDense(Mat A,Mat *matout)
734 {
735   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
736   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
737   Mat          B;
738   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
739   int          j, i, ierr;
740   Scalar       *v;
741 
742   if (matout == PETSC_NULL && M != N)
743     SETERRQ(1,"MatTranspose_MPIDense:In-place transpose requires a square matrix");
744   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
745          CHKERRQ(ierr);
746 
747   m = Aloc->m; n = Aloc->n; v = Aloc->v;
748   rwork = (int *) PetscMalloc((m+1)*sizeof(int)); CHKPTRQ(rwork);
749   for ( j=0; j<n; j++ ) {
750     for (i=0; i<m; i++) rwork[i] = rstart + i;
751     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
752     v += m;
753   }
754   PetscFree(rwork);
755   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
756   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
757   if (matout != PETSC_NULL) {
758     *matout = B;
759   } else {
760     /* This isn't really an in-place transpose; we free a's data structures and overwrite A's header with B's */
761     PetscFree(a->rowners);
762     ierr = MatDestroy(a->A); CHKERRQ(ierr);
763     if (a->lvec) VecDestroy(a->lvec);
764     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
765     PetscFree(a);
766     PetscMemcpy(A,B,sizeof(struct _Mat));
767     PetscHeaderDestroy(B);
768   }
769   return 0;
770 }
771 
772 #include "pinclude/plapack.h"
773 static int MatScale_MPIDense(Scalar *alpha,Mat inA)
774 {
775   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
776   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
777   int          one = 1, nz;
778 
779   nz = a->m*a->n;
780   BLscal_( &nz, alpha, a->v, &one );
781   PLogFlops(nz);
782   return 0;
783 }
784 
785 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
786 
787 /* -------------------------------------------------------------------*/
788 static struct _MatOps MatOps = {MatSetValues_MPIDense,
789        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
790        MatMult_MPIDense,MatMultAdd_MPIDense,
791        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
792 /*       MatSolve_MPIDense,0, */
793        0,0,
794        0,0,
795        0,0,
796 /*       MatLUFactor_MPIDense,0, */
797        0,MatTranspose_MPIDense,
798        MatGetInfo_MPIDense,0,
799        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
800        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
801        0,
802        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
803        0,0,0,
804 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
805        0,0,
806        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
807        MatGetOwnershipRange_MPIDense,
808        0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
809        0,0,0,MatConvertSameType_MPIDense,
810        0,0,0,0,0,
811        0,0,MatGetValues_MPIDense,0,0,MatScale_MPIDense};
812 
813 /*@C
814    MatCreateMPIDense - Creates a parallel matrix in dense format.
815 
816    Input Parameters:
817 .  comm - MPI communicator
818 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
819 .  n - number of local columns (or PETSC_DECIDE to have it calculated
820            if N is given)
821 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
822 .  N - number of global columns (or PETSC_DECIDE to have it calculated
823            if n is given)
824 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
825    to control all matrix memory allocation.
826 
827    Output Parameter:
828 .  newmat - the matrix
829 
830    Notes:
831    The dense format is fully compatible with standard Fortran 77
832    storage by columns.
833 
834    The data input variable is intended primarily for Fortran programmers
835    who wish to allocate their own matrix memory space.  Most users should
836    set data=PETSC_NULL.
837 
838    The user MUST specify either the local or global matrix dimensions
839    (possibly both).
840 
841    Currently, the only parallel dense matrix decomposition is by rows,
842    so that n=N and each submatrix owns all of the global columns.
843 
844 .keywords: matrix, dense, parallel
845 
846 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
847 @*/
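/*
   A minimal usage sketch (illustrative only; the global size 100 and the single
   entry set below are made-up example values, not part of the implementation):

       Mat    A;
       int    ierr, i = 0, j = 0;
       Scalar one = 1.0;

       ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
                                100,100,PETSC_NULL,&A); CHKERRQ(ierr);
       ierr = MatSetValues(A,1,&i,1,&j,&one,INSERT_VALUES); CHKERRQ(ierr);
       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
       ierr = MatDestroy(A); CHKERRQ(ierr);
*/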
848 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
849 {
850   Mat          mat;
851   Mat_MPIDense *a;
852   int          ierr, i,flg;
853 
854 /* Note:  For now, when data is specified above, this assumes the user correctly
855    allocates the local dense storage space.  We should add error checking. */
856 
857   *newmat         = 0;
858   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
859   PLogObjectCreate(mat);
860   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
861   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
862   mat->destroy    = MatDestroy_MPIDense;
863   mat->view       = MatView_MPIDense;
864   mat->factor     = 0;
865 
866   a->factor = 0;
867   a->insertmode = NOT_SET_VALUES;
868   MPI_Comm_rank(comm,&a->rank);
869   MPI_Comm_size(comm,&a->size);
870 
871   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
872   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
873 
874   /* each process stores all N columns of its block of rows */
875   if (N == PETSC_DECIDE) N = n;
876   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
877   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
878   a->N = mat->N = N;
879   a->M = mat->M = M;
880   a->m = mat->m = m;
881   a->n = mat->n = n;
882 
883   /* build local table of row and column ownerships */
884   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
885   a->cowners = a->rowners + a->size + 1;
886   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
887                        sizeof(Mat_MPIDense));
888   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
889   a->rowners[0] = 0;
890   for ( i=2; i<=a->size; i++ ) {
891     a->rowners[i] += a->rowners[i-1];
892   }
893   a->rstart = a->rowners[a->rank];
894   a->rend   = a->rowners[a->rank+1];
895   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
896   a->cowners[0] = 0;
897   for ( i=2; i<=a->size; i++ ) {
898     a->cowners[i] += a->cowners[i-1];
899   }
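  /* rowners[i] and cowners[i] now give the first global row and column associated
     with process i; rowners[a->size] = M */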
900 
901   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
902   PLogObjectParent(mat,a->A);
903 
904   /* build cache (stash) for entries destined for other processes */
905   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
906 
907   /* stuff used for matrix vector multiply */
908   a->lvec        = 0;
909   a->Mvctx       = 0;
910   a->roworiented = 1;
911 
912   *newmat = mat;
913   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
914   if (flg) {
915     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
916   }
917   return 0;
918 }
919 
920 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
921 {
922   Mat          mat;
923   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
924   int          ierr;
925   FactorCtx    *factor;
926 
927   *newmat       = 0;
928   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
929   PLogObjectCreate(mat);
930   mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
931   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
932   mat->destroy  = MatDestroy_MPIDense;
933   mat->view     = MatView_MPIDense;
934   mat->factor   = A->factor;
935   mat->assembled  = PETSC_TRUE;
936 
937   a->m = mat->m = oldmat->m;
938   a->n = mat->n = oldmat->n;
939   a->M = mat->M = oldmat->M;
940   a->N = mat->N = oldmat->N;
941   if (oldmat->factor) {
942     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
943     /* copy factor contents ... add this code! */
944   } else a->factor = 0;
945 
946   a->rstart     = oldmat->rstart;
947   a->rend       = oldmat->rend;
948   a->size       = oldmat->size;
949   a->rank       = oldmat->rank;
950   a->insertmode = NOT_SET_VALUES;
951 
952   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
953   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
954   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
955   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
956 
957   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
958   PLogObjectParent(mat,a->lvec);
959   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
960   PLogObjectParent(mat,a->Mvctx);
961   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
962   PLogObjectParent(mat,a->A);
963   *newmat = mat;
964   return 0;
965 }
966 
967 #include "sys.h"
968 
969 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
970 {
971   int        *rowners, i,size,rank,m,rstart,rend,ierr,nz,j;
972   Scalar     *array,*vals,*vals_ptr;
973   MPI_Status status;
974 
975   MPI_Comm_rank(comm,&rank);
976   MPI_Comm_size(comm,&size);
977 
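  /* process 0 reads the file (stored by rows) one owner's block at a time and ships
     each block to its owner; every process then copies its block, transposing it into
     the column-major local array obtained from MatGetArray() */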
978   /* determine ownership of all rows */
979   m = M/size + ((M % size) > rank);
980   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
981   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
982   rowners[0] = 0;
983   for ( i=2; i<=size; i++ ) {
984     rowners[i] += rowners[i-1];
985   }
986   rstart = rowners[rank];
987   rend   = rowners[rank+1];
988 
989   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
990   ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);
991 
992   if (!rank) {
993     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
994 
995     /* read in my part of the matrix numerical values  */
996     ierr = PetscBinaryRead(fd,vals,m*N,BINARY_SCALAR); CHKERRQ(ierr);
997 
998     /* insert into the matrix row by row (the local array is stored by columns, which is why we cannot read the file directly into it) */
999     vals_ptr = vals;
1000     for ( i=0; i<m; i++ ) {
1001       for ( j=0; j<N; j++ ) {
1002         array[i + j*m] = *vals_ptr++;
1003       }
1004     }
1005 
1006     /* read in the other processors' parts and ship them out */
1007     for ( i=1; i<size; i++ ) {
1008       nz   = (rowners[i+1] - rowners[i])*N;
1009       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1010       MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);
1011     }
1012   }
1013   else {
1014     /* receive numeric values */
1015     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1016 
1017     /* receive message of values*/
1018     MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);
1019 
1020     /* insert into the matrix row by row (the local array is stored by columns, which is why we cannot read the message directly into it) */
1021     vals_ptr = vals;
1022     for ( i=0; i<m; i++ ) {
1023       for ( j=0; j<N; j++ ) {
1024         array[i + j*m] = *vals_ptr++;
1025       }
1026     }
1027   }
1028   PetscFree(rowners);
1029   PetscFree(vals);
1030   ierr = MatAssemblyBegin(*newmat,FINAL_ASSEMBLY); CHKERRQ(ierr);
1031   ierr = MatAssemblyEnd(*newmat,FINAL_ASSEMBLY); CHKERRQ(ierr);
1032   return 0;
1033 }
1034 
1035 
1036 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1037 {
1038   Mat          A;
1039   int          i, nz, ierr, j,rstart, rend, fd;
1040   Scalar       *vals,*svals;
1041   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1042   MPI_Status   status;
1043   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1044   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1045   int          tag = ((PetscObject)viewer)->tag;
1046 
1047   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1048   if (!rank) {
1049     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1050     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1051     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:Not a matrix object");
1052   }
1053 
1054   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1055   M = header[1]; N = header[2]; nz = header[3];
1056 
1057   /*
1058        Handle case where matrix is stored on disk as a dense matrix
1059   */
1060   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1061     return MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);
1062   }
1063 
1064   /* determine ownership of all rows */
1065   m = M/size + ((M % size) > rank);
1066   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1067   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1068   rowners[0] = 0;
1069   for ( i=2; i<=size; i++ ) {
1070     rowners[i] += rowners[i-1];
1071   }
1072   rstart = rowners[rank];
1073   rend   = rowners[rank+1];
1074 
1075   /* distribute row lengths to all processors */
1076   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1077   offlens = ourlens + (rend-rstart);
1078   if (!rank) {
1079     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1080     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1081     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1082     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1083     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1084     PetscFree(sndcounts);
1085   }
1086   else {
1087     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1088   }
1089 
1090   if (!rank) {
1091     /* calculate the number of nonzeros on each processor */
1092     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1093     PetscMemzero(procsnz,size*sizeof(int));
1094     for ( i=0; i<size; i++ ) {
1095       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1096         procsnz[i] += rowlengths[j];
1097       }
1098     }
1099     PetscFree(rowlengths);
1100 
1101     /* determine max buffer needed and allocate it */
1102     maxnz = 0;
1103     for ( i=0; i<size; i++ ) {
1104       maxnz = PetscMax(maxnz,procsnz[i]);
1105     }
1106     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1107 
1108     /* read in my part of the matrix column indices  */
1109     nz = procsnz[0];
1110     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1111     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1112 
1113     /* read in everyone else's part and ship it off */
1114     for ( i=1; i<size; i++ ) {
1115       nz = procsnz[i];
1116       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1117       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1118     }
1119     PetscFree(cols);
1120   }
1121   else {
1122     /* determine buffer space needed for message */
1123     nz = 0;
1124     for ( i=0; i<m; i++ ) {
1125       nz += ourlens[i];
1126     }
1127     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1128 
1129     /* receive message of column indices*/
1130     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1131     MPI_Get_count(&status,MPI_INT,&maxnz);
1132     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
1133   }
1134 
1135   /* loop over local rows, determining number of off diagonal entries */
1136   PetscMemzero(offlens,m*sizeof(int));
1137   jj = 0;
1138   for ( i=0; i<m; i++ ) {
1139     for ( j=0; j<ourlens[i]; j++ ) {
1140       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1141       jj++;
1142     }
1143   }
1144 
1145   /* create our matrix */
1146   for ( i=0; i<m; i++ ) {
1147     ourlens[i] -= offlens[i];
1148   }
1149   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1150   A = *newmat;
1151   for ( i=0; i<m; i++ ) {
1152     ourlens[i] += offlens[i];
1153   }
1154 
1155   if (!rank) {
1156     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1157 
1158     /* read in my part of the matrix numerical values  */
1159     nz = procsnz[0];
1160     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1161 
1162     /* insert into matrix */
1163     jj      = rstart;
1164     smycols = mycols;
1165     svals   = vals;
1166     for ( i=0; i<m; i++ ) {
1167       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1168       smycols += ourlens[i];
1169       svals   += ourlens[i];
1170       jj++;
1171     }
1172 
1173     /* read in the other processors' parts and ship them out */
1174     for ( i=1; i<size; i++ ) {
1175       nz = procsnz[i];
1176       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1177       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1178     }
1179     PetscFree(procsnz);
1180   }
1181   else {
1182     /* receive numeric values */
1183     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1184 
1185     /* receive message of values*/
1186     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1187     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1188     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
1189 
1190     /* insert into matrix */
1191     jj      = rstart;
1192     smycols = mycols;
1193     svals   = vals;
1194     for ( i=0; i<m; i++ ) {
1195       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1196       smycols += ourlens[i];
1197       svals   += ourlens[i];
1198       jj++;
1199     }
1200   }
1201   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1202 
1203   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1204   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1205   return 0;
1206 }
1207 
1208 
1209 
1210 
1211 
1212