xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision ead193b4e97231db95de86d0444f02f44fa4015f)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.32 1996/03/10 17:28:08 bsmith Exp curfman $";
3 #endif
4 
5 /*
6    Basic functions for basic parallel dense matrices.
7 */
8 
9 #include "mpidense.h"
10 #include "vec/vecimpl.h"
11 
/*
   MatSetValues_MPIDense - Inserts or adds a logically dense block of values
   into the parallel matrix.  Values destined for locally owned rows go
   straight into the local sequential matrix; all other values are cached
   in the stash and communicated during assembly.

   Note: when the matrix is column oriented the input array v is addressed
   as v[i + j*m] (Fortran ordering), hence the entry-at-a-time paths below.
*/
static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
                               int *idxn,Scalar *v,InsertMode addv)
{
  Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
  int          roworiented = A->roworiented;

  /* INSERT_VALUES and ADD_VALUES may not be mixed between assemblies */
  if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
  }
  A->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
    if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      /* locally owned row: set directly into the local dense matrix */
      row = idxm[i] - rstart;
      if (roworiented) {
        ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else {
        /* column oriented: the entries of this row are strided through v */
        for ( j=0; j<n; j++ ) {
          if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
          if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
          ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
        }
      }
    }
    else {
      /* off-processor row: cache in the stash until assembly time */
      if (roworiented) {
        ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
      }
      else { /* must stash each separately */
        row = idxm[i];
        for ( j=0; j<n; j++ ) {
          ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
                 CHKERRQ(ierr);
        }
      }
    }
  }
  return 0;
}
54 
55 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
56 {
57   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
58   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
59 
60   for ( i=0; i<m; i++ ) {
61     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
62     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
63     if (idxm[i] >= rstart && idxm[i] < rend) {
64       row = idxm[i] - rstart;
65       for ( j=0; j<n; j++ ) {
66         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
67         if (idxn[j] >= mdn->N)
68           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
69         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
70       }
71     }
72     else {
73       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
74     }
75   }
76   return 0;
77 }
78 
79 static int MatGetArray_MPIDense(Mat A,Scalar **array)
80 {
81   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
82   int ierr;
83 
84   ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
85   return 0;
86 }
87 
/* Companion to MatGetArray_MPIDense; nothing needs to be released. */
static int MatRestoreArray_MPIDense(Mat A,Scalar **array)
{
  return 0;
}
92 
/*
   MatAssemblyBegin_MPIDense - First phase of assembly: routes every
   stashed off-processor entry to its owning process.

   Entries travel as triples (global row, global col, value); the two
   indices are encoded as Scalars so each message is a single MPIU_SCALAR
   buffer.  The matching receives are drained in MatAssemblyEnd_MPIDense,
   which reads the communication state saved at the bottom of this routine.
*/
static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Comm     comm = mat->comm;
  int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
  int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int          tag = mat->tag, *owner,*starts,count,ierr;
  InsertMode   addv;
  MPI_Request  *send_waits,*recv_waits;
  Scalar       *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&mdn->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) { SETERRQ(1,
    "MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
    }
  mdn->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<mdn->stash.n; i++ ) {
    idx = mdn->stash.idx[i];
    /* linear search of the row-ownership table for this entry's owner */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];    /* how many messages this process will receive */
  MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];         /* longest message destined for this process */
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
  CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack each stashed entry as a (row,col,value) triple; starts[] is
     consumed as entries are placed, so it is rebuilt below for the sends */
  for ( i=0; i<mdn->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);

  /* save the in-flight communication state for MatAssemblyEnd_MPIDense */
  mdn->svalues    = svalues;    mdn->rvalues = rvalues;
  mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
  mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
  mdn->rmax       = nmax;

  return 0;
}
190 extern int MatSetUpMultiply_MPIDense(Mat);
191 
/*
   MatAssemblyEnd_MPIDense - Second phase of assembly: drains the receives
   posted in MatAssemblyBegin_MPIDense, applies the incoming entries to the
   local dense matrix, waits for the sends to complete, then assembles the
   local block and (on first final assembly) builds the multiply scatter.
*/
static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Status   *send_status,recv_status;
  int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
  Scalar       *values,val;
  InsertMode   addv = mdn->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = mdn->rvalues + 3*imdex*mdn->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;    /* entries arrive as (row,col,value) triples */
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - mdn->rstart;  /* to local row */
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= 0 && col < mdn->N) {
        MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
      }
      else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
    }
    count--;
  }
  PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);

  /* wait on sends */
  if (mdn->nsends) {
    send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
    CHKPTRQ(send_status);
    MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(mdn->send_waits); PetscFree(mdn->svalues);

  mdn->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);

  /* the scatter context used by MatMult is built once, at first final assembly */
  if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
  }
  return 0;
}
238 
239 static int MatZeroEntries_MPIDense(Mat A)
240 {
241   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
242   return MatZeroEntries(l->A);
243 }
244 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   mdn->A and mdn->B directly and not through the MatZeroRows()
   routine.
