xref: /petsc/src/dm/impls/da/da1.c (revision b41ce5d507ea9a58bfa83cf403107a702e77a67d)
1 
2 /*
3    Code for manipulating distributed regular 1d arrays in parallel.
4    This file was created by Peter Mell   6/30/95
5 */
6 
7 #include <petsc/private/dmdaimpl.h>     /*I  "petscdmda.h"   I*/
8 
9 #include <petscdraw.h>
/*
   DMView_DA_1d - Views a one-dimensional DMDA, dispatching on the viewer type:
   ASCII (plain, VTK, or GLVis formats), draw (graphical sketch of the ownership
   ranges), binary, and -- when PETSc was configured with it -- MATLAB.

   Collective on da.
*/
static PetscErrorCode DMView_DA_1d(DM da,PetscViewer viewer)
{
  PetscErrorCode ierr;
  PetscMPIInt    rank;
  PetscBool      iascii,isdraw,isbinary;
  DM_DA          *dd = (DM_DA*)da->data;
#if defined(PETSC_HAVE_MATLAB_ENGINE)
  PetscBool ismatlab;
#endif

  PetscFunctionBegin;
  ierr = MPI_Comm_rank(PetscObjectComm((PetscObject)da),&rank);CHKERRQ(ierr);

  /* Determine which of the supported viewer implementations we were handed */
  ierr = PetscObjectTypeCompare((PetscObject)viewer,PETSCVIEWERASCII,&iascii);CHKERRQ(ierr);
  ierr = PetscObjectTypeCompare((PetscObject)viewer,PETSCVIEWERDRAW,&isdraw);CHKERRQ(ierr);
  ierr = PetscObjectTypeCompare((PetscObject)viewer,PETSCVIEWERBINARY,&isbinary);CHKERRQ(ierr);
#if defined(PETSC_HAVE_MATLAB_ENGINE)
  ierr = PetscObjectTypeCompare((PetscObject)viewer,PETSCVIEWERMATLAB,&ismatlab);CHKERRQ(ierr);
#endif
  if (iascii) {
    PetscViewerFormat format;

    ierr = PetscViewerGetFormat(viewer, &format);CHKERRQ(ierr);
    if (format != PETSC_VIEWER_ASCII_VTK && format != PETSC_VIEWER_ASCII_VTK_CELL && format != PETSC_VIEWER_ASCII_GLVIS) {
      /* Default ASCII format: each rank prints its sizes and owned index range
         in rank order (synchronized printing keeps the output ordered) */
      DMDALocalInfo info;
      ierr = DMDAGetLocalInfo(da,&info);CHKERRQ(ierr);
      ierr = PetscViewerASCIIPushSynchronized(viewer);CHKERRQ(ierr);
      ierr = PetscViewerASCIISynchronizedPrintf(viewer,"Processor [%d] M %D m %D w %D s %D\n",rank,dd->M,dd->m,dd->w,dd->s);CHKERRQ(ierr);
      ierr = PetscViewerASCIISynchronizedPrintf(viewer,"X range of indices: %D %D\n",info.xs,info.xs+info.xm);CHKERRQ(ierr);
      ierr = PetscViewerFlush(viewer);CHKERRQ(ierr);
      ierr = PetscViewerASCIIPopSynchronized(viewer);CHKERRQ(ierr);
    } else if (format == PETSC_VIEWER_ASCII_GLVIS) {
      ierr = DMView_DA_GLVis(da,viewer);CHKERRQ(ierr);
    } else {
      /* Remaining ASCII formats here are the VTK variants */
      ierr = DMView_DA_VTK(da, viewer);CHKERRQ(ierr);
    }
  } else if (isdraw) {
    PetscDraw draw;
    double    ymin = -1,ymax = 1,xmin = -1,xmax = dd->M,x;
    PetscInt  base;
    char      node[10];
    PetscBool isnull;

    ierr = PetscViewerDrawGetDraw(viewer,0,&draw);CHKERRQ(ierr);
    ierr = PetscDrawIsNull(draw,&isnull);CHKERRQ(ierr);
    if (isnull) PetscFunctionReturn(0);

    ierr = PetscDrawCheckResizedWindow(draw);CHKERRQ(ierr);
    ierr = PetscDrawClear(draw);CHKERRQ(ierr);
    ierr = PetscDrawSetCoordinates(draw,xmin,ymin,xmax,ymax);CHKERRQ(ierr);

    ierr = PetscDrawCollectiveBegin(draw);CHKERRQ(ierr);
    /* first processor draws all node lines */
    if (!rank) {
      PetscInt xmin_tmp;
      ymin = 0.0; ymax = 0.3;
      /* one vertical tick per global grid point */
      for (xmin_tmp=0; xmin_tmp < dd->M; xmin_tmp++) {
        ierr = PetscDrawLine(draw,(double)xmin_tmp,ymin,(double)xmin_tmp,ymax,PETSC_DRAW_BLACK);CHKERRQ(ierr);
      }
      /* horizontal rails spanning the whole grid */
      xmin = 0.0; xmax = dd->M - 1;
      ierr = PetscDrawLine(draw,xmin,ymin,xmax,ymin,PETSC_DRAW_BLACK);CHKERRQ(ierr);
      ierr = PetscDrawLine(draw,xmin,ymax,xmax,ymax,PETSC_DRAW_BLACK);CHKERRQ(ierr);
    }
    ierr = PetscDrawCollectiveEnd(draw);CHKERRQ(ierr);
    ierr = PetscDrawFlush(draw);CHKERRQ(ierr);
    ierr = PetscDrawPause(draw);CHKERRQ(ierr);

    ierr = PetscDrawCollectiveBegin(draw);CHKERRQ(ierr);
    /* draw my box: the owned node range converted from dof-scaled
       coordinates (dd->xs/dd->xe are premultiplied by dof, see DMSetUp_DA_1D) */
    ymin = 0; ymax = 0.3; xmin = dd->xs / dd->w; xmax = (dd->xe / dd->w)  - 1;
    ierr = PetscDrawLine(draw,xmin,ymin,xmax,ymin,PETSC_DRAW_RED);CHKERRQ(ierr);
    ierr = PetscDrawLine(draw,xmin,ymin,xmin,ymax,PETSC_DRAW_RED);CHKERRQ(ierr);
    ierr = PetscDrawLine(draw,xmin,ymax,xmax,ymax,PETSC_DRAW_RED);CHKERRQ(ierr);
    ierr = PetscDrawLine(draw,xmax,ymin,xmax,ymax,PETSC_DRAW_RED);CHKERRQ(ierr);
    /* Put in index numbers */
    base = dd->base / dd->w;
    /* double loop counter is fine here: values are small integers, exactly representable */
    for (x=xmin; x<=xmax; x++) {
      ierr = PetscSNPrintf(node,sizeof(node),"%d",(int)base++);CHKERRQ(ierr);
      ierr = PetscDrawString(draw,x,ymin,PETSC_DRAW_RED,node);CHKERRQ(ierr);
    }
    ierr = PetscDrawCollectiveEnd(draw);CHKERRQ(ierr);
    ierr = PetscDrawFlush(draw);CHKERRQ(ierr);
    ierr = PetscDrawPause(draw);CHKERRQ(ierr);
    ierr = PetscDrawSave(draw);CHKERRQ(ierr);
  } else if (isbinary) {
    ierr = DMView_DA_Binary(da,viewer);CHKERRQ(ierr);
#if defined(PETSC_HAVE_MATLAB_ENGINE)
  } else if (ismatlab) {
    ierr = DMView_DA_Matlab(da,viewer);CHKERRQ(ierr);
#endif
  }
  PetscFunctionReturn(0);
}
103 
104 
105 PetscErrorCode  DMSetUp_DA_1D(DM da)
106 {
107   DM_DA            *dd   = (DM_DA*)da->data;
108   const PetscInt   M     = dd->M;
109   const PetscInt   dof   = dd->w;
110   const PetscInt   s     = dd->s;
111   const PetscInt   sDist = s;  /* stencil distance in points */
112   const PetscInt   *lx   = dd->lx;
113   DMBoundaryType   bx    = dd->bx;
114   MPI_Comm         comm;
115   Vec              local, global;
116   VecScatter       gtol;
117   IS               to, from;
118   PetscBool        flg1 = PETSC_FALSE, flg2 = PETSC_FALSE;
119   PetscMPIInt      rank, size;
120   PetscInt         i,*idx,nn,left,xs,xe,x,Xs,Xe,start,m,IXs,IXe;
121   PetscErrorCode   ierr;
122 
123   PetscFunctionBegin;
124   ierr = PetscObjectGetComm((PetscObject) da, &comm);CHKERRQ(ierr);
125   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
126   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
127 
128   dd->p = 1;
129   dd->n = 1;
130   dd->m = size;
131   m     = dd->m;
132 
133   if (s > 0) {
134     /* if not communicating data then should be ok to have nothing on some processes */
135     if (M < m) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_ARG_OUTOFRANGE,"More processes than data points! %D %D",m,M);
136     if ((M-1) < s && size > 1) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_ARG_OUTOFRANGE,"Array is too small for stencil! %D %D",M-1,s);
137   }
138 
139   /*
140      Determine locally owned region
141      xs is the first local node number, x is the number of local nodes
142   */
143   if (!lx) {
144     ierr = PetscMalloc1(m, &dd->lx);CHKERRQ(ierr);
145     ierr = PetscOptionsGetBool(((PetscObject)da)->options,((PetscObject)da)->prefix,"-da_partition_blockcomm",&flg1,NULL);CHKERRQ(ierr);
146     ierr = PetscOptionsGetBool(((PetscObject)da)->options,((PetscObject)da)->prefix,"-da_partition_nodes_at_end",&flg2,NULL);CHKERRQ(ierr);
147     if (flg1) {      /* Block Comm type Distribution */
148       xs = rank*M/m;
149       x  = (rank + 1)*M/m - xs;
150     } else if (flg2) { /* The odd nodes are evenly distributed across last nodes */
151       x = (M + rank)/m;
152       if (M/m == x) xs = rank*x;
153       else          xs = rank*(x-1) + (M+rank)%(x*m);
154     } else { /* The odd nodes are evenly distributed across the first k nodes */
155       /* Regular PETSc Distribution */
156       x = M/m + ((M % m) > rank);
157       if (rank >= (M % m)) xs = (rank * (PetscInt)(M/m) + M % m);
158       else                 xs = rank * (PetscInt)(M/m) + rank;
159     }
160     ierr = MPI_Allgather(&xs,1,MPIU_INT,dd->lx,1,MPIU_INT,comm);CHKERRQ(ierr);
161     for (i=0; i<m-1; i++) dd->lx[i] = dd->lx[i+1] - dd->lx[i];
162     dd->lx[m-1] = M - dd->lx[m-1];
163   } else {
164     x  = lx[rank];
165     xs = 0;
166     for (i=0; i<rank; i++) xs += lx[i];
167     /* verify that data user provided is consistent */
168     left = xs;
169     for (i=rank; i<size; i++) left += lx[i];
170     if (left != M) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_ARG_OUTOFRANGE,"Sum of lx across processors not equal to M %D %D",left,M);
171   }
172 
173   /*
174    check if the scatter requires more than one process neighbor or wraps around
175    the domain more than once
176   */
177   if ((x < s) & ((M > 1) | (bx == DM_BOUNDARY_PERIODIC))) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_ARG_OUTOFRANGE,"Local x-width of domain x %D is smaller than stencil width s %D",x,s);
178 
179   xe  = xs + x;
180 
181   /* determine ghost region (Xs) and region scattered into (IXs)  */
182   if (xs-sDist > 0) {
183     Xs  = xs - sDist;
184     IXs = xs - sDist;
185   } else {
186     if (bx) Xs = xs - sDist;
187     else Xs = 0;
188     IXs = 0;
189   }
190   if (xe+sDist <= M) {
191     Xe  = xe + sDist;
192     IXe = xe + sDist;
193   } else {
194     if (bx) Xe = xe + sDist;
195     else Xe = M;
196     IXe = M;
197   }
198 
199   if (bx == DM_BOUNDARY_PERIODIC || bx == DM_BOUNDARY_MIRROR) {
200     Xs  = xs - sDist;
201     Xe  = xe + sDist;
202     IXs = xs - sDist;
203     IXe = xe + sDist;
204   }
205 
206   /* allocate the base parallel and sequential vectors */
207   dd->Nlocal = dof*x;
208   ierr       = VecCreateMPIWithArray(comm,dof,dd->Nlocal,PETSC_DECIDE,NULL,&global);CHKERRQ(ierr);
209   dd->nlocal = dof*(Xe-Xs);
210   ierr       = VecCreateSeqWithArray(PETSC_COMM_SELF,dof,dd->nlocal,NULL,&local);CHKERRQ(ierr);
211 
212   ierr = VecGetOwnershipRange(global,&start,NULL);CHKERRQ(ierr);
213 
214   /* Create Global to Local Vector Scatter Context */
215   /* global to local must retrieve ghost points */
216   ierr = ISCreateStride(comm,dof*(IXe-IXs),dof*(IXs-Xs),1,&to);CHKERRQ(ierr);
217 
218   ierr = PetscMalloc1(x+2*sDist,&idx);CHKERRQ(ierr);
219   ierr = PetscLogObjectMemory((PetscObject)da,(x+2*(sDist))*sizeof(PetscInt));CHKERRQ(ierr);
220 
221   for (i=0; i<IXs-Xs; i++) idx[i] = -1; /* prepend with -1s if needed for ghosted case*/
222 
223   nn = IXs-Xs;
224   if (bx == DM_BOUNDARY_PERIODIC) { /* Handle all cases with periodic first */
225     for (i=0; i<sDist; i++) {  /* Left ghost points */
226       if ((xs-sDist+i)>=0) idx[nn++] = xs-sDist+i;
227       else                 idx[nn++] = M+(xs-sDist+i);
228     }
229 
230     for (i=0; i<x; i++) idx [nn++] = xs + i;  /* Non-ghost points */
231 
232     for (i=0; i<sDist; i++) { /* Right ghost points */
233       if ((xe+i)<M) idx [nn++] =  xe+i;
234       else          idx [nn++] = (xe+i) - M;
235     }
236   } else if (bx == DM_BOUNDARY_MIRROR) { /* Handle all cases with periodic first */
237     for (i=0; i<(sDist); i++) {  /* Left ghost points */
238       if ((xs-sDist+i)>=0) idx[nn++] = xs-sDist+i;
239       else                 idx[nn++] = sDist - i;
240     }
241 
242     for (i=0; i<x; i++) idx [nn++] = xs + i;  /* Non-ghost points */
243 
244     for (i=0; i<(sDist); i++) { /* Right ghost points */
245       if ((xe+i)<M) idx[nn++] =  xe+i;
246       else          idx[nn++] = M - (i + 2);
247     }
248   } else {      /* Now do all cases with no periodicity */
249     if (0 <= xs-sDist) {
250       for (i=0; i<sDist; i++) idx[nn++] = xs - sDist + i;
251     } else {
252       for (i=0; i<xs; i++) idx[nn++] = i;
253     }
254 
255     for (i=0; i<x; i++) idx [nn++] = xs + i;
256 
257     if ((xe+sDist)<=M) {
258       for (i=0; i<sDist; i++) idx[nn++]=xe+i;
259     } else {
260       for (i=xe; i<M; i++) idx[nn++]=i;
261     }
262   }
263 
264   ierr = ISCreateBlock(comm,dof,nn-IXs+Xs,&idx[IXs-Xs],PETSC_USE_POINTER,&from);CHKERRQ(ierr);
265   ierr = VecScatterCreate(global,from,local,to,&gtol);CHKERRQ(ierr);
266   ierr = PetscLogObjectParent((PetscObject)da,(PetscObject)gtol);CHKERRQ(ierr);
267   ierr = ISDestroy(&to);CHKERRQ(ierr);
268   ierr = ISDestroy(&from);CHKERRQ(ierr);
269   ierr = VecDestroy(&local);CHKERRQ(ierr);
270   ierr = VecDestroy(&global);CHKERRQ(ierr);
271 
272   dd->xs = dof*xs; dd->xe = dof*xe; dd->ys = 0; dd->ye = 1; dd->zs = 0; dd->ze = 1;
273   dd->Xs = dof*Xs; dd->Xe = dof*Xe; dd->Ys = 0; dd->Ye = 1; dd->Zs = 0; dd->Ze = 1;
274 
275   dd->gtol      = gtol;
276   dd->base      = dof*xs;
277   da->ops->view = DMView_DA_1d;
278 
279   /*
280      Set the local to global ordering in the global vector, this allows use
281      of VecSetValuesLocal().
282   */
283   for (i=0; i<Xe-IXe; i++) idx[nn++] = -1; /* pad with -1s if needed for ghosted case*/
284 
285   ierr = ISLocalToGlobalMappingCreate(comm,dof,nn,idx,PETSC_OWN_POINTER,&da->ltogmap);CHKERRQ(ierr);
286   ierr = PetscLogObjectParent((PetscObject)da,(PetscObject)da->ltogmap);CHKERRQ(ierr);
287 
288   PetscFunctionReturn(0);
289 }
290 
291 
292 /*@C
293    DMDACreate1d - Creates an object that will manage the communication of  one-dimensional
294    regular array data that is distributed across some processors.
295 
296    Collective on MPI_Comm
297 
298    Input Parameters:
299 +  comm - MPI communicator
300 .  bx - type of ghost cells at the boundary the array should have, if any. Use
301           DM_BOUNDARY_NONE, DM_BOUNDARY_GHOSTED, or DM_BOUNDARY_PERIODIC.
.  M - global dimension of the array (that is, the number of grid points;
            can also be set from the command line with -da_grid_x <M>)
304 .  dof - number of degrees of freedom per node
305 .  s - stencil width
-  lx - array containing number of nodes in the X direction on each processor,
        or NULL. If non-null, must be of the same length as the number of processes in the MPI_Comm.
308 
309    Output Parameter:
310 .  da - the resulting distributed array object
311 
   Options Database Keys:
313 +  -dm_view - Calls DMView() at the conclusion of DMDACreate1d()
314 .  -da_grid_x <nx> - number of grid points in x direction; can set if M < 0
315 .  -da_refine_x <rx> - refinement factor
316 -  -da_refine <n> - refine the DMDA n times before creating it, if M < 0
317 
318    Level: beginner
319 
320    Notes:
321    The array data itself is NOT stored in the DMDA, it is stored in Vec objects;
322    The appropriate vector objects can be obtained with calls to DMCreateGlobalVector()
323    and DMCreateLocalVector() and calls to VecDuplicate() if more are needed.
324 
325    You must call DMSetUp() after this call before using this DM.
326 
327    If you wish to use the options database to change values in the DMDA call DMSetFromOptions() after this call
328    but before DMSetUp().
329 
330 .keywords: distributed array, create, one-dimensional
331 
332 .seealso: DMDestroy(), DMView(), DMDACreate2d(), DMDACreate3d(), DMGlobalToLocalBegin(), DMDASetRefinementFactor(),
333           DMGlobalToLocalEnd(), DMLocalToGlobalBegin(), DMLocalToLocalBegin(), DMLocalToLocalEnd(), DMDAGetRefinementFactor(),
334           DMDAGetInfo(), DMCreateGlobalVector(), DMCreateLocalVector(), DMDACreateNaturalVector(), DMLoad(), DMDAGetOwnershipRanges()
335 
336 @*/
337 PetscErrorCode  DMDACreate1d(MPI_Comm comm, DMBoundaryType bx, PetscInt M, PetscInt dof, PetscInt s, const PetscInt lx[], DM *da)
338 {
339   PetscErrorCode ierr;
340   PetscMPIInt    size;
341 
342   PetscFunctionBegin;
343   ierr = DMDACreate(comm, da);CHKERRQ(ierr);
344   ierr = DMSetDimension(*da, 1);CHKERRQ(ierr);
345   ierr = DMDASetSizes(*da, M, 1, 1);CHKERRQ(ierr);
346   ierr = MPI_Comm_size(comm, &size);CHKERRQ(ierr);
347   ierr = DMDASetNumProcs(*da, size, PETSC_DECIDE, PETSC_DECIDE);CHKERRQ(ierr);
348   ierr = DMDASetBoundaryType(*da, bx, DM_BOUNDARY_NONE, DM_BOUNDARY_NONE);CHKERRQ(ierr);
349   ierr = DMDASetDof(*da, dof);CHKERRQ(ierr);
350   ierr = DMDASetStencilWidth(*da, s);CHKERRQ(ierr);
351   ierr = DMDASetOwnershipRanges(*da, lx, NULL, NULL);CHKERRQ(ierr);
352   PetscFunctionReturn(0);
353 }
354