计算机系统应用教程网站

网站首页 > 技术文章 正文

(十) JanusGraph事务机制 事务型结构数据流图的特点是

btikc 2024-10-26 08:41:16 技术文章 7 ℃ 0 评论

JanusGraph 事务不一定是ACID,其实在了解了JanusGraph的机制后,大致可以发现JanusGraph本身不提供存储能力,无论是在后端存储还是索引能力都依赖于外部的存储殷勤及索引引擎。Hbase, Cassandra都是如此。

官方是这么说的:几乎所有与JanusGraph的交互都与事务相关联。JanusGraph事务对于多个线程并发使用是安全的。JanusGraph实例上的方法,如graph.V(..)和graph.tx().commit()执行ThreadLocal查找以检索或创建与调用线程关联的事务。调用者可以选择放弃ThreadLocal事务管理,转而调用 graph.tx().createThreadedTx(),它返回对事务对象的引用,其中包含读/写图数据和提交或回滚的方法。


JanusGraph 事务不一定是ACID。它们可以在BerkeleyDB上进行这样的配置,但在Cassandra或HBase上通常不会这样,因为在这些地方,底层存储系统不提供可序列化的隔离或多行原子写入,并且模拟这些属性的成本会很高。


基本事务处理

JanusGraph中的每个图形操作都发生在事务的上下文中。根据TinkerPop的事务规范,每个线程执行图形上的第一个操作(即 retrieval 或 mutation)时便会打开针对图形数据库的事务:

graph = JanusGraphFactory.open("berkeleyje:/tmp/janusgraph")
juno = graph.addVertex() //Automatically opens a new transaction
juno.property("name", "juno")
graph.tx().commit() //Commits transaction


事务范围

这里我理解所有基础数据库的操作默认都会开启一个事务,在TinkerPop的默认事务语义下,随着图形上的第一个操作自动创建事务,并使用commit()或rollback()显式的关闭事务。关闭事务后,与该事务关联的所有图形元素都将过时且不可用。但是,JanusGraph会自动将vertices和types转换为新的事务范围,如下例所示:

graph = JanusGraphFactory.open(xxxxxx)
juno = graph.addVertex() //自动创建一个事务
graph.tx().commit() //事务结束
juno.property("name", "zhongshengwang") //节点自动转换为新的事务

另一方面,edge不会自动转换新的事务,也不能在原始事务之外访问。他们必须明确过渡

e = juno.addEdge("person", graph.addVertex())
graph.tx().commit() /事务结束
e = g.E(e).next() // 需要去刷新边 , next()为获取e边集合的第一个
e.property("time", 99) // 只有上一步刷新过 接下来才可以使用

事务失败

提交事务时,JanusGraph将尝试将所有更改保留到存储后端。由于IO异常,网络错误,计算机崩溃或资源不可用,这可能并不总是成功。因此,交易可能会失败。事实上,在足够大的系统中,事务总会存在失败。因此,官方强烈建议通过代码处理此类失败:

try {
    if (g.V().has("name", name).iterator().hasNext())
        throw new IllegalArgumentException("Username already taken: " + name)
    user = graph.addVertex()
    user.property("name", name)
    graph.tx().commit()
} catch (Exception e) {
    //Recover, retry, or return error message
    // 恢复、重试、返回错误消息
    println(e.getMessage())
}

JanusGraphException: 如果事务失败,则抛出一个JanusGraphException。事务可能失败的原因有很多种。JanusGraph区分潜在的临时故障永久性故障


临时故障:潜在的临时故障是与资源不可用和IO超时(例如网络超时)相关的故障。JanusGraph会在一段延迟后重试保持事务状态,自动尝试从临时故障中恢复。重试尝试次数和重试延迟是可配置的。

永久性故障: 完全连接丢失,硬件故障或锁争用可能导致永久性故障。可能导致事务失败的永久性异常包括:

  1. PermanentLockingException(本地锁争用):另一个本地线程已被授予冲突锁。
  2. PermanentLockingException(X的预期值不匹配:expected = Y vs actual = Z):验证此事务中读取的值与申请锁定后数据存储区中的值相同失败。换句话说,另一个事务在读取和修改后修改了该值。

