xref: /petsc/src/dm/impls/da/dadd.c (revision 30404b964f92c07d0d054ea2ab37b64e4da063cc)
1 #include <petsc/private/dmdaimpl.h> /*I   "petscdmda.h"   I*/
2 
3 /*@
4   DMDACreatePatchIS - Creates an index set corresponding to a logically rectangular patch of the `DMDA`.
5 
6   Collective
7 
8   Input Parameters:
9 + da      - the `DMDA`
10 . lower   - a `MatStencil` with i, j and k entries corresponding to the lower corner of the patch
11 . upper   - a `MatStencil` with i, j and k entries corresponding to the upper corner of the patch
12 - offproc - indicate whether the returned `IS` will contain off process indices
13 
14   Output Parameter:
15 . is - the `IS` corresponding to the patch
16 
17   Level: developer
18 
19   Notes:
20   This routine always returns an `IS` on the `DMDA` communicator.
21 
22   If `offproc` is set to `PETSC_TRUE`,
23   the routine returns an `IS` with all the indices requested regardless of whether these indices
24   are present on the requesting MPI process or not. Thus, it is upon the caller to ensure that
25   the indices returned in this mode are appropriate.
26 
27   If `offproc` is set to `PETSC_FALSE`,
28   the `IS` only returns the subset of indices that are present on the requesting MPI process and there
29   is no duplication of indices between multiple MPI processes.
30 
31 .seealso: [](sec_struct), `DM`, `DMDA`, `DMCreateDomainDecomposition()`, `DMCreateDomainDecompositionScatters()`
32 @*/
33 PetscErrorCode DMDACreatePatchIS(DM da, MatStencil *lower, MatStencil *upper, IS *is, PetscBool offproc)
34 {
35   PetscInt        ms = 0, ns = 0, ps = 0;
36   PetscInt        mw = 0, nw = 0, pw = 0;
37   PetscInt        me = 1, ne = 1, pe = 1;
38   PetscInt        mr = 0, nr = 0, pr = 0;
39   PetscInt        ii, jj, kk;
40   PetscInt        si, sj, sk;
41   PetscInt        i, j, k = 0, l, idx = 0;
42   PetscInt        base;
43   PetscInt        xm = 1, ym = 1, zm = 1;
44   PetscInt        ox, oy, oz;
45   PetscInt        m, n, p, M, N, P, dof;
46   const PetscInt *lx, *ly, *lz;
47   PetscInt        nindices;
48   PetscInt       *indices;
49   DM_DA          *dd     = (DM_DA *)da->data;
50   PetscBool       skip_i = PETSC_TRUE, skip_j = PETSC_TRUE, skip_k = PETSC_TRUE;
51   PetscBool       valid_j = PETSC_FALSE, valid_k = PETSC_FALSE; /* DMDA has at least 1 dimension */
52 
53   PetscFunctionBegin;
54   M   = dd->M;
55   N   = dd->N;
56   P   = dd->P;
57   m   = dd->m;
58   n   = dd->n;
59   p   = dd->p;
60   dof = dd->w;
61 
62   nindices = -1;
63   if (PetscLikely(upper->i - lower->i)) {
64     nindices = nindices * (upper->i - lower->i);
65     skip_i   = PETSC_FALSE;
66   }
67   if (N > 1) {
68     valid_j = PETSC_TRUE;
69     if (PetscLikely(upper->j - lower->j)) {
70       nindices = nindices * (upper->j - lower->j);
71       skip_j   = PETSC_FALSE;
72     }
73   }
74   if (P > 1) {
75     valid_k = PETSC_TRUE;
76     if (PetscLikely(upper->k - lower->k)) {
77       nindices = nindices * (upper->k - lower->k);
78       skip_k   = PETSC_FALSE;
79     }
80   }
81   if (PetscLikely(nindices < 0)) {
82     if (PetscUnlikely(skip_i && skip_j && skip_k)) {
83       nindices = 0;
84     } else nindices = nindices * (-1);
85   } else SETERRQ(PetscObjectComm((PetscObject)da), PETSC_ERR_ARG_WRONG, "Lower and Upper stencils are identical! Please check inputs.");
86 
87   PetscCall(PetscMalloc1(nindices * dof, &indices));
88   PetscCall(DMDAGetOffset(da, &ox, &oy, &oz, NULL, NULL, NULL));
89 
90   if (!valid_k) {
91     upper->k = 0;
92     lower->k = 0;
93   }
94   if (!valid_j) {
95     j        = 0;
96     upper->j = 0;
97     lower->j = 0;
98   }
99 
100   if (offproc) {
101     PetscCall(DMDAGetOwnershipRanges(da, &lx, &ly, &lz));
102     /* start at index 0 on processor 0 */
103     mr = 0;
104     nr = 0;
105     pr = 0;
106     ms = 0;
107     ns = 0;
108     ps = 0;
109     if (lx) me = lx[0];
110     if (ly) ne = ly[0];
111     if (lz) pe = lz[0];
112     /*
113        If no indices are to be returned, create an empty is,
114        this prevents hanging in while loops
115     */
116     if (skip_i && skip_j && skip_k) goto createis;
117     /*
118        do..while loops to ensure the block gets entered once,
119        regardless of control condition being met, necessary for
120        cases when a subset of skip_i/j/k is true
121     */
122     if (skip_k) k = upper->k - oz;
123     else k = lower->k - oz;
124     do {
125       if (skip_j) j = upper->j - oy;
126       else j = lower->j - oy;
127       do {
128         if (skip_i) i = upper->i - ox;
129         else i = lower->i - ox;
130         do {
131           /* "actual" indices rather than ones outside of the domain */
132           ii = i;
133           jj = j;
134           kk = k;
135           if (ii < 0) ii = M + ii;
136           if (jj < 0) jj = N + jj;
137           if (kk < 0) kk = P + kk;
138           if (ii > M - 1) ii = ii - M;
139           if (jj > N - 1) jj = jj - N;
140           if (kk > P - 1) kk = kk - P;
141           /* gone out of processor range on x axis */
142           while (ii > me - 1 || ii < ms) {
143             if (mr == m - 1) {
144               ms = 0;
145               me = lx[0];
146               mr = 0;
147             } else {
148               mr++;
149               ms = me;
150               me += lx[mr];
151             }
152           }
153           /* gone out of processor range on y axis */
154           while (jj > ne - 1 || jj < ns) {
155             if (nr == n - 1) {
156               ns = 0;
157               ne = ly[0];
158               nr = 0;
159             } else {
160               nr++;
161               ns = ne;
162               ne += ly[nr];
163             }
164           }
165           /* gone out of processor range on z axis */
166           while (kk > pe - 1 || kk < ps) {
167             if (pr == p - 1) {
168               ps = 0;
169               pe = lz[0];
170               pr = 0;
171             } else {
172               pr++;
173               ps = pe;
174               pe += lz[pr];
175             }
176           }
177           /* compute the vector base on owning processor */
178           xm   = me - ms;
179           ym   = ne - ns;
180           zm   = pe - ps;
181           base = ms * ym * zm + ns * M * zm + ps * M * N;
182           /* compute the local coordinates on owning processor */
183           si = ii - ms;
184           sj = jj - ns;
185           sk = kk - ps;
186           for (l = 0; l < dof; l++) {
187             indices[idx] = l + dof * (base + si + xm * sj + xm * ym * sk);
188             idx++;
189           }
190           i++;
191         } while (i < upper->i - ox);
192         j++;
193       } while (j < upper->j - oy);
194       k++;
195     } while (k < upper->k - oz);
196   }
197 
198   if (!offproc) {
199     PetscCall(DMDAGetCorners(da, &ms, &ns, &ps, &mw, &nw, &pw));
200     me = ms + mw;
201     if (N > 1) ne = ns + nw;
202     if (P > 1) pe = ps + pw;
203     /* Account for DM offsets */
204     ms = ms - ox;
205     me = me - ox;
206     ns = ns - oy;
207     ne = ne - oy;
208     ps = ps - oz;
209     pe = pe - oz;
210 
211     /* compute the vector base on owning processor */
212     xm   = me - ms;
213     ym   = ne - ns;
214     zm   = pe - ps;
215     base = ms * ym * zm + ns * M * zm + ps * M * N;
216     /*
217        if no indices are to be returned, create an empty is,
218        this prevents hanging in while loops
219     */
220     if (skip_i && skip_j && skip_k) goto createis;
221     /*
222        do..while loops to ensure the block gets entered once,
223        regardless of control condition being met, necessary for
224        cases when a subset of skip_i/j/k is true
225     */
226     if (skip_k) k = upper->k - oz;
227     else k = lower->k - oz;
228     do {
229       if (skip_j) j = upper->j - oy;
230       else j = lower->j - oy;
231       do {
232         if (skip_i) i = upper->i - ox;
233         else i = lower->i - ox;
234         do {
235           if (k >= ps && k <= pe - 1) {
236             if (j >= ns && j <= ne - 1) {
237               if (i >= ms && i <= me - 1) {
238                 /* compute the local coordinates on owning processor */
239                 si = i - ms;
240                 sj = j - ns;
241                 sk = k - ps;
242                 for (l = 0; l < dof; l++) {
243                   indices[idx] = l + dof * (base + si + xm * sj + xm * ym * sk);
244                   idx++;
245                 }
246               }
247             }
248           }
249           i++;
250         } while (i < upper->i - ox);
251         j++;
252       } while (j < upper->j - oy);
253       k++;
254     } while (k < upper->k - oz);
255 
256     PetscCall(PetscRealloc((size_t)(idx * sizeof(PetscInt)), (void *)&indices));
257   }
258 
259 createis:
260   PetscCall(ISCreateGeneral(PetscObjectComm((PetscObject)da), idx, indices, PETSC_OWN_POINTER, is));
261   PetscFunctionReturn(PETSC_SUCCESS);
262 }
263 
264 static PetscErrorCode DMDASubDomainDA_Private(DM dm, PetscInt *nlocal, DM **sdm)
265 {
266   DM           *da;
267   PetscInt      dim, size, i, j, k, idx;
268   DMDALocalInfo info;
269   PetscInt      xsize, ysize, zsize;
270   PetscInt      xo, yo, zo;
271   PetscInt      xs, ys, zs;
272   PetscInt      xm = 1, ym = 1, zm = 1;
273   PetscInt      xol, yol, zol;
274   PetscInt      m = 1, n = 1, p = 1;
275   PetscInt      M, N, P;
276   PetscInt      pm, mtmp;
277 
278   PetscFunctionBegin;
279   PetscCall(DMDAGetLocalInfo(dm, &info));
280   PetscCall(DMDAGetOverlap(dm, &xol, &yol, &zol));
281   PetscCall(DMDAGetNumLocalSubDomains(dm, &size));
282   PetscCall(PetscMalloc1(size, &da));
283 
284   if (nlocal) *nlocal = size;
285 
286   dim = info.dim;
287 
288   M = info.xm;
289   N = info.ym;
290   P = info.zm;
291 
292   if (dim == 1) {
293     m = size;
294   } else if (dim == 2) {
295     m = (PetscInt)(0.5 + PetscSqrtReal(((PetscReal)M) * ((PetscReal)size) / ((PetscReal)N)));
296     while (m > 0) {
297       n = size / m;
298       if (m * n * p == size) break;
299       m--;
300     }
301   } else if (dim == 3) {
302     n = (PetscInt)(0.5 + PetscPowReal(((PetscReal)N * N) * ((PetscReal)size) / ((PetscReal)P * M), (PetscReal)(1. / 3.)));
303     if (!n) n = 1;
304     while (n > 0) {
305       pm = size / n;
306       if (n * pm == size) break;
307       n--;
308     }
309     if (!n) n = 1;
310     m = (PetscInt)(0.5 + PetscSqrtReal(((PetscReal)M) * ((PetscReal)size) / ((PetscReal)P * n)));
311     if (!m) m = 1;
312     while (m > 0) {
313       p = size / (m * n);
314       if (m * n * p == size) break;
315       m--;
316     }
317     if (M > P && m < p) {
318       mtmp = m;
319       m    = p;
320       p    = mtmp;
321     }
322   }
323 
324   zs  = info.zs;
325   idx = 0;
326   for (k = 0; k < p; k++) {
327     ys = info.ys;
328     for (j = 0; j < n; j++) {
329       xs = info.xs;
330       for (i = 0; i < m; i++) {
331         if (dim == 1) {
332           xm = M / m + ((M % m) > i);
333         } else if (dim == 2) {
334           xm = M / m + ((M % m) > i);
335           ym = N / n + ((N % n) > j);
336         } else if (dim == 3) {
337           xm = M / m + ((M % m) > i);
338           ym = N / n + ((N % n) > j);
339           zm = P / p + ((P % p) > k);
340         }
341 
342         xsize = xm;
343         ysize = ym;
344         zsize = zm;
345         xo    = xs;
346         yo    = ys;
347         zo    = zs;
348 
349         PetscCall(DMDACreate(PETSC_COMM_SELF, &da[idx]));
350         PetscCall(DMSetOptionsPrefix(da[idx], "sub_"));
351         PetscCall(DMSetDimension(da[idx], info.dim));
352         PetscCall(DMDASetDof(da[idx], info.dof));
353 
354         PetscCall(DMDASetStencilType(da[idx], info.st));
355         PetscCall(DMDASetStencilWidth(da[idx], info.sw));
356 
357         if (info.bx == DM_BOUNDARY_PERIODIC || (xs != 0)) {
358           xsize += xol;
359           xo -= xol;
360         }
361         if (info.by == DM_BOUNDARY_PERIODIC || (ys != 0)) {
362           ysize += yol;
363           yo -= yol;
364         }
365         if (info.bz == DM_BOUNDARY_PERIODIC || (zs != 0)) {
366           zsize += zol;
367           zo -= zol;
368         }
369 
370         if (info.bx == DM_BOUNDARY_PERIODIC || (xs + xm != info.mx)) xsize += xol;
371         if (info.by == DM_BOUNDARY_PERIODIC || (ys + ym != info.my)) ysize += yol;
372         if (info.bz == DM_BOUNDARY_PERIODIC || (zs + zm != info.mz)) zsize += zol;
373 
374         if (info.bx != DM_BOUNDARY_PERIODIC) {
375           if (xo < 0) {
376             xsize += xo;
377             xo = 0;
378           }
379           if (xo + xsize > info.mx - 1) xsize -= xo + xsize - info.mx;
380         }
381         if (info.by != DM_BOUNDARY_PERIODIC) {
382           if (yo < 0) {
383             ysize += yo;
384             yo = 0;
385           }
386           if (yo + ysize > info.my - 1) ysize -= yo + ysize - info.my;
387         }
388         if (info.bz != DM_BOUNDARY_PERIODIC) {
389           if (zo < 0) {
390             zsize += zo;
391             zo = 0;
392           }
393           if (zo + zsize > info.mz - 1) zsize -= zo + zsize - info.mz;
394         }
395 
396         PetscCall(DMDASetSizes(da[idx], xsize, ysize, zsize));
397         PetscCall(DMDASetNumProcs(da[idx], 1, 1, 1));
398         PetscCall(DMDASetBoundaryType(da[idx], DM_BOUNDARY_GHOSTED, DM_BOUNDARY_GHOSTED, DM_BOUNDARY_GHOSTED));
399 
400         /* set up as a block instead */
401         PetscCall(DMSetUp(da[idx]));
402 
403         /* nonoverlapping region */
404         PetscCall(DMDASetNonOverlappingRegion(da[idx], xs, ys, zs, xm, ym, zm));
405 
406         /* this alters the behavior of DMDAGetInfo, DMDAGetLocalInfo, DMDAGetCorners, and DMDAGetGhostedCorners and should be used with care */
407         PetscCall(DMDASetOffset(da[idx], xo, yo, zo, info.mx, info.my, info.mz));
408         xs += xm;
409         idx++;
410       }
411       ys += ym;
412     }
413     zs += zm;
414   }
415   *sdm = da;
416   PetscFunctionReturn(PETSC_SUCCESS);
417 }
418 
419 /*
420    Fills the local vector problem on the subdomain from the global problem.
421 
422    Right now this assumes one subdomain per processor.
423 
424 */
425 PetscErrorCode DMCreateDomainDecompositionScatters_DA(DM dm, PetscInt nsubdms, DM *subdms, VecScatter **iscat, VecScatter **oscat, VecScatter **lscat)
426 {
427   DMDALocalInfo info, subinfo;
428   DM            subdm;
429   MatStencil    upper, lower;
430   IS            idis, isis, odis, osis, gdis;
431   Vec           svec, dvec, slvec;
432   PetscInt      xm, ym, zm, xs, ys, zs;
433   PetscInt      i;
434   PetscBool     patchis_offproc = PETSC_TRUE;
435 
436   PetscFunctionBegin;
437   /* allocate the arrays of scatters */
438   if (iscat) PetscCall(PetscMalloc1(nsubdms, iscat));
439   if (oscat) PetscCall(PetscMalloc1(nsubdms, oscat));
440   if (lscat) PetscCall(PetscMalloc1(nsubdms, lscat));
441 
442   PetscCall(DMDAGetLocalInfo(dm, &info));
443   for (i = 0; i < nsubdms; i++) {
444     subdm = subdms[i];
445     PetscCall(DMDAGetLocalInfo(subdm, &subinfo));
446     PetscCall(DMDAGetNonOverlappingRegion(subdm, &xs, &ys, &zs, &xm, &ym, &zm));
447 
448     /* create the global and subdomain index sets for the inner domain */
449     lower.i = xs;
450     lower.j = ys;
451     lower.k = zs;
452     upper.i = xs + xm;
453     upper.j = ys + ym;
454     upper.k = zs + zm;
455     PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &idis, patchis_offproc));
456     PetscCall(DMDACreatePatchIS(subdm, &lower, &upper, &isis, patchis_offproc));
457 
458     /* create the global and subdomain index sets for the outer subdomain */
459     lower.i = subinfo.xs;
460     lower.j = subinfo.ys;
461     lower.k = subinfo.zs;
462     upper.i = subinfo.xs + subinfo.xm;
463     upper.j = subinfo.ys + subinfo.ym;
464     upper.k = subinfo.zs + subinfo.zm;
465     PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &odis, patchis_offproc));
466     PetscCall(DMDACreatePatchIS(subdm, &lower, &upper, &osis, patchis_offproc));
467 
468     /* global and subdomain ISes for the local indices of the subdomain */
469     /* todo - make this not loop over at nonperiodic boundaries, which will be more involved */
470     lower.i = subinfo.gxs;
471     lower.j = subinfo.gys;
472     lower.k = subinfo.gzs;
473     upper.i = subinfo.gxs + subinfo.gxm;
474     upper.j = subinfo.gys + subinfo.gym;
475     upper.k = subinfo.gzs + subinfo.gzm;
476     PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &gdis, patchis_offproc));
477 
478     /* form the scatter */
479     PetscCall(DMGetGlobalVector(dm, &dvec));
480     PetscCall(DMGetGlobalVector(subdm, &svec));
481     PetscCall(DMGetLocalVector(subdm, &slvec));
482 
483     if (iscat) PetscCall(VecScatterCreate(dvec, idis, svec, isis, &(*iscat)[i]));
484     if (oscat) PetscCall(VecScatterCreate(dvec, odis, svec, osis, &(*oscat)[i]));
485     if (lscat) PetscCall(VecScatterCreate(dvec, gdis, slvec, NULL, &(*lscat)[i]));
486 
487     PetscCall(DMRestoreGlobalVector(dm, &dvec));
488     PetscCall(DMRestoreGlobalVector(subdm, &svec));
489     PetscCall(DMRestoreLocalVector(subdm, &slvec));
490 
491     PetscCall(ISDestroy(&idis));
492     PetscCall(ISDestroy(&isis));
493 
494     PetscCall(ISDestroy(&odis));
495     PetscCall(ISDestroy(&osis));
496 
497     PetscCall(ISDestroy(&gdis));
498   }
499   PetscFunctionReturn(PETSC_SUCCESS);
500 }
501 
502 static PetscErrorCode DMDASubDomainIS_Private(DM dm, PetscInt n, DM *subdm, IS **iis, IS **ois)
503 {
504   PetscInt      i;
505   DMDALocalInfo info, subinfo;
506   MatStencil    lower, upper;
507   PetscBool     patchis_offproc = PETSC_TRUE;
508 
509   PetscFunctionBegin;
510   PetscCall(DMDAGetLocalInfo(dm, &info));
511   if (iis) PetscCall(PetscMalloc1(n, iis));
512   if (ois) PetscCall(PetscMalloc1(n, ois));
513 
514   for (i = 0; i < n; i++) {
515     PetscCall(DMDAGetLocalInfo(subdm[i], &subinfo));
516     if (iis) {
517       /* create the inner IS */
518       lower.i = info.xs;
519       lower.j = info.ys;
520       lower.k = info.zs;
521       upper.i = info.xs + info.xm;
522       upper.j = info.ys + info.ym;
523       upper.k = info.zs + info.zm;
524       PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &(*iis)[i], patchis_offproc));
525     }
526 
527     if (ois) {
528       /* create the outer IS */
529       lower.i = subinfo.xs;
530       lower.j = subinfo.ys;
531       lower.k = subinfo.zs;
532       upper.i = subinfo.xs + subinfo.xm;
533       upper.j = subinfo.ys + subinfo.ym;
534       upper.k = subinfo.zs + subinfo.zm;
535       PetscCall(DMDACreatePatchIS(dm, &lower, &upper, &(*ois)[i], patchis_offproc));
536     }
537   }
538   PetscFunctionReturn(PETSC_SUCCESS);
539 }
540 
541 PetscErrorCode DMCreateDomainDecomposition_DA(DM dm, PetscInt *len, char ***names, IS **iis, IS **ois, DM **subdm)
542 {
543   DM      *sdm = NULL;
544   PetscInt n   = 0, i;
545 
546   PetscFunctionBegin;
547   PetscCall(DMDASubDomainDA_Private(dm, &n, &sdm));
548   if (names) {
549     PetscCall(PetscMalloc1(n, names));
550     for (i = 0; i < n; i++) (*names)[i] = NULL;
551   }
552   PetscCall(DMDASubDomainIS_Private(dm, n, sdm, iis, ois));
553   if (subdm) *subdm = sdm;
554   else {
555     for (i = 0; i < n; i++) PetscCall(DMDestroy(&sdm[i]));
556   }
557   if (len) *len = n;
558   PetscFunctionReturn(PETSC_SUCCESS);
559 }
560