Documentation
¶
Index ¶
Constants ¶
View Source
// VERSION is the version string of this package; RootCmd interpolates it
// into its long help text.
const VERSION = "0.0.2"
VERSION is the version of this package.
Variables ¶
View Source
// OutputChunkSize is the buffer size of each output string chunk sent to the
// channel. It defaults to 16K (16384).
var OutputChunkSize = 16 << 10
OutputChunkSize is the buffer size of each output string chunk sent to the channel; the default is 16K.
View Source
var RootCmd = &cobra.Command{ Use: "rush", Short: "parallelly execute shell commands", Long: fmt.Sprintf(` rush -- parallelly execute shell commands Version: %s Author: Wei Shen <shenwei356@gmail.com> Source code: https://github.com/shenwei356/rush `, VERSION), Run: func(cmd *cobra.Command, args []string) { var err error config := getConfigs(cmd) if config.Version { checkVersion() return } j := config.Jobs + 1 if j > runtime.NumCPU() { j = runtime.NumCPU() } runtime.GOMAXPROCS(j) config.reFieldDelimiter, err = regexp.Compile(config.FieldDelimiter) checkError(errors.Wrap(err, "compile field delimiter")) command0 := strings.Join(args, " ") if command0 == "" { command0 = "echo {}" } cancel := make(chan struct{}) opts := &Options{ DryRun: config.DryRun, Jobs: config.Jobs, KeepOrder: config.KeepOrder, Retries: config.Retries, Timeout: time.Duration(config.Timeout) * time.Second, StopOnErr: config.StopOnErr, Verbose: config.Verbose, } // out file handler var outfh *bufio.Writer if isStdin(config.OutFile) { outfh = bufio.NewWriter(os.Stdout) } else { var fh *os.File fh, err = os.Create(config.OutFile) defer fh.Close() outfh = bufio.NewWriter(fh) checkError(err) } defer outfh.Flush() if len(config.Infiles) == 0 { config.Infiles = append(config.Infiles, "-") } split := func(data []byte, atEOF bool) (advance int, token []byte, err error) { if atEOF && len(data) == 0 { return 0, nil, nil } i := bytes.IndexAny(data, config.RecordDelimiter) if i >= 0 { return i + 1, data[0:i], nil } if atEOF { return len(data), data, nil } return 0, nil, nil } for _, file := range config.Infiles { // input file handler var infh *os.File if isStdin(file) { infh = os.Stdin } else { infh, err = os.Open(file) checkError(err) defer infh.Close() } chCmdStr := make(chan string, config.Jobs) donePreprocess := make(chan int) go func() { scanner := bufio.NewScanner(infh) scanner.Buffer(make([]byte, 0, 16384), 2147483647) scanner.Split(split) n := config.NRecords var id uint64 = 1 var records []string 
records = make([]string, 0, n) for scanner.Scan() { records = append(records, scanner.Text()) if len(records) == n { chCmdStr <- fillCommand(config, command0, Chunk{ID: id, Data: records}) id++ records = make([]string, 0, n) } } if len(records) > 0 { chCmdStr <- fillCommand(config, command0, Chunk{ID: id, Data: records}) id++ } checkError(errors.Wrap(scanner.Err(), "read input data")) close(chCmdStr) donePreprocess <- 1 }() chOutput, doneSendOutput := Run4Output(opts, cancel, chCmdStr) doneOutput := make(chan int) go func() { last := time.Now().Add(2 * time.Second) for c := range chOutput { outfh.WriteString(c) if t := time.Now(); t.After(last) { outfh.Flush() last = t.Add(2 * time.Second) } } outfh.Flush() doneOutput <- 1 }() <-donePreprocess <-doneSendOutput <-doneOutput } }, }
RootCmd represents the base command when called without any subcommands
View Source
// TmpOutputDataBuffer is the buffer size for the output of a command before
// it is saved to a tmpfile. It defaults to 1M (1048576).
var TmpOutputDataBuffer = 1 << 20
TmpOutputDataBuffer is the buffer size for a command's output before it is saved to a temporary file; the default is 1M.
View Source
// Verbose decides whether to print extra information.
var Verbose bool
Verbose decides whether to print extra information.
Functions ¶
func Execute ¶
func Execute()
Execute adds all child commands to the root command and sets flags appropriately. This is called by main.main(). It only needs to happen once to the rootCmd.
Types ¶
type Command ¶
// Command bundles a command string with an ID, a cancellation channel, a
// timeout, an output channel, an error and a duration.
// NOTE(review): the code that fills and reads these fields is not shown on
// this page; the per-field notes below are assumptions to confirm.
type Command struct {
// presumably the ID of the input chunk the command was built from — TODO confirm
ID uint64
// presumably the command string to execute — TODO confirm
Cmd string
// presumably signals cancellation of the command — TODO confirm
Cancel chan struct{}
// presumably the maximum run time allowed for the command — TODO confirm
Timeout time.Duration
Ch chan string // buffer for output
// presumably the error recorded for the command — TODO confirm
Err error
// presumably how long the command took — TODO confirm
Duration time.Duration
// contains filtered or unexported fields
}
Command is the Command struct
func NewCommand ¶
NewCommand creates a Command.
type Config ¶
// Config is the struct containing all global flags. RootCmd obtains it from
// getConfigs and reads the fields as described below.
type Config struct {
// Verbose is forwarded to Options.Verbose.
Verbose bool
// Version makes RootCmd call checkVersion and return without running anything.
Version bool
// Jobs is forwarded to Options.Jobs; it also sizes the command-string
// channel and, plus one, bounds GOMAXPROCS (capped at the CPU count).
Jobs int
// OutFile is the output file, created with os.Create unless isStdin
// reports true for it, in which case os.Stdout is used.
OutFile string
// Infiles lists the input files; when empty, RootCmd appends "-".
Infiles []string
// RecordDelimiter is a set of characters: the first byte matching any of
// them ends a record (it is passed to bytes.IndexAny).
RecordDelimiter string
// NRecords is the number of records grouped into one chunk, i.e. one command.
NRecords int
// FieldDelimiter is compiled as a regular expression by RootCmd.
FieldDelimiter string
// Retries is forwarded to Options.Retries.
Retries int
// NOTE(review): not referenced by RootCmd; presumably the wait between
// retries — unit to be confirmed.
RetryInterval int
// Timeout is a number of seconds; RootCmd converts it to a time.Duration
// for Options.Timeout.
Timeout int
// KeepOrder is forwarded to Options.KeepOrder.
KeepOrder bool
// StopOnErr is forwarded to Options.StopOnErr.
StopOnErr bool
// NOTE(review): not referenced by RootCmd; meaning to be confirmed.
Continue bool
// DryRun is forwarded to Options.DryRun.
DryRun bool
// NOTE(review): not referenced by RootCmd; meaning to be confirmed.
AssignMap map[string]string
// NOTE(review): not referenced by RootCmd; meaning to be confirmed.
Trim string
// contains filtered or unexported fields
}
Config is the struct containing all global flags
Click to show internal directories.
Click to hide internal directories.