xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 90f02eec332fcca4c33b4e7b21cabed76bf0614e)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.173 1996/11/01 23:39:29 balay Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/aij/mpi/mpiaij.h"
6 #include "src/vec/vecimpl.h"
7 #include "src/inline/spops.h"
8 
9 /* local utility routine that creates a mapping from the global column
10 number to the local number in the off-diagonal part of the local
11 storage of the matrix.  This is done in a non-scalable way since the
12 length of colmap equals the global number of columns.
13 */
14 int CreateColmap_MPIAIJ_Private(Mat mat)
15 {
16   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
17   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
18   int        n = B->n,i;
19 
20   aij->colmap = (int *) PetscMalloc(aij->N*sizeof(int));CHKPTRQ(aij->colmap);
21   PLogObjectMemory(mat,aij->N*sizeof(int));
22   PetscMemzero(aij->colmap,aij->N*sizeof(int));
23   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
24   return 0;
25 }
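
/*
   For illustration: if garray = {3,7,12} then this routine sets
   colmap[3] = 1, colmap[7] = 2, colmap[12] = 3 and leaves every other
   entry 0, so that a lookup of a global column gcol reads

       lcol = aij->colmap[gcol] - 1;    (lcol < 0 means gcol has no
                                         off-diagonal entry on this processor)

   as done in MatSetValues_MPIAIJ() and MatGetValues_MPIAIJ() below.
*/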
26 
27 extern int DisAssemble_MPIAIJ(Mat);
28 
29 static int MatGetRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
30                            PetscTruth *done)
31 {
32   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
33   int        ierr;
34   if (aij->size == 1) {
35     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
36   } else SETERRQ(1,"MatGetRowIJ_MPIAIJ:not supported in parallel");
37   return 0;
38 }
39 
40 static int MatRestoreRowIJ_MPIAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
41                                PetscTruth *done)
42 {
43   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
44   int        ierr;
45   if (aij->size == 1) {
46     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
47   } else SETERRQ(1,"MatRestoreRowIJ_MPIAIJ:not supported in parallel");
48   return 0;
49 }
50 
51 extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
52 
53 static int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
54 {
55   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
56   Scalar     value;
57   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
58   int        cstart = aij->cstart, cend = aij->cend,row,col;
59   int        roworiented = aij->roworiented;
60 
61 #if defined(PETSC_BOPT_g)
62   if (aij->insertmode != NOT_SET_VALUES && aij->insertmode != addv) {
63     SETERRQ(1,"MatSetValues_MPIAIJ:Cannot mix inserts and adds");
64   }
65 #endif
66   aij->insertmode = addv;
67   for ( i=0; i<m; i++ ) {
68 #if defined(PETSC_BOPT_g)
69     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIAIJ:Negative row");
70     if (im[i] >= aij->M) SETERRQ(1,"MatSetValues_MPIAIJ:Row too large");
71 #endif
72     if (im[i] >= rstart && im[i] < rend) {
73       row = im[i] - rstart;
74       for ( j=0; j<n; j++ ) {
75         if (in[j] >= cstart && in[j] < cend){
76           col = in[j] - cstart;
77           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
78           ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
79         }
80 #if defined(PETSC_BOPT_g)
81         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIAIJ:Negative column");}
82         else if (in[j] >= aij->N) {SETERRQ(1,"MatSetValues_MPIAIJ:Col too large");}
83 #endif
84         else {
85           if (mat->was_assembled) {
86             if (!aij->colmap) {
87               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
88             }
89             col = aij->colmap[in[j]] - 1;
90             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
91               ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
92               col =  in[j];
93             }
94           }
95           else col = in[j];
96           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
97           ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
98         }
99       }
100     }
101     else {
102       if (roworiented && !aij->donotstash) {
103         ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
104       }
105       else {
106         if (!aij->donotstash) {
107           row = im[i];
108           for ( j=0; j<n; j++ ) {
109             ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
110           }
111         }
112       }
113     }
114   }
115   return 0;
116 }
117 
118 static int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
119 {
120   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
121   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
122   int        cstart = aij->cstart, cend = aij->cend,row,col;
123 
124   for ( i=0; i<m; i++ ) {
125     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative row");
126     if (idxm[i] >= aij->M) SETERRQ(1,"MatGetValues_MPIAIJ:Row too large");
127     if (idxm[i] >= rstart && idxm[i] < rend) {
128       row = idxm[i] - rstart;
129       for ( j=0; j<n; j++ ) {
130         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIAIJ:Negative column");
131         if (idxn[j] >= aij->N) SETERRQ(1,"MatGetValues_MPIAIJ:Col too large");
132         if (idxn[j] >= cstart && idxn[j] < cend){
133           col = idxn[j] - cstart;
134           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
135         }
136         else {
137           if (!aij->colmap) {
138             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
139           }
140           col = aij->colmap[idxn[j]] - 1;
141           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
142           else {
143             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
144           }
145         }
146       }
147     }
148     else {
149       SETERRQ(1,"MatGetValues_MPIAIJ:Only local values currently supported");
150     }
151   }
152   return 0;
153 }
154 
155 static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
156 {
157   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
158   MPI_Comm    comm = mat->comm;
159   int         size = aij->size, *owners = aij->rowners;
160   int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
161   MPI_Request *send_waits,*recv_waits;
162   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
163   InsertMode  addv;
164   Scalar      *rvalues,*svalues;
165 
166   /* make sure all processors are in the same mode (INSERT_VALUES or ADD_VALUES) */
167   MPI_Allreduce(&aij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
168   if (addv == (ADD_VALUES|INSERT_VALUES)) {
169     SETERRQ(1,"MatAssemblyBegin_MPIAIJ:Some processors inserted others added");
170   }
171   aij->insertmode = addv; /* in case this processor had no cache */
172 
173   /*  first count number of contributors to each processor */
174   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
175   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
176   owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
177   for ( i=0; i<aij->stash.n; i++ ) {
178     idx = aij->stash.idx[i];
179     for ( j=0; j<size; j++ ) {
180       if (idx >= owners[j] && idx < owners[j+1]) {
181         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
182       }
183     }
184   }
185   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
186 
187   /* inform other processors of number of messages and max length*/
188   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
189   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
190   nreceives = work[rank];
191   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
192   nmax = work[rank];
193   PetscFree(work);
194 
195   /* post receives:
196        1) each message will consist of ordered triples
197      (global row index, global column index, value); we store the
198      integer indices as Scalars to simplify the message passing.
199        2) since we don't know how long each individual message is we
200      allocate the largest needed buffer for each receive. Potentially
201      this is a lot of wasted space.
202 
203 
204        This could be done better.
205   */
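  /*
     Layout sketch of the packed buffers used below: the portion of svalues
     destined for one processor is a run of 3*(number of stashed entries)
     Scalars,

         [ row_0, col_0, val_0,  row_1, col_1, val_1,  ... ]

     and MatAssemblyEnd_MPIAIJ() converts the first two members of each
     triple back to integers with (int) PetscReal(...).
  */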
206   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
207   CHKPTRQ(rvalues);
208   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
209   CHKPTRQ(recv_waits);
210   for ( i=0; i<nreceives; i++ ) {
211     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
212               comm,recv_waits+i);
213   }
214 
215   /* do sends:
216       1) starts[i] gives the starting index in svalues for stuff going to
217          the ith processor
218   */
219   svalues = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
220   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
221   CHKPTRQ(send_waits);
222   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
223   starts[0] = 0;
224   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
225   for ( i=0; i<aij->stash.n; i++ ) {
226     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
227     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
228     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
229   }
230   PetscFree(owner);
231   starts[0] = 0;
232   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
233   count = 0;
234   for ( i=0; i<size; i++ ) {
235     if (procs[i]) {
236       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
237                 comm,send_waits+count++);
238     }
239   }
240   PetscFree(starts); PetscFree(nprocs);
241 
242   /* Free cache space */
243   PLogInfo(0,"[%d]MatAssemblyBegin_MPIAIJ:Number of off processor values %d\n",rank,aij->stash.n);
244   ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
245 
246   aij->svalues    = svalues;    aij->rvalues    = rvalues;
247   aij->nsends     = nsends;     aij->nrecvs     = nreceives;
248   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
249   aij->rmax       = nmax;
250 
251   return 0;
252 }
253 extern int MatSetUpMultiply_MPIAIJ(Mat);
254 
255 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
256 {
257   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
258   MPI_Status  *send_status,recv_status;
259   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
260   int         row,col,other_disassembled;
261   Scalar      *values,val;
262   InsertMode  addv = aij->insertmode;
263 
264   /*  wait on receives */
265   while (count) {
266     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
267     /* unpack receives into our local space */
268     values = aij->rvalues + 3*imdex*aij->rmax;
269     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
270     n = n/3;
271     for ( i=0; i<n; i++ ) {
272       row = (int) PetscReal(values[3*i]) - aij->rstart;
273       col = (int) PetscReal(values[3*i+1]);
274       val = values[3*i+2];
275       if (col >= aij->cstart && col < aij->cend) {
276         col -= aij->cstart;
277         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
278       }
279       else {
280         if (mat->was_assembled || mat->assembled) {
281           if (!aij->colmap) {
282             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
283           }
284           col = aij->colmap[col] - 1;
285           if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
286             ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
287             col = (int) PetscReal(values[3*i+1]);
288           }
289         }
290         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
291       }
292     }
293     count--;
294   }
295   PetscFree(aij->recv_waits); PetscFree(aij->rvalues);
296 
297   /* wait on sends */
298   if (aij->nsends) {
299     send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
300     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
301     PetscFree(send_status);
302   }
303   PetscFree(aij->send_waits); PetscFree(aij->svalues);
304 
305   aij->insertmode = NOT_SET_VALUES;
306   ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
307   ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);
308 
309   /* determine if any processor has disassembled, if so we must
310      also disassemble ourselves, in order that we may reassemble. */
311   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
312   if (mat->was_assembled && !other_disassembled) {
313     ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
314   }
315 
316   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
317     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
318   }
319   ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
320   ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);
321 
322   if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
323   return 0;
324 }
325 
326 static int MatZeroEntries_MPIAIJ(Mat A)
327 {
328   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
329   int        ierr;
330   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
331   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
332   return 0;
333 }
334 
335 /* the code does not handle the diagonal entries correctly unless the
336    matrix is square and the column and row ownerships are identical.
337    This is a BUG. The only way to fix it seems to be to access
338    aij->A and aij->B directly and not through the MatZeroRows()
339    routine.
340 */
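/*
   Communication pattern used below (descriptive summary): every processor
   looks up the owner of each row index in "is" using l->rowners, sends the
   indices to their owning processors with MPI_Isend/MPI_Irecv, and each
   processor then zeros whatever local rows it received by calling
   MatZeroRows() on l->A (with diag) and on l->B (with a zero diagonal).
*/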
341 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
342 {
343   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
344   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
345   int            *procs,*nprocs,j,found,idx,nsends,*work;
346   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
347   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
348   int            *lens,imdex,*lrows,*values;
349   MPI_Comm       comm = A->comm;
350   MPI_Request    *send_waits,*recv_waits;
351   MPI_Status     recv_status,*send_status;
352   IS             istmp;
353 
354   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
355   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
356 
357   /*  first count number of contributors to each processor */
358   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
359   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
360   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
361   for ( i=0; i<N; i++ ) {
362     idx = rows[i];
363     found = 0;
364     for ( j=0; j<size; j++ ) {
365       if (idx >= owners[j] && idx < owners[j+1]) {
366         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
367       }
368     }
369     if (!found) SETERRQ(1,"MatZeroRows_MPIAIJ:Index out of range");
370   }
371   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
372 
373   /* inform other processors of number of messages and max length*/
374   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
375   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
376   nrecvs = work[rank];
377   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
378   nmax = work[rank];
379   PetscFree(work);
380 
381   /* post receives:   */
382   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
383   CHKPTRQ(rvalues);
384   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
385   CHKPTRQ(recv_waits);
386   for ( i=0; i<nrecvs; i++ ) {
387     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
388   }
389 
390   /* do sends:
391       1) starts[i] gives the starting index in svalues for stuff going to
392          the ith processor
393   */
394   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
395   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
396   CHKPTRQ(send_waits);
397   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
398   starts[0] = 0;
399   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
400   for ( i=0; i<N; i++ ) {
401     svalues[starts[owner[i]]++] = rows[i];
402   }
403   ISRestoreIndices(is,&rows);
404 
405   starts[0] = 0;
406   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
407   count = 0;
408   for ( i=0; i<size; i++ ) {
409     if (procs[i]) {
410       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
411     }
412   }
413   PetscFree(starts);
414 
415   base = owners[rank];
416 
417   /*  wait on receives */
418   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
419   source = lens + nrecvs;
420   count  = nrecvs; slen = 0;
421   while (count) {
422     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
423     /* unpack receives into our local space */
424     MPI_Get_count(&recv_status,MPI_INT,&n);
425     source[imdex]  = recv_status.MPI_SOURCE;
426     lens[imdex]  = n;
427     slen += n;
428     count--;
429   }
430   PetscFree(recv_waits);
431 
432   /* move the data into the send scatter */
433   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
434   count = 0;
435   for ( i=0; i<nrecvs; i++ ) {
436     values = rvalues + i*nmax;
437     for ( j=0; j<lens[i]; j++ ) {
438       lrows[count++] = values[j] - base;
439     }
440   }
441   PetscFree(rvalues); PetscFree(lens);
442   PetscFree(owner); PetscFree(nprocs);
443 
444   /* actually zap the local rows */
445   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
446   PLogObjectParent(A,istmp);
447   PetscFree(lrows);
448   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
449   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
450   ierr = ISDestroy(istmp); CHKERRQ(ierr);
451 
452   /* wait on sends */
453   if (nsends) {
454     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
455     CHKPTRQ(send_status);
456     MPI_Waitall(nsends,send_waits,send_status);
457     PetscFree(send_status);
458   }
459   PetscFree(send_waits); PetscFree(svalues);
460 
461   return 0;
462 }
463 
464 static int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
465 {
466   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
467   int        ierr,nt;
468 
469   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
470   if (nt != a->n) {
471     SETERRQ(1,"MatMult_MPIAIJ:Incompatible parition of A and xx");
472   }
473   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
474   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
475   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx); CHKERRQ(ierr);
476   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
477   return 0;
478 }
479 
480 static int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
481 {
482   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
483   int        ierr;
484   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
485   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
486   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
487   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
488   return 0;
489 }
490 
491 static int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
492 {
493   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
494   int        ierr;
495 
496   /* do nondiagonal part */
497   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
498   /* send it on its way */
499   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
500   /* do local part */
501   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
502   /* receive remote parts: note this assumes the values are not actually */
503   /* inserted in yy until the next line, which is true for my implementation*/
504   /* but may not always be true. */
505   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
506   return 0;
507 }
508 
509 static int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
510 {
511   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
512   int        ierr;
513 
514   /* do nondiagonal part */
515   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
516   /* send it on its way */
517   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
518   /* do local part */
519   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
520   /* receive remote parts: note this assumes the values are not actually */
521   /* inserted in zz until the next line, which is true for my implementation*/
522   /* but may not always be true. */
523   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
524   return 0;
525 }
526 
527 /*
528   This only works correctly for square matrices where the subblock A->A is the
529    diagonal block
530 */
531 static int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
532 {
533   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
534   if (a->M != a->N)
535     SETERRQ(1,"MatGetDiagonal_MPIAIJ:Supports only square matrix where A->A is diag block");
536   if (a->rstart != a->cstart || a->rend != a->cend) {
537     SETERRQ(1,"MatGetDiagonal_MPIAIJ:row partition must equal col partition");  }
538   return MatGetDiagonal(a->A,v);
539 }
540 
541 static int MatScale_MPIAIJ(Scalar *aa,Mat A)
542 {
543   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
544   int        ierr;
545   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
546   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
547   return 0;
548 }
549 
550 static int MatDestroy_MPIAIJ(PetscObject obj)
551 {
552   Mat        mat = (Mat) obj;
553   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
554   int        ierr;
555 #if defined(PETSC_LOG)
556   PLogObjectState(obj,"Rows=%d, Cols=%d",aij->M,aij->N);
557 #endif
558   PetscFree(aij->rowners);
559   ierr = MatDestroy(aij->A); CHKERRQ(ierr);
560   ierr = MatDestroy(aij->B); CHKERRQ(ierr);
561   if (aij->colmap) PetscFree(aij->colmap);
562   if (aij->garray) PetscFree(aij->garray);
563   if (aij->lvec)   VecDestroy(aij->lvec);
564   if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
565   if (aij->rowvalues) PetscFree(aij->rowvalues);
566   PetscFree(aij);
567   if (mat->mapping) {
568     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
569   }
570   PLogObjectDestroy(mat);
571   PetscHeaderDestroy(mat);
572   return 0;
573 }
574 #include "draw.h"
575 #include "pinclude/pviewer.h"
576 
577 static int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
578 {
579   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
580   int         ierr;
581 
582   if (aij->size == 1) {
583     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
584   }
585   else SETERRQ(1,"MatView_MPIAIJ_Binary:Only uniprocessor output supported");
586   return 0;
587 }
588 
589 static int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
590 {
591   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
592   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
593   int         ierr, format,shift = C->indexshift,rank;
594   FILE        *fd;
595   ViewerType  vtype;
596 
597   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
598   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
599     ierr = ViewerGetFormat(viewer,&format);
600     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
601       MatInfo info;
602       int     flg;
603       MPI_Comm_rank(mat->comm,&rank);
604       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
605       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
606       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
607       PetscSequentialPhaseBegin(mat->comm,1);
608       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
609          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
610       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
611          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
612       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
613       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
614       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
615       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
616       fflush(fd);
617       PetscSequentialPhaseEnd(mat->comm,1);
618       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
619       return 0;
620     }
621     else if (format == VIEWER_FORMAT_ASCII_INFO) {
622       return 0;
623     }
624   }
625 
626   if (vtype == DRAW_VIEWER) {
627     Draw       draw;
628     PetscTruth isnull;
629     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
630     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
631   }
632 
633   if (vtype == ASCII_FILE_VIEWER) {
634     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
635     PetscSequentialPhaseBegin(mat->comm,1);
636     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
637            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
638            aij->cend);
639     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
640     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
641     fflush(fd);
642     PetscSequentialPhaseEnd(mat->comm,1);
643   }
644   else {
645     int size = aij->size;
646     rank = aij->rank;
647     if (size == 1) {
648       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
649     }
650     else {
651       /* assemble the entire matrix onto first processor. */
652       Mat         A;
653       Mat_SeqAIJ *Aloc;
654       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
655       Scalar      *a;
656 
657       if (!rank) {
658         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
659                CHKERRQ(ierr);
660       }
661       else {
662         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
663                CHKERRQ(ierr);
664       }
665       PLogObjectParent(mat,A);
666 
667       /* copy over the A part */
668       Aloc = (Mat_SeqAIJ*) aij->A->data;
669       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
670       row = aij->rstart;
671       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
672       for ( i=0; i<m; i++ ) {
673         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
674         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
675       }
676       aj = Aloc->j;
677       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
678 
679       /* copy over the B part */
680       Aloc = (Mat_SeqAIJ*) aij->B->data;
681       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
682       row = aij->rstart;
683       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
684       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
685       for ( i=0; i<m; i++ ) {
686         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
687         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
688       }
689       PetscFree(ct);
690       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
691       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
692       if (!rank) {
693         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
694       }
695       ierr = MatDestroy(A); CHKERRQ(ierr);
696     }
697   }
698   return 0;
699 }
700 
701 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
702 {
703   Mat         mat = (Mat) obj;
704   int         ierr;
705   ViewerType  vtype;
706 
707   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
708   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
709       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
710     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
711   }
712   else if (vtype == BINARY_FILE_VIEWER) {
713     return MatView_MPIAIJ_Binary(mat,viewer);
714   }
715   return 0;
716 }
717 
718 /*
719     This has to provide several versions.
720 
721      2) a) use only local smoothing, updating outer values only once.
722         b) local smoothing, updating outer values each inner iteration
723      3) coloring, updating outer values between colors.
724 */
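/*
   Sketch of the local update performed in each sweep (assuming, as in
   src/inline/spops.h, that SPARSEDENSEMDOT() subtracts a sparse dot product
   from its first argument):

       x_i <- (1-omega)*x_i + omega*(b_i - sum_{j!=i} a_ij*x_j)/(a_ii + fshift)

   where x_j for columns owned by other processors is read from the ghosted
   vector mat->lvec, scattered in once before the sweeps start, and local
   values are used as soon as they are updated (Gauss-Seidel fashion).
*/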
725 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
726                            double fshift,int its,Vec xx)
727 {
728   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
729   Mat        AA = mat->A, BB = mat->B;
730   Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
731   Scalar     *b,*x,*xs,*ls,d,*v,sum;
732   int        ierr,*idx, *diag;
733   int        n = mat->n, m = mat->m, i,shift = A->indexshift;
734 
735   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
736   xs = x + shift; /* shift by one for index start of 1 */
737   ls = ls + shift;
738   if (!A->diag) {if ((ierr = MatMarkDiag_SeqAIJ(AA))) return ierr;}
739   diag = A->diag;
740   if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
741     if (flag & SOR_ZERO_INITIAL_GUESS) {
742       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
743     }
744     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
745     CHKERRQ(ierr);
746     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
747     CHKERRQ(ierr);
748     while (its--) {
749       /* go down through the rows */
750       for ( i=0; i<m; i++ ) {
751         n    = A->i[i+1] - A->i[i];
752         idx  = A->j + A->i[i] + shift;
753         v    = A->a + A->i[i] + shift;
754         sum  = b[i];
755         SPARSEDENSEMDOT(sum,xs,v,idx,n);
756         d    = fshift + A->a[diag[i]+shift];
757         n    = B->i[i+1] - B->i[i];
758         idx  = B->j + B->i[i] + shift;
759         v    = B->a + B->i[i] + shift;
760         SPARSEDENSEMDOT(sum,ls,v,idx,n);
761         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
762       }
763       /* come up through the rows */
764       for ( i=m-1; i>-1; i-- ) {
765         n    = A->i[i+1] - A->i[i];
766         idx  = A->j + A->i[i] + shift;
767         v    = A->a + A->i[i] + shift;
768         sum  = b[i];
769         SPARSEDENSEMDOT(sum,xs,v,idx,n);
770         d    = fshift + A->a[diag[i]+shift];
771         n    = B->i[i+1] - B->i[i];
772         idx  = B->j + B->i[i] + shift;
773         v    = B->a + B->i[i] + shift;
774         SPARSEDENSEMDOT(sum,ls,v,idx,n);
775         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
776       }
777     }
778   }
779   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
780     if (flag & SOR_ZERO_INITIAL_GUESS) {
781       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
782     }
783     ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
784     CHKERRQ(ierr);
785     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,mat->Mvctx);
786     CHKERRQ(ierr);
787     while (its--) {
788       for ( i=0; i<m; i++ ) {
789         n    = A->i[i+1] - A->i[i];
790         idx  = A->j + A->i[i] + shift;
791         v    = A->a + A->i[i] + shift;
792         sum  = b[i];
793         SPARSEDENSEMDOT(sum,xs,v,idx,n);
794         d    = fshift + A->a[diag[i]+shift];
795         n    = B->i[i+1] - B->i[i];
796         idx  = B->j + B->i[i] + shift;
797         v    = B->a + B->i[i] + shift;
798         SPARSEDENSEMDOT(sum,ls,v,idx,n);
799         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
800       }
801     }
802   }
803   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
804     if (flag & SOR_ZERO_INITIAL_GUESS) {
805       return (*mat->A->ops.relax)(mat->A,bb,omega,flag,fshift,its,xx);
806     }
807     ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
808                             mat->Mvctx); CHKERRQ(ierr);
809     ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_ALL,
810                             mat->Mvctx); CHKERRQ(ierr);
811     while (its--) {
812       for ( i=m-1; i>-1; i-- ) {
813         n    = A->i[i+1] - A->i[i];
814         idx  = A->j + A->i[i] + shift;
815         v    = A->a + A->i[i] + shift;
816         sum  = b[i];
817         SPARSEDENSEMDOT(sum,xs,v,idx,n);
818         d    = fshift + A->a[diag[i]+shift];
819         n    = B->i[i+1] - B->i[i];
820         idx  = B->j + B->i[i] + shift;
821         v    = B->a + B->i[i] + shift;
822         SPARSEDENSEMDOT(sum,ls,v,idx,n);
823         x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
824       }
825     }
826   }
827   else {
828     SETERRQ(1,"MatRelax_MPIAIJ:Option not supported");
829   }
830   return 0;
831 }
832 
833 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
834 {
835   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
836   Mat        A = mat->A, B = mat->B;
837   int        ierr;
838   double     isend[5], irecv[5];
839 
840   info->rows_global    = (double)mat->M;
841   info->columns_global = (double)mat->N;
842   info->rows_local     = (double)mat->m;
843   info->columns_local  = (double)mat->N;
844   info->block_size     = 1.0;
845   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
846   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
847   isend[3] = info->memory;  isend[4] = info->mallocs;
848   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
849   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
850   isend[3] += info->memory;  isend[4] += info->mallocs;
851   if (flag == MAT_LOCAL) {
852     info->nz_used      = isend[0];
853     info->nz_allocated = isend[1];
854     info->nz_unneeded  = isend[2];
855     info->memory       = isend[3];
856     info->mallocs      = isend[4];
857   } else if (flag == MAT_GLOBAL_MAX) {
858     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
859     info->nz_used      = irecv[0];
860     info->nz_allocated = irecv[1];
861     info->nz_unneeded  = irecv[2];
862     info->memory       = irecv[3];
863     info->mallocs      = irecv[4];
864   } else if (flag == MAT_GLOBAL_SUM) {
865     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
866     info->nz_used      = irecv[0];
867     info->nz_allocated = irecv[1];
868     info->nz_unneeded  = irecv[2];
869     info->memory       = irecv[3];
870     info->mallocs      = irecv[4];
871   }
872   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
873   info->fill_ratio_needed = 0;
874   info->factor_mallocs    = 0;
875 
876   return 0;
877 }
878 
879 extern int MatLUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,Mat*);
880 extern int MatLUFactorNumeric_MPIAIJ(Mat,Mat*);
881 extern int MatLUFactor_MPIAIJ(Mat,IS,IS,double);
882 extern int MatILUFactorSymbolic_MPIAIJ(Mat,IS,IS,double,int,Mat *);
883 extern int MatSolve_MPIAIJ(Mat,Vec,Vec);
884 extern int MatSolveAdd_MPIAIJ(Mat,Vec,Vec,Vec);
885 extern int MatSolveTrans_MPIAIJ(Mat,Vec,Vec);
886 extern int MatSolveTransAdd_MPIAIJ(Mat,Vec,Vec,Vec);
887 
888 static int MatSetOption_MPIAIJ(Mat A,MatOption op)
889 {
890   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
891 
892   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
893       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
894       op == MAT_COLUMNS_SORTED ||
895       op == MAT_ROW_ORIENTED) {
896         MatSetOption(a->A,op);
897         MatSetOption(a->B,op);
898   }
899   else if (op == MAT_ROWS_SORTED ||
900            op == MAT_SYMMETRIC ||
901            op == MAT_STRUCTURALLY_SYMMETRIC ||
902            op == MAT_YES_NEW_DIAGONALS)
903     PLogInfo(A,"Info:MatSetOption_MPIAIJ:Option ignored\n");
904   else if (op == MAT_COLUMN_ORIENTED) {
905     a->roworiented = 0;
906     MatSetOption(a->A,op);
907     MatSetOption(a->B,op);
908   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
909     a->donotstash = 1;
910   } else if (op == MAT_NO_NEW_DIAGONALS)
911     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:MAT_NO_NEW_DIAGONALS");}
912   else
913     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIAIJ:unknown option");}
914   return 0;
915 }
916 
917 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
918 {
919   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
920   *m = mat->M; *n = mat->N;
921   return 0;
922 }
923 
924 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
925 {
926   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
927   *m = mat->m; *n = mat->N;
928   return 0;
929 }
930 
931 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
932 {
933   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
934   *m = mat->rstart; *n = mat->rend;
935   return 0;
936 }
937 
938 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
939 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
940 
941 int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
942 {
943   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
944   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
945   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
946   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
947   int        *cmap, *idx_p;
948 
949   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIAIJ:Already active");
950   mat->getrowactive = PETSC_TRUE;
951 
952   if (!mat->rowvalues && (idx || v)) {
953     /*
954         allocate enough space to hold information from the longest row.
955     */
956     Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
957     int     max = 1,m = mat->m,tmp;
958     for ( i=0; i<m; i++ ) {
959       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
960       if (max < tmp) { max = tmp; }
961     }
962     mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
963     CHKPTRQ(mat->rowvalues);
964     mat->rowindices = (int *) (mat->rowvalues + max);
965   }
966 
967   if (row < rstart || row >= rend) SETERRQ(1,"MatGetRow_MPIAIJ:Only local rows")
968   lrow = row - rstart;
969 
970   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
971   if (!v)   {pvA = 0; pvB = 0;}
972   if (!idx) {pcA = 0; if (!v) pcB = 0;}
973   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
974   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
975   nztot = nzA + nzB;
976 
977   cmap  = mat->garray;
978   if (v  || idx) {
979     if (nztot) {
980       /* Sort by increasing column numbers, assuming A and B already sorted */
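      /* The merged row is built in three consecutive pieces:
           [ B columns with global index < cstart | all A columns shifted by
             cstart | remaining B columns (global index >= cend) ]
         imark marks the end of the first piece. */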
981       int imark = -1;
982       if (v) {
983         *v = v_p = mat->rowvalues;
984         for ( i=0; i<nzB; i++ ) {
985           if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
986           else break;
987         }
988         imark = i;
989         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
990         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
991       }
992       if (idx) {
993         *idx = idx_p = mat->rowindices;
994         if (imark > -1) {
995           for ( i=0; i<imark; i++ ) {
996             idx_p[i] = cmap[cworkB[i]];
997           }
998         } else {
999           for ( i=0; i<nzB; i++ ) {
1000             if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
1001             else break;
1002           }
1003           imark = i;
1004         }
1005         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
1006         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
1007       }
1008     }
1009     else {
1010       if (idx) *idx = 0;
1011       if (v)   *v   = 0;
1012     }
1013   }
1014   *nz = nztot;
1015   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
1016   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
1017   return 0;
1018 }
1019 
1020 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1021 {
1022   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1023   if (aij->getrowactive == PETSC_FALSE) {
1024     SETERRQ(1,"MatRestoreRow_MPIAIJ:MatGetRow not called");
1025   }
1026   aij->getrowactive = PETSC_FALSE;
1027   return 0;
1028 }
1029 
1030 static int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
1031 {
1032   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1033   Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
1034   int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
1035   double     sum = 0.0;
1036   Scalar     *v;
1037 
1038   if (aij->size == 1) {
1039     ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
1040   } else {
1041     if (type == NORM_FROBENIUS) {
1042       v = amat->a;
1043       for (i=0; i<amat->nz; i++ ) {
1044 #if defined(PETSC_COMPLEX)
1045         sum += real(conj(*v)*(*v)); v++;
1046 #else
1047         sum += (*v)*(*v); v++;
1048 #endif
1049       }
1050       v = bmat->a;
1051       for (i=0; i<bmat->nz; i++ ) {
1052 #if defined(PETSC_COMPLEX)
1053         sum += real(conj(*v)*(*v)); v++;
1054 #else
1055         sum += (*v)*(*v); v++;
1056 #endif
1057       }
1058       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
1059       *norm = sqrt(*norm);
1060     }
1061     else if (type == NORM_1) { /* max column norm */
1062       double *tmp, *tmp2;
1063       int    *jj, *garray = aij->garray;
1064       tmp  = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp);
1065       tmp2 = (double *) PetscMalloc( aij->N*sizeof(double) ); CHKPTRQ(tmp2);
1066       PetscMemzero(tmp,aij->N*sizeof(double));
1067       *norm = 0.0;
1068       v = amat->a; jj = amat->j;
1069       for ( j=0; j<amat->nz; j++ ) {
1070         tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
1071       }
1072       v = bmat->a; jj = bmat->j;
1073       for ( j=0; j<bmat->nz; j++ ) {
1074         tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
1075       }
1076       MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);
1077       for ( j=0; j<aij->N; j++ ) {
1078         if (tmp2[j] > *norm) *norm = tmp2[j];
1079       }
1080       PetscFree(tmp); PetscFree(tmp2);
1081     }
1082     else if (type == NORM_INFINITY) { /* max row norm */
1083       double ntemp = 0.0;
1084       for ( j=0; j<amat->m; j++ ) {
1085         v = amat->a + amat->i[j] + shift;
1086         sum = 0.0;
1087         for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
1088           sum += PetscAbsScalar(*v); v++;
1089         }
1090         v = bmat->a + bmat->i[j] + shift;
1091         for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
1092           sum += PetscAbsScalar(*v); v++;
1093         }
1094         if (sum > ntemp) ntemp = sum;
1095       }
1096       MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);
1097     }
1098     else {
1099       SETERRQ(1,"MatNorm_MPIAIJ:No support for two norm");
1100     }
1101   }
1102   return 0;
1103 }
1104 
1105 static int MatTranspose_MPIAIJ(Mat A,Mat *matout)
1106 {
1107   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1108   Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
1109   int        ierr,shift = Aloc->indexshift;
1110   Mat        B;
1111   int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
1112   Scalar     *array;
1113 
1114   if (matout == PETSC_NULL && M != N)
1115     SETERRQ(1,"MatTranspose_MPIAIJ:Square matrix only for in-place");
1116   ierr = MatCreateMPIAIJ(A->comm,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,
1117          PETSC_NULL,&B); CHKERRQ(ierr);
1118 
1119   /* copy over the A part */
1120   Aloc = (Mat_SeqAIJ*) a->A->data;
1121   m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1122   row = a->rstart;
1123   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
1124   for ( i=0; i<m; i++ ) {
1125     ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1126     row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
1127   }
1128   aj = Aloc->j;
1129   for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}
1130 
1131   /* copy over the B part */
1132   Aloc = (Mat_SeqAIJ*) a->B->data;
1133   m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
1134   row = a->rstart;
1135   ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
1136   for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
1137   for ( i=0; i<m; i++ ) {
1138     ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1139     row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
1140   }
1141   PetscFree(ct);
1142   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1143   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1144   if (matout != PETSC_NULL) {
1145     *matout = B;
1146   } else {
1147     /* This isn't really an in-place transpose; we free a's data structures and copy B's header over A */
1148     PetscFree(a->rowners);
1149     ierr = MatDestroy(a->A); CHKERRQ(ierr);
1150     ierr = MatDestroy(a->B); CHKERRQ(ierr);
1151     if (a->colmap) PetscFree(a->colmap);
1152     if (a->garray) PetscFree(a->garray);
1153     if (a->lvec) VecDestroy(a->lvec);
1154     if (a->Mvctx) VecScatterDestroy(a->Mvctx);
1155     PetscFree(a);
1156     PetscMemcpy(A,B,sizeof(struct _Mat));
1157     PetscHeaderDestroy(B);
1158   }
1159   return 0;
1160 }
1161 
1162 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1163 {
1164   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1165   Mat a = aij->A, b = aij->B;
1166   int ierr,s1,s2,s3;
1167 
1168   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1169   if (rr) {
1170     s3 = aij->n;
1171     VecGetLocalSize_Fast(rr,s1);
1172     if (s1!=s3) SETERRQ(1,"MatDiagonalScale: right vector non-conforming local size");
1173     /* Overlap communication with computation. */
1174     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_ALL,aij->Mvctx); CHKERRQ(ierr);
1175   }
1176   if (ll) {
1177     VecGetLocalSize_Fast(ll,s1);
1178     if (s1!=s2) SETERRQ(1,"MatDiagonalScale: left vector non-conforming local size");
1179     ierr = (*b->ops.diagonalscale)(b,ll,0); CHKERRQ(ierr);
1180   }
1181   /* scale  the diagonal block */
1182   ierr = (*a->ops.diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1183 
1184   if (rr) {
1185     /* Do a scatter end and then right scale the off-diagonal block */
1186     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_ALL,aij->Mvctx); CHKERRQ(ierr);
1187     ierr = (*b->ops.diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1188   }
1189 
1190   return 0;
1191 }
1192 
1193 
1194 extern int MatPrintHelp_SeqAIJ(Mat);
1195 static int MatPrintHelp_MPIAIJ(Mat A)
1196 {
1197   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1198 
1199   if (!a->rank) return MatPrintHelp_SeqAIJ(a->A);
1200   else return 0;
1201 }
1202 
1203 static int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1204 {
1205   *bs = 1;
1206   return 0;
1207 }
1208 
1209 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1210 static int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1211 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1212 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1213 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1214 /* -------------------------------------------------------------------*/
1215 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1216        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1217        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1218        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1219        0,0,
1220        0,0,
1221        0,0,
1222        MatRelax_MPIAIJ,
1223        MatTranspose_MPIAIJ,
1224        MatGetInfo_MPIAIJ,0,
1225        MatGetDiagonal_MPIAIJ,MatDiagonalScale_MPIAIJ,MatNorm_MPIAIJ,
1226        MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
1227        0,
1228        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
1229        0,0,0,0,
1230        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1231        0,0,
1232        0,0,MatConvert_MPIAIJ,MatConvertSameType_MPIAIJ,0,0,
1233        0,0,0,
1234        MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
1235        MatPrintHelp_MPIAIJ,
1236        MatScale_MPIAIJ,0,0,0,
1237        MatGetBlockSize_MPIAIJ,0,0,0,0,
1238        MatFDColoringCreate_MPIAIJ};
1239 
1240 
1241 /*@C
1242    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1243    (the default parallel PETSc format).  For good matrix assembly performance
1244    the user should preallocate the matrix storage by setting the parameters
1245    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1246    performance can be increased by more than a factor of 50.
1247 
1248    Input Parameters:
1249 .  comm - MPI communicator
1250 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1251            This value should be the same as the local size used in creating the
1252            y vector for the matrix-vector product y = Ax.
1253 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1254            This value should be the same as the local size used in creating the
1255            x vector for the matrix-vector product y = Ax.
1256 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1257 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1258 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1259            (same for all local rows)
1260 .  d_nnz - array containing the number of nonzeros in the various rows of the
1261            diagonal portion of the local submatrix (possibly different for each row)
1262            or PETSC_NULL. You must leave room for the diagonal entry even if
1263            it is zero.
1264 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1265            submatrix (same for all local rows).
1266 .  o_nnz - array containing the number of nonzeros in the various rows of the
1267            off-diagonal portion of the local submatrix (possibly different for
1268            each row) or PETSC_NULL.
1269 
1270    Output Parameter:
1271 .  A - the matrix
1272 
1273    Notes:
1274    The AIJ format (also called the Yale sparse matrix format or
1275    compressed row storage), is fully compatible with standard Fortran 77
1276    storage.  That is, the stored row and column indices can begin at
1277    either one (as in Fortran) or zero.  See the users manual for details.
1278 
1279    The user MUST specify either the local or global matrix dimensions
1280    (possibly both).
1281 
1282    By default, this format uses inodes (identical nodes) when possible.
1283    We search for consecutive rows with the same nonzero structure, thereby
1284    reusing matrix information to achieve increased efficiency.
1285 
1286    Options Database Keys:
1287 $    -mat_aij_no_inode  - Do not use inodes
1288 $    -mat_aij_inode_limit <limit> - Set inode limit.
1289 $        (max limit=5)
1290 $    -mat_aij_oneindex - Internally use indexing starting at 1
1291 $        rather than 0.  Note: When calling MatSetValues(),
1292 $        the user still MUST index entries starting at 0!
1293 
1294    Storage Information:
1295    For a square global matrix we define each processor's diagonal portion
1296    to be its local rows and the corresponding columns (a square submatrix);
1297    each processor's off-diagonal portion encompasses the remainder of the
1298    local matrix (a rectangular submatrix).
1299 
1300    The user can specify preallocated storage for the diagonal part of
1301    the local submatrix with either d_nz or d_nnz (not both).  Set
1302    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1303    memory allocation.  Likewise, specify preallocated storage for the
1304    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1305 
1306    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1307    the figure below we depict these three local rows and all columns (0-11).
1308 
1309 $          0 1 2 3 4 5 6 7 8 9 10 11
1310 $         -------------------
1311 $  row 3  |  o o o d d d o o o o o o
1312 $  row 4  |  o o o d d d o o o o o o
1313 $  row 5  |  o o o d d d o o o o o o
1314 $         -------------------
1315 $
1316 
1317    Thus, any entries in the d locations are stored in the d (diagonal)
1318    submatrix, and any entries in the o locations are stored in the
1319    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1320    stored simply in the MATSEQAIJ format for compressed row storage.
1321 
1322    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1323    and o_nz should indicate the number of nonzeros per row in the o matrix.
1324    In general, for PDE problems in which most nonzeros are near the diagonal,
1325    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1326    or you will get TERRIBLE performance; see the users' manual chapter on
1327    matrices and the file $(PETSC_DIR)/Performance.
1328 
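   Example:
   A minimal usage sketch (illustrative sizes only): on two processors, each
   owning 4 rows of an 8x8 matrix, with room preallocated for roughly 5
   nonzeros per row in the diagonal block and 2 in the off-diagonal block,
$      Mat A;
$      int ierr;
$      ierr = MatCreateMPIAIJ(MPI_COMM_WORLD,4,4,8,8,5,PETSC_NULL,
$                             2,PETSC_NULL,&A); CHKERRQ(ierr);
$      /* insert entries with MatSetValues(), then assemble */
$      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
$      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
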
1329 .keywords: matrix, aij, compressed row, sparse, parallel
1330 
1331 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1332 @*/
1333 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1334                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1335 {
1336   Mat          B;
1337   Mat_MPIAIJ   *b;
1338   int          ierr, i,sum[2],work[2],size;
1339 
1340   *A = 0;
1341   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1342   PLogObjectCreate(B);
1343   B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
1344   PetscMemzero(b,sizeof(Mat_MPIAIJ));
1345   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1346   MPI_Comm_size(comm,&size);
1347   if (size == 1) {
1348     B->ops.getrowij          = MatGetRowIJ_MPIAIJ;
1349     B->ops.restorerowij      = MatRestoreRowIJ_MPIAIJ;
1350     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIAIJ;
1351     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIAIJ;
1352     B->ops.lufactor          = MatLUFactor_MPIAIJ;
1353     B->ops.solve             = MatSolve_MPIAIJ;
1354     B->ops.solveadd          = MatSolveAdd_MPIAIJ;
1355     B->ops.solvetrans        = MatSolveTrans_MPIAIJ;
1356     B->ops.solvetransadd     = MatSolveTransAdd_MPIAIJ;
1357     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIAIJ;
1358   }
1359   B->destroy    = MatDestroy_MPIAIJ;
1360   B->view       = MatView_MPIAIJ;
1361   B->factor     = 0;
1362   B->assembled  = PETSC_FALSE;
1363   B->mapping    = 0;
1364 
1365   b->insertmode = NOT_SET_VALUES;
1366   MPI_Comm_rank(comm,&b->rank);
1367   MPI_Comm_size(comm,&b->size);
1368 
1369   if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1370     SETERRQ(1,"MatCreateMPIAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1371 
1372   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1373     work[0] = m; work[1] = n;
1374     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1375     if (M == PETSC_DECIDE) M = sum[0];
1376     if (N == PETSC_DECIDE) N = sum[1];
1377   }
1378   if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
1379   if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
1380   b->m = m; B->m = m;
1381   b->n = n; B->n = n;
1382   b->N = N; B->N = N;
1383   b->M = M; B->M = M;
1384 
1385   /* build local table of row and column ownerships */
1386   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1387   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1388   b->cowners = b->rowners + b->size + 2;
1389   MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1390   b->rowners[0] = 0;
1391   for ( i=2; i<=b->size; i++ ) {
1392     b->rowners[i] += b->rowners[i-1];
1393   }
1394   b->rstart = b->rowners[b->rank];
1395   b->rend   = b->rowners[b->rank+1];
1396   MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1397   b->cowners[0] = 0;
1398   for ( i=2; i<=b->size; i++ ) {
1399     b->cowners[i] += b->cowners[i-1];
1400   }
1401   b->cstart = b->cowners[b->rank];
1402   b->cend   = b->cowners[b->rank+1];
1403 
1404   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1405   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1406   PLogObjectParent(B,b->A);
1407   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1408   ierr = MatCreateSeqAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1409   PLogObjectParent(B,b->B);
1410 
1411   /* build cache for off array entries formed */
1412   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1413   b->donotstash  = 0;
1414   b->colmap      = 0;
1415   b->garray      = 0;
1416   b->roworiented = 1;
1417 
1418   /* stuff used for matrix vector multiply */
1419   b->lvec      = 0;
1420   b->Mvctx     = 0;
1421 
1422   /* stuff for MatGetRow() */
1423   b->rowindices   = 0;
1424   b->rowvalues    = 0;
1425   b->getrowactive = PETSC_FALSE;
1426 
1427   *A = B;
1428   return 0;
1429 }
1430 
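/*
   Duplicates a parallel AIJ matrix: copies the ownership tables, column map,
   garray, local work vector, scatter context and the two sequential blocks
   of matin into a freshly created matrix.
*/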
1431 static int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1432 {
1433   Mat        mat;
1434   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1435   int        ierr, len=0, flg;
1436 
1437   *newmat       = 0;
1438   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1439   PLogObjectCreate(mat);
1440   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1441   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1442   mat->destroy    = MatDestroy_MPIAIJ;
1443   mat->view       = MatView_MPIAIJ;
1444   mat->factor     = matin->factor;
1445   mat->assembled  = PETSC_TRUE;
1446 
1447   a->m = mat->m   = oldmat->m;
1448   a->n = mat->n   = oldmat->n;
1449   a->M = mat->M   = oldmat->M;
1450   a->N = mat->N   = oldmat->N;
1451 
1452   a->rstart       = oldmat->rstart;
1453   a->rend         = oldmat->rend;
1454   a->cstart       = oldmat->cstart;
1455   a->cend         = oldmat->cend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  a->insertmode   = NOT_SET_VALUES;
  /* copy flags that would otherwise be left uninitialized, since PetscNew()
     does not zero the structure */
  a->donotstash   = oldmat->donotstash;
  a->roworiented  = oldmat->roworiented;
  a->rowvalues    = 0;
  a->rowindices   = 0;
  a->getrowactive = PETSC_FALSE;
1461 
1462   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1463   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIAIJ));
1464   a->cowners = a->rowners + a->size + 2;
1465   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1466   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1467   if (oldmat->colmap) {
1468     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1469     PLogObjectMemory(mat,(a->N)*sizeof(int));
1470     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1471   } else a->colmap = 0;
1472   if (oldmat->garray) {
1473     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1474     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1475     PLogObjectMemory(mat,len*sizeof(int));
1476     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1477   } else a->garray = 0;
1478 
1479   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1480   PLogObjectParent(mat,a->lvec);
1481   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1482   PLogObjectParent(mat,a->Mvctx);
1483   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1484   PLogObjectParent(mat,a->A);
1485   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1486   PLogObjectParent(mat,a->B);
1487   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1488   if (flg) {
1489     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1490   }
1491   *newmat = mat;
1492   return 0;
1493 }
1494 
1495 #include "sys.h"
1496 
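/*
   Loads a parallel AIJ matrix from a PETSc binary viewer.  Process 0 reads
   the header, the row lengths, the column indices and the numerical values
   from the file; every other process is sent only the rows it owns.
   Preallocation for the diagonal and off-diagonal blocks is computed from
   the row lengths before the values are inserted with MatSetValues().
*/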
1497 int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
1498 {
1499   Mat          A;
1500   int          i, nz, ierr, j,rstart, rend, fd;
1501   Scalar       *vals,*svals;
1502   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1503   MPI_Status   status;
1504   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
1505   int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
1506   int          tag = ((PetscObject)viewer)->tag;
1507 
1508   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1509   if (!rank) {
1510     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1511     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1512     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIAIJ:not matrix object");
1513   }
1514 
1515   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1516   M = header[1]; N = header[2];
1517   /* determine ownership of all rows */
1518   m = M/size + ((M % size) > rank);
1519   rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
1520   MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1521   rowners[0] = 0;
1522   for ( i=2; i<=size; i++ ) {
1523     rowners[i] += rowners[i-1];
1524   }
1525   rstart = rowners[rank];
1526   rend   = rowners[rank+1];
1527 
1528   /* distribute row lengths to all processors */
1529   ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
1530   offlens = ourlens + (rend-rstart);
1531   if (!rank) {
1532     rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
1533     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1534     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1535     for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
1536     MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);
1537     PetscFree(sndcounts);
1538   }
1539   else {
1540     MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);
1541   }
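  /* at this point every process has, in ourlens[], the length of each of
     its locally owned rows */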
1542 
1543   if (!rank) {
1544     /* calculate the number of nonzeros on each processor */
1545     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1546     PetscMemzero(procsnz,size*sizeof(int));
1547     for ( i=0; i<size; i++ ) {
1548       for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
1549         procsnz[i] += rowlengths[j];
1550       }
1551     }
1552     PetscFree(rowlengths);
1553 
1554     /* determine max buffer needed and allocate it */
1555     maxnz = 0;
1556     for ( i=0; i<size; i++ ) {
1557       maxnz = PetscMax(maxnz,procsnz[i]);
1558     }
1559     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1560 
1561     /* read in my part of the matrix column indices  */
1562     nz = procsnz[0];
1563     mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1564     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1565 
    /* read in everyone else's part and ship it off */
1567     for ( i=1; i<size; i++ ) {
1568       nz = procsnz[i];
1569       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1570       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1571     }
1572     PetscFree(cols);
1573   }
1574   else {
1575     /* determine buffer space needed for message */
1576     nz = 0;
1577     for ( i=0; i<m; i++ ) {
1578       nz += ourlens[i];
1579     }
1580     mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
1581 
    /* receive message of column indices */
1583     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1584     MPI_Get_count(&status,MPI_INT,&maxnz);
1585     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1586   }
1587 
  /* loop over local rows, determining the number of off-diagonal entries in
     each row (used below as the o_nnz preallocation) */
1589   PetscMemzero(offlens,m*sizeof(int));
1590   jj = 0;
1591   for ( i=0; i<m; i++ ) {
1592     for ( j=0; j<ourlens[i]; j++ ) {
1593       if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
1594       jj++;
1595     }
1596   }
1597 
  /* create our matrix: temporarily reduce ourlens[] to the diagonal-block lengths
     so it serves as d_nnz; the full lengths are restored below for MatSetValues() */
1599   for ( i=0; i<m; i++ ) {
1600     ourlens[i] -= offlens[i];
1601   }
1602   ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
1603   A = *newmat;
1604   MatSetOption(A,MAT_COLUMNS_SORTED);
1605   for ( i=0; i<m; i++ ) {
1606     ourlens[i] += offlens[i];
1607   }
1608 
1609   if (!rank) {
1610     vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);
1611 
1612     /* read in my part of the matrix numerical values  */
1613     nz = procsnz[0];
1614     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1615 
1616     /* insert into matrix */
1617     jj      = rstart;
1618     smycols = mycols;
1619     svals   = vals;
1620     for ( i=0; i<m; i++ ) {
1621       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1622       smycols += ourlens[i];
1623       svals   += ourlens[i];
1624       jj++;
1625     }
1626 
1627     /* read in other processors and ship out */
1628     for ( i=1; i<size; i++ ) {
1629       nz = procsnz[i];
1630       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1631       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1632     }
1633     PetscFree(procsnz);
1634   }
1635   else {
1636     /* receive numeric values */
1637     vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);
1638 
    /* receive message of values */
1640     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1641     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1642     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIAIJ:something is wrong with file");
1643 
1644     /* insert into matrix */
1645     jj      = rstart;
1646     smycols = mycols;
1647     svals   = vals;
1648     for ( i=0; i<m; i++ ) {
1649       ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
1650       smycols += ourlens[i];
1651       svals   += ourlens[i];
1652       jj++;
1653     }
1654   }
1655   PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);
1656 
1657   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1658   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1659   return 0;
1660 }
1661