xref: /petsc/src/ksp/pc/impls/tfs/gs.c (revision ce0a2cd1da0658c2b28aad1be2e2c8e41567bece)
1 #define PETSCKSP_DLL
2 
3 /***********************************gs.c***************************************
4 
5 Author: Henry M. Tufo III
6 
7 e-mail: hmt@cs.brown.edu
8 
9 snail-mail:
10 Division of Applied Mathematics
11 Brown University
12 Providence, RI 02912
13 
14 Last Modification:
15 6.21.97
16 ************************************gs.c**************************************/
17 
18 /***********************************gs.c***************************************
19 File Description:
20 -----------------
21 
22 ************************************gs.c**************************************/
23 
24 #include "src/ksp/pc/impls/tfs/tfs.h"
25 
26 /* default length of number of items via tree - doubles if exceeded */
27 #define TREE_BUF_SZ 2048;
28 #define GS_VEC_SZ   1
29 
30 
31 
32 /***********************************gs.c***************************************
33 Type: struct gather_scatter_id
34 ------------------------------
35 
36 ************************************gs.c**************************************/
37 typedef struct gather_scatter_id {
38   PetscInt id;
39   PetscInt nel_min;
40   PetscInt nel_max;
41   PetscInt nel_sum;
42   PetscInt negl;
43   PetscInt gl_max;
44   PetscInt gl_min;
45   PetscInt repeats;
46   PetscInt ordered;
47   PetscInt positive;
48   PetscScalar *vals;
49 
50   /* bit mask info */
51   PetscInt *my_proc_mask;
52   PetscInt mask_sz;
53   PetscInt *ngh_buf;
54   PetscInt ngh_buf_sz;
55   PetscInt *nghs;
56   PetscInt num_nghs;
57   PetscInt max_nghs;
58   PetscInt *pw_nghs;
59   PetscInt num_pw_nghs;
60   PetscInt *tree_nghs;
61   PetscInt num_tree_nghs;
62 
63   PetscInt num_loads;
64 
65   /* repeats == true -> local info */
66   PetscInt nel;         /* number of unique elememts */
67   PetscInt *elms;       /* of size nel */
68   PetscInt nel_total;
69   PetscInt *local_elms; /* of size nel_total */
70   PetscInt *companion;  /* of size nel_total */
71 
72   /* local info */
73   PetscInt num_local_total;
74   PetscInt local_strength;
75   PetscInt num_local;
76   PetscInt *num_local_reduce;
77   PetscInt **local_reduce;
78   PetscInt num_local_gop;
79   PetscInt *num_gop_local_reduce;
80   PetscInt **gop_local_reduce;
81 
82   /* pairwise info */
83   PetscInt level;
84   PetscInt num_pairs;
85   PetscInt max_pairs;
86   PetscInt loc_node_pairs;
87   PetscInt max_node_pairs;
88   PetscInt min_node_pairs;
89   PetscInt avg_node_pairs;
90   PetscInt *pair_list;
91   PetscInt *msg_sizes;
92   PetscInt **node_list;
93   PetscInt len_pw_list;
94   PetscInt *pw_elm_list;
95   PetscScalar *pw_vals;
96 
97   MPI_Request *msg_ids_in;
98   MPI_Request *msg_ids_out;
99 
100   PetscScalar *out;
101   PetscScalar *in;
102   PetscInt msg_total;
103 
104   /* tree - crystal accumulator info */
105   PetscInt max_left_over;
106   PetscInt *pre;
107   PetscInt *in_num;
108   PetscInt *out_num;
109   PetscInt **in_list;
110   PetscInt **out_list;
111 
112   /* new tree work*/
113   PetscInt  tree_nel;
114   PetscInt *tree_elms;
115   PetscScalar *tree_buf;
116   PetscScalar *tree_work;
117 
118   PetscInt  tree_map_sz;
119   PetscInt *tree_map_in;
120   PetscInt *tree_map_out;
121 
122   /* current memory status */
123   PetscInt gl_bss_min;
124   PetscInt gl_perm_min;
125 
126   /* max segment size for gs_gop_vec() */
127   PetscInt vec_sz;
128 
129   /* hack to make paul happy */
130   MPI_Comm gs_comm;
131 
132 } gs_id;
133 
134 static gs_id *gsi_check_args(PetscInt *elms, PetscInt nel, PetscInt level);
135 static PetscErrorCode gsi_via_bit_mask(gs_id *gs);
136 static PetscErrorCode get_ngh_buf(gs_id *gs);
137 static PetscErrorCode set_pairwise(gs_id *gs);
138 static gs_id * gsi_new(void);
139 static PetscErrorCode set_tree(gs_id *gs);
140 
141 /* same for all but vector flavor */
142 static PetscErrorCode gs_gop_local_out(gs_id *gs, PetscScalar *vals);
143 /* vector flavor */
144 static PetscErrorCode gs_gop_vec_local_out(gs_id *gs, PetscScalar *vals, PetscInt step);
145 
146 static PetscErrorCode gs_gop_vec_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
147 static PetscErrorCode gs_gop_vec_pairwise_plus(gs_id *gs, PetscScalar *in_vals, PetscInt step);
148 static PetscErrorCode gs_gop_vec_local_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
149 static PetscErrorCode gs_gop_vec_local_in_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
150 static PetscErrorCode gs_gop_vec_tree_plus(gs_id *gs, PetscScalar *vals, PetscInt step);
151 
152 
153 static PetscErrorCode gs_gop_local_plus(gs_id *gs, PetscScalar *vals);
154 static PetscErrorCode gs_gop_local_in_plus(gs_id *gs, PetscScalar *vals);
155 
156 static PetscErrorCode gs_gop_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
157 static PetscErrorCode gs_gop_pairwise_plus_hc(gs_id *gs, PetscScalar *in_vals, PetscInt dim);
158 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim);
159 
160 /* global vars */
161 /* from comm.c module */
162 
163 static PetscInt num_gs_ids = 0;
164 
165 /* should make this dynamic ... later */
166 static PetscInt msg_buf=MAX_MSG_BUF;
167 static PetscInt vec_sz=GS_VEC_SZ;
168 static PetscInt *tree_buf=NULL;
169 static PetscInt tree_buf_sz=0;
170 static PetscInt ntree=0;
171 
172 /***************************************************************************/
173 PetscErrorCode gs_init_vec_sz(PetscInt size)
174 {
175   PetscFunctionBegin;
176   vec_sz = size;
177   PetscFunctionReturn(0);
178 }
179 
180 /******************************************************************************/
181 PetscErrorCode gs_init_msg_buf_sz(PetscInt buf_size)
182 {
183   PetscFunctionBegin;
184   msg_buf = buf_size;
185   PetscFunctionReturn(0);
186 }
187 
188 /******************************************************************************/
189 gs_id *gs_init( PetscInt *elms, PetscInt nel, PetscInt level)
190 {
191    gs_id *gs;
192   MPI_Group gs_group;
193   MPI_Comm  gs_comm;
194   PetscErrorCode ierr;
195 
196   PetscFunctionBegin;
197   /* ensure that communication package has been initialized */
198   comm_init();
199 
200 
201   /* determines if we have enough dynamic/semi-static memory */
202   /* checks input, allocs and sets gd_id template            */
203   gs = gsi_check_args(elms,nel,level);
204 
205   /* only bit mask version up and working for the moment    */
206   /* LATER :: get int list version working for sparse pblms */
207   ierr = gsi_via_bit_mask(gs);CHKERRABORT(PETSC_COMM_WORLD,ierr);
208 
209 
210   ierr = MPI_Comm_group(MPI_COMM_WORLD,&gs_group);CHKERRABORT(PETSC_COMM_WORLD,ierr);
211   ierr = MPI_Comm_create(MPI_COMM_WORLD,gs_group,&gs_comm);CHKERRABORT(PETSC_COMM_WORLD,ierr);
212   gs->gs_comm=gs_comm;
213 
214   return(gs);
215 }
216 
217 /******************************************************************************/
218 static gs_id *gsi_new(void)
219 {
220   PetscErrorCode ierr;
221   gs_id *gs;
222   gs = (gs_id *) malloc(sizeof(gs_id));
223   ierr = PetscMemzero(gs,sizeof(gs_id));CHKERRABORT(PETSC_COMM_WORLD,ierr);
224   return(gs);
225 }
226 
227 /******************************************************************************/
228 static gs_id * gsi_check_args(PetscInt *in_elms, PetscInt nel, PetscInt level)
229 {
230    PetscInt i, j, k, t2;
231   PetscInt *companion, *elms, *unique, *iptr;
232   PetscInt num_local=0, *num_to_reduce, **local_reduce;
233   PetscInt oprs[] = {NON_UNIFORM,GL_MIN,GL_MAX,GL_ADD,GL_MIN,GL_MAX,GL_MIN,GL_B_AND};
234   PetscInt vals[sizeof(oprs)/sizeof(oprs[0])-1];
235   PetscInt work[sizeof(oprs)/sizeof(oprs[0])-1];
236   gs_id *gs;
237   PetscErrorCode ierr;
238 
239 
240   if (!in_elms)
241     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"elms point to nothing!!!\n");}
242 
243   if (nel<0)
244     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"can't have fewer than 0 elms!!!\n");}
245 
246   if (nel==0)
247     {ierr = PetscInfo(0,"I don't have any elements!!!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
248 
249   /* get space for gs template */
250   gs = gsi_new();
251   gs->id = ++num_gs_ids;
252 
253   /* hmt 6.4.99                                            */
254   /* caller can set global ids that don't participate to 0 */
255   /* gs_init ignores all zeros in elm list                 */
256   /* negative global ids are still invalid                 */
257   for (i=j=0;i<nel;i++)
258     {if (in_elms[i]!=0) {j++;}}
259 
260   k=nel; nel=j;
261 
262   /* copy over in_elms list and create inverse map */
263   elms = (PetscInt*) malloc((nel+1)*sizeof(PetscInt));
264   companion = (PetscInt*) malloc(nel*sizeof(PetscInt));
265 
266   for (i=j=0;i<k;i++)
267     {
268       if (in_elms[i]!=0)
269         {elms[j] = in_elms[i]; companion[j++] = i;}
270     }
271 
272   if (j!=nel)
273     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"nel j mismatch!\n");}
274 
275   /* pre-pass ... check to see if sorted */
276   elms[nel] = INT_MAX;
277   iptr = elms;
278   unique = elms+1;
279   j=0;
280   while (*iptr!=INT_MAX)
281     {
282       if (*iptr++>*unique++)
283         {j=1; break;}
284     }
285 
286   /* set up inverse map */
287   if (j)
288     {
289       ierr = PetscInfo(0,"gsi_check_args() :: elm list *not* sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);
290       ierr = SMI_sort((void*)elms, (void*)companion, nel, SORT_INTEGER);CHKERRABORT(PETSC_COMM_WORLD,ierr);
291     }
292   else
293     {ierr = PetscInfo(0,"gsi_check_args() :: elm list sorted!\n");CHKERRABORT(PETSC_COMM_WORLD,ierr);}
294   elms[nel] = INT_MIN;
295 
296   /* first pass */
297   /* determine number of unique elements, check pd */
298   for (i=k=0;i<nel;i+=j)
299     {
300       t2 = elms[i];
301       j=++i;
302 
303       /* clump 'em for now */
304       while (elms[j]==t2) {j++;}
305 
306       /* how many together and num local */
307       if (j-=i)
308         {num_local++; k+=j;}
309     }
310 
311   /* how many unique elements? */
312   gs->repeats=k;
313   gs->nel = nel-k;
314 
315 
316   /* number of repeats? */
317   gs->num_local = num_local;
318   num_local+=2;
319   gs->local_reduce=local_reduce=(PetscInt **)malloc(num_local*sizeof(PetscInt*));
320   gs->num_local_reduce=num_to_reduce=(PetscInt*) malloc(num_local*sizeof(PetscInt));
321 
322   unique = (PetscInt*) malloc((gs->nel+1)*sizeof(PetscInt));
323   gs->elms = unique;
324   gs->nel_total = nel;
325   gs->local_elms = elms;
326   gs->companion = companion;
327 
328   /* compess map as well as keep track of local ops */
329   for (num_local=i=j=0;i<gs->nel;i++)
330     {
331       k=j;
332       t2 = unique[i] = elms[j];
333       companion[i] = companion[j];
334 
335       while (elms[j]==t2) {j++;}
336 
337       if ((t2=(j-k))>1)
338         {
339           /* number together */
340           num_to_reduce[num_local] = t2++;
341           iptr = local_reduce[num_local++] = (PetscInt*)malloc(t2*sizeof(PetscInt));
342 
343           /* to use binary searching don't remap until we check intersection */
344           *iptr++ = i;
345 
346           /* note that we're skipping the first one */
347           while (++k<j)
348             {*(iptr++) = companion[k];}
349           *iptr = -1;
350         }
351     }
352 
353   /* sentinel for ngh_buf */
354   unique[gs->nel]=INT_MAX;
355 
356   /* for two partition sort hack */
357   num_to_reduce[num_local] = 0;
358   local_reduce[num_local] = NULL;
359   num_to_reduce[++num_local] = 0;
360   local_reduce[num_local] = NULL;
361 
362   /* load 'em up */
363   /* note one extra to hold NON_UNIFORM flag!!! */
364   vals[2] = vals[1] = vals[0] = nel;
365   if (gs->nel>0)
366     {
367        vals[3] = unique[0];
368        vals[4] = unique[gs->nel-1];
369     }
370   else
371     {
372        vals[3] = INT_MAX;
373        vals[4] = INT_MIN;
374     }
375   vals[5] = level;
376   vals[6] = num_gs_ids;
377 
378   /* GLOBAL: send 'em out */
379   ierr = giop(vals,work,sizeof(oprs)/sizeof(oprs[0])-1,oprs);CHKERRABORT(PETSC_COMM_WORLD,ierr);
380 
381   /* must be semi-pos def - only pairwise depends on this */
382   /* LATER - remove this restriction */
383   if (vals[3]<0)
384     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system not semi-pos def \n");}
385 
386   if (vals[4]==INT_MAX)
387     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system ub too large !\n");}
388 
389   gs->nel_min = vals[0];
390   gs->nel_max = vals[1];
391   gs->nel_sum = vals[2];
392   gs->gl_min  = vals[3];
393   gs->gl_max  = vals[4];
394   gs->negl    = vals[4]-vals[3]+1;
395 
396   if (gs->negl<=0)
397     {SETERRABORT(PETSC_COMM_WORLD,PETSC_ERR_PLIB,"gsi_check_args() :: system empty or neg :: %d\n");}
398 
399   /* LATER :: add level == -1 -> program selects level */
400   if (vals[5]<0)
401     {vals[5]=0;}
402   else if (vals[5]>num_nodes)
403     {vals[5]=num_nodes;}
404   gs->level = vals[5];
405 
406   return(gs);
407 }
408 
409 /******************************************************************************/
410 static PetscErrorCode gsi_via_bit_mask(gs_id *gs)
411 {
412    PetscInt i, nel, *elms;
413   PetscInt t1;
414   PetscInt **reduce;
415   PetscInt *map;
416   PetscErrorCode ierr;
417 
418   PetscFunctionBegin;
419   /* totally local removes ... ct_bits == 0 */
420   get_ngh_buf(gs);
421 
422   if (gs->level)
423     {set_pairwise(gs);}
424 
425   if (gs->max_left_over)
426     {set_tree(gs);}
427 
428   /* intersection local and pairwise/tree? */
429   gs->num_local_total = gs->num_local;
430   gs->gop_local_reduce = gs->local_reduce;
431   gs->num_gop_local_reduce = gs->num_local_reduce;
432 
433   map = gs->companion;
434 
435   /* is there any local compression */
436   if (!gs->num_local) {
437     gs->local_strength = NONE;
438     gs->num_local_gop = 0;
439   } else {
440       /* ok find intersection */
441       map = gs->companion;
442       reduce = gs->local_reduce;
443       for (i=0, t1=0; i<gs->num_local; i++, reduce++)
444         {
445           if ((ivec_binary_search(**reduce,gs->pw_elm_list,gs->len_pw_list)>=0)
446               ||
447               ivec_binary_search(**reduce,gs->tree_map_in,gs->tree_map_sz)>=0)
448             {
449               t1++;
450               if (gs->num_local_reduce[i]<=0) SETERRQ(PETSC_ERR_PLIB,"nobody in list?");
451               gs->num_local_reduce[i] *= -1;
452             }
453            **reduce=map[**reduce];
454         }
455 
456       /* intersection is empty */
457       if (!t1)
458         {
459           gs->local_strength = FULL;
460           gs->num_local_gop = 0;
461         }
462       /* intersection not empty */
463       else
464         {
465           gs->local_strength = PARTIAL;
466           ierr = SMI_sort((void*)gs->num_local_reduce, (void*)gs->local_reduce, gs->num_local + 1, SORT_INT_PTR);CHKERRQ(ierr);
467 
468           gs->num_local_gop = t1;
469           gs->num_local_total =  gs->num_local;
470           gs->num_local    -= t1;
471           gs->gop_local_reduce = gs->local_reduce;
472           gs->num_gop_local_reduce = gs->num_local_reduce;
473 
474           for (i=0; i<t1; i++)
475             {
476               if (gs->num_gop_local_reduce[i]>=0) SETERRQ(PETSC_ERR_PLIB,"they aren't negative?");
477               gs->num_gop_local_reduce[i] *= -1;
478               gs->local_reduce++;
479               gs->num_local_reduce++;
480             }
481           gs->local_reduce++;
482           gs->num_local_reduce++;
483         }
484     }
485 
486   elms = gs->pw_elm_list;
487   nel  = gs->len_pw_list;
488   for (i=0; i<nel; i++)
489     {elms[i] = map[elms[i]];}
490 
491   elms = gs->tree_map_in;
492   nel  = gs->tree_map_sz;
493   for (i=0; i<nel; i++)
494     {elms[i] = map[elms[i]];}
495 
496   /* clean up */
497   free((void*) gs->local_elms);
498   free((void*) gs->companion);
499   free((void*) gs->elms);
500   free((void*) gs->ngh_buf);
501   gs->local_elms = gs->companion = gs->elms = gs->ngh_buf = NULL;
502   PetscFunctionReturn(0);
503 }
504 
505 /******************************************************************************/
506 static PetscErrorCode place_in_tree( PetscInt elm)
507 {
508    PetscInt *tp, n;
509 
510   PetscFunctionBegin;
511   if (ntree==tree_buf_sz)
512     {
513       if (tree_buf_sz)
514         {
515           tp = tree_buf;
516           n = tree_buf_sz;
517           tree_buf_sz<<=1;
518           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
519           ivec_copy(tree_buf,tp,n);
520           free(tp);
521         }
522       else
523         {
524           tree_buf_sz = TREE_BUF_SZ;
525           tree_buf = (PetscInt*)malloc(tree_buf_sz*sizeof(PetscInt));
526         }
527     }
528 
529   tree_buf[ntree++] = elm;
530   PetscFunctionReturn(0);
531 }
532 
533 /******************************************************************************/
534 static PetscErrorCode get_ngh_buf(gs_id *gs)
535 {
536    PetscInt i, j, npw=0, ntree_map=0;
537   PetscInt p_mask_size, ngh_buf_size, buf_size;
538   PetscInt *p_mask, *sh_proc_mask, *pw_sh_proc_mask;
539   PetscInt *ngh_buf, *buf1, *buf2;
540   PetscInt offset, per_load, num_loads, or_ct, start, end;
541   PetscInt *ptr1, *ptr2, i_start, negl, nel, *elms;
542   PetscInt oper=GL_B_OR;
543   PetscInt *ptr3, *t_mask, level, ct1, ct2;
544   PetscErrorCode ierr;
545 
546   PetscFunctionBegin;
547   /* to make life easier */
548   nel   = gs->nel;
549   elms  = gs->elms;
550   level = gs->level;
551 
552   /* det #bytes needed for processor bit masks and init w/mask cor. to my_id */
553   p_mask = (PetscInt*) malloc(p_mask_size=len_bit_mask(num_nodes));
554   ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);
555 
556   /* allocate space for masks and info bufs */
557   gs->nghs = sh_proc_mask = (PetscInt*) malloc(p_mask_size);
558   gs->pw_nghs = pw_sh_proc_mask = (PetscInt*) malloc(p_mask_size);
559   gs->ngh_buf_sz = ngh_buf_size = p_mask_size*nel;
560   t_mask = (PetscInt*) malloc(p_mask_size);
561   gs->ngh_buf = ngh_buf = (PetscInt*) malloc(ngh_buf_size);
562 
563   /* comm buffer size ... memory usage bounded by ~2*msg_buf */
564   /* had thought I could exploit rendezvous threshold */
565 
566   /* default is one pass */
567   per_load = negl  = gs->negl;
568   gs->num_loads = num_loads = 1;
569   i=p_mask_size*negl;
570 
571   /* possible overflow on buffer size */
572   /* overflow hack                    */
573   if (i<0) {i=INT_MAX;}
574 
575   buf_size = PetscMin(msg_buf,i);
576 
577   /* can we do it? */
578   if (p_mask_size>buf_size) SETERRQ2(PETSC_ERR_PLIB,"get_ngh_buf() :: buf<pms :: %d>%d\n",p_mask_size,buf_size);
579 
580   /* get giop buf space ... make *only* one malloc */
581   buf1 = (PetscInt*) malloc(buf_size<<1);
582 
583   /* more than one gior exchange needed? */
584   if (buf_size!=i)
585     {
586       per_load = buf_size/p_mask_size;
587       buf_size = per_load*p_mask_size;
588       gs->num_loads = num_loads = negl/per_load + (negl%per_load>0);
589     }
590 
591 
592   /* convert buf sizes from #bytes to #ints - 32 bit only! */
593   p_mask_size/=sizeof(PetscInt); ngh_buf_size/=sizeof(PetscInt); buf_size/=sizeof(PetscInt);
594 
595   /* find giop work space */
596   buf2 = buf1+buf_size;
597 
598   /* hold #ints needed for processor masks */
599   gs->mask_sz=p_mask_size;
600 
601   /* init buffers */
602   ierr = ivec_zero(sh_proc_mask,p_mask_size);CHKERRQ(ierr);
603   ierr = ivec_zero(pw_sh_proc_mask,p_mask_size);CHKERRQ(ierr);
604   ierr = ivec_zero(ngh_buf,ngh_buf_size);CHKERRQ(ierr);
605 
606   /* HACK reset tree info */
607   tree_buf=NULL;
608   tree_buf_sz=ntree=0;
609 
610   /* ok do it */
611   for (ptr1=ngh_buf,ptr2=elms,end=gs->gl_min,or_ct=i=0; or_ct<num_loads; or_ct++)
612     {
613       /* identity for bitwise or is 000...000 */
614       ivec_zero(buf1,buf_size);
615 
616       /* load msg buffer */
617       for (start=end,end+=per_load,i_start=i; (offset=*ptr2)<end; i++, ptr2++)
618         {
619           offset = (offset-start)*p_mask_size;
620           ivec_copy(buf1+offset,p_mask,p_mask_size);
621         }
622 
623       /* GLOBAL: pass buffer */
624       ierr = giop(buf1,buf2,buf_size,&oper);CHKERRQ(ierr);
625 
626 
627       /* unload buffer into ngh_buf */
628       ptr2=(elms+i_start);
629       for(ptr3=buf1,j=start; j<end; ptr3+=p_mask_size,j++)
630         {
631           /* I own it ... may have to pairwise it */
632           if (j==*ptr2)
633             {
634               /* do i share it w/anyone? */
635               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
636               /* guess not */
637               if (ct1<2)
638                 {ptr2++; ptr1+=p_mask_size; continue;}
639 
640               /* i do ... so keep info and turn off my bit */
641               ivec_copy(ptr1,ptr3,p_mask_size);
642               ierr = ivec_xor(ptr1,p_mask,p_mask_size);CHKERRQ(ierr);
643               ierr = ivec_or(sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
644 
645               /* is it to be done pairwise? */
646               if (--ct1<=level)
647                 {
648                   npw++;
649 
650                   /* turn on high bit to indicate pw need to process */
651                   *ptr2++ |= TOP_BIT;
652                   ierr = ivec_or(pw_sh_proc_mask,ptr1,p_mask_size);CHKERRQ(ierr);
653                   ptr1+=p_mask_size;
654                   continue;
655                 }
656 
657               /* get set for next and note that I have a tree contribution */
658               /* could save exact elm index for tree here -> save a search */
659               ptr2++; ptr1+=p_mask_size; ntree_map++;
660             }
661           /* i don't but still might be involved in tree */
662           else
663             {
664 
665               /* shared by how many? */
666               ct1 = ct_bits((char *)ptr3,p_mask_size*sizeof(PetscInt));
667 
668               /* none! */
669               if (ct1<2) continue;
670 
671               /* is it going to be done pairwise? but not by me of course!*/
672               if (--ct1<=level) continue;
673             }
674           /* LATER we're going to have to process it NOW */
675           /* nope ... tree it */
676           ierr = place_in_tree(j);CHKERRQ(ierr);
677         }
678     }
679 
680   free((void*)t_mask);
681   free((void*)buf1);
682 
683   gs->len_pw_list=npw;
684   gs->num_nghs = ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
685 
686   /* expand from bit mask list to int list and save ngh list */
687   gs->nghs = (PetscInt*) malloc(gs->num_nghs * sizeof(PetscInt));
688   bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),gs->nghs);
689 
690   gs->num_pw_nghs = ct_bits((char *)pw_sh_proc_mask,p_mask_size*sizeof(PetscInt));
691 
692   oper = GL_MAX;
693   ct1 = gs->num_nghs;
694   ierr = giop(&ct1,&ct2,1,&oper);CHKERRQ(ierr);
695   gs->max_nghs = ct1;
696 
697   gs->tree_map_sz  = ntree_map;
698   gs->max_left_over=ntree;
699 
700   free((void*)p_mask);
701   free((void*)sh_proc_mask);
702   PetscFunctionReturn(0);
703 }
704 
705 /******************************************************************************/
706 static PetscErrorCode set_pairwise(gs_id *gs)
707 {
708    PetscInt i, j;
709   PetscInt p_mask_size;
710   PetscInt *p_mask, *sh_proc_mask, *tmp_proc_mask;
711   PetscInt *ngh_buf, *buf2;
712   PetscInt offset;
713   PetscInt *msg_list, *msg_size, **msg_nodes, nprs;
714   PetscInt *pairwise_elm_list, len_pair_list=0;
715   PetscInt *iptr, t1, i_start, nel, *elms;
716   PetscInt ct;
717   PetscErrorCode ierr;
718 
719   PetscFunctionBegin;
720   /* to make life easier */
721   nel  = gs->nel;
722   elms = gs->elms;
723   ngh_buf = gs->ngh_buf;
724   sh_proc_mask  = gs->pw_nghs;
725 
726   /* need a few temp masks */
727   p_mask_size   = len_bit_mask(num_nodes);
728   p_mask        = (PetscInt*) malloc(p_mask_size);
729   tmp_proc_mask = (PetscInt*) malloc(p_mask_size);
730 
731   /* set mask to my my_id's bit mask */
732   ierr = set_bit_mask(p_mask,p_mask_size,my_id);CHKERRQ(ierr);
733 
734   p_mask_size /= sizeof(PetscInt);
735 
736   len_pair_list=gs->len_pw_list;
737   gs->pw_elm_list=pairwise_elm_list=(PetscInt*)malloc((len_pair_list+1)*sizeof(PetscInt));
738 
739   /* how many processors (nghs) do we have to exchange with? */
740   nprs=gs->num_pairs=ct_bits((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt));
741 
742 
743   /* allocate space for gs_gop() info */
744   gs->pair_list = msg_list = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
745   gs->msg_sizes = msg_size  = (PetscInt *)  malloc(sizeof(PetscInt)*nprs);
746   gs->node_list = msg_nodes = (PetscInt **) malloc(sizeof(PetscInt*)*(nprs+1));
747 
748   /* init msg_size list */
749   ierr = ivec_zero(msg_size,nprs);CHKERRQ(ierr);
750 
751   /* expand from bit mask list to int list */
752   ierr = bm_to_proc((char *)sh_proc_mask,p_mask_size*sizeof(PetscInt),msg_list);CHKERRQ(ierr);
753 
754   /* keep list of elements being handled pairwise */
755   for (i=j=0;i<nel;i++)
756     {
757       if (elms[i] & TOP_BIT)
758         {elms[i] ^= TOP_BIT; pairwise_elm_list[j++] = i;}
759     }
760   pairwise_elm_list[j] = -1;
761 
762   gs->msg_ids_out = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
763   gs->msg_ids_out[nprs] = MPI_REQUEST_NULL;
764   gs->msg_ids_in = (MPI_Request *)  malloc(sizeof(MPI_Request)*(nprs+1));
765   gs->msg_ids_in[nprs] = MPI_REQUEST_NULL;
766   gs->pw_vals = (PetscScalar *) malloc(sizeof(PetscScalar)*len_pair_list*vec_sz);
767 
768   /* find who goes to each processor */
769   for (i_start=i=0;i<nprs;i++)
770     {
771       /* processor i's mask */
772       ierr = set_bit_mask(p_mask,p_mask_size*sizeof(PetscInt),msg_list[i]);CHKERRQ(ierr);
773 
774       /* det # going to processor i */
775       for (ct=j=0;j<len_pair_list;j++)
776         {
777           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
778           ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
779           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
780             {ct++;}
781         }
782       msg_size[i] = ct;
783       i_start = PetscMax(i_start,ct);
784 
785       /*space to hold nodes in message to first neighbor */
786       msg_nodes[i] = iptr = (PetscInt*) malloc(sizeof(PetscInt)*(ct+1));
787 
788       for (j=0;j<len_pair_list;j++)
789         {
790           buf2 = ngh_buf+(pairwise_elm_list[j]*p_mask_size);
791           ierr = ivec_and3(tmp_proc_mask,p_mask,buf2,p_mask_size);CHKERRQ(ierr);
792           if (ct_bits((char *)tmp_proc_mask,p_mask_size*sizeof(PetscInt)))
793             {*iptr++ = j;}
794         }
795       *iptr = -1;
796     }
797   msg_nodes[nprs] = NULL;
798 
799   j=gs->loc_node_pairs=i_start;
800   t1 = GL_MAX;
801   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
802   gs->max_node_pairs = i_start;
803 
804   i_start=j;
805   t1 = GL_MIN;
806   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
807   gs->min_node_pairs = i_start;
808 
809   i_start=j;
810   t1 = GL_ADD;
811   ierr = giop(&i_start,&offset,1,&t1);CHKERRQ(ierr);
812   gs->avg_node_pairs = i_start/num_nodes + 1;
813 
814   i_start=nprs;
815   t1 = GL_MAX;
816   giop(&i_start,&offset,1,&t1);
817   gs->max_pairs = i_start;
818 
819 
820   /* remap pairwise in tail of gsi_via_bit_mask() */
821   gs->msg_total = ivec_sum(gs->msg_sizes,nprs);
822   gs->out = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
823   gs->in  = (PetscScalar *) malloc(sizeof(PetscScalar)*gs->msg_total*vec_sz);
824 
825   /* reset malloc pool */
826   free((void*)p_mask);
827   free((void*)tmp_proc_mask);
828   PetscFunctionReturn(0);
829 }
830 
831 /* to do pruned tree just save ngh buf copy for each one and decode here!
832 ******************************************************************************/
833 static PetscErrorCode set_tree(gs_id *gs)
834 {
835   PetscInt i, j, n, nel;
836   PetscInt *iptr_in, *iptr_out, *tree_elms, *elms;
837 
838   PetscFunctionBegin;
839   /* local work ptrs */
840   elms = gs->elms;
841   nel     = gs->nel;
842 
843   /* how many via tree */
844   gs->tree_nel  = n = ntree;
845   gs->tree_elms = tree_elms = iptr_in = tree_buf;
846   gs->tree_buf  = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
847   gs->tree_work = (PetscScalar *) malloc(sizeof(PetscScalar)*n*vec_sz);
848   j=gs->tree_map_sz;
849   gs->tree_map_in = iptr_in  = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
850   gs->tree_map_out = iptr_out = (PetscInt*) malloc(sizeof(PetscInt)*(j+1));
851 
852   /* search the longer of the two lists */
853   /* note ... could save this info in get_ngh_buf and save searches */
854   if (n<=nel)
855     {
856       /* bijective fct w/remap - search elm list */
857       for (i=0; i<n; i++)
858         {
859           if ((j=ivec_binary_search(*tree_elms++,elms,nel))>=0)
860             {*iptr_in++ = j; *iptr_out++ = i;}
861         }
862     }
863   else
864     {
865       for (i=0; i<nel; i++)
866         {
867           if ((j=ivec_binary_search(*elms++,tree_elms,n))>=0)
868             {*iptr_in++ = i; *iptr_out++ = j;}
869         }
870     }
871 
872   /* sentinel */
873   *iptr_in = *iptr_out = -1;
874   PetscFunctionReturn(0);
875 }
876 
877 /******************************************************************************/
878 static PetscErrorCode gs_gop_local_out( gs_id *gs,  PetscScalar *vals)
879 {
880   PetscInt *num, *map, **reduce;
881   PetscScalar tmp;
882 
883   PetscFunctionBegin;
884   num    = gs->num_gop_local_reduce;
885   reduce = gs->gop_local_reduce;
886   while ((map = *reduce++))
887     {
888       /* wall */
889       if (*num == 2)
890         {
891           num ++;
892           vals[map[1]] = vals[map[0]];
893         }
894       /* corner shared by three elements */
895       else if (*num == 3)
896         {
897           num ++;
898           vals[map[2]] = vals[map[1]] = vals[map[0]];
899         }
900       /* corner shared by four elements */
901       else if (*num == 4)
902         {
903           num ++;
904           vals[map[3]] = vals[map[2]] = vals[map[1]] = vals[map[0]];
905         }
906       /* general case ... odd geoms ... 3D*/
907       else
908         {
909           num++;
910           tmp = *(vals + *map++);
911           while (*map >= 0)
912             {*(vals + *map++) = tmp;}
913         }
914     }
915   PetscFunctionReturn(0);
916 }
917 
918 /******************************************************************************/
919 static PetscErrorCode gs_gop_local_plus( gs_id *gs,  PetscScalar *vals)
920 {
921    PetscInt *num, *map, **reduce;
922    PetscScalar tmp;
923 
924   PetscFunctionBegin;
925   num    = gs->num_local_reduce;
926   reduce = gs->local_reduce;
927   while ((map = *reduce))
928     {
929       /* wall */
930       if (*num == 2)
931         {
932           num ++; reduce++;
933           vals[map[1]] = vals[map[0]] += vals[map[1]];
934         }
935       /* corner shared by three elements */
936       else if (*num == 3)
937         {
938           num ++; reduce++;
939           vals[map[2]]=vals[map[1]]=vals[map[0]]+=(vals[map[1]]+vals[map[2]]);
940         }
941       /* corner shared by four elements */
942       else if (*num == 4)
943         {
944           num ++; reduce++;
945           vals[map[1]]=vals[map[2]]=vals[map[3]]=vals[map[0]] +=
946                                  (vals[map[1]] + vals[map[2]] + vals[map[3]]);
947         }
948       /* general case ... odd geoms ... 3D*/
949       else
950         {
951           num ++;
952           tmp = 0.0;
953           while (*map >= 0)
954             {tmp += *(vals + *map++);}
955 
956           map = *reduce++;
957           while (*map >= 0)
958             {*(vals + *map++) = tmp;}
959         }
960     }
961   PetscFunctionReturn(0);
962 }
963 
964 /******************************************************************************/
965 static PetscErrorCode gs_gop_local_in_plus( gs_id *gs,  PetscScalar *vals)
966 {
967    PetscInt *num, *map, **reduce;
968    PetscScalar *base;
969 
970   PetscFunctionBegin;
971   num    = gs->num_gop_local_reduce;
972   reduce = gs->gop_local_reduce;
973   while ((map = *reduce++))
974     {
975       /* wall */
976       if (*num == 2)
977         {
978           num ++;
979           vals[map[0]] += vals[map[1]];
980         }
981       /* corner shared by three elements */
982       else if (*num == 3)
983         {
984           num ++;
985           vals[map[0]] += (vals[map[1]] + vals[map[2]]);
986         }
987       /* corner shared by four elements */
988       else if (*num == 4)
989         {
990           num ++;
991           vals[map[0]] += (vals[map[1]] + vals[map[2]] + vals[map[3]]);
992         }
993       /* general case ... odd geoms ... 3D*/
994       else
995         {
996           num++;
997           base = vals + *map++;
998           while (*map >= 0)
999             {*base += *(vals + *map++);}
1000         }
1001     }
1002   PetscFunctionReturn(0);
1003 }
1004 
1005 /******************************************************************************/
1006 PetscErrorCode gs_free( gs_id *gs)
1007 {
1008    PetscInt i;
1009 
1010   PetscFunctionBegin;
1011   if (gs->nghs) {free((void*) gs->nghs);}
1012   if (gs->pw_nghs) {free((void*) gs->pw_nghs);}
1013 
1014   /* tree */
1015   if (gs->max_left_over)
1016     {
1017       if (gs->tree_elms) {free((void*) gs->tree_elms);}
1018       if (gs->tree_buf) {free((void*) gs->tree_buf);}
1019       if (gs->tree_work) {free((void*) gs->tree_work);}
1020       if (gs->tree_map_in) {free((void*) gs->tree_map_in);}
1021       if (gs->tree_map_out) {free((void*) gs->tree_map_out);}
1022     }
1023 
1024   /* pairwise info */
1025   if (gs->num_pairs)
1026     {
1027       /* should be NULL already */
1028       if (gs->ngh_buf) {free((void*) gs->ngh_buf);}
1029       if (gs->elms) {free((void*) gs->elms);}
1030       if (gs->local_elms) {free((void*) gs->local_elms);}
1031       if (gs->companion) {free((void*) gs->companion);}
1032 
1033       /* only set if pairwise */
1034       if (gs->vals) {free((void*) gs->vals);}
1035       if (gs->in) {free((void*) gs->in);}
1036       if (gs->out) {free((void*) gs->out);}
1037       if (gs->msg_ids_in) {free((void*) gs->msg_ids_in);}
1038       if (gs->msg_ids_out) {free((void*) gs->msg_ids_out);}
1039       if (gs->pw_vals) {free((void*) gs->pw_vals);}
1040       if (gs->pw_elm_list) {free((void*) gs->pw_elm_list);}
1041       if (gs->node_list)
1042         {
1043           for (i=0;i<gs->num_pairs;i++)
1044             {if (gs->node_list[i]) {free((void*) gs->node_list[i]);}}
1045           free((void*) gs->node_list);
1046         }
1047       if (gs->msg_sizes) {free((void*) gs->msg_sizes);}
1048       if (gs->pair_list) {free((void*) gs->pair_list);}
1049     }
1050 
1051   /* local info */
1052   if (gs->num_local_total>=0)
1053     {
1054       for (i=0;i<gs->num_local_total+1;i++)
1055         /*      for (i=0;i<gs->num_local_total;i++) */
1056         {
1057           if (gs->num_gop_local_reduce[i])
1058             {free((void*) gs->gop_local_reduce[i]);}
1059         }
1060     }
1061 
1062   /* if intersection tree/pairwise and local isn't empty */
1063   if (gs->gop_local_reduce) {free((void*) gs->gop_local_reduce);}
1064   if (gs->num_gop_local_reduce) {free((void*) gs->num_gop_local_reduce);}
1065 
1066   free((void*) gs);
1067   PetscFunctionReturn(0);
1068 }
1069 
1070 /******************************************************************************/
1071 PetscErrorCode gs_gop_vec( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt step)
1072 {
1073   PetscErrorCode ierr;
1074 
1075   PetscFunctionBegin;
1076   switch (*op) {
1077   case '+':
1078     gs_gop_vec_plus(gs,vals,step);
1079     break;
1080   default:
1081     ierr = PetscInfo1(0,"gs_gop_vec() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
1082     ierr = PetscInfo(0,"gs_gop_vec() :: default :: plus");CHKERRQ(ierr);
1083     gs_gop_vec_plus(gs,vals,step);
1084     break;
1085   }
1086   PetscFunctionReturn(0);
1087 }
1088 
1089 /******************************************************************************/
1090 static PetscErrorCode gs_gop_vec_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
1091 {
1092   PetscFunctionBegin;
1093   if (!gs) {SETERRQ(PETSC_ERR_PLIB,"gs_gop_vec() passed NULL gs handle!!!");}
1094 
1095   /* local only operations!!! */
1096   if (gs->num_local)
1097     {gs_gop_vec_local_plus(gs,vals,step);}
1098 
1099   /* if intersection tree/pairwise and local isn't empty */
1100   if (gs->num_local_gop)
1101     {
1102       gs_gop_vec_local_in_plus(gs,vals,step);
1103 
1104       /* pairwise */
1105       if (gs->num_pairs)
1106         {gs_gop_vec_pairwise_plus(gs,vals,step);}
1107 
1108       /* tree */
1109       else if (gs->max_left_over)
1110         {gs_gop_vec_tree_plus(gs,vals,step);}
1111 
1112       gs_gop_vec_local_out(gs,vals,step);
1113     }
1114   /* if intersection tree/pairwise and local is empty */
1115   else
1116     {
1117       /* pairwise */
1118       if (gs->num_pairs)
1119         {gs_gop_vec_pairwise_plus(gs,vals,step);}
1120 
1121       /* tree */
1122       else if (gs->max_left_over)
1123         {gs_gop_vec_tree_plus(gs,vals,step);}
1124     }
1125   PetscFunctionReturn(0);
1126 }
1127 
1128 /******************************************************************************/
1129 static PetscErrorCode gs_gop_vec_local_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
1130 {
1131    PetscInt *num, *map, **reduce;
1132    PetscScalar *base;
1133 
1134   PetscFunctionBegin;
1135   num    = gs->num_local_reduce;
1136   reduce = gs->local_reduce;
1137   while ((map = *reduce))
1138     {
1139       base = vals + map[0] * step;
1140 
1141       /* wall */
1142       if (*num == 2)
1143         {
1144           num++; reduce++;
1145           rvec_add (base,vals+map[1]*step,step);
1146           rvec_copy(vals+map[1]*step,base,step);
1147         }
1148       /* corner shared by three elements */
1149       else if (*num == 3)
1150         {
1151           num++; reduce++;
1152           rvec_add (base,vals+map[1]*step,step);
1153           rvec_add (base,vals+map[2]*step,step);
1154           rvec_copy(vals+map[2]*step,base,step);
1155           rvec_copy(vals+map[1]*step,base,step);
1156         }
1157       /* corner shared by four elements */
1158       else if (*num == 4)
1159         {
1160           num++; reduce++;
1161           rvec_add (base,vals+map[1]*step,step);
1162           rvec_add (base,vals+map[2]*step,step);
1163           rvec_add (base,vals+map[3]*step,step);
1164           rvec_copy(vals+map[3]*step,base,step);
1165           rvec_copy(vals+map[2]*step,base,step);
1166           rvec_copy(vals+map[1]*step,base,step);
1167         }
1168       /* general case ... odd geoms ... 3D */
1169       else
1170         {
1171           num++;
1172           while (*++map >= 0)
1173             {rvec_add (base,vals+*map*step,step);}
1174 
1175           map = *reduce;
1176           while (*++map >= 0)
1177             {rvec_copy(vals+*map*step,base,step);}
1178 
1179           reduce++;
1180         }
1181     }
1182   PetscFunctionReturn(0);
1183 }
1184 
1185 /******************************************************************************/
1186 static PetscErrorCode gs_gop_vec_local_in_plus( gs_id *gs,  PetscScalar *vals, PetscInt step)
1187 {
1188    PetscInt  *num, *map, **reduce;
1189    PetscScalar *base;
1190   PetscFunctionBegin;
1191   num    = gs->num_gop_local_reduce;
1192   reduce = gs->gop_local_reduce;
1193   while ((map = *reduce++))
1194     {
1195       base = vals + map[0] * step;
1196 
1197       /* wall */
1198       if (*num == 2)
1199         {
1200           num ++;
1201           rvec_add(base,vals+map[1]*step,step);
1202         }
1203       /* corner shared by three elements */
1204       else if (*num == 3)
1205         {
1206           num ++;
1207           rvec_add(base,vals+map[1]*step,step);
1208           rvec_add(base,vals+map[2]*step,step);
1209         }
1210       /* corner shared by four elements */
1211       else if (*num == 4)
1212         {
1213           num ++;
1214           rvec_add(base,vals+map[1]*step,step);
1215           rvec_add(base,vals+map[2]*step,step);
1216           rvec_add(base,vals+map[3]*step,step);
1217         }
1218       /* general case ... odd geoms ... 3D*/
1219       else
1220         {
1221           num++;
1222           while (*++map >= 0)
1223             {rvec_add(base,vals+*map*step,step);}
1224         }
1225     }
1226   PetscFunctionReturn(0);
1227 }
1228 
1229 /******************************************************************************/
1230 static PetscErrorCode gs_gop_vec_local_out( gs_id *gs,  PetscScalar *vals, PetscInt step)
1231 {
1232    PetscInt *num, *map, **reduce;
1233    PetscScalar *base;
1234 
1235   PetscFunctionBegin;
1236   num    = gs->num_gop_local_reduce;
1237   reduce = gs->gop_local_reduce;
1238   while ((map = *reduce++))
1239     {
1240       base = vals + map[0] * step;
1241 
1242       /* wall */
1243       if (*num == 2)
1244         {
1245           num ++;
1246           rvec_copy(vals+map[1]*step,base,step);
1247         }
1248       /* corner shared by three elements */
1249       else if (*num == 3)
1250         {
1251           num ++;
1252           rvec_copy(vals+map[1]*step,base,step);
1253           rvec_copy(vals+map[2]*step,base,step);
1254         }
1255       /* corner shared by four elements */
1256       else if (*num == 4)
1257         {
1258           num ++;
1259           rvec_copy(vals+map[1]*step,base,step);
1260           rvec_copy(vals+map[2]*step,base,step);
1261           rvec_copy(vals+map[3]*step,base,step);
1262         }
1263       /* general case ... odd geoms ... 3D*/
1264       else
1265         {
1266           num++;
1267           while (*++map >= 0)
1268             {rvec_copy(vals+*map*step,base,step);}
1269         }
1270     }
1271   PetscFunctionReturn(0);
1272 }
1273 
1274 /******************************************************************************/
1275 static PetscErrorCode gs_gop_vec_pairwise_plus( gs_id *gs,  PetscScalar *in_vals, PetscInt step)
1276 {
1277   PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1278   PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
1279   PetscInt *pw, *list, *size, **nodes;
1280   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1281   MPI_Status status;
1282   PetscBLASInt i1 = 1,dstep;
1283   PetscErrorCode ierr;
1284 
1285   PetscFunctionBegin;
1286   /* strip and load s */
1287   msg_list =list         = gs->pair_list;
1288   msg_size =size         = gs->msg_sizes;
1289   msg_nodes=nodes        = gs->node_list;
1290   iptr=pw                = gs->pw_elm_list;
1291   dptr1=dptr3            = gs->pw_vals;
1292   msg_ids_in  = ids_in   = gs->msg_ids_in;
1293   msg_ids_out = ids_out  = gs->msg_ids_out;
1294   dptr2                  = gs->out;
1295   in1=in2                = gs->in;
1296 
1297   /* post the receives */
1298   /*  msg_nodes=nodes; */
1299   do
1300     {
1301       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1302          second one *list and do list++ afterwards */
1303       ierr = MPI_Irecv(in1, *size *step, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1304       in1 += *size++ *step;
1305     }
1306   while (*++msg_nodes);
1307   msg_nodes=nodes;
1308 
1309   /* load gs values into in out gs buffers */
1310   while (*iptr >= 0)
1311     {
1312       rvec_copy(dptr3,in_vals + *iptr*step,step);
1313       dptr3+=step;
1314       iptr++;
1315     }
1316 
1317   /* load out buffers and post the sends */
1318   while ((iptr = *msg_nodes++))
1319     {
1320       dptr3 = dptr2;
1321       while (*iptr >= 0)
1322         {
1323           rvec_copy(dptr2,dptr1 + *iptr*step,step);
1324           dptr2+=step;
1325           iptr++;
1326         }
1327       ierr = MPI_Isend(dptr3, *msg_size++ *step, MPIU_SCALAR, *msg_list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1328     }
1329 
1330   /* tree */
1331   if (gs->max_left_over)
1332     {gs_gop_vec_tree_plus(gs,in_vals,step);}
1333 
1334   /* process the received data */
1335   msg_nodes=nodes;
1336   while ((iptr = *nodes++)){
1337     PetscScalar d1 = 1.0;
1338       /* Should I check the return value of MPI_Wait() or status? */
1339       /* Can this loop be replaced by a call to MPI_Waitall()? */
1340       ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1341       while (*iptr >= 0) {
1342 	dstep = PetscBLASIntCast(step);
1343         BLASaxpy_(&dstep,&d1,in2,&i1,dptr1 + *iptr*step,&i1);
1344 	in2+=step;
1345 	iptr++;
1346       }
1347   }
1348 
1349   /* replace vals */
1350   while (*pw >= 0)
1351     {
1352       rvec_copy(in_vals + *pw*step,dptr1,step);
1353       dptr1+=step;
1354       pw++;
1355     }
1356 
1357   /* clear isend message handles */
1358   /* This changed for clarity though it could be the same */
1359   while (*msg_nodes++)
1360     /* Should I check the return value of MPI_Wait() or status? */
1361     /* Can this loop be replaced by a call to MPI_Waitall()? */
1362     {ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);}
1363 
1364   PetscFunctionReturn(0);
1365 }
1366 
1367 /******************************************************************************/
1368 static PetscErrorCode gs_gop_vec_tree_plus( gs_id *gs,  PetscScalar *vals,  PetscInt step)
1369 {
1370   PetscInt size, *in, *out;
1371   PetscScalar *buf, *work;
1372   PetscInt op[] = {GL_ADD,0};
1373   PetscBLASInt i1 = 1;
1374 
1375   PetscFunctionBegin;
1376   /* copy over to local variables */
1377   in   = gs->tree_map_in;
1378   out  = gs->tree_map_out;
1379   buf  = gs->tree_buf;
1380   work = gs->tree_work;
1381   size = gs->tree_nel*step;
1382 
1383   /* zero out collection buffer */
1384   rvec_zero(buf,size);
1385 
1386 
1387   /* copy over my contributions */
1388   while (*in >= 0)
1389     {
1390       PetscBLASInt dstep = PetscBLASIntCast(step);
1391       BLAScopy_(&dstep,vals + *in++*step,&i1,buf + *out++*step,&i1);
1392     }
1393 
1394   /* perform fan in/out on full buffer */
1395   /* must change grop to handle the blas */
1396   grop(buf,work,size,op);
1397 
1398   /* reset */
1399   in   = gs->tree_map_in;
1400   out  = gs->tree_map_out;
1401 
1402   /* get the portion of the results I need */
1403   while (*in >= 0)
1404     {
1405       PetscBLASInt dstep = PetscBLASIntCast(step);
1406       BLAScopy_(&dstep,buf + *out++*step,&i1,vals + *in++*step,&i1);
1407     }
1408   PetscFunctionReturn(0);
1409 }
1410 
1411 /******************************************************************************/
1412 PetscErrorCode gs_gop_hc( gs_id *gs,  PetscScalar *vals,  const char *op,  PetscInt dim)
1413 {
1414   PetscErrorCode ierr;
1415 
1416   PetscFunctionBegin;
1417   switch (*op) {
1418   case '+':
1419     gs_gop_plus_hc(gs,vals,dim);
1420     break;
1421   default:
1422     ierr = PetscInfo1(0,"gs_gop_hc() :: %c is not a valid op",op[0]);CHKERRQ(ierr);
1423     ierr = PetscInfo(0,"gs_gop_hc() :: default :: plus\n");CHKERRQ(ierr);
1424     gs_gop_plus_hc(gs,vals,dim);
1425     break;
1426   }
1427   PetscFunctionReturn(0);
1428 }
1429 
1430 /******************************************************************************/
1431 static PetscErrorCode gs_gop_plus_hc( gs_id *gs,  PetscScalar *vals, PetscInt dim)
1432 {
1433   PetscFunctionBegin;
1434   /* if there's nothing to do return */
1435   if (dim<=0)
1436     {  PetscFunctionReturn(0);}
1437 
1438   /* can't do more dimensions then exist */
1439   dim = PetscMin(dim,i_log2_num_nodes);
1440 
1441   /* local only operations!!! */
1442   if (gs->num_local)
1443     {gs_gop_local_plus(gs,vals);}
1444 
1445   /* if intersection tree/pairwise and local isn't empty */
1446   if (gs->num_local_gop)
1447     {
1448       gs_gop_local_in_plus(gs,vals);
1449 
1450       /* pairwise will do tree inside ... */
1451       if (gs->num_pairs)
1452         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
1453 
1454       /* tree only */
1455       else if (gs->max_left_over)
1456         {gs_gop_tree_plus_hc(gs,vals,dim);}
1457 
1458       gs_gop_local_out(gs,vals);
1459     }
1460   /* if intersection tree/pairwise and local is empty */
1461   else
1462     {
1463       /* pairwise will do tree inside */
1464       if (gs->num_pairs)
1465         {gs_gop_pairwise_plus_hc(gs,vals,dim);}
1466 
1467       /* tree */
1468       else if (gs->max_left_over)
1469         {gs_gop_tree_plus_hc(gs,vals,dim);}
1470     }
1471   PetscFunctionReturn(0);
1472 }
1473 
1474 /******************************************************************************/
1475 static PetscErrorCode gs_gop_pairwise_plus_hc( gs_id *gs,  PetscScalar *in_vals, PetscInt dim)
1476 {
1477    PetscScalar *dptr1, *dptr2, *dptr3, *in1, *in2;
1478    PetscInt *iptr, *msg_list, *msg_size, **msg_nodes;
1479    PetscInt *pw, *list, *size, **nodes;
1480   MPI_Request *msg_ids_in, *msg_ids_out, *ids_in, *ids_out;
1481   MPI_Status status;
1482   PetscInt i, mask=1;
1483   PetscErrorCode ierr;
1484 
1485   PetscFunctionBegin;
1486   for (i=1; i<dim; i++)
1487     {mask<<=1; mask++;}
1488 
1489 
1490   /* strip and load s */
1491   msg_list =list         = gs->pair_list;
1492   msg_size =size         = gs->msg_sizes;
1493   msg_nodes=nodes        = gs->node_list;
1494   iptr=pw                = gs->pw_elm_list;
1495   dptr1=dptr3            = gs->pw_vals;
1496   msg_ids_in  = ids_in   = gs->msg_ids_in;
1497   msg_ids_out = ids_out  = gs->msg_ids_out;
1498   dptr2                  = gs->out;
1499   in1=in2                = gs->in;
1500 
1501   /* post the receives */
1502   /*  msg_nodes=nodes; */
1503   do
1504     {
1505       /* Should MPI_ANY_SOURCE be replaced by *list ? In that case do the
1506          second one *list and do list++ afterwards */
1507       if ((my_id|mask)==(*list|mask))
1508         {
1509           ierr = MPI_Irecv(in1, *size, MPIU_SCALAR, MPI_ANY_SOURCE, MSGTAG1 + *list++, gs->gs_comm, msg_ids_in++);CHKERRQ(ierr);
1510           in1 += *size++;
1511         }
1512       else
1513         {list++; size++;}
1514     }
1515   while (*++msg_nodes);
1516 
1517   /* load gs values into in out gs buffers */
1518   while (*iptr >= 0)
1519     {*dptr3++ = *(in_vals + *iptr++);}
1520 
1521   /* load out buffers and post the sends */
1522   msg_nodes=nodes;
1523   list = msg_list;
1524   while ((iptr = *msg_nodes++))
1525     {
1526       if ((my_id|mask)==(*list|mask))
1527         {
1528           dptr3 = dptr2;
1529           while (*iptr >= 0)
1530             {*dptr2++ = *(dptr1 + *iptr++);}
1531           /* CHECK PERSISTENT COMMS MODE FOR ALL THIS STUFF */
1532           /* is msg_ids_out++ correct? */
1533           ierr = MPI_Isend(dptr3, *msg_size++, MPIU_SCALAR, *list++, MSGTAG1+my_id, gs->gs_comm, msg_ids_out++);CHKERRQ(ierr);
1534         }
1535       else
1536         {list++; msg_size++;}
1537     }
1538 
1539   /* do the tree while we're waiting */
1540   if (gs->max_left_over)
1541     {gs_gop_tree_plus_hc(gs,in_vals,dim);}
1542 
1543   /* process the received data */
1544   msg_nodes=nodes;
1545   list = msg_list;
1546   while ((iptr = *nodes++))
1547     {
1548       if ((my_id|mask)==(*list|mask))
1549         {
1550           /* Should I check the return value of MPI_Wait() or status? */
1551           /* Can this loop be replaced by a call to MPI_Waitall()? */
1552           ierr = MPI_Wait(ids_in++, &status);CHKERRQ(ierr);
1553           while (*iptr >= 0)
1554             {*(dptr1 + *iptr++) += *in2++;}
1555         }
1556       list++;
1557     }
1558 
1559   /* replace vals */
1560   while (*pw >= 0)
1561     {*(in_vals + *pw++) = *dptr1++;}
1562 
1563   /* clear isend message handles */
1564   /* This changed for clarity though it could be the same */
1565   while (*msg_nodes++)
1566     {
1567       if ((my_id|mask)==(*msg_list|mask))
1568         {
1569           /* Should I check the return value of MPI_Wait() or status? */
1570           /* Can this loop be replaced by a call to MPI_Waitall()? */
1571           ierr = MPI_Wait(ids_out++, &status);CHKERRQ(ierr);
1572         }
1573       msg_list++;
1574     }
1575 
1576   PetscFunctionReturn(0);
1577 }
1578 
1579 /******************************************************************************/
1580 static PetscErrorCode gs_gop_tree_plus_hc(gs_id *gs, PetscScalar *vals, PetscInt dim)
1581 {
1582   PetscInt size;
1583   PetscInt *in, *out;
1584   PetscScalar *buf, *work;
1585   PetscInt op[] = {GL_ADD,0};
1586 
1587   PetscFunctionBegin;
1588   in   = gs->tree_map_in;
1589   out  = gs->tree_map_out;
1590   buf  = gs->tree_buf;
1591   work = gs->tree_work;
1592   size = gs->tree_nel;
1593 
1594   rvec_zero(buf,size);
1595 
1596   while (*in >= 0)
1597     {*(buf + *out++) = *(vals + *in++);}
1598 
1599   in   = gs->tree_map_in;
1600   out  = gs->tree_map_out;
1601 
1602   grop_hc(buf,work,size,op,dim);
1603 
1604   while (*in >= 0)
1605     {*(vals + *in++) = *(buf + *out++);}
1606   PetscFunctionReturn(0);
1607 }
1608 
1609 
1610 
1611