1 /**
2 Command line tool that joins tab-separated value files based on a common key.
3 
4 This tool joins lines from tab-delimited files based on a common key. One file, the 'filter'
5 file, contains the records (lines) being matched. The other input files are searched for
6 matching records. Matching records are written to standard output, along with any designated
7 fields from the 'filter' file. In database parlance this is a 'hash semi-join'.
8 
9 Copyright (c) 2015-2021, eBay Inc.
10 Initially written by Jon Degenhardt
11 
12 License: Boost Licence 1.0 (http://boost.org/LICENSE_1_0.txt)
13 */
14 module tsv_utils.tsv_join;
15 
16 import std.exception : enforce;
17 import std.stdio;
18 import std.format : format;
19 import std.range;
20 import std.typecons : tuple;
21 
22 auto helpText = q"EOS
23 Synopsis: tsv-join --filter-file file [options] [file...]
24 
25 tsv-join matches input lines (the 'data stream') against lines from a
26 'filter' file. The match is based on individual fields or the entire
27 line. Fields can be specified either by field number or field name.
28 Use '--help-verbose' for details.
29 
30 Options:
31 EOS";
32 
33 auto helpTextVerbose = q"EOS
34 Synopsis: tsv-join --filter-file file [options] [file...]
35 
36 tsv-join matches input lines (the 'data stream') against lines from a
37 'filter' file. The match is based on exact match comparison of one or more
38 'key' fields. Fields are TAB delimited by default. Input lines are read
39 from files or standard input. Matching lines are written to standard
40 output, along with any additional fields from the filter file that have
41 been specified. For example:
42 
43   tsv-join --filter-file filter.tsv --key-fields 1 --append-fields 5,6 data.tsv
44 
45 This reads filter.tsv, creating a hash table keyed on field 1. Lines from
46 data.tsv are read one at a time. If field 1 is found in the hash table,
47 the line is written to standard output with fields 5 and 6 from the filter
48 file appended. In database parlance this is a "hash semi join". Note the
49 asymmetric relationship: Records in the filter file should be unique, but
50 lines in the data stream (data.tsv) can repeat.
51 
52 Field names can be used instead of field numbers if the files have header
53 lines. The following command is similar to the previous example, except
54 using field names:
55 
56   tsv-join -H -f filter.tsv -k ID --append-fields Date,Time data.tsv
57 
58 tsv-join can also work as a simple filter based on the whole line. This is
59 the default behavior. Example:
60 
61   tsv-join -f filter.tsv data.tsv
62 
63 This outputs all lines from data.tsv found in filter.tsv.
64 
65 Multiple fields can be specified as keys and append fields. Field numbers
66 start at one, zero represents the whole line. Fields are comma separated
67 and ranges can be used. Example:
68 
69   tsv-join -f filter.tsv -k 1,2 --append-fields 3-7 data.tsv
70 
71 The --e|exclude option can be used to exclude matched lines rather than
72 keep them.
73 
74 The joins supported are similar to the "stream-static" joins available in
75 Spark Structured Streaming and "KStream-KTable" joins in Kafka. The filter
76 file plays the same role as the Spark static dataset or Kafka KTable.
77 
78 Options:
79 EOS";
80 
81 /** Container for command line options.
82  */
83 struct TsvJoinOptions
84 {
85     import tsv_utils.common.utils : byLineSourceRange, ByLineSourceRange,
86         inputSourceRange, InputSourceRange, ReadHeader;
87 
88     /* Data available the main program. Variables used only command line argument
89      * processing are local to processArgs.
90      */
91     string programName;                /// Program name
92     InputSourceRange inputSources;     /// Input Files
93     ByLineSourceRange!() filterSource; /// Derived: --filter
94     size_t[] keyFields;                /// Derived: --key-fields
95     size_t[] dataFields;               /// Derived: --data-fields
96     size_t[] appendFields;             /// Derived: --append-fields
97     bool hasHeader = false;            /// --H|header
98     string appendHeaderPrefix = "";    /// --append-header-prefix
99     bool writeAll = false;             /// --write-all
100     string writeAllValue;              /// --write-all
101     bool exclude = false;              /// --exclude
102     char delim = '\t';                 /// --delimiter
103     bool allowDupliateKeys = false;    /// --allow-duplicate-keys
104     bool lineBuffered = false;         /// --line-buffered
105     bool keyIsFullLine = false;        /// Derived: --key-fields 0
106     bool dataIsFullLine = false;       /// Derived: --data-fields 0
107     bool appendFullLine = false;       /// Derived: --append-fields 0
108 
109     /* Returns a tuple. First value is true if command line arguments were successfully
110      * processed and execution should continue, or false if an error occurred or the user
111      * asked for help. If false, the second value is the appropriate exit code (0 or 1).
112      *
113      * Returning true (execution continues) means args have been validated and derived
114      * values calculated. In addition, field indices have been converted to zero-based.
115      * If the whole line is the key, the individual fields lists will be cleared.
116      */
117     auto processArgs (ref string[] cmdArgs)
118     {
119         import std.array : split;
120         import std.conv : to;
121         import std.getopt;
122         import std.path : baseName, stripExtension;
123         import std.typecons : Yes, No;
124         import tsv_utils.common.fieldlist;
125         import tsv_utils.common.utils : throwIfWindowsNewline;
126 
127         bool helpVerbose = false;        // --help-verbose
128         bool helpFields = false;         // --help-fields
129         bool versionWanted = false;      // --V|version
130         string filterFile;               // --filter
131         string keyFieldsArg;             // --key-fields
132         string dataFieldsArg;            // --data-fields
133         string appendFieldsArg;          // --append-fields
134 
135         string keyFieldsOptionString = "k|key-fields";
136         string dataFieldsOptionString = "d|data-fields";
137         string appendFieldsOptionString = "a|append-fields";
138 
139         programName = (cmdArgs.length > 0) ? cmdArgs[0].stripExtension.baseName : "Unknown_program_name";
140 
141         /* Handler for --write-all. Special handler so two values can be set. */
142         void writeAllHandler(string option, string value)
143         {
144             debug stderr.writeln("[writeAllHandler] |", option, "|  |", value, "|");
145             writeAll = true;
146             writeAllValue = value;
147         }
148 
149         try
150         {
151             arraySep = ",";    // Use comma to separate values in command line options
152             auto r = getopt(
153                 cmdArgs,
154                 "help-verbose",    "              Print full help.", &helpVerbose,
155                 "help-fields",     "              Print help on specifying fields.", &helpFields,
156 
157                 "f|filter-file",   "FILE          (Required) File with records to use as a filter.", &filterFile,
158 
159                 keyFieldsOptionString,
160                 "<field-list>  Fields to use as the join key. Default: 0 (entire line).",
161                 &keyFieldsArg,
162 
163                 dataFieldsOptionString,
164                 "<field-list>  Data stream fields to use as the join key, if different than --key-fields.",
165                 &dataFieldsArg,
166 
167                 appendFieldsOptionString,
168                 "<field-list>  Filter file fields to append to matched data stream records.",
169                 &appendFieldsArg,
170 
171                 std.getopt.config.caseSensitive,
172                 "H|header",        "              Treat the first line of each file as a header.", &hasHeader,
173                 std.getopt.config.caseInsensitive,
174                 "p|prefix",        "STR           String to use as a prefix for --append-fields when writing a header line.", &appendHeaderPrefix,
175                 "w|write-all",     "STR           Output all data stream records. STR is the --append-fields value when writing unmatched records.", &writeAllHandler,
176                 "e|exclude",       "              Exclude matching records.", &exclude,
177                 "delimiter",       "CHR           Field delimiter. Default: TAB. (Single byte UTF-8 characters only.)", &delim,
178                 "z|allow-duplicate-keys",
179                                    "              Allow duplicate keys with different append values (last entry wins).", &allowDupliateKeys,
180                 "line-buffered",   "              Immediately output every line.", &lineBuffered,
181                 std.getopt.config.caseSensitive,
182                 "V|version",       "              Print version information and exit.", &versionWanted,
183                 std.getopt.config.caseInsensitive,
184                 );
185 
186             if (r.helpWanted)
187             {
188                 defaultGetoptPrinter(helpText, r.options);
189                 return tuple(false, 0);
190             }
191             else if (helpVerbose)
192             {
193                 defaultGetoptPrinter(helpTextVerbose, r.options);
194                 return tuple(false, 0);
195             }
196             else if (helpFields)
197             {
198                 writeln(fieldListHelpText);
199                 return tuple(false, 0);
200             }
201             else if (versionWanted)
202             {
203                 import tsv_utils.common.tsvutils_version;
204                 writeln(tsvutilsVersionNotice("tsv-join"));
205                 return tuple(false, 0);
206             }
207 
208             /* File arguments.
209              *   *  --filter-file required, converted to a one-element ByLineSourceRange
210              *   *  Remaining command line args are input files.
211              */
212             enforce(filterFile.length != 0,
213                     "Required option --f|filter-file was not supplied.");
214 
215             enforce(!(filterFile == "-" && cmdArgs.length == 1),
216                     "A data file is required when standard input is used for the filter file (--f|filter-file -).");
217 
218             string[] filepaths = (cmdArgs.length > 1) ? cmdArgs[1 .. $] : ["-"];
219             cmdArgs.length = 1;
220 
221              /* Validation and derivations - Do as much validation prior to header line
222              * processing as possible (avoids waiting on stdin).
223              *
224              * Note: In tsv-join, when header processing is on, there is very little
225              * validatation that can be done prior to reading the header line. All the
226              * logic is in the fieldListArgProcessing function.
227              */
228 
229             string[] filterFileHeaderFields;
230             string[] inputSourceHeaderFields;
231 
232             /* fieldListArgProcessing encapsulates the field list dependent processing.
233              * It is called prior to reading the header line if headers are not being used,
234              * and after if headers are being used.
235              */
236             void fieldListArgProcessing()
237             {
238                 import std.algorithm : all, each;
239 
240                 /* field list parsing. */
241                 if (!keyFieldsArg.empty)
242                 {
243                     keyFields =
244                         keyFieldsArg
245                         .parseFieldList!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero)
246                         (hasHeader, filterFileHeaderFields, keyFieldsOptionString)
247                         .array;
248                 }
249 
250                 if (!dataFieldsArg.empty)
251                 {
252                     dataFields =
253                         dataFieldsArg
254                         .parseFieldList!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero)
255                         (hasHeader, inputSourceHeaderFields, dataFieldsOptionString)
256                         .array;
257                 }
258                 else if (!keyFieldsArg.empty)
259                 {
260                     dataFields =
261                         keyFieldsArg
262                         .parseFieldList!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero)
263                         (hasHeader, inputSourceHeaderFields, dataFieldsOptionString)
264                         .array;
265                 }
266 
267                 if (!appendFieldsArg.empty)
268                 {
269                     appendFields =
270                         appendFieldsArg
271                         .parseFieldList!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero)
272                         (hasHeader, filterFileHeaderFields, appendFieldsOptionString)
273                         .array;
274                 }
275 
276                 /* Validations */
277                 if (writeAll)
278                 {
279                     enforce(appendFields.length != 0,
280                             "Use --a|append-fields when using --w|write-all.");
281 
282                     enforce(!(appendFields.length == 1 && appendFields[0] == 0),
283                             "Cannot use '--a|append-fields 0' (whole line) when using --w|write-all.");
284                 }
285 
286                 enforce(!(appendFields.length > 0 && exclude),
287                         "--e|exclude cannot be used with --a|append-fields.");
288 
289                 enforce(appendHeaderPrefix.length == 0 || hasHeader,
290                         "Use --header when using --p|prefix.");
291 
292                 enforce(dataFields.length == 0 || keyFields.length == dataFields.length,
293                         "Different number of --k|key-fields and --d|data-fields.");
294 
295                 enforce(keyFields.length != 1 ||
296                         dataFields.length != 1 ||
297                         (keyFields[0] == 0 && dataFields[0] == 0) ||
298                         (keyFields[0] != 0 && dataFields[0] != 0),
299                         "If either --k|key-field or --d|data-field is zero both must be zero.");
300 
301                 enforce((keyFields.length <= 1 || all!(a => a != 0)(keyFields)) &&
302                         (dataFields.length <= 1 || all!(a => a != 0)(dataFields)) &&
303                         (appendFields.length <= 1 || all!(a => a != 0)(appendFields)),
304                         "Field 0 (whole line) cannot be combined with individual fields (non-zero).");
305 
306                 /* Derivations. */
307 
308                 // Convert 'full-line' field indexes (index zero) to boolean flags.
309                 if (keyFields.length == 0)
310                 {
311                     assert(dataFields.length == 0);
312                     keyIsFullLine = true;
313                     dataIsFullLine = true;
314                 }
315                 else if (keyFields.length == 1 && keyFields[0] == 0)
316                 {
317                     keyIsFullLine = true;
318                     keyFields.popFront;
319                     dataIsFullLine = true;
320 
321                     if (dataFields.length == 1)
322                     {
323                         assert(dataFields[0] == 0);
324                         dataFields.popFront;
325                     }
326                 }
327 
328                 if (appendFields.length == 1 && appendFields[0] == 0)
329                 {
330                     appendFullLine = true;
331                     appendFields.popFront;
332                 }
333 
334                 assert(!(keyIsFullLine && keyFields.length > 0));
335                 assert(!(dataIsFullLine && dataFields.length > 0));
336                 assert(!(appendFullLine && appendFields.length > 0));
337 
338                 // Switch to zero-based field indexes.
339                 keyFields.each!((ref a) => --a);
340                 dataFields.each!((ref a) => --a);
341                 appendFields.each!((ref a) => --a);
342 
343             } // End fieldListArgProcessing()
344 
345 
346             if (!hasHeader) fieldListArgProcessing();
347 
348             /*
349              * Create the input source ranges for the filter file and data stream files
350              * and perform header line processing.
351              */
352 
353             filterSource = byLineSourceRange([filterFile]);
354             ReadHeader readHeader = hasHeader ? Yes.readHeader : No.readHeader;
355             inputSources = inputSourceRange(filepaths, readHeader);
356 
357             if (hasHeader)
358             {
359                 if (!filterSource.front.byLine.empty)
360                 {
361                     throwIfWindowsNewline(filterSource.front.byLine.front, filterSource.front.name, 1);
362                     filterFileHeaderFields = filterSource.front.byLine.front.split(delim).to!(string[]);
363                 }
364                 throwIfWindowsNewline(inputSources.front.header, inputSources.front.name, 1);
365                 inputSourceHeaderFields = inputSources.front.header.split(delim).to!(string[]);
366                 fieldListArgProcessing();
367             }
368         }
369         catch (Exception exc)
370         {
371             stderr.writefln("[%s] Error processing command line arguments: %s", programName, exc.msg);
372             return tuple(false, 1);
373         }
374         return tuple(true, 0);
375     }
376 }
377 
378 static if (__VERSION__ >= 2085) extern(C) __gshared string[] rt_options = [ "gcopt=cleanup:none" ];
379 
380 /** Main program.
381  */
382 int main(string[] cmdArgs)
383 {
384     /* When running in DMD code coverage mode, turn on report merging. */
385     version(D_Coverage) version(DigitalMars)
386     {
387         import core.runtime : dmd_coverSetMerge;
388         dmd_coverSetMerge(true);
389     }
390 
391     TsvJoinOptions cmdopt;
392     auto r = cmdopt.processArgs(cmdArgs);
393     if (!r[0]) return r[1];
394     try tsvJoin(cmdopt);
395     catch (Exception exc)
396     {
397         stderr.writefln("Error [%s]: %s", cmdopt.programName, exc.msg);
398         return 1;
399     }
400     return 0;
401 }
402 
403 /** tsvJoin does the primary work of the tsv-join program.
404  */
405 void tsvJoin(ref TsvJoinOptions cmdopt)
406 {
407     import tsv_utils.common.utils : bufferedByLine, BufferedOutputRange, ByLineSourceRange,
408         InputFieldReordering, InputSourceRange, isFlushableOutputRange, LineBuffered,
409         throwIfWindowsNewline;
410     import std.algorithm : splitter;
411     import std.array : join;
412     import std.range;
413     import std.conv : to;
414 
415     /* Check that the input files were setup correctly. Should have one filter file as a
416      * ByLineSourceRange. There should be at least one input file as an InputSourceRange.
417      */
418     assert(cmdopt.filterSource.length == 1);
419     static assert(is(typeof(cmdopt.filterSource) == ByLineSourceRange!(No.keepTerminator)));
420 
421     assert(!cmdopt.inputSources.empty);
422     static assert(is(typeof(cmdopt.inputSources) == InputSourceRange));
423 
424     /* State, variables, and convenience derivations.
425      *
426      * Combinations of individual fields and whole line (field zero) are convenient for the
427      * user, but create complexities for the program. Many combinations are disallowed by
428      * command line processing, but the remaining combos still leave several states. Also,
429      * this code optimizes by doing only necessary operations, further complicating state
430      * Here's a guide to variables and state.
431      * - cmdopt.keyFields, cmdopt.dataFields arrays - Individual field indexes used as keys.
432      *      Empty if the  whole line is used as a key. Must be the same length.
433      * - cmdopt.keyIsFullLine, cmdopt.dataIsFullLine - True when the whole line is used key.
434      * - cmdopt.appendFields array - Indexes of individual filter file fields being appended.
435      *      Empty if appending the full line, or if not appending anything.
436      * - cmdopt.appendFullLine - True when the whole line is being appended.
437      * - isAppending - True is something is being appended.
438      * - cmdopt.writeAll - True if all lines are being written
439      */
440     /* Convenience derivations. */
441     auto numKeyFields = cmdopt.keyFields.length;
442     auto numAppendFields = cmdopt.appendFields.length;
443     bool isAppending = (cmdopt.appendFullLine || numAppendFields > 0);
444 
445     /* Mappings from field indexes in the input lines to collection arrays. */
446     auto filterKeysReordering = new InputFieldReordering!char(cmdopt.keyFields);
447     auto dataKeysReordering = (cmdopt.dataFields.length == 0) ?
448         filterKeysReordering : new InputFieldReordering!char(cmdopt.dataFields);
449     auto appendFieldsReordering = new InputFieldReordering!char(cmdopt.appendFields);
450 
451     /* The master filter hash. The key is the delimited fields concatenated together
452      * (including separators). The value is the appendFields concatenated together, as
453      * they will be appended to the input line. Both the keys and append fields are
454      * assembled in the order specified, though this only required for append fields.
455      */
456     string[string] filterHash;
457 
458     /* The append values for unmatched records. */
459     char[] appendFieldsUnmatchedValue;
460 
461     if (cmdopt.writeAll)
462     {
463         assert(cmdopt.appendFields.length > 0);  // Checked in consistencyValidations
464 
465         // reserve space for n values and n-1 delimiters
466         appendFieldsUnmatchedValue.reserve(cmdopt.appendFields.length * (cmdopt.writeAllValue.length + 1) - 1);
467 
468         appendFieldsUnmatchedValue ~= cmdopt.writeAllValue;
469         for (size_t i = 1; i < cmdopt.appendFields.length; ++i)
470         {
471             appendFieldsUnmatchedValue ~= cmdopt.delim;
472             appendFieldsUnmatchedValue ~= cmdopt.writeAllValue;
473         }
474     }
475 
476     /* Buffered output range for the final output. Setup here because the header line
477      * (if any) gets written while reading the filter file.
478      */
479     immutable LineBuffered isLineBuffered = cmdopt.lineBuffered ? Yes.lineBuffered : No.lineBuffered;
480     auto bufferedOutput = BufferedOutputRange!(typeof(stdout))(stdout, isLineBuffered);
481 
482     /* Read the filter file. */
483     {
484         bool needPerFieldProcessing = (numKeyFields > 0) || (numAppendFields > 0);
485         auto filterStream = cmdopt.filterSource.front;
486         foreach (lineNum, line; filterStream.byLine.enumerate(1))
487         {
488             debug writeln("[filter line] |", line, "|");
489             if (needPerFieldProcessing)
490             {
491                 filterKeysReordering.initNewLine;
492                 appendFieldsReordering.initNewLine;
493 
494                 foreach (fieldIndex, fieldValue; line.splitter(cmdopt.delim).enumerate)
495                 {
496                     filterKeysReordering.processNextField(fieldIndex,fieldValue);
497                     appendFieldsReordering.processNextField(fieldIndex,fieldValue);
498 
499                     if (filterKeysReordering.allFieldsFilled && appendFieldsReordering.allFieldsFilled)
500                     {
501                         break;
502                     }
503                 }
504 
505                 // Processed all fields in the line.
506                 enforce(filterKeysReordering.allFieldsFilled && appendFieldsReordering.allFieldsFilled,
507                         format("Not enough fields in line. File: %s, Line: %s",
508                                filterStream.name, lineNum));
509             }
510 
511             string key = cmdopt.keyIsFullLine ?
512                 line.to!string : filterKeysReordering.outputFields.join(cmdopt.delim).to!string;
513             string appendValues = cmdopt.appendFullLine ?
514                 line.to!string : appendFieldsReordering.outputFields.join(cmdopt.delim).to!string;
515 
516             debug writeln("  --> [key]:[append] => [", key, "]:[", appendValues, "]");
517 
518             if (lineNum == 1) throwIfWindowsNewline(line, filterStream.name, lineNum);
519 
520             if (lineNum == 1 && cmdopt.hasHeader)
521             {
522                 /* When the input has headers, the header line from the first data
523                  * file is read during command line argument processing. Output the
524                  * header now to push it to the next tool in the unix pipeline. This
525                  * enables earlier error detection in downstream tools.
526                  *
527                  * If the input data is empty there will be no header.
528                  */
529                 auto inputStream = cmdopt.inputSources.front;
530 
531                 if (!inputStream.isHeaderEmpty)
532                 {
533                     string appendFieldsHeader;
534 
535                     if (cmdopt.appendHeaderPrefix.length == 0)
536                     {
537                         appendFieldsHeader = appendValues;
538                     }
539                     else
540                     {
541                         foreach (fieldIndex, fieldValue; appendValues.splitter(cmdopt.delim).enumerate)
542                         {
543                             if (fieldIndex > 0) appendFieldsHeader ~= cmdopt.delim;
544                             appendFieldsHeader ~= cmdopt.appendHeaderPrefix;
545                             appendFieldsHeader ~= fieldValue;
546                         }
547                     }
548 
549                     bufferedOutput.append(inputStream.header);
550                     if (isAppending) bufferedOutput.append(cmdopt.delim, appendFieldsHeader);
551                     bufferedOutput.appendln;
552                     bufferedOutput.flush;
553                 }
554             }
555             else
556             {
557                 if (isAppending && !cmdopt.allowDupliateKeys)
558                 {
559                     string* currAppendValues = (key in filterHash);
560 
561                     enforce(currAppendValues is null || *currAppendValues == appendValues,
562                             format("Duplicate keys with different append values (use --z|allow-duplicate-keys to ignore)\n   [key 1][values]: [%s][%s]\n   [key 2][values]: [%s][%s]",
563                                    key, *currAppendValues, key, appendValues));
564                 }
565                 filterHash[key] = appendValues;
566             }
567         }
568 
569         /* popFront here closes the filter file. */
570         cmdopt.filterSource.popFront;
571     }
572 
573     /* Now process each input file, one line at a time. */
574 
575     immutable size_t fileBodyStartLine = cmdopt.hasHeader ? 2 : 1;
576 
577     foreach (inputStream; cmdopt.inputSources)
578     {
579         if (cmdopt.hasHeader) throwIfWindowsNewline(inputStream.header, inputStream.name, 1);
580 
581         foreach (lineNum, line;
582                  inputStream
583                  .file
584                  .bufferedByLine(isLineBuffered)
585                  .enumerate(fileBodyStartLine))
586         {
587             debug writeln("[input line] |", line, "|");
588 
589             if (lineNum == 1) throwIfWindowsNewline(line, inputStream.name, lineNum);
590 
591             /*
592              * Next block checks if the input line matches a hash entry. Two cases:
593              *   a) The whole line is the key. Simply look it up in the hash.
594              *   b) Individual fields are used as the key - Assemble key and look it up.
595              *
596              * At the end of the appendFields will contain the result of hash lookup.
597              */
598             string* appendFields;
599             if (cmdopt.keyIsFullLine)
600             {
601                 appendFields = (line in filterHash);
602             }
603             else
604             {
605                 dataKeysReordering.initNewLine;
606                 foreach (fieldIndex, fieldValue; line.splitter(cmdopt.delim).enumerate)
607                 {
608                     dataKeysReordering.processNextField(fieldIndex, fieldValue);
609                     if (dataKeysReordering.allFieldsFilled) break;
610                 }
611                 // Processed all fields in the line.
612                 enforce(dataKeysReordering.allFieldsFilled,
613                         format("Not enough fields in line. File: %s, Line: %s",
614                                inputStream.name, lineNum));
615 
616                 appendFields = (dataKeysReordering.outputFields.join(cmdopt.delim) in filterHash);
617             }
618 
619             bool matched = (appendFields !is null);
620             debug writeln("   --> matched? ", matched);
621             if (cmdopt.writeAll || (matched && !cmdopt.exclude) || (!matched && cmdopt.exclude))
622             {
623                 bufferedOutput.append(line);
624                 if (isAppending)
625                 {
626                     bufferedOutput.append(
627                         cmdopt.delim, matched ? *appendFields : appendFieldsUnmatchedValue);
628                 }
629                 bufferedOutput.appendln();
630             }
631         }
632     }
633 }