#ifndef lint
static char vcid[] = "$Id: mpidense.c,v 1.16 1995/12/12 22:54:32 curfman Exp bsmith $";
#endif

/*
   Basic functions for parallel dense matrices.
*/

#include "mpidense.h"
#include "vec/vecimpl.h"
#include "inline/spops.h"

static int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,
                               int *idxn,Scalar *v,InsertMode addv)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;

  if (mdn->insertmode != NOT_SET_VALUES && mdn->insertmode != addv) {
    SETERRQ(1,"MatSetValues_MPIDense:Cannot mix inserts and adds");
  }
  mdn->insertmode = addv;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative row");
    if (idxm[i] >= mdn->M) SETERRQ(1,"MatSetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatSetValues_MPIDense:Negative column");
        if (idxn[j] >= mdn->N) SETERRQ(1,"MatSetValues_MPIDense:Column too large");
        ierr = MatSetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j,addv); CHKERRQ(ierr);
      }
    }
    else {
      ierr = StashValues_Private(&mdn->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;

  if (!mdn->assembled) SETERRQ(1,"MatGetValues_MPIDense:Not for unassembled matrix");
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative row");
    if (idxm[i] >= mdn->M) SETERRQ(1,"MatGetValues_MPIDense:Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIDense:Negative column");
        if (idxn[j] >= mdn->N)
          SETERRQ(1,"MatGetValues_MPIDense:Column too large");
        ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIDense:Only local values currently supported");
    }
  }
  return 0;
}

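/*
   The assembly routines use a rendezvous scheme: each processor first
   counts how many stashed (off-processor) entries it must send to every
   other processor, the counts are combined with MPI_Allreduce so that each
   processor learns how many messages to expect and their maximum length,
   and the entries themselves then travel in nonblocking sends/receives.
   MatAssemblyBegin() posts the communication; MatAssemblyEnd() completes
   it and inserts the received values into the local submatrix.
*/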
static int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Comm     comm = mat->comm;
  int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
  int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int          tag = mat->tag, *owner,*starts,count,ierr;
  InsertMode   addv;
  MPI_Request  *send_waits,*recv_waits;
  Scalar       *rvalues,*svalues;

  /* make sure all processors are using either INSERT_VALUES or ADD_VALUES;
     since the InsertMode values are bitwise distinct, a bitwise-OR reduction
     yields (ADD_VALUES|INSERT_VALUES) exactly when the two modes have been
     mixed across processors */
  MPI_Allreduce((void *) &mdn->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIDense:Cannot mix adds/inserts on different procs");
  }
  mdn->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<mdn->stash.n; i++ ) {
    idx = mdn->stash.idx[i];
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce((void *) procs,(void *) work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce((void *) nprocs,(void *) work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message consists of ordered triples
     (global row index,global column index,value); the indices are stored
     as Scalars to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.

       This could be done better.
  */
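  /* For example, a message carrying the two entries A(3,7) = alpha and
     A(3,9) = beta is packed as the six Scalars
         3.0, 7.0, alpha, 3.0, 9.0, beta
     and the receiver recovers the integer indices by casting back. */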
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar) );
  CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<mdn->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);

  mdn->svalues    = svalues;    mdn->rvalues = rvalues;
  mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
  mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
  mdn->rmax       = nmax;

  return 0;
}
extern int MatSetUpMultiply_MPIDense(Mat);

static int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Status   *send_status,recv_status;
  int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
  Scalar       *values,val;
  InsertMode   addv = mdn->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = mdn->rvalues + 3*imdex*mdn->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3;
    for ( i=0; i<n; i++ ) {
      row = (int) PETSCREAL(values[3*i]) - mdn->rstart;
      col = (int) PETSCREAL(values[3*i+1]);
      val = values[3*i+2];
      if (col >= 0 && col < mdn->N) {
        ierr = MatSetValues(mdn->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      }
      else {SETERRQ(1,"MatAssemblyEnd_MPIDense:Invalid column");}
    }
    count--;
  }
  PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);

  /* wait on sends */
  if (mdn->nsends) {
    send_status = (MPI_Status *) PetscMalloc( mdn->nsends*sizeof(MPI_Status) );
    CHKPTRQ(send_status);
    MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(mdn->send_waits); PetscFree(mdn->svalues);

  mdn->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);

  if (!mdn->assembled && mode == FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
  }
  mdn->assembled = 1;
  return 0;
}

static int MatZeroEntries_MPIDense(Mat A)
{
  Mat_MPIDense *l = (Mat_MPIDense *) A->data;
  return MatZeroEntries(l->A);
}

/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   mdn->A directly and not through the MatZeroRows()
   routine.
*/
static int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
{
  Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled) SETERRQ(1,"MatZeroRows_MPIDense:Must assemble matrix");
  ierr = ISGetLocalSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note */
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIDense:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /* see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ierr = ISRestoreIndices(is,&rows); CHKERRQ(ierr);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  base = owners[rank];

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex] = recv_status.MPI_SOURCE;
    lens[imdex]   = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateSeq(MPI_COMM_SELF,slen,lrows,&istmp); CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}

static int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;
  if (!mdn->assembled)
    SETERRQ(1,"MatMult_MPIDense:Must assemble matrix first");
  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = MatMult(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
  return 0;
}

static int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;
  if (!mdn->assembled)
    SETERRQ(1,"MatMultAdd_MPIDense:Must assemble matrix first");
  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_ALL,mdn->Mvctx);
  CHKERRQ(ierr);
  ierr = MatMultAdd(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
  return 0;
}

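/*
   The transpose products below are formed in two stages: each processor
   applies the transpose of its local row block, producing a full-length
   vector of partial sums in lvec, and a reverse scatter with ADD_VALUES
   then accumulates those partial sums into the distributed result.
*/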
static int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;
  Scalar       zero = 0.0;

  if (!a->assembled) SETERRQ(1,"MatMultTrans_MPIDense:Must assemble matrix");
  ierr = VecSet(&zero,yy); CHKERRQ(ierr);
  ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;

  if (!a->assembled) SETERRQ(1,"MatMultTransAdd_MPIDense:Must assemble matrix");
  ierr = VecCopy(yy,zz); CHKERRQ(ierr);
  ierr = MatMultTrans(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,
         (ScatterMode)(SCATTER_ALL|SCATTER_REVERSE),a->Mvctx); CHKERRQ(ierr);
  return 0;
}

static int MatGetDiagonal_MPIDense(Mat A,Vec v)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
  int          ierr, i, n, m = a->m, radd;
  Scalar       *x;

  if (!a->assembled) SETERRQ(1,"MatGetDiagonal_MPIDense:Must assemble matrix");
  ierr = VecGetArray(v,&x); CHKERRQ(ierr);
  ierr = VecGetSize(v,&n); CHKERRQ(ierr);
  if (n != a->M) SETERRQ(1,"MatGetDiagonal_MPIDense:Nonconforming mat and vec");
  /* the local block is stored by columns (Fortran style) with leading
     dimension m; the diagonal entry of local row i lies in global column
     rstart+i, i.e. at index (rstart+i)*m + i */
  radd = a->rstart*m;
  for ( i=0; i<m; i++ ) {
    x[i] = aloc->v[radd + i*m + i];
  }
  return 0;
}

static int MatDestroy_MPIDense(PetscObject obj)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
#endif
  PetscFree(mdn->rowners);
  ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
  if (mdn->lvec)   VecDestroy(mdn->lvec);
  if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
  if (mdn->factor) PetscFree(mdn->factor);
  PetscFree(mdn);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  return 0;
}

#include "pinclude/pviewer.h"

static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;
  if (mdn->size == 1) {
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatView_MPIDense_Binary:Only uniprocessor output supported");
  return 0;
}

static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, format;
  PetscObject  vobj = (PetscObject) viewer;
  FILE         *fd;

  ierr = ViewerFileGetPointer_Private(viewer,&fd); CHKERRQ(ierr);
  if (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER) {
    ierr = ViewerFileGetFormat_Private(viewer,&format); CHKERRQ(ierr);
    if (format == FILE_FORMAT_INFO_DETAILED) {
      int nz, nzalloc, mem, rank;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = MatGetInfo(mat,MAT_LOCAL,&nz,&nzalloc,&mem); CHKERRQ(ierr);
      MPIU_Seq_begin(mat->comm,1);
        fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",
            rank,mdn->m,nz,nzalloc,mem);
      fflush(fd);
      MPIU_Seq_end(mat->comm,1);
      ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == FILE_FORMAT_INFO) {
      return 0;
    }
  }
  if (vobj->type == ASCII_FILE_VIEWER) {
    MPIU_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
             mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    fflush(fd);
    MPIU_Seq_end(mat->comm,1);
  }
  else {
    int size = mdn->size, rank = mdn->rank;
    if (size == 1) {
      ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat          A;
      int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
      Scalar       *vals;
      Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;

      if (!rank) {
        /* first processor is given all M rows; the argument order is
           (comm,m,n,M,N,data,newmat) */
        ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* Copy the matrix ... This isn't the most efficient means,
         but it's quick for now */
      row = mdn->rstart; m = Amdn->m;
      for ( i=0; i<m; i++ ) {
        ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
        ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
        row++;
      }

      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
      if (!rank) {
        ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}

static int MatView_MPIDense(PetscObject obj,Viewer viewer)
{
  Mat          mat = (Mat) obj;
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  PetscObject  vobj = (PetscObject) viewer;
  int          ierr;

  if (!mdn->assembled) SETERRQ(1,"MatView_MPIDense:must assemble matrix");
  if (!viewer) {
    viewer = STDOUT_VIEWER_SELF; vobj = (PetscObject) viewer;
  }
  if (vobj->cookie == VIEWER_COOKIE &&
      (vobj->type == ASCII_FILE_VIEWER || vobj->type == ASCII_FILES_VIEWER)) {
    ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
  }
  else if (vobj->type == BINARY_FILE_VIEWER) {
    return MatView_MPIDense_Binary(mat,viewer);
  }
  return 0;
}

static int MatGetInfo_MPIDense(Mat A,MatInfoType flag,int *nz,
                             int *nzalloc,int *mem)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  Mat          mdn = mat->A;
  int          ierr, isend[3], irecv[3];

  ierr = MatGetInfo(mdn,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERRQ(ierr);
  if (flag == MAT_LOCAL) {
    *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
  } else if (flag == MAT_GLOBAL_MAX) {
    MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,A->comm);
    *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
  } else if (flag == MAT_GLOBAL_SUM) {
    MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,A->comm);
    *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
  }
  return 0;
}

static int MatSetOption_MPIDense(Mat A,MatOption op)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;

  if (op == NO_NEW_NONZERO_LOCATIONS ||
      op == YES_NEW_NONZERO_LOCATIONS ||
      op == COLUMNS_SORTED ||
      op == ROW_ORIENTED) {
    MatSetOption(a->A,op);
  }
  else if (op == ROWS_SORTED ||
           op == SYMMETRIC_MATRIX ||
           op == STRUCTURALLY_SYMMETRIC_MATRIX ||
           op == YES_NEW_DIAGONALS) {
    PLogInfo((PetscObject)A,"Info:MatSetOption_MPIDense:Option ignored\n");
  }
  else if (op == COLUMN_ORIENTED) {
    SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:COLUMN_ORIENTED");
  }
  else if (op == NO_NEW_DIAGONALS) {
    SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:NO_NEW_DIAGONALS");
  }
  else {
    SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIDense:unknown option");
  }
  return 0;
}

static int MatGetSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->M; *n = mat->N;
  return 0;
}

static int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->m; *n = mat->N;
  return 0;
}

static int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  *m = mat->rstart; *n = mat->rend;
  return 0;
}

static int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  int          lrow, rstart = mat->rstart, rend = mat->rend;

  if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIDense:Only local rows");
  lrow = row - rstart;
  return MatGetRow(mat->A,lrow,nz,idx,v);
}

static int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  if (idx) PetscFree(*idx);
  if (v) PetscFree(*v);
  return 0;
}

static int MatNorm_MPIDense(Mat A,NormType type,double *norm)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
  Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
  int          ierr, i, j;
  double       sum = 0.0;
  Scalar       *v = mat->v;

  if (!mdn->assembled) SETERRQ(1,"MatNorm_MPIDense:Must assemble matrix");
  if (mdn->size == 1) {
    ierr = MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      for ( i=0; i<mat->n*mat->m; i++ ) {
#if defined(PETSC_COMPLEX)
        sum += real(conj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      MPI_Allreduce((void*)&sum,(void*)norm,1,MPI_DOUBLE,MPI_SUM,A->comm);
      *norm = sqrt(*norm);
      PLogFlops(2*mat->n*mat->m);
    }
    else if (type == NORM_1) {
      double *tmp, *tmp2;
      tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = tmp + mdn->N;
      PetscMemzero(tmp,2*mdn->N*sizeof(double));
      *norm = 0.0;
      v = mat->v;
      for ( j=0; j<mat->n; j++ ) {
        for ( i=0; i<mat->m; i++ ) {
          tmp[j] += PetscAbsScalar(*v);  v++;
        }
      }
      MPI_Allreduce((void*)tmp,(void*)tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);
      for ( j=0; j<mdn->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp);
      PLogFlops(mat->n*mat->m);
    }
    else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp;
      ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
      MPI_Allreduce((void*)&ntemp,(void*)norm,1,MPI_DOUBLE,MPI_MAX,A->comm);
    }
    else {
      SETERRQ(1,"MatNorm_MPIDense:No support for two norm");
    }
  }
  return 0;
}

static int MatTranspose_MPIDense(Mat A,Mat *matout)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
  Mat          B;
  int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
  int          j, i, ierr;
  Scalar       *v;

  if (!matout && M != N)
    SETERRQ(1,"MatTranspose_MPIDense:In-place transpose supported only for square matrices");
  ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);
         CHKERRQ(ierr);

  /* each column j of the local block becomes row j of B; the column
     indices in B are the global row numbers rstart..rstart+m-1 */
  m = Aloc->m; n = Aloc->n; v = Aloc->v;
  rwork = (int *) PetscMalloc(m*sizeof(int)); CHKPTRQ(rwork);
  for ( i=0; i<m; i++ ) rwork[i] = rstart + i;
  for ( j=0; j<n; j++ ) {
    ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
    v += m;
  }
  PetscFree(rwork);
  ierr = MatAssemblyBegin(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout) {
    *matout = B;
  } else {
    /* This isn't really an in-place transpose, but free data struct from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);
    PetscMemcpy(A,B,sizeof(struct _Mat));
    PetscHeaderDestroy(B);
  }
  return 0;
}
static int MatCopyPrivate_MPIDense(Mat,Mat *,int);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps = {MatSetValues_MPIDense,
       MatGetRow_MPIDense,MatRestoreRow_MPIDense,
       MatMult_MPIDense,MatMultAdd_MPIDense,
       MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
       0,0,
       0,0,0,
       0,0,MatTranspose_MPIDense,
       MatGetInfo_MPIDense,0,
       MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
       MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
       0,
       MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
       0,
       0,0,0,0,
       MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
       MatGetOwnershipRange_MPIDense,
       0,0,
       0,0,0,0,0,MatCopyPrivate_MPIDense,
       0,0,0,0,0,
       0,0,MatGetValues_MPIDense};

/*@C
   MatCreateMPIDense - Creates a parallel matrix in dense format.

   Input Parameters:
.  comm - MPI communicator
.  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have calculated
           if N is given)
.  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have calculated
           if n is given)
.  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
   to control all matrix memory allocation.

   Output Parameter:
.  newmat - the matrix

   Notes:
   The dense format is fully compatible with standard Fortran 77
   storage by columns.

   The data input variable is intended primarily for Fortran programmers
   who wish to allocate their own matrix memory space.  Most users should
   set data=PETSC_NULL.

   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Currently, the only parallel dense matrix decomposition is by rows,
   so that n=N and each submatrix owns all of the global columns.

.keywords: matrix, dense, parallel

.seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
@*/
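/*
   Example usage (a sketch, not taken from the original source): create a
   row-distributed M x N matrix, letting PETSc choose the local sizes, then
   insert a value and assemble.

       Mat    A;
       int    ierr, M = 10, N = 10, row = 0, col = 0;
       Scalar one = 1.0;

       ierr = MatCreateMPIDense(MPI_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
                                M,N,PETSC_NULL,&A); CHKERRQ(ierr);
       ierr = MatSetValues(A,1,&row,1,&col,&one,INSERT_VALUES); CHKERRQ(ierr);
       ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
       ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
       ierr = MatDestroy(A); CHKERRQ(ierr);
*/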
int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *newmat)
{
  Mat          mat;
  Mat_MPIDense *a;
  int          ierr, i;

/* Note:  For now, when data is specified above, this assumes the user correctly
   allocates the local dense storage space.  We should add error checking. */

  *newmat         = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIDense;
  mat->view       = MatView_MPIDense;
  mat->factor     = 0;

  a->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&a->rank);
  MPI_Comm_size(comm,&a->size);

  if (M == PETSC_DECIDE) MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);
  if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}

  /* each row stores all columns */
  if (N == PETSC_DECIDE) N = n;
  if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
  /*  if (n != N) SETERRQ(1,"MatCreateMPIDense:For now, only n=N is supported"); */
  a->N = N;
  a->M = M;
  a->m = m;
  a->n = n;

  /* build local table of row and column ownerships */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  a->cowners = a->rowners + a->size + 1;
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+
                       sizeof(Mat_MPIDense));
  MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);
  a->rowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->rowners[i] += a->rowners[i-1];
  }
  a->rstart = a->rowners[a->rank];
  a->rend   = a->rowners[a->rank+1];
  MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);
  a->cowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->cowners[i] += a->cowners[i-1];
  }

  ierr = MatCreateSeqDense(MPI_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);

  /* build cache for off array entries formed */
  ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);

  /* stuff used for matrix vector multiply */
  a->lvec      = 0;
  a->Mvctx     = 0;
  a->assembled = 0;

  *newmat = mat;
  return 0;
}

static int MatCopyPrivate_MPIDense(Mat A,Mat *newmat,int cpvalues)
{
  Mat          mat;
  Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
  int          ierr;

  if (!oldmat->assembled) SETERRQ(1,"MatCopyPrivate_MPIDense:Must assemble matrix");
  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIDENSE,A->comm);
  PLogObjectCreate(mat);
  mat->data     = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy  = MatDestroy_MPIDense;
  mat->view     = MatView_MPIDense;
  mat->factor   = A->factor;

  a->m          = oldmat->m;
  a->n          = oldmat->n;
  a->M          = oldmat->M;
  a->N          = oldmat->N;

  a->assembled  = 1;
  a->rstart     = oldmat->rstart;
  a->rend       = oldmat->rend;
  a->size       = oldmat->size;
  a->rank       = oldmat->rank;
  a->insertmode = NOT_SET_VALUES;

  a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIDense));
  PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);

  ierr = VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr = VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr = MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  *newmat = mat;
  return 0;
}

#include "sysio.h"

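/*
   Binary file layout assumed by the loader below: a four-entry integer
   header (MAT_COOKIE, the global row count M, the global column count N,
   and a fourth entry that is not used here), then the per-row lengths,
   then the column indices, and finally the numerical values.  Processor 0
   reads the file and ships each processor its block of rows.
*/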
int MatLoad_MPIDense(Viewer bview,MatType type,Mat *newmat)
{
  Mat          A;
  int          i, nz, ierr, j, rstart, rend, fd;
  Scalar       *vals,*svals;
  PetscObject  vobj = (PetscObject) bview;
  MPI_Comm     comm = vobj->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)bview)->tag;

  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerFileGetDescriptor_Private(bview,&fd); CHKERRQ(ierr);
    ierr = SYRead(fd,(char *)header,4,SYINT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIDense:Not a matrix object");
  }

  MPI_Bcast(header+1,3,MPI_INT,0,comm);
  M = header[1]; N = header[2];
  /* determine ownership of all rows */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = SYRead(fd,rowlengths,M,SYINT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
    PetscFree(sndcounts);
  }
  else {
    MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j<rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = SYRead(fd,mycols,nz,SYINT); CHKERRQ(ierr);

    /* read in everyone else's part and ship it off */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,cols,nz,SYINT); CHKERRQ(ierr);
      MPI_Send(cols,nz,MPI_INT,i,tag,comm);
    }
    PetscFree(cols);
  }
  else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices */
    MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
    MPI_Get_count(&status,MPI_INT,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  if (type == MATMPIDENSE) {
    ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat); CHKERRQ(ierr);
  }
  else SETERRQ(1,"MatLoad_MPIDense:Only MATMPIDENSE matrix type supported here");
  A = *newmat;
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values */
    nz = procsnz[0];
    ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES); CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in the other processors' parts and ship them out */
    for ( i=1; i<size; i++ ) {
      nz = procsnz[i];
      ierr = SYRead(fd,vals,nz,SYSCALAR); CHKERRQ(ierr);
      MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
    }
    PetscFree(procsnz);
  }
  else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values */
    MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
    MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
    if (maxnz != nz) SETERRQ(1,"MatLoad_MPIDense:Something is wrong with file");

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES); CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}