xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 2f8a729c95982efb3e50c82c45a5cc0e4d951579)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.13 1995/11/25 23:49:04 curfman Exp curfman $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "mpidense.h"
10 #include "vec/vecimpl.h"
11 #include "inline/spops.h"
12 
13 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
14                                int *idxn,Scalar *v,InsertMode addv)
15 {
16   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
17   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
18 
19   if (mdn->insertmode != NOT_SET_VALUES && mdn->insertmode != addv) {
20     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
21   }
22   mdn->insertmode = addv;
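  /* v is logically m by n and row-oriented: the value destined for
     global row idxm[i], column idxn[j] is v[i*n+j] */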
23   for ( i=0; i<m; i++ ) {
24     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
25     if (idxm[i] >= mdn->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
26     if (idxm[i] >= rstart && idxm[i] < rend) {
27       row = idxm[i] - rstart;
28       for ( j=0; j<n; j++ ) {
29         if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
30         if (idxn[j] >= mdn->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
31         ierr = MatSetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j,addv); CHKERRQ(ierr);
32       }
33     }
34     else {
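      /* this row is owned by another processor; cache the values in the
         stash so they can be shipped to the owner at assembly time */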
35       ierr = StashValues_Private(&mdn->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
36     }
37   }
38   return 0;
39 }
40 
41 static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
42 {
43   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
44   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
45 
46   if (!mdn->assembled) SETERRQ(1,"MatGetValues_MPIDense:Not for unassembled matrix");
47   for ( i=0; i<m; i++ ) {
48     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
49     if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
50     if (idxm[i] >= rstart && idxm[i] < rend) {
51       row = idxm[i] - rstart;
52       for ( j=0; j<n; j++ ) {
53         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
54         if (idxn[j] >= mdn->N)
55           SETERRQ(1,"MatGetValues_MPIDense:Column too large");
56         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
57       }
58     }
59     else {
60       SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
61     }
62   }
63   return 0;
64 }
65 
66 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
67 {
68   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
69   MPI_Comm     comm = mat->comm;
70   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
71   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
72   int          tag = mat->tag, *owner,*starts,count,ierr;
73   InsertMode   addv;
74   MPI_Request  *send_waits,*recv_waits;
75   Scalar       *rvalues,*svalues;
76 
77   /* make sure all processors are in the same mode, INSERT_VALUES or ADD_VALUES */
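  /* the two modes have distinct bit patterns, so the bitwise OR over all
     processors equals (ADD_VALUES|INSERT_VALUES) exactly when some
     processors inserted while others added */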
78   MPI_Allreduce((void *) &mdn->insertmode,(void *) &addv,1,MPI_INT,
79                 MPI_BOR,comm);
80   if (addv == (ADD_VALUES|INSERT_VALUES)) {
81     SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
82   }
83   mdn->insertmode = addv; /* in case this processor had no cache */
84 
85   /*  first count number of contributors to each processor */
86   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
87   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
88   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
89   for ( i=0; i<mdn->stash.n; i++ ) {
90     idx = mdn->stash.idx[i];
91     for ( j=0; j<size; j++ ) {
92       if (idx >= owners[j] && idx < owners[j+1]) {
93         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
94       }
95     }
96   }
97   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
98 
99   /* inform other processors of number of messages and max length*/
100   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
101   MPI_Allreduce((void *) procs,(void *) work,size,MPI_INT,MPI_SUM,comm);
102   nreceives = work[rank];
103   MPI_Allreduce((void *) nprocs,(void *) work,size,MPI_INT,MPI_MAX,comm);
104   nmax = work[rank];
105   PetscFree(work);
106 
107   /* post receives:
108        1) each message will consist of ordered triples
109      (global row,global column,value); both global indices are stored
110      as Scalars (doubles) to simplify the message passing.
111        2) since we don't know how long each individual message is we
112      allocate the largest needed buffer for each receive. Potentially
113      this is a lot of wasted space.
114 
115        This could be done better.
116   */
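  /* each receive buffer holds up to nmax (row,col,value) triples, i.e.
     3*nmax Scalars; the actual number received is recovered in
     MatAssemblyEnd_MPIDense as MPI_Get_count()/3 */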
117   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
118   CHKPTRQ(rvalues);
119   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
120   CHKPTRQ(recv_waits);
121   for ( i=0; i<nreceives; i++ ) {
122     MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
123               comm,recv_waits+i);
124   }
125 
126   /* do sends:
127       1) starts[i] gives the starting index in svalues for stuff going to
128          the ith processor
129   */
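  /* starts[] is advanced as a running cursor while packing below, so it
     is rebuilt before the sends are actually posted */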
130   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
131   CHKPTRQ(svalues);
132   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
133   CHKPTRQ(send_waits);
134   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
135   starts[0] = 0;
136   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
137   for ( i=0; i<mdn->stash.n; i++ ) {
138     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
139     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
140     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
141   }
142   PetscFree(owner);
143   starts[0] = 0;
144   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
145   count = 0;
146   for ( i=0; i<size; i++ ) {
147     if (procs[i]) {
148       MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
149                 comm,send_waits+count++);
150     }
151   }
152   PetscFree(starts); PetscFree(nprocs);
153 
154   /* Free cache space */
155   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
156 
157   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
158   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
159   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
160   mdn->rmax       = nmax;
161 
162   return 0;
163 }
164 extern int MatSetUpMultiply_MPIDense(Mat);
165 
166 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
167 {
168   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
169   MPI_Status   *send_status,recv_status;
170   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
171   Scalar       *values,val;
172   InsertMode   addv = mdn->insertmode;
173 
174   /*  wait on receives */
175   while (count) {
176     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
177     /* unpack receives into our local space */
178     values = mdn->rvalues + 3*imdex*mdn->rmax;
179     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
180     n = n/3;
181     for ( i=0; i<n; i++ ) {
182       row = (int) PETSCREAL(values[3*i]) - mdn->rstart;
183       col = (int) PETSCREAL(values[3*i+1]);
184       val = values[3*i+2];
185       if (col >= 0 && col < mdn->N) {
186         ierr = MatSetValues(mdn->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
187       }
188       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
189     }
190     count--;
191   }
192   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
193 
194   /* wait on sends */
195   if (mdn->nsends) {
196     send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
197     CHKPTRQ(send_status);
198     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
199     PetscFree(send_status);
200   }
201   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
202 
203   mdn->insertmode = NOT_SET_VALUES;
204   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
205   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
206 
207   if (!mdn->assembled && mode == FINAL_ASSEMBLY) {
208     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
209   }
210   mdn->assembled = 1;
211   return 0;
212 }
213 
214 static int MatZeroEntries_MPIDense(Mat A)
215 {
216   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
217   return MatZeroEntries(l->A);
218 }
219 
220 /* The code does not handle the diagonal entries correctly unless the
221    matrix is square and the column and row ownerships are identical.
222    This is a BUG. The only way to fix it seems to be to access
223    the local matrix mdn->A directly rather than going through the
224    MatZeroRows() routine.
225 */
226 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
227 {
228   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
229   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
230   int            *procs,*nprocs,j,found,idx,nsends,*work;
231   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
232   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
233   int            *lens,imdex,*lrows,*values;
234   MPI_Comm       comm = A->comm;
235   MPI_Request    *send_waits,*recv_waits;
236   MPI_Status     recv_status,*send_status;
237   IS             istmp;
238 
239   if (!l->assembled) SETERRQ(1,"MatZeroRows_MPIDense:Must assemble matrix");
240   ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
241   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
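  /* each processor ships the global numbers of the rows it wants zeroed
     to their owners; the owners collect the requests they receive into
     lrows and zero those rows of their local matrices */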
242 
243   /*  first count number of contributors to each processor */
244   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
245   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
246   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
247   for ( i=0; i<N; i++ ) {
248     idx = rows[i];
249     found = 0;
250     for ( j=0; j<size; j++ ) {
251       if (idx >= owners[j] && idx < owners[j+1]) {
252         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
253       }
254     }
255     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
256   }
257   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
258 
259   /* inform other processors of number of messages and max length*/
260   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
261   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
262   nrecvs = work[rank];
263   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
264   nmax = work[rank];
265   PetscFree(work);
266 
267   /* post receives:   */
268   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
269   CHKPTRQ(rvalues);
270   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
271   CHKPTRQ(recv_waits);
272   for ( i=0; i<nrecvs; i++ ) {
273     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
274   }
275 
276   /* do sends:
277       1) starts[i] gives the starting index in svalues for stuff going to
278          the ith processor
279   */
280   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
281   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
282   CHKPTRQ(send_waits);
283   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
284   starts[0] = 0;
285   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
286   for ( i=0; i<N; i++ ) {
287     svalues[starts[owner[i]]++] = rows[i];
288   }
289   ierr = ISRestoreIndices(is,&rows); CHKERRQ(ierr);
290 
291   starts[0] = 0;
292   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
293   count = 0;
294   for ( i=0; i<size; i++ ) {
295     if (procs[i]) {
296       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
297     }
298   }
299   PetscFree(starts);
300 
301   base = owners[rank];
302 
303   /*  wait on receives */
304   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
305   source = lens + nrecvs;
306   count  = nrecvs; slen = 0;
307   while (count) {
308     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
309     /* unpack receives into our local space */
310     MPI_Get_count(&recv_status,MPI_INT,&n);
311     source[imdex]  = recv_status.MPI_SOURCE;
312     lens[imdex]  = n;
313     slen += n;
314     count--;
315   }
316   PetscFree(recv_waits);
317 
318   /* move the data into the send scatter */
319   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
320   count = 0;
321   for ( i=0; i<nrecvs; i++ ) {
322     values = rvalues + i*nmax;
323     for ( j=0; j<lens[i]; j++ ) {
324       lrows[count++] = values[j] - base;
325     }
326   }
327   PetscFree(rvalues); PetscFree(lens);
328   PetscFree(owner); PetscFree(nprocs);
329 
330   /* actually zap the local rows */
331   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
332   PLogObjectParent(A,istmp);
333   PetscFree(lrows);
334   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
335   ierr = ISDestroy(istmp); CHKERRQ(ierr);
336 
337   /* wait on sends */
338   if (nsends) {
339     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
340     CHKPTRQ(send_status);
341     MPI_Waitall(nsends,send_waits,send_status);
342     PetscFree(send_status);
343   }
344   PetscFree(send_waits); PetscFree(svalues);
345 
346   return 0;
347 }
348 
349 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
350 {
351   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
352   int          ierr;
353   if (!mdn->assembled)
354     SETERRQ(1,"MatMult_MPIDense:Must assemble matrix first");
355   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
356   CHKERRQ(ierr);
357   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
358   CHKERRQ(ierr);
359   ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
360   return 0;
361 }
362 
363 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
364 {
365   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
366   int          ierr;
367   if (!mdn->assembled)
368     SETERRQ(1,"MatMultAdd_MPIDense:Must assemble matrix first");
369   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
370   CHKERRQ(ierr);
371   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
372   CHKERRQ(ierr);
373   ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
374   return 0;
375 }
376 
377 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
378 {
379   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
380   int          ierr;
381   Scalar       zero = 0.0;
382 
383   if (!a->assembled) SETERRQ(1,"MatMultTrans_MPIDense:must assemble matrix");
384   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
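  /* each processor forms the length-N partial product A_loc' x_loc in
     lvec; the reverse scatter-add then accumulates these partial sums
     into the distributed vector yy */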
385   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
386   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
387          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
388   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
389          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
390   return 0;
391 }
392 
393 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
394 {
395   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
396   int          ierr;
397 
398   if (!a->assembled) SETERRQ(1,"MatMultTransAdd_MPIDense:must assemble matrix");
399   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
400   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
401   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
402          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
403   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
404          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
405   return 0;
406 }
407 
408 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
409 {
410   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
411   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
412   int          ierr, i, n, m = a->m, radd;
413   Scalar       *x;
414 
415   if (!a->assembled) SETERRQ(1,"MatGetDiagonal_MPIDense:must assemble matrix");
416   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
417   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
418   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
419   radd = a->rstart*m;  /* aloc->v is column-major with leading dimension m */
420   for ( i=0; i<m; i++ ) {
421     x[i] = aloc->v[radd + i*m + i];
422   }
423   return 0;
424 }
425 
426 static int MatDestroy_MPIDense(PetscObject obj)
427 {
428   Mat          mat = (Mat) obj;
429   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
430   int          ierr;
431 
432 #if defined(PETSC_LOG)
433   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
434 #endif
435   PetscFree(mdn->rowners);
436   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
437   if (mdn->lvec)   VecDestroy(mdn->lvec);
438   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
439   PetscFree(mdn);
440   PLogObjectDestroy(mat);
441   PetscHeaderDestroy(mat);
442   return 0;
443 }
444 
445 #include "pinclude/pviewer.h"
446 
447 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
448 {
449   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
450   int          ierr;
451   if (mdn->size == 1) {
452     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
453   }
454   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
455   return 0;
456 }
457 
458 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
459 {
460   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
461   int          ierr, format;
462   PetscObject  vobj = (PetscObject) viewer;
463   FILE         *fd;
464 
465   ierr = ViewerFileGetPointer_Private(viewer,&fd); CHKERRQ(ierr);
466   if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
467     ierr = ViewerFileGetFormat_Private(viewer,&format); CHKERRQ(ierr);
468     if (format == FILE_FORMAT_INFO_DETAILED) {
469       int nz, nzalloc, mem, rank;
470       MPI_Comm_rank(mat->comm,&rank);
471       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem); CHKERRQ(ierr);
472       MPIU_Seq_begin(mat->comm,1);
473         fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
474             rank,mdn->m,nz,nzalloc,mem);
475       fflush(fd);
476       MPIU_Seq_end(mat->comm,1);
477       ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
478       return 0;
479     }
480     else if (format == FILE_FORMAT_INFO) {
481       return 0;
482     }
483   }
484   if (vobj->type == ASCII_FILE_VIEWER) {
485     MPIU_Seq_begin(mat->comm,1);
486     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
487              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
488     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
489     fflush(fd);
490     MPIU_Seq_end(mat->comm,1);
491   }
492   else {
493     int size = mdn->size, rank = mdn->rank;
494     if (size == 1) {
495       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
496     }
497     else {
498       /* assemble the entire matrix onto first processor. */
499       Mat          A;
500       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
501       Scalar       *vals;
502       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
503 
504       if (!rank) {
505         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,0,&A); CHKERRQ(ierr);
506       }
507       else {
508         ierr = MatCreateMPIDense(mat->comm,0,0,M,N,0,&A); CHKERRQ(ierr);
509       }
510       PLogObjectParent(mat,A);
511 
512       /* Copy the matrix ... This isn't the most efficient means,
513          but it's quick for now */
514       row = mdn->rstart; m = Amdn->m;
515       for ( i=0; i<m; i++ ) {
516         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
517         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
518         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
519         row++;
520       }
521 
522       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
523       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
524       if (!rank) {
525         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
526       }
527       ierr = MatDestroy(A); CHKERRQ(ierr);
528     }
529   }
530   return 0;
531 }
532 
533 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
534 {
535   Mat          mat = (Mat) obj;
536   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
537   PetscObject  vobj = (PetscObject) viewer;
538   int          ierr;
539 
540   if (!mdn->assembled) SETERRQ(1,"MatView_MPIDense:must assemble matrix");
541   if (!viewer) {
542     viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
543   }
544   if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILE_VIEWER) {
545     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
546   }
547   else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILES_VIEWER) {
548     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
549   }
550   else if (vobj->type == BINARY_FILE_VIEWER) {
551     return MatView_MPIDense_Binary(mat,viewer);
552   }
553   return 0;
554 }
555 
556 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
557                              int *nzalloc,int *mem)
558 {
559   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
560   Mat          mdn = mat->A;
561   int          ierr, isend[3], irecv[3];
562 
563   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
564   if (flag == MAT_LOCAL) {
565     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
566   } else if (flag == MAT_GLOBAL_MAX) {
567     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
568     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
569   } else if (flag == MAT_GLOBAL_SUM) {
570     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
571     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
572   }
573   return 0;
574 }
575 
576 static int MatSetOption_MPIDense(Mat A,MatOption op)
577 {
578   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
579 
580   if (op == NO_NEW_NONZERO_LOCATIONS ||
581       op == YES_NEW_NONZERO_LOCATIONS ||
582       op == COLUMNS_SORTED ||
583       op == ROW_ORIENTED) {
584         MatSetOption(a->A,op);
585   }
586   else if (op == ROWS_SORTED ||
587            op == SYMMETRIC_MATRIX ||
588            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
589            op == YES_NEW_DIAGONALS)
590     PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
591   else if (op == COLUMN_ORIENTED)
592     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:COLUMN_ORIENTED");}
593   else if (op == NO_NEW_DIAGONALS)
594     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
595   else
596     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
597   return 0;
598 }
599 
600 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
601 {
602   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
603   *m = mat->M; *n = mat->N;
604   return 0;
605 }
606 
607 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
608 {
609   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
610   *m = mat->m; *n = mat->N;
611   return 0;
612 }
613 
614 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
615 {
616   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
617   *m = mat->rstart; *n = mat->rend;
618   return 0;
619 }
620 
621 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
622 {
623   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
624   int          lrow, rstart = mat->rstart, rend = mat->rend;
625 
626   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:only local rows");
627   lrow = row - rstart;
628   return MatGetRow(mat->A,lrow,nz,idx,v);
629 }
630 
631 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
632 {
633   if (idx) PetscFree(*idx);
634   if (v) PetscFree(*v);
635   return 0;
636 }
637 
638 static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
639 {
640   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
641   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
642   int          ierr, i, j;
643   double       sum = 0.0;
644   Scalar       *v = mat->v;
645 
646   if (!mdn->assembled) SETERRQ(1,"MatNorm_MPIDense:Must assemble matrix");
647   if (mdn->size == 1) {
648     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
649   } else {
650     if (type == NORM_FROBENIUS) {
651       for (i=0; i<mat->n*mat->m; i++ ) {
652 #if defined(PETSC_COMPLEX)
653         sum += real(conj(*v)*(*v)); v++;
654 #else
655         sum += (*v)*(*v); v++;
656 #endif
657       }
658       MPI_Allreduce((void*)&sum,(void*)norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
659       *norm = sqrt(*norm);
660       PLogFlops(2*mat->n*mat->m);
661     }
662     else if (type == NORM_1) {
663       double *tmp, *tmp2;
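      /* the 1-norm is the maximum column sum: form the absolute column
         sums of the local rows, add them across all processors, then
         take the maximum over the N columns */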
664       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
665       tmp2 = tmp + mdn->N;
666       PetscMemzero(tmp,2*mdn->N*sizeof(double));
667       *norm = 0.0;
668       v = mat->v;
669       for ( j=0; j<mat->n; j++ ) {
670         for ( i=0; i<mat->m; i++ ) {
671           tmp[j] += PetscAbsScalar(*v);  v++;
672         }
673       }
674       MPI_Allreduce((void*)tmp,(void*)tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
675       for ( j=0; j<mdn->N; j++ ) {
676         if (tmp2[j] > *norm) *norm = tmp2[j];
677       }
678       PetscFree(tmp);
679       PLogFlops(mat->n*mat->m);
680     }
681     else if (type == NORM_INFINITY) { /* max row norm */
682       double ntemp;
683       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
684       MPI_Allreduce((void*)&ntemp,(void*)norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
685     }
686     else {
687       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
688     }
689   }
690   return 0;
691 }
692 
693 static int MatTranspose_MPIDense(Mat A,Mat *matout)
694 {
695   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
696   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
697   Mat          B;
698   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
699   int          j, i, ierr;
700   Scalar       *v;
701 
702   if (!matout && M != N)
703     SETERRQ(1,"MatTranspose_MPIDense:In-place transpose requires a square matrix");
704   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,&B); CHKERRQ(ierr);
705 
706   m = Aloc->m; n = Aloc->n; v = Aloc->v;
707   rwork = (int *) PetscMalloc(m*sizeof(int)); CHKPTRQ(rwork);
708   for ( i=0; i<m; i++ ) rwork[i] = rstart + i; /* global numbers of local rows */
709   for ( j=0; j<n; j++ ) {
710     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
711     v += m;
712   }
713   PetscFree(rwork);
714   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
715   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
716   if (matout) {
717     *matout = B;
718   } else {
719     /* This isn't really an in-place transpose, but free data struct from a */
720     PetscFree(a->rowners);
721     ierr = MatDestroy(a->A); CHKERRQ(ierr);
722     if (a->lvec) VecDestroy(a->lvec);
723     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
724     PetscFree(a);
725     PetscMemcpy(A,B,sizeof(struct _Mat));
726     PetscHeaderDestroy(B);
727   }
728   return 0;
729 }
730 
731 static int MatCopyPrivate_MPIDense(Mat,Mat *,int);
732 
733 /* -------------------------------------------------------------------*/
734 static struct _MatOps MatOps = {MatSetValues_MPIDense,
735        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
736        MatMult_MPIDense,MatMultAdd_MPIDense,
737        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
738        0,0,
739        0,0,0,
740        0,0,MatTranspose_MPIDense,
741        MatGetInfo_MPIDense,0,
742        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
743        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
744        0,
745        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
746        0,
747        0,0,0,0,
748        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
749        MatGetOwnershipRange_MPIDense,
750        0,0,
751        0,0,0,0,0,MatCopyPrivate_MPIDense,
752        0,0,0,0,0,
753        0,0,MatGetValues_MPIDense};
754 
755 /*@C
756    MatCreateMPIDense - Creates a parallel matrix in dense format.
757 
758    Input Parameters:
759 .  comm - MPI communicator
760 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
761 .  n - number of local columns (or PETSC_DECIDE to have calculated
762            if N is given)
763 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
764 .  N - number of global columns (or PETSC_DECIDE to have calculated
765            if n is given)
766 .  data - optional location of matrix data.  Set data=PetscNull for PETSc
767    to control all matrix memory allocation.
768 
769    Output Parameter:
770 .  newmat - the matrix
771 
772    Notes:
773    The dense format is fully compatible with standard Fortran 77
774    storage by columns.
775 
776    The data input variable is intended primarily for Fortran programmers
777    who wish to allocate their own matrix memory space.  Most users should
778    set data=PetscNull.
779 
780    The user MUST specify either the local or global matrix dimensions
781    (possibly both).
782 
783    Currently, the only parallel dense matrix decomposition is by rows,
784    so that n=N and each submatrix owns all of the global columns.
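
   Example (a minimal sketch; the 100 x 100 size is for illustration
   only, PETSc chooses the row distribution and allocates the memory):

      Mat A;
      int ierr;
      ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
                               100,100,0,&A); CHKERRQ(ierr);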
785 
786 .keywords: matrix, dense, parallel
787 
788 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
789 @*/
790 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
791 {
792   Mat          mat;
793   Mat_MPIDense *a;
794   int          ierr, i;
795 
796 /* Note:  for now, this assumes that the user knows what he's doing if
797    data is specified above.  */
798 
799   *newmat         = 0;
800   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
801   PLogObjectCreate(mat);
802   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
803   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
804   mat->destroy    = MatDestroy_MPIDense;
805   mat->view       = MatView_MPIDense;
806   mat->factor     = 0;
807 
808   a->insertmode = NOT_SET_VALUES;
809   MPI_Comm_rank(comm,&a->rank);
810   MPI_Comm_size(comm,&a->size);
811 
812   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
813   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
814 
815   /* each row stores all columns */
816   if (N == PETSC_DECIDE) N = n;
817   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
818   /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
819   a->N = N;
820   a->M = M;
821   a->m = m;
822   a->n = n;
823 
824   /* build local table of row and column ownerships */
825   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
826   a->cowners = a->rowners + a->size + 1;
827   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
828                        sizeof(Mat_MPIDense));
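  /* rowners[i] is the first global row owned by processor i: gather the
     local row counts, then form a running sum so that rowners[size] == M */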
829   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
830   a->rowners[0] = 0;
831   for ( i=2; i<=a->size; i++ ) {
832     a->rowners[i] += a->rowners[i-1];
833   }
834   a->rstart = a->rowners[a->rank];
835   a->rend   = a->rowners[a->rank+1];
836   MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
837   a->cowners[0] = 0;
838   for ( i=2; i<=a->size; i++ ) {
839     a->cowners[i] += a->cowners[i-1];
840   }
841 
842   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
843   PLogObjectParent(mat,a->A);
844 
845   /* build cache for off array entries formed */
846   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
847 
848   /* stuff used for matrix vector multiply */
849   a->lvec      = 0;
850   a->Mvctx     = 0;
851   a->assembled = 0;
852 
853   *newmat = mat;
854   return 0;
855 }
856 
857 static int MatCopyPrivate_MPIDense(Mat A,Mat *newmat,int cpvalues)
858 {
859   Mat          mat;
860   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
861   int          ierr;
862 
863   if (!oldmat->assembled) SETERRQ(1,"MatCopyPrivate_MPIDense:Must assemble matrix");
864   *newmat       = 0;
865   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
866   PLogObjectCreate(mat);
867   mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
868   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
869   mat->destroy  = MatDestroy_MPIDense;
870   mat->view     = MatView_MPIDense;
871   mat->factor   = A->factor;
872 
873   a->m          = oldmat->m;
874   a->n          = oldmat->n;
875   a->M          = oldmat->M;
876   a->N          = oldmat->N;
877 
878   a->assembled  = 1;
879   a->rstart     = oldmat->rstart;
880   a->rend       = oldmat->rend;
881   a->size       = oldmat->size;
882   a->rank       = oldmat->rank;
883   a->insertmode = NOT_SET_VALUES;
884 
885   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
886   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
887   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
888   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
889 
890   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
891   PLogObjectParent(mat,a->lvec);
892   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
893   PLogObjectParent(mat,a->Mvctx);
894   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
895   PLogObjectParent(mat,a->A);
896   *newmat = mat;
897   return 0;
898 }
899 
900 #include "sysio.h"
901 
902 int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
903 {
904   Mat          A;
905   int          i, nz, ierr, j,rstart, rend, fd;
906   Scalar       *vals,*svals;
907   PetscObject  vobj = (PetscObject) bview;
908   MPI_Comm     comm = vobj->comm;
909   MPI_Status   status;
910   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
911   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
912   int          tag = ((PetscObject)bview)->tag;
913 
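  /* the binary file is assumed to contain a 4-int header (cookie, M, N,
     and a fourth entry unused here), followed by the M row lengths, all
     column indices, and all numerical values; only processor 0 reads
     the file and ships each processor its portion */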
914   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
915   if (!rank) {
916     ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
917     ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
918     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:Not a matrix object");
919   }
920 
921   MPI_Bcast(header+1,3,MPI_INT,0,comm);
922   M = header[1]; N = header[2];
923   /* determine ownership of all rows */
924   m = M/size + ((M % size) > rank);
925   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
926   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
927   rowners[0] = 0;
928   for ( i=2; i<=size; i++ ) {
929     rowners[i] += rowners[i-1];
930   }
931   rstart = rowners[rank];
932   rend   = rowners[rank+1];
933 
934   /* distribute row lengths to all processors */
935   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
936   offlens = ourlens + (rend-rstart);
937   if (!rank) {
938     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
939     ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
940     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
941     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
942     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
943     PetscFree(sndcounts);
944   }
945   else {
946     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
947   }
948 
949   if (!rank) {
950     /* calculate the number of nonzeros on each processor */
951     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
952     PetscMemzero(procsnz,size*sizeof(int));
953     for ( i=0; i<size; i++ ) {
954       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
955         procsnz[i] += rowlengths[j];
956       }
957     }
958     PetscFree(rowlengths);
959 
960     /* determine max buffer needed and allocate it */
961     maxnz = 0;
962     for ( i=0; i<size; i++ ) {
963       maxnz = PetscMax(maxnz,procsnz[i]);
964     }
965     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
966 
967     /* read in my part of the matrix column indices  */
968     nz = procsnz[0];
969     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
970     ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);
971 
972     /* read in everyone else's part and ship it off */
973     for ( i=1; i<size; i++ ) {
974       nz = procsnz[i];
975       ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
976       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
977     }
978     PetscFree(cols);
979   }
980   else {
981     /* determine buffer space needed for message */
982     nz = 0;
983     for ( i=0; i<m; i++ ) {
984       nz += ourlens[i];
985     }
986     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
987 
988     /* receive message of column indices*/
989     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
990     MPI_Get_count(&status,MPI_INT,&maxnz);
991     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
992   }
993 
994   /* loop over local rows, determining number of off diagonal entries */
995   PetscMemzero(offlens,m*sizeof(int));
996   jj = 0;
997   for ( i=0; i<m; i++ ) {
998     for ( j=0; j<ourlens[i]; j++ ) {
999       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1000       jj++;
1001     }
1002   }
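
  /* for the dense format the subtraction below and the re-addition after
     the matrix is created cancel out; the on/off-diagonal split is
     presumably a leftover from a shared row-format loader, where the
     counts would drive preallocation */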
1003 
1004   /* create our matrix */
1005   for ( i=0; i<m; i++ ) {
1006     ourlens[i] -= offlens[i];
1007   }
1008   if (type == MATMPIDENSE) {
1009     ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,0,newmat); CHKERRQ(ierr);
1010   }
1011   A = *newmat;
1012   for ( i=0; i<m; i++ ) {
1013     ourlens[i] += offlens[i];
1014   }
1015 
1016   if (!rank) {
1017     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1018 
1019     /* read in my part of the matrix numerical values  */
1020     nz = procsnz[0];
1021     ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1022 
1023     /* insert into matrix */
1024     jj      = rstart;
1025     smycols = mycols;
1026     svals   = vals;
1027     for ( i=0; i<m; i++ ) {
1028       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1029       smycols += ourlens[i];
1030       svals   += ourlens[i];
1031       jj++;
1032     }
1033 
1034     /* read in the other processors' parts and ship them out */
1035     for ( i=1; i<size; i++ ) {
1036       nz = procsnz[i];
1037       ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
1038       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1039     }
1040     PetscFree(procsnz);
1041   }
1042   else {
1043     /* receive numeric values */
1044     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1045 
1046     /* receive message of values*/
1047     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1048     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1049     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
1050 
1051     /* insert into matrix */
1052     jj      = rstart;
1053     smycols = mycols;
1054     svals   = vals;
1055     for ( i=0; i<m; i++ ) {
1056       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1057       smycols += ourlens[i];
1058       svals   += ourlens[i];
1059       jj++;
1060     }
1061   }
1062   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1063 
1064   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1065   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1066   return 0;
1067 }
1068