miller/pkg/transformers/split.go
Adam Lesperance 085e831668
The package version must match the major tag version (#1654)
* Update package version

* Update makefile targets

* Update readme packages

* Remaining old packages via rg/sd
2024-09-20 12:10:11 -04:00

440 lines
13 KiB
Go

package transformers
import (
"container/list"
"fmt"
"net/url"
"os"
"strings"
"github.com/johnkerl/miller/v6/pkg/cli"
"github.com/johnkerl/miller/v6/pkg/mlrval"
"github.com/johnkerl/miller/v6/pkg/output"
"github.com/johnkerl/miller/v6/pkg/types"
)
// ----------------------------------------------------------------
// Names and defaults for the split verb.
const (
	// verbNameSplit is the verb name as it appears on the mlr command line.
	verbNameSplit = "split"
	// splitDefaultOutputFileNamePrefix is the output-filename prefix used
	// when --prefix is not given.
	splitDefaultOutputFileNamePrefix = "split"
	// splitDefaultFileNamePartJoiner is the string placed between filename
	// parts when -j is not given.
	splitDefaultFileNamePartJoiner = "_"
)

// SplitSetup bundles the split verb's name with its usage and CLI-parse
// functions.
var SplitSetup = TransformerSetup{
	Verb:         verbNameSplit,
	UsageFunc:    transformerSplitUsage,
	ParseCLIFunc: transformerSplitParseCLI,
	IgnoresInput: false,
}
func transformerSplitUsage(
o *os.File,
) {
fmt.Fprintf(o, "Usage: %s %s [options] {filename}\n", "mlr", verbNameSplit)
fmt.Fprintf(o,
`Options:
-n {n}: Cap file sizes at N records.
-m {m}: Produce M files, round-robining records among them.
-g {a,b,c}: Write separate files with records having distinct values for fields named a,b,c.
Exactly one of -m, -n, or -g must be supplied.
--prefix {p} Specify filename prefix; default "`+splitDefaultOutputFileNamePrefix+`".
--suffix {s} Specify filename suffix; default is from mlr output format, e.g. "csv".
-a Append to existing file(s), if any, rather than overwriting.
-v Send records along to downstream verbs as well as splitting to files.
-e Do NOT URL-escape names of output files.
-j {J} Use string J to join filename parts; default "`+splitDefaultFileNamePartJoiner+`".
-h|--help Show this message.
Any of the output-format command-line flags (see mlr -h). For example, using
mlr --icsv --from myfile.csv split --ojson -n 1000
the input is CSV, but the output files are JSON.
Examples: Suppose myfile.csv has 1,000,000 records.
100 output files, 10,000 records each. First 10,000 records in split_1.csv, next in split_2.csv, etc.
mlr --csv --from myfile.csv split -n 10000
10 output files, 100,000 records each. Records 1,11,21,etc in split_1.csv, records 2,12,22, etc in split_2.csv, etc.
mlr --csv --from myfile.csv split -m 10
Same, but with JSON output.
mlr --csv --from myfile.csv split -m 10 -o json
Same but instead of split_1.csv, split_2.csv, etc. there are test_1.dat, test_2.dat, etc.
mlr --csv --from myfile.csv split -m 10 --prefix test --suffix dat
Same, but written to the /tmp/ directory.
mlr --csv --from myfile.csv split -m 10 --prefix /tmp/test --suffix dat
If the shape field has values triangle and square, then there will be split_triangle.csv and split_square.csv.
mlr --csv --from myfile.csv split -g shape
If the color field has values yellow and green, and the shape field has values triangle and square,
then there will be split_yellow_triangle.csv, split_yellow_square.csv, etc.
mlr --csv --from myfile.csv split -g color,shape
See also the "tee" DSL function which lets you do more ad-hoc customization.
`)
}
// transformerSplitParseCLI parses the split verb's flags out of args, starting
// with the verb name at args[*pargi], and stores the index of the first
// unconsumed argument back into *pargi.
//
// Besides the verb's own flags, any flag accepted by cli.FLAG_TABLE is applied
// to a private copy of mainOptions, so that "mlr split" can write its files in
// a different format than the main mlr output.
//
// The process exits with status 1, after printing to stderr, when a flag is
// not recognized, when -m/-n/-g are not supplied exactly once among them, or
// when the -m/-n count is not positive. The count is checked here because it
// is later used as a divisor (-n) or modulus (-m) for every record, and
// integer division by zero is a run-time panic.
//
// It returns nil when doConstruct is false, and the constructed transformer
// otherwise.
func transformerSplitParseCLI(
	pargi *int,
	argc int,
	args []string,
	mainOptions *cli.TOptions,
	doConstruct bool, // false for first pass of CLI-parse, true for second pass
) IRecordTransformer {

	// Skip the verb name from the current spot in the mlr command line
	argi := *pargi
	verb := args[argi]
	argi++

	var n int64
	doMod := false
	doSize := false
	var groupByFieldNames []string
	emitDownstream := false
	escapeFileNameCharacters := true
	fileNamePartJoiner := splitDefaultFileNamePartJoiner
	doAppend := false
	outputFileNamePrefix := splitDefaultOutputFileNamePrefix
	outputFileNameSuffix := "uninit"
	haveOutputFileNameSuffix := false

	// NOTE(review): when mainOptions is nil, localOptions stays nil and the
	// dereferences of localOptions further down would panic. Callers
	// presumably always pass a non-nil value -- confirm.
	var localOptions *cli.TOptions
	if mainOptions != nil {
		copyThereof := *mainOptions // struct copy
		localOptions = &copyThereof
	}

	// Parse local flags.
	for argi < argc /* variable increment: 1 or 2 depending on flag */ {
		opt := args[argi]
		if !strings.HasPrefix(opt, "-") {
			break // No more flag options to process
		}
		if args[argi] == "--" {
			break // All transformers must do this so main-flags can follow verb-flags
		}
		argi++

		switch opt {
		case "-h", "--help":
			transformerSplitUsage(os.Stdout)
			os.Exit(0)

		case "-n":
			n = cli.VerbGetIntArgOrDie(verb, opt, args, &argi, argc)
			doSize = true

		case "-m":
			n = cli.VerbGetIntArgOrDie(verb, opt, args, &argi, argc)
			doMod = true

		case "-g":
			groupByFieldNames = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)

		case "--prefix":
			outputFileNamePrefix = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)

		case "--suffix":
			outputFileNameSuffix = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)
			haveOutputFileNameSuffix = true

		case "-a":
			doAppend = true

		case "-v":
			emitDownstream = true

		case "-e":
			escapeFileNameCharacters = false

		case "-j":
			fileNamePartJoiner = cli.VerbGetStringArgOrDie(verb, opt, args, &argi, argc)

		default:
			// This is inelegant. For error-proofing we advance argi already in our
			// loop (so individual cases don't need to). However,
			// ParseWriterOptions expects it unadvanced.
			largi := argi - 1
			if cli.FLAG_TABLE.Parse(args, argc, &largi, localOptions) {
				// This lets mlr main and mlr split have different output formats.
				// Nothing else to handle here.
				argi = largi
			} else {
				transformerSplitUsage(os.Stderr)
				os.Exit(1)
			}
		}
	}

	doGroup := groupByFieldNames != nil
	if !doMod && !doSize && !doGroup {
		fmt.Fprintf(os.Stderr, "mlr %s: At least one of -m, -n, or -g is required.\n", verb)
		os.Exit(1)
	}
	if (doMod && doSize) || (doMod && doGroup) || (doSize && doGroup) {
		fmt.Fprintf(os.Stderr, "mlr %s: Only one of -m, -n, or -g is required.\n", verb)
		os.Exit(1)
	}

	// At this point exactly one mode is selected. For -m and -n the count is
	// used as a modulus/divisor on every record, so zero would panic and a
	// negative value would yield meaningless file indices.
	if doMod || doSize {
		countFlag := "-n"
		if doMod {
			countFlag = "-m"
		}
		if n <= 0 {
			fmt.Fprintf(os.Stderr,
				"mlr %s: the %s argument must be a positive integer; got %d.\n",
				verb, countFlag, n)
			os.Exit(1)
		}
	}

	cli.FinalizeWriterOptions(&localOptions.WriterOptions)
	if !haveOutputFileNameSuffix {
		// Without --suffix, name the files after the output format.
		outputFileNameSuffix = localOptions.WriterOptions.OutputFileFormat
	}

	*pargi = argi
	if !doConstruct { // All transformers must do this for main command-line parsing
		return nil
	}

	transformer, err := NewTransformerSplit(
		n,
		doMod,
		doSize,
		groupByFieldNames,
		emitDownstream,
		escapeFileNameCharacters,
		fileNamePartJoiner,
		doAppend,
		outputFileNamePrefix,
		outputFileNameSuffix,
		&localOptions.WriterOptions,
	)
	if err != nil {
		// Error message already printed out
		os.Exit(1)
	}

	return transformer
}
// ----------------------------------------------------------------
// TransformerSplit holds the configuration and running state of the split
// verb, which writes incoming records to files chosen by record count (-n),
// round-robin (-m), or group-by field values (-g).
type TransformerSplit struct {
// n is the count given with -n (records per file) or -m (number of files).
n int64
// outputFileNamePrefix and outputFileNameSuffix are the leading part and
// the extension of every output filename.
outputFileNamePrefix string
outputFileNameSuffix string
// emitDownstream, when true, also passes each record on to the output list.
emitDownstream bool
// escapeFileNameCharacters, when true, URL-escapes the group-by portion of
// grouped filenames.
escapeFileNameCharacters bool
// fileNamePartJoiner is the string placed between filename parts.
fileNamePartJoiner string
// ungroupedCounter counts the records processed so far in the -n and -m modes.
ungroupedCounter int64
// groupByFieldNames are the -g field names; nil when not grouping.
groupByFieldNames []string
// recordWriterOptions and doAppend are passed along when output files are opened.
recordWriterOptions *cli.TWriterOptions
doAppend bool
// For doSize ungrouped: only one file open at a time
outputHandler output.OutputHandler
// previousQuotient is the file index of the currently open file in the -n
// mode; it starts at -1 so that the first record opens a file.
previousQuotient int64
// For all other cases: multiple files open at a time
outputHandlerManager output.OutputHandlerManager
// recordTransformerFunc is the per-record handler selected at construction time.
recordTransformerFunc RecordTransformerFunc
}
// NewTransformerSplit builds a split transformer.
//
// The per-record handler is chosen as follows: grouped when groupByFieldNames
// is non-nil, otherwise round-robin when doMod is true, otherwise size-capped.
// In the latter two modes n is the file count (-m) or the records-per-file cap
// (-n).
//
// An error is returned, and also printed to stderr, when an ungrouped mode is
// requested with a non-positive n: n is used as a modulus/divisor for every
// record, so zero would cause a run-time panic and a negative value would
// produce meaningless file indices. The message is printed here because the
// CLI-parse caller in this file exits on error without printing anything.
func NewTransformerSplit(
	n int64,
	doMod bool,
	doSize bool,
	groupByFieldNames []string,
	emitDownstream bool,
	escapeFileNameCharacters bool,
	fileNamePartJoiner string,
	doAppend bool,
	outputFileNamePrefix string,
	outputFileNameSuffix string,
	recordWriterOptions *cli.TWriterOptions,
) (*TransformerSplit, error) {

	if groupByFieldNames == nil && n <= 0 {
		err := fmt.Errorf(
			"mlr %s: the -m/-n count must be positive; got %d", verbNameSplit, n,
		)
		fmt.Fprintln(os.Stderr, err)
		return nil, err
	}

	tr := &TransformerSplit{
		n:                        n,
		outputFileNamePrefix:     outputFileNamePrefix,
		outputFileNameSuffix:     outputFileNameSuffix,
		emitDownstream:           emitDownstream,
		escapeFileNameCharacters: escapeFileNameCharacters,
		fileNamePartJoiner:       fileNamePartJoiner,
		ungroupedCounter:         0,
		groupByFieldNames:        groupByFieldNames,
		recordWriterOptions:      recordWriterOptions,
		doAppend:                 doAppend,

		outputHandler: nil,
		// No file has been opened yet; -1 never equals a real file index.
		previousQuotient: -1,
	}

	tr.outputHandlerManager = output.NewFileOutputHandlerManager(recordWriterOptions, doAppend)

	switch {
	case groupByFieldNames != nil:
		tr.recordTransformerFunc = tr.splitGrouped
	case doMod:
		tr.recordTransformerFunc = tr.splitModUngrouped
	default:
		tr.recordTransformerFunc = tr.splitSizeUngrouped
	}

	return tr, nil
}
// Transform first calls HandleDefaultDownstreamDone with the two done-channels,
// then hands the record to whichever per-record handler was selected when the
// transformer was constructed.
func (tr *TransformerSplit) Transform(
	inrecAndContext *types.RecordAndContext,
	outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
	inputDownstreamDoneChannel <-chan bool,
	outputDownstreamDoneChannel chan<- bool,
) {
	HandleDefaultDownstreamDone(inputDownstreamDoneChannel, outputDownstreamDoneChannel)

	handleRecord := tr.recordTransformerFunc
	handleRecord(
		inrecAndContext,
		outputRecordsAndContexts,
		inputDownstreamDoneChannel,
		outputDownstreamDoneChannel,
	)
}
// splitModUngrouped implements the -m mode: records are dealt round-robin
// among tr.n files, with file indices running from 1 through tr.n. At end of
// stream the end-of-stream marker is passed along and all files are closed.
// Any write or close error is reported on stderr and the process exits with
// status 1.
func (tr *TransformerSplit) splitModUngrouped(
	inrecAndContext *types.RecordAndContext,
	outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
	inputDownstreamDoneChannel <-chan bool,
	outputDownstreamDoneChannel chan<- bool,
) {
	if inrecAndContext.EndOfStream {
		outputRecordsAndContexts.PushBack(inrecAndContext) // end-of-stream marker

		closeErrors := tr.outputHandlerManager.Close()
		for _, closeError := range closeErrors {
			fmt.Fprintf(os.Stderr, "mlr: file-close error: %v\n", closeError)
		}
		if len(closeErrors) > 0 {
			os.Exit(1)
		}
		return
	}

	// 1-up file index: record 0 goes to file 1, record 1 to file 2, and so on.
	fileIndex := tr.ungroupedCounter%tr.n + 1
	outputFileName := tr.makeUngroupedOutputFileName(fileIndex)

	writeError := tr.outputHandlerManager.WriteRecordAndContext(inrecAndContext, outputFileName)
	if writeError != nil {
		fmt.Fprintf(os.Stderr, "mlr: file-write error: %v\n", writeError)
		os.Exit(1)
	}

	if tr.emitDownstream {
		outputRecordsAndContexts.PushBack(inrecAndContext)
	}
	tr.ungroupedCounter++
}
// splitSizeUngrouped implements the -n mode: the first tr.n records go to file
// 1, the next tr.n to file 2, and so on. Only one file is open at a time; when
// the file index changes, the current file is closed and the next one opened.
// At end of stream the end-of-stream marker is passed along and the open file,
// if any, is closed. Any open, write, or close error is reported on stderr and
// the process exits with status 1.
func (tr *TransformerSplit) splitSizeUngrouped(
	inrecAndContext *types.RecordAndContext,
	outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
	inputDownstreamDoneChannel <-chan bool,
	outputDownstreamDoneChannel chan<- bool,
) {
	if inrecAndContext.EndOfStream {
		outputRecordsAndContexts.PushBack(inrecAndContext) // end-of-stream marker
		if tr.outputHandler != nil {
			if closeError := tr.outputHandler.Close(); closeError != nil {
				fmt.Fprintf(os.Stderr, "mlr: file-close error: %v\n", closeError)
				os.Exit(1)
			}
		}
		return
	}

	// 1-up index of the file this record belongs to.
	fileIndex := tr.ungroupedCounter/tr.n + 1

	if fileIndex != tr.previousQuotient {
		// Crossing into a new file: finish the old one, then open the new one.
		if tr.outputHandler != nil {
			if closeError := tr.outputHandler.Close(); closeError != nil {
				fmt.Fprintf(os.Stderr, "mlr: file-close error: %v\n", closeError)
				os.Exit(1)
			}
		}

		var openError error
		tr.outputHandler, openError = output.NewFileOutputHandler(
			tr.makeUngroupedOutputFileName(fileIndex),
			tr.recordWriterOptions,
			tr.doAppend,
		)
		if openError != nil {
			fmt.Fprintf(os.Stderr, "mlr: file-open error: %v\n", openError)
			os.Exit(1)
		}

		tr.previousQuotient = fileIndex
	}

	if writeError := tr.outputHandler.WriteRecordAndContext(inrecAndContext); writeError != nil {
		fmt.Fprintf(os.Stderr, "mlr: file-write error: %v\n", writeError)
		os.Exit(1)
	}

	if tr.emitDownstream {
		outputRecordsAndContexts.PushBack(inrecAndContext)
	}
	tr.ungroupedCounter++
}
// splitGrouped implements the -g mode: each record is written to a file named
// after its values for the group-by fields. Records for which
// GetSelectedValues reports failure (presumably because a group-by field is
// absent -- confirm against that method) all go to a single catch-all file
// whose name uses "ungrouped" in place of the field values.
//
// The catch-all filename follows the same rules as the per-group filenames:
// the prefix and the name are joined with tr.fileNamePartJoiner, and an empty
// prefix is omitted together with the joiner. With default settings the name
// is "split_ungrouped.<suffix>".
//
// At end of stream the end-of-stream marker is passed along and all files are
// closed. Any write or close error is reported on stderr and the process exits
// with status 1.
func (tr *TransformerSplit) splitGrouped(
	inrecAndContext *types.RecordAndContext,
	outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
	inputDownstreamDoneChannel <-chan bool,
	outputDownstreamDoneChannel chan<- bool,
) {
	if inrecAndContext.EndOfStream {
		outputRecordsAndContexts.PushBack(inrecAndContext) // emit end-of-stream marker

		errs := tr.outputHandlerManager.Close()
		if len(errs) > 0 {
			for _, err := range errs {
				fmt.Fprintf(os.Stderr, "mlr: file-close error: %v\n", err)
			}
			os.Exit(1)
		}
		return
	}

	var filename string
	groupByFieldValues, ok := inrecAndContext.Record.GetSelectedValues(tr.groupByFieldNames)
	if ok {
		filename = tr.makeGroupedOutputFileName(groupByFieldValues)
	} else {
		// Honor -j and an empty --prefix here just as the per-group names do,
		// rather than hard-coding the "_" joiner.
		filename = "ungrouped." + tr.outputFileNameSuffix
		if tr.outputFileNamePrefix != "" {
			filename = tr.outputFileNamePrefix + tr.fileNamePartJoiner + filename
		}
	}

	err := tr.outputHandlerManager.WriteRecordAndContext(inrecAndContext, filename)
	if err != nil {
		fmt.Fprintf(os.Stderr, "mlr: %v\n", err)
		os.Exit(1)
	}

	if tr.emitDownstream {
		outputRecordsAndContexts.PushBack(inrecAndContext)
	}
}
// makeUngroupedOutputFileName returns the output filename for the k-th file in
// the -n and -m modes. Example with default settings: "split_53.csv".
//
// The prefix and the index are joined with tr.fileNamePartJoiner, and an empty
// prefix is omitted together with the joiner, matching how
// makeGroupedOutputFileName builds its names.
func (tr *TransformerSplit) makeUngroupedOutputFileName(k int64) string {
	fileName := fmt.Sprintf("%d.%s", k, tr.outputFileNameSuffix)
	if tr.outputFileNamePrefix != "" {
		fileName = tr.outputFileNamePrefix + tr.fileNamePartJoiner + fileName
	}
	return fileName
}
// makeGroupedOutputFileName builds the output filename for one group in the -g
// mode. Example: "split_orange.csv".
//
// The string forms of the group-by values are joined with
// tr.fileNamePartJoiner and, unless escaping is turned off, URL-escaped as a
// whole. A non-empty prefix is then prepended using the same joiner, and the
// suffix is appended after a dot.
func (tr *TransformerSplit) makeGroupedOutputFileName(
	groupByFieldValues []*mlrval.Mlrval,
) string {
	valueStrings := make([]string, len(groupByFieldValues))
	for i, value := range groupByFieldValues {
		valueStrings[i] = value.String()
	}

	stem := strings.Join(valueStrings, tr.fileNamePartJoiner)
	if tr.escapeFileNameCharacters {
		stem = url.QueryEscape(stem)
	}

	if tr.outputFileNamePrefix == "" {
		return stem + "." + tr.outputFileNameSuffix
	}
	return tr.outputFileNamePrefix + tr.fileNamePartJoiner + stem + "." + tr.outputFileNameSuffix
}