Logo Search packages:      
Sourcecode: zookeeper version File versions  Download package

ZOOAPI int zookeeper_interest ( zhandle_t zh,
int *  fd,
int *  interest,
struct timeval *  tv 
)

Returns the events that zookeeper is interested in.

Parameters:
zh the zookeeper handle obtained by a call to zookeeper_init
fd is the file descriptor of interest
interest is an or of the ZOOKEEPER_WRITE and ZOOKEEPER_READ flags to indicate the I/O of interest on fd.
tv a timeout value to be used with select/poll system call
Returns:
a result code. ZOK - success ZBADARGUMENTS - invalid input parameters ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE ZCONNECTIONLOSS - a network error occured while attempting to establish a connection to the server ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory ZOPERATIONTIMEOUT - hasn't received anything from the server for 2/3 of the timeout value specified in zookeeper_init() ZSYSTEMERROR -- a system (OS) error occured; it's worth checking errno to get details

Definition at line 1323 of file zookeeper.c.

References _zhandle::addrs, _zhandle::addrs_count, _zhandle::connect_index, _zhandle::fd, is_unrecoverable(), _zhandle::last_ping, _zhandle::last_recv, _zhandle::last_send, _zhandle::next_deadline, _zhandle::recv_timeout, _zhandle::state, _zhandle::to_send, ZBADARGUMENTS, ZCONNECTIONLOSS, ZINVALIDSTATE, ZOK, zookeeper_interest(), and ZOPERATIONTIMEOUT.

Referenced by zookeeper_interest().

{
    struct timeval now;
    if(zh==0 || fd==0 ||interest==0 || tv==0)
        return ZBADARGUMENTS;
    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;
    gettimeofday(&now, 0);
    if(zh->next_deadline.tv_sec!=0 || zh->next_deadline.tv_usec!=0){
        int time_left = calculate_interval(&zh->next_deadline, &now);
        if (time_left > 10)
            LOG_WARN(("Exceeded deadline by %dms", time_left));
    }
    api_prolog(zh);
    *fd = zh->fd;
    *interest = 0;
    tv->tv_sec = 0;
    tv->tv_usec = 0;
    if (*fd == -1) {
        if (zh->connect_index == zh->addrs_count) {
            /* Wait a bit before trying again so that we don't spin */
            zh->connect_index = 0;
        }else {
            int rc;
            int on = 1;
            
            zh->fd = socket(PF_INET, SOCK_STREAM, 0);
            setsockopt(zh->fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(int));
            fcntl(zh->fd, F_SETFL, O_NONBLOCK|fcntl(zh->fd, F_GETFL, 0));
            rc = connect(zh->fd, &zh->addrs[zh->connect_index],
                    sizeof(struct sockaddr));
            if (rc == -1) {
                if (errno == EWOULDBLOCK || errno == EINPROGRESS)
                    zh->state = ZOO_CONNECTING_STATE;
                else
                    return api_epilog(zh,handle_socket_error_msg(zh,__LINE__,
                            ZCONNECTIONLOSS,"connect() call failed"));
            } else {
                if((rc=prime_connection(zh))!=0)
                    return api_epilog(zh,rc);

                LOG_INFO(("Initiated connection to server [%s]", 
                        format_endpoint_info(&zh->addrs[zh->connect_index])));
            }
        }
        *fd = zh->fd;
        *tv = get_timeval(zh->recv_timeout/3);
        zh->last_recv = now;
        zh->last_send = now;
        zh->last_ping = now;
    }
    if (zh->fd != -1) {
        int idle_recv = calculate_interval(&zh->last_recv, &now);
        int idle_send = calculate_interval(&zh->last_send, &now);
        int recv_to = zh->recv_timeout*2/3 - idle_recv;
        int send_to = zh->recv_timeout/3;
        // have we exceeded the receive timeout threshold?
        if (recv_to <= 0) {
            // We gotta cut our losses and connect to someone else
            errno = ETIMEDOUT;               
            *fd=-1;
            *interest=0;
            *tv = get_timeval(0);
            return api_epilog(zh,handle_socket_error_msg(zh,
                    __LINE__,ZOPERATIONTIMEOUT,
                    "connection timed out (exceeded timeout by %dms)",-recv_to));
        }
        // We only allow 1/3 of our timeout time to expire before sending
        // a PING
        if (zh->state==ZOO_CONNECTED_STATE) {
            send_to = zh->recv_timeout/3 - idle_send;
            if (send_to <= 0 && zh->sent_requests.head==0) {
//                LOG_DEBUG(("Sending PING to %s (exceeded idle by %dms)",
//                                format_current_endpoint_info(zh),-send_to));
                int rc=send_ping(zh);
                if (rc < 0){
                    LOG_ERROR(("failed to send PING request (zk retcode=%d)",rc));
                    return api_epilog(zh,rc);
                }
                send_to = zh->recv_timeout/3;
            }
        }
        // choose the lesser value as the timeout
        *tv = get_timeval(recv_to < send_to? recv_to:send_to);
        zh->next_deadline.tv_sec = now.tv_sec + tv->tv_sec;
        zh->next_deadline.tv_usec = now.tv_usec + tv->tv_usec;
        if (zh->next_deadline.tv_usec > 1000000) {
            zh->next_deadline.tv_sec += zh->next_deadline.tv_usec / 1000000;
            zh->next_deadline.tv_usec = zh->next_deadline.tv_usec % 1000000;
        }
        *interest = ZOOKEEPER_READ;
        if (zh->to_send.head || zh->state == ZOO_CONNECTING_STATE) {
            *interest |= ZOOKEEPER_WRITE;
        }
    }
    return api_epilog(zh,ZOK);
}


Generated by  Doxygen 1.6.0   Back to index