When the Gremlin SDK Meets Celery

Background

A user reported an issue: under concurrent requests, the Gremlin Python SDK occasionally raises a KeyError. Here is the stack trace:

[WARNING/ForkPoolWorker-2] '8bd6c40e-5dcd-4848-b988-ab3bf8243a30'
[WARNING/ForkPoolWorker-2] <class 'KeyError'>
[WARNING/ForkPoolWorker-2] Traceback (most recent call last):
...
File "../lib/python3.5/site-packages/gremlin_python/driver/resultset.py", line 83, in one
return self.done.result()
File "/usr/lib/python3.5/concurrent/futures/_base.py", line 398, in result
return self.__get_result()
File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
raise self._exception
File "/usr/lib/python3.5/concurrent/futures/thread.py", line 55, in run
result = self.fn(*self.args, **self.kwargs)
File "../lib/python3.5/site-packages/gremlin_python/driver/connection.py", line 80, in _receive
status_code = self._protocol.data_received(data, self._results)
File "../lib/python3.5/site-packages/gremlin_python/driver/protocol.py", line 84, in data_received
result_set = results_dict[request_id]

Investigation

Updating results_dict

Judging from the code, it should be impossible for the request_id taken from the message to be missing from results_dict:

def data_received(self, message, results_dict):
    # if Gremlin Server cuts off then we get a None for the message
    if message is None:
        raise GremlinServerError(500, "500: Server disconnected - please try to reconnect")
    message = self._message_serializer.deserialize_message(json.loads(message.decode('utf-8')))
    request_id = message['requestId']
    result_set = results_dict[request_id]
    status_code = message['status']['code']
    aggregate_to = message['result']['meta'].get('aggregateTo', 'list')
    data = message['result']['data']
    result_set.aggregate_to = aggregate_to
    if status_code == 407:
        pass  # authentication handling, elided here
    elif status_code == 204:
        result_set.stream.put_nowait([])
        del results_dict[request_id]
        return status_code
    elif status_code in [200, 206]:
        result_set.stream.put_nowait(data)
        if status_code == 200:
            del results_dict[request_id]
        return status_code
    else:
        del results_dict[request_id]
        raise GremlinServerError(status_code,
                                 "{0}: {1}".format(status_code, message["status"]["message"]))

Every place that deletes a request_id from results_dict is inside data_received.

So if a request_id cannot be found, by far the most likely explanation seemed to be that data_received handled two messages carrying the same request_id.
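
A toy reproduction of that hypothesis (illustrative only; handle_message and the id below are made up, not SDK code): the first terminal-status message deletes the entry, so a hypothetical duplicate would hit exactly this KeyError.

def handle_message(results_dict, request_id):
    result_set = results_dict[request_id]  # same lookup as in data_received
    del results_dict[request_id]           # terminal status (200/204) removes the entry
    return result_set

results_dict = {'some-request-id': object()}
handle_message(results_dict, 'some-request-id')  # first message: lookup succeeds
handle_message(results_dict, 'some-request-id')  # duplicate message: KeyError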

The Truth Emerges

After adding logging where messages are sent and received, it turned out that no message with the same request_id was received twice:

[WARNING/ForkPoolWorker-2] Send ResultDictId: 139943049597576, RequestId: b847a328-5c77-4419-8009-3d5b99b43a3d
[WARNING/ForkPoolWorker-3] Send ResultDictId: 139943049597576, RequestId: 44107b26-82b4-406b-9da2-02513f6a4748
[WARNING/ForkPoolWorker-3] Recv: ResultsDictId: 139943049597576, RequestId: b847a328-5c77-4419-8009-3d5b99b43a3d
[WARNING/ForkPoolWorker-3] 'b847a328-5c77-4419-8009-3d5b99b43a3d'
[WARNING/ForkPoolWorker-3] <class 'KeyError'>
[WARNING/ForkPoolWorker-3] Traceback (most recent call last):
...
File "../lib/python3.5/site-packages/gremlin_python/driver/resultset.py", line 83, in one
return self.done.result()
File "/usr/lib/python3.5/concurrent/futures/_base.py", line 398, in result
return self.__get_result()
File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
raise self._exception
File "/usr/lib/python3.5/concurrent/futures/thread.py", line 55, in run
result = self.fn(*self.args, **self.kwargs)
File "../lib/python3.5/site-packages/gremlin_python/driver/connection.py", line 81, in _receive
status_code = self._protocol.data_received(data, self._results)
File "../lib/python3.5/site-packages/gremlin_python/driver/protocol.py", line 84, in data_received
result_set = results_dict[request_id]
KeyError: 'b847a328-5c77-4419-8009-3d5b99b43a3d'
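
For reference, the receive-side instrumentation was along these lines (a hypothetical sketch, assuming the driver's GremlinServerWSProtocol class and that the raw response JSON carries requestId at the top level; a matching Send log was added on the write path):

import json
import logging
from gremlin_python.driver import protocol

log = logging.getLogger(__name__)
_orig_data_received = protocol.GremlinServerWSProtocol.data_received

def _logged_data_received(self, message, results_dict):
    # Log id(results_dict) and the requestId before the normal handling runs;
    # comparing these ids across workers is what exposed the mismatch above.
    if message is not None:
        request_id = json.loads(message.decode('utf-8'))['requestId']
        log.warning('Recv: ResultsDictId: %s, RequestId: %s', id(results_dict), request_id)
    return _orig_data_received(self, message, results_dict)

protocol.GremlinServerWSProtocol.data_received = _logged_data_received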

Although the earlier hypothesis did not hold, the logs do show that the failing RequestId b847a328-5c77-4419-8009-3d5b99b43a3d belonged to a request sent from ForkPoolWorker-2, while the corresponding response was received in ForkPoolWorker-3, which then raised the KeyError.

Since the caller uses the Celery framework, ForkPoolWorker-2 and ForkPoolWorker-3 above are child processes forked by that framework. The most plausible hypothesis, then, was that the Gremlin Python SDK has a problem when used under a multi-process model.

The Root Cause

In the single-process case, the Gremlin Python SDK's flow works as intended: the process that registers a request_id in results_dict when sending a request is the same process that later reads the response off the socket and resolves that entry.
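
In miniature (an illustrative sketch, not the driver's actual classes):

import uuid

results_dict = {}

def send(gremlin):
    request_id = str(uuid.uuid4())
    results_dict[request_id] = []          # stand-in for a ResultSet
    return request_id                      # ...then the request goes out on the socket

def on_response(request_id, data):
    results_dict[request_id].append(data)  # lookup always succeeds: same process
    del results_dict[request_id]

rid = send('g.V().count()')
on_response(rid, [42])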

Under a multi-process model, however, the SDK's current handling is broken. Because the client instance is created in the parent process, its data structures are inherited by the forked children, so the socket handle is shared across processes. But results_dict is written to inside the children, and after the fork each child maintains its own private copy.
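
The following standalone sketch (plain os.fork plus a socket, not Celery or the SDK) shows both effects: the socket file descriptor refers to the same underlying connection in parent and child, while dict writes after the fork stay private to each process.

import os
import socket

sock = socket.socket()          # created before fork, like the SDK's client socket
results = {}                    # stands in for results_dict

pid = os.fork()
if pid == 0:                                  # child process
    results['req-1'] = 'registered'           # written only to the child's copy
    print('child fd:', sock.fileno())         # same fd -> same kernel socket
    os._exit(0)

os.waitpid(pid, 0)
print('parent fd:', sock.fileno())            # identical fd number
print('req-1 in parent copy:', 'req-1' in results)  # False: the dicts diverged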

So child process 1 may send a request while child process 2 receives the corresponding response, at which point the context for that request (the entry in child 2's copy of results_dict) is simply not there.
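
And a sketch of the resulting race (two forked readers on one shared socket; whichever calls recv() first gets the response, regardless of who sent the request):

import os
import socket
import time

a, b = socket.socketpair()      # 'a' plays the server side, 'b' the shared client side

for name in ('worker-2', 'worker-3'):
    if os.fork() == 0:          # each child blocks reading the SAME socket
        a.close()
        data = b.recv(1024)
        print(name, 'received:', data)   # only one child gets the response
        os._exit(0)

b.close()
time.sleep(0.1)                 # let both children block in recv()
a.sendall(b'response')          # a single response; the kernel picks one reader
a.close()                       # the other child sees EOF (b'') and exits
os.wait()
os.wait()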

Solution

To fix this, socket creation can be deferred until write time, so that different child processes never end up sharing the same socket. A PR with this change has been submitted and merged upstream.
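
The idea of the fix, in sketch form (a simplification of the approach, not the merged PR verbatim): nothing network-related is created in __init__, so a client built in the Celery parent process only opens its socket inside the worker that first writes.

import socket

class LazyTransport(object):
    """Sketch: defer socket creation until the first write()."""

    def __init__(self, host, port):
        self._addr = (host, port)
        self._sock = None                 # no socket yet -> nothing to inherit on fork

    def write(self, data):
        if self._sock is None:            # first write happens post-fork,
            self._sock = socket.create_connection(self._addr)  # in the child itself
        self._sock.sendall(data)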