DATAPHOS

Usage

This page contains all of the information required to further configure the Publisher and ensure proper communication between it and your external resources.

Kubernetes Environment

Enabling Connection to Different Message Brokers

In order to enable Publisher to communicate with the service of your choice, you are required to create and deploy additional secrets in your Kubernetes environment, under the same namespace as Publisher. Google PubSub, for instance, requires the Service Account Key used to connect to your Cloud environment. Kafka, NATS and Pulsar require TLS secrets.

PubSub

| Field Name | Secret Name | Description |
| --- | --- | --- |
| key.json | pubsub-key | Base64-encoded JSON service account key |

Kafka

| Field Name | Secret Name | Description |
| --- | --- | --- |
| ca_crt.pem | kafka-tls-credentials | Base64-encoded Kafka cluster CA TLS certificate |
| client_crt.pem | kafka-tls-credentials | Base64-encoded Kafka user TLS certificate |
| client_key.pem | kafka-tls-credentials | Base64-encoded Kafka user TLS private key |

NATS

| Field Name | Secret Name | Description |
| --- | --- | --- |
| ca_crt.pem | nats-tls-credentials | Base64-encoded NATS cluster CA TLS certificate |
| client_crt.pem | nats-tls-credentials | Base64-encoded NATS user TLS certificate |
| client_key.pem | nats-tls-credentials | Base64-encoded NATS user TLS private key |

Pulsar

| Field Name | Secret Name | Description |
| --- | --- | --- |
| ca_crt.pem | pulsar-tls-credentials | Base64-encoded Pulsar cluster CA TLS certificate |
| client_crt.pem | pulsar-tls-credentials | Base64-encoded Pulsar user TLS certificate |
| client_key.pem | pulsar-tls-credentials | Base64-encoded Pulsar user TLS private key |

When deployed, the Publisher Worker will automatically look for these secrets at the time of running the individual jobs.
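
As an illustration, a Kafka TLS secret with the field names listed above could be declared with a manifest along the following lines. This is a minimal sketch rather than an official manifest: the namespace value is an assumption (use whichever namespace Publisher is deployed in), and the three values are placeholders for your own Base64-encoded files.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: kafka-tls-credentials
  namespace: dataphos   # assumption: replace with the namespace Publisher runs in
type: Opaque
data:
  # Values under "data" must be Base64-encoded, as the tables above require.
  ca_crt.pem: <base64-encoded cluster CA certificate>
  client_crt.pem: <base64-encoded user certificate>
  client_key.pem: <base64-encoded user private key>
```

The NATS and Pulsar secrets would follow the same shape, with only the secret name changed to nats-tls-credentials or pulsar-tls-credentials.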

Publisher Configuration Files

Once you are logged into the WebUI, you can add your source, destination and instance YAML files.

  • Source defines the source from which Publisher will fetch the data.
  • Destination defines where Publisher will deliver the data (the message broker).
  • Instance connects the Source and Destination, and defines the actual publishing job. Both the source and destination may be used in multiple different publishing jobs.

Source

The Source configuration YAML file defines the connection details of the database or API that Publisher uses as its source of data.

Shared Source Configuration

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| username | String | Username used to connect to the source database. | Yes |
| sourceType | String | Type of the data source: one of the supported database types or an API source. Supported values: Postgres, Oracle, MySql, Db2, SqlServer, Api, Mongo. | Yes |
| sourceName | String | Name of the source configuration. It cannot be empty and must be unique within a single Publisher deployment, because it identifies an existing and active source configuration in the database. Creating a new source with an existing name fails. | Yes |
| port | Integer | Port of the source database. | Yes |
| password | String | Database password for the provided username. | Yes |
| parameters* | String | Any parameters that a specific database might use for performance purposes, defined as a key: value map. | No |
| host | String | For a database source, the database hostname. For an API source, the base API URL, e.g., http://exposed-api:3030. The rest of the source API path is appended based on the query field in the business object. | Yes |
| databaseName | String | Name of the database that contains the source data. | Yes |

Source Parameters (Oracle)

| Environment variable name | Value Type | Description | Optimal Value | Required |
| --- | --- | --- | --- | --- |
| prefetch_rows | Integer | Number of rows to prefetch for each fetch request that results in a roundtrip to the Oracle server. | 1000 | No |
| prefetch_memory | Integer | Memory allocated for rows to be prefetched. | 100 | No |

Source YAML file example (Postgres)

sourceName: publisher-postgres-source
sourceType: Postgres
host: <LoadBalancer IP address>
port: 5432
databaseName: invoices
username: demo_user
password: demo_password
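
For comparison, a source that uses the optional prefetch settings might look roughly like the sketch below. It assumes that prefetch_rows and prefetch_memory are passed through the parameters key: value map described in the shared source table; the source name, host, database name and credentials are hypothetical, and 1521 is simply the conventional Oracle listener port.

```yaml
sourceName: publisher-oracle-source   # hypothetical name
sourceType: Oracle
host: <database hostname>
port: 1521                            # conventional Oracle listener port; adjust to your setup
databaseName: <your database>
username: demo_user
password: demo_password
parameters:                           # assumption: prefetch settings go in this map
  prefetch_rows: 1000                 # "optimal value" from the table above
  prefetch_memory: 100                # "optimal value" from the table above
```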

Destination

The Destination configuration YAML file defines the details of the destination service (the message broker) that receives the data. Every destination includes the shared parameters from the Destination table below. Each destination type also has specific parameters that have to be added to the YAML file; these are listed in the per-broker tables that follow.

Destination

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| destinationName | String | Name of the destination configuration. It cannot be empty and must be unique within a single Publisher deployment, because it identifies an existing and active destination configuration in the database. Creating a new destination with an existing name fails. | Yes |
| destinationType | String | Type of destination service that the destination configuration uses. Supported destination types: PubSub, Solace, Kafka, Azure (ServiceBus from version 0.5.1), NatsCore, NatsJetStream. | Yes |
| parameters* | Key-value map | Values specific to the chosen messaging platform, defined as a key: value map. | Yes |

Google PubSub

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| ProjectID | String | Name of the Google Cloud Platform project that contains the topic the data will be sent to. | Yes |
| TopicID | String | Name of the Google Cloud Platform topic that will receive the sent data. | Yes |
| ByteThreshold | Integer | Suggested value: 150000000 | No |
| CountThreshold | Integer | Suggested value: 400 | No |
| DelayThreshold | Integer | Suggested value: 10000000 | No |
| NumOfGoroutines | Integer | Suggested value: 20 | No |
| MaxOutStandingMessages | Integer | Suggested value: 800 | No |
| MaxOutStandingBytes | Integer | Suggested value: 1000 * 1024 * 1024 | No |
| EnableMessageOrdering | Boolean | Suggested value: false | No |

Kafka

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| BrokerAddr | String | A single Kafka broker endpoint, or a comma-separated list of endpoints. Example: 10.0.42.206:9092 | Yes |
| TopicID | String | Name of the Kafka topic that will receive the sent data. | Yes |
| BatchSize | Integer | Maximum number of records the client will buffer. Once this limit is reached, new produce calls block until buffered records are finished. | No |
| BatchBytes | Integer (Int64) | When multiple records are sent to the same partition, the producer batches them together. BatchBytes controls the amount of memory, in bytes, used for each batch. | No |
| Linger | Integer | Amount of time to wait for additional messages before sending the current batch. | No |
| TLS | String | Whether encryption should be enabled. Possible values: true or false. Default value is false. | No |
| VerifyServerCertificate | Boolean | If set to true and TLS is enabled, the ca_crt.pem file defined in the kafka-tls-credentials secret is used to authenticate the Kafka broker. Default value is false. | No |
| VerifyClientCertificate | Boolean | If set to true and TLS is enabled, the client_crt.pem and client_key.pem files defined in the kafka-tls-credentials secret are used to authenticate the client on the broker. Default value is false. | No |
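
Put together, a TLS-enabled Kafka destination could be sketched as follows. The destination name is hypothetical, the broker address is the example value from the table, and the example assumes the kafka-tls-credentials secret has already been deployed in the Publisher namespace.

```yaml
destinationName: publisher-kafka-destination   # hypothetical name
destinationType: Kafka
parameters:
  BrokerAddr: "10.0.42.206:9092"   # example address from the table; replace with your brokers
  TopicID: <your topic>
  TLS: "true"                      # documented as a String, hence quoted
  VerifyServerCertificate: true    # check the broker against the CA certificate in the secret
  VerifyClientCertificate: true    # present the client certificate and key from the secret
```

The batching parameters (BatchSize, BatchBytes, Linger) are omitted here because they are optional.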

Solace

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| BrokerURI | String | URI to connect to the broker. | Yes |
| TopicID | String | Name of the topic that will receive the sent data. | Yes |
| Username | String | The username for the client. | Yes |
| Password | String | The password for the client. | Yes |
| Qos | Integer | Level of quality of service. Default value is 1. | No |

Azure

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| ConnectionString | String | A connection string includes the authorization information required to access data in an Azure Storage account at runtime using Shared Key authorization. | Yes |
| TopicID | String | Name of the topic that will receive the sent data. | Yes |

Pulsar

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| ServiceURL | String | URL to connect to the brokers. | Yes |
| TopicID | String | Name of the topic that will receive the sent data. | Yes |
| TLS | String | Whether encryption should be enabled. Possible values: true or false. Default value is false. | No |
| VerifyServerCertificate | Boolean | If set to true and TLS is enabled, the ca_crt.pem file defined in the pulsar-tls-credentials secret is used to authenticate the Pulsar broker. Default value is false. | No |
| VerifyClientCertificate | Boolean | If set to true and TLS is enabled, the client_crt.pem and client_key.pem files defined in the pulsar-tls-credentials secret are used to authenticate the client on the broker. Default value is false. | No |
| ConnectionTimeout | Integer | Timeout, in seconds, for establishing a TCP connection. Default value is 5 seconds. | No |
| OperationTimeout | Integer | Timeout for creating the publisher. Default value is 30 seconds. | No |
| SendTimeout | Integer | Timeout for a published message to be acknowledged by the broker. Default value is 30 seconds. | No |
| MaxConnectionsPerBroker | Integer | Maximum number of connections to a single broker that will be kept in the pool. Default value is 1. | No |
| DisableBlockIfQueueFull | Boolean | Controls whether publishing blocks if the producer's message queue is full. Default value is false. | No |
| MaxPendingMessages | Integer | Maximum size of the queue holding messages that are waiting for an acknowledgment from the broker. Default value is 1. | No |
| MaxReconnectToBroker | Integer | Maximum number of retries of reconnectToBroker. Default value is nil, which means the client retries forever. | No |

NATS

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| URL | String | URL to connect to the brokers. | Yes |
| Subject | String | Name of the subject that will receive the sent data. | Yes |
| MaxPending | Integer | Maximum number of outstanding async publishes that can be in flight at one time. Default value is 512. | No |
| TLS | String | Whether encryption should be enabled. Possible values: true or false. Default value is false. | No |
| VerifyServerCertificate | Boolean | If set to true and TLS is enabled, the ca_crt.pem file defined in the nats-tls-credentials secret is used to authenticate the NATS server. Default value is false. | No |
| VerifyClientCertificate | Boolean | If set to true and TLS is enabled, the client_crt.pem and client_key.pem files defined in the nats-tls-credentials secret are used to authenticate the client on the server. Default value is false. | No |
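
A NATS destination built from these parameters might look like the sketch below. The destination name and server URL are hypothetical (4222 is the default NATS client port), and NatsJetStream is chosen only for illustration; NatsCore is the other supported NATS destination type.

```yaml
destinationName: publisher-nats-destination   # hypothetical name
destinationType: NatsJetStream                # or NatsCore
parameters:
  URL: nats://<nats host>:4222    # hypothetical URL; 4222 is the default NATS client port
  Subject: <your subject>
  MaxPending: 512                 # optional; 512 is the documented default
```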

Destination YAML file example (Google Pub/Sub)

destinationName: publisher-pubsub-destination
destinationType: PubSub
parameters:
  ProjectID: <your project id>
  TopicID: <your topic>

Publisher Instance

The Instance configuration YAML file is used to define the job specifics. This is the central type of configuration and its creation and validation depend on the existing active source and destination configurations. The Instance configuration YAML file is fairly minimalistic in approach, but allows enough flexibility to fine-tune the requirements of your publishing job.

Here, we define the query the data will be fetched with, how it will be formatted, serialized, encrypted and, finally, published.

For ease of understanding, we will split the file into three distinct sections:

  • The Instance portion, defining how the source and destination should connect.
  • The Fetcher portion, defining how the data should be pulled from the source.
  • The Business object portion, defining how the data will be transformed directly prior to publishing.

Instance configuration

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| publisherName | String | Name of the Publisher instance. It cannot be empty and must be unique within a single Publisher deployment. Creating a new Publisher instance with an existing name fails. | Yes |
| sourceName | String | Name of the existing source configuration that this Publisher instance uses to connect to the source. Specifying a nonexistent source name fails validation. | Yes |
| destinationName | String | Name of the existing destination configuration that this Publisher instance uses to set up the destination details. Specifying a nonexistent destination name fails validation. | Yes |
| serializationType | String | Serialization type used by the Publisher instance. Only one type can be specified. Supported serializations: Avro, Json. | Yes |
| encryptionEnabled | Boolean | Indicates whether encryption is used in the Publisher instance. | No |
| encryptionType | String | Encryption type used by the Publisher instance when encryptionEnabled is true. Only one type can be specified. Supported encryptions: Aes256. | No (Yes if encryptionEnabled is true) |
| encryptionKeyName | String | Name of the variable that holds the encryption key. The user provides only the name of the key, not the key itself. | No (Yes if encryptionEnabled is true) |
| scheduleInterval | String | How often Publisher performs a single run (fetching and publishing of data) when running in scheduled mode; between runs it waits. The value must be a cron-type expression (e.g., */1 * * * * for each minute of each hour). Default value: null. | Yes (only if fetchSkippedScheduledIntervals is true) |
| schema | String | Schema used for serialization. If the user does not provide one, Publisher generates a schema, uses it and stores it for future use. | No |
| scheduledStartTime | String | Date and time at which Publisher starts its initial run (e.g., 2022-01-01 00:00:00); until then Publisher sleeps. By default this value is null, and Publisher starts immediately after the publisher configuration is successfully created. | No |
| fetcherConfig | Key-value map | Used when the instance fetches from an API source. | Yes |
| useApiWorker | Boolean | Used when the instance fetches from an API source. | No |
| useApiJwtAuth | Boolean | Used when the API source requires JWT authentication to access data. | No |
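
To show how the optional encryption and scheduling fields fit together, the following fragment sketches the top-level part of an instance file that encrypts messages and runs once per minute. The key name is a placeholder, and the cron expression and start time are the example values from the table. Note that the cron expression has to be quoted, since an unquoted value beginning with * is not valid plain YAML.

```yaml
encryptionEnabled: true
encryptionType: Aes256
encryptionKeyName: <name of your encryption key>   # the name of the key, not the key itself
scheduleInterval: "*/1 * * * *"                    # every minute; quoted because of the leading *
scheduledStartTime: "2022-01-01 00:00:00"          # optional; Publisher sleeps until this time
```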

Fetcher configuration

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| useNativeDriver | Boolean | Selects between the native Golang drivers, which are faster but behave inconsistently, and the implemented Java fetcher, which is more stable but slower. | No |
| UseReflectTypeFetch | Boolean | Used when native Golang drivers are used for fetching. If true, fetched data types are mapped to the ones in the database, which makes fetching slower. | No |
| ReturnCsv | Boolean | If true, the Java Fetcher returns CSV instead of JSON. | No |
| initialFetchValue | String | Lower bound of the first, initial fetch period in the WHERE condition clause. For the first run ever, Publisher applies a greater-or-equal ( >= ) condition to it. | No |
| endFetch | String | Date at which the fetching process, and therefore the Publisher itself, stops. Empty by default; the user can set the desired date (e.g., 2010-01-01 00:00:00). | No |
| fetchingThreadsNO | Integer | Number of parallel threads that split the query fetch period in order to speed up row fetching. | No |
| apiFetchFromParam | String | Query parameter in the API URL that represents the date from which the data is fetched. E.g., with date_from as apiFetchFromParam: http://exposed-api:3001?date_from=&… | Yes (only if useApiWorker is set to true) |
| apiFetchToParam | String | Query parameter in the API URL that represents the date until which the data is fetched. E.g., with date_to as apiFetchToParam: http://exposed-api:3001?…&date_to= | Yes (only if useApiWorker is set to true) |
| apiTimeLayout | String | Format of the timestamp value that Publisher appends to apiFetchFromParam and apiFetchToParam. E.g., 2006-01-02 15:04:05. | Yes (only if useApiWorker is set to true) |
| apiQueryParams | String | Map of key-value pairs containing static query parameters for the API URL. | No |
| apiHeaderParams | String | Map of key-value pairs containing header information for the API request to fetch data. | No |
| apiJwtAuthPath | String | Path that is appended to the host field in the source configuration. The resulting URL is used to generate the JWT. | Yes (only if useApiJwtAuth is set to true) |
| apiJwtAuthBody | String | Map of key-value pairs containing the body for the POST request to generate the JWT. | Yes (only if useApiJwtAuth is set to true) |
| apiJwtAuthHeaders | String | Map of key-value pairs containing header information for the API request to generate the JWT. | No |
| queryIncrementType | String | Together with queryIncrementValue, determines the upper bound of the fetch period. Publisher automatically applies the strictly-lower condition ( < ) and calculates the upper bound as: last successfully fetched period + queryIncrementValue queryIncrementType. For example: 2010-03-01 00:00:00 + 1 year => < 2011-03-01 00:00:00; 2010-03-01 00:00:00 + 3 month => < 2010-06-01 00:00:00; 2010-03-01 00:00:00 + 5 minute => < 2010-03-01 00:05:00. Supported types: year, month, day, hour, minute, second. | Yes |
| queryIncrementValue | Integer | Number of queryIncrementType units added to the last successfully fetched period to obtain the upper bound of the fetch period. | Yes |
| dataSeparatorChar | String | Separator character used to separate fetched data. Taken into account only when Publisher uses the Java Fetcher. | No |
| fetchSkippedScheduledIntervals | Boolean | Used when the Publisher instance runs in scheduled mode and should catch up to the current time after it has stopped for some reason. | No |
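
For an API source, the API-related fields above could be combined roughly as in the sketch below. It uses the date_from and date_to parameter names and the time layout quoted in the table; the increment and initial fetch value are hypothetical, and the exact set of fields your API needs may differ.

```yaml
useApiWorker: true                        # instance-level flag from the Instance configuration table
fetcherConfig:
  apiFetchFromParam: date_from            # lower bound is appended as ?date_from=<timestamp>
  apiFetchToParam: date_to                # upper bound is appended as &date_to=<timestamp>
  apiTimeLayout: "2006-01-02 15:04:05"    # layout example from the table
  queryIncrementType: HOUR
  queryIncrementValue: 1                  # hypothetical: fetch one hour of data per request window
  initialFetchValue: "2020-01-01 00:00:00"   # hypothetical starting point
```

With these values, the first window would cover 2020-01-01 00:00:00 (inclusive) up to 2020-01-01 01:00:00 (exclusive), following the upper-bound formula in the table.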

Business object

| Environment variable name | Value Type | Description | Required |
| --- | --- | --- | --- |
| description | String | Describes and clarifies the business object. | No |
| objectGroup | String | Can be filled out if this business object belongs to a certain predefined group of business objects. Used for easier searching. | No |
| additionalMetadata | Key-value map | Any additional metadata that the user wants to provide with the business object, in the form of a key: value map. | No |
| batchMode | String | Type of database row batching. When set, the rows fetched from the database are batched. If groupElements is defined, each batch of rows is then formatted into a message by the definition; otherwise each batch is returned as a message. A batch is limited by batchSize, and can be smaller if there is no more data in the Publisher run to fill it. The maxRowCount batch mode batches database rows by the batchSize number of rows, or less. The maxEstimatedSizeInBytes batch mode batches database rows by the batchSize memory size of data in bytes, or less. | No |
| batchSize | Integer | Size of a batch, used when a batchMode is specified. | Yes (only if batchMode is specified) |
| definition | String | Defines the message format by detailing the business object structure and fields. Needs to be defined when groupElements is set (grouping is enabled). Database rows that are grouped by the groupElements values into a single business object are formatted into a single message by the definition. Important note: to prevent data loss, every column in the definition that is not the same for each grouped database row in the business object should be in an array of values (arrayElements). | No |
| arrayElements | String | Definition elements that will be created and treated as arrays. Those elements need to contain non-repeating columns, which can then be treated as array elements (each record that has the same grouping keys will have its non-repeating values stored as a single element of the array node). | No |
| groupElements | String | List of columns whose values need to be the same across multiple fetched rows for those rows to be grouped into a single message structure that fits the specified definition (business object). If groupElements is not defined (grouping is disabled), the fetched database rows are returned as messages without additional formatting, and the definition is not used. | No |
| keyElements | String | List of columns whose values are used to create a single value. This value is one of the two values (the other being id_elements) that can uniquely mark every record sent to a messaging platform. The values are concatenated with an underscore delimiter, and the result is set as the key of the message sent to the broker. When groupElements is not defined (grouping is disabled) and batchMode is specified (batching is enabled), the key elements are extracted for the first and last row in the batch and concatenated with a semicolon delimiter. In that case, the recommended value for keyElements is the timestamp column used in the query (the rows are sorted by the timestamp). | No |
| idElements | String | List of columns whose values are used to create a single value. This value is one of the two values (the other being key_elements) that can uniquely mark every record sent to a messaging platform. The values are concatenated with an underscore delimiter. When groupElements is not defined (grouping is disabled) and batchMode is specified (batching is enabled), the key elements are extracted for the first and last row in the batch and concatenated with a semicolon delimiter. | No |
| query | String | A valid SQL SELECT query (WITH CTE is also supported) used to fetch database rows. The query must select all columns that are used in the business object definition. The query must contain placeholder variables for a timestamp column, which is used to fetch data in uniform timestamp intervals per Publisher run. Publisher automatically replaces the variables with actual timestamps calculated from the configuration. E.g., SELECT invoice_id, cost FROM invoices WHERE creation_date >= to_timestamp({{ .FetchFrom }}, 'YYYY-MM-DD HH24:MI:SS') AND creation_date < to_timestamp({{ .FetchTo }}, 'YYYY-MM-DD HH24:MI:SS'); | Yes |
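
When grouping is not needed, a business object can rely on batching alone. The fragment below is a sketch of that case under a few assumptions: the batch size is hypothetical, the batchMode value is written as it appears in the table (the accepted spelling should be checked against your Publisher version), and the table and column names are borrowed from the demo data used elsewhere on this page.

```yaml
businessObject:
  description: "Demo Publisher - batched invoice rows"
  batchMode: maxRowCount     # batch by number of rows; spelling as given in the table
  batchSize: 500             # hypothetical: at most 500 rows per message
  keyElements:
    - creation_date          # timestamp column, as recommended when batching without grouping
  query: |
    SELECT invoice_id, client_id, creation_date, total_cost
    FROM demo_invoices
    WHERE
      creation_date >= to_timestamp({{ .FetchFrom }}, 'YYYY-MM-DD HH24:MI:SS')
      AND
      creation_date < to_timestamp({{ .FetchTo }}, 'YYYY-MM-DD HH24:MI:SS');
```

Because groupElements is absent, no definition is given: according to the table, each batch is then returned as a message without additional formatting.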

Instance YAML file example (Postgres to Google Pub/Sub)

publisherName: publisher-demo
sourceName: publisher-postgres-source
destinationName: publisher-pubsub-destination
serializationType: Avro
encryptionEnabled: false
businessObject:
  description: "Demo Publisher - invoices for client"
  objectGroup: "invoices-client"
  additionalMetadata:
    organization: Syntio
  definition:
    - client_info:
        - client_id
        - client_username
        - client_location
    - invoice_info:
        - invoice_id
        - creation_date
        - due_date
        - fully_paid_date
    - invoice_items:
        - invoice_item_id
        - quantity
        - total_cost
  groupElements:
    - client_id
    - invoice_id
  arrayElements:
    - invoice_items
  keyElements:
    - client_id
    - invoice_id
  idElements:
    - client_id
  query: | 
    SELECT 
      invoice_id, 
      client_id, 
      client_username, 
      client_location, 
      creation_date, 
      due_date, 
      fully_paid_date, 
      invoice_item_id, 
      quantity, 
      total_cost, 
      billing_item_id
    FROM demo_invoices
    WHERE 
      creation_date >= to_timestamp({{ .FetchFrom }},'YYYY-MM-DD HH24:MI:SS') 
      AND 
      creation_date < to_timestamp({{ .FetchTo }}, 'YYYY-MM-DD HH24:MI:SS');
fetcherConfig:
  fetchingThreadsNO: 3
  queryIncrementType: HOUR
  queryIncrementValue: 12
  initialFetchValue: 2020-01-01 00:20:00.000
  useNativeDriver: true

Web UI usage

After you first log into the Publisher Web UI, the landing screen offers the following sections:

Publisher runs - statistical information about the execution of each instance.

Publisher Instances - overview of Publisher’s instance configurations, providing the ability to start and stop instances as required.

Publisher Lineage - data origin, what happens to it, and where it moves over time.

Publisher queries - testing Publisher instance configuration with verbose error messages. You can view the efficiency of the query as well as a sample of the data obtained.

WEB CLI - add, update and delete source, destination and instance configurations by processing their YAML files. Add the source YAML first, then the destination, and then the instance.