1 /**
2 Command line tool that joins tab-separated value files based on a common key.
3 
4 This tool joins lines from tab-delimited files based on a common key. One file, the 'filter'
5 file, contains the records (lines) being matched. The other input files are searched for
6 matching records. Matching records are written to standard output, along with any designated
7 fields from the 'filter' file. In database parlance this is a 'hash semi-join'.
8 
9 Copyright (c) 2015-2019, eBay Software Foundation
10 Initially written by Jon Degenhardt
11 
12 License: Boost Licence 1.0 (http://boost.org/LICENSE_1_0.txt)
13 */
14 module tsv_utils.tsv_join;
15 
16 import std.stdio;
17 import std.format : format;
18 import std.typecons : tuple;
19 
/** Short help text, printed ahead of the option list when '--help' is given.
 *
 * The text deliberately ends with "Options:" because the getopt-generated option
 * descriptions are printed immediately after it.
 */
auto helpText = q"EOS
Synopsis: tsv-join --filter-file file [options] file [file...]

tsv-join matches input lines against lines from a 'filter' file. The match is
based on fields or the entire line. Use '--help-verbose' for more details.

Options:
EOS";

/** Detailed help text, printed ahead of the option list when '--help-verbose' is given.
 *
 * Uses the term 'filter file' throughout, matching the '--filter-file' option name.
 */
auto helpTextVerbose = q"EOS
Synopsis: tsv-join --filter-file file [options] file [file...]

tsv-join matches input lines against lines from a 'filter' file. The match is
based on exact match comparison of one or more 'key' fields. Fields are TAB
delimited by default. Matching lines are written to standard output, along with
any additional fields from the filter file that have been specified. An example:

  tsv-join --filter-file filter.tsv --key-fields 1 --append-fields 5,6 data.tsv

This reads filter.tsv, creating a hash table keyed on field 1. Lines from data.tsv
are read one at a time. If field 1 is found in the hash table, the line is written
to standard output with fields 5 and 6 from the filter file appended. In database
parlance this is a "hash semi join". Note the asymmetric relationship: Records in
the filter file should be unique, but data.tsv lines can repeat.

tsv-join can also work as a simple filter, this is the default behavior. Example:

  tsv-join --filter-file filter.tsv data.tsv

This outputs all lines from data.tsv found in filter.tsv. --key-fields can still
be used to define the match key. The --exclude option can be used to exclude
matched lines rather than keep them.

Multiple fields can be specified as keys and append fields. Field numbers start
at one, zero represents the whole line. Fields are comma separated and ranges
can be used. Example:

  tsv-join -f filter.tsv -k 1,2 --append-fields 3-7 data.tsv

Options:
EOS";
61 
/** Holds the tsv-join command line options, plus a few values derived from them.
 */
struct TsvJoinOptions
{
    string programName;
    string filterFile;               // --f|filter-file
    size_t[] keyFields;              // --k|key-fields
    size_t[] dataFields;             // --d|data-fields
    size_t[] appendFields;           // --a|append-fields
    bool hasHeader = false;          // --H|header
    string appendHeaderPrefix = "";  // --p|prefix
    bool writeAll = false;           // --w|write-all
    string writeAllValue;            // --w|write-all (the value given to the option)
    bool exclude = false;            // --e|exclude
    char delim = '\t';               // --delimiter
    bool helpVerbose = false;        // --help-verbose
    bool versionWanted = false;      // --V|version
    bool allowDupliateKeys = false;  // --z|allow-duplicate-keys
    bool keyIsFullLine = false;      // Derived: --key-fields 0
    bool dataIsFullLine = false;     // Derived: --data-fields 0
    bool appendFullLine = false;     // Derived: --append-fields 0

