Mirror of https://github.com/johnkerl/miller.git, synced 2026-01-23 02:14:13 +00:00

commit 57b32c3e9b (parent 296ff87ae2)

Separate out ILineReader abstraction (#1504)

* Split up pkg/input/record_reader.go
* New ILineReader/TLineReader

10 changed files with 223 additions and 190 deletions

pkg/input/constants.go (new file, 3 additions)

@@ -0,0 +1,3 @@
+package input
+
+const CSV_BOM = "\xef\xbb\xbf"
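
For context: "\xef\xbb\xbf" is the UTF-8 byte-order mark, which spreadsheet exports often prepend to CSV files. Below is a minimal sketch (not part of the commit) of the kind of stripping this constant supports; the stripBOM helper is hypothetical, not Miller's API:

```go
package main

import (
	"fmt"
	"strings"
)

const CSV_BOM = "\xef\xbb\xbf"

// stripBOM is a hypothetical helper: drop a leading UTF-8 BOM from the
// first line of CSV input, if one is present.
func stripBOM(line string) string {
	return strings.TrimPrefix(line, CSV_BOM)
}

func main() {
	fmt.Println(stripBOM("\xef\xbb\xbfa,b,c")) // prints "a,b,c"
}
```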

pkg/input/line_reader.go (new file, 126 additions)

@@ -0,0 +1,126 @@
+// This file contains the ILineReader abstraction for format-independent
+// line reading, as well as a utility to channelize the lines it reads.
+
+package input
+
+import (
+    "bufio"
+    "container/list"
+    "io"
+)
+
+type ILineReader interface {
+    Scan() bool
+    Text() string
+}
+
+type TLineReader struct {
+    scanner *bufio.Scanner
+}
+
+// NewLineReader handles reading lines which may be delimited by multi-line separators,
+// e.g. "\xe2\x90\x9e" for USV.
+func NewLineReader(handle io.Reader, irs string) *TLineReader {
+    scanner := bufio.NewScanner(handle)
+
+    if irs == "\n" || irs == "\r\n" {
+        // Handled by default scanner.
+    } else {
+        irsbytes := []byte(irs)
+        irslen := len(irsbytes)
+
+        // Custom splitter
+        recordSplitter := func(
+            data []byte,
+            atEOF bool,
+        ) (
+            advance int,
+            token []byte,
+            err error,
+        ) {
+            datalen := len(data)
+            end := datalen - irslen
+            for i := 0; i <= end; i++ {
+                if data[i] == irsbytes[0] {
+                    match := true
+                    for j := 1; j < irslen; j++ {
+                        if data[i+j] != irsbytes[j] {
+                            match = false
+                            break
+                        }
+                    }
+                    if match {
+                        return i + irslen, data[:i], nil
+                    }
+                }
+            }
+            if !atEOF {
+                return 0, nil, nil
+            }
+            // There is one final token to be delivered, which may be the empty string.
+            // Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this
+            // but does not trigger an error to be returned from Scan itself.
+            return 0, data, bufio.ErrFinalToken
+        }
+
+        scanner.Split(recordSplitter)
+    }
+
+    return &TLineReader{
+        scanner: scanner,
+    }
+}
+
+func (r *TLineReader) Scan() bool {
+    return r.scanner.Scan()
+}
+
+func (r *TLineReader) Text() string {
+    return r.scanner.Text()
+}
+
+// TODO: comment copiously
+//
+// Lines are written to the channel with their trailing newline (or whatever
+// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
+func channelizedLineReader(
+    lineReader ILineReader,
+    linesChannel chan<- *list.List,
+    downstreamDoneChannel <-chan bool, // for mlr head
+    recordsPerBatch int64,
+) {
+    i := int64(0)
+    done := false
+
+    lines := list.New()
+
+    for lineReader.Scan() {
+        i++
+
+        lines.PushBack(lineReader.Text())
+
+        // See if downstream processors will be ignoring further data (e.g. mlr
+        // head). If so, stop reading. This makes 'mlr head hugefile' exit
+        // quickly, as it should.
+        if i%recordsPerBatch == 0 {
+            select {
+            case _ = <-downstreamDoneChannel:
+                done = true
+                break
+            default:
+                break
+            }
+            if done {
+                break
+            }
+            linesChannel <- lines
+            lines = list.New()
+        }
+
+        if done {
+            break
+        }
+    }
+    linesChannel <- lines
+    close(linesChannel) // end-of-stream marker
}
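
As a usage illustration (not part of the commit), here is a standalone sketch driving the new reader with the USV record separator cited in the NewLineReader comment. It assumes only the exported names added above, and a Miller version that includes this commit:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/johnkerl/miller/pkg/input" // NewLineReader/TLineReader as added above
)

func main() {
	// "\xe2\x90\x9e" is the USV record separator noted in the NewLineReader comment.
	const usv = "\xe2\x90\x9e"
	data := "a=1,b=2" + usv + "a=3,b=4"

	reader := input.NewLineReader(strings.NewReader(data), usv)
	for reader.Scan() {
		// The separator is stripped: prints "a=1,b=2" then "a=3,b=4".
		fmt.Println(reader.Text())
	}
}
```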

pkg/input/record_reader.go

@@ -4,19 +4,11 @@
 package input
 
 import (
-    "bufio"
     "container/list"
-    "io"
-    "regexp"
-    "strings"
 
     "github.com/johnkerl/miller/pkg/cli"
-    "github.com/johnkerl/miller/pkg/lib"
     "github.com/johnkerl/miller/pkg/types"
 )
-
-const CSV_BOM = "\xef\xbb\xbf"
-
 // Since Go is concurrent, the context struct (AWK-like variables such as
 // FILENAME, NF, NR, FNR, etc.) needs to be duplicated and passed through the
 // channels along with each record. Hence the initial context, which readers

@@ -32,166 +24,3 @@ type IRecordReader interface {
         downstreamDoneChannel <-chan bool, // for mlr head
     )
 }
-
-// NewLineScanner handles read lines which may be delimited by multi-line separators,
-// e.g. "\xe2\x90\x9e" for USV.
-func NewLineScanner(handle io.Reader, irs string) *bufio.Scanner {
-    scanner := bufio.NewScanner(handle)
-
-    // Handled by default scanner.
-    if irs == "\n" || irs == "\r\n" {
-        return scanner
-    }
-
-    irsbytes := []byte(irs)
-    irslen := len(irsbytes)
-
-    // Custom splitter
-    recordSplitter := func(
-        data []byte,
-        atEOF bool,
-    ) (
-        advance int,
-        token []byte,
-        err error,
-    ) {
-        datalen := len(data)
-        end := datalen - irslen
-        for i := 0; i <= end; i++ {
-            if data[i] == irsbytes[0] {
-                match := true
-                for j := 1; j < irslen; j++ {
-                    if data[i+j] != irsbytes[j] {
-                        match = false
-                        break
-                    }
-                }
-                if match {
-                    return i + irslen, data[:i], nil
-                }
-            }
-        }
-        if !atEOF {
-            return 0, nil, nil
-        }
-        // There is one final token to be delivered, which may be the empty string.
-        // Returning bufio.ErrFinalToken here tells Scan there are no more tokens after this
-        // but does not trigger an error to be returned from Scan itself.
-        return 0, data, bufio.ErrFinalToken
-    }
-
-    scanner.Split(recordSplitter)
-
-    return scanner
-}
-
-// TODO: comment copiously
-//
-// Lines are written to the channel with their trailing newline (or whatever
-// IRS) stripped off. So, callers get "a=1,b=2" rather than "a=1,b=2\n".
-func channelizedLineScanner(
-    lineScanner *bufio.Scanner,
-    linesChannel chan<- *list.List,
-    downstreamDoneChannel <-chan bool, // for mlr head
-    recordsPerBatch int64,
-) {
-    i := int64(0)
-    done := false
-
-    lines := list.New()
-
-    for lineScanner.Scan() {
-        i++
-
-        lines.PushBack(lineScanner.Text())
-
-        // See if downstream processors will be ignoring further data (e.g. mlr
-        // head). If so, stop reading. This makes 'mlr head hugefile' exit
-        // quickly, as it should.
-        if i%recordsPerBatch == 0 {
-            select {
-            case _ = <-downstreamDoneChannel:
-                done = true
-                break
-            default:
-                break
-            }
-            if done {
-                break
-            }
-            linesChannel <- lines
-            lines = list.New()
-        }
-
-        if done {
-            break
-        }
-    }
-    linesChannel <- lines
-    close(linesChannel) // end-of-stream marker
-}
-
-// IPairSplitter splits a string into left and right, e.g. for IPS.
-// This helps us reuse code for splitting by IPS string, or IPS regex.
-type iPairSplitter interface {
-    Split(input string) []string
-}
-
-func newPairSplitter(options *cli.TReaderOptions) iPairSplitter {
-    if options.IPSRegex == nil {
-        return &tIPSSplitter{ips: options.IPS}
-    } else {
-        return &tIPSRegexSplitter{ipsRegex: options.IPSRegex}
-    }
-}
-
-type tIPSSplitter struct {
-    ips string
-}
-
-func (s *tIPSSplitter) Split(input string) []string {
-    return strings.SplitN(input, s.ips, 2)
-}
-
-type tIPSRegexSplitter struct {
-    ipsRegex *regexp.Regexp
-}
-
-func (s *tIPSRegexSplitter) Split(input string) []string {
-    return lib.RegexCompiledSplitString(s.ipsRegex, input, 2)
-}
-
-// IFieldSplitter splits a string into pieces, e.g. for IFS.
-// This helps us reuse code for splitting by IFS string, or IFS regex.
-type iFieldSplitter interface {
-    Split(input string) []string
-}
-
-func newFieldSplitter(options *cli.TReaderOptions) iFieldSplitter {
-    if options.IFSRegex == nil {
-        return &tIFSSplitter{ifs: options.IFS, allowRepeatIFS: options.AllowRepeatIFS}
-    } else {
-        return &tIFSRegexSplitter{ifsRegex: options.IFSRegex}
-    }
-}
-
-type tIFSSplitter struct {
-    ifs            string
-    allowRepeatIFS bool
-}
-
-func (s *tIFSSplitter) Split(input string) []string {
-    fields := lib.SplitString(input, s.ifs)
-    if s.allowRepeatIFS {
-        fields = lib.StripEmpties(fields) // left/right trim
-    }
-    return fields
-}
-
-type tIFSRegexSplitter struct {
-    ifsRegex *regexp.Regexp
-}
-
-func (s *tIFSRegexSplitter) Split(input string) []string {
-    return lib.RegexCompiledSplitString(s.ifsRegex, input, -1)
-}

pkg/input/record_reader_csvlite.go

@@ -144,9 +144,9 @@ func (reader *RecordReaderCSVLite) processHandle(
     reader.headerStrings = nil
 
     recordsPerBatch := reader.recordsPerBatch
-    lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
+    lineReader := NewLineReader(handle, reader.readerOptions.IRS)
     linesChannel := make(chan *list.List, recordsPerBatch)
-    go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
+    go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
     for {
         recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)
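
This construct-reader / spawn-goroutine / drain-batches wiring recurs in each record reader in this commit (above for CSV-lite, below for DKVP/NIDX, PPRINT, TSV, and XTAB). Here is a simplified, self-contained sketch of the pattern; channelize is a local stand-in for the unexported channelizedLineReader, with the mlr-head early-exit logic elided:

```go
package main

import (
	"container/list"
	"fmt"
	"strings"
)

// lineReader mirrors the ILineReader shape introduced in this commit.
type lineReader interface {
	Scan() bool
	Text() string
}

// channelize is a simplified stand-in for channelizedLineReader: it batches
// scanned lines into *list.List values and closes the channel as the
// end-of-stream marker.
func channelize(r lineReader, out chan<- *list.List, recordsPerBatch int64) {
	i := int64(0)
	lines := list.New()
	for r.Scan() {
		i++
		lines.PushBack(r.Text())
		if i%recordsPerBatch == 0 {
			out <- lines
			lines = list.New()
		}
	}
	out <- lines
	close(out)
}

// sliceReader is a trivial in-memory ILineReader-alike, for the demo.
type sliceReader struct {
	lines []string
	pos   int
}

func (s *sliceReader) Scan() bool   { s.pos++; return s.pos <= len(s.lines) }
func (s *sliceReader) Text() string { return s.lines[s.pos-1] }

func main() {
	r := &sliceReader{lines: strings.Split("a=1,b=2\na=3,b=4\na=5,b=6", "\n")}
	linesChannel := make(chan *list.List, 2)
	go channelize(r, linesChannel, 2)
	for batch := range linesChannel {
		for e := batch.Front(); e != nil; e = e.Next() {
			fmt.Println(e.Value.(string))
		}
	}
}
```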

pkg/input/record_reader_dkvp_nidx.go

@@ -101,9 +101,9 @@ func (reader *RecordReaderDKVPNIDX) processHandle(
     context.UpdateForStartOfFile(filename)
     recordsPerBatch := reader.recordsPerBatch
 
-    lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
+    lineReader := NewLineReader(handle, reader.readerOptions.IRS)
     linesChannel := make(chan *list.List, recordsPerBatch)
-    go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
+    go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
     for {
         recordsAndContexts, eof := reader.getRecordBatch(linesChannel, errorChannel, context)

pkg/input/record_reader_json.go

@@ -1,7 +1,6 @@
 package input
 
 import (
-    "bufio"
     "container/list"
     "fmt"
     "io"

@@ -203,7 +202,7 @@ func (reader *RecordReaderJSON) processHandle(
 // JSONCommentEnabledReader implements io.Reader to strip comment lines
 // off of JSON data.
 type JSONCommentEnabledReader struct {
-    lineScanner   *bufio.Scanner
+    lineReader    ILineReader
     readerOptions *cli.TReaderOptions
     context       *types.Context    // Needed for channelized stdout-printing logic
     readerChannel chan<- *list.List // list of *types.RecordAndContext

@@ -220,7 +219,7 @@ func NewJSONCommentEnabledReader(
     readerChannel chan<- *list.List, // list of *types.RecordAndContext
 ) *JSONCommentEnabledReader {
     return &JSONCommentEnabledReader{
-        lineScanner:   bufio.NewScanner(underlying),
+        lineReader:    NewLineReader(underlying, "\n"),
         readerOptions: readerOptions,
         context:       types.NewNilContext(),
         readerChannel: readerChannel,

@@ -237,10 +236,10 @@ func (bsr *JSONCommentEnabledReader) Read(p []byte) (n int, err error) {
     // Loop until we can get a non-comment line to pass on, or end of file.
     for {
         // EOF
-        if !bsr.lineScanner.Scan() {
+        if !bsr.lineReader.Scan() {
             return 0, io.EOF
         }
-        line := bsr.lineScanner.Text()
+        line := bsr.lineReader.Text()
 
         // Non-comment line
         if !strings.HasPrefix(line, bsr.readerOptions.CommentString) {
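
For illustration (not the commit's code), here is a simplified, self-contained analogue of what JSONCommentEnabledReader does: implement io.Reader so that comment lines are dropped before the bytes reach the JSON parser. Miller's version can also echo skipped comments to stdout; that behavior is elided here:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// commentStrippingReader is a simplified analogue of
// JSONCommentEnabledReader: it implements io.Reader by emitting only
// lines that do not start with the comment prefix.
type commentStrippingReader struct {
	scanner *bufio.Scanner
	prefix  string
	pending string // carry-over when p is smaller than the current line
}

func (r *commentStrippingReader) Read(p []byte) (int, error) {
	for r.pending == "" {
		if !r.scanner.Scan() {
			return 0, io.EOF
		}
		line := r.scanner.Text()
		if !strings.HasPrefix(line, r.prefix) {
			r.pending = line + "\n"
		}
	}
	n := copy(p, r.pending)
	r.pending = r.pending[n:]
	return n, nil
}

func main() {
	input := "# comment\n{\"a\": 1}\n# another\n{\"b\": 2}\n"
	r := &commentStrippingReader{
		scanner: bufio.NewScanner(strings.NewReader(input)),
		prefix:  "#",
	}
	out, _ := io.ReadAll(r)
	fmt.Print(string(out)) // only the {"a": 1} and {"b": 2} lines survive
}
```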

pkg/input/record_reader_pprint.go

@@ -148,9 +148,9 @@ func (reader *RecordReaderPprintBarredOrMarkdown) processHandle(
     reader.headerStrings = nil
 
     recordsPerBatch := reader.recordsPerBatch
-    lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
+    lineReader := NewLineReader(handle, reader.readerOptions.IRS)
     linesChannel := make(chan *list.List, recordsPerBatch)
-    go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
+    go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
     for {
         recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)

pkg/input/record_reader_tsv.go

@@ -126,9 +126,9 @@ func (reader *RecordReaderTSV) processHandle(
     reader.headerStrings = nil
 
     recordsPerBatch := reader.recordsPerBatch
-    lineScanner := NewLineScanner(handle, reader.readerOptions.IRS)
+    lineReader := NewLineReader(handle, reader.readerOptions.IRS)
     linesChannel := make(chan *list.List, recordsPerBatch)
-    go channelizedLineScanner(lineScanner, linesChannel, downstreamDoneChannel, recordsPerBatch)
+    go channelizedLineReader(lineReader, linesChannel, downstreamDoneChannel, recordsPerBatch)
 
     for {
         recordsAndContexts, eof := reader.recordBatchGetter(reader, linesChannel, filename, context, errorChannel)

pkg/input/record_reader_xtab.go

@@ -1,7 +1,6 @@
 package input
 
 import (
-    "bufio"
     "container/list"
     "fmt"
     "io"

@@ -105,10 +104,10 @@ func (reader *RecordReaderXTAB) processHandle(
     recordsPerBatch := reader.recordsPerBatch
 
     // XTAB uses repeated IFS, rather than IRS, to delimit records
-    lineScanner := NewLineScanner(handle, reader.readerOptions.IFS)
+    lineReader := NewLineReader(handle, reader.readerOptions.IFS)
 
     stanzasChannel := make(chan *list.List, recordsPerBatch)
-    go channelizedStanzaScanner(lineScanner, reader.readerOptions, stanzasChannel, downstreamDoneChannel,
+    go channelizedStanzaScanner(lineReader, reader.readerOptions, stanzasChannel, downstreamDoneChannel,
         recordsPerBatch)
 
     for {

@@ -137,7 +136,7 @@ func (reader *RecordReaderXTAB) processHandle(
 // start or end of file. A single stanza, once parsed, will become a single
 // record.
 func channelizedStanzaScanner(
-    lineScanner *bufio.Scanner,
+    lineReader ILineReader,
     readerOptions *cli.TReaderOptions,
     stanzasChannel chan<- *list.List, // list of list of string
     downstreamDoneChannel <-chan bool, // for mlr head

@@ -150,8 +149,8 @@ func channelizedStanzaScanner(
     stanzas := list.New()
     stanza := newStanza()
 
-    for lineScanner.Scan() {
-        line := lineScanner.Text()
+    for lineReader.Scan() {
+        line := lineReader.Text()
 
         // Check for comments-in-data feature
         // TODO: function-pointer this away
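
To make the "repeated IFS" remark concrete: with XTAB's line-oriented IFS, a blank line (two IFS occurrences in a row) ends a stanza, and each stanza later becomes one record. An illustrative sketch of that grouping, not Miller's code:

```go
package main

import (
	"fmt"
	"strings"
)

// Illustration only: group newline-separated lines into blank-line-delimited
// stanzas, the way the XTAB reader treats repeated IFS as a record boundary.
func main() {
	input := "a 1\nb 2\n\na 3\nb 4\n"
	var stanzas [][]string
	stanza := []string{}
	for _, line := range strings.Split(strings.TrimRight(input, "\n"), "\n") {
		if line == "" { // repeated IFS: stanza boundary
			if len(stanza) > 0 {
				stanzas = append(stanzas, stanza)
				stanza = []string{}
			}
			continue
		}
		stanza = append(stanza, line)
	}
	if len(stanza) > 0 {
		stanzas = append(stanzas, stanza)
	}
	fmt.Println(stanzas) // [[a 1 b 2] [a 3 b 4]]: two records
}
```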

pkg/input/splitters.go (new file, 77 additions)

@@ -0,0 +1,77 @@
+// This file contains the IPS/IFS splitter abstractions used by the
+// record-readers, for splitting by literal string or by compiled regex.
+
+package input
+
+import (
+    "regexp"
+    "strings"
+
+    "github.com/johnkerl/miller/pkg/cli"
+    "github.com/johnkerl/miller/pkg/lib"
+)
+
+// IPairSplitter splits a string into left and right, e.g. for IPS.
+// This helps us reuse code for splitting by IPS string, or IPS regex.
+type iPairSplitter interface {
+    Split(input string) []string
+}
+
+func newPairSplitter(options *cli.TReaderOptions) iPairSplitter {
+    if options.IPSRegex == nil {
+        return &tIPSSplitter{ips: options.IPS}
+    } else {
+        return &tIPSRegexSplitter{ipsRegex: options.IPSRegex}
+    }
+}
+
+type tIPSSplitter struct {
+    ips string
+}
+
+func (s *tIPSSplitter) Split(input string) []string {
+    return strings.SplitN(input, s.ips, 2)
+}
+
+type tIPSRegexSplitter struct {
+    ipsRegex *regexp.Regexp
+}
+
+func (s *tIPSRegexSplitter) Split(input string) []string {
+    return lib.RegexCompiledSplitString(s.ipsRegex, input, 2)
+}
+
+// IFieldSplitter splits a string into pieces, e.g. for IFS.
+// This helps us reuse code for splitting by IFS string, or IFS regex.
+type iFieldSplitter interface {
+    Split(input string) []string
+}
+
+func newFieldSplitter(options *cli.TReaderOptions) iFieldSplitter {
+    if options.IFSRegex == nil {
+        return &tIFSSplitter{ifs: options.IFS, allowRepeatIFS: options.AllowRepeatIFS}
+    } else {
+        return &tIFSRegexSplitter{ifsRegex: options.IFSRegex}
+    }
+}
+
+type tIFSSplitter struct {
+    ifs            string
+    allowRepeatIFS bool
+}
+
+func (s *tIFSSplitter) Split(input string) []string {
+    fields := lib.SplitString(input, s.ifs)
+    if s.allowRepeatIFS {
+        fields = lib.StripEmpties(fields) // left/right trim
+    }
+    return fields
+}
+
+type tIFSRegexSplitter struct {
+    ifsRegex *regexp.Regexp
+}
+
+func (s *tIFSRegexSplitter) Split(input string) []string {
+    return lib.RegexCompiledSplitString(s.ifsRegex, input, -1)
+}
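
As an illustration (not part of the commit) of the string-vs-regex dispatch these splitters encode, here is a sketch with local stand-ins for the unexported types. Note Miller's regex path goes through lib.RegexCompiledSplitString, which may handle capture groups differently than regexp.Split does:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// pairSplitter mirrors the unexported iPairSplitter interface above.
type pairSplitter interface {
	Split(input string) []string
}

// stringPairSplitter: split on a literal IPS, at most two pieces.
type stringPairSplitter struct{ ips string }

func (s *stringPairSplitter) Split(input string) []string {
	return strings.SplitN(input, s.ips, 2)
}

// regexPairSplitter: split on a compiled IPS regex, at most two pieces.
type regexPairSplitter struct{ ipsRegex *regexp.Regexp }

func (s *regexPairSplitter) Split(input string) []string {
	return s.ipsRegex.Split(input, 2) // stand-in for lib.RegexCompiledSplitString
}

func main() {
	var ps pairSplitter = &stringPairSplitter{ips: "="}
	fmt.Println(ps.Split("a=1")) // [a 1]

	ps = &regexPairSplitter{ipsRegex: regexp.MustCompile(`\s*=\s*`)}
	fmt.Println(ps.Split("a = 1")) // [a 1]
}
```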