Documentation
¶
Index ¶
- Constants
- Variables
- func Execute()
- func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Command, chan string, chan int)
- func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan string, chan string, chan int)
- type Chunk
- type Command
- type Config
- type Options
Constants ¶
const VERSION = "0.1.2"
VERSION of this package
Variables ¶
var ErrCancelled = fmt.Errorf("cancelled")
ErrCancelled means command being cancelled
var ErrTimeout = fmt.Errorf("time out")
ErrTimeout means command timeout
var OutputChunkSize = 16384 // 16K
OutputChunkSize is buffer size of output string chunk sent to channel, default 16K.
var RootCmd = &cobra.Command{ Use: "rush", Short: "parallelly execute shell commands", Long: fmt.Sprintf(` rush -- parallelly execute shell commands Version: %s Author: Wei Shen <shenwei356@gmail.com> Source code: https://github.com/shenwei356/rush `, VERSION), Run: func(cmd *cobra.Command, args []string) { var err error config := getConfigs(cmd) if config.Version { checkVersion() return } j := config.Jobs + 1 if j > runtime.NumCPU() { j = runtime.NumCPU() } runtime.GOMAXPROCS(j) config.reFieldDelimiter, err = regexp.Compile(config.FieldDelimiter) checkError(errors.Wrap(err, "compile field delimiter")) command0 := strings.Join(args, " ") if command0 == "" { command0 = `echo "{}"` } // out file handler var outfh *bufio.Writer if isStdin(config.OutFile) { outfh = bufio.NewWriter(os.Stdout) } else { var fh *os.File fh, err = os.Create(config.OutFile) checkError(err) defer fh.Close() outfh = bufio.NewWriter(fh) } defer outfh.Flush() if config.DryRun { config.Verbose = false } var succCmds = make(map[string]struct{}) var bfhSuccCmds *bufio.Writer if config.Continue { var existed bool existed, err = exists(config.SuccCmdFile) checkError(err) if existed { succCmds = readSuccCmds(config.SuccCmdFile) } var fhSuccCmds *os.File fhSuccCmds, err = os.Create(config.SuccCmdFile) checkError(err) defer fhSuccCmds.Close() bfhSuccCmds = bufio.NewWriter(fhSuccCmds) defer bfhSuccCmds.Flush() } opts := &Options{ DryRun: config.DryRun, Jobs: config.Jobs, KeepOrder: config.KeepOrder, Retries: config.Retries, RetryInterval: time.Duration(config.RetryInterval) * time.Second, Timeout: time.Duration(config.Timeout) * time.Second, StopOnErr: config.StopOnErr, Verbose: config.Verbose, RecordSuccessfulCmd: config.Continue, } if len(config.Infiles) == 0 { config.Infiles = append(config.Infiles, "-") } recordDelimiter := []byte(config.RecordDelimiter) split := func(data []byte, atEOF bool) (advance int, token []byte, err error) { if atEOF && len(data) == 0 { return 0, nil, nil } i := bytes.Index(data, recordDelimiter) if i >= 0 { return i + len(recordDelimiter), data[0:i], nil } if atEOF { return len(data), data, nil } return 0, nil, nil } cancel := make(chan struct{}) donePreprocessFiles := make(chan int) chCmdStr := make(chan string, config.Jobs) go func() { n := config.NRecords var id uint64 = 1 READFILES: for _, file := range config.Infiles { // input file handler var infh *os.File if isStdin(file) { infh = os.Stdin } else { infh, err = os.Open(file) checkError(err) defer infh.Close() } scanner := bufio.NewScanner(infh) scanner.Buffer(make([]byte, 0, 16384), 2147483647) scanner.Split(split) var record string var records []string records = make([]string, 0, n) var cmdStr string var runned bool for scanner.Scan() { select { case <-cancel: if config.Verbose { log.Warningf("cancel reading file: %s", file) } break READFILES default: } record = scanner.Text() if record == "" { continue } records = append(records, record) if len(records) == n { cmdStr = fillCommand(config, command0, Chunk{ID: id, Data: records}) if config.Continue { if _, runned = succCmds[cmdStr]; runned { log.Infof("ignore cmd #%d: %s", id, cmdStr) bfhSuccCmds.WriteString(cmdStr + endMarkOfCMD) } else { chCmdStr <- cmdStr } } else { chCmdStr <- cmdStr } id++ records = make([]string, 0, n) } } if len(records) > 0 { cmdStr = fillCommand(config, command0, Chunk{ID: id, Data: records}) if config.Continue { if _, runned = succCmds[cmdStr]; runned { log.Infof("ignore cmd #%d: %s", id, cmdStr) bfhSuccCmds.WriteString(cmdStr + endMarkOfCMD) } else { chCmdStr <- cmdStr } } else { chCmdStr <- cmdStr } } if config.Continue { bfhSuccCmds.Flush() } checkError(errors.Wrap(scanner.Err(), "read input data")) } close(chCmdStr) donePreprocessFiles <- 1 }() chOutput, chSuccessfulCmd, doneSendOutput := Run4Output(opts, cancel, chCmdStr) doneOutput := make(chan int) go func() { last := time.Now().Add(2 * time.Second) for c := range chOutput { outfh.WriteString(c) if t := time.Now(); t.After(last) { outfh.Flush() last = t.Add(2 * time.Second) } } outfh.Flush() doneOutput <- 1 }() doneSaveSuccCmd := make(chan int) if config.Continue { go func() { for c := range chSuccessfulCmd { bfhSuccCmds.WriteString(c + endMarkOfCMD) bfhSuccCmds.Flush() } doneSaveSuccCmd <- 1 }() } chExitSignalMonitor := make(chan struct{}) signalChan := make(chan os.Signal, 1) cleanupDone := make(chan int) signal.Notify(signalChan, os.Interrupt) go func() { select { case <-signalChan: log.Criticalf("received an interrupt, stopping unfinished commands...") select { case <-cancel: default: close(cancel) } cleanupDone <- 1 return case <-chExitSignalMonitor: cleanupDone <- 1 return } }() <-donePreprocessFiles <-doneSendOutput <-doneOutput if config.Continue { <-doneSaveSuccCmd } close(chExitSignalMonitor) <-cleanupDone }, }
RootCmd represents the base command when called without any subcommands
var TmpOutputDataBuffer = 1048576 // 1M
TmpOutputDataBuffer is buffer size for output of a command before saving to tmpfile, default 1M.
var Verbose bool
Verbose decide whether print extra information
Functions ¶
func Execute ¶
func Execute()
Execute adds all child commands to the root command sets flags appropriately. This is called by main.main(). It only needs to happen once to the rootCmd.
Types ¶
type Command ¶
type Command struct {
ID uint64 // ID
Cmd string // command
Cancel chan struct{} // channel for close
Timeout time.Duration // time out
Ch chan string // channel for stdout
Err error // Error
Duration time.Duration // runtime
// contains filtered or unexported fields
}
Command is the Command struct
func NewCommand ¶
NewCommand create a Command
func (*Command) ExitCode ¶ added in v0.0.3
ExitCode returns the exit code associated with a given error
type Config ¶
type Config struct {
Verbose bool
Version bool
Jobs int
OutFile string
Infiles []string
RecordDelimiter string
RecordsJoinSeparator string
NRecords int
FieldDelimiter string
Retries int
RetryInterval int
Timeout int
KeepOrder bool
StopOnErr bool
DryRun bool
Continue bool
SuccCmdFile string
AssignMap map[string]string
Trim string
// contains filtered or unexported fields
}
Config is the struct containing all global flags
type Options ¶
type Options struct {
DryRun bool // just print command
Jobs int // max jobs number
KeepOrder bool // keep output order
Retries int // max retry chances
RetryInterval time.Duration // retry interval
Timeout time.Duration // timeout
StopOnErr bool // stop on any error
RecordSuccessfulCmd bool // send successful command to channel
Verbose bool
}
Options contains the options