Rename inputChannel,outputChannel to readerChannel,writerChannel (#772)

This commit is contained in:
John Kerl 2021-12-07 08:02:30 -05:00
parent 415b55e6f9
commit 4ddb8ff25c
18 changed files with 180 additions and 105 deletions

View file

@ -65,6 +65,7 @@ dev:
# Keystroke-savers
it: build check
so: install
sure: build check
# Please see comments in ./create-release-tarball as well as
# https://miller.readthedocs.io/en/latest/build/#creating-a-new-release-for-developers

View file

@ -108,11 +108,11 @@ func NewRepl(
doWarnings: doWarnings,
cstRootNode: cstRootNode,
options: options,
inputChannel: nil,
errorChannel: nil,
recordReader: recordReader,
recordWriter: recordWriter,
options: options,
readerChannel: nil,
errorChannel: nil,
recordReader: recordReader,
recordWriter: recordWriter,
runtimeState: runtimeState,
sysToSignalHandlerChannel: sysToSignalHandlerChannel,

View file

@ -46,7 +46,7 @@ type Repl struct {
options *cli.TOptions
inputChannel chan *types.RecordAndContext
readerChannel chan *types.RecordAndContext
errorChannel chan error
downstreamDoneChannel chan bool
recordReader input.IRecordReader

View file

@ -218,14 +218,14 @@ func (repl *Repl) openFiles(filenames []string) {
// Remember for :reopen
repl.options.FileNames = filenames
repl.inputChannel = make(chan *types.RecordAndContext, 10)
repl.readerChannel = make(chan *types.RecordAndContext, 10)
repl.errorChannel = make(chan error, 1)
repl.downstreamDoneChannel = make(chan bool, 1)
go repl.recordReader.Read(
filenames,
*repl.runtimeState.Context,
repl.inputChannel,
repl.readerChannel,
repl.errorChannel,
repl.downstreamDoneChannel,
)
@ -265,7 +265,7 @@ func handleRead(repl *Repl, args []string) bool {
if len(args) != 0 {
return false
}
if repl.inputChannel == nil {
if repl.readerChannel == nil {
fmt.Println("No open files")
return true
}
@ -274,7 +274,7 @@ func handleRead(repl *Repl, args []string) bool {
var err error = nil
select {
case recordAndContext = <-repl.inputChannel:
case recordAndContext = <-repl.readerChannel:
break
case err = <-repl.errorChannel:
break
@ -282,7 +282,7 @@ func handleRead(repl *Repl, args []string) bool {
if err != nil {
fmt.Println(err)
repl.inputChannel = nil
repl.readerChannel = nil
repl.errorChannel = nil
return true
}
@ -330,7 +330,7 @@ func usageSkip(repl *Repl) {
}
func handleSkip(repl *Repl, args []string) bool {
if repl.inputChannel == nil {
if repl.readerChannel == nil {
fmt.Println("No open files")
return true
}
@ -379,7 +379,7 @@ func usageProcess(repl *Repl) {
}
func handleProcess(repl *Repl, args []string) bool {
if repl.inputChannel == nil {
if repl.readerChannel == nil {
fmt.Println("No open files")
return true
}
@ -419,7 +419,7 @@ func handleSkipOrProcessN(repl *Repl, n int, processingNotSkipping bool) {
for i := 1; i <= n; i++ {
select {
case recordAndContext = <-repl.inputChannel:
case recordAndContext = <-repl.readerChannel:
break
case err = <-repl.errorChannel:
break
@ -429,7 +429,7 @@ func handleSkipOrProcessN(repl *Repl, n int, processingNotSkipping bool) {
if err != nil {
fmt.Println(err)
repl.inputChannel = nil
repl.readerChannel = nil
repl.errorChannel = nil
return
}
@ -477,7 +477,7 @@ func handleSkipOrProcessUntil(repl *Repl, dslString string, processingNotSkippin
for {
doubleBreak := false
select {
case recordAndContext = <-repl.inputChannel:
case recordAndContext = <-repl.readerChannel:
break
case err = <-repl.errorChannel:
break
@ -491,7 +491,7 @@ func handleSkipOrProcessUntil(repl *Repl, dslString string, processingNotSkippin
if err != nil {
fmt.Println(err)
repl.inputChannel = nil
repl.readerChannel = nil
repl.errorChannel = nil
return
}
@ -537,7 +537,7 @@ func skipOrProcessRecord(
// End-of-stream marker
if recordAndContext.EndOfStream == true {
fmt.Println("End of record stream")
repl.inputChannel = nil
repl.readerChannel = nil
repl.errorChannel = nil
return true
}

View file

@ -21,17 +21,17 @@ func NewPseudoReaderGen(readerOptions *cli.TReaderOptions) (*PseudoReaderGen, er
func (reader *PseudoReaderGen) Read(
filenames []string, // ignored
context types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
reader.process(&context, inputChannel, errorChannel, downstreamDoneChannel)
inputChannel <- types.NewEndOfStreamMarker(&context)
reader.process(&context, readerChannel, errorChannel, downstreamDoneChannel)
readerChannel <- types.NewEndOfStreamMarker(&context)
}
func (reader *PseudoReaderGen) process(
context *types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -91,7 +91,7 @@ func (reader *PseudoReaderGen) process(
record.PutCopy(key, value)
context.UpdateForInputRecord()
inputChannel <- types.NewRecordAndContext(
readerChannel <- types.NewRecordAndContext(
record,
context,
)

View file

@ -19,7 +19,7 @@ type IRecordReader interface {
Read(
filenames []string,
initialContext types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
)

View file

@ -38,7 +38,7 @@ func NewRecordReaderCSV(readerOptions *cli.TReaderOptions) (*RecordReaderCSV, er
func (reader *RecordReaderCSV) Read(
filenames []string,
context types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -52,7 +52,7 @@ func (reader *RecordReaderCSV) Read(
if err != nil {
errorChannel <- err
}
reader.processHandle(handle, "(stdin)", &context, inputChannel, errorChannel, downstreamDoneChannel)
reader.processHandle(handle, "(stdin)", &context, readerChannel, errorChannel, downstreamDoneChannel)
} else {
for _, filename := range filenames {
handle, err := lib.OpenFileForRead(
@ -64,13 +64,13 @@ func (reader *RecordReaderCSV) Read(
if err != nil {
errorChannel <- err
} else {
reader.processHandle(handle, filename, &context, inputChannel, errorChannel, downstreamDoneChannel)
reader.processHandle(handle, filename, &context, readerChannel, errorChannel, downstreamDoneChannel)
handle.Close()
}
}
}
}
inputChannel <- types.NewEndOfStreamMarker(&context)
readerChannel <- types.NewEndOfStreamMarker(&context)
}
// ----------------------------------------------------------------
@ -78,7 +78,7 @@ func (reader *RecordReaderCSV) processHandle(
handle io.Reader,
filename string,
context *types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -120,7 +120,7 @@ func (reader *RecordReaderCSV) processHandle(
return
}
isData := reader.maybeConsumeComment(csvRecord, context, inputChannel)
isData := reader.maybeConsumeComment(csvRecord, context, readerChannel)
if !isData {
continue
}
@ -143,7 +143,7 @@ func (reader *RecordReaderCSV) processHandle(
}
rowNumber++
isData := reader.maybeConsumeComment(csvRecord, context, inputChannel)
isData := reader.maybeConsumeComment(csvRecord, context, readerChannel)
if !isData {
continue
}
@ -204,7 +204,7 @@ func (reader *RecordReaderCSV) processHandle(
context.UpdateForInputRecord()
inputChannel <- types.NewRecordAndContext(
readerChannel <- types.NewRecordAndContext(
record,
context,
)
@ -216,7 +216,7 @@ func (reader *RecordReaderCSV) processHandle(
func (reader *RecordReaderCSV) maybeConsumeComment(
csvRecord []string,
context *types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
) bool {
if reader.readerOptions.CommentHandling == cli.CommentsAreData {
// Nothing is to be construed as a comment
@ -249,7 +249,7 @@ func (reader *RecordReaderCSV) maybeConsumeComment(
csvWriter.Comma = rune(reader.ifs0)
csvWriter.Write(csvRecord)
csvWriter.Flush()
inputChannel <- types.NewOutputString(buffer.String(), context)
readerChannel <- types.NewOutputString(buffer.String(), context)
} else /* reader.readerOptions.CommentHandling == cli.SkipComments */ {
// discard entirely
}

View file

@ -53,7 +53,7 @@ func NewRecordReaderPPRINT(readerOptions *cli.TReaderOptions) (*RecordReaderCSVL
func (reader *RecordReaderCSVLite) Read(
filenames []string,
context types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -72,7 +72,7 @@ func (reader *RecordReaderCSVLite) Read(
handle,
"(stdin)",
&context,
inputChannel,
readerChannel,
errorChannel,
downstreamDoneChannel,
)
@ -81,7 +81,7 @@ func (reader *RecordReaderCSVLite) Read(
handle,
"(stdin)",
&context,
inputChannel,
readerChannel,
errorChannel,
downstreamDoneChannel,
)
@ -102,7 +102,7 @@ func (reader *RecordReaderCSVLite) Read(
handle,
filename,
&context,
inputChannel,
readerChannel,
errorChannel,
downstreamDoneChannel,
)
@ -111,7 +111,7 @@ func (reader *RecordReaderCSVLite) Read(
handle,
filename,
&context,
inputChannel,
readerChannel,
errorChannel,
downstreamDoneChannel,
)
@ -121,7 +121,7 @@ func (reader *RecordReaderCSVLite) Read(
}
}
}
inputChannel <- types.NewEndOfStreamMarker(&context)
readerChannel <- types.NewEndOfStreamMarker(&context)
}
// ----------------------------------------------------------------
@ -129,7 +129,7 @@ func (reader *RecordReaderCSVLite) processHandleExplicitCSVHeader(
handle io.Reader,
filename string,
context *types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -170,7 +170,7 @@ func (reader *RecordReaderCSVLite) processHandleExplicitCSVHeader(
// Check for comments-in-data feature
if strings.HasPrefix(line, reader.readerOptions.CommentString) {
if reader.readerOptions.CommentHandling == cli.PassComments {
inputChannel <- types.NewOutputString(line+"\n", context)
readerChannel <- types.NewOutputString(line+"\n", context)
continue
} else if reader.readerOptions.CommentHandling == cli.SkipComments {
continue
@ -242,7 +242,7 @@ func (reader *RecordReaderCSVLite) processHandleExplicitCSVHeader(
}
context.UpdateForInputRecord()
inputChannel <- types.NewRecordAndContext(
readerChannel <- types.NewRecordAndContext(
record,
context,
)
@ -256,7 +256,7 @@ func (reader *RecordReaderCSVLite) processHandleImplicitCSVHeader(
handle io.Reader,
filename string,
context *types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -293,7 +293,7 @@ func (reader *RecordReaderCSVLite) processHandleImplicitCSVHeader(
// Check for comments-in-data feature
if strings.HasPrefix(line, reader.readerOptions.CommentString) {
if reader.readerOptions.CommentHandling == cli.PassComments {
inputChannel <- types.NewOutputString(line+"\n", context)
readerChannel <- types.NewOutputString(line+"\n", context)
continue
} else if reader.readerOptions.CommentHandling == cli.SkipComments {
continue
@ -373,7 +373,7 @@ func (reader *RecordReaderCSVLite) processHandleImplicitCSVHeader(
}
context.UpdateForInputRecord()
inputChannel <- types.NewRecordAndContext(
readerChannel <- types.NewRecordAndContext(
record,
context,
)

View file

@ -23,7 +23,7 @@ func NewRecordReaderDKVP(readerOptions *cli.TReaderOptions) (*RecordReaderDKVP,
func (reader *RecordReaderDKVP) Read(
filenames []string,
context types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -37,7 +37,7 @@ func (reader *RecordReaderDKVP) Read(
if err != nil {
errorChannel <- err
}
reader.processHandle(handle, "(stdin)", &context, inputChannel, errorChannel, downstreamDoneChannel)
reader.processHandle(handle, "(stdin)", &context, readerChannel, errorChannel, downstreamDoneChannel)
} else {
for _, filename := range filenames {
handle, err := lib.OpenFileForRead(
@ -49,20 +49,20 @@ func (reader *RecordReaderDKVP) Read(
if err != nil {
errorChannel <- err
} else {
reader.processHandle(handle, filename, &context, inputChannel, errorChannel, downstreamDoneChannel)
reader.processHandle(handle, filename, &context, readerChannel, errorChannel, downstreamDoneChannel)
handle.Close()
}
}
}
}
inputChannel <- types.NewEndOfStreamMarker(&context)
readerChannel <- types.NewEndOfStreamMarker(&context)
}
func (reader *RecordReaderDKVP) processHandle(
handle io.Reader,
filename string,
context *types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -91,7 +91,7 @@ func (reader *RecordReaderDKVP) processHandle(
// Check for comments-in-data feature
if strings.HasPrefix(line, reader.readerOptions.CommentString) {
if reader.readerOptions.CommentHandling == cli.PassComments {
inputChannel <- types.NewOutputString(line+"\n", context)
readerChannel <- types.NewOutputString(line+"\n", context)
continue
} else if reader.readerOptions.CommentHandling == cli.SkipComments {
continue
@ -101,7 +101,7 @@ func (reader *RecordReaderDKVP) processHandle(
record := reader.recordFromDKVPLine(line)
context.UpdateForInputRecord()
inputChannel <- types.NewRecordAndContext(
readerChannel <- types.NewRecordAndContext(
record,
context,
)

View file

@ -27,7 +27,7 @@ func NewRecordReaderJSON(readerOptions *cli.TReaderOptions) (*RecordReaderJSON,
func (reader *RecordReaderJSON) Read(
filenames []string,
context types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -41,7 +41,7 @@ func (reader *RecordReaderJSON) Read(
if err != nil {
errorChannel <- err
}
reader.processHandle(handle, "(stdin)", &context, inputChannel, errorChannel, downstreamDoneChannel)
reader.processHandle(handle, "(stdin)", &context, readerChannel, errorChannel, downstreamDoneChannel)
} else {
for _, filename := range filenames {
handle, err := lib.OpenFileForRead(
@ -53,27 +53,27 @@ func (reader *RecordReaderJSON) Read(
if err != nil {
errorChannel <- err
} else {
reader.processHandle(handle, filename, &context, inputChannel, errorChannel, downstreamDoneChannel)
reader.processHandle(handle, filename, &context, readerChannel, errorChannel, downstreamDoneChannel)
handle.Close()
}
}
}
}
inputChannel <- types.NewEndOfStreamMarker(&context)
readerChannel <- types.NewEndOfStreamMarker(&context)
}
func (reader *RecordReaderJSON) processHandle(
handle io.Reader,
filename string,
context *types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
context.UpdateForStartOfFile(filename)
if reader.readerOptions.CommentHandling != cli.CommentsAreData {
handle = NewJSONCommentEnabledReader(handle, reader.readerOptions, inputChannel)
handle = NewJSONCommentEnabledReader(handle, reader.readerOptions, readerChannel)
}
decoder := json.NewDecoder(handle)
@ -116,7 +116,7 @@ func (reader *RecordReaderJSON) processHandle(
return
}
context.UpdateForInputRecord()
inputChannel <- types.NewRecordAndContext(
readerChannel <- types.NewRecordAndContext(
record,
context,
)
@ -143,7 +143,7 @@ func (reader *RecordReaderJSON) processHandle(
return
}
context.UpdateForInputRecord()
inputChannel <- types.NewRecordAndContext(
readerChannel <- types.NewRecordAndContext(
record,
context,
)
@ -188,7 +188,7 @@ type JSONCommentEnabledReader struct {
lineScanner *bufio.Scanner
readerOptions *cli.TReaderOptions
context *types.Context // Needed for channelized stdout-printing logic
inputChannel chan<- *types.RecordAndContext
readerChannel chan<- *types.RecordAndContext
// In case a line was ingested which was longer than the read-buffer passed
// to us, in which case we need to split up that line and return it over
@ -199,13 +199,13 @@ type JSONCommentEnabledReader struct {
func NewJSONCommentEnabledReader(
underlying io.Reader,
readerOptions *cli.TReaderOptions,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
) *JSONCommentEnabledReader {
return &JSONCommentEnabledReader{
lineScanner: bufio.NewScanner(underlying),
readerOptions: readerOptions,
context: types.NewNilContext(),
inputChannel: inputChannel,
readerChannel: readerChannel,
lineBytes: nil,
}
@ -234,7 +234,7 @@ func (bsr *JSONCommentEnabledReader) Read(p []byte) (n int, err error) {
if bsr.readerOptions.CommentHandling == cli.PassComments {
// Insert the string into the record-output stream, so that goroutine can
// print it, resulting in deterministic output-ordering.
bsr.inputChannel <- types.NewOutputString(line+"\n", bsr.context)
bsr.readerChannel <- types.NewOutputString(line+"\n", bsr.context)
}
}
}

View file

@ -23,7 +23,7 @@ func NewRecordReaderNIDX(readerOptions *cli.TReaderOptions) (*RecordReaderNIDX,
func (reader *RecordReaderNIDX) Read(
filenames []string,
context types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -37,7 +37,7 @@ func (reader *RecordReaderNIDX) Read(
if err != nil {
errorChannel <- err
}
reader.processHandle(handle, "(stdin)", &context, inputChannel, errorChannel, downstreamDoneChannel)
reader.processHandle(handle, "(stdin)", &context, readerChannel, errorChannel, downstreamDoneChannel)
} else {
for _, filename := range filenames {
handle, err := lib.OpenFileForRead(
@ -49,20 +49,20 @@ func (reader *RecordReaderNIDX) Read(
if err != nil {
errorChannel <- err
} else {
reader.processHandle(handle, filename, &context, inputChannel, errorChannel, downstreamDoneChannel)
reader.processHandle(handle, filename, &context, readerChannel, errorChannel, downstreamDoneChannel)
handle.Close()
}
}
}
}
inputChannel <- types.NewEndOfStreamMarker(&context)
readerChannel <- types.NewEndOfStreamMarker(&context)
}
func (reader *RecordReaderNIDX) processHandle(
handle io.Reader,
filename string,
context *types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -93,7 +93,7 @@ func (reader *RecordReaderNIDX) processHandle(
// Check for comments-in-data feature
if strings.HasPrefix(line, reader.readerOptions.CommentString) {
if reader.readerOptions.CommentHandling == cli.PassComments {
inputChannel <- types.NewOutputString(line+"\n", context)
readerChannel <- types.NewOutputString(line+"\n", context)
continue
} else if reader.readerOptions.CommentHandling == cli.SkipComments {
continue
@ -104,7 +104,7 @@ func (reader *RecordReaderNIDX) processHandle(
record := reader.recordFromNIDXLine(line)
context.UpdateForInputRecord()
inputChannel <- types.NewRecordAndContext(
readerChannel <- types.NewRecordAndContext(
record,
context,
)

View file

@ -27,7 +27,7 @@ func NewRecordReaderXTAB(readerOptions *cli.TReaderOptions) (*RecordReaderXTAB,
func (reader *RecordReaderXTAB) Read(
filenames []string,
context types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -41,7 +41,7 @@ func (reader *RecordReaderXTAB) Read(
if err != nil {
errorChannel <- err
}
reader.processHandle(handle, "(stdin)", &context, inputChannel, errorChannel, downstreamDoneChannel)
reader.processHandle(handle, "(stdin)", &context, readerChannel, errorChannel, downstreamDoneChannel)
} else {
for _, filename := range filenames {
handle, err := lib.OpenFileForRead(
@ -53,20 +53,20 @@ func (reader *RecordReaderXTAB) Read(
if err != nil {
errorChannel <- err
} else {
reader.processHandle(handle, filename, &context, inputChannel, errorChannel, downstreamDoneChannel)
reader.processHandle(handle, filename, &context, readerChannel, errorChannel, downstreamDoneChannel)
handle.Close()
}
}
}
}
inputChannel <- types.NewEndOfStreamMarker(&context)
readerChannel <- types.NewEndOfStreamMarker(&context)
}
func (reader *RecordReaderXTAB) processHandle(
handle io.Reader,
filename string,
context *types.Context,
inputChannel chan<- *types.RecordAndContext,
readerChannel chan<- *types.RecordAndContext,
errorChannel chan error,
downstreamDoneChannel <-chan bool, // for mlr head
) {
@ -102,7 +102,7 @@ func (reader *RecordReaderXTAB) processHandle(
return
}
context.UpdateForInputRecord()
inputChannel <- types.NewRecordAndContext(record, context)
readerChannel <- types.NewRecordAndContext(record, context)
linesForRecord = list.New()
}
@ -114,7 +114,7 @@ func (reader *RecordReaderXTAB) processHandle(
// Check for comments-in-data feature
if strings.HasPrefix(line, reader.readerOptions.CommentString) {
if reader.readerOptions.CommentHandling == cli.PassComments {
inputChannel <- types.NewOutputString(line+reader.readerOptions.IFS, context)
readerChannel <- types.NewOutputString(line+reader.readerOptions.IFS, context)
continue
} else if reader.readerOptions.CommentHandling == cli.SkipComments {
continue
@ -133,7 +133,7 @@ func (reader *RecordReaderXTAB) processHandle(
return
}
context.UpdateForInputRecord()
inputChannel <- types.NewRecordAndContext(record, context)
readerChannel <- types.NewRecordAndContext(record, context)
linesForRecord = list.New()
}
}

View file

@ -8,7 +8,7 @@ import (
)
func ChannelWriter(
outputChannel <-chan *types.RecordAndContext,
writerChannel <-chan *types.RecordAndContext,
recordWriter IRecordWriter,
writerOptions *cli.TWriterOptions,
doneChannel chan<- bool,
@ -16,7 +16,7 @@ func ChannelWriter(
outputIsStdout bool,
) {
for {
recordAndContext := <-outputChannel
recordAndContext := <-writerChannel
// Three things can come through:
// * End-of-stream marker

View file

@ -60,8 +60,8 @@ func Stream(
}
// Set up the reader-to-transformer and transformer-to-writer channels.
inputChannel := make(chan *types.RecordAndContext, 10)
outputChannel := make(chan *types.RecordAndContext, 1)
readerChannel := make(chan *types.RecordAndContext, 10)
writerChannel := make(chan *types.RecordAndContext, 1)
// We're done when a fatal error is registered on input (file not found,
// etc) or when the record-writer has written all its output. We use
@ -80,10 +80,11 @@ func Stream(
// error or end-of-processing happens.
bufferedOutputStream := bufio.NewWriter(outputStream)
go recordReader.Read(fileNames, *initialContext, inputChannel, errorChannel, readerDownstreamDoneChannel)
go transformers.ChainTransformer(inputChannel, readerDownstreamDoneChannel, recordTransformers, outputChannel,
options)
go output.ChannelWriter(outputChannel, recordWriter, &options.WriterOptions, doneWritingChannel, bufferedOutputStream, outputIsStdout)
go recordReader.Read(fileNames, *initialContext, readerChannel, errorChannel, readerDownstreamDoneChannel)
go transformers.ChainTransformer(readerChannel, readerDownstreamDoneChannel, recordTransformers,
writerChannel, options)
go output.ChannelWriter(writerChannel, recordWriter, &options.WriterOptions, doneWritingChannel,
bufferedOutputStream, outputIsStdout)
done := false
for !done {

View file

@ -142,10 +142,10 @@ import (
// subdivides goroutines for each transformer in the chain, with intermediary
// channels between them.
func ChainTransformer(
readerInputRecordChannel <-chan *types.RecordAndContext,
readerRecordChannel <-chan *types.RecordAndContext,
readerDownstreamDoneChannel chan<- bool, // for mlr head -- see also stream.go
recordTransformers []IRecordTransformer, // not *recordTransformer since this is an interface
writerOutputRecordChannel chan<- *types.RecordAndContext,
writerRecordChannel chan<- *types.RecordAndContext,
options *cli.TOptions,
) {
i := 0
@ -163,10 +163,10 @@ func ChainTransformer(
for i, recordTransformer := range recordTransformers {
// Downstream flow: channel a given transformer reads records from
irchan := readerInputRecordChannel
irchan := readerRecordChannel
// Downstream flow: channel a given transformer writes transformed
// records to
orchan := writerOutputRecordChannel
orchan := writerRecordChannel
// Upstream signaling: channel a given transformer reads to see if
// downstream transformers are done (e.g. mlr head)
idchan := intermediateDownstreamDoneChannels[i]

View file

@ -472,14 +472,14 @@ func (tr *TransformerJoin) ingestLeftFile() {
initialContext.UpdateForStartOfFile(tr.opts.leftFileName)
// Set up channels for the record-reader.
inputChannel := make(chan *types.RecordAndContext, 10)
readerChannel := make(chan *types.RecordAndContext, 10)
errorChannel := make(chan error, 1)
downstreamDoneChannel := make(chan bool, 1)
// Start the record reader.
// TODO: prepipe
leftFileNameArray := [1]string{tr.opts.leftFileName}
go recordReader.Read(leftFileNameArray[:], *initialContext, inputChannel, errorChannel, downstreamDoneChannel)
go recordReader.Read(leftFileNameArray[:], *initialContext, readerChannel, errorChannel, downstreamDoneChannel)
// Ingest parsed records and bucket them by their join-field values. E.g.
// if the join-field is "id" then put all records with id=1 in one bucket,
@ -493,7 +493,7 @@ func (tr *TransformerJoin) ingestLeftFile() {
fmt.Fprintln(os.Stderr, "mlr", ": ", err)
os.Exit(1)
case leftrecAndContext := <-inputChannel:
case leftrecAndContext := <-readerChannel:
if leftrecAndContext.EndOfStream {
done = true
break // breaks the switch, not the for, in Golang

View file

@ -123,10 +123,10 @@ import (
// Data stored in this class
type JoinBucketKeeper struct {
// For streaming through the left-side file
recordReader input.IRecordReader
context *types.Context
inputChannel <-chan *types.RecordAndContext
errorChannel chan error
recordReader input.IRecordReader
context *types.Context
readerChannel <-chan *types.RecordAndContext
errorChannel chan error
// TODO: merge with leof flag
recordReaderDone bool
@ -178,18 +178,18 @@ func NewJoinBucketKeeper(
initialContext.UpdateForStartOfFile(leftFileName)
// Set up channels for the record-reader
inputChannel := make(chan *types.RecordAndContext, 10)
readerChannel := make(chan *types.RecordAndContext, 10)
errorChannel := make(chan error, 1)
downstreamDoneChannel := make(chan bool, 1)
// Start the record-reader in its own goroutine.
leftFileNameArray := [1]string{leftFileName}
go recordReader.Read(leftFileNameArray[:], *initialContext, inputChannel, errorChannel, downstreamDoneChannel)
go recordReader.Read(leftFileNameArray[:], *initialContext, readerChannel, errorChannel, downstreamDoneChannel)
keeper := &JoinBucketKeeper{
recordReader: recordReader,
context: initialContext,
inputChannel: inputChannel,
readerChannel: readerChannel,
errorChannel: errorChannel,
recordReaderDone: false,
@ -570,7 +570,7 @@ func (keeper *JoinBucketKeeper) readRecord() *types.RecordAndContext {
case err := <-keeper.errorChannel:
fmt.Fprintln(os.Stderr, "mlr", ": ", err)
os.Exit(1)
case leftrecAndContext := <-keeper.inputChannel:
case leftrecAndContext := <-keeper.readerChannel:
if leftrecAndContext.EndOfStream { // end-of-stream marker
keeper.recordReaderDone = true
return nil

View file

@ -1,8 +1,73 @@
================================================================
PUNCHDOWN LIST
* perf wup @ rgp.md
* note somewhere why NewEndOfStreamMarker instead of channel close -- readers carry final context
* perf:
o go tool pprof -http=:8080 cpu.pprof
x close(chan) as EOS throughout
- note somewhere why NewEndOfStreamMarker instead of channel close -- readers carry final context
o notes
- https://github.com/golang/go/issues/18237
- https://github.com/google/gvisor/issues/1942
- https://github.com/golang/go/issues/21827
- https://news.ycombinator.com/item?id=19841580
- https://news.ycombinator.com/item?id=19839081
- https://about.sourcegraph.com/go/gophercon-2019-death-by-three-thousand-timers-streaming-video-on-demand-for-cable-tv/
- https://stackoverflow.com/questions/41353508/how-to-determine-which-side-of-go-channel-is-waiting
case p <- result:
case <-time.After(500 * time.Millisecond):
...
- https://github.com/dgryski/go-perfbook
- https://dave.cheney.net/high-performance-go-workshop/dotgo-paris.html
o find:
- perf x platform, input-file format, (output-file format), etc
- try select-check & stats-keep -- ?
- try select-check & list-build -- ?
- make a 2nd/3rd cmd main w/ simple model & tweak that
o dkvp-reader factor-out ...
o mods:
? outputChannel -> *list.List at each transformer -- ? profile first
- un-legacy fflush flag :(
> conditional on isatty stdout
> new fflushWasSpecified
> downcase OFSWasSpecified, HaveRandSeed, et al.
- do and maybe keep? record-reader return (raclist, err) & refactor repl accordingly
> needs factor for-loop to stateful so maybe not
- transformers w/ reclist: *maybe*, but idchan/odchan too ... invest time after some refactor decisions made
- fix record/line sequencing regressions
- tail -f handling
> batch-size 1 on stdin, for repl at least?
> adaptive is-blocking detection -- make sure it's not over-sensitive
o goals:
- keep goroutines -- including per-transformer -- for parallelism
- look for flex ideas on how to structure that parallelism
o ideas:
n different concurrency abstraction?
n no-select -- just <- on a single channel?
> tried in mprof/main.go: reader & writer both write/read only one channel.
> still unacceptable runtime overhead :(
K string-buffer still too many write-syscalls: use bufio.Writer
w TODO: flush() points
d idchan/odchan can maybe be marked in chain-transformers as local array
d do other stuff first
d replace errchan w/ retval only?
d do other stuff first
d rac-batching; ensure env-var access
k also prototyped for no-transformer case
> how to structure this for the transformers?
> do that after hiding
- mprof split-reader getenv something
- hide app-level scan/format under sys-level read/write: also batched
easier/line-oriented:
record_reader_csvlite.go
record_reader_dkvp.go
record_reader_nidx.go
record_reader_xtab.go
harder b/c using encoding APIs:
record_reader_csv.go
record_reader_json.go
d how to handle tail -f and repl
> try unconditional batching and hiding *first* to see how much best-case perf to be had
? lazy type-infer?? needs careful use of accessor-mutators in place of mv.type etc
* blockers:
- keep checking issues
@ -15,8 +80,16 @@ PUNCHDOWN LIST
- #756 octal
- 0b1011 olh/webdoc
- rtd links somewhere
- rd -> dbr
- more why-6 notes -- ?
- beta-link & hnp
-> beta-release page
- integrate:
> https://www.libhunt.com/r/miller
> https://repology.org/project/miller/information
* shebang
- olh -s & --norc; --version & --bare-version; --
- maybe multiple -s -- ? []args would continue growing during parse ...
- try mlr -s on windows; maybe consolation for shebang-bash -- ?
- doc re where to put --norc