graphdb_linux.go 12 KB


  1. package graphdb
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "path"
  6. "strings"
  7. "sync"
  8. )
  9. const (
  10. createEntityTable = `
  11. CREATE TABLE IF NOT EXISTS entity (
  12. id text NOT NULL PRIMARY KEY
  13. );`
  14. createEdgeTable = `
  15. CREATE TABLE IF NOT EXISTS edge (
  16. "entity_id" text NOT NULL,
  17. "parent_id" text NULL,
  18. "name" text NOT NULL,
  19. CONSTRAINT "parent_fk" FOREIGN KEY ("parent_id") REFERENCES "entity" ("id"),
  20. CONSTRAINT "entity_fk" FOREIGN KEY ("entity_id") REFERENCES "entity" ("id")
  21. );
  22. `
  23. createEdgeIndices = `
  24. CREATE UNIQUE INDEX IF NOT EXISTS "name_parent_ix" ON "edge" (parent_id, name);
  25. `
  26. )
  27. // Entity with a unique id.
  28. type Entity struct {
  29. id string
  30. }
  31. // An Edge connects two entities together.
  32. type Edge struct {
  33. EntityID string
  34. Name string
  35. ParentID string
  36. }
  37. // Entities stores the list of entities.
  38. type Entities map[string]*Entity
  39. // Edges stores the relationships between entities.
  40. type Edges []*Edge
  41. // WalkFunc is a function invoked to process an individual entity.
  42. type WalkFunc func(fullPath string, entity *Entity) error
  43. // Database is a graph database for storing entities and their relationships.
  44. type Database struct {
  45. conn *sql.DB
  46. mux sync.RWMutex
  47. }
  48. // IsNonUniqueNameError processes the error to check if it's caused by
  49. // a constraint violation.
  50. // This is necessary because the error isn't the same across various
  51. // sqlite versions.
  52. func IsNonUniqueNameError(err error) bool {
  53. str := err.Error()
  54. // sqlite 3.7.17-1ubuntu1 returns:
  55. // Set failure: Abort due to constraint violation: columns parent_id, name are not unique
  56. if strings.HasSuffix(str, "name are not unique") {
  57. return true
  58. }
  59. // sqlite-3.8.3-1.fc20 returns:
  60. // Set failure: Abort due to constraint violation: UNIQUE constraint failed: edge.parent_id, edge.name
  61. if strings.Contains(str, "UNIQUE constraint failed") && strings.Contains(str, "edge.name") {
  62. return true
  63. }
  64. // sqlite-3.6.20-1.el6 returns:
  65. // Set failure: Abort due to constraint violation: constraint failed
  66. if strings.HasSuffix(str, "constraint failed") {
  67. return true
  68. }
  69. return false
  70. }
  71. // NewDatabase creates a new graph database initialized with a root entity.
  72. func NewDatabase(conn *sql.DB) (*Database, error) {
  73. if conn == nil {
  74. return nil, fmt.Errorf("Database connection cannot be nil")
  75. }
  76. db := &Database{conn: conn}
  77. // Create root entities
  78. tx, err := conn.Begin()
  79. if err != nil {
  80. return nil, err
  81. }
  82. if _, err := tx.Exec(createEntityTable); err != nil {
  83. return nil, err
  84. }
  85. if _, err := tx.Exec(createEdgeTable); err != nil {
  86. return nil, err
  87. }
  88. if _, err := tx.Exec(createEdgeIndices); err != nil {
  89. return nil, err
  90. }
  91. if _, err := tx.Exec("DELETE FROM entity where id = ?", "0"); err != nil {
  92. tx.Rollback()
  93. return nil, err
  94. }
  95. if _, err := tx.Exec("INSERT INTO entity (id) VALUES (?);", "0"); err != nil {
  96. tx.Rollback()
  97. return nil, err
  98. }
  99. if _, err := tx.Exec("DELETE FROM edge where entity_id=? and name=?", "0", "/"); err != nil {
  100. tx.Rollback()
  101. return nil, err
  102. }
  103. if _, err := tx.Exec("INSERT INTO edge (entity_id, name) VALUES(?,?);", "0", "/"); err != nil {
  104. tx.Rollback()
  105. return nil, err
  106. }
  107. if err := tx.Commit(); err != nil {
  108. return nil, err
  109. }
  110. return db, nil
  111. }
  112. // Close the underlying connection to the database.
  113. func (db *Database) Close() error {
  114. return db.conn.Close()
  115. }
  116. // Set the entity id for a given path.
  117. func (db *Database) Set(fullPath, id string) (*Entity, error) {
  118. db.mux.Lock()
  119. defer db.mux.Unlock()
  120. tx, err := db.conn.Begin()
  121. if err != nil {
  122. return nil, err
  123. }
  124. var entityID string
  125. if err := tx.QueryRow("SELECT id FROM entity WHERE id = ?;", id).Scan(&entityID); err != nil {
  126. if err == sql.ErrNoRows {
  127. if _, err := tx.Exec("INSERT INTO entity (id) VALUES(?);", id); err != nil {
  128. tx.Rollback()
  129. return nil, err
  130. }
  131. } else {
  132. tx.Rollback()
  133. return nil, err
  134. }
  135. }
  136. e := &Entity{id}
  137. parentPath, name := splitPath(fullPath)
  138. if err := db.setEdge(parentPath, name, e, tx); err != nil {
  139. tx.Rollback()
  140. return nil, err
  141. }
  142. if err := tx.Commit(); err != nil {
  143. return nil, err
  144. }
  145. return e, nil
  146. }
  147. // Exists returns true if a name already exists in the database.
  148. func (db *Database) Exists(name string) bool {
  149. db.mux.RLock()
  150. defer db.mux.RUnlock()
  151. e, err := db.get(name)
  152. if err != nil {
  153. return false
  154. }
  155. return e != nil
  156. }
  157. func (db *Database) setEdge(parentPath, name string, e *Entity, tx *sql.Tx) error {
  158. parent, err := db.get(parentPath)
  159. if err != nil {
  160. return err
  161. }
  162. if parent.id == e.id {
  163. return fmt.Errorf("Cannot set self as child")
  164. }
  165. if _, err := tx.Exec("INSERT INTO edge (parent_id, name, entity_id) VALUES (?,?,?);", parent.id, name, e.id); err != nil {
  166. return err
  167. }
  168. return nil
  169. }
  170. // RootEntity returns the root "/" entity for the database.
  171. func (db *Database) RootEntity() *Entity {
  172. return &Entity{
  173. id: "0",
  174. }
  175. }
  176. // Get returns the entity for a given path.
  177. func (db *Database) Get(name string) *Entity {
  178. db.mux.RLock()
  179. defer db.mux.RUnlock()
  180. e, err := db.get(name)
  181. if err != nil {
  182. return nil
  183. }
  184. return e
  185. }
  186. func (db *Database) get(name string) (*Entity, error) {
  187. e := db.RootEntity()
  188. // We always know the root name so return it if
  189. // it is requested
  190. if name == "/" {
  191. return e, nil
  192. }
  193. parts := split(name)
  194. for i := 1; i < len(parts); i++ {
  195. p := parts[i]
  196. if p == "" {
  197. continue
  198. }
  199. next := db.child(e, p)
  200. if next == nil {
  201. return nil, fmt.Errorf("Cannot find child for %s", name)
  202. }
  203. e = next
  204. }
  205. return e, nil
  206. }
  207. // List all entities by from the name.
  208. // The key will be the full path of the entity.
  209. func (db *Database) List(name string, depth int) Entities {
  210. db.mux.RLock()
  211. defer db.mux.RUnlock()
  212. out := Entities{}
  213. e, err := db.get(name)
  214. if err != nil {
  215. return out
  216. }
  217. children, err := db.children(e, name, depth, nil)
  218. if err != nil {
  219. return out
  220. }
  221. for _, c := range children {
  222. out[c.FullPath] = c.Entity
  223. }
  224. return out
  225. }
  226. // Walk through the child graph of an entity, calling walkFunc for each child entity.
  227. // It is safe for walkFunc to call graph functions.
  228. func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error {
  229. children, err := db.Children(name, depth)
  230. if err != nil {
  231. return err
  232. }
  233. // Note: the database lock must not be held while calling walkFunc
  234. for _, c := range children {
  235. if err := walkFunc(c.FullPath, c.Entity); err != nil {
  236. return err
  237. }
  238. }
  239. return nil
  240. }
  241. // Children returns the children of the specified entity.
  242. func (db *Database) Children(name string, depth int) ([]WalkMeta, error) {
  243. db.mux.RLock()
  244. defer db.mux.RUnlock()
  245. e, err := db.get(name)
  246. if err != nil {
  247. return nil, err
  248. }
  249. return db.children(e, name, depth, nil)
  250. }
  251. // Parents returns the parents of a specified entity.
  252. func (db *Database) Parents(name string) ([]string, error) {
  253. db.mux.RLock()
  254. defer db.mux.RUnlock()
  255. e, err := db.get(name)
  256. if err != nil {
  257. return nil, err
  258. }
  259. return db.parents(e)
  260. }
  261. // Refs returns the reference count for a specified id.
  262. func (db *Database) Refs(id string) int {
  263. db.mux.RLock()
  264. defer db.mux.RUnlock()
  265. var count int
  266. if err := db.conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil {
  267. return 0
  268. }
  269. return count
  270. }
  271. // RefPaths returns all the id's path references.
  272. func (db *Database) RefPaths(id string) Edges {
  273. db.mux.RLock()
  274. defer db.mux.RUnlock()
  275. refs := Edges{}
  276. rows, err := db.conn.Query("SELECT name, parent_id FROM edge WHERE entity_id = ?;", id)
  277. if err != nil {
  278. return refs
  279. }
  280. defer rows.Close()
  281. for rows.Next() {
  282. var name string
  283. var parentID string
  284. if err := rows.Scan(&name, &parentID); err != nil {
  285. return refs
  286. }
  287. refs = append(refs, &Edge{
  288. EntityID: id,
  289. Name: name,
  290. ParentID: parentID,
  291. })
  292. }
  293. return refs
  294. }
  295. // Delete the reference to an entity at a given path.
  296. func (db *Database) Delete(name string) error {
  297. db.mux.Lock()
  298. defer db.mux.Unlock()
  299. if name == "/" {
  300. return fmt.Errorf("Cannot delete root entity")
  301. }
  302. parentPath, n := splitPath(name)
  303. parent, err := db.get(parentPath)
  304. if err != nil {
  305. return err
  306. }
  307. if _, err := db.conn.Exec("DELETE FROM edge WHERE parent_id = ? AND name = ?;", parent.id, n); err != nil {
  308. return err
  309. }
  310. return nil
  311. }
  312. // Purge removes the entity with the specified id
  313. // Walk the graph to make sure all references to the entity
  314. // are removed and return the number of references removed
  315. func (db *Database) Purge(id string) (int, error) {
  316. db.mux.Lock()
  317. defer db.mux.Unlock()
  318. tx, err := db.conn.Begin()
  319. if err != nil {
  320. return -1, err
  321. }
  322. // Delete all edges
  323. rows, err := tx.Exec("DELETE FROM edge WHERE entity_id = ?;", id)
  324. if err != nil {
  325. tx.Rollback()
  326. return -1, err
  327. }
  328. changes, err := rows.RowsAffected()
  329. if err != nil {
  330. return -1, err
  331. }
  332. // Clear who's using this id as parent
  333. refs, err := tx.Exec("DELETE FROM edge WHERE parent_id = ?;", id)
  334. if err != nil {
  335. tx.Rollback()
  336. return -1, err
  337. }
  338. refsCount, err := refs.RowsAffected()
  339. if err != nil {
  340. return -1, err
  341. }
  342. // Delete entity
  343. if _, err := tx.Exec("DELETE FROM entity where id = ?;", id); err != nil {
  344. tx.Rollback()
  345. return -1, err
  346. }
  347. if err := tx.Commit(); err != nil {
  348. return -1, err
  349. }
  350. return int(changes + refsCount), nil
  351. }
  352. // Rename an edge for a given path
  353. func (db *Database) Rename(currentName, newName string) error {
  354. db.mux.Lock()
  355. defer db.mux.Unlock()
  356. parentPath, name := splitPath(currentName)
  357. newParentPath, newEdgeName := splitPath(newName)
  358. if parentPath != newParentPath {
  359. return fmt.Errorf("Cannot rename when root paths do not match %s != %s", parentPath, newParentPath)
  360. }
  361. parent, err := db.get(parentPath)
  362. if err != nil {
  363. return err
  364. }
  365. rows, err := db.conn.Exec("UPDATE edge SET name = ? WHERE parent_id = ? AND name = ?;", newEdgeName, parent.id, name)
  366. if err != nil {
  367. return err
  368. }
  369. i, err := rows.RowsAffected()
  370. if err != nil {
  371. return err
  372. }
  373. if i == 0 {
  374. return fmt.Errorf("Cannot locate edge for %s %s", parent.id, name)
  375. }
  376. return nil
  377. }
  378. // WalkMeta stores the walk metadata.
  379. type WalkMeta struct {
  380. Parent *Entity
  381. Entity *Entity
  382. FullPath string
  383. Edge *Edge
  384. }
  385. func (db *Database) children(e *Entity, name string, depth int, entities []WalkMeta) ([]WalkMeta, error) {
  386. if e == nil {
  387. return entities, nil
  388. }
  389. rows, err := db.conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id)
  390. if err != nil {
  391. return nil, err
  392. }
  393. defer rows.Close()
  394. for rows.Next() {
  395. var entityID, entityName string
  396. if err := rows.Scan(&entityID, &entityName); err != nil {
  397. return nil, err
  398. }
  399. child := &Entity{entityID}
  400. edge := &Edge{
  401. ParentID: e.id,
  402. Name: entityName,
  403. EntityID: child.id,
  404. }
  405. meta := WalkMeta{
  406. Parent: e,
  407. Entity: child,
  408. FullPath: path.Join(name, edge.Name),
  409. Edge: edge,
  410. }
  411. entities = append(entities, meta)
  412. if depth != 0 {
  413. nDepth := depth
  414. if depth != -1 {
  415. nDepth--
  416. }
  417. entities, err = db.children(child, meta.FullPath, nDepth, entities)
  418. if err != nil {
  419. return nil, err
  420. }
  421. }
  422. }
  423. return entities, nil
  424. }
  425. func (db *Database) parents(e *Entity) (parents []string, err error) {
  426. if e == nil {
  427. return parents, nil
  428. }
  429. rows, err := db.conn.Query("SELECT parent_id FROM edge where entity_id = ?;", e.id)
  430. if err != nil {
  431. return nil, err
  432. }
  433. defer rows.Close()
  434. for rows.Next() {
  435. var parentID string
  436. if err := rows.Scan(&parentID); err != nil {
  437. return nil, err
  438. }
  439. parents = append(parents, parentID)
  440. }
  441. return parents, nil
  442. }
  443. // Return the entity based on the parent path and name.
  444. func (db *Database) child(parent *Entity, name string) *Entity {
  445. var id string
  446. if err := db.conn.QueryRow("SELECT entity_id FROM edge WHERE parent_id = ? AND name = ?;", parent.id, name).Scan(&id); err != nil {
  447. return nil
  448. }
  449. return &Entity{id}
  450. }
  451. // ID returns the id used to reference this entity.
  452. func (e *Entity) ID() string {
  453. return e.id
  454. }
  455. // Paths returns the paths sorted by depth.
  456. func (e Entities) Paths() []string {
  457. out := make([]string, len(e))
  458. var i int
  459. for k := range e {
  460. out[i] = k
  461. i++
  462. }
  463. sortByDepth(out)
  464. return out
  465. }