xref: /petsc/src/ts/characteristic/interface/characteristic.c (revision 030f984af8d8bb4c203755d35bded3c05b3d83ce)
1 
2 #include <petsc/private/characteristicimpl.h> /*I "petsccharacteristic.h" I*/
3 #include <petscdmda.h>
4 #include <petscviewer.h>
5 
6 PetscClassId  CHARACTERISTIC_CLASSID;
7 PetscLogEvent CHARACTERISTIC_SetUp, CHARACTERISTIC_Solve, CHARACTERISTIC_QueueSetup, CHARACTERISTIC_DAUpdate;
8 PetscLogEvent CHARACTERISTIC_HalfTimeLocal, CHARACTERISTIC_HalfTimeRemote, CHARACTERISTIC_HalfTimeExchange;
9 PetscLogEvent CHARACTERISTIC_FullTimeLocal, CHARACTERISTIC_FullTimeRemote, CHARACTERISTIC_FullTimeExchange;
10 /*
11    Contains the list of registered characteristic routines
12 */
13 PetscFunctionList CharacteristicList              = NULL;
14 PetscBool         CharacteristicRegisterAllCalled = PETSC_FALSE;
15 
16 PetscErrorCode DMDAGetNeighborsRank(DM, PetscMPIInt []);
17 PetscInt       DMDAGetNeighborRelative(DM, PetscReal, PetscReal);
18 PetscErrorCode DMDAMapToPeriodicDomain(DM, PetscScalar []);
19 
20 PetscErrorCode CharacteristicHeapSort(Characteristic, Queue, PetscInt);
21 PetscErrorCode CharacteristicSiftDown(Characteristic, Queue, PetscInt, PetscInt);
22 
23 PetscErrorCode CharacteristicView(Characteristic c, PetscViewer viewer)
24 {
25   PetscBool      iascii;
26   PetscErrorCode ierr;
27 
28   PetscFunctionBegin;
29   PetscValidHeaderSpecific(c, CHARACTERISTIC_CLASSID, 1);
30   if (!viewer) {
31     ierr = PetscViewerASCIIGetStdout(PetscObjectComm((PetscObject)c),&viewer);CHKERRQ(ierr);
32   }
33   PetscValidHeaderSpecific(viewer, PETSC_VIEWER_CLASSID, 2);
34   PetscCheckSameComm(c, 1, viewer, 2);
35 
36   ierr = PetscObjectTypeCompare((PetscObject) viewer, PETSCVIEWERASCII, &iascii);CHKERRQ(ierr);
37   if (!iascii) {
38     if (c->ops->view) {
39       ierr = (*c->ops->view)(c, viewer);CHKERRQ(ierr);
40     }
41   }
42   PetscFunctionReturn(0);
43 }
44 
45 PetscErrorCode CharacteristicDestroy(Characteristic *c)
46 {
47   PetscErrorCode ierr;
48 
49   PetscFunctionBegin;
50   if (!*c) PetscFunctionReturn(0);
51   PetscValidHeaderSpecific(*c, CHARACTERISTIC_CLASSID, 1);
52   if (--((PetscObject)(*c))->refct > 0) PetscFunctionReturn(0);
53 
54   if ((*c)->ops->destroy) {
55     ierr = (*(*c)->ops->destroy)((*c));CHKERRQ(ierr);
56   }
57   ierr = MPI_Type_free(&(*c)->itemType);CHKERRMPI(ierr);
58   ierr = PetscFree((*c)->queue);CHKERRQ(ierr);
59   ierr = PetscFree((*c)->queueLocal);CHKERRQ(ierr);
60   ierr = PetscFree((*c)->queueRemote);CHKERRQ(ierr);
61   ierr = PetscFree((*c)->neighbors);CHKERRQ(ierr);
62   ierr = PetscFree((*c)->needCount);CHKERRQ(ierr);
63   ierr = PetscFree((*c)->localOffsets);CHKERRQ(ierr);
64   ierr = PetscFree((*c)->fillCount);CHKERRQ(ierr);
65   ierr = PetscFree((*c)->remoteOffsets);CHKERRQ(ierr);
66   ierr = PetscFree((*c)->request);CHKERRQ(ierr);
67   ierr = PetscFree((*c)->status);CHKERRQ(ierr);
68   ierr = PetscHeaderDestroy(c);CHKERRQ(ierr);
69   PetscFunctionReturn(0);
70 }
71 
72 PetscErrorCode CharacteristicCreate(MPI_Comm comm, Characteristic *c)
73 {
74   Characteristic newC;
75   PetscErrorCode ierr;
76 
77   PetscFunctionBegin;
78   PetscValidPointer(c, 2);
79   *c = NULL;
80   ierr = CharacteristicInitializePackage();CHKERRQ(ierr);
81 
82   ierr = PetscHeaderCreate(newC, CHARACTERISTIC_CLASSID, "Characteristic", "Characteristic", "Characteristic", comm, CharacteristicDestroy, CharacteristicView);CHKERRQ(ierr);
83   *c   = newC;
84 
85   newC->structured          = PETSC_TRUE;
86   newC->numIds              = 0;
87   newC->velocityDA          = NULL;
88   newC->velocity            = NULL;
89   newC->velocityOld         = NULL;
90   newC->numVelocityComp     = 0;
91   newC->velocityComp        = NULL;
92   newC->velocityInterp      = NULL;
93   newC->velocityInterpLocal = NULL;
94   newC->velocityCtx         = NULL;
95   newC->fieldDA             = NULL;
96   newC->field               = NULL;
97   newC->numFieldComp        = 0;
98   newC->fieldComp           = NULL;
99   newC->fieldInterp         = NULL;
100   newC->fieldInterpLocal    = NULL;
101   newC->fieldCtx            = NULL;
102   newC->itemType            = 0;
103   newC->queue               = NULL;
104   newC->queueSize           = 0;
105   newC->queueMax            = 0;
106   newC->queueLocal          = NULL;
107   newC->queueLocalSize      = 0;
108   newC->queueLocalMax       = 0;
109   newC->queueRemote         = NULL;
110   newC->queueRemoteSize     = 0;
111   newC->queueRemoteMax      = 0;
112   newC->numNeighbors        = 0;
113   newC->neighbors           = NULL;
114   newC->needCount           = NULL;
115   newC->localOffsets        = NULL;
116   newC->fillCount           = NULL;
117   newC->remoteOffsets       = NULL;
118   newC->request             = NULL;
119   newC->status              = NULL;
120   PetscFunctionReturn(0);
121 }
122 
123 /*@C
124    CharacteristicSetType - Builds Characteristic for a particular solver.
125 
126    Logically Collective on Characteristic
127 
128    Input Parameters:
129 +  c    - the method of characteristics context
130 -  type - a known method
131 
132    Options Database Key:
133 .  -characteristic_type <method> - Sets the method; use -help for a list
134     of available methods
135 
136    Notes:
137    See "include/petsccharacteristic.h" for available methods
138 
139   Normally, it is best to use the CharacteristicSetFromOptions() command and
140   then set the Characteristic type from the options database rather than by using
141   this routine.  Using the options database provides the user with
142   maximum flexibility in evaluating the many different Krylov methods.
143   The CharacteristicSetType() routine is provided for those situations where it
144   is necessary to set the iterative solver independently of the command
145   line or options database.  This might be the case, for example, when
146   the choice of iterative solver changes during the execution of the
147   program, and the user's application is taking responsibility for
148   choosing the appropriate method.  In other words, this routine is
149   not for beginners.
150 
151   Level: intermediate
152 
153 .seealso: CharacteristicType
154 
155 @*/
156 PetscErrorCode CharacteristicSetType(Characteristic c, CharacteristicType type)
157 {
158   PetscErrorCode ierr, (*r)(Characteristic);
159   PetscBool      match;
160 
161   PetscFunctionBegin;
162   PetscValidHeaderSpecific(c, CHARACTERISTIC_CLASSID, 1);
163   PetscValidCharPointer(type, 2);
164 
165   ierr = PetscObjectTypeCompare((PetscObject) c, type, &match);CHKERRQ(ierr);
166   if (match) PetscFunctionReturn(0);
167 
168   if (c->data) {
169     /* destroy the old private Characteristic context */
170     ierr = (*c->ops->destroy)(c);CHKERRQ(ierr);
171     c->ops->destroy = NULL;
172     c->data         = NULL;
173   }
174 
175   ierr =  PetscFunctionListFind(CharacteristicList,type,&r);CHKERRQ(ierr);
176   if (!r) SETERRQ1(PETSC_COMM_SELF,PETSC_ERR_ARG_UNKNOWN_TYPE, "Unknown Characteristic type given: %s", type);
177   c->setupcalled = 0;
178   ierr = (*r)(c);CHKERRQ(ierr);
179   ierr = PetscObjectChangeTypeName((PetscObject) c, type);CHKERRQ(ierr);
180   PetscFunctionReturn(0);
181 }
182 
183 /*@
184    CharacteristicSetUp - Sets up the internal data structures for the
185    later use of an iterative solver.
186 
187    Collective on Characteristic
188 
189    Input Parameter:
190 .  ksp   - iterative context obtained from CharacteristicCreate()
191 
192    Level: developer
193 
194 .seealso: CharacteristicCreate(), CharacteristicSolve(), CharacteristicDestroy()
195 @*/
196 PetscErrorCode CharacteristicSetUp(Characteristic c)
197 {
198   PetscErrorCode ierr;
199 
200   PetscFunctionBegin;
201   PetscValidHeaderSpecific(c, CHARACTERISTIC_CLASSID, 1);
202 
203   if (!((PetscObject)c)->type_name) {
204     ierr = CharacteristicSetType(c, CHARACTERISTICDA);CHKERRQ(ierr);
205   }
206 
207   if (c->setupcalled == 2) PetscFunctionReturn(0);
208 
209   ierr = PetscLogEventBegin(CHARACTERISTIC_SetUp,c,NULL,NULL,NULL);CHKERRQ(ierr);
210   if (!c->setupcalled) {
211     ierr = (*c->ops->setup)(c);CHKERRQ(ierr);
212   }
213   ierr = PetscLogEventEnd(CHARACTERISTIC_SetUp,c,NULL,NULL,NULL);CHKERRQ(ierr);
214   c->setupcalled = 2;
215   PetscFunctionReturn(0);
216 }
217 
218 /*@C
219   CharacteristicRegister -  Adds a solver to the method of characteristics package.
220 
221    Not Collective
222 
223    Input Parameters:
224 +  name_solver - name of a new user-defined solver
225 -  routine_create - routine to create method context
226 
227   Sample usage:
228 .vb
229     CharacteristicRegister("my_char", MyCharCreate);
230 .ve
231 
232   Then, your Characteristic type can be chosen with the procedural interface via
233 .vb
234     CharacteristicCreate(MPI_Comm, Characteristic* &char);
235     CharacteristicSetType(char,"my_char");
236 .ve
237    or at runtime via the option
238 .vb
239     -characteristic_type my_char
240 .ve
241 
242    Notes:
243    CharacteristicRegister() may be called multiple times to add several user-defined solvers.
244 
245 .seealso: CharacteristicRegisterAll(), CharacteristicRegisterDestroy()
246 
247   Level: advanced
248 @*/
249 PetscErrorCode CharacteristicRegister(const char sname[],PetscErrorCode (*function)(Characteristic))
250 {
251   PetscErrorCode ierr;
252 
253   PetscFunctionBegin;
254   ierr = CharacteristicInitializePackage();CHKERRQ(ierr);
255   ierr = PetscFunctionListAdd(&CharacteristicList,sname,function);CHKERRQ(ierr);
256   PetscFunctionReturn(0);
257 }
258 
259 PetscErrorCode CharacteristicSetVelocityInterpolation(Characteristic c, DM da, Vec v, Vec vOld, PetscInt numComponents, PetscInt components[], PetscErrorCode (*interp)(Vec, PetscReal[], PetscInt, PetscInt[], PetscScalar[], void*), void *ctx)
260 {
261   PetscFunctionBegin;
262   c->velocityDA      = da;
263   c->velocity        = v;
264   c->velocityOld     = vOld;
265   c->numVelocityComp = numComponents;
266   c->velocityComp    = components;
267   c->velocityInterp  = interp;
268   c->velocityCtx     = ctx;
269   PetscFunctionReturn(0);
270 }
271 
272 PetscErrorCode CharacteristicSetVelocityInterpolationLocal(Characteristic c, DM da, Vec v, Vec vOld, PetscInt numComponents, PetscInt components[], PetscErrorCode (*interp)(void*, PetscReal [], PetscInt, PetscInt[], PetscScalar[], void*), void *ctx)
273 {
274   PetscFunctionBegin;
275   c->velocityDA          = da;
276   c->velocity            = v;
277   c->velocityOld         = vOld;
278   c->numVelocityComp     = numComponents;
279   c->velocityComp        = components;
280   c->velocityInterpLocal = interp;
281   c->velocityCtx         = ctx;
282   PetscFunctionReturn(0);
283 }
284 
285 PetscErrorCode CharacteristicSetFieldInterpolation(Characteristic c, DM da, Vec v, PetscInt numComponents, PetscInt components[], PetscErrorCode (*interp)(Vec, PetscReal[], PetscInt, PetscInt[], PetscScalar[], void*), void *ctx)
286 {
287   PetscFunctionBegin;
288 #if 0
289   if (numComponents > 2) SETERRQ(PETSC_COMM_SELF,PETSC_ERR_SUP, "Fields with more than 2 components are not supported. Send mail to petsc-maint@mcs.anl.gov.");
290 #endif
291   c->fieldDA      = da;
292   c->field        = v;
293   c->numFieldComp = numComponents;
294   c->fieldComp    = components;
295   c->fieldInterp  = interp;
296   c->fieldCtx     = ctx;
297   PetscFunctionReturn(0);
298 }
299 
300 PetscErrorCode CharacteristicSetFieldInterpolationLocal(Characteristic c, DM da, Vec v, PetscInt numComponents, PetscInt components[], PetscErrorCode (*interp)(void*, PetscReal[], PetscInt, PetscInt[], PetscScalar [], void*), void *ctx)
301 {
302   PetscFunctionBegin;
303 #if 0
304   if (numComponents > 2) SETERRQ(PETSC_COMM_SELF,PETSC_ERR_SUP, "Fields with more than 2 components are not supported. Send mail to petsc-maint@mcs.anl.gov.");
305 #endif
306   c->fieldDA          = da;
307   c->field            = v;
308   c->numFieldComp     = numComponents;
309   c->fieldComp        = components;
310   c->fieldInterpLocal = interp;
311   c->fieldCtx         = ctx;
312   PetscFunctionReturn(0);
313 }
314 
315 PetscErrorCode CharacteristicSolve(Characteristic c, PetscReal dt, Vec solution)
316 {
317   CharacteristicPointDA2D Qi;
318   DM                      da = c->velocityDA;
319   Vec                     velocityLocal, velocityLocalOld;
320   Vec                     fieldLocal;
321   DMDALocalInfo           info;
322   PetscScalar             **solArray;
323   void                    *velocityArray;
324   void                    *velocityArrayOld;
325   void                    *fieldArray;
326   PetscScalar             *interpIndices;
327   PetscScalar             *velocityValues, *velocityValuesOld;
328   PetscScalar             *fieldValues;
329   PetscMPIInt             rank;
330   PetscInt                dim;
331   PetscMPIInt             neighbors[9];
332   PetscInt                dof;
333   PetscInt                gx, gy;
334   PetscInt                n, is, ie, js, je, comp;
335   PetscErrorCode          ierr;
336 
337   PetscFunctionBegin;
338   c->queueSize = 0;
339   ierr = MPI_Comm_rank(PetscObjectComm((PetscObject)c), &rank);CHKERRMPI(ierr);
340   ierr = DMDAGetNeighborsRank(da, neighbors);CHKERRQ(ierr);
341   ierr = CharacteristicSetNeighbors(c, 9, neighbors);CHKERRQ(ierr);
342   ierr = CharacteristicSetUp(c);CHKERRQ(ierr);
343   /* global and local grid info */
344   ierr = DMDAGetInfo(da, &dim, &gx, &gy, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);CHKERRQ(ierr);
345   ierr = DMDAGetLocalInfo(da, &info);CHKERRQ(ierr);
346   is   = info.xs;          ie   = info.xs+info.xm;
347   js   = info.ys;          je   = info.ys+info.ym;
348   /* Allocation */
349   ierr = PetscMalloc1(dim,                &interpIndices);CHKERRQ(ierr);
350   ierr = PetscMalloc1(c->numVelocityComp, &velocityValues);CHKERRQ(ierr);
351   ierr = PetscMalloc1(c->numVelocityComp, &velocityValuesOld);CHKERRQ(ierr);
352   ierr = PetscMalloc1(c->numFieldComp,    &fieldValues);CHKERRQ(ierr);
353   ierr = PetscLogEventBegin(CHARACTERISTIC_Solve,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
354 
355   /* -----------------------------------------------------------------------
356      PART 1, AT t-dt/2
357      -----------------------------------------------------------------------*/
358   ierr = PetscLogEventBegin(CHARACTERISTIC_QueueSetup,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
359   /* GET POSITION AT HALF TIME IN THE PAST */
360   if (c->velocityInterpLocal) {
361     ierr = DMGetLocalVector(c->velocityDA, &velocityLocal);CHKERRQ(ierr);
362     ierr = DMGetLocalVector(c->velocityDA, &velocityLocalOld);CHKERRQ(ierr);
363     ierr = DMGlobalToLocalBegin(c->velocityDA, c->velocity, INSERT_VALUES, velocityLocal);CHKERRQ(ierr);
364     ierr = DMGlobalToLocalEnd(c->velocityDA, c->velocity, INSERT_VALUES, velocityLocal);CHKERRQ(ierr);
365     ierr = DMGlobalToLocalBegin(c->velocityDA, c->velocityOld, INSERT_VALUES, velocityLocalOld);CHKERRQ(ierr);
366     ierr = DMGlobalToLocalEnd(c->velocityDA, c->velocityOld, INSERT_VALUES, velocityLocalOld);CHKERRQ(ierr);
367     ierr = DMDAVecGetArray(c->velocityDA, velocityLocal,    &velocityArray);CHKERRQ(ierr);
368     ierr = DMDAVecGetArray(c->velocityDA, velocityLocalOld, &velocityArrayOld);CHKERRQ(ierr);
369   }
370   ierr = PetscInfo(NULL, "Calculating position at t_{n - 1/2}\n");CHKERRQ(ierr);
371   for (Qi.j = js; Qi.j < je; Qi.j++) {
372     for (Qi.i = is; Qi.i < ie; Qi.i++) {
373       interpIndices[0] = Qi.i;
374       interpIndices[1] = Qi.j;
375       if (c->velocityInterpLocal) {ierr = c->velocityInterpLocal(velocityArray, interpIndices, c->numVelocityComp, c->velocityComp, velocityValues, c->velocityCtx);CHKERRQ(ierr);}
376       else {ierr = c->velocityInterp(c->velocity, interpIndices, c->numVelocityComp, c->velocityComp, velocityValues, c->velocityCtx);CHKERRQ(ierr);}
377       Qi.x = Qi.i - velocityValues[0]*dt/2.0;
378       Qi.y = Qi.j - velocityValues[1]*dt/2.0;
379 
380       /* Determine whether the position at t - dt/2 is local */
381       Qi.proc = DMDAGetNeighborRelative(da, Qi.x, Qi.y);
382 
383       /* Check for Periodic boundaries and move all periodic points back onto the domain */
384       ierr = DMDAMapCoordsToPeriodicDomain(da,&(Qi.x),&(Qi.y));CHKERRQ(ierr);
385       ierr = CharacteristicAddPoint(c, &Qi);CHKERRQ(ierr);
386     }
387   }
388   ierr = PetscLogEventEnd(CHARACTERISTIC_QueueSetup,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
389 
390   ierr = PetscLogEventBegin(CHARACTERISTIC_HalfTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
391   ierr = CharacteristicSendCoordinatesBegin(c);CHKERRQ(ierr);
392   ierr = PetscLogEventEnd(CHARACTERISTIC_HalfTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
393 
394   ierr = PetscLogEventBegin(CHARACTERISTIC_HalfTimeLocal,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
395   /* Calculate velocity at t_n+1/2 (local values) */
396   ierr = PetscInfo(NULL, "Calculating local velocities at t_{n - 1/2}\n");CHKERRQ(ierr);
397   for (n = 0; n < c->queueSize; n++) {
398     Qi = c->queue[n];
399     if (c->neighbors[Qi.proc] == rank) {
400       interpIndices[0] = Qi.x;
401       interpIndices[1] = Qi.y;
402       if (c->velocityInterpLocal) {
403         ierr = c->velocityInterpLocal(velocityArray,    interpIndices, c->numVelocityComp, c->velocityComp, velocityValues, c->velocityCtx);CHKERRQ(ierr);
404         ierr = c->velocityInterpLocal(velocityArrayOld, interpIndices, c->numVelocityComp, c->velocityComp, velocityValuesOld, c->velocityCtx);CHKERRQ(ierr);
405       } else {
406         ierr = c->velocityInterp(c->velocity,    interpIndices, c->numVelocityComp, c->velocityComp, velocityValues, c->velocityCtx);CHKERRQ(ierr);
407         ierr = c->velocityInterp(c->velocityOld, interpIndices, c->numVelocityComp, c->velocityComp, velocityValuesOld, c->velocityCtx);CHKERRQ(ierr);
408       }
409       Qi.x = 0.5*(velocityValues[0] + velocityValuesOld[0]);
410       Qi.y = 0.5*(velocityValues[1] + velocityValuesOld[1]);
411     }
412     c->queue[n] = Qi;
413   }
414   ierr = PetscLogEventEnd(CHARACTERISTIC_HalfTimeLocal,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
415 
416   ierr = PetscLogEventBegin(CHARACTERISTIC_HalfTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
417   ierr = CharacteristicSendCoordinatesEnd(c);CHKERRQ(ierr);
418   ierr = PetscLogEventEnd(CHARACTERISTIC_HalfTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
419 
420   /* Calculate velocity at t_n+1/2 (fill remote requests) */
421   ierr = PetscLogEventBegin(CHARACTERISTIC_HalfTimeRemote,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
422   ierr = PetscInfo1(NULL, "Calculating %d remote velocities at t_{n - 1/2}\n", c->queueRemoteSize);CHKERRQ(ierr);
423   for (n = 0; n < c->queueRemoteSize; n++) {
424     Qi = c->queueRemote[n];
425     interpIndices[0] = Qi.x;
426     interpIndices[1] = Qi.y;
427     if (c->velocityInterpLocal) {
428       ierr = c->velocityInterpLocal(velocityArray,    interpIndices, c->numVelocityComp, c->velocityComp, velocityValues,    c->velocityCtx);CHKERRQ(ierr);
429       ierr = c->velocityInterpLocal(velocityArrayOld, interpIndices, c->numVelocityComp, c->velocityComp, velocityValuesOld, c->velocityCtx);CHKERRQ(ierr);
430     } else {
431       ierr = c->velocityInterp(c->velocity,    interpIndices, c->numVelocityComp, c->velocityComp, velocityValues,    c->velocityCtx);CHKERRQ(ierr);
432       ierr = c->velocityInterp(c->velocityOld, interpIndices, c->numVelocityComp, c->velocityComp, velocityValuesOld, c->velocityCtx);CHKERRQ(ierr);
433     }
434     Qi.x = 0.5*(velocityValues[0] + velocityValuesOld[0]);
435     Qi.y = 0.5*(velocityValues[1] + velocityValuesOld[1]);
436     c->queueRemote[n] = Qi;
437   }
438   ierr = PetscLogEventEnd(CHARACTERISTIC_HalfTimeRemote,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
439   ierr = PetscLogEventBegin(CHARACTERISTIC_HalfTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
440   ierr = CharacteristicGetValuesBegin(c);CHKERRQ(ierr);
441   ierr = CharacteristicGetValuesEnd(c);CHKERRQ(ierr);
442   if (c->velocityInterpLocal) {
443     ierr = DMDAVecRestoreArray(c->velocityDA, velocityLocal,    &velocityArray);CHKERRQ(ierr);
444     ierr = DMDAVecRestoreArray(c->velocityDA, velocityLocalOld, &velocityArrayOld);CHKERRQ(ierr);
445     ierr = DMRestoreLocalVector(c->velocityDA, &velocityLocal);CHKERRQ(ierr);
446     ierr = DMRestoreLocalVector(c->velocityDA, &velocityLocalOld);CHKERRQ(ierr);
447   }
448   ierr = PetscLogEventEnd(CHARACTERISTIC_HalfTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
449 
450   /* -----------------------------------------------------------------------
451      PART 2, AT t-dt
452      -----------------------------------------------------------------------*/
453 
454   /* GET POSITION AT t_n (local values) */
455   ierr = PetscLogEventBegin(CHARACTERISTIC_FullTimeLocal,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
456   ierr = PetscInfo(NULL, "Calculating position at t_{n}\n");CHKERRQ(ierr);
457   for (n = 0; n < c->queueSize; n++) {
458     Qi   = c->queue[n];
459     Qi.x = Qi.i - Qi.x*dt;
460     Qi.y = Qi.j - Qi.y*dt;
461 
462     /* Determine whether the position at t-dt is local */
463     Qi.proc = DMDAGetNeighborRelative(da, Qi.x, Qi.y);
464 
465     /* Check for Periodic boundaries and move all periodic points back onto the domain */
466     ierr = DMDAMapCoordsToPeriodicDomain(da,&(Qi.x),&(Qi.y));CHKERRQ(ierr);
467 
468     c->queue[n] = Qi;
469   }
470   ierr = PetscLogEventEnd(CHARACTERISTIC_FullTimeLocal,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
471 
472   ierr = PetscLogEventBegin(CHARACTERISTIC_FullTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
473   ierr = CharacteristicSendCoordinatesBegin(c);CHKERRQ(ierr);
474   ierr = PetscLogEventEnd(CHARACTERISTIC_FullTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
475 
476   /* GET VALUE AT FULL TIME IN THE PAST (LOCAL REQUESTS) */
477   ierr = PetscLogEventBegin(CHARACTERISTIC_FullTimeLocal,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
478   if (c->fieldInterpLocal) {
479     ierr = DMGetLocalVector(c->fieldDA, &fieldLocal);CHKERRQ(ierr);
480     ierr = DMGlobalToLocalBegin(c->fieldDA, c->field, INSERT_VALUES, fieldLocal);CHKERRQ(ierr);
481     ierr = DMGlobalToLocalEnd(c->fieldDA, c->field, INSERT_VALUES, fieldLocal);CHKERRQ(ierr);
482     ierr = DMDAVecGetArray(c->fieldDA, fieldLocal, &fieldArray);CHKERRQ(ierr);
483   }
484   ierr = PetscInfo(NULL, "Calculating local field at t_{n}\n");CHKERRQ(ierr);
485   for (n = 0; n < c->queueSize; n++) {
486     if (c->neighbors[c->queue[n].proc] == rank) {
487       interpIndices[0] = c->queue[n].x;
488       interpIndices[1] = c->queue[n].y;
489       if (c->fieldInterpLocal) {ierr = c->fieldInterpLocal(fieldArray, interpIndices, c->numFieldComp, c->fieldComp, fieldValues, c->fieldCtx);CHKERRQ(ierr);}
490       else {ierr = c->fieldInterp(c->field, interpIndices, c->numFieldComp, c->fieldComp, fieldValues, c->fieldCtx);CHKERRQ(ierr);}
491       for (comp = 0; comp < c->numFieldComp; comp++) c->queue[n].field[comp] = fieldValues[comp];
492     }
493   }
494   ierr = PetscLogEventEnd(CHARACTERISTIC_FullTimeLocal,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
495 
496   ierr = PetscLogEventBegin(CHARACTERISTIC_FullTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
497   ierr = CharacteristicSendCoordinatesEnd(c);CHKERRQ(ierr);
498   ierr = PetscLogEventEnd(CHARACTERISTIC_FullTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
499 
500   /* GET VALUE AT FULL TIME IN THE PAST (REMOTE REQUESTS) */
501   ierr = PetscLogEventBegin(CHARACTERISTIC_FullTimeRemote,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
502   ierr = PetscInfo1(NULL, "Calculating %d remote field points at t_{n}\n", c->queueRemoteSize);CHKERRQ(ierr);
503   for (n = 0; n < c->queueRemoteSize; n++) {
504     interpIndices[0] = c->queueRemote[n].x;
505     interpIndices[1] = c->queueRemote[n].y;
506 
507     /* for debugging purposes */
508     if (1) { /* hacked bounds test...let's do better */
509       PetscScalar im = interpIndices[0]; PetscScalar jm = interpIndices[1];
510 
511       if ((im < (PetscScalar) is - 1.) || (im > (PetscScalar) ie) || (jm < (PetscScalar)  js - 1.) || (jm > (PetscScalar) je)) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_LIB, "Nonlocal point: (%g,%g)", im, jm);
512     }
513 
514     if (c->fieldInterpLocal) {ierr = c->fieldInterpLocal(fieldArray, interpIndices, c->numFieldComp, c->fieldComp, fieldValues, c->fieldCtx);CHKERRQ(ierr);}
515     else {ierr = c->fieldInterp(c->field, interpIndices, c->numFieldComp, c->fieldComp, fieldValues, c->fieldCtx);CHKERRQ(ierr);}
516     for (comp = 0; comp < c->numFieldComp; comp++) c->queueRemote[n].field[comp] = fieldValues[comp];
517   }
518   ierr = PetscLogEventEnd(CHARACTERISTIC_FullTimeRemote,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
519 
520   ierr = PetscLogEventBegin(CHARACTERISTIC_FullTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
521   ierr = CharacteristicGetValuesBegin(c);CHKERRQ(ierr);
522   ierr = CharacteristicGetValuesEnd(c);CHKERRQ(ierr);
523   if (c->fieldInterpLocal) {
524     ierr = DMDAVecRestoreArray(c->fieldDA, fieldLocal, &fieldArray);CHKERRQ(ierr);
525     ierr = DMRestoreLocalVector(c->fieldDA, &fieldLocal);CHKERRQ(ierr);
526   }
527   ierr = PetscLogEventEnd(CHARACTERISTIC_FullTimeExchange,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
528 
529   /* Return field of characteristics at t_n-1 */
530   ierr = PetscLogEventBegin(CHARACTERISTIC_DAUpdate,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
531   ierr = DMDAGetInfo(c->fieldDA,NULL,NULL,NULL,NULL,NULL,NULL,NULL,&dof,NULL,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
532   ierr = DMDAVecGetArray(c->fieldDA, solution, &solArray);CHKERRQ(ierr);
533   for (n = 0; n < c->queueSize; n++) {
534     Qi = c->queue[n];
535     for (comp = 0; comp < c->numFieldComp; comp++) solArray[Qi.j][Qi.i*dof+c->fieldComp[comp]] = Qi.field[comp];
536   }
537   ierr = DMDAVecRestoreArray(c->fieldDA, solution, &solArray);CHKERRQ(ierr);
538   ierr = PetscLogEventEnd(CHARACTERISTIC_DAUpdate,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
539   ierr = PetscLogEventEnd(CHARACTERISTIC_Solve,NULL,NULL,NULL,NULL);CHKERRQ(ierr);
540 
541   /* Cleanup */
542   ierr = PetscFree(interpIndices);CHKERRQ(ierr);
543   ierr = PetscFree(velocityValues);CHKERRQ(ierr);
544   ierr = PetscFree(velocityValuesOld);CHKERRQ(ierr);
545   ierr = PetscFree(fieldValues);CHKERRQ(ierr);
546   PetscFunctionReturn(0);
547 }
548 
549 PetscErrorCode CharacteristicSetNeighbors(Characteristic c, PetscInt numNeighbors, PetscMPIInt neighbors[])
550 {
551   PetscErrorCode ierr;
552 
553   PetscFunctionBegin;
554   c->numNeighbors = numNeighbors;
555   ierr = PetscFree(c->neighbors);CHKERRQ(ierr);
556   ierr = PetscMalloc1(numNeighbors, &c->neighbors);CHKERRQ(ierr);
557   ierr = PetscArraycpy(c->neighbors, neighbors, numNeighbors);CHKERRQ(ierr);
558   PetscFunctionReturn(0);
559 }
560 
561 PetscErrorCode CharacteristicAddPoint(Characteristic c, CharacteristicPointDA2D *point)
562 {
563   PetscFunctionBegin;
564   if (c->queueSize >= c->queueMax) SETERRQ1(PETSC_COMM_SELF,PETSC_ERR_ARG_OUTOFRANGE, "Exceeded maximum queue size %d", c->queueMax);
565   c->queue[c->queueSize++] = *point;
566   PetscFunctionReturn(0);
567 }
568 
569 int CharacteristicSendCoordinatesBegin(Characteristic c)
570 {
571   PetscMPIInt    rank, tag = 121;
572   PetscInt       i, n;
573   PetscErrorCode ierr;
574 
575   PetscFunctionBegin;
576   ierr = MPI_Comm_rank(PetscObjectComm((PetscObject)c), &rank);CHKERRMPI(ierr);
577   ierr = CharacteristicHeapSort(c, c->queue, c->queueSize);CHKERRQ(ierr);
578   ierr = PetscArrayzero(c->needCount, c->numNeighbors);CHKERRQ(ierr);
579   for (i = 0;  i < c->queueSize; i++) c->needCount[c->queue[i].proc]++;
580   c->fillCount[0] = 0;
581   for (n = 1; n < c->numNeighbors; n++) {
582     ierr = MPI_Irecv(&(c->fillCount[n]), 1, MPIU_INT, c->neighbors[n], tag, PetscObjectComm((PetscObject)c), &(c->request[n-1]));CHKERRMPI(ierr);
583   }
584   for (n = 1; n < c->numNeighbors; n++) {
585     ierr = MPI_Send(&(c->needCount[n]), 1, MPIU_INT, c->neighbors[n], tag, PetscObjectComm((PetscObject)c));CHKERRMPI(ierr);
586   }
587   ierr = MPI_Waitall(c->numNeighbors-1, c->request, c->status);CHKERRMPI(ierr);
588   /* Initialize the remote queue */
589   c->queueLocalMax  = c->localOffsets[0]  = 0;
590   c->queueRemoteMax = c->remoteOffsets[0] = 0;
591   for (n = 1; n < c->numNeighbors; n++) {
592     c->remoteOffsets[n] = c->queueRemoteMax;
593     c->queueRemoteMax  += c->fillCount[n];
594     c->localOffsets[n]  = c->queueLocalMax;
595     c->queueLocalMax   += c->needCount[n];
596   }
597   /* HACK BEGIN */
598   for (n = 1; n < c->numNeighbors; n++) c->localOffsets[n] += c->needCount[0];
599   c->needCount[0] = 0;
600   /* HACK END */
601   if (c->queueRemoteMax) {
602     ierr = PetscMalloc1(c->queueRemoteMax, &c->queueRemote);CHKERRQ(ierr);
603   } else c->queueRemote = NULL;
604   c->queueRemoteSize = c->queueRemoteMax;
605 
606   /* Send and Receive requests for values at t_n+1/2, giving the coordinates for interpolation */
607   for (n = 1; n < c->numNeighbors; n++) {
608     ierr = PetscInfo2(NULL, "Receiving %d requests for values from proc %d\n", c->fillCount[n], c->neighbors[n]);CHKERRQ(ierr);
609     ierr = MPI_Irecv(&(c->queueRemote[c->remoteOffsets[n]]), c->fillCount[n], c->itemType, c->neighbors[n], tag, PetscObjectComm((PetscObject)c), &(c->request[n-1]));CHKERRMPI(ierr);
610   }
611   for (n = 1; n < c->numNeighbors; n++) {
612     ierr = PetscInfo2(NULL, "Sending %d requests for values from proc %d\n", c->needCount[n], c->neighbors[n]);CHKERRQ(ierr);
613     ierr = MPI_Send(&(c->queue[c->localOffsets[n]]), c->needCount[n], c->itemType, c->neighbors[n], tag, PetscObjectComm((PetscObject)c));CHKERRMPI(ierr);
614   }
615   PetscFunctionReturn(0);
616 }
617 
618 PetscErrorCode CharacteristicSendCoordinatesEnd(Characteristic c)
619 {
620 #if 0
621   PetscMPIInt rank;
622   PetscInt    n;
623 #endif
624   PetscErrorCode ierr;
625 
626   PetscFunctionBegin;
627   ierr = MPI_Waitall(c->numNeighbors-1, c->request, c->status);CHKERRMPI(ierr);
628 #if 0
629   ierr = MPI_Comm_rank(PetscObjectComm((PetscObject)c), &rank);CHKERRMPI(ierr);
630   for (n = 0; n < c->queueRemoteSize; n++) {
631     if (c->neighbors[c->queueRemote[n].proc] == rank) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_PLIB, "This is messed up, n = %d proc = %d", n, c->queueRemote[n].proc);
632   }
633 #endif
634   PetscFunctionReturn(0);
635 }
636 
637 PetscErrorCode CharacteristicGetValuesBegin(Characteristic c)
638 {
639   PetscMPIInt    tag = 121;
640   PetscInt       n;
641   PetscErrorCode ierr;
642 
643   PetscFunctionBegin;
644   /* SEND AND RECIEVE FILLED REQUESTS for velocities at t_n+1/2 */
645   for (n = 1; n < c->numNeighbors; n++) {
646     ierr = MPI_Irecv(&(c->queue[c->localOffsets[n]]), c->needCount[n], c->itemType, c->neighbors[n], tag, PetscObjectComm((PetscObject)c), &(c->request[n-1]));CHKERRMPI(ierr);
647   }
648   for (n = 1; n < c->numNeighbors; n++) {
649     ierr = MPI_Send(&(c->queueRemote[c->remoteOffsets[n]]), c->fillCount[n], c->itemType, c->neighbors[n], tag, PetscObjectComm((PetscObject)c));CHKERRMPI(ierr);
650   }
651   PetscFunctionReturn(0);
652 }
653 
654 PetscErrorCode CharacteristicGetValuesEnd(Characteristic c)
655 {
656   PetscErrorCode ierr;
657 
658   PetscFunctionBegin;
659   ierr = MPI_Waitall(c->numNeighbors-1, c->request, c->status);CHKERRMPI(ierr);
660   /* Free queue of requests from other procs */
661   ierr = PetscFree(c->queueRemote);CHKERRQ(ierr);
662   PetscFunctionReturn(0);
663 }
664 
665 /*---------------------------------------------------------------------*/
666 /*
667   Based on code from http://linux.wku.edu/~lamonml/algor/sort/heap.html
668 */
669 PetscErrorCode CharacteristicHeapSort(Characteristic c, Queue queue, PetscInt size)
670 /*---------------------------------------------------------------------*/
671 {
672   PetscErrorCode          ierr;
673   CharacteristicPointDA2D temp;
674   PetscInt                n;
675 
676   PetscFunctionBegin;
677   if (0) { /* Check the order of the queue before sorting */
678     ierr = PetscInfo(NULL, "Before Heap sort\n");CHKERRQ(ierr);
679     for (n=0; n<size; n++) {
680       ierr = PetscInfo2(NULL,"%d %d\n",n,queue[n].proc);CHKERRQ(ierr);
681     }
682   }
683 
684   /* SORTING PHASE */
685   for (n = (size / 2)-1; n >= 0; n--) {
686     ierr = CharacteristicSiftDown(c, queue, n, size-1);CHKERRQ(ierr); /* Rich had size-1 here, Matt had size*/
687   }
688   for (n = size-1; n >= 1; n--) {
689     temp     = queue[0];
690     queue[0] = queue[n];
691     queue[n] = temp;
692     ierr     = CharacteristicSiftDown(c, queue, 0, n-1);CHKERRQ(ierr);
693   }
694   if (0) { /* Check the order of the queue after sorting */
695     ierr = PetscInfo(NULL, "Avter  Heap sort\n");CHKERRQ(ierr);
696     for (n=0; n<size; n++) {
697       ierr = PetscInfo2(NULL,"%d %d\n",n,queue[n].proc);CHKERRQ(ierr);
698     }
699   }
700   PetscFunctionReturn(0);
701 }
702 
703 /*---------------------------------------------------------------------*/
704 /*
705   Based on code from http://linux.wku.edu/~lamonml/algor/sort/heap.html
706 */
707 PetscErrorCode CharacteristicSiftDown(Characteristic c, Queue queue, PetscInt root, PetscInt bottom)
708 /*---------------------------------------------------------------------*/
709 {
710   PetscBool               done = PETSC_FALSE;
711   PetscInt                maxChild;
712   CharacteristicPointDA2D temp;
713 
714   PetscFunctionBegin;
715   while ((root*2 <= bottom) && (!done)) {
716     if (root*2 == bottom) maxChild = root * 2;
717     else if (queue[root*2].proc > queue[root*2+1].proc) maxChild = root * 2;
718     else maxChild = root * 2 + 1;
719 
720     if (queue[root].proc < queue[maxChild].proc) {
721       temp            = queue[root];
722       queue[root]     = queue[maxChild];
723       queue[maxChild] = temp;
724       root            = maxChild;
725     } else done = PETSC_TRUE;
726   }
727   PetscFunctionReturn(0);
728 }
729 
730 /* [center, left, top-left, top, top-right, right, bottom-right, bottom, bottom-left] */
731 PetscErrorCode DMDAGetNeighborsRank(DM da, PetscMPIInt neighbors[])
732 {
733   DMBoundaryType   bx, by;
734   PetscBool        IPeriodic = PETSC_FALSE, JPeriodic = PETSC_FALSE;
735   MPI_Comm         comm;
736   PetscMPIInt      rank;
737   PetscInt         **procs,pi,pj,pim,pip,pjm,pjp,PI,PJ;
738   PetscErrorCode   ierr;
739 
740   PetscFunctionBegin;
741   ierr = PetscObjectGetComm((PetscObject) da, &comm);CHKERRQ(ierr);
742   ierr = MPI_Comm_rank(comm, &rank);CHKERRMPI(ierr);
743   ierr = DMDAGetInfo(da, NULL, NULL, NULL, NULL, &PI,&PJ, NULL, NULL, NULL, &bx, &by,NULL, NULL);CHKERRQ(ierr);
744 
745   if (bx == DM_BOUNDARY_PERIODIC) IPeriodic = PETSC_TRUE;
746   if (by == DM_BOUNDARY_PERIODIC) JPeriodic = PETSC_TRUE;
747 
748   neighbors[0] = rank;
749   rank = 0;
750   ierr = PetscMalloc1(PJ,&procs);CHKERRQ(ierr);
751   for (pj=0; pj<PJ; pj++) {
752     ierr = PetscMalloc1(PI,&(procs[pj]));CHKERRQ(ierr);
753     for (pi=0; pi<PI; pi++) {
754       procs[pj][pi] = rank;
755       rank++;
756     }
757   }
758 
759   pi  = neighbors[0] % PI;
760   pj  = neighbors[0] / PI;
761   pim = pi-1;  if (pim<0) pim=PI-1;
762   pip = (pi+1)%PI;
763   pjm = pj-1;  if (pjm<0) pjm=PJ-1;
764   pjp = (pj+1)%PJ;
765 
766   neighbors[1] = procs[pj] [pim];
767   neighbors[2] = procs[pjp][pim];
768   neighbors[3] = procs[pjp][pi];
769   neighbors[4] = procs[pjp][pip];
770   neighbors[5] = procs[pj] [pip];
771   neighbors[6] = procs[pjm][pip];
772   neighbors[7] = procs[pjm][pi];
773   neighbors[8] = procs[pjm][pim];
774 
775   if (!IPeriodic) {
776     if (pi==0)    neighbors[1]=neighbors[2]=neighbors[8]=neighbors[0];
777     if (pi==PI-1) neighbors[4]=neighbors[5]=neighbors[6]=neighbors[0];
778   }
779 
780   if (!JPeriodic) {
781     if (pj==0)    neighbors[6]=neighbors[7]=neighbors[8]=neighbors[0];
782     if (pj==PJ-1) neighbors[2]=neighbors[3]=neighbors[4]=neighbors[0];
783   }
784 
785   for (pj = 0; pj < PJ; pj++) {
786     ierr = PetscFree(procs[pj]);CHKERRQ(ierr);
787   }
788   ierr = PetscFree(procs);CHKERRQ(ierr);
789   PetscFunctionReturn(0);
790 }
791 
792 /*
793   SUBDOMAIN NEIGHBORHOOD PROCESS MAP:
794     2 | 3 | 4
795     __|___|__
796     1 | 0 | 5
797     __|___|__
798     8 | 7 | 6
799       |   |
800 */
801 PetscInt DMDAGetNeighborRelative(DM da, PetscReal ir, PetscReal jr)
802 {
803   DMDALocalInfo  info;
804   PetscReal      is,ie,js,je;
805   PetscErrorCode ierr;
806 
807   ierr = DMDAGetLocalInfo(da, &info);CHKERRQ(ierr);
808   is   = (PetscReal) info.xs - 0.5; ie = (PetscReal) info.xs + info.xm - 0.5;
809   js   = (PetscReal) info.ys - 0.5; je = (PetscReal) info.ys + info.ym - 0.5;
810 
811   if (ir >= is && ir <= ie) { /* center column */
812     if (jr >= js && jr <= je) return 0;
813     else if (jr < js)         return 7;
814     else                      return 3;
815   } else if (ir < is) {     /* left column */
816     if (jr >= js && jr <= je) return 1;
817     else if (jr < js)         return 8;
818     else                      return 2;
819   } else {                  /* right column */
820     if (jr >= js && jr <= je) return 5;
821     else if (jr < js)         return 6;
822     else                      return 4;
823   }
824 }
825