xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision 639f9d9dbbc54d6ac4e42e98283c540b41bb2cee)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.27 1996/09/24 20:24:28 bsmith Exp bsmith $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
/*
     Local utility routine that creates a mapping from the global column
   number to the local number in the off-diagonal part of the local
   storage of the matrix.  This is done in a non-scalable way, since the
   length of colmap equals the global matrix length (in block columns).
*/
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
41   return 0;
42 }
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
66 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
67 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
68 {
69   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
70   Scalar      value;
71   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
72   int         cstart = baij->cstart, cend = baij->cend,row,col;
73   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
74   int         cstart_orig,cend_orig,bs=baij->bs;
75 
76   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
77     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
78   }
79   baij->insertmode = addv;
80   rstart_orig = rstart*bs;
81   rend_orig   = rend*bs;
82   cstart_orig = cstart*bs;
83   cend_orig   = cend*bs;
84   for ( i=0; i<m; i++ ) {
85 #if defined(PETSC_BOPT_g)
86     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
87     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
88 #endif
89     if (im[i] >= rstart_orig && im[i] < rend_orig) {
90       row = im[i] - rstart_orig;
91       for ( j=0; j<n; j++ ) {
92         if (in[j] >= cstart_orig && in[j] < cend_orig){
93           col = in[j] - cstart_orig;
94           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
95           ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
96         }
97 #if defined(PETSC_BOPT_g)
98         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");}
99         else if (in[j] >= baij->N) {SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");}
100 #endif
101         else {
102           if (mat->was_assembled) {
103             if (!baij->colmap) {
104               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
105             }
106             col = baij->colmap[in[j]/bs] - 1 + in[j]%bs;
107             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
108               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
109               col =  in[j];
110             }
111           }
112           else col = in[j];
113           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
114           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
115         }
116       }
117     }
118     else {
119       if (roworiented) {
120         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
121       }
122       else {
123         row = im[i];
124         for ( j=0; j<n; j++ ) {
125           ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
126         }
127       }
128     }
129   }
130   return 0;
131 }
132 
/*
   Retrieves values (by global point indices) from the matrix.  Only rows
   owned by this processor may be requested; columns may refer to either
   the diagonal (A) or off-diagonal (B) sequential block.
*/
static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  /* ownership ranges are kept in block units; scale by bs for point indices */
  int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
  int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;

  for ( i=0; i<m; i++ ) {
    if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
    if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
    if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
      row = idxm[i] - bsrstart;
      for ( j=0; j<n; j++ ) {
        if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
        if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
        if (idxn[j] >= bscstart && idxn[j] < bscend){
          /* column is in the diagonal block */
          col = idxn[j] - bscstart;
          ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
        }
        else {
          /* off-diagonal block: translate through colmap (built lazily) */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
          }
          /* a colmap entry of 0, or a garray entry that no longer matches,
             means the block column is not stored locally; the value is 0 */
          if((baij->colmap[idxn[j]/bs]-1 < 0) ||
             (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
          else {
            /* (local block column)*bs + offset within the block */
            col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
            ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
          }
        }
      }
    }
    else {
      SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
    }
  }
  return 0;
}
170 
171 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
172 {
173   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
174   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
175   int        ierr, i,bs2=baij->bs2;
176   double     sum = 0.0;
177   Scalar     *v;
178 
179   if (baij->size == 1) {
180     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
181   } else {
182     if (type == NORM_FROBENIUS) {
183       v = amat->a;
184       for (i=0; i<amat->nz*bs2; i++ ) {
185 #if defined(PETSC_COMPLEX)
186         sum += real(conj(*v)*(*v)); v++;
187 #else
188         sum += (*v)*(*v); v++;
189 #endif
190       }
191       v = bmat->a;
192       for (i=0; i<bmat->nz*bs2; i++ ) {
193 #if defined(PETSC_COMPLEX)
194         sum += real(conj(*v)*(*v)); v++;
195 #else
196         sum += (*v)*(*v); v++;
197 #endif
198       }
199       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
200       *norm = sqrt(*norm);
201     }
202     else
203       SETERRQ(1,"MatNorm_SeqBAIJ:No support for this norm yet");
204   }
205   return 0;
206 }
207 
/*
   Begins assembly: routes the stashed off-processor values to their owning
   processors using nonblocking MPI.  Each message is a sequence of
   (row, column, value) triples, all encoded as Scalars so that one MPI
   datatype suffices.  The posted requests are completed in
   MatAssemblyEnd_MPIBAIJ.
*/
static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
  int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE */
  MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
  if (addv == (ADD_VALUES|INSERT_VALUES)) {
    SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
  }
  baij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
  for ( i=0; i<baij->stash.n; i++ ) {
    idx = baij->stash.idx[i];
    /* rowners[] holds block-row ownership; compare point rows against
       owners[j]*bs */
    for ( j=0; j<size; j++ ) {
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
  nreceives = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack the triples; note starts[] is advanced during packing ... */
  for ( i=0; i<baij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
  }
  PetscFree(owner);
  /* ... so it must be rebuilt before issuing the sends */
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  PetscFree(starts); PetscFree(nprocs);

  /* Free cache space */
  PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
  ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);

  /* stash the communication state for MatAssemblyEnd_MPIBAIJ */
  baij->svalues    = svalues;    baij->rvalues    = rvalues;
  baij->nsends     = nsends;     baij->nrecvs     = nreceives;
  baij->send_waits = send_waits; baij->recv_waits = recv_waits;
  baij->rmax       = nmax;

  return 0;
}
306 
307 
/*
   Completes assembly: receives the (row, column, value) triples sent in
   MatAssemblyBegin_MPIBAIJ, inserts them into the local blocks, then
   assembles the two sequential matrices and (on the first final assembly)
   builds the scatter context used by the matrix-vector products.
*/
static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
  MPI_Status  *send_status,recv_status;
  int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
  int         bs=baij->bs,row,col,other_disassembled;
  Scalar      *values,val;
  InsertMode  addv = baij->insertmode;

  /*  wait on receives */
  while (count) {
    MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    values = baij->rvalues + 3*imdex*baij->rmax;
    MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
    n = n/3; /* each entry is a (row,col,value) triple of Scalars */
    for ( i=0; i<n; i++ ) {
      row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
      col = (int) PetscReal(values[3*i+1]);
      val = values[3*i+2];
      if (col >= baij->cstart*bs && col < baij->cend*bs) {
        /* column lies in the diagonal block */
        col -= baij->cstart*bs;
        /* NOTE(review): return codes of MatSetValues are not checked here */
        MatSetValues(baij->A,1,&row,1,&col,&val,addv);
      }
      else {
        if (mat->was_assembled) {
          /* map the global column to local off-diagonal numbering */
          if (!baij->colmap) {
            ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
          }
          col = (baij->colmap[col/bs]-1)*bs + col%bs;
          if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
            /* a new off-processor block column appeared: revert B to
               global numbering */
            ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
            col = (int) PetscReal(values[3*i+1]);
          }
        }
        MatSetValues(baij->B,1,&row,1,&col,&val,addv);
      }
    }
    count--;
  }
  PetscFree(baij->recv_waits); PetscFree(baij->rvalues);

  /* wait on sends */
  if (baij->nsends) {
    send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(baij->nsends,baij->send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(baij->send_waits); PetscFree(baij->svalues);

  baij->insertmode = NOT_SET_VALUES;
  ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);

  /* determine if any processor has disassembled, if so we must
     also disassemble ourselves, in order that we may reassemble. */
  MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
  if (mat->was_assembled && !other_disassembled) {
    ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
  }

  if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
    ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
  }
  ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
  ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);

  /* MatGetRow work arrays are invalid after reassembly */
  if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
  return 0;
}
379 
380 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
381 {
382   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
383   int          ierr;
384 
385   if (baij->size == 1) {
386     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
387   }
388   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
389   return 0;
390 }
391 
/*
   Views the matrix via an ASCII, draw, or Matlab viewer.  INFO formats
   print per-processor statistics; otherwise the matrix entries are shown,
   in parallel by gathering the whole matrix onto the first processor.
*/
static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
{
  Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
  int          ierr, format,rank,bs=baij->bs;
  FILE         *fd;
  ViewerType   vtype;

  ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
  if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
    /* NOTE(review): ierr from ViewerGetFormat/MatGetInfo below is assigned
       but not passed through CHKERRQ */
    ierr = ViewerGetFormat(viewer,&format);
    if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
      /* per-processor nonzero/memory statistics, printed in rank order */
      MatInfo info;
      MPI_Comm_rank(mat->comm,&rank);
      ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
      ierr = MatGetInfo(mat,MAT_LOCAL,&info);
      PetscSequentialPhaseBegin(mat->comm,1);
      fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
              rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
              baij->bs,(int)info.memory);
      ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
      fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
      fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
      fflush(fd);
      PetscSequentialPhaseEnd(mat->comm,1);
      ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
      return 0;
    }
    else if (format == VIEWER_FORMAT_ASCII_INFO) {
      return 0;
    }
  }

  if (vtype == DRAW_VIEWER) {
    /* nothing to draw for a null draw context */
    Draw       draw;
    PetscTruth isnull;
    ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
    ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
  }

  if (vtype == ASCII_FILE_VIEWER) {
    /* each processor prints its own pieces in rank order */
    ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
    PetscSequentialPhaseBegin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
           baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
            baij->cstart*bs,baij->cend*bs);
    ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
    fflush(fd);
    PetscSequentialPhaseEnd(mat->comm,1);
  }
  else {
    int size = baij->size;
    rank = baij->rank;
    if (size == 1) {
      ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat         A;
      Mat_SeqBAIJ *Aloc;
      int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
      int         mbs=baij->mbs;
      Scalar      *a;

      /* processor 0 owns all rows of the gathered copy; the others none */
      if (!rank) {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      else {
        ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
        CHKERRQ(ierr);
      }
      PLogObjectParent(mat,A);

      /* copy over the A part */
      Aloc = (Mat_SeqBAIJ*) baij->A->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart;
      rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);

      for ( i=0; i<mbs; i++ ) {
        /* rvals holds the bs consecutive point rows of block row i */
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          /* diagonal-part columns are local blocks, offset by cstart */
          col = (baij->cstart+aj[j])*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      /* copy over the B part */
      Aloc = (Mat_SeqBAIJ*) baij->B->data;
      ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = baij->rstart*bs;
      for ( i=0; i<mbs; i++ ) {
        rvals[0] = bs*(baij->rstart + i);
        for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
        for ( j=ai[i]; j<ai[i+1]; j++ ) {
          /* off-diagonal columns are translated to global via garray */
          col = baij->garray[aj[j]]*bs;
          for (k=0; k<bs; k++ ) {
            ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
            col++; a += bs;
          }
        }
      }
      PetscFree(rvals);
      ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
      /* only processor 0 holds data; view its local diagonal block */
      if (!rank) {
        ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
      }
      ierr = MatDestroy(A); CHKERRQ(ierr);
    }
  }
  return 0;
}
510 
511 
512 
513 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
514 {
515   Mat         mat = (Mat) obj;
516   int         ierr;
517   ViewerType  vtype;
518 
519   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
520   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
521       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
522     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
523   }
524   else if (vtype == BINARY_FILE_VIEWER) {
525     return MatView_MPIBAIJ_Binary(mat,viewer);
526   }
527   return 0;
528 }
529 
530 static int MatDestroy_MPIBAIJ(PetscObject obj)
531 {
532   Mat         mat = (Mat) obj;
533   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
534   int         ierr;
535 
536 #if defined(PETSC_LOG)
537   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
538 #endif
539 
540   PetscFree(baij->rowners);
541   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
542   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
543   if (baij->colmap) PetscFree(baij->colmap);
544   if (baij->garray) PetscFree(baij->garray);
545   if (baij->lvec)   VecDestroy(baij->lvec);
546   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
547   if (baij->rowvalues) PetscFree(baij->rowvalues);
548   PetscFree(baij);
549   PLogObjectDestroy(mat);
550   PetscHeaderDestroy(mat);
551   return 0;
552 }
553 
554 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
555 {
556   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
557   int         ierr, nt;
558 
559   VecGetLocalSize_Fast(xx,nt);
560   if (nt != a->n) {
561     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and xx");
562   }
563   VecGetLocalSize_Fast(yy,nt);
564   if (nt != a->m) {
565     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible parition of A and yy");
566   }
567   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
568   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
569   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
570   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
571   return 0;
572 }
573 
574 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
575 {
576   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
577   int        ierr;
578   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
579   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
580   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
581   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
582   return 0;
583 }
584 
585 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
586 {
587   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
588   int        ierr;
589 
590   /* do nondiagonal part */
591   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
592   /* send it on its way */
593   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
594   /* do local part */
595   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
596   /* receive remote parts: note this assumes the values are not actually */
597   /* inserted in yy until the next line, which is true for my implementation*/
598   /* but is not perhaps always true. */
599   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
600   return 0;
601 }
602 
603 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
604 {
605   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
606   int        ierr;
607 
608   /* do nondiagonal part */
609   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
610   /* send it on its way */
611   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
612   /* do local part */
613   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
614   /* receive remote parts: note this assumes the values are not actually */
615   /* inserted in yy until the next line, which is true for my implementation*/
616   /* but is not perhaps always true. */
617   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
618   return 0;
619 }
620 
621 /*
622   This only works correctly for square matrices where the subblock A->A is the
623    diagonal block
624 */
625 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
626 {
627   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
628   if (a->M != a->N)
629     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
630   return MatGetDiagonal(a->A,v);
631 }
632 
633 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
634 {
635   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
636   int        ierr;
637   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
638   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
639   return 0;
640 }
641 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
642 {
643   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
644   *m = mat->M; *n = mat->N;
645   return 0;
646 }
647 
648 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
649 {
650   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
651   *m = mat->m; *n = mat->N;
652   return 0;
653 }
654 
655 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
656 {
657   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
658   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
659   return 0;
660 }
661 
662 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
663 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
664 
/*
   Returns one (point) row of the matrix with global column numbering.
   The values and indices of the diagonal (A) and off-diagonal (B) blocks
   are merged, sorted by global column, into work arrays owned by the
   matrix; only one row may be outstanding at a time (see
   MatRestoreRow_MPIBAIJ).
*/
int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
{
  Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
  Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
  int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
  int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
  int        *cmap, *idx_p,cstart = mat->cstart;

  if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
  mat->getrowactive = PETSC_TRUE;

  if (!mat->rowvalues && (idx || v)) {
    /*
        allocate enough space to hold information from the longest row.
    */
    Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
    int     max = 1,mbs = mat->mbs,tmp;
    /* max = largest combined (A+B) block-nonzero count of any block row */
    for ( i=0; i<mbs; i++ ) {
      tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
      if (max < tmp) { max = tmp; }
    }
    /* single allocation holds the value array followed by the index array;
       sized by bs2, which over-provisions (a point row needs max*bs) */
    mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
    CHKPTRQ(mat->rowvalues);
    mat->rowindices = (int *) (mat->rowvalues + max*bs2);
  }


  if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows")
  lrow = row - brstart;

  /* request only the pieces (values/columns) the caller asked for */
  pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
  if (!v)   {pvA = 0; pvB = 0;}
  if (!idx) {pcA = 0; if (!v) pcB = 0;}
  ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  nztot = nzA + nzB;

  cmap  = mat->garray;
  if (v  || idx) {
    if (nztot) {
      /* Sort by increasing column numbers, assuming A and B already sorted */
      /* imark = number of B entries whose global column precedes cstart */
      int imark = -1;
      if (v) {
        *v = v_p = mat->rowvalues;
        for ( i=0; i<nzB; i++ ) {
          if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
          else break;
        }
        imark = i;
        for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
        for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
      }
      if (idx) {
        *idx = idx_p = mat->rowindices;
        if (imark > -1) {
          /* imark already determined by the value pass above */
          for ( i=0; i<imark; i++ ) {
            idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
          }
        } else {
          /* values not requested: find imark here */
          for ( i=0; i<nzB; i++ ) {
            if (cmap[cworkB[i]/bs] < cstart)
              idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
            else break;
          }
          imark = i;
        }
        /* A columns are local; shift by cstart*bs for global numbering */
        for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
        for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
      }
    }
    else {
      if (idx) *idx = 0;
      if (v)   *v   = 0;
    }
  }
  *nz = nztot;
  ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
  ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
  return 0;
}
745 
746 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
747 {
748   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
749   if (baij->getrowactive == PETSC_FALSE) {
750     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
751   }
752   baij->getrowactive = PETSC_FALSE;
753   return 0;
754 }
755 
756 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
757 {
758   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
759   *bs = baij->bs;
760   return 0;
761 }
762 
763 static int MatZeroEntries_MPIBAIJ(Mat A)
764 {
765   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
766   int         ierr;
767   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
768   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
769   return 0;
770 }
771 
772 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
773 {
774   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
775   Mat         A = a->A, B = a->B;
776   int         ierr;
777   double      isend[5], irecv[5];
778 
779   info->rows_global    = (double)a->M;
780   info->columns_global = (double)a->N;
781   info->rows_local     = (double)a->m;
782   info->columns_local  = (double)a->N;
783   info->block_size     = (double)a->bs;
784   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
785   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->memory;
786   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
787   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->memory;
788   if (flag == MAT_LOCAL) {
789     info->nz_used      = isend[0];
790     info->nz_allocated = isend[1];
791     info->nz_unneeded  = isend[2];
792     info->memory       = isend[3];
793     info->mallocs      = isend[4];
794   } else if (flag == MAT_GLOBAL_MAX) {
795     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_MAX,matin->comm);
796     info->nz_used      = irecv[0];
797     info->nz_allocated = irecv[1];
798     info->nz_unneeded  = irecv[2];
799     info->memory       = irecv[3];
800     info->mallocs      = irecv[4];
801   } else if (flag == MAT_GLOBAL_SUM) {
802     MPI_Allreduce(isend,irecv,3,MPI_INT,MPI_SUM,matin->comm);
803     info->nz_used      = irecv[0];
804     info->nz_allocated = irecv[1];
805     info->nz_unneeded  = irecv[2];
806     info->memory       = irecv[3];
807     info->mallocs      = irecv[4];
808   }
809   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
810   info->fill_ratio_needed = 0;
811   info->factor_mallocs    = 0;
812   return 0;
813 }
814 
815 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
816 {
817   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
818 
819   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
820       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
821       op == MAT_COLUMNS_SORTED ||
822       op == MAT_ROW_ORIENTED) {
823         MatSetOption(a->A,op);
824         MatSetOption(a->B,op);
825   }
826   else if (op == MAT_ROWS_SORTED ||
827            op == MAT_SYMMETRIC ||
828            op == MAT_STRUCTURALLY_SYMMETRIC ||
829            op == MAT_YES_NEW_DIAGONALS)
830     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
831   else if (op == MAT_COLUMN_ORIENTED) {
832     a->roworiented = 0;
833     MatSetOption(a->A,op);
834     MatSetOption(a->B,op);
835   }
836   else if (op == MAT_NO_NEW_DIAGONALS)
837     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
838   else
839     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
840   return 0;
841 }
842 
843 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
844 {
845   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
846   Mat_SeqBAIJ *Aloc;
847   Mat        B;
848   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
849   int        bs=baij->bs,mbs=baij->mbs;
850   Scalar     *a;
851 
852   if (matout == PETSC_NULL && M != N)
853     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
854   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
855   CHKERRQ(ierr);
856 
857   /* copy over the A part */
858   Aloc = (Mat_SeqBAIJ*) baij->A->data;
859   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
860   row = baij->rstart;
861   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
862 
863   for ( i=0; i<mbs; i++ ) {
864     rvals[0] = bs*(baij->rstart + i);
865     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
866     for ( j=ai[i]; j<ai[i+1]; j++ ) {
867       col = (baij->cstart+aj[j])*bs;
868       for (k=0; k<bs; k++ ) {
869         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
870         col++; a += bs;
871       }
872     }
873   }
874   /* copy over the B part */
875   Aloc = (Mat_SeqBAIJ*) baij->B->data;
876   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
877   row = baij->rstart*bs;
878   for ( i=0; i<mbs; i++ ) {
879     rvals[0] = bs*(baij->rstart + i);
880     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
881     for ( j=ai[i]; j<ai[i+1]; j++ ) {
882       col = baij->garray[aj[j]]*bs;
883       for (k=0; k<bs; k++ ) {
884         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
885         col++; a += bs;
886       }
887     }
888   }
889   PetscFree(rvals);
890   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
891   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
892 
893   if (matout != PETSC_NULL) {
894     *matout = B;
895   } else {
896     /* This isn't really an in-place transpose .... but free data structures from baij */
897     PetscFree(baij->rowners);
898     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
899     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
900     if (baij->colmap) PetscFree(baij->colmap);
901     if (baij->garray) PetscFree(baij->garray);
902     if (baij->lvec) VecDestroy(baij->lvec);
903     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
904     PetscFree(baij);
905     PetscMemcpy(A,B,sizeof(struct _Mat));
906     PetscHeaderDestroy(B);
907   }
908   return 0;
909 }
910 
911 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
912 {
913   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
914   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
915   int ierr,s1,s2,s3;
916 
917   if (ll)  {
918     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
919     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
920     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
921     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
922     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
923   }
924   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
925   return 0;
926 }
927 
928 /* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
930    This is a BUG. The only way to fix it seems to be to access
931    baij->A and baij->B directly and not through the MatZeroRows()
932    routine.
933 */
/*
   MatZeroRows_MPIBAIJ - Zeros the global rows listed in is, optionally
   placing *diag on the diagonal of each zeroed row.

   Each processor may list rows owned by any processor, so the indices are
   first routed to their owners: every processor counts how many indices go
   to each destination, the counts are exchanged with two MPI_Allreduce
   calls, receives are posted, the indices are packed and sent, and finally
   each processor zaps its now-local rows via MatZeroRows() on its
   sequential A (diagonal, gets *diag) and B (off-diagonal, gets 0) blocks.

   NOTE(review): row indices here are point (not block) indices; ownership
   is tested against owners[]*bs.  As the comment above the function warns,
   the diagonal insertion is only correct for square matrices with
   identical row and column ownership.
*/
static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
  int            *rvalues,tag = A->tag,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values,bs=l->bs;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  ierr = ISGetSize(is,&N); CHKERRQ(ierr);
  ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);

  /*  first count number of contributors to each processor */
  /* nprocs[j] = number of my indices owned by proc j; procs[j] = 0/1 flag */
  nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
  PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
  owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<size; j++ ) {
      /* owners[] holds block-row boundaries, hence the *bs scaling */
      if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
  }
  nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  /* column rank of the SUM reduce = how many messages I will receive;
     of the MAX reduce = the longest message destined for me */
  work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
  MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
  nrecvs = work[rank];
  MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
  nmax = work[rank];
  PetscFree(work);

  /* post receives:   */
  rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTRQ(rvalues);
  recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
  CHKPTRQ(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
  send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
  CHKPTRQ(send_waits);
  starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
  starts[0] = 0;
  for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack indices grouped by destination; this pass destroys starts[],
     which is why it is rebuilt just below */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<size; i++ ) {
    if (procs[i]) {
      MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
    }
  }
  PetscFree(starts);

  /* first local point row owned by this processor */
  base = owners[rank]*bs;

  /*  wait on receives */
  lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
  source = lens + nrecvs;
  count  = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  PetscFree(recv_waits);

  /* move the data into the send scatter */
  /* convert received global row indices to local indices */
  lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  PetscFree(rvalues); PetscFree(lens);
  PetscFree(owner); PetscFree(nprocs);

  /* actually zap the local rows */
  ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
  PLogObjectParent(A,istmp);
  PetscFree(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
  ierr = ISDestroy(istmp); CHKERRQ(ierr);

  /* wait on sends */
  if (nsends) {
    send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
    CHKPTRQ(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    PetscFree(send_status);
  }
  PetscFree(send_waits); PetscFree(svalues);

  return 0;
}
1056 extern int MatPrintHelp_SeqBAIJ(Mat);
1057 static int MatPrintHelp_MPIBAIJ(Mat A)
1058 {
1059   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1060 
1061   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1062   else return 0;
1063 }
1064 
1065 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1066 
1067 /* -------------------------------------------------------------------*/
/*
   Function-pointer table for the MPIBAIJ matrix type.  The entries are
   positional initializers for struct _MatOps; a 0 entry marks an operation
   with no MPIBAIJ implementation.
   NOTE(review): the meaning of each slot is fixed by the declaration order
   of struct _MatOps -- verify against the struct definition before
   inserting or reordering entries.
*/
static struct _MatOps MatOps = {
  MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
  MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
  0,0,0,0,
  0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
  0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
  MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
  MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
  0,0,0,MatGetSize_MPIBAIJ,
  MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
  0,0,0,MatConvertSameType_MPIBAIJ,0,0,
  0,0,0,MatGetSubMatrices_MPIBAIJ,
  MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
  MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1082 
1083 
1084 /*@C
1085    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1086    (block compressed row).  For good matrix assembly performance
1087    the user should preallocate the matrix storage by setting the parameters
1088    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1089    performance can be increased by more than a factor of 50.
1090 
1091    Input Parameters:
1092 .  comm - MPI communicator
.  bs   - size of block
1094 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1095            This value should be the same as the local size used in creating the
1096            y vector for the matrix-vector product y = Ax.
1097 .  n - number of local columns (or PETSC_DECIDE to have calculated if N is given)
1098            This value should be the same as the local size used in creating the
1099            x vector for the matrix-vector product y = Ax.
1100 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1101 .  N - number of global columns (or PETSC_DECIDE to have calculated if n is given)
1102 .  d_nz  - number of block nonzeros per block row in diagonal portion of local
1103            submatrix  (same for all local rows)
.  d_nnz - array containing the number of block nonzeros in the various block rows
           of the diagonal portion of the local submatrix (possibly different for
           each block row) or PETSC_NULL.  You must leave room for the diagonal
           entry even if it is zero.
1108 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of local
1109            submatrix (same for all local rows).
.  o_nnz - array containing the number of nonzeros in the various block rows of the
1111            off-diagonal portion of the local submatrix (possibly different for
1112            each block row) or PETSC_NULL.
1113 
1114    Output Parameter:
1115 .  A - the matrix
1116 
1117    Notes:
1118    The user MUST specify either the local or global matrix dimensions
1119    (possibly both).
1120 
1121    Storage Information:
1122    For a square global matrix we define each processor's diagonal portion
1123    to be its local rows and the corresponding columns (a square submatrix);
1124    each processor's off-diagonal portion encompasses the remainder of the
1125    local matrix (a rectangular submatrix).
1126 
1127    The user can specify preallocated storage for the diagonal part of
1128    the local submatrix with either d_nz or d_nnz (not both).  Set
1129    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1130    memory allocation.  Likewise, specify preallocated storage for the
1131    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1132 
1133    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1134    the figure below we depict these three local rows and all columns (0-11).
1135 
1136 $          0 1 2 3 4 5 6 7 8 9 10 11
1137 $         -------------------
1138 $  row 3  |  o o o d d d o o o o o o
1139 $  row 4  |  o o o d d d o o o o o o
1140 $  row 5  |  o o o d d d o o o o o o
1141 $         -------------------
1142 $
1143 
1144    Thus, any entries in the d locations are stored in the d (diagonal)
1145    submatrix, and any entries in the o locations are stored in the
1146    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1147    stored simply in the MATSEQBAIJ format for compressed row storage.
1148 
1149    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1150    and o_nz should indicate the number of nonzeros per row in the o matrix.
1151    In general, for PDE problems in which most nonzeros are near the diagonal,
1152    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1153    or you will get TERRIBLE performance; see the users' manual chapter on
1154    matrices and the file $(PETSC_DIR)/Performance.
1155 
1156 .keywords: matrix, block, aij, compressed row, sparse, parallel
1157 
1158 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1159 @*/
1160 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1161                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1162 {
1163   Mat          B;
1164   Mat_MPIBAIJ  *b;
1165   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1166 
1167   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1168   *A = 0;
1169   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1170   PLogObjectCreate(B);
1171   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1172   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1173   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1174   MPI_Comm_size(comm,&size);
1175   if (size == 1) {
1176     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1177     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1178     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1179     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1180     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1181     B->ops.solve             = MatSolve_MPIBAIJ;
1182     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1183     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1184     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1185     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1186   }
1187 
1188   B->destroy    = MatDestroy_MPIBAIJ;
1189   B->view       = MatView_MPIBAIJ;
1190 
1191   B->factor     = 0;
1192   B->assembled  = PETSC_FALSE;
1193 
1194   b->insertmode = NOT_SET_VALUES;
1195   MPI_Comm_rank(comm,&b->rank);
1196   MPI_Comm_size(comm,&b->size);
1197 
1198   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1199     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1200   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1201   if ( M == PETSC_DECIDE && n == PETSC_DECIDE)SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1202   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1203   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1204 
1205   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1206     work[0] = m; work[1] = n;
1207     mbs = m/bs; nbs = n/bs;
1208     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1209     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1210     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1211   }
1212   if (m == PETSC_DECIDE) {
1213     Mbs = M/bs;
1214     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1215     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1216     m   = mbs*bs;
1217   }
1218   if (n == PETSC_DECIDE) {
1219     Nbs = N/bs;
1220     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1221     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1222     n   = nbs*bs;
1223   }
1224   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1225 
1226   b->m = m; B->m = m;
1227   b->n = n; B->n = n;
1228   b->N = N; B->N = N;
1229   b->M = M; B->M = M;
1230   b->bs  = bs;
1231   b->bs2 = bs*bs;
1232   b->mbs = mbs;
1233   b->nbs = nbs;
1234   b->Mbs = Mbs;
1235   b->Nbs = Nbs;
1236 
1237   /* build local table of row and column ownerships */
1238   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1239   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1240   b->cowners = b->rowners + b->size + 2;
1241   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1242   b->rowners[0] = 0;
1243   for ( i=2; i<=b->size; i++ ) {
1244     b->rowners[i] += b->rowners[i-1];
1245   }
1246   b->rstart = b->rowners[b->rank];
1247   b->rend   = b->rowners[b->rank+1];
1248   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1249   b->cowners[0] = 0;
1250   for ( i=2; i<=b->size; i++ ) {
1251     b->cowners[i] += b->cowners[i-1];
1252   }
1253   b->cstart = b->cowners[b->rank];
1254   b->cend   = b->cowners[b->rank+1];
1255 
1256   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1257   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1258   PLogObjectParent(B,b->A);
1259   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1260   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1261   PLogObjectParent(B,b->B);
1262 
1263   /* build cache for off array entries formed */
1264   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1265   b->colmap      = 0;
1266   b->garray      = 0;
1267   b->roworiented = 1;
1268 
1269   /* stuff used for matrix vector multiply */
1270   b->lvec      = 0;
1271   b->Mvctx     = 0;
1272 
1273   /* stuff for MatGetRow() */
1274   b->rowindices   = 0;
1275   b->rowvalues    = 0;
1276   b->getrowactive = PETSC_FALSE;
1277 
1278   *A = B;
1279   return 0;
1280 }
/*
   MatConvertSameType_MPIBAIJ - Duplicates a parallel block AIJ matrix,
   copying the parallel layout, ownership tables, column map, garray, the
   local-vector/scatter communication structures, and the two sequential
   blocks (via MatConvert with MATSAME).

   NOTE(review): the cpvalues argument is never examined here; both
   sequential blocks are always converted with MATSAME -- confirm whether
   cpvalues was intended to be forwarded.
*/
static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
{
  Mat        mat;
  Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
  int        ierr, len=0, flg;

  *newmat       = 0;
  PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
  PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
  mat->destroy    = MatDestroy_MPIBAIJ;
  mat->view       = MatView_MPIBAIJ;
  mat->factor     = matin->factor;
  mat->assembled  = PETSC_TRUE;

  /* copy the scalar layout fields */
  a->m = mat->m   = oldmat->m;
  a->n = mat->n   = oldmat->n;
  a->M = mat->M   = oldmat->M;
  a->N = mat->N   = oldmat->N;

  a->bs  = oldmat->bs;
  a->bs2 = oldmat->bs2;
  a->mbs = oldmat->mbs;
  a->nbs = oldmat->nbs;
  a->Mbs = oldmat->Mbs;
  a->Nbs = oldmat->Nbs;

  a->rstart       = oldmat->rstart;
  a->rend         = oldmat->rend;
  a->cstart       = oldmat->cstart;
  a->cend         = oldmat->cend;
  a->size         = oldmat->size;
  a->rank         = oldmat->rank;
  a->insertmode   = NOT_SET_VALUES;
  a->rowvalues    = 0;
  a->getrowactive = PETSC_FALSE;

  /* rowners and cowners live in one allocation; copy both at once */
  a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
  PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
  a->cowners = a->rowners + a->size + 2;
  PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
  ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
  if (oldmat->colmap) {
    /* colmap has one slot per global block column */
    a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
    PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
    PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
  } else a->colmap = 0;
  if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
    a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
    PLogObjectMemory(mat,len*sizeof(int));
    PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
  } else a->garray = 0;

  /* duplicate the communication structures and the two sequential blocks */
  ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
  PLogObjectParent(mat,a->lvec);
  ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
  PLogObjectParent(mat,a->Mvctx);
  ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
  PLogObjectParent(mat,a->A);
  ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
  PLogObjectParent(mat,a->B);
  ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
  if (flg) {
    ierr = MatPrintHelp(mat); CHKERRQ(ierr);
  }
  *newmat = mat;
  return 0;
}
1350 
1351 #include "sys.h"
1352 
1353 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1354 {
1355   Mat          A;
1356   int          i, nz, ierr, j,rstart, rend, fd;
1357   Scalar       *vals,*buf;
1358   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1359   MPI_Status   status;
1360   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1361   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1362   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1363   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1364   int          dcount,kmax,k,nzcount,tmp;
1365 
1366 
1367   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1368   bs2  = bs*bs;
1369 
1370   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1371   if (!rank) {
1372     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1373     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1374     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1375   }
1376 
1377   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1378   M = header[1]; N = header[2];
1379 
1380   if (M != N) SETERRQ(1,"MatLoad_SeqBAIJ:Can only do square matrices");
1381 
1382   /*
1383      This code adds extra rows to make sure the number of rows is
1384      divisible by the blocksize
1385   */
1386   Mbs        = M/bs;
1387   extra_rows = bs - M + bs*(Mbs);
1388   if (extra_rows == bs) extra_rows = 0;
1389   else                  Mbs++;
1390   if (extra_rows &&!rank) {
1391     PLogInfo(0,"MatLoad_SeqBAIJ:Padding loaded matrix to match blocksize\n");
1392   }
1393 
1394   /* determine ownership of all rows */
1395   mbs = Mbs/size + ((Mbs % size) > rank);
1396   m   = mbs * bs;
1397   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1398   browners = rowners + size + 1;
1399   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1400   rowners[0] = 0;
1401   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1402   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1403   rstart = rowners[rank];
1404   rend   = rowners[rank+1];
1405 
1406   /* distribute row lengths to all processors */
1407   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1408   if (!rank) {
1409     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1410     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1411     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1412     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1413     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1414     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1415     PetscFree(sndcounts);
1416   }
1417   else {
1418     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1419   }
1420 
1421   if (!rank) {
1422     /* calculate the number of nonzeros on each processor */
1423     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1424     PetscMemzero(procsnz,size*sizeof(int));
1425     for ( i=0; i<size; i++ ) {
1426       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1427         procsnz[i] += rowlengths[j];
1428       }
1429     }
1430     PetscFree(rowlengths);
1431 
1432     /* determine max buffer needed and allocate it */
1433     maxnz = 0;
1434     for ( i=0; i<size; i++ ) {
1435       maxnz = PetscMax(maxnz,procsnz[i]);
1436     }
1437     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1438 
1439     /* read in my part of the matrix column indices  */
1440     nz = procsnz[0];
1441     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1442     mycols = ibuf;
1443     if (size == 1)  nz -= extra_rows;
1444     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1445     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1446 
1447     /* read in every ones (except the last) and ship off */
1448     for ( i=1; i<size-1; i++ ) {
1449       nz = procsnz[i];
1450       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1451       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1452     }
1453     /* read in the stuff for the last proc */
1454     if ( size != 1 ) {
1455       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1456       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1457       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1458       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1459     }
1460     PetscFree(cols);
1461   }
1462   else {
1463     /* determine buffer space needed for message */
1464     nz = 0;
1465     for ( i=0; i<m; i++ ) {
1466       nz += locrowlens[i];
1467     }
1468     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1469     mycols = ibuf;
1470     /* receive message of column indices*/
1471     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1472     MPI_Get_count(&status,MPI_INT,&maxnz);
1473     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1474   }
1475 
1476   /* loop over local rows, determining number of off diagonal entries */
1477   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1478   odlens = dlens + (rend-rstart);
1479   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1480   PetscMemzero(mask,3*Mbs*sizeof(int));
1481   masked1 = mask    + Mbs;
1482   masked2 = masked1 + Mbs;
1483   rowcount = 0; nzcount = 0;
1484   for ( i=0; i<mbs; i++ ) {
1485     dcount  = 0;
1486     odcount = 0;
1487     for ( j=0; j<bs; j++ ) {
1488       kmax = locrowlens[rowcount];
1489       for ( k=0; k<kmax; k++ ) {
1490         tmp = mycols[nzcount++]/bs;
1491         if (!mask[tmp]) {
1492           mask[tmp] = 1;
1493           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1494           else masked1[dcount++] = tmp;
1495         }
1496       }
1497       rowcount++;
1498     }
1499 
1500     dlens[i]  = dcount;
1501     odlens[i] = odcount;
1502 
1503     /* zero out the mask elements we set */
1504     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1505     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1506   }
1507 
1508   /* create our matrix */
1509   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1510          CHKERRQ(ierr);
1511   A = *newmat;
1512   MatSetOption(A,MAT_COLUMNS_SORTED);
1513 
1514   if (!rank) {
1515     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1516     /* read in my part of the matrix numerical values  */
1517     nz = procsnz[0];
1518     vals = buf;
1519     mycols = ibuf;
1520     if (size == 1)  nz -= extra_rows;
1521     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1522     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1523 
1524     /* insert into matrix */
1525     jj      = rstart*bs;
1526     for ( i=0; i<m; i++ ) {
1527       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1528       mycols += locrowlens[i];
1529       vals   += locrowlens[i];
1530       jj++;
1531     }
1532     /* read in other processors (except the last one) and ship out */
1533     for ( i=1; i<size-1; i++ ) {
1534       nz = procsnz[i];
1535       vals = buf;
1536       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1537       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1538     }
1539     /* the last proc */
1540     if ( size != 1 ){
1541       nz = procsnz[i] - extra_rows;
1542       vals = buf;
1543       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1544       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1545       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1546     }
1547     PetscFree(procsnz);
1548   }
1549   else {
1550     /* receive numeric values */
1551     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1552 
1553     /* receive message of values*/
1554     vals = buf;
1555     mycols = ibuf;
1556     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1557     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1558     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1559 
1560     /* insert into matrix */
1561     jj      = rstart*bs;
1562     for ( i=0; i<m; i++ ) {
1563       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1564       mycols += locrowlens[i];
1565       vals   += locrowlens[i];
1566       jj++;
1567     }
1568   }
1569   PetscFree(locrowlens);
1570   PetscFree(buf);
1571   PetscFree(ibuf);
1572   PetscFree(rowners);
1573   PetscFree(dlens);
1574   PetscFree(mask);
1575   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1576   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1577   return 0;
1578 }
1579 
1580 
1581