    /* Parses and validates the command line.
     *
     * Returns a tuple. The first member is true when the arguments were processed
     * successfully and the program should carry on. It is false when an error occurred
     * or when help/version output was requested; in that case the second member is the
     * exit code to use (0 or 1).
     *
     * When true is returned the options have been validated and the derived values
     * computed. Field numbers have been switched to zero-based indices, and a field
     * list that designated the whole line (field zero) has been emptied.
     */
    auto processArgs (ref string[] cmdArgs)
    {
        import std.getopt;
        import std.path : baseName, stripExtension;
        import std.typecons : Yes, No;
        import tsv_utils.common.utils :  makeFieldListOptionHandler;

        if (cmdArgs.length > 0)
        {
            programName = cmdArgs[0].stripExtension.baseName;
        }
        else
        {
            programName = "Unknown_program_name";
        }

        /* --write-all needs its own handler: it sets both a flag and a value. */
        void writeAllHandler(string option, string value)
        {
            debug stderr.writeln("[writeAllHandler] |", option, "|  |", value, "|");
            writeAllValue = value;
            writeAll = true;
        }

        try
        {
            arraySep = ",";    // Values within a command line option are comma separated
            auto parsed = getopt(
                cmdArgs,
                "help-verbose",    "              Print full help.", &helpVerbose,
                "f|filter-file",   "FILE          (Required) File with records to use as a filter.", &filterFile,

                "k|key-fields",    "<field-list>  Fields to use as join key. Default: 0 (entire line).",
                keyFields.makeFieldListOptionHandler!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero),

                "d|data-fields",   "<field-list>  Data record fields to use as join key, if different than --key-fields.",
                dataFields.makeFieldListOptionHandler!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero),

                "a|append-fields", "<field-list>  Filter fields to append to matched records.",
                appendFields.makeFieldListOptionHandler!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero),

                std.getopt.config.caseSensitive,
                "H|header",        "              Treat the first line of each file as a header.", &hasHeader,
                std.getopt.config.caseInsensitive,
                "p|prefix",        "STR           String to use as a prefix for --append-fields when writing a header line.", &appendHeaderPrefix,
                "w|write-all",     "STR           Output all data records. STR is the --append-fields value when writing unmatched records.", &writeAllHandler,
                "e|exclude",       "              Exclude matching records.", &exclude,
                "delimiter",       "CHR           Field delimiter. Default: TAB. (Single byte UTF-8 characters only.)", &delim,
                "z|allow-duplicate-keys",
                                   "              Allow duplicate keys with different append values (last entry wins).", &allowDupliateKeys,
                std.getopt.config.caseSensitive,
                "V|version",       "              Print version information and exit.", &versionWanted,
                std.getopt.config.caseInsensitive,
                );

            /* Informational requests are answered here and end processing with exit code 0. */
            if (parsed.helpWanted)
            {
                defaultGetoptPrinter(helpText, parsed.options);
                return tuple(false, 0);
            }

            if (helpVerbose)
            {
                defaultGetoptPrinter(helpTextVerbose, parsed.options);
                return tuple(false, 0);
            }

            if (versionWanted)
            {
                import tsv_utils.common.tsvutils_version;
                writeln(tsvutilsVersionNotice("tsv-join"));
                return tuple(false, 0);
            }

            consistencyValidations(cmdArgs);
            derivations();
        }
        catch (Exception exc)
        {
            stderr.writefln("[%s] Error processing command line arguments: %s", programName, exc.msg);
            return tuple(false, 1);
        }

        return tuple(true, 0);
    }

    /* Checks that getopt cannot do itself, mostly because they involve more than one
     * option. Throws an Exception describing the first violated rule.
     *
     * processedCmdArgs is the argument array after getopt has removed the options, so
     * a length of one means only the program name is left (no data files were given).
     */
    private void consistencyValidations(ref string[] processedCmdArgs)
    {
        import std.algorithm : any;

        /* Throws an Exception carrying 'message' when 'violated' is true. */
        static void rejectIf(bool violated, string message)
        {
            if (violated) throw new Exception(message);
        }

        /* True when a multi-entry field list contains field zero (the whole line). */
        static bool mixesWholeLineWithFields(const size_t[] fields)
        {
            return fields.length > 1 && any!(a => a == 0)(fields);
        }

        rejectIf(filterFile.length == 0,
                 "Required option --filter-file was not supplied.");

        rejectIf(filterFile == "-" && processedCmdArgs.length == 1,
                 "A data file is required when standard input is used for the filter file (--f|filter-file -).");

        rejectIf(writeAll && appendFields.length == 0,
                 "Use --a|append-fields when using --w|write-all.");

        rejectIf(writeAll && appendFields.length == 1 && appendFields[0] == 0,
                 "Cannot use '--a|append-fields 0' (whole line) when using --w|write-all.");

        rejectIf(appendFields.length > 0 && exclude,
                 "--e|exclude cannot be used with --a|append-fields.");

        rejectIf(appendHeaderPrefix.length > 0 && !hasHeader,
                 "Use --header when using --p|prefix.");

        rejectIf(dataFields.length > 0 && keyFields.length != dataFields.length,
                 "Different number of --k|key-fields and --d|data-fields.");

        /* With single-entry lists, either both or neither may designate the whole line. */
        rejectIf(keyFields.length == 1 && dataFields.length == 1 &&
                 ((keyFields[0] == 0) != (dataFields[0] == 0)),
                 "If either --k|key-field or --d|data-field is zero both must be zero.");

        rejectIf(mixesWholeLineWithFields(keyFields) ||
                 mixesWholeLineWithFields(dataFields) ||
                 mixesWholeLineWithFields(appendFields),
                 "Field 0 (whole line) cannot be combined with individual fields (non-zero).");
    }

    /* Computes the derived settings once the options have been validated: the
     * 'whole line' flags are set, field lists that designated the whole line are
     * emptied, and the remaining field numbers become zero-based indices.
     */
    void derivations()
    {
        /* True when the list consists of the single entry zero, meaning 'whole line'. */
        static bool designatesWholeLine(const size_t[] fields)
        {
            return fields.length == 1 && fields[0] == 0;
        }

        // Turn 'whole line' designations (field zero, or no key at all) into flags.
        if (keyFields.length == 0)
        {
            assert(dataFields.length == 0);
            keyIsFullLine = true;
            dataIsFullLine = true;
        }
        else if (designatesWholeLine(keyFields))
        {
            keyIsFullLine = true;
            dataIsFullLine = true;
            keyFields = keyFields[1 .. $];

            if (dataFields.length == 1)
            {
                assert(dataFields[0] == 0);
                dataFields = dataFields[1 .. $];
            }
        }

        if (designatesWholeLine(appendFields))
        {
            appendFullLine = true;
            appendFields = appendFields[1 .. $];
        }

        assert(!(keyIsFullLine && keyFields.length > 0));
        assert(!(dataIsFullLine && dataFields.length > 0));
        assert(!(appendFullLine && appendFields.length > 0));

        // From here on field lists hold zero-based indices.
        foreach (ref fieldNum; keyFields) --fieldNum;
        foreach (ref fieldNum; dataFields) --fieldNum;
        foreach (ref fieldNum; appendFields) --fieldNum;
    }
}
267 
/* D runtime option, set for compilers at version 2.085 or later: "gcopt=cleanup:none". */
static if (__VERSION__ >= 2085) extern(C) __gshared string[] rt_options = [ "gcopt=cleanup:none" ];

