Logo Search packages:      
Sourcecode: zookeeper version File versions  Download package

ZOOAPI int zookeeper_process ( zhandle_t * zh,
int  events 
)

Notifies zookeeper that an event of interest has happened.

Parameters:
zh the zookeeper handle obtained by a call to zookeeper_init
events will be an OR of the ZOOKEEPER_WRITE and ZOOKEEPER_READ flags.
Returns:
a result code. ZOK - success; ZBADARGUMENTS - invalid input parameters; ZINVALIDSTATE - zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE; ZCONNECTIONLOSS - a network error occurred while attempting to send a request to the server; ZSESSIONEXPIRED - connection attempt failed -- the session has expired; ZAUTHFAILED - authentication request failed, i.e. invalid credentials; ZRUNTIMEINCONSISTENCY - a server response came out of order; ZSYSTEMERROR -- a system (OS) error occurred; it's worth checking errno to get details; ZNOTHING -- not an error; simply indicates that there is no more data from the server to be processed (when called with the ZOOKEEPER_READ flag).

Definition at line 1723 of file zookeeper.c.

References _buffer_list::buffer, _zhandle::completions_to_process, _buffer_list::curr_offset, is_unrecoverable(), _zhandle::last_ping, _zhandle::last_zxid, _zhandle::outstanding_sync, _zhandle::sent_requests, _zhandle::to_process, _zhandle::watcher, ZAUTHFAILED, ZBADARGUMENTS, ZINVALIDSTATE, ZOK, zookeeper_process(), and ZRUNTIMEINCONSISTENCY.

Referenced by zookeeper_process().