多线程事务

JanusGraph通过TinkerPop的线程事务支持多线程事务。因此,为了加速事务处理并利用多核架构,多个线程可以在单个事务中并发运行。使用TinkerPop的默认事务处理,每个线程都会自动对图形数据库打开自己的事务。要打开与线程无关的事务,需使用createThreadedTx()方法。

threadedGraph = graph.tx().createThreadedTx();
// createThreadedTx()方法返回一个新的Graph对象,该对象表示这个新打开的事务
threads = new Thread[10];
for (int i=0; i<threads.length; i++) {
    threads[i] = new Thread({
        println("Do something with 'threadedGraph''");
    });
    threads[i].start();
}
 
for (int i=0; i<threads.length; i++) threads[i].join();
threadedGraph.tx().commit();

Graph对象不会为每个线程打开新事务。这允许我们启动多个线程,这些线程在同一个事务中同时工作,其中一个线程最终在所有线程完成工作时提交事务。JanusGraph依靠优化的并发数据结构来支持在单个事务中高效运行的数百个并发线程。

并发算法

其实官方这里的说法, 个人暂时并未能有一个全面的认知和更细的了解。不过可以了解到的是,janusgraph对多个线程同时处理事务是有一定优势在的。

通过createThreadedTx()启动的事务独立于线程,这在实现并发图形算法时特别有用。大多数遍历或消息传递(以自我为中心)的图形算法都是令人尴尬的并行(这里是官方的一种说法),这意味着它们可以通过多个线程轻松并行化并执行。这些线程中的每一个都可以在Graph返回的单个对象上操作createThreadedTx()而不会相互阻塞。

嵌套事务

这部分需要注意的是,在一个长事务中,由于事务运行了很长时间,因此可能会出现锁定拥塞和代价高昂的事务性故障。

v1 = graph.addVertex()
 
//Do many other things
 
v2 = graph.addVertex()
 
v2.property("uniqueName", "foo")
 
v1.addEdge("related", v2)
 
//Do many other things
 
graph.tx().commit() // 由于其uniqueName锁争用,这个长时间运行的tx可能会失败

具体解法:在尽量短的事务周期内尽量做简单的事。

v1 = graph.addVertex()
 
//Do many other things
 
 
tx = graph.tx().createThreadedTx()  // 开启嵌套事务
 
v2 = tx.addVertex()
 
v2.property("uniqueName", "foo")
 
tx.commit() // 此处将检测到任何的锁争用  // 嵌套事务结束
 
 
v1.addEdge("related", g.V(v2).next()) //需要将v2加载到外部事务中
 
//Do many other things
 
graph.tx().commit() // 不会因为涉及v2的uniqueName写锁争用而失败

常见问题

事务在TinkerPop语义下自动启动,但不会自动终止。必须使用commit()或手动终止交易rollback()。如果commit()事务失败,则应rollback()在捕获失败后手动终止。手动终止事务是必要的,因为只有用户知道事务边界。事务将尝试从事务开始时维护其状态。这可能会导致多线程应用程序中的意外行为。

v = g.V(4).next() // 第一个图形操作,自动启动事务
 
g.V(v).bothE()
 
>> returns nothing, v has no edges
 
//线程空闲几秒钟,另一个线程向v添加边
 
g.V(v).bothE()
 
>> 仍然不返回任何值,因为事务从一开始就维护事务状态

解法: 及时提交事务,在新的事务内保证数据可见(有点像MySQL数据库不可重复读的特性。读到其他事物未提交的数据)

v = g.V(4).next() // 第一个图形操作,自动启动事务
 
g.V(v).bothE()
 
graph.tx().commit() // 提交事务,避免出现上述问题
 
//线程空闲几秒钟,另一个线程向v添加边
 
 
g.V(v).bothE() // 此处也相当于第一个图形操作,自动启动事务
 
>> 返回了添加的边
 
graph.tx().commit()

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表