/** Program entry point.
 *
 * Processes the command line and, if processing says to continue, runs the join over
 * the remaining arguments (the data files). Returns the process exit code: the code
 * supplied by argument processing when it stops early, 1 when the join throws an
 * Exception, and 0 otherwise.
 */
int main(string[] cmdArgs)
{
    /* Merge coverage reports when built by DMD in code coverage mode. */
    version(D_Coverage) version(DigitalMars)
    {
        import core.runtime : dmd_coverSetMerge;
        dmd_coverSetMerge(true);
    }

    TsvJoinOptions cmdopt;
    auto argStatus = cmdopt.processArgs(cmdArgs);
    if (!argStatus[0])
    {
        return argStatus[1];
    }

    int exitCode = 0;
    try
    {
        /* cmdArgs[0] is the program name; everything after it is an input file. */
        tsvJoin(cmdopt, cmdArgs[1 .. $]);
    }
    catch (Exception exc)
    {
        stderr.writefln("Error [%s]: %s", cmdopt.programName, exc.msg);
        exitCode = 1;
    }
    return exitCode;
}
292 
/** tsvJoin does the primary work of the tsv-join program.
 *
 * The filter file is read first and turned into an in-memory hash keyed on the join
 * key. The input files are then read one line at a time and each line is looked up
 * in the hash. Lines selected by the options are written to standard output,
 * optionally followed by the append values taken from the filter file.
 *
 * Params:
 *   cmdopt     = Command line options, as left by TsvJoinOptions.processArgs (field
 *                lists hold zero-based indices and the 'whole line' flags are set).
 *   inputFiles = Data files to read. "-" means standard input; standard input is
 *                also used when the list is empty.
 *
 * Throws: Exception if a line has fewer fields than the options require, or if the
 * filter file has duplicate keys with different append values and
 * --allow-duplicate-keys was not given.
 *
 * Debug-build diagnostics are written to standard error so they cannot mix with the
 * join results, which go to standard output.
 */