*/
/*
   MatZeroRows_MPIDense - Zeros the global rows listed in is, optionally
   placing diag on the diagonal (via the sequential MatZeroRows).

   The row indices in is may refer to rows owned by any process, so the
   list is first redistributed: each index is sent to its owner, and each
   process then zeros the local rows it received.
*/
static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
{
  Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    /* linear search of the row-ownership table */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];    /* number of messages this process will receive */
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];      /* longest message destined for this process */
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* bucket the row indices by destination; starts[] is consumed here and
     rebuilt just below before the sends */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];   /* first globally owned row on this process */

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;   /* convert to local numbering */
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
372 
373 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
374 {
375   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
376   int          ierr;
377 
378   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
379   CHKERRQ(ierr);
380   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
381   CHKERRQ(ierr);
382   ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
383   return 0;
384 }
385 
386 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
387 {
388   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
389   int          ierr;
390 
391   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
392   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
393   ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
394   return 0;
395 }
396 
397 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
398 {
399   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
400   int          ierr;
401   Scalar       zero = 0.0;
402 
403   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
404   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
405   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
406          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
407   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
408          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
409   return 0;
410 }
411 
412 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
413 {
414   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
415   int          ierr;
416 
417   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
418   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
419   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
420          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
421   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
422          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
423   return 0;
424 }
425 
426 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
427 {
428   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
429   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
430   int          ierr, i, n, m = a->m, radd;
431   Scalar       *x;
432 
433   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
434   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
435   if (n != a->M) SETERRQ(1,"MatGetDiagonal_SeqDense:Nonconforming mat and vec");
436   radd = a->rstart*m*m;
437   for ( i=0; i<m; i++ ) {
438     x[i] = aloc->v[radd + i*m + i];
439   }
440   return 0;
441 }
442 
/*
   MatDestroy_MPIDense - Releases everything owned by the parallel matrix:
   the ownership tables, the local dense block, the multiply work vector
   and scatter context, any factorization data, and finally the header.
*/
static int MatDestroy_MPIDense(PetscObject obj)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
#endif
  PetscFree(mdn->rowners);   /* cowners shares this allocation */
  ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
  if (mdn->lvec)   VecDestroy(mdn->lvec);
  if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
  if (mdn->factor) {
    if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
    if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
    if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
    PetscFree(mdn->factor);
  }
  PetscFree(mdn);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}
