The Ultimate Apache Flink Cheatsheet: Stream Processing & Analytics

Introduction to Apache Flink

Apache Flink is a distributed stream processing framework designed for stateful computations over unbounded and bounded data streams. It provides precise control over time and state, enabling high-throughput, low-latency stream processing with fault tolerance guarantees. This cheatsheet covers essential concepts, APIs, and operations for developing Flink applications.

Core Concepts

Programming Model

  • Streams: Unbounded/bounded sequences of data records
  • State: Long-lived data across events/operations
  • Time: Event time, processing time, ingestion time
  • Transformations: Operations applied to streams
  • Windows: Grouping operations over subsets of streams
  • Connectors: Interfaces for external systems

Processing Semantics

  • Exactly-once: State updates happen exactly once, even after failures
  • At-least-once: No data loss but may have duplicate processing
  • Event time processing: Based on timestamps in data
  • Processing time: Based on machine clocks

Flink Architecture

Components

  • JobManager: Coordinates distributed execution
  • TaskManager: Workers executing tasks
  • ResourceManager: Manages task executor slots
  • Dispatcher: REST endpoint for job submission
  • Client: Prepares/sends applications to cluster

Deployment Options

Mode | Description | Best for
Local | Single JVM | Development, testing
Standalone | Dedicated Flink cluster | Simple production
YARN | Running on Hadoop YARN | Shared clusters
Kubernetes | Container orchestration | Cloud-native, elastic scaling
Docker | Containerized deployment | Development, isolated environments

DataStream API

Environment Setup

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Set checkpointing (enable exactly-once)
env.enableCheckpointing(1000); // checkpoint every 1000 ms
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);

// Set parallelism
env.setParallelism(4);

// Execute job
env.execute("Job Name");

Creating DataStreams

// From collection
DataStream<String> stream1 = env.fromCollection(Arrays.asList("a", "b", "c"));

// From elements
DataStream<String> stream2 = env.fromElements("a", "b", "c");

// From file
DataStream<String> stream3 = env.readTextFile("file:///path/to/file");

// From socket
DataStream<String> stream4 = env.socketTextStream("localhost", 9999);

// From Kafka
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
    "topic", new SimpleStringSchema(), properties);
    
// Configure Kafka source options
kafkaSource.setStartFromEarliest();      // start from earliest offset
// OR kafkaSource.setStartFromLatest();  // start from latest offset
// OR kafkaSource.setStartFromTimestamp(1651660800000L); // start from timestamp

DataStream<String> stream5 = env.addSource(kafkaSource);

// Custom source
DataStream<SensorReading> stream6 = env.addSource(new SensorSource());

Basic Transformations

// Map - one-to-one transformation
DataStream<Integer> mapped = stream.map(s -> s.length());

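// FlatMap - one-to-many transformation
// (lambdas lose Collector's generic type to erasure, so declare the output type)
DataStream<String> flatMapped = stream.flatMap(
    (String value, Collector<String> out) -> {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }).returns(Types.STRING);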

// Filter - keep elements that match predicate
DataStream<String> filtered = stream.filter(s -> s.startsWith("a"));

// KeyBy - partition/group stream by key
KeyedStream<SensorReading, String> keyed = stream.keyBy(reading -> reading.getSensorId());

// Reduce - rolling aggregation on keyed stream
DataStream<SensorReading> reduced = keyed.reduce((r1, r2) -> 
    new SensorReading(r1.getSensorId(), r2.getTimestamp(), Math.max(r1.getTemperature(), r2.getTemperature())));

// Process - low-level operation with full state access
DataStream<Alert> processed = stream.process(new MyProcessFunction());

Windows

// Tumbling time windows (event time)
stream.keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .aggregate(new MyAggregateFunction());

// Sliding time windows (event time)
stream.keyBy(x -> x.key)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .reduce(new MyReduceFunction());

// Session windows (event time)
stream.keyBy(x -> x.key)
    .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
    .apply(new MyWindowFunction());

// Count windows (tumbling)
stream.keyBy(x -> x.key)
    .countWindow(100)
    .aggregate(new MyAggregateFunction());

// Count windows (sliding)
stream.keyBy(x -> x.key)
    .countWindow(100, 10)
    .aggregate(new MyAggregateFunction());

// Global windows (need custom trigger)
stream.keyBy(x -> x.key)
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(5))
    .process(new MyProcessWindowFunction());
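
How an element ends up in a time window comes down to simple arithmetic on its timestamp. The following is a minimal plain-Java sketch of that arithmetic for tumbling and sliding windows, assuming a window offset of zero; the class and method names (WindowAssignSketch, tumblingStart, slidingStarts) are made up for illustration and are not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mirrors the idea behind time-window assignment, not Flink's classes.
final class WindowAssignSketch {

    /** Start of the tumbling window of length sizeMs that contains timestampMs (offset 0). */
    static long tumblingStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Starts of all sliding windows (sizeMs, slideMs) that contain timestampMs, newest first. */
    static List<Long> slidingStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = tumblingStart(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(12_345L, 5_000L));
        System.out.println(slidingStarts(12_345L, 10_000L, 5_000L));
    }
}
```

With a 10 s size and a 5 s slide, each element belongs to two windows, which is why sliding windows multiply the state and output volume by roughly size / slide.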

Time Handling

// Set event time characteristic
// (deprecated since Flink 1.12, where event time is already the default)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Watermark strategies
// Periodic: extract timestamp and allow specific lateness
WatermarkStrategy<MyEvent> periodicStrategy = WatermarkStrategy
    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

// Punctuated: emit watermarks based on specific events
WatermarkStrategy<MyEvent> punctuatedStrategy = WatermarkStrategy
    .<MyEvent>forGenerator(context -> new PunctuatedWatermarkGenerator<>())
    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

// Apply watermark strategy
DataStream<MyEvent> withTimestamps = stream.assignTimestampsAndWatermarks(periodicStrategy);

// Handle late events (the anonymous subclass "{}" preserves the tag's generic type)
final OutputTag<MyEvent> lateTag = new OutputTag<MyEvent>("late-events") {};

windowedStream
    .allowedLateness(Time.seconds(10))
    .sideOutputLateData(lateTag)
    .process(...);

// Retrieve late events from the window operator's result
DataStream<MyEvent> lateEvents = result.getSideOutput(lateTag);
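
To make the interplay of bounded out-of-orderness and allowed lateness concrete, here is a small plain-Java sketch of the bookkeeping involved. It assumes the watermark trails the highest timestamp seen so far by the out-of-orderness bound plus one millisecond, and that a window counts as late once the watermark has passed its last timestamp plus the allowed lateness. WatermarkSketch and its methods are hypothetical names used only for this illustration.

```java
// Illustrative only: a stand-alone model of watermark and lateness arithmetic.
final class WatermarkSketch {
    private final long outOfOrdernessMs;
    private long maxTimestampMs;

    WatermarkSketch(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestampMs = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    /** Track the highest event timestamp observed so far. */
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** Watermark: no element with a timestamp at or below this value is expected anymore. */
    long currentWatermark() {
        return maxTimestampMs - outOfOrdernessMs - 1;
    }

    /** True if elements for the window ending at windowEndMs (exclusive) would now be treated as late. */
    static boolean isWindowLate(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long lastTimestampInWindow = windowEndMs - 1;
        return lastTimestampInWindow + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(5_000L);
        sketch.onEvent(20_000L);
        sketch.onEvent(17_000L); // out of order, does not move the watermark back
        System.out.println(sketch.currentWatermark());
    }
}
```

In this model an out-of-order event never lowers the watermark, and elements arriving for a window that is already late are the ones routed to the late-data side output.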

State Management

// Keyed state in Process Function
public class CountFunction extends KeyedProcessFunction<String, Event, Result> {
    // Value state (single value per key)
    private ValueState<Long> countState;
    // List state (list of values per key)
    private ListState<Event> eventsState;
    // Map state (map of values per key)
    private MapState<String, Long> mapState;
    
    @Override
    public void open(Configuration parameters) {
        // Initialize value state
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class));
        
        // Initialize list state
        eventsState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("events", Event.class));
            
        // Initialize map state
        mapState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("map", String.class, Long.class));
    }
    
    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // Value state operations
        Long count = countState.value();
        if (count == null) count = 0L;
        countState.update(count + 1);
        
        // List state operations
        eventsState.add(event);
        Iterable<Event> events = eventsState.get();
        
        // Map state operations
        mapState.put(event.getId(), count);
        mapState.get(event.getId());
        mapState.contains(event.getId());
        
        // Emit result
        out.collect(new Result(event.getKey(), count));
        
        // Use timers (event time)
        ctx.timerService().registerEventTimeTimer(event.getTimestamp() + 60000);
        
        // Use timers (processing time)
        ctx.timerService().registerProcessingTimeTimer(
            ctx.timerService().currentProcessingTime() + 60000);
    }
    
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
        // Timer firing logic (state is null if nothing was stored for this key)
        Long count = countState.value();
        out.collect(new Result(ctx.getCurrentKey(), count == null ? 0L : count, timestamp));
    }
}

Operator State

public class BufferingSink implements SinkFunction<String>, 
                             CheckpointedFunction {
    
    private final int threshold;
    private List<String> bufferedElements;
    private ListState<String> checkpointedState;
    
    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }
    
    @Override
    public void invoke(String value, Context context) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            // Send buffered elements to sink
            for (String element: bufferedElements) {
                // Send element to target sink
            }
            bufferedElements.clear();
        }
    }
    
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (String element : bufferedElements) {
            checkpointedState.add(element);
        }
    }
    
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor = 
            new ListStateDescriptor<>("buffered-elements", String.class);
            
        checkpointedState = context
            .getOperatorStateStore()
            .getListState(descriptor);
            
        if (context.isRestored()) {
            for (String element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

Joins and Co-processing

// Window join (the explicit result type is needed because lambdas erase Tuple3's generic parameters)
DataStream<Tuple3<String, Integer, Integer>> joined = 
    stream1.join(stream2)
        .where(r1 -> r1.getKey())
        .equalTo(r2 -> r2.getKey())
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply((r1, r2) -> new Tuple3<>(r1.getKey(), r1.getValue(), r2.getValue()),
            Types.TUPLE(Types.STRING, Types.INT, Types.INT));

// Interval join (event time)
DataStream<Enriched> intervalJoined = 
    orders.keyBy(Order::getCustomerId)
        .intervalJoin(customers.keyBy(Customer::getId))
        .between(Time.seconds(-5), Time.seconds(10))
        .process(new ProcessJoinFunction<Order, Customer, Enriched>() {
            @Override
            public void processElement(Order order, Customer customer, 
                      Context ctx, Collector<Enriched> out) {
                out.collect(new Enriched(order, customer));
            }
        });

// Connect + CoProcess
ConnectedStreams<String, Integer> connected = 
    stream1.connect(stream2);
    
DataStream<Result> processed = connected
    .keyBy(s -> s, i -> i.toString())
    .process(new CoProcessFunction<String, Integer, Result>() {
        // State for first input
        private ValueState<String> firstState;
        // State for second input
        private ValueState<Integer> secondState;
        
        @Override
        public void open(Configuration parameters) {
            firstState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("first", String.class));
            secondState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("second", Integer.class));
        }
        
        @Override
        public void processElement1(String value, Context ctx, 
                                 Collector<Result> out) throws Exception {
            firstState.update(value);
            if (secondState.value() != null) {
                out.collect(new Result(value, secondState.value()));
            }
        }
        
        @Override
        public void processElement2(Integer value, Context ctx, 
                                 Collector<Result> out) throws Exception {
            secondState.update(value);
            if (firstState.value() != null) {
                out.collect(new Result(firstState.value(), value));
            }
        }
    });
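
The bounds passed to between(...) in the interval join are relative to the timestamp of the left-hand element, and both bounds are inclusive by default. A minimal plain-Java sketch of that matching condition follows; IntervalJoinSketch and withinInterval are hypothetical names chosen for this example.

```java
// Illustrative only: the time condition an interval join applies to a pair of elements.
final class IntervalJoinSketch {

    /** True if rightTs lies in [leftTs + lowerBoundMs, leftTs + upperBoundMs]. */
    static boolean withinInterval(long leftTs, long rightTs, long lowerBoundMs, long upperBoundMs) {
        return rightTs >= leftTs + lowerBoundMs && rightTs <= leftTs + upperBoundMs;
    }

    public static void main(String[] args) {
        long orderTs = 100_000L;
        // between(Time.seconds(-5), Time.seconds(10)) corresponds to bounds of -5000 and 10000 ms.
        System.out.println(withinInterval(orderTs, 95_000L, -5_000L, 10_000L));
        System.out.println(withinInterval(orderTs, 110_001L, -5_000L, 10_000L));
    }
}
```

For the orders/customers example this means a customer record joins an order if its timestamp is at most 5 seconds older or at most 10 seconds newer than the order's timestamp.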

Output Operations

// Print to console (for debugging)
stream.print();
stream.printToErr();

// Write to file
stream.writeAsText("file:///path/to/file");
stream.writeAsCsv("file:///path/to/file"); // only works on streams of tuples

// Kafka sink (legacy FlinkKafkaProducer; properties as defined for the Kafka source)
stream.addSink(new FlinkKafkaProducer<>(
    "topic",
    new SimpleStringSchema(),
    properties));

// JDBC sink
stream.addSink(JdbcSink.sink(
    "INSERT INTO table (col1, col2) VALUES (?, ?)",
    (statement, value) -> {
        statement.setString(1, value.getField1());
        statement.setInt(2, value.getField2());
    },
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/database")
        .withDriverName("com.mysql.cj.jdbc.Driver") // driver class of MySQL Connector/J 8+
        .withUsername("username")
        .withPassword("password")
        .build()));

Table API & SQL

Environment Setup

// Create TableEnvironment from StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create TableEnvironment directly (batch mode)
TableEnvironment batchTableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build());

// Create TableEnvironment directly (streaming mode)
TableEnvironment streamTableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inStreamingMode().build());

Converting Between DataStream and Table

// DataStream to Table
DataStream<Person> personStream = ...;
Table personTable = tableEnv.fromDataStream(personStream);

// With explicit field selection (append .as("newName") to an expression to rename it)
Table namedTable = tableEnv.fromDataStream(personStream, 
    $("firstName"), $("lastName"), $("age"));

// Table to DataStream (append mode)
Table resultTable = ...;
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

// Table to DataStream (update mode)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(resultTable);

// Convert to typed DataStream
DataStream<Person> typedStream = tableEnv.toDataStream(resultTable, Person.class);

Registering Tables and Catalogs

// Register table from DataStream
tableEnv.createTemporaryView("people", personStream);

// Register table from Table object
tableEnv.createTemporaryView("filtered_people", filteredTable);

// Register connector table (Kafka example)
tableEnv.executeSql(
    "CREATE TABLE kafka_table (" +
    "  `user` STRING," +
    "  `product` STRING," +
    "  `amount` INT," +
    "  ts TIMESTAMP(3)," +
    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'test-group'," +
    "  'scan.startup.mode' = 'earliest-offset'," +
    "  'format' = 'json'" +
    ")");

// Register catalog
Catalog hiveCatalog = new HiveCatalog(
    "hive_catalog",
    "default",
    "<path_to_hive_conf>", 
    "<hive_version>");
tableEnv.registerCatalog("hive_catalog", hiveCatalog);

// Set current catalog and database
tableEnv.useCatalog("hive_catalog");
tableEnv.useDatabase("sales");

Table API Queries

// Create table with the Table API
Table people = tableEnv.from("people");

// Selection and filtering
Table result = people
    .select($("name"), $("age"))
    .where($("age").isGreaterOrEqual(21));

// Aggregations
Table counts = people
    .groupBy($("department"))
    .select($("department"), $("age").count().as("count"), 
            $("salary").avg().as("avg_salary"));

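// Joins (the Table API has no table-qualified field names, so the two tables
// must not share field names; here customers' id is assumed renamed to cust_id)
Table joined = orders
    .join(customers)
    .where($("customerId").isEqual($("cust_id")));

// Left outer join 
Table leftJoined = orders
    .leftOuterJoin(customers,
        $("customerId").isEqual($("cust_id")));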

// Window aggregations - Tumbling window
Table windowed = tableEnv.from("sensor_readings")
    .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
    .groupBy($("sensor_id"), $("w"))
    .select(
        $("sensor_id"),
        $("w").start().as("window_start"),
        $("w").end().as("window_end"),
        $("temperature").avg().as("avg_temp"));

// Window aggregations - Sliding window
Table slidingWindow = tableEnv.from("sensor_readings")
    .window(Slide.over(lit(1).hours())
        .every(lit(15).minutes())
        .on($("timestamp"))
        .as("w"))
    .groupBy($("sensor_id"), $("w"))
    .select(
        $("sensor_id"),
        $("w").start(),
        $("w").end(),
        $("temperature").max().as("max_temp"));

// Window aggregations - Session window
Table sessionWindow = tableEnv.from("user_actions")
    .window(Session.withGap(lit(15).minutes())
        .on($("action_time"))
        .as("w"))
    .groupBy($("user_id"), $("w"))
    .select(
        $("user_id"),
        $("w").start(),
        $("w").end(),
        $("action").count().as("action_count"));

// Execute and fetch results
TableResult tableResult = result.execute();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
    while (iterator.hasNext()) {
        Row row = iterator.next();
        System.out.println(row);
    }
}

SQL Queries

// Simple SQL query
Table result = tableEnv.sqlQuery(
    "SELECT name, age FROM people WHERE age >= 21");

// Aggregation query (COUNT is a reserved keyword, so the alias is named cnt)
Table counts = tableEnv.sqlQuery(
    "SELECT department, COUNT(*) AS cnt, AVG(salary) AS avg_salary " +
    "FROM people " +
    "GROUP BY department");

// Join query
Table joined = tableEnv.sqlQuery(
    "SELECT o.id, o.amount, c.name " +
    "FROM orders o " +
    "JOIN customers c ON o.customerId = c.id");

// Tumbling window (timestamp is a reserved keyword and must be escaped with backticks)
Table windowResult = tableEnv.sqlQuery(
    "SELECT sensor_id, " +
    "  TUMBLE_START(`timestamp`, INTERVAL '10' MINUTE) AS window_start, " +
    "  TUMBLE_END(`timestamp`, INTERVAL '10' MINUTE) AS window_end, " +
    "  AVG(temperature) AS avg_temp " +
    "FROM sensor_readings " +
    "GROUP BY sensor_id, TUMBLE(`timestamp`, INTERVAL '10' MINUTE)");

// Execute SQL update statement
tableEnv.executeSql(
    "INSERT INTO output_table " +
    "SELECT name, COUNT(*) AS cnt " +
    "FROM people " +
    "GROUP BY name");

Connectors

Kafka

// Kafka source (DataStream API)
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");

FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
    "topic", new SimpleStringSchema(), properties);
    
// Configure source options
kafkaSource.setStartFromEarliest();      // start from earliest offset
// OR kafkaSource.setStartFromLatest();  // start from latest offset
// OR kafkaSource.setStartFromTimestamp(1651660800000L); // start from timestamp
// OR kafkaSource.setStartFromGroupOffsets(); // default - start from committed offsets

DataStream<String> stream = env.addSource(kafkaSource);

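// Kafka sink (DataStream API)
// The constructor that accepts a Semantic takes a KafkaSerializationSchema
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
    "output-topic",                   // default target topic
    (KafkaSerializationSchema<String>) (element, timestamp) ->
        new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8)),
    properties,                       // producer config
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // requires checkpointing to be enabled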
    
stream.addSink(kafkaSink);

// Kafka connector (Table API / SQL)
tableEnv.executeSql(
    "CREATE TABLE kafka_table (" +
    "  `user` STRING," +
    "  `product` STRING," +
    "  `amount` INT," +
    "  ts TIMESTAMP(3)," +
    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'test-group'," +
    "  'scan.startup.mode' = 'earliest-offset'," +
    "  'format' = 'json'" +
    ")");

File Systems

// File source (DataStream API)
DataStream<String> stream = env.readTextFile("file:///path/to/file");
// OR with HDFS
DataStream<String> hdfsStream = env.readTextFile("hdfs://namenode:9000/path/file");

// File sink (DataStream API)
stream.writeAsText("file:///path/to/output");
// OR force a single output file
stream.writeAsText("file:///path/to/output")
    .setParallelism(1); // one sink subtask writes one file

// File connector (Table API / SQL)
tableEnv.executeSql(
    "CREATE TABLE csv_table (" +
    "  `user` STRING," +
    "  `product` STRING," +
    "  `amount` INT" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'file:///path/to/csv_file'," +
    "  'format' = 'csv'" +
    ")");

// Parquet format
tableEnv.executeSql(
    "CREATE TABLE parquet_table (" +
    "  `user` STRING," +
    "  `product` STRING," +
    "  `amount` INT" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'file:///path/to/parquet_file'," +
    "  'format' = 'parquet'" +
    ")");

Databases

// JDBC sink (DataStream API)
stream.addSink(JdbcSink.sink(
    "INSERT INTO orders (user_id, product, amount) VALUES (?, ?, ?)",
    (statement, order) -> {
        statement.setString(1, order.getUserId());
        statement.setString(2, order.getProduct());
        statement.setInt(3, order.getAmount());
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/database")
        .withDriverName("com.mysql.cj.jdbc.Driver") // driver class of MySQL Connector/J 8+
        .withUsername("username")
        .withPassword("password")
        .build()));

// JDBC connector (Table API / SQL)
tableEnv.executeSql(
    "CREATE TABLE jdbc_table (" +
    "  id INT," +
    "  name STRING," +
    "  age INT," +
    "  PRIMARY KEY (id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:mysql://localhost:3306/test'," +
    "  'table-name' = 'users'," +
    "  'username' = 'root'," +
    "  'password' = 'password'" +
    ")");

// Elasticsearch connector (Table API / SQL)
// ('document-type' is only needed by the elasticsearch-6 connector)
tableEnv.executeSql(
    "CREATE TABLE es_table (" +
    "  user_id STRING," +
    "  message STRING," +
    "  ts TIMESTAMP(3)," +
    "  PRIMARY KEY (user_id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'elasticsearch-7'," +
    "  'hosts' = 'http://localhost:9200'," +
    "  'index' = 'users'" +
    ")");

Formats

// JSON format (timestamp is a reserved keyword and must be escaped with backticks)
tableEnv.executeSql(
    "CREATE TABLE json_table (" +
    "  user_id STRING," +
    "  order_id BIGINT," +
    "  `timestamp` TIMESTAMP(3)," +
    "  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'test-group'," +
    "  'format' = 'json'," +
    "  'json.fail-on-missing-field' = 'false'," +
    "  'json.ignore-parse-errors' = 'true'" +
    ")");

// CSV format
tableEnv.executeSql(
    "CREATE TABLE csv_table (" +
    "  user_id STRING," +
    "  order_id BIGINT," +
    "  amount INT" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'file:///path/to/file'," +
    "  'format' = 'csv'," +
    "  'csv.field-delimiter' = ';'," +
    "  'csv.quote-character' = '\"'," +
    "  'csv.null-literal' = 'n/a'," +
    "  'csv.allow-comments' = 'true'" +
    ")");

// Avro format
tableEnv.executeSql(
    "CREATE TABLE avro_table (" +
    "  user_id STRING," +
    "  order_id BIGINT," +
    "  amount INT" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'file:///path/to/file'," +
    "  'format' = 'avro'" +
    ")");

// Parquet format
tableEnv.executeSql(
    "CREATE TABLE parquet_table (" +
    "  user_id STRING," +
    "  order_id BIGINT," +
    "  amount INT" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'file:///path/to/file'," +
    "  'format' = 'parquet'" +
    ")");

Configuration & Deployment

Configuration Parameters

// Via code
Configuration config = new Configuration();
config.setString("state.backend", "rocksdb");
config.setString("state.checkpoints.dir", "file:///checkpoint-dir");
config.setString("execution.checkpointing.interval", "60000");

StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment(config);

// Via environment variables
// export FLINK_CONF_DIR=/path/to/conf
// export FLINK_ENV_JAVA_OPTS="-Dstate.backend=rocksdb"

// Via flink-conf.yaml
// state.backend: rocksdb
// state.checkpoints.dir: file:///checkpoint-dir
// execution.checkpointing.interval: 60000

State Backends

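// Since Flink 1.13 the state backend and the checkpoint storage are configured separately;
// MemoryStateBackend, FsStateBackend and RocksDBStateBackend are deprecated.

// Heap state, checkpoints kept in JobManager memory (for testing)
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());

// Heap state, checkpoints on a file system
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///path/to/checkpoint-dir"); // or hdfs://

// RocksDB state backend (for large state)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints
env.getCheckpointConfig().setCheckpointStorage("file:///path/to/checkpoint-dir"); // or hdfs://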

Deployment Commands

# Submit job to running cluster
flink run -c com.example.MainClass path/to/flink-job.jar

# Run with parallelism
flink run -p 4 -c com.example.MainClass path/to/flink-job.jar

# Run with savepoint
flink run -s path/to/savepoint -c com.example.MainClass path/to/flink-job.jar

# List running jobs
flink list

# Cancel job
flink cancel <jobID>

# Stop job with savepoint (preferred over the deprecated "cancel -s")
flink stop --savepointPath path/to/savepoints <jobID>

# Create savepoint
flink savepoint <jobID> path/to/savepoints

# Start a Flink cluster (standalone)
start-cluster.sh

Savepoint Management

# Trigger savepoint for a job
flink savepoint <jobID> [targetDirectory]

# Stop job with savepoint (preferred over the deprecated "cancel -s")
flink stop --savepointPath [targetDirectory] <jobID>

# Resume from savepoint
flink run -s <savepointPath> -c com.example.MainClass path/to/flink-job.jar

# Dispose savepoint
flink savepoint -d <savepointPath>

Performance & Troubleshooting

Parallelism Settings

// Set default parallelism for all operators
env.setParallelism(4);

// Set parallelism for specific operator
stream.map(...)
    .setParallelism(8);

// Max parallelism (for rescaling)
env.setMaxParallelism(128);

// Execution mode (the enum is org.apache.flink.api.common.RuntimeExecutionMode)
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// OR
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
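
Max parallelism matters for rescaling because keyed state is organized into key groups, and the number of key groups equals the max parallelism. The sketch below shows, in plain Java, how a key group could be mapped to a parallel subtask for a given parallelism. It is a simplified model: Flink additionally scrambles key.hashCode() with a murmur hash before taking the modulo, whereas this sketch uses floorMod directly as a stand-in. KeyGroupSketch and its methods are hypothetical names.

```java
// Illustrative only: key -> key group -> subtask, with a simplified hash step.
final class KeyGroupSketch {

    /** Key group of a key; stand-in hashing (Flink applies a murmur hash to hashCode() first). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Index of the parallel subtask that owns the given key group. */
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = keyGroupFor("sensor-1", maxParallelism);
        // The key group of a key never changes; only its owning subtask does when rescaling.
        System.out.println(subtaskFor(keyGroup, 4, maxParallelism));
        System.out.println(subtaskFor(keyGroup, 8, maxParallelism));
    }
}
```

Because a key's key group is fixed, changing the parallelism only moves whole key groups between subtasks, and the parallelism can never exceed the max parallelism.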

Common Performance Tips

  1. Right-size state backends: Use RocksDB for large state
  2. Tune checkpointing: Adjust frequency based on recovery needs
  3. Adjust buffer timeout: Control latency vs. throughput
     env.setBufferTimeout(100); // in milliseconds
  4. Use partitioning wisely: Avoid skewed keys
  5. Configure memory: Adjust task manager memory
     taskmanager.memory.process.size: 4g
  6. Measure throughput: Use metrics to identify bottlenecks
  7. Set optimal parallelism: Based on cluster resources and workload
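
One common workaround for the skewed keys mentioned under "Use partitioning wisely" is two-stage aggregation with salted keys: a hot key is spread over several sub-keys, partial results are computed per sub-key, and the partials are then combined per original key. The following plain-Java sketch illustrates the idea on an in-memory list; it is not Flink code, and the names (KeySaltingSketch, partialCounts, finalCounts) as well as the "#" separator are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: two-stage counting with salted keys, modeled on plain collections.
final class KeySaltingSketch {

    /** Stage 1: count per salted key "key#salt"; the salt rotates with the record index. */
    static Map<String, Long> partialCounts(List<String> keys, int saltBuckets) {
        Map<String, Long> partial = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            String salted = keys.get(i) + "#" + (i % saltBuckets);
            partial.merge(salted, 1L, Long::sum);
        }
        return partial;
    }

    /** Stage 2: strip the salt and add up the partial counts per original key. */
    static Map<String, Long> finalCounts(Map<String, Long> partial) {
        Map<String, Long> total = new HashMap<>();
        for (Map.Entry<String, Long> entry : partial.entrySet()) {
            String salted = entry.getKey();
            String key = salted.substring(0, salted.lastIndexOf('#'));
            total.merge(key, entry.getValue(), Long::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("hot", "hot", "hot", "hot", "cold");
        Map<String, Long> partial = partialCounts(keys, 2);
        System.out.println(partial);
        System.out.println(finalCounts(partial));
    }
}
```

In a real job, stage 1 would be a keyBy on the salted key followed by a windowed aggregation, and stage 2 a keyBy on the original key; the trade-off is an extra shuffle in exchange for spreading the hot key's load.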

Monitoring & Metrics

// Register custom metric
public class MyMapper extends RichMapFunction<String, String> {
    private transient Counter counter;
    
    @Override
    public void open(Configuration config) {
        counter = getRuntimeContext()
            .getMetricGroup()
            .counter("myCustomCounter");
    }
    
    @Override
    public String map(String value) throws Exception {
        counter.inc();
        return value.toUpperCase();
    }
}

// Access Flink Web UI: http://localhost:8081 by default

Fault Tolerance Settings

// Enable checkpointing
env.enableCheckpointing(60000); // 60 seconds

// Configure checkpointing behavior
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(30000); // 30 seconds
config.setCheckpointTimeout(900000); // 15 minutes
config.setMaxConcurrentCheckpoints(1);
config.setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.enableUnalignedCheckpoints();

Resources

Key Libraries & Dependencies

<!-- Flink Core -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
</dependency>

<!-- Flink Table API & SQL -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>1.16.0</version>
</dependency>

<!-- Connectors -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7</artifactId>
    <version>1.16.0</version>
</dependency>

<!-- State Backends -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>1.16.0</version>
</dependency>

Further Learning Resources

This cheatsheet provides a comprehensive overview of Apache Flink’s key features, APIs, and operations. For more detailed information, refer to the official Apache Flink documentation.

The Ultimate Apache Flink Cheatsheet: Concepts & Code Examples

Introduction to Apache Flink

Apache Flink is a distributed processing engine for stateful computations over bounded and unbounded data streams. It provides exactly-once processing semantics, high throughput, low latency, and native support for iterative processing and event time windowing. This cheatsheet covers essential concepts, APIs, operations, and best practices for Apache Flink development.

Core Concepts

Flink Architecture Components

  • JobManager: Coordinates distributed execution, scheduling tasks, managing checkpoints
  • TaskManager: Worker processes executing tasks, managing memory and resources
  • Client: Submits applications to the cluster
  • ResourceManager: Manages TaskManager slots and resources across the cluster

Data Processing Models

Model | Description | Use Cases
DataStream API | Processing unbounded/bounded streams | Event processing, anomaly detection, streaming ETL
DataSet API | Processing bounded datasets (deprecated in Flink 1.12+) | Batch processing (use DataStream API with BATCH execution mode instead)
Table API | Relational API for streams and batches | SQL-like transformations, aggregations
SQL | Standard SQL interface | Analytics, ad-hoc queries
ProcessFunction | Low-level stream operations with state | Complex event processing, custom stateful logic

Time Semantics

  • Event Time: When event occurred (embedded in data)
  • Processing Time: When operator processes event (system clock)
  • Ingestion Time: When event enters Flink (system clock at source)

Setting Up Flink Applications

Maven Dependencies

<!-- Flink Core -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<!-- Flink Connectors -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<!-- Table planner (flink-table-planner-loader in Flink 1.15+; the Blink planner artifact was removed in 1.14) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

Basic Application Structure

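import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// ProcessedData, MyMapFunction and MyReduceFunction are user-defined classes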

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Configure execution (checkpointing, parallelism, etc.)
        env.enableCheckpointing(60000); // Checkpoint every 60 seconds
        env.setParallelism(4);          // Set parallelism
        
        // Add source
        DataStream<String> source = env.addSource(new FlinkKafkaConsumer<>(...));
        
        // Transformations
        DataStream<ProcessedData> processed = source
            .map(new MyMapFunction())
            .keyBy(data -> data.getKey())
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .reduce(new MyReduceFunction());
        
        // Add sink
        processed.addSink(new FlinkKafkaProducer<>(...));
        
        // Execute program
        env.execute("Streaming Example");
    }
}

DataStream API Operations

Source Connectors

// Kafka Source (legacy FlinkKafkaConsumer; newer Flink releases favor KafkaSource)
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
    "topic-name",
    new SimpleStringSchema(),
    properties
);
// Configure the start position: pick ONE of these (if several are called, the last call wins)
kafkaSource.setStartFromEarliest();      // Start from earliest offset
kafkaSource.setStartFromLatest();        // Start from latest offset
kafkaSource.setStartFromTimestamp(1234); // Start from records at or after this epoch timestamp (ms)
DataStream<String> stream = env.addSource(kafkaSource);

// File Source
DataStream<String> fileStream = env.readTextFile("file:///path/to/file");

// Socket Source (for testing)
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

// Collection Source (for testing)
DataStream<Integer> collectionStream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));

// Custom Source
DataStream<SensorReading> sensorStream = env.addSource(new SensorSource());

Basic Transformations

// Map: one-to-one transformation
DataStream<Integer> mapped = stream.map(s -> Integer.parseInt(s));

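// FlatMap: one-to-many transformation
// A lambda that uses a Collector loses its output type to erasure, so declare it with returns(...)
DataStream<String> words = stream.flatMap((String line, Collector<String> out) -> {
    for (String word : line.split(" ")) {
        out.collect(word);
    }
}).returns(Types.STRING);

// Filter: keep only elements matching a predicate (applied to the Integer stream from map)
DataStream<Integer> filtered = mapped.filter(value -> value > 100);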

// KeyBy: partition data based on key (events is a DataStream<Event>)
KeyedStream<Event, String> keyedStream = events.keyBy(event -> event.getUserId());

// Reduce: rolling aggregation on a keyed stream (emits an updated result per input element)
DataStream<Event> reduced = keyedStream.reduce((a, b) -> new Event(a.getUserId(), a.getValue() + b.getValue()));

// Aggregations: built-in rolling aggregations by field position
// (keyedTuples is a KeyedStream<Tuple2<String, Integer>, String>)
DataStream<Tuple2<String, Integer>> summed = keyedTuples.sum(1); // Running sum of the second field
DataStream<Tuple2<String, Integer>> maxed = keyedTuples.max(1);  // Running max of the second field (other fields may not come from the max element)
DataStream<Tuple2<String, Integer>> minBy = keyedTuples.minBy(1); // Whole element holding the min of the second field

// Project (for Tuple streams): select subset of fields
DataStream<Tuple2<String, Long>> projected = tupleStream.project(0, 2); // select fields 0 and 2
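
A reduce or sum applied directly to a keyed stream (outside a window) is a rolling aggregation: it emits an updated value for every incoming element instead of one final value per key. The sketch below imitates that behavior in plain Java with no Flink dependency. The class name RollingReduceSketch and the HashMap standing in for keyed state are illustrative assumptions, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of keyBy(...).sum(...): one running value per key,
// and one output record per input record.
class RollingReduceSketch {

    static List<String> rollingSum(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> sumPerKey = new HashMap<>(); // plays the role of keyed state
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, Integer> e : input) {
            // First element of a key is stored as-is; later ones are added to the running sum.
            int updated = sumPerKey.merge(e.getKey(), e.getValue(), Integer::sum);
            output.add(e.getKey() + "=" + updated);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = List.of(
            Map.entry("a", 1), Map.entry("b", 10), Map.entry("a", 2), Map.entry("a", 3));
        System.out.println(rollingSum(input)); // [a=1, b=10, a=3, a=6]
    }
}
```

Four inputs produce four outputs, which is why a rolling aggregation is usually followed by a window or a downstream filter when only final values are wanted.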

Windows

// Time-based Windows
// Tumbling time windows (non-overlapping, fixed size)
stream.keyBy(x -> x.key)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .sum("value");

// Sliding time windows (overlapping, fixed size)
stream.keyBy(x -> x.key)
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .apply(new MyWindowFunction());

// Session windows (dynamic size, timeout-based)
stream.keyBy(x -> x.key)
      .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
      .reduce((a, b) -> a.add(b));

// Count-based Windows
// Tumbling count windows
stream.keyBy(x -> x.key)
      .countWindow(100) // 100 elements per window
      .aggregate(new AverageAggregate());

// Sliding count windows
stream.keyBy(x -> x.key)
      .countWindow(100, 10) // window size 100, slide by 10
      .reduce(new MyReduceFunction());

// Window Functions
// ReduceFunction
windowedStream.reduce((a, b) -> new Sum(a.key, a.value + b.value));

// AggregateFunction (incremental aggregation)
windowedStream.aggregate(new AverageAggregate());

// ProcessWindowFunction (full window access)
windowedStream.process(new MyProcessWindowFunction());

// Combine incremental aggregation with full window
windowedStream.reduce(
    (a, b) -> a + b,
    new ProcessWindowFunction<Integer, Result, String, TimeWindow>() {
        @Override
        public void process(
                String key,
                Context ctx,
                Iterable<Integer> sum,
                Collector<Result> out) {
            out.collect(new Result(key, sum.iterator().next(), ctx.window().getStart(), ctx.window().getEnd()));
        }
    }
);
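
To make the difference between tumbling and sliding windows concrete, the following plain-Java sketch computes which windows a timestamp falls into. It assumes a window offset of 0 and non-negative timestamps; the class and method names are hypothetical, and the arithmetic is a simplified rendering of how time-window assignment is commonly described, not a copy of Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

class WindowAssignSketch {

    // Start of the single tumbling window [start, start + size) containing the timestamp.
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Starts of every sliding window [start, start + size) containing the timestamp.
    // With size = 10s and slide = 5s each element belongs to two windows.
    static List<Long> slidingStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(12_345, 5_000));         // 10000
        System.out.println(slidingStarts(12_345, 10_000, 5_000)); // [10000, 5000]
    }
}
```

An element is copied into size / slide sliding windows, so a small slide relative to the size multiplies the state and work per element.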

Event Time Processing

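// Setting up environment with event time
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Since Flink 1.12 event time is the default and setStreamTimeCharacteristic is deprecated;
// the call below is only needed on older versions
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);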

// Watermark generation (periodic)
DataStream<Event> withTimestampsAndWatermarks = stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
    );

// For records with monotonically increasing timestamps
DataStream<Event> withAscendingTimestamps = stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forMonotonousTimestamps()
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
    );

// Custom watermark strategy
WatermarkStrategy<Event> watermarkStrategy = new WatermarkStrategy<Event>() {
    @Override
    public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new MyWatermarkGenerator();
    }
}.withTimestampAssigner((event, timestamp) -> event.getTimestamp());

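// Handling late events (lateOutputTag is an OutputTag<Event>)
SingleOutputStreamOperator<Event> windowedResult = windowedStream
    .allowedLateness(Time.seconds(10))   // Keep window state 10 more seconds so late events can update the result
    .sideOutputLateData(lateOutputTag)   // Events arriving even later go to the side output
    .reduce(new MyReduceFunction());     // A window function must follow to produce the result stream

// Retrieve the side output from the operator returned by the window function
DataStream<Event> lateEvents = windowedResult.getSideOutput(lateOutputTag);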
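
The interaction between bounded out-of-orderness watermarks and allowed lateness can be sketched in plain Java. This is an illustrative model under stated assumptions: the watermark trails the highest timestamp seen by the configured bound (minus 1 ms), and a window [start, end) stops accepting elements once the watermark reaches end - 1 plus the allowed lateness. The class WatermarkSketch and its methods are hypothetical names, not Flink classes.

```java
class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that computing the first watermark cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    // Called for every event; only the highest timestamp matters.
    void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    // "No event with a timestamp at or below this value is expected anymore."
    long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs - 1;
    }

    // True when an element for the window ending at windowEndMs would be treated as late.
    static boolean isWindowLate(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        return (windowEndMs - 1) + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(5_000);
        wm.onEvent(10_000);
        wm.onEvent(12_000);
        wm.onEvent(9_000); // out of order, does not move the watermark back
        System.out.println(wm.currentWatermark());                              // 6999
        System.out.println(isWindowLate(5_000, 0, wm.currentWatermark()));      // true
        System.out.println(isWindowLate(5_000, 10_000, wm.currentWatermark())); // false
    }
}
```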

State Management

// Keyed State (available in keyed streams)

// Value State
public class StatefulFunctionExample extends KeyedProcessFunction<String, Event, Result> {
    // Declare state
    private ValueState<Long> countState;
    private ValueState<Long> timestampState;
    
    @Override
    public void open(Configuration parameters) {
        // Initialize state
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        timestampState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timestamp", Long.class));
    }
    
    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) throws Exception {
        // Access and update state
        Long count = countState.value();
        if (count == null) {
            count = 0L;
        }
        countState.update(count + 1);
        
        // Schedule timer
        long currentTime = ctx.timestamp();
        timestampState.update(currentTime);
        ctx.timerService().registerEventTimeTimer(currentTime + 60000); // 60 seconds later
        
        out.collect(new Result(event.getKey(), count + 1));
    }
    
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
        // Timer callback
        Long storedTimestamp = timestampState.value();
        if (storedTimestamp != null && storedTimestamp + 60000 == timestamp) {
            out.collect(new Result(ctx.getCurrentKey(), countState.value(), "Timer fired"));
            countState.clear();     // Reset state
            timestampState.clear(); // Drop the stored timer timestamp as well
        }
    }
}

// List State
ListState<Event> listState = getRuntimeContext().getListState(
    new ListStateDescriptor<>("events", Event.class));
// Add items
listState.add(event);
// Get all items
Iterable<Event> events = listState.get();
// Add multiple items
listState.addAll(Arrays.asList(event1, event2));
// Clear state
listState.clear();

// Map State
MapState<String, Long> mapState = getRuntimeContext().getMapState(
    new MapStateDescriptor<>("counts", String.class, Long.class));
// Put entry
mapState.put(key, value);
// Get entry (null if the key is absent)
Long current = mapState.get(key);
// Check if key exists
boolean exists = mapState.contains(key);
// Iterate entries
for (Map.Entry<String, Long> entry : mapState.entries()) {
    // Process entry
}
// Remove entry
mapState.remove(key);

// Reducing State
ReducingState<Integer> reducingState = getRuntimeContext().getReducingState(
    new ReducingStateDescriptor<>("sum", (a, b) -> a + b, Integer.class));
// Add value (will be reduced with current value)
reducingState.add(value);
// Get current value
Integer result = reducingState.get();

// Aggregating State
AggregatingState<Integer, Double> avgState = getRuntimeContext().getAggregatingState(
    new AggregatingStateDescriptor<>("average", 
        new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
            @Override
            public Tuple2<Integer, Integer> createAccumulator() {
                return Tuple2.of(0, 0);
            }
            
            @Override
            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> acc) {
                return Tuple2.of(acc.f0 + value, acc.f1 + 1);
            }
            
            @Override
            public Double getResult(Tuple2<Integer, Integer> acc) {
                return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
            }
            
            @Override
            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        }, 
        TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})));

// Operator State (applies to entire operator)
public class BufferingSink implements SinkFunction<Integer>, 
                                      CheckpointedFunction {
    
    private final int threshold;
    private List<Integer> bufferedElements;
    private ListState<Integer> checkpointedState;
    
    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }
    
    @Override
    public void invoke(Integer value, Context context) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            // Send buffered elements to sink
            for (Integer element : bufferedElements) {
                // Write to external system
            }
            bufferedElements.clear();
        }
    }
    
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Integer element : bufferedElements) {
            checkpointedState.add(element);
        }
    }
    
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("buffered-elements", Integer.class));
        
        if (context.isRestored()) {
            // Restore state after failure
            for (Integer element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

// Broadcast State (for sharing state across tasks)
// Define broadcast state descriptor
MapStateDescriptor<String, String> configDescriptor = 
    new MapStateDescriptor<>("config", String.class, String.class);

// Broadcast the configuration stream
BroadcastStream<Config> broadcastConfig = configStream.broadcast(configDescriptor);

// Connect with main stream and process
mainStream
    .connect(broadcastConfig)
    .process(new BroadcastProcessFunction<Event, Config, Result>() {
        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Result> out) throws Exception {
            // Access broadcast state (read-only); may be null until a matching config has arrived
            String config = ctx.getBroadcastState(configDescriptor).get(event.getType());
            // Process based on current config
            out.collect(new Result(event, config));
        }
        
        @Override
        public void processBroadcastElement(Config config, Context ctx, Collector<Result> out) throws Exception {
            // Update broadcast state with new config
            ctx.getBroadcastState(configDescriptor).put(config.getType(), config.getValue());
        }
    });

Checkpointing & Fault Tolerance

// Enable and configure checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Basic checkpointing every 5 seconds
env.enableCheckpointing(5000);

// Advanced configuration
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // Default mode
env.getCheckpointConfig().setCheckpointTimeout(60000); // 60 seconds
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 0.5 seconds
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // Default: 1
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Set checkpoint storage location
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints");

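// Configure state backend (Flink 1.13+; choose one)
// Heap-based backend: working state is kept as objects on the JVM heap
env.setStateBackend(new HashMapStateBackend());

// RocksDB backend (for large state); true enables incremental checkpoints
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

// Legacy classes, deprecated since Flink 1.13: MemoryStateBackend (testing),
// FsStateBackend("hdfs:///flink/checkpoints"), RocksDBStateBackend("hdfs:///flink/checkpoints", true)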
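
How the checkpoint interval, the minimum pause, and the timeout interact can be approximated with a little arithmetic. The sketch below is a simplified model, not Flink's coordinator logic: it assumes the next checkpoint starts no earlier than one interval after the previous trigger and no earlier than the minimum pause after the previous completion. All names are hypothetical.

```java
class CheckpointTimingSketch {

    // Earliest start of the next checkpoint under the simplified model described above.
    static long nextTriggerMs(long lastTriggerMs, long lastCompletionMs,
                              long intervalMs, long minPauseMs) {
        return Math.max(lastTriggerMs + intervalMs, lastCompletionMs + minPauseMs);
    }

    // A checkpoint running longer than the timeout is abandoned.
    static boolean timedOut(long durationMs, long timeoutMs) {
        return durationMs > timeoutMs;
    }

    public static void main(String[] args) {
        // Interval 5000 ms and min pause 500 ms, as configured above.
        System.out.println(nextTriggerMs(0, 1_000, 5_000, 500)); // 5000: fast checkpoint, interval decides
        System.out.println(nextTriggerMs(0, 4_800, 5_000, 500)); // 5300: slow checkpoint, min pause decides
        System.out.println(timedOut(61_000, 60_000));            // true
    }
}
```

The minimum pause matters mostly when checkpoints take nearly as long as the interval; it guarantees the job some time to process data between snapshots.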

Side Outputs

// Define output tag for side output
final OutputTag<LateEvent> lateOutputTag = new OutputTag<LateEvent>("late-events") {};

// Emit to side output from ProcessFunction
public class ProcessFunctionWithSideOutput extends ProcessFunction<Event, Result> {
    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out) {
        // Regular output
        out.collect(new Result(event));
        
        // Side output for events whose timestamp is behind the current watermark
        if (event.getTimestamp() < ctx.timerService().currentWatermark()) {
            ctx.output(lateOutputTag, new LateEvent(event));
        }
    }
}

// Apply process function
SingleOutputStreamOperator<Result> mainStream = 
    inputStream.process(new ProcessFunctionWithSideOutput());

// Retrieve side output stream
DataStream<LateEvent> lateEvents = mainStream.getSideOutput(lateOutputTag);

Process Functions (Low-Level Operations)

// KeyedProcessFunction
public class MonitoringAlerts extends KeyedProcessFunction<String, SensorReading, Alert> {
    // State to store last temperature
    private ValueState<Double> lastTempState;
    // State to store current timer
    private ValueState<Long> timerState;
    // Threshold for alerts
    private final double threshold;
    
    public MonitoringAlerts(double threshold) {
        this.threshold = threshold;
    }
    
    @Override
    public void open(Configuration parameters) {
        lastTempState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-temp", Double.class));
        timerState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timer", Long.class));
    }
    
    @Override
    public void processElement(SensorReading reading, Context ctx, Collector<Alert> out) throws Exception {
        // Get previous temperature
        Double lastTemp = lastTempState.value();
        // Update state with current temperature
        lastTempState.update(reading.getTemperature());
        
        // First reading or temperature decreased
        if (lastTemp == null || reading.getTemperature() < lastTemp) {
            // Remove existing timer if any
            Long timer = timerState.value();
            if (timer != null) {
                ctx.timerService().deleteEventTimeTimer(timer);
                timerState.clear();
            }
        }
        // Temperature increased beyond threshold
        else if (reading.getTemperature() >= lastTemp + threshold) {
            // Set timer for 1 minute in the future if not already set
            if (timerState.value() == null) {
                long timerTs = ctx.timestamp() + 60000;
                ctx.timerService().registerEventTimeTimer(timerTs);
                timerState.update(timerTs);
            }
        }
    }
    
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        // Timer fired - emit alert
        out.collect(new Alert(ctx.getCurrentKey(), 
                     "Temperature rising for more than 1 minute"));
        // Clear timer state
        timerState.clear();
    }
}

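// CoProcessFunction (for connected streams)
// Keyed state is used here, so both inputs must be keyed before connecting,
// e.g. readings.keyBy(...).connect(thresholds.keyBy(...))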
public class TemperatureAlertFunction extends CoProcessFunction<SensorReading, ThresholdUpdate, Alert> {
    // State for the threshold
    private ValueState<Double> thresholdState;
    
    @Override
    public void open(Configuration parameters) {
        thresholdState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("threshold", Double.class));
    }
    
    @Override
    public void processElement1(SensorReading reading, Context ctx, Collector<Alert> out) throws Exception {
        // Process temperature reading
        Double threshold = thresholdState.value();
        if (threshold != null && reading.getTemperature() > threshold) {
            out.collect(new Alert(reading.getId(), 
                       "Temperature " + reading.getTemperature() + 
                       " exceeded threshold " + threshold));
        }
    }
    
    @Override
    public void processElement2(ThresholdUpdate update, Context ctx, Collector<Alert> out) throws Exception {
        // Update threshold
        thresholdState.update(update.getThreshold());
    }
}
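
The timer calls used in the process functions above (register, delete, fire on watermark progress) follow a simple contract that can be modeled in plain Java. This sketch covers a single key only and assumes that registering the same timestamp twice yields one timer and that an event-time timer fires once the watermark reaches its timestamp. TimerSketch is a hypothetical class, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Single-key model of event-time timers.
class TimerSketch {
    private final TreeSet<Long> timers = new TreeSet<>();

    // Registering the same timestamp twice keeps a single timer.
    void register(long timestampMs) {
        timers.add(timestampMs);
    }

    // Deleting a timer that does not exist is a no-op.
    void delete(long timestampMs) {
        timers.remove(timestampMs);
    }

    // Fires, in timestamp order, every timer at or below the new watermark.
    List<Long> advanceWatermark(long watermarkMs) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermarkMs) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerSketch t = new TimerSketch();
        t.register(60_000);
        t.register(60_000); // duplicate, ignored
        t.register(90_000);
        t.delete(90_000);   // cancelled before it can fire
        System.out.println(t.advanceWatermark(59_999));  // []
        System.out.println(t.advanceWatermark(60_000));  // [60000]
        System.out.println(t.advanceWatermark(100_000)); // []
    }
}
```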

Sink Connectors

// Kafka Sink (legacy FlinkKafkaProducer; newer Flink releases favor KafkaSink)
stream.addSink(new FlinkKafkaProducer<>(
    "output-topic",
    new SimpleStringSchema(),
    properties
));

// File Sink (StreamingFileSink for continuous files; newer Flink releases favor FileSink)
final StreamingFileSink<String> fileSink = StreamingFileSink
    .forRowFormat(new Path("file:///path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
    .build();
    
stream.addSink(fileSink);

// JDBC Sink (flink-connector-jdbc; created through the JdbcSink.sink factory method)
stream.addSink(JdbcSink.sink(
    "INSERT INTO events (id, timestamp, value) VALUES (?, ?, ?)",
    (statement, event) -> {
        // Bind one event to the prepared statement
        statement.setString(1, event.getId());
        statement.setLong(2, event.getTimestamp());
        statement.setDouble(3, event.getValue());
    },
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/database")
        .withDriverName("com.mysql.jdbc.Driver")
        .withUsername("username")
        .withPassword("password")
        .build()
));

// Elasticsearch Sink
stream.addSink(new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<Event>() {
        @Override
        public void process(Event event, RuntimeContext ctx, RequestIndexer indexer) {
            Map<String, Object> json = new HashMap<>();
            json.put("id", event.getId());
            json.put("timestamp", event.getTimestamp());
            json.put("value", event.getValue());
            
            IndexRequest request = Requests.indexRequest()
                .index("index-name")
                .type("type-name")
                .source(json);
                
            indexer.add(request);
        }
    }
).build());

// Print Sink (for debugging)
stream.print();  // Print to standard output
stream.print("prefix");  // With custom prefix
stream.printToErr();  // Print to standard error

Table API & SQL

Setting Up Table Environment

// Create TableEnvironment from StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Or create stand-alone TableEnvironment for batch processing
TableEnvironment batchTableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance()
        .inBatchMode()
        .build());

// Register catalog
tableEnv.registerCatalog("my_catalog", new HiveCatalog(...));

// Use catalog
tableEnv.useCatalog("my_catalog");

// Register table from DataStream
tableEnv.createTemporaryView("events", dataStream, $("id"), $("timestamp"), $("value"));

// Register table from external source
tableEnv.executeSql("CREATE TABLE orders (" +
    "order_id BIGINT, " +
    "price DECIMAL(10, 2), " +
    "currency STRING, " +
    "order_time TIMESTAMP(3), " +
    "proc_time AS PROCTIME(), " +
    "WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND " +
    ") WITH (" +
    "  'connector' = 'kafka', " +
    "  'topic' = 'orders', " +
    "  'properties.bootstrap.servers' = 'localhost:9092', " +
    "  'format' = 'json' " +
    ")");

Table API Examples

// Create a Table
Table orders = tableEnv.from("orders");

// Table transformations
Table result = orders
    // Filter
    .filter($("price").isGreater(100))
    // Project
    .select($("order_id"), $("price"), $("currency"))
    // Group & aggregate (built-in aggregates go in the select that follows groupBy)
    .groupBy($("currency"))
    .select($("currency"), $("price").sum().as("total_price"));

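// Join (field names must be unique across both inputs, so rename clashing columns first;
// assumes orders has a customer_id column and customers has id and name)
Table ordersWithCustomers = orders
    .join(customers.renameColumns($("id").as("cust_id")))
    .where($("customer_id").isEqual($("cust_id")))
    .select($("order_id"), $("name"), $("price"));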

// Window aggregation
Table windowedAgg = orders
    .window(Tumble.over(lit(10).minutes()).on($("order_time")).as("w"))
    .groupBy($("w"), $("currency"))
    .select(
        $("w").start().as("window_start"),
        $("w").end().as("window_end"),
        $("currency"),
        $("price").sum().as("window_price")
    );

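// Write table to sink
// Note: a non-windowed GROUP BY yields an updating table; the plain kafka connector accepts
// insert-only rows, so such results need an upsert-capable connector such as upsert-kafka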
TableDescriptor sinkDescriptor = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("currency", DataTypes.STRING())
        .column("total_price", DataTypes.DECIMAL(10, 2))
        .build())
    .option("topic", "results")
    .option("properties.bootstrap.servers", "localhost:9092")
    .format(FormatDescriptor.forFormat("json").build())
    .build();

result.executeInsert(sinkDescriptor);

// Convert Table back to DataStream
// Insert-only tables (e.g. the windowed aggregate)
DataStream<Row> appendStream = tableEnv.toDataStream(windowedAgg);

// Updating tables (e.g. the non-windowed aggregate in result): each Row carries a RowKind
// (toChangelogStream replaces the deprecated toRetractStream)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(result);
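
Why a non-windowed GROUP BY cannot be converted as an insert-only stream becomes clearer when the changes it produces are written out. The plain-Java sketch below emits the changelog of a running SUM per currency: the first row for a key is an insert (+I), and every later row retracts the old total (-U) before emitting the new one (+U). The class name and the string rendering are illustrative assumptions that only approximate how changelog rows are usually printed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangelogSketch {

    // Turns a stream of (currency, price) orders into changelog entries for SUM(price) GROUP BY currency.
    static List<String> changelog(List<Map.Entry<String, Integer>> orders) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> order : orders) {
            String currency = order.getKey();
            Integer previous = totals.get(currency);
            int updated = (previous == null ? 0 : previous) + order.getValue();
            totals.put(currency, updated);
            if (previous == null) {
                out.add("+I[" + currency + ", " + updated + "]");  // first result for this key
            } else {
                out.add("-U[" + currency + ", " + previous + "]"); // retract the old total
                out.add("+U[" + currency + ", " + updated + "]");  // emit the new total
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> orders = List.of(
            Map.entry("EUR", 100), Map.entry("USD", 50), Map.entry("EUR", 20));
        changelog(orders).forEach(System.out::println);
    }
}
```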

SQL Examples

// Simple SQL query
Table result = tableEnv.sqlQuery(
    "SELECT currency, SUM(price) AS total_price " +
    "FROM orders " +
    "WHERE price > 100 " +
    "GROUP BY currency"
);

// SQL query with window
Table windowResult = tableEnv.sqlQuery(
    "SELECT " +
    "  TUMBLE_START(order_time, INTERVAL '10' MINUTE) AS window_start, " +
    "  TUMBLE_END(order_time, INTERVAL '10' MINUTE) AS window_end, " +
    "  currency, " +
    "  SUM(price) AS window_price " +
    "FROM orders " +
    "GROUP BY " +
    "  TUMBLE(order_time, INTERVAL '10' MINUTE), " +
    "  currency"
);

// Execute SQL update statements
tableEnv.executeSql(
    "INSERT INTO results " +
    "SELECT currency, SUM(price) AS total_price " +
    "FROM orders " +
    "GROUP BY currency"
);

// Create a view
tableEnv.createTemporaryView("high_value_orders", 
    tableEnv.sqlQuery("SELECT * FROM orders WHERE price > 1000"));

Performance Tuning

Parallelism & Resources

// Set default parallelism for the environment
env.setParallelism(4);

// Set parallelism for specific operator
dataStream.map(new MyMapper()).setParallelism(8);

// Set max parallelism (for rescaling state)
env.setMaxParallelism(128);

// Memory configuration (via config or flink-conf.yaml)
Configuration config = new Configuration();
config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(1024));
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(512));
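
The max parallelism setting fixes the number of key groups, the unit in which keyed state is redistributed when a job is rescaled. The sketch below shows the idea in plain Java: a key maps to a key group, and each parallel subtask owns a contiguous range of key groups. It is a simplification: the key-group step here uses the raw hashCode, whereas Flink is understood to apply an additional hash first. All names are hypothetical.

```java
class KeyGroupSketch {

    // Simplified key-group assignment (assumption: no extra hashing of hashCode()).
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each subtask owns a contiguous range of key groups.
    static int subtaskIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int group = keyGroup("a", maxParallelism);                  // "a".hashCode() is 97
        System.out.println(group);                                  // 97
        System.out.println(subtaskIndex(group, maxParallelism, 4)); // 3
        System.out.println(subtaskIndex(group, maxParallelism, 8)); // 6
    }
}
```

Because the key group of a key never changes, rescaling from 4 to 8 subtasks only moves whole key groups between subtasks. Parallelism can therefore not exceed the max parallelism, and changing the max parallelism later is incompatible with existing keyed state.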

State Optimization

// State TTL (Time-to-Live) configuration
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()
    .build();

ValueStateDescriptor<String> stateDescriptor = 
    new ValueStateDescriptor<>("my-state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

// Configure RocksDB for large state
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs:///checkpoints");
// Predefined options
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
// Custom options
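rocksDBStateBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions.setMaxOpenFiles(10);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // Keep the column family defaults
    }
});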
env.setStateBackend(rocksDBStateBackend);
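
The TTL settings shown earlier in this section (OnCreateAndWrite with NeverReturnExpired) can be read as the following contract, sketched in plain Java: only writes refresh the expiry clock, and an expired value is never returned even if it has not been physically removed yet. TtlStateSketch is a hypothetical class that passes the clock in explicitly; it is not how Flink stores state.

```java
import java.util.HashMap;
import java.util.Map;

class TtlStateSketch {
    private final long ttlMs;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> lastWriteMs = new HashMap<>();

    TtlStateSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // OnCreateAndWrite: creating or updating the value restarts the TTL.
    void update(String key, String value, long nowMs) {
        values.put(key, value);
        lastWriteMs.put(key, nowMs);
    }

    // NeverReturnExpired: reads do not refresh the TTL and hide expired values.
    String value(String key, long nowMs) {
        Long written = lastWriteMs.get(key);
        if (written == null || nowMs - written >= ttlMs) {
            return null;
        }
        return values.get(key);
    }

    public static void main(String[] args) {
        TtlStateSketch state = new TtlStateSketch(1_000);
        state.update("k", "v", 0);
        System.out.println(state.value("k", 999));   // v
        System.out.println(state.value("k", 1_000)); // null: expired, the read at 999 did not extend it
        state.update("k", "v2", 1_200);
        System.out.println(state.value("k", 2_000)); // v2
    }
}
```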

Efficient Data Types

// Use POJO classes
public class SensorReading {
    public String id;
    public long timestamp;
    public double temperature;
    
    // Default constructor for deserialization
    public SensorReading() {}
    
    public SensorReading(String id, long timestamp, double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }
}

// Or Tuple classes for simple data
DataStream<Tuple3<String, Long, Double>> tupleStream = ...

// Define TypeInformation for custom types if needed
TypeInformation<MyType> typeInfo = TypeInformation.of(MyType.class);

Deployment & Operations

Application Submission

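# Submit a Flink job to a running cluster
#   -c: entry class, -p: parallelism; arguments after the jar are passed to the job
# (a comment after a trailing backslash breaks the line continuation, so keep comments on their own lines)
./bin/flink run \
    -c com.example.MyStreamingJob \
    -p 4 \
    path/to/my-flink-job.jar \
    --param1 value1 --param2 value2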

# Run with a savepoint (-s: savepoint path)
./bin/flink run \
    -s hdfs:///savepoints/savepoint-1234 \
    -c com.example.MyStreamingJob \
    path/to/my-flink-job.jar

# Submit with specific configuration options
./bin/flink run \
    -Dtaskmanager.memory.process.size=4g \
    -c com.example.MyStreamingJob \
    path/to/my-flink-job