diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
index 9be3a440c42a084ba5c335d05d77218c1a480b21..07e0a122c32fcafe29546a2c02dabbcb93a019c9 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms-tcp.c
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -96,10 +96,7 @@ static bool cbuf_empty(struct cbuf *cb)
 struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
-       struct rw_semaphore sock_sem; /* Stop connect races */
-       struct list_head read_list;   /* On this list when ready for reading */
-       struct list_head write_list;  /* On this list when ready for writing */
-       struct list_head state_list;  /* On this list when ready to connect */
+       struct mutex sock_mutex;        /* Stop connect races */
        unsigned long flags;    /* bit 1,2 = We are on the read/write lists */
 #define CF_READ_PENDING 1
 #define CF_WRITE_PENDING 2
@@ -112,9 +109,10 @@ struct connection {
        struct page *rx_page;
        struct cbuf cb;
        int retries;
-       atomic_t waiting_requests;
 #define MAX_CONNECT_RETRIES 3
        struct connection *othercon;
+       struct work_struct rwork; /* Receive workqueue */
+       struct work_struct swork; /* Send workqueue */
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
 
@@ -131,14 +129,9 @@ struct writequeue_entry {
 
 static struct sockaddr_storage dlm_local_addr;
 
-/* Manage daemons */
-static struct task_struct *recv_task;
-static struct task_struct *send_task;
-
-static wait_queue_t lowcomms_send_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
-static wait_queue_t lowcomms_recv_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
+/* Work queues */
+static struct workqueue_struct *recv_workqueue;
+static struct workqueue_struct *send_workqueue;
 
 /* An array of pointers to connections, indexed by NODEID */
 static struct connection **connections;
@@ -146,17 +139,8 @@ static DECLARE_MUTEX(connections_lock);
 static struct kmem_cache *con_cache;
 static int conn_array_size;
 
-/* List of sockets that have reads pending */
-static LIST_HEAD(read_sockets);
-static DEFINE_SPINLOCK(read_sockets_lock);
-
-/* List of sockets which have writes pending */
-static LIST_HEAD(write_sockets);
-static DEFINE_SPINLOCK(write_sockets_lock);
-
-/* List of sockets which have connects pending */
-static LIST_HEAD(state_sockets);
-static DEFINE_SPINLOCK(state_sockets_lock);
+static void process_recv_sockets(struct work_struct *work);
+static void process_send_sockets(struct work_struct *work);
 
 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 {
@@ -186,9 +170,11 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
                        goto finish;
 
                con->nodeid = nodeid;
-               init_rwsem(&con->sock_sem);
+               mutex_init(&con->sock_mutex);
                INIT_LIST_HEAD(&con->writequeue);
                spin_lock_init(&con->writequeue_lock);
+               INIT_WORK(&con->swork, process_send_sockets);
+               INIT_WORK(&con->rwork, process_recv_sockets);
 
                connections[nodeid] = con;
        }
@@ -203,41 +189,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused)
 {
        struct connection *con = sock2con(sk);
 
-       atomic_inc(&con->waiting_requests);
-       if (test_and_set_bit(CF_READ_PENDING, &con->flags))
-               return;
-
-       spin_lock_bh(&read_sockets_lock);
-       list_add_tail(&con->read_list, &read_sockets);
-       spin_unlock_bh(&read_sockets_lock);
-
-       wake_up_interruptible(&lowcomms_recv_waitq);
+       if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+               queue_work(recv_workqueue, &con->rwork);
 }
 
 static void lowcomms_write_space(struct sock *sk)
 {
        struct connection *con = sock2con(sk);
 
-       if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
-               return;
-
-       spin_lock_bh(&write_sockets_lock);
-       list_add_tail(&con->write_list, &write_sockets);
-       spin_unlock_bh(&write_sockets_lock);
-
-       wake_up_interruptible(&lowcomms_send_waitq);
+       if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
+               queue_work(send_workqueue, &con->swork);
 }
 
 static inline void lowcomms_connect_sock(struct connection *con)
 {
-       if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
-               return;
-
-       spin_lock_bh(&state_sockets_lock);
-       list_add_tail(&con->state_list, &state_sockets);
-       spin_unlock_bh(&state_sockets_lock);
-
-       wake_up_interruptible(&lowcomms_send_waitq);
+       if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
+               queue_work(send_workqueue, &con->swork);
 }
 
 static void lowcomms_state_change(struct sock *sk)
@@ -279,7 +246,7 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 /* Close a remote connection and tidy up */
 static void close_connection(struct connection *con, bool and_other)
 {
-       down_write(&con->sock_sem);
+       mutex_lock(&con->sock_mutex);
 
        if (con->sock) {
                sock_release(con->sock);
@@ -294,24 +261,27 @@ static void close_connection(struct connection *con, bool and_other)
                con->rx_page = NULL;
        }
        con->retries = 0;
-       up_write(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
 }
 
 /* Data received from remote end */
 static int receive_from_sock(struct connection *con)
 {
        int ret = 0;
-       struct msghdr msg;
-       struct iovec iov[2];
-       mm_segment_t fs;
+       struct msghdr msg = {};
+       struct kvec iov[2];
        unsigned len;
        int r;
        int call_again_soon = 0;
+       int nvec;
 
-       down_read(&con->sock_sem);
+       mutex_lock(&con->sock_mutex);
+
+       if (con->sock == NULL) {
+               ret = -EAGAIN;
+               goto out_close;
+       }
 
-       if (con->sock == NULL)
-               goto out;
        if (con->rx_page == NULL) {
                /*
                 * This doesn't need to be atomic, but I think it should
@@ -323,21 +293,13 @@ static int receive_from_sock(struct connection *con)
                cbuf_init(&con->cb, PAGE_CACHE_SIZE);
        }
 
-       msg.msg_control = NULL;
-       msg.msg_controllen = 0;
-       msg.msg_iovlen = 1;
-       msg.msg_iov = iov;
-       msg.msg_name = NULL;
-       msg.msg_namelen = 0;
-       msg.msg_flags = 0;
-
        /*
         * iov[0] is the bit of the circular buffer between the current end
         * point (cb.base + cb.len) and the end of the buffer.
         */
        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
-       iov[1].iov_len = 0;
+       nvec = 1;
 
        /*
         * iov[1] is the bit of the circular buffer between the start of the
@@ -347,18 +309,18 @@ static int receive_from_sock(struct connection *con)
                iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
                iov[1].iov_len = con->cb.base;
                iov[1].iov_base = page_address(con->rx_page);
-               msg.msg_iovlen = 2;
+               nvec = 2;
        }
        len = iov[0].iov_len + iov[1].iov_len;
 
-       fs = get_fs();
-       set_fs(get_ds());
-       r = ret = sock_recvmsg(con->sock, &msg, len,
+       r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
                               MSG_DONTWAIT | MSG_NOSIGNAL);
-       set_fs(fs);
 
        if (ret <= 0)
                goto out_close;
+       if (ret == -EAGAIN)
+               goto out_resched;
+
        if (ret == len)
                call_again_soon = 1;
        cbuf_add(&con->cb, ret);
@@ -381,24 +343,26 @@ static int receive_from_sock(struct connection *con)
                con->rx_page = NULL;
        }
 
-out:
        if (call_again_soon)
                goto out_resched;
-       up_read(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        return 0;
 
 out_resched:
-       lowcomms_data_ready(con->sock->sk, 0);
-       up_read(&con->sock_sem);
-       cond_resched();
-       return 0;
+       if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+               queue_work(recv_workqueue, &con->rwork);
+       mutex_unlock(&con->sock_mutex);
+       return -EAGAIN;
 
 out_close:
-       up_read(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
                close_connection(con, false);
                /* Reconnect when there is something to send */
        }
+       /* Don't return success if we really got EOF */
+       if (ret == 0)
+               ret = -EAGAIN;
 
        return ret;
 }
@@ -412,6 +376,7 @@ static int accept_from_sock(struct connection *con)
        int len;
        int nodeid;
        struct connection *newcon;
+       struct connection *addcon;
 
        memset(&peeraddr, 0, sizeof(peeraddr));
        result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
@@ -419,7 +384,7 @@ static int accept_from_sock(struct connection *con)
        if (result < 0)
                return -ENOMEM;
 
-       down_read(&con->sock_sem);
+       mutex_lock_nested(&con->sock_mutex, 0);
 
        result = -ENOTCONN;
        if (con->sock == NULL)
@@ -445,7 +410,7 @@ static int accept_from_sock(struct connection *con)
        if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
                printk("dlm: connect from non cluster node\n");
                sock_release(newsock);
-               up_read(&con->sock_sem);
+               mutex_unlock(&con->sock_mutex);
                return -1;
        }
 
@@ -462,7 +427,7 @@ static int accept_from_sock(struct connection *con)
                result = -ENOMEM;
                goto accept_err;
        }
