1 /**
2 Command line tool that joins tab-separated value files based on a common key.
3 
4 This tool joins lines from tab-delimited files based on a common key. One file, the 'filter'
5 file, contains the records (lines) being matched. The other input files are searched for
6 matching records. Matching records are written to standard output, along with any designated
7 fields from the 'filter' file. In database parlance this is a 'hash semi-join'.
8 
9 Copyright (c) 2015-2020, eBay Inc.
10 Initially written by Jon Degenhardt
11 
12 License: Boost Licence 1.0 (http://boost.org/LICENSE_1_0.txt)
13 */
14 module tsv_utils.tsv_join;
15 
16 import std.exception : enforce;
17 import std.stdio;
18 import std.format : format;
19 import std.typecons : tuple;
20 
21 auto helpText = q"EOS
22 Synopsis: tsv-join --filter-file file [options] file [file...]
23 
24 tsv-join matches input lines against lines from a 'filter' file. The match is
25 based on fields or the entire line. Use '--help-verbose' for more details.
26 
27 Options:
28 EOS";
29 
30 auto helpTextVerbose = q"EOS
31 Synopsis: tsv-join --filter-file file [options] file [file...]
32 
33 tsv-join matches input lines against lines from a 'filter' file. The match is
34 based on exact match comparison of one or more 'key' fields. Fields are TAB
35 delimited by default. Matching lines are written to standard output, along with
36 any additional fields from the key file that have been specified. An example:
37 
38   tsv-join --filter-file filter.tsv --key-fields 1 --append-fields 5,6 data.tsv
39 
40 This reads filter.tsv, creating a hash table keyed on field 1. Lines from data.tsv
41 are read one at a time. If field 1 is found in the hash table, the line is written
42 to standard output with fields 5 and 6 from the filter file appended. In database
43 parlance this is a "hash semi join". Note the asymmetric relationship: Records in
44 the filter file should be unique, but data.tsv lines can repeat.
45 
46 tsv-join can also work as a simple filter, this is the default behavior. Example:
47 
48   tsv-join --filter-file filter.tsv data.tsv
49 
50 This outputs all lines from data.tsv found in filter.tsv. --key-fields can still
51 be used to define the match key. The --exclude option can be used to exclude
52 matched lines rather than keep them.
53 
54 Multiple fields can be specified as keys and append fields. Field numbers start
55 at one, zero represents the whole line. Fields are comma separated and ranges
56 can be used. Example:
57 
58   tsv-join -f filter.tsv -k 1,2 --append-fields 3-7 data.tsv
59 
60 Options:
61 EOS";
62 
63 /** Container for command line options.
64  */
65 struct TsvJoinOptions
66 {
67     import tsv_utils.common.utils : byLineSourceRange, ByLineSourceRange,
68         inputSourceRange, InputSourceRange, ReadHeader;
69 
70     /* Data available the main program. Variables used only command line argument
71      * processing are local to processArgs.
72      */
73     string programName;
74     InputSourceRange inputSources;     // Input Files
75     ByLineSourceRange!() filterSource; // Derived: --filter
76     size_t[] keyFields;                // --key-fields
77     size_t[] dataFields;               // --data-fields
78     size_t[] appendFields;             // --append-fields
79     bool hasHeader = false;            // --H|header
80     string appendHeaderPrefix = "";    // --append-header-prefix
81     bool writeAll = false;             // --write-all
82     string writeAllValue;              // --write-all
83     bool exclude = false;              // --exclude
84     char delim = '\t';                 // --delimiter
85     bool allowDupliateKeys = false;    // --allow-duplicate-keys
86     bool keyIsFullLine = false;        // Derived: --key-fields 0
87     bool dataIsFullLine = false;       // Derived: --data-fields 0
88     bool appendFullLine = false;       // Derived: --append-fields 0
89 
90     /* Returns a tuple. First value is true if command line arguments were successfully
91      * processed and execution should continue, or false if an error occurred or the user
92      * asked for help. If false, the second value is the appropriate exit code (0 or 1).
93      *
94      * Returning true (execution continues) means args have been validated and derived
95      * values calculated. In addition, field indices have been converted to zero-based.
96      * If the whole line is the key, the individual fields lists will be cleared.
97      */
98     auto processArgs (ref string[] cmdArgs)
99     {
100         import std.getopt;
101         import std.path : baseName, stripExtension;
102         import std.typecons : Yes, No;
103         import tsv_utils.common.utils :  makeFieldListOptionHandler;
104 
105         string filterFile;               // --filter
106         bool helpVerbose = false;        // --help-verbose
107         bool versionWanted = false;      // --V|version
108 
109         programName = (cmdArgs.length > 0) ? cmdArgs[0].stripExtension.baseName : "Unknown_program_name";
110 
111         /* Handler for --write-all. Special handler so two values can be set. */
112         void writeAllHandler(string option, string value)
113         {
114             debug stderr.writeln("[writeAllHandler] |", option, "|  |", value, "|");
115             writeAll = true;
116             writeAllValue = value;
117         }
118 
119         try
120         {
121             arraySep = ",";    // Use comma to separate values in command line options
122             auto r = getopt(
123                 cmdArgs,
124                 "help-verbose",    "              Print full help.", &helpVerbose,
125                 "f|filter-file",   "FILE          (Required) File with records to use as a filter.", &filterFile,
126 
127                 "k|key-fields",    "<field-list>  Fields to use as join key. Default: 0 (entire line).",
128                 keyFields.makeFieldListOptionHandler!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero),
129 
130                 "d|data-fields",   "<field-list>  Data record fields to use as join key, if different than --key-fields.",
131                 dataFields.makeFieldListOptionHandler!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero),
132 
133                 "a|append-fields", "<field-list>  Filter fields to append to matched records.",
134                 appendFields.makeFieldListOptionHandler!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero),
135 
136                 std.getopt.config.caseSensitive,
137                 "H|header",        "              Treat the first line of each file as a header.", &hasHeader,
138                 std.getopt.config.caseInsensitive,
139                 "p|prefix",        "STR           String to use as a prefix for --append-fields when writing a header line.", &appendHeaderPrefix,
140                 "w|write-all",     "STR           Output all data records. STR is the --append-fields value when writing unmatched records.", &writeAllHandler,
141                 "e|exclude",       "              Exclude matching records.", &exclude,
142                 "delimiter",       "CHR           Field delimiter. Default: TAB. (Single byte UTF-8 characters only.)", &delim,
143                 "z|allow-duplicate-keys",
144                                    "              Allow duplicate keys with different append values (last entry wins).", &allowDupliateKeys,
145                 std.getopt.config.caseSensitive,
146                 "V|version",       "              Print version information and exit.", &versionWanted,
147                 std.getopt.config.caseInsensitive,
148                 );
149 
150             if (r.helpWanted)
151             {
152                 defaultGetoptPrinter(helpText, r.options);
153                 return tuple(false, 0);
154             }
155             else if (helpVerbose)
156             {
157                 defaultGetoptPrinter(helpTextVerbose, r.options);
158                 return tuple(false, 0);
159             }
160             else if (versionWanted)
161             {
162                 import tsv_utils.common.tsvutils_version;
163                 writeln(tsvutilsVersionNotice("tsv-join"));
164                 return tuple(false, 0);
165             }
166 
167             /* File arguments.
168              *   *  --filter-file required, converted to a one-element ByLineSourceRange
169              *   *  Remaining command line args are input files.
170              */
171             enforce(filterFile.length != 0,
172                     "Required option --filter-file was not supplied.");
173 
174             enforce(!(filterFile == "-" && cmdArgs.length == 1),
175                     "A data file is required when standard input is used for the filter file (--f|filter-file -).");
176 
177             filterSource = byLineSourceRange([filterFile]);
178 
179             string[] filepaths = (cmdArgs.length > 1) ? cmdArgs[1 .. $] : ["-"];
180             cmdArgs.length = 1;
181             ReadHeader readHeader = hasHeader ? Yes.readHeader : No.readHeader;
182             inputSources = inputSourceRange(filepaths, readHeader);
183 
184             consistencyValidations(cmdArgs);
185             derivations();
186         }
187         catch (Exception exc)
188         {
189             stderr.writefln("[%s] Error processing command line arguments: %s", programName, exc.msg);
190             return tuple(false, 1);
191         }
192         return tuple(true, 0);
193     }
194 
195     /* This routine does validations not handled by getopt, usually because they
196      * involve interactions between multiple parameters.
197      */
198     private void consistencyValidations(ref string[] processedCmdArgs)
199     {
200         import std.algorithm : all;
201 
202         if (writeAll)
203         {
204             enforce(appendFields.length != 0,
205                     "Use --a|append-fields when using --w|write-all.");
206 
207             enforce(!(appendFields.length == 1 && appendFields[0] == 0),
208                     "Cannot use '--a|append-fields 0' (whole line) when using --w|write-all.");
209         }
210 
211         enforce(!(appendFields.length > 0 && exclude),
212                 "--e|exclude cannot be used with --a|append-fields.");
213 
214         enforce(appendHeaderPrefix.length == 0 || hasHeader,
215                 "Use --header when using --p|prefix.");
216 
217         enforce(dataFields.length == 0 || keyFields.length == dataFields.length,
218                 "Different number of --k|key-fields and --d|data-fields.");
219 
220         enforce(keyFields.length != 1 ||
221                 dataFields.length != 1 ||
222                 (keyFields[0] == 0 && dataFields[0] == 0) ||
223                 (keyFields[0] != 0 && dataFields[0] != 0),
224                 "If either --k|key-field or --d|data-field is zero both must be zero.");
225 
226         enforce((keyFields.length <= 1 || all!(a => a != 0)(keyFields)) &&
227                 (dataFields.length <= 1 || all!(a => a != 0)(dataFields)) &&
228                 (appendFields.length <= 1 || all!(a => a != 0)(appendFields)),
229                 "Field 0 (whole line) cannot be combined with individual fields (non-zero).");
230     }
231 
232     /* Post-processing derivations. */
233     void derivations()
234     {
235         import std.algorithm : each;
236         import std.range;
237 
238         // Convert 'full-line' field indexes (index zero) to boolean flags.
239         if (keyFields.length == 0)
240         {
241             assert(dataFields.length == 0);
242             keyIsFullLine = true;
243             dataIsFullLine = true;
244         }
245         else if (keyFields.length == 1 && keyFields[0] == 0)
246         {
247             keyIsFullLine = true;
248             keyFields.popFront;
249             dataIsFullLine = true;
250 
251             if (dataFields.length == 1)
252             {
253                 assert(dataFields[0] == 0);
254                 dataFields.popFront;
255             }
256         }
257 
258         if (appendFields.length == 1 && appendFields[0] == 0)
259         {
260             appendFullLine = true;
261             appendFields.popFront;
262         }
263 
264         assert(!(keyIsFullLine && keyFields.length > 0));
265         assert(!(dataIsFullLine && dataFields.length > 0));
266         assert(!(appendFullLine && appendFields.length > 0));
267 
268         // Switch to zero-based field indexes.
269         keyFields.each!((ref a) => --a);
270         dataFields.each!((ref a) => --a);
271         appendFields.each!((ref a) => --a);
272     }
273 }
274 
275 static if (__VERSION__ >= 2085) extern(C) __gshared string[] rt_options = [ "gcopt=cleanup:none" ];
276 
277 /** Main program.
278  */
279 int main(string[] cmdArgs)
280 {
281     /* When running in DMD code coverage mode, turn on report merging. */
282     version(D_Coverage) version(DigitalMars)
283     {
284         import core.runtime : dmd_coverSetMerge;
285         dmd_coverSetMerge(true);
286     }
287 
288     TsvJoinOptions cmdopt;
289     auto r = cmdopt.processArgs(cmdArgs);
290     if (!r[0]) return r[1];
291     try tsvJoin(cmdopt);
292     catch (Exception exc)
293     {
294         stderr.writefln("Error [%s]: %s", cmdopt.programName, exc.msg);
295         return 1;
296     }
297     return 0;
298 }
299 
300 /** tsvJoin does the primary work of the tsv-join program.
301  */
302 void tsvJoin(ref TsvJoinOptions cmdopt)
303 {
304     import tsv_utils.common.utils : ByLineSourceRange, bufferedByLine, BufferedOutputRange,
305         InputFieldReordering, InputSourceRange, throwIfWindowsNewlineOnUnix;
306     import std.algorithm : splitter;
307     import std.array : join;
308     import std.range;
309     import std.conv : to;
310 
311     /* Check that the input files were setup correctly. Should have one filter file as a
312      * ByLineSourceRange. There should be at least one input file as an InputSourceRange.
313      */
314     assert(cmdopt.filterSource.length == 1);
315     static assert(is(typeof(cmdopt.filterSource) == ByLineSourceRange!(No.keepTerminator)));
316 
317     assert(!cmdopt.inputSources.empty);
318     static assert(is(typeof(cmdopt.inputSources) == InputSourceRange));
319 
320     /* State, variables, and convenience derivations.
321      *
322      * Combinations of individual fields and whole line (field zero) are convenient for the
323      * user, but create complexities for the program. Many combinations are disallowed by
324      * command line processing, but the remaining combos still leave several states. Also,
325      * this code optimizes by doing only necessary operations, further complicating state
326      * Here's a guide to variables and state.
327      * - cmdopt.keyFields, cmdopt.dataFields arrays - Individual field indexes used as keys.
328      *      Empty if the  whole line is used as a key. Must be the same length.
329      * - cmdopt.keyIsFullLine, cmdopt.dataIsFullLine - True when the whole line is used key.
330      * - cmdopt.appendFields array - Indexes of individual filter file fields being appended.
331      *      Empty if appending the full line, or if not appending anything.
332      * - cmdopt.appendFullLine - True when the whole line is being appended.
333      * - isAppending - True is something is being appended.
334      * - cmdopt.writeAll - True if all lines are being written
335      */
336     /* Convenience derivations. */
337     auto numKeyFields = cmdopt.keyFields.length;
338     auto numAppendFields = cmdopt.appendFields.length;
339     bool isAppending = (cmdopt.appendFullLine || numAppendFields > 0);
340 
341     /* Mappings from field indexes in the input lines to collection arrays. */
342     auto filterKeysReordering = new InputFieldReordering!char(cmdopt.keyFields);
343     auto dataKeysReordering = (cmdopt.dataFields.length == 0) ?
344         filterKeysReordering : new InputFieldReordering!char(cmdopt.dataFields);
345     auto appendFieldsReordering = new InputFieldReordering!char(cmdopt.appendFields);
346 
347     /* The master filter hash. The key is the delimited fields concatenated together
348      * (including separators). The value is the appendFields concatenated together, as
349      * they will be appended to the input line. Both the keys and append fields are
350      * assembled in the order specified, though this only required for append fields.
351      */
352     string[string] filterHash;
353     string appendFieldsHeader;
354 
355     /* The append values for unmatched records. */
356     char[] appendFieldsUnmatchedValue;
357 
358     if (cmdopt.writeAll)
359     {
360         assert(cmdopt.appendFields.length > 0);  // Checked in consistencyValidations
361 
362         // reserve space for n values and n-1 delimiters
363         appendFieldsUnmatchedValue.reserve(cmdopt.appendFields.length * (cmdopt.writeAllValue.length + 1) - 1);
364 
365         appendFieldsUnmatchedValue ~= cmdopt.writeAllValue;
366         for (size_t i = 1; i < cmdopt.appendFields.length; ++i)
367         {
368             appendFieldsUnmatchedValue ~= cmdopt.delim;
369             appendFieldsUnmatchedValue ~= cmdopt.writeAllValue;
370         }
371     }
372 
373     /* Read the filter file. */
374     {
375         bool needPerFieldProcessing = (numKeyFields > 0) || (numAppendFields > 0);
376         auto filterStream = cmdopt.filterSource.front;
377         foreach (lineNum, line; filterStream.byLine.enumerate(1))
378         {
379             debug writeln("[filter line] |", line, "|");
380             if (needPerFieldProcessing)
381             {
382                 filterKeysReordering.initNewLine;
383                 appendFieldsReordering.initNewLine;
384 
385                 foreach (fieldIndex, fieldValue; line.splitter(cmdopt.delim).enumerate)
386                 {
387                     filterKeysReordering.processNextField(fieldIndex,fieldValue);
388                     appendFieldsReordering.processNextField(fieldIndex,fieldValue);
389 
390                     if (filterKeysReordering.allFieldsFilled && appendFieldsReordering.allFieldsFilled)
391                     {
392                         break;
393                     }
394                 }
395 
396                 // Processed all fields in the line.
397                 enforce(filterKeysReordering.allFieldsFilled && appendFieldsReordering.allFieldsFilled,
398                         format("Not enough fields in line. File: %s, Line: %s",
399                                filterStream.name, lineNum));
400             }
401 
402             string key = cmdopt.keyIsFullLine ?
403                 line.to!string : filterKeysReordering.outputFields.join(cmdopt.delim).to!string;
404             string appendValues = cmdopt.appendFullLine ?
405                 line.to!string : appendFieldsReordering.outputFields.join(cmdopt.delim).to!string;
406 
407             debug writeln("  --> [key]:[append] => [", key, "]:[", appendValues, "]");
408 
409             if (lineNum == 1) throwIfWindowsNewlineOnUnix(line, filterStream.name, lineNum);
410 
411             if (lineNum == 1 && cmdopt.hasHeader)
412             {
413                 if (cmdopt.appendHeaderPrefix.length == 0)
414                 {
415                     appendFieldsHeader = appendValues;
416                 }
417                 else
418                 {
419                     foreach (fieldIndex, fieldValue; appendValues.splitter(cmdopt.delim).enumerate)
420                     {
421                         if (fieldIndex > 0) appendFieldsHeader ~= cmdopt.delim;
422                         appendFieldsHeader ~= cmdopt.appendHeaderPrefix;
423                         appendFieldsHeader ~= fieldValue;
424                     }
425                 }
426             }
427             else
428             {
429                 if (isAppending && !cmdopt.allowDupliateKeys)
430                 {
431                     string* currAppendValues = (key in filterHash);
432 
433                     enforce(currAppendValues is null || *currAppendValues == appendValues,
434                             format("Duplicate keys with different append values (use --z|allow-duplicate-keys to ignore)\n   [key 1][values]: [%s][%s]\n   [key 2][values]: [%s][%s]",
435                                    key, *currAppendValues, key, appendValues));
436                 }
437                 filterHash[key] = appendValues;
438             }
439         }
440 
441         /* popFront here closes the filter file. */
442         cmdopt.filterSource.popFront;
443     }
444 
445     /* Now process each input file, one line at a time. */
446 
447     auto bufferedOutput = BufferedOutputRange!(typeof(stdout))(stdout);
448 
449     /* First header is read during command line argument processing. */
450     if (cmdopt.hasHeader && !cmdopt.inputSources.front.isHeaderEmpty)
451     {
452         auto inputStream = cmdopt.inputSources.front;
453         throwIfWindowsNewlineOnUnix(inputStream.header, inputStream.name, 1);
454 
455         bufferedOutput.append(inputStream.header);
456         if (isAppending)
457         {
458             bufferedOutput.append(cmdopt.delim);
459             bufferedOutput.append(appendFieldsHeader);
460         }
461         bufferedOutput.appendln;
462     }
463 
464     immutable size_t fileBodyStartLine = cmdopt.hasHeader ? 2 : 1;
465 
466     foreach (inputStream; cmdopt.inputSources)
467     {
468         if (cmdopt.hasHeader) throwIfWindowsNewlineOnUnix(inputStream.header, inputStream.name, 1);
469 
470         foreach (lineNum, line; inputStream.file.bufferedByLine.enumerate(fileBodyStartLine))
471         {
472             debug writeln("[input line] |", line, "|");
473 
474             if (lineNum == 1) throwIfWindowsNewlineOnUnix(line, inputStream.name, lineNum);
475 
476             /*
477              * Next block checks if the input line matches a hash entry. Two cases:
478              *   a) The whole line is the key. Simply look it up in the hash.
479              *   b) Individual fields are used as the key - Assemble key and look it up.
480              *
481              * At the end of the appendFields will contain the result of hash lookup.
482              */
483             string* appendFields;
484             if (cmdopt.keyIsFullLine)
485             {
486                 appendFields = (line in filterHash);
487             }
488             else
489             {
490                 dataKeysReordering.initNewLine;
491                 foreach (fieldIndex, fieldValue; line.splitter(cmdopt.delim).enumerate)
492                 {
493                     dataKeysReordering.processNextField(fieldIndex, fieldValue);
494                     if (dataKeysReordering.allFieldsFilled) break;
495                 }
496                 // Processed all fields in the line.
497                 enforce(dataKeysReordering.allFieldsFilled,
498                         format("Not enough fields in line. File: %s, Line: %s",
499                                inputStream.name, lineNum));
500 
501                 appendFields = (dataKeysReordering.outputFields.join(cmdopt.delim) in filterHash);
502             }
503 
504             bool matched = (appendFields !is null);
505             debug writeln("   --> matched? ", matched);
506             if (cmdopt.writeAll || (matched && !cmdopt.exclude) || (!matched && cmdopt.exclude))
507             {
508                 bufferedOutput.append(line);
509                 if (isAppending)
510                 {
511                     bufferedOutput.append(cmdopt.delim);
512                     bufferedOutput.append(matched ? *appendFields : appendFieldsUnmatchedValue);
513                 }
514                 bufferedOutput.appendln();
515             }
516         }
517     }
518 }