void tsvJoin(in TsvJoinOptions cmdopt, in string[] inputFiles)
{
    import tsv_utils.common.utils : InputFieldReordering, bufferedByLine, BufferedOutputRange, throwIfWindowsNewlineOnUnix;
    import std.algorithm : splitter;
    import std.array : join;
    import std.range;
    import std.conv : to;

    /* State, variables, and convenience derivations.
     *
     * Combinations of individual fields and whole line (field zero) are convenient for the
     * user, but create complexities for the program. Many combinations are disallowed by
     * command line processing, but the remaining combos still leave several states. Also,
     * this code optimizes by doing only necessary operations, further complicating state.
     * Here's a guide to variables and state.
     * - cmdopt.keyFields, cmdopt.dataFields arrays - Individual field indexes used as keys.
     *      Empty if the whole line is used as a key. Must be the same length.
     * - cmdopt.keyIsFullLine, cmdopt.dataIsFullLine - True when the whole line is used as the key.
     * - cmdopt.appendFields array - Indexes of individual filter file fields being appended.
     *      Empty if appending the full line, or if not appending anything.
     * - cmdopt.appendFullLine - True when the whole line is being appended.
     * - isAppending - True if something is being appended.
     * - cmdopt.writeAll - True if all lines are being written.
     */
    /* Convenience derivations. */
    auto numKeyFields = cmdopt.keyFields.length;
    auto numAppendFields = cmdopt.appendFields.length;
    bool isAppending = (cmdopt.appendFullLine || numAppendFields > 0);

    /* Mappings from field indexes in the input lines to collection arrays. The data
     * files share the filter file's key mapping unless --data-fields was given.
     */
    auto filterKeysReordering = new InputFieldReordering!char(cmdopt.keyFields);
    auto dataKeysReordering = (cmdopt.dataFields.length == 0) ?
        filterKeysReordering : new InputFieldReordering!char(cmdopt.dataFields);
    auto appendFieldsReordering = new InputFieldReordering!char(cmdopt.appendFields);

    /* The master filter hash. The key is the delimited fields concatenated together
     * (including separators). The value is the appendFields concatenated together, as
     * they will be appended to the input line. Both the keys and append fields are
     * assembled in the order specified, though this is only required for append fields.
     */
    string[string] filterHash;
    string appendFieldsHeader;

    /* The append values for unmatched records. */
    char[] appendFieldsUnmatchedValue;

    if (cmdopt.writeAll)
    {
        assert(cmdopt.appendFields.length > 0);  // Checked in consistencyValidations

        // reserve space for n values and n-1 delimiters
        appendFieldsUnmatchedValue.reserve(cmdopt.appendFields.length * (cmdopt.writeAllValue.length + 1) - 1);

        // One copy of the --write-all value per append field, separated by the delimiter.
        appendFieldsUnmatchedValue ~= cmdopt.writeAllValue;
        for (size_t i = 1; i < cmdopt.appendFields.length; ++i)
        {
            appendFieldsUnmatchedValue ~= cmdopt.delim;
            appendFieldsUnmatchedValue ~= cmdopt.writeAllValue;
        }
    }

    /* Read the filter file. */
    {
        // Lines only need to be split when individual fields are used as key or append values.
        bool needPerFieldProcessing = (numKeyFields > 0) || (numAppendFields > 0);
        auto filterStream = (cmdopt.filterFile == "-") ? stdin : cmdopt.filterFile.File;
        foreach (lineNum, line; filterStream.bufferedByLine.enumerate(1))
        {
            debug stderr.writeln("[filter line] |", line, "|");
            if (needPerFieldProcessing)
            {
                filterKeysReordering.initNewLine;
                appendFieldsReordering.initNewLine;

                foreach (fieldIndex, fieldValue; line.splitter(cmdopt.delim).enumerate)
                {
                    filterKeysReordering.processNextField(fieldIndex,fieldValue);
                    appendFieldsReordering.processNextField(fieldIndex,fieldValue);

                    // Stop splitting once every needed field has been collected.
                    if (filterKeysReordering.allFieldsFilled && appendFieldsReordering.allFieldsFilled)
                    {
                        break;
                    }
                }
                // Processed all fields in the line.
                if (!filterKeysReordering.allFieldsFilled || !appendFieldsReordering.allFieldsFilled)
                {
                    throw new Exception(
                        format("Not enough fields in line. File: %s, Line: %s",
                               (cmdopt.filterFile == "-") ? "Standard Input" : cmdopt.filterFile, lineNum));
                }
            }

            // Both values are converted to immutable strings before being used or stored.
            string key = cmdopt.keyIsFullLine ?
                line.to!string : filterKeysReordering.outputFields.join(cmdopt.delim).to!string;
            string appendValues = cmdopt.appendFullLine ?
                line.to!string : appendFieldsReordering.outputFields.join(cmdopt.delim).to!string;

            debug stderr.writeln("  --> [key]:[append] => [", key, "]:[", appendValues, "]");

            if (lineNum == 1) throwIfWindowsNewlineOnUnix(line, cmdopt.filterFile, lineNum);

            if (lineNum == 1 && cmdopt.hasHeader)
            {
                /* Header line: remember the append-field names for the output header
                 * instead of adding the line to the hash.
                 */
                if (cmdopt.appendHeaderPrefix.length == 0)
                {
                    appendFieldsHeader = appendValues;
                }
                else
                {
                    // Put the prefix in front of each individual append-field name.
                    foreach (fieldIndex, fieldValue; appendValues.splitter(cmdopt.delim).enumerate)
                    {
                        if (fieldIndex > 0) appendFieldsHeader ~= cmdopt.delim;
                        appendFieldsHeader ~= cmdopt.appendHeaderPrefix;
                        appendFieldsHeader ~= fieldValue;
                    }
                }
            }
            else
            {
                /* Duplicate keys only matter when appending, and then only if the
                 * append values differ.
                 */
                if (isAppending && !cmdopt.allowDupliateKeys)
                {
                    string* currAppendValues = (key in filterHash);
                    if (currAppendValues !is null && *currAppendValues != appendValues)
                    {
                        throw new Exception(
                            format("Duplicate keys with different append values (use --z|allow-duplicate-keys to ignore)\n   [key 1][values]: [%s][%s]\n   [key 2][values]: [%s][%s]",
                                   key, *currAppendValues, key, appendValues));
                    }
                }
                filterHash[key] = appendValues;
            }
        }
    }

    filterHash.rehash;    // For faster lookups. (Per docs. In my tests no performance delta.)

    /* Now process each input file, one line at a time. */

    auto bufferedOutput = BufferedOutputRange!(typeof(stdout))(stdout);
    bool headerWritten = false;

    foreach (filename; (inputFiles.length > 0) ? inputFiles : ["-"])
    {
        auto inputStream = (filename == "-") ? stdin : filename.File();
        foreach (lineNum, line; inputStream.bufferedByLine.enumerate(1))
        {
            debug stderr.writeln("[input line] |", line, "|");

            if (lineNum == 1) throwIfWindowsNewlineOnUnix(line, filename, lineNum);

            if (lineNum == 1 && cmdopt.hasHeader)
            {
                /* Header line processing. Only the first file's header is written. */
                if (!headerWritten)
                {
                    bufferedOutput.append(line);
                    if (isAppending)
                    {
                        bufferedOutput.append(cmdopt.delim);
                        bufferedOutput.append(appendFieldsHeader);
                    }
                    bufferedOutput.appendln;
                    headerWritten = true;
                }
            }
            else
            {
                /* Regular line (not a header line).
                 *
                 * Next block checks if the input line matches a hash entry. Two cases:
                 *   a) The whole line is the key. Simply look it up in the hash.
                 *   b) Individual fields are used as the key - Assemble key and look it up.
                 *
                 * At the end of the block appendFields holds the result of the hash
                 * lookup (null when the line did not match).
                 */
                string* appendFields;
                if (cmdopt.keyIsFullLine)
                {
                    appendFields = (line in filterHash);
                }
                else
                {
                    dataKeysReordering.initNewLine;
                    foreach (fieldIndex, fieldValue; line.splitter(cmdopt.delim).enumerate)
                    {
                        dataKeysReordering.processNextField(fieldIndex, fieldValue);
                        if (dataKeysReordering.allFieldsFilled) break;
                    }
                    // Processed all fields in the line.
                    if (!dataKeysReordering.allFieldsFilled)
                    {
                        throw new Exception(
                            format("Not enough fields in line. File: %s, Line: %s",
                                   (filename == "-") ? "Standard Input" : filename, lineNum));
                    }
                    appendFields = (dataKeysReordering.outputFields.join(cmdopt.delim) in filterHash);
                }

                bool matched = (appendFields !is null);
                debug stderr.writeln("   --> matched? ", matched);

                /* Write the line when everything is written (--write-all), or when its
                 * match status is the one selected: matched normally, unmatched with
                 * --exclude.
                 */
                if (cmdopt.writeAll || (matched && !cmdopt.exclude) || (!matched && cmdopt.exclude))
                {
                    bufferedOutput.append(line);
                    if (isAppending)
                    {
                        bufferedOutput.append(cmdopt.delim);
                        bufferedOutput.append(matched ? *appendFields : appendFieldsUnmatchedValue);
                    }
                    bufferedOutput.appendln();
                }
            }
        }
    }
}