#ifndef lint
static char vcid[] = "$Id: mpidense.c,v 1.21 1995/12/31 21:09:28 curfman Exp curfman $";
#endif

/*
   Basic functions for parallel dense matrices.
*/

#include "mpidense.h"
#include "vec/vecimpl.h"
#include "inline/spops.h"

static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
                                 int *idxn,Scalar *v,InsertMode addv)
{
  Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
  int          roworiented = A->roworiented;

  if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
  }
  A->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
    if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      if (roworiented) {
        ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else {
        for ( j=0; j<n; j++ ) {
          if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
          if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
          ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
        }
      }
    }
    else {
      if (roworiented) {
        ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else { /* must stash each separately */
        row = idxm[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
          CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}

static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;

  if (!mdn->assembled) SETERRQ(1,"MatGetValues_MPIDense:Not for unassembled matrix");
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
    if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
        if (idxn[j] >= mdn->N)
          SETERRQ(1,"MatGetValues_MPIDense:Column too large");
        ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
    }
  }
  return 0;
}

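/*
   MatAssemblyBegin_MPIDense - Ships stashed off-processor entries to
   their owning processors:
     1) an MPI_Allreduce() with MPI_BOR checks that all processors agree
        on ADD_VALUES versus INSERT_VALUES,
     2) two further reductions determine how many messages each processor
        will receive and the maximum message length,
     3) receives are posted with MPI_Irecv() and the stash is packed into
        (row,column,value) triples and shipped with MPI_Isend().
   The matching waits and the local insertions are done in
   MatAssemblyEnd_MPIDense().
*/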
static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Comm     comm = mat->comm;
  int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
  int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int          tag = mat->tag, *owner,*starts,count,ierr;
  InsertMode   addv;
  MPI_Request  *send_waits,*recv_waits;
  Scalar       *rvalues,*svalues;

  /* make sure all processors are either in INSERT_VALUES or ADD_VALUES mode */
  MPI_Allreduce((void *) &mdn->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
  }
  mdn->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<mdn->stash.n; i++ ) {
    idx = mdn->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce((void *) procs,(void *) work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce((void *) nprocs,(void *) work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered triples
     (global row,global column,value); we store the indices as Scalars
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
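  /*
     Layout of one message, indices stored as Scalars and recovered with
     (int) casts in MatAssemblyEnd_MPIDense():
         [ row0 col0 val0 | row1 col1 val1 | ... ]
  */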
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
  CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<mdn->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);

  mdn->svalues    = svalues;    mdn->rvalues = rvalues;
  mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
  mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
  mdn->rmax       = nmax;

  return 0;
}
extern int MatSetUpMultiply_MPIDense(Mat);

static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Status   *send_status,recv_status;
  int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
  Scalar       *values,val;
  InsertMode   addv = mdn->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = mdn->rvalues + 3*imdex*mdn->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PETSCREAL(values[3*i]) - mdn->rstart;
      col = (int) PETSCREAL(values[3*i+1]);
      val = values[3*i+2];
      if (col >= 0 && col < mdn->N) {
        ierr = MatSetValues(mdn->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      }
      else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
    }
    count--;
  }
  PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);

  /* wait on sends */
  if (mdn->nsends) {
    send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
    CHKPTRQ(send_status);
    MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(mdn->send_waits); PetscFree(mdn->svalues);

  mdn->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);

  if (!mdn->assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
  }
  mdn->assembled = 1;
  return 0;
}

static int MatZeroEntries_MPIDense(Mat A)
{
  Mat_MPIDense *l = (Mat_MPIDense *) A->data;
  return MatZeroEntries(l->A);
}

/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   mdn->A directly and not through the MatZeroRows() routine.
*/
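/*
   The row indices to be zeroed are exchanged with the same
   count/post/ship pattern as MatAssemblyBegin_MPIDense(), except that
   only integer global row numbers travel; the local rows are then
   zeroed with a sequential MatZeroRows() on l->A.
*/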
static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
{
  Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled) SETERRQ(1,"MatZeroRows_MPIDense:Must assemble matrix");
  ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note */
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /* see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex] = recv_status.MPI_SOURCE;
    lens[imdex]   = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp); CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}

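/*
   MatMult_MPIDense - y = A*x. Each processor owns complete rows, so the
   entire input vector is first gathered into the local work vector
   mdn->lvec; a sequential MatMult() then forms this processor's piece
   of y.
*/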
static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;
  if (!mdn->assembled)
    SETERRQ(1,"MatMult_MPIDense:Must assemble matrix first");
  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
  return 0;
}

static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;
  if (!mdn->assembled)
    SETERRQ(1,"MatMultAdd_MPIDense:Must assemble matrix first");
  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
  return 0;
}

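/*
   MatMultTrans_MPIDense - y = A'*x. The local transpose-multiply leaves
   a full-length result in a->lvec; a reverse scatter with ADD_VALUES
   then sums the contributions from all processors into y.
*/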
static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;
  Scalar       zero = 0.0;

  if (!a->assembled) SETERRQ(1,"MatMultTrans_MPIDense:must assemble matrix");
  ierr = VecSet(&zero,yy); CHKERRQ(ierr);
  ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;

  if (!a->assembled) SETERRQ(1,"MatMultTransAdd_MPIDense:must assemble matrix");
  ierr = VecCopy(yy,zz); CHKERRQ(ierr);
  ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatGetDiagonal_MPIDense(Mat A,Vec v)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
  int          ierr, i, n, m = a->m, radd;
  Scalar       *x;

  if (!a->assembled) SETERRQ(1,"MatGetDiagonal_MPIDense:must assemble matrix");
  ierr = VecGetArray(v,&x); CHKERRQ(ierr);
  ierr = VecGetSize(v,&n); CHKERRQ(ierr);
  if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
  /* aloc->v is stored by columns with leading dimension m, so the diagonal
     entry for global row rstart+i lives at v[(rstart+i)*m + i] */
  radd = a->rstart*m;
  for ( i=0; i<m; i++ ) {
    x[i] = aloc->v[radd + i*m + i];
  }
  return 0;
}

static int MatDestroy_MPIDense(PetscObject obj)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
#endif
  PetscFree(mdn->rowners);
  ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
  if (mdn->lvec)   VecDestroy(mdn->lvec);
  if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
  if (mdn->factor) {
    if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
    if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
    if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
    PetscFree(mdn->factor);
  }
  PetscFree(mdn);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

#include "pinclude/pviewer.h"

static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;
  if (mdn->size == 1) {
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
  return 0;
}

static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, format;
  PetscObject  vobj = (PetscObject) viewer;
  FILE         *fd;

  ierr = ViewerFileGetPointer_Private(viewer,&fd); CHKERRQ(ierr);
  if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
    ierr = ViewerFileGetFormat_Private(viewer,&format); CHKERRQ(ierr);
    if (format == FILE_FORMAT_INFO_DETAILED) {
      int nz, nzalloc, mem, rank;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem); CHKERRQ(ierr);
      MPIU_Seq_begin(mat->comm,1);
      fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
              rank,mdn->m,nz,nzalloc,mem);
      fflush(fd);
      MPIU_Seq_end(mat->comm,1);
      ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == FILE_FORMAT_INFO) {
      return 0;
    }
  }
  if (vobj->type == ASCII_FILE_VIEWER) {
    MPIU_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
            mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    fflush(fd);
    MPIU_Seq_end(mat->comm,1);
  }
  else {
    int size = mdn->size, rank = mdn->rank;
    if (size == 1) {
      ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto the first processor */
      Mat          A;
      int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
      Scalar       *vals;
      Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;

      if (!rank) {
        ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* Copy the matrix ... This isn't the most efficient means,
         but it's quick for now */
      row = mdn->rstart; m = Amdn->m;
      for ( i=0; i<m; i++ ) {
        ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
        ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        row++;
      }

      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatView_MPIDense(PetscObject obj,Viewer viewer)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  PetscObject  vobj = (PetscObject) viewer;
  int          ierr;

  if (!mdn->assembled) SETERRQ(1,"MatView_MPIDense:must assemble matrix");
  if (!viewer) {
    viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
  }
  if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILE_VIEWER) {
    ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
  }
  else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILES_VIEWER) {
    ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
  }
  else if (vobj->type == BINARY_FILE_VIEWER) {
    return MatView_MPIDense_Binary(mat,viewer);
  }
  return 0;
}

static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
                               int *nzalloc,int *mem)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  Mat          mdn = mat->A;
  int          ierr, isend[3], irecv[3];

  ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
  if (flag == MAT_LOCAL) {
    *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
  } else if (flag == MAT_GLOBAL_MAX) {
    MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
    *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
  } else if (flag == MAT_GLOBAL_SUM) {
    MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
    *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
  }
  return 0;
}

extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
extern int MatSolve_MPIDense(Mat,Vec,Vec);
/* extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */

static int MatSetOption_MPIDense(Mat A,MatOption op)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;

  if (op == NO_NEW_NONZERO_LOCATIONS ||
      op == YES_NEW_NONZERO_LOCATIONS ||
      op == COLUMNS_SORTED ||
      op == ROW_ORIENTED) {
    MatSetOption(a->A,op);
  }
  else if (op == ROWS_SORTED ||
           op == SYMMETRIC_MATRIX ||
           op == STRUCTURALLY_SYMMETRIC_MATRIX ||
           op == YES_NEW_DIAGONALS) {
    PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
  }
  else if (op == COLUMN_ORIENTED) {
    a->roworiented = 0; MatSetOption(a->A,op);
  }
  else if (op == NO_NEW_DIAGONALS) {
    SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");
  }
  else {
    SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");
  }
  return 0;
}

static int MatGetSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->m; *n = mat->N;
  return 0;
}

static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->rstart; *n = mat->rend;
  return 0;
}

static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  int          lrow, rstart = mat->rstart, rend = mat->rend;

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
  lrow = row - rstart;
  return MatGetRow(mat->A,lrow,nz,idx,v);
}

static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  if (idx) PetscFree(*idx);
  if (v) PetscFree(*v);
  return 0;
}

static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
  Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
  int          ierr, i, j;
  double       sum = 0.0;
  Scalar       *v = mat->v;

  if (!mdn->assembled) SETERRQ(1,"MatNorm_MPIDense:Must assemble matrix");
  if (mdn->size == 1) {
    ierr = MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      for ( i=0; i<mat->n*mat->m; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce((void*)&sum,(void*)norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
      *norm = sqrt(*norm);
      PLogFlops(2*mat->n*mat->m);
    }
    else if (type == NORM_1) {
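      /* one norm = max_j sum_i |a_ij|: form the local column sums,
         add them across processors, then take the maximum */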
      double *tmp, *tmp2;
      tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = tmp + mdn->N;
      PetscMemzero(tmp,2*mdn->N*sizeof(double));
      *norm = 0.0;
      v = mat->v;
      for ( j=0; j<mat->n; j++ ) {
        for ( i=0; i<mat->m; i++ ) {
          tmp[j] += PetscAbsScalar(*v);  v++;
        }
      }
      MPI_Allreduce((void*)tmp,(void*)tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
      for ( j=0; j<mdn->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp);
      PLogFlops(mat->n*mat->m);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp;
      ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
      MPI_Allreduce((void*)&ntemp,(void*)norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
    }
  }
  return 0;
}

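/*
   MatTranspose_MPIDense - Since the local block is stored by columns,
   column j of A is contiguous in memory; it becomes global row j of the
   transpose and is copied with one MatSetValues() call per column.
*/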
static int MatTranspose_MPIDense(Mat A,Mat *matout)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
  Mat          B;
  int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
  int          j, i, ierr;
  Scalar       *v;

  if (!matout && M != N)
    SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
  ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
  CHKERRQ(ierr);

  m = Aloc->m; n = Aloc->n; v = Aloc->v;
  /* rwork holds the global column indices of B, i.e. this processor's
     global row numbers; it needs m entries, not n */
  rwork = (int *) PetscMalloc(m*sizeof(int)); CHKPTRQ(rwork);
  for ( i=0; i<m; i++ ) rwork[i] = rstart + i;
  for ( j=0; j<n; j++ ) {
    ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
    v += m;
  }
  PetscFree(rwork);
  ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose, but free data struct from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}

static int MatConvertSameType_MPIDense(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {MatSetValues_MPIDense,
       MatGetRow_MPIDense,MatRestoreRow_MPIDense,
       MatMult_MPIDense,MatMultAdd_MPIDense,
       MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
       MatSolve_MPIDense,0,
       0,0,
       MatLUFactor_MPIDense,0,
       0,MatTranspose_MPIDense,
       MatGetInfo_MPIDense,0,
       MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
       MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
       0,
       MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense,
       0,0,
       MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
       MatGetOwnershipRange_MPIDense,
       0,0,
       0,0,0,0,0,MatConvertSameType_MPIDense,
       0,0,0,0,0,
       0,0,MatGetValues_MPIDense};

/*@C
   MatCreateMPIDense - Creates a parallel matrix in dense format.

   Input Parameters:
.  comm - MPI communicator
.  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have it calculated
           if N is given)
.  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have it calculated
           if n is given)
.  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
   to control all matrix memory allocation.

   Output Parameter:
.  newmat - the matrix

   Notes:
   The dense format is fully compatible with standard Fortran 77
   storage by columns.

   The data input variable is intended primarily for Fortran programmers
   who wish to allocate their own matrix memory space.  Most users should
   set data=PETSC_NULL.

   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Currently, the only parallel dense matrix decomposition is by rows,
   so that n=N and each submatrix owns all of the global columns.

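   Example:
   A minimal sketch (error checking abbreviated) that creates a 100 x 100
   matrix, letting PETSc choose the row distribution, and assembles it:
$     Mat A;
$     ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
$                              100,100,PETSC_NULL,&A); CHKERRQ(ierr);
$     /* ... insert entries with MatSetValues() ... */
$     ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
$     ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
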
.keywords: matrix, dense, parallel

.seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
@*/
int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
{
  Mat          mat;
  Mat_MPIDense *a;
  int          ierr, i;

/* Note:  For now, when data is specified above, this assumes the user correctly
   allocates the local dense storage space.  We should add error checking. */

  *newmat         = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIDense;
  mat->view       = MatView_MPIDense;
  mat->factor     = 0;

  a->factor = 0;
  a->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&a->rank);
  MPI_Comm_size(comm,&a->size);

  if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
  if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}

  /* each row stores all columns */
  if (N == PETSC_DECIDE) N = n;
  if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
  /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
  a->N = N;
  a->M = M;
  a->m = m;
  a->n = n;

  /* build local table of row and column ownerships */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  a->cowners = a->rowners + a->size + 1;
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
                       sizeof(Mat_MPIDense));
  MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
  a->rowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->rowners[i] += a->rowners[i-1];
  }
  a->rstart = a->rowners[a->rank];
  a->rend   = a->rowners[a->rank+1];
  MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
  a->cowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->cowners[i] += a->cowners[i-1];
  }

  ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);

  /* build cache for off-processor entries generated during MatSetValues() */
  ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);

  /* stuff used for matrix vector multiply */
  a->lvec        = 0;
  a->Mvctx       = 0;
  a->assembled   = 0;
  a->roworiented = 1;

  *newmat = mat;
  return 0;
}

static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
{
  Mat          mat;
  Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
  int          ierr;
  FactorCtx    *factor;

  if (!oldmat->assembled) SETERRQ(1,"MatConvertSameType_MPIDense:Must assemble matrix");
  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
  PLogObjectCreate(mat);
  mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy  = MatDestroy_MPIDense;
  mat->view     = MatView_MPIDense;
  mat->factor   = A->factor;

  a->m          = oldmat->m;
  a->n          = oldmat->n;
  a->M          = oldmat->M;
  a->N          = oldmat->N;
  if (oldmat->factor) {
    a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
    /* copy factor contents ... add this code! */
  } else a->factor = 0;

  a->assembled  = 1;
  a->rstart     = oldmat->rstart;
  a->rend       = oldmat->rend;
  a->size       = oldmat->size;
  a->rank       = oldmat->rank;
  a->insertmode = NOT_SET_VALUES;

  a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
  PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);

  ierr = VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr = VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr = MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  *newmat = mat;
  return 0;
}

#include "sysio.h"

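/*
   MatLoad_MPIDense - Reads a matrix from a PETSc binary file (header,
   row lengths, column indices, values). Processor 0 does all of the
   reading and ships each processor its share; every processor then
   inserts its rows with MatSetValues() and assembles.
*/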
int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j, rstart, rend, fd;
  Scalar       *vals,*svals;
  PetscObject  vobj = (PetscObject) bview;
  MPI_Comm     comm = vobj->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0,*offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)bview)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
    ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:not matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
  /* determine ownership of all rows */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j<rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);

    /* read in everyone else's and ship it off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with file");
  }

  /* loop over local rows, determining number of off-diagonal entries */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  if (type == MATMPIDENSE) {
    ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat); CHKERRQ(ierr);
  }
  A = *newmat;
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES); CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors' values and ship them out */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values */
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with file");

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES); CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}