/**
Command line tool that joins tab-separated value files based on a common key.

This tool joins lines from tab-delimited files based on a common key. One file, the 'filter'
file, contains the records (lines) being matched. The other input files are searched for
matching records. Matching records are written to standard output, along with any designated
fields from the 'filter' file. In database parlance this is a 'hash semi-join'.

Copyright (c) 2015-2020, eBay Inc.
Initially written by Jon Degenhardt

License: Boost Licence 1.0 (http://boost.org/LICENSE_1_0.txt)
*/
module tsv_utils.tsv_join;

import std.exception : enforce;
import std.stdio;
import std.format : format;
import std.range;
import std.typecons : tuple;

auto helpText = q"EOS
Synopsis: tsv-join --filter-file file [options] [file...]

tsv-join matches input lines (the 'data stream') against lines from a
'filter' file. The match is based on individual fields or the entire
line. Fields can be specified either by field number or field name.
Use '--help-verbose' for details.

Options:
EOS";

auto helpTextVerbose = q"EOS
Synopsis: tsv-join --filter-file file [options] [file...]

tsv-join matches input lines (the 'data stream') against lines from a
'filter' file. The match is based on exact match comparison of one or more
'key' fields. Fields are TAB delimited by default. Input lines are read
from files or standard input. Matching lines are written to standard
output, along with any additional fields from the filter file that have
been specified. For example:

  tsv-join --filter-file filter.tsv --key-fields 1 --append-fields 5,6 data.tsv

This reads filter.tsv, creating a hash table keyed on field 1. Lines from
data.tsv are read one at a time. If field 1 is found in the hash table,
the line is written to standard output with fields 5 and 6 from the filter
file appended. In database parlance this is a "hash semi join". Note the
asymmetric relationship: Records in the filter file should be unique, but
lines in the data stream (data.tsv) can repeat.

Field names can be used instead of field numbers if the files have header
lines. The following command is similar to the previous example, except
using field names:

  tsv-join -H -f filter.tsv -k ID --append-fields Date,Time data.tsv

tsv-join can also work as a simple filter based on the whole line. This is
the default behavior. Example:

  tsv-join -f filter.tsv data.tsv

This outputs all lines from data.tsv found in filter.tsv.

Multiple fields can be specified as keys and append fields. Field numbers
start at one, zero represents the whole line. Fields are comma separated
and ranges can be used. Example:

  tsv-join -f filter.tsv -k 1,2 --append-fields 3-7 data.tsv

The --e|exclude option can be used to exclude matched lines rather than
keep them.

The joins supported are similar to the "stream-static" joins available in
Spark Structured Streaming and "KStream-KTable" joins in Kafka. The filter
file plays the same role as the Spark static dataset or Kafka KTable.

Options:
EOS";

/** Container for command line options.
 */
struct TsvJoinOptions
{
    import tsv_utils.common.utils : byLineSourceRange, ByLineSourceRange,
        inputSourceRange, InputSourceRange, ReadHeader;

    /* Data available to the main program. Variables used only for command line
     * argument processing are local to processArgs.
     */
    string programName;                /// Program name
    InputSourceRange inputSources;     /// Input Files
    ByLineSourceRange!() filterSource; /// Derived: --filter
    size_t[] keyFields;                /// Derived: --key-fields
    size_t[] dataFields;               /// Derived: --data-fields
    size_t[] appendFields;             /// Derived: --append-fields
    bool hasHeader = false;            /// --H|header
    string appendHeaderPrefix = "";    /// --append-header-prefix
    bool writeAll = false;             /// --write-all
    string writeAllValue;              /// --write-all
    bool exclude = false;              /// --exclude
    char delim = '\t';                 /// --delimiter
    bool allowDupliateKeys = false;    /// --allow-duplicate-keys
    bool keyIsFullLine = false;        /// Derived: --key-fields 0
    bool dataIsFullLine = false;       /// Derived: --data-fields 0
    bool appendFullLine = false;       /// Derived: --append-fields 0

