An In-Depth Analysis of the Injector Job

The main job of InjectorJob is to create a table in HBase named after the crawlId and to inject the seed URLs from a text file into that table.

(I) Running the command

1. Run the command

    [jediael@master local]$ bin/nutch inject seeds/ -crawlId sourcetest
    InjectorJob: starting at 2015-03-10 14:59:19
    InjectorJob: Injecting urlDir: seeds
    InjectorJob: Using class org.apache.gora.hbase.store.HBaseStore as the Gora storage class.
    InjectorJob: total number of urls rejected by filters: 0
    InjectorJob: total number of urls injected after normalization and filtering: 1
    Injector: finished at 2015-03-10 14:59:26, elapsed: 00:00:06

2. Inspect the table contents

    hbase(main):004:0> scan 'sourcetest_webpage'
    ROW                    COLUMN+CELL
     com.163.money:http/   column=f:fi, timestamp=1425970761871, value=\x00'\x8D\x00
     com.163.money:http/   column=f:ts, timestamp=1425970761871, value=\x00\x00\x01L\x02{\x08_
     com.163.money:http/   column=mk:_injmrk_, timestamp=1425970761871, value=y
     com.163.money:http/   column=mk:dist, timestamp=1425970761871, value=0
     com.163.money:http/   column=mtdt:_csh_, timestamp=1425970761871, value=?\x80\x00\x00
     com.163.money:http/   column=s:s, timestamp=1425970761871, value=?\x80\x00\x00
    1 row(s) in 0.0430 seconds

3. Read the database contents

The HBase table stores its values as raw bytes, so use the following command to see them in readable form:

    [jediael@master local]$ bin/nutch readdb -dump ./test -crawlId sourcetest -content
    WebTable dump: starting
    WebTable dump: done
    [jediael@master local]$ cat test/part-r-00000
    key: com.163.money:http/
    baseUrl: null
    status: 0 (null)
    fetchTime: 1425970759775
    prevFetchTime: 0
    fetchInterval: 2592000
    retriesSinceFetch: 0
    modifiedTime: 0
    prevModifiedTime: 0
    protocolStatus: (null)
    parseStatus: (null)
    title: null
    score: 1.0
    marker _injmrk_ : y
    marker dist : 0
    reprUrl: null
    metadata _csh_ : ? (raw bytes, not printable)

(II) Source code walkthrough

Class: org.apache.nutch.crawl.InjectorJob

1. Program entry point

    public static void main(String[] args) throws Exception {
      int res = ToolRunner.run(NutchConfiguration.create(), new InjectorJob(), args);
      System.exit(res);
    }

2. ToolRunner.run(String[] args)

This step mainly calls the inject method; everything else in it is argument validation.

    public int run(String[] args) throws Exception {
      …………
      inject(new Path(args[0]));
      …………
    }

3. The inject() method

Every Nutch job runs through Map<String, Object> run(Map<String, Object> args): it takes its arguments as a Map and returns its results as a Map.

    public void inject(Path urlDir) throws Exception {
      run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir));
    }

4. Configuring the job and creating the HBase table

    public Map<String, Object> run(Map<String, Object> args) throws Exception {
      // Excerpt: the code that resolves `input` (the seed directory passed
      // under Nutch.ARG_SEEDDIR) and the final return are not shown here.
      numJobs = 1;
      currentJobNum = 0;
      currentJob = new NutchJob(getConf(), "inject " + input);
      FileInputFormat.addInputPath(currentJob, input);
      currentJob.setMapperClass(UrlMapper.class);
      currentJob.setMapOutputKeyClass(String.class);
      currentJob.setMapOutputValueClass(WebPage.class);
      currentJob.setOutputFormatClass(GoraOutputFormat.class);
      // Creating the web store is what creates the HBase table.
      DataStore<String, WebPage> store = StorageUtils.createWebStore(
          currentJob.getConfiguration(), String.class, WebPage.class);
      GoraOutputFormat.setOutput(currentJob, store, true);
      currentJob.setReducerClass(Reducer.class);
      // Zero reduce tasks: this is a map-only job.
      currentJob.setNumReduceTasks(0);
      currentJob.waitForCompletion(true);
      ToolUtil.recordJobStatus(null, currentJob, results);
    }

5. The mapper

Because the number of reduce tasks is set to 0, the Injector Job effectively has no reducer, so only the mapper matters. The mapper does the following:
(1) Parses each line of the seed file and extracts any parameters attached to the URL.
(2) Normalizes the URL and filters it through the configured URL filters.
(3) Uses the reversed URL as the key and a newly built WebPage object as the value, then writes the pair to the table.

    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String url = value.toString().trim(); // value is line of text
      if (url != null && (url.length() == 0 || url.startsWith("#"))) {
        /* Ignore blank lines and lines that start with # */
        return;
      }
      // if tabs : metadata that could be stored
      // must be name=value and separated by \t
      float customScore = -1f;
      int customInterval = interval;
      Map<String, String> metadata = new TreeMap<String, String>();
      if (url.indexOf("\t") != -1) {
        String[] splits = url.split("\t");
        url = splits[0];
        for (int s = 1; s < splits.length; s++) {
          // find separation between name and value
          int indexEquals = splits[s].indexOf("=");
          if (indexEquals == -1) {
            // skip anything without a =
            continue;
          }
          String metaname = splits[s].substring(0, indexEquals);
          String metavalue = splits[s].substring(indexEquals + 1);
          if (metaname.equals(nutchScoreMDName)) {
            try {
              customScore = Float.parseFloat(metavalue);
            } catch (NumberFormatException nfe) {
            }
          } else if (metaname.equals(nutchFetchIntervalMDName)) {
            try {
              customInterval = Integer.parseInt(metavalue);
            } catch (NumberFormatException nfe) {
            }
          } else {
            metadata.put(metaname, metavalue);
          }
        }
      }
      try {
        url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
        url = filters.filter(url); // filter the url
      } catch (Exception e) {
        LOG.warn("Skipping " + url + ":" + e);
        url = null;
      }
      if (url == null) {
        context.getCounter("injector", "urls_filtered").increment(1);
        return;
      } else { // if it passes
        String reversedUrl = TableUtil.reverseUrl(url); // collect it
        WebPage row = WebPage.newBuilder().build();
        row.setFetchTime(curTime);
        row.setFetchInterval(customInterval);
        // now add the metadata
        Iterator<String> keysIter = metadata.keySet().iterator();
        while (keysIter.hasNext()) {
          String keymd = keysIter.next();
          String valuemd = metadata.get(keymd);
          row.getMetadata().put(new Utf8(keymd),
              ByteBuffer.wrap(valuemd.getBytes()));
        }
        if (customScore != -1) {
          row.setScore(customScore);
        } else {
          row.setScore(scoreInjected);
        }
        try {
          scfilters.injectedScore(url, row);
        } catch (ScoringFilterException e) {
          if (LOG.isWarnEnabled()) {
            LOG.warn("Cannot filter injected score for url " + url
                + ", using default (" + e.getMessage() + ")");
          }
        }
        context.getCounter("injector", "urls_injected").increment(1);
        row.getMarkers().put(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0)));
        Mark.INJECT_MARK.putMark(row, YES_STRING);
        context.write(reversedUrl, row);
      }
    }

(III) Key source code study
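Two parts of the mapper are easier to follow in isolation: how a seed line is split into a URL plus tab-separated name=value parameters, and how the URL becomes a row key such as com.163.money:http/. The following is a minimal standalone sketch of both, not the Nutch implementation. The class SeedLineSketch, its Seed holder, and the method names are hypothetical. The key names "nutch.score" and "nutch.fetchInterval" are assumed values for nutchScoreMDName and nutchFetchIntervalMDName, which the excerpt above does not show. reverseUrl only approximates what TableUtil.reverseUrl appears to do, judging from the row key in the scan output.

```java
import java.net.URI;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical standalone sketch; not part of Nutch. */
final class SeedLineSketch {

    // Assumed metadata key names (not shown in the article's excerpt).
    static final String SCORE_KEY = "nutch.score";
    static final String INTERVAL_KEY = "nutch.fetchInterval";

    /** Parsed form of one seed line (hypothetical holder class). */
    static final class Seed {
        final String url;
        final float score;
        final int fetchInterval;
        final Map<String, String> metadata;

        Seed(String url, float score, int fetchInterval, Map<String, String> metadata) {
            this.url = url;
            this.score = score;
            this.fetchInterval = fetchInterval;
            this.metadata = metadata;
        }
    }

    /** Parses "url\tname=value\t..."; returns null for blank and # lines. */
    static Seed parse(String line, float defaultScore, int defaultInterval) {
        String trimmed = line.trim();
        if (trimmed.isEmpty() || trimmed.startsWith("#")) {
            return null;
        }
        String[] parts = trimmed.split("\t");
        float score = defaultScore;
        int interval = defaultInterval;
        Map<String, String> metadata = new TreeMap<>();
        for (int i = 1; i < parts.length; i++) {
            int eq = parts[i].indexOf('=');
            if (eq == -1) {
                continue; // skip anything without a '='
            }
            String name = parts[i].substring(0, eq);
            String value = parts[i].substring(eq + 1);
            try {
                if (name.equals(SCORE_KEY)) {
                    score = Float.parseFloat(value);
                } else if (name.equals(INTERVAL_KEY)) {
                    interval = Integer.parseInt(value);
                } else {
                    metadata.put(name, value);
                }
            } catch (NumberFormatException e) {
                // unparsable number: keep the default, as the mapper does
            }
        }
        return new Seed(parts[0], score, interval, metadata);
    }

    /** Builds a key of the form reversed.host:scheme[:port]/path[?query]. */
    static String reverseUrl(String url) {
        URI uri = URI.create(url);
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("URL has no host: " + url);
        }
        String[] labels = uri.getHost().split("\\.");
        StringBuilder key = new StringBuilder();
        for (int i = labels.length - 1; i >= 0; i--) {
            key.append(labels[i]);
            if (i > 0) {
                key.append('.');
            }
        }
        key.append(':').append(uri.getScheme());
        if (uri.getPort() != -1) {
            key.append(':').append(uri.getPort());
        }
        if (uri.getRawPath() != null) {
            key.append(uri.getRawPath());
        }
        if (uri.getRawQuery() != null) {
            key.append('?').append(uri.getRawQuery());
        }
        return key.toString();
    }

    public static void main(String[] args) {
        // prints "com.163.money:http/"
        System.out.println(reverseUrl("http://money.163.com/"));
    }
}
```

Putting the reversed host first means pages from the same domain sort next to each other in the table, which is presumably why the reversed URL is used as the row key. The defaults passed to parse in a real run would be the injected score and fetch interval seen in the dump above (1.0 and 2592000).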
