// Source listing: BookKeeper.java (zookeeper package)

package org. apache.bookkeeper.client;
/*
 * 
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * 
 */


import java.io.IOException;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
import java.util.Random;
import java.net.InetSocketAddress;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookieHandle;
import org.apache.bookkeeper.client.LedgerSequence;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.LedgerHandle.QMode;
import org.apache.bookkeeper.client.LedgerManagementProcessor.CreateLedgerOp;
import org.apache.bookkeeper.client.LedgerManagementProcessor.OpenLedgerOp;
import org.apache.log4j.Logger;

import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.WatchedEvent;


/**
 * BookKeeper client. We assume there is one single writer 
 * to a ledger at any time. 
 * 
 * There are three possible operations: start a new ledger, 
 * write to a ledger, and read from a ledger.
 * 
 * For the ZooKeeper layout, please refer to BKDefs.java.
 * 
 */

public class BookKeeper 
implements Watcher {
 
    // log4j logger named after this class; declared as an instance field.
    Logger LOG = Logger.getLogger(BookKeeper.class);
    
    // ZooKeeper session; assigned in the constructor and closed by halt().
    ZooKeeper zk = null;
    
    /*
     * The ledgerMngProcessor is a thread that asynchronously processes
     * requests that handle ledgers, such as create, open, and close.
     *
     * NOTE(review): the field is static, so a single processor is shared
     * by every BookKeeper instance, yet getMngProcessor() constructs it
     * with whichever instance asks first -- confirm this is intended.
     */
    private static LedgerManagementProcessor ledgerMngProcessor;
    
    /*
     * Blacklist of bookies. Addresses are added by blackListBookie() and
     * are skipped by getNewBookie().
     */
    HashSet<InetSocketAddress> bookieBlackList;
    
    // Neither field is read or written by the methods of this class;
    // presumably they are used from elsewhere in the package -- TODO confirm.
    LedgerSequence responseRead;
    Long responseLong;
    
    public BookKeeper(String servers) 
    throws KeeperException, IOException{
      LOG.debug("Creating BookKeeper for servers " + servers);
        //Create ZooKeeper object
        this.zk = new ZooKeeper(servers, 10000, this);
        
        //List to enable clients to blacklist bookies
        this.bookieBlackList = new HashSet<InetSocketAddress>();
    }
    
    /**
     * Watcher method. 
     */
00101     synchronized public void process(WatchedEvent event) {
        LOG.debug("Process: " + event.getType() + " " + event.getPath());
    }
    
    /**
     * Formats ledger ID according to ZooKeeper rules
     * 
     * @param id  znode id
     */
00110     String getZKStringId(long id){
        return String.format("%010d", id);        
    }
    
    /**
     * return the zookeeper instance
     * @return return the zookeeper instance
     */
00118     ZooKeeper getZooKeeper() {
        return zk;
    }
    
    /**
     * Returns the ledger management processor, creating and starting it
     * the first time it is requested.
     *
     * NOTE(review): the lazy initialisation is not synchronised, so two
     * threads calling this at once could each create and start a
     * processor -- confirm whether callers are serialised.
     *
     * @return the processor held in the static field
     */
    LedgerManagementProcessor getMngProcessor() {
        if (ledgerMngProcessor != null) {
            return ledgerMngProcessor;
        }

        // First request: build the processor around this client and start it.
        ledgerMngProcessor = new LedgerManagementProcessor(this);
        ledgerMngProcessor.start();
        return ledgerMngProcessor;
    }
    
    /**
     * Creates a new ledger. To create a ledger, we need to specify the ensemble
     * size, the quorum size, the operation mode, and a password. The ensemble size
     * and the quorum size depend upon the operation mode. The operation mode can be
     * GENERIC, VERIFIABLE, or FREEFORM (debugging). The password is used not only
     * to authenticate access to a ledger, but also to verify entries in verifiable
     * ledgers.
     * 
     * @param ensSize   ensemble size
     * @param qSize     quorum size
     * @param mode      quorum mode: VERIFIABLE (default), GENERIC, or FREEFORM
     * @param passwd    password
     */
00143     public LedgerHandle createLedger(int ensSize, int qSize, QMode mode,  byte passwd[])
        throws KeeperException, InterruptedException, 
        IOException, BKException {
        // Check that quorum size follows the minimum
        long t;
        LedgerHandle lh = null;
        
        switch(mode){
        case VERIFIABLE:
            t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/2));
            if(t == 0){
                LOG.error("Tolerates 0 bookie failures"); 
                throw BKException.create(Code.QuorumException);
            }
            break;
        case GENERIC:
            t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/3));
            if(t == 0){
                LOG.error("Tolerates 0 bookie failures"); 
                throw BKException.create(Code.QuorumException);
            }
            break;
        case FREEFORM:
            break;
        }
        /*
         * Create ledger node on ZK.
         * We get the id from the sequence number on the node.
         */
        String path = zk.create(BKDefs.prefix, new byte[0], 
                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        /* 
         * Extract ledger id.
         */
        String parts[] = path.split("/");
        String subparts[] = parts[2].split("L");
        try{
            long lId = Long.parseLong(subparts[1]);
       
            /* 
             * Get children from "/ledgers/available" on zk
             */
            List<String> list = 
                zk.getChildren("/ledgers/available", false);
            ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
            /* 
             * Select ensSize servers to form the ensemble
             */
            path = zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, new byte[0], 
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         
            /* 
             * Add quorum size to ZK metadata
             */
            ByteBuffer bb = ByteBuffer.allocate(4);
            bb.putInt(qSize);
            zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, bb.array(), 
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            /* 
             * Quorum mode
             */
            bb = ByteBuffer.allocate(4);
            bb.putInt(mode.ordinal());
            zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, bb.array(), 
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            /* 
             * Create QuorumEngine
             */
            lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
            
            /*
             * Adding bookies to ledger handle
             */
            Random r = new Random();
        
            for(int i = 0; i < ensSize; i++){
                int index = 0;
                if(list.size() > 1) 
                    index = r.nextInt(list.size() - 1);
                else if(list.size() == 1)
                    index = 0;
                else {
                    LOG.error("Not enough bookies available");
                
                    return null;
                }
            
                try{
                    String bookie = list.remove(index);
                    LOG.info("Bookie: " + bookie);
                    InetSocketAddress tAddr = parseAddr(bookie);
                    int bindex = lh.addBookieForWriting(tAddr); 
                    ByteBuffer bindexBuf = ByteBuffer.allocate(4);
                    bindexBuf.putInt(bindex);
                
                    String pBookie = "/" + bookie;
                    zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + pBookie, bindexBuf.array(), 
                            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (IOException e) {
                    LOG.error(e);
                    i--;
                } 
            }
            LOG.debug("Created new ledger");
        } catch (NumberFormatException e) {
            LOG.error("Error when parsing the ledger identifier", e);
        }
        // Return ledger handler
        return lh; 
    }

    /**
     * Creates a new ledger. Default of 3 servers, and quorum of 2 servers,
     * verifiable ledger.
     * 
     * @param passwd    password
     */
00260     public LedgerHandle createLedger(byte passwd[])
    throws KeeperException, BKException, 
    InterruptedException, IOException {
        return createLedger(3, 2, QMode.VERIFIABLE, passwd);
    }

    /**
     * Asychronous call to create ledger
     * 
     * @param ensSize
     * @param qSize
     * @param mode
     * @param passwd
     * @param cb
     * @param ctx
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException
     * @throws BKException
     */
00280     public void asyncCreateLedger(int ensSize, 
            int qSize, 
            QMode mode,  
            byte passwd[],
            CreateCallback cb,
            Object ctx
            )
    throws KeeperException, InterruptedException, 
    IOException, BKException {
        CreateLedgerOp op = new CreateLedgerOp(ensSize, 
                qSize, 
                mode, 
                passwd, 
                cb, 
                ctx);
        LedgerManagementProcessor lmp = getMngProcessor();
        lmp.addOp(op);
        
    }
    
    /**
     * Open existing ledger for reading. Default for quorum size is 2.
     * 
     * @param long  the long corresponding to the ledger id
     * @param byte[]    byte array corresponding to the password to access a ledger
     * @param int   the quorum size, it has to be at least ceil(n+1/2)
     */
00307     public LedgerHandle openLedger(long lId, byte passwd[])
    throws KeeperException, InterruptedException, IOException, BKException {
        
        Stat stat = null;
        
        /*
         * Check if ledger exists
         */
        if(zk.exists(BKDefs.prefix + getZKStringId(lId), false) == null){
            LOG.error("Ledger " + getZKStringId(lId) + " doesn't exist.");
            throw BKException.create(Code.NoSuchLedgerExistsException);
        }
        
        /*
         * Get quorum size.
         */
        ByteBuffer bb = ByteBuffer.wrap(zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, false, stat));
        int qSize = bb.getInt();
         
        /*
         * Get last entry written from ZK 
         */
        
        long last = 0;
        LOG.debug("Close path: " + BKDefs.prefix + getZKStringId(lId) + BKDefs.close);
        if(zk.exists(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, false) == null){
            recoverLedger(lId, passwd);
        }
            
        stat = null;
        byte[] data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, false, stat);
        ByteBuffer buf = ByteBuffer.wrap(data);
        last = buf.getLong();
        //zk.delete(BKDefs.prefix + getZKStringId(lId) + BKDefs.close, -1);
        
        /*
         * Quorum mode 
         */
        data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
        buf = ByteBuffer.wrap(data);
        
        QMode qMode;
        switch(buf.getInt()){
        case 1:
            qMode = QMode.GENERIC;
            LOG.info("Generic ledger");
            break;
        case 2:
            qMode = QMode.FREEFORM;
            break;
        default:
            qMode = QMode.VERIFIABLE;
            LOG.info("Verifiable ledger");
        }
        
        /*
         *  Create QuorumEngine
         */
        LedgerHandle lh = new LedgerHandle(this, lId, last, qSize, qMode, passwd);
        
        /*
         * Get children of "/ledgers/id/ensemble" 
         */
        
        List<String> list = 
            zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
        
        LOG.debug("Length of list of bookies: " + list.size());
        for(int i = 0 ; i < list.size() ; i++){
            for(String s : list){
                LOG.debug("Extracting bookie: " + s);
                byte[] bindex = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + "/" + s, false, stat);
                ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
                if(bindexBuf.getInt() == i){                      
                    try{
                        lh.addBookieForReading(parseAddr(s));
                    } catch (IOException e){
                        LOG.error(e);
                    }
                }
            }
        }
        
        /*
         * Read changes to quorum over time. To determine if there has been changes during
         * writes to the ledger, check if there is a znode called quorumEvolution.
         */
        if(zk.exists(BKDefs.prefix + 
                getZKStringId(lh.getId()) +  
                BKDefs.quorumEvolution, false) != null){
                    String path = BKDefs.prefix + 
                    getZKStringId(lh.getId()) +  
                    BKDefs.quorumEvolution;
                    
                    List<String> faultList = zk.getChildren(path, false);
                    try{
                        for(String s : faultList){
                            LOG.debug("Faulty list child: " + s);
                            long entry = Long.parseLong(s);
                            String addresses = new String(zk.getData(path + "/" + s, false, stat));
                            String parts[] = addresses.split(" ");

                            ArrayList<BookieHandle> newBookieSet = new ArrayList<BookieHandle>();
                            for(int i = 0 ; i < parts.length ; i++){
                                LOG.debug("Address: " + parts[i]);
                                InetSocketAddress faultyBookie =  
                                    parseAddr(parts[i].substring(1));                           
                        
                                newBookieSet.add(lh.getBookieHandleDup(faultyBookie));
                            }
                            lh.setNewBookieConfig(entry, newBookieSet);
                            LOG.debug("NewBookieSet size: " + newBookieSet.size());
                        }

                        lh.prepareEntryChange();
                    } catch (NumberFormatException e) {
                        LOG.error("Error when parsing the ledger identifier", e);
                    }
                }
      
        /*
         *  Return ledger handler
         */
        return lh;
    }    
    
    /**
     * Asynchronous call to open a ledger. The request is wrapped in an
     * OpenLedgerOp and queued on the ledger management processor.
     *
     * @param lId     the ledger id
     * @param passwd  password
     * @param cb      callback stored in the queued operation
     * @param ctx     context object stored in the queued operation
     * @throws InterruptedException declared by this method
     */
    public void asyncOpenLedger(long lId, byte passwd[], OpenCallback cb, Object ctx)
    throws InterruptedException{
        // Build the request first, then fetch the processor and queue it.
        final OpenLedgerOp request = new OpenLedgerOp(lId, passwd, cb, ctx);
        getMngProcessor().addOp(request);
    }
    
    /**
     * Parses address into IP and port.
     * 
     *  @param addr     String
     */
    
