xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 76be9ce4a233aaa47cda2bc7f5a27cd7faabecaa)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpidense.c,v 1.77 1997/12/01 01:54:22 bsmith Exp bsmith $";
3 #endif
4 
5 /*
6    Basic functions for parallel dense matrices.
7 */
8 
9 #include "src/mat/impls/dense/mpi/mpidense.h"
10 #include "src/vec/vecimpl.h"
11 
12 #undef __FUNC__
13 #define __FUNC__ "MatSetValues_MPIDense"
14 int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
15 {
16   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
17   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
18   int          roworiented = A->roworiented;
19 
20   PetscFunctionBegin;
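  /* v holds an m-by-n block of values: when roworiented, the values for a
     row are contiguous, so entry (i,j) is v[i*n+j]; otherwise the block is
     stored by column and entry (i,j) is v[i+j*m], which is why the two
     branches below index v differently. */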
21   for ( i=0; i<m; i++ ) {
22     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
23     if (idxm[i] >= A->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
24     if (idxm[i] >= rstart && idxm[i] < rend) {
25       row = idxm[i] - rstart;
26       if (roworiented) {
27         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
28       } else {
29         for ( j=0; j<n; j++ ) {
30           if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
31           if (idxn[j] >= A->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
32           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
33         }
34       }
35     } else {
36       if (roworiented) {
37         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
38       } else { /* must stash each separately */
39         row = idxm[i];
40         for ( j=0; j<n; j++ ) {
41           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
42         }
43       }
44     }
45   }
46   PetscFunctionReturn(0);
47 }
48 
49 #undef __FUNC__
50 #define __FUNC__ "MatGetValues_MPIDense"
51 int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
52 {
53   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
54   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
55 
56   PetscFunctionBegin;
57   for ( i=0; i<m; i++ ) {
58     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
59     if (idxm[i] >= mdn->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
60     if (idxm[i] >= rstart && idxm[i] < rend) {
61       row = idxm[i] - rstart;
62       for ( j=0; j<n; j++ ) {
63         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
64         if (idxn[j] >= mdn->N) {
65           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
66         }
67         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
68       }
69     } else {
70       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
71     }
72   }
73   PetscFunctionReturn(0);
74 }
75 
76 #undef __FUNC__
77 #define __FUNC__ "MatGetArray_MPIDense"
78 int MatGetArray_MPIDense(Mat A,Scalar **array)
79 {
80   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
81   int          ierr;
82 
83   PetscFunctionBegin;
84   ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
85   PetscFunctionReturn(0);
86 }
87 
88 #undef __FUNC__
89 #define __FUNC__ "MatRestoreArray_MPIDense"
90 int MatRestoreArray_MPIDense(Mat A,Scalar **array)
91 {
92   PetscFunctionBegin;
93   PetscFunctionReturn(0);
94 }
95 
96 #undef __FUNC__
97 #define __FUNC__ "MatAssemblyBegin_MPIDense"
98 int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
99 {
100   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
101   MPI_Comm     comm = mat->comm;
102   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
103   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
104   int          tag = mat->tag, *owner,*starts,count,ierr;
105   InsertMode   addv;
106   MPI_Request  *send_waits,*recv_waits;
107   Scalar       *rvalues,*svalues;
108 
109   PetscFunctionBegin;
110   /* make sure all processors are either in INSERT_VALUES or ADD_VALUES mode */
111   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
112   if (addv == (ADD_VALUES|INSERT_VALUES)) {
113     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Cannot mix adds/inserts on different procs");
114   }
115   mat->insertmode = addv; /* in case this processor had no cache */
116 
117   /*  first count number of contributors to each processor */
118   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
119   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
120   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
121   for ( i=0; i<mdn->stash.n; i++ ) {
122     idx = mdn->stash.idx[i];
123     for ( j=0; j<size; j++ ) {
124       if (idx >= owners[j] && idx < owners[j+1]) {
125         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
126       }
127     }
128   }
129   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
130 
131   /* inform other processors of number of messages and max length*/
132   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
133   ierr = MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
134   nreceives = work[rank];
135   if (nreceives > size) SETERRQ(PETSC_ERR_PLIB,0,"Internal PETSc error");
136   ierr = MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
137   nmax = work[rank];
138   PetscFree(work);
139 
140   /* post receives:
141        1) each message will consist of ordered triples
142      (global row index, global column index, value); the indices are
143      stored as Scalars to simplify the message passing.
144        2) since we don't know how long each individual message is we
145      allocate the largest needed buffer for each receive. Potentially
146      this is a lot of wasted space.
147 
148        This could be done better.
149   */
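  /* rvalues holds nreceives message slots of at most nmax triples each;
     message i is received into rvalues+3*nmax*i and unpacked from there in
     MatAssemblyEnd_MPIDense(). */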
150   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
151   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
152   for ( i=0; i<nreceives; i++ ) {
153     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
154   }
155 
156   /* do sends:
157       1) starts[i] gives the starting index in svalues for stuff going to
158          the ith processor
159   */
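  /* starts[] is the exclusive prefix sum of nprocs[]; e.g. (hypothetical
     counts) nprocs = {2,0,3} gives starts = {0,2,2}.  The packing loop
     below advances starts[owner[i]] as each stashed entry is copied. */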
160   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
161   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
162   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
163   starts[0] = 0;
164   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
165   for ( i=0; i<mdn->stash.n; i++ ) {
166     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
167     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
168     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
169   }
170   PetscFree(owner);
171   starts[0] = 0;
172   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
173   count = 0;
174   for ( i=0; i<size; i++ ) {
175     if (procs[i]) {
176       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
177     }
178   }
179   PetscFree(starts); PetscFree(nprocs);
180 
181   /* Free cache space */
182   PLogInfo(mat,"MatAssemblyBegin_MPIDense:Number of off-processor values %d\n",mdn->stash.n);
183   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
184 
185   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
186   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
187   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
188   mdn->rmax       = nmax;
189 
190   PetscFunctionReturn(0);
191 }
192 extern int MatSetUpMultiply_MPIDense(Mat);
193 
194 #undef __FUNC__
195 #define __FUNC__ "MatAssemblyEnd_MPIDense"
196 int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
197 {
198   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
199   MPI_Status   *send_status,recv_status;
200   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
201   Scalar       *values,val;
202   InsertMode   addv = mat->insertmode;
203 
204   PetscFunctionBegin;
205   /*  wait on receives */
206   while (count) {
207     ierr = MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
208     /* unpack receives into our local space */
209     values = mdn->rvalues + 3*imdex*mdn->rmax;
210     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
211     n = n/3;
212     for ( i=0; i<n; i++ ) {
213       row = (int) PetscReal(values[3*i]) - mdn->rstart;
214       col = (int) PetscReal(values[3*i+1]);
215       val = values[3*i+2];
216       if (col >= 0 && col < mdn->N) {
217         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
218       }
219       else {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid column");}
220     }
221     count--;
222   }
223   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
224 
225   /* wait on sends */
226   if (mdn->nsends) {
227     send_status = (MPI_Status *) PetscMalloc(mdn->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
228     ierr        = MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);CHKERRQ(ierr);
229     PetscFree(send_status);
230   }
231   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
232 
233   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
234   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
235 
236   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
237     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
238   }
239   PetscFunctionReturn(0);
240 }
241 
242 #undef __FUNC__
243 #define __FUNC__ "MatZeroEntries_MPIDense"
244 int MatZeroEntries_MPIDense(Mat A)
245 {
246   int          ierr;
247   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
248 
249   PetscFunctionBegin;
250   ierr = MatZeroEntries(l->A);CHKERRQ(ierr);
251   PetscFunctionReturn(0);
252 }
253 
254 #undef __FUNC__
255 #define __FUNC__ "MatGetBlockSize_MPIDense"
256 int MatGetBlockSize_MPIDense(Mat A,int *bs)
257 {
258   PetscFunctionBegin;
259   *bs = 1;
260   PetscFunctionReturn(0);
261 }
262 
263 /* the code does not do the diagonal entries correctly unless the
264    matrix is square and the column and row ownerships are identical.
265    This is a BUG. The only way to fix it seems to be to access
266    mdn->A and mdn->B directly and not through the MatZeroRows()
267    routine.
268 */
269 #undef __FUNC__
270 #define __FUNC__ "MatZeroRows_MPIDense"
271 int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
272 {
273   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
274   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
275   int            *procs,*nprocs,j,found,idx,nsends,*work;
276   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
277   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
278   int            *lens,imdex,*lrows,*values;
279   MPI_Comm       comm = A->comm;
280   MPI_Request    *send_waits,*recv_waits;
281   MPI_Status     recv_status,*send_status;
282   IS             istmp;
283 
284   PetscFunctionBegin;
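  /* Each process determines which process owns each requested row, ships
     those global row indices to their owners, collects the indices it owns
     itself into lrows, and finally zeros them through MatZeroRows() on the
     local block. */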
285   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
286   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
287 
288   /*  first count number of contributors to each processor */
289   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
290   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
291   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
292   for ( i=0; i<N; i++ ) {
293     idx = rows[i];
294     found = 0;
295     for ( j=0; j<size; j++ ) {
296       if (idx >= owners[j] && idx < owners[j+1]) {
297         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
298       }
299     }
300     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
301   }
302   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
303 
304   /* inform other processors of number of messages and max length*/
305   work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
306   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
307   nrecvs = work[rank];
308   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
309   nmax   = work[rank];
310   PetscFree(work);
311 
312   /* post receives:   */
313   rvalues    = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
314   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
315   for ( i=0; i<nrecvs; i++ ) {
316     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
317   }
318 
319   /* do sends:
320       1) starts[i] gives the starting index in svalues for stuff going to
321          the ith processor
322   */
323   svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
324   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
325   starts     = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
326   starts[0]  = 0;
327   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
328   for ( i=0; i<N; i++ ) {
329     svalues[starts[owner[i]]++] = rows[i];
330   }
331   ISRestoreIndices(is,&rows);
332 
333   starts[0] = 0;
334   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
335   count = 0;
336   for ( i=0; i<size; i++ ) {
337     if (procs[i]) {
338       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
339     }
340   }
341   PetscFree(starts);
342 
343   base = owners[rank];
344 
345   /*  wait on receives */
346   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
347   source = lens + nrecvs;
348   count  = nrecvs; slen = 0;
349   while (count) {
350     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
351     /* unpack receives into our local space */
352     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
353     source[imdex]  = recv_status.MPI_SOURCE;
354     lens[imdex]  = n;
355     slen += n;
356     count--;
357   }
358   PetscFree(recv_waits);
359 
360   /* move the data into the send scatter */
361   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
362   count = 0;
363   for ( i=0; i<nrecvs; i++ ) {
364     values = rvalues + i*nmax;
365     for ( j=0; j<lens[i]; j++ ) {
366       lrows[count++] = values[j] - base;
367     }
368   }
369   PetscFree(rvalues); PetscFree(lens);
370   PetscFree(owner); PetscFree(nprocs);
371 
372   /* actually zap the local rows */
373   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
374   PLogObjectParent(A,istmp);
375   PetscFree(lrows);
376   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
377   ierr = ISDestroy(istmp); CHKERRQ(ierr);
378 
379   /* wait on sends */
380   if (nsends) {
381     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
382     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
383     PetscFree(send_status);
384   }
385   PetscFree(send_waits); PetscFree(svalues);
386 
387   PetscFunctionReturn(0);
388 }
389 
390 #undef __FUNC__
391 #define __FUNC__ "MatMult_MPIDense"
392 int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
393 {
394   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
395   int          ierr;
396 
397   PetscFunctionBegin;
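  /* Each process stores all N columns of its local rows, so the full input
     vector xx is first gathered into the work vector lvec; the product is
     then a purely local dense multiply. */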
398   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
399   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
400   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
401   PetscFunctionReturn(0);
402 }
403 
404 #undef __FUNC__
405 #define __FUNC__ "MatMultAdd_MPIDense"
406 int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
407 {
408   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
409   int          ierr;
410 
411   PetscFunctionBegin;
412   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
413   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
414   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
415   PetscFunctionReturn(0);
416 }
417 
418 #undef __FUNC__
419 #define __FUNC__ "MatMultTrans_MPIDense"
420 int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
421 {
422   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
423   int          ierr;
424   Scalar       zero = 0.0;
425 
426   PetscFunctionBegin;
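  /* yy = A^T xx: apply the transpose of the local m-by-N block (giving an
     N-long partial result in lvec), then scatter-add the pieces back into
     the distributed yy. */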
427   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
428   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
429   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
430   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
431   PetscFunctionReturn(0);
432 }
433 
434 #undef __FUNC__
435 #define __FUNC__ "MatMultTransAdd_MPIDense"
436 int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
437 {
438   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
439   int          ierr;
440 
441   PetscFunctionBegin;
442   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
443   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
444   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
445   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
446   PetscFunctionReturn(0);
447 }
448 
449 #undef __FUNC__
450 #define __FUNC__ "MatGetDiagonal_MPIDense"
451 int MatGetDiagonal_MPIDense(Mat A,Vec v)
452 {
453   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
454   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
455   int          ierr, len, i, n, m = a->m, radd;
456   Scalar       *x, zero = 0.0;
457 
458   PetscFunctionBegin;
459   VecSet(&zero,v);
460   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
461   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
462   if (n != a->M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Nonconforming mat and vec");
463   len = PetscMin(aloc->m,aloc->n);
464   radd = a->rstart*m;
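  /* The local block is stored by column with leading dimension m, so the
     diagonal entry of local row i lies in global column rstart+i, i.e. at
     v[(rstart+i)*m + i] = v[radd + i*m + i]. */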
465   for ( i=0; i<len; i++ ) {
466     x[i] = aloc->v[radd + i*m + i];
467   }
468   PetscFunctionReturn(0);
469 }
470 
471 #undef __FUNC__
472 #define __FUNC__ "MatDestroy_MPIDense"
473 int MatDestroy_MPIDense(PetscObject obj)
474 {
475   Mat          mat = (Mat) obj;
476   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
477   int          ierr;
478 
479   PetscFunctionBegin;
480 #if defined(USE_PETSC_LOG)
481   PLogObjectState(obj,"Rows=%d, Cols=%d",mdn->M,mdn->N);
482 #endif
483   PetscFree(mdn->rowners);
484   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
485   if (mdn->lvec)   VecDestroy(mdn->lvec);
486   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
487   if (mdn->factor) {
488     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
489     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
490     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
491     PetscFree(mdn->factor);
492   }
493   PetscFree(mdn);
494   if (mat->mapping) {
495     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
496   }
497   PLogObjectDestroy(mat);
498   PetscHeaderDestroy(mat);
499   PetscFunctionReturn(0);
500 }
501 
502 #include "pinclude/pviewer.h"
503 
504 #undef __FUNC__
505 #define __FUNC__ "MatView_MPIDense_Binary"
506 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
507 {
508   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
509   int          ierr;
510 
511   PetscFunctionBegin;
512   if (mdn->size == 1) {
513     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
514   }
515   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
516   PetscFunctionReturn(0);
517 }
518 
519 #undef __FUNC__
520 #define __FUNC__ "MatView_MPIDense_ASCII"
521 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
522 {
523   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
524   int          ierr, format;
525   FILE         *fd;
526   ViewerType   vtype;
527 
528   PetscFunctionBegin;
529   ierr = ViewerGetType(viewer,&vtype);CHKERRQ(ierr);
530   ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
531   ierr = ViewerGetFormat(viewer,&format); CHKERRQ(ierr);
532   if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
533     int rank;
534     MatInfo info;
535     MPI_Comm_rank(mat->comm,&rank);
536     ierr = MatGetInfo(mat,MAT_LOCAL,&info); CHKERRQ(ierr);
537     PetscSequentialPhaseBegin(mat->comm,1);
538       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
539          (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
540       fflush(fd);
541     PetscSequentialPhaseEnd(mat->comm,1);
542     ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
543     PetscFunctionReturn(0);
544   }
545   else if (format == VIEWER_FORMAT_ASCII_INFO) {
546     PetscFunctionReturn(0);
547   }
548   if (vtype == ASCII_FILE_VIEWER) {
549     PetscSequentialPhaseBegin(mat->comm,1);
550     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
551              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
552     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
553     fflush(fd);
554     PetscSequentialPhaseEnd(mat->comm,1);
555   } else {
556     int size = mdn->size, rank = mdn->rank;
557     if (size == 1) {
558       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
559     } else {
560       /* assemble the entire matrix onto first processor. */
561       Mat          A;
562       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
563       Scalar       *vals;
564       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
565 
566       if (!rank) {
567         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
568       } else {
569         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
570       }
571       PLogObjectParent(mat,A);
572 
573       /* Copy the matrix ... This isn't the most efficient means,
574          but it's quick for now */
575       row = mdn->rstart; m = Amdn->m;
576       for ( i=0; i<m; i++ ) {
577         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
578         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
579         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
580         row++;
581       }
582 
583       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
584       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
585       if (!rank) {
586         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
587       }
588       ierr = MatDestroy(A); CHKERRQ(ierr);
589     }
590   }
591   PetscFunctionReturn(0);
592 }
593 
594 #undef __FUNC__
595 #define __FUNC__ "MatView_MPIDense"
596 int MatView_MPIDense(PetscObject obj,Viewer viewer)
597 {
598   Mat          mat = (Mat) obj;
599   int          ierr;
600   ViewerType   vtype;
601 
  PetscFunctionBegin;
602   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
603   if (vtype == ASCII_FILE_VIEWER) {
604     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
605   } else if (vtype == ASCII_FILES_VIEWER) {
606     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
607   } else if (vtype == BINARY_FILE_VIEWER) {
608     ierr = MatView_MPIDense_Binary(mat,viewer);CHKERRQ(ierr);
609   }
610   PetscFunctionReturn(0);
611 }
612 
613 #undef __FUNC__
614 #define __FUNC__ "MatGetInfo_MPIDense"
615 int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
616 {
617   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
618   Mat          mdn = mat->A;
619   int          ierr;
620   double       isend[5], irecv[5];
621 
622   PetscFunctionBegin;
623   info->rows_global    = (double)mat->M;
624   info->columns_global = (double)mat->N;
625   info->rows_local     = (double)mat->m;
626   info->columns_local  = (double)mat->N;
627   info->block_size     = 1.0;
628   ierr = MatGetInfo(mdn,MAT_LOCAL,info); CHKERRQ(ierr);
629   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
630   isend[3] = info->memory;  isend[4] = info->mallocs;
631   if (flag == MAT_LOCAL) {
632     info->nz_used      = isend[0];
633     info->nz_allocated = isend[1];
634     info->nz_unneeded  = isend[2];
635     info->memory       = isend[3];
636     info->mallocs      = isend[4];
637   } else if (flag == MAT_GLOBAL_MAX) {
638     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
639     info->nz_used      = irecv[0];
640     info->nz_allocated = irecv[1];
641     info->nz_unneeded  = irecv[2];
642     info->memory       = irecv[3];
643     info->mallocs      = irecv[4];
644   } else if (flag == MAT_GLOBAL_SUM) {
645     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
646     info->nz_used      = irecv[0];
647     info->nz_allocated = irecv[1];
648     info->nz_unneeded  = irecv[2];
649     info->memory       = irecv[3];
650     info->mallocs      = irecv[4];
651   }
652   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
653   info->fill_ratio_needed = 0;
654   info->factor_mallocs    = 0;
655   PetscFunctionReturn(0);
656 }
657 
658 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
659    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
660    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
661    extern int MatSolve_MPIDense(Mat,Vec,Vec);
662    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
663    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
664    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
665 
666 #undef __FUNC__
667 #define __FUNC__ "MatSetOption_MPIDense"
668 int MatSetOption_MPIDense(Mat A,MatOption op)
669 {
670   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
671 
672   PetscFunctionBegin;
673   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
674       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
675       op == MAT_NEW_NONZERO_LOCATION_ERROR ||
676       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
677       op == MAT_COLUMNS_SORTED ||
678       op == MAT_COLUMNS_UNSORTED) {
679         MatSetOption(a->A,op);
680   } else if (op == MAT_ROW_ORIENTED) {
681         a->roworiented = 1;
682         MatSetOption(a->A,op);
683   } else if (op == MAT_ROWS_SORTED ||
684              op == MAT_ROWS_UNSORTED ||
685              op == MAT_SYMMETRIC ||
686              op == MAT_STRUCTURALLY_SYMMETRIC ||
687              op == MAT_YES_NEW_DIAGONALS) {
688     PLogInfo(A,"MatSetOption_MPIDense:Option ignored\n");
689   } else if (op == MAT_COLUMN_ORIENTED) {
690     a->roworiented = 0; MatSetOption(a->A,op);
691   } else if (op == MAT_NO_NEW_DIAGONALS) {
692     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
693   } else {
694     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
695   }
696   PetscFunctionReturn(0);
697 }
698 
699 #undef __FUNC__
700 #define __FUNC__ "MatGetSize_MPIDense"
701 int MatGetSize_MPIDense(Mat A,int *m,int *n)
702 {
703   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
704 
705   PetscFunctionBegin;
706   *m = mat->M; *n = mat->N;
707   PetscFunctionReturn(0);
708 }
709 
710 #undef __FUNC__
711 #define __FUNC__ "MatGetLocalSize_MPIDense"
712 int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
713 {
714   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
715 
716   PetscFunctionBegin;
717   *m = mat->m; *n = mat->N;
718   PetscFunctionReturn(0);
719 }
720 
721 #undef __FUNC__
722 #define __FUNC__ "MatGetOwnershipRange_MPIDense"
723 int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
724 {
725   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
726 
727   PetscFunctionBegin;
728   *m = mat->rstart; *n = mat->rend;
729   PetscFunctionReturn(0);
730 }
731 
732 #undef __FUNC__
733 #define __FUNC__ "MatGetRow_MPIDense"
734 int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
735 {
736   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
737   int          lrow, rstart = mat->rstart, rend = mat->rend,ierr;
738 
739   PetscFunctionBegin;
740   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_SUP,0,"only local rows");
741   lrow = row - rstart;
742   ierr = MatGetRow(mat->A,lrow,nz,idx,v);CHKERRQ(ierr);
743   PetscFunctionReturn(0);
744 }
745 
746 #undef __FUNC__
747 #define __FUNC__ "MatRestoreRow_MPIDense"
748 int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
749 {
750   PetscFunctionBegin;
751   if (idx) PetscFree(*idx);
752   if (v) PetscFree(*v);
753   PetscFunctionReturn(0);
754 }
755 
756 #undef __FUNC__
757 #define __FUNC__ "MatNorm_MPIDense"
758 int MatNorm_MPIDense(Mat A,NormType type,double *norm)
759 {
760   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
761   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
762   int          ierr, i, j;
763   double       sum = 0.0;
764   Scalar       *v = mat->v;
765 
766   PetscFunctionBegin;
767   if (mdn->size == 1) {
768     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
769   } else {
770     if (type == NORM_FROBENIUS) {
771       for (i=0; i<mat->n*mat->m; i++ ) {
772 #if defined(USE_PETSC_COMPLEX)
773         sum += real(conj(*v)*(*v)); v++;
774 #else
775         sum += (*v)*(*v); v++;
776 #endif
777       }
778       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
779       *norm = sqrt(*norm);
780       PLogFlops(2*mat->n*mat->m);
781     } else if (type == NORM_1) {
782       double *tmp, *tmp2;
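      /* 1-norm = max over columns of the sums of absolute values: each
         process sums its local rows into tmp[], the per-column sums are
         combined with MPI_Allreduce, and the maximum is taken. */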
783       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
784       tmp2 = tmp + mdn->N;
785       PetscMemzero(tmp,2*mdn->N*sizeof(double));
786       *norm = 0.0;
787       v = mat->v;
788       for ( j=0; j<mat->n; j++ ) {
789         for ( i=0; i<mat->m; i++ ) {
790           tmp[j] += PetscAbsScalar(*v);  v++;
791         }
792       }
793       ierr = MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
794       for ( j=0; j<mdn->N; j++ ) {
795         if (tmp2[j] > *norm) *norm = tmp2[j];
796       }
797       PetscFree(tmp);
798       PLogFlops(mat->n*mat->m);
799     } else if (type == NORM_INFINITY) { /* max row norm */
800       double ntemp;
801       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
802       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
803     } else {
804       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
805     }
806   }
807   PetscFunctionReturn(0);
808 }
809 
810 #undef __FUNC__
811 #define __FUNC__ "MatTranspose_MPIDense"
812 int MatTranspose_MPIDense(Mat A,Mat *matout)
813 {
814   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
815   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
816   Mat          B;
817   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
818   int          j, i, ierr;
819   Scalar       *v;
820 
821   PetscFunctionBegin;
822   if (matout == PETSC_NULL && M != N) {
823     SETERRQ(PETSC_ERR_SUP,0,"Supports square matrix only in-place");
824   }
825   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);
826 
827   m = Aloc->m; n = Aloc->n; v = Aloc->v;
828   rwork = (int *) PetscMalloc(n*sizeof(int)); CHKPTRQ(rwork);
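  /* Column j of the local block (m contiguous values at v) supplies the
     entries (j, rstart..rstart+m-1) of the transpose; MatSetValues() on B
     stashes any rows of B owned by other processes. */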
829   for ( j=0; j<n; j++ ) {
830     for (i=0; i<m; i++) rwork[i] = rstart + i;
831     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
832     v   += m;
833   }
834   PetscFree(rwork);
835   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
836   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
837   if (matout != PETSC_NULL) {
838     *matout = B;
839   } else {
840     /* This isn't really an in-place transpose; we free the data structures held by a */
841     PetscFree(a->rowners);
842     ierr = MatDestroy(a->A); CHKERRQ(ierr);
843     if (a->lvec) VecDestroy(a->lvec);
844     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
845     PetscFree(a);
846     PetscMemcpy(A,B,sizeof(struct _p_Mat));
847     PetscHeaderDestroy(B);
848   }
849   PetscFunctionReturn(0);
850 }
851 
852 #include "pinclude/plapack.h"
853 #undef __FUNC__
854 #define __FUNC__ "MatScale_MPIDense"
855 int MatScale_MPIDense(Scalar *alpha,Mat inA)
856 {
857   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
858   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
859   int          one = 1, nz;
860 
861   PetscFunctionBegin;
862   nz = a->m*a->n;
863   BLscal_( &nz, alpha, a->v, &one );
864   PLogFlops(nz);
865   PetscFunctionReturn(0);
866 }
867 
868 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
869 
870 /* -------------------------------------------------------------------*/
871 static struct _MatOps MatOps = {MatSetValues_MPIDense,
872        MatGetRow_MPIDense,MatRestoreRow_MPIDense,
873        MatMult_MPIDense,MatMultAdd_MPIDense,
874        MatMultTrans_MPIDense,MatMultTransAdd_MPIDense,
875 /*       MatSolve_MPIDense,0, */
876        0,0,
877        0,0,
878        0,0,
879 /*       MatLUFactor_MPIDense,0, */
880        0,MatTranspose_MPIDense,
881        MatGetInfo_MPIDense,0,
882        MatGetDiagonal_MPIDense,0,MatNorm_MPIDense,
883        MatAssemblyBegin_MPIDense,MatAssemblyEnd_MPIDense,
884        0,
885        MatSetOption_MPIDense,MatZeroEntries_MPIDense,MatZeroRows_MPIDense,
886        0,0,
887 /*       0,MatLUFactorSymbolic_MPIDense,MatLUFactorNumeric_MPIDense, */
888        0,0,
889        MatGetSize_MPIDense,MatGetLocalSize_MPIDense,
890        MatGetOwnershipRange_MPIDense,
891        0,0, MatGetArray_MPIDense, MatRestoreArray_MPIDense,
892        MatConvertSameType_MPIDense,
893        0,0,0,0,0,
894        0,0,MatGetValues_MPIDense,0,0,MatScale_MPIDense,
895        0,0,0,MatGetBlockSize_MPIDense};
896 
897 #undef __FUNC__
898 #define __FUNC__ "MatCreateMPIDense"
899 /*@C
900    MatCreateMPIDense - Creates a parallel matrix in dense format.
901 
902    Input Parameters:
903 .  comm - MPI communicator
904 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
905 .  n - number of local columns (or PETSC_DECIDE to have calculated
906            if N is given)
907 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
908 .  N - number of global columns (or PETSC_DECIDE to have calculated
909            if n is given)
910 .  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
911    to control all matrix memory allocation.
912 
913    Output Parameter:
914 .  A - the matrix
915 
916    Notes:
917    The dense format is fully compatible with standard Fortran 77
918    storage by columns.
919 
920    The data input variable is intended primarily for Fortran programmers
921    who wish to allocate their own matrix memory space.  Most users should
922    set data=PETSC_NULL.
923 
924    The user MUST specify either the local or global matrix dimensions
925    (possibly both).
926 
927    Currently, the only parallel dense matrix decomposition is by rows,
928    so that n=N and each submatrix owns all of the global columns.
929 
930 .keywords: matrix, dense, parallel
931 
932 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
933 @*/
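/*
   A minimal usage sketch (hypothetical sizes; PetscInitialize() is assumed
   to have been called and CHKERRQ() error checking is omitted for brevity):

      Mat    A;
      int    ierr, row = 0, col = 0;
      Scalar value = 1.0;

      ierr = MatCreateMPIDense(PETSC_COMM_WORLD,PETSC_DECIDE,PETSC_DECIDE,
                               10,10,PETSC_NULL,&A);
      ierr = MatSetValues(A,1,&row,1,&col,&value,INSERT_VALUES);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
      ierr = MatDestroy(A);
*/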
934 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
935 {
936   Mat          mat;
937   Mat_MPIDense *a;
938   int          ierr, i,flg;
939 
940   PetscFunctionBegin;
941   /* Note:  For now, when data is specified above, this assumes the user correctly
942    allocates the local dense storage space.  We should add error checking. */
943 
944   *A = 0;
945   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIDENSE,comm,MatDestroy,MatView);
946   PLogObjectCreate(mat);
947   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
948   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
949   mat->destroy    = MatDestroy_MPIDense;
950   mat->view       = MatView_MPIDense;
951   mat->factor     = 0;
952   mat->mapping    = 0;
953 
954   a->factor       = 0;
955   mat->insertmode = NOT_SET_VALUES;
956   MPI_Comm_rank(comm,&a->rank);
957   MPI_Comm_size(comm,&a->size);
958 
959   if (M == PETSC_DECIDE) {ierr = MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);}
960   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
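  /* split the M rows as evenly as possible: the first M%size processes get
     one extra row; e.g. (hypothetical) M=10 on 3 processes gives local row
     counts 4,3,3 */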
961 
962   /* each process stores all columns of its local rows */
963   if (N == PETSC_DECIDE) N = n;
964   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
965   /*  if (n != N) SETERRQ(PETSC_ERR_SUP,0,"For now, only n=N is supported"); */
966   a->N = mat->N = N;
967   a->M = mat->M = M;
968   a->m = mat->m = m;
969   a->n = mat->n = n;
970 
971   /* build local table of row and column ownerships */
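  /* rowners[] is filled by gathering every process's local row count and
     taking a running sum, so rowners[rank]..rowners[rank+1] is this
     process's global row range; cowners[] is built the same way from n. */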
972   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
973   a->cowners = a->rowners + a->size + 1;
974   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
975   ierr = MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
976   a->rowners[0] = 0;
977   for ( i=2; i<=a->size; i++ ) {
978     a->rowners[i] += a->rowners[i-1];
979   }
980   a->rstart = a->rowners[a->rank];
981   a->rend   = a->rowners[a->rank+1];
982   ierr      = MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
983   a->cowners[0] = 0;
984   for ( i=2; i<=a->size; i++ ) {
985     a->cowners[i] += a->cowners[i-1];
986   }
987 
988   ierr = MatCreateSeqDense(PETSC_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
989   PLogObjectParent(mat,a->A);
990 
991   /* build cache for off array entries formed */
992   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
993 
994   /* stuff used for matrix vector multiply */
995   a->lvec        = 0;
996   a->Mvctx       = 0;
997   a->roworiented = 1;
998 
999   *A = mat;
1000   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1001   if (flg) {
1002     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1003   }
1004   PetscFunctionReturn(0);
1005 }
1006 
1007 #undef __FUNC__
1008 #define __FUNC__ "MatConvertSameType_MPIDense"
1009 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
1010 {
1011   Mat          mat;
1012   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
1013   int          ierr;
1014   FactorCtx    *factor;
1015 
1016   PetscFunctionBegin;
1017   *newmat       = 0;
1018   PetscHeaderCreate(mat,_p_Mat,MAT_COOKIE,MATMPIDENSE,A->comm,MatDestroy,MatView);
1019   PLogObjectCreate(mat);
1020   mat->data      = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
1021   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1022   mat->destroy   = MatDestroy_MPIDense;
1023   mat->view      = MatView_MPIDense;
1024   mat->factor    = A->factor;
1025   mat->assembled = PETSC_TRUE;
1026 
1027   a->m = mat->m = oldmat->m;
1028   a->n = mat->n = oldmat->n;
1029   a->M = mat->M = oldmat->M;
1030   a->N = mat->N = oldmat->N;
1031   if (oldmat->factor) {
1032     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
1033     /* copy factor contents ... add this code! */
1034   } else a->factor = 0;
1035 
1036   a->rstart       = oldmat->rstart;
1037   a->rend         = oldmat->rend;
1038   a->size         = oldmat->size;
1039   a->rank         = oldmat->rank;
1040   mat->insertmode = NOT_SET_VALUES;
1041 
1042   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
1043   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
1044   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
1045   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1046 
1047   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1048   PLogObjectParent(mat,a->lvec);
1049   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1050   PLogObjectParent(mat,a->Mvctx);
1051   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1052   PLogObjectParent(mat,a->A);
1053   *newmat = mat;
1054   PetscFunctionReturn(0);
1055 }
1056 
1057 #include "sys.h"
1058 
1059 #undef __FUNC__
1060 #define __FUNC__ "MatLoad_MPIDense_DenseInFile"
1061 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
1062 {
1063   int        *rowners, i,size,rank,m,rstart,rend,ierr,nz,j;
1064   Scalar     *array,*vals,*vals_ptr;
1065   MPI_Status status;
1066 
1067   PetscFunctionBegin;
1068   MPI_Comm_rank(comm,&rank);
1069   MPI_Comm_size(comm,&size);
1070 
1071   /* determine ownership of all rows */
1072   m          = M/size + ((M % size) > rank);
1073   rowners    = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1074   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1075   rowners[0] = 0;
1076   for ( i=2; i<=size; i++ ) {
1077     rowners[i] += rowners[i-1];
1078   }
1079   rstart = rowners[rank];
1080   rend   = rowners[rank+1];
1081 
1082   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1083   ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);
1084 
1085   if (!rank) {
1086     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1087 
1088     /* read in my part of the matrix numerical values  */
1089     ierr = PetscBinaryRead(fd,vals,m*N,PETSC_SCALAR); CHKERRQ(ierr);
1090 
1091     /* insert into matrix by row (this is why we cannot read directly into array, which is stored by column) */
1092     vals_ptr = vals;
1093     for ( i=0; i<m; i++ ) {
1094       for ( j=0; j<N; j++ ) {
1095         array[i + j*m] = *vals_ptr++;
1096       }
1097     }
1098 
1099     /* read in the other processors' parts and ship them out */
1100     for ( i=1; i<size; i++ ) {
1101       nz   = (rowners[i+1] - rowners[i])*N;
1102       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1103       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);CHKERRQ(ierr);
1104     }
1105   } else {
1106     /* receive numeric values */
1107     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1108 
1109     /* receive message of values*/
1110     ierr = MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);CHKERRQ(ierr);
1111 
1112     /* insert into matrix by row (this is why we cannot read directly into array, which is stored by column) */
1113     vals_ptr = vals;
1114     for ( i=0; i<m; i++ ) {
1115       for ( j=0; j<N; j++ ) {
1116         array[i + j*m] = *vals_ptr++;
1117       }
1118     }
1119   }
1120   PetscFree(rowners);
1121   PetscFree(vals);
1122   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1123   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1124   PetscFunctionReturn(0);
1125 }
1126 
1127 
1128 #undef __FUNC__
1129 #define __FUNC__ "MatLoad_MPIDense"
1130 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1131 {
1132   Mat          A;
1133   Scalar       *vals,*svals;
1134   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1135   MPI_Status   status;
1136   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1137   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1138   int          tag = ((PetscObject)viewer)->tag;
1139   int          i, nz, ierr, j,rstart, rend, fd;
1140 
1141   PetscFunctionBegin;
1142   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1143   if (!rank) {
1144     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1145     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
1146     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
1147   }
1148 
1149   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1150   M = header[1]; N = header[2]; nz = header[3];
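  /* binary header layout: header[0] = MAT_COOKIE, header[1] = global rows,
     header[2] = global columns, header[3] = total number of nonzeros, or
     MATRIX_BINARY_FORMAT_DENSE when the file stores a dense array */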
1151 
1152   /*
1153        Handle case where matrix is stored on disk as a dense matrix
1154   */
1155   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1156     ierr = MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);CHKERRQ(ierr);
1157     PetscFunctionReturn(0);
1158   }
1159 
1160   /* determine ownership of all rows */
1161   m          = M/size + ((M % size) > rank);
1162   rowners    = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1163   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1164   rowners[0] = 0;
1165   for ( i=2; i<=size; i++ ) {
1166     rowners[i] += rowners[i-1];
1167   }
1168   rstart = rowners[rank];
1169   rend   = rowners[rank+1];
1170 
1171   /* distribute row lengths to all processors */
1172   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1173   offlens = ourlens + (rend-rstart);
1174   if (!rank) {
1175     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1176     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
1177     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1178     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1179     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1180     PetscFree(sndcounts);
1181   } else {
1182     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
1183   }
1184 
1185   if (!rank) {
1186     /* calculate the number of nonzeros on each processor */
1187     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1188     PetscMemzero(procsnz,size*sizeof(int));
1189     for ( i=0; i<size; i++ ) {
1190       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1191         procsnz[i] += rowlengths[j];
1192       }
1193     }
1194     PetscFree(rowlengths);
1195 
1196     /* determine max buffer needed and allocate it */
1197     maxnz = 0;
1198     for ( i=0; i<size; i++ ) {
1199       maxnz = PetscMax(maxnz,procsnz[i]);
1200     }
1201     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1202 
1203     /* read in my part of the matrix column indices  */
1204     nz = procsnz[0];
1205     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1206     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
1207 
1208     /* read in everyone else's part and ship it off */
1209     for ( i=1; i<size; i++ ) {
1210       nz   = procsnz[i];
1211       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1212       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1213     }
1214     PetscFree(cols);
1215   } else {
1216     /* determine buffer space needed for message */
1217     nz = 0;
1218     for ( i=0; i<m; i++ ) {
1219       nz += ourlens[i];
1220     }
1221     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1222 
1223     /* receive message of column indices*/
1224     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1225     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1226     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1227   }
1228 
1229   /* loop over local rows, determining number of off diagonal entries */
1230   PetscMemzero(offlens,m*sizeof(int));
1231   jj = 0;
1232   for ( i=0; i<m; i++ ) {
1233     for ( j=0; j<ourlens[i]; j++ ) {
1234       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1235       jj++;
1236     }
1237   }
1238 
1239   /* create our matrix */
1240   for ( i=0; i<m; i++ ) {
1241     ourlens[i] -= offlens[i];
1242   }
1243   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1244   A = *newmat;
1245   for ( i=0; i<m; i++ ) {
1246     ourlens[i] += offlens[i];
1247   }
1248 
1249   if (!rank) {
1250     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1251 
1252     /* read in my part of the matrix numerical values  */
1253     nz = procsnz[0];
1254     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1255 
1256     /* insert into matrix */
1257     jj      = rstart;
1258     smycols = mycols;
1259     svals   = vals;
1260     for ( i=0; i<m; i++ ) {
1261       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1262       smycols += ourlens[i];
1263       svals   += ourlens[i];
1264       jj++;
1265     }
1266 
1267     /* read in the other processors' parts and ship them out */
1268     for ( i=1; i<size; i++ ) {
1269       nz   = procsnz[i];
1270       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1271       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1272     }
1273     PetscFree(procsnz);
1274   } else {
1275     /* receive numeric values */
1276     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1277 
1278     /* receive message of values*/
1279     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1280     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1281     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1282 
1283     /* insert into matrix */
1284     jj      = rstart;
1285     smycols = mycols;
1286     svals   = vals;
1287     for ( i=0; i<m; i++ ) {
1288       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1289       smycols += ourlens[i];
1290       svals   += ourlens[i];
1291       jj++;
1292     }
1293   }
1294   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1295 
1296   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1297   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1298   PetscFunctionReturn(0);
1299 }
1300 
1301 
1302 
1303 
1304 
1305