xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 2e7ac64766c5fb805bb4575aab735c32922a18c8)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.30 1996/03/06 21:47:19 balay Exp bsmith $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "mpidense.h"
10 #include "vec/vecimpl.h"
11 
12 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
13                                int *idxn,Scalar *v,InsertMode addv)
14 {
15   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
16   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
17   int          roworiented = A->roworiented;
18 
19   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
20     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
21   }
22   A->insertmode = addv;
23   for ( i=0; i<m; i++ ) {
24     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
25     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
26     if (idxm[i] >= rstart && idxm[i] < rend) {
27       row = idxm[i] - rstart;
28       if (roworiented) {
29         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
30       }
31       else {
32         for ( j=0; j<n; j++ ) {
33           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
34           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
35           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
36         }
37       }
38     }
39     else {
40       if (roworiented) {
41         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
42       }
43       else { /* must stash each separately */
44         row = idxm[i];
45         for ( j=0; j<n; j++ ) {
46           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
47                  CHKERRQ(ierr);
48         }
49       }
50     }
51   }
52   return 0;
53 }
54 
55 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
56 {
57   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
58   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
59 
60   for ( i=0; i<m; i++ ) {
61     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
62     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
63     if (idxm[i] >= rstart && idxm[i] < rend) {
64       row = idxm[i] - rstart;
65       for ( j=0; j<n; j++ ) {
66         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
67         if (idxn[j] >= mdn->N)
68           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
69         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
70       }
71     }
72     else {
73       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
74     }
75   }
76   return 0;
77 }
78 
79 static int MatGetArray_MPIDense(Mat A,Scalar **array)
80 {
81   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
82   int ierr;
83 
84   ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
85   return 0;
86 }
87 
88 static int MatRestoreArray_MPIDense(Mat A,Scalar **array)
89 {
90   return 0;
91 }
92 
93 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
94 {
95   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
96   MPI_Comm     comm = mat->comm;
97   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
98   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
99   int          tag = mat->tag, *owner,*starts,count,ierr;
100   InsertMode   addv;
101   MPI_Request  *send_waits,*recv_waits;
102   Scalar       *rvalues,*svalues;
103 
104   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
105   MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
106   if (addv == (ADD_VALUES|INSERT_VALUES)) { SETERRQ(1,
107     "MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
108     }
109   mdn->insertmode = addv; /* in case this processor had no cache */
110 
111   /*  first count number of contributors to each processor */
112   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
113   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
114   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
115   for ( i=0; i<mdn->stash.n; i++ ) {
116     idx = mdn->stash.idx[i];
117     for ( j=0; j<size; j++ ) {
118       if (idx >= owners[j] && idx < owners[j+1]) {
119         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
120       }
121     }
122   }
123   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
124 
125   /* inform other processors of number of messages and max length*/
126   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
127   MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
128   nreceives = work[rank];
129   MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
130   nmax = work[rank];
131   PetscFree(work);
132 
133   /* post receives:
134        1) each message will consist of ordered triples
135      (global row, global column, value); the indices are stored as Scalars
136      to simplify the message passing.
137        2) since we don't know how long each individual message is, we
138      allocate the largest needed buffer for each receive. Potentially
139      this is a lot of wasted space.
140 
141        This could be done better.
142   */
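  /*
     A sketch of the packing used here (illustrative only; it restates what the
     code below does): each stashed entry (global row r, global column c, value x)
     occupies three consecutive Scalars, so a message carrying k entries is

         [ r0, c0, x0,  r1, c1, x1,  ...,  r(k-1), c(k-1), x(k-1) ]

     with the indices cast to Scalar on the sending side and recovered with
     (int) PetscReal() in MatAssemblyEnd_MPIDense().
  */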
143   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
144   CHKPTRQ(rvalues);
145   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
146   CHKPTRQ(recv_waits);
147   for ( i=0; i<nreceives; i++ ) {
148     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
149               comm,recv_waits+i);
150   }
151 
152   /* do sends:
153       1) starts[i] gives the starting index in svalues for stuff going to
154          the ith processor
155   */
156   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
157   CHKPTRQ(svalues);
158   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
159   CHKPTRQ(send_waits);
160   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
161   starts[0] = 0;
162   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
163   for ( i=0; i<mdn->stash.n; i++ ) {
164     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
165     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
166     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
167   }
168   PetscFree(owner);
169   starts[0] = 0;
170   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
171   count = 0;
172   for ( i=0; i<size; i++ ) {
173     if (procs[i]) {
174       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
175                 comm,send_waits+count++);
176     }
177   }
178   PetscFree(starts); PetscFree(nprocs);
179 
180   /* Free cache space */
181   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
182 
183   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
184   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
185   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
186   mdn->rmax       = nmax;
187 
188   return 0;
189 }
190 extern int MatSetUpMultiply_MPIDense(Mat);
191 
192 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
193 {
194   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
195   MPI_Status   *send_status,recv_status;
196   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
197   Scalar       *values,val;
198   InsertMode   addv = mdn->insertmode;
199 
200   /*  wait on receives */
201   while (count) {
202     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
203     /* unpack receives into our local space */
204     values = mdn->rvalues + 3*imdex*mdn->rmax;
205     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
206     n = n/3;
207     for ( i=0; i<n; i++ ) {
208       row = (int) PetscReal(values[3*i]) - mdn->rstart;
209       col = (int) PetscReal(values[3*i+1]);
210       val = values[3*i+2];
211       if (col >= 0 && col < mdn->N) {
212         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
213       }
214       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
215     }
216     count--;
217   }
218   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
219 
220   /* wait on sends */
221   if (mdn->nsends) {
222     send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
223     CHKPTRQ(send_status);
224     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
225     PetscFree(send_status);
226   }
227   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
228 
229   mdn->insertmode = NOT_SET_VALUES;
230   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
231   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
232 
233   if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
234     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
235   }
236   return 0;
237 }
238 
239 static int MatZeroEntries_MPIDense(Mat A)
240 {
241   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
242   return MatZeroEntries(l->A);
243 }
244 
245 /* the code does not do the diagonal entries correctly unless the
246    matrix is square and the column and row ownerships are identical.
247    This is a BUG. The only way to fix it seems to be to access
248    mdn->A directly and not through the MatZeroRows()
249    routine.
250 */
251 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
252 {
253   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
254   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
255   int            *procs,*nprocs,j,found,idx,nsends,*work;
256   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
257   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
258   int            *lens,imdex,*lrows,*values;
259   MPI_Comm       comm = A->comm;
260   MPI_Request    *send_waits,*recv_waits;
261   MPI_Status     recv_status,*send_status;
262   IS             istmp;
263 
264   ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
265   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
266 
267   /*  first count number of contributors to each processor */
268   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
269   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
270   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
271   for ( i=0; i<N; i++ ) {
272     idx = rows[i];
273     found = 0;
274     for ( j=0; j<size; j++ ) {
275       if (idx >= owners[j] && idx < owners[j+1]) {
276         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
277       }
278     }
279     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
280   }
281   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
282 
283   /* inform other processors of number of messages and max length*/
284   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
285   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
286   nrecvs = work[rank];
287   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
288   nmax = work[rank];
289   PetscFree(work);
290 
291   /* post receives:   */
292   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
293   CHKPTRQ(rvalues);
294   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
295   CHKPTRQ(recv_waits);
296   for ( i=0; i<nrecvs; i++ ) {
297     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
298   }
299 
300   /* do sends:
301       1) starts[i] gives the starting index in svalues for stuff going to
302          the ith processor
303   */
304   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
305   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
306   CHKPTRQ(send_waits);
307   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
308   starts[0] = 0;
309   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
310   for ( i=0; i<N; i++ ) {
311     svalues[starts[owner[i]]++] = rows[i];
312   }
313   ISRestoreIndices(is,&rows);
314 
315   starts[0] = 0;
316   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
317   count = 0;
318   for ( i=0; i<size; i++ ) {
319     if (procs[i]) {
320       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
321     }
322   }
323   PetscFree(starts);
324 
325   base = owners[rank];
326 
327   /*  wait on receives */
328   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
329   source = lens + nrecvs;
330   count  = nrecvs; slen = 0;
331   while (count) {
332     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
333     /* unpack receives into our local space */
334     MPI_Get_count(&recv_status,MPI_INT,&n);
335     source[imdex]  = recv_status.MPI_SOURCE;
336     lens[imdex]  = n;
337     slen += n;
338     count--;
339   }
340   PetscFree(recv_waits);
341 
342   /* move the data into the send scatter */
343   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
344   count = 0;
345   for ( i=0; i<nrecvs; i++ ) {
346     values = rvalues + i*nmax;
347     for ( j=0; j<lens[i]; j++ ) {
348       lrows[count++] = values[j] - base;
349     }
350   }
351   PetscFree(rvalues); PetscFree(lens);
352   PetscFree(owner); PetscFree(nprocs);
353 
354   /* actually zap the local rows */
355   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
356   PLogObjectParent(A,istmp);
357   PetscFree(lrows);
358   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
359   ierr = ISDestroy(istmp); CHKERRQ(ierr);
360 
361   /* wait on sends */
362   if (nsends) {
363     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
364     CHKPTRQ(send_status);
365     MPI_Waitall(nsends,send_waits,send_status);
366     PetscFree(send_status);
367   }
368   PetscFree(send_waits); PetscFree(svalues);
369 
370   return 0;
371 }
372 
373 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
374 {
375   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
376   int          ierr;
377 
378   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
379   CHKERRQ(ierr);
380   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
381   CHKERRQ(ierr);
382   ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
383   return 0;
384 }
385 
386 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
387 {
388   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
389   int          ierr;
390 
391   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
392   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
393   ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
394   return 0;
395 }
396 
397 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
398 {
399   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
400   int          ierr;
401   Scalar       zero = 0.0;
402 
403   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
404   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
405   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
406          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
407   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
408          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
409   return 0;
410 }
411 
412 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
413 {
414   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
415   int          ierr;
416 
417   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
418   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
419   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
420          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
421   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
422          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
423   return 0;
424 }
425 
426 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
427 {
428   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
429   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
430   int          ierr, i, n, m = a->m, radd;
431   Scalar       *x;
432 
433   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
434   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
435   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
436   radd = a->rstart*m; /* offset of column rstart in the column-major local array */
437   for ( i=0; i<m; i++ ) {
438     x[i] = aloc->v[radd + i*m + i];
439   }
440   return 0;
441 }
442 
443 static int MatDestroy_MPIDense(PetscObject obj)
444 {
445   Mat          mat = (Mat) obj;
446   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
447   int          ierr;
448 
449 #if defined(PETSC_LOG)
450   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
451 #endif
452   PetscFree(mdn->rowners);
453   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
454   if (mdn->lvec)   VecDestroy(mdn->lvec);
455   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
456   if (mdn->factor) {
457     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
458     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
459     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
460     PetscFree(mdn->factor);
461   }
462   PetscFree(mdn);
463   PLogObjectDestroy(mat);
464   PetscHeaderDestroy(mat);
465   return 0;
466 }
467 
468 #include "pinclude/pviewer.h"
469 
470 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
471 {
472   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
473   int          ierr;
474   if (mdn->size == 1) {
475     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
476   }
477   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
478   return 0;
479 }
480 
481 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
482 {
483   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
484   int          ierr, format;
485   PetscObject  vobj = (PetscObject) viewer;
486   FILE         *fd;
487 
488   ierr = ViewerFileGetPointer(viewer,&fd); CHKERRQ(ierr);
489   if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
490     ierr = ViewerFileGetFormat_Private(viewer,&format);
491     if (format == FILE_FORMAT_INFO_DETAILED) {
492       int nz, nzalloc, mem, rank;
493       MPI_Comm_rank(mat->comm,&rank);
494       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
495       MPIU_Seq_begin(mat->comm,1);
496         fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
497             rank,mdn->m,nz,nzalloc,mem);
498       fflush(fd);
499       MPIU_Seq_end(mat->comm,1);
500       ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
501       return 0;
502     }
503     else if (format == FILE_FORMAT_INFO) {
504       return 0;
505     }
506   }
507   if (vobj->type == ASCII_FILE_VIEWER) {
508     MPIU_Seq_begin(mat->comm,1);
509     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
510              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
511     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
512     fflush(fd);
513     MPIU_Seq_end(mat->comm,1);
514   }
515   else {
516     int size = mdn->size, rank = mdn->rank;
517     if (size == 1) {
518       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
519     }
520     else {
521       /* assemble the entire matrix onto first processor. */
522       Mat          A;
523       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
524       Scalar       *vals;
525       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
526 
527       if (!rank) {
528         ierr = MatCreateMPIDense(mat->comm,M,M,N,N,PETSC_NULL,&A); CHKERRQ(ierr);
529       }
530       else {
531         ierr = MatCreateMPIDense(mat->comm,0,M,N,N,PETSC_NULL,&A); CHKERRQ(ierr);
532       }
533       PLogObjectParent(mat,A);
534 
535       /* Copy the matrix ... This isn't the most efficient means,
536          but it's quick for now */
537       row = mdn->rstart; m = Amdn->m;
538       for ( i=0; i<m; i++ ) {
539         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
540         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
541         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
542         row++;
543       }
544 
545       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
546       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
547       if (!rank) {
548         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
549       }
550       ierr = MatDestroy(A); CHKERRQ(ierr);
551     }
552   }
553   return 0;
554 }
555 
556 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
557 {
558   Mat          mat = (Mat) obj;
559   PetscObject  vobj = (PetscObject) viewer;
560   int          ierr;
561   ViewerType   vtype;
562 
563   if (!viewer) {
564     viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
565   }
566   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
567   if (vtype == ASCII_FILE_VIEWER) {
568     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
569   }
570   else if (vtype == ASCII_FILES_VIEWER) {
571     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
572   }
573   else if (vtype == BINARY_FILE_VIEWER) {
574     return MatView_MPIDense_Binary(mat,viewer);
575   }
576   return 0;
577 }
578 
579 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
580                              int *nzalloc,int *mem)
581 {
582   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
583   Mat          mdn = mat->A;
584   int          ierr, isend[3], irecv[3];
585 
586   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
587   if (flag == MAT_LOCAL) {
588     if (nz)      *nz      = isend[0];
589     if (nzalloc) *nzalloc = isend[1];
590     if (mem)     *mem     = isend[2];
591   } else if (flag == MAT_GLOBAL_MAX) {
592     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
593     if (nz)      *nz      = irecv[0];
594     if (nzalloc) *nzalloc = irecv[1];
595     if (mem)     *mem     = irecv[2];
596   } else if (flag == MAT_GLOBAL_SUM) {
597     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
598     if (nz)      *nz      = irecv[0];
599     if (nzalloc) *nzalloc = irecv[1];
600     if (mem)     *mem     = irecv[2];
601   }
602   return 0;
603 }
604 
605 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
606    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
607    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
608    extern int MatSolve_MPIDense(Mat,Vec,Vec);
609    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
610    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
611    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
612 
613 static int MatSetOption_MPIDense(Mat A,MatOption op)
614 {
615   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
616 
617   if (op == NO_NEW_NONZERO_LOCATIONS ||
618       op == YES_NEW_NONZERO_LOCATIONS ||
619       op == COLUMNS_SORTED ||
620       op == ROW_ORIENTED) {
621         MatSetOption(a->A,op);
622   }
623   else if (op == ROWS_SORTED ||
624            op == SYMMETRIC_MATRIX ||
625            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
626            op == YES_NEW_DIAGONALS)
627     PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
628   else if (op == COLUMN_ORIENTED)
629     { a->roworiented = 0; MatSetOption(a->A,op);}
630   else if (op == NO_NEW_DIAGONALS)
631     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
632   else
633     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
634   return 0;
635 }
636 
637 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
638 {
639   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
640   *m = mat->M; *n = mat->N;
641   return 0;
642 }
643 
644 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
645 {
646   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
647   *m = mat->m; *n = mat->N;
648   return 0;
649 }
650 
651 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
652 {
653   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
654   *m = mat->rstart; *n = mat->rend;
655   return 0;
656 }
657 
658 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
659 {
660   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
661   int          lrow, rstart = mat->rstart, rend = mat->rend;
662 
663   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows")
664   lrow = row - rstart;
665   return MatGetRow(mat->A,lrow,nz,idx,v);
666 }
667 
668 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
669 {
670   if (idx) PetscFree(*idx);
671   if (v) PetscFree(*v);
672   return 0;
673 }
674 
675 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
676 {
677   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
678   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
679   int          ierr, i, j;
680   double       sum = 0.0;
681   Scalar       *v = mat->v;
682 
683   if (mdn->size == 1) {
684     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
685   } else {
686     if (type == NORM_FROBENIUS) {
687       for (i=0; i<mat->n*mat->m; i++ ) {
688 #if defined(PETSC_COMPLEX)
689         sum += real(conj(*v)*(*v)); v++;
690 #else
691         sum += (*v)*(*v); v++;
692 #endif
693       }
694       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
695       *norm = sqrt(*norm);
696       PLogFlops(2*mat->n*mat->m);
697     }
698     else if (type == NORM_1) {
699       double *tmp, *tmp2;
700       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
701       tmp2 = tmp + mdn->N;
702       PetscMemzero(tmp,2*mdn->N*sizeof(double));
703       *norm = 0.0;
704       v = mat->v;
705       for ( j=0; j<mat->n; j++ ) {
706         for ( i=0; i<mat->m; i++ ) {
707           tmp[j] += PetscAbsScalar(*v);  v++;
708         }
709       }
710       MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
711       for ( j=0; j<mdn->N; j++ ) {
712         if (tmp2[j] > *norm) *norm = tmp2[j];
713       }
714       PetscFree(tmp);
715       PLogFlops(mat->n*mat->m);
716     }
717     else if (type == NORM_INFINITY) { /* max row norm */
718       double ntemp;
719       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
720       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
721     }
722     else {
723       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
724     }
725   }
726   return 0;
727 }
728 
729 static int MatTranspose_MPIDense(Mat A,Mat *matout)
730 {
731   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
732   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
733   Mat          B;
734   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
735   int          j, i, ierr;
736   Scalar       *v;
737 
738   if (matout == PETSC_NULL && M != N)
739     SETERRQ(1,"MatTranspose_MPIDense:In-place transpose supported only for square matrices");
740   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
741          CHKERRQ(ierr);
742 
743   m = Aloc->m; n = Aloc->n; v = Aloc->v;
744   rwork = (int *) PetscMalloc(m*sizeof(int)); CHKPTRQ(rwork); /* holds m global row indices */
745   for ( j=0; j<n; j++ ) {
746     for (i=0; i<m; i++) rwork[i] = rstart + i;
747     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
748     v += m;
749   }
750   PetscFree(rwork);
751   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
752   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
753   if (matout != PETSC_NULL) {
754     *matout = B;
755   } else {
756     /* This isn't really an in-place transpose: free a's data structures and copy B's header over A */
757     PetscFree(a->rowners);
758     ierr = MatDestroy(a->A); CHKERRQ(ierr);
759     if (a->lvec) VecDestroy(a->lvec);
760     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
761     PetscFree(a);
762     PetscMemcpy(A,B,sizeof(struct _Mat));
763     PetscHeaderDestroy(B);
764   }
765   return 0;
766 }
767 
768 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
769 
770 /* -------------------------------------------------------------------*/
771 static struct _MatOps MatOps = {MatSetValues_MPIDense,
772        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
773        MatMult_MPIDense,MatMultAdd_MPIDense,
774        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
775 /*       MatSolve_MPIDense,0, */
776        0,0,
777        0,0,
778        0,0,
779 /*       MatLUFactor_MPIDense,0, */
780        0,MatTranspose_MPIDense,
781        MatGetInfo_MPIDense,0,
782        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
783        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
784        0,
785        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
786        0,0,0,
787 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
788        0,0,
789        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
790        MatGetOwnershipRange_MPIDense,
791        0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
792        0,0,0,MatConvertSameType_MPIDense,
793        0,0,0,0,0,
794        0,0,MatGetValues_MPIDense};
795 
796 /*@C
797    MatCreateMPIDense - Creates a parallel matrix in dense format.
798 
799    Input Parameters:
800 .  comm - MPI communicator
801 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
802 .  n - number of local columns (or PETSC_DECIDE to have it calculated
803            if N is given)
804 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
805 .  N - number of global columns (or PETSC_DECIDE to have it calculated
806            if n is given)
807 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
808    to control all matrix memory allocation.
809 
810    Output Parameter:
811 .  newmat - the matrix
812 
813    Notes:
814    The dense format is fully compatible with standard Fortran 77
815    storage by columns.
816 
817    The data input variable is intended primarily for Fortran programmers
818    who wish to allocate their own matrix memory space.  Most users should
819    set data=PETSC_NULL.
820 
821    The user MUST specify either the local or global matrix dimensions
822    (possibly both).
823 
824    Currently, the only parallel dense matrix decomposition is by rows,
825    so that n=N and each submatrix owns all of the global columns.
826 
827 .keywords: matrix, dense, parallel
828 
829 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
830 @*/
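/*
   Example usage (a minimal sketch; the sizes are hypothetical and error
   handling of the calling code is omitted):

     Mat    A;
     int    ierr, row = 0, col = 0;
     Scalar one = 1.0;

     ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,10,10,
                              PETSC_NULL,&A); CHKERRQ(ierr);
     ierr = MatSetValues(A,1,&row,1,&col,&one,INSERT_VALUES); CHKERRQ(ierr);
     ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
     ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
     ierr = MatDestroy(A); CHKERRQ(ierr);
*/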
831 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
832 {
833   Mat          mat;
834   Mat_MPIDense *a;
835   int          ierr, i,flg;
836 
837 /* Note:  For now, when data is specified above, this assumes the user correctly
838    allocates the local dense storage space.  We should add error checking. */
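/* Illustrative only (an assumption about typical use, not something enforced
   here): a user-provided "data" array holds the local m by N block in
   column-major (Fortran) order, e.g.

     Scalar *data = (Scalar *) PetscMalloc(m*N*sizeof(Scalar));
     ... fill data[i + j*m] with the value for local row i, global column j ...

   and is passed unchanged to MatCreateSeqDense() below. */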
839 
840   *newmat         = 0;
841   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
842   PLogObjectCreate(mat);
843   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
844   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
845   mat->destroy    = MatDestroy_MPIDense;
846   mat->view       = MatView_MPIDense;
847   mat->factor     = 0;
848 
849   a->factor = 0;
850   a->insertmode = NOT_SET_VALUES;
851   MPI_Comm_rank(comm,&a->rank);
852   MPI_Comm_size(comm,&a->size);
853 
854   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
855   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
856 
857   /* each row stores all columns */
858   if (N == PETSC_DECIDE) N = n;
859   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
860   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
861   a->N = N;
862   a->M = M;
863   a->m = m;
864   a->n = n;
865 
866   /* build local table of row and column ownerships */
867   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
868   a->cowners = a->rowners + a->size + 1;
869   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
870                        sizeof(Mat_MPIDense));
871   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
872   a->rowners[0] = 0;
873   for ( i=2; i<=a->size; i++ ) {
874     a->rowners[i] += a->rowners[i-1];
875   }
876   a->rstart = a->rowners[a->rank];
877   a->rend   = a->rowners[a->rank+1];
878   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
879   a->cowners[0] = 0;
880   for ( i=2; i<=a->size; i++ ) {
881     a->cowners[i] += a->cowners[i-1];
882   }
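  /* For illustration (hypothetical sizes): with three processes owning 2, 3
     and 2 rows, rowners becomes {0,2,5,7}; process 1 then has rstart = 2 and
     rend = 5, i.e. it owns global rows 2..4. The cowners table is built the
     same way from the local column counts. */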
883 
884   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
885   PLogObjectParent(mat,a->A);
886 
887   /* build cache for off array entries formed */
888   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
889 
890   /* stuff used for matrix vector multiply */
891   a->lvec        = 0;
892   a->Mvctx       = 0;
893   a->roworiented = 1;
894 
895   *newmat = mat;
896   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
897   if (flg) {
898     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
899   }
900   return 0;
901 }
902 
903 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
904 {
905   Mat          mat;
906   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
907   int          ierr;
908   FactorCtx    *factor;
909 
910   *newmat       = 0;
911   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
912   PLogObjectCreate(mat);
913   mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
914   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
915   mat->destroy  = MatDestroy_MPIDense;
916   mat->view     = MatView_MPIDense;
917   mat->factor   = A->factor;
918   mat->assembled  = PETSC_TRUE;
919 
920   a->m          = oldmat->m;
921   a->n          = oldmat->n;
922   a->M          = oldmat->M;
923   a->N          = oldmat->N;
924   if (oldmat->factor) {
925     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
926     /* copy factor contents ... add this code! */
927   } else a->factor = 0;
928 
929   a->rstart     = oldmat->rstart;
930   a->rend       = oldmat->rend;
931   a->size       = oldmat->size;
932   a->rank       = oldmat->rank;
933   a->insertmode = NOT_SET_VALUES;
934 
935   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
936   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
937   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
938   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
939 
940   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
941   PLogObjectParent(mat,a->lvec);
942   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
943   PLogObjectParent(mat,a->Mvctx);
944   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
945   PLogObjectParent(mat,a->A);
946   *newmat = mat;
947   return 0;
948 }
949 
950 #include "sysio.h"
951 
952 int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
953 {
954   Mat          A;
955   int          i, nz, ierr, j,rstart, rend, fd;
956   Scalar       *vals,*svals;
957   PetscObject  vobj = (PetscObject) bview;
958   MPI_Comm     comm = vobj->comm;
959   MPI_Status   status;
960   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
961   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
962   int          tag = ((PetscObject)bview)->tag;
963 
964   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
965   if (!rank) {
966     ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
967     ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
968     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:not matrix object");
969   }
970 
971   MPI_Bcast(header+1,3,MPI_INT,0,comm);
972   M = header[1]; N = header[2];
973   /* determine ownership of all rows */
974   m = M/size + ((M % size) > rank);
975   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
976   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
977   rowners[0] = 0;
978   for ( i=2; i<=size; i++ ) {
979     rowners[i] += rowners[i-1];
980   }
981   rstart = rowners[rank];
982   rend   = rowners[rank+1];
983 
984   /* distribute row lengths to all processors */
985   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
986   offlens = ourlens + (rend-rstart);
987   if (!rank) {
988     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
989     ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
990     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
991     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
992     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
993     PetscFree(sndcounts);
994   }
995   else {
996     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
997   }
998 
999   if (!rank) {
1000     /* calculate the number of nonzeros on each processor */
1001     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1002     PetscMemzero(procsnz,size*sizeof(int));
1003     for ( i=0; i<size; i++ ) {
1004       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1005         procsnz[i] += rowlengths[j];
1006       }
1007     }
1008     PetscFree(rowlengths);
1009 
1010     /* determine max buffer needed and allocate it */
1011     maxnz = 0;
1012     for ( i=0; i<size; i++ ) {
1013       maxnz = PetscMax(maxnz,procsnz[i]);
1014     }
1015     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1016 
1017     /* read in my part of the matrix column indices  */
1018     nz = procsnz[0];
1019     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1020     ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);
1021 
1022     /* read in everyone else's column indices and ship them off */
1023     for ( i=1; i<size; i++ ) {
1024       nz = procsnz[i];
1025       ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
1026       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1027     }
1028     PetscFree(cols);
1029   }
1030   else {
1031     /* determine buffer space needed for message */
1032     nz = 0;
1033     for ( i=0; i<m; i++ ) {
1034       nz += ourlens[i];
1035     }
1036     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1037 
1038     /* receive message of column indices*/
1039     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1040     MPI_Get_count(&status,MPI_INT,&maxnz);
1041     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1042   }
1043 
1044   /* loop over local rows, determining number of off diagonal entries */
1045   PetscMemzero(offlens,m*sizeof(int));
1046   jj = 0;
1047   for ( i=0; i<m; i++ ) {
1048     for ( j=0; j<ourlens[i]; j++ ) {
1049       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1050       jj++;
1051     }
1052   }
1053 
1054   /* create our matrix */
1055   for ( i=0; i<m; i++ ) {
1056     ourlens[i] -= offlens[i];
1057   }
1058   if (type == MATMPIDENSE) {
1059     ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1060   }
1061   A = *newmat;
1062   for ( i=0; i<m; i++ ) {
1063     ourlens[i] += offlens[i];
1064   }
1065 
1066   if (!rank) {
1067     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1068 
1069     /* read in my part of the matrix numerical values  */
1070     nz = procsnz[0];
1071     ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1072 
1073     /* insert into matrix */
1074     jj      = rstart;
1075     smycols = mycols;
1076     svals   = vals;
1077     for ( i=0; i<m; i++ ) {
1078       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1079       smycols += ourlens[i];
1080       svals   += ourlens[i];
1081       jj++;
1082     }
1083 
1084     /* read in the other processors' values and ship them out */
1085     for ( i=1; i<size; i++ ) {
1086       nz = procsnz[i];
1087       ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1088       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1089     }
1090     PetscFree(procsnz);
1091   }
1092   else {
1093     /* receive numeric values */
1094     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1095 
1096     /* receive message of values*/
1097     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1098     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1099     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1100 
1101     /* insert into matrix */
1102     jj      = rstart;
1103     smycols = mycols;
1104     svals   = vals;
1105     for ( i=0; i<m; i++ ) {
1106       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1107       smycols += ourlens[i];
1108       svals   += ourlens[i];
1109       jj++;
1110     }
1111   }
1112   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1113 
1114   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1115   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1116   return 0;
1117 }
1118