#ifndef lint
static char vcid[] = "$Id: mpidense.c,v 1.29 1996/03/04 05:15:49 bsmith Exp balay $";
#endif

/*
   Basic functions for parallel dense matrices.
*/

#include "mpidense.h"
#include "vec/vecimpl.h"

static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
                                 int *idxn,Scalar *v,InsertMode addv)
{
  Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
  int          roworiented = A->roworiented;

  if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
  }
  A->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
    if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      if (roworiented) {
        ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else {
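        /* Column-oriented input: v is stored column major (m rows by
           n columns), so entry (i,j) of the input lives at v[i+j*m]. */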
        for ( j=0; j<n; j++ ) {
          if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
          if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
          ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
        }
      }
    }
    else {
      if (roworiented) {
        ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else { /* must stash each entry separately */
        row = idxm[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
                 CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}

static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
    if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
        if (idxn[j] >= mdn->N)
          SETERRQ(1,"MatGetValues_MPIDense:Column too large");
        ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
    }
  }
  return 0;
}

static int MatGetArray_MPIDense(Mat A,Scalar **array)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;

  ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
  return 0;
}

static int MatRestoreArray_MPIDense(Mat A,Scalar **array)
{
  return 0;
}

static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Comm     comm = mat->comm;
  int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
  int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int          tag = mat->tag, *owner,*starts,count,ierr;
  InsertMode   addv;
  MPI_Request  *send_waits,*recv_waits;
  Scalar       *rvalues,*svalues;

  /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
  MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
  }
  mdn->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<mdn->stash.n; i++ ) {
    idx = mdn->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);
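
  /* After these reductions, nreceives is the number of messages this
     processor will receive and nmax bounds the number of stashed
     entries in any single incoming message; the bound lets one buffer
     size serve for every receive posted below. */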

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value); we store the global index as a Scalar
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
  CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
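  /* Pack each stashed entry as three consecutive Scalars:
     (global row, global column, value).  For example, an entry
     (row=7, col=2, val=3.5) destined for processor p is packed as
         svalues[3*k]   = 7.0
         svalues[3*k+1] = 2.0
         svalues[3*k+2] = 3.5
     where k = starts[p] is that processor's running offset. */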
  for ( i=0; i<mdn->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar) mdn->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar) mdn->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] = mdn->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);

  mdn->svalues    = svalues;    mdn->rvalues    = rvalues;
  mdn->nsends     = nsends;     mdn->nrecvs     = nreceives;
  mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
  mdn->rmax       = nmax;

  return 0;
}

extern int MatSetUpMultiply_MPIDense(Mat);

static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Status   *send_status,recv_status;
  int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
  Scalar       *values,val;
  InsertMode   addv = mdn->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = mdn->rvalues + 3*imdex*mdn->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
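    /* each incoming message is a sequence of (row,col,value) triples */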
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - mdn->rstart;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= 0 && col < mdn->N) {
        ierr = MatSetValues(mdn->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      }
      else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
    }
    count--;
  }
  PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);

  /* wait on sends */
  if (mdn->nsends) {
    send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
    CHKPTRQ(send_status);
    MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(mdn->send_waits); PetscFree(mdn->svalues);

  mdn->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);

  if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
  }
  return 0;
}

static int MatZeroEntries_MPIDense(Mat A)
{
  Mat_MPIDense *l = (Mat_MPIDense *) A->data;
  return MatZeroEntries(l->A);
}

/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   the local matrix directly rather than going through the
   MatZeroRows() routine.
*/
static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
{
  Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
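
  /* Communication pattern: each processor determines the owner of every
     row it wants zeroed, ships those row indices to the owning
     processors, and each owner then zeros its own rows through a
     sequential MatZeroRows() on the local matrix. */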

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note */
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /* see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ierr = ISRestoreIndices(is,&rows); CHKERRQ(ierr);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex] = recv_status.MPI_SOURCE;
    lens[imdex]   = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp); CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}

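/*
   Parallel multiply strategy: every processor stores all N global
   columns of its local rows, so the input vector xx is first scattered
   in full into the local work vector lvec, and the product is then a
   purely local dense multiply.
*/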
static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
  return 0;
}

static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
  ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
  return 0;
}

static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;
  Scalar       zero = 0.0;

  ierr = VecSet(&zero,yy); CHKERRQ(ierr);
  ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;

  ierr = VecCopy(yy,zz); CHKERRQ(ierr);
  ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatGetDiagonal_MPIDense(Mat A,Vec v)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
  int          ierr, i, n, m = a->m, radd;
  Scalar       *x;

  ierr = VecGetArray(v,&x); CHKERRQ(ierr);
  ierr = VecGetSize(v,&n); CHKERRQ(ierr);
  if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
  radd = a->rstart*m;
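  /* The local array aloc->v is column major with m rows and N columns;
     the diagonal entry of global row rstart+i sits in local row i,
     global column rstart+i, i.e. at v[(rstart+i)*m + i] = v[radd+i*m+i]. */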
  for ( i=0; i<m; i++ ) {
    x[i] = aloc->v[radd + i*m + i];
  }
  return 0;
}

static int MatDestroy_MPIDense(PetscObject obj)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
#endif
  PetscFree(mdn->rowners);
  ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
  if (mdn->lvec)   VecDestroy(mdn->lvec);
  if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
  if (mdn->factor) {
    if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
    if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
    if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
    PetscFree(mdn->factor);
  }
  PetscFree(mdn);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

#include "pinclude/pviewer.h"

static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  if (mdn->size == 1) {
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
  return 0;
}

static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, format;
  PetscObject  vobj = (PetscObject) viewer;
  FILE         *fd;

  ierr = ViewerFileGetPointer(viewer,&fd); CHKERRQ(ierr);
  if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
    ierr = ViewerFileGetFormat_Private(viewer,&format); CHKERRQ(ierr);
    if (format == FILE_FORMAT_INFO_DETAILED) {
      int nz, nzalloc, mem, rank;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem); CHKERRQ(ierr);
      MPIU_Seq_begin(mat->comm,1);
      fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
              rank,mdn->m,nz,nzalloc,mem);
      fflush(fd);
      MPIU_Seq_end(mat->comm,1);
      ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == FILE_FORMAT_INFO) {
      return 0;
    }
  }
  if (vobj->type == ASCII_FILE_VIEWER) {
    MPIU_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
            mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    fflush(fd);
    MPIU_Seq_end(mat->comm,1);
  }
  else {
    int size = mdn->size, rank = mdn->rank;
    if (size == 1) {
      ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto the first processor */
      Mat          A;
      int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
      Scalar       *vals;
      Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;

      if (!rank) {
        ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* Copy the matrix ... This isn't the most efficient means,
         but it's quick for now */
      row = mdn->rstart; m = Amdn->m;
      for ( i=0; i<m; i++ ) {
        ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
        ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        row++;
      }

      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatView_MPIDense(PetscObject obj,Viewer viewer)
{
  Mat          mat = (Mat) obj;
  PetscObject  vobj = (PetscObject) viewer;
  int          ierr;

  if (!viewer) {
    viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
  }
  if (vobj->cookie == VIEWER_COOKIE &&
      (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER)) {
    ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
  }
  else if (vobj->type == BINARY_FILE_VIEWER) {
    return MatView_MPIDense_Binary(mat,viewer);
  }
  return 0;
}

static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
                               int *nzalloc,int *mem)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  Mat          mdn = mat->A;
  int          ierr, isend[3], irecv[3];

  ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
  if (flag == MAT_LOCAL) {
    *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
  } else if (flag == MAT_GLOBAL_MAX) {
    MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
    *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
  } else if (flag == MAT_GLOBAL_SUM) {
    MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
    *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
  }
  return 0;
}

/* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
   extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
   extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
   extern int MatSolve_MPIDense(Mat,Vec,Vec);
   extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
   extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
   extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */

static int MatSetOption_MPIDense(Mat A,MatOption op)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;

  if (op == NO_NEW_NONZERO_LOCATIONS ||
      op == YES_NEW_NONZERO_LOCATIONS ||
      op == COLUMNS_SORTED ||
      op == ROW_ORIENTED) {
    MatSetOption(a->A,op);
  }
  else if (op == ROWS_SORTED ||
           op == SYMMETRIC_MATRIX ||
           op == STRUCTURALLY_SYMMETRIC_MATRIX ||
           op == YES_NEW_DIAGONALS) {
    PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
  }
  else if (op == COLUMN_ORIENTED) {
    a->roworiented = 0; MatSetOption(a->A,op);
  }
  else if (op == NO_NEW_DIAGONALS) {
    SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");
  }
  else {
    SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");
  }
  return 0;
}

static int MatGetSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->m; *n = mat->N;
  return 0;
}

static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->rstart; *n = mat->rend;
  return 0;
}

static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  int          lrow, rstart = mat->rstart, rend = mat->rend;

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
  lrow = row - rstart;
  return MatGetRow(mat->A,lrow,nz,idx,v);
}

static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  if (idx) PetscFree(*idx);
  if (v) PetscFree(*v);
  return 0;
}

static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
  Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
  int          ierr, i, j;
  double       sum = 0.0;
  Scalar       *v = mat->v;

  if (mdn->size == 1) {
    ierr = MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      for ( i=0; i<mat->n*mat->m; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
      *norm = sqrt(*norm);
      PLogFlops(2*mat->n*mat->m);
    }
    else if (type == NORM_1) {
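      /* 1-norm = max column sum: form per-column sums of absolute
         values locally, add across all processors, then take the max. */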
      double *tmp, *tmp2;
      tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = tmp + mdn->N;
      PetscMemzero(tmp,2*mdn->N*sizeof(double));
      *norm = 0.0;
      v = mat->v;
      for ( j=0; j<mat->n; j++ ) {
        for ( i=0; i<mat->m; i++ ) {
          tmp[j] += PetscAbsScalar(*v); v++;
        }
      }
      MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
      for ( j=0; j<mdn->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp);
      PLogFlops(mat->n*mat->m);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp;
      ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
      MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
    }
  }
  return 0;
}

static int MatTranspose_MPIDense(Mat A,Mat *matout)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
  Mat          B;
  int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
  int          j, i, ierr;
  Scalar       *v;

  if (matout == PETSC_NULL && M != N)
    SETERRQ(1,"MatTranspose_MPIDense:In-place transpose supported only for square matrices");
  ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
         CHKERRQ(ierr);

  m = Aloc->m; n = Aloc->n; v = Aloc->v;
  rwork = (int *) PetscMalloc(m*sizeof(int)); CHKPTRQ(rwork);
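  /* Local column j of A holds the entries of global rows
     rstart..rstart+m-1 in global column j; deposit them as the
     corresponding columns of row j of the transpose B. */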
  for ( i=0; i<m; i++ ) rwork[i] = rstart + i;
  for ( j=0; j<n; j++ ) {
    ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
    v += m;
  }
  PetscFree(rwork);
  ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose, but free data struct from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}

static int MatConvertSameType_MPIDense(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {MatSetValues_MPIDense,
       MatGetRow_MPIDense,MatRestoreRow_MPIDense,
       MatMult_MPIDense,MatMultAdd_MPIDense,
       MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
/*       MatSolve_MPIDense,0, */
       0,0,
       0,0,
       0,0,
/*       MatLUFactor_MPIDense,0, */
       0,MatTranspose_MPIDense,
       MatGetInfo_MPIDense,0,
       MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
       MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
       0,
       MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
       0,0,0,
/*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
       0,0,
       MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
       MatGetOwnershipRange_MPIDense,
       0,0,MatGetArray_MPIDense,MatRestoreArray_MPIDense,
       0,0,0,MatConvertSameType_MPIDense,
       0,0,0,0,0,
       0,0,MatGetValues_MPIDense};

/*@C
   MatCreateMPIDense - Creates a parallel matrix in dense format.

   Input Parameters:
.  comm - MPI communicator
.  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have it calculated
           if N is given)
.  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have it calculated
           if n is given)
.  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
   to control all matrix memory allocation.

   Output Parameter:
.  newmat - the matrix

   Notes:
   The dense format is fully compatible with standard Fortran 77
   storage by columns.

   The data input variable is intended primarily for Fortran programmers
   who wish to allocate their own matrix memory space.  Most users should
   set data=PETSC_NULL.

   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Currently, the only parallel dense matrix decomposition is by rows,
   so that n=N and each submatrix owns all of the global columns.
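
   Example usage (a minimal sketch; assumes M and N hold the desired
   global dimensions and omits error checking):
$     Mat A;
$     ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
$                              M,N,PETSC_NULL,&A);
$     /* set locally owned entries with MatSetValues(A,...) */
$     ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY);
$     ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY);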

.keywords: matrix, dense, parallel

.seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
@*/
int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
{
  Mat          mat;
  Mat_MPIDense *a;
  int          ierr, i, flg;

  /* Note: For now, when data is specified above, this assumes the user correctly
     allocates the local dense storage space.  We should add error checking. */

  *newmat         = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIDense;
  mat->view       = MatView_MPIDense;
  mat->factor     = 0;

  a->factor     = 0;
  a->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&a->rank);
  MPI_Comm_size(comm,&a->size);

  if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
  if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}

  /* each row stores all columns */
  if (N == PETSC_DECIDE) N = n;
  if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
  /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
  a->N = N;
  a->M = M;
  a->m = m;
  a->n = n;

  /* build local table of row and column ownerships */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  a->cowners = a->rowners + a->size + 1;
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
                       sizeof(Mat_MPIDense));
  MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
  a->rowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->rowners[i] += a->rowners[i-1];
  }
  a->rstart = a->rowners[a->rank];
  a->rend   = a->rowners[a->rank+1];
  MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
  a->cowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->cowners[i] += a->cowners[i-1];
  }

  ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);

  /* build cache for off-processor entries */
  ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);

  /* stuff used for matrix-vector multiply */
  a->lvec        = 0;
  a->Mvctx       = 0;
  a->roworiented = 1;

  *newmat = mat;
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  return 0;
}

static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
{
  Mat          mat;
  Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
  int          ierr;
  FactorCtx    *factor;

  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
  PLogObjectCreate(mat);
  mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy  = MatDestroy_MPIDense;
  mat->view     = MatView_MPIDense;
  mat->factor   = A->factor;
  mat->assembled = PETSC_TRUE;

  a->m          = oldmat->m;
  a->n          = oldmat->n;
  a->M          = oldmat->M;
  a->N          = oldmat->N;
  if (oldmat->factor) {
    a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
    /* copy factor contents ... add this code! */
  } else a->factor = 0;

  a->rstart     = oldmat->rstart;
  a->rend       = oldmat->rend;
  a->size       = oldmat->size;
  a->rank       = oldmat->rank;
  a->insertmode = NOT_SET_VALUES;

  a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
  PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);

  ierr = VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr = VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr = MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  *newmat = mat;
  return 0;
}

#include "sysio.h"

int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j, rstart, rend, fd;
  Scalar       *vals,*svals;
  PetscObject  vobj = (PetscObject) bview;
  MPI_Comm     comm = vobj->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)bview)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
    ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:not a matrix object");
  }
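
  /* Binary file layout (an assumption based on how the header is used
     here): header[0] = MAT_COOKIE, header[1] = global rows M,
     header[2] = global columns N, header[3] = nonzero count (unused by
     this loader).  Only processor 0 reads the file; the sizes are
     broadcast below. */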

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
  /* determine ownership of all rows */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j<rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);

    /* read in everyone else's part and ship it off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with the file");
  }

  /* loop over local rows, determining number of off-diagonal entries */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  if (type == MATMPIDENSE) {
    ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat); CHKERRQ(ierr);
  }
  A = *newmat;
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors' parts and ship them out */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values */
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:something is wrong with the file");

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}