/* xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 7eba5e9c78dc93e183455acc9686dd36aa593f6c) */
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpidense.c,v 1.119 1999/07/26 02:52:49 curfman Exp curfman $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "src/mat/impls/dense/mpi/mpidense.h"
10 #include "src/vec/vecimpl.h"
11 
12 extern int MatSetUpMultiply_MPIDense(Mat);
13 
14 #undef __FUNC__
15 #define __FUNC__ "MatSetValues_MPIDense"
16 int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
17 {
18   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
19   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
20   int          roworiented = A->roworiented;
21 
22   PetscFunctionBegin;
23   for ( i=0; i<m; i++ ) {
24     if (idxm[i] < 0) continue;
25     if (idxm[i] >= A->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
26     if (idxm[i] >= rstart && idxm[i] < rend) {
27       row = idxm[i] - rstart;
28       if (roworiented) {
29         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv);CHKERRQ(ierr);
30       } else {
31         for ( j=0; j<n; j++ ) {
32           if (idxn[j] < 0) continue;
33           if (idxn[j] >= A->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
34           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
35         }
36       }
37     } else {
38       if ( !A->donotstash) {
39         if (roworiented) {
40           ierr = MatStashValuesRow_Private(&mat->stash,idxm[i],n,idxn,v+i*n);CHKERRQ(ierr);
41         } else {
42           ierr = MatStashValuesCol_Private(&mat->stash,idxm[i],n,idxn,v+i,m);CHKERRQ(ierr);
43         }
44       }
45     }
46   }
47   PetscFunctionReturn(0);
48 }
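/*
   Illustrative note (hypothetical values, not part of the library): for a call

       MatSetValues(mat,2,rows,3,cols,v,INSERT_VALUES);

   the 2x3 block of values is read from v as

       row-oriented    (the default)        :  a(i,j) = v[i*3 + j]
       column-oriented (MAT_COLUMN_ORIENTED):  a(i,j) = v[i + j*2]

   which is exactly the v+i*n / v+i+j*m indexing used in MatSetValues_MPIDense above.
*/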
49 
50 #undef __FUNC__
51 #define __FUNC__ "MatGetValues_MPIDense"
52 int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
53 {
54   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
55   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
56 
57   PetscFunctionBegin;
58   for ( i=0; i<m; i++ ) {
59     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
60     if (idxm[i] >= mdn->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
61     if (idxm[i] >= rstart && idxm[i] < rend) {
62       row = idxm[i] - rstart;
63       for ( j=0; j<n; j++ ) {
64         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
65         if (idxn[j] >= mdn->N) {
66           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
67         }
68         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j);CHKERRQ(ierr);
69       }
70     } else {
71       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
72     }
73   }
74   PetscFunctionReturn(0);
75 }
76 
77 #undef __FUNC__
78 #define __FUNC__ "MatGetArray_MPIDense"
79 int MatGetArray_MPIDense(Mat A,Scalar **array)
80 {
81   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
82   int          ierr;
83 
84   PetscFunctionBegin;
85   ierr = MatGetArray(a->A,array);CHKERRQ(ierr);
86   PetscFunctionReturn(0);
87 }
88 
89 #undef __FUNC__
90 #define __FUNC__ "MatGetSubMatrix_MPIDense"
91 static int MatGetSubMatrix_MPIDense(Mat A,IS isrow,IS iscol,int cs,MatReuse scall,Mat *B)
92 {
93   Mat_MPIDense *mat = (Mat_MPIDense *) A->data, *newmatd;
94   Mat_SeqDense *lmat = (Mat_SeqDense *) mat->A->data;
95   int          i, j, ierr, *irow, *icol, n_lrows, n_lcols;
96   int          rstart, rend, nrows, ncols, nlrows, nlcols, rank, ncols_all;
97   Scalar       *av, *bv, *v = lmat->v;
98   Mat          newmat;
99 
100   PetscFunctionBegin;
101   ierr = MPI_Comm_rank(A->comm,&rank);CHKERRQ(ierr);
102   ierr = ISGetIndices(isrow,&irow);CHKERRQ(ierr);
103   ierr = ISGetIndices(iscol,&icol);CHKERRQ(ierr);
104   ierr = ISGetSize(isrow,&nrows);CHKERRQ(ierr);
105   ierr = ISGetSize(iscol,&ncols);CHKERRQ(ierr);
106 
107   /* No parallel redistribution currently supported! Should really check each index set
108      to confirm that it is OK.  ... Currently supports only submatrices with the same
109      partitioning as the original matrix! */
110 
111   ierr = MatGetLocalSize(A,&nlrows,&nlcols);CHKERRQ(ierr);
112   ierr = MatGetOwnershipRange(A,&rstart,&rend);CHKERRQ(ierr);
113 
114   /* Check submatrix call */
115   if (scall == MAT_REUSE_MATRIX) {
116     /* SETERRQ(PETSC_ERR_ARG_SIZ,0,"Reused submatrix wrong size"); */
117     /* Really need to test rows and column sizes! */
118     newmat = *B;
119   } else {
120     /* Create and fill new matrix */
121     ierr = MatCreateMPIDense(A->comm,nrows,ncols,PETSC_DECIDE,PETSC_DECIDE,PETSC_NULL,&newmat);CHKERRQ(ierr);
122   }
123 
124   /* Now extract the data pointers and do the copy, column at a time */
125   newmatd = (Mat_MPIDense *) newmat->data;
126   bv = ((Mat_SeqDense *)newmatd->A->data)->v;
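  /* both local blocks are stored column-major; for each requested column, copy the
     requested (locally owned) rows of the original local array into the new block,
     one column at a time */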
127 
128   for ( i=0; i<ncols; i++ ) {
129     av = v + nlrows*icol[i];
130     for (j=0; j<nrows; j++ ) {
131       *bv++ = av[irow[j] - rstart];
132     }
133   }
134 
135   /* Assemble the matrices so that the correct flags are set */
136   ierr = MatAssemblyBegin(newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
137   ierr = MatAssemblyEnd(newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
138 
139   /* Free work space */
140   ierr = ISRestoreIndices(isrow,&irow);CHKERRQ(ierr);
141   ierr = ISRestoreIndices(iscol,&icol);CHKERRQ(ierr);
142   *B = newmat;
143   PetscFunctionReturn(0);
144 }
145 
146 #undef __FUNC__
147 #define __FUNC__ "MatRestoreArray_MPIDense"
148 int MatRestoreArray_MPIDense(Mat A,Scalar **array)
149 {
150   PetscFunctionBegin;
151   PetscFunctionReturn(0);
152 }
153 
154 #undef __FUNC__
155 #define __FUNC__ "MatAssemblyBegin_MPIDense"
156 int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
157 {
158   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
159   MPI_Comm     comm = mat->comm;
160   int          ierr,nstash,reallocs;
161   InsertMode   addv;
162 
163   PetscFunctionBegin;
164   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES: the bitwise
         OR of the insert modes across processes equals ADD_VALUES|INSERT_VALUES only if
         both modes were used */
165   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
166   if (addv == (ADD_VALUES|INSERT_VALUES)) {
167     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Cannot mix adds/inserts on different procs");
168   }
169   mat->insertmode = addv; /* in case this processor had no cache */
170 
171   ierr = MatStashScatterBegin_Private(&mat->stash,mdn->rowners);CHKERRQ(ierr);
172   ierr = MatStashGetInfo_Private(&mat->stash,&nstash,&reallocs);CHKERRQ(ierr);
173   PLogInfo(mdn->A,"MatAssemblyBegin_MPIDense:Stash has %d entries, uses %d mallocs.\n",
174            nstash,reallocs);
175   PetscFunctionReturn(0);
176 }
177 
178 #undef __FUNC__
179 #define __FUNC__ "MatAssemblyEnd_MPIDense"
180 int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
181 {
182   Mat_MPIDense *mdn=(Mat_MPIDense*)mat->data;
183   int          i,n,ierr,*row,*col,flg,j,rstart,ncols;
184   Scalar       *val;
185   InsertMode   addv=mat->insertmode;
186 
187   PetscFunctionBegin;
188   /*  wait on receives */
189   while (1) {
190     ierr = MatStashScatterGetMesg_Private(&mat->stash,&n,&row,&col,&val,&flg);CHKERRQ(ierr);
191     if (!flg) break;
192 
193     for ( i=0; i<n; ) {
194       /* Now identify the consecutive vals belonging to the same row */
195       for ( j=i,rstart=row[j]; j<n; j++ ) { if (row[j] != rstart) break; }
196       if (j < n) ncols = j-i;
197       else       ncols = n-i;
198       /* Now assemble all these values with a single function call */
199       ierr = MatSetValues_MPIDense(mat,1,row+i,ncols,col+i,val+i,addv);CHKERRQ(ierr);
200       i = j;
201     }
202   }
203   ierr = MatStashScatterEnd_Private(&mat->stash);CHKERRQ(ierr);
204 
205   ierr = MatAssemblyBegin(mdn->A,mode);CHKERRQ(ierr);
206   ierr = MatAssemblyEnd(mdn->A,mode);CHKERRQ(ierr);
207 
208   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
209     ierr = MatSetUpMultiply_MPIDense(mat);CHKERRQ(ierr);
210   }
211   PetscFunctionReturn(0);
212 }
213 
214 #undef __FUNC__
215 #define __FUNC__ "MatZeroEntries_MPIDense"
216 int MatZeroEntries_MPIDense(Mat A)
217 {
218   int          ierr;
219   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
220 
221   PetscFunctionBegin;
222   ierr = MatZeroEntries(l->A);CHKERRQ(ierr);
223   PetscFunctionReturn(0);
224 }
225 
226 #undef __FUNC__
227 #define __FUNC__ "MatGetBlockSize_MPIDense"
228 int MatGetBlockSize_MPIDense(Mat A,int *bs)
229 {
230   PetscFunctionBegin;
231   *bs = 1;
232   PetscFunctionReturn(0);
233 }
234 
235 /* the code does not do the diagonal entries correctly unless the
236    matrix is square and the column and row ownerships are identical.
237    This is a BUG. The only way to fix it seems to be to access
238    mdn->A and mdn->B directly and not through the MatZeroRows()
239    routine.
240 */
241 #undef __FUNC__
242 #define __FUNC__ "MatZeroRows_MPIDense"
243 int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
244 {
245   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
246   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
247   int            *procs,*nprocs,j,found,idx,nsends,*work;
248   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
249   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
250   int            *lens,imdex,*lrows,*values;
251   MPI_Comm       comm = A->comm;
252   MPI_Request    *send_waits,*recv_waits;
253   MPI_Status     recv_status,*send_status;
254   IS             istmp;
255 
256   PetscFunctionBegin;
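  /* Overall flow: determine which process owns each global row listed in is, ship the
     row indices to their owners with nonblocking sends/receives, and finally zero the
     locally owned rows through MatZeroRows() on the local block */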
257   ierr = ISGetSize(is,&N);CHKERRQ(ierr);
258   ierr = ISGetIndices(is,&rows);CHKERRQ(ierr);
259 
260   /*  first count number of contributors to each processor */
261   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) );CHKPTRQ(nprocs);
262   ierr   = PetscMemzero(nprocs,2*size*sizeof(int));CHKERRQ(ierr);
263   procs  = nprocs + size;
264   owner  = (int *) PetscMalloc((N+1)*sizeof(int));CHKPTRQ(owner); /* see note*/
265   for ( i=0; i<N; i++ ) {
266     idx = rows[i];
267     found = 0;
268     for ( j=0; j<size; j++ ) {
269       if (idx >= owners[j] && idx < owners[j+1]) {
270         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
271       }
272     }
273     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
274   }
275   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
276 
277   /* inform other processors of number of messages and max length*/
278   work   = (int *) PetscMalloc( size*sizeof(int) );CHKPTRQ(work);
279   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
280   nrecvs = work[rank];
281   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
282   nmax   = work[rank];
283   ierr = PetscFree(work);CHKERRQ(ierr);
284 
285   /* post receives:   */
286   rvalues    = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
287   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
288   for ( i=0; i<nrecvs; i++ ) {
289     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
290   }
291 
292   /* do sends:
293       1) starts[i] gives the starting index in svalues for stuff going to
294          the ith processor
295   */
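  /*
     Illustrative example (hypothetical counts): with size = 3 and nprocs = {2,0,1} the
     prefix sum gives starts = {0,2,2}; the two rows destined for process 0 are packed
     into svalues[0..1] and the one for process 2 into svalues[2].  Since the packing
     loop advances starts[], it is rebuilt below before the sends are posted.
  */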
296   svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) );CHKPTRQ(svalues);
297   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
298   starts     = (int *) PetscMalloc( (size+1)*sizeof(int) );CHKPTRQ(starts);
299   starts[0]  = 0;
300   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
301   for ( i=0; i<N; i++ ) {
302     svalues[starts[owner[i]]++] = rows[i];
303   }
304   ierr = ISRestoreIndices(is,&rows);CHKERRQ(ierr);
305 
306   starts[0] = 0;
307   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
308   count = 0;
309   for ( i=0; i<size; i++ ) {
310     if (procs[i]) {
311       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
312     }
313   }
314   ierr = PetscFree(starts);CHKERRQ(ierr);
315 
316   base = owners[rank];
317 
318   /*  wait on receives */
319   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) );CHKPTRQ(lens);
320   source = lens + nrecvs;
321   count  = nrecvs; slen = 0;
322   while (count) {
323     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
324     /* unpack receives into our local space */
325     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
326     source[imdex]  = recv_status.MPI_SOURCE;
327     lens[imdex]  = n;
328     slen += n;
329     count--;
330   }
331   ierr = PetscFree(recv_waits);CHKERRQ(ierr);
332 
333   /* move the data into the send scatter */
334   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) );CHKPTRQ(lrows);
335   count = 0;
336   for ( i=0; i<nrecvs; i++ ) {
337     values = rvalues + i*nmax;
338     for ( j=0; j<lens[i]; j++ ) {
339       lrows[count++] = values[j] - base;
340     }
341   }
342   ierr = PetscFree(rvalues);CHKERRQ(ierr);
343   ierr = PetscFree(lens);CHKERRQ(ierr);
344   ierr = PetscFree(owner);CHKERRQ(ierr);
345   ierr = PetscFree(nprocs);CHKERRQ(ierr);
346 
347   /* actually zap the local rows */
348   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
349   PLogObjectParent(A,istmp);
350   ierr = PetscFree(lrows);CHKERRQ(ierr);
351   ierr = MatZeroRows(l->A,istmp,diag);CHKERRQ(ierr);
352   ierr = ISDestroy(istmp);CHKERRQ(ierr);
353 
354   /* wait on sends */
355   if (nsends) {
356     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
357     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
358     ierr = PetscFree(send_status);CHKERRQ(ierr);
359   }
360   ierr = PetscFree(send_waits);CHKERRQ(ierr);
361   ierr = PetscFree(svalues);CHKERRQ(ierr);
362 
363   PetscFunctionReturn(0);
364 }
365 
366 #undef __FUNC__
367 #define __FUNC__ "MatMult_MPIDense"
368 int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
369 {
370   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
371   int          ierr;
372 
373   PetscFunctionBegin;
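  /* gather the entire input vector into the local work vector lvec (each process stores
     all N columns of its rows, so it needs every entry of xx), then apply the local
     dense block */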
374   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
375   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
376   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy);CHKERRQ(ierr);
377   PetscFunctionReturn(0);
378 }
379 
380 #undef __FUNC__
381 #define __FUNC__ "MatMultAdd_MPIDense"
382 int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
383 {
384   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
385   int          ierr;
386 
387   PetscFunctionBegin;
388   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
389   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
390   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz);CHKERRQ(ierr);
391   PetscFunctionReturn(0);
392 }
393 
394 #undef __FUNC__
395 #define __FUNC__ "MatMultTrans_MPIDense"
396 int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
397 {
398   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
399   int          ierr;
400   Scalar       zero = 0.0;
401 
402   PetscFunctionBegin;
403   ierr = VecSet(&zero,yy);CHKERRQ(ierr);
404   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec);CHKERRQ(ierr);
405   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
406   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
407   PetscFunctionReturn(0);
408 }
409 
410 #undef __FUNC__
411 #define __FUNC__ "MatMultTransAdd_MPIDense"
412 int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
413 {
414   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
415   int          ierr;
416 
417   PetscFunctionBegin;
418   ierr = VecCopy(yy,zz);CHKERRQ(ierr);
419   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec);CHKERRQ(ierr);
420   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
421   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
422   PetscFunctionReturn(0);
423 }
424 
425 #undef __FUNC__
426 #define __FUNC__ "MatGetDiagonal_MPIDense"
427 int MatGetDiagonal_MPIDense(Mat A,Vec v)
428 {
429   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
430   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
431   int          ierr, len, i, n, m = a->m, radd;
432   Scalar       *x, zero = 0.0;
433 
434   PetscFunctionBegin;
435   ierr = VecSet(&zero,v);CHKERRQ(ierr);
436   ierr = VecGetArray(v,&x);CHKERRQ(ierr);
437   ierr = VecGetSize(v,&n);CHKERRQ(ierr);
438   if (n != a->M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Nonconforming mat and vec");
439   len = PetscMin(aloc->m,aloc->n);
440   radd = a->rstart*m;
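  /* the local block is stored column-major with leading dimension m, so the diagonal
     entry of global row rstart+i sits at local column rstart+i, i.e. at
     v[i + (rstart+i)*m] = v[radd + i*m + i] */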
441   for ( i=0; i<len; i++ ) {
442     x[i] = aloc->v[radd + i*m + i];
443   }
444   ierr = VecRestoreArray(v,&x);CHKERRQ(ierr);
445   PetscFunctionReturn(0);
446 }
447 
448 #undef __FUNC__
449 #define __FUNC__ "MatDestroy_MPIDense"
450 int MatDestroy_MPIDense(Mat mat)
451 {
452   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
453   int          ierr;
454 
455   PetscFunctionBegin;
456 
457   if (mat->mapping) {
458     ierr = ISLocalToGlobalMappingDestroy(mat->mapping);CHKERRQ(ierr);
459   }
460   if (mat->bmapping) {
461     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping);CHKERRQ(ierr);
462   }
463 #if defined(PETSC_USE_LOG)
464   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",mdn->M,mdn->N);
465 #endif
466   ierr = MatStashDestroy_Private(&mat->stash);CHKERRQ(ierr);
467   ierr = PetscFree(mdn->rowners);CHKERRQ(ierr);
468   ierr = MatDestroy(mdn->A);CHKERRQ(ierr);
469   if (mdn->lvec)   VecDestroy(mdn->lvec);
470   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
471   if (mdn->factor) {
472     if (mdn->factor->temp)   {ierr = PetscFree(mdn->factor->temp);CHKERRQ(ierr);}
473     if (mdn->factor->tag)    {ierr = PetscFree(mdn->factor->tag);CHKERRQ(ierr);}
474     if (mdn->factor->pivots) {ierr = PetscFree(mdn->factor->pivots);CHKERRQ(ierr);}
475     ierr = PetscFree(mdn->factor);CHKERRQ(ierr);
476   }
477   ierr = PetscFree(mdn);CHKERRQ(ierr);
478   if (mat->rmap) {
479     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
480   }
481   if (mat->cmap) {
482     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
483   }
484   PLogObjectDestroy(mat);
485   PetscHeaderDestroy(mat);
486   PetscFunctionReturn(0);
487 }
488 
489 #undef __FUNC__
490 #define __FUNC__ "MatView_MPIDense_Binary"
491 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
492 {
493   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
494   int          ierr;
495 
496   PetscFunctionBegin;
497   if (mdn->size == 1) {
498     ierr = MatView(mdn->A,viewer);CHKERRQ(ierr);
499   }
500   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
501   PetscFunctionReturn(0);
502 }
503 
504 #undef __FUNC__
505 #define __FUNC__ "MatView_MPIDense_ASCII"
506 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
507 {
508   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
509   int          ierr, format, size = mdn->size, rank = mdn->rank;
510   FILE         *fd;
511   ViewerType   vtype;
512 
513   PetscFunctionBegin;
514   ierr = ViewerGetType(viewer,&vtype);CHKERRQ(ierr);
515   ierr = ViewerASCIIGetPointer(viewer,&fd);CHKERRQ(ierr);
516   ierr = ViewerGetFormat(viewer,&format);CHKERRQ(ierr);
517   if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
518     MatInfo info;
519     ierr = MatGetInfo(mat,MAT_LOCAL,&info);CHKERRQ(ierr);
520     PetscSequentialPhaseBegin(mat->comm,1);
521       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
522          (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
523       fflush(fd);
524     PetscSequentialPhaseEnd(mat->comm,1);
525     ierr = VecScatterView(mdn->Mvctx,viewer);CHKERRQ(ierr);
526     PetscFunctionReturn(0);
527   } else if (format == VIEWER_FORMAT_ASCII_INFO) {
528     PetscFunctionReturn(0);
529   }
530 
531   if (size == 1) {
532     ierr = MatView(mdn->A,viewer);CHKERRQ(ierr);
533   } else {
534     /* assemble the entire matrix onto first processor. */
535     Mat          A;
536     int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
537     Scalar       *vals;
538     Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
539 
540     if (!rank) {
541       ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A);CHKERRQ(ierr);
542     } else {
543       ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A);CHKERRQ(ierr);
544     }
545     PLogObjectParent(mat,A);
546 
547     /* Copy the matrix ... This isn't the most efficient means,
548        but it's quick for now */
549     row = mdn->rstart; m = Amdn->m;
550     for ( i=0; i<m; i++ ) {
551       ierr = MatGetRow(mat,row,&nz,&cols,&vals);CHKERRQ(ierr);
552       ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES);CHKERRQ(ierr);
553       ierr = MatRestoreRow(mat,row,&nz,&cols,&vals);CHKERRQ(ierr);
554       row++;
555     }
556 
557     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
558     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
559     if (!rank) {
560       ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer);CHKERRQ(ierr);
561     }
562     ierr = MatDestroy(A);CHKERRQ(ierr);
563   }
564   PetscFunctionReturn(0);
565 }
566 
567 #undef __FUNC__
568 #define __FUNC__ "MatView_MPIDense"
569 int MatView_MPIDense(Mat mat,Viewer viewer)
570 {
571   int          ierr;
572   ViewerType   vtype;
573 
574   ierr = ViewerGetType(viewer,&vtype);CHKERRQ(ierr);
575   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
576     ierr = MatView_MPIDense_ASCII(mat,viewer);CHKERRQ(ierr);
577   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
578     ierr = MatView_MPIDense_Binary(mat,viewer);CHKERRQ(ierr);
579   } else {
580     SETERRQ(1,1,"Viewer type not supported by PETSc object");
581   }
582   PetscFunctionReturn(0);
583 }
584 
585 #undef __FUNC__
586 #define __FUNC__ "MatGetInfo_MPIDense"
587 int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
588 {
589   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
590   Mat          mdn = mat->A;
591   int          ierr;
592   double       isend[5], irecv[5];
593 
594   PetscFunctionBegin;
595   info->rows_global    = (double)mat->M;
596   info->columns_global = (double)mat->N;
597   info->rows_local     = (double)mat->m;
598   info->columns_local  = (double)mat->N;
599   info->block_size     = 1.0;
600   ierr = MatGetInfo(mdn,MAT_LOCAL,info);CHKERRQ(ierr);
601   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
602   isend[3] = info->memory;  isend[4] = info->mallocs;
603   if (flag == MAT_LOCAL) {
604     info->nz_used      = isend[0];
605     info->nz_allocated = isend[1];
606     info->nz_unneeded  = isend[2];
607     info->memory       = isend[3];
608     info->mallocs      = isend[4];
609   } else if (flag == MAT_GLOBAL_MAX) {
610     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
611     info->nz_used      = irecv[0];
612     info->nz_allocated = irecv[1];
613     info->nz_unneeded  = irecv[2];
614     info->memory       = irecv[3];
615     info->mallocs      = irecv[4];
616   } else if (flag == MAT_GLOBAL_SUM) {
617     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
618     info->nz_used      = irecv[0];
619     info->nz_allocated = irecv[1];
620     info->nz_unneeded  = irecv[2];
621     info->memory       = irecv[3];
622     info->mallocs      = irecv[4];
623   }
624   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
625   info->fill_ratio_needed = 0;
626   info->factor_mallocs    = 0;
627   PetscFunctionReturn(0);
628 }
629 
630 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
631    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
632    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
633    extern int MatSolve_MPIDense(Mat,Vec,Vec);
634    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
635    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
636    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
637 
638 #undef __FUNC__
639 #define __FUNC__ "MatSetOption_MPIDense"
640 int MatSetOption_MPIDense(Mat A,MatOption op)
641 {
642   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
643 
644   PetscFunctionBegin;
645   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
646       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
647       op == MAT_NEW_NONZERO_LOCATION_ERR ||
648       op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
649       op == MAT_COLUMNS_SORTED ||
650       op == MAT_COLUMNS_UNSORTED) {
651         MatSetOption(a->A,op);
652   } else if (op == MAT_ROW_ORIENTED) {
653         a->roworiented = 1;
654         MatSetOption(a->A,op);
655   } else if (op == MAT_ROWS_SORTED ||
656              op == MAT_ROWS_UNSORTED ||
657              op == MAT_SYMMETRIC ||
658              op == MAT_STRUCTURALLY_SYMMETRIC ||
659              op == MAT_YES_NEW_DIAGONALS ||
660              op == MAT_USE_HASH_TABLE) {
661     PLogInfo(A,"MatSetOption_MPIDense:Option ignored\n");
662   } else if (op == MAT_COLUMN_ORIENTED) {
663     a->roworiented = 0; MatSetOption(a->A,op);
664   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
665     a->donotstash = 1;
666   } else if (op == MAT_NO_NEW_DIAGONALS) {
667     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
668   } else {
669     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
670   }
671   PetscFunctionReturn(0);
672 }
673 
674 #undef __FUNC__
675 #define __FUNC__ "MatGetSize_MPIDense"
676 int MatGetSize_MPIDense(Mat A,int *m,int *n)
677 {
678   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
679 
680   PetscFunctionBegin;
681   *m = mat->M; *n = mat->N;
682   PetscFunctionReturn(0);
683 }
684 
685 #undef __FUNC__
686 #define __FUNC__ "MatGetLocalSize_MPIDense"
687 int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
688 {
689   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
690 
691   PetscFunctionBegin;
692   *m = mat->m; *n = mat->N;
693   PetscFunctionReturn(0);
694 }
695 
696 #undef __FUNC__
697 #define __FUNC__ "MatGetOwnershipRange_MPIDense"
698 int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
699 {
700   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
701 
702   PetscFunctionBegin;
703   *m = mat->rstart; *n = mat->rend;
704   PetscFunctionReturn(0);
705 }
706 
707 #undef __FUNC__
708 #define __FUNC__ "MatGetRow_MPIDense"
709 int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
710 {
711   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
712   int          lrow, rstart = mat->rstart, rend = mat->rend,ierr;
713 
714   PetscFunctionBegin;
715   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_SUP,0,"only local rows");
716   lrow = row - rstart;
717   ierr = MatGetRow(mat->A,lrow,nz,idx,v);CHKERRQ(ierr);
718   PetscFunctionReturn(0);
719 }
720 
721 #undef __FUNC__
722 #define __FUNC__ "MatRestoreRow_MPIDense"
723 int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
724 {
725   int ierr;
726 
727   PetscFunctionBegin;
728   if (idx) {ierr = PetscFree(*idx);CHKERRQ(ierr);}
729   if (v) {ierr = PetscFree(*v);CHKERRQ(ierr);}
730   PetscFunctionReturn(0);
731 }
732 
733 #undef __FUNC__
734 #define __FUNC__ "MatNorm_MPIDense"
735 int MatNorm_MPIDense(Mat A,NormType type,double *norm)
736 {
737   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
738   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
739   int          ierr, i, j;
740   double       sum = 0.0;
741   Scalar       *v = mat->v;
742 
743   PetscFunctionBegin;
744   if (mdn->size == 1) {
745     ierr =  MatNorm(mdn->A,type,norm);CHKERRQ(ierr);
746   } else {
747     if (type == NORM_FROBENIUS) {
748       for (i=0; i<mat->n*mat->m; i++ ) {
749 #if defined(PETSC_USE_COMPLEX)
750         sum += PetscReal(PetscConj(*v)*(*v)); v++;
751 #else
752         sum += (*v)*(*v); v++;
753 #endif
754       }
755       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
756       *norm = sqrt(*norm);
757       PLogFlops(2*mat->n*mat->m);
758     } else if (type == NORM_1) {
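      /* 1-norm = max over columns of the sum of |a_ij|; each process forms its local
         per-column sums, an Allreduce adds them across processes, and the maximum of
         the N column totals is taken */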
759       double *tmp, *tmp2;
760       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) );CHKPTRQ(tmp);
761       tmp2 = tmp + mdn->N;
762       ierr = PetscMemzero(tmp,2*mdn->N*sizeof(double));CHKERRQ(ierr);
763       *norm = 0.0;
764       v = mat->v;
765       for ( j=0; j<mat->n; j++ ) {
766         for ( i=0; i<mat->m; i++ ) {
767           tmp[j] += PetscAbsScalar(*v);  v++;
768         }
769       }
770       ierr = MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
771       for ( j=0; j<mdn->N; j++ ) {
772         if (tmp2[j] > *norm) *norm = tmp2[j];
773       }
774       ierr = PetscFree(tmp);CHKERRQ(ierr);
775       PLogFlops(mat->n*mat->m);
776     } else if (type == NORM_INFINITY) { /* max row norm */
777       double ntemp;
778       ierr = MatNorm(mdn->A,type,&ntemp);CHKERRQ(ierr);
779       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
780     } else {
781       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
782     }
783   }
784   PetscFunctionReturn(0);
785 }
786 
787 #undef __FUNC__
788 #define __FUNC__ "MatTranspose_MPIDense"
789 int MatTranspose_MPIDense(Mat A,Mat *matout)
790 {
791   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
792   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
793   Mat          B;
794   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
795   int          j, i, ierr;
796   Scalar       *v;
797 
798   PetscFunctionBegin;
799   if (matout == PETSC_NULL && M != N) {
800     SETERRQ(PETSC_ERR_SUP,0,"Supports square matrix only in-place");
801   }
802   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);
803 
804   m = Aloc->m; n = Aloc->n; v = Aloc->v;
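  /* the local block is m x n with n = N (each process stores all global columns), so
     local column j is also global column j; column j of A supplies global row j of B
     at column positions rstart .. rstart+m-1 */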
805   rwork = (int *) PetscMalloc((m+1)*sizeof(int));CHKPTRQ(rwork); /* must hold the m row indices set below */
806   for ( j=0; j<n; j++ ) {
807     for (i=0; i<m; i++) rwork[i] = rstart + i;
808     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES);CHKERRQ(ierr);
809     v   += m;
810   }
811   ierr = PetscFree(rwork);CHKERRQ(ierr);
812   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
813   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
814   if (matout != PETSC_NULL) {
815     *matout = B;
816   } else {
817     PetscOps *Abops;
818     MatOps   Aops;
819 
820     /* This isn't really an in-place transpose; we simply free the data structures from a */
821     ierr = PetscFree(a->rowners);CHKERRQ(ierr);
822     ierr = MatDestroy(a->A);CHKERRQ(ierr);
823     if (a->lvec) VecDestroy(a->lvec);
824     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
825     ierr = PetscFree(a);CHKERRQ(ierr);
826 
827     /*
828          This is horrible, horrible code. We need to keep the
829       A pointers for the bops and ops but copy everything
830       else from B.
831     */
832     Abops   = A->bops;
833     Aops    = A->ops;
834     ierr    = PetscMemcpy(A,B,sizeof(struct _p_Mat));CHKERRQ(ierr);
835     A->bops = Abops;
836     A->ops  = Aops;
837 
838     PetscHeaderDestroy(B);
839   }
840   PetscFunctionReturn(0);
841 }
842 
843 #include "pinclude/blaslapack.h"
844 #undef __FUNC__
845 #define __FUNC__ "MatScale_MPIDense"
846 int MatScale_MPIDense(Scalar *alpha,Mat inA)
847 {
848   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
849   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
850   int          one = 1, nz;
851 
852   PetscFunctionBegin;
853   nz = a->m*a->n;
854   BLscal_( &nz, alpha, a->v, &one );
855   PLogFlops(nz);
856   PetscFunctionReturn(0);
857 }
858 
859 static int MatDuplicate_MPIDense(Mat,MatDuplicateOption,Mat *);
860 extern int MatGetSubMatrices_MPIDense(Mat,int,IS *,IS *,MatReuse,Mat **);
861 
862 /* -------------------------------------------------------------------*/
863 static struct _MatOps MatOps_Values = {MatSetValues_MPIDense,
864        MatGetRow_MPIDense,
865        MatRestoreRow_MPIDense,
866        MatMult_MPIDense,
867        MatMultAdd_MPIDense,
868        MatMultTrans_MPIDense,
869        MatMultTransAdd_MPIDense,
870        0,
871        0,
872        0,
873        0,
874        0,
875        0,
876        0,
877        MatTranspose_MPIDense,
878        MatGetInfo_MPIDense,0,
879        MatGetDiagonal_MPIDense,
880        0,
881        MatNorm_MPIDense,
882        MatAssemblyBegin_MPIDense,
883        MatAssemblyEnd_MPIDense,
884        0,
885        MatSetOption_MPIDense,
886        MatZeroEntries_MPIDense,
887        MatZeroRows_MPIDense,
888        0,
889        0,
890        0,
891        0,
892        MatGetSize_MPIDense,
893        MatGetLocalSize_MPIDense,
894        MatGetOwnershipRange_MPIDense,
895        0,
896        0,
897        MatGetArray_MPIDense,
898        MatRestoreArray_MPIDense,
899        MatDuplicate_MPIDense,
900        0,
901        0,
902        0,
903        0,
904        0,
905        MatGetSubMatrices_MPIDense,
906        0,
907        MatGetValues_MPIDense,
908        0,
909        0,
910        MatScale_MPIDense,
911        0,
912        0,
913        0,
914        MatGetBlockSize_MPIDense,
915        0,
916        0,
917        0,
918        0,
919        0,
920        0,
921        0,
922        0,
923        0,
924        MatGetSubMatrix_MPIDense,
925        0,
926        0,
927        MatGetMaps_Petsc};
928 
929 #undef __FUNC__
930 #define __FUNC__ "MatCreateMPIDense"
931 /*@C
932    MatCreateMPIDense - Creates a parallel matrix in dense format.
933 
934    Collective on MPI_Comm
935 
936    Input Parameters:
937 +  comm - MPI communicator
938 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
939 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
940 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
941 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
942 -  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
943    to control all matrix memory allocation.
944 
945    Output Parameter:
946 .  A - the matrix
947 
948    Notes:
949    The dense format is fully compatible with standard Fortran 77
950    storage by columns.
951 
952    The data input variable is intended primarily for Fortran programmers
953    who wish to allocate their own matrix memory space.  Most users should
954    set data=PETSC_NULL.
955 
956    The user MUST specify either the local or global matrix dimensions
957    (possibly both).
958 
959    Currently, the only parallel dense matrix decomposition is by rows,
960    so that n=N and each submatrix owns all of the global columns.
961 
962    Level: intermediate
963 
964 .keywords: matrix, dense, parallel
965 
966 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
967 @*/
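/*
   Illustrative usage sketch (hypothetical sizes; not compiled here):

      Mat A;
      int ierr;
      ierr = MatCreateMPIDense(PETSC_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,100,50,
                               PETSC_NULL,&A);CHKERRQ(ierr);
      ... set entries with MatSetValues(), then ...
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
      ierr = MatDestroy(A);CHKERRQ(ierr);
*/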
968 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
969 {
970   Mat          mat;
971   Mat_MPIDense *a;
972   int          ierr, i,flg;
973 
974   PetscFunctionBegin;
975   /* Note:  For now, when data is specified above, this assumes the user correctly
976    allocates the local dense storage space.  We should add error checking. */
977 
978   *A = 0;
979   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,"Mat",comm,MatDestroy,MatView);
980   PLogObjectCreate(mat);
981   mat->data         = (void *) (a = PetscNew(Mat_MPIDense));CHKPTRQ(a);
982   ierr              = PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));CHKERRQ(ierr);
983   mat->ops->destroy = MatDestroy_MPIDense;
984   mat->ops->view    = MatView_MPIDense;
985   mat->factor       = 0;
986   mat->mapping      = 0;
987 
988   a->factor       = 0;
989   mat->insertmode = NOT_SET_VALUES;
990   ierr = MPI_Comm_rank(comm,&a->rank);CHKERRQ(ierr);
991   ierr = MPI_Comm_size(comm,&a->size);CHKERRQ(ierr);
992 
993   ierr = PetscSplitOwnership(comm,&m,&M);CHKERRQ(ierr);
994 
995   /*
996      The computation of n below is wrong: n should represent the number of local
997      rows of the right-hand (column) vector.
998   */
999 
1000   /* each row stores all columns */
1001   if (N == PETSC_DECIDE) N = n;
1002   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
1003   /*  if (n != N) SETERRQ(PETSC_ERR_SUP,0,"For now, only n=N is supported"); */
1004   a->N = mat->N = N;
1005   a->M = mat->M = M;
1006   a->m = mat->m = m;
1007   a->n = mat->n = n;
1008 
1009   /* the information in the maps duplicates the information computed below, eventually
1010      we should remove the duplicate information that is not contained in the maps */
1011   ierr = MapCreateMPI(comm,m,M,&mat->rmap);CHKERRQ(ierr);
1012   ierr = MapCreateMPI(comm,PETSC_DECIDE,N,&mat->cmap);CHKERRQ(ierr);
1013 
1014   /* build local table of row and column ownerships */
1015   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int));CHKPTRQ(a->rowners);
1016   a->cowners = a->rowners + a->size + 1;
1017   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
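  /* rowners[] becomes the prefix sum of the per-process local row counts; e.g.
     (hypothetical) m = {3,2,4} on three processes gives rowners = {0,3,5,9}, and
     rstart/rend delimit this process's slice of the global rows */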
1018   ierr = MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1019   a->rowners[0] = 0;
1020   for ( i=2; i<=a->size; i++ ) {
1021     a->rowners[i] += a->rowners[i-1];
1022   }
1023   a->rstart = a->rowners[a->rank];
1024   a->rend   = a->rowners[a->rank+1];
1025   ierr      = MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1026   a->cowners[0] = 0;
1027   for ( i=2; i<=a->size; i++ ) {
1028     a->cowners[i] += a->cowners[i-1];
1029   }
1030 
1031   ierr = MatCreateSeqDense(PETSC_COMM_SELF,m,N,data,&a->A);CHKERRQ(ierr);
1032   PLogObjectParent(mat,a->A);
1033 
1034   /* build cache for off array entries formed */
1035   a->donotstash = 0;
1036   ierr = MatStashCreate_Private(comm,1,&mat->stash);CHKERRQ(ierr);
1037 
1038   /* stuff used for matrix vector multiply */
1039   a->lvec        = 0;
1040   a->Mvctx       = 0;
1041   a->roworiented = 1;
1042 
1043   *A = mat;
1044   ierr = OptionsHasName(PETSC_NULL,"-help",&flg);CHKERRQ(ierr);
1045   if (flg) {
1046     ierr = MatPrintHelp(mat);CHKERRQ(ierr);
1047   }
1048   PetscFunctionReturn(0);
1049 }
1050 
1051 #undef __FUNC__
1052 #define __FUNC__ "MatDuplicate_MPIDense"
1053 static int MatDuplicate_MPIDense(Mat A,MatDuplicateOption cpvalues,Mat *newmat)
1054 {
1055   Mat          mat;
1056   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
1057   int          ierr;
1058   FactorCtx    *factor;
1059 
1060   PetscFunctionBegin;
1061   *newmat       = 0;
1062   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,"Mat",A->comm,MatDestroy,MatView);
1063   PLogObjectCreate(mat);
1064   mat->data         = (void *) (a = PetscNew(Mat_MPIDense));CHKPTRQ(a);
1065   ierr              = PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));CHKERRQ(ierr);
1066   mat->ops->destroy = MatDestroy_MPIDense;
1067   mat->ops->view    = MatView_MPIDense;
1068   mat->factor       = A->factor;
1069   mat->assembled    = PETSC_TRUE;
1070 
1071   a->m = mat->m = oldmat->m;
1072   a->n = mat->n = oldmat->n;
1073   a->M = mat->M = oldmat->M;
1074   a->N = mat->N = oldmat->N;
1075   if (oldmat->factor) {
1076     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx));CHKPTRQ(factor);
1077     /* copy factor contents ... add this code! */
1078   } else a->factor = 0;
1079 
1080   a->rstart       = oldmat->rstart;
1081   a->rend         = oldmat->rend;
1082   a->size         = oldmat->size;
1083   a->rank         = oldmat->rank;
1084   mat->insertmode = NOT_SET_VALUES;
1085   a->donotstash   = oldmat->donotstash;
1086   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int));CHKPTRQ(a->rowners);
1087   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
1088   ierr = PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));CHKERRQ(ierr);
1089   ierr = MatStashCreate_Private(A->comm,1,&mat->stash);CHKERRQ(ierr);
1090 
1091   ierr =  VecDuplicate(oldmat->lvec,&a->lvec);CHKERRQ(ierr);
1092   PLogObjectParent(mat,a->lvec);
1093   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx);CHKERRQ(ierr);
1094   PLogObjectParent(mat,a->Mvctx);
1095   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A);CHKERRQ(ierr);
1096   PLogObjectParent(mat,a->A);
1097   *newmat = mat;
1098   PetscFunctionReturn(0);
1099 }
1100 
1101 #include "sys.h"
1102 
1103 #undef __FUNC__
1104 #define __FUNC__ "MatLoad_MPIDense_DenseInFile"
1105 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
1106 {
1107   int        *rowners, i,size,rank,m,ierr,nz,j;
1108   Scalar     *array,*vals,*vals_ptr;
1109   MPI_Status status;
1110 
1111   PetscFunctionBegin;
1112   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
1113   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
1114 
1115   /* determine ownership of all rows */
1116   m          = M/size + ((M % size) > rank);
1117   rowners    = (int *) PetscMalloc((size+2)*sizeof(int));CHKPTRQ(rowners);
1118   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1119   rowners[0] = 0;
1120   for ( i=2; i<=size; i++ ) {
1121     rowners[i] += rowners[i-1];
1122   }
1123 
1124   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1125   ierr = MatGetArray(*newmat,&array);CHKERRQ(ierr);
1126 
1127   if (!rank) {
1128     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) );CHKPTRQ(vals);
1129 
1130     /* read in my part of the matrix numerical values  */
1131     ierr = PetscBinaryRead(fd,vals,m*N,PETSC_SCALAR);CHKERRQ(ierr);
1132 
1133     /* insert into the matrix row by row (the file stores the block row by row while the dense array is column-major, so we cannot read directly into array) */
1134     vals_ptr = vals;
1135     for ( i=0; i<m; i++ ) {
1136       for ( j=0; j<N; j++ ) {
1137         array[i + j*m] = *vals_ptr++;
1138       }
1139     }
1140 
1141     /* read in the other processors' portions and ship them out */
1142     for ( i=1; i<size; i++ ) {
1143       nz   = (rowners[i+1] - rowners[i])*N;
1144       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1145       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);CHKERRQ(ierr);
1146     }
1147   } else {
1148     /* receive numeric values */
1149     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) );CHKPTRQ(vals);
1150 
1151     /* receive message of values*/
1152     ierr = MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);CHKERRQ(ierr);
1153 
1154     /* insert into the matrix row by row (the file stores the block row by row while the dense array is column-major, so we cannot read directly into array) */
1155     vals_ptr = vals;
1156     for ( i=0; i<m; i++ ) {
1157       for ( j=0; j<N; j++ ) {
1158         array[i + j*m] = *vals_ptr++;
1159       }
1160     }
1161   }
1162   ierr = PetscFree(rowners);CHKERRQ(ierr);
1163   ierr = PetscFree(vals);CHKERRQ(ierr);
1164   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1165   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1166   PetscFunctionReturn(0);
1167 }
1168 
1169 
1170 #undef __FUNC__
1171 #define __FUNC__ "MatLoad_MPIDense"
1172 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1173 {
1174   Mat          A;
1175   Scalar       *vals,*svals;
1176   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1177   MPI_Status   status;
1178   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1179   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1180   int          tag = ((PetscObject)viewer)->tag;
1181   int          i, nz, ierr, j,rstart, rend, fd;
1182 
1183   PetscFunctionBegin;
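  /* Process 0 reads the header, the row lengths, the column indices and the numerical
     values from the binary file and ships each other process its share; the receivers
     insert their pieces with MatSetValues() */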
1184   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
1185   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
1186   if (!rank) {
1187     ierr = ViewerBinaryGetDescriptor(viewer,&fd);CHKERRQ(ierr);
1188     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT);CHKERRQ(ierr);
1189     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
1190   }
1191 
1192   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1193   M = header[1]; N = header[2]; nz = header[3];
1194 
1195   /*
1196        Handle case where matrix is stored on disk as a dense matrix
1197   */
1198   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1199     ierr = MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);CHKERRQ(ierr);
1200     PetscFunctionReturn(0);
1201   }
1202 
1203   /* determine ownership of all rows */
1204   m          = M/size + ((M % size) > rank);
1205   rowners    = (int *) PetscMalloc((size+2)*sizeof(int));CHKPTRQ(rowners);
1206   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1207   rowners[0] = 0;
1208   for ( i=2; i<=size; i++ ) {
1209     rowners[i] += rowners[i-1];
1210   }
1211   rstart = rowners[rank];
1212   rend   = rowners[rank+1];
1213 
1214   /* distribute row lengths to all processors */
1215   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) );CHKPTRQ(ourlens);
1216   offlens = ourlens + (rend-rstart);
1217   if (!rank) {
1218     rowlengths = (int*) PetscMalloc( M*sizeof(int) );CHKPTRQ(rowlengths);
1219     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT);CHKERRQ(ierr);
1220     sndcounts = (int*) PetscMalloc( size*sizeof(int) );CHKPTRQ(sndcounts);
1221     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1222     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1223     ierr = PetscFree(sndcounts);CHKERRQ(ierr);
1224   } else {
1225     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
1226   }
1227 
1228   if (!rank) {
1229     /* calculate the number of nonzeros on each processor */
1230     procsnz = (int*) PetscMalloc( size*sizeof(int) );CHKPTRQ(procsnz);
1231     ierr    = PetscMemzero(procsnz,size*sizeof(int));CHKERRQ(ierr);
1232     for ( i=0; i<size; i++ ) {
1233       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1234         procsnz[i] += rowlengths[j];
1235       }
1236     }
1237     ierr = PetscFree(rowlengths);CHKERRQ(ierr);
1238 
1239     /* determine max buffer needed and allocate it */
1240     maxnz = 0;
1241     for ( i=0; i<size; i++ ) {
1242       maxnz = PetscMax(maxnz,procsnz[i]);
1243     }
1244     cols = (int *) PetscMalloc( maxnz*sizeof(int) );CHKPTRQ(cols);
1245 
1246     /* read in my part of the matrix column indices  */
1247     nz = procsnz[0];
1248     mycols = (int *) PetscMalloc( nz*sizeof(int) );CHKPTRQ(mycols);
1249     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT);CHKERRQ(ierr);
1250 
1251     /* read in everyone else's column indices and ship them off */
1252     for ( i=1; i<size; i++ ) {
1253       nz   = procsnz[i];
1254       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT);CHKERRQ(ierr);
1255       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1256     }
1257     ierr = PetscFree(cols);CHKERRQ(ierr);
1258   } else {
1259     /* determine buffer space needed for message */
1260     nz = 0;
1261     for ( i=0; i<m; i++ ) {
1262       nz += ourlens[i];
1263     }
1264     mycols = (int*) PetscMalloc( nz*sizeof(int) );CHKPTRQ(mycols);
1265 
1266     /* receive message of column indices*/
1267     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1268     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1269     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1270   }
1271 
1272   /* loop over local rows, determining number of off diagonal entries */
1273   ierr = PetscMemzero(offlens,m*sizeof(int));CHKERRQ(ierr);
1274   jj = 0;
1275   for ( i=0; i<m; i++ ) {
1276     for ( j=0; j<ourlens[i]; j++ ) {
1277       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1278       jj++;
1279     }
1280   }
1281 
1282   /* create our matrix */
1283   for ( i=0; i<m; i++ ) {
1284     ourlens[i] -= offlens[i];
1285   }
1286   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1287   A = *newmat;
1288   for ( i=0; i<m; i++ ) {
1289     ourlens[i] += offlens[i];
1290   }
1291 
1292   if (!rank) {
1293     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) );CHKPTRQ(vals);
1294 
1295     /* read in my part of the matrix numerical values  */
1296     nz = procsnz[0];
1297     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1298 
1299     /* insert into matrix */
1300     jj      = rstart;
1301     smycols = mycols;
1302     svals   = vals;
1303     for ( i=0; i<m; i++ ) {
1304       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1305       smycols += ourlens[i];
1306       svals   += ourlens[i];
1307       jj++;
1308     }
1309 
1310     /* read in the other processors' portions and ship them out */
1311     for ( i=1; i<size; i++ ) {
1312       nz   = procsnz[i];
1313       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1314       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1315     }
1316     ierr = PetscFree(procsnz);CHKERRQ(ierr);
1317   } else {
1318     /* receive numeric values */
1319     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) );CHKPTRQ(vals);
1320 
1321     /* receive message of values*/
1322     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1323     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1324     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1325 
1326     /* insert into matrix */
1327     jj      = rstart;
1328     smycols = mycols;
1329     svals   = vals;
1330     for ( i=0; i<m; i++ ) {
1331       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1332       smycols += ourlens[i];
1333       svals   += ourlens[i];
1334       jj++;
1335     }
1336   }
1337   ierr = PetscFree(ourlens);CHKERRQ(ierr);
1338   ierr = PetscFree(vals);CHKERRQ(ierr);
1339   ierr = PetscFree(mycols);CHKERRQ(ierr);
1340   ierr = PetscFree(rowners);CHKERRQ(ierr);
1341 
1342   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1343   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1344   PetscFunctionReturn(0);
1345 }
1346 
1347 
1348 
1349 
1350 
1351