xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision 44bdcb2d3326b2ce09a36bf9b6840bc8773a5d60)
1 
2 #ifdef PETSC_RCS_HEADER
3 static char vcid[] = "$Id: mpiaij.c,v 1.248 1998/05/29 18:15:58 bsmith Exp balay $";
4 #endif
5 
6 #include "pinclude/pviewer.h"
7 #include "src/mat/impls/aij/mpi/mpiaij.h"
8 #include "src/vec/vecimpl.h"
9 #include "src/inline/spops.h"
10 
11 /* local utility routine that creates a mapping from the global column
12 number to the local number in the off-diagonal part of the local
storage of the matrix.  This is done in a non-scalable way since the
14 length of colmap equals the global matrix length.
15 */
16 #undef __FUNC__
17 #define __FUNC__ "CreateColmap_MPIAIJ_Private"
18 int CreateColmap_MPIAIJ_Private(Mat mat)
19 {
20   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
21   Mat_SeqAIJ *B = (Mat_SeqAIJ*) aij->B->data;
22   int        n = B->n,i;
23 
24   PetscFunctionBegin;
25   aij->colmap = (int *) PetscMalloc((aij->N+1)*sizeof(int));CHKPTRQ(aij->colmap);
26   PLogObjectMemory(mat,aij->N*sizeof(int));
27   PetscMemzero(aij->colmap,aij->N*sizeof(int));
28   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
29   PetscFunctionReturn(0);
30 }
31 
32 extern int DisAssemble_MPIAIJ(Mat);
33 
#define CHUNKSIZE   15
/*
   MatSetValues_SeqAIJ_A_Private(row,col,value,addv) -

   Macro-inlined insertion of a single (row,col,value) entry into the
   "diagonal" sequential block aij->A, expanded inside MatSetValues_MPIAIJ()
   to avoid a function call per entry.  Relies on the following locals being
   declared at the expansion site: rp,ap,rmax,nrow,col1,low,high,t,_i,ii,N
   (scratch) and the cached fields a,A,aa,ai,aj,aimax,ailen,nonew,shift of
   the Mat_SeqAIJ block.

   If the entry already exists it is added to or overwritten according to
   addv.  Otherwise a new nonzero is inserted: when the row is full the whole
   matrix storage is reallocated with CHUNKSIZE extra slots for this row,
   unless a->nonew disallows new nonzeros (nonew == 1 silently drops the
   value, nonew == -1/-2 raise an error).

   NOTE(review): the binary search below narrows [low,high) but its result is
   never used -- the linear scan restarts at _i = 0; also the search compares
   against the unshifted "col" while the scan uses "col1" = col - shift.
   Harmless when shift == 0; confirm for shift == -1 (Fortran-style indexing).
*/
#define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
{ \
 \
    rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
    rmax = aimax[row]; nrow = ailen[row];  \
    col1 = col - shift; \
     \
    low = 0; high = nrow; \
    while (high-low > 5) { \
      t = (low+high)/2; \
      if (rp[t] > col) high = t; \
      else             low  = t; \
    } \
      for ( _i=0; _i<nrow; _i++ ) { \
        if (rp[_i] > col1) break; \
        if (rp[_i] == col1) { \
          if (addv == ADD_VALUES) ap[_i] += value;   \
          else                  ap[_i] = value; \
          goto a_noinsert; \
        } \
      }  \
      if (nonew == 1) goto a_noinsert; \
      else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = ai[a->m] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+sizeof(Scalar))+(a->m+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = ai[ii];} \
        for ( ii=row+1; ii<a->m+1; ii++ ) {new_i[ii] = ai[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,aj,(ai[row]+nrow+shift)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - ai[row] - nrow - shift); \
        PetscMemcpy(new_j+ai[row]+shift+nrow+CHUNKSIZE,aj+ai[row]+shift+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,aa,(ai[row]+nrow+shift)*sizeof(Scalar)); \
        PetscMemcpy(new_a+ai[row]+shift+nrow+CHUNKSIZE,aa+ai[row]+shift+nrow, \
                                                           len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
 \
        PetscFree(a->a);  \
        if (!a->singlemalloc) {PetscFree(a->i);PetscFree(a->j);} \
        aa = a->a = new_a; ai = a->i = new_i; aj = a->j = new_j;  \
        a->singlemalloc = 1; \
 \
        rp   = aj + ai[row] + shift; ap = aa + ai[row] + shift; \
        rmax = aimax[row] = aimax[row] + CHUNKSIZE; \
        PLogObjectMemory(A,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
        a->maxnz += CHUNKSIZE; \
        a->reallocs++; \
      } \
      N = nrow++ - 1; a->nz++; \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        ap[ii+1] = ap[ii]; \
      } \
      rp[_i] = col1;  \
      ap[_i] = value;  \
      a_noinsert: ; \
      ailen[row] = nrow; \
}
105 
/*
   MatSetValues_SeqAIJ_B_Private(row,col,value,addv) -

   Identical in structure to MatSetValues_SeqAIJ_A_Private() above, but
   operates on the "off-diagonal" sequential block aij->B through the cached
   locals b,B,ba,bi,bj,bimax,bilen.  See the notes on the A variant for the
   scratch variables required at the expansion site and for the review note
   about the unused binary search.
*/
#define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
{ \
 \
    rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
    rmax = bimax[row]; nrow = bilen[row];  \
    col1 = col - shift; \
     \
    low = 0; high = nrow; \
    while (high-low > 5) { \
      t = (low+high)/2; \
      if (rp[t] > col) high = t; \
      else             low  = t; \
    } \
       for ( _i=0; _i<nrow; _i++ ) { \
        if (rp[_i] > col1) break; \
        if (rp[_i] == col1) { \
          if (addv == ADD_VALUES) ap[_i] += value;   \
          else                  ap[_i] = value; \
          goto b_noinsert; \
        } \
      }  \
      if (nonew == 1) goto b_noinsert; \
      else if (nonew == -1) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero into matrix"); \
      if (nrow >= rmax) { \
        /* there is no extra room in row, therefore enlarge */ \
        int    new_nz = bi[b->m] + CHUNKSIZE,len,*new_i,*new_j; \
        Scalar *new_a; \
 \
        if (nonew == -2) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Inserting a new nonzero in the matrix"); \
 \
        /* malloc new storage space */ \
        len     = new_nz*(sizeof(int)+sizeof(Scalar))+(b->m+1)*sizeof(int); \
        new_a   = (Scalar *) PetscMalloc( len ); CHKPTRQ(new_a); \
        new_j   = (int *) (new_a + new_nz); \
        new_i   = new_j + new_nz; \
 \
        /* copy over old data into new slots */ \
        for ( ii=0; ii<row+1; ii++ ) {new_i[ii] = bi[ii];} \
        for ( ii=row+1; ii<b->m+1; ii++ ) {new_i[ii] = bi[ii]+CHUNKSIZE;} \
        PetscMemcpy(new_j,bj,(bi[row]+nrow+shift)*sizeof(int)); \
        len = (new_nz - CHUNKSIZE - bi[row] - nrow - shift); \
        PetscMemcpy(new_j+bi[row]+shift+nrow+CHUNKSIZE,bj+bi[row]+shift+nrow, \
                                                           len*sizeof(int)); \
        PetscMemcpy(new_a,ba,(bi[row]+nrow+shift)*sizeof(Scalar)); \
        PetscMemcpy(new_a+bi[row]+shift+nrow+CHUNKSIZE,ba+bi[row]+shift+nrow, \
                                                           len*sizeof(Scalar));  \
        /* free up old matrix storage */ \
 \
        PetscFree(b->a);  \
        if (!b->singlemalloc) {PetscFree(b->i);PetscFree(b->j);} \
        ba = b->a = new_a; bi = b->i = new_i; bj = b->j = new_j;  \
        b->singlemalloc = 1; \
 \
        rp   = bj + bi[row] + shift; ap = ba + bi[row] + shift; \
        rmax = bimax[row] = bimax[row] + CHUNKSIZE; \
        PLogObjectMemory(B,CHUNKSIZE*(sizeof(int) + sizeof(Scalar))); \
        b->maxnz += CHUNKSIZE; \
        b->reallocs++; \
      } \
      N = nrow++ - 1; b->nz++; \
      /* shift up all the later entries in this row */ \
      for ( ii=N; ii>=_i; ii-- ) { \
        rp[ii+1] = rp[ii]; \
        ap[ii+1] = ap[ii]; \
      } \
      rp[_i] = col1;  \
      ap[_i] = value;  \
      b_noinsert: ; \
      bilen[row] = nrow; \
}
176 
extern int MatSetValues_SeqAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
#undef __FUNC__
#define __FUNC__ "MatSetValues_MPIAIJ"
/*
   MatSetValues_MPIAIJ - Inserts or adds the m x n logically dense block of
   values v at global rows im[] and global columns in[] of the parallel matrix.

   Locally owned rows are inserted immediately into the sequential "diagonal"
   block (aij->A) or "off-diagonal" block (aij->B) via the inlined macros
   above; rows owned by other processors are stashed and communicated during
   MatAssemblyBegin/End_MPIAIJ() (unless aij->donotstash is set, in which
   case they are silently dropped).
*/
int MatSetValues_MPIAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Scalar     value;
  int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
  int        cstart = aij->cstart, cend = aij->cend,row,col;
  int        roworiented = aij->roworiented;

  /* Some Variables required in the macro */
  Mat        A = aij->A;
  Mat_SeqAIJ *a = (Mat_SeqAIJ *) A->data;
  int        *aimax = a->imax, *ai = a->i, *ailen = a->ilen,*aj = a->j;
  Scalar     *aa = a->a;

  Mat        B = aij->B;
  Mat_SeqAIJ *b = (Mat_SeqAIJ *) B->data;
  int        *bimax = b->imax, *bi = b->i, *bilen = b->ilen,*bj = b->j;
  Scalar     *ba = b->a;

  int        *rp,ii,nrow,_i,rmax, N, col1,low,high,t;
  int        nonew = a->nonew,shift = a->indexshift;
  Scalar     *ap;

  PetscFunctionBegin;
  for ( i=0; i<m; i++ ) {
#if defined(USE_PETSC_BOPT_g)
    if (im[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
    if (im[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
#endif
    if (im[i] >= rstart && im[i] < rend) {
      /* this processor owns the row: insert directly */
      row = im[i] - rstart;
      for ( j=0; j<n; j++ ) {
        if (in[j] >= cstart && in[j] < cend){
          /* column falls in the diagonal block */
          col = in[j] - cstart;
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqAIJ(aij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
#if defined(USE_PETSC_BOPT_g)
        else if (in[j] < 0) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");}
        else if (in[j] >= aij->N) {SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");}
#endif
        else {
          /* column falls in the off-diagonal block */
          if (mat->was_assembled) {
            /* after assembly B's columns are stored in compressed (local)
               numbering; translate through colmap (entries are local+1,
               0 meaning "column not present") */
            if (!aij->colmap) {
              ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
            }
            col = aij->colmap[in[j]] - 1;
            if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
              /* previously unseen column: expand B back to global numbering */
              ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
              col =  in[j];
              /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
              B = aij->B;
              b = (Mat_SeqAIJ *) B->data;
              bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
              ba = b->a;
            }
          } else col = in[j];
          if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
          MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
          /* ierr = MatSetValues_SeqAIJ(aij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr); */
        }
      }
    }
    else {
      /* off-processor row: stash the values for communication at assembly */
      if (roworiented && !aij->donotstash) {
        ierr = StashValues_Private(&aij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
      }
      else {
        if (!aij->donotstash) {
          row = im[i];
          for ( j=0; j<n; j++ ) {
            ierr = StashValues_Private(&aij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
          }
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
260 
261 #undef __FUNC__
262 #define __FUNC__ "MatGetValues_MPIAIJ"
263 int MatGetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
264 {
265   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
266   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
267   int        cstart = aij->cstart, cend = aij->cend,row,col;
268 
269   PetscFunctionBegin;
270   for ( i=0; i<m; i++ ) {
271     if (idxm[i] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative row");
272     if (idxm[i] >= aij->M) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Row too large");
273     if (idxm[i] >= rstart && idxm[i] < rend) {
274       row = idxm[i] - rstart;
275       for ( j=0; j<n; j++ ) {
276         if (idxn[j] < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Negative column");
277         if (idxn[j] >= aij->N) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Column too large");
278         if (idxn[j] >= cstart && idxn[j] < cend){
279           col = idxn[j] - cstart;
280           ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
281         }
282         else {
283           if (!aij->colmap) {
284             ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
285           }
286           col = aij->colmap[idxn[j]] - 1;
287           if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
288           else {
289             ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
290           }
291         }
292       }
293     } else {
294       SETERRQ(PETSC_ERR_SUP,0,"Only local values currently supported");
295     }
296   }
297   PetscFunctionReturn(0);
298 }
299 
#undef __FUNC__
#define __FUNC__ "MatAssemblyBegin_MPIAIJ"
/*
   MatAssemblyBegin_MPIAIJ - Begins shipping the stashed off-processor values
   to their owning processors.

   Each stashed entry travels as a triple (global row, global column, value);
   the two indices are stored as Scalars so the whole message is one
   MPIU_SCALAR buffer.  The sends/receives posted here are completed in
   MatAssemblyEnd_MPIAIJ(), which reads the state saved in aij at the end of
   this routine.
*/
int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = aij->size, *owners = aij->rowners;
  int         rank = aij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  PetscFunctionBegin;
  if (aij->donotstash) {
    /* nothing was stashed: record an empty communication state */
    aij->svalues    = 0; aij->rvalues    = 0;
    aij->nsends     = 0; aij->nrecvs     = 0;
    aij->send_waits = 0; aij->recv_waits = 0;
    aij->rmax       = 0;
    PetscFunctionReturn(0);
  }

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (aij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    /* locate the processor owning this global row */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  ierr = MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
  nreceives = work[rank];
  ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues    = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    ierr = MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues    = (Scalar *) PetscMalloc(3*(aij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
  starts     = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0]  = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the stash into per-destination contiguous triples; note the pack
     loop advances starts[], so it is rebuilt before the sends below */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  PetscFree(owner);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      ierr = MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  PetscFree(starts);
  PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(aij->A,"MatAssemblyBegin_MPIAIJ:Number of off-processor values %d\n",aij->stash.n);
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);

  /* save the communication state for MatAssemblyEnd_MPIAIJ() */
  aij->svalues    = svalues;    aij->rvalues    = rvalues;
  aij->nsends     = nsends;     aij->nrecvs     = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  PetscFunctionReturn(0);
}
406 extern int MatSetUpMultiply_MPIAIJ(Mat);
407 
#undef __FUNC__
#define __FUNC__ "MatAssemblyEnd_MPIAIJ"
/*
   MatAssemblyEnd_MPIAIJ - Completes the communication begun in
   MatAssemblyBegin_MPIAIJ(): waits for the incoming (row,col,value) triples,
   inserts them locally, then assembles the two sequential blocks and sets up
   the scatter needed for matrix-vector products.
*/
int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n, ierr;
  int         row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = mat->insertmode;

  PetscFunctionBegin;

  /*  wait on receives */
  while (count) {
    ierr = MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    values = aij->rvalues + 3*imdex*aij->rmax;
    ierr = MPI_Get_count(&recv_status,MPIU_SCALAR,&n);CHKERRQ(ierr);
    n = n/3;  /* entries arrive as (row,col,value) triples */
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - aij->rstart;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= aij->cstart && col < aij->cend) {
        /* diagonal block */
        col -= aij->cstart;
        ierr = MatSetValues(aij->A,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      } else {
        /* off-diagonal block; after assembly the column must be mapped to
           B's compressed numbering via colmap */
        if (mat->was_assembled || mat->assembled) {
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat); CHKERRQ(ierr);
          }
          col = aij->colmap[col] - 1;
          if (col < 0  && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
            /* previously unseen column: expand B back to global numbering */
            ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        ierr = MatSetValues(aij->B,1,&row,1,&col,&val,addv); CHKERRQ(ierr);
      }
    }
    count--;
  }
  if (aij->recv_waits) PetscFree(aij->recv_waits);
  if (aij->rvalues)    PetscFree(aij->rvalues);

  /* wait on sends */
  if (aij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(aij->nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
    ierr        = MPI_Waitall(aij->nsends,aij->send_waits,send_status);CHKERRQ(ierr);
    PetscFree(send_status);
  }
  if (aij->send_waits) PetscFree(aij->send_waits);
  if (aij->svalues)    PetscFree(aij->svalues);

  ierr = MatAssemblyBegin(aij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselves, in order that we may reassemble. */
  /*
     if nonzero structure of submatrix B cannot change then we know that
     no processor disassembled thus we can skip this stuff
  */
  if (!((Mat_SeqAIJ*) aij->B->data)->nonew)  {
    ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
    if (mat->was_assembled && !other_disassembled) {
      ierr = DisAssemble_MPIAIJ(mat); CHKERRQ(ierr);
    }
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    /* first final assembly: build colmap/garray and the Mvctx scatter */
    ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(aij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->B,mode); CHKERRQ(ierr);

  /* any cached row (MatGetRow) workspace is now stale */
  if (aij->rowvalues) {PetscFree(aij->rowvalues); aij->rowvalues = 0;}
  PetscFunctionReturn(0);
}
488 
489 #undef __FUNC__
490 #define __FUNC__ "MatZeroEntries_MPIAIJ"
491 int MatZeroEntries_MPIAIJ(Mat A)
492 {
493   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
494   int        ierr;
495 
496   PetscFunctionBegin;
497   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
498   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
499   PetscFunctionReturn(0);
500 }
501 
502 #undef __FUNC__
503 #define __FUNC__ "MatZeroRows_MPIAIJ"
504 int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
505 {
506   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
507   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
508   int            *procs,*nprocs,j,found,idx,nsends,*work,row;
509   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
510   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
511   int            *lens,imdex,*lrows,*values,rstart=l->rstart;
512   MPI_Comm       comm = A->comm;
513   MPI_Request    *send_waits,*recv_waits;
514   MPI_Status     recv_status,*send_status;
515   IS             istmp;
516   PetscTruth     localdiag;
517 
518   PetscFunctionBegin;
519   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
520   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
521 
522   /*  first count number of contributors to each processor */
523   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
524   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
525   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
526   for ( i=0; i<N; i++ ) {
527     idx = rows[i];
528     found = 0;
529     for ( j=0; j<size; j++ ) {
530       if (idx >= owners[j] && idx < owners[j+1]) {
531         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
532       }
533     }
534     if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Index out of range");
535   }
536   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
537 
538   /* inform other processors of number of messages and max length*/
539   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
540   ierr = MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
541   nrecvs = work[rank];
542   ierr = MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);CHKERRQ(ierr);
543   nmax = work[rank];
544   PetscFree(work);
545 
546   /* post receives:   */
547   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int));CHKPTRQ(rvalues);
548   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));CHKPTRQ(recv_waits);
549   for ( i=0; i<nrecvs; i++ ) {
550     ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
551   }
552 
553   /* do sends:
554       1) starts[i] gives the starting index in svalues for stuff going to
555          the ith processor
556   */
557   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
558   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));CHKPTRQ(send_waits);
559   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
560   starts[0] = 0;
561   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
562   for ( i=0; i<N; i++ ) {
563     svalues[starts[owner[i]]++] = rows[i];
564   }
565   ISRestoreIndices(is,&rows);
566 
567   starts[0] = 0;
568   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
569   count = 0;
570   for ( i=0; i<size; i++ ) {
571     if (procs[i]) {
572       ierr = MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
573     }
574   }
575   PetscFree(starts);
576 
577   base = owners[rank];
578 
579   /*  wait on receives */
580   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
581   source = lens + nrecvs;
582   count  = nrecvs; slen = 0;
583   while (count) {
584     ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
585     /* unpack receives into our local space */
586     ierr = MPI_Get_count(&recv_status,MPI_INT,&n);CHKERRQ(ierr);
587     source[imdex]  = recv_status.MPI_SOURCE;
588     lens[imdex]  = n;
589     slen += n;
590     count--;
591   }
592   PetscFree(recv_waits);
593 
594   /* move the data into the send scatter */
595   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
596   count = 0;
597   for ( i=0; i<nrecvs; i++ ) {
598     values = rvalues + i*nmax;
599     for ( j=0; j<lens[i]; j++ ) {
600       lrows[count++] = values[j] - base;
601     }
602   }
603   PetscFree(rvalues); PetscFree(lens);
604   PetscFree(owner); PetscFree(nprocs);
605 
606   /* actually zap the local rows */
607   ierr = ISCreateGeneral(PETSC_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
608   PLogObjectParent(A,istmp);
609 
610   /*
611         Zero the required rows. If the "diagonal block" of the matrix
612      is square and the user wishes to set the diagonal we use seperate
613      code so that MatSetValues() is not called for each diagonal allocating
614      new memory, thus calling lots of mallocs and slowing things down.
615 
616        Contributed by: Mathew Knepley
617   */
618   localdiag = PETSC_FALSE;
619   if (diag && (l->A->M == l->A->N)) {
620     localdiag = PETSC_TRUE;
621     ierr      = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
622   } else {
623     ierr = MatZeroRows(l->A,istmp,0); CHKERRQ(ierr);
624   }
625   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
626   ierr = ISDestroy(istmp); CHKERRQ(ierr);
627 
628   if (diag && (localdiag == PETSC_FALSE)) {
629     for ( i = 0; i < slen; i++ ) {
630       row = lrows[i] + rstart;
631       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
632     }
633     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
634     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
635   }
636 
637   if (diag) {
638     for ( i = 0; i < slen; i++ ) {
639       row = lrows[i] + rstart;
640       MatSetValues(A,1,&row,1,&row,diag,INSERT_VALUES);
641     }
642     MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);
643     MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);
644   }
645   PetscFree(lrows);
646 
647   /* wait on sends */
648   if (nsends) {
649     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));CHKPTRQ(send_status);
650     ierr        = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
651     PetscFree(send_status);
652   }
653   PetscFree(send_waits); PetscFree(svalues);
654 
655   PetscFunctionReturn(0);
656 }
657 
658 #undef __FUNC__
659 #define __FUNC__ "MatMult_MPIAIJ"
660 int MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
661 {
662   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
663   int        ierr,nt;
664 
665   PetscFunctionBegin;
666   ierr = VecGetLocalSize(xx,&nt);  CHKERRQ(ierr);
667   if (nt != a->n) {
668     SETERRQ(PETSC_ERR_ARG_SIZ,0,"Incompatible partition of A and xx");
669   }
670   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
671   ierr = (*a->A->ops->mult)(a->A,xx,yy); CHKERRQ(ierr);
672   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx); CHKERRQ(ierr);
673   ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
674   PetscFunctionReturn(0);
675 }
676 
677 #undef __FUNC__
678 #define __FUNC__ "MatMultAdd_MPIAIJ"
679 int MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
680 {
681   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
682   int        ierr;
683 
684   PetscFunctionBegin;
685   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
686   ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
687   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD,a->Mvctx);CHKERRQ(ierr);
688   ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
689   PetscFunctionReturn(0);
690 }
691 
692 #undef __FUNC__
693 #define __FUNC__ "MatMultTrans_MPIAIJ"
694 int MatMultTrans_MPIAIJ(Mat A,Vec xx,Vec yy)
695 {
696   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
697   int        ierr;
698 
699   PetscFunctionBegin;
700   /* do nondiagonal part */
701   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
702   /* send it on its way */
703   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
704   /* do local part */
705   ierr = (*a->A->ops->multtrans)(a->A,xx,yy); CHKERRQ(ierr);
706   /* receive remote parts: note this assumes the values are not actually */
707   /* inserted in yy until the next line, which is true for my implementation*/
708   /* but is not perhaps always true. */
709   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
710   PetscFunctionReturn(0);
711 }
712 
713 #undef __FUNC__
714 #define __FUNC__ "MatMultTransAdd_MPIAIJ"
715 int MatMultTransAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
716 {
717   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
718   int        ierr;
719 
720   PetscFunctionBegin;
721   /* do nondiagonal part */
722   ierr = (*a->B->ops->multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
723   /* send it on its way */
724   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
725   /* do local part */
726   ierr = (*a->A->ops->multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
727   /* receive remote parts: note this assumes the values are not actually */
728   /* inserted in yy until the next line, which is true for my implementation*/
729   /* but is not perhaps always true. */
730   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
731   PetscFunctionReturn(0);
732 }
733 
734 /*
735   This only works correctly for square matrices where the subblock A->A is the
736    diagonal block
737 */
738 #undef __FUNC__
739 #define __FUNC__ "MatGetDiagonal_MPIAIJ"
740 int MatGetDiagonal_MPIAIJ(Mat A,Vec v)
741 {
742   int        ierr;
743   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
744 
745   PetscFunctionBegin;
746   if (a->M != a->N) SETERRQ(PETSC_ERR_SUP,0,"Supports only square matrix where A->A is diag block");
747   if (a->rstart != a->cstart || a->rend != a->cend) {
748     SETERRQ(PETSC_ERR_ARG_SIZ,0,"row partition must equal col partition");
749   }
750   ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
751   PetscFunctionReturn(0);
752 }
753 
754 #undef __FUNC__
755 #define __FUNC__ "MatScale_MPIAIJ"
756 int MatScale_MPIAIJ(Scalar *aa,Mat A)
757 {
758   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
759   int        ierr;
760 
761   PetscFunctionBegin;
762   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
763   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
764   PetscFunctionReturn(0);
765 }
766 
#undef __FUNC__
#define __FUNC__ "MatDestroy_MPIAIJ"
/*
   MatDestroy_MPIAIJ - Decrements the reference count and, on the last
   reference, frees every object and array owned by the parallel matrix
   before destroying the PETSc header itself.
*/
int MatDestroy_MPIAIJ(Mat mat)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;

  PetscFunctionBegin;
  if (--mat->refct > 0) PetscFunctionReturn(0);  /* still referenced elsewhere */

  if (mat->mapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
  }
  if (mat->bmapping) {
    ierr = ISLocalToGlobalMappingDestroy(mat->bmapping); CHKERRQ(ierr);
  }
#if defined(USE_PETSC_LOG)
  PLogObjectState((PetscObject)mat,"Rows=%d, Cols=%d",aij->M,aij->N);
#endif
  /* release the stash, the two sequential blocks, and all cached arrays */
  ierr = StashDestroy_Private(&aij->stash); CHKERRQ(ierr);
  PetscFree(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERRQ(ierr);
  ierr = MatDestroy(aij->B); CHKERRQ(ierr);
  if (aij->colmap) PetscFree(aij->colmap);
  if (aij->garray) PetscFree(aij->garray);
  if (aij->lvec)   VecDestroy(aij->lvec);
  if (aij->Mvctx)  VecScatterDestroy(aij->Mvctx);
  if (aij->rowvalues) PetscFree(aij->rowvalues);
  PetscFree(aij);
  PLogObjectDestroy(mat);
  PetscHeaderDestroy(mat);
  PetscFunctionReturn(0);
}
800 
801 #undef __FUNC__
802 #define __FUNC__ "MatView_MPIAIJ_Binary"
803 extern int MatView_MPIAIJ_Binary(Mat mat,Viewer viewer)
804 {
805   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
806   int         ierr;
807 
808   PetscFunctionBegin;
809   if (aij->size == 1) {
810     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
811   }
812   else SETERRQ(PETSC_ERR_SUP,0,"Only uniprocessor output supported");
813   PetscFunctionReturn(0);
814 }
815 
816 #undef __FUNC__
817 #define __FUNC__ "MatView_MPIAIJ_ASCIIorDraworMatlab"
818 extern int MatView_MPIAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
819 {
820   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
821   Mat_SeqAIJ* C = (Mat_SeqAIJ*)aij->A->data;
822   int         ierr, format,shift = C->indexshift,rank;
823   FILE        *fd;
824   ViewerType  vtype;
825 
826   PetscFunctionBegin;
827   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
828   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
829     ierr = ViewerGetFormat(viewer,&format);
830     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
831       MatInfo info;
832       int     flg;
833       MPI_Comm_rank(mat->comm,&rank);
834       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
835       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
836       ierr = OptionsHasName(PETSC_NULL,"-mat_aij_no_inode",&flg); CHKERRQ(ierr);
837       PetscSequentialPhaseBegin(mat->comm,1);
838       if (flg) fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, not using I-node routines\n",
839          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
840       else fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d mem %d, using I-node routines\n",
841          rank,aij->m,(int)info.nz_used,(int)info.nz_allocated,(int)info.memory);
842       ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);
843       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used);
844       ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);
845       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used);
846       fflush(fd);
847       PetscSequentialPhaseEnd(mat->comm,1);
848       ierr = VecScatterView(aij->Mvctx,viewer); CHKERRQ(ierr);
849       PetscFunctionReturn(0);
850     } else if (format == VIEWER_FORMAT_ASCII_INFO) {
851       PetscFunctionReturn(0);
852     }
853   }
854 
855   if (vtype == DRAW_VIEWER) {
856     Draw       draw;
857     PetscTruth isnull;
858     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
859     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
860   }
861 
862   if (vtype == ASCII_FILE_VIEWER) {
863     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
864     PetscSequentialPhaseBegin(mat->comm,1);
865     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
866            aij->rank,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
867            aij->cend);
868     ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
869     ierr = MatView(aij->B,viewer); CHKERRQ(ierr);
870     fflush(fd);
871     PetscSequentialPhaseEnd(mat->comm,1);
872   } else {
873     int size = aij->size;
874     rank = aij->rank;
875     if (size == 1) {
876       ierr = MatView(aij->A,viewer); CHKERRQ(ierr);
877     } else {
878       /* assemble the entire matrix onto first processor. */
879       Mat         A;
880       Mat_SeqAIJ *Aloc;
881       int         M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
882       Scalar      *a;
883 
884       if (!rank) {
885         ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
886       } else {
887         ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);CHKERRQ(ierr);
888       }
889       PLogObjectParent(mat,A);
890 
891       /* copy over the A part */
892       Aloc = (Mat_SeqAIJ*) aij->A->data;
893       m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
894       row = aij->rstart;
895       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += aij->cstart + shift;}
896       for ( i=0; i<m; i++ ) {
897         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
898         row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
899       }
900       aj = Aloc->j;
901       for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= aij->cstart + shift;}
902 
903       /* copy over the B part */
904       Aloc = (Mat_SeqAIJ*) aij->B->data;
905       m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
906       row = aij->rstart;
907       ct = cols = (int *) PetscMalloc( (ai[m]+1)*sizeof(int) ); CHKPTRQ(cols);
908       for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = aij->garray[aj[i]+shift];}
909       for ( i=0; i<m; i++ ) {
910         ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
911         row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
912       }
913       PetscFree(ct);
914       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
915       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
916       /*
917          Everyone has to call to draw the matrix since the graphics waits are
918          synchronized across all processors that share the Draw object
919       */
920       if (!rank || vtype == DRAW_VIEWER) {
921         ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
922       }
923       ierr = MatDestroy(A); CHKERRQ(ierr);
924     }
925   }
926   PetscFunctionReturn(0);
927 }
928 
929 #undef __FUNC__
930 #define __FUNC__ "MatView_MPIAIJ"
931 int MatView_MPIAIJ(Mat mat,Viewer viewer)
932 {
933   int         ierr;
934   ViewerType  vtype;
935 
936   PetscFunctionBegin;
937   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
938   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
939       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
940     ierr = MatView_MPIAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
941   } else if (vtype == BINARY_FILE_VIEWER) {
942     ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
943   } else {
944     SETERRQ(1,1,"Viewer type not supported by PETSc object");
945   }
946   PetscFunctionReturn(0);
947 }
948 
/*
    Local SOR/relaxation for the parallel AIJ matrix.  Only "local"
    variants are supported: each processor relaxes its own rows, using
    the most recently scattered ghost values for the off-processor
    columns (the ghost values are scattered once, before the sweeps,
    not refreshed between inner iterations).

    This has to provide several versions.

     2) a) use only local smoothing updating outer values only once.
        b) local smoothing updating outer values each inner iteration
     3) color updating outer values between colors.
*/
#undef __FUNC__
#define __FUNC__ "MatRelax_MPIAIJ"
int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
                           double fshift,int its,Vec xx)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Mat        AA = mat->A, BB = mat->B;
  /* A = diagonal block, B = off-diagonal block (ghost columns) */
  Mat_SeqAIJ *A = (Mat_SeqAIJ *) AA->data, *B = (Mat_SeqAIJ *)BB->data;
  Scalar     *b,*x,*xs,*ls,d,*v,sum;
  int        ierr,*idx, *diag;
  int        n = mat->n, m = mat->m, i,shift = A->indexshift;

  PetscFunctionBegin;
  VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
  xs = x + shift; /* shift by one for index start of 1 */
  ls = ls + shift;
  /* diagonal locations in A are computed lazily on first use */
  if (!A->diag) {ierr = MatMarkDiag_SeqAIJ(AA); CHKERRQ(ierr);}
  diag = A->diag;
  if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      /* with a zero guess there are no ghost contributions; delegate
         the whole sweep to the sequential diagonal block */
      ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
      PetscFunctionReturn(0);
    }
    /* bring current ghost values of xx into mat->lvec */
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
    while (its--) {
      /* go down through the rows */
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        /* accumulate the row dot-product into sum (macro from spops.h;
           NOTE(review): presumably subtracts v[k]*xs[idx[k]] -- confirm) */
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        /* same accumulation for ghost (off-processor) columns via ls */
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
      /* come up through the rows */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
    }
  } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
      PetscFunctionReturn(0);
    }
    ierr=VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,mat->Mvctx);CHKERRQ(ierr);
    while (its--) {
      /* forward sweep only */
      for ( i=0; i<m; i++ ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
    }
  } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,its,xx);CHKERRQ(ierr);
      PetscFunctionReturn(0);
    }
    ierr = VecScatterBegin(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
                            mat->Mvctx); CHKERRQ(ierr);
    ierr = VecScatterEnd(xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD,
                            mat->Mvctx); CHKERRQ(ierr);
    while (its--) {
      /* backward sweep only */
      for ( i=m-1; i>-1; i-- ) {
        n    = A->i[i+1] - A->i[i];
        idx  = A->j + A->i[i] + shift;
        v    = A->a + A->i[i] + shift;
        sum  = b[i];
        SPARSEDENSEMDOT(sum,xs,v,idx,n);
        d    = fshift + A->a[diag[i]+shift];
        n    = B->i[i+1] - B->i[i];
        idx  = B->j + B->i[i] + shift;
        v    = B->a + B->i[i] + shift;
        SPARSEDENSEMDOT(sum,ls,v,idx,n);
        x[i] = (1. - omega)*x[i] + omega*(sum + A->a[diag[i]+shift]*x[i])/d;
      }
    }
  } else {
    SETERRQ(PETSC_ERR_SUP,0,"Parallel SOR not supported");
  }
  PetscFunctionReturn(0);
}
1062 
1063 #undef __FUNC__
1064 #define __FUNC__ "MatGetInfo_MPIAIJ"
1065 int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
1066 {
1067   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1068   Mat        A = mat->A, B = mat->B;
1069   int        ierr;
1070   double     isend[5], irecv[5];
1071 
1072   PetscFunctionBegin;
1073   info->block_size     = 1.0;
1074   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
1075   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
1076   isend[3] = info->memory;  isend[4] = info->mallocs;
1077   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
1078   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
1079   isend[3] += info->memory;  isend[4] += info->mallocs;
1080   if (flag == MAT_LOCAL) {
1081     info->nz_used      = isend[0];
1082     info->nz_allocated = isend[1];
1083     info->nz_unneeded  = isend[2];
1084     info->memory       = isend[3];
1085     info->mallocs      = isend[4];
1086   } else if (flag == MAT_GLOBAL_MAX) {
1087     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);CHKERRQ(ierr);
1088     info->nz_used      = irecv[0];
1089     info->nz_allocated = irecv[1];
1090     info->nz_unneeded  = irecv[2];
1091     info->memory       = irecv[3];
1092     info->mallocs      = irecv[4];
1093   } else if (flag == MAT_GLOBAL_SUM) {
1094     ierr = MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);CHKERRQ(ierr);
1095     info->nz_used      = irecv[0];
1096     info->nz_allocated = irecv[1];
1097     info->nz_unneeded  = irecv[2];
1098     info->memory       = irecv[3];
1099     info->mallocs      = irecv[4];
1100   }
1101   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
1102   info->fill_ratio_needed = 0;
1103   info->factor_mallocs    = 0;
1104   info->rows_global       = (double)mat->M;
1105   info->columns_global    = (double)mat->N;
1106   info->rows_local        = (double)mat->m;
1107   info->columns_local     = (double)mat->N;
1108 
1109   PetscFunctionReturn(0);
1110 }
1111 
1112 #undef __FUNC__
1113 #define __FUNC__ "MatSetOption_MPIAIJ"
1114 int MatSetOption_MPIAIJ(Mat A,MatOption op)
1115 {
1116   Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
1117 
1118   PetscFunctionBegin;
1119   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
1120       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
1121       op == MAT_COLUMNS_UNSORTED ||
1122       op == MAT_COLUMNS_SORTED ||
1123       op == MAT_NEW_NONZERO_ALLOCATION_ERROR ||
1124       op == MAT_NEW_NONZERO_LOCATION_ERROR) {
1125         MatSetOption(a->A,op);
1126         MatSetOption(a->B,op);
1127   } else if (op == MAT_ROW_ORIENTED) {
1128     a->roworiented = 1;
1129     MatSetOption(a->A,op);
1130     MatSetOption(a->B,op);
1131   } else if (op == MAT_ROWS_SORTED ||
1132              op == MAT_ROWS_UNSORTED ||
1133              op == MAT_SYMMETRIC ||
1134              op == MAT_STRUCTURALLY_SYMMETRIC ||
1135              op == MAT_YES_NEW_DIAGONALS)
1136     PLogInfo(A,"MatSetOption_MPIAIJ:Option ignored\n");
1137   else if (op == MAT_COLUMN_ORIENTED) {
1138     a->roworiented = 0;
1139     MatSetOption(a->A,op);
1140     MatSetOption(a->B,op);
1141   } else if (op == MAT_IGNORE_OFF_PROC_ENTRIES) {
1142     a->donotstash = 1;
1143   } else if (op == MAT_NO_NEW_DIAGONALS){
1144     SETERRQ(PETSC_ERR_SUP,0,"MAT_NO_NEW_DIAGONALS");
1145   } else {
1146     SETERRQ(PETSC_ERR_SUP,0,"unknown option");
1147   }
1148   PetscFunctionReturn(0);
1149 }
1150 
1151 #undef __FUNC__
1152 #define __FUNC__ "MatGetSize_MPIAIJ"
1153 int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
1154 {
1155   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1156 
1157   PetscFunctionBegin;
1158   if (m) *m = mat->M;
1159   if (n) *n = mat->N;
1160   PetscFunctionReturn(0);
1161 }
1162 
1163 #undef __FUNC__
1164 #define __FUNC__ "MatGetLocalSize_MPIAIJ"
1165 int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
1166 {
1167   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1168 
1169   PetscFunctionBegin;
1170   if (m) *m = mat->m;
1171   if (n) *n = mat->n;
1172   PetscFunctionReturn(0);
1173 }
1174 
1175 #undef __FUNC__
1176 #define __FUNC__ "MatGetOwnershipRange_MPIAIJ"
1177 int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1178 {
1179   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1180 
1181   PetscFunctionBegin;
1182   *m = mat->rstart; *n = mat->rend;
1183   PetscFunctionReturn(0);
1184 }
1185 
1186 extern int MatGetRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1187 extern int MatRestoreRow_SeqAIJ(Mat,int,int*,int**,Scalar**);
1188 
#undef __FUNC__
#define __FUNC__ "MatGetRow_MPIAIJ"
/*
   Returns one locally owned row of the parallel matrix with global
   column numbering.  The entries of the diagonal block (A) and the
   off-diagonal block (B) are merged into the scratch arrays
   mat->rowvalues / mat->rowindices so the result appears as a single
   row sorted by increasing global column (assuming A and B rows are
   individually sorted).  Only one row may be "gotten" at a time;
   MatRestoreRow_MPIAIJ() must be called before the next MatGetRow().
*/
int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
  int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
  int        *cmap, *idx_p;

  PetscFunctionBegin;
  if (mat->getrowactive == PETSC_TRUE) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"Already active");
  mat->getrowactive = PETSC_TRUE;

  /* scratch space is allocated lazily, sized for the longest local row */
  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqAIJ *Aa = (Mat_SeqAIJ *) mat->A->data,*Ba = (Mat_SeqAIJ *) mat->B->data;
    int     max = 1,m = mat->m,tmp;
    for ( i=0; i<m; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* single allocation: values first, indices packed after them */
    mat->rowvalues = (Scalar *) PetscMalloc( max*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max);
  }

  if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,0,"Only local rows")
  lrow = row - rstart;

  /* request from the blocks only what the caller asked for */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  /* cmap translates the B block's local column numbers to global ones */
  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      int imark = -1;
      if (v) {
        /* imark = number of B entries whose global column precedes cstart;
           layout is then: B-left | all of A | B-right */
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* split point already found while copying values */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]];
          }
        } else {
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
            else break;
          }
          imark = i;
        }
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]];
      }
    }
    else {
      /* empty row: hand back null pointers */
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  /* data has been copied out, so the block rows can be restored now */
  ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1270 
1271 #undef __FUNC__
1272 #define __FUNC__ "MatRestoreRow_MPIAIJ"
1273 int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1274 {
1275   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1276 
1277   PetscFunctionBegin;
1278   if (aij->getrowactive == PETSC_FALSE) {
1279     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,0,"MatGetRow not called");
1280   }
1281   aij->getrowactive = PETSC_FALSE;
1282   PetscFunctionReturn(0);
1283 }
1284 
#undef __FUNC__
#define __FUNC__ "MatNorm_MPIAIJ"
/*
   Computes the Frobenius, 1- (max column sum) or infinity- (max row sum)
   norm of the parallel matrix, combining the contributions of the
   diagonal (amat) and off-diagonal (bmat) blocks across processors.
   Note: the NORM_1 path allocates two arrays of global length aij->N,
   which does not scale to very large column counts.
*/
int MatNorm_MPIAIJ(Mat mat,NormType type,double *norm)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  Mat_SeqAIJ *amat = (Mat_SeqAIJ*) aij->A->data, *bmat = (Mat_SeqAIJ*) aij->B->data;
  int        ierr, i, j, cstart = aij->cstart,shift = amat->indexshift;
  double     sum = 0.0;
  Scalar     *v;

  PetscFunctionBegin;
  if (aij->size == 1) {
    /* single processor: delegate entirely to the sequential block */
    ierr =  MatNorm(aij->A,type,norm); CHKERRQ(ierr);
  } else {
    if (type == NORM_FROBENIUS) {
      /* sum of squared moduli over both blocks, then global sum */
      v = amat->a;
      for (i=0; i<amat->nz; i++ ) {
#if defined(USE_PETSC_COMPLEX)
        sum += PetscReal(PetscConj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      v = bmat->a;
      for (i=0; i<bmat->nz; i++ ) {
#if defined(USE_PETSC_COMPLEX)
        sum += PetscReal(PetscConj(*v)*(*v)); v++;
#else
        sum += (*v)*(*v); v++;
#endif
      }
      ierr = MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
      *norm = sqrt(*norm);
    } else if (type == NORM_1) { /* max column norm */
      /* tmp accumulates per-global-column absolute sums; shift adjusts
         for possible 1-based internal indexing */
      double *tmp, *tmp2;
      int    *jj, *garray = aij->garray;
      tmp  = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp);
      tmp2 = (double *) PetscMalloc( (aij->N+1)*sizeof(double) ); CHKPTRQ(tmp2);
      PetscMemzero(tmp,aij->N*sizeof(double));
      *norm = 0.0;
      v = amat->a; jj = amat->j;
      for ( j=0; j<amat->nz; j++ ) {
        tmp[cstart + *jj++ + shift] += PetscAbsScalar(*v);  v++;
      }
      v = bmat->a; jj = bmat->j;
      for ( j=0; j<bmat->nz; j++ ) {
        /* garray maps the off-diagonal block's local columns to global */
        tmp[garray[*jj++ + shift]] += PetscAbsScalar(*v); v++;
      }
      ierr = MPI_Allreduce(tmp,tmp2,aij->N,MPI_DOUBLE,MPI_SUM,mat->comm);CHKERRQ(ierr);
      for ( j=0; j<aij->N; j++ ) {
        if (tmp2[j] > *norm) *norm = tmp2[j];
      }
      PetscFree(tmp); PetscFree(tmp2);
    } else if (type == NORM_INFINITY) { /* max row norm */
      /* rows are not split across processors, so a local max followed
         by a global max reduction suffices */
      double ntemp = 0.0;
      for ( j=0; j<amat->m; j++ ) {
        v = amat->a + amat->i[j] + shift;
        sum = 0.0;
        for ( i=0; i<amat->i[j+1]-amat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        v = bmat->a + bmat->i[j] + shift;
        for ( i=0; i<bmat->i[j+1]-bmat->i[j]; i++ ) {
          sum += PetscAbsScalar(*v); v++;
        }
        if (sum > ntemp) ntemp = sum;
      }
      ierr = MPI_Allreduce(&ntemp,norm,1,MPI_DOUBLE,MPI_MAX,mat->comm);CHKERRQ(ierr);
    } else {
      SETERRQ(PETSC_ERR_SUP,0,"No support for two norm");
    }
  }
  PetscFunctionReturn(0);
}
1359 
#undef __FUNC__
#define __FUNC__ "MatTranspose_MPIAIJ"
/*
   Forms the transpose by inserting each local row of A (and of the
   off-diagonal block B, translated to global column numbers via garray)
   as a column of a freshly created matrix.  If matout is PETSC_NULL the
   result replaces A itself ("in-place", square matrices only) by
   swapping matrix headers.
*/
int MatTranspose_MPIAIJ(Mat A,Mat *matout)
{
  Mat_MPIAIJ *a = (Mat_MPIAIJ *) A->data;
  Mat_SeqAIJ *Aloc = (Mat_SeqAIJ *) a->A->data;
  int        ierr,shift = Aloc->indexshift;
  int        M = a->M, N = a->N,m,*ai,*aj,row,*cols,i,*ct;
  Mat        B;
  Scalar     *array;

  PetscFunctionBegin;
  if (matout == PETSC_NULL && M != N) {
    SETERRQ(PETSC_ERR_ARG_SIZ,0,"Square matrix only for in-place");
  }

  /* note swapped local/global dimensions: (n,m) and (N,M) */
  ierr = MatCreateMPIAIJ(A->comm,a->n,a->m,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);CHKERRQ(ierr);

  /* copy over the A part */
  Aloc = (Mat_SeqAIJ*) a->A->data;
  m = Aloc->m; ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  /* temporarily shift column indices to global numbering, in place */
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] += a->cstart + shift;}
  for ( i=0; i<m; i++ ) {
    /* insert row `row' of A as column `row' of B */
    ierr = MatSetValues(B,ai[i+1]-ai[i],aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
  }
  aj = Aloc->j;
  /* undo the in-place index shift */
  for ( i=0; i<ai[m]+shift; i++ ) {aj[i] -= a->cstart + shift;}

  /* copy over the B part */
  Aloc = (Mat_SeqAIJ*) a->B->data;
  m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; array = Aloc->a;
  row = a->rstart;
  ct = cols = (int *) PetscMalloc( (1+ai[m]-shift)*sizeof(int) ); CHKPTRQ(cols);
  /* garray maps the off-diagonal block's local columns to global */
  for ( i=0; i<ai[m]+shift; i++ ) {cols[i] = a->garray[aj[i]+shift];}
  for ( i=0; i<m; i++ ) {
    ierr = MatSetValues(B,ai[i+1]-ai[i],cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
    row++; array += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
  }
  PetscFree(ct);
  ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  if (matout != PETSC_NULL) {
    *matout = B;
  } else {
    PetscOps       *Abops;
    struct _MatOps *Aops;

    /* This isn't really an in-place transpose .... but free data structures from a */
    PetscFree(a->rowners);
    ierr = MatDestroy(a->A); CHKERRQ(ierr);
    ierr = MatDestroy(a->B); CHKERRQ(ierr);
    if (a->colmap) PetscFree(a->colmap);
    if (a->garray) PetscFree(a->garray);
    if (a->lvec) VecDestroy(a->lvec);
    if (a->Mvctx) VecScatterDestroy(a->Mvctx);
    PetscFree(a);

    /*
       This is horrible, horrible code. We need to keep the
      A pointers for the bops and ops but copy everything
      else from C.
    */
    Abops = A->bops;
    Aops  = A->ops;
    PetscMemcpy(A,B,sizeof(struct _p_Mat));
    A->bops = Abops;
    A->ops  = Aops;
    PetscHeaderDestroy(B);
  }
  PetscFunctionReturn(0);
}
1433 
1434 #undef __FUNC__
1435 #define __FUNC__ "MatDiagonalScale_MPIAIJ"
1436 int MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1437 {
1438   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
1439   Mat a = aij->A, b = aij->B;
1440   int ierr,s1,s2,s3;
1441 
1442   PetscFunctionBegin;
1443   ierr = MatGetLocalSize(mat,&s2,&s3); CHKERRQ(ierr);
1444   if (rr) {
1445     ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
1446     if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,0,"right vector non-conforming local size");
1447     /* Overlap communication with computation. */
1448     ierr = VecScatterBegin(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1449   }
1450   if (ll) {
1451     ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
1452     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,0,"left vector non-conforming local size");
1453     ierr = (*b->ops->diagonalscale)(b,ll,0); CHKERRQ(ierr);
1454   }
1455   /* scale  the diagonal block */
1456   ierr = (*a->ops->diagonalscale)(a,ll,rr); CHKERRQ(ierr);
1457 
1458   if (rr) {
1459     /* Do a scatter end and then right scale the off-diagonal block */
1460     ierr = VecScatterEnd(rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD,aij->Mvctx); CHKERRQ(ierr);
1461     ierr = (*b->ops->diagonalscale)(b,0,aij->lvec); CHKERRQ(ierr);
1462   }
1463 
1464   PetscFunctionReturn(0);
1465 }
1466 
1467 
1468 extern int MatPrintHelp_SeqAIJ(Mat);
1469 #undef __FUNC__
1470 #define __FUNC__ "MatPrintHelp_MPIAIJ"
1471 int MatPrintHelp_MPIAIJ(Mat A)
1472 {
1473   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1474   int        ierr;
1475 
1476   PetscFunctionBegin;
1477   if (!a->rank) {
1478     ierr = MatPrintHelp_SeqAIJ(a->A);CHKERRQ(ierr);
1479   }
1480   PetscFunctionReturn(0);
1481 }
1482 
1483 #undef __FUNC__
1484 #define __FUNC__ "MatGetBlockSize_MPIAIJ"
1485 int MatGetBlockSize_MPIAIJ(Mat A,int *bs)
1486 {
1487   PetscFunctionBegin;
1488   *bs = 1;
1489   PetscFunctionReturn(0);
1490 }
1491 #undef __FUNC__
1492 #define __FUNC__ "MatSetUnfactored_MPIAIJ"
1493 int MatSetUnfactored_MPIAIJ(Mat A)
1494 {
1495   Mat_MPIAIJ *a   = (Mat_MPIAIJ*) A->data;
1496   int        ierr;
1497 
1498   PetscFunctionBegin;
1499   ierr = MatSetUnfactored(a->A); CHKERRQ(ierr);
1500   PetscFunctionReturn(0);
1501 }
1502 
1503 #undef __FUNC__
1504 #define __FUNC__ "MatEqual_MPIAIJ"
1505 int MatEqual_MPIAIJ(Mat A, Mat B, PetscTruth *flag)
1506 {
1507   Mat_MPIAIJ *matB = (Mat_MPIAIJ *) B->data,*matA = (Mat_MPIAIJ *) A->data;
1508   Mat        a, b, c, d;
1509   PetscTruth flg;
1510   int        ierr;
1511 
1512   PetscFunctionBegin;
1513   if (B->type != MATMPIAIJ) SETERRQ(PETSC_ERR_ARG_INCOMP,0,"Matrices must be same type");
1514   a = matA->A; b = matA->B;
1515   c = matB->A; d = matB->B;
1516 
1517   ierr = MatEqual(a, c, &flg); CHKERRQ(ierr);
1518   if (flg == PETSC_TRUE) {
1519     ierr = MatEqual(b, d, &flg); CHKERRQ(ierr);
1520   }
1521   ierr = MPI_Allreduce(&flg, flag, 1, MPI_INT, MPI_LAND, A->comm);CHKERRQ(ierr);
1522   PetscFunctionReturn(0);
1523 }
1524 
1525 extern int MatConvertSameType_MPIAIJ(Mat,Mat *,int);
1526 extern int MatIncreaseOverlap_MPIAIJ(Mat , int, IS *, int);
1527 extern int MatFDColoringCreate_MPIAIJ(Mat,ISColoring,MatFDColoring);
1528 extern int MatGetSubMatrices_MPIAIJ (Mat ,int , IS *,IS *,MatGetSubMatrixCall,Mat **);
1529 extern int MatGetSubMatrix_MPIAIJ (Mat ,IS,IS,int,MatGetSubMatrixCall,Mat *);
1530 
/*
   Operation table for the MPIAIJ matrix type.  Entries are positional,
   in the slot order declared by struct _MatOps; a 0 entry means the
   operation is not implemented for this format.
*/
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       0,0,
       0,0,
       0,0,
       MatRelax_MPIAIJ,
       MatTranspose_MPIAIJ,
       MatGetInfo_MPIAIJ,MatEqual_MPIAIJ,
       MatGetDiagonal_MPIAIJ,MatDiagonalScale_MPIAIJ,MatNorm_MPIAIJ,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,
       0,0,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       0,0,
       0,0,MatConvertSameType_MPIAIJ,0,0,
       0,0,0,
       MatGetSubMatrices_MPIAIJ,MatIncreaseOverlap_MPIAIJ,MatGetValues_MPIAIJ,0,
       MatPrintHelp_MPIAIJ,
       MatScale_MPIAIJ,0,0,0,
       MatGetBlockSize_MPIAIJ,0,0,0,0,
       MatFDColoringCreate_MPIAIJ,0,MatSetUnfactored_MPIAIJ,
       0,0,MatGetSubMatrix_MPIAIJ};
