xref: /petsc/src/dm/impls/da/da1.c (revision 7453f77509fc006cbcc7de2dd772f60dc49feac5)
1 
2 /*
3    Code for manipulating distributed regular 1d arrays in parallel.
4    This file was created by Peter Mell   6/30/95
5 */
6 
7 #include <petsc/private/dmdaimpl.h>     /*I  "petscdmda.h"   I*/
8 
9 #include <petscdraw.h>
10 #undef __FUNCT__
11 #define __FUNCT__ "DMView_DA_1d"
12 static PetscErrorCode DMView_DA_1d(DM da,PetscViewer viewer)
13 {
14   PetscErrorCode ierr;
15   PetscMPIInt    rank;
16   PetscBool      iascii,isdraw,isbinary;
17   DM_DA          *dd = (DM_DA*)da->data;
18 #if defined(PETSC_HAVE_MATLAB_ENGINE)
19   PetscBool ismatlab;
20 #endif
21 
22   PetscFunctionBegin;
23   ierr = MPI_Comm_rank(PetscObjectComm((PetscObject)da),&rank);CHKERRQ(ierr);
24 
25   ierr = PetscObjectTypeCompare((PetscObject)viewer,PETSCVIEWERASCII,&iascii);CHKERRQ(ierr);
26   ierr = PetscObjectTypeCompare((PetscObject)viewer,PETSCVIEWERDRAW,&isdraw);CHKERRQ(ierr);
27   ierr = PetscObjectTypeCompare((PetscObject)viewer,PETSCVIEWERBINARY,&isbinary);CHKERRQ(ierr);
28 #if defined(PETSC_HAVE_MATLAB_ENGINE)
29   ierr = PetscObjectTypeCompare((PetscObject)viewer,PETSCVIEWERMATLAB,&ismatlab);CHKERRQ(ierr);
30 #endif
31   if (iascii) {
32     PetscViewerFormat format;
33 
34     ierr = PetscViewerGetFormat(viewer, &format);CHKERRQ(ierr);
35     if (format != PETSC_VIEWER_ASCII_VTK && format != PETSC_VIEWER_ASCII_VTK_CELL) {
36       DMDALocalInfo info;
37       ierr = DMDAGetLocalInfo(da,&info);CHKERRQ(ierr);
38       ierr = PetscViewerASCIIPushSynchronized(viewer);CHKERRQ(ierr);
39       ierr = PetscViewerASCIISynchronizedPrintf(viewer,"Processor [%d] M %D m %D w %D s %D\n",rank,dd->M,dd->m,dd->w,dd->s);CHKERRQ(ierr);
40       ierr = PetscViewerASCIISynchronizedPrintf(viewer,"X range of indices: %D %D\n",info.xs,info.xs+info.xm);CHKERRQ(ierr);
41       ierr = PetscViewerFlush(viewer);CHKERRQ(ierr);
42       ierr = PetscViewerASCIIPopSynchronized(viewer);CHKERRQ(ierr);
43     } else {
44       ierr = DMView_DA_VTK(da, viewer);CHKERRQ(ierr);
45     }
46   } else if (isdraw) {
47     PetscDraw draw;
48     double    ymin = -1,ymax = 1,xmin = -1,xmax = dd->M,x;
49     PetscInt  base;
50     char      node[10];
51     PetscBool isnull;
52 
53     ierr = PetscViewerDrawGetDraw(viewer,0,&draw);CHKERRQ(ierr);
54     ierr = PetscDrawIsNull(draw,&isnull);CHKERRQ(ierr);
55     if (isnull) PetscFunctionReturn(0);
56 
57     ierr = PetscDrawCheckResizedWindow(draw);CHKERRQ(ierr);
58     ierr = PetscDrawClear(draw);CHKERRQ(ierr);
59     ierr = PetscDrawSetCoordinates(draw,xmin,ymin,xmax,ymax);CHKERRQ(ierr);
60 
61     ierr = PetscDrawCollectiveBegin(draw);CHKERRQ(ierr);
62     /* first processor draws all node lines */
63     if (!rank) {
64       PetscInt xmin_tmp;
65       ymin = 0.0; ymax = 0.3;
66       for (xmin_tmp=0; xmin_tmp < dd->M; xmin_tmp++) {
67         ierr = PetscDrawLine(draw,(double)xmin_tmp,ymin,(double)xmin_tmp,ymax,PETSC_DRAW_BLACK);CHKERRQ(ierr);
68       }
69       xmin = 0.0; xmax = dd->M - 1;
70       ierr = PetscDrawLine(draw,xmin,ymin,xmax,ymin,PETSC_DRAW_BLACK);CHKERRQ(ierr);
71       ierr = PetscDrawLine(draw,xmin,ymax,xmax,ymax,PETSC_DRAW_BLACK);CHKERRQ(ierr);
72     }
73     ierr = PetscDrawCollectiveEnd(draw);CHKERRQ(ierr);
74     ierr = PetscDrawFlush(draw);CHKERRQ(ierr);
75     ierr = PetscDrawPause(draw);CHKERRQ(ierr);
76 
77     ierr = PetscDrawCollectiveBegin(draw);CHKERRQ(ierr);
78     /* draw my box */
79     ymin = 0; ymax = 0.3; xmin = dd->xs / dd->w; xmax = (dd->xe / dd->w)  - 1;
80     ierr = PetscDrawLine(draw,xmin,ymin,xmax,ymin,PETSC_DRAW_RED);CHKERRQ(ierr);
81     ierr = PetscDrawLine(draw,xmin,ymin,xmin,ymax,PETSC_DRAW_RED);CHKERRQ(ierr);
82     ierr = PetscDrawLine(draw,xmin,ymax,xmax,ymax,PETSC_DRAW_RED);CHKERRQ(ierr);
83     ierr = PetscDrawLine(draw,xmax,ymin,xmax,ymax,PETSC_DRAW_RED);CHKERRQ(ierr);
84     /* Put in index numbers */
85     base = dd->base / dd->w;
86     for (x=xmin; x<=xmax; x++) {
87       ierr = PetscSNPrintf(node,sizeof(node),"%d",(int)base++);CHKERRQ(ierr);
88       ierr = PetscDrawString(draw,x,ymin,PETSC_DRAW_RED,node);CHKERRQ(ierr);
89     }
90     ierr = PetscDrawCollectiveEnd(draw);CHKERRQ(ierr);
91     ierr = PetscDrawFlush(draw);CHKERRQ(ierr);
92     ierr = PetscDrawPause(draw);CHKERRQ(ierr);
93     ierr = PetscDrawSave(draw);CHKERRQ(ierr);
94   } else if (isbinary) {
95     ierr = DMView_DA_Binary(da,viewer);CHKERRQ(ierr);
96 #if defined(PETSC_HAVE_MATLAB_ENGINE)
97   } else if (ismatlab) {
98     ierr = DMView_DA_Matlab(da,viewer);CHKERRQ(ierr);
99 #endif
100   }
101   PetscFunctionReturn(0);
102 }
103 
104 
105 #undef __FUNCT__
106 #define __FUNCT__ "DMSetUp_DA_1D"
107 PetscErrorCode  DMSetUp_DA_1D(DM da)
108 {
109   DM_DA            *dd   = (DM_DA*)da->data;
110   const PetscInt   M     = dd->M;
111   const PetscInt   dof   = dd->w;
112   const PetscInt   s     = dd->s;
113   const PetscInt   sDist = s;  /* stencil distance in points */
114   const PetscInt   *lx   = dd->lx;
115   DMBoundaryType   bx    = dd->bx;
116   MPI_Comm         comm;
117   Vec              local, global;
118   VecScatter       gtol;
119   IS               to, from;
120   PetscBool        flg1 = PETSC_FALSE, flg2 = PETSC_FALSE;
121   PetscMPIInt      rank, size;
122   PetscInt         i,*idx,nn,left,xs,xe,x,Xs,Xe,start,m,IXs,IXe;
123   PetscErrorCode   ierr;
124 
125   PetscFunctionBegin;
126   ierr = PetscObjectGetComm((PetscObject) da, &comm);CHKERRQ(ierr);
127   ierr = MPI_Comm_size(comm,&size);CHKERRQ(ierr);
128   ierr = MPI_Comm_rank(comm,&rank);CHKERRQ(ierr);
129 
130   dd->p = 1;
131   dd->n = 1;
132   dd->m = size;
133   m     = dd->m;
134 
135   if (s > 0) {
136     /* if not communicating data then should be ok to have nothing on some processes */
137     if (M < m) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_ARG_OUTOFRANGE,"More processes than data points! %D %D",m,M);
138     if ((M-1) < s && size > 1) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_ARG_OUTOFRANGE,"Array is too small for stencil! %D %D",M-1,s);
139   }
140 
141   /*
142      Determine locally owned region
143      xs is the first local node number, x is the number of local nodes
144   */
145   if (!lx) {
146     ierr = PetscMalloc1(m, &dd->lx);CHKERRQ(ierr);
147     ierr = PetscOptionsGetBool(((PetscObject)da)->options,((PetscObject)da)->prefix,"-da_partition_blockcomm",&flg1,NULL);CHKERRQ(ierr);
148     ierr = PetscOptionsGetBool(((PetscObject)da)->options,((PetscObject)da)->prefix,"-da_partition_nodes_at_end",&flg2,NULL);CHKERRQ(ierr);
149     if (flg1) {      /* Block Comm type Distribution */
150       xs = rank*M/m;
151       x  = (rank + 1)*M/m - xs;
152     } else if (flg2) { /* The odd nodes are evenly distributed across last nodes */
153       x = (M + rank)/m;
154       if (M/m == x) xs = rank*x;
155       else          xs = rank*(x-1) + (M+rank)%(x*m);
156     } else { /* The odd nodes are evenly distributed across the first k nodes */
157       /* Regular PETSc Distribution */
158       x = M/m + ((M % m) > rank);
159       if (rank >= (M % m)) xs = (rank * (PetscInt)(M/m) + M % m);
160       else                 xs = rank * (PetscInt)(M/m) + rank;
161     }
162     ierr = MPI_Allgather(&xs,1,MPIU_INT,dd->lx,1,MPIU_INT,comm);CHKERRQ(ierr);
163     for (i=0; i<m-1; i++) dd->lx[i] = dd->lx[i+1] - dd->lx[i];
164     dd->lx[m-1] = M - dd->lx[m-1];
165   } else {
166     x  = lx[rank];
167     xs = 0;
168     for (i=0; i<rank; i++) xs += lx[i];
169     /* verify that data user provided is consistent */
170     left = xs;
171     for (i=rank; i<size; i++) left += lx[i];
172     if (left != M) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_ARG_OUTOFRANGE,"Sum of lx across processors not equal to M %D %D",left,M);
173   }
174 
175   /*
176    check if the scatter requires more than one process neighbor or wraps around
177    the domain more than once
178   */
179   if ((x < s) & ((M > 1) | (bx == DM_BOUNDARY_PERIODIC))) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_ARG_OUTOFRANGE,"Local x-width of domain x %D is smaller than stencil width s %D",x,s);
180 
181   xe  = xs + x;
182 
183   /* determine ghost region (Xs) and region scattered into (IXs)  */
184   if (xs-sDist > 0) {
185     Xs  = xs - sDist;
186     IXs = xs - sDist;
187   } else {
188     if (bx) Xs = xs - sDist;
189     else Xs = 0;
190     IXs = 0;
191   }
192   if (xe+sDist <= M) {
193     Xe  = xe + sDist;
194     IXe = xe + sDist;
195   } else {
196     if (bx) Xe = xe + sDist;
197     else Xe = M;
198     IXe = M;
199   }
200 
201   if (bx == DM_BOUNDARY_PERIODIC || bx == DM_BOUNDARY_MIRROR) {
202     Xs  = xs - sDist;
203     Xe  = xe + sDist;
204     IXs = xs - sDist;
205     IXe = xe + sDist;
206   }
207 
208   /* allocate the base parallel and sequential vectors */
209   dd->Nlocal = dof*x;
210   ierr       = VecCreateMPIWithArray(comm,dof,dd->Nlocal,PETSC_DECIDE,NULL,&global);CHKERRQ(ierr);
211   dd->nlocal = dof*(Xe-Xs);
212   ierr       = VecCreateSeqWithArray(PETSC_COMM_SELF,dof,dd->nlocal,NULL,&local);CHKERRQ(ierr);
213 
214   ierr = VecGetOwnershipRange(global,&start,NULL);CHKERRQ(ierr);
215 
216   /* Create Global to Local Vector Scatter Context */
217   /* global to local must retrieve ghost points */
218   ierr = ISCreateStride(comm,dof*(IXe-IXs),dof*(IXs-Xs),1,&to);CHKERRQ(ierr);
219 
220   ierr = PetscMalloc1(x+2*sDist,&idx);CHKERRQ(ierr);
221   ierr = PetscLogObjectMemory((PetscObject)da,(x+2*(sDist))*sizeof(PetscInt));CHKERRQ(ierr);
222 
223   for (i=0; i<IXs-Xs; i++) idx[i] = -1; /* prepend with -1s if needed for ghosted case*/
224 
225   nn = IXs-Xs;
226   if (bx == DM_BOUNDARY_PERIODIC) { /* Handle all cases with periodic first */
227     for (i=0; i<sDist; i++) {  /* Left ghost points */
228       if ((xs-sDist+i)>=0) idx[nn++] = xs-sDist+i;
229       else                 idx[nn++] = M+(xs-sDist+i);
230     }
231 
232     for (i=0; i<x; i++) idx [nn++] = xs + i;  /* Non-ghost points */
233 
234     for (i=0; i<sDist; i++) { /* Right ghost points */
235       if ((xe+i)<M) idx [nn++] =  xe+i;
236       else          idx [nn++] = (xe+i) - M;
237     }
238   } else if (bx == DM_BOUNDARY_MIRROR) { /* Handle all cases with periodic first */
239     for (i=0; i<(sDist); i++) {  /* Left ghost points */
240       if ((xs-sDist+i)>=0) idx[nn++] = xs-sDist+i;
241       else                 idx[nn++] = sDist - i;
242     }
243 
244     for (i=0; i<x; i++) idx [nn++] = xs + i;  /* Non-ghost points */
245 
246     for (i=0; i<(sDist); i++) { /* Right ghost points */
247       if ((xe+i)<M) idx[nn++] =  xe+i;
248       else          idx[nn++] = M - (i + 1);
249     }
250   } else {      /* Now do all cases with no periodicity */
251     if (0 <= xs-sDist) {
252       for (i=0; i<sDist; i++) idx[nn++] = xs - sDist + i;
253     } else {
254       for (i=0; i<xs; i++) idx[nn++] = i;
255     }
256 
257     for (i=0; i<x; i++) idx [nn++] = xs + i;
258 
259     if ((xe+sDist)<=M) {
260       for (i=0; i<sDist; i++) idx[nn++]=xe+i;
261     } else {
262       for (i=xe; i<M; i++) idx[nn++]=i;
263     }
264   }
265 
266   ierr = ISCreateBlock(comm,dof,nn-IXs+Xs,&idx[IXs-Xs],PETSC_USE_POINTER,&from);CHKERRQ(ierr);
267   ierr = VecScatterCreate(global,from,local,to,&gtol);CHKERRQ(ierr);
268   ierr = PetscLogObjectParent((PetscObject)da,(PetscObject)gtol);CHKERRQ(ierr);
269   ierr = ISDestroy(&to);CHKERRQ(ierr);
270   ierr = ISDestroy(&from);CHKERRQ(ierr);
271   ierr = VecDestroy(&local);CHKERRQ(ierr);
272   ierr = VecDestroy(&global);CHKERRQ(ierr);
273 
274   dd->xs = dof*xs; dd->xe = dof*xe; dd->ys = 0; dd->ye = 1; dd->zs = 0; dd->ze = 1;
275   dd->Xs = dof*Xs; dd->Xe = dof*Xe; dd->Ys = 0; dd->Ye = 1; dd->Zs = 0; dd->Ze = 1;
276 
277   dd->gtol      = gtol;
278   dd->base      = dof*xs;
279   da->ops->view = DMView_DA_1d;
280 
281   /*
282      Set the local to global ordering in the global vector, this allows use
283      of VecSetValuesLocal().
284   */
285   for (i=0; i<Xe-IXe; i++) idx[nn++] = -1; /* pad with -1s if needed for ghosted case*/
286 
287   ierr = ISLocalToGlobalMappingCreate(comm,dof,nn,idx,PETSC_OWN_POINTER,&da->ltogmap);CHKERRQ(ierr);
288   ierr = PetscLogObjectParent((PetscObject)da,(PetscObject)da->ltogmap);CHKERRQ(ierr);
289 
290   PetscFunctionReturn(0);
291 }
292 
293 
294 #undef __FUNCT__
295 #define __FUNCT__ "DMDACreate1d"
296 /*@C
297    DMDACreate1d - Creates an object that will manage the communication of  one-dimensional
298    regular array data that is distributed across some processors.
299 
300    Collective on MPI_Comm
301 
302    Input Parameters:
303 +  comm - MPI communicator
304 .  bx - type of ghost cells at the boundary the array should have, if any. Use
305           DM_BOUNDARY_NONE, DM_BOUNDARY_GHOSTED, or DM_BOUNDARY_PERIODIC.
306 .  M - global dimension of the array
307             from the command line with -da_grid_x <M>)
308 .  dof - number of degrees of freedom per node
309 .  s - stencil width
310 -  lx - array containing number of nodes in the X direction on each processor,
311         or NULL. If non-null, must be of length as the number of processes in the MPI_Comm.
312 
313    Output Parameter:
314 .  da - the resulting distributed array object
315 
316    Options Database Key:
317 +  -dm_view - Calls DMView() at the conclusion of DMDACreate1d()
318 .  -da_grid_x <nx> - number of grid points in x direction; can set if M < 0
319 .  -da_refine_x <rx> - refinement factor
320 -  -da_refine <n> - refine the DMDA n times before creating it, if M < 0
321 
322    Level: beginner
323 
324    Notes:
325    The array data itself is NOT stored in the DMDA, it is stored in Vec objects;
326    The appropriate vector objects can be obtained with calls to DMCreateGlobalVector()
327    and DMCreateLocalVector() and calls to VecDuplicate() if more are needed.
328 
329    You must call DMSetUp() after this call before using this DM.
330 
331    If you wish to use the options database to change values in the DMDA call DMSetFromOptions() after this call
332    but before DMSetUp().
333 
334 .keywords: distributed array, create, one-dimensional
335 
336 .seealso: DMDestroy(), DMView(), DMDACreate2d(), DMDACreate3d(), DMGlobalToLocalBegin(), DMDASetRefinementFactor(),
337           DMGlobalToLocalEnd(), DMLocalToGlobalBegin(), DMLocalToLocalBegin(), DMLocalToLocalEnd(), DMDAGetRefinementFactor(),
338           DMDAGetInfo(), DMCreateGlobalVector(), DMCreateLocalVector(), DMDACreateNaturalVector(), DMLoad(), DMDAGetOwnershipRanges()
339 
340 @*/
341 PetscErrorCode  DMDACreate1d(MPI_Comm comm, DMBoundaryType bx, PetscInt M, PetscInt dof, PetscInt s, const PetscInt lx[], DM *da)
342 {
343   PetscErrorCode ierr;
344   PetscMPIInt    size;
345 
346   PetscFunctionBegin;
347   ierr = DMDACreate(comm, da);CHKERRQ(ierr);
348   ierr = DMSetDimension(*da, 1);CHKERRQ(ierr);
349   ierr = DMDASetSizes(*da, M, 1, 1);CHKERRQ(ierr);
350   ierr = MPI_Comm_size(comm, &size);CHKERRQ(ierr);
351   ierr = DMDASetNumProcs(*da, size, PETSC_DECIDE, PETSC_DECIDE);CHKERRQ(ierr);
352   ierr = DMDASetBoundaryType(*da, bx, DM_BOUNDARY_NONE, DM_BOUNDARY_NONE);CHKERRQ(ierr);
353   ierr = DMDASetDof(*da, dof);CHKERRQ(ierr);
354   ierr = DMDASetStencilWidth(*da, s);CHKERRQ(ierr);
355   ierr = DMDASetOwnershipRanges(*da, lx, NULL, NULL);CHKERRQ(ierr);
356   PetscFunctionReturn(0);
357 }
358