libasynql  3.2.0
Asynchronous MySQL access library for PocketMine plugins.
DataConnectorImpl.php
Go to the documentation of this file.
1 <?php
2 
3 /*
4  * libasynql
5  *
6  * Copyright (C) 2018 SOFe
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 
21 declare(strict_types=1);
22 
24 
25 use Error;
26 use Exception;
40 use ReflectionClass;
41 use function array_merge;
42 use function array_pop;
43 use function count;
44 use function json_encode;
45 use function str_replace;
46 use function usleep;
47 
/** @var Plugin plugin that supplies the scheduler and logger used by this connector */
50  private $plugin;
/** @var SqlThread thread that queries are queued on and results are read from */
52  private $thread;
/** @var bool whether each queued query is written to the plugin logger at debug level */
54  private $loggingQueries;
/** @var GenericStatement[] loaded statements, keyed by statement name */
56  private $queries = [];
/** @var callable[] result handlers registered for queued queries, keyed by query ID */
57  private $handlers = [];
/** @var int ID that will be assigned to the next queued query */
58  private $queryId = 0;
/** @var string|null placeholder passed to GenericStatement::format() when formatting named queries */
60  private $placeHolder;
/** @var CallbackTask repeating task that calls checkResults() */
61  private $task;
62 
/**
 * Stores the collaborators and schedules a repeating task (period 1) that calls checkResults().
 *
 * NOTE(review): unlike setLoggingQueries(), $logQueries is stored here without the
 * libasynql::isPackaged() guard -- confirm that this is intended.
 *
 * @param Plugin      $plugin      plugin providing the scheduler and logger
 * @param SqlThread   $thread      thread that queries are queued on
 * @param string|null $placeHolder placeholder handed to GenericStatement::format()
 * @param bool        $logQueries  initial value of the query-logging flag
 */
public function __construct(Plugin $plugin, SqlThread $thread, ?string $placeHolder, bool $logQueries = false){
    $this->placeHolder = $placeHolder;
    $this->loggingQueries = $logQueries;
    $this->thread = $thread;
    $this->plugin = $plugin;

    $this->task = new CallbackTask([$this, "checkResults"]);
    $scheduler = $plugin->getScheduler();
    $scheduler->scheduleRepeatingTask($this->task, 1);
}
78 
/**
 * Switches query logging on or off; the flag is always forced off when
 * libasynql::isPackaged() returns true.
 *
 * @param bool $loggingQueries requested state of the logging flag
 */
public function setLoggingQueries(bool $loggingQueries) : void{
    if(libasynql::isPackaged()){
        $this->loggingQueries = false;
        return;
    }
    $this->loggingQueries = $loggingQueries;
}
82 
/**
 * Tells whether queued queries are currently written to the plugin logger.
 *
 * @return bool the current value of the logging flag
 */
public function isLoggingQueries() : bool{
    return $this->loggingQueries;
}
86 
/**
 * Parses a statement file and registers every statement it contains.
 *
 * @param mixed       $fh       handle passed straight to GenericStatementFileParser
 *                              (presumably an open stream resource -- TODO confirm against the parser)
 * @param string|null $fileName file name passed to the parser, or null if none is known
 *
 * @throws InvalidArgumentException if a parsed statement has the same name as one already loaded
 */
public function loadQueryFile($fh, ?string $fileName = null) : void{
    // The nullable type is spelled out: relying on "= null" to make a "string"
    // parameter nullable is deprecated as of PHP 8.4.
    $parser = new GenericStatementFileParser($fileName, $fh);
    $parser->parse();
    foreach($parser->getResults() as $statement){
        $this->loadQuery($statement);
    }
}
94 
/**
 * Registers a single statement under its own name.
 *
 * @param GenericStatement $stmt statement to register
 *
 * @throws InvalidArgumentException if a statement with the same name is already registered
 */
public function loadQuery(GenericStatement $stmt) : void{
    $name = $stmt->getName();
    if(isset($this->queries[$name])){
        throw new InvalidArgumentException("Duplicate GenericStatement: {$name}");
    }
    $this->queries[$name] = $stmt;
}
101 
/**
 * Runs a loaded statement in SqlThread::MODE_GENERIC.
 *
 * @param string        $queryName name the statement was loaded under
 * @param array         $args      arguments handed to the statement formatter
 * @param callable|null $onSuccess called with no arguments when the query succeeds
 * @param callable|null $onError   error callback forwarded unchanged
 *
 * @throws InvalidArgumentException if no statement named $queryName has been loaded
 */