1557 
1558 
1559 #undef __FUNC__
1560 #define __FUNC__ "MatCreateMPIAIJ"
1561 /*@C
1562    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1563    (the default parallel PETSc format).  For good matrix assembly performance
1564    the user should preallocate the matrix storage by setting the parameters
1565    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1566    performance can be increased by more than a factor of 50.
1567 
1568    Collective on MPI_Comm
1569 
1570    Input Parameters:
1571 +  comm - MPI communicator
1572 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1573            This value should be the same as the local size used in creating the
1574            y vector for the matrix-vector product y = Ax.
1575 .  n - This value should be the same as the local size used in creating the
1576        x vector for the matrix-vector product y = Ax. (or PETSC_DECIDE to have
1577        calculated if N is given) For square matrices n is almost always m.
1578 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1579 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1580 .  d_nz - number of nonzeros per row in diagonal portion of local submatrix
1581            (same for all local rows)
1582 .  d_nzz - array containing the number of nonzeros in the various rows of the
1583            diagonal portion of the local submatrix (possibly different for each row)
1584            or PETSC_NULL. You must leave room for the diagonal entry even if
1585            it is zero.
1586 .  o_nz - number of nonzeros per row in the off-diagonal portion of local
1587            submatrix (same for all local rows).
1588 -  o_nzz - array containing the number of nonzeros in the various rows of the
1589            off-diagonal portion of the local submatrix (possibly different for
1590            each row) or PETSC_NULL.
1591 
1592    Output Parameter:
1593 .  A - the matrix
1594 
1595    Notes:
1596    The AIJ format (also called the Yale sparse matrix format or
1597    compressed row storage), is fully compatible with standard Fortran 77
1598    storage.  That is, the stored row and column indices can begin at
1599    either one (as in Fortran) or zero.  See the users manual for details.
1600 
1601    The user MUST specify either the local or global matrix dimensions
1602    (possibly both).
1603 
1604    By default, this format uses inodes (identical nodes) when possible.
1605    We search for consecutive rows with the same nonzero structure, thereby
1606    reusing matrix information to achieve increased efficiency.
1607 
1608    Options Database Keys:
1609 +  -mat_aij_no_inode  - Do not use inodes
1610 .  -mat_aij_inode_limit <limit> - Sets inode limit (max limit=5)
1611 -  -mat_aij_oneindex - Internally use indexing starting at 1
1612         rather than 0.  Note that when calling MatSetValues(),
1613         the user still MUST index entries starting at 0!
1614 
1615    Storage Information:
1616    For a square global matrix we define each processor's diagonal portion
1617    to be its local rows and the corresponding columns (a square submatrix);
1618    each processor's off-diagonal portion encompasses the remainder of the
1619    local matrix (a rectangular submatrix).
1620 
1621    The user can specify preallocated storage for the diagonal part of
1622    the local submatrix with either d_nz or d_nnz (not both).  Set
1623    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1624    memory allocation.  Likewise, specify preallocated storage for the
1625    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1626 
1627    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1628    the figure below we depict these three local rows and all columns (0-11).
1629 
1630 .vb
1631              0 1 2 3 4 5 6 7 8 9 10 11
1632             -------------------
1633      row 3  |  o o o d d d o o o o o o
1634      row 4  |  o o o d d d o o o o o o
1635      row 5  |  o o o d d d o o o o o o
1636             -------------------
1637 .ve
1638 
1639    Thus, any entries in the d locations are stored in the d (diagonal)
1640    submatrix, and any entries in the o locations are stored in the
1641    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1642    stored simply in the MATSEQAIJ format for compressed row storage.
1643 
1644    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1645    and o_nz should indicate the number of nonzeros per row in the o matrix.
1646    In general, for PDE problems in which most nonzeros are near the diagonal,
1647    one expects d_nz >> o_nz. For large problems you MUST preallocate memory
1648    or you will get TERRIBLE performance; see the users' manual chapter on
1649    matrices.
1650 
1651 .keywords: matrix, aij, compressed row, sparse, parallel
1652 
1653 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues()
1654 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
{
  Mat          B;
  Mat_MPIAIJ   *b;
  int          ierr, i,sum[2],work[2],size;

  PetscFunctionBegin;
  MPI_Comm_size(comm,&size);
  /* Uniprocessor case: the entire matrix is "diagonal", so a plain SeqAIJ
     matrix suffices; the off-diagonal preallocation arguments are unused. */
  if (size == 1) {
    if (M == PETSC_DECIDE) M = m;
    if (N == PETSC_DECIDE) N = n;
    ierr = MatCreateSeqAIJ(comm,M,N,d_nz,d_nnz,A); CHKERRQ(ierr);
    PetscFunctionReturn(0);
  }

  *A = 0;
  PetscHeaderCreate(B,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,comm,MatDestroy,MatView);
  PLogObjectCreate(B);
  B->data       = (void *) (b = PetscNew(Mat_MPIAIJ)); CHKPTRQ(b);
  PetscMemzero(b,sizeof(Mat_MPIAIJ)); /* PetscNew() does not zero the allocation */
  PetscMemcpy(B->ops,&MatOps,sizeof(struct _MatOps));
  B->ops->destroy    = MatDestroy_MPIAIJ;
  B->ops->view       = MatView_MPIAIJ;
  B->factor     = 0;
  B->assembled  = PETSC_FALSE;
  B->mapping    = 0;

  B->insertmode = NOT_SET_VALUES;
  b->size       = size;
  MPI_Comm_rank(comm,&b->rank);

  /* per-row preallocation arrays only make sense when the local row count is known */
  if (m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL)) {
    SETERRQ(PETSC_ERR_ARG_WRONG,0,"Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
  }

  /* fill in any global sizes left as PETSC_DECIDE by summing the local sizes */
  if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
    work[0] = m; work[1] = n;
    ierr = MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );CHKERRQ(ierr);
    if (M == PETSC_DECIDE) M = sum[0];
    if (N == PETSC_DECIDE) N = sum[1];
  }
  /* split PETSC_DECIDE local sizes as evenly as possible: the first
     (M % size) processes receive one extra row (similarly for columns) */
  if (m == PETSC_DECIDE) {m = M/b->size + ((M % b->size) > b->rank);}
  if (n == PETSC_DECIDE) {n = N/b->size + ((N % b->size) > b->rank);}
  b->m = m; B->m = m;
  b->n = n; B->n = n;
  b->N = N; B->N = N;
  b->M = M; B->M = M;

  /* build local table of row and column ownerships; after the prefix sums,
     rowners[i] is the global index of the first row owned by process i
     (cowners likewise for columns; both live in one allocation) */
  b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
  PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
  b->cowners = b->rowners + b->size + 2;
  ierr = MPI_Allgather(&m,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->rowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->rowners[i] += b->rowners[i-1];
  }
  b->rstart = b->rowners[b->rank];
  b->rend   = b->rowners[b->rank+1];
  ierr = MPI_Allgather(&n,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  b->cowners[0] = 0;
  for ( i=2; i<=b->size; i++ ) {
    b->cowners[i] += b->cowners[i-1];
  }
  b->cstart = b->cowners[b->rank];
  b->cend   = b->cowners[b->rank+1];

  /* diagonal block A is m x n; off-diagonal block B is stored with the full
     global column width N (presumably compacted via colmap/garray later --
     see CreateColmap_MPIAIJ_Private) */
  if (d_nz == PETSC_DEFAULT) d_nz = 5;
  ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
  PLogObjectParent(B,b->A);
  if (o_nz == PETSC_DEFAULT) o_nz = 0;
  ierr = MatCreateSeqAIJ(PETSC_COMM_SELF,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
  PLogObjectParent(B,b->B);

  /* build cache (stash) for off-processor entries set with MatSetValues() */
  ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
  b->donotstash  = 0;
  b->colmap      = 0;
  b->garray      = 0;
  b->roworiented = 1;

  /* stuff used for matrix vector multiply (filled in elsewhere) */
  b->lvec      = 0;
  b->Mvctx     = 0;

  /* stuff for MatGetRow() */
  b->rowindices   = 0;
  b->rowvalues    = 0;
  b->getrowactive = PETSC_FALSE;

  *A = B;
  PetscFunctionReturn(0);
}
1748 
1749 #undef __FUNC__
1750 #define __FUNC__ "MatConvertSameType_MPIAIJ"
1751 int MatConvertSameType_MPIAIJ(Mat matin,Mat *newmat,int cpvalues)
1752 {
1753   Mat        mat;
1754   Mat_MPIAIJ *a,*oldmat = (Mat_MPIAIJ *) matin->data;
1755   int        ierr, len=0, flg;
1756 
1757   PetscFunctionBegin;
1758   *newmat       = 0;
1759   PetscHeaderCreate(mat,_p_Mat,struct _MatOps,MAT_COOKIE,MATMPIAIJ,matin->comm,MatDestroy,MatView);
1760   PLogObjectCreate(mat);
1761   mat->data       = (void *) (a = PetscNew(Mat_MPIAIJ)); CHKPTRQ(a);
1762   PetscMemcpy(mat->ops,&MatOps,sizeof(struct _MatOps));
1763   mat->ops->destroy    = MatDestroy_MPIAIJ;
1764   mat->ops->view       = MatView_MPIAIJ;
1765   mat->factor     = matin->factor;
1766   mat->assembled  = PETSC_TRUE;
1767 
1768   a->m = mat->m   = oldmat->m;
1769   a->n = mat->n   = oldmat->n;
1770   a->M = mat->M   = oldmat->M;
1771   a->N = mat->N   = oldmat->N;
1772 
1773   a->rstart       = oldmat->rstart;
1774   a->rend         = oldmat->rend;
1775   a->cstart       = oldmat->cstart;
1776   a->cend         = oldmat->cend;
1777   a->size         = oldmat->size;
1778   a->rank         = oldmat->rank;
1779   mat->insertmode = NOT_SET_VALUES;
1780   a->rowvalues    = 0;
1781   a->getrowactive = PETSC_FALSE;
1782 
1783   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1784   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _p_Mat)+sizeof(Mat_MPIAIJ));
1785   a->cowners = a->rowners + a->size + 2;
1786   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1787   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1788   if (oldmat->colmap) {
1789     a->colmap = (int *) PetscMalloc((a->N)*sizeof(int));CHKPTRQ(a->colmap);
1790     PLogObjectMemory(mat,(a->N)*sizeof(int));
1791     PetscMemcpy(a->colmap,oldmat->colmap,(a->N)*sizeof(int));
1792   } else a->colmap = 0;
1793   if (oldmat->garray) {
1794     len = ((Mat_SeqAIJ *) (oldmat->B->data))->n;
1795     a->garray = (int *) PetscMalloc((len+1)*sizeof(int)); CHKPTRQ(a->garray);
1796     PLogObjectMemory(mat,len*sizeof(int));
1797     if (len) PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1798   } else a->garray = 0;
1799 
1800   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1801   PLogObjectParent(mat,a->lvec);
1802   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1803   PLogObjectParent(mat,a->Mvctx);
1804   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1805   PLogObjectParent(mat,a->A);
1806   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1807   PLogObjectParent(mat,a->B);
1808   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1809   if (flg) {
1810     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1811   }
1812   *newmat = mat;
1813   PetscFunctionReturn(0);
1814 }
1815 
1816 #include "sys.h"
1817 
#undef __FUNC__
#define __FUNC__ "MatLoad_MPIAIJ"
/*
   Loads a matrix from a PETSc binary viewer and distributes it as MPIAIJ.
   Process 0 reads the whole file -- header, row lengths, column indices,
   then numerical values -- and ships each other process its piece with
   point-to-point messages, so only one process touches the file.
*/
int MatLoad_MPIAIJ(Viewer viewer,MatType type,Mat *newmat)
{
  Mat          A;
  Scalar       *vals,*svals;
  MPI_Comm     comm = ((PetscObject)viewer)->comm;
  MPI_Status   status;
  int          i, nz, ierr, j,rstart, rend, fd;
  int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,maxnz,*cols;
  int          *ourlens,*sndcounts = 0,*procsnz = 0, *offlens,jj,*mycols,*smycols;
  int          tag = ((PetscObject)viewer)->tag;

  PetscFunctionBegin;
  MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
  /* only process 0 opens and validates the file header */
  if (!rank) {
    ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
    ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT); CHKERRQ(ierr);
    if (header[0] != MAT_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"not matrix object");
    if (header[3] < 0) {
      SETERRQ(PETSC_ERR_FILE_UNEXPECTED,1,"Matrix in special format on disk, cannot load as MPIAIJ");
    }
  }

  ierr = MPI_Bcast(header+1,3,MPI_INT,0,comm);CHKERRQ(ierr);
  M = header[1]; N = header[2];
  /* determine ownership of all rows: even split, first (M % size) processes
     get one extra row; rowners[i] = first global row owned by process i */
  m = M/size + ((M % size) > rank);
  rowners = (int *) PetscMalloc((size+2)*sizeof(int)); CHKPTRQ(rowners);
  ierr = MPI_Allgather(&m,1,MPI_INT,rowners+1,1,MPI_INT,comm);CHKERRQ(ierr);
  rowners[0] = 0;
  for ( i=2; i<=size; i++ ) {
    rowners[i] += rowners[i-1];
  }
  rstart = rowners[rank];
  rend   = rowners[rank+1];

  /* distribute row lengths to all processors (offlens shares the allocation) */
  ourlens = (int*) PetscMalloc( 2*(rend-rstart)*sizeof(int) ); CHKPTRQ(ourlens);
  offlens = ourlens + (rend-rstart);
  if (!rank) {
    rowlengths = (int*) PetscMalloc( M*sizeof(int) ); CHKPTRQ(rowlengths);
    ierr = PetscBinaryRead(fd,rowlengths,M,PETSC_INT); CHKERRQ(ierr);
    sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
    for ( i=0; i<size; i++ ) sndcounts[i] = rowners[i+1] - rowners[i];
    ierr = MPI_Scatterv(rowlengths,sndcounts,rowners,MPI_INT,ourlens,rend-rstart,MPI_INT,0,comm);CHKERRQ(ierr);
    PetscFree(sndcounts);
  } else {
    /* non-root send arguments of MPI_Scatterv are ignored */
    ierr = MPI_Scatterv(0,0,0,MPI_INT,ourlens,rend-rstart,MPI_INT, 0,comm);CHKERRQ(ierr);
  }

  if (!rank) {
    /* calculate the number of nonzeros on each processor */
    procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
    PetscMemzero(procsnz,size*sizeof(int));
    for ( i=0; i<size; i++ ) {
      for ( j=rowners[i]; j< rowners[i+1]; j++ ) {
        procsnz[i] += rowlengths[j];
      }
    }
    PetscFree(rowlengths);

    /* determine max buffer needed and allocate it */
    maxnz = 0;
    for ( i=0; i<size; i++ ) {
      maxnz = PetscMax(maxnz,procsnz[i]);
    }
    cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);

    /* read in my part of the matrix column indices  */
    nz     = procsnz[0];
    mycols = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);
    ierr   = PetscBinaryRead(fd,mycols,nz,PETSC_INT); CHKERRQ(ierr);

    /* read in everybody else's column indices and ship them off */
    for ( i=1; i<size; i++ ) {
      nz   = procsnz[i];
      ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT); CHKERRQ(ierr);
      ierr = MPI_Send(cols,nz,MPI_INT,i,tag,comm);CHKERRQ(ierr);
    }
    PetscFree(cols);
  } else {
    /* determine buffer space needed for message */
    nz = 0;
    for ( i=0; i<m; i++ ) {
      nz += ourlens[i];
    }
    mycols = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(mycols);

    /* receive message of column indices; the count must match what the
       row lengths promised or the file is inconsistent */
    ierr = MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);CHKERRQ(ierr);
    ierr = MPI_Get_count(&status,MPI_INT,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");
  }

  /* loop over local rows, determining number of off diagonal entries
     (columns outside [rstart,rend), i.e. outside this process's row block) */
  PetscMemzero(offlens,m*sizeof(int));
  jj = 0;
  for ( i=0; i<m; i++ ) {
    for ( j=0; j<ourlens[i]; j++ ) {
      if (mycols[jj] < rstart || mycols[jj] >= rend) offlens[i]++;
      jj++;
    }
  }

  /* create our matrix, preallocated exactly: temporarily turn ourlens into
     diagonal-block lengths, then restore the full row lengths afterwards */
  for ( i=0; i<m; i++ ) {
    ourlens[i] -= offlens[i];
  }
  ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,M,N,0,ourlens,0,offlens,newmat);CHKERRQ(ierr);
  A = *newmat;
  MatSetOption(A,MAT_COLUMNS_SORTED);
  for ( i=0; i<m; i++ ) {
    ourlens[i] += offlens[i];
  }

  if (!rank) {
    vals = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* read in my part of the matrix numerical values  */
    nz = procsnz[0];
    ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);

    /* insert into matrix, one local row at a time */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }

    /* read in other processors' values and ship them out */
    for ( i=1; i<size; i++ ) {
      nz   = procsnz[i];
      ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR); CHKERRQ(ierr);
      ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
    }
    PetscFree(procsnz);
  } else {
    /* receive numeric values */
    vals = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(vals);

    /* receive message of values*/
    ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
    ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
    if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,0,"something is wrong with file");

    /* insert into matrix, one local row at a time */
    jj      = rstart;
    smycols = mycols;
    svals   = vals;
    for ( i=0; i<m; i++ ) {
      ierr     = MatSetValues(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
      smycols += ourlens[i];
      svals   += ourlens[i];
      jj++;
    }
  }
  PetscFree(ourlens); PetscFree(vals); PetscFree(mycols); PetscFree(rowners);

  ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
