xref: /petsc/src/mat/impls/aij/mpi/mpiaij.c (revision eafb4bcbebb1c0621d2677f4c5d850d03cbd72eb)
1 #ifndef lint
2 static char vcid[] = "$Id: mpiaij.c,v 1.27 1995/04/15 03:28:13 bsmith Exp curfman $";
3 #endif
4 
5 #include "mpiaij.h"
6 #include "vec/vecimpl.h"
7 #include "inline/spops.h"
8 
9 #define CHUNCKSIZE   100
10 /*
11    This is a simple minded stash. Do a linear search to determine if
12  in stash, if not add to end.
13 */
14 static int StashValues(Stash *stash,int row,int n, int *idxn,
15                        Scalar *values,InsertMode addv)
16 {
17   int    i,j,N = stash->n,found,*n_idx, *n_idy;
18   Scalar val,*n_array;
19 
20   for ( i=0; i<n; i++ ) {
21     found = 0;
22     val = *values++;
23     for ( j=0; j<N; j++ ) {
24       if ( stash->idx[j] == row && stash->idy[j] == idxn[i]) {
25         /* found a match */
26         if (addv == AddValues) stash->array[j] += val;
27         else stash->array[j] = val;
28         found = 1;
29         break;
30       }
31     }
32     if (!found) { /* not found so add to end */
33       if ( stash->n == stash->nmax ) {
34         /* allocate a larger stash */
35         n_array = (Scalar *) MALLOC( (stash->nmax + CHUNCKSIZE)*(
36                                      2*sizeof(int) + sizeof(Scalar)));
37         CHKPTR(n_array);
38         n_idx = (int *) (n_array + stash->nmax + CHUNCKSIZE);
39         n_idy = (int *) (n_idx + stash->nmax + CHUNCKSIZE);
40         MEMCPY(n_array,stash->array,stash->nmax*sizeof(Scalar));
41         MEMCPY(n_idx,stash->idx,stash->nmax*sizeof(int));
42         MEMCPY(n_idy,stash->idy,stash->nmax*sizeof(int));
43         if (stash->array) FREE(stash->array);
44         stash->array = n_array; stash->idx = n_idx; stash->idy = n_idy;
45         stash->nmax += CHUNCKSIZE;
46       }
47       stash->array[stash->n]   = val;
48       stash->idx[stash->n]     = row;
49       stash->idy[stash->n++]   = idxn[i];
50     }
51   }
52   return 0;
53 }
54 
55 /* local utility routine that creates a mapping from the global column
56 number to the local number in the off-diagonal part of the local
57 storage of the matrix.  This is done in a non scable way since the
58 length of colmap equals the global matrix length.
59 */
60 static int CreateColmap(Mat mat)
61 {
62   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
63   Mat_AIJ    *B = (Mat_AIJ*) aij->B->data;
64   int        n = B->n,i;
65   aij->colmap = (int *) MALLOC( aij->N*sizeof(int) ); CHKPTR(aij->colmap);
66   MEMSET(aij->colmap,0,aij->N*sizeof(int));
67   for ( i=0; i<n; i++ ) aij->colmap[aij->garray[i]] = i+1;
68   return 0;
69 }
70 
71 static int MatSetValues_MPIAIJ(Mat mat,int m,int *idxm,int n,
72                             int *idxn,Scalar *v,InsertMode addv)
73 {
74   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
75   int        ierr,i,j, rstart = aij->rstart, rend = aij->rend;
76   int        cstart = aij->cstart, cend = aij->cend,row,col;
77 
78   if (aij->insertmode != NotSetValues && aij->insertmode != addv) {
79     SETERR(1,"You cannot mix inserts and adds");
80   }
81   aij->insertmode = addv;
82   for ( i=0; i<m; i++ ) {
83     if (idxm[i] < 0) SETERR(1,"Negative row index");
84     if (idxm[i] >= aij->M) SETERR(1,"Row index too large");
85     if (idxm[i] >= rstart && idxm[i] < rend) {
86       row = idxm[i] - rstart;
87       for ( j=0; j<n; j++ ) {
88         if (idxn[j] < 0) SETERR(1,"Negative column index");
89         if (idxn[j] >= aij->N) SETERR(1,"Column index too large");
90         if (idxn[j] >= cstart && idxn[j] < cend){
91           col = idxn[j] - cstart;
92           ierr = MatSetValues(aij->A,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
93         }
94         else {
95           if (aij->assembled) {
96             if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
97             col = aij->colmap[idxn[j]] - 1;
98             if (col < 0) {
99               SETERR(1,"Cannot insert new off diagonal block nonzero in\
100                      already\
101                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
102                      if your need this feature");
103             }
104           }
105           else col = idxn[j];
106           ierr = MatSetValues(aij->B,1,&row,1,&col,v+i*n+j,addv);CHKERR(ierr);
107         }
108       }
109     }
110     else {
111       ierr = StashValues(&aij->stash,idxm[i],n,idxn,v+i*n,addv);CHKERR(ierr);
112     }
113   }
114   return 0;
115 }
116 
117 /*
118     the assembly code is alot like the code for vectors, we should
119     sometime derive a single assembly code that can be used for
120     either case.
121 */
122 
123 static int MatBeginAssembly_MPIAIJ(Mat mat,int mode)
124 {
125   Mat_MPIAIJ  *aij = (Mat_MPIAIJ *) mat->data;
126   MPI_Comm    comm = mat->comm;
127   int         numtids = aij->numtids, *owners = aij->rowners;
128   int         mytid = aij->mytid;
129   MPI_Request *send_waits,*recv_waits;
130   int         *nprocs,i,j,idx,*procs,nsends,nreceives,nmax,*work;
131   int         tag = 50, *owner,*starts,count;
132   InsertMode  addv;
133   Scalar      *rvalues,*svalues;
134 
135   /* make sure all processors are either in INSERTMODE or ADDMODE */
136   MPI_Allreduce((void *) &aij->insertmode,(void *) &addv,1,MPI_INT,
137                 MPI_BOR,comm);
138   if (addv == (AddValues|InsertValues)) {
139     SETERR(1,"Some processors have inserted while others have added");
140   }
141   aij->insertmode = addv; /* in case this processor had no cache */
142 
143   /*  first count number of contributors to each processor */
144   nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
145   MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
146   owner = (int *) MALLOC( (aij->stash.n+1)*sizeof(int) ); CHKPTR(owner);
147   for ( i=0; i<aij->stash.n; i++ ) {
148     idx = aij->stash.idx[i];
149     for ( j=0; j<numtids; j++ ) {
150       if (idx >= owners[j] && idx < owners[j+1]) {
151         nprocs[j]++; procs[j] = 1; owner[i] = j; break;
152       }
153     }
154   }
155   nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}
156 
157   /* inform other processors of number of messages and max length*/
158   work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
159   MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
160   nreceives = work[mytid];
161   MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
162   nmax = work[mytid];
163   FREE(work);
164 
165   /* post receives:
166        1) each message will consist of ordered pairs
167      (global index,value) we store the global index as a double
168      to simplify the message passing.
169        2) since we don't know how long each individual message is we
170      allocate the largest needed buffer for each receive. Potentially
171      this is a lot of wasted space.
172 
173 
174        This could be done better.
175   */
176   rvalues = (Scalar *) MALLOC(3*(nreceives+1)*(nmax+1)*sizeof(Scalar));
177   CHKPTR(rvalues);
178   recv_waits = (MPI_Request *) MALLOC((nreceives+1)*sizeof(MPI_Request));
179   CHKPTR(recv_waits);
180   for ( i=0; i<nreceives; i++ ) {
181     MPI_Irecv((void *)(rvalues+3*nmax*i),3*nmax,MPI_SCALAR,MPI_ANY_SOURCE,tag,
182               comm,recv_waits+i);
183   }
184 
185   /* do sends:
186       1) starts[i] gives the starting index in svalues for stuff going to
187          the ith processor
188   */
189   svalues = (Scalar *) MALLOC( 3*(aij->stash.n+1)*sizeof(Scalar) );
190   CHKPTR(svalues);
191   send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
192   CHKPTR(send_waits);
193   starts = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(starts);
194   starts[0] = 0;
195   for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
196   for ( i=0; i<aij->stash.n; i++ ) {
197     svalues[3*starts[owner[i]]]       = (Scalar)  aij->stash.idx[i];
198     svalues[3*starts[owner[i]]+1]     = (Scalar)  aij->stash.idy[i];
199     svalues[3*(starts[owner[i]]++)+2] =  aij->stash.array[i];
200   }
201   FREE(owner);
202   starts[0] = 0;
203   for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
204   count = 0;
205   for ( i=0; i<numtids; i++ ) {
206     if (procs[i]) {
207       MPI_Isend((void*)(svalues+3*starts[i]),3*nprocs[i],MPI_SCALAR,i,tag,
208                 comm,send_waits+count++);
209     }
210   }
211   FREE(starts); FREE(nprocs);
212 
213   /* Free cache space */
214   aij->stash.nmax = aij->stash.n = 0;
215   if (aij->stash.array){ FREE(aij->stash.array); aij->stash.array = 0;}
216 
217   aij->svalues    = svalues;       aij->rvalues = rvalues;
218   aij->nsends     = nsends;         aij->nrecvs = nreceives;
219   aij->send_waits = send_waits; aij->recv_waits = recv_waits;
220   aij->rmax       = nmax;
221 
222   return 0;
223 }
224 extern int MatSetUpMultiply_MPIAIJ(Mat);
225 
226 static int MatEndAssembly_MPIAIJ(Mat mat,int mode)
227 {
228   int        ierr;
229   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
230 
231   MPI_Status  *send_status,recv_status;
232   int         imdex,nrecvs = aij->nrecvs, count = nrecvs, i, n;
233   int         row,col;
234   Scalar      *values,val;
235   InsertMode  addv = aij->insertmode;
236 
237   /*  wait on receives */
238   while (count) {
239     MPI_Waitany(nrecvs,aij->recv_waits,&imdex,&recv_status);
240     /* unpack receives into our local space */
241     values = aij->rvalues + 3*imdex*aij->rmax;
242     MPI_Get_count(&recv_status,MPI_SCALAR,&n);
243     n = n/3;
244     for ( i=0; i<n; i++ ) {
245       row = (int) PETSCREAL(values[3*i]) - aij->rstart;
246       col = (int) PETSCREAL(values[3*i+1]);
247       val = values[3*i+2];
248       if (col >= aij->cstart && col < aij->cend) {
249           col -= aij->cstart;
250         MatSetValues(aij->A,1,&row,1,&col,&val,addv);
251       }
252       else {
253         if (aij->assembled) {
254           if (!aij->colmap) {ierr = CreateColmap(mat); CHKERR(ierr);}
255           col = aij->colmap[col] - 1;
256           if (col < 0) {
257             SETERR(1,"Cannot insert new off diagonal block nonzero in\
258                      already\
259                      assembled matrix. Contact petsc-maint@mcs.anl.gov\
260                      if your need this feature");
261           }
262         }
263         MatSetValues(aij->B,1,&row,1,&col,&val,addv);
264       }
265     }
266     count--;
267   }
268   FREE(aij->recv_waits); FREE(aij->rvalues);
269 
270   /* wait on sends */
271   if (aij->nsends) {
272     send_status = (MPI_Status *) MALLOC( aij->nsends*sizeof(MPI_Status) );
273     CHKPTR(send_status);
274     MPI_Waitall(aij->nsends,aij->send_waits,send_status);
275     FREE(send_status);
276   }
277   FREE(aij->send_waits); FREE(aij->svalues);
278 
279   aij->insertmode = NotSetValues;
280   ierr = MatBeginAssembly(aij->A,mode); CHKERR(ierr);
281   ierr = MatEndAssembly(aij->A,mode); CHKERR(ierr);
282 
283   if (!aij->assembled && mode == FINAL_ASSEMBLY) {
284     ierr = MatSetUpMultiply_MPIAIJ(mat); CHKERR(ierr);
285     aij->assembled = 1;
286   }
287   ierr = MatBeginAssembly(aij->B,mode); CHKERR(ierr);
288   ierr = MatEndAssembly(aij->B,mode); CHKERR(ierr);
289 
290   return 0;
291 }
292 
293 static int MatZeroEntries_MPIAIJ(Mat A)
294 {
295   Mat_MPIAIJ *l = (Mat_MPIAIJ *) A->data;
296 
297   MatZeroEntries(l->A); MatZeroEntries(l->B);
298   return 0;
299 }
300 
301 /* again this uses the same basic stratagy as in the assembly and
302    scatter create routines, we should try to do it systemamatically
303    if we can figure out the proper level of generality. */
304 
305 /* the code does not do the diagonal entries correctly unless the
306    matrix is square and the column and row owerships are identical.
307    This is a BUG. The only way to fix it seems to be to access
308    aij->A and aij->B directly and not through the MatZeroRows()
309    routine.
310 */
311 static int MatZeroRows_MPIAIJ(Mat A,IS is,Scalar *diag)
312 {
313   Mat_MPIAIJ     *l = (Mat_MPIAIJ *) A->data;
314   int            i,ierr,N, *rows,*owners = l->rowners,numtids = l->numtids;
315   int            *procs,*nprocs,j,found,idx,nsends,*work;
316   int            nmax,*svalues,*starts,*owner,nrecvs,mytid = l->mytid;
317   int            *rvalues,tag = 67,count,base,slen,n,*source;
318   int            *lens,imdex,*lrows,*values;
319   MPI_Comm       comm = A->comm;
320   MPI_Request    *send_waits,*recv_waits;
321   MPI_Status     recv_status,*send_status;
322   IS             istmp;
323 
324   if (!l->assembled) SETERR(1,"MatZeroRows_MPIAIJ: must assemble matrix first");
325   ierr = ISGetLocalSize(is,&N); CHKERR(ierr);
326   ierr = ISGetIndices(is,&rows); CHKERR(ierr);
327 
328   /*  first count number of contributors to each processor */
329   nprocs = (int *) MALLOC( 2*numtids*sizeof(int) ); CHKPTR(nprocs);
330   MEMSET(nprocs,0,2*numtids*sizeof(int)); procs = nprocs + numtids;
331   owner = (int *) MALLOC((N+1)*sizeof(int)); CHKPTR(owner); /* see note*/
332   for ( i=0; i<N; i++ ) {
333     idx = rows[i];
334     found = 0;
335     for ( j=0; j<numtids; j++ ) {
336       if (idx >= owners[j] && idx < owners[j+1]) {
337         nprocs[j]++; procs[j] = 1; owner[i] = j; found = 1; break;
338       }
339     }
340     if (!found) SETERR(1,"Imdex out of range");
341   }
342   nsends = 0;  for ( i=0; i<numtids; i++ ) { nsends += procs[i];}
343 
344   /* inform other processors of number of messages and max length*/
345   work = (int *) MALLOC( numtids*sizeof(int) ); CHKPTR(work);
346   MPI_Allreduce((void *) procs,(void *) work,numtids,MPI_INT,MPI_SUM,comm);
347   nrecvs = work[mytid];
348   MPI_Allreduce((void *) nprocs,(void *) work,numtids,MPI_INT,MPI_MAX,comm);
349   nmax = work[mytid];
350   FREE(work);
351 
352   /* post receives:   */
353   rvalues = (int *) MALLOC((nrecvs+1)*(nmax+1)*sizeof(int)); /*see note */
354   CHKPTR(rvalues);
355   recv_waits = (MPI_Request *) MALLOC((nrecvs+1)*sizeof(MPI_Request));
356   CHKPTR(recv_waits);
357   for ( i=0; i<nrecvs; i++ ) {
358     MPI_Irecv((void *)(rvalues+nmax*i),nmax,MPI_INT,MPI_ANY_SOURCE,tag,
359               comm,recv_waits+i);
360   }
361 
362   /* do sends:
363       1) starts[i] gives the starting index in svalues for stuff going to
364          the ith processor
365   */
366   svalues = (int *) MALLOC( (N+1)*sizeof(int) ); CHKPTR(svalues);
367   send_waits = (MPI_Request *) MALLOC( (nsends+1)*sizeof(MPI_Request));
368   CHKPTR(send_waits);
369   starts = (int *) MALLOC( (numtids+1)*sizeof(int) ); CHKPTR(starts);
370   starts[0] = 0;
371   for ( i=1; i<numtids; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
372   for ( i=0; i<N; i++ ) {
373     svalues[starts[owner[i]]++] = rows[i];
374   }
375   ISRestoreIndices(is,&rows);
376 
377   starts[0] = 0;
378   for ( i=1; i<numtids+1; i++ ) { starts[i] = starts[i-1] + nprocs[i-1];}
379   count = 0;
380   for ( i=0; i<numtids; i++ ) {
381     if (procs[i]) {
382       MPI_Isend((void*)(svalues+starts[i]),nprocs[i],MPI_INT,i,tag,
383                 comm,send_waits+count++);
384     }
385   }
386   FREE(starts);
387 
388   base = owners[mytid];
389 
390   /*  wait on receives */
391   lens = (int *) MALLOC( 2*(nrecvs+1)*sizeof(int) ); CHKPTR(lens);
392   source = lens + nrecvs;
393   count = nrecvs; slen = 0;
394   while (count) {
395     MPI_Waitany(nrecvs,recv_waits,&imdex,&recv_status);
396     /* unpack receives into our local space */
397     MPI_Get_count(&recv_status,MPI_INT,&n);
398     source[imdex]  = recv_status.MPI_SOURCE;
399     lens[imdex]  = n;
400     slen += n;
401     count--;
402   }
403   FREE(recv_waits);
404 
405   /* move the data into the send scatter */
406   lrows = (int *) MALLOC( slen*sizeof(int) ); CHKPTR(lrows);
407   count = 0;
408   for ( i=0; i<nrecvs; i++ ) {
409     values = rvalues + i*nmax;
410     for ( j=0; j<lens[i]; j++ ) {
411       lrows[count++] = values[j] - base;
412     }
413   }
414   FREE(rvalues); FREE(lens);
415   FREE(owner); FREE(nprocs);
416 
417   /* actually zap the local rows */
418   ierr = ISCreateSequential(MPI_COMM_SELF,slen,lrows,&istmp);
419   CHKERR(ierr);  FREE(lrows);
420   ierr = MatZeroRows(l->A,istmp,diag); CHKERR(ierr);
421   ierr = MatZeroRows(l->B,istmp,0); CHKERR(ierr);
422   ierr = ISDestroy(istmp); CHKERR(ierr);
423 
424   /* wait on sends */
425   if (nsends) {
426     send_status = (MPI_Status *) MALLOC( nsends*sizeof(MPI_Status) );
427     CHKPTR(send_status);
428     MPI_Waitall(nsends,send_waits,send_status);
429     FREE(send_status);
430   }
431   FREE(send_waits); FREE(svalues);
432 
433 
434   return 0;
435 }
436 
437 static int MatMult_MPIAIJ(Mat aijin,Vec xx,Vec yy)
438 {
439   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
440   int        ierr;
441   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
442   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
443   CHKERR(ierr);
444   ierr = MatMult(aij->A,xx,yy); CHKERR(ierr);
445   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
446   CHKERR(ierr);
447   ierr = MatMultAdd(aij->B,aij->lvec,yy,yy); CHKERR(ierr);
448   return 0;
449 }
450 
451 static int MatMultAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
452 {
453   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
454   int        ierr;
455   if (!aij->assembled) SETERR(1,"MatMult_MPIAIJ: must assemble matrix first");
456   ierr = VecScatterBegin(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
457   CHKERR(ierr);
458   ierr = MatMultAdd(aij->A,xx,yy,zz); CHKERR(ierr);
459   ierr = VecScatterEnd(xx,0,aij->lvec,0,InsertValues,ScatterAll,aij->Mvctx);
460   CHKERR(ierr);
461   ierr = MatMultAdd(aij->B,aij->lvec,zz,zz); CHKERR(ierr);
462   return 0;
463 }
464 
465 static int MatMultTrans_MPIAIJ(Mat aijin,Vec xx,Vec yy)
466 {
467   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
468   int        ierr;
469 
470   if (!aij->assembled)
471     SETERR(1,"MatMulTrans_MPIAIJ: must assemble matrix first");
472   /* do nondiagonal part */
473   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
474   /* send it on its way */
475   ierr = VecScatterBegin(aij->lvec,0,yy,0,AddValues,
476                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
477   /* do local part */
478   ierr = MatMultTrans(aij->A,xx,yy); CHKERR(ierr);
479   /* receive remote parts: note this assumes the values are not actually */
480   /* inserted in yy until the next line, which is true for my implementation*/
481   /* but is not perhaps always true. */
482   ierr = VecScatterEnd(aij->lvec,0,yy,0,AddValues,ScatterAll|ScatterReverse,
483                          aij->Mvctx); CHKERR(ierr);
484   return 0;
485 }
486 
487 static int MatMultTransAdd_MPIAIJ(Mat aijin,Vec xx,Vec yy,Vec zz)
488 {
489   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
490   int        ierr;
491 
492   if (!aij->assembled)
493     SETERR(1,"MatMulTransAdd_MPIAIJ: must assemble matrix first");
494   /* do nondiagonal part */
495   ierr = MatMultTrans(aij->B,xx,aij->lvec); CHKERR(ierr);
496   /* send it on its way */
497   ierr = VecScatterBegin(aij->lvec,0,zz,0,AddValues,
498                          ScatterAll|ScatterReverse,aij->Mvctx); CHKERR(ierr);
499   /* do local part */
500   ierr = MatMultTransAdd(aij->A,xx,yy,zz); CHKERR(ierr);
501   /* receive remote parts: note this assumes the values are not actually */
502   /* inserted in yy until the next line, which is true for my implementation*/
503   /* but is not perhaps always true. */
504   ierr = VecScatterEnd(aij->lvec,0,zz,0,AddValues,ScatterAll|ScatterReverse,
505                          aij->Mvctx); CHKERR(ierr);
506   return 0;
507 }
508 
509 /*
510   This only works correctly for square matrices where the subblock A->A is the
511    diagonal block
512 */
513 static int MatGetDiagonal_MPIAIJ(Mat Ain,Vec v)
514 {
515   Mat_MPIAIJ *A = (Mat_MPIAIJ *) Ain->data;
516   if (!A->assembled) SETERR(1,"MatGetDiag_MPIAIJ: must assemble matrix first");
517   return MatGetDiagonal(A->A,v);
518 }
519 
520 static int MatDestroy_MPIAIJ(PetscObject obj)
521 {
522   Mat        mat = (Mat) obj;
523   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
524   int        ierr;
525 #if defined(PETSC_LOG)
526   PLogObjectState(obj,"Rows %d Cols %d",aij->M,aij->N);
527 #endif
528   FREE(aij->rowners);
529   ierr = MatDestroy(aij->A); CHKERR(ierr);
530   ierr = MatDestroy(aij->B); CHKERR(ierr);
531   if (aij->colmap) FREE(aij->colmap);
532   if (aij->garray) FREE(aij->garray);
533   if (aij->lvec) VecDestroy(aij->lvec);
534   if (aij->Mvctx) VecScatterCtxDestroy(aij->Mvctx);
535   FREE(aij);
536   PLogObjectDestroy(mat);
537   PETSCHEADERDESTROY(mat);
538   return 0;
539 }
540 
541 static int MatView_MPIAIJ(PetscObject obj,Viewer viewer)
542 {
543   Mat        mat = (Mat) obj;
544   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) mat->data;
545   int        ierr;
546   PetscObject vobj = (PetscObject) viewer;
547 
548   if (!viewer) { /* so that viewers may be used from debuggers */
549     viewer = STDOUT_VIEWER; vobj = (PetscObject) viewer;
550   }
551   if (!aij->assembled) SETERR(1,"MatView_MPIAIJ: must assemble matrix first");
552   if (vobj->cookie == VIEWER_COOKIE) {
553     FILE *fd = ViewerFileGetPointer(viewer);
554     if (vobj->type == FILE_VIEWER) {
555       MPE_Seq_begin(mat->comm,1);
556       fprintf(fd,"[%d] rows %d starts %d ends %d cols %d starts %d ends %d\n",
557              aij->mytid,aij->m,aij->rstart,aij->rend,aij->n,aij->cstart,
558              aij->cend);
559       ierr = MatView(aij->A,viewer); CHKERR(ierr);
560       ierr = MatView(aij->B,viewer); CHKERR(ierr);
561       fflush(fd);
562       MPE_Seq_end(mat->comm,1);
563     }
564     else if (vobj->type == FILES_VIEWER) {
565       int numtids = aij->numtids, mytid = aij->mytid;
566       if (numtids == 1) {
567         ierr = MatView(aij->A,viewer); CHKERR(ierr);
568       }
569       else {
570         /* assemble the entire matrix onto first processor. */
571         Mat     A;
572         Mat_AIJ *Aloc;
573         int     M = aij->M, N = aij->N,m,*ai,*aj,row,*cols,i,*ct;
574         Scalar  *a;
575 
576         if (!mytid) {
577           ierr = MatCreateMPIAIJ(mat->comm,M,N,M,N,0,0,0,0,&A);
578         }
579         else {
580           ierr = MatCreateMPIAIJ(mat->comm,0,0,M,N,0,0,0,0,&A);
581         }
582         CHKERR(ierr);
583 
584         /* copy over the A part */
585         Aloc = (Mat_AIJ*) aij->A->data;
586         m = Aloc->m; ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
587         row = aij->rstart;
588         for ( i=0; i<ai[m]; i++ ) {aj[i] += aij->cstart - 1;}
589         for ( i=0; i<m; i++ ) {
590           ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],aj,a,InsertValues);
591           CHKERR(ierr);
592           row++; a += ai[i+1]-ai[i]; aj += ai[i+1]-ai[i];
593         }
594         aj = Aloc->j;
595         for ( i=0; i<ai[m]; i++ ) {aj[i] -= aij->cstart - 1;}
596 
597         /* copy over the B part */
598         Aloc = (Mat_AIJ*) aij->B->data;
599         m = Aloc->m;  ai = Aloc->i; aj = Aloc->j; a = Aloc->a;
600         row = aij->rstart;
601         ct = cols = (int *) MALLOC( (ai[m]+1)*sizeof(int) ); CHKPTR(cols);
602         for ( i=0; i<ai[m]; i++ ) {cols[i] = aij->garray[aj[i]-1];}
603         for ( i=0; i<m; i++ ) {
604           ierr = MatSetValues(A,1,&row,ai[i+1]-ai[i],cols,a,InsertValues);
605           CHKERR(ierr);
606           row++; a += ai[i+1]-ai[i]; cols += ai[i+1]-ai[i];
607         }
608         FREE(ct);
609         ierr = MatBeginAssembly(A,FINAL_ASSEMBLY); CHKERR(ierr);
610         ierr = MatEndAssembly(A,FINAL_ASSEMBLY); CHKERR(ierr);
611         if (!mytid) {
612           ierr = MatView(((Mat_MPIAIJ*)(A->data))->A,viewer); CHKERR(ierr);
613         }
614         ierr = MatDestroy(A); CHKERR(ierr);
615       }
616     }
617   }
618   return 0;
619 }
620 
621 extern int MatMarkDiag_AIJ(Mat_AIJ  *);
622 /*
623     This has to provide several versions.
624 
625      1) per sequential
626      2) a) use only local smoothing updating outer values only once.
627         b) local smoothing updating outer values each inner iteration
628      3) color updating out values betwen colors.
629 */
630 static int MatRelax_MPIAIJ(Mat matin,Vec bb,double omega,int flag,double shift,
631                         int its,Vec xx)
632 {
633   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
634   Mat        AA = mat->A, BB = mat->B;
635   Mat_AIJ    *A = (Mat_AIJ *) AA->data, *B = (Mat_AIJ *)BB->data;
636   Scalar     zero = 0.0,*b,*x,*xs,*ls,d,*v,sum,scale,*t,*ts;
637   int        ierr,*idx, *diag;
638   int        n = mat->n, m = mat->m, i;
639   Vec        tt;
640 
641   if (!mat->assembled) SETERR(1,"MatRelax_MPIAIJ: must assemble matrix first");
642 
643   VecGetArray(xx,&x); VecGetArray(bb,&b); VecGetArray(mat->lvec,&ls);
644   xs = x -1; /* shift by one for index start of 1 */
645   ls--;
646   if (!A->diag) {if ((ierr = MatMarkDiag_AIJ(A))) return ierr;}
647   diag = A->diag;
648   if (flag == SOR_APPLY_UPPER || flag == SOR_APPLY_LOWER) {
649     SETERR(1,"That option not yet support for parallel AIJ matrices");
650   }
651   if (flag & SOR_EISENSTAT) {
652     /* Let  A = L + U + D; where L is lower trianglar,
653     U is upper triangular, E is diagonal; This routine applies
654 
655             (L + E)^{-1} A (U + E)^{-1}
656 
657     to a vector efficiently using Eisenstat's trick. This is for
658     the case of SSOR preconditioner, so E is D/omega where omega
659     is the relaxation factor.
660     */
661     ierr = VecCreate(xx,&tt); CHKERR(ierr);
662     VecGetArray(tt,&t);
663     scale = (2.0/omega) - 1.0;
664     /*  x = (E + U)^{-1} b */
665     VecSet(&zero,mat->lvec);
666     ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
667                               mat->Mvctx); CHKERR(ierr);
668     for ( i=m-1; i>-1; i-- ) {
669       n    = A->i[i+1] - diag[i] - 1;
670       idx  = A->j + diag[i];
671       v    = A->a + diag[i];
672       sum  = b[i];
673       SPARSEDENSEMDOT(sum,xs,v,idx,n);
674       d    = shift + A->a[diag[i]-1];
675       n    = B->i[i+1] - B->i[i];
676       idx  = B->j + B->i[i] - 1;
677       v    = B->a + B->i[i] - 1;
678       SPARSEDENSEMDOT(sum,ls,v,idx,n);
679       x[i] = omega*(sum/d);
680     }
681     ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
682                             mat->Mvctx); CHKERR(ierr);
683 
684     /*  t = b - (2*E - D)x */
685     v = A->a;
686     for ( i=0; i<m; i++ ) { t[i] = b[i] - scale*(v[*diag++ - 1])*x[i]; }
687 
688     /*  t = (E + L)^{-1}t */
689     ts = t - 1; /* shifted by one for index start of a or mat->j*/
690     diag = A->diag;
691     VecSet(&zero,mat->lvec);
692     ierr = VecPipelineBegin(tt,0,mat->lvec,0,InsertValues,PipelineDown,
693                                                  mat->Mvctx); CHKERR(ierr);
694     for ( i=0; i<m; i++ ) {
695       n    = diag[i] - A->i[i];
696       idx  = A->j + A->i[i] - 1;
697       v    = A->a + A->i[i] - 1;
698       sum  = t[i];
699       SPARSEDENSEMDOT(sum,ts,v,idx,n);
700       d    = shift + A->a[diag[i]-1];
701       n    = B->i[i+1] - B->i[i];
702       idx  = B->j + B->i[i] - 1;
703       v    = B->a + B->i[i] - 1;
704       SPARSEDENSEMDOT(sum,ls,v,idx,n);
705       t[i] = omega*(sum/d);
706     }
707     ierr = VecPipelineEnd(tt,0,mat->lvec,0,InsertValues,PipelineDown,
708                                                     mat->Mvctx); CHKERR(ierr);
709     /*  x = x + t */
710     for ( i=0; i<m; i++ ) { x[i] += t[i]; }
711     VecDestroy(tt);
712     return 0;
713   }
714 
715 
716   if ((flag & SOR_SYMMETRIC_SWEEP) == SOR_SYMMETRIC_SWEEP){
717     if (flag & SOR_ZERO_INITIAL_GUESS) {
718       VecSet(&zero,mat->lvec); VecSet(&zero,xx);
719     }
720     else {
721       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
722       CHKERR(ierr);
723       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
724       CHKERR(ierr);
725     }
726     while (its--) {
727       /* go down through the rows */
728       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
729                               mat->Mvctx); CHKERR(ierr);
730       for ( i=0; i<m; i++ ) {
731         n    = A->i[i+1] - A->i[i];
732         idx  = A->j + A->i[i] - 1;
733         v    = A->a + A->i[i] - 1;
734         sum  = b[i];
735         SPARSEDENSEMDOT(sum,xs,v,idx,n);
736         d    = shift + A->a[diag[i]-1];
737         n    = B->i[i+1] - B->i[i];
738         idx  = B->j + B->i[i] - 1;
739         v    = B->a + B->i[i] - 1;
740         SPARSEDENSEMDOT(sum,ls,v,idx,n);
741         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
742       }
743       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
744                             mat->Mvctx); CHKERR(ierr);
745       /* come up through the rows */
746       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
747                               mat->Mvctx); CHKERR(ierr);
748       for ( i=m-1; i>-1; i-- ) {
749         n    = A->i[i+1] - A->i[i];
750         idx  = A->j + A->i[i] - 1;
751         v    = A->a + A->i[i] - 1;
752         sum  = b[i];
753         SPARSEDENSEMDOT(sum,xs,v,idx,n);
754         d    = shift + A->a[diag[i]-1];
755         n    = B->i[i+1] - B->i[i];
756         idx  = B->j + B->i[i] - 1;
757         v    = B->a + B->i[i] - 1;
758         SPARSEDENSEMDOT(sum,ls,v,idx,n);
759         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
760       }
761       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
762                             mat->Mvctx); CHKERR(ierr);
763     }
764   }
765   else if (flag & SOR_FORWARD_SWEEP){
766     if (flag & SOR_ZERO_INITIAL_GUESS) {
767       VecSet(&zero,mat->lvec);
768       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
769                               mat->Mvctx); CHKERR(ierr);
770       for ( i=0; i<m; i++ ) {
771         n    = diag[i] - A->i[i];
772         idx  = A->j + A->i[i] - 1;
773         v    = A->a + A->i[i] - 1;
774         sum  = b[i];
775         SPARSEDENSEMDOT(sum,xs,v,idx,n);
776         d    = shift + A->a[diag[i]-1];
777         n    = B->i[i+1] - B->i[i];
778         idx  = B->j + B->i[i] - 1;
779         v    = B->a + B->i[i] - 1;
780         SPARSEDENSEMDOT(sum,ls,v,idx,n);
781         x[i] = omega*(sum/d);
782       }
783       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
784                             mat->Mvctx); CHKERR(ierr);
785       its--;
786     }
787     while (its--) {
788       ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
789       CHKERR(ierr);
790       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterUp,mat->Mvctx);
791       CHKERR(ierr);
792       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineDown,
793                               mat->Mvctx); CHKERR(ierr);
794       for ( i=0; i<m; i++ ) {
795         n    = A->i[i+1] - A->i[i];
796         idx  = A->j + A->i[i] - 1;
797         v    = A->a + A->i[i] - 1;
798         sum  = b[i];
799         SPARSEDENSEMDOT(sum,xs,v,idx,n);
800         d    = shift + A->a[diag[i]-1];
801         n    = B->i[i+1] - B->i[i];
802         idx  = B->j + B->i[i] - 1;
803         v    = B->a + B->i[i] - 1;
804         SPARSEDENSEMDOT(sum,ls,v,idx,n);
805         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
806       }
807       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineDown,
808                             mat->Mvctx); CHKERR(ierr);
809     }
810   }
811   else if (flag & SOR_BACKWARD_SWEEP){
812     if (flag & SOR_ZERO_INITIAL_GUESS) {
813       VecSet(&zero,mat->lvec);
814       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
815                               mat->Mvctx); CHKERR(ierr);
816       for ( i=m-1; i>-1; i-- ) {
817         n    = A->i[i+1] - diag[i] - 1;
818         idx  = A->j + diag[i];
819         v    = A->a + diag[i];
820         sum  = b[i];
821         SPARSEDENSEMDOT(sum,xs,v,idx,n);
822         d    = shift + A->a[diag[i]-1];
823         n    = B->i[i+1] - B->i[i];
824         idx  = B->j + B->i[i] - 1;
825         v    = B->a + B->i[i] - 1;
826         SPARSEDENSEMDOT(sum,ls,v,idx,n);
827         x[i] = omega*(sum/d);
828       }
829       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
830                             mat->Mvctx); CHKERR(ierr);
831       its--;
832     }
833     while (its--) {
834       ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterDown,
835                             mat->Mvctx); CHKERR(ierr);
836       ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterDown,
837                             mat->Mvctx); CHKERR(ierr);
838       ierr = VecPipelineBegin(xx,0,mat->lvec,0,InsertValues,PipelineUp,
839                               mat->Mvctx); CHKERR(ierr);
840       for ( i=m-1; i>-1; i-- ) {
841         n    = A->i[i+1] - A->i[i];
842         idx  = A->j + A->i[i] - 1;
843         v    = A->a + A->i[i] - 1;
844         sum  = b[i];
845         SPARSEDENSEMDOT(sum,xs,v,idx,n);
846         d    = shift + A->a[diag[i]-1];
847         n    = B->i[i+1] - B->i[i];
848         idx  = B->j + B->i[i] - 1;
849         v    = B->a + B->i[i] - 1;
850         SPARSEDENSEMDOT(sum,ls,v,idx,n);
851         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
852       }
853       ierr = VecPipelineEnd(xx,0,mat->lvec,0,InsertValues,PipelineUp,
854                             mat->Mvctx); CHKERR(ierr);
855     }
856   }
857   else if ((flag & SOR_LOCAL_SYMMETRIC_SWEEP) == SOR_LOCAL_SYMMETRIC_SWEEP){
858     if (flag & SOR_ZERO_INITIAL_GUESS) {
859       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
860     }
861     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
862     CHKERR(ierr);
863     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
864     CHKERR(ierr);
865     while (its--) {
866       /* go down through the rows */
867       for ( i=0; i<m; i++ ) {
868         n    = A->i[i+1] - A->i[i];
869         idx  = A->j + A->i[i] - 1;
870         v    = A->a + A->i[i] - 1;
871         sum  = b[i];
872         SPARSEDENSEMDOT(sum,xs,v,idx,n);
873         d    = shift + A->a[diag[i]-1];
874         n    = B->i[i+1] - B->i[i];
875         idx  = B->j + B->i[i] - 1;
876         v    = B->a + B->i[i] - 1;
877         SPARSEDENSEMDOT(sum,ls,v,idx,n);
878         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
879       }
880       /* come up through the rows */
881       for ( i=m-1; i>-1; i-- ) {
882         n    = A->i[i+1] - A->i[i];
883         idx  = A->j + A->i[i] - 1;
884         v    = A->a + A->i[i] - 1;
885         sum  = b[i];
886         SPARSEDENSEMDOT(sum,xs,v,idx,n);
887         d    = shift + A->a[diag[i]-1];
888         n    = B->i[i+1] - B->i[i];
889         idx  = B->j + B->i[i] - 1;
890         v    = B->a + B->i[i] - 1;
891         SPARSEDENSEMDOT(sum,ls,v,idx,n);
892         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
893       }
894     }
895   }
896   else if (flag & SOR_LOCAL_FORWARD_SWEEP){
897     if (flag & SOR_ZERO_INITIAL_GUESS) {
898       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
899     }
900     ierr=VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
901     CHKERR(ierr);
902     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,mat->Mvctx);
903     CHKERR(ierr);
904     while (its--) {
905       for ( i=0; i<m; i++ ) {
906         n    = A->i[i+1] - A->i[i];
907         idx  = A->j + A->i[i] - 1;
908         v    = A->a + A->i[i] - 1;
909         sum  = b[i];
910         SPARSEDENSEMDOT(sum,xs,v,idx,n);
911         d    = shift + A->a[diag[i]-1];
912         n    = B->i[i+1] - B->i[i];
913         idx  = B->j + B->i[i] - 1;
914         v    = B->a + B->i[i] - 1;
915         SPARSEDENSEMDOT(sum,ls,v,idx,n);
916         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
917       }
918     }
919   }
920   else if (flag & SOR_LOCAL_BACKWARD_SWEEP){
921     if (flag & SOR_ZERO_INITIAL_GUESS) {
922       return MatRelax(mat->A,bb,omega,flag,shift,its,xx);
923     }
924     ierr = VecScatterBegin(xx,0,mat->lvec,0,InsertValues,ScatterAll,
925                             mat->Mvctx); CHKERR(ierr);
926     ierr = VecScatterEnd(xx,0,mat->lvec,0,InsertValues,ScatterAll,
927                             mat->Mvctx); CHKERR(ierr);
928     while (its--) {
929       for ( i=m-1; i>-1; i-- ) {
930         n    = A->i[i+1] - A->i[i];
931         idx  = A->j + A->i[i] - 1;
932         v    = A->a + A->i[i] - 1;
933         sum  = b[i];
934         SPARSEDENSEMDOT(sum,xs,v,idx,n);
935         d    = shift + A->a[diag[i]-1];
936         n    = B->i[i+1] - B->i[i];
937         idx  = B->j + B->i[i] - 1;
938         v    = B->a + B->i[i] - 1;
939         SPARSEDENSEMDOT(sum,ls,v,idx,n);
940         x[i] = (1. - omega)*x[i] + omega*(sum/d + x[i]);
941       }
942     }
943   }
944   return 0;
945 }
946 
947 static int MatGetInfo_MPIAIJ(Mat matin,int flag,int *nz,int *nzalloc,int *mem)
948 {
949   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
950   Mat        A = mat->A, B = mat->B;
951   int        ierr, isend[3], irecv[3], nzA, nzallocA, memA;
952 
953   ierr = MatGetInfo(A,MAT_LOCAL,&nzA,&nzallocA,&memA); CHKERR(ierr);
954   ierr = MatGetInfo(B,MAT_LOCAL,&isend[0],&isend[1],&isend[2]); CHKERR(ierr);
955   isend[0] += nzA; isend[1] += nzallocA; isend[2] += memA;
956   if (flag == MAT_LOCAL) {
957     *nz = isend[0]; *nzalloc = isend[1]; *mem = isend[2];
958   } else if (flag == MAT_GLOBAL_MAX) {
959     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_MAX,matin->comm);
960     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
961   } else if (flag == MAT_GLOBAL_SUM) {
962     MPI_Allreduce((void *) isend,(void *) irecv,3,MPI_INT,MPI_SUM,matin->comm);
963     *nz = irecv[0]; *nzalloc = irecv[1]; *mem = irecv[2];
964   }
965   return 0;
966 }
967 
968 static int MatSetOption_MPIAIJ(Mat aijin,int op)
969 {
970   Mat_MPIAIJ *aij = (Mat_MPIAIJ *) aijin->data;
971 
972   if      (op == NO_NEW_NONZERO_LOCATIONS)  {
973     MatSetOption(aij->A,op);
974     MatSetOption(aij->B,op);
975   }
976   else if (op == YES_NEW_NONZERO_LOCATIONS) {
977     MatSetOption(aij->A,op);
978     MatSetOption(aij->B,op);
979   }
980   else if (op == COLUMN_ORIENTED) SETERR(1,"Column oriented not supported");
981   return 0;
982 }
983 
984 static int MatGetSize_MPIAIJ(Mat matin,int *m,int *n)
985 {
986   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
987   *m = mat->M; *n = mat->N;
988   return 0;
989 }
990 
991 static int MatGetLocalSize_MPIAIJ(Mat matin,int *m,int *n)
992 {
993   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
994   *m = mat->m; *n = mat->n;
995   return 0;
996 }
997 
998 static int MatGetOwnershipRange_MPIAIJ(Mat matin,int *m,int *n)
999 {
1000   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1001   *m = mat->rstart; *n = mat->rend;
1002   return 0;
1003 }
1004 
1005 static int MatGetRow_MPIAIJ(Mat matin,int row,int *nz,int **idx,Scalar **v)
1006 {
1007   Mat_MPIAIJ *mat = (Mat_MPIAIJ *) matin->data;
1008   Scalar     *vworkA, *vworkB, **pvA, **pvB;
1009   int        i, ierr, *cworkA, *cworkB, **pcA, **pcB, cstart = mat->cstart;
1010   int        nztot, nzA, nzB, lrow, rstart = mat->rstart, rend = mat->rend;
1011 
1012   if (!mat->assembled)
1013     SETERR(1,"MatGetRow_MPIAIJ: Must assemble matrix first.");
1014   if (row < rstart || row >= rend)
1015     SETERR(1,"MatGetRow_MPIAIJ: Currently you can get only local rows.")
1016   lrow = row - rstart;
1017 
1018   pvA = &vworkA; pcA = &cworkA; pvB = &vworkB; pcB = &cworkB;
1019   if (!v)   {pvA = 0; pvB = 0;}
1020   if (!idx) {pcA = 0; if (!v) pcB = 0;}
1021   ierr = MatGetRow(mat->A,lrow,&nzA,pcA,pvA); CHKERR(ierr);
1022   ierr = MatGetRow(mat->B,lrow,&nzB,pcB,pvB); CHKERR(ierr);
1023   nztot = nzA + nzB;
1024 
1025   if (v  || idx) {
1026     if (nztot) {
1027       /* Sort by increasing column numbers, assuming A and B already sorted */
1028       int imark, imark2;
1029       for (i=0; i<nzB; i++) cworkB[i] = mat->garray[cworkB[i]];
1030       if (v) {
1031         *v = (Scalar *) MALLOC( (nztot)*sizeof(Scalar) ); CHKPTR(*v);
1032         for ( i=0; i<nzB; i++ ) {
1033           if (cworkB[i] < cstart)   (*v)[i] = vworkB[i];
1034           else break;
1035         }
1036         imark = i;
1037         for ( i=0; i<nzA; i++ )     (*v)[imark+i] = vworkA[i];
1038         imark2 = imark+nzA;
1039         for ( i=imark; i<nzB; i++ ) (*v)[imark2+i] = vworkB[i];
1040       }
1041       if (idx) {
1042         *idx = (int *) MALLOC( (nztot)*sizeof(int) ); CHKPTR(*idx);
1043         for (i=0; i<nzA; i++) cworkA[i] += cstart;
1044         for ( i=0; i<nzB; i++ ) {
1045           if (cworkB[i] < cstart)   (*idx)[i] = cworkB[i];
1046           else break;
1047         }
1048         imark = i;
1049         for ( i=0; i<nzA; i++ )     (*idx)[imark+i] = cworkA[i];
1050         imark2 = imark+nzA;
1051         for ( i=imark; i<nzB; i++ ) (*idx)[imark2+i] = cworkB[i];
1052       }
1053     }
1054     else {*idx = 0; *v=0;}
1055   }
1056   *nz = nztot;
1057   ierr = MatRestoreRow(mat->A,lrow,&nzA,pcA,pvA); CHKERR(ierr);
1058   ierr = MatRestoreRow(mat->B,lrow,&nzB,pcB,pvB); CHKERR(ierr);
1059   return 0;
1060 }
1061 
1062 static int MatRestoreRow_MPIAIJ(Mat mat,int row,int *nz,int **idx,Scalar **v)
1063 {
1064   if (idx) FREE(*idx);
1065   if (v) FREE(*v);
1066   return 0;
1067 }
1068 
1069 static int MatCopy_MPIAIJ(Mat,Mat *);
1070 extern int MatConvert_MPIAIJ(Mat,MATTYPE,Mat *);
1071 
1072 /* -------------------------------------------------------------------*/
1073 static struct _MatOps MatOps = {MatSetValues_MPIAIJ,
1074        MatGetRow_MPIAIJ,MatRestoreRow_MPIAIJ,
1075        MatMult_MPIAIJ,MatMultAdd_MPIAIJ,
1076        MatMultTrans_MPIAIJ,MatMultTransAdd_MPIAIJ,
1077        0,0,0,0,
1078        0,0,
1079        MatRelax_MPIAIJ,
1080        0,
1081        MatGetInfo_MPIAIJ,0,
1082        MatCopy_MPIAIJ,
1083        MatGetDiagonal_MPIAIJ,0,0,
1084        MatBeginAssembly_MPIAIJ,MatEndAssembly_MPIAIJ,
1085        0,
1086        MatSetOption_MPIAIJ,MatZeroEntries_MPIAIJ,MatZeroRows_MPIAIJ,0,
1087        0,0,0,0,
1088        MatGetSize_MPIAIJ,MatGetLocalSize_MPIAIJ,MatGetOwnershipRange_MPIAIJ,
1089        0,0,
1090        0,MatConvert_MPIAIJ };
1091 
1092 /*@
1093    MatCreateMPIAIJ - Creates a sparse parallel matrix in AIJ format
1094    (the default parallel PETSc format).
1095 
1096    Input Parameters:
1097 .  comm - MPI communicator
1098 .  m - number of local rows (or PETSC_DECIDE to have calculated)
1099 .  n - number of local columns (or PETSC_DECIDE to have calculated)
1100 .  M - number of global rows (or PETSC_DECIDE to have calculated)
1101 .  N - number of global columns (or PETSC_DECIDE to have calculated)
1102 .  d_nz - number of nonzeros per row in diagonal portion of matrix
1103            (same for all local rows)
1104 .  d_nzz - number of nonzeros per row in diagonal portion of matrix or null
1105            (possibly different for each row).  You must leave room for the
1106            diagonal entry even if it is zero.
1107 .  o_nz - number of nonzeros per row in off-diagonal portion of matrix
1108            (same for all local rows)
1109 .  o_nzz - number of nonzeros per row in off-diagonal portion of matrix
1110            or null (possibly different for each row).
1111 
1112    Output Parameter:
1113 .  newmat - the matrix
1114 
1115    Notes:
1116    The AIJ format (also called the Yale sparse matrix format or
1117    compressed row storage), is fully compatible with standard Fortran 77
1118    storage.  That is, the stored row and column indices begin at
1119    one, not zero.
1120 
1121    The user MUST specify either the local or global matrix dimensions
1122    (possibly both).
1123 
1124    The user can set d_nz, d_nnz, o_nz, and o_nnz to zero for PETSc to
1125    control dynamic memory allocation.
1126 
1127 .keywords: Mat, matrix, aij, compressed row, sparse, parallel
1128 
1129 .seealso: MatCreateSequentialAIJ(), MatSetValues()
1130 @*/
1131 int MatCreateMPIAIJ(MPI_Comm comm,int m,int n,int M,int N,
1132                  int d_nz,int *d_nnz, int o_nz,int *o_nnz,Mat *newmat)
1133 {
1134   Mat          mat;
1135   Mat_MPIAIJ   *aij;
1136   int          ierr, i,sum[2],work[2];
1137   *newmat         = 0;
1138   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,comm);
1139   PLogObjectCreate(mat);
1140   mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
1141   mat->ops        = &MatOps;
1142   mat->destroy    = MatDestroy_MPIAIJ;
1143   mat->view       = MatView_MPIAIJ;
1144   mat->factor     = 0;
1145   mat->row        = 0;
1146   mat->col        = 0;
1147 
1148   mat->comm       = comm;
1149   aij->insertmode = NotSetValues;
1150   MPI_Comm_rank(comm,&aij->mytid);
1151   MPI_Comm_size(comm,&aij->numtids);
1152 
1153   if (M == -1 || N == -1) {
1154     work[0] = m; work[1] = n;
1155     MPI_Allreduce((void *) work,(void *) sum,2,MPI_INT,MPI_SUM,comm );
1156     if (M == -1) M = sum[0];
1157     if (N == -1) N = sum[1];
1158   }
1159   if (m == -1) {m = M/aij->numtids + ((M % aij->numtids) > aij->mytid);}
1160   if (n == -1) {n = N/aij->numtids + ((N % aij->numtids) > aij->mytid);}
1161   aij->m       = m;
1162   aij->n       = n;
1163   aij->N       = N;
1164   aij->M       = M;
1165 
1166   /* build local table of row and column ownerships */
1167   aij->rowners = (int *) MALLOC(2*(aij->numtids+2)*sizeof(int));
1168   CHKPTR(aij->rowners);
1169   aij->cowners = aij->rowners + aij->numtids + 1;
1170   MPI_Allgather(&m,1,MPI_INT,aij->rowners+1,1,MPI_INT,comm);
1171   aij->rowners[0] = 0;
1172   for ( i=2; i<=aij->numtids; i++ ) {
1173     aij->rowners[i] += aij->rowners[i-1];
1174   }
1175   aij->rstart = aij->rowners[aij->mytid];
1176   aij->rend   = aij->rowners[aij->mytid+1];
1177   MPI_Allgather(&n,1,MPI_INT,aij->cowners+1,1,MPI_INT,comm);
1178   aij->cowners[0] = 0;
1179   for ( i=2; i<=aij->numtids; i++ ) {
1180     aij->cowners[i] += aij->cowners[i-1];
1181   }
1182   aij->cstart = aij->cowners[aij->mytid];
1183   aij->cend   = aij->cowners[aij->mytid+1];
1184 
1185 
1186   ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,n,d_nz,d_nnz,&aij->A);
1187   CHKERR(ierr);
1188   PLogObjectParent(mat,aij->A);
1189   ierr = MatCreateSequentialAIJ(MPI_COMM_SELF,m,N,o_nz,o_nnz,&aij->B);
1190   CHKERR(ierr);
1191   PLogObjectParent(mat,aij->B);
1192 
1193   /* build cache for off array entries formed */
1194   aij->stash.nmax = CHUNCKSIZE; /* completely arbratray number */
1195   aij->stash.n    = 0;
1196   aij->stash.array = (Scalar *) MALLOC( aij->stash.nmax*(2*sizeof(int) +
1197                             sizeof(Scalar))); CHKPTR(aij->stash.array);
1198   aij->stash.idx = (int *) (aij->stash.array + aij->stash.nmax);
1199   aij->stash.idy = (int *) (aij->stash.idx + aij->stash.nmax);
1200   aij->colmap    = 0;
1201   aij->garray    = 0;
1202 
1203   /* stuff used for matrix vector multiply */
1204   aij->lvec      = 0;
1205   aij->Mvctx     = 0;
1206   aij->assembled = 0;
1207 
1208   *newmat = mat;
1209   return 0;
1210 }
1211 
1212 static int MatCopy_MPIAIJ(Mat matin,Mat *newmat)
1213 {
1214   Mat        mat;
1215   Mat_MPIAIJ *aij,*oldmat = (Mat_MPIAIJ *) matin->data;
1216   int        ierr, len;
1217   *newmat      = 0;
1218 
1219   if (!oldmat->assembled) SETERR(1,"Cannot copy unassembled matrix");
1220   PETSCHEADERCREATE(mat,_Mat,MAT_COOKIE,MATMPIAIJ,matin->comm);
1221   PLogObjectCreate(mat);
1222   mat->data       = (void *) (aij = NEW(Mat_MPIAIJ)); CHKPTR(aij);
1223   mat->ops        = &MatOps;
1224   mat->destroy    = MatDestroy_MPIAIJ;
1225   mat->view       = MatView_MPIAIJ;
1226   mat->factor     = matin->factor;
1227   mat->row        = 0;
1228   mat->col        = 0;
1229 
1230   aij->m          = oldmat->m;
1231   aij->n          = oldmat->n;
1232   aij->M          = oldmat->M;
1233   aij->N          = oldmat->N;
1234 
1235   aij->assembled  = 1;
1236   aij->rstart     = oldmat->rstart;
1237   aij->rend       = oldmat->rend;
1238   aij->cstart     = oldmat->cstart;
1239   aij->cend       = oldmat->cend;
1240   aij->numtids    = oldmat->numtids;
1241   aij->mytid      = oldmat->mytid;
1242   aij->insertmode = NotSetValues;
1243 
1244   aij->rowners        = (int *) MALLOC( (aij->numtids+1)*sizeof(int) );
1245   CHKPTR(aij->rowners);
1246   MEMCPY(aij->rowners,oldmat->rowners,(aij->numtids+1)*sizeof(int));
1247   aij->stash.nmax     = 0;
1248   aij->stash.n        = 0;
1249   aij->stash.array    = 0;
1250   if (oldmat->colmap) {
1251     aij->colmap      = (int *) MALLOC( (aij->N)*sizeof(int) );
1252     CHKPTR(aij->colmap);
1253     MEMCPY(aij->colmap,oldmat->colmap,(aij->N)*sizeof(int));
1254   } else aij->colmap = 0;
1255     if (oldmat->garray && (len = ((Mat_AIJ *) (oldmat->B->data))->n)) {
1256     aij->garray      = (int *) MALLOC(len*sizeof(int) ); CHKPTR(aij->garray);
1257     MEMCPY(aij->garray,oldmat->garray,len*sizeof(int));
1258   } else aij->garray = 0;
1259   mat->comm           = matin->comm;
1260 
1261   ierr =  VecCreate(oldmat->lvec,&aij->lvec); CHKERR(ierr);
1262   PLogObjectParent(mat,aij->lvec);
1263   ierr =  VecScatterCtxCopy(oldmat->Mvctx,&aij->Mvctx); CHKERR(ierr);
1264   PLogObjectParent(mat,aij->Mvctx);
1265   ierr =  MatCopy(oldmat->A,&aij->A); CHKERR(ierr);
1266   PLogObjectParent(mat,aij->A);
1267   ierr =  MatCopy(oldmat->B,&aij->B); CHKERR(ierr);
1268   PLogObjectParent(mat,aij->B);
1269   *newmat = mat;
1270   return 0;
1271 }
1272