xref: /petsc/src/dm/impls/da/dadd.c (revision df4cd43f92eaa320656440c40edb1046daee8f75)
1 #include <petsc/private/dmdaimpl.h> /*I   "petscdmda.h"   I*/
2 
3 /*@
4   DMDACreatePatchIS - Creates an index set corresponding to a patch of the `DMDA`.
5 
6   Collective
7 
8   Input Parameters:
9 +  da - the `DMDA`
10 .  lower - a matstencil with i, j and k corresponding to the lower corner of the patch
11 .  upper - a matstencil with i, j and k corresponding to the upper corner of the patch
12 -  offproc - indicate whether the returned IS will contain off process indices
13 
14   Output Parameters:
15 .  is - the `IS` corresponding to the patch
16 
17   Level: developer
18 
19   Notes:
20   This routine always returns an `IS` on the `DMDA` comm, if offproc is set to `PETSC_TRUE`,
21   the routine returns an `IS` with all the indices requested regardless of whether these indices
22   are present on the requesting rank or not. Thus, it is upon the caller to ensure that
23   the indices returned in this mode are appropriate. If offproc is set to `PETSC_FALSE`,
24   the `IS` only returns the subset of indices that are present on the requesting rank and there
25   is no duplication of indices.
26 
27 .seealso: `DM`, `DMDA`, `DMCreateDomainDecomposition()`, `DMCreateDomainDecompositionScatters()`
28 @*/
29 PetscErrorCode DMDACreatePatchIS(DM da, MatStencil *lower, MatStencil *upper, IS *is, PetscBool offproc)
30 {
31   PetscInt        ms = 0, ns = 0, ps = 0;
32   PetscInt        mw = 0, nw = 0, pw = 0;
33   PetscInt        me = 1, ne = 1, pe = 1;
34   PetscInt        mr = 0, nr = 0, pr = 0;
35   PetscInt        ii, jj, kk;
36   PetscInt        si, sj, sk;
37   PetscInt        i, j, k, l, idx = 0;
38   PetscInt        base;
39   PetscInt        xm = 1, ym = 1, zm = 1;
40   PetscInt        ox, oy, oz;
41   PetscInt        m, n, p, M, N, P, dof;
42   const PetscInt *lx, *ly, *lz;
43   PetscInt        nindices;
44   PetscInt       *indices;
45   DM_DA          *dd     = (DM_DA *)da->data;
46   PetscBool       skip_i = PETSC_TRUE, skip_j = PETSC_TRUE, skip_k = PETSC_TRUE;
47   PetscBool       valid_j = PETSC_FALSE, valid_k = PETSC_FALSE; /* DMDA has at least 1 dimension */
48 
49   PetscFunctionBegin;
50   M   = dd->M;
51   N   = dd->N;
52   P   = dd->P;
53   m   = dd->m;
54   n   = dd->n;
55   p   = dd->p;
56   dof = dd->w;
57 
58   nindices = -1;
59   if (PetscLikely(upper->i - lower->i)) {
60     nindices = nindices * (upper->i - lower->i);
61     skip_i   = PETSC_FALSE;
62   }
63   if (N > 1) {
64     valid_j = PETSC_TRUE;
65     if (PetscLikely(upper->j - lower->j)) {
66       nindices = nindices * (upper->j - lower->j);
67       skip_j   = PETSC_FALSE;
68     }
69   }
70   if (P > 1) {
71     valid_k = PETSC_TRUE;
72     if (PetscLikely(upper->k - lower->k)) {
73       nindices = nindices * (upper->k - lower->k);
74       skip_k   = PETSC_FALSE;
75     }
76   }
77   if (PetscLikely(nindices < 0)) {
78     if (PetscUnlikely(skip_i && skip_j && skip_k)) {
79       nindices = 0;
80     } else nindices = nindices * (-1);
81   } else SETERRQ(PetscObjectComm((PetscObject)da), PETSC_ERR_ARG_WRONG, "Lower and Upper stencils are identical! Please check inputs.");
82 
83   PetscCall(PetscMalloc1(nindices * dof, &indices));
84   PetscCall(DMDAGetOffset(da, &ox, &oy, &oz, NULL, NULL, NULL));
85 
86   if (!valid_k) {
87     k        = 0;
88     upper->k = 0;
89     lower->k = 0;
90   }
91   if (!valid_j) {
92     j        = 0;
93     upper->j = 0;
94     lower->j = 0;
95   }
96 
97   if (offproc) {
98     PetscCall(DMDAGetOwnershipRanges(da, &lx, &ly, &lz));
99     /* start at index 0 on processor 0 */
100     mr = 0;
101     nr = 0;
102     pr = 0;
103     ms = 0;
104     ns = 0;
105     ps = 0;
106     if (lx) me = lx[0];
107     if (ly) ne = ly[0];
108     if (lz) pe = lz[0];
109     /*
110        If no indices are to be returned, create an empty is,
111        this prevents hanging in while loops
112     */
113     if (skip_i && skip_j && skip_k) goto createis;
114     /*
115        do..while loops to ensure the block gets entered once,
116        regardless of control condition being met, necessary for
117        cases when a subset of skip_i/j/k is true
118     */
119     if (skip_k) k = upper->k - oz;
120     else k = lower->k - oz;
121     do {
122       if (skip_j) j = upper->j - oy;
123       else j = lower->j - oy;
124       do {
125         if (skip_i) i = upper->i - ox;
126         else i = lower->i - ox;
127         do {
128           /* "actual" indices rather than ones outside of the domain */
129           ii = i;
130           jj = j;
131           kk = k;
132           if (ii < 0) ii = M + ii;
133           if (jj < 0) jj = N + jj;
134           if (kk < 0) kk = P + kk;
135           if (ii > M - 1) ii = ii - M;
136           if (jj > N - 1) jj = jj - N;
137           if (kk > P - 1) kk = kk - P;
138           /* gone out of processor range on x axis */
139           while (ii > me - 1 || ii < ms) {
140             if (mr == m - 1) {
141               ms = 0;
142               me = lx[0];
143               mr = 0;
144             } else {
145               mr++;
146               ms = me;
147               me += lx[mr];
148             }
149           }
150           /* gone out of processor range on y axis */
151           while (jj > ne - 1 || jj < ns) {
152             if (nr == n - 1) {
153               ns = 0;
154               ne = ly[0];
155               nr = 0;
156             } else {
157               nr++;
158               ns = ne;
159               ne += ly[nr];
160             }
161           }
162           /* gone out of processor range on z axis */
163           while (kk > pe - 1 || kk < ps) {
164             if (pr == p - 1) {
165               ps = 0;
166               pe = lz[0];
167               pr = 0;
168             } else {
169               pr++;
170               ps = pe;
171               pe += lz[pr];
172             }
173           }
174           /* compute the vector base on owning processor */
175           xm   = me - ms;
176           ym   = ne - ns;
177           zm   = pe - ps;
178           base = ms * ym * zm + ns * M * zm + ps * M * N;
179           /* compute the local coordinates on owning processor */
180           si = ii - ms;
181           sj = jj - ns;
182           sk = kk - ps;
183           for (l = 0; l < dof; l++) {
184             indices[idx] = l + dof * (base + si + xm * sj + xm * ym * sk);
185             idx++;
186           }
187           i++;
188         } while (i < upper->i - ox);
189         j++;
190       } while (j < upper->j - oy);
191       k++;
192     } while (k < upper->k - oz);
193   }
194 
195   if (!offproc) {
196     PetscCall(DMDAGetCorners(da, &ms, &ns, &ps, &mw, &nw, &pw));
197     me = ms + mw;
198     if (N > 1) ne = ns + nw;
199     if (P > 1) pe = ps + pw;
200     /* Account for DM offsets */
201     ms = ms - ox;
202     me = me - ox;
203     ns = ns - oy;
204     ne = ne - oy;
205     ps = ps - oz;
206     pe = pe - oz;
207 
208     /* compute the vector base on owning processor */
209     xm   = me - ms;
210     ym   = ne - ns;
211     zm   = pe - ps;
212     base = ms * ym * zm + ns * M * zm + ps * M * N;
213     /*
214        if no indices are to be returned, create an empty is,
215        this prevents hanging in while loops
216     */
217     if (skip_i && skip_j && skip_k) goto createis;
218     /*
219        do..while loops to ensure the block gets entered once,
220        regardless of control condition being met, necessary for
221        cases when a subset of skip_i/j/k is true
222     */
223     if (skip_k) k = upper->k - oz;
224     else k = lower->k - oz;
225     do {
226       if (skip_j) j = upper->j - oy;
227       else j = lower->j - oy;
228       do {
229         if (skip_i) i = upper->i - ox;
230         else i = lower->i - ox;
231         do {
232           if (k >= ps && k <= pe - 1) {
233             if (j >= ns && j <= ne - 1) {
234               if (i >= ms && i <= me - 1) {
235                 /* compute the local coordinates on owning processor */
236                 si = i - ms;
237                 sj = j - ns;
238                 sk = k - ps;
239                 for (l = 0; l < dof; l++) {
240                   indices[idx] = l + dof * (base + si + xm * sj + xm * ym * sk);
241                   idx++;
242                 }
243               }
244             }
245           }
246           i++;
247         } while (i < upper->i - ox);
248         j++;
249       } while (j < upper->j - oy);
250       k++;
251     } while (k < upper->k - oz);
252 
253     PetscCall(PetscRealloc((size_t)(idx * sizeof(PetscInt)), (void *)&indices));
254   }
255 
256 createis:
257   PetscCall(ISCreateGeneral(PetscObjectComm((PetscObject)da), idx, indices, PETSC_OWN_POINTER, is));
258   PetscFunctionReturn(PETSC_SUCCESS);
259 }
260 
261 PetscErrorCode DMDASubDomainDA_Private(DM dm, PetscInt *nlocal, DM **sdm)
262 {
263   DM           *da;
264   PetscInt      dim, size, i, j, k, idx;
265   DMDALocalInfo info;
266   PetscInt      xsize, ysize, zsize;
267   PetscInt      xo, yo, zo;
268   PetscInt      xs, ys, zs;
269   PetscInt      xm = 1, ym = 1, zm = 1;
270   PetscInt      xol, yol, zol;
271   PetscInt      m = 1, n = 1, p = 1;
272   PetscInt      M, N, P;
273   PetscInt      pm, mtmp;
274 
275   PetscFunctionBegin;
276   PetscCall(DMDAGetLocalInfo(dm, &info));
277   PetscCall(DMDAGetOverlap(dm, &xol, &yol, &zol));
278   PetscCall(DMDAGetNumLocalSubDomains(dm, &size));
279   PetscCall(PetscMalloc1(size, &da));
280 
281   if (nlocal) *nlocal = size;
282 
283   dim = info.dim;
284 
285   M = info.xm;
286   N = info.ym;
287   P = info.zm;
288 
289   if (dim == 1) {
290     m = size;
291   } else if (dim == 2) {
292     m = (PetscInt)(0.5 + PetscSqrtReal(((PetscReal)M) * ((PetscReal)size) / ((PetscReal)N)));
293     while (m > 0) {
294       n = size / m;
295       if (m * n * p == size) break;
296       m--;
297     }
298   } else if (dim == 3) {
299     n = (PetscInt)(0.5 + PetscPowReal(((PetscReal)N * N) * ((PetscReal)size) / ((PetscReal)P * M), (PetscReal)(1. / 3.)));
300     if (!n) n = 1;
301     while (n > 0) {
302       pm = size / n;
303       if (n * pm == size) break;
304       n--;
305     }
306     if (!n) n = 1;
307     m = (PetscInt)(0.5 + PetscSqrtReal(((PetscReal)M) * ((PetscReal)size) / ((PetscReal)P * n)));
308     if (!m) m = 1;
309     while (m > 0) {
310       p = size / (m * n);
311       if (m * n * p == size) break;
312       m--;
313     }
314     if (M > P && m < p) {
315       mtmp = m;
316       m    = p;
317       p    = mtmp;
318     }
319   }
320 
321   zs  = info.zs;
322   idx = 0;
323   for (k = 0; k < p; k++) {
324     ys = info.ys;
325     for (j = 0; j < n; j++) {
326       xs = info.xs;
327       for (i = 0; i < m; i++) {
328         if (dim == 1) {
329           xm = M / m + ((M % m) > i);
330         } else if (dim == 2) {
331           xm = M / m + ((M % m) > i);
332           ym = N / n + ((N % n) > j);
333         } else if (dim == 3) {
334           xm = M / m + ((M % m) > i);
335           ym = N / n + ((N % n) > j);
336           zm = P / p + ((P % p) > k);
337         }
338 
339         xsize = xm;
340         ysize = ym;
341         zsize = zm;
342         xo    = xs;
343         yo    = ys;
344         zo    = zs;
345 
346         PetscCall(DMDACreate(PETSC_COMM_SELF, &(da[idx])));
347         PetscCall(DMSetOptionsPrefix(da[idx], "sub_"));
348         PetscCall(DMSetDimension(da[idx], info.dim));
349         PetscCall(DMDASetDof(da[idx], info.dof));
350 
351         PetscCall(DMDASetStencilType(da[idx], info.st));
352         PetscCall(DMDASetStencilWidth(da[idx], info.sw));
353 
354         if (info.bx == DM_BOUNDARY_PERIODIC || (xs != 0)) {
355           xsize += xol;
356           xo -= xol;
357         }
358         if (info.by == DM_BOUNDARY_PERIODIC || (ys != 0)) {
359           ysize += yol;
360           yo -= yol;
361         }
362         if (info.bz == DM_BOUNDARY_PERIODIC || (zs != 0)) {
363           zsize += zol;
364           zo -= zol;
365         }
366 
367         if (info.bx == DM_BOUNDARY_PERIODIC || (xs + xm != info.mx)) xsize += xol;
368         if (info.by == DM_BOUNDARY_PERIODIC || (ys + ym != info.my)) ysize += yol;
369         if (info.bz == DM_BOUNDARY_PERIODIC || (zs + zm != info.mz)) zsize += zol;
370 
371         if (info.bx != DM_BOUNDARY_PERIODIC) {
372           if (xo < 0) {
373             xsize += xo;
374             xo = 0;
375           }
376           if (xo + xsize > info.mx - 1) xsize -= xo + xsize - info.mx;
377         }
378         if (info.by != DM_BOUNDARY_PERIODIC) {
379           if (yo < 0) {
380             ysize += yo;
381             yo = 0;
382           }
383           if (yo + ysize > info.my - 1) ysize -= yo + ysize - info.my;
384         }
385         if (info.bz != DM_BOUNDARY_PERIODIC) {
386           if (zo < 0) {
387             zsize += zo;
388             zo = 0;
389           }
390           if (zo + zsize > info.mz - 1) zsize -= zo + zsize - info.mz;
391         }
392 
393         PetscCall(DMDASetSizes(da[idx], xsize, ysize, zsize));
394         PetscCall(DMDASetNumProcs(da[idx], 1, 1, 1));
395         PetscCall(DMDASetBoundaryType(da[idx], DM_BOUNDARY_GHOSTED, DM_BOUNDARY_GHOSTED, DM_BOUNDARY_GHOSTED));
396 
397         /* set up as a block instead */
398         PetscCall(DMSetUp(da[idx]));
399 
400         /* nonoverlapping region */
401         PetscCall(DMDASetNonOverlappingRegion(da[idx], xs, ys, zs, xm, ym, zm));
402 
403         /* this alters the behavior of DMDAGetInfo, DMDAGetLocalInfo, DMDAGetCorners, and DMDAGetGhostedCorners and should be used with care */
404         PetscCall(DMDASetOffset(da[idx], xo, yo, zo, info.mx, info.my, info.mz));
405         xs += xm;
406         idx++;
407       }
408       ys += ym;
409     }
410     zs += zm;
411   }
412   *sdm = da;
413   PetscFunctionReturn(PETSC_SUCCESS);
414 }
415 
416 /*
417    Fills the local vector problem on the subdomain from the global problem.
418 
419    Right now this assumes one subdomain per processor.
420 
421 */
422 PetscErrorCode DMCreateDomainDecompositionScatters_DA(DM dm, PetscInt nsubdms, DM *subdms, VecScatter **iscat, VecScatter **oscat, VecScatter **lscat)
423 {
424   DMDALocalInfo info, subinfo;
425   DM            subdm;
426   MatStencil    upper, lower;
427   IS            idis, isis, odis, osis, gdis;
428   Vec           svec, dvec, slvec;
429   PetscInt      xm, ym, zm, xs, ys, zs;
430   PetscInt      i;
431   PetscBool     patchis_offproc = PETSC_TRUE;
432 
433   PetscFunctionBegin;
434   /* allocate the arrays of scatters */
435   if (iscat) PetscCall(PetscMalloc1(nsubdms, iscat));
436   if (oscat) PetscCall(PetscMalloc1(nsubdms, oscat));
437   if (lscat) PetscCall(PetscMalloc1(nsubdms, lscat));
438 
439   PetscCall(DMDAGetLocalInfo(dm, &info));
440   for (i = 0; i < nsubdms; i++) {
441     subdm = subdms[i];
442     PetscCall(DMDAGetLocalInfo(subdm, &subinfo));
443     PetscCall(DMDAGetNonOverlappingRegion(subdm, &xs, &ys, &zs, &xm, &ym, &zm));
444 
445     /* create the global and subdomain index sets for the inner domain */
446     lower.i = xs;
447     lower.j = ys;
448     lower.k = zs;
449     upper.i = xs + xm;
450     upper.j = ys + ym;
451     upper.k = zs + zm;
452     PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &idis, patchis_offproc));
453     PetscCall(DMDACreatePatchIS(subdm, &lower, &upper, &isis, patchis_offproc));
454 
455     /* create the global and subdomain index sets for the outer subdomain */
456     lower.i = subinfo.xs;
457     lower.j = subinfo.ys;
458     lower.k = subinfo.zs;
459     upper.i = subinfo.xs + subinfo.xm;
460     upper.j = subinfo.ys + subinfo.ym;
461     upper.k = subinfo.zs + subinfo.zm;
462     PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &odis, patchis_offproc));
463     PetscCall(DMDACreatePatchIS(subdm, &lower, &upper, &osis, patchis_offproc));
464 
465     /* global and subdomain ISes for the local indices of the subdomain */
466     /* todo - make this not loop over at nonperiodic boundaries, which will be more involved */
467     lower.i = subinfo.gxs;
468     lower.j = subinfo.gys;
469     lower.k = subinfo.gzs;
470     upper.i = subinfo.gxs + subinfo.gxm;
471     upper.j = subinfo.gys + subinfo.gym;
472     upper.k = subinfo.gzs + subinfo.gzm;
473     PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &gdis, patchis_offproc));
474 
475     /* form the scatter */
476     PetscCall(DMGetGlobalVector(dm, &dvec));
477     PetscCall(DMGetGlobalVector(subdm, &svec));
478     PetscCall(DMGetLocalVector(subdm, &slvec));
479 
480     if (iscat) PetscCall(VecScatterCreate(dvec, idis, svec, isis, &(*iscat)[i]));
481     if (oscat) PetscCall(VecScatterCreate(dvec, odis, svec, osis, &(*oscat)[i]));
482     if (lscat) PetscCall(VecScatterCreate(dvec, gdis, slvec, NULL, &(*lscat)[i]));
483 
484     PetscCall(DMRestoreGlobalVector(dm, &dvec));
485     PetscCall(DMRestoreGlobalVector(subdm, &svec));
486     PetscCall(DMRestoreLocalVector(subdm, &slvec));
487 
488     PetscCall(ISDestroy(&idis));
489     PetscCall(ISDestroy(&isis));
490 
491     PetscCall(ISDestroy(&odis));
492     PetscCall(ISDestroy(&osis));
493 
494     PetscCall(ISDestroy(&gdis));
495   }
496   PetscFunctionReturn(PETSC_SUCCESS);
497 }
498 
499 PetscErrorCode DMDASubDomainIS_Private(DM dm, PetscInt n, DM *subdm, IS **iis, IS **ois)
500 {
501   PetscInt      i;
502   DMDALocalInfo info, subinfo;
503   MatStencil    lower, upper;
504   PetscBool     patchis_offproc = PETSC_TRUE;
505 
506   PetscFunctionBegin;
507   PetscCall(DMDAGetLocalInfo(dm, &info));
508   if (iis) PetscCall(PetscMalloc1(n, iis));
509   if (ois) PetscCall(PetscMalloc1(n, ois));
510 
511   for (i = 0; i < n; i++) {
512     PetscCall(DMDAGetLocalInfo(subdm[i], &subinfo));
513     if (iis) {
514       /* create the inner IS */
515       lower.i = info.xs;
516       lower.j = info.ys;
517       lower.k = info.zs;
518       upper.i = info.xs + info.xm;
519       upper.j = info.ys + info.ym;
520       upper.k = info.zs + info.zm;
521       PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &(*iis)[i], patchis_offproc));
522     }
523 
524     if (ois) {
525       /* create the outer IS */
526       lower.i = subinfo.xs;
527       lower.j = subinfo.ys;
528       lower.k = subinfo.zs;
529       upper.i = subinfo.xs + subinfo.xm;
530       upper.j = subinfo.ys + subinfo.ym;
531       upper.k = subinfo.zs + subinfo.zm;
532       PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &(*ois)[i], patchis_offproc));
533     }
534   }
535   PetscFunctionReturn(PETSC_SUCCESS);
536 }
537 
538 PetscErrorCode DMCreateDomainDecomposition_DA(DM dm, PetscInt *len, char ***names, IS **iis, IS **ois, DM **subdm)
539 {
540   DM      *sdm;
541   PetscInt n, i;
542 
543   PetscFunctionBegin;
544   PetscCall(DMDASubDomainDA_Private(dm, &n, &sdm));
545   if (names) {
546     PetscCall(PetscMalloc1(n, names));
547     for (i = 0; i < n; i++) (*names)[i] = NULL;
548   }
549   PetscCall(DMDASubDomainIS_Private(dm, n, sdm, iis, ois));
550   if (subdm) *subdm = sdm;
551   else {
552     for (i = 0; i < n; i++) PetscCall(DMDestroy(&sdm[i]));
553   }
554   if (len) *len = n;
555   PetscFunctionReturn(PETSC_SUCCESS);
556 }
557