libasynql  3.2.0
Asynchronous MySQL access library for PocketMine plugins.
SqlThreadPool.php
Go to the documentation of this file.
1 <?php
2 
3 /*
4  * libasynql
5  *
6  * Copyright (C) 2018 SOFe
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 declare(strict_types=1);
22 
23 namespace poggit\libasynql\base;
24 
27 use function count;
28 
29 class SqlThreadPool implements SqlThread{
30  private $workerFactory;
32  private $workers = [];
34  private $workerLimit;
35 
37  private $bufferSend;
39  private $bufferRecv;
40 
47  public function __construct(callable $workerFactory, int $workerLimit){
48  $this->workerFactory = $workerFactory;
49  $this->workerLimit = $workerLimit;
50  $this->bufferSend = new QuerySendQueue();
51  $this->bufferRecv = new QueryRecvQueue();
52  $this->addWorker();
53  }
54 
55  private function addWorker() : void{
56  $this->workers[] = ($this->workerFactory)($this->bufferSend, $this->bufferRecv);
57  }
58 
59  public function join() : void{
60  foreach($this->workers as $worker){
61  $worker->join();
62  }
63  }
64 
65  public function stopRunning() : void{
66  foreach($this->workers as $worker){
67  $worker->stopRunning();
68  }
69  }
70 
71  public function addQuery(int $queryId, int $mode, string $query, array $params) : void{
72  $this->bufferSend->scheduleQuery($queryId, $mode, $query, $params);
73 
74  // check if we need to increase worker size
75  foreach($this->workers as $worker){
76  if(!$worker->isWorking()){
77  return;
78  }
79  }
80  if(count($this->workers) < $this->workerLimit){
81  $this->addWorker();
82  }
83  }
84 
85  public function readResults(array &$callbacks) : void{
86  while($this->bufferRecv->fetchResult($queryId, $result)){
87  if(!isset($callbacks[$queryId])){
88  throw new InvalidArgumentException("Missing handler for query #$queryId");
89  }
90 
91  $callbacks[$queryId]($result);
92  unset($callbacks[$queryId]);
93  }
94  }
95 
96  public function connCreated() : bool{
97  return $this->workers[0]->connCreated();
98  }
99 
100  public function hasConnError() : bool{
101  return $this->workers[0]->hasConnError();
102  }
103 
104  public function getConnError() : ?string{
105  return $this->workers[0]->getConnError();
106  }
107 
108  public function getLoad() : float{
109  return $this->bufferSend->count() / (float) $this->workerLimit;
110  }
111 }
addQuery(int $queryId, int $mode, string $query, array $params)
__construct(callable $workerFactory, int $workerLimit)