xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 6a67db7e1512b5d9ff9ba8f94e48d05f0a9eae3d)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.47 1996/08/15 16:05:25 bsmith Exp curfman $";
3 #endif
4 
5 /*
6    Basic functions for basic parallel dense matrices.
7 */
8 
9 #include "src/mat/impls/dense/mpi/mpidense.h"
10 #include "src/vec/vecimpl.h"
11 
12 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
13 {
14   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
15   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
16   int          roworiented = A->roworiented;
17 
18   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
19     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
20   }
21   A->insertmode = addv;
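  /* v is logically m by n: with row-oriented input entry (i,j) sits at
     v[i*n+j]; with column-oriented input it sits at v[i+j*m] */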
22   for ( i=0; i<m; i++ ) {
23     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
24     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
25     if (idxm[i] >= rstart && idxm[i] < rend) {
26       row = idxm[i] - rstart;
27       if (roworiented) {
28         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
29       }
30       else {
31         for ( j=0; j<n; j++ ) {
32           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
33           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
34           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
35         }
36       }
37     }
38     else {
39       if (roworiented) {
40         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
41       }
42       else { /* must stash each separately */
43         row = idxm[i];
44         for ( j=0; j<n; j++ ) {
45           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
46         }
47       }
48     }
49   }
50   return 0;
51 }
52 
53 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
54 {
55   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
56   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
57 
58   for ( i=0; i<m; i++ ) {
59     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
60     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
61     if (idxm[i] >= rstart && idxm[i] < rend) {
62       row = idxm[i] - rstart;
63       for ( j=0; j<n; j++ ) {
64         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
65         if (idxn[j] >= mdn->N)
66           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
67         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
68       }
69     }
70     else {
71       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
72     }
73   }
74   return 0;
75 }
76 
77 static int MatGetArray_MPIDense(Mat A,Scalar **array)
78 {
79   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
80   int ierr;
81 
82   ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
83   return 0;
84 }
85 
86 static int MatRestoreArray_MPIDense(Mat A,Scalar **array)
87 {
88   return 0;
89 }
90 
91 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
92 {
93   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
94   MPI_Comm     comm = mat->comm;
95   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
96   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
97   int          tag = mat->tag, *owner,*starts,count,ierr;
98   InsertMode   addv;
99   MPI_Request  *send_waits,*recv_waits;
100   Scalar       *rvalues,*svalues;
101 
102   /* make sure all processors are either in INSERT_VALUES or ADD_VALUES mode */
103   MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
104   if (addv == (ADD_VALUES|INSERT_VALUES)) {
105     SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
106   }
107   mdn->insertmode = addv; /* in case this processor had no cache */
108 
109   /*  first count number of contributors to each processor */
110   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
111   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
112   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
113   for ( i=0; i<mdn->stash.n; i++ ) {
114     idx = mdn->stash.idx[i];
115     for ( j=0; j<size; j++ ) {
116       if (idx >= owners[j] && idx < owners[j+1]) {
117         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
118       }
119     }
120   }
121   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
122 
123   /* inform other processors of number of messages and max length*/
124   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
125   MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
126   nreceives = work[rank];
127   MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
128   nmax = work[rank];
129   PetscFree(work);
130 
131   /* post receives:
132        1) each message will consist of ordered triples
133      (global row index,global column index,value); we store the indices
134      as Scalars to simplify the message passing.
135        2) since we don't know how long each individual message is we
136      allocate the largest needed buffer for each receive. Potentially
137      this is a lot of wasted space.
138 
139        This could be done better.
140   */
141   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
142   CHKPTRQ(rvalues);
143   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
144   CHKPTRQ(recv_waits);
145   for ( i=0; i<nreceives; i++ ) {
146     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
147   }
148 
149   /* do sends:
150       1) starts[i] gives the starting index in svalues for stuff going to
151          the ith processor
152   */
153   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
154   CHKPTRQ(svalues);
155   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
156   CHKPTRQ(send_waits);
157   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
158   starts[0] = 0;
159   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
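  /* pack each stashed entry as a (row,col,value) triple of Scalars;
     starts[owner] is advanced so entries bound for the same processor
     are stored contiguously in svalues */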
160   for ( i=0; i<mdn->stash.n; i++ ) {
161     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
162     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
163     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
164   }
165   PetscFree(owner);
166   starts[0] = 0;
167   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
168   count = 0;
169   for ( i=0; i<size; i++ ) {
170     if (procs[i]) {
171       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);
172     }
173   }
174   PetscFree(starts); PetscFree(nprocs);
175 
176   /* Free cache space */
177   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
178 
179   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
180   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
181   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
182   mdn->rmax       = nmax;
183 
184   return 0;
185 }
186 extern int MatSetUpMultiply_MPIDense(Mat);
187 
188 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
189 {
190   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
191   MPI_Status   *send_status,recv_status;
192   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
193   Scalar       *values,val;
194   InsertMode   addv = mdn->insertmode;
195 
196   /*  wait on receives */
197   while (count) {
198     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
199     /* unpack receives into our local space */
200     values = mdn->rvalues + 3*imdex*mdn->rmax;
201     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
202     n = n/3;
203     for ( i=0; i<n; i++ ) {
204       row = (int) PetscReal(values[3*i]) - mdn->rstart;
205       col = (int) PetscReal(values[3*i+1]);
206       val = values[3*i+2];
207       if (col >= 0 && col < mdn->N) {
208         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
209       }
210       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
211     }
212     count--;
213   }
214   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
215 
216   /* wait on sends */
217   if (mdn->nsends) {
218     send_status = (MPI_Status *) PetscMalloc(mdn->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
219     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
220     PetscFree(send_status);
221   }
222   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
223 
224   mdn->insertmode = NOT_SET_VALUES;
225   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
226   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
227 
228   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
229     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
230   }
231   return 0;
232 }
233 
234 static int MatZeroEntries_MPIDense(Mat A)
235 {
236   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
237   return MatZeroEntries(l->A);
238 }
239 
240 static int MatGetBlockSize_MPIDense(Mat A,int *bs)
241 {
242   *bs = 1;
243   return 0;
244 }
245 
246 /* The code does not handle the diagonal entries correctly unless the
247    matrix is square and the column and row ownerships are identical.
248    This is a BUG. The only way to fix it seems to be to access
249    the local dense matrix (mdn->A) directly rather than going through
250    the MatZeroRows() routine.
251 */
252 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
253 {
254   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
255   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
256   int            *procs,*nprocs,j,found,idx,nsends,*work;
257   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
258   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
259   int            *lens,imdex,*lrows,*values;
260   MPI_Comm       comm = A->comm;
261   MPI_Request    *send_waits,*recv_waits;
262   MPI_Status     recv_status,*send_status;
263   IS             istmp;
264 
265   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
266   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
267 
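  /* strategy: ship each global row index to the processor that owns that row;
     every processor then zeros the rows it received via MatZeroRows() on its
     local sequential block */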
268   /*  first count number of contributors to each processor */
269   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
270   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
271   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
272   for ( i=0; i<N; i++ ) {
273     idx = rows[i];
274     found = 0;
275     for ( j=0; j<size; j++ ) {
276       if (idx >= owners[j] && idx < owners[j+1]) {
277         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
278       }
279     }
280     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
281   }
282   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
283 
284   /* inform other processors of number of messages and max length*/
285   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
286   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
287   nrecvs = work[rank];
288   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
289   nmax = work[rank];
290   PetscFree(work);
291 
292   /* post receives:   */
293   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
294   CHKPTRQ(rvalues);
295   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
296   CHKPTRQ(recv_waits);
297   for ( i=0; i<nrecvs; i++ ) {
298     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
299   }
300 
301   /* do sends:
302       1) starts[i] gives the starting index in svalues for stuff going to
303          the ith processor
304   */
305   svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
306   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
307   starts     = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
308   starts[0]  = 0;
309   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
310   for ( i=0; i<N; i++ ) {
311     svalues[starts[owner[i]]++] = rows[i];
312   }
313   ISRestoreIndices(is,&rows);
314 
315   starts[0] = 0;
316   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
317   count = 0;
318   for ( i=0; i<size; i++ ) {
319     if (procs[i]) {
320       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
321     }
322   }
323   PetscFree(starts);
324 
325   base = owners[rank];
326 
327   /*  wait on receives */
328   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
329   source = lens + nrecvs;
330   count  = nrecvs; slen = 0;
331   while (count) {
332     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
333     /* unpack receives into our local space */
334     MPI_Get_count(&recv_status,MPI_INT,&n);
335     source[imdex]  = recv_status.MPI_SOURCE;
336     lens[imdex]  = n;
337     slen += n;
338     count--;
339   }
340   PetscFree(recv_waits);
341 
342   /* gather the received row indices into a single local list */
343   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
344   count = 0;
345   for ( i=0; i<nrecvs; i++ ) {
346     values = rvalues + i*nmax;
347     for ( j=0; j<lens[i]; j++ ) {
348       lrows[count++] = values[j] - base;
349     }
350   }
351   PetscFree(rvalues); PetscFree(lens);
352   PetscFree(owner); PetscFree(nprocs);
353 
354   /* actually zap the local rows */
355   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
356   PLogObjectParent(A,istmp);
357   PetscFree(lrows);
358   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
359   ierr = ISDestroy(istmp); CHKERRQ(ierr);
360 
361   /* wait on sends */
362   if (nsends) {
363     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
364     MPI_Waitall(nsends,send_waits,send_status);
365     PetscFree(send_status);
366   }
367   PetscFree(send_waits); PetscFree(svalues);
368 
369   return 0;
370 }
371 
372 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
373 {
374   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
375   int          ierr;
376 
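  /* every processor needs the entire input vector xx, since its local block
     contains all N columns; gather xx into the work vector lvec first */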
377   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
378   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
379   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
380   return 0;
381 }
382 
383 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
384 {
385   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
386   int          ierr;
387 
388   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
389   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
390   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
391   return 0;
392 }
393 
394 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
395 {
396   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
397   int          ierr;
398   Scalar       zero = 0.0;
399 
400   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
401   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
402   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
403   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
404   return 0;
405 }
406 
407 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
408 {
409   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
410   int          ierr;
411 
412   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
413   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
414   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
415   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
416   return 0;
417 }
418 
419 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
420 {
421   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
422   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
423   int          ierr, len, i, n, m = a->m, radd;
424   Scalar       *x, zero = 0.0;
425 
426   VecSet(&zero,v);
427   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
428   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
429   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
430   len = PetscMin(aloc->m,aloc->n);
431   radd = a->rstart*m;
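  /* the local block is stored column-major with leading dimension m, so the
     global diagonal entry (rstart+i,rstart+i) is local entry (i,rstart+i),
     i.e. v[rstart*m + i*m + i] */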
432   for ( i=0; i<len; i++ ) {
433     x[i] = aloc->v[radd + i*m + i];
434   }
435   return 0;
436 }
437 
438 static int MatDestroy_MPIDense(PetscObject obj)
439 {
440   Mat          mat = (Mat) obj;
441   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
442   int          ierr;
443 
444 #if defined(PETSC_LOG)
445   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
446 #endif
447   PetscFree(mdn->rowners);
448   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
449   if (mdn->lvec)   VecDestroy(mdn->lvec);
450   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
451   if (mdn->factor) {
452     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
453     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
454     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
455     PetscFree(mdn->factor);
456   }
457   PetscFree(mdn);
458   PLogObjectDestroy(mat);
459   PetscHeaderDestroy(mat);
460   return 0;
461 }
462 
463 #include "pinclude/pviewer.h"
464 
465 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
466 {
467   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
468   int          ierr;
469 
470   if (mdn->size == 1) {
471     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
472   }
473   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
474   return 0;
475 }
476 
477 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
478 {
479   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
480   int          ierr, format;
481   FILE         *fd;
482   ViewerType   vtype;
483 
484   ViewerGetType(viewer,&vtype);
485   ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
486   ierr = ViewerGetFormat(viewer,&format);
487   if (format == ASCII_FORMAT_INFO_DETAILED) {
488     int rank;
489     MatInfo info;
490     MPI_Comm_rank(mat->comm,&rank);
491     ierr = MatGetInfo(mat,MAT_LOCAL,&info);
492     PetscSequentialPhaseBegin(mat->comm,1);
493       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
494          (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
495       fflush(fd);
496     PetscSequentialPhaseEnd(mat->comm,1);
497     ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
498     return 0;
499   }
500   else if (format == ASCII_FORMAT_INFO) {
501     return 0;
502   }
503   if (vtype == ASCII_FILE_VIEWER) {
504     PetscSequentialPhaseBegin(mat->comm,1);
505     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
506              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
507     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
508     fflush(fd);
509     PetscSequentialPhaseEnd(mat->comm,1);
510   }
511   else {
512     int size = mdn->size, rank = mdn->rank;
513     if (size == 1) {
514       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
515     }
516     else {
517       /* assemble the entire matrix onto first processor. */
518       Mat          A;
519       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
520       Scalar       *vals;
521       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
522 
523       if (!rank) {
524         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
525       }
526       else {
527         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
528       }
529       PLogObjectParent(mat,A);
530 
531       /* Copy the matrix ... This isn't the most efficient means,
532          but it's quick for now */
533       row = mdn->rstart; m = Amdn->m;
534       for ( i=0; i<m; i++ ) {
535         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
536         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
537         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
538         row++;
539       }
540 
541       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
542       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
543       if (!rank) {
544         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
545       }
546       ierr = MatDestroy(A); CHKERRQ(ierr);
547     }
548   }
549   return 0;
550 }
551 
552 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
553 {
554   Mat          mat = (Mat) obj;
555   int          ierr;
556   ViewerType   vtype;
557 
558   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
559   if (vtype == ASCII_FILE_VIEWER) {
560     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
561   }
562   else if (vtype == ASCII_FILES_VIEWER) {
563     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
564   }
565   else if (vtype == BINARY_FILE_VIEWER) {
566     return MatView_MPIDense_Binary(mat,viewer);
567   }
568   return 0;
569 }
570 
571 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
572 {
573   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
574   Mat          mdn = mat->A;
575   int          ierr;
576   double       isend[5], irecv[5];
577 
578   info->rows_global    = (double)mat->M;
579   info->columns_global = (double)mat->N;
580   info->rows_local     = (double)mat->m;
581   info->columns_local  = (double)mat->N;
582   info->block_size     = 1.0;
583   ierr = MatGetInfo(mdn,MAT_LOCAL,info); CHKERRQ(ierr);
584   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
585   isend[3] = info->memory;  isend[4] = info->mallocs;
586   if (flag == MAT_LOCAL) {
587     info->nz_used      = isend[0];
588     info->nz_allocated = isend[1];
589     info->nz_unneeded  = isend[2];
590     info->memory       = isend[3];
591     info->mallocs      = isend[4];
592   } else if (flag == MAT_GLOBAL_MAX) {
593     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);
594     info->nz_used      = irecv[0];
595     info->nz_allocated = irecv[1];
596     info->nz_unneeded  = irecv[2];
597     info->memory       = irecv[3];
598     info->mallocs      = irecv[4];
599   } else if (flag == MAT_GLOBAL_SUM) {
600     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);
601     info->nz_used      = irecv[0];
602     info->nz_allocated = irecv[1];
603     info->nz_unneeded  = irecv[2];
604     info->memory       = irecv[3];
605     info->mallocs      = irecv[4];
606   }
607   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
608   info->fill_ratio_needed = 0;
609   info->factor_mallocs    = 0;
610   return 0;
611 }
612 
613 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
614    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
615    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
616    extern int MatSolve_MPIDense(Mat,Vec,Vec);
617    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
618    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
619    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
620 
621 static int MatSetOption_MPIDense(Mat A,MatOption op)
622 {
623   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
624 
625   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
626       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
627       op == MAT_COLUMNS_SORTED ||
628       op == MAT_ROW_ORIENTED) {
629         MatSetOption(a->A,op);
630   }
631   else if (op == MAT_ROWS_SORTED ||
632            op == MAT_SYMMETRIC ||
633            op == MAT_STRUCTURALLY_SYMMETRIC ||
634            op == MAT_YES_NEW_DIAGONALS)
635     PLogInfo(A,"Info:MatSetOption_MPIDense:Option ignored\n");
636   else if (op == MAT_COLUMN_ORIENTED)
637     { a->roworiented = 0; MatSetOption(a->A,op);}
638   else if (op == MAT_NO_NEW_DIAGONALS)
639     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:MAT_NO_NEW_DIAGONALS");}
640   else
641     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
642   return 0;
643 }
644 
645 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
646 {
647   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
648   *m = mat->M; *n = mat->N;
649   return 0;
650 }
651 
652 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
653 {
654   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
655   *m = mat->m; *n = mat->N;
656   return 0;
657 }
658 
659 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
660 {
661   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
662   *m = mat->rstart; *n = mat->rend;
663   return 0;
664 }
665 
666 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
667 {
668   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
669   int          lrow, rstart = mat->rstart, rend = mat->rend;
670 
671   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
672   lrow = row - rstart;
673   return MatGetRow(mat->A,lrow,nz,idx,v);
674 }
675 
676 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
677 {
678   if (idx) PetscFree(*idx);
679   if (v) PetscFree(*v);
680   return 0;
681 }
682 
683 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
684 {
685   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
686   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
687   int          ierr, i, j;
688   double       sum = 0.0;
689   Scalar       *v = mat->v;
690 
691   if (mdn->size == 1) {
692     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
693   } else {
694     if (type == NORM_FROBENIUS) {
695       for (i=0; i<mat->n*mat->m; i++ ) {
696 #if defined(PETSC_COMPLEX)
697         sum += real(conj(*v)*(*v)); v++;
698 #else
699         sum += (*v)*(*v); v++;
700 #endif
701       }
702       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
703       *norm = sqrt(*norm);
704       PLogFlops(2*mat->n*mat->m);
705     }
706     else if (type == NORM_1) {
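      /* 1-norm = max column sum: each processor sums its m local entries of
         every column, the partial sums are added across processors, and the
         largest total is taken */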
707       double *tmp, *tmp2;
708       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
709       tmp2 = tmp + mdn->N;
710       PetscMemzero(tmp,2*mdn->N*sizeof(double));
711       *norm = 0.0;
712       v = mat->v;
713       for ( j=0; j<mat->n; j++ ) {
714         for ( i=0; i<mat->m; i++ ) {
715           tmp[j] += PetscAbsScalar(*v);  v++;
716         }
717       }
718       MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
719       for ( j=0; j<mdn->N; j++ ) {
720         if (tmp2[j] > *norm) *norm = tmp2[j];
721       }
722       PetscFree(tmp);
723       PLogFlops(mat->n*mat->m);
724     }
725     else if (type == NORM_INFINITY) { /* max row norm */
726       double ntemp;
727       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
728       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
729     }
730     else {
731       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
732     }
733   }
734   return 0;
735 }
736 
737 static int MatTranspose_MPIDense(Mat A,Mat *matout)
738 {
739   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
740   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
741   Mat          B;
742   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
743   int          j, i, ierr;
744   Scalar       *v;
745 
746   if (matout == PETSC_NULL && M != N) {
747     SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
748   }
749   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);
750 
751   m = Aloc->m; n = Aloc->n; v = Aloc->v;
752   rwork = (int *) PetscMalloc((m+1)*sizeof(int)); CHKPTRQ(rwork);
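  /* column j of the local block (m entries, contiguous in v) provides the
     values of global row j of B in columns rstart .. rstart+m-1 */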
753   for ( j=0; j<n; j++ ) {
754     for (i=0; i<m; i++) rwork[i] = rstart + i;
755     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
756     v   += m;
757   }
758   PetscFree(rwork);
759   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
760   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
761   if (matout != PETSC_NULL) {
762     *matout = B;
763   } else {
764     /* This isn't really an in-place transpose: free the data structures of a, then copy B's contents over A */
765     PetscFree(a->rowners);
766     ierr = MatDestroy(a->A); CHKERRQ(ierr);
767     if (a->lvec) VecDestroy(a->lvec);
768     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
769     PetscFree(a);
770     PetscMemcpy(A,B,sizeof(struct _Mat));
771     PetscHeaderDestroy(B);
772   }
773   return 0;
774 }
775 
776 #include "pinclude/plapack.h"
777 static int MatScale_MPIDense(Scalar *alpha,Mat inA)
778 {
779   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
780   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
781   int          one = 1, nz;
782 
783   nz = a->m*a->n;
784   BLscal_( &nz, alpha, a->v, &one );
785   PLogFlops(nz);
786   return 0;
787 }
788 
789 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
790 
791 /* -------------------------------------------------------------------*/
792 static struct _MatOps MatOps = {MatSetValues_MPIDense,
793        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
794        MatMult_MPIDense,MatMultAdd_MPIDense,
795        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
796 /*       MatSolve_MPIDense,0, */
797        0,0,
798        0,0,
799        0,0,
800 /*       MatLUFactor_MPIDense,0, */
801        0,MatTranspose_MPIDense,
802        MatGetInfo_MPIDense,0,
803        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
804        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
805        0,
806        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
807        0,0,0,
808 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
809        0,0,
810        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
811        MatGetOwnershipRange_MPIDense,
812        0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
813        0,MatConvertSameType_MPIDense,
814        0,0,0,0,0,
815        0,0,MatGetValues_MPIDense,0,0,MatScale_MPIDense,
816        0,0,0,MatGetBlockSize_MPIDense};
817 
818 /*@C
819    MatCreateMPIDense - Creates a parallel matrix in dense format.
820 
821    Input Parameters:
822 .  comm - MPI communicator
823 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
824 .  n - number of local columns (or PETSC_DECIDE to have it calculated
825            if N is given)
826 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
827 .  N - number of global columns (or PETSC_DECIDE to have it calculated
828            if n is given)
829 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
830    to control all matrix memory allocation.
831 
832    Output Parameter:
833 .  A - the matrix
834 
835    Notes:
836    The dense format is fully compatible with standard Fortran 77
837    storage by columns.
838 
839    The data input variable is intended primarily for Fortran programmers
840    who wish to allocate their own matrix memory space.  Most users should
841    set data=PETSC_NULL.
842 
843    The user MUST specify either the local or global matrix dimensions
844    (possibly both).
845 
846    Currently, the only parallel dense matrix decomposition is by rows,
847    so that n=N and each submatrix owns all of the global columns.
848 
849 .keywords: matrix, dense, parallel
850 
851 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
852 @*/
853 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
854 {
855   Mat          mat;
856   Mat_MPIDense *a;
857   int          ierr, i,flg;
858 
859   /* Note:  For now, when data is specified above, this assumes the user correctly
860    allocates the local dense storage space.  We should add error checking. */
861 
862   *A = 0;
863   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
864   PLogObjectCreate(mat);
865   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
866   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
867   mat->destroy    = MatDestroy_MPIDense;
868   mat->view       = MatView_MPIDense;
869   mat->factor     = 0;
870 
871   a->factor     = 0;
872   a->insertmode = NOT_SET_VALUES;
873   MPI_Comm_rank(comm,&a->rank);
874   MPI_Comm_size(comm,&a->size);
875 
876   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
877   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
878 
879   /* each processor's row block stores all the columns */
880   if (N == PETSC_DECIDE) N = n;
881   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
882   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
883   a->N = mat->N = N;
884   a->M = mat->M = M;
885   a->m = mat->m = m;
886   a->n = mat->n = n;
887 
888   /* build local table of row and column ownerships */
889   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
890   a->cowners = a->rowners + a->size + 1;
891   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
892   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
893   a->rowners[0] = 0;
894   for ( i=2; i<=a->size; i++ ) {
895     a->rowners[i] += a->rowners[i-1];
896   }
897   a->rstart = a->rowners[a->rank];
898   a->rend   = a->rowners[a->rank+1];
899   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
900   a->cowners[0] = 0;
901   for ( i=2; i<=a->size; i++ ) {
902     a->cowners[i] += a->cowners[i-1];
903   }
904 
905   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
906   PLogObjectParent(mat,a->A);
907 
908   /* build cache for off-processor entries */
909   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
910 
911   /* stuff used for matrix vector multiply */
912   a->lvec        = 0;
913   a->Mvctx       = 0;
914   a->roworiented = 1;
915 
916   *A = mat;
917   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
918   if (flg) {
919     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
920   }
921   return 0;
922 }
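
/*
   A minimal usage sketch for MatCreateMPIDense (illustrative only; the
   variables comm, M and N and the surrounding PetscInitialize()/PetscFinalize()
   calls are assumed, and error codes should be checked with CHKERRQ as is done
   elsewhere in this file):

      Mat    C;
      int    ierr, i, j, rstart, rend;
      Scalar v;

      ierr = MatCreateMPIDense(comm,PETSC_DECIDE,PETSC_DECIDE,M,N,PETSC_NULL,&C);
      ierr = MatGetOwnershipRange(C,&rstart,&rend);
      for ( i=rstart; i<rend; i++ ) {
        for ( j=0; j<N; j++ ) {
          v    = (Scalar) (i + j);
          ierr = MatSetValues(C,1,&i,1,&j,&v,INSERT_VALUES);
        }
      }
      ierr = MatAssemblyBegin(C,MAT_FINAL_ASSEMBLY);
      ierr = MatAssemblyEnd(C,MAT_FINAL_ASSEMBLY);
      ierr = MatDestroy(C);
*/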
923 
924 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
925 {
926   Mat          mat;
927   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
928   int          ierr;
929   FactorCtx    *factor;
930 
931   *newmat       = 0;
932   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
933   PLogObjectCreate(mat);
934   mat->data      = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
935   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
936   mat->destroy   = MatDestroy_MPIDense;
937   mat->view      = MatView_MPIDense;
938   mat->factor    = A->factor;
939   mat->assembled = PETSC_TRUE;
940 
941   a->m = mat->m = oldmat->m;
942   a->n = mat->n = oldmat->n;
943   a->M = mat->M = oldmat->M;
944   a->N = mat->N = oldmat->N;
945   if (oldmat->factor) {
946     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
947     /* copy factor contents ... add this code! */
948   } else a->factor = 0;
949 
950   a->rstart     = oldmat->rstart;
951   a->rend       = oldmat->rend;
952   a->size       = oldmat->size;
953   a->rank       = oldmat->rank;
954   a->insertmode = NOT_SET_VALUES;
955 
956   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
957   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
958   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
959   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
960 
961   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
962   PLogObjectParent(mat,a->lvec);
963   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
964   PLogObjectParent(mat,a->Mvctx);
965   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
966   PLogObjectParent(mat,a->A);
967   *newmat = mat;
968   return 0;
969 }
970 
971 #include "sys.h"
972 
973 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
974 {
975   int        *rowners, i,size,rank,m,rstart,rend,ierr,nz,j;
976   Scalar     *array,*vals,*vals_ptr;
977   MPI_Status status;
978 
979   MPI_Comm_rank(comm,&rank);
980   MPI_Comm_size(comm,&size);
981 
982   /* determine ownership of all rows */
983   m = M/size + ((M % size) > rank);
984   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
985   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
986   rowners[0] = 0;
987   for ( i=2; i<=size; i++ ) {
988     rowners[i] += rowners[i-1];
989   }
990   rstart = rowners[rank];
991   rend   = rowners[rank+1];
992 
993   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
994   ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);
995 
996   if (!rank) {
997     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
998 
999     /* read in my part of the matrix numerical values  */
1000     ierr = PetscBinaryRead(fd,vals,m*N,BINARY_SCALAR); CHKERRQ(ierr);
1001 
1002     /* insert into the matrix row by row (this is why we cannot read directly into array) */
1003     vals_ptr = vals;
1004     for ( i=0; i<m; i++ ) {
1005       for ( j=0; j<N; j++ ) {
1006         array[i + j*m] = *vals_ptr++;
1007       }
1008     }
1009 
1010     /* read in the other processors' parts and ship them out */
1011     for ( i=1; i<size; i++ ) {
1012       nz   = (rowners[i+1] - rowners[i])*N;
1013       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1014       MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);
1015     }
1016   }
1017   else {
1018     /* receive numeric values */
1019     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1020 
1021     /* receive message of values*/
1022     MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);
1023 
1024     /* insert into the matrix row by row (this is why we cannot read directly into array) */
1025     vals_ptr = vals;
1026     for ( i=0; i<m; i++ ) {
1027       for ( j=0; j<N; j++ ) {
1028         array[i + j*m] = *vals_ptr++;
1029       }
1030     }
1031   }
1032   PetscFree(rowners);
1033   PetscFree(vals);
1034   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1035   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1036   return 0;
1037 }
1038 
1039 
1040 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1041 {
1042   Mat          A;
1043   int          i, nz, ierr, j,rstart, rend, fd;
1044   Scalar       *vals,*svals;
1045   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1046   MPI_Status   status;
1047   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1048   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1049   int          tag = ((PetscObject)viewer)->tag;
1050 
1051   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1052   if (!rank) {
1053     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1054     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1055     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:not matrix object");
1056   }
1057 
1058   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1059   M = header[1]; N = header[2]; nz = header[3];
1060 
1061   /*
1062        Handle case where matrix is stored on disk as a dense matrix
1063   */
1064   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1065     return MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);
1066   }
1067 
1068   /* determine ownership of all rows */
1069   m = M/size + ((M % size) > rank);
1070   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1071   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1072   rowners[0] = 0;
1073   for ( i=2; i<=size; i++ ) {
1074     rowners[i] += rowners[i-1];
1075   }
1076   rstart = rowners[rank];
1077   rend   = rowners[rank+1];
1078 
1079   /* distribute row lengths to all processors */
1080   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1081   offlens = ourlens + (rend-rstart);
1082   if (!rank) {
1083     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1084     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1085     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1086     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1087     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1088     PetscFree(sndcounts);
1089   }
1090   else {
1091     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1092   }
1093 
1094   if (!rank) {
1095     /* calculate the number of nonzeros on each processor */
1096     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1097     PetscMemzero(procsnz,size*sizeof(int));
1098     for ( i=0; i<size; i++ ) {
1099       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1100         procsnz[i] += rowlengths[j];
1101       }
1102     }
1103     PetscFree(rowlengths);
1104 
1105     /* determine max buffer needed and allocate it */
1106     maxnz = 0;
1107     for ( i=0; i<size; i++ ) {
1108       maxnz = PetscMax(maxnz,procsnz[i]);
1109     }
1110     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1111 
1112     /* read in my part of the matrix column indices  */
1113     nz = procsnz[0];
1114     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1115     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1116 
1117     /* read in everyone else's part and ship it off */
1118     for ( i=1; i<size; i++ ) {
1119       nz = procsnz[i];
1120       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1121       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1122     }
1123     PetscFree(cols);
1124   }
1125   else {
1126     /* determine buffer space needed for message */
1127     nz = 0;
1128     for ( i=0; i<m; i++ ) {
1129       nz += ourlens[i];
1130     }
1131     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1132 
1133     /* receive message of column indices*/
1134     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1135     MPI_Get_count(&status,MPI_INT,&maxnz);
1136     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1137   }
1138 
1139   /* loop over local rows, determining number of off diagonal entries */
1140   PetscMemzero(offlens,m*sizeof(int));
1141   jj = 0;
1142   for ( i=0; i<m; i++ ) {
1143     for ( j=0; j<ourlens[i]; j++ ) {
1144       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1145       jj++;
1146     }
1147   }
1148 
1149   /* create our matrix */
1150   for ( i=0; i<m; i++ ) {
1151     ourlens[i] -= offlens[i];
1152   }
1153   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1154   A = *newmat;
1155   for ( i=0; i<m; i++ ) {
1156     ourlens[i] += offlens[i];
1157   }
1158 
1159   if (!rank) {
1160     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1161 
1162     /* read in my part of the matrix numerical values  */
1163     nz = procsnz[0];
1164     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1165 
1166     /* insert into matrix */
1167     jj      = rstart;
1168     smycols = mycols;
1169     svals   = vals;
1170     for ( i=0; i<m; i++ ) {
1171       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1172       smycols += ourlens[i];
1173       svals   += ourlens[i];
1174       jj++;
1175     }
1176 
1177     /* read in the other processors' parts and ship them out */
1178     for ( i=1; i<size; i++ ) {
1179       nz = procsnz[i];
1180       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1181       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1182     }
1183     PetscFree(procsnz);
1184   }
1185   else {
1186     /* receive numeric values */
1187     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1188 
1189     /* receive message of values*/
1190     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1191     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1192     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1193 
1194     /* insert into matrix */
1195     jj      = rstart;
1196     smycols = mycols;
1197     svals   = vals;
1198     for ( i=0; i<m; i++ ) {
1199       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1200       smycols += ourlens[i];
1201       svals   += ourlens[i];
1202       jj++;
1203     }
1204   }
1205   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1206 
1207   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1208   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1209   return 0;
1210 }
1211 