mirror of
https://github.com/johnkerl/miller.git
synced 2026-01-23 10:15:36 +00:00
650 lines
23 KiB
Go
650 lines
23 KiB
Go
package transformers
|
|
|
|
import (
|
|
"container/list"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/johnkerl/miller/v6/pkg/cli"
|
|
"github.com/johnkerl/miller/v6/pkg/input"
|
|
"github.com/johnkerl/miller/v6/pkg/lib"
|
|
"github.com/johnkerl/miller/v6/pkg/mlrval"
|
|
"github.com/johnkerl/miller/v6/pkg/transformers/utils"
|
|
"github.com/johnkerl/miller/v6/pkg/types"
|
|
)
|
|
|
|
// ----------------------------------------------------------------
|
|
// verbNameJoin is the name of this verb; it is used both in the verb's setup
// entry and in its usage text.
const verbNameJoin = "join"
|
|
|
|
var JoinSetup = TransformerSetup{
|
|
Verb: verbNameJoin,
|
|
UsageFunc: transformerJoinUsage,
|
|
ParseCLIFunc: transformerJoinParseCLI,
|
|
IgnoresInput: false,
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
// Most transformers have option-variables as individual locals within the
|
|
// transformerXYZParseCLI function, which are passed as individual arguments to
|
|
// the NewTransformerXYZ function. For join, things are a bit more complex
|
|
// and we bag up the option-variables into this data structure.
|
|
|
|
type tJoinOptions struct {
|
|
leftPrefix string
|
|
rightPrefix string
|
|
|
|
outputJoinFieldNames []string
|
|
leftKeepFieldNames []string
|
|
leftJoinFieldNames []string
|
|
rightJoinFieldNames []string
|
|
|
|
allowUnsortedInput bool
|
|
emitPairables bool
|
|
emitLeftUnpairables bool
|
|
emitRightUnpairables bool
|
|
|
|
leftFileName string
|
|
prepipe string
|
|
prepipeIsRaw bool
|
|
|
|
// These allow the joiner to have its own different format/delimiter for the left-file:
|
|
joinFlagOptions cli.TOptions
|
|
}
|
|
|
|
func newJoinOptions() *tJoinOptions {
|
|
return &tJoinOptions{
|
|
leftPrefix: "",
|
|
rightPrefix: "",
|
|
|
|
outputJoinFieldNames: nil,
|
|
leftKeepFieldNames: nil,
|
|
leftJoinFieldNames: nil,
|
|
rightJoinFieldNames: nil,
|
|
|
|
allowUnsortedInput: true,
|
|
emitPairables: true,
|
|
emitLeftUnpairables: false,
|
|
emitRightUnpairables: false,
|
|
|
|
leftFileName: "",
|
|
prepipe: "",
|
|
prepipeIsRaw: false,
|
|
}
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
func transformerJoinUsage(
|
|
o *os.File,
|
|
) {
|
|
fmt.Fprintf(o, "Usage: %s %s [options]\n", "mlr", verbNameJoin)
|
|
fmt.Fprintf(o, "Joins records from specified left file name with records from all file names\n")
|
|
fmt.Fprintf(o, "at the end of the Miller argument list.\n")
|
|
fmt.Fprintf(o, "Functionality is essentially the same as the system \"join\" command, but for\n")
|
|
fmt.Fprintf(o, "record streams.\n")
|
|
fmt.Fprintf(o, "Options:\n")
|
|
fmt.Fprintf(o, " -f {left file name}\n")
|
|
fmt.Fprintf(o, " -j {a,b,c} Comma-separated join-field names for output\n")
|
|
fmt.Fprintf(o, " -l {a,b,c} Comma-separated join-field names for left input file;\n")
|
|
fmt.Fprintf(o, " defaults to -j values if omitted.\n")
|
|
fmt.Fprintf(o, " -r {a,b,c} Comma-separated join-field names for right input file(s);\n")
|
|
fmt.Fprintf(o, " defaults to -j values if omitted.\n")
|
|
fmt.Fprintf(o, " --lk|--left-keep-field-names {a,b,c} If supplied, this means keep only the specified field\n")
|
|
fmt.Fprintf(o, " names from the left file. Automatically includes the join-field name(s). Helpful\n")
|
|
fmt.Fprintf(o, " for when you only want a limited subset of information from the left file.\n")
|
|
fmt.Fprintf(o, " Tip: you can use --lk \"\": this means the left file becomes solely a row-selector\n")
|
|
fmt.Fprintf(o, " for the input files.\n")
|
|
fmt.Fprintf(o, " --lp {text} Additional prefix for non-join output field names from\n")
|
|
fmt.Fprintf(o, " the left file\n")
|
|
fmt.Fprintf(o, " --rp {text} Additional prefix for non-join output field names from\n")
|
|
fmt.Fprintf(o, " the right file(s)\n")
|
|
fmt.Fprintf(o, " --np Do not emit paired records\n")
|
|
fmt.Fprintf(o, " --ul Emit unpaired records from the left file\n")
|
|
fmt.Fprintf(o, " --ur Emit unpaired records from the right file(s)\n")
|
|
fmt.Fprintf(o, " -s|--sorted-input Require sorted input: records must be sorted\n")
|
|
fmt.Fprintf(o, " lexically by their join-field names, else not all records will\n")
|
|
fmt.Fprintf(o, " be paired. The only likely use case for this is with a left\n")
|
|
fmt.Fprintf(o, " file which is too big to fit into system memory otherwise.\n")
|
|
fmt.Fprintf(o, " -u Enable unsorted input. (This is the default even without -u.)\n")
|
|
fmt.Fprintf(o, " In this case, the entire left file will be loaded into memory.\n")
|
|
fmt.Fprintf(o, " --prepipe {command} As in main input options; see %s --help for details.\n",
|
|
"mlr")
|
|
fmt.Fprintf(o, " If you wish to use a prepipe command for the main input as well\n")
|
|
fmt.Fprintf(o, " as here, it must be specified there as well as here.\n")
|
|
fmt.Fprintf(o, " --prepipex {command} Likewise.\n")
|
|
fmt.Fprintf(o, "File-format options default to those for the right file names on the Miller\n")
|
|
fmt.Fprintf(o, "argument list, but may be overridden for the left file as follows. Please see\n")
|
|
fmt.Fprintf(o, "the main \"%s --help\" for more information on syntax for these arguments:\n", "mlr")
|
|
fmt.Fprintf(o, " -i {one of csv,dkvp,nidx,pprint,xtab}\n")
|
|
fmt.Fprintf(o, " --irs {record-separator character}\n")
|
|
fmt.Fprintf(o, " --ifs {field-separator character}\n")
|
|
fmt.Fprintf(o, " --ips {pair-separator character}\n")
|
|
fmt.Fprintf(o, " --repifs\n")
|
|
fmt.Fprintf(o, " --implicit-csv-header\n")
|
|
fmt.Fprintf(o, " --implicit-tsv-header\n")
|
|
fmt.Fprintf(o, " --no-implicit-csv-header\n")
|
|
fmt.Fprintf(o, " --no-implicit-tsv-header\n")
|
|
fmt.Fprintf(o, "For example, if you have 'mlr --csv ... join -l foo ... ' then the left-file format will\n")
|
|
fmt.Fprintf(o, "be specified CSV as well unless you override with 'mlr --csv ... join --ijson -l foo' etc.\n")
|
|
fmt.Fprintf(o, "Likewise, if you have 'mlr --csv --implicit-csv-header ...' then the join-in file will be\n")
|
|
fmt.Fprintf(o, "expected to be headerless as well unless you put '--no-implicit-csv-header' after 'join'.\n")
|
|
fmt.Fprintf(o, "Please use \"%s --usage-separator-options\" for information on specifying separators.\n",
|
|
"mlr")
|
|
fmt.Fprintf(o, "Please see https://miller.readthedocs.io/en/latest/reference-verbs#join for more information\n")
|
|
fmt.Fprintf(o, "including examples.\n")
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
func transformerJoinParseCLI(
|
|
pargi *int,
|
|
argc int,
|
|
args []string,
|
|
mainOptions *cli.TOptions, // Options for the right-files
|
|
doConstruct bool, // false for first pass of CLI-parse, true for second pass
|
|
) IRecordTransformer {
|
|
|
|
// Skip the verb name from the current spot in the mlr command line
|
|
argi := *pargi
|
|
verb := args[argi]
|
|
argi++
|
|
|
|
// Parse local flags
|
|
opts := newJoinOptions()
|
|
|
|
if mainOptions != nil { // for 'mlr --usage-all-verbs', it's nil
|
|
// TODO: make sure this is a full nested-struct copy.
|
|
opts.joinFlagOptions = *mainOptions // struct copy
|
|
}
|
|
|
|
for argi < argc /* variable increment: 1 or 2 depending on flag */ {
|
|
opt := args[argi]
|
|
if !strings.HasPrefix(opt, "-") {
|
|
break // No more flag options to process
|
|
}
|
|
if args[argi] == "--" {
|
|
break // All transformers must do this so main-flags can follow verb-flags
|
|
}
|
|
argi++
|
|
|
|
if opt == "-h" || opt == "--help" {
|
|
transformerJoinUsage(os.Stdout)
|
|
os.Exit(0)
|
|
|
|
} else if opt == "--prepipe" {
|
|
opts.prepipe = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
|
|
opts.prepipeIsRaw = false
|
|
|
|
} else if opt == "--prepipex" {
|
|
opts.prepipe = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
|
|
opts.prepipeIsRaw = true
|
|
|
|
} else if opt == "-f" {
|
|
opts.leftFileName = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
|
|
|
|
} else if opt == "-j" {
|
|
opts.outputJoinFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
|
|
|
|
} else if opt == "-l" {
|
|
opts.leftJoinFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
|
|
|
|
} else if opt == "--lk" || opt == "--left-keep-field-names" {
|
|
opts.leftKeepFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
|
|
|
|
} else if opt == "-r" {
|
|
opts.rightJoinFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
|
|
|
|
} else if opt == "--lp" {
|
|
opts.leftPrefix = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
|
|
|
|
} else if opt == "--rp" {
|
|
opts.rightPrefix = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
|
|
|
|
} else if opt == "--np" {
|
|
opts.emitPairables = false
|
|
|
|
} else if opt == "--ul" {
|
|
opts.emitLeftUnpairables = true
|
|
|
|
} else if opt == "--ur" {
|
|
opts.emitRightUnpairables = true
|
|
|
|
} else if opt == "-u" {
|
|
opts.allowUnsortedInput = true
|
|
|
|
} else if opt == "--sorted-input" || opt == "-s" {
|
|
opts.allowUnsortedInput = false
|
|
|
|
} else {
|
|
// This is inelegant. For error-proofing we advance argi already in our
|
|
// loop (so individual if-statements don't need to). However,
|
|
// cli.Parse expects it unadvanced.
|
|
largi := argi - 1
|
|
if cli.FLAG_TABLE.Parse(args, argc, &largi, &opts.joinFlagOptions) {
|
|
// This lets mlr main and mlr join have different input formats.
|
|
// Nothing else to handle here.
|
|
argi = largi
|
|
} else {
|
|
transformerJoinUsage(os.Stderr)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
}
|
|
|
|
cli.FinalizeReaderOptions(&opts.joinFlagOptions.ReaderOptions)
|
|
|
|
if opts.leftFileName == "" {
|
|
fmt.Fprintf(os.Stderr, "%s %s: need left file name\n", "mlr", verb)
|
|
transformerJoinUsage(os.Stderr)
|
|
os.Exit(1)
|
|
return nil
|
|
}
|
|
|
|
if !opts.emitPairables && !opts.emitLeftUnpairables && !opts.emitRightUnpairables {
|
|
fmt.Fprintf(os.Stderr, "%s %s: all emit flags are unset; no output is possible.\n",
|
|
"mlr", verb)
|
|
transformerJoinUsage(os.Stderr)
|
|
os.Exit(1)
|
|
return nil
|
|
}
|
|
|
|
if opts.outputJoinFieldNames == nil {
|
|
fmt.Fprintf(os.Stderr, "%s %s: need output field names\n", "mlr", verb)
|
|
transformerJoinUsage(os.Stderr)
|
|
os.Exit(1)
|
|
return nil
|
|
}
|
|
|
|
if opts.leftJoinFieldNames == nil {
|
|
opts.leftJoinFieldNames = opts.outputJoinFieldNames // array copy
|
|
}
|
|
if opts.rightJoinFieldNames == nil {
|
|
opts.rightJoinFieldNames = opts.outputJoinFieldNames // array copy
|
|
}
|
|
|
|
llen := len(opts.leftJoinFieldNames)
|
|
rlen := len(opts.rightJoinFieldNames)
|
|
olen := len(opts.outputJoinFieldNames)
|
|
if llen != rlen || llen != olen {
|
|
fmt.Fprintf(os.Stderr,
|
|
"%s %s: must have equal left,right,output field-name lists; got lengths %d,%d,%d.\n",
|
|
"mlr", verb, llen, rlen, olen)
|
|
os.Exit(1)
|
|
}
|
|
|
|
*pargi = argi
|
|
if !doConstruct { // All transformers must do this for main command-line parsing
|
|
return nil
|
|
}
|
|
|
|
transformer, err := NewTransformerJoin(opts)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
return transformer
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
type TransformerJoin struct {
|
|
opts *tJoinOptions
|
|
|
|
leftFieldNameSet map[string]bool
|
|
rightFieldNameSet map[string]bool
|
|
leftKeepFieldNameSet map[string]bool
|
|
|
|
// For unsorted/half-streaming input
|
|
ingested bool
|
|
leftBucketsByJoinFieldValues *lib.OrderedMap
|
|
leftUnpairableRecordsAndContexts *list.List
|
|
|
|
// For sorted/doubly-streaming input
|
|
joinBucketKeeper *utils.JoinBucketKeeper
|
|
|
|
recordTransformerFunc RecordTransformerFunc
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
func NewTransformerJoin(
|
|
opts *tJoinOptions,
|
|
) (*TransformerJoin, error) {
|
|
|
|
tr := &TransformerJoin{
|
|
opts: opts,
|
|
|
|
leftFieldNameSet: lib.StringListToSet(opts.leftJoinFieldNames),
|
|
rightFieldNameSet: lib.StringListToSet(opts.rightJoinFieldNames),
|
|
leftKeepFieldNameSet: lib.StringListToSet(opts.leftKeepFieldNames),
|
|
|
|
ingested: false,
|
|
leftBucketsByJoinFieldValues: nil,
|
|
leftUnpairableRecordsAndContexts: nil,
|
|
joinBucketKeeper: nil,
|
|
}
|
|
// Suppose left file has "id,foo,bar" and right has "id,baz,quux" and the join field name is
|
|
// "id". If they ask for --lk id,foo we should keep only id,foo from the left file. But if
|
|
// they ask for --lk foo we should keep id *and* foo fromn the left file.
|
|
if tr.leftKeepFieldNameSet != nil {
|
|
for _, name := range opts.leftJoinFieldNames {
|
|
tr.leftKeepFieldNameSet[name] = true
|
|
}
|
|
}
|
|
|
|
if opts.allowUnsortedInput {
|
|
// Half-streaming (default) case: ingest entire left file first.
|
|
|
|
tr.leftUnpairableRecordsAndContexts = list.New()
|
|
tr.leftBucketsByJoinFieldValues = lib.NewOrderedMap()
|
|
tr.recordTransformerFunc = tr.transformHalfStreaming
|
|
|
|
} else {
|
|
// Doubly-streaming (non-default) case: step left/right files forward.
|
|
// Requires both files be sorted on their join keys in order to not
|
|
// miss anything. This lets people do joins that would otherwise take
|
|
// too much RAM.
|
|
|
|
tr.joinBucketKeeper = utils.NewJoinBucketKeeper(
|
|
// opts.prepipe,
|
|
opts.leftFileName,
|
|
&opts.joinFlagOptions.ReaderOptions,
|
|
opts.leftJoinFieldNames,
|
|
tr.leftKeepFieldNameSet,
|
|
)
|
|
|
|
tr.recordTransformerFunc = tr.transformDoublyStreaming
|
|
}
|
|
|
|
return tr, nil
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
|
|
func (tr *TransformerJoin) Transform(
|
|
inrecAndContext *types.RecordAndContext,
|
|
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
|
|
inputDownstreamDoneChannel <-chan bool,
|
|
outputDownstreamDoneChannel chan<- bool,
|
|
) {
|
|
HandleDefaultDownstreamDone(inputDownstreamDoneChannel, outputDownstreamDoneChannel)
|
|
tr.recordTransformerFunc(inrecAndContext, outputRecordsAndContexts,
|
|
inputDownstreamDoneChannel, outputDownstreamDoneChannel)
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
// This is for the half-streaming case. We ingest the entire left file,
|
|
// matching each right record against those.
|
|
func (tr *TransformerJoin) transformHalfStreaming(
|
|
inrecAndContext *types.RecordAndContext,
|
|
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
|
|
inputDownstreamDoneChannel <-chan bool,
|
|
outputDownstreamDoneChannel chan<- bool,
|
|
) {
|
|
// This can't be done in the CLI-parser since it requires information which
|
|
// isn't known until after the CLI-parser is called.
|
|
//
|
|
// TODO: check if this is still true for the Go port, once everything else
|
|
// is done.
|
|
if !tr.ingested { // First call
|
|
tr.ingestLeftFile()
|
|
tr.ingested = true
|
|
}
|
|
|
|
if !inrecAndContext.EndOfStream {
|
|
inrec := inrecAndContext.Record
|
|
groupingKey, hasAllJoinKeys := inrec.GetSelectedValuesJoined(
|
|
tr.opts.rightJoinFieldNames,
|
|
)
|
|
if hasAllJoinKeys {
|
|
iLeftBucket := tr.leftBucketsByJoinFieldValues.Get(groupingKey)
|
|
if iLeftBucket == nil {
|
|
if tr.opts.emitRightUnpairables {
|
|
outputRecordsAndContexts.PushBack(inrecAndContext)
|
|
}
|
|
} else {
|
|
leftBucket := iLeftBucket.(*utils.JoinBucket)
|
|
leftBucket.WasPaired = true
|
|
if tr.opts.emitPairables {
|
|
tr.formAndEmitPairs(
|
|
leftBucket.RecordsAndContexts,
|
|
inrecAndContext,
|
|
outputRecordsAndContexts,
|
|
)
|
|
}
|
|
}
|
|
} else if tr.opts.emitRightUnpairables {
|
|
outputRecordsAndContexts.PushBack(inrecAndContext)
|
|
}
|
|
|
|
} else { // end of record stream
|
|
if tr.opts.emitLeftUnpairables {
|
|
tr.emitLeftUnpairedBuckets(outputRecordsAndContexts)
|
|
tr.emitLeftUnpairables(outputRecordsAndContexts)
|
|
}
|
|
outputRecordsAndContexts.PushBack(inrecAndContext) // emit end-of-stream marker
|
|
}
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
func (tr *TransformerJoin) transformDoublyStreaming(
|
|
rightRecAndContext *types.RecordAndContext,
|
|
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
|
|
inputDownstreamDoneChannel <-chan bool,
|
|
outputDownstreamDoneChannel chan<- bool,
|
|
) {
|
|
keeper := tr.joinBucketKeeper // keystroke-saver
|
|
|
|
if !rightRecAndContext.EndOfStream {
|
|
rightRec := rightRecAndContext.Record
|
|
isPaired := false
|
|
|
|
rightFieldValues, hasAllJoinKeys := rightRec.ReferenceSelectedValues(
|
|
tr.opts.rightJoinFieldNames,
|
|
)
|
|
if hasAllJoinKeys {
|
|
isPaired = keeper.FindJoinBucket(rightFieldValues)
|
|
}
|
|
if tr.opts.emitLeftUnpairables {
|
|
keeper.OutputAndReleaseLeftUnpaireds(outputRecordsAndContexts)
|
|
} else {
|
|
keeper.ReleaseLeftUnpaireds(outputRecordsAndContexts)
|
|
}
|
|
|
|
lefts := keeper.JoinBucket.RecordsAndContexts // keystroke-saver
|
|
|
|
if !isPaired && tr.opts.emitRightUnpairables {
|
|
outputRecordsAndContexts.PushBack(rightRecAndContext)
|
|
}
|
|
|
|
if isPaired && tr.opts.emitPairables && lefts != nil {
|
|
tr.formAndEmitPairs(lefts, rightRecAndContext, outputRecordsAndContexts)
|
|
}
|
|
|
|
} else { // end of record stream
|
|
keeper.FindJoinBucket(nil)
|
|
|
|
if tr.opts.emitLeftUnpairables {
|
|
keeper.OutputAndReleaseLeftUnpaireds(outputRecordsAndContexts)
|
|
}
|
|
|
|
outputRecordsAndContexts.PushBack(rightRecAndContext) // emit end-of-stream marker
|
|
}
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
// This is for the half-streaming case. We ingest the entire left file,
|
|
// matching each right record against those.
|
|
//
|
|
// Note: this logic is very similar to that in stream.go, which is what
|
|
// processes the main/right files.
|
|
|
|
func (tr *TransformerJoin) ingestLeftFile() {
|
|
readerOpts := &tr.opts.joinFlagOptions.ReaderOptions
|
|
|
|
// Instantiate the record-reader
|
|
// TODO: perhaps increase recordsPerBatch, and/or refactor
|
|
recordReader, err := input.Create(readerOpts, 1)
|
|
if recordReader == nil {
|
|
fmt.Fprintf(os.Stderr, "mlr join: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Set the initial context for the left-file.
|
|
//
|
|
// Since Go is concurrent, the context struct needs to be duplicated and
|
|
// passed through the channels along with each record.
|
|
initialContext := types.NewNilContext()
|
|
initialContext.UpdateForStartOfFile(tr.opts.leftFileName)
|
|
|
|
// Set up channels for the record-reader.
|
|
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
|
|
errorChannel := make(chan error, 1)
|
|
downstreamDoneChannel := make(chan bool, 1)
|
|
|
|
// Start the record reader.
|
|
// TODO: prepipe
|
|
leftFileNameArray := [1]string{tr.opts.leftFileName}
|
|
go recordReader.Read(leftFileNameArray[:], *initialContext, readerChannel, errorChannel, downstreamDoneChannel)
|
|
|
|
// Ingest parsed records and bucket them by their join-field values. E.g.
|
|
// if the join-field is "id" then put all records with id=1 in one bucket,
|
|
// all those with id=2 in another bucket, etc. And any records lacking an
|
|
// "id" field go into the unpairable list.
|
|
done := false
|
|
for !done {
|
|
select {
|
|
|
|
case err := <-errorChannel:
|
|
fmt.Fprintln(os.Stderr, "mlr", ": ", err)
|
|
os.Exit(1)
|
|
|
|
case leftrecsAndContexts := <-readerChannel:
|
|
// TODO: temp for batch-reader refactor
|
|
lib.InternalCodingErrorIf(leftrecsAndContexts.Len() != 1)
|
|
leftrecAndContext := leftrecsAndContexts.Front().Value.(*types.RecordAndContext)
|
|
leftrecAndContext.Record = utils.KeepLeftFieldNames(leftrecAndContext.Record, tr.leftKeepFieldNameSet)
|
|
|
|
if leftrecAndContext.EndOfStream {
|
|
done = true
|
|
break // breaks the switch, not the for, in Golang
|
|
}
|
|
leftrec := leftrecAndContext.Record
|
|
if leftrec == nil {
|
|
// E.g. the only payload is OutputString or EndOfStream
|
|
continue
|
|
}
|
|
|
|
groupingKey, leftFieldValues, ok := leftrec.GetSelectedValuesAndJoined(
|
|
tr.opts.leftJoinFieldNames,
|
|
)
|
|
if ok {
|
|
iBucket := tr.leftBucketsByJoinFieldValues.Get(groupingKey)
|
|
if iBucket == nil { // New key-field-value: new bucket and hash-map entry
|
|
bucket := utils.NewJoinBucket(leftFieldValues)
|
|
bucket.RecordsAndContexts.PushBack(leftrecAndContext)
|
|
tr.leftBucketsByJoinFieldValues.Put(groupingKey, bucket)
|
|
} else { // Previously seen key-field-value: append record to bucket
|
|
bucket := iBucket.(*utils.JoinBucket)
|
|
bucket.RecordsAndContexts.PushBack(leftrecAndContext)
|
|
}
|
|
} else {
|
|
tr.leftUnpairableRecordsAndContexts.PushBack(leftrecAndContext)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
// This helper method is used by the half-streaming/unsorted join, as well as
|
|
// the doubly-streaming/sorted join.
|
|
|
|
func (tr *TransformerJoin) formAndEmitPairs(
|
|
leftRecordsAndContexts *list.List,
|
|
rightRecordAndContext *types.RecordAndContext,
|
|
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
|
|
) {
|
|
////fmt.Println("-- pairs start") // VERBOSE
|
|
// Loop over each to-be-paired-with record from the left file.
|
|
for pe := leftRecordsAndContexts.Front(); pe != nil; pe = pe.Next() {
|
|
////fmt.Println("-- pairs pe") // VERBOSE
|
|
leftRecordAndContext := pe.Value.(*types.RecordAndContext)
|
|
leftrec := leftRecordAndContext.Record
|
|
rightrec := rightRecordAndContext.Record
|
|
|
|
// Allocate a new output record which is the join of the left and right records.
|
|
outrec := mlrval.NewMlrmapAsRecord()
|
|
|
|
// Add the joined-on fields to the new output record
|
|
n := len(tr.opts.leftJoinFieldNames)
|
|
for i := 0; i < n; i++ {
|
|
// These arrays are already guaranteed same-length by CLI parser
|
|
leftJoinFieldName := tr.opts.leftJoinFieldNames[i]
|
|
outputJoinFieldName := tr.opts.outputJoinFieldNames[i]
|
|
value := leftrec.Get(leftJoinFieldName)
|
|
if value != nil {
|
|
outrec.PutCopy(outputJoinFieldName, value)
|
|
}
|
|
}
|
|
|
|
// Add the left-record fields not already added
|
|
for pl := leftrec.Head; pl != nil; pl = pl.Next {
|
|
_, ok := tr.leftFieldNameSet[pl.Key]
|
|
if !ok {
|
|
key := tr.opts.leftPrefix + pl.Key
|
|
outrec.PutCopy(key, pl.Value)
|
|
}
|
|
}
|
|
|
|
// Add the right-record fields not already added
|
|
for pr := rightrec.Head; pr != nil; pr = pr.Next {
|
|
_, ok := tr.rightFieldNameSet[pr.Key]
|
|
if !ok {
|
|
key := tr.opts.rightPrefix + pr.Key
|
|
outrec.PutCopy(key, pr.Value)
|
|
}
|
|
}
|
|
////fmt.Println("-- pairs outrec") // VERBOSE
|
|
////outrec.Print() // VERBOSE
|
|
|
|
// Clone the right record's context (NR, FILENAME, etc) to use for the new output record
|
|
context := rightRecordAndContext.Context // struct copy
|
|
outrecAndContext := types.NewRecordAndContext(outrec, &context)
|
|
|
|
// Emit the new joined record on the downstream channel
|
|
outputRecordsAndContexts.PushBack(outrecAndContext)
|
|
}
|
|
////fmt.Println("-- pairs end") // VERBOSE
|
|
}
|
|
|
|
// ----------------------------------------------------------------
|
|
// There are two kinds of left non-pair records: (a) those lacking the
|
|
// specified join-keys -- can't possibly pair with anything on the right; (b)
|
|
// those having the join-keys but not matching with a record on the right.
|
|
//
|
|
// Example: join on "id" field. Records lacking an "id" field are in the first
|
|
// category. Now suppose there's a left record with id=0, but there were three
|
|
// right-file records with id-field values 1,2,3. Then the id=0 left records is
|
|
// in the second category.
|
|
|
|
func (tr *TransformerJoin) emitLeftUnpairables(
|
|
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
|
|
) {
|
|
// Loop over each to-be-paired-with record from the left file.
|
|
for pe := tr.leftUnpairableRecordsAndContexts.Front(); pe != nil; pe = pe.Next() {
|
|
leftRecordAndContext := pe.Value.(*types.RecordAndContext)
|
|
outputRecordsAndContexts.PushBack(leftRecordAndContext)
|
|
}
|
|
}
|
|
|
|
func (tr *TransformerJoin) emitLeftUnpairedBuckets(
|
|
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
|
|
) {
|
|
for pe := tr.leftBucketsByJoinFieldValues.Head; pe != nil; pe = pe.Next {
|
|
bucket := pe.Value.(*utils.JoinBucket)
|
|
if !bucket.WasPaired {
|
|
for pf := bucket.RecordsAndContexts.Front(); pf != nil; pf = pf.Next() {
|
|
recordAndContext := pf.Value.(*types.RecordAndContext)
|
|
outputRecordsAndContexts.PushBack(recordAndContext)
|
|
}
|
|
}
|
|
}
|
|
}
|