Try   HackMD

Removing Flink Table API legacy

Incomplete plan to get rid of the legacy in Table API, including:

  • Old Factory stack
  • Old String expression stack
  • Old functions stack
  • Old type system
  • Legacy rules

A couple of notes first

  • When removing legacy in tests, make sure the test still makes sense after removing legacy. We have a lot of test utilities that are used only by tests effectively testing the old stack, and tests checking the new stack might already have a similar utility implemented.
  • When porting test code:
    • Don't rely on planner internals
    • Use JUnit 5/AssertJ
    • Move it to flink-table-tests module whenever possible

Peeling the onion’s layers

1. Remove all implementations of TableSink and TableSource

Implementations of TableSink in  (25 usages found)
    Production  (11 usages found)
        flink-connector-cassandra_2.12  (1 usage found)
            org.apache.flink.streaming.connectors.cassandra  (1 usage found)
                CassandraAppendTableSink.java  (1 usage found)
                    33 public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
        flink-table-api-java-bridge  (6 usages found)
            org.apache.flink.table.sinks  (6 usages found)
                AppendStreamTableSink.java  (1 usage found)
                    39 public interface AppendStreamTableSink<T> extends StreamTableSink<T> {}
                CsvTableSink.java  (1 usage found)
                    44 public class CsvTableSink implements AppendStreamTableSink<Row> {
                OutputFormatTableSink.java  (1 usage found)
                    38 public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {
                RetractStreamTableSink.java  (1 usage found)
                    47 public interface RetractStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {
                StreamTableSink.java  (1 usage found)
                    33 public interface StreamTableSink<T> extends TableSink<T> {
                UpsertStreamTableSink.java  (1 usage found)
                    58 public interface UpsertStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {
        flink-table-common  (1 usage found)
            org.apache.flink.table.sinks  (1 usage found)
                TableSinkBase.java  (1 usage found)
                    36 public abstract class TableSinkBase<T> implements TableSink<T> {
        flink-table-planner_2.12  (3 usages found)
            org.apache.flink.table.planner.sinks  (3 usages found)
                CollectTableSink.scala  (2 usages found)
                    34 class CollectTableSink[T](produceOutputType: (Array[TypeInformation[_]] => TypeInformation[T]))
                    80 class CollectRowTableSink extends CollectTableSink[Row](new RowTypeInfo(_: _*))
                DataStreamTableSink.scala  (1 usage found)
                    36 class DataStreamTableSink[T](
    Test  (14 usages found)
        flink-python_2.12  (4 usages found)
            org.apache.flink.table.legacyutils  (4 usages found)
                legacyTestingSinks.scala  (3 usages found)
                    39 private[flink] class TestAppendSink extends AppendStreamTableSink[Row] {
                    69 private[flink] class TestRetractSink extends RetractStreamTableSink[Row] {
                    95 private[flink] class TestUpsertSink(
                TestCollectionTableFactory.scala  (1 usage found)
                    168 class CollectionTableSink(val outputType: RowTypeInfo)
        flink-table-common  (1 usage found)
            org.apache.flink.table.utils  (1 usage found)
                TypeMappingUtilsTest.java  (1 usage found)
                    419 private static class TestTableSink implements TableSink<Tuple2<Boolean, Row>> {
        flink-table-planner_2.12  (9 usages found)
            org.apache.flink.table.planner.factories.utils  (1 usage found)
                TestCollectionTableFactory.scala  (1 usage found)
                    152 class CollectionTableSink(val schema: TableSchema)
            org.apache.flink.table.planner.runtime.batch.sql  (1 usage found)
                PartitionableSinkITCase.scala  (1 usage found)
                    331 private class TestSink(
            org.apache.flink.table.planner.runtime.utils  (3 usages found)
                StreamTestSink.scala  (3 usages found)
                    265 final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
                    344 final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row] {
                    499 final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink[Row] {
            org.apache.flink.table.planner.utils  (4 usages found)
                MemoryTableSourceSinkUtil.scala  (3 usages found)
                    80 final class UnsafeMemoryAppendTableSink
                    146 final class DataTypeOutputFormatTableSink(
                    181 final class DataTypeAppendStreamTableSink(
                testTableSourceSinks.scala  (1 usage found)
                    1317 class OptionsTableSink(
Implementations of TableSource in  (31 usages found)
    Production  (8 usages found)
        flink-batch-sql-test  (1 usage found)
            org.apache.flink.sql.tests  (1 usage found)
                BatchSQLTestProgram.java  (1 usage found)
                    79 public static class GeneratorTableSource extends InputFormatTableSource<Row> {
        flink-python_2.12  (2 usages found)
            org.apache.flink.table.runtime.arrow.sources  (1 usage found)
                ArrowTableSource.java  (1 usage found)
                    32 public class ArrowTableSource implements StreamTableSource<RowData> {
            org.apache.flink.table.utils.python  (1 usage found)
                PythonInputFormatTableSource.java  (1 usage found)
                    35 public class PythonInputFormatTableSource extends InputFormatTableSource<Row> {
        flink-stream-sql-test_2.12  (1 usage found)
            org.apache.flink.sql.tests  (1 usage found)
                StreamSQLTestProgram.java  (1 usage found)
                    192 public static class GeneratorTableSource
        flink-table-api-java-bridge  (3 usages found)
            org.apache.flink.table.sources  (3 usages found)
                CsvTableSource.java  (1 usage found)
                    62 public class CsvTableSource
                InputFormatTableSource.java  (1 usage found)
                    39 public abstract class InputFormatTableSource<T> implements StreamTableSource<T> {
                StreamTableSource.java  (1 usage found)
                    33 public interface StreamTableSource<T> extends TableSource<T> {
        flink-table-common  (1 usage found)
            org.apache.flink.table.sources  (1 usage found)
                LookupableTableSource.java  (1 usage found)
                    38 public interface LookupableTableSource<T> extends TableSource<T> {
    Test  (23 usages found)
        flink-python_2.12  (1 usage found)
            org.apache.flink.table.legacyutils  (1 usage found)
                TestCollectionTableFactory.scala  (1 usage found)
                    127 class CollectionTableSource(
        flink-sql-client_2.12  (1 usage found)
            org.apache.flink.table.client.gateway.utils  (1 usage found)
                SimpleCatalogFactory.java  (1 usage found)
                    95 new StreamTableSource<Row>() {
        flink-table-api-java  (1 usage found)
            org.apache.flink.table.utils  (1 usage found)
                TableSourceMock.java  (1 usage found)
                    27 public class TableSourceMock implements TableSource<Row> {
        flink-table-common  (1 usage found)
            org.apache.flink.table.utils  (1 usage found)
                TypeMappingUtilsTest.java  (1 usage found)
                    375 private static class TestTableSource
        flink-table-planner_2.12  (19 usages found)
            org.apache.flink.table.planner.factories.utils  (1 usage found)
                TestCollectionTableFactory.scala  (1 usage found)
                    115 class CollectionTableSource(
            org.apache.flink.table.planner.plan.stream.sql.join  (2 usages found)
                LookupJoinTest.scala  (2 usages found)
                    601 class TestTemporalTable(bounded: Boolean = false)
                    667 class TestInvalidTemporalTable private(
            org.apache.flink.table.planner.runtime.utils  (1 usage found)
                InMemoryLookupableTableSource.scala  (1 usage found)
                    50 class InMemoryLookupableTableSource(
            org.apache.flink.table.planner.utils  (15 usages found)
                TableTestBase.scala  (1 usage found)
                    1372 class TestTableSource(override val isBounded: Boolean, schema: TableSchema)
                TestLegacyLimitableTableSource.scala  (1 usage found)
                    44 class TestLegacyLimitableTableSource(
                testTableSourceSinks.scala  (13 usages found)
                    205 class TestTableSourceWithTime[T](
                    316 class TestPreserveWMTableSource[T](
                    344 class TestLegacyProjectableTableSource(
                    407 class TestNestedProjectableTableSource(
                    504 class TestLegacyFilterableTableSource(
                    731 class TestInputFormatTableSource[T](
                    786 class TestDataTypeTableSource(
                    847 class TestDataTypeTableSourceWithTime(
                    926 class TestStreamTableSource(
                    977 class TestFileInputFormatTableSource(
                    1038 class TestPartitionableTableSource(
                    1209 class WithoutTimeAttributesTableSource(bounded: Boolean) extends StreamTableSource[Row] {
                    1297 class OptionsTableSource(

This task can be further split into:

  1. Remove all usages of production implementations of TableSink and TableSource. There should be already implementations replacing the legacy ones, except for:
    1. CassandraAppendTableSink. Discuss with cassandra maintainers.
    2. ArrowTableSource and related PythonInputFormatTableSource. Discuss with python maintainers.
  2. Identify which test implementations are still required, and port them to DynamicTableSourceFactory and DynamicTableSinkFactory. Note that some of the test utilities are already provided in the new stack, e.g. CollectionTableSource and GeneratorTableSource sound like overcomplicated mocks of connectors we already support natively in the new stack.
  3. Remove all usages of test implementations of TableSink and TableSource. This will involve removing entire tests, e.g. LegacyTableSinkITCase which is merely testing the old stack.
  4. Cleanup remaining implementations of TableSink and TableSource.

In step 2, I suggest porting the old test implementations of TableSink/TableSource without removing the old ones, in order to simplify the execution of step 3. Then, in step 4, all the code that is no longer referenced can be removed.

2. Remove TableSink and TableSource

Remove TableSink, TableSource and TableFormatFactory and clean up all the code relying on these interfaces until the project compiles.

Along the way, you should hit the following things to clean up:

  • Some user facing APIs, notably:
    • Table#insertInto
    • TableEnvironment#insertInto
    • TableEnvironment#fromTableSource
  • Old Operation nodes, e.g. TableSourceQueryOperation, UnregisteredSinkModifyOperation
  • Some code here and there in the sql-to-rel conversion
  • Old conversion rules, e.g. BatchPhysicalLegacySinkRule, PushPartitionIntoLegacyTableSourceScanRule.

3. Remove TableFactory

Remove TableFactory and clean up all the code relying on it until the project compiles.

Some interfaces implement TableFactory for compatibility reasons but should not be cleaned up, e.g. CatalogFactory, ModuleFactory.

This also involves cleaning up TableFactoryUtil and TableFactoryService.

4. Old functions stack

  • Update Scala table and aggregate functions implicits to the new stack.

  • Update Python table and aggregate functions to the new stack. Discuss with python maintainers.
  • Update Hive functions to the new stack. Discuss with Hive maintainers.
  • Remove the old stack, i.e. TableFunctionDefinition, TableEnvironment.registerFunction

5. Remove String Scala Expression stack

This includes all the Table API methods that take strings, e.g. Table.select(String) or fromDataStream(DataStream<T>, String).

This task includes:

  • Reimplement the codegen tests with the new expression stack
  • Python API methods that take expression strings. Discuss with python maintainers.
  • Remove ExpressionParser entrypoint interface, its usages and its implementation.

6. Remove PlannerExpression tree

List of classes:

Subclasses of PlannerExpression in  (56 usages found)
    Production  (56 usages found)
        Unclassified  (56 usages found)
            flink-table-planner_2.12  (56 usages found)
                org.apache.flink.table.planner.expressions  (56 usages found)
                    aggregations.scala  (5 usages found)
                        32 abstract sealed class Aggregation extends PlannerExpression {
                        43 case class ApiResolvedAggregateCallExpression(
                        55 case class DistinctAgg(child: PlannerExpression) extends Aggregation {
                        79 case class Collect(child: PlannerExpression) extends Aggregation  {
                        92 case class AggFunctionCall(
                    call.scala  (6 usages found)
                        38 case class ApiResolvedExpression(resolvedDataType: DataType)
                        51 case class UnresolvedOverCall(agg: PlannerExpression, alias: PlannerExpression)
                        71 case class OverCall(
                        166 case class PlannerScalarFunctionCall(
                        206 case class PlannerTableFunctionCall(
                        234 case class ThrowException(msg: PlannerExpression, tp: TypeInformation[_]) extends UnaryExpression {
                    collection.scala  (3 usages found)
                        27 case class ArrayElement(array: PlannerExpression) extends PlannerExpression {
                        47 case class Cardinality(container: PlannerExpression) extends PlannerExpression {
                        64 case class ItemAt(container: PlannerExpression, key: PlannerExpression) extends PlannerExpression {
                    fieldExpression.scala  (12 usages found)
                        32 trait NamedExpression extends PlannerExpression {
                        37 abstract class Attribute extends LeafExpression with NamedExpression {
                        46 case class RexPlannerExpression(
                        54 case class UnresolvedFieldReference(name: String) extends Attribute {
                        68 case class PlannerResolvedFieldReference(
                        83 case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) extends Attribute {
                        99 case class TableReference(name: String, tableOperation: QueryOperation)
                        112 abstract class TimeAttribute(val expression: PlannerExpression)
                        119 case class RowtimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) {
                        159 case class ProctimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) {
                        184 case class StreamRecordTimestamp() extends LeafExpression {
                        194 case class PlannerLocalReference(
                    InputTypeSpec.scala  (1 usage found)
                        29 trait InputTypeSpec extends PlannerExpression {
                    literals.scala  (2 usages found)
                        51 case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
                        80 case class Null(resultType: TypeInformation[_]) extends LeafExpression {
                    ordering.scala  (3 usages found)
                        23 abstract class Ordering extends UnaryExpression {
                        33 case class Asc(child: PlannerExpression) extends Ordering {
                        39 case class Desc(child: PlannerExpression) extends Ordering {
                    overOffsets.scala  (4 usages found)
                        24 case class CurrentRow() extends PlannerExpression {
                        32 case class CurrentRange() extends PlannerExpression {
                        40 case class UnboundedRow() extends PlannerExpression {
                        48 case class UnboundedRange() extends PlannerExpression {
                    PlannerExpression.scala  (3 usages found)
                        74 abstract class BinaryExpression extends PlannerExpression {
                        80 abstract class UnaryExpression extends PlannerExpression {
                        85 abstract class LeafExpression extends PlannerExpression {
                    Reinterpret.scala  (1 usage found)
                        26 case class Reinterpret(child: PlannerExpression, resultType: TypeInformation[_],
                    subquery.scala  (1 usage found)
                        29 case class In(expression: PlannerExpression, elements: Seq[PlannerExpression])
                    symbols.scala  (1 usage found)
                        31 case class SymbolPlannerExpression(symbol: PlannerSymbol) extends LeafExpression {
                    time.scala  (11 usages found)
                        32 case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpression)
                        80 abstract class CurrentTimePoint(
                        106 case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local = false)
                        108 case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = false)
                        110 case class CurrentTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false)
                        112 case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = true)
                        114 case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true)
                        119 case class TemporalOverlaps(
                        214 case class DateFormat(timestamp: PlannerExpression, format: PlannerExpression)
                        223 case class TimestampDiff(
                        275 case class ToTimestampLtz(
                    windowProperties.scala  (3 usages found)
                        33 abstract class AbstractWindowProperty(child: PlannerExpression)
                        49 case class WindowStart(child: PlannerExpression) extends AbstractWindowProperty(child) {
                        56 case class WindowEnd(child: PlannerExpression) extends AbstractWindowProperty(child) {

This task includes:

  • Convert functions still creating these PlannerExpression nodes to the new type inference, such as Extract, CurrentTimestamp and ArrayElement.
  • Remove PlannerExpressionConverter.

7. Pre-FLIP-136 DataStream conversions

  • toAppendStream/toRetractStream/fromDataStream(DataStream<T>, Expression...)