-       down_write(&newcon->sock_sem);
+       mutex_lock_nested(&newcon->sock_mutex, 1);
        if (newcon->sock) {
                struct connection *othercon = newcon->othercon;
 
@@ -470,41 +435,45 @@ static int accept_from_sock(struct connection *con)
                        othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
                        if (!othercon) {
                                printk("dlm: failed to allocate incoming socket\n");
-                               up_write(&newcon->sock_sem);
+                               mutex_unlock(&newcon->sock_mutex);
                                result = -ENOMEM;
                                goto accept_err;
                        }
                        othercon->nodeid = nodeid;
                        othercon->rx_action = receive_from_sock;
-                       init_rwsem(&othercon->sock_sem);
+                       mutex_init(&othercon->sock_mutex);
+                       INIT_WORK(&othercon->swork, process_send_sockets);
+                       INIT_WORK(&othercon->rwork, process_recv_sockets);
                        set_bit(CF_IS_OTHERCON, &othercon->flags);
                        newcon->othercon = othercon;
                }
                othercon->sock = newsock;
                newsock->sk->sk_user_data = othercon;
                add_sock(newsock, othercon);
+               addcon = othercon;
        }
        else {
                newsock->sk->sk_user_data = newcon;
                newcon->rx_action = receive_from_sock;
                add_sock(newsock, newcon);
-
+               addcon = newcon;
        }
 
