cmd

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2017 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const VERSION = "0.1.2"

VERSION of this package

Variables

View Source
var ErrCancelled = fmt.Errorf("cancelled")

ErrCancelled means command being cancelled

View Source
var ErrTimeout = fmt.Errorf("time out")

ErrTimeout means command timeout

View Source
var OutputChunkSize = 16384 // 16K

OutputChunkSize is buffer size of output string chunk sent to channel, default 16K.

View Source
var RootCmd = &cobra.Command{
	Use:   "rush",
	Short: "parallelly execute shell commands",
	Long: fmt.Sprintf(`
rush -- parallelly execute shell commands

Version: %s

Author: Wei Shen <shenwei356@gmail.com>

Source code: https://github.com/shenwei356/rush

`, VERSION),
	Run: func(cmd *cobra.Command, args []string) {
		var err error
		config := getConfigs(cmd)

		if config.Version {
			checkVersion()
			return
		}

		j := config.Jobs + 1
		if j > runtime.NumCPU() {
			j = runtime.NumCPU()
		}
		runtime.GOMAXPROCS(j)

		config.reFieldDelimiter, err = regexp.Compile(config.FieldDelimiter)
		checkError(errors.Wrap(err, "compile field delimiter"))

		command0 := strings.Join(args, " ")
		if command0 == "" {
			command0 = `echo "{}"`
		}

		// out file handler
		var outfh *bufio.Writer
		if isStdin(config.OutFile) {
			outfh = bufio.NewWriter(os.Stdout)
		} else {
			var fh *os.File
			fh, err = os.Create(config.OutFile)
			checkError(err)
			defer fh.Close()

			outfh = bufio.NewWriter(fh)
		}
		defer outfh.Flush()

		if config.DryRun {
			config.Verbose = false
		}

		var succCmds = make(map[string]struct{})
		var bfhSuccCmds *bufio.Writer
		if config.Continue {
			var existed bool
			existed, err = exists(config.SuccCmdFile)
			checkError(err)
			if existed {
				succCmds = readSuccCmds(config.SuccCmdFile)
			}

			var fhSuccCmds *os.File
			fhSuccCmds, err = os.Create(config.SuccCmdFile)
			checkError(err)
			defer fhSuccCmds.Close()

			bfhSuccCmds = bufio.NewWriter(fhSuccCmds)
			defer bfhSuccCmds.Flush()
		}

		opts := &Options{
			DryRun:              config.DryRun,
			Jobs:                config.Jobs,
			KeepOrder:           config.KeepOrder,
			Retries:             config.Retries,
			RetryInterval:       time.Duration(config.RetryInterval) * time.Second,
			Timeout:             time.Duration(config.Timeout) * time.Second,
			StopOnErr:           config.StopOnErr,
			Verbose:             config.Verbose,
			RecordSuccessfulCmd: config.Continue,
		}

		if len(config.Infiles) == 0 {
			config.Infiles = append(config.Infiles, "-")
		}

		recordDelimiter := []byte(config.RecordDelimiter)
		split := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
			if atEOF && len(data) == 0 {
				return 0, nil, nil
			}
			i := bytes.Index(data, recordDelimiter)
			if i >= 0 {
				return i + len(recordDelimiter), data[0:i], nil
			}
			if atEOF {
				return len(data), data, nil
			}
			return 0, nil, nil
		}

		cancel := make(chan struct{})

		donePreprocessFiles := make(chan int)

		chCmdStr := make(chan string, config.Jobs)

		go func() {
			n := config.NRecords
			var id uint64 = 1

		READFILES:
			for _, file := range config.Infiles {
				// input file handler
				var infh *os.File
				if isStdin(file) {
					infh = os.Stdin
				} else {
					infh, err = os.Open(file)
					checkError(err)
					defer infh.Close()
				}

				scanner := bufio.NewScanner(infh)

				scanner.Buffer(make([]byte, 0, 16384), 2147483647)
				scanner.Split(split)

				var record string
				var records []string
				records = make([]string, 0, n)
				var cmdStr string
				var runned bool
				for scanner.Scan() {
					select {
					case <-cancel:
						if config.Verbose {
							log.Warningf("cancel reading file: %s", file)
						}
						break READFILES
					default:
					}

					record = scanner.Text()
					if record == "" {
						continue
					}
					records = append(records, record)

					if len(records) == n {
						cmdStr = fillCommand(config, command0, Chunk{ID: id, Data: records})
						if config.Continue {
							if _, runned = succCmds[cmdStr]; runned {
								log.Infof("ignore cmd #%d: %s", id, cmdStr)
								bfhSuccCmds.WriteString(cmdStr + endMarkOfCMD)
							} else {
								chCmdStr <- cmdStr
							}
						} else {
							chCmdStr <- cmdStr
						}

						id++
						records = make([]string, 0, n)
					}
				}
				if len(records) > 0 {
					cmdStr = fillCommand(config, command0, Chunk{ID: id, Data: records})
					if config.Continue {
						if _, runned = succCmds[cmdStr]; runned {
							log.Infof("ignore cmd #%d: %s", id, cmdStr)
							bfhSuccCmds.WriteString(cmdStr + endMarkOfCMD)
						} else {
							chCmdStr <- cmdStr
						}
					} else {
						chCmdStr <- cmdStr
					}
				}
				if config.Continue {
					bfhSuccCmds.Flush()
				}
				checkError(errors.Wrap(scanner.Err(), "read input data"))
			}

			close(chCmdStr)

			donePreprocessFiles <- 1
		}()

		chOutput, chSuccessfulCmd, doneSendOutput := Run4Output(opts, cancel, chCmdStr)

		doneOutput := make(chan int)
		go func() {
			last := time.Now().Add(2 * time.Second)
			for c := range chOutput {
				outfh.WriteString(c)
				if t := time.Now(); t.After(last) {
					outfh.Flush()
					last = t.Add(2 * time.Second)
				}
			}
			outfh.Flush()

			doneOutput <- 1
		}()

		doneSaveSuccCmd := make(chan int)
		if config.Continue {
			go func() {
				for c := range chSuccessfulCmd {
					bfhSuccCmds.WriteString(c + endMarkOfCMD)
					bfhSuccCmds.Flush()
				}
				doneSaveSuccCmd <- 1
			}()
		}

		chExitSignalMonitor := make(chan struct{})
		signalChan := make(chan os.Signal, 1)
		cleanupDone := make(chan int)
		signal.Notify(signalChan, os.Interrupt)
		go func() {
			select {
			case <-signalChan:
				log.Criticalf("received an interrupt, stopping unfinished commands...")
				select {
				case <-cancel:
				default:
					close(cancel)
				}
				cleanupDone <- 1
				return
			case <-chExitSignalMonitor:
				cleanupDone <- 1
				return
			}
		}()

		<-donePreprocessFiles
		<-doneSendOutput
		<-doneOutput
		if config.Continue {
			<-doneSaveSuccCmd
		}

		close(chExitSignalMonitor)
		<-cleanupDone
	},
}

