xref: /petsc/src/mat/impls/baij/mpi/mpibaij.c (revision aeafbbfc33d028db7008a5487c5a5ac8aec64109)
1 #ifndef lint
2 static char vcid[] = "$Id: mpibaij.c,v 1.30 1996/11/19 16:31:35 bsmith Exp curfman $";
3 #endif
4 
5 #include "src/mat/impls/baij/mpi/mpibaij.h"
6 #include "src/vec/vecimpl.h"
7 
8 #include "draw.h"
9 #include "pinclude/pviewer.h"
10 
11 extern int MatSetUpMultiply_MPIBAIJ(Mat);
12 extern int DisAssemble_MPIBAIJ(Mat);
13 extern int MatIncreaseOverlap_MPIBAIJ(Mat,int,IS *,int);
14 extern int MatGetSubMatrices_MPIBAIJ(Mat,int,IS *,IS *,MatGetSubMatrixCall,Mat **);
15 extern int MatLUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,Mat *);
16 extern int MatLUFactorNumeric_MPIBAIJ(Mat,Mat *);
17 extern int MatLUFactor_MPIBAIJ(Mat,IS,IS,double);
18 extern int MatSolve_MPIBAIJ(Mat,Vec,Vec);
19 extern int MatSolveAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
20 extern int MatSolveTrans_MPIBAIJ(Mat,Vec,Vec);
21 extern int MatSolveTransAdd_MPIBAIJ(Mat,Vec,Vec,Vec);
22 extern int MatILUFactorSymbolic_MPIBAIJ(Mat,IS,IS,double,int,Mat *);
23 
24 
25 /*
26      Local utility routine that creates a mapping from the global block column
27    number to the local block number in the off-diagonal part of the local
28    storage of the matrix.  This is done in a non-scalable way since the
29    length of colmap equals the number of global block columns.
30 */
31 static int CreateColmap_MPIBAIJ_Private(Mat mat)
32 {
33   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
34   Mat_SeqBAIJ *B = (Mat_SeqBAIJ*) baij->B->data;
35   int         nbs = B->nbs,i;
36 
37   baij->colmap = (int *) PetscMalloc(baij->Nbs*sizeof(int));CHKPTRQ(baij->colmap);
38   PLogObjectMemory(mat,baij->Nbs*sizeof(int));
39   PetscMemzero(baij->colmap,baij->Nbs*sizeof(int));
40   for ( i=0; i<nbs; i++ ) baij->colmap[baij->garray[i]] = i + 1;
41   return 0;
42 }
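/*
   A small worked example of the mapping built above (the numbers are purely
   illustrative): if the off-diagonal part references global block columns
   garray[] = {2,5,9}, then colmap[2] = 1, colmap[5] = 2, colmap[9] = 3 and all
   other entries of colmap remain 0.  A zero entry therefore means "this global
   block column is not present locally", while a nonzero entry is the local
   block column number plus one.
*/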
43 
44 static int MatGetRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
45                             PetscTruth *done)
46 {
47   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
48   int         ierr;
49   if (aij->size == 1) {
50     ierr = MatGetRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
51   } else SETERRQ(1,"MatGetRowIJ_MPIBAIJ:not supported in parallel");
52   return 0;
53 }
54 
55 static int MatRestoreRowIJ_MPIBAIJ(Mat mat,int shift,PetscTruth symmetric,int *n,int **ia,int **ja,
56                                 PetscTruth *done)
57 {
58   Mat_MPIBAIJ *aij = (Mat_MPIBAIJ *) mat->data;
59   int        ierr;
60   if (aij->size == 1) {
61     ierr = MatRestoreRowIJ(aij->A,shift,symmetric,n,ia,ja,done); CHKERRQ(ierr);
62   } else SETERRQ(1,"MatRestoreRowIJ_MPIBAIJ:not supported in parallel");
63   return 0;
64 }
65 
66 extern int MatSetValues_SeqBAIJ(Mat,int,int*,int,int*,Scalar*,InsertMode);
67 static int MatSetValues_MPIBAIJ(Mat mat,int m,int *im,int n,int *in,Scalar *v,InsertMode addv)
68 {
69   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
70   Scalar      value;
71   int         ierr,i,j, rstart = baij->rstart, rend = baij->rend;
72   int         cstart = baij->cstart, cend = baij->cend,row,col;
73   int         roworiented = baij->roworiented,rstart_orig,rend_orig;
74   int         cstart_orig,cend_orig,bs=baij->bs;
75 
76   if (baij->insertmode != NOT_SET_VALUES && baij->insertmode != addv) {
77     SETERRQ(1,"MatSetValues_MPIBAIJ:Cannot mix inserts and adds");
78   }
79   baij->insertmode = addv;
80   rstart_orig = rstart*bs;
81   rend_orig   = rend*bs;
82   cstart_orig = cstart*bs;
83   cend_orig   = cend*bs;
84   for ( i=0; i<m; i++ ) {
85 #if defined(PETSC_BOPT_g)
86     if (im[i] < 0) SETERRQ(1,"MatSetValues_MPIBAIJ:Negative row");
87     if (im[i] >= baij->M) SETERRQ(1,"MatSetValues_MPIBAIJ:Row too large");
88 #endif
89     if (im[i] >= rstart_orig && im[i] < rend_orig) {
90       row = im[i] - rstart_orig;
91       for ( j=0; j<n; j++ ) {
92         if (in[j] >= cstart_orig && in[j] < cend_orig){
93           col = in[j] - cstart_orig;
94           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
95           ierr = MatSetValues_SeqBAIJ(baij->A,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
96         }
97 #if defined(PETSC_BOPT_g)
98         else if (in[j] < 0) {SETERRQ(1,"MatSetValues_MPIBAIJ:Negative column");}
99         else if (in[j] >= baij->N) {SETERRQ(1,"MatSetValues_MPIBAIJ:Col too large");}
100 #endif
101         else {
102           if (mat->was_assembled) {
103             if (!baij->colmap) {
104               ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
105             }
106             col = (baij->colmap[in[j]/bs] - 1)*bs + in[j]%bs;
107             if (col < 0 && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
108               ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
109               col =  in[j];
110             }
111           }
112           else col = in[j];
113           if (roworiented) value = v[i*n+j]; else value = v[i+j*m];
114           ierr = MatSetValues_SeqBAIJ(baij->B,1,&row,1,&col,&value,addv);CHKERRQ(ierr);
115         }
116       }
117     }
118     else {
119       if (roworiented && !baij->donotstash) {
120         ierr = StashValues_Private(&baij->stash,im[i],n,in,v+i*n,addv);CHKERRQ(ierr);
121       }
122       else {
123         if (!baij->donotstash) {
124           row = im[i];
125 	  for ( j=0; j<n; j++ ) {
126 	    ierr = StashValues_Private(&baij->stash,row,1,in+j,v+i+j*m,addv);CHKERRQ(ierr);
127           }
128         }
129       }
130     }
131   }
132   return 0;
133 }
134 
135 static int MatGetValues_MPIBAIJ(Mat mat,int m,int *idxm,int n,int *idxn,Scalar *v)
136 {
137   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
138   int        bs=baij->bs,ierr,i,j, bsrstart = baij->rstart*bs, bsrend = baij->rend*bs;
139   int        bscstart = baij->cstart*bs, bscend = baij->cend*bs,row,col;
140 
141   for ( i=0; i<m; i++ ) {
142     if (idxm[i] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative row");
143     if (idxm[i] >= baij->M) SETERRQ(1,"MatGetValues_MPIBAIJ:Row too large");
144     if (idxm[i] >= bsrstart && idxm[i] < bsrend) {
145       row = idxm[i] - bsrstart;
146       for ( j=0; j<n; j++ ) {
147         if (idxn[j] < 0) SETERRQ(1,"MatGetValues_MPIBAIJ:Negative column");
148         if (idxn[j] >= baij->N) SETERRQ(1,"MatGetValues_MPIBAIJ:Col too large");
149         if (idxn[j] >= bscstart && idxn[j] < bscend){
150           col = idxn[j] - bscstart;
151           ierr = MatGetValues(baij->A,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
152         }
153         else {
154           if (!baij->colmap) {
155             ierr = CreateColmap_MPIBAIJ_Private(mat);CHKERRQ(ierr);
156           }
157           if((baij->colmap[idxn[j]/bs]-1 < 0) ||
158              (baij->garray[baij->colmap[idxn[j]/bs]-1] != idxn[j]/bs)) *(v+i*n+j) = 0.0;
159           else {
160             col  = (baij->colmap[idxn[j]/bs]-1)*bs + idxn[j]%bs;
161             ierr = MatGetValues(baij->B,1,&row,1,&col,v+i*n+j); CHKERRQ(ierr);
162           }
163         }
164       }
165     }
166     else {
167       SETERRQ(1,"MatGetValues_MPIBAIJ:Only local values currently supported");
168     }
169   }
170   return 0;
171 }
172 
173 static int MatNorm_MPIBAIJ(Mat mat,NormType type,double *norm)
174 {
175   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
176   Mat_SeqBAIJ *amat = (Mat_SeqBAIJ*) baij->A->data, *bmat = (Mat_SeqBAIJ*) baij->B->data;
177   int        ierr, i,bs2=baij->bs2;
178   double     sum = 0.0;
179   Scalar     *v;
180 
181   if (baij->size == 1) {
182     ierr =  MatNorm(baij->A,type,norm); CHKERRQ(ierr);
183   } else {
184     if (type == NORM_FROBENIUS) {
185       v = amat->a;
186       for (i=0; i<amat->nz*bs2; i++ ) {
187 #if defined(PETSC_COMPLEX)
188         sum += real(conj(*v)*(*v)); v++;
189 #else
190         sum += (*v)*(*v); v++;
191 #endif
192       }
193       v = bmat->a;
194       for (i=0; i<bmat->nz*bs2; i++ ) {
195 #if defined(PETSC_COMPLEX)
196         sum += real(conj(*v)*(*v)); v++;
197 #else
198         sum += (*v)*(*v); v++;
199 #endif
200       }
201       MPI_Allreduce(&sum,norm,1,MPI_DOUBLE,MPI_SUM,mat->comm);
202       *norm = sqrt(*norm);
203     }
204     else
205       SETERRQ(1,"MatNorm_MPIBAIJ:No support for this norm yet");
206   }
207   return 0;
208 }
209 
210 static int MatAssemblyBegin_MPIBAIJ(Mat mat,MatAssemblyType mode)
211 {
212   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
213   MPI_Comm    comm = mat->comm;
214   int         size = baij->size, *owners = baij->rowners,bs=baij->bs;
215   int         rank = baij->rank,tag = mat->tag, *owner,*starts,count,ierr;
216   MPI_Request *send_waits,*recv_waits;
217   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
218   InsertMode  addv;
219   Scalar      *rvalues,*svalues;
220 
221   /* make sure all processors are using the same insert mode (INSERT_VALUES or ADD_VALUES) */
222   MPI_Allreduce(&baij->insertmode,&addv,1,MPI_INT,MPI_BOR,comm);
223   if (addv == (ADD_VALUES|INSERT_VALUES)) {
224     SETERRQ(1,"MatAssemblyBegin_MPIBAIJ:Some processors inserted others added");
225   }
226   baij->insertmode = addv; /* in case this processor had no cache */
227 
228   /*  first count number of contributors to each processor */
229   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
230   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
231   owner = (int *) PetscMalloc( (baij->stash.n+1)*sizeof(int) ); CHKPTRQ(owner);
232   for ( i=0; i<baij->stash.n; i++ ) {
233     idx = baij->stash.idx[i];
234     for ( j=0; j<size; j++ ) {
235       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
236         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
237       }
238     }
239   }
240   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
241 
242   /* inform other processors of number of messages and max length*/
243   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
244   MPI_Allreduce(procs, work,size,MPI_INT,MPI_SUM,comm);
245   nreceives = work[rank];
246   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
247   nmax = work[rank];
248   PetscFree(work);
249 
250   /* post receives:
251        1) each message consists of ordered triples
252      (global row index, global column index, value); the indices are stored
253      as Scalars so that the entire message fits in one MPIU_SCALAR buffer.
254        2) since we do not know how long each individual message is, we
255      allocate the largest buffer any message could need for each receive.
256      Potentially this wastes a lot of space.
257 
258 
259        This could be done better.
260   */
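  /*
     A sketch of the resulting wire format (values purely illustrative): two
     stashed entries (row 3, col 7, value 1.5) and (row 4, col 2, value 2.0)
     destined for the same process are packed as the six Scalars
     {3.0, 7.0, 1.5, 4.0, 2.0, 2.0}, i.e. svalues[3*k] = row,
     svalues[3*k+1] = column, svalues[3*k+2] = value.
  */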
261   rvalues = (Scalar *) PetscMalloc(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
262   CHKPTRQ(rvalues);
263   recv_waits = (MPI_Request *) PetscMalloc((nreceives+1)*sizeof(MPI_Request));
264   CHKPTRQ(recv_waits);
265   for ( i=0; i<nreceives; i++ ) {
266     MPI_Irecv(rvalues+3*nmax*i,3*nmax,MPIU_SCALAR,MPI_ANY_SOURCE,tag,
267               comm,recv_waits+i);
268   }
269 
270   /* do sends:
271       1) starts[i] gives the starting index in svalues for stuff going to
272          the ith processor
273   */
274   svalues = (Scalar *) PetscMalloc(3*(baij->stash.n+1)*sizeof(Scalar));CHKPTRQ(svalues);
275   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
276   CHKPTRQ(send_waits);
277   starts = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(starts);
278   starts[0] = 0;
279   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
280   for ( i=0; i<baij->stash.n; i++ ) {
281     svalues[3*starts[owner[i]]]       = (Scalar)  baij->stash.idx[i];
282     svalues[3*starts[owner[i]]+1]     = (Scalar)  baij->stash.idy[i];
283     svalues[3*(starts[owner[i]]++)+2] =  baij->stash.array[i];
284   }
285   PetscFree(owner);
286   starts[0] = 0;
287   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
288   count = 0;
289   for ( i=0; i<size; i++ ) {
290     if (procs[i]) {
291       MPI_Isend(svalues+3*starts[i],3*nprocs[i],MPIU_SCALAR,i,tag,
292                 comm,send_waits+count++);
293     }
294   }
295   PetscFree(starts); PetscFree(nprocs);
296 
297   /* Free cache space */
298   PLogInfo(0,"[%d]MatAssemblyBegin_MPIBAIJ:Number of off processor values %d\n",rank,baij->stash.n);
299   ierr = StashDestroy_Private(&baij->stash); CHKERRQ(ierr);
300 
301   baij->svalues    = svalues;    baij->rvalues    = rvalues;
302   baij->nsends     = nsends;     baij->nrecvs     = nreceives;
303   baij->send_waits = send_waits; baij->recv_waits = recv_waits;
304   baij->rmax       = nmax;
305 
306   return 0;
307 }
308 
309 
310 static int MatAssemblyEnd_MPIBAIJ(Mat mat,MatAssemblyType mode)
311 {
312   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
313   MPI_Status  *send_status,recv_status;
314   int         imdex,nrecvs = baij->nrecvs, count = nrecvs, i, n, ierr;
315   int         bs=baij->bs,row,col,other_disassembled;
316   Scalar      *values,val;
317   InsertMode  addv = baij->insertmode;
318 
319   /*  wait on receives */
320   while (count) {
321     MPI_Waitany(nrecvs,baij->recv_waits,&imdex,&recv_status);
322     /* unpack receives into our local space */
323     values = baij->rvalues + 3*imdex*baij->rmax;
324     MPI_Get_count(&recv_status,MPIU_SCALAR,&n);
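    /* each stashed entry arrived as a (global row, global column, value) triple of Scalars */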
325     n = n/3;
326     for ( i=0; i<n; i++ ) {
327       row = (int) PetscReal(values[3*i]) - baij->rstart*bs;
328       col = (int) PetscReal(values[3*i+1]);
329       val = values[3*i+2];
330       if (col >= baij->cstart*bs && col < baij->cend*bs) {
331         col -= baij->cstart*bs;
332         MatSetValues(baij->A,1,&row,1,&col,&val,addv);
333       }
334       else {
335         if (mat->was_assembled) {
336           if (!baij->colmap) {
337             ierr = CreateColmap_MPIBAIJ_Private(mat); CHKERRQ(ierr);
338           }
339           col = (baij->colmap[col/bs]-1)*bs + col%bs;
340           if (col < 0  && !((Mat_SeqBAIJ*)(baij->A->data))->nonew) {
341             ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
342             col = (int) PetscReal(values[3*i+1]);
343           }
344         }
345         MatSetValues(baij->B,1,&row,1,&col,&val,addv);
346       }
347     }
348     count--;
349   }
350   PetscFree(baij->recv_waits); PetscFree(baij->rvalues);
351 
352   /* wait on sends */
353   if (baij->nsends) {
354     send_status = (MPI_Status *) PetscMalloc(baij->nsends*sizeof(MPI_Status));
355     CHKPTRQ(send_status);
356     MPI_Waitall(baij->nsends,baij->send_waits,send_status);
357     PetscFree(send_status);
358   }
359   PetscFree(baij->send_waits); PetscFree(baij->svalues);
360 
361   baij->insertmode = NOT_SET_VALUES;
362   ierr = MatAssemblyBegin(baij->A,mode); CHKERRQ(ierr);
363   ierr = MatAssemblyEnd(baij->A,mode); CHKERRQ(ierr);
364 
365   /* determine if any processor has disassembled; if so, we must
366      also disassemble ourselves so that we can reassemble. */
367   MPI_Allreduce(&mat->was_assembled,&other_disassembled,1,MPI_INT,MPI_PROD,mat->comm);
368   if (mat->was_assembled && !other_disassembled) {
369     ierr = DisAssemble_MPIBAIJ(mat); CHKERRQ(ierr);
370   }
371 
372   if (!mat->was_assembled && mode == MAT_FINAL_ASSEMBLY) {
373     ierr = MatSetUpMultiply_MPIBAIJ(mat); CHKERRQ(ierr);
374   }
375   ierr = MatAssemblyBegin(baij->B,mode); CHKERRQ(ierr);
376   ierr = MatAssemblyEnd(baij->B,mode); CHKERRQ(ierr);
377 
378   if (baij->rowvalues) {PetscFree(baij->rowvalues); baij->rowvalues = 0;}
379   return 0;
380 }
381 
382 static int MatView_MPIBAIJ_Binary(Mat mat,Viewer viewer)
383 {
384   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
385   int          ierr;
386 
387   if (baij->size == 1) {
388     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
389   }
390   else SETERRQ(1,"MatView_MPIBAIJ_Binary:Only uniprocessor output supported");
391   return 0;
392 }
393 
394 static int MatView_MPIBAIJ_ASCIIorDraworMatlab(Mat mat,Viewer viewer)
395 {
396   Mat_MPIBAIJ  *baij = (Mat_MPIBAIJ *) mat->data;
397   int          ierr, format,rank,bs = baij->bs;
398   FILE         *fd;
399   ViewerType   vtype;
400 
401   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
402   if (vtype  == ASCII_FILES_VIEWER || vtype == ASCII_FILE_VIEWER) {
403     ierr = ViewerGetFormat(viewer,&format);
404     if (format == VIEWER_FORMAT_ASCII_INFO_LONG) {
405       MatInfo info;
406       MPI_Comm_rank(mat->comm,&rank);
407       ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
408       ierr = MatGetInfo(mat,MAT_LOCAL,&info);
409       PetscSequentialPhaseBegin(mat->comm,1);
410       fprintf(fd,"[%d] Local rows %d nz %d nz alloced %d bs %d mem %d\n",
411               rank,baij->m,(int)info.nz_used*bs,(int)info.nz_allocated*bs,
412               baij->bs,(int)info.memory);
413       ierr = MatGetInfo(baij->A,MAT_LOCAL,&info);
414       fprintf(fd,"[%d] on-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
415       ierr = MatGetInfo(baij->B,MAT_LOCAL,&info);
416       fprintf(fd,"[%d] off-diagonal part: nz %d \n",rank,(int)info.nz_used*bs);
417       fflush(fd);
418       PetscSequentialPhaseEnd(mat->comm,1);
419       ierr = VecScatterView(baij->Mvctx,viewer); CHKERRQ(ierr);
420       return 0;
421     }
422     else if (format == VIEWER_FORMAT_ASCII_INFO) {
423       PetscPrintf(mat->comm,"  block size is %d\n",bs);
424       return 0;
425     }
426   }
427 
428   if (vtype == DRAW_VIEWER) {
429     Draw       draw;
430     PetscTruth isnull;
431     ierr = ViewerDrawGetDraw(viewer,&draw); CHKERRQ(ierr);
432     ierr = DrawIsNull(draw,&isnull); CHKERRQ(ierr); if (isnull) return 0;
433   }
434 
435   if (vtype == ASCII_FILE_VIEWER) {
436     ierr = ViewerASCIIGetPointer(viewer,&fd); CHKERRQ(ierr);
437     PetscSequentialPhaseBegin(mat->comm,1);
438     fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
439            baij->rank,baij->m,baij->rstart*bs,baij->rend*bs,baij->n,
440             baij->cstart*bs,baij->cend*bs);
441     ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
442     ierr = MatView(baij->B,viewer); CHKERRQ(ierr);
443     fflush(fd);
444     PetscSequentialPhaseEnd(mat->comm,1);
445   }
446   else {
447     int size = baij->size;
448     rank = baij->rank;
449     if (size == 1) {
450       ierr = MatView(baij->A,viewer); CHKERRQ(ierr);
451     }
452     else {
453       /* assemble the entire matrix onto first processor. */
454       Mat         A;
455       Mat_SeqBAIJ *Aloc;
456       int         M = baij->M, N = baij->N,*ai,*aj,row,col,i,j,k,*rvals;
457       int         mbs=baij->mbs;
458       Scalar      *a;
459 
460       if (!rank) {
461         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,M,N,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
462         CHKERRQ(ierr);
463       }
464       else {
465         ierr = MatCreateMPIBAIJ(mat->comm,baij->bs,0,0,M,N,0,PETSC_NULL,0,PETSC_NULL,&A);
466         CHKERRQ(ierr);
467       }
468       PLogObjectParent(mat,A);
469 
470       /* copy over the A part */
471       Aloc = (Mat_SeqBAIJ*) baij->A->data;
472       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
473       row = baij->rstart;
474       rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
475 
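      /* for block row i, rvals[] lists its bs consecutive global point-row indices */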
476       for ( i=0; i<mbs; i++ ) {
477         rvals[0] = bs*(baij->rstart + i);
478         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
479         for ( j=ai[i]; j<ai[i+1]; j++ ) {
480           col = (baij->cstart+aj[j])*bs;
481           for (k=0; k<bs; k++ ) {
482             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
483             col++; a += bs;
484           }
485         }
486       }
487       /* copy over the B part */
488       Aloc = (Mat_SeqBAIJ*) baij->B->data;
489       ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
490       row = baij->rstart*bs;
491       for ( i=0; i<mbs; i++ ) {
492         rvals[0] = bs*(baij->rstart + i);
493         for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
494         for ( j=ai[i]; j<ai[i+1]; j++ ) {
495           col = baij->garray[aj[j]]*bs;
496           for (k=0; k<bs; k++ ) {
497             ierr = MatSetValues(A,bs,rvals,1,&col,a,INSERT_VALUES);CHKERRQ(ierr);
498             col++; a += bs;
499           }
500         }
501       }
502       PetscFree(rvals);
503       ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
504       ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
505       if (!rank) {
506         ierr = MatView(((Mat_MPIBAIJ*)(A->data))->A,viewer); CHKERRQ(ierr);
507       }
508       ierr = MatDestroy(A); CHKERRQ(ierr);
509     }
510   }
511   return 0;
512 }
513 
514 
515 
516 static int MatView_MPIBAIJ(PetscObject obj,Viewer viewer)
517 {
518   Mat         mat = (Mat) obj;
519   int         ierr;
520   ViewerType  vtype;
521 
522   ierr = ViewerGetType(viewer,&vtype); CHKERRQ(ierr);
523   if (vtype == ASCII_FILE_VIEWER || vtype == ASCII_FILES_VIEWER ||
524       vtype == DRAW_VIEWER       || vtype == MATLAB_VIEWER) {
525     ierr = MatView_MPIBAIJ_ASCIIorDraworMatlab(mat,viewer); CHKERRQ(ierr);
526   }
527   else if (vtype == BINARY_FILE_VIEWER) {
528     return MatView_MPIBAIJ_Binary(mat,viewer);
529   }
530   return 0;
531 }
532 
533 static int MatDestroy_MPIBAIJ(PetscObject obj)
534 {
535   Mat         mat = (Mat) obj;
536   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
537   int         ierr;
538 
539 #if defined(PETSC_LOG)
540   PLogObjectState(obj,"Rows=%d, Cols=%d",baij->M,baij->N);
541 #endif
542 
543   PetscFree(baij->rowners);
544   ierr = MatDestroy(baij->A); CHKERRQ(ierr);
545   ierr = MatDestroy(baij->B); CHKERRQ(ierr);
546   if (baij->colmap) PetscFree(baij->colmap);
547   if (baij->garray) PetscFree(baij->garray);
548   if (baij->lvec)   VecDestroy(baij->lvec);
549   if (baij->Mvctx)  VecScatterDestroy(baij->Mvctx);
550   if (baij->rowvalues) PetscFree(baij->rowvalues);
551   PetscFree(baij);
552   if (mat->mapping) {
553     ierr = ISLocalToGlobalMappingDestroy(mat->mapping); CHKERRQ(ierr);
554   }
555   PLogObjectDestroy(mat);
556   PetscHeaderDestroy(mat);
557   return 0;
558 }
559 
560 static int MatMult_MPIBAIJ(Mat A,Vec xx,Vec yy)
561 {
562   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
563   int         ierr, nt;
564 
565   VecGetLocalSize_Fast(xx,nt);
566   if (nt != a->n) {
567     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and xx");
568   }
569   VecGetLocalSize_Fast(yy,nt);
570   if (nt != a->m) {
571     SETERRQ(1,"MatMult_MPIBAIJ:Incompatible partition of A and yy");
572   }
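  /* y = A*x_local + B*x_ghost: start the ghost-value scatter, overlap it with the
     diagonal-block product, then add in the off-diagonal contribution */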
573   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
574   ierr = (*a->A->ops.mult)(a->A,xx,yy); CHKERRQ(ierr);
575   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
576   ierr = (*a->B->ops.multadd)(a->B,a->lvec,yy,yy); CHKERRQ(ierr);
577   ierr = VecScatterPostRecvs(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
578   return 0;
579 }
580 
581 static int MatMultAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
582 {
583   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
584   int        ierr;
585   ierr = VecScatterBegin(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
586   ierr = (*a->A->ops.multadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
587   ierr = VecScatterEnd(xx,a->lvec,INSERT_VALUES,SCATTER_ALL,a->Mvctx);CHKERRQ(ierr);
588   ierr = (*a->B->ops.multadd)(a->B,a->lvec,zz,zz); CHKERRQ(ierr);
589   return 0;
590 }
591 
592 static int MatMultTrans_MPIBAIJ(Mat A,Vec xx,Vec yy)
593 {
594   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
595   int        ierr;
596 
597   /* do nondiagonal part */
598   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
599   /* send it on its way */
600   ierr = VecScatterBegin(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
601   /* do local part */
602   ierr = (*a->A->ops.multtrans)(a->A,xx,yy); CHKERRQ(ierr);
603   /* receive remote parts: note this assumes the values are not actually */
604   /* inserted into yy until the VecScatterEnd() below, which is true for the */
605   /* current implementation but may not always be true. */
606   ierr = VecScatterEnd(a->lvec,yy,ADD_VALUES,SCATTER_REVERSE,a->Mvctx);CHKERRQ(ierr);
607   return 0;
608 }
609 
610 static int MatMultTransAdd_MPIBAIJ(Mat A,Vec xx,Vec yy,Vec zz)
611 {
612   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
613   int        ierr;
614 
615   /* do nondiagonal part */
616   ierr = (*a->B->ops.multtrans)(a->B,xx,a->lvec); CHKERRQ(ierr);
617   /* send it on its way */
618   ierr = VecScatterBegin(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
619   /* do local part */
620   ierr = (*a->A->ops.multtransadd)(a->A,xx,yy,zz); CHKERRQ(ierr);
621   /* receive remote parts: note this assumes the values are not actually */
622   /* inserted into zz until the VecScatterEnd() below, which is true for the */
623   /* current implementation but may not always be true. */
624   ierr = VecScatterEnd(a->lvec,zz,ADD_VALUES,SCATTER_REVERSE,a->Mvctx); CHKERRQ(ierr);
625   return 0;
626 }
627 
628 /*
629   This only works correctly for square matrices where the subblock A->A is the
630    diagonal block
631 */
632 static int MatGetDiagonal_MPIBAIJ(Mat A,Vec v)
633 {
634   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
635   if (a->M != a->N)
636     SETERRQ(1,"MatGetDiagonal_MPIBAIJ:Supports only square matrix where A->A is diag block");
637   return MatGetDiagonal(a->A,v);
638 }
639 
640 static int MatScale_MPIBAIJ(Scalar *aa,Mat A)
641 {
642   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
643   int        ierr;
644   ierr = MatScale(aa,a->A); CHKERRQ(ierr);
645   ierr = MatScale(aa,a->B); CHKERRQ(ierr);
646   return 0;
647 }
648 static int MatGetSize_MPIBAIJ(Mat matin,int *m,int *n)
649 {
650   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
651   *m = mat->M; *n = mat->N;
652   return 0;
653 }
654 
655 static int MatGetLocalSize_MPIBAIJ(Mat matin,int *m,int *n)
656 {
657   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
658   *m = mat->m; *n = mat->n;
659   return 0;
660 }
661 
662 static int MatGetOwnershipRange_MPIBAIJ(Mat matin,int *m,int *n)
663 {
664   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
665   *m = mat->rstart*mat->bs; *n = mat->rend*mat->bs;
666   return 0;
667 }
668 
669 extern int MatGetRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
670 extern int MatRestoreRow_SeqBAIJ(Mat,int,int*,int**,Scalar**);
671 
672 int MatGetRow_MPIBAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
673 {
674   Mat_MPIBAIJ *mat = (Mat_MPIBAIJ *) matin->data;
675   Scalar     *vworkA, *vworkB, **pvA, **pvB,*v_p;
676   int        bs = mat->bs, bs2 = mat->bs2, i, ierr, *cworkA, *cworkB, **pcA, **pcB;
677   int        nztot, nzA, nzB, lrow, brstart = mat->rstart*bs, brend = mat->rend*bs;
678   int        *cmap, *idx_p,cstart = mat->cstart;
679 
680   if (mat->getrowactive == PETSC_TRUE) SETERRQ(1,"MatGetRow_MPIBAIJ:Already active");
681   mat->getrowactive = PETSC_TRUE;
682 
683   if (!mat->rowvalues && (idx || v)) {
684     /*
685         allocate enough space to hold information from the longest row.
686     */
687     Mat_SeqBAIJ *Aa = (Mat_SeqBAIJ *) mat->A->data,*Ba = (Mat_SeqBAIJ *) mat->B->data;
688     int     max = 1,mbs = mat->mbs,tmp;
689     for ( i=0; i<mbs; i++ ) {
690       tmp = Aa->i[i+1] - Aa->i[i] + Ba->i[i+1] - Ba->i[i];
691       if (max < tmp) { max = tmp; }
692     }
693     mat->rowvalues = (Scalar *) PetscMalloc( max*bs2*(sizeof(int)+sizeof(Scalar)));
694     CHKPTRQ(mat->rowvalues);
695     mat->rowindices = (int *) (mat->rowvalues + max*bs2);
696   }
697 
698 
699   if (row < brstart || row >= brend) SETERRQ(1,"MatGetRow_MPIBAIJ:Only local rows");
700   lrow = row - brstart;
701 
702   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
703   if (!v)   {pvA = 0; pvB = 0;}
704   if (!idx) {pcA = 0; if (!v) pcB = 0;}
705   ierr = (*mat->A->ops.getrow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
706   ierr = (*mat->B->ops.getrow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
707   nztot = nzA + nzB;
708 
709   cmap  = mat->garray;
710   if (v  || idx) {
711     if (nztot) {
712       /* Sort by increasing column numbers, assuming A and B already sorted */
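      /* the merged row is: B entries whose global block column lies below cstart,
         then all of A (the diagonal-block columns), then the remaining B entries;
         imark records where the low B entries end */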
713       int imark = -1;
714       if (v) {
715         *v = v_p = mat->rowvalues;
716         for ( i=0; i<nzB; i++ ) {
717           if (cmap[cworkB[i]/bs] < cstart)   v_p[i] = vworkB[i];
718           else break;
719         }
720         imark = i;
721         for ( i=0; i<nzA; i++ )     v_p[imark+i] = vworkA[i];
722         for ( i=imark; i<nzB; i++ ) v_p[nzA+i]   = vworkB[i];
723       }
724       if (idx) {
725         *idx = idx_p = mat->rowindices;
726         if (imark > -1) {
727           for ( i=0; i<imark; i++ ) {
728             idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs;
729           }
730         } else {
731           for ( i=0; i<nzB; i++ ) {
732             if (cmap[cworkB[i]/bs] < cstart)
733               idx_p[i] = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
734             else break;
735           }
736           imark = i;
737         }
738         for ( i=0; i<nzA; i++ )     idx_p[imark+i] = cstart*bs + cworkA[i];
739         for ( i=imark; i<nzB; i++ ) idx_p[nzA+i]   = cmap[cworkB[i]/bs]*bs + cworkB[i]%bs ;
740       }
741     }
742     else {
743       if (idx) *idx = 0;
744       if (v)   *v   = 0;
745     }
746   }
747   *nz = nztot;
748   ierr = (*mat->A->ops.restorerow)(mat->A,lrow,&nzA,pcA,pvA); CHKERRQ(ierr);
749   ierr = (*mat->B->ops.restorerow)(mat->B,lrow,&nzB,pcB,pvB); CHKERRQ(ierr);
750   return 0;
751 }
752 
753 int MatRestoreRow_MPIBAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
754 {
755   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
756   if (baij->getrowactive == PETSC_FALSE) {
757     SETERRQ(1,"MatRestoreRow_MPIBAIJ:MatGetRow not called");
758   }
759   baij->getrowactive = PETSC_FALSE;
760   return 0;
761 }
762 
763 static int MatGetBlockSize_MPIBAIJ(Mat mat,int *bs)
764 {
765   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) mat->data;
766   *bs = baij->bs;
767   return 0;
768 }
769 
770 static int MatZeroEntries_MPIBAIJ(Mat A)
771 {
772   Mat_MPIBAIJ *l = (Mat_MPIBAIJ *) A->data;
773   int         ierr;
774   ierr = MatZeroEntries(l->A); CHKERRQ(ierr);
775   ierr = MatZeroEntries(l->B); CHKERRQ(ierr);
776   return 0;
777 }
778 
779 static int MatGetInfo_MPIBAIJ(Mat matin,MatInfoType flag,MatInfo *info)
780 {
781   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) matin->data;
782   Mat         A = a->A, B = a->B;
783   int         ierr;
784   double      isend[5], irecv[5];
785 
786   info->rows_global    = (double)a->M;
787   info->columns_global = (double)a->N;
788   info->rows_local     = (double)a->m;
789   info->columns_local  = (double)a->N;
790   info->block_size     = (double)a->bs;
791   ierr = MatGetInfo(A,MAT_LOCAL,info); CHKERRQ(ierr);
792   isend[0] = info->nz_used; isend[1] = info->nz_allocated; isend[2] = info->nz_unneeded; isend[3] = info->memory; isend[4] = info->mallocs;
793   ierr = MatGetInfo(B,MAT_LOCAL,info); CHKERRQ(ierr);
794   isend[0] += info->nz_used; isend[1] += info->nz_allocated; isend[2] += info->nz_unneeded; isend[3] += info->memory; isend[4] += info->mallocs;
795   if (flag == MAT_LOCAL) {
796     info->nz_used      = isend[0];
797     info->nz_allocated = isend[1];
798     info->nz_unneeded  = isend[2];
799     info->memory       = isend[3];
800     info->mallocs      = isend[4];
801   } else if (flag == MAT_GLOBAL_MAX) {
802     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_MAX,matin->comm);
803     info->nz_used      = irecv[0];
804     info->nz_allocated = irecv[1];
805     info->nz_unneeded  = irecv[2];
806     info->memory       = irecv[3];
807     info->mallocs      = irecv[4];
808   } else if (flag == MAT_GLOBAL_SUM) {
809     MPI_Allreduce(isend,irecv,5,MPI_DOUBLE,MPI_SUM,matin->comm);
810     info->nz_used      = irecv[0];
811     info->nz_allocated = irecv[1];
812     info->nz_unneeded  = irecv[2];
813     info->memory       = irecv[3];
814     info->mallocs      = irecv[4];
815   }
816   info->fill_ratio_given  = 0; /* no parallel LU/ILU/Cholesky */
817   info->fill_ratio_needed = 0;
818   info->factor_mallocs    = 0;
819   return 0;
820 }
821 
822 static int MatSetOption_MPIBAIJ(Mat A,MatOption op)
823 {
824   Mat_MPIBAIJ *a = (Mat_MPIBAIJ *) A->data;
825 
826   if (op == MAT_NO_NEW_NONZERO_LOCATIONS ||
827       op == MAT_YES_NEW_NONZERO_LOCATIONS ||
828       op == MAT_COLUMNS_SORTED ||
829       op == MAT_ROW_ORIENTED) {
830         if (op == MAT_ROW_ORIENTED) a->roworiented = 1;
831         MatSetOption(a->A,op);
832         MatSetOption(a->B,op);
833   }
834   else if (op == MAT_ROWS_SORTED ||
835            op == MAT_SYMMETRIC ||
836            op == MAT_STRUCTURALLY_SYMMETRIC ||
837            op == MAT_YES_NEW_DIAGONALS)
838     PLogInfo(A,"Info:MatSetOption_MPIBAIJ:Option ignored\n");
839   else if (op == MAT_COLUMN_ORIENTED) {
840     a->roworiented = 0;
841     MatSetOption(a->A,op);
842     MatSetOption(a->B,op);
843   } else if (op == MAT_IGNORE_OFF_PROCESSOR_ENTRIES) {
844     a->donotstash = 1;
845   } else if (op == MAT_NO_NEW_DIAGONALS)
846     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:MAT_NO_NEW_DIAGONALS");}
847   else
848     {SETERRQ(PETSC_ERR_SUP,"MatSetOption_MPIBAIJ:unknown option");}
849   return 0;
850 }
851 
852 static int MatTranspose_MPIBAIJ(Mat A,Mat *matout)
853 {
854   Mat_MPIBAIJ *baij = (Mat_MPIBAIJ *) A->data;
855   Mat_SeqBAIJ *Aloc;
856   Mat        B;
857   int        ierr,M=baij->M,N=baij->N,*ai,*aj,row,i,*rvals,j,k,col;
858   int        bs=baij->bs,mbs=baij->mbs;
859   Scalar     *a;
860 
861   if (matout == PETSC_NULL && M != N)
862     SETERRQ(1,"MatTranspose_MPIBAIJ:Square matrix only for in-place");
863   ierr = MatCreateMPIBAIJ(A->comm,baij->bs,PETSC_DECIDE,PETSC_DECIDE,N,M,0,PETSC_NULL,0,PETSC_NULL,&B);
864   CHKERRQ(ierr);
865 
866   /* copy over the A part */
867   Aloc = (Mat_SeqBAIJ*) baij->A->data;
868   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
869   row = baij->rstart;
870   rvals = (int *) PetscMalloc(bs*sizeof(int)); CHKPTRQ(rvals);
871 
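  /* rvals[] lists the bs global point rows of block row i; passing them as the
     column indices in MatSetValues(B,1,&col,bs,rvals,...) inserts the values transposed */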
872   for ( i=0; i<mbs; i++ ) {
873     rvals[0] = bs*(baij->rstart + i);
874     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
875     for ( j=ai[i]; j<ai[i+1]; j++ ) {
876       col = (baij->cstart+aj[j])*bs;
877       for (k=0; k<bs; k++ ) {
878         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
879         col++; a += bs;
880       }
881     }
882   }
883   /* copy over the B part */
884   Aloc = (Mat_SeqBAIJ*) baij->B->data;
885   ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
886   row = baij->rstart*bs;
887   for ( i=0; i<mbs; i++ ) {
888     rvals[0] = bs*(baij->rstart + i);
889     for ( j=1; j<bs; j++ ) { rvals[j] = rvals[j-1] + 1; }
890     for ( j=ai[i]; j<ai[i+1]; j++ ) {
891       col = baij->garray[aj[j]]*bs;
892       for (k=0; k<bs; k++ ) {
893         ierr = MatSetValues(B,1,&col,bs,rvals,a,INSERT_VALUES);CHKERRQ(ierr);
894         col++; a += bs;
895       }
896     }
897   }
898   PetscFree(rvals);
899   ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
900   ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
901 
902   if (matout != PETSC_NULL) {
903     *matout = B;
904   } else {
905     /* This isn't really an in-place transpose, but we free the old data structures from baij */
906     PetscFree(baij->rowners);
907     ierr = MatDestroy(baij->A); CHKERRQ(ierr);
908     ierr = MatDestroy(baij->B); CHKERRQ(ierr);
909     if (baij->colmap) PetscFree(baij->colmap);
910     if (baij->garray) PetscFree(baij->garray);
911     if (baij->lvec) VecDestroy(baij->lvec);
912     if (baij->Mvctx) VecScatterDestroy(baij->Mvctx);
913     PetscFree(baij);
914     PetscMemcpy(A,B,sizeof(struct _Mat));
915     PetscHeaderDestroy(B);
916   }
917   return 0;
918 }
919 
920 int MatDiagonalScale_MPIBAIJ(Mat A,Vec ll,Vec rr)
921 {
922   Mat a = ((Mat_MPIBAIJ *) A->data)->A;
923   Mat b = ((Mat_MPIBAIJ *) A->data)->B;
924   int ierr,s1,s2,s3;
925 
926   if (ll)  {
927     ierr = VecGetLocalSize(ll,&s1); CHKERRQ(ierr);
928     ierr = MatGetLocalSize(A,&s2,&s3); CHKERRQ(ierr);
929     if (s1!=s2) SETERRQ(1,"MatDiagonalScale_MPIBAIJ: non-conforming local sizes");
930     ierr = MatDiagonalScale(a,ll,0); CHKERRQ(ierr);
931     ierr = MatDiagonalScale(b,ll,0); CHKERRQ(ierr);
932   }
933   if (rr) SETERRQ(1,"MatDiagonalScale_MPIBAIJ:not supported for right vector");
934   return 0;
935 }
936 
937 /* the code does not do the diagonal entries correctly unless the
938    matrix is square and the column and row ownerships are identical.
939    This is a BUG. The only way to fix it seems to be to access
940    baij->A and baij->B directly and not through the MatZeroRows()
941    routine.
942 */
943 static int MatZeroRows_MPIBAIJ(Mat A,IS is,Scalar *diag)
944 {
945   Mat_MPIBAIJ    *l = (Mat_MPIBAIJ *) A->data;
946   int            i,ierr,N, *rows,*owners = l->rowners,size = l->size;
947   int            *procs,*nprocs,j,found,idx,nsends,*work;
948   int            nmax,*svalues,*starts,*owner,nrecvs,rank = l->rank;
949   int            *rvalues,tag = A->tag,count,base,slen,n,*source;
950   int            *lens,imdex,*lrows,*values,bs=l->bs;
951   MPI_Comm       comm = A->comm;
952   MPI_Request    *send_waits,*recv_waits;
953   MPI_Status     recv_status,*send_status;
954   IS             istmp;
955 
956   ierr = ISGetSize(is,&N); CHKERRQ(ierr);
957   ierr = ISGetIndices(is,&rows); CHKERRQ(ierr);
958 
959   /*  first count number of contributors to each processor */
960   nprocs = (int *) PetscMalloc( 2*size*sizeof(int) ); CHKPTRQ(nprocs);
961   PetscMemzero(nprocs,2*size*sizeof(int)); procs = nprocs + size;
962   owner = (int *) PetscMalloc((N+1)*sizeof(int)); CHKPTRQ(owner); /* see note*/
963   for ( i=0; i<N; i++ ) {
964     idx = rows[i];
965     found = 0;
966     for ( j=0; j<size; j++ ) {
967       if (idx >= owners[j]*bs && idx < owners[j+1]*bs) {
968         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
969       }
970     }
971     if (!found) SETERRQ(1,"MatZeroRows_MPIBAIJ:Index out of range");
972   }
973   nsends = 0;  for ( i=0; i<size; i++ ) { nsends += procs[i];}
974 
975   /* inform other processors of number of messages and max length*/
976   work = (int *) PetscMalloc( size*sizeof(int) ); CHKPTRQ(work);
977   MPI_Allreduce( procs, work,size,MPI_INT,MPI_SUM,comm);
978   nrecvs = work[rank];
979   MPI_Allreduce( nprocs, work,size,MPI_INT,MPI_MAX,comm);
980   nmax = work[rank];
981   PetscFree(work);
982 
983   /* post receives:   */
984   rvalues = (int *) PetscMalloc((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
985   CHKPTRQ(rvalues);
986   recv_waits = (MPI_Request *) PetscMalloc((nrecvs+1)*sizeof(MPI_Request));
987   CHKPTRQ(recv_waits);
988   for ( i=0; i<nrecvs; i++ ) {
989     MPI_Irecv(rvalues+nmax*i,nmax,MPI_INT,MPI_ANY_SOURCE,tag,comm,recv_waits+i);
990   }
991 
992   /* do sends:
993       1) starts[i] gives the starting index in svalues for stuff going to
994          the ith processor
995   */
996   svalues = (int *) PetscMalloc( (N+1)*sizeof(int) ); CHKPTRQ(svalues);
997   send_waits = (MPI_Request *) PetscMalloc( (nsends+1)*sizeof(MPI_Request));
998   CHKPTRQ(send_waits);
999   starts = (int *) PetscMalloc( (size+1)*sizeof(int) ); CHKPTRQ(starts);
1000   starts[0] = 0;
1001   for ( i=1; i<size; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1002   for ( i=0; i<N; i++ ) {
1003     svalues[starts[owner[i]]++] = rows[i];
1004   }
1005   ISRestoreIndices(is,&rows);
1006 
1007   starts[0] = 0;
1008   for ( i=1; i<size+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
1009   count = 0;
1010   for ( i=0; i<size; i++ ) {
1011     if (procs[i]) {
1012       MPI_Isend(svalues+starts[i],nprocs[i],MPI_INT,i,tag,comm,send_waits+count++);
1013     }
1014   }
1015   PetscFree(starts);
1016 
1017   base = owners[rank]*bs;
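  /* base is the first global point row owned by this process; received global
     row indices are shifted by it to obtain local row indices */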
1018 
1019   /*  wait on receives */
1020   lens   = (int *) PetscMalloc( 2*(nrecvs+1)*sizeof(int) ); CHKPTRQ(lens);
1021   source = lens + nrecvs;
1022   count  = nrecvs; slen = 0;
1023   while (count) {
1024     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
1025     /* unpack receives into our local space */
1026     MPI_Get_count(&recv_status,MPI_INT,&n);
1027     source[imdex]  = recv_status.MPI_SOURCE;
1028     lens[imdex]  = n;
1029     slen += n;
1030     count--;
1031   }
1032   PetscFree(recv_waits);
1033 
1034   /* move the data into the send scatter */
1035   lrows = (int *) PetscMalloc( (slen+1)*sizeof(int) ); CHKPTRQ(lrows);
1036   count = 0;
1037   for ( i=0; i<nrecvs; i++ ) {
1038     values = rvalues + i*nmax;
1039     for ( j=0; j<lens[i]; j++ ) {
1040       lrows[count++] = values[j] - base;
1041     }
1042   }
1043   PetscFree(rvalues); PetscFree(lens);
1044   PetscFree(owner); PetscFree(nprocs);
1045 
1046   /* actually zap the local rows */
1047   ierr = ISCreateGeneral(MPI_COMM_SELF,slen,lrows,&istmp);CHKERRQ(ierr);
1048   PLogObjectParent(A,istmp);
1049   PetscFree(lrows);
1050   ierr = MatZeroRows(l->A,istmp,diag); CHKERRQ(ierr);
1051   ierr = MatZeroRows(l->B,istmp,0); CHKERRQ(ierr);
1052   ierr = ISDestroy(istmp); CHKERRQ(ierr);
1053 
1054   /* wait on sends */
1055   if (nsends) {
1056     send_status = (MPI_Status *) PetscMalloc(nsends*sizeof(MPI_Status));
1057     CHKPTRQ(send_status);
1058     MPI_Waitall(nsends,send_waits,send_status);
1059     PetscFree(send_status);
1060   }
1061   PetscFree(send_waits); PetscFree(svalues);
1062 
1063   return 0;
1064 }
1065 extern int MatPrintHelp_SeqBAIJ(Mat);
1066 static int MatPrintHelp_MPIBAIJ(Mat A)
1067 {
1068   Mat_MPIBAIJ *a   = (Mat_MPIBAIJ*) A->data;
1069 
1070   if (!a->rank) return MatPrintHelp_SeqBAIJ(a->A);
1071   else return 0;
1072 }
1073 
1074 static int MatConvertSameType_MPIBAIJ(Mat,Mat *,int);
1075 
1076 /* -------------------------------------------------------------------*/
1077 static struct _MatOps MatOps = {
1078   MatSetValues_MPIBAIJ,MatGetRow_MPIBAIJ,MatRestoreRow_MPIBAIJ,MatMult_MPIBAIJ,
1079   MatMultAdd_MPIBAIJ,MatMultTrans_MPIBAIJ,MatMultTransAdd_MPIBAIJ,0,
1080   0,0,0,0,
1081   0,0,MatTranspose_MPIBAIJ,MatGetInfo_MPIBAIJ,
1082   0,MatGetDiagonal_MPIBAIJ,MatDiagonalScale_MPIBAIJ,MatNorm_MPIBAIJ,
1083   MatAssemblyBegin_MPIBAIJ,MatAssemblyEnd_MPIBAIJ,0,MatSetOption_MPIBAIJ,
1084   MatZeroEntries_MPIBAIJ,MatZeroRows_MPIBAIJ,0,
1085   0,0,0,MatGetSize_MPIBAIJ,
1086   MatGetLocalSize_MPIBAIJ,MatGetOwnershipRange_MPIBAIJ,0,0,
1087   0,0,0,MatConvertSameType_MPIBAIJ,0,0,
1088   0,0,0,MatGetSubMatrices_MPIBAIJ,
1089   MatIncreaseOverlap_MPIBAIJ,MatGetValues_MPIBAIJ,0,MatPrintHelp_MPIBAIJ,
1090   MatScale_MPIBAIJ,0,0,0,MatGetBlockSize_MPIBAIJ};
1091 
1092 
1093 /*@C
1094    MatCreateMPIBAIJ - Creates a sparse parallel matrix in block AIJ format
1095    (block compressed row).  For good matrix assembly performance
1096    the user should preallocate the matrix storage by setting the parameters
1097    d_nz (or d_nnz) and o_nz (or o_nnz).  By setting these parameters accurately,
1098    performance can be increased by more than a factor of 50.
1099 
1100    Input Parameters:
1101 .  comm - MPI communicator
1102 .  bs   - size of block
1103 .  m - number of local rows (or PETSC_DECIDE to have it calculated if M is given)
1104            This value should be the same as the local size used in creating the
1105            y vector for the matrix-vector product y = Ax.
1106 .  n - number of local columns (or PETSC_DECIDE to have it calculated if N is given)
1107            This value should be the same as the local size used in creating the
1108            x vector for the matrix-vector product y = Ax.
1109 .  M - number of global rows (or PETSC_DECIDE to have it calculated if m is given)
1110 .  N - number of global columns (or PETSC_DECIDE to have it calculated if n is given)
1111 .  d_nz  - number of block nonzeros per block row in the diagonal portion of the local
1112            submatrix (same for all local block rows)
1113 .  d_nnz - array containing the number of block nonzeros in the various block rows
1114            in the diagonal portion of the local submatrix (possibly different for each
1115            block row) or PETSC_NULL.  You must leave room for the diagonal entry even if
1116            it is zero.
1117 .  o_nz  - number of block nonzeros per block row in the off-diagonal portion of the local
1118            submatrix (same for all local block rows).
1119 .  o_nnz - array containing the number of block nonzeros in the various block rows of the
1120            off-diagonal portion of the local submatrix (possibly different for
1121            each block row) or PETSC_NULL.
1122 
1123    Output Parameter:
1124 .  A - the matrix
1125 
1126    Notes:
1127    The user MUST specify either the local or global matrix dimensions
1128    (possibly both).
1129 
1130    Storage Information:
1131    For a square global matrix we define each processor's diagonal portion
1132    to be its local rows and the corresponding columns (a square submatrix);
1133    each processor's off-diagonal portion encompasses the remainder of the
1134    local matrix (a rectangular submatrix).
1135 
1136    The user can specify preallocated storage for the diagonal part of
1137    the local submatrix with either d_nz or d_nnz (not both).  Set
1138    d_nz=PETSC_DEFAULT and d_nnz=PETSC_NULL for PETSc to control dynamic
1139    memory allocation.  Likewise, specify preallocated storage for the
1140    off-diagonal part of the local submatrix with o_nz or o_nnz (not both).
1141 
1142    Consider a processor that owns rows 3, 4 and 5 of a parallel matrix. In
1143    the figure below we depict these three local rows and all columns (0-11).
1144 
1145 $          0 1 2 3 4 5 6 7 8 9 10 11
1146 $         -------------------
1147 $  row 3  |  o o o d d d o o o o o o
1148 $  row 4  |  o o o d d d o o o o o o
1149 $  row 5  |  o o o d d d o o o o o o
1150 $         -------------------
1151 $
1152 
1153    Thus, any entries in the d locations are stored in the d (diagonal)
1154    submatrix, and any entries in the o locations are stored in the
1155    o (off-diagonal) submatrix.  Note that the d and the o submatrices are
1156    stored simply in the MATSEQBAIJ format for compressed row storage.
1157 
1158    Now d_nz should indicate the number of nonzeros per row in the d matrix,
1159    and o_nz should indicate the number of nonzeros per row in the o matrix.
1160    In general, for PDE problems in which most nonzeros are near the diagonal,
1161    one expects d_nz >> o_nz.   For large problems you MUST preallocate memory
1162    or you will get TERRIBLE performance; see the users' manual chapter on
1163    matrices and the file $(PETSC_DIR)/Performance.
1164 
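   Example:
   A minimal usage sketch (the sizes and preallocation counts below are
   illustrative only, not taken from any particular application):

$      Mat A;
$      int ierr, bs = 2, m = 6, n = 6;    /* three 2x2 block rows/columns per process */
$      ierr = MatCreateMPIBAIJ(MPI_COMM_WORLD,bs,m,n,PETSC_DECIDE,PETSC_DECIDE,
$                              3,PETSC_NULL,1,PETSC_NULL,&A); CHKERRQ(ierr);
$      /* ... MatSetValues(), MatAssemblyBegin(), MatAssemblyEnd() ... */
$      ierr = MatDestroy(A); CHKERRQ(ierr);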
1165 .keywords: matrix, block, aij, compressed row, sparse, parallel
1166 
1167 .seealso: MatCreate(), MatCreateSeqBAIJ(), MatSetValues()
1168 @*/
1169 int MatCreateMPIBAIJ(MPI_Comm comm,int bs,int m,int n,int M,int N,
1170                     int d_nz,int *d_nnz,int o_nz,int *o_nnz,Mat *A)
1171 {
1172   Mat          B;
1173   Mat_MPIBAIJ  *b;
1174   int          ierr, i,sum[2],work[2],mbs,nbs,Mbs=PETSC_DECIDE,Nbs=PETSC_DECIDE,size;
1175 
1176   if (bs < 1) SETERRQ(1,"MatCreateMPIBAIJ: invalid block size specified");
1177   *A = 0;
1178   PetscHeaderCreate(B,_Mat,MAT_COOKIE,MATMPIBAIJ,comm);
1179   PLogObjectCreate(B);
1180   B->data       = (void *) (b = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(b);
1181   PetscMemzero(b,sizeof(Mat_MPIBAIJ));
1182   PetscMemcpy(&B->ops,&MatOps,sizeof(struct _MatOps));
1183   MPI_Comm_size(comm,&size);
1184   if (size == 1) {
1185     B->ops.getrowij          = MatGetRowIJ_MPIBAIJ;
1186     B->ops.restorerowij      = MatRestoreRowIJ_MPIBAIJ;
1187     B->ops.lufactorsymbolic  = MatLUFactorSymbolic_MPIBAIJ;
1188     B->ops.lufactornumeric   = MatLUFactorNumeric_MPIBAIJ;
1189     B->ops.lufactor          = MatLUFactor_MPIBAIJ;
1190     B->ops.solve             = MatSolve_MPIBAIJ;
1191     B->ops.solveadd          = MatSolveAdd_MPIBAIJ;
1192     B->ops.solvetrans        = MatSolveTrans_MPIBAIJ;
1193     B->ops.solvetransadd     = MatSolveTransAdd_MPIBAIJ;
1194     B->ops.ilufactorsymbolic = MatILUFactorSymbolic_MPIBAIJ;
1195   }
1196 
1197   B->destroy    = MatDestroy_MPIBAIJ;
1198   B->view       = MatView_MPIBAIJ;
1199   B->mapping    = 0;
1200   B->factor     = 0;
1201   B->assembled  = PETSC_FALSE;
1202 
1203   b->insertmode = NOT_SET_VALUES;
1204   MPI_Comm_rank(comm,&b->rank);
1205   MPI_Comm_size(comm,&b->size);
1206 
1207   if ( m == PETSC_DECIDE && (d_nnz != PETSC_NULL || o_nnz != PETSC_NULL))
1208     SETERRQ(1,"MatCreateMPIBAIJ:Cannot have PETSC_DECIDE rows but set d_nnz or o_nnz");
1209   if ( M == PETSC_DECIDE && m == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either M or m should be specified");
1210   if ( N == PETSC_DECIDE && n == PETSC_DECIDE) SETERRQ(1,"MatCreateMPIBAIJ: either N or n should be specified");
1211   if ( M != PETSC_DECIDE && m != PETSC_DECIDE) M = PETSC_DECIDE;
1212   if ( N != PETSC_DECIDE && n != PETSC_DECIDE) N = PETSC_DECIDE;
1213 
1214   if (M == PETSC_DECIDE || N == PETSC_DECIDE) {
1215     work[0] = m; work[1] = n;
1216     mbs = m/bs; nbs = n/bs;
1217     MPI_Allreduce( work, sum,2,MPI_INT,MPI_SUM,comm );
1218     if (M == PETSC_DECIDE) {M = sum[0]; Mbs = M/bs;}
1219     if (N == PETSC_DECIDE) {N = sum[1]; Nbs = N/bs;}
1220   }
1221   if (m == PETSC_DECIDE) {
1222     Mbs = M/bs;
1223     if (Mbs*bs != M) SETERRQ(1,"MatCreateMPIBAIJ: No of global rows must be divisible by blocksize");
1224     mbs = Mbs/b->size + ((Mbs % b->size) > b->rank);
1225     m   = mbs*bs;
1226   }
1227   if (n == PETSC_DECIDE) {
1228     Nbs = N/bs;
1229     if (Nbs*bs != N) SETERRQ(1,"MatCreateMPIBAIJ: No of global cols must be divisible by blocksize");
1230     nbs = Nbs/b->size + ((Nbs % b->size) > b->rank);
1231     n   = nbs*bs;
1232   }
1233   if (mbs*bs != m || nbs*bs != n) SETERRQ(1,"MatCreateMPIBAIJ: No of local rows, cols must be divisible by blocksize");
1234 
1235   b->m = m; B->m = m;
1236   b->n = n; B->n = n;
1237   b->N = N; B->N = N;
1238   b->M = M; B->M = M;
1239   b->bs  = bs;
1240   b->bs2 = bs*bs;
1241   b->mbs = mbs;
1242   b->nbs = nbs;
1243   b->Mbs = Mbs;
1244   b->Nbs = Nbs;
1245 
1246   /* build local table of row and column ownerships */
1247   b->rowners = (int *) PetscMalloc(2*(b->size+2)*sizeof(int)); CHKPTRQ(b->rowners);
1248   PLogObjectMemory(B,2*(b->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1249   b->cowners = b->rowners + b->size + 2;
1250   MPI_Allgather(&mbs,1,MPI_INT,b->rowners+1,1,MPI_INT,comm);
1251   b->rowners[0] = 0;
1252   for ( i=2; i<=b->size; i++ ) {
1253     b->rowners[i] += b->rowners[i-1];
1254   }
1255   b->rstart = b->rowners[b->rank];
1256   b->rend   = b->rowners[b->rank+1];
1257   MPI_Allgather(&nbs,1,MPI_INT,b->cowners+1,1,MPI_INT,comm);
1258   b->cowners[0] = 0;
1259   for ( i=2; i<=b->size; i++ ) {
1260     b->cowners[i] += b->cowners[i-1];
1261   }
1262   b->cstart = b->cowners[b->rank];
1263   b->cend   = b->cowners[b->rank+1];
1264 
1265   if (d_nz == PETSC_DEFAULT) d_nz = 5;
1266   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,n,d_nz,d_nnz,&b->A); CHKERRQ(ierr);
1267   PLogObjectParent(B,b->A);
1268   if (o_nz == PETSC_DEFAULT) o_nz = 0;
1269   ierr = MatCreateSeqBAIJ(MPI_COMM_SELF,bs,m,N,o_nz,o_nnz,&b->B); CHKERRQ(ierr);
1270   PLogObjectParent(B,b->B);
1271 
1272   /* build cache for off array entries formed */
1273   ierr = StashBuild_Private(&b->stash); CHKERRQ(ierr);
1274   b->donotstash  = 0;
1275   b->colmap      = 0;
1276   b->garray      = 0;
1277   b->roworiented = 1;
1278 
1279   /* stuff used for matrix vector multiply */
1280   b->lvec      = 0;
1281   b->Mvctx     = 0;
1282 
1283   /* stuff for MatGetRow() */
1284   b->rowindices   = 0;
1285   b->rowvalues    = 0;
1286   b->getrowactive = PETSC_FALSE;
1287 
1288   *A = B;
1289   return 0;
1290 }
1291 static int MatConvertSameType_MPIBAIJ(Mat matin,Mat *newmat,int cpvalues)
1292 {
1293   Mat        mat;
1294   Mat_MPIBAIJ *a,*oldmat = (Mat_MPIBAIJ *) matin->data;
1295   int        ierr, len=0, flg;
1296 
1297   *newmat       = 0;
1298   PetscHeaderCreate(mat,_Mat,MAT_COOKIE,MATMPIBAIJ,matin->comm);
1299   PLogObjectCreate(mat);
1300   mat->data       = (void *) (a = PetscNew(Mat_MPIBAIJ)); CHKPTRQ(a);
1301   PetscMemcpy(&mat->ops,&MatOps,sizeof(struct _MatOps));
1302   mat->destroy    = MatDestroy_MPIBAIJ;
1303   mat->view       = MatView_MPIBAIJ;
1304   mat->factor     = matin->factor;
1305   mat->assembled  = PETSC_TRUE;
1306 
1307   a->m = mat->m   = oldmat->m;
1308   a->n = mat->n   = oldmat->n;
1309   a->M = mat->M   = oldmat->M;
1310   a->N = mat->N   = oldmat->N;
1311 
1312   a->bs  = oldmat->bs;
1313   a->bs2 = oldmat->bs2;
1314   a->mbs = oldmat->mbs;
1315   a->nbs = oldmat->nbs;
1316   a->Mbs = oldmat->Mbs;
1317   a->Nbs = oldmat->Nbs;
1318 
1319   a->rstart       = oldmat->rstart;
1320   a->rend         = oldmat->rend;
1321   a->cstart       = oldmat->cstart;
1322   a->cend         = oldmat->cend;
1323   a->size         = oldmat->size;
1324   a->rank         = oldmat->rank;
1325   a->insertmode   = NOT_SET_VALUES;
1326   a->rowvalues    = 0;
1327   a->getrowactive = PETSC_FALSE;
1328 
1329   a->rowners = (int *) PetscMalloc(2*(a->size+2)*sizeof(int)); CHKPTRQ(a->rowners);
1330   PLogObjectMemory(mat,2*(a->size+2)*sizeof(int)+sizeof(struct _Mat)+sizeof(Mat_MPIBAIJ));
1331   a->cowners = a->rowners + a->size + 2;
1332   PetscMemcpy(a->rowners,oldmat->rowners,2*(a->size+2)*sizeof(int));
1333   ierr = StashInitialize_Private(&a->stash); CHKERRQ(ierr);
1334   if (oldmat->colmap) {
1335     a->colmap = (int *) PetscMalloc((a->Nbs)*sizeof(int));CHKPTRQ(a->colmap);
1336     PLogObjectMemory(mat,(a->Nbs)*sizeof(int));
1337     PetscMemcpy(a->colmap,oldmat->colmap,(a->Nbs)*sizeof(int));
1338   } else a->colmap = 0;
1339   if (oldmat->garray && (len = ((Mat_SeqBAIJ *) (oldmat->B->data))->nbs)) {
1340     a->garray = (int *) PetscMalloc(len*sizeof(int)); CHKPTRQ(a->garray);
1341     PLogObjectMemory(mat,len*sizeof(int));
1342     PetscMemcpy(a->garray,oldmat->garray,len*sizeof(int));
1343   } else a->garray = 0;
1344 
1345   ierr =  VecDuplicate(oldmat->lvec,&a->lvec); CHKERRQ(ierr);
1346   PLogObjectParent(mat,a->lvec);
1347   ierr =  VecScatterCopy(oldmat->Mvctx,&a->Mvctx); CHKERRQ(ierr);
1348   PLogObjectParent(mat,a->Mvctx);
1349   ierr =  MatConvert(oldmat->A,MATSAME,&a->A); CHKERRQ(ierr);
1350   PLogObjectParent(mat,a->A);
1351   ierr =  MatConvert(oldmat->B,MATSAME,&a->B); CHKERRQ(ierr);
1352   PLogObjectParent(mat,a->B);
1353   ierr = OptionsHasName(PETSC_NULL,"-help",&flg); CHKERRQ(ierr);
1354   if (flg) {
1355     ierr = MatPrintHelp(mat); CHKERRQ(ierr);
1356   }
1357   *newmat = mat;
1358   return 0;
1359 }
1360 
1361 #include "sys.h"
1362 
1363 int MatLoad_MPIBAIJ(Viewer viewer,MatType type,Mat *newmat)
1364 {
1365   Mat          A;
1366   int          i, nz, ierr, j,rstart, rend, fd;
1367   Scalar       *vals,*buf;
1368   MPI_Comm     comm = ((PetscObject)viewer)->comm;
1369   MPI_Status   status;
1370   int          header[4],rank,size,*rowlengths = 0,M,N,m,*rowners,*browners,maxnz,*cols;
1371   int          *locrowlens,*sndcounts = 0,*procsnz = 0, jj,*mycols,*ibuf;
1372   int          flg,tag = ((PetscObject)viewer)->tag,bs=1,bs2,Mbs,mbs,extra_rows;
1373   int          *dlens,*odlens,*mask,*masked1,*masked2,rowcount,odcount;
1374   int          dcount,kmax,k,nzcount,tmp;
1375 
1376 
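  /* the block size for the loaded matrix is taken from the runtime option
     -matload_block_size; it defaults to 1 */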
1377   ierr = OptionsGetInt(PETSC_NULL,"-matload_block_size",&bs,&flg);CHKERRQ(ierr);
1378   bs2  = bs*bs;
1379 
1380   MPI_Comm_size(comm,&size); MPI_Comm_rank(comm,&rank);
1381   if (!rank) {
1382     ierr = ViewerBinaryGetDescriptor(viewer,&fd); CHKERRQ(ierr);
1383     ierr = PetscBinaryRead(fd,(char *)header,4,BINARY_INT); CHKERRQ(ierr);
1384     if (header[0] != MAT_COOKIE) SETERRQ(1,"MatLoad_MPIBAIJ:not matrix object");
1385   }
1386 
1387   MPI_Bcast(header+1,3,MPI_INT,0,comm);
1388   M = header[1]; N = header[2];
1389 
1390   if (M != N) SETERRQ(1,"MatLoad_MPIBAIJ:Can only do square matrices");
1391 
1392   /*
1393      This code adds extra rows to make sure the number of rows is
1394      divisible by the blocksize
1395   */
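  /*
     For example (numbers purely illustrative): with M = 10 and bs = 4 this gives
     extra_rows = 2, so the matrix is padded to 12 rows and columns (Mbs = 3 block
     rows); each padded row later receives a single 1.0 on its diagonal.
  */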
1396   Mbs        = M/bs;
1397   extra_rows = bs - M + bs*(Mbs);
1398   if (extra_rows == bs) extra_rows = 0;
1399   else                  Mbs++;
1400   if (extra_rows && !rank) {
1401     PLogInfo(0,"MatLoad_MPIBAIJ:Padding loaded matrix to match blocksize\n");
1402   }
1403 
1404   /* determine ownership of all rows */
1405   mbs = Mbs/size + ((Mbs % size) > rank);
1406   m   = mbs * bs;
1407   rowners = (int *) PetscMalloc(2*(size+2)*sizeof(int)); CHKPTRQ(rowners);
1408   browners = rowners + size + 1;
1409   MPI_Allgather(&mbs,1,MPI_INT,rowners+1,1,MPI_INT,comm);
1410   rowners[0] = 0;
1411   for ( i=2; i<=size; i++ ) rowners[i] += rowners[i-1];
1412   for ( i=0; i<=size;  i++ ) browners[i] = rowners[i]*bs;
1413   rstart = rowners[rank];
1414   rend   = rowners[rank+1];
1415 
1416   /* distribute row lengths to all processors */
1417   locrowlens = (int*) PetscMalloc( (rend-rstart)*bs*sizeof(int) ); CHKPTRQ(locrowlens);
1418   if (!rank) {
1419     rowlengths = (int*) PetscMalloc( (M+extra_rows)*sizeof(int) ); CHKPTRQ(rowlengths);
1420     ierr = PetscBinaryRead(fd,rowlengths,M,BINARY_INT); CHKERRQ(ierr);
1421     for ( i=0; i<extra_rows; i++ ) rowlengths[M+i] = 1;
1422     sndcounts = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(sndcounts);
1423     for ( i=0; i<size; i++ ) sndcounts[i] = browners[i+1] - browners[i];
1424     MPI_Scatterv(rowlengths,sndcounts,browners,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT,0,comm);
1425     PetscFree(sndcounts);
1426   }
1427   else {
1428     MPI_Scatterv(0,0,0,MPI_INT,locrowlens,(rend-rstart)*bs,MPI_INT, 0,comm);
1429   }
1430 
1431   if (!rank) {
1432     /* calculate the number of nonzeros on each processor */
1433     procsnz = (int*) PetscMalloc( size*sizeof(int) ); CHKPTRQ(procsnz);
1434     PetscMemzero(procsnz,size*sizeof(int));
1435     for ( i=0; i<size; i++ ) {
1436       for ( j=rowners[i]*bs; j< rowners[i+1]*bs; j++ ) {
1437         procsnz[i] += rowlengths[j];
1438       }
1439     }
1440     PetscFree(rowlengths);
1441 
1442     /* determine max buffer needed and allocate it */
1443     maxnz = 0;
1444     for ( i=0; i<size; i++ ) {
1445       maxnz = PetscMax(maxnz,procsnz[i]);
1446     }
1447     cols = (int *) PetscMalloc( maxnz*sizeof(int) ); CHKPTRQ(cols);
1448 
1449     /* read in my part of the matrix column indices  */
1450     nz = procsnz[0];
1451     ibuf = (int *) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1452     mycols = ibuf;
1453     if (size == 1)  nz -= extra_rows;
1454     ierr = PetscBinaryRead(fd,mycols,nz,BINARY_INT); CHKERRQ(ierr);
1455     if (size == 1)  for (i=0; i< extra_rows; i++) { mycols[nz+i] = M+i; }
1456 
1457     /* read in every ones (except the last) and ship off */
1458     /* read in the column indices for the other processors (except the last) and ship them off */
1459       nz = procsnz[i];
1460       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1461       MPI_Send(cols,nz,MPI_INT,i,tag,comm);
1462     }
1463     /* read in the column indices for the last processor */
1464     if ( size != 1 ) {
1465       nz = procsnz[size-1] - extra_rows;  /* the extra rows are not on the disk */
1466       ierr = PetscBinaryRead(fd,cols,nz,BINARY_INT); CHKERRQ(ierr);
1467       for ( i=0; i<extra_rows; i++ ) cols[nz+i] = M+i;
1468       MPI_Send(cols,nz+extra_rows,MPI_INT,size-1,tag,comm);
1469     }
1470     PetscFree(cols);
1471   }
1472   else {
1473     /* determine buffer space needed for message */
1474     nz = 0;
1475     for ( i=0; i<m; i++ ) {
1476       nz += locrowlens[i];
1477     }
1478     ibuf = (int*) PetscMalloc( nz*sizeof(int) ); CHKPTRQ(ibuf);
1479     mycols = ibuf;
1480     /* receive message of column indices*/
1481     MPI_Recv(mycols,nz,MPI_INT,0,tag,comm,&status);
1482     MPI_Get_count(&status,MPI_INT,&maxnz);
1483     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1484   }
1485 
1486   /* loop over local rows, determining number of off diagonal entries */
1487   dlens  = (int *) PetscMalloc( 2*(rend-rstart+1)*sizeof(int) ); CHKPTRQ(dlens);
1488   odlens = dlens + (rend-rstart);
1489   mask   = (int *) PetscMalloc( 3*Mbs*sizeof(int) ); CHKPTRQ(mask);
1490   PetscMemzero(mask,3*Mbs*sizeof(int));
1491   masked1 = mask    + Mbs;
1492   masked2 = masked1 + Mbs;
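  /* mask[] flags the block columns already seen in the current block row, while
     masked1[]/masked2[] record the distinct diagonal/off-diagonal block columns
     so that only those flags need to be cleared afterwards */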
1493   rowcount = 0; nzcount = 0;
1494   for ( i=0; i<mbs; i++ ) {
1495     dcount  = 0;
1496     odcount = 0;
1497     for ( j=0; j<bs; j++ ) {
1498       kmax = locrowlens[rowcount];
1499       for ( k=0; k<kmax; k++ ) {
1500         tmp = mycols[nzcount++]/bs;
1501         if (!mask[tmp]) {
1502           mask[tmp] = 1;
1503           if (tmp < rstart || tmp >= rend ) masked2[odcount++] = tmp;
1504           else masked1[dcount++] = tmp;
1505         }
1506       }
1507       rowcount++;
1508     }
1509 
1510     dlens[i]  = dcount;
1511     odlens[i] = odcount;
1512 
1513     /* zero out the mask elements we set */
1514     for ( j=0; j<dcount; j++ ) mask[masked1[j]] = 0;
1515     for ( j=0; j<odcount; j++ ) mask[masked2[j]] = 0;
1516   }
1517 
1518   /* create our matrix */
1519   ierr = MatCreateMPIBAIJ(comm,bs,m,PETSC_DECIDE,M+extra_rows,N+extra_rows,0,dlens,0,odlens,newmat);
1520          CHKERRQ(ierr);
1521   A = *newmat;
1522   MatSetOption(A,MAT_COLUMNS_SORTED);
1523 
1524   if (!rank) {
1525     buf = (Scalar *) PetscMalloc( maxnz*sizeof(Scalar) ); CHKPTRQ(buf);
1526     /* read in my part of the matrix numerical values  */
1527     nz = procsnz[0];
1528     vals = buf;
1529     mycols = ibuf;
1530     if (size == 1)  nz -= extra_rows;
1531     ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1532     if (size == 1)  for (i=0; i< extra_rows; i++) { vals[nz+i] = 1.0; }
1533 
1534     /* insert into matrix */
1535     jj      = rstart*bs;
1536     for ( i=0; i<m; i++ ) {
1537       ierr = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1538       mycols += locrowlens[i];
1539       vals   += locrowlens[i];
1540       jj++;
1541     }
1542     /* read in the numerical values for the other processors (except the last one) and ship them out */
1543     for ( i=1; i<size-1; i++ ) {
1544       nz = procsnz[i];
1545       vals = buf;
1546       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1547       MPI_Send(vals,nz,MPIU_SCALAR,i,A->tag,comm);
1548     }
1549     /* the last proc */
1550     if ( size != 1 ){
1551       nz = procsnz[i] - extra_rows;
1552       vals = buf;
1553       ierr = PetscBinaryRead(fd,vals,nz,BINARY_SCALAR); CHKERRQ(ierr);
1554       for ( i=0; i<extra_rows; i++ ) vals[nz+i] = 1.0;
1555       MPI_Send(vals,nz+extra_rows,MPIU_SCALAR,size-1,A->tag,comm);
1556     }
1557     PetscFree(procsnz);
1558   }
1559   else {
1560     /* receive numeric values */
1561     buf = (Scalar*) PetscMalloc( nz*sizeof(Scalar) ); CHKPTRQ(buf);
1562 
1563     /* receive message of values*/
1564     vals = buf;
1565     mycols = ibuf;
1566     MPI_Recv(vals,nz,MPIU_SCALAR,0,A->tag,comm,&status);
1567     MPI_Get_count(&status,MPIU_SCALAR,&maxnz);
1568     if (maxnz != nz) SETERRQ(1,"MatLoad_MPIBAIJ:something is wrong with file");
1569 
1570     /* insert into matrix */
1571     jj      = rstart*bs;
1572     for ( i=0; i<m; i++ ) {
1573       ierr    = MatSetValues(A,1,&jj,locrowlens[i],mycols,vals,INSERT_VALUES);CHKERRQ(ierr);
1574       mycols += locrowlens[i];
1575       vals   += locrowlens[i];
1576       jj++;
1577     }
1578   }
1579   PetscFree(locrowlens);
1580   PetscFree(buf);
1581   PetscFree(ibuf);
1582   PetscFree(rowners);
1583   PetscFree(dlens);
1584   PetscFree(mask);
1585   ierr = MatAssemblyBegin(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1586   ierr = MatAssemblyEnd(A,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
1587   return 0;
1588 }
1589 
1590 
1591