Anoncoin  0.9.4
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012 The Bitcoin developers
2 // Copyright (c) 2013-2014 The Anoncoin Core developers
3 // Distributed under the MIT/X11 software license, see the accompanying
4 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 
6 #ifndef CHECKQUEUE_H
7 #define CHECKQUEUE_H
8 
9 #include <algorithm>
10 #include <vector>
11 
12 #include <boost/foreach.hpp>
13 #include <boost/thread/condition_variable.hpp>
14 #include <boost/thread/locks.hpp>
15 #include <boost/thread/mutex.hpp>
16 
17 template<typename T> class CCheckQueueControl;
18 
28 template<typename T> class CCheckQueue {
29 private:
30  // Mutex protecting the inner state below; Loop() and Add() hold it while touching that state
31  boost::mutex mutex;
32 
33  // Worker threads block on this when out of work; Add() signals it when new checks arrive
34  boost::condition_variable condWorker;
35 
36  // Master thread blocks on this when out of work; a worker signals it when it brings nTodo to 0
37  boost::condition_variable condMaster;
38 
39  // The queue of elements to be processed.
40  // As the order in which checks are evaluated doesn't matter, it is used as a LIFO (stack): batches are taken from the back
41  std::vector<T> queue;
42 
43  // The number of workers (including the master) that are idle, i.e. currently blocked waiting inside Loop().
44  int nIdle;
45 
46  // The total number of workers (including the master), i.e. threads currently inside Loop().
47  int nTotal;
48 
49  // The temporary evaluation result: false once any batch has failed, reset to true when the master's Loop() returns.
50  bool fAllOk;
51 
52  // Number of verifications that haven't completed yet.
53  // This includes elements that are no longer in the queue, but are still in
54  // a worker's own batch.
55  unsigned int nTodo;
56 
57  // Whether we're shutting down. Initialised to false; no code visible in this listing sets it to true.
58  bool fQuit;
59 
60  // The maximum number of elements to be processed in one batch
61  unsigned int nBatchSize;
62 
63  // Internal function that does the bulk of the verification work: under the mutex it moves a batch off the queue, then runs that batch with the mutex released. Returns the value of fAllOk at the moment it exits. With fMaster set it sleeps on condMaster and exits once the queue is empty and nTodo is 0 (resetting fAllOk to true); otherwise it sleeps on condWorker and exits only when fQuit is also set.
64  bool Loop(bool fMaster = false) {
65  boost::condition_variable &cond = fMaster ? condMaster : condWorker; // the condition variable this thread sleeps on
66  std::vector<T> vChecks; // this thread's private batch
67  vChecks.reserve(nBatchSize);
68  unsigned int nNow = 0; // size of the batch just processed; 0 only before the first batch
69  bool fOk = true;
70  do {
71  {
72  boost::unique_lock<boost::mutex> lock(mutex);
73  // first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
74  if (nNow) {
75  fAllOk &= fOk; // fold this batch's outcome into the shared result
76  nTodo -= nNow;
77  if (nTodo == 0 && !fMaster)
78  // We processed the last element; inform the master it can exit and return the result
79  condMaster.notify_one();
80  } else {
81  // first iteration: register this thread as a participant
82  nTotal++;
83  }
84  // logically, the do loop starts here
85  while (queue.empty()) {
86  if ((fMaster || fQuit) && nTodo == 0) { // the master leaves as soon as nothing is outstanding; workers only when also quitting
87  nTotal--;
88  bool fRet = fAllOk;
89  // reset the status for new work later
90  if (fMaster)
91  fAllOk = true;
92  // return the current status
93  return fRet;
94  }
95  nIdle++;
96  cond.wait(lock); // wait; the mutex is released while blocked and re-acquired before returning
97  nIdle--;
98  }
99  // Decide how many work units to process now.
100  // * Do not try to do everything at once, but aim for increasingly smaller batches so
101  // all workers finish approximately simultaneously.
102  // * Try to account for idle jobs which will instantly start helping.
103  // * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
104  nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
105  vChecks.resize(nNow);
106  for (unsigned int i = 0; i < nNow; i++) {
107  // We want the lock on the mutex to be as short as possible, so swap jobs from the global
108  // queue to the local batch vector instead of copying.
109  vChecks[i].swap(queue.back());
110  queue.pop_back();
111  }
112  // Check whether we need to do work at all: if an earlier batch already failed, this batch is not evaluated
113  fOk = fAllOk;
114  }
115  // execute work (outside the lock); after the first failing check the rest of the batch is skipped
116  BOOST_FOREACH(T &check, vChecks)
117  if (fOk)
118  fOk = check();
119  vChecks.clear(); // the batch size is still held in nNow for the clean-up at the top of the next iteration
120  } while(true);
121  }
122 
123 public:
124  // Create a new check queue: empty, with no threads registered, fAllOk true and fQuit false. nBatchSizeIn is the maximum number of elements a thread takes per batch.
125  CCheckQueue(unsigned int nBatchSizeIn) :
126  nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {}
127 
128  // Worker thread: runs Loop() in worker mode (fMaster = false) and discards its result
129  void Thread() {
130  Loop();
131  }
132 
133  // Wait until execution finishes, and return whether all evaluations were successful. Runs Loop() in master mode, so the calling thread also processes queued elements.
134  bool Wait() {
135  return Loop(true);
136  }
137 
138  // Add a batch of checks to the queue. Each element of vChecks is handed over via T::swap with a freshly pushed T() instead of being copied, so vChecks keeps its size afterwards.
139  void Add(std::vector<T> &vChecks) {
140  boost::unique_lock<boost::mutex> lock(mutex);
141  BOOST_FOREACH(T &check, vChecks) {
142  queue.push_back(T()); // append an empty slot ...
143  check.swap(queue.back()); // ... and swap the caller's check into it
144  }
145  nTodo += vChecks.size();
146  if (vChecks.size() == 1)
147  condWorker.notify_one(); // a single new check: wake one worker
148  else if (vChecks.size() > 1)
149  condWorker.notify_all(); // several new checks: wake every worker; an empty vChecks wakes nobody
150  }
151 
153  }
154 
155  friend class CCheckQueueControl<T>;
156 };
157 
161 template<typename T> class CCheckQueueControl {
162 private:
164  bool fDone; // starts false; set to true by Wait() after pqueue->Wait() has returned
165 
166 public:
167  CCheckQueueControl(CCheckQueue<T> *pqueueIn) : pqueue(pqueueIn), fDone(false) { // stores the queue pointer (may be NULL) and asserts that a non-NULL queue has no pending work
168  // passed queue is supposed to be unused, or NULL
169  if (pqueue != NULL) {
170  assert(pqueue->nTotal == pqueue->nIdle); // every thread inside Loop() is idle; NOTE(review): <cassert> is not among the visible includes, presumably pulled in transitively - confirm
171  assert(pqueue->nTodo == 0); // no verification outstanding
172  assert(pqueue->fAllOk == true); // result flag is in its reset state; note these reads do not take the queue's mutex
173  }
174  }
175 
176  bool Wait() { // returns the queue's Wait() result, or true when there is no queue
177  if (pqueue == NULL)
178  return true; // no queue attached; fDone is left unchanged
179  bool fRet = pqueue->Wait();
180  fDone = true; // record that the queue has been waited on
181  return fRet;
182  }
183 
184  void Add(std::vector<T> &vChecks) { // forwards vChecks to the queue's Add(); does nothing when there is no queue
185  if (pqueue != NULL)
186  pqueue->Add(vChecks);
187  }
188 
190  if (!fDone)
191  Wait();
192  }
193 };
194 
195 #endif
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:184
boost::condition_variable condWorker
Definition: checkqueue.h:34
boost::mutex mutex
Definition: checkqueue.h:31
boost::condition_variable condMaster
Definition: checkqueue.h:37
bool Loop(bool fMaster=false)
Definition: checkqueue.h:64
void Thread()
Definition: checkqueue.h:129
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before co...
Definition: checkqueue.h:17
CCheckQueueControl(CCheckQueue< T > *pqueueIn)
Definition: checkqueue.h:167
CCheckQueue(unsigned int nBatchSizeIn)
Definition: checkqueue.h:125
std::vector< T > queue
Definition: checkqueue.h:41
bool fAllOk
Definition: checkqueue.h:50
Queue for verifications that have to be performed.
Definition: checkqueue.h:28
bool Wait()
Definition: checkqueue.h:134
bool fQuit
Definition: checkqueue.h:58
unsigned int nTodo
Definition: checkqueue.h:55
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:139
unsigned int nBatchSize
Definition: checkqueue.h:61
CCheckQueue< T > * pqueue
Definition: checkqueue.h:163