Documentation ¶
Index ¶
- Variables
- func AddResponseToDB(ticketID Ticket, response []byte, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec)
- func CliCtxFromKafkaMsg(msg KafkaMsg, cliCtx context.CLIContext) context.CLIContext
- func GetResponseFromDB(ticketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec) []byte
- func KafkaAdmin(kafkaPorts []string) sarama.ClusterAdmin
- func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error
- func NewConsumer(kafkaPorts []string) sarama.Consumer
- func NewProducer(kafkaPorts []string) sarama.SyncProducer
- func PartitionConsumers(consumer sarama.Consumer, topic string) sarama.PartitionConsumer
- func QueryDB(cdc *codec.Codec, r *mux.Router, kafkaDB *dbm.GoLevelDB) http.HandlerFunc
- func RegisterCodec(cdc *codec.Codec)
- func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte
- func SetTicketIDtoDB(ticketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec, msg []byte)
- func TopicsInit(admin sarama.ClusterAdmin, topic string)
- type KafkaCliCtx
- type KafkaMsg
- type KafkaState
- type Ticket
- type TicketIDResponse
Constants ¶
This section is empty.
Variables ¶
var DefaultCLIHome = os.ExpandEnv("$HOME/.kafka")
DefaultCLIHome : the default home directory for the CLI, $HOME/.kafka with $HOME expanded from the environment.
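As a small illustration of how the DefaultCLIHome expression resolves, the sketch below applies os.ExpandEnv to the same string. The helper cliHomeFor and the path /home/alice are hypothetical and only make the example deterministic. In the package itself the variable is evaluated once, when the package is initialised.

```go
package main

import (
	"fmt"
	"os"
)

// cliHomeFor is a hypothetical helper: it sets HOME and then evaluates the
// same expression used for DefaultCLIHome.
func cliHomeFor(home string) string {
	os.Setenv("HOME", home)
	return os.ExpandEnv("$HOME/.kafka")
}

func main() {
	fmt.Println(cliHomeFor("/home/alice")) // prints "/home/alice/.kafka"
}
```

os.ExpandEnv replaces an undefined variable with the empty string, so with HOME unset the expression yields "/.kafka".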
var ModuleCdc *codec.Codec
ModuleCdc : the module-level codec.
var SleepRoutine = time.Duration(2500000000)
SleepRoutine : the time over which Kafka messages are taken in (2.5 seconds, given in nanoseconds).
var SleepTimer = time.Duration(1000000000)
SleepTimer : the time over which Kafka messages are taken in (1 second, given in nanoseconds).
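The two durations are written as raw nanosecond counts, which are easy to misread. The sketch below copies the two values into local variables (sleepRoutine and sleepTimer, named only for this example) and checks them against unit-based expressions.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the two values as they are declared in the package.
var (
	sleepRoutine = time.Duration(2500000000)
	sleepTimer   = time.Duration(1000000000)
)

// sameAsReadable reports whether the raw nanosecond counts equal the
// more readable unit-based expressions.
func sameAsReadable() bool {
	return sleepRoutine == 2500*time.Millisecond && sleepTimer == time.Second
}

func main() {
	fmt.Println(sleepRoutine, sleepTimer) // prints "2.5s 1s"
	fmt.Println(sameAsReadable())         // prints "true"
}
```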
var TicketIDAtomicCounter int64
TicketIDAtomicCounter is a counter that is incremented each time a function is called.
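A counter of this kind is normally updated through sync/atomic so that concurrent callers never lose an increment. The following is a minimal sketch of that pattern, not the package's own code: counter, next and countConcurrently are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter stands in for TicketIDAtomicCounter.
var counter int64

// next increments the counter atomically and returns the new value.
func next() int64 {
	return atomic.AddInt64(&counter, 1)
}

// countConcurrently resets the counter, calls next from n goroutines and
// returns the final value once all of them have finished.
func countConcurrently(n int) int64 {
	atomic.StoreInt64(&counter, 0)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			next()
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(countConcurrently(100)) // prints "100"
}
```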
var Topics = []string{
	"Topic",
}
Topics : the list of Kafka topics.
Functions ¶
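func AddResponseToDB ¶
func AddResponseToDB(ticketID Ticket, response []byte, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec)
AddResponseToDB : updates the response stored in the DB for a ticket ID.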
func CliCtxFromKafkaMsg ¶
func CliCtxFromKafkaMsg(msg KafkaMsg, cliCtx context.CLIContext) context.CLIContext
CliCtxFromKafkaMsg : restores the tx context and CLI context from a KafkaMsg so that the message can be consumed.
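func GetResponseFromDB ¶
func GetResponseFromDB(ticketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec) []byte
GetResponseFromDB : returns the response stored in the DB for a ticket ID.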
func KafkaAdmin ¶
func KafkaAdmin(kafkaPorts []string) sarama.ClusterAdmin
KafkaAdmin : returns the cluster admin used to create topics.
func KafkaProducerDeliverMessage ¶
func KafkaProducerDeliverMessage(msg KafkaMsg, topic string, producer sarama.SyncProducer, cdc *codec.Codec) error
KafkaProducerDeliverMessage : delivers a message to the given Kafka topic.
func NewConsumer ¶
func NewConsumer(kafkaPorts []string) sarama.Consumer
NewConsumer : returns the consumer from which child (partition) consumers are created to consume topics.
func NewProducer ¶
func NewProducer(kafkaPorts []string) sarama.SyncProducer
NewProducer returns a synchronous producer for sending messages to Kafka.
func PartitionConsumers ¶
func PartitionConsumers(consumer sarama.Consumer, topic string) sarama.PartitionConsumer
PartitionConsumers : returns a child (partition) consumer for the given topic.
func SendToKafka ¶
func SendToKafka(msg KafkaMsg, kafkaState KafkaState, cdc *codec.Codec) []byte
SendToKafka : handles sending a message to Kafka.
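func SetTicketIDtoDB ¶
func SetTicketIDtoDB(ticketID Ticket, kafkaDB *dbm.GoLevelDB, cdc *codec.Codec, msg []byte)
SetTicketIDtoDB : initialises the ticket ID in the database.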
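SetTicketIDtoDB, AddResponseToDB and GetResponseFromDB together suggest a ticket lifecycle: register a ticket, record its response later, and let a caller read whatever is stored. The sketch below models that flow under stated assumptions: memDB is a hypothetical in-memory stand-in for *dbm.GoLevelDB, the codec is left out, and the "pending" and "done" values are invented for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Ticket mirrors the package's Ticket type.
type Ticket string

// memDB is a hypothetical in-memory stand-in for *dbm.GoLevelDB.
type memDB struct {
	mu   sync.Mutex
	data map[Ticket][]byte
}

func newMemDB() *memDB {
	return &memDB{data: make(map[Ticket][]byte)}
}

// set stores value under id, overwriting any earlier value. Both the initial
// write (SetTicketIDtoDB) and the later response (AddResponseToDB) are
// modelled as this one operation.
func (db *memDB) set(id Ticket, value []byte) {
	db.mu.Lock()
	defer db.mu.Unlock()
	db.data[id] = value
}

// get returns the value stored under id, or nil if the ticket is unknown.
func (db *memDB) get(id Ticket) []byte {
	db.mu.Lock()
	defer db.mu.Unlock()
	return db.data[id]
}

// lifecycle walks one ticket through the three steps and returns what a
// caller would read before and after the response is recorded.
func lifecycle() (before, after string) {
	db := newMemDB()
	id := Ticket("ticket-1")
	db.set(id, []byte("pending")) // like SetTicketIDtoDB
	before = string(db.get(id))   // like GetResponseFromDB
	db.set(id, []byte("done"))    // like AddResponseToDB
	after = string(db.get(id))
	return before, after
}

func main() {
	before, after := lifecycle()
	fmt.Println(before, after) // prints "pending done"
}
```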
func TopicsInit ¶
func TopicsInit(admin sarama.ClusterAdmin, topic string)
TopicsInit : initialises the given topic using the cluster admin.
Types ¶
type KafkaCliCtx ¶
type KafkaCliCtx struct {
	OutputFormat  string
	Height        int64
	NodeURI       string
	From          string
	TrustNode     bool
	UseLedger     bool
	BroadcastMode string
	VerifierHome  string
	Simulate      bool
	GenerateOnly  bool
	FromAddress   sdk.AccAddress
	FromName      string
	Indent        bool
	SkipConfirm   bool
}
KafkaCliCtx : the client context fields without the codec.
type KafkaMsg ¶
type KafkaMsg struct {
	Msg         sdk.Msg      `json:"msg"`
	TicketID    Ticket       `json:"ticketID"`
	BaseRequest rest.BaseReq `json:"base_req"`
	KafkaCli    KafkaCliCtx  `json:"kafkaCliCtx"`
	Password    string       `json:"password"`
	Mode        string       `json:"mode"`
}
KafkaMsg : a message structure that can be stored in Kafka queues.
func KafkaTopicConsumer ¶
func KafkaTopicConsumer(topic string, consumers map[string]sarama.PartitionConsumer, cdc *codec.Codec) KafkaMsg
KafkaTopicConsumer : takes the partition consumer for a topic and consumes one message at a time.
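Consuming one message at a time maps naturally onto a single channel receive. A sarama.PartitionConsumer delivers messages through its Messages() channel. To stay self-contained, the sketch below uses a plain channel of byte slices instead, and consumeOne is a hypothetical helper rather than the package's implementation.

```go
package main

import "fmt"

// consumeOne receives exactly one message from the channel. It blocks until
// a message is available; ok is false once the channel is closed and drained.
func consumeOne(messages <-chan []byte) (msg []byte, ok bool) {
	msg, ok = <-messages
	return msg, ok
}

func main() {
	// A buffered channel stands in for the partition consumer's message stream.
	messages := make(chan []byte, 2)
	messages <- []byte("first")
	messages <- []byte("second")
	close(messages)

	for {
		msg, ok := consumeOne(messages)
		if !ok {
			break
		}
		fmt.Println(string(msg))
	}
}
```

Each call hands back a single message, so the caller decides when to pull the next one.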
type KafkaState ¶
type KafkaState struct {
	KafkaDB   *dbm.GoLevelDB
	Admin     sarama.ClusterAdmin
	Consumer  sarama.Consumer
	Consumers map[string]sarama.PartitionConsumer
	Producer  sarama.SyncProducer
	Topics    []string
}
KafkaState : a struct holding the Kafka state: the database, admin, consumers, producer and topics.
func NewKafkaState ¶
func NewKafkaState(kafkaPorts []string) KafkaState
NewKafkaState : returns a KafkaState for the given Kafka ports.
type Ticket ¶
type Ticket string
Ticket : a type whose underlying type is string.
func TicketIDGenerator ¶
TicketIDGenerator generates a random, unique ticket ID and returns it as a string.
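The page does not show how TicketIDGenerator builds its IDs. One plausible approach, sketched below purely as an assumption, combines an atomic counter (for uniqueness within a process) with random bytes (for unpredictability). The name newTicket and the 20-character layout are hypothetical. The length was chosen only to match the length(20) validation tag on TicketIDResponse.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"sync/atomic"
)

// Ticket mirrors the package's Ticket type.
type Ticket string

// ticketCounter plays the role of TicketIDAtomicCounter in this sketch.
var ticketCounter int64

// newTicket returns a 20-character ID: 8 hex digits taken from the counter
// followed by 12 hex digits of random data.
func newTicket() (Ticket, error) {
	n := atomic.AddInt64(&ticketCounter, 1)
	buf := make([]byte, 6)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return Ticket(fmt.Sprintf("%08x", uint32(n)) + hex.EncodeToString(buf)), nil
}

func main() {
	a, _ := newTicket()
	b, _ := newTicket()
	fmt.Println(len(a), a != b) // prints "20 true"
}
```

The counter prefix makes two IDs from the same process differ until the 32-bit prefix wraps around, whatever the random suffix is.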
type TicketIDResponse ¶
type TicketIDResponse struct {
	TicketID Ticket `json:"TicketID" valid:"required~TicketID is mandatory,length(20)~RelayerAddress length should be 20" `
}
TicketIDResponse : the JSON structure used to send the TicketID to the user.
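To show the shape of the JSON a user would receive, the sketch below redeclares the two types locally and marshals one value with the standard encoding/json package. This is an assumption: the package may encode through its codec instead. The valid tag is omitted because it does not affect marshalling, and the ticket value is made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types, reduced to the json tag.
type Ticket string

type TicketIDResponse struct {
	TicketID Ticket `json:"TicketID"`
}

// encode marshals a response for the given ticket and returns it as a string.
func encode(id Ticket) (string, error) {
	out, err := json.Marshal(TicketIDResponse{TicketID: id})
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	s, err := encode("ticket0123456789abcd")
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // prints {"TicketID":"ticket0123456789abcd"}
}
```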