xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision edae2e7dd0bdd187ac013a59d14cdeee7e201c06)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.35 1995/05/02 17:59:59 bsmith Exp curfman $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
9 #define CHUNCKSIZE   100
/*
   This is a simple-minded stash: do a linear search to determine if an
   entry is already in the stash; if not, add it to the end.
*/
/*
   StashValues - Caches one row's worth of values destined for another
   processor until assembly time.

   Input:
     stash  - the stash to append to
     row    - global row number of all the values
     n      - number of (column,value) pairs
     idxn   - global column numbers
     values - the n values for this row
     addv   - AddValues or InsertValues; for AddValues a duplicate
              (row,col) entry is summed into the existing slot

   Storage layout: stash->array, stash->idx and stash->idy all live in a
   SINGLE malloc'ed region (Scalars first, then the two int arrays), so a
   single FREE of stash->array releases everything.
*/
static int StashValues(Stash *stash,int row,int n, int *idxn,
                       Scalar *values,InsertMode addv)
{
  /* N is the stash length at entry; entries appended during this call are
     not searched by later iterations, so a repeated column within one call
     produces a duplicate stash entry.  Duplicates are harmless here: they
     are combined later when the stash is unpacked via MatSetValues. */
  int    i,j,N = stash->n,found,*n_idx, *n_idy;
  Scalar val,*n_array;

  for ( i=0; i<n; i++ ) {
    found = 0;
    val = *values++;
    /* linear search for an existing (row,idxn[i]) entry */
    for ( j=0; j<N; j++ ) {
      if ( stash->idx[j] == row && stash->idy[j] == idxn[i]) {
        /* found a match */
        if (addv == AddValues) stash->array[j] += val;
        else stash->array[j] = val;
        found = 1;
        break;
      }
    }
    if (!found) { /* not found so add to end */
      if ( stash->n == stash->nmax ) {
        /* allocate a larger stash: one block holding nmax+CHUNCKSIZE
           Scalars followed by 2*(nmax+CHUNCKSIZE) ints.  NOTE(review):
           this assumes the int arrays placed after the Scalars are
           adequately aligned, which holds since sizeof(Scalar) is a
           multiple of sizeof(int) on supported platforms. */
        n_array = (Scalar *) MALLOC( (stash->nmax + CHUNCKSIZE)*(
                                     2*sizeof(int) + sizeof(Scalar)));
        CHKPTR(n_array);
        n_idx = (int *) (n_array + stash->nmax + CHUNCKSIZE);
        n_idy = (int *) (n_idx + stash->nmax + CHUNCKSIZE);
        MEMCPY(n_array,stash->array,stash->nmax*sizeof(Scalar));
        MEMCPY(n_idx,stash->idx,stash->nmax*sizeof(int));
        MEMCPY(n_idy,stash->idy,stash->nmax*sizeof(int));
        /* one FREE releases array, idx and idy (single allocation) */
        if (stash->array) FREE(stash->array);
        stash->array = n_array; stash->idx = n_idx; stash->idy = n_idy;
        stash->nmax += CHUNCKSIZE;
      }
      stash->array[stash->n]   = val;
      stash->idx[stash->n]     = row;
      stash->idy[stash->n++]   = idxn[i];
    }
  }
  return 0;
}
54 
/* local utility routine that creates a mapping from the global column
number to the local number in the off-diagonal part of the local
storage of the matrix.  This is done in a non-scalable way since the
length of colmap equals the global matrix column length.
*/
60 static int CreateColmap(Mat mat)
61 {
62   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
63   Mat_AIJ    *B = (Mat_AIJ*) aij->B->data;
64   int        n = B->n,i;
65   aij->colmap = (int *) MALLOC( aij->N*sizeof(int) ); CHKPTR(aij->colmap);
66   MEMSET(aij->colmap,0,aij->N*sizeof(int));
67   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
68   return 0;
69 }
70 
71 static int MatSetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,
72                             int *idxn,Scalar *v,InsertMode addv)
73 {
74   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
75   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
76   int        cstart = aij->cstart, cend = aij->cend,row,col;
77 
78   if (aij->insertmode != NotSetValues && aij->insertmode != addv) {
79     SETERR(1,"You cannot mix inserts and adds");
80   }
81   aij->insertmode = addv;
82   for ( i=0; i<m; i++ ) {
83     if (idxm[i] < 0) SETERR(1,"Negative row index");
84     if (idxm[i] >= aij->M) SETERR(1,"Row index too large");
85     if (idxm[i] >= rstart && idxm[i] < rend) {
86       row = idxm[i] - rstart;
87       for ( j=0; j<n; j++ ) {
88         if (idxn[j] < 0) SETERR(1,"Negative column index");
89         if (idxn[j] >= aij->N) SETERR(1,"Column index too large");
90         if (idxn[j] >= cstart && idxn[j] < cend){
91           col = idxn[j] - cstart;
92           ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
93         }
94         else {
95           if (aij->assembled) {
96             if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
97             col = aij->colmap[idxn[j]] - 1;
98             if (col < 0) {
99               SETERR(1,"Cannot insert new off diagonal block nonzero in\
100                      already\
101                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
102                      if your need this feature");
103             }
104           }
105           else col = idxn[j];
106           ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
107         }
108       }
109     }
110     else {
111       ierr = StashValues(&aij->stash,idxm[i],n,idxn,v+i*n,addv);CHKERR(ierr);
112     }
113   }
114   return 0;
115 }
116 
/*
    the assembly code is a lot like the code for vectors; we should
    someday derive a single assembly code that can be used for
    either case.
*/
122 
/*
   MatAssemblyBegin_MPIAIJ - Starts communicating stashed off-processor
   values: posts nonblocking receives, packs the stash into per-destination
   send buffers, and posts nonblocking sends.  Each transmitted entry is a
   triple (row, col, value), with the two indices stored as Scalars so a
   single MPI_SCALAR message carries everything.
*/
static int MatAssemblyBegin_MPIAIJ(Mat mat,MatAssemblyType mode)
{
  Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
  MPI_Comm    comm = mat->comm;
  int         numtids = aij->numtids, *owners = aij->rowners;
  int         mytid = aij->mytid;
  MPI_Request *send_waits,*recv_waits;
  int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
  int         tag = 50, *owner,*starts,count;
  InsertMode  addv;
  Scalar      *rvalues,*svalues;

  /* make sure all processors are either in INSERTMODE or ADDMODE.
     NOTE(review): the MPI_BOR reduction relies on NotSetValues, AddValues
     and InsertValues occupying distinct bits so a mixture is detectable
     as (AddValues|InsertValues) -- confirm against the enum definitions */
  MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
                MPI_BOR,comm);
  if (addv == (AddValues|InsertValues)) {
    SETERR(1,"Some processors have inserted while others have added");
  }
  aij->insertmode = addv; /* in case this processor had no cache */

  /*  first count number of contributors to each processor.
      nprocs[j] = #entries bound for proc j; procs[j] = 1 iff any exist;
      owner[i] = destination proc of stash entry i */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTR(owner);
  for ( i=0; i<aij->stash.n; i++ ) {
    idx = aij->stash.idx[i];
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; break;
      }
    }
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length:
     column j of the conceptual procs matrix sums to the number of
     messages proc j will receive; the max over nprocs bounds their size */
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nreceives = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  FREE(work);

  /* post receives:
       1) each message will consist of ordered pairs
     (global index,value) we store the global index as a double
     to simplify the message passing.
       2) since we don't know how long each individual message is we
     allocate the largest needed buffer for each receive. Potentially
     this is a lot of wasted space.


       This could be done better.
  */
  rvalues = (Scalar *) MALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nreceives+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nreceives; i++ ) {
    MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPI_SCALAR,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (Scalar *) MALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
  CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  starts = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack (row,col,value) triples grouped by destination; note starts[]
     is consumed (incremented) by this loop and rebuilt below */
  for ( i=0; i<aij->stash.n; i++ ) {
    svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
    svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
    svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
  }
  FREE(owner);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPI_SCALAR,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts); FREE(nprocs);

  /* Free cache space (a single FREE releases array/idx/idy; see stash) */
  aij->stash.nmax = aij->stash.n = 0;
  if (aij->stash.array){ FREE(aij->stash.array); aij->stash.array = 0;}

  /* save communication state for MatAssemblyEnd_MPIAIJ */
  aij->svalues    = svalues;       aij->rvalues = rvalues;
  aij->nsends     = nsends;         aij->nrecvs = nreceives;
  aij->send_waits = send_waits; aij->recv_waits = recv_waits;
  aij->rmax       = nmax;

  return 0;
}
224 extern int MatSetUpMultiply_MPIAIJ(Mat);
225 
226 static int MatAssemblyEnd_MPIAIJ(Mat mat,MatAssemblyType mode)
227 {
228   int        ierr;
229   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
230 
231   MPI_Status  *send_status,recv_status;
232   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
233   int         row,col;
234   Scalar      *values,val;
235   InsertMode  addv = aij->insertmode;
236 
237   /*  wait on receives */
238   while (count) {
239     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
240     /* unpack receives into our local space */
241     values = aij->rvalues + 3*imdex*aij->rmax;
242     MPI_Get_count(&recv_status,MPI_SCALAR,&n);
243     n = n/3;
244     for ( i=0; i<n; i++ ) {
245       row = (int) PETSCREAL(values[3*i]) - aij->rstart;
246       col = (int) PETSCREAL(values[3*i+1]);
247       val = values[3*i+2];
248       if (col >= aij->cstart && col < aij->cend) {
249           col -= aij->cstart;
250         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
251       }
252       else {
253         if (aij->assembled) {
254           if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
255           col = aij->colmap[col] - 1;
256           if (col < 0) {
257             SETERR(1,"Cannot insert new off diagonal block nonzero in\
258                      already\
259                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
260                      if your need this feature");
261           }
262         }
263         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
264       }
265     }
266     count--;
267   }
268   FREE(aij->recv_waits); FREE(aij->rvalues);
269 
270   /* wait on sends */
271   if (aij->nsends) {
272     send_status = (MPI_Status *) MALLOC( aij->nsends*sizeof(MPI_Status) );
273     CHKPTR(send_status);
274     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
275     FREE(send_status);
276   }
277   FREE(aij->send_waits); FREE(aij->svalues);
278 
279   aij->insertmode = NotSetValues;
280   ierr = MatAssemblyBegin(aij->A,mode); CHKERR(ierr);
281   ierr = MatAssemblyEnd(aij->A,mode); CHKERR(ierr);
282 
283   if (!aij->assembled && mode == FINAL_ASSEMBLY) {
284     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERR(ierr);
285     aij->assembled = 1;
286   }
287   ierr = MatAssemblyBegin(aij->B,mode); CHKERR(ierr);
288   ierr = MatAssemblyEnd(aij->B,mode); CHKERR(ierr);
289 
290   return 0;
291 }
292 
293 static int MatZeroEntries_MPIAIJ(Mat A)
294 {
295   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
296 
297   MatZeroEntries(l->A); MatZeroEntries(l->B);
298   return 0;
299 }
300 
/* again this uses the same basic strategy as in the assembly and
   scatter create routines; we should try to do it systematically
   if we can figure out the proper level of generality. */
304 
/* the code does not do the diagonal entries correctly unless the
   matrix is square and the column and row ownerships are identical.
   This is a BUG. The only way to fix it seems to be to access
   aij->A and aij->B directly and not through the MatZeroRows()
   routine.
*/
/*
   MatZeroRows_MPIAIJ - Zeros the (globally numbered) rows listed in is,
   optionally placing *diag on the diagonal of each zeroed row.  Every
   processor forwards the row numbers it holds to the owning processors
   (same send/receive pattern as the assembly code), then each processor
   zaps its local rows via MatZeroRows on the A and B blocks.
*/
static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
{
  Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
  int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
  int            *procs,*nprocs,j,found,idx,nsends,*work;
  int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
  int            *rvalues,tag = 67,count,base,slen,n,*source;
  int            *lens,imdex,*lrows,*values;
  MPI_Comm       comm = A->comm;
  MPI_Request    *send_waits,*recv_waits;
  MPI_Status     recv_status,*send_status;
  IS             istmp;

  if (!l->assembled) SETERR(1,"MatZeroRows_MPIAIJ: must assemble matrix first");
  ierr = ISGetLocalSize(is,&N); CHKERR(ierr);
  ierr = ISGetIndices(is,&rows); CHKERR(ierr);

  /*  first count number of contributors to each processor.
      nprocs[j] = #rows bound for proc j; procs[j] = 1 iff any;
      owner[i] = owning processor of rows[i] */
  nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
  MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
  owner = (int *) MALLOC((N+1)*sizeof(int)); CHKPTR(owner); /* see note*/
  for ( i=0; i<N; i++ ) {
    idx = rows[i];
    found = 0;
    for ( j=0; j<numtids; j++ ) {
      if (idx >= owners[j] && idx < owners[j+1]) {
        nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
      }
    }
    if (!found) SETERR(1,"Imdex out of range");
  }
  nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}

  /* inform other processors of number of messages and max length*/
  work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
  MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
  nrecvs = work[mytid];
  MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
  nmax = work[mytid];
  FREE(work);

  /* post receives: each receive gets the worst-case buffer of nmax ints */
  rvalues = (int *) MALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
  CHKPTR(rvalues);
  recv_waits = (MPI_Request *) MALLOC((nrecvs+1)*sizeof(MPI_Request));
  CHKPTR(recv_waits);
  for ( i=0; i<nrecvs; i++ ) {
    MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
              comm,recv_waits+i);
  }

  /* do sends:
      1) starts[i] gives the starting index in svalues for stuff going to
         the ith processor
  */
  svalues = (int *) MALLOC( (N+1)*sizeof(int) ); CHKPTR(svalues);
  send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
  CHKPTR(send_waits);
  starts = (int *) MALLOC( (numtids+1)*sizeof(int) ); CHKPTR(starts);
  starts[0] = 0;
  for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  /* pack row numbers grouped by destination; starts[] is consumed here
     and rebuilt just below */
  for ( i=0; i<N; i++ ) {
    svalues[starts[owner[i]]++] = rows[i];
  }
  ISRestoreIndices(is,&rows);

  starts[0] = 0;
  for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
  count = 0;
  for ( i=0; i<numtids; i++ ) {
    if (procs[i]) {
      MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
                comm,send_waits+count++);
    }
  }
  FREE(starts);

  /* base converts incoming global row numbers to local numbering */
  base = owners[mytid];

  /*  wait on receives; record each message's source and length
      (lens and source share one allocation of size 2*(nrecvs+1)) */
  lens = (int *) MALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTR(lens);
  source = lens + nrecvs;
  count = nrecvs; slen = 0;
  while (count) {
    MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
    /* unpack receives into our local space */
    MPI_Get_count(&recv_status,MPI_INT,&n);
    source[imdex]  = recv_status.MPI_SOURCE;
    lens[imdex]  = n;
    slen += n;
    count--;
  }
  FREE(recv_waits);

  /* move the data into the send scatter: gather all received rows,
     shifted to local numbering */
  lrows = (int *) MALLOC( slen*sizeof(int) ); CHKPTR(lrows);
  count = 0;
  for ( i=0; i<nrecvs; i++ ) {
    values = rvalues + i*nmax;
    for ( j=0; j<lens[i]; j++ ) {
      lrows[count++] = values[j] - base;
    }
  }
  FREE(rvalues); FREE(lens);
  FREE(owner); FREE(nprocs);

  /* actually zap the local rows: diag goes on the diagonal block only
     (see the BUG note above regarding non-square matrices) */
  ierr = ISCreateSequential(MPI_COMM_SELF,slen,lrows,&istmp);
  CHKERR(ierr);  FREE(lrows);
  ierr = MatZeroRows(l->A,istmp,diag); CHKERR(ierr);
  ierr = MatZeroRows(l->B,istmp,0); CHKERR(ierr);
  ierr = ISDestroy(istmp); CHKERR(ierr);

  /* wait on sends before releasing the send buffer */
  if (nsends) {
    send_status = (MPI_Status *) MALLOC( nsends*sizeof(MPI_Status) );
    CHKPTR(send_status);
    MPI_Waitall(nsends,send_waits,send_status);
    FREE(send_status);
  }
  FREE(send_waits); FREE(svalues);


  return 0;
}
436 
437 static int MatMult_MPIAIJ(Mat aijin,Vec xx,Vec yy)
438 {
439   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
440   int        ierr;
441   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
442   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
443   CHKERR(ierr);
444   ierr = MatMult(aij->A,xx,yy); CHKERR(ierr);
445   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
446   CHKERR(ierr);
447   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERR(ierr);
448   return 0;
449 }
450 
451 static int MatMultAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
452 {
453   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
454   int        ierr;
455   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
456   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
457   CHKERR(ierr);
458   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERR(ierr);
459   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
460   CHKERR(ierr);
461   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERR(ierr);
462   return 0;
463 }
464 
465 static int MatMultTrans_MPIAIJ(Mat aijin,Vec xx,Vec yy)
466 {
467   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
468   int        ierr;
469 
470   if (!aij->assembled)
471     SETERR(1,"MatMulTrans_MPIAIJ: must assemble matrix first");
472   /* do nondiagonal part */
473   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
474   /* send it on its way */
475   ierr = VecScatterBegin(aij->lvec,0,yy,0,AddValues,
476                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
477   /* do local part */
478   ierr = MatMultTrans(aij->A,xx,yy); CHKERR(ierr);
479   /* receive remote parts: note this assumes the values are not actually */
480   /* inserted in yy until the next line, which is true for my implementation*/
481   /* but is not perhaps always true. */
482   ierr = VecScatterEnd(aij->lvec,0,yy,0,AddValues,ScatterAll|ScatterReverse,
483                          aij->Mvctx); CHKERR(ierr);
484   return 0;
485 }
486 
487 static int MatMultTransAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
488 {
489   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
490   int        ierr;
491 
492   if (!aij->assembled)
493     SETERR(1,"MatMulTransAdd_MPIAIJ: must assemble matrix first");
494   /* do nondiagonal part */
495   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
496   /* send it on its way */
497   ierr = VecScatterBegin(aij->lvec,0,zz,0,AddValues,
498                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
499   /* do local part */
500   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERR(ierr);
501   /* receive remote parts: note this assumes the values are not actually */
502   /* inserted in yy until the next line, which is true for my implementation*/
503   /* but is not perhaps always true. */
504   ierr = VecScatterEnd(aij->lvec,0,zz,0,AddValues,ScatterAll|ScatterReverse,
505                          aij->Mvctx); CHKERR(ierr);
506   return 0;
507 }
508 
509 /*
510   This only works correctly for square matrices where the subblock A->A is the
511    diagonal block
512 */
513 static int MatGetDiagonal_MPIAIJ(Mat Ain,Vec v)
514 {
515   Mat_MPIAIJ *A = (Mat_MPIAIJ *) Ain->data;
516   if (!A->assembled) SETERR(1,"MatGetDiag_MPIAIJ: must assemble matrix first");
517   return MatGetDiagonal(A->A,v);
518 }
519 
/*
   MatDestroy_MPIAIJ - Frees all storage associated with the parallel
   matrix: the ownership table, the two sequential blocks, the optional
   column map and garray, the ghost vector, the scatter context, and
   finally the header itself.
*/
static int MatDestroy_MPIAIJ(PetscObject obj)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
#if defined(PETSC_LOG)
  PLogObjectState(obj,"Rows %d Cols %d",aij->M,aij->N);
#endif
  FREE(aij->rowners);
  ierr = MatDestroy(aij->A); CHKERR(ierr);
  ierr = MatDestroy(aij->B); CHKERR(ierr);
  /* colmap/garray/lvec/Mvctx exist only after first final assembly */
  if (aij->colmap) FREE(aij->colmap);
  if (aij->garray) FREE(aij->garray);
  if (aij->lvec) VecDestroy(aij->lvec);
  if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
  FREE(aij);
  PLogObjectDestroy(mat);
  PETSCHEADERDESTROY(mat);
  return 0;
}
540 #include "draw.h"
541 #include "pviewer.h"
542 
/*
   MatView_MPIAIJ - Visualizes the matrix.  For an ASCII file viewer each
   processor prints its ownership ranges and local blocks in sequence; for
   a single-file or draw viewer the whole matrix is gathered (via
   MatSetValues into a temporary parallel matrix owned entirely by
   processor 0) and viewed there.
*/
static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
{
  Mat        mat = (Mat) obj;
  Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
  int        ierr;
  PetscObject vobj = (PetscObject) viewer;

  if (!aij->assembled) SETERR(1,"MatView_MPIAIJ: must assemble matrix first");
  if (!viewer) { /* so that viewers may be used from debuggers */
    viewer = STDOUT_VIEWER; vobj = (PetscObject) viewer;
  }
  if (vobj->cookie == DRAW_COOKIE && vobj->type == NULLWINDOW) return 0;
  if (vobj->cookie == VIEWER_COOKIE && vobj->type == FILE_VIEWER) {
    FILE *fd = ViewerFileGetPointer_Private(viewer);
    /* processors print one at a time in rank order */
    MPE_Seq_begin(mat->comm,1);
    fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
             aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
             aij->cend);
    ierr = MatView(aij->A,viewer); CHKERR(ierr);
    ierr = MatView(aij->B,viewer); CHKERR(ierr);
    fflush(fd);
    MPE_Seq_end(mat->comm,1);
  }
  else if ((vobj->cookie == VIEWER_COOKIE && vobj->type == FILES_VIEWER) ||
            vobj->cookie == DRAW_COOKIE) {
    int numtids = aij->numtids, mytid = aij->mytid;
    if (numtids == 1) {
      ierr = MatView(aij->A,viewer); CHKERR(ierr);
    }
    else {
      /* assemble the entire matrix onto first processor. */
      Mat     A;
      Mat_AIJ *Aloc;
      int     M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
      Scalar  *a;

      /* processor 0 owns all M rows of the gathered copy; the rest own 0 */
      if (!mytid) {
        ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,0,0,0,&A);
      }
      else {
        ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,0,0,0,&A);
      }
      CHKERR(ierr);

      /* copy over the A part: the raw aj[] indices are 1-based and local,
         so shift them in place to 0-based global numbering, insert, then
         shift back to restore the matrix unchanged */
      Aloc = (Mat_AIJ*) aij->A->data;
      m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      for ( i=0; i<ai[m]; i++ ) {aj[i] += aij->cstart - 1;}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,InsertValues);
        CHKERR(ierr);
        row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
      }
      aj = Aloc->j;
      for ( i=0; i<ai[m]; i++ ) {aj[i] -= aij->cstart - 1;}

      /* copy over the B part: translate 1-based local columns to global
         numbers through garray into a scratch array (cols) */
      Aloc = (Mat_AIJ*) aij->B->data;
      m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
      row = aij->rstart;
      ct = cols = (int *) MALLOC( (ai[m]+1)*sizeof(int) ); CHKPTR(cols);
      for ( i=0; i<ai[m]; i++ ) {cols[i] = aij->garray[aj[i]-1];}
      for ( i=0; i<m; i++ ) {
        ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,InsertValues);
        CHKERR(ierr);
        row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
      }
      /* ct holds the original cols pointer; cols was advanced above */
      FREE(ct);
      ierr = MatAssemblyBegin(A,FINAL_ASSEMBLY); CHKERR(ierr);
      ierr = MatAssemblyEnd(A,FINAL_ASSEMBLY); CHKERR(ierr);
      if (!mytid) {
        ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERR(ierr);
      }
      ierr = MatDestroy(A); CHKERR(ierr);
    }
  }
  return 0;
}
622 
623 extern int MatMarkDiag_AIJ(Mat_AIJ  *);
/*
    This has to provide several versions.

     1) per sequential
     2) a) use only local smoothing updating outer values only once.
        b) local smoothing updating outer values each inner iteration
     3) color updating outer values between colors.
*/
632 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,MatSORType flag,
633                            double shift,int its,Vec xx)
634 {
635   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
636   Mat        AA = mat->A, BB = mat->B;
637   Mat_AIJ    *A = (Mat_AIJ *) AA->data, *B = (Mat_AIJ *)BB->data;
638   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
639   int        ierr,*idx, *diag;
640   int        n = mat->n, m = mat->m, i;
641   Vec        tt;
642 
643   if (!mat->assembled) SETERR(1,"MatRelax_MPIAIJ: must assemble matrix first");
644 
645   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
646   xs = x -1; /* shift by one for index start of 1 */
647   ls--;
648   if (!A->diag) {if ((ierr = MatMarkDiag_AIJ(A))) return ierr;}
649   diag = A->diag;
650   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
651     SETERR(1,"That option not yet support for parallel AIJ matrices");
652   }
653   if (flag & SOR_EISENSTAT) {
654     /* Let  A = L + U + D; where L is lower trianglar,
655     U is upper triangular, E is diagonal; This routine applies
656 
657             (L + E)^{-1} A (U + E)^{-1}
658 
659     to a vector efficiently using Eisenstat's trick. This is for
660     the case of SSOR preconditioner, so E is D/omega where omega
661     is the relaxation factor.
662     */
663     ierr = VecCreate(xx,&tt); CHKERR(ierr);
664     VecGetArray(tt,&t);
665     scale = (2.0/omega) - 1.0;
666     /*  x = (E + U)^{-1} b */
667     VecSet(&zero,mat->lvec);
668     ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
669                               mat->Mvctx); CHKERR(ierr);
670     for ( i=m-1; i>-1; i-- ) {
671       n    = A->i[i+1] - diag[i] - 1;
672       idx  = A->j + diag[i];
673       v    = A->a + diag[i];
674       sum  = b[i];
675       SPARSEDENSEMDOT(sum,xs,v,idx,n);
676       d    = shift + A->a[diag[i]-1];
677       n    = B->i[i+1] - B->i[i];
678       idx  = B->j + B->i[i] - 1;
679       v    = B->a + B->i[i] - 1;
680       SPARSEDENSEMDOT(sum,ls,v,idx,n);
681       x[i] = omega*(sum/d);
682     }
683     ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
684                             mat->Mvctx); CHKERR(ierr);
685 
686     /*  t = b - (2*E - D)x */
687     v = A->a;
688     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
689 
690     /*  t = (E + L)^{-1}t */
691     ts = t - 1; /* shifted by one for index start of a or mat->j*/
692     diag = A->diag;
693     VecSet(&zero,mat->lvec);
694     ierr = VecPipelineBegin(tt,0,mat->lvec,0,InsertValues,PipelineDown,
695                                                  mat->Mvctx); CHKERR(ierr);
696     for ( i=0; i<m; i++ ) {
697       n    = diag[i] - A->i[i];
698       idx  = A->j + A->i[i] - 1;
699       v    = A->a + A->i[i] - 1;
700       sum  = t[i];
701       SPARSEDENSEMDOT(sum,ts,v,idx,n);
702       d    = shift + A->a[diag[i]-1];
703       n    = B->i[i+1] - B->i[i];
704       idx  = B->j + B->i[i] - 1;
705       v    = B->a + B->i[i] - 1;
706       SPARSEDENSEMDOT(sum,ls,v,idx,n);
707       t[i] = omega*(sum/d);
708     }
709     ierr = VecPipelineEnd(tt,0,mat->lvec,0,InsertValues,PipelineDown,
710                                                     mat->Mvctx); CHKERR(ierr);
711     /*  x = x + t */
712     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
713     VecDestroy(tt);
714     return 0;
715   }
716 
717 
718   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
719     if (flag & SOR_ZERO_INITIAL_GUESS) {
720       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
721     }
722     else {
723       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
724       CHKERR(ierr);
725       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
726       CHKERR(ierr);
727     }
728     while (its--) {
729       /* go down through the rows */
730       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
731                               mat->Mvctx); CHKERR(ierr);
732       for ( i=0; i<m; i++ ) {
733         n    = A->i[i+1] - A->i[i];
734         idx  = A->j + A->i[i] - 1;
735         v    = A->a + A->i[i] - 1;
736         sum  = b[i];
737         SPARSEDENSEMDOT(sum,xs,v,idx,n);
738         d    = shift + A->a[diag[i]-1];
739         n    = B->i[i+1] - B->i[i];
740         idx  = B->j + B->i[i] - 1;
741         v    = B->a + B->i[i] - 1;
742         SPARSEDENSEMDOT(sum,ls,v,idx,n);
743         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
744       }
745       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
746                             mat->Mvctx); CHKERR(ierr);
747       /* come up through the rows */
748       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
749                               mat->Mvctx); CHKERR(ierr);
750       for ( i=m-1; i>-1; i-- ) {
751         n    = A->i[i+1] - A->i[i];
752         idx  = A->j + A->i[i] - 1;
753         v    = A->a + A->i[i] - 1;
754         sum  = b[i];
755         SPARSEDENSEMDOT(sum,xs,v,idx,n);
756         d    = shift + A->a[diag[i]-1];
757         n    = B->i[i+1] - B->i[i];
758         idx  = B->j + B->i[i] - 1;
759         v    = B->a + B->i[i] - 1;
760         SPARSEDENSEMDOT(sum,ls,v,idx,n);
761         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
762       }
763       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
764                             mat->Mvctx); CHKERR(ierr);
765     }
766   }
767   else if (flag & SOR_FORWARD_SWEEP){
768     if (flag & SOR_ZERO_INITIAL_GUESS) {
769       VecSet(&zero,mat->lvec);
770       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
771                               mat->Mvctx); CHKERR(ierr);
772       for ( i=0; i<m; i++ ) {
773         n    = diag[i] - A->i[i];
774         idx  = A->j + A->i[i] - 1;
775         v    = A->a + A->i[i] - 1;
776         sum  = b[i];
777         SPARSEDENSEMDOT(sum,xs,v,idx,n);
778         d    = shift + A->a[diag[i]-1];
779         n    = B->i[i+1] - B->i[i];
780         idx  = B->j + B->i[i] - 1;
781         v    = B->a + B->i[i] - 1;
782         SPARSEDENSEMDOT(sum,ls,v,idx,n);
783         x[i] = omega*(sum/d);
784       }
785       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
786                             mat->Mvctx); CHKERR(ierr);
787       its--;
788     }
789     while (its--) {
790       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
791       CHKERR(ierr);
792       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
793       CHKERR(ierr);
794       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
795                               mat->Mvctx); CHKERR(ierr);
796       for ( i=0; i<m; i++ ) {
797         n    = A->i[i+1] - A->i[i];
798         idx  = A->j + A->i[i] - 1;
799         v    = A->a + A->i[i] - 1;
800         sum  = b[i];
801         SPARSEDENSEMDOT(sum,xs,v,idx,n);
802         d    = shift + A->a[diag[i]-1];
803         n    = B->i[i+1] - B->i[i];
804         idx  = B->j + B->i[i] - 1;
805         v    = B->a + B->i[i] - 1;
806         SPARSEDENSEMDOT(sum,ls,v,idx,n);
807         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
808       }
809       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
810                             mat->Mvctx); CHKERR(ierr);
811     }
812   }
813   else if (flag & SOR_BACKWARD_SWEEP){
814     if (flag & SOR_ZERO_INITIAL_GUESS) {
815       VecSet(&zero,mat->lvec);
816       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
817                               mat->Mvctx); CHKERR(ierr);
818       for ( i=m-1; i>-1; i-- ) {
819         n    = A->i[i+1] - diag[i] - 1;
820         idx  = A->j + diag[i];
821         v    = A->a + diag[i];
822         sum  = b[i];
823         SPARSEDENSEMDOT(sum,xs,v,idx,n);
824         d    = shift + A->a[diag[i]-1];
825         n    = B->i[i+1] - B->i[i];
826         idx  = B->j + B->i[i] - 1;
827         v    = B->a + B->i[i] - 1;
828         SPARSEDENSEMDOT(sum,ls,v,idx,n);
829         x[i] = omega*(sum/d);
830       }
831       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
832                             mat->Mvctx); CHKERR(ierr);
833       its--;
834     }
835     while (its--) {
836       ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterDown,
837                             mat->Mvctx); CHKERR(ierr);
838       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterDown,
839                             mat->Mvctx); CHKERR(ierr);
840       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
841                               mat->Mvctx); CHKERR(ierr);
842       for ( i=m-1; i>-1; i-- ) {
843         n    = A->i[i+1] - A->i[i];
844         idx  = A->j + A->i[i] - 1;
845         v    = A->a + A->i[i] - 1;
846         sum  = b[i];
847         SPARSEDENSEMDOT(sum,xs,v,idx,n);
848         d    = shift + A->a[diag[i]-1];
849         n    = B->i[i+1] - B->i[i];
850         idx  = B->j + B->i[i] - 1;
851         v    = B->a + B->i[i] - 1;
852         SPARSEDENSEMDOT(sum,ls,v,idx,n);
853         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
854       }
855       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
856                             mat->Mvctx); CHKERR(ierr);
857     }
858   }
859   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
860     if (flag & SOR_ZERO_INITIAL_GUESS) {
861       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
862     }
863     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
864     CHKERR(ierr);
865     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
866     CHKERR(ierr);
867     while (its--) {
868       /* go down through the rows */
869       for ( i=0; i<m; i++ ) {
870         n    = A->i[i+1] - A->i[i];
871         idx  = A->j + A->i[i] - 1;
872         v    = A->a + A->i[i] - 1;
873         sum  = b[i];
874         SPARSEDENSEMDOT(sum,xs,v,idx,n);
875         d    = shift + A->a[diag[i]-1];
876         n    = B->i[i+1] - B->i[i];
877         idx  = B->j + B->i[i] - 1;
878         v    = B->a + B->i[i] - 1;
879         SPARSEDENSEMDOT(sum,ls,v,idx,n);
880         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
881       }
882       /* come up through the rows */
883       for ( i=m-1; i>-1; i-- ) {
884         n    = A->i[i+1] - A->i[i];
885         idx  = A->j + A->i[i] - 1;
886         v    = A->a + A->i[i] - 1;
887         sum  = b[i];
888         SPARSEDENSEMDOT(sum,xs,v,idx,n);
889         d    = shift + A->a[diag[i]-1];
890         n    = B->i[i+1] - B->i[i];
891         idx  = B->j + B->i[i] - 1;
892         v    = B->a + B->i[i] - 1;
893         SPARSEDENSEMDOT(sum,ls,v,idx,n);
894         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
895       }
896     }
897   }
898   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
899     if (flag & SOR_ZERO_INITIAL_GUESS) {
900       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
901     }
902     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
903     CHKERR(ierr);
904     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
905     CHKERR(ierr);
906     while (its--) {
907       for ( i=0; i<m; i++ ) {
908         n    = A->i[i+1] - A->i[i];
909         idx  = A->j + A->i[i] - 1;
910         v    = A->a + A->i[i] - 1;
911         sum  = b[i];
912         SPARSEDENSEMDOT(sum,xs,v,idx,n);
913         d    = shift + A->a[diag[i]-1];
914         n    = B->i[i+1] - B->i[i];
915         idx  = B->j + B->i[i] - 1;
916         v    = B->a + B->i[i] - 1;
917         SPARSEDENSEMDOT(sum,ls,v,idx,n);
918         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
919       }
920     }
921   }
922   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
923     if (flag & SOR_ZERO_INITIAL_GUESS) {
924       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
925     }
926     ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,
927                             mat->Mvctx); CHKERR(ierr);
928     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,
929                             mat->Mvctx); CHKERR(ierr);
930     while (its--) {
931       for ( i=m-1; i>-1; i-- ) {
932         n    = A->i[i+1] - A->i[i];
933         idx  = A->j + A->i[i] - 1;
934         v    = A->a + A->i[i] - 1;
935         sum  = b[i];
936         SPARSEDENSEMDOT(sum,xs,v,idx,n);
937         d    = shift + A->a[diag[i]-1];
938         n    = B->i[i+1] - B->i[i];
939         idx  = B->j + B->i[i] - 1;
940         v    = B->a + B->i[i] - 1;
941         SPARSEDENSEMDOT(sum,ls,v,idx,n);
942         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
943       }
944     }
945   }
946   return 0;
947 }
948 
949 static int MatGetInfo_MPIAIJ(Mat matin,MatInfoType flag,int *nz,
950                              int *nzalloc,int *mem)
951 {
952   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
953   Mat        A = mat->A, B = mat->B;
954   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
955 
956   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERR(ierr);
957   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERR(ierr);
958   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
959   if (flag == MAT_LOCAL) {
960     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
961   } else if (flag == MAT_GLOBAL_MAX) {
962     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_MAX,matin->comm);
963     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
964   } else if (flag == MAT_GLOBAL_SUM) {
965     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_SUM,matin->comm);
966     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
967   }
968   return 0;
969 }
970 
971 static int MatSetOption_MPIAIJ(Mat aijin,MatOption op)
972 {
973   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
974 
975   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
976     MatSetOption(aij->A,op);
977     MatSetOption(aij->B,op);
978   }
979   else if (op == YES_NEW_NONZERO_LOCATIONS) {
980     MatSetOption(aij->A,op);
981     MatSetOption(aij->B,op);
982   }
983   else if (op == COLUMN_ORIENTED) SETERR(1,"Column oriented not supported");
984   return 0;
985 }
986 
987 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
988 {
989   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
990   *m = mat->M; *n = mat->N;
991   return 0;
992 }
993 
994 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
995 {
996   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
997   *m = mat->m; *n = mat->n;
998   return 0;
999 }
1000 
1001 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
1002 {
1003   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1004   *m = mat->rstart; *n = mat->rend;
1005   return 0;
1006 }
1007 
1008 static int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1009 {
1010   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1011   Scalar     *vworkA, *vworkB, **pvA, **pvB;
1012   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1013   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1014 
1015   if (!mat->assembled)
1016     SETERR(1,"MatGetRow_MPIAIJ: Must assemble matrix first.");
1017   if (row < rstart || row >= rend)
1018     SETERR(1,"MatGetRow_MPIAIJ: Currently you can get only local rows.")
1019   lrow = row - rstart;
1020 
1021   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1022   if (!v)   {pvA = 0; pvB = 0;}
1023   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1024   ierr = MatGetRow(mat->A,lrow,&nzA,pcA,pvA); CHKERR(ierr);
1025   ierr = MatGetRow(mat->B,lrow,&nzB,pcB,pvB); CHKERR(ierr);
1026   nztot = nzA + nzB;
1027 
1028   if (v  || idx) {
1029     if (nztot) {
1030       /* Sort by increasing column numbers, assuming A and B already sorted */
1031       int imark, imark2;
1032       for (i=0; i<nzB; i++) cworkB[i] = mat->garray[cworkB[i]];
1033       if (v) {
1034         *v = (Scalar *) MALLOC( (nztot)*sizeof(Scalar) ); CHKPTR(*v);
1035         for ( i=0; i<nzB; i++ ) {
1036           if (cworkB[i] < cstart)   (*v)[i] = vworkB[i];
1037           else break;
1038         }
1039         imark = i;
1040         for ( i=0; i<nzA; i++ )     (*v)[imark+i] = vworkA[i];
1041         imark2 = imark+nzA;
1042         for ( i=imark; i<nzB; i++ ) (*v)[imark2+i] = vworkB[i];
1043       }
1044       if (idx) {
1045         *idx = (int *) MALLOC( (nztot)*sizeof(int) ); CHKPTR(*idx);
1046         for (i=0; i<nzA; i++) cworkA[i] += cstart;
1047         for ( i=0; i<nzB; i++ ) {
1048           if (cworkB[i] < cstart)   (*idx)[i] = cworkB[i];
1049           else break;
1050         }
1051         imark = i;
1052         for ( i=0; i<nzA; i++ )     (*idx)[imark+i] = cworkA[i];
1053         imark2 = imark+nzA;
1054         for ( i=imark; i<nzB; i++ ) (*idx)[imark2+i] = cworkB[i];
1055       }
1056     }
1057     else {*idx = 0; *v=0;}
1058   }
1059   *nz = nztot;
1060   ierr = MatRestoreRow(mat->A,lrow,&nzA,pcA,pvA); CHKERR(ierr);
1061   ierr = MatRestoreRow(mat->B,lrow,&nzB,pcB,pvB); CHKERR(ierr);
1062   return 0;
1063 }
1064 
1065 static int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1066 {
1067   if (idx) FREE(*idx);
1068   if (v) FREE(*v);
1069   return 0;
1070 }
1071 
1072 static int MatCopy_MPIAIJ(Mat,Mat *);
1073 extern int MatConvert_MPIAIJ(Mat,MatType,Mat *);
1074 
1075 /* -------------------------------------------------------------------*/
/*
   Operation table for the MPIAIJ matrix type.  The initializer is
   positional: each entry must line up with the corresponding member of
   struct _MatOps, and a 0 marks an operation this format does not
   implement.  NOTE(review): verify the slot order against the _MatOps
   declaration before adding, removing, or moving entries.
*/
static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
       MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
       MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
       MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
       0,0,0,0,
       0,0,
       MatRelax_MPIAIJ,
       0,
       MatGetInfo_MPIAIJ,0,
       MatCopy_MPIAIJ,
       MatGetDiagonal_MPIAIJ,0,0,
       MatAssemblyBegin_MPIAIJ,MatAssemblyEnd_MPIAIJ,
       0,
       MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,0,
       0,0,0,0,
       MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
       0,0,
       0,MatConvert_MPIAIJ };
