xref: /petsc/src/dm/impls/da/dadd.c (revision 3307d110e72ee4e6d2468971620073eb5ff93529)
1 #include <petsc/private/dmdaimpl.h>  /*I   "petscdmda.h"   I*/
2 
3 /*@
4   DMDACreatePatchIS - Creates an index set corresponding to a patch of the DA.
5 
6   Collective
7 
8   Input Parameters:
9 +  da - the DMDA
10 .  lower - a matstencil with i, j and k corresponding to the lower corner of the patch
11 .  upper - a matstencil with i, j and k corresponding to the upper corner of the patch
12 -  offproc - indicate whether the returned IS will contain off process indices
13 
14   Output Parameters:
15 .  is - the IS corresponding to the patch
16 
17   Level: developer
18 
19 Notes:
20 This routine always returns an IS on the DMDA's comm, if offproc is set to PETSC_TRUE,
21 the routine returns an IS with all the indices requested regardless of whether these indices
22 are present on the requesting rank or not. Thus, it is upon the caller to ensure that
23 the indices returned in this mode are appropriate. If offproc is set to PETSC_FALSE,
24 the IS only returns the subset of indices that are present on the requesting rank and there
25 is no duplication of indices.
26 
27 .seealso: DMDACreateDomainDecomposition(), DMDACreateDomainDecompositionScatters()
28 @*/
29 PetscErrorCode DMDACreatePatchIS(DM da,MatStencil *lower,MatStencil *upper,IS *is, PetscBool offproc)
30 {
31   PetscInt       ms=0,ns=0,ps=0;
32   PetscInt       mw=0,nw=0,pw=0;
33   PetscInt       me=1,ne=1,pe=1;
34   PetscInt       mr=0,nr=0,pr=0;
35   PetscInt       ii,jj,kk;
36   PetscInt       si,sj,sk;
37   PetscInt       i,j,k,l,idx=0;
38   PetscInt       base;
39   PetscInt       xm=1,ym=1,zm=1;
40   PetscInt       ox,oy,oz;
41   PetscInt       m,n,p,M,N,P,dof;
42   const PetscInt *lx,*ly,*lz;
43   PetscInt       nindices;
44   PetscInt       *indices;
45   DM_DA          *dd = (DM_DA*)da->data;
46   PetscBool      skip_i=PETSC_TRUE, skip_j=PETSC_TRUE, skip_k=PETSC_TRUE;
47   PetscBool      valid_j=PETSC_FALSE, valid_k=PETSC_FALSE; /* DMDA has at least 1 dimension */
48 
49   PetscFunctionBegin;
50   M = dd->M; N = dd->N; P = dd->P;
51   m = dd->m; n = dd->n; p = dd->p;
52   dof = dd->w;
53 
54   nindices = -1;
55   if (PetscLikely(upper->i - lower->i)) {
56     nindices = nindices*(upper->i - lower->i);
57     skip_i=PETSC_FALSE;
58   }
59   if (N>1) {
60     valid_j = PETSC_TRUE;
61     if (PetscLikely(upper->j - lower->j)) {
62       nindices = nindices*(upper->j - lower->j);
63       skip_j=PETSC_FALSE;
64     }
65   }
66   if (P>1) {
67     valid_k = PETSC_TRUE;
68     if (PetscLikely(upper->k - lower->k)) {
69       nindices = nindices*(upper->k - lower->k);
70       skip_k=PETSC_FALSE;
71     }
72   }
73   if (PetscLikely(nindices<0)) {
74     if (PetscUnlikely(skip_i && skip_j && skip_k)) {
75       nindices = 0;
76     } else nindices = nindices*(-1);
77   } else SETERRQ(PetscObjectComm((PetscObject)da),PETSC_ERR_ARG_WRONG,"Lower and Upper stencils are identical! Please check inputs.");
78 
79   PetscCall(PetscMalloc1(nindices*dof,&indices));
80   PetscCall(DMDAGetOffset(da,&ox,&oy,&oz,NULL,NULL,NULL));
81 
82   if (!valid_k) {
83     k = 0;
84     upper->k=0;
85     lower->k=0;
86   }
87   if (!valid_j) {
88     j = 0;
89     upper->j=0;
90     lower->j=0;
91   }
92 
93   if (offproc) {
94     PetscCall(DMDAGetOwnershipRanges(da,&lx,&ly,&lz));
95     /* start at index 0 on processor 0 */
96     mr = 0;
97     nr = 0;
98     pr = 0;
99     ms = 0;
100     ns = 0;
101     ps = 0;
102     if (lx) me = lx[0];
103     if (ly) ne = ly[0];
104     if (lz) pe = lz[0];
105     /*
106        If no indices are to be returned, create an empty is,
107        this prevents hanging in while loops
108     */
109     if (skip_i && skip_j && skip_k) goto createis;
110     /*
111        do..while loops to ensure the block gets entered once,
112        regardless of control condition being met, necessary for
113        cases when a subset of skip_i/j/k is true
114     */
115     if (skip_k) k = upper->k-oz; else k = lower->k-oz;
116     do {
117       if (skip_j) j = upper->j-oy; else j = lower->j-oy;
118       do {
119         if (skip_i) i = upper->i-ox; else i = lower->i-ox;
120         do {
121           /* "actual" indices rather than ones outside of the domain */
122           ii = i;
123           jj = j;
124           kk = k;
125           if (ii < 0) ii = M + ii;
126           if (jj < 0) jj = N + jj;
127           if (kk < 0) kk = P + kk;
128           if (ii > M-1) ii = ii - M;
129           if (jj > N-1) jj = jj - N;
130           if (kk > P-1) kk = kk - P;
131           /* gone out of processor range on x axis */
132           while (ii > me-1 || ii < ms) {
133             if (mr == m-1) {
134               ms = 0;
135               me = lx[0];
136               mr = 0;
137             } else {
138               mr++;
139               ms = me;
140               me += lx[mr];
141             }
142           }
143           /* gone out of processor range on y axis */
144           while (jj > ne-1 || jj < ns) {
145             if (nr == n-1) {
146               ns = 0;
147               ne = ly[0];
148               nr = 0;
149             } else {
150               nr++;
151               ns = ne;
152               ne += ly[nr];
153             }
154           }
155           /* gone out of processor range on z axis */
156           while (kk > pe-1 || kk < ps) {
157             if (pr == p-1) {
158               ps = 0;
159               pe = lz[0];
160               pr = 0;
161             } else {
162               pr++;
163               ps = pe;
164               pe += lz[pr];
165             }
166           }
167           /* compute the vector base on owning processor */
168           xm = me - ms;
169           ym = ne - ns;
170           zm = pe - ps;
171           base = ms*ym*zm + ns*M*zm + ps*M*N;
172           /* compute the local coordinates on owning processor */
173           si = ii - ms;
174           sj = jj - ns;
175           sk = kk - ps;
176           for (l=0;l<dof;l++) {
177             indices[idx] = l + dof*(base + si + xm*sj + xm*ym*sk);
178             idx++;
179           }
180           i++;
181         } while (i<upper->i-ox);
182         j++;
183       } while (j<upper->j-oy);
184       k++;
185     } while (k<upper->k-oz);
186   }
187 
188   if (!offproc) {
189     PetscCall(DMDAGetCorners(da, &ms, &ns, &ps, &mw, &nw, &pw));
190     me = ms + mw;
191     if (N>1) ne = ns + nw;
192     if (P>1) pe = ps + pw;
193     /* Account for DM offsets */
194     ms = ms - ox; me = me - ox;
195     ns = ns - oy; ne = ne - oy;
196     ps = ps - oz; pe = pe - oz;
197 
198     /* compute the vector base on owning processor */
199     xm = me - ms;
200     ym = ne - ns;
201     zm = pe - ps;
202     base = ms*ym*zm + ns*M*zm + ps*M*N;
203     /*
204        if no indices are to be returned, create an empty is,
205        this prevents hanging in while loops
206     */
207     if (skip_i && skip_j && skip_k) goto createis;
208     /*
209        do..while loops to ensure the block gets entered once,
210        regardless of control condition being met, necessary for
211        cases when a subset of skip_i/j/k is true
212     */
213     if (skip_k) k = upper->k-oz; else k = lower->k-oz;
214     do {
215       if (skip_j) j = upper->j-oy; else j = lower->j-oy;
216       do {
217         if (skip_i) i = upper->i-ox; else i = lower->i-ox;
218         do {
219           if (k>=ps && k<=pe-1) {
220             if (j>=ns && j<=ne-1) {
221               if (i>=ms && i<=me-1) {
222                 /* compute the local coordinates on owning processor */
223                 si = i - ms;
224                 sj = j - ns;
225                 sk = k - ps;
226                 for (l=0; l<dof; l++) {
227                   indices[idx] = l + dof*(base + si + xm*sj + xm*ym*sk);
228                   idx++;
229                 }
230               }
231             }
232           }
233           i++;
234         } while (i<upper->i-ox);
235         j++;
236       } while (j<upper->j-oy);
237       k++;
238     } while (k<upper->k-oz);
239 
240     PetscCall(PetscRealloc((size_t)(idx*sizeof(PetscInt)), (void*)&indices));
241   }
242 
243   createis:
244   PetscCall(ISCreateGeneral(PetscObjectComm((PetscObject)da),idx,indices,PETSC_OWN_POINTER,is));
245   PetscFunctionReturn(0);
246 }
247 
248 PetscErrorCode DMDASubDomainDA_Private(DM dm, PetscInt *nlocal, DM **sdm)
249 {
250   DM             *da;
251   PetscInt       dim,size,i,j,k,idx;
252   DMDALocalInfo  info;
253   PetscInt       xsize,ysize,zsize;
254   PetscInt       xo,yo,zo;
255   PetscInt       xs,ys,zs;
256   PetscInt       xm=1,ym=1,zm=1;
257   PetscInt       xol,yol,zol;
258   PetscInt       m=1,n=1,p=1;
259   PetscInt       M,N,P;
260   PetscInt       pm,mtmp;
261 
262   PetscFunctionBegin;
263   PetscCall(DMDAGetLocalInfo(dm,&info));
264   PetscCall(DMDAGetOverlap(dm,&xol,&yol,&zol));
265   PetscCall(DMDAGetNumLocalSubDomains(dm,&size));
266   PetscCall(PetscMalloc1(size,&da));
267 
268   if (nlocal) *nlocal = size;
269 
270   dim = info.dim;
271 
272   M = info.xm;
273   N = info.ym;
274   P = info.zm;
275 
276   if (dim == 1) {
277     m = size;
278   } else if (dim == 2) {
279     m = (PetscInt)(0.5 + PetscSqrtReal(((PetscReal)M)*((PetscReal)size)/((PetscReal)N)));
280     while (m > 0) {
281       n = size/m;
282       if (m*n*p == size) break;
283       m--;
284     }
285   } else if (dim == 3) {
286     n = (PetscInt)(0.5 + PetscPowReal(((PetscReal)N*N)*((PetscReal)size)/((PetscReal)P*M),(PetscReal)(1./3.))); if (!n) n = 1;
287     while (n > 0) {
288       pm = size/n;
289       if (n*pm == size) break;
290       n--;
291     }
292     if (!n) n = 1;
293     m = (PetscInt)(0.5 + PetscSqrtReal(((PetscReal)M)*((PetscReal)size)/((PetscReal)P*n)));
294     if (!m) m = 1;
295     while (m > 0) {
296       p = size/(m*n);
297       if (m*n*p == size) break;
298       m--;
299     }
300     if (M > P && m < p) {mtmp = m; m = p; p = mtmp;}
301   }
302 
303   zs = info.zs;
304   idx = 0;
305   for (k = 0; k < p; k++) {
306     ys = info.ys;
307     for (j = 0; j < n; j++) {
308       xs = info.xs;
309       for (i = 0; i < m; i++) {
310         if (dim == 1) {
311           xm = M/m + ((M % m) > i);
312         } else if (dim == 2) {
313           xm = M/m + ((M % m) > i);
314           ym = N/n + ((N % n) > j);
315         } else if (dim == 3) {
316           xm = M/m + ((M % m) > i);
317           ym = N/n + ((N % n) > j);
318           zm = P/p + ((P % p) > k);
319         }
320 
321         xsize = xm;
322         ysize = ym;
323         zsize = zm;
324         xo = xs;
325         yo = ys;
326         zo = zs;
327 
328         PetscCall(DMDACreate(PETSC_COMM_SELF,&(da[idx])));
329         PetscCall(DMSetOptionsPrefix(da[idx],"sub_"));
330         PetscCall(DMSetDimension(da[idx], info.dim));
331         PetscCall(DMDASetDof(da[idx], info.dof));
332 
333         PetscCall(DMDASetStencilType(da[idx],info.st));
334         PetscCall(DMDASetStencilWidth(da[idx],info.sw));
335 
336         if (info.bx == DM_BOUNDARY_PERIODIC || (xs != 0)) {
337           xsize += xol;
338           xo    -= xol;
339         }
340         if (info.by == DM_BOUNDARY_PERIODIC || (ys != 0)) {
341           ysize += yol;
342           yo    -= yol;
343         }
344         if (info.bz == DM_BOUNDARY_PERIODIC || (zs != 0)) {
345           zsize += zol;
346           zo    -= zol;
347         }
348 
349         if (info.bx == DM_BOUNDARY_PERIODIC || (xs+xm != info.mx)) xsize += xol;
350         if (info.by == DM_BOUNDARY_PERIODIC || (ys+ym != info.my)) ysize += yol;
351         if (info.bz == DM_BOUNDARY_PERIODIC || (zs+zm != info.mz)) zsize += zol;
352 
353         if (info.bx != DM_BOUNDARY_PERIODIC) {
354           if (xo < 0) {
355             xsize += xo;
356             xo = 0;
357           }
358           if (xo+xsize > info.mx-1) {
359             xsize -= xo+xsize - info.mx;
360           }
361         }
362         if (info.by != DM_BOUNDARY_PERIODIC) {
363           if (yo < 0) {
364             ysize += yo;
365             yo = 0;
366           }
367           if (yo+ysize > info.my-1) {
368             ysize -= yo+ysize - info.my;
369           }
370         }
371         if (info.bz != DM_BOUNDARY_PERIODIC) {
372           if (zo < 0) {
373             zsize += zo;
374             zo = 0;
375           }
376           if (zo+zsize > info.mz-1) {
377             zsize -= zo+zsize - info.mz;
378           }
379         }
380 
381         PetscCall(DMDASetSizes(da[idx], xsize, ysize, zsize));
382         PetscCall(DMDASetNumProcs(da[idx], 1, 1, 1));
383         PetscCall(DMDASetBoundaryType(da[idx], DM_BOUNDARY_GHOSTED, DM_BOUNDARY_GHOSTED, DM_BOUNDARY_GHOSTED));
384 
385         /* set up as a block instead */
386         PetscCall(DMSetUp(da[idx]));
387 
388         /* nonoverlapping region */
389         PetscCall(DMDASetNonOverlappingRegion(da[idx],xs,ys,zs,xm,ym,zm));
390 
391         /* this alters the behavior of DMDAGetInfo, DMDAGetLocalInfo, DMDAGetCorners, and DMDAGetGhostedCorners and should be used with care */
392         PetscCall(DMDASetOffset(da[idx],xo,yo,zo,info.mx,info.my,info.mz));
393         xs += xm;
394         idx++;
395       }
396       ys += ym;
397     }
398     zs += zm;
399   }
400   *sdm = da;
401   PetscFunctionReturn(0);
402 }
403 
404 /*
405    Fills the local vector problem on the subdomain from the global problem.
406 
407    Right now this assumes one subdomain per processor.
408 
409 */
410 PetscErrorCode DMCreateDomainDecompositionScatters_DA(DM dm,PetscInt nsubdms,DM *subdms,VecScatter **iscat,VecScatter **oscat, VecScatter **lscat)
411 {
412   DMDALocalInfo  info,subinfo;
413   DM             subdm;
414   MatStencil     upper,lower;
415   IS             idis,isis,odis,osis,gdis;
416   Vec            svec,dvec,slvec;
417   PetscInt       xm,ym,zm,xs,ys,zs;
418   PetscInt       i;
419   PetscBool      patchis_offproc = PETSC_TRUE;
420 
421   PetscFunctionBegin;
422   /* allocate the arrays of scatters */
423   if (iscat) PetscCall(PetscMalloc1(nsubdms,iscat));
424   if (oscat) PetscCall(PetscMalloc1(nsubdms,oscat));
425   if (lscat) PetscCall(PetscMalloc1(nsubdms,lscat));
426 
427   PetscCall(DMDAGetLocalInfo(dm,&info));
428   for (i = 0; i < nsubdms; i++) {
429     subdm = subdms[i];
430     PetscCall(DMDAGetLocalInfo(subdm,&subinfo));
431     PetscCall(DMDAGetNonOverlappingRegion(subdm,&xs,&ys,&zs,&xm,&ym,&zm));
432 
433     /* create the global and subdomain index sets for the inner domain */
434     lower.i = xs;
435     lower.j = ys;
436     lower.k = zs;
437     upper.i = xs+xm;
438     upper.j = ys+ym;
439     upper.k = zs+zm;
440     PetscCall(DMDACreatePatchIS(dm,&lower,&upper,&idis,patchis_offproc));
441     PetscCall(DMDACreatePatchIS(subdm,&lower,&upper,&isis,patchis_offproc));
442 
443     /* create the global and subdomain index sets for the outer subdomain */
444     lower.i = subinfo.xs;
445     lower.j = subinfo.ys;
446     lower.k = subinfo.zs;
447     upper.i = subinfo.xs+subinfo.xm;
448     upper.j = subinfo.ys+subinfo.ym;
449     upper.k = subinfo.zs+subinfo.zm;
450     PetscCall(DMDACreatePatchIS(dm,&lower,&upper,&odis,patchis_offproc));
451     PetscCall(DMDACreatePatchIS(subdm,&lower,&upper,&osis,patchis_offproc));
452 
453     /* global and subdomain ISes for the local indices of the subdomain */
454     /* todo - make this not loop over at nonperiodic boundaries, which will be more involved */
455     lower.i = subinfo.gxs;
456     lower.j = subinfo.gys;
457     lower.k = subinfo.gzs;
458     upper.i = subinfo.gxs+subinfo.gxm;
459     upper.j = subinfo.gys+subinfo.gym;
460     upper.k = subinfo.gzs+subinfo.gzm;
461     PetscCall(DMDACreatePatchIS(dm,&lower,&upper,&gdis,patchis_offproc));
462 
463     /* form the scatter */
464     PetscCall(DMGetGlobalVector(dm,&dvec));
465     PetscCall(DMGetGlobalVector(subdm,&svec));
466     PetscCall(DMGetLocalVector(subdm,&slvec));
467 
468     if (iscat) PetscCall(VecScatterCreate(dvec,idis,svec,isis,&(*iscat)[i]));
469     if (oscat) PetscCall(VecScatterCreate(dvec,odis,svec,osis,&(*oscat)[i]));
470     if (lscat) PetscCall(VecScatterCreate(dvec,gdis,slvec,NULL,&(*lscat)[i]));
471 
472     PetscCall(DMRestoreGlobalVector(dm,&dvec));
473     PetscCall(DMRestoreGlobalVector(subdm,&svec));
474     PetscCall(DMRestoreLocalVector(subdm,&slvec));
475 
476     PetscCall(ISDestroy(&idis));
477     PetscCall(ISDestroy(&isis));
478 
479     PetscCall(ISDestroy(&odis));
480     PetscCall(ISDestroy(&osis));
481 
482     PetscCall(ISDestroy(&gdis));
483   }
484   PetscFunctionReturn(0);
485 }
486 
487 PetscErrorCode DMDASubDomainIS_Private(DM dm,PetscInt n,DM *subdm,IS **iis,IS **ois)
488 {
489   PetscInt       i;
490   DMDALocalInfo  info,subinfo;
491   MatStencil     lower,upper;
492   PetscBool      patchis_offproc = PETSC_TRUE;
493 
494   PetscFunctionBegin;
495   PetscCall(DMDAGetLocalInfo(dm,&info));
496   if (iis) PetscCall(PetscMalloc1(n,iis));
497   if (ois) PetscCall(PetscMalloc1(n,ois));
498 
499   for (i = 0;i < n; i++) {
500     PetscCall(DMDAGetLocalInfo(subdm[i],&subinfo));
501     if (iis) {
502       /* create the inner IS */
503       lower.i = info.xs;
504       lower.j = info.ys;
505       lower.k = info.zs;
506       upper.i = info.xs+info.xm;
507       upper.j = info.ys+info.ym;
508       upper.k = info.zs+info.zm;
509       PetscCall(DMDACreatePatchIS(dm,&lower,&upper,&(*iis)[i],patchis_offproc));
510     }
511 
512     if (ois) {
513       /* create the outer IS */
514       lower.i = subinfo.xs;
515       lower.j = subinfo.ys;
516       lower.k = subinfo.zs;
517       upper.i = subinfo.xs+subinfo.xm;
518       upper.j = subinfo.ys+subinfo.ym;
519       upper.k = subinfo.zs+subinfo.zm;
520       PetscCall(DMDACreatePatchIS(dm,&lower,&upper,&(*ois)[i],patchis_offproc));
521     }
522   }
523   PetscFunctionReturn(0);
524 }
525 
526 PetscErrorCode DMCreateDomainDecomposition_DA(DM dm,PetscInt *len,char ***names,IS **iis,IS **ois,DM **subdm)
527 {
528   DM             *sdm;
529   PetscInt       n,i;
530 
531   PetscFunctionBegin;
532   PetscCall(DMDASubDomainDA_Private(dm,&n,&sdm));
533   if (names) {
534     PetscCall(PetscMalloc1(n,names));
535     for (i=0;i<n;i++) (*names)[i] = NULL;
536   }
537   PetscCall(DMDASubDomainIS_Private(dm,n,sdm,iis,ois));
538   if (subdm) *subdm = sdm;
539   else {
540     for (i=0;i<n;i++) {
541       PetscCall(DMDestroy(&sdm[i]));
542     }
543   }
544   if (len) *len = n;
545   PetscFunctionReturn(0);
546 }
547