miller/internal/pkg/input/record_reader.go
Stephen Kitt d536318ed6
Use int64 wherever "64-bit integer" is assumed (#902)
Miller assumes 64-bit integers, but in Go, the int type varies in size
depending on the architecture: 32-bit architectures have int
equivalent to int32. As a result, the supported range of integer
values is greatly reduced on 32-bit architectures compared to what is
suggested by the documentation.

This patch explicitly uses int64 wherever 64-bit integers are
assumed.

Test cases affected by the behaviour of the random generator are
updated to reflect the new values (the existing seed doesn't produce
the same behaviour since the way random values are generated has
changed).

Signed-off-by: Stephen Kitt <steve@sk2.org>
2022-01-27 12:06:25 -05:00
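
An aside on why this matters (an illustrative sketch, not part of the commit):
Go's int is 32 bits wide on 32-bit architectures, so parsing a full 64-bit
value only works when 64 bits are requested explicitly:

	package main

	import (
		"fmt"
		"strconv"
	)

	func main() {
		// strconv.IntSize is 32 on 32-bit architectures, 64 on 64-bit ones.
		fmt.Println(strconv.IntSize)

		// Requesting 64 bits works on every architecture.
		n, err := strconv.ParseInt("9223372036854775807", 10, 64)
		fmt.Println(n, err) // 9223372036854775807 <nil>

		// Requesting the platform's int width fails on 32-bit platforms:
		// there the result is clamped to 2147483647 with a range error.
		m, err := strconv.ParseInt("9223372036854775807", 10, strconv.IntSize)
		fmt.Println(m, err)
	}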

// This file contains the interface for file-format-specific record-readers, as
// well as a collection of utility functions.
package input

import (
	"bufio"
	"container/list"
	"io"
	"regexp"
	"strings"

	"github.com/johnkerl/miller/internal/pkg/cli"
	"github.com/johnkerl/miller/internal/pkg/lib"
	"github.com/johnkerl/miller/internal/pkg/types"
)

const CSV_BOM = "\xef\xbb\xbf"

// Since Go is concurrent, the context struct (AWK-like variables such as
// FILENAME, NF, NR, FNR, etc.) needs to be duplicated and passed through the
// channels along with each record. Hence the initial context, which readers
// update on each new file/record, and the channel of types.RecordAndContext
// rather than channel of types.Mlrmap.
type IRecordReader interface {
	Read(
		filenames []string,
		initialContext types.Context,
		readerChannel chan<- *list.List, // list of *types.RecordAndContext
		errorChannel chan error,
		downstreamDoneChannel <-chan bool, // for mlr head
	)
}
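
// Implementation sketch (hypothetical, for illustration only -- the real
// readers live in the per-format files of this package). A reader walks the
// filenames, updates its own copy of the context per file/record, and sends
// batches of *types.RecordAndContext downstream:
//
//	type myReader struct{}
//
//	func (r *myReader) Read(
//		filenames []string,
//		initialContext types.Context,
//		readerChannel chan<- *list.List,
//		errorChannel chan error,
//		downstreamDoneChannel <-chan bool,
//	) {
//		// ... open each filename, wrap each record in a
//		// types.RecordAndContext, batch those into a *list.List, and
//		// send each batch on readerChannel ...
//	}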

// NewLineScanner reads lines which may be delimited by multi-byte separators,
// e.g. "\xe2\x90\x9e" for USV.
func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner {
	scanner := bufio.NewScanner(handle)

	// "\n" and "\r\n" are handled by the default scanner.
	if irs == "\n" || irs == "\r\n" {
		return scanner
	}

	irsbytes := []byte(irs)
	irslen := len(irsbytes)

	// Custom splitter for multi-byte separators.
	recordSplitter := func(
		data []byte,
		atEOF bool,
	) (
		advance int,
		token []byte,
		err error,
	) {
		datalen := len(data)
		end := datalen - irslen
		// Scan for the first full occurrence of the separator in the buffered data.
		for i := 0; i <= end; i++ {
			if data[i] == irsbytes[0] {
				match := true
				for j := 1; j < irslen; j++ {
					if data[i+j] != irsbytes[j] {
						match = false
						break
					}
				}
				if match {
					return i + irslen, data[:i], nil
				}
			}
		}
		if !atEOF {
			// No separator yet; ask the scanner for more data.
			return 0, nil, nil
		}
		// There is one final token to be delivered, which may be the empty string.
		// Returning bufio.ErrFinalToken here tells Scan there are no more tokens
		// after this but does not trigger an error to be returned from Scan itself.
		return 0, data, bufio.ErrFinalToken
	}

	scanner.Split(recordSplitter)
	return scanner
}
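
// Usage sketch (illustrative, not part of the original file): scanning
// USV-style input, where records are delimited by "\xe2\x90\x9e":
//
//	handle := strings.NewReader("a=1\xe2\x90\x9eb=2")
//	scanner := NewLineScanner(handle, "\xe2\x90\x9e")
//	for scanner.Scan() {
//		fmt.Println(scanner.Text()) // "a=1", then "b=2"
//	}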

// channelizedLineScanner reads lines from the given scanner, batches them into
// lists of up to recordsPerBatch lines, and writes each batch to linesChannel.
// Between batches it polls downstreamDoneChannel so that, when a downstream
// processor such as mlr head has all the data it needs, reading stops early.
//
// Lines are written to the channel with their trailing newline (or whatever
// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
func channelizedLineScanner(
	lineScanner *bufio.Scanner,
	linesChannel chan<- *list.List,
	downstreamDoneChannel <-chan bool, // for mlr head
	recordsPerBatch int64,
) {
	i := int64(0)
	done := false

	lines := list.New()

	for lineScanner.Scan() {
		i++

		lines.PushBack(lineScanner.Text())

		// See if downstream processors will be ignoring further data (e.g. mlr
		// head). If so, stop reading. This makes 'mlr head hugefile' exit
		// quickly, as it should.
		if i%recordsPerBatch == 0 {
			select {
			case <-downstreamDoneChannel:
				done = true
			default:
			}
			if done {
				break
			}
			linesChannel <- lines
			lines = list.New()
		}
	}

	// Flush the final, possibly partial, batch.
	linesChannel <- lines
	close(linesChannel) // end-of-stream marker
}
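
// Consumption sketch (illustrative): a downstream goroutine ranges over
// linesChannel until it is closed, e.g.
//
//	for lines := range linesChannel {
//		for e := lines.Front(); e != nil; e = e.Next() {
//			line := e.Value.(string)
//			// ... split line into a record and pass it on ...
//		}
//	}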

// iPairSplitter splits a string into left and right, e.g. for IPS.
// This helps us reuse code for splitting by IPS string, or IPS regex.
type iPairSplitter interface {
	Split(input string) []string
}

func newPairSplitter(options *cli.TReaderOptions) iPairSplitter {
	if options.IPSRegex == nil {
		return &tIPSSplitter{ips: options.IPS}
	} else {
		return &tIPSRegexSplitter{ipsRegex: options.IPSRegex}
	}
}

type tIPSSplitter struct {
	ips string
}

func (s *tIPSSplitter) Split(input string) []string {
	return strings.SplitN(input, s.ips, 2)
}

type tIPSRegexSplitter struct {
	ipsRegex *regexp.Regexp
}

func (s *tIPSRegexSplitter) Split(input string) []string {
	return lib.RegexSplitString(s.ipsRegex, input, 2)
}
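
// For example (illustrative): with IPS "=", splitting stops after the first
// separator, so values containing "=" stay intact:
//
//	(&tIPSSplitter{ips: "="}).Split("a=b=c") // []string{"a", "b=c"}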

// iFieldSplitter splits a string into pieces, e.g. for IFS.
// This helps us reuse code for splitting by IFS string, or IFS regex.
type iFieldSplitter interface {
	Split(input string) []string
}

func newFieldSplitter(options *cli.TReaderOptions) iFieldSplitter {
	if options.IFSRegex == nil {
		return &tIFSSplitter{ifs: options.IFS, allowRepeatIFS: options.AllowRepeatIFS}
	} else {
		return &tIFSRegexSplitter{ifsRegex: options.IFSRegex}
	}
}

type tIFSSplitter struct {
	ifs            string
	allowRepeatIFS bool
}

func (s *tIFSSplitter) Split(input string) []string {
	fields := lib.SplitString(input, s.ifs)
	if s.allowRepeatIFS {
		fields = lib.StripEmpties(fields) // left/right trim
	}
	return fields
}

type tIFSRegexSplitter struct {
	ifsRegex *regexp.Regexp
}

func (s *tIFSRegexSplitter) Split(input string) []string {
	return lib.RegexSplitString(s.ifsRegex, input, -1)
}
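
// For example (illustrative, assuming lib.SplitString behaves like
// strings.Split for non-empty input): with IFS "," and allowRepeatIFS set,
// repeated separators do not yield empty fields:
//
//	s := &tIFSSplitter{ifs: ",", allowRepeatIFS: true}
//	s.Split("a,,b") // []string{"a", "b"}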