StreamingJob.java 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.springblade;
  19. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  20. import org.apache.flink.streaming.api.datastream.DataStream;
  21. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  22. import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
  23. import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
  24. import org.slf4j.Logger;
  25. import org.slf4j.LoggerFactory;
  26. import org.springblade.sink.MongoSink;
  27. /**
  28. * Skeleton for a Flink Streaming Job.
  29. *
  30. * <p>For a tutorial how to write a Flink streaming application, check the
  31. * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
  32. *
  33. * <p>To package your application into a JAR file for execution, run
  34. * 'mvn clean package' on the command line.
  35. *
  36. * <p>If you change the name of the main class (with the public static void main(String[] args))
  37. * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
  38. */
  39. public class StreamingJob {
  40. // 创建Logger对象
  41. private static final Logger log = LoggerFactory.getLogger(StreamingJob.class);
  42. public static void main(String[] args) throws Exception {
  43. // set up the streaming execution environment
  44. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  45. /*
  46. * Here, you can start creating your execution plan for Flink.
  47. *
  48. * Start with getting some data from the environment, like
  49. * env.readTextFile(textPath);
  50. *
  51. * then, transform the resulting DataStream<String> using operations
  52. * like
  53. * .filter()
  54. * .flatMap()
  55. * .join()
  56. * .coGroup()
  57. *
  58. * and many more.
  59. * Have a look at the programming guide for the Java API:
  60. *
  61. * http://flink.apache.org/docs/latest/apis/streaming/index.html
  62. *
  63. */
  64. // execute program
  65. final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
  66. .setHost("192.168.1.218")
  67. .setPort(5672)
  68. .setUserName("guest")
  69. .setPassword("guest")
  70. .setVirtualHost("ldt")
  71. .build();
  72. final DataStream<String> stream = env
  73. .addSource(new RMQSource<String>(
  74. connectionConfig, // config for the RabbitMQ connection
  75. "LogDirectQueue", // name of the RabbitMQ queue to consume
  76. true, // use correlation ids; can be false if only at-least-once is required
  77. new SimpleStringSchema())) // deserialization schema to turn messages into Java objects
  78. .setParallelism(1);
  79. stream.addSink(new MongoSink());
  80. stream.print("demo");
  81. log.error("ceshi===========================================================================");
  82. env.execute("Flink Streaming Java API Skeleton");
  83. }
  84. }