Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,21 @@ static void DeallocateCq(ibv_cq* cq) {
LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ: " << berror(err);
}

static int DrainCq(ibv_cq* cq) {
if (NULL == cq) {
return 0;
}

ibv_wc wc;
int ret;
do {
ret = ibv_poll_cq(cq, 1, &wc);
} while (ret > 0);

LOG_IF(ERROR, ret < 0) << "drain CQ failed: " << ret;
return ret;
}

void RdmaEndpoint::DeallocateResources() {
if (!_resource) {
return;
Expand All @@ -1341,6 +1356,7 @@ void RdmaEndpoint::DeallocateResources() {
}

bool remove_consumer = true;
_reclaim:
if (!move_to_rdma_resource_list) {
if (NULL != _resource->qp) {
int err = IbvDestroyQp(_resource->qp);
Expand Down Expand Up @@ -1384,6 +1400,24 @@ void RdmaEndpoint::DeallocateResources() {
}

if (move_to_rdma_resource_list) {
// When a QP is moved to the RESET state, all associated send and
// receive queues are flushed, meaning any outstanding WRs are effectively
// abandoned by the hardware.
//
// However, the CQ associated with that QP is *not* cleared automatically,
// meaning that it will still contain entries for WRs that completed before
// the reset.
//
// The application should finish polling the CQ to remove these obsolete
// entries before reusing the QP.
int ret = DrainCq(_resource->polling_cq);
ret += DrainCq(_resource->send_cq);
ret += DrainCq(_resource->recv_cq);
if (ret < 0) {
move_to_rdma_resource_list = false;
goto _reclaim;
Comment on lines +1403 to +1418
}
Comment on lines +1413 to +1419

BAIDU_SCOPED_LOCK(*g_rdma_resource_mutex);
_resource->next = g_rdma_resource_list;
g_rdma_resource_list = _resource;
Expand Down
Loading