Heritrix 3.1.0 Source Code Analysis (Part 5)

From the CrawlController object discussed in the previous article, we can see that the crawl job builds its pool of ToeThread worker threads through the ToePool class.

Before looking at the classes behind the crawl thread pool, it is worth first understanding the CrawlController class, because crawl control commands are ultimately issued by calling methods on the CrawlController object.

The members and methods of CrawlController all relate directly to the crawl job; it acts like the control center.

// ApplicationContextAware implementation, for eventing
AbstractApplicationContext appCtx;
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.appCtx = (AbstractApplicationContext)applicationContext;
}

CrawlMetadata metadata;
public CrawlMetadata getMetadata() {
    return metadata;
}
@Autowired
public void setMetadata(CrawlMetadata provider) {
    this.metadata = provider;
}

protected ServerCache serverCache;
public ServerCache getServerCache() {
    return this.serverCache;
}
@Autowired
public void setServerCache(ServerCache serverCache) {
    this.serverCache = serverCache;
}

/**
 * The frontier to use for the crawl.
 */
protected Frontier frontier;
public Frontier getFrontier() {
    return this.frontier;
}
@Autowired
public void setFrontier(Frontier frontier) {
    this.frontier = frontier;
}

/**
 * Scratch directory for temporary overflow-to-disk
 */
protected ConfigPath scratchDir =
    new ConfigPath("scratch subdirectory","scratch");
public ConfigPath getScratchDir() {
    return scratchDir;
}
public void setScratchDir(ConfigPath scratchDir) {
    this.scratchDir = scratchDir;
}

/**
 * Statistics tracking modules.  Any number of specialized statistics
 * trackers that monitor a crawl and write logs, reports and/or provide
 * information to the user interface.
 */
protected StatisticsTracker statisticsTracker;
public StatisticsTracker getStatisticsTracker() {
    return this.statisticsTracker;
}
@Autowired
public void setStatisticsTracker(StatisticsTracker statisticsTracker) {
    this.statisticsTracker = statisticsTracker;
}

protected SeedModule seeds;
public SeedModule getSeeds() {
    return this.seeds;
}
@Autowired
public void setSeeds(SeedModule seeds) {
    this.seeds = seeds;
}

/**
 * Fetch chain
 */
protected FetchChain fetchChain;
public FetchChain getFetchChain() {
    return this.fetchChain;
}
@Autowired
public void setFetchChain(FetchChain fetchChain) {
    this.fetchChain = fetchChain;
}

/**
 * Disposition chain
 */
protected DispositionChain dispositionChain;
public DispositionChain getDispositionChain() {
    return this.dispositionChain;
}
@Autowired
public void setDispositionChain(DispositionChain dispositionChain) {
    this.dispositionChain = dispositionChain;
}

/**
 * Candidate chain
 */
protected CandidateChain candidateChain;
public CandidateChain getCandidateChain() {
    return this.candidateChain;
}
@Autowired
public void setCandidateChain(CandidateChain candidateChain) {
    this.candidateChain = candidateChain;
}

The member variables above are all important: the Spring container (appCtx), the CrawlMetadata, the ServerCache, the Frontier, the SeedModule seed module, the StatisticsTracker, and further down the processing chains FetchChain, DispositionChain and CandidateChain. The members not shown here are mostly crawl-configuration parameters, such as the number of threads.
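In an actual crawl job these collaborators are declared as beans in the job's Spring configuration (crawler-beans.cxml) and injected through the @Autowired setters shown above. As a rough, hand-wired sketch of that setter-injection style (illustration only: imports, the enclosing method, and the many properties each bean itself needs are omitted):

// Illustration only: wiring CrawlController's collaborators by hand, the way
// the Spring container does via the @Autowired setters shown above.
// Imports, the enclosing method and required bean properties are omitted.
CrawlController controller = new CrawlController();
controller.setMetadata(new CrawlMetadata());
controller.setFrontier(new BdbFrontier());
controller.setStatisticsTracker(new StatisticsTracker());
controller.setFetchChain(new FetchChain());
controller.setDispositionChain(new DispositionChain());
controller.setCandidateChain(new CandidateChain());
// serverCache, seeds and the remaining configuration are injected the same way.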

Its start() method sets up the crawler's initial state:

public void start() {
    // cache AlertThreadGroup for later ToePool launch
    AlertThreadGroup atg = AlertThreadGroup.current();
    if(atg!=null) {
        alertThreadGroup = atg;
    }

    if(isRunning) {
        return;
    }

    sExit = CrawlStatus.FINISHED_ABNORMAL;

    // force creation of DNS Cache now -- avoids CacheCleaner in toe-threads group
    // also cap size at 1 (we never wanta cached value; 0 is non-operative)
    Lookup.getDefaultCache(DClass.IN).setMaxEntries(1);

    reserveMemory = new LinkedList<char[]>();
    for(int i = 0; i < RESERVE_BLOCKS; i++) {
        reserveMemory.add(new char[RESERVE_BLOCK_SIZE]);
    }
    isRunning = true;
}
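The reserveMemory list filled at the end of start() is a memory reserve: a few large char[] blocks are allocated up front so that, if the crawl later hits an OutOfMemoryError, releasing them leaves enough headroom to log, pause, and report cleanly instead of dying outright. A minimal, self-contained sketch of that idea (the block count and size are illustrative, not Heritrix's RESERVE_BLOCKS/RESERVE_BLOCK_SIZE values):

// A minimal sketch (not Heritrix code) of the reserve-memory idea:
// pre-allocate blocks on startup, release them when memory runs out
// so the application still has room to wind down gracefully.
import java.util.LinkedList;

public class MemoryReserve {
    private static final int BLOCKS = 5;              // illustrative values
    private static final int BLOCK_CHARS = 1 << 20;   // ~2 MB per block of chars
    private final LinkedList<char[]> reserve = new LinkedList<char[]>();

    public synchronized void fill() {
        for (int i = 0; i < BLOCKS; i++) {
            reserve.add(new char[BLOCK_CHARS]);
        }
    }

    /** To be called from an OutOfMemoryError handler. */
    public synchronized void release() {
        reserve.clear();   // the freed blocks give the JVM breathing room
    }

    public static void main(String[] args) {
        MemoryReserve r = new MemoryReserve();
        r.fill();
        try {
            // ... normal work that might exhaust memory ...
        } catch (OutOfMemoryError err) {
            r.release();   // now there is headroom to log, pause, checkpoint
        }
    }
}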

The ToePool class extends ThreadGroup; its member variables are as follows:

public static int DEFAULT_TOE_PRIORITY = Thread.NORM_PRIORITY - 1;

protected CrawlController controller;

protected int nextSerialNumber = 1;
protected int targetSize = 0;

Now let us return to the setupToePool() method inside CrawlController, mentioned earlier, which initializes the ToePool:

protected void setupToePool() {
    toePool = new ToePool(alertThreadGroup,this);
    // TODO: make # of toes self-optimizing
    toePool.setSize(getMaxToeThreads());
    toePool.waitForAll();
}

The AlertThreadGroup (used here as the parent thread group) and the CrawlController object are passed in.

The ToePool constructor is as follows:

/**
 * Constructor. Creates a pool of ToeThreads.
 *
 * @param c A reference to the CrawlController for the current crawl.
 */
public ToePool(AlertThreadGroup atg, CrawlController c) {
    super(atg, "ToeThreads");
    this.controller = c;
    setDaemon(true);
}

It passes the parent thread group to the ThreadGroup constructor, stores the CrawlController reference, and marks the group as a daemon thread group.

The void setSize(int newsize) method sets the pool size, starting new threads or retiring surplus ones until the requested count is reached:

/**
 * Change the number of ToeThreads.
 *
 * @param newsize The new number of ToeThreads.
 */
public void setSize(int newsize)
{
    targetSize = newsize;
    int difference = newsize - getToeCount();
    if (difference > 0) {
        // must create threads
        for(int i = 1; i <= difference; i++) {
            startNewThread();
        }
    } else {
        // must retire extra threads
        int retainedToes = targetSize;
        Thread[] toes = this.getToes();
        for (int i = 0; i < toes.length ; i++) {
            if(!(toes[i] instanceof ToeThread)) {
                continue;
            }
            retainedToes--;
            if (retainedToes>=0) {
                continue; // this toe is spared
            }
            // otherwise:
            ToeThread tt = (ToeThread)toes[i];
            tt.retire();
        }
    }
}

The key is the startNewThread() method:

private synchronized void startNewThread() {
    ToeThread newThread = new ToeThread(this, nextSerialNumber++);
    newThread.setPriority(DEFAULT_TOE_PRIORITY);
    newThread.start();
}

A new ToeThread is created here, with the current ToePool passed in as its thread group along with a serial number; its priority is set and its start() method is called.
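So the pool is elastic: a later call to setSize() with a larger value starts more ToeThreads, while a smaller value marks the surplus ones for retirement via retire(). For example (using only the setSize() method quoted above; the numbers are illustrative):

// Illustrative only: resizing the pool at runtime.
toePool.setSize(50);   // first call: starts 50 ToeThreads
toePool.setSize(30);   // later call: the 20 surplus ToeThreads are asked to retire()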

The ToeThread class extends Thread; its member variables are as follows:

public enum Step {
    NASCENT, ABOUT_TO_GET_URI, FINISHED,
    ABOUT_TO_BEGIN_PROCESSOR, HANDLING_RUNTIME_EXCEPTION,
    ABOUT_TO_RETURN_URI, FINISHING_PROCESS
}

private static Logger logger =
    Logger.getLogger("org.archive.crawler.framework.ToeThread");

private CrawlController controller;

private int serialNumber;

/**
 * Each ToeThead has an instance of HttpRecord that gets used
 * over and over by each request.
 *
 * @see org.archive.util.RecorderMarker
 */
private Recorder httpRecorder = null;

// activity monitoring, debugging, and problem detection
private Step step = Step.NASCENT;
private long atStepSince;
private String currentProcessorName = "";

private String coreName;
private CrawlURI currentCuri;
private long lastStartTime;
private long lastFinishTime;

// default priority; may not be meaningful in recent JVMs
private static final int DEFAULT_PRIORITY = Thread.NORM_PRIORITY-2;

// indicator that a thread is now surplus based on current desired
// count; it should wrap up cleanly
private volatile boolean shouldRetire = false;
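The step and atStepSince fields exist for activity monitoring: each time the thread moves to a new phase it records the phase and the timestamp, so a supervisor can tell how long a worker has been sitting in one place. As a hypothetical illustration (not part of ToeThread; the threshold is arbitrary), a stall check built on those two fields might look like this:

// Hypothetical helper one might add to ToeThread, using the step/atStepSince
// fields above; could be called periodically by a monitoring thread.
boolean looksStalled(long thresholdMs) {
    long inStepFor = System.currentTimeMillis() - atStepSince;
    return step != Step.ABOUT_TO_GET_URI   // waiting on the frontier is normal
            && inStepFor > thresholdMs;    // e.g. 5 * 60 * 1000
}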

Next, look at the ToeThread constructor:

/**
 * Create a ToeThread
 *
 * @param g ToeThreadGroup
 * @param sn serial number
 */
public ToeThread(ToePool g, int sn) {
    // TODO: add crawl name?
    super(g,"ToeThread #" + sn);
    coreName="ToeThread #" + sn + ": ";
    controller = g.getController();
    serialNumber = sn;
    setPriority(DEFAULT_PRIORITY);
    int outBufferSize = controller.getRecorderOutBufferBytes();
    int inBufferSize = controller.getRecorderInBufferBytes();
    httpRecorder = new Recorder(controller.getScratchDir().getFile(),
        "tt" + sn + "http", outBufferSize, inBufferSize);
    lastFinishTime = System.currentTimeMillis();
}

It records the thread group and serial number, keeps a reference to the CrawlController, sets the thread priority, and creates the per-thread Recorder (httpRecorder) under the controller's scratch directory.

When the thread is started, the ToeThread's void run() method executes:

/** (non-Javadoc)
 * @see java.lang.Thread#run()
 */
public void run() {
    String name = controller.getMetadata().getJobName();
    logger.fine(getName()+" started for order '"+name+"'");
    Recorder.setHttpRecorder(httpRecorder);

    try {
        while ( true ) {
            ArchiveUtils.continueCheck();

            setStep(Step.ABOUT_TO_GET_URI, null);

            CrawlURI curi = controller.getFrontier().next();

            synchronized(this) {
                ArchiveUtils.continueCheck();
                setCurrentCuri(curi);
                currentCuri.setThreadNumber(this.serialNumber);
                lastStartTime = System.currentTimeMillis();
                currentCuri.setRecorder(httpRecorder);
            }

            try {
                KeyedProperties.loadOverridesFrom(curi);

                //System.out.println("FetchChain:"+controller.getFetchChain().getClass().getName());
                controller.getFetchChain().process(curi,this);

                //System.out.println("Frontier:"+controller.getFrontier().getClass().getName());
                controller.getFrontier().beginDisposition(curi);

                //System.out.println("DispositionChain:"+controller.getDispositionChain().getClass().getName());
                controller.getDispositionChain().process(curi,this);

            } catch (RuntimeExceptionWrapper e) {
                // Workaround to get cause from BDB
                if(e.getCause() == null) {
                    e.initCause(e.getCause());
                }
                recoverableProblem(e);
            } catch (AssertionError ae) {
                // This risks leaving crawl in fatally inconsistent state,
                // but is often reasonable for per-Processor assertion problems
                recoverableProblem(ae);
            } catch (RuntimeException e) {
                recoverableProblem(e);
            } catch (InterruptedException e) {
                if(currentCuri!=null) {
                    recoverableProblem(e);
                    Thread.interrupted(); // clear interrupt status
                } else {
                    throw e;
                }
            } catch (StackOverflowError err) {
                recoverableProblem(err);
            } catch (Error err) {
                // OutOfMemory and any others
                seriousError(err);
            } finally {
                httpRecorder.endReplays();
                KeyedProperties.clearOverridesFrom(curi);
            }

            setStep(Step.ABOUT_TO_RETURN_URI, null);
            ArchiveUtils.continueCheck();

            synchronized(this) {
                controller.getFrontier().finished(currentCuri);
                controller.getFrontier().endDisposition();
                setCurrentCuri(null);
            }
            curi = null;

            setStep(Step.FINISHING_PROCESS, null);
            lastFinishTime = System.currentTimeMillis();
            if(shouldRetire) {
                break; // from while(true)
            }
        }
    } catch (InterruptedException e) {
        if(currentCuri!=null){
            logger.log(Level.SEVERE,"Interrupt leaving unfinished CrawlURI "+getName()+" - job may hang",e);
        }
        // thread interrupted, ok to end
        logger.log(Level.FINE,this.getName()+ " ended with Interruption");
    } catch (Exception e) {
        // everything else (including interruption)
        logger.log(Level.SEVERE,"Fatal exception in "+getName(),e);
    } catch (OutOfMemoryError err) {
        seriousError(err);
    } finally {
        controller.getFrontier().endDisposition();
    }

    setCurrentCuri(null);
    // Do cleanup so that objects can be GC.
    this.httpRecorder.closeRecorders();
    this.httpRecorder = null;

    logger.fine(getName()+" finished for order '"+name+"'");
    setStep(Step.FINISHED, null);
    controller = null;
}
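The heart of the loop is pulling a CrawlURI from the frontier, running it through controller.getFetchChain().process(curi, this), and then through the disposition chain; each chain walks its list of processors for that URI. For orientation, here is a hedged sketch of what a custom processor dropped into one of those chains could look like. It assumes the org.archive.modules.Processor base class with shouldProcess()/innerProcess() hooks; treat the package name and exact signatures as an approximation from memory rather than a verified API listing.

// Sketch of a custom processor for one of the chains driven by run() above.
// Package names and hook signatures are assumed, not verified.
public class LoggingProcessor extends org.archive.modules.Processor {
    @Override
    protected boolean shouldProcess(org.archive.modules.CrawlURI curi) {
        return true;                       // inspect every URI handed to the chain
    }

    @Override
    protected void innerProcess(org.archive.modules.CrawlURI curi) throws InterruptedException {
        System.out.println("saw " + curi); // real processors do fetching, extraction, writing...
    }
}

Such a processor would then be listed among the chain's beans in the job configuration.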

The ToePool's void waitForAll() method is as follows:

public void waitForAll() {
    while (true) try {
        if (isAllAlive(getToes())) {
            return;
        }
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
}

It returns only once every thread in the group is alive; the helper isAllAlive() checks that condition:

private static boolean isAllAlive(Thread[] threads) {
    for (Thread t: threads) {
        if ((t != null) && (!t.isAlive())) {
            return false;
        }
    }
    return true;
}

When the CrawlController starts the crawl, it calls the BdbFrontier's void unpause() method, which is defined in AbstractFrontier, the grandparent class of BdbFrontier:

org.archive.crawler.frontier.BdbFrontier
    extends org.archive.crawler.frontier.WorkQueueFrontier
        extends org.archive.crawler.frontier.AbstractFrontier

public void unpause() {
    requestState(State.RUN);
}

This changes the frontier's target state, held in the field volatile State targetState = State.PAUSE; (initially PAUSE), to State.RUN.
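In other words, unpause() does not start anything directly; it just flips the frontier's desired state, and the frontier's own management logic notices the change and begins handing URIs to the ToeThreads. A minimal, self-contained sketch of that volatile target-state pattern (the real AbstractFrontier has more states and reacts through its own manager thread, so this only shows the shape of the idea):

// Minimal sketch of the volatile target-state pattern; not AbstractFrontier code.
public class TargetStateDemo {
    enum State { RUN, PAUSE, FINISH }

    private volatile State targetState = State.PAUSE;

    public void requestState(State s) {
        targetState = s;               // visible immediately to the manager loop
    }

    public void unpause() {
        requestState(State.RUN);
    }

    // What a frontier-style manager loop reacts to.
    public void managerLoop() throws InterruptedException {
        while (targetState != State.FINISH) {
            if (targetState == State.RUN) {
                // hand work (URIs) to waiting worker threads here
            }
            Thread.sleep(100);         // real code would wait/notify instead of polling
        }
    }
}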

In the next article we will analyze the BdbFrontier's related states and methods.

--------------------------------------------------------------------------- 

This Heritrix 3.1.0 source code analysis series is the author's original work.

Please credit the source when reprinting: 博客园 (cnblogs), 刺猬的温驯.

Original article link: http://www.cnblogs.com/chenying99/archive/2013/04/18/3027672.html

