xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 08480c60afa5ef1d2e4e27b9ebdf48b02c6a2186)
1 #ifndef lint
2 static char vcid[] = "$Id: mpidense.c,v 1.4 1995/10/22 22:17:20 curfman Exp curfman $";
3 #endif
4 
5 #include "mpidense.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
9 static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
10                                int *idxn,Scalar *v,InsertMode addv)
11 {
12   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
13   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
14 
15   if (mdn->insertmode != NOT_SET_VALUES && mdn->insertmode != addv) {
16     SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
17   }
18   mdn->insertmode = addv;
19   for ( i=0; i<m; i++ ) {
20     if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
21     if (idxm[i] >= mdn->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
22     if (idxm[i] >= rstart && idxm[i] < rend) {
23       row = idxm[i] - rstart;
24       for ( j=0; j<n; j++ ) {
25         if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
26         if (idxn[j] >= mdn->N)
27           SETERRQ(1,"MatSetValues_MPIDense:Column too large");
28         ierr = MatSetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j,addv);
29         CHKERRQ(ierr);
30       }
31     }
32     else {
33       ierr = StashValues_Private(&mdn->stash,idxm[i],n,idxn,v+i*n,addv);
34       CHKERRQ(ierr);
35     }
36   }
37   return 0;
38 }
39 
40 static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
41 {
42   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
43   MPI_Comm     comm = mat->comm;
44   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
45   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
46   int          tag = mat->tag, *owner,*starts,count,ierr;
47   InsertMode   addv;
48   MPI_Request  *send_waits,*recv_waits;
49   Scalar       *rvalues,*svalues;
50 
51   /* make sure all processors are in the same insert mode: INSERT_VALUES or ADD_VALUES */
52   MPI_Allreduce((void *) &mdn->insertmode,(void *) &addv,1,MPI_INT,
53                 MPI_BOR,comm);
54   if (addv == (ADD_VALUES|INSERT_VALUES)) {
55     SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
56   }
57   mdn->insertmode = addv; /* in case this processor had no cache */
58 
59   /*  first count number of contributors to each processor */
60   nprocs = (int *) PETSCMALLOC( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
61   PetscZero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
62   owner = (int *) PETSCMALLOC( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
63   for ( i=0; i<mdn->stash.n; i++ ) {
64     idx = mdn->stash.idx[i];
65     for ( j=0; j<size; j++ ) {
66       if (idx >= owners[j] && idx < owners[j+1]) {
67         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
68       }
69     }
70   }
71   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
72 
73   /* inform other processors of number of messages and max length*/
74   work = (int *) PETSCMALLOC( size*sizeof(int) ); CHKPTRQ(work);
75   MPI_Allreduce((void *) procs,(void *) work,size,MPI_INT,MPI_SUM,comm);
76   nreceives = work[rank];
77   MPI_Allreduce((void *) nprocs,(void *) work,size,MPI_INT,MPI_MAX,comm);
78   nmax = work[rank];
79   PETSCFREE(work);
80 
81   /* post receives:
82        1) each message consists of ordered triples
83      (global row index, global column index, value); the indices are stored
84      as Scalars (doubles) to simplify the message passing.
85        2) since we don't know how long each individual message is, we
86      allocate the largest needed buffer for each receive. Potentially
87      this is a lot of wasted space.
88 
89        This could be done better.
90   */
91   rvalues = (Scalar *) PETSCMALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
92   CHKPTRQ(rvalues);
93   recv_waits = (MPI_Request *) PETSCMALLOC((nreceives+1)*sizeof(MPI_Request));
94   CHKPTRQ(recv_waits);
95   for ( i=0; i<nreceives; i++ ) {
96     MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
97               comm,recv_waits+i);
98   }
99 
100   /* do sends:
101       1) starts[i] gives the starting index in svalues for stuff going to
102          the ith processor
103   */
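  /* Packing sketch (matches the loop below; illustrative only): the stash
     entry destined for processor p = owner[i] is written at slot starts[p],
     which is then incremented, so each destination's triples end up
     contiguous in svalues:
         svalues[3*k]   = global row index    (stored as a Scalar)
         svalues[3*k+1] = global column index (stored as a Scalar)
         svalues[3*k+2] = numerical value                              */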
104   svalues = (Scalar *) PETSCMALLOC( 3*(mdn->stash.n+1)*sizeof(Scalar) );
105   CHKPTRQ(svalues);
106   send_waits = (MPI_Request *) PETSCMALLOC( (nsends+1)*sizeof(MPI_Request));
107   CHKPTRQ(send_waits);
108   starts = (int *) PETSCMALLOC( size*sizeof(int) ); CHKPTRQ(starts);
109   starts[0] = 0;
110   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
111   for ( i=0; i<mdn->stash.n; i++ ) {
112     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
113     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
114     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
115   }
116   PETSCFREE(owner);
117   starts[0] = 0;
118   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
119   count = 0;
120   for ( i=0; i<size; i++ ) {
121     if (procs[i]) {
122       MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
123                 comm,send_waits+count++);
124     }
125   }
126   PETSCFREE(starts); PETSCFREE(nprocs);
127 
128   /* Free cache space */
129   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
130 
131   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
132   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
133   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
134   mdn->rmax       = nmax;
135 
136   return 0;
137 }
138 extern int MatSetUpMultiply_MPIDense(Mat);
139 
140 static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
141 {
142   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
143   MPI_Status   *send_status,recv_status;
144   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
145   Scalar       *values,val;
146   InsertMode   addv = mdn->insertmode;
147 
148   /*  wait on receives */
149   while (count) {
150     MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
151     /* unpack receives into our local space */
152     values = mdn->rvalues + 3*imdex*mdn->rmax;
153     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
154     n = n/3;
155     for ( i=0; i<n; i++ ) {
156       row = (int) PETSCREAL(values[3*i]) - mdn->rstart;
157       col = (int) PETSCREAL(values[3*i+1]);
158       val = values[3*i+2];
159       if (col >= 0 && col < mdn->N) {
160         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
161       }
162       else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
163     }
164     count--;
165   }
166   PETSCFREE(mdn->recv_waits); PETSCFREE(mdn->rvalues);
167 
168   /* wait on sends */
169   if (mdn->nsends) {
170     send_status = (MPI_Status *) PETSCMALLOC( mdn->nsends*sizeof(MPI_Status) );
171     CHKPTRQ(send_status);
172     MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
173     PETSCFREE(send_status);
174   }
175   PETSCFREE(mdn->send_waits); PETSCFREE(mdn->svalues);
176 
177   mdn->insertmode = NOT_SET_VALUES;
178   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
179   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
180 
181   if (!mdn->assembled && mode == FINAL_ASSEMBLY) {
182     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
183   }
184   mdn->assembled = 1;
185   return 0;
186 }
187 
188 static int MatZeroEntries_MPIDense(Mat A)
189 {
190   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
191   return MatZeroEntries(l->A);
192 }
193 
194 /* This code does not handle the diagonal entries correctly unless the
195    matrix is square and the column and row ownerships are identical.
196    This is a BUG. The only way to fix it seems to be to access the
197    local matrix (l->A) directly rather than going through the
198    MatZeroRows() routine.
199 */
200 static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
201 {
202   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
203   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
204   int            *procs,*nprocs,j,found,idx,nsends,*work;
205   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
206   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
207   int            *lens,imdex,*lrows,*values;
208   MPI_Comm       comm = A->comm;
209   MPI_Request    *send_waits,*recv_waits;
210   MPI_Status     recv_status,*send_status;
211   IS             istmp;
212 
213   if (!l->assembled) SETERRQ(1,"MatZeroRows_MPIDense:Must assemble matrix");
214   ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
215   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
216 
217   /*  first count number of contributors to each processor */
218   nprocs = (int *) PETSCMALLOC( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
219   PetscZero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
220   owner = (int *) PETSCMALLOC((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
221   for ( i=0; i<N; i++ ) {
222     idx = rows[i];
223     found = 0;
224     for ( j=0; j<size; j++ ) {
225       if (idx >= owners[j] && idx < owners[j+1]) {
226         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
227       }
228     }
229     if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
230   }
231   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
232 
233   /* inform other processors of number of messages and max length*/
234   work = (int *) PETSCMALLOC( size*sizeof(int) ); CHKPTRQ(work);
235   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
236   nrecvs = work[rank];
237   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
238   nmax = work[rank];
239   PETSCFREE(work);
240 
241   /* post receives:   */
242   rvalues = (int *) PETSCMALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
243   CHKPTRQ(rvalues);
244   recv_waits = (MPI_Request *) PETSCMALLOC((nrecvs+1)*sizeof(MPI_Request));
245   CHKPTRQ(recv_waits);
246   for ( i=0; i<nrecvs; i++ ) {
247     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
248   }
249 
250   /* do sends:
251       1) starts[i] gives the starting index in svalues for stuff going to
252          the ith processor
253   */
254   svalues = (int *) PETSCMALLOC( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
255   send_waits = (MPI_Request *) PETSCMALLOC( (nsends+1)*sizeof(MPI_Request));
256   CHKPTRQ(send_waits);
257   starts = (int *) PETSCMALLOC( (size+1)*sizeof(int) ); CHKPTRQ(starts);
258   starts[0] = 0;
259   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
260   for ( i=0; i<N; i++ ) {
261     svalues[starts[owner[i]]++] = rows[i];
262   }
263   ISRestoreIndices(is,&rows);
264 
265   starts[0] = 0;
266   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
267   count = 0;
268   for ( i=0; i<size; i++ ) {
269     if (procs[i]) {
270       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
271     }
272   }
273   PETSCFREE(starts);
274 
275   base = owners[rank];
276 
277   /*  wait on receives */
278   lens   = (int *) PETSCMALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
279   source = lens + nrecvs;
280   count  = nrecvs; slen = 0;
281   while (count) {
282     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
283     /* unpack receives into our local space */
284     MPI_Get_count(&recv_status,MPI_INT,&n);
285     source[imdex]  = recv_status.MPI_SOURCE;
286     lens[imdex]  = n;
287     slen += n;
288     count--;
289   }
290   PETSCFREE(recv_waits);
291 
292   /* gather the received global row indices into one local list */
293   lrows = (int *) PETSCMALLOC( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
294   count = 0;
295   for ( i=0; i<nrecvs; i++ ) {
296     values = rvalues + i*nmax;
297     for ( j=0; j<lens[i]; j++ ) {
298       lrows[count++] = values[j] - base;
299     }
300   }
301   PETSCFREE(rvalues); PETSCFREE(lens);
302   PETSCFREE(owner); PETSCFREE(nprocs);
303 
304   /* actually zap the local rows */
305   ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
306   PLogObjectParent(A,istmp);
307   PETSCFREE(lrows);
308   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
309   ierr = ISDestroy(istmp); CHKERRQ(ierr);
310 
311   /* wait on sends */
312   if (nsends) {
313     send_status = (MPI_Status *) PETSCMALLOC(nsends*sizeof(MPI_Status));
314     CHKPTRQ(send_status);
315     MPI_Waitall(nsends,send_waits,send_status);
316     PETSCFREE(send_status);
317   }
318   PETSCFREE(send_waits); PETSCFREE(svalues);
319 
320   return 0;
321 }
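/* For reference, the generic MatZeroRows() interface that dispatches to the
   routine above is typically called as in the following sketch (the index
   and diagonal values are illustrative only):

      IS     rows;
      int    idx[1] = {0}, ierr;
      Scalar diag = 1.0;
      ierr = ISCreateSeq(MPI_COMM_SELF,1,idx,&rows); CHKERRQ(ierr);
      ierr = MatZeroRows(A,rows,&diag); CHKERRQ(ierr);
      ierr = ISDestroy(rows); CHKERRQ(ierr);
*/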
322 
323 static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
324 {
325   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
326   int          ierr;
327   if (!mdn->assembled)
328     SETERRQ(1,"MatMult_MPIDense:Must assemble matrix first");
329   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
330   CHKERRQ(ierr);
331   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
332   CHKERRQ(ierr);
333   ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
334   return 0;
335 }
336 
337 static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
338 {
339   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
340   int          ierr;
341   if (!mdn->assembled)
342     SETERRQ(1,"MatMultAdd_MPIDense:Must assemble matrix first");
343   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
344   CHKERRQ(ierr);
345   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
346   CHKERRQ(ierr);
347   ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
348   return 0;
349 }
350 
351 static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
352 {
353   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
354   int          ierr;
355   Scalar       zero = 0.0;
356 
357   if (!a->assembled) SETERRQ(1,"MatMultTrans_MPIDense:Must assemble matrix");
358   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
359   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
360   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
361          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
362   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
363          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
364   return 0;
365 }
366 
367 static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
368 {
369   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
370   int          ierr;
371 
372   if (!a->assembled) SETERRQ(1,"MatMultTransAdd_MPIDense:Must assemble matrix");
373   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
374   ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
375   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
376          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
377   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
378          (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
379   return 0;
380 }
381 
382 static int MatGetDiagonal_MPIDense(Mat A,Vec v)
383 {
384   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
385   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
386   int          ierr, i, n, m = a->m, radd;
387   Scalar       *x;
388   if (!a->assembled) SETERRQ(1,"MatGetDiagonal_MPIDense:Must assemble matrix");
389   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
390   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
391   if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
392   radd = a->rstart*m; /* aloc->v is column-major with m rows; diagonal entry i is v[(rstart+i)*m + i] */
393   for ( i=0; i<m; i++ ) {
394     x[i] = aloc->v[radd + i*m + i];
395   }
396   return 0;
397 }
398 
399 static int MatDestroy_MPIDense(PetscObject obj)
400 {
401   Mat          mat = (Mat) obj;
402   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
403   int          ierr;
404 #if defined(PETSC_LOG)
405   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
406 #endif
407   PETSCFREE(mdn->rowners);
408   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
409   if (mdn->lvec)   VecDestroy(mdn->lvec);
410   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
411   PETSCFREE(mdn);
412   PLogObjectDestroy(mat);
413   PETSCHEADERDESTROY(mat);
414   return 0;
415 }
416 
417 #include "pinclude/pviewer.h"
418 
419 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
420 {
421   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
422   int          ierr;
423   if (mdn->size == 1) {
424     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
425   }
426   else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
427   return 0;
428 }
429 
430 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
431 {
432   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
433   int          ierr, format;
434   PetscObject  vobj = (PetscObject) viewer;
435   FILE         *fd;
436 
437   ierr = ViewerFileGetPointer_Private(viewer,&fd); CHKERRQ(ierr);
438   if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
439     ierr = ViewerFileGetFormat_Private(viewer,&format); CHKERRQ(ierr);
440     if (format == FILE_FORMAT_INFO_DETAILED) {
441       int nz, nzalloc, mem, rank;
442       MPI_Comm_rank(mat->comm,&rank);
443       ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem); CHKERRQ(ierr);
444       MPIU_Seq_begin(mat->comm,1);
445         fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
446             rank,mdn->m,nz,nzalloc,mem);
447       fflush(fd);
448       MPIU_Seq_end(mat->comm,1);
449       ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
450       return 0;
451     }
452     else if (format == FILE_FORMAT_INFO) {
453       return 0;
454     }
455   }
456   if (vobj->type == ASCII_FILE_VIEWER) {
457     MPIU_Seq_begin(mat->comm,1);
458     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
459              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
460     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
461     fflush(fd);
462     MPIU_Seq_end(mat->comm,1);
463   }
464   else {
465     int size = mdn->size, rank = mdn->rank;
466     if (size == 1) {
467       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
468     }
469     else {
470       /* assemble the entire matrix onto first processor. */
471       Mat          A;
472       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
473       Scalar       *vals;
474       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
475 
476       if (!rank) {
477         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,&A); CHKERRQ(ierr);
478       }
479       else {
480         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,&A); CHKERRQ(ierr);
481       }
482       PLogObjectParent(mat,A);
483 
484       /* Copy the matrix ... This isn't the most efficient means,
485          but it's quick for now */
486       row = mdn->rstart; m = Amdn->m;
487       for ( i=0; i<m; i++ ) {
488         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
489         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
490         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
491         row++;
492       }
493 
494       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
495       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
496       if (!rank) {
497         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
498       }
499       ierr = MatDestroy(A); CHKERRQ(ierr);
500     }
501   }
502   return 0;
503 }
504 
505 static int MatView_MPIDense(PetscObject obj,Viewer viewer)
506 {
507   Mat          mat = (Mat) obj;
508   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
509   PetscObject  vobj = (PetscObject) viewer;
510   int          ierr;
511 
512   if (!mdn->assembled) SETERRQ(1,"MatView_MPIDense:must assemble matrix");
513   if (!viewer) {
514     viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
515   }
516   if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILE_VIEWER) {
517     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
518   }
519   else if (vobj->cookie == VIEWER_COOKIE && vobj->type == ASCII_FILES_VIEWER) {
520     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
521   }
522   else if (vobj->type == BINARY_FILE_VIEWER) {
523     return MatView_MPIDense_Binary(mat,viewer);
524   }
525   return 0;
526 }
527 
528 static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
529                              int *nzalloc,int *mem)
530 {
531   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
532   Mat          mdn = mat->A;
533   int          ierr, isend[3], irecv[3];
534 
535   ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
536   if (flag == MAT_LOCAL) {
537     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
538   } else if (flag == MAT_GLOBAL_MAX) {
539     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
540     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
541   } else if (flag == MAT_GLOBAL_SUM) {
542     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
543     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
544   }
545   return 0;
546 }
547 
548 static int MatSetOption_MPIDense(Mat A,MatOption op)
549 {
550   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
551 
552   if (op == NO_NEW_NONZERO_LOCATIONS ||
553       op == YES_NEW_NONZERO_LOCATIONS ||
554       op == COLUMNS_SORTED ||
555       op == ROW_ORIENTED) {
556         MatSetOption(a->A,op);
557   }
558   else if (op == ROWS_SORTED ||
559            op == SYMMETRIC_MATRIX ||
560            op == STRUCTURALLY_SYMMETRIC_MATRIX ||
561            op == YES_NEW_DIAGONALS)
562     PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
563   else if (op == COLUMN_ORIENTED)
564     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:COLUMN_ORIENTED");}
565   else if (op == NO_NEW_DIAGONALS)
566     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");}
567   else
568     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");}
569   return 0;
570 }
571 
572 static int MatGetSize_MPIDense(Mat A,int *m,int *n)
573 {
574   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
575   *m = mat->M; *n = mat->N;
576   return 0;
577 }
578 
579 static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
580 {
581   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
582   *m = mat->m; *n = mat->N;
583   return 0;
584 }
585 
586 static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
587 {
588   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
589   *m = mat->rstart; *n = mat->rend;
590   return 0;
591 }
592 
593 static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
594 {
595   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
596   int          lrow, rstart = mat->rstart, rend = mat->rend;
597 
598   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:Only local rows");
599   lrow = row - rstart;
600   return MatGetRow(mat->A,lrow,nz,idx,v);
601 }
602 
603 static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
604 {
605   if (idx) PETSCFREE(*idx);
606   if (v) PETSCFREE(*v);
607   return 0;
608 }
609 
610 static int MatNorm_MPIDense(Mat A,MatNormType type,double *norm)
611 {
612   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
613   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
614   int          ierr, i, j;
615   double       sum = 0.0;
616   Scalar       *v = mat->v;
617 
618   if (!mdn->assembled) SETERRQ(1,"MatNorm_MPIDense:Must assemble matrix");
619   if (mdn->size == 1) {
620     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
621   } else {
622     if (type == NORM_FROBENIUS) {
623       for (i=0; i<mat->n*mat->m; i++ ) {
624 #if defined(PETSC_COMPLEX)
625         sum += real(conj(*v)*(*v)); v++;
626 #else
627         sum += (*v)*(*v); v++;
628 #endif
629       }
630       MPI_Allreduce((void*)&sum,(void*)norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
631       *norm = sqrt(*norm);
632       PLogFlops(2*mat->n*mat->m);
633     }
634     else if (type == NORM_1) {
635       double *tmp, *tmp2;
636       tmp  = (double *) PETSCMALLOC( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
637       tmp2 = tmp + mdn->N;
638       PetscZero(tmp,2*mdn->N*sizeof(double));
639       *norm = 0.0;
640       v = mat->v;
641       for ( j=0; j<mat->n; j++ ) {
642         for ( i=0; i<mat->m; i++ ) {
643 #if defined(PETSC_COMPLEX)
644           tmp[j] += abs(*v++);
645 #else
646           tmp[j] += fabs(*v++);
647 #endif
648         }
649       }
650       MPI_Allreduce((void*)tmp,(void*)tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
651       for ( j=0; j<mdn->N; j++ ) {
652         if (tmp2[j] > *norm) *norm = tmp2[j];
653       }
654       PETSCFREE(tmp);
655       PLogFlops(mat->n*mat->m);
656     }
657     else if (type == NORM_INFINITY) { /* max row norm */
658       double ntemp;
659       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
660       MPI_Allreduce((void*)&ntemp,(void*)norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
661     }
662     else {
663       SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
664     }
665   }
666   return 0;
667 }
668 
669 static int MatTranspose_MPIDense(Mat A,Mat *matout)
670 {
671   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
672   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
673   Mat          B;
674   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
675   int          j, i, ierr;
676   Scalar       *v;
677 
678   if (!matout && M != N)
679     SETERRQ(1,"MatTranspose_MPIDense:In-place transpose supported only for square matrices");
680   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,&B); CHKERRQ(ierr);
681 
682   m = Aloc->m; n = Aloc->n; v = Aloc->v;
683   rwork = (int *) PETSCMALLOC(m*sizeof(int)); CHKPTRQ(rwork);
684   for ( i=0; i<m; i++ ) rwork[i] = rstart + i;
685   for ( j=0; j<n; j++ ) {
686     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
687     v += m;
688   }
689   PETSCFREE(rwork);
690   ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
691   ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
692   if (matout) {
693     *matout = B;
694   } else {
695     /* Not a true in-place transpose: free a's data structures and take over B's contents */
696     PETSCFREE(a->rowners);
697     ierr = MatDestroy(a->A); CHKERRQ(ierr);
698     if (a->lvec) VecDestroy(a->lvec);
699     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
700     PETSCFREE(a);
701     PetscMemcpy(A,B,sizeof(struct _Mat));
702     PETSCHEADERDESTROY(B);
703   }
704   return 0;
705 }
706 
707 static int MatCopyPrivate_MPIDense(Mat,Mat *,int);
708 
709 /* -------------------------------------------------------------------*/
710 static struct _MatOps MatOps = {MatSetValues_MPIDense,
711        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
712        MatMult_MPIDense,MatMultAdd_MPIDense,
713        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
714        0,0,
715        0,0,0,
716        0,0,MatTranspose_MPIDense,
717        MatGetInfo_MPIDense,0,
718        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
719        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
720        0,
721        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
722        0,
723        0,0,0,0,
724        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
725        MatGetOwnershipRange_MPIDense,
726        0,0,
727        0,0,0,0,0,MatCopyPrivate_MPIDense};
728 
729 /*@C
730    MatCreateMPIDense - Creates a parallel matrix in dense format.
731 
732    Input Parameters:
733 .  comm - MPI communicator
734 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
735 .  n - number of local columns (or PETSC_DECIDE to have it calculated
736            if N is given)
737 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
738 .  N - number of global columns (or PETSC_DECIDE to have it calculated
739            if n is given)
740 
741    Output Parameter:
742 .  newmat - the matrix
743 
744    Notes:
745    The dense format is fully compatible with standard Fortran 77
746    storage by columns.
747 
748    The user MUST specify either the local or global matrix dimensions
749    (possibly both).
750 
751    Currently, the only parallel dense matrix decomposition is by rows,
752    so that n=N and each submatrix owns all of the global columns.
753 
754 .keywords: matrix, dense, parallel
755 
756 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
757 @*/
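/*
   Example usage (a minimal sketch; the communicator comm and the dimensions
   are illustrative, not taken from this file):

      Mat    C;
      int    ierr, row = 0, col = 0;
      Scalar one = 1.0;
      ierr = MatCreateMPIDense(comm,PETSC_DECIDE,PETSC_DECIDE,10,10,&C); CHKERRQ(ierr);
      ierr = MatSetValues(C,1,&row,1,&col,&one,INSERT_VALUES); CHKERRQ(ierr);
      ierr = MatAssemblyBegin(C,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(C,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatDestroy(C); CHKERRQ(ierr);
*/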
758 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Mat *newmat)
759 {
760   Mat          mat;
761   Mat_MPIDense *a;
762   int          ierr, i;
763 
764   *newmat         = 0;
765   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
766   PLogObjectCreate(mat);
767   mat->data       = (void *) (a = PETSCNEW(Mat_MPIDense)); CHKPTRQ(a);
768   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
769   mat->destroy    = MatDestroy_MPIDense;
770   mat->view       = MatView_MPIDense;
771   mat->factor     = 0;
772 
773   a->insertmode = NOT_SET_VALUES;
774   MPI_Comm_rank(comm,&a->rank);
775   MPI_Comm_size(comm,&a->size);
776 
777   if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
778   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
779 
780   /* each process stores all N columns of its local rows */
781   if (N == PETSC_DECIDE) N = n;
782   if (n == PETSC_DECIDE) n = N;
783   if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported");
784   a->N = N;
785   a->M = M;
786   a->m = m;
787   a->n = n;
788 
789   /* build local table of row and column ownerships */
790   a->rowners = (int *) PETSCMALLOC((a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
791   PLogObjectMemory(mat,(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
792                        sizeof(Mat_MPIDense));
793   MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
794   a->rowners[0] = 0;
795   for ( i=2; i<=a->size; i++ ) {
796     a->rowners[i] += a->rowners[i-1];
797   }
798   a->rstart = a->rowners[a->rank];
799   a->rend   = a->rowners[a->rank+1];
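  /* For example (illustrative numbers only): with three processes holding
     m = 3, 4, and 3 local rows, the Allgather plus running sum above gives
     rowners = {0,3,7,10}, so process 1 owns global rows [3,7). */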
800 
801   ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,&a->A); CHKERRQ(ierr);
802   PLogObjectParent(mat,a->A);
803 
804   /* build cache for off array entries formed */
805   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
806 
807   /* stuff used for matrix vector multiply */
808   a->lvec      = 0;
809   a->Mvctx     = 0;
810   a->assembled = 0;
811 
812   *newmat = mat;
813   return 0;
814 }
815 
816 static int MatCopyPrivate_MPIDense(Mat A,Mat *newmat,int cpvalues)
817 {
818   Mat          mat;
819   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
820   int          ierr;
821 
822   if (!oldmat->assembled) SETERRQ(1,"MatCopyPrivate_MPIDense:Must assemble matrix");
823   *newmat       = 0;
824   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
825   PLogObjectCreate(mat);
826   mat->data     = (void *) (a = PETSCNEW(Mat_MPIDense)); CHKPTRQ(a);
827   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
828   mat->destroy  = MatDestroy_MPIDense;
829   mat->view     = MatView_MPIDense;
830   mat->factor   = A->factor;
831 
832   a->m          = oldmat->m;
833   a->n          = oldmat->n;
834   a->M          = oldmat->M;
835   a->N          = oldmat->N;
836 
837   a->assembled  = 1;
838   a->rstart     = oldmat->rstart;
839   a->rend       = oldmat->rend;
840   a->size       = oldmat->size;
841   a->rank       = oldmat->rank;
842   a->insertmode = NOT_SET_VALUES;
843 
844   a->rowners = (int *) PETSCMALLOC((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
845   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
846   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
847   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
848 
849   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
850   PLogObjectParent(mat,a->lvec);
851   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
852   PLogObjectParent(mat,a->Mvctx);
853   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
854   PLogObjectParent(mat,a->A);
855   *newmat = mat;
856   return 0;
857 }
858 
859 #include "sysio.h"
860 
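/*
   MatLoad_MPIDense reads the binary layout implied by the SYRead() calls
   below (a sketch, not a formal specification; the fourth header entry is
   not used here):
       int    header[4] = {MAT_COOKIE, M, N, <unused>}
       int    rowlengths[M]
       int    column indices for all rows
       Scalar numerical values for all rows
   Process 0 reads the file and ships each processor its portion.
*/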
861 int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
862 {
863   Mat          A;
864   int          i, nz, ierr, j,rstart, rend, fd;
865   Scalar       *vals,*svals;
866   PetscObject  vobj = (PetscObject) bview;
867   MPI_Comm     comm = vobj->comm;
868   MPI_Status   status;
869   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
870   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
871   int          tag = ((PetscObject)bview)->tag;
872 
873   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
874   if (!rank) {
875     ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
876     ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
877     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:Not a matrix object");
878   }
879 
880   MPI_Bcast(header+1,3,MPI_INT,0,comm);
881   M = header[1]; N = header[2];
882   /* determine ownership of all rows */
883   m = M/size + ((M % size) > rank);
884   rowners = (int *) PETSCMALLOC((size+2)*sizeof(int)); CHKPTRQ(rowners);
885   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
886   rowners[0] = 0;
887   for ( i=2; i<=size; i++ ) {
888     rowners[i] += rowners[i-1];
889   }
890   rstart = rowners[rank];
891   rend   = rowners[rank+1];
892 
893   /* distribute row lengths to all processors */
894   ourlens = (int*) PETSCMALLOC( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
895   offlens = ourlens + (rend-rstart);
896   if (!rank) {
897     rowlengths = (int*) PETSCMALLOC( M*sizeof(int) ); CHKPTRQ(rowlengths);
898     ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
899     sndcounts = (int*) PETSCMALLOC( size*sizeof(int) ); CHKPTRQ(sndcounts);
900     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
901     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
902     PETSCFREE(sndcounts);
903   }
904   else {
905     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
906   }
907 
908   if (!rank) {
909     /* calculate the number of nonzeros on each processor */
910     procsnz = (int*) PETSCMALLOC( size*sizeof(int) ); CHKPTRQ(procsnz);
911     PetscZero(procsnz,size*sizeof(int));
912     for ( i=0; i<size; i++ ) {
913       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
914         procsnz[i] += rowlengths[j];
915       }
916     }
917     PETSCFREE(rowlengths);
918 
919     /* determine max buffer needed and allocate it */
920     maxnz = 0;
921     for ( i=0; i<size; i++ ) {
922       maxnz = PETSCMAX(maxnz,procsnz[i]);
923     }
924     cols = (int *) PETSCMALLOC( maxnz*sizeof(int) ); CHKPTRQ(cols);
925 
926     /* read in my part of the matrix column indices  */
927     nz = procsnz[0];
928     mycols = (int *) PETSCMALLOC( nz*sizeof(int) ); CHKPTRQ(mycols);
929     ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);
930 
931     /* read in everyone else's parts and ship them off */
932     for ( i=1; i<size; i++ ) {
933       nz = procsnz[i];
934       ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
935       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
936     }
937     PETSCFREE(cols);
938   }
939   else {
940     /* determine buffer space needed for message */
941     nz = 0;
942     for ( i=0; i<m; i++ ) {
943       nz += ourlens[i];
944     }
945     mycols = (int*) PETSCMALLOC( nz*sizeof(int) ); CHKPTRQ(mycols);
946 
947     /* receive message of column indices*/
948     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
949     MPI_Get_count(&status,MPI_INT,&maxnz);
950     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
951   }
952 
953   /* loop over local rows, determining number of off diagonal entries */
954   PetscZero(offlens,m*sizeof(int));
955   jj = 0;
956   for ( i=0; i<m; i++ ) {
957     for ( j=0; j<ourlens[i]; j++ ) {
958       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
959       jj++;
960     }
961   }
962 
963   /* create our matrix */
964   for ( i=0; i<m; i++ ) {
965     ourlens[i] -= offlens[i];
966   }
967   if (type == MATMPIDENSE) {
968     ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,newmat);CHKERRQ(ierr);
969   }
970   A = *newmat;
971   MatSetOption(A,COLUMNS_SORTED);
972   for ( i=0; i<m; i++ ) {
973     ourlens[i] += offlens[i];
974   }
975 
976   if (!rank) {
977     vals = (Scalar *) PETSCMALLOC( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
978 
979     /* read in my part of the matrix numerical values  */
980     nz = procsnz[0];
981     ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
982 
983     /* insert into matrix */
984     jj      = rstart;
985     smycols = mycols;
986     svals   = vals;
987     for ( i=0; i<m; i++ ) {
988       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
989       smycols += ourlens[i];
990       svals   += ourlens[i];
991       jj++;
992     }
993 
994     /* read in the other processors' parts and ship them out */
995     for ( i=1; i<size; i++ ) {
996       nz = procsnz[i];
997       ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
998       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
999     }
1000     PETSCFREE(procsnz);
1001   }
1002   else {
1003     /* receive numeric values */
1004     vals = (Scalar*) PETSCMALLOC( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1005 
1006     /* receive message of values*/
1007     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1008     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1009     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with the file");
1010 
1011     /* insert into matrix */
1012     jj      = rstart;
1013     smycols = mycols;
1014     svals   = vals;
1015     for ( i=0; i<m; i++ ) {
1016       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1017       smycols += ourlens[i];
1018       svals   += ourlens[i];
1019       jj++;
1020     }
1021   }
1022   PETSCFREE(ourlens); PETSCFREE(vals); PETSCFREE(mycols); PETSCFREE(rowners);
1023 
1024   ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1025   ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
1026   return 0;
1027 }
1028