{
    /*
     * Handles the I/O events reported for the handle and then drains
     * zh->to_process, dispatching each received reply buffer according to
     * the xid in its ReplyHeader:
     *   - WATCHER_EVENT_XID: queue a watcher notification completion;
     *   - SET_WATCHES_XID:   nothing to do, the buffer is released;
     *   - AUTH_XID:          run the auth completion, fail if the handle
     *                        became unrecoverable;
     *   - anything else:     match against the head of zh->sent_requests and
     *                        either queue it for asynchronous completion or
     *                        fill in and signal the waiting synchronous caller.
     *
     * Returns ZBADARGUMENTS for a NULL handle, ZINVALIDSTATE if the handle is
     * unrecoverable on entry, the non-ZOK result of check_events(),
     * ZAUTHFAILED if authentication left the handle unrecoverable, the result
     * of handle_socket_error_msg() (invoked with ZRUNTIMEINCONSISTENCY) for an
     * out-of-order reply, and ZOK otherwise.
     *
     * Every exit taken after api_prolog() goes through api_epilog().
     * NOTE(review): presumably these bracket handle reference accounting --
     * confirm against their definitions.
     */
    buffer_list_t *bptr;
    int rc;

    if (zh==NULL)
        return ZBADARGUMENTS;
    if (is_unrecoverable(zh))
        return ZINVALIDSTATE;
    api_prolog(zh);
    IF_DEBUG(checkResponseLatency(zh));
    rc = check_events(zh, events);
    if (rc!=ZOK)
        return api_epilog(zh, rc);

    IF_DEBUG(isSocketReadable(zh));

    /* rc still holds ZOK here and is never reassigned inside the loop, so the
     * loop ends when the queue is empty or when an error path returns. */
    while (rc >= 0 && (bptr=dequeue_buffer(&zh->to_process))) {
        struct ReplyHeader hdr;
        struct iarchive *ia = create_buffer_iarchive(
                                    bptr->buffer, bptr->curr_offset);
        deserialize_ReplyHeader(ia, "hdr", &hdr);
        /* Only positive zxids are recorded as the last seen zxid. */
        if (hdr.zxid > 0) {
            zh->last_zxid = hdr.zxid;
        } else {
            // fprintf(stderr, "Got %llx for %x\n", hdr.zxid, hdr.xid);
        }

        if (hdr.xid == WATCHER_EVENT_XID) {
            LOG_DEBUG(("Processing WATCHER_EVENT"));

            struct WatcherEvent evt;
            int type;
            char *path;
            deserialize_WatcherEvent(ia, "event", &evt);
            type = evt.type;
            path = evt.path;
            /* We are doing a notification, so there is no pending request */
            completion_list_t *c =
                create_completion_entry(WATCHER_EVENT_XID,-1,0,0,0);
            /* The completion entry takes over the reply buffer. */
            c->buffer = bptr;
            c->c.watcher_result = collectWatchers(zh, type, path);

            // We cannot free until now, otherwise path will become invalid
            deallocate_WatcherEvent(&evt);
            queue_completion(&zh->completions_to_process, c, 0);
        } else if (hdr.xid == SET_WATCHES_XID) {
            LOG_DEBUG(("Processing SET_WATCHES"));
            free_buffer(bptr);
        } else if (hdr.xid == AUTH_XID){
            LOG_DEBUG(("Processing AUTH_XID"));

            /* special handling for the AUTH response as it may come back
             * out-of-band */
            auth_completion_func(hdr.err,zh);
            free_buffer(bptr);
            /* authentication completion may change the connection state to
             * unrecoverable */
            if(is_unrecoverable(zh)){
                handle_error(zh, ZAUTHFAILED);
                close_buffer_iarchive(&ia);
                return api_epilog(zh, ZAUTHFAILED);
            }
        } else {
            /* Server-side result of the request; kept distinct from the
             * function-level rc that drives the loop condition. */
            int reply_rc = hdr.err;
            /* Find the request corresponding to the response */
            completion_list_t *cptr = dequeue_completion(&zh->sent_requests);
            assert(cptr);
            /* The requests are going to come back in order */
            if (cptr->xid != hdr.xid) {
                LOG_DEBUG(("Processing unexpected or out-of-order response!"));

                // received unexpected (or out-of-order) response
                close_buffer_iarchive(&ia);
                free_buffer(bptr);
                // put the completion back on the queue (so it gets properly
                // signaled and deallocated) and disconnect from the server
                queue_completion(&zh->sent_requests,cptr,1);
                /* The expected xid is the one of the queued request
                 * (cptr->xid); the received one comes from the reply header
                 * (hdr.xid). The result is routed through api_epilog() like
                 * every other exit taken after api_prolog(). */
                return api_epilog(zh,
                        handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
                        "unexpected server response: expected %x, but received %x",
                        cptr->xid,hdr.xid));
            }

            activateWatcher(zh, cptr->watcher, reply_rc);

            if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
                /* Asynchronous request (or ping). */
                if(hdr.xid == PING_XID){
                    struct timeval now;
                    gettimeofday(&now, 0);
                    int elapsed = calculate_interval(&zh->last_ping, &now);
                    LOG_DEBUG(("Got ping response in %d ms", elapsed));

                    // Nothing to do with a ping response
                    free_buffer(bptr);
                    destroy_completion_entry(cptr);
                } else {
                    LOG_DEBUG(("Queueing asynchronous response"));

                    /* The completion entry takes over the reply buffer. */
                    cptr->buffer = bptr;
                    queue_completion(&zh->completions_to_process, cptr, 0);
                }
            } else {
                /* Synchronous request: decode the reply into the
                 * sync_completion hung off cptr->data, then signal it. */
                struct sync_completion
                        *sc = (struct sync_completion*)cptr->data;
                sc->rc = reply_rc;
                switch(cptr->completion_type) {
                case COMPLETION_DATA:
                    LOG_DEBUG(("Calling COMPLETION_DATA for xid=%x rc=%d",
                               cptr->xid, reply_rc));
                    if (reply_rc==0) {
                        struct GetDataResponse res;
                        int len;
                        deserialize_GetDataResponse(ia, "reply", &res);
                        /* Copy at most buff_len bytes into the caller's
                         * buffer and report the copied length back. */
                        if (res.data.len <= sc->u.data.buff_len) {
                            len = res.data.len;
                        } else {
                            len = sc->u.data.buff_len;
                        }
                        sc->u.data.buff_len = len;
                        // check if len is negative
                        // just of NULL which is -1 int
                        if (len == -1) {
                            sc->u.data.buffer = NULL;
                        } else {
                            memcpy(sc->u.data.buffer, res.data.buff, len);
                        }
                        sc->u.data.stat = res.stat;
                        deallocate_GetDataResponse(&res);
                    }
                    break;
                case COMPLETION_STAT:
                    LOG_DEBUG(("Calling COMPLETION_STAT for xid=%x rc=%d",
                               cptr->xid, reply_rc));
                    if (reply_rc == 0) {
                        struct SetDataResponse res;
                        deserialize_SetDataResponse(ia, "reply", &res);
                        sc->u.stat = res.stat;
                        deallocate_SetDataResponse(&res);
                    }
                    break;
                case COMPLETION_STRINGLIST:
                    LOG_DEBUG(("Calling COMPLETION_STRINGLIST for xid=%x rc=%d",
                               cptr->xid, reply_rc));
                    if (reply_rc == 0) {
                        struct GetChildrenResponse res;
                        deserialize_GetChildrenResponse(ia, "reply", &res);
                        sc->u.strs = res.children;
                        /* We don't deallocate since we are passing it back */
                        // deallocate_GetChildrenResponse(&res);
                    }
                    break;
                case COMPLETION_STRING:
                    LOG_DEBUG(("Calling COMPLETION_STRING for xid=%x rc=%d",
                               cptr->xid, reply_rc));
                    if (reply_rc == 0) {
                        struct CreateResponse res;
                        int len;
                        deserialize_CreateResponse(ia, "reply", &res);
                        /* Truncate the path to the caller's str_len,
                         * always leaving room for the terminating NUL. */
                        len = strlen(res.path) + 1;
                        if (len > sc->u.str.str_len) {
                            len = sc->u.str.str_len;
                        }
                        if (len > 0) {
                            memcpy(sc->u.str.str, res.path, len - 1);
                            sc->u.str.str[len - 1] = '\0';
                        }
                        deallocate_CreateResponse(&res);
                    }
                    break;
                case COMPLETION_ACLLIST:
                    LOG_DEBUG(("Calling COMPLETION_ACLLIST for xid=%x rc=%d",
                               cptr->xid, reply_rc));
                    if (reply_rc == 0) {
                        struct GetACLResponse res;
                        deserialize_GetACLResponse(ia, "reply", &res);
                        sc->u.acl.acl = res.acl;
                        sc->u.acl.stat = res.stat;
                        /* We don't deallocate since we are passing it back */
                        //deallocate_GetACLResponse(&res);
                    }
                    break;
                case COMPLETION_VOID:
                    LOG_DEBUG(("Calling COMPLETION_VOID for xid=%x rc=%d",
                               cptr->xid, reply_rc));
                    break;
                default:
                    LOG_DEBUG(("UNKNOWN response type xid=%x rc=%d",
                               cptr->xid, reply_rc));
                    break;
                }
                notify_sync_completion(sc);
                free_buffer(bptr);
                zh->outstanding_sync--;
                destroy_completion_entry(cptr);
            }
        }

        close_buffer_iarchive(&ia);

    }
    if (process_async(zh->outstanding_sync)) {
        process_completions(zh);
    }
    return api_epilog(zh,ZOK);
}


Generated by  Doxygen 1.6.0   Back to index