xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision be0abb6ddea392ceaee217d3645fed7c6ea71822)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpidense.c,v 1.123 1999/08/10 19:16:58 balay Exp bsmith $";
3 #endif
4 
5 /*
6    Basic functions for basic parallel dense matrices.
7 */
8 
9 #include "src/mat/impls/dense/mpi/mpidense.h"
10 #include "src/vec/vecimpl.h"
11 
12 EXTERN_C_BEGIN
13 #undef __FUNC__
14 #define __FUNC__ "MatGetDiagonalBlock_MPIDense"
/*
   MatGetDiagonalBlock_MPIDense - returns this processor's diagonal block of a
   square parallel dense matrix as a sequential dense matrix.

   The returned matrix *B aliases the local storage (no values are copied);
   NOTE(review): despite that, *iscopy is set PETSC_TRUE -- confirm the
   intended ownership/destroy semantics with callers.
*/
15 int MatGetDiagonalBlock_MPIDense(Mat A,PetscTruth *iscopy,MatReuse reuse,Mat *B)
16 {
17   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
18   int          m = mdn->m,rstart = mdn->rstart,rank,ierr;
19   Scalar       *array;
20   MPI_Comm     comm;
21 
22   PetscFunctionBegin;
23   if (mdn->M != mdn->N) SETERRQ(PETSC_ERR_SUP,0,"Only square matrices supported.");
24 
25   /* The reuse aspect is not implemented efficiently */
26   if (reuse) { ierr = MatDestroy(*B);CHKERRQ(ierr);}
27 
     /* rank is obtained here but not otherwise used below */
28   ierr = MPI_Comm_rank(A->comm,&rank);CHKERRQ(ierr);
29   ierr = PetscObjectGetComm((PetscObject)(mdn->A),&comm);CHKERRQ(ierr);
     /* the local array is m x N stored column-major (see MatGetDiagonal's
        indexing), so column rstart starts at array + m*rstart and the next
        m columns form exactly the m x m diagonal block */
30   ierr = MatGetArray(mdn->A,&array);CHKERRQ(ierr);
31   ierr = MatCreateSeqDense(comm,m,m,array+m*rstart,B);CHKERRQ(ierr);
32   ierr = MatRestoreArray(mdn->A,&array);CHKERRQ(ierr);
33   ierr = MatAssemblyBegin(*B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
34   ierr = MatAssemblyEnd(*B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
35 
36   *iscopy = PETSC_TRUE;
37   PetscFunctionReturn(0);
38 }
39 EXTERN_C_END
40 
41 extern int MatSetUpMultiply_MPIDense(Mat);
42 
43 #undef __FUNC__
44 #define __FUNC__ "MatSetValues_MPIDense"
/*
   MatSetValues_MPIDense - inserts/adds an m x n logically dense block of
   values. Rows owned locally go straight into the local sequential matrix;
   off-processor rows are stashed for communication at assembly time (unless
   donotstash was set via MAT_IGNORE_OFF_PROC_ENTRIES).

   v layout: row-major (v + i*n is row i) when roworiented, otherwise
   column-major (element (i,j) at v + i + j*m). Negative row indices are
   silently skipped.
*/
45 int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
46 {
47   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
48   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
49   int          roworiented = A->roworiented;
50 
51   PetscFunctionBegin;
52   for ( i=0; i<m; i++ ) {
53     if (idxm[i] < 0) continue;
54     if (idxm[i] >= A->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
55     if (idxm[i] >= rstart && idxm[i] < rend) {
       /* locally owned row: set directly in the sequential matrix */
56       row = idxm[i] - rstart;
57       if (roworiented) {
58         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv);CHKERRQ(ierr);
59       } else {
         /* column-oriented input: one value at a time, checking columns here
            (the row-oriented path leaves column checking to the local mat) */
60         for ( j=0; j<n; j++ ) {
61           if (idxn[j] < 0) continue;
62           if (idxn[j] >= A->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
63           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
64         }
65       }
66     } else {
       /* off-processor row: buffer it in the stash for MatAssemblyBegin/End */
67       if ( !A->donotstash) {
68         if (roworiented) {
69           ierr = MatStashValuesRow_Private(&mat->stash,idxm[i],n,idxn,v+i*n);CHKERRQ(ierr);
70         } else {
71           ierr = MatStashValuesCol_Private(&mat->stash,idxm[i],n,idxn,v+i,m);CHKERRQ(ierr);
72         }
73       }
74     }
75   }
76   PetscFunctionReturn(0);
77 }
78 
79 #undef __FUNC__
80 #define __FUNC__ "MatGetValues_MPIDense"
/*
   MatGetValues_MPIDense - retrieves an m x n block of values into v
   (row-major: v[i*n+j] = A(idxm[i],idxn[j])). Only rows owned by this
   processor may be requested; anything else is an error.
*/
81 int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
82 {
83   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
84   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
85 
86   PetscFunctionBegin;
87   for ( i=0; i<m; i++ ) {
88     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
89     if (idxm[i] >= mdn->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
90     if (idxm[i] >= rstart && idxm[i] < rend) {
       /* translate to local row index, then pull values one at a time */
91       row = idxm[i] - rstart;
92       for ( j=0; j<n; j++ ) {
93         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
94         if (idxn[j] >= mdn->N) {
95           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
96         }
97         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j);CHKERRQ(ierr);
98       }
99     } else {
100       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
101     }
102   }
103   PetscFunctionReturn(0);
104 }
105 
106 #undef __FUNC__
107 #define __FUNC__ "MatGetArray_MPIDense"
/*
   MatGetArray_MPIDense - gives direct access to the raw array of the local
   (sequential dense) part of the matrix; simply forwards to the inner matrix.
*/
108 int MatGetArray_MPIDense(Mat A,Scalar **array)
109 {
110   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
111   int          ierr;
112 
113   PetscFunctionBegin;
114   ierr = MatGetArray(a->A,array);CHKERRQ(ierr);
115   PetscFunctionReturn(0);
116 }
117 
118 #undef __FUNC__
119 #define __FUNC__ "MatGetSubMatrix_MPIDense"
/*
   MatGetSubMatrix_MPIDense - extracts the submatrix A(isrow,iscol) into *B,
   copying column by column out of the local column-major array.

   Limitations (see comment below): no parallel redistribution -- the index
   sets must describe a submatrix with the same row partitioning as A, and
   irow entries are assumed to be locally owned (irow[j]-rstart indexes the
   local array). MAT_REUSE_MATRIX does not verify that *B has the right sizes.
*/
120 static int MatGetSubMatrix_MPIDense(Mat A,IS isrow,IS iscol,int cs,MatReuse scall,Mat *B)
121 {
122   Mat_MPIDense *mat = (Mat_MPIDense *) A->data, *newmatd;
123   Mat_SeqDense *lmat = (Mat_SeqDense *) mat->A->data;
124   int          i, j, ierr, *irow, *icol, rstart, rend, nrows, ncols, nlrows, nlcols, rank;
125   Scalar       *av, *bv, *v = lmat->v;
126   Mat          newmat;
127 
128   PetscFunctionBegin;
129   ierr = MPI_Comm_rank(A->comm,&rank);CHKERRQ(ierr);
130   ierr = ISGetIndices(isrow,&irow);CHKERRQ(ierr);
131   ierr = ISGetIndices(iscol,&icol);CHKERRQ(ierr);
132   ierr = ISGetSize(isrow,&nrows);CHKERRQ(ierr);
133   ierr = ISGetSize(iscol,&ncols);CHKERRQ(ierr);
134 
135   /* No parallel redistribution currently supported! Should really check each index set
136      to comfirm that it is OK.  ... Currently supports only submatrix same partitioning as
137      original matrix! */
138 
139   ierr = MatGetLocalSize(A,&nlrows,&nlcols);CHKERRQ(ierr);
140   ierr = MatGetOwnershipRange(A,&rstart,&rend);CHKERRQ(ierr);
141 
142   /* Check submatrix call */
143   if (scall == MAT_REUSE_MATRIX) {
144     /* SETERRQ(PETSC_ERR_ARG_SIZ,0,"Reused submatrix wrong size"); */
145     /* Really need to test rows and column sizes! */
146     newmat = *B;
147   } else {
148     /* Create and fill new matrix */
149     ierr = MatCreateMPIDense(A->comm,nrows,ncols,PETSC_DECIDE,PETSC_DECIDE,PETSC_NULL,&newmat);CHKERRQ(ierr);
150   }
151 
152   /* Now extract the data pointers and do the copy, column at a time */
153   newmatd = (Mat_MPIDense *) newmat->data;
154   bv = ((Mat_SeqDense *)newmatd->A->data)->v;
155 
     /* av points at column icol[i] of the source (leading dimension nlrows);
        bv walks the destination array in column-major order */
156   for ( i=0; i<ncols; i++ ) {
157     av = v + nlrows*icol[i];
158     for (j=0; j<nrows; j++ ) {
159       *bv++ = av[irow[j] - rstart];
160     }
161   }
162 
163   /* Assemble the matrices so that the correct flags are set */
164   ierr = MatAssemblyBegin(newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
165   ierr = MatAssemblyEnd(newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
166 
167   /* Free work space */
168   ierr = ISRestoreIndices(isrow,&irow);CHKERRQ(ierr);
169   ierr = ISRestoreIndices(iscol,&icol);CHKERRQ(ierr);
170   *B = newmat;
171   PetscFunctionReturn(0);
172 }
173 
174 #undef __FUNC__
175 #define __FUNC__ "MatRestoreArray_MPIDense"
/*
   MatRestoreArray_MPIDense - companion to MatGetArray_MPIDense. Intentionally
   a no-op: MatGetArray_MPIDense hands out a pointer into the local storage,
   so there is nothing to copy back or release here.
*/
176 int MatRestoreArray_MPIDense(Mat A,Scalar **array)
177 {
178   PetscFunctionBegin;
179   PetscFunctionReturn(0);
180 }
181 
182 #undef __FUNC__
183 #define __FUNC__ "MatAssemblyBegin_MPIDense"
/*
   MatAssemblyBegin_MPIDense - starts assembly: verifies all processors agree
   on ADD_VALUES vs INSERT_VALUES, then launches the scatter of stashed
   off-processor values to their owning processors.
*/
184 int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
185 {
186   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
187   MPI_Comm     comm = mat->comm;
188   int          ierr,nstash,reallocs;
189   InsertMode   addv;
190 
191   PetscFunctionBegin;
192   /* make sure all processors are either in INSERTMODE or ADDMODE */
     /* MPI_BOR of the modes: a mix yields the bit-pattern ADD_VALUES|INSERT_VALUES */
193   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
194   if (addv == (ADD_VALUES|INSERT_VALUES)) {
195     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Cannot mix adds/inserts on different procs");
196   }
197   mat->insertmode = addv; /* in case this processor had no cache */
198 
     /* ship stashed (off-processor) entries to their owners; rowners gives
        the row-ownership ranges used to route each entry */
199   ierr = MatStashScatterBegin_Private(&mat->stash,mdn->rowners);CHKERRQ(ierr);
200   ierr = MatStashGetInfo_Private(&mat->stash,&nstash,&reallocs);CHKERRQ(ierr);
201   PLogInfo(mdn->A,"MatAssemblyBegin_MPIDense:Stash has %d entries, uses %d mallocs.\n",
202            nstash,reallocs);
203   PetscFunctionReturn(0);
204 }
205 
206 #undef __FUNC__
207 #define __FUNC__ "MatAssemblyEnd_MPIDense"
/*
   MatAssemblyEnd_MPIDense - completes assembly: drains the stash scatter,
   inserting each received batch of values locally, then assembles the local
   sequential matrix and (on first final assembly) builds the scatter context
   used by the multiply routines.
*/
208 int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
209 {
210   Mat_MPIDense *mdn=(Mat_MPIDense*)mat->data;
211   int          i,n,ierr,*row,*col,flg,j,rstart,ncols;
212   Scalar       *val;
213   InsertMode   addv=mat->insertmode;
214 
215   PetscFunctionBegin;
216   /*  wait on receives */
217   while (1) {
218     ierr = MatStashScatterGetMesg_Private(&mat->stash,&n,&row,&col,&val,&flg);CHKERRQ(ierr);
219     if (!flg) break;
220 
221     for ( i=0; i<n; ) {
222       /* Now identify the consecutive vals belonging to the same row */
223       for ( j=i,rstart=row[j]; j<n; j++ ) { if (row[j] != rstart) break; }
224       if (j < n) ncols = j-i;
225       else       ncols = n-i;
226       /* Now assemble all these values with a single function call */
227       ierr = MatSetValues_MPIDense(mat,1,row+i,ncols,col+i,val+i,addv);CHKERRQ(ierr);
228       i = j;
229     }
230   }
231   ierr = MatStashScatterEnd_Private(&mat->stash);CHKERRQ(ierr);
232 
233   ierr = MatAssemblyBegin(mdn->A,mode);CHKERRQ(ierr);
234   ierr = MatAssemblyEnd(mdn->A,mode);CHKERRQ(ierr);
235 
     /* first final assembly: set up lvec/Mvctx for MatMult et al. */
236   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
237     ierr = MatSetUpMultiply_MPIDense(mat);CHKERRQ(ierr);
238   }
239   PetscFunctionReturn(0);
240 }
241 
242 #undef __FUNC__
243 #define __FUNC__ "MatZeroEntries_MPIDense"
/*
   MatZeroEntries_MPIDense - zeros all entries; since each processor stores
   its rows fully in the local sequential matrix, zeroing that is sufficient.
*/
244 int MatZeroEntries_MPIDense(Mat A)
245 {
246   int          ierr;
247   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
248 
249   PetscFunctionBegin;
250   ierr = MatZeroEntries(l->A);CHKERRQ(ierr);
251   PetscFunctionReturn(0);
252 }
253 
254 #undef __FUNC__
255 #define __FUNC__ "MatGetBlockSize_MPIDense"
/*
   MatGetBlockSize_MPIDense - dense matrices have no block structure; the
   block size is always 1.
*/
256 int MatGetBlockSize_MPIDense(Mat A,int *bs)
257 {
258   PetscFunctionBegin;
259   *bs = 1;
260   PetscFunctionReturn(0);
261 }
262 
263 /* the code does not do the diagonal entries correctly unless the
264    matrix is square and the column and row owerships are identical.
265    This is a BUG. The only way to fix it seems to be to access
266    mdn->A and mdn->B directly and not through the MatZeroRows()
267    routine.
268 */
269 #undef __FUNC__
270 #define __FUNC__ "MatZeroRows_MPIDense"
/*
   MatZeroRows_MPIDense - zeros the (globally numbered) rows listed in is,
   optionally placing *diag on the diagonal of each zeroed row.

   Algorithm: each processor routes the row indices it was given to the
   processors that own those rows (count/allreduce to size the messages,
   then nonblocking sends/receives), collects the rows it owns into lrows,
   and finally zeroes them through MatZeroRows on the local matrix.

   FIX: the ISRestoreIndices return code was previously ignored; it is now
   checked like every other call in this file.
*/
271 int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
272 {
273   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
274   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
275   int            *procs,*nprocs,j,found,idx,nsends,*work;
276   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
277   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
278   int            *lens,imdex,*lrows,*values;
279   MPI_Comm       comm = A->comm;
280   MPI_Request    *send_waits,*recv_waits;
281   MPI_Status     recv_status,*send_status;
282   IS             istmp;
283 
284   PetscFunctionBegin;
285   ierr = ISGetSize(is,&N);CHKERRQ(ierr);
286   ierr = ISGetIndices(is,&rows);CHKERRQ(ierr);
287 
288   /*  first count number of contributors to each processor */
289   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) );CHKPTRQ(nprocs);
290   ierr   = PetscMemzero(nprocs,2*size*sizeof(int));CHKERRQ(ierr);
291   procs  = nprocs + size;
292   owner  = (int *) PetscMalloc((N+1)*sizeof(int));CHKPTRQ(owner); /* see note*/
293   for ( i=0; i<N; i++ ) {
294     idx = rows[i];
295     found = 0;
296     for ( j=0; j<size; j++ ) {
297       if (idx >= owners[j] && idx < owners[j+1]) {
298         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
299       }
300     }
301     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
302   }
303   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
304 
305   /* inform other processors of number of messages and max length*/
306   work   = (int *) PetscMalloc( size*sizeof(int) );CHKPTRQ(work);
307   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
308   nrecvs = work[rank];
309   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
310   nmax   = work[rank];
311   ierr = PetscFree(work);CHKERRQ(ierr);
312 
313   /* post receives:   */
314   rvalues    = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
315   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
316   for ( i=0; i<nrecvs; i++ ) {
317     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
318   }
319 
320   /* do sends:
321       1) starts[i] gives the starting index in svalues for stuff going to
322          the ith processor
323   */
324   svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) );CHKPTRQ(svalues);
325   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
326   starts     = (int *) PetscMalloc( (size+1)*sizeof(int) );CHKPTRQ(starts);
327   starts[0]  = 0;
328   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
329   for ( i=0; i<N; i++ ) {
330     svalues[starts[owner[i]]++] = rows[i];
331   }
    /* FIX: check the return code (was previously ignored) */
332   ierr = ISRestoreIndices(is,&rows);CHKERRQ(ierr);
333 
    /* rebuild starts[] -- the packing loop above advanced it */
334   starts[0] = 0;
335   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
336   count = 0;
337   for ( i=0; i<size; i++ ) {
338     if (procs[i]) {
339       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
340     }
341   }
342   ierr = PetscFree(starts);CHKERRQ(ierr);
343 
344   base = owners[rank];
345 
346   /*  wait on receives */
347   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) );CHKPTRQ(lens);
348   source = lens + nrecvs;
349   count  = nrecvs; slen = 0;
350   while (count) {
351     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
352     /* unpack receives into our local space */
353     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
354     source[imdex]  = recv_status.MPI_SOURCE;
355     lens[imdex]  = n;
356     slen += n;
357     count--;
358   }
359   ierr = PetscFree(recv_waits);CHKERRQ(ierr);
360 
361   /* move the data into the send scatter */
362   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) );CHKPTRQ(lrows);
363   count = 0;
364   for ( i=0; i<nrecvs; i++ ) {
365     values = rvalues + i*nmax;
366     for ( j=0; j<lens[i]; j++ ) {
367       lrows[count++] = values[j] - base;
368     }
369   }
370   ierr = PetscFree(rvalues);CHKERRQ(ierr);
371   ierr = PetscFree(lens);CHKERRQ(ierr);
372   ierr = PetscFree(owner);CHKERRQ(ierr);
373   ierr = PetscFree(nprocs);CHKERRQ(ierr);
374 
375   /* actually zap the local rows */
376   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
377   PLogObjectParent(A,istmp);
378   ierr = PetscFree(lrows);CHKERRQ(ierr);
379   ierr = MatZeroRows(l->A,istmp,diag);CHKERRQ(ierr);
380   ierr = ISDestroy(istmp);CHKERRQ(ierr);
381 
382   /* wait on sends */
383   if (nsends) {
384     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
385     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
386     ierr = PetscFree(send_status);CHKERRQ(ierr);
387   }
388   ierr = PetscFree(send_waits);CHKERRQ(ierr);
389   ierr = PetscFree(svalues);CHKERRQ(ierr);
390 
391   PetscFunctionReturn(0);
392 }
393 
394 #undef __FUNC__
395 #define __FUNC__ "MatMult_MPIDense"
396 int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
397 {
398   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
399   int          ierr;
400 
401   PetscFunctionBegin;
402   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
403   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
404   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy);CHKERRQ(ierr);
405   PetscFunctionReturn(0);
406 }
407 
408 #undef __FUNC__
409 #define __FUNC__ "MatMultAdd_MPIDense"
410 int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
411 {
412   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
413   int          ierr;
414 
415   PetscFunctionBegin;
416   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
417   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
418   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz);CHKERRQ(ierr);
419   PetscFunctionReturn(0);
420 }
421 
422 #undef __FUNC__
423 #define __FUNC__ "MatMultTrans_MPIDense"
424 int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
425 {
426   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
427   int          ierr;
428   Scalar       zero = 0.0;
429 
430   PetscFunctionBegin;
431   ierr = VecSet(&zero,yy);CHKERRQ(ierr);
432   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec);CHKERRQ(ierr);
433   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
434   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
435   PetscFunctionReturn(0);
436 }
437 
438 #undef __FUNC__
439 #define __FUNC__ "MatMultTransAdd_MPIDense"
440 int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
441 {
442   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
443   int          ierr;
444 
445   PetscFunctionBegin;
446   ierr = VecCopy(yy,zz);CHKERRQ(ierr);
447   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec);CHKERRQ(ierr);
448   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
449   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
450   PetscFunctionReturn(0);
451 }
452 
453 #undef __FUNC__
454 #define __FUNC__ "MatGetDiagonal_MPIDense"
455 int MatGetDiagonal_MPIDense(Mat A,Vec v)
456 {
457   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
458   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
459   int          ierr, len, i, n, m = a->m, radd;
460   Scalar       *x, zero = 0.0;
461 
462   PetscFunctionBegin;
463   VecSet(&zero,v);
464   ierr = VecGetArray(v,&x);CHKERRQ(ierr);
465   ierr = VecGetSize(v,&n);CHKERRQ(ierr);
466   if (n != a->M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Nonconforming mat and vec");
467   len = PetscMin(aloc->m,aloc->n);
468   radd = a->rstart*m;
469   for ( i=0; i<len; i++ ) {
470     x[i] = aloc->v[radd + i*m + i];
471   }
472   ierr = VecRestoreArray(v,&x);CHKERRQ(ierr);
473   PetscFunctionReturn(0);
474 }
475 
476 #undef __FUNC__
477 #define __FUNC__ "MatDestroy_MPIDense"
478 int MatDestroy_MPIDense(Mat mat)
479 {
480   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
481   int          ierr;
482 
483   PetscFunctionBegin;
484 
485   if (mat->mapping) {
486     ierr = ISLocalToGlobalMappingDestroy(mat->mapping);CHKERRQ(ierr);
487   }
488   if (mat->bmapping) {
489     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping);CHKERRQ(ierr);
490   }
491 #if defined(PETSC_USE_LOG)
492   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",mdn->M,mdn->N);
493 #endif
494   ierr = MatStashDestroy_Private(&mat->stash);CHKERRQ(ierr);
495   ierr = PetscFree(mdn->rowners);CHKERRQ(ierr);
496   ierr = MatDestroy(mdn->A);CHKERRQ(ierr);
497   if (mdn->lvec)   VecDestroy(mdn->lvec);
498   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
499   if (mdn->factor) {
500     if (mdn->factor->temp)   {ierr = PetscFree(mdn->factor->temp);CHKERRQ(ierr);}
501     if (mdn->factor->tag)    {ierr = PetscFree(mdn->factor->tag);CHKERRQ(ierr);}
502     if (mdn->factor->pivots) {ierr = PetscFree(mdn->factor->pivots);CHKERRQ(ierr);}
503     ierr = PetscFree(mdn->factor);CHKERRQ(ierr);
504   }
505   ierr = PetscFree(mdn);CHKERRQ(ierr);
506   if (mat->rmap) {
507     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
508   }
509   if (mat->cmap) {
510     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
511   }
512   PLogObjectDestroy(mat);
513   PetscHeaderDestroy(mat);
514   PetscFunctionReturn(0);
515 }
516 
517 #undef __FUNC__
518 #define __FUNC__ "MatView_MPIDense_Binary"
519 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
520 {
521   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
522   int          ierr;
523 
524   PetscFunctionBegin;
525   if (mdn->size == 1) {
526     ierr = MatView(mdn->A,viewer);CHKERRQ(ierr);
527   }
528   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
529   PetscFunctionReturn(0);
530 }
531 
532 #undef __FUNC__
533 #define __FUNC__ "MatView_MPIDense_ASCII"
534 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
535 {
536   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
537   int          ierr, format, size = mdn->size, rank = mdn->rank;
538   FILE         *fd;
539   ViewerType   vtype;
540 
541   PetscFunctionBegin;
542   ierr = ViewerGetType(viewer,&vtype);CHKERRQ(ierr);
543   ierr = ViewerASCIIGetPointer(viewer,&fd);CHKERRQ(ierr);
544   ierr = ViewerGetFormat(viewer,&format);CHKERRQ(ierr);
545   if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
546     MatInfo info;
547     ierr = MatGetInfo(mat,MAT_LOCAL,&info);CHKERRQ(ierr);
548     PetscSequentialPhaseBegin(mat->comm,1);
549       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
550          (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
551       fflush(fd);
552     PetscSequentialPhaseEnd(mat->comm,1);
553     ierr = VecScatterView(mdn->Mvctx,viewer);CHKERRQ(ierr);
554     PetscFunctionReturn(0);
555   } else if (format == VIEWER_FORMAT_ASCII_INFO) {
556     PetscFunctionReturn(0);
557   }
558 
559   if (size == 1) {
560     ierr = MatView(mdn->A,viewer);CHKERRQ(ierr);
561   } else {
562     /* assemble the entire matrix onto first processor. */
563     Mat          A;
564     int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
565     Scalar       *vals;
566     Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
567 
568     if (!rank) {
569       ierr = MatCreateMPIDense(mat->comm,M,mdn->nvec,M,N,PETSC_NULL,&A);CHKERRQ(ierr);
570     } else {
571       ierr = MatCreateMPIDense(mat->comm,0,mdn->nvec,M,N,PETSC_NULL,&A);CHKERRQ(ierr);
572     }
573     PLogObjectParent(mat,A);
574 
575     /* Copy the matrix ... This isn't the most efficient means,
576        but it's quick for now */
577     row = mdn->rstart; m = Amdn->m;
578     for ( i=0; i<m; i++ ) {
579       ierr = MatGetRow(mat,row,&nz,&cols,&vals);CHKERRQ(ierr);
580       ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES);CHKERRQ(ierr);
581       ierr = MatRestoreRow(mat,row,&nz,&cols,&vals);CHKERRQ(ierr);
582       row++;
583     }
584 
585     ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
586     ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
587     if (!rank) {
588       ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer);CHKERRQ(ierr);
589     }
590     ierr = MatDestroy(A);CHKERRQ(ierr);
591   }
592   PetscFunctionReturn(0);
593 }
594 
595 #undef __FUNC__
596 #define __FUNC__ "MatView_MPIDense"
/*
   MatView_MPIDense - viewer dispatch: routes to the ASCII or binary
   implementation based on the viewer's type.

   FIX: PetscFunctionBegin was missing -- every other function in this file
   pairs it with PetscFunctionReturn, which pops the stack frame it pushes.
*/
597 int MatView_MPIDense(Mat mat,Viewer viewer)
598 {
599   int          ierr;
600   ViewerType   vtype;
601 
    PetscFunctionBegin;
602   ierr = ViewerGetType(viewer,&vtype);CHKERRQ(ierr);
603   if (PetscTypeCompare(vtype,ASCII_VIEWER)) {
604     ierr = MatView_MPIDense_ASCII(mat,viewer);CHKERRQ(ierr);
605   } else if (PetscTypeCompare(vtype,BINARY_VIEWER)) {
606     ierr = MatView_MPIDense_Binary(mat,viewer);CHKERRQ(ierr);
607   } else {
608     SETERRQ(1,1,"Viewer type not supported by PETSc object");
609   }
610   PetscFunctionReturn(0);
611 }
612 
613 #undef __FUNC__
614 #define __FUNC__ "MatGetInfo_MPIDense"
615 int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
616 {
617   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
618   Mat          mdn = mat->A;
619   int          ierr;
620   double       isend[5], irecv[5];
621 
622   PetscFunctionBegin;
623   info->rows_global    = (double)mat->M;
624   info->columns_global = (double)mat->N;
625   info->rows_local     = (double)mat->m;
626   info->columns_local  = (double)mat->N;
627   info->block_size     = 1.0;
628   ierr = MatGetInfo(mdn,MAT_LOCAL,info);CHKERRQ(ierr);
629   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
630   isend[3] = info->memory;  isend[4] = info->mallocs;
631   if (flag == MAT_LOCAL) {
632     info->nz_used      = isend[0];
633     info->nz_allocated = isend[1];
634     info->nz_unneeded  = isend[2];
635     info->memory       = isend[3];
636     info->mallocs      = isend[4];
637   } else if (flag == MAT_GLOBAL_MAX) {
638     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
639     info->nz_used      = irecv[0];
640     info->nz_allocated = irecv[1];
641     info->nz_unneeded  = irecv[2];
642     info->memory       = irecv[3];
643     info->mallocs      = irecv[4];
644   } else if (flag == MAT_GLOBAL_SUM) {
645     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
646     info->nz_used      = irecv[0];
647     info->nz_allocated = irecv[1];
648     info->nz_unneeded  = irecv[2];
649     info->memory       = irecv[3];
650     info->mallocs      = irecv[4];
651   }
652   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
653   info->fill_ratio_needed = 0;
654   info->factor_mallocs    = 0;
655   PetscFunctionReturn(0);
656 }
657 
658 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
659    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
660    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
661    extern int MatSolve_MPIDense(Mat,Vec,Vec);
662    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
663    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
664    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
665 
666 #undef __FUNC__
667 #define __FUNC__ "MatSetOption_MPIDense"
/*
   MatSetOption_MPIDense - sets a matrix option. Most options are forwarded
   to the local sequential matrix; orientation options also flip the local
   roworiented flag used by MatSetValues; IGNORE_OFF_PROC_ENTRIES disables
   the stash; several options are documented no-ops.
*/
668 int MatSetOption_MPIDense(Mat A,MatOption op)
669 {
670   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
671 
672   PetscFunctionBegin;
     /* options handled entirely by the local matrix */
673   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
674       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
675       op == MAT_NEW_NONZERO_LOCATION_ERR ||
676       op == MAT_NEW_NONZERO_ALLOCATION_ERR ||
677       op == MAT_COLUMNS_SORTED ||
678       op == MAT_COLUMNS_UNSORTED) {
679         MatSetOption(a->A,op);
680   } else if (op == MAT_ROW_ORIENTED) {
681         a->roworiented = 1;
682         MatSetOption(a->A,op);
683   } else if (op == MAT_ROWS_SORTED ||
684              op == MAT_ROWS_UNSORTED ||
685              op == MAT_SYMMETRIC ||
686              op == MAT_STRUCTURALLY_SYMMETRIC ||
687              op == MAT_YES_NEW_DIAGONALS ||
688              op == MAT_USE_HASH_TABLE) {
689     PLogInfo(A,"MatSetOption_MPIDense:Option ignored\n");
690   } else if (op == MAT_COLUMN_ORIENTED) {
691     a->roworiented = 0; MatSetOption(a->A,op);
692   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
693     a->donotstash = 1;
694   } else if (op == MAT_NO_NEW_DIAGONALS) {
695     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
696   } else {
697     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
698   }
699   PetscFunctionReturn(0);
700 }
701 
702 #undef __FUNC__
703 #define __FUNC__ "MatGetSize_MPIDense"
/*
   MatGetSize_MPIDense - returns the global dimensions M x N.
*/
704 int MatGetSize_MPIDense(Mat A,int *m,int *n)
705 {
706   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
707 
708   PetscFunctionBegin;
709   *m = mat->M; *n = mat->N;
710   PetscFunctionReturn(0);
711 }
712 
713 #undef __FUNC__
714 #define __FUNC__ "MatGetLocalSize_MPIDense"
/*
   MatGetLocalSize_MPIDense - returns the local dimensions: the m locally
   owned rows by all N (global) columns, since rows are stored in full.
*/
715 int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
716 {
717   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
718 
719   PetscFunctionBegin;
720   *m = mat->m; *n = mat->N;
721   PetscFunctionReturn(0);
722 }
723 
724 #undef __FUNC__
725 #define __FUNC__ "MatGetOwnershipRange_MPIDense"
/*
   MatGetOwnershipRange_MPIDense - returns the half-open global row range
   [rstart, rend) owned by this processor.
*/
726 int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
727 {
728   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
729 
730   PetscFunctionBegin;
731   *m = mat->rstart; *n = mat->rend;
732   PetscFunctionReturn(0);
733 }
734 
735 #undef __FUNC__
736 #define __FUNC__ "MatGetRow_MPIDense"
/*
   MatGetRow_MPIDense - returns one (locally owned) row. The global row
   index is translated to a local one and the request forwarded to the
   local sequential matrix; pair with MatRestoreRow_MPIDense.
*/
737 int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
738 {
739   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
740   int          lrow, rstart = mat->rstart, rend = mat->rend,ierr;
741 
742   PetscFunctionBegin;
743   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_SUP,0,"only local rows")
744   lrow = row - rstart;
745   ierr = MatGetRow(mat->A,lrow,nz,idx,v);CHKERRQ(ierr);
746   PetscFunctionReturn(0);
747 }
748 
749 #undef __FUNC__
750 #define __FUNC__ "MatRestoreRow_MPIDense"
/*
   MatRestoreRow_MPIDense - releases the arrays handed out by
   MatGetRow_MPIDense. NOTE(review): this frees *idx/*v directly rather
   than calling MatRestoreRow on the inner matrix -- presumably the
   sequential MatGetRow mallocs them; confirm against the SeqDense code.
*/
751 int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
752 {
753   int ierr;
754 
755   PetscFunctionBegin;
756   if (idx) {ierr = PetscFree(*idx);CHKERRQ(ierr);}
757   if (v) {ierr = PetscFree(*v);CHKERRQ(ierr);}
758   PetscFunctionReturn(0);
759 }
760 
761 #undef __FUNC__
762 #define __FUNC__ "MatDiagonalScale_MPIDense"
/*
   MatDiagonalScale_MPIDense - computes A = diag(ll) * A * diag(rr) in place
   (either factor may be PETSC_NULL to skip it). Left scaling needs only the
   locally owned entries of ll; right scaling first gathers the full rr into
   the work vector lvec since every processor stores all columns. The local
   array mat->v is column-major with leading dimension m.
*/
763 int MatDiagonalScale_MPIDense(Mat A,Vec ll,Vec rr)
764 {
765   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
766   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
767   Scalar       *l,*r,x,*v;
768   int          ierr,i,j,s2a,s3a,s2,s3,m=mat->m,n=mat->n;
769 
770   PetscFunctionBegin;
771   ierr = MatGetLocalSize(A,&s2,&s3);CHKERRQ(ierr);
772   if (ll) {
773     ierr = VecGetLocalSize(ll,&s2a);CHKERRQ(ierr);
774     if (s2a != s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Left scaling vector non-conforming local size");
775     ierr = VecGetArray(ll,&l);CHKERRQ(ierr);
       /* scale local row i by l[i]: step v through the row with stride m */
776     for ( i=0; i<m; i++ ) {
777       x = l[i];
778       v = mat->v + i;
779       for ( j=0; j<n; j++ ) { (*v) *= x; v+= m;}
780     }
781     ierr = VecRestoreArray(ll,&l);CHKERRQ(ierr);
782     PLogFlops(n*m);
783   }
784   if (rr) {
       /* global size of rr must match the (global) column count s3 = N */
785     ierr = VecGetSize(rr,&s3a);CHKERRQ(ierr);
786     if (s3a != s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Right scaling vec non-conforming local size");
787     ierr = VecScatterBegin(rr,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
788     ierr = VecScatterEnd(rr,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
789     ierr = VecGetArray(mdn->lvec,&r);CHKERRQ(ierr);
       /* scale column i by r[i]: columns are contiguous runs of length m */
790     for ( i=0; i<n; i++ ) {
791       x = r[i];
792       v = mat->v + i*m;
793       for ( j=0; j<m; j++ ) { (*v++) *= x;}
794     }
795     ierr = VecRestoreArray(mdn->lvec,&r);CHKERRQ(ierr);
796     PLogFlops(n*m);
797   }
798   PetscFunctionReturn(0);
799 }
800 
801 #undef __FUNC__
802 #define __FUNC__ "MatNorm_MPIDense"
/*
   MatNorm_MPIDense - computes a matrix norm. Uniprocessor defers to the
   sequential code. In parallel:
     NORM_FROBENIUS - sum |a_ij|^2 locally, MPI_SUM, then sqrt;
     NORM_1         - max column sum: partial column sums locally, MPI_SUM
                      per column, then take the max;
     NORM_INFINITY  - max row sum: rows are wholly local, so the local
                      result just needs an MPI_MAX;
     NORM_2 is not supported.
*/
803 int MatNorm_MPIDense(Mat A,NormType type,double *norm)
804 {
805   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
806   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
807   int          ierr, i, j;
808   double       sum = 0.0;
809   Scalar       *v = mat->v;
810 
811   PetscFunctionBegin;
812   if (mdn->size == 1) {
813     ierr =  MatNorm(mdn->A,type,norm);CHKERRQ(ierr);
814   } else {
815     if (type == NORM_FROBENIUS) {
816       for (i=0; i<mat->n*mat->m; i++ ) {
817 #if defined(PETSC_USE_COMPLEX)
818         sum += PetscReal(PetscConj(*v)*(*v)); v++;
819 #else
820         sum += (*v)*(*v); v++;
821 #endif
822       }
823       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
824       *norm = sqrt(*norm);
825       PLogFlops(2*mat->n*mat->m);
826     } else if (type == NORM_1) {
827       double *tmp, *tmp2;
        /* tmp = local partial column sums, tmp2 = global column sums */
828       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) );CHKPTRQ(tmp);
829       tmp2 = tmp + mdn->N;
830       ierr = PetscMemzero(tmp,2*mdn->N*sizeof(double));CHKERRQ(ierr);
831       *norm = 0.0;
832       v = mat->v;
833       for ( j=0; j<mat->n; j++ ) {
834         for ( i=0; i<mat->m; i++ ) {
835           tmp[j] += PetscAbsScalar(*v);  v++;
836         }
837       }
838       ierr = MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
839       for ( j=0; j<mdn->N; j++ ) {
840         if (tmp2[j] > *norm) *norm = tmp2[j];
841       }
842       ierr = PetscFree(tmp);CHKERRQ(ierr);
843       PLogFlops(mat->n*mat->m);
844     } else if (type == NORM_INFINITY) { /* max row norm */
845       double ntemp;
846       ierr = MatNorm(mdn->A,type,&ntemp);CHKERRQ(ierr);
847       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
848     } else {
849       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
850     }
851   }
852   PetscFunctionReturn(0);
853 }
854 
855 #undef __FUNC__
856 #define __FUNC__ "MatTranspose_MPIDense"
857 int MatTranspose_MPIDense(Mat A,Mat *matout)
858 {
859   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
860   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
861   Mat          B;
862   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
863   int          j, i, ierr;
864   Scalar       *v;
865 
866   PetscFunctionBegin;
867   if (matout == PETSC_NULL && M != N) {
868     SETERRQ(PETSC_ERR_SUP,0,"Supports square matrix only in-place");
869   }
870   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);
871 
872   m = Aloc->m; n = Aloc->n; v = Aloc->v;
873   rwork = (int *) PetscMalloc(n*sizeof(int));CHKPTRQ(rwork);
874   for ( j=0; j<n; j++ ) {
875     for (i=0; i<m; i++) rwork[i] = rstart + i;
876     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES);CHKERRQ(ierr);
877     v   += m;
878   }
879   ierr = PetscFree(rwork);CHKERRQ(ierr);
880   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
881   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
882   if (matout != PETSC_NULL) {
883     *matout = B;
884   } else {
885     PetscOps *Abops;
886     MatOps   Aops;
887 
888     /* This isn't really an in-place transpose, but free data struct from a */
889     ierr = PetscFree(a->rowners);CHKERRQ(ierr);
890     ierr = MatDestroy(a->A);CHKERRQ(ierr);
891     if (a->lvec) VecDestroy(a->lvec);
892     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
893     ierr = PetscFree(a);CHKERRQ(ierr);
894 
895     /*
896          This is horrible, horrible code. We need to keep the
897       A pointers for the bops and ops but copy everything
898       else from C.
899     */
900     Abops   = A->bops;
901     Aops    = A->ops;
902     ierr    = PetscMemcpy(A,B,sizeof(struct _p_Mat));CHKERRQ(ierr);
903     A->bops = Abops;
904     A->ops  = Aops;
905 
906     PetscHeaderDestroy(B);
907   }
908   PetscFunctionReturn(0);
909 }
910 
911 #include "pinclude/blaslapack.h"
912 #undef __FUNC__
913 #define __FUNC__ "MatScale_MPIDense"
914 int MatScale_MPIDense(Scalar *alpha,Mat inA)
915 {
916   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
917   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
918   int          one = 1, nz;
919 
920   PetscFunctionBegin;
921   nz = a->m*a->n;
922   BLscal_( &nz, alpha, a->v, &one );
923   PLogFlops(nz);
924   PetscFunctionReturn(0);
925 }
926 
927 static int MatDuplicate_MPIDense(Mat,MatDuplicateOption,Mat *);
928 extern int MatGetSubMatrices_MPIDense(Mat,int,IS *,IS *,MatReuse,Mat **);
929 
/* -------------------------------------------------------------------*/
/*
   Function table for the MATMPIDENSE matrix type.  The entries are
   POSITIONAL: each slot corresponds, in order, to a field of
   struct _MatOps, and a 0 marks an operation that is not implemented
   for this type.  Do NOT reorder or insert entries without updating
   the struct definition to match.
*/
static struct _MatOps MatOps_Values = {MatSetValues_MPIDense,
       MatGetRow_MPIDense,
       MatRestoreRow_MPIDense,
       MatMult_MPIDense,
       MatMultAdd_MPIDense,
       MatMultTrans_MPIDense,
       MatMultTransAdd_MPIDense,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       MatTranspose_MPIDense,
       MatGetInfo_MPIDense,0,
       MatGetDiagonal_MPIDense,
       MatDiagonalScale_MPIDense,
       MatNorm_MPIDense,
       MatAssemblyBegin_MPIDense,
       MatAssemblyEnd_MPIDense,
       0,
       MatSetOption_MPIDense,
       MatZeroEntries_MPIDense,
       MatZeroRows_MPIDense,
       0,
       0,
       0,
       0,
       MatGetSize_MPIDense,
       MatGetLocalSize_MPIDense,
       MatGetOwnershipRange_MPIDense,
       0,
       0,
       MatGetArray_MPIDense,
       MatRestoreArray_MPIDense,
       MatDuplicate_MPIDense,
       0,
       0,
       0,
       0,
       0,
       MatGetSubMatrices_MPIDense,
       0,
       MatGetValues_MPIDense,
       0,
       0,
       MatScale_MPIDense,
       0,
       0,
       0,
       MatGetBlockSize_MPIDense,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       0,
       MatGetSubMatrix_MPIDense,
       0,
       0,
       MatGetMaps_Petsc};
