view lib/php_mapi.inc @ 0:b32ff9c84370

Initial version, copied from changeset bb36451050ad of MonetDB.
author Sjoerd Mullender <sjoerd@acm.org>
date Wed, 14 Sep 2016 15:12:34 +0200 (2016-09-14)
parents
children dd84b57c63f2
line wrap: on
line source
<?php
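/**
   *	Implementation of the MAPI protocol (v9).
   *
   *
   * 	Provides:
   * -  mapi_execute($conn, $query)
   * -  mapi_fetch_next($conn, &$handle)
   * -  mapi_store($data, &$handle)
   * -  fast_array_merge(&$a1, $a2)
   * -  php_parse_tuples($rows)
   * -  php_parse_row($row)
   * -  php_parse_header($header)
   * -  mapi_connect(&$socket, &$options, $merovingian_iter)
   * -  mapi_authenticate_v9($socket, $user, $passwd, $hash, $algos, $salt, $dbname, $pwhash)
   * -  mapi_read($socket)
   * -  mapi_write($socket, $msg)
   * -  set_timezone($socket)
   * -  format_command($cmd)
   * -  format_query($query, $lang)
   * -  mapi_connect_proxy(&$options)
   * -  mapi_connected($conn)
   * -  mapi_ping($conn)
   * -  mapi_get_current_conn()
   * -  mapi_open()
   * -  mapi_close($conn)
   * -  mapi_free_result($conn_id, $res_id)
   * -  mapi_generate_id()
   * -  socket_flush($socket, $data)
   * -  mapi_quote($string, $size)
   *
**/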

	// Largest payload (in bytes) that mapi_write() puts into a single block.
	define("MAX_PACKET_SIZE", 8190); // Maximum packet size

	// Sent to the server as "reply_size" by mapi_connect_proxy() right after connecting.
	define("REPLY_SIZE", "250"); // Set the initial reply size

	// Query response types: the character that follows MSG_QUERY ("&") in a reply line.
	define("Q_TABLE", "1"); // SELECT operation
	define("Q_UPDATE", "2"); // INSERT/UPDATE operations
	define("Q_CREATE", "3"); // CREATE/DROP TABLE operations
	define("Q_TRANSACTION", "4"); // TRANSACTION
	define("Q_PREPARE", "5"); // NOTE(review): presumably the reply to a PREPARE statement; mapi_store() does not handle it
	define("Q_BLOCK", "6"); // QBLOCK message

	// Line/response prefixes: the first character decides how a line is interpreted.
	define("MSG_REDIRECT", "^"); // auth redirection through merovingian
	define("MSG_QUERY", "&"); // query result header line
	define("MSG_SCHEMA_HEADER", "%"); // table header line (field names, types, ...)
	define("MSG_INFO", "!"); // info response from mserver
	define("MSG_TUPLE", "["); // tuple (row) line
	define("MSG_PROMPT", ""); // an empty line/response is treated as the server prompt

	define("PROTOCOL_v9", 9); // supported protocol(s)

	// The only query language this file knows how to format (see format_query()).
	define("LANG_SQL", "sql");

	// MEROVINGIAN and MONETDB are compared against the URI scheme of a redirect in mapi_connect().
	define("MEROVINGIAN", "merovingian");
	define("MSERVER", "mserver"); // not referenced elsewhere in this file
	define("MONETDB", "monetdb");

	define('MAX_MEROVINGIAN_ITER', 10); // Maximum number of iterations during proxied auth through merovingian

	define('PLACEHOLDER', '?'); // Used for string escaping.

	/**
	* $connection_pool contains a list of the active database connections
	* (the connection arrays built by mapi_connect_proxy()).
	*/
	$connection_pool = array();

	/**
	* Stores the last error reported by the server
	* (also used for local socket errors).
	*/
	$last_error = "";


	/**
	* Execute an SQL query and return the resulting query handle.
	*
	* The handle is an array with the keys:
	*   "conn"       - id of the connection that fired the query
	*   "header"     - parsed table header (see php_parse_header())
	*   "query"      - query information filled in by mapi_store()
	*   "record_set" - retrieved (still unparsed) rows, if any
	*   "operation"  - query type (one of the Q_* constants)
	*   "last_row"   - initialised to 0; not modified here
	*
	* @param array  $conn   connection array as built by mapi_connect_proxy();
	*                       it must be given explicitly
	* @param string $query  query text
	* @return array|bool    the handle, or FALSE when no usable connection was
	*                       given, the connection language is not SQL, the
	*                       request could not be sent or the server reported
	*                       an error (see $last_error)
	*/
	function mapi_execute($conn, $query) {
		// if multiple connections are present, require the user to specify which one to use.
		if ($conn == NULL) {
			return FALSE;
		}

		$socket = $conn["socket"];
		if ($socket == NULL) {
			return FALSE;
		}

		// format_query() only understands SQL. Bail out before any I/O instead
		// of sending an empty message and then waiting for a reply.
		if ($conn["lang"] != LANG_SQL) {
			return FALSE;
		}

		/* Fire the query and read back the response */
		if (mapi_write($socket, format_query($query, $conn["lang"])) === -1) {
			// mapi_write() signals a failed send with -1
			return FALSE;
		}
		$data = mapi_read($socket);

		$handle = array(
			"conn" => $conn["id"],
			"header" => array(),
			"query" => "",
			"record_set" => array(),
			"operation" => -1,
			"last_row" => 0
		);

		if ( ($operation = mapi_store($data, $handle)) == FALSE) {
			return FALSE;
		}

		/* The query produced a record set: fetch whatever the first reply did not contain */
		if ( ($operation == Q_TABLE || $operation == Q_BLOCK)
			&& is_array($handle["query"])
			&& isset($handle["query"]["index"], $handle["query"]["rows"])
			&& $handle["query"]["index"] < $handle["query"]["rows"] ) {
			// mapi_fetch_next() returns FALSE when the server reports an error
			if (mapi_fetch_next($conn, $handle) === FALSE) {
				return FALSE;
			}
		}

		return $handle;
	}

	/**
	* Fetch the rows of a result set that have not been retrieved yet.
	*
	* Sends "export <id> <index> <count>" commands until
	* $handle["query"]["index"] reaches $handle["query"]["rows"]; every reply
	* is stored into $handle through mapi_store().
	*
	* @param array $conn    connection array (its "socket" entry is used)
	* @param array $handle  query handle, updated in place
	* @return array|bool    the updated handle, or FALSE when the connection has
	*                       no socket or mapi_store() reports a failure
	*/
	function mapi_fetch_next($conn, &$handle){
		$socket = $conn['socket'];
		if ($socket == NULL) {
			return FALSE;
		}

		// The first window asks for at most 10 rows; each following window may be 100 rows larger.
		for ($window = 10; $handle["query"]["index"] < $handle["query"]["rows"]; $window += 100) {
			$remaining = $handle["query"]["rows"] - $handle["query"]["index"];
			$batch = min($window, $remaining);

			// export a new window
			$request = "export " . $handle["query"]["id"] . " " . $handle["query"]["index"] . " " . $batch;
			mapi_write($socket, format_command($request));
			$reply = mapi_read($socket);

			$operation = mapi_store($reply, $handle);
			if ($operation == FALSE) {
				return FALSE;
			}

			$handle["query"]["index"] += $batch;
			$handle["operation"] = $operation;
		}

		return $handle;
	}

	/**
	* Parse a server reply and store its content into the query handle.
	*
	* The reply is processed line by line; the first character of a line
	* decides how it is handled:
	*   MSG_QUERY         - query header; sets the operation and $handle["query"]
	*   MSG_SCHEMA_HEADER - collected and parsed with php_parse_header()
	*   MSG_TUPLE         - buffered as a (still unparsed) row
	*   MSG_PROMPT        - empty line; buffered rows are appended to
	*                       $handle["record_set"]
	*   MSG_INFO          - error; stored in $last_error
	* Lines with any other prefix are ignored.
	*
	* @param string $data    raw reply as returned by mapi_read()
	* @param array  $handle  query handle, updated in place
	* @return string|bool    the operation type (a Q_* constant, or "" when the
	*                        reply has no query header), FALSE on a server error
	*/
	function mapi_store($data, &$handle) {
		global $last_error;

		$operation = "";
		$header = ""; // store temporary header informations;
		$rows = ""; // stores (partially) retrieved rows

		foreach (explode("\n", $data) as $row) {
			// substr() instead of $row[0]: the line may be empty
			$type = substr($row, 0, 1);

			if ($type == MSG_QUERY) {
				// substr() again: a bare "&" line has no second character
				$subtype = substr($row, 1, 1);

				if ($subtype == Q_TABLE) {
					$operation = Q_TABLE;
					// save info about the query
					$fields = explode(" ", $row);
					$handle["query"] = array("id" => $fields[1], "rows" => $fields[2], "cols" => $fields[3], "index" => $fields[4]);
				} else if ($subtype == Q_UPDATE) {
					$operation = Q_UPDATE;
					$fields = explode(" ", $row);
					$handle["query"] = array("affected" => $fields[1]);
				} else if ($subtype == Q_CREATE) {
					$operation = Q_CREATE;
				} else if ($subtype == Q_TRANSACTION) {
					$operation = Q_TRANSACTION;
				} else if ($subtype == Q_BLOCK) {
					// the tuples that follow are added to the record set
					$operation = Q_BLOCK;
				}
			} else if ($type == MSG_SCHEMA_HEADER) {
				// process the table header
				$header = $header . $row . "\n";
			} else if ($type == MSG_TUPLE) {
				// process tuples
				$rows .= $row;
			} else if ($type == MSG_PROMPT) {
				// Only merge when tuples were buffered (otherwise a phantom
				// empty row would be appended), and clear the buffer so a second
				// empty line cannot append the same rows again.
				if ($rows !== "") {
					fast_array_merge($handle["record_set"], php_parse_tuples($rows));
					$rows = "";
				}
			} else if ($type == MSG_INFO) {
				$last_error = $row;
				return FALSE;
			}
		}

		// Reply did not end with an empty line: do not drop the buffered tuples.
		if ($rows !== "") {
			fast_array_merge($handle["record_set"], php_parse_tuples($rows));
		}

		if ($header != "") {
			$handle["header"] = php_parse_header($header);
			/* Store the number of fields returned by the dataset */
			if ($operation == Q_TABLE || $operation == Q_BLOCK) {
				$handle["query"]["fields"] = count($handle["header"]["fields"]);
			}
		}

		$handle["operation"] = $operation;

		return $operation;
	}

	/**
	* Append every value of $a2 to the end of $a1, modifying $a1 in place.
	* Keys of $a2 are not kept; each value gets the next free integer key.
	*
	* @param array $a1  destination array (passed by reference)
	* @param array $a2  values to append
	*/
	function fast_array_merge(&$a1, $a2) {
		foreach ($a2 as $unused_key => $value) {
			$a1[] = $value;
		}
	}

	/**
	* Split a buffer of concatenated tuple lines into one string per row.
	*
	* Each tuple line ends with "\t]"; that terminator is used as the row
	* separator and is not part of the returned strings. The leading "[ " and
	* the field separators are left in place (see php_parse_row()).
	*
	* @param string $rows  concatenated tuple lines
	* @return array        list of row strings; empty when there are no rows
	*/
	function php_parse_tuples($rows) {
		$rows = rtrim((string) $rows, "\t]");

		// explode() on an empty string would yield array(""), i.e. a phantom row
		if ($rows === "") {
			return array();
		}

		return explode("\t]", $rows);
	}

	/**
	* Split one row string (as produced by php_parse_tuples()) into its fields.
	*
	* - the unquoted token NULL becomes PHP NULL;
	* - a field enclosed in double quotes loses exactly that one pair of quotes
	*   and has its backslash escapes decoded (\n, \t, \", \\, octal, ...);
	*   quotes that belong to the value itself are kept;
	* - any other field keeps the previous lenient handling (slashes stripped,
	*   stray double quotes trimmed).
	*
	* @param string $row  row string, e.g. "[ 1,\t\"abc\""
	* @return array       list of field values (strings or NULL)
	*/
	function php_parse_row($row) {
		$row = ltrim($row, "[ ");
		$fields = explode(",\t", $row);

		foreach ($fields as $i => $field) {
			if ($field === "NULL") {
				$fields[$i] = NULL;
			} else if (strlen($field) >= 2 && $field[0] == '"' && substr($field, -1) == '"') {
				// remove only the enclosing pair of quotes, then decode the escapes
				$fields[$i] = stripcslashes((string) substr($field, 1, -1));
			} else {
				$fields[$i] = trim(stripslashes($field), '"');
			}
		}

		return $fields;
	}


	/**
	* Extract field names and field types from a table header.
	*
	* The header consists of "%"-prefixed lines separated by "\n". The second
	* line holds the field names and the third one the field types; on both,
	* the values are separated by ",\t" and followed by a "#" comment.
	*
	* @param string $header  header lines, each terminated by "\n"
	* @return array          array("fields" => list of names, "types" => list of types);
	*                        a list is empty when its header line is missing
	*/
	function php_parse_header($header) {
		$lines = explode("\n", $header);

		$header_array = array("fields" => array(), "types" => array());

		// line index => key in the result ($lines[0] is not used)
		$wanted = array(1 => "fields", 2 => "types");

		foreach ($wanted as $index => $key) {
			if (!isset($lines[$index])) {
				continue;
			}

			$line = ltrim($lines[$index], "% ");
			$parts = explode("#", $line);

			// rtrim(): drop the blank that separates the last value from the "#"
			// comment, otherwise the last name/type carries a trailing space
			$header_array[$key] = explode(",\t", rtrim($parts[0]));
		}

		return $header_array;
	}

	/**
	* Connect to the server and authenticate, following redirects.
	*
	* $options is an array containing:
	*   host     => server host
	*   port     => server port
	*   username =>
	*   password =>
	*   hashfunc => name of the hash function used for the challenge response
	*   database =>
	*   lang     => query language
	* On a "monetdb" redirect, host, port and database are rewritten in place.
	*
	* @param mixed    $socket            socket from mapi_open(); replaced when a
	*                                    redirect requires a new connection
	* @param array    $options           connection options (see above)
	* @param int|null $merovingian_iter  number of merovingian redirects followed
	*                                    so far; NULL (or 0) opens the connection
	* @return bool    TRUE on success, FALSE on failure (see $last_error)
	* @throws Exception when the connection cannot be established or is lost
	*/
	function mapi_connect(&$socket, &$options, $merovingian_iter=NULL) {
		global $last_error;

		$host 	= $options["host"];
		$port 	= $options["port"];

		$user 	= $options["username"];
		$passwd = $options["password"];
		$hash 	= $options["hashfunc"];
		$dbname = $options["database"];

		/* No merovingian redirect. Perform an actual connection. */
		if ($merovingian_iter == NULL) {
			if (socket_connect($socket, $host, $port) == FALSE) {
				$last_error = socket_strerror(socket_last_error());
				throw new Exception('Cannot connect to server: ' . $last_error);
			}
		}

		// get server challenge
		$challenge = mapi_read($socket);

		/*
			Colon separated, e.g.:
			    [0] => salt
			    [1] => merovingian
			    [2] => 9                (protocol version)
			    [3] => comma separated hash functions accepted for the response
			    [4] => LIT
			    [5] => hash function applied to the password
		*/
		$credentials = explode(":", $challenge);
		if (count($credentials) < 6) {
			$last_error = "Malformed server challenge: " . $challenge;
			return FALSE;
		}
		$algos = explode(',', $credentials[3]);

		if ($credentials[2] == PROTOCOL_v9) {
			// $credentials[0] contains the salt, $credentials[5] the password hash function
			if (mapi_authenticate_v9($socket, $user, $passwd, $hash, $algos, $credentials[0], $dbname, $credentials[5]) === FALSE) {
				// nothing was sent, so waiting for a reply would block
				if ($last_error == "") {
					$last_error = "Authentication request could not be sent";
				}
				return FALSE;
			}
		} else {
			$last_error = "Protocol " . $credentials[2] . " not supported. Aborting.";
			return FALSE;
		}

		$response = mapi_read($socket);

		// an empty response (prompt) means that the authentication succeeded
		if ($response != MSG_PROMPT) {
			if ($response[0] == MSG_REDIRECT) {
				/* Follow the first redirect */
				$redirects = explode("\n", $response);

				if ( ($redirects[0] == "") || (substr($redirects[0], 0, 6) != "^mapi:") ) {
					$last_error = "Invalid redirect " . $redirects[0];
					return FALSE;
				}

				// strip the leading "^mapi:" and parse the remaining URI
				$redirect_to = parse_url(substr($redirects[0], 6));
				$scheme = (is_array($redirect_to) && isset($redirect_to['scheme'])) ? $redirect_to['scheme'] : "";

				if ($scheme == MEROVINGIAN) {
					// authenticate again on the same socket
					if ($merovingian_iter < MAX_MEROVINGIAN_ITER) {
						$merovingian_iter++;
						if (mapi_connect($socket, $options, $merovingian_iter) == FALSE) {
							return FALSE;
						}
					} else {
						$last_error = "Maximum number of merovingian iterations exceeded during authentication\n";
						return FALSE;
					}
				} else if ($scheme == MONETDB) {
					if (!isset($redirect_to['host'], $redirect_to['port'], $redirect_to['path'])) {
						$last_error = "Invalid redirect " . $redirects[0];
						return FALSE;
					}

					// connect to the server named by the redirect
					$options['host']     = $redirect_to['host'];
					$options['port']     = $redirect_to['port'];
					$options['database'] = ltrim($redirect_to['path'], '/');

					socket_close($socket);
					$socket = mapi_open();
					if ($socket == FALSE) {
						return FALSE;
					}
					if (mapi_connect($socket, $options) == FALSE) {
						return FALSE;
					}
				} else {
					$last_error = "Unsupported redirect " . $redirects[0];
					return FALSE;
				}

			} else if ($response[0] == MSG_INFO) {
				$last_error = $response;
				return FALSE;
			}
		}

		return TRUE;
	}

	/**
	* Build the protocol v9 challenge response and send it to the server.
	*
	* The response has the form
	*   BIG:<user>:{<HASH>}<hashsum>:sql:<dbname>:
	* where <hashsum> = hash(hash(pwhash, passwd) . salt).
	* Hash function names sent to the server have to be uppercase.
	*
	* @param mixed  $socket  connected socket
	* @param string $user
	* @param string $passwd  plain text password
	* @param string $hash    hash function used for the response
	* @param array  $algos   hash functions accepted by the server (uppercase)
	* @param string $salt    salt taken from the server challenge
	* @param string $dbname
	* @param string $pwhash  hash function applied to the password first
	* @return bool  TRUE when the response was sent, FALSE otherwise
	*               (the reason is stored in $last_error)
	*/
	function mapi_authenticate_v9($socket, $user, $passwd, $hash, $algos, $salt, $dbname, $pwhash) {
		// without this declaration the messages below were lost in a local variable
		global $last_error;

		if ( is_array($algos) && !in_array(strtoupper($hash), $algos) ) {
			$last_error = "Hash function " . $hash . " not supported";
			return FALSE;
		}

		// hash() fails on unknown algorithm names, so check them first
		$available = hash_algos();
		if (!in_array(strtolower($hash), $available)) {
			$last_error = "Hash function " . $hash . " not supported";
			return FALSE;
		}
		if (!in_array(strtolower($pwhash), $available)) {
			$last_error = "Password hash function " . $pwhash . " not supported";
			return FALSE;
		}

		$pwhashsum = hash(strtolower($pwhash), $passwd);
		$hashsum   = hash(strtolower($hash), $pwhashsum . $salt);

		$auth_string = "BIG:" . $user . ":{" . strtoupper($hash) . "}" . $hashsum . ":sql:" . $dbname . ":";

		// mapi_write() signals a failed send with -1
		if (mapi_write($socket, $auth_string) === -1) {
			return FALSE;
		}

		return TRUE;
	}

	/**
	* Read exactly $length bytes from the socket.
	* Helper of mapi_read().
	*
	* @param mixed $socket
	* @param int   $length  number of bytes to read; 0 returns ""
	* @return string
	* @throws Exception when the socket reports an error or returns no data
	*/
	function mapi_read_exact($socket, $length) {
		global $last_error;

		$buffer = "";
		while (strlen($buffer) < $length) {
			$piece = socket_read($socket, $length - strlen($buffer));
			// FALSE: read error; "": no more data. Either way the message cannot be completed.
			if ($piece === FALSE || $piece === "") {
				$last_error = socket_strerror(socket_last_error());
				throw new Exception('Lost connection with the server: ' . $last_error);
			}
			$buffer .= $piece;
		}

		return $buffer;
	}

	/**
	* Read one complete message from the server.
	*
	* A message is a sequence of blocks. Each block starts with a two byte
	* header, least significant byte first: bit 0 is set on the last block of
	* the message, the remaining bits hold the length of the payload that
	* follows.
	*
	* @param mixed $socket  connected socket
	* @return string        the concatenated payload of all blocks ("" for an
	*                       empty message)
	* @throws Exception when the connection is lost
	*/
	function mapi_read($socket=NULL) {
		$data = "";

		do {
			$header = mapi_read_exact($socket, 2);

			$chunk_size = (ord($header[1]) << 7) | (ord($header[0]) >> 1);
			$data .= mapi_read_exact($socket, $chunk_size);

			$is_last = (ord($header[0]) & 1) == 1;
		} while (!$is_last);

		return $data;
	}


	/**
	* Encode a message and send it to the server.
	*
	* The message is split into blocks of at most MAX_PACKET_SIZE bytes. Each
	* block is preceded by a two byte little-endian header holding the payload
	* length shifted left by one bit; bit 0 is set on the last block. An empty
	* message is sent as a single empty last block.
	*
	* @param mixed  $socket  connected socket
	* @param string $msg     message to send
	* @return int   total number of bytes written (headers included), or -1 on
	*               failure (the socket error is stored in $last_error)
	*/
	function mapi_write($socket, $msg) {
		global $last_error;

		$msg = (string) $msg;
		$length = strlen($msg);

		$pos = 0;
		$sent = 0;

		do {
			$data = (string) substr($msg, $pos, MAX_PACKET_SIZE);
			$pos += strlen($data);

			// 1 on the last block, 0 when more blocks will follow
			$end = ($pos >= $length) ? 1 : 0;

			$header = pack("v", (strlen($data) << 1) | $end);

			/**
			  * socket_write() does not guarantee all data to be transmitted.
			  * socket_flush() keeps writing until the whole block is out.
			*/
			$written = socket_flush($socket, $header . $data);
			if ($written == FALSE) {
				$last_error = socket_strerror(socket_last_error());
				return -1;
			}

			$sent += $written;
		} while (!$end);

		return $sent;
	}

	/**
	* Tell the server to use PHP's current UTC offset as session time zone.
	* Only meant for SQL connections: the statement is always formatted as SQL.
	*
	* @param mixed $socket  connected socket
	* @return bool|string   TRUE when the server answers with an empty
	*                       response; otherwise the response itself, which is
	*                       also stored in $last_error
	*/
	function set_timezone($socket=NULL) {
		global $last_error;

		// date('P'): difference to Greenwich time (GMT) with colon between hours and minutes
		$statement = sprintf("SET TIME ZONE INTERVAL '%s' HOUR TO MINUTE", date('P'));

		mapi_write($socket, format_query($statement, LANG_SQL));
		$reply = mapi_read($socket);

		if ($reply != "") {
			$last_error = $reply;
			return $reply;
		}

		return TRUE;
	}

	/**
	* Turn a control command (e.g. "reply_size 250") into its wire format by
	* prefixing it with "X".
	*
	* @param string $cmd
	* @return string
	*/
	function format_command($cmd) {
		$prefix = "X";

		return $prefix . $cmd;
	}


	/**
	* Open (or reuse) a connection and register it in $connection_pool.
	*
	* When $options['persistent'] is set, a pooled persistent connection with
	* the same database, username, password, host and port is returned instead
	* of opening a new one.
	* TODO: move this check to mapi_connect() to deal with options arrays rewritten by redirects.
	*
	* After a successful connect the session time zone is set (only when
	* date.timezone is configured) and the reply size is sent to the server.
	*
	* @param array $options  connection options, see mapi_connect(); the
	*                        optional 'persistent' entry defaults to FALSE
	* @return array|bool     the connection array, or FALSE on failure
	* @throws Exception when the connection cannot be established or is lost
	*/
	function mapi_connect_proxy(&$options) {
		global $last_error;
		global $connection_pool;

		$merovingian_iter = 0;

		// 'persistent' is optional
		$persistent = isset($options['persistent']) ? $options['persistent'] : FALSE;

		if ($persistent == TRUE && is_array($connection_pool)) {
			foreach ($connection_pool as $conn) {
				if ( ($conn["persistent"] == TRUE)
					&& ($conn["dbname"] == $options['database'])
					&& ($conn['username'] == $options['username'])
					&& ($conn["password"] == hash($options['hashfunc'], $options['password']))
					&& ($conn['host'] == $options['host'])
					&& ($conn['port'] == $options['port']) ) {
					return $conn;
				}
			}
		}

		$socket	= mapi_open();
		if ($socket == FALSE) {
			// mapi_open() could not create a socket
			return FALSE;
		}

		if (mapi_connect($socket, $options, $merovingian_iter) != TRUE) {
			// a redirect may have replaced the socket; only close a valid one
			if ($socket != FALSE) {
				socket_close($socket);
			}
			return FALSE;
		}

		/* Connected */

		if ($options['lang'] != LANG_SQL) {
			// only SQL connections are supported
			socket_close($socket);
			return FALSE;
		}

		/*
			PHP requires a timezone to be specified in the configuration environment; if not specified
			either in php.ini or via date_default_timezone_set(), a default is set 'Europe/Berlin'.

			PHP complains in case we query the OS to get system's timezone (E_STRICT error).

			To avoid unexpected behaviours and warnings at execution time we set a timezone on mserver
			only if PHP interpreter is aware of it (php.ini contains a date.timezone entry).
		*/
		if (ini_get("date.timezone")) {
			set_timezone($socket); // set the timezone according to the system's configuration
		}

		// export the reply size (max number of tuples returned at query executions)
		mapi_write($socket, format_command("reply_size " . REPLY_SIZE));
		$response = mapi_read($socket);
		if (strlen($response) > 0) {
			// something went wrong
			$last_error = $response;
			socket_close($socket);
			return FALSE;
		}

		// Create a new connection instance and insert an entry in the connections table
		$connection = array(
			"id" => mapi_generate_id(),
			"socket" => $socket,
			"host" => $options["host"],
			"port" => $options["port"],
			"dbname" => $options['database'],
			"username" => $options["username"],
			// only the hash of the password is kept
			"password" => hash($options['hashfunc'], $options["password"]),
			"transactions" => array(),
			"persistent" => $persistent,
			"lang" => $options['lang']
		);

		$connection_pool[] = $connection;

		return $connection;
	}

	/**
	* Check whether a connection is still usable.
	*
	* @param array|null $conn  connection array
	* @return bool  FALSE when no connection is given, otherwise the result of
	*               mapi_ping()
	*/
	function mapi_connected($conn=NULL) {
		return ($conn == NULL) ? FALSE : mapi_ping($conn);
	}

	/**
	* Probe a connection by running "select true;" on it.
	*
	* @param array|null $conn  connection array
	* @return bool  TRUE when the query succeeded and its result (if it has an
	*               id) could be released again; FALSE otherwise, including
	*               when no connection is given or its language is not SQL
	*/
	function mapi_ping($conn=NULL) {
		if ($conn == NULL) {
			return FALSE;
		}

		// there is a probe query for SQL only; for any other language the
		// result variable used to be undefined
		if ($conn['lang'] != LANG_SQL) {
			return FALSE;
		}

		$res = mapi_execute($conn, "select true;");
		if ($res == NULL) {
			return FALSE;
		}

		// no result id: nothing to release on the server
		if (!is_array($res['query']) || !array_key_exists('id', $res['query'])) {
			return TRUE;
		}

		return mapi_free_result($conn['id'], $res['query']['id']) ? TRUE : FALSE;
	}

	/**
	* Return the current connection, i.e. the last entry of $connection_pool.
	*
	* @return array|bool  the connection array, or FALSE when the pool is empty
	*/
	function mapi_get_current_conn() {
		global $connection_pool;

		$open_connections = count($connection_pool);
		if ($open_connections <= 0) {
			return FALSE;
		}

		return end($connection_pool);
	}

	/**
	* Create a new (not yet connected) TCP socket.
	*
	* @return mixed  the socket, or FALSE on failure (the socket error is
	*                stored in $last_error)
	*/
	function mapi_open() {
		// without this declaration the error message was lost in a local variable
		global $last_error;

		$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
		if ($socket === FALSE) {
			$last_error = socket_strerror(socket_last_error());
			return FALSE;
		}

		return $socket;
	}

	/**
	* Close a connection and remove it from $connection_pool.
	*
	* Without an argument, the connection is closed only when the pool holds
	* exactly one connection and that connection is not persistent.
	*
	* @param array|null $conn  connection to close
	* @return bool  TRUE when a connection was closed, FALSE otherwise
	*/
	function mapi_close($conn=NULL) {
		global $connection_pool;

		/* TODO: start closing connection at the end of the array! */
		if ($conn == NULL) {
			if (is_array($connection_pool) && count($connection_pool) == 1) {
				// reset() instead of [0]: the key of the only entry may not be 0
				$only = reset($connection_pool);
				if ($only["persistent"] == FALSE) {
					socket_close($only["socket"]);
					// assign instead of unset(): unset() would only drop the
					// local alias and leave the global pool untouched
					$connection_pool = array();

					return TRUE;
				}
			}

			return FALSE;
		}

		socket_close($conn["socket"]);

		/* remove the $conn from the pool */
		$closed_id = (string) $conn["id"];
		$remaining = array();
		if (is_array($connection_pool)) {
			$remaining = array_filter($connection_pool, function ($entry) use ($closed_id) {
				// strict comparison: loosely compared, two hex ids that look
				// like numbers (e.g. "0e12...") could be considered equal
				return ((string) $entry['id'] !== $closed_id);
			});
		}
		// array_filter() keeps the keys; reindex so the pool stays a plain list
		$connection_pool = array_values($remaining);

		return TRUE;
	}

	/**
	* Ask the server to release a result set ("close <id>" command).
	*
	* The server's answer is stored in $last_error, also when it is empty.
	*
	* @param string $conn_id  id of a connection in $connection_pool
	* @param mixed  $res_id   id of the result set
	* @return bool  TRUE when the server answers with an empty response, FALSE
	*               when the connection is unknown or the server answers with
	*               anything else
	*/
	function mapi_free_result($conn_id, $res_id) {
		global $connection_pool;
		global $last_error;

		/* Fetch the connection from the pool */
		$match = NULL;
		foreach ($connection_pool as $candidate) {
			if ($candidate["id"] == $conn_id) {
				$match = $candidate;
				break;
			}
		}

		if ($match == NULL) {
			return FALSE;
		}

		$socket = $match["socket"];

		/* Send a close command */
		mapi_write($socket, format_command("close " . $res_id));
		$last_error = mapi_read($socket);

		return ($last_error == "");
	}


	/**
	* Generate an id that is not used by any connection in $connection_pool.
	*
	* The id is a SHA-1 hex digest (40 characters) of a value that changes on
	* every call, so two connections opened within the same second get
	* different ids without waiting for the clock to advance.
	*
	* @return string
	*/
	function mapi_generate_id(){
		global $connection_pool;

		// ids already in use, as array keys
		$taken = array();
		if (is_array($connection_pool)) {
			foreach ($connection_pool as $conn) {
				$taken[(string) $conn["id"]] = TRUE;
			}
		}

		do {
			$id = hash("sha1", uniqid((string) mt_rand(), TRUE));
		} while (isset($taken[$id]));

		return $id;
	}


	/**
	* Turn a query into its wire format.
	*
	* For SQL the query is prefixed with "s" and followed by "\n;".
	*
	* @param string $query
	* @param string $lang   query language
	* @return string|bool   the formatted query, or FALSE when the language
	*                       is not LANG_SQL
	*/
	function format_query($query, $lang) {
		return ($lang == LANG_SQL) ? ("s" . $query . "\n;") : FALSE;
	}

	/**
	* Write data through a socket; socket_write() may transmit only a part of
	* the buffer, so keep writing until everything has been sent.
	*
	* @param mixed  $socket  connected socket
	* @param string $data    bytes to send
	* @return int|bool  number of bytes written, or FALSE when no socket is
	*                   given or a write fails
	*/
	function socket_flush($socket, $data) {
		if ($socket == NULL) {
			return FALSE;
		}

		$bytes = strlen($data);
		$sent = 0;

		while ($sent < $bytes) {
			$written = socket_write($socket, substr($data, $sent), $bytes - $sent);

			// Check the result of this very call: adding FALSE to the running
			// total would leave it unchanged and the loop would never end.
			// A write of 0 bytes is treated as a failure for the same reason.
			if ($written === FALSE || $written === 0) {
				return FALSE;
			}

			$sent += $written;
		}

		return $sent;
	}

	/**
	* Escape a string for use in a query.
	*
	* The following characters are preceded by a backslash: newline (as \n),
	* tab (as \t), PLACEHOLDER, backslash, double quote and the NUL byte
	* (as \0). All other characters are copied unchanged.
	*
	* NOTE(review): single quotes are copied unchanged, as before. Confirm that
	* callers escape them in another way before the result is put inside a
	* single-quoted SQL literal.
	*
	* @param string $string  string to escape
	* @param int    $size    number of leading bytes of $string to process; a
	*                        negative value (or one larger than the string)
	*                        processes the whole string
	* @return string         the escaped string
	*/
	function mapi_quote($string, $size) {
		$string = (string) $string;
		$length = strlen($string);

		if ($size < 0 || $size > $length) {
			$size = $length;
		}

		// The keys are the real characters: double-quoted "\n", "\t" and "\0"
		// are a single byte each, unlike their single-quoted spellings.
		$escapes = array(
			"\n"        => '\\n',
			"\t"        => '\\t',
			PLACEHOLDER => '\\' . PLACEHOLDER,
			'\\'        => '\\\\',
			'"'         => '\\"',
			"\0"        => '\\0'
		);

		// strtr() scans the input once, so the backslashes it inserts are not escaped again
		return strtr((string) substr($string, 0, $size), $escapes);
	}

?>