00449     InetSocketAddress parseAddr(String s){
        String parts[] = s.split(":");
        if (parts.length != 2) {
            System.out.println(s
                    + " does not have the form host:port");
        }
        InetSocketAddress addr = new InetSocketAddress(parts[0],
                Integer.parseInt(parts[1]));
        return addr;
    }
    
 
    /**
     * Check if close node exists. 
     * 
     * @param ledgerId  id of the ledger to check
     */
00466     public boolean hasClosed(long ledgerId)
    throws KeeperException, InterruptedException{
        String closePath = BKDefs.prefix + getZKStringId(ledgerId) + BKDefs.close;
        if(zk.exists(closePath, false) == null) return false;
        else return true;
    }
    
    /**
     * Recover a ledger that was not closed properly.
     * 
     * @param lId ledger identifier
     * @param passwd    password
     */
    
00480     boolean recoverLedger(long lId, byte passwd[])
    throws KeeperException, InterruptedException, IOException, BKException {
        
        Stat stat = null;
       
        LOG.info("Recovering ledger");
        
        /*
         * Get quorum size.
         */
        ByteBuffer bb = ByteBuffer.wrap(zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, false, stat));
        int qSize = bb.getInt();
                
        
        /*
         * Get children of "/ledgers/id/ensemble" 
         */
        
        List<String> list = 
            zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
        
        ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
        for(String s : list){
            addresses.add(parseAddr(s));
        }
        
        /*
         * Quorum mode 
         */
        byte[] data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
        ByteBuffer buf = ByteBuffer.wrap(data);
        //int ordinal = buf.getInt();
            
        QMode qMode = QMode.VERIFIABLE;
        switch(buf.getInt()){
        case 0:
            qMode = QMode.VERIFIABLE;
            break;
        case 1:
            qMode = QMode.GENERIC;
            break;
        case 2:
            qMode = QMode.FREEFORM;
            break;
        }
        
        /*
         * Create ledger recovery monitor object
         */
        
        LedgerRecoveryMonitor lrm = new LedgerRecoveryMonitor(this, lId, qSize, addresses, qMode);
        
        return lrm.recover(passwd);
    }
    
    /**
     * Get new bookies
     * 
     * @param addrList  list of bookies to replace
     */
