forked from HariSekhon/DevOps-Python-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpig-text-to-solr.pig
86 lines (67 loc) · 3.82 KB
/
pig-text-to-solr.pig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
--
-- Author: Hari Sekhon
-- Date: 2015-03-12 22:49:01 +0000 (Thu, 12 Mar 2015)
--
-- vim:ts=4:sts=4:sw=4:et
--
-- Pig script to index [bz2 compressed] text files or logs for fast source file lookups in SolrCloud
--
-- This was a simple use case where I didn't need to parse the logs as it's more oriented around finding source data files based on full-text search.
-- Tested on Pig 0.14 on Tez via Hortonworks HDP 2.2
-- https://docs.lucidworks.com/display/lweug/Using+Pig+with+LucidWorks+Search
-- USAGE:
--
-- must download LucidWorks connector for Hadoop from here (the Hortonworks link is much bigger as it contains a full HDP Search including Solr, Banana and Tika pipeline as well as this connector):
--
-- https://lucidworks.com/product/integrations/hadoop/
--
-- hadoop fs -put hadoop-lws-job.jar
--
-- pig -p path=/data/logs -p collection=LOGS -p zkhost=<zookeeper_list>/solr pig-text-to-solr.pig
REGISTER 'hadoop-lws-job.jar';
REGISTER 'pig-udfs.jy' USING jython AS hari;
-- can set defaults for collection, zkhost etc to not have to enter them on command line every time
--%default path '/data';
--%default collection 'collection1';
--%default zkhost 'localhost:2181';
-- use zkhost for SolrCloud, it's more efficient to skip first hop using client side logic and also it's more highly available
set solr.zkhost $zkhost;
-- use solrUrl only for standard old standalone Solr
--set solr.solrUrl $solrUrl;
set solr.collection $collection;
--%declare solr.collection $collection;
-- for file-by-file as one doc each but doesn't scale
--set lww.buffer.docs.size 1;
set lww.buffer.docs.size 1000;
set lww.commit.on.close true;
-- don't retry I don't want duplicates since using autogenerated IDs, instead fail job => tune => retry
set mapreduce.map.maxattempts 1;
set mapreduce.reduce.maxattempts 1;
-- old settings
set mapred.map.max.attempts 1;
set mapred.reduce.max.attempts 1;
-- none of the above work on Tez but this does
set tez.am.task.max.failed.attempts 0;
-- avoid dups
set mapreduce.map.speculative false;
set mapreduce.reduce.speculative false;
-- old variables
set mapred.map.tasks.speculative.execution false;
set mapred.reduce.tasks.speculative.execution false;
lines = LOAD '$path' USING PigStorage('\n', '-tagPath') AS (path:chararray, line:chararray);
-- this causes out of heap errors in Solr because some files may be too large to handle this way - it doesn't scale
--lines2 = FOREACH (GROUP lines BY path) GENERATE $0 AS path, BagToString($0, ' ') AS line:chararray;
--lines_final = FOREACH lines2 GENERATE UniqueId() AS id, 'path_s', path, 'line_s', line;
-- preserve whitespace but check and remove lines that are only whitespace
lines2 = FILTER lines BY line IS NOT NULL AND TRIM(line) != '';
-- strip redundant prefixes like hdfs://nameservice1 or file: to avoid storing the same bytes over and over without value
--lines3 = FOREACH lines2 GENERATE REPLACE(path, '^file:', '') AS path, line;
lines3 = FOREACH lines2 GENERATE REPLACE(path, '^hdfs://\\w+(?::\\d+)?', '') AS path, line;
-- order by path asc -- to force a sort + shuffle -- to find out if the avg requests per sec are being held back by the mapper phase decompressing bz2 files or something else by forcing a reduce phase
-- since the lines in the file may not be unique was considering using a uuid
-- can use UniqueId() from Pig 0.14
-- hari.md5_uuid(line) from Jython UDFs gives a uuid based of concatenated millisecond timestamp, host, pid and md5 of line
-- this allows to find duplicate lines via a 'id:$path*$md5' type search if wanted
-- using type suffixed Solr fields in case someone hasn't configured their schema properly they should be able to fall back on dynamicFields
lines_final = FOREACH lines3 GENERATE CONCAT(path, '|', hari.md5_uuid(line)) AS id, 'path_s', path, 'line_s', line;
STORE lines_final INTO 'IGNORED' USING com.lucidworks.hadoop.pig.SolrStoreFunc();