public function executeGeneric(string $queryName, array $args = [], ?callable $onSuccess = null, ?callable $onError = null) : void{
    $notify = function() use ($onSuccess){
        if($onSuccess === null){
            return;
        }
        $onSuccess();
    };
    $this->executeImpl($queryName, $args, SqlThread::MODE_GENERIC, $notify, $onError);
}
109 
/**
 * Runs a raw query string in SqlThread::MODE_GENERIC.
 *
 * @param string        $query     query text queued on the thread as given
 * @param array         $args      arguments queued together with the query
 * @param callable|null $onSuccess called with no arguments when the query succeeds
 * @param callable|null $onError   error callback forwarded unchanged
 */
public function executeGenericRaw(string $query, array $args = [], ?callable $onSuccess = null, ?callable $onError = null) : void{
    $notify = function() use ($onSuccess){
        if($onSuccess === null){
            return;
        }
        $onSuccess();
    };
    $this->executeImplRaw($query, $args, SqlThread::MODE_GENERIC, $notify, $onError);
}
117 
/**
 * Runs a loaded statement in SqlThread::MODE_CHANGE.
 *
 * @param string        $queryName name the statement was loaded under
 * @param array         $args      arguments handed to the statement formatter
 * @param callable|null $onSuccess called with the number of affected rows
 * @param callable|null $onError   error callback forwarded unchanged
 *
 * @throws InvalidArgumentException if no statement named $queryName has been loaded
 */
public function executeChange(string $queryName, array $args = [], ?callable $onSuccess = null, ?callable $onError = null) : void{
    $forward = function(SqlChangeResult $result) use ($onSuccess){
        if($onSuccess === null){
            return;
        }
        $onSuccess($result->getAffectedRows());
    };
    $this->executeImpl($queryName, $args, SqlThread::MODE_CHANGE, $forward, $onError);
}
125 
/**
 * Runs a raw query string in SqlThread::MODE_CHANGE.
 *
 * @param string        $query     query text queued on the thread as given
 * @param array         $args      arguments queued together with the query
 * @param callable|null $onSuccess called with the number of affected rows
 * @param callable|null $onError   error callback forwarded unchanged
 */
public function executeChangeRaw(string $query, array $args = [], ?callable $onSuccess = null, ?callable $onError = null) : void{
    $forward = function(SqlChangeResult $result) use ($onSuccess){
        if($onSuccess === null){
            return;
        }
        $onSuccess($result->getAffectedRows());
    };
    $this->executeImplRaw($query, $args, SqlThread::MODE_CHANGE, $forward, $onError);
}
133 
/**
 * Runs a loaded statement in SqlThread::MODE_INSERT.
 *
 * @param string        $queryName  name the statement was loaded under
 * @param array         $args       arguments handed to the statement formatter
 * @param callable|null $onInserted called with (insert ID, affected rows)
 * @param callable|null $onError    error callback forwarded unchanged
 *
 * @throws InvalidArgumentException if no statement named $queryName has been loaded
 */
public function executeInsert(string $queryName, array $args = [], ?callable $onInserted = null, ?callable $onError = null) : void{
    $forward = function(SqlInsertResult $result) use ($onInserted){
        if($onInserted === null){
            return;
        }
        $onInserted($result->getInsertId(), $result->getAffectedRows());
    };
    $this->executeImpl($queryName, $args, SqlThread::MODE_INSERT, $forward, $onError);
}
141 
/**
 * Runs a raw query string in SqlThread::MODE_INSERT.
 *
 * @param string        $query      query text queued on the thread as given
 * @param array         $args       arguments queued together with the query
 * @param callable|null $onInserted called with (insert ID, affected rows)
 * @param callable|null $onError    error callback forwarded unchanged
 */
