diff --git a/src/AMQPLibConnector.php b/src/AMQPLibConnector.php index a862914..93ba043 100644 --- a/src/AMQPLibConnector.php +++ b/src/AMQPLibConnector.php @@ -103,7 +103,8 @@ public function PostToExchange($connection, $details, $task, $params) $ch->queue_bind( $details['binding'], /* queue name - "celery" */ - $details['exchange'] /* exchange name - "celery" */ + $details['exchange'], /* exchange name - "celery" */ + $details['binding'] /* routing key - "celery" */ ); $msg = new \PhpAmqpLib\Message\AMQPMessage( diff --git a/src/Celery.php b/src/Celery.php index f16bd50..62b3c1b 100644 --- a/src/Celery.php +++ b/src/Celery.php @@ -53,20 +53,23 @@ class Celery extends CeleryAbstract { /** - * @param string host - * @param string login - * @param string password - * @param string vhost AMQP vhost, may be left empty or NULL for non-AMQP backends like Redis - * @param string exchange AMQP exchange to use. For Redis it maps to queue key name. See CELERY_DEFAULT_EXCHANGE in Celery docs. (set to 'celery' when in doubt) - * @param string binding AMQP binding a.k.a. routing key. See CELERY_DEFAULT_ROUTING_KEY. (set to 'celery' when in doubt) - * @param int port - * @param string connector Which connector library to use. One of: 'pecl', 'php-amqplib', 'php-amqplib-ssl', 'redis' - * @param int result_expire Expire time for result queue, milliseconds (for AMQP exchanges only) - * @param array ssl_options Used only for 'php-amqplib-ssl' connections, an associative array with values as defined here: http://php.net/manual/en/context.ssl.php + * @param string $host + * @param string $login + * @param string $password + * @param string $vhost AMQP vhost, may be left empty or NULL for non-AMQP backends like Redis + * @param string $exchange AMQP exchange to use. For Redis it maps to queue key name. See CELERY_DEFAULT_EXCHANGE in Celery docs. (set to 'celery' when in doubt) + * @param string $binding AMQP binding a.k.a. routing key. See CELERY_DEFAULT_ROUTING_KEY. (set to 'celery' when in doubt) + * @param int $port + * @param string $connector Which connector library to use. One of: 'pecl', 'php-amqplib', 'php-amqplib-ssl', 'redis' + * @param bool $persistent_messages False = transient queue, True = persistent queue. Check "Using Transient Queues" in Celery docs (set to false when in doubt) + * @see {http://docs.celeryproject.org/en/latest/userguide/optimizing.html#using-transient-queues} + * @param int $result_expire Expire time for result queue, milliseconds (for AMQP exchanges only) + * @param array $ssl_options Used only for 'php-amqplib-ssl' connections, an associative array with values as defined here: http://php.net/manual/en/context.ssl.php */ - public function __construct($host, $login, $password, $vhost, $exchange='celery', $binding='celery', $port=5672, $connector=false, $result_expire=0, $ssl_options=[]) + + public function __construct($host, $login, $password, $vhost, $exchange='celery', $binding='celery', $port=5672, $connector=null, $persistent_messages=false, $result_expire=0, $ssl_options=[]) { - $broker_connection = [ + $backend_connection = $broker_connection = [ 'host' => $host, 'login' => $login, 'password' => $password, @@ -74,13 +77,13 @@ public function __construct($host, $login, $password, $vhost, $exchange='celery' 'exchange' => $exchange, 'binding' => $binding, 'port' => $port, - 'connector' => $connector, + 'connector' => $connector !== false ? $connector : null, + 'persistent_messages' => $persistent_messages, 'result_expire' => $result_expire, 'ssl_options' => $ssl_options ]; - $backend_connection = $broker_connection; - $items = $this->BuildConnection($broker_connection); - $items = $this->BuildConnection($backend_connection, true); + $this->BuildConnection($broker_connection); + $this->BuildConnection($backend_connection, true); } } diff --git a/src/CeleryAbstract.php b/src/CeleryAbstract.php index 019cd6b..a44b7fa 100644 --- a/src/CeleryAbstract.php +++ b/src/CeleryAbstract.php @@ -41,7 +41,7 @@ public function BuildConnection($connection_details, $is_backend = false) $connection_details = $this->SetDefaultValues($connection_details); $ssl = !empty($connection_details['ssl_options']); - if ($connection_details['connector'] === false) { + if (is_null($connection_details['connector'])) { $connection_details['connector'] = AbstractAMQPConnector::GetBestInstalledExtensionName($ssl); } $amqp = AbstractAMQPConnector::GetConcrete($connection_details['connector']);