graphdb.go 11 KB


  1. package graphdb
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "path"
  6. "strings"
  7. "sync"
  8. )
  const (
  	// createEntityTable creates the entity table, which holds one row per
  	// entity id (the primary key).
  	createEntityTable = `
CREATE TABLE IF NOT EXISTS entity (
id text NOT NULL PRIMARY KEY
);`
  	// createEdgeTable creates the edge table. Each row gives entity_id a
  	// name underneath parent_id; both columns reference entity.id, and
  	// parent_id is nullable.
  	createEdgeTable = `
CREATE TABLE IF NOT EXISTS edge (
"entity_id" text NOT NULL,
"parent_id" text NULL,
"name" text NOT NULL,
CONSTRAINT "parent_fk" FOREIGN KEY ("parent_id") REFERENCES "entity" ("id"),
CONSTRAINT "entity_fk" FOREIGN KEY ("entity_id") REFERENCES "entity" ("id")
);
`
  	// createEdgeIndices adds a unique index over (parent_id, name) on the
  	// edge table.
  	createEdgeIndices = `
CREATE UNIQUE INDEX IF NOT EXISTS "name_parent_ix" ON "edge" (parent_id, name);
`
  )
  // Entity is a node in the graph, identified by a unique id.
  // The id is read through the ID method.
  type Entity struct {
  	id string
  }
  // An Edge connects two entities together: it records that the entity
  // EntityID is reachable under the entity ParentID by the name Name.
  type Edge struct {
  	EntityID string
  	Name     string
  	ParentID string
  }
  // Entities maps a full path to the entity found at that path.
  type Entities map[string]*Entity

  // Edges is a list of edges.
  type Edges []*Edge

  // WalkFunc is the callback invoked by Walk for each child entity, with the
  // child's full path. Returning a non-nil error stops the walk.
  type WalkFunc func(fullPath string, entity *Entity) error
  // Database is a graph database for storing entities and their
  // relationships on top of a SQL connection.
  //
  // mux serializes access from this process: Set, Delete, Purge and Rename
  // take the write lock; Exists, Get, List, Children, Parents, Refs and
  // RefPaths take the read lock.
  type Database struct {
  	conn *sql.DB
  	mux  sync.RWMutex
  }
  // IsNonUniqueNameError reports whether err looks like the SQLite error
  // raised when an insert or update violates the unique (parent_id, name)
  // index on the edge table.
  //
  // The check is textual because SQLite releases word the failure
  // differently. A nil err is never a uniqueness error.
  func IsNonUniqueNameError(err error) bool {
  	if err == nil {
  		// Calling Error() on a nil error would panic.
  		return false
  	}
  	str := err.Error()
  	switch {
  	// sqlite 3.7.17-1ubuntu1 returns:
  	// Set failure: Abort due to constraint violation: columns parent_id, name are not unique
  	case strings.HasSuffix(str, "name are not unique"):
  		return true
  	// sqlite-3.8.3-1.fc20 returns:
  	// Set failure: Abort due to constraint violation: UNIQUE constraint failed: edge.parent_id, edge.name
  	case strings.Contains(str, "UNIQUE constraint failed") && strings.Contains(str, "edge.name"):
  		return true
  	// sqlite-3.6.20-1.el6 returns:
  	// Set failure: Abort due to constraint violation: constraint failed
  	case strings.HasSuffix(str, "constraint failed"):
  		return true
  	}
  	return false
  }
  64. // Create a new graph database initialized with a root entity
  65. func NewDatabase(conn *sql.DB, init bool) (*Database, error) {
  66. if conn == nil {
  67. return nil, fmt.Errorf("Database connection cannot be nil")
  68. }
  69. db := &Database{conn: conn}
  70. if init {
  71. if _, err := conn.Exec(createEntityTable); err != nil {
  72. return nil, err
  73. }
  74. if _, err := conn.Exec(createEdgeTable); err != nil {
  75. return nil, err
  76. }
  77. if _, err := conn.Exec(createEdgeIndices); err != nil {
  78. return nil, err
  79. }
  80. rollback := func() {
  81. conn.Exec("ROLLBACK")
  82. }
  83. // Create root entities
  84. if _, err := conn.Exec("BEGIN"); err != nil {
  85. return nil, err
  86. }
  87. if _, err := conn.Exec("INSERT INTO entity (id) VALUES (?);", "0"); err != nil {
  88. rollback()
  89. return nil, err
  90. }
  91. if _, err := conn.Exec("INSERT INTO edge (entity_id, name) VALUES(?,?);", "0", "/"); err != nil {
  92. rollback()
  93. return nil, err
  94. }
  95. if _, err := conn.Exec("COMMIT"); err != nil {
  96. return nil, err
  97. }
  98. }
  99. return db, nil
  100. }
  // Close closes the underlying connection to the database and returns
  // whatever error that close reports.
  func (db *Database) Close() error {
  	return db.conn.Close()
  }
  105. // Set the entity id for a given path
  106. func (db *Database) Set(fullPath, id string) (*Entity, error) {
  107. db.mux.Lock()
  108. defer db.mux.Unlock()
  109. rollback := func() {
  110. db.conn.Exec("ROLLBACK")
  111. }
  112. if _, err := db.conn.Exec("BEGIN EXCLUSIVE"); err != nil {
  113. return nil, err
  114. }
  115. var entityId string
  116. if err := db.conn.QueryRow("SELECT id FROM entity WHERE id = ?;", id).Scan(&entityId); err != nil {
  117. if err == sql.ErrNoRows {
  118. if _, err := db.conn.Exec("INSERT INTO entity (id) VALUES(?);", id); err != nil {
  119. rollback()
  120. return nil, err
  121. }
  122. } else {
  123. rollback()
  124. return nil, err
  125. }
  126. }
  127. e := &Entity{id}
  128. parentPath, name := splitPath(fullPath)
  129. if err := db.setEdge(parentPath, name, e); err != nil {
  130. rollback()
  131. return nil, err
  132. }
  133. if _, err := db.conn.Exec("COMMIT"); err != nil {
  134. return nil, err
  135. }
  136. return e, nil
  137. }
  138. // Return true if a name already exists in the database
  139. func (db *Database) Exists(name string) bool {
  140. db.mux.RLock()
  141. defer db.mux.RUnlock()
  142. e, err := db.get(name)
  143. if err != nil {
  144. return false
  145. }
  146. return e != nil
  147. }
  148. func (db *Database) setEdge(parentPath, name string, e *Entity) error {
  149. parent, err := db.get(parentPath)
  150. if err != nil {
  151. return err
  152. }
  153. if parent.id == e.id {
  154. return fmt.Errorf("Cannot set self as child")
  155. }
  156. if _, err := db.conn.Exec("INSERT INTO edge (parent_id, name, entity_id) VALUES (?,?,?);", parent.id, name, e.id); err != nil {
  157. return err
  158. }
  159. return nil
  160. }
  // RootEntity returns the root "/" entity for the database.
  // A new Entity value with id "0" is built on every call; the database is
  // not queried.
  func (db *Database) RootEntity() *Entity {
  	return &Entity{
  		id: "0",
  	}
  }
  167. // Return the entity for a given path
  168. func (db *Database) Get(name string) *Entity {
  169. db.mux.RLock()
  170. defer db.mux.RUnlock()
  171. e, err := db.get(name)
  172. if err != nil {
  173. return nil
  174. }
  175. return e
  176. }
  177. func (db *Database) get(name string) (*Entity, error) {
  178. e := db.RootEntity()
  179. // We always know the root name so return it if
  180. // it is requested
  181. if name == "/" {
  182. return e, nil
  183. }
  184. parts := split(name)
  185. for i := 1; i < len(parts); i++ {
  186. p := parts[i]
  187. if p == "" {
  188. continue
  189. }
  190. next := db.child(e, p)
  191. if next == nil {
  192. return nil, fmt.Errorf("Cannot find child for %s", name)
  193. }
  194. e = next
  195. }
  196. return e, nil
  197. }
  198. // List all entities by from the name
  199. // The key will be the full path of the entity
  200. func (db *Database) List(name string, depth int) Entities {
  201. db.mux.RLock()
  202. defer db.mux.RUnlock()
  203. out := Entities{}
  204. e, err := db.get(name)
  205. if err != nil {
  206. return out
  207. }
  208. children, err := db.children(e, name, depth, nil)
  209. if err != nil {
  210. return out
  211. }
  212. for _, c := range children {
  213. out[c.FullPath] = c.Entity
  214. }
  215. return out
  216. }
  217. // Walk through the child graph of an entity, calling walkFunc for each child entity.
  218. // It is safe for walkFunc to call graph functions.
  219. func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error {
  220. children, err := db.Children(name, depth)
  221. if err != nil {
  222. return err
  223. }
  224. // Note: the database lock must not be held while calling walkFunc
  225. for _, c := range children {
  226. if err := walkFunc(c.FullPath, c.Entity); err != nil {
  227. return err
  228. }
  229. }
  230. return nil
  231. }
  // Children returns the children of the entity at the path name, traversed
  // to the given depth. The read lock is held for the whole lookup.
  // An error is returned if name cannot be resolved or the traversal fails.
  func (db *Database) Children(name string, depth int) ([]WalkMeta, error) {
  	db.mux.RLock()
  	defer db.mux.RUnlock()
  	e, err := db.get(name)
  	if err != nil {
  		return nil, err
  	}
  	// Start with a nil accumulator; children appends to it as it descends.
  	return db.children(e, name, depth, nil)
  }
  // Parents returns the parent ids of the entity at the path name.
  // The read lock is held for the whole lookup. An error is returned if name
  // cannot be resolved or the query fails.
  func (db *Database) Parents(name string) ([]string, error) {
  	db.mux.RLock()
  	defer db.mux.RUnlock()
  	e, err := db.get(name)
  	if err != nil {
  		return nil, err
  	}
  	return db.parents(e)
  }
  252. // Return the refrence count for a specified id
  253. func (db *Database) Refs(id string) int {
  254. db.mux.RLock()
  255. defer db.mux.RUnlock()
  256. var count int
  257. if err := db.conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil {
  258. return 0
  259. }
  260. return count
  261. }
  262. // Return all the id's path references
  263. func (db *Database) RefPaths(id string) Edges {
  264. db.mux.RLock()
  265. defer db.mux.RUnlock()
  266. refs := Edges{}
  267. rows, err := db.conn.Query("SELECT name, parent_id FROM edge WHERE entity_id = ?;", id)
  268. if err != nil {
  269. return refs
  270. }
  271. defer rows.Close()
  272. for rows.Next() {
  273. var name string
  274. var parentId string
  275. if err := rows.Scan(&name, &parentId); err != nil {
  276. return refs
  277. }
  278. refs = append(refs, &Edge{
  279. EntityID: id,
  280. Name: name,
  281. ParentID: parentId,
  282. })
  283. }
  284. return refs
  285. }
  286. // Delete the reference to an entity at a given path
  287. func (db *Database) Delete(name string) error {
  288. db.mux.Lock()
  289. defer db.mux.Unlock()
  290. if name == "/" {
  291. return fmt.Errorf("Cannot delete root entity")
  292. }
  293. parentPath, n := splitPath(name)
  294. parent, err := db.get(parentPath)
  295. if err != nil {
  296. return err
  297. }
  298. if _, err := db.conn.Exec("DELETE FROM edge WHERE parent_id = ? AND name = ?;", parent.id, n); err != nil {
  299. return err
  300. }
  301. return nil
  302. }
  303. // Remove the entity with the specified id
  304. // Walk the graph to make sure all references to the entity
  305. // are removed and return the number of references removed
  306. func (db *Database) Purge(id string) (int, error) {
  307. db.mux.Lock()
  308. defer db.mux.Unlock()
  309. rollback := func() {
  310. db.conn.Exec("ROLLBACK")
  311. }
  312. if _, err := db.conn.Exec("BEGIN"); err != nil {
  313. return -1, err
  314. }
  315. // Delete all edges
  316. rows, err := db.conn.Exec("DELETE FROM edge WHERE entity_id = ?;", id)
  317. if err != nil {
  318. rollback()
  319. return -1, err
  320. }
  321. changes, err := rows.RowsAffected()
  322. if err != nil {
  323. return -1, err
  324. }
  325. // Delete entity
  326. if _, err := db.conn.Exec("DELETE FROM entity where id = ?;", id); err != nil {
  327. rollback()
  328. return -1, err
  329. }
  330. if _, err := db.conn.Exec("COMMIT"); err != nil {
  331. return -1, err
  332. }
  333. return int(changes), nil
  334. }
  335. // Rename an edge for a given path
  336. func (db *Database) Rename(currentName, newName string) error {
  337. db.mux.Lock()
  338. defer db.mux.Unlock()
  339. parentPath, name := splitPath(currentName)
  340. newParentPath, newEdgeName := splitPath(newName)
  341. if parentPath != newParentPath {
  342. return fmt.Errorf("Cannot rename when root paths do not match %s != %s", parentPath, newParentPath)
  343. }
  344. parent, err := db.get(parentPath)
  345. if err != nil {
  346. return err
  347. }
  348. rows, err := db.conn.Exec("UPDATE edge SET name = ? WHERE parent_id = ? AND name = ?;", newEdgeName, parent.id, name)
  349. if err != nil {
  350. return err
  351. }
  352. i, err := rows.RowsAffected()
  353. if err != nil {
  354. return err
  355. }
  356. if i == 0 {
  357. return fmt.Errorf("Cannot locate edge for %s %s", parent.id, name)
  358. }
  359. return nil
  360. }
  // WalkMeta describes one entity found while traversing the graph: the
  // parent it was reached from, the entity itself, the full path leading to
  // it, and the edge that connects the two.
  type WalkMeta struct {
  	Parent   *Entity
  	Entity   *Entity
  	FullPath string
  	Edge     *Edge
  }
  367. func (db *Database) children(e *Entity, name string, depth int, entities []WalkMeta) ([]WalkMeta, error) {
  368. if e == nil {
  369. return entities, nil
  370. }
  371. rows, err := db.conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id)
  372. if err != nil {
  373. return nil, err
  374. }
  375. defer rows.Close()
  376. for rows.Next() {
  377. var entityId, entityName string
  378. if err := rows.Scan(&entityId, &entityName); err != nil {
  379. return nil, err
  380. }
  381. child := &Entity{entityId}
  382. edge := &Edge{
  383. ParentID: e.id,
  384. Name: entityName,
  385. EntityID: child.id,
  386. }
  387. meta := WalkMeta{
  388. Parent: e,
  389. Entity: child,
  390. FullPath: path.Join(name, edge.Name),
  391. Edge: edge,
  392. }
  393. entities = append(entities, meta)
  394. if depth != 0 {
  395. nDepth := depth
  396. if depth != -1 {
  397. nDepth -= 1
  398. }
  399. entities, err = db.children(child, meta.FullPath, nDepth, entities)
  400. if err != nil {
  401. return nil, err
  402. }
  403. }
  404. }
  405. return entities, nil
  406. }
  407. func (db *Database) parents(e *Entity) (parents []string, err error) {
  408. if e == nil {
  409. return parents, nil
  410. }
  411. rows, err := db.conn.Query("SELECT parent_id FROM edge where entity_id = ?;", e.id)
  412. if err != nil {
  413. return nil, err
  414. }
  415. defer rows.Close()
  416. for rows.Next() {
  417. var parentId string
  418. if err := rows.Scan(&parentId); err != nil {
  419. return nil, err
  420. }
  421. parents = append(parents, parentId)
  422. }
  423. return parents, nil
  424. }
  425. // Return the entity based on the parent path and name
  426. func (db *Database) child(parent *Entity, name string) *Entity {
  427. var id string
  428. if err := db.conn.QueryRow("SELECT entity_id FROM edge WHERE parent_id = ? AND name = ?;", parent.id, name).Scan(&id); err != nil {
  429. return nil
  430. }
  431. return &Entity{id}
  432. }
  // ID returns the id used to reference this entity.
  func (e *Entity) ID() string {
  	return e.id
  }
  437. // Return the paths sorted by depth
  438. func (e Entities) Paths() []string {
  439. out := make([]string, len(e))
  440. var i int
  441. for k := range e {
  442. out[i] = k
  443. i++
  444. }
  445. sortByDepth(out)
  446. return out
  447. }