1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.27 1996/01/26 04:33:42 bsmith Exp curfman $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "mpidense.h"
10 #include "vec/vecimpl.h"
11 
12 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
13                                int *idxn,Scalar *v,InsertMode addv)
14 {
15   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
16   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
17   int          roworiented = A->roworiented;
18 
19   if (A->insertmode != NOT_SET_VALUES && A->insertmode != addv) {
20     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
21   }
22   A->insertmode = addv;
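  /* when row oriented, the values for row i are v[i*n], ..., v[i*n+n-1];
     otherwise v is column major and entry (i,j) is at v[i+j*m] */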
23   for ( i=0; i<m; i++ ) {
24     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
25     if (idxm[i] >= A->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
26     if (idxm[i] >= rstart && idxm[i] < rend) {
27       row = idxm[i] - rstart;
28       if (roworiented) {
29         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
30       }
31       else {
32         for ( j=0; j<n; j++ ) {
33           if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
34           if (idxn[j] >= A->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
35           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
36         }
37       }
38     }
39     else {
40       if (roworiented) {
41         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
42       }
43       else { /* must stash each separately */
44         row = idxm[i];
45         for ( j=0; j<n; j++ ) {
46           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);
47                  CHKERRQ(ierr);
48         }
49       }
50     }
51   }
52   return 0;
53 }
54 
55 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
56 {
57   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
58   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
59 
60   for ( i=0; i<m; i++ ) {
61     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
62     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
63     if (idxm[i] >= rstart && idxm[i] < rend) {
64       row = idxm[i] - rstart;
65       for ( j=0; j<n; j++ ) {
66         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
67         if (idxn[j] >= mdn->N)
68           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
69         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
70       }
71     }
72     else {
73       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
74     }
75   }
76   return 0;
77 }
78 
79 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
80 {
81   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
82   MPI_Comm     comm = mat->comm;
83   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
84   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
85   int          tag = mat->tag, *owner,*starts,count,ierr;
86   InsertMode   addv;
87   MPI_Request  *send_waits,*recv_waits;
88   Scalar       *rvalues,*svalues;
89 
90   /* make sure all processors are using the same InsertMode (INSERT_VALUES or ADD_VALUES) */
91   MPI_Allreduce((void *) &mdn->insertmode,(void *) &addv,1,MPI_INT,
92                 MPI_BOR,comm);
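  /* the OR over all processors equals ADD_VALUES|INSERT_VALUES precisely when
     some processors inserted values while others added them */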
93   if (addv == (ADD_VALUES|INSERT_VALUES)) {
94     SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
95   }
96   mdn->insertmode = addv; /* in case this processor had no cache */
97 
98   /*  first count number of contributors to each processor */
99   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
100   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
101   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
102   for ( i=0; i<mdn->stash.n; i++ ) {
103     idx = mdn->stash.idx[i];
104     for ( j=0; j<size; j++ ) {
105       if (idx >= owners[j] && idx < owners[j+1]) {
106         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
107       }
108     }
109   }
110   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
111 
112   /* inform other processors of number of messages and max length*/
113   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
114   MPI_Allreduce((void *) procs,(void *) work,size,MPI_INT,MPI_SUM,comm);
115   nreceives = work[rank];
116   MPI_Allreduce((void *) nprocs,(void *) work,size,MPI_INT,MPI_MAX,comm);
117   nmax = work[rank];
118   PetscFree(work);
119 
120   /* post receives:
121        1) each message will consist of ordered triples
122      (global row index, global column index, value); the indices are stored
123      as Scalars to simplify the message passing.
124        2) since we don't know how long each individual message is we
125      allocate the largest needed buffer for each receive. Potentially
126      this is a lot of wasted space.
127 
128        This could be done better.
129   */
130   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
131   CHKPTRQ(rvalues);
132   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
133   CHKPTRQ(recv_waits);
134   for ( i=0; i<nreceives; i++ ) {
135     MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
136               comm,recv_waits+i);
137   }
138 
139   /* do sends:
140       1) starts[i] gives the starting index in svalues for stuff going to
141          the ith processor
142   */
143   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
144   CHKPTRQ(svalues);
145   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
146   CHKPTRQ(send_waits);
147   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
148   starts[0] = 0;
149   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
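  /* pack each stashed entry as a (row, column, value) triple; starts[] is
     advanced while packing, so it is rebuilt below before the sends */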
150   for ( i=0; i<mdn->stash.n; i++ ) {
151     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
152     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
153     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
154   }
155   PetscFree(owner);
156   starts[0] = 0;
157   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
158   count = 0;
159   for ( i=0; i<size; i++ ) {
160     if (procs[i]) {
161       MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
162                 comm,send_waits+count++);
163     }
164   }
165   PetscFree(starts); PetscFree(nprocs);
166 
167   /* Free cache space */
168   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
169 
170   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
171   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
172   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
173   mdn->rmax       = nmax;
174 
175   return 0;
176 }
177 extern int MatSetUpMultiply_MPIDense(Mat);
178 
179 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
180 {
181   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
182   MPI_Status   *send_status,recv_status;
183   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
184   Scalar       *values,val;
185   InsertMode   addv = mdn->insertmode;
186 
187   /*  wait on receives */
188   while (count) {
189     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
190     /* unpack receives into our local space */
191     values = mdn->rvalues + 3*imdex*mdn->rmax;
192     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
193     n = n/3;
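    /* each received triple is (global row, global column, value); the row is
       converted to a local index before insertion into the local block */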
194     for ( i=0; i<n; i++ ) {
195       row = (int) PetscReal(values[3*i]) - mdn->rstart;
196       col = (int) PetscReal(values[3*i+1]);
197       val = values[3*i+2];
198       if (col >= 0 && col < mdn->N) {
199         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
200       }
201       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
202     }
203     count--;
204   }
205   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
206 
207   /* wait on sends */
208   if (mdn->nsends) {
209     send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
210     CHKPTRQ(send_status);
211     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
212     PetscFree(send_status);
213   }
214   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
215 
216   mdn->insertmode = NOT_SET_VALUES;
217   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
218   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
219 
220   if (!mat->was_assembled && mode == FINAL_ASSEMBLY) {
221     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
222   }
223   return 0;
224 }
225 
226 static int MatZeroEntries_MPIDense(Mat A)
227 {
228   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
229   return MatZeroEntries(l->A);
230 }
231 
232 /* the code does not handle the diagonal entries correctly unless the
233    matrix is square and the column and row ownerships are identical.
234    This is a BUG. The only way to fix it seems to be to access
235    the local matrix mdn->A directly and not through the MatZeroRows()
236    routine.
237 */
238 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
239 {
240   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
241   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
242   int            *procs,*nprocs,j,found,idx,nsends,*work;
243   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
244   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
245   int            *lens,imdex,*lrows,*values;
246   MPI_Comm       comm = A->comm;
247   MPI_Request    *send_waits,*recv_waits;
248   MPI_Status     recv_status,*send_status;
249   IS             istmp;
250 
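  /* send each global row index in is to the processor that owns that row;
     every processor then zeros the local rows it receives */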
251   ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
252   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
253 
254   /*  first count number of contributors to each processor */
255   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
256   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
257   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
258   for ( i=0; i<N; i++ ) {
259     idx = rows[i];
260     found = 0;
261     for ( j=0; j<size; j++ ) {
262       if (idx >= owners[j] && idx < owners[j+1]) {
263         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
264       }
265     }
266     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
267   }
268   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
269 
270   /* inform other processors of number of messages and max length*/
271   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
272   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
273   nrecvs = work[rank];
274   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
275   nmax = work[rank];
276   PetscFree(work);
277 
278   /* post receives:   */
279   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
280   CHKPTRQ(rvalues);
281   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
282   CHKPTRQ(recv_waits);
283   for ( i=0; i<nrecvs; i++ ) {
284     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
285   }
286 
287   /* do sends:
288       1) starts[i] gives the starting index in svalues for stuff going to
289          the ith processor
290   */
291   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
292   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
293   CHKPTRQ(send_waits);
294   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
295   starts[0] = 0;
296   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
297   for ( i=0; i<N; i++ ) {
298     svalues[starts[owner[i]]++] = rows[i];
299   }
300   ISRestoreIndices(is,&rows);
301 
302   starts[0] = 0;
303   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
304   count = 0;
305   for ( i=0; i<size; i++ ) {
306     if (procs[i]) {
307       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
308     }
309   }
310   PetscFree(starts);
311 
312   base = owners[rank];
313 
314   /*  wait on receives */
315   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
316   source = lens + nrecvs;
317   count  = nrecvs; slen = 0;
318   while (count) {
319     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
320     /* unpack receives into our local space */
321     MPI_Get_count(&recv_status,MPI_INT,&n);
322     source[imdex]  = recv_status.MPI_SOURCE;
323     lens[imdex]  = n;
324     slen += n;
325     count--;
326   }
327   PetscFree(recv_waits);
328 
329   /* move the data into the send scatter */
330   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
331   count = 0;
332   for ( i=0; i<nrecvs; i++ ) {
333     values = rvalues + i*nmax;
334     for ( j=0; j<lens[i]; j++ ) {
335       lrows[count++] = values[j] - base;
336     }
337   }
338   PetscFree(rvalues); PetscFree(lens);
339   PetscFree(owner); PetscFree(nprocs);
340 
341   /* actually zap the local rows */
342   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
343   PLogObjectParent(A,istmp);
344   PetscFree(lrows);
345   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
346   ierr = ISDestroy(istmp); CHKERRQ(ierr);
347 
348   /* wait on sends */
349   if (nsends) {
350     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
351     CHKPTRQ(send_status);
352     MPI_Waitall(nsends,send_waits,send_status);
353     PetscFree(send_status);
354   }
355   PetscFree(send_waits); PetscFree(svalues);
356 
357   return 0;
358 }
359 
360 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
361 {
362   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
363   int          ierr;
364 
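  /* gather all of xx into the local work vector lvec (the local block stores
     all N columns), then apply the sequential dense matrix-vector product */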
365   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
366   CHKERRQ(ierr);
367   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
368   CHKERRQ(ierr);
369   ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
370   return 0;
371 }
372 
373 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
374 {
375   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
376   int          ierr;
377 
378   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
379   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);CHKERRQ(ierr);
380   ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
381   return 0;
382 }
383 
384 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
385 {
386   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
387   int          ierr;
388   Scalar       zero = 0.0;
389 
390   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
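  /* zero yy, form the local transpose product in lvec, then accumulate each
     processor's contribution into yy with a reverse scatter and ADD_VALUES */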
391   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
392   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
393          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
394   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
395          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
396   return 0;
397 }
398 
399 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
400 {
401   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
402   int          ierr;
403 
404   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
405   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
406   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
407          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
408   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
409          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
410   return 0;
411 }
412 
413 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
414 {
415   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
416   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
417   int          ierr, i, n, m = a->m, radd;
418   Scalar       *x;
419 
420   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
421   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
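  /* the local block is stored column major with leading dimension m, so the
     diagonal entry for local row i is aloc->v[rstart*m + i*m + i] */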
422   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
423   radd = a->rstart*m;
424   for ( i=0; i<m; i++ ) {
425     x[i] = aloc->v[radd + i*m + i];
426   }
427   return 0;
428 }
429 
430 static int MatDestroy_MPIDense(PetscObject obj)
431 {
432   Mat          mat = (Mat) obj;
433   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
434   int          ierr;
435 
436 #if defined(PETSC_LOG)
437   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
438 #endif
439   PetscFree(mdn->rowners);
440   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
441   if (mdn->lvec)   VecDestroy(mdn->lvec);
442   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
443   if (mdn->factor) {
444     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
445     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
446     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
447     PetscFree(mdn->factor);
448   }
449   PetscFree(mdn);
450   PLogObjectDestroy(mat);
451   PetscHeaderDestroy(mat);
452   return 0;
453 }
454 
455 #include "pinclude/pviewer.h"
456 
457 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
458 {
459   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
460   int          ierr;
461   if (mdn->size == 1) {
462     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
463   }
464   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
465   return 0;
466 }
467 
468 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
469 {
470   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
471   int          ierr, format;
472   PetscObject  vobj = (PetscObject) viewer;
473   FILE         *fd;
474 
475   ierr = ViewerFileGetPointer(viewer,&fd); CHKERRQ(ierr);
476   if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
477     ierr = ViewerFileGetFormat_Private(viewer,&format);
478     if (format == FILE_FORMAT_INFO_DETAILED) {
479       int nz, nzalloc, mem, rank;
480       MPI_Comm_rank(mat->comm,&rank);
481       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem);
482       MPIU_Seq_begin(mat->comm,1);
483         fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
484             rank,mdn->m,nz,nzalloc,mem);
485       fflush(fd);
486       MPIU_Seq_end(mat->comm,1);
487       ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
488       return 0;
489     }
490     else if (format == FILE_FORMAT_INFO) {
491       return 0;
492     }
493   }
494   if (vobj->type == ASCII_FILE_VIEWER) {
495     MPIU_Seq_begin(mat->comm,1);
496     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
497              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
498     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
499     fflush(fd);
500     MPIU_Seq_end(mat->comm,1);
501   }
502   else {
503     int size = mdn->size, rank = mdn->rank;
504     if (size == 1) {
505       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
506     }
507     else {
508       /* assemble the entire matrix onto first processor. */
509       Mat          A;
510       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
511       Scalar       *vals;
512       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
513 
514       if (!rank) {
515         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
516       }
517       else {
518         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
519       }
520       PLogObjectParent(mat,A);
521 
522       /* Copy the matrix ... This isn't the most efficient means,
523          but it's quick for now */
524       row = mdn->rstart; m = Amdn->m;
525       for ( i=0; i<m; i++ ) {
526         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
527         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
528         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
529         row++;
530       }
531 
532       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
533       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
534       if (!rank) {
535         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
536       }
537       ierr = MatDestroy(A); CHKERRQ(ierr);
538     }
539   }
540   return 0;
541 }
542 
543 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
544 {
545   Mat          mat = (Mat) obj;
546   PetscObject  vobj = (PetscObject) viewer;
547   int          ierr;
548 
549   if (!viewer) {
550     viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
551   }
552   if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILE_VIEWER) {
553     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
554   }
555   else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILES_VIEWER) {
556     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
557   }
558   else if (vobj->type == BINARY_FILE_VIEWER) {
559     return MatView_MPIDense_Binary(mat,viewer);
560   }
561   return 0;
562 }
563 
564 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
565                              int *nzalloc,int *mem)
566 {
567   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
568   Mat          mdn = mat->A;
569   int          ierr, isend[3], irecv[3];
570 
571   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
572   if (flag == MAT_LOCAL) {
573     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
574   } else if (flag == MAT_GLOBAL_MAX) {
575     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
576     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
577   } else if (flag == MAT_GLOBAL_SUM) {
578     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
579     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
580   }
581   return 0;
582 }
583 
584 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
585    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
586    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
587    extern int MatSolve_MPIDense(Mat,Vec,Vec);
588    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
589    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
590    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
591 
592 static int MatSetOption_MPIDense(Mat A,MatOption op)
593 {
594   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
595 
596   if (op == NO_NEW_NONZERO_LOCATIONS ||
597       op == YES_NEW_NONZERO_LOCATIONS ||
598       op == COLUMNS_SORTED ||
599       op == ROW_ORIENTED) {
600         MatSetOption(a->A,op);
601   }
602   else if (op == ROWS_SORTED ||
603            op == SYMMETRIC_MATRIX ||
604            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
605            op == YES_NEW_DIAGONALS)
606     PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
607   else if (op == COLUMN_ORIENTED)
608     { a->roworiented = 0; MatSetOption(a->A,op);}
609   else if (op == NO_NEW_DIAGONALS)
610     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
611   else
612     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
613   return 0;
614 }
615 
616 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
617 {
618   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
619   *m = mat->M; *n = mat->N;
620   return 0;
621 }
622 
623 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
624 {
625   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
626   *m = mat->m; *n = mat->N;
627   return 0;
628 }
629 
630 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
631 {
632   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
633   *m = mat->rstart; *n = mat->rend;
634   return 0;
635 }
636 
637 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
638 {
639   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
640   int          lrow, rstart = mat->rstart, rend = mat->rend;
641 
642   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
643   lrow = row - rstart;
644   return MatGetRow(mat->A,lrow,nz,idx,v);
645 }
646 
647 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
648 {
649   if (idx) PetscFree(*idx);
650   if (v) PetscFree(*v);
651   return 0;
652 }
653 
654 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
655 {
656   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
657   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
658   int          ierr, i, j;
659   double       sum = 0.0;
660   Scalar       *v = mat->v;
661 
662   if (mdn->size == 1) {
663     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
664   } else {
665     if (type == NORM_FROBENIUS) {
666       for (i=0; i<mat->n*mat->m; i++ ) {
667 #if defined(PETSC_COMPLEX)
668         sum += real(conj(*v)*(*v)); v++;
669 #else
670         sum += (*v)*(*v); v++;
671 #endif
672       }
673       MPI_Allreduce((void*)&sum,(void*)norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
674       *norm = sqrt(*norm);
675       PLogFlops(2*mat->n*mat->m);
676     }
677     else if (type == NORM_1) {
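      /* 1-norm: sum absolute values down each local column, add the column
         sums across processors, then take the maximum over all columns */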
678       double *tmp, *tmp2;
679       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
680       tmp2 = tmp + mdn->N;
681       PetscMemzero(tmp,2*mdn->N*sizeof(double));
682       *norm = 0.0;
683       v = mat->v;
684       for ( j=0; j<mat->n; j++ ) {
685         for ( i=0; i<mat->m; i++ ) {
686           tmp[j] += PetscAbsScalar(*v);  v++;
687         }
688       }
689       MPI_Allreduce((void*)tmp,(void*)tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
690       for ( j=0; j<mdn->N; j++ ) {
691         if (tmp2[j] > *norm) *norm = tmp2[j];
692       }
693       PetscFree(tmp);
694       PLogFlops(mat->n*mat->m);
695     }
696     else if (type == NORM_INFINITY) { /* max row norm */
697       double ntemp;
698       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
699       MPI_Allreduce((void*)&ntemp,(void*)norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
700     }
701     else {
702       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
703     }
704   }
705   return 0;
706 }
707 
708 static int MatTranspose_MPIDense(Mat A,Mat *matout)
709 {
710   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
711   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
712   Mat          B;
713   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
714   int          j, i, ierr;
715   Scalar       *v;
716 
717   if (matout == PETSC_NULL && M != N)
718     SETERRQ(1,"MatTranspose_MPIDense:Supports square matrix only in-place");
719   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
720          CHKERRQ(ierr);
721 
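  /* local column j of A becomes row j of B; rwork holds the global row
     numbers of the local rows, which become the column indices in B */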
722   m = Aloc->m; n = Aloc->n; v = Aloc->v;
723   rwork = (int *) PetscMalloc(m*sizeof(int)); CHKPTRQ(rwork);
724   for ( j=0; j<n; j++ ) {
725     for (i=0; i<m; i++) rwork[i] = rstart + i;
726     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
727     v += m;
728   }
729   PetscFree(rwork);
730   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
731   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
732   if (matout != PETSC_NULL) {
733     *matout = B;
734   } else {
735     /* This isn't really an in-place transpose, but free data struct from a */
736     PetscFree(a->rowners);
737     ierr = MatDestroy(a->A); CHKERRQ(ierr);
738     if (a->lvec) VecDestroy(a->lvec);
739     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
740     PetscFree(a);
741     PetscMemcpy(A,B,sizeof(struct _Mat));
742     PetscHeaderDestroy(B);
743   }
744   return 0;
745 }
746 
747 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
748 
749 /* -------------------------------------------------------------------*/
750 static struct _MatOps MatOps = {MatSetValues_MPIDense,
751        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
752        MatMult_MPIDense,MatMultAdd_MPIDense,
753        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
754 /*       MatSolve_MPIDense,0, */
755        0,0,
756        0,0,
757        0,0,
758 /*       MatLUFactor_MPIDense,0, */
759        0,MatTranspose_MPIDense,
760        MatGetInfo_MPIDense,0,
761        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
762        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
763        0,
764        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
765        0,0,0,
766 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
767        0,0,
768        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
769        MatGetOwnershipRange_MPIDense,
770        0,0,
771        0,0,0,0,0,MatConvertSameType_MPIDense,
772        0,0,0,0,0,
773        0,0,MatGetValues_MPIDense};
774 
775 /*@C
776    MatCreateMPIDense - Creates a parallel matrix in dense format.
777 
778    Input Parameters:
779 .  comm - MPI communicator
780 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
781 .  n - number of local columns (or PETSC_DECIDE to have it calculated
782            if N is given)
783 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
784 .  N - number of global columns (or PETSC_DECIDE to have it calculated
785            if n is given)
786 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
787    to control all matrix memory allocation.
788 
789    Output Parameter:
790 .  newmat - the matrix
791 
792    Notes:
793    The dense format is fully compatible with standard Fortran 77
794    storage by columns.
795 
796    The data input variable is intended primarily for Fortran programmers
797    who wish to allocate their own matrix memory space.  Most users should
798    set data=PETSC_NULL.
799 
800    The user MUST specify either the local or global matrix dimensions
801    (possibly both).
802 
803    Currently, the only parallel dense matrix decomposition is by rows,
804    so that n=N and each submatrix owns all of the global columns.
805 
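   Example usage (a minimal sketch; the communicator comm and the usual ierr
   error checking are assumed to come from the calling code):

      Mat A;
      ierr = MatCreateMPIDense(comm,PETSC_DECIDE,PETSC_DECIDE,10,10,PETSC_NULL,&A);
      CHKERRQ(ierr);
      /* fill the matrix with MatSetValues(), then assemble it */
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatDestroy(A); CHKERRQ(ierr);
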
806 .keywords: matrix, dense, parallel
807 
808 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
809 @*/
810 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
811 {
812   Mat          mat;
813   Mat_MPIDense *a;
814   int          ierr, i,flg;
815 
816 /* Note:  For now, when data is specified above, this assumes the user correctly
817    allocates the local dense storage space.  We should add error checking. */
818 
819   *newmat         = 0;
820   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
821   PLogObjectCreate(mat);
822   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
823   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
824   mat->destroy    = MatDestroy_MPIDense;
825   mat->view       = MatView_MPIDense;
826   mat->factor     = 0;
827 
828   a->factor = 0;
829   a->insertmode = NOT_SET_VALUES;
830   MPI_Comm_rank(comm,&a->rank);
831   MPI_Comm_size(comm,&a->size);
832 
833   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
834   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
835 
836   /* each process stores all N columns of its local rows */
837   if (N == PETSC_DECIDE) N = n;
838   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
839   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
840   a->N = N;
841   a->M = M;
842   a->m = m;
843   a->n = n;
844 
845   /* build local table of row and column ownerships */
846   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
847   a->cowners = a->rowners + a->size + 1;
848   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
849                        sizeof(Mat_MPIDense));
850   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
851   a->rowners[0] = 0;
852   for ( i=2; i<=a->size; i++ ) {
853     a->rowners[i] += a->rowners[i-1];
854   }
855   a->rstart = a->rowners[a->rank];
856   a->rend   = a->rowners[a->rank+1];
857   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
858   a->cowners[0] = 0;
859   for ( i=2; i<=a->size; i++ ) {
860     a->cowners[i] += a->cowners[i-1];
861   }
862 
863   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
864   PLogObjectParent(mat,a->A);
865 
866   /* build cache for off array entries formed */
867   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
868 
869   /* stuff used for matrix vector multiply */
870   a->lvec        = 0;
871   a->Mvctx       = 0;
872   a->roworiented = 1;
873 
874   *newmat = mat;
875   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
876   if (flg) {
877     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
878   }
879   return 0;
880 }
881 
882 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
883 {
884   Mat          mat;
885   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
886   int          ierr;
887   FactorCtx    *factor;
888 
889   *newmat       = 0;
890   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
891   PLogObjectCreate(mat);
892   mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
893   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
894   mat->destroy  = MatDestroy_MPIDense;
895   mat->view     = MatView_MPIDense;
896   mat->factor   = A->factor;
897   mat->assembled  = PETSC_TRUE;
898 
899   a->m          = oldmat->m;
900   a->n          = oldmat->n;
901   a->M          = oldmat->M;
902   a->N          = oldmat->N;
903   if (oldmat->factor) {
904     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
905     /* copy factor contents ... add this code! */
906   } else a->factor = 0;
907 
908   a->rstart     = oldmat->rstart;
909   a->rend       = oldmat->rend;
910   a->size       = oldmat->size;
911   a->rank       = oldmat->rank;
912   a->insertmode = NOT_SET_VALUES;
913 
914   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
915   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
916   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
917   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
918 
919   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
920   PLogObjectParent(mat,a->lvec);
921   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
922   PLogObjectParent(mat,a->Mvctx);
923   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
924   PLogObjectParent(mat,a->A);
925   *newmat = mat;
926   return 0;
927 }
928 
929 #include "sysio.h"
930 
931 int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
932 {
933   Mat          A;
934   int          i, nz, ierr, j,rstart, rend, fd;
935   Scalar       *vals,*svals;
936   PetscObject  vobj = (PetscObject) bview;
937   MPI_Comm     comm = vobj->comm;
938   MPI_Status   status;
939   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
940   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
941   int          tag = ((PetscObject)bview)->tag;
942 
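  /* process 0 reads the binary file (a header containing MAT_COOKIE and the
     global dimensions, followed by the row lengths, column indices, and
     values) and ships each of the other processors the rows it owns */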
943   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
944   if (!rank) {
945     ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
946     ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
947     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:not matrix object");
948   }
949 
950   MPI_Bcast(header+1,3,MPI_INT,0,comm);
951   M = header[1]; N = header[2];
952   /* determine ownership of all rows */
953   m = M/size + ((M % size) > rank);
954   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
955   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
956   rowners[0] = 0;
957   for ( i=2; i<=size; i++ ) {
958     rowners[i] += rowners[i-1];
959   }
960   rstart = rowners[rank];
961   rend   = rowners[rank+1];
962 
963   /* distribute row lengths to all processors */
964   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
965   offlens = ourlens + (rend-rstart);
966   if (!rank) {
967     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
968     ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
969     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
970     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
971     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
972     PetscFree(sndcounts);
973   }
974   else {
975     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
976   }
977 
978   if (!rank) {
979     /* calculate the number of nonzeros on each processor */
980     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
981     PetscMemzero(procsnz,size*sizeof(int));
982     for ( i=0; i<size; i++ ) {
983       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
984         procsnz[i] += rowlengths[j];
985       }
986     }
987     PetscFree(rowlengths);
988 
989     /* determine max buffer needed and allocate it */
990     maxnz = 0;
991     for ( i=0; i<size; i++ ) {
992       maxnz = PetscMax(maxnz,procsnz[i]);
993     }
994     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
995 
996     /* read in my part of the matrix column indices  */
997     nz = procsnz[0];
998     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
999     ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);
1000 
1001     /* read in everyone else's part and ship it off */
1002     for ( i=1; i<size; i++ ) {
1003       nz = procsnz[i];
1004       ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
1005       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1006     }
1007     PetscFree(cols);
1008   }
1009   else {
1010     /* determine buffer space needed for message */
1011     nz = 0;
1012     for ( i=0; i<m; i++ ) {
1013       nz += ourlens[i];
1014     }
1015     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1016 
1017     /* receive message of column indices*/
1018     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1019     MPI_Get_count(&status,MPI_INT,&maxnz);
1020     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1021   }
1022 
1023   /* loop over local rows, determining number of off diagonal entries */
1024   PetscMemzero(offlens,m*sizeof(int));
1025   jj = 0;
1026   for ( i=0; i<m; i++ ) {
1027     for ( j=0; j<ourlens[i]; j++ ) {
1028       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1029       jj++;
1030     }
1031   }
1032 
1033   /* create our matrix */
1034   for ( i=0; i<m; i++ ) {
1035     ourlens[i] -= offlens[i];
1036   }
1037   if (type == MATMPIDENSE) {
1038     ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1039   }
1040   A = *newmat;
1041   for ( i=0; i<m; i++ ) {
1042     ourlens[i] += offlens[i];
1043   }
1044 
1045   if (!rank) {
1046     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1047 
1048     /* read in my part of the matrix numerical values  */
1049     nz = procsnz[0];
1050     ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1051 
1052     /* insert into matrix */
1053     jj      = rstart;
1054     smycols = mycols;
1055     svals   = vals;
1056     for ( i=0; i<m; i++ ) {
1057       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1058       smycols += ourlens[i];
1059       svals   += ourlens[i];
1060       jj++;
1061     }
1062 
1063     /* read in other processors and ship out */
1064     for ( i=1; i<size; i++ ) {
1065       nz = procsnz[i];
1066       ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1067       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1068     }
1069     PetscFree(procsnz);
1070   }
1071   else {
1072     /* receive numeric values */
1073     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1074 
1075     /* receive message of values*/
1076     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1077     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1078     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDenseorMPIRow:something is wrong with file");
1079 
1080     /* insert into matrix */
1081     jj      = rstart;
1082     smycols = mycols;
1083     svals   = vals;
1084     for ( i=0; i<m; i++ ) {
1085       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1086       smycols += ourlens[i];
1087       svals   += ourlens[i];
1088       jj++;
1089     }
1090   }
1091   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1092 
1093   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1094   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1095   return 0;
1096 }
1097