1094 
1095 /*@
1096    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1097    (the default parallel PETSc format).
1098 
1099    Input Parameters:
1100 .  comm - MPI communicator
1101 .  m - number of local rows (or PETSC_DECIDE to have calculated if M is given)
1102 .  n - number of local columns (or PETSC_DECIDE to have calculated
1103            if N is given)
1104 .  M - number of global rows (or PETSC_DECIDE to have calculated if m is given)
1105 .  N - number of global columns (or PETSC_DECIDE to have calculated
1106            if n is given)
1107 .  d_nz - number of nonzeros per row in diagonal portion of matrix
1108            (same for all local rows)
.  d_nnz - number of nonzeros per row in diagonal portion of matrix or null
1110            (possibly different for each row).  You must leave room for the
1111            diagonal entry even if it is zero.
1112 .  o_nz - number of nonzeros per row in off-diagonal portion of matrix
1113            (same for all local rows)
.  o_nnz - number of nonzeros per row in off-diagonal portion of matrix
1115            or null (possibly different for each row).
1116 
1117    Output Parameter:
1118 .  newmat - the matrix
1119 
1120    Notes:
1121    The AIJ format (also called the Yale sparse matrix format or
1122    compressed row storage), is fully compatible with standard Fortran 77
1123    storage.  That is, the stored row and column indices begin at
1124    one, not zero.
1125 
1126    The user MUST specify either the local or global matrix dimensions
1127    (possibly both).
1128 
1129    The user can set d_nz, d_nnz, o_nz, and o_nnz to zero for PETSc to
1130    control dynamic memory allocation.
1131 
1132 .keywords: Mat, matrix, aij, compressed row, sparse, parallel
1133 
1134 .seealso: MatCreateSequentialAIJ(), MatSetValues()
1135 @*/
int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
                 int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
{
  Mat          mat;
  Mat_MPIAIJ   *aij;
  int          ierr, i,sum[2],work[2];
  *newmat         = 0;
  PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
  PLogObjectCreate(mat);
  mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
  mat->ops        = &MatOps;
  mat->destroy    = MatDestroy_MPIAIJ;
  mat->view       = MatView_MPIAIJ;
  mat->factor     = 0;

  mat->comm       = comm;
  aij->insertmode = NotSetValues;
  MPI_Comm_rank(comm,&aij->mytid);
  MPI_Comm_size(comm,&aij->numtids);

  /* any global dimension given as -1 (PETSC_DECIDE) is obtained by
     summing the local dimensions across the communicator */
  if (M == -1 || N == -1) {
    work[0] = m; work[1] = n;
    MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
    if (M == -1) M = sum[0];
    if (N == -1) N = sum[1];
  }
  /* any local dimension given as -1 is split as evenly as possible;
     lower-ranked processors absorb the remainder */
  if (m == -1) {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
  if (n == -1) {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
  aij->m       = m;
  aij->n       = n;
  aij->N       = N;
  aij->M       = M;

  /* build local table of row and column ownerships */
  aij->rowners = (int *) MALLOC(2*(aij->numtids+2)*sizeof(int));
  CHKPTR(aij->rowners);
  aij->cowners = aij->rowners + aij->numtids + 1;
  /* gather each processor's local row count, then prefix-sum so that
     rowners[p] is the first global row owned by processor p */
  MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
  aij->rowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->rowners[i] += aij->rowners[i-1];
  }
  aij->rstart = aij->rowners[aij->mytid];
  aij->rend   = aij->rowners[aij->mytid+1];
  /* same construction for the column ownership table */
  MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
  aij->cowners[0] = 0;
  for ( i=2; i<=aij->numtids; i++ ) {
    aij->cowners[i] += aij->cowners[i-1];
  }
  aij->cstart = aij->cowners[aij->mytid];
  aij->cend   = aij->cowners[aij->mytid+1];


  /* A holds the m x n diagonal block; B holds the off-diagonal entries
     (created m x N, columns compressed later during assembly) */
  ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&aij->A);
  CHKERR(ierr);
  PLogObjectParent(mat,aij->A);
  ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&aij->B);
  CHKERR(ierr);
  PLogObjectParent(mat,aij->B);

  /* build cache for off array entries formed */
  aij->stash.nmax = CHUNCKSIZE; /* completely arbitrary number */
  aij->stash.n    = 0;
  /* single allocation holding the value array followed by the two
     index arrays, as assumed by StashValues() when it grows the stash */
  aij->stash.array = (Scalar *) MALLOC( aij->stash.nmax*(2*sizeof(int) +
                            sizeof(Scalar))); CHKPTR(aij->stash.array);
  aij->stash.idx = (int *) (aij->stash.array + aij->stash.nmax);
  aij->stash.idy = (int *) (aij->stash.idx + aij->stash.nmax);
  aij->colmap    = 0;
  aij->garray    = 0;

  /* stuff used for matrix vector multiply */
  aij->lvec      = 0;
  aij->Mvctx     = 0;
  aij->assembled = 0;

  *newmat = mat;
  return 0;
}
1214 
1215 static int MatCopy_MPIAIJ(Mat matin,Mat *newmat)
1216 {
1217   Mat        mat;
1218   Mat_MPIAIJ *aij,*oldmat = (Mat_MPIAIJ *) matin->data;
1219   int        ierr, len;
1220   *newmat      = 0;
1221 
1222   if (!oldmat->assembled) SETERR(1,"Cannot copy unassembled matrix");
1223   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1224   PLogObjectCreate(mat);
1225   mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
1226   mat->ops        = &MatOps;
1227   mat->destroy    = MatDestroy_MPIAIJ;
1228   mat->view       = MatView_MPIAIJ;
1229   mat->factor     = matin->factor;
1230 
1231   aij->m          = oldmat->m;
1232   aij->n          = oldmat->n;
1233   aij->M          = oldmat->M;
1234   aij->N          = oldmat->N;
1235 
1236   aij->assembled  = 1;
1237   aij->rstart     = oldmat->rstart;
1238   aij->rend       = oldmat->rend;
1239   aij->cstart     = oldmat->cstart;
1240   aij->cend       = oldmat->cend;
1241   aij->numtids    = oldmat->numtids;
1242   aij->mytid      = oldmat->mytid;
1243   aij->insertmode = NotSetValues;
1244 
1245   aij->rowners        = (int *) MALLOC( (aij->numtids+1)*sizeof(int) );
1246   CHKPTR(aij->rowners);
1247   MEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
1248   aij->stash.nmax     = 0;
1249   aij->stash.n        = 0;
1250   aij->stash.array    = 0;
1251   if (oldmat->colmap) {
1252     aij->colmap      = (int *) MALLOC( (aij->N)*sizeof(int) );
1253     CHKPTR(aij->colmap);
1254     MEMCPY(aij->colmap,oldmat->colmap,(aij->N)*sizeof(int));
1255   } else aij->colmap = 0;
1256     if (oldmat->garray && (len = ((Mat_AIJ *) (oldmat->B->data))->n)) {
1257     aij->garray      = (int *) MALLOC(len*sizeof(int) ); CHKPTR(aij->garray);
1258     MEMCPY(aij->garray,oldmat->garray,len*sizeof(int));
1259   } else aij->garray = 0;
1260   mat->comm           = matin->comm;
1261 
1262   ierr =  VecCreate(oldmat->lvec,&aij->lvec); CHKERR(ierr);
1263   PLogObjectParent(mat,aij->lvec);
1264   ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERR(ierr);
1265   PLogObjectParent(mat,aij->Mvctx);
1266   ierr =  MatCopy(oldmat->A,&aij->A); CHKERR(ierr);
1267   PLogObjectParent(mat,aij->A);
1268   ierr =  MatCopy(oldmat->B,&aij->B); CHKERR(ierr);
1269   PLogObjectParent(mat,aij->B);
1270   *newmat = mat;
1271   return 0;
1272 }
1273