xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 4db511f1d1b98f37717f3ecaf35cce8ecdb684f9)
1 /*$Id: mpidense.c,v 1.159 2001/08/10 03:30:41 bsmith Exp $*/
2 
3 /*
4    Basic functions for parallel dense matrices.
5 */
6 
7 #include "src/mat/impls/dense/mpi/mpidense.h"
8 #include "src/vec/vecimpl.h"
9 
10 EXTERN_C_BEGIN
11 #undef __FUNCT__
12 #define __FUNCT__ "MatGetDiagonalBlock_MPIDense"
13 int MatGetDiagonalBlock_MPIDense(Mat A,PetscTruth *iscopy,MatReuse reuse,Mat *B)
14 {
15   Mat_MPIDense *mdn = (Mat_MPIDense*)A->data;
16   int          m = A->m,rstart = mdn->rstart,ierr;
17   PetscScalar  *array;
18   MPI_Comm     comm;
19 
20   PetscFunctionBegin;
21   if (A->M != A->N) SETERRQ(PETSC_ERR_SUP,"Only square matrices supported.");
22 
23   /* The reuse aspect is not implemented efficiently */
24   if (reuse) { ierr = MatDestroy(*B);CHKERRQ(ierr);}
25 
26   ierr = PetscObjectGetComm((PetscObject)(mdn->A),&comm);CHKERRQ(ierr);
27   ierr = MatGetArray(mdn->A,&array);CHKERRQ(ierr);
28   ierr = MatCreateSeqDense(comm,m,m,array+m*rstart,B);CHKERRQ(ierr);
29   ierr = MatRestoreArray(mdn->A,&array);CHKERRQ(ierr);
30   ierr = MatAssemblyBegin(*B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
31   ierr = MatAssemblyEnd(*B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
32 
33   *iscopy = PETSC_TRUE;
34   PetscFunctionReturn(0);
35 }
36 EXTERN_C_END
37 
38 EXTERN int MatSetUpMultiply_MPIDense(Mat);
39 
40 #undef __FUNCT__
41 #define __FUNCT__ "MatSetValues_MPIDense"
42 int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,PetscScalar *v,InsertMode addv)
43 {
44   Mat_MPIDense *A = (Mat_MPIDense*)mat->data;
45   int          ierr,i,j,rstart = A->rstart,rend = A->rend,row;
46   PetscTruth   roworiented = A->roworiented;
47 
48   PetscFunctionBegin;
49   for (i=0; i<m; i++) {
50     if (idxm[i] < 0) continue;
51     if (idxm[i] >= mat->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Row too large");
52     if (idxm[i] >= rstart && idxm[i] < rend) {
53       row = idxm[i] - rstart;
54       if (roworiented) {
55         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv);CHKERRQ(ierr);
56       } else {
57         for (j=0; j<n; j++) {
58           if (idxn[j] < 0) continue;
59           if (idxn[j] >= mat->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Column too large");
60           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
61         }
62       }
63     } else {
64       if (!A->donotstash) {
65         if (roworiented) {
66           ierr = MatStashValuesRow_Private(&mat->stash,idxm[i],n,idxn,v+i*n);CHKERRQ(ierr);
67         } else {
68           ierr = MatStashValuesCol_Private(&mat->stash,idxm[i],n,idxn,v+i,m);CHKERRQ(ierr);
69         }
70       }
71     }
72   }
73   PetscFunctionReturn(0);
74 }
75 
76 #undef __FUNCT__
77 #define __FUNCT__ "MatGetValues_MPIDense"
78 int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,PetscScalar *v)
79 {
80   Mat_MPIDense *mdn = (Mat_MPIDense*)mat->data;
81   int          ierr,i,j,rstart = mdn->rstart,rend = mdn->rend,row;
82 
83   PetscFunctionBegin;
84   for (i=0; i<m; i++) {
85     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Negative row");
86     if (idxm[i] >= mat->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Row too large");
87     if (idxm[i] >= rstart && idxm[i] < rend) {
88       row = idxm[i] - rstart;
89       for (j=0; j<n; j++) {
90         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Negative column");
91         if (idxn[j] >= mat->N) {
92           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Column too large");
93         }
94         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j);CHKERRQ(ierr);
95       }
96     } else {
97       SETERRQ(PETSC_ERR_SUP,"Only local values currently supported");
98     }
99   }
100   PetscFunctionReturn(0);
101 }
102 
103 #undef __FUNCT__
104 #define __FUNCT__ "MatGetArray_MPIDense"
105 int MatGetArray_MPIDense(Mat A,PetscScalar **array)
106 {
107   Mat_MPIDense *a = (Mat_MPIDense*)A->data;
108   int          ierr;
109 
110   PetscFunctionBegin;
111   ierr = MatGetArray(a->A,array);CHKERRQ(ierr);
112   PetscFunctionReturn(0);
113 }
114 
115 #undef __FUNCT__
116 #define __FUNCT__ "MatGetSubMatrix_MPIDense"
117 static int MatGetSubMatrix_MPIDense(Mat A,IS isrow,IS iscol,int cs,MatReuse scall,Mat *B)
118 {
119   Mat_MPIDense *mat = (Mat_MPIDense*)A->data,*newmatd;
120   Mat_SeqDense *lmat = (Mat_SeqDense*)mat->A->data;
121   int          i,j,ierr,*irow,*icol,rstart,rend,nrows,ncols,nlrows,nlcols;
122   PetscScalar  *av,*bv,*v = lmat->v;
123   Mat          newmat;
124 
125   PetscFunctionBegin;
126   ierr = ISGetIndices(isrow,&irow);CHKERRQ(ierr);
127   ierr = ISGetIndices(iscol,&icol);CHKERRQ(ierr);
128   ierr = ISGetLocalSize(isrow,&nrows);CHKERRQ(ierr);
129   ierr = ISGetLocalSize(iscol,&ncols);CHKERRQ(ierr);
130 
131   /* No parallel redistribution is currently supported! We should really check each index
132      set to confirm that it is OK.  ... Currently supports only submatrices with the same
133      partitioning as the original matrix! */
134 
135   ierr = MatGetLocalSize(A,&nlrows,&nlcols);CHKERRQ(ierr);
136   ierr = MatGetOwnershipRange(A,&rstart,&rend);CHKERRQ(ierr);
137 
138   /* Check submatrix call */
139   if (scall == MAT_REUSE_MATRIX) {
140     /* SETERRQ(PETSC_ERR_ARG_SIZ,"Reused submatrix wrong size"); */
141     /* Really need to test the row and column sizes! */
142     newmat = *B;
143   } else {
144     /* Create and fill new matrix */
145     ierr = MatCreateMPIDense(A->comm,nrows,cs,PETSC_DECIDE,ncols,PETSC_NULL,&newmat);CHKERRQ(ierr);
146   }
147 
148   /* Now extract the data pointers and do the copy, column at a time */
149   newmatd = (Mat_MPIDense*)newmat->data;
150   bv      = ((Mat_SeqDense *)newmatd->A->data)->v;
151 
152   for (i=0; i<ncols; i++) {
153     av = v + nlrows*icol[i];
154     for (j=0; j<nrows; j++) {
155       *bv++ = av[irow[j] - rstart];
156     }
157   }
158 
159   /* Assemble the matrices so that the correct flags are set */
160   ierr = MatAssemblyBegin(newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
161   ierr = MatAssemblyEnd(newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
162 
163   /* Free work space */
164   ierr = ISRestoreIndices(isrow,&irow);CHKERRQ(ierr);
165   ierr = ISRestoreIndices(iscol,&icol);CHKERRQ(ierr);
166   *B = newmat;
167   PetscFunctionReturn(0);
168 }
169 
170 #undef __FUNCT__
171 #define __FUNCT__ "MatRestoreArray_MPIDense"
172 int MatRestoreArray_MPIDense(Mat A,PetscScalar **array)
173 {
174   PetscFunctionBegin;
175   PetscFunctionReturn(0);
176 }
177 
178 #undef __FUNCT__
179 #define __FUNCT__ "MatAssemblyBegin_MPIDense"
180 int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
181 {
182   Mat_MPIDense *mdn = (Mat_MPIDense*)mat->data;
183   MPI_Comm     comm = mat->comm;
184   int          ierr,nstash,reallocs;
185   InsertMode   addv;
186 
187   PetscFunctionBegin;
188   /* make sure all processors are either in INSERTMODE or ADDMODE */
189   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
190   if (addv == (ADD_VALUES|INSERT_VALUES)) {
191     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,"Cannot mix adds/inserts on different procs");
192   }
193   mat->insertmode = addv; /* in case this processor had no cache */
194 
195   ierr = MatStashScatterBegin_Private(&mat->stash,mdn->rowners);CHKERRQ(ierr);
196   ierr = MatStashGetInfo_Private(&mat->stash,&nstash,&reallocs);CHKERRQ(ierr);
197   PetscLogInfo(mdn->A,"MatAssemblyBegin_MPIDense:Stash has %d entries, uses %d mallocs.\n",nstash,reallocs);
198   PetscFunctionReturn(0);
199 }
200 
201 #undef __FUNCT__
202 #define __FUNCT__ "MatAssemblyEnd_MPIDense"
203 int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
204 {
205   Mat_MPIDense *mdn=(Mat_MPIDense*)mat->data;
206   int          i,n,ierr,*row,*col,flg,j,rstart,ncols;
207   PetscScalar  *val;
208   InsertMode   addv=mat->insertmode;
209 
210   PetscFunctionBegin;
211   /*  wait on receives */
212   while (1) {
213     ierr = MatStashScatterGetMesg_Private(&mat->stash,&n,&row,&col,&val,&flg);CHKERRQ(ierr);
214     if (!flg) break;
215 
216     for (i=0; i<n;) {
217       /* Now identify the consecutive vals belonging to the same row */
218       for (j=i,rstart=row[j]; j<n; j++) { if (row[j] != rstart) break; }
219       if (j < n) ncols = j-i;
220       else       ncols = n-i;
221       /* Now assemble all these values with a single function call */
222       ierr = MatSetValues_MPIDense(mat,1,row+i,ncols,col+i,val+i,addv);CHKERRQ(ierr);
223       i = j;
224     }
225   }
226   ierr = MatStashScatterEnd_Private(&mat->stash);CHKERRQ(ierr);
227 
228   ierr = MatAssemblyBegin(mdn->A,mode);CHKERRQ(ierr);
229   ierr = MatAssemblyEnd(mdn->A,mode);CHKERRQ(ierr);
230 
231   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
232     ierr = MatSetUpMultiply_MPIDense(mat);CHKERRQ(ierr);
233   }
234   PetscFunctionReturn(0);
235 }
236 
237 #undef __FUNCT__
238 #define __FUNCT__ "MatZeroEntries_MPIDense"
239 int MatZeroEntries_MPIDense(Mat A)
240 {
241   int          ierr;
242   Mat_MPIDense *l = (Mat_MPIDense*)A->data;
243 
244   PetscFunctionBegin;
245   ierr = MatZeroEntries(l->A);CHKERRQ(ierr);
246   PetscFunctionReturn(0);
247 }
248 
249 #undef __FUNCT__
250 #define __FUNCT__ "MatGetBlockSize_MPIDense"
251 int MatGetBlockSize_MPIDense(Mat A,int *bs)
252 {
253   PetscFunctionBegin;
254   *bs = 1;
255   PetscFunctionReturn(0);
256 }
257 
258 /* The code does not handle the diagonal entries correctly unless the
259    matrix is square and the column and row ownerships are identical.
260    This is a BUG. The only way to fix it seems to be to access
261    mdn->A and mdn->B directly and not through the MatZeroRows()
262    routine.
263 */
264 #undef __FUNCT__
265 #define __FUNCT__ "MatZeroRows_MPIDense"
266 int MatZeroRows_MPIDense(Mat A,IS is,PetscScalar *diag)
267 {
268   Mat_MPIDense   *l = (Mat_MPIDense*)A->data;
269   int            i,ierr,N,*rows,*owners = l->rowners,size = l->size;
270   int            *nprocs,j,idx,nsends;
271   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
272   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
273   int            *lens,imdex,*lrows,*values;
274   MPI_Comm       comm = A->comm;
275   MPI_Request    *send_waits,*recv_waits;
276   MPI_Status     recv_status,*send_status;
277   IS             istmp;
278   PetscTruth     found;
279 
280   PetscFunctionBegin;
281   ierr = ISGetLocalSize(is,&N);CHKERRQ(ierr);
282   ierr = ISGetIndices(is,&rows);CHKERRQ(ierr);
283 
284   /*  first count number of contributors to each processor */
285   ierr  = PetscMalloc(2*size*sizeof(int),&nprocs);CHKERRQ(ierr);
286   ierr  = PetscMemzero(nprocs,2*size*sizeof(int));CHKERRQ(ierr);
287   ierr  = PetscMalloc((N+1)*sizeof(int),&owner);CHKERRQ(ierr); /* see note*/
288   for (i=0; i<N; i++) {
289     idx = rows[i];
290     found = PETSC_FALSE;
291     for (j=0; j<size; j++) {
292       if (idx >= owners[j] && idx < owners[j+1]) {
293         nprocs[2*j]++; nprocs[2*j+1] = 1; owner[i] = j; found = PETSC_TRUE; break;
294       }
295     }
296     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Index out of range");
297   }
298   nsends = 0;  for (i=0; i<size; i++) { nsends += nprocs[2*i+1];}
299 
300   /* inform other processors of number of messages and max length*/
301   ierr = PetscMaxSum(comm,nprocs,&nmax,&nrecvs);CHKERRQ(ierr);
302 
303   /* post receives:   */
304   ierr = PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int),&rvalues);CHKERRQ(ierr);
305   ierr = PetscMalloc((nrecvs+1)*sizeof(MPI_Request),&recv_waits);CHKERRQ(ierr);
306   for (i=0; i<nrecvs; i++) {
307     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
308   }
309 
310   /* do sends:
311       1) starts[i] gives the starting index in svalues for stuff going to
312          the ith processor
313   */
314   ierr = PetscMalloc((N+1)*sizeof(int),&svalues);CHKERRQ(ierr);
315   ierr = PetscMalloc((nsends+1)*sizeof(MPI_Request),&send_waits);CHKERRQ(ierr);
316   ierr = PetscMalloc((size+1)*sizeof(int),&starts);CHKERRQ(ierr);
317   starts[0]  = 0;
318   for (i=1; i<size; i++) { starts[i] = starts[i-1] + nprocs[2*i-2];}
319   for (i=0; i<N; i++) {
320     svalues[starts[owner[i]]++] = rows[i];
321   }
322   ISRestoreIndices(is,&rows);
323 
324   starts[0] = 0;
325   for (i=1; i<size+1; i++) { starts[i] = starts[i-1] + nprocs[2*i-2];}
326   count = 0;
327   for (i=0; i<size; i++) {
328     if (nprocs[2*i+1]) {
329       ierr = MPI_Isend(svalues+starts[i],nprocs[2*i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
330     }
331   }
332   ierr = PetscFree(starts);CHKERRQ(ierr);
333 
334   base = owners[rank];
335 
336   /*  wait on receives */
337   ierr   = PetscMalloc(2*(nrecvs+1)*sizeof(int),&lens);CHKERRQ(ierr);
338   source = lens + nrecvs;
339   count  = nrecvs; slen = 0;
340   while (count) {
341     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
342     /* unpack receives into our local space */
343     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
344     source[imdex]  = recv_status.MPI_SOURCE;
345     lens[imdex]  = n;
346     slen += n;
347     count--;
348   }
349   ierr = PetscFree(recv_waits);CHKERRQ(ierr);
350 
351   /* move the data into the send scatter */
352   ierr = PetscMalloc((slen+1)*sizeof(int),&lrows);CHKERRQ(ierr);
353   count = 0;
354   for (i=0; i<nrecvs; i++) {
355     values = rvalues + i*nmax;
356     for (j=0; j<lens[i]; j++) {
357       lrows[count++] = values[j] - base;
358     }
359   }
360   ierr = PetscFree(rvalues);CHKERRQ(ierr);
361   ierr = PetscFree(lens);CHKERRQ(ierr);
362   ierr = PetscFree(owner);CHKERRQ(ierr);
363   ierr = PetscFree(nprocs);CHKERRQ(ierr);
364 
365   /* actually zap the local rows */
366   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
367   PetscLogObjectParent(A,istmp);
368   ierr = PetscFree(lrows);CHKERRQ(ierr);
369   ierr = MatZeroRows(l->A,istmp,diag);CHKERRQ(ierr);
370   ierr = ISDestroy(istmp);CHKERRQ(ierr);
371 
372   /* wait on sends */
373   if (nsends) {
374     ierr = PetscMalloc(nsends*sizeof(MPI_Status),&send_status);CHKERRQ(ierr);
375     ierr = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
376     ierr = PetscFree(send_status);CHKERRQ(ierr);
377   }
378   ierr = PetscFree(send_waits);CHKERRQ(ierr);
379   ierr = PetscFree(svalues);CHKERRQ(ierr);
380 
381   PetscFunctionReturn(0);
382 }
383 
384 #undef __FUNCT__
385 #define __FUNCT__ "MatMult_MPIDense"
386 int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
387 {
388   Mat_MPIDense *mdn = (Mat_MPIDense*)mat->data;
389   int          ierr;
390 
391   PetscFunctionBegin;
392   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
393   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
394   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy);CHKERRQ(ierr);
395   PetscFunctionReturn(0);
396 }
397 
398 #undef __FUNCT__
399 #define __FUNCT__ "MatMultAdd_MPIDense"
400 int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
401 {
402   Mat_MPIDense *mdn = (Mat_MPIDense*)mat->data;
403   int          ierr;
404 
405   PetscFunctionBegin;
406   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
407   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
408   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz);CHKERRQ(ierr);
409   PetscFunctionReturn(0);
410 }
411 
412 #undef __FUNCT__
413 #define __FUNCT__ "MatMultTranspose_MPIDense"
414 int MatMultTranspose_MPIDense(Mat A,Vec xx,Vec yy)
415 {
416   Mat_MPIDense *a = (Mat_MPIDense*)A->data;
417   int          ierr;
418   PetscScalar  zero = 0.0;
419 
420   PetscFunctionBegin;
421   ierr = VecSet(&zero,yy);CHKERRQ(ierr);
422   ierr = MatMultTranspose_SeqDense(a->A,xx,a->lvec);CHKERRQ(ierr);
423   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
424   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
425   PetscFunctionReturn(0);
426 }
427 
428 #undef __FUNCT__
429 #define __FUNCT__ "MatMultTransposeAdd_MPIDense"
430 int MatMultTransposeAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
431 {
432   Mat_MPIDense *a = (Mat_MPIDense*)A->data;
433   int          ierr;
434 
435   PetscFunctionBegin;
436   ierr = VecCopy(yy,zz);CHKERRQ(ierr);
437   ierr = MatMultTranspose_SeqDense(a->A,xx,a->lvec);CHKERRQ(ierr);
438   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
439   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
440   PetscFunctionReturn(0);
441 }
442 
443 #undef __FUNCT__
444 #define __FUNCT__ "MatGetDiagonal_MPIDense"
445 int MatGetDiagonal_MPIDense(Mat A,Vec v)
446 {
447   Mat_MPIDense *a = (Mat_MPIDense*)A->data;
448   Mat_SeqDense *aloc = (Mat_SeqDense*)a->A->data;
449   int          ierr,len,i,n,m = A->m,radd;
450   PetscScalar  *x,zero = 0.0;
451 
452   PetscFunctionBegin;
453   ierr = VecSet(&zero,v);CHKERRQ(ierr);
454   ierr = VecGetArray(v,&x);CHKERRQ(ierr);
455   ierr = VecGetSize(v,&n);CHKERRQ(ierr);
456   if (n != A->M) SETERRQ(PETSC_ERR_ARG_SIZ,"Nonconforming mat and vec");
457   len  = PetscMin(a->A->m,a->A->n);
458   radd = a->rstart*m;
459   for (i=0; i<len; i++) {
460     x[i] = aloc->v[radd + i*m + i];
461   }
462   ierr = VecRestoreArray(v,&x);CHKERRQ(ierr);
463   PetscFunctionReturn(0);
464 }
465 
466 #undef __FUNCT__
467 #define __FUNCT__ "MatDestroy_MPIDense"
468 int MatDestroy_MPIDense(Mat mat)
469 {
470   Mat_MPIDense *mdn = (Mat_MPIDense*)mat->data;
471   int          ierr;
472 
473   PetscFunctionBegin;
474 
475 #if defined(PETSC_USE_LOG)
476   PetscLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",mat->M,mat->N);
477 #endif
478   ierr = MatStashDestroy_Private(&mat->stash);CHKERRQ(ierr);
479   ierr = PetscFree(mdn->rowners);CHKERRQ(ierr);
480   ierr = MatDestroy(mdn->A);CHKERRQ(ierr);
481   if (mdn->lvec)   VecDestroy(mdn->lvec);
482   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
483   if (mdn->factor) {
484     if (mdn->factor->temp)   {ierr = PetscFree(mdn->factor->temp);CHKERRQ(ierr);}
485     if (mdn->factor->tag)    {ierr = PetscFree(mdn->factor->tag);CHKERRQ(ierr);}
486     if (mdn->factor->pivots) {ierr = PetscFree(mdn->factor->pivots);CHKERRQ(ierr);}
487     ierr = PetscFree(mdn->factor);CHKERRQ(ierr);
488   }
489   ierr = PetscFree(mdn);CHKERRQ(ierr);
490   PetscFunctionReturn(0);
491 }
492 
493 #undef __FUNCT__
494 #define __FUNCT__ "MatView_MPIDense_Binary"
495 static int MatView_MPIDense_Binary(Mat mat,PetscViewer viewer)
496 {
497   Mat_MPIDense *mdn = (Mat_MPIDense*)mat->data;
498   int          ierr;
499 
500   PetscFunctionBegin;
501   if (mdn->size == 1) {
502     ierr = MatView(mdn->A,viewer);CHKERRQ(ierr);
503   }
504   else SETERRQ(PETSC_ERR_SUP,"Only uniprocessor output supported");
505   PetscFunctionReturn(0);
506 }
507 
508 #undef __FUNCT__
509 #define __FUNCT__ "MatView_MPIDense_ASCIIorDraworSocket"
510 static int MatView_MPIDense_ASCIIorDraworSocket(Mat mat,PetscViewer viewer)
511 {
512   Mat_MPIDense      *mdn = (Mat_MPIDense*)mat->data;
513   int               ierr,size = mdn->size,rank = mdn->rank;
514   PetscViewerType   vtype;
515   PetscTruth        isascii,isdraw;
516   PetscViewer       sviewer;
517   PetscViewerFormat format;
518 
519   PetscFunctionBegin;
520   ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_ASCII,&isascii);CHKERRQ(ierr);
521   ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_DRAW,&isdraw);CHKERRQ(ierr);
522   if (isascii) {
523     ierr = PetscViewerGetType(viewer,&vtype);CHKERRQ(ierr);
524     ierr = PetscViewerGetFormat(viewer,&format);CHKERRQ(ierr);
525     if (format == PETSC_VIEWER_ASCII_INFO_DETAIL) {
526       MatInfo info;
527       ierr = MatGetInfo(mat,MAT_LOCAL,&info);CHKERRQ(ierr);
528       ierr = PetscViewerASCIISynchronizedPrintf(viewer,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mat->m,
529                    (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);CHKERRQ(ierr);
530       ierr = PetscViewerFlush(viewer);CHKERRQ(ierr);
531       ierr = VecScatterView(mdn->Mvctx,viewer);CHKERRQ(ierr);
532       PetscFunctionReturn(0);
533     } else if (format == PETSC_VIEWER_ASCII_INFO) {
534       PetscFunctionReturn(0);
535     }
536   } else if (isdraw) {
537     PetscDraw       draw;
538     PetscTruth isnull;
539 
540     ierr = PetscViewerDrawGetDraw(viewer,0,&draw);CHKERRQ(ierr);
541     ierr = PetscDrawIsNull(draw,&isnull);CHKERRQ(ierr);
542     if (isnull) PetscFunctionReturn(0);
543   }
544 
545   if (size == 1) {
546     ierr = MatView(mdn->A,viewer);CHKERRQ(ierr);
547   } else {
548     /* assemble the entire matrix onto the first processor. */
549     Mat          A;
550     int          M = mat->M,N = mat->N,m,row,i,nz,*cols;
551     PetscScalar  *vals;
552 
553     if (!rank) {
554       ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A);CHKERRQ(ierr);
555     } else {
556       ierr = MatCreateMPIDense(mat->comm,0,0,M,N,PETSC_NULL,&A);CHKERRQ(ierr);
557     }
558     PetscLogObjectParent(mat,A);
559 
560     /* Copy the matrix ... This isn't the most efficient means,
561        but it's quick for now */
562     row = mdn->rstart; m = mdn->A->m;
563     for (i=0; i<m; i++) {
564       ierr = MatGetRow(mat,row,&nz,&cols,&vals);CHKERRQ(ierr);
565       ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES);CHKERRQ(ierr);
566       ierr = MatRestoreRow(mat,row,&nz,&cols,&vals);CHKERRQ(ierr);
567       row++;
568     }
569 
570     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
571     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
572     ierr = PetscViewerGetSingleton(viewer,&sviewer);CHKERRQ(ierr);
573     if (!rank) {
574       ierr = MatView(((Mat_MPIDense*)(A->data))->A,sviewer);CHKERRQ(ierr);
575     }
576     ierr = PetscViewerRestoreSingleton(viewer,&sviewer);CHKERRQ(ierr);
577     ierr = PetscViewerFlush(viewer);CHKERRQ(ierr);
578     ierr = MatDestroy(A);CHKERRQ(ierr);
579   }
580   PetscFunctionReturn(0);
581 }
582 
583 #undef __FUNCT__
584 #define __FUNCT__ "MatView_MPIDense"
585 int MatView_MPIDense(Mat mat,PetscViewer viewer)
586 {
587   int        ierr;
588   PetscTruth isascii,isbinary,isdraw,issocket;
589 
590   PetscFunctionBegin;
591 
592   ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_ASCII,&isascii);CHKERRQ(ierr);
593   ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_BINARY,&isbinary);CHKERRQ(ierr);
594   ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_SOCKET,&issocket);CHKERRQ(ierr);
595   ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_DRAW,&isdraw);CHKERRQ(ierr);
596 
597   if (isascii || issocket || isdraw) {
598     ierr = MatView_MPIDense_ASCIIorDraworSocket(mat,viewer);CHKERRQ(ierr);
599   } else if (isbinary) {
600     ierr = MatView_MPIDense_Binary(mat,viewer);CHKERRQ(ierr);
601   } else {
602     SETERRQ1(1,"Viewer type %s not supported by MPI dense matrix",((PetscObject)viewer)->type_name);
603   }
604   PetscFunctionReturn(0);
605 }
606 
607 #undef __FUNCT__
608 #define __FUNCT__ "MatGetInfo_MPIDense"
609 int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
610 {
611   Mat_MPIDense *mat = (Mat_MPIDense*)A->data;
612   Mat          mdn = mat->A;
613   int          ierr;
614   PetscReal    isend[5],irecv[5];
615 
616   PetscFunctionBegin;
617   info->rows_global    = (double)A->M;
618   info->columns_global = (double)A->N;
619   info->rows_local     = (double)A->m;
620   info->columns_local  = (double)A->N;
621   info->block_size     = 1.0;
622   ierr = MatGetInfo(mdn,MAT_LOCAL,info);CHKERRQ(ierr);
623   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
624   isend[3] = info->memory;  isend[4] = info->mallocs;
625   if (flag == MAT_LOCAL) {
626     info->nz_used      = isend[0];
627     info->nz_allocated = isend[1];
628     info->nz_unneeded  = isend[2];
629     info->memory       = isend[3];
630     info->mallocs      = isend[4];
631   } else if (flag == MAT_GLOBAL_MAX) {
632     ierr = MPI_Allreduce(isend,irecv,5,MPIU_REAL,MPI_MAX,A->comm);CHKERRQ(ierr);
633     info->nz_used      = irecv[0];
634     info->nz_allocated = irecv[1];
635     info->nz_unneeded  = irecv[2];
636     info->memory       = irecv[3];
637     info->mallocs      = irecv[4];
638   } else if (flag == MAT_GLOBAL_SUM) {
639     ierr = MPI_Allreduce(isend,irecv,5,MPIU_REAL,MPI_SUM,A->comm);CHKERRQ(ierr);
640     info->nz_used      = irecv[0];
641     info->nz_allocated = irecv[1];
642     info->nz_unneeded  = irecv[2];
643     info->memory       = irecv[3];
644     info->mallocs      = irecv[4];
645   }
646   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
647   info->fill_ratio_needed = 0;
648   info->factor_mallocs    = 0;
649   PetscFunctionReturn(0);
650 }
651 
652 #undef __FUNCT__
653 #define __FUNCT__ "MatSetOption_MPIDense"
654 int MatSetOption_MPIDense(Mat A,MatOption op)
655 {
656   Mat_MPIDense *a = (Mat_MPIDense*)A->data;
657   int          ierr;
658 
659   PetscFunctionBegin;
660   switch (op) {
661   case MAT_NO_NEW_NONZERO_LOCATIONS:
662   case MAT_YES_NEW_NONZERO_LOCATIONS:
663   case MAT_NEW_NONZERO_LOCATION_ERR:
664   case MAT_NEW_NONZERO_ALLOCATION_ERR:
665   case MAT_COLUMNS_SORTED:
666   case MAT_COLUMNS_UNSORTED:
667     ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
668     break;
669   case MAT_ROW_ORIENTED:
670     a->roworiented = PETSC_TRUE;
671     ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
672     break;
673   case MAT_ROWS_SORTED:
674   case MAT_ROWS_UNSORTED:
675   case MAT_YES_NEW_DIAGONALS:
676   case MAT_USE_HASH_TABLE:
677     PetscLogInfo(A,"MatSetOption_MPIDense:Option ignored\n");
678     break;
679   case MAT_COLUMN_ORIENTED:
680     a->roworiented = PETSC_FALSE;
681     ierr = MatSetOption(a->A,op);CHKERRQ(ierr);
682     break;
683   case MAT_IGNORE_OFF_PROC_ENTRIES:
684     a->donotstash = PETSC_TRUE;
685     break;
686   case MAT_NO_NEW_DIAGONALS:
687     SETERRQ(PETSC_ERR_SUP,"MAT_NO_NEW_DIAGONALS");
688   default:
689     SETERRQ(PETSC_ERR_SUP,"unknown option");
690   }
691   PetscFunctionReturn(0);
692 }
693 
694 #undef __FUNCT__
695 #define __FUNCT__ "MatGetRow_MPIDense"
696 int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,PetscScalar **v)
697 {
698   Mat_MPIDense *mat = (Mat_MPIDense*)A->data;
699   int          lrow,rstart = mat->rstart,rend = mat->rend,ierr;
700 
701   PetscFunctionBegin;
702   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_SUP,"Only local rows supported");
703   lrow = row - rstart;
704   ierr = MatGetRow(mat->A,lrow,nz,idx,v);CHKERRQ(ierr);
705   PetscFunctionReturn(0);
706 }
707 
708 #undef __FUNCT__
709 #define __FUNCT__ "MatRestoreRow_MPIDense"
710 int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,PetscScalar **v)
711 {
712   int ierr;
713 
714   PetscFunctionBegin;
715   if (idx) {ierr = PetscFree(*idx);CHKERRQ(ierr);}
716   if (v) {ierr = PetscFree(*v);CHKERRQ(ierr);}
717   PetscFunctionReturn(0);
718 }
719 
720 #undef __FUNCT__
721 #define __FUNCT__ "MatDiagonalScale_MPIDense"
722 int MatDiagonalScale_MPIDense(Mat A,Vec ll,Vec rr)
723 {
724   Mat_MPIDense *mdn = (Mat_MPIDense*)A->data;
725   Mat_SeqDense *mat = (Mat_SeqDense*)mdn->A->data;
726   PetscScalar  *l,*r,x,*v;
727   int          ierr,i,j,s2a,s3a,s2,s3,m=mdn->A->m,n=mdn->A->n;
728 
729   PetscFunctionBegin;
730   ierr = MatGetLocalSize(A,&s2,&s3);CHKERRQ(ierr);
731   if (ll) {
732     ierr = VecGetLocalSize(ll,&s2a);CHKERRQ(ierr);
733     if (s2a != s2) SETERRQ(PETSC_ERR_ARG_SIZ,"Left scaling vector non-conforming local size");
734     ierr = VecGetArray(ll,&l);CHKERRQ(ierr);
735     for (i=0; i<m; i++) {
736       x = l[i];
737       v = mat->v + i;
738       for (j=0; j<n; j++) { (*v) *= x; v+= m;}
739     }
740     ierr = VecRestoreArray(ll,&l);CHKERRQ(ierr);
741     PetscLogFlops(n*m);
742   }
743   if (rr) {
744     ierr = VecGetSize(rr,&s3a);CHKERRQ(ierr);
745     if (s3a != s3) SETERRQ(PETSC_ERR_ARG_SIZ,"Right scaling vec non-conforming local size");
746     ierr = VecScatterBegin(rr,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
747     ierr = VecScatterEnd(rr,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
748     ierr = VecGetArray(mdn->lvec,&r);CHKERRQ(ierr);
749     for (i=0; i<n; i++) {
750       x = r[i];
751       v = mat->v + i*m;
752       for (j=0; j<m; j++) { (*v++) *= x;}
753     }
754     ierr = VecRestoreArray(mdn->lvec,&r);CHKERRQ(ierr);
755     PetscLogFlops(n*m);
756   }
757   PetscFunctionReturn(0);
758 }
759 
760 #undef __FUNCT__
761 #define __FUNCT__ "MatNorm_MPIDense"
762 int MatNorm_MPIDense(Mat A,NormType type,PetscReal *nrm)
763 {
764   Mat_MPIDense *mdn = (Mat_MPIDense*)A->data;
765   Mat_SeqDense *mat = (Mat_SeqDense*)mdn->A->data;
766   int          ierr,i,j;
767   PetscReal    sum = 0.0;
768   PetscScalar  *v = mat->v;
769 
770   PetscFunctionBegin;
771   if (mdn->size == 1) {
772     ierr =  MatNorm(mdn->A,type,nrm);CHKERRQ(ierr);
773   } else {
774     if (type == NORM_FROBENIUS) {
775       for (i=0; i<mdn->A->n*mdn->A->m; i++) {
776 #if defined(PETSC_USE_COMPLEX)
777         sum += PetscRealPart(PetscConj(*v)*(*v)); v++;
778 #else
779         sum += (*v)*(*v); v++;
780 #endif
781       }
782       ierr = MPI_Allreduce(&sum,nrm,1,MPIU_REAL,MPI_SUM,A->comm);CHKERRQ(ierr);
783       *nrm = sqrt(*nrm);
784       PetscLogFlops(2*mdn->A->n*mdn->A->m);
785     } else if (type == NORM_1) {
786       PetscReal *tmp,*tmp2;
787       ierr = PetscMalloc(2*A->N*sizeof(PetscReal),&tmp);CHKERRQ(ierr);
788       tmp2 = tmp + A->N;
789       ierr = PetscMemzero(tmp,2*A->N*sizeof(PetscReal));CHKERRQ(ierr);
790       *nrm = 0.0;
791       v = mat->v;
792       for (j=0; j<mdn->A->n; j++) {
793         for (i=0; i<mdn->A->m; i++) {
794           tmp[j] += PetscAbsScalar(*v);  v++;
795         }
796       }
797       ierr = MPI_Allreduce(tmp,tmp2,A->N,MPIU_REAL,MPI_SUM,A->comm);CHKERRQ(ierr);
798       for (j=0; j<A->N; j++) {
799         if (tmp2[j] > *nrm) *nrm = tmp2[j];
800       }
801       ierr = PetscFree(tmp);CHKERRQ(ierr);
802       PetscLogFlops(A->n*A->m);
803     } else if (type == NORM_INFINITY) { /* max row norm */
804       PetscReal ntemp;
805       ierr = MatNorm(mdn->A,type,&ntemp);CHKERRQ(ierr);
806       ierr = MPI_Allreduce(&ntemp,nrm,1,MPIU_REAL,MPI_MAX,A->comm);CHKERRQ(ierr);
807     } else {
808       SETERRQ(PETSC_ERR_SUP,"No support for two norm");
809     }
810   }
811   PetscFunctionReturn(0);
812 }
813 
814 #undef __FUNCT__
815 #define __FUNCT__ "MatTranspose_MPIDense"
816 int MatTranspose_MPIDense(Mat A,Mat *matout)
817 {
818   Mat_MPIDense *a = (Mat_MPIDense*)A->data;
819   Mat_SeqDense *Aloc = (Mat_SeqDense*)a->A->data;
820   Mat          B;
821   int          M = A->M,N = A->N,m,n,*rwork,rstart = a->rstart;
822   int          j,i,ierr;
823   PetscScalar  *v;
824 
825   PetscFunctionBegin;
826   if (!matout && M != N) {
827     SETERRQ(PETSC_ERR_SUP,"Supports square matrix only in-place");
828   }
829   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);
830 
831   m = a->A->m; n = a->A->n; v = Aloc->v;
832   ierr = PetscMalloc(n*sizeof(int),&rwork);CHKERRQ(ierr);
833   for (j=0; j<n; j++) {
834     for (i=0; i<m; i++) rwork[i] = rstart + i;
835     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES);CHKERRQ(ierr);
836     v   += m;
837   }
838   ierr = PetscFree(rwork);CHKERRQ(ierr);
839   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
840   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
841   if (matout) {
842     *matout = B;
843   } else {
844     ierr = MatHeaderCopy(A,B);CHKERRQ(ierr);
845   }
846   PetscFunctionReturn(0);
847 }
848 
849 #include "petscblaslapack.h"
850 #undef __FUNCT__
851 #define __FUNCT__ "MatScale_MPIDense"
852 int MatScale_MPIDense(PetscScalar *alpha,Mat inA)
853 {
854   Mat_MPIDense *A = (Mat_MPIDense*)inA->data;
855   Mat_SeqDense *a = (Mat_SeqDense*)A->A->data;
856   int          one = 1,nz;
857 
858   PetscFunctionBegin;
859   nz = inA->m*inA->N;
860   BLscal_(&nz,alpha,a->v,&one);
861   PetscLogFlops(nz);
862   PetscFunctionReturn(0);
863 }
864 
865 static int MatDuplicate_MPIDense(Mat,MatDuplicateOption,Mat *);
866 EXTERN int MatGetSubMatrices_MPIDense(Mat,int,IS *,IS *,MatReuse,Mat **);
867 
868 #undef __FUNCT__
869 #define __FUNCT__ "MatSetUpPreallocation_MPIDense"
870 int MatSetUpPreallocation_MPIDense(Mat A)
871 {
872   int        ierr;
873 
874   PetscFunctionBegin;
875   ierr =  MatMPIDenseSetPreallocation(A,0);CHKERRQ(ierr);
876   PetscFunctionReturn(0);
877 }
878 
879 /* -------------------------------------------------------------------*/
880 static struct _MatOps MatOps_Values = {MatSetValues_MPIDense,
881        MatGetRow_MPIDense,
882        MatRestoreRow_MPIDense,
883        MatMult_MPIDense,
884        MatMultAdd_MPIDense,
885        MatMultTranspose_MPIDense,
886        MatMultTransposeAdd_MPIDense,
887        0,
888        0,
889        0,
890        0,
891        0,
892        0,
893        0,
894        MatTranspose_MPIDense,
895        MatGetInfo_MPIDense,0,
896        MatGetDiagonal_MPIDense,
897        MatDiagonalScale_MPIDense,
898        MatNorm_MPIDense,
899        MatAssemblyBegin_MPIDense,
900        MatAssemblyEnd_MPIDense,
901        0,
902        MatSetOption_MPIDense,
903        MatZeroEntries_MPIDense,
904        MatZeroRows_MPIDense,
905        0,
906        0,
907        0,
908        0,
909        MatSetUpPreallocation_MPIDense,
910        0,
911        0,
912        MatGetArray_MPIDense,
913        MatRestoreArray_MPIDense,
914        MatDuplicate_MPIDense,
915        0,
916        0,
917        0,
918        0,
919        0,
920        MatGetSubMatrices_MPIDense,
921        0,
922        MatGetValues_MPIDense,
923        0,
924        0,
925        MatScale_MPIDense,
926        0,
927        0,
928        0,
929        MatGetBlockSize_MPIDense,
930        0,
931        0,
932        0,
933        0,
934        0,
935        0,
936        0,
937        0,
938        0,
939        MatGetSubMatrix_MPIDense,
940        MatDestroy_MPIDense,
941        MatView_MPIDense,
942        MatGetPetscMaps_Petsc};
943 
944 EXTERN_C_BEGIN
945 #undef __FUNCT__
946 #define __FUNCT__ "MatMPIDenseSetPreallocation_MPIDense"
947 int MatMPIDenseSetPreallocation_MPIDense(Mat mat,PetscScalar *data)
948 {
949   Mat_MPIDense *a;
950   int          ierr;
951 
952   PetscFunctionBegin;
953   mat->preallocated = PETSC_TRUE;
954   /* Note:  For now, when data is specified above, this assumes the user correctly
955    allocates the local dense storage space.  We should add error checking. */
956 
957   a    = (Mat_MPIDense*)mat->data;
958   ierr = MatCreateSeqDense(PETSC_COMM_SELF,mat->m,mat->N,data,&a->A);CHKERRQ(ierr);
959   PetscLogObjectParent(mat,a->A);
960   PetscFunctionReturn(0);
961 }
962 EXTERN_C_END
963 
964 EXTERN_C_BEGIN
965 #undef __FUNCT__
966 #define __FUNCT__ "MatCreate_MPIDense"
967 int MatCreate_MPIDense(Mat mat)
968 {
969   Mat_MPIDense *a;
970   int          ierr,i;
971 
972   PetscFunctionBegin;
973   ierr              = PetscNew(Mat_MPIDense,&a);CHKERRQ(ierr);
974   mat->data         = (void*)a;
975   ierr              = PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));CHKERRQ(ierr);
976   mat->factor       = 0;
977   mat->mapping      = 0;
978 
979   a->factor       = 0;
980   mat->insertmode = NOT_SET_VALUES;
981   ierr = MPI_Comm_rank(mat->comm,&a->rank);CHKERRQ(ierr);
982   ierr = MPI_Comm_size(mat->comm,&a->size);CHKERRQ(ierr);
983 
984   ierr = PetscSplitOwnership(mat->comm,&mat->m,&mat->M);CHKERRQ(ierr);
985   ierr = PetscSplitOwnership(mat->comm,&mat->n,&mat->N);CHKERRQ(ierr);
986   a->nvec = mat->n;
987 
988   /* the information in the maps duplicates the information computed below; eventually
989      we should remove the duplicate information that is not contained in the maps */
990   ierr = PetscMapCreateMPI(mat->comm,mat->m,mat->M,&mat->rmap);CHKERRQ(ierr);
991   ierr = PetscMapCreateMPI(mat->comm,mat->n,mat->N,&mat->cmap);CHKERRQ(ierr);
992 
993   /* build local table of row and column ownerships */
994   ierr       = PetscMalloc(2*(a->size+2)*sizeof(int),&a->rowners);CHKERRQ(ierr);
995   a->cowners = a->rowners + a->size + 1;
996   PetscLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
997   ierr = MPI_Allgather(&mat->m,1,MPI_INT,a->rowners+1,1,MPI_INT,mat->comm);CHKERRQ(ierr);
998   a->rowners[0] = 0;
999   for (i=2; i<=a->size; i++) {
1000     a->rowners[i] += a->rowners[i-1];
1001   }
1002   a->rstart = a->rowners[a->rank];
1003   a->rend   = a->rowners[a->rank+1];
1004   ierr      = MPI_Allgather(&mat->n,1,MPI_INT,a->cowners+1,1,MPI_INT,mat->comm);CHKERRQ(ierr);
1005   a->cowners[0] = 0;
1006   for (i=2; i<=a->size; i++) {
1007     a->cowners[i] += a->cowners[i-1];
1008   }
1009 
1010   /* build cache for off array entries formed */
1011   a->donotstash = PETSC_FALSE;
1012   ierr = MatStashCreate_Private(mat->comm,1,&mat->stash);CHKERRQ(ierr);
1013 
1014   /* stuff used for matrix vector multiply */
1015   a->lvec        = 0;
1016   a->Mvctx       = 0;
1017   a->roworiented = PETSC_TRUE;
1018 
1019   ierr = PetscObjectComposeFunctionDynamic((PetscObject)mat,"MatGetDiagonalBlock_C",
1020                                      "MatGetDiagonalBlock_MPIDense",
1021                                      MatGetDiagonalBlock_MPIDense);CHKERRQ(ierr);
1022   ierr = PetscObjectComposeFunctionDynamic((PetscObject)mat,"MatMPIDenseSetPreallocation_C",
1023                                      "MatMPIDenseSetPreallocation_MPIDense",
1024                                      MatMPIDenseSetPreallocation_MPIDense);CHKERRQ(ierr);
1025   PetscFunctionReturn(0);
1026 }
1027 EXTERN_C_END
1028 
1029 #undef __FUNCT__
1030 #define __FUNCT__ "MatMPIDenseSetPreallocation"
1031 /*@C
1032    MatMPIDenseSetPreallocation - Sets the array used to store the matrix entries
1033 
1034    Not collective
1035 
1036    Input Parameters:
1037 +  A - the matrix
1038 -  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
1039    to control all matrix memory allocation.
1040 
1041    Notes:
1042    The dense format is fully compatible with standard Fortran 77
1043    storage by columns.
1044 
1045    The data input variable is intended primarily for Fortran programmers
1046    who wish to allocate their own matrix memory space.  Most users should
1047    set data=PETSC_NULL.
1048 
1049    Level: intermediate
1050 
1051 .keywords: matrix,dense, parallel
1052 
1053 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
1054 @*/
1055 int MatMPIDenseSetPreallocation(Mat mat,PetscScalar *data)
1056 {
1057   int ierr,(*f)(Mat,PetscScalar *);
1058 
1059   PetscFunctionBegin;
1060   ierr = PetscObjectQueryFunction((PetscObject)mat,"MatMPIDenseSetPreallocation_C",(void (**)(void))&f);CHKERRQ(ierr);
1061   if (f) {
1062     ierr = (*f)(mat,data);CHKERRQ(ierr);
1063   }
1064   PetscFunctionReturn(0);
1065 }
1066 
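/*
   Illustrative usage sketch (an assumed calling sequence, not code from this file):
   a C caller who wants to supply the local dense storage explicitly can allocate a
   column-major array of size local_rows*global_columns and hand it to the matrix.
   The sizes below are arbitrary assumptions.

     Mat         A;
     PetscScalar *storage;
     int         ierr,m = 4,N = 10;    (m = local rows, N = global columns)

     ierr = PetscMalloc(m*N*sizeof(PetscScalar),&storage);CHKERRQ(ierr);
     ierr = MatCreate(PETSC_COMM_WORLD,m,PETSC_DECIDE,PETSC_DECIDE,N,&A);CHKERRQ(ierr);
     ierr = MatSetType(A,MATMPIDENSE);CHKERRQ(ierr);
     ierr = MatMPIDenseSetPreallocation(A,storage);CHKERRQ(ierr);

   Passing data = PETSC_NULL instead lets PETSc allocate the storage itself, which is
   what MatSetUpPreallocation_MPIDense() above does.
*/
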
1067 #undef __FUNCT__
1068 #define __FUNCT__ "MatCreateMPIDense"
1069 /*@C
1070    MatCreateMPIDense - Creates a parallel matrix in dense format.
1071 
1072    Collective on MPI_Comm
1073 
1074    Input Parameters:
1075 +  comm - MPI communicator
1076 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1077 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1078 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1079 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1080 -  data - optional location of matrix data.  Set data=PETSC_NULL (PETSC_NULL_SCALAR for Fortran users) for PETSc
1081    to control all matrix memory allocation.
1082 
1083    Output Parameter:
1084 .  A - the matrix
1085 
1086    Notes:
1087    The dense format is fully compatible with standard Fortran 77
1088    storage by columns.
1089 
1090    The data input variable is intended primarily for Fortran programmers
1091    who wish to allocate their own matrix memory space.  Most users should
1092    set data=PETSC_NULL (PETSC_NULL_SCALAR for Fortran users).
1093 
1094    The user MUST specify either the local or global matrix dimensions
1095    (possibly both).
1096 
1097    Level: intermediate
1098 
1099 .keywords: matrix,dense, parallel
1100 
1101 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
1102 @*/
1103 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,PetscScalar *data,Mat *A)
1104 {
1105   int ierr,size;
1106 
1107   PetscFunctionBegin;
1108   ierr = MatCreate(comm,m,n,M,N,A);CHKERRQ(ierr);
1109   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
1110   if (size > 1) {
1111     ierr = MatSetType(*A,MATMPIDENSE);CHKERRQ(ierr);
1112     ierr = MatMPIDenseSetPreallocation(*A,data);CHKERRQ(ierr);
1113   } else {
1114     ierr = MatSetType(*A,MATSEQDENSE);CHKERRQ(ierr);
1115     ierr = MatSeqDenseSetPreallocation(*A,data);CHKERRQ(ierr);
1116   }
1117   PetscFunctionReturn(0);
1118 }
1119 
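/*
   Illustrative usage sketch (an assumed example, not code from this file): create a
   parallel dense matrix, let PETSc allocate the storage, fill the locally owned rows,
   and assemble.  The 10x10 global size is arbitrary.

     Mat         A;
     int         ierr,i,j,rstart,rend;
     PetscScalar v;

     ierr = MatCreateMPIDense(PETSC_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,10,10,PETSC_NULL,&A);CHKERRQ(ierr);
     ierr = MatGetOwnershipRange(A,&rstart,&rend);CHKERRQ(ierr);
     for (i=rstart; i<rend; i++) {
       for (j=0; j<10; j++) {
         v    = (PetscScalar)(i + j);
         ierr = MatSetValues(A,1,&i,1,&j,&v,INSERT_VALUES);CHKERRQ(ierr);
       }
     }
     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
     ierr = MatDestroy(A);CHKERRQ(ierr);

   Entries set for rows owned by another process would be stashed by
   MatSetValues_MPIDense() and communicated in MatAssemblyBegin/End_MPIDense() above.
*/
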
1120 #undef __FUNCT__
1121 #define __FUNCT__ "MatDuplicate_MPIDense"
1122 static int MatDuplicate_MPIDense(Mat A,MatDuplicateOption cpvalues,Mat *newmat)
1123 {
1124   Mat          mat;
1125   Mat_MPIDense *a,*oldmat = (Mat_MPIDense*)A->data;
1126   int          ierr;
1127 
1128   PetscFunctionBegin;
1129   *newmat       = 0;
1130   ierr = MatCreate(A->comm,A->m,A->n,A->M,A->N,&mat);CHKERRQ(ierr);
1131   ierr = MatSetType(mat,MATMPIDENSE);CHKERRQ(ierr);
1132   ierr              = PetscNew(Mat_MPIDense,&a);CHKERRQ(ierr);
1133   mat->data         = (void*)a;
1134   ierr              = PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));CHKERRQ(ierr);
1135   mat->factor       = A->factor;
1136   mat->assembled    = PETSC_TRUE;
1137   mat->preallocated = PETSC_TRUE;
1138 
1139   a->rstart       = oldmat->rstart;
1140   a->rend         = oldmat->rend;
1141   a->size         = oldmat->size;
1142   a->rank         = oldmat->rank;
1143   mat->insertmode = NOT_SET_VALUES;
1144   a->nvec         = oldmat->nvec;
1145   a->donotstash   = oldmat->donotstash;
1146   ierr            = PetscMalloc((a->size+1)*sizeof(int),&a->rowners);CHKERRQ(ierr);
1147   PetscLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
1148   ierr = PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));CHKERRQ(ierr);
1149   ierr = MatStashCreate_Private(A->comm,1,&mat->stash);CHKERRQ(ierr);
1150 
1151   ierr = MatSetUpMultiply_MPIDense(mat);CHKERRQ(ierr);
1152   ierr = MatDuplicate(oldmat->A,cpvalues,&a->A);CHKERRQ(ierr);
1153   PetscLogObjectParent(mat,a->A);
1154   *newmat = mat;
1155   PetscFunctionReturn(0);
1156 }
1157 
1158 #include "petscsys.h"
1159 
1160 #undef __FUNCT__
1161 #define __FUNCT__ "MatLoad_MPIDense_DenseInFile"
1162 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M,int N,Mat *newmat)
1163 {
1164   int          *rowners,i,size,rank,m,ierr,nz,j;
1165   PetscScalar  *array,*vals,*vals_ptr;
1166   MPI_Status   status;
1167 
1168   PetscFunctionBegin;
1169   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
1170   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
1171 
1172   /* determine ownership of all rows */
1173   m          = M/size + ((M % size) > rank);
1174   ierr       = PetscMalloc((size+2)*sizeof(int),&rowners);CHKERRQ(ierr);
1175   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1176   rowners[0] = 0;
1177   for (i=2; i<=size; i++) {
1178     rowners[i] += rowners[i-1];
1179   }
1180 
1181   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1182   ierr = MatGetArray(*newmat,&array);CHKERRQ(ierr);
1183 
1184   if (!rank) {
1185     ierr = PetscMalloc(m*N*sizeof(PetscScalar),&vals);CHKERRQ(ierr);
1186 
1187     /* read in my part of the matrix numerical values  */
1188     ierr = PetscBinaryRead(fd,vals,m*N,PETSC_SCALAR);CHKERRQ(ierr);
1189 
1190     /* insert into matrix by row (this is why we cannot read directly into the array) */
1191     vals_ptr = vals;
1192     for (i=0; i<m; i++) {
1193       for (j=0; j<N; j++) {
1194         array[i + j*m] = *vals_ptr++;
1195       }
1196     }
1197 
1198     /* read in the other processors' parts and ship them out */
1199     for (i=1; i<size; i++) {
1200       nz   = (rowners[i+1] - rowners[i])*N;
1201       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1202       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);CHKERRQ(ierr);
1203     }
1204   } else {
1205     /* receive numeric values */
1206     ierr = PetscMalloc(m*N*sizeof(PetscScalar),&vals);CHKERRQ(ierr);
1207 
1208     /* receive message of values*/
1209     ierr = MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);CHKERRQ(ierr);
1210 
1211     /* insert into matrix by row (this is why we cannot read directly into the array) */
1212     vals_ptr = vals;
1213     for (i=0; i<m; i++) {
1214       for (j=0; j<N; j++) {
1215         array[i + j*m] = *vals_ptr++;
1216       }
1217     }
1218   }
1219   ierr = PetscFree(rowners);CHKERRQ(ierr);
1220   ierr = PetscFree(vals);CHKERRQ(ierr);
1221   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1222   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1223   PetscFunctionReturn(0);
1224 }
1225 
1226 EXTERN_C_BEGIN
1227 #undef __FUNCT__
1228 #define __FUNCT__ "MatLoad_MPIDense"
1229 int MatLoad_MPIDense(PetscViewer viewer,MatType type,Mat *newmat)
1230 {
1231   Mat          A;
1232   PetscScalar  *vals,*svals;
1233   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1234   MPI_Status   status;
1235   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1236   int          *ourlens,*sndcounts = 0,*procsnz = 0,*offlens,jj,*mycols,*smycols;
1237   int          tag = ((PetscObject)viewer)->tag;
1238   int          i,nz,ierr,j,rstart,rend,fd;
1239 
1240   PetscFunctionBegin;
1241   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
1242   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
1243   if (!rank) {
1244     ierr = PetscViewerBinaryGetDescriptor(viewer,&fd);CHKERRQ(ierr);
1245     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT);CHKERRQ(ierr);
1246     if (header[0] != MAT_FILE_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,"not matrix object");
1247   }
1248 
1249   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1250   M = header[1]; N = header[2]; nz = header[3];
1251 
1252   /*
1253        Handle case where matrix is stored on disk as a dense matrix
1254   */
1255   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1256     ierr = MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);CHKERRQ(ierr);
1257     PetscFunctionReturn(0);
1258   }
1259 
1260   /* determine ownership of all rows */
1261   m          = M/size + ((M % size) > rank);
1262   ierr       = PetscMalloc((size+2)*sizeof(int),&rowners);CHKERRQ(ierr);
1263   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1264   rowners[0] = 0;
1265   for (i=2; i<=size; i++) {
1266     rowners[i] += rowners[i-1];
1267   }
1268   rstart = rowners[rank];
1269   rend   = rowners[rank+1];
1270 
1271   /* distribute row lengths to all processors */
1272   ierr    = PetscMalloc(2*(rend-rstart)*sizeof(int),&ourlens);CHKERRQ(ierr);
1273   offlens = ourlens + (rend-rstart);
1274   if (!rank) {
1275     ierr = PetscMalloc(M*sizeof(int),&rowlengths);CHKERRQ(ierr);
1276     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT);CHKERRQ(ierr);
1277     ierr = PetscMalloc(size*sizeof(int),&sndcounts);CHKERRQ(ierr);
1278     for (i=0; i<size; i++) sndcounts[i] = rowners[i+1] - rowners[i];
1279     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1280     ierr = PetscFree(sndcounts);CHKERRQ(ierr);
1281   } else {
1282     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1283   }
1284 
1285   if (!rank) {
1286     /* calculate the number of nonzeros on each processor */
1287     ierr = PetscMalloc(size*sizeof(int),&procsnz);CHKERRQ(ierr);
1288     ierr = PetscMemzero(procsnz,size*sizeof(int));CHKERRQ(ierr);
1289     for (i=0; i<size; i++) {
1290       for (j=rowners[i]; j< rowners[i+1]; j++) {
1291         procsnz[i] += rowlengths[j];
1292       }
1293     }
1294     ierr = PetscFree(rowlengths);CHKERRQ(ierr);
1295 
1296     /* determine max buffer needed and allocate it */
1297     maxnz = 0;
1298     for (i=0; i<size; i++) {
1299       maxnz = PetscMax(maxnz,procsnz[i]);
1300     }
1301     ierr = PetscMalloc(maxnz*sizeof(int),&cols);CHKERRQ(ierr);
1302 
1303     /* read in my part of the matrix column indices  */
1304     nz = procsnz[0];
1305     ierr = PetscMalloc(nz*sizeof(int),&mycols);CHKERRQ(ierr);
1306     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT);CHKERRQ(ierr);
1307 
1308     /* read in everyone else's part and ship it off */
1309     for (i=1; i<size; i++) {
1310       nz   = procsnz[i];
1311       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT);CHKERRQ(ierr);
1312       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1313     }
1314     ierr = PetscFree(cols);CHKERRQ(ierr);
1315   } else {
1316     /* determine buffer space needed for message */
1317     nz = 0;
1318     for (i=0; i<m; i++) {
1319       nz += ourlens[i];
1320     }
1321     ierr = PetscMalloc(nz*sizeof(int),&mycols);CHKERRQ(ierr);
1322 
1323     /* receive message of column indices*/
1324     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1325     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1326     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,"something is wrong with file");
1327   }
1328 
1329   /* loop over local rows, determining number of off diagonal entries */
1330   ierr = PetscMemzero(offlens,m*sizeof(int));CHKERRQ(ierr);
1331   jj = 0;
1332   for (i=0; i<m; i++) {
1333     for (j=0; j<ourlens[i]; j++) {
1334       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1335       jj++;
1336     }
1337   }
1338 
1339   /* create our matrix */
1340   for (i=0; i<m; i++) {
1341     ourlens[i] -= offlens[i];
1342   }
1343   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1344   A = *newmat;
1345   for (i=0; i<m; i++) {
1346     ourlens[i] += offlens[i];
1347   }
1348 
1349   if (!rank) {
1350     ierr = PetscMalloc(maxnz*sizeof(PetscScalar),&vals);CHKERRQ(ierr);
1351 
1352     /* read in my part of the matrix numerical values  */
1353     nz = procsnz[0];
1354     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1355 
1356     /* insert into matrix */
1357     jj      = rstart;
1358     smycols = mycols;
1359     svals   = vals;
1360     for (i=0; i<m; i++) {
1361       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1362       smycols += ourlens[i];
1363       svals   += ourlens[i];
1364       jj++;
1365     }
1366 
1367     /* read in the other processors' parts and ship them out */
1368     for (i=1; i<size; i++) {
1369       nz   = procsnz[i];
1370       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1371       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1372     }
1373     ierr = PetscFree(procsnz);CHKERRQ(ierr);
1374   } else {
1375     /* receive numeric values */
1376     ierr = PetscMalloc(nz*sizeof(PetscScalar),&vals);CHKERRQ(ierr);
1377 
1378     /* receive message of values*/
1379     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1380     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1381     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,"something is wrong with file");
1382 
1383     /* insert into matrix */
1384     jj      = rstart;
1385     smycols = mycols;
1386     svals   = vals;
1387     for (i=0; i<m; i++) {
1388       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1389       smycols += ourlens[i];
1390       svals   += ourlens[i];
1391       jj++;
1392     }
1393   }
1394   ierr = PetscFree(ourlens);CHKERRQ(ierr);
1395   ierr = PetscFree(vals);CHKERRQ(ierr);
1396   ierr = PetscFree(mycols);CHKERRQ(ierr);
1397   ierr = PetscFree(rowners);CHKERRQ(ierr);
1398 
1399   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1400   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1401   PetscFunctionReturn(0);
1402 }
1403 EXTERN_C_END
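
/*
   Illustrative usage sketch (an assumption, not code from this file): callers normally
   reach MatLoad_MPIDense() through MatLoad() on a binary viewer.  The file name and the
   PETSC_BINARY_RDONLY open mode are assumptions for this generation of PETSc.

     Mat         A;
     PetscViewer fd;
     int         ierr;

     ierr = PetscViewerBinaryOpen(PETSC_COMM_WORLD,"matrix.dat",PETSC_BINARY_RDONLY,&fd);CHKERRQ(ierr);
     ierr = MatLoad(fd,MATMPIDENSE,&A);CHKERRQ(ierr);
     ierr = PetscViewerDestroy(fd);CHKERRQ(ierr);
*/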
1404 
1405 
1406 
1407 
1408 
1409