package input

import (
	"container/list"
	"fmt"
	"io"
	"os"
	"regexp"
	"strings"

	"github.com/johnkerl/miller/v6/pkg/cli"
	"github.com/johnkerl/miller/v6/pkg/lib"
	"github.com/johnkerl/miller/v6/pkg/mlrval"
	"github.com/johnkerl/miller/v6/pkg/types"
)

type iXTABPairSplitter interface {
	Split(input string) (key, value string, err error)
}

type RecordReaderXTAB struct {
	readerOptions   *cli.TReaderOptions
	recordsPerBatch int64 // distinct from readerOptions.RecordsPerBatch for join/repl
	pairSplitter    iXTABPairSplitter

	// Note: XTAB uses two consecutive IFS in place of an IRS; IRS is ignored
}

// tStanza is for the channelized reader which operates (for performance) in
// its own goroutine. An XTAB "stanza" is a collection of lines which will be
// parsed as a Miller record. Also for performance (to reduce
// goroutine-scheduler thrash) stanzas are delivered in batches (nominally max
// 500 or so). This struct helps us keep each stanza's comment lines along with
// the stanza they originated in.
type tStanza struct {
	dataLines    *list.List
	commentLines *list.List
}

func newStanza() *tStanza {
	return &tStanza{
		dataLines:    list.New(),
		commentLines: list.New(),
	}
}

func NewRecordReaderXTAB(
	readerOptions *cli.TReaderOptions,
	recordsPerBatch int64,
) (*RecordReaderXTAB, error) {
	return &RecordReaderXTAB{
		readerOptions:   readerOptions,
		recordsPerBatch: recordsPerBatch,
		pairSplitter:    newXTABPairSplitter(readerOptions),
	}, nil
}
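
// newXTABReaderExample is a hedged usage sketch, not part of the original
// file: it shows one plausible way to construct the reader directly. The
// option values are assumptions for illustration (XTAB's line separator is
// IFS, nominally "\n"; its key/value separator is IPS, nominally a space).
// Real callers receive a fully populated *cli.TReaderOptions from Miller's
// CLI layer.
func newXTABReaderExample() (*RecordReaderXTAB, error) {
	options := &cli.TReaderOptions{
		IFS: "\n",
		IPS: " ",
	}
	return NewRecordReaderXTAB(options, 500)
}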

func (reader *RecordReaderXTAB) Read(
	filenames []string,
	context types.Context,
	readerChannel chan<- *list.List, // list of *types.RecordAndContext
	errorChannel chan error,
	downstreamDoneChannel <-chan bool, // for mlr head
) {
	if filenames != nil { // nil for mlr -n
		if len(filenames) == 0 { // read from stdin
			handle, err := lib.OpenStdin(
				reader.readerOptions.Prepipe,
				reader.readerOptions.PrepipeIsRaw,
				reader.readerOptions.FileInputEncoding,
			)
			if err != nil {
				errorChannel <- err
			} else {
				reader.processHandle(handle, "(stdin)", &context, readerChannel, errorChannel, downstreamDoneChannel)
			}
		} else {
			for _, filename := range filenames {
				handle, err := lib.OpenFileForRead(
					filename,
					reader.readerOptions.Prepipe,
					reader.readerOptions.PrepipeIsRaw,
					reader.readerOptions.FileInputEncoding,
				)
				if err != nil {
					errorChannel <- err
				} else {
					reader.processHandle(handle, filename, &context, readerChannel, errorChannel, downstreamDoneChannel)
					handle.Close()
				}
			}
		}
	}
	readerChannel <- types.NewEndOfStreamMarkerList(&context)
}

func (reader *RecordReaderXTAB) processHandle(
	handle io.Reader,
	filename string,
	context *types.Context,
	readerChannel chan<- *list.List, // list of *types.RecordAndContext
	errorChannel chan error,
	downstreamDoneChannel <-chan bool, // for mlr head
) {
	context.UpdateForStartOfFile(filename)
	recordsPerBatch := reader.recordsPerBatch

	// XTAB uses repeated IFS, rather than IRS, to delimit records
	lineReader := NewLineReader(handle, reader.readerOptions.IFS)

	stanzasChannel := make(chan *list.List, recordsPerBatch)
	go channelizedStanzaScanner(lineReader, reader.readerOptions, stanzasChannel, downstreamDoneChannel,
		recordsPerBatch)

	for {
		recordsAndContexts, eof := reader.getRecordBatch(stanzasChannel, context, errorChannel)
		if recordsAndContexts.Len() > 0 {
			readerChannel <- recordsAndContexts
		}
		if eof {
			break
		}
	}
}

// Given input like
//
//	a 1
//	b 2
//	c 3
//
//	a 4
//	b 5
//	c 6
//
// this function reads the input stream a line at a time, then produces
// string-lists one per stanza where a stanza is delimited by blank line, or
// start or end of file. A single stanza, once parsed, will become a single
// record.
func channelizedStanzaScanner(
	lineReader ILineReader,
	readerOptions *cli.TReaderOptions,
	stanzasChannel chan<- *list.List, // list of list of string
	downstreamDoneChannel <-chan bool, // for mlr head
	recordsPerBatch int64,
) {
	numStanzasSeen := int64(0)
	inStanza := false
	done := false

	stanzas := list.New()
	stanza := newStanza()

	for {
		line, err := lineReader.Read()
		if err != nil {
			if lib.IsEOF(err) {
				done = true
				break
			} else {
				fmt.Fprintf(os.Stderr, "mlr: %#v\n", err)
				break
			}
		}

		// Check for comments-in-data feature
		// TODO: function-pointer this away
		if readerOptions.CommentHandling != cli.CommentsAreData {
			if strings.HasPrefix(line, readerOptions.CommentString) {
				if readerOptions.CommentHandling == cli.PassComments {
					stanza.commentLines.PushBack(line)
					continue
				} else if readerOptions.CommentHandling == cli.SkipComments {
					continue
				}
				// else comments are data
			}
		}

		if line == "" {
			// Empty-line handling:
			// 1. First empty line(s) in the stream are ignored.
			// 2. After that, one or more empty lines separate records.
			// 3. At end of file, multiple empty lines are ignored.
			if inStanza {
				inStanza = false
				stanzas.PushBack(stanza)
				numStanzasSeen++
				stanza = newStanza()
			} else {
				continue
			}
		} else {
			if !inStanza {
				inStanza = true
			}
			stanza.dataLines.PushBack(line)
		}

		// See if downstream processors will be ignoring further data (e.g. mlr
		// head). If so, stop reading. This makes 'mlr head hugefile' exit
		// quickly, as it should.
		if numStanzasSeen%recordsPerBatch == 0 {
			select {
			case <-downstreamDoneChannel:
				done = true
			default:
			}
			if done {
				break
			}
			stanzasChannel <- stanzas
			stanzas = list.New()
		}

		if done {
			break
		}
	}

	// The last stanza may not have a trailing newline after it. Any lines in
	// the stanza at this point will form the final record in the stream.
	if stanza.dataLines.Len() > 0 || stanza.commentLines.Len() > 0 {
		stanzas.PushBack(stanza)
	}

	stanzasChannel <- stanzas
	close(stanzasChannel) // end-of-stream marker
}
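
// scanStanzasExample is a hedged sketch, not part of the original file: it
// drives channelizedStanzaScanner over an in-memory input to show the
// stanza-batching contract (blank lines end stanzas; the channel is closed
// at end of stream). The option values here are assumptions; real options
// come from Miller's CLI layer.
func scanStanzasExample() {
	options := &cli.TReaderOptions{IFS: "\n", CommentHandling: cli.CommentsAreData}
	lineReader := NewLineReader(strings.NewReader("a 1\nb 2\n\na 4\n"), "\n")
	stanzasChannel := make(chan *list.List, 2)
	downstreamDoneChannel := make(chan bool)

	go channelizedStanzaScanner(lineReader, options, stanzasChannel, downstreamDoneChannel, 500)

	// Batches arrive until the scanner closes the channel at EOF.
	numStanzas := 0
	for stanzas := range stanzasChannel {
		numStanzas += stanzas.Len()
	}
	fmt.Println("stanzas:", numStanzas) // 2: {"a 1", "b 2"} and {"a 4"}
}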

// TODO: comment copiously. We're trying to handle slow/fast/short/long reads:
// tail -f, smallfile, bigfile.
func (reader *RecordReaderXTAB) getRecordBatch(
	stanzasChannel <-chan *list.List,
	context *types.Context,
	errorChannel chan error,
) (
	recordsAndContexts *list.List,
	eof bool,
) {
	recordsAndContexts = list.New()

	stanzas, more := <-stanzasChannel
	if !more {
		return recordsAndContexts, true
	}

	for e := stanzas.Front(); e != nil; e = e.Next() {
		stanza := e.Value.(*tStanza)

		if stanza.commentLines.Len() > 0 {
			for f := stanza.commentLines.Front(); f != nil; f = f.Next() {
				line := f.Value.(string)
				recordsAndContexts.PushBack(types.NewOutputString(line+reader.readerOptions.IFS, context))
			}
		}

		if stanza.dataLines.Len() > 0 {
			record, err := reader.recordFromXTABLines(stanza.dataLines)
			if err != nil {
				errorChannel <- err
				return
			}
			context.UpdateForInputRecord()
			recordsAndContexts.PushBack(types.NewRecordAndContext(record, context))
		}
	}

	return recordsAndContexts, false
}

func (reader *RecordReaderXTAB) recordFromXTABLines(
	stanza *list.List,
) (*mlrval.Mlrmap, error) {
	record := mlrval.NewMlrmapAsRecord()
	dedupeFieldNames := reader.readerOptions.DedupeFieldNames

	for e := stanza.Front(); e != nil; e = e.Next() {
		line := e.Value.(string)

		key, value, err := reader.pairSplitter.Split(line)
		if err != nil {
			return nil, err
		}

		_, err = record.PutReferenceMaybeDedupe(key, mlrval.FromDeferredType(value), dedupeFieldNames)
		if err != nil {
			return nil, err
		}
	}

	return record, nil
}
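
// recordFromXTABLinesExample is a hedged sketch, not part of the original
// file: it feeds one hand-built stanza through recordFromXTABLines, assuming
// a plain single-space IPS (no IPS regex) and zero-valued reader options.
func recordFromXTABLinesExample() {
	reader := &RecordReaderXTAB{
		readerOptions: &cli.TReaderOptions{},
		pairSplitter:  &tXTABIPSSplitter{ips: " ", ipslen: 1},
	}
	stanza := list.New()
	stanza.PushBack("a 1")
	stanza.PushBack("b 2")
	record, err := reader.recordFromXTABLines(stanza)
	if err == nil {
		// 2 fields: a=1 and b=2. (FieldCount assumed exported on Mlrmap.)
		fmt.Println(record.FieldCount)
	}
}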

// newXTABPairSplitter returns a splitter which breaks an XTAB line into key
// and value at the IPS. This is similar to the general pair-splitting shared
// by multiple formats; the exception is that for XTAB we always allow
// repeated IPS.
func newXTABPairSplitter(options *cli.TReaderOptions) iXTABPairSplitter {
	if options.IPSRegex == nil {
		return &tXTABIPSSplitter{ips: options.IPS, ipslen: len(options.IPS)}
	} else {
		return &tXTABIPSRegexSplitter{ipsRegex: options.IPSRegex}
	}
}

type tXTABIPSSplitter struct {
	ips    string
	ipslen int
}

// This is a splitter for XTAB lines, like 'abc 123'. It's not quite the same
// as the field/pair-splitter functions shared by DKVP, NIDX, and CSV-lite.
// XTAB is the only format for which we need to produce just a pair of items
// -- a key and a value -- delimited by one or more IPS. For example, with IPS
// being a space, in 'abc 123' we need to get key 'abc' and value '123'; for
// 'abc 123 456' we need key 'abc' and value '123 456'. It's super-elegant to
// simply regex-split the line like
// 'kv = lib.RegexCompiledSplitString(reader.readerOptions.IPSRegex, line, 2)'
// -- however, that's 3x slower than the current implementation. It turns out
// regexes are great, but we should use them only when we must, since they are
// expensive.
func (s *tXTABIPSSplitter) Split(input string) (key, value string, err error) {
	// The stanza scanner never delivers empty lines, so an empty input here
	// is an internal coding error.
	n := len(input)
	if n == 0 {
		return "", "", fmt.Errorf("internal coding error in XTAB reader")
	}

	// ' abc 123' splits as key '', value 'abc 123'.
	if strings.HasPrefix(input, s.ips) {
		keyStart := 0
		for keyStart < n && strings.HasPrefix(input[keyStart:], s.ips) {
			keyStart += s.ipslen
		}
		return "", input[keyStart:n], nil
	}

	// Find the first IPS, if any. If there isn't any in the input line then
	// there is no value, only key: e.g. the line is 'abc'.
	var keyEnd, valueStart int
	foundIPS := false
	for keyEnd = 1; keyEnd <= n; keyEnd++ {
		if strings.HasPrefix(input[keyEnd:], s.ips) {
			foundIPS = true
			break
		}
	}
	if !foundIPS {
		return input, "", nil
	}

	// Find the first non-IPS character after the last-found IPS, if any. If
	// there isn't any in the input line then there is no value, only key:
	// e.g. the line is 'abc '.
	foundValue := false
	for valueStart = keyEnd + s.ipslen; valueStart <= n; valueStart++ {
		if !strings.HasPrefix(input[valueStart:], s.ips) {
			foundValue = true
			break
		}
	}
	if !foundValue {
		return input[0:keyEnd], "", nil
	}

	return input[0:keyEnd], input[valueStart:n], nil
}
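
// splitXTABPairExample is a hedged sketch, not part of the original file: it
// exercises the fast-path splitter above with an assumed single-space IPS,
// showing that repeated IPS after the key is absorbed and the rest of the
// line becomes the value.
func splitXTABPairExample() {
	s := &tXTABIPSSplitter{ips: " ", ipslen: 1}
	key, value, _ := s.Split("abc   123 456")
	fmt.Printf("key=%q value=%q\n", key, value) // key="abc" value="123 456"
}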

type tXTABIPSRegexSplitter struct {
	ipsRegex *regexp.Regexp
}

func (s *tXTABIPSRegexSplitter) Split(input string) (key, value string, err error) {
	kv := lib.RegexCompiledSplitString(s.ipsRegex, input, 2)
	if len(kv) == 0 {
		return "", "", fmt.Errorf("internal coding error in XTAB reader")
	} else if len(kv) == 1 {
		return kv[0], "", nil
	} else if len(kv) == 2 {
		return kv[0], kv[1], nil
	} else {
		return "", "", fmt.Errorf("internal coding error in XTAB reader")
	}
}
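
// splitXTABPairRegexExample is a hedged sketch, not part of the original
// file: it exercises the regex-based splitter with an assumed IPS regex of
// `\s+`, which splits the key from the value at the first whitespace run
// (per the limit-2 regex split described in the comment above).
func splitXTABPairRegexExample() {
	s := &tXTABIPSRegexSplitter{ipsRegex: regexp.MustCompile(`\s+`)}
	key, value, _ := s.Split("abc \t 123")
	fmt.Printf("key=%q value=%q\n", key, value) // key="abc" value="123"
}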