Basilisk source code (http://basilisk.fr/src/)

root / src / tag.h.page

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
/**
# Tagging connected neighborhoods

The goal is to associate a unique (strictly positive) index ("tag")
*t* to cells which belong to the same "neighborhood". All cells in a
neighborhood have an initial tag value (given by the user) which is
non-zero and are separated from cells in other neighborhoods by cells
which have an initial (and final) tag value of zero.

We first define the restriction function for tag values. The parent
tag is just the minimum of its children's tags. */

static void restriction_tag (Point point, scalar t)
{
  double min = HUGE;
  foreach_child()
    if (t[] < min)
      min = t[];
  t[] = min;
}

/**
We also need a few helper functions. The function below implements a
[binary search](https://en.wikipedia.org/wiki/Binary_search_algorithm)
of a sorted array. It returns the index in the array so that $a[s] \leq
tag < a[s+1]$. */

static long lookup_tag (Array * a, double tag)
{
  long len = a->len/sizeof(double);
  double * p = (double *) a->p;
  if (tag == p[0])
    return 0;
  if (tag == p[len - 1])
    return len - 1;

  long s = 0, e = len - 1;
  while (s < e - 1) {
    long m = (s + e)/2;
    if (p[m] <= tag)
      s = m;
    else
      e = m;
  }
  return s;
}

#if _MPI
static int compar_double (const void * p1, const void * p2)
{
  const double * a = p1, * b = p2;
  return *a > *b;
}
#endif

/**
The function just takes the scalar field *t* which holds the initial
and final tag values. It returns the maximum neighborhood tag value
(which is also the number of neighborhoods). */

int tag (scalar t)
{

  /**
  We first set the restriction and prolongation functions (on trees). */

  t.restriction = restriction_tag;
#if TREE  
  t.refine = t.prolongation = refine_injection;
#endif

  /**
  As an initial guess, we set all the (leaf) cells which have a
  non-zero initial tag value to the Z- (or Morton-) index. We thus
  have a different "neighborhood" for each cell which has a non-zero
  initial tag, and a single neighborhood (tagged zero) for all the
  cells which have a zero initial tag. */
  
#if _MPI
  scalar index[];
  z_indexing (index, true);
  foreach()
    t[] = (t[] != 0)*(index[] + 1);
#else // !_MPI
  long i = 1;
  foreach_cell()
    if (is_leaf(cell)) {
      t[] = (t[] != 0)*i++;
      continue;
    }
#endif // !_MPI
  boundary ({t});

  /**
  To gather cells which belong to the same neighborhood, we repeat
  multigrid iterations until none of the tag values changes. */
  
  bool changed;
  do {

    /**
    We first do a restriction from the finest to the coarsest level of
    the multigrid hierarchy, using the `restriction_tag()` function
    above. */
    
    restriction ({t});

    /**
    We then go from the coarsest to the finest level and update the
    tag values. */

    changed = false;
    for (int l = 1; l <= grid->maxdepth; l++) {

      /**
      If the parent tag is non-zero, we set the child tag to the value
      of its parent (i.e. to the minimum tag value of its
      siblings). */
      
      foreach_level(l)
	if (coarse(t))
	  t[] = coarse(t);
      boundary_level ({t}, l);

      /**
      For cells which verify the threshold condition (i.e. for which
      `t[] != 0`), we refine this initial guess by taking the minimum
      (non-zero) tag value of its closest neighbors. We also track
      whether this update changes any of the tag values. */
      
      foreach_level (l, reduction(max:changed))
        if (t[]) {
	  double min = t[];
	  foreach_neighbor(1)
	    if (t[] && t[] < min)
	      min = t[];

	  /**
	  On trees, we need to take into account the minimum tag value
	  of neighboring fine cells. */
	  
#if TREE
	  foreach_dimension()
	    for (int i = -1; i <= 2; i += 3)
	      if (is_refined (neighbor((2*i - 1)/3)))
		for (int j = 0; j <= 1; j++)
		  for (int k = 0; k <= 1; k++)
		    if (fine(t,i,j,k) && fine(t,i,j,k) < min)
		      min = fine(t,i,j,k);
#endif // TREE

	  if (t[] != min) {
	    changed = true;
	    t[] = min;
	  }
	}
      boundary_level ({t}, l);
    }
  } while (changed);

  /**
  ## Reducing the range of indices

  Each neighborhood is now tagged with a unique index. The range of
  indices is large however (between one and the total number of
  leaves). The goal of this step is to reduce this range to between
  one and the number of neighborhoods. To do this, we create an ordered
  array of unique indices. */

  Array * a = array_new();
  foreach_leaf()
    if (t[] > 0) {
  
      /**
      We first check whether the index is larger than the maximum or
      smaller than the minimum value in the array. *s* is the
      position of the (possibly new) index in the array. A negative
      value means that the index is already in the array.  */
  
      double tag = t[], * ap = (double *) a->p;
      long s = -1;
      if (a->len == 0 || tag > ap[a->len/sizeof(double) - 1])
	s = a->len/sizeof(double);
      else if (tag < ap[0])
	s = 0;
      else {
	
	/**
	We find the range of existing indices [s-1:s] which contains
	the index. We check whether the index is already in the
	array. */
	
	s = lookup_tag (a, tag) + 1;
	if (tag == ap[s - 1] || tag == ap[s])
	  s = -1;
      }
      if (s >= 0) {

	/**
	If the index is new, we add it to the array in the correct
	position (s). */
	
	array_append (a, &tag, sizeof(double)), ap = (double *) a->p;
	for (int i = a->len/sizeof(double) - 1; i > s; i--)
	  ap[i] = ap[i-1];
	ap[s] = tag;
      }
    }

  /**
  ## Parallel reduction

  Each process now has its own local correspondence map between the
  neighborhood index and its rank in the array *a* (i.e. its reduced
  index). In parallel, we now need to build a global correspondence map. 

  We first get the maximum size over all processes of the local map
  and increase the size of the local map to this value. */

#if _MPI
  long lmax = a->len;
  mpi_all_reduce (lmax, MPI_LONG, MPI_MAX);
  a->p = realloc (a->p, lmax);
  lmax /= sizeof(double);

  /**
  We then gather all the local maps into a global map and sort it. All
  local arrays need to be of the same size to be able to use
  MPI_Allgather(), so we first pad the arrays with -1. */
  
  double * q = a->p;
  for (int i = a->len/sizeof(double); i < lmax; i++)
    q[i] = -1;
  double p[lmax*npe()];
  MPI_Allgather (a->p, lmax, MPI_DOUBLE, p, lmax, MPI_DOUBLE, MPI_COMM_WORLD);
  qsort (p, lmax*npe(), sizeof(double), compar_double);

  /**
  This sorted global map will (probably) contain duplicated entries
  (i.e. indices of neighborhoods which span multiple processes). To
  build the new global map (stored in *a*), we eliminate these, as well
  as the negative indices which were used to pad the local arrays. */
  
  array_free (a);
  a = array_new();
  double last = -1;
  for (int i = 0; i < lmax*npe(); i++)
    if (p[i] != last) {
      array_append (a, &p[i], sizeof(double));
      last = p[i];
    }
#endif

  /**
  Once we have the (global) map, we can replace the neighborhood
  indices with their index in the global map (+1). */
  
  foreach()
    if (t[] > 0)
      t[] = lookup_tag (a, t[]) + 1;
  boundary ({t});

  /**
  We return the maximum index value. */
  
  int n = a->len/sizeof(double);
  array_free (a);
  return n;
}