# hub.darcs.net :: basilisk -> basilisk -> files

Basilisk source code

## root / src / tag.h.page

 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269  /** # Tagging connected neighborhoods The goal is to associate a unique (strictly positive) index ("tag") *t* to cells which belong to the same "neighborhood". All cells in a neighborhood have an initial tag value (given by the user) which is non-zero and are separated from cells in other neighborhoods by cells which have an initial (and final) tag value of zero. We first define the restriction function for tag values. The parent tag is just the minimum of its children's tags. */ static void restriction_tag (Point point, scalar t) { double min = HUGE; foreach_child() if (t[] < min) min = t[]; t[] = min; } /** We also need a few helper functions. The function below implements a [binary search](https://en.wikipedia.org/wiki/Binary_search_algorithm) of a sorted array. It returns the index in the array so that $a[s] \leq tag < a[s+1]$. */ static long lookup_tag (Array * a, double tag) { long len = a->len/sizeof(double); double * p = (double *) a->p; if (tag == p[0]) return 0; if (tag == p[len - 1]) return len - 1; long s = 0, e = len - 1; while (s < e - 1) { long m = (s + e)/2; if (p[m] <= tag) s = m; else e = m; } return s; } #if _MPI static int compar_double (const void * p1, const void * p2) { const double * a = p1, * b = p2; return *a > *b; } #endif /** The function just takes the scalar field *t* which holds the initial and final tag values. It returns the maximum neighborhood tag value (which is also the number of neighborhoods). */ int tag (scalar t) { /** We first set the restriction and prolongation functions (on trees). */ t.restriction = restriction_tag; #if TREE t.refine = t.prolongation = refine_injection; #endif /** As an initial guess, we set all the (leaf) cells which have a non-zero initial tag value to the Z- (or Morton-) index. We thus have a different "neighborhood" for each cell which has a non-zero initial tag, and a single neighborhood (tagged zero) for all the cells which have a zero initial tag. */ #if _MPI scalar index[]; z_indexing (index, true); foreach() t[] = (t[] != 0)*(index[] + 1); #else // !_MPI long i = 1; foreach_cell() if (is_leaf(cell)) { t[] = (t[] != 0)*i++; continue; } #endif // !_MPI boundary ({t}); /** To gather cells which belong to the same neighborhood, we repeat multigrid iterations until none of the tag values changes. */ bool changed; do { /** We first do a restriction from the finest to the coarsest level of the multigrid hierarchy, using the restriction_tag() function above. */ restriction ({t}); /** We then go from the coarsest to the finest level and update the tag values. */ changed = false; for (int l = 1; l <= grid->maxdepth; l++) { /** If the parent tag is non-zero, we set the child tag to the value of its parent (i.e. to the minimum tag value of its siblings). */ foreach_level(l) if (coarse(t)) t[] = coarse(t); boundary_level ({t}, l); /** For cells which verify the threshold condition (i.e. for which t[] != 0), we refine this initial guess by taking the minimum (non-zero) tag value of its closest neighbors. We also track whether this update changes any of the tag values. */ foreach_level (l, reduction(max:changed)) if (t[]) { double min = t[]; foreach_neighbor(1) if (t[] && t[] < min) min = t[]; /** On trees, we need to take into account the minimum tag value of neighboring fine cells. */ #if TREE foreach_dimension() for (int i = -1; i <= 2; i += 3) if (is_refined (neighbor((2*i - 1)/3))) for (int j = 0; j <= 1; j++) for (int k = 0; k <= 1; k++) if (fine(t,i,j,k) && fine(t,i,j,k) < min) min = fine(t,i,j,k); #endif // TREE if (t[] != min) { changed = true; t[] = min; } } boundary_level ({t}, l); } } while (changed); /** ## Reducing the range of indices Each neighborhood is now tagged with a unique index. The range of indices is large however (between one and the total number of leaves). The goal of this step is to reduce this range to between one and the number of neighborhoods. To do this, we create an ordered array of unique indices. */ Array * a = array_new(); foreach_leaf() if (t[] > 0) { /** We first check whether the index is larger than the maximum or smaller than the minimum value in the array. *s* is the position of the (possibly new) index in the array. A negative value means that the index is already in the array. */ double tag = t[], * ap = (double *) a->p; long s = -1; if (a->len == 0 || tag > ap[a->len/sizeof(double) - 1]) s = a->len/sizeof(double); else if (tag < ap[0]) s = 0; else { /** We find the range of existing indices [s-1:s] which contains the index. We check whether the index is already in the array. */ s = lookup_tag (a, tag) + 1; if (tag == ap[s - 1] || tag == ap[s]) s = -1; } if (s >= 0) { /** If the index is new, we add it to the array in the correct position (s). */ array_append (a, &tag, sizeof(double)), ap = (double *) a->p; for (int i = a->len/sizeof(double) - 1; i > s; i--) ap[i] = ap[i-1]; ap[s] = tag; } } /** ## Parallel reduction Each process now has its own local correspondence map between the neighborhood index and its rank in the array *a* (i.e. its reduced index). In parallel, we now need to build a global correspondence map. We first get the maximum size over all processes of the local map and increase the size of the local map to this value. */ #if _MPI long lmax = a->len; mpi_all_reduce (lmax, MPI_LONG, MPI_MAX); a->p = realloc (a->p, lmax); lmax /= sizeof(double); /** We then gather all the local maps into a global map and sort it. All local arrays need to be of the same size to be able to use MPI_Allgather(), so we first pad the arrays with -1. */ double * q = a->p; for (int i = a->len/sizeof(double); i < lmax; i++) q[i] = -1; double p[lmax*npe()]; MPI_Allgather (a->p, lmax, MPI_DOUBLE, p, lmax, MPI_DOUBLE, MPI_COMM_WORLD); qsort (p, lmax*npe(), sizeof(double), compar_double); /** This sorted global map will (probably) contain duplicated entries (i.e. indices of neighborhoods which span multiple processes). To build the new global map (stored in *a*), we eliminate these, as well as the negative indices which were used to pad the local arrays. */ array_free (a); a = array_new(); double last = -1; for (int i = 0; i < lmax*npe(); i++) if (p[i] != last) { array_append (a, &p[i], sizeof(double)); last = p[i]; } #endif /** Once we have the (global) map, we can replace the neighborhood indices with their index in the global map (+1). */ foreach() if (t[] > 0) t[] = lookup_tag (a, t[]) + 1; boundary ({t}); /** We return the maximum index value. */ int n = a->len/sizeof(double); array_free (a); return n; }