-       up_write(&newcon->sock_sem);
+       mutex_unlock(&newcon->sock_mutex);
 
        /*
         * Add it to the active queue in case we got data
         * between processing the accept and adding the socket
         * to the read_sockets list
         */
-       lowcomms_data_ready(newsock->sk, 0);
-       up_read(&con->sock_sem);
+       if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
+               queue_work(recv_workqueue, &addcon->rwork);
+       mutex_unlock(&con->sock_mutex);
 
        return 0;
 
 accept_err:
-       up_read(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        sock_release(newsock);
 
        if (result != -EAGAIN)
@@ -525,7 +494,7 @@ static void connect_to_sock(struct connection *con)
                return;
        }
 
-       down_write(&con->sock_sem);
+       mutex_lock(&con->sock_mutex);
        if (con->retries++ > MAX_CONNECT_RETRIES)
                goto out;
 
@@ -548,7 +517,7 @@ static void connect_to_sock(struct connection *con)
        sock->sk->sk_user_data = con;
        con->rx_action = receive_from_sock;
 
-       make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);
+       make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
 
        add_sock(sock, con);
 
@@ -577,7 +546,7 @@ out_err:
                result = 0;
        }
 out:
-       up_write(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        return;
 }
 
@@ -616,10 +585,10 @@ static struct socket *create_listen_sock(struct connection *con,
        con->sock = sock;
 
        /* Bind to our port */
-       make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
+       make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
        if (result < 0) {
-               printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
+               printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port);
                sock_release(sock);
                sock = NULL;
                con->sock = NULL;
@@ -638,7 +607,7 @@ static struct socket *create_listen_sock(struct connection *con,
 
        result = sock->ops->listen(sock, 5);
        if (result < 0) {
-               printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
+               printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port);
                sock_release(sock);
                sock = NULL;
                goto create_out;
@@ -709,6 +678,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
        if (!con)
                return NULL;
 
+       spin_lock(&con->writequeue_lock);
        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
        if ((&e->list == &con->writequeue) ||
            (PAGE_CACHE_SIZE - e->end < len)) {
@@ -747,6 +717,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
        struct connection *con = e->con;
        int users;
 
+       spin_lock(&con->writequeue_lock);
        users = --e->users;
        if (users)
                goto out;
@@ -754,12 +725,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
        kunmap(e->page);
        spin_unlock(&con->writequeue_lock);
 
-       if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
-               spin_lock_bh(&write_sockets_lock);
-               list_add_tail(&con->write_list, &write_sockets);
-               spin_unlock_bh(&write_sockets_lock);
-
-               wake_up_interruptible(&lowcomms_send_waitq);
+       if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
+               queue_work(send_workqueue, &con->swork);
        }
        return;
 
@@ -783,7 +750,7 @@ static void send_to_sock(struct connection *con)
        struct writequeue_entry *e;
        int len, offset;
 
-       down_read(&con->sock_sem);
+       mutex_lock(&con->sock_mutex);
        if (con->sock == NULL)
                goto out_connect;
 
@@ -800,6 +767,7 @@ static void send_to_sock(struct connection *con)
                offset = e->offset;
                BUG_ON(len == 0 && e->users == 0);
                spin_unlock(&con->writequeue_lock);
+               kmap(e->page);
 
                ret = 0;
                if (len) {
@@ -828,18 +796,18 @@ static void send_to_sock(struct connection *con)
        }
        spin_unlock(&con->writequeue_lock);
 out:
-       up_read(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        return;
 
 send_error:
-       up_read(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        close_connection(con, false);
        lowcomms_connect_sock(con);
        return;
 
 out_connect:
-       up_read(&con->sock_sem);
-       lowcomms_connect_sock(con);
+       mutex_unlock(&con->sock_mutex);
+       connect_to_sock(con);
        return;
 }
 
@@ -872,7 +840,6 @@ int dlm_lowcomms_close(int nodeid)
        if (con) {
                clean_one_writequeue(con);
                close_connection(con, true);
-               atomic_set(&con->waiting_requests, 0);
        }
        return 0;
 
@@ -880,102 +847,29 @@ out:
        return -1;
 }
 
-/* API send message call, may queue the request */
-/* N.B. This is the old interface - use the new one for new calls */
-int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
-{
-       struct writequeue_entry *e;
-       char *b;
-
-       e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
-       if (e) {
-               memcpy(b, buf, len);
-               dlm_lowcomms_commit_buffer(e);
-               return 0;
-       }
-       return -ENOBUFS;
-}
-
 /* Look for activity on active sockets */
-static void process_sockets(void)
+static void process_recv_sockets(struct work_struct *work)
 {
-       struct list_head *list;
-       struct list_head *temp;
-       int count = 0;
-
-       spin_lock_bh(&read_sockets_lock);
-       list_for_each_safe(list, temp, &read_sockets) {
-
-               struct connection *con =
-                       list_entry(list, struct connection, read_list);
-               list_del(&con->read_list);
-               clear_bit(CF_READ_PENDING, &con->flags);
-
-               spin_unlock_bh(&read_sockets_lock);
-
-               /* This can reach zero if we are processing requests
-                * as they come in.
-                */
-               if (atomic_read(&con->waiting_requests) == 0) {
-                       spin_lock_bh(&read_sockets_lock);
-                       continue;
-               }
-
-               do {
-                       con->rx_action(con);
-
-                       /* Don't starve out everyone else */
-                       if (++count >= MAX_RX_MSG_COUNT) {
-                               cond_resched();
-                               count = 0;
-                       }
+       struct connection *con = container_of(work, struct connection, rwork);
+       int err;
 
-               } while (!atomic_dec_and_test(&con->waiting_requests) &&
-                        !kthread_should_stop());
-
-               spin_lock_bh(&read_sockets_lock);
-       }
-       spin_unlock_bh(&read_sockets_lock);
+       clear_bit(CF_READ_PENDING, &con->flags);
+       do {
+               err = con->rx_action(con);
+       } while (!err);
 }
 
-/* Try to send any messages that are pending
- */
-static void process_output_queue(void)
-{
-       struct list_head *list;
-       struct list_head *temp;
-
-       spin_lock_bh(&write_sockets_lock);
-       list_for_each_safe(list, temp, &write_sockets) {
-               struct connection *con =
-                       list_entry(list, struct connection, write_list);
-               clear_bit(CF_WRITE_PENDING, &con->flags);
-               list_del(&con->write_list);
 
-               spin_unlock_bh(&write_sockets_lock);
-               send_to_sock(con);
-               spin_lock_bh(&write_sockets_lock);
-       }
-       spin_unlock_bh(&write_sockets_lock);
-}
-
-static void process_state_queue(void)
+static void process_send_sockets(struct work_struct *work)
 {
-       struct list_head *list;
-       struct list_head *temp;
-
-       spin_lock_bh(&state_sockets_lock);
-       list_for_each_safe(list, temp, &state_sockets) {
-               struct connection *con =
-                       list_entry(list, struct connection, state_list);
-               list_del(&con->state_list);
-               clear_bit(CF_CONNECT_PENDING, &con->flags);
-               spin_unlock_bh(&state_sockets_lock);
+       struct connection *con = container_of(work, struct connection, swork);
 
+       if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
                connect_to_sock(con);
-               spin_lock_bh(&state_sockets_lock);
        }
-       spin_unlock_bh(&state_sockets_lock);
+
+       clear_bit(CF_WRITE_PENDING, &con->flags);
+       send_to_sock(con);
 }
 
 
@@ -992,109 +886,33 @@ static void clean_writequeues(void)
        }
 }
 
