* @license http://arc.semsol.org/license * @homepage * @package ARC2 */ ARC2::inc('Class'); class ARC2_Store extends ARC2_Class { function __construct($a, &$caller) { parent::__construct($a, $caller); } function __init() {/* db_con */ parent::__init(); $this->table_lock = 0; $this->triggers = $this->v('store_triggers', array(), $this->a); $this->queue_queries = $this->v('store_queue_queries', 0, $this->a); $this->is_win = (strtolower(substr(PHP_OS, 0, 3)) == 'win') ? true : false; $this->max_split_tables = $this->v('store_max_split_tables', 10, $this->a); $this->split_predicates = $this->v('store_split_predicates', array(), $this->a); } /* */ function getName() { return $this->v('store_name', 'arc', $this->a); } function getTablePrefix() { if (!isset($this->tbl_prefix)) { $r = $this->v('db_table_prefix', '', $this->a); $r .= $r ? '_' : ''; $r .= $this->getName() . '_'; $this->tbl_prefix = $r; } return $this->tbl_prefix;; } /* */ function createDBCon() { foreach (array('db_host' => 'localhost', 'db_user' => '', 'db_pwd' => '', 'db_name' => '') as $k => $v) { $this->a[$k] = $this->v($k, $v, $this->a); } if (!$db_con = mysql_connect($this->a['db_host'], $this->a['db_user'], $this->a['db_pwd'], true)) { return $this->addError(mysql_error()); } $this->a['db_con'] = $db_con; if (!mysql_select_db($this->a['db_name'], $db_con)) { $fixed = 0; /* try to create it */ if ($this->a['db_name']) { $this->queryDB(" CREATE DATABASE IF NOT EXISTS " . $this->a['db_name'] . " DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci ", $db_con, 1 ); if (mysql_select_db($this->a['db_name'], $db_con)) { $this->queryDB("SET NAMES 'utf8'", $db_con); $fixed = 1; } } if (!$fixed) { return $this->addError(mysql_error($db_con)); } } if (preg_match('/^utf8/', $this->getCollation())) { $this->queryDB("SET NAMES 'utf8'", $db_con); } // This is RDF, we may need many JOINs... $this->queryDB("SET SESSION SQL_BIG_SELECTS=1", $db_con); return true; } function getDBCon($force = 0) { if ($force || !isset($this->a['db_con'])) { if (!$this->createDBCon()) { return false; } } if (!$force && !@mysql_thread_id($this->a['db_con'])) return $this->getDBCon(1); return $this->a['db_con']; } function closeDBCon() { if ($this->v('db_con', false, $this->a)) { @mysql_close($this->a['db_con']); } unset($this->a['db_con']); } function getDBVersion() { if (!$this->v('db_version')) { $this->db_version = preg_match("/^([0-9]+)\.([0-9]+)\.([0-9]+)/", mysql_get_server_info($this->getDBCon()), $m) ? sprintf("%02d-%02d-%02d", $m[1], $m[2], $m[3]) : '00-00-00'; } return $this->db_version; } /* */ function getCollation() { $rs = $this->queryDB('SHOW TABLE STATUS LIKE "' . $this->getTablePrefix(). 'setting"', $this->getDBCon()); return ($rs && ($row = mysql_fetch_array($rs)) && isset($row['Collation'])) ? $row['Collation'] : ''; } function getColumnType() { if (!$this->v('column_type')) { $tbl = $this->getTablePrefix() . 'g2t'; $rs = $this->queryDB('SHOW COLUMNS FROM ' . $tbl . ' LIKE "t"', $this->getDBCon()); $row = $rs ? mysql_fetch_array($rs) : array('Type' => 'mediumint'); $this->column_type = preg_match('/mediumint/', $row['Type']) ? 'mediumint' : 'int'; } return $this->column_type; } /* */ function hasHashColumn($tbl) { $var_name = 'has_hash_column_' . $tbl; if (!isset($this->$var_name)) { $tbl = $this->getTablePrefix() . $tbl; $rs = $this->queryDB('SHOW COLUMNS FROM ' . $tbl . ' LIKE "val_hash"', $this->getDBCon()); $this->$var_name = ($rs && mysql_fetch_array($rs)); } return $this->$var_name; } /* */ function hasFulltextIndex() { if (!isset($this->has_fulltext_index)) { $this->has_fulltext_index = 0; $tbl = $this->getTablePrefix() . 'o2val'; $rs = $this->queryDB('SHOW INDEX FROM ' . $tbl, $this->getDBCon()); while ($row = mysql_fetch_array($rs)) { if ($row['Column_name'] != 'val') continue; if ($row['Index_type'] != 'FULLTEXT') continue; $this->has_fulltext_index = 1; break; } } return $this->has_fulltext_index; } function enableFulltextSearch() { if ($this->hasFulltextIndex()) return 1; $tbl = $this->getTablePrefix() . 'o2val'; $this->queryDB('CREATE FULLTEXT INDEX vft ON ' . $tbl . '(val(128))', $this->getDBCon(), 1); } function disableFulltextSearch() { if (!$this->hasFulltextIndex()) return 1; $tbl = $this->getTablePrefix() . 'o2val'; $this->queryDB('DROP INDEX vft ON ' . $tbl, $this->getDBCon()); } /* */ function countDBProcesses() { return ($rs = $this->queryDB('SHOW PROCESSLIST', $this->getDBCon())) ? mysql_num_rows($rs) : 0; } function killDBProcesses($needle = '', $runtime = 30) { $dbcon = $this->getDBCon(); /* make sure needle is sql */ if (preg_match('/\?.+ WHERE/i', $needle, $m)) { $needle = $this->query($needle, 'sql'); } $rs = $this->queryDB('SHOW FULL PROCESSLIST', $dbcon); $ref_tbl = $this->getTablePrefix() . 'triple'; while ($row = mysql_fetch_array($rs)) { if ($row['Time'] < $runtime) continue; if (!preg_match('/^\s*(INSERT|SELECT) /s', $row['Info'])) continue; /* only basic queries */ if (!strpos($row['Info'], $ref_tbl . ' ')) continue; /* only from this store */ $kill = 0; if ($needle && (strpos($row['Info'], $needle) !== false)) $kill = 1; if (!$needle) $kill = 1; if (!$kill) continue; $this->queryDB('KILL ' . $row['Id'], $dbcon); } } /* */ function getTables() { return array('triple', 'g2t', 'id2val', 's2val', 'o2val', 'setting'); } /* */ function isSetUp() { if (($con = $this->getDBCon())) { $tbl = $this->getTablePrefix() . 'setting'; return $this->queryDB("SELECT 1 FROM " . $tbl . " LIMIT 0", $con) ? 1 : 0; } } function setUp($force = 0) { if (($force || !$this->isSetUp()) && ($con = $this->getDBCon())) { if ($this->getDBVersion() < '04-00-04') { /* UPDATE + JOINs */ return $this->addError('MySQL version not supported. ARC requires version 4.0.4 or higher.'); } ARC2::inc('StoreTableManager'); $mgr = new ARC2_StoreTableManager($this->a, $this); $mgr->createTables(); } } function extendColumns() { ARC2::inc('StoreTableManager'); $mgr = new ARC2_StoreTableManager($this->a, $this); $mgr->extendColumns(); $this->column_type = 'int'; } function splitTables() { ARC2::inc('StoreTableManager'); $mgr = new ARC2_StoreTableManager($this->a, $this); $mgr->splitTables(); } /* */ function hasSetting($k) { $tbl = $this->getTablePrefix() . 'setting'; $sql = "SELECT val FROM " . $tbl . " WHERE k = '" .md5($k). "'"; $rs = $this->queryDB($sql, $this->getDBCon()); return ($rs && ($row = mysql_fetch_array($rs))) ? 1 : 0; } function getSetting($k, $default = 0) { $tbl = $this->getTablePrefix() . 'setting'; $sql = "SELECT val FROM " . $tbl . " WHERE k = '" .md5($k). "'"; $rs = $this->queryDB($sql, $this->getDBCon()); if ($rs && ($row = mysql_fetch_array($rs))) { return unserialize($row['val']); } return $default; } function setSetting($k, $v) { $con = $this->getDBCon(); $tbl = $this->getTablePrefix() . 'setting'; if ($this->hasSetting($k)) { $sql = "UPDATE " .$tbl . " SET val = '" . mysql_real_escape_string(serialize($v), $con) . "' WHERE k = '" . md5($k) . "'"; } else { $sql = "INSERT INTO " . $tbl . " (k, val) VALUES ('" . md5($k) . "', '" . mysql_real_escape_string(serialize($v), $con) . "')"; } return $this->queryDB($sql, $con); } function removeSetting($k) { $tbl = $this->getTablePrefix() . 'setting'; return $this->queryDB("DELETE FROM " . $tbl . " WHERE k = '" . md5($k) . "'", $this->getDBCon()); } function getQueueTicket() { if (!$this->queue_queries) return 1; $t = 'ticket_' . substr(md5(uniqid(rand())), 0, 10); $con = $this->getDBCon(); /* lock */ $rs = $this->queryDB('LOCK TABLES ' . $this->getTablePrefix() . 'setting WRITE', $con); /* queue */ $queue = $this->getSetting('query_queue', array()); $queue[] = $t; $this->setSetting('query_queue', $queue); $this->queryDB('UNLOCK TABLES', $con); /* loop */ $lc = 0; $queue = $this->getSetting('query_queue', array()); while ($queue && ($queue[0] != $t) && ($lc < 30)) { if ($this->is_win) { sleep(1); $lc++; } else { usleep(100000); $lc += 0.1; } $queue = $this->getSetting('query_queue', array()); } return ($lc < 30) ? $t : 0; } function removeQueueTicket($t) { if (!$this->queue_queries) return 1; $con = $this->getDBCon(); /* lock */ $this->queryDB('LOCK TABLES ' . $this->getTablePrefix() . 'setting WRITE', $con); /* queue */ $vals = $this->getSetting('query_queue', array()); $pos = array_search($t, $vals); $queue = ($pos < (count($vals) - 1)) ? array_slice($vals, $pos + 1) : array(); $this->setSetting('query_queue', $queue); $this->queryDB('UNLOCK TABLES', $con); } /* */ function reset($keep_settings = 0) { $con = $this->getDBCon(); $tbls = $this->getTables(); $prefix = $this->getTablePrefix(); /* remove split tables */ $ps = $this->getSetting('split_predicates', array()); foreach ($ps as $p) { $tbl = 'triple_' . abs(crc32($p)); $this->queryDB('DROP TABLE ' . $prefix . $tbl, $con); } $this->removeSetting('split_predicates'); /* truncate tables */ foreach ($tbls as $tbl) { if ($keep_settings && ($tbl == 'setting')) { continue; } $this->queryDB('TRUNCATE ' . $prefix . $tbl, $con); } } function drop() { $con = $this->getDBCon(); $tbls = $this->getTables(); $prefix = $this->getTablePrefix(); foreach ($tbls as $tbl) { $this->queryDB('DROP TABLE ' . $prefix . $tbl, $con); } } function insert($doc, $g, $keep_bnode_ids = 0) { $doc = is_array($doc) ? $this->toTurtle($doc) : $doc; $infos = array('query' => array('url' => $g, 'target_graph' => $g)); ARC2::inc('StoreLoadQueryHandler'); $h = new ARC2_StoreLoadQueryHandler($this->a, $this); $r = $h->runQuery($infos, $doc, $keep_bnode_ids); $this->processTriggers('insert', $infos); return $r; } function delete($doc, $g) { if (!$doc) { $infos = array('query' => array('target_graphs' => array($g))); ARC2::inc('StoreDeleteQueryHandler'); $h = new ARC2_StoreDeleteQueryHandler($this->a, $this); $r = $h->runQuery($infos); $this->processTriggers('delete', $infos); return $r; } } function replace($doc, $g, $doc_2) { return array($this->delete($doc, $g), $this->insert($doc_2, $g)); } function dump() { ARC2::inc('StoreDumper'); $d = new ARC2_StoreDumper($this->a, $this); $d->dumpSPOG(); } function createBackup($path, $q = '') { ARC2::inc('StoreDumper'); $d = new ARC2_StoreDumper($this->a, $this); $d->saveSPOG($path, $q); } function renameTo($name) { $con = $this->getDBCon(); $tbls = $this->getTables(); $old_prefix = $this->getTablePrefix(); $new_prefix = $this->v('db_table_prefix', '', $this->a); $new_prefix .= $new_prefix ? '_' : ''; $new_prefix .= $name . '_'; foreach ($tbls as $tbl) { $rs = $this->queryDB('RENAME TABLE ' . $old_prefix . $tbl .' TO ' . $new_prefix . $tbl, $con); $err = mysql_error($con); if ($err) { return $this->addError($err); } } $this->a['store_name'] = $name; unset($this->tbl_prefix); } function replicateTo($name) { $conf = array_merge($this->a, array('store_name' => $name)); $new_store = ARC2::getStore($conf); $new_store->setUp(); $new_store->reset(); $con = $this->getDBCon(); $tbls = $this->getTables(); $old_prefix = $this->getTablePrefix(); $new_prefix = $new_store->getTablePrefix(); foreach ($tbls as $tbl) { $rs = $this->queryDB('INSERT IGNORE INTO ' . $new_prefix . $tbl .' SELECT * FROM ' . $old_prefix . $tbl, $con); $err = mysql_error($con); if ($err) { return $this->addError($err); } } return $new_store->query('SELECT COUNT(*) AS t_count WHERE { ?s ?p ?o}', 'row'); } /* */ function query($q, $result_format = '', $src = '', $keep_bnode_ids = 0, $log_query = 0) { if ($log_query) $this->logQuery($q); $con = $this->getDBCon(); if (preg_match('/^dump/i', $q)) { $infos = array('query' => array('type' => 'dump')); } else { ARC2::inc('SPARQLPlusParser'); $p = new ARC2_SPARQLPlusParser($this->a, $this); $p->parse($q, $src); $infos = $p->getQueryInfos(); } if ($result_format == 'infos') return $infos; $infos['result_format'] = $result_format; if (!isset($p) || !$p->getErrors()) { $qt = $infos['query']['type']; if (!in_array($qt, array('select', 'ask', 'describe', 'construct', 'load', 'insert', 'delete', 'dump'))) { return $this->addError('Unsupported query type "'.$qt.'"'); } $t1 = ARC2::mtime(); $r = array('query_type' => $qt, 'result' => $this->runQuery($infos, $qt, $keep_bnode_ids, $q)); $t2 = ARC2::mtime(); $r['query_time'] = $t2 - $t1; /* query result */ if ($result_format == 'raw') { return $r['result']; } if ($result_format == 'rows') { return $r['result']['rows'] ? $r['result']['rows'] : array(); } if ($result_format == 'row') { return $r['result']['rows'] ? $r['result']['rows'][0] : array(); } return $r; } return 0; } function runQuery($infos, $type, $keep_bnode_ids = 0, $q = '') { ARC2::inc('Store' . ucfirst($type) . 'QueryHandler'); $cls = 'ARC2_Store' . ucfirst($type) . 'QueryHandler'; $h = new $cls($this->a, $this); $ticket = 1; $r = array(); if ($q && ($type == 'select')) $ticket = $this->getQueueTicket($q); if ($ticket) { if ($type == 'load') {/* the LoadQH supports raw data as 2nd parameter */ $r = $h->runQuery($infos, '', $keep_bnode_ids); } else { $r = $h->runQuery($infos, $keep_bnode_ids); } } if ($q && ($type == 'select')) $this->removeQueueTicket($ticket); $trigger_r = $this->processTriggers($type, $infos); return $r; } function processTriggers($type, $infos) { $r = array(); $trigger_defs = $this->triggers; $this->triggers = array(); $triggers = $this->v($type, array(), $trigger_defs); if ($triggers) { $r['trigger_results'] = array(); $triggers = is_array($triggers) ? $triggers : array($triggers); $trigger_inc_path = $this->v('store_triggers_path', '', $this->a); foreach ($triggers as $trigger) { $trigger .= !preg_match('/Trigger$/', $trigger) ? 'Trigger' : ''; if (ARC2::inc(ucfirst($trigger), $trigger_inc_path)) { $cls = 'ARC2_' . ucfirst($trigger); $config = array_merge($this->a, array('query_infos' => $infos)); $trigger_obj = new $cls($config, $this); if (method_exists($trigger_obj, 'go')) { $r['trigger_results'][] = $trigger_obj->go(); } } } } $this->triggers = $trigger_defs; return $r; } /* */ function getValueHash($val) { return abs(crc32($val)); } function getTermID($val, $term = '') { /* mem cache */ if (!isset($this->term_id_cache) || (count(array_keys($this->term_id_cache)) > 100)) { $this->term_id_cache = array(); } if (!isset($this->term_id_cache[$term])) { $this->term_id_cache[$term] = array(); } $tbl = preg_match('/^(s|o)$/', $term) ? $term . '2val' : 'id2val'; /* cached? */ if ((strlen($val) < 100) && isset($this->term_id_cache[$term][$val])) { return $this->term_id_cache[$term][$val]; } $con = $this->getDBCon(); $r = 0; /* via hash */ if (preg_match('/^(s2val|o2val)$/', $tbl) && $this->hasHashColumn($tbl)) { $sql = "SELECT id, val FROM " . $this->getTablePrefix() . $tbl . " WHERE val_hash = '" . $this->getValueHash($val) . "'"; if (($rs = $this->queryDB($sql, $con)) && mysql_num_rows($rs)) { while ($row = mysql_fetch_array($rs)) { if ($row['val'] == $val) { $r = $row['id']; break; } } } } /* exact match */ else { $sql = "SELECT id FROM " . $this->getTablePrefix() . $tbl . " WHERE val = BINARY '" . mysql_real_escape_string($val, $con) . "' LIMIT 1"; if (($rs = $this->queryDB($sql, $con)) && mysql_num_rows($rs) && ($row = mysql_fetch_array($rs))) { $r = $row['id']; } } if ($r && (strlen($val) < 100)) { $this->term_id_cache[$term][$val] = $r; } return $r; } function getIDValue($id, $term = '') { $tbl = preg_match('/^(s|o)$/', $term) ? $term . '2val' : 'id2val'; $con = $this->getDBCon(); $sql = "SELECT val FROM " . $this->getTablePrefix() . $tbl . " WHERE id = " . mysql_real_escape_string($id, $con) . " LIMIT 1"; if (($rs = $this->queryDB($sql, $con)) && mysql_num_rows($rs) && ($row = mysql_fetch_array($rs))) { return $row['val']; } return 0; } /* */ function getLock($t_out = 10, $t_out_init = '') { if (!$t_out_init) $t_out_init = $t_out; $con = $this->getDBCon(); $l_name = $this->a['db_name'] . '.' . $this->getTablePrefix() . '.write_lock'; $rs = $this->queryDB('SELECT IS_FREE_LOCK("' . $l_name. '") AS success', $con); if ($rs) { $row = mysql_fetch_array($rs); if (!$row['success']) { if ($t_out) { sleep(1); return $this->getLock($t_out - 1, $t_out_init); } } else { $rs = $this->queryDB('SELECT GET_LOCK("' . $l_name. '", ' . $t_out_init. ') AS success', $con); if ($rs) { $row = mysql_fetch_array($rs); return $row['success']; } } } return 0; } function releaseLock() { $con = $this->getDBCon(); return $this->queryDB('DO RELEASE_LOCK("' . $this->a['db_name'] . '.' . $this->getTablePrefix() . '.write_lock")', $con); } /* */ function processTables($level = 2, $operation = 'optimize') {/* 1: triple + g2t, 2: triple + *2val, 3: all tables */ $con = $this->getDBCon(); $pre = $this->getTablePrefix(); $tbls = $this->getTables(); $sql = ''; foreach ($tbls as $tbl) { if (($level < 3) && preg_match('/(backup|setting)$/', $tbl)) continue; if (($level < 2) && preg_match('/(val)$/', $tbl)) continue; $sql .= $sql ? ', ' : strtoupper($operation) . ' TABLE '; $sql .= $pre . $tbl; } $this->queryDB($sql, $con); $err = mysql_error($con); if ($err) { $this->addError($err . ' in ' . $sql); } } function optimizeTables($level = 2) { if ($this->v('ignore_optimization')) return 1; return $this->processTables($level, 'optimize'); } function checkTables($level = 2) { return $this->processTables($level, 'check'); } function repairTables($level = 2) { return $this->processTables($level, 'repair'); } /* */ function changeNamespaceURI($old_uri, $new_uri) { ARC2::inc('StoreHelper'); $c = new ARC2_StoreHelper($this->a, $this); return $c->changeNamespaceURI($old_uri, $new_uri); } /* */ function getResourceLabel($res, $unnamed_label = 'An unnamed resource') { if (!isset($this->resource_labels)) $this->resource_labels = array(); if (isset($this->resource_labels[$res])) return $this->resource_labels[$res]; if (!preg_match('/^[a-z0-9\_]+\:[^\s]+$/si', $res)) return $res;/* literal */ $ps = $this->getLabelProps(); if ($this->getSetting('store_label_properties', '-') != md5(serialize($ps))) { $this->inferLabelProps($ps); } //$sub_q .= $sub_q ? ' || ' : ''; //$sub_q .= 'REGEX(str(?p), "(last_name|name|fn|title|label)$", "i")'; $q = 'SELECT ?label WHERE { <' . $res . '> ?p ?label . ?p a } LIMIT 3'; $r = ''; $rows = $this->query($q, 'rows'); foreach ($rows as $row) { $r = strlen($row['label']) > strlen($r) ? $row['label'] : $r; } if (!$r && preg_match('/^\_\:/', $res)) { return $unnamed_label; } $r = $r ? $r : preg_replace("/^(.*[\/\#])([^\/\#]+)$/", '\\2', str_replace('#self', '', $res)); $r = str_replace('_', ' ', $r); $r = preg_replace('/([a-z])([A-Z])/e', '"\\1 " . strtolower("\\2")', $r); $this->resource_labels[$res] = $r; return $r; } function getLabelProps() { return array_merge( $this->v('rdf_label_properties' , array(), $this->a), array( 'http://www.w3.org/2000/01/rdf-schema#label', 'http://xmlns.com/foaf/0.1/name', 'http://purl.org/dc/elements/1.1/title', 'http://purl.org/rss/1.0/title', 'http://www.w3.org/2004/02/skos/core#prefLabel', 'http://xmlns.com/foaf/0.1/nick', ) ); } function inferLabelProps($ps) { $this->query('DELETE FROM '); $sub_q = ''; foreach ($ps as $p) { $sub_q .= ' <' . $p . '> a . '; } $this->query('INSERT INTO { ' . $sub_q. ' }'); $this->setSetting('store_label_properties', md5(serialize($ps))); } /* */ function getResourcePredicates($res) { $r = array(); $rows = $this->query('SELECT DISTINCT ?p WHERE { <' . $res . '> ?p ?o . }', 'rows'); foreach ($rows as $row) { $r[$row['p']] = array(); } return $r; } function getDomains($p) { $r = array(); foreach($this->query('SELECT DISTINCT ?type WHERE {?s <' . $p . '> ?o ; a ?type . }', 'rows') as $row) { $r[] = $row['type']; } return $r; } function getPredicateRange($p) { $row = $this->query('SELECT ?val WHERE {<' . $p . '> rdfs:range ?val . } LIMIT 1', 'row'); return $row ? $row['val'] : ''; } /* */ function logQuery($q) { $fp = @fopen("arc_query_log.txt", "a"); @fwrite($fp, date('Y-m-d\TH:i:s\Z', time()) . ' : ' . $q . '' . "\n\n"); @fclose($fp); } /* */ }