xref: /petsc/src/dm/impls/da/dadd.c (revision a69119a591a03a9d906b29c0a4e9802e4d7c9795)
1 #include <petsc/private/dmdaimpl.h> /*I   "petscdmda.h"   I*/
2 
3 /*@
4   DMDACreatePatchIS - Creates an index set corresponding to a patch of the DA.
5 
6   Collective
7 
8   Input Parameters:
9 +  da - the DMDA
10 .  lower - a matstencil with i, j and k corresponding to the lower corner of the patch
11 .  upper - a matstencil with i, j and k corresponding to the upper corner of the patch
12 -  offproc - indicate whether the returned IS will contain off process indices
13 
14   Output Parameters:
15 .  is - the IS corresponding to the patch
16 
17   Level: developer
18 
19 Notes:
20 This routine always returns an IS on the DMDA's comm, if offproc is set to PETSC_TRUE,
21 the routine returns an IS with all the indices requested regardless of whether these indices
22 are present on the requesting rank or not. Thus, it is upon the caller to ensure that
23 the indices returned in this mode are appropriate. If offproc is set to PETSC_FALSE,
24 the IS only returns the subset of indices that are present on the requesting rank and there
25 is no duplication of indices.
26 
27 .seealso: `DMCreateDomainDecomposition()`, `DMCreateDomainDecompositionScatters()`
28 @*/
29 PetscErrorCode DMDACreatePatchIS(DM da, MatStencil *lower, MatStencil *upper, IS *is, PetscBool offproc) {
30   PetscInt        ms = 0, ns = 0, ps = 0;
31   PetscInt        mw = 0, nw = 0, pw = 0;
32   PetscInt        me = 1, ne = 1, pe = 1;
33   PetscInt        mr = 0, nr = 0, pr = 0;
34   PetscInt        ii, jj, kk;
35   PetscInt        si, sj, sk;
36   PetscInt        i, j, k, l, idx = 0;
37   PetscInt        base;
38   PetscInt        xm = 1, ym = 1, zm = 1;
39   PetscInt        ox, oy, oz;
40   PetscInt        m, n, p, M, N, P, dof;
41   const PetscInt *lx, *ly, *lz;
42   PetscInt        nindices;
43   PetscInt       *indices;
44   DM_DA          *dd     = (DM_DA *)da->data;
45   PetscBool       skip_i = PETSC_TRUE, skip_j = PETSC_TRUE, skip_k = PETSC_TRUE;
46   PetscBool       valid_j = PETSC_FALSE, valid_k = PETSC_FALSE; /* DMDA has at least 1 dimension */
47 
48   PetscFunctionBegin;
49   M   = dd->M;
50   N   = dd->N;
51   P   = dd->P;
52   m   = dd->m;
53   n   = dd->n;
54   p   = dd->p;
55   dof = dd->w;
56 
57   nindices = -1;
58   if (PetscLikely(upper->i - lower->i)) {
59     nindices = nindices * (upper->i - lower->i);
60     skip_i   = PETSC_FALSE;
61   }
62   if (N > 1) {
63     valid_j = PETSC_TRUE;
64     if (PetscLikely(upper->j - lower->j)) {
65       nindices = nindices * (upper->j - lower->j);
66       skip_j   = PETSC_FALSE;
67     }
68   }
69   if (P > 1) {
70     valid_k = PETSC_TRUE;
71     if (PetscLikely(upper->k - lower->k)) {
72       nindices = nindices * (upper->k - lower->k);
73       skip_k   = PETSC_FALSE;
74     }
75   }
76   if (PetscLikely(nindices < 0)) {
77     if (PetscUnlikely(skip_i && skip_j && skip_k)) {
78       nindices = 0;
79     } else nindices = nindices * (-1);
80   } else SETERRQ(PetscObjectComm((PetscObject)da), PETSC_ERR_ARG_WRONG, "Lower and Upper stencils are identical! Please check inputs.");
81 
82   PetscCall(PetscMalloc1(nindices * dof, &indices));
83   PetscCall(DMDAGetOffset(da, &ox, &oy, &oz, NULL, NULL, NULL));
84 
85   if (!valid_k) {
86     k        = 0;
87     upper->k = 0;
88     lower->k = 0;
89   }
90   if (!valid_j) {
91     j        = 0;
92     upper->j = 0;
93     lower->j = 0;
94   }
95 
96   if (offproc) {
97     PetscCall(DMDAGetOwnershipRanges(da, &lx, &ly, &lz));
98     /* start at index 0 on processor 0 */
99     mr = 0;
100     nr = 0;
101     pr = 0;
102     ms = 0;
103     ns = 0;
104     ps = 0;
105     if (lx) me = lx[0];
106     if (ly) ne = ly[0];
107     if (lz) pe = lz[0];
108     /*
109        If no indices are to be returned, create an empty is,
110        this prevents hanging in while loops
111     */
112     if (skip_i && skip_j && skip_k) goto createis;
113     /*
114        do..while loops to ensure the block gets entered once,
115        regardless of control condition being met, necessary for
116        cases when a subset of skip_i/j/k is true
117     */
118     if (skip_k) k = upper->k - oz;
119     else k = lower->k - oz;
120     do {
121       if (skip_j) j = upper->j - oy;
122       else j = lower->j - oy;
123       do {
124         if (skip_i) i = upper->i - ox;
125         else i = lower->i - ox;
126         do {
127           /* "actual" indices rather than ones outside of the domain */
128           ii = i;
129           jj = j;
130           kk = k;
131           if (ii < 0) ii = M + ii;
132           if (jj < 0) jj = N + jj;
133           if (kk < 0) kk = P + kk;
134           if (ii > M - 1) ii = ii - M;
135           if (jj > N - 1) jj = jj - N;
136           if (kk > P - 1) kk = kk - P;
137           /* gone out of processor range on x axis */
138           while (ii > me - 1 || ii < ms) {
139             if (mr == m - 1) {
140               ms = 0;
141               me = lx[0];
142               mr = 0;
143             } else {
144               mr++;
145               ms = me;
146               me += lx[mr];
147             }
148           }
149           /* gone out of processor range on y axis */
150           while (jj > ne - 1 || jj < ns) {
151             if (nr == n - 1) {
152               ns = 0;
153               ne = ly[0];
154               nr = 0;
155             } else {
156               nr++;
157               ns = ne;
158               ne += ly[nr];
159             }
160           }
161           /* gone out of processor range on z axis */
162           while (kk > pe - 1 || kk < ps) {
163             if (pr == p - 1) {
164               ps = 0;
165               pe = lz[0];
166               pr = 0;
167             } else {
168               pr++;
169               ps = pe;
170               pe += lz[pr];
171             }
172           }
173           /* compute the vector base on owning processor */
174           xm   = me - ms;
175           ym   = ne - ns;
176           zm   = pe - ps;
177           base = ms * ym * zm + ns * M * zm + ps * M * N;
178           /* compute the local coordinates on owning processor */
179           si   = ii - ms;
180           sj   = jj - ns;
181           sk   = kk - ps;
182           for (l = 0; l < dof; l++) {
183             indices[idx] = l + dof * (base + si + xm * sj + xm * ym * sk);
184             idx++;
185           }
186           i++;
187         } while (i < upper->i - ox);
188         j++;
189       } while (j < upper->j - oy);
190       k++;
191     } while (k < upper->k - oz);
192   }
193 
194   if (!offproc) {
195     PetscCall(DMDAGetCorners(da, &ms, &ns, &ps, &mw, &nw, &pw));
196     me = ms + mw;
197     if (N > 1) ne = ns + nw;
198     if (P > 1) pe = ps + pw;
199     /* Account for DM offsets */
200     ms = ms - ox;
201     me = me - ox;
202     ns = ns - oy;
203     ne = ne - oy;
204     ps = ps - oz;
205     pe = pe - oz;
206 
207     /* compute the vector base on owning processor */
208     xm   = me - ms;
209     ym   = ne - ns;
210     zm   = pe - ps;
211     base = ms * ym * zm + ns * M * zm + ps * M * N;
212     /*
213        if no indices are to be returned, create an empty is,
214        this prevents hanging in while loops
215     */
216     if (skip_i && skip_j && skip_k) goto createis;
217     /*
218        do..while loops to ensure the block gets entered once,
219        regardless of control condition being met, necessary for
220        cases when a subset of skip_i/j/k is true
221     */
222     if (skip_k) k = upper->k - oz;
223     else k = lower->k - oz;
224     do {
225       if (skip_j) j = upper->j - oy;
226       else j = lower->j - oy;
227       do {
228         if (skip_i) i = upper->i - ox;
229         else i = lower->i - ox;
230         do {
231           if (k >= ps && k <= pe - 1) {
232             if (j >= ns && j <= ne - 1) {
233               if (i >= ms && i <= me - 1) {
234                 /* compute the local coordinates on owning processor */
235                 si = i - ms;
236                 sj = j - ns;
237                 sk = k - ps;
238                 for (l = 0; l < dof; l++) {
239                   indices[idx] = l + dof * (base + si + xm * sj + xm * ym * sk);
240                   idx++;
241                 }
242               }
243             }
244           }
245           i++;
246         } while (i < upper->i - ox);
247         j++;
248       } while (j < upper->j - oy);
249       k++;
250     } while (k < upper->k - oz);
251 
252     PetscCall(PetscRealloc((size_t)(idx * sizeof(PetscInt)), (void *)&indices));
253   }
254 
255 createis:
256   PetscCall(ISCreateGeneral(PetscObjectComm((PetscObject)da), idx, indices, PETSC_OWN_POINTER, is));
257   PetscFunctionReturn(0);
258 }
259 
260 PetscErrorCode DMDASubDomainDA_Private(DM dm, PetscInt *nlocal, DM **sdm) {
261   DM           *da;
262   PetscInt      dim, size, i, j, k, idx;
263   DMDALocalInfo info;
264   PetscInt      xsize, ysize, zsize;
265   PetscInt      xo, yo, zo;
266   PetscInt      xs, ys, zs;
267   PetscInt      xm = 1, ym = 1, zm = 1;
268   PetscInt      xol, yol, zol;
269   PetscInt      m = 1, n = 1, p = 1;
270   PetscInt      M, N, P;
271   PetscInt      pm, mtmp;
272 
273   PetscFunctionBegin;
274   PetscCall(DMDAGetLocalInfo(dm, &info));
275   PetscCall(DMDAGetOverlap(dm, &xol, &yol, &zol));
276   PetscCall(DMDAGetNumLocalSubDomains(dm, &size));
277   PetscCall(PetscMalloc1(size, &da));
278 
279   if (nlocal) *nlocal = size;
280 
281   dim = info.dim;
282 
283   M = info.xm;
284   N = info.ym;
285   P = info.zm;
286 
287   if (dim == 1) {
288     m = size;
289   } else if (dim == 2) {
290     m = (PetscInt)(0.5 + PetscSqrtReal(((PetscReal)M) * ((PetscReal)size) / ((PetscReal)N)));
291     while (m > 0) {
292       n = size / m;
293       if (m * n * p == size) break;
294       m--;
295     }
296   } else if (dim == 3) {
297     n = (PetscInt)(0.5 + PetscPowReal(((PetscReal)N * N) * ((PetscReal)size) / ((PetscReal)P * M), (PetscReal)(1. / 3.)));
298     if (!n) n = 1;
299     while (n > 0) {
300       pm = size / n;
301       if (n * pm == size) break;
302       n--;
303     }
304     if (!n) n = 1;
305     m = (PetscInt)(0.5 + PetscSqrtReal(((PetscReal)M) * ((PetscReal)size) / ((PetscReal)P * n)));
306     if (!m) m = 1;
307     while (m > 0) {
308       p = size / (m * n);
309       if (m * n * p == size) break;
310       m--;
311     }
312     if (M > P && m < p) {
313       mtmp = m;
314       m    = p;
315       p    = mtmp;
316     }
317   }
318 
319   zs  = info.zs;
320   idx = 0;
321   for (k = 0; k < p; k++) {
322     ys = info.ys;
323     for (j = 0; j < n; j++) {
324       xs = info.xs;
325       for (i = 0; i < m; i++) {
326         if (dim == 1) {
327           xm = M / m + ((M % m) > i);
328         } else if (dim == 2) {
329           xm = M / m + ((M % m) > i);
330           ym = N / n + ((N % n) > j);
331         } else if (dim == 3) {
332           xm = M / m + ((M % m) > i);
333           ym = N / n + ((N % n) > j);
334           zm = P / p + ((P % p) > k);
335         }
336 
337         xsize = xm;
338         ysize = ym;
339         zsize = zm;
340         xo    = xs;
341         yo    = ys;
342         zo    = zs;
343 
344         PetscCall(DMDACreate(PETSC_COMM_SELF, &(da[idx])));
345         PetscCall(DMSetOptionsPrefix(da[idx], "sub_"));
346         PetscCall(DMSetDimension(da[idx], info.dim));
347         PetscCall(DMDASetDof(da[idx], info.dof));
348 
349         PetscCall(DMDASetStencilType(da[idx], info.st));
350         PetscCall(DMDASetStencilWidth(da[idx], info.sw));
351 
352         if (info.bx == DM_BOUNDARY_PERIODIC || (xs != 0)) {
353           xsize += xol;
354           xo -= xol;
355         }
356         if (info.by == DM_BOUNDARY_PERIODIC || (ys != 0)) {
357           ysize += yol;
358           yo -= yol;
359         }
360         if (info.bz == DM_BOUNDARY_PERIODIC || (zs != 0)) {
361           zsize += zol;
362           zo -= zol;
363         }
364 
365         if (info.bx == DM_BOUNDARY_PERIODIC || (xs + xm != info.mx)) xsize += xol;
366         if (info.by == DM_BOUNDARY_PERIODIC || (ys + ym != info.my)) ysize += yol;
367         if (info.bz == DM_BOUNDARY_PERIODIC || (zs + zm != info.mz)) zsize += zol;
368 
369         if (info.bx != DM_BOUNDARY_PERIODIC) {
370           if (xo < 0) {
371             xsize += xo;
372             xo = 0;
373           }
374           if (xo + xsize > info.mx - 1) xsize -= xo + xsize - info.mx;
375         }
376         if (info.by != DM_BOUNDARY_PERIODIC) {
377           if (yo < 0) {
378             ysize += yo;
379             yo = 0;
380           }
381           if (yo + ysize > info.my - 1) ysize -= yo + ysize - info.my;
382         }
383         if (info.bz != DM_BOUNDARY_PERIODIC) {
384           if (zo < 0) {
385             zsize += zo;
386             zo = 0;
387           }
388           if (zo + zsize > info.mz - 1) zsize -= zo + zsize - info.mz;
389         }
390 
391         PetscCall(DMDASetSizes(da[idx], xsize, ysize, zsize));
392         PetscCall(DMDASetNumProcs(da[idx], 1, 1, 1));
393         PetscCall(DMDASetBoundaryType(da[idx], DM_BOUNDARY_GHOSTED, DM_BOUNDARY_GHOSTED, DM_BOUNDARY_GHOSTED));
394 
395         /* set up as a block instead */
396         PetscCall(DMSetUp(da[idx]));
397 
398         /* nonoverlapping region */
399         PetscCall(DMDASetNonOverlappingRegion(da[idx], xs, ys, zs, xm, ym, zm));
400 
401         /* this alters the behavior of DMDAGetInfo, DMDAGetLocalInfo, DMDAGetCorners, and DMDAGetGhostedCorners and should be used with care */
402         PetscCall(DMDASetOffset(da[idx], xo, yo, zo, info.mx, info.my, info.mz));
403         xs += xm;
404         idx++;
405       }
406       ys += ym;
407     }
408     zs += zm;
409   }
410   *sdm = da;
411   PetscFunctionReturn(0);
412 }
413 
414 /*
415    Fills the local vector problem on the subdomain from the global problem.
416 
417    Right now this assumes one subdomain per processor.
418 
419 */
420 PetscErrorCode DMCreateDomainDecompositionScatters_DA(DM dm, PetscInt nsubdms, DM *subdms, VecScatter **iscat, VecScatter **oscat, VecScatter **lscat) {
421   DMDALocalInfo info, subinfo;
422   DM            subdm;
423   MatStencil    upper, lower;
424   IS            idis, isis, odis, osis, gdis;
425   Vec           svec, dvec, slvec;
426   PetscInt      xm, ym, zm, xs, ys, zs;
427   PetscInt      i;
428   PetscBool     patchis_offproc = PETSC_TRUE;
429 
430   PetscFunctionBegin;
431   /* allocate the arrays of scatters */
432   if (iscat) PetscCall(PetscMalloc1(nsubdms, iscat));
433   if (oscat) PetscCall(PetscMalloc1(nsubdms, oscat));
434   if (lscat) PetscCall(PetscMalloc1(nsubdms, lscat));
435 
436   PetscCall(DMDAGetLocalInfo(dm, &info));
437   for (i = 0; i < nsubdms; i++) {
438     subdm = subdms[i];
439     PetscCall(DMDAGetLocalInfo(subdm, &subinfo));
440     PetscCall(DMDAGetNonOverlappingRegion(subdm, &xs, &ys, &zs, &xm, &ym, &zm));
441 
442     /* create the global and subdomain index sets for the inner domain */
443     lower.i = xs;
444     lower.j = ys;
445     lower.k = zs;
446     upper.i = xs + xm;
447     upper.j = ys + ym;
448     upper.k = zs + zm;
449     PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &idis, patchis_offproc));
450     PetscCall(DMDACreatePatchIS(subdm, &lower, &upper, &isis, patchis_offproc));
451 
452     /* create the global and subdomain index sets for the outer subdomain */
453     lower.i = subinfo.xs;
454     lower.j = subinfo.ys;
455     lower.k = subinfo.zs;
456     upper.i = subinfo.xs + subinfo.xm;
457     upper.j = subinfo.ys + subinfo.ym;
458     upper.k = subinfo.zs + subinfo.zm;
459     PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &odis, patchis_offproc));
460     PetscCall(DMDACreatePatchIS(subdm, &lower, &upper, &osis, patchis_offproc));
461 
462     /* global and subdomain ISes for the local indices of the subdomain */
463     /* todo - make this not loop over at nonperiodic boundaries, which will be more involved */
464     lower.i = subinfo.gxs;
465     lower.j = subinfo.gys;
466     lower.k = subinfo.gzs;
467     upper.i = subinfo.gxs + subinfo.gxm;
468     upper.j = subinfo.gys + subinfo.gym;
469     upper.k = subinfo.gzs + subinfo.gzm;
470     PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &gdis, patchis_offproc));
471 
472     /* form the scatter */
473     PetscCall(DMGetGlobalVector(dm, &dvec));
474     PetscCall(DMGetGlobalVector(subdm, &svec));
475     PetscCall(DMGetLocalVector(subdm, &slvec));
476 
477     if (iscat) PetscCall(VecScatterCreate(dvec, idis, svec, isis, &(*iscat)[i]));
478     if (oscat) PetscCall(VecScatterCreate(dvec, odis, svec, osis, &(*oscat)[i]));
479     if (lscat) PetscCall(VecScatterCreate(dvec, gdis, slvec, NULL, &(*lscat)[i]));
480 
481     PetscCall(DMRestoreGlobalVector(dm, &dvec));
482     PetscCall(DMRestoreGlobalVector(subdm, &svec));
483     PetscCall(DMRestoreLocalVector(subdm, &slvec));
484 
485     PetscCall(ISDestroy(&idis));
486     PetscCall(ISDestroy(&isis));
487 
488     PetscCall(ISDestroy(&odis));
489     PetscCall(ISDestroy(&osis));
490 
491     PetscCall(ISDestroy(&gdis));
492   }
493   PetscFunctionReturn(0);
494 }
495 
496 PetscErrorCode DMDASubDomainIS_Private(DM dm, PetscInt n, DM *subdm, IS **iis, IS **ois) {
497   PetscInt      i;
498   DMDALocalInfo info, subinfo;
499   MatStencil    lower, upper;
500   PetscBool     patchis_offproc = PETSC_TRUE;
501 
502   PetscFunctionBegin;
503   PetscCall(DMDAGetLocalInfo(dm, &info));
504   if (iis) PetscCall(PetscMalloc1(n, iis));
505   if (ois) PetscCall(PetscMalloc1(n, ois));
506 
507   for (i = 0; i < n; i++) {
508     PetscCall(DMDAGetLocalInfo(subdm[i], &subinfo));
509     if (iis) {
510       /* create the inner IS */
511       lower.i = info.xs;
512       lower.j = info.ys;
513       lower.k = info.zs;
514       upper.i = info.xs + info.xm;
515       upper.j = info.ys + info.ym;
516       upper.k = info.zs + info.zm;
517       PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &(*iis)[i], patchis_offproc));
518     }
519 
520     if (ois) {
521       /* create the outer IS */
522       lower.i = subinfo.xs;
523       lower.j = subinfo.ys;
524       lower.k = subinfo.zs;
525       upper.i = subinfo.xs + subinfo.xm;
526       upper.j = subinfo.ys + subinfo.ym;
527       upper.k = subinfo.zs + subinfo.zm;
528       PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &(*ois)[i], patchis_offproc));
529     }
530   }
531   PetscFunctionReturn(0);
532 }
533 
534 PetscErrorCode DMCreateDomainDecomposition_DA(DM dm, PetscInt *len, char ***names, IS **iis, IS **ois, DM **subdm) {
535   DM      *sdm;
536   PetscInt n, i;
537 
538   PetscFunctionBegin;
539   PetscCall(DMDASubDomainDA_Private(dm, &n, &sdm));
540   if (names) {
541     PetscCall(PetscMalloc1(n, names));
542     for (i = 0; i < n; i++) (*names)[i] = NULL;
543   }
544   PetscCall(DMDASubDomainIS_Private(dm, n, sdm, iis, ois));
545   if (subdm) *subdm = sdm;
546   else {
547     for (i = 0; i < n; i++) PetscCall(DMDestroy(&sdm[i]));
548   }
549   if (len) *len = n;
550   PetscFunctionReturn(0);
551 }
552