xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision f3fe499b4cc4d64bf04aa4f5e4963dcc4eb56541)
1 #define PETSCKSP_DLL
2 
3 /***********************************gs.c***************************************
4 
5 Author: Henry M. Tufo III
6 
7 e-mail: hmt@cs.brown.edu
8 
9 snail-mail:
10 Division of Applied Mathematics
11 Brown University
12 Providence, RI 02912
13 
14 Last Modification:
15 6.21.97
16 ************************************gs.c**************************************/
17 
18 /***********************************gs.c***************************************
19 File Description:
20 -----------------
21 
22 ************************************gs.c**************************************/
23 
24 #include "../src/ksp/pc/impls/tfs/tfs.h"
25 
26 /* default length of number of items via tree - doubles if exceeded */
27 #define TREE_BUF_SZ 2048;
28 #define GS_VEC_SZ   1
29 
30 
31 
32 /***********************************gs.c***************************************
33 Type: struct gather_scatter_id
34 ------------------------------
35 
36 ************************************gs.c**************************************/
37 typedef struct gather_scatter_id {
38   PetscInt id;
39   PetscInt nel_min;
40   PetscInt nel_max;
41   PetscInt nel_sum;
42   PetscInt negl;
43   PetscInt gl_max;
44   PetscInt gl_min;
45   PetscInt repeats;
46   PetscInt ordered;
47   PetscInt positive;
48   PetscScalar *vals;
49 
50   /* bit mask info */
51   PetscInt *my_proc_mask;
52   PetscInt mask_sz;
53   PetscInt *ngh_buf;
54   PetscInt ngh_buf_sz;
55   PetscInt *nghs;
56   PetscInt num_nghs;
57   PetscInt max_nghs;
58   PetscInt *pw_nghs;
59   PetscInt num_pw_nghs;
60   PetscInt *tree_nghs;
61   PetscInt num_tree_nghs;
62 
63   PetscInt num_loads;
64 
65   /* repeats == true -> local info */
66   PetscInt nel;         /* number of unique elememts */
67   PetscInt *elms;       /* of size nel */
68   PetscInt nel_total;
69   PetscInt *local_elms; /* of size nel_total */
70   PetscInt *companion;  /* of size nel_total */
71 
72   /* local info */
73   PetscInt num_local_total;
74   PetscInt local_strength;
75   PetscInt num_local;
76   PetscInt *num_local_reduce;
77   PetscInt **local_reduce;
78   PetscInt num_local_gop;
79   PetscInt *num_gop_local_reduce;
80   PetscInt **gop_local_reduce;
81 
82   /* pairwise info */
83   PetscInt level;
84   PetscInt num_pairs;
85   PetscInt max_pairs;
86   PetscInt loc_node_pairs;
87   PetscInt max_node_pairs;
88   PetscInt min_node_pairs;
89   PetscInt avg_node_pairs;
90   PetscInt *pair_list;
91   PetscInt *msg_sizes;
92   PetscInt **node_list;
93   PetscInt len_pw_list;
94   PetscInt *pw_elm_list;
95   PetscScalar *pw_vals;
96 
97   MPI_Request *msg_ids_in;
98   MPI_Request *msg_ids_out;
99 
100   PetscScalar *out;
101   PetscScalar *in;
102   PetscInt msg_total;
103 
104   /* tree - crystal accumulator info */
105   PetscInt max_left_over;
106   PetscInt *pre;
107   PetscInt *in_num;
108   PetscInt *out_num;
109   PetscInt **in_list;
110   PetscInt **out_list;
111 
112   /* new tree work*/
113   PetscInt  tree_nel;
114   PetscInt *tree_elms;
115   PetscScalar *tree_buf;
116   PetscScalar *tree_work;
117 
118   PetscInt  tree_map_sz;
119   PetscInt *tree_map_in;
120   PetscInt *tree_map_out;
121 
122   /* current memory status */
123   PetscInt gl_bss_min;
124   PetscInt gl_perm_min;
125 
126   /* max segment size for gs_gop_vec() */
127   PetscInt vec_sz;
128 
129   /* hack to make paul happy */
130   MPI_Comm gs_comm;
131 
132 } gs_id;
133 
134 static gs_id *gsi_check_args(PetscInt *elms, PetscInt nel, PetscInt level);
135 static PetscErrorCode gsi_via_bit_mask(gs_id *gs);
136 static PetscErrorCode get_ngh_buf(gs_id *gs);
137 static PetscErrorCode set_pairwise(gs_id *gs);
138 static gs_id * gsi_new(void);
139 static PetscErrorCode set_tree(gs_id *gs);
140 
141 /* same for all but vector flavor */
142 static PetscErrorCode gs_gop_local_out(gs_id *gs, PetscScalar *vals);
143 /* vector flavor */
144 static PetscErrorCode gs_gop_vec_local_out(gs_id *gs, PetscScalar *vals, PetscInt step);
145 
146 static PetscErrorCode gs_gop_vec_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
147 static PetscErrorCode gs_gop_vec_pairwise_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
148 static PetscErrorCode gs_gop_vec_local_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
149 static PetscErrorCode gs_gop_vec_local_in_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
150 static PetscErrorCode gs_gop_vec_tree_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
151 
152 
153 static PetscErrorCode gs_gop_local_plus(gs_id *gs, PetscScalar *vals);
154 static PetscErrorCode gs_gop_local_in_plus(gs_id *gs, PetscScalar *vals);
155 
156 static PetscErrorCode gs_gop_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
157 static PetscErrorCode gs_gop_pairwise_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
158 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim);
159 
160 /* global vars */
161 /* from comm.c module */
162 
163 static PetscInt num_gs_ids = 0;
164 
165 /* should make this dynamic ... later */
166 static PetscInt msg_buf=MAX_MSG_BUF;
167 static PetscInt vec_sz=GS_VEC_SZ;
168 static PetscInt *tree_buf=NULL;
169 static PetscInt tree_buf_sz=0;
170 static PetscInt ntree=0;
171 
172 /***************************************************************************/
173 PetscErrorCode gs_init_vec_sz(PetscInt size)
174 {
175   PetscFunctionBegin;
176   vec_sz = size;
177   PetscFunctionReturn(0);
178 }
179 
180 /******************************************************************************/
181 PetscErrorCode gs_init_msg_buf_sz(PetscInt buf_size)
182 {
183   PetscFunctionBegin;
184   msg_buf = buf_size;
185   PetscFunctionReturn(0);
186 }
187 
188 /******************************************************************************/
189 gs_id *gs_init( PetscInt *elms, PetscInt nel, PetscInt level)
190 {
191    gs_id *gs;
192   MPI_Group gs_group;
193   MPI_Comm  gs_comm;
194   PetscErrorCode ierr;
195 
196   PetscFunctionBegin;
197   /* ensure that communication package has been initialized */
198   comm_init();
199 
200 
201   /* determines if we have enough dynamic/semi-static memory */
202   /* checks input, allocs and sets gd_id template            */
203   gs = gsi_check_args(elms,nel,level);
204 
205   /* only bit mask version up and working for the moment    */
206   /* LATER :: get int list version working for sparse pblms */
207   ierr = gsi_via_bit_mask(gs);CHKERRABORT(PETSC_COMM_WORLD,ierr);
208 
209 
210   ierr = MPI_Comm_group(MPI_COMM_WORLD,&gs_group);CHKERRABORT(PETSC_COMM_WORLD,ierr);
211   ierr = MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);CHKERRABORT(PETSC_COMM_WORLD,ierr);
212   gs->gs_comm=gs_comm;
213 
214   return(gs);
215 }
216 
217 /******************************************************************************/
218 static gs_id *gsi_new(void)
219 {
220   PetscErrorCode ierr;
221   gs_id *gs;
222   gs = (gs_id *) malloc(sizeof(gs_id));
223   ierr = PetscMemzero(gs,sizeof(gs_id));CHKERRABORT(PETSC_COMM_WORLD,ierr);
224   return(gs);
225 }
226 
227 /******************************************************************************/
228 static gs_id * gsi_check_args(PetscInt *in_elms, PetscInt nel, PetscInt level)
229 {
230    PetscInt i, j, k, t2;
231   PetscInt *companion, *elms, *unique, *iptr;
232   PetscInt num_local=0, *num_to_reduce, **local_reduce;
233   PetscInt oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
234   PetscInt vals[sizeof(oprs)/sizeof(oprs[0])-1];
235   PetscInt work[sizeof(oprs)/sizeof(oprs[0])-1];
236   gs_id *gs;
237   PetscErrorCode ierr;
238 
239 
240   if (!in_elms) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"elms point to nothing!!!\n");
241   if (nel<0) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"can't have fewer than 0 elms!!!\n");
242 
243   if (nel==0)
244     {ierr = PetscInfo(0,"I don't have any elements!!!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
245 
246   /* get space for gs template */
247   gs = gsi_new();
248   gs->id = ++num_gs_ids;
249 
250   /* hmt 6.4.99                                            */
251   /* caller can set global ids that don't participate to 0 */
252   /* gs_init ignores all zeros in elm list                 */
253   /* negative global ids are still invalid                 */
254   for (i=j=0;i<nel;i++)
255     {if (in_elms[i]!=0) {j++;}}
256 
257   k=nel; nel=j;
258 
259   /* copy over in_elms list and create inverse map */
260   elms = (PetscInt*) malloc((nel+1)*sizeof(PetscInt));
261   companion = (PetscInt*) malloc(nel*sizeof(PetscInt));
262 
263   for (i=j=0;i<k;i++)
264     {
265       if (in_elms[i]!=0)
266         {elms[j] = in_elms[i]; companion[j++] = i;}
267     }
268 
269   if (j!=nel) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"nel j mismatch!\n");
270 
271   /* pre-pass ... check to see if sorted */
272   elms[nel] = INT_MAX;
273   iptr = elms;
274   unique = elms+1;
275   j=0;
276   while (*iptr!=INT_MAX)
277     {
278       if (*iptr++>*unique++)
279         {j=1; break;}
280     }
281 
282   /* set up inverse map */
283   if (j)
284     {
285       ierr = PetscInfo(0,"gsi_check_args() :: elm list *not* sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);
286       ierr = SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);CHKERRABORT(PETSC_COMM_WORLD,ierr);
287     }
288   else
289     {ierr = PetscInfo(0,"gsi_check_args() :: elm list sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
290   elms[nel] = INT_MIN;
291 
292   /* first pass */
293   /* determine number of unique elements, check pd */
294   for (i=k=0;i<nel;i+=j)
295     {
296       t2 = elms[i];
297       j=++i;
298 
299       /* clump 'em for now */
300       while (elms[j]==t2) {j++;}
301 
302       /* how many together and num local */
303       if (j-=i)
304         {num_local++; k+=j;}
305     }
306 
307   /* how many unique elements? */
308   gs->repeats=k;
309   gs->nel = nel-k;
310 
311 
312   /* number of repeats? */
313   gs->num_local = num_local;
314   num_local+=2;
315   gs->local_reduce=local_reduce=(PetscInt **)malloc(num_local*sizeof(PetscInt*));
316   gs->num_local_reduce=num_to_reduce=(PetscInt*) malloc(num_local*sizeof(PetscInt));
317 
318   unique = (PetscInt*) malloc((gs->nel+1)*sizeof(PetscInt));
319   gs->elms = unique;
320   gs->nel_total = nel;
321   gs->local_elms = elms;
322   gs->companion = companion;
323 
324   /* compess map as well as keep track of local ops */
325   for (num_local=i=j=0;i<gs->nel;i++)
326     {
327       k=j;
328       t2 = unique[i] = elms[j];
329       companion[i] = companion[j];
330 
331       while (elms[j]==t2) {j++;}
332 
333       if ((t2=(j-k))>1)
334         {
335           /* number together */
336           num_to_reduce[num_local] = t2++;
337           iptr = local_reduce[num_local++] = (PetscInt*)malloc(t2*sizeof(PetscInt));
338 
339           /* to use binary searching don't remap until we check intersection */
340           *iptr++ = i;
341 
342           /* note that we're skipping the first one */
343           while (++k<j)
344             {*(iptr++) = companion[k];}
345           *iptr = -1;
346         }
347     }
348 
349   /* sentinel for ngh_buf */
350   unique[gs->nel]=INT_MAX;
351 
352   /* for two partition sort hack */
353   num_to_reduce[num_local] = 0;
354   local_reduce[num_local] = NULL;
355   num_to_reduce[++num_local] = 0;
356   local_reduce[num_local] = NULL;
357 
358   /* load 'em up */
359   /* note one extra to hold NON_UNIFORM flag!!! */
360   vals[2] = vals[1] = vals[0] = nel;
361   if (gs->nel>0)
362     {
363        vals[3] = unique[0];
364        vals[4] = unique[gs->nel-1];
365     }
366   else
367     {
368        vals[3] = INT_MAX;
369        vals[4] = INT_MIN;
370     }
371   vals[5] = level;
372   vals[6] = num_gs_ids;
373 
374   /* GLOBAL: send 'em out */
375   ierr = giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);CHKERRABORT(PETSC_COMM_WORLD,ierr);
376 
377   /* must be semi-pos def - only pairwise depends on this */
378   /* LATER - remove this restriction */
379   if (vals[3]<0) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system not semi-pos def \n");
380   if (vals[4]==INT_MAX) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system ub too large !\n");
381 
382   gs->nel_min = vals[0];
383   gs->nel_max = vals[1];
384   gs->nel_sum = vals[2];
385   gs->gl_min  = vals[3];
386   gs->gl_max  = vals[4];
387   gs->negl    = vals[4]-vals[3]+1;
388 
389   if (gs->negl<=0) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system empty or neg :: %d\n");
390 
391   /* LATER :: add level == -1 -> program selects level */
392   if (vals[5]<0)
393     {vals[5]=0;}
394   else if (vals[5]>num_nodes)
395     {vals[5]=num_nodes;}
396   gs->level = vals[5];
397 
398   return(gs);
399 }
400 
401 /******************************************************************************/
402 static PetscErrorCode gsi_via_bit_mask(gs_id *gs)
403 {
404    PetscInt i, nel, *elms;
405   PetscInt t1;
406   PetscInt **reduce;
407   PetscInt *map;
408   PetscErrorCode ierr;
409 
410   PetscFunctionBegin;
411   /* totally local removes ... ct_bits == 0 */
412   get_ngh_buf(gs);
413 
414   if (gs->level)
415     {set_pairwise(gs);}
416 
417   if (gs->max_left_over)
418     {set_tree(gs);}
419 
420   /* intersection local and pairwise/tree? */
421   gs->num_local_total = gs->num_local;
422   gs->gop_local_reduce = gs->local_reduce;
423   gs->num_gop_local_reduce = gs->num_local_reduce;
424 
425   map = gs->companion;
426 
427   /* is there any local compression */
428   if (!gs->num_local) {
429     gs->local_strength = NONE;
430     gs->num_local_gop = 0;
431   } else {
432       /* ok find intersection */
433       map = gs->companion;
434       reduce = gs->local_reduce;
435       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
436         {
437           if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
438               ||
439               ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
440             {
441               t1++;
442               if (gs->num_local_reduce[i]<=0) SETERRQ(PETSC_COMM_SELF,PETSC_ERR_PLIB,"nobody in list?");
443               gs->num_local_reduce[i] *= -1;
444             }
445            **reduce=map[**reduce];
446         }
447 
448       /* intersection is empty */
449       if (!t1)
450         {
451           gs->local_strength = FULL;
452           gs->num_local_gop = 0;
453         }
454       /* intersection not empty */
455       else
456         {
457           gs->local_strength = PARTIAL;
458           ierr = SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce, gs->num_local + 1, SORT_INT_PTR);CHKERRQ(ierr);
459 
460           gs->num_local_gop = t1;
461           gs->num_local_total =  gs->num_local;
462           gs->num_local    -= t1;
463           gs->gop_local_reduce = gs->local_reduce;
464           gs->num_gop_local_reduce = gs->num_local_reduce;
465 
466           for (i=0; i<t1; i++)
467             {
468               if (gs->num_gop_local_reduce[i]>=0) SETERRQ(PETSC_COMM_SELF,PETSC_ERR_PLIB,"they aren't negative?");
469               gs->num_gop_local_reduce[i] *= -1;
470               gs->local_reduce++;
471               gs->num_local_reduce++;
472             }
473           gs->local_reduce++;
474           gs->num_local_reduce++;
475         }
476     }
477 
478   elms = gs->pw_elm_list;
479   nel  = gs->len_pw_list;
480   for (i=0; i<nel; i++)
481     {elms[i] = map[elms[i]];}
482 
483   elms = gs->tree_map_in;
484   nel  = gs->tree_map_sz;
485   for (i=0; i<nel; i++)
486     {elms[i] = map[elms[i]];}
487 
488   /* clean up */
489   free((void*) gs->local_elms);
490   free((void*) gs->companion);
491   free((void*) gs->elms);
492   free((void*) gs->ngh_buf);
493   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
494   PetscFunctionReturn(0);
495 }
496 
497 /******************************************************************************/
498 static PetscErrorCode place_in_tree( PetscInt elm)
499 {
500    PetscInt *tp, n;
501 
502   PetscFunctionBegin;
503   if (ntree==tree_buf_sz)
504     {
505       if (tree_buf_sz)
506         {
507           tp = tree_buf;
508           n = tree_buf_sz;
509           tree_buf_sz<<=1;
510           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
511           ivec_copy(tree_buf,tp,n);
512           free(tp);
513         }
514       else
515         {
516           tree_buf_sz = TREE_BUF_SZ;
517           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
518         }
519     }
520 
521   tree_buf[ntree++] = elm;
522   PetscFunctionReturn(0);
523 }
524 
525 /******************************************************************************/
526 static PetscErrorCode get_ngh_buf(gs_id *gs)
527 {
528    PetscInt i, j, npw=0, ntree_map=0;
529   PetscInt p_mask_size, ngh_buf_size, buf_size;
530   PetscInt *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
531   PetscInt *ngh_buf, *buf1, *buf2;
532   PetscInt offset, per_load, num_loads, or_ct, start, end;
533   PetscInt *ptr1, *ptr2, i_start, negl, nel, *elms;
534   PetscInt oper=GL_B_OR;
535   PetscInt *ptr3, *t_mask, level, ct1, ct2;
536   PetscErrorCode ierr;
537 
538   PetscFunctionBegin;
539   /* to make life easier */
540   nel   = gs->nel;
541   elms  = gs->elms;
542   level = gs->level;
543 
544   /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
545   p_mask = (PetscInt*) malloc(p_mask_size=len_bit_mask(num_nodes));
546   ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);
547 
548   /* allocate space for masks and info bufs */
549   gs->nghs = sh_proc_mask = (PetscInt*) malloc(p_mask_size);
550   gs->pw_nghs = pw_sh_proc_mask = (PetscInt*) malloc(p_mask_size);
551   gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
552   t_mask = (PetscInt*) malloc(p_mask_size);
553   gs->ngh_buf = ngh_buf = (PetscInt*) malloc(ngh_buf_size);
554 
555   /* comm buffer size ... memory usage bounded by ~2*msg_buf */
556   /* had thought I could exploit rendezvous threshold */
557 
558   /* default is one pass */
559   per_load = negl  = gs->negl;
560   gs->num_loads = num_loads = 1;
561   i=p_mask_size*negl;
562 
563   /* possible overflow on buffer size */
564   /* overflow hack                    */
565   if (i<0) {i=INT_MAX;}
566 
567   buf_size = PetscMin(msg_buf,i);
568 
569   /* can we do it? */
570   if (p_mask_size>buf_size) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_PLIB,"get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);
571 
572   /* get giop buf space ... make *only* one malloc */
573   buf1 = (PetscInt*) malloc(buf_size<<1);
574 
575   /* more than one gior exchange needed? */
576   if (buf_size!=i)
577     {
578       per_load = buf_size/p_mask_size;
579       buf_size = per_load*p_mask_size;
580       gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
581     }
582 
583 
584   /* convert buf sizes from #bytes to #ints - 32 bit only! */
585   p_mask_size/=sizeof(PetscInt); ngh_buf_size/=sizeof(PetscInt); buf_size/=sizeof(PetscInt);
586 
587   /* find giop work space */
588   buf2 = buf1+buf_size;
589 
590   /* hold #ints needed for processor masks */
591   gs->mask_sz=p_mask_size;
592 
593   /* init buffers */
594   ierr = ivec_zero(sh_proc_mask,p_mask_size);CHKERRQ(ierr);
595   ierr = ivec_zero(pw_sh_proc_mask,p_mask_size);CHKERRQ(ierr);
596   ierr = ivec_zero(ngh_buf,ngh_buf_size);CHKERRQ(ierr);
597 
598   /* HACK reset tree info */
599   tree_buf=NULL;
600   tree_buf_sz=ntree=0;
601 
602   /* ok do it */
603   for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
604     {
605       /* identity for bitwise or is 000...000 */
606       ivec_zero(buf1,buf_size);
607 
608       /* load msg buffer */
609       for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
610         {
611           offset = (offset-start)*p_mask_size;
612           ivec_copy(buf1+offset,p_mask,p_mask_size);
613         }
614 
615       /* GLOBAL: pass buffer */
616       ierr = giop(buf1,buf2,buf_size,&oper);CHKERRQ(ierr);
617 
618 
619       /* unload buffer into ngh_buf */
620       ptr2=(elms+i_start);
621       for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
622         {
623           /* I own it ... may have to pairwise it */
624           if (j==*ptr2)
625             {
626               /* do i share it w/anyone? */
627               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
628               /* guess not */
629               if (ct1<2)
630                 {ptr2++; ptr1+=p_mask_size; continue;}
631 
632               /* i do ... so keep info and turn off my bit */
633               ivec_copy(ptr1,ptr3,p_mask_size);
634               ierr = ivec_xor(ptr1,p_mask,p_mask_size);CHKERRQ(ierr);
635               ierr = ivec_or(sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
636 
637               /* is it to be done pairwise? */
638               if (--ct1<=level)
639                 {
640                   npw++;
641 
642                   /* turn on high bit to indicate pw need to process */
643                   *ptr2++ |= TOP_BIT;
644                   ierr = ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
645                   ptr1+=p_mask_size;
646                   continue;
647                 }
648 
649               /* get set for next and note that I have a tree contribution */
650               /* could save exact elm index for tree here -> save a search */
651               ptr2++; ptr1+=p_mask_size; ntree_map++;
652             }
653           /* i don't but still might be involved in tree */
654           else
655             {
656 
657               /* shared by how many? */
658               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
659 
660               /* none! */
661               if (ct1<2) continue;
662 
663               /* is it going to be done pairwise? but not by me of course!*/
664               if (--ct1<=level) continue;
665             }
666           /* LATER we're going to have to process it NOW */
667           /* nope ... tree it */
668           ierr = place_in_tree(j);CHKERRQ(ierr);
669         }
670     }
671 
672   free((void*)t_mask);
673   free((void*)buf1);
674 
675   gs->len_pw_list=npw;
676   gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
677 
678   /* expand from bit mask list to int list and save ngh list */
679   gs->nghs = (PetscInt*) malloc(gs->num_nghs * sizeof(PetscInt));
680   bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),gs->nghs);
681 
682   gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*sizeof(PetscInt));
683 
684   oper = GL_MAX;
685   ct1 = gs->num_nghs;
686   ierr = giop(&ct1,&ct2,1,&oper);CHKERRQ(ierr);
687   gs->max_nghs = ct1;
688 
689   gs->tree_map_sz  = ntree_map;
690   gs->max_left_over=ntree;
691 
692   free((void*)p_mask);
693   free((void*)sh_proc_mask);
694   PetscFunctionReturn(0);
695 }
696 
697 /******************************************************************************/
698 static PetscErrorCode set_pairwise(gs_id *gs)
699 {
700    PetscInt i, j;
701   PetscInt p_mask_size;
702   PetscInt *p_mask, *sh_proc_mask, *tmp_proc_mask;
703   PetscInt *ngh_buf, *buf2;
704   PetscInt offset;
705   PetscInt *msg_list, *msg_size, **msg_nodes, nprs;
706   PetscInt *pairwise_elm_list, len_pair_list=0;
707   PetscInt *iptr, t1, i_start, nel, *elms;
708   PetscInt ct;
709   PetscErrorCode ierr;
710 
711   PetscFunctionBegin;
712   /* to make life easier */
713   nel  = gs->nel;
714   elms = gs->elms;
715   ngh_buf = gs->ngh_buf;
716   sh_proc_mask  = gs->pw_nghs;
717 
718   /* need a few temp masks */
719   p_mask_size   = len_bit_mask(num_nodes);
720   p_mask        = (PetscInt*) malloc(p_mask_size);
721   tmp_proc_mask = (PetscInt*) malloc(p_mask_size);
722 
723   /* set mask to my my_id's bit mask */
724   ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);
725 
726   p_mask_size /= sizeof(PetscInt);
727 
728   len_pair_list=gs->len_pw_list;
729   gs->pw_elm_list=pairwise_elm_list=(PetscInt*)malloc((len_pair_list+1)*sizeof(PetscInt));
730 
731   /* how many processors (nghs) do we have to exchange with? */
732   nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
733 
734 
735   /* allocate space for gs_gop() info */
736   gs->pair_list = msg_list = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
737   gs->msg_sizes = msg_size  = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
738   gs->node_list = msg_nodes = (PetscInt **) malloc(sizeof(PetscInt*)*(nprs+1));
739 
740   /* init msg_size list */
741   ierr = ivec_zero(msg_size,nprs);CHKERRQ(ierr);
742 
743   /* expand from bit mask list to int list */
744   ierr = bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),msg_list);CHKERRQ(ierr);
745 
746   /* keep list of elements being handled pairwise */
747   for (i=j=0;i<nel;i++)
748     {
749       if (elms[i] & TOP_BIT)
750         {elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
751     }
752   pairwise_elm_list[j] = -1;
753 
754   gs->msg_ids_out = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
755   gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
756   gs->msg_ids_in = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
757   gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
758   gs->pw_vals = (PetscScalar *) malloc(sizeof(PetscScalar)*len_pair_list*vec_sz);
759 
760   /* find who goes to each processor */
761   for (i_start=i=0;i<nprs;i++)
762     {
763       /* processor i's mask */
764       ierr = set_bit_mask(p_mask,p_mask_size*sizeof(PetscInt),msg_list[i]);CHKERRQ(ierr);
765 
766       /* det # going to processor i */
767       for (ct=j=0;j<len_pair_list;j++)
768         {
769           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
770           ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
771           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
772             {ct++;}
773         }
774       msg_size[i] = ct;
775       i_start = PetscMax(i_start,ct);
776 
777       /*space to hold nodes in message to first neighbor */
778       msg_nodes[i] = iptr = (PetscInt*) malloc(sizeof(PetscInt)*(ct+1));
779 
780       for (j=0;j<len_pair_list;j++)
781         {
782           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
783           ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
784           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
785             {*iptr++ = j;}
786         }
787       *iptr = -1;
788     }
789   msg_nodes[nprs] = NULL;
790 
791   j=gs->loc_node_pairs=i_start;
792   t1 = GL_MAX;
793   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
794   gs->max_node_pairs = i_start;
795 
796   i_start=j;
797   t1 = GL_MIN;
798   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
799   gs->min_node_pairs = i_start;
800 
801   i_start=j;
802   t1 = GL_ADD;
803   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
804   gs->avg_node_pairs = i_start/num_nodes + 1;
805 
806   i_start=nprs;
807   t1 = GL_MAX;
808   giop(&i_start,&offset,1,&t1);
809   gs->max_pairs = i_start;
810 
811 
812   /* remap pairwise in tail of gsi_via_bit_mask() */
813   gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
814   gs->out = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
815   gs->in  = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
816 
817   /* reset malloc pool */
818   free((void*)p_mask);
819   free((void*)tmp_proc_mask);
820   PetscFunctionReturn(0);
821 }
822 
823 /* to do pruned tree just save ngh buf copy for each one and decode here!
824 ******************************************************************************/
825 static PetscErrorCode set_tree(gs_id *gs)
826 {
827   PetscInt i, j, n, nel;
828   PetscInt *iptr_in, *iptr_out, *tree_elms, *elms;
829 
830   PetscFunctionBegin;
831   /* local work ptrs */
832   elms = gs->elms;
833   nel     = gs->nel;
834 
835   /* how many via tree */
836   gs->tree_nel  = n = ntree;
837   gs->tree_elms = tree_elms = iptr_in = tree_buf;
838   gs->tree_buf  = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
839   gs->tree_work = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
840   j=gs->tree_map_sz;
841   gs->tree_map_in = iptr_in  = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
842   gs->tree_map_out = iptr_out = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
843 
844   /* search the longer of the two lists */
845   /* note ... could save this info in get_ngh_buf and save searches */
846   if (n<=nel)
847     {
848       /* bijective fct w/remap - search elm list */
849       for (i=0; i<n; i++)
850         {
851           if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
852             {*iptr_in++ = j; *iptr_out++ = i;}
853         }
854     }
855   else
856     {
857       for (i=0; i<nel; i++)
858         {
859           if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
860             {*iptr_in++ = i; *iptr_out++ = j;}
861         }
862     }
863 
864   /* sentinel */
865   *iptr_in = *iptr_out = -1;
866   PetscFunctionReturn(0);
867 }
868 
869 /******************************************************************************/
870 static PetscErrorCode gs_gop_local_out( gs_id *gs,  PetscScalar *vals)
871 {
872   PetscInt *num, *map, **reduce;
873   PetscScalar tmp;
874 
875   PetscFunctionBegin;
876   num    = gs->num_gop_local_reduce;
877   reduce = gs->gop_local_reduce;
878   while ((map = *reduce++))
879     {
880       /* wall */
881       if (*num == 2)
882         {
883           num ++;
884           vals[map[1]] = vals[map[0]];
885         }
886       /* corner shared by three elements */
887       else if (*num == 3)
888         {
889           num ++;
890           vals[map[2]] = vals[map[1]] = vals[map[0]];
891         }
892       /* corner shared by four elements */
893       else if (*num == 4)
894         {
895           num ++;
896           vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
897         }
898       /* general case ... odd geoms ... 3D*/
899       else
900         {
901           num++;
902           tmp = *(vals + *map++);
903           while (*map >= 0)
904             {*(vals + *map++) = tmp;}
905         }
906     }
907   PetscFunctionReturn(0);
908 }
909 
910 /******************************************************************************/
911 static PetscErrorCode gs_gop_local_plus( gs_id *gs,  PetscScalar *vals)
912 {
913    PetscInt *num, *map, **reduce;
914    PetscScalar tmp;
915 
916   PetscFunctionBegin;
917   num    = gs->num_local_reduce;
918   reduce = gs->local_reduce;
919   while ((map = *reduce))
920     {
921       /* wall */
922       if (*num == 2)
923         {
924           num ++; reduce++;
925           vals[map[1]] = vals[map[0]] += vals[map[1]];
926         }
927       /* corner shared by three elements */
928       else if (*num == 3)
929         {
930           num ++; reduce++;
931           vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
932         }
933       /* corner shared by four elements */
934       else if (*num == 4)
935         {
936           num ++; reduce++;
937           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
938                                  (vals[map[1]] + vals[map[2]] + vals[map[3]]);
939         }
940       /* general case ... odd geoms ... 3D*/
941       else
942         {
943           num ++;
944           tmp = 0.0;
945           while (*map >= 0)
946             {tmp += *(vals + *map++);}
947 
948           map = *reduce++;
949           while (*map >= 0)
950             {*(vals + *map++) = tmp;}
951         }
952     }
953   PetscFunctionReturn(0);
954 }
955 
956 /******************************************************************************/
957 static PetscErrorCode gs_gop_local_in_plus( gs_id *gs,  PetscScalar *vals)
958 {
959    PetscInt *num, *map, **reduce;
960    PetscScalar *base;
961 
962   PetscFunctionBegin;
963   num    = gs->num_gop_local_reduce;
964   reduce = gs->gop_local_reduce;
965   while ((map = *reduce++))
966     {
967       /* wall */
968       if (*num == 2)
969         {
970           num ++;
971           vals[map[0]] += vals[map[1]];
972         }
973       /* corner shared by three elements */
974       else if (*num == 3)
975         {
976           num ++;
977           vals[map[0]] += (vals[map[1]] + vals[map[2]]);
978         }
979       /* corner shared by four elements */
980       else if (*num == 4)
981         {
982           num ++;
983           vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
984         }
985       /* general case ... odd geoms ... 3D*/
986       else
987         {
988           num++;
989           base = vals + *map++;
990           while (*map >= 0)
991             {*base += *(vals + *map++);}
992         }
993     }
994   PetscFunctionReturn(0);
995 }
996 
997 /******************************************************************************/
998 PetscErrorCode gs_free( gs_id *gs)
999 {
1000    PetscInt i;
1001 
1002   PetscFunctionBegin;
1003   if (gs->nghs) {free((void*) gs->nghs);}
1004   if (gs->pw_nghs) {free((void*) gs->pw_nghs);}
1005 
1006   /* tree */
1007   if (gs->max_left_over)
1008     {
1009       if (gs->tree_elms) {free((void*) gs->tree_elms);}
1010       if (gs->tree_buf) {free((void*) gs->tree_buf);}
1011       if (gs->tree_work) {free((void*) gs->tree_work);}
1012       if (gs->tree_map_in) {free((void*) gs->tree_map_in);}
1013       if (gs->tree_map_out) {free((void*) gs->tree_map_out);}
1014     }
1015 
1016   /* pairwise info */
1017   if (gs->num_pairs)
1018     {
1019       /* should be NULL already */
1020       if (gs->ngh_buf) {free((void*) gs->ngh_buf);}
1021       if (gs->elms) {free((void*) gs->elms);}
1022       if (gs->local_elms) {free((void*) gs->local_elms);}
1023       if (gs->companion) {free((void*) gs->companion);}
1024 
1025       /* only set if pairwise */
1026       if (gs->vals) {free((void*) gs->vals);}
1027       if (gs->in) {free((void*) gs->in);}
1028       if (gs->out) {free((void*) gs->out);}
1029       if (gs->msg_ids_in) {free((void*) gs->msg_ids_in);}
1030       if (gs->msg_ids_out) {free((void*) gs->msg_ids_out);}
1031       if (gs->pw_vals) {free((void*) gs->pw_vals);}
1032       if (gs->pw_elm_list) {free((void*) gs->pw_elm_list);}
1033       if (gs->node_list)
1034         {
1035           for (i=0;i<gs->num_pairs;i++)
1036             {if (gs->node_list[i]) {free((void*) gs->node_list[i]);}}
1037           free((void*) gs->node_list);
1038         }
1039       if (gs->msg_sizes) {free((void*) gs->msg_sizes);}
1040       if (gs->pair_list) {free((void*) gs->pair_list);}
1041     }
1042 
1043   /* local info */
1044   if (gs->num_local_total>=0)
1045     {
1046       for (i=0;i<gs->num_local_total+1;i++)
1047         /*      for (i=0;i<gs->num_local_total;i++) */
1048         {
1049           if (gs->num_gop_local_reduce[i])
1050             {free((void*) gs->gop_local_reduce[i]);}
1051         }
1052     }
1053 
1054   /* if intersection tree/pairwise and local isn't empty */
1055   if (gs->gop_local_reduce) {free((void*) gs->gop_local_reduce);}
1056   if (gs->num_gop_local_reduce) {free((void*) gs->num_gop_local_reduce);}
1057 
1058   free((void*) gs);
1059   PetscFunctionReturn(0);
1060 }
1061 
1062 /******************************************************************************/
1063 PetscErrorCode gs_gop_vec( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt step)
1064 {
1065   PetscErrorCode ierr;
1066 
1067   PetscFunctionBegin;
1068   switch (*op) {
1069   case '+':
1070     gs_gop_vec_plus(gs,vals,step);
1071     break;
1072   default:
1073     ierr = PetscInfo1(0,"gs_gop_vec() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
1074     ierr = PetscInfo(0,"gs_gop_vec() :: default :: plus");CHKERRQ(ierr);
1075     gs_gop_vec_plus(gs,vals,step);
1076     break;
1077   }
1078   PetscFunctionReturn(0);
1079 }
1080 
1081 /******************************************************************************/
1082 static PetscErrorCode gs_gop_vec_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
1083 {
1084   PetscFunctionBegin;
1085   if (!gs) SETERRQ(PETSC_COMM_SELF,PETSC_ERR_PLIB,"gs_gop_vec() passed NULL gs handle!!!");
1086 
1087   /* local only operations!!! */
1088   if (gs->num_local)
1089     {gs_gop_vec_local_plus(gs,vals,step);}
1090 
1091   /* if intersection tree/pairwise and local isn't empty */
1092   if (gs->num_local_gop)
1093     {
1094       gs_gop_vec_local_in_plus(gs,vals,step);
1095 
1096       /* pairwise */
1097       if (gs->num_pairs)
1098         {gs_gop_vec_pairwise_plus(gs,vals,step);}
1099 
1100       /* tree */
1101       else if (gs->max_left_over)
1102         {gs_gop_vec_tree_plus(gs,vals,step);}
1103 
1104       gs_gop_vec_local_out(gs,vals,step);
1105     }
1106   /* if intersection tree/pairwise and local is empty */
1107   else
1108     {
1109       /* pairwise */
1110       if (gs->num_pairs)
1111         {gs_gop_vec_pairwise_plus(gs,vals,step);}
1112 
1113       /* tree */
1114       else if (gs->max_left_over)
1115         {gs_gop_vec_tree_plus(gs,vals,step);}
1116     }
1117   PetscFunctionReturn(0);
1118 }
1119 
1120 /******************************************************************************/
1121 static PetscErrorCode gs_gop_vec_local_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
1122 {
1123    PetscInt *num, *map, **reduce;
1124    PetscScalar *base;
1125 
1126   PetscFunctionBegin;
1127   num    = gs->num_local_reduce;
1128   reduce = gs->local_reduce;
1129   while ((map = *reduce))
1130     {
1131       base = vals + map[0] * step;
1132 
1133       /* wall */
1134       if (*num == 2)
1135         {
1136           num++; reduce++;
1137           rvec_add (base,vals+map[1]*step,step);
1138           rvec_copy(vals+map[1]*step,base,step);
1139         }
1140       /* corner shared by three elements */
1141       else if (*num == 3)
1142         {
1143           num++; reduce++;
1144           rvec_add (base,vals+map[1]*step,step);
1145           rvec_add (base,vals+map[2]*step,step);
1146           rvec_copy(vals+map[2]*step,base,step);
1147           rvec_copy(vals+map[1]*step,base,step);
1148         }
1149       /* corner shared by four elements */
1150       else if (*num == 4)
1151         {
1152           num++; reduce++;
1153           rvec_add (base,vals+map[1]*step,step);
1154           rvec_add (base,vals+map[2]*step,step);
1155           rvec_add (base,vals+map[3]*step,step);
1156           rvec_copy(vals+map[3]*step,base,step);
1157           rvec_copy(vals+map[2]*step,base,step);
1158           rvec_copy(vals+map[1]*step,base,step);
1159         }
1160       /* general case ... odd geoms ... 3D */
1161       else
1162         {
1163           num++;
1164           while (*++map >= 0)
1165             {rvec_add (base,vals+*map*step,step);}
1166 
1167           map = *reduce;
1168           while (*++map >= 0)
1169             {rvec_copy(vals+*map*step,base,step);}
1170 
1171           reduce++;
1172         }
1173     }
1174   PetscFunctionReturn(0);
1175 }
1176 
1177 /******************************************************************************/
1178 static PetscErrorCode gs_gop_vec_local_in_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
1179 {
1180    PetscInt  *num, *map, **reduce;
1181    PetscScalar *base;
1182   PetscFunctionBegin;
1183   num    = gs->num_gop_local_reduce;
1184   reduce = gs->gop_local_reduce;
1185   while ((map = *reduce++))
1186     {
1187       base = vals + map[0] * step;
1188 
1189       /* wall */
1190       if (*num == 2)
1191         {
1192           num ++;
1193           rvec_add(base,vals+map[1]*step,step);
1194         }
1195       /* corner shared by three elements */
1196       else if (*num == 3)
1197         {
1198           num ++;
1199           rvec_add(base,vals+map[1]*step,step);
1200           rvec_add(base,vals+map[2]*step,step);
1201         }
1202       /* corner shared by four elements */
1203       else if (*num == 4)
1204         {
1205           num ++;
1206           rvec_add(base,vals+map[1]*step,step);
1207           rvec_add(base,vals+map[2]*step,step);
1208           rvec_add(base,vals+map[3]*step,step);
1209         }
1210       /* general case ... odd geoms ... 3D*/
1211       else
1212         {
1213           num++;
1214           while (*++map >= 0)
1215             {rvec_add(base,vals+*map*step,step);}
1216         }
1217     }
1218   PetscFunctionReturn(0);
1219 }
1220 
1221 /******************************************************************************/
1222 static PetscErrorCode gs_gop_vec_local_out( gs_id *gs,  PetscScalar *vals, PetscInt step)
1223 {
1224    PetscInt *num, *map, **reduce;
1225    PetscScalar *base;
1226 
1227   PetscFunctionBegin;
1228   num    = gs->num_gop_local_reduce;
1229   reduce = gs->gop_local_reduce;
1230   while ((map = *reduce++))
1231     {
1232       base = vals + map[0] * step;
1233 
1234       /* wall */
1235       if (*num == 2)
1236         {
1237           num ++;
1238           rvec_copy(vals+map[1]*step,base,step);
1239         }
1240       /* corner shared by three elements */
1241       else if (*num == 3)
1242         {
1243           num ++;
1244           rvec_copy(vals+map[1]*step,base,step);
1245           rvec_copy(vals+map[2]*step,base,step);
1246         }
1247       /* corner shared by four elements */
1248       else if (*num == 4)
1249         {
1250           num ++;
1251           rvec_copy(vals+map[1]*step,base,step);
1252           rvec_copy(vals+map[2]*step,base,step);
1253           rvec_copy(vals+map[3]*step,base,step);
1254         }
1255       /* general case ... odd geoms ... 3D*/
1256       else
1257         {
1258           num++;
1259           while (*++map >= 0)
1260             {rvec_copy(vals+*map*step,base,step);}
1261         }
1262     }
1263   PetscFunctionReturn(0);
1264 }
1265 
1266 /******************************************************************************/
1267 static PetscErrorCode gs_gop_vec_pairwise_plus( gs_id *gs,  PetscScalar *in_vals, PetscInt step)
1268 {
1269   PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1270   PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
1271   PetscInt *pw, *list, *size, **nodes;
1272   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1273   MPI_Status status;
1274   PetscBLASInt i1 = 1,dstep;
1275   PetscErrorCode ierr;
1276 
1277   PetscFunctionBegin;
1278   /* strip and load s */
1279   msg_list =list         = gs->pair_list;
1280   msg_size =size         = gs->msg_sizes;
1281   msg_nodes=nodes        = gs->node_list;
1282   iptr=pw                = gs->pw_elm_list;
1283   dptr1=dptr3            = gs->pw_vals;
1284   msg_ids_in  = ids_in   = gs->msg_ids_in;
1285   msg_ids_out = ids_out  = gs->msg_ids_out;
1286   dptr2                  = gs->out;
1287   in1=in2                = gs->in;
1288 
1289   /* post the receives */
1290   /*  msg_nodes=nodes; */
1291   do
1292     {
1293       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1294          second one *list and do list++ afterwards */
1295       ierr = MPI_Irecv(in1, *size *step, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list, gs->gs_comm, msg_ids_in);CHKERRQ(ierr);
1296       list++;msg_ids_in++;
1297       in1 += *size++ *step;
1298     }
1299   while (*++msg_nodes);
1300   msg_nodes=nodes;
1301 
1302   /* load gs values into in out gs buffers */
1303   while (*iptr >= 0)
1304     {
1305       rvec_copy(dptr3,in_vals + *iptr*step,step);
1306       dptr3+=step;
1307       iptr++;
1308     }
1309 
1310   /* load out buffers and post the sends */
1311   while ((iptr = *msg_nodes++))
1312     {
1313       dptr3 = dptr2;
1314       while (*iptr >= 0)
1315         {
1316           rvec_copy(dptr2,dptr1 + *iptr*step,step);
1317           dptr2+=step;
1318           iptr++;
1319         }
1320       ierr = MPI_Isend(dptr3, *msg_size *step, MPIU_SCALAR, *msg_list, MSGTAG1+my_id, gs->gs_comm, msg_ids_out);CHKERRQ(ierr);
1321       msg_size++; msg_list++;msg_ids_out++;
1322     }
1323 
1324   /* tree */
1325   if (gs->max_left_over)
1326     {gs_gop_vec_tree_plus(gs,in_vals,step);}
1327 
1328   /* process the received data */
1329   msg_nodes=nodes;
1330   while ((iptr = *nodes++)){
1331     PetscScalar d1 = 1.0;
1332       /* Should I check the return value of MPI_Wait() or status? */
1333       /* Can this loop be replaced by a call to MPI_Waitall()? */
1334       ierr = MPI_Wait(ids_in, &status);CHKERRQ(ierr);
1335       ids_in++;
1336       while (*iptr >= 0) {
1337 	dstep = PetscBLASIntCast(step);
1338         BLASaxpy_(&dstep,&d1,in2,&i1,dptr1 + *iptr*step,&i1);
1339 	in2+=step;
1340 	iptr++;
1341       }
1342   }
1343 
1344   /* replace vals */
1345   while (*pw >= 0)
1346     {
1347       rvec_copy(in_vals + *pw*step,dptr1,step);
1348       dptr1+=step;
1349       pw++;
1350     }
1351 
1352   /* clear isend message handles */
1353   /* This changed for clarity though it could be the same */
1354   while (*msg_nodes++)
1355     /* Should I check the return value of MPI_Wait() or status? */
1356     /* Can this loop be replaced by a call to MPI_Waitall()? */
1357     {ierr = MPI_Wait(ids_out, &status);CHKERRQ(ierr);ids_out++;}
1358 
1359 
1360   PetscFunctionReturn(0);
1361 }
1362 
1363 /******************************************************************************/
1364 static PetscErrorCode gs_gop_vec_tree_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
1365 {
1366   PetscInt size, *in, *out;
1367   PetscScalar *buf, *work;
1368   PetscInt op[] = {GL_ADD,0};
1369   PetscBLASInt i1 = 1;
1370 
1371   PetscFunctionBegin;
1372   /* copy over to local variables */
1373   in   = gs->tree_map_in;
1374   out  = gs->tree_map_out;
1375   buf  = gs->tree_buf;
1376   work = gs->tree_work;
1377   size = gs->tree_nel*step;
1378 
1379   /* zero out collection buffer */
1380   rvec_zero(buf,size);
1381 
1382 
1383   /* copy over my contributions */
1384   while (*in >= 0)
1385     {
1386       PetscBLASInt dstep = PetscBLASIntCast(step);
1387       BLAScopy_(&dstep,vals + *in++*step,&i1,buf + *out++*step,&i1);
1388     }
1389 
1390   /* perform fan in/out on full buffer */
1391   /* must change grop to handle the blas */
1392   grop(buf,work,size,op);
1393 
1394   /* reset */
1395   in   = gs->tree_map_in;
1396   out  = gs->tree_map_out;
1397 
1398   /* get the portion of the results I need */
1399   while (*in >= 0)
1400     {
1401       PetscBLASInt dstep = PetscBLASIntCast(step);
1402       BLAScopy_(&dstep,buf + *out++*step,&i1,vals + *in++*step,&i1);
1403     }
1404   PetscFunctionReturn(0);
1405 }
1406 
1407 /******************************************************************************/
1408 PetscErrorCode gs_gop_hc( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt dim)
1409 {
1410   PetscErrorCode ierr;
1411 
1412   PetscFunctionBegin;
1413   switch (*op) {
1414   case '+':
1415     gs_gop_plus_hc(gs,vals,dim);
1416     break;
1417   default:
1418     ierr = PetscInfo1(0,"gs_gop_hc() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
1419     ierr = PetscInfo(0,"gs_gop_hc() :: default :: plus\n");CHKERRQ(ierr);
1420     gs_gop_plus_hc(gs,vals,dim);
1421     break;
1422   }
1423   PetscFunctionReturn(0);
1424 }
1425 
1426 /******************************************************************************/
1427 static PetscErrorCode gs_gop_plus_hc( gs_id *gs,  PetscScalar *vals, PetscInt dim)
1428 {
1429   PetscFunctionBegin;
1430   /* if there's nothing to do return */
1431   if (dim<=0)
1432     {  PetscFunctionReturn(0);}
1433 
1434   /* can't do more dimensions then exist */
1435   dim = PetscMin(dim,i_log2_num_nodes);
1436 
1437   /* local only operations!!! */
1438   if (gs->num_local)
1439     {gs_gop_local_plus(gs,vals);}
1440 
1441   /* if intersection tree/pairwise and local isn't empty */
1442   if (gs->num_local_gop)
1443     {
1444       gs_gop_local_in_plus(gs,vals);
1445 
1446       /* pairwise will do tree inside ... */
1447       if (gs->num_pairs)
1448         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
1449 
1450       /* tree only */
1451       else if (gs->max_left_over)
1452         {gs_gop_tree_plus_hc(gs,vals,dim);}
1453 
1454       gs_gop_local_out(gs,vals);
1455     }
1456   /* if intersection tree/pairwise and local is empty */
1457   else
1458     {
1459       /* pairwise will do tree inside */
1460       if (gs->num_pairs)
1461         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
1462 
1463       /* tree */
1464       else if (gs->max_left_over)
1465         {gs_gop_tree_plus_hc(gs,vals,dim);}
1466     }
1467   PetscFunctionReturn(0);
1468 }
1469 
1470 /******************************************************************************/
1471 static PetscErrorCode gs_gop_pairwise_plus_hc( gs_id *gs,  PetscScalar *in_vals, PetscInt dim)
1472 {
1473    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1474    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
1475    PetscInt *pw, *list, *size, **nodes;
1476   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1477   MPI_Status status;
1478   PetscInt i, mask=1;
1479   PetscErrorCode ierr;
1480 
1481   PetscFunctionBegin;
1482   for (i=1; i<dim; i++)
1483     {mask<<=1; mask++;}
1484 
1485 
1486   /* strip and load s */
1487   msg_list =list         = gs->pair_list;
1488   msg_size =size         = gs->msg_sizes;
1489   msg_nodes=nodes        = gs->node_list;
1490   iptr=pw                = gs->pw_elm_list;
1491   dptr1=dptr3            = gs->pw_vals;
1492   msg_ids_in  = ids_in   = gs->msg_ids_in;
1493   msg_ids_out = ids_out  = gs->msg_ids_out;
1494   dptr2                  = gs->out;
1495   in1=in2                = gs->in;
1496 
1497   /* post the receives */
1498   /*  msg_nodes=nodes; */
1499   do
1500     {
1501       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1502          second one *list and do list++ afterwards */
1503       if ((my_id|mask)==(*list|mask))
1504         {
1505           ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list, gs->gs_comm, msg_ids_in);CHKERRQ(ierr);
1506           list++; msg_ids_in++;in1 += *size++;
1507         }
1508       else
1509         {list++; size++;}
1510     }
1511   while (*++msg_nodes);
1512 
1513   /* load gs values into in out gs buffers */
1514   while (*iptr >= 0)
1515     {*dptr3++ = *(in_vals + *iptr++);}
1516 
1517   /* load out buffers and post the sends */
1518   msg_nodes=nodes;
1519   list = msg_list;
1520   while ((iptr = *msg_nodes++))
1521     {
1522       if ((my_id|mask)==(*list|mask))
1523         {
1524           dptr3 = dptr2;
1525           while (*iptr >= 0)
1526             {*dptr2++ = *(dptr1 + *iptr++);}
1527           /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1528           /* is msg_ids_out++ correct? */
1529           ierr = MPI_Isend(dptr3, *msg_size, MPIU_SCALAR, *list, MSGTAG1+my_id, gs->gs_comm, msg_ids_out);CHKERRQ(ierr);
1530           msg_size++;list++;msg_ids_out++;
1531         }
1532       else
1533         {list++; msg_size++;}
1534     }
1535 
1536   /* do the tree while we're waiting */
1537   if (gs->max_left_over)
1538     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
1539 
1540   /* process the received data */
1541   msg_nodes=nodes;
1542   list = msg_list;
1543   while ((iptr = *nodes++))
1544     {
1545       if ((my_id|mask)==(*list|mask))
1546         {
1547           /* Should I check the return value of MPI_Wait() or status? */
1548           /* Can this loop be replaced by a call to MPI_Waitall()? */
1549           ierr = MPI_Wait(ids_in, &status);CHKERRQ(ierr);
1550           ids_in++;
1551           while (*iptr >= 0)
1552             {*(dptr1 + *iptr++) += *in2++;}
1553         }
1554       list++;
1555     }
1556 
1557   /* replace vals */
1558   while (*pw >= 0)
1559     {*(in_vals + *pw++) = *dptr1++;}
1560 
1561   /* clear isend message handles */
1562   /* This changed for clarity though it could be the same */
1563   while (*msg_nodes++)
1564     {
1565       if ((my_id|mask)==(*msg_list|mask))
1566         {
1567           /* Should I check the return value of MPI_Wait() or status? */
1568           /* Can this loop be replaced by a call to MPI_Waitall()? */
1569           ierr = MPI_Wait(ids_out, &status);CHKERRQ(ierr);
1570           ids_out++;
1571         }
1572       msg_list++;
1573     }
1574 
1575   PetscFunctionReturn(0);
1576 }
1577 
1578 /******************************************************************************/
1579 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim)
1580 {
1581   PetscInt size;
1582   PetscInt *in, *out;
1583   PetscScalar *buf, *work;
1584   PetscInt op[] = {GL_ADD,0};
1585 
1586   PetscFunctionBegin;
1587   in   = gs->tree_map_in;
1588   out  = gs->tree_map_out;
1589   buf  = gs->tree_buf;
1590   work = gs->tree_work;
1591   size = gs->tree_nel;
1592 
1593   rvec_zero(buf,size);
1594 
1595   while (*in >= 0)
1596     {*(buf + *out++) = *(vals + *in++);}
1597 
1598   in   = gs->tree_map_in;
1599   out  = gs->tree_map_out;
1600 
1601   grop_hc(buf,work,size,op,dim);
1602 
1603   while (*in >= 0)
1604     {*(vals + *in++) = *(buf + *out++);}
1605   PetscFunctionReturn(0);
1606 }
1607 
1608 
1609 
1610