Documentation ¶
Index ¶
- Variables
- func AddResponseToDB(TicketID Ticket, response []byte, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec)
- func CliCtxFromKafkaMsg(kafkaMsg KafkaMsg, cliContext context.CLIContext) context.CLIContext
- func GetResponseFromDB(TicketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec) []byte
- func KafkaAdmin(kafkaPorts []string) sarama.ClusterAdmin
- func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error
- func NewConsumer(kafkaPorts []string) sarama.Consumer
- func NewProducer(kafkaPorts []string) sarama.SyncProducer
- func PartitionConsumers(consumer sarama.Consumer, topic string) sarama.PartitionConsumer
- func QueryDB(cdc *codec.Codec, kafkaDB *dbm.GoLevelDB) http.HandlerFunc
- func RegisterCodec(cdc *codec.Codec)
- func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte
- func SetTicketIDtoDB(TicketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec, msg []byte)
- func TopicsInit(admin sarama.ClusterAdmin, topic string)
- type KafkaCliCtx
- type KafkaMsg
- type KafkaState
- type Ticket
- type TicketIDResponse
Constants ¶
This section is empty.
Variables ¶
var DefaultCLIHome = os.ExpandEnv("$HOME/.kafka")
DefaultCLIHome : is the default home directory for the CLI ($HOME/.kafka, expanded from the environment)
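As a rough illustration of how the `$HOME/.kafka` template is expanded, the sketch below uses `os.Expand` with an explicit mapping so the result does not depend on the real environment. The helper `cliHomeFor` is hypothetical and is not part of this package.

```go
package main

import (
	"fmt"
	"os"
)

// cliHomeFor is a hypothetical helper: it expands the same template as
// DefaultCLIHome, but takes HOME as an argument instead of reading the
// process environment.
func cliHomeFor(home string) string {
	return os.Expand("$HOME/.kafka", func(name string) string {
		if name == "HOME" {
			return home
		}
		return "" // os.ExpandEnv likewise substitutes "" for unset variables
	})
}

func main() {
	fmt.Println(cliHomeFor("/home/alice")) // /home/alice/.kafka
	fmt.Println(cliHomeFor(""))            // /.kafka
	// The form used by the package reads HOME from the environment.
	fmt.Println(os.ExpandEnv("$HOME/.kafka"))
}
```

Note that an unset HOME yields `/.kafka`, so callers may want to check the environment before relying on the default.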
var ModuleCdc *codec.Codec
ModuleCdc : is the module codec
var SleepRoutine = time.Duration(2500000000)
SleepRoutine : the time (2.5 seconds, expressed in nanoseconds) over which the Kafka messages are to be taken in
var SleepTimer = time.Duration(1000000000)
SleepTimer : the time (1 second, expressed in nanoseconds) over which the Kafka messages are to be taken in
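Both values above are raw nanosecond counts. The following sketch, which copies the two variables for illustration only, shows the equivalent and more readable `time` constants.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the package variables documented above.
var (
	SleepRoutine = time.Duration(2500000000)
	SleepTimer   = time.Duration(1000000000)
)

func main() {
	// time.Duration counts nanoseconds, so these comparisons hold.
	fmt.Println(SleepRoutine == 2500*time.Millisecond) // true
	fmt.Println(SleepTimer == time.Second)             // true
	fmt.Println(SleepRoutine, SleepTimer)              // 2.5s 1s
}
```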
var TicketIDAtomicCounter int64
TicketIDAtomicCounter : is a counter that is incremented each time a function is called
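A minimal sketch of how an `int64` counter like this is typically incremented safely from several goroutines with `sync/atomic`. The helper `nextCount` is hypothetical; the documentation does not say which function in this package updates the counter.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// TicketIDAtomicCounter mirrors the package variable documented above.
var TicketIDAtomicCounter int64

// nextCount is a hypothetical helper that atomically increments the
// counter and returns the new value.
func nextCount() int64 {
	return atomic.AddInt64(&TicketIDAtomicCounter, 1)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			nextCount()
		}()
	}
	wg.Wait()
	// No increments are lost, even under concurrent access.
	fmt.Println(atomic.LoadInt64(&TicketIDAtomicCounter)) // 100
}
```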
var Topics = []string{
    "Topic",
}
Topics : is the list of topics
Functions ¶
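func AddResponseToDB ¶
func AddResponseToDB(TicketID Ticket, response []byte, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec)
AddResponseToDB : updates the response stored in the DB for the given ticket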
func CliCtxFromKafkaMsg ¶
func CliCtxFromKafkaMsg(kafkaMsg KafkaMsg, cliContext context.CLIContext) context.CLIContext
CliCtxFromKafkaMsg : rebuilds the transaction and CLI contexts from a Kafka message so that the message can be consumed
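func GetResponseFromDB ¶
func GetResponseFromDB(TicketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec) []byte
GetResponseFromDB : returns the response stored in the DB for the given ticket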
func KafkaAdmin ¶
func KafkaAdmin(kafkaPorts []string) sarama.ClusterAdmin
KafkaAdmin : returns the cluster admin used to create topics
func KafkaProducerDeliverMessage ¶
func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error
KafkaProducerDeliverMessage : delivers messages to Kafka
func NewConsumer ¶
func NewConsumer(kafkaPorts []string) sarama.Consumer
NewConsumer : creates the consumer that is needed to create child consumers, which in turn consume topics
func NewProducer ¶
func NewProducer(kafkaPorts []string) sarama.SyncProducer
NewProducer : creates a producer to send messages to Kafka
func PartitionConsumers ¶
func PartitionConsumers(consumer sarama.Consumer, topic string) sarama.PartitionConsumer
PartitionConsumers : returns a child (partition) consumer for the given topic
func SendToKafka ¶
func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte
SendToKafka : handles sending a message to Kafka
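func SetTicketIDtoDB ¶
func SetTicketIDtoDB(TicketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec, msg []byte)
SetTicketIDtoDB : initialises the ticketID entry in the database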
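The three DB helpers (SetTicketIDtoDB, AddResponseToDB, GetResponseFromDB) suggest a ticket lifecycle: create an entry, update it with a response, then read it back. The sketch below illustrates that flow under loud assumptions: `memDB` is a hypothetical in-memory stand-in for `*dbm.GoLevelDB`, the codec is omitted, and the lowercase helpers and placeholder payloads are not the package's actual implementation.

```go
package main

import "fmt"

// Ticket mirrors the package type documented in this section.
type Ticket string

// memDB is a hypothetical in-memory stand-in for *dbm.GoLevelDB.
type memDB map[Ticket][]byte

// setTicketID creates the entry for a ticket with an initial message.
func setTicketID(id Ticket, db memDB, msg []byte) { db[id] = msg }

// addResponse overwrites the entry with the final response.
func addResponse(id Ticket, response []byte, db memDB) { db[id] = response }

// getResponse returns whatever is currently stored for the ticket.
func getResponse(id Ticket, db memDB) []byte { return db[id] }

func main() {
	db := memDB{}
	id := Ticket("TICKET12345678901234")

	setTicketID(id, db, []byte("pending")) // placeholder payload (assumption)
	fmt.Println(string(getResponse(id, db)))

	addResponse(id, []byte(`{"ok":true}`), db) // placeholder response (assumption)
	fmt.Println(string(getResponse(id, db)))
}
```

A client holding the ticket can therefore poll for the response until the placeholder is replaced.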
func TopicsInit ¶
func TopicsInit(admin sarama.ClusterAdmin, topic string)
TopicsInit : is needed to initialise topics
Types ¶
type KafkaCliCtx ¶
type KafkaCliCtx struct {
    OutputFormat  string
    ChainID       string
    Height        int64
    HomeDir       string
    NodeURI       string
    From          string
    TrustNode     bool
    UseLedger     bool
    BroadcastMode string
    Simulate      bool
    GenerateOnly  bool
    FromAddress   sdk.AccAddress
    FromName      string
    Offline       bool
    Indent        bool
    SkipConfirm   bool
}
KafkaCliCtx : is the client context without the codec
type KafkaMsg ¶
type KafkaMsg struct {
    Msg         sdk.Msg      `json:"msg"`
    TicketID    Ticket       `json:"ticketID"`
    BaseRequest rest.BaseReq `json:"base_req"`
    KafkaCli    KafkaCliCtx  `json:"kafkaCliCtx"`
}
KafkaMsg : is a message structure that can be stored in Kafka queues
func KafkaTopicConsumer ¶
func KafkaTopicConsumer(topic string, consumers map[string]sarama.PartitionConsumer, cdc *codec.Codec) KafkaMsg
KafkaTopicConsumer : takes a consumer and makes it consume one message from a topic at a time
func NewKafkaMsgFromRest ¶
func NewKafkaMsgFromRest(msg sdk.Msg, ticketID Ticket, baseRequest rest.BaseReq, cliCtx context.CLIContext) KafkaMsg
NewKafkaMsgFromRest : builds a message to send to the Kafka queue
type KafkaState ¶
type KafkaState struct {
    KafkaDB   *dbm.GoLevelDB
    Admin     sarama.ClusterAdmin
    Consumer  sarama.Consumer
    Consumers map[string]sarama.PartitionConsumer
    Producer  sarama.SyncProducer
    Topics    []string
}
KafkaState : is a struct holding the state of Kafka (DB, admin, consumers, producer and topics)
func NewKafkaState ¶
func NewKafkaState(kafkaPorts []string) KafkaState
NewKafkaState : returns a new KafkaState
type Ticket ¶
type Ticket string
Ticket : is a type whose underlying type is string
func TicketIDGenerator ¶
TicketIDGenerator : generates a random, unique ticket ID; the output is a string
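The signature and algorithm of TicketIDGenerator are not shown in this documentation. As a sketch only, one way to produce a random 20-character ID (the length required by the validation tag on TicketIDResponse) is to hex-encode 10 random bytes. The function `newTicketID` below is hypothetical and is not the package's generator.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// Ticket mirrors the package type documented in this section.
type Ticket string

// newTicketID is a hypothetical generator, not the package's
// TicketIDGenerator. It returns 20 hexadecimal characters.
func newTicketID() (Ticket, error) {
	buf := make([]byte, 10) // 10 random bytes encode to 20 hex characters
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return Ticket(hex.EncodeToString(buf)), nil
}

func main() {
	id, err := newTicketID()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id), id)
}
```

Random bytes make collisions very unlikely but do not guarantee uniqueness; combining them with a counter is one option if strict uniqueness is required.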
type TicketIDResponse ¶
type TicketIDResponse struct {
    TicketID Ticket `json:"ticketID" valid:"required~ticketID is mandatory,length(20)~ticketID length should be 20"`
}
TicketIDResponse : is a JSON structure used to send the TicketID to the user
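To show the JSON shape implied by the `json:"ticketID"` tag, the sketch below redeclares the two types locally and marshals a response with the standard library. The `valid` tag is omitted because it is interpreted by a separate validation library, and `encodeTicket` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types, for illustration only.
type Ticket string

type TicketIDResponse struct {
	TicketID Ticket `json:"ticketID"`
}

// encodeTicket is a hypothetical helper that renders the response body
// a user would receive for a given ticket.
func encodeTicket(id Ticket) string {
	b, err := json.Marshal(TicketIDResponse{TicketID: id})
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(encodeTicket("TICKET12345678901234"))
	// {"ticketID":"TICKET12345678901234"}
}
```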