Big data comprises datasets that are massive, varied, complex, and can't be handled traditionally. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly more crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.
Data Processing in GCP With Apache Airflow and BigQuery
Introduction to Modern Data Stack
Over the past few years, Apache Kafka has emerged as the leading standard for streaming data. Fast-forward to the present day: Kafka has achieved ubiquity, being adopted by at least 80% of the Fortune 100. This widespread adoption is attributed to Kafka's architecture, which goes far beyond basic messaging. Kafka's architecture versatility makes it exceptionally suitable for streaming data at a vast "internet" scale, ensuring fault tolerance and data consistency crucial for supporting mission-critical applications. Flink is a high-throughput, unified batch and stream processing engine, renowned for its capability to handle continuous data streams at scale. It seamlessly integrates with Kafka and offers robust support for exactly-once semantics, ensuring each event is processed precisely once, even amidst system failures. Flink emerges as a natural choice as a stream processor for Kafka. While Apache Flink enjoys significant success and popularity as a tool for real-time data processing, accessing sufficient resources and current examples for learning Flink can be challenging. In this article, I will guide you through the step-by-step process of integrating Kafka 2.13-3.7.0 with Flink 1.18.1 to consume data from a topic and process it within Flink on the single-node cluster. Ubuntu-22.04 LTS has been used as an OS in the cluster. Assumptions The system has a minimum of 8 GB RAM and 250 GB SSD along with Ubuntu-22.04.2 amd64 as the operating system. OpenJDK 11 is installed with JAVA_HOME environment variable configuration. Python 3 or Python 2 along with Perl 5 is available on the system. Single-node Apache Kafka-3.7.0 cluster has been up and running with Apache Zookeeper -3.5.6. (Please read here how to set up a Kafka cluster.). Install and Start Flink 1.18.1 The binary distribution of Flink-1.18.1 can be downloaded here. Extract the archive flink-1.18.1-bin-scala_2.12.tgz on the terminal using $ tar -xzf flink-1.18.1-bin-scala_2.12.tgz. After successful extraction, directory flink-1.18.1 will be created. Please make sure that inside it bin/, conf/, and examples/ directories are available. Navigate to the bin directory through the terminal, and execute $ ./bin/start-cluster.sh to start the single-node Flink cluster. Moreover, we can utilize Flink's web UI to monitor the status of the cluster and running jobs by accessing the browser at port 8081. The Flink cluster can be stopped by executing $ ./bin/stop-cluster.sh. List of Dependent JARs The following .jars should be included in the classpath/build file: I've created a basic Java program using Eclipse IDE 23-12 to continuously consume messages within Flink from a Kafka topic. Dummy string messages are being published to the topic using Kafka's built-in kafka-console-publisher script. Upon arrival in the Flink engine, no data transformation occurs for each message. Instead, an additional string is simply appended to each message and printed for verification, ensuring that messages are continuously streamed to Flink. Java package com.dataview.flink; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.dataview.flink.util.IKafkaConstants; public class readFromKafkaTopic { public static void main(String[] args) throws Exception { StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(IKafkaConstants.KAFKA_BROKERS) .setTopics(IKafkaConstants.FIRST_TOPIC_NAME) .setGroupId(IKafkaConstants.GROUP_ID_CONFIG) .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); DataStream<String> messageStream = see.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); messageStream.rebalance().map(new MapFunction<String, String>() { private static final long serialVersionUID = -6867736771747690202L; @Override public String map(String value) throws Exception { return "Kafka and Flink says: " + value; } }).print(); see.execute(); } } The entire execution has been screen-recorded. If interested, you can watch it below:< I hope you enjoyed reading this. Please stay tuned for another upcoming article where I will explain how to stream messages/data from Flink to a Kafka topic.
In Part 1 of this series, we looked at MongoDB, one of the most reliable and robust document-oriented NoSQL databases. Here in Part 2, we'll examine another quite unavoidable NoSQL database: Elasticsearch. More than just a popular and powerful open-source distributed NoSQL database, Elasticsearch is first of all a search and analytics engine. It is built on the top of Apache Lucene, the most famous search engine Java library, and is able to perform real-time search and analysis operations on structured and unstructured data. It is designed to handle efficiently large amounts of data. Once again, we need to disclaim that this short post is by no means an Elasticsearch tutorial. Accordingly, the reader is strongly advised to extensively use the official documentation, as well as the excellent book, "Elasticsearch in Action" by Madhusudhan Konda (Manning, 2023) to learn more about the product's architecture and operations. Here, we're just reimplementing the same use case as previously, but using this time, using Elasticsearch instead of MongoDB. So, here we go! The Domain Model The diagram below shows our *customer-order-product* domain model: This diagram is the same as the one presented in Part 1. Like MongoDB, Elasticsearch is also a document data store and, as such, it expects documents to be presented in JSON notation. The only difference is that to handle its data, Elasticsearch needs to get them indexed. There are several ways that data can be indexed in an Elasticsearch data store; for example, piping them from a relational database, extracting them from a filesystem, streaming them from a real-time source, etc. But whatever the ingestion method might be, it eventually consists of invoking the Elasticsearch RESTful API via a dedicated client. There are two categories of such dedicated clients: REST-based clients like curl, Postman, HTTP modules for Java, JavaScript, Node.js, etc. Programming language SDKs (Software Development Kit): Elasticsearch provides SDKs for all the most used programming languages, including but not limited to Java, Python, etc. Indexing a new document with Elasticsearch means creating it using a POST request against a special RESTful API endpoint named _doc. For example, the following request will create a new Elasticsearch index and store a new customer instance in it. Plain Text POST customers/_doc/ { "id": 10, "firstName": "John", "lastName": "Doe", "email": { "address": "john.doe@gmail.com", "personal": "John Doe", "encodedPersonal": "John Doe", "type": "personal", "simple": true, "group": true }, "addresses": [ { "street": "75, rue Véronique Coulon", "city": "Coste", "country": "France" }, { "street": "Wulfweg 827", "city": "Bautzen", "country": "Germany" } ] } Running the request above using curl or the Kibana console (as we'll see later) will produce the following result: Plain Text { "_index": "customers", "_id": "ZEQsJI4BbwDzNcFB0ubC", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "_seq_no": 1, "_primary_term": 1 } This is the Elasticsearch standard response to a POST request. It confirms having created the index named customers, having a new customer document, identified by an automatically generated ID ( in this case, ZEQsJI4BbwDzNcFB0ubC). Other interesting parameters appear here, like _version and especially _shards. Without going into too much detail, Elasticsearch creates indexes as logical collections of documents. Just like keeping paper documents in a filing cabinet, Elasticsearch keeps documents in an index. Each index is composed of shards, which are physical instances of Apache Lucene, the engine behind the scenes responsible for getting the data in or out of the storage. They might be either primary, storing documents, or replicas, storing, as the name suggests, copies of primary shards. More on that in the Elasticsearch documentation - for now, we need to notice that our index named customers is composed of two shards: of which one, of course, is primary. A final notice: the POST request above doesn't mention the ID value as it is automatically generated. While this is probably the most common use case, we could have provided our own ID value. In each case, the HTTP request to be used isn't POST anymore, but PUT. To come back to our domain model diagram, as you can see, its central document is Order, stored in a dedicated collection named Orders. An Order is an aggregate of OrderItem documents, each of which points to its associated Product. An Order document references also the Customer who placed it. In Java, this is implemented as follows: Java public class Customer { private Long id; private String firstName, lastName; private InternetAddress email; private Set<Address> addresses; ... } The code above shows a fragment of the Customer class. This is a simple POJO (Plain Old Java Object) having properties like the customer's ID, first and last name, email address, and a set of postal addresses. Let's look now at the Order document. Java public class Order { private Long id; private String customerId; private Address shippingAddress; private Address billingAddress; private Set<String> orderItemSet = new HashSet<>() ... } Here you can notice some differences compared to the MongoDB version. As a matter of fact, with MongoDB, we were using a reference to the customer instance associated with this order. This notion of reference doesn't exist with Elasticsearch and, hence, we're using this document ID to create an association between the order and the customer who placed it. The same applies to the orderItemSet property which creates an association between the order and its items.The rest of our domain model is quite similar and based on the same normalization ideas. For example, the OrderItem document: Java public class OrderItem { private String id; private String productId; private BigDecimal price; private int amount; ... } Here, we need to associate the product which makes the object of the current order item. Last but not least, we have the Product document: Java public class Product { private String id; private String name, description; private BigDecimal price; private Map<String, String> attributes = new HashMap<>(); ... } The Data Repositories Quarkus Panache greatly simplifies the data persistence process by supporting both the active record and the repository design patterns. In Part 1, we used the Quarkus Panache extension for MongoDB to implement our data repositories, but there is not yet an equivalent Quarkus Panache extension for Elasticsearch. Accordingly, waiting for a possible future Quarkus extension for Elasticsearch, here we have to manually implement our data repositories using the Elasticsearch dedicated client. Elasticsearch is written in Java and, consequently, it is not a surprise that it offers native support for invoking the Elasticsearch API using the Java client library. This library is based on fluent API builder design patterns and provides both synchronous and asynchronous processing models. It requires Java 8 at minimum. So, what do our fluent API builder-based data repositories look like? Below is an excerpt from the CustomerServiceImpl class which acts as a data repository for the Customer document. Java @ApplicationScoped public class CustomerServiceImpl implements CustomerService { private static final String INDEX = "customers"; @Inject ElasticsearchClient client; @Override public String doIndex(Customer customer) throws IOException { return client.index(IndexRequest.of(ir -> ir.index(INDEX).document(customer))).id(); } ... As we can see, our data repository implementation must be a CDI bean having an application scope. The Elasticsearch Java client is simply injected, thanks to the quarkus-elasticsearch-java-client Quarkus extension. This way avoids lots of bells and whistles that we would have had to use otherwise. The only thing we need to be able to inject the client is to declare the following property: Properties files quarkus.elasticsearch.hosts = elasticsearch:9200 Here, elasticsearch is the DNS (Domain Name Server) name that we associate with the Elastic search database server in the docker-compose.yaml file. 9200 is the TCP port number used by the server to listen for connections.The method doIndex() above creates a new index named customers if it doesn't exist and indexes (stores) into it a new document representing an instance of the class Customer. The indexing process is performed based on an IndexRequest accepting as input arguments the index name and the document body. As for the document ID, it is automatically generated and returned to the caller for further reference.The following method allows to retrieve the customer identified by the ID given as an input argument: Java ... @Override public Customer getCustomer(String id) throws IOException { GetResponse<Customer> getResponse = client.get(GetRequest.of(gr -> gr.index(INDEX).id(id)), Customer.class); return getResponse.found() ? getResponse.source() : null; } ... The principle is the same: using this fluent API builder pattern, we construct a GetRequest instance in a similar way that we did with the IndexRequest, and we run it against the Elasticsearch Java client. The other endpoints of our data repository, allowing us to perform full search operations or to update and delete customers, are designed the same way. Please take some time to look at the code to understand how things are working. The REST API Our MongoDB REST API interface was simple to implement, thanks to the quarkus-mongodb-rest-data-panache extension, in which the annotation processor automatically generated all the required endpoints. With Elasticsearch, we don't benefit yet from the same comfort and, hence, we need to manually implement it. That's not a big deal, as we can inject the previous data repositories, shown below: Java @Path("customers") @Produces(APPLICATION_JSON) @Consumes(APPLICATION_JSON) public class CustomerResourceImpl implements CustomerResource { @Inject CustomerService customerService; @Override public Response createCustomer(Customer customer, @Context UriInfo uriInfo) throws IOException { return Response.accepted(customerService.doIndex(customer)).build(); } @Override public Response findCustomerById(String id) throws IOException { return Response.ok().entity(customerService.getCustomer(id)).build(); } @Override public Response updateCustomer(Customer customer) throws IOException { customerService.modifyCustomer(customer); return Response.noContent().build(); } @Override public Response deleteCustomerById(String id) throws IOException { customerService.removeCustomerById(id); return Response.noContent().build(); } } This is the customer's REST API implementation. The other ones associated with orders, order items, and products are similar.Let's see now how to run and test the whole thing. Running and Testing Our Microservices Now that we looked at the details of our implementation, let's see how to run and test it. We chose to do it on behalf of the docker-compose utility. Here is the associated docker-compose.yml file: YAML version: "3.7" services: elasticsearch: image: elasticsearch:8.12.2 environment: node.name: node1 cluster.name: elasticsearch discovery.type: single-node bootstrap.memory_lock: "true" xpack.security.enabled: "false" path.repo: /usr/share/elasticsearch/backups ES_JAVA_OPTS: -Xms512m -Xmx512m hostname: elasticsearch container_name: elasticsearch ports: - "9200:9200" - "9300:9300" ulimits: memlock: soft: -1 hard: -1 volumes: - node1-data:/usr/share/elasticsearch/data networks: - elasticsearch kibana: image: docker.elastic.co/kibana/kibana:8.6.2 hostname: kibana container_name: kibana environment: - elasticsearch.url=http://elasticsearch:9200 - csp.strict=false ulimits: memlock: soft: -1 hard: -1 ports: - 5601:5601 networks: - elasticsearch depends_on: - elasticsearch links: - elasticsearch:elasticsearch docstore: image: quarkus-nosql-tests/docstore-elasticsearch:1.0-SNAPSHOT depends_on: - elasticsearch - kibana hostname: docstore container_name: docstore links: - elasticsearch:elasticsearch - kibana:kibana ports: - "8080:8080" - "5005:5005" networks: - elasticsearch environment: JAVA_DEBUG: "true" JAVA_APP_DIR: /home/jboss JAVA_APP_JAR: quarkus-run.jar volumes: node1-data: driver: local networks: elasticsearch: This file instructs the docker-compose utility to run three services: A service named elasticsearch running the Elasticsearch 8.6.2 database A service named kibana running the multipurpose web console providing different options such as executing queries, creating aggregations, and developing dashboards and graphs A service named docstore running our Quarkus microservice Now, you may check that all the required processes are running: Shell $ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 005ab8ebf6c0 quarkus-nosql-tests/docstore-elasticsearch:1.0-SNAPSHOT "/opt/jboss/containe…" 3 days ago Up 3 days 0.0.0.0:5005->5005/tcp, :::5005->5005/tcp, 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp, 8443/tcp docstore 9678c0a04307 docker.elastic.co/kibana/kibana:8.6.2 "/bin/tini -- /usr/l…" 3 days ago Up 3 days 0.0.0.0:5601->5601/tcp, :::5601->5601/tcp kibana 805eba38ff6c elasticsearch:8.12.2 "/bin/tini -- /usr/l…" 3 days ago Up 3 days 0.0.0.0:9200->9200/tcp, :::9200->9200/tcp, 0.0.0.0:9300->9300/tcp, :::9300->9300/tcp elasticsearch $ To confirm that the Elasticsearch server is available and able to run queries, you can connect to Kibana at http://localhost:601. After scrolling down the page and selecting Dev Tools in the preferences menu, you can run queries as shown below: In order to test the microservices, proceed as follows: 1. Clone the associated GitHub repository: Shell $ git clone https://github.com/nicolasduminil/docstore.git 2. Go to the project: Shell $ cd docstore 3. Checkout the right branch: Shell $ git checkout elastic-search 4. Build: Shell $ mvn clean install 5. Run the integration tests: Shell $ mvn -DskipTests=false failsafe:integration-test This last command will run the 17 provided integration tests, which should all succeed. You can also use the Swagger UI interface for testing purposes by firing your preferred browser at http://localhost:8080/q:swagger-ui. Then, in order to test endpoints, you can use the payload in the JSON files located in the src/resources/data directory of the docstore-api project.Enjoy!
In the landscape of data management and analytics, data lakes and data warehouses stand out as two foundational technologies. They serve distinct purposes and offer different advantages, each fitting various needs of organizations in handling big data. Understanding their differences, benefits, and trade-offs is essential for making informed decisions about which to use for specific data storage, management, and analysis needs. Data Lake A data lake is a centralized repository that allows for the storage of structured, semi-structured, and unstructured data at any scale. It can store data in its raw form without needing to first structure the data, making it highly flexible and scalable. Data lakes adopt a “schema-on-read” approach, meaning the data’s structure is not defined until the data is queried. This allows for storing vast amounts of raw, unstructured data from various sources, offering flexibility and adaptability for data analysis and discovery tasks. Data Lake representation Benefits Flexibility in data types and structures: Data lakes can store data in various formats, including logs, XML, JSON, and more. This versatility makes it ideal for organizations dealing with a wide array of data sources. Scalability and cost-effectiveness: With the ability to store vast amounts of data, data lakes leverage the scalability of cloud storage solutions, which can be more cost-effective than traditional data storage options. Advanced analytics and machine learning: Data lakes support big data analytics, machine learning models, and real-time analytics, providing deep insights and enabling data-driven decision-making. Trade-Offs Complex data management: Without proper governance and management, data lakes can become “data swamps,” where unorganized and outdated data makes it challenging to find and utilize information. Security and compliance risks: Managing access and ensuring security for a wide variety of data types can be complex, requiring sophisticated security measures to protect sensitive information. Data Warehouse A data warehouse is a system used for reporting and data analysis, acting as a repository of structured data extracted from various sources. The data is processed, transformed, and loaded into a structured format, making it suitable for querying and analysis. Data warehouses use a “schema-on-write” methodology, where data is cleansed, structured, and defined before storage. This ensures that the data is ready for querying and analysis, facilitating fast and reliable reporting but requiring upfront data modeling efforts. Data Warehouse representation Benefits Structured for easy access: Data is organized into schemas and optimized for SQL queries, making it easier for users to perform complex analyses and generate reports. High performance: Data warehouses are designed to handle complex queries efficiently. They support large volumes of data and numerous simultaneous queries, providing quick and reliable access to insights. Historical data analysis: They excel in storing historical data, enabling trend analysis over time, and helping in forecasting and decision-making. Data integrity and quality: The process of transforming data into a structured format ensures consistency, accuracy, and reliability of the data stored in data warehouses. Trade-Offs Constraints on data types: Data warehouses are less adaptable to unstructured data, requiring data to be converted into a structured format before it can be stored and analyzed. Cost and complexity in scaling: Traditional data warehouses can be expensive and complex to scale, especially as data volume grows. To understand this point, you can read my paper on the CAP theorem, which explains how databases are classified and their inherent limitations: Navigating the CAP Theorem: In search of the perfect database Longer setup and integration time: Setting up a data warehouse and integrating various data sources can be time-consuming, requiring significant upfront investment in planning and development. Conclusion Both data lakes and data warehouses offer valuable capabilities for data storage, management, and analysis. The choice between them depends on the specific needs of an organization, such as the types of data being dealt with, the intended use of the data, and the desired balance between flexibility and structure. For organizations prioritizing flexibility in handling various data types and formats, and focusing on advanced analytics, a data lake might be the more suitable option. On the other hand, for those requiring fast, reliable access to structured data for reporting and historical analysis, a data warehouse could be the better choice. In many cases, organizations find value in utilizing both technologies in a complementary manner, leveraging the strengths of each to meet their comprehensive data management and analysis needs. This hybrid approach ensures that businesses can harness the power of their data effectively, driving insights and decisions that propel them forward.
Columnar storage is a commonly used storage technique. Often, it implies high performance and has basically become a standard configuration for today’s analytical databases. Understanding Columnar Storage The basic principle of columnar storage is reducing the amount of data retrieved from the hard disk. A data table can have a lot of columns, but the computation may use only a very small number of them. With columnar storage, useless columns do not need to be retrieved, while with row-wise storage, all columns need to be scanned. When the retrieved columns only take up a very small part of the total, columnar storage has a big advantage in terms of IO time, and computation seems to get much faster. But the columnar storage also has another side – it isn’t the fastest for any scenario. Implementation Challenges of Columnar Storage Implementing columnar storage is much more complex than implementing row-wise storage because, for a data table, the number of columns can be determined in advance, but the number of rows will not stop growing. With row-wise storage, we write and append data to the table according to the order of records. It is easy to store the data table as a single file. But this does not work for data stored in columnar format. As there will be data appending, we cannot know the number of rows beforehand, and it is thus impossible to finish writing a column and then the next. Generally, we divide the storage space into a number of blocks, write a fixed number of rows (represented by N) to one block, and then move to the next when finish the writing. Later, data will be retrieved block by block. In each block, data is stored in the column-wise format, while between blocks, data can be regarded as stored row-wise. A special management module, where a table of contents is used to record information on the continuously growing data blocks and every column they store, is needed, causing a lot of inconveniences. So, it is difficult to implement columnar storage in a single data file. The storage schema is usually adopted by special data warehouse products. However, the block storage mechanism is unfriendly to the implementation of parallel processing when the data amount is not large. Parallel processing requires that data be divided into multiple segments. To be able to do this, there are two conditions: an almost equal amount of data in each segment (equal processing load for each thread) and the ability for flexible segmentation (the number of parallel tasks cannot be determined beforehand). Row-wise data can be segmented according to the number of rows, and parallel processing becomes feasible even for a very small amount of data. Column-wise data can only be divided into blocks, where data cannot be further divided. The number of records (the above-mentioned N) should not be too small; otherwise, too many resources will be wasted due to the existence of the smallest disk retrieval unit. In the extreme case of N=1, the storage schema is equal to row-wise storage. When N is too small, and the total amount of data involved is huge, the table of contents becomes very large and overburdens the content management. So, N is usually specified as one million or above. In order to segment data flexibly, there need to be at least hundreds of data blocks. That is to say, the parallel computation on column-wise data becomes smooth only when the total amount reaches at least hundreds of millions of data rows. esProc SPL offers the double increment segmentation strategy to make N grow as the data amount increases while maintaining the same number of data blocks. This way, the size of the table of contents can also be fixed, the columnar storage can be conveniently implemented in a single file, and flexible segmentation can be implemented for performing parallel computation on a small amount of data. According to the principle of columnar storage, the storage schema brings an obvious advantage only when the computation involves a relatively small number of columns. Many performance test cases (such as TPCH used as the international standard) choose such computing scenarios so they are convenient for bringing out the advantages of columnar databases. Those are only a part of the real-life business scenarios. In the finance industry, it is not rare that a computation involves most of the columns in a table having over one hundred columns. In that case, columnar storage only gives half-play to its advantage. Even if columnar storage has a higher compression ratio and a smaller amount of data retrieved than row-wise storage, its advantage is not that noticeable when many columns participate in the computation. After all, the process of retrieving data stored column-wise is much more complex than that of retrieving row-wised stored data. Therefore, when a real-world computation does not have as good performance as the test case gets, it is normal, and this does not mean that the test result is fake. Performance Considerations in Columnar Storage Columnar storage also leads to random disk accesses. Data in each column is stored continuously, but data in different columns isn’t. The more columns that are retrieved, the more serious the degree of randomness resulting from the retrieval, even with a single-thread task. For SSDs, it isn’t a very serious problem because when data in each column is continuous, and the above-mentioned N is big enough, the retrieval cost takes up a very small proportion, and the SSD does not have the seek time. But for HDDs that have the seek time, the problem becomes disastrous. When a lot of columns are accessed, it is probable that the performance is not even as good as that of the row-wise storage. Both concurrency and parallel processing will worsen the problem. On the other hand, increasing the size of the cache to alleviate the problem will occupy too much memory space. Be cautious when you try to use the columnar storage on HDDs. Another big problem with columnar storage is that it has a much lower indexing performance than row-wise storage. As we said, the index table stores ordered key values and positions of their corresponding records in the original table. For row-wise storage, the position of a record can be represented by one number, but for columnar storage, each column in a record has a different position, and, in principle, these positions should all be recorded. This creates an index table almost as big as the original table, which leads to heavy storage utilization and high retrieval costs. There isn’t much difference between this and the method of copying the original table and sorting it. Choosing the Right Storage Schema Of course, no one will do that in real-world practices. The general approach is still the previously mentioned block storage mechanism. The index only stores ordinal numbers of the records. The search reads an ordinal number from the index table, locates the corresponding block, “counts” from the first record to the one with the corresponding ordinal number in the block, and retrieves the column value. The “count” action is performed on each column. In the best-case scenario, a number of disk units equal to the number of columns will be read; if you are not lucky, the whole block will be scanned. By contrast, an index for row-wise storage generally only needs to read one or two disk units (determined by the space the records occupy). The amount of data retrieved under columnar storage is dozens of, even one hundred, times more than that under row-wise storage. With HDDs, there is also the unbearable seek time. Therefore, the columnar storage basically cannot handle the high-concurrency query requirements. Use the columnar storage for traversal and the row-wise storage for search. For data on which both traversal and search require high performance, it is even necessary to store two copies of data redundantly. The data platform should permit programmers to adopt the most suitable storage schema for each computing scenario rather than making the same decision for all scenarios. Users have the flexibility to select the most suitable storage schema based on their needs, along with implementing efficient indexing strategies to enhance search capabilities within their database systems. Conclusion In conclusion, while columnar storage offers advantages like reduced data retrieval and improved IO time, its implementation complexity, challenges with random disk accesses, and lower indexing performance compared to row-wise storage require careful consideration. Organizations should choose storage schemas based on their specific needs, balancing the benefits of columnar storage for traversal and row-wise storage for search to optimize performance effectively. Understanding these principles and navigating implementation challenges are key to leveraging columnar storage effectively for analytical database needs.
In the ever-expanding digital landscape, where data is generated at an unprecedented rate, the architecture of databases stands as the bedrock of efficient data management. With the rise of Big Data and Cloud technologies, alongside the integration of Artificial Intelligence (AI), the realm of database architectures has undergone a profound transformation. This article delves into the intricate world of database architectures, exploring their adaptation to Big Data and Cloud environments while also dissecting the evolving impact of AI on their structure and functionality. As organizations grapple with the challenges of handling vast amounts of data in real time, the significance of robust database architectures becomes increasingly apparent. From the traditional foundations of Relational Database Management Systems (RDBMS) to the flexible solutions offered by NoSQL databases and the scalability of cloud-based architectures, the evolution continues to meet the demands of today's data-driven landscape. Furthermore, the convergence of AI technologies introduces new dimensions to database management, enabling intelligent query optimization, predictive maintenance, and the emergence of autonomous databases. Understanding these dynamics is crucial for navigating the complexities of modern data ecosystems and leveraging the full potential of data-driven insights. The Traditional Foundation: Relational Database Management Systems (RDBMS) Traditionally, Relational Database Management Systems (RDBMS) have been the stalwarts of data management. Characterized by structured data organized into tables with predefined schemas, RDBMS ensures data integrity and transactional reliability through ACID (Atomicity, Consistency, Isolation, Durability) properties. Examples of RDBMS include MySQL, Oracle, and PostgreSQL. Embracing the Complexity of Big Data: NoSQL Databases The advent of Big Data necessitated a shift from the rigid structures of RDBMS to more flexible solutions capable of handling massive volumes of unstructured or semi-structured data. Enter NoSQL databases, a family of database systems designed to cater to the velocity, volume, and variety of Big Data (Kaushik Kumar Patel (2024)). NoSQL databases come in various forms, including document-oriented, key-value stores, column-family stores, and graph databases, each optimized for specific data models and use cases. Examples include MongoDB, Cassandra, and Apache HBase. Harnessing the Power of the Cloud: Cloud-Based Database Architectures Cloud-based database architectures leverage the scalability, flexibility, and cost-efficiency of cloud infrastructure to provide on-demand access to data storage and processing resources. Through models such as Infrastructure as a Service (IaaS), Platform as a Service (PaaS), and Database as a Service (DBaaS), organizations can choose the level of abstraction and management that suits their needs. Multi-cloud and hybrid cloud architectures further enhance flexibility by enabling workload distribution across multiple cloud providers or integration with on-premises infrastructure (Hichem Moulahoum, Faezeh Ghorbanizamani (2024)). Notable examples include Amazon Aurora, Google Cloud Spanner, and Microsoft Azure Cosmos DB. Data Flow and Storage: On-Premises vs. Cloud Databases Understanding data flow and storage is crucial for managing both on-premises and cloud databases effectively. Here's a breakdown with a Data Base Architect (DBA) diagram for each scenario: On-Premises Database Explanation Application server: This interacts with the database, initiating data creation, retrieval, and updates. Data extraction: This process, often utilizing Extract, Transform, Load (ETL) or Extract, Load, transform (ELT) methodologies, extracts data from various sources, transforms it into a format compatible with the database, and loads it. Database: This is the core storage location, managing and organizing data using specific structures like relational tables or NoSQL document stores. Storage: This represents physical storage devices like hard disk drives (HDDs) or solid-state drives (SSDs) holding the database files. Backup system: Regular backups are crucial for disaster recovery and ensuring data availability. Data Flow Applications interact with the database server, sending data creation, retrieval, and update requests. The ETL/ELT process extracts data from various sources, transforms it, and loads it into the database. Data is persisted within the database engine, organized by its specific structure. Storage devices physically hold the database files. Backups are periodically created and stored separately for data recovery purposes Cloud Database Explanation Application server: Like the on-premises scenario, this interacts with the database but through an API gateway or SDK provided by the cloud service provider. API Gateway/SDK: This layer acts as an abstraction, hiding the underlying infrastructure complexity and providing a standardized way for applications to interact with the cloud database. Cloud database: This is a managed service offered by cloud providers that handles database creation, maintenance, and scaling automatically. Cloud storage: This represents the cloud provider's storage infrastructure, where database files and backups are stored. Data Flow Applications interact with the cloud database through the API gateway or SDK, sending data requests. The API gateway/SDK translates the requests and interacts with the cloud database service. The cloud database service manages data persistence, organization, and retrieval. Data is stored within the cloud provider's storage infrastructure. Key Differences Management: On-premises databases require in-house expertise for setup, configuration, maintenance, and backups. Cloud databases are managed services, with the provider handling these aspects, freeing up IT resources. Scalability: On-premises databases require manual scaling of hardware resources, while cloud databases offer elastic scaling, automatically adjusting to meet changing needs. Security: Both options require security measures like access control and encryption. However, cloud providers often have robust security infrastructure and compliance certifications. The Convergence of AI and Database Architectures The integration of Artificial Intelligence (AI) into database architectures heralds a new era of intelligent data management solutions. AI technologies such as machine learning and natural language processing augment database functionality by enabling automated data analysis, prediction, and decision-making. These advancements not only streamline operations but also unlock new avenues for optimizing database performance and reliability. Intelligent Query Optimization In the realm of intelligent query optimization, AI-powered techniques revolutionize how databases handle complex queries. By analyzing workload patterns and system resources in real time, AI algorithms dynamically adjust query execution plans to enhance efficiency and minimize latency. This proactive approach ensures optimal performance, even in the face of fluctuating workloads and evolving data structures. Predictive Maintenance Predictive maintenance, empowered by AI, transforms how organizations manage database health and stability. By leveraging historical data and predictive analytics, AI algorithms forecast potential system failures or performance bottlenecks before they occur. This foresight enables proactive maintenance strategies, such as resource allocation and system upgrades, mitigating downtime, and optimizing database reliability. Autonomous Databases Autonomous databases represent the pinnacle of AI-driven innovation in database architectures. These systems leverage AI algorithms to automate routine tasks, including performance tuning, security management, and data backups. By autonomously optimizing database configurations and addressing security vulnerabilities in real time, autonomous databases minimize operational overhead and enhance system reliability. This newfound autonomy allows organizations to focus on strategic initiatives rather than routine maintenance tasks, driving innovation and efficiency across the enterprise. Looking Towards the Future: Trends and Challenges As the trajectory of database architectures unfolds, a spectrum of trends and challenges beckons our attention: Edge Computing The proliferation of Internet of Things (IoT) devices and the rise of edge computing architectures herald a shift towards decentralized data processing. This necessitates the development of distributed database solutions capable of efficiently managing and analyzing data at the network edge, optimizing latency and bandwidth usage while ensuring real-time insights and responsiveness. Data Privacy and Security In an era of burgeoning data volumes, the preservation of data privacy and security assumes paramount importance (Jonny Bairstow, (2024)). As regulatory frameworks tighten and cyber threats escalate, organizations must navigate the intricate landscape of data governance to ensure compliance with stringent regulations and fortify defenses against evolving security vulnerabilities, safeguarding sensitive information from breaches and unauthorized access. Federated Data Management The proliferation of disparate data sources across diverse systems and platforms underscores the need for federated data management solutions. Federated database architectures offer a cohesive framework for seamless integration and access to distributed data sources, facilitating interoperability and enabling organizations to harness the full spectrum of their data assets for informed decision-making and actionable insights. Quantum Databases The advent of quantum computing heralds paradigm shifts in database architectures, promising exponential leaps in computational power and algorithmic efficiency. Quantum databases, leveraging the principles of quantum mechanics, hold the potential to revolutionize data processing by enabling faster computations and more sophisticated analytics for complex data sets. As quantum computing matures, organizations must prepare to embrace these transformative capabilities, harnessing quantum databases to unlock new frontiers in data-driven innovation and discovery. Conclusion The evolution of database architectures mirrors the relentless march of technological progress. From the rigid structures of traditional RDBMS to the flexibility of NoSQL databases and the scalability of cloud-based solutions, databases have adapted to meet the evolving needs of data-intensive applications. Moreover, the integration of AI augments database functionality, paving the way for more intelligent and automated data management solutions. As we navigate the future, addressing emerging challenges and embracing innovative technologies will be essential in shaping the next generation of database architectures. References Kaushikkumar Patel (2024), Mastering Cloud Scalability: Strategies, Challenges, and Future Directions: Navigating Complexities of Scaling in Digital Era Hichem Moulahoum, Faezeh Ghorbanizamani (2024), Navigating the development of silver nanoparticles based food analysis through the power of artificial intelligence D. Dhinakaran, S.M. Udhaya Sankar, D. Selvaraj, S. Edwin Raja (2024), Privacy-Preserving Data in IoT-based Cloud Systems: A Comprehensive Survey with AI Integration Mihaly Varadi, Damian Bertoni, Paulyna Magana, Urmila Paramval, Ivanna Pidruchna, (2024), AlphaFold Protein Structure Database in 2024: providing structure coverage for over 214 million protein sequences Jonny Bairstow, (2024), “Navigating the Confluence: Big Data Analytics and Artificial Intelligence - Innovations, Challenges, and Future Directions”
Data Streaming is one of the most relevant buzzwords in tech to build scalable real-time applications and innovative business models. Do you wonder about my predicted TOP 5 data streaming trends in 2024 to set data in motion? Learn what role Apache Kafka and Apache Flink play. Discover new technology trends and best practices for event-driven architectures, including data sharing, data contracts, serverless stream processing, multi-cloud architectures, and GenAI. Some followers might notice that this became a series with past posts about the top 5 data streaming trends for 2021, the top 5 for 2022, and the top 5 for 2023. Trends change over time, but the huge value of having a scalable real-time infrastructure as the central data hub stays. Data streaming with Apache Kafka is a journey and evolution to set data in motion. Gartner’s Top Strategic Technology Trends 2024 The research and consulting company Gartner defines the top strategic technology trends every year. This time, the trends are around building new (AI) platforms and delivering value by automation but also protecting investment. On a higher level, it is all about automating, scaling, and pioneering. Here is what Gartner expects for 2024: It is funny (but not surprising): Gartner’s predictions overlap and complement the five trends I focus on for data streaming with Apache Kafka looking forward to 2024. I explore how data streaming enables faster time to market, good data quality across independent data products, and innovation with technologies like Generative AI. The Top 5 Data Streaming Trends for 2024 I see the following topics coming up more regularly in conversations with customers, prospects, and the broader data-streaming community across the globe: Data sharing for faster innovation with independent data products Data contracts for better data governance and policy enforcement Serverless stream processing for easier building of scalable and elastic streaming apps Multi-cloud deployments for cost-efficient delivering value where the customers sit Reliable Generative AI (GenAI) with embedded accurate, up-to-date information to avoid hallucination The following sections describe each trend in more detail. The trends are relevant for many scenarios, no matter if you use the open-source Apache Kafka or Apache Flink, a commercial platform, or a fully managed cloud service like Confluent Cloud. I start each section with a real-world case study. The end of the article contains the complete slide deck and video recording. Data Sharing Across Business Units and Organizations Data sharing refers to the process of exchanging or providing access to data among different individuals, organizations, or systems. This can involve sharing data within an organization or sharing data with external entities. The goal of data sharing is to make information available to those who need it, whether for collaboration, analysis, decision-making, or other purposes. Obviously, real-time data beats slow data for almost all data-sharing use cases. NASA: Real-Time Data Sharing With Apache Kafka NASA enables real-time data between space- and ground-based observatories. The General Coordinates Network (GCN) allows real-time alerts in the astronomy community. With this system, NASA researchers, private space companies, and even backyard astronomy enthusiasts can publish and receive information about current activity in the sky. Apache Kafka plays an essential role in astronomy research for data sharing. Particularly where black holes and neutron stars are involved, astronomers are increasingly seeking out the “time domain” and want to study explosive transients and variability. In response, observatories are increasingly adopting streaming technologies to send alerts to astronomers and to get their data to their science users in real-time. The talk "General Coordinates Network: Harnessing Kafka for Real-Time Open Astronomy at NASA" explores architectural choices, challenges, and lessons learned in adapting Kafka for open science and open data sharing at NASA. NASA's approach to OpenID Connect / OAuth2 in Kafka is designed to securely scale Kafka from access inside a single organization to access by the general public. Stream Data Exchange With Kafka Using Cluster Linking, Stream Sharing, and AsyncAPI The Kafka ecosystem provides various functions to share data in real time at any scale. Some are vendor-specific. I look at this from the perspective of Confluent so that you see a lot of innovative options (even if you want to build it by yourself with open-source Kafka): Kafka Connect connector ecosystem to integrate with other data sources and sinks out-of-the-box HTTP/REST proxies and connectors for Kafka to use simple and well-understood request-response (HTTP is, unfortunately, also an anti-pattern for streaming data) Cluster Linking for replication between Kafka clusters using the native Kafka protocol (instead of separate infrastructure like MirrorMaker) Stream Sharing for exposing a Kafka Topic through a simple button click with access control, encryption, quotas, and chargeback billing APIs Generation of AsyncAPI specs to share data with non-Kafka applications (like other message brokers or API gateways that support AsyncAPI, which is an open data for contract for asynchronous event-based messaging (similar to Swagger for HTTP/REST APIs) Here is an example of Cluster Linking for bi-directional replication between Kafka clusters in the automotive industry: Another example of stream sharing for easy access to a Kafka Topic in financial services: Data Contracts for Data Governance and Policy Enforcement A data contract is an agreement or understanding that defines the terms and conditions governing the exchange or sharing of data between parties. It is a formal arrangement specifying how data will be handled, used, protected, and shared among entities. Data contracts are crucial when multiple parties need to interact with and utilize shared data, ensuring clarity and compliance with agreed-upon rules. Raiffeisen Bank International: Data Contracts for Data Sharing Across Countries Raiffeisen Bank International (RBI) is scaling an event-driven architecture across the group as part of a bank-wide transformation program. This includes the creation of a reference architecture and the re-use of technology and concepts across 12 countries. Policy Enforcement and Data Quality for Apache Kafka With Schema Registry Good data quality is one of the most critical requirements in decoupled architectures like microservices or data mesh. Apache Kafka became the de facto standard for these architectures. But Kafka is a dumb broker that only stores byte arrays. The Schema Registry for Apache Kafka enforces message structures. This blog post examines Schema Registry enhancements to leverage data contracts for policies and rules to enforce good data quality on field-level and advanced use cases like routing malicious messages to a dead letter queue. Serverless Stream Processing With Apache Flink for Scalable, Elastic Streaming Apps Serverless stream processing refers to a computing architecture where developers can build and deploy applications without having to manage the underlying infrastructure. In the context of stream processing, it involves the real-time processing of data streams without the need to provision or manage servers explicitly. This approach allows developers to focus on writing code and building applications. The cloud service takes care of the operational aspects, such as scaling, provisioning, and maintaining servers. Sencrop: Smart Agriculture With Apache Kafka and Apache Flink Designed to answer professional farmers' needs, Sencrop offers a range of connectedweather stations that bring you precision agricultural weather data straight from your plots. Over 20,000 connected ag-weather stations throughout Europe. An intuitive, user-friendly application: Access accurate, ultra-local data to optimize your daily actions. Prevent risks and reduce costs: Streamline inputs and reduce your environmental impact and associated costs. Apache Flink Becomes the De Facto Standard for Stream Processing Apache Kafka and Apache Flink increasingly join forces to build innovative real-time stream processing applications. The Y-axis in the diagram shows the monthly unique users (based on statistics of Maven downloads). Unfortunately, operating a Flink cluster is really hard. Even harder than Kafka. Because Flink is not just a distributed system, it also has to keep the state of applications for hours or even longer. Hence, serverless stream processing helps take over the operation burden. And it makes the life of the developer easier, too. Staying tuned for exciting cloud products offering serverless Flink in 2024. But be aware that some vendors use the same trick as for Kafka: Provisioning a Flink cluster and handing it over to you is NOT a serverless or fully managed offering! Multi-Cloud for Cost-Efficient and Reliable Customer Experience Multi-cloud refers to a cloud computing strategy that uses services from multiple cloud providers to meet specific business or technical requirements. In a multi-cloud environment, organizations distribute their workloads across two or more cloud platforms, including public clouds, private clouds, or a combination of both. The goal of a multi-cloud strategy is to avoid dependence on a single cloud provider and to leverage the strengths of different providers for various needs. Cost efficiency and regional laws (like operating in the United States or China) required different deployment strategies. Some countries do not provide a public cloud. A private cloud is the only option then. New Relic: Multi-Cloud Kafka Deployments at Extreme Scale for Real-Time Observability New Relic is a software analytics company that provides monitoring and performance management solutions for applications and infrastructure. It's designed to help organizations gain insights into the performance of their software and systems, allowing them to optimize and troubleshoot issues efficiently. Observability has two key requirements: first, monitor data in real-time at any scale. Second, deploy the monitoring solution where the applications are running. The obvious consequence for New Relic is to process data with Apache Kafka and multi-cloud where the customers are. Hybrid and Multi-Cloud Data Replication for Cost-Efficiency, Low Latency, or Disaster Recovery Multi-cloud deployments of Apache Kafka have become the norm rather than an exception. Several scenarios require multi-cluster solutions with specific requirements and trade-offs: Regional separation because of legal requirements Independence of a single cloud provider Disaster recovery Aggregation for analytics Cloud migration Mission-critical stretched deployments Reliable Generative AI (GenAI) With Accurate Context To Avoid Hallucination Generative AI is a class of artificial intelligence systems that generate new content, such as images, text, or even entire datasets, often by learning patterns and structures from existing data. These systems use techniques such as neural networks to create content that is not explicitly programmed but is instead generated based on the patterns and knowledge learned during training. Elemental Cognition: GenAI Platform Powered by Apache Kafka Elemental Cognition’s AI platform develops responsible and transparent AI that helps solve problems and deliver expertise that can be understood and trusted. Confluent Cloud powers the AI platform to enable scalable real-time data and data integration use cases. I recommend looking at their website to learn from various impressive use cases. Apache Kafka as Data Fabric for Genai Using Rag, Vector Database and Semantic Search Apache Kafka serves thousands of enterprises as the mission-critical and scalable real-time data fabric for machine learning infrastructures. The evolution of Generative AI (GenAI) with large language models (LLM) like ChatGPT changed how people think about intelligent software and automation. The relationship between data streaming and GenAI has enormous opportunities. An excellent example, especially for Generative AI, is context-specific customer service. The following diagram shows an enterprise architecture leveraging event-driven data streaming for data ingestion and processing across the entire GenAI pipeline: Stateful Stream Processing With Apache Flink and GenAI Using a Large Language Model (LLM) Stream processing with Kafka and Flink enables data correlation of real-time and historical data. A stateful stream processor takes existing customer information from the CRM, loyalty platform, and other applications, correlates it with the query from the customer into the chatbot, and makes an RPC call to an LLM. Slides and Video Recording for the Data Streaming Trends in 2024 With Kafka and Flink Do you want to look at more details? This section provides the entire slide deck and a video walking you through the content. Slide Deck Here is the slide deck from my presentation. Video Recording And here is the video recording of my presentation. 2024 Makes Data Streaming More Mature, and Apache Flink Becomes Mainstream I have two conclusions for data streaming trends in 2024: Data streaming goes up in the maturity curve. More and more projects build streaming applications instead of just leveraging Apache Kafka as a dumb data pipeline between databases, data warehouses, and data lakes. Apache Flink becomes mainstream. The open-source framework shines with a scalable engine, multiple APIs like SQL, Java, and Python, and serverless cloud offerings from various software vendors. The latter makes building applications much more accessible. Data sharing with data contracts is mandatory for a successful enterprise architecture with microservices or a data mesh. And data streaming is the foundation for innovation with technology trends like Generative AI. Therefore, we are just at the tipping point of adopting data streaming technologies such as Apache Kafka and Apache Flink. What are your most relevant and exciting data streaming trends with Apache Kafka and Apache Flink in 2024 to set data in motion? What are your strategy and timeline? Do you use serverless cloud offerings or self-managed infrastructure? Let’s connect on LinkedIn and discuss it! Stay informed about new blog posts by subscribing to my newsletter.
Apache Kafka’s real-time data processing relies on Kafka consumers (more background here) that read messages within its infrastructure. Producers publish messages to Kafka topics, and consumers — often part of a consumer group — subscribe to these topics for real-time message reception. A consumer tracks its position in the queue using an offset. To configure a consumer, developers create one with the appropriate group ID, prior offset, and details. They then implement a loop for the consumer to process arriving messages efficiently. It’s an important understanding for any organization using Kafka in its 100% open-source, enterprise-ready version — and here’s what to know. Example: Creating a Kafka Consumer The process of creating and configuring Kafka consumers follows consistent principles across programming languages, with a few language-specific nuances. This example illustrates the fundamental steps for creating a Kafka consumer in Java. Start by crafting a properties file. While programmatic approaches are feasible, it's advisable to use a properties file. In the code below, substitute 'MYKAFKAIPADDRESS1' with the actual IP addresses of your Kafka brokers: Java bootstrap.servers=MYKAFKAIPADDRESS1:9092, MYKAFKAIPADDRESS2:9092, MYKAF KAIPADDRESS3:9092 key.deserializer=org.apache.kafka.common.serialization. StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer group.id=my-group security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram. ScramLoginModule required \ username="[USER NAME]" \ password="[USER PASSWORD]"; The next step is creating the consumer. This example code prepares the main program entry point, as well as the necessary message processing loop: Java import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer. Consumer Record; import org.apache.kafka.clients.consumer. Consumer Records; import java.io.FileReader; import java.io.IOException; import java.time. Duration; import java.util.Collections; import java.util.Properties; public class Consumer { public static void main(String[] args) { Properties kafkaProps = new Properties(); try (FileReader fileReader = new FileReader ("consumer.properties")) { kafkaProps.load(fileReader); } catch (IOException e) { e.printStackTrace(); } try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) { consumer.subscribe (Collections.singleton("test")); while (true) { Consumer Records<String, String> records = consumer.poll (Duration.ofMillis(100)); for (Consumer Record<String, String>; record records) { System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %", record.topic (), record.partition(), record.offset(), record.key(), record.value())); } } } } Commonly Used Kafka Consumer Configuration Options With the foundational setup complete, developers have a range of powerful options to fine-tune Kafka consumers according to their preferences. The following summaries highlight commonly used options, while your Kafka driver documentation provides a comprehensive list of configurations. Some of the most popular Kafka consumer configuration options include: client.id – Identifies the client (consumer) to brokers in the Kafka cluster, to make clear which consumer made which requests. As a best practice, consumers in a consumer group should each have the same client ID, enabling client quota enforcement for that group. session.timeout.ms – This value controls how long a broker will listen for a consumer’s heartbeat before declaring it dead. The default value is 10 seconds. heartbeat.interval.ms – This value controls how often a consumer sends a heartbeat to let the broker know it’s alive and functioning. The default value is 3 seconds. Note that session.timeout.ms and heartbeat.interval.ms must work together to let the broker know a consumer’s status. As a best practice, the consumer should send several heartbeats per timeout interval. For example, the default settings of a heartbeat every 3 seconds and a 10-second timeout offer a healthy strategy. max.poll.interval.ms – This sets the length of time that a broker will wait between poll method calls, and prompts a consumer to attempt to receive further messages before declaring it dead. The default value is 300 seconds. enable.auto.commit – Tells the consumer to automatically commit periodic offsets (at an interval determined by auto.commit.interval.ms). This configuration is enabled by default. fetch.min.bytes – This sets the minimum data amount that a consumer fetches from a broker. A consumer will wait for more data if the available data doesn’t fulfill the set amount. This approach minimizes back and forth between consumers and brokers, boosting throughput at the cost of latency. fetch.max.wait.ms – Used together with fetch.min.bytes, this sets the maximum time that a consumer will wait before then fetching messages from the broker. max.partition.fetch.bytes – This sets the maximum bytes a consumer will fetch per partition, effectively placing an upper limit on memory requirements for fetching messages. It’s important to be mindful of max.poll.interval when choosing this value: an overly large maximum can make it so there’s more data to deliver than consumers can fetch during the poll interval timespan. auto.offset.reset – This tells the consumer what to do if it reads a partition with no last read offset available. This can be set to “latest” or “earliest”, telling the consumer to start either with the latest available message, or the earliest available offset. partition.assignment.strategy – This tells the group leader consumer how to assign partitions to consumers in its consumer group. max.poll.records – This sets the maximum records a consumer will fetch in a poll call, offering control over throughput. Unlock Apache Kafka’s Potential As a reliable, high-performance, hyperscale real-time data streaming solution, open-source Apache Kafka offers tremendous opportunities. By understanding how to create and configure Kafka consumers, developers can more closely control the solution’s behavior and get more out of their Kafka deployment.
In the ever-evolving landscape of the Financial Services Industry (FSI), organizations face a multitude of challenges that hinder their journey toward AI-driven transformation. Legacy systems, stringent regulations, data silos, and a lack of agility have created a chaotic environment in need of a more effective way to use and share data across the organization. In this two-part series, I delve into my own personal observations, break open the prevailing issues within the FSI, and closely inspect the factors holding back progress. I’ll also highlight the need to integrate legacy systems, navigate strict regulatory landscapes, and break down data silos that impede agility and hinder data-driven decision-making. Last but not least, I’ll introduce a proven data strategy approach in which adopting data streaming technologies will help organizations overhaul their data pipelines—enabling real-time data ingestion, efficient processing, and seamless integration of disparate systems. If you're interested in a practical use case that showcases everything in action, keep an eye out for our upcoming report. But before diving into the solution, let’s start by understanding the problem. Complexity, Chaos, and Data Dilemmas Stepping into larger financial institutions often reveals a fascinating insight: a two-decade technology progression unfolding before my eyes. The core business continues to rely on mainframe systems running COBOL, while a secondary layer of services acts as the gateway to access the core and extension of services offerings that can’t be done in the core system. Data is heavily batched and undergoes nightly ETL processes to facilitate transfer between these layers. Real-time data access poses challenges, demanding multiple attempts and queries through the gateway for even a simple status update. Data warehouses are established, serving as data dumping grounds through ETL, where nearly half of the data remains unused. Business Intelligence (BI) tools extract, transform, and analyze the data to provide valuable insights for business decisions and product design. Batch and distributed processing prevail due to the sheer volume of data to be handled, resulting in data silos and delayed reflection of changing trends. In recent years, more agile approaches have emerged, with a data shift towards binary, key-value formats for better scalability on the cloud. However, due to the architectural complexity, data transfers between services have multiplied, leading to challenges in maintaining data integrity. Plus, these innovations primarily cater to new projects, leaving developers and internal users to navigate through multiple hoops within the system to accomplish tasks. Companies also find themselves paying the price for slow innovation and encounter high costs when implementing new changes. This is particularly true when it comes to AI-driven initiatives that demand a significant amount of data and swift action. Consequently, several challenges bubble to the surface and get in the way of progress, making it increasingly difficult for FSIs to adapt and prepare for the future. Here’s a breakdown of these challenges and the ideal state for FSIs. Reality Ideal state Data silos Decentralized nature of financial operations or team’s geographical location. Separate departments or business units maintain their own data and systems that were implemented over the years, resulting in isolated data and making it difficult to collaborate. There were already several attempts to break the silos, and the solutions somehow contributed to one of the many problems below (i.e., data pipeline chaos). A consolidated view of data across the organization. Ability to quickly view and pull data when needed. Legacy systems FSIs often grapple with legacy systems that have been in place for many years. These systems usually lack the agility to adapt to changes quickly. As a result, accessing and actioning data from these legacy systems can be time-consuming, leading to delays and sometimes making it downright impossible to make good use of the latest data. Data synchronization with the old systems, and modernized ETL pipelines. Migrate and retire from the old process. Data overload With vast amounts of data from various sources, including transactions, customer interactions, market data, and more, it can be overwhelming, making it challenging to extract valuable insights and derive actionable intelligence. It often leads to high storage bills and data is not fully used most of the time. Infrastructural change to adopt larger ingestion of data, planned data storage strategy, and a more cost-effective way to safely secure and store data with sufficient failover and recovery plan. Data pipeline chaos Managing data pipelines within FSIs can be a complex endeavor. With numerous data sources, formats, and integration points, the data pipeline can become fragmented and chaotic. Inconsistent data formats, incompatible systems, and manual processes can introduce errors and inefficiencies, making it challenging to ensure smooth data flow and maintain data quality. A data catalog is a centralized repository that serves as a comprehensive inventory and metadata management system for an organization's data assets.Reduced redundancy, improved efficiency, streamlined data flow, and introduce automation, monitoring and regular inspection. Open data initiatives With the increasing need for partner collaboration and government open API projects, the FSI faces the challenge of adapting its data practices. The demand to share data securely and seamlessly with external partners and government entities is growing. FSIs must establish frameworks and processes to facilitate data exchange while ensuring privacy, security, and compliance with regulations. Secure and well-defined APIs for data access that ensure data interoperability through common standards. Plus, version and access control over access points. Clearly, there’s a lot stacked up against FSIs attempting to leap into the world of AI. Now, let’s zoom in on the different data pipelines organizations are using to move their data from point A to B and the challenges many teams are facing with them. Understanding Batch, Micro-Batch, and Real-Time Data Pipelines There are all sorts of ways that move data around. To keep things simple, I’ll distill the most common pipelines today into three categories: Batch Micro-batch Real-time 1. Batch Pipelines These are typically used when processing large volumes of data in scheduled “chunks” at a time—often in overnight processing, periodic data updates, or batch reporting. Batch pipelines are well-suited for scenarios where immediate data processing isn't crucial, and the output is usually a report, like for investment profiles and insurance claims. The main setbacks include processing delays, potentially outdated results, scalability complexities, managing task sequences, resource allocation issues, and limitations in providing real-time data insights. I’ve witnessed an insurance customer running out of windows at night to run batches due to the sheer volume of data that needed processing (updating premiums, investment details, documents, agents’ commissions, etc.). Parallel processing or map-reduce are a few techniques to shorten the time, but they also introduce complexities, as parallel both require the developer to understand the distribution of data, dependency of data, and be able to maneuver between map and reduce functions. 2. Micro-Batch Pipelines Micro batch pipelines are a variation of batch pipelines where data is processed in smaller, more frequent batches at regular intervals for lower latency and fresher results. They’re commonly used for financial trading insights, clickstream analysis, recommendation systems, underwriting, and customer churn predictions. Challenges with micro-batch pipelines include managing the trade-off between processing frequency and resource usage, handling potential data inconsistencies across micro-batches, and addressing the overhead of initiating frequent processing jobs while still maintaining efficiency and reliability. 3. Real-Time Pipelines These pipelines process data as soon as it flows in. They offer minimal latency and are essential for applications requiring instant reactions, such as real-time analytics, transaction fraud detection, monitoring critical systems, interactive user experiences, continuous model training, and real-time predictions. However, real-time pipelines face challenges like handling high throughputs, maintaining consistently low latency, ensuring data correctness and consistency, managing resource scalability to accommodate varying workloads, and dealing with potential data integration complexities—all of which require robust architectural designs and careful implementation to deliver accurate and timely results. To summarize, here’s the important information about all three pipelines in one table. Batch Micro batch Real time Cadence Scheduled longer intervals Scheduled short intervals Real time Data size Large Small defined chunks Large Scaling Vertical Horizontal Horizontal Latency High (hours/days) Medium (seconds) Low (milliseconds) Datastore Data warehouse, Data lake, Databases, Files Distributed files system, Data warehouses, Databases Stream processing Systems, Data lake, Databases Open-source technologies Apache Hadoop, Map-reduce Apache Spark™ Apache Kafka®, Apache Flink® Industry use case examples Moving files (customer signature scans), and transferring data from the mainframe for core banking data or core insurance policy information. Large datasets for ML. Prepare near real-time business reports and needs to consume data from large dataset lookups such as generating risk management reviews for investment. Daily market trend analysis. Real-time transaction/fraud detection, instant claim approval, monitoring critical systems, and customer chatbot service. As a side note, some may categorize pipelines as either ETL or ELT. ETL (Extract, Transform, and Load) transforms data on a separate processing server before moving it to the data warehouse. ELT (Extract, Load, and Transform) transforms the data within the data warehouse first before it hits its destination. Depending on the destination of the data, if it’s going to a data lake, you’ll see most pipelines doing ELT. Whereas with a data source, like a data warehouse or database, since it requires data to be stored in a more structured manner, you will see more ETL. In my opinion, all three pipelines should be using both techniques to convert data into the desired state. Common Challenges of Working With Data Pipelines Pipelines are scattered across departments, and IT teams implement them with various technologies and platforms. From my own experience working with on-site data engineers, here are some common challenges working with data pipelines: Difficulty Accessing Data Unstructured data can be tricky. The lack of metadata makes it difficult to locate the desired data within the repository (like customer correspondence, emails, chat logs, legal documents.) Certain data analytics tools or platforms may have strict requirements regarding the input data format, posing difficulties in converting the data to the required format. So, multiple complex pipelines transform logic (and lots of it). Stringent security measures and regulatory compliance can introduce additional steps and complexities in gaining access to the necessary data. (Personal identifiable data, health record for claims). Noisy, “Dirty” Data Data lakes are prone to issues like duplicated data. Persistence of decayed or outdated data within the system can compromise the accuracy and reliability of AI models and insights. Input errors during data entry were not caught and filtered. (biggest data processing troubleshooting time wasted) Data mismatches between different datasets and inconsistencies in data. (Incorrect report and pipeline errors) Performance Large volumes of data, lack of efficient storage and processing power. Methods of retrieving data, such as APIs in which the request and response aren’t ideal for large volumes of data ingestion. The location of relevant data within the system and where they’re stored heavily impacts the frequency of when to process data, plus the latency and cost of retrieving it. Data Visibility (Data Governance and Metadata) Inadequate metadata results in a lack of clarity regarding the availability, ownership, and usage of data assets. Difficult to determine the existence and availability of specific data, impeding effective data usage and analysis. Troubleshooting Identifying inconsistencies, addressing data quality problems, or troubleshooting data processing failures can be time-consuming and complex. During the process of redesigning the data framework for AI, both predictive and generative, I’ll address the primary pain points for data engineers and also help solve some of the biggest challenges plaguing the FSI today. Taking FSIs From Point A to AI Looking through a data lens, the AI-driven world can be dissected into two primary categories: inference and machine learning. These domains differ in their data requirements and usage. Machine learning needs comprehensive datasets derived from historical, operational, and real-time sources, enabling training more accurate models. Incorporating real-time data into the dataset enhances the model and facilitates agile and intelligent systems. Inference prioritizes real-time focus, leveraging ML-generated models to respond to incoming events, queries, and requests. Building a generative AI model is a major undertaking. For FSI, it makes sense to reuse an existing model (foundation model) with some fine-tuning in specific areas to fit your use case. The “fine-tuning” will require you to provide a high-quality, high-volume dataset. The old saying still holds true: garbage in, garbage out. If the data isn’t reliable, to begin with, you’ll inevitably end up with unreliable AI. In my opinion, to prepare for the best AI outcome possible, it’s crucial to set up the following foundations: Data infrastructure: You need a robust, low latency, high throughput framework to transfer and store vast volumes of financial data for efficient data ingestion, storage, processing, and retrieval. It should support distributed and cloud computing and prioritize network latency, storage costs, and data safety. Data quality: To provide better data for determining the model, it’s best to go through data cleansing, normalization, de-duplication, and validation processes to remove inconsistencies, errors, and redundancies. Now, if I were to say that there’s a simple solution, I would either be an exceptional genius capable of solving world crises or blatantly lying. However, given the complexity we already have, it’s best to focus on generating the datasets required for ML and streamline the data needed for the inference phase to make decisions. Then, you can gradually address the issues caused by the current data being overly disorganized. Taking one domain at a time, solving business users’ problems first, and not being overly ambitious is the fastest path to success. But we’ll leave that for the next post. Summary Implementing a data strategy in the financial services industry can be intricate due to factors such as legacy systems and the consolidation of other businesses. Introducing AI into this mix can pose performance challenges, and some businesses might struggle to prepare data for machine learning applications. In my next post, I’ll walk you through a proven data strategy approach to streamline your troublesome data pipelines for real-time data ingestion, efficient processing, and seamless integration of disparate systems.
As the world takes a multi-layered approach to data storage, there is a shift in how organizations transform data. It has driven businesses to integrate extract, load, and transform (ELT) tools with Medallion architecture. This trend reshapes how data is ingested and transformed across lines of business as well as by departmental users, data analysts, and C-level executives. Applying rigid data transformation rules and making data available for your teams through a data warehouse may not fully address your business's evolving and exploratory data integration needs. Depending on the volume of data your organization produces and the rate at which it's generated, processing data without knowing the consumption patterns could prove to be costly. Case-based data transformation could be more economically viable as more ad-hoc queries and analyses pop up every day. That doesn't mean you store the data in raw form. Instead, it's necessary to add several layers of transformations, enrichments, and business rules to optimize cost and performance. How Business Requirements Shape Database Technologies Let's take a quick look at how data management has evolved. We started with cloud data warehouses. Traditional data warehouses, such as those based on relational database systems, have been the backbone of enterprise data management for years. They're optimized for structured data and typically used for business intelligence and reporting. Then, we moved into the era of data lakes. Data lakes became popular for handling large volumes of structured and unstructured data. They offer flexibility in data storage and processing, allowing organizations to store raw and diverse data in its native format. Now, we have data lakehouses. The concept of a data lakehouse emerged as a response to some of the challenges associated with data lakes, such as data quality, data governance, and the need for transactional capabilities. The data lakehouse architecture aims to combine the best features of data lakes and data warehouses, combining the scalability and flexibility of data lakes with the reliability and performance of data warehouses. Technologies like Delta Lake and Apache Iceberg have contributed to developing the data lakehouse concept by adding transactional capabilities and schema enforcement to data lakes. To fully leverage the potential of this evolving architecture, we recommend implementing best practices, one of which is medallion architecture. What Is Medallion Architecture? Medallion architecture is gaining popularity in the data world. Unlike traditional data lake architectures, where raw or unstructured data is stored without any schema enforcement or strong consistency guarantees, medallion architecture introduces structure and organization to your data. It allows you to add schema evolution capabilities to your datasets stored in Delta Lake, making it easier to query and analyze your data effectively. One of the reasons why medallion architecture is gaining popularity is its ability to handle large volumes of diverse data types in a scalable manner. By leveraging Delta Lake's transactional capabilities, you can ensure Atomic, Consistent, Independent, and Durable (ACID) compliance for your operations on massive datasets. But how does it differ from traditional data lake architectures? While both approaches store raw or unstructured data, medallion architecture introduces a systematic method of defining bronze, silver, and gold layers within a data lake. This allows data engineers to curate the right data for the right audience. It also makes it easier for users to query and analyze their datasets without sacrificing performance or reliability. This shows an SQL ELT (native) reference architecture. This is why medallion architecture is taking off in the realm of Delta Lake and cloud data warehousing. It offers a powerful combination of scalability, reliability, performance, and structured storage for your valuable datasets. Now, let's explore how data processing needs to change along with changes in architecture. Why Is ELT the Right Data Transformation Process for Medallion Architecture? As defined, there are several layers in Medallion data architecture. Data is progressively processed and refined as it moves through these layers. Using traditional extract, transform, load (ETL) can be inefficient, as it often requires moving data out of your data warehouse or lakehouse for every transformation, which is needed for the next processing level. Instead, a more effective approach is to use pushdown technology, where you push the code into the target/source, allowing data processing to occur where it resides. In this case, only the data transformation code moves, not the data itself. ELT further streamlines this process by enabling you to transform the data as many times as you want, making your system more efficient. With ELT, you reduce the burden on the source system, as the data is ingested only once into the data lake/lakehouse. The optimal design of ELT provides several competitive advantages. It enables you to process large volumes of data more rapidly, accelerating insights and decision-making. It also reduces operational costs by minimizing unnecessary data movement across networks and systems. Necessary Data Integration Capabilities to Run ELT in Medallion Data Architecture A few specific data integration capabilities will enable you to run ELT successfully in Medallion data architecture. These include: Parallel processing at scale: This is a must-have technology that runs your ELT code on multiple machines at the same time, which can improve the performance of your data jobs. A processing engine like Spark can handle massive datasets by scaling out to larger clusters and adding more nodes. The scheduler distributes tasks to worker nodes, balancing workload and maximizing resource utilization. Data loading patterns: Make sure the tool doesn't solely rely on batch load but also supports real-time streaming and full and incremental loads. Change data capture (CDC) and schema drift are the most frequently used features when transferring data from the sources to a data lakehouse. Optimized data processing at each stage: Medallion architecture is a system for logically organizing data within a data lakehouse. Each layer in a Medallion architecture serves a different purpose, and transformations are applied while considering the security boundaries, retention rules, user access policies, required latency, and business impact level. You should be able to process data at a granular level, optimizing it for the next step of logical data processing. Preview code during design time: This capability allows you to see the results of your ELT code before you run it, which can help you catch errors and ensure your code is doing what you want it to do. Multi-cloud support: Don't limit your integration capabilities to one particular ecosystem. Ensure you can run your data pipeline jobs in multiple cloud environments, such as Snowflake, Databricks, Amazon Web Services (AWS), Microsoft Azure, and Google Cloud. Auto tuning: This lets your ELT tool automatically adjust the settings of your jobs to improve their performance. The tool should be AI-enabled to collect runtime statistics and adjust execution strategies based on data characteristics. Flexible transformation: ELT tools must allow flexible transformation logic, as transformations can be performed using a wider range of tools and techniques, including SQL, Python, and Spark. This can be useful if you need to perform complex transformations not supported by SQL. Combine SQL code with proprietary code: This enables you to use both SQL code and proprietary code in a single ELT pipeline. This can be useful if you need to perform tasks not supported by SQL. For example, you might use SQL to query the database and retrieve the data, then write a Python function to implement a data quality check, applying custom logic to identify and address any data issues. End-to-end workflow: This capability provides a visual interface that allows you to design and execute your ELT jobs as part of a complete task flow. The tool should enable the scheduling and orchestration of a set of tasks, starting from extracting data to triggering downstream tasks, managing dependencies, and enabling data observability. Security, access control, and masking capabilities: This allows you to control who has access to your data and to mask sensitive data. This is important for protecting your data from unauthorized access. The ability to implement DataOps: This gives you the ability to integrate your ETL processes with your DevOps processes. This can help you to improve the quality and reliability of your data. Easy switching between ETL and ELT: This makes it easy for you to switch between ETL and ELT processing. This can be useful if you need to change your data processing strategy. Data transformation as a code: This makes it possible for you to store your ETL code in a repository, making it easier to manage and version your code. Advanced transformation: When ELT becomes your mainstream way of processing data, you need to ensure you don't have to run to different tools for complex transformations. Data quality: This gives you the ability to identify and address data quality issues early in your ELT process. This can help you to improve the quality of your data. Integration with data lineage and governance: This capability allows you to track the origins and transformations of your data. This can help you ensure your data complies with your data governance policies. The ELT tool should integrate seamlessly with your data lineage and governance frameworks to maintain data traceability, consistency, and security. It should provide visibility into data origins, transformations, and destinations, enabling effective data auditing and compliance with data governance policies. Next Steps It's crucial for your business to select an ELT tool that's high-performing and also compatible with Medallion data architecture. This will enhance data integration capabilities, allowing you to utilize the structured, layered approach of Medallion architecture fully. This alignment will give your business a competitive edge by efficiently handling large data volumes, improving scalability, streamlining workflow processes, and achieving cost efficiencies.
When data is analyzed and processed in real time, it can yield insights and actionable information either instantly or with very little delay from the time the data is collected. The capacity to collect, handle, and retain user-generated data in real time is crucial for many applications in today’s data-driven environment. There are various ways to emphasize the significance of real-time data analytics like timely decision-making, IoT and sensor data processing, enhanced customer experience, proactive problem resolution, fraud detection and security, etc. Rising to the demands of diverse real-time data processing scenarios, Apache Kafka has established itself as a dependable and scalable event streaming platform. In short, the process of collecting data in real-time as streams of events from event sources such as databases, sensors, and software applications is known as event streaming. With real-time data processing and analytics in mind, Apache Flink is a potent open-source program. For situations where quick insights and minimal processing latency are critical, it offers a consistent and effective platform for managing continuous streams of data. Causes for the Improved Collaboration Between Apache Flink and Kafka Apache Flink joined the Apache Incubator in 2014, and since its inception, Apache Kafka has consistently stood out as one of the most frequently utilized connectors for Apache Flink. It is just a data processing engine that can be clubbed with the processing logic but does not provide any storage mechanism. Since Kafka provides the foundational layer for storing streaming data, Flink can serve as a computational layer for Kafka, powering real-time applications and pipelines. Apache Flink has produced first-rate support for creating Kafka-based apps throughout the years. By utilizing the numerous services and resources offered by the Kafka ecosystem, Flink applications are able to leverage Kafka as both a source and a sink. Avro, JSON, and Protobuf are just a few widely used formats that Flink natively supports. Apache Kafka proved to be an especially suitable match for Apache Flink. Unlike alternative systems such as ActiveMQ, RabbitMQ, etc., Kafka offers the capability to durably store data streams indefinitely, enabling consumers to read streams in parallel and replay them as necessary. This aligns with Flink’s distributed processing model and fulfills a crucial requirement for Flink’s fault tolerance mechanism. Kafka can be used by Flink applications as a source as well as a sink by utilizing the many tools and services available in the Kafka ecosystem. Flink offers native support for commonly used formats like Avro, JSON, and Protobuf, similar to Kafka’s support for these formats. Other external systems can be linked to Flink’s Table API and SQL programs to read and write batch and streaming tables. Access to data kept in external systems such as a file system, database, message queue, or key-value store is made possible by a table source. For Kafka, it’s nothing but a key-value pair. Events are added to the Flink table in a similar manner as they are appended to the Kafka topic. A topic in a Kafka cluster is mapped to a table in Flink. In Flink, each table is equal to a stream of events that describe the modifications being made to that particular table. The table is automatically updated when a query refers to it, and its results are either materialized or emitted. Conclusion In conclusion, we can create reliable, scalable, low-latency real-time data processing pipelines with fault tolerance and exactly-once processing guarantees by combining Apache Flink and Apache Kafka. For businesses wishing to instantly evaluate and gain insights from streaming data, this combination provides a potent option. Thank you for reading this write-up. If you found this content valuable, please consider liking and sharing.
Miguel Garcia
VP of Engineering,
Nextail Labs
Gautam Goswami
Founder,
DataView