xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision ee0de897fec99be7c10c7637bbd068d61b2ce625)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.24 1996/01/09 01:13:54 curfman Exp bsmith $";
3 #endif
4 
5 /*
6    Basic functions for basic parallel dense matrices.
7 */
8 
9 #include "mpidense.h"
10 #include "vec/vecimpl.h"
11 #include "inline/spops.h"
12 
13 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
14                                int *idxn,Scalar *v,InsertMode addv)
15 {
16   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
17   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
18   int          roworiented = A->roworiented;
19 
20   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
21     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
22   }
23   A->insertmode = addv;
24   for ( i=0; i<m; i++ ) {
25     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
26     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
27     if (idxm[i] >= rstart && idxm[i] < rend) {
28       row = idxm[i] - rstart;
29       if (roworiented) {
30         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
31       }
32       else {
33         for ( j=0; j<n; j++ ) {
34           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
35           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
36           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
37         }
38       }
39     }
40     else {
41       if (roworiented) {
42         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
43       }
44       else { /* must stash each seperately */
45         row = idxm[i];
46         for ( j=0; j<n; j++ ) {
47           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
48                  CHKERRQ(ierr);
49         }
50       }
51     }
52   }
53   return 0;
54 }
55 
56 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
57 {
58   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
59   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
60 
61   if (!mdn->assembled) SETERRQ(1,"MatGetValues_MPIDense:Not for unassembled matrix");
62   for ( i=0; i<m; i++ ) {
63     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
64     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
65     if (idxm[i] >= rstart && idxm[i] < rend) {
66       row = idxm[i] - rstart;
67       for ( j=0; j<n; j++ ) {
68         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
69         if (idxn[j] >= mdn->N)
70           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
71         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
72       }
73     }
74     else {
75       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
76     }
77   }
78   return 0;
79 }
80 
/*
   MatAssemblyBegin_MPIDense - Begins assembly: ships every value that was
   stashed (set with a row this process does not own) to the owning process.

   Each stashed entry travels as a triple of Scalars (row index, column
   index, value); the indices are stored as Scalars so each message is a
   homogeneous MPIU_SCALAR buffer.  Nonblocking receives and sends are
   posted here and stored on the matrix; MatAssemblyEnd_MPIDense waits on
   them and unpacks.
*/
static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Comm     comm = mat->comm;
  int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
  int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int          tag = mat->tag, *owner,*starts,count,ierr;
  InsertMode   addv;
  MPI_Request  *send_waits,*recv_waits;
  Scalar       *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  /* (bitwise OR-reduce of the modes; a mixed result equals
     ADD_VALUES|INSERT_VALUES and is rejected below) */
  MPI_Allreduce((void *) &mdn->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) { SETERRQ(1,
    "MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
    }
  mdn->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  /* nprocs[j] = number of stashed entries owned by proc j;
     procs[j]  = 1 iff at least one message goes to proc j;
     owner[i]  = destination rank of stash entry i */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<mdn->stash.n; i++ ) {
    idx = mdn->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  /* after the SUM reduce, work[rank] = number of messages destined here;
     after the MAX reduce, work[rank] = longest message this rank receives */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce((void *) procs,(void *) work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce((void *) nprocs,(void *) work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    /* each receive slot is 3*nmax Scalars: room for the longest message */
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
  CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples; starts[] is advanced as it is used,
     so it is rebuilt below before posting the sends */
  for ( i=0; i<mdn->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);

  /* record the outstanding communication so MatAssemblyEnd can complete it */
  mdn->svalues    = svalues;    mdn->rvalues = rvalues;
  mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
  mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
  mdn->rmax       = nmax;

  return 0;
}
179 extern int MatSetUpMultiply_MPIDense(Mat);
180 
/*
   MatAssemblyEnd_MPIDense - Completes the communication started by
   MatAssemblyBegin_MPIDense: waits for the (row,col,value) triples sent by
   other processes, inserts/adds them into the local sequential matrix,
   then assembles the local block and (on first final assembly) builds the
   scatter context used by the matrix-vector products.
*/
static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Status   *send_status,recv_status;
  int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
  Scalar       *values,val;
  InsertMode   addv = mdn->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    /* each buffer slot is 3*rmax Scalars; the actual message length comes
       from MPI_Get_count, divided by 3 for the number of triples */
    values = mdn->rvalues + 3*imdex*mdn->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      /* indices were shipped as Scalars; convert back and localize the row */
      row = (int) PETSCREAL(values[3*i]) - mdn->rstart;
      col = (int) PETSCREAL(values[3*i+1]);
      val = values[3*i+2];
      if (col >= 0 && col < mdn->N) {
        MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
      }
      else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
    }
    count--;
  }
  PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);

  /* wait on sends */
  /* the send buffer may not be reused until all sends complete */
  if (mdn->nsends) {
    send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
    CHKPTRQ(send_status);
    MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(mdn->send_waits); PetscFree(mdn->svalues);

  mdn->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);

  /* first final assembly: set up the scatter used by MatMult et al. */
  if (!mdn->assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
  }
  mdn->assembled = 1;
  return 0;
}
228 
229 static int MatZeroEntries_MPIDense(Mat A)
230 {
231   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
232   return MatZeroEntries(l->A);
233 }
234 
/* NOTE(review): this comment appears to be inherited from the MPIAIJ code
   (Mat_MPIDense has no ->B member) -- verify which parts still apply.
   The stated limitation: the diagonal entries are not handled correctly
   unless the matrix is square and the column and row ownerships are
   identical.  The suggested fix was to operate on the local submatrices
   directly rather than going through MatZeroRows().
*/
/*
   MatZeroRows_MPIDense - Zeros the global rows listed in 'is' (optionally
   placing 'diag' on the diagonal).  Because callers may list rows owned by
   other processes, the row indices are first routed to their owners via
   point-to-point messages, then each process zeroes its local rows.
*/
static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
{
  Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled) SETERRQ(1,"MatZeroRows_MPIDense:Must assemble matrix");
  ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = rows headed to proc j; procs[j] = 1 iff any;
     owner[i] = owning rank of rows[i] */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  /* SUM reduce -> work[rank] = messages arriving here;
     MAX reduce -> work[rank] = longest arriving message */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  /* one slot of nmax ints per expected message (worst-case sizing) */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack row indices grouped by destination; starts[] is consumed here
     and rebuilt below */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  /* first global row owned by this process; used to localize indices */
  base = owners[rank];

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* gather all received global row indices, converted to local numbering */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  /* svalues may not be freed until all sends have completed */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
363 
364 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
365 {
366   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
367   int          ierr;
368   if (!mdn->assembled)
369     SETERRQ(1,"MatMult_MPIDense:Must assemble matrix first");
370   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
371   CHKERRQ(ierr);
372   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
373   CHKERRQ(ierr);
374   ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
375   return 0;
376 }
377 
378 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
379 {
380   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
381   int          ierr;
382   if (!mdn->assembled)
383     SETERRQ(1,"MatMultAdd_MPIDense:Must assemble matrix first");
384   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
385   CHKERRQ(ierr);
386   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
387   CHKERRQ(ierr);
388   ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
389   return 0;
390 }
391 
392 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
393 {
394   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
395   int          ierr;
396   Scalar       zero = 0.0;
397 
398   if (!a->assembled) SETERRQ(1,"MatMulTrans_MPIDense:must assemble matrix");
399   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
400   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
401   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
402          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
403   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
404          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
405   return 0;
406 }
407 
408 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
409 {
410   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
411   int          ierr;
412 
413   if (!a->assembled) SETERRQ(1,"MatMulTransAdd_MPIDense:must assemble matrix");
414   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
415   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
416   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
417          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
418   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
419          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
420   return 0;
421 }
422 
423 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
424 {
425   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
426   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
427   int          ierr, i, n, m = a->m, radd;
428   Scalar       *x;
429 
430   if (!a->assembled) SETERRQ(1,"MatGetDiag_MPIDense:must assemble matrix");
431   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
432   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
433   if (n != a->M) SETERRQ(1,"MatGetDiagonal_SeqDense:Nonconforming mat and vec");
434   radd = a->rstart*m*m;
435   for ( i=0; i<m; i++ ) {
436     x[i] = aloc->v[radd + i*m + i];
437   }
438   return 0;
439 }
440 
/*
   MatDestroy_MPIDense - Frees all storage owned by the parallel dense
   matrix: the ownership tables, the local sequential block, the
   matrix-vector-multiply scatter machinery, any factorization context,
   and finally the matrix header itself.
*/
static int MatDestroy_MPIDense(PetscObject obj)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
#endif
  /* rowners also backs cowners (single allocation) */
  PetscFree(mdn->rowners);
  ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
  /* lvec/Mvctx exist only after the first final assembly */
  if (mdn->lvec)   VecDestroy(mdn->lvec);
  if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
  if (mdn->factor) {
    if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
    if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
    if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
    PetscFree(mdn->factor);
  }
  PetscFree(mdn);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}
