xref: /petsc/src/mat/impls/dense/mpi/mpidense.c (revision 488ecbaffa467bb032d31d7eb20bc6d0ef6d986c)
1 #ifdef PETSC_RCS_HEADER
2 static char vcid[] = "$Id: mpidense.c,v 1.95 1998/07/14 03:10:52 bsmith Exp bsmith $";
3 #endif
4 
5 /*
6    Basic functions for basic parallel dense matrices.
7 */
8 
9 #include "src/mat/impls/dense/mpi/mpidense.h"
10 #include "src/vec/vecimpl.h"
11 
12 #undef __FUNC__
13 #define __FUNC__ "MatSetValues_MPIDense"
14 int MatSetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v,InsertMode addv)
15 {
16   Mat_MPIDense *A = (Mat_MPIDense *) mat->data;
17   int          ierr, i, j, rstart = A->rstart, rend = A->rend, row;
18   int          roworiented = A->roworiented;
19 
20   PetscFunctionBegin;
21   for ( i=0; i<m; i++ ) {
22     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
23     if (idxm[i] >= A->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
24     if (idxm[i] >= rstart && idxm[i] < rend) {
25       row = idxm[i] - rstart;
26       if (roworiented) {
27         ierr = MatSetValues(A->A,1,&row,n,idxn,v+i*n,addv); CHKERRQ(ierr);
28       } else {
29         for ( j=0; j<n; j++ ) {
30           if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
31           if (idxn[j] >= A->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
32           ierr = MatSetValues(A->A,1,&row,1,&idxn[j],v+i+j*m,addv); CHKERRQ(ierr);
33         }
34       }
35     } else {
36       if (roworiented) {
37         ierr = StashValues_Private(&A->stash,idxm[i],n,idxn,v+i*n,addv); CHKERRQ(ierr);
38       } else { /* must stash each seperately */
39         row = idxm[i];
40         for ( j=0; j<n; j++ ) {
41           ierr = StashValues_Private(&A->stash,row,1,&idxn[j],v+i+j*m,addv);CHKERRQ(ierr);
42         }
43       }
44     }
45   }
46   PetscFunctionReturn(0);
47 }
48 
49 #undef __FUNC__
50 #define __FUNC__ "MatGetValues_MPIDense"
51 int MatGetValues_MPIDense(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
52 {
53   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
54   int          ierr, i, j, rstart = mdn->rstart, rend = mdn->rend, row;
55 
56   PetscFunctionBegin;
57   for ( i=0; i<m; i++ ) {
58     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
59     if (idxm[i] >= mdn->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
60     if (idxm[i] >= rstart && idxm[i] < rend) {
61       row = idxm[i] - rstart;
62       for ( j=0; j<n; j++ ) {
63         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
64         if (idxn[j] >= mdn->N) {
65           SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
66         }
67         ierr = MatGetValues(mdn->A,1,&row,1,&idxn[j],v+i*n+j); CHKERRQ(ierr);
68       }
69     } else {
70       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
71     }
72   }
73   PetscFunctionReturn(0);
74 }
75 
76 #undef __FUNC__
77 #define __FUNC__ "MatGetArray_MPIDense"
78 int MatGetArray_MPIDense(Mat A,Scalar **array)
79 {
80   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
81   int          ierr;
82 
83   PetscFunctionBegin;
84   ierr = MatGetArray(a->A,array); CHKERRQ(ierr);
85   PetscFunctionReturn(0);
86 }
87 
88 #undef __FUNC__
89 #define __FUNC__ "MatRestoreArray_MPIDense"
90 int MatRestoreArray_MPIDense(Mat A,Scalar **array)
91 {
92   PetscFunctionBegin;
93   PetscFunctionReturn(0);
94 }
95 
96 #undef __FUNC__
97 #define __FUNC__ "MatAssemblyBegin_MPIDense"
98 int MatAssemblyBegin_MPIDense(Mat mat,MatAssemblyType mode)
99 {
100   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
101   MPI_Comm     comm = mat->comm;
102   int          size = mdn->size, *owners = mdn->rowners, rank = mdn->rank;
103   int          *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
104   int          tag = mat->tag, *owner,*starts,count,ierr;
105   InsertMode   addv;
106   MPI_Request  *send_waits,*recv_waits;
107   Scalar       *rvalues,*svalues;
108 
109   PetscFunctionBegin;
110   /* make sure all processors are either in INSERTMODE or ADDMODE */
111   ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
112   if (addv == (ADD_VALUES|INSERT_VALUES)) {
113     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Cannot mix adds/inserts on different procs");
114   }
115   mat->insertmode = addv; /* in case this processor had no cache */
116 
117   /*  first count number of contributors to each processor */
118   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
119   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
120   owner = (int *) PetscMalloc( (mdn->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
121   for ( i=0; i<mdn->stash.n; i++ ) {
122     idx = mdn->stash.idx[i];
123     for ( j=0; j<size; j++ ) {
124       if (idx >= owners[j] && idx < owners[j+1]) {
125         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
126       }
127     }
128   }
129   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
130 
131   /* inform other processors of number of messages and max length*/
132   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
133   ierr = MPI_Allreduce(procs,work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
134   nreceives = work[rank];
135   if (nreceives > size) SETERRQ(PETSC_ERR_PLIB,0,"Internal PETSc error");
136   ierr = MPI_Allreduce(nprocs,work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
137   nmax = work[rank];
138   PetscFree(work);
139 
140   /* post receives:
141        1) each message will consist of ordered pairs
142      (global index,value) we store the global index as a double
143      to simplify the message passing.
144        2) since we don't know how long each individual message is we
145      allocate the largest needed buffer for each receive. Potentially
146      this is a lot of wasted space.
147 
148        This could be done better.
149   */
150   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
151   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
152   for ( i=0; i<nreceives; i++ ) {
153     ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
154   }
155 
156   /* do sends:
157       1) starts[i] gives the starting index in svalues for stuff going to
158          the ith processor
159   */
160   svalues = (Scalar *) PetscMalloc( 3*(mdn->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
161   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
162   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
163   starts[0] = 0;
164   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
165   for ( i=0; i<mdn->stash.n; i++ ) {
166     svalues[3*starts[owner[i]]]       = (Scalar)  mdn->stash.idx[i];
167     svalues[3*starts[owner[i]]+1]     = (Scalar)  mdn->stash.idy[i];
168     svalues[3*(starts[owner[i]]++)+2] =  mdn->stash.array[i];
169   }
170   PetscFree(owner);
171   starts[0] = 0;
172   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
173   count = 0;
174   for ( i=0; i<size; i++ ) {
175     if (procs[i]) {
176       ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
177     }
178   }
179   PetscFree(starts); PetscFree(nprocs);
180 
181   /* Free cache space */
182   PLogInfo(mat,"MatAssemblyBegin_MPIDense:Number of off-processor values %d\n",mdn->stash.n);
183   ierr = StashDestroy_Private(&mdn->stash); CHKERRQ(ierr);
184 
185   mdn->svalues    = svalues;    mdn->rvalues = rvalues;
186   mdn->nsends     = nsends;     mdn->nrecvs = nreceives;
187   mdn->send_waits = send_waits; mdn->recv_waits = recv_waits;
188   mdn->rmax       = nmax;
189 
190   PetscFunctionReturn(0);
191 }
192 extern int MatSetUpMultiply_MPIDense(Mat);
193 
194 #undef __FUNC__
195 #define __FUNC__ "MatAssemblyEnd_MPIDense"
196 int MatAssemblyEnd_MPIDense(Mat mat,MatAssemblyType mode)
197 {
198   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
199   MPI_Status   *send_status,recv_status;
200   int          imdex, nrecvs=mdn->nrecvs, count=nrecvs, i, n, ierr, row, col;
201   Scalar       *values,val;
202   InsertMode   addv = mat->insertmode;
203 
204   PetscFunctionBegin;
205   /*  wait on receives */
206   while (count) {
207     ierr = MPI_Waitany(nrecvs,mdn->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
208     /* unpack receives into our local space */
209     values = mdn->rvalues + 3*imdex*mdn->rmax;
210     ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
211     n = n/3;
212     for ( i=0; i<n; i++ ) {
213       row = (int) PetscReal(values[3*i]) - mdn->rstart;
214       col = (int) PetscReal(values[3*i+1]);
215       val = values[3*i+2];
216       if (col >= 0 && col < mdn->N) {
217         MatSetValues(mdn->A,1,&row,1,&col,&val,addv);
218       }
219       else {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Invalid column");}
220     }
221     count--;
222   }
223   PetscFree(mdn->recv_waits); PetscFree(mdn->rvalues);
224 
225   /* wait on sends */
226   if (mdn->nsends) {
227     send_status = (MPI_Status *) PetscMalloc(mdn->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
228     ierr        = MPI_Waitall(mdn->nsends,mdn->send_waits,send_status);CHKERRQ(ierr);
229     PetscFree(send_status);
230   }
231   PetscFree(mdn->send_waits); PetscFree(mdn->svalues);
232 
233   ierr = MatAssemblyBegin(mdn->A,mode); CHKERRQ(ierr);
234   ierr = MatAssemblyEnd(mdn->A,mode); CHKERRQ(ierr);
235 
236   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
237     ierr = MatSetUpMultiply_MPIDense(mat); CHKERRQ(ierr);
238   }
239   PetscFunctionReturn(0);
240 }
241 
242 #undef __FUNC__
243 #define __FUNC__ "MatZeroEntries_MPIDense"
244 int MatZeroEntries_MPIDense(Mat A)
245 {
246   int          ierr;
247   Mat_MPIDense *l = (Mat_MPIDense *) A->data;
248 
249   PetscFunctionBegin;
250   ierr = MatZeroEntries(l->A);CHKERRQ(ierr);
251   PetscFunctionReturn(0);
252 }
253 
254 #undef __FUNC__
255 #define __FUNC__ "MatGetBlockSize_MPIDense"
256 int MatGetBlockSize_MPIDense(Mat A,int *bs)
257 {
258   PetscFunctionBegin;
259   *bs = 1;
260   PetscFunctionReturn(0);
261 }
262 
263 /* the code does not do the diagonal entries correctly unless the
264    matrix is square and the column and row owerships are identical.
265    This is a BUG. The only way to fix it seems to be to access
266    mdn->A and mdn->B directly and not through the MatZeroRows()
267    routine.
268 */
269 #undef __FUNC__
270 #define __FUNC__ "MatZeroRows_MPIDense"
271 int MatZeroRows_MPIDense(Mat A,IS is,Scalar *diag)
272 {
273   Mat_MPIDense   *l = (Mat_MPIDense *) A->data;
274   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
275   int            *procs,*nprocs,j,found,idx,nsends,*work;
276   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
277   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
278   int            *lens,imdex,*lrows,*values;
279   MPI_Comm       comm = A->comm;
280   MPI_Request    *send_waits,*recv_waits;
281   MPI_Status     recv_status,*send_status;
282   IS             istmp;
283 
284   PetscFunctionBegin;
285   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
286   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
287 
288   /*  first count number of contributors to each processor */
289   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
290   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
291   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
292   for ( i=0; i<N; i++ ) {
293     idx = rows[i];
294     found = 0;
295     for ( j=0; j<size; j++ ) {
296       if (idx >= owners[j] && idx < owners[j+1]) {
297         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
298       }
299     }
300     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
301   }
302   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
303 
304   /* inform other processors of number of messages and max length*/
305   work   = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
306   ierr   = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
307   nrecvs = work[rank];
308   ierr   = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
309   nmax   = work[rank];
310   PetscFree(work);
311 
312   /* post receives:   */
313   rvalues    = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
314   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
315   for ( i=0; i<nrecvs; i++ ) {
316     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
317   }
318 
319   /* do sends:
320       1) starts[i] gives the starting index in svalues for stuff going to
321          the ith processor
322   */
323   svalues    = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
324   send_waits = (MPI_Request *) PetscMalloc((nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
325   starts     = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
326   starts[0]  = 0;
327   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
328   for ( i=0; i<N; i++ ) {
329     svalues[starts[owner[i]]++] = rows[i];
330   }
331   ISRestoreIndices(is,&rows);
332 
333   starts[0] = 0;
334   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
335   count = 0;
336   for ( i=0; i<size; i++ ) {
337     if (procs[i]) {
338       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
339     }
340   }
341   PetscFree(starts);
342 
343   base = owners[rank];
344 
345   /*  wait on receives */
346   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
347   source = lens + nrecvs;
348   count  = nrecvs; slen = 0;
349   while (count) {
350     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
351     /* unpack receives into our local space */
352     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
353     source[imdex]  = recv_status.MPI_SOURCE;
354     lens[imdex]  = n;
355     slen += n;
356     count--;
357   }
358   PetscFree(recv_waits);
359 
360   /* move the data into the send scatter */
361   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
362   count = 0;
363   for ( i=0; i<nrecvs; i++ ) {
364     values = rvalues + i*nmax;
365     for ( j=0; j<lens[i]; j++ ) {
366       lrows[count++] = values[j] - base;
367     }
368   }
369   PetscFree(rvalues); PetscFree(lens);
370   PetscFree(owner); PetscFree(nprocs);
371 
372   /* actually zap the local rows */
373   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
374   PLogObjectParent(A,istmp);
375   PetscFree(lrows);
376   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
377   ierr = ISDestroy(istmp); CHKERRQ(ierr);
378 
379   /* wait on sends */
380   if (nsends) {
381     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
382     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
383     PetscFree(send_status);
384   }
385   PetscFree(send_waits); PetscFree(svalues);
386 
387   PetscFunctionReturn(0);
388 }
389 
390 #undef __FUNC__
391 #define __FUNC__ "MatMult_MPIDense"
392 int MatMult_MPIDense(Mat mat,Vec xx,Vec yy)
393 {
394   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
395   int          ierr;
396 
397   PetscFunctionBegin;
398   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
399   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
400   ierr = MatMult_SeqDense(mdn->A,mdn->lvec,yy); CHKERRQ(ierr);
401   PetscFunctionReturn(0);
402 }
403 
404 #undef __FUNC__
405 #define __FUNC__ "MatMultAdd_MPIDense"
406 int MatMultAdd_MPIDense(Mat mat,Vec xx,Vec yy,Vec zz)
407 {
408   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
409   int          ierr;
410 
411   PetscFunctionBegin;
412   ierr = VecScatterBegin(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
413   ierr = VecScatterEnd(xx,mdn->lvec,INSERT_VALUES,SCATTER_FORWARD,mdn->Mvctx);CHKERRQ(ierr);
414   ierr = MatMultAdd_SeqDense(mdn->A,mdn->lvec,yy,zz); CHKERRQ(ierr);
415   PetscFunctionReturn(0);
416 }
417 
418 #undef __FUNC__
419 #define __FUNC__ "MatMultTrans_MPIDense"
420 int MatMultTrans_MPIDense(Mat A,Vec xx,Vec yy)
421 {
422   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
423   int          ierr;
424   Scalar       zero = 0.0;
425 
426   PetscFunctionBegin;
427   ierr = VecSet(&zero,yy); CHKERRQ(ierr);
428   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
429   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
430   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
431   PetscFunctionReturn(0);
432 }
433 
434 #undef __FUNC__
435 #define __FUNC__ "MatMultTransAdd_MPIDense"
436 int MatMultTransAdd_MPIDense(Mat A,Vec xx,Vec yy,Vec zz)
437 {
438   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
439   int          ierr;
440 
441   PetscFunctionBegin;
442   ierr = VecCopy(yy,zz); CHKERRQ(ierr);
443   ierr = MatMultTrans_SeqDense(a->A,xx,a->lvec); CHKERRQ(ierr);
444   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
445   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
446   PetscFunctionReturn(0);
447 }
448 
449 #undef __FUNC__
450 #define __FUNC__ "MatGetDiagonal_MPIDense"
451 int MatGetDiagonal_MPIDense(Mat A,Vec v)
452 {
453   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
454   Mat_SeqDense *aloc = (Mat_SeqDense *) a->A->data;
455   int          ierr, len, i, n, m = a->m, radd;
456   Scalar       *x, zero = 0.0;
457 
458   PetscFunctionBegin;
459   VecSet(&zero,v);
460   ierr = VecGetArray(v,&x); CHKERRQ(ierr);
461   ierr = VecGetSize(v,&n); CHKERRQ(ierr);
462   if (n != a->M) SETERRQ(PETSC_ERR_ARG_SIZ,0,"Nonconforming mat and vec");
463   len = PetscMin(aloc->m,aloc->n);
464   radd = a->rstart*m;
465   for ( i=0; i<len; i++ ) {
466     x[i] = aloc->v[radd + i*m + i];
467   }
468   PetscFunctionReturn(0);
469 }
470 
471 #undef __FUNC__
472 #define __FUNC__ "MatDestroy_MPIDense"
473 int MatDestroy_MPIDense(Mat mat)
474 {
475   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
476   int          ierr;
477 
478   PetscFunctionBegin;
479   if (--mat->refct > 0) PetscFunctionReturn(0);
480 
481   if (mat->mapping) {
482     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
483   }
484   if (mat->bmapping) {
485     ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
486   }
487 #if defined(USE_PETSC_LOG)
488   PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",mdn->M,mdn->N);
489 #endif
490   PetscFree(mdn->rowners);
491   ierr = MatDestroy(mdn->A); CHKERRQ(ierr);
492   if (mdn->lvec)   VecDestroy(mdn->lvec);
493   if (mdn->Mvctx)  VecScatterDestroy(mdn->Mvctx);
494   if (mdn->factor) {
495     if (mdn->factor->temp)   PetscFree(mdn->factor->temp);
496     if (mdn->factor->tag)    PetscFree(mdn->factor->tag);
497     if (mdn->factor->pivots) PetscFree(mdn->factor->pivots);
498     PetscFree(mdn->factor);
499   }
500   PetscFree(mdn);
501   if (mat->rmap) {
502     ierr = MapDestroy(mat->rmap);CHKERRQ(ierr);
503   }
504   if (mat->cmap) {
505     ierr = MapDestroy(mat->cmap);CHKERRQ(ierr);
506   }
507   PLogObjectDestroy(mat);
508   PetscHeaderDestroy(mat);
509   PetscFunctionReturn(0);
510 }
511 
512 #include "pinclude/pviewer.h"
513 
514 #undef __FUNC__
515 #define __FUNC__ "MatView_MPIDense_Binary"
516 static int MatView_MPIDense_Binary(Mat mat,Viewer viewer)
517 {
518   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
519   int          ierr;
520 
521   PetscFunctionBegin;
522   if (mdn->size == 1) {
523     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
524   }
525   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
526   PetscFunctionReturn(0);
527 }
528 
529 #undef __FUNC__
530 #define __FUNC__ "MatView_MPIDense_ASCII"
531 static int MatView_MPIDense_ASCII(Mat mat,Viewer viewer)
532 {
533   Mat_MPIDense *mdn = (Mat_MPIDense *) mat->data;
534   int          ierr, format;
535   FILE         *fd;
536   ViewerType   vtype;
537 
538   PetscFunctionBegin;
539   ierr = ViewerGetType(viewer,&vtype);CHKERRQ(ierr);
540   ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
541   ierr = ViewerGetFormat(viewer,&format);
542   if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
543     int rank;
544     MatInfo info;
545     MPI_Comm_rank(mat->comm,&rank);
546     ierr = MatGetInfo(mat,MAT_LOCAL,&info);
547     PetscSequentialPhaseBegin(mat->comm,1);
548       fprintf(fd,"  [%d] local rows %d nz %d nz alloced %d mem %d \n",rank,mdn->m,
549          (int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
550       fflush(fd);
551     PetscSequentialPhaseEnd(mat->comm,1);
552     ierr = VecScatterView(mdn->Mvctx,viewer); CHKERRQ(ierr);
553     PetscFunctionReturn(0);
554   }
555   else if (format == VIEWER_FORMAT_ASCII_INFO) {
556     PetscFunctionReturn(0);
557   }
558   if (vtype == ASCII_FILE_VIEWER) {
559     PetscSequentialPhaseBegin(mat->comm,1);
560     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d\n",
561              mdn->rank,mdn->m,mdn->rstart,mdn->rend,mdn->n);
562     ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
563     fflush(fd);
564     PetscSequentialPhaseEnd(mat->comm,1);
565   } else {
566     int size = mdn->size, rank = mdn->rank;
567     if (size == 1) {
568       ierr = MatView(mdn->A,viewer); CHKERRQ(ierr);
569     } else {
570       /* assemble the entire matrix onto first processor. */
571       Mat          A;
572       int          M = mdn->M, N = mdn->N,m,row,i, nz, *cols;
573       Scalar       *vals;
574       Mat_SeqDense *Amdn = (Mat_SeqDense*) mdn->A->data;
575 
576       if (!rank) {
577         ierr = MatCreateMPIDense(mat->comm,M,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
578       } else {
579         ierr = MatCreateMPIDense(mat->comm,0,N,M,N,PETSC_NULL,&A); CHKERRQ(ierr);
580       }
581       PLogObjectParent(mat,A);
582 
583       /* Copy the matrix ... This isn't the most efficient means,
584          but it's quick for now */
585       row = mdn->rstart; m = Amdn->m;
586       for ( i=0; i<m; i++ ) {
587         ierr = MatGetRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
588         ierr = MatSetValues(A,1,&row,nz,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
589         ierr = MatRestoreRow(mat,row,&nz,&cols,&vals); CHKERRQ(ierr);
590         row++;
591       }
592 
593       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
594       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
595       if (!rank) {
596         ierr = MatView(((Mat_MPIDense*)(A->data))->A,viewer); CHKERRQ(ierr);
597       }
598       ierr = MatDestroy(A); CHKERRQ(ierr);
599     }
600   }
601   PetscFunctionReturn(0);
602 }
603 
604 #undef __FUNC__
605 #define __FUNC__ "MatView_MPIDense"
606 int MatView_MPIDense(Mat mat,Viewer viewer)
607 {
608   int          ierr;
609   ViewerType   vtype;
610 
611   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
612   if (vtype == ASCII_FILE_VIEWER) {
613     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
614   } else if (vtype == ASCII_FILES_VIEWER) {
615     ierr = MatView_MPIDense_ASCII(mat,viewer); CHKERRQ(ierr);
616   } else if (vtype == BINARY_FILE_VIEWER) {
617     ierr = MatView_MPIDense_Binary(mat,viewer);CHKERRQ(ierr);
618   } else {
619     SETERRQ(1,1,"Viewer type not supported by PETSc object");
620   }
621   PetscFunctionReturn(0);
622 }
623 
624 #undef __FUNC__
625 #define __FUNC__ "MatGetInfo_MPIDense"
626 int MatGetInfo_MPIDense(Mat A,MatInfoType flag,MatInfo *info)
627 {
628   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
629   Mat          mdn = mat->A;
630   int          ierr;
631   double       isend[5], irecv[5];
632 
633   PetscFunctionBegin;
634   info->rows_global    = (double)mat->M;
635   info->columns_global = (double)mat->N;
636   info->rows_local     = (double)mat->m;
637   info->columns_local  = (double)mat->N;
638   info->block_size     = 1.0;
639   ierr = MatGetInfo(mdn,MAT_LOCAL,info); CHKERRQ(ierr);
640   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
641   isend[3] = info->memory;  isend[4] = info->mallocs;
642   if (flag == MAT_LOCAL) {
643     info->nz_used      = isend[0];
644     info->nz_allocated = isend[1];
645     info->nz_unneeded  = isend[2];
646     info->memory       = isend[3];
647     info->mallocs      = isend[4];
648   } else if (flag == MAT_GLOBAL_MAX) {
649     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
650     info->nz_used      = irecv[0];
651     info->nz_allocated = irecv[1];
652     info->nz_unneeded  = irecv[2];
653     info->memory       = irecv[3];
654     info->mallocs      = irecv[4];
655   } else if (flag == MAT_GLOBAL_SUM) {
656     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
657     info->nz_used      = irecv[0];
658     info->nz_allocated = irecv[1];
659     info->nz_unneeded  = irecv[2];
660     info->memory       = irecv[3];
661     info->mallocs      = irecv[4];
662   }
663   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
664   info->fill_ratio_needed = 0;
665   info->factor_mallocs    = 0;
666   PetscFunctionReturn(0);
667 }
668 
669 /* extern int MatLUFactorSymbolic_MPIDense(Mat,IS,IS,double,Mat*);
670    extern int MatLUFactorNumeric_MPIDense(Mat,Mat*);
671    extern int MatLUFactor_MPIDense(Mat,IS,IS,double);
672    extern int MatSolve_MPIDense(Mat,Vec,Vec);
673    extern int MatSolveAdd_MPIDense(Mat,Vec,Vec,Vec);
674    extern int MatSolveTrans_MPIDense(Mat,Vec,Vec);
675    extern int MatSolveTransAdd_MPIDense(Mat,Vec,Vec,Vec); */
676 
677 #undef __FUNC__
678 #define __FUNC__ "MatSetOption_MPIDense"
679 int MatSetOption_MPIDense(Mat A,MatOption op)
680 {
681   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
682 
683   PetscFunctionBegin;
684   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
685       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
686       op == MAT_NEW_NONZERO_LOCATION_ERROR ||
687       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
688       op == MAT_COLUMNS_SORTED ||
689       op == MAT_COLUMNS_UNSORTED) {
690         MatSetOption(a->A,op);
691   } else if (op == MAT_ROW_ORIENTED) {
692         a->roworiented = 1;
693         MatSetOption(a->A,op);
694   } else if (op == MAT_ROWS_SORTED ||
695              op == MAT_ROWS_UNSORTED ||
696              op == MAT_SYMMETRIC ||
697              op == MAT_STRUCTURALLY_SYMMETRIC ||
698              op == MAT_YES_NEW_DIAGONALS ||
699              op == MAT_USE_HASH_TABLE) {
700     PLogInfo(A,"MatSetOption_MPIDense:Option ignored\n");
701   } else if (op == MAT_COLUMN_ORIENTED) {
702     a->roworiented = 0; MatSetOption(a->A,op);
703   } else if (op == MAT_NO_NEW_DIAGONALS) {
704     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
705   } else {
706     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
707   }
708   PetscFunctionReturn(0);
709 }
710 
711 #undef __FUNC__
712 #define __FUNC__ "MatGetSize_MPIDense"
713 int MatGetSize_MPIDense(Mat A,int *m,int *n)
714 {
715   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
716 
717   PetscFunctionBegin;
718   *m = mat->M; *n = mat->N;
719   PetscFunctionReturn(0);
720 }
721 
722 #undef __FUNC__
723 #define __FUNC__ "MatGetLocalSize_MPIDense"
724 int MatGetLocalSize_MPIDense(Mat A,int *m,int *n)
725 {
726   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
727 
728   PetscFunctionBegin;
729   *m = mat->m; *n = mat->N;
730   PetscFunctionReturn(0);
731 }
732 
733 #undef __FUNC__
734 #define __FUNC__ "MatGetOwnershipRange_MPIDense"
735 int MatGetOwnershipRange_MPIDense(Mat A,int *m,int *n)
736 {
737   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
738 
739   PetscFunctionBegin;
740   *m = mat->rstart; *n = mat->rend;
741   PetscFunctionReturn(0);
742 }
743 
744 #undef __FUNC__
745 #define __FUNC__ "MatGetRow_MPIDense"
746 int MatGetRow_MPIDense(Mat A,int row,int *nz,int **idx,Scalar **v)
747 {
748   Mat_MPIDense *mat = (Mat_MPIDense *) A->data;
749   int          lrow, rstart = mat->rstart, rend = mat->rend,ierr;
750 
751   PetscFunctionBegin;
752   if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_SUP,0,"only local rows")
753   lrow = row - rstart;
754   ierr = MatGetRow(mat->A,lrow,nz,idx,v);CHKERRQ(ierr);
755   PetscFunctionReturn(0);
756 }
757 
758 #undef __FUNC__
759 #define __FUNC__ "MatRestoreRow_MPIDense"
760 int MatRestoreRow_MPIDense(Mat mat,int row,int *nz,int **idx,Scalar **v)
761 {
762   PetscFunctionBegin;
763   if (idx) PetscFree(*idx);
764   if (v) PetscFree(*v);
765   PetscFunctionReturn(0);
766 }
767 
768 #undef __FUNC__
769 #define __FUNC__ "MatNorm_MPIDense"
770 int MatNorm_MPIDense(Mat A,NormType type,double *norm)
771 {
772   Mat_MPIDense *mdn = (Mat_MPIDense *) A->data;
773   Mat_SeqDense *mat = (Mat_SeqDense*) mdn->A->data;
774   int          ierr, i, j;
775   double       sum = 0.0;
776   Scalar       *v = mat->v;
777 
778   PetscFunctionBegin;
779   if (mdn->size == 1) {
780     ierr =  MatNorm(mdn->A,type,norm); CHKERRQ(ierr);
781   } else {
782     if (type == NORM_FROBENIUS) {
783       for (i=0; i<mat->n*mat->m; i++ ) {
784 #if defined(USE_PETSC_COMPLEX)
785         sum += PetscReal(PetscConj(*v)*(*v)); v++;
786 #else
787         sum += (*v)*(*v); v++;
788 #endif
789       }
790       ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
791       *norm = sqrt(*norm);
792       PLogFlops(2*mat->n*mat->m);
793     } else if (type == NORM_1) {
794       double *tmp, *tmp2;
795       tmp  = (double *) PetscMalloc( 2*mdn->N*sizeof(double) ); CHKPTRQ(tmp);
796       tmp2 = tmp + mdn->N;
797       PetscMemzero(tmp,2*mdn->N*sizeof(double));
798       *norm = 0.0;
799       v = mat->v;
800       for ( j=0; j<mat->n; j++ ) {
801         for ( i=0; i<mat->m; i++ ) {
802           tmp[j] += PetscAbsScalar(*v);  v++;
803         }
804       }
805       ierr = MPI_Allreduce(tmp,tmp2,mdn->N,MPI_DOUBLE,MPI_SUM,A->comm);CHKERRQ(ierr);
806       for ( j=0; j<mdn->N; j++ ) {
807         if (tmp2[j] > *norm) *norm = tmp2[j];
808       }
809       PetscFree(tmp);
810       PLogFlops(mat->n*mat->m);
811     } else if (type == NORM_INFINITY) { /* max row norm */
812       double ntemp;
813       ierr = MatNorm(mdn->A,type,&ntemp); CHKERRQ(ierr);
814       ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,A->comm);CHKERRQ(ierr);
815     } else {
816       SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
817     }
818   }
819   PetscFunctionReturn(0);
820 }
821 
822 #undef __FUNC__
823 #define __FUNC__ "MatTranspose_MPIDense"
824 int MatTranspose_MPIDense(Mat A,Mat *matout)
825 {
826   Mat_MPIDense *a = (Mat_MPIDense *) A->data;
827   Mat_SeqDense *Aloc = (Mat_SeqDense *) a->A->data;
828   Mat          B;
829   int          M = a->M, N = a->N, m, n, *rwork, rstart = a->rstart;
830   int          j, i, ierr;
831   Scalar       *v;
832 
833   PetscFunctionBegin;
834   if (matout == PETSC_NULL && M != N) {
835     SETERRQ(PETSC_ERR_SUP,0,"Supports square matrix only in-place");
836   }
837   ierr = MatCreateMPIDense(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,PETSC_NULL,&B);CHKERRQ(ierr);
838 
839   m = Aloc->m; n = Aloc->n; v = Aloc->v;
840   rwork = (int *) PetscMalloc(n*sizeof(int)); CHKPTRQ(rwork);
841   for ( j=0; j<n; j++ ) {
842     for (i=0; i<m; i++) rwork[i] = rstart + i;
843     ierr = MatSetValues(B,1,&j,m,rwork,v,INSERT_VALUES); CHKERRQ(ierr);
844     v   += m;
845   }
846   PetscFree(rwork);
847   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
848   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
849   if (matout != PETSC_NULL) {
850     *matout = B;
851   } else {
852     PetscOps *Abops;
853     MatOps   Aops;
854 
855     /* This isn't really an in-place transpose, but free data struct from a */
856     PetscFree(a->rowners);
857     ierr = MatDestroy(a->A); CHKERRQ(ierr);
858     if (a->lvec) VecDestroy(a->lvec);
859     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
860     PetscFree(a);
861 
862     /*
863          This is horrible, horrible code. We need to keep the
864       A pointers for the bops and ops but copy everything
865       else from C.
866     */
867     Abops = A->bops;
868     Aops  = A->ops;
869     PetscMemcpy(A,B,sizeof(struct _p_Mat));
870     A->bops = Abops;
871     A->ops  = Aops;
872 
873     PetscHeaderDestroy(B);
874   }
875   PetscFunctionReturn(0);
876 }
877 
878 #include "pinclude/blaslapack.h"
879 #undef __FUNC__
880 #define __FUNC__ "MatScale_MPIDense"
881 int MatScale_MPIDense(Scalar *alpha,Mat inA)
882 {
883   Mat_MPIDense *A = (Mat_MPIDense *) inA->data;
884   Mat_SeqDense *a = (Mat_SeqDense *) A->A->data;
885   int          one = 1, nz;
886 
887   PetscFunctionBegin;
888   nz = a->m*a->n;
889   BLscal_( &nz, alpha, a->v, &one );
890   PLogFlops(nz);
891   PetscFunctionReturn(0);
892 }
893 
894 static int MatConvertSameType_MPIDense(Mat,Mat *,int);
895 
896 /* -------------------------------------------------------------------*/
897 static struct _MatOps MatOps_Values = {MatSetValues_MPIDense,
898        MatGetRow_MPIDense,
899        MatRestoreRow_MPIDense,
900        MatMult_MPIDense,
901        MatMultAdd_MPIDense,
902        MatMultTrans_MPIDense,
903        MatMultTransAdd_MPIDense,
904        0,
905        0,
906        0,
907        0,
908        0,
909        0,
910        0,
911        MatTranspose_MPIDense,
912        MatGetInfo_MPIDense,0,
913        MatGetDiagonal_MPIDense,
914        0,
915        MatNorm_MPIDense,
916        MatAssemblyBegin_MPIDense,
917        MatAssemblyEnd_MPIDense,
918        0,
919        MatSetOption_MPIDense,
920        MatZeroEntries_MPIDense,
921        MatZeroRows_MPIDense,
922        0,
923        0,
924        0,
925        0,
926        MatGetSize_MPIDense,
927        MatGetLocalSize_MPIDense,
928        MatGetOwnershipRange_MPIDense,
929        0,
930        0,
931        MatGetArray_MPIDense,
932        MatRestoreArray_MPIDense,
933        MatConvertSameType_MPIDense,
934        0,
935        0,
936        0,
937        0,
938        0,
939        0,
940        0,
941        MatGetValues_MPIDense,
942        0,
943        0,
944        MatScale_MPIDense,
945        0,
946        0,
947        0,
948        MatGetBlockSize_MPIDense,
949        0,
950        0,
951        0,
952        0,
953        0,
954        0,
955        0,
956        0,
957        0,
958        0,
959        0,
960        0,
961        MatGetMaps_Petsc};
962 
963 #undef __FUNC__
964 #define __FUNC__ "MatCreateMPIDense"
965 /*@C
966    MatCreateMPIDense - Creates a sparse parallel matrix in dense format.
967 
968    Collective on MPI_Comm
969 
970    Input Parameters:
971 +  comm - MPI communicator
972 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
973 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
974 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
975 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
976 -  data - optional location of matrix data.  Set data=PETSC_NULL for PETSc
977    to control all matrix memory allocation.
978 
979    Output Parameter:
980 .  A - the matrix
981 
982    Notes:
983    The dense format is fully compatible with standard Fortran 77
984    storage by columns.
985 
986    The data input variable is intended primarily for Fortran programmers
987    who wish to allocate their own matrix memory space.  Most users should
988    set data=PETSC_NULL.
989 
990    The user MUST specify either the local or global matrix dimensions
991    (possibly both).
992 
993    Currently, the only parallel dense matrix decomposition is by rows,
994    so that n=N and each submatrix owns all of the global columns.
995 
996 .keywords: matrix, dense, parallel
997 
998 .seealso: MatCreate(), MatCreateSeqDense(), MatSetValues()
999 @*/
1000 int MatCreateMPIDense(MPI_Comm comm,int m,int n,int M,int N,Scalar *data,Mat *A)
1001 {
1002   Mat          mat;
1003   Mat_MPIDense *a;
1004   int          ierr, i,flg;
1005 
1006   PetscFunctionBegin;
1007   /* Note:  For now, when data is specified above, this assumes the user correctly
1008    allocates the local dense storage space.  We should add error checking. */
1009 
1010   *A = 0;
1011   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,comm,MatDestroy,MatView);
1012   PLogObjectCreate(mat);
1013   mat->data       = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
1014   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
1015   mat->ops->destroy    = MatDestroy_MPIDense;
1016   mat->ops->view       = MatView_MPIDense;
1017   mat->factor          = 0;
1018   mat->mapping         = 0;
1019 
1020   a->factor       = 0;
1021   mat->insertmode = NOT_SET_VALUES;
1022   MPI_Comm_rank(comm,&a->rank);
1023   MPI_Comm_size(comm,&a->size);
1024 
1025   if (M == PETSC_DECIDE) {ierr = MPI_Allreduce(&m,&M,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);}
1026   if (m == PETSC_DECIDE) {m = M/a->size + ((M % a->size) > a->rank);}
1027 
1028   /*
1029      The computation of n is wrong below, n should represent the number of local
1030      rows in the right (column vector)
1031   */
1032 
1033   /* each row stores all columns */
1034   if (N == PETSC_DECIDE) N = n;
1035   if (n == PETSC_DECIDE) {n = N/a->size + ((N % a->size) > a->rank);}
1036   /*  if (n != N) SETERRQ(PETSC_ERR_SUP,0,"For now, only n=N is supported"); */
1037   a->N = mat->N = N;
1038   a->M = mat->M = M;
1039   a->m = mat->m = m;
1040   a->n = mat->n = n;
1041 
1042   /* the information in the maps duplicates the information computed below, eventually
1043      we should remove the duplicate information that is not contained in the maps */
1044   ierr = MapCreateMPI(comm,m,M,&mat->rmap);CHKERRQ(ierr);
1045   ierr = MapCreateMPI(comm,n,N,&mat->cmap);CHKERRQ(ierr);
1046 
1047   /* build local table of row and column ownerships */
1048   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1049   a->cowners = a->rowners + a->size + 1;
1050   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
1051   ierr = MPI_Allgather(&m,1,MPI_INT,a->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1052   a->rowners[0] = 0;
1053   for ( i=2; i<=a->size; i++ ) {
1054     a->rowners[i] += a->rowners[i-1];
1055   }
1056   a->rstart = a->rowners[a->rank];
1057   a->rend   = a->rowners[a->rank+1];
1058   ierr      = MPI_Allgather(&n,1,MPI_INT,a->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1059   a->cowners[0] = 0;
1060   for ( i=2; i<=a->size; i++ ) {
1061     a->cowners[i] += a->cowners[i-1];
1062   }
1063 
1064   ierr = MatCreateSeqDense(PETSC_COMM_SELF,m,N,data,&a->A); CHKERRQ(ierr);
1065   PLogObjectParent(mat,a->A);
1066 
1067   /* build cache for off array entries formed */
1068   ierr = StashBuild_Private(&a->stash); CHKERRQ(ierr);
1069 
1070   /* stuff used for matrix vector multiply */
1071   a->lvec        = 0;
1072   a->Mvctx       = 0;
1073   a->roworiented = 1;
1074 
1075   *A = mat;
1076   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1077   if (flg) {
1078     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1079   }
1080   PetscFunctionReturn(0);
1081 }
1082 
1083 #undef __FUNC__
1084 #define __FUNC__ "MatConvertSameType_MPIDense"
1085 static int MatConvertSameType_MPIDense(Mat A,Mat *newmat,int cpvalues)
1086 {
1087   Mat          mat;
1088   Mat_MPIDense *a,*oldmat = (Mat_MPIDense *) A->data;
1089   int          ierr;
1090   FactorCtx    *factor;
1091 
1092   PetscFunctionBegin;
1093   *newmat       = 0;
1094   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIDENSE,A->comm,MatDestroy,MatView);
1095   PLogObjectCreate(mat);
1096   mat->data      = (void *) (a = PetscNew(Mat_MPIDense)); CHKPTRQ(a);
1097   PetscMemcpy(mat->ops,&MatOps_Values,sizeof(struct _MatOps));
1098   mat->ops->destroy   = MatDestroy_MPIDense;
1099   mat->ops->view      = MatView_MPIDense;
1100   mat->factor         = A->factor;
1101   mat->assembled      = PETSC_TRUE;
1102 
1103   a->m = mat->m = oldmat->m;
1104   a->n = mat->n = oldmat->n;
1105   a->M = mat->M = oldmat->M;
1106   a->N = mat->N = oldmat->N;
1107   if (oldmat->factor) {
1108     a->factor = (FactorCtx *) (factor = PetscNew(FactorCtx)); CHKPTRQ(factor);
1109     /* copy factor contents ... add this code! */
1110   } else a->factor = 0;
1111 
1112   a->rstart       = oldmat->rstart;
1113   a->rend         = oldmat->rend;
1114   a->size         = oldmat->size;
1115   a->rank         = oldmat->rank;
1116   mat->insertmode = NOT_SET_VALUES;
1117 
1118   a->rowners = (int *) PetscMalloc((a->size+1)*sizeof(int)); CHKPTRQ(a->rowners);
1119   PLogObjectMemory(mat,(a->size+1)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIDense));
1120   PetscMemcpy(a->rowners,oldmat->rowners,(a->size+1)*sizeof(int));
1121   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1122 
1123   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1124   PLogObjectParent(mat,a->lvec);
1125   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1126   PLogObjectParent(mat,a->Mvctx);
1127   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1128   PLogObjectParent(mat,a->A);
1129   *newmat = mat;
1130   PetscFunctionReturn(0);
1131 }
1132 
1133 #include "sys.h"
1134 
1135 #undef __FUNC__
1136 #define __FUNC__ "MatLoad_MPIDense_DenseInFile"
1137 int MatLoad_MPIDense_DenseInFile(MPI_Comm comm,int fd,int M, int N, Mat *newmat)
1138 {
1139   int        *rowners, i,size,rank,m,ierr,nz,j;
1140   Scalar     *array,*vals,*vals_ptr;
1141   MPI_Status status;
1142 
1143   PetscFunctionBegin;
1144   MPI_Comm_rank(comm,&rank);
1145   MPI_Comm_size(comm,&size);
1146 
1147   /* determine ownership of all rows */
1148   m          = M/size + ((M % size) > rank);
1149   rowners    = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1150   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1151   rowners[0] = 0;
1152   for ( i=2; i<=size; i++ ) {
1153     rowners[i] += rowners[i-1];
1154   }
1155 
1156   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1157   ierr = MatGetArray(*newmat,&array); CHKERRQ(ierr);
1158 
1159   if (!rank) {
1160     vals = (Scalar *) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1161 
1162     /* read in my part of the matrix numerical values  */
1163     ierr = PetscBinaryRead(fd,vals,m*N,PETSC_SCALAR); CHKERRQ(ierr);
1164 
1165     /* insert into matrix-by row (this is why cannot directly read into array */
1166     vals_ptr = vals;
1167     for ( i=0; i<m; i++ ) {
1168       for ( j=0; j<N; j++ ) {
1169         array[i + j*m] = *vals_ptr++;
1170       }
1171     }
1172 
1173     /* read in other processors and ship out */
1174     for ( i=1; i<size; i++ ) {
1175       nz   = (rowners[i+1] - rowners[i])*N;
1176       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1177       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,(*newmat)->tag,comm);CHKERRQ(ierr);
1178     }
1179   } else {
1180     /* receive numeric values */
1181     vals = (Scalar*) PetscMalloc( m*N*sizeof(Scalar) ); CHKPTRQ(vals);
1182 
1183     /* receive message of values*/
1184     ierr = MPI_Recv(vals,m*N,MPIU_SCALAR,0,(*newmat)->tag,comm,&status);CHKERRQ(ierr);
1185 
1186     /* insert into matrix-by row (this is why cannot directly read into array */
1187     vals_ptr = vals;
1188     for ( i=0; i<m; i++ ) {
1189       for ( j=0; j<N; j++ ) {
1190         array[i + j*m] = *vals_ptr++;
1191       }
1192     }
1193   }
1194   PetscFree(rowners);
1195   PetscFree(vals);
1196   ierr = MatAssemblyBegin(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1197   ierr = MatAssemblyEnd(*newmat,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1198   PetscFunctionReturn(0);
1199 }
1200 
1201 
1202 #undef __FUNC__
1203 #define __FUNC__ "MatLoad_MPIDense"
1204 int MatLoad_MPIDense(Viewer viewer,MatType type,Mat *newmat)
1205 {
1206   Mat          A;
1207   Scalar       *vals,*svals;
1208   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1209   MPI_Status   status;
1210   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1211   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1212   int          tag = ((PetscObject)viewer)->tag;
1213   int          i, nz, ierr, j,rstart, rend, fd;
1214 
1215   PetscFunctionBegin;
1216   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1217   if (!rank) {
1218     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1219     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
1220     if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
1221   }
1222 
1223   ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
1224   M = header[1]; N = header[2]; nz = header[3];
1225 
1226   /*
1227        Handle case where matrix is stored on disk as a dense matrix
1228   */
1229   if (nz == MATRIX_BINARY_FORMAT_DENSE) {
1230     ierr = MatLoad_MPIDense_DenseInFile(comm,fd,M,N,newmat);CHKERRQ(ierr);
1231     PetscFunctionReturn(0);
1232   }
1233 
1234   /* determine ownership of all rows */
1235   m          = M/size + ((M % size) > rank);
1236   rowners    = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1237   ierr       = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
1238   rowners[0] = 0;
1239   for ( i=2; i<=size; i++ ) {
1240     rowners[i] += rowners[i-1];
1241   }
1242   rstart = rowners[rank];
1243   rend   = rowners[rank+1];
1244 
1245   /* distribute row lengths to all processors */
1246   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1247   offlens = ourlens + (rend-rstart);
1248   if (!rank) {
1249     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1250     ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
1251     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1252     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1253     ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
1254     PetscFree(sndcounts);
1255   } else {
1256     ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
1257   }
1258 
1259   if (!rank) {
1260     /* calculate the number of nonzeros on each processor */
1261     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1262     PetscMemzero(procsnz,size*sizeof(int));
1263     for ( i=0; i<size; i++ ) {
1264       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1265         procsnz[i] += rowlengths[j];
1266       }
1267     }
1268     PetscFree(rowlengths);
1269 
1270     /* determine max buffer needed and allocate it */
1271     maxnz = 0;
1272     for ( i=0; i<size; i++ ) {
1273       maxnz = PetscMax(maxnz,procsnz[i]);
1274     }
1275     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1276 
1277     /* read in my part of the matrix column indices  */
1278     nz = procsnz[0];
1279     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1280     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);
1281 
1282     /* read in every one elses and ship off */
1283     for ( i=1; i<size; i++ ) {
1284       nz   = procsnz[i];
1285       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
1286       ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
1287     }
1288     PetscFree(cols);
1289   } else {
1290     /* determine buffer space needed for message */
1291     nz = 0;
1292     for ( i=0; i<m; i++ ) {
1293       nz += ourlens[i];
1294     }
1295     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1296 
1297     /* receive message of column indices*/
1298     ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
1299     ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
1300     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1301   }
1302 
1303   /* loop over local rows, determining number of off diagonal entries */
1304   PetscMemzero(offlens,m*sizeof(int));
1305   jj = 0;
1306   for ( i=0; i<m; i++ ) {
1307     for ( j=0; j<ourlens[i]; j++ ) {
1308       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1309       jj++;
1310     }
1311   }
1312 
1313   /* create our matrix */
1314   for ( i=0; i<m; i++ ) {
1315     ourlens[i] -= offlens[i];
1316   }
1317   ierr = MatCreateMPIDense(comm,m,PETSC_DECIDE,M,N,PETSC_NULL,newmat);CHKERRQ(ierr);
1318   A = *newmat;
1319   for ( i=0; i<m; i++ ) {
1320     ourlens[i] += offlens[i];
1321   }
1322 
1323   if (!rank) {
1324     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1325 
1326     /* read in my part of the matrix numerical values  */
1327     nz = procsnz[0];
1328     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1329 
1330     /* insert into matrix */
1331     jj      = rstart;
1332     smycols = mycols;
1333     svals   = vals;
1334     for ( i=0; i<m; i++ ) {
1335       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1336       smycols += ourlens[i];
1337       svals   += ourlens[i];
1338       jj++;
1339     }
1340 
1341     /* read in other processors and ship out */
1342     for ( i=1; i<size; i++ ) {
1343       nz   = procsnz[i];
1344       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
1345       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
1346     }
1347     PetscFree(procsnz);
1348   } else {
1349     /* receive numeric values */
1350     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1351 
1352     /* receive message of values*/
1353     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
1354     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
1355     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
1356 
1357     /* insert into matrix */
1358     jj      = rstart;
1359     smycols = mycols;
1360     svals   = vals;
1361     for ( i=0; i<m; i++ ) {
1362       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1363       smycols += ourlens[i];
1364       svals   += ourlens[i];
1365       jj++;
1366     }
1367   }
1368   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1369 
1370   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1371   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1372   PetscFunctionReturn(0);
1373 }
1374 
1375 
1376 
1377 
1378 
1379