Gremlin Python SDK Hang

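While using a load-testing program to test HA failover of a Gremlin service, we found that after the failover the load-testing program hung and never recovered.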

Reproduction

The problematic logic is similar to the following reproduce.py:

from __future__ import print_function  # Python 2/3 compatibility
from gremlin_python.driver import client
from gremlin_python.driver.protocol import GremlinServerError
import time

my_endpoint = "$my_endpoint"
username = "$username"
password = "$password"

my_client = client.Client('ws://' + my_endpoint + ':8182/gremlin', 'g',
                          username=username, password=password)

count = 0
while True:
    callback = my_client.submitAsync("g.V().limit(1)")
    # Wait up to 2 seconds for the submission; the query results are never read.
    result_set = callback.result(2)
    print("%d: %s" % (count, result_set.__class__.__name__))
    count += 1
    time.sleep(1)

1. Run reproduce.py

[root@my-ecs ~]# python reproduce.py
0: ResultSet
1: ResultSet
2: ResultSet

2. Kill the connection with tcpkill

[root@my-ecs ~]# tcpkill -i eth0 -9 port 8182
tcpkill: listening on eth0 [port 8182]
my-ecs:48560 > my-graph-server:8182: R 34858991:34858991(0) win 0
my-ecs:48560 > my-graph-server:8182: R 34859245:34859245(0) win 0
my-ecs:48560 > my-graph-server:8182: R 34859753:34859753(0) win 0
my-ecs:48560 > my-graph-server:8182: R 34860515:34860515(0) win 0
my-ecs:48560 > my-graph-server:8182: R 34861531:34861531(0) win 0

3. Observe the output of reproduce.py at this point

[root@my-ecs ~]# python reproduce.py
0: ResultSet
1: ResultSet
2: ResultSet
3: ResultSet
4: ResultSet
5: ResultSet
6: ResultSet
7: ResultSet
8: ResultSet
9: ResultSet
10: ResultSet
11: ResultSet
12: ResultSet
13: ResultSet

The program is stuck and produces no further output.

4. Stop tcpkill

Even after tcpkill is stopped, reproduce.py stays hung and never recovers.

Investigation

Use hanging_threads to see exactly where the program is hung:

---------- Thread 140231143462720 hangs ----------
  File "reproduce.py", line 15, in <module>
    callback = my_client.submitAsync("g.V().limit(1)")
  File "gremlin_python/driver/client.py", line 123, in submitAsync
    conn = self._pool.get(True)
  File "Queue.py", line 168, in get
    self.not_empty.wait()
  File "threading.py", line 339, in wait
    waiter.acquire()
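hanging_threads is a third-party package. As an alternative (not what was used in this investigation), Python 3's standard faulthandler module can produce similar per-thread stacks. The sketch below assumes Python 3.3+; heartbeat and stop_heartbeat are hypothetical helper names. Because each call to faulthandler.dump_traceback_later replaces the previously armed timer, calling heartbeat() once per loop iteration means a dump is written only when one iteration stalls for longer than stall_seconds.

```python
import faulthandler
import sys


def heartbeat(stall_seconds, file=sys.stderr):
    """Arm (or re-arm) a one-shot watchdog that dumps every thread's stack.

    Each call replaces the previously armed timer, so the dump is written only
    if the next call to heartbeat() comes more than `stall_seconds` later.
    """
    faulthandler.dump_traceback_later(stall_seconds, repeat=False, file=file)


def stop_heartbeat():
    """Cancel a pending dump, e.g. when the load test exits normally."""
    faulthandler.cancel_dump_traceback_later()
```

In reproduce.py this would mean calling heartbeat(10) as the first statement inside the while loop; once submitAsync blocks on the pool, the dump should show the main thread waiting inside Queue.get.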

The client logic is roughly as follows:

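  1. A pool holds several connections (the number is set by the pool_size parameter when the client is created; the default is 4).
  2. submitAsync takes a connection out of the pool, blocking until one is available.
  3. After the response has been received and processed, the connection is put back into the pool.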

This symptom suggests that step 3 went wrong: an error occurred while receiving the response, and the connection was never returned to the pool.
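A stripped-down model of that pool, using only queue.Queue, makes the hypothesis concrete: once every connection has been taken and none is returned, a blocking get(True) with no timeout waits forever, as in the trace above. FakeConnection, make_pool, checkout and checkin are hypothetical names, not gremlinpython's, and the timeout parameter exists only so the hang can be observed as a queue.Empty exception instead of a freeze.

```python
import queue


class FakeConnection(object):
    """Hypothetical stand-in for a gremlin_python Connection."""

    def __init__(self, name):
        self.name = name


def make_pool(pool_size=4):
    """Fill a FIFO queue with pool_size connections (4 mirrors the client default)."""
    pool = queue.Queue()
    for i in range(pool_size):
        pool.put_nowait(FakeConnection("conn-%d" % i))
    return pool


def checkout(pool, timeout=None):
    """Step 2: take a connection. With timeout=None this is pool.get(True),
    which blocks forever once the pool is empty."""
    return pool.get(True, timeout)


def checkin(pool, conn):
    """Step 3: return the connection after the response has been handled."""
    pool.put_nowait(conn)


if __name__ == "__main__":
    pool = make_pool()
    leaked = [checkout(pool) for _ in range(4)]  # step 3 never runs for these
    try:
        checkout(pool, timeout=0.5)
    except queue.Empty:
        print("pool exhausted: a blocking get() would hang here")
    checkin(pool, leaked[0])
    print("got", checkout(pool, timeout=0.5).name)  # got conn-0
```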

Let's look at the request-related code:

# gremlin_python/driver/connection.py
def write(self, request_message):
    request_id = str(uuid.uuid4())
    result_set = resultset.ResultSet(queue.Queue(), request_id)
    self._results[request_id] = result_set
    # Create write task
    future = Future()
    future_write = self._executor.submit(
        self._protocol.write, request_id, request_message)

    def cb(f):
        try:
            f.result()
        except Exception as e:
            future.set_exception(e)
            self._pool.put_nowait(self)
        else:
            # Start receive task
            done = self._executor.submit(self._receive)
            result_set.done = done
            future.set_result(result_set)

    future_write.add_done_callback(cb)
    return future

def _receive(self):
    try:
        while True:
            data = self._transport.read()
            status_code = self._protocol.data_received(data, self._results)
            if status_code != 206:
                break
    finally:
        self._pool.put_nowait(self)

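As you can see, the connection is put back into the pool whether write fails or _receive fails. Another possibility is that the connection put back into the pool is no longer valid, so the next request sent over it never gets a response.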

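A single request involves three Futures, as shown in the figure below: the User Future returned by write() (and therefore by submitAsync), the Write Future of the write task, and the Done Future of the receive task.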

The one that matters for this case is the Done Future: it is the Future returned when _receive, which calls GremlinServerWSProtocol.data_received, is submitted to the thread pool.

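As soon as the Write Future succeeds, the callback submits _receive to the thread pool, stores the resulting Done Future in result_set.done, and marks the User Future as successful. In other words, the User Future completes before any response has been read.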
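The write/receive chain can be reproduced with nothing but concurrent.futures. This is a simplified model shaped like Connection.write(), not gremlinpython code: SimResultSet, ServerDisconnected and simulate_request are hypothetical names, and the transport is replaced by a read_fails flag. It shows that the User Future succeeds even when the receive task fails, that the failure lives only on the Done Future, and that the connection still goes back to the pool.

```python
import queue
from concurrent.futures import Future, ThreadPoolExecutor


class ServerDisconnected(Exception):
    """Hypothetical stand-in for the error raised by data_received()."""


class SimResultSet(object):
    """Hypothetical stand-in for ResultSet: a stream plus a Done Future."""

    def __init__(self):
        self.stream = queue.Queue()
        self.done = None


def simulate_request(executor, pool, conn, read_fails):
    user_future = Future()          # the User Future handed back to the caller
    result_set = SimResultSet()

    def write_task():
        return None                 # the write itself succeeds

    def receive_task():
        try:
            if read_fails:
                raise ServerDisconnected("Server disconnected - please try to reconnect")
            result_set.stream.put("v[1]")
        finally:
            pool.put_nowait(conn)   # the connection is returned even on failure

    def cb(write_future):
        try:
            write_future.result()
        except Exception as e:
            user_future.set_exception(e)
            pool.put_nowait(conn)
        else:
            result_set.done = executor.submit(receive_task)  # the Done Future
            user_future.set_result(result_set)               # User Future succeeds now

    executor.submit(write_task).add_done_callback(cb)         # the Write Future
    return user_future


if __name__ == "__main__":
    pool = queue.Queue()
    with ThreadPoolExecutor(max_workers=2) as executor:
        rs = simulate_request(executor, pool, "conn-0", read_fails=True).result(2)
        print(type(rs).__name__)                        # no exception so far
        print("Done Future holds:", repr(rs.done.exception(2)))
        print("connection returned:", pool.get(timeout=2))
```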

In the case above, when the network fails, the call to GremlinServerWSProtocol.data_received raises an exception:

# gremlin_python/driver/protocol.py
def data_received(self, message, results_dict):
    if message is None:
        raise GremlinServerError("Server disconnected - please try to reconnect")
    ...

Next, we need to see how this exception is handled. Here is the call stack of GremlinServerWSProtocol.data_received:

  File "threading.py", line 785, in __bootstrap
    self.__bootstrap_inner()
  File "threading.py", line 812, in __bootstrap_inner
    self.run()
  File "threading.py", line 765, in run
    self.__target(*self.__args, **self.__kwargs)
  File "concurrent/futures/thread.py", line 76, in _worker
    work_item.run()
  File "concurrent/futures/thread.py", line 63, in run
    result = self.fn(*self.args, **self.kwargs)
  File "gremlin_python/driver/connection.py", line 80, in _receive
    status_code = self._protocol.data_received(data, self._results)

The relevant code in work_item.run() is as follows:

# concurrent/futures/thread.py
def run(self):
    if not self.future.set_running_or_notify_cancel():
        return
    try:
        result = self.fn(*self.args, **self.kwargs)
    except:
        e, tb = sys.exc_info()[1:]
        print("set exception: %r" % self.future)  # debug print added for this investigation
        self.future.set_exception_info(e, tb)
    else:
        self.future.set_result(result)

So the exception is stored as the Done Future's exception; only calling result() on the Done Future re-raises it to the caller.
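This is ordinary concurrent.futures behaviour and is easy to confirm in isolation (Python 3 here). failing_receive is a hypothetical stand-in for _receive when data_received() raises; nothing is reported until someone asks the Future for its outcome.

```python
from concurrent.futures import ThreadPoolExecutor


def failing_receive():
    """Hypothetical stand-in for Connection._receive when data_received() raises."""
    raise RuntimeError("Server disconnected - please try to reconnect")


with ThreadPoolExecutor(max_workers=1) as executor:
    done = executor.submit(failing_receive)

    # Nothing is raised or printed in this thread: the worker caught the
    # exception and stored it on the Future.
    stored = done.exception(timeout=2)
    print(type(stored).__name__, stored)

    # The failure reaches the caller only when someone asks for the result.
    try:
        done.result()
    except RuntimeError as exc:
        reraised = exc
        print("result() re-raised:", reraised)
```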

So the question is: why does the code below never call result() on the Done Future?

callback = my_client.submitAsync("g.V().limit(1)")
result_set = callback.result(2)
print("%d: %s" % (count, result_set.__class__.__name__))

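As the figure above shows, result_set.done points to the Done Future, and this code never touches it. If result_set.one() were called, it would call result() on the Done Future in this situation, and the exception would be raised.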
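A simplified model of the consumer side shows why reading the result set surfaces the error. MiniResultSet is hypothetical and only approximates the behaviour described here (hand out buffered results while the receive task runs, then call done.result()); it is not the library's ResultSet code.

```python
import queue
from concurrent.futures import Future


class MiniResultSet(object):
    """Hypothetical, simplified model of a result set backed by a Done Future."""

    def __init__(self, done):
        self.stream = queue.Queue()  # filled by the receive task
        self.done = done             # the Done Future of the receive task

    def one(self):
        # While the receive task is still running, hand out buffered results.
        while not self.done.done():
            try:
                return self.stream.get(timeout=0.05)
            except queue.Empty:
                pass
        # The task has finished: return anything still buffered first...
        if not self.stream.empty():
            return self.stream.get_nowait()
        # ...then result() returns the task's value or re-raises its exception.
        return self.done.result()


if __name__ == "__main__":
    failed = Future()
    failed.set_exception(RuntimeError("Server disconnected"))
    try:
        MiniResultSet(failed).one()
    except RuntimeError as exc:
        print("surfaced:", exc)  # surfaced: Server disconnected
```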

Recommendations

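The reproduction code above mimics the load-testing program. Real business code always reads the values in result_set, so this hang will not occur there. However, network jitter at runtime can still make requests fail, and that has to be handled with code similar to the following: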

count = 0
while True:
    try:
        callback = my_client.submitAsync("g.V().limit(1)")
        result_set = callback.result(2)
        print("%d: %s" % (count, result_set.__class__.__name__))
        count += 1
        # Call result_set.one() to make sure data is read from the stream
        # (or call result_set.done.result()); this surfaces receive errors.
        print(result_set.one())
        time.sleep(1)
    except GremlinServerError as e:
        if e.status_code == 500:
            # Try to reconnect 'my_client' here
            continue
        raise

Note: gremlinpython 3.4.0 has a bug here: the exception raised is a TypeError rather than a GremlinServerError. See TINKERPOP-2178 for details.
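The "try to reconnect" comment in the handling code can be filled in along these lines. This is a sketch under assumptions: make_client is a hypothetical zero-argument factory (for example a lambda that builds client.Client exactly as reproduce.py does), the caller decides which exception types mean "the connection is broken" (on 3.4.0 that tuple would need TypeError as well as GremlinServerError), and only submitAsync(), close() and result_set.one() are used on the client.

```python
import time


def _close_quietly(client):
    """Close a client that may already be broken; errors here are not interesting."""
    try:
        client.close()
    except Exception:
        pass


def query_loop(make_client, query, retryable, on_result=print,
               timeout=2, interval=1.0, iterations=None):
    """Run `query` repeatedly, replacing the client after a retryable error.

    make_client: zero-argument callable returning a fresh client (hypothetical).
    retryable:   tuple of exception types treated as "connection is broken".
    iterations:  number of attempts, or None to loop forever.
    """
    client = make_client()
    attempt = 0
    try:
        while iterations is None or attempt < iterations:
            attempt += 1
            try:
                result_set = client.submitAsync(query).result(timeout)
                # Reading the result set consumes the Done Future, so errors
                # from the receive task are raised here instead of being lost.
                on_result(result_set.one())
            except retryable:
                _close_quietly(client)
                client = make_client()  # start again with a fresh connection pool
            time.sleep(interval)
    finally:
        _close_quietly(client)
```

With gremlinpython this would be called as query_loop(make_client, "g.V().limit(1)", (GremlinServerError, TypeError)). Rebuilding the whole client, rather than trying to repair individual connections, keeps the sketch independent of the driver's internal pool.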