467 
468 #include "pinclude/pviewer.h"
469 
470 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
471 {
472   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
473   int          ierr;
474   if (mdn->size == 1) {
475     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
476   }
477   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
478   return 0;
479 }
480 
/*
   MatView_MPIDense_ASCII - ASCII output of a parallel dense matrix.

   The INFO formats print only summary information.  For ASCII_FILE_VIEWER
   each process prints its own local block in sequence; otherwise the
   whole matrix is gathered onto the first process and printed there.
*/
static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, format;
  FILE         *fd;
  ViewerType   vtype;

  ViewerGetType(viewer,&vtype);
  ierr = ViewerFileGetPointer(viewer,&fd); CHKERRQ(ierr);
  ierr = ViewerFileGetFormat_Private(viewer,&format);
  if (format == FILE_FORMAT_INFO_DETAILED) {
    int nz, nzalloc, mem, rank;
    MPI_Comm_rank(mat->comm,&rank);
    ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
    /* one process at a time prints its summary line */
    MPIU_Seq_begin(mat->comm,1);
      fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
          rank,mdn->m,nz,nzalloc,mem);
      fflush(fd);
    MPIU_Seq_end(mat->comm,1);
    ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
    return 0;
  }
  else if (format == FILE_FORMAT_INFO) {
    /* nothing is printed for the plain info format */
    return 0;
  }
  if (vtype == ASCII_FILE_VIEWER) {
    /* each process prints its own rows, in rank order */
    MPIU_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
             mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    fflush(fd);
    MPIU_Seq_end(mat->comm,1);
  }
  else {
    int size = mdn->size, rank = mdn->rank;
    if (size == 1) {
      ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat          A;
      int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
      Scalar       *vals;
      Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;

      /* process 0 is given all the rows; the others contribute none */
      if (!rank) {
        ierr = MatCreateMPIDense(mat->comm,M,M,N,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIDense(mat->comm,0,M,N,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* Copy the matrix ... This isn't the most efficient means,
         but it's quick for now */
      row = mdn->rstart; m = Amdn->m;
      for ( i=0; i<m; i++ ) {
        ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
        ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        row++;
      }

      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        /* view the sequential block now living entirely on process 0 */
        ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
554 
555 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
556 {
557   Mat          mat = (Mat) obj;
558   int          ierr;
559   ViewerType   vtype;
560 
561   if (!viewer) {
562     viewer = STDOUT_VIEWER_SELF;
563   }
564   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
565   if (vtype == ASCII_FILE_VIEWER) {
566     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
567   }
568   else if (vtype == ASCII_FILES_VIEWER) {
569     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
570   }
571   else if (vtype == BINARY_FILE_VIEWER) {
572     return MatView_MPIDense_Binary(mat,viewer);
573   }
574   return 0;
575 }
576 
577 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
578                              int *nzalloc,int *mem)
579 {
580   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
581   Mat          mdn = mat->A;
582   int          ierr, isend[3], irecv[3];
583 
584   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
585   if (flag == MAT_LOCAL) {
586     if (nz)      *nz      = isend[0];
587     if (nzalloc) *nzalloc = isend[1];
588     if (mem)     *mem     = isend[2];
589   } else if (flag == MAT_GLOBAL_MAX) {
590     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
591     if (nz)      *nz      = irecv[0];
592     if (nzalloc) *nzalloc = irecv[1];
593     if (mem)     *mem     = irecv[2];
594   } else if (flag == MAT_GLOBAL_SUM) {
595     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
596     if (nz)      *nz      = irecv[0];
597     if (nzalloc) *nzalloc = irecv[1];
598     if (mem)     *mem     = irecv[2];
599   }
600   return 0;
601 }
602 
603 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
604    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
605    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
606    extern int MatSolve_MPIDense(Mat,Vec,Vec);
607    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
608    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
609    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
610 
611 static int MatSetOption_MPIDense(Mat A,MatOption op)
612 {
613   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
614 
615   if (op == NO_NEW_NONZERO_LOCATIONS ||
616       op == YES_NEW_NONZERO_LOCATIONS ||
617       op == COLUMNS_SORTED ||
618       op == ROW_ORIENTED) {
619         MatSetOption(a->A,op);
620   }
621   else if (op == ROWS_SORTED ||
622            op == SYMMETRIC_MATRIX ||
623            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
624            op == YES_NEW_DIAGONALS)
625     PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
626   else if (op == COLUMN_ORIENTED)
627     { a->roworiented = 0; MatSetOption(a->A,op);}
628   else if (op == NO_NEW_DIAGONALS)
629     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
630   else
631     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
632   return 0;
633 }
634 
635 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
636 {
637   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
638   *m = mat->M; *n = mat->N;
639   return 0;
640 }
641 
642 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
643 {
644   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
645   *m = mat->m; *n = mat->N;
646   return 0;
647 }
648 
649 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
650 {
651   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
652   *m = mat->rstart; *n = mat->rend;
653   return 0;
654 }
655 
656 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
657 {
658   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
659   int          lrow, rstart = mat->rstart, rend = mat->rend;
660 
661   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows")
662   lrow = row - rstart;
663   return MatGetRow(mat->A,lrow,nz,idx,v);
664 }
665 
/* Frees the index and value work arrays handed out by MatGetRow_MPIDense. */
static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  if (idx) PetscFree(*idx);
  if (v) PetscFree(*v);
  return 0;
}
672 
/*
   MatNorm_MPIDense - Computes a matrix norm in parallel.

   Frobenius: local sum of squares, MPI_SUM reduction, then sqrt.
   One norm:  per-column absolute sums, MPI_SUM reduction, then the max.
   Infinity:  local row-sum norm of the sequential block (each process
              owns whole rows), MPI_MAX reduction.
   The two norm is not supported.
*/
static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
  Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
  int          ierr, i, j;
  double       sum = 0.0;
  Scalar       *v = mat->v;

  if (mdn->size == 1) {
    /* single process: defer entirely to the sequential implementation */
    ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      for (i=0; i<mat->n*mat->m; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
      *norm = sqrt(*norm);
      PLogFlops(2*mat->n*mat->m);
    }
    else if (type == NORM_1) {
      double *tmp, *tmp2;
      tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = tmp + mdn->N;
      PetscMemzero(tmp,2*mdn->N*sizeof(double));
      *norm = 0.0;
      v = mat->v;
      /* local contribution to each column's absolute sum (column major) */
      for ( j=0; j<mat->n; j++ ) {
        for ( i=0; i<mat->m; i++ ) {
          tmp[j] += PetscAbsScalar(*v);  v++;
        }
      }
      MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
      for ( j=0; j<mdn->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp);
      PLogFlops(mat->n*mat->m);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp;
      ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
      MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
    }
  }
  return 0;
}
726 
727 static int MatTranspose_MPIDense(Mat A,Mat *matout)
728 {
729   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
730   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
731   Mat          B;
732   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
733   int          j, i, ierr;
734   Scalar       *v;
735 
736   if (matout == PETSC_NULL && M != N)
737     SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
738   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
739          CHKERRQ(ierr);
740 
741   m = Aloc->m; n = Aloc->n; v = Aloc->v;
742   rwork = (int *) PetscMalloc(n*sizeof(int)); CHKPTRQ(rwork);
743   for ( j=0; j<n; j++ ) {
744     for (i=0; i<m; i++) rwork[i] = rstart + i;
745     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
746     v += m;
747   }
748   PetscFree(rwork);
749   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
750   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
751   if (matout != PETSC_NULL) {
752     *matout = B;
753   } else {
754     /* This isn't really an in-place transpose, but free data struct from a */
755     PetscFree(a->rowners);
756     ierr = MatDestroy(a->A); CHKERRQ(ierr);
757     if (a->lvec) VecDestroy(a->lvec);
758     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
759     PetscFree(a);
760     PetscMemcpy(A,B,sizeof(struct _Mat));
761     PetscHeaderDestroy(B);
762   }
763   return 0;
764 }
765 
766 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
767 
768 /* -------------------------------------------------------------------*/
/* Dispatch table of matrix operations for the MATMPIDENSE type; zero
   slots are operations not (yet) implemented for this format (the
   commented lines mark where the factor/solve routines would go). */
static struct _MatOps MatOps = {MatSetValues_MPIDense,
       MatGetRow_MPIDense,MatRestoreRow_MPIDense,
       MatMult_MPIDense,MatMultAdd_MPIDense,
       MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
/*       MatSolve_MPIDense,0, */
       0,0,
       0,0,
       0,0,
/*       MatLUFactor_MPIDense,0, */
       0,MatTranspose_MPIDense,
       MatGetInfo_MPIDense,0,
       MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
       MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
       0,
       MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
       0,0,0,
/*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
       0,0,
       MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
       MatGetOwnershipRange_MPIDense,
       0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
       0,0,0,MatConvertSameType_MPIDense,
       0,0,0,0,0,
       0,0,MatGetValues_MPIDense};
793 
794 /*@C
   MatCreateMPIDense - Creates a parallel matrix in dense format.
796 
797    Input Parameters:
798 .  comm - MPI communicator
799 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
800 .  n - number of local columns (or PETSC_DECIDE to have calculated
801            if N is given)
802 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
803 .  N - number of global columns (or PETSC_DECIDE to have calculated
804            if n is given)
805 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
806    to control all matrix memory allocation.
807 
808    Output Parameter:
809 .  newmat - the matrix
810 
811    Notes:
812    The dense format is fully compatible with standard Fortran 77
813    storage by columns.
814 
815    The data input variable is intended primarily for Fortran programmers
816    who wish to allocate their own matrix memory space.  Most users should
817    set data=PETSC_NULL.
818 
819    The user MUST specify either the local or global matrix dimensions
820    (possibly both).
821 
822    Currently, the only parallel dense matrix decomposition is by rows,
823    so that n=N and each submatrix owns all of the global columns.
824 
825 .keywords: matrix, dense, parallel
826 
827 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
828 @*/
int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
{
  Mat          mat;
  Mat_MPIDense *a;
  int          ierr, i,flg;

/* Note:  For now, when data is specified above, this assumes the user correctly
   allocates the local dense storage space.  We should add error checking. */

  *newmat         = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIDense;
  mat->view       = MatView_MPIDense;
  mat->factor     = 0;

  a->factor = 0;
  a->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&a->rank);
  MPI_Comm_size(comm,&a->size);

  /* fill in missing row dimensions: global as the sum of the locals, or
     local by nearly even division of the global count */
  if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
  if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}

  /* each row stores all columns */
  if (N == PETSC_DECIDE) N = n;
  if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
  /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
  a->N = N;
  a->M = M;
  a->m = m;
  a->n = n;

  /* build local table of row and column ownerships */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  a->cowners = a->rowners + a->size + 1;   /* second half of the same block */
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
                       sizeof(Mat_MPIDense));
  /* prefix sums of the per-process counts give the ownership ranges */
  MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
  a->rowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->rowners[i] += a->rowners[i-1];
  }
  a->rstart = a->rowners[a->rank];
  a->rend   = a->rowners[a->rank+1];
  MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
  a->cowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->cowners[i] += a->cowners[i-1];
  }

  /* local storage: m owned rows by all N columns */
  ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);

  /* stuff used for matrix vector multiply */
  a->lvec        = 0;
  a->Mvctx       = 0;
  a->roworiented = 1;

  *newmat = mat;
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  return 0;
}
900 
901 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
902 {
903   Mat          mat;
904   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
905   int          ierr;
906   FactorCtx    *factor;
907 
908   *newmat       = 0;
909   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
910   PLogObjectCreate(mat);
911   mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
912   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
913   mat->destroy  = MatDestroy_MPIDense;
914   mat->view     = MatView_MPIDense;
915   mat->factor   = A->factor;
916   mat->assembled  = PETSC_TRUE;
917 
918   a->m          = oldmat->m;
919   a->n          = oldmat->n;
920   a->M          = oldmat->M;
921   a->N          = oldmat->N;
922   if (oldmat->factor) {
923     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
924     /* copy factor contents ... add this code! */
925   } else a->factor = 0;
926 
927   a->rstart     = oldmat->rstart;
928   a->rend       = oldmat->rend;
929   a->size       = oldmat->size;
930   a->rank       = oldmat->rank;
931   a->insertmode = NOT_SET_VALUES;
932 
933   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
934   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
935   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
936   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
937 
938   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
939   PLogObjectParent(mat,a->lvec);
940   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
941   PLogObjectParent(mat,a->Mvctx);
942   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
943   PLogObjectParent(mat,a->A);
944   *newmat = mat;
945   return 0;
946 }
947 
948 #include "sysio.h"
949 
/*
   MatLoad_MPIDense - Loads a matrix stored in PETSc binary format into a
   parallel dense matrix.  Processor 0 reads the file and ships each
   processor its rows; every processor then inserts its own values with
   MatSetValues().  The file layout read here is: header (cookie, M, N, nz),
   per-row lengths, all column indices, all values.
   NOTE(review): when type != MATMPIDENSE no matrix is created here and
   A = *newmat is used as passed in -- presumably another loader (MPIRow?)
   shares this routine; confirm before relying on that path.
*/
int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j,rstart, rend, fd;
  Scalar       *vals,*svals;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)viewer)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  /* only processor 0 touches the file; it validates the cookie */
  if (!rank) {
    ierr = ViewerFileGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:not matrix object");
  }

  /* broadcast global sizes (header[3], the total nonzero count, is unused) */
  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
  /* determine ownership of all rows */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  /* rowners[i] = first global row of processor i (prefix sums of the m's) */
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors; ourlens and offlens share
     one allocation of 2*(rend-rstart) ints */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    /* rowners doubles as the displacement array since displacements equal
       the ownership prefix sums */
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    /* non-root ranks: send buffer arguments are ignored by MPI_Scatterv */
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);

    /* read in every one elses and ship off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices*/
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    /* sanity check: the message length must match the row-length totals */
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix.  NOTE(review): the subtract/re-add of offlens below
     leaves ourlens unchanged for the dense case -- presumably a remnant of a
     sparse (MPIRow) preallocation path; harmless here. */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  if (type == MATMPIDENSE) {
    ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
  }
  A = *newmat;
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);

    /* insert into matrix: row i of this processor is global row rstart+i;
       smycols/svals walk the packed column/value arrays row by row */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors and ship out */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values*/
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");

    /* insert into matrix (same packed walk as on processor 0) */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  /* flush any stashed off-processor entries and finalize the matrix */
  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}
1115