利用 Phalcon7 的异步功能实现,完整源码 https://github.com/dreamsxin/cphalcon7/blob/master/examples/async/Socks5Server.php。
<?php
class Pool
{
    private $channel;
    private $concurrency;
    private $count = 0;
    private $context;
    public function __construct(int $concurrency = 1, int $capacity = 0)
    {
        $this->concurrency = max(1, $concurrency);
        $this->channel = new \Phalcon\Async\Channel($capacity);
        $this->context = \Phalcon\Async\Context::background();
    }
    public function close(?\Throwable $e = null): void
    {
        $this->count = \PHP_INT_MAX;
        $this->channel->close($e);
    }
    public function submit(callable $work, $socket, ...$args): \Phalcon\Async\Awaitable
    {
        if ($this->count < $this->concurrency) {
            $this->count++;
            Socks5Server::info('Pool count '.$this->count);
            \Phalcon\Async\Task::asyncWithContext($this->context, static function (iterable $it) {
                try {
                    foreach ($it as list ($defer, $context, $work, $socket, $args)) {
                        try {
                            $defer->resolve($context->run($work, $socket, ...$args));
                        } catch (\Throwable $e) {
                            Socks5Server::err($e->getMessage());
                            $defer->fail($e);
                        } finally {
                        }
                    }
                } catch (\Throwable $e) {
                    Socks5Server::err($e->getMessage());
                } finally {
                    --$this->count;
                }
            }, $this->channel->getIterator());
        }
        $this->channel->send([
            $defer = new \Phalcon\Async\Deferred(),
            \Phalcon\Async\Context::current(),
            $work,
            $socket,
            $args
        ]);
        return $defer->awaitable();
    }
}
class Socks5Server
{
    static public $debug = false;
    private $server;
    private $host;
    private $port;
    private $clients;
    public function __construct($host, $port, callable $callback = NULL, int $concurrency = 1, int $capacity = 0)
    {
        $this->host = $host;
        $this->port = $port;
        $this->callback = $callback;
        $this->pool = new Pool($concurrency, $capacity);
    }
    public function start()
    {
        $callback = $this->callback;
        $ws = $this;
        $worker = static function ($socket) use ($ws, $callback) {
            $socket->status = Socks5::STATUS_INIT;
            $socket->is_closing = false;
            try {
                $buffer = '';
                while (!$socket->is_closing && null !== ($chunk = $socket->read())) {
                    $buffer .= $chunk;
                    switch ($socket->status) {
                        case Socks5::STATUS_INIT:
                            /**
                             * 协商版本以及验证方式
                             * +---------+-----------------+------------------+
                             * |协议版本 |支持的验证式数量 |验证方式          |
                             * +---------+-----------------+------------------+
                             * |1个字节  |1个字节          |1种式占一个字节   |
                             * +---------+-----------------+------------------+
                             * |0x05     |0x02             |0x00,0x02         |
                             * +---------+-----------------+------------------+
                             */
                            /**
                             * 0x00 无验证需求
                             * 0x01 通用安全服务应用程序接口(GSSAPI)
                             * 0x02 用户名/密码(USERNAME/PASSWORD)
                             * 0x03 至 X’7F’ IANA 分配(IANA ASSIGNED)
                             * 0x80 至 X’FE’ 私人方法保留(RESERVED FOR PRIVATE METHODS)
                             * 0xFF 无可接受方法(NO ACCEPTABLE METHODS)
                             */
                            $authtypes = Socks5::parseAuth($buffer);
                            if ($authtypes === false) {
                                $socket->is_closing = true;
                                break;
                            }
                            if ($authtypes === true)  { // continue
                                break;
                            }
                            $socket->status = Socks5::STATUS_ADDR;
                            $socket->write("\x05\x00"); // TODO: 暂时
                            $buffer = '';
                            break;
                        case Socks5::STATUS_AUTH:
                            
                            if ($callback && \is_callable($callback)) {
                                // TODO
                            }
                            break;
                        case Socks5::STATUS_ADDR:
                            self::info('Socks5::STATUS_ADDR');
                            /**
                             * 建立代理连接
                             * +----------+------------+---------+-----------+-----------------------+------------+
                             * |协议版本  |请求的类型  |保留字段 |地址类型   |地址数据               |地址端口    |
                             * +----------+------------+---------+-----------+-----------------------+------------+
                             * |1个字节   |1个字节     |1个字节  |1个字节    |变长                   |2个字节     |
                             * +----------+------------+---------+-----------+-----------------------+------------+
                             * |0x05      |0x01        |0x00     |0x01       |0x0a,0x00,0x01,0x0a    |0x00,0x50   |
                             * +----------+------------+---------+-----------+-----------------------+------------+
                             */
                            /**
                             * 请求类型
                             * CONNECT : 0x01, 建立代理连接
                             * BIND : 0x02,告诉代理服务器监听目标机器的连接,也就是让代理服务器创建socket监听来自目标机器的连接。FTP这类需要服务端主动联接客户端的的应用场景。
                             *     1. 只有在完成了connnect操作之后才能进行bind操作
                             *     2. bind操作之后,代理服务器会有两次响应, 第一次响应是在创建socket监听完成之后,第二次是在目标机器连接到代理服务器上之后。
                             * UDP ASSOCIATE : 0x03, udp 协议请求代理。
                             */
                            if (strlen($buffer) < 2) {
                                break;
                            }
                            $cmd = ord($buffer[1]);
                            if ($cmd != Socks5::CMD_CONNECT) {
                                self::err("Bad command ".$cmd);
                                $socket->is_closing = true;
                                break;
                            }
                            $headers = Socks5::parseAddr($buffer);
                            if (!$headers)  {
                                self::err('Error header');
                                $socket->is_closing = true;
                                break;
                            }
                            if ($headers === true)  { // continue
                                break;
                            }
                            /**
                             * 数据包转发
                             * +----+------+------+----------+----------+----------+
                             * |RSV | FRAG | ATYP | DST.ADDR | DST.PORT |   DATA   |
                             * +----+------+------+----------+----------+----------+
                             * | 2  |  1   |  1   | Variable |    2     | Variable |
                             * +----+------+------+----------+----------+----------+
                             */
                            $buffer = substr($buffer, $headers[3]);
                            $socket->status  = Socks5::STATUS_CONNECTING;
                            if (!in_array($headers[0], [Socks5::TYPE_IPV4, Socks5::TYPE_HOST, Socks5::TYPE_IPV6])) {
                                self::err('Addr type error');
                                $socket->is_closing = true;
                                break;
                            }
                            $tls = NULL;
                            if ($headers[2] == 443) {
                                $tls = new \Phalcon\Async\Network\TlsClientEncryption();
                                $tls = $tls->withAllowSelfSigned(true);
                            }
                            $socket->client = \Phalcon\Async\Network\TcpSocket::connect($headers[1], $headers[2], $tls);
                            $socket->write(Socks5::REPLY_ADDR);
                            $socket->status  = Socks5::STATUS_STREAM;
    
                            \Phalcon\Async\Task::async(function ($client) use ($socket) {
                                while (!$socket->is_closing && null !== ($chunk = $client->read())) {
                                    $socket->write($chunk);
                                }
                            }, $socket->client);
                            break;
                        case Socks5::STATUS_STREAM:
                            self::info('Socks5::STATUS_STREAM');
                            $socket->client->write($buffer);
                            $buffer = '';
                            break;
                    }
                }
            } catch (\Throwable $e) {
                self::err($e->getMessage());
            } finally {
                $socket->close();
            }
        };
        try {
            $this->server = \Phalcon\Async\Network\TcpServer::listen($this->host, $this->port);
            echo Phalcon\Cli\Color::info('start server listen:'.$this->host.':'.$this->port).PHP_EOL;
            while (true) {
                $socket = $this->server->accept();
                if ($socket === false) {
                    continue;
                }
                $this->pool->submit($worker, $socket);
            }
        } catch (\Throwable $e) {
            self::err($e->getMessage());
        } finally {
            if ($this->server) {
                $this->server->close();
            }
        }
    }
    static public function info($message)
    {
        if (self::$debug) {
            echo Phalcon\Cli\Color::info($message).PHP_EOL;
        }
    }
    static public function err($message)
    {
        echo Phalcon\Cli\Color::error($message).PHP_EOL;
    }
}
$opts = new \Phalcon\Cli\Options('Websocket CLI');
$opts->add([
    'type' => \Phalcon\Cli\Options::TYPE_STRING,
    'name' => 'server',
    'shortName' => 's',
    'required' => false, // 可选,需要用=号赋值
    'help' => "address"
]);
$opts->add([
    'type' => \Phalcon\Cli\Options::TYPE_INT,
    'name' => 'port',
    'shortName' => 'p',
    'required' => false,
    'help' => "port"
]);
$opts->add([
    'type' => \Phalcon\Cli\Options::TYPE_BOOLEAN,
    'name' => 'concurrency',
    'shortName' => 'c',
    'required' => false
]);
$opts->add([
    'type' => \Phalcon\Cli\Options::TYPE_BOOLEAN,
    'name' => 'capacity',
    'shortName' => 'C',
    'required' => false
]);
$opts->add([
    'type' => \Phalcon\Cli\Options::TYPE_BOOLEAN,
    'name' => 'debug',
    'shortName' => 'v',
    'required' => false,
    'help' => "enable debug"
]);
$vals = $opts->parse();
if ($vals === false ) {
    exit;
}
/**
 * 运行 php websocket-server.php
 */
if (isset($vals['debug'])) {
    Socks5Server::$debug = true;
    echo Phalcon\Cli\Color::info('Use debug mode').PHP_EOL;
}
$sserver = new Socks5Server(\Phalcon\Arr::get($vals, 'server', '0.0.0.0'), \Phalcon\Arr::get($vals, 'port', 10002), function($socket, $status, $data) {
    // TODO
}, \Phalcon\Arr::get($vals, 'concurrency', 500), \Phalcon\Arr::get($vals, 'capacity', 1));
$sserver->start();
 
  
  
  
 
 
  
 
 
 