Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch from fine grained locking throughout the code base to device and domain level locking #743

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions include/nccl_ofi.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,12 @@ struct nccl_net_ofi_domain {
int (*create_endpoint)(nccl_net_ofi_domain_t *domain,
nccl_net_ofi_ep_t **ep);

/* hash table of active endpoints. We reuse endpoints based
* on the thread that calls get_ep().
*/
nccl_net_ofi_ep_t *endpoint_table;
/* endpoint used for (at a minimum) receiving connection
messages. Send/Recv protocol uses this for all
communication. The rdma protocol uses this for all tx
requests and all connection-establishment requests, but may
have additional endpoints for the rx side of rdma writes. */
nccl_net_ofi_ep_t *endpoint;

/* thread id of the thread that called get_domain(). Used as
the hash key for the domain hash */
Expand Down
42 changes: 7 additions & 35 deletions include/nccl_ofi_deque.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ struct nccl_ofi_deque_t {
* locations.
*/
nccl_ofi_deque_elem_t head;
/* Lock for deque operations */
pthread_mutex_t lock;
};
typedef struct nccl_ofi_deque_t nccl_ofi_deque_t;

Expand Down Expand Up @@ -79,17 +77,13 @@ static inline int nccl_ofi_deque_insert_back(nccl_ofi_deque_t *deque, nccl_ofi_d
assert(deque);
assert(deque_elem);

nccl_net_ofi_mutex_lock(&deque->lock);

deque_elem->next = &deque->head;
deque_elem->prev = deque->head.prev;

assert(deque->head.prev);
deque->head.prev->next = deque_elem;
deque->head.prev = deque_elem;

nccl_net_ofi_mutex_unlock(&deque->lock);

return 0;
}

Expand All @@ -104,22 +98,18 @@ static inline int nccl_ofi_deque_insert_front(nccl_ofi_deque_t *deque, nccl_ofi_
assert(deque);
assert(deque_elem);

nccl_net_ofi_mutex_lock(&deque->lock);

deque_elem->next = deque->head.next;
deque_elem->prev = &deque->head;

assert(deque->head.next);
deque->head.next->prev = deque_elem;
deque->head.next = deque_elem;

nccl_net_ofi_mutex_unlock(&deque->lock);

return 0;
}

/*
* Check if the deque is empty. This call does not take the mutex.
* Check if the deque is empty.
*
* @return true if empty, false if not
*/
Expand All @@ -138,28 +128,21 @@ static inline int nccl_ofi_deque_remove_front(nccl_ofi_deque_t *deque, nccl_ofi_
assert(deque);
assert(deque_elem);

/* Shortcut to avoid taking mutex for empty deque */
if (nccl_ofi_deque_isempty(deque)) {
*deque_elem = NULL;
return 0;
}

nccl_net_ofi_mutex_lock(&deque->lock);

/* Check for empty deque. We need to do this again because the check above
was before we acquired the lock. */
if (nccl_ofi_deque_isempty(deque)) {
*deque_elem = NULL;
goto unlock;
}

*deque_elem = deque->head.next;
/* gcc 7.3.1 (AL2) flags *deque_elem as possibly NULL here.
* We have not been able to determine why, so this check exists
* only to keep the compiler happy. */
if (OFI_UNLIKELY(*deque_elem == NULL)) {
return 0;
}
deque->head.next = (*deque_elem)->next;
(*deque_elem)->next->prev = &deque->head;

unlock:
nccl_net_ofi_mutex_unlock(&deque->lock);

return 0;
}

Expand All @@ -171,8 +154,6 @@ static inline void nccl_ofi_deque_remove(nccl_ofi_deque_t *deque, nccl_ofi_deque
assert(deque);
assert(deque_elem);

nccl_net_ofi_mutex_lock(&deque->lock);

assert(deque_elem->prev && deque_elem->next);

deque_elem->prev->next = deque_elem->next;
Expand All @@ -183,8 +164,6 @@ static inline void nccl_ofi_deque_remove(nccl_ofi_deque_t *deque, nccl_ofi_deque
/* Reset deque_elem pointers to avoid dangling pointers */
deque_elem->prev = NULL;
deque_elem->next = NULL;

nccl_net_ofi_mutex_unlock(&deque->lock);
}

/**
Expand All @@ -196,15 +175,12 @@ static inline nccl_ofi_deque_elem_t *nccl_ofi_deque_get_front(nccl_ofi_deque_t *

nccl_ofi_deque_elem_t *ret_elem = NULL;

nccl_net_ofi_mutex_lock(&deque->lock);

if (nccl_ofi_deque_isempty(deque)) {
ret_elem = NULL;
} else {
ret_elem = deque->head.next;
}

nccl_net_ofi_mutex_unlock(&deque->lock);
return ret_elem;
}

Expand All @@ -218,15 +194,11 @@ static inline nccl_ofi_deque_elem_t *nccl_ofi_deque_get_next(nccl_ofi_deque_t *d

nccl_ofi_deque_elem_t *ret_elem = NULL;

nccl_net_ofi_mutex_lock(&deque->lock);

ret_elem = deque_elem->next;
if (ret_elem == (&deque->head)) {
ret_elem = NULL;
}

nccl_net_ofi_mutex_unlock(&deque->lock);

return ret_elem;
}

Expand Down
13 changes: 1 addition & 12 deletions include/nccl_ofi_freelist.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ struct nccl_ofi_freelist_t {
size_t reginfo_offset;

size_t memcheck_redzone_size;

pthread_mutex_t lock;
};
typedef struct nccl_ofi_freelist_t nccl_ofi_freelist_t;

Expand Down Expand Up @@ -259,13 +257,11 @@ static inline void *nccl_ofi_freelist_entry_alloc(nccl_ofi_freelist_t *freelist)

assert(freelist);

nccl_net_ofi_mutex_lock(&freelist->lock);

if (!freelist->entries) {
ret = nccl_ofi_freelist_add(freelist, freelist->increase_entry_count);
if (ret != 0) {
NCCL_OFI_WARN("Could not extend freelist: %d", ret);
goto cleanup;
return NULL;
}
}

Expand All @@ -276,9 +272,6 @@ static inline void *nccl_ofi_freelist_entry_alloc(nccl_ofi_freelist_t *freelist)
buf = entry->ptr;
nccl_ofi_freelist_entry_set_undefined(freelist, buf);

cleanup:
nccl_net_ofi_mutex_unlock(&freelist->lock);

return buf;
}

Expand All @@ -297,8 +290,6 @@ static inline void nccl_ofi_freelist_entry_free(nccl_ofi_freelist_t *freelist, v
assert(freelist);
assert(entry_p);

nccl_net_ofi_mutex_lock(&freelist->lock);

if (freelist->have_reginfo) {
entry = (struct nccl_ofi_freelist_elem_t *)((uintptr_t)entry_p + freelist->reginfo_offset);
nccl_net_ofi_mem_defined_unaligned(entry, sizeof(*entry));
Expand All @@ -311,8 +302,6 @@ static inline void nccl_ofi_freelist_entry_free(nccl_ofi_freelist_t *freelist, v
freelist->entries = entry;

nccl_net_ofi_mem_noaccess(entry_p, user_entry_size);

nccl_net_ofi_mutex_unlock(&freelist->lock);
}

#ifdef __cplusplus
Expand Down
3 changes: 0 additions & 3 deletions include/nccl_ofi_idpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ typedef struct nccl_ofi_idpool {
/* ID pool bit array. A bit set in the array indicates
that the ID corresponding to its index is available.*/
uint64_t *ids;

/* Lock for concurrency */
pthread_mutex_t lock;
} nccl_ofi_idpool_t;

/*
Expand Down
1 change: 0 additions & 1 deletion include/nccl_ofi_mr.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ typedef struct nccl_ofi_mr_cache {
size_t used;
uint32_t hit_count;
uint32_t miss_count;
pthread_mutex_t lock;
} nccl_ofi_mr_cache_t;

/**
Expand Down
2 changes: 0 additions & 2 deletions include/nccl_ofi_msgbuff.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ typedef struct {
uint16_t msg_last_incomplete;
// Points to the message after the inserted message with highest sequence number.
uint16_t msg_next;
// Mutex for this msg buffer -- locks all non-init operations
pthread_mutex_t lock;
} nccl_ofi_msgbuff_t;

/**
Expand Down
13 changes: 3 additions & 10 deletions include/nccl_ofi_rdma.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ typedef uint16_t nccl_ofi_rdma_msg_type_t;
* allocate an RDMA memory registration handle with `num_rails`+`num_control_rails` rails.
*/
typedef struct nccl_net_ofi_rdma_mr_handle {
struct nccl_net_ofi_rdma_device *device;

int num_rails;

Expand Down Expand Up @@ -408,12 +409,6 @@ typedef struct nccl_net_ofi_rdma_req {
/* Size of completed request */
size_t size;

/*
* Protect updating critical fields such as size and ncompls when
* network xfer happened over multiple rails
*/
pthread_mutex_t req_lock;

/* State of request */
nccl_net_ofi_rdma_req_state_t state;

Expand Down Expand Up @@ -544,7 +539,6 @@ typedef struct nccl_net_ofi_rdma_send_comm {

nccl_ofi_deque_elem_t cleanup_list_elem;

pthread_mutex_t ctrl_recv_lock;
bool received_close_message;
/* Counters for total sent and received control messages */
uint64_t n_ctrl_received;
Expand Down Expand Up @@ -624,7 +618,6 @@ typedef struct nccl_net_ofi_rdma_recv_comm {
nccl_ofi_deque_elem_t cleanup_list_elem;

/* Counters for total sent and received control messages */
pthread_mutex_t ctrl_counter_lock;
uint64_t n_ctrl_sent;
uint64_t n_ctrl_delivered;

Expand Down Expand Up @@ -701,8 +694,6 @@ struct nccl_net_ofi_ep_rail {
size_t min_bounce_posted;
/* Maximum posted bounce buffers (see RDMA_MAX_POSTED_BOUNCE_BUFFERS) */
size_t max_bounce_posted;
/* Mutex for bounce buffer operations */
pthread_mutex_t bounce_mutex;
};

/*
Expand Down Expand Up @@ -841,6 +832,8 @@ typedef struct nccl_net_ofi_rdma_domain {

/* List of endpoints and set of addresses they have connections to */
nccl_ofi_ep_addr_list_t *ep_addr_list;

pthread_mutex_t rdma_domain_lock;
} nccl_net_ofi_rdma_domain_t;


Expand Down
3 changes: 1 addition & 2 deletions include/nccl_ofi_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ typedef struct nccl_net_ofi_threshold_scheduler {
nccl_net_ofi_scheduler_t base;
/* Round robin counter */
unsigned int rr_counter;
/* Lock for round robin counter */
pthread_mutex_t rr_lock;

/* Minimum size of the message in bytes before message is
* multiplexed */
size_t min_stripe_size;
Expand Down
3 changes: 3 additions & 0 deletions include/nccl_ofi_sendrecv.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ typedef struct nccl_net_ofi_sendrecv_ep {
/* Current available tag ID */
uint64_t tag;

/* copy of device's max_tag to avoid reading device information */
uint64_t max_tag;

/* Endpoint handle to communicate to */
struct fid_ep *ofi_ep;

Expand Down
1 change: 1 addition & 0 deletions m4/check_pkg_libfabric.m4
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ AC_DEFUN([CHECK_PKG_LIBFABRIC], [
FI_OPT_MAX_MSG_SIZE,
FI_OPT_SHARED_MEMORY_PERMITTED,
FI_MR_DMABUF,
FI_PROGRESS_CONTROL_UNIFIED,
FI_OPT_INJECT_RMA_SIZE],
[], [], [AC_INCLUDES_DEFAULT
[#include <rdma/fi_endpoint.h>
Expand Down
15 changes: 0 additions & 15 deletions src/nccl_ofi_deque.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@ int nccl_ofi_deque_init(nccl_ofi_deque_t **deque_p)
deque->head.prev = &deque->head;
deque->head.next = &deque->head;

int ret = pthread_mutex_init(&deque->lock, NULL);
if (ret != 0) {
NCCL_OFI_WARN("Failed to initialize deque mutex.");
free(deque);
return -ret;
}

assert(deque_p);
*deque_p = deque;

Expand All @@ -40,14 +33,6 @@ int nccl_ofi_deque_finalize(nccl_ofi_deque_t *deque)
{
assert(deque);

/* Since user allocates all memory used for deque elements, we don't need to
deallocate any entries here. :D */
int ret = pthread_mutex_destroy(&deque->lock);
if (ret != 0) {
NCCL_OFI_WARN("Failed to destroy deque mutex.");
return -ret;
}

free(deque);
return 0;
}
Loading
Loading