Honeycomb  0.1
Component-Model Framework
Util.h
Go to the documentation of this file.
1 // Honeycomb, Copyright (C) 2015 NewGamePlus Inc. Distributed under the Boost Software License v1.0.
2 #pragma once
3 
6 #include "Honey/Thread/Pool.h"
7 #include "Honey/Misc/Range.h"
8 
9 namespace honey { namespace future
10 {
11 
12 //====================================================
13 // waitAll / waitAny
14 //====================================================
15 
16 inline void waitAll() {} //dummy to catch empty parameter pack
18 template<class Future, class... Futures, typename mt::disable_if<mt::isRange<Future>::value, int>::type=0>
19 void waitAll(Future&& f, Futures&&... fs) { f.wait(); waitAll(forward<Futures>(fs)...); }
20 
22 template<class Range, class Clock, class Dur, typename std::enable_if<mt::isRange<Range>::value, int>::type=0>
23 void waitAll(Range&& range, TimePoint<Clock,Dur> time) { for (auto& e : range) e.wait(time); }
25 template<class Range, class Rep, class Period, typename std::enable_if<mt::isRange<Range>::value, int>::type=0>
26 void waitAll(Range&& range, Duration<Rep,Period> time) { return waitAll(forward<Range>(range), time == time.max() ? MonoClock::TimePoint::max() : MonoClock::now() + time); }
28 template<class Range, typename std::enable_if<mt::isRange<Range>::value, int>::type=0>
29 void waitAll(Range&& range) { return waitAll(forward<Range>(range), MonoClock::TimePoint::max()); }
30 
32 namespace priv
33 {
35  class waitAny : public mt::FuncptrBase
36  {
37  public:
38  waitAny();
39  ~waitAny();
40 
41  void add(const FutureBase& f);
42  int wait(MonoClock::TimePoint time);
43  void operator()(StateBase& src);
44 
45  private:
48  struct ThreadData
49  {
50  vector<StateBase*> states;
51  ConditionLock cond;
52  };
53  static mt_global((thread::Local<ThreadData>), threadData,);
54 
55  ThreadData& td;
56  StateBase* readyState;
57  };
58 }
61 template<class Future, class... Futures, typename mt::disable_if<mt::isRange<Future>::value, int>::type=0>
63 int waitAny(Future&& f, Futures&&... fs)
64 {
65  priv::waitAny waiter;
66  array<const FutureBase*, sizeof...(Futures)+1> futures = {&f, &fs...};
67  for (auto& e : futures) waiter.add(*e);
68  return waiter.wait(MonoClock::TimePoint::max());
69 }
70 
72 template<class Range, class Clock, class Dur, typename std::enable_if<mt::isRange<Range>::value, int>::type=0>
74 {
75  priv::waitAny waiter;
76  for (auto& e : range) waiter.add(e);
77  int index = waiter.wait(time);
78  return index >= 0 ? next(begin(range), index) : end(range);
79 }
81 template<class Range, class Rep, class Period, typename std::enable_if<mt::isRange<Range>::value, int>::type=0>
82 auto waitAny(Range&& range, Duration<Rep,Period> time) -> mt_iterOf(range) { return waitAny(forward<Range>(range), time == time.max() ? MonoClock::TimePoint::max() : MonoClock::now() + time); }
84 template<class Range, typename std::enable_if<mt::isRange<Range>::value, int>::type=0>
85 auto waitAny(Range&& range) -> mt_iterOf(range) { return waitAny(forward<Range>(range), MonoClock::TimePoint::max()); }
86 
87 //====================================================
88 // async
89 //====================================================
90 
92 namespace priv
93 {
94  template<class Func>
96  {
97  Task(Func&& f) : f(forward<Func>(f)) {}
98  virtual ~Task() {}
99 
100  void operator()() { f(); delete_(this); }
101 
102  Func f;
103 
104  protected:
105  virtual bool traceEnabled() const;
106  };
107 
108  //TODO: this is a work-around for clang's broken std::function, it can't handle move-only bind args
109  template<class R, class Func, class... Args>
110  struct Bind
111  {
112  Bind(Func&& f, Args&&... args) : f(forward<Func>(f)), args(forward<Args>(args)...) {}
113  //clang's std::function tries to copy, just move
114  Bind(const Bind& rhs) : Bind(move(const_cast<Bind&>(rhs))) {}
115  Bind(Bind&&) = default;
116 
117  R operator()() { return func(mt::make_idxseq<sizeof...(Args)>()); }
118  template<szt... Seq>
119  R func(mt::idxseq<Seq...>) { return f(forward<Args>(get<Seq>(args))...); }
120 
121  Func f;
122  tuple<Args...> args;
123  };
124 
125  template<class Func, class... Args>
126  struct Bind<void, Func, Args...>
127  {
128  Bind(Func&& f, Args&&... args) : f(forward<Func>(f)), args(forward<Args>(args)...) {}
129  Bind(const Bind& rhs) : Bind(move(const_cast<Bind&>(rhs))) {}
130  Bind(Bind&&) = default;
131 
132  void operator()() { func(mt::make_idxseq<sizeof...(Args)>()); }
133  template<szt... Seq>
134  void func(mt::idxseq<Seq...>) { f(forward<Args>(get<Seq>(args))...); }
135 
136  Func f;
137  tuple<Args...> args;
138  };
139 
140  template<class Func, class... Args, class R = typename std::result_of<Func(Args...)>::type>
141  Bind<R, Func, Args...> bind(Func&& f, Args&&... args) { return Bind<R, Func, Args...>(forward<Func>(f), forward<Args>(args)...); }
142 }
145 struct AsyncSched;
146 struct AsyncSched_tag {};
148 
150 {
151  AsyncSched(int workerCount, int workerTaskMax) : thread::Pool(workerCount, workerTaskMax) {}
152 
154 
155  template<class Func>
156  void operator()(Func&& f) { enqueue(*new priv::Task<Func>(forward<Func>(f))); }
157 
159  static bool trace;
160 };
161 
162 #ifndef future_async_createSingleton
163  inline AsyncSched* async_createSingleton() { return new AsyncSched(3, 5); }
165 #endif
166 
168 template<class Func>
169 bool priv::Task<Func>::traceEnabled() const { return AsyncSched::trace; }
172 template<class Sched, class Func, class... Args, typename std::enable_if<mt::is_base_of<AsyncSched_tag, Sched>::value, int>::type=0>
174 Future<typename std::result_of<Func(Args...)>::type>
175  async(Sched&& sched, Func&& f, Args&&... args)
176 {
177  typedef typename std::result_of<Func(Args...)>::type R;
178  PackagedTask<R()> task(priv::bind(forward<Func>(f), forward<Args>(args)...), SmallAllocator<int>());
179  auto future = task.future();
180  sched(move(task));
181  return future;
182 }
183 
185 
188 template<class Func, class... Args>
189 Future<typename std::result_of<Func(Args...)>::type>
190  async(Func&& f, Args&&... args) { return async(AsyncSched::inst(), forward<Func>(f), forward<Args>(args)...); }
191 
192 }
193 
194 //====================================================
195 // then
196 //====================================================
197 
198 template<class Subclass, class R>
199 template<class Sched, class Func>
201 {
202  using namespace future::priv;
203  if (!subc()._state) throw_ future::NoState();
204  typedef typename std::result_of<Func(Subclass)>::type R2;
205  Promise<R2> promise{SmallAllocator<int>()};
206  auto future = promise.future();
207 
208  struct onReady : mt::FuncptrBase, SmallAllocatorObject
209  {
210  onReady(Subclass&& cont, Promise<R2>&& promise, Sched&& sched, Func&& f) :
211  cont(move(cont)), promise(move(promise)), sched(forward<Sched>(sched)), f(forward<Func>(f)) {}
212 
213  void operator()(StateBase& src)
214  {
215  if (src.ready)
216  {
217  PackagedTask<R2()> task(future::priv::bind(forward<Func>(this->f), move(this->cont)),
218  SmallAllocator<int>(), move(this->promise));
219  this->sched(move(task));
220  }
221  delete_(this);
222  }
223 
224  Subclass cont;
225  Promise<R2> promise;
226  Sched sched;
227  Func f;
228  };
229 
230  subc()._state->addOnReady(*new onReady(move(subc()), move(promise), forward<Sched>(sched), forward<Func>(f)));
231  return future;
232 }
233 
234 template<class Subclass, class R>
235 template<class Func>
237  { return then(future::AsyncSched::inst(), forward<Func>(f)); }
238 
239 namespace future
240 {
241 
242 //====================================================
243 // whenAll / whenAny
244 //====================================================
245 
247 namespace priv
248 {
249  template<class Func, class Futures, szt... Seq>
250  void when_init(Func& func, Futures& fs, mt::idxseq<Seq...>)
251  { mt_unpackEval(get<Seq>(fs).__state().addOnReady(func)); }
252 
253  template<class Result>
254  struct whenAll_onReady
255  {
256  template<class Futures, szt... Seq>
257  static void func(Promise<Result>& promise, Futures& fs, mt::idxseq<Seq...>)
258  { promise.setValue(make_tuple(get<Seq>(fs).__state().result()...)); }
259  template<class Range>
260  static void func(Promise<Result>& promise, Range& range)
261  { Result res; for (auto& e : range) res.push_back(e.__state().result()); promise.setValue(res); }
262  };
263 
264  template<>
265  struct whenAll_onReady<void>
266  {
267  template<class Futures, szt... Seq>
268  static void func(Promise<void>& promise, Futures&, mt::idxseq<Seq...>)
269  { promise.setValue(); }
270  template<class Range>
271  static void func(Promise<void>& promise, Range&)
272  { promise.setValue(); }
273  };
274 
275  template<class Futures, szt... Seq>
276  int whenAny_valIndex(StateBase& src, Futures& fs, mt::idxseq<Seq...>)
277  { return mt::valIndex(&src, &get<Seq>(fs).__state()...); }
278  template<class Range>
279  int whenAny_valIndex(StateBase& src, Range& range) { int i = -1; return find(range, [&](auto& e) { return ++i, &src == &e.__state(); }) != end(range) ? i : -1; }
280 
281  template<class Result_>
282  struct whenAny_onReady
283  {
284  template<class Futures, szt... Seq, class Result = tuple<int, Result_>>
285  static void func(Promise<Result>& promise, Futures& fs, mt::idxseq<Seq...>, int i)
286  { promise.setValue(make_tuple(i, mt::valAt(i, get<Seq>(fs)...).__state().result())); }
287  template<class Range, class Result = tuple<typename mt::iterOf<Range>::type, Result_>>
288  static void func(Promise<Result>& promise, Range& range, int i)
289  { auto it = next(begin(range), i); promise.setValue(make_tuple(it, it->__state().result())); }
290  };
291 
292  template<>
293  struct whenAny_onReady<void>
294  {
295  template<class Futures, szt... Seq>
296  static void func(Promise<int>& promise, Futures&, mt::idxseq<Seq...>, int i)
297  { promise.setValue(i); }
298  template<class Range, class Result = typename mt::iterOf<Range>::type>
299  static void func(Promise<Result>& promise, Range& range, int i)
300  { promise.setValue(next(begin(range), i)); }
301  };
302 }
305 inline Future<tuple<>> whenAll() { return FutureCreate(tuple<>()); }
306 
308 template< class... Futures,
309  class Result_ = typename std::decay<typename mt::removeRef<mt::typeAt<0, Futures...>>::type::Result>::type,
310  class Result = typename std::conditional< std::is_same<Result_, void>::value,
311  void,
312  tuple<typename std::decay<typename mt::removeRef<Futures>::type::Result>::type...>>::type>
313 Future<Result> whenAll(Futures&&... fs)
314 {
315  using namespace future::priv;
317  auto future = promise.future();
318 
319  struct onReady : mt::FuncptrBase, SmallAllocatorObject
320  {
321  onReady(Promise<Result>&& promise, Futures&&... fs) :
322  promise(move(promise)), fs(forward<Futures>(fs)...), count(0), ready(0), max(sizeof...(fs)) {}
323 
324  void operator()(StateBase& src)
325  {
326  SpinLock::Scoped _(this->lock);
327  if (src.ready && !this->promise.__state().ready)
328  {
329  if (src.ex)
330  this->promise.setException(src.ex);
331  else if (++this->ready == this->max)
332  priv::whenAll_onReady<Result>::func(this->promise, this->fs, mt::make_idxseq<sizeof...(Futures)>());
333  }
334  if (++this->count == this->max) { _.unlock(); delete_(this); }
335  }
336 
337  Promise<Result> promise;
338  tuple<Futures...> fs;
339  szt count;
340  szt ready;
341  szt max;
342  SpinLock lock;
343  };
344 
345  auto& func = *new onReady(move(promise), forward<Futures>(fs)...);
346  priv::when_init(func, func.fs, mt::make_idxseq<sizeof...(Futures)>());
347  return future;
348 }
349 
351 template< class Range, typename std::enable_if<mt::isRange<Range>::value, int>::type=0,
352  class Result_ = typename std::decay<typename mt::elemOf<Range>::type::Result>::type,
353  class Result = typename std::conditional< std::is_same<Result_, void>::value,
354  void,
355  vector<Result_>>::type>
356 Future<Result> whenAll(Range&& range)
357 {
358  using namespace future::priv;
360  auto future = promise.future();
361 
362  struct onReady : mt::FuncptrBase, SmallAllocatorObject
363  {
364  onReady(Promise<Result>&& promise, Range&& range) :
365  promise(move(promise)), range(forward<Range>(range)), count(0), ready(0), max(countOf(range)) {}
366 
367  void operator()(StateBase& src)
368  {
369  SpinLock::Scoped _(this->lock);
370  if (src.ready && !this->promise.__state().ready)
371  {
372  if (src.ex)
373  this->promise.setException(src.ex);
374  else if (++this->ready == this->max)
375  priv::whenAll_onReady<Result>::func(this->promise, this->range);
376  }
377  if (++this->count == this->max) { _.unlock(); delete_(this); }
378  }
379 
380  Promise<Result> promise;
381  Range range;
382  szt count;
383  szt ready;
384  szt max;
385  SpinLock lock;
386  };
387 
388  auto& func = *new onReady(move(promise), forward<Range>(range));
389  for (auto& e : func.range) e.__state().addOnReady(func);
390  return future;
391 }
392 
394 template< class... Futures,
395  class Result_ = typename std::decay<typename mt::removeRef<mt::typeAt<0, Futures...>>::type::Result>::type,
396  class Result = typename std::conditional< std::is_same<Result_, void>::value,
397  int,
398  tuple<int, Result_>>::type>
399 Future<Result> whenAny(Futures&&... fs)
400 {
401  using namespace future::priv;
403  auto future = promise.future();
404 
405  struct onReady : mt::FuncptrBase, SmallAllocatorObject
406  {
407  onReady(Promise<Result>&& promise, Futures&&... fs) :
408  promise(move(promise)), fs(forward<Futures>(fs)...), count(0), max(sizeof...(fs)) {}
409 
410  void operator()(StateBase& src)
411  {
412  SpinLock::Scoped _(this->lock);
413  if (src.ready && !this->promise.__state().ready)
414  {
415  if (src.ex)
416  this->promise.setException(src.ex);
417  else
418  {
419  auto seq = mt::make_idxseq<sizeof...(Futures)>();
420  priv::whenAny_onReady<Result_>::func(this->promise, this->fs, seq, whenAny_valIndex(src, this->fs, seq));
421  }
422  }
423  if (++this->count == this->max) { _.unlock(); delete_(this); }
424  }
425 
426  Promise<Result> promise;
427  tuple<Futures...> fs;
428  szt count;
429  szt max;
430  SpinLock lock;
431  };
432 
433  auto& func = *new onReady(move(promise), forward<Futures>(fs)...);
434  priv::when_init(func, func.fs, mt::make_idxseq<sizeof...(Futures)>());
435  return future;
436 }
437 
439 template< class Range, typename std::enable_if<mt::isRange<Range>::value, int>::type=0,
440  class Result_ = typename std::decay<typename mt::elemOf<Range>::type::Result>::type,
441  class Result = typename std::conditional< std::is_same<Result_, void>::value,
442  typename mt::iterOf<Range>::type,
443  tuple<typename mt::iterOf<Range>::type, Result_>>::type>
444 Future<Result> whenAny(Range&& range)
445 {
446  using namespace future::priv;
448  auto future = promise.future();
449 
450  struct onReady : mt::FuncptrBase, SmallAllocatorObject
451  {
452  onReady(Promise<Result>&& promise, Range&& range) :
453  promise(move(promise)), range(forward<Range>(range)), count(0), max(countOf(range)) {}
454 
455  void operator()(StateBase& src)
456  {
457  SpinLock::Scoped _(this->lock);
458  if (src.ready && !this->promise.__state().ready)
459  {
460  if (src.ex)
461  this->promise.setException(src.ex);
462  else
463  priv::whenAny_onReady<Result_>::func(this->promise, this->range, whenAny_valIndex(src, this->range));
464  }
465  if (++this->count == this->max) { _.unlock(); delete_(this); }
466  }
467 
468  Promise<Result> promise;
469  Range range;
470  szt count;
471  szt max;
472  SpinLock lock;
473  };
474 
475  auto& func = *new onReady(move(promise), forward<Range>(range));
476  for (auto& e : func.range) e.__state().addOnReady(func);
477  return future;
478 }
479 
480 } }
#define mt_global(Class, Func, Ctor)
Create a global object that will be initialized upon first access, so it can be accessed safely from ...
Definition: Meta.h:283
TimePoint represented by a duration since a clock's epoch time.
Definition: TimePoint.h:11
static TimePoint now()
Get current time.
Definition: Clock.h:48
Future< typename std::result_of< Func(Subclass)>::type > then(Sched &&sched, Func &&f)
Append a continuation function that will be called when this future is ready. The ready future is pas...
Unique future, guarantees sole access to a future function result.
Definition: Future.h:53
A thread lock where the lock is acquired through a busy wait loop.
Definition: Spin.h:17
Combined intrusive/non-intrusive smart pointer. Can reference and share any object automatically...
Definition: SharedPtr.h:175
AsyncSched(int workerCount, int workerTaskMax)
Definition: Util.h:151
Pool(szt workerCount, szt workerTaskMax)
Definition: Pool.cpp:24
#define mt_unpackEval(...)
Unpack and evaluate a parameter pack expression. Ex. void foo(Args... args) { mt_unpackEval(func(args...
Definition: Meta.h:22
Iter find(Range &&, Seqs &&..., Func &&pred)
Find an element in a series of sequences.
std::enable_if<!b, T > disable_if
Opposite of std::enable_if.
Definition: Meta.h:62
static AsyncSched & inst()
Definition: Util.h:153
int valIndex(Val &&val, Ts &&...ts)
Get index of first matching value in parameter pack, returns -1 if not found.
Definition: Range.h:71
void delete_(T *&p)
Destruct object, free memory and set pointer to null.
Definition: Allocator.h:75
Super::TimePoint TimePoint
Definition: Clock.h:40
AsyncSched * async_createSingleton()
Default implementation.
Definition: Util.h:164
the future result is ready
std::index_sequence< Ints... > idxseq
Shorthand for std::index_sequence.
Definition: Meta.h:111
Container to hold a delayed function result.
Definition: Promise.h:181
std::remove_reference< T > removeRef
Remove reference from type.
Definition: Meta.h:44
std::enable_if< mt::isIterator< Iter1 >::value, Range_< Iter1, Iter2 > >::type range(Iter1 &&first, Iter2 &&last)
Range from iterators [first, last)
Definition: Range.h:116
static auto _
Definition: Module.cpp:8
Lock that is bound to a single condition. This is the common usage case of condition variables...
Definition: Lock.h:33
std::decay< decltype(honey::begin(declval< Range >)))>::type type
Definition: Range.h:31
Objects that inherit from this class will use Alloc for new/delete ops.
Definition: Allocator.h:129
#define mt_iterOf(Range)
iterOf for values
Definition: Range.h:33
void unlock()
Definition: Unique.h:103
Definition: Util.h:149
Future< R > FutureCreate(T &&val)
Create a future that is immediately ready with the value.
Definition: Future.h:211
szt countOf(Range &&range)
Count number of elements in range.
Definition: Range.h:424
static constexpr Duration max()
Maximum duration (positive reps)
Definition: Duration.h:86
A container that wraps a function so that its result is stored in a future when invoked.
Definition: PackagedTask.h:10
std::conditional< std::is_same< Result, void >::value, mt::typeAt< 0, Ts... >, Result >::type valAt(int i, Ts &&...ts)
Get value at index of parameter pack. All types must be convertible to Result, which defaults to the ...
Definition: Range.h:63
All tasks must inherit from this class. std::function is not used here to avoid the operator() virtua...
Definition: Pool.h:16
Duration represented by repetitions of a period. The period must be a ratio.
Definition: Duration.h:7
#define throw_
Use in place of throw keyword to throw a honey::Exception object polymorphically and provide debug in...
Definition: Exception.h:11
size_t szt
Size type, shorthand for size_t.
Definition: Core.h:90
typename priv::typeAt< 0, I, Ts... >::type typeAt
Get type at index of parameter pack.
Definition: Meta.h:106
int waitAny(Future &&f, Futures &&...fs)
Wait until any futures are ready, returns index of ready future.
Definition: Util.h:63
void operator()(Func &&f)
Definition: Util.h:156
Global allocator for small memory blocks. To provide a custom pool define SmallAllocator_createSingle...
Definition: SmallAllocator.h:23
Future< tuple<> > whenAll()
Definition: Util.h:305
static bool trace
Whether to log task execution flow.
Definition: Util.h:159
void enqueue(Task &&task)
Schedule a task for execution.
Definition: Pool.h:35
void wait() const
Wait until result is ready.
Definition: Future.h:31
Definition: Util.h:146
std::make_index_sequence< N > make_idxseq
Shorthand for std::make_index_sequence.
Definition: Meta.h:113
Definition: Promise.h:17
auto waitAny(Range &&range) -> mt_iterOf(range)
Wait until any futures in a range are ready, returns iterator to ready future.
Definition: Util.h:85
Future< typename std::result_of< Func(Args...)>::type > async(Sched &&sched, Func &&f, Args &&...args)
Call a function asynchronously, returns a future with the result of the function call.
Definition: Util.h:175
Base class for Future types.
Definition: Future.h:20
Inherit to enable non-virtual functor calling.
Definition: Meta.h:231
void waitAll()
Definition: Util.h:16
Future< Result > whenAny(Futures &&...fs)
Returns a future to a tuple of the index of the ready future and its result.
Definition: Util.h:399
Global Honeycomb namespace.
Spreads task execution across a pool of re-usable threads. Uses a lock-free work-stealing queue to en...
Definition: Pool.h:12