xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 00a98328c1f4fb9da36b452d9383e7a2e935b046)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.178 1996/11/29 22:26:49 curfman Exp curfman $";
3 #endif
4 
5 #include "src/mat/impls/aij/mpi/mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non-scalable way, since the
12 length of colmap equals the global number of columns.
13 */
14 int CreateColmap_MPIAIJ_Private(Mat mat)
15 {
16   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
17   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
18   int        n = B->n,i;
19 
20   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
21   PLogObjectMemory(mat,aij->N*sizeof(int));
22   PetscMemzero(aij->colmap,aij->N*sizeof(int));
23   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
24   return 0;
25 }
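
/*
   A minimal standalone sketch (illustrative only; the values are made up)
   of the colmap convention used above: entries are stored shifted by +1 so
   that a stored 0 can mean "this global column has no local off-diagonal
   slot", and lookups subtract 1 to recover the local index (or -1).
*/
#include <stdio.h>
#include <stdlib.h>

int main(void)
{
  int N        = 10;                 /* global number of columns */
  int garray[] = {2,5,7};            /* global columns present locally */
  int nlocal   = 3, i;
  int *colmap  = (int *) calloc(N,sizeof(int));  /* zeroed, like PetscMemzero */

  for ( i=0; i<nlocal; i++ ) colmap[garray[i]] = i+1;  /* store local index + 1 */
  for ( i=0; i<N; i++ ) {
    printf("global col %d -> local col %d\n",i,colmap[i]-1);  /* -1 means absent */
  }
  free(colmap);
  return 0;
}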
26 
27 extern int DisAssemble_MPIAIJ(Mat);
28 
29 static int MatGetRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
30                            PetscTruth *done)
31 {
32   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
33   int        ierr;
34   if (aij->size == 1) {
35     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
36   } else SETERRQ(1,"MatGetRowIJ_MPIAIJ:not supported in parallel");
37   return 0;
38 }
39 
40 static int MatRestoreRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
41                                PetscTruth *done)
42 {
43   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
44   int        ierr;
45   if (aij->size == 1) {
46     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
47   } else SETERRQ(1,"MatRestoreRowIJ_MPIAIJ:not supported in parallel");
48   return 0;
49 }
50 
51 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
52 
53 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
54 {
55   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
56   Scalar     value;
57   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
58   int        cstart = aij->cstart, cend = aij->cend,row,col;
59   int        roworiented = aij->roworiented;
60 
61 #if defined(PETSC_BOPT_g)
62   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
63     SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
64   }
65 #endif
66   aij->insertmode = addv;
67   for ( i=0; i<m; i++ ) {
68 #if defined(PETSC_BOPT_g)
69     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
70     if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
71 #endif
72     if (im[i] >= rstart && im[i] < rend) {
73       row = im[i] - rstart;
74       for ( j=0; j<n; j++ ) {
75         if (in[j] >= cstart && in[j] < cend){
76           col = in[j] - cstart;
77           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
78           ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
79         }
80 #if defined(PETSC_BOPT_g)
81         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");}
82         else if (in[j] >= aij->N) {SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");}
83 #endif
84         else {
85           if (mat->was_assembled) {
86             if (!aij->colmap) {
87               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
88             }
89             col = aij->colmap[in[j]] - 1;
90             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
91               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
92               col =  in[j];
93             }
94           }
95           else col = in[j];
96           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
97           ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
98         }
99       }
100     }
101     else {
102       if (roworiented && !aij->donotstash) {
103         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
104       }
105       else {
106         if (!aij->donotstash) {
107           row = im[i];
108           for ( j=0; j<n; j++ ) {
109             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
110           }
111         }
112       }
113     }
114   }
115   return 0;
116 }
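
/*
   A hedged usage sketch of the routine above (ExampleSetValues is a
   hypothetical caller; an already created MPIAIJ matrix A is assumed):
   entries aimed at rows owned by other processors are stashed here and
   forwarded during assembly, so any process may set any row.
*/
static int ExampleSetValues(Mat A)
{
  int    ierr,rstart,rend,i,col;
  Scalar v = 1.0;

  ierr = MatGetOwnershipRange(A,&rstart,&rend); CHKERRQ(ierr);
  for ( i=rstart; i<rend; i++ ) {            /* locally owned rows: no stash */
    col  = i;
    ierr = MatSetValues(A,1,&i,1,&col,&v,ADD_VALUES); CHKERRQ(ierr);
  }
  if (rstart > 0) {                          /* deliberately off-processor */
    i = rstart-1; col = rstart-1;
    ierr = MatSetValues(A,1,&i,1,&col,&v,ADD_VALUES); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  return 0;
}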
117 
118 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
119 {
120   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
121   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
122   int        cstart = aij->cstart, cend = aij->cend,row,col;
123 
124   for ( i=0; i<m; i++ ) {
125     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
126     if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
127     if (idxm[i] >= rstart && idxm[i] < rend) {
128       row = idxm[i] - rstart;
129       for ( j=0; j<n; j++ ) {
130         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
131         if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
132         if (idxn[j] >= cstart && idxn[j] < cend){
133           col = idxn[j] - cstart;
134           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
135         }
136         else {
137           if (!aij->colmap) {
138             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
139           }
140           col = aij->colmap[idxn[j]] - 1;
141           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
142           else {
143             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
144           }
145         }
146       }
147     }
148     else {
149       SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
150     }
151   }
152   return 0;
153 }
154 
155 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
156 {
157   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
158   MPI_Comm    comm = mat->comm;
159   int         size = aij->size, *owners = aij->rowners;
160   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
161   MPI_Request *send_waits,*recv_waits;
162   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
163   InsertMode  addv;
164   Scalar      *rvalues,*svalues;
165 
166   /* make sure all processors are using either INSERT_VALUES or ADD_VALUES */
167   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
168   if (addv == (ADD_VALUES|INSERT_VALUES)) {
169     SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
170   }
171   aij->insertmode = addv; /* in case this processor had no cache */
172 
173   /*  first count number of contributors to each processor */
174   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
175   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
176   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
177   for ( i=0; i<aij->stash.n; i++ ) {
178     idx = aij->stash.idx[i];
179     for ( j=0; j<size; j++ ) {
180       if (idx >= owners[j] && idx < owners[j+1]) {
181         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
182       }
183     }
184   }
185   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
186 
187   /* inform other processors of number of messages and max length*/
188   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
189   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
190   nreceives = work[rank];
191   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
192   nmax = work[rank];
193   PetscFree(work);
194 
195   /* post receives:
196        1) each message will consist of ordered triples
197      (global row index, global column index, value); we store the
198      indices as Scalars to simplify the message passing.
199        2) since we don't know how long each individual message is, we
200      allocate the largest needed buffer for each receive. Potentially
201      this is a lot of wasted space.
202 
203 
204        This could be done better.
205   */
206   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
207   CHKPTRQ(rvalues);
208   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
209   CHKPTRQ(recv_waits);
210   for ( i=0; i<nreceives; i++ ) {
211     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
212               comm,recv_waits+i);
213   }
214 
215   /* do sends:
216       1) starts[i] gives the starting index in svalues for stuff going to
217          the ith processor
218   */
219   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
220   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
221   CHKPTRQ(send_waits);
222   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
223   starts[0] = 0;
224   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
225   for ( i=0; i<aij->stash.n; i++ ) {
226     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
227     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
228     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
229   }
230   PetscFree(owner);
231   starts[0] = 0;
232   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
233   count = 0;
234   for ( i=0; i<size; i++ ) {
235     if (procs[i]) {
236       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
237                 comm,send_waits+count++);
238     }
239   }
240   PetscFree(starts); PetscFree(nprocs);
241 
242   /* Free cache space */
243   PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",rank,aij->stash.n);
244   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
245 
246   aij->svalues    = svalues;    aij->rvalues    = rvalues;
247   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
248   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
249   aij->rmax       = nmax;
250 
251   return 0;
252 }
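
/*
   A standalone sketch (illustrative only; Scalar_ stands in for Scalar)
   of the message layout used above: each stashed entry travels as the
   triple (row, col, value), all packed into one Scalar buffer so that a
   single MPIU_SCALAR receive suffices.
*/
#include <stdio.h>

typedef double Scalar_;

static void PackTriple(Scalar_ *buf,int k,int row,int col,Scalar_ val)
{
  buf[3*k]   = (Scalar_) row;   /* index stored as a floating-point value */
  buf[3*k+1] = (Scalar_) col;
  buf[3*k+2] = val;
}

int main(void)
{
  Scalar_ buf[6];
  int     k;
  PackTriple(buf,0,4,7,3.5);
  PackTriple(buf,1,5,2,-1.0);
  for ( k=0; k<2; k++ ) {       /* unpack exactly as MatAssemblyEnd does */
    printf("row %d col %d val %g\n",(int)buf[3*k],(int)buf[3*k+1],(double)buf[3*k+2]);
  }
  return 0;
}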
253 extern int MatSetUpMultiply_MPIAIJ(Mat);
254 
255 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
256 {
257   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
258   MPI_Status  *send_status,recv_status;
259   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
260   int         row,col,other_disassembled;
261   Scalar      *values,val;
262   InsertMode  addv = aij->insertmode;
263 
264   /*  wait on receives */
265   while (count) {
266     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
267     /* unpack receives into our local space */
268     values = aij->rvalues + 3*imdex*aij->rmax;
269     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
270     n = n/3;
271     for ( i=0; i<n; i++ ) {
272       row = (int) PetscReal(values[3*i]) - aij->rstart;
273       col = (int) PetscReal(values[3*i+1]);
274       val = values[3*i+2];
275       if (col >= aij->cstart && col < aij->cend) {
276         col -= aij->cstart;
277         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
278       }
279       else {
280         if (mat->was_assembled || mat->assembled) {
281           if (!aij->colmap) {
282             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
283           }
284           col = aij->colmap[col] - 1;
285           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
286             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
287             col = (int) PetscReal(values[3*i+1]);
288           }
289         }
290         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
291       }
292     }
293     count--;
294   }
295   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
296 
297   /* wait on sends */
298   if (aij->nsends) {
299     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
300     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
301     PetscFree(send_status);
302   }
303   PetscFree(aij->send_waits); PetscFree(aij->svalues);
304 
305   aij->insertmode = NOT_SET_VALUES;
306   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
307   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
308 
309   /* determine if any processor has disassembled; if so, we must
310      also disassemble ourselves so that we may reassemble. */
311   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
312   if (mat->was_assembled && !other_disassembled) {
313     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
314   }
315 
316   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
317     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
318   }
319   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
320   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
321 
322   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
323   return 0;
324 }
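
/*
   A standalone sketch (illustrative only; plain MPI assumed) of the
   MPI_PROD reduction used above: each process contributes a 0/1 flag and
   the product is 1 only if every flag is 1, so a single 0 anywhere forces
   all processes to agree to disassemble.
*/
#include <mpi.h>
#include <stdio.h>

int main(int argc,char **argv)
{
  int flag,all,rank;
  MPI_Init(&argc,&argv);
  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
  flag = (rank == 1) ? 0 : 1;            /* pretend rank 1 disassembled */
  MPI_Allreduce(&flag,&all,1,MPI_INT,MPI_PROD,MPI_COMM_WORLD);
  if (!all) printf("[%d] someone disassembled; disassemble too\n",rank);
  MPI_Finalize();
  return 0;
}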
325 
326 static int MatZeroEntries_MPIAIJ(Mat A)
327 {
328   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
329   int        ierr;
330   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
331   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
332   return 0;
333 }
334 
335 /* the code does not do the diagonal entries correctly unless the
336    matrix is square and the column and row ownerships are identical.
337    This is a BUG. The only way to fix it seems to be to access
338    aij->A and aij->B directly and not through the MatZeroRows()
339    routine.
340 */
341 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
342 {
343   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
344   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
345   int            *procs,*nprocs,j,found,idx,nsends,*work;
346   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
347   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
348   int            *lens,imdex,*lrows,*values;
349   MPI_Comm       comm = A->comm;
350   MPI_Request    *send_waits,*recv_waits;
351   MPI_Status     recv_status,*send_status;
352   IS             istmp;
353 
354   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
355   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
356 
357   /*  first count number of contributors to each processor */
358   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
359   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
360   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
361   for ( i=0; i<N; i++ ) {
362     idx = rows[i];
363     found = 0;
364     for ( j=0; j<size; j++ ) {
365       if (idx >= owners[j] && idx < owners[j+1]) {
366         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
367       }
368     }
369     if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
370   }
371   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
372 
373   /* inform other processors of number of messages and max length*/
374   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
375   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
376   nrecvs = work[rank];
377   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
378   nmax = work[rank];
379   PetscFree(work);
380 
381   /* post receives:   */
382   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
383   CHKPTRQ(rvalues);
384   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
385   CHKPTRQ(recv_waits);
386   for ( i=0; i<nrecvs; i++ ) {
387     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
388   }
389 
390   /* do sends:
391       1) starts[i] gives the starting index in svalues for stuff going to
392          the ith processor
393   */
394   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
395   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
396   CHKPTRQ(send_waits);
397   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
398   starts[0] = 0;
399   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
400   for ( i=0; i<N; i++ ) {
401     svalues[starts[owner[i]]++] = rows[i];
402   }
403   ISRestoreIndices(is,&rows);
404 
405   starts[0] = 0;
406   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
407   count = 0;
408   for ( i=0; i<size; i++ ) {
409     if (procs[i]) {
410       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
411     }
412   }
413   PetscFree(starts);
414 
415   base = owners[rank];
416 
417   /*  wait on receives */
418   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
419   source = lens + nrecvs;
420   count  = nrecvs; slen = 0;
421   while (count) {
422     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
423     /* unpack receives into our local space */
424     MPI_Get_count(&recv_status,MPI_INT,&n);
425     source[imdex]  = recv_status.MPI_SOURCE;
426     lens[imdex]  = n;
427     slen += n;
428     count--;
429   }
430   PetscFree(recv_waits);
431 
432   /* move the data into the send scatter */
433   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
434   count = 0;
435   for ( i=0; i<nrecvs; i++ ) {
436     values = rvalues + i*nmax;
437     for ( j=0; j<lens[i]; j++ ) {
438       lrows[count++] = values[j] - base;
439     }
440   }
441   PetscFree(rvalues); PetscFree(lens);
442   PetscFree(owner); PetscFree(nprocs);
443 
444   /* actually zap the local rows */
445   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
446   PLogObjectParent(A,istmp);
447   PetscFree(lrows);
448   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
449   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
450   ierr = ISDestroy(istmp); CHKERRQ(ierr);
451 
452   /* wait on sends */
453   if (nsends) {
454     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
455     CHKPTRQ(send_status);
456     MPI_Waitall(nsends,send_waits,send_status);
457     PetscFree(send_status);
458   }
459   PetscFree(send_waits); PetscFree(svalues);
460 
461   return 0;
462 }
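
/*
   A hedged usage sketch for the routine above (ExampleZeroRows is a
   hypothetical caller): zero two global rows, placing `one` on the
   diagonal of each (pass a null Scalar pointer to leave the diagonal
   zero). The rows need not be locally owned; they are routed as above.
*/
static int ExampleZeroRows(Mat A)
{
  int    ierr,rows[2] = {0,3};
  Scalar one = 1.0;
  IS     is;

  ierr = ISCreateGeneral(MPI_COMM_SELF,2,rows,&is); CHKERRQ(ierr);
  ierr = MatZeroRows(A,is,&one); CHKERRQ(ierr);
  ierr = ISDestroy(is); CHKERRQ(ierr);
  return 0;
}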
463 
464 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
465 {
466   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
467   int        ierr,nt;
468 
469   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
470   if (nt != a->n) {
471     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
472   }
473   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
474   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
475   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
476   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
477   return 0;
478 }
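
/*
   A standalone sketch (illustrative only) of the split multiply performed
   above: y = A_d*x_local + B_o*x_ghost, where A_d and B_o are 0-based CSR
   arrays and x_ghost holds the values gathered by the VecScatter.
*/
static void SplitMultiply(int m,int *ai,int *aj,double *aa,
                          int *bi,int *bj,double *ba,
                          double *x,double *xg,double *y)
{
  int    i,k;
  double sum;
  for ( i=0; i<m; i++ ) {
    sum = 0.0;
    for ( k=ai[i]; k<ai[i+1]; k++ ) sum += aa[k]*x[aj[k]];   /* diagonal block */
    for ( k=bi[i]; k<bi[i+1]; k++ ) sum += ba[k]*xg[bj[k]];  /* off-diag block */
    y[i] = sum;
  }
}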
479 
480 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
481 {
482   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
483   int        ierr;
484   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
485   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
486   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
487   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
488   return 0;
489 }
490 
491 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
492 {
493   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
494   int        ierr;
495 
496   /* do nondiagonal part */
497   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
498   /* send it on its way */
499   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
500   /* do local part */
501   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
502   /* receive remote parts: note this assumes the values are not actually */
503   /* inserted in yy until the next line, which holds for this            */
504   /* implementation but may not hold in general. */
505   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
506   return 0;
507 }
508 
509 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
510 {
511   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
512   int        ierr;
513 
514   /* do nondiagonal part */
515   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
516   /* send it on its way */
517   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
518   /* do local part */
519   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
520   /* receive remote parts: note this assumes the values are not actually */
521   /* inserted in zz until the next line, which holds for this            */
522   /* implementation but may not hold in general. */
523   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
524   return 0;
525 }
526 
527 /*
528   This only works correctly for square matrices where the subblock A->A is the
529    diagonal block
530 */
531 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
532 {
533   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
534   if (a->M != a->N)
535     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
536   if (a->rstart != a->cstart || a->rend != a->cend) {
537     SETERRQ(1,"MatGetDiagonal_MPIAIJ:row partition must equal col partition");  }
538   return MatGetDiagonal(a->A,v);
539 }
540 
541 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
542 {
543   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
544   int        ierr;
545   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
546   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
547   return 0;
548 }
549 
550 static int MatDestroy_MPIAIJ(PetscObject obj)
551 {
552   Mat        mat = (Mat) obj;
553   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
554   int        ierr;
555 #if defined(PETSC_LOG)
556   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
557 #endif
558   PetscFree(aij->rowners);
559   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
560   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
561   if (aij->colmap) PetscFree(aij->colmap);
562   if (aij->garray) PetscFree(aij->garray);
563   if (aij->lvec)   VecDestroy(aij->lvec);
564   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
565   if (aij->rowvalues) PetscFree(aij->rowvalues);
566   PetscFree(aij);
567   if (mat->mapping) {
568     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
569   }
570   PLogObjectDestroy(mat);
571   PetscHeaderDestroy(mat);
572   return 0;
573 }
574 #include "draw.h"
575 #include "pinclude/pviewer.h"
576 
577 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
578 {
579   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
580   int         ierr;
581 
582   if (aij->size == 1) {
583     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
584   }
585   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
586   return 0;
587 }
588 
589 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
590 {
591   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
592   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
593   int         ierr, format,shift = C->indexshift,rank;
594   FILE        *fd;
595   ViewerType  vtype;
596 
597   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
598   if (vtype == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
599     ierr = ViewerGetFormat(viewer,&format);
600     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
601       MatInfo info;
602       int     flg;
603       MPI_Comm_rank(mat->comm,&rank);
604       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
605       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
606       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
607       PetscSequentialPhaseBegin(mat->comm,1);
608       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
609          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
610       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
611          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
612       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
613       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
614       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
615       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
616       fflush(fd);
617       PetscSequentialPhaseEnd(mat->comm,1);
618       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
619       return 0;
620     }
621     else if (format == VIEWER_FORMAT_ASCII_INFO) {
622       return 0;
623     }
624   }
625 
626   if (vtype == DRAW_VIEWER) {
627     Draw       draw;
628     PetscTruth isnull;
629     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
630     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
631   }
632 
633   if (vtype == ASCII_FILE_VIEWER) {
634     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
635     PetscSequentialPhaseBegin(mat->comm,1);
636     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
637            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
638            aij->cend);
639     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
640     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
641     fflush(fd);
642     PetscSequentialPhaseEnd(mat->comm,1);
643   }
644   else {
645     int size = aij->size;
646     rank = aij->rank;
647     if (size == 1) {
648       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
649     }
650     else {
651       /* assemble the entire matrix onto first processor. */
652       Mat         A;
653       Mat_SeqAIJ *Aloc;
654       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
655       Scalar      *a;
656 
657       if (!rank) {
658         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
659                CHKERRQ(ierr);
660       }
661       else {
662         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
663                CHKERRQ(ierr);
664       }
665       PLogObjectParent(mat,A);
666 
667       /* copy over the A part */
668       Aloc = (Mat_SeqAIJ*) aij->A->data;
669       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
670       row = aij->rstart;
671       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
672       for ( i=0; i<m; i++ ) {
673         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
674         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
675       }
676       aj = Aloc->j;
677       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
678 
679       /* copy over the B part */
680       Aloc = (Mat_SeqAIJ*) aij->B->data;
681       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
682       row = aij->rstart;
683       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
684       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
685       for ( i=0; i<m; i++ ) {
686         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
687         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
688       }
689       PetscFree(ct);
690       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
691       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
692       if (!rank) {
693         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
694       }
695       ierr = MatDestroy(A); CHKERRQ(ierr);
696     }
697   }
698   return 0;
699 }
700 
701 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
702 {
703   Mat         mat = (Mat) obj;
704   int         ierr;
705   ViewerType  vtype;
706 
707   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
708   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
709       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
710     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
711   }
712   else if (vtype == BINARY_FILE_VIEWER) {
713     return MatView_MPIAIJ_Binary(mat,viewer);
714   }
715   return 0;
716 }
717 
718 /*
719     This has to provide several versions.
720 
721      2) a) use only local smoothing, updating outer values only once.
722         b) local smoothing, updating outer values each inner iteration.
723      3) coloring, updating outer values between colors.
724 */
725 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
726                            double fshift,int its,Vec xx)
727 {
728   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
729   Mat        AA = mat->A, BB = mat->B;
730   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
731   Scalar     *b,*x,*xs,*ls,d,*v,sum;
732   int        ierr,*idx, *diag;
733   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
734 
735   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
736   xs = x + shift; /* shift by one for index start of 1 */
737   ls = ls + shift;
738   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
739   diag = A->diag;
740   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
741     if (flag & SOR_ZERO_INITIAL_GUESS) {
742       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
743     }
744     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
745     CHKERRQ(ierr);
746     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
747     CHKERRQ(ierr);
748     while (its--) {
749       /* go down through the rows */
750       for ( i=0; i<m; i++ ) {
751         n    = A->i[i+1] - A->i[i];
752         idx  = A->j + A->i[i] + shift;
753         v    = A->a + A->i[i] + shift;
754         sum  = b[i];
755         SPARSEDENSEMDOT(sum,xs,v,idx,n);
756         d    = fshift + A->a[diag[i]+shift];
757         n    = B->i[i+1] - B->i[i];
758         idx  = B->j + B->i[i] + shift;
759         v    = B->a + B->i[i] + shift;
760         SPARSEDENSEMDOT(sum,ls,v,idx,n);
761         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
762       }
763       /* come up through the rows */
764       for ( i=m-1; i>-1; i-- ) {
765         n    = A->i[i+1] - A->i[i];
766         idx  = A->j + A->i[i] + shift;
767         v    = A->a + A->i[i] + shift;
768         sum  = b[i];
769         SPARSEDENSEMDOT(sum,xs,v,idx,n);
770         d    = fshift + A->a[diag[i]+shift];
771         n    = B->i[i+1] - B->i[i];
772         idx  = B->j + B->i[i] + shift;
773         v    = B->a + B->i[i] + shift;
774         SPARSEDENSEMDOT(sum,ls,v,idx,n);
775         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
776       }
777     }
778   }
779   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
780     if (flag & SOR_ZERO_INITIAL_GUESS) {
781       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
782     }
783     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
784     CHKERRQ(ierr);
785     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);
786     CHKERRQ(ierr);
787     while (its--) {
788       for ( i=0; i<m; i++ ) {
789         n    = A->i[i+1] - A->i[i];
790         idx  = A->j + A->i[i] + shift;
791         v    = A->a + A->i[i] + shift;
792         sum  = b[i];
793         SPARSEDENSEMDOT(sum,xs,v,idx,n);
794         d    = fshift + A->a[diag[i]+shift];
795         n    = B->i[i+1] - B->i[i];
796         idx  = B->j + B->i[i] + shift;
797         v    = B->a + B->i[i] + shift;
798         SPARSEDENSEMDOT(sum,ls,v,idx,n);
799         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
800       }
801     }
802   }
803   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
804     if (flag & SOR_ZERO_INITIAL_GUESS) {
805       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
806     }
807     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
808                             mat->Mvctx); CHKERRQ(ierr);
809     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
810                             mat->Mvctx); CHKERRQ(ierr);
811     while (its--) {
812       for ( i=m-1; i>-1; i-- ) {
813         n    = A->i[i+1] - A->i[i];
814         idx  = A->j + A->i[i] + shift;
815         v    = A->a + A->i[i] + shift;
816         sum  = b[i];
817         SPARSEDENSEMDOT(sum,xs,v,idx,n);
818         d    = fshift + A->a[diag[i]+shift];
819         n    = B->i[i+1] - B->i[i];
820         idx  = B->j + B->i[i] + shift;
821         v    = B->a + B->i[i] + shift;
822         SPARSEDENSEMDOT(sum,ls,v,idx,n);
823         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
824       }
825     }
826   }
827   else {
828     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
829   }
830   return 0;
831 }
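
/*
   A standalone sketch (illustrative only) of one forward local SOR sweep
   of the kind coded above, on a 0-based CSR matrix with the diagonal entry
   located by diag[i]:
     x[i] = (1-omega)*x[i] + omega*(b[i] - sum_{j!=i} a_ij*x[j])/a_ii
   The sum starts at b[i], the full row dot product is subtracted, and the
   diagonal term is added back, exactly as in the loops above.
*/
static void ForwardSOR(int m,int *ai,int *aj,double *aa,int *diag,
                       double *b,double omega,double *x)
{
  int    i,k;
  double sum;
  for ( i=0; i<m; i++ ) {
    sum = b[i];
    for ( k=ai[i]; k<ai[i+1]; k++ ) sum -= aa[k]*x[aj[k]];  /* b_i - row dot */
    sum += aa[diag[i]]*x[i];                                /* add back diag */
    x[i] = (1.0 - omega)*x[i] + omega*sum/aa[diag[i]];
  }
}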
832 
833 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
834 {
835   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
836   Mat        A = mat->A, B = mat->B;
837   int        ierr;
838   double     isend[5], irecv[5];
839 
840   info->rows_global    = (double)mat->M;
841   info->columns_global = (double)mat->N;
842   info->rows_local     = (double)mat->m;
843   info->columns_local  = (double)mat->N;
844   info->block_size     = 1.0;
845   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
846   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
847   isend[3] = info->memory;  isend[4] = info->mallocs;
848   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
849   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
850   isend[3] += info->memory;  isend[4] += info->mallocs;
851   if (flag == MAT_LOCAL) {
852     info->nz_used      = isend[0];
853     info->nz_allocated = isend[1];
854     info->nz_unneeded  = isend[2];
855     info->memory       = isend[3];
856     info->mallocs      = isend[4];
857   } else if (flag == MAT_GLOBAL_MAX) {
858     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
859     info->nz_used      = irecv[0];
860     info->nz_allocated = irecv[1];
861     info->nz_unneeded  = irecv[2];
862     info->memory       = irecv[3];
863     info->mallocs      = irecv[4];
864   } else if (flag == MAT_GLOBAL_SUM) {
865     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
866     info->nz_used      = irecv[0];
867     info->nz_allocated = irecv[1];
868     info->nz_unneeded  = irecv[2];
869     info->memory       = irecv[3];
870     info->mallocs      = irecv[4];
871   }
872   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
873   info->fill_ratio_needed = 0;
874   info->factor_mallocs    = 0;
875 
876   return 0;
877 }
878 
879 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
880 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
881 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
882 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
883 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
884 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
885 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
886 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
887 
888 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
889 {
890   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
891 
892   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
893       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
894       op == MAT_COLUMNS_UNSORTED ||
895       op == MAT_COLUMNS_SORTED) {
896         MatSetOption(a->A,op);
897         MatSetOption(a->B,op);
898   } else if (op == MAT_ROW_ORIENTED) {
899     a->roworiented = 1;
900     MatSetOption(a->A,op);
901     MatSetOption(a->B,op);
902   } else if (op == MAT_ROWS_SORTED ||
903              op == MAT_ROWS_UNSORTED ||
904              op == MAT_SYMMETRIC ||
905              op == MAT_STRUCTURALLY_SYMMETRIC ||
906              op == MAT_YES_NEW_DIAGONALS)
907     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
908   else if (op == MAT_COLUMN_ORIENTED) {
909     a->roworiented = 0;
910     MatSetOption(a->A,op);
911     MatSetOption(a->B,op);
912   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
913     a->donotstash = 1;
914   } else if (op == MAT_NO_NEW_DIAGONALS)
915     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
916   else
917     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
918   return 0;
919 }
920 
921 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
922 {
923   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
924   *m = mat->M; *n = mat->N;
925   return 0;
926 }
927 
928 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
929 {
930   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
931   *m = mat->m; *n = mat->N;
932   return 0;
933 }
934 
935 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
936 {
937   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
938   *m = mat->rstart; *n = mat->rend;
939   return 0;
940 }
941 
942 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
943 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
944 
945 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
946 {
947   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
948   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
949   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
950   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
951   int        *cmap, *idx_p;
952 
953   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
954   mat->getrowactive = PETSC_TRUE;
955 
956   if (!mat->rowvalues && (idx || v)) {
957     /*
958         allocate enough space to hold information from the longest row.
959     */
960     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
961     int     max = 1,m = mat->m,tmp;
962     for ( i=0; i<m; i++ ) {
963       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
964       if (max < tmp) { max = tmp; }
965     }
966     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
967     CHKPTRQ(mat->rowvalues);
968     mat->rowindices = (int *) (mat->rowvalues + max);
969   }
970 
971   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows");
972   lrow = row - rstart;
973 
974   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
975   if (!v)   {pvA = 0; pvB = 0;}
976   if (!idx) {pcA = 0; if (!v) pcB = 0;}
977   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
978   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
979   nztot = nzA + nzB;
980 
981   cmap  = mat->garray;
982   if (v  || idx) {
983     if (nztot) {
984       /* Sort by increasing column numbers, assuming A and B already sorted */
985       int imark = -1;
986       if (v) {
987         *v = v_p = mat->rowvalues;
988         for ( i=0; i<nzB; i++ ) {
989           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
990           else break;
991         }
992         imark = i;
993         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
994         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
995       }
996       if (idx) {
997         *idx = idx_p = mat->rowindices;
998         if (imark > -1) {
999           for ( i=0; i<imark; i++ ) {
1000             idx_p[i] = cmap[cworkB[i]];
1001           }
1002         } else {
1003           for ( i=0; i<nzB; i++ ) {
1004             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1005             else break;
1006           }
1007           imark = i;
1008         }
1009         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1010         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1011       }
1012     }
1013     else {
1014       if (idx) *idx = 0;
1015       if (v)   *v   = 0;
1016     }
1017   }
1018   *nz = nztot;
1019   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1020   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1021   return 0;
1022 }
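
/*
   A hedged usage sketch for the pair above (ExampleRowScan is a
   hypothetical caller): only locally owned rows may be queried, and each
   MatGetRow must be matched by MatRestoreRow before the next call, since
   one shared work array (rowvalues) holds the merged row.
*/
static int ExampleRowScan(Mat A)
{
  int    ierr,rstart,rend,row,nz,*cols;
  Scalar *vals;

  ierr = MatGetOwnershipRange(A,&rstart,&rend); CHKERRQ(ierr);
  for ( row=rstart; row<rend; row++ ) {
    ierr = MatGetRow(A,row,&nz,&cols,&vals); CHKERRQ(ierr);
    /* ... inspect nz, cols[], vals[] here ... */
    ierr = MatRestoreRow(A,row,&nz,&cols,&vals); CHKERRQ(ierr);
  }
  return 0;
}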
1023 
1024 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1025 {
1026   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1027   if (aij->getrowactive == PETSC_FALSE) {
1028     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
1029   }
1030   aij->getrowactive = PETSC_FALSE;
1031   return 0;
1032 }
1033 
1034 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1035 {
1036   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1037   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1038   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1039   double     sum = 0.0;
1040   Scalar     *v;
1041 
1042   if (aij->size == 1) {
1043     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1044   } else {
1045     if (type == NORM_FROBENIUS) {
1046       v = amat->a;
1047       for (i=0; i<amat->nz; i++ ) {
1048 #if defined(PETSC_COMPLEX)
1049         sum += real(conj(*v)*(*v)); v++;
1050 #else
1051         sum += (*v)*(*v); v++;
1052 #endif
1053       }
1054       v = bmat->a;
1055       for (i=0; i<bmat->nz; i++ ) {
1056 #if defined(PETSC_COMPLEX)
1057         sum += real(conj(*v)*(*v)); v++;
1058 #else
1059         sum += (*v)*(*v); v++;
1060 #endif
1061       }
1062       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1063       *norm = sqrt(*norm);
1064     }
1065     else if (type == NORM_1) { /* max column norm */
1066       double *tmp, *tmp2;
1067       int    *jj, *garray = aij->garray;
1068       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1069       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1070       PetscMemzero(tmp,aij->N*sizeof(double));
1071       *norm = 0.0;
1072       v = amat->a; jj = amat->j;
1073       for ( j=0; j<amat->nz; j++ ) {
1074         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1075       }
1076       v = bmat->a; jj = bmat->j;
1077       for ( j=0; j<bmat->nz; j++ ) {
1078         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1079       }
1080       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1081       for ( j=0; j<aij->N; j++ ) {
1082         if (tmp2[j] > *norm) *norm = tmp2[j];
1083       }
1084       PetscFree(tmp); PetscFree(tmp2);
1085     }
1086     else if (type == NORM_INFINITY) { /* max row norm */
1087       double ntemp = 0.0;
1088       for ( j=0; j<amat->m; j++ ) {
1089         v = amat->a + amat->i[j] + shift;
1090         sum = 0.0;
1091         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1092           sum += PetscAbsScalar(*v); v++;
1093         }
1094         v = bmat->a + bmat->i[j] + shift;
1095         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1096           sum += PetscAbsScalar(*v); v++;
1097         }
1098         if (sum > ntemp) ntemp = sum;
1099       }
1100       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1101     }
1102     else {
1103       SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
1104     }
1105   }
1106   return 0;
1107 }
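
/*
   A standalone sketch (illustrative only; plain MPI assumed) of the
   Frobenius-norm reduction used above: each process sums the squares of
   its local nonzeros, one Allreduce forms the global sum, and the square
   root is taken only after the reduction.
*/
#include <math.h>
#include <mpi.h>

static double GlobalFrobenius(double *a,int nzlocal,MPI_Comm comm)
{
  double sum = 0.0,gsum;
  int    i;
  for ( i=0; i<nzlocal; i++ ) sum += a[i]*a[i];
  MPI_Allreduce(&sum,&gsum,1,MPI_DOUBLE,MPI_SUM,comm);
  return sqrt(gsum);
}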
1108 
1109 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1110 {
1111   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1112   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1113   int        ierr,shift = Aloc->indexshift;
1114   Mat        B;
1115   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1116   Scalar     *array;
1117 
1118   if (matout == PETSC_NULL && M != N)
1119     SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
1120   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1121          PETSC_NULL,&B); CHKERRQ(ierr);
1122 
1123   /* copy over the A part */
1124   Aloc = (Mat_SeqAIJ*) a->A->data;
1125   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1126   row = a->rstart;
1127   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1128   for ( i=0; i<m; i++ ) {
1129     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1130     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1131   }
1132   aj = Aloc->j;
1133   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1134 
1135   /* copy over the B part */
1136   Aloc = (Mat_SeqAIJ*) a->B->data;
1137   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1138   row = a->rstart;
1139   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1140   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1141   for ( i=0; i<m; i++ ) {
1142     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1143     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1144   }
1145   PetscFree(ct);
1146   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1147   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1148   if (matout != PETSC_NULL) {
1149     *matout = B;
1150   } else {
1151     /* This isn't really an in-place transpose, but we free the data structures from a */
1152     PetscFree(a->rowners);
1153     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1154     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1155     if (a->colmap) PetscFree(a->colmap);
1156     if (a->garray) PetscFree(a->garray);
1157     if (a->lvec) VecDestroy(a->lvec);
1158     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1159     PetscFree(a);
1160     PetscMemcpy(A,B,sizeof(struct _Mat));
1161     PetscHeaderDestroy(B);
1162   }
1163   return 0;
1164 }
1165 
1166 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1167 {
1168   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1169   Mat a = aij->A, b = aij->B;
1170   int ierr,s1,s2,s3;
1171 
1172   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1173   if (rr) {
1174     s3 = aij->n;
1175     VecGetLocalSize_Fast(rr,s1);
1176     if (s1!=s3) SETERRQ(1,"MatDiagonalScale: right vector non-conforming local size");
1177     /* Overlap communication with computation. */
1178     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1179   }
1180   if (ll) {
1181     VecGetLocalSize_Fast(ll,s1);
1182     if (s1!=s2) SETERRQ(1,"MatDiagonalScale: left vector non-conforming local size");
1183     ierr = (*b->ops.diagonalscale)(b,ll,0); CHKERRQ(ierr);
1184   }
1185   /* scale  the diagonal block */
1186   ierr = (*a->ops.diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1187 
1188   if (rr) {
1189     /* Do a scatter end and then right scale the off-diagonal block */
1190     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1191     ierr = (*b->ops.diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1192   }
1193 
1194   return 0;
1195 }
1196 
1197 
1198 extern int MatPrintHelp_SeqAIJ(Mat);
1199 static int MatPrintHelp_MPIAIJ(Mat A)
1200 {
1201   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1202 
1203   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1204   else return 0;
1205 }
1206 
1207 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1208 {
1209   *bs = 1;
1210   return 0;
1211 }
1212 
1213 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1214 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1215 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1216 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1217 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1218 /* -------------------------------------------------------------------*/
1219 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1220        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1221        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1222        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1223        0,0,
1224        0,0,
1225        0,0,
1226        MatRelax_MPIAIJ,
1227        MatTranspose_MPIAIJ,
1228        MatGetInfo_MPIAIJ,0,
1229        MatGetDiagonal_MPIAIJ,MatDiagonalScale_MPIAIJ,MatNorm_MPIAIJ,
1230        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1231        0,
1232        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1233        0,0,0,0,
1234        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1235        0,0,
1236        0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
1237        0,0,0,
1238        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1239        MatPrintHelp_MPIAIJ,
1240        MatScale_MPIAIJ,0,0,0,
1241        MatGetBlockSize_MPIAIJ,0,0,0,0,
1242        MatFDColoringCreate_MPIAIJ};
1243 
1244 
1245 /*@C
1246    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1247    (the default parallel PETSc format).  For good matrix assembly performance
1248    the user should preallocate the matrix storage by setting the parameters
1249    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1250    performance can be increased by more than a factor of 50.
1251 
1252    Input Parameters:
1253 .  comm - MPI communicator
1254 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1255            This value should be the same as the local size used in creating the
1256            y vector for the matrix-vector product y = Ax.
1257 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1258            This value should be the same as the local size used in creating the
1259            x vector for the matrix-vector product y = Ax.
1260 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1261 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1262 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1263            (same for all local rows)
1264 .  d_nnz - array containing the number of nonzeros in the various rows of the
1265            diagonal portion of the local submatrix (possibly different for each row)
1266            or PETSC_NULL. You must leave room for the diagonal entry even if
1267            it is zero.
1268 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1269            submatrix (same for all local rows).
1270 .  o_nnz - array containing the number of nonzeros in the various rows of the
1271            off-diagonal portion of the local submatrix (possibly different for
1272            each row) or PETSC_NULL.
1273 
1274    Output Parameter:
1275 .  A - the matrix
1276 
1277    Notes:
1278    The AIJ format (also called the Yale sparse matrix format or
1279    compressed row storage), is fully compatible with standard Fortran 77
1280    storage.  That is, the stored row and column indices can begin at
1281    either one (as in Fortran) or zero.  See the users manual for details.
1282 
1283    The user MUST specify either the local or global matrix dimensions
1284    (possibly both).
1285 
1286    By default, this format uses inodes (identical nodes) when possible.
1287    We search for consecutive rows with the same nonzero structure, thereby
1288    reusing matrix information to achieve increased efficiency.
1289 
1290    Options Database Keys:
1291 $    -mat_aij_no_inode  - Do not use inodes
1292 $    -mat_aij_inode_limit <limit> - Set inode limit.
1293 $        (max limit=5)
1294 $    -mat_aij_oneindex - Internally use indexing starting at 1
1295 $        rather than 0.  Note: When calling MatSetValues(),
1296 $        the user still MUST index entries starting at 0!
1297 
1298    Storage Information:
1299    For a square global matrix we define each processor's diagonal portion
1300    to be its local rows and the corresponding columns (a square submatrix);
1301    each processor's off-diagonal portion encompasses the remainder of the
1302    local matrix (a rectangular submatrix).
1303 
1304    The user can specify preallocated storage for the diagonal part of
1305    the local submatrix with either d_nz or d_nnz (not both).  Set
1306    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1307    memory allocation.  Likewise, specify preallocated storage for the
1308    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1309 
1310    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1311    the figure below we depict these three local rows and all columns (0-11).
1312 
1313 $          0 1 2 3 4 5 6 7 8 9 10 11
1314 $         -------------------
1315 $  row 3  |  o o o d d d o o o o o o
1316 $  row 4  |  o o o d d d o o o o o o
1317 $  row 5  |  o o o d d d o o o o o o
1318 $         -------------------
1319 $
1320 
1321    Thus, any entries in the d locations are stored in the d (diagonal)
1322    submatrix, and any entries in the o locations are stored in the
1323    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1324    stored simply in the MATSEQAIJ format for compressed row storage.
1325 
1326    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1327    and o_nz should indicate the number of nonzeros per row in the o matrix.
1328    In general, for PDE problems in which most nonzeros are near the diagonal,
1329    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1330    or you will get TERRIBLE performance; see the users' manual chapter on
1331    matrices.
1332 
1333 .keywords: matrix, aij, compressed row, sparse, parallel
1334 
1335 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1336 @*/
1337 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1338                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1339 {
1340   Mat          B;
1341   Mat_MPIAIJ   *b;
1342   int          ierr, i,sum[2],work[2],size;
1343 
1344   *A = 0;
1345   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1346   PLogObjectCreate(B);
1347   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1348   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1349   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1350   MPI_Comm_size(comm,&size);
1351   if (size == 1) {
1352     B->ops.getrowij          = MatGetRowIJ_MPIAIJ;
1353     B->ops.restorerowij      = MatRestoreRowIJ_MPIAIJ;
1354     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIAIJ;
1355     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIAIJ;
1356     B->ops.lufactor          = MatLUFactor_MPIAIJ;
1357     B->ops.solve             = MatSolve_MPIAIJ;
1358     B->ops.solveadd          = MatSolveAdd_MPIAIJ;
1359     B->ops.solvetrans        = MatSolveTrans_MPIAIJ;
1360     B->ops.solvetransadd     = MatSolveTransAdd_MPIAIJ;
1361     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIAIJ;
1362   }
1363   B->destroy    = MatDestroy_MPIAIJ;
1364   B->view       = MatView_MPIAIJ;
1365   B->factor     = 0;
1366   B->assembled  = PETSC_FALSE;
1367   B->mapping    = 0;
1368 
1369   b->insertmode = NOT_SET_VALUES;
1370   MPI_Comm_rank(comm,&b->rank);
1371   MPI_Comm_size(comm,&b->size);
1372 
1373   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1374     SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1375 
1376   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1377     work[0] = m; work[1] = n;
1378     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1379     if (M == PETSC_DECIDE) M = sum[0];
1380     if (N == PETSC_DECIDE) N = sum[1];
1381   }
1382   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1383   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1384   b->m = m; B->m = m;
1385   b->n = n; B->n = n;
1386   b->N = N; B->N = N;
1387   b->M = M; B->M = M;
1388 
1389   /* build local table of row and column ownerships */
1390   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1391   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1392   b->cowners = b->rowners + b->size + 2;
1393   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1394   b->rowners[0] = 0;
1395   for ( i=2; i<=b->size; i++ ) {
1396     b->rowners[i] += b->rowners[i-1];
1397   }
1398   b->rstart = b->rowners[b->rank];
1399   b->rend   = b->rowners[b->rank+1];
1400   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1401   b->cowners[0] = 0;
1402   for ( i=2; i<=b->size; i++ ) {
1403     b->cowners[i] += b->cowners[i-1];
1404   }
1405   b->cstart = b->cowners[b->rank];
1406   b->cend   = b->cowners[b->rank+1];
1407 
1408   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1409   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1410   PLogObjectParent(B,b->A);
1411   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1412   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1413   PLogObjectParent(B,b->B);
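  /* b->B is created with the full global column count N; at assembly time
     its columns are compacted to just those actually used, with garray[]
     recording the resulting local-to-global column mapping */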
1414 
  /* build cache for off-processor entries generated during MatSetValues() */
1416   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1417   b->donotstash  = 0;
1418   b->colmap      = 0;
1419   b->garray      = 0;
1420   b->roworiented = 1;
1421 
1422   /* stuff used for matrix vector multiply */
1423   b->lvec      = 0;
1424   b->Mvctx     = 0;
1425 
1426   /* stuff for MatGetRow() */
1427   b->rowindices   = 0;
1428   b->rowvalues    = 0;
1429   b->getrowactive = PETSC_FALSE;
1430 
1431   *A = B;
1432   return 0;
1433 }
1434 
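/* Duplicates an assembled MPIAIJ matrix: the parallel layout, ownership
   tables, communication context, and both local blocks are copied.  (Note
   that cpvalues is not consulted below; the MATSAME conversions always
   copy the numerical values.) */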
1435 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1436 {
1437   Mat        mat;
1438   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1439   int        ierr, len=0, flg;
1440 
1441   *newmat       = 0;
1442   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1443   PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
  PetscMemzero(a,sizeof(Mat_MPIAIJ)); /* PetscNew() does not zero the struct; avoid leaving fields such as rowindices uninitialized */
1445   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1446   mat->destroy    = MatDestroy_MPIAIJ;
1447   mat->view       = MatView_MPIAIJ;
1448   mat->factor     = matin->factor;
1449   mat->assembled  = PETSC_TRUE;
1450 
1451   a->m = mat->m   = oldmat->m;
1452   a->n = mat->n   = oldmat->n;
1453   a->M = mat->M   = oldmat->M;
1454   a->N = mat->N   = oldmat->N;
1455 
1456   a->rstart       = oldmat->rstart;
1457   a->rend         = oldmat->rend;
1458   a->cstart       = oldmat->cstart;
1459   a->cend         = oldmat->cend;
1460   a->size         = oldmat->size;
1461   a->rank         = oldmat->rank;
1462   a->insertmode   = NOT_SET_VALUES;
1463   a->rowvalues    = 0;
1464   a->getrowactive = PETSC_FALSE;
1465 
1466   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1467   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1468   a->cowners = a->rowners + a->size + 2;
1469   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1470   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1471   if (oldmat->colmap) {
1472     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1473     PLogObjectMemory(mat,(a->N)*sizeof(int));
1474     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1475   } else a->colmap = 0;
1476   if (oldmat->garray) {
1477     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1478     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1479     PLogObjectMemory(mat,len*sizeof(int));
1480     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1481   } else a->garray = 0;
1482 
1483   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1484   PLogObjectParent(mat,a->lvec);
1485   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1486   PLogObjectParent(mat,a->Mvctx);
1487   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1488   PLogObjectParent(mat,a->A);
1489   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1490   PLogObjectParent(mat,a->B);
1491   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1492   if (flg) {
1493     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1494   }
1495   *newmat = mat;
1496   return 0;
1497 }
1498 
1499 #include "sys.h"
1500 
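/* MatLoad_MPIAIJ reads a matrix from a binary viewer.  The file layout
   assumed below: a header of four ints (MAT_COOKIE, M, N, and a fourth
   entry -- presumably the global nonzero count -- that is not used here),
   then the M row lengths, then all column indices, then all numerical
   values.  Process 0 reads the file and ships each other process its
   slice of rows. */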
1501 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1502 {
1503   Mat          A;
1504   int          i, nz, ierr, j,rstart, rend, fd;
1505   Scalar       *vals,*svals;
1506   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1507   MPI_Status   status;
1508   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1509   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1510   int          tag = ((PetscObject)viewer)->tag;
1511 
1512   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1513   if (!rank) {
1514     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1515     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1516     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
1517   }
1518 
1519   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1520   M = header[1]; N = header[2];
1521   /* determine ownership of all rows */
1522   m = M/size + ((M % size) > rank);
1523   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1524   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1525   rowners[0] = 0;
1526   for ( i=2; i<=size; i++ ) {
1527     rowners[i] += rowners[i-1];
1528   }
1529   rstart = rowners[rank];
1530   rend   = rowners[rank+1];
1531 
1532   /* distribute row lengths to all processors */
1533   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1534   offlens = ourlens + (rend-rstart);
1535   if (!rank) {
1536     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1537     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1538     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1539     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1540     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1541     PetscFree(sndcounts);
1542   }
1543   else {
1544     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1545   }
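  /* note that on the root rowners[] doubles as the displacement array for
     MPI_Scatterv, since the ownership offsets are exactly the send
     displacements into rowlengths[] */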
1546 
1547   if (!rank) {
1548     /* calculate the number of nonzeros on each processor */
1549     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1550     PetscMemzero(procsnz,size*sizeof(int));
1551     for ( i=0; i<size; i++ ) {
1552       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1553         procsnz[i] += rowlengths[j];
1554       }
1555     }
1556     PetscFree(rowlengths);
1557 
1558     /* determine max buffer needed and allocate it */
1559     maxnz = 0;
1560     for ( i=0; i<size; i++ ) {
1561       maxnz = PetscMax(maxnz,procsnz[i]);
1562     }
1563     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1564 
1565     /* read in my part of the matrix column indices  */
1566     nz = procsnz[0];
1567     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1568     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1569 
    /* read in everyone else's column indices and ship them off */
1571     for ( i=1; i<size; i++ ) {
1572       nz = procsnz[i];
1573       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1574       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1575     }
1576     PetscFree(cols);
1577   }
1578   else {
1579     /* determine buffer space needed for message */
1580     nz = 0;
1581     for ( i=0; i<m; i++ ) {
1582       nz += ourlens[i];
1583     }
1584     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1585 
    /* receive message of column indices */
1587     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1588     MPI_Get_count(&status,MPI_INT,&maxnz);
1589     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1590   }
1591 
  /* loop over local rows, counting the entries that fall outside the
     diagonal block; columns are tested against the row ownership range,
     which matches the column ownership when the matrix is square with the
     default layout -- otherwise the counts serve only as preallocation
     estimates */
1593   PetscMemzero(offlens,m*sizeof(int));
1594   jj = 0;
1595   for ( i=0; i<m; i++ ) {
1596     for ( j=0; j<ourlens[i]; j++ ) {
1597       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1598       jj++;
1599     }
1600   }
1601 
1602   /* create our matrix */
1603   for ( i=0; i<m; i++ ) {
1604     ourlens[i] -= offlens[i];
1605   }
1606   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1607   A = *newmat;
  ierr = MatSetOption(A,MAT_COLUMNS_SORTED); CHKERRQ(ierr);
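  /* ourlens[] was temporarily reduced to the diagonal-block counts for the
     preallocation above; restore the full row lengths for the insertion
     loops below */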
1609   for ( i=0; i<m; i++ ) {
1610     ourlens[i] += offlens[i];
1611   }
1612 
1613   if (!rank) {
1614     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1615 
1616     /* read in my part of the matrix numerical values  */
1617     nz = procsnz[0];
1618     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1619 
1620     /* insert into matrix */
1621     jj      = rstart;
1622     smycols = mycols;
1623     svals   = vals;
1624     for ( i=0; i<m; i++ ) {
1625       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1626       smycols += ourlens[i];
1627       svals   += ourlens[i];
1628       jj++;
1629     }
1630 
    /* read in the other processors' values and ship them out */
1632     for ( i=1; i<size; i++ ) {
1633       nz = procsnz[i];
1634       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1635       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1636     }
1637     PetscFree(procsnz);
1638   }
1639   else {
1640     /* receive numeric values */
1641     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1642 
    /* receive message of values */
1644     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1645     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1646     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1647 
1648     /* insert into matrix */
1649     jj      = rstart;
1650     smycols = mycols;
1651     svals   = vals;
1652     for ( i=0; i<m; i++ ) {
1653       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1654       smycols += ourlens[i];
1655       svals   += ourlens[i];
1656       jj++;
1657     }
1658   }
1659   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1660 
1661   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1662   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1663   return 0;
1664 }
1665