xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 44bdcb2d3326b2ce09a36bf9b6840bc8773a5d60)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpidense.c,v 1.91 1998/05/29 20:37:01 bsmith Exp balay $";
3 #endif
4 
5 /*
6    Basic functions for basic parallel dense matrices.
7 */
8 
9 #include "src/mat/impls/dense/mpi/mpidense.h"
10 #include "src/vec/vecimpl.h"
11 
12 #undef __FUNC__
13 #define __FUNC__ "MatSetValues_MPIDense"
14 int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
15 {
16   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
17   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
18   int          roworiented = A->roworiented;
19 
20   PetscFunctionBegin;
21   for ( i=0; i<m; i++ ) {
22     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
23     if (idxm[i] >= A->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
24     if (idxm[i] >= rstart && idxm[i] < rend) {
25       row = idxm[i] - rstart;
26       if (roworiented) {
27         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
28       } else {
29         for ( j=0; j<n; j++ ) {
30           if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
31           if (idxn[j] >= A->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
32           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
33         }
34       }
35     } else {
36       if (roworiented) {
37         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
38       } else { /* must stash each seperately */
39         row = idxm[i];
40         for ( j=0; j<n; j++ ) {
41           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
42         }
43       }
44     }
45   }
46   PetscFunctionReturn(0);
47 }
48 
49 #undef __FUNC__
50 #define __FUNC__ "MatGetValues_MPIDense"
51 int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
52 {
53   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
54   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
55 
56   PetscFunctionBegin;
57   for ( i=0; i<m; i++ ) {
58     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
59     if (idxm[i] >= mdn->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
60     if (idxm[i] >= rstart && idxm[i] < rend) {
61       row = idxm[i] - rstart;
62       for ( j=0; j<n; j++ ) {
63         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
64         if (idxn[j] >= mdn->N) {
65           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
66         }
67         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
68       }
69     } else {
70       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
71     }
72   }
73   PetscFunctionReturn(0);
74 }
75 
76 #undef __FUNC__
77 #define __FUNC__ "MatGetArray_MPIDense"
78 int MatGetArray_MPIDense(Mat A,Scalar **array)
79 {
80   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
81   int          ierr;
82 
83   PetscFunctionBegin;
84   ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
85   PetscFunctionReturn(0);
86 }
87 
88 #undef __FUNC__
89 #define __FUNC__ "MatRestoreArray_MPIDense"
90 int MatRestoreArray_MPIDense(Mat A,Scalar **array)
91 {
92   PetscFunctionBegin;
93   PetscFunctionReturn(0);
94 }
95 
96 #undef __FUNC__
97 #define __FUNC__ "MatAssemblyBegin_MPIDense"
98 int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
99 {
100   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
101   MPI_Comm     comm = mat->comm;
102   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
103   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
104   int          tag = mat->tag, *owner,*starts,count,ierr;
105   InsertMode   addv;
106   MPI_Request  *send_waits,*recv_waits;
107   Scalar       *rvalues,*svalues;
108 
109   PetscFunctionBegin;
110   /* make sure all processors are either in INSERTMODE or ADDMODE */
111   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
112   if (addv == (ADD_VALUES|INSERT_VALUES)) {
113     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Cannot mix adds/inserts on different procs");
114   }
115   mat->insertmode = addv; /* in case this processor had no cache */
116 
117   /*  first count number of contributors to each processor */
118   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
119   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
120   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
121   for ( i=0; i<mdn->stash.n; i++ ) {
122     idx = mdn->stash.idx[i];
123     for ( j=0; j<size; j++ ) {
124       if (idx >= owners[j] && idx < owners[j+1]) {
125         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
126       }
127     }
128   }
129   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
130 
131   /* inform other processors of number of messages and max length*/
132   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
133   ierr = MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
134   nreceives = work[rank];
135   if (nreceives > size) SETERRQ(PETSC_ERR_PLIB,0,"Internal PETSc error");
136   ierr = MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
137   nmax = work[rank];
138   PetscFree(work);
139 
140   /* post receives:
141        1) each message will consist of ordered pairs
142      (global index,value) we store the global index as a double
143      to simplify the message passing.
144        2) since we don't know how long each individual message is we
145      allocate the largest needed buffer for each receive. Potentially
146      this is a lot of wasted space.
147 
148        This could be done better.
149   */
150   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
151   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
152   for ( i=0; i<nreceives; i++ ) {
153     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
154   }
155 
156   /* do sends:
157       1) starts[i] gives the starting index in svalues for stuff going to
158          the ith processor
159   */
160   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
161   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
162   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
163   starts[0] = 0;
164   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
165   for ( i=0; i<mdn->stash.n; i++ ) {
166     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
167     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
168     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
169   }
170   PetscFree(owner);
171   starts[0] = 0;
172   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
173   count = 0;
174   for ( i=0; i<size; i++ ) {
175     if (procs[i]) {
176       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
177     }
178   }
179   PetscFree(starts); PetscFree(nprocs);
180 
181   /* Free cache space */
182   PLogInfo(mat,"MatAssemblyBegin_MPIDense:Number of off-processor values %d\n",mdn->stash.n);
183   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
184 
185   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
186   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
187   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
188   mdn->rmax       = nmax;
189 
190   PetscFunctionReturn(0);
191 }
192 extern int MatSetUpMultiply_MPIDense(Mat);
193 
194 #undef __FUNC__
195 #define __FUNC__ "MatAssemblyEnd_MPIDense"
196 int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
197 {
198   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
199   MPI_Status   *send_status,recv_status;
200   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
201   Scalar       *values,val;
202   InsertMode   addv = mat->insertmode;
203 
204   PetscFunctionBegin;
205   /*  wait on receives */
206   while (count) {
207     ierr = MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
208     /* unpack receives into our local space */
209     values = mdn->rvalues + 3*imdex*mdn->rmax;
210     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
211     n = n/3;
212     for ( i=0; i<n; i++ ) {
213       row = (int) PetscReal(values[3*i]) - mdn->rstart;
214       col = (int) PetscReal(values[3*i+1]);
215       val = values[3*i+2];
216       if (col >= 0 && col < mdn->N) {
217         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
218       }
219       else {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid column");}
220     }
221     count--;
222   }
223   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
224 
225   /* wait on sends */
226   if (mdn->nsends) {
227     send_status = (MPI_Status *) PetscMalloc(mdn->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
228     ierr        = MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);CHKERRQ(ierr);
229     PetscFree(send_status);
230   }
231   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
232 
233   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
234   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
235 
236   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
237     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
238   }
239   PetscFunctionReturn(0);
240 }
241 
242 #undef __FUNC__
243 #define __FUNC__ "MatZeroEntries_MPIDense"
244 int MatZeroEntries_MPIDense(Mat A)
245 {
246   int          ierr;
247   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
248 
249   PetscFunctionBegin;
250   ierr = MatZeroEntries(l->A);CHKERRQ(ierr);
251   PetscFunctionReturn(0);
252 }
253 
254 #undef __FUNC__
255 #define __FUNC__ "MatGetBlockSize_MPIDense"
256 int MatGetBlockSize_MPIDense(Mat A,int *bs)
257 {
258   PetscFunctionBegin;
259   *bs = 1;
260   PetscFunctionReturn(0);
261 }
262 
263 /* the code does not do the diagonal entries correctly unless the
264    matrix is square and the column and row owerships are identical.
265    This is a BUG. The only way to fix it seems to be to access
266    mdn->A and mdn->B directly and not through the MatZeroRows()
267    routine.
268 */
269 #undef __FUNC__
270 #define __FUNC__ "MatZeroRows_MPIDense"
271 int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
272 {
273   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
274   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
275   int            *procs,*nprocs,j,found,idx,nsends,*work;
276   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
277   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
278   int            *lens,imdex,*lrows,*values;
279   MPI_Comm       comm = A->comm;
280   MPI_Request    *send_waits,*recv_waits;
281   MPI_Status     recv_status,*send_status;
282   IS             istmp;
283 
284   PetscFunctionBegin;
285   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
286   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
287 
288   /*  first count number of contributors to each processor */
289   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
290   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
291   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
292   for ( i=0; i<N; i++ ) {
293     idx = rows[i];
294     found = 0;
295     for ( j=0; j<size; j++ ) {
296       if (idx >= owners[j] && idx < owners[j+1]) {
297         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
298       }
299     }
300     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
301   }
302   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
303 
304   /* inform other processors of number of messages and max length*/
305   work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
306   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
307   nrecvs = work[rank];
308   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
309   nmax   = work[rank];
310   PetscFree(work);
311 
312   /* post receives:   */
313   rvalues    = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
314   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
315   for ( i=0; i<nrecvs; i++ ) {
316     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
317   }
318 
319   /* do sends:
320       1) starts[i] gives the starting index in svalues for stuff going to
321          the ith processor
322   */
323   svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
324   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
325   starts     = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
326   starts[0]  = 0;
327   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
328   for ( i=0; i<N; i++ ) {
329     svalues[starts[owner[i]]++] = rows[i];
330   }
331   ISRestoreIndices(is,&rows);
332 
333   starts[0] = 0;
334   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
335   count = 0;
336   for ( i=0; i<size; i++ ) {
337     if (procs[i]) {
338       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
339     }
340   }
341   PetscFree(starts);
342 
343   base = owners[rank];
344 
345   /*  wait on receives */
346   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
347   source = lens + nrecvs;
348   count  = nrecvs; slen = 0;
349   while (count) {
350     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
351     /* unpack receives into our local space */
352     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
353     source[imdex]  = recv_status.MPI_SOURCE;
354     lens[imdex]  = n;
355     slen += n;
356     count--;
357   }
358   PetscFree(recv_waits);
359 
360   /* move the data into the send scatter */
361   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
362   count = 0;
363   for ( i=0; i<nrecvs; i++ ) {
364     values = rvalues + i*nmax;
365     for ( j=0; j<lens[i]; j++ ) {
366       lrows[count++] = values[j] - base;
367     }
368   }
369   PetscFree(rvalues); PetscFree(lens);
370   PetscFree(owner); PetscFree(nprocs);
371 
372   /* actually zap the local rows */
373   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
374   PLogObjectParent(A,istmp);
375   PetscFree(lrows);
376   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
377   ierr = ISDestroy(istmp); CHKERRQ(ierr);
378 
379   /* wait on sends */
380   if (nsends) {
381     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
382     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
383     PetscFree(send_status);
384   }
385   PetscFree(send_waits); PetscFree(svalues);
386 
387   PetscFunctionReturn(0);
388 }
389 
390 #undef __FUNC__
391 #define __FUNC__ "MatMult_MPIDense"
392 int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
393 {
394   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
395   int          ierr;
396 
397   PetscFunctionBegin;
398   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
399   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
400   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
401   PetscFunctionReturn(0);
402 }
403 
404 #undef __FUNC__
405 #define __FUNC__ "MatMultAdd_MPIDense"
406 int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
407 {
408   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
409   int          ierr;
410 
411   PetscFunctionBegin;
412   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
413   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
414   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
415   PetscFunctionReturn(0);
416 }
417 
418 #undef __FUNC__
419 #define __FUNC__ "MatMultTrans_MPIDense"
420 int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
421 {
422   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
423   int          ierr;
424   Scalar       zero = 0.0;
425 
426   PetscFunctionBegin;
427   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
428   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
429   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
430   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
431   PetscFunctionReturn(0);
432 }
433 
434 #undef __FUNC__
435 #define __FUNC__ "MatMultTransAdd_MPIDense"
436 int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
437 {
438   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
439   int          ierr;
440 
441   PetscFunctionBegin;
442   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
443   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
444   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
445   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
446   PetscFunctionReturn(0);
447 }
448 
449 #undef __FUNC__
450 #define __FUNC__ "MatGetDiagonal_MPIDense"
451 int MatGetDiagonal_MPIDense(Mat A,Vec v)
452 {
453   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
454   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
455   int          ierr, len, i, n, m = a->m, radd;
456   Scalar       *x, zero = 0.0;
457 
458   PetscFunctionBegin;
459   VecSet(&zero,v);
460   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
461   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
462   if (n != a->M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Nonconforming mat and vec");
463   len = PetscMin(aloc->m,aloc->n);
464   radd = a->rstart*m;
465   for ( i=0; i<len; i++ ) {
466     x[i] = aloc->v[radd + i*m + i];
467   }
468   PetscFunctionReturn(0);
469 }
470 
471 #undef __FUNC__
472 #define __FUNC__ "MatDestroy_MPIDense"
473 int MatDestroy_MPIDense(Mat mat)
474 {
475   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
476   int          ierr;
477 
478   PetscFunctionBegin;
479   if (--mat->refct > 0) PetscFunctionReturn(0);
480 
481   if (mat->mapping) {
482     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
483   }
484   if (mat->bmapping) {
485     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
486   }
487 #if defined(USE_PETSC_LOG)
488   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",mdn->M,mdn->N);
489 #endif
490   PetscFree(mdn->rowners);
491   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
492   if (mdn->lvec)   VecDestroy(mdn->lvec);
493   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
494   if (mdn->factor) {
495     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
496     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
497     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
498     PetscFree(mdn->factor);
499   }
500   PetscFree(mdn);
501   if (mat->mapping) {
502     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
503   }
504   PLogObjectDestroy(mat);
505   PetscHeaderDestroy(mat);
506   PetscFunctionReturn(0);
507 }
508 
509 #include "pinclude/pviewer.h"
510 
511 #undef __FUNC__
512 #define __FUNC__ "MatView_MPIDense_Binary"
513 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
514 {
515   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
516   int          ierr;
517 
518   PetscFunctionBegin;
519   if (mdn->size == 1) {
520     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
521   }
522   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
523   PetscFunctionReturn(0);
524 }
525 
526 #undef __FUNC__
527 #define __FUNC__ "MatView_MPIDense_ASCII"
528 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
529 {
530   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
531   int          ierr, format;
532   FILE         *fd;
533   ViewerType   vtype;
534 
535   PetscFunctionBegin;
536   ierr = ViewerGetType(viewer,&vtype);CHKERRQ(ierr);
537   ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
538   ierr = ViewerGetFormat(viewer,&format);
539   if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
540     int rank;
541     MatInfo info;
542     MPI_Comm_rank(mat->comm,&rank);
543     ierr = MatGetInfo(mat,MAT_LOCAL,&info);
544     PetscSequentialPhaseBegin(mat->comm,1);
545       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
546          (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
547       fflush(fd);
548     PetscSequentialPhaseEnd(mat->comm,1);
549     ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
550     PetscFunctionReturn(0);
551   }
552   else if (format == VIEWER_FORMAT_ASCII_INFO) {
553     PetscFunctionReturn(0);
554   }
555   if (vtype == ASCII_FILE_VIEWER) {
556     PetscSequentialPhaseBegin(mat->comm,1);
557     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
558              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
559     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
560     fflush(fd);
561     PetscSequentialPhaseEnd(mat->comm,1);
562   } else {
563     int size = mdn->size, rank = mdn->rank;
564     if (size == 1) {
565       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
566     } else {
567       /* assemble the entire matrix onto first processor. */
568       Mat          A;
569       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
570       Scalar       *vals;
571       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
572 
573       if (!rank) {
574         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
575       } else {
576         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
577       }
578       PLogObjectParent(mat,A);
579 
580       /* Copy the matrix ... This isn't the most efficient means,
581          but it's quick for now */
582       row = mdn->rstart; m = Amdn->m;
583       for ( i=0; i<m; i++ ) {
584         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
585         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
586         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
587         row++;
588       }
589 
590       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
591       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
592       if (!rank) {
593         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
594       }
595       ierr = MatDestroy(A); CHKERRQ(ierr);
596     }
597   }
598   PetscFunctionReturn(0);
599 }
600 
601 #undef __FUNC__
602 #define __FUNC__ "MatView_MPIDense"
603 int MatView_MPIDense(Mat mat,Viewer viewer)
604 {
605   int          ierr;
606   ViewerType   vtype;
607 
608   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
609   if (vtype == ASCII_FILE_VIEWER) {
610     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
611   } else if (vtype == ASCII_FILES_VIEWER) {
612     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
613   } else if (vtype == BINARY_FILE_VIEWER) {
614     ierr = MatView_MPIDense_Binary(mat,viewer);CHKERRQ(ierr);
615   } else {
616     SETERRQ(1,1,"Viewer type not supported by PETSc object");
617   }
618   PetscFunctionReturn(0);
619 }
620 
621 #undef __FUNC__
622 #define __FUNC__ "MatGetInfo_MPIDense"
623 int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
624 {
625   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
626   Mat          mdn = mat->A;
627   int          ierr;
628   double       isend[5], irecv[5];
629 
630   PetscFunctionBegin;
631   info->rows_global    = (double)mat->M;
632   info->columns_global = (double)mat->N;
633   info->rows_local     = (double)mat->m;
634   info->columns_local  = (double)mat->N;
635   info->block_size     = 1.0;
636   ierr = MatGetInfo(mdn,MAT_LOCAL,info); CHKERRQ(ierr);
637   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
638   isend[3] = info->memory;  isend[4] = info->mallocs;
639   if (flag == MAT_LOCAL) {
640     info->nz_used      = isend[0];
641     info->nz_allocated = isend[1];
642     info->nz_unneeded  = isend[2];
643     info->memory       = isend[3];
644     info->mallocs      = isend[4];
645   } else if (flag == MAT_GLOBAL_MAX) {
646     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
647     info->nz_used      = irecv[0];
648     info->nz_allocated = irecv[1];
649     info->nz_unneeded  = irecv[2];
650     info->memory       = irecv[3];
651     info->mallocs      = irecv[4];
652   } else if (flag == MAT_GLOBAL_SUM) {
653     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
654     info->nz_used      = irecv[0];
655     info->nz_allocated = irecv[1];
656     info->nz_unneeded  = irecv[2];
657     info->memory       = irecv[3];
658     info->mallocs      = irecv[4];
659   }
660   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
661   info->fill_ratio_needed = 0;
662   info->factor_mallocs    = 0;
663   PetscFunctionReturn(0);
664 }
665 
666 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
667    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
668    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
669    extern int MatSolve_MPIDense(Mat,Vec,Vec);
670    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
671    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
672    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
673 
674 #undef __FUNC__
675 #define __FUNC__ "MatSetOption_MPIDense"
676 int MatSetOption_MPIDense(Mat A,MatOption op)
677 {
678   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
679 
680   PetscFunctionBegin;
681   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
682       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
683       op == MAT_NEW_NONZERO_LOCATION_ERROR ||
684       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
685       op == MAT_COLUMNS_SORTED ||
686       op == MAT_COLUMNS_UNSORTED) {
687         MatSetOption(a->A,op);
688   } else if (op == MAT_ROW_ORIENTED) {
689         a->roworiented = 1;
690         MatSetOption(a->A,op);
691   } else if (op == MAT_ROWS_SORTED ||
692              op == MAT_ROWS_UNSORTED ||
693              op == MAT_SYMMETRIC ||
694              op == MAT_STRUCTURALLY_SYMMETRIC ||
695              op == MAT_YES_NEW_DIAGONALS ||
696              op == MAT_USE_HASH_TABLE) {
697     PLogInfo(A,"MatSetOption_MPIDense:Option ignored\n");
698   } else if (op == MAT_COLUMN_ORIENTED) {
699     a->roworiented = 0; MatSetOption(a->A,op);
700   } else if (op == MAT_NO_NEW_DIAGONALS) {
701     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
702   } else {
703     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
704   }
705   PetscFunctionReturn(0);
706 }
707 
708 #undef __FUNC__
709 #define __FUNC__ "MatGetSize_MPIDense"
710 int MatGetSize_MPIDense(Mat A,int *m,int *n)
711 {
712   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
713 
714   PetscFunctionBegin;
715   *m = mat->M; *n = mat->N;
716   PetscFunctionReturn(0);
717 }
718 
719 #undef __FUNC__
720 #define __FUNC__ "MatGetLocalSize_MPIDense"
721 int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
722 {
723   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
724 
725   PetscFunctionBegin;
726   *m = mat->m; *n = mat->N;
727   PetscFunctionReturn(0);
728 }
729 
730 #undef __FUNC__
731 #define __FUNC__ "MatGetOwnershipRange_MPIDense"
732 int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
733 {
734   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
735 
736   PetscFunctionBegin;
737   *m = mat->rstart; *n = mat->rend;
738   PetscFunctionReturn(0);
739 }
740 
741 #undef __FUNC__
742 #define __FUNC__ "MatGetRow_MPIDense"
743 int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
744 {
745   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
746   int          lrow, rstart = mat->rstart, rend = mat->rend,ierr;
747 
748   PetscFunctionBegin;
749   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_SUP,0,"only local rows")
750   lrow = row - rstart;
751   ierr = MatGetRow(mat->A,lrow,nz,idx,v);CHKERRQ(ierr);
752   PetscFunctionReturn(0);
753 }
754 
755 #undef __FUNC__
756 #define __FUNC__ "MatRestoreRow_MPIDense"
757 int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
758 {
759   PetscFunctionBegin;
760   if (idx) PetscFree(*idx);
761   if (v) PetscFree(*v);
762   PetscFunctionReturn(0);
763 }
764 
765 #undef __FUNC__
766 #define __FUNC__ "MatNorm_MPIDense"
767 int MatNorm_MPIDense(Mat A,NormType type,double *norm)
768 {
769   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
770   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
771   int          ierr, i, j;
772   double       sum = 0.0;
773   Scalar       *v = mat->v;
774 
775   PetscFunctionBegin;
776   if (mdn->size == 1) {
777     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
778   } else {
779     if (type == NORM_FROBENIUS) {
780       for (i=0; i<mat->n*mat->m; i++ ) {
781 #if defined(USE_PETSC_COMPLEX)
782         sum += PetscReal(PetscConj(*v)*(*v)); v++;
783 #else
784         sum += (*v)*(*v); v++;
785 #endif
786       }
787       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
788       *norm = sqrt(*norm);
789       PLogFlops(2*mat->n*mat->m);
790     } else if (type == NORM_1) {
791       double *tmp, *tmp2;
792       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
793       tmp2 = tmp + mdn->N;
794       PetscMemzero(tmp,2*mdn->N*sizeof(double));
795       *norm = 0.0;
796       v = mat->v;
797       for ( j=0; j<mat->n; j++ ) {
798         for ( i=0; i<mat->m; i++ ) {
799           tmp[j] += PetscAbsScalar(*v);  v++;
800         }
801       }
802       ierr = MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
803       for ( j=0; j<mdn->N; j++ ) {
804         if (tmp2[j] > *norm) *norm = tmp2[j];
805       }
806       PetscFree(tmp);
807       PLogFlops(mat->n*mat->m);
808     } else if (type == NORM_INFINITY) { /* max row norm */
809       double ntemp;
810       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
811       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
812     } else {
813       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
814     }
815   }
816   PetscFunctionReturn(0);
817 }
818 
819 #undef __FUNC__
820 #define __FUNC__ "MatTranspose_MPIDense"
821 int MatTranspose_MPIDense(Mat A,Mat *matout)
822 {
823   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
824   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
825   Mat          B;
826   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
827   int          j, i, ierr;
828   Scalar       *v;
829 
830   PetscFunctionBegin;
831   if (matout == PETSC_NULL && M != N) {
832     SETERRQ(PETSC_ERR_SUP,0,"Supports square matrix only in-place");
833   }
834   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);
835 
836   m = Aloc->m; n = Aloc->n; v = Aloc->v;
837   rwork = (int *) PetscMalloc(n*sizeof(int)); CHKPTRQ(rwork);
838   for ( j=0; j<n; j++ ) {
839     for (i=0; i<m; i++) rwork[i] = rstart + i;
840     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
841     v   += m;
842   }
843   PetscFree(rwork);
844   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
845   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
846   if (matout != PETSC_NULL) {
847     *matout = B;
848   } else {
849     PetscOps       *Abops;
850     struct _MatOps *Aops;
851 
852     /* This isn't really an in-place transpose, but free data struct from a */
853     PetscFree(a->rowners);
854     ierr = MatDestroy(a->A); CHKERRQ(ierr);
855     if (a->lvec) VecDestroy(a->lvec);
856     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
857     PetscFree(a);
858 
859     /*
860          This is horrible, horrible code. We need to keep the
861       A pointers for the bops and ops but copy everything
862       else from C.
863     */
864     Abops = A->bops;
865     Aops  = A->ops;
866     PetscMemcpy(A,B,sizeof(struct _p_Mat));
867     A->bops = Abops;
868     A->ops  = Aops;
869 
870     PetscHeaderDestroy(B);
871   }
872   PetscFunctionReturn(0);
873 }
874 
875 #include "pinclude/blaslapack.h"
876 #undef __FUNC__
877 #define __FUNC__ "MatScale_MPIDense"
878 int MatScale_MPIDense(Scalar *alpha,Mat inA)
879 {
880   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
881   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
882   int          one = 1, nz;
883 
884   PetscFunctionBegin;
885   nz = a->m*a->n;
886   BLscal_( &nz, alpha, a->v, &one );
887   PLogFlops(nz);
888   PetscFunctionReturn(0);
889 }
890 
891 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
892 
893 /* -------------------------------------------------------------------*/
894 static struct _MatOps MatOps = {MatSetValues_MPIDense,
895        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
896        MatMult_MPIDense,MatMultAdd_MPIDense,
897        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
898        0,0,
899        0,0,
900        0,0,
901        0,MatTranspose_MPIDense,
902        MatGetInfo_MPIDense,0,
903        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
904        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
905        0,
906        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
907        0,0,
908        0,0,
909        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
910        MatGetOwnershipRange_MPIDense,
911        0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
912        MatConvertSameType_MPIDense,
913        0,0,0,0,0,
914        0,0,MatGetValues_MPIDense,0,0,MatScale_MPIDense,
915        0,0,0,MatGetBlockSize_MPIDense};
916 
917 #undef __FUNC__
918 #define __FUNC__ "MatCreateMPIDense"
919 /*@C
920    MatCreateMPIDense - Creates a sparse parallel matrix in dense format.
921 
922    Collective on MPI_Comm
923 
924    Input Parameters:
925 +  comm - MPI communicator
926 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
927 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
928 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
929 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
930 -  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
931    to control all matrix memory allocation.
932 
933    Output Parameter:
934 .  A - the matrix
935 
936    Notes:
937    The dense format is fully compatible with standard Fortran 77
938    storage by columns.
939 
940    The data input variable is intended primarily for Fortran programmers
941    who wish to allocate their own matrix memory space.  Most users should
942    set data=PETSC_NULL.
943 
944    The user MUST specify either the local or global matrix dimensions
945    (possibly both).
946 
947    Currently, the only parallel dense matrix decomposition is by rows,
948    so that n=N and each submatrix owns all of the global columns.
949 
950 .keywords: matrix, dense, parallel
951 
952 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
953 @*/
954 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
955 {
956   Mat          mat;
957   Mat_MPIDense *a;
958   int          ierr, i,flg;
959 
960   PetscFunctionBegin;
961   /* Note:  For now, when data is specified above, this assumes the user correctly
962    allocates the local dense storage space.  We should add error checking. */
963 
964   *A = 0;
965   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,comm,MatDestroy,MatView);
966   PLogObjectCreate(mat);
967   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
968   PetscMemcpy(mat->ops,&MatOps,sizeof(struct _MatOps));
969   mat->ops->destroy    = MatDestroy_MPIDense;
970   mat->ops->view       = MatView_MPIDense;
971   mat->factor          = 0;
972   mat->mapping         = 0;
973 
974   a->factor       = 0;
975   mat->insertmode = NOT_SET_VALUES;
976   MPI_Comm_rank(comm,&a->rank);
977   MPI_Comm_size(comm,&a->size);
978 
979   if (M == PETSC_DECIDE) {ierr = MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);}
980   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
981 
982   /* each row stores all columns */
983   if (N == PETSC_DECIDE) N = n;
984   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
985   /*  if (n != N) SETERRQ(PETSC_ERR_SUP,0,"For now, only n=N is supported"); */
986   a->N = mat->N = N;
987   a->M = mat->M = M;
988   a->m = mat->m = m;
989   a->n = mat->n = n;
990 
991   /* build local table of row and column ownerships */
992   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
993   a->cowners = a->rowners + a->size + 1;
994   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
995   ierr = MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
996   a->rowners[0] = 0;
997   for ( i=2; i<=a->size; i++ ) {
998     a->rowners[i] += a->rowners[i-1];
999   }
1000   a->rstart = a->rowners[a->rank];
1001   a->rend   = a->rowners[a->rank+1];
1002   ierr      = MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1003   a->cowners[0] = 0;
1004   for ( i=2; i<=a->size; i++ ) {
1005     a->cowners[i] += a->cowners[i-1];
1006   }
1007 
1008   ierr = MatCreateSeqDense(PETSC_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
1009   PLogObjectParent(mat,a->A);
1010 
1011   /* build cache for off array entries formed */
1012   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
1013 
1014   /* stuff used for matrix vector multiply */
1015   a->lvec        = 0;
1016   a->Mvctx       = 0;
1017   a->roworiented = 1;
1018 
1019   *A = mat;
1020   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1021   if (flg) {
1022     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1023   }
1024   PetscFunctionReturn(0);
1025 }
1026 
1027 #undef __FUNC__
1028 #define __FUNC__ "MatConvertSameType_MPIDense"
1029 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
1030 {
1031   Mat          mat;
1032   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
1033   int          ierr;
1034   FactorCtx    *factor;
1035 
1036   PetscFunctionBegin;
1037   *newmat       = 0;
1038   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,A->comm,MatDestroy,MatView);
1039   PLogObjectCreate(mat);
1040   mat->data      = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
1041   PetscMemcpy(mat->ops,&MatOps,sizeof(struct _MatOps));
1042   mat->ops->destroy   = MatDestroy_MPIDense;
1043   mat->ops->view      = MatView_MPIDense;
1044   mat->factor         = A->factor;
1045   mat->assembled      = PETSC_TRUE;
1046 
1047   a->m = mat->m = oldmat->m;
1048   a->n = mat->n = oldmat->n;
1049   a->M = mat->M = oldmat->M;
1050   a->N = mat->N = oldmat->N;
1051   if (oldmat->factor) {
1052     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
1053     /* copy factor contents ... add this code! */
1054   } else a->factor = 0;
1055 
1056   a->rstart       = oldmat->rstart;
1057   a->rend         = oldmat->rend;
1058   a->size         = oldmat->size;
1059   a->rank         = oldmat->rank;
1060   mat->insertmode = NOT_SET_VALUES;
1061 
1062   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
1063   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
1064   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
1065   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1066 
1067   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1068   PLogObjectParent(mat,a->lvec);
1069   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1070   PLogObjectParent(mat,a->Mvctx);
1071   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1072   PLogObjectParent(mat,a->A);
1073   *newmat = mat;
1074   PetscFunctionReturn(0);
1075 }
1076 
1077 #include "sys.h"
1078 
1079 #undef __FUNC__
1080 #define __FUNC__ "MatLoad_MPIDense_DenseInFile"
1081 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
1082 {
1083   int        *rowners, i,size,rank,m,ierr,nz,j;
1084   Scalar     *array,*vals,*vals_ptr;
1085   MPI_Status status;
1086 
1087   PetscFunctionBegin;
1088   MPI_Comm_rank(comm,&rank);
1089   MPI_Comm_size(comm,&size);
1090 
1091   /* determine ownership of all rows */
1092   m          = M/size + ((M % size) > rank);
1093   rowners    = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1094   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1095   rowners[0] = 0;
1096   for ( i=2; i<=size; i++ ) {
1097     rowners[i] += rowners[i-1];
1098   }
1099 
1100   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1101   ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);
1102 
1103   if (!rank) {
1104     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1105 
1106     /* read in my part of the matrix numerical values  */
1107     ierr = PetscBinaryRead(fd,vals,m*N,PETSC_SCALAR); CHKERRQ(ierr);
1108 
1109     /* insert into matrix-by row (this is why cannot directly read into array */
1110     vals_ptr = vals;
1111     for ( i=0; i<m; i++ ) {
1112       for ( j=0; j<N; j++ ) {
1113         array[i + j*m] = *vals_ptr++;
1114       }
1115     }
1116 
1117     /* read in other processors and ship out */
1118     for ( i=1; i<size; i++ ) {
1119       nz   = (rowners[i+1] - rowners[i])*N;
1120       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1121       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);CHKERRQ(ierr);
1122     }
1123   } else {
1124     /* receive numeric values */
1125     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1126 
1127     /* receive message of values*/
1128     ierr = MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);CHKERRQ(ierr);
1129 
1130     /* insert into matrix-by row (this is why cannot directly read into array */
1131     vals_ptr = vals;
1132     for ( i=0; i<m; i++ ) {
1133       for ( j=0; j<N; j++ ) {
1134         array[i + j*m] = *vals_ptr++;
1135       }
1136     }
1137   }
1138   PetscFree(rowners);
1139   PetscFree(vals);
1140   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1141   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1142   PetscFunctionReturn(0);
1143 }
1144 
1145 
1146 #undef __FUNC__
1147 #define __FUNC__ "MatLoad_MPIDense"
1148 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1149 {
1150   Mat          A;
1151   Scalar       *vals,*svals;
1152   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1153   MPI_Status   status;
1154   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1155   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1156   int          tag = ((PetscObject)viewer)->tag;
1157   int          i, nz, ierr, j,rstart, rend, fd;
1158 
1159   PetscFunctionBegin;
1160   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1161   if (!rank) {
1162     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1163     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
1164     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
1165   }
1166 
1167   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1168   M = header[1]; N = header[2]; nz = header[3];
1169 
1170   /*
1171        Handle case where matrix is stored on disk as a dense matrix
1172   */
1173   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1174     ierr = MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);CHKERRQ(ierr);
1175     PetscFunctionReturn(0);
1176   }
1177 
1178   /* determine ownership of all rows */
1179   m          = M/size + ((M % size) > rank);
1180   rowners    = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1181   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1182   rowners[0] = 0;
1183   for ( i=2; i<=size; i++ ) {
1184     rowners[i] += rowners[i-1];
1185   }
1186   rstart = rowners[rank];
1187   rend   = rowners[rank+1];
1188 
1189   /* distribute row lengths to all processors */
1190   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1191   offlens = ourlens + (rend-rstart);
1192   if (!rank) {
1193     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1194     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
1195     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1196     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1197     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1198     PetscFree(sndcounts);
1199   } else {
1200     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
1201   }
1202 
1203   if (!rank) {
1204     /* calculate the number of nonzeros on each processor */
1205     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1206     PetscMemzero(procsnz,size*sizeof(int));
1207     for ( i=0; i<size; i++ ) {
1208       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1209         procsnz[i] += rowlengths[j];
1210       }
1211     }
1212     PetscFree(rowlengths);
1213 
1214     /* determine max buffer needed and allocate it */
1215     maxnz = 0;
1216     for ( i=0; i<size; i++ ) {
1217       maxnz = PetscMax(maxnz,procsnz[i]);
1218     }
1219     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1220 
1221     /* read in my part of the matrix column indices  */
1222     nz = procsnz[0];
1223     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1224     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
1225 
1226     /* read in every one elses and ship off */
1227     for ( i=1; i<size; i++ ) {
1228       nz   = procsnz[i];
1229       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1230       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1231     }
1232     PetscFree(cols);
1233   } else {
1234     /* determine buffer space needed for message */
1235     nz = 0;
1236     for ( i=0; i<m; i++ ) {
1237       nz += ourlens[i];
1238     }
1239     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1240 
1241     /* receive message of column indices*/
1242     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1243     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1244     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1245   }
1246 
1247   /* loop over local rows, determining number of off diagonal entries */
1248   PetscMemzero(offlens,m*sizeof(int));
1249   jj = 0;
1250   for ( i=0; i<m; i++ ) {
1251     for ( j=0; j<ourlens[i]; j++ ) {
1252       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1253       jj++;
1254     }
1255   }
1256 
1257   /* create our matrix */
1258   for ( i=0; i<m; i++ ) {
1259     ourlens[i] -= offlens[i];
1260   }
1261   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1262   A = *newmat;
1263   for ( i=0; i<m; i++ ) {
1264     ourlens[i] += offlens[i];
1265   }
1266 
1267   if (!rank) {
1268     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1269 
1270     /* read in my part of the matrix numerical values  */
1271     nz = procsnz[0];
1272     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1273 
1274     /* insert into matrix */
1275     jj      = rstart;
1276     smycols = mycols;
1277     svals   = vals;
1278     for ( i=0; i<m; i++ ) {
1279       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1280       smycols += ourlens[i];
1281       svals   += ourlens[i];
1282       jj++;
1283     }
1284 
1285     /* read in other processors and ship out */
1286     for ( i=1; i<size; i++ ) {
1287       nz   = procsnz[i];
1288       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1289       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1290     }
1291     PetscFree(procsnz);
1292   } else {
1293     /* receive numeric values */
1294     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1295 
1296     /* receive message of values*/
1297     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1298     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1299     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1300 
1301     /* insert into matrix */
1302     jj      = rstart;
1303     smycols = mycols;
1304     svals   = vals;
1305     for ( i=0; i<m; i++ ) {
1306       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1307       smycols += ourlens[i];
1308       svals   += ourlens[i];
1309       jj++;
1310     }
1311   }
1312   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1313 
1314   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1315   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1316   PetscFunctionReturn(0);
1317 }
1318 
1319 
1320 
1321 
1322 
1323