996 
997 #undef __FUNC__
998 #define __FUNC__ "MatCreateMPIDense"
999 /*@C
1000    MatCreateMPIDense - Creates a sparse parallel matrix in dense format.
1001 
1002    Collective on MPI_Comm
1003 
1004    Input Parameters:
1005 +  comm - MPI communicator
1006 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1007 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1008 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1009 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1010 -  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
1011    to control all matrix memory allocation.
1012 
1013    Output Parameter:
1014 .  A - the matrix
1015 
1016    Notes:
1017    The dense format is fully compatible with standard Fortran 77
1018    storage by columns.
1019 
1020    The data input variable is intended primarily for Fortran programmers
1021    who wish to allocate their own matrix memory space.  Most users should
1022    set data=PETSC_NULL.
1023 
1024    The user MUST specify either the local or global matrix dimensions
1025    (possibly both).
1026 
1027    Level: intermediate
1028 
1029 .keywords: matrix, dense, parallel
1030 
1031 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
1032 @*/
1033 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
1034 {
1035   Mat          mat;
1036   Mat_MPIDense *a;
1037   int          ierr, i,flg;
1038 
1039   PetscFunctionBegin;
1040   /* Note:  For now, when data is specified above, this assumes the user correctly
1041    allocates the local dense storage space.  We should add error checking. */
1042 
1043   *A = 0;
1044   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,"Mat",comm,MatDestroy,MatView);
1045   PLogObjectCreate(mat);
1046   mat->data         = (void *) (a = PetscNew(Mat_MPIDense));CHKPTRQ(a);
1047   ierr              = PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));CHKERRQ(ierr);
1048   mat->ops->destroy = MatDestroy_MPIDense;
1049   mat->ops->view    = MatView_MPIDense;
1050   mat->factor       = 0;
1051   mat->mapping      = 0;
1052 
1053   a->factor       = 0;
1054   mat->insertmode = NOT_SET_VALUES;
1055   ierr = MPI_Comm_rank(comm,&a->rank);CHKERRQ(ierr);
1056   ierr = MPI_Comm_size(comm,&a->size);CHKERRQ(ierr);
1057 
1058   ierr = PetscSplitOwnership(comm,&m,&M);CHKERRQ(ierr);
1059 
1060   ierr = PetscSplitOwnership(comm,&m,&M);CHKERRQ(ierr);
1061   a->nvec = n;
1062 
1063   /* each row stores all columns */
1064   a->N = mat->N = N;
1065   a->M = mat->M = M;
1066   a->m = mat->m = m;
1067   a->n = mat->n = N;   /* NOTE: n == N */
1068 
1069   /* the information in the maps duplicates the information computed below, eventually
1070      we should remove the duplicate information that is not contained in the maps */
1071   ierr = MapCreateMPI(comm,m,M,&mat->rmap);CHKERRQ(ierr);
1072   ierr = MapCreateMPI(comm,n,N,&mat->cmap);CHKERRQ(ierr);
1073 
1074   /* build local table of row and column ownerships */
1075   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int));CHKPTRQ(a->rowners);
1076   a->cowners = a->rowners + a->size + 1;
1077   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
1078   ierr = MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1079   a->rowners[0] = 0;
1080   for ( i=2; i<=a->size; i++ ) {
1081     a->rowners[i] += a->rowners[i-1];
1082   }
1083   a->rstart = a->rowners[a->rank];
1084   a->rend   = a->rowners[a->rank+1];
1085   ierr      = MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1086   a->cowners[0] = 0;
1087   for ( i=2; i<=a->size; i++ ) {
1088     a->cowners[i] += a->cowners[i-1];
1089   }
1090 
1091   ierr = MatCreateSeqDense(PETSC_COMM_SELF,m,N,data,&a->A);CHKERRQ(ierr);
1092   PLogObjectParent(mat,a->A);
1093 
1094   /* build cache for off array entries formed */
1095   a->donotstash = 0;
1096   ierr = MatStashCreate_Private(comm,1,&mat->stash);CHKERRQ(ierr);
1097 
1098   /* stuff used for matrix vector multiply */
1099   a->lvec        = 0;
1100   a->Mvctx       = 0;
1101   a->roworiented = 1;
1102 
1103   ierr = PetscObjectComposeFunction((PetscObject)mat,"MatGetDiagonalBlock_C",
1104                                      "MatGetDiagonalBlock_MPIDense",
1105                                      (void*)MatGetDiagonalBlock_MPIDense);CHKERRQ(ierr);
1106 
1107   *A = mat;
1108   ierr = OptionsHasName(PETSC_NULL,"-help",&flg);CHKERRQ(ierr);
1109   if (flg) {
1110     ierr = MatPrintHelp(mat);CHKERRQ(ierr);
1111   }
1112   PetscFunctionReturn(0);
1113 }
1114 
#undef __FUNC__
#define __FUNC__ "MatDuplicate_MPIDense"
/*
   MatDuplicate_MPIDense - Creates a new MATMPIDENSE matrix with the same
   layout as A; cpvalues is forwarded to the local MatDuplicate() and
   controls whether the numerical values are copied.

   NOTE(review): MatCreateMPIDense() allocates rowners with 2*(size+2)
   ints and points cowners into the same buffer, but here only (size+1)
   ints are allocated and cowners is never set -- confirm nothing reads
   cowners of a duplicated matrix.

   NOTE(review): VecDuplicate/VecScatterCopy below assume oldmat->lvec and
   oldmat->Mvctx are non-null, i.e. that A has already been used in a
   matrix-vector product -- confirm against callers.
*/
static int MatDuplicate_MPIDense(Mat A,MatDuplicateOption cpvalues,Mat *newmat)
{
  Mat          mat;
  Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
  int          ierr;
  FactorCtx    *factor;

  PetscFunctionBegin;
  *newmat       = 0;
  PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,"Mat",A->comm,MatDestroy,MatView);
  PLogObjectCreate(mat);
  /* fresh header wired to the shared MPIDense function table */
  mat->data         = (void *) (a = PetscNew(Mat_MPIDense));CHKPTRQ(a);
  ierr              = PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));CHKERRQ(ierr);
  mat->ops->destroy = MatDestroy_MPIDense;
  mat->ops->view    = MatView_MPIDense;
  mat->factor       = A->factor;
  mat->assembled    = PETSC_TRUE;

  /* copy global/local dimensions from the original */
  a->m = mat->m = oldmat->m;
  a->n = mat->n = oldmat->n;
  a->M = mat->M = oldmat->M;
  a->N = mat->N = oldmat->N;
  if (oldmat->factor) {
    a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx));CHKPTRQ(factor);
    /* copy factor contents ... add this code! */
  } else a->factor = 0;

  /* copy ownership/layout bookkeeping */
  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  mat->insertmode = NOT_SET_VALUES;
  a->donotstash   = oldmat->donotstash;
  a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int));CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
  ierr = PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));CHKERRQ(ierr);
  ierr = MatStashCreate_Private(A->comm,1,&mat->stash);CHKERRQ(ierr);

  /* duplicate the mat-vec work vector, scatter context, and local block */
  ierr =  VecDuplicate(oldmat->lvec,&a->lvec);CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx);CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr =  MatDuplicate(oldmat->A,cpvalues,&a->A);CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  *newmat = mat;
  PetscFunctionReturn(0);
}
1164 
1165 #include "sys.h"
1166 
1167 #undef __FUNC__
1168 #define __FUNC__ "MatLoad_MPIDense_DenseInFile"
1169 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
1170 {
1171   int        *rowners, i,size,rank,m,ierr,nz,j;
1172   Scalar     *array,*vals,*vals_ptr;
1173   MPI_Status status;
1174 
1175   PetscFunctionBegin;
1176   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
1177   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
1178 
1179   /* determine ownership of all rows */
1180   m          = M/size + ((M % size) > rank);
1181   rowners    = (int *) PetscMalloc((size+2)*sizeof(int));CHKPTRQ(rowners);
1182   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1183   rowners[0] = 0;
1184   for ( i=2; i<=size; i++ ) {
1185     rowners[i] += rowners[i-1];
1186   }
1187 
1188   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1189   ierr = MatGetArray(*newmat,&array);CHKERRQ(ierr);
1190 
1191   if (!rank) {
1192     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) );CHKPTRQ(vals);
1193 
1194     /* read in my part of the matrix numerical values  */
1195     ierr = PetscBinaryRead(fd,vals,m*N,PETSC_SCALAR);CHKERRQ(ierr);
1196 
1197     /* insert into matrix-by row (this is why cannot directly read into array */
1198     vals_ptr = vals;
1199     for ( i=0; i<m; i++ ) {
1200       for ( j=0; j<N; j++ ) {
1201         array[i + j*m] = *vals_ptr++;
1202       }
1203     }
1204 
1205     /* read in other processors and ship out */
1206     for ( i=1; i<size; i++ ) {
1207       nz   = (rowners[i+1] - rowners[i])*N;
1208       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1209       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);CHKERRQ(ierr);
1210     }
1211   } else {
1212     /* receive numeric values */
1213     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) );CHKPTRQ(vals);
1214 
1215     /* receive message of values*/
1216     ierr = MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);CHKERRQ(ierr);
1217 
1218     /* insert into matrix-by row (this is why cannot directly read into array */
1219     vals_ptr = vals;
1220     for ( i=0; i<m; i++ ) {
1221       for ( j=0; j<N; j++ ) {
1222         array[i + j*m] = *vals_ptr++;
1223       }
1224     }
1225   }
1226   ierr = PetscFree(rowners);CHKERRQ(ierr);
1227   ierr = PetscFree(vals);CHKERRQ(ierr);
1228   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1229   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1230   PetscFunctionReturn(0);
1231 }
1232 
1233 
1234 #undef __FUNC__
1235 #define __FUNC__ "MatLoad_MPIDense"
1236 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1237 {
1238   Mat          A;
1239   Scalar       *vals,*svals;
1240   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1241   MPI_Status   status;
1242   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1243   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1244   int          tag = ((PetscObject)viewer)->tag;
1245   int          i, nz, ierr, j,rstart, rend, fd;
1246 
1247   PetscFunctionBegin;
1248   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
1249   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
1250   if (!rank) {
1251     ierr = ViewerBinaryGetDescriptor(viewer,&fd);CHKERRQ(ierr);
1252     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT);CHKERRQ(ierr);
1253     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
1254   }
1255 
1256   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1257   M = header[1]; N = header[2]; nz = header[3];
1258 
1259   /*
1260        Handle case where matrix is stored on disk as a dense matrix
1261   */
1262   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1263     ierr = MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);CHKERRQ(ierr);
1264     PetscFunctionReturn(0);
1265   }
1266 
1267   /* determine ownership of all rows */
1268   m          = M/size + ((M % size) > rank);
1269   rowners    = (int *) PetscMalloc((size+2)*sizeof(int));CHKPTRQ(rowners);
1270   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1271   rowners[0] = 0;
1272   for ( i=2; i<=size; i++ ) {
1273     rowners[i] += rowners[i-1];
1274   }
1275   rstart = rowners[rank];
1276   rend   = rowners[rank+1];
1277 
1278   /* distribute row lengths to all processors */
1279   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) );CHKPTRQ(ourlens);
1280   offlens = ourlens + (rend-rstart);
1281   if (!rank) {
1282     rowlengths = (int*) PetscMalloc( M*sizeof(int) );CHKPTRQ(rowlengths);
1283     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT);CHKERRQ(ierr);
1284     sndcounts = (int*) PetscMalloc( size*sizeof(int) );CHKPTRQ(sndcounts);
1285     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1286     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1287     ierr = PetscFree(sndcounts);CHKERRQ(ierr);
1288   } else {
1289     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
1290   }
1291 
1292   if (!rank) {
1293     /* calculate the number of nonzeros on each processor */
1294     procsnz = (int*) PetscMalloc( size*sizeof(int) );CHKPTRQ(procsnz);
1295     ierr    = PetscMemzero(procsnz,size*sizeof(int));CHKERRQ(ierr);
1296     for ( i=0; i<size; i++ ) {
1297       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1298         procsnz[i] += rowlengths[j];
1299       }
1300     }
1301     ierr = PetscFree(rowlengths);CHKERRQ(ierr);
1302 
1303     /* determine max buffer needed and allocate it */
1304     maxnz = 0;
1305     for ( i=0; i<size; i++ ) {
1306       maxnz = PetscMax(maxnz,procsnz[i]);
1307     }
1308     cols = (int *) PetscMalloc( maxnz*sizeof(int) );CHKPTRQ(cols);
1309 
1310     /* read in my part of the matrix column indices  */
1311     nz = procsnz[0];
1312     mycols = (int *) PetscMalloc( nz*sizeof(int) );CHKPTRQ(mycols);
1313     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT);CHKERRQ(ierr);
1314 
1315     /* read in every one elses and ship off */
1316     for ( i=1; i<size; i++ ) {
1317       nz   = procsnz[i];
1318       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT);CHKERRQ(ierr);
1319       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1320     }
1321     ierr = PetscFree(cols);CHKERRQ(ierr);
1322   } else {
1323     /* determine buffer space needed for message */
1324     nz = 0;
1325     for ( i=0; i<m; i++ ) {
1326       nz += ourlens[i];
1327     }
1328     mycols = (int*) PetscMalloc( nz*sizeof(int) );CHKPTRQ(mycols);
1329 
1330     /* receive message of column indices*/
1331     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1332     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1333     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1334   }
1335 
1336   /* loop over local rows, determining number of off diagonal entries */
1337   ierr = PetscMemzero(offlens,m*sizeof(int));CHKERRQ(ierr);
1338   jj = 0;
1339   for ( i=0; i<m; i++ ) {
1340     for ( j=0; j<ourlens[i]; j++ ) {
1341       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1342       jj++;
1343     }
1344   }
1345 
1346   /* create our matrix */
1347   for ( i=0; i<m; i++ ) {
1348     ourlens[i] -= offlens[i];
1349   }
1350   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1351   A = *newmat;
1352   for ( i=0; i<m; i++ ) {
1353     ourlens[i] += offlens[i];
1354   }
1355 
1356   if (!rank) {
1357     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) );CHKPTRQ(vals);
1358 
1359     /* read in my part of the matrix numerical values  */
1360     nz = procsnz[0];
1361     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1362 
1363     /* insert into matrix */
1364     jj      = rstart;
1365     smycols = mycols;
1366     svals   = vals;
1367     for ( i=0; i<m; i++ ) {
1368       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1369       smycols += ourlens[i];
1370       svals   += ourlens[i];
1371       jj++;
1372     }
1373 
1374     /* read in other processors and ship out */
1375     for ( i=1; i<size; i++ ) {
1376       nz   = procsnz[i];
1377       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
1378       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1379     }
1380     ierr = PetscFree(procsnz);CHKERRQ(ierr);
1381   } else {
1382     /* receive numeric values */
1383     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) );CHKPTRQ(vals);
1384 
1385     /* receive message of values*/
1386     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1387     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1388     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1389 
1390     /* insert into matrix */
1391     jj      = rstart;
1392     smycols = mycols;
1393     svals   = vals;
1394     for ( i=0; i<m; i++ ) {
1395       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1396       smycols += ourlens[i];
1397       svals   += ourlens[i];
1398       jj++;
1399     }
1400   }
1401   ierr = PetscFree(ourlens);CHKERRQ(ierr);
1402   ierr = PetscFree(vals);CHKERRQ(ierr);
1403   ierr = PetscFree(mycols);CHKERRQ(ierr);
1404   ierr = PetscFree(rowners);CHKERRQ(ierr);
1405 
1406   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1407   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1408   PetscFunctionReturn(0);
1409 }
1410 
1411 
1412 
1413 
1414 
1415