-static int read_list_empty(void)
-{
-       int status;
-
-       spin_lock_bh(&read_sockets_lock);
-       status = list_empty(&read_sockets);
-       spin_unlock_bh(&read_sockets_lock);
-
-       return status;
-}
-
-/* DLM Transport comms receive daemon */
-static int dlm_recvd(void *data)
+static void work_stop(void)
 {
-       init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
-       add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
-
-       while (!kthread_should_stop()) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               if (read_list_empty())
-                       cond_resched();
-               set_current_state(TASK_RUNNING);
-
-               process_sockets();
-       }
-
-       return 0;
+       destroy_workqueue(recv_workqueue);
+       destroy_workqueue(send_workqueue);
 }
 
-static int write_and_state_lists_empty(void)
+static int work_start(void)
 {
-       int status;
-
-       spin_lock_bh(&write_sockets_lock);
-       status = list_empty(&write_sockets);
-       spin_unlock_bh(&write_sockets_lock);
-
-       spin_lock_bh(&state_sockets_lock);
-       if (list_empty(&state_sockets) == 0)
-               status = 0;
-       spin_unlock_bh(&state_sockets_lock);
-
-       return status;
-}
-
-/* DLM Transport send daemon */
-static int dlm_sendd(void *data)
-{
-       init_waitqueue_entry(&lowcomms_send_waitq_head, current);
-       add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
-
-       while (!kthread_should_stop()) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               if (write_and_state_lists_empty())
-                       cond_resched();
-               set_current_state(TASK_RUNNING);
-
-               process_state_queue();
-               process_output_queue();
-       }
-
-       return 0;
-}
-
-static void daemons_stop(void)
-{
-       kthread_stop(recv_task);
-       kthread_stop(send_task);
-}
-
-static int daemons_start(void)
-{
-       struct task_struct *p;
        int error;
-
-       p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
-       error = IS_ERR(p);
+       recv_workqueue = create_workqueue("dlm_recv");
+       error = recv_workqueue ? 0 : -ENOMEM; /* NULL on failure */
        if (error) {
-               log_print("can't start dlm_recvd %d", error);
+               log_print("can't start dlm_recv %d", error);
                return error;
        }
-       recv_task = p;
 
-       p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
-       error = IS_ERR(p);
+       send_workqueue = create_singlethread_workqueue("dlm_send");
+       error = send_workqueue ? 0 : -ENOMEM; /* NULL on failure */
        if (error) {
-               log_print("can't start dlm_sendd %d", error);
-               kthread_stop(recv_task);
+               log_print("can't start dlm_send %d", error);
+               destroy_workqueue(recv_workqueue);
                return error;
        }
-       send_task = p;
 
        return 0;
 }
 
-/*
- * Return the largest buffer size we can cope with.
- */
-int lowcomms_max_buffer_size(void)
-{
-       return PAGE_CACHE_SIZE;
-}
-
 void dlm_lowcomms_stop(void)
 {
        int i;
@@ -1107,7 +925,7 @@ void dlm_lowcomms_stop(void)
                        connections[i]->flags |= 0xFF;
        }
 
-       daemons_stop();
+       work_stop();
        clean_writequeues();
 
        for (i = 0; i < conn_array_size; i++) {
@@ -1159,7 +977,7 @@ int dlm_lowcomms_start(void)
        if (error)
                goto fail_unlisten;
 
-       error = daemons_start();
+       error = work_start();
        if (error)
                goto fail_unlisten;