libasynql  3.2.0
Asynchronous MySQL access library for PocketMine plugins.
SqlSlaveThread.php
Go to the documentation of this file.
1 <?php
2 
3 /*
4  * libasynql
5  *
6  * Copyright (C) 2018 SOFe
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 declare(strict_types=1);
22 
23 namespace poggit\libasynql\base;
24 
25 use ClassLoader;
33 use function usleep;
35 use const PTHREADS_INHERIT_INI;
36 
37 abstract class SqlSlaveThread extends Thread implements SqlThread{
38  private static $nextSlaveNumber = 0;
39 
40  private $running = true;
41  protected $slaveNumber;
42  protected $bufferSend;
43  protected $bufferRecv;
44  protected $connCreated = false;
45  protected $connError;
46  protected $working = false;
47 
48  protected function __construct(QuerySendQueue $bufferSend = null, QueryRecvQueue $bufferRecv = null){
49  $this->slaveNumber = self::$nextSlaveNumber++;
50  $this->bufferSend = $bufferSend ?? new QuerySendQueue();
51  $this->bufferRecv = $bufferRecv ?? new QueryRecvQueue();
52 
53  if(!libasynql::isPackaged()){
57  $cl = Server::getInstance()->getPluginManager()->getPlugin("DEVirion")->getVirionClassLoader();
58  $this->setClassLoader($cl);
59  }
61  }
62 
63  public function run() : void{
64  $this->registerClassLoader();
65  $error = $this->createConn($resource);
66  $this->connCreated = true;
67  $this->connError = $error;
68 
69  if($error !== null){
70  return;
71  }
72 
73  while($this->running){
74  while($this->bufferSend->fetchQuery($queryId, $mode, $query, $params)){
75  $this->working = true;
76  try{
77  $result = $this->executeQuery($resource, $mode, $query, $params);
78  $this->bufferRecv->publishResult($queryId, $result);
79  }catch(SqlError $error){
80  $this->bufferRecv->publishError($queryId, $error);
81  }
82  }
83  $this->working = false;
84  usleep(100);
85  }
86  $this->close($resource);
87  }
88 
94  public function isWorking() : bool{
95  return $this->working;
96  }
97 
98  public function stopRunning() : void{
99  $this->running = false;
100  }
101 
102  public function quit(){
103  $this->stopRunning();
104  }
105 
106  public function addQuery(int $queryId, int $mode, string $query, array $params) : void{
107  $this->bufferSend->scheduleQuery($queryId, $mode, $query, $params);
108  }
109 
110  public function readResults(array &$callbacks) : void{
111  while($this->bufferRecv->fetchResult($queryId, $result)){
112  if(!isset($callbacks[$queryId])){
113  throw new InvalidArgumentException("Missing handler for query #$queryId");
114  }
115 
116  $callbacks[$queryId]($result);
117  unset($callbacks[$queryId]);
118  }
119  }
120 
121  public function connCreated() : bool{
122  return $this->connCreated;
123  }
124 
125  public function hasConnError() : bool{
126  return $this->connError !== null;
127  }
128 
129  public function getConnError() : ?string{
130  return $this->connError;
131  }
132 
133  protected abstract function createConn(&$resource) : ?string;
134 
144  protected abstract function executeQuery($resource, int $mode, string $query, array $params) : SqlResult;
145 
146  protected abstract function close(&$resource) : void;
147 }
addQuery(int $queryId, int $mode, string $query, array $params)
executeQuery($resource, int $mode, string $query, array $params)