OIC Kafka Adapter - Trigger



The November release of Oracle Integration Cloud (OIC) delivered the Trigger capability for the Kafka Adapter. Until now, we could only use Scheduled Integrations to consume messages. Now we can use an App Driven Integration, which is triggered whenever new messages arrive at the subscribed Kafka topic. Note that this feature requires the connectivity agent in order to run.

See the official release notes here:

If you are new to the OIC Kafka Adapter, start with this post, where I covered Kafka installation, OIC prerequisites and the consumer/producer capabilities.

Now I am going to cover the Kafka Adapter as a Trigger.

In my Kafka server I will create a new Topic for this specific use case.

Create a new Kafka topic

Using the command line:

./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TopicTrigger
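To confirm the topic exists before wiring it into OIC, it can help to describe it. This is a minimal sketch, assuming the same Kafka bin directory and broker address as the create command above; the KAFKA_BIN variable and the describe_topic helper are hypothetical names used only for illustration.

```shell
# Directory containing the Kafka scripts (assumption: defaults to the
# current directory, as in the create command above).
KAFKA_BIN="${KAFKA_BIN:-.}"

# describe_topic is a hypothetical helper: it asks the broker to describe
# the given topic (partitions, replication factor, leader assignment).
describe_topic() {
  topic="$1"
  bootstrap="${2:-localhost:9092}"
  "$KAFKA_BIN/kafka-topics.sh" --describe --bootstrap-server "$bootstrap" --topic "$topic"
}
```

Running describe_topic TopicTrigger from the Kafka bin directory should list the single partition created above; an error here usually means the topic name or broker address is wrong.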




Create a new Connection

The connection from the previous post was only for Invoke, so we need a new one that supports both Trigger and Invoke.



Bootstrap Server:

I used localhost:9092*. This works because the actual connectivity is handled by the agent, so in reality we are connecting to the Kafka server as if we were inside the machine where it runs. You can also use the private IP of the machine instead of localhost.
*9092 is the default Kafka port, but you can verify the one you are using in <Kafka_Home>/config/server.properties
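One quick way to check the port is to search server.properties for the listener settings. The sketch below assumes a KAFKA_HOME environment variable pointing at the Kafka installation; show_kafka_listeners is a hypothetical helper name.

```shell
# show_kafka_listeners is a hypothetical helper: it prints the uncommented
# listener/port settings from a Kafka server.properties file.
# Assumption: KAFKA_HOME points at the Kafka installation directory.
show_kafka_listeners() {
  props="${1:-$KAFKA_HOME/config/server.properties}"
  grep -E '^[[:space:]]*(listeners|advertised\.listeners|port)[[:space:]]*=' "$props" \
    || echo "no explicit listener settings found in $props"
}
```

If the helper reports no explicit settings, the lines are most likely commented out and the broker is using its default listener.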

Security:
I chose no security policy, but in a real-life scenario this needs to be considered. More on this can be found in the official documentation!

Agent Group:
Select the group to which your agent belongs.

Make sure the connectivity agent is up and running.



Test and Save



Create an App Driven Integration


Drag the new Kafka connection onto the canvas

Select a Topic and Partition:
My Kafka server only has one Topic available, and I'll use the default option for the partitions.
Consumer Group: consumer-group

In this post I elaborate a bit more on the above options.

Polling Frequency: How often do I want to check for new messages? 10 seconds is more than enough.

Maximum number of records: The number of records to read in a single poll.


I will add an FTP adapter to write the messages into a file.

I have a very simple csv for the schema – just one attribute called message.



We map the content of the message to the FTP Write operation.

This is what the Integration looks like. For a real use case, we would need to add a For-Each action to process each message coming from the Kafka topic individually.

TEST

After we activate the Integration it’s time to create some messages in the Topic.
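One way to create the test messages is the console producer that ships with Kafka. This is a minimal sketch, assuming the same bin directory and broker address used when creating the topic; KAFKA_BIN, BOOTSTRAP and send_test_messages are hypothetical names. Older Kafka versions expect --broker-list instead of --bootstrap-server on the producer.

```shell
# Directory containing the Kafka scripts (assumption: current directory).
KAFKA_BIN="${KAFKA_BIN:-.}"

# send_test_messages is a hypothetical helper: every argument after the
# topic name is published as one record, because the console producer
# reads one record per input line.
send_test_messages() {
  topic="$1"
  shift
  printf '%s\n' "$@" | "$KAFKA_BIN/kafka-console-producer.sh" \
    --bootstrap-server "${BOOTSTRAP:-localhost:9092}" --topic "$topic"
}
```

For example, send_test_messages TopicTrigger "first message" "second message" publishes two records to the topic used in this post.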

After a maximum of 10 seconds (the defined polling interval) the Integration starts and we can go to the monitoring page that gives us all the tools to track and analyse the payload.
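The Kafka side can give a second opinion on whether the messages were picked up. The sketch below assumes the adapter commits offsets under the consumer group entered in the trigger configuration (consumer-group in this post); KAFKA_BIN and describe_group are hypothetical names.

```shell
# Directory containing the Kafka scripts (assumption: current directory).
KAFKA_BIN="${KAFKA_BIN:-.}"

# describe_group is a hypothetical helper: it asks the broker for the
# committed offsets and lag of a consumer group.
describe_group() {
  group="${1:-consumer-group}"
  bootstrap="${2:-localhost:9092}"
  "$KAFKA_BIN/kafka-consumer-groups.sh" --bootstrap-server "$bootstrap" --describe --group "$group"
}
```

If the group is known to the broker, the output includes a LAG column per partition; a lag that drops back to 0 shortly after the polling interval suggests the integration has read the new messages.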

           
And all works like a charm!

One final note on the added support for Confluent Kafka: this is great, as it's one of the most used in the market. The prerequisites are obviously different, and you can read all about it here.