    /* Returns a tuple. First value is true if command line arguments were successfully
     * processed and execution should continue, or false if an error occurred or the user
     * asked for help. If false, the second value is the appropriate exit code (0 or 1).
     *
     * Returning true (execution continues) means args have been validated and derived
     * values calculated. In addition, field indices have been converted to zero-based.
     * If the whole line is the key, the individual fields lists will be cleared.
     */
    auto processArgs (ref string[] cmdArgs)
    {
        import std.array : split;
        import std.conv : to;
        import std.getopt;
        import std.path : baseName, stripExtension;
        import std.typecons : Yes, No;
        import tsv_utils.common.fieldlist;
        import tsv_utils.common.utils : throwIfWindowsNewlineOnUnix;

        bool helpVerbose = false;        // --help-verbose
        bool helpFields = false;         // --help-fields
        bool versionWanted = false;      // --V|version
        string filterFile;               // --filter
        string keyFieldsArg;             // --key-fields
        string dataFieldsArg;            // --data-fields
        string appendFieldsArg;          // --append-fields

        string keyFieldsOptionString = "k|key-fields";
        string dataFieldsOptionString = "d|data-fields";
        string appendFieldsOptionString = "a|append-fields";

        programName = (cmdArgs.length > 0) ? cmdArgs[0].stripExtension.baseName : "Unknown_program_name";

        /* Handler for --write-all. Special handler so two values can be set. */
        void writeAllHandler(string option, string value)
        {
            debug stderr.writeln("[writeAllHandler] |", option, "| |", value, "|");
            writeAll = true;
            writeAllValue = value;
        }

        try
        {
            arraySep = ",";    // Use comma to separate values in command line options
            auto r = getopt(
                cmdArgs,
                "help-verbose",    "     Print full help.", &helpVerbose,
                "help-fields",     "     Print help on specifying fields.", &helpFields,

                "f|filter-file",   "FILE (Required) File with records to use as a filter.", &filterFile,

                keyFieldsOptionString,
                "<field-list> Fields to use as join key. Default: 0 (entire line).",
                &keyFieldsArg,

                dataFieldsOptionString,
                "<field-list> Data stream fields to use as join key, if different than --key-fields.",
                &dataFieldsArg,

                appendFieldsOptionString,
                "<field-list> Filter file fields to append to matched data stream records.",
                &appendFieldsArg,

                std.getopt.config.caseSensitive,
                "H|header",        "     Treat the first line of each file as a header.", &hasHeader,
                std.getopt.config.caseInsensitive,
                "p|prefix",        "STR  String to use as a prefix for --append-fields when writing a header line.", &appendHeaderPrefix,
                "w|write-all",     "STR  Output all data stream records. STR is the --append-fields value when writing unmatched records.", &writeAllHandler,
                "e|exclude",       "     Exclude matching records.", &exclude,
                "delimiter",       "CHR  Field delimiter. Default: TAB. (Single byte UTF-8 characters only.)", &delim,
                "z|allow-duplicate-keys",
                "     Allow duplicate keys with different append values (last entry wins).", &allowDupliateKeys,
                std.getopt.config.caseSensitive,
                "V|version",       "     Print version information and exit.", &versionWanted,
                std.getopt.config.caseInsensitive,
                );

            if (r.helpWanted)
            {
                defaultGetoptPrinter(helpText, r.options);
                return tuple(false, 0);
            }
            else if (helpVerbose)
            {
                defaultGetoptPrinter(helpTextVerbose, r.options);
                return tuple(false, 0);
            }
            else if (helpFields)
            {
                writeln(fieldListHelpText);
                return tuple(false, 0);
            }
            else if (versionWanted)
            {
                import tsv_utils.common.tsvutils_version;
                writeln(tsvutilsVersionNotice("tsv-join"));
                return tuple(false, 0);
            }

            /* File arguments.
             *   *  --filter-file required, converted to a one-element ByLineSourceRange
             *   *  Remaining command line args are input files.
             */
            enforce(filterFile.length != 0,
                    "Required option --f|filter-file was not supplied.");

            enforce(!(filterFile == "-" && cmdArgs.length == 1),
                    "A data file is required when standard input is used for the filter file (--f|filter-file -).");

            string[] filepaths = (cmdArgs.length > 1) ? cmdArgs[1 .. $] : ["-"];
            cmdArgs.length = 1;

            /* Validation and derivations - Do as much validation prior to header line
             * processing as possible (avoids waiting on stdin).
             *
             * Note: In tsv-join, when header processing is on, there is very little
             * validation that can be done prior to reading the header line. All the
             * logic is in the fieldListArgProcessing function.
             */

            string[] filterFileHeaderFields;
            string[] inputSourceHeaderFields;

            /* fieldListArgProcessing encapsulates the field list dependent processing.
             * It is called prior to reading the header line if headers are not being used,
             * and after if headers are being used.
             */
            void fieldListArgProcessing()
            {
                import std.algorithm : all, each;

                /* field list parsing. */
                if (!keyFieldsArg.empty)
                {
                    keyFields =
                        keyFieldsArg
                        .parseFieldList!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero)
                        (hasHeader, filterFileHeaderFields, keyFieldsOptionString)
                        .array;
                }

                if (!dataFieldsArg.empty)
                {
                    dataFields =
                        dataFieldsArg
                        .parseFieldList!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero)
                        (hasHeader, inputSourceHeaderFields, dataFieldsOptionString)
                        .array;
                }
                else if (!keyFieldsArg.empty)
                {
                    // No --data-fields given: reuse the --key-fields list, but resolve any
                    // field names against the data stream header, not the filter file header.
                    dataFields =
                        keyFieldsArg
                        .parseFieldList!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero)
                        (hasHeader, inputSourceHeaderFields, dataFieldsOptionString)
                        .array;
                }

                if (!appendFieldsArg.empty)
                {
                    appendFields =
                        appendFieldsArg
                        .parseFieldList!(size_t, No.convertToZeroBasedIndex, Yes.allowFieldNumZero)
                        (hasHeader, filterFileHeaderFields, appendFieldsOptionString)
                        .array;
                }

                /* Validations */
                if (writeAll)
                {
                    enforce(appendFields.length != 0,
                            "Use --a|append-fields when using --w|write-all.");

                    enforce(!(appendFields.length == 1 && appendFields[0] == 0),
                            "Cannot use '--a|append-fields 0' (whole line) when using --w|write-all.");
                }

                enforce(!(appendFields.length > 0 && exclude),
                        "--e|exclude cannot be used with --a|append-fields.");

                enforce(appendHeaderPrefix.length == 0 || hasHeader,
                        "Use --header when using --p|prefix.");

                enforce(dataFields.length == 0 || keyFields.length == dataFields.length,
                        "Different number of --k|key-fields and --d|data-fields.");

                enforce(keyFields.length != 1 ||
                        dataFields.length != 1 ||
                        (keyFields[0] == 0 && dataFields[0] == 0) ||
                        (keyFields[0] != 0 && dataFields[0] != 0),
                        "If either --k|key-field or --d|data-field is zero both must be zero.");

                enforce((keyFields.length <= 1 || all!(a => a != 0)(keyFields)) &&
                        (dataFields.length <= 1 || all!(a => a != 0)(dataFields)) &&
                        (appendFields.length <= 1 || all!(a => a != 0)(appendFields)),
                        "Field 0 (whole line) cannot be combined with individual fields (non-zero).");

                /* Derivations. */

                // Convert 'full-line' field indexes (index zero) to boolean flags.
                if (keyFields.length == 0)
                {
                    assert(dataFields.length == 0);
                    keyIsFullLine = true;
                    dataIsFullLine = true;
                }
                else if (keyFields.length == 1 && keyFields[0] == 0)
                {
                    keyIsFullLine = true;
                    keyFields.popFront;
                    dataIsFullLine = true;

                    if (dataFields.length == 1)
                    {
                        assert(dataFields[0] == 0);
                        dataFields.popFront;
                    }
                }

                if (appendFields.length == 1 && appendFields[0] == 0)
                {
                    appendFullLine = true;
                    appendFields.popFront;
                }

                assert(!(keyIsFullLine && keyFields.length > 0));
                assert(!(dataIsFullLine && dataFields.length > 0));
                assert(!(appendFullLine && appendFields.length > 0));

                // Switch to zero-based field indexes.
                keyFields.each!((ref a) => --a);
                dataFields.each!((ref a) => --a);
                appendFields.each!((ref a) => --a);

            } // End fieldListArgProcessing()


            if (!hasHeader) fieldListArgProcessing();

            /*
             * Create the input source ranges for the filter file and data stream files
             * and perform header line processing.
             */

            filterSource = byLineSourceRange([filterFile]);
            ReadHeader readHeader = hasHeader ? Yes.readHeader : No.readHeader;
            inputSources = inputSourceRange(filepaths, readHeader);

            if (hasHeader)
            {
                if (!filterSource.front.byLine.empty)
                {
                    throwIfWindowsNewlineOnUnix(filterSource.front.byLine.front, filterSource.front.name, 1);
                    filterFileHeaderFields = filterSource.front.byLine.front.split(delim).to!(string[]);
                }
                throwIfWindowsNewlineOnUnix(inputSources.front.header, inputSources.front.name, 1);
                inputSourceHeaderFields = inputSources.front.header.split(delim).to!(string[]);
                fieldListArgProcessing();
            }
        }
        catch (Exception exc)
        {
            stderr.writefln("[%s] Error processing command line arguments: %s", programName, exc.msg);
            return tuple(false, 1);
        }
        return tuple(true, 0);
    }
}

static if (__VERSION__ >= 2085) extern(C) __gshared string[] rt_options = [ "gcopt=cleanup:none" ];

/** Main program.
 */
int main(string[] cmdArgs)
{
    /* When running in DMD code coverage mode, turn on report merging. */
    version(D_Coverage) version(DigitalMars)
    {
        import core.runtime : dmd_coverSetMerge;
        dmd_coverSetMerge(true);
    }

    TsvJoinOptions cmdopt;
    auto r = cmdopt.processArgs(cmdArgs);
    if (!r[0]) return r[1];
    try tsvJoin(cmdopt);
    catch (Exception exc)
    {
        stderr.writefln("Error [%s]: %s", cmdopt.programName, exc.msg);
        return 1;
    }
    return 0;
}

/** tsvJoin does the primary work of the tsv-join program.
 */
void tsvJoin(ref TsvJoinOptions cmdopt)
{
    import tsv_utils.common.utils : ByLineSourceRange, bufferedByLine, BufferedOutputRange,
        isFlushableOutputRange, InputFieldReordering, InputSourceRange, throwIfWindowsNewlineOnUnix;
    import std.algorithm : splitter;
    import std.array : join;
    import std.range;
    import std.conv : to;

    /* Check that the input files were setup correctly. Should have one filter file as a
     * ByLineSourceRange. There should be at least one input file as an InputSourceRange.
     */
    assert(cmdopt.filterSource.length == 1);
    static assert(is(typeof(cmdopt.filterSource) == ByLineSourceRange!(No.keepTerminator)));

    assert(!cmdopt.inputSources.empty);
    static assert(is(typeof(cmdopt.inputSources) == InputSourceRange));

    /* State, variables, and convenience derivations.
     *
     * Combinations of individual fields and whole line (field zero) are convenient for the
     * user, but create complexities for the program. Many combinations are disallowed by
     * command line processing, but the remaining combos still leave several states. Also,
     * this code optimizes by doing only necessary operations, further complicating state.
     * Here's a guide to variables and state.
     * - cmdopt.keyFields, cmdopt.dataFields arrays - Individual field indexes used as keys.
     *      Empty if the whole line is used as a key. Must be the same length.
     * - cmdopt.keyIsFullLine, cmdopt.dataIsFullLine - True when the whole line is used as
     *      the key.
     * - cmdopt.appendFields array - Indexes of individual filter file fields being appended.
     *      Empty if appending the full line, or if not appending anything.
     * - cmdopt.appendFullLine - True when the whole line is being appended.
     * - isAppending - True if something is being appended.
     * - cmdopt.writeAll - True if all lines are being written
     */
    /* Convenience derivations. */
    auto numKeyFields = cmdopt.keyFields.length;
    auto numAppendFields = cmdopt.appendFields.length;
    bool isAppending = (cmdopt.appendFullLine || numAppendFields > 0);

    /* Mappings from field indexes in the input lines to collection arrays. */
    auto filterKeysReordering = new InputFieldReordering!char(cmdopt.keyFields);
    // When --data-fields was not given, dataFields is empty and the data stream uses the
    // same field indexes as the filter file, so the reordering object is shared.
    auto dataKeysReordering = (cmdopt.dataFields.length == 0) ?
        filterKeysReordering : new InputFieldReordering!char(cmdopt.dataFields);
    auto appendFieldsReordering = new InputFieldReordering!char(cmdopt.appendFields);

    /* The master filter hash. The key is the delimited fields concatenated together
     * (including separators). The value is the appendFields concatenated together, as
     * they will be appended to the input line. Both the keys and append fields are
     * assembled in the order specified, though this is only required for append fields.
     */
    string[string] filterHash;

    /* The append values for unmatched records. */
    char[] appendFieldsUnmatchedValue;

    if (cmdopt.writeAll)
    {
        assert(cmdopt.appendFields.length > 0);  // Checked in fieldListArgProcessing

        // reserve space for n values and n-1 delimiters
        appendFieldsUnmatchedValue.reserve(cmdopt.appendFields.length * (cmdopt.writeAllValue.length + 1) - 1);

        appendFieldsUnmatchedValue ~= cmdopt.writeAllValue;
        for (size_t i = 1; i < cmdopt.appendFields.length; ++i)
        {
            appendFieldsUnmatchedValue ~= cmdopt.delim;
            appendFieldsUnmatchedValue ~= cmdopt.writeAllValue;
        }
    }

    /* Buffered output range for the final output. Setup here because the header line
     * (if any) gets written while reading the filter file.
     */
    auto bufferedOutput = BufferedOutputRange!(typeof(stdout))(stdout);

    /* Read the filter file. */
    {
        bool needPerFieldProcessing = (numKeyFields > 0) || (numAppendFields > 0);
        auto filterStream = cmdopt.filterSource.front;
        foreach (lineNum, line; filterStream.byLine.enumerate(1))
        {
            debug writeln("[filter line] |", line, "|");
            if (needPerFieldProcessing)
            {
                filterKeysReordering.initNewLine;
                appendFieldsReordering.initNewLine;

                foreach (fieldIndex, fieldValue; line.splitter(cmdopt.delim).enumerate)
                {
                    filterKeysReordering.processNextField(fieldIndex,fieldValue);
                    appendFieldsReordering.processNextField(fieldIndex,fieldValue);

                    if (filterKeysReordering.allFieldsFilled && appendFieldsReordering.allFieldsFilled)
                    {
                        break;
                    }
                }

                // Processed all fields in the line.
                enforce(filterKeysReordering.allFieldsFilled && appendFieldsReordering.allFieldsFilled,
                        format("Not enough fields in line. File: %s, Line: %s",
                               filterStream.name, lineNum));
            }

            string key = cmdopt.keyIsFullLine ?
                line.to!string : filterKeysReordering.outputFields.join(cmdopt.delim).to!string;
            string appendValues = cmdopt.appendFullLine ?
                line.to!string : appendFieldsReordering.outputFields.join(cmdopt.delim).to!string;

            debug writeln("  --> [key]:[append] => [", key, "]:[", appendValues, "]");

            if (lineNum == 1) throwIfWindowsNewlineOnUnix(line, filterStream.name, lineNum);

            if (lineNum == 1 && cmdopt.hasHeader)
            {
                /* When the input has headers, the header line from the first data
                 * file is read during command line argument processing. Output the
                 * header now to push it to the next tool in the unix pipeline. This
                 * enables earlier error detection in downstream tools.
                 *
                 * If the input data is empty there will be no header.
                 */
                auto inputStream = cmdopt.inputSources.front;

                if (!inputStream.isHeaderEmpty)
                {
                    string appendFieldsHeader;

                    if (cmdopt.appendHeaderPrefix.length == 0)
                    {
                        appendFieldsHeader = appendValues;
                    }
                    else
                    {
                        foreach (fieldIndex, fieldValue; appendValues.splitter(cmdopt.delim).enumerate)
                        {
                            if (fieldIndex > 0) appendFieldsHeader ~= cmdopt.delim;
                            appendFieldsHeader ~= cmdopt.appendHeaderPrefix;
                            appendFieldsHeader ~= fieldValue;
                        }
                    }

                    bufferedOutput.append(inputStream.header);
                    if (isAppending)
                    {
                        bufferedOutput.append(cmdopt.delim);
                        bufferedOutput.append(appendFieldsHeader);
                    }
                    bufferedOutput.appendln;
                    bufferedOutput.flush;
                }
            }
            else
            {
                if (isAppending && !cmdopt.allowDupliateKeys)
                {
                    string* currAppendValues = (key in filterHash);

                    // Note: enforce's message argument is lazy, so *currAppendValues is
                    // only dereferenced when the check fails (key present, values differ).
                    enforce(currAppendValues is null || *currAppendValues == appendValues,
                            format("Duplicate keys with different append values (use --z|allow-duplicate-keys to ignore)\n   [key 1][values]: [%s][%s]\n   [key 2][values]: [%s][%s]",
                                   key, *currAppendValues, key, appendValues));
                }
                filterHash[key] = appendValues;
            }
        }

        /* popFront here closes the filter file. */
        cmdopt.filterSource.popFront;
    }

    /* Now process each input file, one line at a time. */

    immutable size_t fileBodyStartLine = cmdopt.hasHeader ? 2 : 1;

    foreach (inputStream; cmdopt.inputSources)
    {
        if (cmdopt.hasHeader) throwIfWindowsNewlineOnUnix(inputStream.header, inputStream.name, 1);

        foreach (lineNum, line; inputStream.file.bufferedByLine.enumerate(fileBodyStartLine))
        {
            debug writeln("[input line] |", line, "|");

            if (lineNum == 1) throwIfWindowsNewlineOnUnix(line, inputStream.name, lineNum);

            /*
             * Next block checks if the input line matches a hash entry. Two cases:
             *   a) The whole line is the key. Simply look it up in the hash.
             *   b) Individual fields are used as the key - Assemble key and look it up.
             *
             * At the end of the appendFields will contain the result of hash lookup.
             */
            string* appendFields;
            if (cmdopt.keyIsFullLine)
            {
                appendFields = (line in filterHash);
            }
            else
            {
                dataKeysReordering.initNewLine;
                foreach (fieldIndex, fieldValue; line.splitter(cmdopt.delim).enumerate)
                {
                    dataKeysReordering.processNextField(fieldIndex, fieldValue);
                    if (dataKeysReordering.allFieldsFilled) break;
                }
                // Processed all fields in the line.
                enforce(dataKeysReordering.allFieldsFilled,
                        format("Not enough fields in line. File: %s, Line: %s",
                               inputStream.name, lineNum));

                appendFields = (dataKeysReordering.outputFields.join(cmdopt.delim) in filterHash);
            }

            bool matched = (appendFields !is null);
            debug writeln("   --> matched? ", matched);
            if (cmdopt.writeAll || (matched && !cmdopt.exclude) || (!matched && cmdopt.exclude))
            {
                bufferedOutput.append(line);
                if (isAppending)
                {
                    bufferedOutput.append(cmdopt.delim);
                    bufferedOutput.append(matched ? *appendFields : appendFieldsUnmatchedValue);
                }
                bufferedOutput.appendln();
            }
        }
    }
}