00540     InetSocketAddress getNewBookie(ArrayList<InetSocketAddress> addrList)
    throws InterruptedException {
        try{
            // Get children from "/ledgers/available" on zk
            List<String> list = 
                zk.getChildren("/ledgers/available", false);
            ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
    
            for(String addr : list){
                InetSocketAddress nAddr = parseAddr(addr); 
                if(!addrList.contains(nAddr) &&
                        !bookieBlackList.contains(nAddr))
                    return nAddr;
            }
        } catch (KeeperException e){
            LOG.error("Problem accessing ZooKeeper: " + e);
        }
        
        return null;
    }
    
    // Bookie handles keyed by bookie address. Entries are added by
    // getBookieHandle() and removed by haltBookieHandles(), both of which
    // are synchronized; halt() iterates over the values.
    HashMap<InetSocketAddress, BookieHandle> bhMap = 
      new HashMap<InetSocketAddress, BookieHandle>();
    
    /**
     *  Keeps a list of available BookieHandle objects and returns
     *  the corresponding object given an address.
     *  
     *  @param    a     InetSocketAddress
     */
    
00571     synchronized BookieHandle getBookieHandle(LedgerHandle lh, InetSocketAddress a)
    throws ConnectException, IOException {
      if(!bhMap.containsKey(a)){
          BookieHandle bh = new BookieHandle(a, true); 
            bhMap.put(a, bh);
            bh.start();
      }
      bhMap.get(a).incRefCount(lh);
      
      return bhMap.get(a);
    }
    
    /**
     * When there are no more references to a BookieHandle,
     * remove it from the list. 
     */
    
00588     synchronized void haltBookieHandles(LedgerHandle lh, ArrayList<BookieHandle> bookies){
        while(bookies.size() > 0){
            BookieHandle bh = bookies.remove(0);
            if(bh.halt(lh) <= 0)
                bhMap.remove(bh.addr);
        }
    }
    
    /**
     * Blacklists bookies.
     * 
     * @param addr      address of bookie
     */
00601     void blackListBookie(InetSocketAddress addr){
        bookieBlackList.add(addr);
    }
    
    /**
     * Halts all bookie handles
     * 
     */
00609     public void halt() throws InterruptedException{
        
        for(BookieHandle bh: bhMap.values()){
            bh.shutdown();
        }
        zk.close();
    }
}

// Generated by Doxygen 1.6.0