3#include "wmtk/TetMesh.h"
4#include "wmtk/TriMesh.h"
5#include "wmtk/utils/Logger.hpp"
10#include <wmtk/utils/DisableWarnings.hpp>
11#include <tbb/concurrent_priority_queue.h>
12#include <tbb/concurrent_queue.h>
13#include <tbb/parallel_for.h>
14#include <tbb/parallel_reduce.h>
15#include <tbb/spin_mutex.h>
16#include <tbb/task_arena.h>
17#include <tbb/task_group.h>
18#include <wmtk/utils/EnableWarnings.hpp>
// Scheduling strategy for ExecutePass::operator().
//  kSeq:       serial — every operation goes through a single queue on the
//              calling thread (see operator() below).
//  kPartition: parallel — operations are binned by mesh partition id into
//              per-worker priority queues and drained in a tbb::task_arena.
//  kUnSeq / kColor / kMax: declared here, but their handling is not visible
//  in this excerpt — NOTE(review): confirm semantics against the full header.
29enum class ExecutionPolicy { kSeq, kUnSeq, kPartition, kColor, kMax };
// Operations are identified by name (e.g. "tet_split", "face_split" below).
31using Op = std::string;
33template <
class AppMesh>
// NOTE(review): the class head itself (original lines 34-35) is elided in
// this excerpt; the members below belong to a class template over AppMesh.
36 using Tuple =
typename AppMesh::Tuple;
// Registry mapping an operation name (Op) to its implementation; each entry
// returns the new/affected Tuples on success, or std::nullopt on failure.
// (The enclosing std::map declaration is partly elided in this excerpt.)
43 std::function<std::optional<std::vector<Tuple>>(AppMesh&,
const Tuple&)>>
// Priority function used to order entries in the concurrent priority queues.
// Default: constant 0.0, i.e. no prioritization.
49 std::function<double(
const AppMesh&, Op op,
const Tuple&)>
priority =
50 [](
const AppMesh&, Op,
const Tuple&) {
return 0.; };
// Predicate on a priority value deciding whether renewed operations should
// be re-queued. Default: always renew.
55 std::function<bool(
double)>
should_renew = [](double) {
return true; };
// Computes the (op, tuple) pairs to re-schedule around the region an
// operation just modified. The default lambda's body is elided here —
// presumably it returns an empty vector; TODO confirm in the full header.
60 std::function<std::vector<std::pair<Op, Tuple>>(
const AppMesh&, Op,
const std::vector<Tuple>&)>
62 [](
const AppMesh&, Op,
const std::vector<Tuple>&) -> std::vector<std::pair<Op, Tuple>> {
// Acquires the vertex locks an operation needs; task_id identifies the
// worker requesting them. Default: no locking, always succeeds.
// NOTE(review): the declared type takes `AppMesh&` while the default lambda
// takes `const AppMesh&` — confirm this matches the full header.
69 std::function<bool(AppMesh&,
const Tuple&,
int task_id)>
lock_vertices =
70 [](
const AppMesh&,
const Tuple&,
int task_id) {
return true; };
// Checks that a queued (weight, op, tuple) entry is still up to date before
// executing it (stale weights get dropped in operator()). The default body
// is mostly elided; the surviving assert requires the tuple to be valid.
92 std::function<bool(
const AppMesh&,
const std::tuple<double, Op, Tuple>& t)>
95 assert(std::get<2>(t).is_valid(m));
// Hook invoked when an operation fails. Default: no-op.
101 std::function<void(
const AppMesh&, Op,
const Tuple& t)>
on_fail =
102 [](
const AppMesh&, Op,
const Tuple& t) {};
// Execution policy chosen at construction time.
104 ExecutionPolicy policy;
// Constructor: registers the built-in name->operation map for the concrete
// mesh type (tet or tri, selected with `if constexpr`). policy_ picks serial
// vs. parallel scheduling (default kSeq).
// NOTE(review): this excerpt elides most statement endings — map keys,
// return statements, and closing braces; comments describe only what is
// visible here.
119 ExecutePass(
const ExecutionPolicy& policy_ = ExecutionPolicy::kSeq)
// Tetrahedral meshes: register the tet edit operations.
122 if constexpr (std::is_base_of<TetMesh, AppMesh>::value) {
// edge collapse (operation name elided in this excerpt)
125 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
126 std::vector<Tuple> ret;
127 if (m.collapse_edge(t, ret))
// edge swap
133 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
134 std::vector<Tuple> ret;
135 if (m.swap_edge(t, ret))
// 4-4 edge swap variant
141 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
142 std::vector<Tuple> ret;
143 if (m.swap_edge_44(t, ret))
// 5-6 edge swap variant
149 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
150 std::vector<Tuple> ret;
151 if (m.swap_edge_56(t, ret))
// edge split
157 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
158 std::vector<Tuple> ret;
159 if (m.split_edge(t, ret))
// face swap
165 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
166 std::vector<Tuple> ret;
167 if (m.swap_face(t, ret))
// vertex smoothing — success yields no new tuples to renew
173 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
174 if (m.smooth_vertex(t))
175 return std::vector<Tuple>{};
// face split
180 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
181 std::vector<Tuple> ret;
182 if (m.split_face(t, ret))
// tet split (the one entry whose registered name survives the excerpt)
187 {
"tet_split", [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
188 std::vector<Tuple> ret;
189 if (m.split_tet(t, ret))
// Triangle meshes: register the tri edit operations.
195 if constexpr (std::is_base_of<TriMesh, AppMesh>::value) {
// edge collapse
198 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
199 std::vector<Tuple> ret;
200 if (m.collapse_edge(t, ret))
// edge swap
206 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
207 std::vector<Tuple> ret;
208 if (m.swap_edge(t, ret))
// edge split
214 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
215 std::vector<Tuple> ret;
216 if (m.split_edge(t, ret))
// vertex smoothing — success yields no new tuples to renew
222 [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
223 if (m.smooth_vertex(t))
224 return std::vector<Tuple>{};
// face split
228 {
"face_split", [](AppMesh& m,
const Tuple& t) -> std::optional<std::vector<Tuple>> {
229 std::vector<Tuple> ret;
230 if (m.split_face(t, ret))
// Post-operation cleanup: releases the per-vertex mutexes this worker
// acquired (via lock_vertices) during the operation.
// In kSeq mode no locks are taken; the branch body is elided in this
// excerpt — presumably an early return that skips the release; TODO confirm.
241 void operation_cleanup(AppMesh& m)
246 if (policy == ExecutionPolicy::kSeq)
249 m.release_vertex_mutex_in_stack();
// Maps an element to the index of the queue it should be scheduled on.
// In kSeq mode there is only one queue; the branch body is elided in this
// excerpt — presumably `return 0;`, TODO confirm. Otherwise the mesh's own
// partitioning decides.
253 size_t get_partition_id(
const AppMesh& m,
const Tuple& e)
255 if (policy == ExecutionPolicy::kSeq) {
258 return m.get_partition_id(e);
// Executes the requested (op, tuple) pairs on mesh m.
// kSeq: everything is drained from `final_queue` on the calling thread.
// Otherwise: tuples are binned by partition id into per-worker priority
// queues, drained concurrently in a tbb::task_arena, and any leftovers
// (pushed to `final_queue`, e.g. on lock conflicts) are finished serially.
// NOTE(review): several declarations used below (final_queue, num_threads,
// tg, ele_in_queue, retry limits, the stopping-criterion check) fall on
// lines elided from this excerpt.
270 bool operator()(AppMesh& m,
const std::vector<std::pair<Op, Tuple>>& operation_tuples)
// Queue element: (priority weight, operation name, target tuple, retry count).
272 using Elem = std::tuple<double, Op, Tuple, size_t>;
273 using Queue = tbb::concurrent_priority_queue<Elem>;
// Cooperative stop flag shared by all workers.
275 std::atomic<bool> stop(
false);
// One concurrent priority queue per worker/partition.
280 std::vector<Queue> queues(num_threads);
// Drains one queue; task_id identifies the worker (used by lock_vertices).
283 auto run_single_queue = [&](Queue& Q,
int task_id) {
285 while ([&]() {
return Q.try_pop(ele_in_queue); }()) {
286 auto& [weight, op, tup, retry] = ele_in_queue;
// Skip entries whose tuple was invalidated by a previous operation.
287 if (!tup.is_valid(m)) {
291 std::vector<Elem> renewed_elements;
// (context elided) On a failed lock: retry on the same queue, or defer
// the entry to final_queue for the serial cleanup phase.
300 Q.emplace(ele_in_queue);
303 final_queue.emplace(ele_in_queue);
307 if (tup.is_valid(m)) {
// Drop stale entries whose cached weight is no longer up to date.
310 std::tuple<double, Op, Tuple>(weight, op, tup))) {
311 operation_cleanup(m);
315 std::vector<std::pair<Op, Tuple>> renewed_tuples;
// Re-prioritize the neighborhood affected by the executed operation.
324 for (
const auto& [o, e] : renewed_tuples) {
327 renewed_elements.emplace_back(val, o, e, 0);
331 operation_cleanup(m);
333 for (
auto& e : renewed_elements) {
// Honor a stop request raised (presumably by the stopping criterion —
// the raising site is elided from this excerpt).
337 if (stop.load(std::memory_order_acquire)) {
345 cnt_update.store(0, std::memory_order_release);
// Serial mode: one queue, drained on the calling thread.
350 if (policy == ExecutionPolicy::kSeq) {
351 for (
const auto& [op, e] : operation_tuples) {
352 if (!e.is_valid(m)) {
355 final_queue.emplace(
priority(m, op, e), op, e, 0);
357 run_single_queue(final_queue, 0);
// Parallel mode: bin by partition id, then drain concurrently.
359 for (
const auto& [op, e] : operation_tuples) {
360 if (!e.is_valid(m)) {
363 queues[get_partition_id(m, e)].emplace(
priority(m, op, e), op, e, 0);
366 tbb::task_arena arena(num_threads);
368 arena.execute([&queues, &run_single_queue, &tg]() {
369 for (
int task_id = 0; task_id < queues.size(); task_id++) {
370 tg.run([&run_single_queue, &queues, task_id] {
371 run_single_queue(queues[task_id], task_id);
// Whatever could not be processed in parallel is finished serially.
376 logger().debug(
"Parallel Complete, remains element {}", final_queue.size());
377 run_single_queue(final_queue, 0);
// Final summary log of executed operations.
381 "executed: {} | success / fail: {} / {}",
382 (
int)cnt_success + (
int)cnt_fail,
388 int get_cnt_success()
const {
return cnt_success; }
389 int get_cnt_fail()
const {
return cnt_fail; }
// Counters shared by the concurrent worker tasks in operator(), hence
// std::atomic_int.
// cnt_update: presumably counts operations between stopping-criterion
// checks — it is reset with cnt_update.store(0) in operator(); confirm
// against the full header.
392 std::atomic_int cnt_update = 0;
// cnt_success / cnt_fail: totals reported in the final debug log and
// exposed through get_cnt_success() / get_cnt_fail().
393 std::atomic_int cnt_success = 0;
394 std::atomic_int cnt_fail = 0;
Definition ExecutionScheduler.hpp:35
size_t max_retry_limit
Definition ExecutionScheduler.hpp:112
std::function< void(const AppMesh &, Op, const Tuple &t)> on_fail
used to collect operations that did not finish, so they can be re-executed later
Definition ExecutionScheduler.hpp:101
std::function< bool(const AppMesh &, const std::tuple< double, Op, Tuple > &t)> is_weight_up_to_date
Decides whether a Tuple should be dropped from processing. For example, if the energy is out-dated....
Definition ExecutionScheduler.hpp:93
std::function< bool(AppMesh &, const Tuple &, int task_id)> lock_vertices
lock the vertices concerned; depends on the operation
Definition ExecutionScheduler.hpp:69
std::function< std::vector< std::pair< Op, Tuple > >(const AppMesh &, Op, const std::vector< Tuple > &)> renew_neighbor_tuples
renew neighboring Tuples after each operation; depends on the operation
Definition ExecutionScheduler.hpp:61
size_t stopping_criterion_checking_frequency
checking frequency to decide whether to stop execution given the stopping criterion
Definition ExecutionScheduler.hpp:85
std::function< double(const AppMesh &, Op op, const Tuple &)> priority
Priority function (default to edge length)
Definition ExecutionScheduler.hpp:49
ExecutePass(const ExecutionPolicy &policy_=ExecutionPolicy::kSeq)
Construct a new Execute Pass object. It contains the name-to-operation map and the functions that def...
Definition ExecutionScheduler.hpp:119
bool operator()(AppMesh &m, const std::vector< std::pair< Op, Tuple > > &operation_tuples)
Executes the operations for an application when the lambda function is invoked. The rules that are cu...
Definition ExecutionScheduler.hpp:270
std::function< bool(double)> should_renew
check on whether new operations should be added to the priority queue
Definition ExecutionScheduler.hpp:55
std::map< Op, std::function< std::optional< std::vector< Tuple > >(AppMesh &, const Tuple &)> > edit_operation_maps
A dictionary that registers names with operations.
Definition ExecutionScheduler.hpp:44
std::function< bool(const AppMesh &)> stopping_criterion
Stopping criterion based on the whole mesh. For efficiency, it is not checked every time....
Definition ExecutionScheduler.hpp:78