1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpidense.c,v 1.121 1999/08/02 21:57:54 curfman Exp curfman $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "src/mat/impls/dense/mpi/mpidense.h"
10 #include "src/vec/vecimpl.h"
11 
12 extern int MatSetUpMultiply_MPIDense(Mat);
13 
14 #undef __FUNC__
15 #define __FUNC__ "MatSetValues_MPIDense"
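/*
   Inserts or adds a logically dense block of values: rows owned by this process
   go directly into the local sequential dense matrix, while rows owned by other
   processes are stashed (unless donotstash is set) and communicated during
   MatAssemblyBegin()/MatAssemblyEnd().
*/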
16 int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
17 {
18   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
19   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
20   int          roworiented = A->roworiented;
21 
22   PetscFunctionBegin;
23   for ( i=0; i<m; i++ ) {
24     if (idxm[i] < 0) continue;
25     if (idxm[i] >= A->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
26     if (idxm[i] >= rstart && idxm[i] < rend) {
27       row = idxm[i] - rstart;
28       if (roworiented) {
29         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv);CHKERRQ(ierr);
30       } else {
31         for ( j=0; j<n; j++ ) {
32           if (idxn[j] < 0) continue;
33           if (idxn[j] >= A->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
34           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
35         }
36       }
37     } else {
38       if ( !A->donotstash) {
39         if (roworiented) {
40           ierr = MatStashValuesRow_Private(&mat->stash,idxm[i],n,idxn,v+i*n);CHKERRQ(ierr);
41         } else {
42           ierr = MatStashValuesCol_Private(&mat->stash,idxm[i],n,idxn,v+i,m);CHKERRQ(ierr);
43         }
44       }
45     }
46   }
47   PetscFunctionReturn(0);
48 }
49 
50 #undef __FUNC__
51 #define __FUNC__ "MatGetValues_MPIDense"
52 int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
53 {
54   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
55   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
56 
57   PetscFunctionBegin;
58   for ( i=0; i<m; i++ ) {
59     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
60     if (idxm[i] >= mdn->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
61     if (idxm[i] >= rstart && idxm[i] < rend) {
62       row = idxm[i] - rstart;
63       for ( j=0; j<n; j++ ) {
64         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
65         if (idxn[j] >= mdn->N) {
66           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
67         }
68         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j);CHKERRQ(ierr);
69       }
70     } else {
71       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
72     }
73   }
74   PetscFunctionReturn(0);
75 }
76 
77 #undef __FUNC__
78 #define __FUNC__ "MatGetArray_MPIDense"
79 int MatGetArray_MPIDense(Mat A,Scalar **array)
80 {
81   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
82   int          ierr;
83 
84   PetscFunctionBegin;
85   ierr = MatGetArray(a->A,array);CHKERRQ(ierr);
86   PetscFunctionReturn(0);
87 }
88 
89 #undef __FUNC__
90 #define __FUNC__ "MatGetSubMatrix_MPIDense"
91 static int MatGetSubMatrix_MPIDense(Mat A,IS isrow,IS iscol,int cs,MatReuse scall,Mat *B)
92 {
93   Mat_MPIDense *mat = (Mat_MPIDense *) A->data, *newmatd;
94   Mat_SeqDense *lmat = (Mat_SeqDense *) mat->A->data;
95   int          i, j, ierr, *irow, *icol, rstart, rend, nrows, ncols, nlrows, nlcols, rank;
96   Scalar       *av, *bv, *v = lmat->v;
97   Mat          newmat;
98 
99   PetscFunctionBegin;
100   ierr = MPI_Comm_rank(A->comm,&rank);CHKERRQ(ierr);
101   ierr = ISGetIndices(isrow,&irow);CHKERRQ(ierr);
102   ierr = ISGetIndices(iscol,&icol);CHKERRQ(ierr);
103   ierr = ISGetSize(isrow,&nrows);CHKERRQ(ierr);
104   ierr = ISGetSize(iscol,&ncols);CHKERRQ(ierr);
105 
106   /* No parallel redistribution is currently supported! We should really check each index set
107      to confirm that it is OK.  ... Currently this supports only submatrices with the same
108      partitioning as the original matrix! */
109 
110   ierr = MatGetLocalSize(A,&nlrows,&nlcols);CHKERRQ(ierr);
111   ierr = MatGetOwnershipRange(A,&rstart,&rend);CHKERRQ(ierr);
112 
113   /* Check submatrix call */
114   if (scall == MAT_REUSE_MATRIX) {
115     /* SETERRQ(PETSC_ERR_ARG_SIZ,0,"Reused submatrix wrong size"); */
116     /* Really need to test the row and column sizes! */
117     newmat = *B;
118   } else {
119     /* Create and fill new matrix */
120     ierr = MatCreateMPIDense(A->comm,nrows,ncols,PETSC_DECIDE,PETSC_DECIDE,PETSC_NULL,&newmat);CHKERRQ(ierr);
121   }
122 
123   /* Now extract the data pointers and do the copy, column at a time */
124   newmatd = (Mat_MPIDense *) newmat->data;
125   bv = ((Mat_SeqDense *)newmatd->A->data)->v;
126 
127   for ( i=0; i<ncols; i++ ) {
128     av = v + nlrows*icol[i];
129     for (j=0; j<nrows; j++ ) {
130       *bv++ = av[irow[j] - rstart];
131     }
132   }
133 
134   /* Assemble the matrices so that the correct flags are set */
135   ierr = MatAssemblyBegin(newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
136   ierr = MatAssemblyEnd(newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
137 
138   /* Free work space */
139   ierr = ISRestoreIndices(isrow,&irow);CHKERRQ(ierr);
140   ierr = ISRestoreIndices(iscol,&icol);CHKERRQ(ierr);
141   *B = newmat;
142   PetscFunctionReturn(0);
143 }
144 
145 #undef __FUNC__
146 #define __FUNC__ "MatRestoreArray_MPIDense"
147 int MatRestoreArray_MPIDense(Mat A,Scalar **array)
148 {
149   PetscFunctionBegin;
150   PetscFunctionReturn(0);
151 }
152 
153 #undef __FUNC__
154 #define __FUNC__ "MatAssemblyBegin_MPIDense"
155 int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
156 {
157   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
158   MPI_Comm     comm = mat->comm;
159   int          ierr,nstash,reallocs;
160   InsertMode   addv;
161 
162   PetscFunctionBegin;
163   /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
164   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
165   if (addv == (ADD_VALUES|INSERT_VALUES)) {
166     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Cannot mix adds/inserts on different procs");
167   }
168   mat->insertmode = addv; /* in case this processor had no cache */
169 
170   ierr = MatStashScatterBegin_Private(&mat->stash,mdn->rowners);CHKERRQ(ierr);
171   ierr = MatStashGetInfo_Private(&mat->stash,&nstash,&reallocs);CHKERRQ(ierr);
172   PLogInfo(mdn->A,"MatAssemblyBegin_MPIDense:Stash has %d entries, uses %d mallocs.\n",
173            nstash,reallocs);
174   PetscFunctionReturn(0);
175 }
176 
177 #undef __FUNC__
178 #define __FUNC__ "MatAssemblyEnd_MPIDense"
179 int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
180 {
181   Mat_MPIDense *mdn=(Mat_MPIDense*)mat->data;
182   int          i,n,ierr,*row,*col,flg,j,rstart,ncols;
183   Scalar       *val;
184   InsertMode   addv=mat->insertmode;
185 
186   PetscFunctionBegin;
187   /*  wait on receives */
188   while (1) {
189     ierr = MatStashScatterGetMesg_Private(&mat->stash,&n,&row,&col,&val,&flg);CHKERRQ(ierr);
190     if (!flg) break;
191 
192     for ( i=0; i<n; ) {
193       /* Now identify the consecutive vals belonging to the same row */
194       for ( j=i,rstart=row[j]; j<n; j++ ) { if (row[j] != rstart) break; }
195       if (j < n) ncols = j-i;
196       else       ncols = n-i;
197       /* Now assemble all these values with a single function call */
198       ierr = MatSetValues_MPIDense(mat,1,row+i,ncols,col+i,val+i,addv);CHKERRQ(ierr);
199       i = j;
200     }
201   }
202   ierr = MatStashScatterEnd_Private(&mat->stash);CHKERRQ(ierr);
203 
204   ierr = MatAssemblyBegin(mdn->A,mode);CHKERRQ(ierr);
205   ierr = MatAssemblyEnd(mdn->A,mode);CHKERRQ(ierr);
206 
207   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
208     ierr = MatSetUpMultiply_MPIDense(mat);CHKERRQ(ierr);
209   }
210   PetscFunctionReturn(0);
211 }
212 
213 #undef __FUNC__
214 #define __FUNC__ "MatZeroEntries_MPIDense"
215 int MatZeroEntries_MPIDense(Mat A)
216 {
217   int          ierr;
218   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
219 
220   PetscFunctionBegin;
221   ierr = MatZeroEntries(l->A);CHKERRQ(ierr);
222   PetscFunctionReturn(0);
223 }
224 
225 #undef __FUNC__
226 #define __FUNC__ "MatGetBlockSize_MPIDense"
227 int MatGetBlockSize_MPIDense(Mat A,int *bs)
228 {
229   PetscFunctionBegin;
230   *bs = 1;
231   PetscFunctionReturn(0);
232 }
233 
234 /* The code does not handle the diagonal entries correctly unless the
235    matrix is square and the column and row ownerships are identical.
236    This is a BUG. The only way to fix it seems to be to access
237    the local matrix l->A directly rather than going through the
238    MatZeroRows() routine.
239 */
240 #undef __FUNC__
241 #define __FUNC__ "MatZeroRows_MPIDense"
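/*
   The rows to be zeroed may be specified on any process, so the global row
   indices are first routed to their owning processes (message counts via
   MPI_Allreduce, then point-to-point sends); each process then zeros the rows
   it received in its local matrix l->A, optionally placing *diag on the diagonal.
*/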
242 int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
243 {
244   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
245   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
246   int            *procs,*nprocs,j,found,idx,nsends,*work;
247   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
248   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
249   int            *lens,imdex,*lrows,*values;
250   MPI_Comm       comm = A->comm;
251   MPI_Request    *send_waits,*recv_waits;
252   MPI_Status     recv_status,*send_status;
253   IS             istmp;
254 
255   PetscFunctionBegin;
256   ierr = ISGetSize(is,&N);CHKERRQ(ierr);
257   ierr = ISGetIndices(is,&rows);CHKERRQ(ierr);
258 
259   /*  first count number of contributors to each processor */
260   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) );CHKPTRQ(nprocs);
261   ierr   = PetscMemzero(nprocs,2*size*sizeof(int));CHKERRQ(ierr);
262   procs  = nprocs + size;
263   owner  = (int *) PetscMalloc((N+1)*sizeof(int));CHKPTRQ(owner); /* see note*/
264   for ( i=0; i<N; i++ ) {
265     idx = rows[i];
266     found = 0;
267     for ( j=0; j<size; j++ ) {
268       if (idx >= owners[j] && idx < owners[j+1]) {
269         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
270       }
271     }
272     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
273   }
274   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
275 
276   /* inform other processors of number of messages and max length*/
277   work   = (int *) PetscMalloc( size*sizeof(int) );CHKPTRQ(work);
278   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
279   nrecvs = work[rank];
280   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
281   nmax   = work[rank];
282   ierr = PetscFree(work);CHKERRQ(ierr);
283 
284   /* post receives:   */
285   rvalues    = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
286   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
287   for ( i=0; i<nrecvs; i++ ) {
288     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
289   }
290 
291   /* do sends:
292       1) starts[i] gives the starting index in svalues for stuff going to
293          the ith processor
294   */
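  /*
     Each row index in rows[] is appended to the block of svalues destined for
     its owning process (recorded in owner[] above); starts[] is then rebuilt so
     that one MPI_Isend per destination process sends its contiguous block.
  */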
295   svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) );CHKPTRQ(svalues);
296   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
297   starts     = (int *) PetscMalloc( (size+1)*sizeof(int) );CHKPTRQ(starts);
298   starts[0]  = 0;
299   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
300   for ( i=0; i<N; i++ ) {
301     svalues[starts[owner[i]]++] = rows[i];
302   }
303   ISRestoreIndices(is,&rows);
304 
305   starts[0] = 0;
306   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
307   count = 0;
308   for ( i=0; i<size; i++ ) {
309     if (procs[i]) {
310       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
311     }
312   }
313   ierr = PetscFree(starts);CHKERRQ(ierr);
314 
315   base = owners[rank];
316 
317   /*  wait on receives */
318   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) );CHKPTRQ(lens);
319   source = lens + nrecvs;
320   count  = nrecvs; slen = 0;
321   while (count) {
322     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
323     /* unpack receives into our local space */
324     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
325     source[imdex]  = recv_status.MPI_SOURCE;
326     lens[imdex]  = n;
327     slen += n;
328     count--;
329   }
330   ierr = PetscFree(recv_waits);CHKERRQ(ierr);
331 
332   /* convert the received global row indices into local row indices */
333   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) );CHKPTRQ(lrows);
334   count = 0;
335   for ( i=0; i<nrecvs; i++ ) {
336     values = rvalues + i*nmax;
337     for ( j=0; j<lens[i]; j++ ) {
338       lrows[count++] = values[j] - base;
339     }
340   }
341   ierr = PetscFree(rvalues);CHKERRQ(ierr);
342   ierr = PetscFree(lens);CHKERRQ(ierr);
343   ierr = PetscFree(owner);CHKERRQ(ierr);
344   ierr = PetscFree(nprocs);CHKERRQ(ierr);
345 
346   /* actually zap the local rows */
347   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
348   PLogObjectParent(A,istmp);
349   ierr = PetscFree(lrows);CHKERRQ(ierr);
350   ierr = MatZeroRows(l->A,istmp,diag);CHKERRQ(ierr);
351   ierr = ISDestroy(istmp);CHKERRQ(ierr);
352 
353   /* wait on sends */
354   if (nsends) {
355     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
356     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
357     ierr = PetscFree(send_status);CHKERRQ(ierr);
358   }
359   ierr = PetscFree(send_waits);CHKERRQ(ierr);
360   ierr = PetscFree(svalues);CHKERRQ(ierr);
361 
362   PetscFunctionReturn(0);
363 }
364 
365 #undef __FUNC__
366 #define __FUNC__ "MatMult_MPIDense"
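/*
   y = A*x: the distributed input vector xx is first gathered into the local
   work vector lvec through the Mvctx scatter (every process needs all N entries
   of x); a single local dense multiply then produces this process's rows of yy.
*/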
367 int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
368 {
369   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
370   int          ierr;
371 
372   PetscFunctionBegin;
373   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
374   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
375   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy);CHKERRQ(ierr);
376   PetscFunctionReturn(0);
377 }
378 
379 #undef __FUNC__
380 #define __FUNC__ "MatMultAdd_MPIDense"
381 int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
382 {
383   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
384   int          ierr;
385 
386   PetscFunctionBegin;
387   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
388   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
389   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz);CHKERRQ(ierr);
390   PetscFunctionReturn(0);
391 }
392 
393 #undef __FUNC__
394 #define __FUNC__ "MatMultTrans_MPIDense"
395 int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
396 {
397   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
398   int          ierr;
399   Scalar       zero = 0.0;
400 
401   PetscFunctionBegin;
402   ierr = VecSet(&zero,yy);CHKERRQ(ierr);
403   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec);CHKERRQ(ierr);
404   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
405   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
406   PetscFunctionReturn(0);
407 }
408 
409 #undef __FUNC__
410 #define __FUNC__ "MatMultTransAdd_MPIDense"
411 int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
412 {
413   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
414   int          ierr;
415 
416   PetscFunctionBegin;
417   ierr = VecCopy(yy,zz);CHKERRQ(ierr);
418   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec);CHKERRQ(ierr);
419   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
420   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
421   PetscFunctionReturn(0);
422 }
423 
424 #undef __FUNC__
425 #define __FUNC__ "MatGetDiagonal_MPIDense"
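/*
   The local block is stored column-major with leading dimension m, so the global
   diagonal entry (rstart+i, rstart+i) sits at aloc->v[(rstart+i)*m + i], i.e. at
   offset radd + i*m + i with radd = rstart*m.
*/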
426 int MatGetDiagonal_MPIDense(Mat A,Vec v)
427 {
428   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
429   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
430   int          ierr, len, i, n, m = a->m, radd;
431   Scalar       *x, zero = 0.0;
432 
433   PetscFunctionBegin;
434   VecSet(&zero,v);
435   ierr = VecGetArray(v,&x);CHKERRQ(ierr);
436   ierr = VecGetSize(v,&n);CHKERRQ(ierr);
437   if (n != a->M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Nonconforming mat and vec");
438   len = PetscMin(aloc->m,aloc->n);
439   radd = a->rstart*m;
440   for ( i=0; i<len; i++ ) {
441     x[i] = aloc->v[radd + i*m + i];
442   }
443   ierr = VecRestoreArray(v,&x);CHKERRQ(ierr);
444   PetscFunctionReturn(0);
445 }
446 
447 #undef __FUNC__
448 #define __FUNC__ "MatDestroy_MPIDense"
449 int MatDestroy_MPIDense(Mat mat)
450 {
451   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
452   int          ierr;
453 
454   PetscFunctionBegin;
455 
456   if (mat->mapping) {
457     ierr = ISLocalToGlobalMappingDestroy(mat->mapping);CHKERRQ(ierr);
458   }
459   if (mat->bmapping) {
460     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping);CHKERRQ(ierr);
461   }
462 #if defined(PETSC_USE_LOG)
463   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",mdn->M,mdn->N);
464 #endif
465   ierr = MatStashDestroy_Private(&mat->stash);CHKERRQ(ierr);
466   ierr = PetscFree(mdn->rowners);CHKERRQ(ierr);
467   ierr = MatDestroy(mdn->A);CHKERRQ(ierr);
468   if (mdn->lvec)   VecDestroy(mdn->lvec);
469   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
470   if (mdn->factor) {
471     if (mdn->factor->temp)   {ierr = PetscFree(mdn->factor->temp);CHKERRQ(ierr);}
472     if (mdn->factor->tag)    {ierr = PetscFree(mdn->factor->tag);CHKERRQ(ierr);}
473     if (mdn->factor->pivots) {ierr = PetscFree(mdn->factor->pivots);CHKERRQ(ierr);}
474     ierr = PetscFree(mdn->factor);CHKERRQ(ierr);
475   }
476   ierr = PetscFree(mdn);CHKERRQ(ierr);
477   if (mat->rmap) {
478     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
479   }
480   if (mat->cmap) {
481     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
482   }
483   PLogObjectDestroy(mat);
484   PetscHeaderDestroy(mat);
485   PetscFunctionReturn(0);
486 }
487 
488 #undef __FUNC__
489 #define __FUNC__ "MatView_MPIDense_Binary"
490 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
491 {
492   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
493   int          ierr;
494 
495   PetscFunctionBegin;
496   if (mdn->size == 1) {
497     ierr = MatView(mdn->A,viewer);CHKERRQ(ierr);
498   }
499   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
500   PetscFunctionReturn(0);
501 }
502 
503 #undef __FUNC__
504 #define __FUNC__ "MatView_MPIDense_ASCII"
505 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
506 {
507   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
508   int          ierr, format, size = mdn->size, rank = mdn->rank;
509   FILE         *fd;
510   ViewerType   vtype;
511 
512   PetscFunctionBegin;
513   ierr = ViewerGetType(viewer,&vtype);CHKERRQ(ierr);
514   ierr = ViewerASCIIGetPointer(viewer,&fd);CHKERRQ(ierr);
515   ierr = ViewerGetFormat(viewer,&format);CHKERRQ(ierr);
516   if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
517     MatInfo info;
518     ierr = MatGetInfo(mat,MAT_LOCAL,&info);CHKERRQ(ierr);
519     PetscSequentialPhaseBegin(mat->comm,1);
520       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
521          (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
522       fflush(fd);
523     PetscSequentialPhaseEnd(mat->comm,1);
524     ierr = VecScatterView(mdn->Mvctx,viewer);CHKERRQ(ierr);
525     PetscFunctionReturn(0);
526   } else if (format == VIEWER_FORMAT_ASCII_INFO) {
527     PetscFunctionReturn(0);
528   }
529 
530   if (size == 1) {
531     ierr = MatView(mdn->A,viewer);CHKERRQ(ierr);
532   } else {
533     /* assemble the entire matrix onto first processor. */
534     Mat          A;
535     int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
536     Scalar       *vals;
537     Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
538 
539     if (!rank) {
540       ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A);CHKERRQ(ierr);
541     } else {
542       ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A);CHKERRQ(ierr);
543     }
544     PLogObjectParent(mat,A);
545 
546     /* Copy the matrix ... This isn't the most efficient means,
547        but it's quick for now */
548     row = mdn->rstart; m = Amdn->m;
549     for ( i=0; i<m; i++ ) {
550       ierr = MatGetRow(mat,row,&nz,&cols,&vals);CHKERRQ(ierr);
551       ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES);CHKERRQ(ierr);
552       ierr = MatRestoreRow(mat,row,&nz,&cols,&vals);CHKERRQ(ierr);
553       row++;
554     }
555 
556     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
557     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
558     if (!rank) {
559       ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer);CHKERRQ(ierr);
560     }
561     ierr = MatDestroy(A);CHKERRQ(ierr);
562   }
563   PetscFunctionReturn(0);
564 }
565 
566 #undef __FUNC__
567 #define __FUNC__ "MatView_MPIDense"
568 int MatView_MPIDense(Mat mat,Viewer viewer)
569 {
570   int          ierr;
571   ViewerType   vtype;
572 
  PetscFunctionBegin;
573   ierr = ViewerGetType(viewer,&vtype);CHKERRQ(ierr);
574   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
575     ierr = MatView_MPIDense_ASCII(mat,viewer);CHKERRQ(ierr);
576   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
577     ierr = MatView_MPIDense_Binary(mat,viewer);CHKERRQ(ierr);
578   } else {
579     SETERRQ(1,1,"Viewer type not supported by PETSc object");
580   }
581   PetscFunctionReturn(0);
582 }
583 
584 #undef __FUNC__
585 #define __FUNC__ "MatGetInfo_MPIDense"
586 int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
587 {
588   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
589   Mat          mdn = mat->A;
590   int          ierr;
591   double       isend[5], irecv[5];
592 
593   PetscFunctionBegin;
594   info->rows_global    = (double)mat->M;
595   info->columns_global = (double)mat->N;
596   info->rows_local     = (double)mat->m;
597   info->columns_local  = (double)mat->N;
598   info->block_size     = 1.0;
599   ierr = MatGetInfo(mdn,MAT_LOCAL,info);CHKERRQ(ierr);
600   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
601   isend[3] = info->memory;  isend[4] = info->mallocs;
602   if (flag == MAT_LOCAL) {
603     info->nz_used      = isend[0];
604     info->nz_allocated = isend[1];
605     info->nz_unneeded  = isend[2];
606     info->memory       = isend[3];
607     info->mallocs      = isend[4];
608   } else if (flag == MAT_GLOBAL_MAX) {
609     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
610     info->nz_used      = irecv[0];
611     info->nz_allocated = irecv[1];
612     info->nz_unneeded  = irecv[2];
613     info->memory       = irecv[3];
614     info->mallocs      = irecv[4];
615   } else if (flag == MAT_GLOBAL_SUM) {
616     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
617     info->nz_used      = irecv[0];
618     info->nz_allocated = irecv[1];
619     info->nz_unneeded  = irecv[2];
620     info->memory       = irecv[3];
621     info->mallocs      = irecv[4];
622   }
623   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
624   info->fill_ratio_needed = 0;
625   info->factor_mallocs    = 0;
626   PetscFunctionReturn(0);
627 }
628 
629 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
630    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
631    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
632    extern int MatSolve_MPIDense(Mat,Vec,Vec);
633    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
634    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
635    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
636 
637 #undef __FUNC__
638 #define __FUNC__ "MatSetOption_MPIDense"
639 int MatSetOption_MPIDense(Mat A,MatOption op)
640 {
641   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
642 
643   PetscFunctionBegin;
644   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
645       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
646       op == MAT_NEW_NONZERO_LOCATION_ERR ||
647       op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
648       op == MAT_COLUMNS_SORTED ||
649       op == MAT_COLUMNS_UNSORTED) {
650         MatSetOption(a->A,op);
651   } else if (op == MAT_ROW_ORIENTED) {
652         a->roworiented = 1;
653         MatSetOption(a->A,op);
654   } else if (op == MAT_ROWS_SORTED ||
655              op == MAT_ROWS_UNSORTED ||
656              op == MAT_SYMMETRIC ||
657              op == MAT_STRUCTURALLY_SYMMETRIC ||
658              op == MAT_YES_NEW_DIAGONALS ||
659              op == MAT_USE_HASH_TABLE) {
660     PLogInfo(A,"MatSetOption_MPIDense:Option ignored\n");
661   } else if (op == MAT_COLUMN_ORIENTED) {
662     a->roworiented = 0; MatSetOption(a->A,op);
663   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
664     a->donotstash = 1;
665   } else if (op == MAT_NO_NEW_DIAGONALS) {
666     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
667   } else {
668     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
669   }
670   PetscFunctionReturn(0);
671 }
672 
673 #undef __FUNC__
674 #define __FUNC__ "MatGetSize_MPIDense"
675 int MatGetSize_MPIDense(Mat A,int *m,int *n)
676 {
677   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
678 
679   PetscFunctionBegin;
680   *m = mat->M; *n = mat->N;
681   PetscFunctionReturn(0);
682 }
683 
684 #undef __FUNC__
685 #define __FUNC__ "MatGetLocalSize_MPIDense"
686 int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
687 {
688   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
689 
690   PetscFunctionBegin;
691   *m = mat->m; *n = mat->N;
692   PetscFunctionReturn(0);
693 }
694 
695 #undef __FUNC__
696 #define __FUNC__ "MatGetOwnershipRange_MPIDense"
697 int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
698 {
699   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
700 
701   PetscFunctionBegin;
702   *m = mat->rstart; *n = mat->rend;
703   PetscFunctionReturn(0);
704 }
705 
706 #undef __FUNC__
707 #define __FUNC__ "MatGetRow_MPIDense"
708 int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
709 {
710   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
711   int          lrow, rstart = mat->rstart, rend = mat->rend,ierr;
712 
713   PetscFunctionBegin;
714   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_SUP,0,"only local rows");
715   lrow = row - rstart;
716   ierr = MatGetRow(mat->A,lrow,nz,idx,v);CHKERRQ(ierr);
717   PetscFunctionReturn(0);
718 }
719 
720 #undef __FUNC__
721 #define __FUNC__ "MatRestoreRow_MPIDense"
722 int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
723 {
724   int ierr;
725 
726   PetscFunctionBegin;
727   if (idx) {ierr = PetscFree(*idx);CHKERRQ(ierr);}
728   if (v) {ierr = PetscFree(*v);CHKERRQ(ierr);}
729   PetscFunctionReturn(0);
730 }
731 
732 #undef __FUNC__
733 #define __FUNC__ "MatDiagonalScale_MPIDense"
734 int MatDiagonalScale_MPIDense(Mat A,Vec ll,Vec rr)
735 {
736   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
737   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
738   Scalar       *l,*r,x,*v;
739   int          ierr,i,j,s2a,s3a,s2,s3,m=mat->m,n=mat->n;
740 
741   PetscFunctionBegin;
742   ierr = MatGetLocalSize(A,&s2,&s3);CHKERRQ(ierr);
743   if (ll) {
744     ierr = VecGetLocalSize(ll,&s2a);CHKERRQ(ierr);
745     if (s2a != s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Left scaling vector non-conforming local size");
746     ierr = VecGetArray(ll,&l);CHKERRQ(ierr);
747     for ( i=0; i<m; i++ ) {
748       x = l[i];
749       v = mat->v + i;
750       for ( j=0; j<n; j++ ) { (*v) *= x; v+= m;}
751     }
752     ierr = VecRestoreArray(ll,&l);CHKERRQ(ierr);
753     PLogFlops(n*m);
754   }
755   if (rr) {
756     ierr = VecGetSize(rr,&s3a);CHKERRQ(ierr);
757     if (s3a != s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Right scaling vec non-conforming local size");
758     ierr = VecScatterBegin(rr,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
759     ierr = VecScatterEnd(rr,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
760     ierr = VecGetArray(mdn->lvec,&r);CHKERRQ(ierr);
761     for ( i=0; i<n; i++ ) {
762       x = r[i];
763       v = mat->v + i*m;
764       for ( j=0; j<m; j++ ) { (*v++) *= x;}
765     }
766     ierr = VecRestoreArray(mdn->lvec,&r);CHKERRQ(ierr);
767     PLogFlops(n*m);
768   }
769   PetscFunctionReturn(0);
770 }
771 
772 #undef __FUNC__
773 #define __FUNC__ "MatNorm_MPIDense"
774 int MatNorm_MPIDense(Mat A,NormType type,double *norm)
775 {
776   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
777   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
778   int          ierr, i, j;
779   double       sum = 0.0;
780   Scalar       *v = mat->v;
781 
782   PetscFunctionBegin;
783   if (mdn->size == 1) {
784     ierr =  MatNorm(mdn->A,type,norm);CHKERRQ(ierr);
785   } else {
786     if (type == NORM_FROBENIUS) {
787       for (i=0; i<mat->n*mat->m; i++ ) {
788 #if defined(PETSC_USE_COMPLEX)
789         sum += PetscReal(PetscConj(*v)*(*v)); v++;
790 #else
791         sum += (*v)*(*v); v++;
792 #endif
793       }
794       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
795       *norm = sqrt(*norm);
796       PLogFlops(2*mat->n*mat->m);
797     } else if (type == NORM_1) {
798       double *tmp, *tmp2;
799       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) );CHKPTRQ(tmp);
800       tmp2 = tmp + mdn->N;
801       ierr = PetscMemzero(tmp,2*mdn->N*sizeof(double));CHKERRQ(ierr);
802       *norm = 0.0;
803       v = mat->v;
804       for ( j=0; j<mat->n; j++ ) {
805         for ( i=0; i<mat->m; i++ ) {
806           tmp[j] += PetscAbsScalar(*v);  v++;
807         }
808       }
809       ierr = MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
810       for ( j=0; j<mdn->N; j++ ) {
811         if (tmp2[j] > *norm) *norm = tmp2[j];
812       }
813       ierr = PetscFree(tmp);CHKERRQ(ierr);
814       PLogFlops(mat->n*mat->m);
815     } else if (type == NORM_INFINITY) { /* max row norm */
816       double ntemp;
817       ierr = MatNorm(mdn->A,type,&ntemp);CHKERRQ(ierr);
818       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
819     } else {
820       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
821     }
822   }
823   PetscFunctionReturn(0);
824 }
825 
826 #undef __FUNC__
827 #define __FUNC__ "MatTranspose_MPIDense"
828 int MatTranspose_MPIDense(Mat A,Mat *matout)
829 {
830   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
831   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
832   Mat          B;
833   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
834   int          j, i, ierr;
835   Scalar       *v;
836 
837   PetscFunctionBegin;
838   if (matout == PETSC_NULL && M != N) {
839     SETERRQ(PETSC_ERR_SUP,0,"Supports square matrix only in-place");
840   }
841   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);
842 
843   m = Aloc->m; n = Aloc->n; v = Aloc->v;
844   rwork = (int *) PetscMalloc(n*sizeof(int));CHKPTRQ(rwork);
845   for ( j=0; j<n; j++ ) {
846     for (i=0; i<m; i++) rwork[i] = rstart + i;
847     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES);CHKERRQ(ierr);
848     v   += m;
849   }
850   ierr = PetscFree(rwork);CHKERRQ(ierr);
851   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
852   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
853   if (matout != PETSC_NULL) {
854     *matout = B;
855   } else {
856     PetscOps *Abops;
857     MatOps   Aops;
858 
859     /* This isn't really an in-place transpose; instead we free the data structures of a */
860     ierr = PetscFree(a->rowners);CHKERRQ(ierr);
861     ierr = MatDestroy(a->A);CHKERRQ(ierr);
862     if (a->lvec) VecDestroy(a->lvec);
863     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
864     ierr = PetscFree(a);CHKERRQ(ierr);
865 
866     /*
867          This is horrible, horrible code. We need to keep the
868       bops and ops pointers of A but copy everything
869       else from B.
870     */
871     Abops   = A->bops;
872     Aops    = A->ops;
873     ierr    = PetscMemcpy(A,B,sizeof(struct _p_Mat));CHKERRQ(ierr);
874     A->bops = Abops;
875     A->ops  = Aops;
876 
877     PetscHeaderDestroy(B);
878   }
879   PetscFunctionReturn(0);
880 }
881 
882 #include "pinclude/blaslapack.h"
883 #undef __FUNC__
884 #define __FUNC__ "MatScale_MPIDense"
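/*
   A <- alpha*A: each process stores its local rows as one contiguous column-major
   block, so the scaling reduces to a single BLAS scal call on that block and
   requires no communication.
*/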
885 int MatScale_MPIDense(Scalar *alpha,Mat inA)
886 {
887   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
888   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
889   int          one = 1, nz;
890 
891   PetscFunctionBegin;
892   nz = a->m*a->n;
893   BLscal_( &nz, alpha, a->v, &one );
894   PLogFlops(nz);
895   PetscFunctionReturn(0);
896 }
897 
898 static int MatDuplicate_MPIDense(Mat,MatDuplicateOption,Mat *);
899 extern int MatGetSubMatrices_MPIDense(Mat,int,IS *,IS *,MatReuse,Mat **);
900 
901 /* -------------------------------------------------------------------*/
902 static struct _MatOps MatOps_Values = {MatSetValues_MPIDense,
903        MatGetRow_MPIDense,
904        MatRestoreRow_MPIDense,
905        MatMult_MPIDense,
906        MatMultAdd_MPIDense,
907        MatMultTrans_MPIDense,
908        MatMultTransAdd_MPIDense,
909        0,
910        0,
911        0,
912        0,
913        0,
914        0,
915        0,
916        MatTranspose_MPIDense,
917        MatGetInfo_MPIDense,0,
918        MatGetDiagonal_MPIDense,
919        MatDiagonalScale_MPIDense,
920        MatNorm_MPIDense,
921        MatAssemblyBegin_MPIDense,
922        MatAssemblyEnd_MPIDense,
923        0,
924        MatSetOption_MPIDense,
925        MatZeroEntries_MPIDense,
926        MatZeroRows_MPIDense,
927        0,
928        0,
929        0,
930        0,
931        MatGetSize_MPIDense,
932        MatGetLocalSize_MPIDense,
933        MatGetOwnershipRange_MPIDense,
934        0,
935        0,
936        MatGetArray_MPIDense,
937        MatRestoreArray_MPIDense,
938        MatDuplicate_MPIDense,
939        0,
940        0,
941        0,
942        0,
943        0,
944        MatGetSubMatrices_MPIDense,
945        0,
946        MatGetValues_MPIDense,
947        0,
948        0,
949        MatScale_MPIDense,
950        0,
951        0,
952        0,
953        MatGetBlockSize_MPIDense,
954        0,
955        0,
956        0,
957        0,
958        0,
959        0,
960        0,
961        0,
962        0,
963        MatGetSubMatrix_MPIDense,
964        0,
965        0,
966        MatGetMaps_Petsc};
967 
968 #undef __FUNC__
969 #define __FUNC__ "MatCreateMPIDense"
970 /*@C
971    MatCreateMPIDense - Creates a parallel matrix in dense format.
972 
973    Collective on MPI_Comm
974 
975    Input Parameters:
976 +  comm - MPI communicator
977 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
978 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
979 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
980 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
981 -  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
982    to control all matrix memory allocation.
983 
984    Output Parameter:
985 .  A - the matrix
986 
987    Notes:
988    The dense format is fully compatible with standard Fortran 77
989    storage by columns.
990 
991    The data input variable is intended primarily for Fortran programmers
992    who wish to allocate their own matrix memory space.  Most users should
993    set data=PETSC_NULL.
994 
995    The user MUST specify either the local or global matrix dimensions
996    (possibly both).
997 
998    Currently, the only parallel dense matrix decomposition is by rows,
999    so that n=N and each submatrix owns all of the global columns.
1000 
1001    Level: intermediate
1002 
1003 .keywords: matrix, dense, parallel
1004 
1005 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
1006 @*/
1007 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
1008 {
1009   Mat          mat;
1010   Mat_MPIDense *a;
1011   int          ierr, i,flg;
1012 
1013   PetscFunctionBegin;
1014   /* Note:  For now, when data is specified above, this assumes the user correctly
1015    allocates the local dense storage space.  We should add error checking. */
1016 
1017   *A = 0;
1018   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,"Mat",comm,MatDestroy,MatView);
1019   PLogObjectCreate(mat);
1020   mat->data         = (void *) (a = PetscNew(Mat_MPIDense));CHKPTRQ(a);
1021   ierr              = PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));CHKERRQ(ierr);
1022   mat->ops->destroy = MatDestroy_MPIDense;
1023   mat->ops->view    = MatView_MPIDense;
1024   mat->factor       = 0;
1025   mat->mapping      = 0;
1026 
1027   a->factor       = 0;
1028   mat->insertmode = NOT_SET_VALUES;
1029   ierr = MPI_Comm_rank(comm,&a->rank);CHKERRQ(ierr);
1030   ierr = MPI_Comm_size(comm,&a->size);CHKERRQ(ierr);
1031 
1032   ierr = PetscSplitOwnership(comm,&m,&M);CHKERRQ(ierr);
1033 
1034   /*
1035      The computation of n below is wrong: n should represent the local size of
1036      the vector the matrix is multiplied against (the right-hand column vector).
1037   */
1038 
1039   /* each process stores all N columns of its local rows */
1040   if (N == PETSC_DECIDE) N = n;
1041   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
1042   /*  if (n != N) SETERRQ(PETSC_ERR_SUP,0,"For now, only n=N is supported"); */
1043   a->N = mat->N = N;
1044   a->M = mat->M = M;
1045   a->m = mat->m = m;
1046   a->n = mat->n = n;
1047 
1048   /* the information in the maps duplicates the ownership information computed below;
1049      eventually we should remove the duplicated fields and rely on the maps */
1050   ierr = MapCreateMPI(comm,m,M,&mat->rmap);CHKERRQ(ierr);
1051   ierr = MapCreateMPI(comm,PETSC_DECIDE,N,&mat->cmap);CHKERRQ(ierr);
1052 
1053   /* build local table of row and column ownerships */
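  /*
     rowners[] and cowners[] each hold size+1 entries; rowners[p] is the first
     global row owned by process p, so this process owns rows rowners[rank]
     (inclusive) through rowners[rank+1] (exclusive), and similarly for columns.
  */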
1054   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int));CHKPTRQ(a->rowners);
1055   a->cowners = a->rowners + a->size + 1;
1056   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
1057   ierr = MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1058   a->rowners[0] = 0;
1059   for ( i=2; i<=a->size; i++ ) {
1060     a->rowners[i] += a->rowners[i-1];
1061   }
1062   a->rstart = a->rowners[a->rank];
1063   a->rend   = a->rowners[a->rank+1];
1064   ierr      = MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1065   a->cowners[0] = 0;
1066   for ( i=2; i<=a->size; i++ ) {
1067     a->cowners[i] += a->cowners[i-1];
1068   }
1069 
1070   ierr = MatCreateSeqDense(PETSC_COMM_SELF,m,N,data,&a->A);CHKERRQ(ierr);
1071   PLogObjectParent(mat,a->A);
1072 
1073   /* build cache (stash) for off-processor entries */
1074   a->donotstash = 0;
1075   ierr = MatStashCreate_Private(comm,1,&mat->stash);CHKERRQ(ierr);
1076 
1077   /* stuff used for matrix vector multiply */
1078   a->lvec        = 0;
1079   a->Mvctx       = 0;
1080   a->roworiented = 1;
1081 
1082   *A = mat;
1083   ierr = OptionsHasName(PETSC_NULL,"-help",&flg);CHKERRQ(ierr);
1084   if (flg) {
1085     ierr = MatPrintHelp(mat);CHKERRQ(ierr);
1086   }
1087   PetscFunctionReturn(0);
1088 }
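/*
   Example usage (a minimal sketch, not part of the library source; the matrix
   size and the values inserted are illustrative assumptions only):

       Mat    A;
       Scalar v[3];
       int    ierr,i,rstart,rend,col[3];

       ierr = MatCreateMPIDense(PETSC_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,10,10,PETSC_NULL,&A);CHKERRQ(ierr);
       ierr = MatGetOwnershipRange(A,&rstart,&rend);CHKERRQ(ierr);
       for ( i=rstart; i<rend; i++ ) {
         col[0] = (i+9) % 10; col[1] = i;   col[2] = (i+1) % 10;
         v[0]   = -1.0;       v[1]  = 2.0;  v[2]   = -1.0;
         ierr = MatSetValues(A,1,&i,3,col,v,INSERT_VALUES);CHKERRQ(ierr);
       }
       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
       ierr = MatDestroy(A);CHKERRQ(ierr);
*/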
1089 
1090 #undef __FUNC__
1091 #define __FUNC__ "MatDuplicate_MPIDense"
1092 static int MatDuplicate_MPIDense(Mat A,MatDuplicateOption cpvalues,Mat *newmat)
1093 {
1094   Mat          mat;
1095   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
1096   int          ierr;
1097   FactorCtx    *factor;
1098 
1099   PetscFunctionBegin;
1100   *newmat       = 0;
1101   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,"Mat",A->comm,MatDestroy,MatView);
1102   PLogObjectCreate(mat);
1103   mat->data         = (void *) (a = PetscNew(Mat_MPIDense));CHKPTRQ(a);
1104   ierr              = PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));CHKERRQ(ierr);
1105   mat->ops->destroy = MatDestroy_MPIDense;
1106   mat->ops->view    = MatView_MPIDense;
1107   mat->factor       = A->factor;
1108   mat->assembled    = PETSC_TRUE;
1109 
1110   a->m = mat->m = oldmat->m;
1111   a->n = mat->n = oldmat->n;
1112   a->M = mat->M = oldmat->M;
1113   a->N = mat->N = oldmat->N;
1114   if (oldmat->factor) {
1115     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx));CHKPTRQ(factor);
1116     /* copy factor contents ... add this code! */
1117   } else a->factor = 0;
1118 
1119   a->rstart       = oldmat->rstart;
1120   a->rend         = oldmat->rend;
1121   a->size         = oldmat->size;
1122   a->rank         = oldmat->rank;
1123   mat->insertmode = NOT_SET_VALUES;
1124   a->donotstash   = oldmat->donotstash;
1125   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int));CHKPTRQ(a->rowners);
1126   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
1127   ierr = PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));CHKERRQ(ierr);
1128   ierr = MatStashCreate_Private(A->comm,1,&mat->stash);CHKERRQ(ierr);
1129 
1130   ierr =  VecDuplicate(oldmat->lvec,&a->lvec);CHKERRQ(ierr);
1131   PLogObjectParent(mat,a->lvec);
1132   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx);CHKERRQ(ierr);
1133   PLogObjectParent(mat,a->Mvctx);
1134   ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A);CHKERRQ(ierr);
1135   PLogObjectParent(mat,a->A);
1136   *newmat = mat;
1137   PetscFunctionReturn(0);
1138 }
1139 
1140 #include "sys.h"
1141 
1142 #undef __FUNC__
1143 #define __FUNC__ "MatLoad_MPIDense_DenseInFile"
1144 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
1145 {
1146   int        *rowners, i,size,rank,m,ierr,nz,j;
1147   Scalar     *array,*vals,*vals_ptr;
1148   MPI_Status status;
1149 
1150   PetscFunctionBegin;
1151   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
1152   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
1153 
1154   /* determine ownership of all rows */
1155   m          = M/size + ((M % size) > rank);
1156   rowners    = (int *) PetscMalloc((size+2)*sizeof(int));CHKPTRQ(rowners);
1157   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1158   rowners[0] = 0;
1159   for ( i=2; i<=size; i++ ) {
1160     rowners[i] += rowners[i-1];
1161   }
1162 
1163   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1164   ierr = MatGetArray(*newmat,&array);CHKERRQ(ierr);
1165 
1166   if (!rank) {
1167     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) );CHKPTRQ(vals);
1168 
1169     /* read in my part of the matrix numerical values  */
1170     ierr = PetscBinaryRead(fd,vals,m*N,PETSC_SCALAR);CHKERRQ(ierr);
1171 
1172     /* insert into the matrix row by row (this is why we cannot read directly into the array) */
1173     vals_ptr = vals;
1174     for ( i=0; i<m; i++ ) {
1175       for ( j=0; j<N; j++ ) {
1176         array[i + j*m] = *vals_ptr++;
1177       }
1178     }
1179 
1180     /* read in the other processors' parts and ship them out */
1181     for ( i=1; i<size; i++ ) {
1182       nz   = (rowners[i+1] - rowners[i])*N;
1183       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1184       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);CHKERRQ(ierr);
1185     }
1186   } else {
1187     /* receive numeric values */
1188     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) );CHKPTRQ(vals);
1189 
1190     /* receive message of values*/
1191     ierr = MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);CHKERRQ(ierr);
1192 
1193     /* insert into the matrix row by row (this is why we cannot read directly into the array) */
1194     vals_ptr = vals;
1195     for ( i=0; i<m; i++ ) {
1196       for ( j=0; j<N; j++ ) {
1197         array[i + j*m] = *vals_ptr++;
1198       }
1199     }
1200   }
1201   ierr = PetscFree(rowners);CHKERRQ(ierr);
1202   ierr = PetscFree(vals);CHKERRQ(ierr);
1203   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1204   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1205   PetscFunctionReturn(0);
1206 }
1207 
1208 
1209 #undef __FUNC__
1210 #define __FUNC__ "MatLoad_MPIDense"
1211 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1212 {
1213   Mat          A;
1214   Scalar       *vals,*svals;
1215   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1216   MPI_Status   status;
1217   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1218   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1219   int          tag = ((PetscObject)viewer)->tag;
1220   int          i, nz, ierr, j,rstart, rend, fd;
1221 
1222   PetscFunctionBegin;
1223   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
1224   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
1225   if (!rank) {
1226     ierr = ViewerBinaryGetDescriptor(viewer,&fd);CHKERRQ(ierr);
1227     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT);CHKERRQ(ierr);
1228     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
1229   }
1230 
1231   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1232   M = header[1]; N = header[2]; nz = header[3];
1233 
1234   /*
1235        Handle case where matrix is stored on disk as a dense matrix
1236   */
1237   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1238     ierr = MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);CHKERRQ(ierr);
1239     PetscFunctionReturn(0);
1240   }
1241 
1242   /* determine ownership of all rows */
1243   m          = M/size + ((M % size) > rank);
1244   rowners    = (int *) PetscMalloc((size+2)*sizeof(int));CHKPTRQ(rowners);
1245   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1246   rowners[0] = 0;
1247   for ( i=2; i<=size; i++ ) {
1248     rowners[i] += rowners[i-1];
1249   }
1250   rstart = rowners[rank];
1251   rend   = rowners[rank+1];
1252 
1253   /* distribute row lengths to all processors */
1254   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) );CHKPTRQ(ourlens);
1255   offlens = ourlens + (rend-rstart);
1256   if (!rank) {
1257     rowlengths = (int*) PetscMalloc( M*sizeof(int) );CHKPTRQ(rowlengths);
1258     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT);CHKERRQ(ierr);
1259     sndcounts = (int*) PetscMalloc( size*sizeof(int) );CHKPTRQ(sndcounts);
1260     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1261     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1262     ierr = PetscFree(sndcounts);CHKERRQ(ierr);
1263   } else {
1264     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
1265   }
1266 
1267   if (!rank) {
1268     /* calculate the number of nonzeros on each processor */
1269     procsnz = (int*) PetscMalloc( size*sizeof(int) );CHKPTRQ(procsnz);
1270     ierr    = PetscMemzero(procsnz,size*sizeof(int));CHKERRQ(ierr);
1271     for ( i=0; i<size; i++ ) {
1272       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1273         procsnz[i] += rowlengths[j];
1274       }
1275     }
1276     ierr = PetscFree(rowlengths);CHKERRQ(ierr);
1277 
1278     /* determine max buffer needed and allocate it */
1279     maxnz = 0;
1280     for ( i=0; i<size; i++ ) {
1281       maxnz = PetscMax(maxnz,procsnz[i]);
1282     }
1283     cols = (int *) PetscMalloc( maxnz*sizeof(int) );CHKPTRQ(cols);
1284 
1285     /* read in my part of the matrix column indices  */
1286     nz = procsnz[0];
1287     mycols = (int *) PetscMalloc( nz*sizeof(int) );CHKPTRQ(mycols);
1288     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT);CHKERRQ(ierr);
1289 
1290     /* read in everyone else's parts and ship them off */
1291     for ( i=1; i<size; i++ ) {
1292       nz   = procsnz[i];
1293       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT);CHKERRQ(ierr);
1294       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1295     }
1296     ierr = PetscFree(cols);CHKERRQ(ierr);
1297   } else {
1298     /* determine buffer space needed for message */
1299     nz = 0;
1300     for ( i=0; i<m; i++ ) {
1301       nz += ourlens[i];
1302     }
1303     mycols = (int*) PetscMalloc( nz*sizeof(int) );CHKPTRQ(mycols);
1304 
1305     /* receive message of column indices*/
1306     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1307     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1308     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1309   }
1310 
1311   /* loop over local rows, determining number of off diagonal entries */
1312   ierr = PetscMemzero(offlens,m*sizeof(int));CHKERRQ(ierr);
1313   jj = 0;
1314   for ( i=0; i<m; i++ ) {
1315     for ( j=0; j<ourlens[i]; j++ ) {
1316       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1317       jj++;
1318     }
1319   }
1320 
1321   /* create our matrix */
1322   for ( i=0; i<m; i++ ) {
1323     ourlens[i] -= offlens[i];
1324   }
1325   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1326   A = *newmat;
1327   for ( i=0; i<m; i++ ) {
1328     ourlens[i] += offlens[i];
1329   }
1330 
1331   if (!rank) {
1332     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) );CHKPTRQ(vals);
1333 
1334     /* read in my part of the matrix numerical values  */
1335     nz = procsnz[0];
1336     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1337 
1338     /* insert into matrix */
1339     jj      = rstart;
1340     smycols = mycols;
1341     svals   = vals;
1342     for ( i=0; i<m; i++ ) {
1343       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1344       smycols += ourlens[i];
1345       svals   += ourlens[i];
1346       jj++;
1347     }
1348 
1349     /* read in other processors and ship out */
1350     for ( i=1; i<size; i++ ) {
1351       nz   = procsnz[i];
1352       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1353       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1354     }
1355     ierr = PetscFree(procsnz);CHKERRQ(ierr);
1356   } else {
1357     /* receive numeric values */
1358     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) );CHKPTRQ(vals);
1359 
1360     /* receive message of values*/
1361     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1362     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1363     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1364 
1365     /* insert into matrix */
1366     jj      = rstart;
1367     smycols = mycols;
1368     svals   = vals;
1369     for ( i=0; i<m; i++ ) {
1370       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1371       smycols += ourlens[i];
1372       svals   += ourlens[i];
1373       jj++;
1374     }
1375   }
1376   ierr = PetscFree(ourlens);CHKERRQ(ierr);
1377   ierr = PetscFree(vals);CHKERRQ(ierr);
1378   ierr = PetscFree(mycols);CHKERRQ(ierr);
1379   ierr = PetscFree(rowners);CHKERRQ(ierr);
1380 
1381   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1382   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1383   PetscFunctionReturn(0);
1384 }
1385 
1386 
1387 
1388 
1389 
1390