kafka_consumer

Reads messages from one or more Kafka topics, converts them into MarketGrid transactions, and publishes the replies onto another topic.

Required options

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic_v2 | String | The name of the v2 API topic. Messages consumed from this topic will never be treated as v1 messages. | queue |
| operating_mode | One of OperatingMode | The mode the corresponding te_engine process is running in. | |
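
As a rough illustration of supplying the required options, the sketch below assembles a command line in Python. It assumes options are passed as `--name value` pairs (the `--` prefix appears elsewhere in this reference, but the exact value syntax is an assumption) and that the executable is named `kafka_consumer` after this page's title. The `build_command` helper is hypothetical, and the `operating_mode` value is a placeholder because the valid OperatingMode values are not listed here.

```python
import shlex


def build_command(executable, options):
    """Render options as `--name value` pairs after the executable name.

    The `--name value` form is an assumption, not confirmed by this reference.
    """
    argv = [executable]
    for name, value in options.items():
        argv.extend([f"--{name}", str(value)])
    return argv


required = {
    # "queue" is the default listed for topic_v2 in this reference.
    "topic_v2": "queue",
    # Placeholder only: substitute a real OperatingMode value.
    "operating_mode": "<OperatingMode>",
}

argv = build_command("kafka_consumer", required)

# shlex.join quotes arguments so the line is safe to paste into a POSIX shell.
print(shlex.join(argv))
```

Building the argument list first and quoting it only for display keeps the values intact if the list is later handed to `subprocess.run` without a shell.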

All options

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| service_name | String | The name of this service | nq_18515 |
| help | Integer (unsigned) | Print this help | |
| threads | Integer (unsigned) | Number of worker threads to spawn (zero means the number of available CPUs) | |
| debug | Integer (unsigned) | Start in REPL mode | |
| shell | Integer (unsigned) | Start in REPL-only shell mode (no server) | |
| inspect | Integer (unsigned) | Start with the Node.js debugger active | |
| print_config | Integer (unsigned) | Print the configuration | |
| run | String | Execute the specified script file (blank to disable) | |
| list | Integer (unsigned) | List available script files | |
| sockets_dir | String | Base directory for IPC sockets | ${sockets_dir} |
| instancekey | String | The key of the MarketGrid instance to connect to (leave blank to use the default) | |
| base_port | Integer (unsigned) | The base port for this MarketGrid instance | |
| transaction_socket | String | The address of the raw transaction queue. Example for tserver raw TCP: tcp://0.0.0.0:11200. Engine transaction queue IPC: ipc:///opt/MarketGrid/var/sockets/mg_11000_transaction_queue_12001. Leave blank for a local IPC connection. | |
| blob_socket | String | The address of the blob transaction queue. Example for tserver raw TCP: tcp://0.0.0.0:11600. Engine transaction queue IPC: ipc:///opt/MarketGrid/var/sockets/mg_11000_blob_queue_12009. Leave blank for a local IPC connection. | |
| mdb_connections | String | Comma-separated list of MDB processes to connect to, in the form (process_name:hostname:port). Leave blank to skip connecting to any MDB processes. | |
| timeout | Integer (unsigned) | Timeout (in seconds) after which startup is aborted if no upstream connection has been established. Special values: 0 waits forever to connect upstream (no timeout set); -1 disables waiting for the transaction server (only connects to shared memory). N.B. Not used after startup, e.g. when connections are lost. | |
| offline | Integer (unsigned) | Do not attach to shared memory or connect to a transaction server. Most functions will be unavailable. | |
| environment | String | A string identifying this environment (e.g. DEV, UAT, PROD) | DEV |
| healthcheck_port | Integer (unsigned) | Port on which to listen for HTTP healthcheck pings (0 to disable) | |
| system_private_key | String | Path to a private key used for generating JWTs for system users | ${etc_dir}/marketgrid.key |
| quiet | Integer (unsigned) | Suppress informational logging messages | |
| verbose | Integer (unsigned) | Output debug log messages | |
| colour | Integer (unsigned) | Whether or not to colourise log output (default true) | |
| trace_api | String | Path to which to trace API messages (blank to disable) | |
| trace_ws | String | Path to which to trace WebSocket messages (blank to disable) | |
| trace_ws_filter | String | Filter WebSocket message trace output by message class name (empty for no filter) | |
| trace_rest | String | Path to which to trace REST messages (blank to disable) | |
| trace_mdb | String | Path to which to trace MDB queries (blank to disable) | |
| export_metrics | Integer (unsigned) | Determines whether metrics are exported to the run directory | |
| export_metrics_interval | Integer (unsigned) | How often (in milliseconds) metrics are exported to the run directory. Has no effect if --export_metrics is false. | |
| no_abi_check | Integer (unsigned) | Don't check the build ABI checksum against the running system (dangerous!) | |
| client_id | String | Kafka client ID | kafka_consumer |
| brokers | String | Comma-separated list of Kafka brokers to connect to | 127.0.0.1:9092 |
| group | String | Consumer group ID | KServer-consumer |
| topics | String | Comma-separated list of Kafka topics on which to listen for messages | |
| topic_v2 | String | The name of the v2 API topic. Messages consumed from this topic will never be treated as v1 messages. | queue |
| topic_ack | String | Kafka topic onto which acknowledgment messages are published | |
| topic_producer | String | Kafka topic onto which errors destined for the engine's standard broadcast stream are published | |
| sender_id | String | ID of the sender to place in the Kafka header for acknowledgments | MarketGrid |
| user | String | User ID used to enter transactions into MarketGrid | admin |
| errormessages | Integer (unsigned) | Send errors as MESSAGE_TYPE_Error, rather than MESSAGE_TYPE_Reply | |
| debug_level | String | librdkafka debug level | |
| maxknownkafkaids | Integer (unsigned) | Maximum number of past Kafka IDs to check for uniqueness | |
| batch_size | Integer (unsigned) | Maximum number of messages to read in a single consume() call | |
| commit_retry_ms | Integer (unsigned) | Time (in milliseconds) to wait before retrying a message commit | |
| enable_api_v1 | Integer (unsigned) | Enable consuming v1 API messages (enabled by default) | |
| enable_api_v2 | Integer (unsigned) | Enable consuming v2 API messages (enabled by default) | |
| operating_mode | One of OperatingMode | The mode the corresponding te_engine process is running in. | |
| transaction_timeout | Integer (unsigned) | Timeout (in milliseconds) before a transaction sent to the engine is considered lost. If a transaction times out, the consumer will exit. | |
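
Several options (brokers, topics, mdb_connections) take comma-separated lists. The sketch below shows one way such values might be validated before they are passed to the service. It is not the service's own parser: the helper names `split_list`, `parse_mdb_connections`, and `MdbEndpoint` are hypothetical, the process names and ports in the example are made up, and it assumes the parentheses in "(process_name:hostname:port)" describe the form and are not literal characters.

```python
from typing import List, NamedTuple


class MdbEndpoint(NamedTuple):
    """One mdb_connections entry (hypothetical type for illustration)."""
    process_name: str
    hostname: str
    port: int


def split_list(value: str) -> List[str]:
    """Split a comma-separated option value, dropping blanks and whitespace."""
    return [item.strip() for item in value.split(",") if item.strip()]


def parse_mdb_connections(value: str) -> List[MdbEndpoint]:
    """Parse entries of the form process_name:hostname:port.

    A blank value yields an empty list, matching "leave blank to skip
    connecting to any MDB processes".
    """
    endpoints = []
    for entry in split_list(value):
        parts = entry.split(":")
        if len(parts) != 3:
            raise ValueError(f"expected process_name:hostname:port, got {entry!r}")
        name, host, port = parts
        endpoints.append(MdbEndpoint(name, host, int(port)))
    return endpoints


# The default brokers value from the table, plus made-up MDB entries.
brokers = split_list("127.0.0.1:9092")
mdb = parse_mdb_connections("mdb_a:localhost:12100, mdb_b:10.0.0.5:12101")
print(brokers)
print(mdb)
```

Splitting on every colon means an IPv6 literal as the hostname would be rejected; whether the service accepts such addresses is not stated here.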