OIC Kafka Adapter - Trigger
The November release of Oracle Integration Cloud (OIC) delivered the Trigger capability for the Kafka Adapter. Until now, we could only use Scheduled Integrations to consume messages. Now we can use an App Driven Integration, which is triggered whenever new messages arrive on the subscribed Kafka topic. Note that this feature requires the connectivity agent in order to run.
See the official release notes here:
If you are new to the OIC Kafka Adapter, start with this post, where I covered Kafka installation, OIC prerequisites and the consumer/producer capabilities.
Now I am going to cover the Kafka Adapter as a Trigger.
In my Kafka server I will create a new Topic for this specific use case.
Create a new Kafka topic
Using the command line:
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic TopicTrigger
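To confirm the topic was created before wiring it into OIC, a quick check along these lines should work. This is only a sketch: it assumes you run it from <Kafka_Home>/bin on the Kafka machine and that the broker address and topic name are the ones used in the create command above.

```shell
# Assumed values: same broker address and topic name as in the create command.
BOOTSTRAP="localhost:9092"
TOPIC="TopicTrigger"

# Run from <Kafka_Home>/bin; the guard makes this a no-op anywhere else.
if [ -x ./kafka-topics.sh ]; then
  # List all topics known to the broker.
  ./kafka-topics.sh --list --bootstrap-server "$BOOTSTRAP"
  # Show partition count, replication factor and leader for the new topic.
  ./kafka-topics.sh --describe --bootstrap-server "$BOOTSTRAP" --topic "$TOPIC"
fi
```

The describe output should report one partition and a replication factor of 1 for TopicTrigger.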
Create a new Connection
The connection from the previous post was only for Invoke, so we need a new one that supports both Trigger and Invoke.
Bootstrap Server:
I used localhost:9092*. This works because the actual connectivity is handled by the agent, so in effect we connect to the Kafka server as if we were on the machine where it runs. You can also use the private IP of the machine instead of localhost.
*9092 is the default Kafka port, but you can verify the one you are using in <Kafka_Home>/config/server.properties
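One way to check the port is to print the listener settings from server.properties. The sketch below assumes a hypothetical helper name (kafka_listeners) and a KAFKA_HOME environment variable pointing at your installation, with /opt/kafka as a placeholder fallback; adjust both to your setup.

```shell
# Print the uncommented listener/port settings from a server.properties file.
# kafka_listeners is a helper name made up for this example.
kafka_listeners() {
  grep -E '^(listeners|advertised\.listeners|port)=' "$1"
}

# KAFKA_HOME and the /opt/kafka fallback are assumptions; adjust to your install.
props="${KAFKA_HOME:-/opt/kafka}/config/server.properties"
if [ -f "$props" ]; then
  # No match means no listener is set explicitly, so the default port applies.
  kafka_listeners "$props" || echo "no explicit listener set; default port 9092 applies"
fi
```

If the listeners line is still commented out, as it is in the stock configuration file, Kafka falls back to port 9092.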
Security:
I chose no security policy, but in a real-life scenario this needs to be considered. More on this can be found in the official documentation.
Agent Group:
Select the group to which your agent belongs.
Make sure the connectivity agent is up and running.
Test and Save
Create an App Driven Integration
Drag the new Kafka connection onto the canvas
Select a Topic and Partition:
My Kafka server only has one Topic available, and I'll use the default option for the partitions.
Consumer Group: consumer-group
In this post I elaborate a bit more on the above options.
Polling Frequency: How often I want to check for new messages. 10 seconds is more than enough.
Maximum number of records: The number of records to read in a single poll.
I will add an FTP adapter to write the messages into a file.
I have a very simple CSV for the schema, with just one attribute called message.
We map the content of the message to the FTP Write operation.
This is what the Integration looks like. Obviously, for a real use case we would need to implement a For-Each action to process each message coming from the Kafka topic individually.
After we activate the Integration, it's time to create some messages in the Topic.
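One way to create those messages is the console producer that ships with Kafka, which sends each line it reads on standard input as one record. The snippet below is a sketch under a few assumptions: it is run from <Kafka_Home>/bin, the broker listens on localhost:9092, and gen_messages is a helper name invented for this example. Older Kafka releases expect --broker-list instead of --bootstrap-server for the producer.

```shell
# Print N sample messages, one per line (one line becomes one Kafka record).
# gen_messages is a helper made up for this example.
gen_messages() {
  n="$1"
  i=1
  while [ "$i" -le "$n" ]; do
    echo "test message $i"
    i=$((i + 1))
  done
}

# Run from <Kafka_Home>/bin; the guard makes this a no-op anywhere else.
if [ -x ./kafka-console-producer.sh ]; then
  # Send five sample messages to the topic the Integration subscribes to.
  gen_messages 5 | ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic TopicTrigger
fi
```

You can also start kafka-console-producer.sh without the pipe and type messages interactively, one per line.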
After a maximum of 10 seconds (the defined polling interval) the Integration starts and we can go to the monitoring page that gives us all the tools to track and analyse the payload.
And all works like a charm!
One final note on the added support for Confluent Kafka: this is great, as it's one of the most widely used in the market. The prerequisites are different, and you can read all about it here.