summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--workers/queue/queueitem.class.php11
-rw-r--r--workers/queue/smsrequest.class.php60
-rw-r--r--workers/queue/smsresponse.class.php42
-rw-r--r--workers/smsfactory.class.php136
-rw-r--r--workers/smsreceiver.class.php132
-rw-r--r--workers/smssender.class.php205
6 files changed, 0 insertions, 586 deletions
diff --git a/workers/queue/queueitem.class.php b/workers/queue/queueitem.class.php
deleted file mode 100644
index b4c3d86..0000000
--- a/workers/queue/queueitem.class.php
+++ /dev/null
@@ -1,11 +0,0 @@
-<?php
-namespace gateway\workers\queue;
-
-/**
- * Generic class for all queue items. Not much to see here
- * @author hd@onlinecity.dk
- */
-abstract class QueueItem implements \Serializable
-{
- const TYPE=0;
-} \ No newline at end of file
diff --git a/workers/queue/smsrequest.class.php b/workers/queue/smsrequest.class.php
deleted file mode 100644
index d58fb77..0000000
--- a/workers/queue/smsrequest.class.php
+++ /dev/null
@@ -1,60 +0,0 @@
-<?php
-namespace gateway\workers\queue;
-use gateway\workers\queue\QueueItem;
-use gateway\protocol\SmppClient;
-
-/**
- * Class to represent smsrequest objects. The objects will be serialized in the message queue.
- * This object must contain all data the worker needs to process the SMS.
- * For ease of use it will accept multiple recipients, and convert them to \SMPP\Address objects.
- *
- * Copyright (C) 2011 OnlineCity
- * Licensed under the MIT license, which can be read at: http://www.opensource.org/licenses/mit-license.php
- * @author hd@onlinecity.dk
- */
-class SmsRequest extends QueueItem
-{
- const TYPE=1;
-
- public $id;
- public $sender;
- public $message;
- public $recipients;
- public $dataCoding;
-
- /**
- * A request for a SmsWorker to send a SMS.
- * The $recipients must be an array of international (E164) numbers.
- * The sender can be either alphanumeric, international, or national formatted.
- * If the sender address is 4-digits or less it's assumed to be a short number, and
- * the type will be set to national, otherwise it must be an international number.
- * If $dataCoding is set to default the $message will be automatically converted to GSM 03.38.
- *
- * @param string $message
- * @param array $recipients
- * @param string $sender
- * @param integer $id - the request id, used for matching with the response
- * @param integer $dataCoding
- */
- public function __construct($message,$recipients,$sender,$id,$dataCoding=0x00)
- {
- $this->message = $message;
- $this->recipients = $recipients;
- $this->sender = $sender;
- $this->id = $id;
- $this->dataCoding = $dataCoding;
- }
-
- public function serialize()
- {
- foreach($this->recipients as &$recipient) {
- $recipient = (int) $recipient; // cast them all to integers first
- }
- return serialize(array($this->id,$this->sender,$this->recipients,$this->dataCoding,$this->message));
- }
-
- public function unserialize($serialized)
- {
- list($this->id,$this->sender,$this->recipients,$this->dataCoding,$this->message) = unserialize($serialized);
- }
-} \ No newline at end of file
diff --git a/workers/queue/smsresponse.class.php b/workers/queue/smsresponse.class.php
deleted file mode 100644
index 3fc055f..0000000
--- a/workers/queue/smsresponse.class.php
+++ /dev/null
@@ -1,42 +0,0 @@
-<?php
-namespace gateway\workers\queue;
-use gateway\workers\queue\QueueItem;
-
-/**
- * Objects of this class will be returned by the workers.
- * They contain a mapping between a request id, and one or more sms ids.
- * This mapping is used for processing delivery reports.
- *
- * Copyright (C) 2011 OnlineCity
- * Licensed under the MIT license, which can be read at: http://www.opensource.org/licenses/mit-license.php
- * @author hd@onlinecity.dk
- */
-class SmsResponse extends QueueItem
-{
- const TYPE=2;
-
- public $id;
- public $smsIds;
-
- /**
- * A response from a worker, with IDs returned from SMSC
- *
- * @param integer $id
- * @param array $smsIds
- */
- public function __construct($id,$smsIds)
- {
- $this->id = $id;
- $this->smsIds = $smsIds;
- }
-
- public function serialize()
- {
- return serialize(array($this->id,$this->smsIds));
- }
-
- public function unserialize($serialized)
- {
- list($this->id,$this->smsIds) = unserialize($serialized);
- }
-} \ No newline at end of file
diff --git a/workers/smsfactory.class.php b/workers/smsfactory.class.php
deleted file mode 100644
index fcd206f..0000000
--- a/workers/smsfactory.class.php
+++ /dev/null
@@ -1,136 +0,0 @@
-<?php
-namespace gateway\workers;
-
-/**
- * Factory class for the SmsSender and SmsReceiver workers.
- * This class will fork the required amount of workers, and keep them running until it's closed.
- * The implementation uses the posix and pcntl extensions to support forking. In addition the
- * SmsSenders use the semaphore extension to keep synchronized through a message queue.
- * The message queue must be implemented at a higher level, since this class will just start the
- * workers and keep them running, but not send or receive data.
- *
- * Copyright (C) 2011 OnlineCity
- * Licensed under the MIT license, which can be read at: http://www.opensource.org/licenses/mit-license.php
- * @author hd@onlinecity.dk
- */
-class SmsFactory
-{
- protected $senderClass;
- protected $receiverClass;
- protected $numSenders;
-
- protected $options;
- protected $queue;
- protected $debugHandler;
- protected $senders;
- protected $receiver;
-
- public static $pidFile = 'parent.pid';
-
- public function __construct($senderClass='\gateway\workers\SmsSender',$receiverClass='\gateway\workers\SmsReceiver',$numSenders=10)
- {
- $this->senderClass = $senderClass;
- $this->receiverClass = $receiverClass;
- $this->numSenders = $numSenders;
- $this->senders = array();
- }
-
-
- /**
- * Start all workers.
- *
- * As a bare minimum the following options should be set:
- * hostname,port,username,password
- * For more options see SmsSender and SmsReceiver classes.
- *
- * @param unknown_type $options
- * @param unknown_type $queue
- */
- public function startAll($options, $queue)
- {
- if (!is_resource($queue)) throw new \InvalidArgumentException('Queue must be an IPC message queue resource');
- if (empty($options)) throw new \InvalidArgumentException('Options must be set');
- if (!isset($options['hostname'])) throw new \InvalidArgumentException('Hostname option must be set');
- if (!isset($options['port'])) throw new \InvalidArgumentException('Port option must be set');
-
- $this->options = $options;
- $this->queue = $queue;
- $this->debugHandler = isset($options['debug_handler']) ? $options['debug_handler'] : 'error_log';
- call_user_func($this->debugHandler, "Factory started with pid: ".getmypid());
- file_put_contents(self::$pidFile, getmypid());
-
- $this->fork();
- }
-
- protected function constructReceiver()
- {
- $class = $this->receiverClass;
- $hostname = isset($this->options['recv_hostname']) ? $this->options['recv_hostname'] : $this->options['hostname'];
- $port = isset($this->options['recv_port']) ? $this->options['recv_port'] : $this->options['port'];
-
- return new $class($hostname,$port,$this->options);
- }
-
- protected function constructSender()
- {
- $class = $this->senderClass;
- return new $class($this->options['hostname'],$this->options['port'],$this->queue,$this->options);
- }
-
- private function fork()
- {
- $constructReceiver=true;
-
- for($i=0;$i<($this->numSenders+1);$i++) {
- switch ($pid = pcntl_fork()) {
- case -1: // @fail
- die('Fork failed');
- break;
- case 0: // @child
- $worker = $constructReceiver ? $this->constructReceiver() : $this->constructSender();
- call_user_func($this->debugHandler, "Constructed: ".get_class($worker)." with pid: ".getmypid());
- $worker->run();
- break;
- default: // @parent
-
- // Store PID
- if ($constructReceiver) {
- $this->receiver = $pid;
- $constructReceiver = false;
- } else {
- $this->senders[$pid] = $pid;
- }
-
- if ($i<($this->numSenders)) { // fork more
- continue;
- }
-
- // All children are spawned, wait for something to happen, and respawn if it does
- $exitedPid = pcntl_wait($status);
-
- // What happened to our child?
- if (pcntl_wifsignaled($status)) {
- $what = 'was signaled';
- } else if (pcntl_wifexited($status)) {
- $what = 'has exited';
- } else {
- $what = 'returned for some reason';
- }
- call_user_func($this->debugHandler, "Pid: $exitedPid $what");
-
- // Respawn
- if ($exitedPid == $this->receiver) {
- $constructReceiver = true;
- $this->receiver = null;
- } else {
- unset($this->senders[$exitedPid]);
- }
- $i--;
- call_user_func($this->debugHandler, "Will respawn new ".($constructReceiver ? 'receiver' : 'sender'). " to cover loss in one second");
- sleep(1); // Sleep for one second before respawning child
- continue;
- break;
- }
- }
- }
-} \ No newline at end of file
diff --git a/workers/smsreceiver.class.php b/workers/smsreceiver.class.php
deleted file mode 100644
index 2e2d26d..0000000
--- a/workers/smsreceiver.class.php
+++ /dev/null
@@ -1,132 +0,0 @@
-<?php
-namespace gateway\workers;
-
-use gateway\protocol\SmppClient;
-use gateway\transport\TSocket;
-use gateway\transport\TTransportException;
-
-/**
- * SMS receiver worker.
- * This worker maintains it's own connection to the SMSC, and will exit if the parent process also exists.
- * The implementation uses the posix, pcntl to support forking.
- * This class does not receive or send messages to the IPC message queue as the senders. You should override
- * this basic processing methods with something more useful.
- *
- * Copyright (C) 2011 OnlineCity
- * Licensed under the MIT license, which can be read at: http://www.opensource.org/licenses/mit-license.php
- * @author hd@onlinecity.dk
- */
-class SmsReceiver
-{
- protected $client;
- protected $transport;
- protected $options;
- protected $lastEnquireLink;
-
- /**
- * Construct a new SmsSender
- * This will prepare the transport and SMPP client.
- * It works with a single host.
- *
- * It will use the following default options, but you can override them by specifing an $options array.
- * array(
- * 'persistent_connections' => false,
- * 'debug_handler' => null,
- * 'debug' => false,
- * 'enquire_link_timeout' => 30,
- * 'username' => 'JaneDoe',
- * 'password' => 'iHeartPasswordz'
- * )
- *
- * @param string $hostname
- * @param integer $port
- * @param array $options
- */
- public function __construct($hostname, $port, $options=null)
- {
- // Merge options
- if (is_null($options)) $options = array();
- $defaultOptions = array(
- 'persistent_connections' => false,
- 'debug_handler' => null,
- 'debug' => false,
- 'enquire_link_timeout' => 30,
- 'recv_timeout' => 30000,
- 'send_timeout' => 10000,
- 'username' => 'JaneDoe',
- 'password' => 'iHeartPasswordz'
- );
- $this->options = $options = array_merge($defaultOptions,$options);
-
-
- $this->transport = new TSocket($hostname, $port, $options['persistent_connections'], $options['debug_handler']);
- $this->transport->setDebug($options['debug']);
-
- $this->client = new SmppClient($this->transport);
- $this->client->debug = $options['debug'];
-
- // Set transport timeouts
- $this->transport->setRecvTimeout($options['recv_timeout']);
- $this->transport->setSendTimeout($options['send_timeout']);
-
- $this->lastEnquireLink = 0;
- }
-
- /**
- * Open transport connection and bind as a receiver
- */
- protected function connect()
- {
- $this->transport->open();
- $this->client->bindReceiver($this->options['username'],$this->options['password']);
- }
-
- /**
- * Do fancy processing here, you probably want to override this method.
- * @param \SMPP\SMS $sms
- */
- protected function processSms(\SMPP\SMS $sms)
- {
- call_user_func($this->options['debug_handler'] ?: 'error_log', "Processing SMS:\n".print_r($sms,true)); // dummy
- }
-
- /**
- * This is a callback method, which is called when the connection times out
- */
- protected function refreshConnection()
- {
- $this->client->enquireLink();
- $this->lastEnquireLink = time();
- }
-
- /**
- * The main loop of the worker
- */
- public function run()
- {
- $this->connect();
-
- while (true) {
- // commit suicide if the parent process no longer exists
- if (posix_getppid() == 1) exit();
-
- // Make sure to send enquire link periodically to keep the link alive
- if (time()-$this->lastEnquireLink >= $this->options['enquire_link_timeout']) {
- $this->refreshConnection();
- }
-
- // Read the SMS and send it to processing
- try {
- $sms = $this->client->readSMS();
- $this->processSms($sms);
- } catch (TTransportException $e) {
- if (time()-$this->lastEnquireLink > 1) { // connection probably timed out, send enquireLink, and try again
- $this->refreshConnection();
- continue;
- } else {
- throw $e; // oh no... something went very wrong?
- }
- }
- }
- }
-}
diff --git a/workers/smssender.class.php b/workers/smssender.class.php
deleted file mode 100644
index 774a46f..0000000
--- a/workers/smssender.class.php
+++ /dev/null
@@ -1,205 +0,0 @@
-<?php
-namespace gateway\workers;
-
-use gateway\protocol\SmppClient;
-use gateway\protocol\GsmEncoder;
-use gateway\workers\queue\SmsRequest;
-use gateway\workers\queue\SmsResponse;
-
-/**
- * SMS sender worker.
- * Since the worker uses an IPC message queue, one can fork several of these workers.
- * Each worker maintains it's own connection to the SMSC, and will exit if the parent process also exists.
- * The implementation uses the posix, pcntl and semaphore extensions to be able to fork several workers and
- * keep everything synchronised through a IPC Message Queue.
- * The default implementation pushes ID'es from the SMSC back through the same message queue. If you do not
- * override this remember to read the ID'es from the queue, or it will fill up eventually.
- *
- * Copyright (C) 2011 OnlineCity
- * Licensed under the MIT license, which can be read at: http://www.opensource.org/licenses/mit-license.php
- * @author hd@onlinecity.dk
- */
-class SmsSender
-{
- protected $client;
- protected $transport;
- protected $queue;
- protected $options;
- protected $lastEnquireLink;
-
- /**
- * Construct a new SmsSender
- * This will prepare the transport and SMPP client.
- * It works with either a pool of hosts (arrays), or a single host.
- *
- * It will use the following default options, but you can override them by specifing an $options array.
- * array(
- * 'persistent_connections' => false,
- * 'null_terminate_octetstrings' => false,
- * 'use_msg_payload_for_csms' => true,
- * 'registered_delivery_flag' => \SMPP\REG_DELIVERY_SMSC_BOTH,
- * 'debug_handler' => null,
- * 'debug' => false,
- * 'enquire_link_timeout' => 30,
- * 'recv_timeout' => 10,
- * 'send_timeout' => 10,
- * 'username' => 'JaneDoe',
- * 'password' => 'iHeartPasswordz',
- * 'max_object_size' => 65536
- * )
- *
- * @param mixed $hostname
- * @param mixed $port
- * @param resource $queue
- * @param array $options
- */
- public function __construct($hostname, $port, $queue, $options=null)
- {
- if (!is_resource($queue)) throw new \InvalidArgumentException('Queue must be an IPC message queue resource');
-
- // Merge options
- if (is_null($options)) $options = array();
- $defaultOptions = array(
- 'persistent_connections' => false,
- 'null_terminate_octetstrings' => false,
- 'use_msg_payload_for_csms' => true,
- 'registered_delivery_flag' => 0x01,
- 'debug_handler' => null,
- 'debug' => false,
- 'enquire_link_timeout' => 30,
- 'recv_timeout' => 10000,
- 'send_timeout' => 10000,
- 'username' => 'JaneDoe',
- 'password' => 'iHeartPasswordz',
- 'max_object_size' => 65536
- );
- $this->options = $options = array_merge($defaultOptions,$options);
-
-
- // If given an array of hosts use a socket-pool, otherwise just a single socket
- if (is_array($hostname) && is_array($port)) {
- $this->transport = new \gateway\transport\TSocketPool($hostname, $port, $options['persistent_connections'], $options['debug_handler']);
- } else {
- $this->transport = new \gateway\transport\TSocket($hostname, $port, $options['persistent_connections'], $options['debug_handler']);
- }
- $this->transport->setDebug($options['debug']);
-
- $this->client = new SmppClient($this->transport);
- $this->client->debug = $options['debug'];
-
- // Set static options for SMPP client.
- SmppClient::$sms_null_terminate_octetstrings = $options['null_terminate_octetstrings'];
- SmppClient::$sms_use_msg_payload_for_csms = $options['use_msg_payload_for_csms'];
- SmppClient::$sms_registered_delivery_flag = $options['registered_delivery_flag'];
-
- // Set transport timeouts
- $this->transport->setRecvTimeout($options['recv_timeout']);
- $this->transport->setSendTimeout($options['send_timeout']);
-
- $this->queue = $queue;
- $this->lastEnquireLink = 0;
- }
-
- /**
- * Open transport connection and bind as a transmitter
- */
- protected function connect()
- {
- $this->transport->open();
- $this->client->bindTransmitter($this->options['username'],$this->options['password']);
- }
-
- /**
- * The main loop of the worker
- */
- public function run()
- {
- $this->connect();
-
- while (true) {
- // commit suicide if the parent process no longer exists
- if (posix_getppid() == 1) exit();
-
- // Make sure to send enquire link periodically to keep the link alive
- if (time()-$this->lastEnquireLink >= $this->options['enquire_link_timeout']) {
- $this->ping();
- $this->lastEnquireLink = time();
- }
-
- // Check for new messages
- $res = msg_receive($this->queue, SmsRequest::TYPE, $msgtype, $this->options['max_object_size'], $sms, true, \MSG_IPC_NOWAIT, $errorcode);
-
- if (!$res && $errorcode === \MSG_ENOMSG) { // No messages for us
- // Sleep 0.01 seconds between each iteration, to avoid wasting CPU
- usleep(10000);
- continue;
- }
- if (!$res) { // something bad happend to our queue
- exit('Message queue receive failed, parent probably exited, errorcode: '.$errorcode);
- }
- if (!$sms instanceof SmsRequest) throw new \InvalidArgumentException('Unknown message received');
-
- // Prepare message
- if ($sms->dataCoding == \SMPP\DATA_CODING_DEFAULT) {
- $encoded = GsmEncoder::utf8_to_gsm0338($sms->message);
- $encSender = GsmEncoder::utf8_to_gsm0338($sms->sender);
- } else {
- $encoded = $message;
- $encSender = $sms->sender;
- }
-
- // Contruct SMPP Address objects
- if (!ctype_digit($sms->sender)) {
- $sender = new \SMPP\Address($encSender,\SMPP\TON_ALPHANUMERIC);
- } else if ($sms->sender < 10000) {
- $sender = new \SMPP\Address($sms->sender,\SMPP\TON_NATIONAL,\SMPP\NPI_E164);
- } else {
- $sender = new \SMPP\Address($sms->sender,\SMPP\TON_INTERNATIONAL,\SMPP\NPI_E164);
- }
-
- // Send message
- $ids = array();
- try {
- $i = 0;
- foreach ($sms->recipients as $number) {
- $address = new \SMPP\Address($number,\SMPP\TON_INTERNATIONAL,\SMPP\NPI_E164);
- $ids[] = $this->client->sendSMS($sender, $address, $encoded, null, $sms->dataCoding);
-
- if ($i++ % 10 == 0) { // relay back for every 10 SMSes
- $this->relaySmsIds($sms->id, $ids);
- $ids = array();
- }
- }
- } catch (\Exception $e) {
- if (!empty($ids)) { // make sure to report any partial progress back
- $this->relaySmsIds($sms->id, $ids);
- }
- throw $e; // rethrow
- }
-
- $this->relaySmsIds($sms->id, $ids);
- }
- }
-
- /**
- * Ping the SMSC (enquire link).
- * Override to ie. also ping other servers
- */
- protected function ping()
- {
- $this->client->enquireLink();
- }
-
- /**
- * Relay the SMS ids returned by the SMSC back.
- * The default implementation uses the message queue to send them back, but you can override this method.
- *
- * @param string $requestId
- * @param array $ids
- */
- protected function relaySmsIds($requestId, $ids)
- {
- // Send the IDs back with a SmsResponse object
- msg_send($this->queue, SmsResponse::TYPE, new SmsResponse($requestId, $ids), true);
- }
-}