public function executeInsertRaw(string $query, array $args = [], ?callable $onInserted = null, ?callable $onError = null) : void{
    $forward = function(SqlInsertResult $result) use ($onInserted){
        if($onInserted === null){
            return;
        }
        $onInserted($result->getInsertId(), $result->getAffectedRows());
    };
    $this->executeImplRaw($query, $args, SqlThread::MODE_INSERT, $forward, $onError);
}
149 
/**
 * Runs a loaded statement in SqlThread::MODE_SELECT.
 *
 * @param string        $queryName name the statement was loaded under
 * @param array         $args      arguments handed to the statement formatter
 * @param callable|null $onSelect  called with (rows, column info) of the result
 * @param callable|null $onError   error callback forwarded unchanged
 *
 * @throws InvalidArgumentException if no statement named $queryName has been loaded
 */
public function executeSelect(string $queryName, array $args = [], ?callable $onSelect = null, ?callable $onError = null) : void{
    $forward = function(SqlSelectResult $result) use ($onSelect){
        if($onSelect === null){
            return;
        }
        $onSelect($result->getRows(), $result->getColumnInfo());
    };
    $this->executeImpl($queryName, $args, SqlThread::MODE_SELECT, $forward, $onError);
}
157 
/**
 * Runs a raw query string in SqlThread::MODE_SELECT.
 *
 * @param string        $query    query text queued on the thread as given
 * @param array         $args     arguments queued together with the query
 * @param callable|null $onSelect called with (rows, column info) of the result
 * @param callable|null $onError  error callback forwarded unchanged
 */
public function executeSelectRaw(string $query, array $args = [], ?callable $onSelect = null, ?callable $onError = null) : void{
    $forward = function(SqlSelectResult $result) use ($onSelect){
        if($onSelect === null){
            return;
        }
        $onSelect($result->getRows(), $result->getColumnInfo());
    };
    $this->executeImplRaw($query, $args, SqlThread::MODE_SELECT, $forward, $onError);
}
165 
/**
 * Looks up a loaded statement, formats it with the configured placeholder and
 * hands the resulting query to executeImplRaw().
 *
 * @param string        $queryName name the statement was loaded under
 * @param array         $args      arguments passed to GenericStatement::format()
 * @param int           $mode      one of the SqlThread::MODE_* constants
 * @param callable      $handler   receives the result object on success
 * @param callable|null $onError   error callback forwarded unchanged
 *
 * @throws InvalidArgumentException if no statement named $queryName has been loaded
 */
private function executeImpl(string $queryName, array $args, int $mode, callable $handler, ?callable $onError) : void{
    $statement = $this->queries[$queryName] ?? null;
    if($statement === null){
        throw new InvalidArgumentException("The query $queryName has not been loaded");
    }

    // $outArgs is filled in by format() (presumably a by-reference out parameter -- confirm).
    $formatted = $statement->format($args, $this->placeHolder, $outArgs);
    $this->executeImplRaw($formatted, $outArgs, $mode, $handler, $onError);
}
174 
/**
 * Assigns the next query ID, registers the result handler for it and queues the
 * query on the SQL thread.
 *
 * When libasynql::isPackaged() is false, an Exception is created at queue time purely
 * to record the caller's stack trace; it is handed to the error callback and spliced
 * into anything thrown by $handler, so failures point back to where the query was issued.
 *
 * @param string        $query   query text to queue
 * @param array         $args    arguments queued with the query
 * @param int           $mode    one of the SqlThread::MODE_* constants
 * @param callable      $handler receives the result object when it is not an SqlError
 * @param callable|null $onError forwarded to reportError() when the result is an SqlError
 */
private function executeImplRaw(string $query, array $args, int $mode, callable $handler, ?callable $onError) : void{
    $queryId = $this->queryId++;
    $trace = libasynql::isPackaged() ? null : new Exception("(This is the original stack trace for the following error)");
    $this->handlers[$queryId] = function($result) use ($handler, $onError, $trace){
        if($result instanceof SqlError){
            $this->reportError($onError, $result, $trace);
            return;
        }
        try{
            $handler($result);
        }catch(Exception | Error $e){
            // $trace is only recorded outside packaged builds; nothing to splice in otherwise.
            if($trace !== null){
                $this->appendQueueTrace($e, $trace);
            }
            throw $e;
        }
    };
    if($this->loggingQueries){
        $this->plugin->getLogger()->debug("Queuing mode-$mode query: " . str_replace(["\r\n", "\n"], "\\n ", $query) . " | Args: " . json_encode($args));
    }
    $this->thread->addQuery($queryId, $mode, $query, $args);
}

