Различия

Показаны различия между двумя версиями страницы.

Ссылка на это сравнение

Предыдущая версия справа и слева Предыдущая версия
Следующая версия
Предыдущая версия
php:pthreads [2017/01/23 21:08] – [Threading for PHP - Share Nothing] mirocowphp:pthreads [2017/01/25 00:46] (текущий) – [Статьи] mirocow
Строка 1: Строка 1:
-====== Threading for PHP - Share Nothing ======+{{tag>php languages extension}} 
 + 
 +====== Pthread (Threading for PHP - Share Nothing======
  
   * https://github.com/krakjoe/pthreads ([[https://github.com/krakjoe/pthreads/tree/PHP5|5.x]] - [[https://github.com/krakjoe/pthreads|7.x]])   * https://github.com/krakjoe/pthreads ([[https://github.com/krakjoe/pthreads/tree/PHP5|5.x]] - [[https://github.com/krakjoe/pthreads|7.x]])
Строка 5: Строка 7:
   * https://github.com/krakjoe/pthreads-polyfill   * https://github.com/krakjoe/pthreads-polyfill
  
 +===== Установка =====
 +
 +==== Brew (MacOS) ====
 +
 +<code bash>
 +$ brew install php70-pthreads
 +</code>
 +
 +==== pecl ====
 +
 +<code bash>
 +$ pecl install pthread
 +</code>
 +
 +==== Docker ====
 +
 +<code bash>
 +$ docker pull jdecool/php-pthreads
 +</code>
 ===== Примеры ===== ===== Примеры =====
  
Строка 27: Строка 48:
 $pool->shutdown(); $pool->shutdown();
 </code> </code>
 +
 +<code php>
 +<?php
 +
 +function demanding(...$params) {
 +  /* you have parameters here */
 +  return array(rand(), rand());
 +}
 +
 +class Task extends Collectable {
 +  public function __construct(Threaded $result, $params) {
 +    $this->result = $result;
 +    $this->params = $params;
 +  }
 +
 +  public function run() {
 +    $this->result[] = 
 +      demanding(...$this->params);
 +  }
 +
 +  protected $result;
 +  protected $params;
 +}
 +
 +$pool = new Pool(16);
 +
 +$result = new Threaded();
 +
 +while (@$i++<16) {
 +  $pool->submit(
 +    new Task($result, $argv));
 +}
 +
 +$pool->shutdown();
 +
 +var_dump($result);
 +?>
 +</code>
 +
 +<code php>
 +class Scanner {
 +    public function performScan() {
 +        // Add initial task
 +        $initialTask = "Task 1";
 +        TaskQueue::addTask($initialTask);
 +
 +        $i = 0;
 +        while(true) {
 +            // Get task from queue
 +           $task = TaskQueue::getTask();
 +           if ($task == null)
 +               break;
 +
 +            // Handle task
 +            $parser[$i] = new Parser($task);
 +            $parser[$i]->start();
 +
 +            // Wait
 +            $thread = $parser[$i];
 +            $thread->synchronized(function() use($thread) {
 +                while (!$thread->awake) {
 +                    $thread->wait();
 +                }
 +            });
 +
 +            // Join
 +            $i++;
 +        }
 +
 +        // Done
 +        echo "Done\n";
 +    }
 +}
 +
 +class Parser extends Thread {
 +    private $task;
 +
 +    public function __construct($task) {
 +        $this->task = $task;
 +    }
 +
 +    public function run() {
 +        // Perform a time-consuming operation
 +        // This operation adds an unknown number of extra tasks
 +        sleep(1);
 +
 +        // Add new tasks to queue
 +        foreach(range(0, 4) as $i) {
 +            TaskQueue::addTask("Task {$i}");
 +        }
 +
 +        // Notify
 +        $this->synchronized(function(){
 +            $this->awake = true;
 +            $this->notify();
 +        });
 +    }
 +}
 +
 +class TaskQueue {
 +    private static $queue = array();
 +
 +    public static function addTask($task) {
 +        self::$queue[] = $task;
 +        echo "Add task to queue!\n";
 +    }
 +
 +    public static function getTask() {
 +        if (sizeof(self::$queue) > 0) {
 +            $task = array_shift(self::$queue);
 +            echo "Get task from queue!\n";
 +            return $task;
 +        }
 +    }
 +}
 +
 +$scanner = new Scanner();
 +$scanner->performScan();
 +</code>
 +
 +<code php>
 +<?php
 +class Referee extends Threaded {
 +
 +    public function find(string $ident, Threaded $reference) {
 +        return $this->synchronized(function () use($ident, $reference) {
 +            if (isset($this[$ident])) {
 +                return $this[$ident];
 +            } else return ($this[$ident] = $reference);
 +        });
 +    }
 +
 +    public function foreach(Closure $closure) {
 +        $this->synchronized(function() use($closure) {
 +            foreach ($this as $ident => $reference) {
 +                $closure($ident, $reference);
 +            }
 +        });
 +    }
 +}
 +
 +class Test extends Thread {
 +
 +    public function __construct(Referee $referee, string $ident, bool $delay) {
 +        $this->referee = $referee;
 +        $this->ident   = $ident;
 +        $this->delay   = $delay;
 +    }
 +
 +    public function run() {
 +        while (1) {
 +            if ($this->delay) {
 +                $this->synchronized(function(){
 +                    $this->wait(1000000);
 +                });
 +            }
 +
 +            $reference = 
 +                $this->referee->find($this->ident, $this);
 +
 +            /* do something with reference here, I guess */         
 +
 +            /* do something with all references here */
 +            $this->referee->foreach(function($ident, $reference){
 +                var_dump(Thread::getCurrentThreadId(),
 +                        $reference->getIdent(), 
 +                        $reference->isRunning());
 +            });
 +        }
 +    }
 +
 +    public function getIdent() {
 +        return $this->ident;
 +    }
 +
 +    private $referee;
 +    private $ident;
 +    private $delay;
 +}
 +
 +$referee = new Referee();
 +$threads = [];
 +$thread = 0;
 +$idents = [
 +    "smelly",
 +    "dopey",
 +    "bashful",
 +    "grumpy",
 +    "sneezy",
 +    "sleepy",
 +    "happy",
 +    "naughty"
 +];
 +
 +while ($thread < 8) {
 +    $threads[$thread] = new Test($referee, $idents[$thread], rand(0, 1));
 +    $threads[$thread]->start();
 +    $thread++;
 +}
 +
 +foreach ($threads as $thread)
 +    $thread->join();
 +?>
 +</code>
 +
 +<code php>
 +class Wallet{
 +    public $balance;
 +    public function __construct($money){
 +        $this->balance = $money;
 +    }
 +    public function getBalance(){
 +        return $this->balance;
 +    }
 +    public function setBalance($value){
 +        $this->balance = $value;
 +    }
 +}
 +class MyThread extends Thread{
 +    private $wallet;
 +    private $std;
 +    public function __construct($wallet,$std){
 +        $this->wallet = $wallet;
 +        $this->std = $std;
 +    }
 +    public function run(){
 +        $this->synchronized(function($thread){
 +                $hack = $this->wallet;
 +                if($hack->getBalance() - 80 >0){
 +                    sleep(1);
 +                    $hack->setBalance($hack->getBalance() - 80);
 +                    echo $this->getThreadId() . "reduce 80 successful<br/>Current num is:" . $hack->getBalance() . "<Br/>";
 +                    //Here is Wrong!  The result is bool(false)????!!!!
 +                    var_dump($hack == $this->wallet);
 +                }
 +                 else
 +                     echo $this->getThreadId() . "reduce fail<br/>Current num is:" . $hack->getBalance() . "<br/>";
 +            
 +        },$this->std);
 +    }
 +}
 +$wallet = new Wallet(200);
 +$std = new stdClass();
 +for($x=0;$x<3;$x++){
 +    $pool[] = new MyThread($wallet,$std);
 +    $pool[$x]->start();
 +}
 +</code>
 +
 +===== Примеры использования / Помошники (IDE Helpers) =====
 +
 +  * https://github.com/zerustech/pthreads-tutorial
 +  * https://github.com/unusorin/pthreads (IDE support for pthreads)
 +  * https://github.com/krakjoe/promises
 +===== Документация =====
 +
 +  * http://docs.php.net/manual/ru/book.pthreads.php
 +
 +===== Статьи =====
 +
 +  * https://habrahabr.ru/post/300952/
 +  * [[https://habrahabr.ru/post/193270/|PHP IPC — Межпроцессное взаимодействие в PHP]]
 +  * [[https://habrahabr.ru/post/75454/|Почти настоящая многопоточность средствами php 5]]
 +  * http://www.smddzcy.com/2016/01/tutorial-multi-threading-in-php7-pthreads/
 +  * https://blog.madewithlove.be/post/thread-carefully/