Spinning Topp Logo BlackTopp Studios
inc
framescheduler.cpp
Go to the documentation of this file.
1 // The DAGFrameScheduler is a Multi-Threaded lock free and wait free scheduling library.
2 // © Copyright 2010 - 2016 BlackTopp Studios Inc.
3 /* This file is part of The DAGFrameScheduler.
4 
5  The DAGFrameScheduler is free software: you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation, either version 3 of the License, or
8  (at your option) any later version.
9 
10  The DAGFrameScheduler is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  GNU General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with The DAGFrameScheduler. If not, see <http://www.gnu.org/licenses/>.
17 */
18 /* The original authors have included a copy of the license specified above in the
19  'doc' folder. See 'gpl.txt'
20 */
21 /* We welcome the use of the DAGFrameScheduler to anyone, including companies who wish to
22  Build professional software and charge for their product.
23 
24  However there are some practical restrictions, so if your project involves
25  any of the following you should contact us and we will try to work something
26  out:
27  - DRM or Copy Protection of any kind(except Copyrights)
28  - Software Patents You Do Not Wish to Freely License
29  - Any Kind of Linking to Non-GPL licensed Works
30  - Are Currently In Violation of Another Copyright Holder's GPL License
31  - If You want to change our code and not add a few hundred MB of stuff to
32  your distribution
33 
34  These and other limitations could cause serious legal problems if you ignore
35  them, so it is best to simply contact us or the Free Software Foundation, if
36  you have any questions.
37 
38  Joseph Toppi - toppij@gmail.com
39  John Blackwood - makoenergy02@gmail.com
40 */
41 #ifndef _framescheduler_cpp
42 #define _framescheduler_cpp
43 
44 #include "framescheduler.h"
45 #include "doublebufferedresource.h"
46 #include "monopoly.h"
48 #include "lockguard.h"
49 #ifdef MEZZ_USEATOMICSTODECACHECOMPLETEWORK
50  #include "atomicoperations.h"
51 #endif
52 
53 
54 #include <exception>
55 #include <iostream>
56 #include <algorithm>
57 
58 using namespace std;
59 
60 /// @file
61 /// @brief This is the core object implementation for this algorithm
62 
63 #ifdef _MEZZ_THREAD_WIN32_
64  #ifdef _MSC_VER
65  #pragma warning( disable : 4706) // Disable Legitimate assignment in a WorkUnit acquisition loops
66  #endif
67 #endif
68 
69 
70 namespace Mezzanine
71 {
72  namespace Threading
73  {
74  /// @cond false
75 
76  // Initializing static members
77  SpinLock FrameScheduler::FrameSchedulersLock;
78  std::vector<FrameScheduler*> FrameScheduler::FrameSchedulers;
79 
80  /// @brief The function Frameschedulers have std::terminate() call.
81  /// @details This will iterate over every frame scheduler, finds and activate its log aggregator, and flush its log.
82  void TerminateHandler()
83  {
84  lock_guard<SpinLock> g(FrameScheduler::FrameSchedulersLock);
85  for(std::vector<FrameScheduler*>::iterator Iter = FrameScheduler::FrameSchedulers.begin();
86  Iter != FrameScheduler::FrameSchedulers.end();
87  Iter++)
88  {
89  FrameScheduler* CurrentFrameScheduler = *Iter;
90  CurrentFrameScheduler->ForceLogFlush();
91  }
92  exit(1);
93  }
94 
95  /// @brief This is the function that all threads will run, except the main.
96  /// @param ThreadStorage A pointer to a ThreadSpecificStorage that has the required data for a thread after it launches.
97  void ThreadWork(void* ThreadStorage)
98  {
99  DefaultThreadSpecificStorage::Type& Storage = *((DefaultThreadSpecificStorage::Type*)ThreadStorage);
100  FrameScheduler& FS = *(Storage.GetFrameScheduler());
101  iWorkUnit* CurrentUnit;
102 
103  do
104  {
105  while( (CurrentUnit = FS.GetNextWorkUnit()) ) /// @todo needs to skip ahead a unit instead of spinning
106  {
107  if(Starting==CurrentUnit->TakeOwnerShip())
108  { CurrentUnit->operator()(Storage); }
109  }
110  } while(!FS.AreAllWorkUnitsComplete());
111  }
112 
113  /// @brief This is the function that the main thread runs.
114  /// @param ThreadStorage A pointer to a ThreadSpecificStorage that has the required data for a thread after it launches.
115  void ThreadWorkAffinity(void* ThreadStorage)
116  {
117  DefaultThreadSpecificStorage::Type& Storage = *((DefaultThreadSpecificStorage::Type*)ThreadStorage);
118  FrameScheduler& FS = *(Storage.GetFrameScheduler());
119  iWorkUnit* CurrentUnit;
120  do
121  {
122  while( (CurrentUnit = FS.GetNextWorkUnitAffinity()) ) /// @todo needs to skip ahead a unit instead of spinning
123  {
124  if(Starting==CurrentUnit->TakeOwnerShip())
125  { CurrentUnit->operator()(Storage); }
126  }
127  }
128  while(!FS.AreAllWorkUnitsComplete());
129  }
130  /// @endcond
131 
132  ////////////////////////////////////////////////////////////////////////////////
133  // Protected Methods
134 
135  void FrameScheduler::AddErrorScheduler(FrameScheduler* SchedulerToAdd)
136  {
137  lock_guard<SpinLock> g(FrameSchedulersLock);
138  FrameSchedulers.push_back(SchedulerToAdd);
139  }
140 
141  void FrameScheduler::RemoveErrorScheduler(FrameScheduler* SchedulerToRemove)
142  {
143  lock_guard<SpinLock> g(FrameSchedulersLock);
144  FrameSchedulers.erase( std::remove( FrameSchedulers.begin(), FrameSchedulers.end(), SchedulerToRemove ), FrameSchedulers.end() );
145  }
146 
147 
148  void FrameScheduler::CleanUpThreads()
149  { JoinAllThreads(); }
150 
151  void FrameScheduler::DeleteThreads()
152  {
153  for(std::vector<Thread*>::iterator Iter = Threads.begin(); Iter!=Threads.end(); ++Iter)
154  { delete *Iter; }
155  }
156 
157  void FrameScheduler::UpdateDependentGraph(const std::vector<WorkUnitKey>& Units)
158  {
159  // for each WorkUnit
160  for(std::vector<WorkUnitKey>::const_iterator Iter=Units.begin(); Iter!=Units.end(); ++Iter)
161  {
162  // For each Dependent of that Workunit
163  size_t Max = Iter->Unit->GetImmediateDependencyCount();
164  for(size_t Counter=0; Counter<Max; ++Counter)
165  { DependentGraph[Iter->Unit->GetDependency(Counter)].insert(Iter->Unit); } // Make a record of the reverse association
166  }
167  }
168 
169  void FrameScheduler::UpdateWorkUnitKeys(std::vector<WorkUnitKey> &Units)
170  {
171  for(std::vector<WorkUnitKey>::iterator Iter=Units.begin(); Iter!=Units.end(); ++Iter)
172  { *Iter = Iter->Unit->GetSortingKey(*this); }
173  }
174 
175  ////////////////////////////////////////////////////////////////////////////////
176  // Construction and Destruction
177  FrameScheduler::FrameScheduler(std::fstream *_LogDestination, Whole StartingThreadCount) :
178  FrameTimeLog(MEZZ_FRAMESTOTRACK),
179  PauseTimeLog(MEZZ_FRAMESTOTRACK),
180  CurrentFrameStart(GetTimeStamp()),
181  CurrentPauseStart(GetTimeStamp()),
182  Sorter(0),
184  DecacheMain(0),
185  DecacheAffinity(0),
186  #endif
187  CurrentThreadCount(StartingThreadCount),
188  FrameCount(0), TargetFrameLength(16666),
189  TimingCostAllowance(0),
190  MainThreadID(this_thread::get_id()),
191  LoggingToAnOwnedFileStream(true),
192  NeedToLogDeps(true)
193  {
194  Resources.push_back(new DefaultThreadSpecificStorage::Type(this));
195  if(_LogDestination)
196  { InstallLog(_LogDestination); }
197  else
198  { InstallLog(new std::fstream("Mezzanine.log", std::ios::out | std::ios::trunc)); }
199  AddErrorScheduler(this);
200  SetErrorHandler();
201  }
202 
203  FrameScheduler::FrameScheduler(std::ostream *_LogDestination, Whole StartingThreadCount) :
204  FrameTimeLog(MEZZ_FRAMESTOTRACK),
205  PauseTimeLog(MEZZ_FRAMESTOTRACK),
206  CurrentFrameStart(GetTimeStamp()),
207  CurrentPauseStart(GetTimeStamp()),
208 
209  Sorter(0),
211  DecacheMain(0),
212  DecacheAffinity(0),
213  #endif
214  CurrentThreadCount(StartingThreadCount),
215  FrameCount(0), TargetFrameLength(16666),
216  TimingCostAllowance(0),
217  MainThreadID(this_thread::get_id()),
218  LoggingToAnOwnedFileStream(false),
219  NeedToLogDeps(true)
220  {
221  Resources.push_back(new DefaultThreadSpecificStorage::Type(this));
222  InstallLog(_LogDestination);
223  AddErrorScheduler(this);
224  SetErrorHandler();
225  }
226 
228  {
229  std::set_terminate(TerminateHandler);
230  }
231 
233  {
234  RemoveErrorScheduler(this);
235  CleanUpThreads();
236  RemoveLog();
237  for(std::vector<WorkUnitKey>::iterator Iter=WorkUnitsMain.begin(); Iter!=WorkUnitsMain.end(); ++Iter)
238  { delete Iter->Unit; }
239  for(std::vector<MonopolyWorkUnit*>::iterator Iter = WorkUnitsMonopolies.begin(); Iter!=WorkUnitsMonopolies.end(); ++Iter)
240  { delete *Iter; }
241  for(std::vector<DefaultThreadSpecificStorage::Type*>::iterator Iter = Resources.begin(); Iter!=Resources.end(); ++Iter)
242  { delete *Iter; }
243  DeleteThreads();
244  }
245 
246  ////////////////////////////////////////////////////////////////////////////////
247  // WorkUnit management
248  void FrameScheduler::AddWorkUnitMain(iWorkUnit* MoreWork, const String& WorkUnitName)
249  {
251  this->WorkUnitsMain.push_back(MoreWork->GetSortingKey(*this));
252  (*this->LogDestination) << "<WorkUnitMainInsertion ID=\"" << hex << MoreWork << "\" Name=\"" << WorkUnitName << "\" />" << endl;
253  }
254 
255  void FrameScheduler::AddWorkUnitAffinity(iWorkUnit* MoreWork, const String& WorkUnitName)
256  {
258  this->WorkUnitsAffinity.push_back(MoreWork->GetSortingKey(*this));
259  (*this->LogDestination) << "<WorkUnitAffinityInsertion ID=\"" << hex << MoreWork << "\" Name=\"" << WorkUnitName << "\" />" << endl;
260  }
261 
262  void FrameScheduler::AddWorkUnitMonopoly(MonopolyWorkUnit* MoreWork, const String& WorkUnitName)
263  {
265  this->WorkUnitsMonopolies.push_back(MoreWork);
266  (*this->LogDestination) << "<WorkUnitMonopolyInsertion ID=\"" << hex << MoreWork << "\" Name=\"" << WorkUnitName << "\" />" << endl;
267  }
268 
269  void FrameScheduler::SortWorkUnitsMain(bool UpdateDependentGraph_)
270  {
271  if(UpdateDependentGraph_)
272  { UpdateDependentGraph(); }
273  if(WorkUnitsMain.size())
274  {
276  std::sort(WorkUnitsMain.begin(),WorkUnitsMain.end(),std::less<WorkUnitKey>() );
277  }
278  }
279 
280  void FrameScheduler::SortWorkUnitsAffinity(bool UpdateDependentGraph_)
281  {
282  if(UpdateDependentGraph_)
283  { UpdateDependentGraph(); }
284  if(WorkUnitsAffinity.size())
285  {
287  std::sort(WorkUnitsAffinity.begin(),WorkUnitsAffinity.end(),std::less<WorkUnitKey>() );
288  }
289  }
290 
291  void FrameScheduler::SortWorkUnitsAll(bool UpdateDependentGraph_)
292  {
293  SortWorkUnitsAffinity(UpdateDependentGraph_);
294  SortWorkUnitsMain(false);
295  }
296 
298  {
299  if(WorkUnitsMain.size())
300  {
301  IteratorMain RemovalTarget = WorkUnitsMain.end();
302  for(IteratorMain Iter = WorkUnitsMain.begin(); Iter!=WorkUnitsMain.end(); Iter++)
303  {
304  if(Iter->Unit == LessWork)
305  {
306  if(Iter+1 == WorkUnitsMain.end()) // once we find it, push it to the back linearly.
307  { RemovalTarget = Iter;} // This way we can erase from the back where it is cheap
308  else // to do so and still make just on pass through the list
309  { std::swap (*Iter,*(Iter+1)); }
310  }
311  Iter->Unit->RemoveDependency(LessWork); // This has a ton of cache miss potential I am curious what it benchmarks like?!
312  }
313  if(RemovalTarget!=WorkUnitsMain.end())
314  { WorkUnitsMain.erase(RemovalTarget); }
315  }
316  if(WorkUnitsAffinity.size())
317  {
318  for(IteratorAffinity Iter = WorkUnitsAffinity.begin(); Iter!=WorkUnitsAffinity.end(); Iter++)
319  { Iter->Unit->RemoveDependency(LessWork); }
320  }
321  }
322 
324  {
325  if(WorkUnitsAffinity.size())
326  {
327  IteratorAffinity RemovalTarget = WorkUnitsMain.end();
328  for(IteratorAffinity Iter = WorkUnitsAffinity.begin(); Iter!=WorkUnitsAffinity.end(); Iter++)
329  {
330  if(Iter->Unit == LessWork)
331  {
332  if(Iter+1 == WorkUnitsAffinity.end())
333  { RemovalTarget = Iter;}
334  else
335  { std::swap (*Iter,*(Iter+1)); }
336  }
337  Iter->Unit->RemoveDependency(LessWork);
338  }
339  if(RemovalTarget!=WorkUnitsAffinity.end())
340  { WorkUnitsAffinity.erase(RemovalTarget); }
341  }
342  if(WorkUnitsMain.size())
343  {
344  for(IteratorMain Iter = WorkUnitsMain.begin(); Iter!=WorkUnitsMain.end(); Iter++)
345  { Iter->Unit->RemoveDependency(LessWork); }
346  }
347  }
348 
350  {
351  if(WorkUnitsMain.size())
352  {
353  for(IteratorMain Iter = WorkUnitsMain.begin(); Iter!=WorkUnitsMain.end(); Iter++)
354  { Iter->Unit->RemoveDependency(LessWork); }
355  }
356  if(WorkUnitsAffinity.size())
357  {
358  for(IteratorAffinity Iter = WorkUnitsAffinity.begin(); Iter!=WorkUnitsAffinity.end(); Iter++)
359  { Iter->Unit->RemoveDependency(LessWork); }
360  }
361  if(WorkUnitsMonopolies.size())
362  {
363  for(IteratorMonoply Iter = WorkUnitsMonopolies.begin(); Iter!=WorkUnitsMonopolies.end(); Iter++)
364  {
365  if(*Iter==LessWork)
366  {
367  WorkUnitsMonopolies.erase(Iter);
368  return;
369  }
370  }
371  }
372  }
373 
374  ////////////////////////////////////////////////////////////////////////////////
375  // Algorithm essentials
376 
377  Whole FrameScheduler::GetDependentCountOf(iWorkUnit* Work, bool UsedCachedDepedentGraph)
378  {
379  if(UsedCachedDepedentGraph)
380  { UpdateDependentGraph(); }
381  Whole Results = DependentGraph[Work].size();
382  for(std::set<iWorkUnit*>::iterator Iter=DependentGraph[Work].begin(); Iter!=DependentGraph[Work].end(); ++Iter)
383  {
384  Results+=GetDependentCountOf(*Iter);
385  }
386  return Results;
387  }
388  //DecacheMain(0),
389  //DecacheAffinity(0),
391  {
392  #ifdef MEZZ_USEATOMICSTODECACHECOMPLETEWORK
393  bool CompleteSoFar = true;
394  Int32 CurrentRun = DecacheMain;
395  for(std::vector<WorkUnitKey>::reverse_iterator Iter = WorkUnitsMain.rbegin()+CurrentRun; Iter!=WorkUnitsMain.rend(); ++Iter)
396  {
397  if (NotStarted==Iter->Unit->GetRunningState())
398  {
399  if(Iter->Unit->IsEveryDependencyComplete())
400  { return Iter->Unit; }
401  CompleteSoFar=false;
402  }
403 
404  if(CompleteSoFar)
405  {
406  CurrentRun++;
407  if(Complete==Iter->Unit->GetRunningState())
408  { AtomicCompareAndSwap32(&DecacheMain,DecacheMain,CurrentRun); }
409  }
410  }
411  #else
412  for(std::vector<WorkUnitKey>::reverse_iterator Iter = WorkUnitsMain.rbegin(); Iter!=WorkUnitsMain.rend(); ++Iter)
413  {
414  if (NotStarted==Iter->Unit->GetRunningState())
415  {
416  if(Iter->Unit->IsEveryDependencyComplete())
417  { return Iter->Unit; }
418  }
419  }
420  #endif
421 
422  return 0;
423  }
424 
426  {
427  #ifdef MEZZ_USEATOMICSTODECACHECOMPLETEWORK
428  bool CompleteSoFar = true;
429  Int32 CurrentRun = DecacheAffinity;
430  for(std::vector<WorkUnitKey>::reverse_iterator Iter = WorkUnitsAffinity.rbegin()+CurrentRun; Iter!=WorkUnitsAffinity.rend(); ++Iter)
431  {
432  if (NotStarted==Iter->Unit->GetRunningState())
433  {
434  if(Iter->Unit->IsEveryDependencyComplete())
435  { return Iter->Unit; }
436  CompleteSoFar=false;
437  }
438 
439  if(CompleteSoFar)
440  {
441  CurrentRun++;
442  if(Complete==Iter->Unit->GetRunningState())
443  { AtomicCompareAndSwap32(&DecacheAffinity,DecacheAffinity,CurrentRun); }
444  }
445  }
446  #else
447  for(std::vector<WorkUnitKey>::reverse_iterator Iter = WorkUnitsAffinity.rbegin(); Iter!=WorkUnitsAffinity.rend(); ++Iter)
448  {
449  if (NotStarted==Iter->Unit->GetRunningState())
450  {
451  if(Iter->Unit->IsEveryDependencyComplete())
452  { return Iter->Unit; }
453  }
454  }
455  #endif
456 
457  return GetNextWorkUnit();
458  }
459 
461  {
462  // start reading from units likely to be executed last.
463  for(IteratorMain Iter = WorkUnitsMain.begin(); Iter!=WorkUnitsMain.end(); ++Iter)
464  {
465  if(Complete!=Iter->Unit->GetRunningState())
466  { return false; }
467  }
468 
469  for(IteratorAffinity Iter = WorkUnitsAffinity.begin(); Iter!=WorkUnitsAffinity.end(); ++Iter)
470  {
471  if(Complete!=Iter->Unit->GetRunningState())
472  { return false; }
473  }
474 
475  return true;
476  }
477 
479  {
480  DependentGraph.clear();
483  }
484 
485  ////////////////////////////////////////////////////////////////////////////////
486  // Algorithm Configuration and Introspection
487 
489  { return FrameCount; }
490 
492  { return TargetFrameLength; }
493 
494  void FrameScheduler::SetFrameRate(const Whole& FrameRate)
495  {
496  if(FrameRate)
497  { TargetFrameLength = 1000000/FrameRate; }
498  else
499  { TargetFrameLength = 0; }
500  }
501 
502  void FrameScheduler::SetFrameLength(const Whole& FrameLength)
503  { TargetFrameLength = FrameLength; }
504 
506  { return CurrentThreadCount; }
507 
508  void FrameScheduler::SetThreadCount(const Whole& NewThreadCount)
509  { CurrentThreadCount = NewThreadCount; }
510 
512  { return CurrentFrameStart; }
513 
515  { return this->PauseTimeLog; }
516 
518  { return this->PauseTimeLog[PauseTimeLog.RecordCapacity()-1]; }
519 
521  { return this->FrameTimeLog; }
522 
524  { return this->FrameTimeLog[FrameTimeLog.RecordCapacity()-1]; }
525 
526  ////////////////////////////////////////////////////////////////////////////////
527  // Executing a Frame
528 
530  {
532  CreateThreads();
534  JoinAllThreads();
537  }
538 
540  {
541  for(IteratorMonoply Iter = WorkUnitsMonopolies.begin(); Iter!=WorkUnitsMonopolies.end(); ++Iter)
542  { (*Iter)->operator()(*(Resources.at(0))); }
543  }
544 
546  {
547  LogResources.Lock(); //Unlocks in FrameScheduler::RunMainThreadWork() after last resource is swapped
549  }
550 
552  {
553  LogDependencies();
555  ThreadWorkAffinity(Resources[0]); // Do work in this thread and get the units with affinity
556 
557  }
558 
560  {
561  for(std::vector<Thread*>::iterator Iter=Threads.begin(); Iter!=Threads.end(); ++Iter)
562  {
563  (*Iter)->join();
564  delete *Iter;
565  }
566  Threads.clear();
567  Threads.reserve(CurrentThreadCount);
568 
569  if(Sorter)
570  {
573  Sorter=0;
574  }
575 
577  }
578 
580  {
581  /// @todo could be replace with a parallel for, or a monopoly
582  for(std::vector<WorkUnitKey>::reverse_iterator Iter = WorkUnitsMain.rbegin(); Iter!=WorkUnitsMain.rend(); ++Iter)
583  { Iter->Unit->PrepareForNextFrame(); }
584  for(std::vector<WorkUnitKey>::reverse_iterator Iter = WorkUnitsAffinity.rbegin(); Iter!=WorkUnitsAffinity.rend(); ++Iter)
585  { Iter->Unit->PrepareForNextFrame(); }
586  #ifdef MEZZ_USEATOMICSTODECACHECOMPLETEWORK
587  DecacheMain=0;
588  DecacheAffinity=0;
589  #endif
590  }
591 
593  {
594  FrameCount++;
595  Whole TargetFrameEnd=0;
597  {
598  TargetFrameEnd = TargetFrameLength + CurrentFrameStart;
599  Whole WaitTime = Whole(TargetFrameEnd - GetTimeStamp()) + TimingCostAllowance;
600  if(WaitTime>1000000) /// @todo Replace hard-code timeout with compiler/define/cmake_option
601  { WaitTime = 0; }
603  }
604  MaxInt Now = GetTimeStamp();
605  FrameTimeLog.Insert(Now-CurrentFrameStart); //Track Frame Time for the past while
606  PauseTimeLog.Insert(Now-CurrentPauseStart); //Track Pause Time for the past while
607  (*LogDestination) << dec << "<FrameTimes Frame=\"" << (FrameCount-1) << "\" PauseTimeLog=\"" << (Now-CurrentPauseStart) << "\" FrameLength=\"" << (Now-CurrentFrameStart) << "\" />" << endl;
609  TimingCostAllowance -= (CurrentFrameStart-TargetFrameEnd);
610  }
611 
612  ////////////////////////////////////////////////////////////////////////////////
613  // Basic container features
614 
616  { return this->WorkUnitsMonopolies.size(); }
617 
619  { return this->WorkUnitsAffinity.size(); }
620 
622  { return this->WorkUnitsMain.size(); }
623 
624  ////////////////////////////////////////////////////////////////////////////////
625  // Other Utility Features
626 
628  {
629  std::vector<Resource*>::iterator Results = Resources.begin();
630  if(ID == MainThreadID)
631  { return *Results; } // Main Thread Resources are stored in slot 0
632 
633  for(std::vector<Thread*>::iterator Iter=Threads.begin(); Iter!=Threads.end(); ++Iter)
634  {
635  Results++;
636  if ( (*Iter)->get_id()==ID)
637  { return *Results; }
638  }
639  return NULL;
640  }
641 
643  {
644  Resource* AlmostResults = GetThreadResource(ID);
645  if(AlmostResults)
646  { return &AlmostResults->GetUsableLogger(); }
647  return 0;
648  }
649 
651  { this->NeedToLogDeps = Changed; }
652 
654  {
655  if(this->NeedToLogDeps)
656  {
657  this->NeedToLogDeps = false;
658  for(IteratorMain Iter = WorkUnitsMain.begin(); Iter!=WorkUnitsMain.end(); ++Iter)
659  {
660  Whole MainCount = Iter->Unit->GetImmediateDependencyCount();
661  for(Whole Counter = 0; Counter<MainCount; Counter++)
662  {
663  *(this->LogDestination) << "<WorkUnitDependency "
664  "Unit=\"" << hex << Iter->Unit
665  << "\" DependsOn=\"" << Iter->Unit->GetDependency(Counter) << "\" "
666  "/>" << endl;
667  }
668  }
669 
670  for(IteratorAffinity Iter = WorkUnitsAffinity.begin(); Iter!=WorkUnitsAffinity.end(); ++Iter)
671  {
672  Whole AffinityCount = Iter->Unit->GetImmediateDependencyCount();
673  for(Whole Counter = 0; Counter<AffinityCount; Counter++)
674  {
675  *(this->LogDestination) << "<WorkUnitDependency "
676  "Unit=\"" << hex << Iter->Unit
677  << "\" DependsOn=\"" << Iter->Unit->GetDependency(Counter) << "\" "
678  "/>" << endl;
679  }
680  }
681 
682  for(IteratorMonoply Iter = WorkUnitsMonopolies.begin(); Iter!=WorkUnitsMonopolies.end(); ++Iter)
683  {
684  Whole MonopolyCount = (*Iter)->GetImmediateDependencyCount();
685  for(Whole Counter = 0; Counter<MonopolyCount; Counter++)
686  {
687  *(this->LogDestination) << "<WorkUnitDependency "
688  "Unit=\"" << hex << (*Iter)
689  << "\" DependsOn=\"" << (*Iter)->GetDependency(Counter) << "\" "
690  "/>" << endl;
691  }
692  }
693  }
694  }
695 
696  std::ostream& FrameScheduler::GetLog()
697  { return *LogDestination; }
698 
699  void FrameScheduler::ChangeLogTarget(std::ostream* LogTarget)
700  {
701  RemoveLog();
702  InstallLog(LogTarget);
703  }
704 
705  void FrameScheduler::InstallLog(std::ostream* LogTarget)
706  {
707  LogDestination = LogTarget;
708  (*LogDestination)<< "<MezzanineLog>" << std::endl;
709  LogDestination->flush();
710  }
711 
712  void FrameScheduler::RemoveLog()
713  {
714  (*LogDestination) << "</MezzanineLog>" << std::endl;
715  LogDestination->flush();
716 
718  {
719  ((std::fstream*)LogDestination)->close();
720  delete LogDestination;
721  }
722  }
723 
725  {
726  LogAggregator* Results = NULL;
727  for(IteratorMain Iter = WorkUnitsMain.begin(); Iter!=WorkUnitsMain.end(); ++Iter)
728  {
729  Results = dynamic_cast<LogAggregator*>(Iter->Unit);
730  if(Results)
731  { return Results; }
732  }
733 
734  for(IteratorAffinity Iter = WorkUnitsAffinity.begin(); Iter!=WorkUnitsAffinity.end(); ++Iter)
735  {
736  Results = dynamic_cast<LogAggregator*>(Iter->Unit);
737  if(Results)
738  { return Results; }
739  }
740 
741  for(IteratorMonoply Iter = WorkUnitsMonopolies.begin(); Iter!=WorkUnitsMonopolies.end(); ++Iter)
742  {
743  Results = dynamic_cast<LogAggregator*>(*Iter);
744  if(Results)
745  { return Results; }
746  }
747  return NULL;
748 
749  }
750 
752  {
753  LogAggregator* Pointer = GetLogAggregator();
754  if(Pointer)
755  {
757  Pointer->NextFlushForced();
758  Pointer->DoWork(Storage);
759  this->SwapBufferedResources();
760  Pointer->NextFlushForced();
761  Pointer->DoWork(Storage);
762  GetLog().flush();
763  return true;
764  }
765  return false;
766  }
767 
769  {
770  Resources[0]->SwapAllBufferedResources();
771  for(Whole Count = 1; Count<CurrentThreadCount; ++Count)
772  {
773  if(Count+1>Resources.size())
774  { Resources.push_back(new DefaultThreadSpecificStorage::Type(this)); }
775  Resources[Count]->SwapAllBufferedResources();
776  Threads.push_back(new Thread(ThreadWork, Resources[Count]));
777  }
778  }
779 
780  } // \FrameScheduler
781 }// \Mezanine
782 
783 
784 #endif
int32_t Int32
An 32-bit integer.
Definition: datatypes.h:124
virtual void AddWorkUnitMonopoly(MonopolyWorkUnit *MoreWork, const String &WorkUnitName)
Add a MonopolyWorkUnit for execution at the beginning of the frame.
Use this to get the default rolling average for a given type.
virtual void SetFrameRate(const Whole &FrameRate)
Set the desired Frate rate.
virtual Whole GetDependentCountOf(iWorkUnit *Work, bool UsedCachedDepedentGraph=false)
How many other WorkUnit instances must wait on this one.
ThreadId MEZZ_LIB get_id()
Return the thread ID of the calling thread.
virtual void DoOneFrame()
Do one frame worth of work.
std::stringstream Logger
In case we ever replace the stringstream with another class, this will allow us to swap it out...
Definition: datatypes.h:180
#define MEZZ_USEATOMICSTODECACHECOMPLETEWORK
This is used to configure whether to atomically store a shortcut to skip checking all workunits...
Resource * GetThreadResource(ThreadId ID=this_thread::get_id())
Get the Resource to go with a thread of a given ID.
bool Boole
Generally acts a single bit, true or false.
Definition: datatypes.h:173
virtual void SortWorkUnitsMain(bool UpdateDependentGraph_=true)
Sort the the main pool of WorkUnits to allow them to be used more efficiently in the next frame execu...
std::vector< WorkUnitKey > WorkUnitsAffinity
A collection of iWorkUnits that must be run on the main thread.
Whole GetWorkUnitMainCount() const
Returns the amount of iWorkUnit ready to be scheduled in the Main pool.
std::vector< Resource * > Resources
This maintains ownership of all the thread specific resources.
virtual void AddWorkUnitMain(iWorkUnit *MoreWork, const String &WorkUnitName)
Add a normal Mezzanine::Threading::iWorkUnit to this For fcheduling.
void JoinAllThreads()
This is the 4th step (of 6) in a frame.
DefaultRollingAverage< Whole >::Type & GetFrameTimeRollingAverage()
Get The complete record of the durations of the last few frames.
virtual void CreateThreads()
This is the 2nd step (of 6) in a frame.
void CleanUpThreads()
Used in destruction to tear down threads.
virtual void SortWorkUnitsAll(bool UpdateDependentGraph_=true)
Sort all the WorkUnits that must run on the main thread to allow them to be used more efficiently in ...
virtual bool AreAllWorkUnitsComplete()
Is the work of the frame done?
MaxInt GetTimeStamp()
Get a timestamp, in microseconds. This will generally be some multiple of the GetTimeStampResolution ...
static void RemoveErrorScheduler(FrameScheduler *SchedulerToRemove)
Remove the passed scheduler from the list that will log in the event of an unhand;ed exception...
FrameScheduler(std::fstream *_LogDestination=0, Whole StartingThreadCount=GetCPUCount())
Create a Framescheduler that owns a filestream for logging.
Whole TargetFrameLength
The Maximum frame rate this algorithm should run at.
bool LoggingToAnOwnedFileStream
Set based on which constructor is called, and only used during destruction.
Integer TimingCostAllowance
To prevent frame time drift this many microseconds is subtracted from the wait period to allow time f...
virtual WorkUnitKey GetSortingKey(FrameScheduler &SchedulerToCount)=0
Get the sorting metadata.
std::ostream & GetLog()
Get the endpoint for the logs.
virtual iWorkUnit * GetNextWorkUnitAffinity()
Just like GetNextWorkUnit except that it also searches through and prioritizes work units with affini...
virtual void RemoveWorkUnitMain(iWorkUnit *LessWork)
Remove a WorkUnit from the main pool of WorkUnits (and not from the groups of Affinity or MonpolyWork...
Whole GetLastPauseTime() const
How long was the pause, if any, last frame?
std::vector< WorkUnitKey > WorkUnitsAffinity
A freshly sorted WorkUnitsAffinity or an empty vector.
STL namespace.
DependentGraphType DependentGraph
This structure allows reverse lookup of dependencies.
virtual Whole GetThreadCount()
Get the amount of threads that will be used to execute WorkUnits a the start of the next frame...
#define MEZZ_FRAMESTOTRACK
Used to control how long frames track length and other similar values. This is controlled by the CMak...
Whole GetLastFrameTime() const
How long was the previous frame?
Interface of a WorkUnit. This represents on piece of work through time.
Definition: workunit.h:66
std::ostream * LogDestination
When the logs are aggregated, this is where they are sent.
std::vector< WorkUnitKey >::iterator IteratorMain
An iterator suitable for iterating over the main pool of work units.
This file defines the template double buffered resources that can be attached to a thread...
void DeleteThreads()
Simply iterates over and deletes everything in Threads.
virtual void RunMainThreadWork()
This is the 3rd step (of 6) in a frame.
void SwapBufferedResources()
This takes all active buffered resources offline (therfore available for async processing) and makes ...
A kind of workunit given exclusive runtime so it can consume time on multiple threads.
Definition: monopoly.h:62
bool NeedToLogDeps
Brief Do we have to log ependencies have they changed since last logged?
Whole FrameCount
Used to store a count of frames from the begining of game execution.
Only used when a thread successfully attempts to gain ownership of a task, or some other tasks succes...
Whole GetWorkUnitAffinityCount() const
Returns the amount of iWorkUnit ready to be scheduled in the Affinity pool.
A thread specific collection of double-buffered and algorithm specific resources. ...
A small wrapper around the system thread.
Definition: thread.h:94
std::vector< WorkUnitKey > WorkUnitsMain
A collection of all the work units that are not Monopolies and do not have affinity for a given threa...
Declares a tool for automatically unlocking a mutex in an exception safe way.
std::vector< MonopolyWorkUnit * >::iterator IteratorMonoply
An iterator suitable for iterating over the main pool of work units.
Int32 AtomicCompareAndSwap32(Int32 *VariableToChange, const Int32 &OldValue, const Int32 &NewValue)
Atomically Compares And Swaps a 32 bit value.
This file has the Declarations for the main FrameScheduler class.
void SetErrorHandler()
When Things crash the logs still needs to be flushed and other resources cleaned. This sets a functio...
virtual MaxInt GetCurrentFrameStart() const
When did this frame start?
MaxInt Now()
Get a timestamp, in microseconds. This will generally be some multiple of the GetTimeStampResolution ...
MaxInt CurrentPauseStart
What time did the current Frame Start at.
void DependenciesChanged(bool Changed=true)
Indicate to the framescheduler if dependencies need to be logged.
virtual void SetFrameLength(const Whole &FrameLength)
Set the Desired length of a frame in microseconds.
void Lock()
Lock the SpinLock.
Definition: spinlock.cpp:61
SpinLock LogResources
Protects DoubleBufferedResources during creation and error handling from being accessed by the LogAgg...
virtual void RemoveWorkUnitAffinity(iWorkUnit *LessWork)
Remove a WorkUnit from the Affinity pool of WorkUnits (and not from the Main group or MonpolyWorkUnit...
virtual Whole GetFrameCount() const
Get the current number of frames that have elapsed.
virtual LogAggregator * GetLogAggregator()
Get the Log aggregation work unit if one exists for emergency loggin purposes only.
std::vector< WorkUnitKey > WorkUnitsMain
A freshly sorted WorkUnitsMain or an empty vector.
virtual void UpdateDependentGraph()
Create a reverse depedent graph that can be used for sorting Mezzanine::Threading::iWorkUnit "iWorkUn...
virtual void SetThreadCount(const Whole &NewThreadCount)
Set the amount of threads to use.
void MEZZ_LIB sleep_for(UInt32 MicroSeconds)
Blocks the calling thread for a period of time.
This is central object in this algorithm, it is responsible for spawning threads and managing the ord...
virtual void AddWorkUnitAffinity(iWorkUnit *MoreWork, const String &WorkUnitName)
Add a normal Mezzanine::Threading::iWorkUnit to this For scheduling.
void LogDependencies()
This sends the dependencies to the LogDestination (Skipping any thread specific resources) ...
Logger & GetUsableLogger()
Get the usable logger for this thread specific resource.
std::vector< WorkUnitKey >::iterator IteratorAffinity
An iterator suitable for iterating over the main pool of work units.
virtual void ResetAllWorkUnits()
This is the 5th step (of 6) in a frame.
void NextFlushForced(Boole Force=true)
Used to indicate the next log flush is abnormally forced.
DefaultRollingAverage< Whole >::Type & GetPauseTimeRollingAverage()
Get The complete record of the past durations of the Pauses Each frame.
This defines a number of workunits that are required for doing some tasks that the Framescheduler req...
virtual void RemoveWorkUnitMonopoly(MonopolyWorkUnit *LessWork)
Remove a WorkUnit from the Monopoly pool of WorkUnits (and not from the Main or Affinity group)...
Contains an interface for a kind of WorkUnit that consumes time on multiple thread.
Gather all the thread specific logs and commit them to the main log.
virtual iWorkUnit * GetNextWorkUnit()
Gets the next available workunit for execution.
virtual Whole GetFrameLength() const
Get the desired length of a frame.
void UpdateWorkUnitKeys(std::vector< WorkUnitKey > &Units)
Iterate over the passed container of WorkUnitKeys and refresh them with the correct data from their r...
std::vector< Thread * > Threads
A way to track an arbitrary number of threads.
MaxInt CurrentFrameStart
What time did the current Frame Start at.
Task is not yet started this frame, this can change without notice.
static void AddErrorScheduler(FrameScheduler *SchedulerToAdd)
Add this scheduler to the list that will log in the event of an unhand;ed exception.
Simple thread safe ways to check and change a specified variable atomically.
long long MaxInt
A large integer type suitable for compile time math and long term microsecond time keeping...
Definition: datatypes.h:190
Thread has completed all work this from frame, will not change until this frame ends.
std::vector< MonopolyWorkUnit * > WorkUnitsMonopolies
A collection of all the monopolies this scheduler must run and keep ownership of. ...
virtual void RunAllMonopolies()
This is the 1st step (of 6) in a frame.
The bulk of the engine components go in this namspace.
Definition: actor.cpp:56
unsigned long Whole
Whole is an unsigned integer, it will be at least 32bits in size.
Definition: datatypes.h:151
WorkSorter * Sorter
If this pointer is non-zero then the WorkSorter it points at will be used to sort WorkUnits...
ThreadId MainThreadID
For some task it is important to know the ID of the main thread.
Logger * GetThreadUsableLogger(ThreadId ID=this_thread::get_id())
Get the logger safe to use this thread.
The thread ID is a unique identifier for each thread.
Definition: thread.h:209
void Unlock()
Unlock the spinlock.
Definition: spinlock.cpp:70
DefaultRollingAverage< Whole >::Type FrameTimeLog
A rolling average of Frame times.
virtual void DoWork(DefaultThreadSpecificStorage::Type &CurrentThreadStorage)
This does the actual work of log aggregation.
Whole GetWorkUnitMonopolyCount() const
Returns the amount of MonopolyWorkUnit ready to be scheduled.
std::string String
A datatype used to a series of characters.
Definition: datatypes.h:159
void WaitUntilNextFrame()
This is the final step (of 6) in a frame.
Whole CurrentThreadCount
How many threads will this try to execute with in the next frame.
virtual void SortWorkUnitsAffinity(bool UpdateDependentGraph_=true)
Sort the WorkUnits that must run on the main thread to allow them to be used more efficiently in the ...
virtual Boole ForceLogFlush()
Forces the FrameScheduler to find Its LogAggregator and make it flush the logs if it can...
DefaultRollingAverage< Whole >::Type PauseTimeLog
A rolling average of Frame Pause times.