465 
466 #include "pinclude/pviewer.h"
467 
468 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
469 {
470   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
471   int          ierr;
472   if (mdn->size == 1) {
473     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
474   }
475   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
476   return 0;
477 }
478 
/*
   MatView_MPIDense_ASCII - ASCII output of the matrix.  Depending on the
   viewer format this prints per-process summary info, per-process local
   blocks (one process at a time), or — for the multi-process "single file"
   case — gathers the entire matrix onto process 0 and prints it there.
*/
static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, format;
  PetscObject  vobj = (PetscObject) viewer;
  FILE         *fd;

  ierr = ViewerFileGetPointer_Private(viewer,&fd); CHKERRQ(ierr);
  if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
    ierr = ViewerFileGetFormat_Private(viewer,&format);
    if (format == FILE_FORMAT_INFO_DETAILED) {
      /* summary only: one line of local statistics per process, in order */
      int nz, nzalloc, mem, rank;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
      MPIU_Seq_begin(mat->comm,1);
        fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
            rank,mdn->m,nz,nzalloc,mem);
      fflush(fd);
      MPIU_Seq_end(mat->comm,1);
      ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == FILE_FORMAT_INFO) {
      /* nothing to print for the plain info format */
      return 0;
    }
  }
  if (vobj->type == ASCII_FILE_VIEWER) {
    /* each process prints its ownership range and local block in turn */
    MPIU_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
             mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    fflush(fd);
    MPIU_Seq_end(mat->comm,1);
  }
  else {
    int size = mdn->size, rank = mdn->rank;
    if (size == 1) {
      ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat          A;
      int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
      Scalar       *vals;
      Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;

      /* NOTE(review): MatCreateMPIDense takes (comm,m,n,M,N,...); passing
         (M,M,N,N) / (0,M,N,N) here looks inconsistent with that order --
         verify against the intended local/global sizes */
      if (!rank) {
        ierr = MatCreateMPIDense(mat->comm,M,M,N,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIDense(mat->comm,0,M,N,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* Copy the matrix ... This isn't the most efficient means,
         but it's quick for now */
      row = mdn->rstart; m = Amdn->m;
      for ( i=0; i<m; i++ ) {
        ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
        ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        row++;
      }

      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        /* process 0 now owns all rows; print its local block */
        ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
553 
554 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
555 {
556   Mat          mat = (Mat) obj;
557   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
558   PetscObject  vobj = (PetscObject) viewer;
559   int          ierr;
560 
561   if (!mdn->assembled) SETERRQ(1,"MatView_MPIDense:must assemble matrix");
562   if (!viewer) {
563     viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
564   }
565   if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILE_VIEWER) {
566     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
567   }
568   else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILES_VIEWER) {
569     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
570   }
571   else if (vobj->type == BINARY_FILE_VIEWER) {
572     return MatView_MPIDense_Binary(mat,viewer);
573   }
574   return 0;
575 }
576 
577 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
578                              int *nzalloc,int *mem)
579 {
580   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
581   Mat          mdn = mat->A;
582   int          ierr, isend[3], irecv[3];
583 
584   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
585   if (flag == MAT_LOCAL) {
586     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
587   } else if (flag == MAT_GLOBAL_MAX) {
588     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
589     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
590   } else if (flag == MAT_GLOBAL_SUM) {
591     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
592     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
593   }
594   return 0;
595 }
596 
597 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
598    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
599    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
600    extern int MatSolve_MPIDense(Mat,Vec,Vec);
601    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
602    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
603    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
604 
605 static int MatSetOption_MPIDense(Mat A,MatOption op)
606 {
607   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
608 
609   if (op == NO_NEW_NONZERO_LOCATIONS ||
610       op == YES_NEW_NONZERO_LOCATIONS ||
611       op == COLUMNS_SORTED ||
612       op == ROW_ORIENTED) {
613         MatSetOption(a->A,op);
614   }
615   else if (op == ROWS_SORTED ||
616            op == SYMMETRIC_MATRIX ||
617            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
618            op == YES_NEW_DIAGONALS)
619     PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
620   else if (op == COLUMN_ORIENTED)
621     { a->roworiented = 0; MatSetOption(a->A,op);}
622   else if (op == NO_NEW_DIAGONALS)
623     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
624   else
625     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
626   return 0;
627 }
628 
629 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
630 {
631   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
632   *m = mat->M; *n = mat->N;
633   return 0;
634 }
635 
636 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
637 {
638   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
639   *m = mat->m; *n = mat->N;
640   return 0;
641 }
642 
643 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
644 {
645   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
646   *m = mat->rstart; *n = mat->rend;
647   return 0;
648 }
649 
650 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
651 {
652   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
653   int          lrow, rstart = mat->rstart, rend = mat->rend;
654 
655   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows")
656   lrow = row - rstart;
657   return MatGetRow(mat->A,lrow,nz,idx,v);
658 }
659 
660 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
661 {
662   if (idx) PetscFree(*idx);
663   if (v) PetscFree(*v);
664   return 0;
665 }
666 
/*
   MatNorm_MPIDense - Computes a matrix norm.  On one process this simply
   delegates to the sequential block.  In parallel:
     NORM_FROBENIUS - sum of squared local entries, SUM-reduced, then sqrt;
     NORM_1         - per-column absolute sums (full columns are local
                      since the decomposition is by rows), SUM-reduced,
                      then the maximum column sum is taken;
     NORM_INFINITY  - local max row sum, MAX-reduced (rows are not split);
     NORM_2 is not supported.
*/
static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
  Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
  int          ierr, i, j;
  double       sum = 0.0;
  Scalar       *v = mat->v;

  if (!mdn->assembled) SETERRQ(1,"MatNorm_MPIDense:Must assemble matrix");
  if (mdn->size == 1) {
    ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      /* walk the whole local block (m*n entries, column-major) */
      for (i=0; i<mat->n*mat->m; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce((void*)&sum,(void*)norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
      *norm = sqrt(*norm);
      PLogFlops(2*mat->n*mat->m);
    }
    else if (type == NORM_1) {
      /* tmp = local column sums, tmp2 = global column sums */
      double *tmp, *tmp2;
      tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = tmp + mdn->N;
      PetscMemzero(tmp,2*mdn->N*sizeof(double));
      *norm = 0.0;
      v = mat->v;
      for ( j=0; j<mat->n; j++ ) {
        for ( i=0; i<mat->m; i++ ) {
          tmp[j] += PetscAbsScalar(*v);  v++;
        }
      }
      MPI_Allreduce((void*)tmp,(void*)tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
      for ( j=0; j<mdn->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp);
      PLogFlops(mat->n*mat->m);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp;
      ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
      MPI_Allreduce((void*)&ntemp,(void*)norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
    }
  }
  return 0;
}
721 
722 static int MatTranspose_MPIDense(Mat A,Mat *matout)
723 {
724   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
725   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
726   Mat          B;
727   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
728   int          j, i, ierr;
729   Scalar       *v;
730 
731   if (!matout && M != N)
732     SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
733   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
734          CHKERRQ(ierr);
735 
736   m = Aloc->m; n = Aloc->n; v = Aloc->v;
737   rwork = (int *) PetscMalloc(n*sizeof(int)); CHKPTRQ(rwork);
738   for ( j=0; j<n; j++ ) {
739     for (i=0; i<m; i++) rwork[i] = rstart + i;
740     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
741     v += m;
742   }
743   PetscFree(rwork);
744   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
745   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
746   if (matout) {
747     *matout = B;
748   } else {
749     /* This isn't really an in-place transpose, but free data struct from a */
750     PetscFree(a->rowners);
751     ierr = MatDestroy(a->A); CHKERRQ(ierr);
752     if (a->lvec) VecDestroy(a->lvec);
753     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
754     PetscFree(a);
755     PetscMemcpy(A,B,sizeof(struct _Mat));
756     PetscHeaderDestroy(B);
757   }
758   return 0;
759 }
760 
761 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
762 
763 /* -------------------------------------------------------------------*/
/*
   Operation table for MATMPIDENSE.  The initializer is positional: each
   entry must remain in its _MatOps slot order, with 0 marking operations
   this matrix type does not implement.  The commented-out names record
   planned (factorization/solve) entries.
*/
static struct _MatOps MatOps = {MatSetValues_MPIDense,
       MatGetRow_MPIDense,MatRestoreRow_MPIDense,
       MatMult_MPIDense,MatMultAdd_MPIDense,
       MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
/*       MatSolve_MPIDense,0, */
       0,0,
       0,0,
       0,0,
/*       MatLUFactor_MPIDense,0, */
       0,MatTranspose_MPIDense,
       MatGetInfo_MPIDense,0,
       MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
       MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
       0,
       MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
       0,0,0,
/*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
       0,0,
       MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
       MatGetOwnershipRange_MPIDense,
       0,0,
       0,0,0,0,0,MatConvertSameType_MPIDense,
       0,0,0,0,0,
       0,0,MatGetValues_MPIDense};
788 
789 /*@C
   MatCreateMPIDense - Creates a parallel matrix stored in dense format.
791 
792    Input Parameters:
793 .  comm - MPI communicator
794 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
795 .  n - number of local columns (or PETSC_DECIDE to have calculated
796            if N is given)
797 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
798 .  N - number of global columns (or PETSC_DECIDE to have calculated
799            if n is given)
800 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
801    to control all matrix memory allocation.
802 
803    Output Parameter:
804 .  newmat - the matrix
805 
806    Notes:
807    The dense format is fully compatible with standard Fortran 77
808    storage by columns.
809 
810    The data input variable is intended primarily for Fortran programmers
811    who wish to allocate their own matrix memory space.  Most users should
812    set data=PETSC_NULL.
813 
814    The user MUST specify either the local or global matrix dimensions
815    (possibly both).
816 
817    Currently, the only parallel dense matrix decomposition is by rows,
818    so that n=N and each submatrix owns all of the global columns.
819 
820 .keywords: matrix, dense, parallel
821 
822 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
823 @*/
int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
{
  Mat          mat;
  Mat_MPIDense *a;
  int          ierr, i,flg;

/* Note:  For now, when data is specified above, this assumes the user correctly
   allocates the local dense storage space.  We should add error checking. */

  *newmat         = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  /* install the MATMPIDENSE operation table and standard callbacks */
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIDense;
  mat->view       = MatView_MPIDense;
  mat->factor     = 0;

  a->factor = 0;
  a->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&a->rank);
  MPI_Comm_size(comm,&a->size);

  /* derive whichever of the local/global row counts was left to PETSc;
     PETSC_DECIDE rows are spread as evenly as possible */
  if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
  if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}

  /* each row stores all columns */
  /* NOTE(review): if both n and N are PETSC_DECIDE the column sizes remain
     undetermined here -- verify callers always supply at least one */
  if (N == PETSC_DECIDE) N = n;
  if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
  /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
  a->N = N;
  a->M = M;
  a->m = m;
  a->n = n;

  /* build local table of row and column ownerships */
  /* one allocation holds both tables: rowners[0..size], cowners[0..size] */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  a->cowners = a->rowners + a->size + 1;
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
                       sizeof(Mat_MPIDense));
  /* gather every process's local count, then prefix-sum into offsets so
     that rowners[p]..rowners[p+1] is the row range owned by process p */
  MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
  a->rowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->rowners[i] += a->rowners[i-1];
  }
  a->rstart = a->rowners[a->rank];
  a->rend   = a->rowners[a->rank+1];
  MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
  a->cowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->cowners[i] += a->cowners[i-1];
  }

  /* local storage: m rows by all N columns (row decomposition only) */
  ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);

  /* stuff used for matrix vector multiply */
  /* lvec/Mvctx are created lazily at first final assembly */
  a->lvec        = 0;
  a->Mvctx       = 0;
  a->assembled   = 0;
  a->roworiented = 1;

  *newmat = mat;
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  return 0;
}
896 
897 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
898 {
899   Mat          mat;
900   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
901   int          ierr;
902   FactorCtx    *factor;
903 
904   if (!oldmat->assembled) SETERRQ(1,"MatConvertSameType_MPIDense:Must assemble matrix");
905   *newmat       = 0;
906   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
907   PLogObjectCreate(mat);
908   mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
909   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
910   mat->destroy  = MatDestroy_MPIDense;
911   mat->view     = MatView_MPIDense;
912   mat->factor   = A->factor;
913 
914   a->m          = oldmat->m;
915   a->n          = oldmat->n;
916   a->M          = oldmat->M;
917   a->N          = oldmat->N;
918   if (oldmat->factor) {
919     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
920     /* copy factor contents ... add this code! */
921   } else a->factor = 0;
922 
923   a->assembled  = 1;
924   a->rstart     = oldmat->rstart;
925   a->rend       = oldmat->rend;
926   a->size       = oldmat->size;
927   a->rank       = oldmat->rank;
928   a->insertmode = NOT_SET_VALUES;
929 
930   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
931   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
932   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
933   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
934 
935   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
936   PLogObjectParent(mat,a->lvec);
937   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
938   PLogObjectParent(mat,a->Mvctx);
939   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
940   PLogObjectParent(mat,a->A);
941   *newmat = mat;
942   return 0;
943 }
944 
945 #include "sysio.h"
946 
947 int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
948 {
949   Mat          A;
950   int          i, nz, ierr, j,rstart, rend, fd;
951   Scalar       *vals,*svals;
952   PetscObject  vobj = (PetscObject) bview;
953   MPI_Comm     comm = vobj->comm;
954   MPI_Status   status;
955   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
956   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
957   int          tag = ((PetscObject)bview)->tag;
958 
959   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
960   if (!rank) {
961     ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
962     ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
963     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:not matrix object");
964   }
965 
966   MPI_Bcast(header+1,3,MPI_INT,0,comm);
967   M = header[1]; N = header[2];
968   /* determine ownership of all rows */
969   m = M/size + ((M % size) > rank);
970   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
971   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
972   rowners[0] = 0;
973   for ( i=2; i<=size; i++ ) {
974     rowners[i] += rowners[i-1];
975   }
976   rstart = rowners[rank];
977   rend   = rowners[rank+1];
978 
979   /* distribute row lengths to all processors */
980   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
981   offlens = ourlens + (rend-rstart);
982   if (!rank) {
983     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
984     ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
985     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
986     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
987     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
988     PetscFree(sndcounts);
989   }
990   else {
991     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
992   }
993 
994   if (!rank) {
995     /* calculate the number of nonzeros on each processor */
996     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
997     PetscMemzero(procsnz,size*sizeof(int));
998     for ( i=0; i<size; i++ ) {
999       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1000         procsnz[i] += rowlengths[j];
1001       }
1002     }
1003     PetscFree(rowlengths);
1004 
1005     /* determine max buffer needed and allocate it */
1006     maxnz = 0;
1007     for ( i=0; i<size; i++ ) {
1008       maxnz = PetscMax(maxnz,procsnz[i]);
1009     }
1010     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1011 
1012     /* read in my part of the matrix column indices  */
1013     nz = procsnz[0];
1014     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1015     ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);
1016 
1017     /* read in every one elses and ship off */
1018     for ( i=1; i<size; i++ ) {
1019       nz = procsnz[i];
1020       ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
1021       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1022     }
1023     PetscFree(cols);
1024   }
1025   else {
1026     /* determine buffer space needed for message */
1027     nz = 0;
1028     for ( i=0; i<m; i++ ) {
1029       nz += ourlens[i];
1030     }
1031     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1032 
1033     /* receive message of column indices*/
1034     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1035     MPI_Get_count(&status,MPI_INT,&maxnz);
1036     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1037   }
1038 
1039   /* loop over local rows, determining number of off diagonal entries */
1040   PetscMemzero(offlens,m*sizeof(int));
1041   jj = 0;
1042   for ( i=0; i<m; i++ ) {
1043     for ( j=0; j<ourlens[i]; j++ ) {
1044       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1045       jj++;
1046     }
1047   }
1048 
1049   /* create our matrix */
1050   for ( i=0; i<m; i++ ) {
1051     ourlens[i] -= offlens[i];
1052   }
1053   if (type == MATMPIDENSE) {
1054     ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat); CHKERRQ(ierr);
1055   }
1056   A = *newmat;
1057   for ( i=0; i<m; i++ ) {
1058     ourlens[i] += offlens[i];
1059   }
1060 
1061   if (!rank) {
1062     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1063 
1064     /* read in my part of the matrix numerical values  */
1065     nz = procsnz[0];
1066     ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1067 
1068     /* insert into matrix */
1069     jj      = rstart;
1070     smycols = mycols;
1071     svals   = vals;
1072     for ( i=0; i<m; i++ ) {
1073       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1074       smycols += ourlens[i];
1075       svals   += ourlens[i];
1076       jj++;
1077     }
1078 
1079     /* read in other processors and ship out */
1080     for ( i=1; i<size; i++ ) {
1081       nz = procsnz[i];
1082       ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1083       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1084     }
1085     PetscFree(procsnz);
1086   }
1087   else {
1088     /* receive numeric values */
1089     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1090 
1091     /* receive message of values*/
1092     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1093     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1094     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1095 
1096     /* insert into matrix */
1097     jj      = rstart;
1098     smycols = mycols;
1099     svals   = vals;
1100     for ( i=0; i<m; i++ ) {
1101       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1102       smycols += ourlens[i];
1103       svals   += ourlens[i];
1104       jj++;
1105     }
1106   }
1107   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1108 
1109   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1110   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1111   return 0;
1112 }
1113