xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 6d84be18fbb99ba69be7b8bdde5411a66955b7ea)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.28 1996/02/01 18:52:35 curfman Exp bsmith $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "mpidense.h"
10 #include "vec/vecimpl.h"
11 
12 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
13                                int *idxn,Scalar *v,InsertMode addv)
14 {
15   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
16   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
17   int          roworiented = A->roworiented;
18 
19   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
20     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
21   }
22   A->insertmode = addv;
23   for ( i=0; i<m; i++ ) {
24     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
25     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
26     if (idxm[i] >= rstart && idxm[i] < rend) {
27       row = idxm[i] - rstart;
28       if (roworiented) {
29         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
30       }
31       else {
32         for ( j=0; j<n; j++ ) {
33           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
34           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
35           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
36         }
37       }
38     }
39     else {
40       if (roworiented) {
41         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
42       }
43       else { /* must stash each separately */
44         row = idxm[i];
45         for ( j=0; j<n; j++ ) {
46           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
47                  CHKERRQ(ierr);
48         }
49       }
50     }
51   }
52   return 0;
53 }
54 
55 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
56 {
57   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
58   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
59 
60   for ( i=0; i<m; i++ ) {
61     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
62     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
63     if (idxm[i] >= rstart && idxm[i] < rend) {
64       row = idxm[i] - rstart;
65       for ( j=0; j<n; j++ ) {
66         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
67         if (idxn[j] >= mdn->N)
68           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
69         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
70       }
71     }
72     else {
73       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
74     }
75   }
76   return 0;
77 }
78 
79 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
80 {
81   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
82   MPI_Comm     comm = mat->comm;
83   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
84   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
85   int          tag = mat->tag, *owner,*starts,count,ierr;
86   InsertMode   addv;
87   MPI_Request  *send_waits,*recv_waits;
88   Scalar       *rvalues,*svalues;
89 
90   /* make sure all processors are either in INSERTMODE or ADDMODE */
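  /*
     Note (added): this relies on the InsertMode values NOT_SET_VALUES==0,
     INSERT_VALUES==1 and ADD_VALUES==2: a bitwise OR across all processes is
     NOT_SET_VALUES if no process cached values, a single mode if all agree,
     and (ADD_VALUES|INSERT_VALUES) exactly when the modes were mixed.
  */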
91   MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
92   if (addv == (ADD_VALUES|INSERT_VALUES)) {
93     SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
94   }
95   mdn->insertmode = addv; /* in case this processor had no cache */
96 
97   /*  first count number of contributors to each processor */
98   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
99   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
100   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
101   for ( i=0; i<mdn->stash.n; i++ ) {
102     idx = mdn->stash.idx[i];
103     for ( j=0; j<size; j++ ) {
104       if (idx >= owners[j] && idx < owners[j+1]) {
105         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
106       }
107     }
108   }
109   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
110 
111   /* inform other processors of number of messages and max length*/
112   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
113   MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
114   nreceives = work[rank];
115   MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
116   nmax = work[rank];
117   PetscFree(work);
118 
119   /* post receives:
120        1) each message will consist of ordered triples
121      (global row, global column, value); we store the indices as Scalars
122      to simplify the message passing.
123        2) since we don't know how long each individual message is we
124      allocate the largest needed buffer for each receive. Potentially
125      this is a lot of wasted space.
126 
127        This could be done better.
128   */
129   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
130   CHKPTRQ(rvalues);
131   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
132   CHKPTRQ(recv_waits);
133   for ( i=0; i<nreceives; i++ ) {
134     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
135               comm,recv_waits+i);
136   }
137 
138   /* do sends:
139       1) starts[i] gives the starting index in svalues for stuff going to
140          the ith processor
141   */
142   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
143   CHKPTRQ(svalues);
144   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
145   CHKPTRQ(send_waits);
146   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
147   starts[0] = 0;
148   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
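  /* pack each stashed entry as a (row, column, value) triple of Scalars;
     the post-increment of starts[owner[i]] on the last store advances the
     next free slot for that destination process */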
149   for ( i=0; i<mdn->stash.n; i++ ) {
150     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
151     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
152     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
153   }
154   PetscFree(owner);
155   starts[0] = 0;
156   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
157   count = 0;
158   for ( i=0; i<size; i++ ) {
159     if (procs[i]) {
160       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
161                 comm,send_waits+count++);
162     }
163   }
164   PetscFree(starts); PetscFree(nprocs);
165 
166   /* Free cache space */
167   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
168 
169   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
170   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
171   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
172   mdn->rmax       = nmax;
173 
174   return 0;
175 }
176 extern int MatSetUpMultiply_MPIDense(Mat);
177 
178 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
179 {
180   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
181   MPI_Status   *send_status,recv_status;
182   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
183   Scalar       *values,val;
184   InsertMode   addv = mdn->insertmode;
185 
186   /*  wait on receives */
187   while (count) {
188     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
189     /* unpack receives into our local space */
190     values = mdn->rvalues + 3*imdex*mdn->rmax;
191     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
192     n = n/3;
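    /* each received triple is (global row, global column, value), all sent as
       Scalars; the indices are recovered from their real parts */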
193     for ( i=0; i<n; i++ ) {
194       row = (int) PetscReal(values[3*i]) - mdn->rstart;
195       col = (int) PetscReal(values[3*i+1]);
196       val = values[3*i+2];
197       if (col >= 0 && col < mdn->N) {
198         ierr = MatSetValues(mdn->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
199       }
200       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
201     }
202     count--;
203   }
204   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
205 
206   /* wait on sends */
207   if (mdn->nsends) {
208     send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
209     CHKPTRQ(send_status);
210     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
211     PetscFree(send_status);
212   }
213   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
214 
215   mdn->insertmode = NOT_SET_VALUES;
216   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
217   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
218 
219   if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
220     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
221   }
222   return 0;
223 }
224 
225 static int MatZeroEntries_MPIDense(Mat A)
226 {
227   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
228   return MatZeroEntries(l->A);
229 }
230 
231 /* the code does not do the diagonal entries correctly unless the
232    matrix is square and the column and row ownerships are identical.
233    This is a BUG. The only way to fix it seems to be to access
234    l->A directly and not through the MatZeroRows()
235    routine.
236 */
237 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
238 {
239   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
240   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
241   int            *procs,*nprocs,j,found,idx,nsends,*work;
242   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
243   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
244   int            *lens,imdex,*lrows,*values;
245   MPI_Comm       comm = A->comm;
246   MPI_Request    *send_waits,*recv_waits;
247   MPI_Status     recv_status,*send_status;
248   IS             istmp;
249 
250   ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
251   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
252 
253   /*  first count number of contributors to each processor */
254   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
255   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
256   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
257   for ( i=0; i<N; i++ ) {
258     idx = rows[i];
259     found = 0;
260     for ( j=0; j<size; j++ ) {
261       if (idx >= owners[j] && idx < owners[j+1]) {
262         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
263       }
264     }
265     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
266   }
267   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
268 
269   /* inform other processors of number of messages and max length*/
270   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
271   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
272   nrecvs = work[rank];
273   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
274   nmax = work[rank];
275   PetscFree(work);
276 
277   /* post receives:   */
278   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
279   CHKPTRQ(rvalues);
280   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
281   CHKPTRQ(recv_waits);
282   for ( i=0; i<nrecvs; i++ ) {
283     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
284   }
285 
286   /* do sends:
287       1) starts[i] gives the starting index in svalues for stuff going to
288          the ith processor
289   */
290   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
291   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
292   CHKPTRQ(send_waits);
293   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
294   starts[0] = 0;
295   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
296   for ( i=0; i<N; i++ ) {
297     svalues[starts[owner[i]]++] = rows[i];
298   }
299   ISRestoreIndices(is,&rows);
300 
301   starts[0] = 0;
302   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
303   count = 0;
304   for ( i=0; i<size; i++ ) {
305     if (procs[i]) {
306       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
307     }
308   }
309   PetscFree(starts);
310 
311   base = owners[rank];
312 
313   /*  wait on receives */
314   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
315   source = lens + nrecvs;
316   count  = nrecvs; slen = 0;
317   while (count) {
318     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
319     /* unpack receives into our local space */
320     MPI_Get_count(&recv_status,MPI_INT,&n);
321     source[imdex]  = recv_status.MPI_SOURCE;
322     lens[imdex]  = n;
323     slen += n;
324     count--;
325   }
326   PetscFree(recv_waits);
327 
328   /* move the data into the send scatter */
329   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
330   count = 0;
331   for ( i=0; i<nrecvs; i++ ) {
332     values = rvalues + i*nmax;
333     for ( j=0; j<lens[i]; j++ ) {
334       lrows[count++] = values[j] - base;
335     }
336   }
337   PetscFree(rvalues); PetscFree(lens);
338   PetscFree(owner); PetscFree(nprocs);
339 
340   /* actually zap the local rows */
341   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
342   PLogObjectParent(A,istmp);
343   PetscFree(lrows);
344   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
345   ierr = ISDestroy(istmp); CHKERRQ(ierr);
346 
347   /* wait on sends */
348   if (nsends) {
349     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
350     CHKPTRQ(send_status);
351     MPI_Waitall(nsends,send_waits,send_status);
352     PetscFree(send_status);
353   }
354   PetscFree(send_waits); PetscFree(svalues);
355 
356   return 0;
357 }
358 
359 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
360 {
361   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
362   int          ierr;
363 
364   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
365   CHKERRQ(ierr);
366   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
367   CHKERRQ(ierr);
368   ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
369   return 0;
370 }
371 
372 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
373 {
374   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
375   int          ierr;
376 
377   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
378   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
379   ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
380   return 0;
381 }
382 
383 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
384 {
385   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
386   int          ierr;
387   Scalar       zero = 0.0;
388 
389   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
390   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
391   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
392          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
393   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
394          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
395   return 0;
396 }
397 
398 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
399 {
400   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
401   int          ierr;
402 
403   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
404   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
405   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
406          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
407   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
408          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
409   return 0;
410 }
411 
412 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
413 {
414   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
415   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
416   int          ierr, i, n, m = a->m, radd;
417   Scalar       *x;
418 
419   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
420   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
421   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
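  /* the local block aloc->v is column-major with leading dimension m, so the
     diagonal entry for local row i is aloc->v[(a->rstart+i)*m + i] */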
422   radd = a->rstart*m;
423   for ( i=0; i<m; i++ ) {
424     x[i] = aloc->v[radd + i*m + i];
425   }
426   return 0;
427 }
428 
429 static int MatDestroy_MPIDense(PetscObject obj)
430 {
431   Mat          mat = (Mat) obj;
432   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
433   int          ierr;
434 
435 #if defined(PETSC_LOG)
436   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
437 #endif
438   PetscFree(mdn->rowners);
439   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
440   if (mdn->lvec)   VecDestroy(mdn->lvec);
441   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
442   if (mdn->factor) {
443     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
444     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
445     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
446     PetscFree(mdn->factor);
447   }
448   PetscFree(mdn);
449   PLogObjectDestroy(mat);
450   PetscHeaderDestroy(mat);
451   return 0;
452 }
453 
454 #include "pinclude/pviewer.h"
455 
456 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
457 {
458   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
459   int          ierr;
460   if (mdn->size == 1) {
461     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
462   }
463   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
464   return 0;
465 }
466 
467 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
468 {
469   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
470   int          ierr, format;
471   PetscObject  vobj = (PetscObject) viewer;
472   FILE         *fd;
473 
474   ierr = ViewerFileGetPointer(viewer,&fd); CHKERRQ(ierr);
475   if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
476     ierr = ViewerFileGetFormat_Private(viewer,&format); CHKERRQ(ierr);
477     if (format == FILE_FORMAT_INFO_DETAILED) {
478       int nz, nzalloc, mem, rank;
479       MPI_Comm_rank(mat->comm,&rank);
480       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem); CHKERRQ(ierr);
481       MPIU_Seq_begin(mat->comm,1);
482         fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
483             rank,mdn->m,nz,nzalloc,mem);
484       fflush(fd);
485       MPIU_Seq_end(mat->comm,1);
486       ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
487       return 0;
488     }
489     else if (format == FILE_FORMAT_INFO) {
490       return 0;
491     }
492   }
493   if (vobj->type == ASCII_FILE_VIEWER) {
494     MPIU_Seq_begin(mat->comm,1);
495     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
496              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
497     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
498     fflush(fd);
499     MPIU_Seq_end(mat->comm,1);
500   }
501   else {
502     int size = mdn->size, rank = mdn->rank;
503     if (size == 1) {
504       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
505     }
506     else {
507       /* assemble the entire matrix onto first processor. */
508       Mat          A;
509       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
510       Scalar       *vals;
511       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
512 
513       if (!rank) {
514         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
515       }
516       else {
517         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
518       }
519       PLogObjectParent(mat,A);
520 
521       /* Copy the matrix ... This isn't the most efficient means,
522          but it's quick for now */
523       row = mdn->rstart; m = Amdn->m;
524       for ( i=0; i<m; i++ ) {
525         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
526         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
527         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
528         row++;
529       }
530 
531       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
532       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
533       if (!rank) {
534         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
535       }
536       ierr = MatDestroy(A); CHKERRQ(ierr);
537     }
538   }
539   return 0;
540 }
541 
542 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
543 {
544   Mat          mat = (Mat) obj;
545   PetscObject  vobj = (PetscObject) viewer;
546   int          ierr;
547 
548   if (!viewer) {
549     viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
550   }
551   if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILE_VIEWER) {
552     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
553   }
554   else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILES_VIEWER) {
555     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
556   }
557   else if (vobj->type == BINARY_FILE_VIEWER) {
558     return MatView_MPIDense_Binary(mat,viewer);
559   }
560   return 0;
561 }
562 
563 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
564                              int *nzalloc,int *mem)
565 {
566   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
567   Mat          mdn = mat->A;
568   int          ierr, isend[3], irecv[3];
569 
570   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
571   if (flag == MAT_LOCAL) {
572     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
573   } else if (flag == MAT_GLOBAL_MAX) {
574     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
575     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
576   } else if (flag == MAT_GLOBAL_SUM) {
577     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
578     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
579   }
580   return 0;
581 }
582 
583 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
584    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
585    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
586    extern int MatSolve_MPIDense(Mat,Vec,Vec);
587    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
588    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
589    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
590 
591 static int MatSetOption_MPIDense(Mat A,MatOption op)
592 {
593   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
594 
595   if (op == NO_NEW_NONZERO_LOCATIONS ||
596       op == YES_NEW_NONZERO_LOCATIONS ||
597       op == COLUMNS_SORTED ||
598       op == ROW_ORIENTED) {
599         MatSetOption(a->A,op);
600   }
601   else if (op == ROWS_SORTED ||
602            op == SYMMETRIC_MATRIX ||
603            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
604            op == YES_NEW_DIAGONALS)
605     PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
606   else if (op == COLUMN_ORIENTED)
607     { a->roworiented = 0; MatSetOption(a->A,op);}
608   else if (op == NO_NEW_DIAGONALS)
609     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
610   else
611     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
612   return 0;
613 }
614 
615 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
616 {
617   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
618   *m = mat->M; *n = mat->N;
619   return 0;
620 }
621 
622 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
623 {
624   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
625   *m = mat->m; *n = mat->N;
626   return 0;
627 }
628 
629 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
630 {
631   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
632   *m = mat->rstart; *n = mat->rend;
633   return 0;
634 }
635 
636 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
637 {
638   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
639   int          lrow, rstart = mat->rstart, rend = mat->rend;
640 
641   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
642   lrow = row - rstart;
643   return MatGetRow(mat->A,lrow,nz,idx,v);
644 }
645 
646 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
647 {
648   if (idx) PetscFree(*idx);
649   if (v) PetscFree(*v);
650   return 0;
651 }
652 
653 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
654 {
655   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
656   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
657   int          ierr, i, j;
658   double       sum = 0.0;
659   Scalar       *v = mat->v;
660 
661   if (mdn->size == 1) {
662     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
663   } else {
664     if (type == NORM_FROBENIUS) {
665       for (i=0; i<mat->n*mat->m; i++ ) {
666 #if defined(PETSC_COMPLEX)
667         sum += real(conj(*v)*(*v)); v++;
668 #else
669         sum += (*v)*(*v); v++;
670 #endif
671       }
672       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
673       *norm = sqrt(*norm);
674       PLogFlops(2*mat->n*mat->m);
675     }
676     else if (type == NORM_1) {
677       double *tmp, *tmp2;
678       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
679       tmp2 = tmp + mdn->N;
680       PetscMemzero(tmp,2*mdn->N*sizeof(double));
681       *norm = 0.0;
682       v = mat->v;
683       for ( j=0; j<mat->n; j++ ) {
684         for ( i=0; i<mat->m; i++ ) {
685           tmp[j] += PetscAbsScalar(*v);  v++;
686         }
687       }
688       MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
689       for ( j=0; j<mdn->N; j++ ) {
690         if (tmp2[j] > *norm) *norm = tmp2[j];
691       }
692       PetscFree(tmp);
693       PLogFlops(mat->n*mat->m);
694     }
695     else if (type == NORM_INFINITY) { /* max row norm */
696       double ntemp;
697       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
698       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
699     }
700     else {
701       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
702     }
703   }
704   return 0;
705 }
706 
707 static int MatTranspose_MPIDense(Mat A,Mat *matout)
708 {
709   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
710   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
711   Mat          B;
712   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
713   int          j, i, ierr;
714   Scalar       *v;
715 
716   if (matout == PETSC_NULL && M != N)
717     SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
718   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
719          CHKERRQ(ierr);
720 
721   m = Aloc->m; n = Aloc->n; v = Aloc->v;
722   rwork = (int *) PetscMalloc((m+1)*sizeof(int)); CHKPTRQ(rwork);
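  /* column j of the local block (m entries starting at v) becomes global row j
     of B, with column indices rstart .. rstart+m-1; rwork therefore needs m
     integer slots */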
723   for ( j=0; j<n; j++ ) {
724     for (i=0; i<m; i++) rwork[i] = rstart + i;
725     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
726     v += m;
727   }
728   PetscFree(rwork);
729   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
730   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
731   if (matout != PETSC_NULL) {
732     *matout = B;
733   } else {
734     /* This isn't really an in-place transpose, but free data struct from a */
735     PetscFree(a->rowners);
736     ierr = MatDestroy(a->A); CHKERRQ(ierr);
737     if (a->lvec) VecDestroy(a->lvec);
738     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
739     PetscFree(a);
740     PetscMemcpy(A,B,sizeof(struct _Mat));
741     PetscHeaderDestroy(B);
742   }
743   return 0;
744 }
745 
746 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
747 
748 /* -------------------------------------------------------------------*/
749 static struct _MatOps MatOps = {MatSetValues_MPIDense,
750        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
751        MatMult_MPIDense,MatMultAdd_MPIDense,
752        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
753 /*       MatSolve_MPIDense,0, */
754        0,0,
755        0,0,
756        0,0,
757 /*       MatLUFactor_MPIDense,0, */
758        0,MatTranspose_MPIDense,
759        MatGetInfo_MPIDense,0,
760        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
761        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
762        0,
763        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
764        0,0,0,
765 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
766        0,0,
767        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
768        MatGetOwnershipRange_MPIDense,
769        0,0,
770        0,0,0,0,0,MatConvertSameType_MPIDense,
771        0,0,0,0,0,
772        0,0,MatGetValues_MPIDense};
773 
774 /*@C
775    MatCreateMPIDense - Creates a parallel matrix in dense format.
776 
777    Input Parameters:
778 .  comm - MPI communicator
779 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
780 .  n - number of local columns (or PETSC_DECIDE to have calculated
781            if N is given)
782 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
783 .  N - number of global columns (or PETSC_DECIDE to have calculated
784            if n is given)
785 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
786    to control all matrix memory allocation.
787 
788    Output Parameter:
789 .  newmat - the matrix
790 
791    Notes:
792    The dense format is fully compatible with standard Fortran 77
793    storage by columns.
794 
795    The data input variable is intended primarily for Fortran programmers
796    who wish to allocate their own matrix memory space.  Most users should
797    set data=PETSC_NULL.
798 
799    The user MUST specify either the local or global matrix dimensions
800    (possibly both).
801 
802    Currently, the only parallel dense matrix decomposition is by rows,
803    so that n=N and each submatrix owns all of the global columns.
804 
805 .keywords: matrix, dense, parallel
806 
807 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
808 @*/
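/*
   A minimal usage sketch (added for illustration only; the communicator,
   sizes, and the single inserted entry below are assumptions, not part of
   this file):

      Mat    A;
      int    ierr, row = 0, col = 0;
      Scalar one = 1.0;

      ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
                               10,10,PETSC_NULL,&A); CHKERRQ(ierr);
      ierr = MatSetValues(A,1,&row,1,&col,&one,INSERT_VALUES); CHKERRQ(ierr);
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatDestroy(A); CHKERRQ(ierr);
*/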
809 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
810 {
811   Mat          mat;
812   Mat_MPIDense *a;
813   int          ierr, i,flg;
814 
815 /* Note:  For now, when data is specified above, this assumes the user correctly
816    allocates the local dense storage space.  We should add error checking. */
817 
818   *newmat         = 0;
819   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
820   PLogObjectCreate(mat);
821   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
822   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
823   mat->destroy    = MatDestroy_MPIDense;
824   mat->view       = MatView_MPIDense;
825   mat->factor     = 0;
826 
827   a->factor = 0;
828   a->insertmode = NOT_SET_VALUES;
829   MPI_Comm_rank(comm,&a->rank);
830   MPI_Comm_size(comm,&a->size);
831 
832   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
833   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
834 
835   /* each row stores all columns */
836   if (N == PETSC_DECIDE) N = n;
837   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
838   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
839   a->N = N;
840   a->M = M;
841   a->m = m;
842   a->n = n;
843 
844   /* build local table of row and column ownerships */
845   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
846   a->cowners = a->rowners + a->size + 1;
847   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
848                        sizeof(Mat_MPIDense));
849   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
850   a->rowners[0] = 0;
851   for ( i=2; i<=a->size; i++ ) {
852     a->rowners[i] += a->rowners[i-1];
853   }
854   a->rstart = a->rowners[a->rank];
855   a->rend   = a->rowners[a->rank+1];
856   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
857   a->cowners[0] = 0;
858   for ( i=2; i<=a->size; i++ ) {
859     a->cowners[i] += a->cowners[i-1];
860   }
861 
862   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
863   PLogObjectParent(mat,a->A);
864 
865   /* build cache for entries destined for rows owned by other processors */
866   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
867 
868   /* stuff used for matrix vector multiply */
869   a->lvec        = 0;
870   a->Mvctx       = 0;
871   a->roworiented = 1;
872 
873   *newmat = mat;
874   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
875   if (flg) {
876     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
877   }
878   return 0;
879 }
880 
881 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
882 {
883   Mat          mat;
884   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
885   int          ierr;
886   FactorCtx    *factor;
887 
888   *newmat       = 0;
889   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
890   PLogObjectCreate(mat);
891   mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
892   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
893   mat->destroy  = MatDestroy_MPIDense;
894   mat->view     = MatView_MPIDense;
895   mat->factor   = A->factor;
896   mat->assembled  = PETSC_TRUE;
897 
898   a->m          = oldmat->m;
899   a->n          = oldmat->n;
900   a->M          = oldmat->M;
901   a->N          = oldmat->N;
902   if (oldmat->factor) {
903     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
904     /* copy factor contents ... add this code! */
905   } else a->factor = 0;
906 
907   a->rstart     = oldmat->rstart;
908   a->rend       = oldmat->rend;
909   a->size       = oldmat->size;
910   a->rank       = oldmat->rank;
911   a->insertmode = NOT_SET_VALUES;
912 
913   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
914   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
915   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
916   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
917 
918   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
919   PLogObjectParent(mat,a->lvec);
920   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
921   PLogObjectParent(mat,a->Mvctx);
922   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
923   PLogObjectParent(mat,a->A);
924   *newmat = mat;
925   return 0;
926 }
927 
928 #include "sysio.h"
929 
930 int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
931 {
932   Mat          A;
933   int          i, nz, ierr, j,rstart, rend, fd;
934   Scalar       *vals,*svals;
935   PetscObject  vobj = (PetscObject) bview;
936   MPI_Comm     comm = vobj->comm;
937   MPI_Status   status;
938   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
939   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
940   int          tag = ((PetscObject)bview)->tag;
941 
942   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
943   if (!rank) {
944     ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
945     ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
946     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:not matrix object");
947   }
948 
949   MPI_Bcast(header+1,3,MPI_INT,0,comm);
950   M = header[1]; N = header[2];
951   /* determine ownership of all rows */
952   m = M/size + ((M % size) > rank);
953   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
954   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
955   rowners[0] = 0;
956   for ( i=2; i<=size; i++ ) {
957     rowners[i] += rowners[i-1];
958   }
959   rstart = rowners[rank];
960   rend   = rowners[rank+1];
961 
962   /* distribute row lengths to all processors */
963   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
964   offlens = ourlens + (rend-rstart);
965   if (!rank) {
966     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
967     ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
968     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
969     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
970     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
971     PetscFree(sndcounts);
972   }
973   else {
974     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
975   }
976 
977   if (!rank) {
978     /* calculate the number of nonzeros on each processor */
979     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
980     PetscMemzero(procsnz,size*sizeof(int));
981     for ( i=0; i<size; i++ ) {
982       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
983         procsnz[i] += rowlengths[j];
984       }
985     }
986     PetscFree(rowlengths);
987 
988     /* determine max buffer needed and allocate it */
989     maxnz = 0;
990     for ( i=0; i<size; i++ ) {
991       maxnz = PetscMax(maxnz,procsnz[i]);
992     }
993     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
994 
995     /* read in my part of the matrix column indices  */
996     nz = procsnz[0];
997     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
998     ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);
999     /* read in everyone else's column indices and ship them off */
1000     /* read in every one elses and ship off */
1001     for ( i=1; i<size; i++ ) {
1002       nz = procsnz[i];
1003       ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
1004       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1005     }
1006     PetscFree(cols);
1007   }
1008   else {
1009     /* determine buffer space needed for message */
1010     nz = 0;
1011     for ( i=0; i<m; i++ ) {
1012       nz += ourlens[i];
1013     }
1014     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1015 
1016     /* receive message of column indices*/
1017     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1018     MPI_Get_count(&status,MPI_INT,&maxnz);
1019     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1020   }
1021 
1022   /* loop over local rows, determining number of off diagonal entries */
1023   PetscMemzero(offlens,m*sizeof(int));
1024   jj = 0;
1025   for ( i=0; i<m; i++ ) {
1026     for ( j=0; j<ourlens[i]; j++ ) {
1027       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1028       jj++;
1029     }
1030   }
1031 
1032   /* create our matrix */
1033   for ( i=0; i<m; i++ ) {
1034     ourlens[i] -= offlens[i];
1035   }
1036   if (type == MATMPIDENSE) {
1037     ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1038   }
1039   A = *newmat;
1040   for ( i=0; i<m; i++ ) {
1041     ourlens[i] += offlens[i];
1042   }
1043 
1044   if (!rank) {
1045     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1046 
1047     /* read in my part of the matrix numerical values  */
1048     nz = procsnz[0];
1049     ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1050 
1051     /* insert into matrix */
1052     jj      = rstart;
1053     smycols = mycols;
1054     svals   = vals;
1055     for ( i=0; i<m; i++ ) {
1056       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1057       smycols += ourlens[i];
1058       svals   += ourlens[i];
1059       jj++;
1060     }
1061 
1062     /* read in the other processors' values and ship them out */
1063     for ( i=1; i<size; i++ ) {
1064       nz = procsnz[i];
1065       ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1066       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1067     }
1068     PetscFree(procsnz);
1069   }
1070   else {
1071     /* receive numeric values */
1072     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1073 
1074     /* receive message of values*/
1075     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1076     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1077     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1078 
1079     /* insert into matrix */
1080     jj      = rstart;
1081     smycols = mycols;
1082     svals   = vals;
1083     for ( i=0; i<m; i++ ) {
1084       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1085       smycols += ourlens[i];
1086       svals   += ourlens[i];
1087       jj++;
1088     }
1089   }
1090   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1091 
1092   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1093   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1094   return 0;
1095 }
1096