Documentation
¶
Index ¶
- Constants
- Variables
- type KafkaOpreations
- func (ops *KafkaOpreations) Consume(data chan<- []byte, topic string, consumerIndex int)
- func (ops *KafkaOpreations) InitPaymentConsumer(groupId string, router func(string, []byte, *KafkaOpreations)) error
- func (ops *KafkaOpreations) InitProducer() error
- func (ops *KafkaOpreations) Produce(topic string, msg []byte) error
- func (op *KafkaOpreations) SendErrMsg(msgId, instructionId, standardType, reqMsgType, ofiId, rfiId string, ...)
- func (op *KafkaOpreations) SendRequestToKafka(topic string, msg []byte) error
Constants ¶
View Source
const ( KAFKA_SSL = "ssl" KAFKA_SASL = "sasl_ssl" ENV_KEY_KAFKA_AUTH_MODE = "KAFKA_AUTH_MODE" ENV_KEY_KAFKA_CA_LOCATION = "KAFKA_CA_LOCATION" ENV_KEY_KAFKA_CERTIFICATE_LOCATION = "KAFKA_CERTIFICATE_LOCATION" ENV_KEY_KAFKA_KEY_LOCATION = "KAFKA_KEY_LOCATION" ENV_KEY_KAFKA_KEY_PASSWORD = "KAFKA_KEY_PASSWORD" ENV_KEY_KAFKA_KEY_SASL_USER = "KAFKA_KEY_SASL_USER" ENV_KEY_KAFKA_KEY_SASL_PASSWORD = "KAFKA_KEY_SASL_PASSWORD" ENV_KEY_KAFKA_KEY_SASL_MECHANISM = "KAFKA_KEY_SASL_MECHANISM" ENV_KEY_KAFKA_BROKER_URL = "KAFKA_BROKER_URL" ENV_KEY_KAFKA_GROUP_ID = "KAFKA_GROUP_ID" ENV_KEY_KAFKA_AUTO_OFF_RESET = "KAFKA_AUTO_OFF_RESET" PAYMENT_TOPIC = "PAYMENT" QUOTES_TOPIC = "QUOTES" FEE_TOPIC = "FEE" TRANSACTION_TOPIC = "TRANSACTIONS" REQUEST_TOPIC = "_req" RESPONSE_TOPIC = "_res" ANCHOR_REDEMPTION_TOPIC = "ANCHOR_REDEMPTION" + REQUEST_TOPIC )
Variables ¶
View Source
var LOGGER = logging.MustGetLogger("kafka")
View Source
var SUPPORT_MESSAGE_TYPES = []string{PAYMENT_TOPIC, QUOTES_TOPIC, FEE_TOPIC, TRANSACTION_TOPIC}
Functions ¶
This section is empty.
Types ¶
type KafkaOpreations ¶
type KafkaOpreations struct {
BrokerURL string
AutoOffReset string
SecurityProtocol string
SslCaLocation string
SslCertificateLocation string
SslKeyLocation string
SslKeyPassword string
SaslUsername string
SaslPassword string
SaslMechanism string
Producer *kafka.Producer
Consumers []*kafka.Consumer
GroupId string
//used only by send-service
FundHandler transaction.CreateFundingOpereations
SignHandler signing.CreateSignOperations
WhitelistHandler whitelist_handler.ParticipantWhiteList
DbClient *database.PostgreDatabaseClient
ResponseHandler *parse.ResponseHandler
}
func Initialize ¶
func Initialize() (*KafkaOpreations, error)
func (*KafkaOpreations) Consume ¶
func (ops *KafkaOpreations) Consume(data chan<- []byte, topic string, consumerIndex int)
func (*KafkaOpreations) InitPaymentConsumer ¶
func (ops *KafkaOpreations) InitPaymentConsumer(groupId string, router func(string, []byte, *KafkaOpreations)) error
func (*KafkaOpreations) InitProducer ¶
func (ops *KafkaOpreations) InitProducer() error
func (*KafkaOpreations) Produce ¶
func (ops *KafkaOpreations) Produce(topic string, msg []byte) error
func (*KafkaOpreations) SendErrMsg ¶
func (op *KafkaOpreations) SendErrMsg(msgId, instructionId, standardType, reqMsgType, ofiId, rfiId string, errType int)
Send back errors happened on RFI site during request processing to OFI
func (*KafkaOpreations) SendRequestToKafka ¶
func (op *KafkaOpreations) SendRequestToKafka(topic string, msg []byte) error
Click to show internal directories.
Click to hide internal directories.