miller/pkg/transformers/join.go
John Kerl 05aa16cfcf
Join docs wrong link (#1695)
* Fix join-docs link in online help

* run `make dev` and commit the artifacts
2024-10-17 09:11:03 -04:00

650 lines
23 KiB
Go

package transformers
import (
"container/list"
"fmt"
"os"
"strings"
"github.com/johnkerl/miller/v6/pkg/cli"
"github.com/johnkerl/miller/v6/pkg/input"
"github.com/johnkerl/miller/v6/pkg/lib"
"github.com/johnkerl/miller/v6/pkg/mlrval"
"github.com/johnkerl/miller/v6/pkg/transformers/utils"
"github.com/johnkerl/miller/v6/pkg/types"
)
// ----------------------------------------------------------------
const verbNameJoin = "join"
var JoinSetup = TransformerSetup{
Verb: verbNameJoin,
UsageFunc: transformerJoinUsage,
ParseCLIFunc: transformerJoinParseCLI,
IgnoresInput: false,
}
// ----------------------------------------------------------------
// Most transformers have option-variables as individual locals within the
// transformerXYZParseCLI function, which are passed as individual arguments to
// the NewTransformerXYZ function. For join, things are a bit more complex
// and we bag up the option-variables into this data structure.
type tJoinOptions struct {
leftPrefix string
rightPrefix string
outputJoinFieldNames []string
leftKeepFieldNames []string
leftJoinFieldNames []string
rightJoinFieldNames []string
allowUnsortedInput bool
emitPairables bool
emitLeftUnpairables bool
emitRightUnpairables bool
leftFileName string
prepipe string
prepipeIsRaw bool
// These allow the joiner to have its own different format/delimiter for the left-file:
joinFlagOptions cli.TOptions
}
func newJoinOptions() *tJoinOptions {
return &tJoinOptions{
leftPrefix: "",
rightPrefix: "",
outputJoinFieldNames: nil,
leftKeepFieldNames: nil,
leftJoinFieldNames: nil,
rightJoinFieldNames: nil,
allowUnsortedInput: true,
emitPairables: true,
emitLeftUnpairables: false,
emitRightUnpairables: false,
leftFileName: "",
prepipe: "",
prepipeIsRaw: false,
}
}
// ----------------------------------------------------------------
func transformerJoinUsage(
o *os.File,
) {
fmt.Fprintf(o, "Usage: %s %s [options]\n", "mlr", verbNameJoin)
fmt.Fprintf(o, "Joins records from specified left file name with records from all file names\n")
fmt.Fprintf(o, "at the end of the Miller argument list.\n")
fmt.Fprintf(o, "Functionality is essentially the same as the system \"join\" command, but for\n")
fmt.Fprintf(o, "record streams.\n")
fmt.Fprintf(o, "Options:\n")
fmt.Fprintf(o, " -f {left file name}\n")
fmt.Fprintf(o, " -j {a,b,c} Comma-separated join-field names for output\n")
fmt.Fprintf(o, " -l {a,b,c} Comma-separated join-field names for left input file;\n")
fmt.Fprintf(o, " defaults to -j values if omitted.\n")
fmt.Fprintf(o, " -r {a,b,c} Comma-separated join-field names for right input file(s);\n")
fmt.Fprintf(o, " defaults to -j values if omitted.\n")
fmt.Fprintf(o, " --lk|--left-keep-field-names {a,b,c} If supplied, this means keep only the specified field\n")
fmt.Fprintf(o, " names from the left file. Automatically includes the join-field name(s). Helpful\n")
fmt.Fprintf(o, " for when you only want a limited subset of information from the left file.\n")
fmt.Fprintf(o, " Tip: you can use --lk \"\": this means the left file becomes solely a row-selector\n")
fmt.Fprintf(o, " for the input files.\n")
fmt.Fprintf(o, " --lp {text} Additional prefix for non-join output field names from\n")
fmt.Fprintf(o, " the left file\n")
fmt.Fprintf(o, " --rp {text} Additional prefix for non-join output field names from\n")
fmt.Fprintf(o, " the right file(s)\n")
fmt.Fprintf(o, " --np Do not emit paired records\n")
fmt.Fprintf(o, " --ul Emit unpaired records from the left file\n")
fmt.Fprintf(o, " --ur Emit unpaired records from the right file(s)\n")
fmt.Fprintf(o, " -s|--sorted-input Require sorted input: records must be sorted\n")
fmt.Fprintf(o, " lexically by their join-field names, else not all records will\n")
fmt.Fprintf(o, " be paired. The only likely use case for this is with a left\n")
fmt.Fprintf(o, " file which is too big to fit into system memory otherwise.\n")
fmt.Fprintf(o, " -u Enable unsorted input. (This is the default even without -u.)\n")
fmt.Fprintf(o, " In this case, the entire left file will be loaded into memory.\n")
fmt.Fprintf(o, " --prepipe {command} As in main input options; see %s --help for details.\n",
"mlr")
fmt.Fprintf(o, " If you wish to use a prepipe command for the main input as well\n")
fmt.Fprintf(o, " as here, it must be specified there as well as here.\n")
fmt.Fprintf(o, " --prepipex {command} Likewise.\n")
fmt.Fprintf(o, "File-format options default to those for the right file names on the Miller\n")
fmt.Fprintf(o, "argument list, but may be overridden for the left file as follows. Please see\n")
fmt.Fprintf(o, "the main \"%s --help\" for more information on syntax for these arguments:\n", "mlr")
fmt.Fprintf(o, " -i {one of csv,dkvp,nidx,pprint,xtab}\n")
fmt.Fprintf(o, " --irs {record-separator character}\n")
fmt.Fprintf(o, " --ifs {field-separator character}\n")
fmt.Fprintf(o, " --ips {pair-separator character}\n")
fmt.Fprintf(o, " --repifs\n")
fmt.Fprintf(o, " --implicit-csv-header\n")
fmt.Fprintf(o, " --implicit-tsv-header\n")
fmt.Fprintf(o, " --no-implicit-csv-header\n")
fmt.Fprintf(o, " --no-implicit-tsv-header\n")
fmt.Fprintf(o, "For example, if you have 'mlr --csv ... join -l foo ... ' then the left-file format will\n")
fmt.Fprintf(o, "be specified CSV as well unless you override with 'mlr --csv ... join --ijson -l foo' etc.\n")
fmt.Fprintf(o, "Likewise, if you have 'mlr --csv --implicit-csv-header ...' then the join-in file will be\n")
fmt.Fprintf(o, "expected to be headerless as well unless you put '--no-implicit-csv-header' after 'join'.\n")
fmt.Fprintf(o, "Please use \"%s --usage-separator-options\" for information on specifying separators.\n",
"mlr")
fmt.Fprintf(o, "Please see https://miller.readthedocs.io/en/latest/reference-verbs#join for more information\n")
fmt.Fprintf(o, "including examples.\n")
}
// ----------------------------------------------------------------
func transformerJoinParseCLI(
pargi *int,
argc int,
args []string,
mainOptions *cli.TOptions, // Options for the right-files
doConstruct bool, // false for first pass of CLI-parse, true for second pass
) IRecordTransformer {
// Skip the verb name from the current spot in the mlr command line
argi := *pargi
verb := args[argi]
argi++
// Parse local flags
opts := newJoinOptions()
if mainOptions != nil { // for 'mlr --usage-all-verbs', it's nil
// TODO: make sure this is a full nested-struct copy.
opts.joinFlagOptions = *mainOptions // struct copy
}
for argi < argc /* variable increment: 1 or 2 depending on flag */ {
opt := args[argi]
if !strings.HasPrefix(opt, "-") {
break // No more flag options to process
}
if args[argi] == "--" {
break // All transformers must do this so main-flags can follow verb-flags
}
argi++
if opt == "-h" || opt == "--help" {
transformerJoinUsage(os.Stdout)
os.Exit(0)
} else if opt == "--prepipe" {
opts.prepipe = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
opts.prepipeIsRaw = false
} else if opt == "--prepipex" {
opts.prepipe = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
opts.prepipeIsRaw = true
} else if opt == "-f" {
opts.leftFileName = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "-j" {
opts.outputJoinFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "-l" {
opts.leftJoinFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "--lk" || opt == "--left-keep-field-names" {
opts.leftKeepFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "-r" {
opts.rightJoinFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "--lp" {
opts.leftPrefix = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "--rp" {
opts.rightPrefix = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "--np" {
opts.emitPairables = false
} else if opt == "--ul" {
opts.emitLeftUnpairables = true
} else if opt == "--ur" {
opts.emitRightUnpairables = true
} else if opt == "-u" {
opts.allowUnsortedInput = true
} else if opt == "--sorted-input" || opt == "-s" {
opts.allowUnsortedInput = false
} else {
// This is inelegant. For error-proofing we advance argi already in our
// loop (so individual if-statements don't need to). However,
// cli.Parse expects it unadvanced.
largi := argi - 1
if cli.FLAG_TABLE.Parse(args, argc, &largi, &opts.joinFlagOptions) {
// This lets mlr main and mlr join have different input formats.
// Nothing else to handle here.
argi = largi
} else {
transformerJoinUsage(os.Stderr)
os.Exit(1)
}
}
}
cli.FinalizeReaderOptions(&opts.joinFlagOptions.ReaderOptions)
if opts.leftFileName == "" {
fmt.Fprintf(os.Stderr, "%s %s: need left file name\n", "mlr", verb)
transformerJoinUsage(os.Stderr)
os.Exit(1)
return nil
}
if !opts.emitPairables && !opts.emitLeftUnpairables && !opts.emitRightUnpairables {
fmt.Fprintf(os.Stderr, "%s %s: all emit flags are unset; no output is possible.\n",
"mlr", verb)
transformerJoinUsage(os.Stderr)
os.Exit(1)
return nil
}
if opts.outputJoinFieldNames == nil {
fmt.Fprintf(os.Stderr, "%s %s: need output field names\n", "mlr", verb)
transformerJoinUsage(os.Stderr)
os.Exit(1)
return nil
}
if opts.leftJoinFieldNames == nil {
opts.leftJoinFieldNames = opts.outputJoinFieldNames // array copy
}
if opts.rightJoinFieldNames == nil {
opts.rightJoinFieldNames = opts.outputJoinFieldNames // array copy
}
llen := len(opts.leftJoinFieldNames)
rlen := len(opts.rightJoinFieldNames)
olen := len(opts.outputJoinFieldNames)
if llen != rlen || llen != olen {
fmt.Fprintf(os.Stderr,
"%s %s: must have equal left,right,output field-name lists; got lengths %d,%d,%d.\n",
"mlr", verb, llen, rlen, olen)
os.Exit(1)
}
*pargi = argi
if !doConstruct { // All transformers must do this for main command-line parsing
return nil
}
transformer, err := NewTransformerJoin(opts)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
return transformer
}
// ----------------------------------------------------------------
type TransformerJoin struct {
opts *tJoinOptions
leftFieldNameSet map[string]bool
rightFieldNameSet map[string]bool
leftKeepFieldNameSet map[string]bool
// For unsorted/half-streaming input
ingested bool
leftBucketsByJoinFieldValues *lib.OrderedMap
leftUnpairableRecordsAndContexts *list.List
// For sorted/doubly-streaming input
joinBucketKeeper *utils.JoinBucketKeeper
recordTransformerFunc RecordTransformerFunc
}
// ----------------------------------------------------------------
func NewTransformerJoin(
opts *tJoinOptions,
) (*TransformerJoin, error) {
tr := &TransformerJoin{
opts: opts,
leftFieldNameSet: lib.StringListToSet(opts.leftJoinFieldNames),
rightFieldNameSet: lib.StringListToSet(opts.rightJoinFieldNames),
leftKeepFieldNameSet: lib.StringListToSet(opts.leftKeepFieldNames),
ingested: false,
leftBucketsByJoinFieldValues: nil,
leftUnpairableRecordsAndContexts: nil,
joinBucketKeeper: nil,
}
// Suppose left file has "id,foo,bar" and right has "id,baz,quux" and the join field name is
// "id". If they ask for --lk id,foo we should keep only id,foo from the left file. But if
// they ask for --lk foo we should keep id *and* foo fromn the left file.
if tr.leftKeepFieldNameSet != nil {
for _, name := range opts.leftJoinFieldNames {
tr.leftKeepFieldNameSet[name] = true
}
}
if opts.allowUnsortedInput {
// Half-streaming (default) case: ingest entire left file first.
tr.leftUnpairableRecordsAndContexts = list.New()
tr.leftBucketsByJoinFieldValues = lib.NewOrderedMap()
tr.recordTransformerFunc = tr.transformHalfStreaming
} else {
// Doubly-streaming (non-default) case: step left/right files forward.
// Requires both files be sorted on their join keys in order to not
// miss anything. This lets people do joins that would otherwise take
// too much RAM.
tr.joinBucketKeeper = utils.NewJoinBucketKeeper(
// opts.prepipe,
opts.leftFileName,
&opts.joinFlagOptions.ReaderOptions,
opts.leftJoinFieldNames,
tr.leftKeepFieldNameSet,
)
tr.recordTransformerFunc = tr.transformDoublyStreaming
}
return tr, nil
}
// ----------------------------------------------------------------
func (tr *TransformerJoin) Transform(
inrecAndContext *types.RecordAndContext,
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
inputDownstreamDoneChannel <-chan bool,
outputDownstreamDoneChannel chan<- bool,
) {
HandleDefaultDownstreamDone(inputDownstreamDoneChannel, outputDownstreamDoneChannel)
tr.recordTransformerFunc(inrecAndContext, outputRecordsAndContexts,
inputDownstreamDoneChannel, outputDownstreamDoneChannel)
}
// ----------------------------------------------------------------
// This is for the half-streaming case. We ingest the entire left file,
// matching each right record against those.
func (tr *TransformerJoin) transformHalfStreaming(
inrecAndContext *types.RecordAndContext,
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
inputDownstreamDoneChannel <-chan bool,
outputDownstreamDoneChannel chan<- bool,
) {
// This can't be done in the CLI-parser since it requires information which
// isn't known until after the CLI-parser is called.
//
// TODO: check if this is still true for the Go port, once everything else
// is done.
if !tr.ingested { // First call
tr.ingestLeftFile()
tr.ingested = true
}
if !inrecAndContext.EndOfStream {
inrec := inrecAndContext.Record
groupingKey, hasAllJoinKeys := inrec.GetSelectedValuesJoined(
tr.opts.rightJoinFieldNames,
)
if hasAllJoinKeys {
iLeftBucket := tr.leftBucketsByJoinFieldValues.Get(groupingKey)
if iLeftBucket == nil {
if tr.opts.emitRightUnpairables {
outputRecordsAndContexts.PushBack(inrecAndContext)
}
} else {
leftBucket := iLeftBucket.(*utils.JoinBucket)
leftBucket.WasPaired = true
if tr.opts.emitPairables {
tr.formAndEmitPairs(
leftBucket.RecordsAndContexts,
inrecAndContext,
outputRecordsAndContexts,
)
}
}
} else if tr.opts.emitRightUnpairables {
outputRecordsAndContexts.PushBack(inrecAndContext)
}
} else { // end of record stream
if tr.opts.emitLeftUnpairables {
tr.emitLeftUnpairedBuckets(outputRecordsAndContexts)
tr.emitLeftUnpairables(outputRecordsAndContexts)
}
outputRecordsAndContexts.PushBack(inrecAndContext) // emit end-of-stream marker
}
}
// ----------------------------------------------------------------
func (tr *TransformerJoin) transformDoublyStreaming(
rightRecAndContext *types.RecordAndContext,
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
inputDownstreamDoneChannel <-chan bool,
outputDownstreamDoneChannel chan<- bool,
) {
keeper := tr.joinBucketKeeper // keystroke-saver
if !rightRecAndContext.EndOfStream {
rightRec := rightRecAndContext.Record
isPaired := false
rightFieldValues, hasAllJoinKeys := rightRec.ReferenceSelectedValues(
tr.opts.rightJoinFieldNames,
)
if hasAllJoinKeys {
isPaired = keeper.FindJoinBucket(rightFieldValues)
}
if tr.opts.emitLeftUnpairables {
keeper.OutputAndReleaseLeftUnpaireds(outputRecordsAndContexts)
} else {
keeper.ReleaseLeftUnpaireds(outputRecordsAndContexts)
}
lefts := keeper.JoinBucket.RecordsAndContexts // keystroke-saver
if !isPaired && tr.opts.emitRightUnpairables {
outputRecordsAndContexts.PushBack(rightRecAndContext)
}
if isPaired && tr.opts.emitPairables && lefts != nil {
tr.formAndEmitPairs(lefts, rightRecAndContext, outputRecordsAndContexts)
}
} else { // end of record stream
keeper.FindJoinBucket(nil)
if tr.opts.emitLeftUnpairables {
keeper.OutputAndReleaseLeftUnpaireds(outputRecordsAndContexts)
}
outputRecordsAndContexts.PushBack(rightRecAndContext) // emit end-of-stream marker
}
}
// ----------------------------------------------------------------
// This is for the half-streaming case. We ingest the entire left file,
// matching each right record against those.
//
// Note: this logic is very similar to that in stream.go, which is what
// processes the main/right files.
func (tr *TransformerJoin) ingestLeftFile() {
readerOpts := &tr.opts.joinFlagOptions.ReaderOptions
// Instantiate the record-reader
// TODO: perhaps increase recordsPerBatch, and/or refactor
recordReader, err := input.Create(readerOpts, 1)
if recordReader == nil {
fmt.Fprintf(os.Stderr, "mlr join: %v\n", err)
os.Exit(1)
}
// Set the initial context for the left-file.
//
// Since Go is concurrent, the context struct needs to be duplicated and
// passed through the channels along with each record.
initialContext := types.NewNilContext()
initialContext.UpdateForStartOfFile(tr.opts.leftFileName)
// Set up channels for the record-reader.
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
errorChannel := make(chan error, 1)
downstreamDoneChannel := make(chan bool, 1)
// Start the record reader.
// TODO: prepipe
leftFileNameArray := [1]string{tr.opts.leftFileName}
go recordReader.Read(leftFileNameArray[:], *initialContext, readerChannel, errorChannel, downstreamDoneChannel)
// Ingest parsed records and bucket them by their join-field values. E.g.
// if the join-field is "id" then put all records with id=1 in one bucket,
// all those with id=2 in another bucket, etc. And any records lacking an
// "id" field go into the unpairable list.
done := false
for !done {
select {
case err := <-errorChannel:
fmt.Fprintln(os.Stderr, "mlr", ": ", err)
os.Exit(1)
case leftrecsAndContexts := <-readerChannel:
// TODO: temp for batch-reader refactor
lib.InternalCodingErrorIf(leftrecsAndContexts.Len() != 1)
leftrecAndContext := leftrecsAndContexts.Front().Value.(*types.RecordAndContext)
leftrecAndContext.Record = utils.KeepLeftFieldNames(leftrecAndContext.Record, tr.leftKeepFieldNameSet)
if leftrecAndContext.EndOfStream {
done = true
break // breaks the switch, not the for, in Golang
}
leftrec := leftrecAndContext.Record
if leftrec == nil {
// E.g. the only payload is OutputString or EndOfStream
continue
}
groupingKey, leftFieldValues, ok := leftrec.GetSelectedValuesAndJoined(
tr.opts.leftJoinFieldNames,
)
if ok {
iBucket := tr.leftBucketsByJoinFieldValues.Get(groupingKey)
if iBucket == nil { // New key-field-value: new bucket and hash-map entry
bucket := utils.NewJoinBucket(leftFieldValues)
bucket.RecordsAndContexts.PushBack(leftrecAndContext)
tr.leftBucketsByJoinFieldValues.Put(groupingKey, bucket)
} else { // Previously seen key-field-value: append record to bucket
bucket := iBucket.(*utils.JoinBucket)
bucket.RecordsAndContexts.PushBack(leftrecAndContext)
}
} else {
tr.leftUnpairableRecordsAndContexts.PushBack(leftrecAndContext)
}
}
}
}
// ----------------------------------------------------------------
// This helper method is used by the half-streaming/unsorted join, as well as
// the doubly-streaming/sorted join.
func (tr *TransformerJoin) formAndEmitPairs(
leftRecordsAndContexts *list.List,
rightRecordAndContext *types.RecordAndContext,
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
) {
////fmt.Println("-- pairs start") // VERBOSE
// Loop over each to-be-paired-with record from the left file.
for pe := leftRecordsAndContexts.Front(); pe != nil; pe = pe.Next() {
////fmt.Println("-- pairs pe") // VERBOSE
leftRecordAndContext := pe.Value.(*types.RecordAndContext)
leftrec := leftRecordAndContext.Record
rightrec := rightRecordAndContext.Record
// Allocate a new output record which is the join of the left and right records.
outrec := mlrval.NewMlrmapAsRecord()
// Add the joined-on fields to the new output record
n := len(tr.opts.leftJoinFieldNames)
for i := 0; i < n; i++ {
// These arrays are already guaranteed same-length by CLI parser
leftJoinFieldName := tr.opts.leftJoinFieldNames[i]
outputJoinFieldName := tr.opts.outputJoinFieldNames[i]
value := leftrec.Get(leftJoinFieldName)
if value != nil {
outrec.PutCopy(outputJoinFieldName, value)
}
}
// Add the left-record fields not already added
for pl := leftrec.Head; pl != nil; pl = pl.Next {
_, ok := tr.leftFieldNameSet[pl.Key]
if !ok {
key := tr.opts.leftPrefix + pl.Key
outrec.PutCopy(key, pl.Value)
}
}
// Add the right-record fields not already added
for pr := rightrec.Head; pr != nil; pr = pr.Next {
_, ok := tr.rightFieldNameSet[pr.Key]
if !ok {
key := tr.opts.rightPrefix + pr.Key
outrec.PutCopy(key, pr.Value)
}
}
////fmt.Println("-- pairs outrec") // VERBOSE
////outrec.Print() // VERBOSE
// Clone the right record's context (NR, FILENAME, etc) to use for the new output record
context := rightRecordAndContext.Context // struct copy
outrecAndContext := types.NewRecordAndContext(outrec, &context)
// Emit the new joined record on the downstream channel
outputRecordsAndContexts.PushBack(outrecAndContext)
}
////fmt.Println("-- pairs end") // VERBOSE
}
// ----------------------------------------------------------------
// There are two kinds of left non-pair records: (a) those lacking the
// specified join-keys -- can't possibly pair with anything on the right; (b)
// those having the join-keys but not matching with a record on the right.
//
// Example: join on "id" field. Records lacking an "id" field are in the first
// category. Now suppose there's a left record with id=0, but there were three
// right-file records with id-field values 1,2,3. Then the id=0 left records is
// in the second category.
func (tr *TransformerJoin) emitLeftUnpairables(
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
) {
// Loop over each to-be-paired-with record from the left file.
for pe := tr.leftUnpairableRecordsAndContexts.Front(); pe != nil; pe = pe.Next() {
leftRecordAndContext := pe.Value.(*types.RecordAndContext)
outputRecordsAndContexts.PushBack(leftRecordAndContext)
}
}
func (tr *TransformerJoin) emitLeftUnpairedBuckets(
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
) {
for pe := tr.leftBucketsByJoinFieldValues.Head; pe != nil; pe = pe.Next {
bucket := pe.Value.(*utils.JoinBucket)
if !bucket.WasPaired {
for pf := bucket.RecordsAndContexts.Front(); pf != nil; pf = pf.Next() {
recordAndContext := pf.Value.(*types.RecordAndContext)
outputRecordsAndContexts.PushBack(recordAndContext)
}
}
}
}