1985 
#undef __FUNC__
#define __FUNC__ "MatGetSubMatrix_MPIAIJ"
/*
    Not great since it makes two copies of the submatrix: first a SeqAIJ
  copy on each process (via MatGetSubMatrices()), which is then concatenated
  across processes into the final parallel matrix.  Writing it directly
  would be much like MatGetSubMatrices_MPIAIJ().
*/
int MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,int csize,MatGetSubMatrixCall call,Mat *newmat)
{
  int        ierr, i, m,n,rstart,row,rend,nz,*cwork,size,rank,j;
  Mat        *local,M, Mreuse;
  Scalar     *vwork,*aa;
  MPI_Comm   comm = mat->comm;
  Mat_SeqAIJ *aij;
  int        *ii, *jj,nlocal,*dlens,*olens,dlen,olen,jend;

  PetscFunctionBegin;
  MPI_Comm_rank(comm,&rank);
  MPI_Comm_size(comm,&size);

  /* On reuse, recover the sequential submatrix stashed on the output matrix
     under "SubMatrix" by a previous MAT_INITIAL_MATRIX call and refill it. */
  if (call ==  MAT_REUSE_MATRIX) {
    ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
    if (!Mreuse) SETERRQ(1,1,"Submatrix passed in was not used before, cannot reuse");
    local = &Mreuse;
    ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
  } else {
    ierr = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
    Mreuse = *local;
    /* keep the matrix, free only the one-element array that held it */
    PetscFree(local);
  }

  /*
      m - number of local rows
      n - number of columns (same on all processors)
      rstart - first row in new global matrix generated
  */
  ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
  if (call == MAT_INITIAL_MATRIX) {
    aij = (Mat_SeqAIJ *) (Mreuse)->data;
    if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
    ii  = aij->i;
    jj  = aij->j;

    /*
        Determine the number of non-zeros in the diagonal and off-diagonal
        portions of the matrix in order to do correct preallocation
    */

    /* first get start and end of "diagonal" columns; MPI_Scan yields the
       cumulative sum of local column counts, so [rstart,rend) is this
       process's column ownership range */
    if (csize == PETSC_DECIDE) {
      nlocal = n/size + ((n % size) > rank);
    } else {
      nlocal = csize;
    }
    ierr   = MPI_Scan(&nlocal,&rend,1,MPI_INT,MPI_SUM,comm);CHKERRQ(ierr);
    rstart = rend - nlocal;
    /* only the last process sees the full sum, so it does the sanity check */
    if (rank == size - 1 && rend != n) {
      SETERRQ(1,1,"Local column sizes do not add up to total number of columns");
    }

    /* next, compute all the lengths (olens shares the dlens allocation) */
    dlens = (int *) PetscMalloc( (2*m+1)*sizeof(int) );CHKPTRQ(dlens);
    olens = dlens + m;
    for ( i=0; i<m; i++ ) {
      jend = ii[i+1] - ii[i];
      olen = 0;
      dlen = 0;
      for ( j=0; j<jend; j++ ) {
        if ( *jj < rstart || *jj >= rend) olen++;
        else dlen++;
        jj++;
      }
      olens[i] = olen;
      dlens[i] = dlen;
    }
    ierr = MatCreateMPIAIJ(comm,m,PETSC_DECIDE,PETSC_DECIDE,n,0,dlens,0,olens,&M);CHKERRQ(ierr);
    PetscFree(dlens);
  } else {
    int ml,nl;

    M = *newmat;
    ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
    if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,1,"Previous matrix must be same size/layout as request");
    ierr = MatZeroEntries(M);CHKERRQ(ierr);
    /*
         The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
       rather than the slower MatSetValues().
    */
    M->was_assembled = PETSC_TRUE;
    M->assembled     = PETSC_FALSE;
  }
  /* copy the sequential submatrix into the parallel matrix row by row,
     walking the CSR arrays (i = row pointers, j = columns, a = values) */
  ierr = MatGetOwnershipRange(M,&rstart,&rend); CHKERRQ(ierr);
  aij = (Mat_SeqAIJ *) (Mreuse)->data;
  if (aij->indexshift) SETERRQ(PETSC_ERR_SUP,1,"No support for index shifted matrix");
  ii  = aij->i;
  jj  = aij->j;
  aa  = aij->a;
  for (i=0; i<m; i++) {
    row   = rstart + i;
    nz    = ii[i+1] - ii[i];
    cwork = jj;     jj += nz;
    vwork = aa;     aa += nz;
    ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES); CHKERRQ(ierr);
  }

  ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
  *newmat = M;

  /* save submatrix used in processor for next (MAT_REUSE_MATRIX) request;
     the dereference hands ownership of Mreuse to M's composed-object list */
  if (call ==  MAT_INITIAL_MATRIX) {
    ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
    ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
  }

  PetscFunctionReturn(0);
}
2103