xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision 84df9cb40eca90ea9b18a456fab7a4ecc7f6c1a4)
1 
2 /***********************************gs.c***************************************
3 
4 Author: Henry M. Tufo III
5 
6 e-mail: hmt@cs.brown.edu
7 
8 snail-mail:
9 Division of Applied Mathematics
10 Brown University
11 Providence, RI 02912
12 
13 Last Modification:
14 6.21.97
15 ************************************gs.c**************************************/
16 
17 /***********************************gs.c***************************************
18 File Description:
19 -----------------
20 
21 ************************************gs.c**************************************/
22 
23 #include <../src/ksp/pc/impls/tfs/tfs.h>
24 
25 /* default length of number of items via tree - doubles if exceeded */
26 #define TREE_BUF_SZ 2048;
27 #define GS_VEC_SZ   1
28 
29 
30 
31 /***********************************gs.c***************************************
32 Type: struct gather_scatter_id
33 ------------------------------
34 
35 ************************************gs.c**************************************/
36 typedef struct gather_scatter_id {
37   PetscInt id;
38   PetscInt nel_min;
39   PetscInt nel_max;
40   PetscInt nel_sum;
41   PetscInt negl;
42   PetscInt gl_max;
43   PetscInt gl_min;
44   PetscInt repeats;
45   PetscInt ordered;
46   PetscInt positive;
47   PetscScalar *vals;
48 
49   /* bit mask info */
50   PetscInt *my_proc_mask;
51   PetscInt mask_sz;
52   PetscInt *ngh_buf;
53   PetscInt ngh_buf_sz;
54   PetscInt *nghs;
55   PetscInt num_nghs;
56   PetscInt max_nghs;
57   PetscInt *pw_nghs;
58   PetscInt num_pw_nghs;
59   PetscInt *tree_nghs;
60   PetscInt num_tree_nghs;
61 
62   PetscInt num_loads;
63 
64   /* repeats == true -> local info */
65   PetscInt nel;         /* number of unique elememts */
66   PetscInt *elms;       /* of size nel */
67   PetscInt nel_total;
68   PetscInt *local_elms; /* of size nel_total */
69   PetscInt *companion;  /* of size nel_total */
70 
71   /* local info */
72   PetscInt num_local_total;
73   PetscInt local_strength;
74   PetscInt num_local;
75   PetscInt *num_local_reduce;
76   PetscInt **local_reduce;
77   PetscInt num_local_gop;
78   PetscInt *num_gop_local_reduce;
79   PetscInt **gop_local_reduce;
80 
81   /* pairwise info */
82   PetscInt level;
83   PetscInt num_pairs;
84   PetscInt max_pairs;
85   PetscInt loc_node_pairs;
86   PetscInt max_node_pairs;
87   PetscInt min_node_pairs;
88   PetscInt avg_node_pairs;
89   PetscInt *pair_list;
90   PetscInt *msg_sizes;
91   PetscInt **node_list;
92   PetscInt len_pw_list;
93   PetscInt *pw_elm_list;
94   PetscScalar *pw_vals;
95 
96   MPI_Request *msg_ids_in;
97   MPI_Request *msg_ids_out;
98 
99   PetscScalar *out;
100   PetscScalar *in;
101   PetscInt msg_total;
102 
103   /* tree - crystal accumulator info */
104   PetscInt max_left_over;
105   PetscInt *pre;
106   PetscInt *in_num;
107   PetscInt *out_num;
108   PetscInt **in_list;
109   PetscInt **out_list;
110 
111   /* new tree work*/
112   PetscInt  tree_nel;
113   PetscInt *tree_elms;
114   PetscScalar *tree_buf;
115   PetscScalar *tree_work;
116 
117   PetscInt  tree_map_sz;
118   PetscInt *tree_map_in;
119   PetscInt *tree_map_out;
120 
121   /* current memory status */
122   PetscInt gl_bss_min;
123   PetscInt gl_perm_min;
124 
125   /* max segment size for gs_gop_vec() */
126   PetscInt vec_sz;
127 
128   /* hack to make paul happy */
129   MPI_Comm gs_comm;
130 
131 } gs_id;
132 
133 static gs_id *gsi_check_args(PetscInt *elms, PetscInt nel, PetscInt level);
134 static PetscErrorCode gsi_via_bit_mask(gs_id *gs);
135 static PetscErrorCode get_ngh_buf(gs_id *gs);
136 static PetscErrorCode set_pairwise(gs_id *gs);
137 static gs_id * gsi_new(void);
138 static PetscErrorCode set_tree(gs_id *gs);
139 
140 /* same for all but vector flavor */
141 static PetscErrorCode gs_gop_local_out(gs_id *gs, PetscScalar *vals);
142 /* vector flavor */
143 static PetscErrorCode gs_gop_vec_local_out(gs_id *gs, PetscScalar *vals, PetscInt step);
144 
145 static PetscErrorCode gs_gop_vec_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
146 static PetscErrorCode gs_gop_vec_pairwise_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
147 static PetscErrorCode gs_gop_vec_local_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
148 static PetscErrorCode gs_gop_vec_local_in_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
149 static PetscErrorCode gs_gop_vec_tree_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
150 
151 
152 static PetscErrorCode gs_gop_local_plus(gs_id *gs, PetscScalar *vals);
153 static PetscErrorCode gs_gop_local_in_plus(gs_id *gs, PetscScalar *vals);
154 
155 static PetscErrorCode gs_gop_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
156 static PetscErrorCode gs_gop_pairwise_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
157 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim);
158 
159 /* global vars */
160 /* from comm.c module */
161 
162 static PetscInt num_gs_ids = 0;
163 
164 /* should make this dynamic ... later */
165 static PetscInt msg_buf=MAX_MSG_BUF;
166 static PetscInt vec_sz=GS_VEC_SZ;
167 static PetscInt *tree_buf=NULL;
168 static PetscInt tree_buf_sz=0;
169 static PetscInt ntree=0;
170 
171 /***************************************************************************/
172 PetscErrorCode gs_init_vec_sz(PetscInt size)
173 {
174   PetscFunctionBegin;
175   vec_sz = size;
176   PetscFunctionReturn(0);
177 }
178 
179 /******************************************************************************/
180 PetscErrorCode gs_init_msg_buf_sz(PetscInt buf_size)
181 {
182   PetscFunctionBegin;
183   msg_buf = buf_size;
184   PetscFunctionReturn(0);
185 }
186 
187 /******************************************************************************/
188 gs_id *gs_init( PetscInt *elms, PetscInt nel, PetscInt level)
189 {
190    gs_id *gs;
191   MPI_Group gs_group;
192   MPI_Comm  gs_comm;
193   PetscErrorCode ierr;
194 
195   PetscFunctionBegin;
196   /* ensure that communication package has been initialized */
197   comm_init();
198 
199 
200   /* determines if we have enough dynamic/semi-static memory */
201   /* checks input, allocs and sets gd_id template            */
202   gs = gsi_check_args(elms,nel,level);
203 
204   /* only bit mask version up and working for the moment    */
205   /* LATER :: get int list version working for sparse pblms */
206   ierr = gsi_via_bit_mask(gs);CHKERRABORT(PETSC_COMM_WORLD,ierr);
207 
208 
209   ierr = MPI_Comm_group(MPI_COMM_WORLD,&gs_group);CHKERRABORT(PETSC_COMM_WORLD,ierr);
210   ierr = MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);CHKERRABORT(PETSC_COMM_WORLD,ierr);
211   gs->gs_comm=gs_comm;
212 
213   return(gs);
214 }
215 
216 /******************************************************************************/
217 static gs_id *gsi_new(void)
218 {
219   PetscErrorCode ierr;
220   gs_id *gs;
221   gs = (gs_id *) malloc(sizeof(gs_id));
222   ierr = PetscMemzero(gs,sizeof(gs_id));CHKERRABORT(PETSC_COMM_WORLD,ierr);
223   return(gs);
224 }
225 
226 /******************************************************************************/
227 static gs_id * gsi_check_args(PetscInt *in_elms, PetscInt nel, PetscInt level)
228 {
229    PetscInt i, j, k, t2;
230   PetscInt *companion, *elms, *unique, *iptr;
231   PetscInt num_local=0, *num_to_reduce, **local_reduce;
232   PetscInt oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
233   PetscInt vals[sizeof(oprs)/sizeof(oprs[0])-1];
234   PetscInt work[sizeof(oprs)/sizeof(oprs[0])-1];
235   gs_id *gs;
236   PetscErrorCode ierr;
237 
238 
239   if (!in_elms) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"elms point to nothing!!!\n");
240   if (nel<0) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"can't have fewer than 0 elms!!!\n");
241 
242   if (nel==0)
243     {ierr = PetscInfo(0,"I don't have any elements!!!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
244 
245   /* get space for gs template */
246   gs = gsi_new();
247   gs->id = ++num_gs_ids;
248 
249   /* hmt 6.4.99                                            */
250   /* caller can set global ids that don't participate to 0 */
251   /* gs_init ignores all zeros in elm list                 */
252   /* negative global ids are still invalid                 */
253   for (i=j=0;i<nel;i++)
254     {if (in_elms[i]!=0) {j++;}}
255 
256   k=nel; nel=j;
257 
258   /* copy over in_elms list and create inverse map */
259   elms = (PetscInt*) malloc((nel+1)*sizeof(PetscInt));
260   companion = (PetscInt*) malloc(nel*sizeof(PetscInt));
261 
262   for (i=j=0;i<k;i++)
263     {
264       if (in_elms[i]!=0)
265         {elms[j] = in_elms[i]; companion[j++] = i;}
266     }
267 
268   if (j!=nel) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"nel j mismatch!\n");
269 
270   /* pre-pass ... check to see if sorted */
271   elms[nel] = INT_MAX;
272   iptr = elms;
273   unique = elms+1;
274   j=0;
275   while (*iptr!=INT_MAX)
276     {
277       if (*iptr++>*unique++)
278         {j=1; break;}
279     }
280 
281   /* set up inverse map */
282   if (j)
283     {
284       ierr = PetscInfo(0,"gsi_check_args() :: elm list *not* sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);
285       ierr = SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);CHKERRABORT(PETSC_COMM_WORLD,ierr);
286     }
287   else
288     {ierr = PetscInfo(0,"gsi_check_args() :: elm list sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
289   elms[nel] = INT_MIN;
290 
291   /* first pass */
292   /* determine number of unique elements, check pd */
293   for (i=k=0;i<nel;i+=j)
294     {
295       t2 = elms[i];
296       j=++i;
297 
298       /* clump 'em for now */
299       while (elms[j]==t2) {j++;}
300 
301       /* how many together and num local */
302       if (j-=i)
303         {num_local++; k+=j;}
304     }
305 
306   /* how many unique elements? */
307   gs->repeats=k;
308   gs->nel = nel-k;
309 
310 
311   /* number of repeats? */
312   gs->num_local = num_local;
313   num_local+=2;
314   gs->local_reduce=local_reduce=(PetscInt **)malloc(num_local*sizeof(PetscInt*));
315   gs->num_local_reduce=num_to_reduce=(PetscInt*) malloc(num_local*sizeof(PetscInt));
316 
317   unique = (PetscInt*) malloc((gs->nel+1)*sizeof(PetscInt));
318   gs->elms = unique;
319   gs->nel_total = nel;
320   gs->local_elms = elms;
321   gs->companion = companion;
322 
323   /* compess map as well as keep track of local ops */
324   for (num_local=i=j=0;i<gs->nel;i++)
325     {
326       k=j;
327       t2 = unique[i] = elms[j];
328       companion[i] = companion[j];
329 
330       while (elms[j]==t2) {j++;}
331 
332       if ((t2=(j-k))>1)
333         {
334           /* number together */
335           num_to_reduce[num_local] = t2++;
336           iptr = local_reduce[num_local++] = (PetscInt*)malloc(t2*sizeof(PetscInt));
337 
338           /* to use binary searching don't remap until we check intersection */
339           *iptr++ = i;
340 
341           /* note that we're skipping the first one */
342           while (++k<j)
343             {*(iptr++) = companion[k];}
344           *iptr = -1;
345         }
346     }
347 
348   /* sentinel for ngh_buf */
349   unique[gs->nel]=INT_MAX;
350 
351   /* for two partition sort hack */
352   num_to_reduce[num_local] = 0;
353   local_reduce[num_local] = NULL;
354   num_to_reduce[++num_local] = 0;
355   local_reduce[num_local] = NULL;
356 
357   /* load 'em up */
358   /* note one extra to hold NON_UNIFORM flag!!! */
359   vals[2] = vals[1] = vals[0] = nel;
360   if (gs->nel>0)
361     {
362        vals[3] = unique[0];
363        vals[4] = unique[gs->nel-1];
364     }
365   else
366     {
367        vals[3] = INT_MAX;
368        vals[4] = INT_MIN;
369     }
370   vals[5] = level;
371   vals[6] = num_gs_ids;
372 
373   /* GLOBAL: send 'em out */
374   ierr = giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);CHKERRABORT(PETSC_COMM_WORLD,ierr);
375 
376   /* must be semi-pos def - only pairwise depends on this */
377   /* LATER - remove this restriction */
378   if (vals[3]<0) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system not semi-pos def \n");
379   if (vals[4]==INT_MAX) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system ub too large !\n");
380 
381   gs->nel_min = vals[0];
382   gs->nel_max = vals[1];
383   gs->nel_sum = vals[2];
384   gs->gl_min  = vals[3];
385   gs->gl_max  = vals[4];
386   gs->negl    = vals[4]-vals[3]+1;
387 
388   if (gs->negl<=0) SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system empty or neg :: %d\n");
389 
390   /* LATER :: add level == -1 -> program selects level */
391   if (vals[5]<0)
392     {vals[5]=0;}
393   else if (vals[5]>num_nodes)
394     {vals[5]=num_nodes;}
395   gs->level = vals[5];
396 
397   return(gs);
398 }
399 
400 /******************************************************************************/
401 static PetscErrorCode gsi_via_bit_mask(gs_id *gs)
402 {
403    PetscInt i, nel, *elms;
404   PetscInt t1;
405   PetscInt **reduce;
406   PetscInt *map;
407   PetscErrorCode ierr;
408 
409   PetscFunctionBegin;
410   /* totally local removes ... ct_bits == 0 */
411   get_ngh_buf(gs);
412 
413   if (gs->level)
414     {set_pairwise(gs);}
415 
416   if (gs->max_left_over)
417     {set_tree(gs);}
418 
419   /* intersection local and pairwise/tree? */
420   gs->num_local_total = gs->num_local;
421   gs->gop_local_reduce = gs->local_reduce;
422   gs->num_gop_local_reduce = gs->num_local_reduce;
423 
424   map = gs->companion;
425 
426   /* is there any local compression */
427   if (!gs->num_local) {
428     gs->local_strength = NONE;
429     gs->num_local_gop = 0;
430   } else {
431       /* ok find intersection */
432       map = gs->companion;
433       reduce = gs->local_reduce;
434       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
435         {
436           if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
437               ||
438               ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
439             {
440               t1++;
441               if (gs->num_local_reduce[i]<=0) SETERRQ(PETSC_COMM_SELF,PETSC_ERR_PLIB,"nobody in list?");
442               gs->num_local_reduce[i] *= -1;
443             }
444            **reduce=map[**reduce];
445         }
446 
447       /* intersection is empty */
448       if (!t1)
449         {
450           gs->local_strength = FULL;
451           gs->num_local_gop = 0;
452         }
453       /* intersection not empty */
454       else
455         {
456           gs->local_strength = PARTIAL;
457           ierr = SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce, gs->num_local + 1, SORT_INT_PTR);CHKERRQ(ierr);
458 
459           gs->num_local_gop = t1;
460           gs->num_local_total =  gs->num_local;
461           gs->num_local    -= t1;
462           gs->gop_local_reduce = gs->local_reduce;
463           gs->num_gop_local_reduce = gs->num_local_reduce;
464 
465           for (i=0; i<t1; i++)
466             {
467               if (gs->num_gop_local_reduce[i]>=0) SETERRQ(PETSC_COMM_SELF,PETSC_ERR_PLIB,"they aren't negative?");
468               gs->num_gop_local_reduce[i] *= -1;
469               gs->local_reduce++;
470               gs->num_local_reduce++;
471             }
472           gs->local_reduce++;
473           gs->num_local_reduce++;
474         }
475     }
476 
477   elms = gs->pw_elm_list;
478   nel  = gs->len_pw_list;
479   for (i=0; i<nel; i++)
480     {elms[i] = map[elms[i]];}
481 
482   elms = gs->tree_map_in;
483   nel  = gs->tree_map_sz;
484   for (i=0; i<nel; i++)
485     {elms[i] = map[elms[i]];}
486 
487   /* clean up */
488   free((void*) gs->local_elms);
489   free((void*) gs->companion);
490   free((void*) gs->elms);
491   free((void*) gs->ngh_buf);
492   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
493   PetscFunctionReturn(0);
494 }
495 
496 /******************************************************************************/
497 static PetscErrorCode place_in_tree( PetscInt elm)
498 {
499    PetscInt *tp, n;
500 
501   PetscFunctionBegin;
502   if (ntree==tree_buf_sz)
503     {
504       if (tree_buf_sz)
505         {
506           tp = tree_buf;
507           n = tree_buf_sz;
508           tree_buf_sz<<=1;
509           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
510           ivec_copy(tree_buf,tp,n);
511           free(tp);
512         }
513       else
514         {
515           tree_buf_sz = TREE_BUF_SZ;
516           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
517         }
518     }
519 
520   tree_buf[ntree++] = elm;
521   PetscFunctionReturn(0);
522 }
523 
524 /******************************************************************************/
525 static PetscErrorCode get_ngh_buf(gs_id *gs)
526 {
527    PetscInt i, j, npw=0, ntree_map=0;
528   PetscInt p_mask_size, ngh_buf_size, buf_size;
529   PetscInt *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
530   PetscInt *ngh_buf, *buf1, *buf2;
531   PetscInt offset, per_load, num_loads, or_ct, start, end;
532   PetscInt *ptr1, *ptr2, i_start, negl, nel, *elms;
533   PetscInt oper=GL_B_OR;
534   PetscInt *ptr3, *t_mask, level, ct1, ct2;
535   PetscErrorCode ierr;
536 
537   PetscFunctionBegin;
538   /* to make life easier */
539   nel   = gs->nel;
540   elms  = gs->elms;
541   level = gs->level;
542 
543   /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
544   p_mask = (PetscInt*) malloc(p_mask_size=len_bit_mask(num_nodes));
545   ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);
546 
547   /* allocate space for masks and info bufs */
548   gs->nghs = sh_proc_mask = (PetscInt*) malloc(p_mask_size);
549   gs->pw_nghs = pw_sh_proc_mask = (PetscInt*) malloc(p_mask_size);
550   gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
551   t_mask = (PetscInt*) malloc(p_mask_size);
552   gs->ngh_buf = ngh_buf = (PetscInt*) malloc(ngh_buf_size);
553 
554   /* comm buffer size ... memory usage bounded by ~2*msg_buf */
555   /* had thought I could exploit rendezvous threshold */
556 
557   /* default is one pass */
558   per_load = negl  = gs->negl;
559   gs->num_loads = num_loads = 1;
560   i=p_mask_size*negl;
561 
562   /* possible overflow on buffer size */
563   /* overflow hack                    */
564   if (i<0) {i=INT_MAX;}
565 
566   buf_size = PetscMin(msg_buf,i);
567 
568   /* can we do it? */
569   if (p_mask_size>buf_size) SETERRQ2(PETSC_COMM_SELF,PETSC_ERR_PLIB,"get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);
570 
571   /* get giop buf space ... make *only* one malloc */
572   buf1 = (PetscInt*) malloc(buf_size<<1);
573 
574   /* more than one gior exchange needed? */
575   if (buf_size!=i)
576     {
577       per_load = buf_size/p_mask_size;
578       buf_size = per_load*p_mask_size;
579       gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
580     }
581 
582 
583   /* convert buf sizes from #bytes to #ints - 32 bit only! */
584   p_mask_size/=sizeof(PetscInt); ngh_buf_size/=sizeof(PetscInt); buf_size/=sizeof(PetscInt);
585 
586   /* find giop work space */
587   buf2 = buf1+buf_size;
588 
589   /* hold #ints needed for processor masks */
590   gs->mask_sz=p_mask_size;
591 
592   /* init buffers */
593   ierr = ivec_zero(sh_proc_mask,p_mask_size);CHKERRQ(ierr);
594   ierr = ivec_zero(pw_sh_proc_mask,p_mask_size);CHKERRQ(ierr);
595   ierr = ivec_zero(ngh_buf,ngh_buf_size);CHKERRQ(ierr);
596 
597   /* HACK reset tree info */
598   tree_buf=NULL;
599   tree_buf_sz=ntree=0;
600 
601   /* ok do it */
602   for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
603     {
604       /* identity for bitwise or is 000...000 */
605       ivec_zero(buf1,buf_size);
606 
607       /* load msg buffer */
608       for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
609         {
610           offset = (offset-start)*p_mask_size;
611           ivec_copy(buf1+offset,p_mask,p_mask_size);
612         }
613 
614       /* GLOBAL: pass buffer */
615       ierr = giop(buf1,buf2,buf_size,&oper);CHKERRQ(ierr);
616 
617 
618       /* unload buffer into ngh_buf */
619       ptr2=(elms+i_start);
620       for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
621         {
622           /* I own it ... may have to pairwise it */
623           if (j==*ptr2)
624             {
625               /* do i share it w/anyone? */
626               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
627               /* guess not */
628               if (ct1<2)
629                 {ptr2++; ptr1+=p_mask_size; continue;}
630 
631               /* i do ... so keep info and turn off my bit */
632               ivec_copy(ptr1,ptr3,p_mask_size);
633               ierr = ivec_xor(ptr1,p_mask,p_mask_size);CHKERRQ(ierr);
634               ierr = ivec_or(sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
635 
636               /* is it to be done pairwise? */
637               if (--ct1<=level)
638                 {
639                   npw++;
640 
641                   /* turn on high bit to indicate pw need to process */
642                   *ptr2++ |= TOP_BIT;
643                   ierr = ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
644                   ptr1+=p_mask_size;
645                   continue;
646                 }
647 
648               /* get set for next and note that I have a tree contribution */
649               /* could save exact elm index for tree here -> save a search */
650               ptr2++; ptr1+=p_mask_size; ntree_map++;
651             }
652           /* i don't but still might be involved in tree */
653           else
654             {
655 
656               /* shared by how many? */
657               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
658 
659               /* none! */
660               if (ct1<2) continue;
661 
662               /* is it going to be done pairwise? but not by me of course!*/
663               if (--ct1<=level) continue;
664             }
665           /* LATER we're going to have to process it NOW */
666           /* nope ... tree it */
667           ierr = place_in_tree(j);CHKERRQ(ierr);
668         }
669     }
670 
671   free((void*)t_mask);
672   free((void*)buf1);
673 
674   gs->len_pw_list=npw;
675   gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
676 
677   /* expand from bit mask list to int list and save ngh list */
678   gs->nghs = (PetscInt*) malloc(gs->num_nghs * sizeof(PetscInt));
679   bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),gs->nghs);
680 
681   gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*sizeof(PetscInt));
682 
683   oper = GL_MAX;
684   ct1 = gs->num_nghs;
685   ierr = giop(&ct1,&ct2,1,&oper);CHKERRQ(ierr);
686   gs->max_nghs = ct1;
687 
688   gs->tree_map_sz  = ntree_map;
689   gs->max_left_over=ntree;
690 
691   free((void*)p_mask);
692   free((void*)sh_proc_mask);
693   PetscFunctionReturn(0);
694 }
695 
696 /******************************************************************************/
697 static PetscErrorCode set_pairwise(gs_id *gs)
698 {
699    PetscInt i, j;
700   PetscInt p_mask_size;
701   PetscInt *p_mask, *sh_proc_mask, *tmp_proc_mask;
702   PetscInt *ngh_buf, *buf2;
703   PetscInt offset;
704   PetscInt *msg_list, *msg_size, **msg_nodes, nprs;
705   PetscInt *pairwise_elm_list, len_pair_list=0;
706   PetscInt *iptr, t1, i_start, nel, *elms;
707   PetscInt ct;
708   PetscErrorCode ierr;
709 
710   PetscFunctionBegin;
711   /* to make life easier */
712   nel  = gs->nel;
713   elms = gs->elms;
714   ngh_buf = gs->ngh_buf;
715   sh_proc_mask  = gs->pw_nghs;
716 
717   /* need a few temp masks */
718   p_mask_size   = len_bit_mask(num_nodes);
719   p_mask        = (PetscInt*) malloc(p_mask_size);
720   tmp_proc_mask = (PetscInt*) malloc(p_mask_size);
721 
722   /* set mask to my my_id's bit mask */
723   ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);
724 
725   p_mask_size /= sizeof(PetscInt);
726 
727   len_pair_list=gs->len_pw_list;
728   gs->pw_elm_list=pairwise_elm_list=(PetscInt*)malloc((len_pair_list+1)*sizeof(PetscInt));
729 
730   /* how many processors (nghs) do we have to exchange with? */
731   nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
732 
733 
734   /* allocate space for gs_gop() info */
735   gs->pair_list = msg_list = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
736   gs->msg_sizes = msg_size  = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
737   gs->node_list = msg_nodes = (PetscInt **) malloc(sizeof(PetscInt*)*(nprs+1));
738 
739   /* init msg_size list */
740   ierr = ivec_zero(msg_size,nprs);CHKERRQ(ierr);
741 
742   /* expand from bit mask list to int list */
743   ierr = bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),msg_list);CHKERRQ(ierr);
744 
745   /* keep list of elements being handled pairwise */
746   for (i=j=0;i<nel;i++)
747     {
748       if (elms[i] & TOP_BIT)
749         {elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
750     }
751   pairwise_elm_list[j] = -1;
752 
753   gs->msg_ids_out = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
754   gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
755   gs->msg_ids_in = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
756   gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
757   gs->pw_vals = (PetscScalar *) malloc(sizeof(PetscScalar)*len_pair_list*vec_sz);
758 
759   /* find who goes to each processor */
760   for (i_start=i=0;i<nprs;i++)
761     {
762       /* processor i's mask */
763       ierr = set_bit_mask(p_mask,p_mask_size*sizeof(PetscInt),msg_list[i]);CHKERRQ(ierr);
764 
765       /* det # going to processor i */
766       for (ct=j=0;j<len_pair_list;j++)
767         {
768           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
769           ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
770           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
771             {ct++;}
772         }
773       msg_size[i] = ct;
774       i_start = PetscMax(i_start,ct);
775 
776       /*space to hold nodes in message to first neighbor */
777       msg_nodes[i] = iptr = (PetscInt*) malloc(sizeof(PetscInt)*(ct+1));
778 
779       for (j=0;j<len_pair_list;j++)
780         {
781           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
782           ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
783           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
784             {*iptr++ = j;}
785         }
786       *iptr = -1;
787     }
788   msg_nodes[nprs] = NULL;
789 
790   j=gs->loc_node_pairs=i_start;
791   t1 = GL_MAX;
792   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
793   gs->max_node_pairs = i_start;
794 
795   i_start=j;
796   t1 = GL_MIN;
797   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
798   gs->min_node_pairs = i_start;
799 
800   i_start=j;
801   t1 = GL_ADD;
802   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
803   gs->avg_node_pairs = i_start/num_nodes + 1;
804 
805   i_start=nprs;
806   t1 = GL_MAX;
807   giop(&i_start,&offset,1,&t1);
808   gs->max_pairs = i_start;
809 
810 
811   /* remap pairwise in tail of gsi_via_bit_mask() */
812   gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
813   gs->out = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
814   gs->in  = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
815 
816   /* reset malloc pool */
817   free((void*)p_mask);
818   free((void*)tmp_proc_mask);
819   PetscFunctionReturn(0);
820 }
821 
822 /* to do pruned tree just save ngh buf copy for each one and decode here!
823 ******************************************************************************/
824 static PetscErrorCode set_tree(gs_id *gs)
825 {
826   PetscInt i, j, n, nel;
827   PetscInt *iptr_in, *iptr_out, *tree_elms, *elms;
828 
829   PetscFunctionBegin;
830   /* local work ptrs */
831   elms = gs->elms;
832   nel     = gs->nel;
833 
834   /* how many via tree */
835   gs->tree_nel  = n = ntree;
836   gs->tree_elms = tree_elms = iptr_in = tree_buf;
837   gs->tree_buf  = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
838   gs->tree_work = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
839   j=gs->tree_map_sz;
840   gs->tree_map_in = iptr_in  = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
841   gs->tree_map_out = iptr_out = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
842 
843   /* search the longer of the two lists */
844   /* note ... could save this info in get_ngh_buf and save searches */
845   if (n<=nel)
846     {
847       /* bijective fct w/remap - search elm list */
848       for (i=0; i<n; i++)
849         {
850           if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
851             {*iptr_in++ = j; *iptr_out++ = i;}
852         }
853     }
854   else
855     {
856       for (i=0; i<nel; i++)
857         {
858           if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
859             {*iptr_in++ = i; *iptr_out++ = j;}
860         }
861     }
862 
863   /* sentinel */
864   *iptr_in = *iptr_out = -1;
865   PetscFunctionReturn(0);
866 }
867 
868 /******************************************************************************/
869 static PetscErrorCode gs_gop_local_out( gs_id *gs,  PetscScalar *vals)
870 {
871   PetscInt *num, *map, **reduce;
872   PetscScalar tmp;
873 
874   PetscFunctionBegin;
875   num    = gs->num_gop_local_reduce;
876   reduce = gs->gop_local_reduce;
877   while ((map = *reduce++))
878     {
879       /* wall */
880       if (*num == 2)
881         {
882           num ++;
883           vals[map[1]] = vals[map[0]];
884         }
885       /* corner shared by three elements */
886       else if (*num == 3)
887         {
888           num ++;
889           vals[map[2]] = vals[map[1]] = vals[map[0]];
890         }
891       /* corner shared by four elements */
892       else if (*num == 4)
893         {
894           num ++;
895           vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
896         }
897       /* general case ... odd geoms ... 3D*/
898       else
899         {
900           num++;
901           tmp = *(vals + *map++);
902           while (*map >= 0)
903             {*(vals + *map++) = tmp;}
904         }
905     }
906   PetscFunctionReturn(0);
907 }
908 
909 /******************************************************************************/
910 static PetscErrorCode gs_gop_local_plus( gs_id *gs,  PetscScalar *vals)
911 {
912    PetscInt *num, *map, **reduce;
913    PetscScalar tmp;
914 
915   PetscFunctionBegin;
916   num    = gs->num_local_reduce;
917   reduce = gs->local_reduce;
918   while ((map = *reduce))
919     {
920       /* wall */
921       if (*num == 2)
922         {
923           num ++; reduce++;
924           vals[map[1]] = vals[map[0]] += vals[map[1]];
925         }
926       /* corner shared by three elements */
927       else if (*num == 3)
928         {
929           num ++; reduce++;
930           vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
931         }
932       /* corner shared by four elements */
933       else if (*num == 4)
934         {
935           num ++; reduce++;
936           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
937                                  (vals[map[1]] + vals[map[2]] + vals[map[3]]);
938         }
939       /* general case ... odd geoms ... 3D*/
940       else
941         {
942           num ++;
943           tmp = 0.0;
944           while (*map >= 0)
945             {tmp += *(vals + *map++);}
946 
947           map = *reduce++;
948           while (*map >= 0)
949             {*(vals + *map++) = tmp;}
950         }
951     }
952   PetscFunctionReturn(0);
953 }
954 
955 /******************************************************************************/
956 static PetscErrorCode gs_gop_local_in_plus( gs_id *gs,  PetscScalar *vals)
957 {
958    PetscInt *num, *map, **reduce;
959    PetscScalar *base;
960 
961   PetscFunctionBegin;
962   num    = gs->num_gop_local_reduce;
963   reduce = gs->gop_local_reduce;
964   while ((map = *reduce++))
965     {
966       /* wall */
967       if (*num == 2)
968         {
969           num ++;
970           vals[map[0]] += vals[map[1]];
971         }
972       /* corner shared by three elements */
973       else if (*num == 3)
974         {
975           num ++;
976           vals[map[0]] += (vals[map[1]] + vals[map[2]]);
977         }
978       /* corner shared by four elements */
979       else if (*num == 4)
980         {
981           num ++;
982           vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
983         }
984       /* general case ... odd geoms ... 3D*/
985       else
986         {
987           num++;
988           base = vals + *map++;
989           while (*map >= 0)
990             {*base += *(vals + *map++);}
991         }
992     }
993   PetscFunctionReturn(0);
994 }
995 
996 /******************************************************************************/
997 PetscErrorCode gs_free( gs_id *gs)
998 {
999    PetscInt i;
1000 
1001   PetscFunctionBegin;
1002   if (gs->nghs) {free((void*) gs->nghs);}
1003   if (gs->pw_nghs) {free((void*) gs->pw_nghs);}
1004 
1005   /* tree */
1006   if (gs->max_left_over)
1007     {
1008       if (gs->tree_elms) {free((void*) gs->tree_elms);}
1009       if (gs->tree_buf) {free((void*) gs->tree_buf);}
1010       if (gs->tree_work) {free((void*) gs->tree_work);}
1011       if (gs->tree_map_in) {free((void*) gs->tree_map_in);}
1012       if (gs->tree_map_out) {free((void*) gs->tree_map_out);}
1013     }
1014 
1015   /* pairwise info */
1016   if (gs->num_pairs)
1017     {
1018       /* should be NULL already */
1019       if (gs->ngh_buf) {free((void*) gs->ngh_buf);}
1020       if (gs->elms) {free((void*) gs->elms);}
1021       if (gs->local_elms) {free((void*) gs->local_elms);}
1022       if (gs->companion) {free((void*) gs->companion);}
1023 
1024       /* only set if pairwise */
1025       if (gs->vals) {free((void*) gs->vals);}
1026       if (gs->in) {free((void*) gs->in);}
1027       if (gs->out) {free((void*) gs->out);}
1028       if (gs->msg_ids_in) {free((void*) gs->msg_ids_in);}
1029       if (gs->msg_ids_out) {free((void*) gs->msg_ids_out);}
1030       if (gs->pw_vals) {free((void*) gs->pw_vals);}
1031       if (gs->pw_elm_list) {free((void*) gs->pw_elm_list);}
1032       if (gs->node_list)
1033         {
1034           for (i=0;i<gs->num_pairs;i++)
1035             {if (gs->node_list[i]) {free((void*) gs->node_list[i]);}}
1036           free((void*) gs->node_list);
1037         }
1038       if (gs->msg_sizes) {free((void*) gs->msg_sizes);}
1039       if (gs->pair_list) {free((void*) gs->pair_list);}
1040     }
1041 
1042   /* local info */
1043   if (gs->num_local_total>=0)
1044     {
1045       for (i=0;i<gs->num_local_total+1;i++)
1046         /*      for (i=0;i<gs->num_local_total;i++) */
1047         {
1048           if (gs->num_gop_local_reduce[i])
1049             {free((void*) gs->gop_local_reduce[i]);}
1050         }
1051     }
1052 
1053   /* if intersection tree/pairwise and local isn't empty */
1054   if (gs->gop_local_reduce) {free((void*) gs->gop_local_reduce);}
1055   if (gs->num_gop_local_reduce) {free((void*) gs->num_gop_local_reduce);}
1056 
1057   free((void*) gs);
1058   PetscFunctionReturn(0);
1059 }
1060 
1061 /******************************************************************************/
1062 PetscErrorCode gs_gop_vec( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt step)
1063 {
1064   PetscErrorCode ierr;
1065 
1066   PetscFunctionBegin;
1067   switch (*op) {
1068   case '+':
1069     gs_gop_vec_plus(gs,vals,step);
1070     break;
1071   default:
1072     ierr = PetscInfo1(0,"gs_gop_vec() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
1073     ierr = PetscInfo(0,"gs_gop_vec() :: default :: plus");CHKERRQ(ierr);
1074     gs_gop_vec_plus(gs,vals,step);
1075     break;
1076   }
1077   PetscFunctionReturn(0);
1078 }
1079 
1080 /******************************************************************************/
1081 static PetscErrorCode gs_gop_vec_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
1082 {
1083   PetscFunctionBegin;
1084   if (!gs) SETERRQ(PETSC_COMM_SELF,PETSC_ERR_PLIB,"gs_gop_vec() passed NULL gs handle!!!");
1085 
1086   /* local only operations!!! */
1087   if (gs->num_local)
1088     {gs_gop_vec_local_plus(gs,vals,step);}
1089 
1090   /* if intersection tree/pairwise and local isn't empty */
1091   if (gs->num_local_gop)
1092     {
1093       gs_gop_vec_local_in_plus(gs,vals,step);
1094 
1095       /* pairwise */
1096       if (gs->num_pairs)
1097         {gs_gop_vec_pairwise_plus(gs,vals,step);}
1098 
1099       /* tree */
1100       else if (gs->max_left_over)
1101         {gs_gop_vec_tree_plus(gs,vals,step);}
1102 
1103       gs_gop_vec_local_out(gs,vals,step);
1104     }
1105   /* if intersection tree/pairwise and local is empty */
1106   else
1107     {
1108       /* pairwise */
1109       if (gs->num_pairs)
1110         {gs_gop_vec_pairwise_plus(gs,vals,step);}
1111 
1112       /* tree */
1113       else if (gs->max_left_over)
1114         {gs_gop_vec_tree_plus(gs,vals,step);}
1115     }
1116   PetscFunctionReturn(0);
1117 }
1118 
1119 /******************************************************************************/
1120 static PetscErrorCode gs_gop_vec_local_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
1121 {
1122    PetscInt *num, *map, **reduce;
1123    PetscScalar *base;
1124 
1125   PetscFunctionBegin;
1126   num    = gs->num_local_reduce;
1127   reduce = gs->local_reduce;
1128   while ((map = *reduce))
1129     {
1130       base = vals + map[0] * step;
1131 
1132       /* wall */
1133       if (*num == 2)
1134         {
1135           num++; reduce++;
1136           rvec_add (base,vals+map[1]*step,step);
1137           rvec_copy(vals+map[1]*step,base,step);
1138         }
1139       /* corner shared by three elements */
1140       else if (*num == 3)
1141         {
1142           num++; reduce++;
1143           rvec_add (base,vals+map[1]*step,step);
1144           rvec_add (base,vals+map[2]*step,step);
1145           rvec_copy(vals+map[2]*step,base,step);
1146           rvec_copy(vals+map[1]*step,base,step);
1147         }
1148       /* corner shared by four elements */
1149       else if (*num == 4)
1150         {
1151           num++; reduce++;
1152           rvec_add (base,vals+map[1]*step,step);
1153           rvec_add (base,vals+map[2]*step,step);
1154           rvec_add (base,vals+map[3]*step,step);
1155           rvec_copy(vals+map[3]*step,base,step);
1156           rvec_copy(vals+map[2]*step,base,step);
1157           rvec_copy(vals+map[1]*step,base,step);
1158         }
1159       /* general case ... odd geoms ... 3D */
1160       else
1161         {
1162           num++;
1163           while (*++map >= 0)
1164             {rvec_add (base,vals+*map*step,step);}
1165 
1166           map = *reduce;
1167           while (*++map >= 0)
1168             {rvec_copy(vals+*map*step,base,step);}
1169 
1170           reduce++;
1171         }
1172     }
1173   PetscFunctionReturn(0);
1174 }
1175 
1176 /******************************************************************************/
1177 static PetscErrorCode gs_gop_vec_local_in_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
1178 {
1179    PetscInt  *num, *map, **reduce;
1180    PetscScalar *base;
1181   PetscFunctionBegin;
1182   num    = gs->num_gop_local_reduce;
1183   reduce = gs->gop_local_reduce;
1184   while ((map = *reduce++))
1185     {
1186       base = vals + map[0] * step;
1187 
1188       /* wall */
1189       if (*num == 2)
1190         {
1191           num ++;
1192           rvec_add(base,vals+map[1]*step,step);
1193         }
1194       /* corner shared by three elements */
1195       else if (*num == 3)
1196         {
1197           num ++;
1198           rvec_add(base,vals+map[1]*step,step);
1199           rvec_add(base,vals+map[2]*step,step);
1200         }
1201       /* corner shared by four elements */
1202       else if (*num == 4)
1203         {
1204           num ++;
1205           rvec_add(base,vals+map[1]*step,step);
1206           rvec_add(base,vals+map[2]*step,step);
1207           rvec_add(base,vals+map[3]*step,step);
1208         }
1209       /* general case ... odd geoms ... 3D*/
1210       else
1211         {
1212           num++;
1213           while (*++map >= 0)
1214             {rvec_add(base,vals+*map*step,step);}
1215         }
1216     }
1217   PetscFunctionReturn(0);
1218 }
1219 
1220 /******************************************************************************/
1221 static PetscErrorCode gs_gop_vec_local_out( gs_id *gs,  PetscScalar *vals, PetscInt step)
1222 {
1223    PetscInt *num, *map, **reduce;
1224    PetscScalar *base;
1225 
1226   PetscFunctionBegin;
1227   num    = gs->num_gop_local_reduce;
1228   reduce = gs->gop_local_reduce;
1229   while ((map = *reduce++))
1230     {
1231       base = vals + map[0] * step;
1232 
1233       /* wall */
1234       if (*num == 2)
1235         {
1236           num ++;
1237           rvec_copy(vals+map[1]*step,base,step);
1238         }
1239       /* corner shared by three elements */
1240       else if (*num == 3)
1241         {
1242           num ++;
1243           rvec_copy(vals+map[1]*step,base,step);
1244           rvec_copy(vals+map[2]*step,base,step);
1245         }
1246       /* corner shared by four elements */
1247       else if (*num == 4)
1248         {
1249           num ++;
1250           rvec_copy(vals+map[1]*step,base,step);
1251           rvec_copy(vals+map[2]*step,base,step);
1252           rvec_copy(vals+map[3]*step,base,step);
1253         }
1254       /* general case ... odd geoms ... 3D*/
1255       else
1256         {
1257           num++;
1258           while (*++map >= 0)
1259             {rvec_copy(vals+*map*step,base,step);}
1260         }
1261     }
1262   PetscFunctionReturn(0);
1263 }
1264 
1265 /******************************************************************************/
1266 static PetscErrorCode gs_gop_vec_pairwise_plus( gs_id *gs,  PetscScalar *in_vals, PetscInt step)
1267 {
1268   PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1269   PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
1270   PetscInt *pw, *list, *size, **nodes;
1271   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1272   MPI_Status status;
1273   PetscBLASInt i1 = 1,dstep;
1274   PetscErrorCode ierr;
1275 
1276   PetscFunctionBegin;
1277   /* strip and load s */
1278   msg_list =list         = gs->pair_list;
1279   msg_size =size         = gs->msg_sizes;
1280   msg_nodes=nodes        = gs->node_list;
1281   iptr=pw                = gs->pw_elm_list;
1282   dptr1=dptr3            = gs->pw_vals;
1283   msg_ids_in  = ids_in   = gs->msg_ids_in;
1284   msg_ids_out = ids_out  = gs->msg_ids_out;
1285   dptr2                  = gs->out;
1286   in1=in2                = gs->in;
1287 
1288   /* post the receives */
1289   /*  msg_nodes=nodes; */
1290   do
1291     {
1292       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1293          second one *list and do list++ afterwards */
1294       ierr = MPI_Irecv(in1, *size *step, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list, gs->gs_comm, msg_ids_in);CHKERRQ(ierr);
1295       list++;msg_ids_in++;
1296       in1 += *size++ *step;
1297     }
1298   while (*++msg_nodes);
1299   msg_nodes=nodes;
1300 
1301   /* load gs values into in out gs buffers */
1302   while (*iptr >= 0)
1303     {
1304       rvec_copy(dptr3,in_vals + *iptr*step,step);
1305       dptr3+=step;
1306       iptr++;
1307     }
1308 
1309   /* load out buffers and post the sends */
1310   while ((iptr = *msg_nodes++))
1311     {
1312       dptr3 = dptr2;
1313       while (*iptr >= 0)
1314         {
1315           rvec_copy(dptr2,dptr1 + *iptr*step,step);
1316           dptr2+=step;
1317           iptr++;
1318         }
1319       ierr = MPI_Isend(dptr3, *msg_size *step, MPIU_SCALAR, *msg_list, MSGTAG1+my_id, gs->gs_comm, msg_ids_out);CHKERRQ(ierr);
1320       msg_size++; msg_list++;msg_ids_out++;
1321     }
1322 
1323   /* tree */
1324   if (gs->max_left_over)
1325     {gs_gop_vec_tree_plus(gs,in_vals,step);}
1326 
1327   /* process the received data */
1328   msg_nodes=nodes;
1329   while ((iptr = *nodes++)){
1330     PetscScalar d1 = 1.0;
1331       /* Should I check the return value of MPI_Wait() or status? */
1332       /* Can this loop be replaced by a call to MPI_Waitall()? */
1333       ierr = MPI_Wait(ids_in, &status);CHKERRQ(ierr);
1334       ids_in++;
1335       while (*iptr >= 0) {
1336 	dstep = PetscBLASIntCast(step);
1337         BLASaxpy_(&dstep,&d1,in2,&i1,dptr1 + *iptr*step,&i1);
1338 	in2+=step;
1339 	iptr++;
1340       }
1341   }
1342 
1343   /* replace vals */
1344   while (*pw >= 0)
1345     {
1346       rvec_copy(in_vals + *pw*step,dptr1,step);
1347       dptr1+=step;
1348       pw++;
1349     }
1350 
1351   /* clear isend message handles */
1352   /* This changed for clarity though it could be the same */
1353   while (*msg_nodes++)
1354     /* Should I check the return value of MPI_Wait() or status? */
1355     /* Can this loop be replaced by a call to MPI_Waitall()? */
1356     {ierr = MPI_Wait(ids_out, &status);CHKERRQ(ierr);ids_out++;}
1357 
1358 
1359   PetscFunctionReturn(0);
1360 }
1361 
1362 /******************************************************************************/
1363 static PetscErrorCode gs_gop_vec_tree_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
1364 {
1365   PetscInt size, *in, *out;
1366   PetscScalar *buf, *work;
1367   PetscInt op[] = {GL_ADD,0};
1368   PetscBLASInt i1 = 1;
1369 
1370   PetscFunctionBegin;
1371   /* copy over to local variables */
1372   in   = gs->tree_map_in;
1373   out  = gs->tree_map_out;
1374   buf  = gs->tree_buf;
1375   work = gs->tree_work;
1376   size = gs->tree_nel*step;
1377 
1378   /* zero out collection buffer */
1379   rvec_zero(buf,size);
1380 
1381 
1382   /* copy over my contributions */
1383   while (*in >= 0)
1384     {
1385       PetscBLASInt dstep = PetscBLASIntCast(step);
1386       BLAScopy_(&dstep,vals + *in++*step,&i1,buf + *out++*step,&i1);
1387     }
1388 
1389   /* perform fan in/out on full buffer */
1390   /* must change grop to handle the blas */
1391   grop(buf,work,size,op);
1392 
1393   /* reset */
1394   in   = gs->tree_map_in;
1395   out  = gs->tree_map_out;
1396 
1397   /* get the portion of the results I need */
1398   while (*in >= 0)
1399     {
1400       PetscBLASInt dstep = PetscBLASIntCast(step);
1401       BLAScopy_(&dstep,buf + *out++*step,&i1,vals + *in++*step,&i1);
1402     }
1403   PetscFunctionReturn(0);
1404 }
1405 
1406 /******************************************************************************/
1407 PetscErrorCode gs_gop_hc( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt dim)
1408 {
1409   PetscErrorCode ierr;
1410 
1411   PetscFunctionBegin;
1412   switch (*op) {
1413   case '+':
1414     gs_gop_plus_hc(gs,vals,dim);
1415     break;
1416   default:
1417     ierr = PetscInfo1(0,"gs_gop_hc() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
1418     ierr = PetscInfo(0,"gs_gop_hc() :: default :: plus\n");CHKERRQ(ierr);
1419     gs_gop_plus_hc(gs,vals,dim);
1420     break;
1421   }
1422   PetscFunctionReturn(0);
1423 }
1424 
1425 /******************************************************************************/
1426 static PetscErrorCode gs_gop_plus_hc( gs_id *gs,  PetscScalar *vals, PetscInt dim)
1427 {
1428   PetscFunctionBegin;
1429   /* if there's nothing to do return */
1430   if (dim<=0)
1431     {  PetscFunctionReturn(0);}
1432 
1433   /* can't do more dimensions then exist */
1434   dim = PetscMin(dim,i_log2_num_nodes);
1435 
1436   /* local only operations!!! */
1437   if (gs->num_local)
1438     {gs_gop_local_plus(gs,vals);}
1439 
1440   /* if intersection tree/pairwise and local isn't empty */
1441   if (gs->num_local_gop)
1442     {
1443       gs_gop_local_in_plus(gs,vals);
1444 
1445       /* pairwise will do tree inside ... */
1446       if (gs->num_pairs)
1447         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
1448 
1449       /* tree only */
1450       else if (gs->max_left_over)
1451         {gs_gop_tree_plus_hc(gs,vals,dim);}
1452 
1453       gs_gop_local_out(gs,vals);
1454     }
1455   /* if intersection tree/pairwise and local is empty */
1456   else
1457     {
1458       /* pairwise will do tree inside */
1459       if (gs->num_pairs)
1460         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
1461 
1462       /* tree */
1463       else if (gs->max_left_over)
1464         {gs_gop_tree_plus_hc(gs,vals,dim);}
1465     }
1466   PetscFunctionReturn(0);
1467 }
1468 
1469 /******************************************************************************/
1470 static PetscErrorCode gs_gop_pairwise_plus_hc( gs_id *gs,  PetscScalar *in_vals, PetscInt dim)
1471 {
1472    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1473    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
1474    PetscInt *pw, *list, *size, **nodes;
1475   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1476   MPI_Status status;
1477   PetscInt i, mask=1;
1478   PetscErrorCode ierr;
1479 
1480   PetscFunctionBegin;
1481   for (i=1; i<dim; i++)
1482     {mask<<=1; mask++;}
1483 
1484 
1485   /* strip and load s */
1486   msg_list =list         = gs->pair_list;
1487   msg_size =size         = gs->msg_sizes;
1488   msg_nodes=nodes        = gs->node_list;
1489   iptr=pw                = gs->pw_elm_list;
1490   dptr1=dptr3            = gs->pw_vals;
1491   msg_ids_in  = ids_in   = gs->msg_ids_in;
1492   msg_ids_out = ids_out  = gs->msg_ids_out;
1493   dptr2                  = gs->out;
1494   in1=in2                = gs->in;
1495 
1496   /* post the receives */
1497   /*  msg_nodes=nodes; */
1498   do
1499     {
1500       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1501          second one *list and do list++ afterwards */
1502       if ((my_id|mask)==(*list|mask))
1503         {
1504           ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list, gs->gs_comm, msg_ids_in);CHKERRQ(ierr);
1505           list++; msg_ids_in++;in1 += *size++;
1506         }
1507       else
1508         {list++; size++;}
1509     }
1510   while (*++msg_nodes);
1511 
1512   /* load gs values into in out gs buffers */
1513   while (*iptr >= 0)
1514     {*dptr3++ = *(in_vals + *iptr++);}
1515 
1516   /* load out buffers and post the sends */
1517   msg_nodes=nodes;
1518   list = msg_list;
1519   while ((iptr = *msg_nodes++))
1520     {
1521       if ((my_id|mask)==(*list|mask))
1522         {
1523           dptr3 = dptr2;
1524           while (*iptr >= 0)
1525             {*dptr2++ = *(dptr1 + *iptr++);}
1526           /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1527           /* is msg_ids_out++ correct? */
1528           ierr = MPI_Isend(dptr3, *msg_size, MPIU_SCALAR, *list, MSGTAG1+my_id, gs->gs_comm, msg_ids_out);CHKERRQ(ierr);
1529           msg_size++;list++;msg_ids_out++;
1530         }
1531       else
1532         {list++; msg_size++;}
1533     }
1534 
1535   /* do the tree while we're waiting */
1536   if (gs->max_left_over)
1537     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
1538 
1539   /* process the received data */
1540   msg_nodes=nodes;
1541   list = msg_list;
1542   while ((iptr = *nodes++))
1543     {
1544       if ((my_id|mask)==(*list|mask))
1545         {
1546           /* Should I check the return value of MPI_Wait() or status? */
1547           /* Can this loop be replaced by a call to MPI_Waitall()? */
1548           ierr = MPI_Wait(ids_in, &status);CHKERRQ(ierr);
1549           ids_in++;
1550           while (*iptr >= 0)
1551             {*(dptr1 + *iptr++) += *in2++;}
1552         }
1553       list++;
1554     }
1555 
1556   /* replace vals */
1557   while (*pw >= 0)
1558     {*(in_vals + *pw++) = *dptr1++;}
1559 
1560   /* clear isend message handles */
1561   /* This changed for clarity though it could be the same */
1562   while (*msg_nodes++)
1563     {
1564       if ((my_id|mask)==(*msg_list|mask))
1565         {
1566           /* Should I check the return value of MPI_Wait() or status? */
1567           /* Can this loop be replaced by a call to MPI_Waitall()? */
1568           ierr = MPI_Wait(ids_out, &status);CHKERRQ(ierr);
1569           ids_out++;
1570         }
1571       msg_list++;
1572     }
1573 
1574   PetscFunctionReturn(0);
1575 }
1576 
1577 /******************************************************************************/
1578 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim)
1579 {
1580   PetscInt size;
1581   PetscInt *in, *out;
1582   PetscScalar *buf, *work;
1583   PetscInt op[] = {GL_ADD,0};
1584 
1585   PetscFunctionBegin;
1586   in   = gs->tree_map_in;
1587   out  = gs->tree_map_out;
1588   buf  = gs->tree_buf;
1589   work = gs->tree_work;
1590   size = gs->tree_nel;
1591 
1592   rvec_zero(buf,size);
1593 
1594   while (*in >= 0)
1595     {*(buf + *out++) = *(vals + *in++);}
1596 
1597   in   = gs->tree_map_in;
1598   out  = gs->tree_map_out;
1599 
1600   grop_hc(buf,work,size,op,dim);
1601 
1602   while (*in >= 0)
1603     {*(vals + *in++) = *(buf + *out++);}
1604   PetscFunctionReturn(0);
1605 }
1606 
1607 
1608 
1609