/* xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision ebb8b11f1ce76c784a8869da16ffc573a18b1864) */
#ifdef PETSC_RCS_HEADER
static char vcid[] = "$Id: mpidense.c,v 1.110 1999/03/18 00:36:01 balay Exp balay $";
#endif

/*
   Basic functions for parallel dense matrices.
*/

#include "src/mat/impls/dense/mpi/mpidense.h"
#include "src/vec/vecimpl.h"

extern int MatSetUpMultiply_MPIDense(Mat);

#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIDense"
int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
{
  Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
  int          roworiented = A->roworiented;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) continue;
    if (idxm[i] >= A->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      if (roworiented) {
        ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
      } else {
        for ( j=0; j<n; j++ ) {
          if (idxn[j] < 0) continue;
          if (idxn[j] >= A->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
          ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
        }
      }
    } else {
      if ( !A->donotstash) {
        if (roworiented) {
          ierr = MatStashValuesRow_Private(&mat->stash,idxm[i],n,idxn,v+i*n); CHKERRQ(ierr);
        } else {
          ierr = MatStashValuesCol_Private(&mat->stash,idxm[i],n,idxn,v+i,m);CHKERRQ(ierr);
        }
      }
    }
  }
  PetscFunctionReturn(0);
}

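/*
   A hedged usage sketch (not part of this file's API): inserting a 2x2 block
   of values from calling code.  With the default row-oriented option, v is
   read row by row, so v[i*n+j] lands at (idxm[i],idxn[j]); rows owned by
   other processors are stashed above and communicated at assembly time.
   The matrix A and the index values here are illustrative only.

     int    rows[2] = {0,1}, cols[2] = {0,1}, ierr;
     Scalar v[4]    = {1.0, 2.0,
                       3.0, 4.0};      row-major: v[i*2+j] -> (rows[i],cols[j])

     ierr = MatSetValues(A,2,rows,2,cols,v,INSERT_VALUES); CHKERRQ(ierr);
     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/
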
#undef __FUNC__
#define __FUNC__ "MatGetValues_MPIDense"
int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (idxm[i] >= mdn->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
        if (idxn[j] >= mdn->N) {
          SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
        }
        ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
      }
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
    }
  }
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetArray_MPIDense"
int MatGetArray_MPIDense(Mat A,Scalar **array)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;

  PetscFunctionBegin;
  ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatRestoreArray_MPIDense"
int MatRestoreArray_MPIDense(Mat A,Scalar **array)
{
  PetscFunctionBegin;
  PetscFunctionReturn(0);
}

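/*
   A hedged usage sketch: direct access to the locally owned dense storage.
   The array returned is the local m x N block in column-major (Fortran)
   order, so local entry (i,j) is array[i + j*m].  Names are illustrative.

     Scalar *array;
     int    ierr;

     ierr = MatGetArray(A,&array); CHKERRQ(ierr);
     array[i + j*m] = 5.0;          set local entry (i,j); m = local rows
     ierr = MatRestoreArray(A,&array); CHKERRQ(ierr);
*/
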
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIDense"
int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  MPI_Comm     comm = mat->comm;
  int          ierr,nstash,reallocs;
  InsertMode   addv;

  PetscFunctionBegin;
  /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Cannot mix adds/inserts on different procs");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  ierr = MatStashScatterBegin_Private(&mat->stash,mdn->rowners); CHKERRQ(ierr);
  ierr = MatStashGetInfo_Private(&mat->stash,&nstash,&reallocs); CHKERRQ(ierr);
  PLogInfo(mdn->A,"MatAssemblyBegin_MPIDense:Stash has %d entries, uses %d mallocs.\n",
           nstash,reallocs);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIDense"
int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
{
  Mat_MPIDense *mdn=(Mat_MPIDense*)mat->data;
  int          i,n,ierr,*row,*col,flg,j,rstart,ncols;
  Scalar       *val;
  InsertMode   addv=mat->insertmode;

  PetscFunctionBegin;
  /*  wait on receives */
  while (1) {
    ierr = MatStashScatterGetMesg_Private(&mat->stash,&n,&row,&col,&val,&flg); CHKERRQ(ierr);
    if (!flg) break;

    for ( i=0; i<n; ) {
      /* Now identify the consecutive vals belonging to the same row */
      for ( j=i,rstart=row[j]; j<n; j++ ) { if (row[j] != rstart) break; }
      if (j < n) ncols = j-i;
      else       ncols = n-i;
      /* Now assemble all these values with a single function call */
      ierr = MatSetValues_MPIDense(mat,1,row+i,ncols,col+i,val+i,addv); CHKERRQ(ierr);
      i = j;
    }
  }
  ierr = MatStashScatterEnd_Private(&mat->stash); CHKERRQ(ierr);

  ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
  }
  PetscFunctionReturn(0);
}

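/*
   A hedged sketch of the assembly protocol these two routines implement:
   all processors must agree on the insert mode between assemblies, so
   switching from INSERT_VALUES to ADD_VALUES requires an intervening
   flush.  Illustrative names only.

     ierr = MatSetValues(A,1,&i,1,&j,&v,INSERT_VALUES); CHKERRQ(ierr);
     ierr = MatAssemblyBegin(A,MAT_FLUSH_ASSEMBLY); CHKERRQ(ierr);
     ierr = MatAssemblyEnd(A,MAT_FLUSH_ASSEMBLY); CHKERRQ(ierr);
     ierr = MatSetValues(A,1,&i,1,&j,&w,ADD_VALUES); CHKERRQ(ierr);
     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
*/
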
#undef __FUNC__
#define __FUNC__ "MatZeroEntries_MPIDense"
int MatZeroEntries_MPIDense(Mat A)
{
  int          ierr;
  Mat_MPIDense *l = (Mat_MPIDense *) A->data;

  PetscFunctionBegin;
  ierr = MatZeroEntries(l->A);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetBlockSize_MPIDense"
int MatGetBlockSize_MPIDense(Mat A,int *bs)
{
  PetscFunctionBegin;
  *bs = 1;
  PetscFunctionReturn(0);
}

/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   the local matrix mdn->A directly and not through the MatZeroRows()
   routine.
*/
#undef __FUNC__
#define __FUNC__ "MatZeroRows_MPIDense"
int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
{
  Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  PetscFunctionBegin;
  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note */
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length */
  work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nrecvs = work[rank];
  ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax   = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues    = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts     = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0]  = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts);

  base = owners[rank];

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  PetscFunctionReturn(0);
}

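/*
   A hedged usage sketch: zeroing two global rows and placing 1.0 on the
   diagonal.  Each processor may list any global rows; the routine above
   routes them to their owners.  The row values are illustrative only.

     IS     is;
     int    rows[2] = {0,5}, ierr;
     Scalar one = 1.0;

     ierr = ISCreateGeneral(PETSC_COMM_SELF,2,rows,&is); CHKERRQ(ierr);
     ierr = MatZeroRows(A,is,&one); CHKERRQ(ierr);
     ierr = ISDestroy(is); CHKERRQ(ierr);
*/
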
#undef __FUNC__
#define __FUNC__ "MatMult_MPIDense"
int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  PetscFunctionBegin;
  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
  ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatMultAdd_MPIDense"
int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  PetscFunctionBegin;
  ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
  ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
  ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatMultTrans_MPIDense"
int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;
  Scalar       zero = 0.0;

  PetscFunctionBegin;
  ierr = VecSet(&zero,yy); CHKERRQ(ierr);
  ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatMultTransAdd_MPIDense"
int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  int          ierr;

  PetscFunctionBegin;
  ierr = VecCopy(yy,zz); CHKERRQ(ierr);
  ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
  ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

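/*
   A hedged usage sketch of the multiply routines: x must be distributed
   compatibly with the matrix columns and y with the matrix rows (m local
   rows and n local columns in the sense of MatCreateMPIDense below).
   Illustrative names only.

     Vec x,y;
     int ierr;

     ierr = VecCreateMPI(comm,n,N,&x); CHKERRQ(ierr);
     ierr = VecCreateMPI(comm,m,M,&y); CHKERRQ(ierr);
     ierr = MatMult(A,x,y); CHKERRQ(ierr);          y = A*x
     ierr = MatMultTrans(A,y,x); CHKERRQ(ierr);     x = A'*y
     ierr = VecDestroy(x); CHKERRQ(ierr);
     ierr = VecDestroy(y); CHKERRQ(ierr);
*/
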
#undef __FUNC__
#define __FUNC__ "MatGetDiagonal_MPIDense"
int MatGetDiagonal_MPIDense(Mat A,Vec v)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
  int          ierr, len, i, n, m = a->m, radd;
  Scalar       *x, zero = 0.0;

  PetscFunctionBegin;
  VecSet(&zero,v);
  ierr = VecGetArray(v,&x); CHKERRQ(ierr);
  ierr = VecGetSize(v,&n); CHKERRQ(ierr);
  if (n != a->M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Nonconforming mat and vec");
  len = PetscMin(aloc->m,aloc->n);
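  /* The local block is stored by columns: local entry (i,j) is aloc->v[i + j*m].
     The diagonal entry of global row rstart+i lies in column rstart+i, i.e. at
     aloc->v[i + (rstart+i)*m] = aloc->v[rstart*m + i*m + i]; radd below is that
     rstart*m offset. */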
  radd = a->rstart*m;
  for ( i=0; i<len; i++ ) {
    x[i] = aloc->v[radd + i*m + i];
  }
  ierr = VecRestoreArray(v,&x); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIDense"
int MatDestroy_MPIDense(Mat mat)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  PetscFunctionBegin;
  if (--mat->refct > 0) PetscFunctionReturn(0);

  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  if (mat->bmapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
  }
#if defined(USE_PETSC_LOG)
  PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",mdn->M,mdn->N);
#endif
  ierr = MatStashDestroy_Private(&mat->stash); CHKERRQ(ierr);
  PetscFree(mdn->rowners);
  ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
  if (mdn->lvec)   VecDestroy(mdn->lvec);
  if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
  if (mdn->factor) {
    if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
    if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
    if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
    PetscFree(mdn->factor);
  }
  PetscFree(mdn);
  if (mat->rmap) {
    ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
  }
  if (mat->cmap) {
    ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
  }
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatView_MPIDense_Binary"
static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr;

  PetscFunctionBegin;
  if (mdn->size == 1) {
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
  }
  else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatView_MPIDense_ASCII"
static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
  int          ierr, format, size = mdn->size, rank = mdn->rank;
  FILE         *fd;
  ViewerType   vtype;

  PetscFunctionBegin;
  ierr = ViewerGetType(viewer,&vtype);CHKERRQ(ierr);
  ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
  ierr = ViewerGetFormat(viewer,&format);CHKERRQ(ierr);
  if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
    MatInfo info;
    ierr = MatGetInfo(mat,MAT_LOCAL,&info);CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
         (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
      fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
    ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  } else if (format == VIEWER_FORMAT_ASCII_INFO) {
    PetscFunctionReturn(0);
  }

  if (size == 1) {
    ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
  } else {
    /* assemble the entire matrix onto the first processor */
    Mat          A;
    int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
    Scalar       *vals;
    Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;

    if (!rank) {
      ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
    } else {
      ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
    }
    PLogObjectParent(mat,A);

    /* Copy the matrix ... This isn't the most efficient means,
       but it's quick for now */
    row = mdn->rstart; m = Amdn->m;
    for ( i=0; i<m; i++ ) {
      ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
      ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
      ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
      row++;
    }

    ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
    ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
    if (!rank) {
      ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
    }
    ierr = MatDestroy(A); CHKERRQ(ierr);
  }
  PetscFunctionReturn(0);
}


#undef __FUNC__
#define __FUNC__ "MatView_MPIDense"
int MatView_MPIDense(Mat mat,Viewer viewer)
{
  int          ierr;
  ViewerType   vtype;

  PetscFunctionBegin;
  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
    ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
  } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
    ierr = MatView_MPIDense_Binary(mat,viewer);CHKERRQ(ierr);
  } else {
    SETERRQ(1,1,"Viewer type not supported by PETSc object");
  }
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetInfo_MPIDense"
int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  Mat          mdn = mat->A;
  int          ierr;
  double       isend[5], irecv[5];

  PetscFunctionBegin;
  info->rows_global    = (double)mat->M;
  info->columns_global = (double)mat->N;
  info->rows_local     = (double)mat->m;
  info->columns_local  = (double)mat->N;
  info->block_size     = 1.0;
  ierr = MatGetInfo(mdn,MAT_LOCAL,info); CHKERRQ(ierr);
  isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
  isend[3] = info->memory;  isend[4] = info->mallocs;
  if (flag == MAT_LOCAL) {
    info->nz_used      = isend[0];
    info->nz_allocated = isend[1];
    info->nz_unneeded  = isend[2];
    info->memory       = isend[3];
    info->mallocs      = isend[4];
  } else if (flag == MAT_GLOBAL_MAX) {
    ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  } else if (flag == MAT_GLOBAL_SUM) {
    ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  }
  info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
  info->fill_ratio_needed = 0;
  info->factor_mallocs    = 0;
  PetscFunctionReturn(0);
}

/* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
   extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
   extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
   extern int MatSolve_MPIDense(Mat,Vec,Vec);
   extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
   extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
   extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */

#undef __FUNC__
#define __FUNC__ "MatSetOption_MPIDense"
int MatSetOption_MPIDense(Mat A,MatOption op)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;

  PetscFunctionBegin;
  if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
      op == MAT_YES_NEW_NONZERO_LOCATIONS ||
      op == MAT_NEW_NONZERO_LOCATION_ERR ||
      op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
      op == MAT_COLUMNS_SORTED ||
      op == MAT_COLUMNS_UNSORTED) {
        MatSetOption(a->A,op);
  } else if (op == MAT_ROW_ORIENTED) {
        a->roworiented = 1;
        MatSetOption(a->A,op);
  } else if (op == MAT_ROWS_SORTED ||
             op == MAT_ROWS_UNSORTED ||
             op == MAT_SYMMETRIC ||
             op == MAT_STRUCTURALLY_SYMMETRIC ||
             op == MAT_YES_NEW_DIAGONALS ||
             op == MAT_USE_HASH_TABLE) {
    PLogInfo(A,"MatSetOption_MPIDense:Option ignored\n");
  } else if (op == MAT_COLUMN_ORIENTED) {
    a->roworiented = 0; MatSetOption(a->A,op);
  } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
    a->donotstash = 1;
  } else if (op == MAT_NO_NEW_DIAGONALS) {
    SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
  } else {
    SETERRQ(PETSC_ERR_SUP,0,"unknown option");
  }
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetSize_MPIDense"
int MatGetSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;

  PetscFunctionBegin;
  *m = mat->M; *n = mat->N;
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetLocalSize_MPIDense"
int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;

  PetscFunctionBegin;
  *m = mat->m; *n = mat->N;
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatGetOwnershipRange_MPIDense"
int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;

  PetscFunctionBegin;
  *m = mat->rstart; *n = mat->rend;
  PetscFunctionReturn(0);
}

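/*
   A hedged usage sketch: the standard ownership-range loop for filling
   only the locally owned rows.  Illustrative names only.

     int rstart,rend,i,ierr;

     ierr = MatGetOwnershipRange(A,&rstart,&rend); CHKERRQ(ierr);
     for ( i=rstart; i<rend; i++ ) {
       ... set values in global row i with MatSetValues() ...
     }
*/
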
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIDense"
int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
  int          lrow, rstart = mat->rstart, rend = mat->rend,ierr;

  PetscFunctionBegin;
  if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_SUP,0,"only local rows");
  lrow = row - rstart;
  ierr = MatGetRow(mat->A,lrow,nz,idx,v);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatRestoreRow_MPIDense"
int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
{
  PetscFunctionBegin;
  if (idx) PetscFree(*idx);
  if (v) PetscFree(*v);
  PetscFunctionReturn(0);
}

#undef __FUNC__
#define __FUNC__ "MatNorm_MPIDense"
int MatNorm_MPIDense(Mat A,NormType type,double *norm)
{
  Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
  Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
  int          ierr, i, j;
  double       sum = 0.0;
  Scalar       *v = mat->v;

  PetscFunctionBegin;
  if (mdn->size == 1) {
    ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      for (i=0; i<mat->n*mat->m; i++ ) {
#if defined(USE_PETSC_COMPLEX)
        sum += PetscReal(PetscConj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
      *norm = sqrt(*norm);
      PLogFlops(2*mat->n*mat->m);
    } else if (type == NORM_1) {
      double *tmp, *tmp2;
      tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = tmp + mdn->N;
      PetscMemzero(tmp,2*mdn->N*sizeof(double));
      *norm = 0.0;
      v = mat->v;
      for ( j=0; j<mat->n; j++ ) {
        for ( i=0; i<mat->m; i++ ) {
          tmp[j] += PetscAbsScalar(*v);  v++;
        }
      }
      ierr = MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
      for ( j=0; j<mdn->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp);
      PLogFlops(mat->n*mat->m);
    } else if (type == NORM_INFINITY) { /* max row norm */
      double ntemp;
      ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
      ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
    }
  }
  PetscFunctionReturn(0);
}

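/*
   A hedged usage sketch: the norms supported in parallel are NORM_1 (max
   column sum), NORM_FROBENIUS, and NORM_INFINITY (max row sum); the two
   norm is not supported.  Illustrative names only.

     double nrm;
     int    ierr;

     ierr = MatNorm(A,NORM_FROBENIUS,&nrm); CHKERRQ(ierr);
*/
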
#undef __FUNC__
#define __FUNC__ "MatTranspose_MPIDense"
int MatTranspose_MPIDense(Mat A,Mat *matout)
{
  Mat_MPIDense *a = (Mat_MPIDense *) A->data;
  Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
  Mat          B;
  int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
  int          j, i, ierr;
  Scalar       *v;

  PetscFunctionBegin;
  if (matout == PETSC_NULL && M != N) {
    SETERRQ(PETSC_ERR_SUP,0,"Supports square matrix only in-place");
  }
  ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);

  m = Aloc->m; n = Aloc->n; v = Aloc->v;
  rwork = (int *) PetscMalloc((m+1)*sizeof(int)); CHKPTRQ(rwork);
  for ( i=0; i<m; i++ ) rwork[i] = rstart + i; /* global rows of A; the same for every column */
  for ( j=0; j<n; j++ ) {
    ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
    v   += m;
  }
  PetscFree(rwork);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    PetscOps *Abops;
    MatOps   Aops;

    /* This isn't really an in-place transpose; it frees the data structures of a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);

    /*
         This is horrible, horrible code. We need to keep the
      A pointers for the bops and ops but copy everything
      else from B.
    */
    Abops = A->bops;
    Aops  = A->ops;
    PetscMemcpy(A,B,sizeof(struct _p_Mat));
    A->bops = Abops;
    A->ops  = Aops;

    PetscHeaderDestroy(B);
  }
  PetscFunctionReturn(0);
}

#include "pinclude/blaslapack.h"
#undef __FUNC__
#define __FUNC__ "MatScale_MPIDense"
int MatScale_MPIDense(Scalar *alpha,Mat inA)
{
  Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
  Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
  int          one = 1, nz;

  PetscFunctionBegin;
  nz = a->m*a->n;
  BLscal_( &nz, alpha, a->v, &one );
  PLogFlops(nz);
  PetscFunctionReturn(0);
}

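/*
   A hedged usage sketch of the Scalar-first calling sequence used above:

     Scalar alpha = 2.0;
     int    ierr;

     ierr = MatScale(&alpha,A); CHKERRQ(ierr);     A = 2.0 * A
*/
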
static int MatDuplicate_MPIDense(Mat,MatDuplicateOption,Mat *);
extern int MatGetSubMatrices_MPIDense(Mat,int,IS *,IS *,MatReuse,Mat **);

/* -------------------------------------------------------------------*/
static struct _MatOps MatOps_Values = {MatSetValues_MPIDense,
       MatGetRow_MPIDense,
       MatRestoreRow_MPIDense,
       MatMult_MPIDense,
       MatMultAdd_MPIDense,
       MatMultTrans_MPIDense,
       MatMultTransAdd_MPIDense,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       MatTranspose_MPIDense,
       MatGetInfo_MPIDense,0,
       MatGetDiagonal_MPIDense,
       0,
       MatNorm_MPIDense,
       MatAssemblyBegin_MPIDense,
       MatAssemblyEnd_MPIDense,
       0,
       MatSetOption_MPIDense,
       MatZeroEntries_MPIDense,
       MatZeroRows_MPIDense,
       0,
       0,
       0,
       0,
       MatGetSize_MPIDense,
       MatGetLocalSize_MPIDense,
       MatGetOwnershipRange_MPIDense,
       0,
       0,
       MatGetArray_MPIDense,
       MatRestoreArray_MPIDense,
       MatDuplicate_MPIDense,
       0,
       0,
       0,
       0,
       0,
       MatGetSubMatrices_MPIDense,
       0,
       MatGetValues_MPIDense,
       0,
       0,
       MatScale_MPIDense,
       0,
       0,
       0,
       MatGetBlockSize_MPIDense,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       MatGetMaps_Petsc};

#undef __FUNC__
#define __FUNC__ "MatCreateMPIDense"
/*@C
   MatCreateMPIDense - Creates a parallel matrix in dense format.

   Collective on MPI_Comm

   Input Parameters:
+  comm - MPI communicator
.  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
.  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
.  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
.  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
-  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
   to control all matrix memory allocation.

   Output Parameter:
.  A - the matrix

   Notes:
   The dense format is fully compatible with standard Fortran 77
   storage by columns.

   The data input variable is intended primarily for Fortran programmers
   who wish to allocate their own matrix memory space.  Most users should
   set data=PETSC_NULL.

   The user MUST specify either the local or global matrix dimensions
   (possibly both).

   Currently, the only parallel dense matrix decomposition is by rows,
   so that n=N and each submatrix owns all of the global columns.

   Level: intermediate

.keywords: matrix, dense, parallel

.seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
@*/
int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
{
  Mat          mat;
  Mat_MPIDense *a;
  int          ierr, i,flg;

  PetscFunctionBegin;
  /* Note:  For now, when data is specified above, this assumes the user correctly
   allocates the local dense storage space.  We should add error checking. */

  *A = 0;
  PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,"Mat",comm,MatDestroy,MatView);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
  mat->ops->destroy    = MatDestroy_MPIDense;
  mat->ops->view       = MatView_MPIDense;
  mat->factor          = 0;
  mat->mapping         = 0;

  a->factor       = 0;
  mat->insertmode = NOT_SET_VALUES;
  MPI_Comm_rank(comm,&a->rank);
  MPI_Comm_size(comm,&a->size);

  ierr = PetscSplitOwnership(comm,&m,&M);CHKERRQ(ierr);

  /*
     The computation of n below is wrong: n should represent the local size
     of a right-hand (column) vector, i.e. the vector the matrix is applied to.
  */

  /* each row stores all columns */
  if (N == PETSC_DECIDE) N = n;
  if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
  /*  if (n != N) SETERRQ(PETSC_ERR_SUP,0,"For now, only n=N is supported"); */
  a->N = mat->N = N;
  a->M = mat->M = M;
  a->m = mat->m = m;
  a->n = mat->n = n;

  /* the information in the maps duplicates the information computed below; eventually
     we should remove the duplicate information that is not contained in the maps */
  ierr = MapCreateMPI(comm,m,M,&mat->rmap);CHKERRQ(ierr);
  ierr = MapCreateMPI(comm,PETSC_DECIDE,N,&mat->cmap);CHKERRQ(ierr);

  /* build local table of row and column ownerships */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  a->cowners = a->rowners + a->size + 1;
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
  ierr = MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  a->rowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->rowners[i] += a->rowners[i-1];
  }
  a->rstart = a->rowners[a->rank];
  a->rend   = a->rowners[a->rank+1];
  ierr      = MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  a->cowners[0] = 0;
  for ( i=2; i<=a->size; i++ ) {
    a->cowners[i] += a->cowners[i-1];
  }

  ierr = MatCreateSeqDense(PETSC_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);

  /* build cache for off-processor entries */
  a->donotstash = 0;
  ierr = MatStashCreate_Private(comm,1,&mat->stash); CHKERRQ(ierr);

  /* stuff used for matrix-vector multiply */
  a->lvec        = 0;
  a->Mvctx       = 0;
  a->roworiented = 1;

  *A = mat;
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  PetscFunctionReturn(0);
}

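/*
   A hedged usage sketch for MatCreateMPIDense(): create a dense parallel
   matrix with a PETSc-chosen row distribution, fill the locally owned rows,
   assemble, and destroy.  The sizes and values are illustrative only.

     Mat    A;
     int    i,j,rstart,rend,M = 10,N = 10,ierr;
     Scalar v;

     ierr = MatCreateMPIDense(PETSC_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
                              M,N,PETSC_NULL,&A); CHKERRQ(ierr);
     ierr = MatGetOwnershipRange(A,&rstart,&rend); CHKERRQ(ierr);
     for ( i=rstart; i<rend; i++ ) {
       for ( j=0; j<N; j++ ) {
         v = 1.0/(i+j+1);
         ierr = MatSetValues(A,1,&i,1,&j,&v,INSERT_VALUES); CHKERRQ(ierr);
       }
     }
     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
     ierr = MatDestroy(A); CHKERRQ(ierr);
*/
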
#undef __FUNC__
#define __FUNC__ "MatDuplicate_MPIDense"
static int MatDuplicate_MPIDense(Mat A,MatDuplicateOption cpvalues,Mat *newmat)
{
  Mat          mat;
  Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
  int          ierr;
  FactorCtx    *factor;

  PetscFunctionBegin;
  *newmat       = 0;
  PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,"Mat",A->comm,MatDestroy,MatView);
  PLogObjectCreate(mat);
  mat->data      = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
  PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
  mat->ops->destroy   = MatDestroy_MPIDense;
  mat->ops->view      = MatView_MPIDense;
  mat->factor         = A->factor;
  mat->assembled      = PETSC_TRUE;

  a->m = mat->m = oldmat->m;
  a->n = mat->n = oldmat->n;
  a->M = mat->M = oldmat->M;
  a->N = mat->N = oldmat->N;
  if (oldmat->factor) {
    a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
    /* copy factor contents ... add this code! */
  } else a->factor = 0;

  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  mat->insertmode = NOT_SET_VALUES;
  a->donotstash   = oldmat->donotstash;
  a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
  PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
  ierr = MatStashCreate_Private(A->comm,1,&mat->stash); CHKERRQ(ierr);

  ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  *newmat = mat;
  PetscFunctionReturn(0);
}

#include "sys.h"

#undef __FUNC__
#define __FUNC__ "MatLoad_MPIDense_DenseInFile"
int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
{
  int        *rowners, i,size,rank,m,ierr,nz,j;
  Scalar     *array,*vals,*vals_ptr;
  MPI_Status status;

  PetscFunctionBegin;
  MPI_Comm_rank(comm,&rank);
  MPI_Comm_size(comm,&size);

  /* determine ownership of all rows */
  m          = M/size + ((M % size) > rank);
  rowners    = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }

  ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
  ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);

  if (!rank) {
    vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    ierr = PetscBinaryRead(fd,vals,m*N,PETSC_SCALAR); CHKERRQ(ierr);

    /* insert into the matrix by row; the file is stored by rows while the
       array is stored by columns, which is why we cannot read directly
       into the array */
    vals_ptr = vals;
    for ( i=0; i<m; i++ ) {
      for ( j=0; j<N; j++ ) {
        array[i + j*m] = *vals_ptr++;
      }
    }

    /* read in the other processors' parts and ship them out */
    for ( i=1; i<size; i++ ) {
      nz   = (rowners[i+1] - rowners[i])*N;
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);CHKERRQ(ierr);
    }
  } else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values */
    ierr = MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);CHKERRQ(ierr);

    /* insert into the matrix by row; see the note above on the storage order */
    vals_ptr = vals;
    for ( i=0; i<m; i++ ) {
      for ( j=0; j<N; j++ ) {
        array[i + j*m] = *vals_ptr++;
      }
    }
  }
  ierr = MatRestoreArray(*newmat,&array); CHKERRQ(ierr);
  PetscFree(rowners);
  PetscFree(vals);
  ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

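/*
   A hedged usage sketch for loading a matrix from a binary file.  This
   assumes the binary-viewer interface of this PETSc generation;
   ViewerFileOpenBinary and BINARY_RDONLY are recalled from that API and
   should be checked against the local headers.

     Viewer viewer;
     Mat    A;
     int    ierr;

     ierr = ViewerFileOpenBinary(PETSC_COMM_WORLD,"matrix.dat",BINARY_RDONLY,
                                 &viewer); CHKERRQ(ierr);
     ierr = MatLoad(viewer,MATMPIDENSE,&A); CHKERRQ(ierr);
     ierr = ViewerDestroy(viewer); CHKERRQ(ierr);
*/
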

#undef __FUNC__
#define __FUNC__ "MatLoad_MPIDense"
int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  Scalar       *vals,*svals;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)viewer)->tag;
  int          i, nz, ierr, j,rstart, rend, fd;

  PetscFunctionBegin;
  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not a matrix object");
  }

  ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
  M = header[1]; N = header[2]; nz = header[3];

  /*
       Handle the case where the matrix is stored on disk as a dense matrix
  */
  if (nz == MATRIX_BINARY_FORMAT_DENSE) {
    ierr = MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  /* determine ownership of all rows */
  m          = M/size + ((M % size) > rank);
  rowners    = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
    PetscFree(sndcounts);
  } else {
    ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);

    /* read in everyone else's part and ship it off */
    for ( i=1; i<size; i++ ) {
      nz   = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices */
    ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
    ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with the file");
  }

  /* loop over local rows, determining the number of off-diagonal entries */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
  A = *newmat;
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in the other processors' parts and ship them out */
    for ( i=1; i<size; i++ ) {
      nz   = procsnz[i];
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values */
    ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
    ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with the file");

    /* insert into matrix */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}