RootCmd represents the base command when called without any subcommands

View Source
var TmpOutputDataBuffer = 1048576 // 1M

TmpOutputDataBuffer is buffer size for output of a command before saving to tmpfile, default 1M.

View Source
var Verbose bool

Verbose decide whether print extra information

Functions

func Execute

func Execute()

Execute adds all child commands to the root command sets flags appropriately. This is called by main.main(). It only needs to happen once to the rootCmd.

func Run

func Run(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan *Command, chan string, chan int)

Run runs commands in parallel from channel chCmdStr, and returns a Command channel, and a done channel to ensure safe exit.

func Run4Output

func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan string, chan string, chan int)

Run4Output runs commands in parallel from channel chCmdStr, and returns an output text channel, and a done channel to ensure safe exit.

Types

type Chunk

type Chunk struct {
	ID   uint64
	Data []string
}

Chunk contains input data records sent to a command

type Command

type Command struct {
	ID  uint64 // ID
	Cmd string // command

	Cancel  chan struct{} // channel for close
	Timeout time.Duration // time out

	Ch chan string // channel for stdout

	Err      error         // Error
	Duration time.Duration // runtime
	// contains filtered or unexported fields
}

Command is the Command struct

func NewCommand

func NewCommand(id uint64, cmdStr string, cancel chan struct{}, timeout time.Duration) *Command

NewCommand create a Command

func (*Command) Cleanup

func (c *Command) Cleanup() error

Cleanup removes tmpfile

func (*Command) ExitCode added in v0.0.3

func (c *Command) ExitCode() int

ExitCode returns the exit code associated with a given error

func (*Command) Run

func (c *Command) Run() error

Run runs a command and send output to command.Ch in background.

func (*Command) String added in v0.0.3

func (c *Command) String() string

type Config

type Config struct {
	Verbose bool
	Version bool

	Jobs    int
	OutFile string

	Infiles []string

	RecordDelimiter      string
	RecordsJoinSeparator string
	NRecords             int
	FieldDelimiter       string

	Retries       int
	RetryInterval int
	Timeout       int

	KeepOrder bool
	StopOnErr bool
	DryRun    bool

	Continue    bool
	SuccCmdFile string

	AssignMap map[string]string
	Trim      string
	// contains filtered or unexported fields
}

Config is the struct containing all global flags

type Options

type Options struct {
	DryRun              bool          // just print command
	Jobs                int           // max jobs number
	KeepOrder           bool          // keep output order
	Retries             int           // max retry chances
	RetryInterval       time.Duration // retry interval
	Timeout             time.Duration // timeout
	StopOnErr           bool          // stop on any error
	RecordSuccessfulCmd bool          // send successful command to channel
	Verbose             bool
}

Options contains the options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL