Wildmeshing Toolkit
Loading...
Searching...
No Matches
ExecutionScheduler.hpp
1#pragma once
2
3#include "wmtk/TetMesh.h"
4#include "wmtk/TriMesh.h"
5#include "wmtk/utils/Logger.hpp"
6
7// clang-format off
8#include <functional>
9#include <limits>
10#include <wmtk/utils/DisableWarnings.hpp>
11#include <tbb/concurrent_priority_queue.h>
12#include <tbb/concurrent_queue.h>
13#include <tbb/parallel_for.h>
14#include <tbb/parallel_reduce.h>
15#include <tbb/spin_mutex.h>
16#include <tbb/task_arena.h>
17#include <tbb/task_group.h>
18#include <wmtk/utils/EnableWarnings.hpp>
19// clang-format on
20
21#include <atomic>
22#include <cassert>
23#include <cstddef>
24#include <queue>
25#include <stdexcept>
26#include <type_traits>
27
28namespace wmtk {
29enum class ExecutionPolicy { kSeq, kUnSeq, kPartition, kColor, kMax };
30
31using Op = std::string;
32
33template <class AppMesh, ExecutionPolicy policy = ExecutionPolicy::kSeq>
35{
36 using Tuple = typename AppMesh::Tuple;
41 std::map<
42 Op, // strings
43 std::function<std::optional<std::vector<Tuple>>(AppMesh&, const Tuple&)>>
49 std::function<double(const AppMesh&, Op op, const Tuple&)> priority = [](auto&, auto, auto&) {
50 return 0.;
51 };
56 std::function<bool(double)> should_renew = [](auto) { return true; };
61 std::function<std::vector<std::pair<Op, Tuple>>(const AppMesh&, Op, const std::vector<Tuple>&)>
63 [](auto&, auto, auto&) -> std::vector<std::pair<Op, Tuple>> { return {}; };
68 std::function<bool(AppMesh&, const Tuple&, int task_id)> lock_vertices =
69 [](const AppMesh&, const Tuple&, int task_id) { return true; };
77 std::function<bool(const AppMesh&)> stopping_criterion = [](const AppMesh&) {
78 return false; // non-stop, process everything
79 };
84 size_t stopping_criterion_checking_frequency = std::numeric_limits<size_t>::max();
91 std::function<bool(const AppMesh&, const std::tuple<double, Op, Tuple>& t)>
92 is_weight_up_to_date = [](const AppMesh& m, const std::tuple<double, Op, Tuple>& t) {
93 // always do.
94 assert(std::get<2>(t).is_valid(m));
95 return true;
96 };
100 std::function<void(const AppMesh&, Op, const Tuple& t)> on_fail = [](auto&, auto, auto&) {};
101
102
103 int num_threads = 1;
104
109 size_t max_retry_limit = 10;
117 {
118 if constexpr (std::is_base_of<wmtk::TetMesh, AppMesh>::value) {
120 {"edge_collapse",
121 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
122 std::vector<Tuple> ret;
123 if (m.collapse_edge(t, ret))
124 return ret;
125 else
126 return {};
127 }},
128 {"edge_swap",
129 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
130 std::vector<Tuple> ret;
131 if (m.swap_edge(t, ret))
132 return ret;
133 else
134 return {};
135 }},
136 {"edge_swap_44",
137 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
138 std::vector<Tuple> ret;
139 if (m.swap_edge_44(t, ret))
140 return ret;
141 else
142 return {};
143 }},
144 {"edge_swap_56",
145 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
146 std::vector<Tuple> ret;
147 if (m.swap_edge_56(t, ret))
148 return ret;
149 else
150 return {};
151 }},
152 {"edge_split",
153 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
154 std::vector<Tuple> ret;
155 if (m.split_edge(t, ret))
156 return ret;
157 else
158 return {};
159 }},
160 {"face_swap",
161 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
162 std::vector<Tuple> ret;
163 if (m.swap_face(t, ret))
164 return ret;
165 else
166 return {};
167 }},
168 {"vertex_smooth",
169 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
170 if (m.smooth_vertex(t))
171 return std::vector<Tuple>{};
172 else
173 return {};
174 }},
175 {"face_split",
176 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
177 std::vector<Tuple> ret;
178 if (m.split_face(t, ret))
179 return ret;
180 else
181 return {};
182 }},
183 {"tet_split", [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
184 std::vector<Tuple> ret;
185 if (m.split_tet(t, ret))
186 return ret;
187 else
188 return {};
189 }}};
190 }
191 if constexpr (std::is_base_of<wmtk::TriMesh, AppMesh>::value) {
193 {"edge_collapse",
194 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
195 std::vector<Tuple> ret;
196 if (m.collapse_edge(t, ret))
197 return ret;
198 else
199 return {};
200 }},
201 {"edge_swap",
202 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
203 std::vector<Tuple> ret;
204 if (m.swap_edge(t, ret))
205 return ret;
206 else
207 return {};
208 }},
209 {"edge_split",
210 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
211 std::vector<Tuple> ret;
212 if (m.split_edge(t, ret))
213 return ret;
214 else
215 return {};
216 }},
217 {"vertex_smooth",
218 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
219 if (m.smooth_vertex(t))
220 return std::vector<Tuple>{};
221 else
222 return {};
223 }},
224 {"face_split", [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
225 std::vector<Tuple> ret;
226 if (m.split_face(t, ret))
227 return ret;
228 else
229 return {};
230 }}};
231 }
232 };
233
234 ExecutePass(ExecutePass&) = delete;
235
236private:
237 void operation_cleanup(AppMesh& m)
238 { //
239 // class ResourceManger
240 // what about RAII mesh edit locking?
241 // release mutex, but this should be implemented in TetMesh class.
242 if constexpr (policy == ExecutionPolicy::kSeq)
243 return;
244 else {
245 m.release_vertex_mutex_in_stack();
246 }
247 }
248
249 size_t get_partition_id(const AppMesh& m, const Tuple& e)
250 {
251 if constexpr (policy == ExecutionPolicy::kSeq) {
252 return 0;
253 }
254 return m.get_partition_id(e);
255 }
256
257public:
266 bool operator()(AppMesh& m, const std::vector<std::pair<Op, Tuple>>& operation_tuples)
267 {
268 using Elem = std::tuple<double, Op, Tuple, size_t>; // priority, operation, tuple, #retries
269 using Queue = tbb::concurrent_priority_queue<Elem>;
270
271 auto stop = std::atomic<bool>(false);
272 cnt_success = 0;
273 cnt_fail = 0;
274 cnt_update = 0;
275
276 std::vector<Queue> queues(num_threads);
277 Queue final_queue;
278
279 auto run_single_queue = [&](Queue& Q, int task_id) {
280 Elem ele_in_queue;
281 while ([&]() { return Q.try_pop(ele_in_queue); }()) {
282 auto& [weight, op, tup, retry] = ele_in_queue;
283 if (!tup.is_valid(m)) {
284 continue;
285 }
286
287 std::vector<Elem> renewed_elements;
288 {
289 auto locked_vid = lock_vertices(
290 m,
291 tup,
292 task_id); // Note that returning `Tuples` would be invalid.
293 if (!locked_vid) {
294 retry++;
295 if (retry < max_retry_limit) {
296 Q.emplace(ele_in_queue);
297 } else {
298 retry = 0;
299 final_queue.emplace(ele_in_queue);
300 }
301 continue;
302 }
303 if (tup.is_valid(m)) {
305 m,
306 std::tuple<double, Op, Tuple>(weight, op, tup))) {
307 operation_cleanup(m);
308 continue;
309 } // this can encode, in qslim, recompute(energy) == weight.
310 auto newtup = edit_operation_maps[op](m, tup);
311 std::vector<std::pair<Op, Tuple>> renewed_tuples;
312 if (newtup) {
313 renewed_tuples = renew_neighbor_tuples(m, op, newtup.value());
314 cnt_success++;
315 cnt_update++;
316 } else {
317 on_fail(m, op, tup);
318 cnt_fail++;
319 }
320 for (const auto& [o, e] : renewed_tuples) {
321 auto val = priority(m, o, e);
322 if (should_renew(val)) {
323 renewed_elements.emplace_back(val, o, e, 0);
324 }
325 }
326 }
327 operation_cleanup(m); // Maybe use RAII
328 }
329 for (auto& e : renewed_elements) {
330 Q.emplace(e);
331 }
332
333 if (stop.load(std::memory_order_acquire)) return;
334 if (cnt_success > stopping_criterion_checking_frequency) {
335 if (stopping_criterion(m)) {
336 stop.store(true);
337 return;
338 }
339 cnt_update.store(0, std::memory_order_release);
340 }
341 }
342 };
343
344 if constexpr (policy == ExecutionPolicy::kSeq) {
345 for (auto& [op, e] : operation_tuples) {
346 if (!e.is_valid(m)) continue;
347 final_queue.emplace(priority(m, op, e), op, e, 0);
348 }
349 run_single_queue(final_queue, 0);
350 } else {
351 for (auto& [op, e] : operation_tuples) {
352 if (!e.is_valid(m)) continue;
353 queues[get_partition_id(m, e)].emplace(priority(m, op, e), op, e, 0);
354 }
355 // Comment out parallel: work on serial first.
356 tbb::task_arena arena(num_threads);
357 tbb::task_group tg;
358 arena.execute([&queues, &run_single_queue, &tg]() {
359 for (int task_id = 0; task_id < queues.size(); task_id++) {
360 tg.run([&run_single_queue, &queues, task_id] {
361 run_single_queue(queues[task_id], task_id);
362 });
363 }
364 tg.wait();
365 });
366 logger().debug("Parallel Complete, remains element {}", final_queue.size());
367 run_single_queue(final_queue, 0);
368 }
369
370 logger().info(
371 "executed: {} | success / fail: {} / {}",
372 (int)cnt_success + (int)cnt_fail,
373 (int)cnt_success,
374 (int)cnt_fail);
375 return true;
376 }
377
378 int get_cnt_success() const { return cnt_success; }
379 int get_cnt_fail() const { return cnt_fail; }
380
private:
    // Successes since the stopping criterion was last evaluated; reset to 0
    // after each evaluation in operator(). NOTE(review): it is incremented and
    // reset but the frequency check reads cnt_success -- confirm intent.
    std::atomic_int cnt_update = 0;
    // Total successful operations in the current pass (see get_cnt_success()).
    std::atomic_int cnt_success = 0;
    // Total failed operations in the current pass (see get_cnt_fail()).
    std::atomic_int cnt_fail = 0;
};
386} // namespace wmtk
Definition ExecutionScheduler.hpp:35
size_t max_retry_limit
Definition ExecutionScheduler.hpp:109
std::function< void(const AppMesh &, Op, const Tuple &t)> on_fail
Callback invoked when an operation fails; can be used to collect the unfinished operations for later re-execution.
Definition ExecutionScheduler.hpp:100
size_t stopping_criterion_checking_frequency
checking frequency to decide whether to stop execution given the stopping criterion
Definition ExecutionScheduler.hpp:84
std::function< std::vector< std::pair< Op, Tuple > >(const AppMesh &, Op, const std::vector< Tuple > &)> renew_neighbor_tuples
renew neighboring Tuples after each operation depends on the operation
Definition ExecutionScheduler.hpp:62
bool operator()(AppMesh &m, const std::vector< std::pair< Op, Tuple > > &operation_tuples)
Executes the operations for an application when the lambda function is invoked. The rules that are cu...
Definition ExecutionScheduler.hpp:266
std::function< bool(double)> should_renew
Checks whether new operations should be added to the priority queue.
Definition ExecutionScheduler.hpp:56
std::function< bool(AppMesh &, const Tuple &, int task_id)> lock_vertices
lock the vertices concerned depends on the operation
Definition ExecutionScheduler.hpp:68
std::function< double(const AppMesh &, Op op, const Tuple &)> priority
Priority function (default to edge length)
Definition ExecutionScheduler.hpp:49
std::function< bool(const AppMesh &, const std::tuple< double, Op, Tuple > &t)> is_weight_up_to_date
Should Process drops some Tuple from being processed. For example, if the energy is out-dated....
Definition ExecutionScheduler.hpp:92
std::function< bool(const AppMesh &)> stopping_criterion
Stopping Criterion based on the whole mesh For efficiency, not every time is checked....
Definition ExecutionScheduler.hpp:77
std::map< Op, std::function< std::optional< std::vector< Tuple > >(AppMesh &, const Tuple &)> > edit_operation_maps
A dictionary that registers names with operations.
Definition ExecutionScheduler.hpp:44
ExecutePass()
Construct a new Execute Pass object. It contains the name-to-operation map and the functions that def...
Definition ExecutionScheduler.hpp:116