miller/pkg/transformers/stats2.go

474 lines
15 KiB
Go

package transformers
import (
"container/list"
"fmt"
"os"
"strings"
"github.com/johnkerl/miller/v6/pkg/cli"
"github.com/johnkerl/miller/v6/pkg/lib"
"github.com/johnkerl/miller/v6/pkg/mlrval"
"github.com/johnkerl/miller/v6/pkg/transformers/utils"
"github.com/johnkerl/miller/v6/pkg/types"
)
// ----------------------------------------------------------------
const verbNameStats2 = "stats2"
// For joining "x" and "y" into "x...y" for map keys. "," is another natural choice but would break
// if we were ever asked to process field names with commas in them.
const stats2KeySeparator = "\001"
var Stats2Setup = TransformerSetup{
Verb: verbNameStats2,
UsageFunc: transformerStats2Usage,
ParseCLIFunc: transformerStats2ParseCLI,
IgnoresInput: false,
}
func transformerStats2Usage(
o *os.File,
) {
argv0 := "mlr"
verb := verbNameStats2
fmt.Fprintf(o, "Usage: %s %s [options]\n", argv0, verb)
fmt.Fprintf(o, "Computes bivariate statistics for one or more given field-name pairs,\n")
fmt.Fprintf(o, "accumulated across the input record stream.\n")
fmt.Fprintf(o, "-a {linreg-ols,corr,...} Names of accumulators: one or more of:\n")
utils.ListStats2Accumulators(o)
fmt.Fprintf(o, "-f {a,b,c,d} Value-field name-pairs on which to compute statistics.\n")
fmt.Fprintf(o, " There must be an even number of names.\n")
fmt.Fprintf(o, "-g {e,f,g} Optional group-by-field names.\n")
fmt.Fprintf(o, "-v Print additional output for linreg-pca.\n")
fmt.Fprintf(o, "-s Print iterative stats. Useful in tail -f contexts, in which\n")
fmt.Fprintf(o, " case please avoid pprint-format output since end of input\n")
fmt.Fprintf(o, " stream will never be seen. Likewise, if input is coming from\n")
fmt.Fprintf(o, " `tail -f`, be sure to use `--records-per-batch 1`.\n")
fmt.Fprintf(o, "--fit Rather than printing regression parameters, applies them to\n")
fmt.Fprintf(o, " the input data to compute new fit fields. All input records are\n")
fmt.Fprintf(o, " held in memory until end of input stream. Has effect only for\n")
fmt.Fprintf(o, " linreg-ols, linreg-pca, and logireg.\n")
fmt.Fprintf(o, "Only one of -s or --fit may be used.\n")
fmt.Fprintf(o, "Example: %s %s -a linreg-pca -f x,y\n", argv0, verb)
fmt.Fprintf(o, "Example: %s %s -a linreg-ols,r2 -f x,y -g size,shape\n", argv0, verb)
fmt.Fprintf(o, "Example: %s %s -a corr -f x,y\n", argv0, verb)
}
// ----------------------------------------------------------------
func transformerStats2ParseCLI(
pargi *int,
argc int,
args []string,
_ *cli.TOptions,
doConstruct bool, // false for first pass of CLI-parse, true for second pass
) IRecordTransformer {
// Skip the verb name from the current spot in the mlr command line
argi := *pargi
verb := args[argi]
argi++
argv0 := "mlr"
var accumulatorNameList []string = nil
var valueFieldNameList []string = nil
groupByFieldNameList := make([]string, 0)
doVerbose := false
doIterativeStats := false
doHoldAndFit := false
for argi < argc /* variable increment: 1 or 2 depending on flag */ {
opt := args[argi]
if !strings.HasPrefix(opt, "-") {
break // No more flag options to process
}
if args[argi] == "--" {
break // All transformers must do this so main-flags can follow verb-flags
}
argi++
if opt == "-h" || opt == "--help" {
transformerStats2Usage(os.Stdout)
os.Exit(0)
} else if opt == "-a" {
accumulatorNameList = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "-f" {
valueFieldNameList = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "-g" {
groupByFieldNameList = cli.VerbGetStringArrayArgOrDie(verb, opt, args, &argi, argc)
} else if opt == "-v" {
doVerbose = true
} else if opt == "-s" {
doIterativeStats = true
} else if opt == "--fit" {
doHoldAndFit = true
} else if opt == "-S" {
// No-op pass-through for backward compatibility with Miller 5
} else if opt == "-F" {
// The -F flag isn't used for stats2: all arithmetic here is
// floating-point. Yet it is supported for step and stats1 for all
// applicable stats1/step accumulators, so we accept here as well
// for all applicable stats2 accumulators (i.e. none of them).
} else {
transformerStats2Usage(os.Stderr)
os.Exit(1)
}
}
if doIterativeStats && doHoldAndFit {
transformerStats2Usage(os.Stderr)
os.Exit(1)
}
if accumulatorNameList == nil {
fmt.Fprintf(os.Stderr, "%s %s: -a option is required.\n", argv0, verb)
fmt.Fprintf(os.Stderr, "Please see %s %s --help for more information.\n", argv0, verb)
os.Exit(1)
}
if valueFieldNameList == nil {
fmt.Fprintf(os.Stderr, "%s %s: -f option is required.\n", argv0, verb)
fmt.Fprintf(os.Stderr, "Please see %s %s --help for more information.\n", argv0, verb)
os.Exit(1)
}
if len(valueFieldNameList)%2 != 0 {
fmt.Fprintf(os.Stderr, "%s %s: argument to -f must have even number of fields.\n", argv0, verb)
fmt.Fprintf(os.Stderr, "Please see %s %s --help for more information.\n", argv0, verb)
os.Exit(1)
}
*pargi = argi
if !doConstruct { // All transformers must do this for main command-line parsing
return nil
}
transformer, err := NewTransformerStats2(
accumulatorNameList,
valueFieldNameList,
groupByFieldNameList,
doVerbose,
doIterativeStats,
doHoldAndFit,
)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
return transformer
}
// ----------------------------------------------------------------
type TransformerStats2 struct {
// Input:
accumulatorNameList []string
valueFieldNameList []string
groupByFieldNameList []string
doVerbose bool
doIterativeStats bool
doHoldAndFit bool
// State:
accumulatorFactory *utils.Stats2AccumulatorFactory
// Accumulators are indexed by
// groupByFieldName . value1FieldName+sep+value2FieldName . accumulatorName . accumulator object
// This would be
// namedAccumulators map[string]map[string]map[string]IStats2Accumulator
// except we need maps that preserve insertion order.
namedAccumulators *lib.OrderedMap
groupingKeysToGroupByFieldValues *lib.OrderedMap
// For hold-and-fit:
// ordered map from grouping-key to list of RecordAndContext
recordGroups *lib.OrderedMap
}
func NewTransformerStats2(
accumulatorNameList []string,
valueFieldNameList []string,
groupByFieldNameList []string,
doVerbose bool,
doIterativeStats bool,
doHoldAndFit bool,
) (*TransformerStats2, error) {
for _, name := range accumulatorNameList {
if !utils.ValidateStats2AccumulatorName(name) {
return nil, fmt.Errorf(`mlr stats2: accumulator "%s" not found`, name)
}
}
tr := &TransformerStats2{
accumulatorNameList: accumulatorNameList,
valueFieldNameList: valueFieldNameList,
groupByFieldNameList: groupByFieldNameList,
doVerbose: doVerbose,
doIterativeStats: doIterativeStats,
doHoldAndFit: doHoldAndFit,
accumulatorFactory: utils.NewStats2AccumulatorFactory(),
namedAccumulators: lib.NewOrderedMap(),
groupingKeysToGroupByFieldValues: lib.NewOrderedMap(),
recordGroups: lib.NewOrderedMap(),
}
return tr, nil
}
// ================================================================
// Given: accumulate corr,cov on values x,y group by a,b.
// Example input: Example output:
// a b x y a b x_corr x_cov y_corr y_cov
// s t 1 2 s t 2 6 2 8
// u v 3 4 u v 1 3 1 4
// s t 5 6 u w 1 7 1 9
// u w 7 9
//
// Multilevel hashmap structure:
// {
// ["s","t"] : { <--- group-by field names
// ["x","y"] : { <--- value field names
// "corr" : stats2_corr object,
// "cov" : stats2_cov object
// }
// },
// ["u","v"] : {
// ["x","y"] : {
// "corr" : stats2_corr object,
// "cov" : stats2_cov object
// }
// },
// ["u","w"] : {
// ["x","y"] : {
// "corr" : stats2_corr object,
// "cov" : stats2_cov object
// }
// },
// }
//
// In the iterative case, add to the current record its current group's stats fields.
// In the non-iterative case, produce output only at the end of the input stream.
// ================================================================
// ----------------------------------------------------------------
func (tr *TransformerStats2) Transform(
inrecAndContext *types.RecordAndContext,
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
inputDownstreamDoneChannel <-chan bool,
outputDownstreamDoneChannel chan<- bool,
) {
HandleDefaultDownstreamDone(inputDownstreamDoneChannel, outputDownstreamDoneChannel)
if !inrecAndContext.EndOfStream {
tr.ingest(inrecAndContext)
if tr.doIterativeStats {
// The input record is modified in this case, with new fields appended
outputRecordsAndContexts.PushBack(inrecAndContext)
}
// if tr.doHoldAndFit, the input record is held by the ingestor
} else { // end of record stream
if !tr.doIterativeStats { // in the iterative case, already emitted per-record
if tr.doHoldAndFit {
tr.fit(outputRecordsAndContexts)
} else {
tr.emit(outputRecordsAndContexts, &inrecAndContext.Context)
}
}
outputRecordsAndContexts.PushBack(inrecAndContext) // end-of-stream marker
}
}
// ----------------------------------------------------------------
func (tr *TransformerStats2) ingest(
inrecAndContext *types.RecordAndContext,
) {
inrec := inrecAndContext.Record
// E.g. if grouping by "a" and "b", and the current record has a=circle, b=blue,
// then groupingKey is the string "circle,blue".
groupingKey, groupByFieldValues, ok := inrec.GetSelectedValuesAndJoined(tr.groupByFieldNameList)
if !ok {
return
}
tr.groupingKeysToGroupByFieldValues.Put(groupingKey, groupByFieldValues)
groupToValueFields := tr.namedAccumulators.Get(groupingKey)
if groupToValueFields == nil {
groupToValueFields = lib.NewOrderedMap()
tr.namedAccumulators.Put(groupingKey, groupToValueFields)
}
if tr.doHoldAndFit { // Retain the input record in memory, for fitting and delivery at end of stream
groupToRecords := tr.recordGroups.Get(groupingKey)
if groupToRecords == nil {
groupToRecords = list.New()
tr.recordGroups.Put(groupingKey, groupToRecords)
}
groupToRecords.(*list.List).PushBack(inrecAndContext)
}
// for [["x","y"]]
n := len(tr.valueFieldNameList)
for i := 0; i < n; i += 2 {
valueFieldName1 := tr.valueFieldNameList[i]
valueFieldName2 := tr.valueFieldNameList[i+1]
key := valueFieldName1 + stats2KeySeparator + valueFieldName2
valueFieldsToAccumulator := groupToValueFields.(*lib.OrderedMap).Get(key)
if valueFieldsToAccumulator == nil {
valueFieldsToAccumulator = lib.NewOrderedMap()
groupToValueFields.(*lib.OrderedMap).Put(key, valueFieldsToAccumulator)
}
mval1 := inrec.Get(valueFieldName1)
mval2 := inrec.Get(valueFieldName2)
if mval1 == nil || mval2 == nil { // Key absent in current record
continue
}
if mval1.IsVoid() || mval2.IsVoid() { // Key present in current record but with empty value
continue
}
// for ["corr", "cov"]
for _, accumulatorName := range tr.accumulatorNameList {
accumulator := valueFieldsToAccumulator.(*lib.OrderedMap).Get(accumulatorName)
if accumulator == nil {
accumulator = tr.accumulatorFactory.Make(
valueFieldName1,
valueFieldName2,
accumulatorName,
tr.doVerbose,
)
if accumulator == nil {
fmt.Fprintf(os.Stderr, "%s %s: accumulator \"%s\" not found.\n",
"mlr", verbNameStats2, accumulatorName,
)
os.Exit(1)
}
valueFieldsToAccumulator.(*lib.OrderedMap).Put(accumulatorName, accumulator)
}
accumulator.(utils.IStats2Accumulator).Ingest(
mval1.GetNumericToFloatValueOrDie(),
mval2.GetNumericToFloatValueOrDie(),
)
}
if tr.doIterativeStats {
tr.populateRecord(
inrecAndContext.Record,
valueFieldName1,
valueFieldName2,
valueFieldsToAccumulator.(*lib.OrderedMap),
)
}
}
}
// ----------------------------------------------------------------
func (tr *TransformerStats2) emit(
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
context *types.Context,
) {
for pa := tr.namedAccumulators.Head; pa != nil; pa = pa.Next {
outrec := mlrval.NewMlrmapAsRecord()
// Add in a=s,b=t fields:
groupingKey := pa.Key
groupByFieldValues := tr.groupingKeysToGroupByFieldValues.Get(groupingKey).([]*mlrval.Mlrval)
for i, groupByFieldName := range tr.groupByFieldNameList {
outrec.PutReference(groupByFieldName, groupByFieldValues[i].Copy())
}
// Add in fields such as x_y_corr, etc.
groupToValueFields := tr.namedAccumulators.Get(groupingKey).(*lib.OrderedMap)
// For "x","y"
for pc := groupToValueFields.Head; pc != nil; pc = pc.Next {
pairs := strings.Split(pc.Key, stats2KeySeparator)
valueFieldName1 := pairs[0]
valueFieldName2 := pairs[1]
valueFieldsToAccumulator := pc.Value.(*lib.OrderedMap)
tr.populateRecord(outrec, valueFieldName1, valueFieldName2, valueFieldsToAccumulator)
// For "corr", "linreg"
for pd := valueFieldsToAccumulator.Head; pd != nil; pd = pd.Next {
accumulator := pd.Value.(utils.IStats2Accumulator)
accumulator.Populate(valueFieldName1, valueFieldName2, outrec)
}
}
outputRecordsAndContexts.PushBack(types.NewRecordAndContext(outrec, context))
}
}
func (tr *TransformerStats2) populateRecord(
outrec *mlrval.Mlrmap,
valueFieldName1 string,
valueFieldName2 string,
valueFieldsToAccumulator *lib.OrderedMap,
) {
// For "corr", "linreg"
for pe := valueFieldsToAccumulator.Head; pe != nil; pe = pe.Next {
accumulator := pe.Value.(utils.IStats2Accumulator)
accumulator.Populate(valueFieldName1, valueFieldName2, outrec)
}
}
func (tr *TransformerStats2) fit(
outputRecordsAndContexts *list.List, // list of *types.RecordAndContext
) {
for pa := tr.namedAccumulators.Head; pa != nil; pa = pa.Next {
groupingKey := pa.Key
groupToValueFields := pa.Value.(*lib.OrderedMap)
recordsAndContexts := tr.recordGroups.Get(groupingKey).(*list.List)
for recordsAndContexts.Front() != nil {
recordAndContext := recordsAndContexts.Remove(recordsAndContexts.Front()).(*types.RecordAndContext)
record := recordAndContext.Record
// For "x","y"
for pb := groupToValueFields.Head; pb != nil; pb = pb.Next {
pairs := strings.Split(pb.Key, stats2KeySeparator)
valueFieldName1 := pairs[0]
valueFieldName2 := pairs[1]
valueFieldsToAccumulator := pb.Value.(*lib.OrderedMap)
// For "linreg-ols", "logireg"
for pc := valueFieldsToAccumulator.Head; pc != nil; pc = pc.Next {
accumulator := pc.Value.(utils.IStats2Accumulator)
// Note R2, cov, corr, etc have no non-trivial fit-function
mval1 := record.Get(valueFieldName1)
mval2 := record.Get(valueFieldName2)
if mval1 != nil && mval2 != nil {
accumulator.Fit(
mval1.GetNumericToFloatValueOrDie(),
mval2.GetNumericToFloatValueOrDie(),
record,
)
}
}
}
outputRecordsAndContexts.PushBack(recordAndContext)
}
}
}