Wildmeshing Toolkit
ExecutionScheduler.hpp
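Source listing of the execution-scheduling header, which defines wmtk::ExecutePass, a templated driver that runs registered local operations (edge collapse/split/swap, face and tet splits, vertex smoothing) over a wmtk::TetMesh- or wmtk::TriMesh-derived mesh, either sequentially or in parallel over mesh partitions.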
#pragma once

#include "wmtk/TetMesh.h"
#include "wmtk/TriMesh.h"
#include "wmtk/utils/Logger.hpp"

// clang-format off
#include <functional>
#include <limits>
#include <wmtk/utils/DisableWarnings.hpp>
#include <tbb/concurrent_priority_queue.h>
#include <tbb/concurrent_queue.h>
#include <tbb/parallel_for.h>
#include <tbb/parallel_reduce.h>
#include <tbb/spin_mutex.h>
#include <tbb/task_arena.h>
#include <tbb/task_group.h>
#include <wmtk/utils/EnableWarnings.hpp>
// clang-format on

#include <atomic>
#include <cassert>
#include <cstddef>
#include <queue>
#include <stdexcept>
#include <type_traits>

// Headers for the std::map / std::optional / std::string / std::vector types used below.
#include <map>
#include <optional>
#include <string>
#include <vector>
namespace wmtk {
enum class ExecutionPolicy { kSeq, kUnSeq, kPartition, kColor, kMax };

using Op = std::string;

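// ExecutePass runs one pass of local operations over an application mesh: it keeps
// (priority, operation-name, Tuple, retry-count) elements in concurrent priority queues, pops
// the highest-priority element, locks the affected vertices (for the parallel policies), applies
// the registered operation, and re-inserts any renewed neighbor tuples until the queues are
// empty or the stopping criterion is met.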
template <class AppMesh, ExecutionPolicy policy = ExecutionPolicy::kSeq>
class ExecutePass
{
    using Tuple = typename AppMesh::Tuple;

public:
    /// A dictionary that registers names with operations.
    std::map<
        Op, // strings
        std::function<std::optional<std::vector<Tuple>>(AppMesh&, const Tuple&)>>
        edit_operation_maps;

    /// Priority function (default to edge length).
    std::function<double(const AppMesh&, Op op, const Tuple&)> priority = [](auto&, auto, auto&) {
        return 0.;
    };

    /// Check on whether new operations should be added to the priority queue.
    std::function<bool(double)> should_renew = [](auto) { return true; };

    /// Renew neighboring Tuples after each operation; depends on the operation.
    std::function<std::vector<std::pair<Op, Tuple>>(const AppMesh&, Op, const std::vector<Tuple>&)>
        renew_neighbor_tuples =
            [](auto&, auto, auto&) -> std::vector<std::pair<Op, Tuple>> { return {}; };

    /// Lock the vertices concerned; depends on the operation.
    std::function<bool(AppMesh&, const Tuple&, int task_id)> lock_vertices =
        [](const AppMesh&, const Tuple&, int task_id) { return true; };

    /// Stopping criterion based on the whole mesh. For efficiency, it is not checked after every
    /// operation.
    std::function<bool(const AppMesh&)> stopping_criterion = [](const AppMesh&) {
        return false; // non-stop, process everything
    };

    /// Checking frequency to decide whether to stop execution given the stopping criterion.
    size_t stopping_criterion_checking_frequency = std::numeric_limits<size_t>::max();

    /// Decides whether some Tuple should be dropped from being processed, for example if its
    /// energy is out-dated (return false to skip it).
    std::function<bool(const AppMesh&, const std::tuple<double, Op, Tuple>& t)>
        is_weight_up_to_date = [](const AppMesh& m, const std::tuple<double, Op, Tuple>& t) {
            // always do.
            assert(std::get<2>(t).is_valid(m));
            return true;
        };

    /// Used to collect operations that are not finished and used for later re-execution.
    std::function<void(const AppMesh&, Op, const Tuple& t)> on_fail = [](auto&, auto, auto&) {};


    int num_threads = 1;

    size_t max_retry_limit = 10;
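    // num_threads sizes the tbb::task_arena used in operator(); max_retry_limit bounds how many
    // times an element whose neighborhood could not be locked is re-queued before being deferred
    // to the serial final queue.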
    /// Construct a new Execute Pass object; registers the default name-to-operation map for the
    /// given mesh type.
    ExecutePass()
    {
        if constexpr (std::is_base_of<wmtk::TetMesh, AppMesh>::value) {
            edit_operation_maps = {
                {"edge_collapse",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.collapse_edge(t, ret))
                         return ret;
                     else
                         return {};
                 }},
                {"edge_swap",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.swap_edge(t, ret))
                         return ret;
                     else
                         return {};
                 }},
                {"edge_swap_44",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.swap_edge_44(t, ret))
                         return ret;
                     else
                         return {};
                 }},
                {"edge_swap_56",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.swap_edge_56(t, ret))
                         return ret;
                     else
                         return {};
                 }},
                {"edge_split",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.split_edge(t, ret))
                         return ret;
                     else
                         return {};
                 }},
                {"face_swap",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.swap_face(t, ret))
                         return ret;
                     else
                         return {};
                 }},
                {"vertex_smooth",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     if (m.smooth_vertex(t))
                         return std::vector<Tuple>{};
                     else
                         return {};
                 }},
                {"face_split",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.split_face(t, ret))
                         return ret;
                     else
                         return {};
                 }},
                {"tet_split", [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.split_tet(t, ret))
                         return ret;
                     else
                         return {};
                 }}};
        }
        if constexpr (std::is_base_of<wmtk::TriMesh, AppMesh>::value) {
            edit_operation_maps = {
                {"edge_collapse",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.collapse_edge(t, ret))
                         return ret;
                     else
                         return {};
                 }},
                {"edge_swap",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.swap_edge(t, ret))
                         return ret;
                     else
                         return {};
                 }},
                {"edge_split",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.split_edge(t, ret))
                         return ret;
                     else
                         return {};
                 }},
                {"vertex_smooth",
                 [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     if (m.smooth_vertex(t))
                         return std::vector<Tuple>{};
                     else
                         return {};
                 }},
                {"face_split", [](AppMesh& m, const Tuple& t) -> std::optional<std::vector<Tuple>> {
                     std::vector<Tuple> ret;
                     if (m.split_face(t, ret))
                         return ret;
                     else
                         return {};
                 }}};
        }
    };
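    // Note: edit_operation_maps is a public member, so an application can register additional
    // operations or replace the defaults above with its own lambdas after construction.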

    ExecutePass(ExecutePass&) = delete;

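    // Internal helpers: operation_cleanup releases the per-vertex locks taken via lock_vertices
    // (a no-op for the sequential policy), and get_partition_id maps an element to the partition
    // whose queue should process it.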
private:
    void operation_cleanup(AppMesh& m)
    { //
        // class ResourceManager
        // what about RAII mesh edit locking?
        // release mutex, but this should be implemented in TetMesh class.
        if constexpr (policy == ExecutionPolicy::kSeq)
            return;
        else {
            m.release_vertex_mutex_in_stack();
        }
    }

    size_t get_partition_id(const AppMesh& m, const Tuple& e)
    {
        if constexpr (policy == ExecutionPolicy::kSeq) return 0;
        if constexpr (std::is_base_of<wmtk::TetMesh, AppMesh>::value)
            return m.get_partition_id(e);
        else if constexpr (std::is_base_of<wmtk::TriMesh, AppMesh>::value) // TODO: make same
            // interface.
            return m.vertex_attrs[e.vid(m)].partition_id; // TODO: this is temporary.
        return 0;
    }

public:
    /// Executes the operations for an application when the function object is invoked.
    bool operator()(AppMesh& m, const std::vector<std::pair<Op, Tuple>>& operation_tuples)
    {
        using Elem = std::tuple<double, Op, Tuple, size_t>; // priority, operation, tuple, #retries
        using Queue = tbb::concurrent_priority_queue<Elem>;
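        // The priority queue is a max-heap over Elem, so the element with the largest weight is
        // popped first; the trailing size_t counts how many times locking has failed for it.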

        auto stop = std::atomic<bool>(false);
        cnt_success = 0;
        cnt_fail = 0;
        cnt_update = 0;

        std::vector<Queue> queues(num_threads);
        Queue final_queue;

        auto run_single_queue = [&](Queue& Q, int task_id) {
            Elem ele_in_queue;
            while ([&]() { return Q.try_pop(ele_in_queue); }()) {
                auto& [weight, op, tup, retry] = ele_in_queue;
                if (!tup.is_valid(m)) {
                    continue;
                }

                std::vector<Elem> renewed_elements;
                {
                    auto locked_vid = lock_vertices(
                        m,
                        tup,
                        task_id); // Note that returning `Tuples` would be invalid.
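                    // If the neighborhood could not be locked (another task owns part of it),
                    // re-queue the element; after max_retry_limit failed attempts it is deferred
                    // to final_queue, which is drained serially after the parallel phase.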
                    if (!locked_vid) {
                        retry++;
                        if (retry < max_retry_limit) {
                            Q.emplace(ele_in_queue);
                        } else {
                            retry = 0;
                            final_queue.emplace(ele_in_queue);
                        }
                        continue;
                    }
                    if (tup.is_valid(m)) {
                        if (!is_weight_up_to_date(
                                m,
                                std::tuple<double, Op, Tuple>(weight, op, tup))) {
                            operation_cleanup(m);
                            continue;
                        } // this can encode, in qslim, recompute(energy) == weight.
                        auto newtup = edit_operation_maps[op](m, tup);
                        std::vector<std::pair<Op, Tuple>> renewed_tuples;
                        if (newtup) {
                            renewed_tuples = renew_neighbor_tuples(m, op, newtup.value());
                            cnt_success++;
                            cnt_update++;
                        } else {
                            on_fail(m, op, tup);
                            cnt_fail++;
                        }
                        for (const auto& [o, e] : renewed_tuples) {
                            auto val = priority(m, o, e);
                            if (should_renew(val)) {
                                renewed_elements.emplace_back(val, o, e, 0);
                            }
                        }
                    }
                    operation_cleanup(m); // Maybe use RAII
                }
                for (auto& e : renewed_elements) {
                    Q.emplace(e);
                }

                if (stop.load(std::memory_order_acquire)) return;
                if (cnt_success > stopping_criterion_checking_frequency) {
                    if (stopping_criterion(m)) {
                        stop.store(true);
                        return;
                    }
                    cnt_update.store(0, std::memory_order_release);
                }
            }
        };
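        // Seeding: with the sequential policy every (op, tuple) pair goes straight into
        // final_queue; otherwise each pair is routed to the queue of its mesh partition, the
        // partitions are processed by one TBB task each, and final_queue collects the leftovers
        // for a final serial pass.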

        if constexpr (policy == ExecutionPolicy::kSeq) {
            for (auto& [op, e] : operation_tuples) {
                if (!e.is_valid(m)) continue;
                final_queue.emplace(priority(m, op, e), op, e, 0);
            }
            run_single_queue(final_queue, 0);
        } else {
            for (auto& [op, e] : operation_tuples) {
                if (!e.is_valid(m)) continue;
                queues[get_partition_id(m, e)].emplace(priority(m, op, e), op, e, 0);
            }
            // Comment out parallel: work on serial first.
            tbb::task_arena arena(num_threads);
            tbb::task_group tg;
            arena.execute([&queues, &run_single_queue, &tg]() {
                for (int task_id = 0; task_id < queues.size(); task_id++) {
                    tg.run([&run_single_queue, &queues, task_id] {
                        run_single_queue(queues[task_id], task_id);
                    });
                }
                tg.wait();
            });
            logger().debug("Parallel Complete, remains element {}", final_queue.size());
            run_single_queue(final_queue, 0);
        }

        logger().info(
            "executed: {} | success / fail: {} / {}",
            (int)cnt_success + (int)cnt_fail,
            (int)cnt_success,
            (int)cnt_fail);
        return true;
    }

    int get_cnt_success() const { return cnt_success; }
    int get_cnt_fail() const { return cnt_fail; }

private:
    std::atomic_int cnt_update = 0;
    std::atomic_int cnt_success = 0;
    std::atomic_int cnt_fail = 0;
};
} // namespace wmtk
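For orientation, here is a minimal usage sketch; it is not part of the toolkit. It assumes a hypothetical application mesh class MyTriMesh derived from wmtk::TriMesh, a hypothetical edge_length helper on that class, and that the header is reachable as wmtk/ExecutionScheduler.hpp. It shows how a caller might seed and run a sequential edge-collapse pass using the default "edge_collapse" operation registered by the ExecutePass constructor.

#include <wmtk/ExecutionScheduler.hpp>

// Sketch only: MyTriMesh and MyTriMesh::edge_length are assumed application-side helpers.
void collapse_pass(MyTriMesh& mesh)
{
    wmtk::ExecutePass<MyTriMesh> executor; // ExecutionPolicy::kSeq by default

    // Pop longer edges first; the scheduler's priority queue is a max-heap on this weight.
    executor.priority = [](const MyTriMesh& m, wmtk::Op, const MyTriMesh::Tuple& e) {
        return m.edge_length(e); // assumed helper
    };

    // Seed one "edge_collapse" operation per edge; the name matches the default entry
    // registered for TriMesh-derived meshes.
    std::vector<std::pair<wmtk::Op, MyTriMesh::Tuple>> ops;
    for (const auto& e : mesh.get_edges()) ops.emplace_back("edge_collapse", e);

    executor(mesh, ops);
}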