# Kafka Connect HTTP Connector

## Disclaimer

This is a fork of the castorm/kafka-connect-http project, created to improve on its latest implementation and to continue maintenance, since support for the original project is virtually nonexistent.
This connector is for you if:

- You want to (live) replicate a dataset exposed through a JSON/HTTP API
- You want to do so efficiently
- You want to capture only changes, not full snapshots
- You want to do so via configuration, with no custom coding
- You want to be able to extend the connector if it comes to that
## Source Connector

```
io.github.comrada.kafka.connect.http.HttpSourceConnector
```
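As a quick orientation, here is a minimal, hypothetical configuration wiring the connector to a fictional endpoint (topic name and URL are placeholders); every component it relies on is described in the sections below:

```properties
# Minimal sketch: fictional endpoint, placeholder topic name
name=http-source-example
connector.class=io.github.comrada.kafka.connect.http.HttpSourceConnector
tasks.max=1
kafka.topic=example-topic
http.request.url=https://api.example.com/v1/items
http.request.headers=Accept: application/json
```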
### Timer: Throttling HttpRequests

Controls the rate at which HTTP requests are performed by informing the task how long to wait until the next execution is due.

#### `http.timer`

```java
public interface Timer extends Configurable {

  Long getRemainingMillis();

  default void reset(Instant lastZero) {
    // Do nothing
  }
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.timer.AdaptableIntervalTimer`
- Available implementations:
  - `io.github.comrada.kafka.connect.timer.FixedIntervalTimer`
  - `io.github.comrada.kafka.connect.timer.AdaptableIntervalTimer`
#### Throttling HttpRequests with FixedIntervalTimer

Throttles the rate of requests based on a fixed interval.

##### `http.timer.interval.millis`

Interval between requests.

- Type: `Long`
- Default: `60000`
#### Throttling HttpRequests with AdaptableIntervalTimer

Throttles the rate of requests based on a fixed interval. It has, however, two modes of operation, with two different intervals:

- **Up to date**: there were no new records in the last poll, or there were new records but they were created "recently" (within the interval)
- **Catching up**: there were new records in the last poll, but they were created "long ago" (earlier than the interval)

##### `http.timer.interval.millis`

Interval between requests when up to date.

- Type: `Long`
- Default: `60000`

##### `http.timer.catchup.interval.millis`

Interval between requests when catching up.

- Type: `Long`
- Default: `30000`
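For instance, a hypothetical feed that should be polled every minute when up to date, but drained faster while catching up on a backlog, could be throttled like this (values are illustrative):

```properties
# Illustrative throttling: 60s steady state, 10s while catching up
http.timer=io.github.comrada.kafka.connect.timer.AdaptableIntervalTimer
http.timer.interval.millis=60000
http.timer.catchup.interval.millis=10000
```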
### HttpRequestFactory: Creating a HttpRequest

The first thing our connector will need to do is create a HttpRequest.

#### `http.request.factory`

```java
public interface HttpRequestFactory extends Configurable {

  HttpRequest createRequest(Offset offset);
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.request.template.TemplateHttpRequestFactory`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.request.template.TemplateHttpRequestFactory`
#### `http.offset.initial`

Initial offset, a comma-separated list of pairs.

- Example: `property1=value1, property2=value2`
- Type: `String`
- Default: `""`
#### Creating a HttpRequest with TemplateHttpRequestFactory

This HttpRequestFactory is based on template resolution.

##### `http.request.method`

HTTP method to use in the request.

- Type: `String`
- Default: `GET`

##### `http.request.url`

HTTP URL to use in the request.

- Required
- Type: `String`
##### `http.request.headers`

HTTP headers to use in the request, a `,`-separated list of `:`-separated pairs.

- Example: `Name: Value, Name2: Value2`
- Type: `String`
- Default: `""`

##### `http.request.params`

HTTP query parameters to use in the request, an `&`-separated list of `=`-separated pairs.

- Example: `name=value & name2=value2`
- Type: `String`
- Default: `""`

##### `http.request.body`

HTTP body to use in the request.

- Type: `String`
- Default: `""`
##### `http.request.template.factory`

Class responsible for creating the templates that will be used on every request.

```java
public interface TemplateFactory {

  Template create(String template);
}

public interface Template {

  String apply(Offset offset);
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.request.template.freemarker.BackwardsCompatibleFreeMarkerTemplateFactory`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.request.template.freemarker.BackwardsCompatibleFreeMarkerTemplateFactory`: Implementation based on FreeMarker that accepts offset properties without the `offset` namespace (deprecated)
  - `io.github.comrada.kafka.connect.http.request.template.freemarker.FreeMarkerTemplateFactory`: Implementation based on FreeMarker
  - `io.github.comrada.kafka.connect.http.request.template.NoTemplateFactory`
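Putting the request properties together, a hypothetical templated request against a fictional paginated API might look like this (URL and parameter names are made up):

```properties
# Hypothetical templated request; ${offset.timestamp} is resolved on every poll
http.request.method=GET
http.request.url=https://api.example.com/v1/items
http.request.headers=Accept: application/json
http.request.params=after=${offset.timestamp}&limit=100
```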
#### Creating a HttpRequest with FreeMarkerTemplateFactory

FreeMarker templates will have the following data model available:

- `offset`
  - `key`
  - `timestamp` (as an ISO 8601 string, e.g. `2020-01-01T00:00:00Z`)
  - ... (custom offset properties)

Accessing any of the above within a template can be achieved like this:

```
http.request.params=after=${offset.timestamp}
```

For an epoch representation of the same timestamp, FreeMarker built-ins should be used:

```
http.request.params=after=${offset.timestamp?datetime.iso?long}
```

For a complete understanding of the features provided by FreeMarker, please refer to the FreeMarker User Manual.
### HttpClient: Executing a HttpRequest

Once our HttpRequest is ready, we have to execute it to get some results out of it. That's the purpose of the HttpClient.

#### `http.client`

```java
public interface HttpClient extends Configurable {

  HttpResponse execute(HttpRequest request) throws IOException;
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.client.okhttp.OkHttpClient`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.client.okhttp.OkHttpClient`
#### Executing a HttpRequest with OkHttpClient

Uses an OkHttp client.

##### `http.client.connection.timeout.millis`

Timeout for opening a connection.

- Type: `Long`
- Default: `2000`

##### `http.client.read.timeout.millis`

Timeout for reading a response.

- Type: `Long`
- Default: `2000`

##### `http.client.connection.ttl.millis`

Time to live for the connection.

- Type: `Long`
- Default: `300000`

##### `http.client.proxy.host`

Hostname of the HTTP proxy.

- Type: `String`
- Default: `""`

##### `http.client.proxy.port`

Port of the HTTP proxy.

- Type: `Integer`
- Default: `3128`

##### `http.client.proxy.username`

Username of the HTTP proxy.

- Type: `String`
- Default: `""`

##### `http.client.proxy.password`

Password of the HTTP proxy.

- Type: `String`
- Default: `""`
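As an illustration, a hypothetical deployment behind a corporate proxy, talking to a slow upstream API, might tune the client like this (host and credentials are placeholders):

```properties
# Illustrative client tuning behind a proxy (placeholder values)
http.client.connection.timeout.millis=5000
http.client.read.timeout.millis=10000
http.client.proxy.host=proxy.example.com
http.client.proxy.port=3128
http.client.proxy.username=svc-connect
http.client.proxy.password=changeme
```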
### HttpAuthenticator: Authenticating a HttpRequest

When executing the request, authentication might be required. The HttpAuthenticator is responsible for resolving the `Authorization` header to be included in the HttpRequest.

#### `http.auth`

```java
public interface HttpAuthenticator extends Configurable {

  Optional<String> getAuthorizationHeader();
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.auth.ConfigurableHttpAuthenticator`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.auth.ConfigurableHttpAuthenticator`
  - `io.github.comrada.kafka.connect.http.auth.NoneHttpAuthenticator`
  - `io.github.comrada.kafka.connect.http.auth.BasicHttpAuthenticator`
#### Authenticating with ConfigurableHttpAuthenticator

Allows selecting the authentication type via a configuration property.

##### `http.auth.type`

Type of authentication.

- Type: `Enum { None, Basic }`
- Default: `None`
#### Authenticating with BasicHttpAuthenticator

Resolves the `Authorization` header for HTTP Basic authentication from the credentials below.

##### `http.auth.user`

- Type: `String`
- Default: `""`

##### `http.auth.password`

- Type: `String`
- Default: `""`
### HttpResponseParser: Parsing a HttpResponse

Once our HttpRequest has been executed, we'll have to deal with the resulting HttpResponse and translate it into the list of SourceRecords expected by Kafka Connect.

#### `http.response.parser`

```java
public interface HttpResponseParser extends Configurable {

  List<SourceRecord> parse(HttpResponse response);
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.response.PolicyHttpResponseParser`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.response.PolicyHttpResponseParser`
  - `io.github.comrada.kafka.connect.http.response.KvHttpResponseParser`
#### Parsing with PolicyHttpResponseParser

Vets the HTTP response, deciding whether it should be processed, skipped, or failed. This decision is delegated to a HttpResponsePolicy. When the decision is to process the response, the processing itself is delegated to a secondary HttpResponseParser.

##### HttpResponsePolicy: Vetting a HttpResponse

###### `http.response.policy`

```java
public interface HttpResponsePolicy extends Configurable {

  HttpResponseOutcome resolve(HttpResponse response);

  enum HttpResponseOutcome {
    PROCESS, SKIP, FAIL
  }
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.response.StatusCodeHttpResponsePolicy`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.response.StatusCodeHttpResponsePolicy`
###### `http.response.policy.parser`

Parser the response is delegated to when the policy decides to process it.

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.response.KvHttpResponseParser`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.response.KvHttpResponseParser`
##### Vetting with StatusCodeHttpResponsePolicy

Vets responses based on the HTTP status code of the response and the configuration below.

###### `http.response.policy.codes.process`

Comma-separated list of status code ranges that will result in the parser processing the response.

- Example: `200..205, 207..210`
- Type: `String`
- Default: `200..299`

###### `http.response.policy.codes.skip`

Comma-separated list of status code ranges that will result in the parser skipping the response.

- Example: `300..305, 307..310`
- Type: `String`
- Default: `300..399`
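Putting these together, a hypothetical policy matching the defaults processes success codes and skips redirects; status codes outside both lists presumably resolve to the `FAIL` outcome:

```properties
# Illustrative vetting: 2xx processed, 3xx skipped
http.response.policy=io.github.comrada.kafka.connect.http.response.StatusCodeHttpResponsePolicy
http.response.policy.codes.process=200..299
http.response.policy.codes.skip=300..399
```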
#### Parsing with KvHttpResponseParser

Parses the HTTP response into a key-value SourceRecord. This process is decomposed into two steps:

- Parsing the HttpResponse into a KvRecord
- Mapping the KvRecord into a SourceRecord

##### `http.response.record.parser`

```java
public interface KvRecordHttpResponseParser extends Configurable {

  List<KvRecord> parse(HttpResponse response);
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParser`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParser`

##### `http.response.record.mapper`

```java
public interface KvSourceRecordMapper extends Configurable {

  SourceRecord map(KvRecord record);
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.record.SchemedKvSourceRecordMapper`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.record.SchemedKvSourceRecordMapper`: Maps the key to a Struct schema with a single property `key`, and the value to a Struct schema with a single property `value`
  - `io.github.comrada.kafka.connect.http.record.StringKvSourceRecordMapper`: Maps both key and value to a `String` schema
#### Parsing with JacksonKvRecordHttpResponseParser

Uses Jackson to look for the records in the response.

##### `http.response.list.pointer`

JsonPointer to the property in the response body containing an array of records.

- Example: `/items`
- Type: `String`
- Default: `/`

##### `http.response.record.pointer`

JsonPointer to the individual record to be used as the Kafka record body. Useful when the object we are interested in is under a nested structure.

- Type: `String`
- Default: `/`
##### `http.response.record.offset.pointer`

Comma-separated list of `key=/value` pairs, where the key is the name of the property in the offset and the value is the JsonPointer to the value being used as offset for future requests. This is the mechanism that enables sharing state between HttpRequests; HttpRequestFactory implementations receive this Offset.

Special properties:

- `key` is used as the record's identifier, used for de-duplication and topic partition routing
- `timestamp` is used as the record's timestamp, used for de-duplication and ordering

One of the roles of the offset, even if not required for preparing the next request, is helping in de-duplication of already-seen records by providing a sense of progress, assuming consistent ordering (e.g. even if the response returns some repeated results between requests because they have the same timestamp, anything prior to the last seen offset will be ignored). See SourceRecordFilterFactory below. A worked example follows this property.

- Example: `id=/itemId`
- Type: `String`
- Default: `""`
##### `http.response.record.timestamp.parser`

Class responsible for converting the timestamp property captured above into a `java.time.Instant`.

- Type: `String`
- Default: `io.github.comrada.kafka.connect.http.response.timestamp.EpochMillisOrDelegateTimestampParser`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.response.timestamp.EpochMillisTimestampParser`: Captures the timestamp as an epoch-millis long
  - `io.github.comrada.kafka.connect.http.response.timestamp.EpochMillisOrDelegateTimestampParser`: Tries to capture the timestamp as epoch millis, delegating to another parser in case of failure
  - `io.github.comrada.kafka.connect.http.response.timestamp.DateTimeFormatterTimestampParser`: Implementation based on a `DateTimeFormatter`
  - `io.github.comrada.kafka.connect.http.response.timestamp.NattyTimestampParser`: Implementation based on the Natty parser
  - `io.github.comrada.kafka.connect.http.response.timestamp.RegexTimestampParser`: Extracts a substring from the timestamp field and parses it with a delegate parser
##### `http.response.record.timestamp.parser.pattern`

When using DateTimeFormatterTimestampParser, a custom pattern can be specified.

- Type: `String`
- Default: `yyyy-MM-dd'T'HH:mm:ss[.SSS]X`

##### `http.response.record.timestamp.parser.zone`

Timezone of the timestamp. Accepts valid ZoneId identifiers.

- Type: `String`
- Default: `UTC`

##### `http.response.record.timestamp.parser.regex`

When using RegexTimestampParser, a custom regex pattern can be specified.

- Type: `String`
- Default: `.*`

##### `http.response.record.timestamp.parser.regex.delegate`

When using RegexTimestampParser, the delegate class used to parse the extracted timestamp.

- Type: `Class`
- Default: `DateTimeFormatterTimestampParser`
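For instance, if a hypothetical API returned timestamps like `2020-01-01 00:00:00` in Central European Time, the parser could be configured as follows (pattern and zone are illustrative):

```properties
# Illustrative parsing of "2020-01-01 00:00:00" timestamps in CET
http.response.record.timestamp.parser=io.github.comrada.kafka.connect.http.response.timestamp.DateTimeFormatterTimestampParser
http.response.record.timestamp.parser.pattern=yyyy-MM-dd HH:mm:ss
http.response.record.timestamp.parser.zone=Europe/Paris
```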
#### Mapping a KvRecord into a SourceRecord with SchemedKvSourceRecordMapper

Once we have our KvRecord, we have to translate it into what Kafka Connect is expecting: SourceRecords.

This mapper embeds the record properties into a common simple envelope to enable schema evolution. The envelope simply contains a key and a value property with customizable field names.

This is also where we tell Kafka Connect what topic and what partition we want to send our record to.

It's worth noticing there are projects out there that allow you to infer the schema from your JSON document (e.g. expandjsonsmt).

##### `kafka.topic`

Name of the topic the record will be sent to.

- Required
- Type: `String`
- Default: `""`
##### `http.record.schema.key.property.name`

Name of the key property in the key-value envelope.

- Type: `String`
- Default: `key`

##### `http.record.schema.value.property.name`

Name of the value property in the key-value envelope.

- Type: `String`
- Default: `value`
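For example, to rename the envelope fields (names are hypothetical):

```properties
# Hypothetical envelope field names
http.record.schema.key.property.name=id
http.record.schema.value.property.name=payload
```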
### SourceRecordSorter: Sorting SourceRecords

Some HTTP resources not designed for CDC return snapshots with the most recent records first. In these cases de-duplication is especially important, as subsequent requests are likely to produce overlapping results. The de-duplication mechanisms offered by this connector are order-dependent, as they are usually based on timestamps.

To enable de-duplication in cases like this, we can instruct the connector to assume a specific order direction: either `ASC`, `DESC`, or `IMPLICIT`, where implicit figures it out based on the records' timestamps.
#### `http.record.sorter`

```java
public interface SourceRecordSorter extends Configurable {

  List<SourceRecord> sort(List<SourceRecord> records);
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.record.OrderDirectionSourceRecordSorter`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.record.OrderDirectionSourceRecordSorter`

#### `http.response.list.order.direction`

Order direction of the results in the response list.

- Type: `Enum { ASC, DESC, IMPLICIT }`
- Default: `IMPLICIT`
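For example, for a hypothetical API known to return the newest records first:

```properties
# The API returns newest-first, so declare descending order for de-duplication
http.response.list.order.direction=DESC
```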
### SourceRecordFilterFactory: Filtering out SourceRecords

There are cases when we'll be interested in filtering out certain records. One of these is de-duplication.

#### `http.record.filter.factory`

```java
public interface SourceRecordFilterFactory extends Configurable {

  Predicate<SourceRecord> create(Offset offset);
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.record.OffsetRecordFilterFactory`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.record.OffsetRecordFilterFactory`
  - `io.github.comrada.kafka.connect.http.record.OffsetTimestampRecordFilterFactory`
  - `io.github.comrada.kafka.connect.http.record.PassthroughRecordFilterFactory`
#### Filtering out SourceRecords with OffsetTimestampRecordFilterFactory

De-duplicates based on the Offset's timestamp, filtering out records with an earlier or equal timestamp.

Useful when the timestamp is used to filter the HTTP resource, but the filter does not have full timestamp precision.

Assumptions:

- Records are ordered by timestamp
- No two records can contain the same timestamp (to whatever precision the HTTP resource uses)

If the latter assumption cannot be satisfied, check OffsetRecordFilterFactory, which tries to prevent data loss.
#### Filtering out SourceRecords with OffsetRecordFilterFactory

De-duplicates based on the Offset's timestamp, key, and any other custom property present in the Offset, filtering out records with earlier timestamps, and within the same timestamp, only those up to the last seen Offset properties.

Useful when the timestamp alone is not unique, but together with some other Offset property it is.

Assumptions:

- Records are ordered by timestamp
- There is an Offset property that uniquely identifies records (e.g. key)
- There won't be new items preceding already-seen ones
### HttpResponseTransformer: Transforming a HttpResponse

Once our HttpRequest has been executed and we have a HttpResponse to deal with, we can immediately apply some kind of transformation to it. This happens before the HttpResponseParser starts its work.

This may be useful in various cases, e.g. to correct JSON into a more usable form, to cast the types of fields, or to select a certain number of fields from a complex data structure.

As the transformer receives the full HttpResponse object, it can also modify server headers and the HTTP status code.

#### `http.response.transformer`

```java
public interface HttpResponseTransformer extends Configurable {

  HttpResponse transform(HttpResponse response);
}
```

- Type: `Class`
- Default: `io.github.comrada.kafka.connect.http.response.transform.NoneTransformer`
- Available implementations:
  - `io.github.comrada.kafka.connect.http.response.transform.JsltBodyTransformer`
#### Transforming with JsltBodyTransformer

The transformer first checks that the server response has an `application/json` content type, and fails if it does not. It allows you to flexibly convert JSON from one form to another. You can find examples of the syntax in the official documentation.

##### `http.response.transform.jslt`

- Example: `[for(.symbols) {"symbol": .symbol, "status": .status, "baseAsset": .baseAsset, "quoteAsset": .quoteAsset} if(.status == "TRADING" and .quoteAsset == "USDT")]`
- Type: `String`
- Default: `"."`
## Development

### Building

```
mvn package
```

### Running the tests

```
mvn test
```

### Releasing

- Update the CHANGELOG.md and README.md files.
- Prepare the release: `mvn release:clean release:prepare`
## Contributing

Contributions are welcome via pull requests. Pending the definition of a code of conduct, please just follow the existing conventions.
## License

This project is licensed under the Apache 2.0 License - see the LICENSE.txt file for details.
## Built With

- Maven - Dependency management
- Kafka Connect - The framework for our connectors
- OkHttp - HTTP client
- Jackson - JSON deserialization
- FreeMarker - Template engine
- Natty - Date parser
- JSLT - A complete query and transformation language for JSON
## Acknowledgments

- Inspired by castorm/kafka-connect-http