/**
 * Rewrites the stack trace of $thrown so that, after the frames it shares with
 * $origin are removed, a marker frame and the full trace of $origin follow.
 *
 * @param Exception|Error $thrown throwable raised by a result handler
 * @param Exception       $origin exception created when the query was queued
 */
private function appendQueueTrace($thrown, Exception $origin) : void{
    // The "trace" property is looked up separately on Exception and on Error,
    // depending on which of the two $thrown derives from.
    $originProperty = (new ReflectionClass(Exception::class))->getProperty("trace");
    $originProperty->setAccessible(true);
    $thrownClass = $thrown instanceof Error ? Error::class : Exception::class;
    $thrownProperty = (new ReflectionClass($thrownClass))->getProperty("trace");
    $thrownProperty->setAccessible(true);

    $oldTrace = $originProperty->getValue($origin);
    $newTrace = $thrownProperty->getValue($thrown);

    // Walk both traces from their last frame backwards, dropping frames that are identical in both.
    for($i = count($newTrace) - 1, $j = count($oldTrace) - 1; $i >= 0 && $j >= 0 && $newTrace[$i] === $oldTrace[$j]; --$i, --$j){
        array_pop($newTrace);
    }

    $thrownProperty->setValue($thrown, array_merge($newTrace, [
        [
            "function" => Terminal::$COLOR_YELLOW . "--- below is the original stack trace ---" . Terminal::$FORMAT_RESET,
        ],
    ], $oldTrace));
}
230 
/**
 * Delivers an SqlError to the caller's error callback, or logs it.
 *
 * The error is logged when no callback was supplied, or when the callback itself
 * throws an SqlError (in which case the thrown error is the one that gets logged).
 *
 * @param callable|null  $default error callback, called with ($error, $trace)
 * @param SqlError       $error   error reported for the query
 * @param Exception|null $trace   exception recorded when the query was queued, if any
 */
private function reportError(?callable $default, SqlError $error, ?Exception $trace) : void{
    if($default !== null){
        try{
            $default($error, $trace);
            return; // the callback took care of the error
        }catch(SqlError $rethrown){
            $error = $rethrown;
        }
    }

    $logger = $this->plugin->getLogger();
    $logger->error($error->getMessage());

    $query = $error->getQuery();
    if($query !== null){
        $logger->debug("Query: " . $query);
    }

    $args = $error->getArgs();
    if($args !== null){
        $logger->debug("Args: " . json_encode($args));
    }

    if($trace !== null){
        $logger->debug("Stack trace: " . $trace->getTraceAsString());
    }
}
253 
/**
 * Blocks until no result handlers remain, polling checkResults() and sleeping
 * 1000 microseconds after each poll.
 */
public function waitAll() : void{
    if(empty($this->handlers)){
        return;
    }
    do{
        $this->checkResults();
        usleep(1000);
    }while(!empty($this->handlers));
}
260 
/**
 * Asks the SQL thread to read its results, giving it the registered handlers.
 *
 * Called by the repeating task created in the constructor and by waitAll().
 */
public function checkResults() : void{
    // The property is passed directly (no local copy) in case readResults() takes it by reference.
    $this->thread->readResults($this->handlers);
}
264 
/**
 * Stops the SQL thread and cancels the repeating result-polling task.
 */
public function close() : void{
    $this->thread->stopRunning();

    $scheduler = $this->plugin->getScheduler();
    $scheduler->cancelTask($this->task->getTaskId());
}
269 }
executeSelect(string $queryName, array $args=[], ?callable $onSelect=null, ?callable $onError=null)
executeChangeRaw(string $query, array $args=[], ?callable $onSuccess=null, ?callable $onError=null)
executeGenericRaw(string $query, array $args=[], ?callable $onSuccess=null, ?callable $onError=null)
executeInsert(string $queryName, array $args=[], ?callable $onInserted=null, ?callable $onError=null)
__construct(Plugin $plugin, SqlThread $thread, ?string $placeHolder, bool $logQueries=false)
executeGeneric(string $queryName, array $args=[], ?callable $onSuccess=null, ?callable $onError=null)
loadQueryFile($fh, string $fileName=null)
executeInsertRaw(string $query, array $args=[], ?callable $onInserted=null, ?callable $onError=null)
executeSelectRaw(string $query, array $args=[], ?callable $onSelect=null, ?callable $onError=null)
executeChange(string $queryName, array $args=[], ?callable $onSuccess=null, ?callable $onError=null)