#define PETSCMAT_DLL

#include "src/mat/impls/aij/mpi/mpiaij.h"   /*I "petscmat.h" I*/
#include "src/inline/spops.h"

/*
  Local utility routine that creates a mapping from the global column
  number to the local number in the off-diagonal part of the local
  storage of the matrix.  When PETSC_USE_CTABLE is defined this is scalable
  at a slightly higher hash table cost; without it, it is not scalable (each
  process has an order N integer array) but is fast to access.
*/
#undef __FUNCT__
#define __FUNCT__ "CreateColmap_MPIAIJ_Private"
PetscErrorCode CreateColmap_MPIAIJ_Private(Mat mat)
{
  Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
  PetscErrorCode ierr;
  PetscInt       n = aij->B->cmap.n,i;

  PetscFunctionBegin;
#if defined (PETSC_USE_CTABLE)
  ierr = PetscTableCreate(n,&aij->colmap);CHKERRQ(ierr);
  for (i=0; i<n; i++){
    ierr = PetscTableAdd(aij->colmap,aij->garray[i]+1,i+1);CHKERRQ(ierr);
  }
#else
  ierr = PetscMalloc((mat->cmap.N+1)*sizeof(PetscInt),&aij->colmap);CHKERRQ(ierr);
  ierr = PetscLogObjectMemory(mat,mat->cmap.N*sizeof(PetscInt));CHKERRQ(ierr);
  ierr = PetscMemzero(aij->colmap,mat->cmap.N*sizeof(PetscInt));CHKERRQ(ierr);
  for (i=0; i<n; i++) aij->colmap[aij->garray[i]] = i+1;
#endif
  PetscFunctionReturn(0);
}
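
/*
   Illustrative sketch (added commentary, compiled out): with
   garray = {3,8,12}, i.e. the sorted global column numbers of the
   off-diagonal block, the colmap created above answers "what is the local
   column for global column 8?"; the +1/-1 shift lets 0 mean "not present".
*/
#if 0
{
  PetscInt gcol = 8,lcol;
#if defined (PETSC_USE_CTABLE)
  ierr = PetscTableFind(aij->colmap,gcol+1,&lcol);CHKERRQ(ierr);
  lcol--;                        /* a table miss returns 0, i.e. lcol = -1 */
#else
  lcol = aij->colmap[gcol] - 1;  /* -1 if gcol is not a column of B */
#endif
}
#endif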


#define CHUNKSIZE   15
#define MatSetValues_SeqAIJ_A_Private(row,col,value,addv) \
{ \
    if (col <= lastcol1) low1 = 0; else high1 = nrow1; \
    lastcol1 = col;\
    while (high1-low1 > 5) { \
      t = (low1+high1)/2; \
      if (rp1[t] > col) high1 = t; \
      else              low1 = t; \
    } \
    for (_i=low1; _i<high1; _i++) { \
      if (rp1[_i] > col) break; \
      if (rp1[_i] == col) { \
        if (addv == ADD_VALUES) ap1[_i] += value; \
        else                    ap1[_i] = value; \
        goto a_noinsert; \
      } \
    } \
    if (value == 0.0 && ignorezeroentries) goto a_noinsert; \
    if (nonew == 1) goto a_noinsert; \
    if (nonew == -1) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"Inserting a new nonzero (%D, %D) into matrix", row, col); \
    MatSeqXAIJReallocateAIJ(A,am,1,nrow1,row,col,rmax1,aa,ai,aj,rp1,ap1,aimax,nonew,MatScalar); \
    N = nrow1++ - 1; a->nz++; high1++; \
    /* shift up all the later entries in this row */ \
    for (ii=N; ii>=_i; ii--) { \
      rp1[ii+1] = rp1[ii]; \
      ap1[ii+1] = ap1[ii]; \
    } \
    rp1[_i] = col;  \
    ap1[_i] = value;  \
    a_noinsert: ; \
    ailen[row] = nrow1; \
}


#define MatSetValues_SeqAIJ_B_Private(row,col,value,addv) \
{ \
    if (col <= lastcol2) low2 = 0; else high2 = nrow2; \
    lastcol2 = col;\
    while (high2-low2 > 5) { \
      t = (low2+high2)/2; \
      if (rp2[t] > col) high2 = t; \
      else              low2 = t; \
    } \
    for (_i=low2; _i<high2; _i++) { \
      if (rp2[_i] > col) break; \
      if (rp2[_i] == col) { \
        if (addv == ADD_VALUES) ap2[_i] += value; \
        else                    ap2[_i] = value; \
        goto b_noinsert; \
      } \
    } \
    if (value == 0.0 && ignorezeroentries) goto b_noinsert; \
    if (nonew == 1) goto b_noinsert; \
    if (nonew == -1) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"Inserting a new nonzero (%D, %D) into matrix", row, col); \
    MatSeqXAIJReallocateAIJ(B,bm,1,nrow2,row,col,rmax2,ba,bi,bj,rp2,ap2,bimax,nonew,MatScalar); \
    N = nrow2++ - 1; b->nz++; high2++; \
    /* shift up all the later entries in this row */ \
    for (ii=N; ii>=_i; ii--) { \
      rp2[ii+1] = rp2[ii]; \
      ap2[ii+1] = ap2[ii]; \
    } \
    rp2[_i] = col;  \
    ap2[_i] = value;  \
    b_noinsert: ; \
    bilen[row] = nrow2; \
}
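
/*
   The two macros above implement the same pattern: narrow the search window
   with a truncated binary search, scan the short remainder linearly, update
   in place when the column exists, otherwise shift the tail of the row up
   one slot and insert.  A minimal standalone sketch of that pattern follows
   (hypothetical helper, compiled out; no reallocation or nonew handling):
*/
#if 0
static void InsertIntoSortedRow(PetscInt *cols,PetscScalar *vals,PetscInt *n,
                                PetscInt col,PetscScalar value,InsertMode addv)
{
  PetscInt lo = 0,hi = *n,t,i,j;
  while (hi - lo > 5) {          /* binary search down to a short run */
    t = (lo + hi)/2;
    if (cols[t] > col) hi = t;
    else               lo = t;
  }
  for (i=lo; i<hi; i++) {
    if (cols[i] > col) break;    /* first entry past col: insert here */
    if (cols[i] == col) {        /* column already present: combine */
      if (addv == ADD_VALUES) vals[i] += value;
      else                    vals[i] = value;
      return;
    }
  }
  for (j=(*n)++; j>i; j--) {     /* shift the tail up one slot */
    cols[j] = cols[j-1]; vals[j] = vals[j-1];
  }
  cols[i] = col; vals[i] = value;
}
#endif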

#undef __FUNCT__
#define __FUNCT__ "MatSetValuesRow_MPIAIJ"
PetscErrorCode MatSetValuesRow_MPIAIJ(Mat A,PetscInt row,const PetscScalar v[])
{
  Mat_MPIAIJ     *mat = (Mat_MPIAIJ*)A->data;
  Mat_SeqAIJ     *a = (Mat_SeqAIJ*)mat->A->data,*b = (Mat_SeqAIJ*)mat->B->data;
  PetscErrorCode ierr;
  PetscInt       l,*garray = mat->garray,diag;

  PetscFunctionBegin;
  /* code only works for square matrices A */

  /* find size of row to the left of the diagonal part */
  ierr = MatGetOwnershipRange(A,&diag,0);CHKERRQ(ierr);
  row  = row - diag;
  for (l=0; l<b->i[row+1]-b->i[row]; l++) {
    if (garray[b->j[b->i[row]+l]] > diag) break;
  }
  ierr = PetscMemcpy(b->a+b->i[row],v,l*sizeof(PetscScalar));CHKERRQ(ierr);

  /* diagonal part */
  ierr = PetscMemcpy(a->a+a->i[row],v+l,(a->i[row+1]-a->i[row])*sizeof(PetscScalar));CHKERRQ(ierr);

  /* right of diagonal part */
  ierr = PetscMemcpy(b->a+b->i[row]+l,v+l+a->i[row+1]-a->i[row],(b->i[row+1]-b->i[row]-l)*sizeof(PetscScalar));CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
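
/*
   For reference, the routine above assumes v[] holds the complete local row
   with columns in increasing global order, so it splits as

      [ B entries left of the diagonal block | A (diagonal block) entries | B entries right of it ]

   with the first l values copied into B, the next a->i[row+1]-a->i[row]
   into A, and the remainder back into B.
*/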

#undef __FUNCT__
#define __FUNCT__ "MatSetValues_MPIAIJ"
PetscErrorCode MatSetValues_MPIAIJ(Mat mat,PetscInt m,const PetscInt im[],PetscInt n,const PetscInt in[],const PetscScalar v[],InsertMode addv)
{
  Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
  PetscScalar    value;
  PetscErrorCode ierr;
  PetscInt       i,j,rstart = mat->rmap.rstart,rend = mat->rmap.rend;
  PetscInt       cstart = mat->cmap.rstart,cend = mat->cmap.rend,row,col;
  PetscTruth     roworiented = aij->roworiented;

  /* Some variables required in the macros */
  Mat            A = aij->A;
  Mat_SeqAIJ     *a = (Mat_SeqAIJ*)A->data;
  PetscInt       *aimax = a->imax,*ai = a->i,*ailen = a->ilen,*aj = a->j;
  PetscScalar    *aa = a->a;
  PetscTruth     ignorezeroentries = a->ignorezeroentries;
  Mat            B = aij->B;
  Mat_SeqAIJ     *b = (Mat_SeqAIJ*)B->data;
  PetscInt       *bimax = b->imax,*bi = b->i,*bilen = b->ilen,*bj = b->j,bm = aij->B->rmap.n,am = aij->A->rmap.n;
  PetscScalar    *ba = b->a;

  PetscInt       *rp1,*rp2,ii,nrow1,nrow2,_i,rmax1,rmax2,N,low1,high1,low2,high2,t,lastcol1,lastcol2;
  PetscInt       nonew = a->nonew;
  PetscScalar    *ap1,*ap2;

  PetscFunctionBegin;
  for (i=0; i<m; i++) {
    if (im[i] < 0) continue;
#if defined(PETSC_USE_DEBUG)
    if (im[i] >= mat->rmap.N) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"Row too large: row %D max %D",im[i],mat->rmap.N-1);
#endif
    if (im[i] >= rstart && im[i] < rend) {
      row      = im[i] - rstart;
      lastcol1 = -1;
      rp1      = aj + ai[row];
      ap1      = aa + ai[row];
      rmax1    = aimax[row];
      nrow1    = ailen[row];
      low1     = 0;
      high1    = nrow1;
      lastcol2 = -1;
      rp2      = bj + bi[row];
      ap2      = ba + bi[row];
      rmax2    = bimax[row];
      nrow2    = bilen[row];
      low2     = 0;
      high2    = nrow2;

      for (j=0; j<n; j++) {
        if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
        if (ignorezeroentries && value == 0.0 && (addv == ADD_VALUES)) continue;
        if (in[j] >= cstart && in[j] < cend){
          col = in[j] - cstart;
          MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
        } else if (in[j] < 0) continue;
#if defined(PETSC_USE_DEBUG)
        else if (in[j] >= mat->cmap.N) {SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"Column too large: col %D max %D",in[j],mat->cmap.N-1);}
#endif
        else {
          if (mat->was_assembled) {
            if (!aij->colmap) {
              ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
            }
#if defined (PETSC_USE_CTABLE)
            ierr = PetscTableFind(aij->colmap,in[j]+1,&col);CHKERRQ(ierr);
            col--;
#else
            col = aij->colmap[in[j]] - 1;
#endif
            if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
              ierr = DisAssemble_MPIAIJ(mat);CHKERRQ(ierr);
              col  = in[j];
              /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private();
                 note ba must be refreshed before ap2 is computed from it */
              B     = aij->B;
              b     = (Mat_SeqAIJ*)B->data;
              bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j; ba = b->a;
              rp2   = bj + bi[row];
              ap2   = ba + bi[row];
              rmax2 = bimax[row];
              nrow2 = bilen[row];
              low2  = 0;
              high2 = nrow2;
              bm    = aij->B->rmap.n;
            }
          } else col = in[j];
          MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
        }
      }
    } else {
      if (!aij->donotstash) {
        if (roworiented) {
          if (ignorezeroentries && v[i*n] == 0.0) continue;
          ierr = MatStashValuesRow_Private(&mat->stash,im[i],n,in,v+i*n);CHKERRQ(ierr);
        } else {
          if (ignorezeroentries && v[i] == 0.0) continue;
          ierr = MatStashValuesCol_Private(&mat->stash,im[i],n,in,v+i,m);CHKERRQ(ierr);
        }
      }
    }
  }
  PetscFunctionReturn(0);
}
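
/*
   Sketch of a call that exercises the branches above (illustrative values,
   compiled out): columns inside [cstart,cend) go to the diagonal block A,
   other local-row columns go to B, and rows owned by another process are
   stashed until MatAssemblyBegin/End().
*/
#if 0
{
  PetscInt    row     = 5;
  PetscInt    cols[3] = {4,6,100};
  PetscScalar vals[3] = {1.0,-2.0,0.5};
  ierr = MatSetValues(mat,1,&row,3,cols,vals,ADD_VALUES);CHKERRQ(ierr);
}
#endif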

#undef __FUNCT__
#define __FUNCT__ "MatGetValues_MPIAIJ"
PetscErrorCode MatGetValues_MPIAIJ(Mat mat,PetscInt m,const PetscInt idxm[],PetscInt n,const PetscInt idxn[],PetscScalar v[])
{
  Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
  PetscErrorCode ierr;
  PetscInt       i,j,rstart = mat->rmap.rstart,rend = mat->rmap.rend;
  PetscInt       cstart = mat->cmap.rstart,cend = mat->cmap.rend,row,col;

  PetscFunctionBegin;
  for (i=0; i<m; i++) {
    if (idxm[i] < 0) continue; /* SETERRQ1(PETSC_ERR_ARG_OUTOFRANGE,"Negative row: %D",idxm[i]); */
    if (idxm[i] >= mat->rmap.N) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"Row too large: row %D max %D",idxm[i],mat->rmap.N-1);
    if (idxm[i] >= rstart && idxm[i] < rend) {
      row = idxm[i] - rstart;
      for (j=0; j<n; j++) {
        if (idxn[j] < 0) continue; /* SETERRQ1(PETSC_ERR_ARG_OUTOFRANGE,"Negative column: %D",idxn[j]); */
        if (idxn[j] >= mat->cmap.N) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"Column too large: col %D max %D",idxn[j],mat->cmap.N-1);
        if (idxn[j] >= cstart && idxn[j] < cend){
          col = idxn[j] - cstart;
          ierr = MatGetValues(aij->A,1,&row,1,&col,v+i*n+j);CHKERRQ(ierr);
        } else {
          if (!aij->colmap) {
            ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
          }
#if defined (PETSC_USE_CTABLE)
          ierr = PetscTableFind(aij->colmap,idxn[j]+1,&col);CHKERRQ(ierr);
          col--;
#else
          col = aij->colmap[idxn[j]] - 1;
#endif
          if ((col < 0) || (aij->garray[col] != idxn[j])) *(v+i*n+j) = 0.0;
          else {
            ierr = MatGetValues(aij->B,1,&row,1,&col,v+i*n+j);CHKERRQ(ierr);
          }
        }
      }
    } else {
      SETERRQ(PETSC_ERR_SUP,"Only local values currently supported");
    }
  }
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatAssemblyBegin_MPIAIJ"
PetscErrorCode MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
  PetscErrorCode ierr;
  PetscInt       nstash,reallocs;
  InsertMode     addv;

  PetscFunctionBegin;
  if (aij->donotstash) {
    PetscFunctionReturn(0);
  }

  /* make sure all processors are in either INSERT_VALUES or ADD_VALUES mode */
  ierr = MPI_Allreduce(&mat->insertmode,&addv,1,MPI_INT,MPI_BOR,mat->comm);CHKERRQ(ierr);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,"Some processors inserted others added");
  }
  mat->insertmode = addv; /* in case this processor had no cache */

  ierr = MatStashScatterBegin_Private(mat,&mat->stash,mat->rmap.range);CHKERRQ(ierr);
  ierr = MatStashGetInfo_Private(&mat->stash,&nstash,&reallocs);CHKERRQ(ierr);
  ierr = PetscInfo2(aij->A,"Stash has %D entries, uses %D mallocs.\n",nstash,reallocs);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatAssemblyEnd_MPIAIJ"
PetscErrorCode MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
  Mat_SeqAIJ     *a = (Mat_SeqAIJ*)aij->A->data;
  PetscErrorCode ierr;
  PetscMPIInt    n;
  PetscInt       i,j,rstart,ncols,flg;
  PetscInt       *row,*col,other_disassembled;
  PetscScalar    *val;
  InsertMode     addv = mat->insertmode;

  /* do not use 'b = (Mat_SeqAIJ*)aij->B->data' as B can be reset in disassembly */
  PetscFunctionBegin;
  if (!aij->donotstash) {
    while (1) {
      ierr = MatStashScatterGetMesg_Private(&mat->stash,&n,&row,&col,&val,&flg);CHKERRQ(ierr);
      if (!flg) break;

      for (i=0; i<n;) {
        /* Now identify the consecutive vals belonging to the same row */
        for (j=i,rstart=row[j]; j<n; j++) { if (row[j] != rstart) break; }
        if (j < n) ncols = j-i;
        else       ncols = n-i;
        /* Now assemble all these values with a single function call */
        ierr = MatSetValues_MPIAIJ(mat,1,row+i,ncols,col+i,val+i,addv);CHKERRQ(ierr);
        i = j;
      }
    }
    ierr = MatStashScatterEnd_Private(&mat->stash);CHKERRQ(ierr);
  }
  a->compressedrow.use = PETSC_FALSE;
  ierr = MatAssemblyBegin(aij->A,mode);CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->A,mode);CHKERRQ(ierr);

  /*
     Determine if any processor has disassembled; if so we must
     also disassemble ourselves, in order that we may reassemble.

     If the nonzero structure of submatrix B cannot change then we know that
     no processor disassembled, so we can skip this step.
  */
  if (!((Mat_SeqAIJ*)aij->B->data)->nonew)  {
    ierr = MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);CHKERRQ(ierr);
    if (mat->was_assembled && !other_disassembled) {
      ierr = DisAssemble_MPIAIJ(mat);CHKERRQ(ierr);
    }
  }
  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIAIJ(mat);CHKERRQ(ierr);
  }
  ierr = MatSetOption(aij->B,MAT_USE_INODES,PETSC_FALSE);CHKERRQ(ierr);
  ((Mat_SeqAIJ*)aij->B->data)->compressedrow.use = PETSC_TRUE; /* b->compressedrow.use */
  ierr = MatAssemblyBegin(aij->B,mode);CHKERRQ(ierr);
  ierr = MatAssemblyEnd(aij->B,mode);CHKERRQ(ierr);

  ierr = PetscFree(aij->rowvalues);CHKERRQ(ierr);
  aij->rowvalues = 0;

  /* used by MatAXPY() */
  a->xtoy = 0; ((Mat_SeqAIJ*)aij->B->data)->xtoy = 0;  /* b->xtoy = 0 */
  a->XtoY = 0; ((Mat_SeqAIJ*)aij->B->data)->XtoY = 0;  /* b->XtoY = 0 */

  PetscFunctionReturn(0);
}
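
/*
   Usage pattern these two routines support (sketch, compiled out): after any
   sequence of MatSetValues() calls, both assembly phases must be called so
   the stashed off-processor entries are exchanged and, on first assembly,
   the VecScatter used by MatMult is built.
*/
#if 0
ierr = MatSetValues(mat,1,&row,1,&col,&v,INSERT_VALUES);CHKERRQ(ierr);
ierr = MatAssemblyBegin(mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
ierr = MatAssemblyEnd(mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
#endif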

#undef __FUNCT__
#define __FUNCT__ "MatZeroEntries_MPIAIJ"
PetscErrorCode MatZeroEntries_MPIAIJ(Mat A)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ*)A->data;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  ierr = MatZeroEntries(l->A);CHKERRQ(ierr);
  ierr = MatZeroEntries(l->B);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatZeroRows_MPIAIJ"
PetscErrorCode MatZeroRows_MPIAIJ(Mat A,PetscInt N,const PetscInt rows[],PetscScalar diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ*)A->data;
  PetscErrorCode ierr;
  PetscMPIInt    size = l->size,imdex,n,rank = l->rank,tag = A->tag,lastidx = -1;
  PetscInt       i,*owners = A->rmap.range;
  PetscInt       *nprocs,j,idx,nsends,row;
  PetscInt       nmax,*svalues,*starts,*owner,nrecvs;
  PetscInt       *rvalues,count,base,slen,*source;
  PetscInt       *lens,*lrows,*values,rstart=A->rmap.rstart;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
#if defined(PETSC_DEBUG)
  PetscTruth     found = PETSC_FALSE;
#endif

  PetscFunctionBegin;
  /* first count number of contributors to each processor */
  ierr = PetscMalloc(2*size*sizeof(PetscInt),&nprocs);CHKERRQ(ierr);
  ierr = PetscMemzero(nprocs,2*size*sizeof(PetscInt));CHKERRQ(ierr);
  ierr = PetscMalloc((N+1)*sizeof(PetscInt),&owner);CHKERRQ(ierr); /* see note*/
  j = 0;
  for (i=0; i<N; i++) {
    if (lastidx > (idx = rows[i])) j = 0;
    lastidx = idx;
    for (; j<size; j++) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[2*j]++;
        nprocs[2*j+1] = 1;
        owner[i] = j;
#if defined(PETSC_DEBUG)
        found = PETSC_TRUE;
#endif
        break;
      }
    }
#if defined(PETSC_DEBUG)
    if (!found) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Index out of range");
    found = PETSC_FALSE;
#endif
  }
  nsends = 0;  for (i=0; i<size; i++) { nsends += nprocs[2*i+1];}

  /* inform other processors of number of messages and max length */
  ierr = PetscMaxSum(comm,nprocs,&nmax,&nrecvs);CHKERRQ(ierr);

  /* post receives: */
  ierr = PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(PetscInt),&rvalues);CHKERRQ(ierr);
  ierr = PetscMalloc((nrecvs+1)*sizeof(MPI_Request),&recv_waits);CHKERRQ(ierr);
  for (i=0; i<nrecvs; i++) {
    ierr = MPI_Irecv(rvalues+nmax*i,nmax,MPIU_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);CHKERRQ(ierr);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  ierr = PetscMalloc((N+1)*sizeof(PetscInt),&svalues);CHKERRQ(ierr);
  ierr = PetscMalloc((nsends+1)*sizeof(MPI_Request),&send_waits);CHKERRQ(ierr);
  ierr = PetscMalloc((size+1)*sizeof(PetscInt),&starts);CHKERRQ(ierr);
  starts[0] = 0;
  for (i=1; i<size; i++) { starts[i] = starts[i-1] + nprocs[2*i-2];}
  for (i=0; i<N; i++) {
    svalues[starts[owner[i]]++] = rows[i];
  }

  starts[0] = 0;
  for (i=1; i<size+1; i++) { starts[i] = starts[i-1] + nprocs[2*i-2];}
  count = 0;
  for (i=0; i<size; i++) {
    if (nprocs[2*i+1]) {
      ierr = MPI_Isend(svalues+starts[i],nprocs[2*i],MPIU_INT,i,tag,comm,send_waits+count++);CHKERRQ(ierr);
    }
  }
  ierr = PetscFree(starts);CHKERRQ(ierr);

  base = owners[rank];

  /* wait on receives */
  ierr   = PetscMalloc(2*(nrecvs+1)*sizeof(PetscInt),&lens);CHKERRQ(ierr);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    ierr = MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);CHKERRQ(ierr);
    /* unpack receives into our local space */
    ierr = MPI_Get_count(&recv_status,MPIU_INT,&n);CHKERRQ(ierr);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]    = n;
    slen          += n;
    count--;
  }
  ierr = PetscFree(recv_waits);CHKERRQ(ierr);

  /* move the data into the send scatter */
  ierr = PetscMalloc((slen+1)*sizeof(PetscInt),&lrows);CHKERRQ(ierr);
  count = 0;
  for (i=0; i<nrecvs; i++) {
    values = rvalues + i*nmax;
    for (j=0; j<lens[i]; j++) {
      lrows[count++] = values[j] - base;
    }
  }
  ierr = PetscFree(rvalues);CHKERRQ(ierr);
  ierr = PetscFree(lens);CHKERRQ(ierr);
  ierr = PetscFree(owner);CHKERRQ(ierr);
  ierr = PetscFree(nprocs);CHKERRQ(ierr);

  /* actually zap the local rows */
  /*
        Zero the required rows. If the "diagonal block" of the matrix
     is square and the user wishes to set the diagonal, we use separate
     code so that MatSetValues() is not called for each diagonal entry;
     that would allocate new memory repeatedly, causing lots of mallocs
     and slowing things down.

       Contributed by: Matthew Knepley
  */
  /* must zero l->B before l->A because the (diag) case below may put values into l->B */
  ierr = MatZeroRows(l->B,slen,lrows,0.0);CHKERRQ(ierr);
  if ((diag != 0.0) && (l->A->rmap.N == l->A->cmap.N)) {
    ierr = MatZeroRows(l->A,slen,lrows,diag);CHKERRQ(ierr);
  } else if (diag != 0.0) {
    ierr = MatZeroRows(l->A,slen,lrows,0.0);CHKERRQ(ierr);
    if (((Mat_SeqAIJ*)l->A->data)->nonew) {
      SETERRQ(PETSC_ERR_SUP,"MatZeroRows() on rectangular matrices cannot be used with the Mat options\n\
MAT_NEW_NONZERO_LOCATIONS,MAT_NEW_NONZERO_LOCATION_ERR,MAT_NEW_NONZERO_ALLOCATION_ERR");
    }
    for (i=0; i<slen; i++) {
      row  = lrows[i] + rstart;
      ierr = MatSetValues(A,1,&row,1,&row,&diag,INSERT_VALUES);CHKERRQ(ierr);
    }
    ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
    ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
  } else {
    ierr = MatZeroRows(l->A,slen,lrows,0.0);CHKERRQ(ierr);
  }
  ierr = PetscFree(lrows);CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    ierr = PetscMalloc(nsends*sizeof(MPI_Status),&send_status);CHKERRQ(ierr);
    ierr = MPI_Waitall(nsends,send_waits,send_status);CHKERRQ(ierr);
    ierr = PetscFree(send_status);CHKERRQ(ierr);
  }
  ierr = PetscFree(send_waits);CHKERRQ(ierr);
  ierr = PetscFree(svalues);CHKERRQ(ierr);

  PetscFunctionReturn(0);
}
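
/*
   For reference, the communication pattern used above: each process tallies
   how many of its requested rows each other process owns (nprocs), learns
   from PetscMaxSum() how many messages to expect and their maximum length,
   posts all MPI_Irecv()s before the matching MPI_Isend()s, and drains them
   with MPI_Waitany(); only then are the locally owned rows actually zeroed.
*/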

#undef __FUNCT__
#define __FUNCT__ "MatMult_MPIAIJ"
PetscErrorCode MatMult_MPIAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;
  PetscErrorCode ierr;
  PetscInt       nt;

  PetscFunctionBegin;
  ierr = VecGetLocalSize(xx,&nt);CHKERRQ(ierr);
  if (nt != A->cmap.n) {
    SETERRQ2(PETSC_ERR_ARG_SIZ,"Incompatible partition of A (%D) and xx (%D)",A->cmap.n,nt);
  }
  ierr = VecScatterBegin(a->Mvctx,xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);
  ierr = (*a->A->ops->mult)(a->A,xx,yy);CHKERRQ(ierr);
  ierr = VecScatterEnd(a->Mvctx,xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);
  ierr = (*a->B->ops->multadd)(a->B,a->lvec,yy,yy);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
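
/*
   Per process the multiply above computes

      y_local = A_diag * x_local + B_off * x_ghost

   where x_ghost (a->lvec) is filled by the VecScatter; the scatter is begun
   before the diagonal-block product so the communication overlaps that
   local computation.
*/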

#undef __FUNCT__
#define __FUNCT__ "MatMultAdd_MPIAIJ"
PetscErrorCode MatMultAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  ierr = VecScatterBegin(a->Mvctx,xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);
  ierr = (*a->A->ops->multadd)(a->A,xx,yy,zz);CHKERRQ(ierr);
  ierr = VecScatterEnd(a->Mvctx,xx,a->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);
  ierr = (*a->B->ops->multadd)(a->B,a->lvec,zz,zz);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatMultTranspose_MPIAIJ"
PetscErrorCode MatMultTranspose_MPIAIJ(Mat A,Vec xx,Vec yy)
{
  Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;
  PetscErrorCode ierr;
  PetscTruth     merged;

  PetscFunctionBegin;
  ierr = VecScatterGetMerged(a->Mvctx,&merged);CHKERRQ(ierr);
  /* do nondiagonal part */
  ierr = (*a->B->ops->multtranspose)(a->B,xx,a->lvec);CHKERRQ(ierr);
  if (!merged) {
    /* send it on its way */
    ierr = VecScatterBegin(a->Mvctx,a->lvec,yy,ADD_VALUES,SCATTER_REVERSE);CHKERRQ(ierr);
    /* do local part */
    ierr = (*a->A->ops->multtranspose)(a->A,xx,yy);CHKERRQ(ierr);
    /* receive remote parts: note this assumes the values are not actually
       added into yy until the VecScatterEnd() below */
    ierr = VecScatterEnd(a->Mvctx,a->lvec,yy,ADD_VALUES,SCATTER_REVERSE);CHKERRQ(ierr);
  } else {
    /* do local part */
    ierr = (*a->A->ops->multtranspose)(a->A,xx,yy);CHKERRQ(ierr);
    /* send it on its way */
    ierr = VecScatterBegin(a->Mvctx,a->lvec,yy,ADD_VALUES,SCATTER_REVERSE);CHKERRQ(ierr);
    /* values were actually received in the Begin(), but we still need to call this no-op */
    ierr = VecScatterEnd(a->Mvctx,a->lvec,yy,ADD_VALUES,SCATTER_REVERSE);CHKERRQ(ierr);
  }
  PetscFunctionReturn(0);
}

EXTERN_C_BEGIN
#undef __FUNCT__
#define __FUNCT__ "MatIsTranspose_MPIAIJ"
PetscErrorCode PETSCMAT_DLLEXPORT MatIsTranspose_MPIAIJ(Mat Amat,Mat Bmat,PetscReal tol,PetscTruth *f)
{
  MPI_Comm       comm;
  Mat_MPIAIJ     *Aij = (Mat_MPIAIJ*)Amat->data,*Bij;
  Mat            Adia = Aij->A,Bdia,Aoff,Boff,*Aoffs,*Boffs;
  IS             Me,Notme;
  PetscErrorCode ierr;
  PetscInt       M,N,first,last,*notme,i;
  PetscMPIInt    size;

  PetscFunctionBegin;

  /* Easy test: symmetric diagonal block */
  Bij = (Mat_MPIAIJ*)Bmat->data; Bdia = Bij->A;
  ierr = MatIsTranspose(Adia,Bdia,tol,f);CHKERRQ(ierr);
  if (!*f) PetscFunctionReturn(0);
  ierr = PetscObjectGetComm((PetscObject)Amat,&comm);CHKERRQ(ierr);
  ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
  if (size == 1) PetscFunctionReturn(0);

  /* Hard test: off-diagonal block. This takes a call to MatGetSubMatrices(). */
  ierr = MatGetSize(Amat,&M,&N);CHKERRQ(ierr);
  ierr = MatGetOwnershipRange(Amat,&first,&last);CHKERRQ(ierr);
  ierr = PetscMalloc((N-last+first)*sizeof(PetscInt),&notme);CHKERRQ(ierr);
  for (i=0; i<first; i++) notme[i] = i;
  for (i=last; i<M; i++) notme[i-last+first] = i;
  ierr = ISCreateGeneral(MPI_COMM_SELF,N-last+first,notme,&Notme);CHKERRQ(ierr);
  ierr = ISCreateStride(MPI_COMM_SELF,last-first,first,1,&Me);CHKERRQ(ierr);
  ierr = MatGetSubMatrices(Amat,1,&Me,&Notme,MAT_INITIAL_MATRIX,&Aoffs);CHKERRQ(ierr);
  Aoff = Aoffs[0];
  ierr = MatGetSubMatrices(Bmat,1,&Notme,&Me,MAT_INITIAL_MATRIX,&Boffs);CHKERRQ(ierr);
  Boff = Boffs[0];
  ierr = MatIsTranspose(Aoff,Boff,tol,f);CHKERRQ(ierr);
  ierr = MatDestroyMatrices(1,&Aoffs);CHKERRQ(ierr);
  ierr = MatDestroyMatrices(1,&Boffs);CHKERRQ(ierr);
  ierr = ISDestroy(Me);CHKERRQ(ierr);
  ierr = ISDestroy(Notme);CHKERRQ(ierr);

  PetscFunctionReturn(0);
}
EXTERN_C_END

#undef __FUNCT__
#define __FUNCT__ "MatMultTransposeAdd_MPIAIJ"
PetscErrorCode MatMultTransposeAdd_MPIAIJ(Mat A,Vec xx,Vec yy,Vec zz)
{
  Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  /* do nondiagonal part */
  ierr = (*a->B->ops->multtranspose)(a->B,xx,a->lvec);CHKERRQ(ierr);
  /* send it on its way */
  ierr = VecScatterBegin(a->Mvctx,a->lvec,zz,ADD_VALUES,SCATTER_REVERSE);CHKERRQ(ierr);
  /* do local part */
  ierr = (*a->A->ops->multtransposeadd)(a->A,xx,yy,zz);CHKERRQ(ierr);
  /* receive remote parts */
  ierr = VecScatterEnd(a->Mvctx,a->lvec,zz,ADD_VALUES,SCATTER_REVERSE);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

/*
  This only works correctly for square matrices where the subblock A->A is the
  diagonal block
*/
#undef __FUNCT__
#define __FUNCT__ "MatGetDiagonal_MPIAIJ"
PetscErrorCode MatGetDiagonal_MPIAIJ(Mat A,Vec v)
{
  PetscErrorCode ierr;
  Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;

  PetscFunctionBegin;
  if (A->rmap.N != A->cmap.N) SETERRQ(PETSC_ERR_SUP,"Supports only square matrix where A->A is diag block");
  if (A->rmap.rstart != A->cmap.rstart || A->rmap.rend != A->cmap.rend) {
    SETERRQ(PETSC_ERR_ARG_SIZ,"row partition must equal col partition");
  }
  ierr = MatGetDiagonal(a->A,v);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatScale_MPIAIJ"
PetscErrorCode MatScale_MPIAIJ(Mat A,PetscScalar aa)
{
  Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  ierr = MatScale(a->A,aa);CHKERRQ(ierr);
  ierr = MatScale(a->B,aa);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatDestroy_MPIAIJ"
PetscErrorCode MatDestroy_MPIAIJ(Mat mat)
{
  Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
  PetscErrorCode ierr;

  PetscFunctionBegin;
#if defined(PETSC_USE_LOG)
  PetscLogObjectState((PetscObject)mat,"Rows=%D, Cols=%D",mat->rmap.N,mat->cmap.N);
#endif
  ierr = MatStashDestroy_Private(&mat->stash);CHKERRQ(ierr);
  ierr = MatDestroy(aij->A);CHKERRQ(ierr);
  ierr = MatDestroy(aij->B);CHKERRQ(ierr);
#if defined (PETSC_USE_CTABLE)
  if (aij->colmap) {ierr = PetscTableDestroy(aij->colmap);CHKERRQ(ierr);}
#else
  ierr = PetscFree(aij->colmap);CHKERRQ(ierr);
#endif
  ierr = PetscFree(aij->garray);CHKERRQ(ierr);
  if (aij->lvec)   {ierr = VecDestroy(aij->lvec);CHKERRQ(ierr);}
  if (aij->Mvctx)  {ierr = VecScatterDestroy(aij->Mvctx);CHKERRQ(ierr);}
  ierr = PetscFree(aij->rowvalues);CHKERRQ(ierr);
  ierr = PetscFree(aij->ld);CHKERRQ(ierr);
  ierr = PetscFree(aij);CHKERRQ(ierr);

  ierr = PetscObjectChangeTypeName((PetscObject)mat,0);CHKERRQ(ierr);
  ierr = PetscObjectComposeFunction((PetscObject)mat,"MatStoreValues_C","",PETSC_NULL);CHKERRQ(ierr);
  ierr = PetscObjectComposeFunction((PetscObject)mat,"MatRetrieveValues_C","",PETSC_NULL);CHKERRQ(ierr);
  ierr = PetscObjectComposeFunction((PetscObject)mat,"MatGetDiagonalBlock_C","",PETSC_NULL);CHKERRQ(ierr);
  ierr = PetscObjectComposeFunction((PetscObject)mat,"MatIsTranspose_C","",PETSC_NULL);CHKERRQ(ierr);
  ierr = PetscObjectComposeFunction((PetscObject)mat,"MatMPIAIJSetPreallocation_C","",PETSC_NULL);CHKERRQ(ierr);
  ierr = PetscObjectComposeFunction((PetscObject)mat,"MatMPIAIJSetPreallocationCSR_C","",PETSC_NULL);CHKERRQ(ierr);
  ierr = PetscObjectComposeFunction((PetscObject)mat,"MatDiagonalScaleLocal_C","",PETSC_NULL);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatView_MPIAIJ_Binary"
PetscErrorCode MatView_MPIAIJ_Binary(Mat mat,PetscViewer viewer)
{
  Mat_MPIAIJ        *aij = (Mat_MPIAIJ*)mat->data;
  Mat_SeqAIJ        *A = (Mat_SeqAIJ*)aij->A->data;
  Mat_SeqAIJ        *B = (Mat_SeqAIJ*)aij->B->data;
  PetscErrorCode    ierr;
  PetscMPIInt       rank,size,tag = ((PetscObject)viewer)->tag;
  int               fd;
  PetscInt          nz,header[4],*row_lengths,*range=0,rlen,i;
  PetscInt          nzmax,*column_indices,j,k,col,*garray = aij->garray,cnt,cstart = mat->cmap.rstart,rnz;
  PetscScalar       *column_values;

  PetscFunctionBegin;
  ierr = MPI_Comm_rank(mat->comm,&rank);CHKERRQ(ierr);
  ierr = MPI_Comm_size(mat->comm,&size);CHKERRQ(ierr);
  nz   = A->nz + B->nz;
  if (!rank) {
    header[0] = MAT_FILE_COOKIE;
    header[1] = mat->rmap.N;
    header[2] = mat->cmap.N;
    ierr = MPI_Reduce(&nz,&header[3],1,MPIU_INT,MPI_SUM,0,mat->comm);CHKERRQ(ierr);
    ierr = PetscViewerBinaryGetDescriptor(viewer,&fd);CHKERRQ(ierr);
    ierr = PetscBinaryWrite(fd,header,4,PETSC_INT,PETSC_TRUE);CHKERRQ(ierr);
    /* get largest number of rows any processor has */
    rlen  = mat->rmap.n;
    range = mat->rmap.range;
    for (i=1; i<size; i++) {
      rlen = PetscMax(rlen,range[i+1] - range[i]);
    }
  } else {
    ierr = MPI_Reduce(&nz,0,1,MPIU_INT,MPI_SUM,0,mat->comm);CHKERRQ(ierr);
    rlen = mat->rmap.n;
  }

  /* load up the local row counts */
  ierr = PetscMalloc((rlen+1)*sizeof(PetscInt),&row_lengths);CHKERRQ(ierr);
  for (i=0; i<mat->rmap.n; i++) {
    row_lengths[i] = A->i[i+1] - A->i[i] + B->i[i+1] - B->i[i];
  }

  /* store the row lengths to the file */
  if (!rank) {
    MPI_Status status;
    ierr = PetscBinaryWrite(fd,row_lengths,mat->rmap.n,PETSC_INT,PETSC_TRUE);CHKERRQ(ierr);
    for (i=1; i<size; i++) {
      rlen = range[i+1] - range[i];
      ierr = MPI_Recv(row_lengths,rlen,MPIU_INT,i,tag,mat->comm,&status);CHKERRQ(ierr);
      ierr = PetscBinaryWrite(fd,row_lengths,rlen,PETSC_INT,PETSC_TRUE);CHKERRQ(ierr);
    }
  } else {
    ierr = MPI_Send(row_lengths,mat->rmap.n,MPIU_INT,0,tag,mat->comm);CHKERRQ(ierr);
  }
  ierr = PetscFree(row_lengths);CHKERRQ(ierr);

  /* load up the local column indices */
  nzmax = nz; /* 0th process needs as much space as the largest amount any process needs */
  ierr = MPI_Reduce(&nz,&nzmax,1,MPIU_INT,MPI_MAX,0,mat->comm);CHKERRQ(ierr);
  ierr = PetscMalloc((nzmax+1)*sizeof(PetscInt),&column_indices);CHKERRQ(ierr);
  cnt  = 0;
  for (i=0; i<mat->rmap.n; i++) {
    for (j=B->i[i]; j<B->i[i+1]; j++) {
      if ((col = garray[B->j[j]]) > cstart) break;
      column_indices[cnt++] = col;
    }
    for (k=A->i[i]; k<A->i[i+1]; k++) {
      column_indices[cnt++] = A->j[k] + cstart;
    }
    for (; j<B->i[i+1]; j++) {
      column_indices[cnt++] = garray[B->j[j]];
    }
  }
  if (cnt != A->nz + B->nz) SETERRQ2(PETSC_ERR_PLIB,"Internal PETSc error: cnt = %D nz = %D",cnt,A->nz+B->nz);

  /* store the column indices to the file */
  if (!rank) {
    MPI_Status status;
    ierr = PetscBinaryWrite(fd,column_indices,nz,PETSC_INT,PETSC_TRUE);CHKERRQ(ierr);
    for (i=1; i<size; i++) {
      ierr = MPI_Recv(&rnz,1,MPIU_INT,i,tag,mat->comm,&status);CHKERRQ(ierr);
      if (rnz > nzmax) SETERRQ2(PETSC_ERR_PLIB,"Internal PETSc error: rnz = %D nzmax = %D",rnz,nzmax);
      ierr = MPI_Recv(column_indices,rnz,MPIU_INT,i,tag,mat->comm,&status);CHKERRQ(ierr);
      ierr = PetscBinaryWrite(fd,column_indices,rnz,PETSC_INT,PETSC_TRUE);CHKERRQ(ierr);
    }
  } else {
    ierr = MPI_Send(&nz,1,MPIU_INT,0,tag,mat->comm);CHKERRQ(ierr);
    ierr = MPI_Send(column_indices,nz,MPIU_INT,0,tag,mat->comm);CHKERRQ(ierr);
  }
  ierr = PetscFree(column_indices);CHKERRQ(ierr);

  /* load up the local column values */
  ierr = PetscMalloc((nzmax+1)*sizeof(PetscScalar),&column_values);CHKERRQ(ierr);
  cnt  = 0;
  for (i=0; i<mat->rmap.n; i++) {
    for (j=B->i[i]; j<B->i[i+1]; j++) {
      if (garray[B->j[j]] > cstart) break;
      column_values[cnt++] = B->a[j];
    }
    for (k=A->i[i]; k<A->i[i+1]; k++) {
      column_values[cnt++] = A->a[k];
    }
    for (; j<B->i[i+1]; j++) {
      column_values[cnt++] = B->a[j];
    }
  }
  if (cnt != A->nz + B->nz) SETERRQ2(PETSC_ERR_PLIB,"Internal PETSc error: cnt = %D nz = %D",cnt,A->nz+B->nz);

  /* store the column values to the file */
  if (!rank) {
    MPI_Status status;
    ierr = PetscBinaryWrite(fd,column_values,nz,PETSC_SCALAR,PETSC_TRUE);CHKERRQ(ierr);
    for (i=1; i<size; i++) {
      ierr = MPI_Recv(&rnz,1,MPIU_INT,i,tag,mat->comm,&status);CHKERRQ(ierr);
      if (rnz > nzmax) SETERRQ2(PETSC_ERR_PLIB,"Internal PETSc error: rnz = %D nzmax = %D",rnz,nzmax);
      ierr = MPI_Recv(column_values,rnz,MPIU_SCALAR,i,tag,mat->comm,&status);CHKERRQ(ierr);
      ierr = PetscBinaryWrite(fd,column_values,rnz,PETSC_SCALAR,PETSC_TRUE);CHKERRQ(ierr);
    }
  } else {
    ierr = MPI_Send(&nz,1,MPIU_INT,0,tag,mat->comm);CHKERRQ(ierr);
    ierr = MPI_Send(column_values,nz,MPIU_SCALAR,0,tag,mat->comm);CHKERRQ(ierr);
  }
  ierr = PetscFree(column_values);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
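
/*
   Resulting file layout written above (rank 0 gathers everything):

      header[4]      = {MAT_FILE_COOKIE, M, N, global nz}
      row lengths    one PetscInt per row, all processes in order
      column indices global numbering, nz PetscInt
      values         nz PetscScalar

   with each row emitted as off-diagonal-left, diagonal block,
   off-diagonal-right so its columns appear in increasing order.
*/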

#undef __FUNCT__
#define __FUNCT__ "MatView_MPIAIJ_ASCIIorDraworSocket"
PetscErrorCode MatView_MPIAIJ_ASCIIorDraworSocket(Mat mat,PetscViewer viewer)
{
  Mat_MPIAIJ        *aij = (Mat_MPIAIJ*)mat->data;
  PetscErrorCode    ierr;
  PetscMPIInt       rank = aij->rank,size = aij->size;
  PetscTruth        isdraw,iascii,isbinary;
  PetscViewer       sviewer;
  PetscViewerFormat format;

  PetscFunctionBegin;
  ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_DRAW,&isdraw);CHKERRQ(ierr);
  ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_ASCII,&iascii);CHKERRQ(ierr);
  ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_BINARY,&isbinary);CHKERRQ(ierr);
  if (iascii) {
    ierr = PetscViewerGetFormat(viewer,&format);CHKERRQ(ierr);
    if (format == PETSC_VIEWER_ASCII_INFO_DETAIL) {
      MatInfo    info;
      PetscTruth inodes;

      ierr = MPI_Comm_rank(mat->comm,&rank);CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);CHKERRQ(ierr);
      ierr = MatInodeGetInodeSizes(aij->A,PETSC_NULL,(PetscInt**)&inodes,PETSC_NULL);CHKERRQ(ierr);
      if (!inodes) {
        ierr = PetscViewerASCIISynchronizedPrintf(viewer,"[%d] Local rows %D nz %D nz alloced %D mem %D, not using I-node routines\n",
                                                  rank,mat->rmap.n,(PetscInt)info.nz_used,(PetscInt)info.nz_allocated,(PetscInt)info.memory);CHKERRQ(ierr);
      } else {
        ierr = PetscViewerASCIISynchronizedPrintf(viewer,"[%d] Local rows %D nz %D nz alloced %D mem %D, using I-node routines\n",
                                                  rank,mat->rmap.n,(PetscInt)info.nz_used,(PetscInt)info.nz_allocated,(PetscInt)info.memory);CHKERRQ(ierr);
      }
      ierr = MatGetInfo(aij->A,MAT_LOCAL,&info);CHKERRQ(ierr);
      ierr = PetscViewerASCIISynchronizedPrintf(viewer,"[%d] on-diagonal part: nz %D \n",rank,(PetscInt)info.nz_used);CHKERRQ(ierr);
      ierr = MatGetInfo(aij->B,MAT_LOCAL,&info);CHKERRQ(ierr);
      ierr = PetscViewerASCIISynchronizedPrintf(viewer,"[%d] off-diagonal part: nz %D \n",rank,(PetscInt)info.nz_used);CHKERRQ(ierr);
      ierr = PetscViewerFlush(viewer);CHKERRQ(ierr);
      ierr = PetscViewerASCIIPrintf(viewer,"Information on VecScatter used in matrix-vector product: \n");CHKERRQ(ierr);
      ierr = VecScatterView(aij->Mvctx,viewer);CHKERRQ(ierr);
      PetscFunctionReturn(0);
    } else if (format == PETSC_VIEWER_ASCII_INFO) {
      PetscInt inodecount,inodelimit,*inodes;
      ierr = MatInodeGetInodeSizes(aij->A,&inodecount,&inodes,&inodelimit);CHKERRQ(ierr);
      if (inodes) {
        ierr = PetscViewerASCIIPrintf(viewer,"using I-node (on process 0) routines: found %D nodes, limit used is %D\n",inodecount,inodelimit);CHKERRQ(ierr);
      } else {
        ierr = PetscViewerASCIIPrintf(viewer,"not using I-node (on process 0) routines\n");CHKERRQ(ierr);
      }
      PetscFunctionReturn(0);
    } else if (format == PETSC_VIEWER_ASCII_FACTOR_INFO) {
      PetscFunctionReturn(0);
    }
  } else if (isbinary) {
    if (size == 1) {
      ierr = PetscObjectSetName((PetscObject)aij->A,mat->name);CHKERRQ(ierr);
      ierr = MatView(aij->A,viewer);CHKERRQ(ierr);
    } else {
      ierr = MatView_MPIAIJ_Binary(mat,viewer);CHKERRQ(ierr);
    }
    PetscFunctionReturn(0);
  } else if (isdraw) {
    PetscDraw  draw;
    PetscTruth isnull;
    ierr = PetscViewerDrawGetDraw(viewer,0,&draw);CHKERRQ(ierr);
    ierr = PetscDrawIsNull(draw,&isnull);CHKERRQ(ierr); if (isnull) PetscFunctionReturn(0);
  }

  if (size == 1) {
    ierr = PetscObjectSetName((PetscObject)aij->A,mat->name);CHKERRQ(ierr);
    ierr = MatView(aij->A,viewer);CHKERRQ(ierr);
  } else {
    /* assemble the entire matrix onto first processor. */
    Mat         A;
    Mat_SeqAIJ  *Aloc;
    PetscInt    M = mat->rmap.N,N = mat->cmap.N,m,*ai,*aj,row,*cols,i,*ct;
    PetscScalar *a;

    ierr = MatCreate(mat->comm,&A);CHKERRQ(ierr);
    if (!rank) {
      ierr = MatSetSizes(A,M,N,M,N);CHKERRQ(ierr);
    } else {
      ierr = MatSetSizes(A,0,0,M,N);CHKERRQ(ierr);
    }
    /* This is just a temporary matrix, so explicitly using MATMPIAIJ is probably best */
    ierr = MatSetType(A,MATMPIAIJ);CHKERRQ(ierr);
    ierr = MatMPIAIJSetPreallocation(A,0,PETSC_NULL,0,PETSC_NULL);CHKERRQ(ierr);
    ierr = PetscLogObjectParent(mat,A);CHKERRQ(ierr);

    /* copy over the A part */
    Aloc = (Mat_SeqAIJ*)aij->A->data;
    m = aij->A->rmap.n; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
    row = mat->rmap.rstart;
    for (i=0; i<ai[m]; i++) {aj[i] += mat->cmap.rstart;}
    for (i=0; i<m; i++) {
      ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,INSERT_VALUES);CHKERRQ(ierr);
      row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
    }
    aj = Aloc->j;
    for (i=0; i<ai[m]; i++) {aj[i] -= mat->cmap.rstart;}

    /* copy over the B part */
    Aloc = (Mat_SeqAIJ*)aij->B->data;
    m    = aij->B->rmap.n;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
    row  = mat->rmap.rstart;
    ierr = PetscMalloc((ai[m]+1)*sizeof(PetscInt),&cols);CHKERRQ(ierr);
    ct   = cols;
    for (i=0; i<ai[m]; i++) {cols[i] = aij->garray[aj[i]];}
    for (i=0; i<m; i++) {
      ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,INSERT_VALUES);CHKERRQ(ierr);
      row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
    }
    ierr = PetscFree(ct);CHKERRQ(ierr);
    ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
    ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
    /*
       Everyone has to call to draw the matrix since the graphics waits are
       synchronized across all processors that share the PetscDraw object
    */
    ierr = PetscViewerGetSingleton(viewer,&sviewer);CHKERRQ(ierr);
    if (!rank) {
      ierr = PetscObjectSetName((PetscObject)((Mat_MPIAIJ*)(A->data))->A,mat->name);CHKERRQ(ierr);
      ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,sviewer);CHKERRQ(ierr);
    }
    ierr = PetscViewerRestoreSingleton(viewer,&sviewer);CHKERRQ(ierr);
    ierr = MatDestroy(A);CHKERRQ(ierr);
  }
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatView_MPIAIJ"
PetscErrorCode MatView_MPIAIJ(Mat mat,PetscViewer viewer)
{
  PetscErrorCode ierr;
  PetscTruth     iascii,isdraw,issocket,isbinary;

  PetscFunctionBegin;
  ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_ASCII,&iascii);CHKERRQ(ierr);
  ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_DRAW,&isdraw);CHKERRQ(ierr);
  ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_BINARY,&isbinary);CHKERRQ(ierr);
  ierr = PetscTypeCompare((PetscObject)viewer,PETSC_VIEWER_SOCKET,&issocket);CHKERRQ(ierr);
  if (iascii || isdraw || isbinary || issocket) {
    ierr = MatView_MPIAIJ_ASCIIorDraworSocket(mat,viewer);CHKERRQ(ierr);
  } else {
    SETERRQ1(PETSC_ERR_SUP,"Viewer type %s not supported by MPIAIJ matrices",((PetscObject)viewer)->type_name);
  }
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatRelax_MPIAIJ"
PetscErrorCode MatRelax_MPIAIJ(Mat matin,Vec bb,PetscReal omega,MatSORType flag,PetscReal fshift,PetscInt its,PetscInt lits,Vec xx)
{
  Mat_MPIAIJ     *mat = (Mat_MPIAIJ*)matin->data;
  PetscErrorCode ierr;
  Vec            bb1;

  PetscFunctionBegin;
  ierr = VecDuplicate(bb,&bb1);CHKERRQ(ierr);

  if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,lits,lits,xx);CHKERRQ(ierr);
      its--;
    }

    while (its--) {
      ierr = VecScatterBegin(mat->Mvctx,xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);
      ierr = VecScatterEnd(mat->Mvctx,xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);

      /* update rhs: bb1 = bb - B*x */
      ierr = VecScale(mat->lvec,-1.0);CHKERRQ(ierr);
      ierr = (*mat->B->ops->multadd)(mat->B,mat->lvec,bb,bb1);CHKERRQ(ierr);

      /* local sweep */
      ierr = (*mat->A->ops->relax)(mat->A,bb1,omega,SOR_SYMMETRIC_SWEEP,fshift,lits,lits,xx);CHKERRQ(ierr);
    }
  } else if (flag & SOR_LOCAL_FORWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,lits,PETSC_NULL,xx);CHKERRQ(ierr);
      its--;
    }
    while (its--) {
      ierr = VecScatterBegin(mat->Mvctx,xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);
      ierr = VecScatterEnd(mat->Mvctx,xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);

      /* update rhs: bb1 = bb - B*x */
      ierr = VecScale(mat->lvec,-1.0);CHKERRQ(ierr);
      ierr = (*mat->B->ops->multadd)(mat->B,mat->lvec,bb,bb1);CHKERRQ(ierr);

      /* local sweep */
      ierr = (*mat->A->ops->relax)(mat->A,bb1,omega,SOR_FORWARD_SWEEP,fshift,lits,PETSC_NULL,xx);CHKERRQ(ierr);
    }
  } else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
    if (flag & SOR_ZERO_INITIAL_GUESS) {
      ierr = (*mat->A->ops->relax)(mat->A,bb,omega,flag,fshift,lits,PETSC_NULL,xx);CHKERRQ(ierr);
      its--;
    }
    while (its--) {
      ierr = VecScatterBegin(mat->Mvctx,xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);
      ierr = VecScatterEnd(mat->Mvctx,xx,mat->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);

      /* update rhs: bb1 = bb - B*x */
      ierr = VecScale(mat->lvec,-1.0);CHKERRQ(ierr);
      ierr = (*mat->B->ops->multadd)(mat->B,mat->lvec,bb,bb1);CHKERRQ(ierr);

      /* local sweep */
      ierr = (*mat->A->ops->relax)(mat->A,bb1,omega,SOR_BACKWARD_SWEEP,fshift,lits,PETSC_NULL,xx);CHKERRQ(ierr);
    }
  } else {
    SETERRQ(PETSC_ERR_SUP,"Parallel SOR not supported");
  }

  ierr = VecDestroy(bb1);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
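
/*
   Each outer iteration above is a block Jacobi step with SOR applied to the
   local diagonal block: with x_ghost gathered from the other processes,

      bb1 = bb - B_off * x_ghost
      x   = SOR(A_diag, bb1, omega, lits local sweeps)

   which is why only the SOR_LOCAL_* variants are supported in parallel.
*/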

#undef __FUNCT__
#define __FUNCT__ "MatPermute_MPIAIJ"
PetscErrorCode MatPermute_MPIAIJ(Mat A,IS rowp,IS colp,Mat *B)
{
  MPI_Comm       comm,pcomm;
  PetscInt       first,local_size,nrows,*rows;
  int            ntids;
  IS             crowp,growp,irowp,lrowp,lcolp,icolp;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  ierr = PetscObjectGetComm((PetscObject)A,&comm);CHKERRQ(ierr);
  /* make a collective version of 'rowp' */
  ierr = PetscObjectGetComm((PetscObject)rowp,&pcomm);CHKERRQ(ierr);
  if (pcomm==comm) {
    crowp = rowp;
  } else {
    ierr = ISGetSize(rowp,&nrows);CHKERRQ(ierr);
    ierr = ISGetIndices(rowp,&rows);CHKERRQ(ierr);
    ierr = ISCreateGeneral(comm,nrows,rows,&crowp);CHKERRQ(ierr);
    ierr = ISRestoreIndices(rowp,&rows);CHKERRQ(ierr);
  }
  /* collect the global row permutation and invert it */
  ierr = ISAllGather(crowp,&growp);CHKERRQ(ierr);
  ierr = ISSetPermutation(growp);CHKERRQ(ierr);
  if (pcomm!=comm) {
    ierr = ISDestroy(crowp);CHKERRQ(ierr);
  }
  ierr = ISInvertPermutation(growp,PETSC_DECIDE,&irowp);CHKERRQ(ierr);
  /* get the local target indices */
  ierr = MatGetOwnershipRange(A,&first,PETSC_NULL);CHKERRQ(ierr);
  ierr = MatGetLocalSize(A,&local_size,PETSC_NULL);CHKERRQ(ierr);
  ierr = ISGetIndices(irowp,&rows);CHKERRQ(ierr);
  ierr = ISCreateGeneral(MPI_COMM_SELF,local_size,rows+first,&lrowp);CHKERRQ(ierr);
  ierr = ISRestoreIndices(irowp,&rows);CHKERRQ(ierr);
  ierr = ISDestroy(irowp);CHKERRQ(ierr);
  /* the column permutation is so much easier;
     make a local version of 'colp' and invert it */
  ierr = PetscObjectGetComm((PetscObject)colp,&pcomm);CHKERRQ(ierr);
  ierr = MPI_Comm_size(pcomm,&ntids);CHKERRQ(ierr);
  if (ntids==1) {
    lcolp = colp;
  } else {
    ierr = ISGetSize(colp,&nrows);CHKERRQ(ierr);
    ierr = ISGetIndices(colp,&rows);CHKERRQ(ierr);
    ierr = ISCreateGeneral(MPI_COMM_SELF,nrows,rows,&lcolp);CHKERRQ(ierr);
  }
  ierr = ISInvertPermutation(lcolp,PETSC_DECIDE,&icolp);CHKERRQ(ierr);
  ierr = ISSetPermutation(lcolp);CHKERRQ(ierr);
  if (ntids>1) {
    ierr = ISRestoreIndices(colp,&rows);CHKERRQ(ierr);
    ierr = ISDestroy(lcolp);CHKERRQ(ierr);
  }
  /* now we just get the submatrix */
  ierr = MatGetSubMatrix(A,lrowp,icolp,local_size,MAT_INITIAL_MATRIX,B);CHKERRQ(ierr);
  /* clean up */
  ierr = ISDestroy(lrowp);CHKERRQ(ierr);
  ierr = ISDestroy(icolp);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatGetInfo_MPIAIJ"
PetscErrorCode MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,MatInfo *info)
{
  Mat_MPIAIJ     *mat = (Mat_MPIAIJ*)matin->data;
  Mat            A = mat->A,B = mat->B;
  PetscErrorCode ierr;
  PetscReal      isend[5],irecv[5];

  PetscFunctionBegin;
  info->block_size = 1.0;
  ierr = MatGetInfo(A,MAT_LOCAL,info);CHKERRQ(ierr);
  isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded;
  isend[3] = info->memory;  isend[4] = info->mallocs;
  ierr = MatGetInfo(B,MAT_LOCAL,info);CHKERRQ(ierr);
  isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded;
  isend[3] += info->memory;  isend[4] += info->mallocs;
  if (flag == MAT_LOCAL) {
    info->nz_used      = isend[0];
    info->nz_allocated = isend[1];
    info->nz_unneeded  = isend[2];
    info->memory       = isend[3];
    info->mallocs      = isend[4];
  } else if (flag == MAT_GLOBAL_MAX) {
    ierr = MPI_Allreduce(isend,irecv,5,MPIU_REAL,MPI_MAX,matin->comm);CHKERRQ(ierr);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  } else if (flag == MAT_GLOBAL_SUM) {
    ierr = MPI_Allreduce(isend,irecv,5,MPIU_REAL,MPI_SUM,matin->comm);CHKERRQ(ierr);
    info->nz_used      = irecv[0];
    info->nz_allocated = irecv[1];
    info->nz_unneeded  = irecv[2];
    info->memory       = irecv[3];
    info->mallocs      = irecv[4];
  }
  info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
  info->fill_ratio_needed = 0;
  info->factor_mallocs    = 0;
  info->rows_global       = (double)matin->rmap.N;
  info->columns_global    = (double)matin->cmap.N;
  info->rows_local        = (double)matin->rmap.n;
  info->columns_local     = (double)matin->cmap.N;

  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatSetOption_MPIAIJ"
PetscErrorCode MatSetOption_MPIAIJ(Mat A,MatOption op,PetscTruth flg)
{
  Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;
  PetscErrorCode ierr;

  PetscFunctionBegin;
  switch (op) {
  case MAT_NEW_NONZERO_LOCATIONS:
  case MAT_NEW_NONZERO_ALLOCATION_ERR:
  case MAT_KEEP_ZEROED_ROWS:
  case MAT_NEW_NONZERO_LOCATION_ERR:
  case MAT_USE_INODES:
  case MAT_IGNORE_ZERO_ENTRIES:
    ierr = MatSetOption(a->A,op,flg);CHKERRQ(ierr);
    ierr = MatSetOption(a->B,op,flg);CHKERRQ(ierr);
    break;
  case MAT_ROW_ORIENTED:
    a->roworiented = flg;
    ierr = MatSetOption(a->A,op,flg);CHKERRQ(ierr);
    ierr = MatSetOption(a->B,op,flg);CHKERRQ(ierr);
    break;
  case MAT_NEW_DIAGONALS:
    ierr = PetscInfo1(A,"Option %s ignored\n",MatOptions[op]);CHKERRQ(ierr);
    break;
  case MAT_IGNORE_OFF_PROC_ENTRIES:
    a->donotstash = PETSC_TRUE;
    break;
  case MAT_SYMMETRIC:
    ierr = MatSetOption(a->A,op,flg);CHKERRQ(ierr);
    break;
  case MAT_STRUCTURALLY_SYMMETRIC:
  case MAT_HERMITIAN:
  case MAT_SYMMETRY_ETERNAL:
    ierr = MatSetOption(a->A,op,flg);CHKERRQ(ierr);
    break;
  default:
    SETERRQ1(PETSC_ERR_SUP,"unknown option %d",op);
  }
  PetscFunctionReturn(0);
}

#undef __FUNCT__
#define __FUNCT__ "MatGetRow_MPIAIJ"
PetscErrorCode MatGetRow_MPIAIJ(Mat matin,PetscInt row,PetscInt *nz,PetscInt **idx,PetscScalar **v)
{
  Mat_MPIAIJ     *mat = (Mat_MPIAIJ*)matin->data;
  PetscScalar    *vworkA,*vworkB,**pvA,**pvB,*v_p;
  PetscErrorCode ierr;
  PetscInt       i,*cworkA,*cworkB,**pcA,**pcB,cstart = matin->cmap.rstart;
  PetscInt       nztot,nzA,nzB,lrow,rstart = matin->rmap.rstart,rend = matin->rmap.rend;
  PetscInt       *cmap,*idx_p;

  PetscFunctionBegin;
  if (mat->getrowactive) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,"Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqAIJ *Aa = (Mat_SeqAIJ*)mat->A->data,*Ba = (Mat_SeqAIJ*)mat->B->data;
    PetscInt   max = 1,tmp;
    for (i=0; i<matin->rmap.n; i++) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    ierr = PetscMalloc(max*(sizeof(PetscInt)+sizeof(PetscScalar)),&mat->rowvalues);CHKERRQ(ierr);
    mat->rowindices = (PetscInt*)(mat->rowvalues + max);
  }

  if (row < rstart || row >= rend) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"Only local rows");
  lrow = row - rstart;

  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops->getrow)(mat->A,lrow,&nzA,pcA,pvA);CHKERRQ(ierr);
  ierr = (*mat->B->ops->getrow)(mat->B,lrow,&nzB,pcB,pvB);CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      PetscInt imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for (i=0; i<nzB; i++) {
          if (cmap[cworkB[i]] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for (i=0; i<nzA; i++)     v_p[imark+i] = vworkA[i];
        for (i=imark; i<nzB; i++) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          for (i=0; i<imark; i++) {
            idx_p[i] = cmap[cworkB[i]];
          }
        } else {
          for (i=0; i<nzB; i++) {
            if (cmap[cworkB[i]] < cstart)   idx_p[i] = cmap[cworkB[i]];
            else break;
          }
          imark = i;
        }
        for (i=0; i<nzA; i++)     idx_p[imark+i] = cstart + cworkA[i];
        for (i=imark; i<nzB; i++) idx_p[nzA+i]   = cmap[cworkB[i]];
      }
    } else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops->restorerow)(mat->A,lrow,&nzA,pcA,pvA);CHKERRQ(ierr);
  ierr = (*mat->B->ops->restorerow)(mat->B,lrow,&nzB,pcB,pvB);CHKERRQ(ierr);
  PetscFunctionReturn(0);
}
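
/*
   For reference, because garray is sorted the merged row returned above is

      [ B columns < cstart | cstart + A columns | B columns >= cend ]

   i.e. global column numbers in increasing order, with imark counting the
   off-diagonal entries that precede the diagonal block.
*/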

#undef __FUNCT__
#define __FUNCT__ "MatRestoreRow_MPIAIJ"
PetscErrorCode MatRestoreRow_MPIAIJ(Mat mat,PetscInt row,PetscInt *nz,PetscInt **idx,PetscScalar **v)
{
  Mat_MPIAIJ *aij = (Mat_MPIAIJ*)mat->data;

  PetscFunctionBegin;
  if (!aij->getrowactive) {
    SETERRQ(PETSC_ERR_ARG_WRONGSTATE,"MatGetRow() must be called first");
  }
  aij->getrowactive = PETSC_FALSE;
  PetscFunctionReturn(0);
}
1329 
1330 #undef __FUNCT__
1331 #define __FUNCT__ "MatNorm_MPIAIJ"
1332 PetscErrorCode MatNorm_MPIAIJ(Mat mat,NormType type,PetscReal *norm)
1333 {
1334   Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
1335   Mat_SeqAIJ     *amat = (Mat_SeqAIJ*)aij->A->data,*bmat = (Mat_SeqAIJ*)aij->B->data;
1336   PetscErrorCode ierr;
1337   PetscInt       i,j,cstart = mat->cmap.rstart;
1338   PetscReal      sum = 0.0;
1339   PetscScalar    *v;
1340 
1341   PetscFunctionBegin;
1342   if (aij->size == 1) {
1343     ierr =  MatNorm(aij->A,type,norm);CHKERRQ(ierr);
1344   } else {
1345     if (type == NORM_FROBENIUS) {
1346       v = amat->a;
1347       for (i=0; i<amat->nz; i++) {
1348 #if defined(PETSC_USE_COMPLEX)
1349         sum += PetscRealPart(PetscConj(*v)*(*v)); v++;
1350 #else
1351         sum += (*v)*(*v); v++;
1352 #endif
1353       }
1354       v = bmat->a;
1355       for (i=0; i<bmat->nz; i++) {
1356 #if defined(PETSC_USE_COMPLEX)
1357         sum += PetscRealPart(PetscConj(*v)*(*v)); v++;
1358 #else
1359         sum += (*v)*(*v); v++;
1360 #endif
1361       }
1362       ierr = MPI_Allreduce(&sum,norm,1,MPIU_REAL,MPI_SUM,mat->comm);CHKERRQ(ierr);
1363       *norm = sqrt(*norm);
1364     } else if (type == NORM_1) { /* max column norm */
1365       PetscReal *tmp,*tmp2;
1366       PetscInt    *jj,*garray = aij->garray;
1367       ierr = PetscMalloc((mat->cmap.N+1)*sizeof(PetscReal),&tmp);CHKERRQ(ierr);
1368       ierr = PetscMalloc((mat->cmap.N+1)*sizeof(PetscReal),&tmp2);CHKERRQ(ierr);
1369       ierr = PetscMemzero(tmp,mat->cmap.N*sizeof(PetscReal));CHKERRQ(ierr);
1370       *norm = 0.0;
1371       v = amat->a; jj = amat->j;
1372       for (j=0; j<amat->nz; j++) {
1373         tmp[cstart + *jj++ ] += PetscAbsScalar(*v);  v++;
1374       }
1375       v = bmat->a; jj = bmat->j;
1376       for (j=0; j<bmat->nz; j++) {
1377         tmp[garray[*jj++]] += PetscAbsScalar(*v); v++;
1378       }
1379       ierr = MPI_Allreduce(tmp,tmp2,mat->cmap.N,MPIU_REAL,MPI_SUM,mat->comm);CHKERRQ(ierr);
1380       for (j=0; j<mat->cmap.N; j++) {
1381         if (tmp2[j] > *norm) *norm = tmp2[j];
1382       }
1383       ierr = PetscFree(tmp);CHKERRQ(ierr);
1384       ierr = PetscFree(tmp2);CHKERRQ(ierr);
1385     } else if (type == NORM_INFINITY) { /* max row norm */
1386       PetscReal ntemp = 0.0;
1387       for (j=0; j<aij->A->rmap.n; j++) {
1388         v = amat->a + amat->i[j];
1389         sum = 0.0;
1390         for (i=0; i<amat->i[j+1]-amat->i[j]; i++) {
1391           sum += PetscAbsScalar(*v); v++;
1392         }
1393         v = bmat->a + bmat->i[j];
1394         for (i=0; i<bmat->i[j+1]-bmat->i[j]; i++) {
1395           sum += PetscAbsScalar(*v); v++;
1396         }
1397         if (sum > ntemp) ntemp = sum;
1398       }
1399       ierr = MPI_Allreduce(&ntemp,norm,1,MPIU_REAL,MPI_MAX,mat->comm);CHKERRQ(ierr);
1400     } else {
1401       SETERRQ(PETSC_ERR_SUP,"No support for the two norm");
1402     }
1403   }
1404   PetscFunctionReturn(0);
1405 }
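/*
   Sketch of how the routine above is reached through the public interface
   (assumes A is an assembled MATMPIAIJ matrix; the call is collective):

      PetscReal nrm;
      ierr = MatNorm(A,NORM_FROBENIUS,&nrm);CHKERRQ(ierr);  .. sqrt of sum of squares
      ierr = MatNorm(A,NORM_1,&nrm);CHKERRQ(ierr);          .. max column sum
      ierr = MatNorm(A,NORM_INFINITY,&nrm);CHKERRQ(ierr);   .. max row sum

   NORM_2 is not supported in parallel, as the error branch above shows.
*/
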
1406 
1407 #undef __FUNCT__
1408 #define __FUNCT__ "MatTranspose_MPIAIJ"
1409 PetscErrorCode MatTranspose_MPIAIJ(Mat A,Mat *matout)
1410 {
1411   Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;
1412   Mat_SeqAIJ     *Aloc=(Mat_SeqAIJ*)a->A->data,*Bloc=(Mat_SeqAIJ*)a->B->data;
1413   PetscErrorCode ierr;
1414   PetscInt       M = A->rmap.N,N = A->cmap.N,ma,na,mb,*ai,*aj,*bi,*bj,row,*cols,i,*d_nnz;
1415   PetscInt       cstart=A->cmap.rstart,ncol;
1416   Mat            B;
1417   PetscScalar    *array;
1418 
1419   PetscFunctionBegin;
1420   if (!matout && M != N) SETERRQ(PETSC_ERR_ARG_SIZ,"In-place transpose requires a square matrix");
1421 
1422   /* compute d_nnz for preallocation; o_nnz is approximated by d_nnz to avoid communication */
1423   ma = A->rmap.n; na = A->cmap.n; mb = a->B->rmap.n;
1424   ai = Aloc->i; aj = Aloc->j;
1425   bi = Bloc->i; bj = Bloc->j;
1426   ierr = PetscMalloc((1+na+bi[mb])*sizeof(PetscInt),&d_nnz);CHKERRQ(ierr);
1427   cols = d_nnz + na + 1; /* work space to be used by B part */
1428   ierr = PetscMemzero(d_nnz,(1+na)*sizeof(PetscInt));CHKERRQ(ierr);
1429   for (i=0; i<ai[ma]; i++){
1430     d_nnz[aj[i]] ++;
1431     aj[i] += cstart; /* global col index to be used by MatSetValues() */
1432   }
1433 
1434   ierr = MatCreate(A->comm,&B);CHKERRQ(ierr);
1435   ierr = MatSetSizes(B,A->cmap.n,A->rmap.n,N,M);CHKERRQ(ierr);
1436   ierr = MatSetType(B,A->type_name);CHKERRQ(ierr);
1437   ierr = MatMPIAIJSetPreallocation(B,0,d_nnz,0,d_nnz);CHKERRQ(ierr);
1438 
1439   /* copy over the A part */
1440   array = Aloc->a;
1441   row = A->rmap.rstart;
1442   for (i=0; i<ma; i++) {
1443     ncol = ai[i+1]-ai[i];
1444     ierr = MatSetValues(B,ncol,aj,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1445     row++; array += ncol; aj += ncol;
1446   }
1447   aj = Aloc->j;
1448   for (i=0; i<ai[ma]; i++) aj[i] -= cstart; /* restore the local column indices */
1449 
1450   /* copy over the B part */
1451   array = Bloc->a;
1452   row = A->rmap.rstart;
1453   for (i=0; i<bi[mb]; i++) {cols[i] = a->garray[bj[i]];}
1454   for (i=0; i<mb; i++) {
1455     ncol = bi[i+1]-bi[i];
1456     ierr = MatSetValues(B,ncol,cols,1,&row,array,INSERT_VALUES);CHKERRQ(ierr);
1457     row++; array += ncol; cols += ncol;
1458   }
1459   ierr = PetscFree(d_nnz);CHKERRQ(ierr);
1460   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1461   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1462   if (matout) {
1463     *matout = B;
1464   } else {
1465     ierr = MatHeaderCopy(A,B);CHKERRQ(ierr);
1466   }
1467   PetscFunctionReturn(0);
1468 }
1469 
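/*
   Sketch of the two calling modes handled above (assumes A is assembled):

      Mat At;
      ierr = MatTranspose(A,&At);CHKERRQ(ierr);         .. out-of-place, creates At
      ierr = MatTranspose(A,PETSC_NULL);CHKERRQ(ierr);  .. in-place, A must be square
*/
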
1470 #undef __FUNCT__
1471 #define __FUNCT__ "MatDiagonalScale_MPIAIJ"
1472 PetscErrorCode MatDiagonalScale_MPIAIJ(Mat mat,Vec ll,Vec rr)
1473 {
1474   Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
1475   Mat            a = aij->A,b = aij->B;
1476   PetscErrorCode ierr;
1477   PetscInt       s1,s2,s3;
1478 
1479   PetscFunctionBegin;
1480   ierr = MatGetLocalSize(mat,&s2,&s3);CHKERRQ(ierr);
1481   if (rr) {
1482     ierr = VecGetLocalSize(rr,&s1);CHKERRQ(ierr);
1483     if (s1!=s3) SETERRQ(PETSC_ERR_ARG_SIZ,"right vector non-conforming local size");
1484     /* Overlap communication with computation. */
1485     ierr = VecScatterBegin(aij->Mvctx,rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);
1486   }
1487   if (ll) {
1488     ierr = VecGetLocalSize(ll,&s1);CHKERRQ(ierr);
1489     if (s1!=s2) SETERRQ(PETSC_ERR_ARG_SIZ,"left vector non-conforming local size");
1490     ierr = (*b->ops->diagonalscale)(b,ll,0);CHKERRQ(ierr);
1491   }
1492   /* scale the diagonal block */
1493   ierr = (*a->ops->diagonalscale)(a,ll,rr);CHKERRQ(ierr);
1494 
1495   if (rr) {
1496     /* Do a scatter end and then right scale the off-diagonal block */
1497     ierr = VecScatterEnd(aij->Mvctx,rr,aij->lvec,INSERT_VALUES,SCATTER_FORWARD);CHKERRQ(ierr);
1498     ierr = (*b->ops->diagonalscale)(b,0,aij->lvec);CHKERRQ(ierr);
1499   }
1500 
1501   PetscFunctionReturn(0);
1502 }
1503 
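/*
   Sketch of diagonal scaling, computing mat = diag(ll) mat diag(rr) (assumes
   ll conforms to the row layout of mat and rr to its column layout; either
   vector may be PETSC_NULL to skip that side):

      ierr = MatDiagonalScale(mat,ll,rr);CHKERRQ(ierr);
      ierr = MatDiagonalScale(mat,ll,PETSC_NULL);CHKERRQ(ierr);  .. rows only
      ierr = MatDiagonalScale(mat,PETSC_NULL,rr);CHKERRQ(ierr);  .. columns only
*/
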
1504 #undef __FUNCT__
1505 #define __FUNCT__ "MatSetBlockSize_MPIAIJ"
1506 PetscErrorCode MatSetBlockSize_MPIAIJ(Mat A,PetscInt bs)
1507 {
1508   Mat_MPIAIJ     *a   = (Mat_MPIAIJ*)A->data;
1509   PetscErrorCode ierr;
1510 
1511   PetscFunctionBegin;
1512   ierr = MatSetBlockSize(a->A,bs);CHKERRQ(ierr);
1513   ierr = MatSetBlockSize(a->B,bs);CHKERRQ(ierr);
1514   PetscFunctionReturn(0);
1515 }
1516 #undef __FUNCT__
1517 #define __FUNCT__ "MatSetUnfactored_MPIAIJ"
1518 PetscErrorCode MatSetUnfactored_MPIAIJ(Mat A)
1519 {
1520   Mat_MPIAIJ     *a   = (Mat_MPIAIJ*)A->data;
1521   PetscErrorCode ierr;
1522 
1523   PetscFunctionBegin;
1524   ierr = MatSetUnfactored(a->A);CHKERRQ(ierr);
1525   PetscFunctionReturn(0);
1526 }
1527 
1528 #undef __FUNCT__
1529 #define __FUNCT__ "MatEqual_MPIAIJ"
1530 PetscErrorCode MatEqual_MPIAIJ(Mat A,Mat B,PetscTruth *flag)
1531 {
1532   Mat_MPIAIJ     *matB = (Mat_MPIAIJ*)B->data,*matA = (Mat_MPIAIJ*)A->data;
1533   Mat            a,b,c,d;
1534   PetscTruth     flg;
1535   PetscErrorCode ierr;
1536 
1537   PetscFunctionBegin;
1538   a = matA->A; b = matA->B;
1539   c = matB->A; d = matB->B;
1540 
1541   ierr = MatEqual(a,c,&flg);CHKERRQ(ierr);
1542   if (flg) {
1543     ierr = MatEqual(b,d,&flg);CHKERRQ(ierr);
1544   }
1545   ierr = MPI_Allreduce(&flg,flag,1,MPI_INT,MPI_LAND,A->comm);CHKERRQ(ierr);
1546   PetscFunctionReturn(0);
1547 }
1548 
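/*
   Sketch (assumes A and B are assembled matrices on the same communicator;
   the MPI_LAND reduction above guarantees every process gets the same answer):

      PetscTruth eq;
      ierr = MatEqual(A,B,&eq);CHKERRQ(ierr);
*/
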
1549 #undef __FUNCT__
1550 #define __FUNCT__ "MatCopy_MPIAIJ"
1551 PetscErrorCode MatCopy_MPIAIJ(Mat A,Mat B,MatStructure str)
1552 {
1553   PetscErrorCode ierr;
1554   Mat_MPIAIJ     *a = (Mat_MPIAIJ *)A->data;
1555   Mat_MPIAIJ     *b = (Mat_MPIAIJ *)B->data;
1556 
1557   PetscFunctionBegin;
1558   /* If the two matrices don't have the same copy implementation, they aren't compatible for fast copy. */
1559   if ((str != SAME_NONZERO_PATTERN) || (A->ops->copy != B->ops->copy)) {
1560     /* Because of the column compression in the off-processor part of the matrix a->B,
1561        the number of columns in a->B and b->B may differ, hence we cannot call
1562        MatCopy() directly on the two parts. If need be, a copy more efficient than
1563        MatCopy_Basic() could be provided by first uncompressing the a->B matrices
1564        and then copying the submatrices */
1565     ierr = MatCopy_Basic(A,B,str);CHKERRQ(ierr);
1566   } else {
1567     ierr = MatCopy(a->A,b->A,str);CHKERRQ(ierr);
1568     ierr = MatCopy(a->B,b->B,str);CHKERRQ(ierr);
1569   }
1570   PetscFunctionReturn(0);
1571 }
1572 
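/*
   Sketch (assumes A and B have identical parallel layouts):

      ierr = MatCopy(A,B,SAME_NONZERO_PATTERN);CHKERRQ(ierr);       .. fast path above
      ierr = MatCopy(A,B,DIFFERENT_NONZERO_PATTERN);CHKERRQ(ierr);  .. MatCopy_Basic() path
*/
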
1573 #undef __FUNCT__
1574 #define __FUNCT__ "MatSetUpPreallocation_MPIAIJ"
1575 PetscErrorCode MatSetUpPreallocation_MPIAIJ(Mat A)
1576 {
1577   PetscErrorCode ierr;
1578 
1579   PetscFunctionBegin;
1580   ierr =  MatMPIAIJSetPreallocation(A,PETSC_DEFAULT,0,PETSC_DEFAULT,0);CHKERRQ(ierr);
1581   PetscFunctionReturn(0);
1582 }
1583 
1584 #include "petscblaslapack.h"
1585 #undef __FUNCT__
1586 #define __FUNCT__ "MatAXPY_MPIAIJ"
1587 PetscErrorCode MatAXPY_MPIAIJ(Mat Y,PetscScalar a,Mat X,MatStructure str)
1588 {
1589   PetscErrorCode ierr;
1590   PetscInt       i;
1591   Mat_MPIAIJ     *xx = (Mat_MPIAIJ *)X->data,*yy = (Mat_MPIAIJ *)Y->data;
1592   PetscBLASInt   bnz,one=1;
1593   Mat_SeqAIJ     *x,*y;
1594 
1595   PetscFunctionBegin;
1596   if (str == SAME_NONZERO_PATTERN) {
1597     PetscScalar alpha = a;
1598     x = (Mat_SeqAIJ *)xx->A->data;
1599     y = (Mat_SeqAIJ *)yy->A->data;
1600     bnz = (PetscBLASInt)x->nz;
1601     BLASaxpy_(&bnz,&alpha,x->a,&one,y->a,&one);
1602     x = (Mat_SeqAIJ *)xx->B->data;
1603     y = (Mat_SeqAIJ *)yy->B->data;
1604     bnz = (PetscBLASInt)x->nz;
1605     BLASaxpy_(&bnz,&alpha,x->a,&one,y->a,&one);
1606   } else if (str == SUBSET_NONZERO_PATTERN) {
1607     ierr = MatAXPY_SeqAIJ(yy->A,a,xx->A,str);CHKERRQ(ierr);
1608 
1609     x = (Mat_SeqAIJ *)xx->B->data;
1610     y = (Mat_SeqAIJ *)yy->B->data;
1611     if (y->xtoy && y->XtoY != xx->B) {
1612       ierr = PetscFree(y->xtoy);CHKERRQ(ierr);
1613       ierr = MatDestroy(y->XtoY);CHKERRQ(ierr);
1614     }
1615     if (!y->xtoy) { /* get xtoy */
1616       ierr = MatAXPYGetxtoy_Private(xx->B->rmap.n,x->i,x->j,xx->garray,y->i,y->j,yy->garray,&y->xtoy);CHKERRQ(ierr);
1617       y->XtoY = xx->B;
1618       ierr = PetscObjectReference((PetscObject)xx->B);CHKERRQ(ierr);
1619     }
1620     for (i=0; i<x->nz; i++) y->a[y->xtoy[i]] += a*(x->a[i]);
1621   } else {
1622     ierr = MatAXPY_Basic(Y,a,X,str);CHKERRQ(ierr);
1623   }
1624   PetscFunctionReturn(0);
1625 }
1626 
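/*
   Sketch of Y = Y + a*X for the three pattern cases handled above (assumes
   X and Y have the same parallel layouts):

      ierr = MatAXPY(Y,a,X,SAME_NONZERO_PATTERN);CHKERRQ(ierr);      .. one BLAS axpy per block
      ierr = MatAXPY(Y,a,X,SUBSET_NONZERO_PATTERN);CHKERRQ(ierr);    .. X's pattern inside Y's
      ierr = MatAXPY(Y,a,X,DIFFERENT_NONZERO_PATTERN);CHKERRQ(ierr); .. MatAXPY_Basic() fallback
*/
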
1627 EXTERN PetscErrorCode PETSCMAT_DLLEXPORT MatConjugate_SeqAIJ(Mat);
1628 
1629 #undef __FUNCT__
1630 #define __FUNCT__ "MatConjugate_MPIAIJ"
1631 PetscErrorCode PETSCMAT_DLLEXPORT MatConjugate_MPIAIJ(Mat mat)
1632 {
1633 #if defined(PETSC_USE_COMPLEX)
1634   PetscErrorCode ierr;
1635   Mat_MPIAIJ     *aij = (Mat_MPIAIJ *)mat->data;
1636 
1637   PetscFunctionBegin;
1638   ierr = MatConjugate_SeqAIJ(aij->A);CHKERRQ(ierr);
1639   ierr = MatConjugate_SeqAIJ(aij->B);CHKERRQ(ierr);
1640 #else
1641   PetscFunctionBegin;
1642 #endif
1643   PetscFunctionReturn(0);
1644 }
1645 
1646 #undef __FUNCT__
1647 #define __FUNCT__ "MatRealPart_MPIAIJ"
1648 PetscErrorCode MatRealPart_MPIAIJ(Mat A)
1649 {
1650   Mat_MPIAIJ   *a = (Mat_MPIAIJ*)A->data;
1651   PetscErrorCode ierr;
1652 
1653   PetscFunctionBegin;
1654   ierr = MatRealPart(a->A);CHKERRQ(ierr);
1655   ierr = MatRealPart(a->B);CHKERRQ(ierr);
1656   PetscFunctionReturn(0);
1657 }
1658 
1659 #undef __FUNCT__
1660 #define __FUNCT__ "MatImaginaryPart_MPIAIJ"
1661 PetscErrorCode MatImaginaryPart_MPIAIJ(Mat A)
1662 {
1663   Mat_MPIAIJ   *a = (Mat_MPIAIJ*)A->data;
1664   PetscErrorCode ierr;
1665 
1666   PetscFunctionBegin;
1667   ierr = MatImaginaryPart(a->A);CHKERRQ(ierr);
1668   ierr = MatImaginaryPart(a->B);CHKERRQ(ierr);
1669   PetscFunctionReturn(0);
1670 }
1671 
1672 #ifdef PETSC_HAVE_PBGL
1673 
1674 #include <boost/parallel/mpi/bsp_process_group.hpp>
1675 #include <boost/graph/distributed/ilu_default_graph.hpp>
1676 #include <boost/graph/distributed/ilu_0_block.hpp>
1677 #include <boost/graph/distributed/ilu_preconditioner.hpp>
1678 #include <boost/graph/distributed/petsc/interface.hpp>
1679 #include <boost/multi_array.hpp>
1680 #include <boost/parallel/distributed_property_map.hpp>
1681 
1682 #undef __FUNCT__
1683 #define __FUNCT__ "MatILUFactorSymbolic_MPIAIJ"
1684 /*
1685   This uses the parallel ILU factorization of Peter Gottschling <pgottsch@osl.iu.edu>
1686 */
1687 PetscErrorCode MatILUFactorSymbolic_MPIAIJ(Mat A, IS isrow, IS iscol, MatFactorInfo *info, Mat *fact)
1688 {
1689   namespace petsc = boost::distributed::petsc;
1690 
1691   namespace graph_dist = boost::graph::distributed;
1692   using boost::graph::distributed::ilu_default::process_group_type;
1693   using boost::graph::ilu_permuted;
1694 
1695   PetscTruth      row_identity, col_identity;
1696   PetscContainer  c;
1697   PetscInt        m, n, M, N;
1698   PetscErrorCode  ierr;
1699 
1700   PetscFunctionBegin;
1701   if (info->levels != 0) SETERRQ(PETSC_ERR_SUP,"Only levels = 0 supported for parallel ILU");
1702   ierr = ISIdentity(isrow, &row_identity);CHKERRQ(ierr);
1703   ierr = ISIdentity(iscol, &col_identity);CHKERRQ(ierr);
1704   if (!row_identity || !col_identity) {
1705     SETERRQ(PETSC_ERR_ARG_WRONG,"Row and column permutations must be identity for parallel ILU");
1706   }
1707 
1708   process_group_type pg;
1709   typedef graph_dist::ilu_default::ilu_level_graph_type  lgraph_type;
1710   lgraph_type*   lgraph_p = new lgraph_type(petsc::num_global_vertices(A), pg, petsc::matrix_distribution(A, pg));
1711   lgraph_type&   level_graph = *lgraph_p;
1712   graph_dist::ilu_default::graph_type&            graph(level_graph.graph);
1713 
1714   petsc::read_matrix(A, graph, get(boost::edge_weight, graph));
1715   ilu_permuted(level_graph);
1716 
1717   /* put together the new matrix */
1718   ierr = MatCreate(A->comm, fact);CHKERRQ(ierr);
1719   ierr = MatGetLocalSize(A, &m, &n);CHKERRQ(ierr);
1720   ierr = MatGetSize(A, &M, &N);CHKERRQ(ierr);
1721   ierr = MatSetSizes(*fact, m, n, M, N);CHKERRQ(ierr);
1722   ierr = MatSetType(*fact, A->type_name);CHKERRQ(ierr);
1723   ierr = MatAssemblyBegin(*fact, MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1724   ierr = MatAssemblyEnd(*fact, MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
1725   (*fact)->factor = FACTOR_LU;
1726 
1727   ierr = PetscContainerCreate(A->comm, &c);CHKERRQ(ierr);
1728   ierr = PetscContainerSetPointer(c, lgraph_p);CHKERRQ(ierr);
1729   ierr = PetscObjectCompose((PetscObject) (*fact), "graph", (PetscObject) c);CHKERRQ(ierr);
1730   PetscFunctionReturn(0);
1731 }
1732 
1733 #undef __FUNCT__
1734 #define __FUNCT__ "MatLUFactorNumeric_MPIAIJ"
1735 PetscErrorCode MatLUFactorNumeric_MPIAIJ(Mat A, MatFactorInfo *info, Mat *B)
1736 {
1737   PetscFunctionBegin;
1738   PetscFunctionReturn(0);
1739 }
1740 
1741 #undef __FUNCT__
1742 #define __FUNCT__ "MatSolve_MPIAIJ"
1743 /*
1744   This uses the parallel ILU factorization of Peter Gottschling <pgottsch@osl.iu.edu>
1745 */
1746 PetscErrorCode MatSolve_MPIAIJ(Mat A, Vec b, Vec x)
1747 {
1748   namespace graph_dist = boost::graph::distributed;
1749 
1750   typedef graph_dist::ilu_default::ilu_level_graph_type  lgraph_type;
1751   lgraph_type*   lgraph_p;
1752   PetscContainer c;
1753   PetscErrorCode ierr;
1754 
1755   PetscFunctionBegin;
1756   ierr = PetscObjectQuery((PetscObject) A, "graph", (PetscObject *) &c);CHKERRQ(ierr);
1757   ierr = PetscContainerGetPointer(c, (void **) &lgraph_p);CHKERRQ(ierr);
1758   ierr = VecCopy(b, x); CHKERRQ(ierr);
1759 
1760   PetscScalar* array_x;
1761   ierr = VecGetArray(x, &array_x);CHKERRQ(ierr);
1762   PetscInt sx;
1763   ierr = VecGetSize(x, &sx);CHKERRQ(ierr);
1764 
1765   PetscScalar* array_b;
1766   ierr = VecGetArray(b, &array_b);CHKERRQ(ierr);
1767   PetscInt sb;
1768   ierr = VecGetSize(b, &sb);CHKERRQ(ierr);
1769 
1770   lgraph_type&   level_graph = *lgraph_p;
1771   graph_dist::ilu_default::graph_type&            graph(level_graph.graph);
1772 
1773   typedef boost::multi_array_ref<PetscScalar, 1> array_ref_type;
1774   array_ref_type                                 ref_b(array_b, boost::extents[num_vertices(graph)]),
1775                                                  ref_x(array_x, boost::extents[num_vertices(graph)]);
1776 
1777   typedef boost::iterator_property_map<array_ref_type::iterator,
1778                                 boost::property_map<graph_dist::ilu_default::graph_type, boost::vertex_index_t>::type>  gvector_type;
1779   gvector_type                                   vector_b(ref_b.begin(), get(boost::vertex_index, graph)),
1780                                                  vector_x(ref_x.begin(), get(boost::vertex_index, graph));
1781 
1782   ilu_set_solve(*lgraph_p, vector_b, vector_x);
1783 
1784   PetscFunctionReturn(0);
1785 }
1786 #endif
1787 
1788 typedef struct { /* used by MatGetRedundantMatrix() for reusing matredundant */
1789   PetscInt       nzlocal,nsends,nrecvs;
1790   PetscMPIInt    *send_rank;
1791   PetscInt       *sbuf_nz,*sbuf_j,**rbuf_j;
1792   PetscScalar    *sbuf_a,**rbuf_a;
1793   PetscErrorCode (*MatDestroy)(Mat);
1794 } Mat_Redundant;
1795 
1796 #undef __FUNCT__
1797 #define __FUNCT__ "PetscContainerDestroy_MatRedundant"
1798 PetscErrorCode PetscContainerDestroy_MatRedundant(void *ptr)
1799 {
1800   PetscErrorCode       ierr;
1801   Mat_Redundant        *redund=(Mat_Redundant*)ptr;
1802   PetscInt             i;
1803 
1804   PetscFunctionBegin;
1805   ierr = PetscFree(redund->send_rank);CHKERRQ(ierr);
1806   ierr = PetscFree(redund->sbuf_j);CHKERRQ(ierr);
1807   ierr = PetscFree(redund->sbuf_a);CHKERRQ(ierr);
1808   for (i=0; i<redund->nrecvs; i++){
1809     ierr = PetscFree(redund->rbuf_j[i]);CHKERRQ(ierr);
1810     ierr = PetscFree(redund->rbuf_a[i]);CHKERRQ(ierr);
1811   }
1812   ierr = PetscFree3(redund->sbuf_nz,redund->rbuf_j,redund->rbuf_a);CHKERRQ(ierr);
1813   ierr = PetscFree(redund);CHKERRQ(ierr);
1814   PetscFunctionReturn(0);
1815 }
1816 
1817 #undef __FUNCT__
1818 #define __FUNCT__ "MatDestroy_MatRedundant"
1819 PetscErrorCode MatDestroy_MatRedundant(Mat A)
1820 {
1821   PetscErrorCode  ierr;
1822   PetscContainer  container;
1823   Mat_Redundant   *redund=PETSC_NULL;
1824 
1825   PetscFunctionBegin;
1826   ierr = PetscObjectQuery((PetscObject)A,"Mat_Redundant",(PetscObject *)&container);CHKERRQ(ierr);
1827   if (container) {
1828     ierr = PetscContainerGetPointer(container,(void **)&redund);CHKERRQ(ierr);
1829   } else {
1830     SETERRQ(PETSC_ERR_PLIB,"Container does not exist");
1831   }
1832   A->ops->destroy = redund->MatDestroy;
1833   ierr = PetscObjectCompose((PetscObject)A,"Mat_Redundant",0);CHKERRQ(ierr);
1834   ierr = (*A->ops->destroy)(A);CHKERRQ(ierr);
1835   ierr = PetscContainerDestroy(container);CHKERRQ(ierr);
1836   PetscFunctionReturn(0);
1837 }
1838 
1839 #undef __FUNCT__
1840 #define __FUNCT__ "MatGetRedundantMatrix_MPIAIJ"
1841 PetscErrorCode MatGetRedundantMatrix_MPIAIJ(Mat mat,PetscInt nsubcomm,MPI_Comm subcomm,PetscInt mlocal_sub,MatReuse reuse,Mat *matredundant)
1842 {
1843   PetscMPIInt    rank,size;
1844   MPI_Comm       comm=mat->comm;
1845   PetscErrorCode ierr;
1846   PetscInt       nsends=0,nrecvs=0,i,rownz_max=0;
1847   PetscMPIInt    *send_rank=PETSC_NULL,*recv_rank=PETSC_NULL;
1848   PetscInt       *rowrange=mat->rmap.range;
1849   Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
1850   Mat            A=aij->A,B=aij->B,C=*matredundant;
1851   Mat_SeqAIJ     *a=(Mat_SeqAIJ*)A->data,*b=(Mat_SeqAIJ*)B->data;
1852   PetscScalar    *sbuf_a;
1853   PetscInt       nzlocal=a->nz+b->nz;
1854   PetscInt       j,cstart=mat->cmap.rstart,cend=mat->cmap.rend,row,nzA,nzB,ncols,*cworkA,*cworkB;
1855   PetscInt       rstart=mat->rmap.rstart,rend=mat->rmap.rend,*bmap=aij->garray,M,N;
1856   PetscInt       *cols,ctmp,lwrite,*rptr,l,*sbuf_j;
1857   PetscScalar    *vals,*aworkA,*aworkB;
1858   PetscMPIInt    tag1,tag2,tag3,imdex;
1859   MPI_Request    *s_waits1=PETSC_NULL,*s_waits2=PETSC_NULL,*s_waits3=PETSC_NULL,
1860                  *r_waits1=PETSC_NULL,*r_waits2=PETSC_NULL,*r_waits3=PETSC_NULL;
1861   MPI_Status     recv_status,*send_status;
1862   PetscInt       *sbuf_nz=PETSC_NULL,*rbuf_nz=PETSC_NULL,count;
1863   PetscInt       **rbuf_j=PETSC_NULL;
1864   PetscScalar    **rbuf_a=PETSC_NULL;
1865   Mat_Redundant  *redund=PETSC_NULL;
1866   PetscContainer container;
1867 
1868   PetscFunctionBegin;
1869   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
1870   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
1871 
1872   if (reuse == MAT_REUSE_MATRIX) {
1873     ierr = MatGetSize(C,&M,&N);CHKERRQ(ierr);
1874     if (M != N || M != mat->rmap.N) SETERRQ(PETSC_ERR_ARG_SIZ,"Cannot reuse matrix. Wrong global size");
1875     ierr = MatGetLocalSize(C,&M,&N);CHKERRQ(ierr);
1876     if (M != N || M != mlocal_sub) SETERRQ(PETSC_ERR_ARG_SIZ,"Cannot reuse matrix. Wrong local size");
1877     ierr = PetscObjectQuery((PetscObject)C,"Mat_Redundant",(PetscObject *)&container);CHKERRQ(ierr);
1878     if (container) {
1879       ierr = PetscContainerGetPointer(container,(void **)&redund);CHKERRQ(ierr);
1880     } else {
1881       SETERRQ(PETSC_ERR_PLIB,"Container does not exist");
1882     }
1883     if (nzlocal != redund->nzlocal) SETERRQ(PETSC_ERR_ARG_SIZ,"Cannot reuse matrix. Wrong nzlocal");
1884 
1885     nsends    = redund->nsends;
1886     nrecvs    = redund->nrecvs;
1887     send_rank = redund->send_rank; recv_rank = send_rank + size;
1888     sbuf_nz   = redund->sbuf_nz;     rbuf_nz = sbuf_nz + nsends;
1889     sbuf_j    = redund->sbuf_j;
1890     sbuf_a    = redund->sbuf_a;
1891     rbuf_j    = redund->rbuf_j;
1892     rbuf_a    = redund->rbuf_a;
1893   }
1894 
1895   if (reuse == MAT_INITIAL_MATRIX){
1896     PetscMPIInt  subrank,subsize;
1897     PetscInt     nleftover,np_subcomm;
1898     /* get the destination processors' id send_rank, nsends and nrecvs */
1899     ierr = MPI_Comm_rank(subcomm,&subrank);CHKERRQ(ierr);
1900     ierr = MPI_Comm_size(subcomm,&subsize);CHKERRQ(ierr);
1901     ierr = PetscMalloc((2*size+1)*sizeof(PetscMPIInt),&send_rank);CHKERRQ(ierr);
1902     recv_rank = send_rank + size;
1903     np_subcomm = size/nsubcomm;
1904     nleftover  = size - nsubcomm*np_subcomm;
1905     nsends = 0; nrecvs = 0;
1906     for (i=0; i<size; i++){ /* i = rank */
1907       if (subrank == i/nsubcomm && rank != i){ /* my_subrank == other's subrank */
1908         send_rank[nsends] = i; nsends++;
1909         recv_rank[nrecvs++] = i;
1910       }
1911     }
1912     if (rank >= size - nleftover){/* this proc is a leftover processor */
1913       i = size-nleftover-1;
1914       j = 0;
1915       while (j < nsubcomm - nleftover){
1916         send_rank[nsends++] = i;
1917         i--; j++;
1918       }
1919     }
1920 
1921     if (nleftover && subsize == size/nsubcomm && subrank==subsize-1){ /* this proc recvs from leftover processors */
1922       for (i=0; i<nleftover; i++){
1923         recv_rank[nrecvs++] = size-nleftover+i;
1924       }
1925     }
1926 
1927     /* allocate sbuf_j, sbuf_a */
1928     i = nzlocal + rowrange[rank+1] - rowrange[rank] + 2;
1929     ierr = PetscMalloc(i*sizeof(PetscInt),&sbuf_j);CHKERRQ(ierr);
1930     ierr = PetscMalloc((nzlocal+1)*sizeof(PetscScalar),&sbuf_a);CHKERRQ(ierr);
1931   } /* end of if (reuse == MAT_INITIAL_MATRIX) */
1932 
1933   /* copy mat's local entries into the buffers */
1934   if (reuse == MAT_INITIAL_MATRIX){
1935     rownz_max = 0;
1936     rptr = sbuf_j;
1937     cols = sbuf_j + rend-rstart + 1;
1938     vals = sbuf_a;
1939     rptr[0] = 0;
1940     for (i=0; i<rend-rstart; i++){
1941       row = i + rstart;
1942       nzA    = a->i[i+1] - a->i[i]; nzB = b->i[i+1] - b->i[i];
1943       ncols  = nzA + nzB;
1944       cworkA = a->j + a->i[i]; cworkB = b->j + b->i[i];
1945       aworkA = a->a + a->i[i]; aworkB = b->a + b->i[i];
1946       /* load the column indices for this row into cols */
1947       lwrite = 0;
1948       for (l=0; l<nzB; l++) {
1949         if ((ctmp = bmap[cworkB[l]]) < cstart){
1950           vals[lwrite]   = aworkB[l];
1951           cols[lwrite++] = ctmp;
1952         }
1953       }
1954       for (l=0; l<nzA; l++){
1955         vals[lwrite]   = aworkA[l];
1956         cols[lwrite++] = cstart + cworkA[l];
1957       }
1958       for (l=0; l<nzB; l++) {
1959         if ((ctmp = bmap[cworkB[l]]) >= cend){
1960           vals[lwrite]   = aworkB[l];
1961           cols[lwrite++] = ctmp;
1962         }
1963       }
1964       vals += ncols;
1965       cols += ncols;
1966       rptr[i+1] = rptr[i] + ncols;
1967       if (rownz_max < ncols) rownz_max = ncols;
1968     }
1969     if (rptr[rend-rstart] != a->nz + b->nz) SETERRQ4(PETSC_ERR_PLIB, "rptr[%D] %D != %D + %D",rend-rstart,rptr[rend-rstart],a->nz,b->nz);
1970   } else { /* only copy matrix values into sbuf_a */
1971     rptr = sbuf_j;
1972     vals = sbuf_a;
1973     rptr[0] = 0;
1974     for (i=0; i<rend-rstart; i++){
1975       row = i + rstart;
1976       nzA    = a->i[i+1] - a->i[i]; nzB = b->i[i+1] - b->i[i];
1977       ncols  = nzA + nzB;
1978       cworkA = a->j + a->i[i]; cworkB = b->j + b->i[i];
1979       aworkA = a->a + a->i[i]; aworkB = b->a + b->i[i];
1980       lwrite = 0;
1981       for (l=0; l<nzB; l++) {
1982         if ((ctmp = bmap[cworkB[l]]) < cstart) vals[lwrite++] = aworkB[l];
1983       }
1984       for (l=0; l<nzA; l++) vals[lwrite++] = aworkA[l];
1985       for (l=0; l<nzB; l++) {
1986         if ((ctmp = bmap[cworkB[l]]) >= cend) vals[lwrite++] = aworkB[l];
1987       }
1988       vals += ncols;
1989       rptr[i+1] = rptr[i] + ncols;
1990     }
1991   } /* end of if (reuse == MAT_INITIAL_MATRIX) */
1992 
1993   /* send nzlocal to others, and recv others' nzlocal */
1994   /*--------------------------------------------------*/
1995   if (reuse == MAT_INITIAL_MATRIX){
1996     ierr = PetscMalloc2(3*(nsends + nrecvs)+1,MPI_Request,&s_waits3,nsends+1,MPI_Status,&send_status);CHKERRQ(ierr);
1997     s_waits2 = s_waits3 + nsends;
1998     s_waits1 = s_waits2 + nsends;
1999     r_waits1 = s_waits1 + nsends;
2000     r_waits2 = r_waits1 + nrecvs;
2001     r_waits3 = r_waits2 + nrecvs;
2002   } else {
2003     ierr = PetscMalloc2(nsends + nrecvs +1,MPI_Request,&s_waits3,nsends+1,MPI_Status,&send_status);CHKERRQ(ierr);
2004     r_waits3 = s_waits3 + nsends;
2005   }
2006 
2007   ierr = PetscObjectGetNewTag((PetscObject)mat,&tag3);CHKERRQ(ierr);
2008   if (reuse == MAT_INITIAL_MATRIX){
2009     /* get new tags to keep the communication clean */
2010     ierr = PetscObjectGetNewTag((PetscObject)mat,&tag1);CHKERRQ(ierr);
2011     ierr = PetscObjectGetNewTag((PetscObject)mat,&tag2);CHKERRQ(ierr);
2012     ierr = PetscMalloc3(nsends+nrecvs+1,PetscInt,&sbuf_nz,nrecvs,PetscInt*,&rbuf_j,nrecvs,PetscScalar*,&rbuf_a);CHKERRQ(ierr);
2013     rbuf_nz = sbuf_nz + nsends;
2014 
2015     /* post receives of other's nzlocal */
2016     for (i=0; i<nrecvs; i++){
2017       ierr = MPI_Irecv(rbuf_nz+i,1,MPIU_INT,MPI_ANY_SOURCE,tag1,comm,r_waits1+i);CHKERRQ(ierr);
2018     }
2019     /* send nzlocal to others */
2020     for (i=0; i<nsends; i++){
2021       sbuf_nz[i] = nzlocal;
2022       ierr = MPI_Isend(sbuf_nz+i,1,MPIU_INT,send_rank[i],tag1,comm,s_waits1+i);CHKERRQ(ierr);
2023     }
2024     /* wait on receives of nzlocal; allocate space for rbuf_j, rbuf_a */
2025     count = nrecvs;
2026     while (count) {
2027       ierr = MPI_Waitany(nrecvs,r_waits1,&imdex,&recv_status);CHKERRQ(ierr);
2028       recv_rank[imdex] = recv_status.MPI_SOURCE;
2029       /* allocate rbuf_a and rbuf_j; then post receives of rbuf_j */
2030       ierr = PetscMalloc((rbuf_nz[imdex]+1)*sizeof(PetscScalar),&rbuf_a[imdex]);CHKERRQ(ierr);
2031 
2032       i = rowrange[recv_status.MPI_SOURCE+1] - rowrange[recv_status.MPI_SOURCE]; /* number of expected mat->i */
2033       rbuf_nz[imdex] += i + 2;
2034       ierr = PetscMalloc(rbuf_nz[imdex]*sizeof(PetscInt),&rbuf_j[imdex]);CHKERRQ(ierr);
2035       ierr = MPI_Irecv(rbuf_j[imdex],rbuf_nz[imdex],MPIU_INT,recv_status.MPI_SOURCE,tag2,comm,r_waits2+imdex);CHKERRQ(ierr);
2036       count--;
2037     }
2038     /* wait on sends of nzlocal */
2039     if (nsends) {ierr = MPI_Waitall(nsends,s_waits1,send_status);CHKERRQ(ierr);}
2040     /* send mat->i,j to others, and recv from others */
2041     /*------------------------------------------------*/
2042     for (i=0; i<nsends; i++){
2043       j = nzlocal + rowrange[rank+1] - rowrange[rank] + 1;
2044       ierr = MPI_Isend(sbuf_j,j,MPIU_INT,send_rank[i],tag2,comm,s_waits2+i);CHKERRQ(ierr);
2045     }
2046     /* wait on receives of mat->i,j */
2047     /*------------------------------*/
2048     count = nrecvs;
2049     while (count) {
2050       ierr = MPI_Waitany(nrecvs,r_waits2,&imdex,&recv_status);CHKERRQ(ierr);
2051       if (recv_rank[imdex] != recv_status.MPI_SOURCE) SETERRQ2(PETSC_ERR_PLIB, "recv_rank %d != MPI_SOURCE %d",recv_rank[imdex],recv_status.MPI_SOURCE);
2052       count--;
2053     }
2054     /* wait on sends of mat->i,j */
2055     /*---------------------------*/
2056     if (nsends) {
2057       ierr = MPI_Waitall(nsends,s_waits2,send_status);CHKERRQ(ierr);
2058     }
2059   } /* end of if (reuse == MAT_INITIAL_MATRIX) */
2060 
2061   /* post receives, send and receive mat->a */
2062   /*----------------------------------------*/
2063   for (imdex=0; imdex<nrecvs; imdex++) {
2064     ierr = MPI_Irecv(rbuf_a[imdex],rbuf_nz[imdex],MPIU_SCALAR,recv_rank[imdex],tag3,comm,r_waits3+imdex);CHKERRQ(ierr);
2065   }
2066   for (i=0; i<nsends; i++){
2067     ierr = MPI_Isend(sbuf_a,nzlocal,MPIU_SCALAR,send_rank[i],tag3,comm,s_waits3+i);CHKERRQ(ierr);
2068   }
2069   count = nrecvs;
2070   while (count) {
2071     ierr = MPI_Waitany(nrecvs,r_waits3,&imdex,&recv_status);CHKERRQ(ierr);
2072     if (recv_rank[imdex] != recv_status.MPI_SOURCE) SETERRQ2(PETSC_ERR_PLIB, "recv_rank %d != MPI_SOURCE %d",recv_rank[imdex],recv_status.MPI_SOURCE);
2073     count--;
2074   }
2075   if (nsends) {
2076     ierr = MPI_Waitall(nsends,s_waits3,send_status);CHKERRQ(ierr);
2077   }
2078 
2079   ierr = PetscFree2(s_waits3,send_status);CHKERRQ(ierr);
2080 
2081   /* create redundant matrix */
2082   /*-------------------------*/
2083   if (reuse == MAT_INITIAL_MATRIX){
2084     /* compute rownz_max for preallocation */
2085     for (imdex=0; imdex<nrecvs; imdex++){
2086       j = rowrange[recv_rank[imdex]+1] - rowrange[recv_rank[imdex]];
2087       rptr = rbuf_j[imdex];
2088       for (i=0; i<j; i++){
2089         ncols = rptr[i+1] - rptr[i];
2090         if (rownz_max < ncols) rownz_max = ncols;
2091       }
2092     }
2093 
2094     ierr = MatCreate(subcomm,&C);CHKERRQ(ierr);
2095     ierr = MatSetSizes(C,mlocal_sub,mlocal_sub,PETSC_DECIDE,PETSC_DECIDE);CHKERRQ(ierr);
2096     ierr = MatSetFromOptions(C);CHKERRQ(ierr);
2097     ierr = MatSeqAIJSetPreallocation(C,rownz_max,PETSC_NULL);CHKERRQ(ierr);
2098     ierr = MatMPIAIJSetPreallocation(C,rownz_max,PETSC_NULL,rownz_max,PETSC_NULL);CHKERRQ(ierr);
2099   } else {
2100     C = *matredundant;
2101   }
2102 
2103   /* insert local matrix entries */
2104   rptr = sbuf_j;
2105   cols = sbuf_j + rend-rstart + 1;
2106   vals = sbuf_a;
2107   for (i=0; i<rend-rstart; i++){
2108     row   = i + rstart;
2109     ncols = rptr[i+1] - rptr[i];
2110     ierr = MatSetValues(C,1,&row,ncols,cols,vals,INSERT_VALUES);CHKERRQ(ierr);
2111     vals += ncols;
2112     cols += ncols;
2113   }
2114   /* insert received matrix entries */
2115   for (imdex=0; imdex<nrecvs; imdex++){
2116     rstart = rowrange[recv_rank[imdex]];
2117     rend   = rowrange[recv_rank[imdex]+1];
2118     rptr = rbuf_j[imdex];
2119     cols = rbuf_j[imdex] + rend-rstart + 1;
2120     vals = rbuf_a[imdex];
2121     for (i=0; i<rend-rstart; i++){
2122       row   = i + rstart;
2123       ncols = rptr[i+1] - rptr[i];
2124       ierr = MatSetValues(C,1,&row,ncols,cols,vals,INSERT_VALUES);CHKERRQ(ierr);
2125       vals += ncols;
2126       cols += ncols;
2127     }
2128   }
2129   ierr = MatAssemblyBegin(C,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
2130   ierr = MatAssemblyEnd(C,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
2131   ierr = MatGetSize(C,&M,&N);CHKERRQ(ierr);
2132   if (M != mat->rmap.N || N != mat->cmap.N) SETERRQ2(PETSC_ERR_ARG_INCOMP,"redundant mat size %D != input mat size %D",M,mat->rmap.N);
2133   if (reuse == MAT_INITIAL_MATRIX){
2134     PetscContainer container;
2135     *matredundant = C;
2136     /* create a supporting struct and attach it to C for reuse */
2137     ierr = PetscNewLog(C,Mat_Redundant,&redund);CHKERRQ(ierr);
2138     ierr = PetscContainerCreate(PETSC_COMM_SELF,&container);CHKERRQ(ierr);
2139     ierr = PetscContainerSetPointer(container,redund);CHKERRQ(ierr);
2140     ierr = PetscObjectCompose((PetscObject)C,"Mat_Redundant",(PetscObject)container);CHKERRQ(ierr);
2141     ierr = PetscContainerSetUserDestroy(container,PetscContainerDestroy_MatRedundant);CHKERRQ(ierr);
2142 
2143     redund->nzlocal = nzlocal;
2144     redund->nsends  = nsends;
2145     redund->nrecvs  = nrecvs;
2146     redund->send_rank = send_rank;
2147     redund->sbuf_nz = sbuf_nz;
2148     redund->sbuf_j  = sbuf_j;
2149     redund->sbuf_a  = sbuf_a;
2150     redund->rbuf_j  = rbuf_j;
2151     redund->rbuf_a  = rbuf_a;
2152 
2153     redund->MatDestroy = C->ops->destroy;
2154     C->ops->destroy    = MatDestroy_MatRedundant;
2155   }
2156   PetscFunctionReturn(0);
2157 }
2158 
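/*
   Sketch of gathering nsubcomm redundant copies of mat (assumes subcomm was
   obtained by splitting mat's communicator into nsubcomm pieces, e.g. with
   MPI_Comm_split, and mlocal_sub is the desired number of local rows of the
   redundant matrix on this process):

      Mat C;
      ierr = MatGetRedundantMatrix(mat,nsubcomm,subcomm,mlocal_sub,MAT_INITIAL_MATRIX,&C);CHKERRQ(ierr);
      .. later, after the values (but not the pattern) of mat change ..
      ierr = MatGetRedundantMatrix(mat,nsubcomm,subcomm,mlocal_sub,MAT_REUSE_MATRIX,&C);CHKERRQ(ierr);
*/
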
2159 #undef __FUNCT__
2160 #define __FUNCT__ "MatGetRowMin_MPIAIJ"
2161 PetscErrorCode MatGetRowMin_MPIAIJ(Mat A, Vec v, PetscInt idx[])
2162 {
2163   Mat_MPIAIJ    *mat    = (Mat_MPIAIJ *) A->data;
2164   PetscInt       n      = A->rmap.n;
2165   PetscInt       cstart = A->cmap.rstart;
2166   PetscInt      *cmap   = mat->garray;
2167   PetscInt      *diagIdx, *offdiagIdx;
2168   Vec            diagV, offdiagV;
2169   PetscScalar   *a, *diagA, *offdiagA;
2170   PetscInt       r;
2171   PetscErrorCode ierr;
2172 
2173   PetscFunctionBegin;
2174   ierr = PetscMalloc2(n,PetscInt,&diagIdx,n,PetscInt,&offdiagIdx);CHKERRQ(ierr);
2175   ierr = VecCreateSeq(A->comm, n, &diagV);CHKERRQ(ierr);
2176   ierr = VecCreateSeq(A->comm, n, &offdiagV);CHKERRQ(ierr);
2177   ierr = MatGetRowMin(mat->A, diagV,    diagIdx);CHKERRQ(ierr);
2178   ierr = MatGetRowMin(mat->B, offdiagV, offdiagIdx);CHKERRQ(ierr);
2179   ierr = VecGetArray(v,        &a);CHKERRQ(ierr);
2180   ierr = VecGetArray(diagV,    &diagA);CHKERRQ(ierr);
2181   ierr = VecGetArray(offdiagV, &offdiagA);CHKERRQ(ierr);
2182   for(r = 0; r < n; ++r) {
2183     if (PetscAbsScalar(diagA[r]) <= PetscAbsScalar(offdiagA[r])) {
2184       a[r]   = diagA[r];
2185       idx[r] = cstart + diagIdx[r];
2186     } else {
2187       a[r]   = offdiagA[r];
2188       idx[r] = cmap[offdiagIdx[r]];
2189     }
2190   }
2191   ierr = VecRestoreArray(v,        &a);CHKERRQ(ierr);
2192   ierr = VecRestoreArray(diagV,    &diagA);CHKERRQ(ierr);
2193   ierr = VecRestoreArray(offdiagV, &offdiagA);CHKERRQ(ierr);
2194   ierr = VecDestroy(diagV);CHKERRQ(ierr);
2195   ierr = VecDestroy(offdiagV);CHKERRQ(ierr);
2196   ierr = PetscFree2(diagIdx, offdiagIdx);CHKERRQ(ierr);
2197   PetscFunctionReturn(0);
2198 }
2199 
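/*
   Sketch (assumes v shares the row layout of A and idx has one entry per
   local row; idx[] receives the global column index of each row minimum):

      Vec      v;
      PetscInt m,n,*idx;
      ierr = MatGetLocalSize(A,&m,&n);CHKERRQ(ierr);
      ierr = VecCreateMPI(A->comm,m,PETSC_DETERMINE,&v);CHKERRQ(ierr);
      ierr = PetscMalloc(m*sizeof(PetscInt),&idx);CHKERRQ(ierr);
      ierr = MatGetRowMin(A,v,idx);CHKERRQ(ierr);
*/
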
2200 /* -------------------------------------------------------------------*/
2201 static struct _MatOps MatOps_Values = {MatSetValues_MPIAIJ,
2202        MatGetRow_MPIAIJ,
2203        MatRestoreRow_MPIAIJ,
2204        MatMult_MPIAIJ,
2205 /* 4*/ MatMultAdd_MPIAIJ,
2206        MatMultTranspose_MPIAIJ,
2207        MatMultTransposeAdd_MPIAIJ,
2208 #ifdef PETSC_HAVE_PBGL
2209        MatSolve_MPIAIJ,
2210 #else
2211        0,
2212 #endif
2213        0,
2214        0,
2215 /*10*/ 0,
2216        0,
2217        0,
2218        MatRelax_MPIAIJ,
2219        MatTranspose_MPIAIJ,
2220 /*15*/ MatGetInfo_MPIAIJ,
2221        MatEqual_MPIAIJ,
2222        MatGetDiagonal_MPIAIJ,
2223        MatDiagonalScale_MPIAIJ,
2224        MatNorm_MPIAIJ,
2225 /*20*/ MatAssemblyBegin_MPIAIJ,
2226        MatAssemblyEnd_MPIAIJ,
2227        0,
2228        MatSetOption_MPIAIJ,
2229        MatZeroEntries_MPIAIJ,
2230 /*25*/ MatZeroRows_MPIAIJ,
2231        0,
2232 #ifdef PETSC_HAVE_PBGL
2233        MatLUFactorNumeric_MPIAIJ,
2234 #else
2235        0,
2236 #endif
2237        0,
2238        0,
2239 /*30*/ MatSetUpPreallocation_MPIAIJ,
2240 #ifdef PETSC_HAVE_PBGL
2241        MatILUFactorSymbolic_MPIAIJ,
2242 #else
2243        0,
2244 #endif
2245        0,
2246        0,
2247        0,
2248 /*35*/ MatDuplicate_MPIAIJ,
2249        0,
2250        0,
2251        0,
2252        0,
2253 /*40*/ MatAXPY_MPIAIJ,
2254        MatGetSubMatrices_MPIAIJ,
2255        MatIncreaseOverlap_MPIAIJ,
2256        MatGetValues_MPIAIJ,
2257        MatCopy_MPIAIJ,
2258 /*45*/ 0,
2259        MatScale_MPIAIJ,
2260        0,
2261        0,
2262        0,
2263 /*50*/ MatSetBlockSize_MPIAIJ,
2264        0,
2265        0,
2266        0,
2267        0,
2268 /*55*/ MatFDColoringCreate_MPIAIJ,
2269        0,
2270        MatSetUnfactored_MPIAIJ,
2271        MatPermute_MPIAIJ,
2272        0,
2273 /*60*/ MatGetSubMatrix_MPIAIJ,
2274        MatDestroy_MPIAIJ,
2275        MatView_MPIAIJ,
2276        0,
2277        0,
2278 /*65*/ 0,
2279        0,
2280        0,
2281        0,
2282        0,
2283 /*70*/ 0,
2284        0,
2285        MatSetColoring_MPIAIJ,
2286 #if defined(PETSC_HAVE_ADIC)
2287        MatSetValuesAdic_MPIAIJ,
2288 #else
2289        0,
2290 #endif
2291        MatSetValuesAdifor_MPIAIJ,
2292 /*75*/ 0,
2293        0,
2294        0,
2295        0,
2296        0,
2297 /*80*/ 0,
2298        0,
2299        0,
2300        0,
2301 /*84*/ MatLoad_MPIAIJ,
2302        0,
2303        0,
2304        0,
2305        0,
2306        0,
2307 /*90*/ MatMatMult_MPIAIJ_MPIAIJ,
2308        MatMatMultSymbolic_MPIAIJ_MPIAIJ,
2309        MatMatMultNumeric_MPIAIJ_MPIAIJ,
2310        MatPtAP_Basic,
2311        MatPtAPSymbolic_MPIAIJ,
2312 /*95*/ MatPtAPNumeric_MPIAIJ,
2313        0,
2314        0,
2315        0,
2316        0,
2317 /*100*/0,
2318        MatPtAPSymbolic_MPIAIJ_MPIAIJ,
2319        MatPtAPNumeric_MPIAIJ_MPIAIJ,
2320        MatConjugate_MPIAIJ,
2321        0,
2322 /*105*/MatSetValuesRow_MPIAIJ,
2323        MatRealPart_MPIAIJ,
2324        MatImaginaryPart_MPIAIJ,
2325        0,
2326        0,
2327 /*110*/0,
2328        MatGetRedundantMatrix_MPIAIJ,
2329        MatGetRowMin_MPIAIJ};
2330 
2331 /* ----------------------------------------------------------------------------------------*/
2332 
2333 EXTERN_C_BEGIN
2334 #undef __FUNCT__
2335 #define __FUNCT__ "MatStoreValues_MPIAIJ"
2336 PetscErrorCode PETSCMAT_DLLEXPORT MatStoreValues_MPIAIJ(Mat mat)
2337 {
2338   Mat_MPIAIJ     *aij = (Mat_MPIAIJ *)mat->data;
2339   PetscErrorCode ierr;
2340 
2341   PetscFunctionBegin;
2342   ierr = MatStoreValues(aij->A);CHKERRQ(ierr);
2343   ierr = MatStoreValues(aij->B);CHKERRQ(ierr);
2344   PetscFunctionReturn(0);
2345 }
2346 EXTERN_C_END
2347 
2348 EXTERN_C_BEGIN
2349 #undef __FUNCT__
2350 #define __FUNCT__ "MatRetrieveValues_MPIAIJ"
2351 PetscErrorCode PETSCMAT_DLLEXPORT MatRetrieveValues_MPIAIJ(Mat mat)
2352 {
2353   Mat_MPIAIJ     *aij = (Mat_MPIAIJ *)mat->data;
2354   PetscErrorCode ierr;
2355 
2356   PetscFunctionBegin;
2357   ierr = MatRetrieveValues(aij->A);CHKERRQ(ierr);
2358   ierr = MatRetrieveValues(aij->B);CHKERRQ(ierr);
2359   PetscFunctionReturn(0);
2360 }
2361 EXTERN_C_END
2362 
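/*
   Sketch of the store/retrieve cycle implemented by the two routines above
   (assumes mat has been assembled and its nonzero pattern is frozen):

      ierr = MatSetOption(mat,MAT_NO_NEW_NONZERO_LOCATIONS);CHKERRQ(ierr);
      ierr = MatStoreValues(mat);CHKERRQ(ierr);
      .. modify the values of mat, e.g. apply boundary conditions ..
      ierr = MatRetrieveValues(mat);CHKERRQ(ierr);
*/
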
2363 #include "petscpc.h"
2364 EXTERN_C_BEGIN
2365 #undef __FUNCT__
2366 #define __FUNCT__ "MatMPIAIJSetPreallocation_MPIAIJ"
2367 PetscErrorCode PETSCMAT_DLLEXPORT MatMPIAIJSetPreallocation_MPIAIJ(Mat B,PetscInt d_nz,const PetscInt d_nnz[],PetscInt o_nz,const PetscInt o_nnz[])
2368 {
2369   Mat_MPIAIJ     *b;
2370   PetscErrorCode ierr;
2371   PetscInt       i;
2372 
2373   PetscFunctionBegin;
2374   B->preallocated = PETSC_TRUE;
2375   if (d_nz == PETSC_DEFAULT || d_nz == PETSC_DECIDE) d_nz = 5;
2376   if (o_nz == PETSC_DEFAULT || o_nz == PETSC_DECIDE) o_nz = 2;
2377   if (d_nz < 0) SETERRQ1(PETSC_ERR_ARG_OUTOFRANGE,"d_nz cannot be less than 0: value %D",d_nz);
2378   if (o_nz < 0) SETERRQ1(PETSC_ERR_ARG_OUTOFRANGE,"o_nz cannot be less than 0: value %D",o_nz);
2379 
2380   B->rmap.bs = B->cmap.bs = 1;
2381   ierr = PetscMapSetUp(&B->rmap);CHKERRQ(ierr);
2382   ierr = PetscMapSetUp(&B->cmap);CHKERRQ(ierr);
2383   if (d_nnz) {
2384     for (i=0; i<B->rmap.n; i++) {
2385       if (d_nnz[i] < 0) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"d_nnz cannot be less than 0: local row %D value %D",i,d_nnz[i]);
2386     }
2387   }
2388   if (o_nnz) {
2389     for (i=0; i<B->rmap.n; i++) {
2390       if (o_nnz[i] < 0) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"o_nnz cannot be less than 0: local row %D value %D",i,o_nnz[i]);
2391     }
2392   }
2393   b = (Mat_MPIAIJ*)B->data;
2394 
2395   /* Explicitly create 2 MATSEQAIJ matrices. */
2396   ierr = MatCreate(PETSC_COMM_SELF,&b->A);CHKERRQ(ierr);
2397   ierr = MatSetSizes(b->A,B->rmap.n,B->cmap.n,B->rmap.n,B->cmap.n);CHKERRQ(ierr);
2398   ierr = MatSetType(b->A,MATSEQAIJ);CHKERRQ(ierr);
2399   ierr = PetscLogObjectParent(B,b->A);CHKERRQ(ierr);
2400   ierr = MatCreate(PETSC_COMM_SELF,&b->B);CHKERRQ(ierr);
2401   ierr = MatSetSizes(b->B,B->rmap.n,B->cmap.N,B->rmap.n,B->cmap.N);CHKERRQ(ierr);
2402   ierr = MatSetType(b->B,MATSEQAIJ);CHKERRQ(ierr);
2403   ierr = PetscLogObjectParent(B,b->B);CHKERRQ(ierr);
2404 
2405   ierr = MatSeqAIJSetPreallocation(b->A,d_nz,d_nnz);CHKERRQ(ierr);
2406   ierr = MatSeqAIJSetPreallocation(b->B,o_nz,o_nnz);CHKERRQ(ierr);
2407 
2408   PetscFunctionReturn(0);
2409 }
2410 EXTERN_C_END
2411 
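/*
   Sketch of typical preallocation through the routine above (the counts 5 and
   2 are illustrative; exact per-row counts via d_nnz/o_nnz are preferred when
   they are known):

      ierr = MatCreate(comm,&B);CHKERRQ(ierr);
      ierr = MatSetSizes(B,m,n,M,N);CHKERRQ(ierr);
      ierr = MatSetType(B,MATMPIAIJ);CHKERRQ(ierr);
      ierr = MatMPIAIJSetPreallocation(B,5,PETSC_NULL,2,PETSC_NULL);CHKERRQ(ierr);
*/
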
2412 #undef __FUNCT__
2413 #define __FUNCT__ "MatDuplicate_MPIAIJ"
2414 PetscErrorCode MatDuplicate_MPIAIJ(Mat matin,MatDuplicateOption cpvalues,Mat *newmat)
2415 {
2416   Mat            mat;
2417   Mat_MPIAIJ     *a,*oldmat = (Mat_MPIAIJ*)matin->data;
2418   PetscErrorCode ierr;
2419 
2420   PetscFunctionBegin;
2421   *newmat       = 0;
2422   ierr = MatCreate(matin->comm,&mat);CHKERRQ(ierr);
2423   ierr = MatSetSizes(mat,matin->rmap.n,matin->cmap.n,matin->rmap.N,matin->cmap.N);CHKERRQ(ierr);
2424   ierr = MatSetType(mat,matin->type_name);CHKERRQ(ierr);
2425   ierr = PetscMemcpy(mat->ops,matin->ops,sizeof(struct _MatOps));CHKERRQ(ierr);
2426   a    = (Mat_MPIAIJ*)mat->data;
2427 
2428   mat->factor       = matin->factor;
2429   mat->rmap.bs      = matin->rmap.bs;
2430   mat->assembled    = PETSC_TRUE;
2431   mat->insertmode   = NOT_SET_VALUES;
2432   mat->preallocated = PETSC_TRUE;
2433 
2434   a->size           = oldmat->size;
2435   a->rank           = oldmat->rank;
2436   a->donotstash     = oldmat->donotstash;
2437   a->roworiented    = oldmat->roworiented;
2438   a->rowindices     = 0;
2439   a->rowvalues      = 0;
2440   a->getrowactive   = PETSC_FALSE;
2441 
2442   ierr = PetscMapCopy(mat->comm,&matin->rmap,&mat->rmap);CHKERRQ(ierr);
2443   ierr = PetscMapCopy(mat->comm,&matin->cmap,&mat->cmap);CHKERRQ(ierr);
2444 
2445   ierr = MatStashCreate_Private(matin->comm,1,&mat->stash);CHKERRQ(ierr);
2446   if (oldmat->colmap) {
2447 #if defined (PETSC_USE_CTABLE)
2448     ierr = PetscTableCreateCopy(oldmat->colmap,&a->colmap);CHKERRQ(ierr);
2449 #else
2450     ierr = PetscMalloc((mat->cmap.N)*sizeof(PetscInt),&a->colmap);CHKERRQ(ierr);
2451     ierr = PetscLogObjectMemory(mat,(mat->cmap.N)*sizeof(PetscInt));CHKERRQ(ierr);
2452     ierr = PetscMemcpy(a->colmap,oldmat->colmap,(mat->cmap.N)*sizeof(PetscInt));CHKERRQ(ierr);
2453 #endif
2454   } else a->colmap = 0;
2455   if (oldmat->garray) {
2456     PetscInt len;
2457     len  = oldmat->B->cmap.n;
2458     ierr = PetscMalloc((len+1)*sizeof(PetscInt),&a->garray);CHKERRQ(ierr);
2459     ierr = PetscLogObjectMemory(mat,len*sizeof(PetscInt));CHKERRQ(ierr);
2460     if (len) { ierr = PetscMemcpy(a->garray,oldmat->garray,len*sizeof(PetscInt));CHKERRQ(ierr); }
2461   } else a->garray = 0;
2462 
2463   ierr = VecDuplicate(oldmat->lvec,&a->lvec);CHKERRQ(ierr);
2464   ierr = PetscLogObjectParent(mat,a->lvec);CHKERRQ(ierr);
2465   ierr = VecScatterCopy(oldmat->Mvctx,&a->Mvctx);CHKERRQ(ierr);
2466   ierr = PetscLogObjectParent(mat,a->Mvctx);CHKERRQ(ierr);
2467   ierr = MatDuplicate(oldmat->A,cpvalues,&a->A);CHKERRQ(ierr);
2468   ierr = PetscLogObjectParent(mat,a->A);CHKERRQ(ierr);
2469   ierr = MatDuplicate(oldmat->B,cpvalues,&a->B);CHKERRQ(ierr);
2470   ierr = PetscLogObjectParent(mat,a->B);CHKERRQ(ierr);
2471   ierr = PetscFListDuplicate(matin->qlist,&mat->qlist);CHKERRQ(ierr);
2472   *newmat = mat;
2473   PetscFunctionReturn(0);
2474 }
2475 
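/*
   Sketch (assumes A is assembled; MAT_DO_NOT_COPY_VALUES duplicates only the
   nonzero structure, MAT_COPY_VALUES also copies the numerical values):

      Mat B;
      ierr = MatDuplicate(A,MAT_COPY_VALUES,&B);CHKERRQ(ierr);
*/
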
2476 #include "petscsys.h"
2477 
2478 #undef __FUNCT__
2479 #define __FUNCT__ "MatLoad_MPIAIJ"
2480 PetscErrorCode MatLoad_MPIAIJ(PetscViewer viewer, MatType type,Mat *newmat)
2481 {
2482   Mat            A;
2483   PetscScalar    *vals,*svals;
2484   MPI_Comm       comm = ((PetscObject)viewer)->comm;
2485   MPI_Status     status;
2486   PetscErrorCode ierr;
2487   PetscMPIInt    rank,size,tag = ((PetscObject)viewer)->tag,maxnz;
2488   PetscInt       i,nz,j,rstart,rend,mmax;
2489   PetscInt       header[4],*rowlengths = 0,M,N,m,*cols;
2490   PetscInt       *ourlens = PETSC_NULL,*procsnz = PETSC_NULL,*offlens = PETSC_NULL,jj,*mycols,*smycols;
2491   PetscInt       cend,cstart,n,*rowners;
2492   int            fd;
2493 
2494   PetscFunctionBegin;
2495   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
2496   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
2497   if (!rank) {
2498     ierr = PetscViewerBinaryGetDescriptor(viewer,&fd);CHKERRQ(ierr);
2499     ierr = PetscBinaryRead(fd,(char *)header,4,PETSC_INT);CHKERRQ(ierr);
2500     if (header[0] != MAT_FILE_COOKIE) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,"Not a matrix object");
2501   }
2502 
2503   ierr = MPI_Bcast(header+1,3,MPIU_INT,0,comm);CHKERRQ(ierr);
2504   M = header[1]; N = header[2];
2505   /* determine ownership of all rows */
2506   m    = M/size + ((M % size) > rank);
2507   ierr = PetscMalloc((size+1)*sizeof(PetscInt),&rowners);CHKERRQ(ierr);
2508   ierr = MPI_Allgather(&m,1,MPIU_INT,rowners+1,1,MPIU_INT,comm);CHKERRQ(ierr);
2509 
2510   /* First process needs enough room for process with most rows */
2511   if (!rank) {
2512     mmax       = rowners[1];
2513     for (i=2; i<=size; i++) {
2514       mmax = PetscMax(mmax,rowners[i]);
2515     }
2516   } else mmax = m;
2517 
2518   rowners[0] = 0;
2519   for (i=2; i<=size; i++) {
2520     rowners[i] += rowners[i-1];
2521   }
2522   rstart = rowners[rank];
2523   rend   = rowners[rank+1];
2524 
2525   /* distribute row lengths to all processors */
2526   ierr    = PetscMalloc2(mmax,PetscInt,&ourlens,mmax,PetscInt,&offlens);CHKERRQ(ierr);
2527   if (!rank) {
2528     ierr = PetscBinaryRead(fd,ourlens,m,PETSC_INT);CHKERRQ(ierr);
2529     ierr = PetscMalloc(m*sizeof(PetscInt),&rowlengths);CHKERRQ(ierr);
2530     ierr = PetscMalloc(size*sizeof(PetscInt),&procsnz);CHKERRQ(ierr);
2531     ierr = PetscMemzero(procsnz,size*sizeof(PetscInt));CHKERRQ(ierr);
2532     for (j=0; j<m; j++) {
2533       procsnz[0] += ourlens[j];
2534     }
2535     for (i=1; i<size; i++) {
2536       ierr = PetscBinaryRead(fd,rowlengths,rowners[i+1]-rowners[i],PETSC_INT);CHKERRQ(ierr);
2537       /* calculate the number of nonzeros on each processor */
2538       for (j=0; j<rowners[i+1]-rowners[i]; j++) {
2539         procsnz[i] += rowlengths[j];
2540       }
2541       ierr = MPI_Send(rowlengths,rowners[i+1]-rowners[i],MPIU_INT,i,tag,comm);CHKERRQ(ierr);
2542     }
2543     ierr = PetscFree(rowlengths);CHKERRQ(ierr);
2544   } else {
2545     ierr = MPI_Recv(ourlens,m,MPIU_INT,0,tag,comm,&status);CHKERRQ(ierr);
2546   }
2547 
2548   if (!rank) {
2549     /* determine max buffer needed and allocate it */
2550     maxnz = 0;
2551     for (i=0; i<size; i++) {
2552       maxnz = PetscMax(maxnz,procsnz[i]);
2553     }
2554     ierr = PetscMalloc(maxnz*sizeof(PetscInt),&cols);CHKERRQ(ierr);
2555 
2556     /* read in my part of the matrix column indices  */
2557     nz   = procsnz[0];
2558     ierr = PetscMalloc(nz*sizeof(PetscInt),&mycols);CHKERRQ(ierr);
2559     ierr = PetscBinaryRead(fd,mycols,nz,PETSC_INT);CHKERRQ(ierr);
2560 
2561     /* read in everyone else's and ship them off */
2562     for (i=1; i<size; i++) {
2563       nz   = procsnz[i];
2564       ierr = PetscBinaryRead(fd,cols,nz,PETSC_INT);CHKERRQ(ierr);
2565       ierr = MPI_Send(cols,nz,MPIU_INT,i,tag,comm);CHKERRQ(ierr);
2566     }
2567     ierr = PetscFree(cols);CHKERRQ(ierr);
2568   } else {
2569     /* determine buffer space needed for message */
2570     nz = 0;
2571     for (i=0; i<m; i++) {
2572       nz += ourlens[i];
2573     }
2574     ierr = PetscMalloc(nz*sizeof(PetscInt),&mycols);CHKERRQ(ierr);
2575 
2576     /* receive message of column indices */
2577     ierr = MPI_Recv(mycols,nz,MPIU_INT,0,tag,comm,&status);CHKERRQ(ierr);
2578     ierr = MPI_Get_count(&status,MPIU_INT,&maxnz);CHKERRQ(ierr);
2579     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,"something is wrong with the file");
2580   }
2581 
2582   /* determine column ownership if matrix is not square */
2583   if (N != M) {
2584     n      = N/size + ((N % size) > rank);
2585     ierr   = MPI_Scan(&n,&cend,1,MPIU_INT,MPI_SUM,comm);CHKERRQ(ierr);
2586     cstart = cend - n;
2587   } else {
2588     cstart = rstart;
2589     cend   = rend;
2590     n      = cend - cstart;
2591   }
2592 
2593   /* loop over local rows, determining the number of off-diagonal entries */
2594   ierr = PetscMemzero(offlens,m*sizeof(PetscInt));CHKERRQ(ierr);
2595   jj = 0;
2596   for (i=0; i<m; i++) {
2597     for (j=0; j<ourlens[i]; j++) {
2598       if (mycols[jj] < cstart || mycols[jj] >= cend) offlens[i]++;
2599       jj++;
2600     }
2601   }
2602 
2603   /* create our matrix */
2604   for (i=0; i<m; i++) {
2605     ourlens[i] -= offlens[i];
2606   }
2607   ierr = MatCreate(comm,&A);CHKERRQ(ierr);
2608   ierr = MatSetSizes(A,m,n,M,N);CHKERRQ(ierr);
2609   ierr = MatSetType(A,type);CHKERRQ(ierr);
2610   ierr = MatMPIAIJSetPreallocation(A,0,ourlens,0,offlens);CHKERRQ(ierr);
2611 
2612   for (i=0; i<m; i++) {
2613     ourlens[i] += offlens[i];
2614   }
2615 
2616   if (!rank) {
2617     ierr = PetscMalloc((maxnz+1)*sizeof(PetscScalar),&vals);CHKERRQ(ierr);
2618 
2619     /* read in my part of the matrix numerical values  */
2620     nz   = procsnz[0];
2621     ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
2622 
2623     /* insert into matrix */
2624     jj      = rstart;
2625     smycols = mycols;
2626     svals   = vals;
2627     for (i=0; i<m; i++) {
2628       ierr = MatSetValues_MPIAIJ(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2629       smycols += ourlens[i];
2630       svals   += ourlens[i];
2631       jj++;
2632     }
2633 
2634     /* read in other processors and ship out */
2635     for (i=1; i<size; i++) {
2636       nz   = procsnz[i];
2637       ierr = PetscBinaryRead(fd,vals,nz,PETSC_SCALAR);CHKERRQ(ierr);
2638       ierr = MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);CHKERRQ(ierr);
2639     }
2640     ierr = PetscFree(procsnz);CHKERRQ(ierr);
2641   } else {
2642     /* receive numeric values */
2643     ierr = PetscMalloc((nz+1)*sizeof(PetscScalar),&vals);CHKERRQ(ierr);
2644 
2645     /* receive message of values */
2646     ierr = MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);CHKERRQ(ierr);
2647     ierr = MPI_Get_count(&status,MPIU_SCALAR,&maxnz);CHKERRQ(ierr);
2648     if (maxnz != nz) SETERRQ(PETSC_ERR_FILE_UNEXPECTED,"something is wrong with the file");
2649 
2650     /* insert into matrix */
2651     jj      = rstart;
2652     smycols = mycols;
2653     svals   = vals;
2654     for (i=0; i<m; i++) {
2655       ierr     = MatSetValues_MPIAIJ(A,1,&jj,ourlens[i],smycols,svals,INSERT_VALUES);CHKERRQ(ierr);
2656       smycols += ourlens[i];
2657       svals   += ourlens[i];
2658       jj++;
2659     }
2660   }
2661   ierr = PetscFree2(ourlens,offlens);CHKERRQ(ierr);
2662   ierr = PetscFree(vals);CHKERRQ(ierr);
2663   ierr = PetscFree(mycols);CHKERRQ(ierr);
2664   ierr = PetscFree(rowners);CHKERRQ(ierr);
2665 
2666   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
2667   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
2668   *newmat = A;
2669   PetscFunctionReturn(0);
2670 }
2671 
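/*
   Sketch of loading a matrix previously written with MatView() to a binary
   viewer (assumes "matrix.dat" names such a file):

      PetscViewer viewer;
      Mat         A;
      ierr = PetscViewerBinaryOpen(comm,"matrix.dat",FILE_MODE_READ,&viewer);CHKERRQ(ierr);
      ierr = MatLoad(viewer,MATMPIAIJ,&A);CHKERRQ(ierr);
      ierr = PetscViewerDestroy(viewer);CHKERRQ(ierr);
*/
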
2672 #undef __FUNCT__
2673 #define __FUNCT__ "MatGetSubMatrix_MPIAIJ"
2674 /*
2675     Not great since it makes two copies of the submatrix: first a SeqAIJ copy
2676   on each process, then the end result formed by concatenating the local
2677   matrices. Building the result directly would be much like MatGetSubMatrices_MPIAIJ().
2678 */
2679 PetscErrorCode MatGetSubMatrix_MPIAIJ(Mat mat,IS isrow,IS iscol,PetscInt csize,MatReuse call,Mat *newmat)
2680 {
2681   PetscErrorCode ierr;
2682   PetscMPIInt    rank,size;
2683   PetscInt       i,m,n,rstart,row,rend,nz,*cwork,j;
2684   PetscInt       *ii,*jj,nlocal,*dlens,*olens,dlen,olen,jend,mglobal;
2685   Mat            *local,M,Mreuse;
2686   PetscScalar    *vwork,*aa;
2687   MPI_Comm       comm = mat->comm;
2688   Mat_SeqAIJ     *aij;
2689 
2690 
2691   PetscFunctionBegin;
2692   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
2693   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
2694 
2695   if (call ==  MAT_REUSE_MATRIX) {
2696     ierr = PetscObjectQuery((PetscObject)*newmat,"SubMatrix",(PetscObject *)&Mreuse);CHKERRQ(ierr);
2697     if (!Mreuse) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,"Submatrix passed in was not used before, cannot reuse");
2698     local = &Mreuse;
2699     ierr  = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_REUSE_MATRIX,&local);CHKERRQ(ierr);
2700   } else {
2701     ierr   = MatGetSubMatrices(mat,1,&isrow,&iscol,MAT_INITIAL_MATRIX,&local);CHKERRQ(ierr);
2702     Mreuse = *local;
2703     ierr   = PetscFree(local);CHKERRQ(ierr);
2704   }
2705 
2706   /*
2707       m - number of local rows
2708       n - number of columns (same on all processors)
2709       rstart - first row in new global matrix generated
2710   */
2711   ierr = MatGetSize(Mreuse,&m,&n);CHKERRQ(ierr);
2712   if (call == MAT_INITIAL_MATRIX) {
2713     aij = (Mat_SeqAIJ*)(Mreuse)->data;
2714     ii  = aij->i;
2715     jj  = aij->j;
2716 
2717     /*
2718         Determine the number of non-zeros in the diagonal and off-diagonal
2719         portions of the matrix in order to do correct preallocation
2720     */
2721 
2722     /* first get start and end of "diagonal" columns */
2723     if (csize == PETSC_DECIDE) {
2724       ierr = ISGetSize(isrow,&mglobal);CHKERRQ(ierr);
2725       if (mglobal == n) { /* square matrix */
2726         nlocal = m;
2727       } else {
2728         nlocal = n/size + ((n % size) > rank);
2729       }
2730     } else {
2731       nlocal = csize;
2732     }
2733     ierr   = MPI_Scan(&nlocal,&rend,1,MPIU_INT,MPI_SUM,comm);CHKERRQ(ierr);
2734     rstart = rend - nlocal;
2735     if (rank == size - 1 && rend != n) {
2736       SETERRQ2(PETSC_ERR_ARG_SIZ,"Local column sizes %D do not add up to total number of columns %D",rend,n);
2737     }
2738 
2739     /* next, compute all the lengths */
2740     ierr  = PetscMalloc((2*m+1)*sizeof(PetscInt),&dlens);CHKERRQ(ierr);
2741     olens = dlens + m;
2742     for (i=0; i<m; i++) {
2743       jend = ii[i+1] - ii[i];
2744       olen = 0;
2745       dlen = 0;
2746       for (j=0; j<jend; j++) {
2747         if (*jj < rstart || *jj >= rend) olen++;
2748         else dlen++;
2749         jj++;
2750       }
2751       olens[i] = olen;
2752       dlens[i] = dlen;
2753     }
2754     ierr = MatCreate(comm,&M);CHKERRQ(ierr);
2755     ierr = MatSetSizes(M,m,nlocal,PETSC_DECIDE,n);CHKERRQ(ierr);
2756     ierr = MatSetType(M,mat->type_name);CHKERRQ(ierr);
2757     ierr = MatMPIAIJSetPreallocation(M,0,dlens,0,olens);CHKERRQ(ierr);
2758     ierr = PetscFree(dlens);CHKERRQ(ierr);
2759   } else {
2760     PetscInt ml,nl;
2761 
2762     M = *newmat;
2763     ierr = MatGetLocalSize(M,&ml,&nl);CHKERRQ(ierr);
2764     if (ml != m) SETERRQ(PETSC_ERR_ARG_SIZ,"Previous matrix must be same size/layout as request");
2765     ierr = MatZeroEntries(M);CHKERRQ(ierr);
2766     /*
2767          The next two lines are needed so we may call MatSetValues_MPIAIJ() below directly,
2768        rather than the slower MatSetValues().
2769     */
2770     M->was_assembled = PETSC_TRUE;
2771     M->assembled     = PETSC_FALSE;
2772   }
2773   ierr = MatGetOwnershipRange(M,&rstart,&rend);CHKERRQ(ierr);
2774   aij = (Mat_SeqAIJ*)(Mreuse)->data;
2775   ii  = aij->i;
2776   jj  = aij->j;
2777   aa  = aij->a;
2778   for (i=0; i<m; i++) {
2779     row   = rstart + i;
2780     nz    = ii[i+1] - ii[i];
2781     cwork = jj;     jj += nz;
2782     vwork = aa;     aa += nz;
2783     ierr = MatSetValues_MPIAIJ(M,1,&row,nz,cwork,vwork,INSERT_VALUES);CHKERRQ(ierr);
2784   }
2785 
2786   ierr = MatAssemblyBegin(M,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
2787   ierr = MatAssemblyEnd(M,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
2788   *newmat = M;
2789 
2790   /* save submatrix used in processor for next request */
2791   if (call ==  MAT_INITIAL_MATRIX) {
2792     ierr = PetscObjectCompose((PetscObject)M,"SubMatrix",(PetscObject)Mreuse);CHKERRQ(ierr);
2793     ierr = PetscObjectDereference((PetscObject)Mreuse);CHKERRQ(ierr);
2794   }
2795 
2796   PetscFunctionReturn(0);
2797 }
2798 
2799 EXTERN_C_BEGIN
2800 #undef __FUNCT__
2801 #define __FUNCT__ "MatMPIAIJSetPreallocationCSR_MPIAIJ"
2802 PetscErrorCode PETSCMAT_DLLEXPORT MatMPIAIJSetPreallocationCSR_MPIAIJ(Mat B,const PetscInt Ii[],const PetscInt J[],const PetscScalar v[])
2803 {
2804   PetscInt       m,cstart, cend,j,nnz,i,d;
2805   PetscInt       *d_nnz,*o_nnz,nnz_max = 0,rstart,ii;
2806   const PetscInt *JJ;
2807   PetscScalar    *values;
2808   PetscErrorCode ierr;
2809 
2810   PetscFunctionBegin;
  if (Ii[0]) SETERRQ1(PETSC_ERR_ARG_OUTOFRANGE,"Ii[0] must be 0, it is %D",Ii[0]);
2812 
2813   B->rmap.bs = B->cmap.bs = 1;
2814   ierr = PetscMapSetUp(&B->rmap);CHKERRQ(ierr);
2815   ierr = PetscMapSetUp(&B->cmap);CHKERRQ(ierr);
2816   m      = B->rmap.n;
2817   cstart = B->cmap.rstart;
2818   cend   = B->cmap.rend;
2819   rstart = B->rmap.rstart;
2820 
2821   ierr  = PetscMalloc((2*m+1)*sizeof(PetscInt),&d_nnz);CHKERRQ(ierr);
2822   o_nnz = d_nnz + m;
2823 
2824   for (i=0; i<m; i++) {
    nnz     = Ii[i+1] - Ii[i];
2826     JJ      = J + Ii[i];
2827     nnz_max = PetscMax(nnz_max,nnz);
    if (nnz < 0) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"Local row %D has a negative number of columns %D",i,nnz);
2829     for (j=0; j<nnz; j++) {
2830       if (*JJ >= cstart) break;
2831       JJ++;
2832     }
2833     d = 0;
2834     for (; j<nnz; j++) {
2835       if (*JJ++ >= cend) break;
2836       d++;
2837     }
2838     d_nnz[i] = d;
2839     o_nnz[i] = nnz - d;
2840   }
2841   ierr = MatMPIAIJSetPreallocation(B,0,d_nnz,0,o_nnz);CHKERRQ(ierr);
2842   ierr = PetscFree(d_nnz);CHKERRQ(ierr);
2843 
2844   if (v) values = (PetscScalar*)v;
2845   else {
2846     ierr = PetscMalloc((nnz_max+1)*sizeof(PetscScalar),&values);CHKERRQ(ierr);
2847     ierr = PetscMemzero(values,nnz_max*sizeof(PetscScalar));CHKERRQ(ierr);
2848   }
2849 
2850   for (i=0; i<m; i++) {
2851     ii   = i + rstart;
    nnz  = Ii[i+1] - Ii[i];
2853     ierr = MatSetValues_MPIAIJ(B,1,&ii,nnz,J+Ii[i],values+(v ? Ii[i] : 0),INSERT_VALUES);CHKERRQ(ierr);
2854   }
2855   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
2856   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
2857 
2858   if (!v) {
2859     ierr = PetscFree(values);CHKERRQ(ierr);
2860   }
2861   PetscFunctionReturn(0);
2862 }
2863 EXTERN_C_END
2864 
2865 #undef __FUNCT__
2866 #define __FUNCT__ "MatMPIAIJSetPreallocationCSR"
2867 /*@
2868    MatMPIAIJSetPreallocationCSR - Allocates memory for a sparse parallel matrix in AIJ format
2869    (the default parallel PETSc format).
2870 
2871    Collective on MPI_Comm
2872 
2873    Input Parameters:
2874 +  B - the matrix
2875 .  i - the indices into j for the start of each local row (starts with zero)
.  j - the column indices for each local row (starts with zero); these must be sorted within each row
2877 -  v - optional values in the matrix
2878 
2879    Level: developer
2880 
   Notes: this actually copies the values from i[], j[], and v[] and puts them into PETSc's internal
     storage format. Thus changing the values in v[] after this call will not affect the matrix values.
2883 
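   Example usage (a minimal sketch, assuming the CSR arrays i[], j[], v[] have already
   been filled for this process's m local rows; the names and sizes are illustrative):
.vb
     Mat B;
     ierr = MatCreate(comm,&B);CHKERRQ(ierr);
     ierr = MatSetSizes(B,m,n,PETSC_DETERMINE,PETSC_DETERMINE);CHKERRQ(ierr);
     ierr = MatSetType(B,MATMPIAIJ);CHKERRQ(ierr);
     ierr = MatMPIAIJSetPreallocationCSR(B,i,j,v);CHKERRQ(ierr);
.ve
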
2884 .keywords: matrix, aij, compressed row, sparse, parallel
2885 
2886 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues(), MatMPIAIJSetPreallocation(), MatCreateMPIAIJ(), MPIAIJ,
2887           MatCreateSeqAIJWithArrays(), MatCreateMPIAIJWithSplitArrays()
2888 @*/
2889 PetscErrorCode PETSCMAT_DLLEXPORT MatMPIAIJSetPreallocationCSR(Mat B,const PetscInt i[],const PetscInt j[], const PetscScalar v[])
2890 {
2891   PetscErrorCode ierr,(*f)(Mat,const PetscInt[],const PetscInt[],const PetscScalar[]);
2892 
2893   PetscFunctionBegin;
2894   ierr = PetscObjectQueryFunction((PetscObject)B,"MatMPIAIJSetPreallocationCSR_C",(void (**)(void))&f);CHKERRQ(ierr);
2895   if (f) {
2896     ierr = (*f)(B,i,j,v);CHKERRQ(ierr);
2897   }
2898   PetscFunctionReturn(0);
2899 }
2900 
2901 #undef __FUNCT__
2902 #define __FUNCT__ "MatMPIAIJSetPreallocation"
2903 /*@C
2904    MatMPIAIJSetPreallocation - Preallocates memory for a sparse parallel matrix in AIJ format
2905    (the default parallel PETSc format).  For good matrix assembly performance
2906    the user should preallocate the matrix storage by setting the parameters
2907    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
2908    performance can be increased by more than a factor of 50.
2909 
2910    Collective on MPI_Comm
2911 
2912    Input Parameters:
2913 +  A - the matrix
2914 .  d_nz  - number of nonzeros per row in DIAGONAL portion of local submatrix
2915            (same value is used for all local rows)
2916 .  d_nnz - array containing the number of nonzeros in the various rows of the
2917            DIAGONAL portion of the local submatrix (possibly different for each row)
2918            or PETSC_NULL, if d_nz is used to specify the nonzero structure.
           The size of this array is equal to the number of local rows, i.e., 'm'.
2920            You must leave room for the diagonal entry even if it is zero.
2921 .  o_nz  - number of nonzeros per row in the OFF-DIAGONAL portion of local
2922            submatrix (same value is used for all local rows).
2923 -  o_nnz - array containing the number of nonzeros in the various rows of the
2924            OFF-DIAGONAL portion of the local submatrix (possibly different for
2925            each row) or PETSC_NULL, if o_nz is used to specify the nonzero
2926            structure. The size of this array is equal to the number
           of local rows, i.e., 'm'.
2928 
   If the *_nnz parameter is given, then the *_nz parameter is ignored.
2930 
2931    The AIJ format (also called the Yale sparse matrix format or
2932    compressed row storage (CSR)), is fully compatible with standard Fortran 77
2933    storage.  The stored row and column indices begin with zero.  See the users manual for details.
2934 
2935    The parallel matrix is partitioned such that the first m0 rows belong to
2936    process 0, the next m1 rows belong to process 1, the next m2 rows belong
   to process 2 etc., where m0,m1,m2... are the input parameter 'm'.
2938 
2939    The DIAGONAL portion of the local submatrix of a processor can be defined
   as the submatrix obtained by extracting the part corresponding
   to the rows r1-r2 and columns r1-r2 of the global matrix, where r1 is the
   first row that belongs to the processor, and r2 is the last row belonging
   to this processor. This is a square mxm matrix. The remaining portion
   of the local submatrix (m x (N-n)) constitutes the OFF-DIAGONAL portion.
2945 
2946    If o_nnz, d_nnz are specified, then o_nz, and d_nz are ignored.
2947 
2948    Example usage:
2949 
2950    Consider the following 8x8 matrix with 34 non-zero values, that is
   assembled across 3 processors. Let's assume that proc0 owns 3 rows,
2952    proc1 owns 3 rows, proc2 owns 2 rows. This division can be shown
2953    as follows:
2954 
2955 .vb
2956             1  2  0  |  0  3  0  |  0  4
2957     Proc0   0  5  6  |  7  0  0  |  8  0
2958             9  0 10  | 11  0  0  | 12  0
2959     -------------------------------------
2960            13  0 14  | 15 16 17  |  0  0
2961     Proc1   0 18  0  | 19 20 21  |  0  0
2962             0  0  0  | 22 23  0  | 24  0
2963     -------------------------------------
2964     Proc2  25 26 27  |  0  0 28  | 29  0
2965            30  0  0  | 31 32 33  |  0 34
2966 .ve
2967 
2968    This can be represented as a collection of submatrices as:
2969 
2970 .vb
2971       A B C
2972       D E F
2973       G H I
2974 .ve
2975 
2976    Where the submatrices A,B,C are owned by proc0, D,E,F are
2977    owned by proc1, G,H,I are owned by proc2.
2978 
2979    The 'm' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
2980    The 'n' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
2981    The 'M','N' parameters are 8,8, and have the same values on all procs.
2982 
2983    The DIAGONAL submatrices corresponding to proc0,proc1,proc2 are
2984    submatrices [A], [E], [I] respectively. The OFF-DIAGONAL submatrices
2985    corresponding to proc0,proc1,proc2 are [BC], [DF], [GH] respectively.
2986    Internally, each processor stores the DIAGONAL part, and the OFF-DIAGONAL
   part as SeqAIJ matrices. For example, proc1 will store [E] as a SeqAIJ
   matrix, and [DF] as another SeqAIJ matrix.
2989 
2990    When d_nz, o_nz parameters are specified, d_nz storage elements are
2991    allocated for every row of the local diagonal submatrix, and o_nz
2992    storage locations are allocated for every row of the OFF-DIAGONAL submat.
   One way to choose d_nz and o_nz is to use the maximum number of nonzeros over
   the local rows of the DIAGONAL and OFF-DIAGONAL submatrices, respectively.
2995    In this case, the values of d_nz,o_nz are:
2996 .vb
     proc0 : d_nz = 2, o_nz = 2
     proc1 : d_nz = 3, o_nz = 2
     proc2 : d_nz = 1, o_nz = 4
3000 .ve
3001    We are allocating m*(d_nz+o_nz) storage locations for every proc. This
3002    translates to 3*(2+2)=12 for proc0, 3*(3+2)=15 for proc1, 2*(1+4)=10
   for proc2, i.e., we are using 12+15+10=37 storage locations to store
3004    34 values.
3005 
3006    When d_nnz, o_nnz parameters are specified, the storage is specified
   for every row, corresponding to both the DIAGONAL and OFF-DIAGONAL submatrices.
3008    In the above case the values for d_nnz,o_nnz are:
3009 .vb
3010      proc0: d_nnz = [2,2,2] and o_nnz = [2,2,2]
3011      proc1: d_nnz = [3,3,2] and o_nnz = [2,1,1]
3012      proc2: d_nnz = [1,1]   and o_nnz = [4,4]
3013 .ve
   Here the space allocated is the sum of all the above values, i.e., 34, and
3015    hence pre-allocation is perfect.
3016 
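   As a concrete sketch, the call made on proc1 in the example above would look like
   the following (matrix creation abbreviated; 'comm' and 'A' are assumed to be set up
   as usual):
.vb
     PetscInt d_nnz[3] = {3,3,2}, o_nnz[3] = {2,1,1};
     ierr = MatCreate(comm,&A);CHKERRQ(ierr);
     ierr = MatSetSizes(A,3,3,8,8);CHKERRQ(ierr);
     ierr = MatSetType(A,MATMPIAIJ);CHKERRQ(ierr);
     ierr = MatMPIAIJSetPreallocation(A,0,d_nnz,0,o_nnz);CHKERRQ(ierr);
.ve
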
3017    Level: intermediate
3018 
3019 .keywords: matrix, aij, compressed row, sparse, parallel
3020 
3021 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues(), MatCreateMPIAIJ(), MatMPIAIJSetPreallocationCSR(),
3022           MPIAIJ
3023 @*/
3024 PetscErrorCode PETSCMAT_DLLEXPORT MatMPIAIJSetPreallocation(Mat B,PetscInt d_nz,const PetscInt d_nnz[],PetscInt o_nz,const PetscInt o_nnz[])
3025 {
3026   PetscErrorCode ierr,(*f)(Mat,PetscInt,const PetscInt[],PetscInt,const PetscInt[]);
3027 
3028   PetscFunctionBegin;
3029   ierr = PetscObjectQueryFunction((PetscObject)B,"MatMPIAIJSetPreallocation_C",(void (**)(void))&f);CHKERRQ(ierr);
3030   if (f) {
3031     ierr = (*f)(B,d_nz,d_nnz,o_nz,o_nnz);CHKERRQ(ierr);
3032   }
3033   PetscFunctionReturn(0);
3034 }
3035 
3036 #undef __FUNCT__
3037 #define __FUNCT__ "MatCreateMPIAIJWithArrays"
3038 /*@
     MatCreateMPIAIJWithArrays - creates an MPI AIJ matrix using arrays that contain
         the local rows in standard CSR format.
3041 
3042    Collective on MPI_Comm
3043 
3044    Input Parameters:
3045 +  comm - MPI communicator
.  m - number of local rows (cannot be PETSC_DECIDE)
3047 .  n - This value should be the same as the local size used in creating the
       x vector for the matrix-vector product y = Ax (or PETSC_DECIDE to have it
       calculated if N is given). For square matrices n is almost always m.
3050 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
3051 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
3052 .   i - row indices
3053 .   j - column indices
3054 -   a - matrix values
3055 
3056    Output Parameter:
3057 .   mat - the matrix
3058 
3059    Level: intermediate
3060 
3061    Notes:
3062        The i, j, and a arrays ARE copied by this routine into the internal format used by PETSc;
3063      thus you CANNOT change the matrix entries by changing the values of a[] after you have
3064      called this routine. Use MatCreateMPIAIJWithSplitArrays() to avoid needing to copy the arrays.
3065 
       The i and j indices are 0-based
3067 
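   A minimal sketch, assuming the CSR triple (i,j,a) has been filled for this
   process's m local rows of a matrix with N global columns:
.vb
     Mat mat;
     ierr = MatCreateMPIAIJWithArrays(comm,m,PETSC_DECIDE,PETSC_DETERMINE,N,i,j,a,&mat);CHKERRQ(ierr);
.ve
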
3068 .keywords: matrix, aij, compressed row, sparse, parallel
3069 
3070 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues(), MatMPIAIJSetPreallocation(), MatMPIAIJSetPreallocationCSR(),
3071           MPIAIJ, MatCreateMPIAIJ(), MatCreateMPIAIJWithSplitArrays()
3072 @*/
3073 PetscErrorCode PETSCMAT_DLLEXPORT MatCreateMPIAIJWithArrays(MPI_Comm comm,PetscInt m,PetscInt n,PetscInt M,PetscInt N,const PetscInt i[],const PetscInt j[],const PetscScalar a[],Mat *mat)
3074 {
3075   PetscErrorCode ierr;
3076 
  PetscFunctionBegin;
3078   if (i[0]) {
3079     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"i (row indices) must start with 0");
3080   }
  if (m < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"local number of rows (m) cannot be PETSC_DECIDE or negative");
3082   ierr = MatCreate(comm,mat);CHKERRQ(ierr);
3083   ierr = MatSetSizes(*mat,m,n,M,N);CHKERRQ(ierr);
3084   ierr = MatSetType(*mat,MATMPIAIJ);CHKERRQ(ierr);
3085   ierr = MatMPIAIJSetPreallocationCSR(*mat,i,j,a);CHKERRQ(ierr);
3086   PetscFunctionReturn(0);
3087 }
3088 
3089 #undef __FUNCT__
3090 #define __FUNCT__ "MatCreateMPIAIJ"
3091 /*@C
3092    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
3093    (the default parallel PETSc format).  For good matrix assembly performance
3094    the user should preallocate the matrix storage by setting the parameters
3095    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
3096    performance can be increased by more than a factor of 50.
3097 
3098    Collective on MPI_Comm
3099 
3100    Input Parameters:
3101 +  comm - MPI communicator
3102 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
3103            This value should be the same as the local size used in creating the
3104            y vector for the matrix-vector product y = Ax.
3105 .  n - This value should be the same as the local size used in creating the
       x vector for the matrix-vector product y = Ax (or PETSC_DECIDE to have it
       calculated if N is given). For square matrices n is almost always m.
3108 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
3109 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
3110 .  d_nz  - number of nonzeros per row in DIAGONAL portion of local submatrix
3111            (same value is used for all local rows)
3112 .  d_nnz - array containing the number of nonzeros in the various rows of the
3113            DIAGONAL portion of the local submatrix (possibly different for each row)
3114            or PETSC_NULL, if d_nz is used to specify the nonzero structure.
           The size of this array is equal to the number of local rows, i.e., 'm'.
3116            You must leave room for the diagonal entry even if it is zero.
3117 .  o_nz  - number of nonzeros per row in the OFF-DIAGONAL portion of local
3118            submatrix (same value is used for all local rows).
3119 -  o_nnz - array containing the number of nonzeros in the various rows of the
3120            OFF-DIAGONAL portion of the local submatrix (possibly different for
3121            each row) or PETSC_NULL, if o_nz is used to specify the nonzero
3122            structure. The size of this array is equal to the number
           of local rows, i.e., 'm'.
3124 
3125    Output Parameter:
3126 .  A - the matrix
3127 
3128    It is recommended that one use the MatCreate(), MatSetType() and/or MatSetFromOptions(),
   MatXXXXSetPreallocation() paradigm instead of this routine directly. This is definitely
3130    true if you plan to use the external direct solvers such as SuperLU, MUMPS or Spooles.
3131    [MatXXXXSetPreallocation() is, for example, MatSeqAIJSetPreallocation]
3132 
3133    Notes:
   If the *_nnz parameter is given, then the *_nz parameter is ignored.
3135 
3136    m,n,M,N parameters specify the size of the matrix, and its partitioning across
3137    processors, while d_nz,d_nnz,o_nz,o_nnz parameters specify the approximate
3138    storage requirements for this matrix.
3139 
   If PETSC_DECIDE or PETSC_DETERMINE is used for a particular argument on one
   processor then it must be used on all processors that share the object for
3142    that argument.
3143 
3144    The user MUST specify either the local or global matrix dimensions
3145    (possibly both).
3146 
3147    The parallel matrix is partitioned across processors such that the
3148    first m0 rows belong to process 0, the next m1 rows belong to
   process 1, the next m2 rows belong to process 2 etc., where
   m0,m1,m2,.. are the input parameter 'm', i.e., each processor stores
   values corresponding to an [m x N] submatrix.
3152 
3153    The columns are logically partitioned with the n0 columns belonging
3154    to 0th partition, the next n1 columns belonging to the next
   partition etc., where n0,n1,n2... are the input parameter 'n'.
3156 
3157    The DIAGONAL portion of the local submatrix on any given processor
   is the submatrix formed by the rows and columns owned by
   that processor, i.e., the diagonal submatrix on
   process 0 is [m0 x n0], the diagonal submatrix on process 1 is [m1 x n1],
3161    etc. The remaining portion of the local submatrix [m x (N-n)]
   constitutes the OFF-DIAGONAL portion. The example below better
3163    illustrates this concept.
3164 
3165    For a square global matrix we define each processor's diagonal portion
3166    to be its local rows and the corresponding columns (a square submatrix);
3167    each processor's off-diagonal portion encompasses the remainder of the
3168    local matrix (a rectangular submatrix).
3169 
3170    If o_nnz, d_nnz are specified, then o_nz, and d_nz are ignored.
3171 
3172    When calling this routine with a single process communicator, a matrix of
3173    type SEQAIJ is returned.  If a matrix of type MPIAIJ is desired for this
3174    type of communicator, use the construction mechanism:
     MatCreate(...,&A); MatSetType(A,MATMPIAIJ); MatMPIAIJSetPreallocation(A,...);
3176 
3177    By default, this format uses inodes (identical nodes) when possible.
3178    We search for consecutive rows with the same nonzero structure, thereby
3179    reusing matrix information to achieve increased efficiency.
3180 
3181    Options Database Keys:
3182 +  -mat_no_inode  - Do not use inodes
3183 .  -mat_inode_limit <limit> - Sets inode limit (max limit=5)
3184 -  -mat_aij_oneindex - Internally use indexing starting at 1
3185         rather than 0.  Note that when calling MatSetValues(),
3186         the user still MUST index entries starting at 0!
3187 
3188 
3189    Example usage:
3190 
3191    Consider the following 8x8 matrix with 34 non-zero values, that is
   assembled across 3 processors. Let's assume that proc0 owns 3 rows,
3193    proc1 owns 3 rows, proc2 owns 2 rows. This division can be shown
3194    as follows:
3195 
3196 .vb
3197             1  2  0  |  0  3  0  |  0  4
3198     Proc0   0  5  6  |  7  0  0  |  8  0
3199             9  0 10  | 11  0  0  | 12  0
3200     -------------------------------------
3201            13  0 14  | 15 16 17  |  0  0
3202     Proc1   0 18  0  | 19 20 21  |  0  0
3203             0  0  0  | 22 23  0  | 24  0
3204     -------------------------------------
3205     Proc2  25 26 27  |  0  0 28  | 29  0
3206            30  0  0  | 31 32 33  |  0 34
3207 .ve
3208 
3209    This can be represented as a collection of submatrices as:
3210 
3211 .vb
3212       A B C
3213       D E F
3214       G H I
3215 .ve
3216 
3217    Where the submatrices A,B,C are owned by proc0, D,E,F are
3218    owned by proc1, G,H,I are owned by proc2.
3219 
3220    The 'm' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
3221    The 'n' parameters for proc0,proc1,proc2 are 3,3,2 respectively.
3222    The 'M','N' parameters are 8,8, and have the same values on all procs.
3223 
3224    The DIAGONAL submatrices corresponding to proc0,proc1,proc2 are
3225    submatrices [A], [E], [I] respectively. The OFF-DIAGONAL submatrices
3226    corresponding to proc0,proc1,proc2 are [BC], [DF], [GH] respectively.
3227    Internally, each processor stores the DIAGONAL part, and the OFF-DIAGONAL
   part as SeqAIJ matrices. For example, proc1 will store [E] as a SeqAIJ
   matrix, and [DF] as another SeqAIJ matrix.
3230 
3231    When d_nz, o_nz parameters are specified, d_nz storage elements are
3232    allocated for every row of the local diagonal submatrix, and o_nz
3233    storage locations are allocated for every row of the OFF-DIAGONAL submat.
   One way to choose d_nz and o_nz is to use the maximum number of nonzeros over
   the local rows of the DIAGONAL and OFF-DIAGONAL submatrices, respectively.
3236    In this case, the values of d_nz,o_nz are:
3237 .vb
     proc0 : d_nz = 2, o_nz = 2
     proc1 : d_nz = 3, o_nz = 2
     proc2 : d_nz = 1, o_nz = 4
3241 .ve
3242    We are allocating m*(d_nz+o_nz) storage locations for every proc. This
3243    translates to 3*(2+2)=12 for proc0, 3*(3+2)=15 for proc1, 2*(1+4)=10
   for proc2, i.e., we are using 12+15+10=37 storage locations to store
3245    34 values.
3246 
3247    When d_nnz, o_nnz parameters are specified, the storage is specified
   for every row, corresponding to both the DIAGONAL and OFF-DIAGONAL submatrices.
3249    In the above case the values for d_nnz,o_nnz are:
3250 .vb
3251      proc0: d_nnz = [2,2,2] and o_nnz = [2,2,2]
3252      proc1: d_nnz = [3,3,2] and o_nnz = [2,1,1]
3253      proc2: d_nnz = [1,1]   and o_nnz = [4,4]
3254 .ve
   Here the space allocated is the sum of all the above values, i.e., 34, and
3256    hence pre-allocation is perfect.
3257 
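   For instance, proc0 in the example above could create its share of the matrix with
   the following sketch (error handling abbreviated):
.vb
     Mat      A;
     PetscInt d_nnz[3] = {2,2,2}, o_nnz[3] = {2,2,2};
     ierr = MatCreateMPIAIJ(comm,3,3,8,8,0,d_nnz,0,o_nnz,&A);CHKERRQ(ierr);
.ve
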
3258    Level: intermediate
3259 
3260 .keywords: matrix, aij, compressed row, sparse, parallel
3261 
3262 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues(), MatMPIAIJSetPreallocation(), MatMPIAIJSetPreallocationCSR(),
3263           MPIAIJ, MatCreateMPIAIJWithArrays()
3264 @*/
3265 PetscErrorCode PETSCMAT_DLLEXPORT MatCreateMPIAIJ(MPI_Comm comm,PetscInt m,PetscInt n,PetscInt M,PetscInt N,PetscInt d_nz,const PetscInt d_nnz[],PetscInt o_nz,const PetscInt o_nnz[],Mat *A)
3266 {
3267   PetscErrorCode ierr;
3268   PetscMPIInt    size;
3269 
3270   PetscFunctionBegin;
3271   ierr = MatCreate(comm,A);CHKERRQ(ierr);
3272   ierr = MatSetSizes(*A,m,n,M,N);CHKERRQ(ierr);
3273   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
3274   if (size > 1) {
3275     ierr = MatSetType(*A,MATMPIAIJ);CHKERRQ(ierr);
3276     ierr = MatMPIAIJSetPreallocation(*A,d_nz,d_nnz,o_nz,o_nnz);CHKERRQ(ierr);
3277   } else {
3278     ierr = MatSetType(*A,MATSEQAIJ);CHKERRQ(ierr);
3279     ierr = MatSeqAIJSetPreallocation(*A,d_nz,d_nnz);CHKERRQ(ierr);
3280   }
3281   PetscFunctionReturn(0);
3282 }
3283 
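/*
    MatMPIAIJGetSeqAIJ - returns the diagonal (Ad) and off-diagonal (Ao) SeqAIJ blocks
  of an MPIAIJ matrix, along with garray, which maps the local column numbers of Ao
  to global column numbers.
*/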
3284 #undef __FUNCT__
3285 #define __FUNCT__ "MatMPIAIJGetSeqAIJ"
3286 PetscErrorCode PETSCMAT_DLLEXPORT MatMPIAIJGetSeqAIJ(Mat A,Mat *Ad,Mat *Ao,PetscInt *colmap[])
3287 {
3288   Mat_MPIAIJ *a = (Mat_MPIAIJ *)A->data;
3289 
3290   PetscFunctionBegin;
3291   *Ad     = a->A;
3292   *Ao     = a->B;
3293   *colmap = a->garray;
3294   PetscFunctionReturn(0);
3295 }
3296 
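/*
    MatSetColoring_MPIAIJ - applies a coloring to both blocks of an MPIAIJ matrix.
  For IS_COLORING_GLOBAL the colors are gathered onto every process and the
  off-diagonal block's colors are looked up through garray; for IS_COLORING_GHOSTED
  the global column numbers are first mapped to local ones with A->mapping.
*/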
3297 #undef __FUNCT__
3298 #define __FUNCT__ "MatSetColoring_MPIAIJ"
3299 PetscErrorCode MatSetColoring_MPIAIJ(Mat A,ISColoring coloring)
3300 {
3301   PetscErrorCode ierr;
3302   PetscInt       i;
3303   Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;
3304 
3305   PetscFunctionBegin;
3306   if (coloring->ctype == IS_COLORING_GLOBAL) {
3307     ISColoringValue *allcolors,*colors;
3308     ISColoring      ocoloring;
3309 
3310     /* set coloring for diagonal portion */
3311     ierr = MatSetColoring_SeqAIJ(a->A,coloring);CHKERRQ(ierr);
3312 
3313     /* set coloring for off-diagonal portion */
3314     ierr = ISAllGatherColors(A->comm,coloring->n,coloring->colors,PETSC_NULL,&allcolors);CHKERRQ(ierr);
3315     ierr = PetscMalloc((a->B->cmap.n+1)*sizeof(ISColoringValue),&colors);CHKERRQ(ierr);
3316     for (i=0; i<a->B->cmap.n; i++) {
3317       colors[i] = allcolors[a->garray[i]];
3318     }
3319     ierr = PetscFree(allcolors);CHKERRQ(ierr);
3320     ierr = ISColoringCreate(MPI_COMM_SELF,coloring->n,a->B->cmap.n,colors,&ocoloring);CHKERRQ(ierr);
3321     ierr = MatSetColoring_SeqAIJ(a->B,ocoloring);CHKERRQ(ierr);
3322     ierr = ISColoringDestroy(ocoloring);CHKERRQ(ierr);
3323   } else if (coloring->ctype == IS_COLORING_GHOSTED) {
3324     ISColoringValue *colors;
3325     PetscInt        *larray;
3326     ISColoring      ocoloring;
3327 
3328     /* set coloring for diagonal portion */
3329     ierr = PetscMalloc((a->A->cmap.n+1)*sizeof(PetscInt),&larray);CHKERRQ(ierr);
3330     for (i=0; i<a->A->cmap.n; i++) {
3331       larray[i] = i + A->cmap.rstart;
3332     }
3333     ierr = ISGlobalToLocalMappingApply(A->mapping,IS_GTOLM_MASK,a->A->cmap.n,larray,PETSC_NULL,larray);CHKERRQ(ierr);
3334     ierr = PetscMalloc((a->A->cmap.n+1)*sizeof(ISColoringValue),&colors);CHKERRQ(ierr);
3335     for (i=0; i<a->A->cmap.n; i++) {
3336       colors[i] = coloring->colors[larray[i]];
3337     }
3338     ierr = PetscFree(larray);CHKERRQ(ierr);
3339     ierr = ISColoringCreate(PETSC_COMM_SELF,coloring->n,a->A->cmap.n,colors,&ocoloring);CHKERRQ(ierr);
3340     ierr = MatSetColoring_SeqAIJ(a->A,ocoloring);CHKERRQ(ierr);
3341     ierr = ISColoringDestroy(ocoloring);CHKERRQ(ierr);
3342 
3343     /* set coloring for off-diagonal portion */
3344     ierr = PetscMalloc((a->B->cmap.n+1)*sizeof(PetscInt),&larray);CHKERRQ(ierr);
3345     ierr = ISGlobalToLocalMappingApply(A->mapping,IS_GTOLM_MASK,a->B->cmap.n,a->garray,PETSC_NULL,larray);CHKERRQ(ierr);
3346     ierr = PetscMalloc((a->B->cmap.n+1)*sizeof(ISColoringValue),&colors);CHKERRQ(ierr);
3347     for (i=0; i<a->B->cmap.n; i++) {
3348       colors[i] = coloring->colors[larray[i]];
3349     }
3350     ierr = PetscFree(larray);CHKERRQ(ierr);
3351     ierr = ISColoringCreate(MPI_COMM_SELF,coloring->n,a->B->cmap.n,colors,&ocoloring);CHKERRQ(ierr);
3352     ierr = MatSetColoring_SeqAIJ(a->B,ocoloring);CHKERRQ(ierr);
3353     ierr = ISColoringDestroy(ocoloring);CHKERRQ(ierr);
3354   } else {
    SETERRQ1(PETSC_ERR_SUP,"No support for ISColoringType %d",(int)coloring->ctype);
3356   }
3357 
3358   PetscFunctionReturn(0);
3359 }
3360 
3361 #if defined(PETSC_HAVE_ADIC)
3362 #undef __FUNCT__
3363 #define __FUNCT__ "MatSetValuesAdic_MPIAIJ"
3364 PetscErrorCode MatSetValuesAdic_MPIAIJ(Mat A,void *advalues)
3365 {
3366   Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;
3367   PetscErrorCode ierr;
3368 
3369   PetscFunctionBegin;
3370   ierr = MatSetValuesAdic_SeqAIJ(a->A,advalues);CHKERRQ(ierr);
3371   ierr = MatSetValuesAdic_SeqAIJ(a->B,advalues);CHKERRQ(ierr);
3372   PetscFunctionReturn(0);
3373 }
3374 #endif
3375 
3376 #undef __FUNCT__
3377 #define __FUNCT__ "MatSetValuesAdifor_MPIAIJ"
3378 PetscErrorCode MatSetValuesAdifor_MPIAIJ(Mat A,PetscInt nl,void *advalues)
3379 {
3380   Mat_MPIAIJ     *a = (Mat_MPIAIJ*)A->data;
3381   PetscErrorCode ierr;
3382 
3383   PetscFunctionBegin;
3384   ierr = MatSetValuesAdifor_SeqAIJ(a->A,nl,advalues);CHKERRQ(ierr);
3385   ierr = MatSetValuesAdifor_SeqAIJ(a->B,nl,advalues);CHKERRQ(ierr);
3386   PetscFunctionReturn(0);
3387 }
3388 
3389 #undef __FUNCT__
3390 #define __FUNCT__ "MatMerge"
3391 /*@
      MatMerge - Creates a single large PETSc matrix by concatenating sequential
3393                  matrices from each processor
3394 
3395     Collective on MPI_Comm
3396 
3397    Input Parameters:
+    comm - the communicator the parallel matrix will live on
.    inmat - the input sequential matrix (one per processor)
3400 .    n - number of local columns (or PETSC_DECIDE)
3401 -    scall - either MAT_INITIAL_MATRIX or MAT_REUSE_MATRIX
3402 
3403    Output Parameter:
3404 .    outmat - the parallel matrix generated
3405 
3406     Level: advanced
3407 
   Notes: The number of columns of the matrix in EACH processor MUST be the same.
     The input matrix inmat is destroyed once its entries have been merged into outmat.
3409 
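   A typical call, assuming each process holds its own SeqAIJ piece 'Aseq' (all with
   the same number of columns), is sketched below:
.vb
     Mat Apar;
     ierr = MatMerge(PETSC_COMM_WORLD,Aseq,PETSC_DECIDE,MAT_INITIAL_MATRIX,&Apar);CHKERRQ(ierr);
.ve
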
3410 @*/
3411 PetscErrorCode PETSCMAT_DLLEXPORT MatMerge(MPI_Comm comm,Mat inmat,PetscInt n,MatReuse scall,Mat *outmat)
3412 {
3413   PetscErrorCode ierr;
3414   PetscInt       m,N,i,rstart,nnz,Ii,*dnz,*onz;
3415   PetscInt       *indx;
3416   PetscScalar    *values;
3417 
3418   PetscFunctionBegin;
3419   ierr = MatGetSize(inmat,&m,&N);CHKERRQ(ierr);
3420   if (scall == MAT_INITIAL_MATRIX){
    /* count nonzeros in each row, for diagonal and off-diagonal portions of the matrix */
3422     if (n == PETSC_DECIDE){
3423       ierr = PetscSplitOwnership(comm,&n,&N);CHKERRQ(ierr);
3424     }
3425     ierr = MPI_Scan(&m, &rstart,1,MPIU_INT,MPI_SUM,comm);CHKERRQ(ierr);
3426     rstart -= m;
3427 
3428     ierr = MatPreallocateInitialize(comm,m,n,dnz,onz);CHKERRQ(ierr);
3429     for (i=0;i<m;i++) {
3430       ierr = MatGetRow_SeqAIJ(inmat,i,&nnz,&indx,PETSC_NULL);CHKERRQ(ierr);
3431       ierr = MatPreallocateSet(i+rstart,nnz,indx,dnz,onz);CHKERRQ(ierr);
3432       ierr = MatRestoreRow_SeqAIJ(inmat,i,&nnz,&indx,PETSC_NULL);CHKERRQ(ierr);
3433     }
3434     /* This routine will ONLY return MPIAIJ type matrix */
3435     ierr = MatCreate(comm,outmat);CHKERRQ(ierr);
3436     ierr = MatSetSizes(*outmat,m,n,PETSC_DETERMINE,PETSC_DETERMINE);CHKERRQ(ierr);
3437     ierr = MatSetType(*outmat,MATMPIAIJ);CHKERRQ(ierr);
3438     ierr = MatMPIAIJSetPreallocation(*outmat,0,dnz,0,onz);CHKERRQ(ierr);
3439     ierr = MatPreallocateFinalize(dnz,onz);CHKERRQ(ierr);
3440 
3441   } else if (scall == MAT_REUSE_MATRIX){
3442     ierr = MatGetOwnershipRange(*outmat,&rstart,PETSC_NULL);CHKERRQ(ierr);
3443   } else {
3444     SETERRQ1(PETSC_ERR_ARG_WRONG,"Invalid MatReuse %d",(int)scall);
3445   }
3446 
3447   for (i=0;i<m;i++) {
3448     ierr = MatGetRow_SeqAIJ(inmat,i,&nnz,&indx,&values);CHKERRQ(ierr);
3449     Ii    = i + rstart;
3450     ierr = MatSetValues(*outmat,1,&Ii,nnz,indx,values,INSERT_VALUES);CHKERRQ(ierr);
3451     ierr = MatRestoreRow_SeqAIJ(inmat,i,&nnz,&indx,&values);CHKERRQ(ierr);
3452   }
3453   ierr = MatDestroy(inmat);CHKERRQ(ierr);
3454   ierr = MatAssemblyBegin(*outmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
3455   ierr = MatAssemblyEnd(*outmat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
3456 
3457   PetscFunctionReturn(0);
3458 }
3459 
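/*
    MatFileSplit - writes each process's local rows of A, as a sequential matrix,
  to the binary file <outfile>.<rank> (one file per process).
*/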
3460 #undef __FUNCT__
3461 #define __FUNCT__ "MatFileSplit"
3462 PetscErrorCode MatFileSplit(Mat A,char *outfile)
3463 {
3464   PetscErrorCode    ierr;
3465   PetscMPIInt       rank;
3466   PetscInt          m,N,i,rstart,nnz;
3467   size_t            len;
3468   const PetscInt    *indx;
3469   PetscViewer       out;
3470   char              *name;
3471   Mat               B;
3472   const PetscScalar *values;
3473 
3474   PetscFunctionBegin;
3475   ierr = MatGetLocalSize(A,&m,0);CHKERRQ(ierr);
3476   ierr = MatGetSize(A,0,&N);CHKERRQ(ierr);
3477   /* Should this be the type of the diagonal block of A? */
3478   ierr = MatCreate(PETSC_COMM_SELF,&B);CHKERRQ(ierr);
3479   ierr = MatSetSizes(B,m,N,m,N);CHKERRQ(ierr);
3480   ierr = MatSetType(B,MATSEQAIJ);CHKERRQ(ierr);
3481   ierr = MatSeqAIJSetPreallocation(B,0,PETSC_NULL);CHKERRQ(ierr);
3482   ierr = MatGetOwnershipRange(A,&rstart,0);CHKERRQ(ierr);
3483   for (i=0;i<m;i++) {
3484     ierr = MatGetRow(A,i+rstart,&nnz,&indx,&values);CHKERRQ(ierr);
3485     ierr = MatSetValues(B,1,&i,nnz,indx,values,INSERT_VALUES);CHKERRQ(ierr);
3486     ierr = MatRestoreRow(A,i+rstart,&nnz,&indx,&values);CHKERRQ(ierr);
3487   }
3488   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
3489   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
3490 
3491   ierr = MPI_Comm_rank(A->comm,&rank);CHKERRQ(ierr);
3492   ierr = PetscStrlen(outfile,&len);CHKERRQ(ierr);
  ierr = PetscMalloc((len+16)*sizeof(char),&name);CHKERRQ(ierr); /* room for the ".<rank>" suffix */
  sprintf(name,"%s.%d",outfile,rank);
  ierr = PetscViewerBinaryOpen(PETSC_COMM_SELF,name,FILE_MODE_APPEND,&out);CHKERRQ(ierr);
  ierr = PetscFree(name);CHKERRQ(ierr);
3497   ierr = MatView(B,out);CHKERRQ(ierr);
3498   ierr = PetscViewerDestroy(out);CHKERRQ(ierr);
3499   ierr = MatDestroy(B);CHKERRQ(ierr);
3500   PetscFunctionReturn(0);
3501 }
3502 
3503 EXTERN PetscErrorCode MatDestroy_MPIAIJ(Mat);
3504 #undef __FUNCT__
3505 #define __FUNCT__ "MatDestroy_MPIAIJ_SeqsToMPI"
3506 PetscErrorCode PETSCMAT_DLLEXPORT MatDestroy_MPIAIJ_SeqsToMPI(Mat A)
3507 {
3508   PetscErrorCode       ierr;
3509   Mat_Merge_SeqsToMPI  *merge;
3510   PetscContainer       container;
3511 
3512   PetscFunctionBegin;
3513   ierr = PetscObjectQuery((PetscObject)A,"MatMergeSeqsToMPI",(PetscObject *)&container);CHKERRQ(ierr);
3514   if (container) {
3515     ierr = PetscContainerGetPointer(container,(void **)&merge);CHKERRQ(ierr);
3516     ierr = PetscFree(merge->id_r);CHKERRQ(ierr);
3517     ierr = PetscFree(merge->len_s);CHKERRQ(ierr);
3518     ierr = PetscFree(merge->len_r);CHKERRQ(ierr);
3519     ierr = PetscFree(merge->bi);CHKERRQ(ierr);
3520     ierr = PetscFree(merge->bj);CHKERRQ(ierr);
3521     ierr = PetscFree(merge->buf_ri);CHKERRQ(ierr);
3522     ierr = PetscFree(merge->buf_rj);CHKERRQ(ierr);
3523     ierr = PetscFree(merge->coi);CHKERRQ(ierr);
3524     ierr = PetscFree(merge->coj);CHKERRQ(ierr);
3525     ierr = PetscFree(merge->owners_co);CHKERRQ(ierr);
3526     ierr = PetscFree(merge->rowmap.range);CHKERRQ(ierr);
3527 
    ierr = PetscContainerDestroy(container);CHKERRQ(ierr);
    ierr = PetscObjectCompose((PetscObject)A,"MatMergeSeqsToMPI",0);CHKERRQ(ierr);
    /* merge is only valid when the container was found, so it must be freed inside this block */
    ierr = PetscFree(merge);CHKERRQ(ierr);
  }
3532 
3533   ierr = MatDestroy_MPIAIJ(A);CHKERRQ(ierr);
3534   PetscFunctionReturn(0);
3535 }
3536 
3537 #include "src/mat/utils/freespace.h"
3538 #include "petscbt.h"
3539 static PetscEvent logkey_seqstompinum = 0;
3540 #undef __FUNCT__
3541 #define __FUNCT__ "MatMerge_SeqsToMPINumeric"
3542 /*@C
      MatMerge_SeqsToMPI - Creates an MPIAIJ matrix by adding together the sequential
3544                  matrices from each processor
3545 
3546     Collective on MPI_Comm
3547 
3548    Input Parameters:
+    comm - the communicator the parallel matrix will live on
.    seqmat - the input sequential matrix (one per processor)
3551 .    m - number of local rows (or PETSC_DECIDE)
3552 .    n - number of local columns (or PETSC_DECIDE)
3553 -    scall - either MAT_INITIAL_MATRIX or MAT_REUSE_MATRIX
3554 
3555    Output Parameter:
3556 .    mpimat - the parallel matrix generated
3557 
3558     Level: advanced
3559 
3560    Notes:
3561      The dimensions of the sequential matrix in each processor MUST be the same.
     The input seqmat is included in the container "Mat_Merge_SeqsToMPI", and will be
3563      destroyed when mpimat is destroyed. Call PetscObjectQuery() to access seqmat.
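
   A sketch of typical use through the driver routine MatMerge_SeqsToMPI() (the
   symbolic phase runs only for MAT_INITIAL_MATRIX; the numeric phase fills the values):
.vb
     Mat C;
     ierr = MatMerge_SeqsToMPI(comm,seqmat,m,n,MAT_INITIAL_MATRIX,&C);CHKERRQ(ierr);
.ve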
3564 @*/
3565 PetscErrorCode PETSCMAT_DLLEXPORT MatMerge_SeqsToMPINumeric(Mat seqmat,Mat mpimat)
3566 {
3567   PetscErrorCode       ierr;
3568   MPI_Comm             comm=mpimat->comm;
3569   Mat_SeqAIJ           *a=(Mat_SeqAIJ*)seqmat->data;
3570   PetscMPIInt          size,rank,taga,*len_s;
3571   PetscInt             N=mpimat->cmap.N,i,j,*owners,*ai=a->i,*aj=a->j;
3572   PetscInt             proc,m;
3573   PetscInt             **buf_ri,**buf_rj;
3574   PetscInt             k,anzi,*bj_i,*bi,*bj,arow,bnzi,nextaj;
3575   PetscInt             nrows,**buf_ri_k,**nextrow,**nextai;
3576   MPI_Request          *s_waits,*r_waits;
3577   MPI_Status           *status;
3578   MatScalar            *aa=a->a,**abuf_r,*ba_i;
3579   Mat_Merge_SeqsToMPI  *merge;
3580   PetscContainer       container;
3581 
3582   PetscFunctionBegin;
3583   if (!logkey_seqstompinum) {
    ierr = PetscLogEventRegister(&logkey_seqstompinum,"MatMerge_SeqsToMPINumeric",MAT_COOKIE);CHKERRQ(ierr);
3585   }
3586   ierr = PetscLogEventBegin(logkey_seqstompinum,seqmat,0,0,0);CHKERRQ(ierr);
3587 
3588   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
3589   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
3590 
3591   ierr = PetscObjectQuery((PetscObject)mpimat,"MatMergeSeqsToMPI",(PetscObject *)&container);CHKERRQ(ierr);
  if (!container) SETERRQ(PETSC_ERR_ARG_WRONGSTATE,"Matrix was not created with MatMerge_SeqsToMPISymbolic()");
  ierr = PetscContainerGetPointer(container,(void **)&merge);CHKERRQ(ierr);
3595   bi     = merge->bi;
3596   bj     = merge->bj;
3597   buf_ri = merge->buf_ri;
3598   buf_rj = merge->buf_rj;
3599 
3600   ierr   = PetscMalloc(size*sizeof(MPI_Status),&status);CHKERRQ(ierr);
3601   owners = merge->rowmap.range;
3602   len_s  = merge->len_s;
3603 
3604   /* send and recv matrix values */
3605   /*-----------------------------*/
3606   ierr = PetscObjectGetNewTag((PetscObject)mpimat,&taga);CHKERRQ(ierr);
3607   ierr = PetscPostIrecvScalar(comm,taga,merge->nrecv,merge->id_r,merge->len_r,&abuf_r,&r_waits);CHKERRQ(ierr);
3608 
3609   ierr = PetscMalloc((merge->nsend+1)*sizeof(MPI_Request),&s_waits);CHKERRQ(ierr);
3610   for (proc=0,k=0; proc<size; proc++){
3611     if (!len_s[proc]) continue;
3612     i = owners[proc];
3613     ierr = MPI_Isend(aa+ai[i],len_s[proc],MPIU_MATSCALAR,proc,taga,comm,s_waits+k);CHKERRQ(ierr);
3614     k++;
3615   }
3616 
3617   if (merge->nrecv) {ierr = MPI_Waitall(merge->nrecv,r_waits,status);CHKERRQ(ierr);}
3618   if (merge->nsend) {ierr = MPI_Waitall(merge->nsend,s_waits,status);CHKERRQ(ierr);}
3619   ierr = PetscFree(status);CHKERRQ(ierr);
3620 
3621   ierr = PetscFree(s_waits);CHKERRQ(ierr);
3622   ierr = PetscFree(r_waits);CHKERRQ(ierr);
3623 
3624   /* insert mat values of mpimat */
3625   /*----------------------------*/
3626   ierr = PetscMalloc(N*sizeof(MatScalar),&ba_i);CHKERRQ(ierr);
3627   ierr = PetscMalloc((3*merge->nrecv+1)*sizeof(PetscInt**),&buf_ri_k);CHKERRQ(ierr);
3628   nextrow = buf_ri_k + merge->nrecv;
3629   nextai  = nextrow + merge->nrecv;
3630 
3631   for (k=0; k<merge->nrecv; k++){
3632     buf_ri_k[k] = buf_ri[k]; /* beginning of k-th recved i-structure */
3633     nrows = *(buf_ri_k[k]);
3634     nextrow[k]  = buf_ri_k[k]+1;  /* next row number of k-th recved i-structure */
    nextai[k]   = buf_ri_k[k] + (nrows + 1);/* points to the next i-structure of k-th recved i-structure  */
3636   }
3637 
3638   /* set values of ba */
3639   m = merge->rowmap.n;
3640   for (i=0; i<m; i++) {
3641     arow = owners[rank] + i;
3642     bj_i = bj+bi[i];  /* col indices of the i-th row of mpimat */
3643     bnzi = bi[i+1] - bi[i];
3644     ierr = PetscMemzero(ba_i,bnzi*sizeof(MatScalar));CHKERRQ(ierr);
3645 
3646     /* add local non-zero vals of this proc's seqmat into ba */
3647     anzi = ai[arow+1] - ai[arow];
3648     aj   = a->j + ai[arow];
3649     aa   = a->a + ai[arow];
3650     nextaj = 0;
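    /* bj_i (this row of mpimat) is a sorted superset of this row of seqmat, so a single forward scan matches every aj entry */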
3651     for (j=0; nextaj<anzi; j++){
3652       if (*(bj_i + j) == aj[nextaj]){ /* bcol == acol */
3653         ba_i[j] += aa[nextaj++];
3654       }
3655     }
3656 
3657     /* add received vals into ba */
3658     for (k=0; k<merge->nrecv; k++){ /* k-th received message */
3659       /* i-th row */
3660       if (i == *nextrow[k]) {
3661         anzi = *(nextai[k]+1) - *nextai[k];
3662         aj   = buf_rj[k] + *(nextai[k]);
3663         aa   = abuf_r[k] + *(nextai[k]);
3664         nextaj = 0;
3665         for (j=0; nextaj<anzi; j++){
3666           if (*(bj_i + j) == aj[nextaj]){ /* bcol == acol */
3667             ba_i[j] += aa[nextaj++];
3668           }
3669         }
3670         nextrow[k]++; nextai[k]++;
3671       }
3672     }
3673     ierr = MatSetValues(mpimat,1,&arow,bnzi,bj_i,ba_i,INSERT_VALUES);CHKERRQ(ierr);
3674   }
3675   ierr = MatAssemblyBegin(mpimat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
3676   ierr = MatAssemblyEnd(mpimat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
3677 
3678   ierr = PetscFree(abuf_r);CHKERRQ(ierr);
3679   ierr = PetscFree(ba_i);CHKERRQ(ierr);
3680   ierr = PetscFree(buf_ri_k);CHKERRQ(ierr);
3681   ierr = PetscLogEventEnd(logkey_seqstompinum,seqmat,0,0,0);CHKERRQ(ierr);
3682   PetscFunctionReturn(0);
3683 }
3684 
3685 static PetscEvent logkey_seqstompisym = 0;
3686 #undef __FUNCT__
3687 #define __FUNCT__ "MatMerge_SeqsToMPISymbolic"
3688 PetscErrorCode PETSCMAT_DLLEXPORT MatMerge_SeqsToMPISymbolic(MPI_Comm comm,Mat seqmat,PetscInt m,PetscInt n,Mat *mpimat)
3689 {
3690   PetscErrorCode       ierr;
3691   Mat                  B_mpi;
3692   Mat_SeqAIJ           *a=(Mat_SeqAIJ*)seqmat->data;
3693   PetscMPIInt          size,rank,tagi,tagj,*len_s,*len_si,*len_ri;
3694   PetscInt             **buf_rj,**buf_ri,**buf_ri_k;
3695   PetscInt             M=seqmat->rmap.n,N=seqmat->cmap.n,i,*owners,*ai=a->i,*aj=a->j;
3696   PetscInt             len,proc,*dnz,*onz;
3697   PetscInt             k,anzi,*bi,*bj,*lnk,nlnk,arow,bnzi,nspacedouble=0;
3698   PetscInt             nrows,*buf_s,*buf_si,*buf_si_i,**nextrow,**nextai;
3699   MPI_Request          *si_waits,*sj_waits,*ri_waits,*rj_waits;
3700   MPI_Status           *status;
3701   PetscFreeSpaceList   free_space=PETSC_NULL,current_space=PETSC_NULL;
3702   PetscBT              lnkbt;
3703   Mat_Merge_SeqsToMPI  *merge;
3704   PetscContainer       container;
3705 
3706   PetscFunctionBegin;
3707   if (!logkey_seqstompisym) {
    ierr = PetscLogEventRegister(&logkey_seqstompisym,"MatMerge_SeqsToMPISymbolic",MAT_COOKIE);CHKERRQ(ierr);
3709   }
3710   ierr = PetscLogEventBegin(logkey_seqstompisym,seqmat,0,0,0);CHKERRQ(ierr);
3711 
3712   /* make sure it is a PETSc comm */
3713   ierr = PetscCommDuplicate(comm,&comm,PETSC_NULL);CHKERRQ(ierr);
3714   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
3715   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
3716 
3717   ierr = PetscNew(Mat_Merge_SeqsToMPI,&merge);CHKERRQ(ierr);
3718   ierr = PetscMalloc(size*sizeof(MPI_Status),&status);CHKERRQ(ierr);
3719 
3720   /* determine row ownership */
3721   /*---------------------------------------------------------*/
3722   ierr = PetscMapInitialize(comm,&merge->rowmap);CHKERRQ(ierr);
3723   merge->rowmap.n = m;
3724   merge->rowmap.N = M;
3725   merge->rowmap.bs = 1;
3726   ierr = PetscMapSetUp(&merge->rowmap);CHKERRQ(ierr);
3727   ierr = PetscMalloc(size*sizeof(PetscMPIInt),&len_si);CHKERRQ(ierr);
3728   ierr = PetscMalloc(size*sizeof(PetscMPIInt),&merge->len_s);CHKERRQ(ierr);
3729 
3730   m      = merge->rowmap.n;
3731   M      = merge->rowmap.N;
3732   owners = merge->rowmap.range;
3733 
3734   /* determine the number of messages to send, their lengths */
3735   /*---------------------------------------------------------*/
3736   len_s  = merge->len_s;
3737 
3738   len = 0;  /* length of buf_si[] */
3739   merge->nsend = 0;
3740   for (proc=0; proc<size; proc++){
3741     len_si[proc] = 0;
3742     if (proc == rank){
3743       len_s[proc] = 0;
3744     } else {
3745       len_si[proc] = owners[proc+1] - owners[proc] + 1;
3746       len_s[proc] = ai[owners[proc+1]] - ai[owners[proc]]; /* num of rows to be sent to [proc] */
3747     }
3748     if (len_s[proc]) {
3749       merge->nsend++;
3750       nrows = 0;
3751       for (i=owners[proc]; i<owners[proc+1]; i++){
3752         if (ai[i+1] > ai[i]) nrows++;
3753       }
3754       len_si[proc] = 2*(nrows+1);
3755       len += len_si[proc];
3756     }
3757   }
3758 
3759   /* determine the number and length of messages to receive for ij-structure */
3760   /*-------------------------------------------------------------------------*/
3761   ierr = PetscGatherNumberOfMessages(comm,PETSC_NULL,len_s,&merge->nrecv);CHKERRQ(ierr);
3762   ierr = PetscGatherMessageLengths2(comm,merge->nsend,merge->nrecv,len_s,len_si,&merge->id_r,&merge->len_r,&len_ri);CHKERRQ(ierr);
3763 
3764   /* post the Irecv of j-structure */
3765   /*-------------------------------*/
3766   ierr = PetscCommGetNewTag(comm,&tagj);CHKERRQ(ierr);
3767   ierr = PetscPostIrecvInt(comm,tagj,merge->nrecv,merge->id_r,merge->len_r,&buf_rj,&rj_waits);CHKERRQ(ierr);
3768 
3769   /* post the Isend of j-structure */
3770   /*--------------------------------*/
3771   ierr = PetscMalloc((2*merge->nsend+1)*sizeof(MPI_Request),&si_waits);CHKERRQ(ierr);
3772   sj_waits = si_waits + merge->nsend;
3773 
3774   for (proc=0, k=0; proc<size; proc++){
3775     if (!len_s[proc]) continue;
3776     i = owners[proc];
3777     ierr = MPI_Isend(aj+ai[i],len_s[proc],MPIU_INT,proc,tagj,comm,sj_waits+k);CHKERRQ(ierr);
3778     k++;
3779   }
3780 
3781   /* receives and sends of j-structure are complete */
3782   /*------------------------------------------------*/
3783   if (merge->nrecv) {ierr = MPI_Waitall(merge->nrecv,rj_waits,status);CHKERRQ(ierr);}
3784   if (merge->nsend) {ierr = MPI_Waitall(merge->nsend,sj_waits,status);CHKERRQ(ierr);}
3785 
3786   /* send and recv i-structure */
3787   /*---------------------------*/
3788   ierr = PetscCommGetNewTag(comm,&tagi);CHKERRQ(ierr);
3789   ierr = PetscPostIrecvInt(comm,tagi,merge->nrecv,merge->id_r,len_ri,&buf_ri,&ri_waits);CHKERRQ(ierr);
3790 
3791   ierr = PetscMalloc((len+1)*sizeof(PetscInt),&buf_s);CHKERRQ(ierr);
3792   buf_si = buf_s;  /* points to the beginning of k-th msg to be sent */
3793   for (proc=0,k=0; proc<size; proc++){
3794     if (!len_s[proc]) continue;
3795     /* form outgoing message for i-structure:
3796          buf_si[0]:                 nrows to be sent
3797                [1:nrows]:           row index (global)
3798                [nrows+1:2*nrows+1]: i-structure index
3799     */
3800     /*-------------------------------------------*/
3801     nrows = len_si[proc]/2 - 1;
3802     buf_si_i    = buf_si + nrows+1;
3803     buf_si[0]   = nrows;
3804     buf_si_i[0] = 0;
3805     nrows = 0;
3806     for (i=owners[proc]; i<owners[proc+1]; i++){
3807       anzi = ai[i+1] - ai[i];
3808       if (anzi) {
3809         buf_si_i[nrows+1] = buf_si_i[nrows] + anzi; /* i-structure */
3810         buf_si[nrows+1] = i-owners[proc]; /* local row index */
3811         nrows++;
3812       }
3813     }
3814     ierr = MPI_Isend(buf_si,len_si[proc],MPIU_INT,proc,tagi,comm,si_waits+k);CHKERRQ(ierr);
3815     k++;
3816     buf_si += len_si[proc];
3817   }
3818 
3819   if (merge->nrecv) {ierr = MPI_Waitall(merge->nrecv,ri_waits,status);CHKERRQ(ierr);}
3820   if (merge->nsend) {ierr = MPI_Waitall(merge->nsend,si_waits,status);CHKERRQ(ierr);}
3821 
3822   ierr = PetscInfo2(seqmat,"nsend: %D, nrecv: %D\n",merge->nsend,merge->nrecv);CHKERRQ(ierr);
3823   for (i=0; i<merge->nrecv; i++){
3824     ierr = PetscInfo3(seqmat,"recv len_ri=%D, len_rj=%D from [%D]\n",len_ri[i],merge->len_r[i],merge->id_r[i]);CHKERRQ(ierr);
3825   }
3826 
3827   ierr = PetscFree(len_si);CHKERRQ(ierr);
3828   ierr = PetscFree(len_ri);CHKERRQ(ierr);
3829   ierr = PetscFree(rj_waits);CHKERRQ(ierr);
3830   ierr = PetscFree(si_waits);CHKERRQ(ierr);
3831   ierr = PetscFree(ri_waits);CHKERRQ(ierr);
3832   ierr = PetscFree(buf_s);CHKERRQ(ierr);
3833   ierr = PetscFree(status);CHKERRQ(ierr);
3834 
3835   /* compute a local seq matrix in each processor */
3836   /*----------------------------------------------*/
3837   /* allocate bi array and free space for accumulating nonzero column info */
3838   ierr = PetscMalloc((m+1)*sizeof(PetscInt),&bi);CHKERRQ(ierr);
3839   bi[0] = 0;
3840 
3841   /* create and initialize a linked list */
3842   nlnk = N+1;
3843   ierr = PetscLLCreate(N,N,nlnk,lnk,lnkbt);CHKERRQ(ierr);
3844 
3845   /* initial FreeSpace size is 2*(num of local nnz(seqmat)) */
  len  = ai[owners[rank+1]] - ai[owners[rank]];
3848   ierr = PetscFreeSpaceGet((PetscInt)(2*len+1),&free_space);CHKERRQ(ierr);
3849   current_space = free_space;
3850 
3851   /* determine symbolic info for each local row */
3852   ierr = PetscMalloc((3*merge->nrecv+1)*sizeof(PetscInt**),&buf_ri_k);CHKERRQ(ierr);
3853   nextrow = buf_ri_k + merge->nrecv;
3854   nextai  = nextrow + merge->nrecv;
3855   for (k=0; k<merge->nrecv; k++){
3856     buf_ri_k[k] = buf_ri[k]; /* beginning of k-th recved i-structure */
3857     nrows = *buf_ri_k[k];
3858     nextrow[k]  = buf_ri_k[k] + 1;  /* next row number of k-th recved i-structure */
    nextai[k]   = buf_ri_k[k] + (nrows + 1);/* points to the next i-structure of k-th recved i-structure  */
3860   }
3861 
3862   ierr = MatPreallocateInitialize(comm,m,n,dnz,onz);CHKERRQ(ierr);
3863   len = 0;
3864   for (i=0;i<m;i++) {
3865     bnzi   = 0;
3866     /* add local non-zero cols of this proc's seqmat into lnk */
3867     arow   = owners[rank] + i;
3868     anzi   = ai[arow+1] - ai[arow];
3869     aj     = a->j + ai[arow];
3870     ierr = PetscLLAdd(anzi,aj,N,nlnk,lnk,lnkbt);CHKERRQ(ierr);
3871     bnzi += nlnk;
3872     /* add received col data into lnk */
3873     for (k=0; k<merge->nrecv; k++){ /* k-th received message */
3874       if (i == *nextrow[k]) { /* i-th row */
3875         anzi = *(nextai[k]+1) - *nextai[k];
3876         aj   = buf_rj[k] + *nextai[k];
3877         ierr = PetscLLAdd(anzi,aj,N,nlnk,lnk,lnkbt);CHKERRQ(ierr);
3878         bnzi += nlnk;
3879         nextrow[k]++; nextai[k]++;
3880       }
3881     }
3882     if (len < bnzi) len = bnzi;  /* =max(bnzi) */
3883 
3884     /* if free space is not available, make more free space */
3885     if (current_space->local_remaining<bnzi) {
3886       ierr = PetscFreeSpaceGet(current_space->total_array_size,&current_space);CHKERRQ(ierr);
3887       nspacedouble++;
3888     }
3889     /* copy data into free space, then initialize lnk */
3890     ierr = PetscLLClean(N,N,bnzi,lnk,current_space->array,lnkbt);CHKERRQ(ierr);
3891     ierr = MatPreallocateSet(i+owners[rank],bnzi,current_space->array,dnz,onz);CHKERRQ(ierr);
3892 
3893     current_space->array           += bnzi;
3894     current_space->local_used      += bnzi;
3895     current_space->local_remaining -= bnzi;
3896 
3897     bi[i+1] = bi[i] + bnzi;
3898   }
3899 
3900   ierr = PetscFree(buf_ri_k);CHKERRQ(ierr);
3901 
3902   ierr = PetscMalloc((bi[m]+1)*sizeof(PetscInt),&bj);CHKERRQ(ierr);
3903   ierr = PetscFreeSpaceContiguous(&free_space,bj);CHKERRQ(ierr);
3904   ierr = PetscLLDestroy(lnk,lnkbt);CHKERRQ(ierr);
3905 
3906   /* create symbolic parallel matrix B_mpi */
3907   /*---------------------------------------*/
3908   ierr = MatCreate(comm,&B_mpi);CHKERRQ(ierr);
3909   if (n==PETSC_DECIDE) {
3910     ierr = MatSetSizes(B_mpi,m,n,PETSC_DETERMINE,N);CHKERRQ(ierr);
3911   } else {
3912     ierr = MatSetSizes(B_mpi,m,n,PETSC_DETERMINE,PETSC_DETERMINE);CHKERRQ(ierr);
3913   }
3914   ierr = MatSetType(B_mpi,MATMPIAIJ);CHKERRQ(ierr);
3915   ierr = MatMPIAIJSetPreallocation(B_mpi,0,dnz,0,onz);CHKERRQ(ierr);
3916   ierr = MatPreallocateFinalize(dnz,onz);CHKERRQ(ierr);
3917 
3918   /* B_mpi is not ready for use - assembly will be done by MatMerge_SeqsToMPINumeric() */
3919   B_mpi->assembled     = PETSC_FALSE;
3920   B_mpi->ops->destroy  = MatDestroy_MPIAIJ_SeqsToMPI;
3921   merge->bi            = bi;
3922   merge->bj            = bj;
3923   merge->buf_ri        = buf_ri;
3924   merge->buf_rj        = buf_rj;
3925   merge->coi           = PETSC_NULL;
3926   merge->coj           = PETSC_NULL;
3927   merge->owners_co     = PETSC_NULL;
3928 
3929   /* attach the supporting struct to B_mpi for reuse */
3930   ierr = PetscContainerCreate(PETSC_COMM_SELF,&container);CHKERRQ(ierr);
3931   ierr = PetscContainerSetPointer(container,merge);CHKERRQ(ierr);
3932   ierr = PetscObjectCompose((PetscObject)B_mpi,"MatMergeSeqsToMPI",(PetscObject)container);CHKERRQ(ierr);
3933   *mpimat = B_mpi;
3934 
3935   ierr = PetscCommDestroy(&comm);CHKERRQ(ierr);
3936   ierr = PetscLogEventEnd(logkey_seqstompisym,seqmat,0,0,0);CHKERRQ(ierr);
3937   PetscFunctionReturn(0);
3938 }
3939 
3940 static PetscEvent logkey_seqstompi = 0;
3941 #undef __FUNCT__
3942 #define __FUNCT__ "MatMerge_SeqsToMPI"
3943 PetscErrorCode PETSCMAT_DLLEXPORT MatMerge_SeqsToMPI(MPI_Comm comm,Mat seqmat,PetscInt m,PetscInt n,MatReuse scall,Mat *mpimat)
3944 {
3945   PetscErrorCode   ierr;
3946 
3947   PetscFunctionBegin;
3948   if (!logkey_seqstompi) {
    ierr = PetscLogEventRegister(&logkey_seqstompi,"MatMerge_SeqsToMPI",MAT_COOKIE);CHKERRQ(ierr);
3950   }
3951   ierr = PetscLogEventBegin(logkey_seqstompi,seqmat,0,0,0);CHKERRQ(ierr);
3952   if (scall == MAT_INITIAL_MATRIX){
3953     ierr = MatMerge_SeqsToMPISymbolic(comm,seqmat,m,n,mpimat);CHKERRQ(ierr);
3954   }
3955   ierr = MatMerge_SeqsToMPINumeric(seqmat,*mpimat);CHKERRQ(ierr);
3956   ierr = PetscLogEventEnd(logkey_seqstompi,seqmat,0,0,0);CHKERRQ(ierr);
3957   PetscFunctionReturn(0);
3958 }
3959 static PetscEvent logkey_getlocalmat = 0;
3960 #undef __FUNCT__
3961 #define __FUNCT__ "MatGetLocalMat"
3962 /*@
     MatGetLocalMat - Creates a SeqAIJ matrix containing all the local rows of A (both its diagonal and off-diagonal portions)
3964 
3965     Not Collective
3966 
3967    Input Parameters:
3968 +    A - the matrix
3969 .    scall - either MAT_INITIAL_MATRIX or MAT_REUSE_MATRIX
3970 
3971    Output Parameter:
3972 .    A_loc - the local sequential matrix generated
3973 
3974     Level: developer
3975 
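   A sketch of typical use; the values may be refreshed later with MAT_REUSE_MATRIX,
   provided the nonzero pattern of A is unchanged:
.vb
     Mat A_loc;
     ierr = MatGetLocalMat(A,MAT_INITIAL_MATRIX,&A_loc);CHKERRQ(ierr);
     /* ... A's values change ... */
     ierr = MatGetLocalMat(A,MAT_REUSE_MATRIX,&A_loc);CHKERRQ(ierr);
.ve
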
3976 @*/
3977 PetscErrorCode PETSCMAT_DLLEXPORT MatGetLocalMat(Mat A,MatReuse scall,Mat *A_loc)
3978 {
3979   PetscErrorCode  ierr;
3980   Mat_MPIAIJ      *mpimat=(Mat_MPIAIJ*)A->data;
3981   Mat_SeqAIJ      *mat,*a=(Mat_SeqAIJ*)(mpimat->A)->data,*b=(Mat_SeqAIJ*)(mpimat->B)->data;
3982   PetscInt        *ai=a->i,*aj=a->j,*bi=b->i,*bj=b->j,*cmap=mpimat->garray;
3983   PetscScalar     *aa=a->a,*ba=b->a,*ca;
3984   PetscInt        am=A->rmap.n,i,j,k,cstart=A->cmap.rstart;
3985   PetscInt        *ci,*cj,col,ncols_d,ncols_o,jo;
3986 
3987   PetscFunctionBegin;
3988   if (!logkey_getlocalmat) {
    ierr = PetscLogEventRegister(&logkey_getlocalmat,"MatGetLocalMat",MAT_COOKIE);CHKERRQ(ierr);
3990   }
3991   ierr = PetscLogEventBegin(logkey_getlocalmat,A,0,0,0);CHKERRQ(ierr);
3992   if (scall == MAT_INITIAL_MATRIX){
3993     ierr = PetscMalloc((1+am)*sizeof(PetscInt),&ci);CHKERRQ(ierr);
3994     ci[0] = 0;
3995     for (i=0; i<am; i++){
3996       ci[i+1] = ci[i] + (ai[i+1] - ai[i]) + (bi[i+1] - bi[i]);
3997     }
3998     ierr = PetscMalloc((1+ci[am])*sizeof(PetscInt),&cj);CHKERRQ(ierr);
3999     ierr = PetscMalloc((1+ci[am])*sizeof(PetscScalar),&ca);CHKERRQ(ierr);
4000     k = 0;
4001     for (i=0; i<am; i++) {
4002       ncols_o = bi[i+1] - bi[i];
4003       ncols_d = ai[i+1] - ai[i];
4004       /* off-diagonal portion of A */
4005       for (jo=0; jo<ncols_o; jo++) {
4006         col = cmap[*bj];
4007         if (col >= cstart) break;
4008         cj[k]   = col; bj++;
4009         ca[k++] = *ba++;
4010       }
4011       /* diagonal portion of A */
4012       for (j=0; j<ncols_d; j++) {
4013         cj[k]   = cstart + *aj++;
4014         ca[k++] = *aa++;
4015       }
4016       /* off-diagonal portion of A */
4017       for (j=jo; j<ncols_o; j++) {
4018         cj[k]   = cmap[*bj++];
4019         ca[k++] = *ba++;
4020       }
4021     }
4022     /* put together the new matrix */
4023     ierr = MatCreateSeqAIJWithArrays(PETSC_COMM_SELF,am,A->cmap.N,ci,cj,ca,A_loc);CHKERRQ(ierr);
4024     /* MatCreateSeqAIJWithArrays flags matrix so PETSc doesn't free the user's arrays. */
4025     /* Since these are PETSc arrays, change flags to free them as necessary. */
4026     mat          = (Mat_SeqAIJ*)(*A_loc)->data;
4027     mat->free_a  = PETSC_TRUE;
4028     mat->free_ij = PETSC_TRUE;
4029     mat->nonew   = 0;
4030   } else if (scall == MAT_REUSE_MATRIX){
4031     mat=(Mat_SeqAIJ*)(*A_loc)->data;
4032     ci = mat->i; cj = mat->j; ca = mat->a;
4033     for (i=0; i<am; i++) {
4034       /* off-diagonal portion of A */
4035       ncols_o = bi[i+1] - bi[i];
4036       for (jo=0; jo<ncols_o; jo++) {
4037         col = cmap[*bj];
4038         if (col >= cstart) break;
4039         *ca++ = *ba++; bj++;
4040       }
4041       /* diagonal portion of A */
4042       ncols_d = ai[i+1] - ai[i];
4043       for (j=0; j<ncols_d; j++) *ca++ = *aa++;
4044       /* off-diagonal portion of A */
4045       for (j=jo; j<ncols_o; j++) {
4046         *ca++ = *ba++; bj++;
4047       }
4048     }
4049   } else {
4050     SETERRQ1(PETSC_ERR_ARG_WRONG,"Invalid MatReuse %d",(int)scall);
4051   }
4052 
4053   ierr = PetscLogEventEnd(logkey_getlocalmat,A,0,0,0);CHKERRQ(ierr);
4054   PetscFunctionReturn(0);
4055 }
4056 
4057 static PetscEvent logkey_getlocalmatcondensed = 0;
4058 #undef __FUNCT__
4059 #define __FUNCT__ "MatGetLocalMatCondensed"
4060 /*@C
     MatGetLocalMatCondensed - Creates a SeqAIJ matrix from all of A's local rows and its NON-ZERO columns
4062 
4063     Not Collective
4064 
4065    Input Parameters:
4066 +    A - the matrix
4067 .    scall - either MAT_INITIAL_MATRIX or MAT_REUSE_MATRIX
4068 -    row, col - index sets of rows and columns to extract (or PETSC_NULL)
4069 
4070    Output Parameter:
4071 .    A_loc - the local sequential matrix generated
4072 
4073     Level: developer
4074 
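   A sketch of typical use (passing PETSC_NULL lets the routine build the row and
   column index sets itself):
.vb
     Mat A_loc;
     ierr = MatGetLocalMatCondensed(A,MAT_INITIAL_MATRIX,PETSC_NULL,PETSC_NULL,&A_loc);CHKERRQ(ierr);
.ve
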
4075 @*/
4076 PetscErrorCode PETSCMAT_DLLEXPORT MatGetLocalMatCondensed(Mat A,MatReuse scall,IS *row,IS *col,Mat *A_loc)
4077 {
4078   Mat_MPIAIJ        *a=(Mat_MPIAIJ*)A->data;
4079   PetscErrorCode    ierr;
4080   PetscInt          i,start,end,ncols,nzA,nzB,*cmap,imark,*idx;
4081   IS                isrowa,iscola;
4082   Mat               *aloc;
4083 
4084   PetscFunctionBegin;
4085   if (!logkey_getlocalmatcondensed) {
4086     ierr = PetscLogEventRegister(&logkey_getlocalmatcondensed,"MatGetLocalMatCondensed",MAT_COOKIE);CHKERRQ(ierr);
4087   }
4088   ierr = PetscLogEventBegin(logkey_getlocalmatcondensed,A,0,0,0);CHKERRQ(ierr);
4089   if (!row){
4090     start = A->rmap.rstart; end = A->rmap.rend;
4091     ierr = ISCreateStride(PETSC_COMM_SELF,end-start,start,1,&isrowa);CHKERRQ(ierr);
4092   } else {
4093     isrowa = *row;
4094   }
4095   if (!col){
4096     start = A->cmap.rstart;
4097     cmap  = a->garray;
4098     nzA   = a->A->cmap.n;
4099     nzB   = a->B->cmap.n;
4100     ierr  = PetscMalloc((nzA+nzB)*sizeof(PetscInt), &idx);CHKERRQ(ierr);
4101     ncols = 0;
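    /* list the off-process columns that lie left of the diagonal block, then the
       diagonal block itself, then the remaining off-process columns, so that idx
       is sorted by global column number */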
4102     for (i=0; i<nzB; i++) {
4103       if (cmap[i] < start) idx[ncols++] = cmap[i];
4104       else break;
4105     }
4106     imark = i;
4107     for (i=0; i<nzA; i++) idx[ncols++] = start + i;
4108     for (i=imark; i<nzB; i++) idx[ncols++] = cmap[i];
4109     ierr = ISCreateGeneral(PETSC_COMM_SELF,ncols,idx,&iscola);CHKERRQ(ierr);
4110     ierr = PetscFree(idx);CHKERRQ(ierr);
4111   } else {
4112     iscola = *col;
4113   }
4114   if (scall != MAT_INITIAL_MATRIX){
4115     ierr = PetscMalloc(sizeof(Mat),&aloc);CHKERRQ(ierr);
4116     aloc[0] = *A_loc;
4117   }
4118   ierr = MatGetSubMatrices(A,1,&isrowa,&iscola,scall,&aloc);CHKERRQ(ierr);
4119   *A_loc = aloc[0];
4120   ierr = PetscFree(aloc);CHKERRQ(ierr);
4121   if (!row){
4122     ierr = ISDestroy(isrowa);CHKERRQ(ierr);
4123   }
4124   if (!col){
4125     ierr = ISDestroy(iscola);CHKERRQ(ierr);
4126   }
4127   ierr = PetscLogEventEnd(logkey_getlocalmatcondensed,A,0,0,0);CHKERRQ(ierr);
4128   PetscFunctionReturn(0);
4129 }
4130 
4131 static PetscEvent logkey_GetBrowsOfAcols = 0;
4132 #undef __FUNCT__
4133 #define __FUNCT__ "MatGetBrowsOfAcols"
4134 /*@C
4135     MatGetBrowsOfAcols - Creates a SeqAIJ matrix from the rows of B that correspond to the nonzero columns of the local part of A
4136 
4137     Collective on Mat
4138 
4139    Input Parameters:
4140 +    A,B - the matrices in mpiaij format
4141 .    scall - either MAT_INITIAL_MATRIX or MAT_REUSE_MATRIX
4142 -    rowb, colb - index sets of rows and columns of B to extract (or PETSC_NULL)
4143 
4144    Output Parameters:
4145 +    rowb, colb - index sets of the rows and columns of B that were extracted
4146 .    brstart - row index of B_seq at which B's own B->rmap.n local rows begin
4147 -    B_seq - the sequential matrix generated
4148 
4149     Level: developer
4150 
4151 @*/
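/*
   Example usage (a sketch, not part of the original source): a first call with
   MAT_INITIAL_MATRIX creates the index sets, which are then passed back for reuse:

     IS       rowb,colb;
     PetscInt brstart;
     Mat      B_seq;
     ierr = MatGetBrowsOfAcols(A,B,MAT_INITIAL_MATRIX,&rowb,&colb,&brstart,&B_seq);CHKERRQ(ierr);
     ierr = MatGetBrowsOfAcols(A,B,MAT_REUSE_MATRIX,&rowb,&colb,&brstart,&B_seq);CHKERRQ(ierr);
     ierr = ISDestroy(rowb);CHKERRQ(ierr);
     ierr = ISDestroy(colb);CHKERRQ(ierr);
     ierr = MatDestroy(B_seq);CHKERRQ(ierr);
*/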
4152 PetscErrorCode PETSCMAT_DLLEXPORT MatGetBrowsOfAcols(Mat A,Mat B,MatReuse scall,IS *rowb,IS *colb,PetscInt *brstart,Mat *B_seq)
4153 {
4154   Mat_MPIAIJ        *a=(Mat_MPIAIJ*)A->data;
4155   PetscErrorCode    ierr;
4156   PetscInt          *idx,i,start,ncols,nzA,nzB,*cmap,imark;
4157   IS                isrowb,iscolb;
4158   Mat               *bseq;
4159 
4160   PetscFunctionBegin;
4161   if (A->cmap.rstart != B->rmap.rstart || A->cmap.rend != B->rmap.rend){
4162     SETERRQ4(PETSC_ERR_ARG_SIZ,"Matrix local dimensions are incompatible, (%D, %D) != (%D,%D)",A->cmap.rstart,A->cmap.rend,B->rmap.rstart,B->rmap.rend);
4163   }
4164   if (!logkey_GetBrowsOfAcols) {
4165     ierr = PetscLogEventRegister(&logkey_GetBrowsOfAcols,"MatGetBrowsOfAcols",MAT_COOKIE);CHKERRQ(ierr);
4166   }
4167   ierr = PetscLogEventBegin(logkey_GetBrowsOfAcols,A,B,0,0);CHKERRQ(ierr);
4168 
4169   if (scall == MAT_INITIAL_MATRIX){
4170     start = A->cmap.rstart;
4171     cmap  = a->garray;
4172     nzA   = a->A->cmap.n;
4173     nzB   = a->B->cmap.n;
4174     ierr  = PetscMalloc((nzA+nzB)*sizeof(PetscInt), &idx);CHKERRQ(ierr);
4175     ncols = 0;
4176     for (i=0; i<nzB; i++) {  /* global rows below the local row range */
4177       if (cmap[i] < start) idx[ncols++] = cmap[i];
4178       else break;
4179     }
4180     imark = i;
4181     for (i=0; i<nzA; i++) idx[ncols++] = start + i;  /* local rows */
4182     for (i=imark; i<nzB; i++) idx[ncols++] = cmap[i]; /* global rows above the local row range */
4183     ierr = ISCreateGeneral(PETSC_COMM_SELF,ncols,idx,&isrowb);CHKERRQ(ierr);
4184     ierr = PetscFree(idx);CHKERRQ(ierr);
4185     *brstart = imark;
4186     ierr = ISCreateStride(PETSC_COMM_SELF,B->cmap.N,0,1,&iscolb);CHKERRQ(ierr);
4187   } else {
4188     if (!rowb || !colb) SETERRQ(PETSC_ERR_SUP,"IS rowb and colb must be provided for MAT_REUSE_MATRIX");
4189     isrowb = *rowb; iscolb = *colb;
4190     ierr = PetscMalloc(sizeof(Mat),&bseq);CHKERRQ(ierr);
4191     bseq[0] = *B_seq;
4192   }
4193   ierr = MatGetSubMatrices(B,1,&isrowb,&iscolb,scall,&bseq);CHKERRQ(ierr);
4194   *B_seq = bseq[0];
4195   ierr = PetscFree(bseq);CHKERRQ(ierr);
4196   if (!rowb){
4197     ierr = ISDestroy(isrowb);CHKERRQ(ierr);
4198   } else {
4199     *rowb = isrowb;
4200   }
4201   if (!colb){
4202     ierr = ISDestroy(iscolb);CHKERRQ(ierr);
4203   } else {
4204     *colb = iscolb;
4205   }
4206   ierr = PetscLogEventEnd(logkey_GetBrowsOfAcols,A,B,0,0);CHKERRQ(ierr);
4207   PetscFunctionReturn(0);
4208 }
4209 
4210 static PetscEvent logkey_GetBrowsOfAocols = 0;
4211 #undef __FUNCT__
4212 #define __FUNCT__ "MatGetBrowsOfAoCols"
4213 /*@C
4214     MatGetBrowsOfAoCols - Creates a SeqAIJ matrix from the rows of B that correspond to the nonzero columns
4215     of the OFF-DIAGONAL portion of the local part of A
4216 
4217     Collective on Mat
4218 
4219    Input Parameters:
4220 +    A,B - the matrices in mpiaij format
4221 .    scall - either MAT_INITIAL_MATRIX or MAT_REUSE_MATRIX
4222 .    startsj - starting points in B's sending and receiving j-arrays, saved for MAT_REUSE_MATRIX (or PETSC_NULL)
4223 -    bufa_ptr - array for sending matrix values, saved for MAT_REUSE_MATRIX (or PETSC_NULL)
4224 
4225    Output Parameter:
4226 .    B_oth - the sequential matrix generated
4227 
4228     Level: developer
4229 
4230 @*/
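/*
   Example usage (a sketch, not part of the original source): the startsj and
   bufa buffers created by the first call are handed back so the second call can
   skip the i- and j-array communication:

     PetscInt    *startsj = PETSC_NULL;
     PetscScalar *bufa    = PETSC_NULL;
     Mat         B_oth;
     ierr = MatGetBrowsOfAoCols(A,B,MAT_INITIAL_MATRIX,&startsj,&bufa,&B_oth);CHKERRQ(ierr);
     ierr = MatGetBrowsOfAoCols(A,B,MAT_REUSE_MATRIX,&startsj,&bufa,&B_oth);CHKERRQ(ierr);
     ierr = PetscFree(startsj);CHKERRQ(ierr);
     ierr = PetscFree(bufa);CHKERRQ(ierr);
     ierr = MatDestroy(B_oth);CHKERRQ(ierr);
*/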
4231 PetscErrorCode PETSCMAT_DLLEXPORT MatGetBrowsOfAoCols(Mat A,Mat B,MatReuse scall,PetscInt **startsj,PetscScalar **bufa_ptr,Mat *B_oth)
4232 {
4233   VecScatter_MPI_General *gen_to,*gen_from;
4234   PetscErrorCode         ierr;
4235   Mat_MPIAIJ             *a=(Mat_MPIAIJ*)A->data;
4236   Mat_SeqAIJ             *b_oth;
4237   VecScatter             ctx=a->Mvctx;
4238   MPI_Comm               comm=ctx->comm;
4239   PetscMPIInt            *rprocs,*sprocs,tag=ctx->tag,rank;
4240   PetscInt               *rowlen,*bufj,*bufJ,ncols,aBn=a->B->cmap.n,row,*b_othi,*b_othj;
4241   PetscScalar            *rvalues,*svalues,*b_otha,*bufa,*bufA;
4242   PetscInt               i,j,k,l,ll,nrecvs,nsends,nrows,*srow,*rstarts,*rstartsj = 0,*sstarts,*sstartsj,len;
4243   MPI_Request            *rwaits = PETSC_NULL,*swaits = PETSC_NULL;
4244   MPI_Status             *sstatus,rstatus;
4245   PetscMPIInt            jj;
4246   PetscInt               *cols,sbs,rbs;
4247   PetscScalar            *vals;
4248 
4249   PetscFunctionBegin;
4250   if (A->cmap.rstart != B->rmap.rstart || A->cmap.rend != B->rmap.rend){
4251     SETERRQ4(PETSC_ERR_ARG_SIZ,"Matrix local dimensions are incompatible, (%D, %D) != (%D,%D)",A->cmap.rstart,A->cmap.rend,B->rmap.rstart,B->rmap.rend);
4252   }
4253   if (!logkey_GetBrowsOfAocols) {
4254     ierr = PetscLogEventRegister(&logkey_GetBrowsOfAocols,"MatGetBrAoCol",MAT_COOKIE);CHKERRQ(ierr);
4255   }
4256   ierr = PetscLogEventBegin(logkey_GetBrowsOfAocols,A,B,0,0);CHKERRQ(ierr);
4257   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
4258 
4259   gen_to   = (VecScatter_MPI_General*)ctx->todata;
4260   gen_from = (VecScatter_MPI_General*)ctx->fromdata;
4261   rvalues  = gen_from->values; /* PetscScalar buffer reused to hold the lengths of the incoming rows */
4262   svalues  = gen_to->values;   /* PetscScalar buffer reused to hold the lengths of the outgoing rows */
4263   nrecvs   = gen_from->n;
4264   nsends   = gen_to->n;
4265 
4266   ierr = PetscMalloc2(nrecvs,MPI_Request,&rwaits,nsends,MPI_Request,&swaits);CHKERRQ(ierr);
4267   srow     = gen_to->indices;   /* local row index to be sent */
4268   sstarts  = gen_to->starts;
4269   sprocs   = gen_to->procs;
4270   sstatus  = gen_to->sstatus;
4271   sbs      = gen_to->bs;
4272   rstarts  = gen_from->starts;
4273   rprocs   = gen_from->procs;
4274   rbs      = gen_from->bs;
4275 
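  /* B_oth is built in three phases: the row lengths (i-array) are exchanged first,
     then the column indices (j-array), and finally the numerical values (a-array).
     With MAT_REUSE_MATRIX the i- and j-phases are skipped and only values move. */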
4276   if (!startsj || !bufa_ptr) scall = MAT_INITIAL_MATRIX;
4277   if (scall == MAT_INITIAL_MATRIX){
4278     /* i-array */
4279     /*---------*/
4280     /*  post receives */
4281     for (i=0; i<nrecvs; i++){
4282       rowlen = (PetscInt*)rvalues + rstarts[i]*rbs;
4283       nrows = (rstarts[i+1]-rstarts[i])*rbs; /* num of indices to be received */
4284       ierr = MPI_Irecv(rowlen,nrows,MPIU_INT,rprocs[i],tag,comm,rwaits+i);CHKERRQ(ierr);
4285     }
4286 
4287     /* pack the outgoing message */
4288     ierr = PetscMalloc((nsends+nrecvs+3)*sizeof(PetscInt),&sstartsj);CHKERRQ(ierr);
4289     rstartsj = sstartsj + nsends +1;
4290     sstartsj[0] = 0;  rstartsj[0] = 0;
4291     len = 0; /* total length of j or a array to be sent */
4292     k = 0;
4293     for (i=0; i<nsends; i++){
4294       rowlen = (PetscInt*)svalues + sstarts[i]*sbs;
4295       nrows = sstarts[i+1]-sstarts[i]; /* num of block rows */
4296       for (j=0; j<nrows; j++) {
4297         row = srow[k] + B->rmap.range[rank]; /* global row idx */
4298         for (l=0; l<sbs; l++){
4299           ierr = MatGetRow_MPIAIJ(B,row+l,&ncols,PETSC_NULL,PETSC_NULL);CHKERRQ(ierr); /* rowlength */
4300           rowlen[j*sbs+l] = ncols;
4301           len += ncols;
4302           ierr = MatRestoreRow_MPIAIJ(B,row+l,&ncols,PETSC_NULL,PETSC_NULL);CHKERRQ(ierr);
4303         }
4304         k++;
4305       }
4306       ierr = MPI_Isend(rowlen,nrows*sbs,MPIU_INT,sprocs[i],tag,comm,swaits+i);CHKERRQ(ierr);
4307       sstartsj[i+1] = len;  /* starting point of (i+1)-th outgoing msg in bufj and bufa */
4308     }
4309     /* recvs and sends of i-array are completed */
4310     i = nrecvs;
4311     while (i--) {
4312       ierr = MPI_Waitany(nrecvs,rwaits,&jj,&rstatus);CHKERRQ(ierr);
4313     }
4314     if (nsends) {ierr = MPI_Waitall(nsends,swaits,sstatus);CHKERRQ(ierr);}
4315 
4316     /* allocate buffers for sending j and a arrays */
4317     ierr = PetscMalloc((len+1)*sizeof(PetscInt),&bufj);CHKERRQ(ierr);
4318     ierr = PetscMalloc((len+1)*sizeof(PetscScalar),&bufa);CHKERRQ(ierr);
4319 
4320     /* create i-array of B_oth */
4321     ierr = PetscMalloc((aBn+2)*sizeof(PetscInt),&b_othi);CHKERRQ(ierr);
4322     b_othi[0] = 0;
4323     len = 0; /* total length of j or a array to be received */
4324     k = 0;
4325     for (i=0; i<nrecvs; i++){
4326       rowlen = (PetscInt*)rvalues + rstarts[i]*rbs;
4327       nrows = rbs*(rstarts[i+1]-rstarts[i]); /* num of rows to be received */
4328       for (j=0; j<nrows; j++) {
4329         b_othi[k+1] = b_othi[k] + rowlen[j];
4330         len += rowlen[j]; k++;
4331       }
4332       rstartsj[i+1] = len; /* starting point of (i+1)-th incoming msg in bufj and bufa */
4333     }
4334 
4335     /* allocate space for j and a arrays of B_oth */
4336     ierr = PetscMalloc((b_othi[aBn]+1)*sizeof(PetscInt),&b_othj);CHKERRQ(ierr);
4337     ierr = PetscMalloc((b_othi[aBn]+1)*sizeof(PetscScalar),&b_otha);CHKERRQ(ierr);
4338 
4339     /* j-array */
4340     /*---------*/
4341     /*  post receives of j-array */
4342     for (i=0; i<nrecvs; i++){
4343       nrows = rstartsj[i+1]-rstartsj[i]; /* length of the msg received */
4344       ierr = MPI_Irecv(b_othj+rstartsj[i],nrows,MPIU_INT,rprocs[i],tag,comm,rwaits+i);CHKERRQ(ierr);
4345     }
4346 
4347     /* pack the outgoing message j-array */
4348     k = 0;
4349     for (i=0; i<nsends; i++){
4350       nrows = sstarts[i+1]-sstarts[i]; /* num of block rows */
4351       bufJ = bufj+sstartsj[i];
4352       for (j=0; j<nrows; j++) {
4353         row  = srow[k++] + B->rmap.range[rank]; /* global row idx */
4354         for (ll=0; ll<sbs; ll++){
4355           ierr = MatGetRow_MPIAIJ(B,row+ll,&ncols,&cols,PETSC_NULL);CHKERRQ(ierr);
4356           for (l=0; l<ncols; l++){
4357             *bufJ++ = cols[l];
4358           }
4359           ierr = MatRestoreRow_MPIAIJ(B,row+ll,&ncols,&cols,PETSC_NULL);CHKERRQ(ierr);
4360         }
4361       }
4362       ierr = MPI_Isend(bufj+sstartsj[i],sstartsj[i+1]-sstartsj[i],MPIU_INT,sprocs[i],tag,comm,swaits+i);CHKERRQ(ierr);
4363     }
4364 
4365     /* recvs and sends of j-array are completed */
4366     i = nrecvs;
4367     while (i--) {
4368       ierr = MPI_Waitany(nrecvs,rwaits,&jj,&rstatus);CHKERRQ(ierr);
4369     }
4370     if (nsends) {ierr = MPI_Waitall(nsends,swaits,sstatus);CHKERRQ(ierr);}
4371   } else if (scall == MAT_REUSE_MATRIX){
4372     sstartsj = *startsj;
4373     rstartsj = sstartsj + nsends +1;
4374     bufa     = *bufa_ptr;
4375     b_oth    = (Mat_SeqAIJ*)(*B_oth)->data;
4376     b_otha   = b_oth->a;
4377   } else {
4378     SETERRQ1(PETSC_ERR_ARG_WRONG,"Invalid MatReuse %d",(int)scall);
4379   }
4380 
4381   /* a-array */
4382   /*---------*/
4383   /*  post receives of a-array */
4384   for (i=0; i<nrecvs; i++){
4385     nrows = rstartsj[i+1]-rstartsj[i]; /* length of the msg received */
4386     ierr = MPI_Irecv(b_otha+rstartsj[i],nrows,MPIU_SCALAR,rprocs[i],tag,comm,rwaits+i);CHKERRQ(ierr);
4387   }
4388 
4389   /* pack the outgoing message a-array */
4390   k = 0;
4391   for (i=0; i<nsends; i++){
4392     nrows = sstarts[i+1]-sstarts[i]; /* num of block rows */
4393     bufA = bufa+sstartsj[i];
4394     for (j=0; j<nrows; j++) {
4395       row  = srow[k++] + B->rmap.range[rank]; /* global row idx */
4396       for (ll=0; ll<sbs; ll++){
4397         ierr = MatGetRow_MPIAIJ(B,row+ll,&ncols,PETSC_NULL,&vals);CHKERRQ(ierr);
4398         for (l=0; l<ncols; l++){
4399           *bufA++ = vals[l];
4400         }
4401         ierr = MatRestoreRow_MPIAIJ(B,row+ll,&ncols,PETSC_NULL,&vals);CHKERRQ(ierr);
4402       }
4403     }
4404     ierr = MPI_Isend(bufa+sstartsj[i],sstartsj[i+1]-sstartsj[i],MPIU_SCALAR,sprocs[i],tag,comm,swaits+i);CHKERRQ(ierr);
4405   }
4406   /* recvs and sends of a-array are completed */
4407   i = nrecvs;
4408   while (i--) {
4409     ierr = MPI_Waitany(nrecvs,rwaits,&jj,&rstatus);CHKERRQ(ierr);
4410   }
4411   if (nsends) {ierr = MPI_Waitall(nsends,swaits,sstatus);CHKERRQ(ierr);}
4412   ierr = PetscFree2(rwaits,swaits);CHKERRQ(ierr);
4413 
4414   if (scall == MAT_INITIAL_MATRIX){
4415     /* put together the new matrix */
4416     ierr = MatCreateSeqAIJWithArrays(PETSC_COMM_SELF,aBn,B->cmap.N,b_othi,b_othj,b_otha,B_oth);CHKERRQ(ierr);
4417 
4418     /* MatCreateSeqAIJWithArrays flags matrix so PETSc doesn't free the user's arrays. */
4419     /* Since these are PETSc arrays, change flags to free them as necessary. */
4420     b_oth          = (Mat_SeqAIJ *)(*B_oth)->data;
4421     b_oth->free_a  = PETSC_TRUE;
4422     b_oth->free_ij = PETSC_TRUE;
4423     b_oth->nonew   = 0;
4424 
4425     ierr = PetscFree(bufj);CHKERRQ(ierr);
4426     if (!startsj || !bufa_ptr){
4427       ierr = PetscFree(sstartsj);CHKERRQ(ierr);
4428       ierr = PetscFree(bufa);CHKERRQ(ierr);
4429     } else {
4430       *startsj  = sstartsj;
4431       *bufa_ptr = bufa;
4432     }
4433   }
4434   ierr = PetscLogEventEnd(logkey_GetBrowsOfAocols,A,B,0,0);CHKERRQ(ierr);
4435   PetscFunctionReturn(0);
4436 }
4437 
4438 #undef __FUNCT__
4439 #define __FUNCT__ "MatGetCommunicationStructs"
4440 /*@C
4441   MatGetCommunicationStructs - Provides access to the communication structures used in matrix-vector multiplication.
4442 
4443   Not Collective
4444 
4445   Input Parameters:
4446 . A - The matrix in mpiaij format
4447 
4448   Output Parameter:
4449 + lvec - The local vector holding off-process values from the argument to a matrix-vector product
4450 . colmap - A map from global column index to local index into lvec
4451 - multScatter - A scatter from the argument of a matrix-vector product to lvec
4452 
4453   Level: developer
4454 
4455 @*/
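/*
   Example usage (a sketch, not part of the original source; all three output
   pointers must be valid since they are checked with PetscValidPointer()):

     Vec        lvec;
     VecScatter Mvctx;
 #if defined(PETSC_USE_CTABLE)
     PetscTable colmap;
 #else
     PetscInt   *colmap;
 #endif
     ierr = MatGetCommunicationStructs(A,&lvec,&colmap,&Mvctx);CHKERRQ(ierr);
*/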
4456 #if defined (PETSC_USE_CTABLE)
4457 PetscErrorCode PETSCMAT_DLLEXPORT MatGetCommunicationStructs(Mat A, Vec *lvec, PetscTable *colmap, VecScatter *multScatter)
4458 #else
4459 PetscErrorCode PETSCMAT_DLLEXPORT MatGetCommunicationStructs(Mat A, Vec *lvec, PetscInt *colmap[], VecScatter *multScatter)
4460 #endif
4461 {
4462   Mat_MPIAIJ *a;
4463 
4464   PetscFunctionBegin;
4465   PetscValidHeaderSpecific(A, MAT_COOKIE, 1);
4466   PetscValidPointer(lvec, 2);
4467   PetscValidPointer(colmap, 3);
4468   PetscValidPointer(multScatter, 4);
4469   a = (Mat_MPIAIJ *) A->data;
4470   if (lvec) *lvec = a->lvec;
4471   if (colmap) *colmap = a->colmap;
4472   if (multScatter) *multScatter = a->Mvctx;
4473   PetscFunctionReturn(0);
4474 }
4475 
4476 EXTERN_C_BEGIN
4477 extern PetscErrorCode PETSCMAT_DLLEXPORT MatConvert_MPIAIJ_MPICRL(Mat,MatType,MatReuse,Mat*);
4478 extern PetscErrorCode PETSCMAT_DLLEXPORT MatConvert_MPIAIJ_MPICSRPERM(Mat,MatType,MatReuse,Mat*);
4479 EXTERN_C_END
4480 
4481 /*MC
4482    MATMPIAIJ - MATMPIAIJ = "mpiaij" - A matrix type to be used for parallel sparse matrices.
4483 
4484    Options Database Keys:
4485 . -mat_type mpiaij - sets the matrix type to "mpiaij" during a call to MatSetFromOptions()
4486 
4487   Level: beginner
4488 
4489 .seealso: MatCreateMPIAIJ()
4490 M*/
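/*
   Example usage (a sketch, not part of the original source; M and N stand for
   the desired global sizes): create a matrix whose type is taken from the
   options database, then run with -mat_type mpiaij:

     Mat A;
     ierr = MatCreate(PETSC_COMM_WORLD,&A);CHKERRQ(ierr);
     ierr = MatSetSizes(A,PETSC_DECIDE,PETSC_DECIDE,M,N);CHKERRQ(ierr);
     ierr = MatSetFromOptions(A);CHKERRQ(ierr);
*/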
4491 
4492 EXTERN_C_BEGIN
4493 #undef __FUNCT__
4494 #define __FUNCT__ "MatCreate_MPIAIJ"
4495 PetscErrorCode PETSCMAT_DLLEXPORT MatCreate_MPIAIJ(Mat B)
4496 {
4497   Mat_MPIAIJ     *b;
4498   PetscErrorCode ierr;
4499   PetscMPIInt    size;
4500 
4501   PetscFunctionBegin;
4502   ierr = MPI_Comm_size(B->comm,&size);CHKERRQ(ierr);
4503 
4504   ierr            = PetscNewLog(B,Mat_MPIAIJ,&b);CHKERRQ(ierr);
4505   B->data         = (void*)b;
4506   ierr            = PetscMemcpy(B->ops,&MatOps_Values,sizeof(struct _MatOps));CHKERRQ(ierr);
4507   B->factor       = 0;
4508   B->rmap.bs      = 1;
4509   B->assembled    = PETSC_FALSE;
4510   B->mapping      = 0;
4511 
4512   B->insertmode      = NOT_SET_VALUES;
4513   b->size            = size;
4514   ierr = MPI_Comm_rank(B->comm,&b->rank);CHKERRQ(ierr);
4515 
4516   /* build cache for off array entries formed */
4517   ierr = MatStashCreate_Private(B->comm,1,&B->stash);CHKERRQ(ierr);
4518   b->donotstash  = PETSC_FALSE;
4519   b->colmap      = 0;
4520   b->garray      = 0;
4521   b->roworiented = PETSC_TRUE;
4522 
4523   /* stuff used for matrix vector multiply */
4524   b->lvec      = PETSC_NULL;
4525   b->Mvctx     = PETSC_NULL;
4526 
4527   /* stuff for MatGetRow() */
4528   b->rowindices   = 0;
4529   b->rowvalues    = 0;
4530   b->getrowactive = PETSC_FALSE;
4531 
4532 
4533   ierr = PetscObjectComposeFunctionDynamic((PetscObject)B,"MatStoreValues_C",
4534                                      "MatStoreValues_MPIAIJ",
4535                                      MatStoreValues_MPIAIJ);CHKERRQ(ierr);
4536   ierr = PetscObjectComposeFunctionDynamic((PetscObject)B,"MatRetrieveValues_C",
4537                                      "MatRetrieveValues_MPIAIJ",
4538                                      MatRetrieveValues_MPIAIJ);CHKERRQ(ierr);
4539   ierr = PetscObjectComposeFunctionDynamic((PetscObject)B,"MatGetDiagonalBlock_C",
4540 				     "MatGetDiagonalBlock_MPIAIJ",
4541                                      MatGetDiagonalBlock_MPIAIJ);CHKERRQ(ierr);
4542   ierr = PetscObjectComposeFunctionDynamic((PetscObject)B,"MatIsTranspose_C",
4543 				     "MatIsTranspose_MPIAIJ",
4544 				     MatIsTranspose_MPIAIJ);CHKERRQ(ierr);
4545   ierr = PetscObjectComposeFunctionDynamic((PetscObject)B,"MatMPIAIJSetPreallocation_C",
4546 				     "MatMPIAIJSetPreallocation_MPIAIJ",
4547 				     MatMPIAIJSetPreallocation_MPIAIJ);CHKERRQ(ierr);
4548   ierr = PetscObjectComposeFunctionDynamic((PetscObject)B,"MatMPIAIJSetPreallocationCSR_C",
4549 				     "MatMPIAIJSetPreallocationCSR_MPIAIJ",
4550 				     MatMPIAIJSetPreallocationCSR_MPIAIJ);CHKERRQ(ierr);
4551   ierr = PetscObjectComposeFunctionDynamic((PetscObject)B,"MatDiagonalScaleLocal_C",
4552 				     "MatDiagonalScaleLocal_MPIAIJ",
4553 				     MatDiagonalScaleLocal_MPIAIJ);CHKERRQ(ierr);
4554   ierr = PetscObjectComposeFunctionDynamic((PetscObject)B,"MatConvert_mpiaij_mpicsrperm_C",
4555                                      "MatConvert_MPIAIJ_MPICSRPERM",
4556                                       MatConvert_MPIAIJ_MPICSRPERM);CHKERRQ(ierr);
4557   ierr = PetscObjectComposeFunctionDynamic((PetscObject)B,"MatConvert_mpiaij_mpicrl_C",
4558                                      "MatConvert_MPIAIJ_MPICRL",
4559                                       MatConvert_MPIAIJ_MPICRL);CHKERRQ(ierr);
4560   ierr = PetscObjectChangeTypeName((PetscObject)B,MATMPIAIJ);CHKERRQ(ierr);
4561   PetscFunctionReturn(0);
4562 }
4563 EXTERN_C_END
4564 
4565 #undef __FUNCT__
4566 #define __FUNCT__ "MatCreateMPIAIJWithSplitArrays"
4567 /*@
4568      MatCreateMPIAIJWithSplitArrays - creates an MPI AIJ matrix using arrays that contain the "diagonal"
4569          and "off-diagonal" parts of the matrix in CSR format.
4570 
4571    Collective on MPI_Comm
4572 
4573    Input Parameters:
4574 +  comm - MPI communicator
4575 .  m - number of local rows (Cannot be PETSC_DECIDE)
4576 .  n - number of local columns; this must be the same as the local size used in
4577        creating the x vector for the matrix-vector product y = Ax (or PETSC_DECIDE
4578        to have it calculated if N is given). For square matrices n is almost always m.
4579 .  M - number of global rows (or PETSC_DETERMINE to have calculated if m is given)
4580 .  N - number of global columns (or PETSC_DETERMINE to have calculated if n is given)
4581 .   i - row indices for "diagonal" portion of matrix
4582 .   j - column indices, local to the "diagonal" block
4583 .   a - matrix values
4584 .   oi - row indices for "off-diagonal" portion of matrix
4585 .   oj - global column indices for the "off-diagonal" block
4586 -   oa - matrix values
4587 
4588    Output Parameter:
4589 .   mat - the matrix
4590 
4591    Level: advanced
4592 
4593    Notes:
4594        The i, j, and a arrays ARE NOT copied by this routine into the internal format used by PETSc.
4595 
4596        The i and j indices are 0 based
4597 
4598        See MatCreateMPIAIJ() for the definition of "diagonal" and "off-diagonal" portion of the matrix
4599 
4600 
4601 .keywords: matrix, aij, compressed row, sparse, parallel
4602 
4603 .seealso: MatCreate(), MatCreateSeqAIJ(), MatSetValues(), MatMPIAIJSetPreallocation(), MatMPIAIJSetPreallocationCSR(),
4604           MPIAIJ, MatCreateMPIAIJ(), MatCreateMPIAIJWithArrays()
4605 @*/
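/*
   Example usage (a sketch, not part of the original source): on two processes,
   assemble the 2x2 matrix [2 -1; -1 2] with one row and one column per process.
   The j indices are local to the "diagonal" block while the oj indices are
   global, matching how the two blocks are created below:

     Mat         A;
     PetscMPIInt rank;
     PetscInt    i[2] = {0,1}, j[1] = {0}, oi[2] = {0,1}, oj[1];
     PetscScalar a[1] = {2.0}, oa[1] = {-1.0};

     ierr  = MPI_Comm_rank(PETSC_COMM_WORLD,&rank);CHKERRQ(ierr);
     oj[0] = rank ? 0 : 1;
     ierr  = MatCreateMPIAIJWithSplitArrays(PETSC_COMM_WORLD,1,1,2,2,i,j,a,oi,oj,oa,&A);CHKERRQ(ierr);
*/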
4606 PetscErrorCode PETSCMAT_DLLEXPORT MatCreateMPIAIJWithSplitArrays(MPI_Comm comm,PetscInt m,PetscInt n,PetscInt M,PetscInt N,PetscInt i[],PetscInt j[],PetscScalar a[],
4607 								PetscInt oi[], PetscInt oj[],PetscScalar oa[],Mat *mat)
4608 {
4609   PetscErrorCode ierr;
4610   Mat_MPIAIJ     *maij;
4611 
4612   PetscFunctionBegin;
4613   if (m < 0) SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"local number of rows (m) cannot be PETSC_DECIDE, or negative");
4614   if (i[0]) {
4615     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"i (row indices) must start with 0");
4616   }
4617   if (oi[0]) {
4618     SETERRQ(PETSC_ERR_ARG_OUTOFRANGE,"oi (row indices) must start with 0");
4619   }
4620   ierr = MatCreate(comm,mat);CHKERRQ(ierr);
4621   ierr = MatSetSizes(*mat,m,n,M,N);CHKERRQ(ierr);
4622   ierr = MatSetType(*mat,MATMPIAIJ);CHKERRQ(ierr);
4623   maij = (Mat_MPIAIJ*) (*mat)->data;
4624   maij->donotstash     = PETSC_TRUE;
4625   (*mat)->preallocated = PETSC_TRUE;
4626 
4627   (*mat)->rmap.bs = (*mat)->cmap.bs = 1;
4628   ierr = PetscMapSetUp(&(*mat)->rmap);CHKERRQ(ierr);
4629   ierr = PetscMapSetUp(&(*mat)->cmap);CHKERRQ(ierr);
4630 
4631   ierr = MatCreateSeqAIJWithArrays(PETSC_COMM_SELF,m,n,i,j,a,&maij->A);CHKERRQ(ierr);
4632   ierr = MatCreateSeqAIJWithArrays(PETSC_COMM_SELF,m,(*mat)->cmap.N,oi,oj,oa,&maij->B);CHKERRQ(ierr);
4633 
4634   ierr = MatAssemblyBegin(maij->A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
4635   ierr = MatAssemblyEnd(maij->A,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
4636   ierr = MatAssemblyBegin(maij->B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
4637   ierr = MatAssemblyEnd(maij->B,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
4638 
4639   ierr = MatAssemblyBegin(*mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
4640   ierr = MatAssemblyEnd(*mat,MAT_FINAL_ASSEMBLY);CHKERRQ(ierr);
4641   PetscFunctionReturn(0);
4642 }
4643 
4644 /*
4645     Special version for direct calls from Fortran
4646 */
4647 #if defined(PETSC_HAVE_FORTRAN_CAPS)
4648 #define matsetvaluesmpiaij_ MATSETVALUESMPIAIJ
4649 #elif !defined(PETSC_HAVE_FORTRAN_UNDERSCORE)
4650 #define matsetvaluesmpiaij_ matsetvaluesmpiaij
4651 #endif
4652 
4653 /* Change these macros so they can be used in a void function */
4654 #undef CHKERRQ
4655 #define CHKERRQ(ierr) CHKERRABORT(mat->comm,ierr)
4656 #undef SETERRQ2
4657 #define SETERRQ2(ierr,b,c,d) CHKERRABORT(mat->comm,ierr)
4658 #undef SETERRQ
4659 #define SETERRQ(ierr,b) CHKERRABORT(mat->comm,ierr)
4660 
4661 EXTERN_C_BEGIN
4662 #undef __FUNCT__
4663 #define __FUNCT__ "matsetvaluesmpiaij_"
4664 void PETSC_STDCALL matsetvaluesmpiaij_(Mat *mmat,PetscInt *mm,const PetscInt im[],PetscInt *mn,const PetscInt in[],const PetscScalar v[],InsertMode *maddv,PetscErrorCode *_ierr)
4665 {
4666   Mat            mat = *mmat;
4667   PetscInt       m = *mm, n = *mn;
4668   InsertMode     addv = *maddv;
4669   Mat_MPIAIJ     *aij = (Mat_MPIAIJ*)mat->data;
4670   PetscScalar    value;
4671   PetscErrorCode ierr;
4672 
4673   MatPreallocated(mat);
4674   if (mat->insertmode == NOT_SET_VALUES) {
4675     mat->insertmode = addv;
4676   }
4677 #if defined(PETSC_USE_DEBUG)
4678   else if (mat->insertmode != addv) {
4679     SETERRQ(PETSC_ERR_ARG_WRONGSTATE,"Cannot mix add values and insert values");
4680   }
4681 #endif
4682   {
4683   PetscInt       i,j,rstart = mat->rmap.rstart,rend = mat->rmap.rend;
4684   PetscInt       cstart = mat->cmap.rstart,cend = mat->cmap.rend,row,col;
4685   PetscTruth     roworiented = aij->roworiented;
4686 
4687   /* Some Variables required in the macro */
4688   Mat            A = aij->A;
4689   Mat_SeqAIJ     *a = (Mat_SeqAIJ*)A->data;
4690   PetscInt       *aimax = a->imax,*ai = a->i,*ailen = a->ilen,*aj = a->j;
4691   PetscScalar    *aa = a->a;
4692   PetscTruth     ignorezeroentries = (((a->ignorezeroentries)&&(addv==ADD_VALUES))?PETSC_TRUE:PETSC_FALSE);
4693   Mat            B = aij->B;
4694   Mat_SeqAIJ     *b = (Mat_SeqAIJ*)B->data;
4695   PetscInt       *bimax = b->imax,*bi = b->i,*bilen = b->ilen,*bj = b->j,bm = aij->B->rmap.n,am = aij->A->rmap.n;
4696   PetscScalar    *ba = b->a;
4697 
4698   PetscInt       *rp1,*rp2,ii,nrow1,nrow2,_i,rmax1,rmax2,N,low1,high1,low2,high2,t,lastcol1,lastcol2;
4699   PetscInt       nonew = a->nonew;
4700   PetscScalar    *ap1,*ap2;
4701 
4702   PetscFunctionBegin;
4703   for (i=0; i<m; i++) {
4704     if (im[i] < 0) continue;
4705 #if defined(PETSC_USE_DEBUG)
4706     if (im[i] >= mat->rmap.N) SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"Row too large: row %D max %D",im[i],mat->rmap.N-1);
4707 #endif
4708     if (im[i] >= rstart && im[i] < rend) {
4709       row      = im[i] - rstart;
4710       lastcol1 = -1;
4711       rp1      = aj + ai[row];
4712       ap1      = aa + ai[row];
4713       rmax1    = aimax[row];
4714       nrow1    = ailen[row];
4715       low1     = 0;
4716       high1    = nrow1;
4717       lastcol2 = -1;
4718       rp2      = bj + bi[row];
4719       ap2      = ba + bi[row];
4720       rmax2    = bimax[row];
4721       nrow2    = bilen[row];
4722       low2     = 0;
4723       high2    = nrow2;
4724 
4725       for (j=0; j<n; j++) {
4726         if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
4727         if (ignorezeroentries && value == 0.0 && (addv == ADD_VALUES)) continue;
4728         if (in[j] >= cstart && in[j] < cend){
4729           col = in[j] - cstart;
4730           MatSetValues_SeqAIJ_A_Private(row,col,value,addv);
4731         } else if (in[j] < 0) continue;
4732 #if defined(PETSC_USE_DEBUG)
4733         else if (in[j] >= mat->cmap.N) {SETERRQ2(PETSC_ERR_ARG_OUTOFRANGE,"Column too large: col %D max %D",in[j],mat->cmap.N-1);}
4734 #endif
4735         else {
4736           if (mat->was_assembled) {
4737             if (!aij->colmap) {
4738               ierr = CreateColmap_MPIAIJ_Private(mat);CHKERRQ(ierr);
4739             }
4740 #if defined (PETSC_USE_CTABLE)
4741             ierr = PetscTableFind(aij->colmap,in[j]+1,&col);CHKERRQ(ierr);
4742             col--;
4743 #else
4744             col = aij->colmap[in[j]] - 1;
4745 #endif
4746             if (col < 0 && !((Mat_SeqAIJ*)(aij->A->data))->nonew) {
4747               ierr = DisAssemble_MPIAIJ(mat);CHKERRQ(ierr);
4748               col =  in[j];
4749               /* Reinitialize the variables required by MatSetValues_SeqAIJ_B_Private() */
4750               B = aij->B;
4751               b = (Mat_SeqAIJ*)B->data;
4752               bimax = b->imax; bi = b->i; bilen = b->ilen; bj = b->j;
4753               rp2      = bj + bi[row];
4754               ap2      = ba + bi[row];
4755               rmax2    = bimax[row];
4756               nrow2    = bilen[row];
4757               low2     = 0;
4758               high2    = nrow2;
4759               bm       = aij->B->rmap.n;
4760               ba = b->a;
4761             }
4762           } else col = in[j];
4763           MatSetValues_SeqAIJ_B_Private(row,col,value,addv);
4764         }
4765       }
4766     } else {
4767       if (!aij->donotstash) {
4768         if (roworiented) {
4769           if (ignorezeroentries && v[i*n] == 0.0) continue;
4770           ierr = MatStashValuesRow_Private(&mat->stash,im[i],n,in,v+i*n);CHKERRQ(ierr);
4771         } else {
4772           if (ignorezeroentries && v[i] == 0.0) continue;
4773           ierr = MatStashValuesCol_Private(&mat->stash,im[i],n,in,v+i,m);CHKERRQ(ierr);
4774         }
4775       }
4776     }
4777   }}
4778   PetscFunctionReturnVoid();
4779 }
4780 EXTERN_C_END
4781 
4782